diff --git a/dev/diffs/4.1.1.diff b/dev/diffs/4.1.1.diff index ac56840013..8f427b1107 100644 --- a/dev/diffs/4.1.1.diff +++ b/dev/diffs/4.1.1.diff @@ -535,46 +535,70 @@ index ed182322aec..1ae6afa686a 100644 spark.range(100).write.saveAsTable(s"$dbName.$table2Name") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala -index 93ff7becaec..7b2871cc656 100644 +index 93ff7becaec..87537a25b3b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala -@@ -20,6 +20,7 @@ package org.apache.spark.sql - import java.sql.{Date, Timestamp} - import java.util.Locale - -+import org.apache.spark.sql.IgnoreComet +@@ -23,10 +23,11 @@ import java.util.Locale import org.apache.spark.sql.catalyst.optimizer.RemoveNoopUnion import org.apache.spark.sql.catalyst.plans.logical.Union import org.apache.spark.sql.catalyst.plans.physical.UnknownPartitioning -@@ -1511,7 +1512,8 @@ class DataFrameSetOperationsSuite extends QueryTest - } - } ++import org.apache.spark.sql.comet.CometUnionExec + import org.apache.spark.sql.execution.{SparkPlan, UnionExec} + import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper + import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec +-import org.apache.spark.sql.execution.exchange.{ReusedExchangeExec, ShuffleExchangeExec} ++import org.apache.spark.sql.execution.exchange.{ReusedExchangeExec, ShuffleExchangeLike} + import org.apache.spark.sql.functions._ + import org.apache.spark.sql.internal.SQLConf + import org.apache.spark.sql.test.{ExamplePoint, ExamplePointUDT, SharedSparkSession, SQLTestData} +@@ -1519,11 +1520,12 @@ class DataFrameSetOperationsSuite extends QueryTest + val union = df1.repartition($"a").union(df2.repartition($"a")) + val unionExec = union.queryExecution.executedPlan.collect { + case u: UnionExec => u ++ case u: CometUnionExec => u + } + assert(unionExec.size == 1) -- test("SPARK-52921: union partitioning - reused shuffle") { -+ test("SPARK-52921: union partitioning - reused shuffle", -+ IgnoreComet("https://github.com/apache/datafusion-comet/issues/4098")) { - withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") { - val df1 = Seq((1, 2, 4), (1, 3, 5), (2, 2, 3), (2, 4, 5)).toDF("a", "b", "c") - val df2 = Seq((1, 2, 4), (1, 3, 5), (2, 2, 3), (2, 4, 5)).toDF("a", "b", "c") -@@ -1538,7 +1540,8 @@ class DataFrameSetOperationsSuite extends QueryTest - } - } + val shuffle = df1.repartition($"a").queryExecution.executedPlan.collect { +- case s: ShuffleExchangeExec => s ++ case s: ShuffleExchangeLike => s + } + assert(shuffle.size == 1) -- test("SPARK-52921: union partitioning - semantic equality") { -+ test("SPARK-52921: union partitioning - semantic equality", -+ IgnoreComet("https://github.com/apache/datafusion-comet/issues/4098")) { - val df1 = Seq((1, 2, 4), (1, 3, 5), (2, 2, 3), (2, 4, 5)).toDF("a", "b", "c") - val df2 = Seq((4, 1, 5), (2, 4, 6), (1, 4, 2), (3, 5, 1)).toDF("d", "e", "f") +@@ -1554,11 +1556,12 @@ class DataFrameSetOperationsSuite extends QueryTest + val union = df1.repartition($"a").union(df2.repartition($"d")) + val unionExec = union.queryExecution.executedPlan.collect { + case u: UnionExec => u ++ case u: CometUnionExec => u + } + assert(unionExec.size == 1) -@@ -1589,7 +1592,8 @@ class DataFrameSetOperationsSuite extends QueryTest - } - } + val shuffle = df1.repartition($"a").queryExecution.executedPlan.collect { +- case s: ShuffleExchangeExec => s ++ case s: ShuffleExchangeLike => s + } + assert(shuffle.size == 1) + +@@ -1573,10 +1576,10 @@ class DataFrameSetOperationsSuite extends QueryTest + // Avoid unnecessary shuffle if union output partitioning is enabled + val shuffledUnion = union.repartition($"a") + val shuffleNumBefore = union.queryExecution.executedPlan.collect { +- case s: ShuffleExchangeExec => s ++ case s: ShuffleExchangeLike => s + } + val shuffleNumAfter = shuffledUnion.queryExecution.executedPlan.collect { +- case s: ShuffleExchangeExec => s ++ case s: ShuffleExchangeLike => s + } -- test("SPARK-52921: union partitioning - range partitioning") { -+ test("SPARK-52921: union partitioning - range partitioning", -+ IgnoreComet("https://github.com/apache/datafusion-comet/issues/4098")) { - val df1 = Seq((1, 2, 4), (1, 3, 5), (2, 2, 3), (2, 4, 5)).toDF("a", "b", "c") - val df2 = Seq((4, 1, 5), (2, 4, 6), (1, 4, 2), (3, 5, 1)).toDF("d", "e", "f") + if (enabled) { +@@ -1605,6 +1608,7 @@ class DataFrameSetOperationsSuite extends QueryTest + val union = df1.repartitionByRange($"a").union(df2.repartitionByRange($"d")) + val unionExec = union.queryExecution.executedPlan.collect { + case u: UnionExec => u ++ case u: CometUnionExec => u + } + assert(unionExec.size == 1) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index 5b88eeefeca..d4f07bc182a 100644