fix: broadcast exchange bypasses AQE partition coalescing #4163
mbutrovich merged 5 commits into apache:main
Conversation
Use a REBALANCE hint so AQE actually inserts AQEShuffleReadExec, then verify that the read exec's `numPartitions` driver metric reflects coalescing. The metric is only set when AQEShuffleReadExec.executeColumnar runs, so a broadcast that bypasses the wrapper leaves it at 0 and fails the test.
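A minimal sketch of that test shape, assuming a suite with a `spark` session and a table `t1`; the actual test in this PR may differ:

```scala
import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, AQEShuffleReadExec}

// REBALANCE forces a shuffle that AQE will wrap in AQEShuffleReadExec.
val df = spark.sql("SELECT /*+ REBALANCE */ * FROM t1")
df.collect()

// After execution, AdaptiveSparkPlanExec holds the final adaptive plan.
val finalPlan = df.queryExecution.executedPlan match {
  case a: AdaptiveSparkPlanExec => a.executedPlan
  case p => p
}
val reads = finalPlan.collect { case r: AQEShuffleReadExec => r }
assert(reads.nonEmpty, "expected AQE to insert AQEShuffleReadExec")
// numPartitions is a driver metric that is only set when the wrapper's
// executeColumnar/shuffleRDD path actually runs; a broadcast that
// bypasses the wrapper would leave it at 0.
assert(reads.forall(_.metrics("numPartitions").value > 0))
```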
mbutrovich
left a comment
Thanks @andygrove! I think this fix lines up with how vanilla Spark handles this: BroadcastExchangeExec.relationFuture just calls child.executeCollectIterator() on whatever the direct child is (sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala:182), so when AQE wraps the shuffle stage in AQEShuffleReadExec, coalescing comes for free via AQEShuffleReadExec.shuffleRDD building from partitionSpecs.toArray (AQEShuffleReadExec.scala:250-258). The previous Comet code was reaching past that wrapper to grab s.plan, which dropped the partition specs. Executing through the wrapper restores parity with Spark.
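For reference, a simplified paraphrase of the Spark behavior cited above (not the exact source):

```scala
// Inside BroadcastExchangeExec.relationFuture (simplified): the direct
// child is executed as-is, so an AQEShuffleReadExec wrapper participates
// naturally and its partitionSpecs (coalescing, skew splits) take effect.
val (numRows, input) = child.executeCollectIterator()
```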
One thought worth considering: the new getByteArrayRdd(plan: SparkPlan) helper is byte-for-byte identical to CometExec.getByteArrayRdd (spark/src/main/scala/org/apache/spark/sql/comet/operators.scala:376) apart from the parameter type. Now that the helper accepts any SparkPlan, every branch in the match block in CometBroadcastExchangeExec.scala:126-151 reduces to the same call. Could the whole match collapse to roughly:
```scala
val countsAndBytes = if (child.supportsColumnar) {
  getByteArrayRdd(child).collect()
} else {
  // existing RowToColumnar fallback for the LocalTableScan case
  ...
}
```

That mirrors Spark's "dispatch by execution, not by type peeking" pattern and would make it harder to reintroduce the same class of bug the next time AQE adds a wrapper (skew read, local read, or some future variant). Happy to defer if there is a reason the per-case branches need to stay.
Minor: the test comment explaining the numPartitions metric mechanism is great. Might be worth noting in the PR description that the fix incidentally also restores correct behavior for skew splits and local reads, since those paths also flow through AQEShuffleReadExec. Reviewers will appreciate knowing the blast radius is broader than just coalescing.
…eeking
Replace the per-wrapper-type match block with a single getByteArrayRdd(child) call. This executes through AQE wrappers (coalescing, skew splits, local reads) instead of pattern-matching past them, preventing future regressions when AQE adds new wrappers.
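A hypothetical sketch of the collapsed dispatch after this commit; the row-to-columnar fallback operator named here is an assumption, not necessarily Comet's actual helper:

```scala
// Execute whatever the direct child is; AQE wrappers (AQEShuffleReadExec
// for coalescing, skew splits, and local reads) are honored automatically.
val countsAndBytes: Array[(Long, Array[Byte])] =
  if (child.supportsColumnar) {
    getByteArrayRdd(child).collect()
  } else {
    // Row-producing children (e.g. LocalTableScanExec) get converted to
    // columnar first; RowToColumnarExec is assumed here for illustration.
    getByteArrayRdd(RowToColumnarExec(child)).collect()
  }
```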
mbutrovich
left a comment
This looks right to me now, thanks @andygrove!
Which issue does this PR close?
Closes #.
Rationale for this change
When a shuffle feeds into a broadcast exchange, AQE may coalesce the shuffle partitions (e.g. from 200 to 7). CometBroadcastExchangeExec was extracting the underlying plan from AQEShuffleReadExec and executing it directly, bypassing the coalescing. This caused the broadcast collect to run against all original shuffle partitions instead of the coalesced count, inflating task overhead and shuffle data.
Execute through the AQEShuffleReadExec to respect partition coalescing.
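A before/after sketch of the fix described here (simplified; the real match block had more branches):

```scala
// Before (simplified): type-peeking past the AQE wrapper dropped its
// partitionSpecs, so the collect ran over all original shuffle partitions.
child match {
  case s: AQEShuffleReadExec => getByteArrayRdd(s.plan).collect() // bug
  // ... other branches
}

// After: execute through the wrapper so coalescing is applied.
getByteArrayRdd(child).collect()
```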
What changes are included in this PR?
CometBroadcastExchangeExec now executes through AQEShuffleReadExec instead of extracting its child plan, and the per-wrapper-type match block is collapsed into a single getByteArrayRdd(child) call.
How are these changes tested?
A new test uses a REBALANCE hint so AQE inserts AQEShuffleReadExec, then asserts that the read exec's numPartitions driver metric reflects coalescing. Since the metric is only set when AQEShuffleReadExec.executeColumnar runs, the previous bypass would leave it at 0 and fail the test.