From 214fd2883fc3efa3bb477c1747c1cbe2258cf5d6 Mon Sep 17 00:00:00 2001 From: Amogh Jahagirdar Date: Thu, 19 Sep 2024 09:07:02 -0600 Subject: [PATCH] Revert "[UNIFORM] Disable cleanup of files in expire snapshots via API" This reverts commit b51b5b45595e46748339e42dbb0792e8b485a234. --- .../IcebergConversionTransaction.scala | 2 +- .../sql/delta/ConvertToIcebergSuite.scala | 73 +------- ...must-not-delete-any-delta-data-files.patch | 177 ++++++++++++++++++ 3 files changed, 181 insertions(+), 71 deletions(-) create mode 100644 icebergShaded/iceberg_src_patches/0002-iceberg-core-must-not-delete-any-delta-data-files.patch diff --git a/iceberg/src/main/scala/org/apache/spark/sql/delta/icebergShaded/IcebergConversionTransaction.scala b/iceberg/src/main/scala/org/apache/spark/sql/delta/icebergShaded/IcebergConversionTransaction.scala index deccf470b18..4c2949c8565 100644 --- a/iceberg/src/main/scala/org/apache/spark/sql/delta/icebergShaded/IcebergConversionTransaction.scala +++ b/iceberg/src/main/scala/org/apache/spark/sql/delta/icebergShaded/IcebergConversionTransaction.scala @@ -255,7 +255,7 @@ class IcebergConversionTransaction( } def getExpireSnapshotHelper(): ExpireSnapshotHelper = { - val ret = new ExpireSnapshotHelper(txn.expireSnapshots().cleanExpiredFiles(false)) + val ret = new ExpireSnapshotHelper(txn.expireSnapshots()) fileUpdates += ret ret } diff --git a/iceberg/src/test/scala/org/apache/spark/sql/delta/ConvertToIcebergSuite.scala b/iceberg/src/test/scala/org/apache/spark/sql/delta/ConvertToIcebergSuite.scala index ad60dcc6061..1f645a30db4 100644 --- a/iceberg/src/test/scala/org/apache/spark/sql/delta/ConvertToIcebergSuite.scala +++ b/iceberg/src/test/scala/org/apache/spark/sql/delta/ConvertToIcebergSuite.scala @@ -25,11 +25,9 @@ import org.scalatest.time.SpanSugar._ import org.apache.spark.SparkContext import org.apache.spark.sql.{QueryTest, Row, SparkSession} import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType} +import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, CatalogStorageFormat} import org.apache.spark.sql.delta.actions.Metadata -import org.apache.spark.sql.delta.icebergShaded.IcebergTransactionUtils -import org.apache.spark.sql.delta.sources.DeltaSQLConf -import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType} +import org.apache.spark.sql.types.{IntegerType, StringType, StructType, StructField} import org.apache.spark.util.Utils /** @@ -111,7 +109,6 @@ class ConvertToIcebergSuite extends QueryTest with Eventually { runDeltaSql( s"""CREATE TABLE `${testTableName}` (col1 INT) USING DELTA |TBLPROPERTIES ( - | 'delta.enableIcebergCompatV2' = 'true', | 'delta.columnMapping.mode' = 'name', | 'delta.universalFormat.enabledFormats' = 'iceberg' |)""".stripMargin) @@ -126,7 +123,6 @@ class ConvertToIcebergSuite extends QueryTest with Eventually { withDefaultTablePropsInSQLConf { deltaSpark.range(10).write.format("delta") .option("path", testTablePath) - .option("delta.enableIcebergCompatV2", "true") .saveAsTable(testTableName) } } @@ -134,77 +130,14 @@ class ConvertToIcebergSuite extends QueryTest with Eventually { deltaSpark.range(10, 20, 1) .write.format("delta").mode("append") .option("path", testTablePath) - .option("delta.enableIcebergCompatV2", "true") .saveAsTable(testTableName) } verifyReadWithIceberg(testTableName, 0 to 19 map (Row(_))) } } - test("Expire Snapshots") { - if (hmsReady(PORT)) { - runDeltaSql( - s"""CREATE TABLE `${testTableName}` (col1 INT) USING DELTA - |TBLPROPERTIES ( - | 'delta.enableIcebergCompatV2' = 'true', - | 'delta.columnMapping.mode' = 'name', - | 'delta.universalFormat.enabledFormats' = 'iceberg' - |)""".stripMargin) - - val icebergTable = loadIcebergTable() - icebergTable.updateProperties().set("history.expire.max-snapshot-age-ms", "1").commit() - - for (i <- 0 to 7) { - runDeltaSql(s"INSERT INTO ${testTableName} VALUES (${i})", - DeltaSQLConf.DELTA_UNIFORM_ICEBERG_SYNC_CONVERT_ENABLED.key -> "true") - } - - // Sleep past snapshot retention duration - Thread.sleep(5) - withIcebergSparkSession { icebergSpark => { - icebergSpark.sql(s"REFRESH TABLE $testTableName") - val manifestListsBeforeExpiration = icebergSpark - .sql(s"SELECT * FROM default.${testTableName}.snapshots") - .select("manifest_list") - .collect() - - assert(manifestListsBeforeExpiration.length == 8) - - // Trigger snapshot expiration - runDeltaSql(s"OPTIMIZE ${testTableName}") - icebergSpark.sql(s"REFRESH TABLE $testTableName") - - val manifestListsAfterExpiration = icebergSpark - .sql(s"SELECT * FROM default.${testTableName}.snapshots") - .select("manifest_list") - .collect() - - assert(manifestListsAfterExpiration.length == 1) - // Manifests from earlier snapshots should not be removed - manifestListsBeforeExpiration.toStream.foreach( - manifestList => assert( - icebergTable.io().newInputFile(manifestList.get(0).asInstanceOf[String]).exists())) - }} - } - } - - private def loadIcebergTable(): shadedForDelta.org.apache.iceberg.Table = { - withDeltaSparkSession { deltaSpark => { - val log = DeltaLog.forTable(deltaSpark, testTablePath) - val hiveCatalog = IcebergTransactionUtils.createHiveCatalog( - log.newDeltaHadoopConf() - ) - val table = hiveCatalog.loadTable( - shadedForDelta.org.apache.iceberg.catalog.TableIdentifier - .of("default", testTableName) - ) - table - }} - } - - def runDeltaSql(sqlStr: String, conf: (String, String)*): Unit = { + def runDeltaSql(sqlStr: String): Unit = { withDeltaSparkSession { deltaSpark => - conf.foreach(c => deltaSpark.conf.set(c._1, c._2)) deltaSpark.sql(sqlStr) } } diff --git a/icebergShaded/iceberg_src_patches/0002-iceberg-core-must-not-delete-any-delta-data-files.patch b/icebergShaded/iceberg_src_patches/0002-iceberg-core-must-not-delete-any-delta-data-files.patch new file mode 100644 index 00000000000..a181f065040 --- /dev/null +++ b/icebergShaded/iceberg_src_patches/0002-iceberg-core-must-not-delete-any-delta-data-files.patch @@ -0,0 +1,177 @@ +iceberg core must NOT delete any delta data files + +--- + .../iceberg/IncrementalFileCleanup.java | 8 +-- + .../apache/iceberg/ReachableFileCleanup.java | 5 +- + .../apache/iceberg/TestRemoveSnapshots.java | 57 +++++++++++-------- + 3 files changed, 40 insertions(+), 30 deletions(-) + +diff --git a/core/src/main/java/org/apache/iceberg/IncrementalFileCleanup.java b/connector/iceberg-core/core/src/main/java/org/apache/iceberg/IncrementalFileCleanup.java +index d894dcbf36d..ead7ea6b076 100644 +--- a/core/src/main/java/org/apache/iceberg/IncrementalFileCleanup.java ++++ b/core/src/main/java/org/apache/iceberg/IncrementalFileCleanup.java +@@ -256,10 +256,10 @@ class IncrementalFileCleanup extends FileCleanupStrategy { + } + }); + +- Set filesToDelete = +- findFilesToDelete(manifestsToScan, manifestsToRevert, validIds, afterExpiration); +- +- deleteFiles(filesToDelete, "data"); ++ // iceberg core MUST NOT delete any data files which are managed by delta ++ // Set filesToDelete = ++ // findFilesToDelete(manifestsToScan, manifestsToRevert, validIds, afterExpiration); ++ // deleteFiles(filesToDelete, "data"); + LOG.warn("Manifests to delete: {}", Joiner.on(", ").join(manifestsToDelete)); + LOG.warn("Manifests Lists to delete: {}", Joiner.on(", ").join(manifestListsToDelete)); + deleteFiles(manifestsToDelete, "manifest"); +diff --git a/core/src/main/java/org/apache/iceberg/ReachableFileCleanup.java b/connector/iceberg-core/core/src/main/java/org/apache/iceberg/ReachableFileCleanup.java +index ccbee78e27b..da888a63b3d 100644 +--- a/core/src/main/java/org/apache/iceberg/ReachableFileCleanup.java ++++ b/core/src/main/java/org/apache/iceberg/ReachableFileCleanup.java +@@ -72,8 +72,9 @@ class ReachableFileCleanup extends FileCleanupStrategy { + snapshotsAfterExpiration, deletionCandidates, currentManifests::add); + + if (!manifestsToDelete.isEmpty()) { +- Set dataFilesToDelete = findFilesToDelete(manifestsToDelete, currentManifests); +- deleteFiles(dataFilesToDelete, "data"); ++ // iceberg core MUST NOT delete any data files which are managed by delta ++ // Set dataFilesToDelete = findFilesToDelete(manifestsToDelete, currentManifests); ++ // deleteFiles(dataFilesToDelete, "data"); + Set manifestPathsToDelete = + manifestsToDelete.stream().map(ManifestFile::path).collect(Collectors.toSet()); + deleteFiles(manifestPathsToDelete, "manifest"); +diff --git a/core/src/test/java/org/apache/iceberg/TestRemoveSnapshots.java b/connector/iceberg-core/core/src/test/java/org/apache/iceberg/TestRemoveSnapshots.java +index 53e5af520d9..95fa8e41de1 100644 +--- a/core/src/test/java/org/apache/iceberg/TestRemoveSnapshots.java ++++ b/core/src/test/java/org/apache/iceberg/TestRemoveSnapshots.java +@@ -147,8 +147,9 @@ public class TestRemoveSnapshots extends TableTestBase { + secondSnapshot + .allManifests(table.io()) + .get(0) +- .path(), // manifest contained only deletes, was dropped +- FILE_A.path()), // deleted ++ .path() // manifest contained only deletes, was dropped ++ // FILE_A.path() should NOT delete data files ++ ), // deleted + deletedFiles); + } + +@@ -209,8 +210,9 @@ public class TestRemoveSnapshots extends TableTestBase { + .allManifests(table.io()) + .get(0) + .path(), // manifest was rewritten for delete +- secondSnapshot.manifestListLocation(), // snapshot expired +- FILE_A.path()), // deleted ++ secondSnapshot.manifestListLocation() // snapshot expired ++ // FILE_A.path() should not delete any data files ++ ), + deletedFiles); + } + +@@ -309,8 +311,9 @@ public class TestRemoveSnapshots extends TableTestBase { + Sets.newHashSet( + secondSnapshot.manifestListLocation(), // snapshot expired + Iterables.getOnlyElement(secondSnapshotManifests) +- .path(), // manifest is no longer referenced +- FILE_B.path()), // added, but rolled back ++ .path() // manifest is no longer referenced ++ // FILE_B.path() should not delete any data files ++ ), + deletedFiles); + } + +@@ -686,7 +689,8 @@ public class TestRemoveSnapshots extends TableTestBase { + + removeSnapshots(table).expireOlderThan(t3).deleteWith(deletedFiles::add).commit(); + +- Assert.assertTrue("FILE_A should be deleted", deletedFiles.contains(FILE_A.path().toString())); ++ Assert.assertTrue("FILE_A should NOT be deleted", ++ !deletedFiles.contains(FILE_A.path().toString())); + } + + @Test +@@ -712,7 +716,8 @@ public class TestRemoveSnapshots extends TableTestBase { + + removeSnapshots(table).expireOlderThan(t3).deleteWith(deletedFiles::add).commit(); + +- Assert.assertTrue("FILE_A should be deleted", deletedFiles.contains(FILE_A.path().toString())); ++ Assert.assertTrue("FILE_A should NOT be deleted", ++ !deletedFiles.contains(FILE_A.path().toString())); + } + + @Test +@@ -749,8 +754,10 @@ public class TestRemoveSnapshots extends TableTestBase { + + removeSnapshots(table).expireOlderThan(t4).deleteWith(deletedFiles::add).commit(); + +- Assert.assertTrue("FILE_A should be deleted", deletedFiles.contains(FILE_A.path().toString())); +- Assert.assertTrue("FILE_B should be deleted", deletedFiles.contains(FILE_B.path().toString())); ++ Assert.assertTrue("FILE_A should NOT be deleted", ++ !deletedFiles.contains(FILE_A.path().toString())); ++ Assert.assertTrue("FILE_B should NOT be deleted", ++ !deletedFiles.contains(FILE_B.path().toString())); + } + + @Test +@@ -824,9 +831,11 @@ public class TestRemoveSnapshots extends TableTestBase { + Sets.newHashSet( + "remove-snapshot-0", "remove-snapshot-1", "remove-snapshot-2", "remove-snapshot-3")); + +- Assert.assertTrue("FILE_A should be deleted", deletedFiles.contains(FILE_A.path().toString())); +- Assert.assertTrue("FILE_B should be deleted", deletedFiles.contains(FILE_B.path().toString())); +- Assert.assertTrue("Thread should be created in provided pool", planThreadsIndex.get() > 0); ++ Assert.assertTrue("FILE_A should NOT be deleted", ++ !deletedFiles.contains(FILE_A.path().toString())); ++ Assert.assertTrue("FILE_B should NOT be deleted", ++ !deletedFiles.contains(FILE_B.path().toString())); ++ // Assert.assertTrue("Thread should be created in provided pool", planThreadsIndex.get() > 0); + } + + @Test +@@ -885,13 +894,13 @@ public class TestRemoveSnapshots extends TableTestBase { + Set expectedDeletes = Sets.newHashSet(); + expectedDeletes.add(snapshotA.manifestListLocation()); + +- // Files should be deleted of dangling staged snapshot +- snapshotB +- .addedDataFiles(table.io()) +- .forEach( +- i -> { +- expectedDeletes.add(i.path().toString()); +- }); ++ // Files should NOT be deleted of dangling staged snapshot ++ // snapshotB ++ // .addedDataFiles(table.io()) ++ // .forEach( ++ // i -> { ++ // expectedDeletes.add(i.path().toString()); ++ // }); + + // ManifestList should be deleted too + expectedDeletes.add(snapshotB.manifestListLocation()); +@@ -1144,10 +1153,10 @@ public class TestRemoveSnapshots extends TableTestBase { + removeSnapshots(table).expireOlderThan(fourthSnapshotTs).deleteWith(deletedFiles::add).commit(); + + Assert.assertEquals( +- "Should remove old delete files and delete file manifests", ++ "Should only delete file manifests", + ImmutableSet.builder() +- .add(FILE_A.path()) +- .add(FILE_A_DELETES.path()) ++ // .add(FILE_A.path()) ++ // .add(FILE_A_DELETES.path()) + .add(firstSnapshot.manifestListLocation()) + .add(secondSnapshot.manifestListLocation()) + .add(thirdSnapshot.manifestListLocation()) +@@ -1501,7 +1510,7 @@ public class TestRemoveSnapshots extends TableTestBase { + expectedDeletes.addAll(manifestPaths(appendA, table.io())); + expectedDeletes.add(branchDelete.manifestListLocation()); + expectedDeletes.addAll(manifestPaths(branchDelete, table.io())); +- expectedDeletes.add(FILE_A.path().toString()); ++ // expectedDeletes.add(FILE_A.path().toString()); + + Assert.assertEquals(2, Iterables.size(table.snapshots())); + Assert.assertEquals(expectedDeletes, deletedFiles); +-- +2.39.2 (Apple Git-143)