
Commit 994189c

refactor: coordinate shuffle path checks through a single entry point
Replace the separate STAGE_FALLBACK_TAG with explain-info-based stickiness. Shuffle path checks (`nativeShuffleFailureReasons`, `columnarShuffleFailureReasons`) are now pure and return reasons instead of tagging eagerly. A new `shuffleSupported` coordinator short-circuits on `hasExplainInfo`, tries native then columnar, and tags via `withInfos` only on total failure. DPP fallback, which disqualifies both paths, moves into the coordinator. This removes the need for `CometFallback` and eliminates the semantic split where `withInfo` could fire for a path-specific failure while the node still converted via a different path.
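The coordinator's body is not shown in this diff, so the following is a minimal, self-contained sketch of the flow the commit message describes. The stubs (`ShuffleNode`, `hasExplainInfo`, `withInfos`, the failure-reason sets) are placeholders for illustration, not the real Comet API.

```scala
// Hypothetical stand-ins for the plan node and tagging machinery.
sealed trait ShuffleType
case object CometNativeShuffle extends ShuffleType
case object CometColumnarShuffle extends ShuffleType

case class ShuffleNode(
    hasExplainInfo: Boolean,        // stickiness: already tagged => stay fallen back
    usesDpp: Boolean,               // DPP disqualifies both paths
    nativeReasons: Set[String],     // pure check: reasons native shuffle fails
    columnarReasons: Set[String]) { // pure check: reasons columnar shuffle fails
  var infos: Set[String] = Set.empty
  def withInfos(reasons: Set[String]): Unit = infos ++= reasons
}

def shuffleSupported(s: ShuffleNode): Option[ShuffleType] = {
  if (s.hasExplainInfo) return None                         // short-circuit on prior fallback
  if (s.usesDpp) { s.withInfos(Set("DPP")); return None }   // disqualifies both paths
  if (s.nativeReasons.isEmpty) return Some(CometNativeShuffle)
  if (s.columnarReasons.isEmpty) return Some(CometColumnarShuffle)
  // Tag via withInfos only on total failure, so a path-specific reason never
  // marks a node that still converts via the other path.
  s.withInfos(s.nativeReasons ++ s.columnarReasons)
  None
}
```

Under this shape, the caller pattern-matches on the returned `Option[ShuffleType]` rather than re-running per-path support checks, which is what the diffs below show.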
1 parent b80a63d commit 994189c

6 files changed

Lines changed: 167 additions & 266 deletions

File tree

docs/source/contributor-guide/adding_a_new_operator.md

Lines changed: 8 additions & 2 deletions
````diff
@@ -553,8 +553,14 @@ For operators that run in the JVM:
 Example pattern from `CometExecRule.scala`:
 
 ```scala
-case s: ShuffleExchangeExec if nativeShuffleSupported(s) =>
-  CometShuffleExchangeExec(s, shuffleType = CometNativeShuffle)
+case s: ShuffleExchangeExec =>
+  CometShuffleExchangeExec.shuffleSupported(s) match {
+    case Some(CometNativeShuffle) =>
+      CometShuffleExchangeExec(s, shuffleType = CometNativeShuffle)
+    case Some(CometColumnarShuffle) =>
+      CometShuffleExchangeExec(s, shuffleType = CometColumnarShuffle)
+    case None => s
+  }
 ```
 
 ## Common Patterns and Helpers
````

spark/src/main/scala/org/apache/comet/CometFallback.scala

Lines changed: 0 additions & 67 deletions
This file was deleted.

spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala

Lines changed: 12 additions & 11 deletions
```diff
@@ -98,17 +98,18 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] {
   private lazy val showTransformations = CometConf.COMET_EXPLAIN_TRANSFORMATIONS.get()
 
   private def applyCometShuffle(plan: SparkPlan): SparkPlan = {
-    plan.transformUp {
-      case s: ShuffleExchangeExec if CometShuffleExchangeExec.nativeShuffleSupported(s) =>
-        // Switch to use Decimal128 regardless of precision, since Arrow native execution
-        // doesn't support Decimal32 and Decimal64 yet.
-        conf.setConfString(CometConf.COMET_USE_DECIMAL_128.key, "true")
-        CometShuffleExchangeExec(s, shuffleType = CometNativeShuffle)
-
-      case s: ShuffleExchangeExec if CometShuffleExchangeExec.columnarShuffleSupported(s) =>
-        // Columnar shuffle for regular Spark operators (not Comet) and Comet operators
-        // (if configured)
-        CometShuffleExchangeExec(s, shuffleType = CometColumnarShuffle)
+    plan.transformUp { case s: ShuffleExchangeExec =>
+      CometShuffleExchangeExec.shuffleSupported(s) match {
+        case Some(CometNativeShuffle) =>
+          // Switch to use Decimal128 regardless of precision, since Arrow native execution
+          // doesn't support Decimal32 and Decimal64 yet.
+          conf.setConfString(CometConf.COMET_USE_DECIMAL_128.key, "true")
+          CometShuffleExchangeExec(s, shuffleType = CometNativeShuffle)
+        case Some(CometColumnarShuffle) =>
+          CometShuffleExchangeExec(s, shuffleType = CometColumnarShuffle)
+        case None =>
+          s
+      }
     }
   }
```
114115
