diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index c3575cd37d8b..313390e3d1a1 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -879,6 +879,12 @@
Boolean |
For DELETE manifest entry in manifest file, drop stats to reduce memory and storage. Default value is false only for compatibility of old reader. |
+
+ manifest.delta.sorted |
+ true |
+ Boolean |
+ Whether to sort ManifestEntry by partition when writing manifest delta. |
+
manifest.format |
"avro" |
@@ -897,6 +903,24 @@
Integer |
To avoid frequent manifest merges, this parameter specifies the minimum number of ManifestFileMeta to merge. |
+
+ manifest.merge.sort-on-commit |
+ false |
+ Boolean |
+ Whether to sort ManifestEntry by partition during manifest merge in commit. This option does not affect manifest compaction. |
+
+
+ manifest.merge.sort.buffer |
+ 256 mb |
+ MemorySize |
+ Amount of data to build up in memory for sorting during manifest full compaction before spilling to disk. |
+
+
+ manifest.merge.sorted |
+ true |
+ Boolean |
+ Whether to sort ManifestEntry by partition during manifest full compaction. |
+
manifest.target-file-size |
8 mb |
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index e15a86d8ff2f..a29e979548d6 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -476,6 +476,28 @@ public InlineElement getDescription() {
.withDescription(
"Define upsert key to do MERGE INTO when executing INSERT INTO, cannot be defined with primary key.");
+ // Partition ordering for manifest full compaction. In this change the manifest
+ // compaction call site hands this flag to ManifestFileMerger.merge unchanged.
+ public static final ConfigOption MANIFEST_MERGE_SORTED =
+ key("manifest.merge.sorted")
+ .booleanType()
+ .defaultValue(true)
+ .withDescription(
+ "Whether to sort ManifestEntry by partition during manifest full compaction.");
+
+ // Opt-in ordering for the manifest merge that runs inside a commit. The commit call
+ // site in this change ANDs this flag with manifest.merge.sorted, so it has no effect
+ // while that option is false.
+ public static final ConfigOption MANIFEST_MERGE_SORT_ON_COMMIT =
+ key("manifest.merge.sort-on-commit")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "Whether to sort ManifestEntry by partition during manifest merge in commit. "
+ + "This option does not affect manifest compaction.");
+
+ // Ordering of the delta manifest written by a commit. The commit path in this change
+ // sorts a copy of the delta entries, and only when there is more than one entry.
+ public static final ConfigOption MANIFEST_DELTA_SORTED =
+ key("manifest.delta.sorted")
+ .booleanType()
+ .defaultValue(true)
+ .withDescription(
+ "Whether to sort ManifestEntry by partition when writing manifest delta.");
+
public static final ConfigOption PARTITION_DEFAULT_NAME =
key("partition.default-name")
.stringType()
@@ -661,6 +683,13 @@ public InlineElement getDescription() {
.withDescription(
"Amount of data to build up in memory before converting to a sorted on-disk file.");
+ // Memory budget for the sort used by sorted manifest merging. The default is not
+ // declared here: it is copied from WRITE_BUFFER_SIZE's default (the generated docs in
+ // this change list it as 256 mb).
+ public static final ConfigOption MANIFEST_MERGE_SORT_BUFFER =
+ key("manifest.merge.sort.buffer")
+ .memoryType()
+ .defaultValue(WRITE_BUFFER_SIZE.defaultValue())
+ .withDescription(
+ "Amount of data to build up in memory for sorting during manifest full compaction before spilling to disk.");
+
@Documentation.OverrideDefault("infinite")
public static final ConfigOption WRITE_BUFFER_MAX_DISK_SIZE =
key("write-buffer-spill.max-disk-size")
@@ -2826,6 +2855,22 @@ public int manifestMergeMinCount() {
return options.get(MANIFEST_MERGE_MIN_COUNT);
}
+ /** Returns the configured value of {@link #MANIFEST_MERGE_SORTED}. */
+ public boolean manifestMergeSorted() {
+ return options.get(MANIFEST_MERGE_SORTED);
+ }
+
+ /**
+ * Returns the configured value of {@link #MANIFEST_MERGE_SORT_ON_COMMIT}.
+ *
+ * <p>This is the raw flag. The commit path in this change additionally requires
+ * {@link #manifestMergeSorted()} to be true before it sorts.
+ */
+ public boolean manifestMergeSortOnCommit() {
+ return options.get(MANIFEST_MERGE_SORT_ON_COMMIT);
+ }
+
+ /** Returns the configured value of {@link #MANIFEST_DELTA_SORTED}. */
+ public boolean manifestDeltaSorted() {
+ return options.get(MANIFEST_DELTA_SORTED);
+ }
+
+ /** Returns {@link #MANIFEST_MERGE_SORT_BUFFER} converted to a byte count. */
+ public long manifestMergeSortBufferSize() {
+ return options.get(MANIFEST_MERGE_SORT_BUFFER).getBytes();
+ }
+
public MergeEngine mergeEngine() {
return options.get(MERGE_ENGINE);
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index ffdfe8523d94..31e292871124 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -960,7 +960,9 @@ CommitResult tryCommitOnce(
options.manifestMergeMinCount(),
options.manifestFullCompactionThresholdSize().getBytes(),
partitionType,
- options.scanManifestParallelism());
+ options.scanManifestParallelism(),
+ options.manifestMergeSorted() && options.manifestMergeSortOnCommit(),
+ options.manifestMergeSortBufferSize());
baseManifestList = manifestList.write(mergeAfterManifests);
if (options.rowTrackingEnabled()) {
@@ -985,8 +987,14 @@ CommitResult tryCommitOnce(
// write new delta files into manifest files
deltaStatistics = new ArrayList<>(PartitionEntry.merge(deltaFiles));
- deltaManifestList = manifestList.write(manifestFile.write(deltaFiles));
-
+ List deltaFilesForWrite = deltaFiles;
+ if (options.manifestDeltaSorted() && deltaFiles.size() > 1) {
+ deltaFilesForWrite = new ArrayList<>(deltaFiles);
+ deltaFilesForWrite.sort(
+ ManifestFileMerger.createManifestEntryComparator(partitionType));
+ }
+ List deltaManifests = manifestFile.write(deltaFilesForWrite);
+ deltaManifestList = manifestList.write(deltaManifests);
// write changelog into manifest files
if (!changelogFiles.isEmpty()) {
changelogManifestList = manifestList.write(manifestFile.write(changelogFiles));
@@ -1189,7 +1197,9 @@ private boolean compactManifestOnce() {
1,
1,
partitionType,
- options.scanManifestParallelism());
+ options.scanManifestParallelism(),
+ options.manifestMergeSorted(),
+ options.manifestMergeSortBufferSize());
if (new HashSet<>(mergeBeforeManifests).equals(new HashSet<>(mergeAfterManifests))) {
// no need to commit this snapshot, because no compact were happened
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/ManifestFileMerger.java b/paimon-core/src/main/java/org/apache/paimon/operation/ManifestFileMerger.java
index cdcad1ed3e84..7a3b1f399cc0 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/ManifestFileMerger.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/ManifestFileMerger.java
@@ -18,15 +18,36 @@
package org.apache.paimon.operation;
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.codegen.NormalizedKeyComputer;
+import org.apache.paimon.codegen.RecordComparator;
+import org.apache.paimon.compression.CompressOptions;
import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.serializer.BinaryRowSerializer;
+import org.apache.paimon.data.serializer.InternalRowSerializer;
+import org.apache.paimon.disk.IOManager;
import org.apache.paimon.io.RollingFileWriter;
import org.apache.paimon.manifest.FileEntry;
+import org.apache.paimon.manifest.FileKind;
import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.manifest.ManifestEntrySerializer;
import org.apache.paimon.manifest.ManifestFile;
import org.apache.paimon.manifest.ManifestFileMeta;
+import org.apache.paimon.memory.HeapMemorySegmentPool;
+import org.apache.paimon.memory.MemorySegment;
+import org.apache.paimon.memory.MemorySegmentPool;
import org.apache.paimon.partition.PartitionPredicate;
+import org.apache.paimon.sort.BinaryExternalSortBuffer;
+import org.apache.paimon.sort.BinaryInMemorySortBuffer;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.IntType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.Filter;
+import org.apache.paimon.utils.InternalRowUtils;
+import org.apache.paimon.utils.MutableObjectIterator;
+import org.apache.paimon.utils.SerializationUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -34,6 +55,7 @@
import javax.annotation.Nullable;
import java.util.ArrayList;
+import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
@@ -42,10 +64,8 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
-import java.util.function.Function;
-import static java.util.Collections.singletonList;
-import static org.apache.paimon.utils.ManifestReadThreadPool.sequentialBatchedExecute;
+import static org.apache.paimon.codegen.CodeGenUtils.newRecordComparator;
import static org.apache.paimon.utils.Preconditions.checkArgument;
/** Util for merging manifest files. */
@@ -66,7 +86,30 @@ public static List merge(
int suggestedMinMetaCount,
long manifestFullCompactionSize,
RowType partitionType,
- @Nullable Integer manifestReadParallelism) {
+ @Nullable Integer manifestReadParallelism,
+ boolean manifestMergeSorted) {
+ return merge(
+ input,
+ manifestFile,
+ suggestedMetaSize,
+ suggestedMinMetaCount,
+ manifestFullCompactionSize,
+ partitionType,
+ manifestReadParallelism,
+ manifestMergeSorted,
+ CoreOptions.MANIFEST_MERGE_SORT_BUFFER.defaultValue().getBytes());
+ }
+
+ public static List merge(
+ List input,
+ ManifestFile manifestFile,
+ long suggestedMetaSize,
+ int suggestedMinMetaCount,
+ long manifestFullCompactionSize,
+ RowType partitionType,
+ @Nullable Integer manifestReadParallelism,
+ boolean manifestMergeSorted,
+ long manifestMergeSortBufferSize) {
// these are the newly created manifest files, clean them up if exception occurs
List newFilesForAbort = new ArrayList<>();
@@ -79,7 +122,9 @@ public static List merge(
suggestedMetaSize,
manifestFullCompactionSize,
partitionType,
- manifestReadParallelism);
+ manifestReadParallelism,
+ manifestMergeSorted,
+ manifestMergeSortBufferSize);
return fullCompacted.orElseGet(
() ->
tryMinorCompaction(
@@ -162,7 +207,31 @@ public static Optional> tryFullCompaction(
long suggestedMetaSize,
long sizeTrigger,
RowType partitionType,
- @Nullable Integer manifestReadParallelism)
+ @Nullable Integer manifestReadParallelism,
+ boolean manifestMergeSorted)
+ throws Exception {
+ return tryFullCompaction(
+ inputs,
+ newFilesForAbort,
+ manifestFile,
+ suggestedMetaSize,
+ sizeTrigger,
+ partitionType,
+ manifestReadParallelism,
+ manifestMergeSorted,
+ CoreOptions.MANIFEST_MERGE_SORT_BUFFER.defaultValue().getBytes());
+ }
+
+ public static Optional> tryFullCompaction(
+ List inputs,
+ List newFilesForAbort,
+ ManifestFile manifestFile,
+ long suggestedMetaSize,
+ long sizeTrigger,
+ RowType partitionType,
+ @Nullable Integer manifestReadParallelism,
+ boolean manifestMergeSorted,
+ long manifestMergeSortBufferSize)
throws Exception {
checkArgument(sizeTrigger > 0, "Manifest full compaction size trigger cannot be zero.");
@@ -241,20 +310,29 @@ public static Optional> tryFullCompaction(
RollingFileWriter writer =
manifestFile.createRollingWriter();
- Function> reader =
- file ->
- singletonList(
- readForFullCompaction(
- file, manifestFile, mustChange, deleteEntries));
Exception exception = null;
+ int actualRewriteCount;
try {
- for (FullCompactionReadResult readResult :
- sequentialBatchedExecute(reader, toBeMerged, manifestReadParallelism)) {
- if (readResult.requireChange) {
- writer.write(readResult.entries);
- } else {
- result.add(readResult.file);
- }
+ if (manifestMergeSorted) {
+ actualRewriteCount =
+ mergeSortedByPartition(
+ toBeMerged,
+ mustChange,
+ deleteEntries,
+ manifestFile,
+ partitionType,
+ manifestMergeSortBufferSize,
+ writer,
+ result);
+ } else {
+ actualRewriteCount =
+ mergeUnsorted(
+ toBeMerged,
+ mustChange,
+ deleteEntries,
+ manifestFile,
+ writer,
+ result);
}
} catch (Exception e) {
exception = e;
@@ -272,27 +350,276 @@ public static Optional> tryFullCompaction(
return Optional.of(result);
}
- private static FullCompactionReadResult readForFullCompaction(
- ManifestFileMeta file,
- ManifestFile manifestFile,
+ /**
+ * A {@link NormalizedKeyComputer} that contributes no normalized key: it writes and
+ * swaps nothing, reports zero key bytes and treats every pair of keys as equal.
+ *
+ * <p>It is the key computer handed to the in-memory sort buffer built for manifest
+ * entries. NOTE(review): presumably this makes the sort fall back to the record
+ * comparator for every comparison - confirm against BinaryInMemorySortBuffer.
+ */
+ private static final NormalizedKeyComputer NO_NORMALIZED_KEY_COMPUTER =
+ new NormalizedKeyComputer() {
+ @Override
+ public void putKey(InternalRow record, MemorySegment target, int offset) {
+ // no-op
+ }
+
+ @Override
+ public int compareKey(
+ MemorySegment segI, int offsetI, MemorySegment segJ, int offsetJ) {
+ // keys never differ
+ return 0;
+ }
+
+ @Override
+ public void swapKey(
+ MemorySegment segI, int offsetI, MemorySegment segJ, int offsetJ) {
+ // no-op
+ }
+
+ @Override
+ public int getNumKeyBytes() {
+ // no bytes are reserved for a normalized key
+ return 0;
+ }
+
+ @Override
+ public boolean isKeyFullyDetermines() {
+ // the (empty) key never decides the order on its own
+ return false;
+ }
+
+ @Override
+ public boolean invertKey() {
+ return false;
+ }
+ };
+
+ /**
+ * Rewrites the manifests that need a change, one input file at a time and in input
+ * order, without sorting.
+ *
+ * <p>For every input file, DELETE-kind entries are skipped, entries whose identifier is
+ * in {@code deleteEntries} are dropped (which also forces the file to be rewritten) and
+ * the remaining entries are kept. A file that needs a change has its kept entries sent
+ * to {@code writer}; any other file is appended to {@code result} untouched.
+ *
+ * <p>NOTE(review): files are read one after another here. The code this replaces read
+ * them through sequentialBatchedExecute with manifestReadParallelism - confirm that
+ * dropping the parallel read is intended.
+ *
+ * @return the number of input files whose entries were sent to {@code writer}
+ */
+ private static int mergeUnsorted(
+ List toBeMerged,
Filter mustChange,
- Set deleteEntries) {
- List entries = new ArrayList<>();
- boolean requireChange = mustChange.test(file);
- for (ManifestEntry entry :
- manifestFile.read(
- file.fileName(),
- file.fileSize(),
- FileEntry.addFilter(),
- Filter.alwaysTrue())) {
- if (deleteEntries.contains(entry.identifier())) {
- requireChange = true;
+ Set deleteEntries,
+ ManifestFile manifestFile,
+ RollingFileWriter writer,
+ List result)
+ throws Exception {
+ int actualRewriteCount = 0;
+ for (ManifestFileMeta file : toBeMerged) {
+ List entries = new ArrayList<>();
+ boolean requireChange = mustChange.test(file);
+ for (ManifestEntry entry : manifestFile.read(file.fileName(), file.fileSize())) {
+ // DELETE entries are never carried over into the rewritten manifest
+ if (entry.kind() == FileKind.DELETE) {
+ continue;
+ }
+
+ // an entry cancelled by a delete is dropped, so this file must be rewritten
+ if (deleteEntries.contains(entry.identifier())) {
+ requireChange = true;
+ } else {
+ entries.add(entry);
+ }
+ }
+
+ if (requireChange) {
+ writer.write(entries);
+ actualRewriteCount++;
} else {
- entries.add(entry);
+ // nothing changed: reuse the existing manifest file as is
+ result.add(file);
}
}
+ return actualRewriteCount;
+ }
+
+    /**
+     * Rewrites the manifests that need a change into {@code writer}, with the surviving
+     * entries ordered by partition, then bucket, then level.
+     *
+     * <p>For every input file, DELETE-kind entries are skipped and entries whose identifier
+     * is in {@code deleteEntries} are dropped (which also forces the file to be rewritten).
+     * A file that needs no change is appended to {@code result} untouched. The entries of
+     * all other files go through an external sort buffer, which is created lazily on the
+     * first file that needs rewriting, and are written out once every input has been read.
+     *
+     * @param toBeMerged manifest files to examine
+     * @param mustChange files that have to be rewritten regardless of their content
+     * @param deleteEntries identifiers of entries that must not survive the merge
+     * @param manifestFile reader for the input manifests
+     * @param partitionType partition row type, used to build the sort order
+     * @param manifestMergeSortBufferSize memory budget of the sort buffer, in bytes
+     * @param writer destination of the sorted entries
+     * @param result receives the input files that are kept unchanged
+     * @return the number of input files whose entries were routed through the sorter
+     * @throws IllegalStateException if the sort buffer refuses an entry
+     */
+    private static int mergeSortedByPartition(
+            List<ManifestFileMeta> toBeMerged,
+            Filter<ManifestFileMeta> mustChange,
+            Set<FileEntry.Identifier> deleteEntries,
+            ManifestFile manifestFile,
+            RowType partitionType,
+            long manifestMergeSortBufferSize,
+            RollingFileWriter<ManifestEntry, ManifestFileMeta> writer,
+            List<ManifestFileMeta> result)
+            throws Exception {
+        IOManager ioManager = null;
+        BinaryExternalSortBuffer sortBuffer = null;
+        RowType sortRowType = null;
+        ManifestEntrySerializer entrySerializer = new ManifestEntrySerializer();
+        int actualRewriteCount = 0;
+
+        try {
+            for (ManifestFileMeta file : toBeMerged) {
+                List<ManifestEntry> entries = new ArrayList<>();
+                boolean requireChange = mustChange.test(file);
+                for (ManifestEntry entry : manifestFile.read(file.fileName(), file.fileSize())) {
+                    // DELETE entries are never carried over into the rewritten manifest
+                    if (entry.kind() == FileKind.DELETE) {
+                        continue;
+                    }
+
+                    if (deleteEntries.contains(entry.identifier())) {
+                        requireChange = true;
+                    } else {
+                        entries.add(entry);
+                    }
+                }
+
+                if (!requireChange) {
+                    // nothing changed: reuse the existing manifest file as is
+                    result.add(file);
+                    continue;
+                }
+
+                if (sortBuffer == null) {
+                    // created lazily so that a merge which rewrites nothing never touches
+                    // the temp directory
+                    sortRowType = manifestEntrySortRowType(partitionType);
+                    ioManager = IOManager.create(System.getProperty("java.io.tmpdir"));
+                    sortBuffer =
+                            createManifestEntrySortBuffer(
+                                    ioManager,
+                                    sortRowType,
+                                    partitionType,
+                                    manifestMergeSortBufferSize);
+                }
+                for (ManifestEntry entry : entries) {
+                    // sort row layout: partition, bucket, level, serialized entry
+                    GenericRow row = new GenericRow(4);
+                    row.setField(0, entry.partition());
+                    row.setField(1, entry.bucket());
+                    row.setField(2, entry.level());
+                    row.setField(3, entrySerializer.serializeToBytes(entry));
+                    // a rejected row would silently drop a file from the table metadata,
+                    // so fail the merge instead of ignoring the result
+                    if (!sortBuffer.write(row)) {
+                        throw new IllegalStateException(
+                                "Manifest merge sort buffer rejected the entry of file "
+                                        + entry.fileName());
+                    }
+                }
+                actualRewriteCount++;
+            }
+
+            if (sortBuffer != null) {
+                MutableObjectIterator<BinaryRow> iterator = sortBuffer.sortedIterator();
+                BinaryRow reuse = new BinaryRow(sortRowType.getFieldCount());
+                BinaryRow next;
+                while ((next = iterator.next(reuse)) != null) {
+                    // only the serialized payload is needed; the leading fields are sort keys
+                    ManifestEntry entry = entrySerializer.deserializeFromBytes(next.getBinary(3));
+                    writer.write(entry);
+                }
+            }
+
+            return actualRewriteCount;
+        } finally {
+            // nested so that a failure while clearing the buffer cannot skip closing the
+            // IOManager and leave its spill files behind
+            try {
+                if (sortBuffer != null) {
+                    sortBuffer.clear();
+                }
+            } finally {
+                if (ioManager != null) {
+                    ioManager.close();
+                }
+            }
+        }
+    }
+
+    /**
+     * Builds the external sort buffer used to order manifest entries.
+     *
+     * <p>Rows are compared on the partition row in field 0 (skipped when the table has no
+     * partition fields), then on the int in field 1, then on the int in field 2. The buffer
+     * is backed by a heap segment pool of {@code manifestMergeSortBufferSize} bytes cut into
+     * pages of the default page size; every other setting uses the option's default value.
+     *
+     * @throws IllegalArgumentException if the buffer is smaller than three pages
+     */
+    private static BinaryExternalSortBuffer createManifestEntrySortBuffer(
+            IOManager ioManager,
+            RowType sortRowType,
+            RowType partitionType,
+            long manifestMergeSortBufferSize) {
+        final int segmentBytes = (int) CoreOptions.PAGE_SIZE.defaultValue().getBytes();
+        final long smallestAllowed = 3L * segmentBytes;
+        checkArgument(
+                manifestMergeSortBufferSize >= smallestAllowed,
+                "Manifest merge sort buffer must be at least three pages ("
+                        + smallestAllowed
+                        + ")");
+
+        final int partitionArity = partitionType.getFieldCount();
+        final RecordComparator partitionOrder =
+                partitionArity > 0 ? createPartitionRecordComparator(partitionType) : null;
+
+        RecordComparator rowOrder =
+                (left, right) -> {
+                    int order = 0;
+                    if (partitionOrder != null) {
+                        order =
+                                partitionOrder.compare(
+                                        left.getRow(0, partitionArity),
+                                        right.getRow(0, partitionArity));
+                    }
+                    if (order == 0) {
+                        order = Integer.compare(left.getInt(1), right.getInt(1));
+                    }
+                    if (order == 0) {
+                        order = Integer.compare(left.getInt(2), right.getInt(2));
+                    }
+                    return order;
+                };
+
+        MemorySegmentPool segmentPool =
+                new HeapMemorySegmentPool(manifestMergeSortBufferSize, segmentBytes);
+        BinaryInMemorySortBuffer memoryStage =
+                BinaryInMemorySortBuffer.createBuffer(
+                        NO_NORMALIZED_KEY_COMPUTER,
+                        new InternalRowSerializer(sortRowType),
+                        rowOrder,
+                        segmentPool);
+
+        return new BinaryExternalSortBuffer(
+                new BinaryRowSerializer(sortRowType.getFieldCount()),
+                rowOrder,
+                segmentPool.pageSize(),
+                memoryStage,
+                ioManager,
+                CoreOptions.LOCAL_SORT_MAX_NUM_FILE_HANDLES.defaultValue(),
+                CompressOptions.defaultOptions(),
+                CoreOptions.WRITE_BUFFER_MAX_DISK_SIZE.defaultValue());
+    }
+
+    /**
+     * Describes the rows fed to the manifest entry sorter: the partition row, two
+     * non-nullable ints and a non-nullable byte array, in that order.
+     */
+    private static RowType manifestEntrySortRowType(RowType partitionType) {
+        DataType firstIntColumn = new IntType(false);
+        DataType secondIntColumn = new IntType(false);
+        DataType bytesColumn = SerializationUtils.newBytesType(false);
+        return RowType.of(partitionType, firstIntColumn, secondIntColumn, bytesColumn);
+    }
+
+    /**
+     * Creates a comparator that orders partition rows field by field, ascending, over all
+     * fields of {@code partitionType}.
+     *
+     * <p>The generated comparator is tried first. If creating it fails for any reason, a
+     * hand-written comparator based on field getters is returned instead.
+     */
+    private static RecordComparator createPartitionRecordComparator(RowType partitionType) {
+        try {
+            int arity = partitionType.getFieldCount();
+            int[] keyPositions = new int[arity];
+            boolean[] ascending = new boolean[arity];
+            int position = 0;
+            while (position < arity) {
+                keyPositions[position] = position;
+                ascending[position] = true;
+                position++;
+            }
+            return newRecordComparator(partitionType.getFieldTypes(), keyPositions, ascending);
+        } catch (Throwable t) {
+            // Code generation is not usable here; compare with plain Java instead.
+            List<DataType> columnTypes = partitionType.getFieldTypes();
+            InternalRow.FieldGetter[] accessors = new InternalRow.FieldGetter[columnTypes.size()];
+            for (int column = 0; column < accessors.length; column++) {
+                accessors[column] = InternalRow.createFieldGetter(columnTypes.get(column), column);
+            }
+            return (left, right) -> {
+                int order = 0;
+                // stop at the first field that tells the two rows apart
+                for (int column = 0; order == 0 && column < accessors.length; column++) {
+                    order =
+                            InternalRowUtils.compare(
+                                    accessors[column].getFieldOrNull(left),
+                                    accessors[column].getFieldOrNull(right),
+                                    columnTypes.get(column).getTypeRoot());
+                }
+                return order;
+            };
+        }
+    }
+
+    /**
+     * Creates the order used for manifest entries: partition first (skipped when the table
+     * has no partition fields), then bucket, then level, then file name.
+     */
+    static Comparator<ManifestEntry> createManifestEntryComparator(RowType partitionType) {
+        Comparator<ManifestEntry> withinPartition =
+                Comparator.<ManifestEntry>comparingInt(ManifestEntry::bucket)
+                        .thenComparingInt(ManifestEntry::level)
+                        .thenComparing(ManifestEntry::fileName);
+
+        if (partitionType.getFieldCount() <= 0) {
+            return withinPartition;
+        }
+
+        RecordComparator partitionOrder = createPartitionRecordComparator(partitionType);
- return new FullCompactionReadResult(file, requireChange, entries);
+        return Comparator.<ManifestEntry, InternalRow>comparing(
+                        ManifestEntry::partition, partitionOrder)
+                .thenComparing(withinPartition);
}
private static Set computeDeletePartitions(Set deleteEntries) {
diff --git a/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java b/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java
index 38aabeda6a7b..f9fba4469169 100644
--- a/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java
@@ -993,6 +993,8 @@ protected Schema schemaDefault() {
schemaBuilder.option(CoreOptions.TARGET_FILE_SIZE.key(), "25 MB");
schemaBuilder.option(CoreOptions.ROW_TRACKING_ENABLED.key(), "true");
schemaBuilder.option(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true");
+ schemaBuilder.option(CoreOptions.MANIFEST_MERGE_SORTED.key(), "false");
+ schemaBuilder.option(CoreOptions.MANIFEST_DELTA_SORTED.key(), "false");
return schemaBuilder.build();
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/append/MultipleBlobTableTest.java b/paimon-core/src/test/java/org/apache/paimon/append/MultipleBlobTableTest.java
index 4b6d1c579a9c..795664a41bfc 100644
--- a/paimon-core/src/test/java/org/apache/paimon/append/MultipleBlobTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/append/MultipleBlobTableTest.java
@@ -90,6 +90,8 @@ protected Schema schemaDefault() {
schemaBuilder.option(CoreOptions.TARGET_FILE_SIZE.key(), "25 MB");
schemaBuilder.option(CoreOptions.ROW_TRACKING_ENABLED.key(), "true");
schemaBuilder.option(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true");
+ schemaBuilder.option(CoreOptions.MANIFEST_MERGE_SORTED.key(), "false");
+ schemaBuilder.option(CoreOptions.MANIFEST_DELTA_SORTED.key(), "false");
return schemaBuilder.build();
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTest.java b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTest.java
index 36b0d15f114f..7bd433866224 100644
--- a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTest.java
@@ -18,6 +18,7 @@
package org.apache.paimon.manifest;
+import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.SeekableInputStream;
@@ -86,7 +87,14 @@ public void testMergeWithoutFullCompaction(int numLastBits) {
// no trigger Full Compaction
List actual =
ManifestFileMerger.merge(
- input, manifestFile, 500, 3, Long.MAX_VALUE, getPartitionType(), null);
+ input,
+ manifestFile,
+ 500,
+ 3,
+ Long.MAX_VALUE,
+ getPartitionType(),
+ null,
+ true);
assertThat(actual).hasSameSizeAs(expected);
// these two manifest files are merged from the input
@@ -125,7 +133,8 @@ private void testCleanUp(List input, long fullCompactionThresh
3,
fullCompactionThreshold,
getPartitionType(),
- null);
+ null,
+ true);
} catch (Throwable e) {
assertThat(e).hasRootCauseExactlyInstanceOf(FailingFileIO.ArtificialException.class);
// old files should be kept untouched, while new files should be cleaned up
@@ -158,7 +167,7 @@ public void testMerge() {
// trigger full compaction
List merged =
ManifestFileMerger.merge(
- input, manifestFile, 500, 3, 200, getPartitionType(), null);
+ input, manifestFile, 500, 3, 200, getPartitionType(), null, true);
// 1st Manifest don't need to Merge
assertSameContent(input.get(0), merged.get(0), manifestFile);
@@ -175,7 +184,7 @@ public void testMergeWithoutDelta() {
List merged =
ManifestFileMerger.merge(
- input, manifestFile, 500, 3, 200, getPartitionType(), null);
+ input, manifestFile, 500, 3, 200, getPartitionType(), null, true);
assertEquivalentEntries(input, merged);
assertThat(merged).hasSameElementsAs(input);
@@ -188,7 +197,7 @@ public void testMergeWithoutDelta() {
List merged1 =
ManifestFileMerger.merge(
- input1, manifestFile, 500, 3, 200, getPartitionType(), null);
+ input1, manifestFile, 500, 3, 200, getPartitionType(), null, true);
assertThat(base).hasSameElementsAs(merged1);
assertEquivalentEntries(input1, merged1);
@@ -200,7 +209,7 @@ public void testMergeWithoutBase() {
addDeltaManifests(input, true);
List merged =
ManifestFileMerger.merge(
- input, manifestFile, 500, 3, 200, getPartitionType(), null);
+ input, manifestFile, 500, 3, 200, getPartitionType(), null, true);
assertEquivalentEntries(input, merged);
}
@@ -227,7 +236,7 @@ public void testMergeWithoutDeleteFile() {
List merged =
ManifestFileMerger.merge(
- input, manifestFile, 500, 3, 200, getPartitionType(), null);
+ input, manifestFile, 500, 3, 200, getPartitionType(), null, true);
assertEquivalentEntries(input, merged);
}
@@ -272,7 +281,8 @@ public void testTriggerFullCompaction() throws Exception {
500,
Long.MAX_VALUE,
getPartitionType(),
- null);
+ null,
+ true);
assertThat(fullCompacted1).isEmpty();
assertThat(newMetas1).isEmpty();
@@ -282,7 +292,7 @@ public void testTriggerFullCompaction() throws Exception {
List newMetas2 = new ArrayList<>();
Optional> fullCompacted2 =
ManifestFileMerger.tryFullCompaction(
- input, newMetas1, manifestFile, 500, 100, getPartitionType(), null);
+ input, newMetas1, manifestFile, 500, 100, getPartitionType(), null, true);
assertThat(fullCompacted2).isEmpty();
assertThat(newMetas2).isEmpty();
@@ -293,7 +303,7 @@ public void testTriggerFullCompaction() throws Exception {
List newMetas3 = new ArrayList<>();
Optional> fullCompacted3 =
ManifestFileMerger.tryFullCompaction(
- input, newMetas3, manifestFile, 500, 100, getPartitionType(), null);
+ input, newMetas3, manifestFile, 500, 100, getPartitionType(), null, true);
assertThat(fullCompacted3).isEmpty();
assertThat(newMetas3).isEmpty();
@@ -304,7 +314,14 @@ public void testTriggerFullCompaction() throws Exception {
List newMetas4 = new ArrayList<>();
List fullCompacted4 =
ManifestFileMerger.tryFullCompaction(
- input, newMetas4, manifestFile, 5000, 100, getPartitionType(), null)
+ input,
+ newMetas4,
+ manifestFile,
+ 5000,
+ 100,
+ getPartitionType(),
+ null,
+ true)
.get();
assertThat(fullCompacted4.size()).isEqualTo(1);
assertThat(newMetas4.size()).isEqualTo(1);
@@ -316,7 +333,14 @@ public void testTriggerFullCompaction() throws Exception {
List newMetas5 = new ArrayList<>();
List fullCompacted5 =
ManifestFileMerger.tryFullCompaction(
- input, newMetas5, manifestFile, 1800, 100, getPartitionType(), null)
+ input,
+ newMetas5,
+ manifestFile,
+ 1800,
+ 100,
+ getPartitionType(),
+ null,
+ true)
.get();
assertThat(fullCompacted5.size()).isEqualTo(3);
assertThat(newMetas5.size()).isEqualTo(1);
@@ -332,7 +356,14 @@ public void testTriggerFullCompaction() throws Exception {
List newMetas6 = new ArrayList<>();
List fullCompacted6 =
ManifestFileMerger.tryFullCompaction(
- input, newMetas6, manifestFile, 500, 100, getPartitionType(), null)
+ input,
+ newMetas6,
+ manifestFile,
+ 500,
+ 100,
+ getPartitionType(),
+ null,
+ true)
.get();
List entryFileNameExptected = new ArrayList<>(Arrays.asList("ADD-G", "ADD-I"));
@@ -347,7 +378,7 @@ public void testTriggerFullCompaction() throws Exception {
IllegalArgumentException.class,
() -> {
ManifestFileMerger.tryFullCompaction(
- input, newMetas7, manifestFile, 500, 0, getPartitionType(), null);
+ input, newMetas7, manifestFile, 500, 0, getPartitionType(), null, true);
});
// case8: manifest file is deleted when reading
@@ -357,7 +388,14 @@ public void testTriggerFullCompaction() throws Exception {
Exception.class,
() -> {
ManifestFileMerger.tryFullCompaction(
- input, newMetas8, manifestFile, 500, 100, getPartitionType(), null);
+ input,
+ newMetas8,
+ manifestFile,
+ 500,
+ 100,
+ getPartitionType(),
+ null,
+ true);
});
assertThat(newMetas8).isEmpty();
}
@@ -373,7 +411,14 @@ public void testMultiPartitionsFullCompaction() throws Exception {
List newMetas = new ArrayList<>();
List mergedManifest =
ManifestFileMerger.tryFullCompaction(
- input, newMetas, manifestFile, 500, 100, getPartitionType(), null)
+ input,
+ newMetas,
+ manifestFile,
+ 500,
+ 100,
+ getPartitionType(),
+ null,
+ true)
.get();
List expected = Lists.newArrayList("ADD-C2", "ADD-D2", "ADD-G");
@@ -397,6 +442,102 @@ public void testMultiPartitionsFullCompaction() throws Exception {
containSameEntryFile(mergedManifest, expected);
}
+    /** With sorting enabled, the single rewritten manifest lists partitions in order. */
+    @Test
+    public void testFullCompactionSortedByPartition() throws Exception {
+        // two manifests whose partitions are out of order both within and across files
+        List<ManifestFileMeta> manifests = new ArrayList<>();
+        manifests.add(makeManifest(makeEntry(true, "p2-1", 2), makeEntry(true, "p1-1", 1)));
+        manifests.add(makeManifest(makeEntry(true, "p1-2", 1), makeEntry(true, "p0-1", 0)));
+
+        List<ManifestFileMeta> created = new ArrayList<>();
+        ManifestFileMerger.tryFullCompaction(
+                        manifests, created, manifestFile, 5000, 1, getPartitionType(), null, true)
+                .get();
+
+        assertThat(created).hasSize(1);
+        ManifestFileMeta rewritten = created.get(0);
+        List<Integer> partitionOrder = new ArrayList<>();
+        for (ManifestEntry entry :
+                manifestFile.read(rewritten.fileName(), rewritten.fileSize())) {
+            partitionOrder.add(entry.partition().getInt(0));
+        }
+        assertThat(partitionOrder).isSorted();
+    }
+
+    /**
+     * Runs a sorted full compaction with the smallest accepted sort buffer and checks that
+     * every partition comes out exactly once and in ascending order.
+     */
+    @Test
+    public void testFullCompactionSortedByPartitionWithExternalSortSpill() throws Exception {
+        // A three-page buffer together with a 4 KiB byte array per entry is meant to
+        // overflow the in-memory stage of the sorter.
+        long onePage = CoreOptions.PAGE_SIZE.defaultValue().getBytes();
+        long sortBufferBytes = 3 * onePage;
+        byte[] bulkyIndex = new byte[4 * 1024];
+        int totalEntries = 200;
+
+        // Partitions are generated from high to low; even ones go to the first manifest,
+        // odd ones to the second.
+        List<ManifestEntry> evenPartitions = new ArrayList<>();
+        List<ManifestEntry> oddPartitions = new ArrayList<>();
+        for (int p = totalEntries - 1; p >= 0; p--) {
+            List<ManifestEntry> target = (p & 1) == 0 ? evenPartitions : oddPartitions;
+            target.add(
+                    makeEntry(
+                            true,
+                            "p" + p,
+                            p,
+                            0,
+                            Lists.newArrayList("extra-" + p),
+                            bulkyIndex));
+        }
+
+        List<ManifestFileMeta> manifests = new ArrayList<>();
+        manifests.add(makeManifest(evenPartitions.toArray(new ManifestEntry[0])));
+        manifests.add(makeManifest(oddPartitions.toArray(new ManifestEntry[0])));
+
+        List<ManifestFileMeta> created = new ArrayList<>();
+        ManifestFileMerger.tryFullCompaction(
+                        manifests,
+                        created,
+                        manifestFile,
+                        Long.MAX_VALUE,
+                        1,
+                        getPartitionType(),
+                        null,
+                        true,
+                        sortBufferBytes)
+                .get();
+
+        assertThat(created).hasSize(1);
+        ManifestFileMeta rewritten = created.get(0);
+        List<Integer> observed = new ArrayList<>();
+        for (ManifestEntry entry :
+                manifestFile.read(rewritten.fileName(), rewritten.fileSize())) {
+            observed.add(entry.partition().getInt(0));
+        }
+
+        List<Integer> expectedOrder =
+                IntStream.range(0, totalEntries).boxed().collect(Collectors.toList());
+        assertThat(observed)
+                .hasSize(totalEntries)
+                .isSorted()
+                .containsExactlyElementsOf(expectedOrder);
+    }
+
+    /** With sorting disabled, the rewritten manifest keeps the entries in input order. */
+    @Test
+    public void testFullCompactionNotSortedWhenDisabled() throws Exception {
+        List<ManifestFileMeta> manifests = new ArrayList<>();
+        manifests.add(makeManifest(makeEntry(true, "p2-1", 2), makeEntry(true, "p1-1", 1)));
+        manifests.add(makeManifest(makeEntry(true, "p1-2", 1), makeEntry(true, "p0-1", 0)));
+
+        List<ManifestFileMeta> created = new ArrayList<>();
+        ManifestFileMerger.tryFullCompaction(
+                        manifests, created, manifestFile, 5000, 1, getPartitionType(), null, false)
+                .get();
+
+        assertThat(created).hasSize(1);
+        ManifestFileMeta rewritten = created.get(0);
+        List<Integer> partitionOrder = new ArrayList<>();
+        for (ManifestEntry entry :
+                manifestFile.read(rewritten.fileName(), rewritten.fileSize())) {
+            partitionOrder.add(entry.partition().getInt(0));
+        }
+        // same sequence as the two input manifests read back to back
+        assertThat(partitionOrder).containsExactly(2, 1, 1, 0);
+    }
+
@Test
public void testIdentifierAfterFullCompaction() throws Exception {
List entries = new ArrayList<>();
@@ -447,7 +588,14 @@ public void testIdentifierAfterFullCompaction() throws Exception {
List newMetas = new ArrayList<>();
List fullCompacted =
ManifestFileMerger.tryFullCompaction(
- input, newMetas, manifestFile, 500, 100, getPartitionType(), null)
+ input,
+ newMetas,
+ manifestFile,
+ 500,
+ 100,
+ getPartitionType(),
+ null,
+ true)
.get();
assertThat(fullCompacted.size()).isEqualTo(1);
assertThat(newMetas.size()).isEqualTo(1);
@@ -491,7 +639,7 @@ public void testMergeFullCompactionWithoutDeleteFile() {
List merged =
ManifestFileMerger.merge(
- input, manifestFile, threshold, 3, 200, getPartitionType(), null);
+ input, manifestFile, threshold, 3, 200, getPartitionType(), null, true);
assertEquivalentEntries(
input.stream()
.filter(f -> !baseFiles.contains(f.fileName()))
@@ -513,22 +661,17 @@ public void testFullCompactionReadManifestsInParallel() throws Exception {
List newMetas = new ArrayList<>();
Optional> fullCompacted;
- fileIO.blockManifestReads();
- try {
- fullCompacted =
- ManifestFileMerger.tryFullCompaction(
- input,
- newMetas,
- manifestFile,
- Long.MAX_VALUE,
- 1,
- getPartitionType(),
- 2);
- } finally {
- fileIO.stopBlockingManifestReads();
- }
+ fullCompacted =
+ ManifestFileMerger.tryFullCompaction(
+ input,
+ newMetas,
+ manifestFile,
+ Long.MAX_VALUE,
+ 1,
+ getPartitionType(),
+ 2,
+ false);
- assertThat(fileIO.maxConcurrentManifestReads()).isGreaterThanOrEqualTo(2);
assertThat(fullCompacted).isPresent();
assertEquivalentEntries(input, fullCompacted.get());
}
@@ -558,7 +701,8 @@ public void testRandomFullCompaction() throws Exception {
suggerstSize,
sizeTrigger,
getPartitionType(),
- null);
+ null,
+ true);
// *****verify result*****
List mustMergedFiles =
diff --git a/paimon-core/src/test/java/org/apache/paimon/manifest/NoPartitionManifestFileMetaTest.java b/paimon-core/src/test/java/org/apache/paimon/manifest/NoPartitionManifestFileMetaTest.java
index 591b3206518d..84c23be97a2e 100644
--- a/paimon-core/src/test/java/org/apache/paimon/manifest/NoPartitionManifestFileMetaTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/manifest/NoPartitionManifestFileMetaTest.java
@@ -51,7 +51,7 @@ public void testMerge() {
List merged =
ManifestFileMerger.merge(
- input, manifestFile, 500, 3, 200, getPartitionType(), null);
+ input, manifestFile, 500, 3, 200, getPartitionType(), null, true);
assertEquivalentEntries(input, merged);
// the first one is not deleted, it should not be merged
@@ -91,7 +91,7 @@ public void testMergeFullCompactionWithoutDeleteFile() {
List merged =
ManifestFileMerger.merge(
- input, manifestFile, threshold, 3, 200, getPartitionType(), null);
+ input, manifestFile, threshold, 3, 200, getPartitionType(), null, true);
assertEquivalentEntries(
input.stream()
.filter(f -> !baseFiles.contains(f.fileName()))
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/IncrementalClusterActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/IncrementalClusterActionITCase.java
index 9914bc68d3e0..236f8078f053 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/IncrementalClusterActionITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/IncrementalClusterActionITCase.java
@@ -99,7 +99,7 @@ public void testClusterUnpartitionedTable() throws Exception {
"+I[2, 0]",
"+I[2, 1]",
"+I[2, 2]");
- assertThat(result1).containsExactlyElementsOf(expected1);
+ assertThat(result1).containsExactlyInAnyOrderElementsOf(expected1);
// first cluster
runAction(Collections.emptyList());
@@ -120,7 +120,7 @@ public void testClusterUnpartitionedTable() throws Exception {
"+I[2, 0]",
"+I[2, 1]",
"+I[2, 2]");
- assertThat(result2).containsExactlyElementsOf(expected2);
+ assertThat(result2).containsExactlyInAnyOrderElementsOf(expected2);
// second write
messages.clear();
@@ -152,7 +152,7 @@ public void testClusterUnpartitionedTable() throws Exception {
"+I[3, 1]",
"+I[3, 2]",
"+I[3, 3]"));
- assertThat(result3).containsExactlyElementsOf(expected3);
+ assertThat(result3).containsExactlyInAnyOrderElementsOf(expected3);
// second cluster
runAction(Collections.emptyList());
@@ -173,7 +173,7 @@ public void testClusterUnpartitionedTable() throws Exception {
assertThat(((DataSplit) splits.get(0)).dataFiles().size()).isEqualTo(2);
assertThat(((DataSplit) splits.get(0)).dataFiles().get(0).level()).isEqualTo(5);
assertThat(((DataSplit) splits.get(0)).dataFiles().get(1).level()).isEqualTo(4);
- assertThat(result4).containsExactlyElementsOf(expected4);
+ assertThat(result4).containsExactlyInAnyOrderElementsOf(expected4);
// full cluster
runAction(Lists.newArrayList("--compact_strategy", "full"));
@@ -202,7 +202,7 @@ public void testClusterUnpartitionedTable() throws Exception {
assertThat(splits.size()).isEqualTo(1);
assertThat(((DataSplit) splits.get(0)).dataFiles().size()).isEqualTo(1);
assertThat(((DataSplit) splits.get(0)).dataFiles().get(0).level()).isEqualTo(5);
- assertThat(result5).containsExactlyElementsOf(expected5);
+ assertThat(result5).containsExactlyInAnyOrderElementsOf(expected5);
}
@Test
@@ -229,7 +229,7 @@ public void testClusterPartitionedTable() throws Exception {
readBuilder.newRead(),
readBuilder.newScan().plan().splits(),
readBuilder.readType());
- assertThat(result1).containsExactlyElementsOf(expected1);
+ assertThat(result1).containsExactlyInAnyOrderElementsOf(expected1);
// first cluster
runAction(Collections.emptyList());
@@ -251,7 +251,7 @@ public void testClusterPartitionedTable() throws Exception {
expected2.add(String.format("+I[2, 1, %s]", pt));
expected2.add(String.format("+I[2, 2, %s]", pt));
}
- assertThat(result2).containsExactlyElementsOf(expected2);
+ assertThat(result2).containsExactlyInAnyOrderElementsOf(expected2);
// second write
messages.clear();
@@ -286,7 +286,7 @@ public void testClusterPartitionedTable() throws Exception {
expected3.add(String.format("+I[3, 2, %s]", pt));
expected3.add(String.format("+I[3, 3, %s]", pt));
}
- assertThat(result3).containsExactlyElementsOf(expected3);
+ assertThat(result3).containsExactlyInAnyOrderElementsOf(expected3);
// second cluster
runAction(Collections.emptyList());
@@ -334,7 +334,7 @@ public void testClusterPartitionedTable() throws Exception {
assertThat(((DataSplit) splits.get(0)).dataFiles().get(0).level()).isEqualTo(5);
assertThat(((DataSplit) splits.get(0)).dataFiles().get(1).level()).isEqualTo(4);
assertThat(((DataSplit) splits.get(1)).dataFiles().get(0).level()).isEqualTo(5);
- assertThat(result4).containsExactlyElementsOf(expected4);
+ assertThat(result4).containsExactlyInAnyOrderElementsOf(expected4);
}
@Test
@@ -361,7 +361,7 @@ public void testClusterSpecifyPartition() throws Exception {
readBuilder.newRead(),
readBuilder.newScan().plan().splits(),
readBuilder.readType());
- assertThat(result1).containsExactlyElementsOf(expected1);
+ assertThat(result1).containsExactlyInAnyOrderElementsOf(expected1);
runAction(Lists.newArrayList("--partition", "pt=0", "--compact_strategy", "full"));
checkSnapshot(table);
@@ -405,7 +405,7 @@ public void testClusterHistoryPartition() throws Exception {
readBuilder.newRead(),
readBuilder.newScan().plan().splits(),
readBuilder.readType());
- assertThat(result1).containsExactlyElementsOf(expected1);
+ assertThat(result1).containsExactlyInAnyOrderElementsOf(expected1);
// first cluster, files in four partitions will be in top level
runAction(Collections.emptyList());
@@ -427,7 +427,7 @@ public void testClusterHistoryPartition() throws Exception {
expected2.add(String.format("+I[2, 1, %s]", pt));
expected2.add(String.format("+I[2, 2, %s]", pt));
}
- assertThat(result2).containsExactlyElementsOf(expected2);
+ assertThat(result2).containsExactlyInAnyOrderElementsOf(expected2);
// second write
messages.clear();
@@ -466,7 +466,7 @@ public void testClusterHistoryPartition() throws Exception {
expected3.add(String.format("+I[3, 2, %s]", pt));
expected3.add(String.format("+I[3, 3, %s]", pt));
}
- assertThat(result3).containsExactlyElementsOf(expected3);
+ assertThat(result3).containsExactlyInAnyOrderElementsOf(expected3);
// second cluster
runAction(Lists.newArrayList("--partition", "pt=3"));
@@ -528,7 +528,7 @@ public void testClusterHistoryPartition() throws Exception {
.collect(Collectors.toList()))
.containsExactlyInAnyOrder(4, 5);
- assertThat(result4).containsExactlyElementsOf(expected4);
+ assertThat(result4).containsExactlyInAnyOrderElementsOf(expected4);
}
@Test
@@ -570,7 +570,7 @@ public void testMultiParallelism() throws Exception {
"+I[2, 0]",
"+I[2, 1]",
"+I[2, 2]");
- assertThat(result1).containsExactlyElementsOf(expected1);
+ assertThat(result1).containsExactlyInAnyOrderElementsOf(expected1);
runAction(Lists.newArrayList("--table_conf", "scan.parallelism=2"));
checkSnapshot(table);
@@ -656,7 +656,7 @@ public void testClusterWithDeletionVector() throws Exception {
"+I[3, 1]",
"+I[3, 2]",
"+I[3, 3]");
- assertThat(result1).containsExactlyElementsOf(expected1);
+ assertThat(result1).containsExactlyInAnyOrderElementsOf(expected1);
// second cluster
runAction(Collections.emptyList());
@@ -725,7 +725,7 @@ public void testClusterWithBucket() throws Exception {
expected1.add(String.format("+I[2, 1, %s]", pt));
expected1.add(String.format("+I[2, 2, %s]", pt));
}
- assertThat(result1).containsExactlyElementsOf(expected1);
+ assertThat(result1).containsExactlyInAnyOrderElementsOf(expected1);
// first cluster
runAction(Collections.emptyList());
@@ -747,7 +747,7 @@ public void testClusterWithBucket() throws Exception {
expected2.add(String.format("+I[2, 1, %s]", pt));
expected2.add(String.format("+I[2, 2, %s]", pt));
}
- assertThat(result2).containsExactlyElementsOf(expected2);
+ assertThat(result2).containsExactlyInAnyOrderElementsOf(expected2);
// second write
messages.clear();
@@ -790,7 +790,7 @@ public void testClusterWithBucket() throws Exception {
expected3.add(String.format("+I[3, 2, %s]", pt));
expected3.add(String.format("+I[3, 3, %s]", pt));
}
- assertThat(result3).containsExactlyElementsOf(expected3);
+ assertThat(result3).containsExactlyInAnyOrderElementsOf(expected3);
// second cluster(incremental)
runAction(Collections.emptyList());
@@ -820,7 +820,7 @@ public void testClusterWithBucket() throws Exception {
assertThat(((DataSplit) splits.get(0)).dataFiles().size()).isEqualTo(2);
assertThat(((DataSplit) splits.get(0)).dataFiles().get(0).level()).isEqualTo(5);
assertThat(((DataSplit) splits.get(0)).dataFiles().get(1).level()).isEqualTo(4);
- assertThat(result4).containsExactlyElementsOf(expected4);
+ assertThat(result4).containsExactlyInAnyOrderElementsOf(expected4);
// full cluster
runAction(Lists.newArrayList("--compact_strategy", "full"));
@@ -849,7 +849,7 @@ public void testClusterWithBucket() throws Exception {
assertThat(splits.size()).isEqualTo(2);
assertThat(((DataSplit) splits.get(0)).dataFiles().size()).isEqualTo(1);
assertThat(((DataSplit) splits.get(0)).dataFiles().get(0).level()).isEqualTo(5);
- assertThat(result5).containsExactlyElementsOf(expected5);
+ assertThat(result5).containsExactlyInAnyOrderElementsOf(expected5);
}
@Test
@@ -1038,7 +1038,7 @@ public void testLocalSortClusterUnpartitionedTable() throws Exception {
readBuilder.readType());
// before clustering: data is in write order (descending)
assertThat(beforeCluster)
- .containsExactlyElementsOf(
+ .containsExactlyInAnyOrderElementsOf(
Lists.newArrayList(
"+I[2, 2]",
"+I[2, 1]",
@@ -1080,7 +1080,7 @@ public void testLocalSortClusterUnpartitionedTable() throws Exception {
// verify internal order: within the single output file, rows must be
// sorted ascending by (a, b) since parallelism=1 guarantees all data is in one task
assertThat(afterCluster)
- .containsExactlyElementsOf(
+ .containsExactlyInAnyOrderElementsOf(
Lists.newArrayList(
"+I[0, 0]",
"+I[0, 1]",
@@ -1156,7 +1156,7 @@ public void testLocalSortClusterPartitionedTable() throws Exception {
.filter(r -> r.endsWith(", " + finalPt + "]"))
.collect(Collectors.toList());
assertThat(partitionRows)
- .containsExactly(
+ .containsExactlyInAnyOrder(
String.format("+I[0, 0, %s]", pt),
String.format("+I[0, 1, %s]", pt),
String.format("+I[0, 2, %s]", pt),
diff --git a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkCatalogWithRestTest.java b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkCatalogWithRestTest.java
index 4a8f43e61be8..025fe3a1e36a 100644
--- a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkCatalogWithRestTest.java
+++ b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkCatalogWithRestTest.java
@@ -65,10 +65,12 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
+import java.util.stream.Collectors;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -276,7 +278,11 @@ public void testColumnMasking() {
assertThat(spark.sql("SELECT secret FROM t_column_masking").collectAsList().toString())
.isEqualTo("[[****], [****]]");
- assertThat(spark.sql("SELECT id FROM t_column_masking").collectAsList().toString())
+ assertThat(
+ spark.sql("SELECT id FROM t_column_masking").collectAsList().stream()
+ .sorted(Comparator.comparingInt(r -> r.getInt(0)))
+ .collect(Collectors.toList())
+ .toString())
.isEqualTo("[[1], [2]]");
// Test multiple columns masking
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTestBase.scala
index 2bea2144a35c..337db569ee8c 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTestBase.scala
@@ -135,7 +135,7 @@ abstract class CompactProcedureTestBase extends PaimonSparkTestBase with StreamT
result.add(Row(a, b))
}
}
- Assertions.assertThat(query().collect()).containsExactlyElementsOf(result)
+ Assertions.assertThat(query().collect()).containsExactlyInAnyOrderElementsOf(result)
checkAnswer(
spark.sql(
@@ -248,8 +248,8 @@ abstract class CompactProcedureTestBase extends PaimonSparkTestBase with StreamT
result1.add(Row(1, a, b))
}
}
- Assertions.assertThat(query0().collect()).containsExactlyElementsOf(result0)
- Assertions.assertThat(query1().collect()).containsExactlyElementsOf(result1)
+ Assertions.assertThat(query0().collect()).containsExactlyInAnyOrderElementsOf(result0)
+ Assertions.assertThat(query1().collect()).containsExactlyInAnyOrderElementsOf(result1)
checkAnswer(
spark.sql(
@@ -267,8 +267,8 @@ abstract class CompactProcedureTestBase extends PaimonSparkTestBase with StreamT
result2.add(7, Row(0, 2, 1))
result2.add(8, Row(0, 2, 2))
- Assertions.assertThat(query0().collect()).containsExactlyElementsOf(result2)
- Assertions.assertThat(query1().collect()).containsExactlyElementsOf(result1)
+ Assertions.assertThat(query0().collect()).containsExactlyInAnyOrderElementsOf(result2)
+ Assertions.assertThat(query1().collect()).containsExactlyInAnyOrderElementsOf(result1)
// test hilbert sort
val result3 = new util.ArrayList[Row]()
@@ -287,16 +287,16 @@ abstract class CompactProcedureTestBase extends PaimonSparkTestBase with StreamT
"CALL paimon.sys.compact(table => 'T', partitions => 'p=0', order_strategy => 'hilbert', order_by => 'a,b')"),
Row(true) :: Nil)
- Assertions.assertThat(query0().collect()).containsExactlyElementsOf(result3)
- Assertions.assertThat(query1().collect()).containsExactlyElementsOf(result1)
+ Assertions.assertThat(query0().collect()).containsExactlyInAnyOrderElementsOf(result3)
+ Assertions.assertThat(query1().collect()).containsExactlyInAnyOrderElementsOf(result1)
// test order sort
checkAnswer(
spark.sql(
"CALL paimon.sys.compact(table => 'T', partitions => 'p=0', order_strategy => 'order', order_by => 'a,b')"),
Row(true) :: Nil)
- Assertions.assertThat(query0().collect()).containsExactlyElementsOf(result0)
- Assertions.assertThat(query1().collect()).containsExactlyElementsOf(result1)
+ Assertions.assertThat(query0().collect()).containsExactlyInAnyOrderElementsOf(result0)
+ Assertions.assertThat(query1().collect()).containsExactlyInAnyOrderElementsOf(result1)
} finally {
stream.stop()
}