Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,7 @@ public FileStoreCommitImpl newCommit(String commitUser, FileStoreTable table) {
tableName,
commitUser,
partitionType,
schemaManager,
pathFactory(),
newKeyComparator(),
bucketMode(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,9 @@ public ExpireFileEntry(
@Nullable FileSource fileSource,
@Nullable String externalPath,
long rowCount,
@Nullable Long firstRowId) {
@Nullable Long firstRowId,
long schemaId,
@Nullable List<String> writeCols) {
super(
kind,
partition,
Expand All @@ -59,7 +61,9 @@ public ExpireFileEntry(
maxKey,
externalPath,
rowCount,
firstRowId);
firstRowId,
schemaId,
writeCols);
this.fileSource = fileSource;
}

Expand All @@ -82,7 +86,9 @@ public static ExpireFileEntry from(ManifestEntry entry) {
entry.file().fileSource().orElse(null),
entry.externalPath(),
entry.rowCount(),
entry.firstRowId());
entry.firstRowId(),
entry.file().schemaId(),
entry.file().writeCols());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,11 @@ public interface FileEntry {
@Nullable
Long firstRowId();

long schemaId();

@Nullable
List<String> writeCols();

/**
* The same {@link Identifier} indicates that the {@link ManifestEntry} refers to the same data
* file.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,17 @@ static ManifestEntry create(

DataFileMeta file();

@Override
default long schemaId() {
return file().schemaId();
}

@Nullable
@Override
default List<String> writeCols() {
return file().writeCols();
}

ManifestEntry copyWithoutStats();

ManifestEntry assignSequenceNumber(long minSequenceNumber, long maxSequenceNumber);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ public class SimpleFileEntry implements FileEntry {
@Nullable private final String externalPath;
private final long rowCount;
@Nullable private final Long firstRowId;
private final long schemaId;
@Nullable private final List<String> writeCols;

public SimpleFileEntry(
FileKind kind,
Expand All @@ -59,7 +61,9 @@ public SimpleFileEntry(
BinaryRow maxKey,
@Nullable String externalPath,
long rowCount,
@Nullable Long firstRowId) {
@Nullable Long firstRowId,
long schemaId,
@Nullable List<String> writeCols) {
this.kind = kind;
this.partition = partition;
this.bucket = bucket;
Expand All @@ -73,6 +77,8 @@ public SimpleFileEntry(
this.externalPath = externalPath;
this.rowCount = rowCount;
this.firstRowId = firstRowId;
this.schemaId = schemaId;
this.writeCols = writeCols;
}

public static SimpleFileEntry from(ManifestEntry entry) {
Expand All @@ -89,7 +95,9 @@ public static SimpleFileEntry from(ManifestEntry entry) {
entry.maxKey(),
entry.externalPath(),
entry.file().rowCount(),
entry.firstRowId());
entry.firstRowId(),
entry.file().schemaId(),
entry.file().writeCols());
}

public SimpleFileEntry toDelete() {
Expand All @@ -106,7 +114,9 @@ public SimpleFileEntry toDelete() {
maxKey,
externalPath,
rowCount,
firstRowId);
firstRowId,
schemaId,
writeCols);
}

public static List<SimpleFileEntry> from(List<ManifestEntry> entries) {
Expand Down Expand Up @@ -185,6 +195,17 @@ public long rowCount() {
return firstRowId;
}

@Override
public long schemaId() {
return schemaId;
}

@Nullable
@Override
public List<String> writeCols() {
return writeCols;
}

public long nonNullFirstRowId() {
Long firstRowId = firstRowId();
checkArgument(firstRowId != null, "First row id of '%s' should not be null.", fileName());
Expand Down Expand Up @@ -216,7 +237,9 @@ public boolean equals(Object o) {
&& Objects.equals(maxKey, that.maxKey)
&& Objects.equals(externalPath, that.externalPath)
&& rowCount == that.rowCount
&& Objects.equals(firstRowId, that.firstRowId);
&& Objects.equals(firstRowId, that.firstRowId)
&& schemaId == that.schemaId
&& Objects.equals(writeCols, that.writeCols);
}

@Override
Expand All @@ -233,7 +256,9 @@ public int hashCode() {
maxKey,
externalPath,
rowCount,
firstRowId);
firstRowId,
schemaId,
writeCols);
}

@Override
Expand Down Expand Up @@ -263,6 +288,10 @@ public String toString() {
+ rowCount
+ ", firstRowId="
+ firstRowId
+ ", schemaId="
+ schemaId
+ ", writeCols="
+ writeCols
+ '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@ public SimpleFileEntryWithDV(SimpleFileEntry entry, @Nullable String dvFileName)
entry.maxKey(),
entry.externalPath(),
entry.rowCount(),
entry.firstRowId());
entry.firstRowId(),
entry.schemaId(),
entry.writeCols());
this.dvFileName = dvFileName;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,11 @@
import org.apache.paimon.manifest.SimpleFileEntry;
import org.apache.paimon.manifest.SimpleFileEntryWithDV;
import org.apache.paimon.operation.PartitionExpire;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.Range;
import org.apache.paimon.utils.RangeHelper;
import org.apache.paimon.utils.SnapshotManager;

Expand All @@ -64,6 +64,7 @@
import static org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX;
import static org.apache.paimon.format.blob.BlobFileFormat.isBlobFile;
import static org.apache.paimon.operation.commit.ManifestEntryChanges.changedPartitions;
import static org.apache.paimon.types.VectorType.isVectorStoreFile;
import static org.apache.paimon.utils.InternalRowPartitionComputer.partToSimpleString;
import static org.apache.paimon.utils.Preconditions.checkState;

Expand All @@ -76,6 +77,7 @@ public class ConflictDetection {
private final String tableName;
private final String commitUser;
private final RowType partitionType;
private final SchemaManager schemaManager;
private final FileStorePathFactory pathFactory;
private final @Nullable Comparator<InternalRow> keyComparator;
private final BucketMode bucketMode;
Expand All @@ -100,6 +102,7 @@ public ConflictDetection(
String tableName,
String commitUser,
RowType partitionType,
SchemaManager schemaManager,
FileStorePathFactory pathFactory,
@Nullable Comparator<InternalRow> keyComparator,
BucketMode bucketMode,
Expand All @@ -112,6 +115,7 @@ public ConflictDetection(
this.tableName = tableName;
this.commitUser = commitUser;
this.partitionType = partitionType;
this.schemaManager = schemaManager;
this.pathFactory = pathFactory;
this.keyComparator = keyComparator;
this.bucketMode = bucketMode;
Expand Down Expand Up @@ -473,7 +477,7 @@ private Optional<RuntimeException> checkRowIdRangeConflicts(
for (List<SimpleFileEntry> group : merged) {
List<SimpleFileEntry> dataFiles = new ArrayList<>();
for (SimpleFileEntry f : group) {
if (!isBlobFile(f.fileName())) {
if (!dedicatedStorageFile(f.fileName())) {
dataFiles.add(f);
}
}
Expand All @@ -500,14 +504,10 @@ private Optional<RuntimeException> checkForRowIdFromSnapshot(
}

List<BinaryRow> changedPartitions = changedPartitions(deltaEntries, deltaIndexEntries);
// collect history row id ranges
List<Range> historyIdRanges = new ArrayList<>();
for (SimpleFileEntry entry : deltaEntries) {
Long firstRowId = entry.firstRowId();
long rowCount = entry.rowCount();
if (firstRowId != null) {
historyIdRanges.add(new Range(firstRowId, firstRowId + rowCount - 1));
}
RowIdColumnConflictChecker columnChecker =
RowIdColumnConflictChecker.fromDeltaEntries(schemaManager, deltaEntries);
if (columnChecker.isEmpty()) {
return Optional.empty();
}

// check history row id ranges
Expand All @@ -525,23 +525,24 @@ private Optional<RuntimeException> checkForRowIdFromSnapshot(
commitScanner.readIncrementalEntries(snapshot, changedPartitions);
for (ManifestEntry entry : changes) {
DataFileMeta file = entry.file();
Range fileRange = file.nonNullRowIdRange();
if (fileRange.from < checkNextRowId) {
for (Range range : historyIdRanges) {
if (range.hasIntersection(fileRange)) {
return Optional.of(
new RuntimeException(
"For Data Evolution table, multiple 'MERGE INTO' operations have encountered conflicts,"
+ " updating the same file, which can render some updates ineffective."));
}
}
if (file.firstRowId() != null
&& file.nonNullRowIdRange().from < checkNextRowId
&& columnChecker.conflictsWith(entry)) {
return Optional.of(
new RuntimeException(
"For Data Evolution table, multiple 'MERGE INTO' operations have encountered conflicts,"
+ " updating the same file, which can render some updates ineffective."));
}
}
}

return Optional.empty();
}

private static boolean dedicatedStorageFile(String fileName) {
return isBlobFile(fileName) || isVectorStoreFile(fileName);
}

static List<SimpleFileEntry> buildBaseEntriesWithDV(
List<SimpleFileEntry> baseEntries, List<IndexManifestEntry> baseIndexEntries) {
if (baseEntries.isEmpty()) {
Expand Down
Loading
Loading