
Commit bf3cf9b

fix: enable Spark 4 SQL tests previously ignored for issues #3313 and #3314 (#4092)
1 parent fdf00d4 · commit bf3cf9b

2 files changed: 22 additions & 56 deletions
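
A note on the mechanism: the edits to dev/diffs/4.0.1.diff below drop the IgnoreCometNativeDataFusion tags that had excluded these suites when Comet runs Spark's SQL tests with native DataFusion scans. A hedged sketch of how such a tag gates a test (the tag definition and the runner flag are illustrative assumptions; the real class lives in Comet's patched Spark test tree):

```scala
import org.scalatest.Tag
import org.scalatest.funsuite.AnyFunSuite

// Assumed shape: a ScalaTest Tag whose constructor argument records the
// tracking-issue URL, making the exclusion self-documenting in suite source.
case class IgnoreCometNativeDataFusion(reason: String)
  extends Tag("org.apache.comet.IgnoreCometNativeDataFusion")

class ExampleSuite extends AnyFunSuite {
  // Before this commit, the patched suites tagged tests like this; a runner
  // invoked with `-l org.apache.comet.IgnoreCometNativeDataFusion` skips
  // them. Deleting the tag re-enables the test in every configuration.
  test("previously ignored test",
      IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3313")) {
    assert(1 + 1 == 2)
  }
}
```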


dev/diffs/4.0.1.diff

Lines changed: 19 additions & 55 deletions
@@ -535,7 +535,7 @@ index 81713c777bc..b5f92ed9742 100644
      assert(exchanges.size == 2)
    }
 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
-index 2c24cc7d570..5a1fe7017c3 100644
+index 2c24cc7d570..8c214e7d05c 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
 +++ b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
 @@ -22,6 +22,7 @@ import org.scalatest.GivenWhenThen
@@ -605,15 +605,7 @@ index 2c24cc7d570..5a1fe7017c3 100644
      withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") {
        withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
          val df = sql(
-@@ -1330,6 +1347,7 @@ abstract class DynamicPartitionPruningSuiteBase
-   }
- 
-   test("Subquery reuse across the whole plan",
-+    IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3313"),
-     DisableAdaptiveExecution("DPP in AQE must reuse broadcast")) {
-     withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true",
-       SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false",
-@@ -1424,7 +1442,8 @@ abstract class DynamicPartitionPruningSuiteBase
+@@ -1424,7 +1441,8 @@ abstract class DynamicPartitionPruningSuiteBase
        }
      }
 
@@ -623,15 +615,15 @@ index 2c24cc7d570..5a1fe7017c3 100644
      withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") {
        val df = sql(
          """ WITH v as (
-@@ -1578,6 +1597,7 @@ abstract class DynamicPartitionPruningSuiteBase
+@@ -1578,6 +1596,7 @@ abstract class DynamicPartitionPruningSuiteBase
 
        val subqueryBroadcastExecs = collectWithSubqueries(df.queryExecution.executedPlan) {
          case s: SubqueryBroadcastExec => s
 +       case s: CometSubqueryBroadcastExec => s
        }
        assert(subqueryBroadcastExecs.size === 1)
        subqueryBroadcastExecs.foreach { subqueryBroadcastExec =>
-@@ -1730,6 +1750,10 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat
+@@ -1730,6 +1749,10 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat
        case s: BatchScanExec =>
          // we use f1 col for v2 tables due to schema pruning
          s.output.exists(_.exists(_.argString(maxFields = 100).contains("f1")))
@@ -668,7 +660,7 @@ index 9c90e0105a4..fadf2f0f698 100644
 
    test("SPARK-35884: Explain Formatted") {
 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
-index 9c529d14221..a046f1ed1ca 100644
+index 9c529d14221..ab2850b5d68 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
 +++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
 @@ -33,6 +33,8 @@ import org.apache.spark.sql.catalyst.expressions.{AttributeReference, GreaterTha
@@ -680,20 +672,16 @@ index 9c529d14221..a046f1ed1ca 100644
  import org.apache.spark.sql.execution.{FileSourceScanLike, SimpleMode}
  import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
  import org.apache.spark.sql.execution.datasources.FilePartition
-@@ -203,7 +205,11 @@ class FileBasedDataSourceSuite extends QueryTest
+@@ -203,7 +205,7 @@ class FileBasedDataSourceSuite extends QueryTest
    }
 
    allFileBasedDataSources.foreach { format =>
 -    testQuietly(s"Enabling/disabling ignoreMissingFiles using $format") {
-+    val ignoreMissingTags: Seq[org.scalatest.Tag] = if (format == "parquet") {
-+      Seq(IgnoreCometNativeDataFusion(
-+        "https://github.com/apache/datafusion-comet/issues/3314"))
-+    } else Seq.empty
-+    test(s"Enabling/disabling ignoreMissingFiles using $format", ignoreMissingTags: _*) { quietly {
++    test(s"Enabling/disabling ignoreMissingFiles using $format") { quietly {
       def testIgnoreMissingFiles(options: Map[String, String]): Unit = {
         withTempDir { dir =>
           val basePath = dir.getCanonicalPath
-@@ -263,7 +269,7 @@ class FileBasedDataSourceSuite extends QueryTest
+@@ -263,7 +265,7 @@ class FileBasedDataSourceSuite extends QueryTest
        }
      }
    }
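
The hunk above keeps the earlier rewrite of testQuietly into test plus a quietly block even though the conditional parquet tag is now gone, since testQuietly accepts no ScalaTest tags. A minimal stand-alone sketch of that pattern (the quietly stand-in below is an assumption; Spark's real helper suppresses log output around the body):

```scala
import org.scalatest.funsuite.AnyFunSuite

class QuietExampleSuite extends AnyFunSuite {
  // Stand-in for Spark's quietly helper, which runs the body with logging
  // silenced; defined here only so the sketch compiles on its own.
  private def quietly[T](body: => T): T = body

  Seq("parquet", "orc").foreach { format =>
    // test(...) can take tags as extra arguments, unlike testQuietly(...).
    test(s"Enabling/disabling ignoreMissingFiles using $format") {
      quietly {
        assert(format.nonEmpty) // placeholder for the real test body
      }
    }
  }
}
```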
@@ -702,7 +690,7 @@ index 9c529d14221..a046f1ed1ca 100644
    }
 
    Seq("json", "orc").foreach { format =>
-@@ -668,18 +674,25 @@ class FileBasedDataSourceSuite extends QueryTest
+@@ -668,18 +670,25 @@ class FileBasedDataSourceSuite extends QueryTest
        checkAnswer(sql(s"select A from $tableName"), data.select("A"))
 
        // RuntimeException is triggered at executor side, which is then wrapped as
@@ -735,31 +723,31 @@ index 9c529d14221..a046f1ed1ca 100644
          condition = "_LEGACY_ERROR_TEMP_2093",
          parameters = Map("requiredFieldName" -> "b", "matchedOrcFields" -> "[b, B]")
        )
-@@ -967,6 +980,7 @@ class FileBasedDataSourceSuite extends QueryTest
+@@ -967,6 +976,7 @@ class FileBasedDataSourceSuite extends QueryTest
       assert(bJoinExec.isEmpty)
       val smJoinExec = collect(joinedDF.queryExecution.executedPlan) {
         case smJoin: SortMergeJoinExec => smJoin
 +      case smJoin: CometSortMergeJoinExec => smJoin
       }
       assert(smJoinExec.nonEmpty)
     }
-@@ -1027,6 +1041,7 @@ class FileBasedDataSourceSuite extends QueryTest
+@@ -1027,6 +1037,7 @@ class FileBasedDataSourceSuite extends QueryTest
 
       val fileScan = df.queryExecution.executedPlan collectFirst {
         case BatchScanExec(_, f: FileScan, _, _, _, _) => f
 +      case CometBatchScanExec(BatchScanExec(_, f: FileScan, _, _, _, _), _, _) => f
       }
       assert(fileScan.nonEmpty)
       assert(fileScan.get.partitionFilters.nonEmpty)
-@@ -1068,6 +1083,7 @@ class FileBasedDataSourceSuite extends QueryTest
+@@ -1068,6 +1079,7 @@ class FileBasedDataSourceSuite extends QueryTest
 
       val fileScan = df.queryExecution.executedPlan collectFirst {
         case BatchScanExec(_, f: FileScan, _, _, _, _) => f
 +      case CometBatchScanExec(BatchScanExec(_, f: FileScan, _, _, _, _), _, _) => f
       }
       assert(fileScan.nonEmpty)
       assert(fileScan.get.partitionFilters.isEmpty)
-@@ -1252,6 +1268,9 @@ class FileBasedDataSourceSuite extends QueryTest
+@@ -1252,6 +1264,9 @@ class FileBasedDataSourceSuite extends QueryTest
       val filters = df.queryExecution.executedPlan.collect {
         case f: FileSourceScanLike => f.dataFilters
         case b: BatchScanExec => b.scan.asInstanceOf[FileScan].dataFilters
@@ -1802,20 +1790,6 @@ index 47679ed7865..9ffbaecb98e 100644
       }.length == hashAggCount)
       assert(collectWithSubqueries(plan) { case s: SortAggregateExec => s }.length == sortAggCount)
     }
-diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala
-index 77a988f340e..263208a67d9 100644
---- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala
-+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala
-@@ -1061,7 +1061,8 @@ abstract class SQLViewSuite extends QueryTest with SQLTestUtils {
-     }
-   }
- 
--  test("alter temporary view should follow current storeAnalyzedPlanForView config") {
-+  test("alter temporary view should follow current storeAnalyzedPlanForView config",
-+    IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3314")) {
-     withTable("t") {
-       Seq(2, 3, 1).toDF("c1").write.format("parquet").saveAsTable("t")
-       withView("v1") {
 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala
 index aed11badb71..1a365b5aacf 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala
@@ -2799,7 +2773,7 @@ index 4474ec1fd42..05fa0257c82 100644
      checkAnswer(
        // "fruit" column in this file is encoded using DELTA_LENGTH_BYTE_ARRAY.
 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
-index bba71f1c48d..faee9b4ce83 100644
+index bba71f1c48d..35247c13ad9 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
 +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
 @@ -27,6 +27,7 @@ import org.apache.parquet.hadoop.ParquetOutputFormat
@@ -2820,17 +2794,7 @@ index bba71f1c48d..faee9b4ce83 100644
      val providedSchema = StructType(Seq(StructField("time", TimestampNTZType, false)))
 
      Seq("INT96", "TIMESTAMP_MICROS", "TIMESTAMP_MILLIS").foreach { tsType =>
-@@ -318,7 +320,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS
-     }
-   }
- 
--  test("Enabling/disabling ignoreCorruptFiles") {
-+  test("Enabling/disabling ignoreCorruptFiles",
-+    IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3314")) {
-     def testIgnoreCorruptFiles(options: Map[String, String]): Unit = {
-       withTempDir { dir =>
-         val basePath = dir.getCanonicalPath
-@@ -996,7 +999,11 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS
+@@ -996,7 +998,11 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS
      Seq(Some("A"), Some("A"), None).toDF().repartition(1)
        .write.parquet(path.getAbsolutePath)
      val df = spark.read.parquet(path.getAbsolutePath)
@@ -2843,7 +2807,7 @@ index bba71f1c48d..faee9b4ce83 100644
        }
      }
    }
-@@ -1042,7 +1049,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS
+@@ -1042,7 +1048,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS
      testMigration(fromTsType = "TIMESTAMP_MICROS", toTsType = "INT96")
    }
 
@@ -2853,7 +2817,7 @@ index bba71f1c48d..faee9b4ce83 100644
      def readParquet(schema: String, path: File): DataFrame = {
        spark.read.schema(schema).parquet(path.toString)
      }
-@@ -1060,7 +1068,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS
+@@ -1060,7 +1067,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS
        checkAnswer(readParquet(schema2, path), df)
      }
 
@@ -2863,7 +2827,7 @@ index bba71f1c48d..faee9b4ce83 100644
      val schema1 = "a DECIMAL(3, 2), b DECIMAL(18, 3), c DECIMAL(37, 3)"
      checkAnswer(readParquet(schema1, path), df)
      val schema2 = "a DECIMAL(3, 0), b DECIMAL(18, 1), c DECIMAL(37, 1)"
-@@ -1084,7 +1093,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS
+@@ -1084,7 +1092,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS
      val df = sql(s"SELECT 1 a, 123456 b, ${Int.MaxValue.toLong * 10} c, CAST('1.2' AS BINARY) d")
      df.write.parquet(path.toString)
 
@@ -2873,7 +2837,7 @@ index bba71f1c48d..faee9b4ce83 100644
      checkAnswer(readParquet("a DECIMAL(3, 2)", path), sql("SELECT 1.00"))
      checkAnswer(readParquet("a DECIMAL(11, 2)", path), sql("SELECT 1.00"))
      checkAnswer(readParquet("b DECIMAL(3, 2)", path), Row(null))
-@@ -1131,7 +1141,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS
+@@ -1131,7 +1140,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS
      }
    }
 
spark/src/main/scala/org/apache/comet/CometExecIterator.scala

Lines changed: 3 additions & 1 deletion
@@ -180,9 +180,11 @@ class CometExecIterator(
         case parquetError() =>
           // See org.apache.spark.sql.errors.QueryExecutionErrors.failedToReadDataError
           // See org.apache.parquet.hadoop.ParquetFileReader for error message.
+          // _LEGACY_ERROR_TEMP_2254 has no message placeholders; Spark 4 strict-checks
+          // parameters and raises INTERNAL_ERROR if any are passed.
           throw new SparkException(
             errorClass = "_LEGACY_ERROR_TEMP_2254",
-            messageParameters = Map("message" -> e.getMessage),
+            messageParameters = Map.empty,
             cause = new SparkException("File is not a Parquet file.", e))
         case _ =>
           throw e
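
The comment added in this hunk carries the reasoning for the fix: _LEGACY_ERROR_TEMP_2254 is a parameterless error class, and Spark 4 validates message parameters against the template, so the old Map("message" -> e.getMessage) surfaced as INTERNAL_ERROR instead of the intended Parquet error. A minimal sketch of the corrected construction, using the same SparkException(errorClass, messageParameters, cause) constructor that appears in the diff:

```scala
import org.apache.spark.SparkException

// Wrap a low-level read failure the way CometExecIterator does after this fix.
// Passing any entry in messageParameters would trip Spark 4's strict check,
// because the _LEGACY_ERROR_TEMP_2254 template contains no placeholders.
def wrapNotAParquetFile(e: Throwable): SparkException =
  new SparkException(
    errorClass = "_LEGACY_ERROR_TEMP_2254",
    messageParameters = Map.empty,
    cause = new SparkException("File is not a Parquet file.", e))
```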
