Skip to content

Commit 17fa823

Browse files
committed
Remove DPP config.
1 parent c16498b commit 17fa823

7 files changed

Lines changed: 14 additions & 36 deletions

File tree

common/src/main/scala/org/apache/comet/CometConf.scala

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -554,14 +554,6 @@ object CometConf extends ShimCometConf {
554554
.doubleConf
555555
.createWithDefault(1.0)
556556

557-
val COMET_DPP_FALLBACK_ENABLED: ConfigEntry[Boolean] =
558-
conf("spark.comet.dppFallback.enabled")
559-
.category(CATEGORY_EXEC)
560-
.doc("Whether to fall back to Spark for queries that use AQE Dynamic Partition Pruning " +
561-
"(SubqueryAdaptiveBroadcastExec). Non-AQE DPP is always supported natively.")
562-
.booleanConf
563-
.createWithDefault(true)
564-
565557
val COMET_DEBUG_ENABLED: ConfigEntry[Boolean] =
566558
conf("spark.comet.debug.enabled")
567559
.category(CATEGORY_EXEC)

spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -139,8 +139,7 @@ case class CometScanRule(session: SparkSession)
139139

140140
private def transformV1Scan(plan: SparkPlan, scanExec: FileSourceScanExec): SparkPlan = {
141141

142-
if (COMET_DPP_FALLBACK_ENABLED.get() &&
143-
scanExec.partitionFilters.exists(isAqeDynamicPruningFilter)) {
142+
if (scanExec.partitionFilters.exists(isAqeDynamicPruningFilter)) {
144143
return withInfo(scanExec, "AQE Dynamic Partition Pruning is not supported")
145144
}
146145

spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -275,10 +275,11 @@ object CometShuffleExchangeExec
275275
case None =>
276276
}
277277

278-
// DPP fallback is a combined-path decision: a Comet shuffle wrapped around a stage that
279-
// still contains a DPP scan produces inefficient row<->columnar transitions. Disqualifies
280-
// both paths.
281-
if (CometConf.COMET_DPP_FALLBACK_ENABLED.get() && stageContainsDPPScan(s)) {
278+
// A Comet shuffle wrapped around a stage that still contains a Spark FileSourceScanExec
279+
// with DPP produces inefficient row<->columnar transitions. This only happens when the
280+
// scan fell back (e.g., AQE DPP not supported). If the scan converted to
281+
// CometNativeScanExec, stageContainsDPPScan won't match (it checks FileSourceScanExec).
282+
if (stageContainsDPPScan(s)) {
282283
withInfos(s, Set("Stage contains a scan with Dynamic Partition Pruning"))
283284
return None
284285
}

spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -128,9 +128,7 @@ class CometExecSuite extends CometTestBase {
128128

129129
// note that this test does not trigger DPP with v2 data source
130130
Seq("parquet").foreach { v1List =>
131-
withSQLConf(
132-
SQLConf.USE_V1_SOURCE_LIST.key -> v1List,
133-
CometConf.COMET_DPP_FALLBACK_ENABLED.key -> "true") {
131+
withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> v1List) {
134132
spark.read.parquet(factPath).createOrReplaceTempView("dpp_fact")
135133
spark.read.parquet(dimPath).createOrReplaceTempView("dpp_dim")
136134
val df =
@@ -164,8 +162,7 @@ class CometExecSuite extends CometTestBase {
164162
Seq("parquet").foreach { v1List =>
165163
withSQLConf(
166164
SQLConf.USE_V1_SOURCE_LIST.key -> v1List,
167-
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
168-
CometConf.COMET_DPP_FALLBACK_ENABLED.key -> "true") {
165+
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
169166
spark.read.parquet(factPath).createOrReplaceTempView("dpp_fact2")
170167
spark.read.parquet(dimPath).createOrReplaceTempView("dpp_dim2")
171168
val df =
@@ -202,8 +199,7 @@ class CometExecSuite extends CometTestBase {
202199
// with SubqueryBroadcastExec, not SubqueryAdaptiveBroadcastExec
203200
withSQLConf(
204201
SQLConf.USE_V1_SOURCE_LIST.key -> "parquet",
205-
SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false",
206-
CometConf.COMET_DPP_FALLBACK_ENABLED.key -> "true") {
202+
SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
207203
spark.read.parquet(factPath).createOrReplaceTempView("dpp_fact_bhj")
208204
spark.read.parquet(dimPath).createOrReplaceTempView("dpp_dim_bhj")
209205
val df = spark.sql(
@@ -249,8 +245,7 @@ class CometExecSuite extends CometTestBase {
249245
withSQLConf(
250246
SQLConf.USE_V1_SOURCE_LIST.key -> "parquet",
251247
SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false",
252-
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
253-
CometConf.COMET_DPP_FALLBACK_ENABLED.key -> "true") {
248+
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
254249
spark.read.parquet(factPath).createOrReplaceTempView("dpp_fact_smj")
255250
spark.read.parquet(dimPath).createOrReplaceTempView("dpp_dim_smj")
256251
val df = spark.sql(

spark/src/test/scala/org/apache/spark/sql/comet/CometDppFallbackRepro3949Suite.scala

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,6 @@ class CometDppFallbackRepro3949Suite extends CometTestBase {
109109
withTempDir { dir =>
110110
buildDppTables(dir, "mech")
111111
withSQLConf(
112-
CometConf.COMET_DPP_FALLBACK_ENABLED.key -> "true",
113112
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
114113
SQLConf.PREFER_SORTMERGEJOIN.key -> "true",
115114
SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
@@ -336,9 +335,7 @@ class CometDppFallbackRepro3949Suite extends CometTestBase {
336335
val suspicious = mutable.Buffer.empty[(String, Int, String)]
337336

338337
for ((variantName, variantConf) <- variants; (q, idx) <- queries.zipWithIndex) {
339-
val conf = variantConf ++ Map(
340-
CometConf.COMET_DPP_FALLBACK_ENABLED.key -> "true",
341-
SQLConf.USE_V1_SOURCE_LIST.key -> "parquet")
338+
val conf = variantConf ++ Map(SQLConf.USE_V1_SOURCE_LIST.key -> "parquet")
342339
try {
343340
withSQLConf(conf.toSeq: _*) {
344341
val df = spark.sql(q)

spark/src/test/scala/org/apache/spark/sql/comet/CometPlanStabilitySuite.scala

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -218,9 +218,6 @@ trait CometPlanStabilitySuite extends DisableAdaptiveExecutionSuite with TPCDSBa
218218
CometConf.COMET_ENABLED.key -> "true",
219219
CometConf.COMET_NATIVE_SCAN_ENABLED.key -> "true",
220220
CometConf.COMET_EXEC_ENABLED.key -> "true",
221-
// Non-AQE DPP is natively supported. AQE is disabled in this suite
222-
// (DisableAdaptiveExecutionSuite), so all DPP is non-AQE and works natively.
223-
CometConf.COMET_DPP_FALLBACK_ENABLED.key -> "true",
224221
CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
225222
CometConf.COMET_EXEC_SORT_MERGE_JOIN_WITH_JOIN_FILTER_ENABLED.key -> "true",
226223
// as well as for v1.4/q9, v1.4/q44, v2.7.0/q6, v2.7.0/q64

spark/src/test/scala/org/apache/spark/sql/comet/CometShuffleFallbackStickinessSuite.scala

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -52,11 +52,9 @@ class CometShuffleFallbackStickinessSuite extends CometTestBase {
5252
val shuffle = ShuffleExchangeExec(SinglePartition, SyntheticLeaf(Nil))
5353
withInfo(shuffle, "pretend prior pass decided Spark fallback")
5454

55-
withSQLConf(CometConf.COMET_DPP_FALLBACK_ENABLED.key -> "true") {
56-
assert(
57-
CometShuffleExchangeExec.shuffleSupported(shuffle).isEmpty,
58-
"marked shuffle must preserve its prior-pass fallback decision")
59-
}
55+
assert(
56+
CometShuffleExchangeExec.shuffleSupported(shuffle).isEmpty,
57+
"marked shuffle must preserve its prior-pass fallback decision")
6058
}
6159

6260
test(
@@ -85,7 +83,6 @@ class CometShuffleFallbackStickinessSuite extends CometTestBase {
8583
spark.read.parquet(dimPath).createOrReplaceTempView("t_sticky_dim")
8684

8785
withSQLConf(
88-
CometConf.COMET_DPP_FALLBACK_ENABLED.key -> "true",
8986
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
9087
SQLConf.PREFER_SORTMERGEJOIN.key -> "true",
9188
SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",

Comments (0)