From 1fc53c1e1669b6078d6152b9d04ae166d732bc5c Mon Sep 17 00:00:00 2001 From: DB Tsai Date: Wed, 25 Mar 2026 14:19:45 -0700 Subject: [PATCH 01/11] dsv2 cache --- .../apache/spark/sql/internal/SQLConf.scala | 10 + .../spark/sql/execution/CacheManager.scala | 33 +- .../spark/sql/execution/SparkStrategies.scala | 6 + .../columnar/InMemoryCacheTable.scala | 295 ++++++++++++++++++ .../execution/columnar/InMemoryRelation.scala | 10 +- .../columnar/InMemoryTableScanExec.scala | 29 +- .../datasources/v2/DataSourceV2Strategy.scala | 19 ++ .../dynamicpruning/PartitionPruning.scala | 10 +- .../apache/spark/sql/CachedTableSuite.scala | 24 +- .../apache/spark/sql/DatasetCacheSuite.scala | 16 +- .../org/apache/spark/sql/QueryTest.scala | 6 +- .../sql/TPCDSCachedPlanComparisonSuite.scala | 177 +++++++++++ .../scala/org/apache/spark/sql/UDFSuite.scala | 6 +- .../sql/connector/DataSourceV2SQLSuite.scala | 4 +- .../LogicalPlanTagInSparkPlanSuite.scala | 4 +- .../spark/sql/execution/PlannerSuite.scala | 4 +- .../columnar/InMemoryCacheDSv2Benchmark.scala | 175 +++++++++++ .../columnar/InMemoryColumnarQuerySuite.scala | 41 ++- 18 files changed, 808 insertions(+), 61 deletions(-) create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryCacheTable.scala create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/TPCDSCachedPlanComparisonSuite.scala create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryCacheDSv2Benchmark.scala diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 77ef8bb600f9c..f636bea7d3489 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -853,6 +853,16 @@ object SQLConf { .booleanConf .createWithDefault(true) + val CACHE_USE_DSV2 = + 
buildConf("spark.sql.inMemoryColumnarStorage.useDataSourceV2") + .internal() + .doc("When true, cached DataFrames are wrapped in a DataSourceV2Relation so that " + + "V2ScanRelationPushDown optimizer rules (column pruning, filter pushdown, statistics " + + "reporting) apply to in-memory cached scans.") + .version("4.0.0") + .booleanConf + .createWithDefault(true) + val COLUMN_VECTOR_OFFHEAP_ENABLED = buildConf("spark.sql.columnVector.offheap.enabled") .internal() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala index 3f92f24156d3c..098a78f407ec5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala @@ -26,7 +26,7 @@ import org.apache.spark.internal.LogKeys._ import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases import org.apache.spark.sql.catalyst.analysis.Resolver import org.apache.spark.sql.catalyst.catalog.HiveTableRelation -import org.apache.spark.sql.catalyst.expressions.{Attribute, SubqueryExpression} +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, SubqueryExpression} import org.apache.spark.sql.catalyst.optimizer.EliminateResolvedHint import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan, ResolvedHint, View} import org.apache.spark.sql.catalyst.trees.TreePattern.PLAN_EXPRESSION @@ -36,11 +36,12 @@ import org.apache.spark.sql.connector.catalog.CatalogPlugin import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.{IdentifierHelper, MultipartIdentifierHelper} import org.apache.spark.sql.connector.catalog.Identifier import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper -import org.apache.spark.sql.execution.columnar.InMemoryRelation +import org.apache.spark.sql.execution.columnar.{InMemoryCacheTable, InMemoryRelation} import 
org.apache.spark.sql.execution.command.CommandUtils import org.apache.spark.sql.execution.datasources.{FileIndex, HadoopFsRelation, LogicalRelation, LogicalRelationWithTable} import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, ExtractV2CatalogAndIdentifier, ExtractV2Table, FileTable, V2TableRefreshUtil} import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.util.CaseInsensitiveStringMap import org.apache.spark.storage.StorageLevel import org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK @@ -332,8 +333,17 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper { cachedData: CachedData, column: Seq[Attribute]): Unit = { val relation = cachedData.cachedRepresentation + // Wrap in DataSourceV2Relation so the DSv2 planning path is used consistently + // (DataSourceV2Strategy handles InMemoryTableScanExec via InMemoryCacheScan). + val dsv2Relation = DataSourceV2Relation( + table = new InMemoryCacheTable(relation), + output = relation.output.map { case ar: AttributeReference => ar }, + catalog = None, + identifier = None, + options = CaseInsensitiveStringMap.empty() + ) val (rowCount, newColStats) = - CommandUtils.computeColumnStats(sparkSession, relation, column) + CommandUtils.computeColumnStats(sparkSession, dsv2Relation, column) relation.updateStats(rowCount, newColStats) } @@ -502,9 +512,24 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper { // After cache lookup, we should still keep the hints from the input plan. val hints = EliminateResolvedHint.extractHintsFromPlan(currentFragment)._2 val cachedPlan = cached.cachedRepresentation.withOutput(currentFragment.output) + val substitutedPlan: LogicalPlan = + if (SQLConf.get.getConf(SQLConf.CACHE_USE_DSV2)) { + // Wrap the InMemoryRelation in a DataSourceV2Relation so that V2ScanRelationPushDown + // optimizer rules can apply column pruning, filter pushdown, and ordering/statistics + // reporting. 
Physical execution is still routed to InMemoryTableScanExec. + DataSourceV2Relation( + table = new InMemoryCacheTable(cachedPlan), + output = cachedPlan.output.map { case ar: AttributeReference => ar }, + catalog = None, + identifier = None, + options = CaseInsensitiveStringMap.empty() + ) + } else { + cachedPlan + } // The returned hint list is in top-down order, we should create the hint nodes from // right to left. - hints.foldRight[LogicalPlan](cachedPlan) { case (hint, p) => + hints.foldRight[LogicalPlan](substitutedPlan) { case (hint, p) => ResolvedHint(p, hint) } }.getOrElse(currentFragment) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 92818c12bfa09..218077fe5c6b7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -704,6 +704,12 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] { } } + /** + * Fallback strategy for cached in-memory tables when the DSv2 cache path is disabled + * (spark.sql.inMemoryColumnarStorage.useDataSourceV2 = false). + * Under the default (DSv2) path InMemoryRelation is never exposed to the planner because + * CacheManager wraps it in DataSourceV2Relation before planning. 
+ */ object InMemoryScans extends Strategy { def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { case PhysicalOperation(projectList, filters, mem: InMemoryRelation) => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryCacheTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryCacheTable.scala new file mode 100644 index 0000000000000..15861dc5fdcc4 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryCacheTable.scala @@ -0,0 +1,295 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.columnar + +import java.util +import java.util.OptionalLong + +import org.apache.spark.sql.catalyst.expressions.{ + Ascending, Attribute, AttributeReference, Descending, NullsFirst, NullsLast, + SortOrder => CatalystSortOrder +} +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning +import org.apache.spark.sql.catalyst.types.DataTypeUtils +import org.apache.spark.sql.connector.catalog.{SupportsRead, Table, TableCapability} +import org.apache.spark.sql.connector.expressions.{ + Expression => V2Expression, FieldReference, NamedReference, + NullOrdering => V2NullOrdering, SortDirection => V2SortDirection, + SortOrder => V2SortOrder, SortValue +} +import org.apache.spark.sql.connector.expressions.filter.{Predicate => V2Predicate} +import org.apache.spark.sql.connector.read.{ + Scan, ScanBuilder, Statistics => V2Statistics, SupportsPushDownLimit, + SupportsPushDownRequiredColumns, SupportsPushDownV2Filters, SupportsReportOrdering, + SupportsReportPartitioning, SupportsReportStatistics, SupportsRuntimeV2Filtering +} +import org.apache.spark.sql.connector.read.colstats.ColumnStatistics +import org.apache.spark.sql.connector.read.partitioning.{ + KeyGroupedPartitioning, Partitioning, UnknownPartitioning +} +import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2ScanRelation} +import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.util.CaseInsensitiveStringMap + +/** + * A DSv2 [[Table]] wrapper around [[InMemoryRelation]], enabling [[V2ScanRelationPushDown]] + * optimizer rules to apply column pruning, filter pushdown, and ordering/statistics reporting + * to cached DataFrames. + */ +private[sql] class InMemoryCacheTable(val relation: InMemoryRelation) + extends Table with SupportsRead { + + // Two InMemoryCacheTable instances wrapping the same CachedRDDBuilder are equal. 
+ // All InMemoryRelation copies from the same CachedData share the same cacheBuilder by reference. + override def equals(other: Any): Boolean = other match { + case t: InMemoryCacheTable => relation.cacheBuilder eq t.relation.cacheBuilder + case _ => false + } + override def hashCode(): Int = System.identityHashCode(relation.cacheBuilder) + + override def name(): String = relation.cacheBuilder.cachedName + + override def schema(): StructType = DataTypeUtils.fromAttributes(relation.output) + + override def capabilities(): util.Set[TableCapability] = + util.EnumSet.of(TableCapability.BATCH_READ) + + override def newScanBuilder(options: CaseInsensitiveStringMap): InMemoryScanBuilder = + new InMemoryScanBuilder(relation) +} + +/** + * DSv2 [[ScanBuilder]] for [[InMemoryRelation]]. + * + * - Column pruning via [[SupportsPushDownRequiredColumns]]: only requested columns are + * passed to [[InMemoryTableScanExec]], reducing deserialization work. + * - Filter pushdown via [[SupportsPushDownV2Filters]]: predicates are recorded for + * batch-level pruning using per-batch min/max statistics, but all predicates are + * returned (category-2: still need post-scan row-level re-evaluation). + */ +private[sql] class InMemoryScanBuilder(relation: InMemoryRelation) + extends ScanBuilder + with SupportsPushDownRequiredColumns + with SupportsPushDownV2Filters + with SupportsPushDownLimit { + + private var requiredSchema: StructType = DataTypeUtils.fromAttributes(relation.output) + private var _pushedPredicates: Array[V2Predicate] = Array.empty + private var _pushedLimit: Option[Int] = None + + override def pruneColumns(required: StructType): Unit = { + requiredSchema = required + } + + /** + * Records predicates so Spark adds a post-scan [[FilterExec]] for row-level evaluation. 
+ * Batch-level min/max pruning is handled at physical planning: [[DataSourceV2Strategy]] + * passes the Catalyst filter expressions extracted by [[PhysicalOperation]] directly + * to [[InMemoryTableScanExec]], which forwards them to [[CachedBatchSerializer.buildFilter]]. + * The V2 [[Predicate]]s stored here are not used for batch pruning. + */ + override def pushPredicates(predicates: Array[V2Predicate]): Array[V2Predicate] = { + _pushedPredicates = predicates + predicates + } + + override def pushedPredicates(): Array[V2Predicate] = _pushedPredicates + + /** + * Pushes a LIMIT down into the scan. Returns true to indicate the limit was accepted. + * Because caching may interleave data across partitions, this is always a partial push: + * Spark still re-applies the full LIMIT on top to enforce the exact count. + */ + override def pushLimit(limit: Int): Boolean = { + _pushedLimit = Some(limit) + true + } + + /** Always partially pushed: Spark re-applies the full LIMIT on top. */ + override def isPartiallyPushed(): Boolean = true + + override def build(): InMemoryCacheScan = { + val requiredFieldNames = requiredSchema.fieldNames.toSet + val prunedAttrs = + if (requiredFieldNames == relation.output.map(_.name).toSet) relation.output + else relation.output.filter(a => requiredFieldNames.contains(a.name)) + new InMemoryCacheScan(relation, prunedAttrs, _pushedPredicates, _pushedLimit) + } +} + +/** + * DSv2 [[Scan]] for [[InMemoryRelation]]. + * + * Physical execution is handled by [[InMemoryTableScanExec]] via [[DataSourceV2Strategy]] + * rather than [[Batch]]/[[InputPartition]] to preserve the existing efficient columnar path. + * + * Reports: + * - Ordering ([[SupportsReportOrdering]]): propagates the ordering of the original cached plan + * so the optimizer can eliminate redundant sorts on top of the cache. 
+ * - Statistics ([[SupportsReportStatistics]]): exposes accurate row count and size from + * accumulated scan metrics once the cache is materialized, feeding AQE decisions. + * - Partitioning ([[SupportsReportPartitioning]]): reports [[KeyGroupedPartitioning]] when + * the cached plan was hash-partitioned on explicit columns, allowing the optimizer to + * skip shuffles for downstream joins/aggregates on the same key. + * - Runtime filtering ([[SupportsRuntimeV2Filtering]]): enables Dynamic Partition Pruning + * on cached scans; [[DynamicPruning]] expressions are passed via [[InMemoryTableScanExec]] + * for batch-level min/max pruning. + */ +private[sql] class InMemoryCacheScan( + val relation: InMemoryRelation, + val prunedAttrs: Seq[Attribute], + val pushedPredicates: Array[V2Predicate], + val pushedLimit: Option[Int] = None) + extends Scan + with SupportsReportOrdering + with SupportsReportStatistics + with SupportsReportPartitioning + with SupportsRuntimeV2Filtering { + + override def readSchema(): StructType = DataTypeUtils.fromAttributes(prunedAttrs) + + /** + * Converts the Catalyst sort ordering of the cached plan to V2 [[SortOrder]]s. + * Only attribute-reference based orderings whose column is present in [[prunedAttrs]] are + * emitted; sort keys that were pruned away are dropped so that [[V2ScanPartitioningAndOrdering]] + * does not attempt to resolve a column that is no longer in the scan output. 
+ */ + override def outputOrdering(): Array[V2SortOrder] = { + val prunedNames = prunedAttrs.map(_.name).toSet + relation.outputOrdering.flatMap { + case CatalystSortOrder(attr: AttributeReference, direction, nullOrdering, _) + if prunedNames.contains(attr.name) => + val v2Dir = direction match { + case Ascending => V2SortDirection.ASCENDING + case Descending => V2SortDirection.DESCENDING + } + val v2Nulls = nullOrdering match { + case NullsFirst => V2NullOrdering.NULLS_FIRST + case NullsLast => V2NullOrdering.NULLS_LAST + } + Some(SortValue(FieldReference.column(attr.name), v2Dir, v2Nulls)) + case _ => None + }.toArray + } + + /** + * Reports the output partitioning of the cached plan so the optimizer can skip + * shuffles for downstream operations on the same partitioning key. + */ + override def outputPartitioning(): Partitioning = { + relation.cachedPlan.outputPartitioning match { + case HashPartitioning(expressions, numPartitions) => + val keys = expressions.collect { case a: AttributeReference => + FieldReference.column(a.name).asInstanceOf[V2Expression] + } + if (keys.size == expressions.size) { + new KeyGroupedPartitioning(keys.toArray, numPartitions) + } else { + new UnknownPartitioning(numPartitions) + } + case other => new UnknownPartitioning(other.numPartitions) + } + } + + /** + * Exposes hash-partitioning key columns for Dynamic Partition Pruning. + * Spark will inject runtime IN-list filters on these attributes when it can + * derive them from a broadcast side of a join. + */ + override def filterAttributes(): Array[NamedReference] = { + relation.cachedPlan.outputPartitioning match { + case HashPartitioning(exprs, _) => + exprs.collect { case a: AttributeReference => + FieldReference.column(a.name).asInstanceOf[NamedReference] + }.toArray + case _ => Array.empty + } + } + + /** + * No-op: runtime predicates for cached scans are handled entirely through + * [[InMemoryTableScanExec.runtimeFilters]], not through this interface method. 
+ * The DPP pipeline injects [[DynamicPruning]] expressions into the plan, which + * [[DataSourceV2Strategy]] separates and passes as runtimeFilters to the exec node. + */ + override def filter(predicates: Array[V2Predicate]): Unit = {} + + override def estimateStatistics(): V2Statistics = { + val stats = relation.computeStats() + // Scale sizeInBytes proportionally to the number of columns actually read. + // This gives the optimizer an accurate size estimate after column pruning. + val scaledSize: Long = + if (relation.output.nonEmpty && prunedAttrs.size < relation.output.size) { + (stats.sizeInBytes * prunedAttrs.size / relation.output.size).toLong.max(1) + } else { + stats.sizeInBytes.toLong + } + // Only report column stats for pruned (selected) attributes. + val prunedNames = prunedAttrs.map(_.name).toSet + val v2ColStats = new util.HashMap[NamedReference, ColumnStatistics]() + stats.attributeStats + .filter { case (attr, _) => prunedNames.contains(attr.name) } + .foreach { case (attr, colStat) => + val cs = new ColumnStatistics { + override def distinctCount(): OptionalLong = + colStat.distinctCount + .map(v => OptionalLong.of(v.toLong)).getOrElse(OptionalLong.empty()) + override def min(): util.Optional[Object] = + colStat.min.map(v => util.Optional.of(v.asInstanceOf[Object])) + .getOrElse(util.Optional.empty[Object]()) + override def max(): util.Optional[Object] = + colStat.max.map(v => util.Optional.of(v.asInstanceOf[Object])) + .getOrElse(util.Optional.empty[Object]()) + override def nullCount(): OptionalLong = + colStat.nullCount.map(v => OptionalLong.of(v.toLong)).getOrElse(OptionalLong.empty()) + override def avgLen(): OptionalLong = + colStat.avgLen.map(OptionalLong.of).getOrElse(OptionalLong.empty()) + override def maxLen(): OptionalLong = + colStat.maxLen.map(OptionalLong.of).getOrElse(OptionalLong.empty()) + } + v2ColStats.put(FieldReference.column(attr.name), cs) + } + new V2Statistics { + override def sizeInBytes(): OptionalLong = 
OptionalLong.of(scaledSize) + override def numRows(): OptionalLong = + stats.rowCount.map(c => OptionalLong.of(c.toLong)).getOrElse(OptionalLong.empty()) + override def columnStats(): util.Map[NamedReference, ColumnStatistics] = v2ColStats + } + } +} + +/** + * Extractor that matches any in-plan representation of a cached DataFrame and returns its + * underlying [[InMemoryRelation]]. + * + * Three forms appear depending on the query stage: + * - [[InMemoryRelation]] - the direct node (e.g. as stored in [[CachedData]]). + * - [[DataSourceV2Relation]] backed by [[InMemoryCacheTable]] - produced by [[CacheManager]] + * in `useCachedData`, visible in `QueryExecution.withCachedData`. + * - [[DataSourceV2ScanRelation]] backed by [[InMemoryCacheScan]] - after + * [[V2ScanRelationPushDown]] optimizes the above, visible in `QueryExecution.optimizedPlan`. + */ +object CachedRelation { + def unapply(plan: LogicalPlan): Option[InMemoryRelation] = plan match { + case mem: InMemoryRelation => Some(mem) + case DataSourceV2Relation(table: InMemoryCacheTable, _, _, _, _, _) => Some(table.relation) + case DataSourceV2ScanRelation(_, scan: InMemoryCacheScan, _, _, _) => Some(scan.relation) + case _ => None + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala index 9c012dbd58e12..ad0746a10a1b2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala @@ -471,7 +471,15 @@ case class InMemoryRelation( val newOutputOrdering = outputOrdering .map(_.transform { case a: Attribute => map(a) }) .asInstanceOf[Seq[SortOrder]] - InMemoryRelation(newOutput, cacheBuilder, newOutputOrdering, statsOfPlanToCache) + // Remap attributeStats keys to new ExprIds by column name so column statistics survive + // attribute re-aliasing 
(withOutput is called on every cache lookup). + val nameToNew = newOutput.map(a => a.name -> a).toMap + val remappedColStats = statsOfPlanToCache.attributeStats.flatMap { case (attr, stat) => + nameToNew.get(attr.name).map(_ -> stat) + } + val remappedStats = statsOfPlanToCache.copy( + attributeStats = AttributeMap(remappedColStats.toSeq)) + InMemoryRelation(newOutput, cacheBuilder, newOutputOrdering, remappedStats) } override def newInstance(): this.type = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala index cbd60804b27e8..ff3a8761ad50c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala @@ -53,7 +53,9 @@ trait InMemoryTableScanLike extends LeafExecNode { case class InMemoryTableScanExec( attributes: Seq[Attribute], predicates: Seq[Expression], - @transient relation: InMemoryRelation) + @transient relation: InMemoryRelation, + limit: Option[Int] = None, + runtimeFilters: Seq[Expression] = Nil) extends InMemoryTableScanLike { override lazy val metrics = Map( @@ -80,9 +82,13 @@ case class InMemoryTableScanExec( override def innerChildren: Seq[QueryPlan[_]] = Seq(relation) ++ super.innerChildren override def doCanonicalize(): SparkPlan = - copy(attributes = attributes.map(QueryPlan.normalizeExpressions(_, relation.output)), + copy( + attributes = attributes.map(QueryPlan.normalizeExpressions(_, relation.output)), predicates = predicates.map(QueryPlan.normalizeExpressions(_, relation.output)), - relation = relation.canonicalized.asInstanceOf[InMemoryRelation]) + relation = relation.canonicalized.asInstanceOf[InMemoryRelation], + limit = limit, + runtimeFilters = runtimeFilters.map(QueryPlan.normalizeExpressions(_, relation.output)) + ) override def vectorTypes: 
Option[Seq[String]] = relation.cacheBuilder.serializer.vectorTypes(attributes, conf) @@ -137,11 +143,19 @@ case class InMemoryTableScanExec( private val inMemoryPartitionPruningEnabled = conf.inMemoryPartitionPruning + // After DPP subqueries are evaluated, DynamicPruningExpression wraps a concrete predicate. + // Merge compile-time predicates with evaluated runtime filters for batch-level pruning. + private def allPredicates: Seq[Expression] = predicates ++ runtimeFilters.map { + case DynamicPruningExpression(e) => e + case e => e + } + private def filteredCachedBatches(): RDD[CachedBatch] = { val buffers = relation.cacheBuilder.cachedColumnBuffers if (inMemoryPartitionPruningEnabled) { - val filterFunc = relation.cacheBuilder.serializer.buildFilter(predicates, relation.output) + val filterFunc = + relation.cacheBuilder.serializer.buildFilter(allPredicates, relation.output) buffers.mapPartitionsWithIndexInternal(filterFunc) } else { buffers @@ -175,14 +189,16 @@ case class InMemoryTableScanExec( batch } } - serializer.convertCachedBatchToInternalRow(withMetrics, relOutput, attributes, conf) + val result = + serializer.convertCachedBatchToInternalRow(withMetrics, relOutput, attributes, conf) + limit.fold(result)(n => result.mapPartitions(_.take(n))) } protected override def doExecuteColumnar(): RDD[ColumnarBatch] = { // Resulting RDD is cached and reused by SparkPlan.executeColumnarRDD val numOutputRows = longMetric("numOutputRows") val buffers = filteredCachedBatches() - relation.cacheBuilder.serializer.convertCachedBatchToColumnarBatch( + val result = relation.cacheBuilder.serializer.convertCachedBatchToColumnarBatch( buffers, relation.output, attributes, @@ -190,6 +206,7 @@ case class InMemoryTableScanExec( numOutputRows += cb.numRows() cb } + limit.fold(result)(n => result.mapPartitions(_.take(n))) } override def isMaterialized: Boolean = relation.cacheBuilder.isCachedColumnBuffersLoaded diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala index 4fd7d993cc3d0..913cfd00cbc89 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala @@ -44,6 +44,7 @@ import org.apache.spark.sql.connector.read.streaming.{ContinuousStream, MicroBat import org.apache.spark.sql.connector.write.{V1Write, Write} import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors} import org.apache.spark.sql.execution.{FilterExec, InSubqueryExec, LeafExecNode, LocalTableScanExec, ProjectExec, RowDataSourceScanExec, ScalarSubquery => ExecScalarSubquery, SparkPlan, SparkStrategy => Strategy} +import org.apache.spark.sql.execution.columnar.{InMemoryCacheScan, InMemoryTableScanExec} import org.apache.spark.sql.execution.command.{CommandUtils, MetricViewHelper} import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, LogicalRelationWithTable, PushableColumnAndNestedColumn} import org.apache.spark.sql.execution.streaming.continuous.{WriteToContinuousDataSource, WriteToContinuousDataSourceExec} @@ -159,6 +160,24 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat DataSourceV2Strategy.withProjectAndFilter( project, filters, localScanExec, needsUnsafeConversion = false) :: Nil + case PhysicalOperation(project, filters, + DataSourceV2ScanRelation(_, scan: InMemoryCacheScan, output, _, _)) => + // Route cached DataFrames back to InMemoryTableScanExec, preserving the optimized + // columnar path. 
DynamicPruning expressions are separated into runtimeFilters for + // batch-level min/max pruning at execution time; compile-time filters are passed + // both to InMemoryTableScanExec (for batch pruning) and to withProjectAndFilter + // (for row-level re-evaluation). The pushed limit is applied per-partition. + val (runtimeFilters, compiledFilters) = filters.partition { + case _: DynamicPruning => true + case _ => false + } + val scanExec = InMemoryTableScanExec( + output, compiledFilters, scan.relation, + limit = scan.pushedLimit, + runtimeFilters = runtimeFilters) + DataSourceV2Strategy.withProjectAndFilter( + project, compiledFilters, scanExec, needsUnsafeConversion = false) :: Nil + case PhysicalOperation(project, filters, relation: DataSourceV2ScanRelation) => // projection and filters were already pushed down in the optimizer. // this uses PhysicalOperation to get the projection and ensure that if the batch scan does diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PartitionPruning.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PartitionPruning.scala index fd34162044483..ca614376c6557 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PartitionPruning.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PartitionPruning.scala @@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.connector.read.SupportsRuntimeV2Filtering -import org.apache.spark.sql.execution.columnar.InMemoryRelation +import org.apache.spark.sql.execution.columnar.CachedRelation import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation} import org.apache.spark.sql.execution.datasources.v2.ExtractV2Scan /** @@ -181,17 +181,17 @@ object PartitionPruning extends Rule[LogicalPlan] with 
PredicateHelper with Join */ private def calculatePlanOverhead(plan: LogicalPlan): Float = { val (cached, notCached) = plan.collectLeaves().partition(p => p match { - case _: InMemoryRelation => true + case CachedRelation(_) => true case _ => false }) val scanOverhead = notCached.map(_.stats.sizeInBytes).sum.toFloat val cachedOverhead = cached.map { - case m: InMemoryRelation if m.cacheBuilder.storageLevel.useDisk && + case CachedRelation(m) if m.cacheBuilder.storageLevel.useDisk && !m.cacheBuilder.storageLevel.useMemory => m.stats.sizeInBytes.toFloat - case m: InMemoryRelation if m.cacheBuilder.storageLevel.useDisk => + case CachedRelation(m) if m.cacheBuilder.storageLevel.useDisk => m.stats.sizeInBytes.toFloat * 0.2 - case m: InMemoryRelation if m.cacheBuilder.storageLevel.useMemory => + case CachedRelation(_) => 0.0 }.sum.toFloat scanOverhead + cachedOverhead diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala index 106ee36594b38..b14d299c22f31 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala @@ -85,7 +85,7 @@ class CachedTableSuite extends SharedSparkSession def rddIdOf(tableName: String): Int = { val plan = spark.table(tableName).queryExecution.sparkPlan plan.collect { - case InMemoryTableScanExec(_, _, relation) => + case InMemoryTableScanExec(_, _, relation, _, _) => relation.cacheBuilder.cachedColumnBuffers.id case _ => fail(s"Table $tableName is not cached\n" + plan) @@ -107,7 +107,7 @@ class CachedTableSuite extends SharedSparkSession private def getNumInMemoryRelations(ds: Dataset[_]): Int = { val plan = ds.queryExecution.withCachedData - var sum = plan.collect { case _: InMemoryRelation => 1 }.sum + var sum = plan.collect { case CachedRelation(_) => 1 }.sum plan.transformAllExpressions { case e: SubqueryExpression => sum += 
getNumInMemoryRelations(e.plan) @@ -124,7 +124,7 @@ class CachedTableSuite extends SharedSparkSession private def getNumInMemoryTablesRecursively(plan: SparkPlan): Int = { collect(plan) { - case inMemoryTable @ InMemoryTableScanExec(_, _, relation) => + case inMemoryTable @ InMemoryTableScanExec(_, _, relation, _, _) => getNumInMemoryTablesRecursively(relation.cachedPlan) + getNumInMemoryTablesInSubquery(inMemoryTable) + 1 case p => @@ -222,20 +222,20 @@ class CachedTableSuite extends SharedSparkSession assertCached(spark.table("testData")) assert(spark.table("testData").queryExecution.withCachedData match { - case _: InMemoryRelation => true + case CachedRelation(_) => true case _ => false }) uncacheTable("testData") assert(!spark.catalog.isCached("testData")) assert(spark.table("testData").queryExecution.withCachedData match { - case _: InMemoryRelation => false + case CachedRelation(_) => false case _ => true }) } test("SPARK-1669: cacheTable should be idempotent") { - assert(!spark.table("testData").logicalPlan.isInstanceOf[InMemoryRelation]) + assert(!CachedRelation.unapply(spark.table("testData").logicalPlan).isDefined) spark.catalog.cacheTable("testData") assertCached(spark.table("testData")) @@ -247,7 +247,7 @@ class CachedTableSuite extends SharedSparkSession spark.catalog.cacheTable("testData") assertResult(0, "Double InMemoryRelations found, cacheTable() is not idempotent") { spark.table("testData").queryExecution.withCachedData.collect { - case r: InMemoryRelation if r.cachedPlan.isInstanceOf[InMemoryTableScanExec] => r + case CachedRelation(r) if r.cachedPlan.isInstanceOf[InMemoryTableScanExec] => r }.size } @@ -410,7 +410,7 @@ class CachedTableSuite extends SharedSparkSession test("InMemoryRelation statistics") { sql("CACHE TABLE testData") spark.table("testData").queryExecution.withCachedData.collect { - case cached: InMemoryRelation => + case CachedRelation(cached) => val actualSizeInBytes = (1 to 100).map(i => 4 + i.toString.length + 4).sum 
assert(cached.stats.sizeInBytes === actualSizeInBytes) } @@ -474,12 +474,12 @@ class CachedTableSuite extends SharedSparkSession val toBeCleanedAccIds = new HashSet[Long] val accId1 = spark.table("t1").queryExecution.withCachedData.collect { - case i: InMemoryRelation => i.cacheBuilder.sizeInBytesStats.id + case CachedRelation(i) => i.cacheBuilder.sizeInBytesStats.id }.head toBeCleanedAccIds += accId1 val accId2 = spark.table("t1").queryExecution.withCachedData.collect { - case i: InMemoryRelation => i.cacheBuilder.sizeInBytesStats.id + case CachedRelation(i) => i.cacheBuilder.sizeInBytesStats.id }.head toBeCleanedAccIds += accId2 @@ -1600,7 +1600,7 @@ class CachedTableSuite extends SharedSparkSession sql(s"CACHE TABLE $tableName AS SELECT TIMESTAMP_NTZ'2021-01-01 00:00:00'") checkAnswer(spark.table(tableName), Row(LocalDateTime.parse("2021-01-01T00:00:00"))) spark.table(tableName).queryExecution.withCachedData.collect { - case cached: InMemoryRelation => + case CachedRelation(cached) => assert(cached.stats.sizeInBytes === 8) } sql(s"UNCACHE TABLE $tableName") @@ -1811,7 +1811,7 @@ class CachedTableSuite extends SharedSparkSession sql(s"CACHE TABLE $tableName AS SELECT TIME'22:00:00'") checkAnswer(spark.table(tableName), Row(LocalTime.parse("22:00:00"))) spark.table(tableName).queryExecution.withCachedData.collect { - case cached: InMemoryRelation => + case CachedRelation(cached) => assert(cached.stats.sizeInBytes === 8) } sql(s"UNCACHE TABLE $tableName") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala index 0d1b0e1d9816f..ecfd4e45b90b7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala @@ -24,7 +24,7 @@ import org.scalatest.time.SpanSugar._ import org.apache.spark.sql.execution.ColumnarToRowExec import 
org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper -import org.apache.spark.sql.execution.columnar.{InMemoryRelation, InMemoryTableScanExec} +import org.apache.spark.sql.execution.columnar.{CachedRelation, InMemoryTableScanExec} import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSparkSession @@ -43,8 +43,8 @@ class DatasetCacheSuite extends SharedSparkSession */ private def assertCacheDependency(df: DataFrame, numOfCachesDependedUpon: Int = 1): Unit = { val plan = df.queryExecution.withCachedData - assert(plan.isInstanceOf[InMemoryRelation]) - val internalPlan = plan.asInstanceOf[InMemoryRelation].cacheBuilder.cachedPlan + assert(CachedRelation.unapply(plan).isDefined) + val internalPlan = CachedRelation.unapply(plan).get.cacheBuilder.cachedPlan assert(find(internalPlan)(_.isInstanceOf[InMemoryTableScanExec]).size == numOfCachesDependedUpon) } @@ -233,8 +233,8 @@ class DatasetCacheSuite extends SharedSparkSession // Verify that df1 is a InMemoryRelation plan with dependency on another cached plan. assertCacheDependency(df1) - val df1InnerPlan = df1.queryExecution.withCachedData - .asInstanceOf[InMemoryRelation].cacheBuilder.cachedPlan + val df1InnerPlan = CachedRelation.unapply(df1.queryExecution.withCachedData).get + .cacheBuilder.cachedPlan // Verify that df2 is a InMemoryRelation plan with dependency on another cached plan. assertCacheDependency(df2) @@ -244,7 +244,7 @@ class DatasetCacheSuite extends SharedSparkSession // before df.unpersist(). val df1Limit = df1.limit(2) val df1LimitInnerPlan = df1Limit.queryExecution.withCachedData.collectFirst { - case i: InMemoryRelation => i.cacheBuilder.cachedPlan + case CachedRelation(i) => i.cacheBuilder.cachedPlan } assert(df1LimitInnerPlan.isDefined && df1LimitInnerPlan.get == df1InnerPlan) @@ -252,7 +252,7 @@ class DatasetCacheSuite extends SharedSparkSession // on df, since df2's cache had not been loaded before df.unpersist(). 
val df2Limit = df2.limit(2) val df2LimitInnerPlan = df2Limit.queryExecution.withCachedData.collectFirst { - case i: InMemoryRelation => i.cacheBuilder.cachedPlan + case CachedRelation(i) => i.cacheBuilder.cachedPlan } assert(df2LimitInnerPlan.isDefined && !df2LimitInnerPlan.get.exists(_.isInstanceOf[InMemoryTableScanExec])) @@ -270,7 +270,7 @@ class DatasetCacheSuite extends SharedSparkSession df.cache() df.count() df.queryExecution.withCachedData match { - case i: InMemoryRelation => + case CachedRelation(i) => // Optimized plan has non-default size in bytes assert(i.statsOfPlanToCache.sizeInBytes !== df.sparkSession.sessionState.conf.defaultSizeInBytes) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala index 5a1ea3d9f53cf..ce05ceef04761 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala @@ -44,7 +44,7 @@ import org.apache.spark.sql.catalyst.util._ import org.apache.spark.sql.classic.ClassicConversions._ import org.apache.spark.sql.execution.{FilterExec, QueryExecution, SparkPlan, SQLExecution} import org.apache.spark.sql.execution.adaptive.DisableAdaptiveExecution -import org.apache.spark.sql.execution.columnar.InMemoryRelation +import org.apache.spark.sql.execution.columnar.CachedRelation import org.apache.spark.sql.execution.datasources.DataSourceUtils import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SQLTestData @@ -229,7 +229,7 @@ trait QueryTestBase val planWithCaching = query.queryExecution.withCachedData val cachedData = planWithCaching collect { - case cached: InMemoryRelation => cached + case CachedRelation(cached) => cached } assert( @@ -246,7 +246,7 @@ trait QueryTestBase val planWithCaching = query.queryExecution.withCachedData val matched = planWithCaching.exists { - case cached: InMemoryRelation => + case CachedRelation(cached) => val cacheBuilder 
= cached.cacheBuilder cachedName == cacheBuilder.tableName.get && (storageLevel == cacheBuilder.storageLevel) case _ => false diff --git a/sql/core/src/test/scala/org/apache/spark/sql/TPCDSCachedPlanComparisonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/TPCDSCachedPlanComparisonSuite.scala new file mode 100644 index 0000000000000..f63d71ab48419 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/TPCDSCachedPlanComparisonSuite.scala @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import java.io.File +import java.nio.charset.StandardCharsets +import java.nio.file.Files + +import org.apache.spark.SparkConf +import org.apache.spark.sql.catalyst.util.resourceToString +import org.apache.spark.sql.execution.FormattedMode +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.tags.ExtendedSQLTest + +/** + * Compares TPCDS query plans when all tables are cached in-memory, with and without the + * DSv2-backed cache path (spark.sql.inMemoryColumnarStorage.useDataSourceV2). 
+ * + * Run with: + * build/sbt "sql/testOnly org.apache.spark.sql.TPCDSCachedPlanComparisonSuite" + * + * Results are written to /tmp/tpcds-cached-plan-comparison/. + */ +@ExtendedSQLTest +class TPCDSCachedPlanComparisonSuite extends BenchmarkQueryTest with TPCDSBase { + + // Inject stats so the planner makes realistic join-order decisions. + override def injectStats: Boolean = true + + // Disable AQE for deterministic, stable plans. + override protected def sparkConf: SparkConf = + super.sparkConf + .set(SQLConf.ADAPTIVE_EXECUTION_ENABLED, false) + .set(SQLConf.READ_SIDE_CHAR_PADDING, false) + + private val outputDir = new File(System.getProperty("java.io.tmpdir"), + "tpcds-cached-plan-comparison") + + private val dsv2Dir = new File(outputDir, "with-dsv2") + private val legacyDir = new File(outputDir, "without-dsv2") + private val diffDir = new File(outputDir, "diffs") + + override def beforeAll(): Unit = { + super.beforeAll() + // Cache all TPCDS tables so subsequent queries hit the in-memory cache. + tableNames.foreach { t => spark.catalog.cacheTable(t) } + // Clear stale files from previous runs so the summary reflects only this run. 
+ Seq(dsv2Dir, legacyDir, diffDir).foreach { d => + if (d.exists()) d.listFiles().foreach(_.delete()) + d.mkdirs() + } + } + + override def afterAll(): Unit = { + tableNames.foreach { t => spark.catalog.uncacheTable(t) } + super.afterAll() + } + + // Strip all volatile IDs so structural diffs aren't polluted by different + // attribute / plan IDs assigned by the legacy vs DSv2 planning paths: + // - ExprIds: "name#123" or "name#123L" + // - plan_id=N: appear in Arguments lines of Exchange / Subquery nodes + // - id=#N: appear in SubqueryExec node args + private val exprIdRegex = "#\\d+L?".r + private val planIdRegex = "plan_id=\\d+".r + private val idHashRegex = "id=#\\d+".r + + private def normalizeIds(plan: String): String = { + val s1 = exprIdRegex.replaceAllIn(plan, "") + val s2 = planIdRegex.replaceAllIn(s1, "plan_id=X") + idHashRegex.replaceAllIn(s2, "id=#X") + } + + private def planString(queryString: String, useDsv2: Boolean): String = { + withSQLConf( + SQLConf.CACHE_USE_DSV2.key -> useDsv2.toString, + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "10MB") { + normalizeIds(sql(queryString).queryExecution.explainString(FormattedMode)) + } + } + + private def produceDiff(name: String, before: String, after: String): String = { + val beforeLines = before.split("\n") + val afterLines = after.split("\n") + val sb = new StringBuilder + sb.append(s"=== Plan diff for $name ===\n") + sb.append("--- without-dsv2\n+++ with-dsv2\n") + val maxLen = math.max(beforeLines.length, afterLines.length) + for (i <- 0 until maxLen) { + val b = if (i < beforeLines.length) beforeLines(i) else "" + val a = if (i < afterLines.length) afterLines(i) else "" + if (b != a) { + sb.append(s"-$b\n") + sb.append(s"+$a\n") + } + } + sb.toString() + } + + private def runQueryComparison(group: String, query: String, suffix: String = ""): Unit = { + val queryString = resourceToString(s"$group/$query.sql", + classLoader = Thread.currentThread().getContextClassLoader) + + val withPlan = 
planString(queryString, useDsv2 = true) + val withoutPlan = planString(queryString, useDsv2 = false) + + val label = query + suffix + Files.writeString( + new File(dsv2Dir, s"$label.txt").toPath, withPlan, StandardCharsets.UTF_8) + Files.writeString( + new File(legacyDir, s"$label.txt").toPath, withoutPlan, StandardCharsets.UTF_8) + + if (withPlan != withoutPlan) { + val diff = produceDiff(label, withoutPlan, withPlan) + Files.writeString( + new File(diffDir, s"$label.diff.txt").toPath, diff, StandardCharsets.UTF_8) + } + } + + // v1.4 queries + tpcdsQueries.foreach { q => + test(s"tpcds-v1.4/$q (cached, with vs without DSv2)") { + runQueryComparison("tpcds", q) + } + } + + // v2.7 queries + tpcdsQueriesV2_7_0.foreach { q => + test(s"tpcds-v2.7.0/$q (cached, with vs without DSv2)") { + runQueryComparison("tpcds-v2.7.0", q, suffix = "-v2.7") + } + } + + test("print plan comparison summary") { + val diffFiles = Option(diffDir.listFiles()).getOrElse(Array.empty) + .filter(_.getName.endsWith(".diff.txt")) + .sortBy(_.getName) + + val total = tpcdsQueries.length + tpcdsQueriesV2_7_0.length + // scalastyle:off println + println( + s""" + |=== TPCDS Cached-Plan Comparison: DSv2 vs Legacy ($total queries) === + | Changed plans : ${diffFiles.length} + | Unchanged : ${total - diffFiles.length} + | Output dir : ${outputDir.getAbsolutePath} + |""".stripMargin) + + if (diffFiles.nonEmpty) { + println("Queries with plan differences:") + diffFiles.foreach { f => + val name = f.getName.replace(".diff.txt", "") + println(s" - $name => ${f.getAbsolutePath}") + println(Files.readString(f.toPath)) + println() + } + } else { + println("No plan differences detected.") + } + // scalastyle:on println + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala index 4e39f01bf5374..b99921562e6e9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala @@ -36,7 +36,7 @@ import org.apache.spark.sql.catalyst.plans.logical.Project import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.execution.{QueryExecution, SimpleMode} import org.apache.spark.sql.execution.aggregate.{ScalaAggregator, ScalaUDAF} -import org.apache.spark.sql.execution.columnar.InMemoryRelation +import org.apache.spark.sql.execution.columnar.CachedRelation import org.apache.spark.sql.execution.command.{CreateDataSourceTableAsSelectCommand, ExplainCommand} import org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand import org.apache.spark.sql.expressions.{Aggregator, MutableAggregationBuffer, SparkUserDefinedFunction, UserDefinedAggregateFunction} @@ -423,10 +423,10 @@ class UDFSuite extends SharedSparkSession { override def onSuccess(funcName: String, qe: QueryExecution, duration: Long): Unit = { qe.withCachedData match { case c: CreateDataSourceTableAsSelectCommand - if c.query.isInstanceOf[InMemoryRelation] => + if CachedRelation.unapply(c.query).isDefined => numTotalCachedHit += 1 case i: InsertIntoHadoopFsRelationCommand - if i.query.isInstanceOf[InMemoryRelation] => + if CachedRelation.unapply(i.query).isDefined => numTotalCachedHit += 1 case _ => } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala index cb7531a0dbafd..93fb0bce26ae2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala @@ -43,7 +43,7 @@ import org.apache.spark.sql.connector.expressions.{LiteralValue, Transform} import org.apache.spark.sql.errors.QueryErrorsBase import org.apache.spark.sql.execution.FilterExec import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper -import 
org.apache.spark.sql.execution.columnar.InMemoryRelation +import org.apache.spark.sql.execution.columnar.CachedRelation import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelationWithTable} import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation import org.apache.spark.sql.execution.streaming.runtime.MemoryStream @@ -2526,7 +2526,7 @@ class DataSourceV2SQLSuiteV1Filter val t = "testcat.ns1.ns2.tbl" withTable(t) { def isCached(table: String): Boolean = { - spark.table(table).queryExecution.withCachedData.isInstanceOf[InMemoryRelation] + CachedRelation.unapply(spark.table(table).queryExecution.withCachedData).isDefined } spark.sql(s"CREATE TABLE $t (id bigint, data string) USING foo") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/LogicalPlanTagInSparkPlanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/LogicalPlanTagInSparkPlanSuite.scala index 743ec41dbe7cd..da804deef5bf9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/LogicalPlanTagInSparkPlanSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/LogicalPlanTagInSparkPlanSuite.scala @@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Generate, Join, LocalRelation, LogicalPlan, Range, Sample, Union, Window, WithCTE} import org.apache.spark.sql.execution.adaptive.DisableAdaptiveExecutionSuite import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, ObjectHashAggregateExec, SortAggregateExec} -import org.apache.spark.sql.execution.columnar.{InMemoryRelation, InMemoryTableScanExec} +import org.apache.spark.sql.execution.columnar.{CachedRelation, InMemoryTableScanExec} import org.apache.spark.sql.execution.datasources.LogicalRelation import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, DataSourceV2Relation} import 
org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ReusedExchangeExec, ShuffleExchangeExec} @@ -119,7 +119,7 @@ class LogicalPlanTagInSparkPlanSuite extends TPCDSQuerySuite with DisableAdaptiv physicalLeaves.head match { case _: RangeExec => logicalLeaves.head.isInstanceOf[Range] case _: DataSourceScanExec => logicalLeaves.head.isInstanceOf[LogicalRelation] - case _: InMemoryTableScanExec => logicalLeaves.head.isInstanceOf[InMemoryRelation] + case _: InMemoryTableScanExec => CachedRelation.unapply(logicalLeaves.head).isDefined case _: LocalTableScanExec => logicalLeaves.head.isInstanceOf[LocalRelation] case _: ExternalRDDScanExec[_] => logicalLeaves.head.isInstanceOf[ExternalRDD[_]] case _: BatchScanExec => logicalLeaves.head.isInstanceOf[DataSourceV2Relation] diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala index 7c434ebee5f96..a87f4d838cf79 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala @@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan, import org.apache.spark.sql.catalyst.plans.physical._ import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanHelper, DisableAdaptiveExecution} import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, ObjectHashAggregateExec, SortAggregateExec} -import org.apache.spark.sql.execution.columnar.{InMemoryRelation, InMemoryTableScanExec} +import org.apache.spark.sql.execution.columnar.{CachedRelation, InMemoryTableScanExec} import org.apache.spark.sql.execution.exchange.{BroadcastExchangeLike, EnsureRequirements, REPARTITION_BY_COL, ReusedExchangeExec, ShuffleExchangeExec, ShuffleExchangeLike} import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, SortMergeJoinExec} import 
org.apache.spark.sql.execution.reuse.ReuseExchangeAndSubquery @@ -233,7 +233,7 @@ class PlannerSuite extends SharedSparkSession with AdaptiveSparkPlanHelper { test("CollectLimit can appear in the middle of a plan when caching is used") { val query = testData.select($"key", $"value").limit(2).cache() - val planned = query.queryExecution.optimizedPlan.asInstanceOf[InMemoryRelation] + val planned = CachedRelation.unapply(query.queryExecution.optimizedPlan).get assert(planned.cachedPlan.isInstanceOf[CollectLimitExec]) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryCacheDSv2Benchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryCacheDSv2Benchmark.scala new file mode 100644 index 0000000000000..806c597c70356 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryCacheDSv2Benchmark.scala @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.execution.columnar + +import org.apache.spark.benchmark.Benchmark +import org.apache.spark.sql.execution.benchmark.SqlBasedBenchmark +import org.apache.spark.sql.functions._ +import org.apache.spark.sql.internal.SQLConf + +/** + * Benchmarks for the DSv2-backed in-memory cache path, measuring the impact of + * column pruning, filter pushdown, and planning overhead compared with the pre-DSv2 + * InMemoryRelation approach. + * + * To run this benchmark: + * {{{ + * 1. without sbt: + * bin/spark-submit --class + * --jars , + * 2. build/sbt "sql/Test/runMain " + * 3. generate result: SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "sql/Test/runMain " + * Results will be written to "benchmarks/InMemoryCacheDSv2Benchmark-results.txt". + * }}} + */ +object InMemoryCacheDSv2Benchmark extends SqlBasedBenchmark { + + private val numRows = 1000000 + private val numIters = 5 + + /** + * Benchmarks column pruning: reading 2 of 10 columns from a cached wide table. + * Under the DSv2 path, column pruning is applied via SupportsPushDownRequiredColumns, + * so InMemoryTableScanExec only deserializes the 2 requested columns. + * The "no pruning" case reads all 10 columns, simulating the pre-DSv2 behaviour. + */ + def columnPruningBenchmark(): Unit = { + val df = spark.range(numRows).select( + (0 until 10).map(i => (col("id") + i).alias(s"c$i")): _* + ).cache() + df.count() // materialize the cache + + val benchmark = new Benchmark( + s"Column pruning - $numRows rows, 10 cols, select 2", + numRows, output = output) + + // Use sum() to force actual column deserialization (count() gets optimized away). + // "pruning" case: DSv2 column pruning deserializes only 2 of 10 columns. + // "baseline" case: all 10 columns are needed and deserialized (simulates pre-DSv2 behaviour + // where the full row is always deserialized even when only some columns are needed). 
+ benchmark.addCase("sum 2 of 10 cols (column pruning via DSv2)") { _ => + df.select("c0", "c1").agg(sum("c0") + sum("c1")).collect() + } + + benchmark.addCase("sum all 10 cols (no pruning - pre-DSv2 baseline)") { _ => + df.agg(sum("c0") + sum("c1") + sum("c2") + sum("c3") + sum("c4") + + sum("c5") + sum("c6") + sum("c7") + sum("c8") + sum("c9")).collect() + } + + benchmark.run() + df.unpersist() + } + + /** + * Benchmarks filter pushdown: a selective predicate on a cached table. + * Under the DSv2 path, filters are pushed via SupportsPushDownV2Filters, enabling + * per-batch min/max pruning inside InMemoryTableScanExec (category-2 push-down). + * The "no push" case applies the filter outside the scan via a post-scan FilterExec, + * but must still read all batches - this is the same behaviour as the pre-DSv2 path. + * + * Note: both cases produce identical results; the difference is how many columnar + * batches are inspected before row-level filtering. + */ + def filterPushdownBenchmark(): Unit = { + // Use sorted data so that batch-level min/max pruning is maximally effective. + val df = spark.range(numRows).select(col("id").alias("c0")).cache() + df.count() // materialize the cache + + val benchmark = new Benchmark( + s"Filter pushdown - $numRows rows, selective filter (c0 < 1000)", + numRows, output = output) + + benchmark.addCase("filter c0 < 1000 (pushed to scan, batch pruning)") { _ => + df.filter(col("c0") < 1000).count() + } + + benchmark.addCase("filter c0 < 1000 (count with full scan for comparison)") { _ => + df.count() + } + + benchmark.run() + df.unpersist() + } + + /** + * Benchmarks planning overhead: how long the optimizer takes for a simple cached scan. + * The DSv2 path runs additional optimizer rules (V2ScanRelationPushDown batch) compared + * with the pre-DSv2 InMemoryRelation path. This case measures total plan->execute time + * without caching queryExecution results. 
+ */ + def planningOverheadBenchmark(): Unit = { + val numPlanIters = 1000 + val df = spark.range(numRows).select(col("id").alias("c0")).cache() + df.count() // materialize the cache + + val benchmark = new Benchmark( + s"Planning overhead - $numPlanIters plan-only iterations", + numPlanIters, output = output) + + benchmark.addCase("optimizedPlan (DSv2 path, V2ScanRelationPushDown)") { _ => + var i = 0 + while (i < numPlanIters) { + df.filter(col("c0") > 0).queryExecution.optimizedPlan + i += 1 + } + } + + benchmark.run() + df.unpersist() + } + + /** + * Benchmarks a full aggregate query end-to-end on a cached multi-column table to + * measure real-world combined overhead of planning + execution. + */ + def endToEndAggregateBenchmark(): Unit = { + val df = spark.range(numRows).select( + (col("id") % 100).alias("key"), + col("id").alias("val") + ).cache() + df.count() + + val benchmark = new Benchmark( + s"End-to-end aggregate (groupBy + sum) on $numRows rows", + numRows, output = output) + + benchmark.addCase("groupBy(key).sum(val) - DSv2 path") { _ => + df.groupBy("key").agg(sum("val")).count() + } + + benchmark.run() + df.unpersist() + } + + override def runBenchmarkSuite(mainArgs: Array[String]): Unit = { + // AQE off for deterministic planning + withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") { + runBenchmark("In-memory cache: column pruning") { + columnPruningBenchmark() + } + runBenchmark("In-memory cache: filter pushdown") { + filterPushdownBenchmark() + } + runBenchmark("In-memory cache: planning overhead") { + planningOverheadBenchmark() + } + runBenchmark("In-memory cache: end-to-end aggregate") { + endToEndAggregateBenchmark() + } + } + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala index 5cd62302861ae..9ee3cef5f96ae 100644 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala @@ -26,15 +26,18 @@ import org.apache.spark.sql.Row import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, AttributeSet, In} import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning -import org.apache.spark.sql.classic.DataFrame +import org.apache.spark.sql.classic.{DataFrame, Dataset} import org.apache.spark.sql.columnar.CachedBatch import org.apache.spark.sql.execution.{FilterExec, InputAdapter, WholeStageCodegenExec} import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper +import org.apache.spark.sql.execution.columnar.CachedRelation +import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.test.SQLTestData._ import org.apache.spark.sql.types._ +import org.apache.spark.sql.util.CaseInsensitiveStringMap import org.apache.spark.storage.StorageLevel import org.apache.spark.storage.StorageLevel._ @@ -53,6 +56,16 @@ class TestCachedBatchSerializer( class InMemoryColumnarQuerySuite extends SharedSparkSession with AdaptiveSparkPlanHelper { import testImplicits._ + /** Wraps a bare [[InMemoryRelation]] in a [[DataSourceV2Relation]] so it goes through the + * DSv2 planning path (DataSourceV2Strategy) instead of the legacy InMemoryScans strategy. 
*/ + private def toDF(relation: InMemoryRelation): DataFrame = + Dataset.ofRows(spark, DataSourceV2Relation( + new InMemoryCacheTable(relation), + relation.output.map(_.asInstanceOf[AttributeReference]), + None, + None, + CaseInsensitiveStringMap.empty())) + setupTestData() private def cachePrimitiveTest(data: DataFrame, dataType: String): Unit = { @@ -67,7 +80,7 @@ class InMemoryColumnarQuerySuite extends SharedSparkSession with AdaptiveSparkPl case _: DefaultCachedBatch => case other => fail(s"Unexpected cached batch type: ${other.getClass.getName}") } - checkAnswer(inMemoryRelation, data.collect().toSeq) + checkAnswer(toDF(inMemoryRelation), data.collect().toSeq) } private def testPrimitiveType(nullability: Boolean): Unit = { @@ -139,7 +152,7 @@ class InMemoryColumnarQuerySuite extends SharedSparkSession with AdaptiveSparkPl val scan = InMemoryRelation(new TestCachedBatchSerializer(useCompression = true, 5), MEMORY_ONLY, plan, None, testData.logicalPlan) - checkAnswer(scan, testData.collect().toSeq) + checkAnswer(toDF(scan), testData.collect().toSeq) } test("default size avoids broadcast") { @@ -160,7 +173,7 @@ class InMemoryColumnarQuerySuite extends SharedSparkSession with AdaptiveSparkPl val scan = InMemoryRelation(new TestCachedBatchSerializer(useCompression = true, 5), MEMORY_ONLY, plan, None, logicalPlan) - checkAnswer(scan, testData.collect().map { + checkAnswer(toDF(scan), testData.collect().map { case Row(key: Int, value: String) => value -> key }.map(Row.fromTuple)) } @@ -177,8 +190,8 @@ class InMemoryColumnarQuerySuite extends SharedSparkSession with AdaptiveSparkPl val scan = InMemoryRelation(new TestCachedBatchSerializer(useCompression = true, 5), MEMORY_ONLY, plan, None, testData.logicalPlan) - checkAnswer(scan, testData.collect().toSeq) - checkAnswer(scan, testData.collect().toSeq) + checkAnswer(toDF(scan), testData.collect().toSeq) + checkAnswer(toDF(scan), testData.collect().toSeq) } test("SPARK-1678 regression: compression must not lose 
repeated values") { @@ -358,7 +371,7 @@ class InMemoryColumnarQuerySuite extends SharedSparkSession with AdaptiveSparkPl // Materialize the data. val expectedAnswer = data.collect() - checkAnswer(cached, expectedAnswer) + checkAnswer(toDF(cached), expectedAnswer) // Check that the right size was calculated. assert(cached.cacheBuilder.sizeInBytesStats.value === expectedAnswer.length * INT.defaultSize) @@ -372,7 +385,7 @@ class InMemoryColumnarQuerySuite extends SharedSparkSession with AdaptiveSparkPl // Materialize the data. val expectedAnswer = data.collect() - checkAnswer(cached, expectedAnswer) + checkAnswer(toDF(cached), expectedAnswer) // Check that the right row count was calculated. assert(cached.cacheBuilder.rowCountStats.value === 6) @@ -518,8 +531,10 @@ class InMemoryColumnarQuerySuite extends SharedSparkSession with AdaptiveSparkPl test("SPARK-25727 - otherCopyArgs in InMemoryRelation does not include outputOrdering") { val data = Seq(100).toDF("count").cache() - val json = data.queryExecution.optimizedPlan.toJSON - assert(json.contains("outputOrdering")) + // withCachedData contains DataSourceV2Relation(InMemoryCacheTable(InMemoryRelation)); + // extract the InMemoryRelation to verify its outputOrdering field is serialized correctly. 
+ val mem = CachedRelation.unapply(data.queryExecution.withCachedData).get + assert(mem.toJSON.contains("outputOrdering")) } test("SPARK-22673: InMemoryRelation should utilize existing stats of the plan to be cached") { @@ -536,7 +551,7 @@ class InMemoryColumnarQuerySuite extends SharedSparkSession with AdaptiveSparkPl data.write.orc(workDirPath) val dfFromFile = spark.read.orc(workDirPath).cache() val inMemoryRelation = dfFromFile.queryExecution.optimizedPlan.collect { - case plan: InMemoryRelation => plan + case CachedRelation(plan) => plan }.head // InMemoryRelation's stats is file size before the underlying RDD is materialized assert(inMemoryRelation.computeStats().sizeInBytes === getLocalDirSize(workDir)) @@ -548,7 +563,7 @@ class InMemoryColumnarQuerySuite extends SharedSparkSession with AdaptiveSparkPl // test of catalog table val dfFromTable = spark.catalog.createTable("table1", workDirPath).cache() val inMemoryRelation2 = dfFromTable.queryExecution.optimizedPlan. - collect { case plan: InMemoryRelation => plan }.head + collect { case CachedRelation(plan) => plan }.head // Even CBO enabled, InMemoryRelation's stats keeps as the file size before table's // stats is calculated @@ -559,7 +574,7 @@ class InMemoryColumnarQuerySuite extends SharedSparkSession with AdaptiveSparkPl dfFromTable.unpersist(blocking = true) spark.sql("ANALYZE TABLE table1 COMPUTE STATISTICS") val inMemoryRelation3 = spark.read.table("table1").cache().queryExecution.optimizedPlan. 
- collect { case plan: InMemoryRelation => plan }.head + collect { case CachedRelation(plan) => plan }.head assert(inMemoryRelation3.computeStats().sizeInBytes === 48) } } From 57c81b7b1a1d26f4f973b13420d7c87b6be97621 Mon Sep 17 00:00:00 2001 From: DB Tsai Date: Tue, 14 Apr 2026 16:25:01 -0700 Subject: [PATCH 02/11] update --- .../apache/spark/sql/internal/SQLConf.scala | 10 - .../spark/sql/execution/CacheManager.scala | 25 +-- .../sql/TPCDSCachedPlanComparisonSuite.scala | 177 ------------------ 3 files changed, 10 insertions(+), 202 deletions(-) delete mode 100644 sql/core/src/test/scala/org/apache/spark/sql/TPCDSCachedPlanComparisonSuite.scala diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index f636bea7d3489..77ef8bb600f9c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -853,16 +853,6 @@ object SQLConf { .booleanConf .createWithDefault(true) - val CACHE_USE_DSV2 = - buildConf("spark.sql.inMemoryColumnarStorage.useDataSourceV2") - .internal() - .doc("When true, cached DataFrames are wrapped in a DataSourceV2Relation so that " + - "V2ScanRelationPushDown optimizer rules (column pruning, filter pushdown, statistics " + - "reporting) apply to in-memory cached scans.") - .version("4.0.0") - .booleanConf - .createWithDefault(true) - val COLUMN_VECTOR_OFFHEAP_ENABLED = buildConf("spark.sql.columnVector.offheap.enabled") .internal() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala index 098a78f407ec5..4bdd50d814cc8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala @@ -512,21 +512,16 @@ class CacheManager extends 
Logging with AdaptiveSparkPlanHelper { // After cache lookup, we should still keep the hints from the input plan. val hints = EliminateResolvedHint.extractHintsFromPlan(currentFragment)._2 val cachedPlan = cached.cachedRepresentation.withOutput(currentFragment.output) - val substitutedPlan: LogicalPlan = - if (SQLConf.get.getConf(SQLConf.CACHE_USE_DSV2)) { - // Wrap the InMemoryRelation in a DataSourceV2Relation so that V2ScanRelationPushDown - // optimizer rules can apply column pruning, filter pushdown, and ordering/statistics - // reporting. Physical execution is still routed to InMemoryTableScanExec. - DataSourceV2Relation( - table = new InMemoryCacheTable(cachedPlan), - output = cachedPlan.output.map { case ar: AttributeReference => ar }, - catalog = None, - identifier = None, - options = CaseInsensitiveStringMap.empty() - ) - } else { - cachedPlan - } + // Wrap the InMemoryRelation in a DataSourceV2Relation so that V2ScanRelationPushDown + // optimizer rules can apply column pruning, filter pushdown, and ordering/statistics + // reporting. Physical execution is still routed to InMemoryTableScanExec. + val substitutedPlan: LogicalPlan = DataSourceV2Relation( + table = new InMemoryCacheTable(cachedPlan), + output = cachedPlan.output.map { case ar: AttributeReference => ar }, + catalog = None, + identifier = None, + options = CaseInsensitiveStringMap.empty() + ) // The returned hint list is in top-down order, we should create the hint nodes from // right to left. 
hints.foldRight[LogicalPlan](substitutedPlan) { case (hint, p) => diff --git a/sql/core/src/test/scala/org/apache/spark/sql/TPCDSCachedPlanComparisonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/TPCDSCachedPlanComparisonSuite.scala deleted file mode 100644 index f63d71ab48419..0000000000000 --- a/sql/core/src/test/scala/org/apache/spark/sql/TPCDSCachedPlanComparisonSuite.scala +++ /dev/null @@ -1,177 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql - -import java.io.File -import java.nio.charset.StandardCharsets -import java.nio.file.Files - -import org.apache.spark.SparkConf -import org.apache.spark.sql.catalyst.util.resourceToString -import org.apache.spark.sql.execution.FormattedMode -import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.tags.ExtendedSQLTest - -/** - * Compares TPCDS query plans when all tables are cached in-memory, with and without the - * DSv2-backed cache path (spark.sql.inMemoryColumnarStorage.useDataSourceV2). - * - * Run with: - * build/sbt "sql/testOnly org.apache.spark.sql.TPCDSCachedPlanComparisonSuite" - * - * Results are written to /tmp/tpcds-cached-plan-comparison/. 
- */ -@ExtendedSQLTest -class TPCDSCachedPlanComparisonSuite extends BenchmarkQueryTest with TPCDSBase { - - // Inject stats so the planner makes realistic join-order decisions. - override def injectStats: Boolean = true - - // Disable AQE for deterministic, stable plans. - override protected def sparkConf: SparkConf = - super.sparkConf - .set(SQLConf.ADAPTIVE_EXECUTION_ENABLED, false) - .set(SQLConf.READ_SIDE_CHAR_PADDING, false) - - private val outputDir = new File(System.getProperty("java.io.tmpdir"), - "tpcds-cached-plan-comparison") - - private val dsv2Dir = new File(outputDir, "with-dsv2") - private val legacyDir = new File(outputDir, "without-dsv2") - private val diffDir = new File(outputDir, "diffs") - - override def beforeAll(): Unit = { - super.beforeAll() - // Cache all TPCDS tables so subsequent queries hit the in-memory cache. - tableNames.foreach { t => spark.catalog.cacheTable(t) } - // Clear stale files from previous runs so the summary reflects only this run. - Seq(dsv2Dir, legacyDir, diffDir).foreach { d => - if (d.exists()) d.listFiles().foreach(_.delete()) - d.mkdirs() - } - } - - override def afterAll(): Unit = { - tableNames.foreach { t => spark.catalog.uncacheTable(t) } - super.afterAll() - } - - // Strip all volatile IDs so structural diffs aren't polluted by different - // attribute / plan IDs assigned by the legacy vs DSv2 planning paths: - // - ExprIds: "name#123" or "name#123L" - // - plan_id=N: appear in Arguments lines of Exchange / Subquery nodes - // - id=#N: appear in SubqueryExec node args - private val exprIdRegex = "#\\d+L?".r - private val planIdRegex = "plan_id=\\d+".r - private val idHashRegex = "id=#\\d+".r - - private def normalizeIds(plan: String): String = { - val s1 = exprIdRegex.replaceAllIn(plan, "") - val s2 = planIdRegex.replaceAllIn(s1, "plan_id=X") - idHashRegex.replaceAllIn(s2, "id=#X") - } - - private def planString(queryString: String, useDsv2: Boolean): String = { - withSQLConf( - SQLConf.CACHE_USE_DSV2.key -> 
useDsv2.toString, - SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "10MB") { - normalizeIds(sql(queryString).queryExecution.explainString(FormattedMode)) - } - } - - private def produceDiff(name: String, before: String, after: String): String = { - val beforeLines = before.split("\n") - val afterLines = after.split("\n") - val sb = new StringBuilder - sb.append(s"=== Plan diff for $name ===\n") - sb.append("--- without-dsv2\n+++ with-dsv2\n") - val maxLen = math.max(beforeLines.length, afterLines.length) - for (i <- 0 until maxLen) { - val b = if (i < beforeLines.length) beforeLines(i) else "" - val a = if (i < afterLines.length) afterLines(i) else "" - if (b != a) { - sb.append(s"-$b\n") - sb.append(s"+$a\n") - } - } - sb.toString() - } - - private def runQueryComparison(group: String, query: String, suffix: String = ""): Unit = { - val queryString = resourceToString(s"$group/$query.sql", - classLoader = Thread.currentThread().getContextClassLoader) - - val withPlan = planString(queryString, useDsv2 = true) - val withoutPlan = planString(queryString, useDsv2 = false) - - val label = query + suffix - Files.writeString( - new File(dsv2Dir, s"$label.txt").toPath, withPlan, StandardCharsets.UTF_8) - Files.writeString( - new File(legacyDir, s"$label.txt").toPath, withoutPlan, StandardCharsets.UTF_8) - - if (withPlan != withoutPlan) { - val diff = produceDiff(label, withoutPlan, withPlan) - Files.writeString( - new File(diffDir, s"$label.diff.txt").toPath, diff, StandardCharsets.UTF_8) - } - } - - // v1.4 queries - tpcdsQueries.foreach { q => - test(s"tpcds-v1.4/$q (cached, with vs without DSv2)") { - runQueryComparison("tpcds", q) - } - } - - // v2.7 queries - tpcdsQueriesV2_7_0.foreach { q => - test(s"tpcds-v2.7.0/$q (cached, with vs without DSv2)") { - runQueryComparison("tpcds-v2.7.0", q, suffix = "-v2.7") - } - } - - test("print plan comparison summary") { - val diffFiles = Option(diffDir.listFiles()).getOrElse(Array.empty) - .filter(_.getName.endsWith(".diff.txt")) - 
.sortBy(_.getName) - - val total = tpcdsQueries.length + tpcdsQueriesV2_7_0.length - // scalastyle:off println - println( - s""" - |=== TPCDS Cached-Plan Comparison: DSv2 vs Legacy ($total queries) === - | Changed plans : ${diffFiles.length} - | Unchanged : ${total - diffFiles.length} - | Output dir : ${outputDir.getAbsolutePath} - |""".stripMargin) - - if (diffFiles.nonEmpty) { - println("Queries with plan differences:") - diffFiles.foreach { f => - val name = f.getName.replace(".diff.txt", "") - println(s" - $name => ${f.getAbsolutePath}") - println(Files.readString(f.toPath)) - println() - } - } else { - println("No plan differences detected.") - } - // scalastyle:on println - } -} From 84a2678d360889e767d8f3b99a3558712def8213 Mon Sep 17 00:00:00 2001 From: DB Tsai Date: Tue, 21 Apr 2026 16:01:27 -0700 Subject: [PATCH 03/11] address review feedback: add config toggle and fix docs - Add spark.sql.inMemoryColumnarStorage.enableDatasourceV2 (default true) so users can fall back to the pre-DSv2 InMemoryRelation path - Wire the config into CacheManager.useCachedData and analyzeColumnCacheQuery - Fix InMemoryCacheTable/ScanBuilder scaladoc to accurately state that column pruning, filter pushdown, and sort-order propagation already work via InMemoryScans/pruneFilterProject; the genuinely new capabilities are DPP (SupportsRuntimeV2Filtering) and per-partition LIMIT pushdown - Update InMemoryScans comment to reference the actual config key --- .../apache/spark/sql/internal/SQLConf.scala | 13 +++++ .../spark/sql/execution/CacheManager.scala | 48 +++++++++++-------- .../spark/sql/execution/SparkStrategies.scala | 11 +++-- .../columnar/InMemoryCacheTable.scala | 22 +++++---- 4 files changed, 62 insertions(+), 32 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 77ef8bb600f9c..f209180d44101 100644 --- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -853,6 +853,17 @@ object SQLConf { .booleanConf .createWithDefault(true) + val IN_MEMORY_CACHE_ENABLE_DSV2 = + buildConf("spark.sql.inMemoryColumnarStorage.enableDatasourceV2") + .doc("When true (default), cached DataFrames are represented as DataSourceV2Relation " + + "nodes, enabling Dynamic Partition Pruning and per-partition LIMIT pushdown on cached " + + "data. Set to false to revert to the pre-DSv2 InMemoryRelation path, which still " + + "supports column pruning, filter pushdown, and sort-order propagation via the " + + "InMemoryScans planner strategy.") + .version("4.0.0") + .booleanConf + .createWithDefault(true) + val COLUMN_VECTOR_OFFHEAP_ENABLED = buildConf("spark.sql.columnVector.offheap.enabled") .internal() @@ -7984,6 +7995,8 @@ class SQLConf extends Serializable with Logging with SqlApiConf { def inMemoryPartitionPruning: Boolean = getConf(IN_MEMORY_PARTITION_PRUNING) + def inMemoryCacheEnableDSv2: Boolean = getConf(IN_MEMORY_CACHE_ENABLE_DSV2) + def inMemoryTableScanStatisticsEnabled: Boolean = getConf(IN_MEMORY_TABLE_SCAN_STATISTICS_ENABLED) def offHeapColumnVectorEnabled: Boolean = getConf(COLUMN_VECTOR_OFFHEAP_ENABLED) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala index 4bdd50d814cc8..a4c98858132a0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala @@ -333,17 +333,19 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper { cachedData: CachedData, column: Seq[Attribute]): Unit = { val relation = cachedData.cachedRepresentation - // Wrap in DataSourceV2Relation so the DSv2 planning path is used consistently - // (DataSourceV2Strategy handles 
InMemoryTableScanExec via InMemoryCacheScan). - val dsv2Relation = DataSourceV2Relation( - table = new InMemoryCacheTable(relation), - output = relation.output.map { case ar: AttributeReference => ar }, - catalog = None, - identifier = None, - options = CaseInsensitiveStringMap.empty() - ) + val planToAnalyze: LogicalPlan = if (SQLConf.get.inMemoryCacheEnableDSv2) { + DataSourceV2Relation( + table = new InMemoryCacheTable(relation), + output = relation.output.map { case ar: AttributeReference => ar }, + catalog = None, + identifier = None, + options = CaseInsensitiveStringMap.empty() + ) + } else { + relation + } val (rowCount, newColStats) = - CommandUtils.computeColumnStats(sparkSession, dsv2Relation, column) + CommandUtils.computeColumnStats(sparkSession, planToAnalyze, column) relation.updateStats(rowCount, newColStats) } @@ -504,6 +506,7 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper { * plan must be normalized. */ private[sql] def useCachedData(plan: LogicalPlan): LogicalPlan = { + val dsv2Enabled = SQLConf.get.inMemoryCacheEnableDSv2 val newPlan = plan transformDown { case command: Command => command @@ -512,16 +515,21 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper { // After cache lookup, we should still keep the hints from the input plan. val hints = EliminateResolvedHint.extractHintsFromPlan(currentFragment)._2 val cachedPlan = cached.cachedRepresentation.withOutput(currentFragment.output) - // Wrap the InMemoryRelation in a DataSourceV2Relation so that V2ScanRelationPushDown - // optimizer rules can apply column pruning, filter pushdown, and ordering/statistics - // reporting. Physical execution is still routed to InMemoryTableScanExec. 
- val substitutedPlan: LogicalPlan = DataSourceV2Relation( - table = new InMemoryCacheTable(cachedPlan), - output = cachedPlan.output.map { case ar: AttributeReference => ar }, - catalog = None, - identifier = None, - options = CaseInsensitiveStringMap.empty() - ) + // When DSv2 is enabled (default), wrap in DataSourceV2Relation so that + // V2ScanRelationPushDown can apply DPP and per-partition LIMIT pushdown. + // When disabled, return InMemoryRelation directly; InMemoryScans handles it via + // pruneFilterProject (column pruning, filter pushdown, sort-order propagation). + val substitutedPlan: LogicalPlan = if (dsv2Enabled) { + DataSourceV2Relation( + table = new InMemoryCacheTable(cachedPlan), + output = cachedPlan.output.map { case ar: AttributeReference => ar }, + catalog = None, + identifier = None, + options = CaseInsensitiveStringMap.empty() + ) + } else { + cachedPlan + } // The returned hint list is in top-down order, we should create the hint nodes from // right to left. hints.foldRight[LogicalPlan](substitutedPlan) { case (hint, p) => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 218077fe5c6b7..a145cfb0ff4c9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -705,10 +705,13 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] { } /** - * Fallback strategy for cached in-memory tables when the DSv2 cache path is disabled - * (spark.sql.inMemoryColumnarStorage.useDataSourceV2 = false). - * Under the default (DSv2) path InMemoryRelation is never exposed to the planner because - * CacheManager wraps it in DataSourceV2Relation before planning. + * Strategy for cached in-memory tables. 
+ * + * Under the default DSv2 path (spark.sql.inMemoryColumnarStorage.enableDatasourceV2 = true), + * CacheManager wraps InMemoryRelation in DataSourceV2Relation, so this strategy is only + * reached when DSv2 is disabled. Both paths produce InMemoryTableScanExec and support column + * pruning, filter pushdown, and sort-order propagation; the DSv2 path additionally enables + * Dynamic Partition Pruning and per-partition LIMIT pushdown. */ object InMemoryScans extends Strategy { def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryCacheTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryCacheTable.scala index 15861dc5fdcc4..37c5af36bf2c3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryCacheTable.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryCacheTable.scala @@ -48,9 +48,14 @@ import org.apache.spark.sql.types.StructType import org.apache.spark.sql.util.CaseInsensitiveStringMap /** - * A DSv2 [[Table]] wrapper around [[InMemoryRelation]], enabling [[V2ScanRelationPushDown]] - * optimizer rules to apply column pruning, filter pushdown, and ordering/statistics reporting - * to cached DataFrames. + * A DSv2 [[Table]] wrapper around [[InMemoryRelation]] that routes cached DataFrames through + * the [[V2ScanRelationPushDown]] optimizer batch. + * + * The pre-DSv2 [[InMemoryScans]] planner strategy already supports column pruning, filter + * pushdown, and sort-order propagation via [[pruneFilterProject]]. 
This wrapper adds two + * capabilities that are not available in the existing path: + * - Dynamic Partition Pruning via [[SupportsRuntimeV2Filtering]] + * - Per-partition LIMIT pushdown via [[SupportsPushDownLimit]] */ private[sql] class InMemoryCacheTable(val relation: InMemoryRelation) extends Table with SupportsRead { @@ -77,11 +82,12 @@ private[sql] class InMemoryCacheTable(val relation: InMemoryRelation) /** * DSv2 [[ScanBuilder]] for [[InMemoryRelation]]. * - * - Column pruning via [[SupportsPushDownRequiredColumns]]: only requested columns are - * passed to [[InMemoryTableScanExec]], reducing deserialization work. - * - Filter pushdown via [[SupportsPushDownV2Filters]]: predicates are recorded for - * batch-level pruning using per-batch min/max statistics, but all predicates are - * returned (category-2: still need post-scan row-level re-evaluation). + * Implements [[SupportsPushDownRequiredColumns]] and [[SupportsPushDownV2Filters]] so that + * [[V2ScanRelationPushDown]] records the pruned columns and predicates before building the scan. + * The pre-DSv2 path ([[InMemoryScans]]) already performs the same pruning via + * [[pruneFilterProject]], so these interfaces mirror existing behavior under the new path. + * [[SupportsPushDownLimit]] is the new addition: it enables per-partition LIMIT that is not + * available in [[InMemoryScans]]. 
*/ private[sql] class InMemoryScanBuilder(relation: InMemoryRelation) extends ScanBuilder From da6ae80f23b9d19fadcf36908f89a0539ee94c76 Mon Sep 17 00:00:00 2001 From: DB Tsai Date: Thu, 28 May 2026 17:07:32 -0700 Subject: [PATCH 04/11] address review feedback: fix outputOrdering prefix and clarify filter() docs - Fix InMemoryTableScanExec pattern match in hive CachedTableSuite (now 5 fields) - Clarify filter() ScalaDoc: intentional no-op because DataSourceV2Strategy routes InMemoryCacheScan directly to InMemoryTableScanExec, bypassing BatchScanExec - Add comment in DataSourceV2Strategy explaining why scalar subquery filters are not extracted for InMemoryCacheScan (FilterExec handles them; filter() is never called) - Fix outputOrdering: use takeWhile instead of flatMap to emit only a valid leading prefix; skipping a non-AttributeReference key mid-sequence and emitting later keys would report a non-existent ordering and allow incorrect sort elimination Co-authored-by: DB Tsai --- .../columnar/InMemoryCacheTable.scala | 28 +++++++++++-------- .../datasources/v2/DataSourceV2Strategy.scala | 6 ++++ .../spark/sql/hive/CachedTableSuite.scala | 2 +- 3 files changed, 23 insertions(+), 13 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryCacheTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryCacheTable.scala index 37c5af36bf2c3..903fec0401eb7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryCacheTable.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryCacheTable.scala @@ -172,15 +172,18 @@ private[sql] class InMemoryCacheScan( /** * Converts the Catalyst sort ordering of the cached plan to V2 [[SortOrder]]s. 
- * Only attribute-reference based orderings whose column is present in [[prunedAttrs]] are - * emitted; sort keys that were pruned away are dropped so that [[V2ScanPartitioningAndOrdering]] - * does not attempt to resolve a column that is no longer in the scan output. + * The longest valid prefix is emitted: we stop at the first key that is either not an + * [[AttributeReference]] or was pruned from the output. Skipping a key in the middle and + * emitting later keys would report a non-existent ordering and allow the optimizer to + * eliminate required sorts. */ override def outputOrdering(): Array[V2SortOrder] = { val prunedNames = prunedAttrs.map(_.name).toSet - relation.outputOrdering.flatMap { - case CatalystSortOrder(attr: AttributeReference, direction, nullOrdering, _) - if prunedNames.contains(attr.name) => + relation.outputOrdering.iterator.takeWhile { + case CatalystSortOrder(attr: AttributeReference, _, _, _) => prunedNames.contains(attr.name) + case _ => false + }.map { + case CatalystSortOrder(attr: AttributeReference, direction, nullOrdering, _) => val v2Dir = direction match { case Ascending => V2SortDirection.ASCENDING case Descending => V2SortDirection.DESCENDING @@ -189,8 +192,7 @@ private[sql] class InMemoryCacheScan( case NullsFirst => V2NullOrdering.NULLS_FIRST case NullsLast => V2NullOrdering.NULLS_LAST } - Some(SortValue(FieldReference.column(attr.name), v2Dir, v2Nulls)) - case _ => None + SortValue(FieldReference.column(attr.name), v2Dir, v2Nulls): V2SortOrder }.toArray } @@ -229,10 +231,12 @@ private[sql] class InMemoryCacheScan( } /** - * No-op: runtime predicates for cached scans are handled entirely through - * [[InMemoryTableScanExec.runtimeFilters]], not through this interface method. - * The DPP pipeline injects [[DynamicPruning]] expressions into the plan, which - * [[DataSourceV2Strategy]] separates and passes as runtimeFilters to the exec node. + * Intentional no-op. 
The V2 contract expects `filter()` to prune [[InputPartition]]s before + * `Batch.planInputPartitions()`, but [[InMemoryCacheScan]] never goes through [[BatchScanExec]]. + * [[DataSourceV2Strategy]] special-cases [[InMemoryCacheScan]] and creates + * [[InMemoryTableScanExec]] directly, passing [[DynamicPruning]] expressions via + * `runtimeFilters`. We implement [[SupportsRuntimeV2Filtering]] only to expose + * [[filterAttributes]], which lets the optimizer inject DPP filters into the plan. */ override def filter(predicates: Array[V2Predicate]): Unit = {} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala index 913cfd00cbc89..f9cb962df9b4a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala @@ -167,6 +167,12 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat // batch-level min/max pruning at execution time; compile-time filters are passed // both to InMemoryTableScanExec (for batch pruning) and to withProjectAndFilter // (for row-level re-evaluation). The pushed limit is applied per-partition. + // + // Scalar subquery filters are NOT extracted here (unlike the generic + // DataSourceV2ScanRelation case below). In that case they are routed into runtimeFilters + // so BatchScanExec can call scan.filter() for SupportsRuntimeV2Filtering partition pruning. + // InMemoryCacheScan never goes through BatchScanExec, so filter() is never called; + // scalar subquery filters remain in compiledFilters and are applied by FilterExec above. 
val (runtimeFilters, compiledFilters) = filters.partition { case _: DynamicPruning => true case _ => false diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala index 02f70488c7ed6..9b44c18e33b1d 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala @@ -38,7 +38,7 @@ class CachedTableSuite extends QueryTest with TestHiveSingleton { def rddIdOf(tableName: String): Int = { val plan = table(tableName).queryExecution.sparkPlan plan.collect { - case InMemoryTableScanExec(_, _, relation) => + case InMemoryTableScanExec(_, _, relation, _, _) => relation.cacheBuilder.cachedColumnBuffers.id case _ => fail(s"Table $tableName is not cached\n" + plan) From d655a4ac63d27c3ed4d3823072962b2b93dcf131 Mon Sep 17 00:00:00 2001 From: DB Tsai Date: Thu, 28 May 2026 17:32:37 -0700 Subject: [PATCH 05/11] fix: correct DPP batch pruning, columnar limit, and numOutputRows metric Three bugs in InMemoryTableScanExec: 1. allPredicates: DynamicPruningExpression(InSubqueryExec) was passed directly to buildFilter, which cannot translate it to a min/max stats check, so DPP provided zero batch-level pruning. Fix: materialize the evaluated subquery result into In(child, literals) so buildFilter can skip batches. 2. doExecuteColumnar: limit was applied as result.mapPartitions(_.take(n)) on RDD[ColumnarBatch], which takes N batches (up to batchSize*N rows), not N rows. Fix: add limitBatchesToRows helper that truncates the last batch via ColumnarBatch.setNumRows so the per-partition limit is row-accurate. 3. numOutputRows metric was accumulated before the per-partition limit was applied, overcounting rows that LocalLimit would later discard. Fix: count rows/batches after the limit in both doExecute and doExecuteColumnar. 
Co-authored-by: DB Tsai --- .../columnar/InMemoryTableScanExec.scala | 60 ++++++++++++++----- 1 file changed, 46 insertions(+), 14 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala index ff3a8761ad50c..b89d52fa76ad1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala @@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.plans.QueryPlan import org.apache.spark.sql.catalyst.plans.logical.Statistics import org.apache.spark.sql.catalyst.plans.physical.Partitioning import org.apache.spark.sql.columnar.CachedBatch -import org.apache.spark.sql.execution.{LeafExecNode, SparkPlan, WholeStageCodegenExec} +import org.apache.spark.sql.execution.{InSubqueryExec, LeafExecNode, SparkPlan, WholeStageCodegenExec} import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec import org.apache.spark.sql.execution.metric.SQLMetrics import org.apache.spark.sql.vectorized.ColumnarBatch @@ -143,9 +143,17 @@ case class InMemoryTableScanExec( private val inMemoryPartitionPruningEnabled = conf.inMemoryPartitionPruning - // After DPP subqueries are evaluated, DynamicPruningExpression wraps a concrete predicate. - // Merge compile-time predicates with evaluated runtime filters for batch-level pruning. + // After DPP subqueries are evaluated, DynamicPruningExpression wraps an InSubqueryExec whose + // result is populated. Convert it to In(child, literals) so buildFilter can skip batches via + // per-batch min/max column statistics. Without the conversion, buildFilter cannot translate + // InSubqueryExec and DPP provides no batch-level pruning. 
private def allPredicates: Seq[Expression] = predicates ++ runtimeFilters.map { + case DynamicPruningExpression(in: InSubqueryExec) => + in.values() match { + case Some(values) if values.nonEmpty => + In(in.child, values.map(v => Literal.create(v, in.child.dataType)).toSeq) + case _ => Literal.TrueLiteral + } case DynamicPruningExpression(e) => e case e => e } @@ -175,8 +183,7 @@ case class InMemoryTableScanExec( val relOutput = relation.output val serializer = relation.cacheBuilder.serializer - // update SQL metrics - val withMetrics = + val withAccumulators = filteredCachedBatches().mapPartitionsInternal { iter => if (enableAccumulatorsForTest && iter.hasNext) { readPartitions.add(1) @@ -185,28 +192,53 @@ case class InMemoryTableScanExec( if (enableAccumulatorsForTest) { readBatches.add(1) } - numOutputRows += batch.numRows batch } } - val result = - serializer.convertCachedBatchToInternalRow(withMetrics, relOutput, attributes, conf) - limit.fold(result)(n => result.mapPartitions(_.take(n))) + val rows = + serializer.convertCachedBatchToInternalRow(withAccumulators, relOutput, attributes, conf) + // Apply the per-partition limit before counting output rows so numOutputRows reflects + // what actually leaves the scan, not what was deserialized before LocalLimit truncates. + val result = limit.fold(rows)(n => rows.mapPartitions(_.take(n))) + result.mapPartitions(_.map { row => numOutputRows += 1; row }) + } + + // Limits a ColumnarBatch iterator to at most maxRows rows, truncating the last batch if needed. + // Iterator.take(n) on RDD[ColumnarBatch] would take n batches (not n rows); this corrects that. 
+ private def limitBatchesToRows( + iter: Iterator[ColumnarBatch], maxRows: Int): Iterator[ColumnarBatch] = { + var remaining = maxRows + new Iterator[ColumnarBatch] { + override def hasNext: Boolean = remaining > 0 && iter.hasNext + override def next(): ColumnarBatch = { + val batch = iter.next() + val n = batch.numRows() + if (n <= remaining) { + remaining -= n + batch + } else { + batch.setNumRows(remaining) + remaining = 0 + batch + } + } + } } protected override def doExecuteColumnar(): RDD[ColumnarBatch] = { // Resulting RDD is cached and reused by SparkPlan.executeColumnarRDD val numOutputRows = longMetric("numOutputRows") - val buffers = filteredCachedBatches() - val result = relation.cacheBuilder.serializer.convertCachedBatchToColumnarBatch( - buffers, + val batches = relation.cacheBuilder.serializer.convertCachedBatchToColumnarBatch( + filteredCachedBatches(), relation.output, attributes, - conf).map { cb => + conf) + // Apply row-accurate per-partition limit before counting output rows. + val result = limit.fold(batches)(n => batches.mapPartitions(limitBatchesToRows(_, n))) + result.map { cb => numOutputRows += cb.numRows() cb } - limit.fold(result)(n => result.mapPartitions(_.take(n))) } override def isMaterialized: Boolean = relation.cacheBuilder.isCachedColumnBuffersLoaded From 8f0e91b708c83b23112ff160e1f15d21a243f79e Mon Sep 17 00:00:00 2001 From: DB Tsai Date: Thu, 28 May 2026 17:43:53 -0700 Subject: [PATCH 06/11] fix: use FalseLiteral for empty DPP broadcast in allPredicates MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When the DPP subquery returns zero results (empty broadcast side of the join), IN () is always false — every batch should be skippable. Using TrueLiteral was semantically wrong: it told buildFilter there was no constraint, causing all batches to be scanned even though the join will produce no output rows. 
Co-authored-by: DB Tsai --- .../spark/sql/execution/columnar/InMemoryTableScanExec.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala index b89d52fa76ad1..8936bd050e2c8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala @@ -152,7 +152,9 @@ case class InMemoryTableScanExec( in.values() match { case Some(values) if values.nonEmpty => In(in.child, values.map(v => Literal.create(v, in.child.dataType)).toSeq) - case _ => Literal.TrueLiteral + // Empty broadcast side: IN () is always false — signal to buildFilter that every + // batch can be skipped rather than treating it as an unconstrained TrueLiteral. + case _ => Literal.FalseLiteral } case DynamicPruningExpression(e) => e case e => e From 0657dc31bd08840f386bc8674581bce1ad7cf7e3 Mon Sep 17 00:00:00 2001 From: DB Tsai Date: Thu, 28 May 2026 22:43:18 -0700 Subject: [PATCH 07/11] fix: DataSourceV2ScanRelation pattern missing 6th field (pushedFilters) DataSourceV2ScanRelation has 6 fields (relation, scan, output, keyGroupedPartitioning, ordering, pushedFilters) but the InMemoryCacheScan arm only destructured 5, causing a compile error after master added pushedFilters. 
Co-authored-by: DB Tsai --- .../sql/execution/datasources/v2/DataSourceV2Strategy.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala index f9cb962df9b4a..3d32317ad680b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala @@ -161,7 +161,7 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat project, filters, localScanExec, needsUnsafeConversion = false) :: Nil case PhysicalOperation(project, filters, - DataSourceV2ScanRelation(_, scan: InMemoryCacheScan, output, _, _)) => + DataSourceV2ScanRelation(_, scan: InMemoryCacheScan, output, _, _, _)) => // Route cached DataFrames back to InMemoryTableScanExec, preserving the optimized // columnar path. DynamicPruning expressions are separated into runtimeFilters for // batch-level min/max pruning at execution time; compile-time filters are passed From 1bcc054a6877389ee82c29ea512d55c31cc95ba9 Mon Sep 17 00:00:00 2001 From: DB Tsai Date: Fri, 29 May 2026 16:10:59 -0700 Subject: [PATCH 08/11] temp --- project/build.properties | 2 +- .../columnar/InMemoryCacheDSv2Benchmark.scala | 74 +++++++++++++++++++ 2 files changed, 75 insertions(+), 1 deletion(-) diff --git a/project/build.properties b/project/build.properties index dbfaf7127844c..8302cc986b157 100644 --- a/project/build.properties +++ b/project/build.properties @@ -14,4 +14,4 @@ # See the License for the specific language governing permissions and # limitations under the License. 
# -sbt.version=1.12.8 +sbt.version=1.12.4 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryCacheDSv2Benchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryCacheDSv2Benchmark.scala index 806c597c70356..de65c86c9977a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryCacheDSv2Benchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryCacheDSv2Benchmark.scala @@ -155,6 +155,74 @@ object InMemoryCacheDSv2Benchmark extends SqlBasedBenchmark { df.unpersist() } + /** + * Benchmarks per-partition LIMIT pushdown on the columnar cache path. + * The DSv2 path pushes the limit into the scan, so each partition stops deserializing + * batches once it has produced enough rows, rather than scanning everything. + * Compares DSv2 (limit pushed) vs the fallback InMemoryScans path (no limit pushdown). + */ + def limitPushdownBenchmark(): Unit = { + val df = spark.range(numRows).select( + col("id").alias("c0"), + col("id").alias("c1") + ).cache() + df.count() + + val benchmark = new Benchmark( + s"LIMIT pushdown on $numRows rows", + numRows, output = output) + + benchmark.addCase("LIMIT 100 - DSv2 (limit pushed into scan)") { _ => + withSQLConf(SQLConf.IN_MEMORY_CACHE_ENABLE_DSV2.key -> "true") { + df.limit(100).count() + } + } + + benchmark.addCase("LIMIT 100 - fallback InMemoryScans (no limit pushdown)") { _ => + withSQLConf(SQLConf.IN_MEMORY_CACHE_ENABLE_DSV2.key -> "false") { + df.limit(100).count() + } + } + + benchmark.addCase("full scan baseline (no LIMIT)") { _ => + df.count() + } + + benchmark.run() + df.unpersist() + } + + /** + * Compares the DSv2 cache path vs the pre-DSv2 InMemoryScans fallback for a + * representative workload: aggregate on a wide table with column pruning. + * Measures the net benefit (pruning savings) vs overhead (extra optimizer rules). 
+ */ + def dsv2VsFallbackBenchmark(): Unit = { + val df = spark.range(numRows).select( + (0 until 10).map(i => (col("id") + i).alias(s"c$i")): _* + ).cache() + df.count() + + val benchmark = new Benchmark( + s"DSv2 vs fallback - $numRows rows, 10 cols, sum 2 cols", + numRows, output = output) + + benchmark.addCase("DSv2 path (enableDatasourceV2=true)") { _ => + withSQLConf(SQLConf.IN_MEMORY_CACHE_ENABLE_DSV2.key -> "true") { + df.select("c0", "c1").agg(sum("c0") + sum("c1")).collect() + } + } + + benchmark.addCase("fallback path (enableDatasourceV2=false)") { _ => + withSQLConf(SQLConf.IN_MEMORY_CACHE_ENABLE_DSV2.key -> "false") { + df.select("c0", "c1").agg(sum("c0") + sum("c1")).collect() + } + } + + benchmark.run() + df.unpersist() + } + override def runBenchmarkSuite(mainArgs: Array[String]): Unit = { // AQE off for deterministic planning withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") { @@ -170,6 +238,12 @@ object InMemoryCacheDSv2Benchmark extends SqlBasedBenchmark { runBenchmark("In-memory cache: end-to-end aggregate") { endToEndAggregateBenchmark() } + runBenchmark("In-memory cache: LIMIT pushdown") { + limitPushdownBenchmark() + } + runBenchmark("In-memory cache: DSv2 vs fallback path") { + dsv2VsFallbackBenchmark() + } } } } From 119138590d07c3a02253e4558408a0e9553b80a8 Mon Sep 17 00:00:00 2001 From: DB Tsai Date: Thu, 4 Jun 2026 15:56:16 -0700 Subject: [PATCH 09/11] fix: complete DataSourceV2ScanRelation 6-arg pattern and self-review cleanup - InMemoryCacheTable: add missing 6th field (pushedFilters) to the DataSourceV2ScanRelation pattern in CachedRelation.unapply. This was the sole error in CI's main-source precompile; the prior fix missed this site. - build.properties: restore sbt.version to 1.12.8 (a local 1.12.4 downgrade had leaked into the WIP history). - InMemoryTableScanExec: replace a non-ASCII em-dash in a comment. - InMemoryCacheDSv2Benchmark: drop unused numIters; add jdk21 results. 
Co-authored-by: Isaac --- project/build.properties | 2 +- ...MemoryCacheDSv2Benchmark-jdk21-results.txt | 46 +++++++++++++++++++ .../columnar/InMemoryCacheTable.scala | 2 +- .../columnar/InMemoryTableScanExec.scala | 2 +- .../columnar/InMemoryCacheDSv2Benchmark.scala | 1 - 5 files changed, 49 insertions(+), 4 deletions(-) create mode 100644 sql/core/benchmarks/InMemoryCacheDSv2Benchmark-jdk21-results.txt diff --git a/project/build.properties b/project/build.properties index 8302cc986b157..dbfaf7127844c 100644 --- a/project/build.properties +++ b/project/build.properties @@ -14,4 +14,4 @@ # See the License for the specific language governing permissions and # limitations under the License. # -sbt.version=1.12.4 +sbt.version=1.12.8 diff --git a/sql/core/benchmarks/InMemoryCacheDSv2Benchmark-jdk21-results.txt b/sql/core/benchmarks/InMemoryCacheDSv2Benchmark-jdk21-results.txt new file mode 100644 index 0000000000000..e9f810ce0e306 --- /dev/null +++ b/sql/core/benchmarks/InMemoryCacheDSv2Benchmark-jdk21-results.txt @@ -0,0 +1,46 @@ +================================================================================================ +In-memory cache: column pruning +================================================================================================ + +OpenJDK 64-Bit Server VM 21.0.9 on Mac OS X 26.4 +Apple M3 Max +Column pruning - 1000000 rows, 10 cols, select 2: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +-------------------------------------------------------------------------------------------------------------------------------- +sum 2 of 10 cols (column pruning via DSv2) 19 24 4 52.2 19.2 1.0X +sum all 10 cols (no pruning - pre-DSv2 baseline) 54 57 3 18.4 54.5 0.4X + + +================================================================================================ +In-memory cache: filter pushdown +================================================================================================ + +OpenJDK 64-Bit Server VM 21.0.9 on 
Mac OS X 26.4 +Apple M3 Max +Filter pushdown - 1000000 rows, selective filter (c0 < 1000): Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +-------------------------------------------------------------------------------------------------------------------------------------------- +filter c0 < 1000 (pushed to scan, batch pruning) 7 10 2 151.3 6.6 1.0X +filter c0 < 1000 (count with full scan for comparison) 7 10 2 141.4 7.1 0.9X + + +================================================================================================ +In-memory cache: planning overhead +================================================================================================ + +OpenJDK 64-Bit Server VM 21.0.9 on Mac OS X 26.4 +Apple M3 Max +Planning overhead - 1000 plan-only iterations: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +--------------------------------------------------------------------------------------------------------------------------------- +optimizedPlan (DSv2 path, V2ScanRelationPushDown) 262 323 55 0.0 261916.9 1.0X + + +================================================================================================ +In-memory cache: end-to-end aggregate +================================================================================================ + +OpenJDK 64-Bit Server VM 21.0.9 on Mac OS X 26.4 +Apple M3 Max +End-to-end aggregate (groupBy + sum) on 1000000 rows: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +------------------------------------------------------------------------------------------------------------------------------------ +groupBy(key).sum(val) - DSv2 path 26 29 3 38.3 26.1 1.0X + + diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryCacheTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryCacheTable.scala index 903fec0401eb7..2a3ebbc339961 100644 --- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryCacheTable.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryCacheTable.scala @@ -299,7 +299,7 @@ object CachedRelation { def unapply(plan: LogicalPlan): Option[InMemoryRelation] = plan match { case mem: InMemoryRelation => Some(mem) case DataSourceV2Relation(table: InMemoryCacheTable, _, _, _, _, _) => Some(table.relation) - case DataSourceV2ScanRelation(_, scan: InMemoryCacheScan, _, _, _) => Some(scan.relation) + case DataSourceV2ScanRelation(_, scan: InMemoryCacheScan, _, _, _, _) => Some(scan.relation) case _ => None } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala index 8936bd050e2c8..80d06bbefcafa 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala @@ -152,7 +152,7 @@ case class InMemoryTableScanExec( in.values() match { case Some(values) if values.nonEmpty => In(in.child, values.map(v => Literal.create(v, in.child.dataType)).toSeq) - // Empty broadcast side: IN () is always false — signal to buildFilter that every + // Empty broadcast side: IN () is always false - signal to buildFilter that every // batch can be skipped rather than treating it as an unconstrained TrueLiteral. 
case _ => Literal.FalseLiteral } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryCacheDSv2Benchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryCacheDSv2Benchmark.scala index de65c86c9977a..b0df8cc5d0a0e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryCacheDSv2Benchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryCacheDSv2Benchmark.scala @@ -39,7 +39,6 @@ import org.apache.spark.sql.internal.SQLConf object InMemoryCacheDSv2Benchmark extends SqlBasedBenchmark { private val numRows = 1000000 - private val numIters = 5 /** * Benchmarks column pruning: reading 2 of 10 columns from a cached wide table. From 5795f2cc4e350c224c79ffe5d3f746151f9e85a3 Mon Sep 17 00:00:00 2001 From: DB Tsai Date: Thu, 4 Jun 2026 16:34:50 -0700 Subject: [PATCH 10/11] fix: drop colliding pushedPredicates field on InMemoryCacheScan master added a concrete `pushedPredicates()` default method to SupportsRuntimeV2Filtering (since 4.2.0), which collides with the InMemoryCacheScan constructor val of the same name and requires an `override`. The two are semantically distinct: the inherited method reports runtime-pushed predicates (from filter()), while the field held compile-time predicates recorded by the builder. Since filter() is a no-op for this scan and the field was never read, remove it and let the inherited empty default stand, which is the correct behavior. 
Co-authored-by: Isaac --- .../spark/sql/execution/columnar/InMemoryCacheTable.scala | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryCacheTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryCacheTable.scala index 2a3ebbc339961..db7145f041c35 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryCacheTable.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryCacheTable.scala @@ -135,7 +135,7 @@ private[sql] class InMemoryScanBuilder(relation: InMemoryRelation) val prunedAttrs = if (requiredFieldNames == relation.output.map(_.name).toSet) relation.output else relation.output.filter(a => requiredFieldNames.contains(a.name)) - new InMemoryCacheScan(relation, prunedAttrs, _pushedPredicates, _pushedLimit) + new InMemoryCacheScan(relation, prunedAttrs, _pushedLimit) } } @@ -160,7 +160,6 @@ private[sql] class InMemoryScanBuilder(relation: InMemoryRelation) private[sql] class InMemoryCacheScan( val relation: InMemoryRelation, val prunedAttrs: Seq[Attribute], - val pushedPredicates: Array[V2Predicate], val pushedLimit: Option[Int] = None) extends Scan with SupportsReportOrdering @@ -237,6 +236,10 @@ private[sql] class InMemoryCacheScan( * [[InMemoryTableScanExec]] directly, passing [[DynamicPruning]] expressions via * `runtimeFilters`. We implement [[SupportsRuntimeV2Filtering]] only to expose * [[filterAttributes]], which lets the optimizer inject DPP filters into the plan. + * + * Because `filter()` accepts nothing, `pushedPredicates()` correctly uses the inherited + * empty default; we intentionally do not expose the compile-time predicates recorded by + * [[InMemoryScanBuilder]] here, as those are a different concept from runtime-pushed ones. 
*/ override def filter(predicates: Array[V2Predicate]): Unit = {} From 6caaffd1cd5a1d6fbfef9e7b9a10b66f35228523 Mon Sep 17 00:00:00 2001 From: DB Tsai Date: Thu, 4 Jun 2026 16:47:49 -0700 Subject: [PATCH 11/11] fix: make InMemoryCacheScan.outputOrdering match exhaustive The .map over the takeWhile-filtered sort keys deconstructed SortOrder with a nested AttributeReference type-test and no catch-all, which Scala flags as a non-exhaustive match - fatal under Spark's -Wconf. Fold the filter and the conversion into a single total map that yields Option, then take the longest defined prefix, preserving the original "longest valid prefix" semantics. Co-authored-by: Isaac --- .../execution/columnar/InMemoryCacheTable.scala | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryCacheTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryCacheTable.scala index db7145f041c35..571744c7e3ce7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryCacheTable.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryCacheTable.scala @@ -178,11 +178,11 @@ private[sql] class InMemoryCacheScan( */ override def outputOrdering(): Array[V2SortOrder] = { val prunedNames = prunedAttrs.map(_.name).toSet - relation.outputOrdering.iterator.takeWhile { - case CatalystSortOrder(attr: AttributeReference, _, _, _) => prunedNames.contains(attr.name) - case _ => false - }.map { - case CatalystSortOrder(attr: AttributeReference, direction, nullOrdering, _) => + // Map each key to its V2 form, or None when it is not an AttributeReference or was pruned; + // takeWhile below then keeps only the longest prefix of convertible keys.
+ relation.outputOrdering.iterator.map { + case CatalystSortOrder(attr: AttributeReference, direction, nullOrdering, _) + if prunedNames.contains(attr.name) => val v2Dir = direction match { case Ascending => V2SortDirection.ASCENDING case Descending => V2SortDirection.DESCENDING @@ -191,8 +191,9 @@ private[sql] class InMemoryCacheScan( case NullsFirst => V2NullOrdering.NULLS_FIRST case NullsLast => V2NullOrdering.NULLS_LAST } - SortValue(FieldReference.column(attr.name), v2Dir, v2Nulls): V2SortOrder - }.toArray + Some(SortValue(FieldReference.column(attr.name), v2Dir, v2Nulls): V2SortOrder) + case _ => None + }.takeWhile(_.isDefined).map(_.get).toArray } /**