Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 55 additions & 31 deletions dev/diffs/4.1.1.diff
Original file line number Diff line number Diff line change
Expand Up @@ -535,46 +535,70 @@ index ed182322aec..1ae6afa686a 100644
spark.range(100).write.saveAsTable(s"$dbName.$table2Name")

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala
index 93ff7becaec..7b2871cc656 100644
index 93ff7becaec..87537a25b3b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql
import java.sql.{Date, Timestamp}
import java.util.Locale

+import org.apache.spark.sql.IgnoreComet
@@ -23,10 +23,11 @@ import java.util.Locale
import org.apache.spark.sql.catalyst.optimizer.RemoveNoopUnion
import org.apache.spark.sql.catalyst.plans.logical.Union
import org.apache.spark.sql.catalyst.plans.physical.UnknownPartitioning
@@ -1511,7 +1512,8 @@ class DataFrameSetOperationsSuite extends QueryTest
}
}
+import org.apache.spark.sql.comet.CometUnionExec
import org.apache.spark.sql.execution.{SparkPlan, UnionExec}
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
-import org.apache.spark.sql.execution.exchange.{ReusedExchangeExec, ShuffleExchangeExec}
+import org.apache.spark.sql.execution.exchange.{ReusedExchangeExec, ShuffleExchangeLike}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.{ExamplePoint, ExamplePointUDT, SharedSparkSession, SQLTestData}
@@ -1519,11 +1520,12 @@ class DataFrameSetOperationsSuite extends QueryTest
val union = df1.repartition($"a").union(df2.repartition($"a"))
val unionExec = union.queryExecution.executedPlan.collect {
case u: UnionExec => u
+ case u: CometUnionExec => u
}
assert(unionExec.size == 1)

- test("SPARK-52921: union partitioning - reused shuffle") {
+ test("SPARK-52921: union partitioning - reused shuffle",
+ IgnoreComet("https://github.com/apache/datafusion-comet/issues/4098")) {
withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
val df1 = Seq((1, 2, 4), (1, 3, 5), (2, 2, 3), (2, 4, 5)).toDF("a", "b", "c")
val df2 = Seq((1, 2, 4), (1, 3, 5), (2, 2, 3), (2, 4, 5)).toDF("a", "b", "c")
@@ -1538,7 +1540,8 @@ class DataFrameSetOperationsSuite extends QueryTest
}
}
val shuffle = df1.repartition($"a").queryExecution.executedPlan.collect {
- case s: ShuffleExchangeExec => s
+ case s: ShuffleExchangeLike => s
}
assert(shuffle.size == 1)

- test("SPARK-52921: union partitioning - semantic equality") {
+ test("SPARK-52921: union partitioning - semantic equality",
+ IgnoreComet("https://github.com/apache/datafusion-comet/issues/4098")) {
val df1 = Seq((1, 2, 4), (1, 3, 5), (2, 2, 3), (2, 4, 5)).toDF("a", "b", "c")
val df2 = Seq((4, 1, 5), (2, 4, 6), (1, 4, 2), (3, 5, 1)).toDF("d", "e", "f")
@@ -1554,11 +1556,12 @@ class DataFrameSetOperationsSuite extends QueryTest
val union = df1.repartition($"a").union(df2.repartition($"d"))
val unionExec = union.queryExecution.executedPlan.collect {
case u: UnionExec => u
+ case u: CometUnionExec => u
}
assert(unionExec.size == 1)

@@ -1589,7 +1592,8 @@ class DataFrameSetOperationsSuite extends QueryTest
}
}
val shuffle = df1.repartition($"a").queryExecution.executedPlan.collect {
- case s: ShuffleExchangeExec => s
+ case s: ShuffleExchangeLike => s
}
assert(shuffle.size == 1)

@@ -1573,10 +1576,10 @@ class DataFrameSetOperationsSuite extends QueryTest
// Avoid unnecessary shuffle if union output partitioning is enabled
val shuffledUnion = union.repartition($"a")
val shuffleNumBefore = union.queryExecution.executedPlan.collect {
- case s: ShuffleExchangeExec => s
+ case s: ShuffleExchangeLike => s
}
val shuffleNumAfter = shuffledUnion.queryExecution.executedPlan.collect {
- case s: ShuffleExchangeExec => s
+ case s: ShuffleExchangeLike => s
}

- test("SPARK-52921: union partitioning - range partitioning") {
+ test("SPARK-52921: union partitioning - range partitioning",
+ IgnoreComet("https://github.com/apache/datafusion-comet/issues/4098")) {
val df1 = Seq((1, 2, 4), (1, 3, 5), (2, 2, 3), (2, 4, 5)).toDF("a", "b", "c")
val df2 = Seq((4, 1, 5), (2, 4, 6), (1, 4, 2), (3, 5, 1)).toDF("d", "e", "f")
if (enabled) {
@@ -1605,6 +1608,7 @@ class DataFrameSetOperationsSuite extends QueryTest
val union = df1.repartitionByRange($"a").union(df2.repartitionByRange($"d"))
val unionExec = union.queryExecution.executedPlan.collect {
case u: UnionExec => u
+ case u: CometUnionExec => u
}
assert(unionExec.size == 1)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index 5b88eeefeca..d4f07bc182a 100644
Expand Down
Loading