[FLINK-32940] [SQL] Unnest projection and filter pushdown#28127
Open
venkata91 wants to merge 14 commits intoapache:masterfrom
Open
[FLINK-32940] [SQL] Unnest projection and filter pushdown#28127venkata91 wants to merge 14 commits intoapache:masterfrom
venkata91 wants to merge 14 commits intoapache:masterfrom
Conversation
Collaborator
…late Add two transpose rules that fire when the right input of a Correlate is a LogicalTableFunctionScan wrapping INTERNAL_UNNEST_ROWS or INTERNAL_UNNEST_ROWS_WITH_ORDINALITY (the post-LogicalUnnestRule shape): - FlinkProjectCorrelateUnnestTransposeRule: subclass of Calcite's ProjectCorrelateTransposeRule; pushes a Project below the Correlate so unused left-side columns are pruned before UNNEST expansion. - FlinkFilterCorrelateUnnestTransposeRule: subclass of Calcite's FilterCorrelateRule; splits a Filter into left-only / right-only / mixed predicates. LEFT correlate semantics are preserved (right-only predicates stay above) via JoinRelType.LEFT.canPushRightFromAbove() == false. - UnnestRuleUtil.isUnnestCorrelate() shared guard handles HepRelVertex and RelSubset unwrapping. Both rules registered in FILTER_RULES and PROJECT_RULES of FlinkBatchRuleSets and FlinkStreamRuleSets, so they propagate into FILTER_PREPARE_RULES and LOGICAL_OPT_RULES automatically. New test FlinkProjectFilterCorrelateUnnestTransposeRuleTest covers project pruning, left-only / right-only / mixed filters for INNER and LEFT correlates, filter on the array column, ordinality, and array-of-rows. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
After the new transpose rules were added in the previous commit, 17 existing UnnestTest cases (8 batch, 9 stream) now produce tighter plans where unused source columns are pruned via a Calc below the Correlate, and correspondingly narrower Correlate row types (some queries also drop the top Calc when it becomes identity). Also seeds FlinkProjectFilterCorrelateUnnestTransposeRuleTest.xml with the golden plans for the 10 new cases. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…pushdown tests The Project pushdown rule introduced in earlier commits caused runtime correctness regressions in UnnestITCase: pruning the Correlate's left input shifted field indices that the right-side LogicalTableFunctionScan's RexCall references via $cor0.X, but Calcite's RelNodesExprsHandler skips TableFunctionScan (its visit dispatches through visit(RelNode), not visit(TableFunctionScan), because LogicalTableFunctionScan does not override accept(RelShuttle)). The naive fix (force-applying the rex shuttle to leaf nodes) interacted badly with PushProjector when the right side already had a LogicalProject(k, v) wrapper, producing plans that returned the wrong column. Reverting the Project rule for now; pursuing it correctly requires reworking PushProjector's interaction with UNNEST-shaped right children. The Filter rule remains and is sufficient to drive predicate pushdown end-to-end into table sources. Coverage added: - UnnestSourcePushDownTest: 7 plan tests confirming filter pushdown into TestValuesTableFactory sources via 'filterable-fields'. Exercises left-only, right-only, mixed, LEFT correlate, and projection-pushdown-still-works scenarios. - UnnestSourcePushDownITCase: 6 batch ITCases verifying e2e correctness with source-level filter and projection pushdown. Includes a LEFT-correlate case that confirms right-only filters don't reach the source (LEFT semantics preserved). Test status: 281/281 pass across UnnestITCase (batch + stream), UnnestTest (batch + stream), LogicalUnnestRuleTest, FlinkProjectFilterCorrelateUnnestTransposeRuleTest, UnnestSourcePushDownTest, UnnestSourcePushDownITCase, and the *Correlate*Test sweep (lookup, temporal, Python, async, restore). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Reintroduces project pushdown through UNNEST Correlate, this time with a custom rule that bypasses Calcite's broken ProjectCorrelateTransposeRule / PushProjector for UNNEST shapes. The rule prunes unused left-side columns before the cross-product expansion; right-side pruning is intentionally not attempted (see Bug 2 analysis below). This delivers the high-value optimization (trimming wide source tables before they fan out by array length) while sidestepping the silent data corruption that earlier attempts produced. Calcite bugs worked around (verified unfixed in main as of 2026-05-01): 1. RelShuttleImpl.visit(TableFunctionScan) only calls visitChildren, never applies the RexShuttle to the scan's rexCall. Combined with LogicalTableFunctionScan not overriding accept(RelShuttle), dispatch routes through visit(RelNode) and the rex shuttle never fires on the leaf. Result: $cor0.X RexFieldAccess indices in the unnest call go stale when the left input is pruned, and runtime codegen reads from the wrong field. We fix this by walking the right tree manually and applying the rebinder to every TableFunctionScan we find. 2. PushProjector mishandles renumbering when the right side has a wrapper LogicalProject(named_fields...) over LogicalTableFunctionScan and that wrapper has more than one output field. Pruning produces a plan whose surviving wrapper output silently resolves to the wrong source field -- e.g. UNNEST(MAP) returns keys where values were expected. We sidestep by not pruning the right side at all; downstream Calc-merging trims any unused right-side columns at the physical-plan level. End-to-end source pushdown verified: with TestValuesTableFactory configured for 'filterable-fields' = 'a;b' on a four-column table, a query like SELECT a, s FROM T, UNNEST(d) AS f(s) WHERE a > 5 now produces: TableSourceScan(table=[..., filter=[>(a, 5)], project=[a, d], ...]) The source reads only two of four columns AND applies the filter directly. Test status: 281/281 pass (84 ITCases + 197 plan tests + Correlate sweep). Failures from previous attempts (testCrossWithUnnestForMap returning keys, testUnnestArrayOfArrayWithOrdinality / testUnnestFromTableWithOrdinality crashing) all green now. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
testFilterAndProjectionBothPushIntoSource had the exact same SQL as
testFilterOnLeftOnlyPushesIntoSource ("SELECT a, s FROM MyTable, UNNEST(d)
AS T(s) WHERE a > 5") and produced an identical golden plan. The intent it
captured ("filter + projection both push into source") is already covered by
testFilterOnLeftOnlyPushesIntoSource, whose plan shows both filter=[>(a, 5)]
and project=[a, d] on the source TableSourceScan.
Updated testProjectionPushDownIntoSourceWithUnnest's docstring to point at
that test for the combined case.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…lter wrappers Devil's-advocate review found the project rule was firing in only ~30% of the cases the test names implied. UnnestRuleUtil.isUnnestCorrelate required the right input (after planner-shell unwrap) to be a *direct* LogicalTableFunctionScan. But LogicalUnnestRule produces Correlate(left, LogicalProject(LogicalTableFunctionScan(...))), and the matcher was relying on PROJECT_REMOVE to strip the wrapper. That dependency silently failed for: - LEFT correlate: getLogicalProjectWithAdjustedNullability adds CASTs to the wrapper Project for non-nullable element types, making it non-identity, so PROJECT_REMOVE can't strip it. ALL LEFT JOIN UNNEST queries against non-nullable arrays were getting no left-side pruning. - WITH ORDINALITY: wrapper is Project(val=$0, pos=$1) -- a renaming Project that PROJECT_REMOVE doesn't strip. - Right-only filter pushdown: filter rule produces Filter(TFS) on the right, not a direct TFS; the matcher then misses entirely. - Mixed predicates: same as right-only because the right portion produces a Filter wrapper. Fix: extend isUnnestCorrelate (now via findUnnestTableFunctionScan) to look through one or more LogicalProject and LogicalFilter wrappers between the Correlate and the underlying TFS. The rule's onMatch already walks the right subtree applying the rex shuttle to every node, so the wrapper structure can pass through unchanged -- only the $cor0.X RexFieldAccess indices inside the TFS's RexCall need to be remapped, which the existing rewriteCorrelationRefs already handles. This does NOT reintroduce Bug 2 (PushProjector mishandling wrapper-Project field renumbering during right-side pruning). The rule still does only left-side pruning -- the right side, including any wrapper Project, passes through unchanged structurally. Hardening from the same review: - M2: replaced reference equality `refExpr == newCorVar` with semantic id check `instanceof RexCorrelVariable && id.equals(newCorId)` so the rebinder remains correct against any future RexBuilder normalization. - M3: added `usedLeftCols.cardinality() == 0` defensive guard. Test impact: - 14 plan-test goldens regenerated. Audited each one: every change is a strict improvement -- the rule now fires (new $cor1 correlation id, new Calc(select=[...]) below the Correlate pruning unused source columns) in cases that were previously silent no-ops. The testInnerUnnestArrayOfRowsProjectionDropsLeftColumns test was renamed to testInnerUnnestArrayOfRowsAllLeftColumnsUsed because the table genuinely has no left columns to prune; previous name was misleading. - 84/84 ITCases still pass (UnnestITCase batch+stream, UnnestSourcePushDown ITCase). Critical: this confirms the matcher loosening does NOT reintroduce the silent data corruption that earlier attempts hit on UNNEST(MAP) / ARRAY<ROW> / WITH ORDINALITY. - Source pushdown integration confirmed for the previously-broken cases: testFilterOnRightOnlyDoesNotReachSource now shows project=[a, d] in the TableSourceScan descriptor (projection pushed into source) while still correctly keeping the right-only filter at the Correlate. Total: 284 tests pass across UnnestITCase, UnnestSourcePushDownITCase, UnnestTest (batch + stream), LogicalUnnestRuleTest, FlinkProjectFilterCorrelateUnnestTransposeRuleTest, UnnestSourcePushDownTest, and the *Correlate* sweep. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…EST rule
Devil's-advocate review flagged that the project rule's pruning could in
theory drop a rowtime column the user "didn't ask for", breaking watermark
propagation downstream. Adding focused stream-mode plan tests confirms the
rule preserves time attributes correctly:
1. testRowtimeSelectedThroughUnnestIsPreserved -- query selects ts via the
UNNEST. Rule sees ts in the project's input refs, includes it in
usedLeftCols, preserves it in the pruned left. Plan retains
`*ROWTIME* ts` marker on the Correlate's row type.
2. testRowtimeUsedByDownstreamWindowIsPreserved -- inner query selects ts
through UNNEST, outer query uses it in TUMBLE. End-to-end plan shows:
- WatermarkAssigner survives intact above the source
- Correlate keeps ts with *ROWTIME* marker, prunes b and c
- GroupWindowAggregate consumes the rowtime correctly
- TableSourceScan project=[a, ts, d] (source-level pruning)
3. testRowtimeNotSelectedIsPruned -- when user explicitly does not select
ts, normal projection drops it. The WatermarkAssigner survives in the
plan because the source still emits ts upstream of it; only the Calc
below the Correlate trims a, d. Behavior is consistent with non-UNNEST
projection of a time-attribute column.
Total: 287/287 tests pass (84 ITCases + 200+ plan tests + 3 new).
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Three tests added in earlier commits were arguably redundant with other coverage in this branch and added no distinct verification value: 1. FlinkProjectFilterCorrelateUnnestTransposeRuleTest.testProjectAndFilterCombined -- "WHERE a > 5 AND s < 100" was just the composition of testInnerUnnestFilterOnLeftOnly and testInnerUnnestFilterOnRightOnly. Each component is verified individually; the combined case is implied by rule composition. 2. UnnestSourcePushDownITCase.testProjectionTrimsSourceColumnsThroughUnnest -- the runtime correctness of filter+projection-into-source is already covered by testFilterOnLeftPushedIntoSource (whose query exercises both pushdown paths via the source's filterable-fields and projection support). 3. UnnestStreamTimeAttributeTest.testRowtimeSelectedThroughUnnestIsPreserved -- subsumed by testRowtimeUsedByDownstreamWindowIsPreserved, which exercises the same rowtime-preservation path under a stricter downstream consumer (TUMBLE window). The downstream-window case is the realistic scenario. Net: 25 new tests in this branch -> 22. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Fix spotless formatting violations flagged by CI on FlinkProjectCorrelateUnnestTransposeRule.java (line wrapping in javadoc and comments). Also reformats four test files that had cached-clean state but fail current spotless rules. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…tream/master
Upstream merged FLINK-39558 ("LogicalUnnestRule: use Calcite Uncollect rowType
instead of LogicalType round-trip") which:
- Removed getLogicalProjectWithAdjustedNullability (CAST wrapper for LEFT
correlate). This actually simplifies the wrapper structure for LEFT and the
matcher's wrapper-traversal still handles the residual cases (right-side
filter pushdown, WITH ORDINALITY).
- Switched the unnest TFS field naming from f0/f1 (LogicalType round-trip)
to d0/d1 etc. (preserving Calcite Uncollect naming). This propagates into
every plan touching UNNEST output columns.
Goldens regenerated for UnnestTest (batch + stream),
FlinkProjectFilterCorrelateUnnestTransposeRuleTest, UnnestSourcePushDownTest,
UnnestStreamTimeAttributeTest. All 305 tests pass (89 ITCases + 216 plan
tests across the UNNEST + Correlate sweep) — note the count went up from 284
because upstream added new test cases via FLINK-39558, all of which still
work with the rule firing.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
3113950 to
514a29c
Compare
…INK-39558 The wrapper-traversal in UnnestRuleUtil.isUnnestCorrelate was originally motivated partly by the CAST wrapper that getLogicalProjectWithAdjustedNullability produced for LEFT correlates with non-nullable element types. FLINK-39558 removed that helper, but the traversal is still required for renaming Projects (AS T(alias), WITH ORDINALITY's two-field output) which are not identity and therefore not stripped by PROJECT_REMOVE. Update the docstring so a future reader doesn't argue "CAST is gone, why are we walking through projects?". No code change. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…sposeRule - Use RelBuilder.field(oldIdx) instead of constructing RexInputRef with RexBuilder.makeInputRef and tracking field names manually; RelBuilder derives the names from the input refs. - Extract a requireMapped helper that centralizes the missing-index consistency check used by both CorrelationFieldAccessRebinder and InputRefRemapper. No functional change. All 197 UNNEST plan + IT tests pass.
…t/filter pushdown rules Adds two regression tests proving the rule fires for UNNEST(MAP) where left columns are prunable: - testInnerUnnestMapProjectionDropsLeftColumns: confirms left-side pruning is type-agnostic (works for MAP the same as for ARRAY); the wrapper LogicalProject(KEY, VALUE) over the TFS is passed through unchanged. - testInnerUnnestMapFilterOnLeftOnly: confirms left-only predicates are pushed below the Correlate when the unnested column is a MAP. Existing UNNEST(MAP) coverage was implicit in UnnestITCase but those queries had no unused source columns, so they didn't exercise the new rules' pruning path.
…d with null-padding case Add an empty-array row (a=8) to the IT fixture so the LEFT-join filter pushdown test now actually exercises null-padding semantics. The INNER counterpart (testFilterOnLeftPushedIntoSource) excludes a=8 entirely — the delta between the two outputs is what proves LEFT semantics survive the filter pushdown. Other tests in the file are unaffected: INNER tests drop a=8 (empty array → no expansion) and the LEFT right-side filter test excludes (8, NULL) via WHERE s < 100.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
What is the purpose of the change
This pull request implements FLINK-32940 (partial — see scope below). It adds two planner rules that push
ProjectandFilteroperators through theCorrelateproduced by Flink's UNNEST rewrite, so unused source columns and applicable predicates are evaluated at the table source instead of after the array-expansion cross product.For the example in the JIRA description, against a source that supports filter and projection pushdown:
The source reads only the columns it needs and applies the filter directly, instead of expanding the array first.
Scope
e.enameof anARRAY<ROW<...>>is referenced). Deferred — the planner already supports nested projection for direct sources, but field usage on the right side of the Correlate doesn't flow back to the source. Tracked as a follow-up.UNNEST(MAP). Trade-off documented in the rule.Verifying this change
This change adds tests and can be verified as follows:
WITH ORDINALITYandARRAY<ROW>shapes.TestValuesTableFactoryconfirming filters and projections drive into theTableSourceScandescriptor for sourcesmvn clean verifypasses locally onflink-table-planner.Does this pull request potentially affect one of the following parts:
@Public(Evolving): noDocumentation
Was generative AI tooling used to co-author this PR?