Skip to content

Commit 336aadd

Browse files
committed
feat: support PartialMerge
1 parent 0c0327a commit 336aadd

File tree

3 files changed

+49
-3
lines changed

3 files changed

+49
-3
lines changed

native/core/src/execution/planner.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1035,8 +1035,8 @@ impl PhysicalPlanner {
10351035
)
10361036
.schema(Arc::clone(&child_schema))
10371037
.alias(format!("col_{idx}"))
1038-
.with_ignore_nulls(false)
1039-
.with_distinct(false)
1038+
.with_ignore_nulls(expr.ignore_nulls())
1039+
.with_distinct(expr.is_distinct())
10401040
.build()
10411041
.map_err(|e| ExecutionError::DataFusionError(e.to_string()))?;
10421042

spark/src/main/scala/org/apache/spark/sql/comet/operators.scala

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1460,6 +1460,24 @@ trait CometBaseAggregate {
14601460
}
14611461
}
14621462

1463+
// FIRST/LAST are order-dependent aggregates whose merge result depends on
1464+
// hash table processing order. In PartialMerge mode, DataFusion's hash table
1465+
// may process rows in a different order than Spark's, producing different results.
1466+
val hasPartialMergeMode = modeSet.contains(PartialMerge)
1467+
if (hasPartialMergeMode) {
1468+
val unsupportedAggs = aggregateExpressions.filter { a =>
1469+
a.mode == PartialMerge && (a.aggregateFunction.isInstanceOf[First] ||
1470+
a.aggregateFunction.isInstanceOf[Last])
1471+
}
1472+
if (unsupportedAggs.nonEmpty) {
1473+
withInfo(
1474+
aggregate,
1475+
s"PartialMerge not supported for order-dependent aggregates: " +
1476+
unsupportedAggs.map(_.aggregateFunction.prettyName).mkString(", "))
1477+
return None
1478+
}
1479+
}
1480+
14631481
// Per-expression binding: Partial expressions bind to child output,
14641482
// PartialMerge/Final expressions do not (native planner handles their input).
14651483
val output = child.output

spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -640,7 +640,8 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper {
640640
sql("CREATE TEMP VIEW v AS SELECT _1, _2 FROM tbl ORDER BY _1")
641641
checkSparkAnswerAndOperator(
642642
"SELECT _2, SUM(_1), SUM(DISTINCT _1), MIN(_1), MAX(_1), COUNT(_1)," +
643-
" COUNT(DISTINCT _1), AVG(_1), FIRST(_1), LAST(_1) FROM v GROUP BY _2")
643+
" COUNT(DISTINCT _1), AVG(_1)" +
644+
" FROM v GROUP BY _2 ORDER BY _2")
644645
}
645646
}
646647
}
@@ -649,6 +650,33 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper {
649650
}
650651
}
651652

653+
// FIRST/LAST are order-dependent aggregates whose merge result depends on hash table
654+
// processing order. In PartialMerge mode, DataFusion's hash table may process rows
655+
// in a different order than Spark's, so we fall back to Spark for correctness.
656+
test("partialMerge - FIRST/LAST with distinct aggregates falls back") {
657+
val numValues = 10000
658+
Seq(100).foreach { numGroups =>
659+
Seq(128).foreach { batchSize =>
660+
withSQLConf(
661+
SQLConf.COALESCE_PARTITIONS_ENABLED.key -> "true",
662+
CometConf.COMET_BATCH_SIZE.key -> batchSize.toString) {
663+
withParquetTable(
664+
(0 until numValues).map(i => (i, Random.nextInt() % numGroups)),
665+
"tbl",
666+
false) {
667+
withView("v") {
668+
sql("CREATE TEMP VIEW v AS SELECT _1, _2 FROM tbl ORDER BY _1")
669+
checkSparkAnswerAndFallbackReason(
670+
"SELECT _2, FIRST(_1), LAST(_1), COUNT(DISTINCT _1)" +
671+
" FROM v GROUP BY _2 ORDER BY _2",
672+
"PartialMerge not supported for order-dependent aggregates")
673+
}
674+
}
675+
}
676+
}
677+
}
678+
}
679+
652680
test("partialMerge - cnt distinct + sum") {
653681
withTempDir(dir => {
654682
withSQLConf("spark.comet.enabled" -> "false") {

0 commit comments

Comments
 (0)