Skip to content

Commit 28e13dd

Browse files
authored
feat: CometExecRDD supports per-partition plan data, reduce Iceberg native scan serialization, add DPP for Iceberg scans (#3349)
1 parent 1d01b7d commit 28e13dd

15 files changed

Lines changed: 1201 additions & 560 deletions

File tree

native/core/src/execution/operators/iceberg_scan.rs

Lines changed: 15 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ use crate::parquet::parquet_support::SparkParquetOptions;
4444
use crate::parquet::schema_adapter::SparkSchemaAdapterFactory;
4545
use datafusion::datasource::schema_adapter::{SchemaAdapterFactory, SchemaMapper};
4646
use datafusion_comet_spark_expr::EvalMode;
47+
use iceberg::scan::FileScanTask;
4748

4849
/// Iceberg table scan operator that uses iceberg-rust to read Iceberg tables.
4950
///
@@ -58,8 +59,8 @@ pub struct IcebergScanExec {
5859
plan_properties: PlanProperties,
5960
/// Catalog-specific configuration for FileIO
6061
catalog_properties: HashMap<String, String>,
61-
/// Pre-planned file scan tasks, grouped by partition
62-
file_task_groups: Vec<Vec<iceberg::scan::FileScanTask>>,
62+
/// Pre-planned file scan tasks
63+
tasks: Vec<FileScanTask>,
6364
/// Metrics
6465
metrics: ExecutionPlanMetricsSet,
6566
}
@@ -69,11 +70,10 @@ impl IcebergScanExec {
6970
metadata_location: String,
7071
schema: SchemaRef,
7172
catalog_properties: HashMap<String, String>,
72-
file_task_groups: Vec<Vec<iceberg::scan::FileScanTask>>,
73+
tasks: Vec<FileScanTask>,
7374
) -> Result<Self, ExecutionError> {
7475
let output_schema = schema;
75-
let num_partitions = file_task_groups.len();
76-
let plan_properties = Self::compute_properties(Arc::clone(&output_schema), num_partitions);
76+
let plan_properties = Self::compute_properties(Arc::clone(&output_schema), 1);
7777

7878
let metrics = ExecutionPlanMetricsSet::new();
7979

@@ -82,7 +82,7 @@ impl IcebergScanExec {
8282
output_schema,
8383
plan_properties,
8484
catalog_properties,
85-
file_task_groups,
85+
tasks,
8686
metrics,
8787
})
8888
}
@@ -127,19 +127,10 @@ impl ExecutionPlan for IcebergScanExec {
127127

128128
fn execute(
129129
&self,
130-
partition: usize,
130+
_partition: usize,
131131
context: Arc<TaskContext>,
132132
) -> DFResult<SendableRecordBatchStream> {
133-
if partition < self.file_task_groups.len() {
134-
let tasks = &self.file_task_groups[partition];
135-
self.execute_with_tasks(tasks.clone(), partition, context)
136-
} else {
137-
Err(DataFusionError::Execution(format!(
138-
"IcebergScanExec: Partition index {} out of range (only {} task groups available)",
139-
partition,
140-
self.file_task_groups.len()
141-
)))
142-
}
133+
self.execute_with_tasks(self.tasks.clone(), context)
143134
}
144135

145136
fn metrics(&self) -> Option<MetricsSet> {
@@ -152,15 +143,14 @@ impl IcebergScanExec {
152143
/// deletes via iceberg-rust's ArrowReader.
153144
fn execute_with_tasks(
154145
&self,
155-
tasks: Vec<iceberg::scan::FileScanTask>,
156-
partition: usize,
146+
tasks: Vec<FileScanTask>,
157147
context: Arc<TaskContext>,
158148
) -> DFResult<SendableRecordBatchStream> {
159149
let output_schema = Arc::clone(&self.output_schema);
160150
let file_io = Self::load_file_io(&self.catalog_properties, &self.metadata_location)?;
161151
let batch_size = context.session_config().batch_size();
162152

163-
let metrics = IcebergScanMetrics::new(&self.metrics, partition);
153+
let metrics = IcebergScanMetrics::new(&self.metrics);
164154
let num_tasks = tasks.len();
165155
metrics.num_splits.add(num_tasks);
166156

@@ -221,10 +211,10 @@ struct IcebergScanMetrics {
221211
}
222212

223213
impl IcebergScanMetrics {
224-
fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
214+
fn new(metrics: &ExecutionPlanMetricsSet) -> Self {
225215
Self {
226-
baseline: BaselineMetrics::new(metrics, partition),
227-
num_splits: MetricBuilder::new(metrics).counter("num_splits", partition),
216+
baseline: BaselineMetrics::new(metrics, 0),
217+
num_splits: MetricBuilder::new(metrics).counter("num_splits", 0),
228218
}
229219
}
230220
}
@@ -311,11 +301,11 @@ where
311301

312302
impl DisplayAs for IcebergScanExec {
313303
fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
314-
let num_tasks: usize = self.file_task_groups.iter().map(|g| g.len()).sum();
315304
write!(
316305
f,
317306
"IcebergScanExec: metadata_location={}, num_tasks={}",
318-
self.metadata_location, num_tasks
307+
self.metadata_location,
308+
self.tasks.len()
319309
)
320310
}
321311
}

native/core/src/execution/planner.rs

Lines changed: 28 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1151,33 +1151,28 @@ impl PhysicalPlanner {
11511151
))
11521152
}
11531153
OpStruct::IcebergScan(scan) => {
1154-
let required_schema: SchemaRef =
1155-
convert_spark_types_to_arrow_schema(scan.required_schema.as_slice());
1154+
// Extract common data and single partition's file tasks
1155+
// Per-partition injection happens in Scala before sending to native
1156+
let common = scan
1157+
.common
1158+
.as_ref()
1159+
.ok_or_else(|| GeneralError("IcebergScan missing common data".into()))?;
11561160

1157-
let catalog_properties: HashMap<String, String> = scan
1161+
let required_schema =
1162+
convert_spark_types_to_arrow_schema(common.required_schema.as_slice());
1163+
let catalog_properties: HashMap<String, String> = common
11581164
.catalog_properties
11591165
.iter()
11601166
.map(|(k, v)| (k.clone(), v.clone()))
11611167
.collect();
1162-
1163-
let metadata_location = scan.metadata_location.clone();
1164-
1165-
debug_assert!(
1166-
!scan.file_partitions.is_empty(),
1167-
"IcebergScan must have at least one file partition. This indicates a bug in Scala serialization."
1168-
);
1169-
1170-
let tasks = parse_file_scan_tasks(
1171-
scan,
1172-
&scan.file_partitions[self.partition as usize].file_scan_tasks,
1173-
)?;
1174-
let file_task_groups = vec![tasks];
1168+
let metadata_location = common.metadata_location.clone();
1169+
let tasks = parse_file_scan_tasks_from_common(common, &scan.file_scan_tasks)?;
11751170

11761171
let iceberg_scan = IcebergScanExec::new(
11771172
metadata_location,
11781173
required_schema,
11791174
catalog_properties,
1180-
file_task_groups,
1175+
tasks,
11811176
)?;
11821177

11831178
Ok((
@@ -2762,15 +2757,14 @@ fn partition_data_to_struct(
27622757
/// Each task contains a residual predicate that is used for row-group level filtering
27632758
/// during Parquet scanning.
27642759
///
2765-
/// This function uses deduplication pools from the IcebergScan to avoid redundant parsing
2766-
/// of schemas, partition specs, partition types, name mappings, and other repeated data.
2767-
fn parse_file_scan_tasks(
2768-
proto_scan: &spark_operator::IcebergScan,
2760+
/// This function uses deduplication pools from the IcebergScanCommon to avoid redundant
2761+
/// parsing of schemas, partition specs, partition types, name mappings, and other repeated data.
2762+
fn parse_file_scan_tasks_from_common(
2763+
proto_common: &spark_operator::IcebergScanCommon,
27692764
proto_tasks: &[spark_operator::IcebergFileScanTask],
27702765
) -> Result<Vec<iceberg::scan::FileScanTask>, ExecutionError> {
2771-
// Build caches upfront: for 10K tasks with 1 schema, this parses the schema
2772-
// once instead of 10K times, eliminating redundant JSON deserialization
2773-
let schema_cache: Vec<Arc<iceberg::spec::Schema>> = proto_scan
2766+
// Parse each unique schema once, not once per task
2767+
let schema_cache: Vec<Arc<iceberg::spec::Schema>> = proto_common
27742768
.schema_pool
27752769
.iter()
27762770
.map(|json| {
@@ -2783,7 +2777,7 @@ fn parse_file_scan_tasks(
27832777
})
27842778
.collect::<Result<Vec<_>, _>>()?;
27852779

2786-
let partition_spec_cache: Vec<Option<Arc<iceberg::spec::PartitionSpec>>> = proto_scan
2780+
let partition_spec_cache: Vec<Option<Arc<iceberg::spec::PartitionSpec>>> = proto_common
27872781
.partition_spec_pool
27882782
.iter()
27892783
.map(|json| {
@@ -2793,7 +2787,7 @@ fn parse_file_scan_tasks(
27932787
})
27942788
.collect();
27952789

2796-
let name_mapping_cache: Vec<Option<Arc<iceberg::spec::NameMapping>>> = proto_scan
2790+
let name_mapping_cache: Vec<Option<Arc<iceberg::spec::NameMapping>>> = proto_common
27972791
.name_mapping_pool
27982792
.iter()
27992793
.map(|json| {
@@ -2803,7 +2797,7 @@ fn parse_file_scan_tasks(
28032797
})
28042798
.collect();
28052799

2806-
let delete_files_cache: Vec<Vec<iceberg::scan::FileScanTaskDeleteFile>> = proto_scan
2800+
let delete_files_cache: Vec<Vec<iceberg::scan::FileScanTaskDeleteFile>> = proto_common
28072801
.delete_files_pool
28082802
.iter()
28092803
.map(|list| {
@@ -2815,7 +2809,7 @@ fn parse_file_scan_tasks(
28152809
"EQUALITY_DELETES" => iceberg::spec::DataContentType::EqualityDeletes,
28162810
other => {
28172811
return Err(GeneralError(format!(
2818-
"Invalid delete content type '{}'. This indicates a bug in Scala serialization.",
2812+
"Invalid delete content type '{}'",
28192813
other
28202814
)))
28212815
}
@@ -2836,7 +2830,6 @@ fn parse_file_scan_tasks(
28362830
})
28372831
.collect::<Result<Vec<_>, _>>()?;
28382832

2839-
// Partition data pool is in protobuf messages
28402833
let results: Result<Vec<_>, _> = proto_tasks
28412834
.iter()
28422835
.map(|proto_task| {
@@ -2870,7 +2863,7 @@ fn parse_file_scan_tasks(
28702863
};
28712864

28722865
let bound_predicate = if let Some(idx) = proto_task.residual_idx {
2873-
proto_scan
2866+
proto_common
28742867
.residual_pool
28752868
.get(idx as usize)
28762869
.and_then(convert_spark_expr_to_predicate)
@@ -2890,24 +2883,22 @@ fn parse_file_scan_tasks(
28902883
};
28912884

28922885
let partition = if let Some(partition_data_idx) = proto_task.partition_data_idx {
2893-
// Get partition data from protobuf pool
2894-
let partition_data_proto = proto_scan
2886+
let partition_data_proto = proto_common
28952887
.partition_data_pool
28962888
.get(partition_data_idx as usize)
28972889
.ok_or_else(|| {
28982890
ExecutionError::GeneralError(format!(
28992891
"Invalid partition_data_idx: {} (pool size: {})",
29002892
partition_data_idx,
2901-
proto_scan.partition_data_pool.len()
2893+
proto_common.partition_data_pool.len()
29022894
))
29032895
})?;
29042896

2905-
// Convert protobuf PartitionData to iceberg Struct
29062897
match partition_data_to_struct(partition_data_proto) {
29072898
Ok(s) => Some(s),
29082899
Err(e) => {
29092900
return Err(ExecutionError::GeneralError(format!(
2910-
"Failed to deserialize partition data from protobuf: {}",
2901+
"Failed to deserialize partition data: {}",
29112902
e
29122903
)))
29132904
}
@@ -2926,14 +2917,14 @@ fn parse_file_scan_tasks(
29262917
.and_then(|idx| name_mapping_cache.get(idx as usize))
29272918
.and_then(|opt| opt.clone());
29282919

2929-
let project_field_ids = proto_scan
2920+
let project_field_ids = proto_common
29302921
.project_field_ids_pool
29312922
.get(proto_task.project_field_ids_idx as usize)
29322923
.ok_or_else(|| {
29332924
ExecutionError::GeneralError(format!(
29342925
"Invalid project_field_ids_idx: {} (pool size: {})",
29352926
proto_task.project_field_ids_idx,
2936-
proto_scan.project_field_ids_pool.len()
2927+
proto_common.project_field_ids_pool.len()
29372928
))
29382929
})?
29392930
.field_ids

native/proto/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ pub mod spark_partitioning {
3434

3535
// Include generated modules from .proto files.
3636
#[allow(missing_docs)]
37+
#[allow(clippy::large_enum_variant)]
3738
pub mod spark_operator {
3839
include!(concat!("generated", "/spark.spark_operator.rs"));
3940
}

native/proto/src/proto/operator.proto

Lines changed: 24 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -156,28 +156,34 @@ message PartitionData {
156156
repeated PartitionValue values = 1;
157157
}
158158

159-
message IcebergScan {
160-
// Schema to read
161-
repeated SparkStructField required_schema = 1;
162-
159+
// Common data shared by all partitions in split mode (sent once, captured in closure)
160+
message IcebergScanCommon {
163161
// Catalog-specific configuration for FileIO (credentials, S3/GCS config, etc.)
164-
map<string, string> catalog_properties = 2;
165-
166-
// Pre-planned file scan tasks grouped by Spark partition
167-
repeated IcebergFilePartition file_partitions = 3;
162+
map<string, string> catalog_properties = 1;
168163

169164
// Table metadata file path for FileIO initialization
170-
string metadata_location = 4;
165+
string metadata_location = 2;
166+
167+
// Schema to read
168+
repeated SparkStructField required_schema = 3;
171169

172-
// Deduplication pools - shared data referenced by index from tasks
173-
repeated string schema_pool = 5;
174-
repeated string partition_type_pool = 6;
175-
repeated string partition_spec_pool = 7;
176-
repeated string name_mapping_pool = 8;
177-
repeated ProjectFieldIdList project_field_ids_pool = 9;
178-
repeated PartitionData partition_data_pool = 10;
179-
repeated DeleteFileList delete_files_pool = 11;
180-
repeated spark.spark_expression.Expr residual_pool = 12;
170+
// Deduplication pools (must contain all entries for cross-partition deduplication)
171+
repeated string schema_pool = 4;
172+
repeated string partition_type_pool = 5;
173+
repeated string partition_spec_pool = 6;
174+
repeated string name_mapping_pool = 7;
175+
repeated ProjectFieldIdList project_field_ids_pool = 8;
176+
repeated PartitionData partition_data_pool = 9;
177+
repeated DeleteFileList delete_files_pool = 10;
178+
repeated spark.spark_expression.Expr residual_pool = 11;
179+
}
180+
181+
message IcebergScan {
182+
// Common data shared across partitions (pools, metadata, catalog props)
183+
IcebergScanCommon common = 1;
184+
185+
// Single partition's file scan tasks
186+
repeated IcebergFileScanTask file_scan_tasks = 2;
181187
}
182188

183189
// Helper message for deduplicating field ID lists
@@ -190,11 +196,6 @@ message DeleteFileList {
190196
repeated IcebergDeleteFile delete_files = 1;
191197
}
192198

193-
// Groups FileScanTasks for a single Spark partition
194-
message IcebergFilePartition {
195-
repeated IcebergFileScanTask file_scan_tasks = 1;
196-
}
197-
198199
// Iceberg FileScanTask containing data file, delete files, and residual filter
199200
message IcebergFileScanTask {
200201
// Data file path (e.g., s3://bucket/warehouse/db/table/data/00000-0-abc.parquet)

spark/src/main/scala/org/apache/comet/iceberg/IcebergReflection.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -734,7 +734,7 @@ case class CometIcebergNativeScanMetadata(
734734
table: Any,
735735
metadataLocation: String,
736736
nameMapping: Option[String],
737-
tasks: java.util.List[_],
737+
@transient tasks: java.util.List[_],
738738
scanSchema: Any,
739739
tableSchema: Any,
740740
globalFieldIdMapping: Map[String, Int],

0 commit comments

Comments
 (0)