diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/commands/VacuumCommand.scala b/spark/src/main/scala/org/apache/spark/sql/delta/commands/VacuumCommand.scala
index b72fbd2c50..0e827a8f7a 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/commands/VacuumCommand.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/commands/VacuumCommand.scala
@@ -17,6 +17,7 @@ package org.apache.spark.sql.delta.commands
 
 // scalastyle:off import.ordering.noEmptyLine
+import java.io.File
 import java.net.URI
 import java.util.Date
 import java.util.concurrent.TimeUnit
@@ -25,7 +26,7 @@ import org.apache.spark.sql.delta._
 import org.apache.spark.sql.delta.actions.{AddFile, FileAction, RemoveFile}
 import org.apache.spark.sql.delta.logging.DeltaLogKeys
 import org.apache.spark.sql.delta.sources.DeltaSQLConf
-import org.apache.spark.sql.delta.util.DeltaFileOperations
+import org.apache.spark.sql.delta.util.{DeltaFileOperations, FileNames}
 import org.apache.spark.sql.delta.util.DeltaFileOperations.tryDeleteNonRecursive
 import com.fasterxml.jackson.databind.annotation.JsonDeserialize
 import org.apache.hadoop.conf.Configuration
@@ -38,7 +39,7 @@ import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.execution.metric.SQLMetrics.createMetric
 import org.apache.spark.sql.functions.{col, count, lit, replace, startswith, substr, sum}
 import org.apache.spark.sql.types.{BooleanType, LongType, StringType, StructField, StructType}
-import org.apache.spark.util.{Clock, SerializableConfiguration, SystemClock}
+import org.apache.spark.util.{Clock, SerializableConfiguration, SystemClock, Utils}
 
 /**
  * Vacuums the table by clearing all untracked files and folders within this table.
@@ -485,11 +486,12 @@ trait VacuumCommandImpl extends DeltaCommand {
         metrics("numFilesToDelete").set(diff.count())
         metrics("sizeOfDataToDelete").set(sizeOfDataToDelete)
         txn.registerSQLMetrics(spark, metrics)
-        txn.commit(actions = Seq(), DeltaOperations.VacuumStart(
+        val version = txn.commit(actions = Seq(), DeltaOperations.VacuumStart(
           checkEnabled,
           specifiedRetentionMillis,
           defaultRetentionMillis
         ))
+        setCommitClock(deltaLog, version)
       }
     }
 
@@ -529,9 +531,10 @@ trait VacuumCommandImpl extends DeltaCommand {
         metrics("numVacuumedDirectories").set(dirCounts.get)
         txn.registerSQLMetrics(spark, metrics)
       }
-      txn.commit(actions = Seq(), DeltaOperations.VacuumEnd(
+      val version = txn.commit(actions = Seq(), DeltaOperations.VacuumEnd(
         status
       ))
+      setCommitClock(deltaLog, version)
     }
 
     if (filesDeleted.nonEmpty) {
@@ -539,6 +542,14 @@ trait VacuumCommandImpl extends DeltaCommand {
         s"of ${dirCounts.get} directories.")
     }
   }
+  protected def setCommitClock(deltaLog: DeltaLog, version: Long) = {
+    // This is done to make sure that the commit timestamp reflects the one provided by the clock
+    // object.
+    if (Utils.isTesting) {
+      val f = new File(FileNames.unsafeDeltaFile(deltaLog.logPath, version).toUri)
+      f.setLastModified(deltaLog.clock.getTimeMillis())
+    }
+  }
 
   /**
    * Attempts to relativize the `path` with respect to the `reservoirBase` and converts the path to
@@ -598,7 +609,15 @@ trait VacuumCommandImpl extends DeltaCommand {
       fs: FileSystem,
       basePath: Path,
       relativizeIgnoreError: Boolean): Option[String] = {
-    val filePath = stringToPath(action.path)
+    getRelativePath(action.path, fs, basePath, relativizeIgnoreError)
+  }
+  /** Returns the relative path of a file or None if the file lives outside of the table. */
+  protected def getRelativePath(
+      path: String,
+      fs: FileSystem,
+      basePath: Path,
+      relativizeIgnoreError: Boolean): Option[String] = {
+    val filePath = stringToPath(path)
     if (filePath.isAbsolute) {
       val maybeRelative =
         DeltaFileOperations.tryRelativizePath(fs, basePath, filePath, relativizeIgnoreError)
@@ -631,16 +650,16 @@ trait VacuumCommandImpl extends DeltaCommand {
     }.getOrElse(Seq.empty)
 
     val deletionVectorPath =
-      getDeletionVectorRelativePath(action).map(pathToString)
+      getDeletionVectorRelativePathAndSize(action).map(_._1)
 
     paths ++ deletionVectorPath.toSeq
   }
 
   /**
    * Returns the path of the on-disk deletion vector if it is stored relative to the
-   * `basePath` otherwise `None`.
+   * `basePath`, together with its size, otherwise `None`.
    */
-  protected def getDeletionVectorRelativePath(action: FileAction): Option[Path] = {
+  protected def getDeletionVectorRelativePathAndSize(action: FileAction): Option[(String, Long)] = {
     val dv = action match {
       case a: AddFile if a.deletionVector != null =>
         Some(a.deletionVector)
@@ -653,7 +672,7 @@ trait VacuumCommandImpl extends DeltaCommand {
       case Some(dv) if dv.isOnDisk =>
         if (dv.isRelative) {
           // We actually want a relative path here.
-          Some(dv.absolutePath(new Path(".")))
+          Some((pathToString(dv.absolutePath(new Path("."))), dv.sizeInBytes))
         } else {
           assert(dv.isAbsolute)
           // This is never going to be a path relative to `basePath` for DVs.
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaVacuumSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaVacuumSuite.scala
index 346ff68324..d539e4d4a9 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaVacuumSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaVacuumSuite.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.delta.sources.DeltaSQLConf
 import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
 import org.apache.spark.sql.delta.test.DeltaSQLTestUtils
 import org.apache.spark.sql.delta.test.DeltaTestImplicits._
-import org.apache.spark.sql.delta.util.DeltaFileOperations
+import org.apache.spark.sql.delta.util.{DeltaFileOperations, FileNames}
 import org.apache.commons.io.FileUtils
 import org.apache.hadoop.fs.FileSystem
 import org.apache.hadoop.fs.Path
@@ -161,13 +161,15 @@ trait DeltaVacuumSuiteBase extends QueryTest
       expectedError: Class[T],
       msg: Seq[String]) extends Operation
 
+  private final val RANDOM_FILE_CONTENT = "gibberish"
+
   protected def createFile(
       reservoirBase: String,
       filePath: String,
       file: File,
       clock: ManualClock,
       partitionValues: Map[String, String] = Map.empty): AddFile = {
-    FileUtils.write(file, "gibberish")
+    FileUtils.write(file, RANDOM_FILE_CONTENT)
     file.setLastModified(clock.getTimeMillis())
     createTestAddFile(
       encodedPath = filePath,
@@ -188,11 +190,13 @@ trait DeltaVacuumSuiteBase extends QueryTest
         if (commit) {
           if (!DeltaTableUtils.isDeltaTable(spark, new Path(basePath))) {
             // initialize the table
-            deltaLog.startTransaction().commitManually()
+            val version = deltaLog.startTransaction().commitManually()
+            setCommitClock(deltaLog, version, clock)
           }
           val txn = deltaLog.startTransaction()
           val action = createFile(basePath, sanitizedPath, file, clock, partitionValues)
-          txn.commit(Seq(action), Write(SaveMode.Append))
+          val version = txn.commit(Seq(action), Write(SaveMode.Append))
+          setCommitClock(deltaLog, version, clock)
         } else {
           createFile(basePath, path, file, clock)
         }
@@ -213,8 +217,10 @@ trait DeltaVacuumSuiteBase extends QueryTest
         )
         txn.registerSQLMetrics(spark, metrics)
         val encodedPath = new Path(path).toUri.toString
-        txn.commit(Seq(RemoveFile(encodedPath, Option(clock.getTimeMillis()))),
+        val size = Some(RANDOM_FILE_CONTENT.length.toLong)
+        val version = txn.commit(Seq(RemoveFile(encodedPath, Option(clock.getTimeMillis()), size = size)),
           Delete(Seq(Literal.TrueLiteral)))
+        setCommitClock(deltaLog, version, clock)
         // scalastyle:on
       case e: ExecuteVacuumInSQL =>
         Given(s"*** Executing SQL: ${e.sql}")
@@ -340,6 +346,11 @@ trait DeltaVacuumSuiteBase extends QueryTest
     changes.flatMap(_._2).collect { case a: AddCDCFile => a }.toList
   }
 
+  protected def setCommitClock(deltaLog: DeltaLog, version: Long, clock: ManualClock) = {
+    val f = new File(FileNames.unsafeDeltaFile(deltaLog.logPath, version).toUri)
+    f.setLastModified(clock.getTimeMillis())
+  }
+
   protected def testCDCVacuumForUpdateMerge(): Unit = {
     withSQLConf(
       DeltaConfigs.CHANGE_DATA_FEED.defaultTablePropertyKey -> "true",
@@ -578,7 +589,9 @@ class DeltaVacuumSuite
       val schema = new StructType().add("_underscore_col_", IntegerType).add("n", IntegerType)
       val metadata =
         Metadata(schemaString = schema.json, partitionColumns = Seq("_underscore_col_"))
-      txn.commit(metadata :: Nil, DeltaOperations.CreateTable(metadata, isManaged = true))
+      val version =
+        txn.commit(metadata :: Nil, DeltaOperations.CreateTable(metadata, isManaged = true))
+      setCommitClock(deltaLog, version, clock)
       gcTest(deltaLog, clock)(
         CreateFile("file1.txt", commitToActionLog = true, Map("_underscore_col_" -> "10")),
         CreateFile("_underscore_col_=10/test.txt", true, Map("_underscore_col_" -> "10")),
@@ -599,7 +612,9 @@ class DeltaVacuumSuite
       val schema = new StructType().add("_underscore_col_", IntegerType).add("n", IntegerType)
       val metadata =
         Metadata(schemaString = schema.json, partitionColumns = Seq("_underscore_col_"))
-      txn.commit(metadata :: Nil, DeltaOperations.CreateTable(metadata, isManaged = true))
+      val version =
+        txn.commit(metadata :: Nil, DeltaOperations.CreateTable(metadata, isManaged = true))
+      setCommitClock(deltaLog, version, clock)
       val inventorySchema = StructType(
         Seq(
           StructField("file", StringType),
@@ -631,7 +646,9 @@ class DeltaVacuumSuite
       // Vacuum should consider partition folders even for clean up even though it starts with `_`
       val metadata =
         Metadata(schemaString = schema.json, partitionColumns = Seq("_underscore_col_"))
-      txn.commit(metadata :: Nil, DeltaOperations.CreateTable(metadata, isManaged = true))
+      val version =
+        txn.commit(metadata :: Nil, DeltaOperations.CreateTable(metadata, isManaged = true))
+      setCommitClock(deltaLog, version, clock)
       // Create a Seq of Rows containing the data
       val data = Seq(
         Row(s"${deltaLog.dataPath}", 300000L, true, 0L),
@@ -1258,6 +1275,7 @@ class DeltaVacuumSuite
     withEnvironment { (dir, clock) =>
       spark.range(2).write.format("delta").save(dir.getAbsolutePath)
       val deltaLog = DeltaLog.forTable(spark, dir, clock)
+      setCommitClock(deltaLog, 0L, clock)
       val expectedReturn = if (isDryRun) {
         // dry run returns files that will be deleted
         Seq(new Path(dir.getAbsolutePath, "file1.txt").toString)
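Reviewer note (not part of the patch): the `setCommitClock` helpers exist because VACUUM's retention check compares file-modification timestamps against the transaction's clock. When a test drives `DeltaLog` with a `ManualClock`, the commit JSON written to `_delta_log` still receives the real filesystem mtime, so simulated time and observed commit time drift apart; pinning the mtime to the clock keeps the retention math deterministic. A minimal sketch of the pattern, assuming a `spark` session and a table directory `dir` supplied by the surrounding test harness:

```scala
import java.io.File
import java.util.concurrent.TimeUnit

import org.apache.spark.sql.delta.DeltaLog
import org.apache.spark.sql.delta.util.FileNames
import org.apache.spark.util.ManualClock

// Drive the Delta log with a simulated clock (`spark` and `dir` are assumed
// to come from the test harness, as in DeltaVacuumSuite's withEnvironment).
val clock = new ManualClock(System.currentTimeMillis())
val deltaLog = DeltaLog.forTable(spark, dir, clock)

// Pin version 0's commit file to the simulated time, mirroring the patch's
// setCommitClock helper; without this the file keeps its real mtime.
val commitFile = new File(FileNames.unsafeDeltaFile(deltaLog.logPath, 0L).toUri)
commitFile.setLastModified(clock.getTimeMillis())

// Advancing past the default 7-day retention window now behaves
// deterministically: files older than the window become eligible for deletion.
clock.advance(TimeUnit.DAYS.toMillis(8))
```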
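A second note: `getDeletionVectorRelativePathAndSize` now surfaces the deletion vector's size alongside its table-relative path, but the call site in this hunk keeps only the path (`.map(_._1)`); the size is presumably consumed elsewhere in the change set, e.g. for size metrics. A hypothetical caller that accounts DV bytes could look like the following sketch; the accumulator names are illustrative, not part of the patch:

```scala
import scala.collection.mutable.ArrayBuffer

// Illustrative accumulators; the real metrics in VacuumCommand are SQLMetric objects.
val pathsToDelete = ArrayBuffer.empty[String]
var bytesToDelete = 0L

// `action` is a FileAction taken from the snapshot. The helper yields
// Some((path, sizeInBytes)) only for DVs stored on disk inside the table.
getDeletionVectorRelativePathAndSize(action).foreach { case (dvPath, dvSize) =>
  pathsToDelete += dvPath
  bytesToDelete += dvSize
}
```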