Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions api/src/main/java/org/apache/iceberg/actions/RewriteDataFiles.java
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,22 @@ public interface RewriteDataFiles
*/
String OUTPUT_SPEC_ID = "output-spec-id";

/**
 * If true, the rewrite also selects data files whose sort order id does not match the table's
 * current default sort order id, even when they are already optimally sized. This enables
 * incremental sort compaction: a sort-based rewrite reorganizes only the files that are not
 * already sorted by the current sort order, instead of re-sorting every selected file.
 *
 * <p>This option has no effect on tables without a sort order, and is intended to be used with
 * the {@link #sort()} strategy so that rewritten files are marked with the table's current sort
 * order id.
 *
 * <p>Defaults to false.
 */
String REWRITE_STALE_SORT_ORDER = "rewrite-stale-sort-order";

/** Default for {@link #REWRITE_STALE_SORT_ORDER}: stale sort order selection is disabled. */
boolean REWRITE_STALE_SORT_ORDER_DEFAULT = false;

/**
* Choose BINPACK as a strategy for this rewrite operation
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.RewriteJobOrder;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
Expand Down Expand Up @@ -101,6 +102,8 @@ public class BinPackRewriteFilePlanner
private double deleteRatioThreshold;
private RewriteJobOrder rewriteJobOrder;
private Integer maxFilesToRewrite;
// whether files with a non-current sort order id are also selected for rewrite; populated in
// init() from the RewriteDataFiles.REWRITE_STALE_SORT_ORDER option
private boolean rewriteStaleSortOrder;
// sort order id of the table, captured in init() from table().sortOrder().orderId()
private int currentSortOrderId;

public BinPackRewriteFilePlanner(Table table) {
this(table, Expressions.alwaysTrue());
Expand Down Expand Up @@ -139,6 +142,7 @@ public Set<String> validOptions() {
.add(DELETE_RATIO_THRESHOLD)
.add(RewriteDataFiles.REWRITE_JOB_ORDER)
.add(MAX_FILES_TO_REWRITE)
.add(RewriteDataFiles.REWRITE_STALE_SORT_ORDER)
.build();
}

Expand All @@ -154,6 +158,12 @@ public void init(Map<String, String> options) {
RewriteDataFiles.REWRITE_JOB_ORDER,
RewriteDataFiles.REWRITE_JOB_ORDER_DEFAULT));
this.maxFilesToRewrite = maxFilesToRewrite(options);
this.rewriteStaleSortOrder =
PropertyUtil.propertyAsBoolean(
options,
RewriteDataFiles.REWRITE_STALE_SORT_ORDER,
RewriteDataFiles.REWRITE_STALE_SORT_ORDER_DEFAULT);
this.currentSortOrderId = table().sortOrder().orderId();
}

private int deleteFileThreshold(Map<String, String> options) {
Expand Down Expand Up @@ -190,7 +200,10 @@ protected Iterable<FileScanTask> filterFiles(Iterable<FileScanTask> tasks) {
return Iterables.filter(
tasks,
task ->
outsideDesiredFileSizeRange(task) || tooManyDeletes(task) || tooHighDeleteRatio(task));
outsideDesiredFileSizeRange(task)
|| tooManyDeletes(task)
|| tooHighDeleteRatio(task)
|| hasStaleSortOrder(task));
}

@Override
Expand All @@ -202,7 +215,8 @@ protected Iterable<List<FileScanTask>> filterFileGroups(List<List<FileScanTask>>
|| enoughContent(group)
|| tooMuchContent(group)
|| group.stream().anyMatch(this::tooManyDeletes)
|| group.stream().anyMatch(this::tooHighDeleteRatio));
|| group.stream().anyMatch(this::tooHighDeleteRatio)
|| group.stream().anyMatch(this::hasStaleSortOrder));
}

@Override
Expand Down Expand Up @@ -270,6 +284,19 @@ private boolean tooManyDeletes(FileScanTask task) {
return task.deletes() != null && task.deletes().size() >= deleteFileThreshold;
}

/**
 * Checks whether a task's data file should be rewritten because it was not written with the
 * table's current sort order.
 *
 * <p>Always returns false when the stale sort order option is disabled, when the table's sort
 * order is the unsorted order, or when the file was written under a partition spec other than
 * the table's current spec. A file with no recorded sort order id is treated as stale.
 *
 * @param task the scan task whose data file is inspected
 * @return true if the file's sort order id is missing or differs from the current sort order id
 */
private boolean hasStaleSortOrder(FileScanTask task) {
  boolean checkApplies =
      rewriteStaleSortOrder && currentSortOrderId != SortOrder.unsorted().orderId();
  if (!checkApplies || task.file().specId() != table().spec().specId()) {
    return false;
  }

  Integer fileSortOrderId = task.file().sortOrderId();
  if (fileSortOrderId == null) {
    return true;
  }

  return fileSortOrderId != currentSortOrderId;
}

private boolean tooHighDeleteRatio(FileScanTask task) {
if (task.deletes() == null || task.deletes().isEmpty()) {
return false;
Expand Down
5 changes: 5 additions & 0 deletions core/src/test/java/org/apache/iceberg/MockFileScanTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,14 @@ public MockFileScanTask(
}

/**
 * Creates a mock task for a file of the given size and sort order id, using partition spec id 0.
 *
 * @param length the file size in bytes reported by the mocked data file
 * @param sortOrderId the sort order id reported by the mocked data file
 * @return a task wrapping the mocked data file
 */
public static MockFileScanTask mockTask(long length, int sortOrderId) {
  Integer boxedSortOrderId = sortOrderId;
  int defaultSpecId = 0;
  return mockTask(length, boxedSortOrderId, defaultSpecId);
}

/**
 * Creates a mock task whose data file reports the given size, sort order id and partition spec
 * id.
 *
 * @param length the file size in bytes reported by the mocked data file
 * @param sortOrderId the sort order id reported by the mocked data file; may be null
 * @param specId the partition spec id reported by the mocked data file
 * @return a task wrapping the mocked data file
 */
public static MockFileScanTask mockTask(long length, Integer sortOrderId, int specId) {
  DataFile dataFile = Mockito.mock(DataFile.class);
  Mockito.when(dataFile.specId()).thenReturn(specId);
  Mockito.when(dataFile.sortOrderId()).thenReturn(sortOrderId);
  Mockito.when(dataFile.fileSizeInBytes()).thenReturn(length);
  return new MockFileScanTask(dataFile);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,8 @@ void testValidOptions() {
BinPackRewriteFilePlanner.DELETE_FILE_THRESHOLD,
BinPackRewriteFilePlanner.DELETE_RATIO_THRESHOLD,
RewriteDataFiles.REWRITE_JOB_ORDER,
BinPackRewriteFilePlanner.MAX_FILES_TO_REWRITE));
BinPackRewriteFilePlanner.MAX_FILES_TO_REWRITE,
RewriteDataFiles.REWRITE_STALE_SORT_ORDER));
}

@Test
Expand Down Expand Up @@ -581,6 +582,143 @@ public void testRewriteMaxFilesRewriteGreaterThanTotalFiles() {
assertThat(fileScanTasks).isLessThanOrEqualTo(numFiles).isLessThanOrEqualTo(500);
}

/** A file inside the desired size range is planned only when its sort order id is stale. */
@Test
void testRewriteStaleSortOrderSelectsStaleFiles() {
  int tableOrderId = setTableSortOrder();
  int staleOrderId = tableOrderId - 1;

  // 450 bytes lies between the configured min (250) and max (750) sizes, so the sort order id
  // is the only thing that can make a file a rewrite candidate
  FileScanTask upToDateTask = MockFileScanTask.mockTask(450L, tableOrderId);
  FileScanTask staleTask = MockFileScanTask.mockTask(450L, staleOrderId);

  BinPackRewriteFilePlanner filePlanner = new BinPackRewriteFilePlanner(table);
  filePlanner.init(rewriteStaleSortOrderOptions());

  Iterable<List<FileScanTask>> plannedGroups =
      filePlanner.planFileGroups(ImmutableList.of(upToDateTask, staleTask));

  assertThat(plannedGroups)
      .as("Only the file with a stale sort order must be rewritten")
      .hasSize(1);
  List<FileScanTask> onlyGroup = Iterables.getOnlyElement(plannedGroups);
  assertThat(onlyGroup).containsExactly(staleTask);
}

/** Files that already carry the table's current sort order id are left alone. */
@Test
void testRewriteStaleSortOrderSkipsSortedFiles() {
  int tableOrderId = setTableSortOrder();

  FileScanTask firstSorted = MockFileScanTask.mockTask(450L, tableOrderId);
  FileScanTask secondSorted = MockFileScanTask.mockTask(450L, tableOrderId);

  BinPackRewriteFilePlanner filePlanner = new BinPackRewriteFilePlanner(table);
  filePlanner.init(rewriteStaleSortOrderOptions());

  Iterable<List<FileScanTask>> plannedGroups =
      filePlanner.planFileGroups(ImmutableList.of(firstSorted, secondSorted));

  assertThat(plannedGroups)
      .as("Optimally sized files already sorted by the current sort order must be skipped")
      .isEmpty();
}

/** A file that reports no sort order id at all is handled like a stale file. */
@Test
void testRewriteStaleSortOrderSelectsFilesWithNullSortOrderId() {
  setTableSortOrder();
  int currentSpecId = table.spec().specId();

  BinPackRewriteFilePlanner filePlanner = new BinPackRewriteFilePlanner(table);
  filePlanner.init(rewriteStaleSortOrderOptions());

  FileScanTask taskWithoutOrderId = MockFileScanTask.mockTask(450L, null, currentSpecId);

  Iterable<List<FileScanTask>> plannedGroups =
      filePlanner.planFileGroups(ImmutableList.of(taskWithoutOrderId));

  assertThat(plannedGroups)
      .as("A file with no sort order id must be treated as having a stale sort order")
      .hasSize(1);
  List<FileScanTask> onlyGroup = Iterables.getOnlyElement(plannedGroups);
  assertThat(onlyGroup).containsExactly(taskWithoutOrderId);
}

/** Enabling the option on a table without a sort order must not select any file. */
@Test
void testRewriteStaleSortOrderIsNoOpForUnsortedTable() {
  // no sort order is set on the table here, so the mocked files use order id 0 as well
  FileScanTask firstTask = MockFileScanTask.mockTask(450L, 0);
  FileScanTask secondTask = MockFileScanTask.mockTask(450L, 0);

  BinPackRewriteFilePlanner filePlanner = new BinPackRewriteFilePlanner(table);
  filePlanner.init(rewriteStaleSortOrderOptions());

  Iterable<List<FileScanTask>> plannedGroups =
      filePlanner.planFileGroups(ImmutableList.of(firstTask, secondTask));

  assertThat(plannedGroups)
      .as("rewrite-stale-sort-order must be a no-op when the table has no sort order")
      .isEmpty();
}

/** A stale file written under a different partition spec is not picked up by the option. */
@Test
void testRewriteStaleSortOrderIgnoresFilesFromOtherSpec() {
  int tableOrderId = setTableSortOrder();
  int staleOrderId = tableOrderId - 1;
  int foreignSpecId = table.spec().specId() + 1;

  // the sort order id is stale, but the spec id differs from the table's current spec
  FileScanTask foreignSpecTask = MockFileScanTask.mockTask(450L, staleOrderId, foreignSpecId);

  BinPackRewriteFilePlanner filePlanner = new BinPackRewriteFilePlanner(table);
  filePlanner.init(rewriteStaleSortOrderOptions());

  Iterable<List<FileScanTask>> plannedGroups =
      filePlanner.planFileGroups(ImmutableList.of(foreignSpecTask));

  assertThat(plannedGroups)
      .as(
          "Files from a non-current partition spec must not be selected by rewrite-stale-sort-order")
      .isEmpty();
}

/** Without the option, a stale but well-sized file is not planned for rewrite. */
@Test
void testRewriteStaleSortOrderDisabledByDefault() {
  int tableOrderId = setTableSortOrder();

  // only the size thresholds are configured; rewrite-stale-sort-order is left unset
  Map<String, String> sizeOnlyOptions =
      ImmutableMap.<String, String>builder()
          .put(BinPackRewriteFilePlanner.MIN_FILE_SIZE_BYTES, "250")
          .put(BinPackRewriteFilePlanner.TARGET_FILE_SIZE_BYTES, "500")
          .put(BinPackRewriteFilePlanner.MAX_FILE_SIZE_BYTES, "750")
          .build();

  BinPackRewriteFilePlanner filePlanner = new BinPackRewriteFilePlanner(table);
  filePlanner.init(sizeOnlyOptions);

  FileScanTask staleTask = MockFileScanTask.mockTask(450L, tableOrderId - 1);
  Iterable<List<FileScanTask>> plannedGroups =
      filePlanner.planFileGroups(ImmutableList.of(staleTask));

  assertThat(plannedGroups)
      .as("Stale files must not be rewritten unless rewrite-stale-sort-order is enabled")
      .isEmpty();
}

/** With rewrite-all enabled, files are planned even though their sort order id is current. */
@Test
void testRewriteStaleSortOrderWithRewriteAll() {
  int tableOrderId = setTableSortOrder();

  Map<String, String> rewriteAllOptions =
      ImmutableMap.of(
          BinPackRewriteFilePlanner.MIN_FILE_SIZE_BYTES, "250",
          BinPackRewriteFilePlanner.TARGET_FILE_SIZE_BYTES, "500",
          BinPackRewriteFilePlanner.MAX_FILE_SIZE_BYTES, "750",
          RewriteDataFiles.REWRITE_STALE_SORT_ORDER, "true",
          BinPackRewriteFilePlanner.REWRITE_ALL, "true");

  BinPackRewriteFilePlanner filePlanner = new BinPackRewriteFilePlanner(table);
  filePlanner.init(rewriteAllOptions);

  // neither file is stale; they are expected in the plan only because of rewrite-all
  FileScanTask firstSorted = MockFileScanTask.mockTask(450L, tableOrderId);
  FileScanTask secondSorted = MockFileScanTask.mockTask(450L, tableOrderId);

  Iterable<List<FileScanTask>> plannedGroups =
      filePlanner.planFileGroups(ImmutableList.of(firstSorted, secondSorted));

  assertThat(plannedGroups).hasSize(1);
  List<FileScanTask> onlyGroup = Iterables.getOnlyElement(plannedGroups);
  assertThat(onlyGroup)
      .as("rewrite-all must take precedence over rewrite-stale-sort-order")
      .containsExactlyInAnyOrder(firstSorted, secondSorted);
}

/**
 * Replaces the test table's sort order with an ascending sort on {@code data} and refreshes the
 * table.
 *
 * @return the order id of the table's sort order after the refresh
 */
private int setTableSortOrder() {
  table.replaceSortOrder().asc("data").commit();
  table.refresh();

  int refreshedOrderId = table.sortOrder().orderId();
  return refreshedOrderId;
}

/**
 * Builds planner options with min/target/max file sizes of 250/500/750 bytes and
 * rewrite-stale-sort-order enabled.
 *
 * @return an immutable option map for initializing the planner
 */
private static Map<String, String> rewriteStaleSortOrderOptions() {
  ImmutableMap.Builder<String, String> options = ImmutableMap.builder();
  options.put(BinPackRewriteFilePlanner.MIN_FILE_SIZE_BYTES, "250");
  options.put(BinPackRewriteFilePlanner.TARGET_FILE_SIZE_BYTES, "500");
  options.put(BinPackRewriteFilePlanner.MAX_FILE_SIZE_BYTES, "750");
  options.put(RewriteDataFiles.REWRITE_STALE_SORT_ORDER, "true");
  return options.build();
}

private void addFiles() {
table
.newAppend()
Expand Down
8 changes: 8 additions & 0 deletions docs/docs/spark-procedures.md
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,7 @@ Iceberg can compact data files in parallel using Spark with the `rewriteDataFile
| `output-spec-id` | current partition spec id | Identifier of the output partition spec. Data will be reorganized during the rewrite to align with the output partitioning. |
| `remove-dangling-deletes` | false | Remove dangling position and equality deletes after rewriting. A delete file is considered dangling if it does not apply to any live data files. Enabling this will generate an additional commit for the removal. |
| `max-files-to-rewrite` | null | This option sets an upper limit on the number of eligible files that will be rewritten. If this option is not specified, all eligible files will be rewritten. |
| `rewrite-stale-sort-order` | false | When true, also rewrite data files whose sort order id does not match the table's current default sort order, even if they are already optimally sized. This enables incremental sort compaction, where a `sort` rewrite only reorganizes files that are not already sorted by the current sort order. It has no effect on tables without a sort order, and is intended for use with the `sort` strategy so that rewritten files are marked with the table's current sort order id. It is not supported with a `zorder` sort order. |

!!! info
Dangling delete files are removed based solely on data sequence numbers. This action does not apply to global
Expand Down Expand Up @@ -475,6 +476,13 @@ Rewrite the data files in table `db.sample` and select the files that may contai
CALL catalog_name.system.rewrite_data_files(table => 'db.sample', where => 'id = 3 and name = "foo"');
```

Incrementally sort the data files in table `db.sample`: rewrite only the files that are not already sorted
by the table's current sort order, leaving already-sorted files untouched. This is useful for maintaining
sort order on a table over time, or after changing the table's sort order with `ALTER TABLE ... WRITE ORDERED BY`.
```sql
CALL catalog_name.system.rewrite_data_files(table => 'db.sample', strategy => 'sort', options => map('rewrite-stale-sort-order', 'true'));
```

### `rewrite_manifests`

Rewrite manifests for a table to optimize scan planning.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,45 @@ public void testRewriteDataFilesWithSortStrategy() {
assertEquals("Data after compaction should not change", expectedRecords, actualRecords);
}

/**
 * Runs the rewrite_data_files procedure twice with the sort strategy and
 * rewrite-stale-sort-order enabled: the first call is expected to rewrite the 10 files written
 * before the table's sort order was set, the second call is expected to rewrite nothing.
 */
@TestTemplate
public void testRewriteDataFilesWithStaleSortOrder() {
  createTable();
  // create 10 files under non-partitioned table
  insertData(10);
  // snapshot of the rows before any rewrite, used to check that compaction keeps the data intact
  List<Object[]> expectedRecords = currentData();

  // the sort order is set after the data is written, so all 10 files have a stale sort order id
  sql("ALTER TABLE %s WRITE ORDERED BY c1", tableName);

  // min-input-files = 12 would normally skip these 10 files; rewrite-stale-sort-order selects
  // them anyway because their sort order id no longer matches the table's sort order
  List<Object[]> output =
      sql(
          "CALL %s.system.rewrite_data_files(table => '%s', strategy => 'sort', "
              + "options => map('min-input-files','12','rewrite-stale-sort-order','true'))",
          catalogName, tableIdent);

  // only the first two columns of the procedure output (rewritten and added file counts) are
  // compared here
  assertEquals(
      "Action should rewrite 10 data files and add 1 data file",
      row(10, 1),
      Arrays.copyOf(output.get(0), 2));

  List<Object[]> actualRecords = currentData();
  assertEquals("Data after compaction should not change", expectedRecords, actualRecords);

  // a second run is a no-op: every file now matches the table's current sort order
  List<Object[]> secondOutput =
      sql(
          "CALL %s.system.rewrite_data_files(table => '%s', strategy => 'sort', "
              + "options => map('min-input-files','12','rewrite-stale-sort-order','true'))",
          catalogName, tableIdent);

  // the full output row is compared this time, and every column is expected to be zero
  assertEquals(
      "Second run should not rewrite any data files",
      ImmutableList.of(row(0, 0, 0L, 0, 0)),
      secondOutput);
}

@TestTemplate
public void testRewriteDataFilesWithSortStrategyAndMultipleShufflePartitionsPerFile() {
createTable();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1548,6 +1548,51 @@ public void testSimpleSort() throws IOException {
dataFilesSortOrderShouldMatchTableSortOrder(table);
}

/**
 * Runs a sort rewrite twice with rewrite-stale-sort-order enabled: the first run is expected to
 * rewrite all 20 files written before the sort order was set, the second run is expected to
 * produce no rewrite results.
 */
@TestTemplate
public void testSortIncrementalWithStaleSortOrder() throws IOException {
  Table table = createTable(20);
  shouldHaveFiles(table, 20);
  // the sort order is set after the files were written, so all 20 files have a stale sort order
  table.replaceSortOrder().asc("c2").commit();
  table.refresh();

  // rows before any rewrite, used to check that the rewrite does not change the data
  List<Object[]> originalData = currentData();

  // the size thresholds make every file "optimally sized", so files are selected purely because
  // their sort order id no longer matches the table's current sort order
  RewriteDataFiles.Result firstResult =
      basicRewrite(table)
          .sort()
          .option(SizeBasedFileRewritePlanner.MIN_FILE_SIZE_BYTES, "0")
          .option(RewriteDataFiles.TARGET_FILE_SIZE_BYTES, Long.toString(Long.MAX_VALUE - 1))
          .option(RewriteDataFiles.REWRITE_STALE_SORT_ORDER, "true")
          .execute();

  assertThat(firstResult.rewrittenDataFilesCount())
      .as("All files with a stale sort order should be rewritten")
      .isEqualTo(20);

  table.refresh();
  assertEquals("Rewrite must not change data", originalData, currentData());
  dataFilesSortOrderShouldMatchTableSortOrder(table);

  // a second run is a no-op: every file now matches the table's current sort order
  RewriteDataFiles.Result secondResult =
      basicRewrite(table)
          .sort()
          .option(SizeBasedFileRewritePlanner.MIN_FILE_SIZE_BYTES, "0")
          .option(RewriteDataFiles.TARGET_FILE_SIZE_BYTES, Long.toString(Long.MAX_VALUE - 1))
          .option(RewriteDataFiles.REWRITE_STALE_SORT_ORDER, "true")
          .execute();

  assertThat(secondResult.rewriteResults())
      .as("Incremental sort compaction should converge to a no-op")
      .isEmpty();

  // the snapshot count is checked after both runs and is expected to be 2
  shouldHaveSnapshots(table, 2);
  shouldHaveACleanCache(table);
}

@TestTemplate
public void testSortAfterPartitionChange() throws IOException {
Table table = createTable(20);
Expand Down
Loading