[WIP][SPARK-56479][SQL] df.cache() with DSv2 interfaces to enable V2ScanRelationPushDown optimizer rules#55017
[WIP][SPARK-56479][SQL] df.cache() with DSv2 interfaces to enable V2ScanRelationPushDown optimizer rules#55017dbtsai wants to merge 11 commits into
Conversation
Currently InMemoryTableScanExec has column pruning already. It only deserializes necessary column. |
szehon-ho
left a comment
There was a problem hiding this comment.
Thanks for the PR! I spent some time studying the approach. Before going deeper into implementation details, I'd like to step back and discuss whether this is the right architecture.
I mapped out which capabilities are genuinely new vs. already provided by the existing InMemoryScans + InMemoryTableScanExec path:
| Capability | Already works today? | How it works today |
|---|---|---|
| Column pruning | Yes | InMemoryScans uses pruneFilterProject → InMemoryTableScanExec(attributes=prunedCols) → serializer only deserializes requested columns |
| Batch-level filter pruning (min/max) | Yes | Filters passed to InMemoryTableScanExec.predicates → CachedBatchSerializer.buildFilter |
| Sort order propagation | Yes | InMemoryTableScanExec.outputOrdering derives from cachedPlan.outputOrdering |
| Statistics reporting | Yes | InMemoryRelation.computeStats() reports accumulator-based size/rowCount after materialization |
| Dynamic Partition Pruning | No | PartitionPruning.getFilterableTableScan has no InMemoryRelation case |
| Per-partition limit pushdown | No | Not supported today |
The DSv2 wrapping adds the last two, but comes with tradeoffs:
- Stats behavioral change:
InMemoryCacheScan.estimateStatistics()applies a column-ratio scaling heuristic (sizeInBytes * prunedAttrs.size / output.size) that doesn't exist today. CBO sees different statistics, which could affect join strategy selection. - EXPLAIN / plan shape change:
withCachedDatashowsDataSourceV2Relationinstead ofInMemoryRelation;optimizedPlanshowsDataSourceV2ScanRelation. This is user-visible and could affect tooling or downstream code that inspects plans.
Given that the two genuinely new capabilities (DPP, limit pushdown) could alternatively be added directly to InMemoryScans / InMemoryTableScanExec with much less surface area, do we want to proceed with the DSv2 wrapping approach? The main argument for it would be future-proofing (new V2 capabilities auto-apply), but the tradeoff is coupling the in-memory cache to the full V2 contract surface area.
Would it make sense to either:
- Add DPP + limit support directly to the existing path (targeted, low-risk), or
- If we prefer the DSv2 approach for extensibility, add a config toggle so
InMemoryScansstays alive as a fallback?
|
|
||
| /** | ||
| * Fallback strategy for cached in-memory tables when the DSv2 cache path is disabled | ||
| * (spark.sql.inMemoryColumnarStorage.useDataSourceV2 = false). |
There was a problem hiding this comment.
This config spark.sql.inMemoryColumnarStorage.useDataSourceV2 doesn't exist anywhere in the codebase. Also, after this PR InMemoryScans becomes unreachable dead code since CacheManager always wraps in DataSourceV2Relation — should we add a config toggle, or remove this strategy?
There was a problem hiding this comment.
Fixed in da6ae80 — added spark.sql.inMemoryColumnarStorage.enableDatasourceV2 (default true) to SQLConf. The InMemoryScans ScalaDoc now explicitly states it is only reached when the config is false, and documents that both paths produce InMemoryTableScanExec with the same column-pruning, filter-pushdown, and sort-order semantics — the DSv2 path additionally enables DPP and per-partition LIMIT pushdown.
This comment was generated with GitHub MCP.
| @transient relation: InMemoryRelation) | ||
| @transient relation: InMemoryRelation, | ||
| limit: Option[Int] = None, | ||
| runtimeFilters: Seq[Expression] = Nil) |
There was a problem hiding this comment.
The case class now has 5 fields — sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala line 42 still has InMemoryTableScanExec(_, _, relation) which won't compile.
There was a problem hiding this comment.
Fixed in da6ae80 — updated the pattern to InMemoryTableScanExec(_, _, relation, _, _) to match the new 5-field case class.
This comment was generated with GitHub MCP.
| * The DPP pipeline injects [[DynamicPruning]] expressions into the plan, which | ||
| * [[DataSourceV2Strategy]] separates and passes as runtimeFilters to the exec node. | ||
| */ | ||
| override def filter(predicates: Array[V2Predicate]): Unit = {} |
There was a problem hiding this comment.
filter() is a no-op — runtime filtering is actually routed through InMemoryTableScanExec.runtimeFilters instead. This deviates from the V2 contract where filter() is expected to prune InputPartitions.
There was a problem hiding this comment.
Updated the ScalaDoc in da6ae80 to make the deviation explicit: filter() is an intentional no-op because InMemoryCacheScan never goes through BatchScanExec — DataSourceV2Strategy special-cases it and creates InMemoryTableScanExec directly, passing DynamicPruning expressions via runtimeFilters. We implement SupportsRuntimeV2Filtering solely to expose filterAttributes(), which is what lets the optimizer inject DPP filters into the plan.
This comment was generated with GitHub MCP.
| DataSourceV2Strategy.withProjectAndFilter( | ||
| project, filters, localScanExec, needsUnsafeConversion = false) :: Nil | ||
|
|
||
| case PhysicalOperation(project, filters, |
There was a problem hiding this comment.
InMemoryCacheScan implements SupportsRuntimeV2Filtering and reports filterAttributes(), but this special case doesn't extract scalar subquery filters on those attributes the way the generic DataSourceV2ScanRelation case does (lines 168-183 on master). Should these be handled consistently?
There was a problem hiding this comment.
Added a comment in da6ae80 explaining why scalar subquery filters are intentionally not extracted here: InMemoryCacheScan never goes through BatchScanExec, so scan.filter() is never called for partition pruning. Scalar subquery filters stay in compiledFilters and are applied by FilterExec above the scan — the same post-scan re-evaluation path used by all other compile-time filters.
This comment was generated with GitHub MCP.
|
just to clarify, it looks like a migration and pointing out some risk. (im ok to do it, if we feel its worth to do) |
|
Thanks @szehon-ho and @viirya for the detailed review — you're both right that my original description overstated the gaps. After double-checking with
I've pushed a follow-up commit that:
Regarding the two tradeoffs you flagged:
Does this approach address the architecture concern, or would you prefer I pursue option 1 (add DPP + limit directly to |
- Add spark.sql.inMemoryColumnarStorage.enableDatasourceV2 (default true) so users can fall back to the pre-DSv2 InMemoryRelation path - Wire the config into CacheManager.useCachedData and analyzeColumnCacheQuery - Fix InMemoryCacheTable/ScanBuilder scaladoc to accurately state that column pruning, filter pushdown, and sort-order propagation already work via InMemoryScans/pruneFilterProject; the genuinely new capabilities are DPP (SupportsRuntimeV2Filtering) and per-partition LIMIT pushdown - Update InMemoryScans comment to reference the actual config key
- Fix InMemoryTableScanExec pattern match in hive CachedTableSuite (now 5 fields) - Clarify filter() ScalaDoc: intentional no-op because DataSourceV2Strategy routes InMemoryCacheScan directly to InMemoryTableScanExec, bypassing BatchScanExec - Add comment in DataSourceV2Strategy explaining why scalar subquery filters are not extracted for InMemoryCacheScan (FilterExec handles them; filter() is never called) - Fix outputOrdering: use takeWhile instead of flatMap to emit only a valid leading prefix; skipping a non-AttributeReference key mid-sequence and emitting later keys would report a non-existent ordering and allow incorrect sort elimination Co-authored-by: DB Tsai
|
Sorry for the delay — the changes are now pushed (commit da6ae80 on Config toggle: Migration strategy: Both the DSv2 path and the fallback path ultimately produce I'm happy to flip the default to This comment was generated with GitHub MCP. |
Three bugs in InMemoryTableScanExec: 1. allPredicates: DynamicPruningExpression(InSubqueryExec) was passed directly to buildFilter, which cannot translate it to a min/max stats check, so DPP provided zero batch-level pruning. Fix: materialize the evaluated subquery result into In(child, literals) so buildFilter can skip batches. 2. doExecuteColumnar: limit was applied as result.mapPartitions(_.take(n)) on RDD[ColumnarBatch], which takes N batches (up to batchSize*N rows), not N rows. Fix: add limitBatchesToRows helper that truncates the last batch via ColumnarBatch.setNumRows so the per-partition limit is row-accurate. 3. numOutputRows metric was accumulated before the per-partition limit was applied, overcounting rows that LocalLimit would later discard. Fix: count rows/batches after the limit in both doExecute and doExecuteColumnar. Co-authored-by: DB Tsai
When the DPP subquery returns zero results (empty broadcast side of the join), IN () is always false — every batch should be skippable. Using TrueLiteral was semantically wrong: it told buildFilter there was no constraint, causing all batches to be scanned even though the join will produce no output rows. Co-authored-by: DB Tsai
DataSourceV2ScanRelation has 6 fields (relation, scan, output, keyGroupedPartitioning, ordering, pushedFilters) but the InMemoryCacheScan arm only destructured 5, causing a compile error after master added pushedFilters. Co-authored-by: DB Tsai
…cleanup - InMemoryCacheTable: add missing 6th field (pushedFilters) to the DataSourceV2ScanRelation pattern in CachedRelation.unapply. This was the sole error in CI's main-source precompile; the prior fix missed this site. - build.properties: restore sbt.version to 1.12.8 (a local 1.12.4 downgrade had leaked into the WIP history). - InMemoryTableScanExec: replace a non-ASCII em-dash in a comment. - InMemoryCacheDSv2Benchmark: drop unused numIters; add jdk21 results. Co-authored-by: Isaac
master added a concrete `pushedPredicates()` default method to SupportsRuntimeV2Filtering (since 4.2.0), which collides with the InMemoryCacheScan constructor val of the same name and requires an `override`. The two are semantically distinct: the inherited method reports runtime-pushed predicates (from filter()), while the field held compile-time predicates recorded by the builder. Since filter() is a no-op for this scan and the field was never read, remove it and let the inherited empty default stand, which is the correct behavior. Co-authored-by: Isaac
The .map over the takeWhile-filtered sort keys deconstructed SortOrder with a nested AttributeReference type-test and no catch-all, which Scala flags as a non-exhaustive match - fatal under Spark's -Wconf. Fold the filter and the conversion into a single total map that yields Option, then take the longest defined prefix, preserving the original "longest valid prefix" semantics. Co-authored-by: Isaac
⏺ ## What changes were proposed in this pull request?
df.cache()is backed byInMemoryRelation, a pre-DSv2 logical plan node. Queries on cachedDataFrames skip the entire
V2ScanRelationPushDownoptimizer batch, losing column pruning, filterpushdown, sort-order propagation, and standard statistics reporting that all DSv2-backed sources
benefit from.
This PR introduces a thin DSv2 wrapper layer —
InMemoryCacheTable,InMemoryScanBuilder, andInMemoryCacheScan— in a new fileInMemoryCacheTable.scala.CacheManager.useCachedData()nowsubstitutes matching plan fragments with
DataSourceV2Relation(InMemoryCacheTable)instead of bareInMemoryRelation, so theV2ScanRelationPushDownbatch fires on cached DataFrames just as it doesfor any other DSv2 source.
The DSv2 interfaces implemented:
SupportsPushDownRequiredColumnsInMemoryTableScanExecdeserializes only the requested columnsSupportsPushDownV2FiltersCachedBatchSerializer.buildFilter; all predicates are returned (category-2) so a post-scanFilterExecis always added forSupportsReportOrderingV2ScanPartitioningAndOrdering, eliminating redundant sorts on ordered cached dataSupportsReportStatisticsANALYZE) are reported to the optimizer and AQE via the standard V2 pathPhysical execution is fully preserved. A new case in
DataSourceV2StrategyinterceptsDataSourceV2ScanRelation(InMemoryCacheScan)before the genericBatchScanExecpath and routes itback to
InMemoryTableScanExec. No change to the columnar scan hot path.A
CachedRelationextractor object matches any of the three plan forms a cached relation takesacross query stages —
InMemoryRelation,DataSourceV2Relation(InMemoryCacheTable), andDataSourceV2ScanRelation(InMemoryCacheScan)— providing backward compatibility for all existingpattern-match sites.
Why are the changes needed?
Without this change, every query on a cached DataFrame ignores standard optimizer rules that every
DSv2 source benefits from. The most impactful gap is column pruning: even
SELECT one_col FROM cached_wide_tablecauses all columns to be deserialized from the in-memorycolumnar store.
Does this PR introduce any user-facing change?
Yes, in a positive way:
ANALYZEcolumn statistics API for cached queries now correctly propagates stats through theoptimized plan.
The
InMemoryRelationtype is still used internally and visible inCachedData.cachedRepresentation.The change is transparent:
df.cache(),spark.catalog.cacheTable(), and related APIs behaveidentically from the user's perspective.
How was this patch tested?
All existing tests in
CachedTableSuite(104 tests),InMemoryColumnarQuerySuite,DatasetCacheSuite,UDFSuite,DataSourceV2SQLSuite, andLogicalPlanTagInSparkPlanSuitepasswithout modification to test logic (only callsite updates to use the new
CachedRelationextractor).A new benchmark
InMemoryCacheDSv2Benchmarkwas added. Results on 1M rows (AQE off, singlepartition):
Column pruning - 1000000 rows, 10 cols, select 2:
sum 2 of 10 cols (column pruning via DSv2) 19ms 1.0X
sum all 10 cols (no pruning - pre-DSv2) 54ms 0.3X (~2.8x speedup)
Planning overhead - 1000 plan-only iterations:
optimizedPlan (DSv2 path, V2ScanRelationPushDown) 254ms total (~0.25ms/plan)
Column pruning yields a ~3x speedup when accessing 2 of 10 cached columns. Planning overhead
from the additional optimizer rules is ~0.25ms per query, negligible in practice.