Skip to content

Commit 370d98d

Browse files
committed
enable_spark_tests_comet_native_writer_fix_spark_rebase_main
1 parent dd05d3e commit 370d98d

4 files changed

Lines changed: 39 additions & 757 deletions

File tree

.github/workflows/spark_sql_test.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,7 @@ jobs:
155155
run: |
156156
cd apache-spark
157157
rm -rf /root/.m2/repository/org/apache/parquet # somehow parquet cache requires cleanups
158-
NOLINT_ON_COMPILE=true ENABLE_COMET=true ENABLE_COMET_ONHEAP=true ENABLE_COMET_WRITER=true COMET_PARQUET_SCAN_IMPL=${{ matrix.config.scan-impl }} ENABLE_COMET_LOG_FALLBACK_REASONS=${{ github.event.inputs.collect-fallback-logs || 'false' }} \
158+
NOLINT_ON_COMPILE=true ENABLE_COMET=true ENABLE_COMET_ONHEAP=true COMET_PARQUET_SCAN_IMPL=${{ matrix.config.scan-impl }} ENABLE_COMET_LOG_FALLBACK_REASONS=${{ github.event.inputs.collect-fallback-logs || 'false' }} \
159159
build/sbt -Dsbt.log.noformat=true ${{ matrix.module.args1 }} "${{ matrix.module.args2 }}"
160160
if [ "${{ github.event.inputs.collect-fallback-logs }}" = "true" ]; then
161161
find . -type f -name "unit-tests.log" -print0 | xargs -0 grep -h "Comet cannot accelerate" | sed 's/.*Comet cannot accelerate/Comet cannot accelerate/' | sort -u > fallback.log

dev/diffs/3.4.3.diff

Lines changed: 6 additions & 196 deletions
Original file line numberDiff line numberDiff line change
@@ -133,17 +133,6 @@ index db587dd9868..aac7295a53d 100644
133133
case _ => Map[String, String]()
134134
}
135135
new SparkPlanInfo(
136-
diff --git a/sql/core/src/test/resources/sql-tests/inputs/charvarchar.sql b/sql/core/src/test/resources/sql-tests/inputs/charvarchar.sql
137-
index b62cbf64323..8d1f0cb7d20 100644
138-
--- a/sql/core/src/test/resources/sql-tests/inputs/charvarchar.sql
139-
+++ b/sql/core/src/test/resources/sql-tests/inputs/charvarchar.sql
140-
@@ -1,3 +1,6 @@
141-
+-- TODO: support empty table write / CTAS in native parquet writer
142-
+--SET spark.comet.parquet.write.enabled = false
143-
+
144-
create table char_tbl(c char(5), v varchar(6)) using parquet;
145-
desc formatted char_tbl;
146-
desc formatted char_tbl c;
147136
diff --git a/sql/core/src/test/resources/sql-tests/inputs/explain-aqe.sql b/sql/core/src/test/resources/sql-tests/inputs/explain-aqe.sql
148137
index 7aef901da4f..f3d6e18926d 100644
149138
--- a/sql/core/src/test/resources/sql-tests/inputs/explain-aqe.sql
@@ -1883,62 +1872,6 @@ index 593bd7bb4ba..32af28b0238 100644
18831872
}
18841873
assert(shuffles2.size == 4)
18851874
val smj2 = findTopLevelSortMergeJoin(adaptive2)
1886-
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/CharVarcharDDLTestBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/CharVarcharDDLTestBase.scala
1887-
index f77b6336b81..b703603d26b 100644
1888-
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/CharVarcharDDLTestBase.scala
1889-
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/CharVarcharDDLTestBase.scala
1890-
@@ -18,7 +18,7 @@
1891-
package org.apache.spark.sql.execution.command
1892-
1893-
import org.apache.spark.SparkConf
1894-
-import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
1895-
+import org.apache.spark.sql.{AnalysisException, IgnoreComet, QueryTest, Row}
1896-
import org.apache.spark.sql.catalyst.util.CharVarcharUtils
1897-
import org.apache.spark.sql.connector.catalog.InMemoryPartitionTableCatalog
1898-
import org.apache.spark.sql.internal.SQLConf
1899-
@@ -112,7 +112,8 @@ trait CharVarcharDDLTestBase extends QueryTest with SQLTestUtils {
1900-
}
1901-
}
1902-
1903-
- test("SPARK-33901: ctas should should not change table's schema") {
1904-
+ test("SPARK-33901: ctas should should not change table's schema",
1905-
+ IgnoreComet("https://github.com/apache/datafusion-comet/issues/3418")) {
1906-
withTable("t1", "t2") {
1907-
sql(s"CREATE TABLE t1(i CHAR(5), c VARCHAR(4)) USING $format")
1908-
sql(s"CREATE TABLE t2 USING $format AS SELECT * FROM t1")
1909-
@@ -129,7 +130,8 @@ trait CharVarcharDDLTestBase extends QueryTest with SQLTestUtils {
1910-
}
1911-
}
1912-
1913-
- test("SPARK-37160: CREATE TABLE AS SELECT with CHAR_AS_VARCHAR") {
1914-
+ test("SPARK-37160: CREATE TABLE AS SELECT with CHAR_AS_VARCHAR",
1915-
+ IgnoreComet("https://github.com/apache/datafusion-comet/issues/3419")) {
1916-
withTable("t1", "t2") {
1917-
sql(s"CREATE TABLE t1(col CHAR(5)) USING $format")
1918-
checkTableSchemaTypeStr("t1", Seq(Row("char(5)")))
1919-
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileFormatWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileFormatWriterSuite.scala
1920-
index 343b59a311e..9d5789c1d91 100644
1921-
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileFormatWriterSuite.scala
1922-
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileFormatWriterSuite.scala
1923-
@@ -17,7 +17,7 @@
1924-
1925-
package org.apache.spark.sql.execution.datasources
1926-
1927-
-import org.apache.spark.sql.{QueryTest, Row}
1928-
+import org.apache.spark.sql.{IgnoreComet, QueryTest, Row}
1929-
import org.apache.spark.sql.catalyst.plans.CodegenInterpretedPlanTest
1930-
import org.apache.spark.sql.test.SharedSparkSession
1931-
1932-
@@ -28,7 +28,8 @@ class FileFormatWriterSuite
1933-
1934-
import testImplicits._
1935-
1936-
- test("empty file should be skipped while write to file") {
1937-
+ test("empty file should be skipped while write to file",
1938-
+ IgnoreComet("https://github.com/apache/datafusion-comet/issues/3417")) {
1939-
withTempPath { path =>
1940-
spark.range(100).repartition(10).where("id = 50").write.parquet(path.toString)
1941-
val partFiles = path.listFiles()
19421875
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaPruningSuite.scala
19431876
index bd9c79e5b96..2ada8c28842 100644
19441877
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaPruningSuite.scala
@@ -2159,7 +2092,7 @@ index 104b4e416cd..37ea65081e4 100644
21592092
case _ =>
21602093
throw new AnalysisException("Can not match ParquetTable in the query.")
21612094
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
2162-
index 8670d95c65e..3fe49802309 100644
2095+
index 8670d95c65e..b624c3811dd 100644
21632096
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
21642097
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
21652098
@@ -1335,7 +1335,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
@@ -2172,41 +2105,6 @@ index 8670d95c65e..3fe49802309 100644
21722105
withAllParquetReaders {
21732106
checkAnswer(
21742107
// "fruit" column in this file is encoded using DELTA_LENGTH_BYTE_ARRAY.
2175-
@@ -1541,7 +1542,9 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
2176-
}
2177-
}
2178-
2179-
- test("Write Spark version into Parquet metadata") {
2180-
+// TODO : Comet native writer to add spark / comet version into parquet metadata
2181-
+ test("Write Spark version into Parquet metadata",
2182-
+ IgnoreComet("https://github.com/apache/datafusion-comet/issues/3427")) {
2183-
withTempPath { dir =>
2184-
spark.range(1).repartition(1).write.parquet(dir.getAbsolutePath)
2185-
assert(getMetaData(dir)(SPARK_VERSION_METADATA_KEY) === SPARK_VERSION_SHORT)
2186-
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala
2187-
index 8b386e8f689..28ced6209e0 100644
2188-
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala
2189-
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala
2190-
@@ -25,7 +25,7 @@ import org.apache.hadoop.fs.{Path, PathFilter}
2191-
import org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER
2192-
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
2193-
2194-
-import org.apache.spark.sql.Row
2195-
+import org.apache.spark.sql.{IgnoreComet, Row}
2196-
import org.apache.spark.sql.catalyst.util.DateTimeUtils
2197-
import org.apache.spark.sql.internal.SQLConf
2198-
import org.apache.spark.sql.test.SharedSparkSession
2199-
@@ -153,7 +153,9 @@ class ParquetInteroperabilitySuite extends ParquetCompatibilityTest with SharedS
2200-
}
2201-
}
2202-
2203-
- test("parquet timestamp conversion") {
2204-
+ // TODO : Support legacy timestamps conversion /cast in comet native writer
2205-
+ test("parquet timestamp conversion",
2206-
+ IgnoreComet("https://github.com/apache/datafusion-comet/issues/3425")) {
2207-
// Make a table with one parquet file written by impala, and one parquet file written by spark.
2208-
// We should only adjust the timestamps in the impala file, and only if the conf is set
2209-
val impalaFile = "test-data/impala_timestamp.parq"
22102108
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
22112109
index 29cb224c878..44837aa953b 100644
22122110
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
@@ -2735,60 +2633,6 @@ index 1f55742cd67..f20129d9dd8 100644
27352633
assert(bucketedScan.length == expectedNumBucketedScan)
27362634
}
27372635

2738-
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
2739-
index 2207661478d..dc4e4b4240c 100644
2740-
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
2741-
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
2742-
@@ -237,7 +237,8 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
2743-
}
2744-
}
2745-
2746-
- test("INSERT INTO TABLE - complex type but different names") {
2747-
+ test("INSERT INTO TABLE - complex type but different names",
2748-
+ IgnoreComet("https://github.com/apache/datafusion-comet/issues/3426")) {
2749-
val tab1 = "tab1"
2750-
val tab2 = "tab2"
2751-
withTable(tab1, tab2) {
2752-
@@ -889,7 +890,8 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
2753-
assert(message.contains("target table has 2 column(s) but the inserted data has 1 column(s)"))
2754-
}
2755-
2756-
- test("SPARK-38336 INSERT INTO statements with tables with default columns: positive tests") {
2757-
+ test("SPARK-38336 INSERT INTO statements with tables with default columns: positive tests",
2758-
+ IgnoreComet("https://github.com/apache/datafusion-comet/issues/3421")) {
2759-
// When the USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES configuration is enabled, and no
2760-
// explicit DEFAULT value is available when the INSERT INTO statement provides fewer
2761-
// values than expected, NULL values are appended in their place.
2762-
@@ -1286,7 +1288,8 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
2763-
}
2764-
}
2765-
2766-
- test("SPARK-38811 INSERT INTO on columns added with ALTER TABLE ADD COLUMNS: Positive tests") {
2767-
+ test("SPARK-38811 INSERT INTO on columns added with ALTER TABLE ADD COLUMNS: Positive tests",
2768-
+ IgnoreComet("https://github.com/apache/datafusion-comet/issues/3422")) {
2769-
// There is a complex expression in the default value.
2770-
val createTableBooleanCol = "create table t(i boolean) using parquet"
2771-
val createTableIntCol = "create table t(i int) using parquet"
2772-
@@ -1984,7 +1987,8 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
2773-
}
2774-
}
2775-
2776-
- test("SPARK-43071: INSERT INTO from queries whose final operators are not projections") {
2777-
+ test("SPARK-43071: INSERT INTO from queries whose final operators are not projections",
2778-
+ IgnoreComet("https://github.com/apache/datafusion-comet/issues/3423")) {
2779-
def runTest(insert: String, expected: Seq[Row]): Unit = {
2780-
withTable("t1", "t2") {
2781-
sql("create table t1(i boolean, s bigint default 42) using parquet")
2782-
@@ -2052,7 +2056,8 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
2783-
}
2784-
}
2785-
2786-
- test("SPARK-29174 Support LOCAL in INSERT OVERWRITE DIRECTORY to data source") {
2787-
+ test("SPARK-29174 Support LOCAL in INSERT OVERWRITE DIRECTORY to data source",
2788-
+ IgnoreComet("https://github.com/apache/datafusion-comet/issues/3420")) {
2789-
withTempPath { dir =>
2790-
val path = dir.toURI.getPath
2791-
sql(s"""create table tab1 ( a int) using parquet location '$path'""")
27922636
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
27932637
index 75f440caefc..36b1146bc3a 100644
27942638
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
@@ -2959,32 +2803,8 @@ index abe606ad9c1..2d930b64cca 100644
29592803
val tblSourceName = "tbl_src"
29602804
val tblTargetName = "tbl_target"
29612805
val tblSourceQualified = s"default.$tblSourceName"
2962-
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
2963-
index 44c9fbadfac..5f98bb9be17 100644
2964-
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
2965-
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
2966-
@@ -519,7 +519,8 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSparkSession with
2967-
Option(dir).map(spark.read.format("org.apache.spark.sql.test").load)
2968-
}
2969-
2970-
- test("write path implements onTaskCommit API correctly") {
2971-
+ test("write path implements onTaskCommit API correctly",
2972-
+ IgnoreComet("https://github.com/apache/datafusion-comet/issues/3428")) {
2973-
withSQLConf(
2974-
SQLConf.FILE_COMMIT_PROTOCOL_CLASS.key ->
2975-
classOf[MessageCapturingCommitProtocol].getCanonicalName) {
2976-
@@ -1069,7 +1070,8 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSparkSession with
2977-
}
2978-
}
2979-
2980-
- test("Insert overwrite table command should output correct schema: basic") {
2981-
+ test("Insert overwrite table command should output correct schema: basic",
2982-
+ IgnoreComet("https://github.com/apache/datafusion-comet/issues/3424")) {
2983-
withTable("tbl", "tbl2") {
2984-
withView("view1") {
2985-
val df = spark.range(10).toDF("id")
29862806
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
2987-
index dd55fcfe42c..e898fc33bab 100644
2807+
index dd55fcfe42c..a1d390c93d0 100644
29882808
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
29892809
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
29902810
@@ -27,6 +27,7 @@ import scala.concurrent.duration._
@@ -3042,7 +2862,7 @@ index dd55fcfe42c..e898fc33bab 100644
30422862
}
30432863
}
30442864

3045-
@@ -242,6 +265,34 @@ private[sql] trait SQLTestUtilsBase
2865+
@@ -242,6 +265,29 @@ private[sql] trait SQLTestUtilsBase
30462866
protected override def _sqlContext: SQLContext = self.spark.sqlContext
30472867
}
30482868

@@ -3068,16 +2888,11 @@ index dd55fcfe42c..e898fc33bab 100644
30682888
+ val v = System.getenv("ENABLE_COMET_SCAN_ONLY")
30692889
+ v != null && v.toBoolean
30702890
+ }
3071-
+
3072-
+ protected def isCometWriterEnabled: Boolean = {
3073-
+ val v = System.getenv("ENABLE_COMET_WRITER")
3074-
+ v != null && v.toBoolean
3075-
+ }
30762891
+
30772892
protected override def withSQLConf(pairs: (String, String)*)(f: => Unit): Unit = {
30782893
SparkSession.setActiveSession(spark)
30792894
super.withSQLConf(pairs: _*)(f)
3080-
@@ -434,6 +485,8 @@ private[sql] trait SQLTestUtilsBase
2895+
@@ -434,6 +480,8 @@ private[sql] trait SQLTestUtilsBase
30812896
val schema = df.schema
30822897
val withoutFilters = df.queryExecution.executedPlan.transform {
30832898
case FilterExec(_, child) => child
@@ -3087,10 +2902,10 @@ index dd55fcfe42c..e898fc33bab 100644
30872902

30882903
spark.internalCreateDataFrame(withoutFilters.execute(), schema)
30892904
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
3090-
index ed2e309fa07..9c5c393ad14 100644
2905+
index ed2e309fa07..a5ea58146ad 100644
30912906
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
30922907
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
3093-
@@ -74,6 +74,36 @@ trait SharedSparkSessionBase
2908+
@@ -74,6 +74,31 @@ trait SharedSparkSessionBase
30942909
// this rule may potentially block testing of other optimization rules such as
30952910
// ConstantPropagation etc.
30962911
.set(SQLConf.OPTIMIZER_EXCLUDED_RULES.key, ConvertToLocalRelation.ruleName)
@@ -3118,11 +2933,6 @@ index ed2e309fa07..9c5c393ad14 100644
31182933
+ conf
31192934
+ .set("spark.sql.ansi.enabled", "true")
31202935
+ }
3121-
+
3122-
+ if (isCometWriterEnabled) {
3123-
+ conf.set("spark.comet.parquet.write.enabled", "true")
3124-
+ conf.set("spark.comet.operator.DataWritingCommandExec.allowIncompatible", "true")
3125-
+ }
31262936
+ }
31272937
conf.set(
31282938
StaticSQLConf.WAREHOUSE_PATH,

0 commit comments

Comments
 (0)