Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions docs/layouts/shortcodes/generated/core_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -903,6 +903,24 @@
<td>Integer</td>
<td>To avoid frequent manifest merges, this parameter specifies the minimum number of ManifestFileMeta to merge.</td>
</tr>
<tr>
<td><h5>manifest-sort.enabled</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Whether to invoke manifest sort rewrite during commit.</td>
</tr>
<tr>
<td><h5>manifest-sort.partition-field</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Partition field name to sort manifest entries by. Validated by schema validation, if not configured, defaults to the first partition field.</td>
</tr>
<tr>
<td><h5>manifest-sort.max-rewrite-size</h5></td>
<td style="word-wrap: break-word;">256 mb</td>
<td>MemorySize</td>
<td>Maximum total size of manifest files to rewrite in a single sort rewrite pass. Sections exceeding this limit are skipped. Set to a larger value to allow more aggressive sort rewriting.</td>
</tr>
<tr>
<td><h5>manifest.target-file-size</h5></td>
<td style="word-wrap: break-word;">8 mb</td>
Expand Down
37 changes: 37 additions & 0 deletions paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -469,6 +469,30 @@ public InlineElement getDescription() {
"To avoid frequent manifest merges, this parameter specifies the minimum number "
+ "of ManifestFileMeta to merge.");

/**
 * Option {@code manifest-sort.enabled}: whether the manifest sort rewrite is invoked during
 * commit. Defaults to {@code false}.
 */
public static final ConfigOption<Boolean> MANIFEST_SORT_ENABLED =
key("manifest-sort.enabled")
.booleanType()
.defaultValue(false)
.withDescription("Whether to invoke manifest sort rewrite during commit.");

/**
 * Option {@code manifest-sort.partition-field}: name of the partition field that manifest
 * entries are sorted by. It has no default value; per the description, the first partition
 * field is used when it is not configured.
 */
public static final ConfigOption<String> MANIFEST_SORT_PARTITION_FIELD =
key("manifest-sort.partition-field")
.stringType()
.noDefaultValue()
.withDescription(
"Partition field name to sort manifest entries by. Validated by"
+ " schema validation, if not configured, defaults to the first partition field.");

/**
 * Option {@code manifest-sort.max-rewrite-size}: upper bound on the total size of manifest
 * files rewritten in a single sort rewrite pass. Defaults to 256 MiB.
 */
public static final ConfigOption<MemorySize> MANIFEST_SORT_MAX_REWRITE_SIZE =
key("manifest-sort.max-rewrite-size")
.memoryType()
.defaultValue(MemorySize.ofMebiBytes(256))
.withDescription(
"Maximum total size of manifest files to rewrite in a single"
+ " sort rewrite pass. Sections exceeding this limit are"
+ " skipped. Set to a larger value to allow more aggressive"
+ " sort rewriting.");

public static final ConfigOption<String> UPSERT_KEY =
key("upsert-key")
.stringType()
Expand Down Expand Up @@ -2564,6 +2588,19 @@ public MemorySize manifestFullCompactionThresholdSize() {
return options.get(MANIFEST_FULL_COMPACTION_FILE_SIZE);
}

/** Returns the value configured for {@code manifest-sort.enabled}. */
public boolean manifestSortEnabled() {
    final boolean sortEnabled = options.get(MANIFEST_SORT_ENABLED);
    return sortEnabled;
}

/**
 * Returns the value configured for {@code manifest-sort.partition-field}. The option declares
 * no default value, so the result may be {@code null}.
 */
@Nullable
public String manifestSortPartitionField() {
    final String sortField = options.get(MANIFEST_SORT_PARTITION_FIELD);
    return sortField;
}

/** Returns the value configured for {@code manifest-sort.max-rewrite-size}, in bytes. */
public long manifestSortMaxRewriteSize() {
    final MemorySize maxRewriteSize = options.get(MANIFEST_SORT_MAX_REWRITE_SIZE);
    return maxRewriteSize.getBytes();
}

public String partitionDefaultName() {
return options.get(PARTITION_DEFAULT_NAME);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@
import org.apache.paimon.operation.commit.SuccessCommitResult;
import org.apache.paimon.operation.metrics.CommitMetrics;
import org.apache.paimon.operation.metrics.CommitStats;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.options.Options;
import org.apache.paimon.partition.PartitionPredicate;
import org.apache.paimon.partition.PartitionStatistics;
import org.apache.paimon.predicate.Predicate;
Expand Down Expand Up @@ -958,13 +960,7 @@ CommitResult tryCommitOnce(
// try to merge old manifest files to create base manifest list
mergeAfterManifests =
ManifestFileMerger.merge(
mergeBeforeManifests,
manifestFile,
options.manifestTargetSize().getBytes(),
options.manifestMergeMinCount(),
options.manifestFullCompactionThresholdSize().getBytes(),
partitionType,
options.scanManifestParallelism());
mergeBeforeManifests, manifestFile, partitionType, options);
baseManifestList = manifestList.write(mergeAfterManifests);

if (options.rowTrackingEnabled()) {
Expand Down Expand Up @@ -1184,16 +1180,16 @@ private boolean compactManifestOnce() {
manifestList.readDataManifests(latestSnapshot);
List<ManifestFileMeta> mergeAfterManifests;

// the first trial
// the first trial: use a copy of the options with forced full compaction settings
Options compactOptions = Options.fromMap(options.toMap());
compactOptions.set(CoreOptions.MANIFEST_MERGE_MIN_COUNT, 1);
compactOptions.set(CoreOptions.MANIFEST_FULL_COMPACTION_FILE_SIZE, MemorySize.ofBytes(1));
mergeAfterManifests =
ManifestFileMerger.merge(
mergeBeforeManifests,
manifestFile,
options.manifestTargetSize().getBytes(),
1,
1,
partitionType,
options.scanManifestParallelism());
new CoreOptions(compactOptions));

if (new HashSet<>(mergeBeforeManifests).equals(new HashSet<>(mergeAfterManifests))) {
// no need to commit this snapshot, because no compaction happened
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.operation;

import org.apache.paimon.manifest.ManifestFileMeta;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

/**
* A {@code ManifestAdjacentSortedRun} is a list of {@link ManifestFileMeta}s sorted by a single
* partition field (the configured manifest sort field). The intervals {@code
* [partitionStats.minValues[k], partitionStats.maxValues[k]]} of these manifests do not overlap on
* field {@code k}, where {@code k} is the configured sort field index.
*/
public class ManifestAdjacentSortedRun {

    /** Level of this run; {@code -1} until {@link #setLevel(int)} is called. */
    private int level;

    /** Unmodifiable snapshot of the manifests in this run, in the order given at creation. */
    private final List<ManifestFileMeta> files;

    /** Sum of {@link ManifestFileMeta#fileSize()} over {@link #files}, computed once. */
    private final long totalSize;

    private ManifestAdjacentSortedRun(List<ManifestFileMeta> files) {
        Objects.requireNonNull(files, "files must not be null");
        this.level = -1;
        // Defensive copy: without it, a caller mutating its list afterwards would change
        // files() while the cached totalSize (and equals/hashCode results) silently go stale.
        this.files = Collections.unmodifiableList(new ArrayList<>(files));
        long size = 0L;
        for (ManifestFileMeta file : this.files) {
            size += file.fileSize();
        }
        this.totalSize = size;
    }

    /**
     * Build a {@code ManifestAdjacentSortedRun} from an already-sorted list. The caller MUST
     * guarantee that {@code sortedFiles} is sorted ascending on the configured sort field's min
     * value, and that intervals do not overlap on that field. Neither property is checked here.
     *
     * <p>The list is copied, so later changes to {@code sortedFiles} do not affect the run.
     *
     * @param sortedFiles manifests forming the run, must not be {@code null}
     * @return a new run with level {@code -1}
     * @throws NullPointerException if {@code sortedFiles} is {@code null}
     */
    public static ManifestAdjacentSortedRun fromSorted(List<ManifestFileMeta> sortedFiles) {
        return new ManifestAdjacentSortedRun(sortedFiles);
    }

    /** Returns the manifests of this run as an unmodifiable list. */
    public List<ManifestFileMeta> files() {
        return files;
    }

    /** Returns the total file size of all manifests in this run. */
    public long totalSize() {
        return totalSize;
    }

    /** Returns the level of this run, or {@code -1} if none has been set. */
    public int level() {
        return level;
    }

    /**
     * Sets the level of this run. Note that the level takes part in {@link #equals(Object)} and
     * {@link #hashCode()}, so changing it while the run is stored in a hash-based collection
     * makes the run unreachable there.
     */
    public void setLevel(int level) {
        this.level = level;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof ManifestAdjacentSortedRun)) {
            return false;
        }
        ManifestAdjacentSortedRun that = (ManifestAdjacentSortedRun) o;
        return level == that.level && files.equals(that.files);
    }

    @Override
    public int hashCode() {
        return Objects.hash(level, files);
    }

    @Override
    public String toString() {
        return "ManifestAdjacentSortedRun{level="
                + level
                + ", files=["
                + files.stream().map(ManifestFileMeta::fileName).collect(Collectors.joining(", "))
                + "]}";
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.paimon.operation;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.io.RollingFileWriter;
import org.apache.paimon.manifest.FileEntry;
Expand Down Expand Up @@ -48,7 +49,7 @@
import static org.apache.paimon.utils.ManifestReadThreadPool.sequentialBatchedExecute;
import static org.apache.paimon.utils.Preconditions.checkArgument;

/** Util for merging manifest files. */
/** Manifest file merger with standard merge logic and optional sort rewrite. */
public class ManifestFileMerger {

private static final Logger LOG = LoggerFactory.getLogger(ManifestFileMerger.class);
Expand All @@ -62,33 +63,44 @@ public class ManifestFileMerger {
public static List<ManifestFileMeta> merge(
List<ManifestFileMeta> input,
ManifestFile manifestFile,
long suggestedMetaSize,
int suggestedMinMetaCount,
long manifestFullCompactionSize,
RowType partitionType,
@Nullable Integer manifestReadParallelism) {
CoreOptions options) {
// Extract configuration from options
long suggestedMetaSize = options.manifestTargetSize().getBytes();
int suggestedMinMetaCount = options.manifestMergeMinCount();
long manifestFullCompactionSize = options.manifestFullCompactionThresholdSize().getBytes();
Integer manifestReadParallelism = options.scanManifestParallelism();

// these are the newly created manifest files, clean them up if exception occurs
List<ManifestFileMeta> newFilesForAbort = new ArrayList<>();

try {
Optional<List<ManifestFileMeta>> fullCompacted =
tryFullCompaction(
input,
newFilesForAbort,
manifestFile,
suggestedMetaSize,
manifestFullCompactionSize,
partitionType,
manifestReadParallelism);
return fullCompacted.orElseGet(
() ->
tryMinorCompaction(
input,
newFilesForAbort,
manifestFile,
suggestedMetaSize,
suggestedMinMetaCount,
manifestReadParallelism));
// If manifest-sort.enabled is set and there are partition fields, delegate to
// ManifestFileSorter.trySortCompaction
if (options.manifestSortEnabled() && partitionType.getFieldCount() > 0) {
return ManifestFileSorter.trySortCompaction(
input, newFilesForAbort, manifestFile, partitionType, options);
} else {
// Otherwise try full compaction first, then minor compaction if needed
Optional<List<ManifestFileMeta>> fullCompacted =
tryFullCompaction(
input,
newFilesForAbort,
manifestFile,
suggestedMetaSize,
manifestFullCompactionSize,
partitionType,
manifestReadParallelism);
return fullCompacted.orElseGet(
() ->
tryMinorCompaction(
input,
newFilesForAbort,
manifestFile,
suggestedMetaSize,
suggestedMinMetaCount,
manifestReadParallelism));
}
} catch (Throwable e) {
// exception occurs, clean up and rethrow
for (ManifestFileMeta manifest : newFilesForAbort) {
Expand Down Expand Up @@ -234,7 +246,6 @@ public static Optional<List<ManifestFileMeta>> tryFullCompaction(
}

// 2.2. merge

if (toBeMerged.size() <= 1) {
return Optional.empty();
}
Expand Down Expand Up @@ -295,21 +306,21 @@ private static FullCompactionReadResult readForFullCompaction(
return new FullCompactionReadResult(file, requireChange, entries);
}

private static Set<BinaryRow> computeDeletePartitions(Set<FileEntry.Identifier> deleteEntries) {
/** Collects the distinct partitions referenced by the given delete identifiers. */
static Set<BinaryRow> computeDeletePartitions(Set<FileEntry.Identifier> deleteEntries) {
    Set<BinaryRow> deletedPartitions = new HashSet<>();
    deleteEntries.forEach(entry -> deletedPartitions.add(entry.partition));
    return deletedPartitions;
}

private static class FullCompactionReadResult {
static class FullCompactionReadResult {

private final ManifestFileMeta file;
private final boolean requireChange;
private final List<ManifestEntry> entries;
final ManifestFileMeta file;
final boolean requireChange;
final List<ManifestEntry> entries;

private FullCompactionReadResult(
FullCompactionReadResult(
ManifestFileMeta file, boolean requireChange, List<ManifestEntry> entries) {
this.file = file;
this.requireChange = requireChange;
Expand Down
Loading