
Commit ee140c7

feat: non-AQE DPP for native Parquet scans, broadcast exchange reuse for DPP subqueries (#4011)
1 parent 36e0a3e commit ee140c7

346 files changed, 31481 additions and 36481 deletions
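
In short: dynamic partition pruning (DPP) filters a partitioned scan at runtime using the values produced by the other side of a join, and this commit keeps such plans on the native path even without adaptive query execution, reusing broadcast exchanges for the pruning subqueries. A minimal sketch of a query shape that exercises this, assuming a Comet-enabled build (the paths, table names, and data are hypothetical):

// Illustrative sketch only: a query shape that exercises non-AQE DPP on a
// Comet-enabled build. The paths, table names, and data are hypothetical.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .config("spark.comet.enabled", "true")         // route scans through Comet
  .config("spark.sql.adaptive.enabled", "false") // the non-AQE path this commit targets
  .getOrCreate()
import spark.implicits._

// A partitioned Parquet fact table and a small dimension table.
(0 until 1000).map(i => (i, i % 10)).toDF("value", "part")
  .write.partitionBy("part").mode("overwrite").parquet("/tmp/fact")
Seq((0, "even"), (1, "odd")).toDF("part", "name")
  .write.mode("overwrite").parquet("/tmp/dim")
spark.read.parquet("/tmp/fact").createOrReplaceTempView("fact")
spark.read.parquet("/tmp/dim").createOrReplaceTempView("dim")

// The selective filter on dim lets Spark prune fact's partitions at runtime:
// the scan's partition filter becomes a DynamicPruningExpression fed by a
// broadcast subquery, which this commit keeps on the native scan path.
spark.sql(
  "SELECT f.value FROM fact f JOIN dim d ON f.part = d.part WHERE d.name = 'odd'"
).explain()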


common/src/main/scala/org/apache/comet/CometConf.scala

Lines changed: 0 additions & 7 deletions
@@ -554,13 +554,6 @@ object CometConf extends ShimCometConf {
       .doubleConf
       .createWithDefault(1.0)
 
-  val COMET_DPP_FALLBACK_ENABLED: ConfigEntry[Boolean] =
-    conf("spark.comet.dppFallback.enabled")
-      .category(CATEGORY_EXEC)
-      .doc("Whether to fall back to Spark for queries that use DPP.")
-      .booleanConf
-      .createWithDefault(true)
-
   val COMET_DEBUG_ENABLED: ConfigEntry[Boolean] =
     conf("spark.comet.debug.enabled")
       .category(CATEGORY_EXEC)
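
The removed spark.comet.dppFallback.enabled defaulted to true, meaning any query that used DPP fell back to Spark wholesale; with native DPP support the switch has no remaining role. Schematically, the retired guard amounted to a plan check along these lines (a hypothetical reconstruction for illustration, not the actual Comet planner code):

// Hypothetical reconstruction of the retired guard; names and structure are
// illustrative, not the actual Comet planner code.
import org.apache.spark.sql.catalyst.expressions.DynamicPruningExpression
import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan}

def usesDpp(plan: SparkPlan): Boolean =
  plan.collectFirst {
    case scan: FileSourceScanExec
        if scan.partitionFilters.exists(_.isInstanceOf[DynamicPruningExpression]) =>
      scan
  }.isDefined

// Before this commit (schematically):
//   if (dppFallbackEnabled && usesDpp(plan)) { /* keep the whole query on Spark */ }
// After it, DPP subqueries plan into CometSubqueryBroadcastExec and the
// pruned scan stays native, so no guard is needed.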

dev/diffs/3.4.3.diff

Lines changed: 75 additions & 31 deletions
@@ -1,5 +1,5 @@
 diff --git a/pom.xml b/pom.xml
-index d3544881af1..377683b10c5 100644
+index d3544881af1..d075572c5b3 100644
 --- a/pom.xml
 +++ b/pom.xml
 @@ -148,6 +148,8 @@
@@ -417,28 +417,48 @@ index daef11ae4d6..9f3cc9181f2 100644
 assert(exchanges.size == 2)
 }
 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
-index f33432ddb6f..7d758d2481f 100644
+index f33432ddb6f..99729e465e4 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
 +++ b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
 @@ -22,6 +22,7 @@ import org.scalatest.GivenWhenThen
 import org.apache.spark.sql.catalyst.expressions.{DynamicPruningExpression, Expression}
 import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode._
 import org.apache.spark.sql.catalyst.plans.ExistenceJoin
-+import org.apache.spark.sql.comet.CometScanExec
++import org.apache.spark.sql.comet.{CometNativeScanExec, CometScanExec, CometSubqueryBroadcastExec}
 import org.apache.spark.sql.connector.catalog.{InMemoryTableCatalog, InMemoryTableWithV2FilterCatalog}
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.adaptive._
-@@ -262,6 +263,9 @@ abstract class DynamicPartitionPruningSuiteBase
+@@ -193,6 +194,7 @@ abstract class DynamicPartitionPruningSuiteBase
+ }
+ val subqueryBroadcast = dpExprs.collect {
+ case InSubqueryExec(_, b: SubqueryBroadcastExec, _, _, _, _) => b
++ case InSubqueryExec(_, b: CometSubqueryBroadcastExec, _, _, _, _) => b
+ }
+
+ val hasFilter = if (withSubquery) "Should" else "Shouldn't"
+@@ -247,6 +249,8 @@ abstract class DynamicPartitionPruningSuiteBase
+ val buf = collectDynamicPruningExpressions(df.queryExecution.executedPlan).collect {
+ case InSubqueryExec(_, b: SubqueryBroadcastExec, _, _, _, _) =>
+ b.index
++ case InSubqueryExec(_, b: CometSubqueryBroadcastExec, _, _, _, _) =>
++ b.indices.head
+ }
+ assert(buf.distinct.size == n)
+ }
+@@ -262,6 +266,12 @@ abstract class DynamicPartitionPruningSuiteBase
 case s: BatchScanExec => s.runtimeFilters.collect {
 case d: DynamicPruningExpression => d.child
 }
 + case s: CometScanExec => s.partitionFilters.collect {
 + case d: DynamicPruningExpression => d.child
++ }
++ case s: CometNativeScanExec => s.partitionFilters.collect {
++ case d: DynamicPruningExpression => d.child
 + }
 case _ => Nil
 }
 }
-@@ -1027,7 +1032,8 @@ abstract class DynamicPartitionPruningSuiteBase
+@@ -1027,7 +1037,8 @@ abstract class DynamicPartitionPruningSuiteBase
 }
 }
 
@@ -448,7 +468,26 @@ index f33432ddb6f..7d758d2481f 100644
 withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false",
 SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
 withTable("large", "dimTwo", "dimThree") {
-@@ -1215,7 +1221,8 @@ abstract class DynamicPartitionPruningSuiteBase
+@@ -1204,10 +1215,16 @@ abstract class DynamicPartitionPruningSuiteBase
+
+ val plan = df.queryExecution.executedPlan
+ val countSubqueryBroadcasts =
+- collectWithSubqueries(plan)({ case _: SubqueryBroadcastExec => 1 }).sum
++ collectWithSubqueries(plan)({
++ case _: SubqueryBroadcastExec => 1
++ case _: CometSubqueryBroadcastExec => 1
++ }).sum
+
+ val countReusedSubqueryBroadcasts =
+- collectWithSubqueries(plan)({ case ReusedSubqueryExec(_: SubqueryBroadcastExec) => 1}).sum
++ collectWithSubqueries(plan)({
++ case ReusedSubqueryExec(_: SubqueryBroadcastExec) => 1
++ case ReusedSubqueryExec(_: CometSubqueryBroadcastExec) => 1
++ }).sum
+
+ assert(countSubqueryBroadcasts == 1)
+ assert(countReusedSubqueryBroadcasts == 1)
+@@ -1215,7 +1232,8 @@ abstract class DynamicPartitionPruningSuiteBase
 }
 
 test("SPARK-32509: Unused Dynamic Pruning filter shouldn't affect " +
@@ -458,7 +497,7 @@ index f33432ddb6f..7d758d2481f 100644
 withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") {
 withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
 val df = sql(
-@@ -1423,7 +1430,8 @@ abstract class DynamicPartitionPruningSuiteBase
+@@ -1423,7 +1441,8 @@ abstract class DynamicPartitionPruningSuiteBase
 }
 }
 
@@ -468,7 +507,15 @@ index f33432ddb6f..7d758d2481f 100644
 withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") {
 val df = sql(
 """ WITH v as (
-@@ -1698,7 +1706,8 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat
+@@ -1577,6 +1596,7 @@ abstract class DynamicPartitionPruningSuiteBase
+
+ val subqueryBroadcastExecs = collectWithSubqueries(df.queryExecution.executedPlan) {
+ case s: SubqueryBroadcastExec => s
++ case s: CometSubqueryBroadcastExec => s
+ }
+ assert(subqueryBroadcastExecs.size === 1)
+ subqueryBroadcastExecs.foreach { subqueryBroadcastExec =>
+@@ -1698,7 +1718,8 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat
 * Check the static scan metrics with and without DPP
 */
 test("static scan metrics",
@@ -478,7 +525,7 @@ index f33432ddb6f..7d758d2481f 100644
 withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true",
 SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false",
 SQLConf.EXCHANGE_REUSE_ENABLED.key -> "false") {
-@@ -1729,6 +1738,8 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat
+@@ -1729,6 +1750,8 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat
 case s: BatchScanExec =>
 // we use f1 col for v2 tables due to schema pruning
 s.output.exists(_.exists(_.argString(maxFields = 100).contains("f1")))
@@ -933,7 +980,7 @@ index b5b34922694..a72403780c4 100644
 protected val baseResourcePath = {
 // use the same way as `SQLQueryTestSuite` to get the resource path
 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
-index 525d97e4998..843f0472c23 100644
+index 525d97e4998..8a3e7457618 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
 +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
 @@ -1508,7 +1508,8 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
@@ -1045,14 +1092,14 @@ index 18123a4d6ec..0fe185baa33 100644
 
 test("non-matching optional group") {
 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
-index 75eabcb96f2..7c0bbd71551 100644
+index 75eabcb96f2..7a681f147e4 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
 +++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
 @@ -21,10 +21,11 @@ import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
 import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Join, LogicalPlan, Project, Sort, Union}
-+import org.apache.spark.sql.comet.CometScanExec
++import org.apache.spark.sql.comet.{CometNativeColumnarToRowExec, CometNativeScanExec, CometScanExec}
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanHelper, DisableAdaptiveExecution}
 import org.apache.spark.sql.execution.datasources.FileScanRDD
@@ -1061,7 +1108,7 @@ index 75eabcb96f2..7c0bbd71551 100644
 import org.apache.spark.sql.execution.joins.{BaseJoinExec, BroadcastHashJoinExec, BroadcastNestedLoopJoinExec}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
-@@ -1543,6 +1544,12 @@ class SubquerySuite extends QueryTest
+@@ -1543,6 +1544,17 @@ class SubquerySuite extends QueryTest
 fs.inputRDDs().forall(
 _.asInstanceOf[FileScanRDD].filePartitions.forall(
 _.files.forall(_.urlEncodedPath.contains("p=0"))))
@@ -1070,11 +1117,16 @@ index 75eabcb96f2..7c0bbd71551 100644
 + partitionFilters.exists(ExecSubqueryExpression.hasSubquery) &&
 + fs.inputRDDs().forall(
 + _.asInstanceOf[FileScanRDD].filePartitions.forall(
++ _.files.forall(_.urlEncodedPath.contains("p=0"))))
++ case CometNativeColumnarToRowExec(fs: CometNativeScanExec) =>
++ fs.partitionFilters.exists(ExecSubqueryExpression.hasSubquery) &&
++ fs.inputRDDs().forall(
++ _.asInstanceOf[FileScanRDD].filePartitions.forall(
 + _.files.forall(_.urlEncodedPath.contains("p=0"))))
 case _ => false
 })
 }
-@@ -2108,7 +2115,7 @@ class SubquerySuite extends QueryTest
+@@ -2108,7 +2120,7 @@ class SubquerySuite extends QueryTest
 
 df.collect()
 val exchanges = collect(df.queryExecution.executedPlan) {
@@ -1294,17 +1346,16 @@ index 4b3d3a4b805..56e1e0e6f16 100644
 
 setupTestData()
 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantProjectsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantProjectsSuite.scala
-index 9e9d717db3b..ec73082f458 100644
+index 9e9d717db3b..bb04190c3ca 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantProjectsSuite.scala
 +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantProjectsSuite.scala
 @@ -17,7 +17,10 @@
 
 package org.apache.spark.sql.execution
 
--import org.apache.spark.sql.{DataFrame, QueryTest, Row}
 +import org.apache.comet.CometConf
 +
-+import org.apache.spark.sql.{DataFrame, IgnoreComet, QueryTest, Row}
+import org.apache.spark.sql.{DataFrame, QueryTest, Row}
 +import org.apache.spark.sql.comet.CometProjectExec
 import org.apache.spark.sql.connector.SimpleWritableDataSource
 import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanHelper, DisableAdaptiveExecutionSuite, EnableAdaptiveExecutionSuite}
@@ -1321,17 +1372,7 @@ index 9e9d717db3b..ec73082f458 100644
 assert(actual == expected)
 }
 }
-@@ -112,7 +118,8 @@ abstract class RemoveRedundantProjectsSuiteBase
- assertProjectExec(query, 1, 3)
- }
-
-- test("join with ordering requirement") {
-+ test("join with ordering requirement",
-+ IgnoreComet("TODO: Support SubqueryBroadcastExec in Comet: #242")) {
- val query = "select * from (select key, a, c, b from testView) as t1 join " +
- "(select key, a, b, c from testView) as t2 on t1.key = t2.key where t2.a > 50"
- assertProjectExec(query, 2, 2)
-@@ -134,12 +141,21 @@ abstract class RemoveRedundantProjectsSuiteBase
+@@ -134,12 +140,21 @@ abstract class RemoveRedundantProjectsSuiteBase
 val df = data.selectExpr("a", "b", "key", "explode(array(key, a, b)) as d").filter("d > 0")
 df.collect()
 val plan = df.queryExecution.executedPlan
13581399
case g @ GenerateExec(_, requiredChildOutput, _, _, child) =>
13591400
g.copy(requiredChildOutput = requiredChildOutput.reverse,
13601401
child = ProjectExec(requiredChildOutput.reverse, child))
1361-
@@ -151,6 +167,7 @@ abstract class RemoveRedundantProjectsSuiteBase
1402+
@@ -151,6 +166,7 @@ abstract class RemoveRedundantProjectsSuiteBase
13621403
// The manually added ProjectExec node shouldn't be removed.
13631404
assert(collectWithSubqueries(newExecutedPlan) {
13641405
case p: ProjectExec => p
@@ -3056,7 +3097,7 @@ index 1510e8957f9..7618419d8ff 100644
30563097
implicit val formats = new DefaultFormats {
30573098
override def dateFormatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss")
30583099
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/DynamicPartitionPruningHiveScanSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/DynamicPartitionPruningHiveScanSuite.scala
3059-
index 52abd248f3a..7a199931a08 100644
3100+
index 52abd248f3a..b4e096cae24 100644
30603101
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/DynamicPartitionPruningHiveScanSuite.scala
30613102
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/DynamicPartitionPruningHiveScanSuite.scala
30623103
@@ -19,6 +19,7 @@ package org.apache.spark.sql.hive
@@ -3067,12 +3108,15 @@ index 52abd248f3a..7a199931a08 100644
30673108
import org.apache.spark.sql.execution._
30683109
import org.apache.spark.sql.execution.adaptive.{DisableAdaptiveExecutionSuite, EnableAdaptiveExecutionSuite}
30693110
import org.apache.spark.sql.hive.execution.HiveTableScanExec
3070-
@@ -35,6 +36,9 @@ abstract class DynamicPartitionPruningHiveScanSuiteBase
3111+
@@ -35,6 +36,12 @@ abstract class DynamicPartitionPruningHiveScanSuiteBase
30713112
case s: FileSourceScanExec => s.partitionFilters.collect {
30723113
case d: DynamicPruningExpression => d.child
30733114
}
30743115
+ case s: CometScanExec => s.partitionFilters.collect {
30753116
+ case d: DynamicPruningExpression => d.child
3117+
+ }
3118+
+ case s: CometNativeScanExec => s.partitionFilters.collect {
3119+
+ case d: DynamicPruningExpression => d.child
30763120
+ }
30773121
case h: HiveTableScanExec => h.partitionPruningPred.collect {
30783122
case d: DynamicPruningExpression => d.child
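
The test patches above all follow one pattern: wherever a Spark suite pattern-matches on FileSourceScanExec, BatchScanExec, or SubqueryBroadcastExec, it is taught to also accept Comet's operators (CometScanExec, CometNativeScanExec, CometSubqueryBroadcastExec), and the IgnoreComet escape hatch referencing #242 is dropped now that SubqueryBroadcastExec has a Comet counterpart. Distilled into a single helper (a sketch assembled from the hunks above, not code from the commit):

// A sketch distilled from the hunks above: collect DPP filter expressions from
// Spark and Comet scan nodes alike; all three operators expose partitionFilters
// with the same shape.
import org.apache.spark.sql.catalyst.expressions.{DynamicPruningExpression, Expression}
import org.apache.spark.sql.comet.{CometNativeScanExec, CometScanExec}
import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan}

def collectDppExpressions(plan: SparkPlan): Seq[Expression] =
  plan.collect {
    case s: FileSourceScanExec => s.partitionFilters.collect {
      case d: DynamicPruningExpression => d.child
    }
    case s: CometScanExec => s.partitionFilters.collect {
      case d: DynamicPruningExpression => d.child
    }
    case s: CometNativeScanExec => s.partitionFilters.collect {
      case d: DynamicPruningExpression => d.child
    }
  }.flatten

The broadcast-reuse half of the commit title is checked by the DynamicPartitionPruningSuite hunk that counts exactly one subquery broadcast and one ReusedSubqueryExec wrapping it, with both counters now also matching the CometSubqueryBroadcastExec variants.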
