Skip to content

Commit acbdeac

Browse files
authored
fix: skip Comet columnar shuffle for stages with DPP scans (#3879)
When a scan uses Dynamic Partition Pruning (DPP) and falls back to Spark, Comet was still wrapping the stage with columnar shuffle, creating inefficient row-to-columnar transitions: CometShuffleWriter → CometRowToColumnar → SparkFilter → SparkColumnarToRow → SparkScan. This adds a check in columnarShuffleSupported() that walks the child plan tree to detect FileSourceScanExec nodes with dynamic pruning filters. When found, the shuffle is not converted to Comet, allowing the entire stage to fall back to Spark.
1 parent 3c927dd commit acbdeac

2 files changed

Lines changed: 59 additions & 2 deletions

File tree

spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ import org.apache.spark.rdd.RDD
3030
import org.apache.spark.serializer.Serializer
3131
import org.apache.spark.shuffle.sort.SortShuffleManager
3232
import org.apache.spark.sql.catalyst.{InternalRow, SQLConfHelper}
33-
import org.apache.spark.sql.catalyst.expressions.{Attribute, BoundReference, UnsafeProjection, UnsafeRow}
33+
import org.apache.spark.sql.catalyst.expressions.{Attribute, BoundReference, Expression, PlanExpression, UnsafeProjection, UnsafeRow}
3434
import org.apache.spark.sql.catalyst.expressions.codegen.LazilyGeneratedOrdering
3535
import org.apache.spark.sql.catalyst.plans.logical.Statistics
3636
import org.apache.spark.sql.catalyst.plans.physical._
@@ -454,6 +454,11 @@ object CometShuffleExchangeExec
454454
return false
455455
}
456456

457+
if (CometConf.COMET_DPP_FALLBACK_ENABLED.get() && stageContainsDPPScan(s)) {
458+
withInfo(s, "Stage contains a scan with Dynamic Partition Pruning")
459+
return false
460+
}
461+
457462
if (!isCometJVMShuffleMode(s.conf)) {
458463
withInfo(s, "Comet columnar shuffle not enabled")
459464
return false
@@ -546,6 +551,22 @@ object CometShuffleExchangeExec
546551
}
547552
}
548553

/**
 * Reports whether the stage rooted at this shuffle contains a file scan whose partition
 * filters carry Dynamic Partition Pruning (DPP) subqueries. A DPP scan falls back to Spark,
 * so converting the shuffle to Comet would only introduce wasteful row-to-columnar
 * transitions around it.
 */
private def stageContainsDPPScan(s: ShuffleExchangeExec): Boolean = {
  // A dynamic pruning filter is recognizable by an embedded subquery plan (PlanExpression).
  def hasEmbeddedPlan(expr: Expression): Boolean =
    expr.exists(_.isInstanceOf[PlanExpression[_]])

  s.child.exists {
    case scan: FileSourceScanExec if scan.partitionFilters.exists(hasEmbeddedPlan) => true
    case _ => false
  }
}
569+
549570
def isCometShuffleEnabledWithInfo(op: SparkPlan): Boolean = {
550571
if (!COMET_EXEC_SHUFFLE_ENABLED.get(op.conf)) {
551572
withInfo(

spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,8 +139,44 @@ class CometExecSuite extends CometTestBase {
139139
val (_, cometPlan) = checkSparkAnswer(df)
140140
val infos = new ExtendedExplainInfo().generateExtendedInfo(cometPlan)
141141
assert(infos.contains("Dynamic Partition Pruning is not supported"))
142+
}
143+
}
144+
}
145+
}
146+
147+
test("DPP fallback avoids inefficient Comet shuffle (#3874)") {
148+
withTempDir { path =>
149+
val factPath = s"${path.getAbsolutePath}/fact.parquet"
150+
val dimPath = s"${path.getAbsolutePath}/dim.parquet"
151+
withSQLConf(CometConf.COMET_EXEC_ENABLED.key -> "false") {
152+
val one_day = 24 * 60 * 60000
153+
val fact = Range(0, 100)
154+
.map(i => (i, new java.sql.Date(System.currentTimeMillis() + i * one_day), i.toString))
155+
.toDF("fact_id", "fact_date", "fact_str")
156+
fact.write.partitionBy("fact_date").parquet(factPath)
157+
val dim = Range(0, 10)
158+
.map(i => (i, new java.sql.Date(System.currentTimeMillis() + i * one_day), i.toString))
159+
.toDF("dim_id", "dim_date", "dim_str")
160+
dim.write.parquet(dimPath)
161+
}
162+
163+
// Force sort-merge join to get a shuffle exchange above the DPP scan
164+
Seq("parquet").foreach { v1List =>
165+
withSQLConf(
166+
SQLConf.USE_V1_SOURCE_LIST.key -> v1List,
167+
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
168+
CometConf.COMET_DPP_FALLBACK_ENABLED.key -> "true") {
169+
spark.read.parquet(factPath).createOrReplaceTempView("dpp_fact2")
170+
spark.read.parquet(dimPath).createOrReplaceTempView("dpp_dim2")
171+
val df =
172+
spark.sql(
173+
"select * from dpp_fact2 join dpp_dim2 on fact_date = dim_date where dim_id > 7")
174+
val (_, cometPlan) = checkSparkAnswer(df)
142175

143-
assert(infos.contains("Comet accelerated"))
176+
// Verify no CometShuffleExchangeExec wraps the DPP stage
177+
assert(
178+
!cometPlan.toString().contains("CometColumnarShuffle"),
179+
"Should not use Comet columnar shuffle for stages with DPP scans")
144180
}
145181
}
146182
}

0 commit comments

Comments
 (0)