Skip data in merge based on MATCHED conditions #1851

Closed · wants to merge 2 commits
83 changes: 83 additions & 0 deletions spark/src/main/scala/org/apache/spark/sql/delta/DeltaTable.scala
@@ -29,8 +29,10 @@ import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, UnresolvedTable}
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, SessionCatalog}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke
import org.apache.spark.sql.catalyst.planning.NodeWithOnlyDeterministicProjectAndFilter
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LeafNode, LogicalPlan, Project}
import org.apache.spark.sql.catalyst.util.CharVarcharCodegenUtils
import org.apache.spark.sql.connector.expressions.{FieldReference, IdentityTransform}
import org.apache.spark.sql.execution.datasources.{FileFormat, FileIndex, HadoopFsRelation, LogicalRelation}
import org.apache.spark.sql.internal.SQLConf
@@ -325,6 +327,87 @@ object DeltaTableUtils extends PredicateHelper
}
}

/**
* Replace the file index in a logical plan and return the updated plan.
* It's a common pattern that, in Delta commands, we use data skipping to determine a subset of
* files that can be affected by the command, so we replace the whole-table file index in the
* original logical plan with a new index of potentially affected files, while everything else in
* the original plan, e.g., resolved references, remains unchanged.
*
* Many Delta meta-queries involve nondeterministic functions, which interfere with automatic
* column pruning, so columns can be manually pruned from the new scan. Note that partition
* columns can never be dropped even if they're not referenced in the rest of the query.
*
* @param spark the spark session to use
* @param target the logical plan in which we replace the file index
* @param fileIndex the new file index
* @param columnsToDrop columns to drop from the scan
* @param newOutput If specified, new logical output to replace the current LogicalRelation.
* Used for schema evolution to produce the new schema-evolved types from
* old files, because `target` will have the old types.
*/
def replaceFileIndex(
spark: SparkSession,
target: LogicalPlan,
fileIndex: FileIndex,
columnsToDrop: Seq[String],
newOutput: Option[Seq[AttributeReference]]): LogicalPlan = {
val resolver = spark.sessionState.analyzer.resolver

var actualNewOutput = newOutput
var hasChar = false
var newTarget = target transformDown {
case l @ LogicalRelation(hfsr: HadoopFsRelation, _, _, _) =>
// Prune columns from the scan.
val finalOutput = actualNewOutput.getOrElse(l.output).filterNot { col =>
columnsToDrop.exists(resolver(_, col.name))
}

// If the output columns were changed e.g. by schema evolution, we need to update
// the relation to expose all the columns that are expected after schema evolution.
val newDataSchema = StructType(finalOutput.map(attr =>
StructField(attr.name, attr.dataType, attr.nullable, attr.metadata)))
val newBaseRelation = hfsr.copy(
location = fileIndex, dataSchema = newDataSchema)(
hfsr.sparkSession)
l.copy(relation = newBaseRelation, output = finalOutput)

case p @ Project(projectList, _) =>
// Spark does char type read-side padding via an additional Project over the scan node.
// Since `newOutput` references the Project attributes, we need to translate their expression IDs
// so that `newOutput` references attributes from the LogicalRelation instead.
def hasCharPadding(e: Expression): Boolean = e.exists {
case s: StaticInvoke => s.staticObject == classOf[CharVarcharCodegenUtils] &&
s.functionName == "readSidePadding"
case _ => false
}
val charColMapping = AttributeMap(projectList.collect {
case a: Alias if hasCharPadding(a.child) && a.references.size == 1 =>
hasChar = true
val tableCol = a.references.head.asInstanceOf[AttributeReference]
a.toAttribute -> tableCol
})
actualNewOutput = newOutput.map(_.map { attr =>
charColMapping.get(attr).map { tableCol =>
attr.withExprId(tableCol.exprId)
}.getOrElse(attr)
})
p
}

if (hasChar) {
// When char type read-side padding is applied, we need to apply column pruning for the
// Project as well, otherwise the Project would reference attributes missing from its child.
newTarget = newTarget.transformUp {
case p @ Project(projectList, child) =>
val newProjectList = projectList.filter { e =>
e.references.subsetOf(child.outputSet)
}
p.copy(projectList = newProjectList)
}
}
newTarget
}
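
For context, a minimal usage sketch of this helper (not part of the diff): a Delta command narrows the scan to the files selected by data skipping and prunes a column it never reads. The identifiers deltaTxn, targetPlan, targetOnlyPredicates and the dropped column name are hypothetical, and the TahoeBatchFileIndex construction mirrors other Delta commands and may differ across versions.

// Hypothetical sketch only; assumes a surrounding Delta command with an OptimisticTransaction in scope.
val filesToScan = deltaTxn.filterFiles(targetOnlyPredicates)   // subset selected by data skipping
val prunedIndex = new TahoeBatchFileIndex(                     // built as in other Delta commands
  spark, "merge", filesToScan, deltaTxn.deltaLog, deltaTxn.deltaLog.dataPath, deltaTxn.snapshot)
val newPlan = DeltaTableUtils.replaceFileIndex(
  spark,
  target = targetPlan,            // analyzed plan over the full target table
  fileIndex = prunedIndex,
  columnsToDrop = Seq("notes"),   // a column the command never reads; partition columns are kept
  newOutput = None)               // only set when schema evolution changes the output types
val targetDF = Dataset.ofRows(spark, newPlan)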

/**
* Update FileFormat for a plan and return the updated plan
@@ -25,17 +25,14 @@ import org.apache.spark.sql.delta.files._
import org.apache.spark.sql.delta.schema.{ImplicitMetadataOperation, SchemaUtils}
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.util.{AnalysisHelper, SetAccumulator}
import com.fasterxml.jackson.databind.annotation.JsonDeserialize

import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder}
import org.apache.spark.sql.catalyst.expressions.{Alias, And, Attribute, AttributeReference, BasePredicate, Expression, Literal, NamedExpression, PredicateHelper, UnsafeProjection}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{DataTypes, LongType, StructType}

Expand Down Expand Up @@ -75,7 +72,6 @@ case class MergeIntoCommand(
notMatchedClauses: Seq[DeltaMergeIntoNotMatchedClause],
notMatchedBySourceClauses: Seq[DeltaMergeIntoNotMatchedBySourceClause],
migratedSchema: Option[StructType]) extends MergeIntoCommandBase
with PredicateHelper
with AnalysisHelper
with ImplicitMetadataOperation {

@@ -92,26 +88,9 @@
AttributeReference("num_deleted_rows", LongType)(),
AttributeReference("num_inserted_rows", LongType)())

/**
* Map to get target output attributes by name.
* The case sensitivity of the map is set accordingly to Spark configuration.
*/
@transient private lazy val targetOutputAttributesMap: Map[String, Attribute] = {
val attrMap: Map[String, Attribute] = target
.outputSet.view
.map(attr => attr.name -> attr).toMap
if (conf.caseSensitiveAnalysis) {
attrMap
} else {
CaseInsensitiveMap(attrMap)
}
}

/** Whether this merge statement has only a single insert (NOT MATCHED) clause. */
private def isSingleInsertOnly: Boolean =
matchedClauses.isEmpty && notMatchedBySourceClauses.isEmpty && notMatchedClauses.length == 1
/** Whether this merge statement has no insert (NOT MATCHED) clause. */
private def hasNoInserts: Boolean = notMatchedClauses.isEmpty

// We over-count numTargetRowsDeleted when there are multiple matches;
// this is the amount of the overcount, so we can subtract it to get a correct final metric.
@@ -217,29 +196,30 @@
status = "MERGE operation - scanning files for matches",
sqlMetricName = "scanTimeMs") {

val columnComparator = spark.sessionState.analyzer.resolver

// Accumulator to collect all the distinct touched files
val touchedFilesAccum = new SetAccumulator[String]()
spark.sparkContext.register(touchedFilesAccum, TOUCHED_FILES_ACCUM_NAME)

// UDF to record touched file names and add them to the accumulator
val recordTouchedFileName = DeltaUDF.intFromString { fileName =>
touchedFilesAccum.add(fileName)
1
}.asNondeterministic()
val recordTouchedFileName =
DeltaUDF.intFromStringBoolean { (fileName, shouldRecord) => {
if (shouldRecord) {
touchedFilesAccum.add(fileName)
}
1
}}.asNondeterministic()

// Prune non-matching files if we don't need to collect them for NOT MATCHED BY SOURCE clauses.
val dataSkippedFiles =
if (notMatchedBySourceClauses.isEmpty) {
val targetOnlyPredicates =
splitConjunctivePredicates(condition).filter(_.references.subsetOf(target.outputSet))
deltaTxn.filterFiles(targetOnlyPredicates)
deltaTxn.filterFiles(getTargetOnlyPredicates(spark))
} else {
deltaTxn.filterFiles()
}

val incrSourceRowCountExpr = incrementMetricAndReturnBool("numSourceRows", valueToReturn = true)
val sourceDF = getSourceDF()
.filter(new Column(incrSourceRowCountExpr))

// Join the source and target table using the merge condition to find touched files. An inner
// join collects all candidate files for MATCHED clauses, a right outer join also includes
@@ -249,14 +229,49 @@
// target row is modified by multiple source rows or not
// - the target file name the row is from to later identify the files touched by matched rows
val joinType = if (notMatchedBySourceClauses.isEmpty) "inner" else "right_outer"
val targetDF = buildTargetPlanWithFiles(spark, deltaTxn, dataSkippedFiles)

// When there are only MATCHED clauses, we prune, after the join, the files that have no rows that
// satisfy any of the clause conditions.
val matchedPredicate =
if (isMatchedOnly) {
Contributor:

A common scenario is:

MERGE INTO .... t
USING .... s
   ON s.id = t.id
   WHEN MATCHED AND NOT (s.name <=> t.name) THEN UPDATE SET t.name = s.name
   WHEN NOT MATCHED THEN INSERT (id, name) VALUES (s.id, s.name)

In that case the optimization won't work because it has a WHEN NOT MATCHED clause?

Collaborator (author):

We can't skip data when reading the target in this case: let's assume there's a row {"id": 10, "name": "Felipe"} in both the source and target tables, i.e. it's a match but it won't be updated by the first MATCHED clause.

If we apply NOT (s.name <=> t.name) as a matched predicate here to prune files, then the file that contains that row in the target table may get pruned from the list of modified files. It won't be included in the join in writeAllChanges and the corresponding row in the source won't have a match anymore and will be wrongly inserted.

Contributor:

Thanks for clarifying. Do you know whether, in this case, we rewrite the parquet file (with the same data, given the matched predicate is false)?

I'm asking because users can have large tables that are constantly updated using merge, and even if source and target are identical, we would rewrite all the data files. This is what I meant in #1812.

Collaborator (author):

Yes, we would rewrite all files where at least one row matches the ON condition. I imagine that would be more complex to implement than the data skipping I'm adding here.
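
For contrast with the scenario discussed above, a hypothetical matched-only merge where the skipping introduced in this PR does apply (table and column names are illustrative):

MERGE INTO target t
USING source s
   ON s.id = t.id
   WHEN MATCHED AND NOT (s.name <=> t.name) THEN UPDATE SET t.name = s.name

With no NOT MATCHED or NOT MATCHED BY SOURCE clauses, a target file whose matched rows all satisfy s.name <=> t.name has nothing to update, so it can be excluded from the rewrite without affecting any other clause.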

matchedClauses
.map(clause => clause.condition.getOrElse(Literal(true)))
.reduce((a, b) => Or(a, b))
} else Literal(true)

// Compute the columns needed for the inner join.
val targetColsNeeded = {
condition.references.map(_.name) ++ deltaTxn.snapshot.metadata.partitionColumns ++
matchedPredicate.references.map(_.name)
}

val columnsToDrop = deltaTxn.snapshot.metadata.schema.map(_.name)
.filterNot { field =>
targetColsNeeded.exists { name => columnComparator(name, field) }
}

// We can't use filter() directly on the expression because that will prevent
// column pruning. We don't need the SOURCE_ROW_PRESENT_COL so we immediately drop it.
val sourceDF = getSourceDF()
.withColumn(SOURCE_ROW_PRESENT_COL, new Column(incrSourceRowCountExpr))
.filter(SOURCE_ROW_PRESENT_COL)
.drop(SOURCE_ROW_PRESENT_COL)
val targetPlan =
buildTargetPlanWithFiles(
spark,
deltaTxn,
dataSkippedFiles,
columnsToDrop)
val targetDF = Dataset.ofRows(spark, targetPlan)
.withColumn(ROW_ID_COL, monotonically_increasing_id())
.withColumn(FILE_NAME_COL, input_file_name())

val joinToFindTouchedFiles = sourceDF.join(targetDF, new Column(condition), joinType)

// Process the matches from the inner join to record touched files and find multiple matches
val collectTouchedFiles = joinToFindTouchedFiles
.select(col(ROW_ID_COL), recordTouchedFileName(col(FILE_NAME_COL)).as("one"))
.select(col(ROW_ID_COL),
recordTouchedFileName(col(FILE_NAME_COL), new Column(matchedPredicate)).as("one"))

// Calculate frequency of matches per source row
val matchedRowCounts = collectTouchedFiles.groupBy(ROW_ID_COL).agg(sum("one").as("count"))
@@ -368,8 +383,12 @@
val dataSkippedFiles = deltaTxn.filterFiles(targetOnlyPredicates)

// target DataFrame
val targetDF = buildTargetPlanWithFiles(spark, deltaTxn, dataSkippedFiles)

val targetPlan = buildTargetPlanWithFiles(
spark,
deltaTxn,
dataSkippedFiles,
columnsToDrop = Nil)
val targetDF = Dataset.ofRows(spark, targetPlan)
val insertDf = sourceDF.join(targetDF, new Column(condition), "leftanti")
.select(outputCols: _*)
.filter(new Column(incrInsertedCountExpr))
@@ -417,14 +436,16 @@
deltaTxn: OptimisticTransaction,
filesToRewrite: Seq[AddFile])
: Seq[FileAction] = recordMergeOperation(
extraOpType = "writeAllChanges",
extraOpType =
if (shouldOptimizeMatchedOnlyMerge(spark)) "writeAllUpdatesAndDeletes"
else "writeAllChanges",
status = s"MERGE operation - Rewriting ${filesToRewrite.size} files",
sqlMetricName = "rewriteTimeMs") {
import org.apache.spark.sql.catalyst.expressions.Literal.{TrueLiteral, FalseLiteral}

val cdcEnabled = DeltaConfigs.CHANGE_DATA_FEED.fromMetaData(deltaTxn.metadata)

var targetOutputCols = getTargetOutputCols(deltaTxn)
var targetOutputCols = getTargetOutputCols(deltaTxn, makeNullable = true)
var outputRowSchema = deltaTxn.metadata.schema

// When we have duplicate matches (only allowed when the whenMatchedCondition is a delete with
@@ -451,9 +472,13 @@

// Generate a new target dataframe that has same output attributes exprIds as the target plan.
// This allows us to apply the existing resolved update/insert expressions.
val baseTargetDF = buildTargetPlanWithFiles(spark, deltaTxn, filesToRewrite)
val joinType = if (hasNoInserts &&
spark.conf.get(DeltaSQLConf.MERGE_MATCHED_ONLY_ENABLED)) {
val targetPlan = buildTargetPlanWithFiles(
spark,
deltaTxn,
filesToRewrite,
columnsToDrop = Nil)
val baseTargetDF = Dataset.ofRows(spark, targetPlan)
val joinType = if (shouldOptimizeMatchedOnlyMerge(spark)) {
"rightOuter"
} else {
"fullOuter"
Expand Down Expand Up @@ -725,87 +750,6 @@ case class MergeIntoCommand(
newFiles
}


/**
* Build a new logical plan using the given `files` that has the same output columns (exprIds)
* as the `target` logical plan, so that existing update/insert expressions can be applied
* on this new plan.
*/
private def buildTargetPlanWithFiles(
spark: SparkSession,
deltaTxn: OptimisticTransaction,
files: Seq[AddFile]): DataFrame = {
val targetOutputCols = getTargetOutputCols(deltaTxn)
val targetOutputColsMap = {
val colsMap: Map[String, NamedExpression] = targetOutputCols.view
.map(col => col.name -> col).toMap
if (conf.caseSensitiveAnalysis) {
colsMap
} else {
CaseInsensitiveMap(colsMap)
}
}

val plan = {
// We have to do surgery to use the attributes from `targetOutputCols` to scan the table.
// In cases of schema evolution, they may not be the same type as the original attributes.
val original =
deltaTxn.deltaLog.createDataFrame(deltaTxn.snapshot, files).queryExecution.analyzed
val transformed = original.transform {
case LogicalRelation(base, output, catalogTbl, isStreaming) =>
LogicalRelation(
base,
// We can ignore the new columns which aren't yet AttributeReferences.
targetOutputCols.collect { case a: AttributeReference => a },
catalogTbl,
isStreaming)
}

// In case of schema evolution & column mapping, we would also need to rebuild the file format
// because under column mapping, the reference schema within DeltaParquetFileFormat
// that is used to populate metadata needs to be updated
if (deltaTxn.metadata.columnMappingMode != NoMapping) {
val updatedFileFormat = deltaTxn.deltaLog.fileFormat(deltaTxn.protocol, deltaTxn.metadata)
DeltaTableUtils.replaceFileFormat(transformed, updatedFileFormat)
} else {
transformed
}
}

// For each plan output column, find the corresponding target output column (by name) and
// create an alias
val aliases = plan.output.map {
case newAttrib: AttributeReference =>
val existingTargetAttrib = targetOutputColsMap.get(newAttrib.name)
.getOrElse {
throw DeltaErrors.failedFindAttributeInOutputColumns(
newAttrib.name, targetOutputCols.mkString(","))
}.asInstanceOf[AttributeReference]

if (existingTargetAttrib.exprId == newAttrib.exprId) {
// It's not valid to alias an expression to its own exprId (this is considered a
// non-unique exprId by the analyzer), so we just use the attribute directly.
newAttrib
} else {
Alias(newAttrib, existingTargetAttrib.name)(exprId = existingTargetAttrib.exprId)
}
}

Dataset.ofRows(spark, Project(aliases, plan))
}

private def getTargetOutputCols(txn: OptimisticTransaction): Seq[NamedExpression] = {
txn.metadata.schema.map { col =>
targetOutputAttributesMap
.get(col.name)
.map { a =>
AttributeReference(col.name, col.dataType, col.nullable)(a.exprId)
}
.getOrElse(Alias(Literal(null), col.name)()
)
}
}

/**
* Repartitions the output DataFrame by the partition columns if table is partitioned
* and `merge.repartitionBeforeWrite.enabled` is set to true.