Revert "[UNIFORM] Disable cleanup of files in expire snapshots via AP…
Browse files Browse the repository at this point in the history
…I" (#3692)

#### Which Delta project/connector is this regarding?
Uniform

## Description
This reverts commit b51b5b4.

Disabling the cleanExpiredFiles API technically prevents removal of
manifests and manifest lists, and users may not be running orphan file
removal, so for those users manifests/manifest lists may never be
cleaned up. For now we can revert this patch to preserve the original
behavior of preventing only data file removal, so that storage can be
reclaimed via manifest/manifest list cleanup during Uniform commits.
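
For reference, the behavioral difference comes down to Iceberg's `ExpireSnapshots` API. Below is a minimal sketch of the two modes, assuming a plain (unshaded) Iceberg `Transaction` handle named `txn`; names and wiring are illustrative, not the connector's exact code:

```scala
import org.apache.iceberg.Transaction

// Restored behavior (this revert): default cleanup. Expired manifests
// and manifest lists become deletable; data files would be too, except
// the bundled iceberg-core patch (see below) no-ops data file deletion.
def expireWithFileCleanup(txn: Transaction): Unit =
  txn.expireSnapshots().commit()

// Behavior being reverted (from b51b5b4): snapshot metadata expires,
// but no files are deleted at all, so manifests and manifest lists
// linger unless a separate orphan-file removal job cleans them up.
def expireWithoutFileCleanup(txn: Transaction): Unit =
  txn.expireSnapshots().cleanExpiredFiles(false).commit()
```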

## How was this patch tested?


## Does this PR introduce _any_ user-facing changes?

This reverts to cleaning up unreachable manifests/manifest lists during
background commits; it is not really a direct user-facing change.
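
One way to observe the effect from Spark is Iceberg's `snapshots` metadata table, the same query the removed test below uses; a sketch assuming an Iceberg-enabled session `spark` and a Uniform table registered as `default.tbl` (hypothetical name):

```scala
// Each row is a live snapshot; once expiration runs during a Uniform
// commit, rows for expired snapshots disappear and their manifest
// lists are deleted from storage.
val manifestLists = spark
  .sql("SELECT manifest_list FROM default.tbl.snapshots")
  .collect()
  .map(_.getString(0))
// The Delta data files referenced by those manifests are never
// deleted by this path; only manifests/manifest lists are cleaned up.
```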
amogh-jahagirdar committed Sep 20, 2024
1 parent 2547e91 commit 413a5cb
Showing 3 changed files with 181 additions and 71 deletions.
@@ -255,7 +255,7 @@ class IcebergConversionTransaction(
   }
 
   def getExpireSnapshotHelper(): ExpireSnapshotHelper = {
-    val ret = new ExpireSnapshotHelper(txn.expireSnapshots().cleanExpiredFiles(false))
+    val ret = new ExpireSnapshotHelper(txn.expireSnapshots())
     fileUpdates += ret
     ret
   }
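For orientation, the surrounding class buffers pending Iceberg metadata updates and commits them together when the conversion transaction commits. A rough sketch of that pattern with simplified, hypothetical types (the real `ExpireSnapshotHelper` lives in `IcebergConversionTransaction`):

```scala
import scala.collection.mutable.ArrayBuffer
import org.apache.iceberg.{ExpireSnapshots, Transaction}

// Hypothetical, simplified shape of the buffered-update pattern.
class ExpireSnapshotHelperSketch(expire: ExpireSnapshots) {
  def commit(): Unit = expire.commit() // applies the update to the txn
}

class ConversionSketch(txn: Transaction) {
  private val fileUpdates = ArrayBuffer.empty[ExpireSnapshotHelperSketch]

  def getExpireSnapshotHelper(): ExpireSnapshotHelperSketch = {
    // After this revert there is no cleanExpiredFiles(false), so
    // expiration also removes unreachable manifests/manifest lists.
    val ret = new ExpireSnapshotHelperSketch(txn.expireSnapshots())
    fileUpdates += ret
    ret
  }

  def commit(): Unit = {
    fileUpdates.foreach(_.commit())
    txn.commitTransaction() // all buffered updates land atomically
  }
}
```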
@@ -25,11 +25,9 @@ import org.scalatest.time.SpanSugar._
 import org.apache.spark.SparkContext
 import org.apache.spark.sql.{QueryTest, Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType}
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, CatalogStorageFormat}
 import org.apache.spark.sql.delta.actions.Metadata
-import org.apache.spark.sql.delta.icebergShaded.IcebergTransactionUtils
-import org.apache.spark.sql.delta.sources.DeltaSQLConf
-import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
+import org.apache.spark.sql.types.{IntegerType, StringType, StructType, StructField}
 import org.apache.spark.util.Utils
 
 /**
@@ -111,7 +109,6 @@ class ConvertToIcebergSuite extends QueryTest with Eventually {
     runDeltaSql(
       s"""CREATE TABLE `${testTableName}` (col1 INT) USING DELTA
          |TBLPROPERTIES (
-         | 'delta.enableIcebergCompatV2' = 'true',
          | 'delta.columnMapping.mode' = 'name',
          | 'delta.universalFormat.enabledFormats' = 'iceberg'
          |)""".stripMargin)
@@ -126,85 +123,21 @@ class ConvertToIcebergSuite extends QueryTest with Eventually {
       withDefaultTablePropsInSQLConf {
         deltaSpark.range(10).write.format("delta")
           .option("path", testTablePath)
-          .option("delta.enableIcebergCompatV2", "true")
           .saveAsTable(testTableName)
       }
     }
     withDeltaSparkSession { deltaSpark =>
       deltaSpark.range(10, 20, 1)
         .write.format("delta").mode("append")
         .option("path", testTablePath)
-        .option("delta.enableIcebergCompatV2", "true")
         .saveAsTable(testTableName)
     }
     verifyReadWithIceberg(testTableName, 0 to 19 map (Row(_)))
   }
 }
 
-  test("Expire Snapshots") {
-    if (hmsReady(PORT)) {
-      runDeltaSql(
-        s"""CREATE TABLE `${testTableName}` (col1 INT) USING DELTA
-           |TBLPROPERTIES (
-           | 'delta.enableIcebergCompatV2' = 'true',
-           | 'delta.columnMapping.mode' = 'name',
-           | 'delta.universalFormat.enabledFormats' = 'iceberg'
-           |)""".stripMargin)
-
-      val icebergTable = loadIcebergTable()
-      icebergTable.updateProperties().set("history.expire.max-snapshot-age-ms", "1").commit()
-
-      for (i <- 0 to 7) {
-        runDeltaSql(s"INSERT INTO ${testTableName} VALUES (${i})",
-          DeltaSQLConf.DELTA_UNIFORM_ICEBERG_SYNC_CONVERT_ENABLED.key -> "true")
-      }
-
-      // Sleep past snapshot retention duration
-      Thread.sleep(5)
-      withIcebergSparkSession { icebergSpark => {
-        icebergSpark.sql(s"REFRESH TABLE $testTableName")
-        val manifestListsBeforeExpiration = icebergSpark
-          .sql(s"SELECT * FROM default.${testTableName}.snapshots")
-          .select("manifest_list")
-          .collect()
-
-        assert(manifestListsBeforeExpiration.length == 8)
-
-        // Trigger snapshot expiration
-        runDeltaSql(s"OPTIMIZE ${testTableName}")
-        icebergSpark.sql(s"REFRESH TABLE $testTableName")
-
-        val manifestListsAfterExpiration = icebergSpark
-          .sql(s"SELECT * FROM default.${testTableName}.snapshots")
-          .select("manifest_list")
-          .collect()
-
-        assert(manifestListsAfterExpiration.length == 1)
-        // Manifests from earlier snapshots should not be removed
-        manifestListsBeforeExpiration.toStream.foreach(
-          manifestList => assert(
-            icebergTable.io().newInputFile(manifestList.get(0).asInstanceOf[String]).exists()))
-      }}
-    }
-  }
-
-  private def loadIcebergTable(): shadedForDelta.org.apache.iceberg.Table = {
-    withDeltaSparkSession { deltaSpark => {
-      val log = DeltaLog.forTable(deltaSpark, testTablePath)
-      val hiveCatalog = IcebergTransactionUtils.createHiveCatalog(
-        log.newDeltaHadoopConf()
-      )
-      val table = hiveCatalog.loadTable(
-        shadedForDelta.org.apache.iceberg.catalog.TableIdentifier
-          .of("default", testTableName)
-      )
-      table
-    }}
-  }
-
-  def runDeltaSql(sqlStr: String, conf: (String, String)*): Unit = {
+  def runDeltaSql(sqlStr: String): Unit = {
     withDeltaSparkSession { deltaSpark =>
-      conf.foreach(c => deltaSpark.conf.set(c._1, c._2))
       deltaSpark.sql(sqlStr)
     }
   }
@@ -0,0 +1,177 @@
iceberg core must NOT delete any delta data files

---
.../iceberg/IncrementalFileCleanup.java | 8 +--
.../apache/iceberg/ReachableFileCleanup.java | 5 +-
.../apache/iceberg/TestRemoveSnapshots.java | 57 +++++++++++--------
3 files changed, 40 insertions(+), 30 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/IncrementalFileCleanup.java b/connector/iceberg-core/core/src/main/java/org/apache/iceberg/IncrementalFileCleanup.java
index d894dcbf36d..ead7ea6b076 100644
--- a/core/src/main/java/org/apache/iceberg/IncrementalFileCleanup.java
+++ b/core/src/main/java/org/apache/iceberg/IncrementalFileCleanup.java
@@ -256,10 +256,10 @@ class IncrementalFileCleanup extends FileCleanupStrategy {
}
});

- Set<String> filesToDelete =
- findFilesToDelete(manifestsToScan, manifestsToRevert, validIds, afterExpiration);
-
- deleteFiles(filesToDelete, "data");
+ // iceberg core MUST NOT delete any data files which are managed by delta
+ // Set<String> filesToDelete =
+ // findFilesToDelete(manifestsToScan, manifestsToRevert, validIds, afterExpiration);
+ // deleteFiles(filesToDelete, "data");
LOG.warn("Manifests to delete: {}", Joiner.on(", ").join(manifestsToDelete));
LOG.warn("Manifests Lists to delete: {}", Joiner.on(", ").join(manifestListsToDelete));
deleteFiles(manifestsToDelete, "manifest");
diff --git a/core/src/main/java/org/apache/iceberg/ReachableFileCleanup.java b/connector/iceberg-core/core/src/main/java/org/apache/iceberg/ReachableFileCleanup.java
index ccbee78e27b..da888a63b3d 100644
--- a/core/src/main/java/org/apache/iceberg/ReachableFileCleanup.java
+++ b/core/src/main/java/org/apache/iceberg/ReachableFileCleanup.java
@@ -72,8 +72,9 @@ class ReachableFileCleanup extends FileCleanupStrategy {
snapshotsAfterExpiration, deletionCandidates, currentManifests::add);

if (!manifestsToDelete.isEmpty()) {
- Set<String> dataFilesToDelete = findFilesToDelete(manifestsToDelete, currentManifests);
- deleteFiles(dataFilesToDelete, "data");
+ // iceberg core MUST NOT delete any data files which are managed by delta
+ // Set<String> dataFilesToDelete = findFilesToDelete(manifestsToDelete, currentManifests);
+ // deleteFiles(dataFilesToDelete, "data");
Set<String> manifestPathsToDelete =
manifestsToDelete.stream().map(ManifestFile::path).collect(Collectors.toSet());
deleteFiles(manifestPathsToDelete, "manifest");
diff --git a/core/src/test/java/org/apache/iceberg/TestRemoveSnapshots.java b/connector/iceberg-core/core/src/test/java/org/apache/iceberg/TestRemoveSnapshots.java
index 53e5af520d9..95fa8e41de1 100644
--- a/core/src/test/java/org/apache/iceberg/TestRemoveSnapshots.java
+++ b/core/src/test/java/org/apache/iceberg/TestRemoveSnapshots.java
@@ -147,8 +147,9 @@ public class TestRemoveSnapshots extends TableTestBase {
secondSnapshot
.allManifests(table.io())
.get(0)
- .path(), // manifest contained only deletes, was dropped
- FILE_A.path()), // deleted
+ .path() // manifest contained only deletes, was dropped
+ // FILE_A.path() should NOT delete data files
+ ), // deleted
deletedFiles);
}

@@ -209,8 +210,9 @@ public class TestRemoveSnapshots extends TableTestBase {
.allManifests(table.io())
.get(0)
.path(), // manifest was rewritten for delete
- secondSnapshot.manifestListLocation(), // snapshot expired
- FILE_A.path()), // deleted
+ secondSnapshot.manifestListLocation() // snapshot expired
+ // FILE_A.path() should not delete any data files
+ ),
deletedFiles);
}

@@ -309,8 +311,9 @@ public class TestRemoveSnapshots extends TableTestBase {
Sets.newHashSet(
secondSnapshot.manifestListLocation(), // snapshot expired
Iterables.getOnlyElement(secondSnapshotManifests)
- .path(), // manifest is no longer referenced
- FILE_B.path()), // added, but rolled back
+ .path() // manifest is no longer referenced
+ // FILE_B.path() should not delete any data files
+ ),
deletedFiles);
}

@@ -686,7 +689,8 @@ public class TestRemoveSnapshots extends TableTestBase {

removeSnapshots(table).expireOlderThan(t3).deleteWith(deletedFiles::add).commit();

- Assert.assertTrue("FILE_A should be deleted", deletedFiles.contains(FILE_A.path().toString()));
+ Assert.assertTrue("FILE_A should NOT be deleted",
+ !deletedFiles.contains(FILE_A.path().toString()));
}

@Test
@@ -712,7 +716,8 @@ public class TestRemoveSnapshots extends TableTestBase {

removeSnapshots(table).expireOlderThan(t3).deleteWith(deletedFiles::add).commit();

- Assert.assertTrue("FILE_A should be deleted", deletedFiles.contains(FILE_A.path().toString()));
+ Assert.assertTrue("FILE_A should NOT be deleted",
+ !deletedFiles.contains(FILE_A.path().toString()));
}

@Test
@@ -749,8 +754,10 @@ public class TestRemoveSnapshots extends TableTestBase {

removeSnapshots(table).expireOlderThan(t4).deleteWith(deletedFiles::add).commit();

- Assert.assertTrue("FILE_A should be deleted", deletedFiles.contains(FILE_A.path().toString()));
- Assert.assertTrue("FILE_B should be deleted", deletedFiles.contains(FILE_B.path().toString()));
+ Assert.assertTrue("FILE_A should NOT be deleted",
+ !deletedFiles.contains(FILE_A.path().toString()));
+ Assert.assertTrue("FILE_B should NOT be deleted",
+ !deletedFiles.contains(FILE_B.path().toString()));
}

@Test
@@ -824,9 +831,11 @@ public class TestRemoveSnapshots extends TableTestBase {
Sets.newHashSet(
"remove-snapshot-0", "remove-snapshot-1", "remove-snapshot-2", "remove-snapshot-3"));

- Assert.assertTrue("FILE_A should be deleted", deletedFiles.contains(FILE_A.path().toString()));
- Assert.assertTrue("FILE_B should be deleted", deletedFiles.contains(FILE_B.path().toString()));
- Assert.assertTrue("Thread should be created in provided pool", planThreadsIndex.get() > 0);
+ Assert.assertTrue("FILE_A should NOT be deleted",
+ !deletedFiles.contains(FILE_A.path().toString()));
+ Assert.assertTrue("FILE_B should NOT be deleted",
+ !deletedFiles.contains(FILE_B.path().toString()));
+ // Assert.assertTrue("Thread should be created in provided pool", planThreadsIndex.get() > 0);
}

@Test
@@ -885,13 +894,13 @@ public class TestRemoveSnapshots extends TableTestBase {
Set<String> expectedDeletes = Sets.newHashSet();
expectedDeletes.add(snapshotA.manifestListLocation());

- // Files should be deleted of dangling staged snapshot
- snapshotB
- .addedDataFiles(table.io())
- .forEach(
- i -> {
- expectedDeletes.add(i.path().toString());
- });
+ // Files should NOT be deleted of dangling staged snapshot
+ // snapshotB
+ // .addedDataFiles(table.io())
+ // .forEach(
+ // i -> {
+ // expectedDeletes.add(i.path().toString());
+ // });

// ManifestList should be deleted too
expectedDeletes.add(snapshotB.manifestListLocation());
@@ -1144,10 +1153,10 @@ public class TestRemoveSnapshots extends TableTestBase {
removeSnapshots(table).expireOlderThan(fourthSnapshotTs).deleteWith(deletedFiles::add).commit();

Assert.assertEquals(
- "Should remove old delete files and delete file manifests",
+ "Should only delete file manifests",
ImmutableSet.builder()
- .add(FILE_A.path())
- .add(FILE_A_DELETES.path())
+ // .add(FILE_A.path())
+ // .add(FILE_A_DELETES.path())
.add(firstSnapshot.manifestListLocation())
.add(secondSnapshot.manifestListLocation())
.add(thirdSnapshot.manifestListLocation())
@@ -1501,7 +1510,7 @@ public class TestRemoveSnapshots extends TableTestBase {
expectedDeletes.addAll(manifestPaths(appendA, table.io()));
expectedDeletes.add(branchDelete.manifestListLocation());
expectedDeletes.addAll(manifestPaths(branchDelete, table.io()));
- expectedDeletes.add(FILE_A.path().toString());
+ // expectedDeletes.add(FILE_A.path().toString());

Assert.assertEquals(2, Iterables.size(table.snapshots()));
Assert.assertEquals(expectedDeletes, deletedFiles);
--
2.39.2 (Apple Git-143)
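
To make the patched guarantee concrete: with this iceberg-core build, paths handed to the `deleteWith` callback during expiration should only ever be manifests and manifest lists. A test-style sketch, assuming an Iceberg `table` handle and parquet data files (names hypothetical):

```scala
import scala.collection.mutable

val deleted = mutable.Set.empty[String]

table
  .expireSnapshots()
  .expireOlderThan(System.currentTimeMillis())
  .deleteWith(path => { deleted += path; () })
  .commit()

// Manifests and manifest lists (.avro) may appear in `deleted`, but
// Delta-managed data files must not.
assert(!deleted.exists(_.endsWith(".parquet")),
  "iceberg core must NOT delete any delta data files")
```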
