
Commit f83f51c

andygrove and claude authored
chore: Invert usingDataSourceExec test helper to usingLegacyNativeCometScan (#3310)
* Invert usingDataSourceExec test helper to usingLegacyNativeCometScan (#3309)

  With native_datafusion enabled in auto scan mode, test helpers that check for
  specific scan config values fail because auto resolves at plan time, not
  config time. Invert the logic so tests check for the legacy native_comet mode
  instead, which is forward-compatible with auto and any future scan
  implementations.

  - Rename usingDataSourceExec → usingLegacyNativeCometScan (inverted)
  - Rename usingDataSourceExecWithIncompatTypes → hasUnsignedSmallIntSafetyCheck
  - Update all call sites across 11 test files

  Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* Fix schema evolution test to only expect success for native_datafusion

  The inversion from usingDataSourceExec to !usingLegacyNativeCometScan
  incorrectly broadened the condition to include native_iceberg_compat and auto
  modes, which do not support schema evolution (INT32 to bigint).

  Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
1 parent e3761e0 commit f83f51c

10 files changed

Lines changed: 43 additions & 44 deletions
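
To make the inversion described in the commit message concrete, here is a minimal, self-contained Scala sketch. It is not the real CometConf API: the string values and the SCAN_AUTO name are illustrative assumptions; only the two helper bodies mirror the CometTestBase.scala diff below. It shows why the old membership check returned false under the auto scan mode while the negation of the new helper stays true for auto and any future impls.

    object ScanHelperSketch {
      // Illustrative stand-ins for the CometConf constants used in this commit.
      val SCAN_NATIVE_COMET = "native_comet"
      val SCAN_NATIVE_DATAFUSION = "native_datafusion"
      val SCAN_NATIVE_ICEBERG_COMPAT = "native_iceberg_compat"
      val SCAN_AUTO = "auto" // assumed value seen at config time, before plan-time resolution

      // Old helper: membership in a fixed list of DataFusion-based impls,
      // so "auto" (and any future impl) falls through to false.
      def usingDataSourceExec(impl: String): Boolean =
        Seq(SCAN_NATIVE_ICEBERG_COMPAT, SCAN_NATIVE_DATAFUSION).contains(impl)

      // New helper: equality against the single legacy impl; its negation is
      // true for native_datafusion, native_iceberg_compat, auto, and future impls.
      def usingLegacyNativeCometScan(impl: String): Boolean =
        impl == SCAN_NATIVE_COMET

      def main(args: Array[String]): Unit = {
        val impls =
          Seq(SCAN_NATIVE_COMET, SCAN_NATIVE_DATAFUSION, SCAN_NATIVE_ICEBERG_COMPAT, SCAN_AUTO)
        for (impl <- impls) {
          println(s"$impl: old=${usingDataSourceExec(impl)} inverted=${!usingLegacyNativeCometScan(impl)}")
        }
        // For "auto" this prints old=false but inverted=true, which is the bug being fixed.
      }
    }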

spark/src/test/scala/org/apache/comet/CometCastSuite.scala

Lines changed: 2 additions & 2 deletions
@@ -64,7 +64,7 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper {
   private val timestampPattern = "0123456789/:T" + whitespaceChars

   lazy val usingParquetExecWithIncompatTypes: Boolean =
-    usingDataSourceExecWithIncompatTypes(conf)
+    hasUnsignedSmallIntSafetyCheck(conf)

   test("all valid cast combinations covered") {
     val names = testNames
@@ -1087,7 +1087,7 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper {
         |USING parquet
         """.stripMargin)
     sql("INSERT INTO TABLE tab1 SELECT named_struct('col1','1','col2','2')")
-    if (usingDataSourceExec) {
+    if (!usingLegacyNativeCometScan) {
       checkSparkAnswerAndOperator(
         "SELECT CAST(s AS struct<field1:string, field2:string>) AS new_struct FROM tab1")
     } else {

spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala

Lines changed: 4 additions & 4 deletions
@@ -1509,7 +1509,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {

   test("round") {
     // https://github.com/apache/datafusion-comet/issues/1441
-    assume(!usingDataSourceExec)
+    assume(usingLegacyNativeCometScan)
     Seq(true, false).foreach { dictionaryEnabled =>
       withTempDir { dir =>
         val path = new Path(dir.toURI.toString, "test.parquet")
@@ -1573,7 +1573,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {

   test("hex") {
     // https://github.com/apache/datafusion-comet/issues/1441
-    assume(!usingDataSourceExec)
+    assume(usingLegacyNativeCometScan)
     Seq(true, false).foreach { dictionaryEnabled =>
       withTempDir { dir =>
         val path = new Path(dir.toURI.toString, "hex.parquet")
@@ -2607,7 +2607,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
   }

   test("get_struct_field with DataFusion ParquetExec - read entire struct") {
-    assume(usingDataSourceExec(conf))
+    assume(!usingLegacyNativeCometScan(conf))
     withTempPath { dir =>
       // create input file with Comet disabled
       withSQLConf(CometConf.COMET_ENABLED.key -> "false") {
@@ -2644,7 +2644,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
   }

   test("read array[int] from parquet") {
-    assume(usingDataSourceExec(conf))
+    assume(!usingLegacyNativeCometScan(conf))

     withTempPath { dir =>
       // create input file with Comet disabled

spark/src/test/scala/org/apache/comet/CometFuzzAggregateSuite.scala

Lines changed: 9 additions & 9 deletions
@@ -29,7 +29,7 @@ class CometFuzzAggregateSuite extends CometFuzzTestBase {
     for (col <- df.schema.fields.filterNot(f => isComplexType(f.dataType)).map(_.name)) {
       val sql = s"SELECT count(distinct $col) FROM t1"
       val (_, cometPlan) = checkSparkAnswer(sql)
-      if (usingDataSourceExec) {
+      if (!usingLegacyNativeCometScan) {
         assert(1 == collectNativeScans(cometPlan).length)
       }

@@ -45,7 +45,7 @@ class CometFuzzAggregateSuite extends CometFuzzTestBase {
     for (col <- df.schema.fields.filter(f => isComplexType(f.dataType)).map(_.name)) {
       val sql = s"SELECT count(distinct $col) FROM t1"
       val (_, cometPlan) = checkSparkAnswer(sql)
-      if (usingDataSourceExec) {
+      if (!usingLegacyNativeCometScan) {
         assert(1 == collectNativeScans(cometPlan).length)
       }
     }
@@ -57,7 +57,7 @@ class CometFuzzAggregateSuite extends CometFuzzTestBase {
     for (col <- df.schema.fields.filterNot(f => isComplexType(f.dataType)).map(_.name)) {
       val sql = s"SELECT c1, c2, c3, count(distinct $col) FROM t1 group by c1, c2, c3"
       val (_, cometPlan) = checkSparkAnswer(sql)
-      if (usingDataSourceExec) {
+      if (!usingLegacyNativeCometScan) {
         assert(1 == collectNativeScans(cometPlan).length)
       }

@@ -73,7 +73,7 @@ class CometFuzzAggregateSuite extends CometFuzzTestBase {
     for (col <- df.schema.fields.filter(f => isComplexType(f.dataType)).map(_.name)) {
       val sql = s"SELECT c1, c2, c3, count(distinct $col) FROM t1 group by c1, c2, c3"
       val (_, cometPlan) = checkSparkAnswer(sql)
-      if (usingDataSourceExec) {
+      if (!usingLegacyNativeCometScan) {
         assert(1 == collectNativeScans(cometPlan).length)
       }
     }
@@ -87,7 +87,7 @@ class CometFuzzAggregateSuite extends CometFuzzTestBase {
     for (col <- df.columns) {
       val sql = s"SELECT c1, c2, c3, count(distinct $col, c4, c5) FROM t1 group by c1, c2, c3"
       val (_, cometPlan) = checkSparkAnswer(sql)
-      if (usingDataSourceExec) {
+      if (!usingLegacyNativeCometScan) {
         assert(1 == collectNativeScans(cometPlan).length)
       }
     }
@@ -99,7 +99,7 @@ class CometFuzzAggregateSuite extends CometFuzzTestBase {
     for (col <- df.columns) {
       val sql = s"SELECT $col, count(*) FROM t1 GROUP BY $col ORDER BY $col"
       val (_, cometPlan) = checkSparkAnswer(sql)
-      if (usingDataSourceExec) {
+      if (!usingLegacyNativeCometScan) {
         assert(1 == collectNativeScans(cometPlan).length)
       }
     }
@@ -112,7 +112,7 @@ class CometFuzzAggregateSuite extends CometFuzzTestBase {
     for (col <- df.columns.drop(1)) {
       val sql = s"SELECT $groupCol, count($col) FROM t1 GROUP BY $groupCol ORDER BY $groupCol"
       val (_, cometPlan) = checkSparkAnswer(sql)
-      if (usingDataSourceExec) {
+      if (!usingLegacyNativeCometScan) {
         assert(1 == collectNativeScans(cometPlan).length)
       }
     }
@@ -126,7 +126,7 @@ class CometFuzzAggregateSuite extends CometFuzzTestBase {
       val sql = s"SELECT $groupCol, count(${otherCol.mkString(", ")}) FROM t1 " +
         s"GROUP BY $groupCol ORDER BY $groupCol"
       val (_, cometPlan) = checkSparkAnswer(sql)
-      if (usingDataSourceExec) {
+      if (!usingLegacyNativeCometScan) {
         assert(1 == collectNativeScans(cometPlan).length)
       }
     }
@@ -138,7 +138,7 @@ class CometFuzzAggregateSuite extends CometFuzzTestBase {
       // cannot run fully native due to HashAggregate
       val sql = s"SELECT min($col), max($col) FROM t1"
       val (_, cometPlan) = checkSparkAnswer(sql)
-      if (usingDataSourceExec) {
+      if (!usingLegacyNativeCometScan) {
         assert(1 == collectNativeScans(cometPlan).length)
       }
     }

spark/src/test/scala/org/apache/comet/CometFuzzTestSuite.scala

Lines changed: 7 additions & 7 deletions
@@ -37,7 +37,7 @@ class CometFuzzTestSuite extends CometFuzzTestBase {
     val df = spark.read.parquet(filename)
     df.createOrReplaceTempView("t1")
     val sql = "SELECT * FROM t1"
-    if (usingDataSourceExec) {
+    if (!usingLegacyNativeCometScan) {
       checkSparkAnswerAndOperator(sql)
     } else {
       checkSparkAnswer(sql)
@@ -59,7 +59,7 @@ class CometFuzzTestSuite extends CometFuzzTestBase {
     val df = spark.read.parquet(filename)
     df.createOrReplaceTempView("t1")
     val sql = "SELECT * FROM t1 LIMIT 500"
-    if (usingDataSourceExec) {
+    if (!usingLegacyNativeCometScan) {
       checkSparkAnswerAndOperator(sql)
     } else {
       checkSparkAnswer(sql)
@@ -112,7 +112,7 @@ class CometFuzzTestSuite extends CometFuzzTestBase {
       s"alter table t2 add column col2 $defaultValueType default $defaultValueString")
     // Verify that our default value matches Spark's answer
     val sql = "select col2 from t2"
-    if (usingDataSourceExec) {
+    if (!usingLegacyNativeCometScan) {
       checkSparkAnswerAndOperator(sql)
     } else {
       checkSparkAnswer(sql)
@@ -139,7 +139,7 @@ class CometFuzzTestSuite extends CometFuzzTestBase {
       val sql = s"SELECT $col FROM t1 ORDER BY $col"
       // cannot run fully natively due to range partitioning and sort
       val (_, cometPlan) = checkSparkAnswer(sql)
-      if (usingDataSourceExec) {
+      if (!usingLegacyNativeCometScan) {
        assert(1 == collectNativeScans(cometPlan).length)
      }
    }
@@ -152,7 +152,7 @@ class CometFuzzTestSuite extends CometFuzzTestBase {
     val sql = s"SELECT $allCols FROM t1 ORDER BY $allCols"
     // cannot run fully natively due to range partitioning and sort
     val (_, cometPlan) = checkSparkAnswer(sql)
-    if (usingDataSourceExec) {
+    if (!usingLegacyNativeCometScan) {
       assert(1 == collectNativeScans(cometPlan).length)
     }
   }
@@ -207,7 +207,7 @@ class CometFuzzTestSuite extends CometFuzzTestBase {
     val df = spark.read.parquet(filename)
     val df2 = df.repartition(8, df.col("c0")).sort("c1")
     df2.collect()
-    if (usingDataSourceExec) {
+    if (!usingLegacyNativeCometScan) {
       val cometShuffles = collectCometShuffleExchanges(df2.queryExecution.executedPlan)
       val expectedNumCometShuffles = CometConf.COMET_NATIVE_SCAN_IMPL.get() match {
         case CometConf.SCAN_NATIVE_COMET =>
@@ -233,7 +233,7 @@ class CometFuzzTestSuite extends CometFuzzTestBase {
       // cannot run fully native due to HashAggregate
       val sql = s"SELECT count(*) FROM t1 JOIN t2 ON t1.$col = t2.$col"
       val (_, cometPlan) = checkSparkAnswer(sql)
-      if (usingDataSourceExec) {
+      if (!usingLegacyNativeCometScan) {
         assert(2 == collectNativeScans(cometPlan).length)
       }
     }

spark/src/test/scala/org/apache/comet/CometMapExpressionSuite.scala

Lines changed: 2 additions & 2 deletions
@@ -31,7 +31,7 @@ import org.apache.comet.testing.{DataGenOptions, ParquetGenerator, SchemaGenOpti
 class CometMapExpressionSuite extends CometTestBase {

   test("read map[int, int] from parquet") {
-    assume(usingDataSourceExec(conf))
+    assume(!usingLegacyNativeCometScan(conf))

     withTempPath { dir =>
       // create input file with Comet disabled
@@ -63,7 +63,7 @@ class CometMapExpressionSuite extends CometTestBase {

   // repro for https://github.com/apache/datafusion-comet/issues/1754
   test("read map[struct, struct] from parquet") {
-    assume(usingDataSourceExec(conf))
+    assume(!usingLegacyNativeCometScan(conf))

     withTempPath { dir =>
       // create input file with Comet disabled

spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala

Lines changed: 1 addition & 1 deletion
@@ -618,7 +618,7 @@ abstract class CometColumnarShuffleSuite extends CometTestBase with AdaptiveSpar
     // TODO: revisit this when we have resolution of https://github.com/apache/arrow-rs/issues/7040
     // and https://github.com/apache/arrow-rs/issues/7097
     val fieldsToTest =
-      if (usingDataSourceExec(conf)) {
+      if (!usingLegacyNativeCometScan(conf)) {
         Seq(
           $"_1",
           $"_4",

spark/src/test/scala/org/apache/comet/exec/CometJoinSuite.scala

Lines changed: 1 addition & 1 deletion
@@ -199,7 +199,7 @@ class CometJoinSuite extends CometTestBase {

   test("HashJoin struct key") {
     // https://github.com/apache/datafusion-comet/issues/1441
-    assume(!usingDataSourceExec)
+    assume(usingLegacyNativeCometScan)
     withSQLConf(
       "spark.sql.join.forceApplyShuffledHashJoin" -> "true",
       SQLConf.PREFER_SORTMERGEJOIN.key -> "false",

spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala

Lines changed: 6 additions & 6 deletions
@@ -92,7 +92,7 @@ abstract class ParquetReadSuite extends CometTestBase {
     // for native iceberg compat, CometScanExec supports some types that native_comet does not.
     // note that native_datafusion does not use CometScanExec so we need not include that in
     // the check
-    val isDataFusionScan = usingDataSourceExec(conf)
+    val isDataFusionScan = !usingLegacyNativeCometScan(conf)
     Seq(
       NullType -> false,
       BooleanType -> true,
@@ -143,7 +143,7 @@ abstract class ParquetReadSuite extends CometTestBase {

     // Arrays support for iceberg compat native and for Parquet V1
     val cometScanExecSupported =
-      if (usingDataSourceExec(conf) && this.isInstanceOf[ParquetReadV1Suite])
+      if (!usingLegacyNativeCometScan(conf) && this.isInstanceOf[ParquetReadV1Suite])
         Seq(true, true, true)
       else Seq(true, false, false)

@@ -185,7 +185,7 @@ abstract class ParquetReadSuite extends CometTestBase {
         i.toDouble,
         DateTimeUtils.toJavaDate(i))
     }
-    if (!usingDataSourceExecWithIncompatTypes(conf)) {
+    if (!hasUnsignedSmallIntSafetyCheck(conf)) {
       checkParquetScan(data)
     }
     checkParquetFile(data)
@@ -207,7 +207,7 @@ abstract class ParquetReadSuite extends CometTestBase {
         i.toDouble,
         DateTimeUtils.toJavaDate(i))
     }
-    if (!usingDataSourceExecWithIncompatTypes(conf)) {
+    if (!hasUnsignedSmallIntSafetyCheck(conf)) {
       checkParquetScan(data)
     }
     checkParquetFile(data)
@@ -228,7 +228,7 @@ abstract class ParquetReadSuite extends CometTestBase {
         DateTimeUtils.toJavaDate(i))
     }
     val filter = (row: Row) => row.getBoolean(0)
-    if (!usingDataSourceExecWithIncompatTypes(conf)) {
+    if (!hasUnsignedSmallIntSafetyCheck(conf)) {
       checkParquetScan(data, filter)
     }
     checkParquetFile(data, filter)
@@ -1515,7 +1515,7 @@ abstract class ParquetReadSuite extends CometTestBase {
   test("row group skipping doesn't overflow when reading into larger type") {
     // Spark 4.0 no longer fails for widening types SPARK-40876
     // https://github.com/apache/spark/commit/3361f25dc0ff6e5233903c26ee105711b79ba967
-    assume(!isSpark40Plus && !usingDataSourceExec(conf))
+    assume(!isSpark40Plus && usingLegacyNativeCometScan(conf))
     withTempPath { path =>
       Seq(0).toDF("a").write.parquet(path.toString)
       // Reading integer 'a' as a long isn't supported. Check that an exception is raised instead

spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala

Lines changed: 6 additions & 7 deletions
@@ -591,7 +591,7 @@ abstract class CometTestBase
   }

   def getPrimitiveTypesParquetSchema: String = {
-    if (usingDataSourceExecWithIncompatTypes(conf)) {
+    if (hasUnsignedSmallIntSafetyCheck(conf)) {
       // Comet complex type reader has different behavior for uint_8, uint_16 types.
       // The issue stems from undefined behavior in the parquet spec and is tracked
       // here: https://github.com/apache/parquet-java/issues/3142
@@ -1268,14 +1268,13 @@ abstract class CometTestBase
     writer.close()
   }

-  def usingDataSourceExec: Boolean = usingDataSourceExec(SQLConf.get)
+  def usingLegacyNativeCometScan: Boolean = usingLegacyNativeCometScan(SQLConf.get)

-  def usingDataSourceExec(conf: SQLConf): Boolean =
-    Seq(CometConf.SCAN_NATIVE_ICEBERG_COMPAT, CometConf.SCAN_NATIVE_DATAFUSION).contains(
-      CometConf.COMET_NATIVE_SCAN_IMPL.get(conf))
+  def usingLegacyNativeCometScan(conf: SQLConf): Boolean =
+    CometConf.COMET_NATIVE_SCAN_IMPL.get(conf) == CometConf.SCAN_NATIVE_COMET

-  def usingDataSourceExecWithIncompatTypes(conf: SQLConf): Boolean = {
-    usingDataSourceExec(conf) &&
+  def hasUnsignedSmallIntSafetyCheck(conf: SQLConf): Boolean = {
+    !usingLegacyNativeCometScan(conf) &&
     CometConf.COMET_PARQUET_UNSIGNED_SMALL_INT_CHECK.get(conf)
   }
 }
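
For quick reference, here is how the renamed predicates evaluate, read directly off the definitions above; the auto row follows the commit message's premise that auto resolves at plan time, so at config time it simply does not equal native_comet.

    // usingLegacyNativeCometScan(conf), per COMET_NATIVE_SCAN_IMPL value:
    //   native_comet          -> true   (legacy branch)
    //   native_datafusion     -> false
    //   native_iceberg_compat -> false
    //   auto / future impls   -> false  (the forward-compatible case motivating the rename)
    //
    // hasUnsignedSmallIntSafetyCheck(conf) is the conjunction of
    // !usingLegacyNativeCometScan(conf) and COMET_PARQUET_UNSIGNED_SMALL_INT_CHECK.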

spark/src/test/scala/org/apache/spark/sql/comet/ParquetDatetimeRebaseSuite.scala

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ abstract class ParquetDatetimeRebaseSuite extends CometTestBase {
5252
// Parquet file written by 2.4.5 should throw exception for both Spark and Comet
5353
// For Spark 4.0+, Parquet file written by 2.4.5 should not throw exception
5454
if ((exceptionOnRebase || sparkVersion == "2_4_5") && (!isSpark40Plus || sparkVersion != "2_4_5") &&
55-
!usingDataSourceExec(conf)) {
55+
usingLegacyNativeCometScan(conf)) {
5656
intercept[SparkException](df.collect())
5757
} else {
5858
checkSparkNoRebaseAnswer(df)
@@ -63,7 +63,7 @@ abstract class ParquetDatetimeRebaseSuite extends CometTestBase {
6363
}
6464

6565
test("reading ancient timestamps before 1582") {
66-
assume(!usingDataSourceExec(conf))
66+
assume(usingLegacyNativeCometScan(conf))
6767
Seq(true, false).foreach { exceptionOnRebase =>
6868
withSQLConf(
6969
CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_COMET,
@@ -78,7 +78,7 @@ abstract class ParquetDatetimeRebaseSuite extends CometTestBase {
7878
// Parquet file written by 2.4.5 should throw exception for both Spark and Comet
7979
// For Spark 4.0+, Parquet file written by 2.4.5 should not throw exception
8080
if ((exceptionOnRebase || sparkVersion == "2_4_5") && (!isSpark40Plus || sparkVersion != "2_4_5")
81-
&& !usingDataSourceExec(conf)) {
81+
&& usingLegacyNativeCometScan(conf)) {
8282
intercept[SparkException](df.collect())
8383
} else {
8484
checkSparkNoRebaseAnswer(df)
@@ -90,7 +90,7 @@ abstract class ParquetDatetimeRebaseSuite extends CometTestBase {
9090
}
9191

9292
test("reading ancient int96 timestamps before 1582") {
93-
assume(!usingDataSourceExec(conf))
93+
assume(usingLegacyNativeCometScan(conf))
9494
Seq(true, false).foreach { exceptionOnRebase =>
9595
withSQLConf(
9696
CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_COMET,
@@ -105,7 +105,7 @@ abstract class ParquetDatetimeRebaseSuite extends CometTestBase {
105105
// Parquet file written by 2.4.5 should throw exception for both Spark and Comet
106106
// For Spark 4.0+, Parquet file written by 2.4.5 should not throw exception
107107
if ((exceptionOnRebase || sparkVersion == "2_4_5") && (!isSpark40Plus || sparkVersion != "2_4_5")
108-
&& !usingDataSourceExec(conf)) {
108+
&& usingLegacyNativeCometScan(conf)) {
109109
intercept[SparkException](df.collect())
110110
} else {
111111
checkSparkNoRebaseAnswer(df)
