Probe whether the metadata path is canonicalized in Spark
## Description

(Cherry-pick of e0f0e91 to branch-2.4)

This fixes issue #1725.

The fix calls the Spark internal method used to generate metadata columns, to see whether it canonicalizes spaces in a crafted path string. If it does, we don't need to do anything on the Delta side; otherwise, we manually canonicalize the obtained metadata column.

Why not use the Spark internal method on `FileToDvDescriptor` as well, so that both sides of the join are consistently canonicalized or un-canonicalized? Because most Delta methods expect a canonicalized path, the returned DataFrame must be canonicalized in all cases.
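
For context, "canonicalized" here means the URL-encoded URI form of a path, in which characters such as spaces are percent-encoded. A minimal sketch of the difference (not part of this patch; assumes Spark 3.4+ on the classpath for `SparkPath`):

```scala
import org.apache.hadoop.fs.Path
import org.apache.spark.paths.SparkPath

val raw = "file:/path with space/data.parquet"

// Hadoop's Path round-trips through java.net.URI, which percent-encodes the space.
new Path(raw).toUri.toString
// => "file:/path%20with%20space/data.parquet"

// SparkPath (Spark 3.4+) exposes the same URL-encoded form explicitly.
SparkPath.fromPath(new Path(raw)).urlEncoded
// => "file:/path%20with%20space/data.parquet"
```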

## How was this patch tested?
Existing tests didn't fail.
xupefei committed Jul 24, 2023
1 parent 0af1034 commit c6100ac
Showing 1 changed file with 54 additions and 9 deletions.
@@ -35,8 +35,9 @@ import org.apache.spark.sql.delta.util.DeltaFileOperations.absolutePath
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

import org.apache.spark.paths.SparkPath
import org.apache.spark.sql.{Column, DataFrame, Dataset, Encoder, SparkSession}
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, FileSourceMetadataAttribute}
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, FileSourceMetadataAttribute, GenericInternalRow}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
import org.apache.spark.sql.execution.datasources.{FileFormat, HadoopFsRelation, LogicalRelation}
import org.apache.spark.sql.execution.datasources.FileFormat.{FILE_PATH, METADATA_NAME}
@@ -339,13 +340,18 @@ object DeletionVectorBitmapGenerator {
candidateFiles: Seq[AddFile],
condition: Expression)
: Seq[DeletionVectorResult] = {
// TODO: fix this to work regardless of whether Spark encodes or doesn't encode
// _metadata.file_path. See https://github.com/delta-io/delta/issues/1725
val uriEncode = DeltaUDF.stringFromString(path => {
new Path(path).toUri.toString
})
val matchedRowsDf = targetDf
.withColumn(FILE_NAME_COL, uriEncode(col(s"${METADATA_NAME}.${FILE_PATH}")))
// If the metadata column is not canonicalized, we must canonicalize it before use.
val targetDfWithMetadataColumn = if (sparkMetadataFilePathIsCanonicalized) {
targetDf.withColumn(FILE_NAME_COL, col(s"${METADATA_NAME}.${FILE_PATH}"))
} else {
val canonicalizedPathStringMap = buildCanonicalizedPathStringMap(txn.deltaLog, candidateFiles)
val broadcastCanonicalizedPathStringMap =
sparkSession.sparkContext.broadcast(canonicalizedPathStringMap)

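// UDF that maps each un-canonicalized path from the metadata column to its
// canonicalized form via the broadcast map.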
val lookupPathUdf = DeltaUDF.stringFromString(broadcastCanonicalizedPathStringMap.value(_))
targetDf.withColumn(FILE_NAME_COL, lookupPathUdf(col(s"${METADATA_NAME}.${FILE_PATH}")))
}
val matchedRowsDf = targetDfWithMetadataColumn
// Filter after getting input file name as the filter might introduce a join and we
// cannot get input file name on join's output.
.filter(new Column(condition))
@@ -358,7 +364,9 @@ object DeletionVectorBitmapGenerator {
val filePathToDV = candidateFiles.map { add =>
val serializedDV = Option(add.deletionVector).map(dvd => JsonUtils.toJson(dvd))
// Paths in the metadata column are canonicalized. Thus we must canonicalize the DV path.
FileToDvDescriptor(absolutePath(basePath, add.path).toUri.toString, serializedDV)
FileToDvDescriptor(
SparkPath.fromPath(absolutePath(basePath, add.path)).urlEncoded,
serializedDV)
}
val filePathToDVDf = sparkSession.createDataset(filePathToDV)

@@ -379,6 +387,43 @@

DeletionVectorBitmapGenerator.buildDeletionVectors(sparkSession, df, txn.deltaLog, txn)
}

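/**
 * Builds a map from each candidate file's absolute path to its canonicalized
 * (URL-encoded) form, used to translate un-canonicalized paths from the metadata column.
 */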
private def buildCanonicalizedPathStringMap(
log: DeltaLog,
addFiles: Seq[AddFile]): Map[String, String] = {
val basePath = log.dataPath.toString
addFiles.map { add =>
val absPath = absolutePath(basePath, add.path)
absPath.toString -> SparkPath.fromPath(absPath).urlEncoded
}.toMap
}

/**
* In Spark 3.4 the file path metadata column is not canonicalized, but it is in Spark 3.4.1. To
* make Delta Lake work with both Spark versions, we use Spark's internal path transformation
* method to transform a probe path here. That method returns an un-canonicalized path in
* Spark 3.4 and a canonicalized one in Spark 3.4.1.
*
* Related issue: https://github.com/delta-io/delta/issues/1725.
*/
private lazy val sparkMetadataFilePathIsCanonicalized: Boolean = {
try {
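// A probe path containing a space; canonicalization would percent-encode the space as %20.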
val probeString = "file:/path with space/data.parquet"
val row = FileFormat.updateMetadataInternalRow(
new GenericInternalRow(size = 1),
Seq(FileFormat.FILE_PATH),
filePath = new Path(probeString),
fileSize = 0L,
fileBlockStart = 0L,
fileBlockLength = 0L,
fileModificationTime = 0L)
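// If Spark canonicalized the path, the space was percent-encoded and the
// result differs from the probe string.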
row.getUTF8String(0).toString != probeString
} catch {
// The probed method has changed (for example, in Spark 3.5 onwards, which does not have
// this bug). Return true in this case.
case _: NoSuchMethodError | _: NoClassDefFoundError => true
}
}
}

/**
