From 0e87d72802a01ae7d1b4716f4c987dbc1ad4035f Mon Sep 17 00:00:00 2001 From: Talat Uyarer Date: Thu, 14 May 2026 00:02:47 -0700 Subject: [PATCH 1/2] Add rewrite-unclustered option to RewriteDataFiles planner --- .../iceberg/actions/RewriteDataFiles.java | 16 ++ .../actions/BinPackRewriteFilePlanner.java | 31 +++- .../org/apache/iceberg/MockFileScanTask.java | 5 + .../TestBinPackRewriteFilePlanner.java | 140 +++++++++++++++++- docs/docs/spark-procedures.md | 8 + .../TestRewriteDataFilesProcedure.java | 39 +++++ .../actions/TestRewriteDataFilesAction.java | 45 ++++++ 7 files changed, 281 insertions(+), 3 deletions(-) diff --git a/api/src/main/java/org/apache/iceberg/actions/RewriteDataFiles.java b/api/src/main/java/org/apache/iceberg/actions/RewriteDataFiles.java index 0459538d1cda..7fd31ec3105a 100644 --- a/api/src/main/java/org/apache/iceberg/actions/RewriteDataFiles.java +++ b/api/src/main/java/org/apache/iceberg/actions/RewriteDataFiles.java @@ -147,6 +147,22 @@ public interface RewriteDataFiles */ String OUTPUT_SPEC_ID = "output-spec-id"; + /** + * If true, the rewrite also selects data files whose sort order id does not match the table's + * current default sort order id, even when they are already optimally sized. This enables + * incremental sort compaction: a sort-based rewrite reorganizes only the files that are not + * already sorted by the current sort order, instead of re-sorting every selected file. + * + *

This option has no effect on tables without a sort order, and is intended to be used with + * the {@link #sort()} strategy so that rewritten files are marked with the table's current sort + * order id. + * + *

Defaults to false. + */ + String REWRITE_STALE_SORT_ORDER = "rewrite-stale-sort-order"; + + boolean REWRITE_STALE_SORT_ORDER_DEFAULT = false; + /** * Choose BINPACK as a strategy for this rewrite operation * diff --git a/core/src/main/java/org/apache/iceberg/actions/BinPackRewriteFilePlanner.java b/core/src/main/java/org/apache/iceberg/actions/BinPackRewriteFilePlanner.java index ee768fcde460..fc8baddf2c77 100644 --- a/core/src/main/java/org/apache/iceberg/actions/BinPackRewriteFilePlanner.java +++ b/core/src/main/java/org/apache/iceberg/actions/BinPackRewriteFilePlanner.java @@ -28,6 +28,7 @@ import org.apache.iceberg.DataFile; import org.apache.iceberg.FileScanTask; import org.apache.iceberg.RewriteJobOrder; +import org.apache.iceberg.SortOrder; import org.apache.iceberg.StructLike; import org.apache.iceberg.Table; import org.apache.iceberg.TableProperties; @@ -101,6 +102,8 @@ public class BinPackRewriteFilePlanner private double deleteRatioThreshold; private RewriteJobOrder rewriteJobOrder; private Integer maxFilesToRewrite; + private boolean rewriteStaleSortOrder; + private int currentSortOrderId; public BinPackRewriteFilePlanner(Table table) { this(table, Expressions.alwaysTrue()); @@ -139,6 +142,7 @@ public Set validOptions() { .add(DELETE_RATIO_THRESHOLD) .add(RewriteDataFiles.REWRITE_JOB_ORDER) .add(MAX_FILES_TO_REWRITE) + .add(RewriteDataFiles.REWRITE_STALE_SORT_ORDER) .build(); } @@ -154,6 +158,12 @@ public void init(Map options) { RewriteDataFiles.REWRITE_JOB_ORDER, RewriteDataFiles.REWRITE_JOB_ORDER_DEFAULT)); this.maxFilesToRewrite = maxFilesToRewrite(options); + this.rewriteStaleSortOrder = + PropertyUtil.propertyAsBoolean( + options, + RewriteDataFiles.REWRITE_STALE_SORT_ORDER, + RewriteDataFiles.REWRITE_STALE_SORT_ORDER_DEFAULT); + this.currentSortOrderId = table().sortOrder().orderId(); } private int deleteFileThreshold(Map options) { @@ -190,7 +200,10 @@ protected Iterable filterFiles(Iterable tasks) { return Iterables.filter( tasks, task -> - outsideDesiredFileSizeRange(task) || tooManyDeletes(task) || tooHighDeleteRatio(task)); + outsideDesiredFileSizeRange(task) + || tooManyDeletes(task) + || tooHighDeleteRatio(task) + || hasStaleSortOrder(task)); } @Override @@ -202,7 +215,8 @@ protected Iterable> filterFileGroups(List> || enoughContent(group) || tooMuchContent(group) || group.stream().anyMatch(this::tooManyDeletes) - || group.stream().anyMatch(this::tooHighDeleteRatio)); + || group.stream().anyMatch(this::tooHighDeleteRatio) + || group.stream().anyMatch(this::hasStaleSortOrder)); } @Override @@ -270,6 +284,19 @@ private boolean tooManyDeletes(FileScanTask task) { return task.deletes() != null && task.deletes().size() >= deleteFileThreshold; } + private boolean hasStaleSortOrder(FileScanTask task) { + if (!rewriteStaleSortOrder || currentSortOrderId == SortOrder.unsorted().orderId()) { + return false; + } + + if (task.file().specId() != table().spec().specId()) { + return false; + } + + Integer sortOrderId = task.file().sortOrderId(); + return sortOrderId == null || sortOrderId != currentSortOrderId; + } + private boolean tooHighDeleteRatio(FileScanTask task) { if (task.deletes() == null || task.deletes().isEmpty()) { return false; diff --git a/core/src/test/java/org/apache/iceberg/MockFileScanTask.java b/core/src/test/java/org/apache/iceberg/MockFileScanTask.java index 7717d25ea4aa..f415581c3c28 100644 --- a/core/src/test/java/org/apache/iceberg/MockFileScanTask.java +++ b/core/src/test/java/org/apache/iceberg/MockFileScanTask.java @@ -56,9 +56,14 @@ public MockFileScanTask( } public static MockFileScanTask mockTask(long length, int sortOrderId) { + return mockTask(length, (Integer) sortOrderId, 0); + } + + public static MockFileScanTask mockTask(long length, Integer sortOrderId, int specId) { DataFile mockFile = Mockito.mock(DataFile.class); Mockito.when(mockFile.fileSizeInBytes()).thenReturn(length); Mockito.when(mockFile.sortOrderId()).thenReturn(sortOrderId); + Mockito.when(mockFile.specId()).thenReturn(specId); return new MockFileScanTask(mockFile); } diff --git a/core/src/test/java/org/apache/iceberg/actions/TestBinPackRewriteFilePlanner.java b/core/src/test/java/org/apache/iceberg/actions/TestBinPackRewriteFilePlanner.java index aa65140c0b89..e84c1a9b8a6b 100644 --- a/core/src/test/java/org/apache/iceberg/actions/TestBinPackRewriteFilePlanner.java +++ b/core/src/test/java/org/apache/iceberg/actions/TestBinPackRewriteFilePlanner.java @@ -292,7 +292,8 @@ void testValidOptions() { BinPackRewriteFilePlanner.DELETE_FILE_THRESHOLD, BinPackRewriteFilePlanner.DELETE_RATIO_THRESHOLD, RewriteDataFiles.REWRITE_JOB_ORDER, - BinPackRewriteFilePlanner.MAX_FILES_TO_REWRITE)); + BinPackRewriteFilePlanner.MAX_FILES_TO_REWRITE, + RewriteDataFiles.REWRITE_STALE_SORT_ORDER)); } @Test @@ -581,6 +582,143 @@ public void testRewriteMaxFilesRewriteGreaterThanTotalFiles() { assertThat(fileScanTasks).isLessThanOrEqualTo(numFiles).isLessThanOrEqualTo(500); } + @Test + void testRewriteStaleSortOrderSelectsStaleFiles() { + int currentSortOrderId = setTableSortOrder(); + + BinPackRewriteFilePlanner planner = new BinPackRewriteFilePlanner(table); + planner.init(rewriteStaleSortOrderOptions()); + + // both files are optimally sized, so only the stale sort order id makes one a rewrite target + FileScanTask sorted = MockFileScanTask.mockTask(450L, currentSortOrderId); + FileScanTask stale = MockFileScanTask.mockTask(450L, currentSortOrderId - 1); + + Iterable> groups = planner.planFileGroups(ImmutableList.of(sorted, stale)); + assertThat(groups).as("Only the file with a stale sort order must be rewritten").hasSize(1); + assertThat(Iterables.getOnlyElement(groups)).containsExactly(stale); + } + + @Test + void testRewriteStaleSortOrderSkipsSortedFiles() { + int currentSortOrderId = setTableSortOrder(); + + BinPackRewriteFilePlanner planner = new BinPackRewriteFilePlanner(table); + planner.init(rewriteStaleSortOrderOptions()); + + FileScanTask sorted1 = MockFileScanTask.mockTask(450L, currentSortOrderId); + FileScanTask sorted2 = MockFileScanTask.mockTask(450L, currentSortOrderId); + + assertThat(planner.planFileGroups(ImmutableList.of(sorted1, sorted2))) + .as("Optimally sized files already sorted by the current sort order must be skipped") + .isEmpty(); + } + + @Test + void testRewriteStaleSortOrderSelectsFilesWithNullSortOrderId() { + setTableSortOrder(); + + BinPackRewriteFilePlanner planner = new BinPackRewriteFilePlanner(table); + planner.init(rewriteStaleSortOrderOptions()); + + FileScanTask nullSortOrderId = MockFileScanTask.mockTask(450L, null, table.spec().specId()); + + Iterable> groups = planner.planFileGroups(ImmutableList.of(nullSortOrderId)); + assertThat(groups) + .as("A file with no sort order id must be treated as having a stale sort order") + .hasSize(1); + assertThat(Iterables.getOnlyElement(groups)).containsExactly(nullSortOrderId); + } + + @Test + void testRewriteStaleSortOrderIsNoOpForUnsortedTable() { + // table has no sort order set (orderId 0) + BinPackRewriteFilePlanner planner = new BinPackRewriteFilePlanner(table); + planner.init(rewriteStaleSortOrderOptions()); + + FileScanTask file1 = MockFileScanTask.mockTask(450L, 0); + FileScanTask file2 = MockFileScanTask.mockTask(450L, 0); + + assertThat(planner.planFileGroups(ImmutableList.of(file1, file2))) + .as("rewrite-stale-sort-order must be a no-op when the table has no sort order") + .isEmpty(); + } + + @Test + void testRewriteStaleSortOrderIgnoresFilesFromOtherSpec() { + int currentSortOrderId = setTableSortOrder(); + int otherSpecId = table.spec().specId() + 1; + + BinPackRewriteFilePlanner planner = new BinPackRewriteFilePlanner(table); + planner.init(rewriteStaleSortOrderOptions()); + + // stale sort order id, but written under a non-current partition spec + FileScanTask otherSpec = MockFileScanTask.mockTask(450L, currentSortOrderId - 1, otherSpecId); + + assertThat(planner.planFileGroups(ImmutableList.of(otherSpec))) + .as( + "Files from a non-current partition spec must not be selected by rewrite-stale-sort-order") + .isEmpty(); + } + + @Test + void testRewriteStaleSortOrderDisabledByDefault() { + int currentSortOrderId = setTableSortOrder(); + + BinPackRewriteFilePlanner planner = new BinPackRewriteFilePlanner(table); + planner.init( + ImmutableMap.of( + BinPackRewriteFilePlanner.MIN_FILE_SIZE_BYTES, "250", + BinPackRewriteFilePlanner.TARGET_FILE_SIZE_BYTES, "500", + BinPackRewriteFilePlanner.MAX_FILE_SIZE_BYTES, "750")); + + FileScanTask stale = MockFileScanTask.mockTask(450L, currentSortOrderId - 1); + + assertThat(planner.planFileGroups(ImmutableList.of(stale))) + .as("Stale files must not be rewritten unless rewrite-stale-sort-order is enabled") + .isEmpty(); + } + + @Test + void testRewriteStaleSortOrderWithRewriteAll() { + int currentSortOrderId = setTableSortOrder(); + + BinPackRewriteFilePlanner planner = new BinPackRewriteFilePlanner(table); + planner.init( + ImmutableMap.builder() + .put(BinPackRewriteFilePlanner.MIN_FILE_SIZE_BYTES, "250") + .put(BinPackRewriteFilePlanner.TARGET_FILE_SIZE_BYTES, "500") + .put(BinPackRewriteFilePlanner.MAX_FILE_SIZE_BYTES, "750") + .put(RewriteDataFiles.REWRITE_STALE_SORT_ORDER, "true") + .put(BinPackRewriteFilePlanner.REWRITE_ALL, "true") + .build()); + + // both files are already sorted by the current order, but rewrite-all forces them to be + // rewritten + FileScanTask sorted1 = MockFileScanTask.mockTask(450L, currentSortOrderId); + FileScanTask sorted2 = MockFileScanTask.mockTask(450L, currentSortOrderId); + + Iterable> groups = + planner.planFileGroups(ImmutableList.of(sorted1, sorted2)); + assertThat(groups).hasSize(1); + assertThat(Iterables.getOnlyElement(groups)) + .as("rewrite-all must take precedence over rewrite-stale-sort-order") + .containsExactlyInAnyOrder(sorted1, sorted2); + } + + private int setTableSortOrder() { + table.replaceSortOrder().asc("data").commit(); + table.refresh(); + return table.sortOrder().orderId(); + } + + private static Map rewriteStaleSortOrderOptions() { + return ImmutableMap.of( + BinPackRewriteFilePlanner.MIN_FILE_SIZE_BYTES, "250", + BinPackRewriteFilePlanner.TARGET_FILE_SIZE_BYTES, "500", + BinPackRewriteFilePlanner.MAX_FILE_SIZE_BYTES, "750", + RewriteDataFiles.REWRITE_STALE_SORT_ORDER, "true"); + } + private void addFiles() { table .newAppend() diff --git a/docs/docs/spark-procedures.md b/docs/docs/spark-procedures.md index 8e594caa12d4..a0e6c4064871 100644 --- a/docs/docs/spark-procedures.md +++ b/docs/docs/spark-procedures.md @@ -415,6 +415,7 @@ Iceberg can compact data files in parallel using Spark with the `rewriteDataFile | `output-spec-id` | current partition spec id | Identifier of the output partition spec. Data will be reorganized during the rewrite to align with the output partitioning. | | `remove-dangling-deletes` | false | Remove dangling position and equality deletes after rewriting. A delete file is considered dangling if it does not apply to any live data files. Enabling this will generate an additional commit for the removal. | | `max-files-to-rewrite` | null | This option sets an upper limit on the number of eligible files that will be rewritten. If this option is not specified, all eligible files will be rewritten. | +| `rewrite-stale-sort-order` | false | When true, also rewrite data files whose sort order id does not match the table's current default sort order, even if they are already optimally sized. This enables incremental sort compaction, where a `sort` rewrite only reorganizes files that are not already sorted by the current sort order. It has no effect on tables without a sort order, and is intended for use with the `sort` strategy so that rewritten files are marked with the table's current sort order id. It is not supported with a `zorder` sort order. | !!! info Dangling delete files are removed based solely on data sequence numbers. This action does not apply to global @@ -475,6 +476,13 @@ Rewrite the data files in table `db.sample` and select the files that may contai CALL catalog_name.system.rewrite_data_files(table => 'db.sample', where => 'id = 3 and name = "foo"'); ``` +Incrementally sort the data files in table `db.sample`: rewrite only the files that are not already sorted +by the table's current sort order, leaving already-sorted files untouched. This is useful for maintaining +sort order on a table over time, or after changing the table's sort order with `ALTER TABLE ... WRITE ORDERED BY`. +```sql +CALL catalog_name.system.rewrite_data_files(table => 'db.sample', strategy => 'sort', options => map('rewrite-stale-sort-order', 'true')); +``` + ### `rewrite_manifests` Rewrite manifests for a table to optimize scan planning. diff --git a/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java b/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java index b37422beacf4..e3baf98d3c2e 100644 --- a/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java +++ b/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java @@ -264,6 +264,45 @@ public void testRewriteDataFilesWithSortStrategy() { assertEquals("Data after compaction should not change", expectedRecords, actualRecords); } + @TestTemplate + public void testRewriteDataFilesWithStaleSortOrder() { + createTable(); + // create 10 files under non-partitioned table + insertData(10); + List expectedRecords = currentData(); + + // the sort order is set after the data is written, so all 10 files have a stale sort order id + sql("ALTER TABLE %s WRITE ORDERED BY c1", tableName); + + // min-input-files = 12 would normally skip these 10 files; rewrite-stale-sort-order selects + // them anyway because their sort order id no longer matches the table's sort order + List output = + sql( + "CALL %s.system.rewrite_data_files(table => '%s', strategy => 'sort', " + + "options => map('min-input-files','12','rewrite-stale-sort-order','true'))", + catalogName, tableIdent); + + assertEquals( + "Action should rewrite 10 data files and add 1 data file", + row(10, 1), + Arrays.copyOf(output.get(0), 2)); + + List actualRecords = currentData(); + assertEquals("Data after compaction should not change", expectedRecords, actualRecords); + + // a second run is a no-op: every file now matches the table's current sort order + List secondOutput = + sql( + "CALL %s.system.rewrite_data_files(table => '%s', strategy => 'sort', " + + "options => map('min-input-files','12','rewrite-stale-sort-order','true'))", + catalogName, tableIdent); + + assertEquals( + "Second run should not rewrite any data files", + ImmutableList.of(row(0, 0, 0L, 0, 0)), + secondOutput); + } + @TestTemplate public void testRewriteDataFilesWithSortStrategyAndMultipleShufflePartitionsPerFile() { createTable(); diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java index 110e43ede1f9..0713b819b1c8 100644 --- a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java +++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java @@ -1548,6 +1548,51 @@ public void testSimpleSort() throws IOException { dataFilesSortOrderShouldMatchTableSortOrder(table); } + @TestTemplate + public void testSortIncrementalWithStaleSortOrder() throws IOException { + Table table = createTable(20); + shouldHaveFiles(table, 20); + // the sort order is set after the files were written, so all 20 files have a stale sort order + table.replaceSortOrder().asc("c2").commit(); + table.refresh(); + + List originalData = currentData(); + + // the size thresholds make every file "optimally sized", so files are selected purely because + // their sort order id no longer matches the table's current sort order + RewriteDataFiles.Result firstResult = + basicRewrite(table) + .sort() + .option(SizeBasedFileRewritePlanner.MIN_FILE_SIZE_BYTES, "0") + .option(RewriteDataFiles.TARGET_FILE_SIZE_BYTES, Long.toString(Long.MAX_VALUE - 1)) + .option(RewriteDataFiles.REWRITE_STALE_SORT_ORDER, "true") + .execute(); + + assertThat(firstResult.rewrittenDataFilesCount()) + .as("All files with a stale sort order should be rewritten") + .isEqualTo(20); + + table.refresh(); + assertEquals("Rewrite must not change data", originalData, currentData()); + dataFilesSortOrderShouldMatchTableSortOrder(table); + + // a second run is a no-op: every file now matches the table's current sort order + RewriteDataFiles.Result secondResult = + basicRewrite(table) + .sort() + .option(SizeBasedFileRewritePlanner.MIN_FILE_SIZE_BYTES, "0") + .option(RewriteDataFiles.TARGET_FILE_SIZE_BYTES, Long.toString(Long.MAX_VALUE - 1)) + .option(RewriteDataFiles.REWRITE_STALE_SORT_ORDER, "true") + .execute(); + + assertThat(secondResult.rewriteResults()) + .as("Incremental sort compaction should converge to a no-op") + .isEmpty(); + + shouldHaveSnapshots(table, 2); + shouldHaveACleanCache(table); + } + @TestTemplate public void testSortAfterPartitionChange() throws IOException { Table table = createTable(20); From 813b200d2af377f9575634988e4d10cf8f1eb271 Mon Sep 17 00:00:00 2001 From: Talat Uyarer Date: Thu, 14 May 2026 11:22:08 -0700 Subject: [PATCH 2/2] Addressed Russell's comment --- .../iceberg/actions/RewriteDataFiles.java | 8 +--- docs/docs/spark-procedures.md | 6 +-- .../maintenance/api/RewriteDataFiles.java | 14 ++++++ .../TestRewriteDataFilesProcedure.java | 39 ++++++++++++++++ .../actions/TestRewriteDataFilesAction.java | 45 +++++++++++++++++++ .../TestSparkShufflingDataRewritePlanner.java | 3 +- .../TestRewriteDataFilesProcedure.java | 39 ++++++++++++++++ .../actions/TestRewriteDataFilesAction.java | 45 +++++++++++++++++++ .../TestSparkShufflingDataRewritePlanner.java | 3 +- .../TestRewriteDataFilesProcedure.java | 39 ++++++++++++++++ .../actions/TestRewriteDataFilesAction.java | 45 +++++++++++++++++++ .../TestSparkShufflingDataRewritePlanner.java | 3 +- .../TestSparkShufflingDataRewritePlanner.java | 3 +- 13 files changed, 277 insertions(+), 15 deletions(-) diff --git a/api/src/main/java/org/apache/iceberg/actions/RewriteDataFiles.java b/api/src/main/java/org/apache/iceberg/actions/RewriteDataFiles.java index 7fd31ec3105a..4d0516fa5ffa 100644 --- a/api/src/main/java/org/apache/iceberg/actions/RewriteDataFiles.java +++ b/api/src/main/java/org/apache/iceberg/actions/RewriteDataFiles.java @@ -149,13 +149,7 @@ public interface RewriteDataFiles /** * If true, the rewrite also selects data files whose sort order id does not match the table's - * current default sort order id, even when they are already optimally sized. This enables - * incremental sort compaction: a sort-based rewrite reorganizes only the files that are not - * already sorted by the current sort order, instead of re-sorting every selected file. - * - *

This option has no effect on tables without a sort order, and is intended to be used with - * the {@link #sort()} strategy so that rewritten files are marked with the table's current sort - * order id. + * current default sort order id, even when they are already optimally sized. * *

Defaults to false. */ diff --git a/docs/docs/spark-procedures.md b/docs/docs/spark-procedures.md index a0e6c4064871..cb9691488e36 100644 --- a/docs/docs/spark-procedures.md +++ b/docs/docs/spark-procedures.md @@ -415,7 +415,7 @@ Iceberg can compact data files in parallel using Spark with the `rewriteDataFile | `output-spec-id` | current partition spec id | Identifier of the output partition spec. Data will be reorganized during the rewrite to align with the output partitioning. | | `remove-dangling-deletes` | false | Remove dangling position and equality deletes after rewriting. A delete file is considered dangling if it does not apply to any live data files. Enabling this will generate an additional commit for the removal. | | `max-files-to-rewrite` | null | This option sets an upper limit on the number of eligible files that will be rewritten. If this option is not specified, all eligible files will be rewritten. | -| `rewrite-stale-sort-order` | false | When true, also rewrite data files whose sort order id does not match the table's current default sort order, even if they are already optimally sized. This enables incremental sort compaction, where a `sort` rewrite only reorganizes files that are not already sorted by the current sort order. It has no effect on tables without a sort order, and is intended for use with the `sort` strategy so that rewritten files are marked with the table's current sort order id. It is not supported with a `zorder` sort order. | +| `rewrite-stale-sort-order` | false | When true, also rewrite data files whose sort order id does not match the table's current default sort order, even if they are already optimally sized. | !!! info Dangling delete files are removed based solely on data sequence numbers. This action does not apply to global @@ -476,9 +476,7 @@ Rewrite the data files in table `db.sample` and select the files that may contai CALL catalog_name.system.rewrite_data_files(table => 'db.sample', where => 'id = 3 and name = "foo"'); ``` -Incrementally sort the data files in table `db.sample`: rewrite only the files that are not already sorted -by the table's current sort order, leaving already-sorted files untouched. This is useful for maintaining -sort order on a table over time, or after changing the table's sort order with `ALTER TABLE ... WRITE ORDERED BY`. +Rewrite only the data files in table `db.sample` whose sort order id does not match the table's current sort order. ```sql CALL catalog_name.system.rewrite_data_files(table => 'db.sample', strategy => 'sort', options => map('rewrite-stale-sort-order', 'true')); ``` diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/RewriteDataFiles.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/RewriteDataFiles.java index f03f33a3fd81..1b3470206e45 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/RewriteDataFiles.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/RewriteDataFiles.java @@ -209,6 +209,20 @@ public Builder maxFilesToRewrite(int maxFilesToRewrite) { return this; } + /** + * Configures whether to also rewrite data files whose sort order id does not match the table's + * current default sort order. See {@link + * org.apache.iceberg.actions.RewriteDataFiles#REWRITE_STALE_SORT_ORDER} for more details. + * + * @param rewriteStaleSortOrder enables rewriting files with a stale sort order + */ + public Builder rewriteStaleSortOrder(boolean rewriteStaleSortOrder) { + this.rewriteOptions.put( + org.apache.iceberg.actions.RewriteDataFiles.REWRITE_STALE_SORT_ORDER, + String.valueOf(rewriteStaleSortOrder)); + return this; + } + /** * A user provided filter for determining which files will be considered by the rewrite * strategy. diff --git a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java index a2a693c56b21..0b7383ae396b 100644 --- a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java +++ b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java @@ -215,6 +215,45 @@ public void testRewriteDataFilesWithSortStrategy() { assertEquals("Data after compaction should not change", expectedRecords, actualRecords); } + @TestTemplate + public void testRewriteDataFilesWithStaleSortOrder() { + createTable(); + // create 10 files under non-partitioned table + insertData(10); + List expectedRecords = currentData(); + + // the sort order is set after the data is written, so all 10 files have a stale sort order id + sql("ALTER TABLE %s WRITE ORDERED BY c1", tableName); + + // min-input-files = 12 would normally skip these 10 files; rewrite-stale-sort-order selects + // them anyway because their sort order id no longer matches the table's sort order + List output = + sql( + "CALL %s.system.rewrite_data_files(table => '%s', strategy => 'sort', " + + "options => map('min-input-files','12','rewrite-stale-sort-order','true'))", + catalogName, tableIdent); + + assertEquals( + "Action should rewrite 10 data files and add 1 data file", + row(10, 1), + Arrays.copyOf(output.get(0), 2)); + + List actualRecords = currentData(); + assertEquals("Data after compaction should not change", expectedRecords, actualRecords); + + // a second run is a no-op: every file now matches the table's current sort order + List secondOutput = + sql( + "CALL %s.system.rewrite_data_files(table => '%s', strategy => 'sort', " + + "options => map('min-input-files','12','rewrite-stale-sort-order','true'))", + catalogName, tableIdent); + + assertEquals( + "Second run should not rewrite any data files", + ImmutableList.of(row(0, 0, 0L, 0, 0)), + secondOutput); + } + @TestTemplate public void testRewriteDataFilesWithSortStrategyAndMultipleShufflePartitionsPerFile() { createTable(); diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java index d74d8a29f994..73ede3089de7 100644 --- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java +++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java @@ -1547,6 +1547,51 @@ public void testSimpleSort() throws IOException { dataFilesSortOrderShouldMatchTableSortOrder(table); } + @TestTemplate + public void testSortIncrementalWithStaleSortOrder() throws IOException { + Table table = createTable(20); + shouldHaveFiles(table, 20); + // the sort order is set after the files were written, so all 20 files have a stale sort order + table.replaceSortOrder().asc("c2").commit(); + table.refresh(); + + List originalData = currentData(); + + // the size thresholds make every file "optimally sized", so files are selected purely because + // their sort order id no longer matches the table's current sort order + RewriteDataFiles.Result firstResult = + basicRewrite(table) + .sort() + .option(SizeBasedFileRewritePlanner.MIN_FILE_SIZE_BYTES, "0") + .option(RewriteDataFiles.TARGET_FILE_SIZE_BYTES, Long.toString(Long.MAX_VALUE - 1)) + .option(RewriteDataFiles.REWRITE_STALE_SORT_ORDER, "true") + .execute(); + + assertThat(firstResult.rewrittenDataFilesCount()) + .as("All files with a stale sort order should be rewritten") + .isEqualTo(20); + + table.refresh(); + assertEquals("Rewrite must not change data", originalData, currentData()); + dataFilesSortOrderShouldMatchTableSortOrder(table); + + // a second run is a no-op: every file now matches the table's current sort order + RewriteDataFiles.Result secondResult = + basicRewrite(table) + .sort() + .option(SizeBasedFileRewritePlanner.MIN_FILE_SIZE_BYTES, "0") + .option(RewriteDataFiles.TARGET_FILE_SIZE_BYTES, Long.toString(Long.MAX_VALUE - 1)) + .option(RewriteDataFiles.REWRITE_STALE_SORT_ORDER, "true") + .execute(); + + assertThat(secondResult.rewriteResults()) + .as("Incremental sort compaction should converge to a no-op") + .isEmpty(); + + shouldHaveSnapshots(table, 2); + shouldHaveACleanCache(table); + } + @TestTemplate public void testSortAfterPartitionChange() throws IOException { Table table = createTable(20); diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestSparkShufflingDataRewritePlanner.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestSparkShufflingDataRewritePlanner.java index 9fcf9f5ec51a..eacf82db6534 100644 --- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestSparkShufflingDataRewritePlanner.java +++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestSparkShufflingDataRewritePlanner.java @@ -75,7 +75,8 @@ void testSparkShufflingDataRewritePlannerValidOptions() { SparkShufflingDataRewritePlanner.DELETE_FILE_THRESHOLD, SparkShufflingDataRewritePlanner.DELETE_RATIO_THRESHOLD, RewriteDataFiles.REWRITE_JOB_ORDER, - SparkShufflingDataRewritePlanner.MAX_FILES_TO_REWRITE)); + SparkShufflingDataRewritePlanner.MAX_FILES_TO_REWRITE, + RewriteDataFiles.REWRITE_STALE_SORT_ORDER)); } @Test diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java index ee727bd13900..a98b51c74327 100644 --- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java +++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java @@ -265,6 +265,45 @@ public void testRewriteDataFilesWithSortStrategy() { assertEquals("Data after compaction should not change", expectedRecords, actualRecords); } + @TestTemplate + public void testRewriteDataFilesWithStaleSortOrder() { + createTable(); + // create 10 files under non-partitioned table + insertData(10); + List expectedRecords = currentData(); + + // the sort order is set after the data is written, so all 10 files have a stale sort order id + sql("ALTER TABLE %s WRITE ORDERED BY c1", tableName); + + // min-input-files = 12 would normally skip these 10 files; rewrite-stale-sort-order selects + // them anyway because their sort order id no longer matches the table's sort order + List output = + sql( + "CALL %s.system.rewrite_data_files(table => '%s', strategy => 'sort', " + + "options => map('min-input-files','12','rewrite-stale-sort-order','true'))", + catalogName, tableIdent); + + assertEquals( + "Action should rewrite 10 data files and add 1 data file", + row(10, 1), + Arrays.copyOf(output.get(0), 2)); + + List actualRecords = currentData(); + assertEquals("Data after compaction should not change", expectedRecords, actualRecords); + + // a second run is a no-op: every file now matches the table's current sort order + List secondOutput = + sql( + "CALL %s.system.rewrite_data_files(table => '%s', strategy => 'sort', " + + "options => map('min-input-files','12','rewrite-stale-sort-order','true'))", + catalogName, tableIdent); + + assertEquals( + "Second run should not rewrite any data files", + ImmutableList.of(row(0, 0, 0L, 0, 0)), + secondOutput); + } + @TestTemplate public void testRewriteDataFilesWithSortStrategyAndMultipleShufflePartitionsPerFile() { createTable(); diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java index d74d8a29f994..73ede3089de7 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java @@ -1547,6 +1547,51 @@ public void testSimpleSort() throws IOException { dataFilesSortOrderShouldMatchTableSortOrder(table); } + @TestTemplate + public void testSortIncrementalWithStaleSortOrder() throws IOException { + Table table = createTable(20); + shouldHaveFiles(table, 20); + // the sort order is set after the files were written, so all 20 files have a stale sort order + table.replaceSortOrder().asc("c2").commit(); + table.refresh(); + + List originalData = currentData(); + + // the size thresholds make every file "optimally sized", so files are selected purely because + // their sort order id no longer matches the table's current sort order + RewriteDataFiles.Result firstResult = + basicRewrite(table) + .sort() + .option(SizeBasedFileRewritePlanner.MIN_FILE_SIZE_BYTES, "0") + .option(RewriteDataFiles.TARGET_FILE_SIZE_BYTES, Long.toString(Long.MAX_VALUE - 1)) + .option(RewriteDataFiles.REWRITE_STALE_SORT_ORDER, "true") + .execute(); + + assertThat(firstResult.rewrittenDataFilesCount()) + .as("All files with a stale sort order should be rewritten") + .isEqualTo(20); + + table.refresh(); + assertEquals("Rewrite must not change data", originalData, currentData()); + dataFilesSortOrderShouldMatchTableSortOrder(table); + + // a second run is a no-op: every file now matches the table's current sort order + RewriteDataFiles.Result secondResult = + basicRewrite(table) + .sort() + .option(SizeBasedFileRewritePlanner.MIN_FILE_SIZE_BYTES, "0") + .option(RewriteDataFiles.TARGET_FILE_SIZE_BYTES, Long.toString(Long.MAX_VALUE - 1)) + .option(RewriteDataFiles.REWRITE_STALE_SORT_ORDER, "true") + .execute(); + + assertThat(secondResult.rewriteResults()) + .as("Incremental sort compaction should converge to a no-op") + .isEmpty(); + + shouldHaveSnapshots(table, 2); + shouldHaveACleanCache(table); + } + @TestTemplate public void testSortAfterPartitionChange() throws IOException { Table table = createTable(20); diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestSparkShufflingDataRewritePlanner.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestSparkShufflingDataRewritePlanner.java index 9fcf9f5ec51a..eacf82db6534 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestSparkShufflingDataRewritePlanner.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestSparkShufflingDataRewritePlanner.java @@ -75,7 +75,8 @@ void testSparkShufflingDataRewritePlannerValidOptions() { SparkShufflingDataRewritePlanner.DELETE_FILE_THRESHOLD, SparkShufflingDataRewritePlanner.DELETE_RATIO_THRESHOLD, RewriteDataFiles.REWRITE_JOB_ORDER, - SparkShufflingDataRewritePlanner.MAX_FILES_TO_REWRITE)); + SparkShufflingDataRewritePlanner.MAX_FILES_TO_REWRITE, + RewriteDataFiles.REWRITE_STALE_SORT_ORDER)); } @Test diff --git a/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java b/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java index 6b8244a331e7..8aa43f4deb82 100644 --- a/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java +++ b/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java @@ -264,6 +264,45 @@ public void testRewriteDataFilesWithSortStrategy() { assertEquals("Data after compaction should not change", expectedRecords, actualRecords); } + @TestTemplate + public void testRewriteDataFilesWithStaleSortOrder() { + createTable(); + // create 10 files under non-partitioned table + insertData(10); + List expectedRecords = currentData(); + + // the sort order is set after the data is written, so all 10 files have a stale sort order id + sql("ALTER TABLE %s WRITE ORDERED BY c1", tableName); + + // min-input-files = 12 would normally skip these 10 files; rewrite-stale-sort-order selects + // them anyway because their sort order id no longer matches the table's sort order + List output = + sql( + "CALL %s.system.rewrite_data_files(table => '%s', strategy => 'sort', " + + "options => map('min-input-files','12','rewrite-stale-sort-order','true'))", + catalogName, tableIdent); + + assertEquals( + "Action should rewrite 10 data files and add 1 data file", + row(10, 1), + Arrays.copyOf(output.get(0), 2)); + + List actualRecords = currentData(); + assertEquals("Data after compaction should not change", expectedRecords, actualRecords); + + // a second run is a no-op: every file now matches the table's current sort order + List secondOutput = + sql( + "CALL %s.system.rewrite_data_files(table => '%s', strategy => 'sort', " + + "options => map('min-input-files','12','rewrite-stale-sort-order','true'))", + catalogName, tableIdent); + + assertEquals( + "Second run should not rewrite any data files", + ImmutableList.of(row(0, 0, 0L, 0, 0)), + secondOutput); + } + @TestTemplate public void testRewriteDataFilesWithSortStrategyAndMultipleShufflePartitionsPerFile() { createTable(); diff --git a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java index 38ddefd26a45..268dbb306f31 100644 --- a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java +++ b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java @@ -1548,6 +1548,51 @@ public void testSimpleSort() throws IOException { dataFilesSortOrderShouldMatchTableSortOrder(table); } + @TestTemplate + public void testSortIncrementalWithStaleSortOrder() throws IOException { + Table table = createTable(20); + shouldHaveFiles(table, 20); + // the sort order is set after the files were written, so all 20 files have a stale sort order + table.replaceSortOrder().asc("c2").commit(); + table.refresh(); + + List originalData = currentData(); + + // the size thresholds make every file "optimally sized", so files are selected purely because + // their sort order id no longer matches the table's current sort order + RewriteDataFiles.Result firstResult = + basicRewrite(table) + .sort() + .option(SizeBasedFileRewritePlanner.MIN_FILE_SIZE_BYTES, "0") + .option(RewriteDataFiles.TARGET_FILE_SIZE_BYTES, Long.toString(Long.MAX_VALUE - 1)) + .option(RewriteDataFiles.REWRITE_STALE_SORT_ORDER, "true") + .execute(); + + assertThat(firstResult.rewrittenDataFilesCount()) + .as("All files with a stale sort order should be rewritten") + .isEqualTo(20); + + table.refresh(); + assertEquals("Rewrite must not change data", originalData, currentData()); + dataFilesSortOrderShouldMatchTableSortOrder(table); + + // a second run is a no-op: every file now matches the table's current sort order + RewriteDataFiles.Result secondResult = + basicRewrite(table) + .sort() + .option(SizeBasedFileRewritePlanner.MIN_FILE_SIZE_BYTES, "0") + .option(RewriteDataFiles.TARGET_FILE_SIZE_BYTES, Long.toString(Long.MAX_VALUE - 1)) + .option(RewriteDataFiles.REWRITE_STALE_SORT_ORDER, "true") + .execute(); + + assertThat(secondResult.rewriteResults()) + .as("Incremental sort compaction should converge to a no-op") + .isEmpty(); + + shouldHaveSnapshots(table, 2); + shouldHaveACleanCache(table); + } + @TestTemplate public void testSortAfterPartitionChange() throws IOException { Table table = createTable(20); diff --git a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestSparkShufflingDataRewritePlanner.java b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestSparkShufflingDataRewritePlanner.java index 9fcf9f5ec51a..eacf82db6534 100644 --- a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestSparkShufflingDataRewritePlanner.java +++ b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestSparkShufflingDataRewritePlanner.java @@ -75,7 +75,8 @@ void testSparkShufflingDataRewritePlannerValidOptions() { SparkShufflingDataRewritePlanner.DELETE_FILE_THRESHOLD, SparkShufflingDataRewritePlanner.DELETE_RATIO_THRESHOLD, RewriteDataFiles.REWRITE_JOB_ORDER, - SparkShufflingDataRewritePlanner.MAX_FILES_TO_REWRITE)); + SparkShufflingDataRewritePlanner.MAX_FILES_TO_REWRITE, + RewriteDataFiles.REWRITE_STALE_SORT_ORDER)); } @Test diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestSparkShufflingDataRewritePlanner.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestSparkShufflingDataRewritePlanner.java index 9fcf9f5ec51a..eacf82db6534 100644 --- a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestSparkShufflingDataRewritePlanner.java +++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestSparkShufflingDataRewritePlanner.java @@ -75,7 +75,8 @@ void testSparkShufflingDataRewritePlannerValidOptions() { SparkShufflingDataRewritePlanner.DELETE_FILE_THRESHOLD, SparkShufflingDataRewritePlanner.DELETE_RATIO_THRESHOLD, RewriteDataFiles.REWRITE_JOB_ORDER, - SparkShufflingDataRewritePlanner.MAX_FILES_TO_REWRITE)); + SparkShufflingDataRewritePlanner.MAX_FILES_TO_REWRITE, + RewriteDataFiles.REWRITE_STALE_SORT_ORDER)); } @Test