diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/IcebergTableMaintainer.java b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/IcebergTableMaintainer.java index cee5499af1..86c846e116 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/IcebergTableMaintainer.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/IcebergTableMaintainer.java @@ -167,7 +167,9 @@ public void expireSnapshots(TableRuntime tableRuntime) { if (!expireSnapshotEnabled(tableRuntime)) { return; } - expireSnapshots(mustOlderThan(tableRuntime, System.currentTimeMillis())); + expireSnapshots( + mustOlderThan(tableRuntime, System.currentTimeMillis()), + tableRuntime.getTableConfiguration().getSnapshotMinCount()); } protected boolean expireSnapshotEnabled(TableRuntime tableRuntime) { @@ -176,18 +178,22 @@ protected boolean expireSnapshotEnabled(TableRuntime tableRuntime) { } @VisibleForTesting - void expireSnapshots(long mustOlderThan) { - expireSnapshots(mustOlderThan, expireSnapshotNeedToExcludeFiles()); + void expireSnapshots(long mustOlderThan, int minCount) { + expireSnapshots(mustOlderThan, minCount, expireSnapshotNeedToExcludeFiles()); } - private void expireSnapshots(long olderThan, Set exclude) { - LOG.debug("start expire snapshots older than {}, the exclude is {}", olderThan, exclude); + private void expireSnapshots(long olderThan, int minCount, Set exclude) { + LOG.debug( + "start expire snapshots older than {} and retain last {} snapshots, the exclude is {}", + olderThan, + minCount, + exclude); final AtomicInteger toDeleteFiles = new AtomicInteger(0); Set parentDirectories = new HashSet<>(); Set expiredFiles = new HashSet<>(); table .expireSnapshots() - .retainLast(1) + .retainLast(Math.max(minCount, 1)) .expireOlderThan(olderThan) .deleteWith( file -> { diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/MixedTableMaintainer.java b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/MixedTableMaintainer.java index a4a6445fd9..1b5c8cbd82 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/MixedTableMaintainer.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/MixedTableMaintainer.java @@ -109,11 +109,11 @@ public void expireSnapshots(TableRuntime tableRuntime) { } @VisibleForTesting - protected void expireSnapshots(long mustOlderThan) { + protected void expireSnapshots(long mustOlderThan, int minCount) { if (changeMaintainer != null) { - changeMaintainer.expireSnapshots(mustOlderThan); + changeMaintainer.expireSnapshots(mustOlderThan, minCount); } - baseMaintainer.expireSnapshots(mustOlderThan); + baseMaintainer.expireSnapshots(mustOlderThan, minCount); } @Override @@ -291,9 +291,9 @@ public ChangeTableMaintainer(UnkeyedTable unkeyedTable) { @Override @VisibleForTesting - void expireSnapshots(long mustOlderThan) { + void expireSnapshots(long mustOlderThan, int minCount) { expireFiles(mustOlderThan); - super.expireSnapshots(mustOlderThan); + super.expireSnapshots(mustOlderThan, minCount); } @Override @@ -303,7 +303,9 @@ public void expireSnapshots(TableRuntime tableRuntime) { } long now = System.currentTimeMillis(); expireFiles(now - snapshotsKeepTime(tableRuntime)); - expireSnapshots(mustOlderThan(tableRuntime, now)); + expireSnapshots( + mustOlderThan(tableRuntime, now), + tableRuntime.getTableConfiguration().getSnapshotMinCount()); } @Override diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/TableConfigurations.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/TableConfigurations.java index bdb258c0b2..bb7823c99e 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/table/TableConfigurations.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/table/TableConfigurations.java @@ -36,6 +36,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.time.temporal.ChronoUnit; import java.util.Locale; import java.util.Map; import java.util.Set; @@ -61,10 +62,19 @@ public static TableConfiguration parseTableConfig(Map properties TableProperties.ENABLE_TABLE_EXPIRE, TableProperties.ENABLE_TABLE_EXPIRE_DEFAULT)) .setSnapshotTTLMinutes( - CompatiblePropertyUtil.propertyAsLong( + ConfigHelpers.TimeUtils.parseDuration( + CompatiblePropertyUtil.propertyAsString( + properties, + TableProperties.SNAPSHOT_KEEP_DURATION, + TableProperties.SNAPSHOT_KEEP_DURATION_DEFAULT), + ChronoUnit.MINUTES) + .getSeconds() + / 60) + .setSnapshotMinCount( + CompatiblePropertyUtil.propertyAsInt( properties, - TableProperties.BASE_SNAPSHOT_KEEP_MINUTES, - TableProperties.BASE_SNAPSHOT_KEEP_MINUTES_DEFAULT)) + TableProperties.SNAPSHOT_MIN_COUNT, + TableProperties.SNAPSHOT_MIN_COUNT_DEFAULT)) .setChangeDataTTLMinutes( CompatiblePropertyUtil.propertyAsLong( properties, diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestSnapshotExpire.java b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestSnapshotExpire.java index 559accaa64..2796b21522 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestSnapshotExpire.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestSnapshotExpire.java @@ -122,7 +122,7 @@ public void testExpireChangeTableFiles() { // In order to advance the snapshot insertChangeDataFiles(testKeyedTable, 2); - tableMaintainer.getChangeMaintainer().expireSnapshots(System.currentTimeMillis()); + tableMaintainer.getChangeMaintainer().expireSnapshots(System.currentTimeMillis(), 1); Assert.assertEquals(1, Iterables.size(testKeyedTable.changeTable().snapshots())); s1Files.forEach( @@ -333,7 +333,7 @@ public void testExpireTableFiles4All() { List newDataFiles = writeAndCommitBaseStore(table); Assert.assertEquals(3, Iterables.size(table.snapshots())); - new MixedTableMaintainer(table).expireSnapshots(System.currentTimeMillis()); + new MixedTableMaintainer(table).expireSnapshots(System.currentTimeMillis(), 1); Assert.assertEquals(1, Iterables.size(table.snapshots())); dataFiles.forEach(file -> Assert.assertFalse(table.io().exists(file.path().toString()))); @@ -383,7 +383,7 @@ public void testExpireTableFilesRepeatedly() { MixedTableMaintainer tableMaintainer = new MixedTableMaintainer(testKeyedTable); tableMaintainer.getChangeMaintainer().expireFiles(secondCommitTime + 1); - tableMaintainer.getChangeMaintainer().expireSnapshots(secondCommitTime + 1); + tableMaintainer.getChangeMaintainer().expireSnapshots(secondCommitTime + 1, 1); Set dataFiles = getDataFiles(testKeyedTable); Assert.assertEquals(last4File, dataFiles); @@ -449,7 +449,7 @@ public void testExpireStatisticsFiles() { Assert.assertTrue(baseTable.io().exists(file1.path())); Assert.assertTrue(baseTable.io().exists(file2.path())); Assert.assertTrue(baseTable.io().exists(file3.path())); - new MixedTableMaintainer(testKeyedTable).expireSnapshots(expireTime); + new MixedTableMaintainer(testKeyedTable).expireSnapshots(expireTime, 1); Assert.assertEquals(1, Iterables.size(baseTable.snapshots())); Assert.assertFalse(baseTable.io().exists(file1.path())); @@ -525,6 +525,62 @@ public void testBaseTableGcDisabled() { Assert.assertEquals(1, Iterables.size(testUnkeyedTable.snapshots())); } + @Test + public void testRetainMinSnapshot() { + UnkeyedTable table = + isKeyedTable() + ? getMixedTable().asKeyedTable().baseTable() + : getMixedTable().asUnkeyedTable(); + table.newAppend().commit(); + table.newAppend().commit(); + List expectedSnapshots = new ArrayList<>(); + expectedSnapshots.add(table.currentSnapshot()); + + table.updateProperties().set(TableProperties.SNAPSHOT_KEEP_DURATION, "0s").commit(); + table.updateProperties().set(TableProperties.SNAPSHOT_MIN_COUNT, "3").commit(); + + TableRuntime tableRuntime = Mockito.mock(TableRuntime.class); + Mockito.when(tableRuntime.getTableIdentifier()) + .thenReturn( + ServerTableIdentifier.of(AmsUtil.toTableIdentifier(table.id()), getTestFormat())); + Mockito.when(tableRuntime.getTableConfiguration()) + .thenReturn(TableConfigurations.parseTableConfig(table.properties())); + Mockito.when(tableRuntime.getOptimizingStatus()).thenReturn(OptimizingStatus.IDLE); + + new MixedTableMaintainer(table).expireSnapshots(tableRuntime); + Assert.assertEquals(2, Iterables.size(table.snapshots())); + + table.newAppend().commit(); + expectedSnapshots.add(table.currentSnapshot()); + table.newAppend().commit(); + expectedSnapshots.add(table.currentSnapshot()); + + new MixedTableMaintainer(table).expireSnapshots(tableRuntime); + Assert.assertEquals(3, Iterables.size(table.snapshots())); + Assert.assertTrue( + Iterators.elementsEqual(expectedSnapshots.iterator(), table.snapshots().iterator())); + } + + @Test + public void testSnapshotExpireConfig() { + UnkeyedTable table = + isKeyedTable() + ? getMixedTable().asKeyedTable().baseTable() + : getMixedTable().asUnkeyedTable(); + table.updateProperties().set(TableProperties.SNAPSHOT_KEEP_DURATION, "180s").commit(); + Assert.assertEquals( + 3L, TableConfigurations.parseTableConfig(table.properties()).getSnapshotTTLMinutes()); + + // using default time unit: minutes + table.updateProperties().set(TableProperties.SNAPSHOT_KEEP_DURATION, "720").commit(); + Assert.assertEquals( + 720L, TableConfigurations.parseTableConfig(table.properties()).getSnapshotTTLMinutes()); + + table.updateProperties().set(TableProperties.SNAPSHOT_MIN_COUNT, "10").commit(); + Assert.assertEquals( + 10, TableConfigurations.parseTableConfig(table.properties()).getSnapshotMinCount()); + } + private long waitUntilAfter(long timestampMillis) { long current = System.currentTimeMillis(); while (current <= timestampMillis) { diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestSnapshotExpireHive.java b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestSnapshotExpireHive.java index efad6c8834..dd9f198c86 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestSnapshotExpireHive.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestSnapshotExpireHive.java @@ -115,7 +115,7 @@ public void testExpireTableFiles() { ? getMixedTable().asKeyedTable().baseTable() : getMixedTable().asUnkeyedTable(); MixedTableMaintainer mixedTableMaintainer = new MixedTableMaintainer(getMixedTable()); - mixedTableMaintainer.getBaseMaintainer().expireSnapshots(System.currentTimeMillis()); + mixedTableMaintainer.getBaseMaintainer().expireSnapshots(System.currentTimeMillis(), 1); Assert.assertEquals(1, Iterables.size(unkeyedTable.snapshots())); hiveFiles.forEach( diff --git a/amoro-common/src/main/java/org/apache/amoro/config/ConfigHelpers.java b/amoro-common/src/main/java/org/apache/amoro/config/ConfigHelpers.java index 4873e8581c..d7a705e04a 100644 --- a/amoro-common/src/main/java/org/apache/amoro/config/ConfigHelpers.java +++ b/amoro-common/src/main/java/org/apache/amoro/config/ConfigHelpers.java @@ -299,6 +299,10 @@ public static class TimeUtils { private static final Map LABEL_TO_UNIT_MAP = Collections.unmodifiableMap(initMap()); + public static Duration parseDuration(String text) { + return parseDuration(text, ChronoUnit.MILLIS); + } + /** * Parse the given string to a java {@link Duration}. The string is in format "{length * value}{time unit label}", e.g. "123ms", "321 s". If no time unit label is specified, it will @@ -318,7 +322,7 @@ public static class TimeUtils { * * @param text string to parse. */ - public static Duration parseDuration(String text) { + public static Duration parseDuration(String text, ChronoUnit defaultUnit) { checkNotNull(text); final String trimmed = text.trim(); @@ -350,7 +354,7 @@ public static Duration parseDuration(String text) { } if (unitLabel.isEmpty()) { - return Duration.of(value, ChronoUnit.MILLIS); + return Duration.of(value, defaultUnit); } ChronoUnit unit = LABEL_TO_UNIT_MAP.get(unitLabel); diff --git a/amoro-common/src/main/java/org/apache/amoro/config/TableConfiguration.java b/amoro-common/src/main/java/org/apache/amoro/config/TableConfiguration.java index f20705b1f2..5efedfbbea 100644 --- a/amoro-common/src/main/java/org/apache/amoro/config/TableConfiguration.java +++ b/amoro-common/src/main/java/org/apache/amoro/config/TableConfiguration.java @@ -27,6 +27,7 @@ public class TableConfiguration { private boolean expireSnapshotEnabled; private long snapshotTTLMinutes; + private int snapshotMinCount; private long changeDataTTLMinutes; private boolean cleanOrphanEnabled; private long orphanExistingMinutes; @@ -45,6 +46,10 @@ public long getSnapshotTTLMinutes() { return snapshotTTLMinutes; } + public int getSnapshotMinCount() { + return snapshotMinCount; + } + public long getChangeDataTTLMinutes() { return changeDataTTLMinutes; } @@ -76,6 +81,11 @@ public TableConfiguration setSnapshotTTLMinutes(long snapshotTTLMinutes) { return this; } + public TableConfiguration setSnapshotMinCount(int snapshotMinCount) { + this.snapshotMinCount = snapshotMinCount; + return this; + } + public TableConfiguration setChangeDataTTLMinutes(long changeDataTTLMinutes) { this.changeDataTTLMinutes = changeDataTTLMinutes; return this; @@ -130,6 +140,7 @@ public boolean equals(Object o) { TableConfiguration that = (TableConfiguration) o; return expireSnapshotEnabled == that.expireSnapshotEnabled && snapshotTTLMinutes == that.snapshotTTLMinutes + && snapshotMinCount == that.snapshotMinCount && changeDataTTLMinutes == that.changeDataTTLMinutes && cleanOrphanEnabled == that.cleanOrphanEnabled && orphanExistingMinutes == that.orphanExistingMinutes @@ -144,6 +155,7 @@ public int hashCode() { return Objects.hashCode( expireSnapshotEnabled, snapshotTTLMinutes, + snapshotMinCount, changeDataTTLMinutes, cleanOrphanEnabled, orphanExistingMinutes, diff --git a/amoro-format-iceberg/src/main/java/org/apache/amoro/table/TableProperties.java b/amoro-format-iceberg/src/main/java/org/apache/amoro/table/TableProperties.java index efd9e994dd..debe3f0cae 100644 --- a/amoro-format-iceberg/src/main/java/org/apache/amoro/table/TableProperties.java +++ b/amoro-format-iceberg/src/main/java/org/apache/amoro/table/TableProperties.java @@ -150,8 +150,22 @@ private TableProperties() {} public static final String CHANGE_DATA_TTL = "change.data.ttl.minutes"; public static final long CHANGE_DATA_TTL_DEFAULT = 10080; // 7 Days - public static final String BASE_SNAPSHOT_KEEP_MINUTES = "snapshot.base.keep.minutes"; - public static final long BASE_SNAPSHOT_KEEP_MINUTES_DEFAULT = 720; // 12 Hours + /** + * @deprecated Use {@link TableProperties#SNAPSHOT_KEEP_DURATION } instead; will be removed in + * 0.9.0 + */ + @Deprecated public static final String BASE_SNAPSHOT_KEEP_MINUTES = "snapshot.base.keep.minutes"; + /** + * @deprecated Use {@link TableProperties#SNAPSHOT_KEEP_DURATION_DEFAULT } instead; will be + * removed in 0.9.0 + */ + @Deprecated public static final long BASE_SNAPSHOT_KEEP_MINUTES_DEFAULT = 720; // 12 Hours + + public static final String SNAPSHOT_KEEP_DURATION = "snapshot.keep.duration"; + public static final String SNAPSHOT_KEEP_DURATION_DEFAULT = "720min"; // 12 Hours + + public static final String SNAPSHOT_MIN_COUNT = "snapshot.keep.min-count"; + public static final int SNAPSHOT_MIN_COUNT_DEFAULT = 1; public static final String ENABLE_ORPHAN_CLEAN = "clean-orphan-file.enabled"; public static final boolean ENABLE_ORPHAN_CLEAN_DEFAULT = false; diff --git a/amoro-format-iceberg/src/main/java/org/apache/amoro/utils/CompatiblePropertyUtil.java b/amoro-format-iceberg/src/main/java/org/apache/amoro/utils/CompatiblePropertyUtil.java index ab48600c58..6d68cfe7a6 100644 --- a/amoro-format-iceberg/src/main/java/org/apache/amoro/utils/CompatiblePropertyUtil.java +++ b/amoro-format-iceberg/src/main/java/org/apache/amoro/utils/CompatiblePropertyUtil.java @@ -96,6 +96,8 @@ private static String getLegacyProperty(String property) { return TableProperties.ENABLE_ORPHAN_CLEAN_LEGACY; case TableProperties.ENABLE_LOG_STORE: return TableProperties.ENABLE_LOG_STORE_LEGACY; + case TableProperties.SNAPSHOT_KEEP_DURATION: + return TableProperties.BASE_SNAPSHOT_KEEP_MINUTES; default: return null; } diff --git a/docs/user-guides/configurations.md b/docs/user-guides/configurations.md index aa1bda9877..5bcf7afbbd 100644 --- a/docs/user-guides/configurations.md +++ b/docs/user-guides/configurations.md @@ -69,7 +69,8 @@ Data-cleaning configurations are applicable to both Iceberg Format and Mixed str |---------------------------------------------|------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| | table-expire.enabled | true | Enables periodically expire table | | change.data.ttl.minutes | 10080(7 days) | Time to live in minutes for data of ChangeStore | -| snapshot.base.keep.minutes | 720(12 hours) | Table-Expiration keeps the latest snapshots of BaseStore within a specified time in minutes | +| snapshot.keep.duration | 720min(12 hours) | Table-Expiration keeps the latest snapshots within a specified duration | +| snapshot.keep.min-count | 1 | Minimum number of snapshots retained for table expiration | | clean-orphan-file.enabled | false | Enables periodically clean orphan files | | clean-orphan-file.min-existing-time-minutes | 2880(2 days) | Cleaning orphan files keeps the files modified within a specified time in minutes | | clean-dangling-delete-files.enabled | true | Whether to enable cleaning of dangling delete files |