 diff --git a/pom.xml b/pom.xml
-index a0e25ce4d8d..b95fba458f2 100644
+index 68e2c422a24..540bdabf825 100644
 --- a/pom.xml
 +++ b/pom.xml
 @@ -152,6 +152,8 @@
@@ -38,7 +38,7 @@ index a0e25ce4d8d..b95fba458f2 100644
  </dependencyManagement>
 
 diff --git a/sql/core/pom.xml b/sql/core/pom.xml
-index e3d324c8edb..22342150522 100644
+index f08b33575fc..424e0da32fd 100644
 --- a/sql/core/pom.xml
 +++ b/sql/core/pom.xml
 @@ -77,6 +77,10 @@
@@ -216,7 +216,7 @@ index 0efe0877e9b..423d3b3d76d 100644
  -- SELECT_HAVING
  -- https://github.com/postgres/postgres/blob/REL_12_BETA2/src/test/regress/sql/select_having.sql
 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
-index e5494726695..00937f025c2 100644
+index 9815cb816c9..95b5f9992b0 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
 +++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
 @@ -38,7 +38,7 @@ import org.apache.spark.sql.catalyst.util.DateTimeConstants
@@ -239,7 +239,7 @@ index e5494726695..00937f025c2 100644
 
  test("A cached table preserves the partitioning and ordering of its cached SparkPlan") {
 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
-index 6f3090d8908..c08a60fb0c2 100644
+index 5a8681aed97..da9d25e2eb4 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
 +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
 @@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.plans.logical.Expand
@@ -336,7 +336,7 @@ index 7ee18df3756..d09f70e5d99 100644
  assert(exchanges.size == 2)
  }
 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
-index a1d5d579338..c201d39cc78 100644
+index 47a311c71d5..342e71cfdd4 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
 +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
 @@ -24,8 +24,9 @@ import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression
@@ -624,7 +624,7 @@ index 7af826583bd..3c3def1eb67 100644
  assert(shuffleMergeJoins.size == 1)
  }
 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
-index 44c8cb92fc3..f098beeca26 100644
+index 4d256154c85..66a5473852d 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
 +++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
 @@ -31,7 +31,8 @@ import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
@@ -822,7 +822,7 @@ index 44c8cb92fc3..f098beeca26 100644
  checkAnswer(fullJoinDF, Row(100))
  }
  }
-@@ -1611,6 +1640,9 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
+@@ -1583,6 +1612,9 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
  Seq(semiJoinDF, antiJoinDF).foreach { df =>
  assert(collect(df.queryExecution.executedPlan) {
  case j: ShuffledHashJoinExec if j.ignoreDuplicatedKey == ignoreDuplicatedKey => true
@@ -832,7 +832,7 @@ index 44c8cb92fc3..f098beeca26 100644
  }.size == 1)
  }
  }
-@@ -1655,14 +1687,20 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
+@@ -1627,14 +1659,20 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
 
  test("SPARK-43113: Full outer join with duplicate stream-side references in condition (SMJ)") {
  def check(plan: SparkPlan): Unit = {
@@ -855,7 +855,7 @@ index 44c8cb92fc3..f098beeca26 100644
  }
  dupStreamSideColTest("SHUFFLE_HASH", check)
  }
-@@ -1798,7 +1836,8 @@ class ThreadLeakInSortMergeJoinSuite
+@@ -1770,7 +1808,8 @@ class ThreadLeakInSortMergeJoinSuite
  sparkConf.set(SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD, 20))
  }
 
@@ -879,7 +879,7 @@ index c26757c9cff..d55775f09d7 100644
  protected val baseResourcePath = {
  // use the same way as `SQLQueryTestSuite` to get the resource path
 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
-index 3cf2bfd17ab..49728c35c42 100644
+index 793a0da6a86..181bfc16e4b 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
 +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
 @@ -1521,7 +1521,8 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
@@ -2091,10 +2091,10 @@ index 8e88049f51e..8f3cf8a0f80 100644
  case _ =>
  throw new AnalysisException("Can not match ParquetTable in the query.")
 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
-index 8ed9ef1630e..eed2a6f5ad5 100644
+index 4f8a9e39716..5da031994ff 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
 +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
-@@ -1345,7 +1345,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
+@@ -1335,7 +1335,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
  }
  }
 
@@ -2104,6 +2104,39 @@ index 8ed9ef1630e..eed2a6f5ad5 100644
  withAllParquetReaders {
  checkAnswer(
  // "fruit" column in this file is encoded using DELTA_LENGTH_BYTE_ARRAY.
+@@ -1541,7 +1542,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
+ }
+ }
+
+- test("Write Spark version into Parquet metadata") {
++ test("Write Spark version into Parquet metadata",
++ IgnoreComet("comet does not write spark version in parquet metadata")) {
+ withTempPath { dir =>
+ spark.range(1).repartition(1).write.parquet(dir.getAbsolutePath)
+ assert(getMetaData(dir)(SPARK_VERSION_METADATA_KEY) === SPARK_VERSION_SHORT)
+diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala
+index 8b386e8f689..67a41628a3b 100644
+--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala
++++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala
+@@ -25,7 +25,7 @@ import org.apache.hadoop.fs.{Path, PathFilter}
+ import org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER
+ import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
+
+- import org.apache.spark.sql.Row
++ import org.apache.spark.sql.{IgnoreComet, Row}
+ import org.apache.spark.sql.catalyst.util.DateTimeUtils
+ import org.apache.spark.sql.internal.SQLConf
+ import org.apache.spark.sql.test.SharedSparkSession
+@@ -153,7 +153,8 @@ class ParquetInteroperabilitySuite extends ParquetCompatibilityTest with SharedS
+ }
+ }
+
+- test("parquet timestamp conversion") {
++ test("parquet timestamp conversion",
++ IgnoreComet("timestamp96 conversion failed with the native writer")) {
+ // Make a table with one parquet file written by impala, and one parquet file written by spark.
+ // We should only adjust the timestamps in the impala file, and only if the conf is set
+ val impalaFile = "test-data/impala_timestamp.parq"
 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
 index f6472ba3d9d..7a8f5317ed7 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
@@ -2782,7 +2815,7 @@ index abe606ad9c1..2d930b64cca 100644
  val tblTargetName = "tbl_target"
  val tblSourceQualified = s"default.$tblSourceName"
 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
-index e937173a590..ca06132102d 100644
+index e937173a590..5fede1579f7 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
 +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
 @@ -27,6 +27,7 @@ import scala.concurrent.duration._
@@ -2831,7 +2864,7 @@ index e937173a590..ca06132102d 100644
  }
  }
 
-@@ -242,6 +265,29 @@ private[sql] trait SQLTestUtilsBase
+@@ -242,6 +265,39 @@ private[sql] trait SQLTestUtilsBase
  protected override def _sqlContext: SQLContext = self.spark.sqlContext
  }
 
@@ -2857,11 +2890,21 @@ index e937173a590..ca06132102d 100644
 + val v = System.getenv("ENABLE_COMET_SCAN_ONLY")
 + v != null && v.toBoolean
 + }
++
++ /**
++ * Whether the Comet writer is enabled. This is only effective when
++ * [[isCometEnabled]] returns true.
++ */
++
++ protected def isCometWriterEnabled: Boolean = {
++ val v = System.getenv("ENABLE_COMET_WRITER")
++ v != null && v.toBoolean
++ }
 +
  protected override def withSQLConf(pairs: (String, String)*)(f: => Unit): Unit = {
  SparkSession.setActiveSession(spark)
  super.withSQLConf(pairs: _*)(f)
-@@ -435,6 +481,8 @@ private[sql] trait SQLTestUtilsBase
+@@ -435,6 +491,8 @@ private[sql] trait SQLTestUtilsBase
  val schema = df.schema
  val withoutFilters = df.queryExecution.executedPlan.transform {
  case FilterExec(_, child) => child
@@ -2871,10 +2914,10 @@ index e937173a590..ca06132102d 100644
 
  spark.internalCreateDataFrame(withoutFilters.execute(), schema)
 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
-index ed2e309fa07..a5ea58146ad 100644
+index ed2e309fa07..9c5c393ad14 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
 +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
-@@ -74,6 +74,31 @@ trait SharedSparkSessionBase
+@@ -74,6 +74,36 @@ trait SharedSparkSessionBase
  // this rule may potentially block testing of other optimization rules such as
  // ConstantPropagation etc.
  .set(SQLConf.OPTIMIZER_EXCLUDED_RULES.key, ConvertToLocalRelation.ruleName)
@@ -2902,6 +2945,11 @@ index ed2e309fa07..a5ea58146ad 100644
 + conf
 + .set("spark.sql.ansi.enabled", "true")
 + }
++
++ if (isCometWriterEnabled) {
++ conf.set("spark.comet.parquet.write.enabled", "true")
++ conf.set("spark.comet.operator.DataWritingCommandExec.allowIncompatible", "true")
++ }
 + }
  conf.set(
  StaticSQLConf.WAREHOUSE_PATH,