diff --git a/Cargo.toml b/Cargo.toml index dfdc3cfb..17dc4199 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -32,15 +32,14 @@ rust-version = "1.73" all-features = true [dependencies] -arrow = { version = "56.0", features = ["prettyprint", "chrono-tz"] } async-trait = { version = "0.1.77" } bytes = "1.4" -datafusion = "50.0" -datafusion-datasource = "50.0" +datafusion = "51.0" +datafusion-datasource = "51.0" futures = { version = "0.3", default-features = false, features = ["std"] } futures-util = { version = "0.3" } object_store = { version = "0.12" } -orc-rust = { version = "0.6.3", features = ["async"] } +orc-rust = { version = "0.7", features = ["async"] } tokio = { version = "1.28", features = [ "io-util", "sync", diff --git a/src/file_format.rs b/src/file_format.rs index 89ad595a..c06f6c99 100644 --- a/src/file_format.rs +++ b/src/file_format.rs @@ -20,8 +20,7 @@ use std::collections::HashMap; use std::fmt::Debug; use std::sync::Arc; -use arrow::datatypes::Schema; -use datafusion::arrow::datatypes::SchemaRef; +use datafusion::arrow::datatypes::{Schema, SchemaRef}; use datafusion::common::Statistics; use datafusion::datasource::file_format::file_compression_type::FileCompressionType; use datafusion::datasource::file_format::FileFormat; diff --git a/src/file_source.rs b/src/file_source.rs index a72b3749..521962f9 100644 --- a/src/file_source.rs +++ b/src/file_source.rs @@ -16,10 +16,10 @@ // under the License. use crate::physical_exec::OrcOpener; -use arrow::datatypes::SchemaRef; use datafusion::common::Statistics; use datafusion::datasource::physical_plan::{FileOpener, FileScanConfig, FileSource}; use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet; +use datafusion_datasource::TableSchema; use object_store::ObjectStore; use std::any::Any; use std::sync::Arc; @@ -62,7 +62,7 @@ impl FileSource for OrcSource { }) } - fn with_schema(&self, _schema: SchemaRef) -> Arc { + fn with_schema(&self, _schema: TableSchema) -> Arc { Arc::new(self.clone()) } diff --git a/src/physical_exec.rs b/src/physical_exec.rs index b04e6b1b..3be77d6d 100644 --- a/src/physical_exec.rs +++ b/src/physical_exec.rs @@ -17,9 +17,9 @@ use std::sync::Arc; -use arrow::error::ArrowError; use datafusion::arrow::datatypes::SchemaRef; -use datafusion::datasource::physical_plan::{FileMeta, FileOpenFuture, FileOpener, FileScanConfig}; +use datafusion::arrow::error::ArrowError; +use datafusion::datasource::physical_plan::{FileOpenFuture, FileOpener, FileScanConfig}; use datafusion::error::Result; use datafusion_datasource::PartitionedFile; use orc_rust::projection::ProjectionMask; @@ -45,21 +45,21 @@ impl OrcOpener { ) -> Self { let projection = config .file_column_projection_indices() - .unwrap_or_else(|| (0..config.file_schema.fields().len()).collect()); + .unwrap_or_else(|| (0..config.file_schema().fields().len()).collect()); Self { projection, batch_size: config.batch_size.unwrap_or(batch_size), - table_schema: config.file_schema.clone(), + table_schema: config.file_schema().clone(), object_store, } } } impl FileOpener for OrcOpener { - fn open(&self, file_meta: FileMeta, _: PartitionedFile) -> Result { - let reader = - ObjectStoreReader::new(self.object_store.clone(), file_meta.object_meta.clone()); + fn open(&self, file: PartitionedFile) -> Result { + let object_meta = &file.object_meta; + let reader = ObjectStoreReader::new(self.object_store.clone(), object_meta.clone()); let batch_size = self.batch_size; let projected_schema = SchemaRef::from(self.table_schema.project(&self.projection)?); @@ -78,7 +78,7 @@ impl FileOpener for OrcOpener { } let projection_mask = ProjectionMask::roots(builder.file_metadata().root_data_type(), projection); - if let Some(range) = file_meta.range.clone() { + if let Some(range) = file.range.clone() { let range = range.start as usize..range.end as usize; builder = builder.with_file_byte_range(range); }