diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkFilterCorrelateUnnestTransposeRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkFilterCorrelateUnnestTransposeRule.java
new file mode 100644
index 0000000000000..f68a988bdf0b6
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkFilterCorrelateUnnestTransposeRule.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.core.Correlate;
+import org.apache.calcite.rel.rules.FilterCorrelateRule;
+
+/**
+ * Pushes a {@link org.apache.calcite.rel.core.Filter} down through a {@link Correlate} produced by
+ * Flink's UNNEST rewrite. Left-only predicates land on the left input; right-only predicates land
+ * on the right input for INNER correlates; mixed predicates stay above. The {@code LEFT} correlate
+ * case correctly leaves right-only predicates above the join, preserving the null-padded row for
+ * empty arrays.
+ *
+ *
+ * <p>Delegates the transformation to Calcite's {@link FilterCorrelateRule}; only the match
+ * predicate is restricted to the UNNEST shape (right input is a {@code LogicalTableFunctionScan}
+ * wrapping {@code INTERNAL_UNNEST_ROWS} or {@code INTERNAL_UNNEST_ROWS_WITH_ORDINALITY}).
+ */
+public class FlinkFilterCorrelateUnnestTransposeRule extends FilterCorrelateRule {
+
+ public static final FlinkFilterCorrelateUnnestTransposeRule INSTANCE =
+ new FlinkFilterCorrelateUnnestTransposeRule(
+ FilterCorrelateRule.Config.DEFAULT
+ .withDescription("FlinkFilterCorrelateUnnestTransposeRule")
+ .as(FilterCorrelateRule.Config.class));
+
+ private FlinkFilterCorrelateUnnestTransposeRule(FilterCorrelateRule.Config config) {
+ super(config);
+ }
+
+ @Override
+ public boolean matches(RelOptRuleCall call) {
+ Correlate correlate = call.rel(1);
+ return UnnestRuleUtil.isUnnestCorrelate(correlate);
+ }
+}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkProjectCorrelateUnnestTransposeRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkProjectCorrelateUnnestTransposeRule.java
new file mode 100644
index 0000000000000..f0aab65c60b4f
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkProjectCorrelateUnnestTransposeRule.java
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.plan.RelRule;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Correlate;
+import org.apache.calcite.rel.core.CorrelationId;
+import org.apache.calcite.rel.core.TableFunctionScan;
+import org.apache.calcite.rel.logical.LogicalCorrelate;
+import org.apache.calcite.rel.logical.LogicalProject;
+import org.apache.calcite.rel.rules.TransformationRule;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexCorrelVariable;
+import org.apache.calcite.rex.RexFieldAccess;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexShuttle;
+import org.apache.calcite.tools.RelBuilder;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.immutables.value.Value;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Pushes a {@link LogicalProject} down through a {@link LogicalCorrelate} produced by Flink's
+ * UNNEST rewrite, pruning unused left-side columns before the cross-product expansion.
+ *
+ *
+ * <p>This is a custom replacement for Calcite's {@link
+ * org.apache.calcite.rel.rules.ProjectCorrelateTransposeRule}, which has two bugs that cause
+ * runtime failures on UNNEST:
+ *
+ * <ul>
+ *   <li>Bug 1. {@code RelShuttleImpl.visit(TableFunctionScan)} only walks {@code
+ *       visitChildren(scan)} — it never applies the {@link RexShuttle} to the scan's {@code
+ *       rexCall}, and {@code LogicalTableFunctionScan} doesn't override {@code accept(RelShuttle)},
+ *       so dispatch routes through {@code visit(RelNode)} instead. The result: {@code
+ *       RexFieldAccess($cor0.X)} indices inside the unnest call are never re-numbered when the left
+ *       input is pruned, and runtime codegen reads from a stale field index. We fix this here by
+ *       walking the right tree explicitly and applying our shuttle to every {@code
+ *       TableFunctionScan} we find.
+ *   <li>Bug 2. Calcite's {@code PushProjector} mishandles renumbering when the right side is
+ *       {@code LogicalProject(named_fields...) over LogicalTableFunctionScan} and the wrapper
+ *       Project has more than one output field — pruning produces a plan whose remaining wrapper
+ *       output silently resolves to the wrong source field (e.g. {@code UNNEST(MAP)} returns keys
+ *       where values were expected). We sidestep this entirely by NOT pruning the right side at
+ *       all. Only the left input is pruned; the right side (TFS plus any wrapper Project) is passed
+ *       through unchanged except for the {@code $cor0.X} index rewrite from Bug 1.
+ * </ul>
+ *
+ * <p>The trade-off vs. a fully-general project pushdown: when a query reads only a subset of an
+ * UNNEST's output columns (e.g. only {@code v} from {@code UNNEST(MAP) AS f(k, v)}), we still pass
+ * both columns out of the Correlate. The unused column gets trimmed by downstream Calc-merging.
+ * Pruning the (typically wide) source-table left input is the bigger win and is what this rule
+ * delivers safely.
+ */
+@Value.Enclosing
+public class FlinkProjectCorrelateUnnestTransposeRule
+        extends RelRule<FlinkProjectCorrelateUnnestTransposeRule.Config>
+        implements TransformationRule {
+
+ public static final FlinkProjectCorrelateUnnestTransposeRule INSTANCE =
+ FlinkProjectCorrelateUnnestTransposeRule.Config.DEFAULT.toRule();
+
+ protected FlinkProjectCorrelateUnnestTransposeRule(Config config) {
+ super(config);
+ }
+
+ @Override
+ public boolean matches(RelOptRuleCall call) {
+ Correlate correlate = call.rel(1);
+ return UnnestRuleUtil.isUnnestCorrelate(correlate);
+ }
+
+ @Override
+ public void onMatch(RelOptRuleCall call) {
+ final LogicalProject topProject = call.rel(0);
+ final LogicalCorrelate correlate = call.rel(1);
+ final RelNode left = correlate.getLeft();
+ final int leftFieldCount = left.getRowType().getFieldCount();
+
+ // Determine which left columns the top project references, plus the columns required by
+ // the right side via the correlation variable.
+ ImmutableBitSet projectInputs = RelOptUtil.InputFinder.bits(topProject.getProjects(), null);
+ ImmutableBitSet projectLeftRefs =
+ projectInputs.intersect(ImmutableBitSet.range(0, leftFieldCount));
+ ImmutableBitSet usedLeftCols = projectLeftRefs.union(correlate.getRequiredColumns());
+
+ // Nothing to prune: rule is a no-op. Also defensively bail if no left columns survive,
+ // which would produce a zero-column Project that some Calcite paths reject.
+ if (usedLeftCols.cardinality() == leftFieldCount || usedLeftCols.cardinality() == 0) {
+ return;
+ }
+
+ // Build mapping oldLeftIndex -> newLeftIndex for kept columns.
+        final Map<Integer, Integer> leftMapping = new HashMap<>();
+ int newIdx = 0;
+ for (Integer oldIdx : usedLeftCols) {
+ leftMapping.put(oldIdx, newIdx++);
+ }
+ final int newLeftFieldCount = newIdx;
+
+ final RelBuilder builder = call.builder();
+ final RexBuilder rexBuilder = builder.getRexBuilder();
+
+ // Build new left input as a Project that keeps only the used columns. RelBuilder.project
+ // derives field names from the input refs.
+ builder.push(left);
+        final List<RexNode> leftRefs = new ArrayList<>();
+ for (Integer oldIdx : usedLeftCols) {
+ leftRefs.add(builder.field(oldIdx));
+ }
+ final RelNode newLeft = builder.project(leftRefs).build();
+
+ // Allocate a fresh correlation id bound to the new left's narrower row type, and rewrite
+ // every $cor0.X reference inside the right side using the index mapping. This is the
+ // explicit Bug 1 workaround.
+ final CorrelationId newCorId = correlate.getCluster().createCorrel();
+ final RexCorrelVariable newCorVar =
+ (RexCorrelVariable) rexBuilder.makeCorrel(newLeft.getRowType(), newCorId);
+ final CorrelationFieldAccessRebinder rebinder =
+ new CorrelationFieldAccessRebinder(
+ correlate.getCorrelationId(), newCorVar, leftMapping, rexBuilder);
+ final RelNode newRight = rewriteCorrelationRefs(correlate.getRight(), rebinder);
+
+ // Re-map requiredColumns to the new left's index space.
+ final ImmutableBitSet.Builder newReqColsBuilder = ImmutableBitSet.builder();
+ for (Integer oldCol : correlate.getRequiredColumns()) {
+ newReqColsBuilder.set(leftMapping.get(oldCol));
+ }
+ final ImmutableBitSet newReqCols = newReqColsBuilder.build();
+
+ // Build the new Correlate.
+ final RelNode newCorrelate =
+ correlate.copy(
+ correlate.getTraitSet(),
+ newLeft,
+ newRight,
+ newCorId,
+ newReqCols,
+ correlate.getJoinType());
+
+ // Rewrite the top Project's input refs:
+ // left side ([0, leftFieldCount)) -> via leftMapping
+ // right side ([leftFieldCount, total)) -> shifted by (newLeftFieldCount -
+ // leftFieldCount)
+ // Right-side row width is unchanged, so a uniform shift is sufficient.
+ final InputRefRemapper remapper =
+ new InputRefRemapper(leftMapping, leftFieldCount, newLeftFieldCount);
+        final List<RexNode> newProjects =
+ topProject.getProjects().stream()
+ .map(rex -> rex.accept(remapper))
+ .collect(Collectors.toList());
+
+ final RelNode result =
+ builder.push(newCorrelate)
+ .project(newProjects, topProject.getRowType().getFieldNames())
+ .build();
+
+ call.transformTo(result);
+ }
+
+ /**
+ * Looks up {@code oldIdx} in {@code mapping} and returns the new index, throwing a clear
+ * exception if the index is missing. Centralizes the consistency check used by both the
+ * correlation-variable rewrite and the top-project input-ref remap.
+ */
+    private static int requireMapped(
+            Map<Integer, Integer> mapping, int oldIdx, String description) {
+ Integer newIdx = mapping.get(oldIdx);
+ if (newIdx == null) {
+ throw new IllegalStateException(
+ description + " " + oldIdx + " missing from mapping " + mapping);
+ }
+ return newIdx;
+ }
+
+ /**
+ * Walks the right subtree applying {@code rexShuttle} to every node's expressions. We do this
+ * manually rather than going through {@code RelShuttleImpl} because the stock implementation
+ * skips leaf {@link TableFunctionScan} nodes' RexCalls (Bug 1 in this rule's javadoc).
+ *
+     * <p>The traversal:
+     *
+     * <ul>
+     *   <li>Recursively visits each input.
+     *   <li>Re-builds the parent if any input changed.
+     *   <li>Calls {@code accept(rexShuttle)} on every node so that {@code TableFunctionScan}'s
+     *       {@code rexCall}, {@code LogicalProject}'s projections, etc. are all rewritten.
+     * </ul>
+     */
+ private static RelNode rewriteCorrelationRefs(RelNode rel, RexShuttle rexShuttle) {
+ RelNode unwrapped = UnnestRuleUtil.unwrap(rel);
+        final List<RelNode> originalInputs = unwrapped.getInputs();
+        final List<RelNode> newInputs = new ArrayList<>(originalInputs.size());
+ boolean changed = false;
+ for (RelNode input : originalInputs) {
+ RelNode newInput = rewriteCorrelationRefs(input, rexShuttle);
+ newInputs.add(newInput);
+ if (newInput != input) {
+ changed = true;
+ }
+ }
+ RelNode current = changed ? unwrapped.copy(unwrapped.getTraitSet(), newInputs) : unwrapped;
+ return current.accept(rexShuttle);
+ }
+
+ /**
+ * {@link RexShuttle} that swaps an old {@link RexCorrelVariable} for a new one and re-numbers
+ * the field index of any {@link RexFieldAccess} on it using the supplied mapping.
+ *
+     * <p>Mirrors Calcite's {@code ProjectCorrelateTransposeRule.RexFieldAccessReplacer}, but lives
+ * here so the rule is self-contained.
+ */
+ private static class CorrelationFieldAccessRebinder extends RexShuttle {
+ private final CorrelationId oldCorId;
+ private final CorrelationId newCorId;
+ private final RexCorrelVariable newCorVar;
+        private final Map<Integer, Integer> indexMapping;
+ private final RexBuilder rexBuilder;
+
+ CorrelationFieldAccessRebinder(
+ CorrelationId oldCorId,
+ RexCorrelVariable newCorVar,
+                Map<Integer, Integer> indexMapping,
+ RexBuilder rexBuilder) {
+ this.oldCorId = oldCorId;
+ this.newCorId = newCorVar.id;
+ this.newCorVar = newCorVar;
+ this.indexMapping = indexMapping;
+ this.rexBuilder = rexBuilder;
+ }
+
+ @Override
+ public RexNode visitCorrelVariable(RexCorrelVariable variable) {
+ return variable.id.equals(oldCorId) ? newCorVar : variable;
+ }
+
+ @Override
+ public RexNode visitFieldAccess(RexFieldAccess fieldAccess) {
+ RexNode refExpr = fieldAccess.getReferenceExpr().accept(this);
+ // Semantic id check (rather than reference equality on `newCorVar`) so the rebinder
+ // remains correct if Calcite ever interns/normalizes RexCorrelVariable instances.
+ if (refExpr instanceof RexCorrelVariable
+ && ((RexCorrelVariable) refExpr).id.equals(newCorId)) {
+ int newFieldIdx =
+ requireMapped(
+ indexMapping,
+ fieldAccess.getField().getIndex(),
+ "Required column index");
+ return rexBuilder.makeFieldAccess(refExpr, newFieldIdx);
+ }
+ return super.visitFieldAccess(fieldAccess);
+ }
+ }
+
+ /**
+ * {@link RexShuttle} that re-numbers {@link RexInputRef}s in the top project after the left
+ * input has been pruned. Left-side refs use the index mapping; right-side refs are shifted by
+ * the change in left-field count (right-side row width is unchanged).
+ */
+ private static class InputRefRemapper extends RexShuttle {
+        private final Map<Integer, Integer> leftMapping;
+ private final int oldLeftFieldCount;
+ private final int newLeftFieldCount;
+
+        InputRefRemapper(
+                Map<Integer, Integer> leftMapping, int oldLeftFieldCount, int newLeftFieldCount) {
+ this.leftMapping = leftMapping;
+ this.oldLeftFieldCount = oldLeftFieldCount;
+ this.newLeftFieldCount = newLeftFieldCount;
+ }
+
+ @Override
+ public RexNode visitInputRef(RexInputRef ref) {
+ int oldIdx = ref.getIndex();
+ if (oldIdx < oldLeftFieldCount) {
+ int newIdx =
+ requireMapped(
+ leftMapping, oldIdx, "Top project references pruned left column");
+ return new RexInputRef(newIdx, ref.getType());
+ }
+ int rightOffset = oldIdx - oldLeftFieldCount;
+ return new RexInputRef(newLeftFieldCount + rightOffset, ref.getType());
+ }
+ }
+
+ /** Rule configuration. */
+ @Value.Immutable(singleton = false)
+ public interface Config extends RelRule.Config {
+ Config DEFAULT =
+ ImmutableFlinkProjectCorrelateUnnestTransposeRule.Config.builder()
+ .build()
+ .withOperandSupplier(
+ b0 ->
+ b0.operand(LogicalProject.class)
+ .oneInput(
+ b1 ->
+ b1.operand(LogicalCorrelate.class)
+ .anyInputs()))
+ .withDescription("FlinkProjectCorrelateUnnestTransposeRule");
+
+ @Override
+ default FlinkProjectCorrelateUnnestTransposeRule toRule() {
+ return new FlinkProjectCorrelateUnnestTransposeRule(this);
+ }
+ }
+}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/UnnestRuleUtil.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/UnnestRuleUtil.java
new file mode 100644
index 0000000000000..e75319d21c203
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/UnnestRuleUtil.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.planner.functions.bridging.BridgingSqlFunction;
+
+import org.apache.calcite.plan.hep.HepRelVertex;
+import org.apache.calcite.plan.volcano.RelSubset;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Correlate;
+import org.apache.calcite.rel.logical.LogicalFilter;
+import org.apache.calcite.rel.logical.LogicalProject;
+import org.apache.calcite.rel.logical.LogicalTableFunctionScan;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlOperator;
+
+/**
+ * Helpers for rules that target the {@link Correlate} produced by Flink's UNNEST rewrite.
+ *
+ * <p>{@link LogicalUnnestRule} converts {@code UNNEST} into {@code Correlate(left,
+ * LogicalTableFunctionScan(INTERNAL_UNNEST_ROWS[_WITH_ORDINALITY](...)))}. Other Correlate shapes
+ * (lateral table functions, temporal joins, vector search, ML predict, Python UDTFs) deliberately
+ * fall outside this matcher.
+ */
+final class UnnestRuleUtil {
+
+ private UnnestRuleUtil() {}
+
+ /**
+ * Returns whether the right input of {@code correlate} is a Flink UNNEST table function. The
+ * matcher looks through {@link LogicalProject} and {@link LogicalFilter} wrappers because
+ * {@link LogicalUnnestRule} emits a renaming Project on top of the {@link
+ * LogicalTableFunctionScan} (e.g. for {@code AS T(alias)} and {@code WITH ORDINALITY}'s
+ * two-field output), and the filter rule may push a right-only predicate as a {@link
+ * LogicalFilter} above the TFS. Renaming Projects are not identity and therefore not stripped
+ * by {@code PROJECT_REMOVE}; strict matching (direct TFS only) would silently disable this rule
+ * for {@code WITH ORDINALITY} and right-side filter pushdown shapes.
+ */
+ static boolean isUnnestCorrelate(Correlate correlate) {
+ return findUnnestTableFunctionScan(correlate.getRight()) != null;
+ }
+
+ /**
+ * Walks through planner shells and {@link LogicalProject} / {@link LogicalFilter} wrappers to
+ * locate the underlying UNNEST {@link LogicalTableFunctionScan}, or returns {@code null} if
+ * none is found.
+ */
+ static LogicalTableFunctionScan findUnnestTableFunctionScan(RelNode rel) {
+ rel = unwrap(rel);
+ if (rel instanceof LogicalProject) {
+ return findUnnestTableFunctionScan(((LogicalProject) rel).getInput());
+ }
+ if (rel instanceof LogicalFilter) {
+ return findUnnestTableFunctionScan(((LogicalFilter) rel).getInput());
+ }
+ if (rel instanceof LogicalTableFunctionScan) {
+ LogicalTableFunctionScan tfs = (LogicalTableFunctionScan) rel;
+ if (isUnnestCall(tfs.getCall())) {
+ return tfs;
+ }
+ }
+ return null;
+ }
+
+ private static boolean isUnnestCall(RexNode call) {
+ if (!(call instanceof RexCall)) {
+ return false;
+ }
+ SqlOperator op = ((RexCall) call).getOperator();
+ if (!(op instanceof BridgingSqlFunction)) {
+ return false;
+ }
+ FunctionDefinition def = ((BridgingSqlFunction) op).getDefinition();
+ return def == BuiltInFunctionDefinitions.INTERNAL_UNNEST_ROWS
+ || def == BuiltInFunctionDefinitions.INTERNAL_UNNEST_ROWS_WITH_ORDINALITY;
+ }
+
+ /**
+ * Unwraps planner shells ({@link HepRelVertex}, {@link RelSubset}) to expose the underlying
+ * {@link RelNode}. Returns the input unchanged if it is not a planner shell.
+ */
+ static RelNode unwrap(RelNode rel) {
+ if (rel instanceof HepRelVertex) {
+ return unwrap(((HepRelVertex) rel).getCurrentRel());
+ }
+ if (rel instanceof RelSubset) {
+ RelSubset subset = (RelSubset) rel;
+ RelNode best = subset.getBest();
+ if (best != null) {
+ return unwrap(best);
+ }
+ RelNode original = subset.getOriginal();
+ if (original != null) {
+ return unwrap(original);
+ }
+ }
+ return rel;
+ }
+}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala
index 7ff8a16a70e4a..b4afd0b55297a 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala
@@ -149,6 +149,8 @@ object FlinkBatchRuleSets {
// push a filter past a project
FlinkFilterProjectTransposeRule.INSTANCE,
CoreRules.FILTER_SET_OP_TRANSPOSE,
+ // push a filter through a Correlate produced by UNNEST
+ FlinkFilterCorrelateUnnestTransposeRule.INSTANCE,
CoreRules.FILTER_MERGE
)
@@ -215,6 +217,8 @@ object FlinkBatchRuleSets {
RelFactories.LOGICAL_BUILDER),
// push a projection to the children of a semi/anti Join
ProjectSemiAntiJoinTransposeRule.INSTANCE,
+ // push a projection through a Correlate produced by UNNEST (left-side pruning only)
+ FlinkProjectCorrelateUnnestTransposeRule.INSTANCE,
// merge projections
FlinkProjectMergeRule.INSTANCE,
// remove identity project
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala
index 05de12a6950a9..bebf679b8f56d 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala
@@ -153,6 +153,8 @@ object FlinkStreamRuleSets {
FlinkFilterProjectTransposeRule.INSTANCE,
// push a filter past a setop
CoreRules.FILTER_SET_OP_TRANSPOSE,
+ // push a filter through a Correlate produced by UNNEST
+ FlinkFilterCorrelateUnnestTransposeRule.INSTANCE,
CoreRules.FILTER_MERGE
)
@@ -215,6 +217,8 @@ object FlinkStreamRuleSets {
ProjectMultiJoinTransposeRule.INSTANCE,
// push a projection to the children of a semi/anti Join
ProjectSemiAntiJoinTransposeRule.INSTANCE,
+ // push a projection through a Correlate produced by UNNEST (left-side pruning only)
+ FlinkProjectCorrelateUnnestTransposeRule.INSTANCE,
// merge projections
FlinkProjectMergeRule.INSTANCE,
// remove identity project
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/UnnestTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/UnnestTest.xml
index 5e0da95bb0719..14eda4ad1f9d8 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/UnnestTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/UnnestTest.xml
@@ -34,8 +34,9 @@ LogicalProject(a=[$0], s=[$3])
@@ -149,8 +150,9 @@ LogicalProject(bd_name=[$3])
@@ -172,8 +174,9 @@ LogicalProject(bd_name=[$3])
@@ -216,8 +219,9 @@ LogicalProject(bd_name=[$3])
@@ -239,8 +243,9 @@ LogicalProject(bd_name=[$3])
@@ -307,8 +312,9 @@ LogicalProject(bd_name=[$3])
@@ -330,8 +336,9 @@ LogicalProject(bd_name=[$3])
@@ -394,8 +401,9 @@ LogicalProject(a=[$0], s=[$3])
@@ -498,8 +506,8 @@ LogicalProject(a=[$0], b=[$1], s=[$3])
@@ -610,8 +618,8 @@ LogicalProject(id=[$0], array_val=[$2], array_pos=[$3], elem=[$4], element_pos=[
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/FlinkProjectFilterCorrelateUnnestTransposeRuleTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/FlinkProjectFilterCorrelateUnnestTransposeRuleTest.xml
new file mode 100644
index 0000000000000..7dad6eaae6cd3
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/FlinkProjectFilterCorrelateUnnestTransposeRuleTest.xml
@@ -0,0 +1,289 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ 5]]>
+
+
+ ($0, 5)])
+ +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{3}])
+ :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+ +- LogicalProject(s=[$0])
+ +- Uncollect
+ +- LogicalProject(d=[$cor0.d])
+ +- LogicalValues(tuples=[[{ 0 }]])
+]]>
+
+
+ (a, 5)])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d])
+]]>
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ 5]]>
+
+
+ ($0, 5)])
+ +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{2}])
+ :- LogicalTableScan(table=[[default_catalog, default_database, MyMapTable]])
+ +- LogicalProject(k=[$0], v=[$1])
+ +- Uncollect
+ +- LogicalProject(m=[$cor0.m])
+ +- LogicalValues(tuples=[[{ 0 }]])
+]]>
+
+
+ (a, 5)])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyMapTable]], fields=[a, b, m])
+]]>
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ 5 AND s < 100 AND a + s > 10]]>
+
+
+ ($0, 5), <($4, 100), >(+($0, $4), 10))])
+ +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{3}])
+ :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+ +- LogicalProject(s=[$0])
+ +- Uncollect
+ +- LogicalProject(d=[$cor0.d])
+ +- LogicalValues(tuples=[[{ 0 }]])
+]]>
+
+
+ (+(a, d0), 10)])
++- Correlate(invocation=[$UNNEST_ROWS$1($cor1.d)], correlate=[table($UNNEST_ROWS$1($cor1.d))], select=[a,b,d,d0], rowType=[RecordType(INTEGER a, INTEGER b, INTEGER ARRAY d, INTEGER d0)], joinType=[INNER], condition=[<($0, 100)])
+ +- Calc(select=[a, b, d], where=[>(a, 5)])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d])
+]]>
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/UnnestSourcePushDownTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/UnnestSourcePushDownTest.xml
new file mode 100644
index 0000000000000..fa7d144bc861d
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/UnnestSourcePushDownTest.xml
@@ -0,0 +1,163 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ 5]]>
+
+
+ ($0, 5)])
+ +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{3}])
+ :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+ +- LogicalProject(s=[$0])
+ +- Uncollect
+ +- LogicalProject(d=[$cor0.d])
+ +- LogicalValues(tuples=[[{ 0 }]])
+]]>
+
+
+ (a, 5)], project=[a, d], metadata=[]]], fields=[a, d])
+]]>
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ 5 AND s < 100]]>
+
+
+ ($0, 5), <($4, 100))])
+ +- LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{3}])
+ :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+ +- LogicalProject(s=[$0])
+ +- Uncollect
+ +- LogicalProject(d=[$cor0.d])
+ +- LogicalValues(tuples=[[{ 0 }]])
+]]>
+
+
+ (a, 5)], project=[a, d], metadata=[]]], fields=[a, d])
+]]>
+
+
+
+
+ 5 AND s < 100 AND a + s > 10]]>
+
+
+ ($0, 5), <($4, 100), >(+($0, $4), 10))])
+ +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{3}])
+ :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+ +- LogicalProject(s=[$0])
+ +- Uncollect
+ +- LogicalProject(d=[$cor0.d])
+ +- LogicalValues(tuples=[[{ 0 }]])
+]]>
+
+
+ (+(a, d0), 10)])
++- Correlate(invocation=[$UNNEST_ROWS$1($cor1.d)], correlate=[table($UNNEST_ROWS$1($cor1.d))], select=[a,b,d,d0], rowType=[RecordType(INTEGER a, BIGINT b, INTEGER ARRAY d, INTEGER d0)], joinType=[INNER], condition=[<($0, 100)])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable, filter=[>(a, 5)], project=[a, b, d], metadata=[]]], fields=[a, b, d])
+]]>
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/UnnestStreamTimeAttributeTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/UnnestStreamTimeAttributeTest.xml
new file mode 100644
index 0000000000000..d784fc5878bd1
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/UnnestStreamTimeAttributeTest.xml
@@ -0,0 +1,80 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/UnnestTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/UnnestTest.xml
index 138d56d8c12a7..25ac93e579ffd 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/UnnestTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/UnnestTest.xml
@@ -34,8 +34,9 @@ LogicalProject(a=[$0], s=[$3])
@@ -144,8 +145,9 @@ LogicalProject(bd_name=[$3])
@@ -167,8 +169,9 @@ LogicalProject(bd_name=[$3])
@@ -211,8 +214,9 @@ LogicalProject(bd_name=[$3])
@@ -234,8 +238,9 @@ LogicalProject(bd_name=[$3])
@@ -302,8 +307,9 @@ LogicalProject(bd_name=[$3])
@@ -325,8 +331,9 @@ LogicalProject(bd_name=[$3])
@@ -384,8 +391,9 @@ LogicalProject(a=[$0], s=[$3])
@@ -483,8 +491,8 @@ LogicalProject(a=[$0], b=[$1], s=[$3])
@@ -622,8 +630,8 @@ LogicalProject(id=[$0], array_val=[$2], array_pos=[$3], elem=[$4], element_pos=[
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/FlinkProjectFilterCorrelateUnnestTransposeRuleTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/FlinkProjectFilterCorrelateUnnestTransposeRuleTest.scala
new file mode 100644
index 0000000000000..27e1cf3228374
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/FlinkProjectFilterCorrelateUnnestTransposeRuleTest.scala
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.planner.plan.rules.logical
+
+import org.apache.flink.table.api._
+import org.apache.flink.table.planner.utils.TableTestBase
+import org.apache.flink.table.types.AbstractDataType
+
+import org.junit.jupiter.api.{BeforeEach, Test}
+
+/**
+ * Test for [[FlinkProjectCorrelateUnnestTransposeRule]] and
+ * [[FlinkFilterCorrelateUnnestTransposeRule]]. Both rules are wired into
+ * [[org.apache.flink.table.planner.plan.rules.FlinkBatchRuleSets.PROJECT_RULES]] /
+ * [[org.apache.flink.table.planner.plan.rules.FlinkBatchRuleSets.FILTER_RULES]] (and the stream
+ * equivalents), so the standard batch optimization chain exercises them after [[LogicalUnnestRule]]
+ * runs.
+ */
+class FlinkProjectFilterCorrelateUnnestTransposeRuleTest extends TableTestBase {
+
+ private val util = batchTestUtil() // batch util; per class doc the rules are also wired into the stream rule sets
+
+ @BeforeEach
+ def setup(): Unit = {
+ util.addTableSource[(Int, Int, Long, Array[Int])]("MyTable", 'a, 'b, 'c, 'd) // d: INT array to UNNEST
+ util.addTableSource[(Int, Array[(Int, String)])]("MyRowArrayTable", 'a, 'b) // b: array of ROW(INT, STRING)
+ util.addTableSource(
+ "MyMapTable",
+ Array[AbstractDataType[_]](
+ DataTypes.INT(),
+ DataTypes.INT(),
+ DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING())),
+ Array("a", "b", "m")) // m: MAP column, unnested into (key, value) pairs below
+ }
+
+ @Test
+ def testInnerUnnestProjectionDropsLeftColumns(): Unit = {
+ util.verifyRelPlan("SELECT a, s FROM MyTable, UNNEST(d) AS T(s)") // b, c unreferenced -> prunable from left input
+ }
+
+ @Test
+ def testLeftUnnestProjectionDropsLeftColumns(): Unit = {
+ util.verifyRelPlan("SELECT a, s FROM MyTable LEFT JOIN UNNEST(d) AS T(s) ON TRUE") // LEFT: null padding for empty arrays must survive pruning
+ }
+
+ /**
+ * UNNEST of an ARRAY of ROWs. The table has only two columns ({@code a} selected by the project,
+ * {@code b} required by the correlation), so there is nothing to prune from the left input — the
+ * rule correctly no-ops. The test still locks in the post-rule plan shape for ARRAY<ROW> so
+ * a future regression in correlation handling for that shape would surface as a plan diff.
+ */
+ @Test
+ def testInnerUnnestArrayOfRowsAllLeftColumnsUsed(): Unit = {
+ util.verifyRelPlan("SELECT a, x FROM MyRowArrayTable, UNNEST(b) AS T(x, y)")
+ }
+
+ @Test
+ def testInnerUnnestFilterOnLeftOnly(): Unit = {
+ util.verifyRelPlan("SELECT a, b, s FROM MyTable, UNNEST(d) AS T(s) WHERE a > 5") // left-only predicate -> expected below the Correlate
+ }
+
+ @Test
+ def testInnerUnnestFilterOnRightOnly(): Unit = {
+ util.verifyRelPlan("SELECT a, b, s FROM MyTable, UNNEST(d) AS T(s) WHERE s < 100") // right-only predicate; INNER correlate allows push to right input
+ }
+
+ @Test
+ def testLeftUnnestFilterOnRightOnly(): Unit = {
+ util.verifyRelPlan("SELECT a, s FROM MyTable LEFT JOIN UNNEST(d) AS T(s) ON TRUE WHERE s < 100") // right-only predicate must stay above a LEFT correlate
+ }
+
+ @Test
+ def testInnerUnnestMixedPredicate(): Unit = {
+ util.verifyRelPlan(
+ "SELECT a, b, s FROM MyTable, UNNEST(d) AS T(s) " +
+ "WHERE a > 5 AND s < 100 AND a + s > 10") // split expected: a>5 left, s<100 right, a+s>10 stays above
+ }
+
+ @Test
+ def testFilterOnArrayColumn(): Unit = {
+ util.verifyRelPlan("SELECT a, s FROM MyTable, UNNEST(d) AS T(s) WHERE d IS NOT NULL") // d is a left column, so this is left-only despite being the UNNEST source
+ }
+
+ @Test
+ def testUnnestWithOrdinalityFilterOnPos(): Unit = {
+ util.verifyRelPlan(
+ "SELECT a, val, pos FROM MyTable " +
+ "CROSS JOIN UNNEST(d) WITH ORDINALITY AS T(val, pos) WHERE pos = 1") // pos is produced on the right (ordinality)
+ }
+
+ /**
+ * UNNEST of a MAP column. {@code MyMapTable} has {@code (a, b, m)} where {@code b} is
+ * unreferenced so the rule should prune it from the left input. Confirms left-side pruning is
+ * type-agnostic (works for {@code MAP} the same as for {@code ARRAY}). The right side is {@code
+ * LogicalProject(KEY, VALUE) over LogicalTableFunctionScan(INTERNAL_UNNEST_ROWS)} — the wrapper
+ * Project is passed through unchanged.
+ */
+ @Test
+ def testInnerUnnestMapProjectionDropsLeftColumns(): Unit = {
+ util.verifyRelPlan("SELECT a, k, v FROM MyMapTable, UNNEST(m) AS T(k, v)")
+ }
+
+ /**
+ * UNNEST of a MAP column with a left-only filter. Verifies the filter rule pushes {@code a > 5}
+ * onto the left input below the Correlate even when the unnested column is a MAP, and the project
+ * rule still prunes {@code b}.
+ */
+ @Test
+ def testInnerUnnestMapFilterOnLeftOnly(): Unit = {
+ util.verifyRelPlan("SELECT a, v FROM MyMapTable, UNNEST(m) AS T(k, v) WHERE a > 5")
+ }
+}
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/UnnestSourcePushDownTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/UnnestSourcePushDownTest.scala
new file mode 100644
index 0000000000000..25c14554aeb32
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/UnnestSourcePushDownTest.scala
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.planner.plan.rules.logical
+
+import org.apache.flink.table.planner.utils.TableTestBase
+
+import org.junit.jupiter.api.{BeforeEach, Test}
+
+/**
+ * End-to-end plan tests that exercise [[FlinkFilterCorrelateUnnestTransposeRule]] together with the
+ * existing source-pushdown rules ([[PushFilterIntoTableSourceScanRule]],
+ * [[PushProjectIntoTableSourceScanRule]]). The goal is to confirm that:
+ *
+ * - Filters that this rule pushes onto the left input of a Correlate continue down into the table
+ * source when the source supports [[SupportsFilterPushDown]].
+ * - Existing projection pushdown into the table source still works alongside UNNEST (the planner
+ * trims columns from the scan even though we don't yet push Project through Correlate).
+ *
+ * Stream + batch both go through the same FILTER_RULES set; batch coverage is sufficient to
+ * exercise the rule wiring end-to-end.
+ */
+class UnnestSourcePushDownTest extends TableTestBase {
+
+ private val util = batchTestUtil()
+
+ @BeforeEach
+ def setup(): Unit = {
+ util.tableEnv.executeSql("""
+ |CREATE TABLE MyTable (
+ | a INT,
+ | b BIGINT,
+ | c STRING,
+ | d ARRAY<INT>
+ |) WITH (
+ | 'connector' = 'values',
+ | 'filterable-fields' = 'a;b',
+ | 'bounded' = 'true'
+ |)
+ """.stripMargin)
+ }
+
+ /**
+ * Filter on left-only column should land in the table source scan as a pushed predicate when the
+ * source declares it filterable. Without the new rule the filter would sit above the Correlate
+ * and the source would see no predicate.
+ */
+ @Test
+ def testFilterOnLeftOnlyPushesIntoSource(): Unit = {
+ util.verifyRelPlan("SELECT a, s FROM MyTable, UNNEST(d) AS T(s) WHERE a > 5")
+ }
+
+ /** Right-only filter must remain near the UNNEST and never reach the source. */
+ @Test
+ def testFilterOnRightOnlyDoesNotReachSource(): Unit = {
+ util.verifyRelPlan("SELECT a, s FROM MyTable, UNNEST(d) AS T(s) WHERE s < 100")
+ }
+
+ /** Mixed predicate splits: left part to source, right part above TFS, mixed stays above. */
+ @Test
+ def testMixedPredicateSplitsAcrossSourceAndCorrelate(): Unit = {
+ util.verifyRelPlan(
+ "SELECT a, b, s FROM MyTable, UNNEST(d) AS T(s) " +
+ "WHERE a > 5 AND s < 100 AND a + s > 10")
+ }
+
+ /**
+ * Filter on a left column that is NOT in {@code filterable-fields} stays above the source scan
+ * but should still land below the Correlate (i.e. the new rule fires regardless of whether the
+ * source can absorb it).
+ */
+ @Test
+ def testFilterOnLeftNonFilterableColumn(): Unit = {
+ util.verifyRelPlan("SELECT a, s FROM MyTable, UNNEST(d) AS T(s) WHERE c LIKE 'foo%'")
+ }
+
+ /**
+ * Existing projection pushdown into source should still trim {@code b, c} from the scan when the
+ * top-level query only needs {@code a, s}. The same query with {@code WHERE a > 5} would
+ * additionally push the filter into the source — that combined case is already covered by
+ * [[testFilterOnLeftOnlyPushesIntoSource]] above, whose golden plan shows both {@code
+ * filter=[>(a, 5)]} and {@code project=[a, d]} on the source scan.
+ */
+ @Test
+ def testProjectionPushDownIntoSourceWithUnnest(): Unit = {
+ util.verifyRelPlan("SELECT a, s FROM MyTable, UNNEST(d) AS T(s)")
+ }
+
+ /**
+ * LEFT correlate variant: filter on the right side must not push to source (would break null
+ * padding) but filter on left should still flow into source.
+ */
+ @Test
+ def testLeftCorrelateMixedFilterPushDown(): Unit = {
+ util.verifyRelPlan(
+ "SELECT a, s FROM MyTable LEFT JOIN UNNEST(d) AS T(s) ON TRUE " +
+ "WHERE a > 5 AND s < 100")
+ }
+}
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/UnnestStreamTimeAttributeTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/UnnestStreamTimeAttributeTest.scala
new file mode 100644
index 0000000000000..2db2dbdd08fb0
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/UnnestStreamTimeAttributeTest.scala
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.planner.plan.rules.logical
+
+import org.apache.flink.table.planner.utils.TableTestBase
+
+import org.junit.jupiter.api.{BeforeEach, Test}
+
+/**
+ * Stream-mode plan tests that verify [[FlinkProjectCorrelateUnnestTransposeRule]] preserves rowtime
+ * / watermark attributes when they are referenced downstream of UNNEST.
+ *
+ * The concern: pruning a left-side column the user "didn't ask for" could break watermark
+ * propagation if some downstream operator needs the rowtime. The test confirms that:
+ *
+ *
+ *   - When the rowtime column is referenced (directly or by a downstream window function), the
+ *     rule preserves it in the pruned left input.
+ *   - Pruning of unrelated columns still happens around
+ *     the time attribute.
+ *
+ * Note: when a user explicitly does not select the rowtime through UNNEST, that's a deliberate
+ * projection — the resulting view loses its time attribute, just as it would for any other
+ * projection. That is consistent with normal Flink projection semantics and not the rule's concern.
+ */
+class UnnestStreamTimeAttributeTest extends TableTestBase {
+
+ private val util = streamTestUtil()
+
+ @BeforeEach
+ def setup(): Unit = {
+ util.tableEnv.executeSql("""
+ |CREATE TABLE T (
+ | a INT,
+ | b BIGINT,
+ | c STRING,
+ | ts TIMESTAMP(3),
+ | d ARRAY<INT>,
+ | WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
+ |) WITH (
+ | 'connector' = 'values',
+ | 'bounded' = 'false'
+ |)
+ """.stripMargin)
+ }
+
+ /**
+ * Downstream window aggregation requires the rowtime. The inner SELECT lists {@code ts} so it
+ * propagates through the UNNEST, and the outer GROUP BY tumbles on it. The rule should preserve
+ * {@code ts} (with the {@code *ROWTIME*} marker) in the pruned left while still dropping
+ * unreferenced columns ({@code b}, {@code c}). This is the realistic streaming case; a simpler
+ * "rowtime appears in plan" assertion is implied by it.
+ */
+ @Test
+ def testRowtimeUsedByDownstreamWindowIsPreserved(): Unit = {
+ util.verifyRelPlan("""
+ |SELECT a, TUMBLE_END(ts, INTERVAL '1' HOUR) AS w_end, COUNT(*) AS cnt
+ |FROM (SELECT a, ts, s FROM T, UNNEST(d) AS f(s))
+ |GROUP BY a, TUMBLE(ts, INTERVAL '1' HOUR)
+ """.stripMargin)
+ }
+
+ /**
+ * User explicitly does not select rowtime through UNNEST. Normal projection semantics: rowtime is
+ * dropped. The rule additionally prunes {@code b}, {@code c} from the source scan. This test
+ * confirms behavior is consistent with non-UNNEST projection — losing the time attribute is the
+ * user's choice, not a side effect of the rule.
+ */
+ @Test
+ def testRowtimeNotSelectedIsPruned(): Unit = {
+ util.verifyRelPlan("SELECT a, s FROM T, UNNEST(d) AS f(s)")
+ }
+}
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/UnnestSourcePushDownITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/UnnestSourcePushDownITCase.scala
new file mode 100644
index 0000000000000..3c2525742fce7
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/UnnestSourcePushDownITCase.scala
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.planner.runtime.batch.sql
+
+import org.apache.flink.table.planner.factories.TestValuesTableFactory
+import org.apache.flink.table.planner.runtime.utils.BatchTestBase
+import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row
+import org.apache.flink.types.Row
+
+import org.junit.jupiter.api.{BeforeEach, Test}
+
+/**
+ * End-to-end correctness tests for [[FlinkFilterCorrelateUnnestTransposeRule]] combined with
+ * source-level filter and projection pushdown. Uses {@code TestValuesTableFactory} with filterable
+ * fields so the planner can drive predicates all the way into the source scan.
+ */
+class UnnestSourcePushDownITCase extends BatchTestBase {
+
+ // (a, b, c, d) where d = ARRAY<INT>. Row a=8 has an empty array so LEFT-join tests can
+ // exercise the null-padding path.
+ private val rows: Seq[Row] = Seq(
+ row(1, 10L, "x", Array[Integer](1, 2)),
+ row(2, 20L, "y", Array[Integer](3)),
+ row(6, 60L, "z", Array[Integer](50, 150)),
+ row(7, 70L, "w", Array[Integer](99)),
+ row(8, 80L, "v", Array.empty[Integer])
+ )
+
+ @BeforeEach
+ override def before(): Unit = {
+ super.before()
+ val dataId = TestValuesTableFactory.registerData(rows)
+ tEnv.executeSql(s"""
+ |CREATE TABLE T (
+ | a INT,
+ | b BIGINT,
+ | c STRING,
+ | d ARRAY<INT>
+ |) WITH (
+ | 'connector' = 'values',
+ | 'data-id' = '$dataId',
+ | 'filterable-fields' = 'a;b',
+ | 'bounded' = 'true'
+ |)
+ """.stripMargin)
+ }
+
+ @Test
+ def testFilterOnLeftPushedIntoSource(): Unit = {
+ // a > 5 should be pushed below the Correlate AND into the source scan
+ checkResult(
+ "SELECT a, s FROM T, UNNEST(d) AS T1(s) WHERE a > 5",
+ Seq(
+ row(6, 50),
+ row(6, 150),
+ row(7, 99)
+ ))
+ }
+
+ @Test
+ def testFilterOnRightStaysAtCorrelate(): Unit = {
+ // s < 100 stays on right side; source scan sees no predicate
+ checkResult(
+ "SELECT a, s FROM T, UNNEST(d) AS T1(s) WHERE s < 100",
+ Seq(
+ row(1, 1),
+ row(1, 2),
+ row(2, 3),
+ row(6, 50),
+ row(7, 99)
+ ))
+ }
+
+ @Test
+ def testMixedPredicateBothPushed(): Unit = {
+ // a > 5 to left/source, s < 100 to right side
+ checkResult(
+ "SELECT a, s FROM T, UNNEST(d) AS T1(s) WHERE a > 5 AND s < 100",
+ Seq(
+ row(6, 50),
+ row(7, 99)
+ ))
+ }
+
+ @Test
+ def testLeftJoinFilterOnLeftPushed(): Unit = {
+ // LEFT JOIN: left filter pushes safely AND null-padding is preserved for empty arrays.
+ // a > 5 selects rows {6, 7, 8}. Row a=8 has an empty array, so LEFT JOIN emits (8, NULL).
+ // The INNER counterpart (testFilterOnLeftPushedIntoSource) excludes a=8 entirely — that
+ // delta is what proves LEFT semantics survive the filter pushdown.
+ checkResult(
+ "SELECT a, s FROM T LEFT JOIN UNNEST(d) AS T1(s) ON TRUE WHERE a > 5",
+ Seq(
+ row(6, 50),
+ row(6, 150),
+ row(7, 99),
+ row(8, null)
+ ))
+ }
+
+ @Test
+ def testLeftJoinFilterOnRightStaysAbove(): Unit = {
+ // LEFT JOIN with right-side filter: must NOT push to right (would change null padding
+ // semantics for empty arrays). Verifies LEFT correlate semantics are preserved.
+ checkResult(
+ "SELECT a, s FROM T LEFT JOIN UNNEST(d) AS T1(s) ON TRUE WHERE s < 100",
+ Seq(
+ row(1, 1),
+ row(1, 2),
+ row(2, 3),
+ row(6, 50),
+ row(7, 99)
+ ))
+ }
+
+}