From 86e272e96c222b531467e17fcbf28d3168d6170f Mon Sep 17 00:00:00 2001 From: Ning Sun Date: Thu, 27 Nov 2025 18:08:53 +0800 Subject: [PATCH 1/2] feat: upgrade to datafusion 51 --- Cargo.toml | 8 ++++---- src/file_source.rs | 4 ++-- src/physical_exec.rs | 14 +++++++------- 3 files changed, 13 insertions(+), 13 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index dfdc3cfb..ae1a3f7d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -32,15 +32,15 @@ rust-version = "1.73" all-features = true [dependencies] -arrow = { version = "56.0", features = ["prettyprint", "chrono-tz"] } +arrow = { version = "57.0", features = ["prettyprint", "chrono-tz"] } async-trait = { version = "0.1.77" } bytes = "1.4" -datafusion = "50.0" -datafusion-datasource = "50.0" +datafusion = "51.0" +datafusion-datasource = "51.0" futures = { version = "0.3", default-features = false, features = ["std"] } futures-util = { version = "0.3" } object_store = { version = "0.12" } -orc-rust = { version = "0.6.3", features = ["async"] } +orc-rust = { version = "0.7", features = ["async"] } tokio = { version = "1.28", features = [ "io-util", "sync", diff --git a/src/file_source.rs b/src/file_source.rs index a72b3749..521962f9 100644 --- a/src/file_source.rs +++ b/src/file_source.rs @@ -16,10 +16,10 @@ // under the License. use crate::physical_exec::OrcOpener; -use arrow::datatypes::SchemaRef; use datafusion::common::Statistics; use datafusion::datasource::physical_plan::{FileOpener, FileScanConfig, FileSource}; use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet; +use datafusion_datasource::TableSchema; use object_store::ObjectStore; use std::any::Any; use std::sync::Arc; @@ -62,7 +62,7 @@ impl FileSource for OrcSource { }) } - fn with_schema(&self, _schema: SchemaRef) -> Arc { + fn with_schema(&self, _schema: TableSchema) -> Arc { Arc::new(self.clone()) } diff --git a/src/physical_exec.rs b/src/physical_exec.rs index b04e6b1b..34126080 100644 --- a/src/physical_exec.rs +++ b/src/physical_exec.rs @@ -19,7 +19,7 @@ use std::sync::Arc; use arrow::error::ArrowError; use datafusion::arrow::datatypes::SchemaRef; -use datafusion::datasource::physical_plan::{FileMeta, FileOpenFuture, FileOpener, FileScanConfig}; +use datafusion::datasource::physical_plan::{FileOpenFuture, FileOpener, FileScanConfig}; use datafusion::error::Result; use datafusion_datasource::PartitionedFile; use orc_rust::projection::ProjectionMask; @@ -45,21 +45,21 @@ impl OrcOpener { ) -> Self { let projection = config .file_column_projection_indices() - .unwrap_or_else(|| (0..config.file_schema.fields().len()).collect()); + .unwrap_or_else(|| (0..config.file_schema().fields().len()).collect()); Self { projection, batch_size: config.batch_size.unwrap_or(batch_size), - table_schema: config.file_schema.clone(), + table_schema: config.file_schema().clone(), object_store, } } } impl FileOpener for OrcOpener { - fn open(&self, file_meta: FileMeta, _: PartitionedFile) -> Result { - let reader = - ObjectStoreReader::new(self.object_store.clone(), file_meta.object_meta.clone()); + fn open(&self, file: PartitionedFile) -> Result { + let object_meta = &file.object_meta; + let reader = ObjectStoreReader::new(self.object_store.clone(), object_meta.clone()); let batch_size = self.batch_size; let projected_schema = SchemaRef::from(self.table_schema.project(&self.projection)?); @@ -78,7 +78,7 @@ impl FileOpener for OrcOpener { } let projection_mask = ProjectionMask::roots(builder.file_metadata().root_data_type(), projection); - if let Some(range) = file_meta.range.clone() { + if let Some(range) = file.range.clone() { let range = range.start as usize..range.end as usize; builder = builder.with_file_byte_range(range); } From ca69bb65bb64ac53923c0135e2967fce90c9466c Mon Sep 17 00:00:00 2001 From: Ning Sun Date: Thu, 27 Nov 2025 18:24:06 +0800 Subject: [PATCH 2/2] refactor: use datafusion reexported arrow --- Cargo.toml | 1 - src/file_format.rs | 3 +-- src/physical_exec.rs | 2 +- 3 files changed, 2 insertions(+), 4 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index ae1a3f7d..17dc4199 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -32,7 +32,6 @@ rust-version = "1.73" all-features = true [dependencies] -arrow = { version = "57.0", features = ["prettyprint", "chrono-tz"] } async-trait = { version = "0.1.77" } bytes = "1.4" datafusion = "51.0" diff --git a/src/file_format.rs b/src/file_format.rs index 89ad595a..c06f6c99 100644 --- a/src/file_format.rs +++ b/src/file_format.rs @@ -20,8 +20,7 @@ use std::collections::HashMap; use std::fmt::Debug; use std::sync::Arc; -use arrow::datatypes::Schema; -use datafusion::arrow::datatypes::SchemaRef; +use datafusion::arrow::datatypes::{Schema, SchemaRef}; use datafusion::common::Statistics; use datafusion::datasource::file_format::file_compression_type::FileCompressionType; use datafusion::datasource::file_format::FileFormat; diff --git a/src/physical_exec.rs b/src/physical_exec.rs index 34126080..3be77d6d 100644 --- a/src/physical_exec.rs +++ b/src/physical_exec.rs @@ -17,8 +17,8 @@ use std::sync::Arc; -use arrow::error::ArrowError; use datafusion::arrow::datatypes::SchemaRef; +use datafusion::arrow::error::ArrowError; use datafusion::datasource::physical_plan::{FileOpenFuture, FileOpener, FileScanConfig}; use datafusion::error::Result; use datafusion_datasource::PartitionedFile;