From 9235da30fca4ff64a0a1a16fd996e6ce213f3a0e Mon Sep 17 00:00:00 2001 From: yehe Date: Thu, 14 May 2026 10:57:25 +0800 Subject: [PATCH] [core] Implement withFilter for TagsTable and FileKeyRangesTable Replace the placeholder TODO in TagsRead#withFilter and FileKeyRangesRead#withFilter with a read-side post-filter that evaluates the predicate on the materialized row before projection. For FileKeyRangesRead, partition / bucket / level are stripped out before storing the post-filter because they are already pushed down by FileKeyRangesScan; re-evaluating them on the materialized row would compare values of incompatible representations (e.g. a String literal vs the BinaryString in the row), mirroring the FilesTable behaviour introduced in #7791. Adds tests covering equal / in / not-equal / non-pushdown column / combined pushdown + post-filter scenarios. --- .../table/system/FileKeyRangesTable.java | 29 +++++- .../apache/paimon/table/system/TagsTable.java | 9 +- .../table/system/FileKeyRangesTableTest.java | 52 +++++++++++ .../paimon/table/system/TagsTableTest.java | 88 ++++++++++++++++++- 4 files changed, 175 insertions(+), 3 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/FileKeyRangesTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/FileKeyRangesTable.java index 095dcda74a82..39e88f6e9afc 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/FileKeyRangesTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/FileKeyRangesTable.java @@ -30,6 +30,7 @@ import org.apache.paimon.predicate.LeafPredicate; import org.apache.paimon.predicate.LeafPredicateExtractor; import org.apache.paimon.predicate.Predicate; +import org.apache.paimon.predicate.PredicateBuilder; import org.apache.paimon.reader.RecordReader; import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.schema.TableSchema; @@ -61,9 +62,11 @@ import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -184,12 +187,24 @@ public Plan innerPlan() { private static class FileKeyRangesRead implements InnerTableRead { + private static final Set SCAN_PUSHDOWN_FIELDS = scanPushdownFields(); + + private static Set scanPushdownFields() { + Set fields = new HashSet<>(); + fields.add("partition"); + fields.add("bucket"); + fields.add("level"); + return Collections.unmodifiableSet(fields); + } + private final SchemaManager schemaManager; private final FileStoreTable storeTable; private RowType readType; + @Nullable private Predicate postFilter; + private FileKeyRangesRead(SchemaManager schemaManager, FileStoreTable fileStoreTable) { this.schemaManager = schemaManager; this.storeTable = fileStoreTable; @@ -197,7 +212,14 @@ private FileKeyRangesRead(SchemaManager schemaManager, FileStoreTable fileStoreT @Override public InnerTableRead withFilter(Predicate predicate) { - // TODO + if (predicate == null) { + this.postFilter = null; + return this; + } + List remaining = + PredicateBuilder.excludePredicateWithFields( + PredicateBuilder.splitAnd(predicate), SCAN_PUSHDOWN_FIELDS); + this.postFilter = remaining.isEmpty() ? null : PredicateBuilder.and(remaining); return this; } @@ -264,6 +286,11 @@ public RowDataToObjectArrayConverter apply(Long schemaId) { file))); } Iterator rows = Iterators.concat(iteratorList.iterator()); + + if (postFilter != null) { + rows = Iterators.filter(rows, postFilter::test); + } + if (readType != null) { rows = Iterators.transform( diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java index 0dd22e79be29..f5a178998877 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java @@ -201,13 +201,15 @@ private class TagsRead implements InnerTableRead { private final FileIO fileIO; private RowType readType; + @Nullable private Predicate postFilter; + public TagsRead(FileIO fileIO) { this.fileIO = fileIO; } @Override public InnerTableRead withFilter(Predicate predicate) { - // TODO + this.postFilter = predicate; return this; } @@ -272,6 +274,11 @@ public RecordReader createReader(Split split) { Iterator rows = Iterators.transform(nameToSnapshot.entrySet().iterator(), this::toRow); + + if (postFilter != null) { + rows = Iterators.filter(rows, postFilter::test); + } + if (readType != null) { rows = Iterators.transform( diff --git a/paimon-core/src/test/java/org/apache/paimon/table/system/FileKeyRangesTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/system/FileKeyRangesTableTest.java index ce675e2aaefa..de00e33b3062 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/system/FileKeyRangesTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/system/FileKeyRangesTableTest.java @@ -183,6 +183,58 @@ public void testPartitionInFilter() throws Exception { assertThat(hasPt2).isTrue(); } + @Test + public void testReadWithRecordCountPostFilter() throws Exception { + PredicateBuilder builder = new PredicateBuilder(FileKeyRangesTable.TABLE_TYPE); + assertThat(readPartBucketLevel(builder.greaterOrEqual(6, 1L))).isNotEmpty(); + assertThat(readPartBucketLevel(builder.greaterThan(6, 100L))).isEmpty(); + } + + @Test + public void testReadWithSchemaIdPostFilter() throws Exception { + PredicateBuilder builder = new PredicateBuilder(FileKeyRangesTable.TABLE_TYPE); + List baseline = readPartBucketLevel(null); + assertThat(baseline).isNotEmpty(); + assertThat(readPartBucketLevel(builder.equal(4, 0L))).isEqualTo(baseline); + assertThat(readPartBucketLevel(builder.equal(4, 9999L))).isEmpty(); + } + + @Test + public void testReadWithCombinedScanAndPostFilter() throws Exception { + // partition is scan-pushdown, record_count is post-filter only. + PredicateBuilder builder = new PredicateBuilder(FileKeyRangesTable.TABLE_TYPE); + Predicate combined = + PredicateBuilder.and( + builder.equal(0, BinaryString.fromString("{1}")), + builder.greaterOrEqual(6, 1L)); + List rows = readPartBucketLevel(combined); + assertThat(rows).isNotEmpty(); + for (String row : rows) { + assertThat(row).startsWith("{1}-"); + } + Predicate combinedEmpty = + PredicateBuilder.and( + builder.equal(0, BinaryString.fromString("{1}")), + builder.greaterThan(6, 100L)); + assertThat(readPartBucketLevel(combinedEmpty)).isEmpty(); + } + + @Test + public void testReadWithPartitionRangeScanPushdown() throws Exception { + write(table, GenericRow.of(3, 11, 50)); + + PredicateBuilder builder = new PredicateBuilder(FileKeyRangesTable.TABLE_TYPE); + List rows = + readPartBucketLevel(builder.lessThan(0, BinaryString.fromString("{11}"))); + + assertThat(rows).isNotEmpty(); + for (String row : rows) { + assertThat(row).doesNotStartWith("{11}-"); + } + assertThat(rows.stream().anyMatch(r -> r.startsWith("{1}-"))).isTrue(); + assertThat(rows.stream().anyMatch(r -> r.startsWith("{2}-"))).isTrue(); + } + private List readPartBucketLevel(Predicate predicate) throws IOException { ReadBuilder rb = fileKeyRangesTable.newReadBuilder(); if (predicate != null) { diff --git a/paimon-core/src/test/java/org/apache/paimon/table/system/TagsTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/system/TagsTableTest.java index 7ee8cd53d75c..31bc9401ea8c 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/system/TagsTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/system/TagsTableTest.java @@ -24,10 +24,14 @@ import org.apache.paimon.data.InternalRow; import org.apache.paimon.data.Timestamp; import org.apache.paimon.manifest.ManifestCommittable; +import org.apache.paimon.predicate.Predicate; +import org.apache.paimon.predicate.PredicateBuilder; +import org.apache.paimon.reader.RecordReader; import org.apache.paimon.schema.Schema; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.TableTestBase; import org.apache.paimon.table.sink.TableCommitImpl; +import org.apache.paimon.table.source.ReadBuilder; import org.apache.paimon.tag.Tag; import org.apache.paimon.types.DataTypes; import org.apache.paimon.utils.DateTimeUtils; @@ -37,8 +41,10 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import java.io.IOException; import java.time.LocalDateTime; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.TreeMap; @@ -49,6 +55,7 @@ class TagsTableTest extends TableTestBase { private static final String tableName = "MyTable"; + private FileStoreTable table; private TagsTable tagsTable; private TagManager tagManager; @@ -66,7 +73,7 @@ void before() throws Exception { .option("tag.num-retained-max", "3") .build(); catalog.createTable(identifier, schema, true); - FileStoreTable table = (FileStoreTable) catalog.getTable(identifier); + table = (FileStoreTable) catalog.getTable(identifier); TableCommitImpl commit = table.newCommit(commitUser).ignoreEmptyCommit(false); commit.commit( new ManifestCommittable( @@ -90,6 +97,85 @@ void testTagsTable() throws Exception { assertThat(result).containsExactlyElementsOf(expectRow); } + @Test + void testReadWithTagNameEqualFilter() throws Exception { + table.createTag("tag-a"); + table.createTag("tag-b"); + table.createTag("tag-c"); + + PredicateBuilder builder = new PredicateBuilder(TagsTable.TABLE_TYPE); + assertThat(readTagNames(builder.equal(0, BinaryString.fromString("tag-b")))) + .containsExactly("tag-b"); + + assertThat(readTagNames(builder.equal(0, BinaryString.fromString("missing")))).isEmpty(); + } + + @Test + void testReadWithTagNameInFilter() throws Exception { + table.createTag("tag-a"); + table.createTag("tag-b"); + table.createTag("tag-c"); + + PredicateBuilder builder = new PredicateBuilder(TagsTable.TABLE_TYPE); + assertThat( + readTagNames( + builder.in( + 0, + Arrays.asList( + (Object) BinaryString.fromString("tag-a"), + BinaryString.fromString("tag-c"))))) + .containsExactlyInAnyOrder("tag-a", "tag-c"); + } + + @Test + void testReadWithTagNameNotEqualFilter() throws Exception { + table.createTag("tag-a"); + table.createTag("tag-b"); + table.createTag("tag-c"); + + PredicateBuilder builder = new PredicateBuilder(TagsTable.TABLE_TYPE); + List rows = readTagNames(builder.notEqual(0, BinaryString.fromString("tag-b"))); + assertThat(rows).contains("tag-a", "tag-c"); + assertThat(rows).doesNotContain("tag-b"); + } + + @Test + void testReadWithNonTagNameFieldFilter() throws Exception { + table.createTag("tag-a"); + table.createTag("tag-b"); + + PredicateBuilder builder = new PredicateBuilder(TagsTable.TABLE_TYPE); + long maxSnapshotId = + tagManager.tagObjects().stream().mapToLong(p -> p.getKey().id()).max().orElse(0L); + assertThat(readTagNames(builder.greaterOrEqual(1, maxSnapshotId))).isNotEmpty(); + assertThat(readTagNames(builder.greaterThan(1, maxSnapshotId))).isEmpty(); + } + + @Test + void testReadWithNullFilterReturnsAll() throws Exception { + table.createTag("tag-a"); + table.createTag("tag-b"); + + List all = + tagManager.tagObjects().stream() + .map(Pair::getValue) + .collect(java.util.stream.Collectors.toList()); + assertThat(readTagNames(null)).containsExactlyInAnyOrderElementsOf(all); + } + + private List readTagNames(Predicate predicate) throws IOException { + ReadBuilder readBuilder = tagsTable.newReadBuilder(); + if (predicate != null) { + readBuilder = readBuilder.withFilter(predicate); + } + List names = new ArrayList<>(); + try (RecordReader reader = + readBuilder.newRead().createReader(readBuilder.newScan().plan())) { + reader.forEachRemaining(row -> names.add(row.getString(0).toString())); + } + return names; + } + private List getExpectedResult() { Map tagToRows = new TreeMap<>(); for (Pair snapshot : tagManager.tagObjects()) {