diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestExpireSnapshotsKeepReferencedFiles.java b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestExpireSnapshotsKeepReferencedFiles.java new file mode 100644 index 0000000000..9a93962e9b --- /dev/null +++ b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestExpireSnapshotsKeepReferencedFiles.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.server.optimizing.maintainer; + +import org.apache.amoro.BasicTableTestHelper; +import org.apache.amoro.TableFormat; +import org.apache.amoro.TableTestHelper; +import org.apache.amoro.catalog.BasicCatalogTestHelper; +import org.apache.amoro.catalog.CatalogTestHelper; +import org.apache.amoro.formats.iceberg.maintainer.IcebergTableMaintainer; +import org.apache.amoro.server.scheduler.inline.ExecutorTestBase; +import org.apache.amoro.shade.guava32.com.google.common.collect.Iterables; +import org.apache.amoro.shade.guava32.com.google.common.collect.Lists; +import org.apache.amoro.table.TableProperties; +import org.apache.amoro.table.UnkeyedTable; +import org.apache.iceberg.AppendFiles; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.data.Record; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.List; + +/** + * Reproduces a data-file loss observed in production during snapshot expiration. + * + *

When a table has a single ref (the common case after a tag is dropped), {@code + * RemoveSnapshots} auto-selects {@link org.apache.iceberg.IncrementalFileCleanup}. That strategy + * walks the current snapshot's ancestor chain via {@code SnapshotUtil.ancestorIds}; if a parent + * snapshot is missing from metadata (e.g. expired by an earlier cycle) the walk terminates + * silently. Snapshots below that break are then treated as "not an ancestor", so the ADDED entries + * in their (superseded but still referenced) manifests are reverted and the data files are + * physically deleted - even though those files are still carried over as EXISTING entries in the + * current snapshot's manifest. The result is a current snapshot that references a missing file. + * + *

This test builds exactly that state and asserts the invariant that expiration must never + * delete a data file referenced by the current snapshot. It fails on the buggy incremental path and + * passes once {@link IcebergTableMaintainer} detects the off-main snapshot and forces the reachable + * cleanup strategy, which never walks the ancestor chain. + */ +@RunWith(Parameterized.class) +public class TestExpireSnapshotsKeepReferencedFiles extends ExecutorTestBase { + + @Parameterized.Parameters(name = "{0}, {1}") + public static Object[] parameters() { + return new Object[][] { + {new BasicCatalogTestHelper(TableFormat.ICEBERG), new BasicTableTestHelper(false, false)} + }; + } + + public TestExpireSnapshotsKeepReferencedFiles( + CatalogTestHelper catalogTestHelper, TableTestHelper tableTestHelper) { + super(catalogTestHelper, tableTestHelper); + } + + @Test + public void testExpireKeepsFilesReferencedByCurrentSnapshot() { + UnkeyedTable table = getMixedTable().asUnkeyedTable(); + // Merge manifests explicitly below; keep auto-merge off so the fixture is deterministic. + table + .updateProperties() + .set("commit.manifest-merge.enabled", "false") + .set(TableProperties.SNAPSHOT_KEEP_DURATION, "0") + .commit(); + + // S0, S1: each append writes one data file in its own manifest (F0 in m0, F1 in m1). + DataFile f0 = appendOneRecord(table, 1, 1L); + DataFile f1 = appendOneRecord(table, 2, 2L); + + // S2: rewrite manifests into a single merged manifest holding F0, F1 as EXISTING entries. + // m0/m1 are now superseded but remain referenced by the manifest lists of S0/S1. + table.rewriteManifests().clusterBy(file -> 0).commit(); + long midSnapshotId = table.currentSnapshot().snapshotId(); + + // S3 (head): append F2. The current snapshot still references F0, F1 via the merged manifest. + DataFile f2 = appendOneRecord(table, 3, 3L); + + Assert.assertTrue(table.io().exists(f0.path().toString())); + Assert.assertTrue(table.io().exists(f1.path().toString())); + + // Explicitly expire the middle snapshot S2. This breaks head's ancestor chain + // (head.parent == S2, now absent), so a later ancestor walk truncates above S0/S1. + // Snapshot-id expiration uses reachable cleanup, so the baseline files stay safe here. + table.expireSnapshots().expireSnapshotId(midSnapshotId).cleanExpiredFiles(true).commit(); + Assert.assertTrue(table.io().exists(f0.path().toString())); + Assert.assertTrue(table.io().exists(f1.path().toString())); + + // Only the main ref remains, which is what makes iceberg auto-select incremental cleanup. + Assert.assertEquals(1, table.refs().size()); + + IcebergTableMaintainer maintainer = + new IcebergTableMaintainer(table, table.id(), TestTableMaintainerContext.of(table)); + maintainer.expireSnapshots(System.currentTimeMillis(), 1); + + // Sanity: head retained, history expired. + Assert.assertEquals(1, Iterables.size(table.snapshots())); + Assert.assertTrue(table.io().exists(f2.path().toString())); + + // Invariant: F0 and F1 are still referenced by the current snapshot, so they must survive. + Assert.assertTrue( + "F0 is referenced by the current snapshot and must not be deleted by expiration", + table.io().exists(f0.path().toString())); + Assert.assertTrue( + "F1 is referenced by the current snapshot and must not be deleted by expiration", + table.io().exists(f1.path().toString())); + } + + private DataFile appendOneRecord(UnkeyedTable table, int id, long txId) { + Record record = tableTestHelper().generateTestRecord(id, "name" + id, 0, "2022-01-01T00:00:00"); + List dataFiles = + tableTestHelper().writeBaseStore(table, txId, Lists.newArrayList(record), false); + Assert.assertEquals(1, dataFiles.size()); + AppendFiles appendFiles = table.newAppend(); + dataFiles.forEach(appendFiles::appendFile); + appendFiles.commit(); + return dataFiles.get(0); + } +} diff --git a/amoro-format-iceberg/src/main/java/org/apache/amoro/formats/iceberg/maintainer/IcebergTableMaintainer.java b/amoro-format-iceberg/src/main/java/org/apache/amoro/formats/iceberg/maintainer/IcebergTableMaintainer.java index f906d4b866..f121d1fe62 100644 --- a/amoro-format-iceberg/src/main/java/org/apache/amoro/formats/iceberg/maintainer/IcebergTableMaintainer.java +++ b/amoro-format-iceberg/src/main/java/org/apache/amoro/formats/iceberg/maintainer/IcebergTableMaintainer.java @@ -46,10 +46,12 @@ import org.apache.iceberg.DataFile; import org.apache.iceberg.DeleteFile; import org.apache.iceberg.DeleteFiles; +import org.apache.iceberg.ExpireSnapshots; import org.apache.iceberg.FileContent; import org.apache.iceberg.FileScanTask; import org.apache.iceberg.PartitionField; import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.ReachableFileCleanupBridge; import org.apache.iceberg.ReachableFileUtil; import org.apache.iceberg.RewriteFiles; import org.apache.iceberg.Schema; @@ -70,6 +72,7 @@ import org.apache.iceberg.types.Types; import org.apache.iceberg.util.DateTimeUtil; import org.apache.iceberg.util.SerializableFunction; +import org.apache.iceberg.util.SnapshotUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -201,14 +204,24 @@ private void expireSnapshots(long olderThan, int minCount, Set exclude) minCount, exclude); RollingFileCleaner expiredFileCleaner = new RollingFileCleaner(fileIO(), exclude); - table - .expireSnapshots() - .retainLast(Math.max(minCount, 1)) - .expireOlderThan(olderThan) - .deleteWith(expiredFileCleaner::addFile) - .cleanExpiredFiles( - true) /* enable clean only for collecting the expired files, will delete them later */ - .commit(); + ExpireSnapshots expireSnapshots = + table + .expireSnapshots() + .retainLast(Math.max(minCount, 1)) + .expireOlderThan(olderThan) + .deleteWith(expiredFileCleaner::addFile) + .cleanExpiredFiles( + true) /* enable clean only for collecting the expired files, will delete them later */; + // iceberg auto-selects IncrementalFileCleanup for single-ref tables. That strategy walks the + // current snapshot's ancestor chain and can terminate silently at a missing parent, then revert + // the ADDED entries of superseded-but-still-referenced manifests below the break - physically + // deleting data files the current snapshot still references (observed as partition data-file + // loss in production). The walk only truncates when a snapshot sits outside the current main + // ancestry, so force the safe ReachableFileCleanup then. + if (hasSnapshotsOutsideMainAncestry()) { + ReachableFileCleanupBridge.forceReachable(expireSnapshots); + } + expireSnapshots.commit(); int collectedFiles = expiredFileCleaner.fileCount(); expiredFileCleaner.clear(); @@ -227,6 +240,28 @@ private void expireSnapshots(long olderThan, int minCount, Set exclude) } } + /** + * Whether the table holds any snapshot that is not reachable from the current snapshot's ancestor + * chain. Such a snapshot makes {@code IncrementalFileCleanup}'s ancestor walk truncate, which can + * lead it to delete data files still referenced by the current snapshot. The walk used here is + * the same {@link SnapshotUtil#ancestorIds} used by the cleanup, so it detects exactly the states + * the incremental strategy would mishandle. + */ + private boolean hasSnapshotsOutsideMainAncestry() { + Snapshot currentSnapshot = table.currentSnapshot(); + if (currentSnapshot == null) { + return false; + } + Set mainAncestors = + Sets.newHashSet(SnapshotUtil.ancestorIds(currentSnapshot, table::snapshot)); + for (Snapshot snapshot : table.snapshots()) { + if (!mainAncestors.contains(snapshot.snapshotId())) { + return true; + } + } + return false; + } + @Override public void expireData() { DataExpirationConfig expirationConfig = context.getTableConfiguration().getExpiringDataConfig(); diff --git a/amoro-format-iceberg/src/main/java/org/apache/iceberg/ReachableFileCleanupBridge.java b/amoro-format-iceberg/src/main/java/org/apache/iceberg/ReachableFileCleanupBridge.java new file mode 100644 index 0000000000..fbc5eabcc7 --- /dev/null +++ b/amoro-format-iceberg/src/main/java/org/apache/iceberg/ReachableFileCleanupBridge.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.iceberg; + +/** + * Bridge into iceberg-core to force the reachable file-cleanup strategy during snapshot expiration. + * + *

{@link RemoveSnapshots#withIncrementalCleanup(boolean)} is package-private, so this class + * lives in {@code org.apache.iceberg} to call it without reflection. Binding at compile time means + * a future change to that method's signature breaks the build instead of silently falling back. + * + *

By default iceberg auto-selects {@link IncrementalFileCleanup} whenever the table has a single + * ref. That strategy walks the current snapshot's ancestor chain and terminates silently if a + * parent snapshot is missing from metadata; snapshots below the break are then treated as + * non-ancestors and the ADDED entries in their superseded-but-still-referenced manifests are + * reverted, physically deleting data files the current snapshot still references. Forcing {@link + * ReachableFileCleanup} avoids the ancestor walk entirely and is safe for any ref count. + */ +public final class ReachableFileCleanupBridge { + + private ReachableFileCleanupBridge() {} + + /** + * Forces {@code expireSnapshots} to use {@link ReachableFileCleanup} instead of letting iceberg + * auto-select the incremental strategy. + * + * @param expireSnapshots the expire operation to configure + * @return the same operation, for fluent chaining + */ + public static ExpireSnapshots forceReachable(ExpireSnapshots expireSnapshots) { + if (expireSnapshots instanceof RemoveSnapshots) { + ((RemoveSnapshots) expireSnapshots).withIncrementalCleanup(false); + } + return expireSnapshots; + } +}