diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestSnapshotExpire.java b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestSnapshotExpire.java index fcd9373a28..bead94410a 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestSnapshotExpire.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestSnapshotExpire.java @@ -522,6 +522,105 @@ public void testRetainMinSnapshot() { Iterators.elementsEqual(expectedSnapshots.iterator(), table.snapshots().iterator())); } + /** + * Regression test for issue #4006. + * + *

In keyed Mixed-Iceberg tables, {@link MixedTableMaintainer#expireSnapshots()} invokes the + * change maintainer before the base maintainer. When base and change stores share the same + * server-side TableMetadata (e.g. AMS internal REST catalog), the change commit bumps the shared + * meta_version, leaving the base maintainer's cached Iceberg {@link org.apache.iceberg.Table} + * reference stale and causing the subsequent base commit to fail with {@code + * CommitFailedException}. The fix refreshes the base table right after the change maintainer + * commits. + * + *

The bug is not reproducible with the default {@link BasicCatalogTestHelper} backend because + * base and change stores there live in two independent Iceberg tables and do not share a single + * meta_version. We therefore assert the fix at the behavioural level: swap the base + * maintainer's cached Iceberg table with a Mockito spy and verify that {@code refresh()} is + * invoked during {@link MixedTableMaintainer#expireSnapshots()}. The pre-fix code path did not + * call {@code refresh()} at all inside the expire-snapshots flow, so this assertion fails without + * the fix and passes with it. + */ + @Test + public void testRefreshBaseTableBeforeBaseExpiration() throws Exception { + Assume.assumeTrue(isKeyedTable()); + KeyedTable testKeyedTable = getMixedTable().asKeyedTable(); + testKeyedTable.updateProperties().set(TableProperties.SNAPSHOT_KEEP_DURATION, "0").commit(); + testKeyedTable.updateProperties().set(TableProperties.CHANGE_DATA_TTL, "0").commit(); + + writeAndCommitBaseStore(testKeyedTable); + writeAndCommitBaseStore(testKeyedTable); + insertChangeDataFiles(testKeyedTable, 1); + insertChangeDataFiles(testKeyedTable, 2); + + MixedTableMaintainer tableMaintainer = + new MixedTableMaintainer(testKeyedTable, TestTableMaintainerContext.of(testKeyedTable)); + + // Wrap the base maintainer's cached Iceberg table with a spy so we can observe refresh() + // calls. The `table` field lives in IcebergTableMaintainer (super class) and is declared + // public, but we still go through reflection to stay robust against future visibility + // changes. + org.apache.iceberg.Table originalBaseTable = tableMaintainer.getBaseMaintainer().table; + org.apache.iceberg.Table spyBaseTable = org.mockito.Mockito.spy(originalBaseTable); + java.lang.reflect.Field tableField = + org.apache.amoro.formats.iceberg.maintainer.IcebergTableMaintainer.class.getDeclaredField( + "table"); + tableField.setAccessible(true); + tableField.set(tableMaintainer.getBaseMaintainer(), spyBaseTable); + + tableMaintainer.expireSnapshots(); + + // The fix must trigger at least one refresh() on the base table between the change + // maintainer's commit and the base maintainer's own commit. Without the fix, no call to + // refresh() happens inside the expire-snapshots flow on the base maintainer's cached table. + org.mockito.Mockito.verify(spyBaseTable, org.mockito.Mockito.atLeastOnce()).refresh(); + } + + /** + * End-to-end regression test for issue + * #4006: with the fix in place, a full {@code expireSnapshots()} run on a keyed table must + * (1) not throw, and (2) actually shrink both the change-store and the base-store snapshot lists. + * Prior to the fix, the base-store commit would silently abort inside {@link + * MixedTableMaintainer#expireSnapshots()} under the shared-metadata scenario, leaving base + * snapshots intact. + */ + @Test + public void testExpireSnapshotsShrinksBothStores() { + Assume.assumeTrue(isKeyedTable()); + KeyedTable testKeyedTable = getMixedTable().asKeyedTable(); + testKeyedTable.updateProperties().set(TableProperties.SNAPSHOT_KEEP_DURATION, "0").commit(); + testKeyedTable.updateProperties().set(TableProperties.CHANGE_DATA_TTL, "0").commit(); + + // Produce multiple expirable snapshots on both stores. + writeAndCommitBaseStore(testKeyedTable); + writeAndCommitBaseStore(testKeyedTable); + writeAndCommitBaseStore(testKeyedTable); + insertChangeDataFiles(testKeyedTable, 1); + insertChangeDataFiles(testKeyedTable, 2); + insertChangeDataFiles(testKeyedTable, 3); + + int baseSnapshotsBefore = Iterables.size(testKeyedTable.baseTable().snapshots()); + int changeSnapshotsBefore = Iterables.size(testKeyedTable.changeTable().snapshots()); + Assert.assertTrue(baseSnapshotsBefore > 1); + Assert.assertTrue(changeSnapshotsBefore > 1); + + MixedTableMaintainer tableMaintainer = + new MixedTableMaintainer(testKeyedTable, TestTableMaintainerContext.of(testKeyedTable)); + tableMaintainer.expireSnapshots(); + + testKeyedTable.baseTable().refresh(); + testKeyedTable.changeTable().refresh(); + int baseSnapshotsAfter = Iterables.size(testKeyedTable.baseTable().snapshots()); + int changeSnapshotsAfter = Iterables.size(testKeyedTable.changeTable().snapshots()); + + Assert.assertTrue( + "base store snapshots should have been reduced (see issue #4006)", + baseSnapshotsAfter < baseSnapshotsBefore); + Assert.assertTrue( + "change store snapshots should have been reduced", + changeSnapshotsAfter < changeSnapshotsBefore); + } + @Test public void testSnapshotExpireConfig() { UnkeyedTable table = diff --git a/amoro-format-iceberg/src/main/java/org/apache/amoro/formats/iceberg/maintainer/MixedTableMaintainer.java b/amoro-format-iceberg/src/main/java/org/apache/amoro/formats/iceberg/maintainer/MixedTableMaintainer.java index 72e4a6429c..1df36cff85 100644 --- a/amoro-format-iceberg/src/main/java/org/apache/amoro/formats/iceberg/maintainer/MixedTableMaintainer.java +++ b/amoro-format-iceberg/src/main/java/org/apache/amoro/formats/iceberg/maintainer/MixedTableMaintainer.java @@ -115,6 +115,14 @@ public void cleanDanglingDeleteFiles() { public void expireSnapshots() { if (changeMaintainer != null) { changeMaintainer.expireSnapshots(); + // For Mixed-Iceberg tables managed by AMS internal REST catalog, the base store and the + // change store share the same server-side TableMetadata record (and thus the same + // meta_version). After the change store commits its snapshot expiration the shared + // meta_version has been bumped, while the base store's Iceberg Table instance still + // holds the stale metadata reference captured at construction time. Refresh it before + // base expiration to avoid CommitFailedException from the optimistic-lock check. + // See https://github.com/apache/amoro/issues/4006 + refreshBaseTable(); } baseMaintainer.expireSnapshots(); } @@ -123,10 +131,24 @@ public void expireSnapshots() { protected void expireSnapshots(long mustOlderThan, int minCount) { if (changeMaintainer != null) { changeMaintainer.expireSnapshots(mustOlderThan, minCount); + // See the comment in expireSnapshots() above and issue #4006. + refreshBaseTable(); } baseMaintainer.expireSnapshots(mustOlderThan, minCount); } + private void refreshBaseTable() { + try { + baseMaintainer.table.refresh(); + } catch (Throwable t) { + LOG.warn( + "Failed to refresh base table metadata for {} before base snapshot expiration, " + + "base commit may fail with a stale meta_version conflict", + mixedTable.id(), + t); + } + } + @Override public void expireData() { DataExpirationConfig expirationConfig = context.getTableConfiguration().getExpiringDataConfig();