Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
35 commits
Select a commit. Hold Shift + click to select a range.
ee1ccf8
Continue #3295, experimental DPP support.
mbutrovich Jan 30, 2026
70ad4f5
Remove unnecessary steps in convert(), hoist reflection calls out of …
mbutrovich Jan 30, 2026
e820616
scalastyle
mbutrovich Jan 30, 2026
b08abe0
Fix scenario with multiple DPP expressions (i.e., join on two partiti…
mbutrovich Jan 30, 2026
9dc84f9
Docs.
mbutrovich Jan 30, 2026
b3c7c79
Throw an exception on reflection error in setInSubqueryResult, strong…
mbutrovich Jan 30, 2026
a90f06d
Comments cleanup. Throw exception if column not found in subquery.
mbutrovich Jan 30, 2026
09066fb
Avoid capturing perPartitionByLocation in closure when:
mbutrovich Jan 30, 2026
4ea7c78
Remove IcebergFilePartition from proto and clean up native code now t…
mbutrovich Jan 30, 2026
9eb95d7
Use sab.index and sab.buildKeys with exprId matching (handles renamed…
mbutrovich Jan 31, 2026
0fd297e
Simplify matching logic for SubqueryAdaptiveBroadcastExec expressions…
mbutrovich Jan 31, 2026
8f7b29d
add shim for Spark 4.0 SAB API change (indices instead of index), add…
mbutrovich Jan 31, 2026
da27f5f
add Spark 3.4 shim, whoops
mbutrovich Jan 31, 2026
4af89b2
Merge branch 'main' into iceberg-split-serialization-dpp
mbutrovich Jan 31, 2026
97f3693
Merge branch 'main' into iceberg-split-serialization-dpp
mbutrovich Feb 1, 2026
43a56e5
Comment.
mbutrovich Feb 1, 2026
bac6d66
Refactor down to just one CometExecRDD. Let's see how CI goes.
mbutrovich Feb 1, 2026
e73cca0
Fix spotless.
mbutrovich Feb 1, 2026
95c4e6d
Fix broadcast with DPP?
mbutrovich Feb 1, 2026
67c8bdb
Minor refactor for variable names, comments.
mbutrovich Feb 1, 2026
aa048a7
Fix scalastyle.
mbutrovich Feb 1, 2026
02a52a3
cache parsed commonData.
mbutrovich Feb 1, 2026
9cc541a
Address PR feedback.
mbutrovich Feb 2, 2026
bd70924
Merge branch 'main' into iceberg-split-serialization-dpp
mbutrovich Feb 4, 2026
b10ed64
add test
mbutrovich Feb 5, 2026
b54c87a
Address PR feedback.
mbutrovich Feb 5, 2026
51f8c42
Merge branch 'main' into iceberg-split-serialization-dpp
mbutrovich Feb 5, 2026
268cc00
Merge branch 'main' into iceberg-split-serialization-dpp
mbutrovich Feb 6, 2026
6d62175
upmerge main, resolve conflicts, format
mbutrovich Feb 6, 2026
82e4513
Merge branch 'main' into iceberg-split-serialization-dpp
mbutrovich Feb 6, 2026
f0720f5
Add docs to try to address PR feedback.
mbutrovich Feb 6, 2026
212ebef
Minor refactor for readability. spotless:apply
mbutrovich Feb 6, 2026
4e13899
Add LRU cache to IcebergPlanDataInjector.
mbutrovich Feb 7, 2026
fb10fb5
Merge branch 'main' into iceberg-split-serialization-dpp
mbutrovich Feb 7, 2026
96af2a2
Clean up imports.
mbutrovich Feb 7, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions .github/workflows/iceberg_spark_test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ jobs:
-Pquick=true -x javadoc

iceberg-spark-rust:
if: contains(github.event.pull_request.title, '[iceberg]')
if: contains(github.event.pull_request.title, '[iceberg-rust]')
strategy:
matrix:
os: [ubuntu-24.04]
Expand Down Expand Up @@ -203,7 +203,7 @@ jobs:
-Pquick=true -x javadoc

iceberg-spark-extensions-rust:
if: contains(github.event.pull_request.title, '[iceberg]')
if: contains(github.event.pull_request.title, '[iceberg-rust]')
strategy:
matrix:
os: [ubuntu-24.04]
Expand Down Expand Up @@ -242,7 +242,7 @@ jobs:
-Pquick=true -x javadoc

iceberg-spark-runtime-rust:
if: contains(github.event.pull_request.title, '[iceberg]')
if: contains(github.event.pull_request.title, '[iceberg-rust]')
strategy:
matrix:
os: [ubuntu-24.04]
Expand Down
14 changes: 12 additions & 2 deletions native/core/src/execution/operators/iceberg_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,18 @@ impl ExecutionPlan for IcebergScanExec {
partition: usize,
context: Arc<TaskContext>,
) -> DFResult<SendableRecordBatchStream> {
if partition < self.file_task_groups.len() {
let tasks = &self.file_task_groups[partition];
// In split mode (single task group), always use index 0 regardless of requested partition.
// This is because in Comet's per-partition execution model, each task builds its own plan
// with only its partition's data. The parent operator may request partition N, but this
// IcebergScanExec already contains the correct data for partition N in task_groups[0].
let effective_partition = if self.file_task_groups.len() == 1 {
0
} else {
partition
};

if effective_partition < self.file_task_groups.len() {
let tasks = &self.file_task_groups[effective_partition];
self.execute_with_tasks(tasks.clone(), partition, context)
} else {
Err(DataFusionError::Execution(format!(
Expand Down
65 changes: 31 additions & 34 deletions native/core/src/execution/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1132,26 +1132,27 @@ impl PhysicalPlanner {
))
}
OpStruct::IcebergScan(scan) => {
let required_schema: SchemaRef =
convert_spark_types_to_arrow_schema(scan.required_schema.as_slice());
// Extract common data and single partition's file tasks
// Per-partition injection happens in Scala before sending to native
let common = scan
.common
.as_ref()
.ok_or_else(|| GeneralError("IcebergScan missing common data".into()))?;
let partition = scan
.partition
.as_ref()
.ok_or_else(|| GeneralError("IcebergScan missing partition data".into()))?;

let catalog_properties: HashMap<String, String> = scan
let required_schema =
convert_spark_types_to_arrow_schema(common.required_schema.as_slice());
let catalog_properties: HashMap<String, String> = common
.catalog_properties
.iter()
.map(|(k, v)| (k.clone(), v.clone()))
.collect();
let metadata_location = common.metadata_location.clone();
let tasks = parse_file_scan_tasks_from_common(common, &partition.file_scan_tasks)?;

let metadata_location = scan.metadata_location.clone();

debug_assert!(
!scan.file_partitions.is_empty(),
"IcebergScan must have at least one file partition. This indicates a bug in Scala serialization."
);

let tasks = parse_file_scan_tasks(
scan,
&scan.file_partitions[self.partition as usize].file_scan_tasks,
)?;
let file_task_groups = vec![tasks];

let iceberg_scan = IcebergScanExec::new(
Expand Down Expand Up @@ -2743,15 +2744,14 @@ fn partition_data_to_struct(
/// Each task contains a residual predicate that is used for row-group level filtering
/// during Parquet scanning.
///
/// This function uses deduplication pools from the IcebergScan to avoid redundant parsing
/// of schemas, partition specs, partition types, name mappings, and other repeated data.
fn parse_file_scan_tasks(
proto_scan: &spark_operator::IcebergScan,
/// This function uses deduplication pools from the IcebergScanCommon to avoid redundant
/// parsing of schemas, partition specs, partition types, name mappings, and other repeated data.
fn parse_file_scan_tasks_from_common(
proto_common: &spark_operator::IcebergScanCommon,
proto_tasks: &[spark_operator::IcebergFileScanTask],
) -> Result<Vec<iceberg::scan::FileScanTask>, ExecutionError> {
// Build caches upfront: for 10K tasks with 1 schema, this parses the schema
// once instead of 10K times, eliminating redundant JSON deserialization
let schema_cache: Vec<Arc<iceberg::spec::Schema>> = proto_scan
// Build caches upfront from common data
let schema_cache: Vec<Arc<iceberg::spec::Schema>> = proto_common
.schema_pool
.iter()
.map(|json| {
Expand All @@ -2764,7 +2764,7 @@ fn parse_file_scan_tasks(
})
.collect::<Result<Vec<_>, _>>()?;

let partition_spec_cache: Vec<Option<Arc<iceberg::spec::PartitionSpec>>> = proto_scan
let partition_spec_cache: Vec<Option<Arc<iceberg::spec::PartitionSpec>>> = proto_common
.partition_spec_pool
.iter()
.map(|json| {
Expand All @@ -2774,7 +2774,7 @@ fn parse_file_scan_tasks(
})
.collect();

let name_mapping_cache: Vec<Option<Arc<iceberg::spec::NameMapping>>> = proto_scan
let name_mapping_cache: Vec<Option<Arc<iceberg::spec::NameMapping>>> = proto_common
.name_mapping_pool
.iter()
.map(|json| {
Expand All @@ -2784,7 +2784,7 @@ fn parse_file_scan_tasks(
})
.collect();

let delete_files_cache: Vec<Vec<iceberg::scan::FileScanTaskDeleteFile>> = proto_scan
let delete_files_cache: Vec<Vec<iceberg::scan::FileScanTaskDeleteFile>> = proto_common
.delete_files_pool
.iter()
.map(|list| {
Expand All @@ -2796,7 +2796,7 @@ fn parse_file_scan_tasks(
"EQUALITY_DELETES" => iceberg::spec::DataContentType::EqualityDeletes,
other => {
return Err(GeneralError(format!(
"Invalid delete content type '{}'. This indicates a bug in Scala serialization.",
"Invalid delete content type '{}'",
other
)))
}
Expand All @@ -2817,7 +2817,6 @@ fn parse_file_scan_tasks(
})
.collect::<Result<Vec<_>, _>>()?;

// Partition data pool is in protobuf messages
let results: Result<Vec<_>, _> = proto_tasks
.iter()
.map(|proto_task| {
Expand Down Expand Up @@ -2851,7 +2850,7 @@ fn parse_file_scan_tasks(
};

let bound_predicate = if let Some(idx) = proto_task.residual_idx {
proto_scan
proto_common
.residual_pool
.get(idx as usize)
.and_then(convert_spark_expr_to_predicate)
Expand All @@ -2871,24 +2870,22 @@ fn parse_file_scan_tasks(
};

let partition = if let Some(partition_data_idx) = proto_task.partition_data_idx {
// Get partition data from protobuf pool
let partition_data_proto = proto_scan
let partition_data_proto = proto_common
.partition_data_pool
.get(partition_data_idx as usize)
.ok_or_else(|| {
ExecutionError::GeneralError(format!(
"Invalid partition_data_idx: {} (pool size: {})",
partition_data_idx,
proto_scan.partition_data_pool.len()
proto_common.partition_data_pool.len()
))
})?;

// Convert protobuf PartitionData to iceberg Struct
match partition_data_to_struct(partition_data_proto) {
Ok(s) => Some(s),
Err(e) => {
return Err(ExecutionError::GeneralError(format!(
"Failed to deserialize partition data from protobuf: {}",
"Failed to deserialize partition data: {}",
e
)))
}
Expand All @@ -2907,14 +2904,14 @@ fn parse_file_scan_tasks(
.and_then(|idx| name_mapping_cache.get(idx as usize))
.and_then(|opt| opt.clone());

let project_field_ids = proto_scan
let project_field_ids = proto_common
.project_field_ids_pool
.get(proto_task.project_field_ids_idx as usize)
.ok_or_else(|| {
ExecutionError::GeneralError(format!(
"Invalid project_field_ids_idx: {} (pool size: {})",
proto_task.project_field_ids_idx,
proto_scan.project_field_ids_pool.len()
proto_common.project_field_ids_pool.len()
))
})?
.field_ids
Expand Down
1 change: 1 addition & 0 deletions native/proto/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ pub mod spark_partitioning {

// Include generated modules from .proto files.
#[allow(missing_docs)]
#[allow(clippy::large_enum_variant)]
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should probably file a follow-up issue to revisit the choice to suppress this warning.

pub mod spark_operator {
include!(concat!("generated", "/spark.spark_operator.rs"));
}
Expand Down
42 changes: 24 additions & 18 deletions native/proto/src/proto/operator.proto
Original file line number Diff line number Diff line change
Expand Up @@ -156,28 +156,34 @@ message PartitionData {
repeated PartitionValue values = 1;
}

message IcebergScan {
// Common data shared by all partitions in split mode (sent once, captured in closure)
message IcebergScanCommon {
// Catalog-specific configuration for FileIO (credentials, S3/GCS config, etc.)
map<string, string> catalog_properties = 1;

// Table metadata file path for FileIO initialization
string metadata_location = 2;

// Schema to read
repeated SparkStructField required_schema = 1;
repeated SparkStructField required_schema = 3;

// Catalog-specific configuration for FileIO (credentials, S3/GCS config, etc.)
map<string, string> catalog_properties = 2;
// Deduplication pools (must contain ALL entries for cross-partition deduplication)
repeated string schema_pool = 4;
repeated string partition_type_pool = 5;
repeated string partition_spec_pool = 6;
repeated string name_mapping_pool = 7;
repeated ProjectFieldIdList project_field_ids_pool = 8;
repeated PartitionData partition_data_pool = 9;
repeated DeleteFileList delete_files_pool = 10;
repeated spark.spark_expression.Expr residual_pool = 11;
}

// Pre-planned file scan tasks grouped by Spark partition
repeated IcebergFilePartition file_partitions = 3;
message IcebergScan {
// Common data shared across partitions (pools, metadata, catalog props)
IcebergScanCommon common = 1;

// Table metadata file path for FileIO initialization
string metadata_location = 4;

// Deduplication pools - shared data referenced by index from tasks
repeated string schema_pool = 5;
repeated string partition_type_pool = 6;
repeated string partition_spec_pool = 7;
repeated string name_mapping_pool = 8;
repeated ProjectFieldIdList project_field_ids_pool = 9;
repeated PartitionData partition_data_pool = 10;
repeated DeleteFileList delete_files_pool = 11;
repeated spark.spark_expression.Expr residual_pool = 12;
// Single partition's file scan tasks
IcebergFilePartition partition = 2;
}

// Helper message for deduplicating field ID lists
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -734,7 +734,7 @@ case class CometIcebergNativeScanMetadata(
table: Any,
metadataLocation: String,
nameMapping: Option[String],
tasks: java.util.List[_],
@transient tasks: java.util.List[_],
scanSchema: Any,
tableSchema: Any,
globalFieldIdMapping: Map[String, Int],
Expand Down
26 changes: 20 additions & 6 deletions spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,13 @@ import scala.jdk.CollectionConverters._
import org.apache.hadoop.conf.Configuration
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, GenericInternalRow, PlanExpression}
import org.apache.spark.sql.catalyst.expressions.{Attribute, DynamicPruningExpression, Expression, GenericInternalRow, PlanExpression}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.util.{sideBySide, ArrayBasedMapData, GenericArrayData, MetadataColumnHelper}
import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns.getExistenceDefaultValues
import org.apache.spark.sql.comet.{CometBatchScanExec, CometScanExec}
import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan}
import org.apache.spark.sql.execution.InSubqueryExec
import org.apache.spark.sql.execution.datasources.HadoopFsRelation
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
import org.apache.spark.sql.execution.datasources.v2.csv.CSVScan
Expand Down Expand Up @@ -327,10 +328,6 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] with Com
case _
if scanExec.scan.getClass.getName ==
"org.apache.iceberg.spark.source.SparkBatchQueryScan" =>
if (scanExec.runtimeFilters.exists(isDynamicPruningFilter)) {
return withInfo(scanExec, "Dynamic Partition Pruning is not supported")
}

val fallbackReasons = new ListBuffer[String]()

// Native Iceberg scan requires both configs to be enabled
Expand Down Expand Up @@ -621,10 +618,27 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] with Com
!hasUnsupportedDeletes
}

// Check that all DPP subqueries use InSubqueryExec which we know how to handle.
// Future Spark versions might introduce new subquery types we haven't tested.
val dppSubqueriesSupported = {
val unsupportedSubqueries = scanExec.runtimeFilters.collect {
case DynamicPruningExpression(e) if !e.isInstanceOf[InSubqueryExec] =>
e.getClass.getSimpleName
}
if (unsupportedSubqueries.nonEmpty) {
fallbackReasons +=
s"Unsupported DPP subquery types: ${unsupportedSubqueries.mkString(", ")}. " +
"CometIcebergNativeScanExec only supports InSubqueryExec for DPP"
false
} else {
true
}
}

if (schemaSupported && fileIOCompatible && formatVersionSupported && allParquetFiles &&
allSupportedFilesystems && partitionTypesSupported &&
complexTypePredicatesSupported && transformFunctionsSupported &&
deleteFileTypesSupported) {
deleteFileTypesSupported && dppSubqueriesSupported) {
CometBatchScanExec(
scanExec.clone().asInstanceOf[BatchScanExec],
runtimeFilters = scanExec.runtimeFilters,
Expand Down
Loading
Loading