diff --git a/Cargo.toml b/Cargo.toml index 31829b30..29765198 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -34,8 +34,7 @@ all-features = true [dependencies] async-trait = { version = "0.1.77" } bytes = "1.4" -datafusion = "51.0" -datafusion-datasource = "51.0" +datafusion = "52.0" futures = { version = "0.3", default-features = false, features = ["std"] } futures-util = { version = "0.3" } object_store = { version = "0.12" } diff --git a/src/file_format.rs b/src/file_format.rs index c06f6c99..8ecaecf1 100644 --- a/src/file_format.rs +++ b/src/file_format.rs @@ -25,6 +25,7 @@ use datafusion::common::Statistics; use datafusion::datasource::file_format::file_compression_type::FileCompressionType; use datafusion::datasource::file_format::FileFormat; use datafusion::datasource::physical_plan::{FileScanConfig, FileSource}; +use datafusion::datasource::table_schema::TableSchema; use datafusion::error::{DataFusionError, Result}; use datafusion::physical_plan::ExecutionPlan; use futures::TryStreamExt; @@ -122,7 +123,7 @@ impl FileFormat for OrcFormat { Ok(DataSourceExec::from_data_source(conf)) } - fn file_source(&self) -> Arc { - Arc::new(OrcSource::default()) + fn file_source(&self, table_schema: TableSchema) -> Arc { + Arc::new(OrcSource::new(table_schema)) } } diff --git a/src/file_source.rs b/src/file_source.rs index 521962f9..023a1e82 100644 --- a/src/file_source.rs +++ b/src/file_source.rs @@ -16,10 +16,11 @@ // under the License. use crate::physical_exec::OrcOpener; -use datafusion::common::Statistics; +use datafusion::common::DataFusionError; use datafusion::datasource::physical_plan::{FileOpener, FileScanConfig, FileSource}; +use datafusion::datasource::table_schema::TableSchema; use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet; -use datafusion_datasource::TableSchema; +use datafusion::physical_plan::projection::ProjectionExprs; use object_store::ObjectStore; use std::any::Any; use std::sync::Arc; @@ -27,16 +28,23 @@ use std::sync::Arc; #[derive(Debug, Clone)] pub struct OrcSource { metrics: ExecutionPlanMetricsSet, - statistics: Statistics, batch_size: usize, + table_schema: TableSchema, + projection: ProjectionExprs, } -impl Default for OrcSource { - fn default() -> Self { +impl OrcSource { + pub fn new(table_schema: TableSchema) -> Self { + let table_schema_ref = table_schema.table_schema(); + let projection = ProjectionExprs::from_indices( + &(0..table_schema_ref.fields().len()).collect::>(), + table_schema_ref, + ); Self { metrics: ExecutionPlanMetricsSet::default(), - statistics: Statistics::default(), batch_size: 1024, + table_schema, + projection, } } } @@ -47,14 +55,24 @@ impl FileSource for OrcSource { object_store: Arc, config: &FileScanConfig, _partition: usize, - ) -> Arc { - Arc::new(OrcOpener::new(object_store, config, self.batch_size)) + ) -> Result, DataFusionError> { + OrcOpener::try_new( + object_store, + self.table_schema.table_schema().clone(), + config.batch_size.unwrap_or(self.batch_size), + self.projection.clone(), + ) + .map(|f| Arc::new(f) as Arc) } fn as_any(&self) -> &dyn Any { self } + fn table_schema(&self) -> &TableSchema { + &self.table_schema + } + fn with_batch_size(&self, batch_size: usize) -> Arc { Arc::new(Self { batch_size, @@ -62,30 +80,24 @@ impl FileSource for OrcSource { }) } - fn with_schema(&self, _schema: TableSchema) -> Arc { - Arc::new(self.clone()) - } - - fn with_projection(&self, _config: &FileScanConfig) -> Arc { - Arc::new(self.clone()) - } - - fn with_statistics(&self, statistics: Statistics) -> Arc { - Arc::new(Self { - statistics, - ..self.clone() - }) + fn projection(&self) -> Option<&ProjectionExprs> { + Some(&self.projection) } fn metrics(&self) -> &ExecutionPlanMetricsSet { &self.metrics } - fn statistics(&self) -> datafusion::common::Result { - Ok(self.statistics.clone()) - } - fn file_type(&self) -> &str { "orc" } + + fn try_pushdown_projection( + &self, + projection: &ProjectionExprs, + ) -> Result>, DataFusionError> { + let mut source = self.clone(); + source.projection = self.projection.try_merge(projection)?; + Ok(Some(Arc::new(source))) + } } diff --git a/src/physical_exec.rs b/src/physical_exec.rs index 3be77d6d..53c1887f 100644 --- a/src/physical_exec.rs +++ b/src/physical_exec.rs @@ -19,9 +19,10 @@ use std::sync::Arc; use datafusion::arrow::datatypes::SchemaRef; use datafusion::arrow::error::ArrowError; -use datafusion::datasource::physical_plan::{FileOpenFuture, FileOpener, FileScanConfig}; +use datafusion::datasource::listing::PartitionedFile; +use datafusion::datasource::physical_plan::{FileOpenFuture, FileOpener}; use datafusion::error::Result; -use datafusion_datasource::PartitionedFile; +use datafusion::physical_plan::projection::ProjectionExprs; use orc_rust::projection::ProjectionMask; use orc_rust::ArrowReaderBuilder; @@ -31,28 +32,25 @@ use object_store::ObjectStore; use super::object_store_reader::ObjectStoreReader; pub(crate) struct OrcOpener { - projection: Vec, + projection: ProjectionExprs, batch_size: usize, table_schema: SchemaRef, object_store: Arc, } impl OrcOpener { - pub(crate) fn new( + pub(crate) fn try_new( object_store: Arc, - config: &FileScanConfig, + table_schema: SchemaRef, batch_size: usize, - ) -> Self { - let projection = config - .file_column_projection_indices() - .unwrap_or_else(|| (0..config.file_schema().fields().len()).collect()); - - Self { + projection: ProjectionExprs, + ) -> Result { + Ok(Self { projection, - batch_size: config.batch_size.unwrap_or(batch_size), - table_schema: config.file_schema().clone(), + batch_size, + table_schema, object_store, - } + }) } } @@ -61,7 +59,7 @@ impl FileOpener for OrcOpener { let object_meta = &file.object_meta; let reader = ObjectStoreReader::new(self.object_store.clone(), object_meta.clone()); let batch_size = self.batch_size; - let projected_schema = SchemaRef::from(self.table_schema.project(&self.projection)?); + let projected_schema = self.projection.project_schema(&self.table_schema)?; Ok(Box::pin(async move { let mut builder = ArrowReaderBuilder::try_new_async(reader)