Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions docs/layouts/shortcodes/generated/core_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -879,6 +879,12 @@
<td>Boolean</td>
<td>For DELETE manifest entry in manifest file, drop stats to reduce memory and storage. Default value is false only for compatibility of old reader.</td>
</tr>
<tr>
<td><h5>manifest.delta.sorted</h5></td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
<td>Whether to sort ManifestEntry by partition when writing manifest delta.</td>
</tr>
<tr>
<td><h5>manifest.format</h5></td>
<td style="word-wrap: break-word;">"avro"</td>
Expand All @@ -897,6 +903,24 @@
<td>Integer</td>
<td>To avoid frequent manifest merges, this parameter specifies the minimum number of ManifestFileMeta to merge.</td>
</tr>
<tr>
<td><h5>manifest.merge.sort-on-commit</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Whether to sort ManifestEntry by partition during manifest merge in commit. This option does not affect manifest compaction.</td>
</tr>
<tr>
<td><h5>manifest.merge.sort.buffer</h5></td>
<td style="word-wrap: break-word;">256 mb</td>
<td>MemorySize</td>
<td>Amount of data to build up in memory for sorting during manifest full compaction before spilling to disk.</td>
</tr>
<tr>
<td><h5>manifest.merge.sorted</h5></td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
<td>Whether to sort ManifestEntry by partition during manifest full compaction.</td>
</tr>
<tr>
<td><h5>manifest.target-file-size</h5></td>
<td style="word-wrap: break-word;">8 mb</td>
Expand Down
45 changes: 45 additions & 0 deletions paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -476,6 +476,28 @@ public InlineElement getDescription() {
.withDescription(
"Define upsert key to do MERGE INTO when executing INSERT INTO, cannot be defined with primary key.");

public static final ConfigOption<Boolean> MANIFEST_MERGE_SORTED =
key("manifest.merge.sorted")
.booleanType()
.defaultValue(true)
.withDescription(
"Whether to sort ManifestEntry by partition during manifest full compaction.");

public static final ConfigOption<Boolean> MANIFEST_MERGE_SORT_ON_COMMIT =
key("manifest.merge.sort-on-commit")
.booleanType()
.defaultValue(false)
.withDescription(
"Whether to sort ManifestEntry by partition during manifest merge in commit. "
+ "This option does not affect manifest compaction.");

public static final ConfigOption<Boolean> MANIFEST_DELTA_SORTED =
key("manifest.delta.sorted")
.booleanType()
.defaultValue(true)
.withDescription(
"Whether to sort ManifestEntry by partition when writing manifest delta.");

public static final ConfigOption<String> PARTITION_DEFAULT_NAME =
key("partition.default-name")
.stringType()
Expand Down Expand Up @@ -661,6 +683,13 @@ public InlineElement getDescription() {
.withDescription(
"Amount of data to build up in memory before converting to a sorted on-disk file.");

public static final ConfigOption<MemorySize> MANIFEST_MERGE_SORT_BUFFER =
key("manifest.merge.sort.buffer")
.memoryType()
.defaultValue(WRITE_BUFFER_SIZE.defaultValue())
.withDescription(
"Amount of data to build up in memory for sorting during manifest full compaction before spilling to disk.");

@Documentation.OverrideDefault("infinite")
public static final ConfigOption<MemorySize> WRITE_BUFFER_MAX_DISK_SIZE =
key("write-buffer-spill.max-disk-size")
Expand Down Expand Up @@ -2826,6 +2855,22 @@ public int manifestMergeMinCount() {
return options.get(MANIFEST_MERGE_MIN_COUNT);
}

public boolean manifestMergeSorted() {
return options.get(MANIFEST_MERGE_SORTED);
}

public boolean manifestMergeSortOnCommit() {
return options.get(MANIFEST_MERGE_SORT_ON_COMMIT);
}

public boolean manifestDeltaSorted() {
return options.get(MANIFEST_DELTA_SORTED);
}

public long manifestMergeSortBufferSize() {
return options.get(MANIFEST_MERGE_SORT_BUFFER).getBytes();
}

public MergeEngine mergeEngine() {
return options.get(MERGE_ENGINE);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -956,11 +956,14 @@ CommitResult tryCommitOnce(
ManifestFileMerger.merge(
mergeBeforeManifests,
manifestFile,
options.manifestTargetSize().getBytes(),
options.manifestMergeMinCount(),
options.manifestFullCompactionThresholdSize().getBytes(),
manifestTargetSize.getBytes(),
manifestMergeMinCount,
manifestFullCompactionSize.getBytes(),
partitionType,
options.scanManifestParallelism());
manifestReadParallelism,
coreOptions.manifestMergeSorted()
&& coreOptions.manifestMergeSortOnCommit(),
coreOptions.manifestMergeSortBufferSize());
baseManifestList = manifestList.write(mergeAfterManifests);

if (options.rowTrackingEnabled()) {
Expand All @@ -985,8 +988,14 @@ CommitResult tryCommitOnce(

// write new delta files into manifest files
deltaStatistics = new ArrayList<>(PartitionEntry.merge(deltaFiles));
deltaManifestList = manifestList.write(manifestFile.write(deltaFiles));

List<ManifestEntry> deltaFilesForWrite = deltaFiles;
if (coreOptions.manifestDeltaSorted() && deltaFiles.size() > 1) {
deltaFilesForWrite = new ArrayList<>(deltaFiles);
deltaFilesForWrite.sort(
ManifestFileMerger.createManifestEntryComparator(partitionType));
}
List<ManifestFileMeta> deltaManifests = manifestFile.write(deltaFilesForWrite);
deltaManifestList = manifestList.write(deltaManifests);
// write changelog into manifest files
if (!changelogFiles.isEmpty()) {
changelogManifestList = manifestList.write(manifestFile.write(changelogFiles));
Expand Down Expand Up @@ -1189,7 +1198,9 @@ private boolean compactManifestOnce() {
1,
1,
partitionType,
options.scanManifestParallelism());
options.scanManifestParallelism(),
coreOptions.manifestMergeSorted(),
coreOptions.manifestMergeSortBufferSize());

if (new HashSet<>(mergeBeforeManifests).equals(new HashSet<>(mergeAfterManifests))) {
// no need to commit this snapshot, because no compact were happened
Expand Down
Loading
Loading