Commit e7eae66

andygrove and claude authored
fix: unignore input_file_name Spark SQL tests for native_datafusion (#3458)
* fix: unignore input_file_name Spark SQL tests for native_datafusion

  The native_datafusion scan now correctly falls back to Spark's FileSourceScanExec when metadata columns (like input_file_name) are present, so the 3 input_file_name tests no longer need to be ignored.

  For ExtractPythonUDFsSuite, the issue was that the test's collect pattern didn't match CometNativeScanExec. Fixed by adding CometNativeScanExec to the collect and dataFilters match blocks.

  Closes #3312

* fix: restore IgnoreComet.scala in 3.5.8 Spark SQL test diff

  The previous commit accidentally removed the IgnoreComet.scala file creation from the diff, causing 94 compilation errors when applied to Spark 3.5.8.

* fix: fall back scan when plan uses input_file_name expressions

  CometScanExec does not populate InputFileBlockHolder (the thread-local that Spark's FileScanRDD sets), so input_file_name(), input_file_block_start(), and input_file_block_length() return empty or default values when Comet replaces the scan. Detect these expressions in the plan and fall back to Spark's FileSourceScanExec.

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
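For context on the third fix, here is a minimal repro sketch, not from the commit (the local session and scratch path are hypothetical): input_file_name() only returns a real path when the scan populates InputFileBlockHolder, which Spark's FileScanRDD does as it opens each file.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.input_file_name

// Hypothetical local session and scratch path, for illustration only.
val spark = SparkSession.builder().appName("demo").master("local[1]").getOrCreate()
val path = "/tmp/input_file_name_demo"
spark.range(10).toDF("id").write.mode("overwrite").parquet(path)

// With Spark's FileSourceScanExec each row shows the Parquet file it came
// from; a scan that bypasses FileScanRDD never sets InputFileBlockHolder,
// so the same query would yield empty strings.
spark.read.parquet(path).select(input_file_name()).show(truncate = false)
```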
1 parent a6741e8 commit e7eae66

2 files changed

Lines changed: 31 additions & 74 deletions

dev/diffs/3.5.8.diff

Lines changed: 9 additions & 69 deletions
@@ -238,20 +238,6 @@ index e5494726695..00937f025c2 100644
   }
 
   test("A cached table preserves the partitioning and ordering of its cached SparkPlan") {
-diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
-index 9e8d77c53f3..855e3ada7d1 100644
---- a/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
-+++ b/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
-@@ -790,7 +790,8 @@ class ColumnExpressionSuite extends QueryTest with SharedSparkSession {
-     }
-   }
- 
--  test("input_file_name, input_file_block_start, input_file_block_length - FileScanRDD") {
-+  test("input_file_name, input_file_block_start, input_file_block_length - FileScanRDD",
-+    IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3312")) {
-     withTempPath { dir =>
-       val data = sparkContext.parallelize(0 to 10).toDF("id")
-       data.write.parquet(dir.getCanonicalPath)
 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
 index 6f3090d8908..c08a60fb0c2 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
@@ -1084,20 +1070,6 @@ index 04702201f82..5ee11f83ecf 100644
     }
     assert(exchanges.size === 1)
   }
-diff --git a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
-index 9f8e979e3fb..3bc9dab8023 100644
---- a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
-+++ b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
-@@ -87,7 +87,8 @@ class UDFSuite extends QueryTest with SharedSparkSession {
-     spark.catalog.dropTempView("tmp_table")
-   }
- 
--  test("SPARK-8005 input_file_name") {
-+  test("SPARK-8005 input_file_name",
-+    IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3312")) {
-     withTempPath { dir =>
-       val data = sparkContext.parallelize(0 to 10, 2).toDF("id")
-       data.write.parquet(dir.getCanonicalPath)
 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala
 index d269290e616..13726a31e07 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala
@@ -2504,42 +2476,32 @@ index 5cdbdc27b32..307fba16578 100644
       spark.range(10).selectExpr("id", "id % 3 as p")
         .write.partitionBy("p").saveAsTable("testDataForScan")
 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFsSuite.scala
-index 0ab8691801d..7b81f3a8f6d 100644
+index 0ab8691801d..b18a5bea944 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFsSuite.scala
 +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFsSuite.scala
-@@ -17,7 +17,9 @@
- 
+@@ -18,6 +18,7 @@
  package org.apache.spark.sql.execution.python
 
-+import org.apache.spark.sql.IgnoreCometNativeDataFusion
  import org.apache.spark.sql.catalyst.plans.logical.{ArrowEvalPython, BatchEvalPython, Limit, LocalLimit}
 +import org.apache.spark.sql.comet._
  import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan, SparkPlanTest}
  import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
  import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan
-@@ -93,7 +95,8 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession {
-     assert(arrowEvalNodes.size == 2)
-   }
- 
--  test("Python UDF should not break column pruning/filter pushdown -- Parquet V1") {
-+  test("Python UDF should not break column pruning/filter pushdown -- Parquet V1",
-+    IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3312")) {
-     withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> "parquet") {
-       withTempPath { f =>
-         spark.range(10).select($"id".as("a"), $"id".as("b"))
-@@ -108,6 +111,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession {
+@@ -108,6 +109,8 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession {
  
      val scanNodes = query.queryExecution.executedPlan.collect {
        case scan: FileSourceScanExec => scan
 +      case scan: CometScanExec => scan
++      case scan: CometNativeScanExec => scan
      }
      assert(scanNodes.length == 1)
      assert(scanNodes.head.output.map(_.name) == Seq("a"))
-@@ -120,11 +124,16 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession {
+@@ -120,11 +123,18 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession {
  
      val scanNodes = query.queryExecution.executedPlan.collect {
        case scan: FileSourceScanExec => scan
 +      case scan: CometScanExec => scan
++      case scan: CometNativeScanExec => scan
      }
      assert(scanNodes.length == 1)
      // $"a" is not null and $"a" > 1
@@ -2548,21 +2510,22 @@ index 0ab8691801d..7b81f3a8f6d 100644
 +    val dataFilters = scanNodes.head match {
 +      case scan: FileSourceScanExec => scan.dataFilters
 +      case scan: CometScanExec => scan.dataFilters
++      case scan: CometNativeScanExec => scan.dataFilters
 +    }
 +    assert(dataFilters.length == 2)
 +    assert(dataFilters.flatMap(_.references.map(_.name)).distinct == Seq("a"))
      }
    }
  }
-@@ -145,6 +154,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession {
+@@ -145,6 +155,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession {
  
      val scanNodes = query.queryExecution.executedPlan.collect {
        case scan: BatchScanExec => scan
 +      case scan: CometBatchScanExec => scan
      }
      assert(scanNodes.length == 1)
      assert(scanNodes.head.output.map(_.name) == Seq("a"))
-@@ -157,6 +167,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession {
+@@ -157,6 +168,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession {
  
      val scanNodes = query.queryExecution.executedPlan.collect {
        case scan: BatchScanExec => scan
@@ -3243,29 +3206,6 @@ index de3b1ffccf0..2a76d127093 100644
 
    override def beforeEach(): Unit = {
      super.beforeEach()
-diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
-index f3be79f9022..b4b1ea8dbc4 100644
---- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
-+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
-@@ -34,7 +34,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectIn
- import org.apache.hadoop.io.{LongWritable, Writable}
- 
- import org.apache.spark.{SparkException, SparkFiles, TestUtils}
--import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
-+import org.apache.spark.sql.{AnalysisException, IgnoreCometNativeDataFusion, QueryTest, Row}
- import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode
- import org.apache.spark.sql.catalyst.plans.logical.Project
- import org.apache.spark.sql.execution.WholeStageCodegenExec
-@@ -448,7 +448,8 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils {
-     }
-   }
- 
--  test("SPARK-11522 select input_file_name from non-parquet table") {
-+  test("SPARK-11522 select input_file_name from non-parquet table",
-+    IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3312")) {
- 
-     withTempDir { tempDir =>
- 
 diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
 index 6160c3e5f6c..0956d7d9edc 100644
 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
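The ExtractPythonUDFsSuite changes above reduce to one pattern: the test must recognize whichever scan operator Comet substituted for Spark's. A minimal sketch of that collect, assuming a Comet-enabled session and a query `df` over a Parquet source (both hypothetical here):

```scala
import org.apache.spark.sql.comet.{CometNativeScanExec, CometScanExec}
import org.apache.spark.sql.execution.FileSourceScanExec

// Comet may leave Spark's V1 scan in place, replace it with CometScanExec,
// or (for native_datafusion) replace it with CometNativeScanExec, so the
// test matches all three when collecting scan nodes from the physical plan.
val scanNodes = df.queryExecution.executedPlan.collect {
  case scan: FileSourceScanExec  => scan
  case scan: CometScanExec       => scan
  case scan: CometNativeScanExec => scan
}
assert(scanNodes.length == 1)
```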

spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala

Lines changed: 22 additions & 5 deletions
@@ -28,7 +28,7 @@ import scala.jdk.CollectionConverters._
 import org.apache.hadoop.conf.Configuration
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.expressions.{Attribute, DynamicPruningExpression, Expression, GenericInternalRow, PlanExpression}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, DynamicPruningExpression, Expression, GenericInternalRow, InputFileBlockLength, InputFileBlockStart, InputFileName, PlanExpression}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.util.{sideBySide, ArrayBasedMapData, GenericArrayData, MetadataColumnHelper}
 import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns.getExistenceDefaultValues
@@ -110,7 +110,9 @@ case class CometScanRule(session: SparkSession)
       metadataTableSuffix.exists(suffix => scanExec.table.name().endsWith(suffix))
     }
 
-    def transformScan(plan: SparkPlan): SparkPlan = plan match {
+    val fullPlan = plan
+
+    def transformScan(scanNode: SparkPlan): SparkPlan = scanNode match {
       case scan if !CometConf.COMET_NATIVE_SCAN_ENABLED.get(conf) =>
         withInfo(scan, "Comet Scan is not enabled")
 
@@ -119,7 +121,7 @@ case class CometScanRule(session: SparkSession)
 
       // data source V1
       case scanExec: FileSourceScanExec =>
-        transformV1Scan(scanExec)
+        transformV1Scan(fullPlan, scanExec)
 
       // data source V2
       case scanExec: BatchScanExec =>
@@ -135,7 +137,7 @@ case class CometScanRule(session: SparkSession)
     }
   }
 
-  private def transformV1Scan(scanExec: FileSourceScanExec): SparkPlan = {
+  private def transformV1Scan(plan: SparkPlan, scanExec: FileSourceScanExec): SparkPlan = {
 
     if (COMET_DPP_FALLBACK_ENABLED.get() &&
       scanExec.partitionFilters.exists(isDynamicPruningFilter)) {
@@ -170,7 +172,7 @@ case class CometScanRule(session: SparkSession)
           nativeIcebergCompatScan(session, scanExec, r, hadoopConf)
             .getOrElse(scanExec)
         case SCAN_NATIVE_DATAFUSION =>
-          nativeDataFusionScan(session, scanExec, r, hadoopConf).getOrElse(scanExec)
+          nativeDataFusionScan(plan, session, scanExec, r, hadoopConf).getOrElse(scanExec)
         case SCAN_NATIVE_ICEBERG_COMPAT =>
           nativeIcebergCompatScan(session, scanExec, r, hadoopConf).getOrElse(scanExec)
       }
@@ -181,6 +183,7 @@ case class CometScanRule(session: SparkSession)
   }
 
   private def nativeDataFusionScan(
+      plan: SparkPlan,
       session: SparkSession,
       scanExec: FileSourceScanExec,
       r: HadoopFsRelation,
@@ -196,6 +199,20 @@ case class CometScanRule(session: SparkSession)
       withInfo(scanExec, "Native DataFusion scan does not support metadata columns")
       return None
     }
+    // input_file_name, input_file_block_start, and input_file_block_length read from
+    // InputFileBlockHolder, a thread-local set by Spark's FileScanRDD. The native DataFusion
+    // scan does not use FileScanRDD, so these expressions would return empty/default values.
+    if (plan.exists(node =>
+        node.expressions.exists(_.exists {
+          case _: InputFileName | _: InputFileBlockStart | _: InputFileBlockLength => true
+          case _ => false
+        }))) {
+      withInfo(
+        scanExec,
+        "Native DataFusion scan is not compatible with input_file_name, " +
+          "input_file_block_start, or input_file_block_length")
+      return None
+    }
     if (ShimFileFormat.findRowIndexColumnIndexInSchema(scanExec.requiredSchema) >= 0) {
       withInfo(scanExec, "Native DataFusion scan does not support row index generation")
       return None
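Pulled out as a standalone helper, the new guard is just a plan-and-expression traversal. A sketch mirroring the diff above (the helper name is ours, not the commit's):

```scala
import org.apache.spark.sql.catalyst.expressions.{InputFileBlockLength, InputFileBlockStart, InputFileName}
import org.apache.spark.sql.execution.SparkPlan

// TreeNode.exists short-circuits: walk every operator in the physical plan,
// and within each operator walk every expression tree, looking for the three
// expressions that read from InputFileBlockHolder.
def usesInputFileExpressions(plan: SparkPlan): Boolean =
  plan.exists(node =>
    node.expressions.exists(_.exists {
      case _: InputFileName | _: InputFileBlockStart | _: InputFileBlockLength => true
      case _ => false
    }))
```

Because the guard inspects the full plan rather than the scan node alone, an input_file_name() several operators above the scan still forces nativeDataFusionScan to return None, leaving Spark's FileSourceScanExec in place.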
