Skip to content

Commit

Permalink
[AMORO-3163] Support for configuring the number of remaining snapshots (
Browse files Browse the repository at this point in the history
#3164)

* configure number of snapshots

* update document
  • Loading branch information
XBaith authored Sep 9, 2024
1 parent a1a1219 commit 17d9d5e
Show file tree
Hide file tree
Showing 10 changed files with 132 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,9 @@ public void expireSnapshots(TableRuntime tableRuntime) {
if (!expireSnapshotEnabled(tableRuntime)) {
return;
}
expireSnapshots(mustOlderThan(tableRuntime, System.currentTimeMillis()));
expireSnapshots(
mustOlderThan(tableRuntime, System.currentTimeMillis()),
tableRuntime.getTableConfiguration().getSnapshotMinCount());
}

protected boolean expireSnapshotEnabled(TableRuntime tableRuntime) {
Expand All @@ -176,18 +178,22 @@ protected boolean expireSnapshotEnabled(TableRuntime tableRuntime) {
}

@VisibleForTesting
void expireSnapshots(long mustOlderThan) {
expireSnapshots(mustOlderThan, expireSnapshotNeedToExcludeFiles());
void expireSnapshots(long mustOlderThan, int minCount) {
expireSnapshots(mustOlderThan, minCount, expireSnapshotNeedToExcludeFiles());
}

private void expireSnapshots(long olderThan, Set<String> exclude) {
LOG.debug("start expire snapshots older than {}, the exclude is {}", olderThan, exclude);
private void expireSnapshots(long olderThan, int minCount, Set<String> exclude) {
LOG.debug(
"start expire snapshots older than {} and retain last {} snapshots, the exclude is {}",
olderThan,
minCount,
exclude);
final AtomicInteger toDeleteFiles = new AtomicInteger(0);
Set<String> parentDirectories = new HashSet<>();
Set<String> expiredFiles = new HashSet<>();
table
.expireSnapshots()
.retainLast(1)
.retainLast(Math.max(minCount, 1))
.expireOlderThan(olderThan)
.deleteWith(
file -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,11 +109,11 @@ public void expireSnapshots(TableRuntime tableRuntime) {
}

@VisibleForTesting
protected void expireSnapshots(long mustOlderThan) {
protected void expireSnapshots(long mustOlderThan, int minCount) {
if (changeMaintainer != null) {
changeMaintainer.expireSnapshots(mustOlderThan);
changeMaintainer.expireSnapshots(mustOlderThan, minCount);
}
baseMaintainer.expireSnapshots(mustOlderThan);
baseMaintainer.expireSnapshots(mustOlderThan, minCount);
}

@Override
Expand Down Expand Up @@ -291,9 +291,9 @@ public ChangeTableMaintainer(UnkeyedTable unkeyedTable) {

@Override
@VisibleForTesting
void expireSnapshots(long mustOlderThan) {
void expireSnapshots(long mustOlderThan, int minCount) {
expireFiles(mustOlderThan);
super.expireSnapshots(mustOlderThan);
super.expireSnapshots(mustOlderThan, minCount);
}

@Override
Expand All @@ -303,7 +303,9 @@ public void expireSnapshots(TableRuntime tableRuntime) {
}
long now = System.currentTimeMillis();
expireFiles(now - snapshotsKeepTime(tableRuntime));
expireSnapshots(mustOlderThan(tableRuntime, now));
expireSnapshots(
mustOlderThan(tableRuntime, now),
tableRuntime.getTableConfiguration().getSnapshotMinCount());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.temporal.ChronoUnit;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
Expand All @@ -61,10 +62,19 @@ public static TableConfiguration parseTableConfig(Map<String, String> properties
TableProperties.ENABLE_TABLE_EXPIRE,
TableProperties.ENABLE_TABLE_EXPIRE_DEFAULT))
.setSnapshotTTLMinutes(
CompatiblePropertyUtil.propertyAsLong(
ConfigHelpers.TimeUtils.parseDuration(
CompatiblePropertyUtil.propertyAsString(
properties,
TableProperties.SNAPSHOT_KEEP_DURATION,
TableProperties.SNAPSHOT_KEEP_DURATION_DEFAULT),
ChronoUnit.MINUTES)
.getSeconds()
/ 60)
.setSnapshotMinCount(
CompatiblePropertyUtil.propertyAsInt(
properties,
TableProperties.BASE_SNAPSHOT_KEEP_MINUTES,
TableProperties.BASE_SNAPSHOT_KEEP_MINUTES_DEFAULT))
TableProperties.SNAPSHOT_MIN_COUNT,
TableProperties.SNAPSHOT_MIN_COUNT_DEFAULT))
.setChangeDataTTLMinutes(
CompatiblePropertyUtil.propertyAsLong(
properties,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ public void testExpireChangeTableFiles() {
// In order to advance the snapshot
insertChangeDataFiles(testKeyedTable, 2);

tableMaintainer.getChangeMaintainer().expireSnapshots(System.currentTimeMillis());
tableMaintainer.getChangeMaintainer().expireSnapshots(System.currentTimeMillis(), 1);

Assert.assertEquals(1, Iterables.size(testKeyedTable.changeTable().snapshots()));
s1Files.forEach(
Expand Down Expand Up @@ -333,7 +333,7 @@ public void testExpireTableFiles4All() {

List<DataFile> newDataFiles = writeAndCommitBaseStore(table);
Assert.assertEquals(3, Iterables.size(table.snapshots()));
new MixedTableMaintainer(table).expireSnapshots(System.currentTimeMillis());
new MixedTableMaintainer(table).expireSnapshots(System.currentTimeMillis(), 1);
Assert.assertEquals(1, Iterables.size(table.snapshots()));

dataFiles.forEach(file -> Assert.assertFalse(table.io().exists(file.path().toString())));
Expand Down Expand Up @@ -383,7 +383,7 @@ public void testExpireTableFilesRepeatedly() {

MixedTableMaintainer tableMaintainer = new MixedTableMaintainer(testKeyedTable);
tableMaintainer.getChangeMaintainer().expireFiles(secondCommitTime + 1);
tableMaintainer.getChangeMaintainer().expireSnapshots(secondCommitTime + 1);
tableMaintainer.getChangeMaintainer().expireSnapshots(secondCommitTime + 1, 1);

Set<CharSequence> dataFiles = getDataFiles(testKeyedTable);
Assert.assertEquals(last4File, dataFiles);
Expand Down Expand Up @@ -449,7 +449,7 @@ public void testExpireStatisticsFiles() {
Assert.assertTrue(baseTable.io().exists(file1.path()));
Assert.assertTrue(baseTable.io().exists(file2.path()));
Assert.assertTrue(baseTable.io().exists(file3.path()));
new MixedTableMaintainer(testKeyedTable).expireSnapshots(expireTime);
new MixedTableMaintainer(testKeyedTable).expireSnapshots(expireTime, 1);

Assert.assertEquals(1, Iterables.size(baseTable.snapshots()));
Assert.assertFalse(baseTable.io().exists(file1.path()));
Expand Down Expand Up @@ -525,6 +525,62 @@ public void testBaseTableGcDisabled() {
Assert.assertEquals(1, Iterables.size(testUnkeyedTable.snapshots()));
}

@Test
public void testRetainMinSnapshot() {
UnkeyedTable table =
isKeyedTable()
? getMixedTable().asKeyedTable().baseTable()
: getMixedTable().asUnkeyedTable();
table.newAppend().commit();
table.newAppend().commit();
List<Snapshot> expectedSnapshots = new ArrayList<>();
expectedSnapshots.add(table.currentSnapshot());

table.updateProperties().set(TableProperties.SNAPSHOT_KEEP_DURATION, "0s").commit();
table.updateProperties().set(TableProperties.SNAPSHOT_MIN_COUNT, "3").commit();

TableRuntime tableRuntime = Mockito.mock(TableRuntime.class);
Mockito.when(tableRuntime.getTableIdentifier())
.thenReturn(
ServerTableIdentifier.of(AmsUtil.toTableIdentifier(table.id()), getTestFormat()));
Mockito.when(tableRuntime.getTableConfiguration())
.thenReturn(TableConfigurations.parseTableConfig(table.properties()));
Mockito.when(tableRuntime.getOptimizingStatus()).thenReturn(OptimizingStatus.IDLE);

new MixedTableMaintainer(table).expireSnapshots(tableRuntime);
Assert.assertEquals(2, Iterables.size(table.snapshots()));

table.newAppend().commit();
expectedSnapshots.add(table.currentSnapshot());
table.newAppend().commit();
expectedSnapshots.add(table.currentSnapshot());

new MixedTableMaintainer(table).expireSnapshots(tableRuntime);
Assert.assertEquals(3, Iterables.size(table.snapshots()));
Assert.assertTrue(
Iterators.elementsEqual(expectedSnapshots.iterator(), table.snapshots().iterator()));
}

@Test
public void testSnapshotExpireConfig() {
UnkeyedTable table =
isKeyedTable()
? getMixedTable().asKeyedTable().baseTable()
: getMixedTable().asUnkeyedTable();
table.updateProperties().set(TableProperties.SNAPSHOT_KEEP_DURATION, "180s").commit();
Assert.assertEquals(
3L, TableConfigurations.parseTableConfig(table.properties()).getSnapshotTTLMinutes());

// using default time unit: minutes
table.updateProperties().set(TableProperties.SNAPSHOT_KEEP_DURATION, "720").commit();
Assert.assertEquals(
720L, TableConfigurations.parseTableConfig(table.properties()).getSnapshotTTLMinutes());

table.updateProperties().set(TableProperties.SNAPSHOT_MIN_COUNT, "10").commit();
Assert.assertEquals(
10, TableConfigurations.parseTableConfig(table.properties()).getSnapshotMinCount());
}

private long waitUntilAfter(long timestampMillis) {
long current = System.currentTimeMillis();
while (current <= timestampMillis) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ public void testExpireTableFiles() {
? getMixedTable().asKeyedTable().baseTable()
: getMixedTable().asUnkeyedTable();
MixedTableMaintainer mixedTableMaintainer = new MixedTableMaintainer(getMixedTable());
mixedTableMaintainer.getBaseMaintainer().expireSnapshots(System.currentTimeMillis());
mixedTableMaintainer.getBaseMaintainer().expireSnapshots(System.currentTimeMillis(), 1);
Assert.assertEquals(1, Iterables.size(unkeyedTable.snapshots()));

hiveFiles.forEach(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,10 @@ public static class TimeUtils {
private static final Map<String, ChronoUnit> LABEL_TO_UNIT_MAP =
Collections.unmodifiableMap(initMap());

public static Duration parseDuration(String text) {
return parseDuration(text, ChronoUnit.MILLIS);
}

/**
* Parse the given string to a java {@link Duration}. The string is in format "{length
* value}{time unit label}", e.g. "123ms", "321 s". If no time unit label is specified, it will
Expand All @@ -318,7 +322,7 @@ public static class TimeUtils {
*
* @param text string to parse.
*/
public static Duration parseDuration(String text) {
public static Duration parseDuration(String text, ChronoUnit defaultUnit) {
checkNotNull(text);

final String trimmed = text.trim();
Expand Down Expand Up @@ -350,7 +354,7 @@ public static Duration parseDuration(String text) {
}

if (unitLabel.isEmpty()) {
return Duration.of(value, ChronoUnit.MILLIS);
return Duration.of(value, defaultUnit);
}

ChronoUnit unit = LABEL_TO_UNIT_MAP.get(unitLabel);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
public class TableConfiguration {
private boolean expireSnapshotEnabled;
private long snapshotTTLMinutes;
private int snapshotMinCount;
private long changeDataTTLMinutes;
private boolean cleanOrphanEnabled;
private long orphanExistingMinutes;
Expand All @@ -45,6 +46,10 @@ public long getSnapshotTTLMinutes() {
return snapshotTTLMinutes;
}

public int getSnapshotMinCount() {
return snapshotMinCount;
}

public long getChangeDataTTLMinutes() {
return changeDataTTLMinutes;
}
Expand Down Expand Up @@ -76,6 +81,11 @@ public TableConfiguration setSnapshotTTLMinutes(long snapshotTTLMinutes) {
return this;
}

public TableConfiguration setSnapshotMinCount(int snapshotMinCount) {
this.snapshotMinCount = snapshotMinCount;
return this;
}

public TableConfiguration setChangeDataTTLMinutes(long changeDataTTLMinutes) {
this.changeDataTTLMinutes = changeDataTTLMinutes;
return this;
Expand Down Expand Up @@ -130,6 +140,7 @@ public boolean equals(Object o) {
TableConfiguration that = (TableConfiguration) o;
return expireSnapshotEnabled == that.expireSnapshotEnabled
&& snapshotTTLMinutes == that.snapshotTTLMinutes
&& snapshotMinCount == that.snapshotMinCount
&& changeDataTTLMinutes == that.changeDataTTLMinutes
&& cleanOrphanEnabled == that.cleanOrphanEnabled
&& orphanExistingMinutes == that.orphanExistingMinutes
Expand All @@ -144,6 +155,7 @@ public int hashCode() {
return Objects.hashCode(
expireSnapshotEnabled,
snapshotTTLMinutes,
snapshotMinCount,
changeDataTTLMinutes,
cleanOrphanEnabled,
orphanExistingMinutes,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,22 @@ private TableProperties() {}
public static final String CHANGE_DATA_TTL = "change.data.ttl.minutes";
public static final long CHANGE_DATA_TTL_DEFAULT = 10080; // 7 Days

public static final String BASE_SNAPSHOT_KEEP_MINUTES = "snapshot.base.keep.minutes";
public static final long BASE_SNAPSHOT_KEEP_MINUTES_DEFAULT = 720; // 12 Hours
/**
* @deprecated Use {@link TableProperties#SNAPSHOT_KEEP_DURATION } instead; will be removed in
* 0.9.0
*/
@Deprecated public static final String BASE_SNAPSHOT_KEEP_MINUTES = "snapshot.base.keep.minutes";
/**
* @deprecated Use {@link TableProperties#SNAPSHOT_KEEP_DURATION_DEFAULT } instead; will be
* removed in 0.9.0
*/
@Deprecated public static final long BASE_SNAPSHOT_KEEP_MINUTES_DEFAULT = 720; // 12 Hours

public static final String SNAPSHOT_KEEP_DURATION = "snapshot.keep.duration";
public static final String SNAPSHOT_KEEP_DURATION_DEFAULT = "720min"; // 12 Hours

public static final String SNAPSHOT_MIN_COUNT = "snapshot.keep.min-count";
public static final int SNAPSHOT_MIN_COUNT_DEFAULT = 1;

public static final String ENABLE_ORPHAN_CLEAN = "clean-orphan-file.enabled";
public static final boolean ENABLE_ORPHAN_CLEAN_DEFAULT = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ private static String getLegacyProperty(String property) {
return TableProperties.ENABLE_ORPHAN_CLEAN_LEGACY;
case TableProperties.ENABLE_LOG_STORE:
return TableProperties.ENABLE_LOG_STORE_LEGACY;
case TableProperties.SNAPSHOT_KEEP_DURATION:
return TableProperties.BASE_SNAPSHOT_KEEP_MINUTES;
default:
return null;
}
Expand Down
3 changes: 2 additions & 1 deletion docs/user-guides/configurations.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ Data-cleaning configurations are applicable to both Iceberg Format and Mixed str
|---------------------------------------------|------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| table-expire.enabled | true | Enables periodically expire table |
| change.data.ttl.minutes | 10080(7 days) | Time to live in minutes for data of ChangeStore |
| snapshot.base.keep.minutes | 720(12 hours) | Table-Expiration keeps the latest snapshots of BaseStore within a specified time in minutes |
| snapshot.keep.duration | 720min(12 hours) | Table-Expiration keeps the latest snapshots within a specified duration |
| snapshot.keep.min-count | 1 | Minimum number of snapshots retained for table expiration |
| clean-orphan-file.enabled | false | Enables periodically clean orphan files |
| clean-orphan-file.min-existing-time-minutes | 2880(2 days) | Cleaning orphan files keeps the files modified within a specified time in minutes |
| clean-dangling-delete-files.enabled | true | Whether to enable cleaning of dangling delete files |
Expand Down

0 comments on commit 17d9d5e

Please sign in to comment.