1515// specific language governing permissions and limitations
1616// under the License.
1717
18+ use crate :: execution:: jni_api:: get_runtime;
1819use crate :: execution:: operators:: ExecutionError ;
1920use crate :: parquet:: encryption_support:: { CometEncryptionConfig , ENCRYPTION_FACTORY_ID } ;
2021use crate :: parquet:: parquet_read_cached_factory:: CachingParquetReaderFactory ;
@@ -23,6 +24,7 @@ use crate::parquet::schema_adapter::SparkPhysicalExprAdapterFactory;
2324use arrow:: datatypes:: { Field , SchemaRef } ;
2425use datafusion:: config:: TableParquetOptions ;
2526use datafusion:: datasource:: listing:: PartitionedFile ;
27+ use datafusion:: datasource:: physical_plan:: parquet:: ParquetAccessPlan ;
2628use datafusion:: datasource:: physical_plan:: {
2729 FileGroup , FileScanConfigBuilder , FileSource , ParquetSource ,
2830} ;
@@ -37,6 +39,9 @@ use datafusion::prelude::SessionContext;
3739use datafusion:: scalar:: ScalarValue ;
3840use datafusion_comet_spark_expr:: EvalMode ;
3941use datafusion_datasource:: TableSchema ;
42+ use object_store:: ObjectStore ;
43+ use parquet:: arrow:: async_reader:: { AsyncFileReader , ParquetObjectReader } ;
44+ use parquet:: file:: metadata:: RowGroupMetaData ;
4045use std:: collections:: HashMap ;
4146use std:: sync:: Arc ;
4247
@@ -153,7 +158,15 @@ pub(crate) fn init_datasource_exec(
153158 // Use caching reader factory to avoid redundant footer reads across partitions
154159 let store = session_ctx. runtime_env ( ) . object_store ( & object_store_url) ?;
155160 parquet_source = parquet_source
156- . with_parquet_file_reader_factory ( Arc :: new ( CachingParquetReaderFactory :: new ( store) ) ) ;
161+ . with_parquet_file_reader_factory ( Arc :: new ( CachingParquetReaderFactory :: new (
162+ Arc :: clone ( & store) ,
163+ ) ) ) ;
164+
165+ // Apply midpoint-based row group pruning to match Spark/parquet-mr behavior.
166+ // DataFusion's built-in prune_by_range uses start-offset which can disagree with
167+ // Spark's midpoint-based assignment, causing some tasks to read no data while
168+ // others read too much. We replace the range with an explicit ParquetAccessPlan.
169+ let file_groups = apply_midpoint_row_group_pruning ( file_groups, & store) ?;
157170
158171 let expr_adapter_factory: Arc < dyn PhysicalExprAdapterFactory > = Arc :: new (
159172 SparkPhysicalExprAdapterFactory :: new ( spark_parquet_options, default_values) ,
@@ -182,6 +195,78 @@ pub(crate) fn init_datasource_exec(
182195 Ok ( data_source_exec)
183196}
184197
198+ /// Compute the midpoint offset of a row group, matching the algorithm used by
199+ /// Spark (parquet-mr) and parquet-rs to assign row groups to file splits.
200+ ///
201+ /// The midpoint is: min(data_page_offset, dictionary_page_offset) + compressed_size / 2
202+ ///
203+ /// A row group belongs to a split if its midpoint falls within [split_start, split_end).
204+ fn get_row_group_midpoint ( rg : & RowGroupMetaData ) -> i64 {
205+ let col = rg. column ( 0 ) ;
206+ let mut offset = col. data_page_offset ( ) ;
207+ if let Some ( dict_offset) = col. dictionary_page_offset ( ) {
208+ if dict_offset < offset {
209+ offset = dict_offset;
210+ }
211+ }
212+ offset + rg. compressed_size ( ) / 2
213+ }
214+
215+ /// For each PartitionedFile that has a byte range, read the Parquet footer and compute
216+ /// which row groups belong to this split using the midpoint algorithm (matching Spark/parquet-mr).
217+ /// Replace the byte range with an explicit ParquetAccessPlan so that DataFusion's
218+ /// `prune_by_range` (which uses a different algorithm) is bypassed.
219+ fn apply_midpoint_row_group_pruning (
220+ file_groups : Vec < Vec < PartitionedFile > > ,
221+ store : & Arc < dyn ObjectStore > ,
222+ ) -> Result < Vec < Vec < PartitionedFile > > , ExecutionError > {
223+ let has_ranges = file_groups
224+ . iter ( )
225+ . any ( |group| group. iter ( ) . any ( |f| f. range . is_some ( ) ) ) ;
226+ if !has_ranges {
227+ return Ok ( file_groups) ;
228+ }
229+
230+ let rt = get_runtime ( ) ;
231+
232+ let mut result = Vec :: with_capacity ( file_groups. len ( ) ) ;
233+ for group in file_groups {
234+ let mut new_group = Vec :: with_capacity ( group. len ( ) ) ;
235+ for mut file in group {
236+ if let Some ( range) = file. range . take ( ) {
237+ let metadata = rt. block_on ( async {
238+ let mut reader =
239+ ParquetObjectReader :: new ( Arc :: clone ( store) , file. object_meta . location . clone ( ) )
240+ . with_file_size ( file. object_meta . size ) ;
241+ reader. get_metadata ( None ) . await
242+ } )
243+ . map_err ( |e| {
244+ ExecutionError :: GeneralError ( format ! (
245+ "Failed to read Parquet metadata for {}: {e}" ,
246+ file. object_meta. location
247+ ) )
248+ } ) ?;
249+
250+ let num_row_groups = metadata. num_row_groups ( ) ;
251+ let mut access_plan = ParquetAccessPlan :: new_none ( num_row_groups) ;
252+
253+ for i in 0 ..num_row_groups {
254+ let midpoint = get_row_group_midpoint ( metadata. row_group ( i) ) ;
255+ if midpoint >= range. start && midpoint < range. end {
256+ access_plan. scan ( i) ;
257+ }
258+ }
259+
260+ file. extensions = Some ( Arc :: new ( access_plan) ) ;
261+ }
262+ new_group. push ( file) ;
263+ }
264+ result. push ( new_group) ;
265+ }
266+
267+ Ok ( result)
268+ }
269+
185270fn get_options (
186271 session_timezone : & str ,
187272 case_sensitive : bool ,
0 commit comments