From 2971b041ce1c94e315ac5b327589da6d61f449c0 Mon Sep 17 00:00:00 2001 From: luofucong Date: Sun, 28 Sep 2025 19:28:36 +0800 Subject: [PATCH] update datafusion --- Cargo.toml | 20 ++---- src/file_format.rs | 33 ++++----- src/file_source.rs | 91 ++++++++++++++++++++++++ src/lib.rs | 12 ++-- src/object_store_reader.rs | 10 +-- src/physical_exec.rs | 137 +++++++------------------------------ 6 files changed, 148 insertions(+), 155 deletions(-) create mode 100644 src/file_source.rs diff --git a/Cargo.toml b/Cargo.toml index 5b6acf0c..50666e66 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -32,16 +32,15 @@ rust-version = "1.73" all-features = true [dependencies] -arrow = { version = "53", features = ["prettyprint", "chrono-tz"] } +arrow = { version = "56.0", features = ["prettyprint", "chrono-tz"] } async-trait = { version = "0.1.77" } bytes = "1.4" -datafusion = { version = "42.0" } -datafusion-expr = { version = "42.0" } -datafusion-physical-expr = { version = "42.0" } +datafusion = "50.0" +datafusion-datasource = "50.0" futures = { version = "0.3", default-features = false, features = ["std"] } futures-util = { version = "0.3" } -object_store = { version = "0.11" } -orc-rust = { version = "0.5", features = ["async"] } +object_store = { version = "0.12" } +orc-rust = { version = "0.6", features = ["async"] } tokio = { version = "1.28", features = [ "io-util", "sync", @@ -51,15 +50,6 @@ tokio = { version = "1.28", features = [ "rt-multi-thread", ] } -[dev-dependencies] -arrow-ipc = { version = "53.0.0", features = ["lz4"] } -arrow-json = "53.0.0" -criterion = { version = "0.5", default-features = false, features = ["async_tokio"] } -opendal = { version = "0.48", default-features = false, features = ["services-memory"] } -pretty_assertions = "1.3.0" -proptest = "1.0.0" -serde_json = { version = "1.0", default-features = false, features = ["std"] } - [[example]] name = "datafusion_integration" # Some issue when publishing and path isn't specified, so adding here diff --git a/src/file_format.rs b/src/file_format.rs index 8cacb70d..89ad595a 100644 --- a/src/file_format.rs +++ b/src/file_format.rs @@ -25,21 +25,21 @@ use datafusion::arrow::datatypes::SchemaRef; use datafusion::common::Statistics; use datafusion::datasource::file_format::file_compression_type::FileCompressionType; use datafusion::datasource::file_format::FileFormat; -use datafusion::datasource::physical_plan::FileScanConfig; +use datafusion::datasource::physical_plan::{FileScanConfig, FileSource}; use datafusion::error::{DataFusionError, Result}; -use datafusion::execution::context::SessionState; use datafusion::physical_plan::ExecutionPlan; -use datafusion_physical_expr::PhysicalExpr; use futures::TryStreamExt; use orc_rust::reader::metadata::read_metadata_async; +use crate::OrcSource; use async_trait::async_trait; +use datafusion::catalog::Session; +use datafusion::datasource::source::DataSourceExec; use futures_util::StreamExt; use object_store::path::Path; use object_store::{ObjectMeta, ObjectStore}; use super::object_store_reader::ObjectStoreReader; -use super::physical_exec::OrcExec; async fn fetch_schema(store: &Arc, file: &ObjectMeta) -> Result<(Path, Schema)> { let loc_path = file.location.clone(); @@ -54,13 +54,7 @@ async fn fetch_schema(store: &Arc, file: &ObjectMeta) -> Result } #[derive(Clone, Debug)] -pub struct OrcFormat {} - -impl OrcFormat { - pub fn new() -> Self { - Self {} - } -} +pub struct OrcFormat; #[async_trait] impl FileFormat for OrcFormat { @@ -76,9 +70,13 @@ impl FileFormat for OrcFormat { Ok("orc".to_string()) } + fn compression_type(&self) -> Option { + None + } + async fn infer_schema( &self, - state: &SessionState, + state: &dyn Session, store: &Arc, objects: &[ObjectMeta], ) -> Result { @@ -109,7 +107,7 @@ impl FileFormat for OrcFormat { async fn infer_stats( &self, - _state: &SessionState, + _state: &dyn Session, _store: &Arc, table_schema: SchemaRef, _object: &ObjectMeta, @@ -119,10 +117,13 @@ impl FileFormat for OrcFormat { async fn create_physical_plan( &self, - _state: &SessionState, + _state: &dyn Session, conf: FileScanConfig, - _filters: Option<&Arc>, ) -> Result> { - Ok(Arc::new(OrcExec::new(conf))) + Ok(DataSourceExec::from_data_source(conf)) + } + + fn file_source(&self) -> Arc { + Arc::new(OrcSource::default()) } } diff --git a/src/file_source.rs b/src/file_source.rs new file mode 100644 index 00000000..a72b3749 --- /dev/null +++ b/src/file_source.rs @@ -0,0 +1,91 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::physical_exec::OrcOpener; +use arrow::datatypes::SchemaRef; +use datafusion::common::Statistics; +use datafusion::datasource::physical_plan::{FileOpener, FileScanConfig, FileSource}; +use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet; +use object_store::ObjectStore; +use std::any::Any; +use std::sync::Arc; + +#[derive(Debug, Clone)] +pub struct OrcSource { + metrics: ExecutionPlanMetricsSet, + statistics: Statistics, + batch_size: usize, +} + +impl Default for OrcSource { + fn default() -> Self { + Self { + metrics: ExecutionPlanMetricsSet::default(), + statistics: Statistics::default(), + batch_size: 1024, + } + } +} + +impl FileSource for OrcSource { + fn create_file_opener( + &self, + object_store: Arc, + config: &FileScanConfig, + _partition: usize, + ) -> Arc { + Arc::new(OrcOpener::new(object_store, config, self.batch_size)) + } + + fn as_any(&self) -> &dyn Any { + self + } + + fn with_batch_size(&self, batch_size: usize) -> Arc { + Arc::new(Self { + batch_size, + ..self.clone() + }) + } + + fn with_schema(&self, _schema: SchemaRef) -> Arc { + Arc::new(self.clone()) + } + + fn with_projection(&self, _config: &FileScanConfig) -> Arc { + Arc::new(self.clone()) + } + + fn with_statistics(&self, statistics: Statistics) -> Arc { + Arc::new(Self { + statistics, + ..self.clone() + }) + } + + fn metrics(&self) -> &ExecutionPlanMetricsSet { + &self.metrics + } + + fn statistics(&self) -> datafusion::common::Result { + Ok(self.statistics.clone()) + } + + fn file_type(&self) -> &str { + "orc" + } +} diff --git a/src/lib.rs b/src/lib.rs index faf9333b..53cef42e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -58,12 +58,14 @@ use datafusion::execution::options::ReadOptions; use async_trait::async_trait; -use self::file_format::OrcFormat; - mod file_format; +mod file_source; mod object_store_reader; mod physical_exec; +pub use file_format::OrcFormat; +pub use file_source::OrcSource; + /// Configuration options for reading ORC files. #[derive(Clone)] pub struct OrcReadOptions<'a> { @@ -85,8 +87,7 @@ impl ReadOptions<'_> for OrcReadOptions<'_> { _config: &SessionConfig, _table_options: TableOptions, ) -> ListingOptions { - let file_format = OrcFormat::new(); - ListingOptions::new(Arc::new(file_format)).with_file_extension(self.file_extension) + ListingOptions::new(Arc::new(OrcFormat)).with_file_extension(self.file_extension) } async fn get_resolved_schema( @@ -126,8 +127,7 @@ impl SessionContextOrcExt for SessionContext { // SessionContext::_read_type let table_paths = table_paths.to_urls()?; let session_config = self.copied_config(); - let listing_options = - ListingOptions::new(Arc::new(OrcFormat::new())).with_file_extension(".orc"); + let listing_options = ListingOptions::new(Arc::new(OrcFormat)).with_file_extension(".orc"); let option_extension = listing_options.file_extension.clone(); diff --git a/src/object_store_reader.rs b/src/object_store_reader.rs index 6ad84f7d..925d227a 100644 --- a/src/object_store_reader.rs +++ b/src/object_store_reader.rs @@ -22,7 +22,7 @@ use futures::future::BoxFuture; use futures::{FutureExt, TryFutureExt}; use orc_rust::reader::AsyncChunkReader; -use object_store::{ObjectMeta, ObjectStore}; +use object_store::{GetOptions, ObjectMeta, ObjectStore}; /// Implements [`AsyncChunkReader`] to allow reading ORC files via `object_store` API. pub struct ObjectStoreReader { @@ -38,7 +38,11 @@ impl ObjectStoreReader { impl AsyncChunkReader for ObjectStoreReader { fn len(&mut self) -> BoxFuture<'_, std::io::Result> { - async move { Ok(self.file.size as u64) }.boxed() + self.store + .get_opts(&self.file.location, GetOptions::default()) + .map(|result| result.map(|x| x.meta.size)) + .map_err(|e| e.into()) + .boxed() } fn get_bytes( @@ -46,8 +50,6 @@ impl AsyncChunkReader for ObjectStoreReader { offset_from_start: u64, length: u64, ) -> BoxFuture<'_, std::io::Result> { - let offset_from_start = offset_from_start as usize; - let length = length as usize; let range = offset_from_start..(offset_from_start + length); self.store .get_range(&self.file.location, range) diff --git a/src/physical_exec.rs b/src/physical_exec.rs index ffd1ee80..b04e6b1b 100644 --- a/src/physical_exec.rs +++ b/src/physical_exec.rs @@ -15,140 +15,49 @@ // specific language governing permissions and limitations // under the License. -use std::any::Any; -use std::fmt::{self, Debug}; use std::sync::Arc; use arrow::error::ArrowError; use datafusion::arrow::datatypes::SchemaRef; -use datafusion::datasource::physical_plan::{ - FileMeta, FileOpenFuture, FileOpener, FileScanConfig, FileStream, -}; +use datafusion::datasource::physical_plan::{FileMeta, FileOpenFuture, FileOpener, FileScanConfig}; use datafusion::error::Result; -use datafusion::execution::context::TaskContext; -use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet}; -use datafusion::physical_plan::{ - DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, Partitioning, PlanProperties, - SendableRecordBatchStream, -}; -use datafusion_physical_expr::EquivalenceProperties; +use datafusion_datasource::PartitionedFile; use orc_rust::projection::ProjectionMask; use orc_rust::ArrowReaderBuilder; -use futures_util::StreamExt; +use futures_util::{StreamExt, TryStreamExt}; use object_store::ObjectStore; use super::object_store_reader::ObjectStoreReader; -#[derive(Debug, Clone)] -pub struct OrcExec { - config: FileScanConfig, - metrics: ExecutionPlanMetricsSet, - properties: PlanProperties, -} - -impl OrcExec { - pub fn new(config: FileScanConfig) -> Self { - let metrics = ExecutionPlanMetricsSet::new(); - let (projected_schema, _, orderings) = config.project(); - let properties = PlanProperties::new( - EquivalenceProperties::new_with_orderings(projected_schema, &orderings), - Partitioning::UnknownPartitioning(config.file_groups.len()), - ExecutionMode::Bounded, - ); - Self { - config, - metrics, - properties, - } - } -} - -impl DisplayAs for OrcExec { - fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> std::fmt::Result { - write!(f, "OrcExec: ")?; - self.config.fmt_as(t, f) - } +pub(crate) struct OrcOpener { + projection: Vec, + batch_size: usize, + table_schema: SchemaRef, + object_store: Arc, } -impl ExecutionPlan for OrcExec { - fn as_any(&self) -> &dyn Any { - self - } - - fn name(&self) -> &str { - "OrcExec" - } - - fn children(&self) -> Vec<&Arc> { - vec![] - } - - fn with_new_children( - self: Arc, - _: Vec>, - ) -> Result> { - Ok(self) - } +impl OrcOpener { + pub(crate) fn new( + object_store: Arc, + config: &FileScanConfig, + batch_size: usize, + ) -> Self { + let projection = config + .file_column_projection_indices() + .unwrap_or_else(|| (0..config.file_schema.fields().len()).collect()); - fn properties(&self) -> &PlanProperties { - &self.properties - } - - fn metrics(&self) -> Option { - Some(self.metrics.clone_inner()) - } - - fn execute( - &self, - partition_index: usize, - context: Arc, - ) -> Result { - let projection: Vec<_> = self - .config - .projection - .as_ref() - .map(|p| { - // FileScanConfig::file_column_projection_indices - p.iter() - .filter(|col_idx| **col_idx < self.config.file_schema.fields().len()) - .copied() - .collect() - }) - .unwrap_or_else(|| (0..self.config.file_schema.fields().len()).collect()); - - let object_store = context - .runtime_env() - .object_store(&self.config.object_store_url)?; - - let opener = OrcOpener { - _partition_index: partition_index, + Self { projection, - batch_size: context.session_config().batch_size(), - _limit: self.config.limit, - table_schema: self.config.file_schema.clone(), - _metrics: self.metrics.clone(), + batch_size: config.batch_size.unwrap_or(batch_size), + table_schema: config.file_schema.clone(), object_store, - }; - - let stream = FileStream::new(&self.config, partition_index, opener, &self.metrics)?; - Ok(Box::pin(stream)) + } } } -// TODO: make use of the unused fields (e.g. implement metrics) -struct OrcOpener { - _partition_index: usize, - projection: Vec, - batch_size: usize, - _limit: Option, - table_schema: SchemaRef, - _metrics: ExecutionPlanMetricsSet, - object_store: Arc, -} - impl FileOpener for OrcOpener { - fn open(&self, file_meta: FileMeta) -> Result { + fn open(&self, file_meta: FileMeta, _: PartitionedFile) -> Result { let reader = ObjectStoreReader::new(self.object_store.clone(), file_meta.object_meta.clone()); let batch_size = self.batch_size; @@ -178,7 +87,7 @@ impl FileOpener for OrcOpener { .with_projection(projection_mask) .build_async(); - Ok(reader.boxed()) + Ok(reader.map_err(Into::into).boxed()) })) } }