Skip to content

Commit

Permalink
Refactor Vacuum code to properly handle path url encoding (#3678)
Browse files Browse the repository at this point in the history
#### Which Delta project/connector is this regarding?
<!--
Please add the component selected below to the beginning of the pull
request title
For example: [Spark] Title of my pull request
-->

- [X] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

This is intended to handle special characters in the table path. For
context, DV team recently made a change to include special characters in
all paths used by tests.

As of today, the paths we get from listing are not url encoded -- they
get url-encoded later in the logic. However, paths contained in delta
log files are already url encoded. To keep these two things compatible,
this change makes the file names from listing to be url encoded and
changes the later logic to not url encode again.

## How was this patch tested?

Existing tests

## Does this PR introduce _any_ user-facing changes?

NO
  • Loading branch information
rajeshparangi authored Sep 17, 2024
1 parent 74d19a5 commit 8e97417
Showing 1 changed file with 32 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,11 @@ object VacuumCommand extends VacuumCommandImpl with Serializable {
None
}
}
.map { f =>
// Below logic will make paths url-encoded
SerializableFileStatus(pathStringtoUrlEncodedString(f.path), f.length, f.isDir,
f.modificationTime)
}
}

/**
Expand Down Expand Up @@ -273,6 +278,11 @@ object VacuumCommand extends VacuumCommandImpl with Serializable {
),
fileListingParallelism = Option(parallelism)
)
.map { f =>
// Below logic will make paths url-encoded
SerializableFileStatus(pathStringtoUrlEncodedString(f.path), f.length, f.isDir,
f.modificationTime)
}
}
val allFilesAndDirs = allFilesAndDirsWithDuplicates.groupByKey(_.path)
.mapGroups { (k, v) =>
Expand All @@ -299,6 +309,7 @@ object VacuumCommand extends VacuumCommandImpl with Serializable {
// 5. We subtract all the valid files and tombstones in our state
// 6. We filter all paths with a count of 1, which will correspond to files not in the
// state, and empty directories. We can safely delete all of these
val canonicalizedBasePath = SparkPath.fromPathString(basePath).urlEncoded
val diff = allFilesAndDirs
.where(col("modificationTime") < deleteBeforeTimestamp || col("isDir"))
.mapPartitions { fileStatusIterator =>
Expand All @@ -307,16 +318,18 @@ object VacuumCommand extends VacuumCommandImpl with Serializable {
fileStatusIterator.flatMap { fileStatus =>
if (fileStatus.isDir) {
Iterator.single(FileNameAndSize(
relativize(fileStatus.getHadoopPath, fs, reservoirBase, isDir = true), 0L))
relativize(urlEncodedStringToPath(fileStatus.path), fs,
reservoirBase, isDir = true), 0L))
} else {
val dirs = getAllSubdirs(basePath, fileStatus.path, fs)
val dirs = getAllSubdirs(canonicalizedBasePath, fileStatus.path, fs)
val dirsWithSlash = dirs.map { p =>
val relativizedPath = relativize(new Path(p), fs, reservoirBase, isDir = true)
val relativizedPath = relativize(urlEncodedStringToPath(p), fs,
reservoirBase, isDir = true)
FileNameAndSize(relativizedPath, 0L)
}
dirsWithSlash ++ Iterator(
FileNameAndSize(relativize(
fileStatus.getHadoopPath, fs, reservoirBase, isDir = false),
urlEncodedStringToPath(fileStatus.path), fs, reservoirBase, isDir = false),
fileStatus.length))
}
}
Expand All @@ -337,9 +350,9 @@ object VacuumCommand extends VacuumCommandImpl with Serializable {
.select(col("path"))
.as[String]
.map { relativePath =>
assert(!stringToPath(relativePath).isAbsolute,
assert(!urlEncodedStringToPath(relativePath).isAbsolute,
"Shouldn't have any absolute paths for deletion here.")
pathToString(DeltaFileOperations.absolutePath(basePath, relativePath))
pathToUrlEncodedString(DeltaFileOperations.absolutePath(basePath, relativePath))
}
val timeTakenToIdentifyEligibleFiles =
System.currentTimeMillis() - startTimeToIdentifyEligibleFiles
Expand Down Expand Up @@ -369,7 +382,7 @@ object VacuumCommand extends VacuumCommandImpl with Serializable {
log"a total of ${MDC(DeltaLogKeys.NUM_DIRS, dirCounts)} directories " +
log"that are safe to delete. Vacuum stats: ${MDC(DeltaLogKeys.STATS, stats)}")

return diffFiles.map(f => stringToPath(f).toString).toDF("path")
return diffFiles.map(f => urlEncodedStringToPath(f).toString).toDF("path")
}
logVacuumStart(
spark,
Expand Down Expand Up @@ -574,7 +587,7 @@ trait VacuumCommandImpl extends DeltaCommand {
fs: FileSystem,
reservoirBase: Path,
isDir: Boolean): String = {
pathToString(DeltaFileOperations.tryRelativizePath(fs, reservoirBase, path))
pathToUrlEncodedString(DeltaFileOperations.tryRelativizePath(fs, reservoirBase, path))
}

/**
Expand All @@ -601,21 +614,22 @@ trait VacuumCommandImpl extends DeltaCommand {
diff.repartition(parallelPartitions).mapPartitions { files =>
val fs = new Path(basePath).getFileSystem(hadoopConf.value.value)
val filesDeletedPerPartition =
files.map(p => stringToPath(p)).count(f => tryDeleteNonRecursive(fs, f))
files.map(p => urlEncodedStringToPath(p)).count(f => tryDeleteNonRecursive(fs, f))
Iterator(filesDeletedPerPartition)
}.collect().sum
} else {
val fs = new Path(basePath).getFileSystem(hadoopConf.value.value)
val fileResultSet = diff.toLocalIterator().asScala
fileResultSet.map(p => stringToPath(p)).count(f => tryDeleteNonRecursive(fs, f))
fileResultSet.map(p => urlEncodedStringToPath(p)).count(f => tryDeleteNonRecursive(fs, f))
}
}

// scalastyle:off pathfromuri
protected def stringToPath(path: String): Path = new Path(new URI(path))
// scalastyle:on pathfromuri
protected def urlEncodedStringToPath(path: String): Path = SparkPath.fromUrlString(path).toPath

protected def pathToUrlEncodedString(path: Path): String = SparkPath.fromPath(path).toString

protected def pathToString(path: Path): String = path.toUri.toString
protected def pathStringtoUrlEncodedString(path: String) =
SparkPath.fromPathString(path).toString

/** Returns the relative path of a file action or None if the file lives outside of the table. */
protected def getActionRelativePath(
Expand All @@ -631,18 +645,18 @@ trait VacuumCommandImpl extends DeltaCommand {
fs: FileSystem,
basePath: Path,
relativizeIgnoreError: Boolean): Option[String] = {
val filePath = stringToPath(path)
val filePath = urlEncodedStringToPath(path)
if (filePath.isAbsolute) {
val maybeRelative =
DeltaFileOperations.tryRelativizePath(fs, basePath, filePath, relativizeIgnoreError)
if (maybeRelative.isAbsolute) {
// This file lives outside the directory of the table.
None
} else {
Some(pathToString(maybeRelative))
Some(pathToUrlEncodedString(maybeRelative))
}
} else {
Some(pathToString(filePath))
Some(pathToUrlEncodedString(filePath))
}
}

Expand Down Expand Up @@ -686,7 +700,7 @@ trait VacuumCommandImpl extends DeltaCommand {
case Some(dv) if dv.isOnDisk =>
if (dv.isRelative) {
// We actually want a relative path here.
Some((pathToString(dv.absolutePath(new Path("."))), dv.sizeInBytes))
Some((pathToUrlEncodedString(dv.absolutePath(new Path("."))), dv.sizeInBytes))
} else {
assert(dv.isAbsolute)
// This is never going to be a path relative to `basePath` for DVs.
Expand Down

0 comments on commit 8e97417

Please sign in to comment.