diff --git a/api/src/main/java/org/apache/iceberg/actions/RewriteDataFiles.java b/api/src/main/java/org/apache/iceberg/actions/RewriteDataFiles.java index 0459538d1cda..4d0516fa5ffa 100644 --- a/api/src/main/java/org/apache/iceberg/actions/RewriteDataFiles.java +++ b/api/src/main/java/org/apache/iceberg/actions/RewriteDataFiles.java @@ -147,6 +147,16 @@ public interface RewriteDataFiles */ String OUTPUT_SPEC_ID = "output-spec-id"; + /** + * If true, the rewrite also selects data files whose sort order id does not match the table's + * current default sort order id, even when they are already optimally sized. + * + *

Defaults to false. + */ + String REWRITE_STALE_SORT_ORDER = "rewrite-stale-sort-order"; + + boolean REWRITE_STALE_SORT_ORDER_DEFAULT = false; + /** * Choose BINPACK as a strategy for this rewrite operation * diff --git a/core/src/main/java/org/apache/iceberg/actions/BinPackRewriteFilePlanner.java b/core/src/main/java/org/apache/iceberg/actions/BinPackRewriteFilePlanner.java index ee768fcde460..fc8baddf2c77 100644 --- a/core/src/main/java/org/apache/iceberg/actions/BinPackRewriteFilePlanner.java +++ b/core/src/main/java/org/apache/iceberg/actions/BinPackRewriteFilePlanner.java @@ -28,6 +28,7 @@ import org.apache.iceberg.DataFile; import org.apache.iceberg.FileScanTask; import org.apache.iceberg.RewriteJobOrder; +import org.apache.iceberg.SortOrder; import org.apache.iceberg.StructLike; import org.apache.iceberg.Table; import org.apache.iceberg.TableProperties; @@ -101,6 +102,8 @@ public class BinPackRewriteFilePlanner private double deleteRatioThreshold; private RewriteJobOrder rewriteJobOrder; private Integer maxFilesToRewrite; + private boolean rewriteStaleSortOrder; + private int currentSortOrderId; public BinPackRewriteFilePlanner(Table table) { this(table, Expressions.alwaysTrue()); @@ -139,6 +142,7 @@ public Set validOptions() { .add(DELETE_RATIO_THRESHOLD) .add(RewriteDataFiles.REWRITE_JOB_ORDER) .add(MAX_FILES_TO_REWRITE) + .add(RewriteDataFiles.REWRITE_STALE_SORT_ORDER) .build(); } @@ -154,6 +158,12 @@ public void init(Map options) { RewriteDataFiles.REWRITE_JOB_ORDER, RewriteDataFiles.REWRITE_JOB_ORDER_DEFAULT)); this.maxFilesToRewrite = maxFilesToRewrite(options); + this.rewriteStaleSortOrder = + PropertyUtil.propertyAsBoolean( + options, + RewriteDataFiles.REWRITE_STALE_SORT_ORDER, + RewriteDataFiles.REWRITE_STALE_SORT_ORDER_DEFAULT); + this.currentSortOrderId = table().sortOrder().orderId(); } private int deleteFileThreshold(Map options) { @@ -190,7 +200,10 @@ protected Iterable filterFiles(Iterable tasks) { return Iterables.filter( tasks, task -> - 
outsideDesiredFileSizeRange(task) || tooManyDeletes(task) || tooHighDeleteRatio(task)); + outsideDesiredFileSizeRange(task) + || tooManyDeletes(task) + || tooHighDeleteRatio(task) + || hasStaleSortOrder(task)); } @Override @@ -202,7 +215,8 @@ protected Iterable> filterFileGroups(List> || enoughContent(group) || tooMuchContent(group) || group.stream().anyMatch(this::tooManyDeletes) - || group.stream().anyMatch(this::tooHighDeleteRatio)); + || group.stream().anyMatch(this::tooHighDeleteRatio) + || group.stream().anyMatch(this::hasStaleSortOrder)); } @Override @@ -270,6 +284,19 @@ private boolean tooManyDeletes(FileScanTask task) { return task.deletes() != null && task.deletes().size() >= deleteFileThreshold; } + private boolean hasStaleSortOrder(FileScanTask task) { + if (!rewriteStaleSortOrder || currentSortOrderId == SortOrder.unsorted().orderId()) { + return false; + } + + if (task.file().specId() != table().spec().specId()) { + return false; + } + + Integer sortOrderId = task.file().sortOrderId(); + return sortOrderId == null || sortOrderId != currentSortOrderId; + } + private boolean tooHighDeleteRatio(FileScanTask task) { if (task.deletes() == null || task.deletes().isEmpty()) { return false; diff --git a/core/src/test/java/org/apache/iceberg/MockFileScanTask.java b/core/src/test/java/org/apache/iceberg/MockFileScanTask.java index 7717d25ea4aa..f415581c3c28 100644 --- a/core/src/test/java/org/apache/iceberg/MockFileScanTask.java +++ b/core/src/test/java/org/apache/iceberg/MockFileScanTask.java @@ -56,9 +56,14 @@ public MockFileScanTask( } public static MockFileScanTask mockTask(long length, int sortOrderId) { + return mockTask(length, (Integer) sortOrderId, 0); + } + + public static MockFileScanTask mockTask(long length, Integer sortOrderId, int specId) { DataFile mockFile = Mockito.mock(DataFile.class); Mockito.when(mockFile.fileSizeInBytes()).thenReturn(length); Mockito.when(mockFile.sortOrderId()).thenReturn(sortOrderId); + 
Mockito.when(mockFile.specId()).thenReturn(specId); return new MockFileScanTask(mockFile); } diff --git a/core/src/test/java/org/apache/iceberg/actions/TestBinPackRewriteFilePlanner.java b/core/src/test/java/org/apache/iceberg/actions/TestBinPackRewriteFilePlanner.java index aa65140c0b89..e84c1a9b8a6b 100644 --- a/core/src/test/java/org/apache/iceberg/actions/TestBinPackRewriteFilePlanner.java +++ b/core/src/test/java/org/apache/iceberg/actions/TestBinPackRewriteFilePlanner.java @@ -292,7 +292,8 @@ void testValidOptions() { BinPackRewriteFilePlanner.DELETE_FILE_THRESHOLD, BinPackRewriteFilePlanner.DELETE_RATIO_THRESHOLD, RewriteDataFiles.REWRITE_JOB_ORDER, - BinPackRewriteFilePlanner.MAX_FILES_TO_REWRITE)); + BinPackRewriteFilePlanner.MAX_FILES_TO_REWRITE, + RewriteDataFiles.REWRITE_STALE_SORT_ORDER)); } @Test @@ -581,6 +582,143 @@ public void testRewriteMaxFilesRewriteGreaterThanTotalFiles() { assertThat(fileScanTasks).isLessThanOrEqualTo(numFiles).isLessThanOrEqualTo(500); } + @Test + void testRewriteStaleSortOrderSelectsStaleFiles() { + int currentSortOrderId = setTableSortOrder(); + + BinPackRewriteFilePlanner planner = new BinPackRewriteFilePlanner(table); + planner.init(rewriteStaleSortOrderOptions()); + + // both files are optimally sized, so only the stale sort order id makes one a rewrite target + FileScanTask sorted = MockFileScanTask.mockTask(450L, currentSortOrderId); + FileScanTask stale = MockFileScanTask.mockTask(450L, currentSortOrderId - 1); + + Iterable> groups = planner.planFileGroups(ImmutableList.of(sorted, stale)); + assertThat(groups).as("Only the file with a stale sort order must be rewritten").hasSize(1); + assertThat(Iterables.getOnlyElement(groups)).containsExactly(stale); + } + + @Test + void testRewriteStaleSortOrderSkipsSortedFiles() { + int currentSortOrderId = setTableSortOrder(); + + BinPackRewriteFilePlanner planner = new BinPackRewriteFilePlanner(table); + planner.init(rewriteStaleSortOrderOptions()); + + FileScanTask sorted1 = 
MockFileScanTask.mockTask(450L, currentSortOrderId); + FileScanTask sorted2 = MockFileScanTask.mockTask(450L, currentSortOrderId); + + assertThat(planner.planFileGroups(ImmutableList.of(sorted1, sorted2))) + .as("Optimally sized files already sorted by the current sort order must be skipped") + .isEmpty(); + } + + @Test + void testRewriteStaleSortOrderSelectsFilesWithNullSortOrderId() { + setTableSortOrder(); + + BinPackRewriteFilePlanner planner = new BinPackRewriteFilePlanner(table); + planner.init(rewriteStaleSortOrderOptions()); + + FileScanTask nullSortOrderId = MockFileScanTask.mockTask(450L, null, table.spec().specId()); + + Iterable> groups = planner.planFileGroups(ImmutableList.of(nullSortOrderId)); + assertThat(groups) + .as("A file with no sort order id must be treated as having a stale sort order") + .hasSize(1); + assertThat(Iterables.getOnlyElement(groups)).containsExactly(nullSortOrderId); + } + + @Test + void testRewriteStaleSortOrderIsNoOpForUnsortedTable() { + // table has no sort order set (orderId 0) + BinPackRewriteFilePlanner planner = new BinPackRewriteFilePlanner(table); + planner.init(rewriteStaleSortOrderOptions()); + + FileScanTask file1 = MockFileScanTask.mockTask(450L, 0); + FileScanTask file2 = MockFileScanTask.mockTask(450L, 0); + + assertThat(planner.planFileGroups(ImmutableList.of(file1, file2))) + .as("rewrite-stale-sort-order must be a no-op when the table has no sort order") + .isEmpty(); + } + + @Test + void testRewriteStaleSortOrderIgnoresFilesFromOtherSpec() { + int currentSortOrderId = setTableSortOrder(); + int otherSpecId = table.spec().specId() + 1; + + BinPackRewriteFilePlanner planner = new BinPackRewriteFilePlanner(table); + planner.init(rewriteStaleSortOrderOptions()); + + // stale sort order id, but written under a non-current partition spec + FileScanTask otherSpec = MockFileScanTask.mockTask(450L, currentSortOrderId - 1, otherSpecId); + + assertThat(planner.planFileGroups(ImmutableList.of(otherSpec))) + .as( + 
"Files from a non-current partition spec must not be selected by rewrite-stale-sort-order") + .isEmpty(); + } + + @Test + void testRewriteStaleSortOrderDisabledByDefault() { + int currentSortOrderId = setTableSortOrder(); + + BinPackRewriteFilePlanner planner = new BinPackRewriteFilePlanner(table); + planner.init( + ImmutableMap.of( + BinPackRewriteFilePlanner.MIN_FILE_SIZE_BYTES, "250", + BinPackRewriteFilePlanner.TARGET_FILE_SIZE_BYTES, "500", + BinPackRewriteFilePlanner.MAX_FILE_SIZE_BYTES, "750")); + + FileScanTask stale = MockFileScanTask.mockTask(450L, currentSortOrderId - 1); + + assertThat(planner.planFileGroups(ImmutableList.of(stale))) + .as("Stale files must not be rewritten unless rewrite-stale-sort-order is enabled") + .isEmpty(); + } + + @Test + void testRewriteStaleSortOrderWithRewriteAll() { + int currentSortOrderId = setTableSortOrder(); + + BinPackRewriteFilePlanner planner = new BinPackRewriteFilePlanner(table); + planner.init( + ImmutableMap.builder() + .put(BinPackRewriteFilePlanner.MIN_FILE_SIZE_BYTES, "250") + .put(BinPackRewriteFilePlanner.TARGET_FILE_SIZE_BYTES, "500") + .put(BinPackRewriteFilePlanner.MAX_FILE_SIZE_BYTES, "750") + .put(RewriteDataFiles.REWRITE_STALE_SORT_ORDER, "true") + .put(BinPackRewriteFilePlanner.REWRITE_ALL, "true") + .build()); + + // both files are already sorted by the current order, but rewrite-all forces them to be + // rewritten + FileScanTask sorted1 = MockFileScanTask.mockTask(450L, currentSortOrderId); + FileScanTask sorted2 = MockFileScanTask.mockTask(450L, currentSortOrderId); + + Iterable> groups = + planner.planFileGroups(ImmutableList.of(sorted1, sorted2)); + assertThat(groups).hasSize(1); + assertThat(Iterables.getOnlyElement(groups)) + .as("rewrite-all must take precedence over rewrite-stale-sort-order") + .containsExactlyInAnyOrder(sorted1, sorted2); + } + + private int setTableSortOrder() { + table.replaceSortOrder().asc("data").commit(); + table.refresh(); + return table.sortOrder().orderId(); + } + 
+ private static Map rewriteStaleSortOrderOptions() { + return ImmutableMap.of( + BinPackRewriteFilePlanner.MIN_FILE_SIZE_BYTES, "250", + BinPackRewriteFilePlanner.TARGET_FILE_SIZE_BYTES, "500", + BinPackRewriteFilePlanner.MAX_FILE_SIZE_BYTES, "750", + RewriteDataFiles.REWRITE_STALE_SORT_ORDER, "true"); + } + private void addFiles() { table .newAppend() diff --git a/docs/docs/spark-procedures.md b/docs/docs/spark-procedures.md index 8e594caa12d4..cb9691488e36 100644 --- a/docs/docs/spark-procedures.md +++ b/docs/docs/spark-procedures.md @@ -415,6 +415,7 @@ Iceberg can compact data files in parallel using Spark with the `rewriteDataFile | `output-spec-id` | current partition spec id | Identifier of the output partition spec. Data will be reorganized during the rewrite to align with the output partitioning. | | `remove-dangling-deletes` | false | Remove dangling position and equality deletes after rewriting. A delete file is considered dangling if it does not apply to any live data files. Enabling this will generate an additional commit for the removal. | | `max-files-to-rewrite` | null | This option sets an upper limit on the number of eligible files that will be rewritten. If this option is not specified, all eligible files will be rewritten. | +| `rewrite-stale-sort-order` | false | When true, also rewrite data files whose sort order id does not match the table's current default sort order, even if they are already optimally sized. | !!! info Dangling delete files are removed based solely on data sequence numbers. This action does not apply to global @@ -475,6 +476,11 @@ Rewrite the data files in table `db.sample` and select the files that may contai CALL catalog_name.system.rewrite_data_files(table => 'db.sample', where => 'id = 3 and name = "foo"'); ``` +Rewrite the data files in table `db.sample` and also select the files whose sort order id does not match the table's current sort order, even if they are already optimally sized. 
+```sql +CALL catalog_name.system.rewrite_data_files(table => 'db.sample', strategy => 'sort', options => map('rewrite-stale-sort-order', 'true')); +``` + ### `rewrite_manifests` Rewrite manifests for a table to optimize scan planning. diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/RewriteDataFiles.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/RewriteDataFiles.java index f03f33a3fd81..1b3470206e45 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/RewriteDataFiles.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/RewriteDataFiles.java @@ -209,6 +209,20 @@ public Builder maxFilesToRewrite(int maxFilesToRewrite) { return this; } + /** + * Configures whether to also rewrite data files whose sort order id does not match the table's + * current default sort order. See {@link + * org.apache.iceberg.actions.RewriteDataFiles#REWRITE_STALE_SORT_ORDER} for more details. + * + * @param rewriteStaleSortOrder enables rewriting files with a stale sort order + */ + public Builder rewriteStaleSortOrder(boolean rewriteStaleSortOrder) { + this.rewriteOptions.put( + org.apache.iceberg.actions.RewriteDataFiles.REWRITE_STALE_SORT_ORDER, + String.valueOf(rewriteStaleSortOrder)); + return this; + } + /** * A user provided filter for determining which files will be considered by the rewrite * strategy. 
diff --git a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java index a2a693c56b21..0b7383ae396b 100644 --- a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java +++ b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java @@ -215,6 +215,45 @@ public void testRewriteDataFilesWithSortStrategy() { assertEquals("Data after compaction should not change", expectedRecords, actualRecords); } + @TestTemplate + public void testRewriteDataFilesWithStaleSortOrder() { + createTable(); + // create 10 files under non-partitioned table + insertData(10); + List expectedRecords = currentData(); + + // the sort order is set after the data is written, so all 10 files have a stale sort order id + sql("ALTER TABLE %s WRITE ORDERED BY c1", tableName); + + // min-input-files = 12 would normally skip these 10 files; rewrite-stale-sort-order selects + // them anyway because their sort order id no longer matches the table's sort order + List output = + sql( + "CALL %s.system.rewrite_data_files(table => '%s', strategy => 'sort', " + + "options => map('min-input-files','12','rewrite-stale-sort-order','true'))", + catalogName, tableIdent); + + assertEquals( + "Action should rewrite 10 data files and add 1 data file", + row(10, 1), + Arrays.copyOf(output.get(0), 2)); + + List actualRecords = currentData(); + assertEquals("Data after compaction should not change", expectedRecords, actualRecords); + + // a second run is a no-op: every file now matches the table's current sort order + List secondOutput = + sql( + "CALL %s.system.rewrite_data_files(table => '%s', strategy => 'sort', " + + "options => map('min-input-files','12','rewrite-stale-sort-order','true'))", + catalogName, tableIdent); + + 
assertEquals( + "Second run should not rewrite any data files", + ImmutableList.of(row(0, 0, 0L, 0, 0)), + secondOutput); + } + @TestTemplate public void testRewriteDataFilesWithSortStrategyAndMultipleShufflePartitionsPerFile() { createTable(); diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java index d74d8a29f994..73ede3089de7 100644 --- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java +++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java @@ -1547,6 +1547,51 @@ public void testSimpleSort() throws IOException { dataFilesSortOrderShouldMatchTableSortOrder(table); } + @TestTemplate + public void testSortIncrementalWithStaleSortOrder() throws IOException { + Table table = createTable(20); + shouldHaveFiles(table, 20); + // the sort order is set after the files were written, so all 20 files have a stale sort order + table.replaceSortOrder().asc("c2").commit(); + table.refresh(); + + List originalData = currentData(); + + // the size thresholds make every file "optimally sized", so files are selected purely because + // their sort order id no longer matches the table's current sort order + RewriteDataFiles.Result firstResult = + basicRewrite(table) + .sort() + .option(SizeBasedFileRewritePlanner.MIN_FILE_SIZE_BYTES, "0") + .option(RewriteDataFiles.TARGET_FILE_SIZE_BYTES, Long.toString(Long.MAX_VALUE - 1)) + .option(RewriteDataFiles.REWRITE_STALE_SORT_ORDER, "true") + .execute(); + + assertThat(firstResult.rewrittenDataFilesCount()) + .as("All files with a stale sort order should be rewritten") + .isEqualTo(20); + + table.refresh(); + assertEquals("Rewrite must not change data", originalData, currentData()); + dataFilesSortOrderShouldMatchTableSortOrder(table); + + // a second run is a no-op: every file now matches the 
table's current sort order + RewriteDataFiles.Result secondResult = + basicRewrite(table) + .sort() + .option(SizeBasedFileRewritePlanner.MIN_FILE_SIZE_BYTES, "0") + .option(RewriteDataFiles.TARGET_FILE_SIZE_BYTES, Long.toString(Long.MAX_VALUE - 1)) + .option(RewriteDataFiles.REWRITE_STALE_SORT_ORDER, "true") + .execute(); + + assertThat(secondResult.rewriteResults()) + .as("Incremental sort compaction should converge to a no-op") + .isEmpty(); + + shouldHaveSnapshots(table, 2); + shouldHaveACleanCache(table); + } + @TestTemplate public void testSortAfterPartitionChange() throws IOException { Table table = createTable(20); diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestSparkShufflingDataRewritePlanner.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestSparkShufflingDataRewritePlanner.java index 9fcf9f5ec51a..eacf82db6534 100644 --- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestSparkShufflingDataRewritePlanner.java +++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestSparkShufflingDataRewritePlanner.java @@ -75,7 +75,8 @@ void testSparkShufflingDataRewritePlannerValidOptions() { SparkShufflingDataRewritePlanner.DELETE_FILE_THRESHOLD, SparkShufflingDataRewritePlanner.DELETE_RATIO_THRESHOLD, RewriteDataFiles.REWRITE_JOB_ORDER, - SparkShufflingDataRewritePlanner.MAX_FILES_TO_REWRITE)); + SparkShufflingDataRewritePlanner.MAX_FILES_TO_REWRITE, + RewriteDataFiles.REWRITE_STALE_SORT_ORDER)); } @Test diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java index ee727bd13900..a98b51c74327 100644 --- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java +++ 
b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java @@ -265,6 +265,45 @@ public void testRewriteDataFilesWithSortStrategy() { assertEquals("Data after compaction should not change", expectedRecords, actualRecords); } + @TestTemplate + public void testRewriteDataFilesWithStaleSortOrder() { + createTable(); + // create 10 files under non-partitioned table + insertData(10); + List expectedRecords = currentData(); + + // the sort order is set after the data is written, so all 10 files have a stale sort order id + sql("ALTER TABLE %s WRITE ORDERED BY c1", tableName); + + // min-input-files = 12 would normally skip these 10 files; rewrite-stale-sort-order selects + // them anyway because their sort order id no longer matches the table's sort order + List output = + sql( + "CALL %s.system.rewrite_data_files(table => '%s', strategy => 'sort', " + + "options => map('min-input-files','12','rewrite-stale-sort-order','true'))", + catalogName, tableIdent); + + assertEquals( + "Action should rewrite 10 data files and add 1 data file", + row(10, 1), + Arrays.copyOf(output.get(0), 2)); + + List actualRecords = currentData(); + assertEquals("Data after compaction should not change", expectedRecords, actualRecords); + + // a second run is a no-op: every file now matches the table's current sort order + List secondOutput = + sql( + "CALL %s.system.rewrite_data_files(table => '%s', strategy => 'sort', " + + "options => map('min-input-files','12','rewrite-stale-sort-order','true'))", + catalogName, tableIdent); + + assertEquals( + "Second run should not rewrite any data files", + ImmutableList.of(row(0, 0, 0L, 0, 0)), + secondOutput); + } + @TestTemplate public void testRewriteDataFilesWithSortStrategyAndMultipleShufflePartitionsPerFile() { createTable(); diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java index d74d8a29f994..73ede3089de7 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java @@ -1547,6 +1547,51 @@ public void testSimpleSort() throws IOException { dataFilesSortOrderShouldMatchTableSortOrder(table); } + @TestTemplate + public void testSortIncrementalWithStaleSortOrder() throws IOException { + Table table = createTable(20); + shouldHaveFiles(table, 20); + // the sort order is set after the files were written, so all 20 files have a stale sort order + table.replaceSortOrder().asc("c2").commit(); + table.refresh(); + + List originalData = currentData(); + + // the size thresholds make every file "optimally sized", so files are selected purely because + // their sort order id no longer matches the table's current sort order + RewriteDataFiles.Result firstResult = + basicRewrite(table) + .sort() + .option(SizeBasedFileRewritePlanner.MIN_FILE_SIZE_BYTES, "0") + .option(RewriteDataFiles.TARGET_FILE_SIZE_BYTES, Long.toString(Long.MAX_VALUE - 1)) + .option(RewriteDataFiles.REWRITE_STALE_SORT_ORDER, "true") + .execute(); + + assertThat(firstResult.rewrittenDataFilesCount()) + .as("All files with a stale sort order should be rewritten") + .isEqualTo(20); + + table.refresh(); + assertEquals("Rewrite must not change data", originalData, currentData()); + dataFilesSortOrderShouldMatchTableSortOrder(table); + + // a second run is a no-op: every file now matches the table's current sort order + RewriteDataFiles.Result secondResult = + basicRewrite(table) + .sort() + .option(SizeBasedFileRewritePlanner.MIN_FILE_SIZE_BYTES, "0") + .option(RewriteDataFiles.TARGET_FILE_SIZE_BYTES, Long.toString(Long.MAX_VALUE - 1)) + .option(RewriteDataFiles.REWRITE_STALE_SORT_ORDER, "true") + .execute(); + + 
assertThat(secondResult.rewriteResults()) + .as("Incremental sort compaction should converge to a no-op") + .isEmpty(); + + shouldHaveSnapshots(table, 2); + shouldHaveACleanCache(table); + } + @TestTemplate public void testSortAfterPartitionChange() throws IOException { Table table = createTable(20); diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestSparkShufflingDataRewritePlanner.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestSparkShufflingDataRewritePlanner.java index 9fcf9f5ec51a..eacf82db6534 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestSparkShufflingDataRewritePlanner.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestSparkShufflingDataRewritePlanner.java @@ -75,7 +75,8 @@ void testSparkShufflingDataRewritePlannerValidOptions() { SparkShufflingDataRewritePlanner.DELETE_FILE_THRESHOLD, SparkShufflingDataRewritePlanner.DELETE_RATIO_THRESHOLD, RewriteDataFiles.REWRITE_JOB_ORDER, - SparkShufflingDataRewritePlanner.MAX_FILES_TO_REWRITE)); + SparkShufflingDataRewritePlanner.MAX_FILES_TO_REWRITE, + RewriteDataFiles.REWRITE_STALE_SORT_ORDER)); } @Test diff --git a/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java b/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java index 6b8244a331e7..8aa43f4deb82 100644 --- a/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java +++ b/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java @@ -264,6 +264,45 @@ public void testRewriteDataFilesWithSortStrategy() { assertEquals("Data after compaction should not change", expectedRecords, actualRecords); } + @TestTemplate + public void testRewriteDataFilesWithStaleSortOrder() { + createTable(); + // create 10 files under 
non-partitioned table + insertData(10); + List expectedRecords = currentData(); + + // the sort order is set after the data is written, so all 10 files have a stale sort order id + sql("ALTER TABLE %s WRITE ORDERED BY c1", tableName); + + // min-input-files = 12 would normally skip these 10 files; rewrite-stale-sort-order selects + // them anyway because their sort order id no longer matches the table's sort order + List output = + sql( + "CALL %s.system.rewrite_data_files(table => '%s', strategy => 'sort', " + + "options => map('min-input-files','12','rewrite-stale-sort-order','true'))", + catalogName, tableIdent); + + assertEquals( + "Action should rewrite 10 data files and add 1 data file", + row(10, 1), + Arrays.copyOf(output.get(0), 2)); + + List actualRecords = currentData(); + assertEquals("Data after compaction should not change", expectedRecords, actualRecords); + + // a second run is a no-op: every file now matches the table's current sort order + List secondOutput = + sql( + "CALL %s.system.rewrite_data_files(table => '%s', strategy => 'sort', " + + "options => map('min-input-files','12','rewrite-stale-sort-order','true'))", + catalogName, tableIdent); + + assertEquals( + "Second run should not rewrite any data files", + ImmutableList.of(row(0, 0, 0L, 0, 0)), + secondOutput); + } + @TestTemplate public void testRewriteDataFilesWithSortStrategyAndMultipleShufflePartitionsPerFile() { createTable(); diff --git a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java index 38ddefd26a45..268dbb306f31 100644 --- a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java +++ b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java @@ -1548,6 +1548,51 @@ public void testSimpleSort() throws IOException { 
dataFilesSortOrderShouldMatchTableSortOrder(table); } + @TestTemplate + public void testSortIncrementalWithStaleSortOrder() throws IOException { + Table table = createTable(20); + shouldHaveFiles(table, 20); + // the sort order is set after the files were written, so all 20 files have a stale sort order + table.replaceSortOrder().asc("c2").commit(); + table.refresh(); + + List originalData = currentData(); + + // the size thresholds make every file "optimally sized", so files are selected purely because + // their sort order id no longer matches the table's current sort order + RewriteDataFiles.Result firstResult = + basicRewrite(table) + .sort() + .option(SizeBasedFileRewritePlanner.MIN_FILE_SIZE_BYTES, "0") + .option(RewriteDataFiles.TARGET_FILE_SIZE_BYTES, Long.toString(Long.MAX_VALUE - 1)) + .option(RewriteDataFiles.REWRITE_STALE_SORT_ORDER, "true") + .execute(); + + assertThat(firstResult.rewrittenDataFilesCount()) + .as("All files with a stale sort order should be rewritten") + .isEqualTo(20); + + table.refresh(); + assertEquals("Rewrite must not change data", originalData, currentData()); + dataFilesSortOrderShouldMatchTableSortOrder(table); + + // a second run is a no-op: every file now matches the table's current sort order + RewriteDataFiles.Result secondResult = + basicRewrite(table) + .sort() + .option(SizeBasedFileRewritePlanner.MIN_FILE_SIZE_BYTES, "0") + .option(RewriteDataFiles.TARGET_FILE_SIZE_BYTES, Long.toString(Long.MAX_VALUE - 1)) + .option(RewriteDataFiles.REWRITE_STALE_SORT_ORDER, "true") + .execute(); + + assertThat(secondResult.rewriteResults()) + .as("Incremental sort compaction should converge to a no-op") + .isEmpty(); + + shouldHaveSnapshots(table, 2); + shouldHaveACleanCache(table); + } + @TestTemplate public void testSortAfterPartitionChange() throws IOException { Table table = createTable(20); diff --git a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestSparkShufflingDataRewritePlanner.java 
b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestSparkShufflingDataRewritePlanner.java index 9fcf9f5ec51a..eacf82db6534 100644 --- a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestSparkShufflingDataRewritePlanner.java +++ b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestSparkShufflingDataRewritePlanner.java @@ -75,7 +75,8 @@ void testSparkShufflingDataRewritePlannerValidOptions() { SparkShufflingDataRewritePlanner.DELETE_FILE_THRESHOLD, SparkShufflingDataRewritePlanner.DELETE_RATIO_THRESHOLD, RewriteDataFiles.REWRITE_JOB_ORDER, - SparkShufflingDataRewritePlanner.MAX_FILES_TO_REWRITE)); + SparkShufflingDataRewritePlanner.MAX_FILES_TO_REWRITE, + RewriteDataFiles.REWRITE_STALE_SORT_ORDER)); } @Test diff --git a/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java b/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java index b37422beacf4..e3baf98d3c2e 100644 --- a/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java +++ b/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java @@ -264,6 +264,45 @@ public void testRewriteDataFilesWithSortStrategy() { assertEquals("Data after compaction should not change", expectedRecords, actualRecords); } + @TestTemplate + public void testRewriteDataFilesWithStaleSortOrder() { + createTable(); + // create 10 files under non-partitioned table + insertData(10); + List expectedRecords = currentData(); + + // the sort order is set after the data is written, so all 10 files have a stale sort order id + sql("ALTER TABLE %s WRITE ORDERED BY c1", tableName); + + // min-input-files = 12 would normally skip these 10 files; rewrite-stale-sort-order selects + // them anyway because their sort order id no longer matches the table's sort order + List 
output = + sql( + "CALL %s.system.rewrite_data_files(table => '%s', strategy => 'sort', " + + "options => map('min-input-files','12','rewrite-stale-sort-order','true'))", + catalogName, tableIdent); + + assertEquals( + "Action should rewrite 10 data files and add 1 data file", + row(10, 1), + Arrays.copyOf(output.get(0), 2)); + + List actualRecords = currentData(); + assertEquals("Data after compaction should not change", expectedRecords, actualRecords); + + // a second run is a no-op: every file now matches the table's current sort order + List secondOutput = + sql( + "CALL %s.system.rewrite_data_files(table => '%s', strategy => 'sort', " + + "options => map('min-input-files','12','rewrite-stale-sort-order','true'))", + catalogName, tableIdent); + + assertEquals( + "Second run should not rewrite any data files", + ImmutableList.of(row(0, 0, 0L, 0, 0)), + secondOutput); + } + @TestTemplate public void testRewriteDataFilesWithSortStrategyAndMultipleShufflePartitionsPerFile() { createTable(); diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java index 110e43ede1f9..0713b819b1c8 100644 --- a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java +++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java @@ -1548,6 +1548,51 @@ public void testSimpleSort() throws IOException { dataFilesSortOrderShouldMatchTableSortOrder(table); } + @TestTemplate + public void testSortIncrementalWithStaleSortOrder() throws IOException { + Table table = createTable(20); + shouldHaveFiles(table, 20); + // the sort order is set after the files were written, so all 20 files have a stale sort order + table.replaceSortOrder().asc("c2").commit(); + table.refresh(); + + List originalData = currentData(); + + // the size thresholds make every file "optimally 
sized", so files are selected purely because + // their sort order id no longer matches the table's current sort order + RewriteDataFiles.Result firstResult = + basicRewrite(table) + .sort() + .option(SizeBasedFileRewritePlanner.MIN_FILE_SIZE_BYTES, "0") + .option(RewriteDataFiles.TARGET_FILE_SIZE_BYTES, Long.toString(Long.MAX_VALUE - 1)) + .option(RewriteDataFiles.REWRITE_STALE_SORT_ORDER, "true") + .execute(); + + assertThat(firstResult.rewrittenDataFilesCount()) + .as("All files with a stale sort order should be rewritten") + .isEqualTo(20); + + table.refresh(); + assertEquals("Rewrite must not change data", originalData, currentData()); + dataFilesSortOrderShouldMatchTableSortOrder(table); + + // a second run is a no-op: every file now matches the table's current sort order + RewriteDataFiles.Result secondResult = + basicRewrite(table) + .sort() + .option(SizeBasedFileRewritePlanner.MIN_FILE_SIZE_BYTES, "0") + .option(RewriteDataFiles.TARGET_FILE_SIZE_BYTES, Long.toString(Long.MAX_VALUE - 1)) + .option(RewriteDataFiles.REWRITE_STALE_SORT_ORDER, "true") + .execute(); + + assertThat(secondResult.rewriteResults()) + .as("Incremental sort compaction should converge to a no-op") + .isEmpty(); + + shouldHaveSnapshots(table, 2); + shouldHaveACleanCache(table); + } + @TestTemplate public void testSortAfterPartitionChange() throws IOException { Table table = createTable(20); diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestSparkShufflingDataRewritePlanner.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestSparkShufflingDataRewritePlanner.java index 9fcf9f5ec51a..eacf82db6534 100644 --- a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestSparkShufflingDataRewritePlanner.java +++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestSparkShufflingDataRewritePlanner.java @@ -75,7 +75,8 @@ void testSparkShufflingDataRewritePlannerValidOptions() { 
SparkShufflingDataRewritePlanner.DELETE_FILE_THRESHOLD, SparkShufflingDataRewritePlanner.DELETE_RATIO_THRESHOLD, RewriteDataFiles.REWRITE_JOB_ORDER, - SparkShufflingDataRewritePlanner.MAX_FILES_TO_REWRITE)); + SparkShufflingDataRewritePlanner.MAX_FILES_TO_REWRITE, + RewriteDataFiles.REWRITE_STALE_SORT_ORDER)); } @Test