Commit 3a9f6fa

Adopt PR apache#3349's per-partition scan logic to CometNativeScan. Add DPP.

1 parent 28e13dd

15 files changed

Lines changed: 747 additions & 283 deletions

common/src/main/scala/org/apache/comet/CometConf.scala

Lines changed: 0 additions & 7 deletions
@@ -530,13 +530,6 @@ object CometConf extends ShimCometConf {
       .doubleConf
       .createWithDefault(1.0)
 
-  val COMET_DPP_FALLBACK_ENABLED: ConfigEntry[Boolean] =
-    conf("spark.comet.dppFallback.enabled")
-      .category(CATEGORY_EXEC)
-      .doc("Whether to fall back to Spark for queries that use DPP.")
-      .booleanConf
-      .createWithDefault(true)
-
   val COMET_DEBUG_ENABLED: ConfigEntry[Boolean] =
     conf("spark.comet.debug.enabled")
       .category(CATEGORY_EXEC)

native/core/src/execution/planner.rs

Lines changed: 26 additions & 20 deletions
@@ -965,20 +965,29 @@ impl PhysicalPlanner {
                 ))
             }
             OpStruct::NativeScan(scan) => {
-                let data_schema = convert_spark_types_to_arrow_schema(scan.data_schema.as_slice());
+                let common = scan
+                    .common
+                    .as_ref()
+                    .ok_or_else(|| GeneralError("NativeScan missing common data".to_string()))?;
+
+                let data_schema =
+                    convert_spark_types_to_arrow_schema(common.data_schema.as_slice());
                 let required_schema: SchemaRef =
-                    convert_spark_types_to_arrow_schema(scan.required_schema.as_slice());
+                    convert_spark_types_to_arrow_schema(common.required_schema.as_slice());
                 let partition_schema: SchemaRef =
-                    convert_spark_types_to_arrow_schema(scan.partition_schema.as_slice());
-                let projection_vector: Vec<usize> = scan
+                    convert_spark_types_to_arrow_schema(common.partition_schema.as_slice());
+                let projection_vector: Vec<usize> = common
                     .projection_vector
                     .iter()
                     .map(|offset| *offset as usize)
                     .collect();
 
-                // Check if this partition has any files (bucketed scan with bucket pruning may have empty partitions)
-                let partition_files = &scan.file_partitions[self.partition as usize];
+                // Get this partition's files (injected at execution time)
+                let partition_files = scan.file_partition.as_ref().ok_or_else(|| {
+                    GeneralError("NativeScan missing file_partition data".to_string())
+                })?;
 
+                // Check if this partition has any files (bucketed scan with bucket pruning may have empty partitions)
                 if partition_files.partitioned_file.is_empty() {
                     let empty_exec = Arc::new(EmptyExec::new(required_schema));
                     return Ok((
@@ -988,19 +997,19 @@
                 }
 
                 // Convert the Spark expressions to Physical expressions
-                let data_filters: Result<Vec<Arc<dyn PhysicalExpr>>, ExecutionError> = scan
+                let data_filters: Result<Vec<Arc<dyn PhysicalExpr>>, ExecutionError> = common
                     .data_filters
                     .iter()
                     .map(|expr| self.create_expr(expr, Arc::clone(&required_schema)))
                     .collect();
 
-                let default_values: Option<HashMap<usize, ScalarValue>> = if !scan
+                let default_values: Option<HashMap<usize, ScalarValue>> = if !common
                     .default_values
                     .is_empty()
                 {
                     // We have default values. Extract the two lists (same length) of values and
                     // indexes in the schema, and then create a HashMap to use in the SchemaMapper.
-                    let default_values: Result<Vec<ScalarValue>, DataFusionError> = scan
+                    let default_values: Result<Vec<ScalarValue>, DataFusionError> = common
                         .default_values
                         .iter()
                         .map(|expr| {
@@ -1015,7 +1024,7 @@
                         })
                         .collect();
                     let default_values = default_values?;
-                    let default_values_indexes: Vec<usize> = scan
+                    let default_values_indexes: Vec<usize> = common
                         .default_values_indexes
                         .iter()
                         .map(|offset| *offset as usize)
@@ -1037,7 +1046,7 @@
                     .map(|f| f.file_path.clone())
                     .expect("partition should have files after empty check");
 
-                let object_store_options: HashMap<String, String> = scan
+                let object_store_options: HashMap<String, String> = common
                     .object_store_options
                     .iter()
                     .map(|(k, v)| (k.clone(), v.clone()))
@@ -1048,10 +1057,7 @@
                     &object_store_options,
                 )?;
 
-                // Comet serializes all partitions' PartitionedFiles, but we only want to read this
-                // Spark partition's PartitionedFiles
-                let files =
-                    self.get_partitioned_files(&scan.file_partitions[self.partition as usize])?;
+                let files = self.get_partitioned_files(partition_files)?;
                 let file_groups: Vec<Vec<PartitionedFile>> = vec![files];
                 let partition_fields: Vec<Field> = partition_schema
                     .fields()
@@ -1060,7 +1066,7 @@
                         Field::new(field.name(), field.data_type().clone(), field.is_nullable())
                     })
                     .collect_vec();
-                let scan = init_datasource_exec(
+                let datasource_exec = init_datasource_exec(
                     required_schema,
                     Some(data_schema),
                     Some(partition_schema),
@@ -1070,14 +1076,14 @@
                     Some(projection_vector),
                     Some(data_filters?),
                     default_values,
-                    scan.session_timezone.as_str(),
-                    scan.case_sensitive,
+                    common.session_timezone.as_str(),
+                    common.case_sensitive,
                     self.session_ctx(),
-                    scan.encryption_enabled,
+                    common.encryption_enabled,
                 )?;
                 Ok((
                     vec![],
-                    Arc::new(SparkPlan::new(spark_plan.plan_id, scan, vec![])),
+                    Arc::new(SparkPlan::new(spark_plan.plan_id, datasource_exec, vec![])),
                 ))
             }
             OpStruct::CsvScan(scan) => {
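
Note: the empty-partition branch above matters in practice because Spark's bucket pruning can leave scan tasks with no files. A minimal Scala sketch of a query shape that produces such partitions (table and column names are illustrative, not from this commit):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("bucket-pruning-sketch").getOrCreate()
import spark.implicits._

// Write a table with 8 buckets on `id`.
spark.range(1000).write.bucketBy(8, "id").sortBy("id").saveAsTable("bucketed_t")

// A point filter on the bucket column lets Spark prune down to a single bucket,
// so 7 of the 8 scan tasks receive an empty file list; the planner above
// short-circuits those tasks to an EmptyExec instead of building a full scan.
spark.table("bucketed_t").filter($"id" === 42).count()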

native/proto/src/proto/operator.proto

Lines changed: 20 additions & 9 deletions
@@ -85,7 +85,8 @@ message Scan {
   bool arrow_ffi_safe = 3;
 }
 
-message NativeScan {
+// Common data shared across all partitions for NativeScan (sent once via commonBytes)
+message NativeScanCommon {
   repeated spark.spark_expression.DataType fields = 1;
   // The source of the scan (e.g. file scan, broadcast exchange, shuffle, etc). This
   // is purely for informational purposes when viewing native query plans in
@@ -95,20 +96,30 @@
   repeated SparkStructField data_schema = 4;
   repeated SparkStructField partition_schema = 5;
   repeated spark.spark_expression.Expr data_filters = 6;
-  repeated SparkFilePartition file_partitions = 7;
-  repeated int64 projection_vector = 8;
-  string session_timezone = 9;
-  repeated spark.spark_expression.Expr default_values = 10;
-  repeated int64 default_values_indexes = 11;
-  bool case_sensitive = 12;
+  repeated int64 projection_vector = 7;
+  string session_timezone = 8;
+  repeated spark.spark_expression.Expr default_values = 9;
+  repeated int64 default_values_indexes = 10;
+  bool case_sensitive = 11;
   // Options for configuring object stores such as AWS S3, GCS, etc. The key-value pairs are taken
   // from Hadoop configuration for compatibility with Hadoop FileSystem implementations of object
   // stores.
   // The configuration values have hadoop. or spark.hadoop. prefix trimmed. For instance, the
   // configuration value "spark.hadoop.fs.s3a.access.key" will be stored as "fs.s3a.access.key" in
   // the map.
-  map<string, string> object_store_options = 13;
-  bool encryption_enabled = 14;
+  map<string, string> object_store_options = 12;
+  bool encryption_enabled = 13;
+
+  // Unique identifier for this scan, used to match planning data at execution time
+  string scan_id = 14;
+}
+
+message NativeScan {
+  // Common data shared across partitions
+  NativeScanCommon common = 1;
+
+  // This partition's files only (injected at execution time by NativePlanDataInjector)
+  SparkFilePartition file_partition = 2;
 }
 
 message CsvScan {
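
Note: splitting NativeScan this way means the shared planning payload (schemas, filters, options) is serialized once, while each task is paired with only its own SparkFilePartition. A rough Scala sketch of that per-task assembly, assuming the standard protobuf-java builders generated from operator.proto (the real injection lives in NativePlanDataInjector, whose API is not shown in this diff):

import org.apache.comet.serde.OperatorOuterClass

// Hypothetical helper: rebuild a complete NativeScan for one task by pairing
// the scan's shared planning data with just that task's files.
def assembleForPartition(
    common: OperatorOuterClass.NativeScanCommon,
    filePartition: OperatorOuterClass.SparkFilePartition): OperatorOuterClass.NativeScan =
  OperatorOuterClass.NativeScan
    .newBuilder()
    .setCommon(common) // identical for every task; matched up via scan_id
    .setFilePartition(filePartition) // only this task's PartitionedFiles
    .build()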

spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala

Lines changed: 2 additions & 3 deletions
@@ -180,9 +180,8 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] {
   // spotless:on
   private def transform(plan: SparkPlan): SparkPlan = {
     def convertNode(op: SparkPlan): SparkPlan = op match {
-      // Fully native scan for V1
-      case scan: CometScanExec if scan.scanImpl == CometConf.SCAN_NATIVE_DATAFUSION =>
-        convertToComet(scan, CometNativeScan).getOrElse(scan)
+      // CometNativeScanExec is created directly by CometScanRule and handles its own execution
+      // No conversion needed here - it passes through unchanged
 
       // Fully native Iceberg scan for V2 (iceberg-rust path)
       // Only handle scans with native metadata; SupportsComet scans fall through to isCometScan

spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala

Lines changed: 21 additions & 6 deletions
@@ -32,7 +32,7 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, DynamicPruningExpre
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.util.{sideBySide, ArrayBasedMapData, GenericArrayData, MetadataColumnHelper}
 import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns.getExistenceDefaultValues
-import org.apache.spark.sql.comet.{CometBatchScanExec, CometScanExec}
+import org.apache.spark.sql.comet.{CometBatchScanExec, CometNativeScanExec, CometScanExec}
 import org.apache.spark.sql.execution.{FileSourceScanExec, InSubqueryExec, SparkPlan, SubqueryAdaptiveBroadcastExec}
 import org.apache.spark.sql.execution.datasources.HadoopFsRelation
 import org.apache.spark.sql.execution.datasources.parquet.ParquetUtils
@@ -50,6 +50,7 @@ import org.apache.comet.iceberg.{CometIcebergNativeScanMetadata, IcebergReflecti
 import org.apache.comet.objectstore.NativeConfig
 import org.apache.comet.parquet.{CometParquetScan, Native, SupportsComet}
 import org.apache.comet.parquet.CometParquetUtils.{encryptionEnabled, isEncryptionConfigSupported}
+import org.apache.comet.serde.OperatorOuterClass
 import org.apache.comet.serde.operator.CometNativeScan
 import org.apache.comet.shims.{CometTypeShim, ShimFileFormat, ShimSubqueryBroadcast}
 
@@ -140,10 +141,9 @@ case class CometScanRule(session: SparkSession)
 
   private def transformV1Scan(scanExec: FileSourceScanExec): SparkPlan = {
 
-    if (COMET_DPP_FALLBACK_ENABLED.get() &&
-      scanExec.partitionFilters.exists(isDynamicPruningFilter)) {
-      return withInfo(scanExec, "Dynamic Partition Pruning is not supported")
-    }
+    // Check for DPP - only some scan implementations support it
+    val dppFilters = scanExec.partitionFilters.filter(isDynamicPruningFilter)
+    val hasDPP = dppFilters.nonEmpty
 
     scanExec.relation match {
       case r: HadoopFsRelation =>
@@ -170,13 +170,23 @@ case class CometScanRule(session: SparkSession)
         COMET_NATIVE_SCAN_IMPL.get() match {
           case SCAN_AUTO =>
             // TODO add support for native_datafusion in the future
+            if (hasDPP) {
+              return withInfo(scanExec, "Dynamic Partition Pruning is not supported")
+            }
             nativeIcebergCompatScan(session, scanExec, r, hadoopConf)
               .getOrElse(scanExec)
           case SCAN_NATIVE_DATAFUSION =>
+            // native_datafusion supports DPP
             nativeDataFusionScan(session, scanExec, r, hadoopConf).getOrElse(scanExec)
          case SCAN_NATIVE_ICEBERG_COMPAT =>
+            if (hasDPP) {
+              return withInfo(scanExec, "Dynamic Partition Pruning is not supported")
+            }
             nativeIcebergCompatScan(session, scanExec, r, hadoopConf).getOrElse(scanExec)
           case SCAN_NATIVE_COMET =>
+            if (hasDPP) {
+              return withInfo(scanExec, "Dynamic Partition Pruning is not supported")
+            }
             nativeCometScan(session, scanExec, r, hadoopConf).getOrElse(scanExec)
         }
 
@@ -213,7 +223,12 @@ case class CometScanRule(session: SparkSession)
       if (!isSchemaSupported(scanExec, SCAN_NATIVE_DATAFUSION, r)) {
        return None
      }
-      Some(CometScanExec(scanExec, session, SCAN_NATIVE_DATAFUSION))
+
+      // Create placeholder NativeScan operator
+      val builder = OperatorOuterClass.Operator.newBuilder()
+      CometNativeScan.convert(scanExec, builder).map { nativeOp =>
+        CometNativeScanExec(nativeOp, scanExec)
+      }
     }
 
   private def nativeIcebergCompatScan(
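
Note: with the blanket fallback removed, SCAN_NATIVE_DATAFUSION is the one scan implementation that accepts plans whose partitionFilters contain a DynamicPruningExpression. A Scala sketch of a query shape that exercises DPP; the config key and value here are assumptions based on COMET_NATIVE_SCAN_IMPL and SCAN_NATIVE_DATAFUSION, and the table names are illustrative:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("dpp-sketch").getOrCreate()
import spark.implicits._

// Assumed key/value for COMET_NATIVE_SCAN_IMPL / SCAN_NATIVE_DATAFUSION.
spark.conf.set("spark.comet.scan.impl", "native_datafusion")

// Fact table partitioned on the join key.
spark.range(100000)
  .selectExpr("id", "id % 10 AS region_id")
  .write.partitionBy("region_id").mode("overwrite").parquet("/tmp/sales")

val sales = spark.read.parquet("/tmp/sales")
val regions = spark.range(10).selectExpr("id AS region_id", "id < 3 AS active")

// Joining on the partition column with a filtered (broadcastable) dimension
// side makes Spark plan a DPP subquery; under this commit the rule no longer
// falls back to Spark for that scan.
sales.join(regions.filter($"active"), "region_id").count()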
