Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, Pa
import org.apache.spark.sql.hive.execution.HiveFileFormat
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.util.SparkVersionUtil

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
Expand Down Expand Up @@ -113,6 +114,40 @@ object VeloxBackendSettings extends BackendSettingsApi {
hadoopConf: Configuration,
partitionFileFormats: Set[ReadFileFormat]): ValidationResult = {

def containsStructType(dataType: DataType): Boolean = {
dataType match {
case _: StructType => true
case ArrayType(elementType, _) => containsStructType(elementType)
case MapType(keyType, valueType, _) =>
containsStructType(keyType) || containsStructType(valueType)
case _ => false
}
}

def shouldFallbackBySpark41ParquetStructBehavior: Boolean = {
if (!SparkVersionUtil.gteSpark41) {
return false
}
if (!fields.exists(field => containsStructType(field.dataType))) {
return false
}
val returnNullStructIfAllFieldsMissingKey =
"spark.sql.legacy.parquet.returnNullStructIfAllFieldsMissing"
val legacyReturnNullStruct = SQLConf.get
.getConfString(returnNullStructIfAllFieldsMissingKey, "false")
.toBoolean
if (legacyReturnNullStruct) {
return false
}
// Only fall back for the genuine SPARK-53535 incompatibility: a struct that exists in the
// file but whose requested fields are all missing. Regular struct reads stay on the native
// scan.
ParquetMetadataUtils.hasStructWithAllRequestedFieldsMissing(
rootPaths,
hadoopConf,
StructType(fields))
}

def validateScheme(): Option[String] = {
val filteredRootPaths = distinctRootPaths(rootPaths)
if (
Expand Down Expand Up @@ -156,6 +191,11 @@ object VeloxBackendSettings extends BackendSettingsApi {
if (parquetOptions.mergeSchema) {
// https://github.com/apache/gluten/issues/7174
Some(s"not support when merge schema is true")
} else if (shouldFallbackBySpark41ParquetStructBehavior) {
Some(
"Spark 4.1 Parquet struct compatibility (all requested struct fields missing) " +
"is not supported by Velox native scan yet when " +
"spark.sql.legacy.parquet.returnNullStructIfAllFieldsMissing=false")
} else {
None
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,18 @@ import org.apache.gluten.sql.shims.SparkShimLoader

import org.apache.spark.internal.Logging
import org.apache.spark.sql.execution.datasources.DataSourceUtils
import org.apache.spark.sql.execution.datasources.parquet.{ParquetFooterReaderShim, ParquetOptions}
import org.apache.spark.sql.execution.datasources.parquet.{ParquetFooterReaderShim, ParquetOptions, ParquetToSparkSchemaConverter}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructField, StructType}

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, LocatedFileStatus, Path}
import org.apache.parquet.crypto.ParquetCryptoRuntimeException
import org.apache.parquet.format.converter.ParquetMetadataConverter
import org.apache.parquet.hadoop.metadata.ParquetMetadata

import java.util.Locale

object ParquetMetadataUtils extends Logging {

/**
Expand Down Expand Up @@ -223,4 +227,121 @@ object ParquetMetadataUtils extends Logging {
None
}

private val PARQUET_FIELD_ID_METADATA_KEY = "parquet.field.id"

/**
* Detects the SPARK-53535 incompatibility: a struct that exists in the Parquet file but whose
* requested fields are all absent. Spark 4.1 reads an extra present field to determine such a
* struct's nullness while the Velox native scan still returns a NULL struct, so the scan must
* fall back to the vanilla Spark reader. Regular struct reads are unaffected.
*/
def hasStructWithAllRequestedFieldsMissing(
rootPaths: Seq[String],
hadoopConf: Configuration,
requestedSchema: StructType): Boolean = {
// Field-id matching (e.g. Delta column mapping) addresses columns by id, not name.
if (usesParquetFieldId(requestedSchema)) {
return false
}
val footer = readFirstParquetFooter(rootPaths, hadoopConf)
if (footer.isEmpty) {
return false
}
val fileSchema =
try {
new ParquetToSparkSchemaConverter(SQLConf.get)
.convert(footer.get.getFileMetaData.getSchema)
} catch {
case e: Exception =>
logWarning("Failed to convert parquet file schema for struct field check", e)
return false
}
val caseSensitive = SQLConf.get.caseSensitiveAnalysis
matchedStructHasAllFieldsMissing(requestedSchema, fileSchema, caseSensitive, topLevel = true)
}

private def usesParquetFieldId(dataType: DataType): Boolean = dataType match {
case s: StructType =>
s.fields.exists(
f => f.metadata.contains(PARQUET_FIELD_ID_METADATA_KEY) || usesParquetFieldId(f.dataType))
case ArrayType(elementType, _) => usesParquetFieldId(elementType)
case MapType(keyType, valueType, _) =>
usesParquetFieldId(keyType) || usesParquetFieldId(valueType)
case _ => false
}

private def matchedStructHasAllFieldsMissing(
requested: StructType,
file: StructType,
caseSensitive: Boolean,
topLevel: Boolean): Boolean = {
val fileByName: Map[String, StructField] =
if (caseSensitive) {
file.fields.map(f => f.name -> f).toMap
} else {
file.fields.map(f => f.name.toLowerCase(Locale.ROOT) -> f).toMap
}
def lookup(name: String): Option[StructField] =
if (caseSensitive) fileByName.get(name) else fileByName.get(name.toLowerCase(Locale.ROOT))

val matched = requested.fields.flatMap(rf => lookup(rf.name).map(ff => (rf, ff)))

// A nested struct present in the file with none of its requested fields is the SPARK-53535 case
// (skipped at top level, where a fully absent column is a normal missing column).
if (!topLevel && requested.fields.nonEmpty && matched.isEmpty) {
return true
}

matched.exists {
case (rf, ff) => containsStructWithAllFieldsMissing(rf.dataType, ff.dataType, caseSensitive)
}
}

private def containsStructWithAllFieldsMissing(
requested: DataType,
file: DataType,
caseSensitive: Boolean): Boolean = (requested, file) match {
case (r: StructType, f: StructType) =>
matchedStructHasAllFieldsMissing(r, f, caseSensitive, topLevel = false)
case (r: ArrayType, f: ArrayType) =>
containsStructWithAllFieldsMissing(r.elementType, f.elementType, caseSensitive)
case (r: MapType, f: MapType) =>
containsStructWithAllFieldsMissing(r.keyType, f.keyType, caseSensitive) ||
containsStructWithAllFieldsMissing(r.valueType, f.valueType, caseSensitive)
case _ => false
}

private def readFirstParquetFooter(
rootPaths: Seq[String],
hadoopConf: Configuration): Option[ParquetMetadata] = {
val pathsIt = rootPaths.iterator
while (pathsIt.hasNext) {
val rootPath = pathsIt.next()
try {
val path = new Path(rootPath)
val fs = path.getFileSystem(hadoopConf)
if (fs.exists(path)) {
val filesIt = fs.listFiles(path, true)
while (filesIt.hasNext) {
val fileStatus = filesIt.next()
val name = fileStatus.getPath.getName
if (fileStatus.getLen > 0 && !name.startsWith(".") && !name.startsWith("_")) {
try {
return Some(
ParquetFooterReaderShim
.readFooter(hadoopConf, fileStatus, ParquetMetadataConverter.NO_FILTER))
} catch {
case _: RuntimeException => // Not a parquet file, keep looking.
}
}
}
}
} catch {
case e: Exception =>
logWarning(s"Failed to read parquet footer under $rootPath for struct field check", e)
}
}
None
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanHelper, AQEShuf
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BroadcastNestedLoopJoinExec, SortMergeJoinExec}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{IntegerType, StructType}
import org.apache.spark.util.SparkVersionUtil
import org.apache.spark.utils.GlutenSuiteUtils

class FallbackSuite extends VeloxWholeStageTransformerSuite with AdaptiveSparkPlanHelper {
Expand Down Expand Up @@ -352,6 +354,28 @@ class FallbackSuite extends VeloxWholeStageTransformerSuite with AdaptiveSparkPl
}
}

test("fallback Spark 4.1 parquet missing all struct fields compatibility") {
if (!SparkVersionUtil.gteSpark41) {
cancel("Only applicable on Spark 4.1+")
}
withTempPath {
path =>
val schema = new StructType().add("s", new StructType().add("b", IntegerType))
val file = path.getCanonicalPath
spark.range(10).selectExpr("named_struct('a', cast(id as int)) as s").write.parquet(file)
withSQLConf("spark.sql.legacy.parquet.returnNullStructIfAllFieldsMissing" -> "false") {
spark.read.schema(schema).parquet(file).createOrReplaceTempView("struct_tbl")
runQueryAndCompare("select s is null as is_null from struct_tbl") {
df =>
val plan = df.queryExecution.executedPlan
assert(
collect(plan) { case s: FileSourceScanExecTransformer => s }.isEmpty,
"Parquet scan should fall back to the vanilla Spark reader")
}
}
}
}

test("get correct fallback reason on nodes without logicalLink") {
withSQLConf(GlutenConfig.COLUMNAR_SORT_ENABLED.key -> "false") {
GlutenSuiteUtils.withFallbackEventListener(spark.sparkContext) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -552,9 +552,6 @@ class VeloxTestSettings extends BackendTestSettings {
.exclude("SPARK-40128 read DELTA_LENGTH_BYTE_ARRAY encoded strings")
// TODO: fix in Spark-4.0
.exclude("explode nested lists crossing a rowgroup boundary")
// TODO: fix on Spark-4.1
.excludeByPrefix("SPARK-53535") // see https://issues.apache.org/jira/browse/SPARK-53535
.excludeByPrefix("vectorized reader: missing all struct fields")
.excludeByPrefix("SPARK-54220") // https://issues.apache.org/jira/browse/SPARK-54220
enableSuite[GlutenParquetV1PartitionDiscoverySuite]
enableSuite[GlutenParquetV2PartitionDiscoverySuite]
Expand Down
Loading