Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,541 changes: 1,068 additions & 473 deletions Cargo.lock

Large diffs are not rendered by default.

69 changes: 39 additions & 30 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -180,18 +180,18 @@ anyerror = { version = "=0.1.13" }
anyhow = { version = "1.0.65" }
apache-avro = { version = "0.17.0", features = ["snappy", "zstandard", "xz", "snappy", "bzip"] }
approx = "0.5.1"
arrow = { version = "56" }
arrow-array = { version = "56" }
arrow-buffer = { version = "56" }
arrow-cast = { version = "56", features = ["prettyprint"] }
arrow-csv = { version = "56" }
arrow-data = { version = "56" }
arrow-flight = { version = "56", features = ["flight-sql-experimental", "tls-ring"] }
arrow-ipc = { version = "56", features = ["lz4", "zstd"] }
arrow-json = { version = "56" }
arrow-ord = { version = "56" }
arrow-schema = { version = "56", features = ["serde"] }
arrow-select = { version = "56" }
arrow = { version = "58.1.0" }
arrow-array = { version = "58.1.0" }
arrow-buffer = { version = "58.1.0" }
arrow-cast = { version = "58.1.0", features = ["prettyprint"] }
arrow-csv = { version = "58.1.0" }
arrow-data = { version = "58.1.0" }
arrow-flight = { version = "58.1.0", features = ["flight-sql-experimental", "tls-ring"] }
arrow-ipc = { version = "58.1.0", features = ["lz4", "zstd"] }
arrow-json = { version = "58.1.0" }
arrow-ord = { version = "58.1.0" }
arrow-schema = { version = "58.1.0", features = ["serde"] }
arrow-select = { version = "58.1.0" }
arrow-udf-runtime = { version = "0.8.0", default-features = false, features = ["javascript", "wasm"] }
async-backtrace = "0.2"
async-channel = "2.3.1"
Expand Down Expand Up @@ -298,13 +298,13 @@ proc-macro2 = "1.0"
quote = "1.0"

## in branch dev
iceberg = { version = "0.8.0", git = "https://github.com/databendlabs/iceberg-rust", rev = "6ccaa60e", features = [
iceberg = { version = "0.8.0", git = "https://github.com/SkyFan2002/iceberg-rust.git", rev = "edb4e4f8158821d274850d19d2c7d0030e16e142", features = [
"storage-all",
] }
iceberg-catalog-glue = { version = "0.8.0", git = "https://github.com/databendlabs/iceberg-rust", rev = "6ccaa60e" }
iceberg-catalog-hms = { version = "0.8.0", git = "https://github.com/databendlabs/iceberg-rust", rev = "6ccaa60e" }
iceberg-catalog-rest = { version = "0.8.0", git = "https://github.com/databendlabs/iceberg-rust", rev = "6ccaa60e" }
iceberg-catalog-s3tables = { version = "0.8.0", git = "https://github.com/databendlabs/iceberg-rust", rev = "6ccaa60e" }
iceberg-catalog-glue = { version = "0.8.0", git = "https://github.com/SkyFan2002/iceberg-rust.git", rev = "edb4e4f8158821d274850d19d2c7d0030e16e142" }
iceberg-catalog-hms = { version = "0.8.0", git = "https://github.com/SkyFan2002/iceberg-rust.git", rev = "edb4e4f8158821d274850d19d2c7d0030e16e142" }
iceberg-catalog-rest = { version = "0.8.0", git = "https://github.com/SkyFan2002/iceberg-rust.git", rev = "edb4e4f8158821d274850d19d2c7d0030e16e142" }
iceberg-catalog-s3tables = { version = "0.8.0", git = "https://github.com/SkyFan2002/iceberg-rust.git", rev = "edb4e4f8158821d274850d19d2c7d0030e16e142" }

# Explicitly specify compatible AWS SDK versions
aws-config = "1.5.18"
Expand All @@ -324,11 +324,11 @@ jaq-std = "2.1.2"
jiff = { version = "0.2.16", features = ["serde", "tzdb-bundle-always"] }
jsonb = "0.5.6"
jwt-simple = { version = "0.12.12", default-features = false, features = ["pure-rust"] }
lance-core = "1.0.4"
lance-encoding = { version = "1.0.4", features = ["protoc"] }
lance-file = "1.0.4"
lance-io = { version = "1.0.4", default-features = false }
lance-table = "1.0.4"
lance-core = "5.1.0-beta.3"
lance-encoding = { version = "5.1.0-beta.3", features = ["protoc"] }
lance-file = "5.1.0-beta.3"
lance-io = { version = "5.1.0-beta.3", default-features = false }
lance-table = "5.1.0-beta.3"
lenient_semver = "0.4.2"
levenshtein_automata = "0.2.1"
lexical-core = "1"
Expand Down Expand Up @@ -389,7 +389,7 @@ orc-rust = "0.6.0"
ordered-float = { version = "5.1.0", default-features = false }
p256 = "0.13"
parking_lot = "0.12.1"
parquet = { version = "56", features = ["async"] }
parquet = { version = "58.1.0", features = ["async"] }
passwords = { version = "3.1.16" }
paste = "1.0.15"
percent-encoding = "2.3.1"
Expand All @@ -406,8 +406,8 @@ pretty_assertions = "1.3.0"
procfs = { version = "0.17.0" }
proj4rs = { version = "0.1.10", features = ["geo-types", "crs-definitions"] }
proptest = { version = "1", default-features = false, features = ["std"] }
prost = { version = "0.13" }
prost-build = { version = "0.13" }
prost = { version = "0.14.3" }
prost-build = { version = "0.14.3" }
prqlc = "0.11.3"
rand = { version = "0.8.5", features = ["small_rng", "serde1"] }
rand_distr = "0.4.3"
Expand Down Expand Up @@ -489,9 +489,11 @@ tokio = { version = "1.35.0", features = ["full"] }
tokio-stream = { version = "0.1.11", features = ["net"] }
tokio-util = { version = "0.7.13" }
toml = { version = "0.8", features = ["parse"] }
tonic = { version = "0.13", features = ["transport", "codegen", "tls-native-roots"] }
tonic-build = { version = "0.13" }
tonic-reflection = { version = "0.13" }
tonic = { version = "0.14.5", features = ["transport", "codegen", "tls-native-roots"] }
tonic-build = { version = "0.14.5" }
tonic-prost = { version = "0.14.5" }
tonic-prost-build = { version = "0.14.5" }
tonic-reflection = { version = "0.14.5" }
tower = { version = "0.5.1", features = ["util"] }
tower-service = "0.3.3"
twox-hash = "1.6.3"
Expand Down Expand Up @@ -616,7 +618,7 @@ overflow-checks = true
rpath = true

[patch.crates-io]
arrow-udf-runtime = { git = "https://github.com/datafuse-extras/arrow-udf.git", rev = "2480dccf1" }
arrow-udf-runtime = { git = "https://github.com/SkyFan2002/arrow-udf.git", rev = "ec0c74d1bb53d68243c2bb23ebaa9392223e094c" }
async-backtrace = { git = "https://github.com/datafuse-extras/async-backtrace.git", rev = "dea4553" }
async-recursion = { git = "https://github.com/datafuse-extras/async-recursion.git", rev = "a353334" }
backtrace = { git = "https://github.com/rust-lang/backtrace-rs.git", rev = "72265be" }
Expand All @@ -630,7 +632,14 @@ deltalake = { git = "https://github.com/delta-io/delta-rs", rev = "9954bff" }
map-api = { git = "https://github.com/databendlabs/map-api", tag = "v0.4.2" }
openraft = { git = "https://github.com/databendlabs/openraft", tag = "v0.10.0-alpha.17" }
openraft-rt = { git = "https://github.com/databendlabs/openraft", tag = "v0.10.0-alpha.17" }
orc-rust = { git = "https://github.com/datafuse-extras/orc-rust", rev = "fc812ad7010" }
# Keep ORC on a local patch while the upstream crate has not caught up with Arrow 58 yet.
lance-arrow = { git = "https://github.com/SkyFan2002/lance.git", rev = "983238285f482cc6d229483d15d6dde8f3bc3e96" }
lance-core = { git = "https://github.com/SkyFan2002/lance.git", rev = "983238285f482cc6d229483d15d6dde8f3bc3e96" }
lance-encoding = { git = "https://github.com/SkyFan2002/lance.git", rev = "983238285f482cc6d229483d15d6dde8f3bc3e96" }
lance-file = { git = "https://github.com/SkyFan2002/lance.git", rev = "983238285f482cc6d229483d15d6dde8f3bc3e96" }
lance-io = { git = "https://github.com/SkyFan2002/lance.git", rev = "983238285f482cc6d229483d15d6dde8f3bc3e96" }
lance-table = { git = "https://github.com/SkyFan2002/lance.git", rev = "983238285f482cc6d229483d15d6dde8f3bc3e96" }
orc-rust = { git = "https://github.com/SkyFan2002/orc-rust.git", rev = "3fba34ccde628b4063d7f892d15866f89236064c" }
recursive = { git = "https://github.com/datafuse-extras/recursive.git", rev = "16e433a" }
sled = { git = "https://github.com/datafuse-extras/sled", tag = "v0.34.7-datafuse.1" }
state-machine-api = { git = "https://github.com/databendlabs/state-machine-api.git", tag = "v0.3.4" }
Expand Down
4 changes: 2 additions & 2 deletions src/bendpy/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ publish = { workspace = true }
edition = { workspace = true }

[build-dependencies]
pyo3-build-config = "0.25"
pyo3-build-config = "0.28"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[lib]
Expand All @@ -32,7 +32,7 @@ databend-query = { workspace = true, features = [
"simd",
"disable_initial_exec_tls",
] }
pyo3 = { version = "0.25", features = ["generate-import-lib", "abi3-py312"] }
pyo3 = { version = "0.28", features = ["generate-import-lib", "abi3-py312"] }
serde_json = { workspace = true }
sysinfo = { workspace = true }
tokio = { workspace = true, features = ["macros", "rt", "rt-multi-thread", "sync"] }
Expand Down
7 changes: 6 additions & 1 deletion src/bendpy/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,12 @@ fn build_position_select(col_names: &[String]) -> PyResult<String> {
.join(", "))
}

#[pyclass(name = "SessionContext", module = "databend", subclass)]
#[pyclass(
name = "SessionContext",
module = "databend",
subclass,
skip_from_py_object
)]
#[derive(Clone)]
pub(crate) struct PySessionContext {
pub(crate) session: Arc<Session>,
Expand Down
42 changes: 15 additions & 27 deletions src/bendpy/src/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,22 +23,22 @@ use databend_query::interpreters::InterpreterFactory;
use databend_query::sessions::QueryContext;
use databend_query::sql::plans::Plan;
use pyo3::prelude::*;
use pyo3::types::PyTuple;
use pyo3::types::PyAny;
use tokio_stream::StreamExt;

use crate::datablock::PyDataBlocks;
use crate::schema::PySchema;
use crate::utils::wait_for_future;

#[pyclass(name = "BoxSize", module = "databend", subclass)]
#[pyclass(name = "BoxSize", module = "databend", subclass, skip_from_py_object)]
#[derive(Clone, Debug)]
pub(crate) struct PyBoxSize {
pub(crate) bs_max_display_rows: usize,
pub(crate) bs_max_width: usize,
pub(crate) bs_max_col_width: usize,
}

#[pyclass(name = "DataFrame", module = "databend", subclass)]
#[pyclass(name = "DataFrame", module = "databend", subclass, skip_from_py_object)]
#[derive(Clone)]
pub(crate) struct PyDataFrame {
ctx: Arc<QueryContext>,
Expand Down Expand Up @@ -119,7 +119,7 @@ impl PyDataFrame {
}
}

pub fn to_py_arrow(&self, py: Python) -> PyResult<Vec<PyObject>> {
pub fn to_py_arrow(&self, py: Python) -> PyResult<Vec<Py<PyAny>>> {
let blocks = wait_for_future(py, self.df_collect());
let blocks = blocks.map_err(|err| {
pyo3::exceptions::PyRuntimeError::new_err(format!("DataFrame collect error: {:?}", err))
Expand All @@ -132,50 +132,38 @@ impl PyDataFrame {
.to_record_batch_with_dataschema(self.df.schema().as_ref())
.unwrap()
.to_pyarrow(py)
.map(Bound::unbind)
})
.collect()
}

/// Convert to Arrow Table
/// Collect the batches and pass to Arrow Table
pub fn to_arrow_table(&self, py: Python) -> PyResult<PyObject> {
pub fn to_arrow_table(&self, py: Python<'_>) -> PyResult<Py<PyAny>> {
let batches = self.to_py_arrow(py)?.into_pyobject(py)?;
let schema = ArrowSchema::from(self.df.schema().as_ref());
let schema = PyArrowType(schema);
let schema = schema.into_pyobject(py)?;

Python::with_gil(|py| {
// Instantiate pyarrow Table object and use its from_batches method
let table_class = py.import("pyarrow")?.getattr("Table")?;
let args = PyTuple::new(py, &[batches, schema])?;
let table: PyObject = table_class.call_method1("from_batches", args)?.into();
Ok(table)
})
let table_class = py.import("pyarrow")?.getattr("Table")?;
Ok(table_class
.call_method1("from_batches", (batches, schema))?
.unbind())
}

/// Convert to pandas dataframe with pyarrow
/// Collect the batches, pass to Arrow Table & then convert to Pandas DataFrame
fn to_pandas(&self, py: Python) -> PyResult<PyObject> {
fn to_pandas(&self, py: Python<'_>) -> PyResult<Py<PyAny>> {
let table = self.to_arrow_table(py)?;

Python::with_gil(|py| {
// See also: https://arrow.apache.org/docs/python/generated/pyarrow.Table.html#pyarrow.Table.to_pandas
let result = table.call_method0(py, "to_pandas")?;
Ok(result)
})
table.call_method0(py, "to_pandas")
}

/// Convert to polars dataframe with pyarrow
/// Collect the batches, pass to Arrow Table & then convert to polars DataFrame
fn to_polars(&self, py: Python) -> PyResult<PyObject> {
fn to_polars(&self, py: Python<'_>) -> PyResult<Py<PyAny>> {
let table = self.to_arrow_table(py)?;

Python::with_gil(|py| {
let dataframe = py.import("polars")?.getattr("DataFrame")?;
let args = PyTuple::new(py, &[table])?;
let result: PyObject = dataframe.call1(args)?.into();
Ok(result)
})
let dataframe = py.import("polars")?.getattr("DataFrame")?;
Ok(dataframe.call1((table,))?.unbind())
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/bendpy/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,5 +30,5 @@ where
F: Future + Send,
F::Output: Send,
{
py.allow_threads(|| RUNTIME.block_on(f))
py.detach(|| RUNTIME.block_on(f))
}
6 changes: 5 additions & 1 deletion src/common/cloud_control/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,21 @@ hyper-util = { workspace = true }
prost = { workspace = true }
serde = { workspace = true }
tonic = { workspace = true }
tonic-prost = { workspace = true }

[build-dependencies]
lenient_semver = { workspace = true }
prost-build = { workspace = true }
semver = { workspace = true }
tonic-build = { workspace = true }
tonic-prost-build = { workspace = true }

[dev-dependencies]
anyhow = { workspace = true }
tokio = { workspace = true }
tower = { workspace = true }

[package.metadata.cargo-machete]
ignored = ["tonic-prost"]

[lints]
workspace = true
2 changes: 1 addition & 1 deletion src/common/cloud_control/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,5 +80,5 @@ fn build_proto() -> Result<()> {
config.protoc_arg("--experimental_allow_proto3_optional");
}

tonic_build::configure().compile_protos_with_config(config, &proto_defs, &[proto_path])
tonic_prost_build::configure().compile_with_config(config, &proto_defs, &[proto_path])
}
5 changes: 3 additions & 2 deletions src/common/cloud_control/src/task_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use std::fmt::Display;
use std::fmt::Formatter;

use chrono::DateTime;
use chrono::FixedOffset;
use chrono::Utc;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
Expand Down Expand Up @@ -165,7 +166,7 @@ impl TryFrom<crate::pb::Task> for Task {
value.next_scheduled_at
))
})
.map(|d| d.with_timezone(&Utc))
.map(|d: DateTime<FixedOffset>| d.with_timezone(&Utc))
})
.transpose()?;

Expand All @@ -180,7 +181,7 @@ impl TryFrom<crate::pb::Task> for Task {
value.last_suspended_at
))
})
.map(|d| d.with_timezone(&Utc))
.map(|d: DateTime<FixedOffset>| d.with_timezone(&Utc))
})
.transpose()?;
let schedule = match value.schedule_options {
Expand Down
8 changes: 4 additions & 4 deletions src/common/column/src/binview/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -658,7 +658,7 @@ impl From<Utf8ViewColumn> for ArrayData {
column
.buffers
.iter()
.map(|x| x.clone().into())
.map(|buffer| arrow_buffer::Buffer::from(buffer.clone()))
.collect::<Vec<_>>(),
);
unsafe { builder.build_unchecked() }
Expand All @@ -674,7 +674,7 @@ impl From<BinaryViewColumn> for ArrayData {
column
.buffers
.iter()
.map(|x| x.clone().into())
.map(|buffer| arrow_buffer::Buffer::from(buffer.clone()))
.collect::<Vec<_>>(),
);
unsafe { builder.build_unchecked() }
Expand All @@ -686,8 +686,8 @@ impl From<ArrayData> for Utf8ViewColumn {
let views = data.buffers()[0].clone();
let buffers = data.buffers()[1..]
.iter()
.map(|x| x.clone().into())
.collect();
.map(|buffer| crate::buffer::Buffer::from(buffer.clone()))
.collect::<Arc<[crate::buffer::Buffer<u8>]>>();

unsafe { Utf8ViewColumn::new_unchecked_unknown_md(views.into(), buffers, None) }
}
Expand Down
Loading
Loading