diff --git a/native/core/src/parquet/parquet_exec.rs b/native/core/src/parquet/parquet_exec.rs
index ef4c878b9a..e34b4f4a0e 100644
--- a/native/core/src/parquet/parquet_exec.rs
+++ b/native/core/src/parquet/parquet_exec.rs
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use crate::execution::jni_api::get_runtime;
 use crate::execution::operators::ExecutionError;
 use crate::parquet::encryption_support::{CometEncryptionConfig, ENCRYPTION_FACTORY_ID};
 use crate::parquet::parquet_read_cached_factory::CachingParquetReaderFactory;
@@ -23,6 +24,7 @@ use crate::parquet::schema_adapter::SparkPhysicalExprAdapterFactory;
 use arrow::datatypes::{Field, SchemaRef};
 use datafusion::config::TableParquetOptions;
 use datafusion::datasource::listing::PartitionedFile;
+use datafusion::datasource::physical_plan::parquet::ParquetAccessPlan;
 use datafusion::datasource::physical_plan::{
     FileGroup, FileScanConfigBuilder, FileSource, ParquetSource,
 };
@@ -37,6 +39,9 @@ use datafusion::prelude::SessionContext;
 use datafusion::scalar::ScalarValue;
 use datafusion_comet_spark_expr::EvalMode;
 use datafusion_datasource::TableSchema;
+use object_store::ObjectStore;
+use parquet::arrow::async_reader::{AsyncFileReader, ParquetObjectReader};
+use parquet::file::metadata::RowGroupMetaData;
 use std::collections::HashMap;
 use std::sync::Arc;
 
@@ -153,7 +158,15 @@ pub(crate) fn init_datasource_exec(
     // Use caching reader factory to avoid redundant footer reads across partitions
     let store = session_ctx.runtime_env().object_store(&object_store_url)?;
     parquet_source = parquet_source
-        .with_parquet_file_reader_factory(Arc::new(CachingParquetReaderFactory::new(store)));
+        .with_parquet_file_reader_factory(Arc::new(CachingParquetReaderFactory::new(
+            Arc::clone(&store),
+        )));
+
+    // Apply midpoint-based row group pruning to match Spark/parquet-mr behavior.
+    // DataFusion's built-in `prune_by_range` uses the start offset, which can disagree
+    // with Spark's midpoint-based assignment, causing some tasks to read no data while
+    // others read too much. We replace the range with an explicit ParquetAccessPlan.
+    let file_groups = apply_midpoint_row_group_pruning(file_groups, &store)?;
 
     let expr_adapter_factory: Arc<dyn PhysicalExprAdapterFactory> = Arc::new(
         SparkPhysicalExprAdapterFactory::new(spark_parquet_options, default_values),
@@ -182,6 +195,78 @@ pub(crate) fn init_datasource_exec(
     Ok(data_source_exec)
 }
 
+/// Compute the midpoint offset of a row group, matching the algorithm used by
+/// Spark (parquet-mr) and parquet-rs to assign row groups to file splits.
+///
+/// The midpoint is: min(data_page_offset, dictionary_page_offset) + compressed_size / 2
+///
+/// A row group belongs to a split if its midpoint falls within [split_start, split_end).
+fn get_row_group_midpoint(rg: &RowGroupMetaData) -> i64 {
+    let col = rg.column(0);
+    let mut offset = col.data_page_offset();
+    if let Some(dict_offset) = col.dictionary_page_offset() {
+        if dict_offset < offset {
+            offset = dict_offset;
+        }
+    }
+    offset + rg.compressed_size() / 2
+}
+
+/// For each PartitionedFile that has a byte range, read the Parquet footer and compute
+/// which row groups belong to this split using the midpoint algorithm (matching
+/// Spark/parquet-mr). Replace the byte range with an explicit ParquetAccessPlan so that
+/// DataFusion's `prune_by_range` (which uses a different algorithm) is bypassed.
+fn apply_midpoint_row_group_pruning(
+    file_groups: Vec<Vec<PartitionedFile>>,
+    store: &Arc<dyn ObjectStore>,
+) -> Result<Vec<Vec<PartitionedFile>>, ExecutionError> {
+    let has_ranges = file_groups
+        .iter()
+        .any(|group| group.iter().any(|f| f.range.is_some()));
+    if !has_ranges {
+        return Ok(file_groups);
+    }
+
+    let rt = get_runtime();
+
+    let mut result = Vec::with_capacity(file_groups.len());
+    for group in file_groups {
+        let mut new_group = Vec::with_capacity(group.len());
+        for mut file in group {
+            if let Some(range) = file.range.take() {
+                let metadata = rt.block_on(async {
+                    let mut reader =
+                        ParquetObjectReader::new(Arc::clone(store), file.object_meta.location.clone())
+                            .with_file_size(file.object_meta.size);
+                    reader.get_metadata(None).await
+                })
+                .map_err(|e| {
+                    ExecutionError::GeneralError(format!(
+                        "Failed to read Parquet metadata for {}: {e}",
+                        file.object_meta.location
+                    ))
+                })?;
+
+                let num_row_groups = metadata.num_row_groups();
+                let mut access_plan = ParquetAccessPlan::new_none(num_row_groups);
+
+                for i in 0..num_row_groups {
+                    let midpoint = get_row_group_midpoint(metadata.row_group(i));
+                    if midpoint >= range.start && midpoint < range.end {
+                        access_plan.scan(i);
+                    }
+                }
+
+                file.extensions = Some(Arc::new(access_plan));
+            }
+            new_group.push(file);
+        }
+        result.push(new_group);
+    }
+
+    Ok(result)
+}
+
 fn get_options(
     session_timezone: &str,
     case_sensitive: bool,
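
For reviewers, here is a standalone sketch of the disagreement the new comments describe. It is not part of the patch: the `midpoint` helper only mirrors the formula in `get_row_group_midpoint`, and the row-group layout and split boundary are made-up numbers chosen so the two assignment rules give different answers.

// Standalone sketch (not part of the patch). All offsets below are invented;
// `midpoint` mirrors the formula used by get_row_group_midpoint.
fn midpoint(first_byte_offset: i64, compressed_size: i64) -> i64 {
    // First byte of the row group (min of data/dictionary page offsets in the
    // real code) plus half of its compressed size.
    first_byte_offset + compressed_size / 2
}

fn main() {
    // Hypothetical file: row group 0 occupies bytes [4, 400), row group 1
    // occupies [400, 1000). Spark planned two splits with a boundary at 500.
    let row_groups = [(4i64, 396i64), (400, 600)]; // (first byte offset, compressed size)
    let splits = [(0i64, 500i64), (500, 1000)]; // (start, end)

    for (split_idx, &(start, end)) in splits.iter().enumerate() {
        for (rg_idx, &(offset, size)) in row_groups.iter().enumerate() {
            let mid = midpoint(offset, size);
            let by_midpoint = mid >= start && mid < end; // Spark/parquet-mr rule
            let by_start = offset >= start && offset < end; // start-offset rule
            println!(
                "split {split_idx}, row group {rg_idx}: \
                 midpoint rule = {by_midpoint}, start-offset rule = {by_start}"
            );
        }
    }
}

Row group 1 starts at byte 400 but has its midpoint at 700, so the start-offset rule hands it to split 0 while the midpoint rule hands it to split 1: the task for split 0 reads both row groups and the task for split 1 reads nothing, which is exactly the imbalance the patch comment describes.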