feat: non-AQE DPP for native Parquet scans, broadcast exchange reuse for DPP subqueries #4011

Open
mbutrovich wants to merge 20 commits into apache:main from mbutrovich:dpp

@mbutrovich mbutrovich commented Apr 20, 2026

Which issue does this PR close?

Partially addresses #3510. Closes #4014. Related to #121. Closes #242.

Rationale for this change

CometNativeScanExec (the native DataFusion-based Parquet V1 scan) falls back to Spark for queries that use Dynamic Partition Pruning (DPP), even though the non-AQE DPP path works through the existing lazy partition serialization infrastructure from #3511 and #3349.

Spark has two DPP paths:

| Rule | When it runs | Subquery type | Comet compatible? |
| --- | --- | --- | --- |
| PlanDynamicPruningFilters | Before Comet rules | SubqueryBroadcastExec / SubqueryExec | Yes (this PR) |
| PlanAdaptiveDynamicPruningFilters | After Comet rules (inside AQE) | SubqueryAdaptiveBroadcastExec | No (tracked in #3510) |

The prior gate rejected both. This PR narrows it to only reject AQE DPP.

What changes are included in this PR?

DPP gate changes

  • Added isAqeDynamicPruningFilter to distinguish AQE from non-AQE DPP by checking for SubqueryAdaptiveBroadcastExec inside InSubqueryExec.
  • CometScanRule.transformV1Scan() and CometNativeScan.isSupported() now reject only AQE DPP.
  • Removed spark.comet.dppFallback.enabled config — redundant since isSupported() unconditionally rejects AQE DPP.
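
As a rough illustration of the narrowed gate, the sketch below uses a toy model rather than the real classes. The case classes only stand in for Spark's plan nodes and expressions, and the function bodies are assumptions about the shape of the check, not the code in this PR.

```scala
// Toy model only: these types stand in for Spark/Comet classes and are
// NOT the real SubqueryBroadcastExec, InSubqueryExec, etc.
object DppGateSketch {
  sealed trait SubqueryPlan
  case object SubqueryBroadcast extends SubqueryPlan         // stand-in for SubqueryBroadcastExec
  case object SubqueryPlain extends SubqueryPlan             // stand-in for SubqueryExec
  case object SubqueryAdaptiveBroadcast extends SubqueryPlan // stand-in for SubqueryAdaptiveBroadcastExec

  sealed trait Expr
  case class InSubquery(plan: SubqueryPlan) extends Expr     // stand-in for InSubqueryExec
  case class DynamicPruning(child: Expr) extends Expr        // stand-in for DynamicPruningExpression
  case class OtherFilter(description: String) extends Expr   // any non-DPP partition filter

  // AQE DPP is identified by the adaptive subquery type inside the IN-subquery.
  def isAqeDynamicPruningFilter(e: Expr): Boolean = e match {
    case DynamicPruning(InSubquery(SubqueryAdaptiveBroadcast)) => true
    case _                                                     => false
  }

  // The gate rejects a scan only when one of its partition filters is AQE DPP.
  def isSupported(partitionFilters: Seq[Expr]): Boolean =
    !partitionFilters.exists(isAqeDynamicPruningFilter)
}
```

In this model a scan whose filters contain `DynamicPruning(InSubquery(SubqueryBroadcast))` passes the gate, while one containing `DynamicPruning(InSubquery(SubqueryAdaptiveBroadcast))` is rejected.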

DPP subquery resolution

  • CometNativeScanExec.serializedPartitionData explicitly resolves DPP subqueries via updateResult() before accessing file partitions. This is needed because serializedPartitionData can be triggered from findAllPlanData on a BroadcastExchangeExec thread, outside the normal prepare() -> executeSubqueries() flow.
  • CometNativeScanExec.outputPartitioning uses perPartitionData.length instead of originalPlan.inputRDD.getNumPartitions to avoid triggering FileSourceScanExec codegen on unresolved DPP expressions.
  • CometNativeScan.convert() uses scan.selectedPartitions (static) instead of scan.getFilePartitions() for object store option extraction, since DPP subqueries aren't resolved at planning time.
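
The ordering constraint described above (resolve the DPP subquery first, then touch partitions) can be sketched with a toy model. `PruningSubquery` and `ScanSketch` are hypothetical names, and integer partition ids stand in for file partitions; only the names `updateResult` and `serializedPartitionData` are taken from the description.

```scala
// Toy model only: illustrates "resolve the subquery before reading partitions".
object LazyDppResolutionSketch {
  // Stand-in for a DPP subquery whose values are known only at execution time.
  final class PruningSubquery(compute: () => Set[Int]) {
    private var result: Option[Set[Int]] = None

    // Idempotent: safe to call from whichever thread reaches the scan first.
    def updateResult(): Unit = synchronized {
      if (result.isEmpty) result = Some(compute())
    }

    def values: Set[Int] = synchronized {
      result.getOrElse(throw new IllegalStateException("subquery not resolved"))
    }
  }

  final class ScanSketch(allPartitions: Seq[Int], subquery: PruningSubquery) {
    // Resolve explicitly here instead of relying on an earlier prepare() step,
    // because this value may be forced from outside the normal execution flow.
    lazy val serializedPartitionData: Seq[Int] = {
      subquery.updateResult()
      allPartitions.filter(subquery.values.contains)
    }

    // Partition count is derived from the pruned data, not from the original plan.
    def numOutputPartitions: Int = serializedPartitionData.length
  }
}
```

Because the resolution happens inside the lazy value, a caller that never went through a prepare step still sees pruned partitions rather than an unresolved-subquery error.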

Broadcast exchange reuse (CometSubqueryBroadcastExec)

ReuseExchangeAndSubquery runs after Comet rules. Without intervention, the join side has CometBroadcastExchangeExec and the DPP subquery has BroadcastExchangeExec — different types, no reuse, double broadcast.

CometSubqueryBroadcastExec replaces SubqueryBroadcastExec in DPP expressions. It wraps a CometBroadcastExchangeExec and decodes Arrow broadcast data to extract DPP key values. CometExecRule.convertSubqueryBroadcasts() handles the conversion during transformUp, stripping the CometNativeColumnarToRowExec transition that ApplyColumnarRulesAndInsertTransitions inserted for the original BroadcastExchangeExec. Both sides now have CometBroadcastExchangeExec, so ReuseExchangeAndSubquery matches them.
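
A toy sketch of why the type mismatch defeats reuse follows. Case-class equality stands in for Spark's comparison of canonicalized plans, and `convertSubqueryBroadcast` is a hypothetical simplification of the conversion step, not the actual `convertSubqueryBroadcasts()` implementation.

```scala
// Toy model only: equality of case classes stands in for canonical plan equality.
object ExchangeReuseSketch {
  sealed trait Exchange { def child: String }
  case class SparkBroadcast(child: String) extends Exchange // stand-in for BroadcastExchangeExec
  case class CometBroadcast(child: String) extends Exchange // stand-in for CometBroadcastExchangeExec

  // Number of broadcasts that actually run once equal exchanges are reused.
  def distinctBroadcasts(exchanges: Seq[Exchange]): Int = exchanges.distinct.size

  // Hypothetical conversion: give the DPP subquery side the Comet exchange type.
  def convertSubqueryBroadcast(e: Exchange): Exchange = e match {
    case SparkBroadcast(c) => CometBroadcast(c)
    case other             => other
  }
}
```

With a join side of `CometBroadcast("dim")` and a subquery side of `SparkBroadcast("dim")`, two broadcasts run; after converting the subquery side, both entries are equal and only one remains.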

Shuffle fallback

stageContainsDPPScan in CometShuffleExchangeExec checks for FileSourceScanExec with DPP. Scans converted to CometNativeScanExec don't match, so the shuffle proceeds natively. The COMET_DPP_FALLBACK_ENABLED config gate was removed; the check now always runs.
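
The matching behavior can be illustrated with a toy plan tree. The node types below are stand-ins, and the traversal ignores details such as stage boundaries that the real `stageContainsDPPScan` may handle; it is a sketch of the idea only.

```scala
// Toy model only: a minimal plan tree to show which scans trigger the fallback.
object ShuffleFallbackSketch {
  sealed trait Plan { def children: Seq[Plan] }
  case class FileSourceScan(hasDpp: Boolean) extends Plan { val children: Seq[Plan] = Nil } // Spark scan stand-in
  case class NativeScan(hasDpp: Boolean) extends Plan { val children: Seq[Plan] = Nil }     // CometNativeScanExec stand-in
  case class Node(children: Seq[Plan]) extends Plan                                         // any other operator

  // Only a Spark file scan carrying DPP counts; a native scan with DPP does not match.
  def stageContainsDPPScan(plan: Plan): Boolean = plan match {
    case FileSourceScan(hasDpp) => hasDpp
    case other                  => other.children.exists(stageContainsDPPScan)
  }
}
```

Here `Node(Seq(FileSourceScan(true)))` triggers the fallback, while `Node(Seq(NativeScan(true)))` does not, so the shuffle above a converted scan stays native.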

How are these changes tested?

  • "non-AQE DPP with BHJ works with CometNativeScanExec" — verifies correct results, CometNativeScanExec in plan, DynamicPruningExpression in partition filters
  • "non-AQE DPP with SMJ works with CometNativeScanExec" — same for sort-merge join path
  • "non-AQE DPP with BHJ reuses broadcast exchange" — verifies CometSubqueryBroadcastExec, ReusedExchangeExec, and exactly 1 CometBroadcastExchangeExec
  • Existing "DPP fallback" and "DPP fallback avoids inefficient Comet shuffle" tests updated for new message text and config removal
  • CometDppFallbackRepro3949Suite and CometShuffleFallbackStickinessSuite pass with config references removed
  • TPCDS golden plans regenerated
  • Spark SQL test diffs updated for 3.4.3, 3.5.8, and 4.0.1:
    • CometNativeScanExec added to collectDynamicPruningExpressions in DynamicPartitionPruningSuite and DynamicPartitionPruningHiveScanSuite
    • CometSubqueryBroadcastExec added to checkPartitionPruningPredicate, checkDistinctSubqueries, countSubqueryBroadcasts, countReusedSubqueryBroadcasts, and SPARK-38674 test
    • CometNativeScanExec / CometNativeColumnarToRowExec added to SubquerySuite SPARK-26893 test
    • RemoveRedundantProjectsSuite "join with ordering requirement" unignored (exchange reuse now works)

TPC-DS native acceleration improvement (Spark 3.5, v1.4)

78 queries improved. Previously DPP queries fell back entirely; now they run natively.

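| Query | Main | PR | Change |
| --- | --- | --- | --- |
| q1 | 36% | 96% | +60% |
| q4 | 31% | 95% | +64% |
| q5 | 32% | 94% | +62% |
| q7 | 45% | 97% | +52% |
| q8 | 66% | 97% | +31% |
| q10 | 38% | 70% | +32% |
| q11 | 32% | 95% | +63% |
| q12 | 44% | 85% | +41% |
| q13 | 44% | 97% | +53% |
| q14a | 35% | 94% | +59% |
| q14b | 38% | 93% | +55% |
| q15 | 42% | 96% | +54% |
| q17 | 38% | 95% | +57% |
| q18 | 44% | 97% | +53% |
| q20 | 44% | 85% | +41% |
| q21 | 37% | 96% | +59% |
| q22 | 41% | 96% | +55% |
| q23a | 60% | 95% | +35% |
| q23b | 68% | 95% | +27% |
| q25 | 38% | 95% | +57% |
| q26 | 45% | 97% | +52% |
| q27 | 44% | 97% | +53% |
| q29 | 40% | 95% | +55% |
| q30 | 39% | 96% | +57% |
| q31 | 31% | 95% | +64% |
| q32 | 36% | 95% | +59% |
| q33 | 49% | 97% | +48% |
| q34 | 48% | 97% | +49% |
| q35 | 38% | 70% | +32% |
| q36 | 44% | 88% | +44% |
| q37 | 50% | 96% | +46% |
| q38 | 53% | 95% | +42% |
| q39a | 40% | 96% | +56% |
| q39b | 40% | 96% | +56% |
| q40 | 88% | 97% | +9% |
| q45 | 43% | 80% | +37% |
| q46 | 44% | 97% | +53% |
| q47 | 37% | 80% | +43% |
| q48 | 45% | 96% | +51% |
| q49 | 37% | 78% | +41% |
| q50 | 42% | 96% | +54% |
| q51 | 48% | 82% | +34% |
| q53 | 42% | 84% | +42% |
| q54 | 53% | 93% | +40% |
| q56 | 51% | 97% | +46% |
| q57 | 37% | 80% | +43% |
| q58 | 53% | 93% | +40% |
| q6 | 67% | 94% | +27% |
| q60 | 51% | 97% | +46% |
| q61 | 43% | 94% | +51% |
| q63 | 42% | 84% | +42% |
| q64 | 94% | 99% | +5% |
| q65 | 35% | 96% | +61% |
| q66 | 40% | 97% | +57% |
| q67 | 40% | 83% | +43% |
| q68 | 44% | 97% | +53% |
| q69 | 39% | 72% | +33% |
| q70 | 33% | 68% | +35% |
| q71 | 42% | 94% | +52% |
| q72 | 54% | 98% | +44% |
| q73 | 48% | 97% | +49% |
| q74 | 32% | 95% | +63% |
| q75 | 66% | 96% | +30% |
| q77 | 33% | 89% | +56% |
| q78 | 84% | 93% | +9% |
| q79 | 45% | 97% | +52% |
| q80 | 92% | 97% | +5% |
| q81 | 39% | 96% | +57% |
| q82 | 50% | 96% | +46% |
| q83 | 59% | 97% | +38% |
| q85 | 46% | 98% | +52% |
| q86 | 42% | 85% | +43% |
| q87 | 42% | 86% | +44% |
| q89 | 42% | 84% | +42% |
| q91 | 48% | 97% | +49% |
| q92 | 36% | 95% | +59% |
| q97 | 60% | 94% | +34% |
| q98 | 51% | 89% | +38% |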

@mbutrovich mbutrovich changed the title from "feat: support non-AQE Dynamic Partition Pruning for CometNativeScanExec (Parquet V1)" to "feat: support non-AQE Dynamic Partition Pruning for CometNativeScan" Apr 21, 2026
@mbutrovich mbutrovich self-assigned this Apr 21, 2026
@mbutrovich mbutrovich added the labels enhancement (New feature or request), native_datafusion (Specific to native_datafusion scan type), and area:scan (Parquet scan / data reading) Apr 21, 2026
@mbutrovich mbutrovich changed the title from "feat: support non-AQE Dynamic Partition Pruning for CometNativeScan" to "feat: support non-AQE Dynamic Partition Pruning (DPP) for CometNativeScan" Apr 21, 2026
@mbutrovich mbutrovich changed the title from "feat: support non-AQE Dynamic Partition Pruning (DPP) for CometNativeScan" to "feat: non-AQE DPP for native Parquet scans, broadcast exchange reuse for DPP subqueries" Apr 21, 2026

@andygrove andygrove left a comment

This is amazing! Thank you @mbutrovich

🔥

Successfully merging this pull request may close these issues.

  • Exchange reuse broken when CometExecRule converts BroadcastExchangeExec after ReuseExchangeAndSubquery
  • Support SubqueryBroadcastExec in Comet