Skip to content

Commit b49942c

Browse files
committed
perf: short-circuit repeat serde checks via per-handler attempt tag
Record the handler class name on operators whose getSupportLevel returned Unsupported or Incompatible (without allowIncompat), so that later AQE stage-prep passes skip re-running the same check. The tag is keyed per handler, so other handlers on the same node (e.g. CometSparkToColumnarExec on a scan tagged by CometScanRule) are unaffected. Closes #3990
1 parent cee98e3 commit b49942c

File tree

3 files changed

+91
-2
lines changed

3 files changed

+91
-2
lines changed

spark/src/main/scala/org/apache/comet/ExtendedExplainInfo.scala

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -214,6 +214,15 @@ object CometCoverageStats {
214214
object CometExplainInfo {
215215
val EXTENSION_INFO = new TreeNodeTag[Set[String]]("CometExtensionInfo")
216216

217+
/**
 * Records the class names of serde handlers whose `getSupportLevel` has already returned
 * `Unsupported` or `Incompatible` (without `allowIncompat`) on a given operator, so that repeat
 * invocations of the same handler on the same node during later rule passes (e.g. AQE
 * stage-prep) can short-circuit without re-running the check. Orthogonal to [[EXTENSION_INFO]];
 * the set is keyed per handler class, so other handlers attached to the same node are
 * unaffected.
 */
val FAILED_HANDLERS = new TreeNodeTag[Set[String]]("CometFailedHandlers")
225+
217226
def getActualPlan(node: TreeNode[_]): TreeNode[_] = {
218227
node match {
219228
case p: AdaptiveSparkPlanExec => getActualPlan(p.executedPlan)

spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -480,8 +480,13 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] {
480480
}
481481

482482
/** Convert a Spark plan to a Comet plan using the specified serde handler */
483-
private def convertToComet(op: SparkPlan, handler: CometOperatorSerde[_]): Option[SparkPlan] = {
483+
private[rules] def convertToComet(
484+
op: SparkPlan,
485+
handler: CometOperatorSerde[_]): Option[SparkPlan] = {
484486
val serde = handler.asInstanceOf[CometOperatorSerde[SparkPlan]]
487+
if (hasFailedHandler(op, handler)) {
488+
return None
489+
}
485490
if (isOperatorEnabled(serde, op)) {
486491
// For operators that require native children (like writes), check if all data-producing
487492
// children are CometNativeExec. This prevents runtime failures when the native operator
@@ -522,6 +527,7 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] {
522527
handler.getSupportLevel(op) match {
523528
case Unsupported(notes) =>
524529
withInfo(op, notes.getOrElse(""))
530+
recordFailedHandler(op, handler)
525531
false
526532
case Incompatible(notes) =>
527533
val allowIncompat = CometConf.isOperatorAllowIncompat(opName)
@@ -540,6 +546,7 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] {
540546
s"$opName is not fully compatible with Spark$optionalNotes. " +
541547
s"To enable it anyway, set $incompatConf=true. " +
542548
s"${CometConf.COMPAT_GUIDE}.")
549+
recordFailedHandler(op, handler)
543550
false
544551
}
545552
case Compatible(notes) =>
@@ -557,6 +564,16 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] {
557564
}
558565
}
559566

567+
/**
 * Returns true if `handler` has already been recorded (via `recordFailedHandler`) as having
 * failed its support check on `op`, meaning the check can be skipped on this pass.
 */
private def hasFailedHandler(op: SparkPlan, handler: CometOperatorSerde[_]): Boolean = {
  // An absent tag means no handler has failed on this node yet.
  val failed = op.getTagValue(CometExplainInfo.FAILED_HANDLERS).getOrElse(Set.empty[String])
  failed.contains(handler.getClass.getName)
}
571+
572+
/**
 * Tags `op` with the class name of `handler` so that later invocations of the same handler on
 * this node can short-circuit via `hasFailedHandler`. Keyed per handler class; entries already
 * recorded by other handlers are preserved.
 */
private def recordFailedHandler(op: SparkPlan, handler: CometOperatorSerde[_]): Unit = {
  val updated = op.getTagValue(CometExplainInfo.FAILED_HANDLERS) match {
    case Some(failed) => failed + handler.getClass.getName
    case None => Set(handler.getClass.getName)
  }
  op.setTagValue(CometExplainInfo.FAILED_HANDLERS, updated)
}
576+
560577
private def shouldApplySparkToColumnar(conf: SQLConf, op: SparkPlan): Boolean = {
561578
// Only consider converting leaf nodes to columnar currently, so that all the following
562579
// operators can have a chance to be converted to columnar. Leaf operators that output

spark/src/test/scala/org/apache/comet/rules/CometExecRuleSuite.scala

Lines changed: 64 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,9 @@ import org.apache.spark.sql.execution.aggregate.HashAggregateExec
3030
import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ShuffleExchangeExec}
3131
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
3232

33-
import org.apache.comet.CometConf
33+
import org.apache.comet.{CometConf, ConfigEntry}
34+
import org.apache.comet.serde.{CometOperatorSerde, OperatorOuterClass, SupportLevel, Unsupported}
35+
import org.apache.comet.serde.OperatorOuterClass.Operator
3436
import org.apache.comet.testing.{DataGenOptions, FuzzDataGenerator}
3537

3638
/**
@@ -206,6 +208,67 @@ class CometExecRuleSuite extends CometTestBase {
206208
}
207209
}
208210

211+
// Verifies the perf optimization: once a handler's getSupportLevel reports Unsupported for a
// node, a second convertToComet call with the same handler must not re-run the check.
test("convertToComet short-circuits repeat call for a handler that already failed") {
  withTempView("test_data") {
    createTestDataFrame.createOrReplaceTempView("test_data")

    // Grab a concrete FilterExec node to act as the operator under test.
    val sparkPlan = createSparkPlan(spark, "SELECT id FROM test_data WHERE id > 0")
    val op = sparkPlan.collectFirst { case f: FilterExec => f }.get

    val stub = new StubUnsupportedHandlerA
    val rule = CometExecRule(spark)

    // First attempt: the stub runs, reports Unsupported, and tags the node.
    assert(rule.convertToComet(op, stub).isEmpty)
    assert(stub.calls == 1, s"first call should invoke getSupportLevel once, got ${stub.calls}")

    // Second attempt: the FAILED_HANDLERS tag short-circuits before getSupportLevel runs,
    // so the call count must not increase.
    assert(rule.convertToComet(op, stub).isEmpty)
    assert(
      stub.calls == 1,
      s"second call should short-circuit; getSupportLevel calls = ${stub.calls}")
  }
}
230+
231+
// Verifies the tag is keyed per handler class: one handler failing on a node must not
// suppress support checks for a different handler on the same node.
test("convertToComet short-circuit is per-handler (other handlers still run)") {
  withTempView("test_data") {
    createTestDataFrame.createOrReplaceTempView("test_data")

    // Grab a concrete FilterExec node to act as the operator under test.
    val sparkPlan = createSparkPlan(spark, "SELECT id FROM test_data WHERE id > 0")
    val op = sparkPlan.collectFirst { case f: FilterExec => f }.get

    val stubA = new StubUnsupportedHandlerA
    val stubB = new StubUnsupportedHandlerB
    val rule = CometExecRule(spark)

    // Handler A fails once, then is short-circuited on the repeat call.
    assert(rule.convertToComet(op, stubA).isEmpty)
    assert(rule.convertToComet(op, stubA).isEmpty)
    assert(
      stubA.calls == 1,
      s"handler A should be short-circuited on repeat, got ${stubA.calls}")

    // Different handler class on the same node must still run even though A already failed.
    assert(rule.convertToComet(op, stubB).isEmpty)
    assert(stubB.calls == 1, s"handler B (different class) must still run, got ${stubB.calls}")
  }
}
253+
254+
/**
 * Serde stub whose `getSupportLevel` always reports `Unsupported` and counts how many times it
 * is invoked, so tests can detect whether a call was short-circuited.
 */
private abstract class CountingStubHandler extends CometOperatorSerde[SparkPlan] {
  // Number of getSupportLevel invocations observed so far.
  var calls: Int = 0
  override def enabledConfig: Option[ConfigEntry[Boolean]] = None
  override def getSupportLevel(operator: SparkPlan): SupportLevel = {
    calls += 1
    Unsupported(Some("stub fallback"))
  }
  // Never reached in these tests: conversion stops once the support check fails.
  override def convert(
      op: SparkPlan,
      builder: Operator.Builder,
      childOp: Operator*): Option[OperatorOuterClass.Operator] = None
  override def createExec(nativeOp: Operator, op: SparkPlan): CometNativeExec =
    throw new AssertionError("createExec should not be invoked on Unsupported")
}

// Two distinct concrete classes: the FAILED_HANDLERS tag is keyed by handler class name, so the
// per-handler behavior can only be exercised with handlers of different classes.
private class StubUnsupportedHandlerA extends CountingStubHandler
private class StubUnsupportedHandlerB extends CountingStubHandler
271+
209272
test("CometExecRule should apply shuffle exchange transformations") {
210273
withTempView("test_data") {
211274
createTestDataFrame.createOrReplaceTempView("test_data")

0 commit comments

Comments
 (0)