Commit 4b5d992

fix: narrow partial aggregate tag lookup and regenerate TPC-DS golden files
findPartialAggInPlan used a deep tree traversal that could match a partial aggregate separated from the final by other aggregate stages. In Spark's distinct-aggregate rewrite, the partial for the non-distinct aggregates feeds into a PartialMerge stage rather than directly into the final, so tagging it as unsafe is incorrect and hijacks the natural "Unsupported aggregation mode PartialMerge" fallback reason. The lookup now walks only through exchanges and AQE stages.

Also regenerate the TPC-DS plan-stability golden files for Spark 3.4, 3.5, and 4.0 to reflect the branch's new safe-mixed-execution behavior, where a final aggregate converts to Comet when all of its aggregate functions have compatible intermediate buffer formats.
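The new tagging rule only fires for single-mode Final aggregates. A minimal, Spark-free sketch of that gate (AggExpr and isSingleModeFinal are illustrative names, not Comet's or Spark's actual API):

```scala
// Hypothetical stand-in for Spark's AggregateExpression; only the mode matters here.
case class AggExpr(mode: String)

// Mirrors the commit's gate: the distinct list of modes must be exactly Seq(Final).
// Multi-mode nodes (Final mixed with PartialMerge, as produced by the
// distinct-aggregate rewrite) fail the check, as do empty expression lists.
def isSingleModeFinal(exprs: Seq[AggExpr]): Boolean =
  exprs.map(_.mode).distinct == Seq("Final")

assert(isSingleModeFinal(Seq(AggExpr("Final"), AggExpr("Final"))))
assert(!isSingleModeFinal(Seq(AggExpr("Final"), AggExpr("PartialMerge"))))
assert(!isSingleModeFinal(Seq.empty))
```

Comparing against `Seq(Final)` rather than checking `forall(_.mode == Final)` is what keeps empty and multi-mode lists out, which is the point of the fix.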
1 parent 9826403 commit 4b5d992

47 files changed

Lines changed: 599 additions & 609 deletions


.claude/scheduled_tasks.lock

Lines changed: 1 addition & 0 deletions
@@ -0,0 +1 @@
+{"sessionId":"2f25d6dc-76b7-436d-84f7-1d1c08cd0bd4","pid":70968,"acquiredAt":1776748793310}

spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala

Lines changed: 34 additions & 27 deletions
@@ -627,20 +627,25 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] {
    */
   private def tagUnsafePartialAggregates(plan: SparkPlan): Unit = {
     plan.foreach {
-      case agg: BaseAggregateExec if agg.aggregateExpressions.exists(_.mode == Final) =>
-        if (!QueryPlanSerde.allAggsSupportMixedExecution(agg.aggregateExpressions)) {
-          if (!canAggregateBeConverted(agg, Final)) {
-            findPartialAggInPlan(agg.child).foreach { partial =>
-              // Only tag if the Partial would otherwise have been converted. If the Partial
-              // itself cannot be converted (e.g. the aggregate function is incompatible for the
-              // input type), there is no buffer-format mismatch to guard against, and tagging
-              // would mask the natural, more specific fallback reason.
-              if (canAggregateBeConverted(partial, Partial)) {
-                partial.setTagValue(
-                  CometExecRule.COMET_UNSAFE_PARTIAL,
-                  "Partial aggregate disabled: corresponding final aggregate " +
-                    "cannot be converted to Comet and intermediate buffer formats are incompatible")
-              }
+      case agg: BaseAggregateExec =>
+        // Only consider single-mode Final aggregates. Multi-mode Finals come from Spark's
+        // distinct-aggregate rewrite, where the Comet partial (if any) feeds into a Spark
+        // PartialMerge rather than directly into a Final, which is a different code path
+        // than the Comet-Partial → Spark-Final crash scenario from issue #1389.
+        val modes = agg.aggregateExpressions.map(_.mode).distinct
+        if (modes == Seq(Final) &&
+          !QueryPlanSerde.allAggsSupportMixedExecution(agg.aggregateExpressions) &&
+          !canAggregateBeConverted(agg, Final)) {
+          findPartialAggInPlan(agg.child).foreach { partial =>
+            // Only tag if the Partial would otherwise have been converted. If the Partial
+            // itself cannot be converted (e.g. the aggregate function is incompatible for the
+            // input type), there is no buffer-format mismatch to guard against, and tagging
+            // would mask the natural, more specific fallback reason.
+            if (canAggregateBeConverted(partial, Partial)) {
+              partial.setTagValue(
+                CometExecRule.COMET_UNSAFE_PARTIAL,
+                "Partial aggregate disabled: corresponding final aggregate " +
+                  "cannot be converted to Comet and intermediate buffer formats are incompatible")
+            }
           }
         }
       }
@@ -717,20 +722,22 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] {
   }

   /**
-   * Search the child subtree for the first Partial-mode aggregate, traversing through exchanges
-   * and AQE stages. Requires `aggregateExpressions.nonEmpty` so that intermediate distinct stages
-   * (group-by-only aggregates with empty aggregateExpressions, where `.forall` vacuously matches)
-   * are not mistaken for the partial we want to tag.
+   * Look for a Partial-mode aggregate that feeds directly into the given plan (the child of a
+   * Final). Walks through exchanges and AQE stages only, stopping at anything else including
+   * other aggregate stages. This avoids tagging unrelated Partials found deeper in the plan (e.g.
+   * the non-distinct Partial in a distinct-aggregate rewrite, which is separated from the Final
+   * by intermediate PartialMerge stages). Requires `aggregateExpressions.nonEmpty` so that
+   * group-by-only dedup stages are not mistaken for the partial we want to tag.
    */
-  private def findPartialAggInPlan(plan: SparkPlan): Option[BaseAggregateExec] = {
-    plan.collectFirst {
-      case agg: BaseAggregateExec
-          if agg.aggregateExpressions.nonEmpty &&
-            agg.aggregateExpressions.forall(e => e.mode == Partial) =>
-        Some(agg)
-      case a: AQEShuffleReadExec => findPartialAggInPlan(a.child)
-      case s: ShuffleQueryStageExec => findPartialAggInPlan(s.plan)
-    }.flatten
+  private def findPartialAggInPlan(plan: SparkPlan): Option[BaseAggregateExec] = plan match {
+    case agg: BaseAggregateExec
+        if agg.aggregateExpressions.nonEmpty &&
+          agg.aggregateExpressions.forall(e => e.mode == Partial) =>
+      Some(agg)
+    case a: AQEShuffleReadExec => findPartialAggInPlan(a.child)
+    case s: ShuffleQueryStageExec => findPartialAggInPlan(s.plan)
+    case e: ShuffleExchangeExec => findPartialAggInPlan(e.child)
+    case _ => None
   }

 }
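The narrowed lookup can be modeled without Spark. In this sketch, PlanNode, Exchange, and findPartialAgg are illustrative stand-ins (not Comet's or Spark's actual classes): the walk descends only through exchange-like wrappers and stops at any other node, so a Partial hidden behind a PartialMerge stage is deliberately not found.

```scala
// Spark-free model of the narrowed lookup; all names here are illustrative.
sealed trait PlanNode
case class Aggregate(mode: String, child: PlanNode) extends PlanNode
case class Exchange(child: PlanNode) extends PlanNode // stands in for shuffle/AQE wrappers
case object Scan extends PlanNode

// Descend only through exchange-like wrappers; stop at anything else,
// including other aggregate stages such as PartialMerge.
def findPartialAgg(plan: PlanNode): Option[Aggregate] = plan match {
  case a: Aggregate if a.mode == "Partial" => Some(a)
  case Exchange(child) => findPartialAgg(child)
  case _ => None
}

// A Partial directly under an exchange is found...
val direct = Exchange(Aggregate("Partial", Scan))
assert(findPartialAgg(direct).nonEmpty)

// ...but in the distinct-aggregate rewrite shape, a PartialMerge stage sits
// between the Final's child and the Partial, so the walk correctly stops.
val rewrite = Exchange(Aggregate("PartialMerge", Exchange(Aggregate("Partial", Scan))))
assert(findPartialAgg(rewrite).isEmpty)
```

The old collectFirst-based version would have returned the inner Partial in the second case; returning None instead lets the aggregate fall back with its natural "Unsupported aggregation mode PartialMerge" reason.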

spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q10.native_datafusion/extended.txt

Lines changed: 4 additions & 4 deletions
@@ -1,6 +1,6 @@
-TakeOrderedAndProject
-+- HashAggregate
-+- CometNativeColumnarToRow
+CometNativeColumnarToRow
++- CometTakeOrderedAndProject
++- CometHashAggregate
 +- CometColumnarExchange
 +- HashAggregate
 +- Project
@@ -64,4 +64,4 @@ TakeOrderedAndProject
 +- CometFilter
 +- CometNativeScan parquet spark_catalog.default.customer_demographics

-Comet accelerated 21 out of 54 eligible operators (38%). Final plan contains 11 transitions between Spark and Comet.
+Comet accelerated 23 out of 54 eligible operators (42%). Final plan contains 11 transitions between Spark and Comet.

spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q10.native_iceberg_compat/extended.txt

Lines changed: 4 additions & 4 deletions
@@ -1,6 +1,6 @@
-TakeOrderedAndProject
-+- HashAggregate
-+- CometNativeColumnarToRow
+CometNativeColumnarToRow
++- CometTakeOrderedAndProject
++- CometHashAggregate
 +- CometColumnarExchange
 +- HashAggregate
 +- Project
@@ -60,4 +60,4 @@ TakeOrderedAndProject
 +- CometFilter
 +- CometScan [native_iceberg_compat] parquet spark_catalog.default.customer_demographics

-Comet accelerated 35 out of 54 eligible operators (64%). Final plan contains 7 transitions between Spark and Comet.
+Comet accelerated 37 out of 54 eligible operators (68%). Final plan contains 7 transitions between Spark and Comet.

spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q23a.native_datafusion/extended.txt

Lines changed: 11 additions & 11 deletions
@@ -20,10 +20,10 @@ CometNativeColumnarToRow
 : : : : +- CometFilter
 : : : : +- CometNativeScan parquet spark_catalog.default.date_dim
 : : : +- BroadcastExchange
-: : : +- Project
-: : : +- Filter
-: : : +- HashAggregate
-: : : +- CometNativeColumnarToRow
+: : : +- CometNativeColumnarToRow
+: : : +- CometProject
+: : : +- CometFilter
+: : : +- CometHashAggregate
 : : : +- CometColumnarExchange
 : : : +- HashAggregate
 : : : +- Project
@@ -52,8 +52,8 @@ CometNativeColumnarToRow
 : : +- CometProject
 : : +- CometFilter
 : : : +- Subquery
-: : : +- HashAggregate
-: : : +- CometNativeColumnarToRow
+: : : +- CometNativeColumnarToRow
+: : : +- CometHashAggregate
 : : : +- CometColumnarExchange
 : : : +- HashAggregate
 : : : +- HashAggregate
@@ -109,10 +109,10 @@ CometNativeColumnarToRow
 : : : +- Scan parquet spark_catalog.default.web_sales [COMET: Native DataFusion scan does not support subqueries/dynamic pruning]
 : : : +- ReusedSubquery
 : : +- BroadcastExchange
-: : +- Project
-: : +- Filter
-: : +- HashAggregate
-: : +- CometNativeColumnarToRow
+: : +- CometNativeColumnarToRow
+: : +- CometProject
+: : +- CometFilter
+: : +- CometHashAggregate
 : : +- CometColumnarExchange
 : : +- HashAggregate
 : : +- Project
@@ -157,4 +157,4 @@ CometNativeColumnarToRow
 +- CometFilter
 +- CometNativeScan parquet spark_catalog.default.date_dim

-Comet accelerated 83 out of 138 eligible operators (60%). Final plan contains 20 transitions between Spark and Comet.
+Comet accelerated 90 out of 138 eligible operators (65%). Final plan contains 20 transitions between Spark and Comet.

spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q23b.native_datafusion/extended.txt

Lines changed: 11 additions & 11 deletions
@@ -23,10 +23,10 @@ CometNativeColumnarToRow
 : : : : : +- CometFilter
 : : : : : +- CometNativeScan parquet spark_catalog.default.date_dim
 : : : : +- BroadcastExchange
-: : : : +- Project
-: : : : +- Filter
-: : : : +- HashAggregate
-: : : : +- CometNativeColumnarToRow
+: : : : +- CometNativeColumnarToRow
+: : : : +- CometProject
+: : : : +- CometFilter
+: : : : +- CometHashAggregate
 : : : : +- CometColumnarExchange
 : : : : +- HashAggregate
 : : : : +- Project
@@ -55,8 +55,8 @@ CometNativeColumnarToRow
 : : : +- CometProject
 : : : +- CometFilter
 : : : : +- Subquery
-: : : : +- HashAggregate
-: : : : +- CometNativeColumnarToRow
+: : : : +- CometNativeColumnarToRow
+: : : : +- CometHashAggregate
 : : : : +- CometColumnarExchange
 : : : : +- HashAggregate
 : : : : +- HashAggregate
@@ -139,10 +139,10 @@ CometNativeColumnarToRow
 : : : : +- Scan parquet spark_catalog.default.web_sales [COMET: Native DataFusion scan does not support subqueries/dynamic pruning]
 : : : : +- ReusedSubquery
 : : : +- BroadcastExchange
-: : : +- Project
-: : : +- Filter
-: : : +- HashAggregate
-: : : +- CometNativeColumnarToRow
+: : : +- CometNativeColumnarToRow
+: : : +- CometProject
+: : : +- CometFilter
+: : : +- CometHashAggregate
 : : : +- CometColumnarExchange
 : : : +- HashAggregate
 : : : +- Project
@@ -209,4 +209,4 @@ CometNativeColumnarToRow
 +- CometFilter
 +- CometNativeScan parquet spark_catalog.default.date_dim

-Comet accelerated 131 out of 190 eligible operators (68%). Final plan contains 20 transitions between Spark and Comet.
+Comet accelerated 138 out of 190 eligible operators (72%). Final plan contains 20 transitions between Spark and Comet.
Lines changed: 42 additions & 44 deletions
@@ -1,47 +1,45 @@
 CometNativeColumnarToRow
 +- CometSort
-+- CometColumnarExchange
-+- Project
-+- BroadcastHashJoin
-:- Filter
-: +- HashAggregate
-: +- CometNativeColumnarToRow
-: +- CometColumnarExchange
-: +- HashAggregate
-: +- Project
-: +- BroadcastHashJoin
-: :- Project
-: : +- BroadcastHashJoin
-: : :- Project
-: : : +- BroadcastHashJoin
-: : : :- Filter
-: : : : +- ColumnarToRow
-: : : : +- Scan parquet spark_catalog.default.store_sales [COMET: Native DataFusion scan does not support subqueries/dynamic pruning]
-: : : : +- SubqueryBroadcast
-: : : : +- BroadcastExchange
-: : : : +- CometNativeColumnarToRow
-: : : : +- CometProject
-: : : : +- CometFilter
-: : : : +- CometNativeScan parquet spark_catalog.default.date_dim
-: : : +- BroadcastExchange
-: : : +- CometNativeColumnarToRow
-: : : +- CometProject
-: : : +- CometFilter
-: : : +- CometNativeScan parquet spark_catalog.default.date_dim
-: : +- BroadcastExchange
-: : +- CometNativeColumnarToRow
-: : +- CometProject
-: : +- CometFilter
-: : +- CometNativeScan parquet spark_catalog.default.store
-: +- BroadcastExchange
-: +- CometNativeColumnarToRow
-: +- CometProject
-: +- CometFilter
-: +- CometNativeScan parquet spark_catalog.default.household_demographics
-+- BroadcastExchange
-+- CometNativeColumnarToRow
-+- CometProject
-+- CometFilter
-+- CometNativeScan parquet spark_catalog.default.customer
++- CometExchange
++- CometProject
++- CometBroadcastHashJoin
+:- CometFilter
+: +- CometHashAggregate
+: +- CometColumnarExchange
+: +- HashAggregate
+: +- Project
+: +- BroadcastHashJoin
+: :- Project
+: : +- BroadcastHashJoin
+: : :- Project
+: : : +- BroadcastHashJoin
+: : : :- Filter
+: : : : +- ColumnarToRow
+: : : : +- Scan parquet spark_catalog.default.store_sales [COMET: Native DataFusion scan does not support subqueries/dynamic pruning]
+: : : : +- SubqueryBroadcast
+: : : : +- BroadcastExchange
+: : : : +- CometNativeColumnarToRow
+: : : : +- CometProject
+: : : : +- CometFilter
+: : : : +- CometNativeScan parquet spark_catalog.default.date_dim
+: : : +- BroadcastExchange
+: : : +- CometNativeColumnarToRow
+: : : +- CometProject
+: : : +- CometFilter
+: : : +- CometNativeScan parquet spark_catalog.default.date_dim
+: : +- BroadcastExchange
+: : +- CometNativeColumnarToRow
+: : +- CometProject
+: : +- CometFilter
+: : +- CometNativeScan parquet spark_catalog.default.store
+: +- BroadcastExchange
+: +- CometNativeColumnarToRow
+: +- CometProject
+: +- CometFilter
+: +- CometNativeScan parquet spark_catalog.default.household_demographics
++- CometBroadcastExchange
++- CometProject
++- CometFilter
++- CometNativeScan parquet spark_catalog.default.customer

-Comet accelerated 18 out of 37 eligible operators (48%). Final plan contains 8 transitions between Spark and Comet.
+Comet accelerated 23 out of 37 eligible operators (62%). Final plan contains 6 transitions between Spark and Comet.

spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q54.native_datafusion/extended.txt

Lines changed: 4 additions & 4 deletions
@@ -1,6 +1,6 @@
-TakeOrderedAndProject
-+- HashAggregate
-+- CometNativeColumnarToRow
+CometNativeColumnarToRow
++- CometTakeOrderedAndProject
++- CometHashAggregate
 +- CometColumnarExchange
 +- HashAggregate
 +- HashAggregate
@@ -113,4 +113,4 @@ TakeOrderedAndProject
 : +- CometNativeScan parquet spark_catalog.default.date_dim
 +- CometNativeScan parquet spark_catalog.default.date_dim

-Comet accelerated 51 out of 96 eligible operators (53%). Final plan contains 18 transitions between Spark and Comet.
+Comet accelerated 53 out of 96 eligible operators (55%). Final plan contains 18 transitions between Spark and Comet.

spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q6.native_datafusion/extended.txt

Lines changed: 5 additions & 5 deletions
@@ -1,7 +1,7 @@
-TakeOrderedAndProject
-+- Filter
-+- HashAggregate
-+- CometNativeColumnarToRow
+CometNativeColumnarToRow
++- CometTakeOrderedAndProject
++- CometFilter
++- CometHashAggregate
 +- CometColumnarExchange
 +- HashAggregate
 +- Project
@@ -65,4 +65,4 @@ TakeOrderedAndProject
 +- CometFilter
 +- CometNativeScan parquet spark_catalog.default.item

-Comet accelerated 39 out of 58 eligible operators (67%). Final plan contains 8 transitions between Spark and Comet.
+Comet accelerated 42 out of 58 eligible operators (72%). Final plan contains 8 transitions between Spark and Comet.
