Skip to content

Commit fdf00d4

Browse files
authored
fix: fall back for shredded Variant scans on Spark 4.0 (#4084)
1 parent 023d912 commit fdf00d4

4 files changed

Lines changed: 20 additions & 41 deletions

File tree

common/src/main/spark-3.x/org/apache/comet/shims/CometTypeShim.scala

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,12 @@ package org.apache.comet.shims
 
 import scala.annotation.nowarn
 
-import org.apache.spark.sql.types.DataType
+import org.apache.spark.sql.types.{DataType, StructType}
 
 trait CometTypeShim {
   @nowarn // Spark 4 feature; stubbed to false in Spark 3.x for compatibility.
   def isStringCollationType(dt: DataType): Boolean = false
+
+  @nowarn // Spark 4 feature; Variant shredding doesn't exist in Spark 3.x.
+  def isVariantStruct(s: StructType): Boolean = false
 }

common/src/main/spark-4.0/org/apache/comet/shims/CometTypeShim.scala

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,8 @@
 
 package org.apache.comet.shims
 
-import org.apache.spark.sql.types.{DataType, StringType}
+import org.apache.spark.sql.execution.datasources.VariantMetadata
+import org.apache.spark.sql.types.{DataType, StringType, StructType}
 
 trait CometTypeShim {
   // A `StringType` carries collation metadata in Spark 4.0. Only non-default (non-UTF8_BINARY)
@@ -31,4 +32,11 @@ trait CometTypeShim {
     case st: StringType => st.collationId != StringType.collationId
     case _ => false
   }
+
+  // Spark 4.0's `PushVariantIntoScan` rewrites `VariantType` columns into a `StructType` whose
+  // fields each carry `__VARIANT_METADATA_KEY` metadata, then pushes `variant_get` paths down as
+  // ordinary struct field accesses. Comet's native scans don't understand the on-disk Parquet
+  // variant shredding layout, so reading such a struct natively returns nulls. Detect the marker
+  // and force scan fallback.
+  def isVariantStruct(s: StructType): Boolean = VariantMetadata.isVariantStruct(s)
 }

dev/diffs/4.0.1.diff

Lines changed: 0 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1333,21 +1333,6 @@ index 2e33f6505ab..54f5081e10a 100644
      }
 
      withTable("t1", "t2") {
-diff --git a/sql/core/src/test/scala/org/apache/spark/sql/VariantShreddingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/VariantShreddingSuite.scala
-index fee375db10a..8c2c24e2c5f 100644
---- a/sql/core/src/test/scala/org/apache/spark/sql/VariantShreddingSuite.scala
-+++ b/sql/core/src/test/scala/org/apache/spark/sql/VariantShreddingSuite.scala
-@@ -33,7 +33,9 @@ import org.apache.spark.sql.types._
- import org.apache.spark.types.variant._
- import org.apache.spark.unsafe.types.{UTF8String, VariantVal}
-
--class VariantShreddingSuite extends QueryTest with SharedSparkSession with ParquetTest {
-+class VariantShreddingSuite extends QueryTest with SharedSparkSession with ParquetTest
-+  // TODO enable tests once https://github.com/apache/datafusion-comet/issues/2209 is fixed
-+  with IgnoreCometSuite {
-   def parseJson(s: String): VariantVal = {
-     val v = VariantBuilder.parseJson(s, false)
-     new VariantVal(v.getValue, v.getMetadata)
 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/collation/CollationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/collation/CollationSuite.scala
 index 11e9547dfc5..637411056ae 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/collation/CollationSuite.scala
@@ -3130,30 +3115,6 @@ index 09ed6955a51..5cd856ff7b6 100644
      ) {
        checkAllParquetReaders(
          values = Seq("1.23", "10.34"),
-diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetVariantShreddingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetVariantShreddingSuite.scala
-index 458b5dfc0f4..d209f3c85bc 100644
---- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetVariantShreddingSuite.scala
-+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetVariantShreddingSuite.scala
-@@ -26,7 +26,7 @@ import org.apache.parquet.hadoop.util.HadoopInputFile
- import org.apache.parquet.schema.{LogicalTypeAnnotation, PrimitiveType}
- import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
-
--import org.apache.spark.sql.{QueryTest, Row}
-+import org.apache.spark.sql.{IgnoreCometSuite, QueryTest, Row}
- import org.apache.spark.sql.internal.SQLConf
- import org.apache.spark.sql.internal.SQLConf.ParquetOutputTimestampType
- import org.apache.spark.sql.test.SharedSparkSession
-@@ -35,7 +35,9 @@ import org.apache.spark.unsafe.types.VariantVal
- /**
-  * Test shredding Variant values in the Parquet reader/writer.
-  */
--class ParquetVariantShreddingSuite extends QueryTest with ParquetTest with SharedSparkSession {
-+class ParquetVariantShreddingSuite extends QueryTest with ParquetTest with SharedSparkSession
-+  // TODO enable tests once https://github.com/apache/datafusion-comet/issues/2209 is fixed
-+  with IgnoreCometSuite {
-
-   private def testWithTempDir(name: String)(block: File => Unit): Unit = test(name) {
-     withTempDir { dir =>
 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala
 index b8f3ea3c6f3..bbd44221288 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala

spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -713,6 +713,13 @@ case class CometScanTypeChecker(scanImpl: String) extends DataTypeSupport with C
         // we don't need specific support for collation in scans, but this
         // is a convenient place to force the whole query to fall back to Spark for now
         false
+      case s: StructType if isVariantStruct(s) =>
+        // Spark 4.0's PushVariantIntoScan rewrites a VariantType column into a struct of typed
+        // fields plus per-field VariantMetadata, expecting the scan to honor Parquet variant
+        // shredding semantics. Comet's native scans don't, so fall back to Spark.
+        fallbackReasons +=
+          s"$scanImpl scan does not support shredded Variant reads (column $name)"
+        false
       case s: StructType if s.fields.isEmpty =>
         false
       case _ =>

0 commit comments

Comments
 (0)