Skip to content
Open
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gluten.backendsapi.velox

import org.apache.gluten.backendsapi.velox.VeloxIteratorApi.unescapePathName
import org.apache.gluten.sql.shims.SparkShimLoader

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.delta.actions.DeletionVectorDescriptor
import org.apache.spark.sql.delta.deletionvectors.{RoaringBitmapArrayFormat, StoredBitmap}
import org.apache.spark.sql.delta.storage.dv.HadoopFileSystemDVStore
import org.apache.spark.sql.execution.datasources.PartitionedFile

import org.apache.hadoop.fs.Path

import java.util.{ArrayList => JArrayList, HashMap => JHashMap, List => JList, Map => JMap}

import scala.collection.JavaConverters._
import scala.util.Try
import scala.util.control.NonFatal

object VeloxDeltaMetadataUtils {
val DeltaDvCardinality = "delta_dv_cardinality"
val DeltaDvPayloadIndex = "delta_dv_payload_index"

private val RowIndexFilterIdEncoded = "row_index_filter_id_encoded"
private val RowIndexFilterType = "row_index_filter_type"
private val RowIndexFilterTypeIfContained = "IF_CONTAINED"

final class NormalizedSplitMetadata(
val otherMetadataColumns: JList[JMap[String, Object]],
val deletionVectorPayloads: Array[Array[Byte]])
extends Serializable

private def decodeDescriptor(
normalizedMetadata: JMap[String, Object]): Option[DeletionVectorDescriptor] = {
Option(normalizedMetadata.get(RowIndexFilterIdEncoded))
.map(_.toString)
.filter(_.nonEmpty)
.flatMap(encoded => Try(DeletionVectorDescriptor.deserializeFromBase64(encoded)).toOption)
}

private def serializePayload(
dvStore: HadoopFileSystemDVStore,
tablePath: Path,
descriptor: DeletionVectorDescriptor): Array[Byte] = {
if (tablePath == null) {
throw new IllegalStateException(
"Unable to resolve Delta table path while materializing deletion vector payload")
}
StoredBitmap
.create(descriptor, tablePath)
.load(dvStore)
.serializeAsByteArray(RoaringBitmapArrayFormat.Portable)
}

private def normalizeMetadataWithDescriptor(
metadata: JMap[String, Object],
descriptor: DeletionVectorDescriptor): JMap[String, Object] = {
val normalized = new JHashMap[String, Object]()
if (metadata != null) {
normalized.putAll(metadata)
}
normalized.put(DeltaDvCardinality, Long.box(descriptor.cardinality))
normalized.remove(RowIndexFilterIdEncoded)
if (!normalized.containsKey(RowIndexFilterType)) {
normalized.put(RowIndexFilterType, RowIndexFilterTypeIfContained)
}
normalized
}

def normalizeSplitMetadata(
partitionColumnCount: Int,
files: JList[PartitionedFile]): NormalizedSplitMetadata = {
val dvStore = new HadoopFileSystemDVStore(activeSpark.sessionState.newHadoopConf())
val normalizedMetadataColumns = new JArrayList[JMap[String, Object]](files.size())
val deletionVectorPayloads = scala.collection.mutable.ArrayBuffer.empty[Array[Byte]]

files.asScala.foreach {
file =>
val otherMetadata =
SparkShimLoader.getSparkShims.getOtherConstantMetadataColumnValues(file)
val metadataWithDecodedPayload = new JHashMap[String, Object]()
if (otherMetadata != null) {
metadataWithDecodedPayload.putAll(otherMetadata)
}

val descriptor = decodeDescriptor(metadataWithDecodedPayload)

descriptor match {
case Some(descriptor) =>
val normalized = normalizeMetadataWithDescriptor(metadataWithDecodedPayload, descriptor)
val payloadTablePath = resolveTablePath(partitionColumnCount, file)
val serializedPayload = serializePayload(dvStore, payloadTablePath, descriptor)
normalized.put(DeltaDvPayloadIndex, Int.box(deletionVectorPayloads.length))
deletionVectorPayloads += serializedPayload
normalizedMetadataColumns.add(normalized)
case None =>
normalizedMetadataColumns.add(metadataWithDecodedPayload)
}
}

new NormalizedSplitMetadata(normalizedMetadataColumns, deletionVectorPayloads.toArray)
}

private def activeSpark: SparkSession = {
SparkSession.getActiveSession
.orElse(SparkSession.getDefaultSession)
.getOrElse {
throw new IllegalStateException(
"Active SparkSession is required to materialize Delta deletion vectors")
}
}

private def resolveTablePath(partitionColumnCount: Int, file: PartitionedFile): Path = {
val fileParent = new Path(unescapePathName(file.filePath.toString)).getParent
var tablePath = fileParent
for (_ <- 0 until partitionColumnCount) {
tablePath = tablePath.getParent
}
val spark = activeSpark
if (tablePath != null && isDeltaTablePath(spark, tablePath)) {
return tablePath
}

// Spark can report a partition column count that does not map 1:1 to path depth for
// prepared Delta scans. Find the nearest ancestor of the file path that has _delta_log.
var candidate = fileParent
while (candidate != null && !isDeltaTablePath(spark, candidate)) {
candidate = candidate.getParent
}
if (candidate != null) candidate else tablePath
}

private def isDeltaTablePath(spark: SparkSession, tablePath: Path): Boolean = {
val deltaLogPath = new Path(tablePath, "_delta_log")
try {
deltaLogPath.getFileSystem(spark.sessionState.newHadoopConf()).exists(deltaLogPath)
} catch {
case NonFatal(_) => false
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,228 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.delta

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal}
import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
import org.apache.spark.sql.delta.DeltaParquetFileFormat._
import org.apache.spark.sql.delta.commands.DeletionVectorUtils.deletionVectorsReadable
import org.apache.spark.sql.delta.files.{TahoeFileIndex, TahoeLogFileIndex}
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.execution.datasources.FileFormat.METADATA_NAME
import org.apache.spark.sql.execution.datasources.HadoopFsRelation
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.types.StructType

/**
* Rewrites Delta scans over DV-enabled tables to request the backend-specific skip-row metadata
* column only when the snapshot actually contains DVs.
*/
trait PreprocessTableWithDVs extends SubqueryTransformerHelper {
def preprocessTablesWithDVs(plan: LogicalPlan): LogicalPlan = {
transformWithSubqueries(plan) { case ScanWithDeletionVectors(dvScan) => dvScan }
}
}

object ScanWithDeletionVectors {
def unapply(a: LogicalRelation): Option[LogicalPlan] = a match {
case scan @ LogicalRelation(
relation @ HadoopFsRelation(
index: TahoeFileIndex,
_,
_,
_,
format: DeltaParquetFileFormat,
_),
_,
_,
_) =>
dvEnabledScanFor(scan, relation, format, index)
case scan @ LogicalRelation(
relation @ HadoopFsRelation(
index: TahoeFileIndex,
_,
_,
_,
format: GlutenDeltaParquetFileFormat,
_),
_,
_,
_) =>
dvEnabledScanFor(scan, relation, format, index)
case _ => None
}

def dvEnabledScanFor(
scan: LogicalRelation,
hadoopRelation: HadoopFsRelation,
fileFormat: DeltaParquetFileFormat,
index: TahoeFileIndex): Option[LogicalPlan] = {
if (!deletionVectorsReadable(index.protocol, index.metadata)) {
return None
}

require(
!index.isInstanceOf[TahoeLogFileIndex],
"Cannot work with a non-pinned table snapshot of the TahoeFileIndex")

if (fileFormat.hasTablePath) {
return None
}

val filesWithDVs = index
.matchingFiles(partitionFilters = Seq(TrueLiteral), dataFilters = Seq(TrueLiteral))
.filter(_.deletionVector != null)
if (filesWithDVs.isEmpty) {
return None
}

val planOutput = scan.output
val spark = SparkSession.getActiveSession.get
val newScan = createScanWithSkipRowColumn(spark, scan, fileFormat, index, hadoopRelation)
val rowIndexFilter = createRowIndexFilterNode(newScan)
Some(Project(planOutput, rowIndexFilter))
}

def dvEnabledScanFor(
scan: LogicalRelation,
hadoopRelation: HadoopFsRelation,
fileFormat: GlutenDeltaParquetFileFormat,
index: TahoeFileIndex): Option[LogicalPlan] = {
if (!deletionVectorsReadable(index.protocol, index.metadata)) {
return None
}

require(
!index.isInstanceOf[TahoeLogFileIndex],
"Cannot work with a non-pinned table snapshot of the TahoeFileIndex")

if (fileFormat.hasTablePath) {
return None
}

val filesWithDVs = index
.matchingFiles(partitionFilters = Seq(TrueLiteral), dataFilters = Seq(TrueLiteral))
.filter(_.deletionVector != null)
if (filesWithDVs.isEmpty) {
return None
}

val planOutput = scan.output
val spark = SparkSession.getActiveSession.get
val newScan = createScanWithSkipRowColumn(spark, scan, fileFormat, index, hadoopRelation)
val rowIndexFilter = createRowIndexFilterNode(newScan)
Some(Project(planOutput, rowIndexFilter))
}

private def addRowIndexIfMissing(attribute: AttributeReference): AttributeReference = {
require(attribute.name == METADATA_NAME)

val dataType = attribute.dataType.asInstanceOf[StructType]
if (dataType.fieldNames.contains(ParquetFileFormat.ROW_INDEX)) {
return attribute
}

val newDatatype = dataType.add(ParquetFileFormat.ROW_INDEX_FIELD)
attribute.copy(dataType = newDatatype)(
exprId = attribute.exprId,
qualifier = attribute.qualifier)
}

private def createScanWithSkipRowColumn(
spark: SparkSession,
inputScan: LogicalRelation,
fileFormat: DeltaParquetFileFormat,
tahoeFileIndex: TahoeFileIndex,
hadoopFsRelation: HadoopFsRelation): LogicalRelation = {
val useMetadataRowIndex =
spark.sessionState.conf.getConf(DeltaSQLConf.DELETION_VECTORS_USE_METADATA_ROW_INDEX)

val skipRowField = IS_ROW_DELETED_STRUCT_FIELD
val scanOutputWithMetadata = if (useMetadataRowIndex) {
if (inputScan.output.map(_.name).contains(METADATA_NAME)) {
inputScan.output.collect {
case a: AttributeReference if a.name == METADATA_NAME => addRowIndexIfMissing(a)
case o => o
}
} else {
inputScan.output :+ fileFormat.createFileMetadataCol()
}
} else {
inputScan.output
}

val newScanOutput =
scanOutputWithMetadata :+ AttributeReference(skipRowField.name, skipRowField.dataType)()
val newDataSchema = hadoopFsRelation.dataSchema.add(skipRowField)
val newFileFormat = fileFormat.copyWithDVInfo(
tablePath = tahoeFileIndex.path.toString,
optimizationsEnabled = useMetadataRowIndex)

val newRelation = hadoopFsRelation.copy(fileFormat = newFileFormat, dataSchema = newDataSchema)(
hadoopFsRelation.sparkSession)

inputScan.copy(relation = newRelation, output = newScanOutput)
}

private def createScanWithSkipRowColumn(
spark: SparkSession,
inputScan: LogicalRelation,
fileFormat: GlutenDeltaParquetFileFormat,
tahoeFileIndex: TahoeFileIndex,
hadoopFsRelation: HadoopFsRelation): LogicalRelation = {
val useMetadataRowIndex =
spark.sessionState.conf.getConf(DeltaSQLConf.DELETION_VECTORS_USE_METADATA_ROW_INDEX)

val skipRowField = GlutenDeltaParquetFileFormat.IS_ROW_DELETED_STRUCT_FIELD
val scanOutputWithMetadata = if (useMetadataRowIndex) {
if (inputScan.output.map(_.name).contains(METADATA_NAME)) {
inputScan.output.collect {
case a: AttributeReference if a.name == METADATA_NAME => addRowIndexIfMissing(a)
case o => o
}
} else {
inputScan.output :+ fileFormat.createFileMetadataCol()
}
} else {
inputScan.output
}

val newScanOutput =
scanOutputWithMetadata :+ AttributeReference(skipRowField.name, skipRowField.dataType)()
val newDataSchema = hadoopFsRelation.dataSchema.add(skipRowField)
val newFileFormat = fileFormat.copyWithDVInfo(
tablePath = tahoeFileIndex.path.toString,
optimizationsEnabled = useMetadataRowIndex)

val newRelation = hadoopFsRelation.copy(fileFormat = newFileFormat, dataSchema = newDataSchema)(
hadoopFsRelation.sparkSession)

inputScan.copy(relation = newRelation, output = newScanOutput)
}

private def createRowIndexFilterNode(newScan: LogicalRelation): Filter = {
val skipRowColumnRefs = newScan.output.filter(_.name == IS_ROW_DELETED_COLUMN_NAME)
require(
skipRowColumnRefs.size == 1,
s"Expected only one column with name=$IS_ROW_DELETED_COLUMN_NAME")
val skipRowColumnRef = skipRowColumnRefs.head
Filter(EqualTo(skipRowColumnRef, Literal(RowIndexFilter.KEEP_ROW_VALUE)), newScan)
}
}
Loading
Loading