7 changes: 0 additions & 7 deletions common/src/main/scala/org/apache/comet/CometConf.scala
@@ -554,13 +554,6 @@ object CometConf extends ShimCometConf
.doubleConf
.createWithDefault(1.0)

val COMET_DPP_FALLBACK_ENABLED: ConfigEntry[Boolean] =
conf("spark.comet.dppFallback.enabled")
.category(CATEGORY_EXEC)
.doc("Whether to fall back to Spark for queries that use DPP.")
.booleanConf
.createWithDefault(true)

val COMET_DEBUG_ENABLED: ConfigEntry[Boolean] =
conf("spark.comet.debug.enabled")
.category(CATEGORY_EXEC)
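With DPP now handled natively (see the CometSubqueryBroadcastExec changes below), the fallback flag loses its purpose, so the whole ConfigEntry is deleted rather than defaulted off. A minimal sketch of the call-site pattern such a flag supports, assuming ConfigEntry's no-arg get() accessor; the helper name is illustrative, not from this PR:

import org.apache.spark.sql.catalyst.expressions.DynamicPruningExpression
import org.apache.spark.sql.execution.SparkPlan

// Hypothetical pre-change guard: keep any plan that contains a dynamic pruning
// expression on Spark while the (now removed) flag is enabled.
def fallBackForDpp(plan: SparkPlan): Boolean =
  CometConf.COMET_DPP_FALLBACK_ENABLED.get() &&
    plan.expressions.exists(_.exists(_.isInstanceOf[DynamicPruningExpression]))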
106 changes: 75 additions & 31 deletions dev/diffs/3.4.3.diff
@@ -1,5 +1,5 @@
diff --git a/pom.xml b/pom.xml
index d3544881af1..377683b10c5 100644
index d3544881af1..d075572c5b3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -148,6 +148,8 @@
@@ -417,28 +417,48 @@ index daef11ae4d6..9f3cc9181f2 100644
assert(exchanges.size == 2)
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
index f33432ddb6f..7d758d2481f 100644
index f33432ddb6f..99729e465e4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
@@ -22,6 +22,7 @@ import org.scalatest.GivenWhenThen
import org.apache.spark.sql.catalyst.expressions.{DynamicPruningExpression, Expression}
import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode._
import org.apache.spark.sql.catalyst.plans.ExistenceJoin
+import org.apache.spark.sql.comet.CometScanExec
+import org.apache.spark.sql.comet.{CometNativeScanExec, CometScanExec, CometSubqueryBroadcastExec}
import org.apache.spark.sql.connector.catalog.{InMemoryTableCatalog, InMemoryTableWithV2FilterCatalog}
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.adaptive._
@@ -262,6 +263,9 @@ abstract class DynamicPartitionPruningSuiteBase
@@ -193,6 +194,7 @@ abstract class DynamicPartitionPruningSuiteBase
}
val subqueryBroadcast = dpExprs.collect {
case InSubqueryExec(_, b: SubqueryBroadcastExec, _, _, _, _) => b
+ case InSubqueryExec(_, b: CometSubqueryBroadcastExec, _, _, _, _) => b
}

val hasFilter = if (withSubquery) "Should" else "Shouldn't"
@@ -247,6 +249,8 @@ abstract class DynamicPartitionPruningSuiteBase
val buf = collectDynamicPruningExpressions(df.queryExecution.executedPlan).collect {
case InSubqueryExec(_, b: SubqueryBroadcastExec, _, _, _, _) =>
b.index
+ case InSubqueryExec(_, b: CometSubqueryBroadcastExec, _, _, _, _) =>
+ b.indices.head
}
assert(buf.distinct.size == n)
}
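One subtlety in the hunk above: Spark's SubqueryBroadcastExec exposes the pruning-key position as a single index, while CometSubqueryBroadcastExec evidently models a Seq, hence b.indices.head. Condensed, the dual match looks like this (a sketch over the suite's collected pruning expressions):

// Both arms yield the broadcast key position probed by the pruning filter;
// only the accessor differs between the Spark and Comet operators.
val keyIndices: Seq[Int] = dpExprs.collect {
  case InSubqueryExec(_, b: SubqueryBroadcastExec, _, _, _, _) => b.index
  case InSubqueryExec(_, b: CometSubqueryBroadcastExec, _, _, _, _) => b.indices.head
}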
@@ -262,6 +266,12 @@ abstract class DynamicPartitionPruningSuiteBase
case s: BatchScanExec => s.runtimeFilters.collect {
case d: DynamicPruningExpression => d.child
}
+ case s: CometScanExec => s.partitionFilters.collect {
+ case d: DynamicPruningExpression => d.child
+ }
+ case s: CometNativeScanExec => s.partitionFilters.collect {
+ case d: DynamicPruningExpression => d.child
+ }
case _ => Nil
}
}
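The collector gains Comet arms because CometScanExec and CometNativeScanExec expose partitionFilters just like FileSourceScanExec. A self-contained sketch of the resulting extraction (the helper name is illustrative):

import org.apache.spark.sql.catalyst.expressions.{DynamicPruningExpression, Expression}
import org.apache.spark.sql.comet.{CometNativeScanExec, CometScanExec}
import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan}

// Every scan flavor exposes partitionFilters, so the DPP subquery expressions
// can be pulled out of a physical plan in one pass over the tree.
def collectDppFilters(plan: SparkPlan): Seq[Expression] =
  plan.collect {
    case s: FileSourceScanExec => s.partitionFilters
    case s: CometScanExec => s.partitionFilters
    case s: CometNativeScanExec => s.partitionFilters
  }.flatten.collect { case DynamicPruningExpression(child) => child }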
@@ -1027,7 +1032,8 @@ abstract class DynamicPartitionPruningSuiteBase
@@ -1027,7 +1037,8 @@ abstract class DynamicPartitionPruningSuiteBase
}
}

@@ -448,7 +468,26 @@ index f33432ddb6f..7d758d2481f 100644
withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false",
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
withTable("large", "dimTwo", "dimThree") {
@@ -1215,7 +1221,8 @@ abstract class DynamicPartitionPruningSuiteBase
@@ -1204,10 +1215,16 @@ abstract class DynamicPartitionPruningSuiteBase

val plan = df.queryExecution.executedPlan
val countSubqueryBroadcasts =
- collectWithSubqueries(plan)({ case _: SubqueryBroadcastExec => 1 }).sum
+ collectWithSubqueries(plan)({
+ case _: SubqueryBroadcastExec => 1
+ case _: CometSubqueryBroadcastExec => 1
+ }).sum

val countReusedSubqueryBroadcasts =
- collectWithSubqueries(plan)({ case ReusedSubqueryExec(_: SubqueryBroadcastExec) => 1}).sum
+ collectWithSubqueries(plan)({
+ case ReusedSubqueryExec(_: SubqueryBroadcastExec) => 1
+ case ReusedSubqueryExec(_: CometSubqueryBroadcastExec) => 1
+ }).sum

assert(countSubqueryBroadcasts == 1)
assert(countReusedSubqueryBroadcasts == 1)
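Both counts must accept either operator: once Comet substitutes its own broadcast node, a match on SubqueryBroadcastExec alone drops to zero and the reuse assertion goes stale. A sketch usable inside this suite (which mixes in AdaptiveSparkPlanHelper), with the engine dependence factored out:

import org.apache.spark.sql.comet.CometSubqueryBroadcastExec
import org.apache.spark.sql.execution.{ReusedSubqueryExec, SparkPlan, SubqueryBroadcastExec}

// ReusedSubqueryExec wraps later occurrences of an identical subquery, so the
// reuse count matches on the wrapper and inspects its child.
def isSubqueryBroadcast(p: SparkPlan): Boolean = p match {
  case _: SubqueryBroadcastExec | _: CometSubqueryBroadcastExec => true
  case _ => false
}
val reusedCount = collectWithSubqueries(plan) {
  case r @ ReusedSubqueryExec(child) if isSubqueryBroadcast(child) => r
}.size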
@@ -1215,7 +1232,8 @@ abstract class DynamicPartitionPruningSuiteBase
}

test("SPARK-32509: Unused Dynamic Pruning filter shouldn't affect " +
@@ -458,7 +497,7 @@ index f33432ddb6f..7d758d2481f 100644
withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") {
withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
val df = sql(
@@ -1423,7 +1430,8 @@ abstract class DynamicPartitionPruningSuiteBase
@@ -1423,7 +1441,8 @@ abstract class DynamicPartitionPruningSuiteBase
}
}

@@ -468,7 +507,15 @@ index f33432ddb6f..7d758d2481f 100644
withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") {
val df = sql(
""" WITH v as (
@@ -1698,7 +1706,8 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat
@@ -1577,6 +1596,7 @@ abstract class DynamicPartitionPruningSuiteBase

val subqueryBroadcastExecs = collectWithSubqueries(df.queryExecution.executedPlan) {
case s: SubqueryBroadcastExec => s
+ case s: CometSubqueryBroadcastExec => s
}
assert(subqueryBroadcastExecs.size === 1)
subqueryBroadcastExecs.foreach { subqueryBroadcastExec =>
@@ -1698,7 +1718,8 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat
* Check the static scan metrics with and without DPP
*/
test("static scan metrics",
@@ -478,7 +525,7 @@ index f33432ddb6f..7d758d2481f 100644
withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true",
SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false",
SQLConf.EXCHANGE_REUSE_ENABLED.key -> "false") {
@@ -1729,6 +1738,8 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat
@@ -1729,6 +1750,8 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat
case s: BatchScanExec =>
// we use f1 col for v2 tables due to schema pruning
s.output.exists(_.exists(_.argString(maxFields = 100).contains("f1")))
@@ -933,7 +980,7 @@ index b5b34922694..a72403780c4 100644
protected val baseResourcePath = {
// use the same way as `SQLQueryTestSuite` to get the resource path
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 525d97e4998..843f0472c23 100644
index 525d97e4998..8a3e7457618 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -1508,7 +1508,8 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
@@ -1045,14 +1092,14 @@ index 18123a4d6ec..0fe185baa33 100644

test("non-matching optional group") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
index 75eabcb96f2..7c0bbd71551 100644
index 75eabcb96f2..7a681f147e4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
@@ -21,10 +21,11 @@ import scala.collection.mutable.ArrayBuffer

import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Join, LogicalPlan, Project, Sort, Union}
+import org.apache.spark.sql.comet.CometScanExec
+import org.apache.spark.sql.comet.{CometNativeColumnarToRowExec, CometNativeScanExec, CometScanExec}
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanHelper, DisableAdaptiveExecution}
import org.apache.spark.sql.execution.datasources.FileScanRDD
@@ -1061,7 +1108,7 @@ index 75eabcb96f2..7c0bbd71551 100644
import org.apache.spark.sql.execution.joins.{BaseJoinExec, BroadcastHashJoinExec, BroadcastNestedLoopJoinExec}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession
@@ -1543,6 +1544,12 @@ class SubquerySuite extends QueryTest
@@ -1543,6 +1544,17 @@ class SubquerySuite extends QueryTest
fs.inputRDDs().forall(
_.asInstanceOf[FileScanRDD].filePartitions.forall(
_.files.forall(_.urlEncodedPath.contains("p=0"))))
@@ -1070,11 +1117,16 @@ index 75eabcb96f2..7c0bbd71551 100644
+ partitionFilters.exists(ExecSubqueryExpression.hasSubquery) &&
+ fs.inputRDDs().forall(
+ _.asInstanceOf[FileScanRDD].filePartitions.forall(
+ _.files.forall(_.urlEncodedPath.contains("p=0"))))
+ case CometNativeColumnarToRowExec(fs: CometNativeScanExec) =>
+ fs.partitionFilters.exists(ExecSubqueryExpression.hasSubquery) &&
+ fs.inputRDDs().forall(
+ _.asInstanceOf[FileScanRDD].filePartitions.forall(
+ _.files.forall(_.urlEncodedPath.contains("p=0"))))
case _ => false
})
}
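CometNativeScanExec produces columnar output, so in this row-level check it appears behind a CometNativeColumnarToRowExec transition; the patch matches through the wrapper (an arity-1 pattern, as the patch itself uses) to reach the scan's partitionFilters. Condensed into a standalone predicate:

import org.apache.spark.sql.comet.{CometNativeColumnarToRowExec, CometNativeScanExec}
import org.apache.spark.sql.execution.{ExecSubqueryExpression, SparkPlan}

// Unwrap the columnar-to-row transition to test whether the native scan's
// partition filters carry a DPP subquery.
def nativeScanHasDppSubquery(plan: SparkPlan): Boolean = plan.exists {
  case CometNativeColumnarToRowExec(scan: CometNativeScanExec) =>
    scan.partitionFilters.exists(ExecSubqueryExpression.hasSubquery)
  case _ => false
}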
@@ -2108,7 +2115,7 @@ class SubquerySuite extends QueryTest
@@ -2108,7 +2120,7 @@ class SubquerySuite extends QueryTest

df.collect()
val exchanges = collect(df.queryExecution.executedPlan) {
@@ -1294,17 +1346,16 @@ index 4b3d3a4b805..56e1e0e6f16 100644

setupTestData()
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantProjectsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantProjectsSuite.scala
index 9e9d717db3b..ec73082f458 100644
index 9e9d717db3b..bb04190c3ca 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantProjectsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantProjectsSuite.scala
@@ -17,7 +17,10 @@

package org.apache.spark.sql.execution

-import org.apache.spark.sql.{DataFrame, QueryTest, Row}
+import org.apache.comet.CometConf
+
+import org.apache.spark.sql.{DataFrame, IgnoreComet, QueryTest, Row}
import org.apache.spark.sql.{DataFrame, QueryTest, Row}
+import org.apache.spark.sql.comet.CometProjectExec
import org.apache.spark.sql.connector.SimpleWritableDataSource
import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanHelper, DisableAdaptiveExecutionSuite, EnableAdaptiveExecutionSuite}
@@ -1321,17 +1372,7 @@ index 9e9d717db3b..ec73082f458 100644
assert(actual == expected)
}
}
@@ -112,7 +118,8 @@ abstract class RemoveRedundantProjectsSuiteBase
assertProjectExec(query, 1, 3)
}

- test("join with ordering requirement") {
+ test("join with ordering requirement",
+ IgnoreComet("TODO: Support SubqueryBroadcastExec in Comet: #242")) {
val query = "select * from (select key, a, c, b from testView) as t1 join " +
"(select key, a, b, c from testView) as t2 on t1.key = t2.key where t2.a > 50"
assertProjectExec(query, 2, 2)
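Note the direction of this hunk: it deletes a previously carried patch. Now that Comet plans the subquery broadcast itself, the IgnoreComet tag added for #242 is retired and the stock test body runs under Comet again. For context, a sketch of the tag's assumed shape in Comet's test fork (the exact definition is not part of this diff):

import org.scalatest.Tag

// Assumed shape, for illustration only: a ScalaTest tag that Comet's patched
// Spark suites use to skip a test while a feature is unsupported.
case class IgnoreComet(reason: String) extends Tag("IgnoreComet")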
@@ -134,12 +141,21 @@ abstract class RemoveRedundantProjectsSuiteBase
@@ -134,12 +140,21 @@ abstract class RemoveRedundantProjectsSuiteBase
val df = data.selectExpr("a", "b", "key", "explode(array(key, a, b)) as d").filter("d > 0")
df.collect()
val plan = df.queryExecution.executedPlan
@@ -1358,7 +1399,7 @@ index 9e9d717db3b..ec73082f458 100644
case g @ GenerateExec(_, requiredChildOutput, _, _, child) =>
g.copy(requiredChildOutput = requiredChildOutput.reverse,
child = ProjectExec(requiredChildOutput.reverse, child))
@@ -151,6 +167,7 @@ abstract class RemoveRedundantProjectsSuiteBase
@@ -151,6 +166,7 @@ abstract class RemoveRedundantProjectsSuiteBase
// The manually added ProjectExec node shouldn't be removed.
assert(collectWithSubqueries(newExecutedPlan) {
case p: ProjectExec => p
@@ -3056,7 +3097,7 @@ index 1510e8957f9..7618419d8ff 100644
implicit val formats = new DefaultFormats {
override def dateFormatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss")
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/DynamicPartitionPruningHiveScanSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/DynamicPartitionPruningHiveScanSuite.scala
index 52abd248f3a..7a199931a08 100644
index 52abd248f3a..b4e096cae24 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/DynamicPartitionPruningHiveScanSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/DynamicPartitionPruningHiveScanSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.hive
@@ -3067,12 +3108,15 @@ index 52abd248f3a..7a199931a08 100644
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.adaptive.{DisableAdaptiveExecutionSuite, EnableAdaptiveExecutionSuite}
import org.apache.spark.sql.hive.execution.HiveTableScanExec
@@ -35,6 +36,9 @@ abstract class DynamicPartitionPruningHiveScanSuiteBase
@@ -35,6 +36,12 @@ abstract class DynamicPartitionPruningHiveScanSuiteBase
case s: FileSourceScanExec => s.partitionFilters.collect {
case d: DynamicPruningExpression => d.child
}
+ case s: CometScanExec => s.partitionFilters.collect {
+ case d: DynamicPruningExpression => d.child
+ }
+ case s: CometNativeScanExec => s.partitionFilters.collect {
+ case d: DynamicPruningExpression => d.child
+ }
case h: HiveTableScanExec => h.partitionPruningPred.collect {
case d: DynamicPruningExpression => d.child