defaultCompactionMap) {
+ this.lsmFiles = lsmFiles;
+ this.deleteEntries = deleteEntries;
+ this.defaultCompactionMap = defaultCompactionMap;
+ }
+ }
+
+ /**
+ * Try to sort-rewrite the merged manifest list by a configured partition field. If the sort
+ * field cannot be resolved, the input is returned as-is.
+ *
+ * Dispatches to {@link #tryFullCompaction} when totalDeltaFileSize >= sizeTrigger, or {@link
+ * #tryMinorCompaction} otherwise.
+ */
+ static List trySortCompaction(
+ List input,
+ List newFilesForAbort,
+ ManifestFile manifestFile,
+ RowType partitionType,
+ CoreOptions options)
+ throws Exception {
+ String sortPartitionField = options.manifestSortPartitionField();
+ long suggestedMetaSize = options.manifestTargetSize().getBytes();
+ int suggestedMinMetaCount = options.manifestMergeMinCount();
+ long fullCompactionThreshold = options.manifestFullCompactionThresholdSize().getBytes();
+ long maxRewriteSize = options.manifestSortMaxRewriteSize();
+ int maxSizeAmplificationPercent = options.maxSizeAmplificationPercent();
+ int sortedRunSizeRatio = options.sortedRunSizeRatio();
+ Integer manifestReadParallelism = options.scanManifestParallelism();
+
+ Optional> fullCompacted =
+ tryFullCompaction(
+ input,
+ newFilesForAbort,
+ manifestFile,
+ partitionType,
+ sortPartitionField,
+ suggestedMetaSize,
+ suggestedMinMetaCount,
+ fullCompactionThreshold,
+ maxRewriteSize,
+ maxSizeAmplificationPercent,
+ sortedRunSizeRatio,
+ manifestReadParallelism);
+ if (fullCompacted.isPresent()) {
+ return fullCompacted.get();
+ }
+ return tryMinorCompaction(
+ input,
+ newFilesForAbort,
+ manifestFile,
+ partitionType,
+ sortPartitionField,
+ suggestedMetaSize,
+ suggestedMinMetaCount,
+ maxRewriteSize,
+ maxSizeAmplificationPercent,
+ sortedRunSizeRatio,
+ manifestReadParallelism);
+ }
+
+ /**
+ * Full compaction path: totalDeltaFileSize >= sizeTrigger.
+ *
+ * Does not build index mapping. sortAndRewriteSection writes all entries (ADD+DELETE merged)
+ * together without separating them.
+ */
+ private static Optional> tryFullCompaction(
+ List input,
+ List newFilesForAbort,
+ ManifestFile manifestFile,
+ RowType partitionType,
+ String sortPartitionField,
+ long suggestedMetaSize,
+ int suggestedMinMetaCount,
+ long fullCompactionThreshold,
+ long maxRewriteSize,
+ int maxSizeAmplificationPercent,
+ int sortedRunSizeRatio,
+ @Nullable Integer manifestReadParallelism)
+ throws Exception {
+ // Step 1: Check if full compaction threshold is met
+ long totalDeltaFileSize = 0;
+ for (ManifestFileMeta file : input) {
+ if (file.numDeletedFiles() > 0 || file.fileSize() < suggestedMetaSize) {
+ totalDeltaFileSize += file.fileSize();
+ }
+ }
+ if (totalDeltaFileSize < fullCompactionThreshold) {
+ return Optional.empty();
+ }
+ // Step 2: Prepare compaction context
+ CompactionContext ctx =
+ prepareCompaction(
+ input,
+ true,
+ manifestFile,
+ partitionType,
+ sortPartitionField,
+ suggestedMetaSize,
+ maxSizeAmplificationPercent,
+ sortedRunSizeRatio,
+ manifestReadParallelism);
+ List levelRuns = ctx.levelRuns;
+ List pickedRuns = ctx.pickedRuns;
+
+ if (pickedRuns.isEmpty() && ctx.defaultCompactionMap.isEmpty()) {
+ LOG.debug(
+ "Manifest sort full compact skipped: no runs picked and no defaultCompaction files.");
+ return Optional.empty();
+ }
+
+ LOG.info(
+ "Manifest sort full compact: input={} files, lsm={} runs, picked={} runs, "
+ + "defaultCompaction={} files.",
+ input.size(),
+ levelRuns.size(),
+ pickedRuns.size(),
+ ctx.defaultCompactionMap.size());
+
+ // Step 3: Collect reused files (not picked) and picked files
+ Set pickedSet = new HashSet<>(pickedRuns);
+ List result = new ArrayList<>();
+ for (ManifestAdjacentSortedRun run : levelRuns) {
+ if (!pickedSet.contains(run)) {
+ result.addAll(run.files());
+ }
+ }
+ List pickedFiles = new ArrayList<>();
+ for (ManifestAdjacentSortedRun run : pickedRuns) {
+ pickedFiles.addAll(run.files());
+ }
+ pickedFiles.addAll(ctx.defaultCompactionMap.keySet());
+
+ // Step 4: Split into sections and merge small adjacent sections
+ List sections =
+ splitIntoSections(pickedFiles, ctx.fieldComparator, ctx.defaultCompactionMap);
+ sections = mergeSmallAdjacentSections(sections, suggestedMetaSize);
+
+ LOG.info(
+ "Manifest sort full compact: pickedFiles={}, sections={}.",
+ pickedFiles.size(),
+ sections.size());
+
+ // Step 5: Rewrite sections
+ FullCompactOutput output = new FullCompactOutput(result);
+ rewriteSections(
+ sections,
+ output,
+ newFilesForAbort,
+ ctx,
+ manifestFile,
+ suggestedMetaSize,
+ suggestedMinMetaCount,
+ maxRewriteSize,
+ manifestReadParallelism);
+
+ LOG.info(
+ "Manifest sort full compact completed: input={}, resultFiles={}.",
+ input.size(),
+ result.size());
+ return Optional.of(result);
+ }
+
+ /**
+ * Minor compaction path: totalDeltaFileSize < sizeTrigger.
+ *
+ * Builds index mapping to preserve original positions. sortAndRewriteSection separates ADD
+ * and DELETE entries, placing ADD at result[minIdx] and DELETE at result[maxIdx].
+ */
+ private static List tryMinorCompaction(
+ List input,
+ List newFilesForAbort,
+ ManifestFile manifestFile,
+ RowType partitionType,
+ String sortPartitionField,
+ long suggestedMetaSize,
+ int suggestedMinMetaCount,
+ long maxRewriteSize,
+ int maxSizeAmplificationPercent,
+ int sortedRunSizeRatio,
+ @Nullable Integer manifestReadParallelism)
+ throws Exception {
+ // Step 1: Prepare compaction context (early-return if nothing to compact)
+ CompactionContext ctx =
+ prepareCompaction(
+ input,
+ false,
+ manifestFile,
+ partitionType,
+ sortPartitionField,
+ suggestedMetaSize,
+ maxSizeAmplificationPercent,
+ sortedRunSizeRatio,
+ manifestReadParallelism);
+ List levelRuns = ctx.levelRuns;
+ List pickedRuns = ctx.pickedRuns;
+
+ if (pickedRuns.isEmpty() && ctx.defaultCompactionMap.isEmpty()) {
+ LOG.debug(
+ "Manifest sort minor compact skipped: no runs picked and no defaultCompaction files.");
+ return input;
+ }
+
+ LOG.info(
+ "Manifest sort minor compact: input={} files, lsm={} runs, picked={} runs, "
+ + "defaultCompaction={} files.",
+ input.size(),
+ levelRuns.size(),
+ pickedRuns.size(),
+ ctx.defaultCompactionMap.size());
+
+ // Step 2: Build fileName -> index mapping and initialize 2D result
+ Map fileNameToIndex = new HashMap<>();
+ List> result = new ArrayList<>(input.size());
+ for (int i = 0; i < input.size(); i++) {
+ fileNameToIndex.put(input.get(i).fileName(), i);
+ result.add(new ArrayList<>());
+ }
+
+ // Step 3: Collect reused files and picked files
+ Set pickedSet = new HashSet<>(pickedRuns);
+ for (ManifestAdjacentSortedRun run : levelRuns) {
+ if (!pickedSet.contains(run)) {
+ for (ManifestFileMeta file : run.files()) {
+ Integer idx = fileNameToIndex.get(file.fileName());
+ if (idx != null) {
+ result.get(idx).add(file);
+ }
+ }
+ }
+ }
+
+ List pickedFiles = new ArrayList<>();
+ for (ManifestAdjacentSortedRun run : pickedRuns) {
+ pickedFiles.addAll(run.files());
+ }
+ pickedFiles.addAll(ctx.defaultCompactionMap.keySet());
+
+ // Step 4: Compute index range
+ int minIdx = Integer.MAX_VALUE;
+ int maxIdx = Integer.MIN_VALUE;
+ for (ManifestFileMeta meta : pickedFiles) {
+ Integer idx = fileNameToIndex.get(meta.fileName());
+ if (idx != null) {
+ minIdx = Math.min(minIdx, idx);
+ maxIdx = Math.max(maxIdx, idx);
+ }
+ }
+ Pair indexRange = Pair.of(minIdx, maxIdx);
+
+ // Step 5: Split into sections and merge small adjacent sections
+ List sections =
+ splitIntoSections(pickedFiles, ctx.fieldComparator, ctx.defaultCompactionMap);
+ sections = mergeSmallAdjacentSections(sections, suggestedMetaSize);
+
+ LOG.info(
+ "Manifest sort minor compact: pickedFiles={}, sections={}.",
+ pickedFiles.size(),
+ sections.size());
+
+ // Step 6: Rewrite sections
+ MinorCompactOutput output = new MinorCompactOutput(result, indexRange, fileNameToIndex);
+ rewriteSections(
+ sections,
+ output,
+ newFilesForAbort,
+ ctx,
+ manifestFile,
+ suggestedMetaSize,
+ suggestedMinMetaCount,
+ maxRewriteSize,
+ manifestReadParallelism);
+
+ // Step 7: Flatten 2D result into a single list
+ List flatResult = new ArrayList<>();
+ for (List subList : result) {
+ flatResult.addAll(subList);
+ }
+
+ LOG.info(
+ "Manifest sort minor compact completed: input={}, resultFiles={}.",
+ input.size(),
+ flatResult.size());
+ return flatResult;
+ }
+
+ /**
+ * Prepare compaction context: resolve sort field, classify manifests, build level runs, and
+ * pick runs for compaction.
+ *
+ * @return CompactionContext containing all shared state
+ */
+ private static CompactionContext prepareCompaction(
+ List input,
+ boolean fullCompaction,
+ ManifestFile manifestFile,
+ RowType partitionType,
+ String sortPartitionField,
+ long suggestedMetaSize,
+ int maxSizeAmplificationPercent,
+ int sortedRunSizeRatio,
+ @Nullable Integer manifestReadParallelism) {
+
+ // Step 1: Resolve sort field and build comparator for partition ordering.
+ String sortField = resolveSortField(sortPartitionField, partitionType);
+ if (sortField == null) {
+ throw new IllegalArgumentException(
+ "Cannot resolve sort field for manifest sort rewrite.");
+ }
+ int sortFieldIndex = partitionType.getFieldNames().indexOf(sortField);
+ RecordComparator fieldComparator =
+ CodeGenUtils.newRecordComparator(
+ partitionType.getFieldTypes(), new int[] {sortFieldIndex});
+
+ // Step 2: Classify manifests into LSM files and collect delete entries.
+ ClassifyResult classifyResult =
+ classifyManifests(
+ input,
+ fullCompaction,
+ manifestFile,
+ partitionType,
+ suggestedMetaSize,
+ manifestReadParallelism);
+ List lsmFiles = classifyResult.lsmFiles;
+
+ // Step 3: Build level-sorted runs from LSM files based on partition order.
+ List levelRuns =
+ lsmFiles.isEmpty()
+ ? new ArrayList<>()
+ : buildLevelSortedRuns(lsmFiles, fieldComparator);
+
+ // Step 4: Pick runs for compaction using size amplification and ratio strategy.
+ ManifestPickStrategy pickStrategy =
+ new ManifestPickStrategy(maxSizeAmplificationPercent, sortedRunSizeRatio);
+ List pickedRuns = pickStrategy.pick(levelRuns);
+
+ return new CompactionContext(
+ fullCompaction,
+ fieldComparator,
+ classifyResult.deleteEntries,
+ classifyResult.defaultCompactionMap,
+ levelRuns,
+ pickedRuns);
+ }
+
+ /**
+ * Classify manifest files into default-compaction group and LSM group.
+ *
+ * Full compaction: small files and files overlapping delete partitions go into
+ * defaultCompactionMap; the rest are returned as lsmFiles.
+ *
+ *
Non-full compaction: small files go to defaultCompactionMap for minor-style merge; the
+ * rest are returned as lsmFiles.
+ *
+ * @return ClassifyResult containing lsmFiles, deleteEntries, and defaultCompactionMap
+ */
+ private static ClassifyResult classifyManifests(
+ List input,
+ boolean fullCompaction,
+ ManifestFile manifestFile,
+ RowType partitionType,
+ long suggestedMetaSize,
+ @Nullable Integer manifestReadParallelism) {
+ // Initialize classification containers and read delete entries
+ Map classifiedDefaultMap = new LinkedHashMap<>();
+ List lsmFiles = new LinkedList<>(input);
+ Set classifiedDeleteEntries = Collections.emptySet();
+ PartitionPredicate predicate = null;
+ if (fullCompaction) {
+ classifiedDeleteEntries =
+ FileEntry.readDeletedEntries(manifestFile, input, manifestReadParallelism);
+
+ // Build partition predicate from delete entries for overlap detection
+ if (classifiedDeleteEntries.isEmpty()) {
+ predicate = PartitionPredicate.ALWAYS_FALSE;
+ } else {
+ if (partitionType.getFieldCount() > 0) {
+ Set deletePartitions =
+ ManifestFileMerger.computeDeletePartitions(classifiedDeleteEntries);
+ predicate = PartitionPredicate.fromMultiple(partitionType, deletePartitions);
+ } else {
+ predicate = PartitionPredicate.ALWAYS_TRUE;
+ }
+ }
+ }
+
+ // Classify each file based on size and delete-partition overlap
+ Iterator iterator = lsmFiles.iterator();
+ while (iterator.hasNext()) {
+ ManifestFileMeta file = iterator.next();
+ boolean small = file.fileSize() < suggestedMetaSize;
+ boolean inDeleteRange =
+ predicate != null
+ && predicate.test(
+ file.numAddedFiles() + file.numDeletedFiles(),
+ file.partitionStats().minValues(),
+ file.partitionStats().maxValues(),
+ file.partitionStats().nullCounts());
+ if (small || inDeleteRange) {
+ iterator.remove();
+ classifiedDefaultMap.put(file, inDeleteRange);
+ }
+ }
+
+ return new ClassifyResult(lsmFiles, classifiedDeleteEntries, classifiedDefaultMap);
+ }
+
+ /**
+ * Build level-sorted runs from a list of manifest files. Sorts files by min partition value,
+ * greedy-scans to build non-overlapping SortedRuns, then assigns levels by totalSize (Top-4
+ * largest to level 1~4, rest to level 0).
+ */
+ static List buildLevelSortedRuns(
+ List input, RecordComparator fieldComparator) {
+ // Step 1: Sort by min value (if equal, then by max value)
+ input.sort(
+ (a, b) -> {
+ int cmp =
+ fieldComparator.compare(
+ a.partitionStats().minValues(), b.partitionStats().minValues());
+ if (cmp != 0) {
+ return cmp;
+ }
+ return fieldComparator.compare(
+ a.partitionStats().maxValues(), b.partitionStats().maxValues());
+ });
+
+ // Step 2: Interval graph coloring algorithm - assign files to runs
+ // Use priority queue to track runs by their max values
+ PriorityQueue> runs =
+ new PriorityQueue<>(
+ (r1, r2) -> {
+ ManifestFileMeta last1 = r1.get(r1.size() - 1);
+ ManifestFileMeta last2 = r2.get(r2.size() - 1);
+ return fieldComparator.compare(
+ last1.partitionStats().maxValues(),
+ last2.partitionStats().maxValues());
+ });
+
+ for (ManifestFileMeta file : input) {
+ List earliestRun = runs.poll();
+ if (earliestRun == null) {
+ // No existing runs, create a new one
+ List newRun = new ArrayList<>();
+ newRun.add(file);
+ runs.offer(newRun);
+ } else if (fieldComparator.compare(
+ file.partitionStats().minValues(),
+ earliestRun.get(earliestRun.size() - 1).partitionStats().maxValues())
+ >= 0) {
+ // Current file's min >= run's max, append to this run
+ // Note: When min == max (boundary equality), files are considered
+ // non-overlapping and can be placed in the same SortedRun. This allows
+ // building fewer SortedRuns, improving compaction efficiency while
+ // maintaining correct sort order.
+ earliestRun.add(file);
+ runs.offer(earliestRun);
+ } else {
+ // Overlap detected, put the run back and create a new one
+ runs.offer(earliestRun);
+ List newRun = new ArrayList<>();
+ newRun.add(file);
+ runs.offer(newRun);
+ }
+ }
+
+ // Step 3: Convert to ManifestAdjacentSortedRun list
+ List result = new ArrayList<>();
+ while (!runs.isEmpty()) {
+ result.add(ManifestAdjacentSortedRun.fromSorted(runs.poll()));
+ }
+
+ // Step 4: Sort by totalSize and assign levels
+ result.sort(Comparator.comparingLong(ManifestAdjacentSortedRun::totalSize));
+ int n = result.size();
+ int maxLevel = ManifestPickStrategy.MAX_LEVEL;
+ for (int i = 0; i < n; i++) {
+ if (i >= n - maxLevel) {
+ result.get(i).setLevel(i - (n - maxLevel) + 1);
+ } else {
+ result.get(i).setLevel(0);
+ }
+ }
+ return result;
+ }
+
+ /**
+ * Split picked files into sections. Files with overlapping sort-key intervals go into the same
+ * section. Each section is built with pre-computed totalSize and hasDefaultCompactMeta.
+ */
+ static List splitIntoSections(
+ List pickedFiles,
+ RecordComparator fieldComparator,
+ Map defaultCompactionMap) {
+ pickedFiles.sort(
+ (a, b) -> {
+ int cmp =
+ fieldComparator.compare(
+ a.partitionStats().minValues(), b.partitionStats().minValues());
+ if (cmp != 0) {
+ return cmp;
+ }
+ return fieldComparator.compare(
+ a.partitionStats().maxValues(), b.partitionStats().maxValues());
+ });
+
+ List sections = new ArrayList<>();
+ List currentFiles = new ArrayList<>();
+ long currentTotalSize = 0;
+ boolean currentHasDefault = false;
+ ManifestFileMeta first = pickedFiles.get(0);
+ currentFiles.add(first);
+ currentTotalSize += first.fileSize();
+ currentHasDefault = defaultCompactionMap.containsKey(first);
+ BinaryRow sectionMaxBound = first.partitionStats().maxValues();
+
+ for (int i = 1; i < pickedFiles.size(); i++) {
+ ManifestFileMeta file = pickedFiles.get(i);
+ // Note: Boundary equality (file.min == sectionMaxBound) results in separate
+ // sections. This avoids merge-sort overhead while maintaining partition filtering
+ // capability. Files with non-overlapping boundaries (including equal boundaries)
+ // can be processed independently without significantly impacting partition pruning
+ // efficiency.
+ if (fieldComparator.compare(file.partitionStats().minValues(), sectionMaxBound) >= 0) {
+ sections.add(new Section(currentFiles, currentTotalSize, currentHasDefault));
+ currentFiles = new ArrayList<>();
+ currentTotalSize = 0;
+ currentFiles.add(file);
+ currentTotalSize += file.fileSize();
+ currentHasDefault = defaultCompactionMap.containsKey(file);
+ sectionMaxBound = file.partitionStats().maxValues();
+ } else {
+ currentFiles.add(file);
+ currentTotalSize += file.fileSize();
+ if (!currentHasDefault && defaultCompactionMap.containsKey(file)) {
+ currentHasDefault = true;
+ }
+ if (fieldComparator.compare(file.partitionStats().maxValues(), sectionMaxBound)
+ > 0) {
+ sectionMaxBound = file.partitionStats().maxValues();
+ }
+ }
+ }
+ sections.add(new Section(currentFiles, currentTotalSize, currentHasDefault));
+ return sections;
+ }
+
+ /**
+ * Merge small adjacent sections to avoid producing too many small rewrite batches. If either
+ * the pending section or the current section total size is smaller than {@code
+ * suggestedMetaSize}, they are combined into a single section.
+ */
+ private static List mergeSmallAdjacentSections(
+ List sections, long suggestedMetaSize) {
+ List merged = new ArrayList<>();
+ Section pending = null;
+
+ for (Section section : sections) {
+ if (pending == null) {
+ pending = section;
+ } else {
+ if (pending.totalSize < suggestedMetaSize
+ || section.totalSize < suggestedMetaSize) {
+ pending = Section.merge(pending, section);
+ } else {
+ merged.add(pending);
+ pending = section;
+ }
+ }
+ }
+ if (pending != null) {
+ merged.add(pending);
+ }
+ return merged;
+ }
+
+ /** Rewrite sections with budget control. */
+ private static void rewriteSections(
+ List sections,
+ RewriteOutput output,
+ List sortNewFiles,
+ CompactionContext ctx,
+ ManifestFile manifestFile,
+ long suggestedMetaSize,
+ int suggestedMinMetaCount,
+ long maxRewriteSize,
+ @Nullable Integer manifestReadParallelism)
+ throws Exception {
+ long processedSize = 0;
+ boolean reachedLimit = false;
+
+ for (int i = 0; i < sections.size(); i++) {
+ Section section = sections.get(i);
+ if (section.files.size() == 1) {
+ sortAndRewriteSection(
+ section.files,
+ output,
+ sortNewFiles,
+ ctx,
+ manifestFile,
+ manifestReadParallelism);
+ continue;
+ }
+
+ if (processedSize + section.totalSize <= maxRewriteSize) {
+ processedSize += section.totalSize;
+ sortAndRewriteSection(
+ section.files,
+ output,
+ sortNewFiles,
+ ctx,
+ manifestFile,
+ manifestReadParallelism);
+ } else if (!reachedLimit) {
+ long rewriteTotalSize = maxRewriteSize - processedSize;
+ processedSize += section.totalSize;
+ List rewriteFiles = new ArrayList<>();
+ List remainingFiles = new ArrayList<>();
+ long rewriteSize = 0;
+ long remainingSize = 0;
+ boolean remainingHasDefault = false;
+
+ for (ManifestFileMeta file : section.files) {
+ if (rewriteSize + file.fileSize() <= rewriteTotalSize) {
+ rewriteFiles.add(file);
+ rewriteSize += file.fileSize();
+ } else {
+ remainingFiles.add(file);
+ remainingSize += file.fileSize();
+ if (ctx.defaultCompactionMap.containsKey(file)) {
+ remainingHasDefault = true;
+ }
+ }
+ }
+
+ sortAndRewriteSection(
+ rewriteFiles,
+ output,
+ sortNewFiles,
+ ctx,
+ manifestFile,
+ manifestReadParallelism);
+
+ if (!remainingFiles.isEmpty()) {
+ Section remainingSection =
+ new Section(remainingFiles, remainingSize, remainingHasDefault);
+ sections.add(remainingSection);
+ }
+ reachedLimit = true;
+ } else if (section.hasDefaultCompactMeta) {
+ rewriteSubSegments(
+ section.files,
+ output,
+ sortNewFiles,
+ ctx,
+ manifestFile,
+ suggestedMetaSize,
+ suggestedMinMetaCount,
+ manifestReadParallelism);
+ } else {
+ output.addAllUnchanged(section.files);
+ }
+ }
+ }
+
+ /** Rewrite sub-segments within a section that exceeded the budget. */
+ private static void rewriteSubSegments(
+ List section,
+ RewriteOutput output,
+ List sortNewFiles,
+ CompactionContext ctx,
+ ManifestFile manifestFile,
+ long suggestedMetaSize,
+ int suggestedMinMetaCount,
+ @Nullable Integer manifestReadParallelism)
+ throws Exception {
+ List subSegment = new ArrayList<>();
+ long subSegmentSize = 0;
+ for (ManifestFileMeta m : section) {
+ subSegmentSize += m.fileSize();
+ subSegment.add(m);
+
+ if (subSegmentSize >= suggestedMetaSize) {
+ sortAndRewriteSection(
+ subSegment,
+ output,
+ sortNewFiles,
+ ctx,
+ manifestFile,
+ manifestReadParallelism);
+ subSegment.clear();
+ subSegmentSize = 0;
+ }
+ }
+ // Flush tail only if delete entries exist or file count >= minCount.
+ if (!subSegment.isEmpty()) {
+ if (!ctx.deleteEntries.isEmpty() || subSegment.size() >= suggestedMinMetaCount) {
+ sortAndRewriteSection(
+ subSegment,
+ output,
+ sortNewFiles,
+ ctx,
+ manifestFile,
+ manifestReadParallelism);
+ } else {
+ output.addAllUnchanged(subSegment);
+ }
+ }
+ }
+
+ /**
+ * Sort and rewrite a section. Dispatches to full or minor compact path.
+ *
+ * sortNewFiles is the same reference as newFilesForAbort, ensuring newly written files are
+ * cleaned up on exception by the caller's catch block.
+ */
+ private static void sortAndRewriteSection(
+ List section,
+ RewriteOutput output,
+ List sortNewFiles,
+ CompactionContext ctx,
+ ManifestFile manifestFile,
+ @Nullable Integer manifestReadParallelism)
+ throws Exception {
+ // Skip rewrite for single file not in delete-range.
+ if (section.size() == 1 && !ctx.defaultCompactionMap.getOrDefault(section.get(0), false)) {
+ output.addUnchanged(section.get(0));
+ return;
+ }
+
+ if (ctx.fullCompaction) {
+ sortAndRewriteFull(
+ section, output, sortNewFiles, ctx, manifestFile, manifestReadParallelism);
+ } else {
+ sortAndRewriteMinor(
+ section, output, sortNewFiles, ctx, manifestFile, manifestReadParallelism);
+ }
+ }
+
+ /**
+ * Full compaction path: read all surviving entries (ADD merged with DELETE), sort them
+ * together, and write to output as a single sorted stream.
+ */
+ private static void sortAndRewriteFull(
+ List section,
+ RewriteOutput output,
+ List sortNewFiles,
+ CompactionContext ctx,
+ ManifestFile manifestFile,
+ @Nullable Integer manifestReadParallelism)
+ throws Exception {
+ // Read surviving ADD entries: filter out entries cancelled by deleteEntries.
+ Function> reader =
+ meta -> {
+ List batch = new ArrayList<>();
+ for (ManifestEntry entry :
+ manifestFile.read(
+ meta.fileName(),
+ meta.fileSize(),
+ FileEntry.addFilter(),
+ Filter.alwaysTrue())) {
+ if (!ctx.deleteEntries.contains(entry.identifier())) {
+ batch.add(entry);
+ }
+ }
+ return batch;
+ };
+
+ List entries = new ArrayList<>();
+ for (ManifestEntry entry :
+ sequentialBatchedExecute(reader, section, manifestReadParallelism)) {
+ entries.add(entry);
+ }
+
+ if (!entries.isEmpty()) {
+ List sorted =
+ sortAndWriteEntries(entries, ctx.fieldComparator, manifestFile);
+ output.addSortedFiles(sorted);
+ sortNewFiles.addAll(sorted);
+ }
+ }
+
+ /**
+ * Minor compaction path: read entries with ADD/DELETE classified in a single pass per file,
+ * then sort each group independently and write them to output.
+ *
+ * Each file is read in parallel (via sequentialBatchedExecute). The reader classifies
+ * entries into ADD and DELETE within each file, returning a Pair. Results are merged in the
+ * main thread.
+ */
+ private static void sortAndRewriteMinor(
+ List section,
+ RewriteOutput output,
+ List sortNewFiles,
+ CompactionContext ctx,
+ ManifestFile manifestFile,
+ @Nullable Integer manifestReadParallelism)
+ throws Exception {
+ // Read and classify ADD/DELETE in one pass per file.
+ Function, List>>> reader =
+ meta -> {
+ List addBatch = new ArrayList<>();
+ List deleteBatch = new ArrayList<>();
+ for (ManifestEntry entry :
+ manifestFile.read(meta.fileName(), meta.fileSize())) {
+ if (entry.kind() == FileKind.ADD) {
+ addBatch.add(entry);
+ } else {
+ deleteBatch.add(entry);
+ }
+ }
+ return singletonList(Pair.of(addBatch, deleteBatch));
+ };
+
+ Map addMap = new HashMap<>();
+ List minorDeleteEntries = new ArrayList<>();
+ for (Pair, List> pair :
+ sequentialBatchedExecute(reader, section, manifestReadParallelism)) {
+ for (ManifestEntry entry : pair.getLeft()) {
+ addMap.put(entry.identifier(), entry);
+ }
+ minorDeleteEntries.addAll(pair.getRight());
+ }
+
+ // Cancel out ADD+DELETE pairs with the same identifier within the section.
+ minorDeleteEntries.removeIf(
+ manifestEntry -> addMap.remove(manifestEntry.identifier()) != null);
+ List addEntries = new ArrayList<>(addMap.values());
+
+ if (!addEntries.isEmpty()) {
+ List sorted =
+ sortAndWriteEntries(addEntries, ctx.fieldComparator, manifestFile);
+ output.addSortedFiles(sorted);
+ sortNewFiles.addAll(sorted);
+ }
+
+ if (!minorDeleteEntries.isEmpty()) {
+ List sorted =
+ sortAndWriteEntries(minorDeleteEntries, ctx.fieldComparator, manifestFile);
+ output.addDeleteFiles(sorted);
+ sortNewFiles.addAll(sorted);
+ }
+ }
+
+ /** Sort entries and write them to a new manifest file with proper error handling. */
+ private static List sortAndWriteEntries(
+ List entries,
+ RecordComparator fieldComparator,
+ ManifestFile manifestFile)
+ throws Exception {
+ entries.sort((a, b) -> compareSortKey(a, b, fieldComparator));
+ RollingFileWriter writer =
+ manifestFile.createRollingWriter();
+ Exception exception = null;
+ try {
+ writer.write(entries);
+ } catch (Exception e) {
+ exception = e;
+ } finally {
+ if (exception != null) {
+ writer.abort();
+ throw exception;
+ }
+ writer.close();
+ }
+ return writer.result();
+ }
+
+ /**
+ * Compare two {@link ManifestEntry}s by the composite key {@code (sort-field, kind, fileName)}.
+ * {@code fileName} is used as the tie-breaker so that all entries sharing the same sort-field
+ * value AND the same data file are emitted contiguously.
+ */
+ static int compareSortKey(ManifestEntry a, ManifestEntry b, RecordComparator fieldComparator) {
+ int c = fieldComparator.compare(a.partition(), b.partition());
+ if (c != 0) {
+ return c;
+ }
+ // ADD before DELETE
+ int kindCmp = a.kind().compareTo(b.kind());
+ if (kindCmp != 0) {
+ return kindCmp;
+ }
+ return a.file().fileName().compareTo(b.file().fileName());
+ }
+
+ /**
+ * Resolve the partition field to sort manifests by.
+ *
+ * Resolution rules:
+ *
+ *
+ * - If {@code manifest-sort.partition-field} is configured, return that value.
+ *
- Otherwise, default to the first partition field.
+ *
+ */
+ static String resolveSortField(String sortPartitionField, RowType partitionType) {
+ if (sortPartitionField != null && !sortPartitionField.isEmpty()) {
+ return sortPartitionField;
+ }
+ return partitionType.getFieldNames().get(0);
+ }
+
+ /** Strategy interface for writing compaction results. */
+ interface RewriteOutput {
+ void addUnchanged(ManifestFileMeta file);
+
+ void addAllUnchanged(List files);
+
+ void addSortedFiles(List files);
+
+ void addDeleteFiles(List files);
+ }
+
+ private static class FullCompactOutput implements RewriteOutput {
+ private final List result;
+
+ FullCompactOutput(List result) {
+ this.result = result;
+ }
+
+ @Override
+ public void addUnchanged(ManifestFileMeta file) {
+ result.add(file);
+ }
+
+ @Override
+ public void addAllUnchanged(List files) {
+ result.addAll(files);
+ }
+
+ @Override
+ public void addSortedFiles(List files) {
+ result.addAll(files);
+ }
+
+ @Override
+ public void addDeleteFiles(List files) {
+ result.addAll(files);
+ }
+ }
+
+ private static class MinorCompactOutput implements RewriteOutput {
+ private final List> result;
+ private final Pair indexRange;
+ private final Map fileNameToIndex;
+
+ MinorCompactOutput(
+ List> result,
+ Pair indexRange,
+ Map fileNameToIndex) {
+ this.result = result;
+ this.indexRange = indexRange;
+ this.fileNameToIndex = fileNameToIndex;
+ }
+
+ @Override
+ public void addUnchanged(ManifestFileMeta file) {
+ Integer idx = fileNameToIndex.get(file.fileName());
+ result.get(idx).add(file);
+ }
+
+ @Override
+ public void addAllUnchanged(List files) {
+ for (ManifestFileMeta file : files) {
+ addUnchanged(file);
+ }
+ }
+
+ @Override
+ public void addSortedFiles(List files) {
+ result.get(indexRange.getLeft()).addAll(files);
+ }
+
+ @Override
+ public void addDeleteFiles(List files) {
+ result.get(indexRange.getRight()).addAll(files);
+ }
+ }
+
+ /** A section of manifest files with pre-computed metadata. */
+ static class Section {
+ final List files;
+ final long totalSize;
+ final boolean hasDefaultCompactMeta;
+
+ Section(List files, long totalSize, boolean hasDefaultCompactMeta) {
+ this.files = files;
+ this.totalSize = totalSize;
+ this.hasDefaultCompactMeta = hasDefaultCompactMeta;
+ }
+
+ /** Create a merged section from two sections. */
+ static Section merge(Section a, Section b) {
+ List merged = new ArrayList<>(a.files);
+ merged.addAll(b.files);
+ return new Section(
+ merged,
+ a.totalSize + b.totalSize,
+ a.hasDefaultCompactMeta || b.hasDefaultCompactMeta);
+ }
+ }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/ManifestPickStrategy.java b/paimon-core/src/main/java/org/apache/paimon/operation/ManifestPickStrategy.java
new file mode 100644
index 000000000000..519c49676ce3
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/ManifestPickStrategy.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.operation;
+
+import org.apache.paimon.utils.Preconditions;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Pick strategy for manifest LSM Tree compaction.
+ *
+ * Strategy priority:
+ *
+ *
+ * - SizeAmp: if all lower-level runs' total size exceeds the highest-level run's size
+ * times {@code sizeAmpThreshold}, trigger full compaction (pick all runs).
+ *
- SizeRatio: from low to high, pick adjacent runs whose amplification factor is less
+ * than {@code sizeRatioThreshold}.
+ *
- Forced pick: level0 and level1 runs are always picked.
+ *
+ */
+public class ManifestPickStrategy {
+
+ public static final int MAX_LEVEL = 4;
+
+ private final int sizeAmpThreshold;
+ private final int sizeRatioThreshold;
+
+ public ManifestPickStrategy(int sizeAmpThreshold, int sizeRatioThreshold) {
+ Preconditions.checkArgument(sizeAmpThreshold > 0, "sizeAmpThreshold must be positive");
+ Preconditions.checkArgument(sizeRatioThreshold > 0, "sizeRatioThreshold must be positive");
+ this.sizeAmpThreshold = sizeAmpThreshold;
+ this.sizeRatioThreshold = sizeRatioThreshold;
+ }
+
+ /**
+ * Pick runs that need compaction from the given level runs.
+ *
+ * @param levelRuns runs with assigned levels (level 0~4)
+ * @return list of picked runs to compact
+ */
+ public List pick(List levelRuns) {
+ if (levelRuns.isEmpty() || levelRuns.size() <= MAX_LEVEL) {
+ return new ArrayList<>();
+ }
+
+ // Try SizeAmp first
+ List sizeAmpResult = pickForSizeAmp(levelRuns);
+ if (sizeAmpResult != null) {
+ return sizeAmpResult;
+ }
+
+ // SizeRatio + forced pick
+ return pickForSizeRatioAndForce(levelRuns);
+ }
+
+ /**
+ * SizeAmp check: if all lower-level (0~3) runs' total size exceeds the highest-level run's size
+ * by more than {@code sizeAmpThreshold} percent, pick all runs for full compaction.
+ *
+ * Formula (consistent with {@code UniversalCompaction#pickForSizeAmp}): {@code
+ * lowerLevelTotalSize * 100 > sizeAmpThreshold * highestRunSize}
+ */
+ private List pickForSizeAmp(
+ List levelRuns) {
+ if (levelRuns.isEmpty()) {
+ return null;
+ }
+
+ // The last run has the highest level (set by buildLevelSortedRuns)
+ ManifestAdjacentSortedRun highestRun = levelRuns.get(levelRuns.size() - 1);
+ int maxLevel = highestRun.level();
+
+ if (maxLevel <= 0) {
+ return null;
+ }
+
+ long lowerLevelTotalSize = 0;
+ for (ManifestAdjacentSortedRun run : levelRuns) {
+ if (run.level() < maxLevel) {
+ lowerLevelTotalSize += run.totalSize();
+ }
+ }
+
+ // size amplification = percentage of additional size
+ if (lowerLevelTotalSize * 100 > (long) sizeAmpThreshold * highestRun.totalSize()) {
+ return new ArrayList<>(levelRuns);
+ }
+ return null;
+ }
+
+ /**
+ * SizeRatio + forced pick.
+ *
+ *
+ * - Level0 and level1 are always picked.
+ *
- From low to high, if the cumulative picked size with ratio amplification covers the
+ * next run's size, continue picking.
+ *
+ *
+ * Formula (consistent with {@code UniversalCompaction#pickForSizeRatio}): {@code pickedSize
+ * * (100.0 + sizeRatioThreshold) / 100.0 >= nextRunSize}
+ */
+ private List pickForSizeRatioAndForce(
+ List levelRuns) {
+ // levelRuns is already sorted by level ascending (set by buildLevelSortedRuns)
+ List picked = new ArrayList<>();
+
+ // Always pick the first run to guarantee a non-empty result.
+ picked.add(levelRuns.get(0));
+ long pickedSize = levelRuns.get(0).totalSize();
+
+ // From the second run onward: forced pick level0/level1, then SizeRatio for the rest.
+ for (int i = 1; i < levelRuns.size(); i++) {
+ ManifestAdjacentSortedRun run = levelRuns.get(i);
+ if (run.level() <= 1) {
+ picked.add(run);
+ pickedSize += run.totalSize();
+ } else {
+ long nextRunSize = run.totalSize();
+ if (pickedSize * (100 + sizeRatioThreshold) >= nextRunSize * 100L) {
+ picked.add(run);
+ pickedSize += nextRunSize;
+ }
+ }
+ }
+ if (picked.size() == 1) {
+ return new ArrayList<>();
+ }
+ return picked;
+ }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
index 4ffc3ec0259e..48939fff6ceb 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
@@ -314,6 +314,8 @@ public static void validateTableSchema(TableSchema schema) {
validateChangelogReadSequenceNumber(schema, options);
validatePkClusteringOverride(options);
+
+ validateManifestSort(schema, options);
}
public static void validateFallbackBranch(SchemaManager schemaManager, TableSchema schema) {
@@ -1013,4 +1015,22 @@ public static void validatePkClusteringOverride(CoreOptions options) {
}
}
}
+
+ private static void validateManifestSort(TableSchema schema, CoreOptions options) {
+ if (options.manifestSortEnabled()) {
+ checkArgument(
+ !schema.partitionKeys().isEmpty(),
+ "Cannot enable '%s' for non-partition table.",
+ CoreOptions.MANIFEST_SORT_ENABLED.key());
+ String sortPartitionField = options.manifestSortPartitionField();
+ if (sortPartitionField != null && !sortPartitionField.isEmpty()) {
+ checkArgument(
+ schema.partitionKeys().contains(sortPartitionField),
+ "'%s' = '%s' is not a partition field. Available partition fields: %s.",
+ CoreOptions.MANIFEST_SORT_PARTITION_FIELD.key(),
+ sortPartitionField,
+ schema.partitionKeys());
+ }
+ }
+ }
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTest.java b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTest.java
index 36b0d15f114f..462ab337ee73 100644
--- a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTest.java
@@ -18,16 +18,26 @@
package org.apache.paimon.manifest;
+import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BinaryRowWriter;
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.FileIOFinder;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.SeekableInputStream;
import org.apache.paimon.fs.SeekableInputStreamWrapper;
import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.operation.ManifestFileMerger;
+import org.apache.paimon.options.Options;
import org.apache.paimon.partition.PartitionPredicate;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.stats.StatsTestUtils;
import org.apache.paimon.types.IntType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FailingFileIO;
+import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
@@ -42,6 +52,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -84,9 +95,16 @@ public void testMergeWithoutFullCompaction(int numLastBits) {
createData(numLastBits, input, expected);
// no trigger Full Compaction
+ Options testOptions = new Options();
+ testOptions.set("manifest.target-file-size", "500B");
+ testOptions.set("manifest.merge-min-count", "3");
+ testOptions.set("manifest.full-compaction-threshold-size", "9223372036854775807B");
List actual =
ManifestFileMerger.merge(
- input, manifestFile, 500, 3, Long.MAX_VALUE, getPartitionType(), null);
+ input,
+ manifestFile,
+ getPartitionType(),
+ CoreOptions.fromMap(testOptions.toMap()));
assertThat(actual).hasSameSizeAs(expected);
// these two manifest files are merged from the input
@@ -118,14 +136,16 @@ private void testCleanUp(List input, long fullCompactionThresh
ManifestFile failingManifestFile =
createManifestFile(FailingFileIO.getFailingPath(failingName, tempDir.toString()));
try {
+ Options testOptions = new Options();
+ testOptions.set("manifest.target-file-size", "500B");
+ testOptions.set("manifest.merge-min-count", "3");
+ testOptions.set(
+ "manifest.full-compaction-threshold-size", fullCompactionThreshold + "B");
ManifestFileMerger.merge(
input,
failingManifestFile,
- 500,
- 3,
- fullCompactionThreshold,
getPartitionType(),
- null);
+ CoreOptions.fromMap(testOptions.toMap()));
} catch (Throwable e) {
assertThat(e).hasRootCauseExactlyInstanceOf(FailingFileIO.ArtificialException.class);
// old files should be kept untouched, while new files should be cleaned up
@@ -156,9 +176,16 @@ public void testMerge() {
// delta with delete apply partition 1,2
addDeltaManifests(input, true);
// trigger full compaction
+ Options testOptions = new Options();
+ testOptions.set("manifest.target-file-size", "500B");
+ testOptions.set("manifest.merge-min-count", "3");
+ testOptions.set("manifest.full-compaction-threshold-size", "200B");
List merged =
ManifestFileMerger.merge(
- input, manifestFile, 500, 3, 200, getPartitionType(), null);
+ input,
+ manifestFile,
+ getPartitionType(),
+ CoreOptions.fromMap(testOptions.toMap()));
// 1st Manifest don't need to Merge
assertSameContent(input.get(0), merged.get(0), manifestFile);
@@ -173,9 +200,16 @@ public void testMergeWithoutDelta() {
// base
List input = createBaseManifestFileMetas(true);
+ Options testOptions = new Options();
+ testOptions.set("manifest.target-file-size", "500B");
+ testOptions.set("manifest.merge-min-count", "3");
+ testOptions.set("manifest.full-compaction-threshold-size", "200B");
List merged =
ManifestFileMerger.merge(
- input, manifestFile, 500, 3, 200, getPartitionType(), null);
+ input,
+ manifestFile,
+ getPartitionType(),
+ CoreOptions.fromMap(testOptions.toMap()));
assertEquivalentEntries(input, merged);
assertThat(merged).hasSameElementsAs(input);
@@ -186,9 +220,16 @@ public void testMergeWithoutDelta() {
ManifestFileMeta delta = makeManifest(makeEntry(true, "A", 1), makeEntry(false, "A", 1));
input1.add(delta);
+ Options testOptions1 = new Options();
+ testOptions1.set("manifest.target-file-size", "500B");
+ testOptions1.set("manifest.merge-min-count", "3");
+ testOptions1.set("manifest.full-compaction-threshold-size", "200B");
List merged1 =
ManifestFileMerger.merge(
- input1, manifestFile, 500, 3, 200, getPartitionType(), null);
+ input1,
+ manifestFile,
+ getPartitionType(),
+ CoreOptions.fromMap(testOptions1.toMap()));
assertThat(base).hasSameElementsAs(merged1);
assertEquivalentEntries(input1, merged1);
@@ -198,9 +239,16 @@ public void testMergeWithoutDelta() {
public void testMergeWithoutBase() {
List input = new ArrayList<>();
addDeltaManifests(input, true);
+ Options testOptions = new Options();
+ testOptions.set("manifest.target-file-size", "500B");
+ testOptions.set("manifest.merge-min-count", "3");
+ testOptions.set("manifest.full-compaction-threshold-size", "200B");
List merged =
ManifestFileMerger.merge(
- input, manifestFile, 500, 3, 200, getPartitionType(), null);
+ input,
+ manifestFile,
+ getPartitionType(),
+ CoreOptions.fromMap(testOptions.toMap()));
assertEquivalentEntries(input, merged);
}
@@ -225,9 +273,16 @@ public void testMergeWithoutDeleteFile() {
input.add(makeManifest(makeEntry(true, "F")));
input.add(makeManifest(makeEntry(true, "G")));
+ Options testOptions = new Options();
+ testOptions.set("manifest.target-file-size", "500B");
+ testOptions.set("manifest.merge-min-count", "3");
+ testOptions.set("manifest.full-compaction-threshold-size", "200B");
List merged =
ManifestFileMerger.merge(
- input, manifestFile, 500, 3, 200, getPartitionType(), null);
+ input,
+ manifestFile,
+ getPartitionType(),
+ CoreOptions.fromMap(testOptions.toMap()));
assertEquivalentEntries(input, merged);
}
@@ -489,9 +544,16 @@ public void testMergeFullCompactionWithoutDeleteFile() {
input.add(makeManifest(makeEntry(true, "F")));
input.add(makeManifest(makeEntry(true, "G")));
+ Options testOptions = new Options();
+ testOptions.set("manifest.target-file-size", threshold + "B");
+ testOptions.set("manifest.merge-min-count", "3");
+ testOptions.set("manifest.full-compaction-threshold-size", "200B");
List merged =
ManifestFileMerger.merge(
- input, manifestFile, threshold, 3, 200, getPartitionType(), null);
+ input,
+ manifestFile,
+ getPartitionType(),
+ CoreOptions.fromMap(testOptions.toMap()));
assertEquivalentEntries(
input.stream()
.filter(f -> !baseFiles.contains(f.fileName()))
@@ -819,4 +881,415 @@ private void beforeFirstRead() throws IOException {
}
}
}
+
+ // ==================== Manifest Sort Tests ====================
+
+ /**
+ * Test manifest sort with overlapping partition ranges. Each manifest contains entries spanning
+ * multiple partitions, creating overlapping intervals that require sort rewrite to resolve.
+ * After sort rewrite, all surviving ADD entries should be sorted by partition field.
+ */
+ @Test
+ public void testManifestSortWithOverlappingPartitions() {
+ List input = new ArrayList<>();
+
+ // manifest-A: partitions [5, 13]
+ List entriesA = new ArrayList<>();
+ for (int p = 5; p <= 13; p++) {
+ entriesA.add(makeEntry(true, String.format("A-p%d", p), p));
+ }
+ input.add(makeManifest(entriesA.toArray(new ManifestEntry[0])));
+
+ // manifest-B: partitions [0, 9]
+ List entriesB = new ArrayList<>();
+ for (int p = 0; p <= 9; p++) {
+ entriesB.add(makeEntry(true, String.format("B-p%d", p), p));
+ }
+ input.add(makeManifest(entriesB.toArray(new ManifestEntry[0])));
+
+ // manifest-C: partitions [3, 7] -- overlaps with A and B
+ List entriesC = new ArrayList<>();
+ for (int p = 3; p <= 7; p++) {
+ entriesC.add(makeEntry(true, String.format("C-p%d", p), p));
+ }
+ input.add(makeManifest(entriesC.toArray(new ManifestEntry[0])));
+
+ // manifest-D: partitions [8, 12] -- overlaps with A
+ List entriesD = new ArrayList<>();
+ for (int p = 8; p <= 12; p++) {
+ entriesD.add(makeEntry(true, String.format("D-p%d", p), p));
+ }
+ input.add(makeManifest(entriesD.toArray(new ManifestEntry[0])));
+
+ // manifest-E: partitions [1, 6] -- overlaps with B and C
+ List entriesE = new ArrayList<>();
+ for (int p = 1; p <= 6; p++) {
+ entriesE.add(makeEntry(true, String.format("E-p%d", p), p));
+ }
+ input.add(makeManifest(entriesE.toArray(new ManifestEntry[0])));
+
+ // manifest-F: partitions [4, 14] -- overlaps with D
+ List entriesF = new ArrayList<>();
+ for (int p = 4; p <= 14; p++) {
+ entriesF.add(makeEntry(true, String.format("F-p%d", p), p));
+ }
+ input.add(makeManifest(entriesF.toArray(new ManifestEntry[0])));
+
+ Options testOptions = new Options();
+ testOptions.set("manifest-sort.enabled", "true");
+ List merged =
+ ManifestFileMerger.merge(
+ input,
+ manifestFile,
+ getPartitionType(),
+ CoreOptions.fromMap(testOptions.toMap()));
+
+ // Verify entries are equivalent (no data loss)
+ assertEquivalentEntries(input, merged);
+
+ // Verify all entries within each output manifest are sorted by partition
+ for (ManifestFileMeta meta : merged) {
+ List entries = manifestFile.read(meta.fileName(), meta.fileSize());
+ for (int i = 1; i < entries.size(); i++) {
+ int prevPartition = entries.get(i - 1).partition().getInt(0);
+ int currPartition = entries.get(i).partition().getInt(0);
+ assertThat(currPartition)
+ .as("Entries within a manifest should be sorted by partition")
+ .isGreaterThanOrEqualTo(prevPartition);
+ }
+ }
+ }
+
+ /**
+ * Test that sort rewrite correctly eliminates DELETE entries and their corresponding ADD
+ * entries. The key condition is that totalDeltaFileSize must reach manifestFullCompactionSize
+ * to trigger the full compaction path inside trySortRewrite, which reads deleteEntries and
+ * passes them to sortAndRewriteSection for elimination.
+ *
+ * Design:
+ *
+ *
+ * - Base manifests with overlapping partitions (all ADD, large enough to be "mustChange"
+ * since fileSize < suggestedMetaSize):
+ * manifest-A: partitions [0, 4] with entries A-p0..A-p4
+ * manifest-B: partitions [2, 6] with entries B-p2..B-p6 (overlaps A)
+ * manifest-C: partitions [5, 9] with entries C-p5..C-p9 (overlaps B)
+ * - Delta manifests with DELETE entries (cancel some ADD entries):
+ * manifest-D: DELETE A-p2, DELETE B-p4, ADD new-p2, ADD new-p4
+ * manifest-E: DELETE C-p7, ADD new-p7
+ * - After sort rewrite: A-p2, B-p4, C-p7 should be eliminated,
+ * replaced by new-p2, new-p4, new-p7. Output should only contain ADD entries,
+ * sorted by partition.
+ *
+ */
+ @Test
+ public void testManifestSortEliminatesDeleteEntries() {
+ List input = new ArrayList<>();
+
+ // manifest-A: partitions [0, 4]
+ List entriesA = new ArrayList<>();
+ for (int p = 0; p <= 4; p++) {
+ entriesA.add(makeEntry(true, String.format("A-p%d", p), p));
+ }
+ input.add(makeManifest(entriesA.toArray(new ManifestEntry[0])));
+
+ // manifest-B: partitions [2, 6] -- overlaps A
+ List entriesB = new ArrayList<>();
+ for (int p = 2; p <= 6; p++) {
+ entriesB.add(makeEntry(true, String.format("B-p%d", p), p));
+ }
+ input.add(makeManifest(entriesB.toArray(new ManifestEntry[0])));
+
+ // manifest-C: partitions [5, 9] -- overlaps B
+ List entriesC = new ArrayList<>();
+ for (int p = 5; p <= 9; p++) {
+ entriesC.add(makeEntry(true, String.format("C-p%d", p), p));
+ }
+ input.add(makeManifest(entriesC.toArray(new ManifestEntry[0])));
+
+ // manifest-D: DELETE A-p2, DELETE B-p4, ADD new-p2, ADD new-p4
+ input.add(
+ makeManifest(
+ makeEntry(false, "A-p2", 2),
+ makeEntry(false, "B-p4", 4),
+ makeEntry(true, "new-p2", 2),
+ makeEntry(true, "new-p4", 4)));
+
+ // manifest-E: DELETE C-p7, ADD new-p7
+ input.add(makeManifest(makeEntry(false, "C-p7", 7), makeEntry(true, "new-p7", 7)));
+
+ Options testOptions = new Options();
+ testOptions.set("manifest-sort.enabled", "true");
+ testOptions.set("manifest.full-compaction-threshold-size", "10B");
+
+ List merged =
+ ManifestFileMerger.merge(
+ input,
+ manifestFile,
+ getPartitionType(),
+ CoreOptions.fromMap(testOptions.toMap()));
+
+ // Collect all output entries
+ List allOutputEntries = new ArrayList<>();
+ for (ManifestFileMeta meta : merged) {
+ allOutputEntries.addAll(manifestFile.read(meta.fileName(), meta.fileSize()));
+ }
+
+ // Verify: no DELETE entries in output (all DELETE pairs eliminated)
+ long deleteCount =
+ allOutputEntries.stream().filter(e -> e.kind() == FileKind.DELETE).count();
+ assertThat(deleteCount).as("Sort rewrite should eliminate all DELETE entries").isEqualTo(0);
+
+ // Verify: the deleted ADD entries (A-p2, B-p4, C-p7) are NOT in output
+ Set outputFileNames =
+ allOutputEntries.stream().map(e -> e.file().fileName()).collect(Collectors.toSet());
+ assertThat(outputFileNames).doesNotContain("A-p2", "B-p4", "C-p7");
+
+ // Verify: the replacement entries (new-p2, new-p4, new-p7) ARE in output
+ assertThat(outputFileNames).contains("new-p2", "new-p4", "new-p7");
+
+ // Verify: all surviving entries match what FileEntry.mergeEntries would produce
+ assertEquivalentEntries(input, merged);
+
+ // Verify entries within each output manifest are sorted by partition
+ for (ManifestFileMeta meta : merged) {
+ List entries = manifestFile.read(meta.fileName(), meta.fileSize());
+ for (int i = 1; i < entries.size(); i++) {
+ int prevPartition = entries.get(i - 1).partition().getInt(0);
+ int currPartition = entries.get(i).partition().getInt(0);
+ assertThat(currPartition)
+ .as("Entries within manifest should be sorted by partition")
+ .isGreaterThanOrEqualTo(prevPartition);
+ }
+ }
+ }
+
+ /**
+ * Test manifest sort with a multi-field partition type.
+ *
+ * Setup: partition=(region INT, dt INT, hour INT), sort by dt (field index=1). 9 manifest
+ * files form 6 overlapping sorted runs by dt range:
+ *
+ *
+ * Run1: 3 files, dt=[0,15],[3,5],[6,8]
+ * Run2: 2 files, dt=[1,8],[5,7]
+ * Run3: 1 file, dt=[0,9]
+ * Run4: 1 file, dt=[5,14]
+ * Run5: 1 file, dt=[8,15]
+ * Run6: 1 file, dt=[4,12]
+ *
+ *
+ * Verifies: 1) no data loss after sort-rewrite, 2) entries within each output manifest are
+ * sorted by dt.
+ */
+ @Test
+ public void testManifestSortWithMultiplePartitions() {
+ // Use a 3-field partition type: (region INT, dt INT, hour INT)
+ RowType multiPartitionType = RowType.of(new IntType(), new IntType(), new IntType());
+
+ // Create a dedicated ManifestFile for the 3-field partition type
+ Path path = new Path(tempDir.toString());
+ FileIO fileIO = FileIOFinder.find(path);
+ ManifestFile multiPartManifestFile =
+ new ManifestFile.Factory(
+ fileIO,
+ new SchemaManager(fileIO, path),
+ multiPartitionType,
+ avro,
+ "zstd",
+ new FileStorePathFactory(
+ path,
+ multiPartitionType,
+ "default",
+ CoreOptions.FILE_FORMAT.defaultValue(),
+ CoreOptions.DATA_FILE_PREFIX.defaultValue(),
+ CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue(),
+ CoreOptions.PARTITION_GENERATE_LEGACY_NAME.defaultValue(),
+ CoreOptions.FILE_SUFFIX_INCLUDE_COMPRESSION.defaultValue(),
+ CoreOptions.FILE_COMPRESSION.defaultValue(),
+ null,
+ null,
+ CoreOptions.ExternalPathStrategy.NONE,
+ null,
+ false,
+ null),
+ Long.MAX_VALUE,
+ null)
+ .create();
+
+ List input = new ArrayList<>();
+
+ // Run1
+ input.add(
+ multiPartManifestFile
+ .write(
+ Arrays.asList(
+ makeMultiPartEntry(true, "r1a-p0", 10, 0, 1),
+ makeMultiPartEntry(true, "r1a-p1", 20, 1, 2),
+ makeMultiPartEntry(true, "r1a-p2", 30, 15, 3)))
+ .get(0));
+ input.add(
+ multiPartManifestFile
+ .write(
+ Arrays.asList(
+ makeMultiPartEntry(true, "r1b-p3", 10, 3, 4),
+ makeMultiPartEntry(true, "r1b-p4", 20, 4, 5),
+ makeMultiPartEntry(true, "r1b-p5", 30, 5, 6)))
+ .get(0));
+ input.add(
+ multiPartManifestFile
+ .write(
+ Arrays.asList(
+ makeMultiPartEntry(true, "r1c-p6", 10, 6, 7),
+ makeMultiPartEntry(true, "r1c-p7", 20, 7, 8),
+ makeMultiPartEntry(true, "r1c-p8", 30, 8, 9)))
+ .get(0));
+
+ // Run2
+ input.add(
+ multiPartManifestFile
+ .write(
+ Arrays.asList(
+ makeMultiPartEntry(true, "r2a-p1", 5, 1, 10),
+ makeMultiPartEntry(true, "r2a-p2", 15, 2, 11),
+ makeMultiPartEntry(true, "r2a-p3", 25, 3, 12),
+ makeMultiPartEntry(true, "r2a-p4", 35, 8, 13)))
+ .get(0));
+ input.add(
+ multiPartManifestFile
+ .write(
+ Arrays.asList(
+ makeMultiPartEntry(true, "r2b-p5", 5, 5, 14),
+ makeMultiPartEntry(true, "r2b-p6", 15, 6, 15),
+ makeMultiPartEntry(true, "r2b-p7", 25, 7, 16)))
+ .get(0));
+
+ // Run3
+ List run3Entries = new ArrayList<>();
+ for (int p = 0; p <= 9; p++) {
+ run3Entries.add(makeMultiPartEntry(true, String.format("r3-p%d", p), 99, p, p + 20));
+ }
+ input.add(multiPartManifestFile.write(run3Entries).get(0));
+
+ // Run4
+ input.add(
+ multiPartManifestFile
+ .write(
+ Arrays.asList(
+ makeMultiPartEntry(true, "r4a-p10", 10, 5, 30),
+ makeMultiPartEntry(true, "r4a-p11", 20, 11, 31),
+ makeMultiPartEntry(true, "r4a-p12", 30, 12, 32),
+ makeMultiPartEntry(true, "r4a-p13", 40, 13, 33),
+ makeMultiPartEntry(true, "r4a-p14", 50, 14, 34)))
+ .get(0));
+
+ // Run5
+ input.add(
+ multiPartManifestFile
+ .write(
+ Arrays.asList(
+ makeMultiPartEntry(true, "r5a-p11", 11, 8, 40),
+ makeMultiPartEntry(true, "r5a-p12", 21, 12, 41),
+ makeMultiPartEntry(true, "r5a-p13", 31, 13, 42),
+ makeMultiPartEntry(true, "r5a-p14", 41, 14, 43),
+ makeMultiPartEntry(true, "r5a-p15", 51, 15, 44)))
+ .get(0));
+
+ // Run6
+ input.add(
+ multiPartManifestFile
+ .write(
+ Arrays.asList(
+ makeMultiPartEntry(true, "r6a-p7", 7, 4, 50),
+ makeMultiPartEntry(true, "r6a-p8", 17, 8, 51),
+ makeMultiPartEntry(true, "r6a-p9", 27, 9, 52),
+ makeMultiPartEntry(true, "r6a-p10", 37, 10, 53),
+ makeMultiPartEntry(true, "r6a-p11", 47, 11, 54),
+ makeMultiPartEntry(true, "r6a-p12", 57, 12, 55)))
+ .get(0));
+
+ Options testOptions = new Options();
+ testOptions.set("manifest-sort.enabled", "true");
+ // Sort by the second partition field "f1" (dt)
+ testOptions.set("manifest-sort.partition-field", "f1");
+ List merged =
+ ManifestFileMerger.merge(
+ input,
+ multiPartManifestFile,
+ multiPartitionType,
+ CoreOptions.fromMap(testOptions.toMap()));
+
+ // Verify no data loss
+ List inputEntries =
+ input.stream()
+ .flatMap(
+ f ->
+ multiPartManifestFile.read(f.fileName(), f.fileSize())
+ .stream())
+ .collect(Collectors.toList());
+ List entryBeforeMerge =
+ FileEntry.mergeEntries(inputEntries).stream()
+ .filter(entry -> entry.kind() == FileKind.ADD)
+ .map(entry -> entry.kind() + "-" + entry.file().fileName())
+ .collect(Collectors.toList());
+ List entryAfterMerge = new ArrayList<>();
+ for (ManifestFileMeta meta : merged) {
+ for (ManifestEntry entry :
+ multiPartManifestFile.read(meta.fileName(), meta.fileSize())) {
+ entryAfterMerge.add(entry.kind() + "-" + entry.file().fileName());
+ }
+ }
+ assertThat(entryBeforeMerge).hasSameElementsAs(entryAfterMerge);
+
+ // Verify entries within each output manifest are sorted by the second field (dt)
+ for (ManifestFileMeta meta : merged) {
+ List entries =
+ multiPartManifestFile.read(meta.fileName(), meta.fileSize());
+ for (int i = 1; i < entries.size(); i++) {
+ int prevDt = entries.get(i - 1).partition().getInt(1);
+ int currDt = entries.get(i).partition().getInt(1);
+ assertThat(currDt)
+ .as("Entries within manifest should be sorted by partition")
+ .isGreaterThanOrEqualTo(prevDt);
+ }
+ }
+ }
+
+ /** Create a ManifestEntry with a 3-field partition row (region, dt, hour). */
+ private ManifestEntry makeMultiPartEntry(
+ boolean isAdd, String fileName, int region, int dt, int hour) {
+ BinaryRow binaryRow = new BinaryRow(3);
+ BinaryRowWriter writer = new BinaryRowWriter(binaryRow);
+ writer.writeInt(0, region);
+ writer.writeInt(1, dt);
+ writer.writeInt(2, hour);
+ writer.complete();
+
+ return ManifestEntry.create(
+ isAdd ? FileKind.ADD : FileKind.DELETE,
+ binaryRow,
+ 0,
+ 0,
+ DataFileMeta.create(
+ fileName,
+ 0,
+ 0,
+ binaryRow,
+ binaryRow,
+ StatsTestUtils.newEmptySimpleStats(),
+ StatsTestUtils.newEmptySimpleStats(),
+ 0,
+ 0,
+ 0,
+ 0,
+ Collections.emptyList(),
+ Timestamp.fromEpochMillis(200000),
+ 0L,
+ null,
+ FileSource.APPEND,
+ null,
+ null,
+ null,
+ null));
+ }
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/manifest/NoPartitionManifestFileMetaTest.java b/paimon-core/src/test/java/org/apache/paimon/manifest/NoPartitionManifestFileMetaTest.java
index 591b3206518d..66465f1e7531 100644
--- a/paimon-core/src/test/java/org/apache/paimon/manifest/NoPartitionManifestFileMetaTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/manifest/NoPartitionManifestFileMetaTest.java
@@ -18,7 +18,9 @@
package org.apache.paimon.manifest;
+import org.apache.paimon.CoreOptions;
import org.apache.paimon.operation.ManifestFileMerger;
+import org.apache.paimon.options.Options;
import org.apache.paimon.types.RowType;
import org.junit.jupiter.api.BeforeEach;
@@ -49,9 +51,16 @@ public void testMerge() {
List input = createBaseManifestFileMetas(false);
addDeltaManifests(input, false);
+ Options testOptions = new Options();
+ testOptions.set("manifest.target-file-size", "500B");
+ testOptions.set("manifest.merge-min-count", "3");
+ testOptions.set("manifest.full-compaction-threshold-size", "200B");
List merged =
ManifestFileMerger.merge(
- input, manifestFile, 500, 3, 200, getPartitionType(), null);
+ input,
+ manifestFile,
+ getPartitionType(),
+ CoreOptions.fromMap(testOptions.toMap()));
assertEquivalentEntries(input, merged);
// the first one is not deleted, it should not be merged
@@ -89,9 +98,16 @@ public void testMergeFullCompactionWithoutDeleteFile() {
input.add(makeManifest(makeEntry(true, "F", null)));
input.add(makeManifest(makeEntry(true, "G", null)));
+ Options testOptions = new Options();
+ testOptions.set("manifest.target-file-size", threshold + "B");
+ testOptions.set("manifest.merge-min-count", "3");
+ testOptions.set("manifest.full-compaction-threshold-size", "200B");
List merged =
ManifestFileMerger.merge(
- input, manifestFile, threshold, 3, 200, getPartitionType(), null);
+ input,
+ manifestFile,
+ getPartitionType(),
+ CoreOptions.fromMap(testOptions.toMap()));
assertEquivalentEntries(
input.stream()
.filter(f -> !baseFiles.contains(f.fileName()))
diff --git a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaValidationTest.java b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaValidationTest.java
index d518f79a20f5..15810f210e9f 100644
--- a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaValidationTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaValidationTest.java
@@ -485,4 +485,66 @@ public void testFileFormatPerLevelAcceptsCompatibleSchema() {
validateTableSchema(
new TableSchema(1, fields, 10, emptyList(), singletonList("k"), options, ""));
}
+
+ @Test
+ void testManifestSortValidation() {
+ List fields =
+ Arrays.asList(
+ new DataField(0, "f0", DataTypes.INT()),
+ new DataField(1, "f1", DataTypes.INT()));
+
+ // Test 1: manifest-sort.enabled on non-partition table should fail
+ Map options1 = new HashMap<>();
+ options1.put(CoreOptions.MANIFEST_SORT_ENABLED.key(), "true");
+ options1.put(BUCKET.key(), String.valueOf(-1));
+ assertThatThrownBy(
+ () ->
+ validateTableSchema(
+ new TableSchema(
+ 1,
+ fields,
+ 10,
+ emptyList(),
+ emptyList(),
+ options1,
+ "")))
+ .hasMessageContaining(
+ "Cannot enable 'manifest-sort.enabled' for non-partition table.");
+
+ // Test 2: manifest-sort-partition-field not in partition keys should fail
+ Map options2 = new HashMap<>();
+ options2.put(CoreOptions.MANIFEST_SORT_ENABLED.key(), "true");
+ options2.put(CoreOptions.MANIFEST_SORT_PARTITION_FIELD.key(), "f1");
+ options2.put(BUCKET.key(), String.valueOf(-1));
+ assertThatThrownBy(
+ () ->
+ validateTableSchema(
+ new TableSchema(
+ 1,
+ fields,
+ 10,
+ singletonList("f0"),
+ emptyList(),
+ options2,
+ "")))
+ .hasMessageContaining("is not a partition field");
+
+ // Test 3: valid manifest-sort config should pass
+ Map options3 = new HashMap<>();
+ options3.put(CoreOptions.MANIFEST_SORT_ENABLED.key(), "true");
+ options3.put(CoreOptions.MANIFEST_SORT_PARTITION_FIELD.key(), "f0");
+ options3.put(BUCKET.key(), String.valueOf(-1));
+ assertThatNoException()
+ .isThrownBy(
+ () ->
+ validateTableSchema(
+ new TableSchema(
+ 1,
+ fields,
+ 10,
+ singletonList("f0"),
+ emptyList(),
+ options3,
+ "")));
+ }
}