Skip to content

Commit 3ed80e9

Browse files
authored
feat: AQE DPP for native Parquet scans with broadcast reuse (#4112)
1 parent 050320d commit 3ed80e9

27 files changed

Lines changed: 2342 additions & 186 deletions

dev/diffs/3.4.3.diff

Lines changed: 3 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -417,7 +417,7 @@ index daef11ae4d6..9f3cc9181f2 100644
417417
assert(exchanges.size == 2)
418418
}
419419
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
420-
index f33432ddb6f..914afa6b01d 100644
420+
index f33432ddb6f..b375e285dde 100644
421421
--- a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
422422
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
423423
@@ -22,6 +22,7 @@ import org.scalatest.GivenWhenThen
@@ -477,35 +477,15 @@ index f33432ddb6f..914afa6b01d 100644
477477

478478
assert(countSubqueryBroadcasts == 1)
479479
assert(countReusedSubqueryBroadcasts == 1)
480-
@@ -1215,7 +1231,8 @@ abstract class DynamicPartitionPruningSuiteBase
481-
}
482-
483-
test("SPARK-32509: Unused Dynamic Pruning filter shouldn't affect " +
484-
- "canonicalization and exchange reuse") {
485-
+ "canonicalization and exchange reuse",
486-
+ IgnoreComet("TODO: https://github.com/apache/datafusion-comet/issues/4045")) {
487-
withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") {
488-
withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
489-
val df = sql(
490-
@@ -1423,7 +1440,8 @@ abstract class DynamicPartitionPruningSuiteBase
491-
}
492-
}
493-
494-
- test("SPARK-34637: DPP side broadcast query stage is created firstly") {
495-
+ test("SPARK-34637: DPP side broadcast query stage is created firstly",
496-
+ IgnoreComet("TODO: https://github.com/apache/datafusion-comet/issues/4045")) {
497-
withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") {
498-
val df = sql(
499-
""" WITH v as (
500-
@@ -1577,6 +1595,7 @@ abstract class DynamicPartitionPruningSuiteBase
480+
@@ -1577,6 +1593,7 @@ abstract class DynamicPartitionPruningSuiteBase
501481

502482
val subqueryBroadcastExecs = collectWithSubqueries(df.queryExecution.executedPlan) {
503483
case s: SubqueryBroadcastExec => s
504484
+ case s: CometSubqueryBroadcastExec => s
505485
}
506486
assert(subqueryBroadcastExecs.size === 1)
507487
subqueryBroadcastExecs.foreach { subqueryBroadcastExec =>
508-
@@ -1729,6 +1748,10 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat
488+
@@ -1729,6 +1746,10 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat
509489
case s: BatchScanExec =>
510490
// we use f1 col for v2 tables due to schema pruning
511491
s.output.exists(_.exists(_.argString(maxFields = 100).contains("f1")))

dev/diffs/3.5.8.diff

Lines changed: 3 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -398,7 +398,7 @@ index c4fb4fa943c..a04b23870a8 100644
398398
assert(exchanges.size == 2)
399399
}
400400
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
401-
index f33432ddb6f..914afa6b01d 100644
401+
index f33432ddb6f..b375e285dde 100644
402402
--- a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
403403
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
404404
@@ -22,6 +22,7 @@ import org.scalatest.GivenWhenThen
@@ -458,35 +458,15 @@ index f33432ddb6f..914afa6b01d 100644
458458

459459
assert(countSubqueryBroadcasts == 1)
460460
assert(countReusedSubqueryBroadcasts == 1)
461-
@@ -1215,7 +1231,8 @@ abstract class DynamicPartitionPruningSuiteBase
462-
}
463-
464-
test("SPARK-32509: Unused Dynamic Pruning filter shouldn't affect " +
465-
- "canonicalization and exchange reuse") {
466-
+ "canonicalization and exchange reuse",
467-
+ IgnoreComet("TODO: https://github.com/apache/datafusion-comet/issues/4045")) {
468-
withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") {
469-
withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
470-
val df = sql(
471-
@@ -1423,7 +1440,8 @@ abstract class DynamicPartitionPruningSuiteBase
472-
}
473-
}
474-
475-
- test("SPARK-34637: DPP side broadcast query stage is created firstly") {
476-
+ test("SPARK-34637: DPP side broadcast query stage is created firstly",
477-
+ IgnoreComet("TODO: https://github.com/apache/datafusion-comet/issues/4045")) {
478-
withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") {
479-
val df = sql(
480-
""" WITH v as (
481-
@@ -1577,6 +1595,7 @@ abstract class DynamicPartitionPruningSuiteBase
461+
@@ -1577,6 +1593,7 @@ abstract class DynamicPartitionPruningSuiteBase
482462

483463
val subqueryBroadcastExecs = collectWithSubqueries(df.queryExecution.executedPlan) {
484464
case s: SubqueryBroadcastExec => s
485465
+ case s: CometSubqueryBroadcastExec => s
486466
}
487467
assert(subqueryBroadcastExecs.size === 1)
488468
subqueryBroadcastExecs.foreach { subqueryBroadcastExec =>
489-
@@ -1729,6 +1748,10 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat
469+
@@ -1729,6 +1746,10 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat
490470
case s: BatchScanExec =>
491471
// we use f1 col for v2 tables due to schema pruning
492472
s.output.exists(_.exists(_.argString(maxFields = 100).contains("f1")))

dev/diffs/4.0.2.diff

Lines changed: 3 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -535,7 +535,7 @@ index 81713c777bc..b5f92ed9742 100644
535535
assert(exchanges.size == 2)
536536
}
537537
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
538-
index 2c24cc7d570..8c214e7d05c 100644
538+
index 2c24cc7d570..12d897866da 100644
539539
--- a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
540540
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
541541
@@ -22,6 +22,7 @@ import org.scalatest.GivenWhenThen
@@ -595,35 +595,15 @@ index 2c24cc7d570..8c214e7d05c 100644
595595

596596
assert(countSubqueryBroadcasts == 1)
597597
assert(countReusedSubqueryBroadcasts == 1)
598-
@@ -1215,7 +1231,8 @@ abstract class DynamicPartitionPruningSuiteBase
599-
}
600-
601-
test("SPARK-32509: Unused Dynamic Pruning filter shouldn't affect " +
602-
- "canonicalization and exchange reuse") {
603-
+ "canonicalization and exchange reuse",
604-
+ IgnoreComet("TODO: https://github.com/apache/datafusion-comet/issues/4045")) {
605-
withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") {
606-
withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
607-
val df = sql(
608-
@@ -1424,7 +1441,8 @@ abstract class DynamicPartitionPruningSuiteBase
609-
}
610-
}
611-
612-
- test("SPARK-34637: DPP side broadcast query stage is created firstly") {
613-
+ test("SPARK-34637: DPP side broadcast query stage is created firstly",
614-
+ IgnoreComet("TODO: https://github.com/apache/datafusion-comet/issues/4045")) {
615-
withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") {
616-
val df = sql(
617-
""" WITH v as (
618-
@@ -1578,6 +1596,7 @@ abstract class DynamicPartitionPruningSuiteBase
598+
@@ -1578,6 +1594,7 @@ abstract class DynamicPartitionPruningSuiteBase
619599

620600
val subqueryBroadcastExecs = collectWithSubqueries(df.queryExecution.executedPlan) {
621601
case s: SubqueryBroadcastExec => s
622602
+ case s: CometSubqueryBroadcastExec => s
623603
}
624604
assert(subqueryBroadcastExecs.size === 1)
625605
subqueryBroadcastExecs.foreach { subqueryBroadcastExec =>
626-
@@ -1730,6 +1749,10 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat
606+
@@ -1730,6 +1747,10 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat
627607
case s: BatchScanExec =>
628608
// we use f1 col for v2 tables due to schema pruning
629609
s.output.exists(_.exists(_.argString(maxFields = 100).contains("f1")))

spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala

Lines changed: 26 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ import org.apache.spark.sql.execution._
3232
import org.apache.spark.sql.internal.SQLConf
3333

3434
import org.apache.comet.CometConf._
35-
import org.apache.comet.rules.{CometExecRule, CometReuseSubquery, CometScanRule, EliminateRedundantTransitions}
35+
import org.apache.comet.rules.{CometExecRule, CometPlanAdaptiveDynamicPruningFilters, CometReuseSubquery, CometScanRule, CometSpark34AqeDppFallbackRule, EliminateRedundantTransitions}
3636
import org.apache.comet.shims.ShimCometSparkSessionExtensions
3737

3838
/**
@@ -43,34 +43,44 @@ import org.apache.comet.shims.ShimCometSparkSessionExtensions
4343
*
4444
* Non-AQE (QueryExecution.preparations):
4545
* {{{
46-
* 1. PlanDynamicPruningFilters -- Spark creates DPP filters
46+
* 1. PlanDynamicPruningFilters -- Spark creates non-AQE DPP (SubqueryBroadcastExec)
4747
* 2. PlanSubqueries -- Spark creates SubqueryExec for scalar subqueries
4848
* 3. EnsureRequirements -- Spark inserts shuffles/sorts
4949
* 4. ApplyColumnarRulesAndInsertTransitions:
50-
* a. preColumnarTransitions: CometScanRule, CometExecRule (replace Spark -> Comet nodes)
50+
* a. preColumnarTransitions: CometScanRule, CometExecRule
51+
* - CometExecRule.convertSubqueryBroadcasts converts SubqueryBroadcastExec to
52+
* CometSubqueryBroadcastExec for exchange reuse with Comet broadcasts
5153
* b. insertTransitions: ColumnarToRow/RowToColumnar added
5254
* c. postColumnarTransitions: EliminateRedundantTransitions
5355
* 5. ReuseExchangeAndSubquery -- Spark deduplicates subqueries (sees Comet nodes)
5456
* }}}
5557
*
56-
* AQE (AdaptiveSparkPlanExec):
58+
* AQE (AdaptiveSparkPlanExec, Spark 3.5+):
5759
* {{{
5860
* Initial plan:
59-
* queryStagePreparationRules: CometScanRule, CometExecRule (replace Spark -> Comet nodes)
61+
* PlanAdaptiveSubqueries: creates SubqueryAdaptiveBroadcastExec (SAB) for AQE DPP
62+
* queryStagePreparationRules: CometScanRule, CometExecRule
63+
* - CometExecRule.convertSubqueryBroadcasts wraps SABs in
64+
* CometSubqueryAdaptiveBroadcastExec to prevent Spark's
65+
* PlanAdaptiveDynamicPruningFilters from replacing DPP with Literal.TrueLiteral
6066
*
6167
* Per stage (optimizeQueryStage + postStageCreationRules):
62-
* 1. queryStageOptimizerRules: ReuseAdaptiveSubquery, CometReuseSubquery
68+
* 1. queryStageOptimizerRules:
69+
* a. PlanAdaptiveDynamicPruningFilters (Spark) -- skips wrapped SABs
70+
* b. ReuseAdaptiveSubquery (Spark)
71+
* c. CometPlanAdaptiveDynamicPruningFilters -- converts wrapped SABs to
72+
* CometSubqueryBroadcastExec with BroadcastQueryStageExec for broadcast reuse
73+
* d. CometReuseSubquery -- deduplicates converted subqueries
6374
* 2. postStageCreationRules -> ApplyColumnarRulesAndInsertTransitions:
6475
* a. preColumnarTransitions: CometScanRule, CometExecRule (no-ops, already converted)
6576
* b. insertTransitions
6677
* c. postColumnarTransitions: EliminateRedundantTransitions
6778
* }}}
6879
*
69-
* CometReuseSubquery is needed in AQE because Spark's ReuseAdaptiveSubquery may run before
70-
* Comet's node replacements in the initial plan construction, and the replacements can disrupt
71-
* subquery reuse that was already applied. The shim-based registration
72-
* (injectQueryStageOptimizerRuleShim) handles API availability: Spark 3.5+ has
73-
* injectQueryStageOptimizerRule, Spark 3.4 does not (no-op).
80+
* On Spark 3.4, injectQueryStageOptimizerRule is unavailable. CometExecRule does not wrap SABs,
81+
* and CometPlanAdaptiveDynamicPruningFilters/CometReuseSubquery are not registered. AQE DPP scans
82+
* fall back to Spark so that Spark's PlanAdaptiveDynamicPruningFilters handles them natively
83+
* (with DPP).
7484
*/
7585
class CometSparkSessionExtensions
7686
extends (SparkSessionExtensions => Unit)
@@ -79,8 +89,13 @@ class CometSparkSessionExtensions
7989
override def apply(extensions: SparkSessionExtensions): Unit = {
8090
extensions.injectColumnar { session => CometScanColumnar(session) }
8191
extensions.injectColumnar { session => CometExecColumnar(session) }
92+
// Pre-3.5 only: tag AQE DPP regions so the conversion rules below leave them Spark-native.
93+
// Registered before CometScanRule/CometExecRule so tags are in place when conversion runs.
94+
// No-op on Spark 3.5+; see CometSpark34AqeDppFallbackRule's class docstring.
95+
injectPreSpark35QueryStagePrepRuleShim(extensions, CometSpark34AqeDppFallbackRule)
8296
extensions.injectQueryStagePrepRule { session => CometScanRule(session) }
8397
extensions.injectQueryStagePrepRule { session => CometExecRule(session) }
98+
injectQueryStageOptimizerRuleShim(extensions, CometPlanAdaptiveDynamicPruningFilters)
8499
injectQueryStageOptimizerRuleShim(extensions, CometReuseSubquery)
85100
}
86101

spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala

Lines changed: 49 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,13 @@ object CometExecRule {
9696
val SKIP_COMET_SHUFFLE_TAG: org.apache.spark.sql.catalyst.trees.TreeNodeTag[Unit] =
9797
org.apache.spark.sql.catalyst.trees.TreeNodeTag[Unit]("comet.skipCometShuffle")
9898

99+
/**
100+
* Tag set on a `BroadcastExchangeExec` that should be left as a plain Spark broadcast rather
101+
* than converted to `CometBroadcastExchangeExec`. Written by [[CometSpark34AqeDppFallbackRule]]
102+
* on Spark < 3.5. See that rule's class docstring for the rationale.
103+
*/
104+
val SKIP_COMET_BROADCAST_TAG: org.apache.spark.sql.catalyst.trees.TreeNodeTag[Unit] =
105+
org.apache.spark.sql.catalyst.trees.TreeNodeTag[Unit]("comet.skipCometBroadcast")
99106
}
100107

101108
/**
@@ -297,6 +304,11 @@ case class CometExecRule(session: SparkSession)
297304
// broadcast exchange is forced to be enabled by Comet config.
298305
case plan if plan.children.exists(_.isInstanceOf[BroadcastExchangeExec]) =>
299306
val newChildren = plan.children.map {
307+
// Tagged by CometSpark34AqeDppFallbackRule on Spark < 3.5 to keep the build-side
308+
// broadcast Spark-native so Spark's PlanAdaptiveDynamicPruningFilters can match it.
309+
case b: BroadcastExchangeExec
310+
if b.getTagValue(CometExecRule.SKIP_COMET_BROADCAST_TAG).isDefined =>
311+
b
300312
case b: BroadcastExchangeExec if b.children.forall(_.isInstanceOf[CometNativeExec]) =>
301313
convertToComet(b, CometBroadcastExchangeExec).getOrElse(b)
302314
case other => other
@@ -381,18 +393,22 @@ case class CometExecRule(session: SparkSession)
381393
}
382394

383395
/**
384-
* Replace SubqueryBroadcastExec with CometSubqueryBroadcastExec in a node's expressions.
396+
* Replace SubqueryBroadcastExec with CometSubqueryBroadcastExec in a node's expressions
397+
* (non-AQE DPP), and wrap SubqueryAdaptiveBroadcastExec in CometSubqueryAdaptiveBroadcastExec
398+
* (AQE DPP) to protect it from Spark's PlanAdaptiveDynamicPruningFilters.
385399
*
386-
* When CometExecRule converts BroadcastExchangeExec to CometBroadcastExchangeExec on the join
387-
* side, the DPP subquery still references the original BroadcastExchangeExec.
400+
* Non-AQE DPP: When CometExecRule converts BroadcastExchangeExec to CometBroadcastExchangeExec
401+
* on the join side, the DPP subquery still references the original BroadcastExchangeExec.
388402
* ReuseExchangeAndSubquery (which runs after Comet rules) can't match them because they have
389403
* different types. By replacing SubqueryBroadcastExec with CometSubqueryBroadcastExec (which
390404
* wraps a CometBroadcastExchangeExec), both sides have the same exchange type and reuse works.
391405
*
392-
* The BroadcastExchangeExec in the subquery has a CometNativeColumnarToRowExec child (inserted
393-
* by ApplyColumnarRulesAndInsertTransitions because BroadcastExchangeExec expects row input).
394-
* We strip this transition and create CometBroadcastExchangeExec with the underlying Comet plan
395-
* directly.
406+
* AQE DPP: Spark's PlanAdaptiveDynamicPruningFilters (queryStageOptimizerRule) pattern-matches
407+
* on SubqueryAdaptiveBroadcastExec. When it can't find BroadcastHashJoinExec (Comet replaced
408+
* it), it replaces DPP with Literal.TrueLiteral. We wrap SABs in
409+
* CometSubqueryAdaptiveBroadcastExec to prevent this. CometPlanAdaptiveDynamicPruningFilters (a
410+
* later queryStageOptimizerRule) unwraps and converts them with access to the materialized
411+
* BroadcastQueryStageExec.
396412
*/
397413
private def convertSubqueryBroadcasts(plan: SparkPlan): SparkPlan = {
398414
plan.transformExpressionsUp { case inSub: InSubqueryExec =>
@@ -422,6 +438,32 @@ case class CometExecRule(session: SparkSession)
422438
}
423439
case _ => inSub
424440
}
441+
case sab: SubqueryAdaptiveBroadcastExec if isSpark35Plus =>
442+
// Wrap SABs to prevent Spark's PlanAdaptiveDynamicPruningFilters from
443+
// converting them to Literal.TrueLiteral. Spark's rule pattern-matches for
444+
// BroadcastHashJoinExec, which Comet replaced with CometBroadcastHashJoinExec.
445+
// Without wrapping, DPP is disabled for both Comet native scans and non-Comet
446+
// scans (e.g., V2 BatchScan). CometPlanAdaptiveDynamicPruningFilters
447+
// (queryStageOptimizerRule, 3.5+) unwraps and converts them later.
448+
//
449+
// On Spark 3.4, injectQueryStageOptimizerRule is unavailable. The isSpark35Plus
450+
// guard leaves SABs unwrapped; CometSpark34AqeDppFallbackRule then tags the
451+
// matching BHJ's build broadcast so Spark's rule can match it natively.
452+
assert(
453+
sab.buildKeys.nonEmpty,
454+
s"SubqueryAdaptiveBroadcastExec '${sab.name}' has empty buildKeys")
455+
logInfo(
456+
s"Wrapping SubqueryAdaptiveBroadcastExec '${sab.name}' in " +
457+
"CometSubqueryAdaptiveBroadcastExec to preserve AQE DPP")
458+
val indices = getSubqueryBroadcastIndices(sab)
459+
val wrapped = CometSubqueryAdaptiveBroadcastExec(
460+
sab.name,
461+
indices,
462+
sab.onlyInBroadcast,
463+
sab.buildPlan,
464+
sab.buildKeys,
465+
sab.child)
466+
inSub.withNewPlan(wrapped)
425467
case _ => inSub
426468
}
427469
}

0 commit comments

Comments
 (0)