Skip to content

Commit 4de31e5

Browse files
committed
fix: use midpoint-based row group pruning to match Spark parallelism
DataFusion's built-in prune_by_range checks whether a row group's start offset falls within a split's byte range. Spark/parquet-mr instead uses the midpoint (start + compressed_size/2). When these algorithms disagree, some tasks read too many row groups while others read none, wasting cluster parallelism. This adds midpoint-based row group assignment on the native side: for each PartitionedFile with a byte range, we read the Parquet footer, compute which row groups have their midpoint in [range_start, range_end), and attach a ParquetAccessPlan. The byte range is removed so DataFusion's prune_by_range is bypassed entirely. Closes #3817
1 parent eb502ff commit 4de31e5

File tree

1 file changed

+93
-1
lines changed

1 file changed

+93
-1
lines changed

native/core/src/parquet/parquet_exec.rs

Lines changed: 93 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ use crate::parquet::schema_adapter::SparkPhysicalExprAdapterFactory;
2323
use arrow::datatypes::{Field, SchemaRef};
2424
use datafusion::config::TableParquetOptions;
2525
use datafusion::datasource::listing::PartitionedFile;
26+
use datafusion::datasource::physical_plan::parquet::ParquetAccessPlan;
2627
use datafusion::datasource::physical_plan::{
2728
FileGroup, FileScanConfigBuilder, FileSource, ParquetSource,
2829
};
@@ -37,6 +38,10 @@ use datafusion::prelude::SessionContext;
3738
use datafusion::scalar::ScalarValue;
3839
use datafusion_comet_spark_expr::EvalMode;
3940
use datafusion_datasource::TableSchema;
41+
use object_store::ObjectStore;
42+
use parquet::arrow::async_reader::ParquetObjectReader;
43+
use parquet::arrow::async_reader::AsyncFileReader;
44+
use parquet::file::metadata::RowGroupMetaData;
4045
use std::collections::HashMap;
4146
use std::sync::Arc;
4247

@@ -153,7 +158,15 @@ pub(crate) fn init_datasource_exec(
153158
// Use caching reader factory to avoid redundant footer reads across partitions
154159
let store = session_ctx.runtime_env().object_store(&object_store_url)?;
155160
parquet_source = parquet_source
156-
.with_parquet_file_reader_factory(Arc::new(CachingParquetReaderFactory::new(store)));
161+
.with_parquet_file_reader_factory(Arc::new(CachingParquetReaderFactory::new(
162+
Arc::clone(&store),
163+
)));
164+
165+
// Apply midpoint-based row group pruning to match Spark/parquet-mr behavior.
166+
// DataFusion's built-in prune_by_range uses start-offset which can disagree with
167+
// Spark's midpoint-based assignment, causing some tasks to read no data while
168+
// others read too much. We replace the range with an explicit ParquetAccessPlan.
169+
let file_groups = apply_midpoint_row_group_pruning(file_groups, &store)?;
157170

158171
let expr_adapter_factory: Arc<dyn PhysicalExprAdapterFactory> = Arc::new(
159172
SparkPhysicalExprAdapterFactory::new(spark_parquet_options, default_values),
@@ -182,6 +195,85 @@ pub(crate) fn init_datasource_exec(
182195
Ok(data_source_exec)
183196
}
184197

198+
/// Compute the midpoint offset of a row group, matching the algorithm used by
199+
/// Spark (parquet-mr) and parquet-rs to assign row groups to file splits.
200+
///
201+
/// The midpoint is: min(data_page_offset, dictionary_page_offset) + compressed_size / 2
202+
///
203+
/// A row group belongs to a split if its midpoint falls within [split_start, split_end).
204+
fn get_row_group_midpoint(rg: &RowGroupMetaData) -> i64 {
205+
let col = rg.column(0);
206+
let mut offset = col.data_page_offset();
207+
if let Some(dict_offset) = col.dictionary_page_offset() {
208+
if dict_offset < offset {
209+
offset = dict_offset;
210+
}
211+
}
212+
offset + rg.compressed_size() / 2
213+
}
214+
215+
/// For each PartitionedFile that has a byte range, read the Parquet footer and compute
216+
/// which row groups belong to this split using the midpoint algorithm (matching Spark/parquet-mr).
217+
/// Replace the byte range with an explicit ParquetAccessPlan so that DataFusion's
218+
/// `prune_by_range` (which uses a different algorithm) is bypassed.
219+
fn apply_midpoint_row_group_pruning(
220+
file_groups: Vec<Vec<PartitionedFile>>,
221+
store: &Arc<dyn ObjectStore>,
222+
) -> Result<Vec<Vec<PartitionedFile>>, ExecutionError> {
223+
// Check if any file has a range; if not, return early
224+
let has_ranges = file_groups
225+
.iter()
226+
.any(|group| group.iter().any(|f| f.range.is_some()));
227+
if !has_ranges {
228+
return Ok(file_groups);
229+
}
230+
231+
let rt = tokio::runtime::Handle::try_current().map_err(|e| {
232+
ExecutionError::GeneralError(format!("No tokio runtime available: {e}"))
233+
})?;
234+
235+
let mut result = Vec::with_capacity(file_groups.len());
236+
for group in file_groups {
237+
let mut new_group = Vec::with_capacity(group.len());
238+
for mut file in group {
239+
if let Some(range) = file.range.take() {
240+
let range_start = range.start;
241+
let range_end = range.end;
242+
243+
// Read the Parquet footer to get row group metadata
244+
let metadata = rt.block_on(async {
245+
let location = file.object_meta.location.clone();
246+
let mut reader = ParquetObjectReader::new(Arc::clone(store), location)
247+
.with_file_size(file.object_meta.size);
248+
reader.get_metadata(None).await
249+
})
250+
.map_err(|e| {
251+
ExecutionError::GeneralError(format!(
252+
"Failed to read Parquet metadata for {}: {e}",
253+
file.object_meta.location
254+
))
255+
})?;
256+
257+
let num_row_groups = metadata.num_row_groups();
258+
let mut access_plan = ParquetAccessPlan::new_none(num_row_groups);
259+
260+
for i in 0..num_row_groups {
261+
let midpoint = get_row_group_midpoint(metadata.row_group(i));
262+
if midpoint >= range_start && midpoint < range_end {
263+
access_plan.scan(i);
264+
}
265+
}
266+
267+
file.extensions = Some(Arc::new(access_plan));
268+
}
269+
new_group.push(file);
270+
}
271+
result.push(new_group);
272+
}
273+
274+
Ok(result)
275+
}
276+
185277
fn get_options(
186278
session_timezone: &str,
187279
case_sensitive: bool,

0 commit comments

Comments
 (0)