Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.paimon.predicate.LeafPredicate;
import org.apache.paimon.predicate.LeafPredicateExtractor;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
Expand Down Expand Up @@ -61,9 +62,11 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -184,20 +187,39 @@ public Plan innerPlan() {

private static class FileKeyRangesRead implements InnerTableRead {

private static final Set<String> SCAN_PUSHDOWN_FIELDS = scanPushdownFields();

private static Set<String> scanPushdownFields() {
Set<String> fields = new HashSet<>();
fields.add("partition");
fields.add("bucket");
fields.add("level");
return Collections.unmodifiableSet(fields);
}

private final SchemaManager schemaManager;

private final FileStoreTable storeTable;

private RowType readType;

@Nullable private Predicate postFilter;

private FileKeyRangesRead(SchemaManager schemaManager, FileStoreTable fileStoreTable) {
this.schemaManager = schemaManager;
this.storeTable = fileStoreTable;
}

@Override
public InnerTableRead withFilter(Predicate predicate) {
// TODO
if (predicate == null) {
this.postFilter = null;
return this;
}
List<Predicate> remaining =
PredicateBuilder.excludePredicateWithFields(
PredicateBuilder.splitAnd(predicate), SCAN_PUSHDOWN_FIELDS);
this.postFilter = remaining.isEmpty() ? null : PredicateBuilder.and(remaining);
return this;
}

Expand Down Expand Up @@ -264,6 +286,11 @@ public RowDataToObjectArrayConverter apply(Long schemaId) {
file)));
}
Iterator<InternalRow> rows = Iterators.concat(iteratorList.iterator());

if (postFilter != null) {
rows = Iterators.filter(rows, postFilter::test);
}

if (readType != null) {
rows =
Iterators.transform(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,13 +201,15 @@ private class TagsRead implements InnerTableRead {
private final FileIO fileIO;
private RowType readType;

@Nullable private Predicate postFilter;

public TagsRead(FileIO fileIO) {
this.fileIO = fileIO;
}

@Override
public InnerTableRead withFilter(Predicate predicate) {
// TODO
this.postFilter = predicate;
return this;
}

Expand Down Expand Up @@ -272,6 +274,11 @@ public RecordReader<InternalRow> createReader(Split split) {

Iterator<InternalRow> rows =
Iterators.transform(nameToSnapshot.entrySet().iterator(), this::toRow);

if (postFilter != null) {
rows = Iterators.filter(rows, postFilter::test);
}

if (readType != null) {
rows =
Iterators.transform(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,58 @@ public void testPartitionInFilter() throws Exception {
assertThat(hasPt2).isTrue();
}

@Test
public void testReadWithRecordCountPostFilter() throws Exception {
PredicateBuilder builder = new PredicateBuilder(FileKeyRangesTable.TABLE_TYPE);
assertThat(readPartBucketLevel(builder.greaterOrEqual(6, 1L))).isNotEmpty();
assertThat(readPartBucketLevel(builder.greaterThan(6, 100L))).isEmpty();
}

@Test
public void testReadWithSchemaIdPostFilter() throws Exception {
PredicateBuilder builder = new PredicateBuilder(FileKeyRangesTable.TABLE_TYPE);
List<String> baseline = readPartBucketLevel(null);
assertThat(baseline).isNotEmpty();
assertThat(readPartBucketLevel(builder.equal(4, 0L))).isEqualTo(baseline);
assertThat(readPartBucketLevel(builder.equal(4, 9999L))).isEmpty();
}

@Test
public void testReadWithCombinedScanAndPostFilter() throws Exception {
// partition is scan-pushdown, record_count is post-filter only.
PredicateBuilder builder = new PredicateBuilder(FileKeyRangesTable.TABLE_TYPE);
Predicate combined =
PredicateBuilder.and(
builder.equal(0, BinaryString.fromString("{1}")),
builder.greaterOrEqual(6, 1L));
List<String> rows = readPartBucketLevel(combined);
assertThat(rows).isNotEmpty();
for (String row : rows) {
assertThat(row).startsWith("{1}-");
}
Predicate combinedEmpty =
PredicateBuilder.and(
builder.equal(0, BinaryString.fromString("{1}")),
builder.greaterThan(6, 100L));
assertThat(readPartBucketLevel(combinedEmpty)).isEmpty();
}

@Test
public void testReadWithPartitionRangeScanPushdown() throws Exception {
write(table, GenericRow.of(3, 11, 50));

PredicateBuilder builder = new PredicateBuilder(FileKeyRangesTable.TABLE_TYPE);
List<String> rows =
readPartBucketLevel(builder.lessThan(0, BinaryString.fromString("{11}")));

assertThat(rows).isNotEmpty();
for (String row : rows) {
assertThat(row).doesNotStartWith("{11}-");
}
assertThat(rows.stream().anyMatch(r -> r.startsWith("{1}-"))).isTrue();
assertThat(rows.stream().anyMatch(r -> r.startsWith("{2}-"))).isTrue();
}

private List<String> readPartBucketLevel(Predicate predicate) throws IOException {
ReadBuilder rb = fileKeyRangesTable.newReadBuilder();
if (predicate != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,14 @@
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.Timestamp;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.TableTestBase;
import org.apache.paimon.table.sink.TableCommitImpl;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.tag.Tag;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.utils.DateTimeUtils;
Expand All @@ -37,8 +41,10 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
Expand All @@ -49,6 +55,7 @@
class TagsTableTest extends TableTestBase {

private static final String tableName = "MyTable";
private FileStoreTable table;
private TagsTable tagsTable;
private TagManager tagManager;

Expand All @@ -66,7 +73,7 @@ void before() throws Exception {
.option("tag.num-retained-max", "3")
.build();
catalog.createTable(identifier, schema, true);
FileStoreTable table = (FileStoreTable) catalog.getTable(identifier);
table = (FileStoreTable) catalog.getTable(identifier);
TableCommitImpl commit = table.newCommit(commitUser).ignoreEmptyCommit(false);
commit.commit(
new ManifestCommittable(
Expand All @@ -90,6 +97,85 @@ void testTagsTable() throws Exception {
assertThat(result).containsExactlyElementsOf(expectRow);
}

@Test
void testReadWithTagNameEqualFilter() throws Exception {
table.createTag("tag-a");
table.createTag("tag-b");
table.createTag("tag-c");

PredicateBuilder builder = new PredicateBuilder(TagsTable.TABLE_TYPE);
assertThat(readTagNames(builder.equal(0, BinaryString.fromString("tag-b"))))
.containsExactly("tag-b");

assertThat(readTagNames(builder.equal(0, BinaryString.fromString("missing")))).isEmpty();
}

@Test
void testReadWithTagNameInFilter() throws Exception {
table.createTag("tag-a");
table.createTag("tag-b");
table.createTag("tag-c");

PredicateBuilder builder = new PredicateBuilder(TagsTable.TABLE_TYPE);
assertThat(
readTagNames(
builder.in(
0,
Arrays.asList(
(Object) BinaryString.fromString("tag-a"),
BinaryString.fromString("tag-c")))))
.containsExactlyInAnyOrder("tag-a", "tag-c");
}

@Test
void testReadWithTagNameNotEqualFilter() throws Exception {
table.createTag("tag-a");
table.createTag("tag-b");
table.createTag("tag-c");

PredicateBuilder builder = new PredicateBuilder(TagsTable.TABLE_TYPE);
List<String> rows = readTagNames(builder.notEqual(0, BinaryString.fromString("tag-b")));
assertThat(rows).contains("tag-a", "tag-c");
assertThat(rows).doesNotContain("tag-b");
}

@Test
void testReadWithNonTagNameFieldFilter() throws Exception {
table.createTag("tag-a");
table.createTag("tag-b");

PredicateBuilder builder = new PredicateBuilder(TagsTable.TABLE_TYPE);
long maxSnapshotId =
tagManager.tagObjects().stream().mapToLong(p -> p.getKey().id()).max().orElse(0L);
assertThat(readTagNames(builder.greaterOrEqual(1, maxSnapshotId))).isNotEmpty();
assertThat(readTagNames(builder.greaterThan(1, maxSnapshotId))).isEmpty();
}

@Test
void testReadWithNullFilterReturnsAll() throws Exception {
table.createTag("tag-a");
table.createTag("tag-b");

List<String> all =
tagManager.tagObjects().stream()
.map(Pair::getValue)
.collect(java.util.stream.Collectors.toList());
assertThat(readTagNames(null)).containsExactlyInAnyOrderElementsOf(all);
}

private List<String> readTagNames(Predicate predicate) throws IOException {
ReadBuilder readBuilder = tagsTable.newReadBuilder();
if (predicate != null) {
readBuilder = readBuilder.withFilter(predicate);
}
List<String> names = new ArrayList<>();
try (RecordReader<InternalRow> reader =
readBuilder.newRead().createReader(readBuilder.newScan().plan())) {
reader.forEachRemaining(row -> names.add(row.getString(0).toString()));
}
return names;
}

private List<InternalRow> getExpectedResult() {
Map<String, InternalRow> tagToRows = new TreeMap<>();
for (Pair<Tag, String> snapshot : tagManager.tagObjects()) {
Expand Down