Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "[UNIFORM] Disable cleanup of files in expire snapshots via API" #3692

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ class IcebergConversionTransaction(
}

def getExpireSnapshotHelper(): ExpireSnapshotHelper = {
val ret = new ExpireSnapshotHelper(txn.expireSnapshots().cleanExpiredFiles(false))
val ret = new ExpireSnapshotHelper(txn.expireSnapshots())
fileUpdates += ret
ret
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,9 @@ import org.scalatest.time.SpanSugar._
import org.apache.spark.SparkContext
import org.apache.spark.sql.{QueryTest, Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType}
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, CatalogStorageFormat}
import org.apache.spark.sql.delta.actions.Metadata
import org.apache.spark.sql.delta.icebergShaded.IcebergTransactionUtils
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.types.{IntegerType, StringType, StructType, StructField}
import org.apache.spark.util.Utils

/**
Expand Down Expand Up @@ -111,7 +109,6 @@ class ConvertToIcebergSuite extends QueryTest with Eventually {
runDeltaSql(
s"""CREATE TABLE `${testTableName}` (col1 INT) USING DELTA
|TBLPROPERTIES (
| 'delta.enableIcebergCompatV2' = 'true',
| 'delta.columnMapping.mode' = 'name',
| 'delta.universalFormat.enabledFormats' = 'iceberg'
|)""".stripMargin)
Expand All @@ -126,85 +123,21 @@ class ConvertToIcebergSuite extends QueryTest with Eventually {
withDefaultTablePropsInSQLConf {
deltaSpark.range(10).write.format("delta")
.option("path", testTablePath)
.option("delta.enableIcebergCompatV2", "true")
.saveAsTable(testTableName)
}
}
withDeltaSparkSession { deltaSpark =>
deltaSpark.range(10, 20, 1)
.write.format("delta").mode("append")
.option("path", testTablePath)
.option("delta.enableIcebergCompatV2", "true")
.saveAsTable(testTableName)
}
verifyReadWithIceberg(testTableName, 0 to 19 map (Row(_)))
}
}

// Creates a UniForm (Iceberg-enabled) Delta table, commits eight single-row inserts with
// synchronous Iceberg conversion, then runs OPTIMIZE to trigger snapshot expiration. Expects
// exactly one Iceberg snapshot to remain while the manifest-list files recorded before the
// expiration are still present on storage.
test("Expire Snapshots") {
  if (hmsReady(PORT)) {
    runDeltaSql(
      s"""CREATE TABLE `${testTableName}` (col1 INT) USING DELTA
         |TBLPROPERTIES (
         | 'delta.enableIcebergCompatV2' = 'true',
         | 'delta.columnMapping.mode' = 'name',
         | 'delta.universalFormat.enabledFormats' = 'iceberg'
         |)""".stripMargin)

    // Make every snapshot eligible for expiration after one millisecond.
    val icebergTable = loadIcebergTable()
    icebergTable.updateProperties().set("history.expire.max-snapshot-age-ms", "1").commit()

    // One insert per value 0..7, each run with synchronous Iceberg conversion enabled.
    (0 to 7).foreach { i =>
      runDeltaSql(s"INSERT INTO ${testTableName} VALUES (${i})",
        DeltaSQLConf.DELTA_UNIFORM_ICEBERG_SYNC_CONVERT_ENABLED.key -> "true")
    }

    // Wait until the inserted snapshots are older than the 1 ms retention configured above.
    Thread.sleep(5)
    withIcebergSparkSession { icebergSpark =>
      // Refreshes the table and returns the `manifest_list` column of its snapshots table.
      def refreshAndCollectManifestLists() = {
        icebergSpark.sql(s"REFRESH TABLE $testTableName")
        icebergSpark
          .sql(s"SELECT * FROM default.${testTableName}.snapshots")
          .select("manifest_list")
          .collect()
      }

      val manifestListsBeforeExpiration = refreshAndCollectManifestLists()
      assert(manifestListsBeforeExpiration.length == 8)

      // OPTIMIZE is the operation this test relies on to trigger snapshot expiration.
      runDeltaSql(s"OPTIMIZE ${testTableName}")

      val manifestListsAfterExpiration = refreshAndCollectManifestLists()
      assert(manifestListsAfterExpiration.length == 1)

      // Manifest-list files of the earlier snapshots must not have been removed.
      manifestListsBeforeExpiration.foreach { row =>
        val manifestListPath = row.getString(0)
        assert(icebergTable.io().newInputFile(manifestListPath).exists())
      }
    }
  }
}

/**
 * Loads the Iceberg table `default.<testTableName>` through the catalog returned by
 * `IcebergTransactionUtils.createHiveCatalog`, configured with the Hadoop conf obtained
 * from the Delta log at `testTablePath`.
 *
 * @return the loaded (shaded) Iceberg table handle
 */
private def loadIcebergTable(): shadedForDelta.org.apache.iceberg.Table = {
  withDeltaSparkSession { deltaSpark =>
    val hadoopConf = DeltaLog.forTable(deltaSpark, testTablePath).newDeltaHadoopConf()
    val catalog = IcebergTransactionUtils.createHiveCatalog(hadoopConf)
    val tableId = shadedForDelta.org.apache.iceberg.catalog.TableIdentifier
      .of("default", testTableName)
    catalog.loadTable(tableId)
  }
}

def runDeltaSql(sqlStr: String, conf: (String, String)*): Unit = {
def runDeltaSql(sqlStr: String): Unit = {
withDeltaSparkSession { deltaSpark =>
conf.foreach(c => deltaSpark.conf.set(c._1, c._2))
deltaSpark.sql(sqlStr)
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
iceberg core must NOT delete any delta data files

---
.../iceberg/IncrementalFileCleanup.java | 8 +--
.../apache/iceberg/ReachableFileCleanup.java | 5 +-
.../apache/iceberg/TestRemoveSnapshots.java | 57 +++++++++++--------
3 files changed, 40 insertions(+), 30 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/IncrementalFileCleanup.java b/connector/iceberg-core/core/src/main/java/org/apache/iceberg/IncrementalFileCleanup.java
index d894dcbf36d..ead7ea6b076 100644
--- a/core/src/main/java/org/apache/iceberg/IncrementalFileCleanup.java
+++ b/core/src/main/java/org/apache/iceberg/IncrementalFileCleanup.java
@@ -256,10 +256,10 @@ class IncrementalFileCleanup extends FileCleanupStrategy {
}
});

- Set<String> filesToDelete =
- findFilesToDelete(manifestsToScan, manifestsToRevert, validIds, afterExpiration);
-
- deleteFiles(filesToDelete, "data");
+ // iceberg core MUST NOT delete any data files which are managed by delta
+ // Set<String> filesToDelete =
+ // findFilesToDelete(manifestsToScan, manifestsToRevert, validIds, afterExpiration);
+ // deleteFiles(filesToDelete, "data");
LOG.warn("Manifests to delete: {}", Joiner.on(", ").join(manifestsToDelete));
LOG.warn("Manifests Lists to delete: {}", Joiner.on(", ").join(manifestListsToDelete));
deleteFiles(manifestsToDelete, "manifest");
diff --git a/core/src/main/java/org/apache/iceberg/ReachableFileCleanup.java b/connector/iceberg-core/core/src/main/java/org/apache/iceberg/ReachableFileCleanup.java
index ccbee78e27b..da888a63b3d 100644
--- a/core/src/main/java/org/apache/iceberg/ReachableFileCleanup.java
+++ b/core/src/main/java/org/apache/iceberg/ReachableFileCleanup.java
@@ -72,8 +72,9 @@ class ReachableFileCleanup extends FileCleanupStrategy {
snapshotsAfterExpiration, deletionCandidates, currentManifests::add);

if (!manifestsToDelete.isEmpty()) {
- Set<String> dataFilesToDelete = findFilesToDelete(manifestsToDelete, currentManifests);
- deleteFiles(dataFilesToDelete, "data");
+ // iceberg core MUST NOT delete any data files which are managed by delta
+ // Set<String> dataFilesToDelete = findFilesToDelete(manifestsToDelete, currentManifests);
+ // deleteFiles(dataFilesToDelete, "data");
Set<String> manifestPathsToDelete =
manifestsToDelete.stream().map(ManifestFile::path).collect(Collectors.toSet());
deleteFiles(manifestPathsToDelete, "manifest");
diff --git a/core/src/test/java/org/apache/iceberg/TestRemoveSnapshots.java b/connector/iceberg-core/core/src/test/java/org/apache/iceberg/TestRemoveSnapshots.java
index 53e5af520d9..95fa8e41de1 100644
--- a/core/src/test/java/org/apache/iceberg/TestRemoveSnapshots.java
+++ b/core/src/test/java/org/apache/iceberg/TestRemoveSnapshots.java
@@ -147,8 +147,9 @@ public class TestRemoveSnapshots extends TableTestBase {
secondSnapshot
.allManifests(table.io())
.get(0)
- .path(), // manifest contained only deletes, was dropped
- FILE_A.path()), // deleted
+ .path() // manifest contained only deletes, was dropped
+ // FILE_A.path() should NOT delete data files
+ ), // deleted
deletedFiles);
}

@@ -209,8 +210,9 @@ public class TestRemoveSnapshots extends TableTestBase {
.allManifests(table.io())
.get(0)
.path(), // manifest was rewritten for delete
- secondSnapshot.manifestListLocation(), // snapshot expired
- FILE_A.path()), // deleted
+ secondSnapshot.manifestListLocation() // snapshot expired
+ // FILE_A.path() should not delete any data files
+ ),
deletedFiles);
}

@@ -309,8 +311,9 @@ public class TestRemoveSnapshots extends TableTestBase {
Sets.newHashSet(
secondSnapshot.manifestListLocation(), // snapshot expired
Iterables.getOnlyElement(secondSnapshotManifests)
- .path(), // manifest is no longer referenced
- FILE_B.path()), // added, but rolled back
+ .path() // manifest is no longer referenced
+ // FILE_B.path() should not delete any data files
+ ),
deletedFiles);
}

@@ -686,7 +689,8 @@ public class TestRemoveSnapshots extends TableTestBase {

removeSnapshots(table).expireOlderThan(t3).deleteWith(deletedFiles::add).commit();

- Assert.assertTrue("FILE_A should be deleted", deletedFiles.contains(FILE_A.path().toString()));
+ Assert.assertTrue("FILE_A should NOT be deleted",
+ !deletedFiles.contains(FILE_A.path().toString()));
}

@Test
@@ -712,7 +716,8 @@ public class TestRemoveSnapshots extends TableTestBase {

removeSnapshots(table).expireOlderThan(t3).deleteWith(deletedFiles::add).commit();

- Assert.assertTrue("FILE_A should be deleted", deletedFiles.contains(FILE_A.path().toString()));
+ Assert.assertTrue("FILE_A should NOT be deleted",
+ !deletedFiles.contains(FILE_A.path().toString()));
}

@Test
@@ -749,8 +754,10 @@ public class TestRemoveSnapshots extends TableTestBase {

removeSnapshots(table).expireOlderThan(t4).deleteWith(deletedFiles::add).commit();

- Assert.assertTrue("FILE_A should be deleted", deletedFiles.contains(FILE_A.path().toString()));
- Assert.assertTrue("FILE_B should be deleted", deletedFiles.contains(FILE_B.path().toString()));
+ Assert.assertTrue("FILE_A should NOT be deleted",
+ !deletedFiles.contains(FILE_A.path().toString()));
+ Assert.assertTrue("FILE_B should NOT be deleted",
+ !deletedFiles.contains(FILE_B.path().toString()));
}

@Test
@@ -824,9 +831,11 @@ public class TestRemoveSnapshots extends TableTestBase {
Sets.newHashSet(
"remove-snapshot-0", "remove-snapshot-1", "remove-snapshot-2", "remove-snapshot-3"));

- Assert.assertTrue("FILE_A should be deleted", deletedFiles.contains(FILE_A.path().toString()));
- Assert.assertTrue("FILE_B should be deleted", deletedFiles.contains(FILE_B.path().toString()));
- Assert.assertTrue("Thread should be created in provided pool", planThreadsIndex.get() > 0);
+ Assert.assertTrue("FILE_A should NOT be deleted",
+ !deletedFiles.contains(FILE_A.path().toString()));
+ Assert.assertTrue("FILE_B should NOT be deleted",
+ !deletedFiles.contains(FILE_B.path().toString()));
+ // Assert.assertTrue("Thread should be created in provided pool", planThreadsIndex.get() > 0);
}

@Test
@@ -885,13 +894,13 @@ public class TestRemoveSnapshots extends TableTestBase {
Set<String> expectedDeletes = Sets.newHashSet();
expectedDeletes.add(snapshotA.manifestListLocation());

- // Files should be deleted of dangling staged snapshot
- snapshotB
- .addedDataFiles(table.io())
- .forEach(
- i -> {
- expectedDeletes.add(i.path().toString());
- });
+ // Files of the dangling staged snapshot should NOT be deleted
+ // snapshotB
+ // .addedDataFiles(table.io())
+ // .forEach(
+ // i -> {
+ // expectedDeletes.add(i.path().toString());
+ // });

// ManifestList should be deleted too
expectedDeletes.add(snapshotB.manifestListLocation());
@@ -1144,10 +1153,10 @@ public class TestRemoveSnapshots extends TableTestBase {
removeSnapshots(table).expireOlderThan(fourthSnapshotTs).deleteWith(deletedFiles::add).commit();

Assert.assertEquals(
- "Should remove old delete files and delete file manifests",
+ "Should only delete file manifests",
ImmutableSet.builder()
- .add(FILE_A.path())
- .add(FILE_A_DELETES.path())
+ // .add(FILE_A.path())
+ // .add(FILE_A_DELETES.path())
.add(firstSnapshot.manifestListLocation())
.add(secondSnapshot.manifestListLocation())
.add(thirdSnapshot.manifestListLocation())
@@ -1501,7 +1510,7 @@ public class TestRemoveSnapshots extends TableTestBase {
expectedDeletes.addAll(manifestPaths(appendA, table.io()));
expectedDeletes.add(branchDelete.manifestListLocation());
expectedDeletes.addAll(manifestPaths(branchDelete, table.io()));
- expectedDeletes.add(FILE_A.path().toString());
+ // expectedDeletes.add(FILE_A.path().toString());

Assert.assertEquals(2, Iterables.size(table.snapshots()));
Assert.assertEquals(expectedDeletes, deletedFiles);
--
2.39.2 (Apple Git-143)
Loading