Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -522,6 +522,105 @@ public void testRetainMinSnapshot() {
Iterators.elementsEqual(expectedSnapshots.iterator(), table.snapshots().iterator()));
}

/**
 * Regression test for <a href="https://github.com/apache/amoro/issues/4006">issue #4006</a>.
 *
 * <p>In keyed Mixed-Iceberg tables, {@link MixedTableMaintainer#expireSnapshots()} invokes the
 * change maintainer before the base maintainer. When base and change stores share the same
 * server-side TableMetadata (e.g. AMS internal REST catalog), the change commit bumps the shared
 * meta_version, leaving the base maintainer's cached Iceberg {@link org.apache.iceberg.Table}
 * reference stale and causing the subsequent base commit to fail with {@code
 * CommitFailedException}. The fix refreshes the base table right after the change maintainer
 * commits.
 *
 * <p>The bug is not reproducible with the default {@link BasicCatalogTestHelper} backend because
 * base and change stores there live in two independent Iceberg tables and do not share a single
 * meta_version. We therefore assert the fix at the <i>behavioural</i> level: swap the base
 * maintainer's cached Iceberg table with a Mockito spy and verify that {@code refresh()} is
 * invoked during {@link MixedTableMaintainer#expireSnapshots()}. The pre-fix code path did not
 * call {@code refresh()} at all inside the expire-snapshots flow, so this assertion fails without
 * the fix and passes with it.
 *
 * @throws Exception if the {@code table} field cannot be located or accessed reflectively
 */
@Test
public void testRefreshBaseTableBeforeBaseExpiration() throws Exception {
  Assume.assumeTrue(isKeyedTable());
  KeyedTable testKeyedTable = getMixedTable().asKeyedTable();
  testKeyedTable.updateProperties().set(TableProperties.SNAPSHOT_KEEP_DURATION, "0").commit();
  testKeyedTable.updateProperties().set(TableProperties.CHANGE_DATA_TTL, "0").commit();

  writeAndCommitBaseStore(testKeyedTable);
  writeAndCommitBaseStore(testKeyedTable);
  insertChangeDataFiles(testKeyedTable, 1);
  insertChangeDataFiles(testKeyedTable, 2);

  MixedTableMaintainer tableMaintainer =
      new MixedTableMaintainer(testKeyedTable, TestTableMaintainerContext.of(testKeyedTable));

  // Swap the base maintainer's cached Iceberg table for a Mockito spy so refresh() calls can be
  // observed. The `table` field is declared on IcebergTableMaintainer (the super class). Both
  // the read and the write go through the same reflective handle, so this test does not depend
  // on the field's visibility.
  java.lang.reflect.Field tableField =
      org.apache.amoro.formats.iceberg.maintainer.IcebergTableMaintainer.class.getDeclaredField(
          "table");
  tableField.setAccessible(true);

  // Resolve the maintainer once so the spy is installed on the very instance we read from.
  Object baseMaintainer = tableMaintainer.getBaseMaintainer();
  org.apache.iceberg.Table originalBaseTable =
      (org.apache.iceberg.Table) tableField.get(baseMaintainer);
  org.apache.iceberg.Table spyBaseTable = org.mockito.Mockito.spy(originalBaseTable);
  tableField.set(baseMaintainer, spyBaseTable);

  tableMaintainer.expireSnapshots();

  // The fix must trigger at least one refresh() on the base table between the change
  // maintainer's commit and the base maintainer's own commit. Without the fix, no call to
  // refresh() happens inside the expire-snapshots flow on the base maintainer's cached table.
  org.mockito.Mockito.verify(spyBaseTable, org.mockito.Mockito.atLeastOnce()).refresh();
}

/**
 * End-to-end regression test for <a href="https://github.com/apache/amoro/issues/4006">issue
 * #4006</a>. A full {@code expireSnapshots()} run on a keyed table must complete without
 * throwing and must leave fewer snapshots than before in both the change store and the base
 * store. Before the fix, the base-store commit would abort inside {@link
 * MixedTableMaintainer#expireSnapshots()} under the shared-metadata scenario, leaving base
 * snapshots intact.
 */
@Test
public void testExpireSnapshotsShrinksBothStores() {
  Assume.assumeTrue(isKeyedTable());
  KeyedTable keyedTable = getMixedTable().asKeyedTable();
  // Zero retention makes every snapshot written below immediately eligible for expiration.
  keyedTable.updateProperties().set(TableProperties.SNAPSHOT_KEEP_DURATION, "0").commit();
  keyedTable.updateProperties().set(TableProperties.CHANGE_DATA_TTL, "0").commit();

  // Three commits on the base store first, then three on the change store.
  for (int i = 0; i < 3; i++) {
    writeAndCommitBaseStore(keyedTable);
  }
  for (int txId = 1; txId <= 3; txId++) {
    insertChangeDataFiles(keyedTable, txId);
  }

  int baseCountBefore = Iterables.size(keyedTable.baseTable().snapshots());
  int changeCountBefore = Iterables.size(keyedTable.changeTable().snapshots());
  // Sanity check: both stores need more than one snapshot for a reduction to be observable.
  Assert.assertTrue(baseCountBefore > 1);
  Assert.assertTrue(changeCountBefore > 1);

  new MixedTableMaintainer(keyedTable, TestTableMaintainerContext.of(keyedTable))
      .expireSnapshots();

  // Reload metadata so the counts below reflect what the maintainer committed.
  keyedTable.baseTable().refresh();
  keyedTable.changeTable().refresh();
  int baseCountAfter = Iterables.size(keyedTable.baseTable().snapshots());
  int changeCountAfter = Iterables.size(keyedTable.changeTable().snapshots());

  Assert.assertTrue(
      "base store snapshots should have been reduced (see issue #4006)",
      baseCountAfter < baseCountBefore);
  Assert.assertTrue(
      "change store snapshots should have been reduced", changeCountAfter < changeCountBefore);
}

@Test
public void testSnapshotExpireConfig() {
UnkeyedTable table =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,14 @@ public void cleanDanglingDeleteFiles() {
public void expireSnapshots() {
  // Change store first, then base store. For Mixed-Iceberg tables managed by the AMS internal
  // REST catalog, both stores share one server-side TableMetadata record and therefore one
  // meta_version. The change store's expiration commit bumps that version, while the base
  // store's Iceberg Table instance still points at the metadata captured at construction time.
  // Refreshing it before the base expiration avoids a CommitFailedException from the
  // optimistic-lock check. See https://github.com/apache/amoro/issues/4006
  boolean hasChangeStore = changeMaintainer != null;
  if (hasChangeStore) {
    changeMaintainer.expireSnapshots();
    refreshBaseTable();
  }
  baseMaintainer.expireSnapshots();
}
Expand All @@ -123,10 +131,24 @@ public void expireSnapshots() {
protected void expireSnapshots(long mustOlderThan, int minCount) {
  // The change store is expired first. Its commit can bump a meta_version shared with the base
  // store, so the base table is refreshed before its own expiration commit (issue #4006).
  boolean hasChangeStore = changeMaintainer != null;
  if (hasChangeStore) {
    changeMaintainer.expireSnapshots(mustOlderThan, minCount);
    refreshBaseTable();
  }
  baseMaintainer.expireSnapshots(mustOlderThan, minCount);
}

/**
 * Best-effort reload of the base maintainer's cached Iceberg table metadata.
 *
 * <p>Called after the change store has committed so that the following base-store commit does
 * not start from a stale metadata reference (issue #4006). A failed refresh is logged and
 * otherwise ignored: the base expiration is still attempted and may fail on its own with a
 * commit conflict. Only {@link RuntimeException}s are tolerated; JVM {@link Error}s (for
 * example {@link OutOfMemoryError}) are deliberately left to propagate instead of being
 * reduced to a warning.
 */
private void refreshBaseTable() {
  try {
    baseMaintainer.table.refresh();
  } catch (RuntimeException e) {
    LOG.warn(
        "Failed to refresh base table metadata for {} before base snapshot expiration, "
            + "base commit may fail with a stale meta_version conflict",
        mixedTable.id(),
        e);
  }
}

@Override
public void expireData() {
DataExpirationConfig expirationConfig = context.getTableConfiguration().getExpiringDataConfig();
Expand Down
Loading