Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions api/src/main/java/org/apache/iceberg/actions/RewriteDataFiles.java
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,16 @@ public interface RewriteDataFiles
*/
String OUTPUT_SPEC_ID = "output-spec-id";

/**
 * If true, the rewrite also selects data files whose sort order id does not match the table's
 * current default sort order id, even when they are already optimally sized.
 *
 * <p>Defaults to false.
 */
String REWRITE_STALE_SORT_ORDER = "rewrite-stale-sort-order";

/** Value used for {@link #REWRITE_STALE_SORT_ORDER} when the option is not supplied. */
boolean REWRITE_STALE_SORT_ORDER_DEFAULT = false;

/**
* Choose BINPACK as a strategy for this rewrite operation
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.RewriteJobOrder;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
Expand Down Expand Up @@ -101,6 +102,8 @@ public class BinPackRewriteFilePlanner
private double deleteRatioThreshold;
private RewriteJobOrder rewriteJobOrder;
private Integer maxFilesToRewrite;
private boolean rewriteStaleSortOrder;
private int currentSortOrderId;

public BinPackRewriteFilePlanner(Table table) {
this(table, Expressions.alwaysTrue());
Expand Down Expand Up @@ -139,6 +142,7 @@ public Set<String> validOptions() {
.add(DELETE_RATIO_THRESHOLD)
.add(RewriteDataFiles.REWRITE_JOB_ORDER)
.add(MAX_FILES_TO_REWRITE)
.add(RewriteDataFiles.REWRITE_STALE_SORT_ORDER)
.build();
}

Expand All @@ -154,6 +158,12 @@ public void init(Map<String, String> options) {
RewriteDataFiles.REWRITE_JOB_ORDER,
RewriteDataFiles.REWRITE_JOB_ORDER_DEFAULT));
this.maxFilesToRewrite = maxFilesToRewrite(options);
this.rewriteStaleSortOrder =
PropertyUtil.propertyAsBoolean(
options,
RewriteDataFiles.REWRITE_STALE_SORT_ORDER,
RewriteDataFiles.REWRITE_STALE_SORT_ORDER_DEFAULT);
this.currentSortOrderId = table().sortOrder().orderId();
}

private int deleteFileThreshold(Map<String, String> options) {
Expand Down Expand Up @@ -190,7 +200,10 @@ protected Iterable<FileScanTask> filterFiles(Iterable<FileScanTask> tasks) {
return Iterables.filter(
tasks,
task ->
outsideDesiredFileSizeRange(task) || tooManyDeletes(task) || tooHighDeleteRatio(task));
outsideDesiredFileSizeRange(task)
|| tooManyDeletes(task)
|| tooHighDeleteRatio(task)
|| hasStaleSortOrder(task));
}

@Override
Expand All @@ -202,7 +215,8 @@ protected Iterable<List<FileScanTask>> filterFileGroups(List<List<FileScanTask>>
|| enoughContent(group)
|| tooMuchContent(group)
|| group.stream().anyMatch(this::tooManyDeletes)
|| group.stream().anyMatch(this::tooHighDeleteRatio));
|| group.stream().anyMatch(this::tooHighDeleteRatio)
|| group.stream().anyMatch(this::hasStaleSortOrder));
}

@Override
Expand Down Expand Up @@ -270,6 +284,19 @@ private boolean tooManyDeletes(FileScanTask task) {
return task.deletes() != null && task.deletes().size() >= deleteFileThreshold;
}

/**
 * Tells whether the data file behind {@code task} should be rewritten because its sort order id
 * differs from the sort order id captured for the table.
 *
 * <p>Always returns false when the stale-sort-order option is disabled, when the captured table
 * sort order is the unsorted order, or when the file was written under a partition spec other
 * than the table's current one. A file without a sort order id counts as stale.
 *
 * @param task the scan task whose data file is inspected
 * @return true if the file qualifies for a rewrite solely because of its sort order id
 */
private boolean hasStaleSortOrder(FileScanTask task) {
  boolean checkEnabled =
      rewriteStaleSortOrder && currentSortOrderId != SortOrder.unsorted().orderId();
  if (!checkEnabled) {
    return false;
  }

  // only files written with the table's current partition spec are considered
  boolean writtenWithCurrentSpec = task.file().specId() == table().spec().specId();
  if (!writtenWithCurrentSpec) {
    return false;
  }

  Integer fileSortOrderId = task.file().sortOrderId();
  if (fileSortOrderId == null) {
    return true;
  }

  return fileSortOrderId.intValue() != currentSortOrderId;
}

private boolean tooHighDeleteRatio(FileScanTask task) {
if (task.deletes() == null || task.deletes().isEmpty()) {
return false;
Expand Down
5 changes: 5 additions & 0 deletions core/src/test/java/org/apache/iceberg/MockFileScanTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,14 @@ public MockFileScanTask(
}

/**
 * Creates a mock scan task for a data file of the given size and sort order id, using partition
 * spec id 0.
 *
 * @param length the file size in bytes reported by the mocked data file
 * @param sortOrderId the sort order id reported by the mocked data file
 * @return a scan task wrapping the mocked data file
 */
public static MockFileScanTask mockTask(long length, int sortOrderId) {
  Integer boxedSortOrderId = Integer.valueOf(sortOrderId);
  return mockTask(length, boxedSortOrderId, 0);
}

/**
 * Creates a mock scan task whose data file reports the given size, sort order id and partition
 * spec id.
 *
 * @param length the file size in bytes reported by the mocked data file
 * @param sortOrderId the sort order id reported by the mocked data file; may be null
 * @param specId the partition spec id reported by the mocked data file
 * @return a scan task wrapping the mocked data file
 */
public static MockFileScanTask mockTask(long length, Integer sortOrderId, int specId) {
  DataFile stubbedFile = Mockito.mock(DataFile.class);

  // stub only the accessors this factory is parameterized on
  Mockito.when(stubbedFile.fileSizeInBytes()).thenReturn(length);
  Mockito.when(stubbedFile.sortOrderId()).thenReturn(sortOrderId);
  Mockito.when(stubbedFile.specId()).thenReturn(specId);

  MockFileScanTask scanTask = new MockFileScanTask(stubbedFile);
  return scanTask;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,8 @@ void testValidOptions() {
BinPackRewriteFilePlanner.DELETE_FILE_THRESHOLD,
BinPackRewriteFilePlanner.DELETE_RATIO_THRESHOLD,
RewriteDataFiles.REWRITE_JOB_ORDER,
BinPackRewriteFilePlanner.MAX_FILES_TO_REWRITE));
BinPackRewriteFilePlanner.MAX_FILES_TO_REWRITE,
RewriteDataFiles.REWRITE_STALE_SORT_ORDER));
}

@Test
Expand Down Expand Up @@ -581,6 +582,143 @@ public void testRewriteMaxFilesRewriteGreaterThanTotalFiles() {
assertThat(fileScanTasks).isLessThanOrEqualTo(numFiles).isLessThanOrEqualTo(500);
}

@Test
void testRewriteStaleSortOrderSelectsStaleFiles() {
  int tableOrderId = setTableSortOrder();

  BinPackRewriteFilePlanner filePlanner = new BinPackRewriteFilePlanner(table);
  filePlanner.init(rewriteStaleSortOrderOptions());

  // 450 bytes sits inside the configured [250, 750] size range, so the sort order id is the only
  // thing that distinguishes the two files
  FileScanTask upToDate = MockFileScanTask.mockTask(450L, tableOrderId);
  FileScanTask outdated = MockFileScanTask.mockTask(450L, tableOrderId - 1);
  List<FileScanTask> candidates = ImmutableList.of(upToDate, outdated);

  Iterable<List<FileScanTask>> planned = filePlanner.planFileGroups(candidates);

  assertThat(planned).as("Only the file with a stale sort order must be rewritten").hasSize(1);
  List<FileScanTask> onlyGroup = Iterables.getOnlyElement(planned);
  assertThat(onlyGroup).containsExactly(outdated);
}

@Test
void testRewriteStaleSortOrderSkipsSortedFiles() {
  int tableOrderId = setTableSortOrder();

  BinPackRewriteFilePlanner filePlanner = new BinPackRewriteFilePlanner(table);
  filePlanner.init(rewriteStaleSortOrderOptions());

  // both files carry the table's current sort order id and an in-range size
  FileScanTask first = MockFileScanTask.mockTask(450L, tableOrderId);
  FileScanTask second = MockFileScanTask.mockTask(450L, tableOrderId);

  Iterable<List<FileScanTask>> planned =
      filePlanner.planFileGroups(ImmutableList.of(first, second));

  assertThat(planned)
      .as("Optimally sized files already sorted by the current sort order must be skipped")
      .isEmpty();
}

@Test
void testRewriteStaleSortOrderSelectsFilesWithNullSortOrderId() {
  setTableSortOrder();

  BinPackRewriteFilePlanner filePlanner = new BinPackRewriteFilePlanner(table);
  filePlanner.init(rewriteStaleSortOrderOptions());

  // the file belongs to the current partition spec but reports no sort order id at all
  int currentSpecId = table.spec().specId();
  FileScanTask withoutSortOrderId = MockFileScanTask.mockTask(450L, null, currentSpecId);

  Iterable<List<FileScanTask>> planned =
      filePlanner.planFileGroups(ImmutableList.of(withoutSortOrderId));

  assertThat(planned)
      .as("A file with no sort order id must be treated as having a stale sort order")
      .hasSize(1);
  List<FileScanTask> onlyGroup = Iterables.getOnlyElement(planned);
  assertThat(onlyGroup).containsExactly(withoutSortOrderId);
}

@Test
void testRewriteStaleSortOrderIsNoOpForUnsortedTable() {
  // no sort order is set in this test, so the table is expected to still use order id 0
  BinPackRewriteFilePlanner filePlanner = new BinPackRewriteFilePlanner(table);
  filePlanner.init(rewriteStaleSortOrderOptions());

  FileScanTask first = MockFileScanTask.mockTask(450L, 0);
  FileScanTask second = MockFileScanTask.mockTask(450L, 0);

  Iterable<List<FileScanTask>> planned =
      filePlanner.planFileGroups(ImmutableList.of(first, second));

  assertThat(planned)
      .as("rewrite-stale-sort-order must be a no-op when the table has no sort order")
      .isEmpty();
}

@Test
void testRewriteStaleSortOrderIgnoresFilesFromOtherSpec() {
  int tableOrderId = setTableSortOrder();
  int foreignSpecId = table.spec().specId() + 1;

  BinPackRewriteFilePlanner filePlanner = new BinPackRewriteFilePlanner(table);
  filePlanner.init(rewriteStaleSortOrderOptions());

  // the sort order id is stale, yet the file reports a spec id other than the table's current one
  FileScanTask fromForeignSpec =
      MockFileScanTask.mockTask(450L, tableOrderId - 1, foreignSpecId);

  Iterable<List<FileScanTask>> planned =
      filePlanner.planFileGroups(ImmutableList.of(fromForeignSpec));

  assertThat(planned)
      .as(
          "Files from a non-current partition spec must not be selected by rewrite-stale-sort-order")
      .isEmpty();
}

@Test
void testRewriteStaleSortOrderDisabledByDefault() {
  int tableOrderId = setTableSortOrder();

  // size bounds only; rewrite-stale-sort-order is deliberately left unset
  Map<String, String> sizeOnlyOptions =
      ImmutableMap.<String, String>builder()
          .put(BinPackRewriteFilePlanner.MIN_FILE_SIZE_BYTES, "250")
          .put(BinPackRewriteFilePlanner.TARGET_FILE_SIZE_BYTES, "500")
          .put(BinPackRewriteFilePlanner.MAX_FILE_SIZE_BYTES, "750")
          .build();

  BinPackRewriteFilePlanner filePlanner = new BinPackRewriteFilePlanner(table);
  filePlanner.init(sizeOnlyOptions);

  FileScanTask outdated = MockFileScanTask.mockTask(450L, tableOrderId - 1);

  Iterable<List<FileScanTask>> planned = filePlanner.planFileGroups(ImmutableList.of(outdated));

  assertThat(planned)
      .as("Stale files must not be rewritten unless rewrite-stale-sort-order is enabled")
      .isEmpty();
}

@Test
void testRewriteStaleSortOrderWithRewriteAll() {
  int tableOrderId = setTableSortOrder();

  Map<String, String> options =
      ImmutableMap.of(
          BinPackRewriteFilePlanner.MIN_FILE_SIZE_BYTES, "250",
          BinPackRewriteFilePlanner.TARGET_FILE_SIZE_BYTES, "500",
          BinPackRewriteFilePlanner.MAX_FILE_SIZE_BYTES, "750",
          RewriteDataFiles.REWRITE_STALE_SORT_ORDER, "true",
          BinPackRewriteFilePlanner.REWRITE_ALL, "true");

  BinPackRewriteFilePlanner filePlanner = new BinPackRewriteFilePlanner(table);
  filePlanner.init(options);

  // neither file is stale or badly sized; rewrite-all alone is expected to select them
  FileScanTask first = MockFileScanTask.mockTask(450L, tableOrderId);
  FileScanTask second = MockFileScanTask.mockTask(450L, tableOrderId);

  Iterable<List<FileScanTask>> planned =
      filePlanner.planFileGroups(ImmutableList.of(first, second));

  assertThat(planned).hasSize(1);
  List<FileScanTask> onlyGroup = Iterables.getOnlyElement(planned);
  assertThat(onlyGroup)
      .as("rewrite-all must take precedence over rewrite-stale-sort-order")
      .containsExactlyInAnyOrder(first, second);
}

/**
 * Replaces the test table's sort order with an ascending sort on {@code data}, refreshes the
 * table, and returns the id of the sort order the table reports afterwards.
 */
private int setTableSortOrder() {
  table.replaceSortOrder().asc("data").commit();
  table.refresh();

  int committedOrderId = table.sortOrder().orderId();
  return committedOrderId;
}

/**
 * Planner options shared by the stale-sort-order tests: file size bounds of 250/500/750 bytes
 * (min/target/max) with {@code rewrite-stale-sort-order} switched on.
 */
private static Map<String, String> rewriteStaleSortOrderOptions() {
  ImmutableMap.Builder<String, String> options = ImmutableMap.builder();
  options.put(BinPackRewriteFilePlanner.MIN_FILE_SIZE_BYTES, "250");
  options.put(BinPackRewriteFilePlanner.TARGET_FILE_SIZE_BYTES, "500");
  options.put(BinPackRewriteFilePlanner.MAX_FILE_SIZE_BYTES, "750");
  options.put(RewriteDataFiles.REWRITE_STALE_SORT_ORDER, "true");
  return options.build();
}

private void addFiles() {
table
.newAppend()
Expand Down
6 changes: 6 additions & 0 deletions docs/docs/spark-procedures.md
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,7 @@ Iceberg can compact data files in parallel using Spark with the `rewriteDataFile
| `output-spec-id` | current partition spec id | Identifier of the output partition spec. Data will be reorganized during the rewrite to align with the output partitioning. |
| `remove-dangling-deletes` | false | Remove dangling position and equality deletes after rewriting. A delete file is considered dangling if it does not apply to any live data files. Enabling this will generate an additional commit for the removal. |
| `max-files-to-rewrite` | null | This option sets an upper limit on the number of eligible files that will be rewritten. If this option is not specified, all eligible files will be rewritten. |
| `rewrite-stale-sort-order` | false | When true, also rewrite data files whose sort order id does not match the table's current default sort order, even if they are already optimally sized. |

!!! info
Dangling delete files are removed based solely on data sequence numbers. This action does not apply to global
Expand Down Expand Up @@ -475,6 +476,11 @@ Rewrite the data files in table `db.sample` and select the files that may contai
CALL catalog_name.system.rewrite_data_files(table => 'db.sample', where => 'id = 3 and name = "foo"');
```

Rewrite the data files in table `db.sample`, additionally selecting files whose sort order id does not match the table's current sort order even if they are already optimally sized.
```sql
CALL catalog_name.system.rewrite_data_files(table => 'db.sample', strategy => 'sort', options => map('rewrite-stale-sort-order', 'true'));
```

### `rewrite_manifests`

Rewrite manifests for a table to optimize scan planning.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,20 @@ public Builder maxFilesToRewrite(int maxFilesToRewrite) {
return this;
}

/**
 * Sets the {@link org.apache.iceberg.actions.RewriteDataFiles#REWRITE_STALE_SORT_ORDER} rewrite
 * option, which controls whether data files whose sort order id does not match the table's
 * current default sort order are also rewritten.
 *
 * @param rewriteStaleSortOrder enables rewriting files with a stale sort order
 * @return this builder, for call chaining
 */
public Builder rewriteStaleSortOrder(boolean rewriteStaleSortOrder) {
  // rewrite options are stored as strings, so the flag is rendered as "true" or "false"
  String optionValue = Boolean.toString(rewriteStaleSortOrder);
  this.rewriteOptions.put(
      org.apache.iceberg.actions.RewriteDataFiles.REWRITE_STALE_SORT_ORDER, optionValue);
  return this;
}

/**
* A user provided filter for determining which files will be considered by the rewrite
* strategy.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,45 @@ public void testRewriteDataFilesWithSortStrategy() {
assertEquals("Data after compaction should not change", expectedRecords, actualRecords);
}

@TestTemplate
public void testRewriteDataFilesWithStaleSortOrder() {
  createTable();
  // write 10 files into the non-partitioned table before any sort order is declared
  insertData(10);
  List<Object[]> recordsBeforeRewrite = currentData();

  // declaring the sort order afterwards leaves all 10 existing files with a stale sort order id
  sql("ALTER TABLE %s WRITE ORDERED BY c1", tableName);

  // with min-input-files = 12 the 10 files would normally be skipped; the test expects
  // rewrite-stale-sort-order to select them regardless
  String rewriteCall =
      "CALL %s.system.rewrite_data_files(table => '%s', strategy => 'sort', "
          + "options => map('min-input-files','12','rewrite-stale-sort-order','true'))";

  List<Object[]> firstRun = sql(rewriteCall, catalogName, tableIdent);
  Object[] rewrittenAndAdded = Arrays.copyOf(firstRun.get(0), 2);
  assertEquals(
      "Action should rewrite 10 data files and add 1 data file", row(10, 1), rewrittenAndAdded);

  List<Object[]> recordsAfterRewrite = currentData();
  assertEquals(
      "Data after compaction should not change", recordsBeforeRewrite, recordsAfterRewrite);

  // repeating the same call must find nothing left to rewrite
  List<Object[]> secondRun = sql(rewriteCall, catalogName, tableIdent);
  assertEquals(
      "Second run should not rewrite any data files",
      ImmutableList.of(row(0, 0, 0L, 0, 0)),
      secondRun);
}

@TestTemplate
public void testRewriteDataFilesWithSortStrategyAndMultipleShufflePartitionsPerFile() {
createTable();
Expand Down
Loading