From 4fd0c5ddf4183fa7eabacf634cf771200b5a2363 Mon Sep 17 00:00:00 2001 From: senthh Date: Fri, 29 May 2026 14:55:53 +0530 Subject: [PATCH 1/2] [VL] Fallback to Spark Parquet reader for Spark 4.1 struct compatibility --- .../backendsapi/velox/VeloxBackend.scala | 30 +++++++++++++++++++ .../gluten/execution/FallbackSuite.scala | 22 ++++++++++++++ .../utils/velox/VeloxTestSettings.scala | 3 -- 3 files changed, 52 insertions(+), 3 deletions(-) diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala index 599b91851ea..9063811c9c3 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala @@ -46,6 +46,7 @@ import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, Pa import org.apache.spark.sql.hive.execution.HiveFileFormat import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ +import org.apache.spark.util.SparkVersionUtil import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path @@ -113,6 +114,30 @@ object VeloxBackendSettings extends BackendSettingsApi { hadoopConf: Configuration, partitionFileFormats: Set[ReadFileFormat]): ValidationResult = { + def containsStructType(dataType: DataType): Boolean = { + dataType match { + case _: StructType => true + case ArrayType(elementType, _) => containsStructType(elementType) + case MapType(keyType, valueType, _) => + containsStructType(keyType) || containsStructType(valueType) + case _ => false + } + } + + def shouldFallbackBySpark41ParquetStructBehavior: Boolean = { + if (!SparkVersionUtil.gteSpark41) { + return false + } + if (!fields.exists(field => containsStructType(field.dataType))) { + return false + } + val returnNullStructIfAllFieldsMissingKey = + "spark.sql.legacy.parquet.returnNullStructIfAllFieldsMissing" + !SQLConf.get + .getConfString(returnNullStructIfAllFieldsMissingKey, "false") + .toBoolean + } + def validateScheme(): Option[String] = { val filteredRootPaths = distinctRootPaths(rootPaths) if ( @@ -156,6 +181,11 @@ object VeloxBackendSettings extends BackendSettingsApi { if (parquetOptions.mergeSchema) { // https://github.com/apache/gluten/issues/7174 Some(s"not support when merge schema is true") + } else if (shouldFallbackBySpark41ParquetStructBehavior) { + Some( + "Spark 4.1 Parquet struct compatibility (all requested struct fields missing) " + + "is not supported by Velox native scan yet when " + + "spark.sql.legacy.parquet.returnNullStructIfAllFieldsMissing=false") } else { None } diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/FallbackSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/FallbackSuite.scala index 53af1664c85..13ebc94d3eb 100644 --- a/backends-velox/src/test/scala/org/apache/gluten/execution/FallbackSuite.scala +++ b/backends-velox/src/test/scala/org/apache/gluten/execution/FallbackSuite.scala @@ -24,6 +24,8 @@ import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanHelper, AQEShuf import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BroadcastNestedLoopJoinExec, SortMergeJoinExec} import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types.{IntegerType, StructType} +import org.apache.spark.util.SparkVersionUtil import org.apache.spark.utils.GlutenSuiteUtils class FallbackSuite extends VeloxWholeStageTransformerSuite with AdaptiveSparkPlanHelper { @@ -352,6 +354,26 @@ class FallbackSuite extends VeloxWholeStageTransformerSuite with AdaptiveSparkPl } } + test("fallback Spark 4.1 parquet missing all struct fields compatibility") { + if (!SparkVersionUtil.gteSpark41) { + cancel("Only applicable on Spark 4.1+") + } + withTempPath { + path => + val schema = new StructType().add("s", new StructType().add("b", IntegerType)) + val file = path.getCanonicalPath + spark.range(10).selectExpr("named_struct('a', cast(id as int)) as s").write.parquet(file) + withSQLConf("spark.sql.legacy.parquet.returnNullStructIfAllFieldsMissing" -> "false") { + spark.read.schema(schema).parquet(file).createOrReplaceTempView("struct_tbl") + runQueryAndCompare("select s is null as is_null from struct_tbl") { + df => + val plan = df.queryExecution.executedPlan + assert(collect(plan) { case g: GlutenPlan => g }.isEmpty) + } + } + } + } + test("get correct fallback reason on nodes without logicalLink") { withSQLConf(GlutenConfig.COLUMNAR_SORT_ENABLED.key -> "false") { GlutenSuiteUtils.withFallbackEventListener(spark.sparkContext) { diff --git a/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala index a3712e62ae1..43c078c6f40 100644 --- a/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala +++ b/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala @@ -548,9 +548,6 @@ class VeloxTestSettings extends BackendTestSettings { .exclude("SPARK-40128 read DELTA_LENGTH_BYTE_ARRAY encoded strings") // TODO: fix in Spark-4.0 .exclude("explode nested lists crossing a rowgroup boundary") - // TODO: fix on Spark-4.1 - .excludeByPrefix("SPARK-53535") // see https://issues.apache.org/jira/browse/SPARK-53535 - .excludeByPrefix("vectorized reader: missing all struct fields") .excludeByPrefix("SPARK-54220") // https://issues.apache.org/jira/browse/SPARK-54220 enableSuite[GlutenParquetV1PartitionDiscoverySuite] enableSuite[GlutenParquetV2PartitionDiscoverySuite] From 96ba5c8df1382b9a40bc16adadaaa468d6b56894 Mon Sep 17 00:00:00 2001 From: senthh Date: Fri, 5 Jun 2026 12:59:08 +0530 Subject: [PATCH 2/2] [GLUTEN-11914] Narrow Spark 4.1 Parquet struct fallback to the all-fields-missing case --- .../backendsapi/velox/VeloxBackend.scala | 12 +- .../gluten/utils/ParquetMetadataUtils.scala | 123 +++++++++++++++++- .../gluten/execution/FallbackSuite.scala | 4 +- 3 files changed, 136 insertions(+), 3 deletions(-) diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala index 9063811c9c3..2d2a1e45231 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala @@ -133,9 +133,19 @@ object VeloxBackendSettings extends BackendSettingsApi { } val returnNullStructIfAllFieldsMissingKey = "spark.sql.legacy.parquet.returnNullStructIfAllFieldsMissing" - !SQLConf.get + val legacyReturnNullStruct = SQLConf.get .getConfString(returnNullStructIfAllFieldsMissingKey, "false") .toBoolean + if (legacyReturnNullStruct) { + return false + } + // Only fall back for the genuine SPARK-53535 incompatibility: a struct that exists in the + // file but whose requested fields are all missing. Regular struct reads stay on the native + // scan. + ParquetMetadataUtils.hasStructWithAllRequestedFieldsMissing( + rootPaths, + hadoopConf, + StructType(fields)) } def validateScheme(): Option[String] = { diff --git a/backends-velox/src/main/scala/org/apache/gluten/utils/ParquetMetadataUtils.scala b/backends-velox/src/main/scala/org/apache/gluten/utils/ParquetMetadataUtils.scala index 9bf7574efd3..93dc9f68692 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/utils/ParquetMetadataUtils.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/utils/ParquetMetadataUtils.scala @@ -21,7 +21,9 @@ import org.apache.gluten.sql.shims.SparkShimLoader import org.apache.spark.internal.Logging import org.apache.spark.sql.execution.datasources.DataSourceUtils -import org.apache.spark.sql.execution.datasources.parquet.{ParquetFooterReaderShim, ParquetOptions} +import org.apache.spark.sql.execution.datasources.parquet.{ParquetFooterReaderShim, ParquetOptions, ParquetToSparkSchemaConverter} +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructField, StructType} import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, LocatedFileStatus, Path} @@ -29,6 +31,8 @@ import org.apache.parquet.crypto.ParquetCryptoRuntimeException import org.apache.parquet.format.converter.ParquetMetadataConverter import org.apache.parquet.hadoop.metadata.ParquetMetadata +import java.util.Locale + object ParquetMetadataUtils extends Logging { /** @@ -223,4 +227,121 @@ object ParquetMetadataUtils extends Logging { None } + private val PARQUET_FIELD_ID_METADATA_KEY = "parquet.field.id" + + /** + * Detects the SPARK-53535 incompatibility: a struct that exists in the Parquet file but whose + * requested fields are all absent. Spark 4.1 reads an extra present field to determine such a + * struct's nullness while the Velox native scan still returns a NULL struct, so the scan must + * fall back to the vanilla Spark reader. Regular struct reads are unaffected. + */ + def hasStructWithAllRequestedFieldsMissing( + rootPaths: Seq[String], + hadoopConf: Configuration, + requestedSchema: StructType): Boolean = { + // Field-id matching (e.g. Delta column mapping) addresses columns by id, not name. + if (usesParquetFieldId(requestedSchema)) { + return false + } + val footer = readFirstParquetFooter(rootPaths, hadoopConf) + if (footer.isEmpty) { + return false + } + val fileSchema = + try { + new ParquetToSparkSchemaConverter(SQLConf.get) + .convert(footer.get.getFileMetaData.getSchema) + } catch { + case e: Exception => + logWarning("Failed to convert parquet file schema for struct field check", e) + return false + } + val caseSensitive = SQLConf.get.caseSensitiveAnalysis + matchedStructHasAllFieldsMissing(requestedSchema, fileSchema, caseSensitive, topLevel = true) + } + + private def usesParquetFieldId(dataType: DataType): Boolean = dataType match { + case s: StructType => + s.fields.exists( + f => f.metadata.contains(PARQUET_FIELD_ID_METADATA_KEY) || usesParquetFieldId(f.dataType)) + case ArrayType(elementType, _) => usesParquetFieldId(elementType) + case MapType(keyType, valueType, _) => + usesParquetFieldId(keyType) || usesParquetFieldId(valueType) + case _ => false + } + + private def matchedStructHasAllFieldsMissing( + requested: StructType, + file: StructType, + caseSensitive: Boolean, + topLevel: Boolean): Boolean = { + val fileByName: Map[String, StructField] = + if (caseSensitive) { + file.fields.map(f => f.name -> f).toMap + } else { + file.fields.map(f => f.name.toLowerCase(Locale.ROOT) -> f).toMap + } + def lookup(name: String): Option[StructField] = + if (caseSensitive) fileByName.get(name) else fileByName.get(name.toLowerCase(Locale.ROOT)) + + val matched = requested.fields.flatMap(rf => lookup(rf.name).map(ff => (rf, ff))) + + // A nested struct present in the file with none of its requested fields is the SPARK-53535 case + // (skipped at top level, where a fully absent column is a normal missing column). + if (!topLevel && requested.fields.nonEmpty && matched.isEmpty) { + return true + } + + matched.exists { + case (rf, ff) => containsStructWithAllFieldsMissing(rf.dataType, ff.dataType, caseSensitive) + } + } + + private def containsStructWithAllFieldsMissing( + requested: DataType, + file: DataType, + caseSensitive: Boolean): Boolean = (requested, file) match { + case (r: StructType, f: StructType) => + matchedStructHasAllFieldsMissing(r, f, caseSensitive, topLevel = false) + case (r: ArrayType, f: ArrayType) => + containsStructWithAllFieldsMissing(r.elementType, f.elementType, caseSensitive) + case (r: MapType, f: MapType) => + containsStructWithAllFieldsMissing(r.keyType, f.keyType, caseSensitive) || + containsStructWithAllFieldsMissing(r.valueType, f.valueType, caseSensitive) + case _ => false + } + + private def readFirstParquetFooter( + rootPaths: Seq[String], + hadoopConf: Configuration): Option[ParquetMetadata] = { + val pathsIt = rootPaths.iterator + while (pathsIt.hasNext) { + val rootPath = pathsIt.next() + try { + val path = new Path(rootPath) + val fs = path.getFileSystem(hadoopConf) + if (fs.exists(path)) { + val filesIt = fs.listFiles(path, true) + while (filesIt.hasNext) { + val fileStatus = filesIt.next() + val name = fileStatus.getPath.getName + if (fileStatus.getLen > 0 && !name.startsWith(".") && !name.startsWith("_")) { + try { + return Some( + ParquetFooterReaderShim + .readFooter(hadoopConf, fileStatus, ParquetMetadataConverter.NO_FILTER)) + } catch { + case _: RuntimeException => // Not a parquet file, keep looking. + } + } + } + } + } catch { + case e: Exception => + logWarning(s"Failed to read parquet footer under $rootPath for struct field check", e) + } + } + None + } + } diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/FallbackSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/FallbackSuite.scala index 13ebc94d3eb..ccbbf415e95 100644 --- a/backends-velox/src/test/scala/org/apache/gluten/execution/FallbackSuite.scala +++ b/backends-velox/src/test/scala/org/apache/gluten/execution/FallbackSuite.scala @@ -368,7 +368,9 @@ class FallbackSuite extends VeloxWholeStageTransformerSuite with AdaptiveSparkPl runQueryAndCompare("select s is null as is_null from struct_tbl") { df => val plan = df.queryExecution.executedPlan - assert(collect(plan) { case g: GlutenPlan => g }.isEmpty) + assert( + collect(plan) { case s: FileSourceScanExecTransformer => s }.isEmpty, + "Parquet scan should fall back to the vanilla Spark reader") } } }