Skip to content

Commit e31c3bc

Browse files
committed
update plans
1 parent ca14b18 commit e31c3bc

2 files changed

Lines changed: 20 additions & 7 deletions

File tree

native/core/src/execution/planner.rs

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -967,21 +967,34 @@ impl PhysicalPlanner {
967967
let group_by = PhysicalGroupBy::new_single(group_exprs?);
968968
let schema = child.schema();
969969

970+
let has_mixed_partial_merge = !agg.expr_modes.is_empty()
971+
&& agg.expr_modes.contains(&2)
972+
&& agg.expr_modes.iter().any(|&m| m != 2);
973+
970974
let mode = match agg.mode {
971-
0 => DFAggregateMode::Partial,
975+
0 => {
976+
if has_mixed_partial_merge {
977+
// Mixed {Partial, PartialMerge}: use Partial mode, wrap
978+
// PartialMerge expressions with MergeAsPartial.
979+
DFAggregateMode::Partial
980+
} else {
981+
DFAggregateMode::Partial
982+
}
983+
}
972984
1 => DFAggregateMode::Final,
973-
2 => DFAggregateMode::Partial, // PartialMerge: Partial + MergeAsPartial
985+
// Uniform PartialMerge maps directly to DataFusion's PartialReduce
986+
// which has merge input + state output semantics.
987+
2 => DFAggregateMode::PartialReduce,
974988
other => {
975989
return Err(ExecutionError::GeneralError(format!(
976990
"Unsupported aggregate mode: {other}"
977991
)))
978992
}
979993
};
980994

981-
// Check if any expression uses PartialMerge mode (2). When present,
982-
// those expressions are wrapped with MergeAsPartial to get merge
983-
// semantics inside a Partial-mode AggregateExec.
984-
let has_partial_merge = agg.mode == 2 || agg.expr_modes.contains(&2);
995+
// For mixed mode, PartialMerge expressions need MergeAsPartial wrappers
996+
// to get merge semantics inside a Partial-mode AggregateExec.
997+
let has_partial_merge = has_mixed_partial_merge;
985998

986999
let agg_exprs: PhyAggResult = agg
9871000
.agg_exprs

spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -712,7 +712,7 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper {
712712
CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
713713
"spark.comet.exec.shuffle.fallbackToColumnar" -> "false",
714714
"spark.comet.cast.allowIncompatible" -> "true",
715-
"spark.sql.adaptive.enabled" -> "false",
715+
"spark.sql.adaptive.enabled" -> "true",
716716
"spark.comet.enabled" -> "true",
717717
"spark.comet.expression.Cast.allowIncompatible" -> "true",
718718
CometConf.COMET_NATIVE_SCAN_IMPL.key -> "native_iceberg_compat") {

0 commit comments

Comments
 (0)