Data skipping and column pruning in merge
johanl-db committed Jun 21, 2023
1 parent cb89436 commit e27029d
Showing 6 changed files with 514 additions and 167 deletions.
80 changes: 80 additions & 0 deletions spark/src/main/scala/org/apache/spark/sql/delta/DeltaTable.scala
@@ -29,8 +29,10 @@ import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, UnresolvedTable}
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, SessionCatalog}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke
import org.apache.spark.sql.catalyst.planning.NodeWithOnlyDeterministicProjectAndFilter
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LeafNode, LogicalPlan, Project}
import org.apache.spark.sql.catalyst.util.CharVarcharCodegenUtils
import org.apache.spark.sql.connector.expressions.{FieldReference, IdentityTransform}
import org.apache.spark.sql.execution.datasources.{FileFormat, FileIndex, HadoopFsRelation, LogicalRelation}
import org.apache.spark.sql.internal.SQLConf
@@ -325,6 +327,84 @@ object DeltaTableUtils extends PredicateHelper
}
}

/**
* Replace the file index in a logical plan and return the updated plan.
* It's a common pattern in Delta commands to use data skipping to determine the subset of
* files that can be affected by the command, and then replace the whole-table file index in the
* original logical plan with a new index over only the potentially affected files, while
* everything else in the original plan, e.g., resolved references, remains unchanged.
*
* Many Delta meta-queries involve nondeterministic functions, which interfere with automatic
* column pruning, so columns can be manually pruned from the new scan. Note that partition
* columns can never be dropped, even if they're not referenced in the rest of the query.
*
* @param spark the spark session to use
* @param target the logical plan in which we replace the file index
* @param fileIndex the new file index
* @param columnsToDrop columns to drop from the scan
* @param newOutput If specified, new logical output to replace the current LogicalRelation.
* Used for schema evolution to produce the new schema-evolved types from
* old files, because `target` will have the old types.
*/
def replaceFileIndex(
spark: SparkSession,
target: LogicalPlan,
fileIndex: FileIndex,
columnsToDrop: Seq[String],
newOutput: Option[Seq[AttributeReference]]): LogicalPlan = {
val resolver = spark.sessionState.analyzer.resolver

var actualNewOutput = newOutput
var hasChar = false
var newTarget = target transformDown {
case l @ LogicalRelation(hfsr: HadoopFsRelation, _, _, _) =>
val finalOutput = actualNewOutput.getOrElse(l.output).filterNot { col =>
columnsToDrop.exists(resolver(_, col.name))
}

// If the output columns were changed e.g. by schema evolution, we need to update
// the relation to expose all the columns that are expected after schema evolution.
val newDataSchema = StructType(finalOutput.map(attr =>
StructField(attr.name, attr.dataType, attr.nullable, attr.metadata)))
val newBaseRelation = hfsr.copy(
location = fileIndex, dataSchema = newDataSchema)(
hfsr.sparkSession)
l.copy(relation = newBaseRelation, output = finalOutput)

case p @ Project(projectList, _) =>
def hasCharPadding(e: Expression): Boolean = e.exists {
case s: StaticInvoke => s.staticObject == classOf[CharVarcharCodegenUtils] &&
s.functionName == "readSidePadding"
case _ => false
}
val charColMapping = AttributeMap(projectList.collect {
case a: Alias if hasCharPadding(a.child) && a.references.size == 1 =>
hasChar = true
val tableCol = a.references.head.asInstanceOf[AttributeReference]
a.toAttribute -> tableCol
})
actualNewOutput = newOutput.map(_.map { attr =>
charColMapping.get(attr).map { tableCol =>
attr.withExprId(tableCol.exprId)
}.getOrElse(attr)
})
p
}

if (hasChar) {
newTarget = newTarget.transformUp {
case p @ Project(projectList, child) =>
val newProjectList = projectList.filter { e =>
// Spark does char type read-side padding via an additional Project over the scan node,
// and we need to apply column pruning for the Project as well, otherwise the Project
// will contain missing attributes.
e.references.subsetOf(child.outputSet)
}
p.copy(projectList = newProjectList)
}
}
newTarget
}
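A minimal usage sketch of the helper above, assuming `targetPlan` is the analyzed logical plan of the Delta table and `prunedIndex` is a `FileIndex` built from the files selected by data skipping (both names are hypothetical):

  val rewrittenPlan = DeltaTableUtils.replaceFileIndex(
    spark,
    target = targetPlan,            // analyzed plan over the whole table
    fileIndex = prunedIndex,        // index over only the potentially affected files
    columnsToDrop = Seq("comment"), // hypothetical column not referenced by the command
    newOutput = None)               // no schema evolution in this sketch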

/**
* Update FileFormat for a plan and return the updated plan
@@ -25,17 +25,14 @@ import org.apache.spark.sql.delta.files._
import org.apache.spark.sql.delta.schema.{ImplicitMetadataOperation, SchemaUtils}
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.util.{AnalysisHelper, SetAccumulator}
import com.fasterxml.jackson.databind.annotation.JsonDeserialize

import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder}
import org.apache.spark.sql.catalyst.expressions.{Alias, And, Attribute, AttributeReference, BasePredicate, Expression, Literal, NamedExpression, PredicateHelper, UnsafeProjection}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{DataTypes, LongType, StructType}

@@ -75,7 +72,6 @@ case class MergeIntoCommand(
notMatchedClauses: Seq[DeltaMergeIntoNotMatchedClause],
notMatchedBySourceClauses: Seq[DeltaMergeIntoNotMatchedBySourceClause],
migratedSchema: Option[StructType]) extends MergeIntoCommandBase
with PredicateHelper
with AnalysisHelper
with ImplicitMetadataOperation {

@@ -92,26 +88,9 @@
AttributeReference("num_deleted_rows", LongType)(),
AttributeReference("num_inserted_rows", LongType)())

/**
* Map to get target output attributes by name.
* The case sensitivity of the map is set according to the Spark configuration.
*/
@transient private lazy val targetOutputAttributesMap: Map[String, Attribute] = {
val attrMap: Map[String, Attribute] = target
.outputSet.view
.map(attr => attr.name -> attr).toMap
if (conf.caseSensitiveAnalysis) {
attrMap
} else {
CaseInsensitiveMap(attrMap)
}
}

/** Whether this merge statement has only a single insert (NOT MATCHED) clause. */
private def isSingleInsertOnly: Boolean =
matchedClauses.isEmpty && notMatchedBySourceClauses.isEmpty && notMatchedClauses.length == 1
/** Whether this merge statement has no insert (NOT MATCHED) clause. */
private def hasNoInserts: Boolean = notMatchedClauses.isEmpty

// We over-count numTargetRowsDeleted when there are multiple matches;
// this is the amount of the overcount, so we can subtract it to get a correct final metric.
@@ -217,29 +196,30 @@
status = "MERGE operation - scanning files for matches",
sqlMetricName = "scanTimeMs") {

val columnComparator = spark.sessionState.analyzer.resolver

// Accumulator to collect all the distinct touched files
val touchedFilesAccum = new SetAccumulator[String]()
spark.sparkContext.register(touchedFilesAccum, TOUCHED_FILES_ACCUM_NAME)

// UDF to record touched file names and add them to the accumulator
val recordTouchedFileName = DeltaUDF.intFromString { fileName =>
touchedFilesAccum.add(fileName)
1
}.asNondeterministic()
val recordTouchedFileName =
DeltaUDF.intFromStringBoolean { (fileName, shouldRecord) => {
if (shouldRecord) {
touchedFilesAccum.add(fileName)
}
1
}}.asNondeterministic()

// Prune non-matching files if we don't need to collect them for NOT MATCHED BY SOURCE clauses.
val dataSkippedFiles =
if (notMatchedBySourceClauses.isEmpty) {
val targetOnlyPredicates =
splitConjunctivePredicates(condition).filter(_.references.subsetOf(target.outputSet))
deltaTxn.filterFiles(targetOnlyPredicates)
deltaTxn.filterFiles(getTargetOnlyPredicates(spark))
} else {
deltaTxn.filterFiles()
}

val incrSourceRowCountExpr = incrementMetricAndReturnBool("numSourceRows", valueToReturn = true)
val sourceDF = getSourceDF()
.filter(new Column(incrSourceRowCountExpr))

// Join the source and target table using the merge condition to find touched files. An inner
// join collects all candidate files for MATCHED clauses, a right outer join also includes
@@ -249,14 +229,49 @@
// target row is modified by multiple source rows or not
// - the target file name the row is from to later identify the files touched by matched rows
val joinType = if (notMatchedBySourceClauses.isEmpty) "inner" else "right_outer"
val targetDF = buildTargetPlanWithFiles(spark, deltaTxn, dataSkippedFiles)

// When there are only MATCHED clauses, we prune after the join the files that have no rows
// satisfying any of the clause conditions.
val matchedPredicate =
if (isMatchedOnly) {
matchedClauses
.map(clause => clause.condition.getOrElse(Literal(true)))
.reduce((a, b) => Or(a, b))
} else Literal(true)

// Compute the columns needed for the inner join.
val targetColsNeeded = {
condition.references.map(_.name) ++ deltaTxn.snapshot.metadata.partitionColumns ++
matchedPredicate.references.map(_.name)
}

val columnsToDrop = deltaTxn.snapshot.metadata.schema.map(_.name)
.filterNot { field =>
targetColsNeeded.exists { name => columnComparator(name, field) }
}

// We can't use filter() directly on the expression because that will prevent
// column pruning. We don't need the SOURCE_ROW_PRESENT_COL so we immediately drop it.
val sourceDF = getSourceDF()
.withColumn(SOURCE_ROW_PRESENT_COL, new Column(incrSourceRowCountExpr))
.filter(SOURCE_ROW_PRESENT_COL)
.drop(SOURCE_ROW_PRESENT_COL)
val targetPlan =
buildTargetPlanWithFiles(
spark,
deltaTxn,
dataSkippedFiles,
columnsToDrop)
val targetDF = Dataset.ofRows(spark, targetPlan)
.withColumn(ROW_ID_COL, monotonically_increasing_id())
.withColumn(FILE_NAME_COL, input_file_name())

val joinToFindTouchedFiles = sourceDF.join(targetDF, new Column(condition), joinType)

// Process the matches from the inner join to record touched files and find multiple matches
val collectTouchedFiles = joinToFindTouchedFiles
.select(col(ROW_ID_COL), recordTouchedFileName(col(FILE_NAME_COL)).as("one"))
.select(col(ROW_ID_COL),
recordTouchedFileName(col(FILE_NAME_COL), new Column(matchedPredicate)).as("one"))

// Calculate the frequency of matches per target row
val matchedRowCounts = collectTouchedFiles.groupBy(ROW_ID_COL).agg(sum("one").as("count"))
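As a rough illustration of the pruning above (table, column, and DataFrame names are made up): for a target table partitioned by `date` with columns `id`, `date`, `value`, and `comment`, a merge such as

  io.delta.tables.DeltaTable.forName(spark, "target_table").as("t")
    .merge(sourceDF.as("s"), "t.id = s.id")
    .whenMatched("s.value > t.value")
    .updateAll()
    .execute()

only needs `id` (merge condition), `value` (matched-clause condition), and the partition column `date` while scanning for touched files, so `comment` can be dropped from the target scan.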
@@ -368,8 +383,12 @@
val dataSkippedFiles = deltaTxn.filterFiles(targetOnlyPredicates)

// target DataFrame
val targetDF = buildTargetPlanWithFiles(spark, deltaTxn, dataSkippedFiles)

val targetPlan = buildTargetPlanWithFiles(
spark,
deltaTxn,
dataSkippedFiles,
columnsToDrop = Nil)
val targetDF = Dataset.ofRows(spark, targetPlan)
val insertDf = sourceDF.join(targetDF, new Column(condition), "leftanti")
.select(outputCols: _*)
.filter(new Column(incrInsertedCountExpr))
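For context, a hedged example of a merge that can take this left-anti-join path: a statement with only a NOT MATCHED clause (and the insert-only optimization enabled), written against the public Delta Scala API with hypothetical table and DataFrame names:

  io.delta.tables.DeltaTable.forName(spark, "events").as("t")
    .merge(updatesDF.as("s"), "t.eventId = s.eventId")
    .whenNotMatched()
    .insertAll()
    .execute()

Rows from `updatesDF` with no matching `eventId` in the target are selected by the left-anti join and written as inserts.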
Expand Down Expand Up @@ -417,14 +436,16 @@ case class MergeIntoCommand(
deltaTxn: OptimisticTransaction,
filesToRewrite: Seq[AddFile])
: Seq[FileAction] = recordMergeOperation(
extraOpType = "writeAllChanges",
extraOpType =
if (shouldOptimizeMatchedOnlyMerge(spark)) "writeAllUpdatesAndDeletes"
else "writeAllChanges",
status = s"MERGE operation - Rewriting ${filesToRewrite.size} files",
sqlMetricName = "rewriteTimeMs") {
import org.apache.spark.sql.catalyst.expressions.Literal.{TrueLiteral, FalseLiteral}

val cdcEnabled = DeltaConfigs.CHANGE_DATA_FEED.fromMetaData(deltaTxn.metadata)

var targetOutputCols = getTargetOutputCols(deltaTxn)
var targetOutputCols = getTargetOutputCols(deltaTxn, makeNullable = true)
var outputRowSchema = deltaTxn.metadata.schema

// When we have duplicate matches (only allowed when the whenMatchedCondition is a delete with
@@ -451,9 +472,13 @@

// Generate a new target dataframe that has the same output attribute exprIds as the target plan.
// This allows us to apply the existing resolved update/insert expressions.
val baseTargetDF = buildTargetPlanWithFiles(spark, deltaTxn, filesToRewrite)
val joinType = if (hasNoInserts &&
spark.conf.get(DeltaSQLConf.MERGE_MATCHED_ONLY_ENABLED)) {
val targetPlan = buildTargetPlanWithFiles(
spark,
deltaTxn,
filesToRewrite,
columnsToDrop = Nil)
val baseTargetDF = Dataset.ofRows(spark, targetPlan)
val joinType = if (shouldOptimizeMatchedOnlyMerge(spark)) {
"rightOuter"
} else {
"fullOuter"
@@ -725,87 +750,6 @@
newFiles
}

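Similarly, a hedged example of a matched-only merge (no NOT MATCHED clause) that the right-outer-join path above is meant to optimize; names are illustrative:

  io.delta.tables.DeltaTable.forName(spark, "target_table").as("t")
    .merge(deletesDF.as("s"), "t.id = s.id")
    .whenMatched()
    .delete()
    .execute()

With no inserts, unmatched source rows are irrelevant, so a right outer join onto the touched target files suffices instead of a full outer join.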

/**
* Build a new logical plan using the given `files` that has the same output columns (exprIds)
* as the `target` logical plan, so that existing update/insert expressions can be applied
* on this new plan.
*/
private def buildTargetPlanWithFiles(
spark: SparkSession,
deltaTxn: OptimisticTransaction,
files: Seq[AddFile]): DataFrame = {
val targetOutputCols = getTargetOutputCols(deltaTxn)
val targetOutputColsMap = {
val colsMap: Map[String, NamedExpression] = targetOutputCols.view
.map(col => col.name -> col).toMap
if (conf.caseSensitiveAnalysis) {
colsMap
} else {
CaseInsensitiveMap(colsMap)
}
}

val plan = {
// We have to do surgery to use the attributes from `targetOutputCols` to scan the table.
// In cases of schema evolution, they may not be the same type as the original attributes.
val original =
deltaTxn.deltaLog.createDataFrame(deltaTxn.snapshot, files).queryExecution.analyzed
val transformed = original.transform {
case LogicalRelation(base, output, catalogTbl, isStreaming) =>
LogicalRelation(
base,
// We can ignore the new columns which aren't yet AttributeReferences.
targetOutputCols.collect { case a: AttributeReference => a },
catalogTbl,
isStreaming)
}

// In case of schema evolution & column mapping, we would also need to rebuild the file format
// because under column mapping, the reference schema within DeltaParquetFileFormat
// that is used to populate metadata needs to be updated
if (deltaTxn.metadata.columnMappingMode != NoMapping) {
val updatedFileFormat = deltaTxn.deltaLog.fileFormat(deltaTxn.protocol, deltaTxn.metadata)
DeltaTableUtils.replaceFileFormat(transformed, updatedFileFormat)
} else {
transformed
}
}

// For each plan output column, find the corresponding target output column (by name) and
// create an alias
val aliases = plan.output.map {
case newAttrib: AttributeReference =>
val existingTargetAttrib = targetOutputColsMap.get(newAttrib.name)
.getOrElse {
throw DeltaErrors.failedFindAttributeInOutputColumns(
newAttrib.name, targetOutputCols.mkString(","))
}.asInstanceOf[AttributeReference]

if (existingTargetAttrib.exprId == newAttrib.exprId) {
// It's not valid to alias an expression to its own exprId (this is considered a
// non-unique exprId by the analyzer), so we just use the attribute directly.
newAttrib
} else {
Alias(newAttrib, existingTargetAttrib.name)(exprId = existingTargetAttrib.exprId)
}
}

Dataset.ofRows(spark, Project(aliases, plan))
}

private def getTargetOutputCols(txn: OptimisticTransaction): Seq[NamedExpression] = {
txn.metadata.schema.map { col =>
targetOutputAttributesMap
.get(col.name)
.map { a =>
AttributeReference(col.name, col.dataType, col.nullable)(a.exprId)
}
.getOrElse(Alias(Literal(null), col.name)()
)
}
}

/**
* Repartitions the output DataFrame by the partition columns if the table is partitioned
* and `merge.repartitionBeforeWrite.enabled` is set to true.
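A brief usage note for the flag mentioned above, assuming it carries the usual `spark.databricks.delta` prefix used for Delta SQL configs:

  spark.conf.set("spark.databricks.delta.merge.repartitionBeforeWrite.enabled", "true")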
