Skip to content

Commit 58cf6e1

Browse files
authored
fix: fall back to Spark when Parquet field ID matching is enabled in native_datafusion (#3415)
1 parent 2c6a8ac commit 58cf6e1

2 files changed

Lines changed: 6 additions & 73 deletions

File tree

dev/diffs/3.5.8.diff

Lines changed: 0 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -2065,79 +2065,6 @@ index 07e2849ce6f..3e73645b638 100644
20652065
val extraOptions = Map[String, String](
20662066
ParquetOutputFormat.WRITER_VERSION -> ParquetProperties.WriterVersion.PARQUET_2_0.toString
20672067
)
2068-
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFieldIdIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFieldIdIOSuite.scala
2069-
index 5e01d3f447c..284d6657d4f 100644
2070-
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFieldIdIOSuite.scala
2071-
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFieldIdIOSuite.scala
2072-
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.datasources.parquet
2073-
import scala.collection.JavaConverters._
2074-
2075-
import org.apache.spark.SparkException
2076-
-import org.apache.spark.sql.{QueryTest, Row}
2077-
+import org.apache.spark.sql.{IgnoreCometNativeDataFusion, QueryTest, Row}
2078-
import org.apache.spark.sql.internal.SQLConf
2079-
import org.apache.spark.sql.test.SharedSparkSession
2080-
import org.apache.spark.sql.types.{ArrayType, IntegerType, MapType, Metadata, MetadataBuilder, StringType, StructType}
2081-
@@ -30,7 +30,8 @@ class ParquetFieldIdIOSuite extends QueryTest with ParquetTest with SharedSparkS
2082-
private def withId(id: Int): Metadata =
2083-
new MetadataBuilder().putLong(ParquetUtils.FIELD_ID_METADATA_KEY, id).build()
2084-
2085-
- test("Parquet reads infer fields using field ids correctly") {
2086-
+ test("Parquet reads infer fields using field ids correctly",
2087-
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3316")) {
2088-
withTempDir { dir =>
2089-
val readSchema =
2090-
new StructType()
2091-
@@ -78,7 +79,8 @@ class ParquetFieldIdIOSuite extends QueryTest with ParquetTest with SharedSparkS
2092-
}
2093-
}
2094-
2095-
- test("absence of field ids") {
2096-
+ test("absence of field ids",
2097-
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3316")) {
2098-
withTempDir { dir =>
2099-
val readSchema =
2100-
new StructType()
2101-
@@ -107,7 +109,8 @@ class ParquetFieldIdIOSuite extends QueryTest with ParquetTest with SharedSparkS
2102-
}
2103-
}
2104-
2105-
- test("SPARK-38094: absence of field ids: reading nested schema") {
2106-
+ test("SPARK-38094: absence of field ids: reading nested schema",
2107-
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3316")) {
2108-
withTempDir { dir =>
2109-
// now with nested schema/complex type
2110-
val readSchema =
2111-
@@ -136,7 +139,8 @@ class ParquetFieldIdIOSuite extends QueryTest with ParquetTest with SharedSparkS
2112-
}
2113-
}
2114-
2115-
- test("multiple id matches") {
2116-
+ test("multiple id matches",
2117-
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3316")) {
2118-
withTempDir { dir =>
2119-
val readSchema =
2120-
new StructType()
2121-
@@ -163,7 +167,8 @@ class ParquetFieldIdIOSuite extends QueryTest with ParquetTest with SharedSparkS
2122-
}
2123-
}
2124-
2125-
- test("read parquet file without ids") {
2126-
+ test("read parquet file without ids",
2127-
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3316")) {
2128-
withTempDir { dir =>
2129-
val readSchema =
2130-
new StructType()
2131-
@@ -196,7 +201,8 @@ class ParquetFieldIdIOSuite extends QueryTest with ParquetTest with SharedSparkS
2132-
}
2133-
}
2134-
2135-
- test("global read/write flag should work correctly") {
2136-
+ test("global read/write flag should work correctly",
2137-
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3316")) {
2138-
withTempDir { dir =>
2139-
val readSchema =
2140-
new StructType()
21412068
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
21422069
index 8e88049f51e..49f2001dc6b 100644
21432070
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala

spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns.getExistenceDefa
3535
import org.apache.spark.sql.comet.{CometBatchScanExec, CometScanExec}
3636
import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan}
3737
import org.apache.spark.sql.execution.datasources.HadoopFsRelation
38+
import org.apache.spark.sql.execution.datasources.parquet.ParquetUtils
3839
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
3940
import org.apache.spark.sql.execution.datasources.v2.csv.CSVScan
4041
import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan
@@ -201,6 +202,11 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] with Com
201202
withInfo(scanExec, "Native DataFusion scan does not support row index generation")
202203
return None
203204
}
205+
if (session.sessionState.conf.getConf(SQLConf.PARQUET_FIELD_ID_READ_ENABLED) &&
206+
ParquetUtils.hasFieldIds(scanExec.requiredSchema)) {
207+
withInfo(scanExec, "Native DataFusion scan does not support Parquet field ID matching")
208+
return None
209+
}
204210
if (!isSchemaSupported(scanExec, SCAN_NATIVE_DATAFUSION, r)) {
205211
return None
206212
}

0 commit comments

Comments (0)