diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/commands/MergeIntoCommand.scala b/spark/src/main/scala/org/apache/spark/sql/delta/commands/MergeIntoCommand.scala index 368d005fc7a..4743be6b0dc 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/commands/MergeIntoCommand.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/commands/MergeIntoCommand.scala @@ -16,26 +16,15 @@ package org.apache.spark.sql.delta.commands -import scala.collection.JavaConverters._ - import org.apache.spark.sql.delta._ -import org.apache.spark.sql.delta.actions.{AddCDCFile, AddFile, FileAction} -import org.apache.spark.sql.delta.commands.MergeIntoCommandBase.ROW_DROPPED_COL -import org.apache.spark.sql.delta.commands.merge.InsertOnlyMergeExecutor +import org.apache.spark.sql.delta.commands.merge.{ClassicMergeExecutor, InsertOnlyMergeExecutor} import org.apache.spark.sql.delta.files._ -import org.apache.spark.sql.delta.schema.{ImplicitMetadataOperation, SchemaUtils} import org.apache.spark.sql.delta.sources.DeltaSQLConf -import org.apache.spark.sql.delta.util.{AnalysisHelper, SetAccumulator} import org.apache.spark.sql._ -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute -import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder} import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.expressions.codegen._ import org.apache.spark.sql.catalyst.plans.logical._ -import org.apache.spark.sql.functions._ -import org.apache.spark.sql.types.{DataTypes, LongType, StructType} +import org.apache.spark.sql.types.{LongType, StructType} /** * Performs a merge of a source query/table into a Delta table. @@ -47,7 +36,7 @@ import org.apache.spark.sql.types.{DataTypes, LongType, StructType} * * Phase 1: Find the input files in target that are touched by the rows that satisfy * the condition and verify that no two source rows match with the same target row. - * This is implemented as an inner-join using the given condition. See [[findTouchedFiles]] + * This is implemented as an inner-join using the given condition. See [[ClassicMergeExecutor]] * for more details. * * Phase 2: Read the touched files again and write new files with updated and/or inserted rows. @@ -73,17 +62,8 @@ case class MergeIntoCommand( notMatchedClauses: Seq[DeltaMergeIntoNotMatchedClause], notMatchedBySourceClauses: Seq[DeltaMergeIntoNotMatchedBySourceClause], migratedSchema: Option[StructType]) extends MergeIntoCommandBase - with AnalysisHelper with InsertOnlyMergeExecutor - with ImplicitMetadataOperation { - - import MergeIntoCommand._ - import MergeIntoCommandBase.totalBytesAndDistinctPartitionValues - - import org.apache.spark.sql.delta.commands.cdc.CDCReader._ - - override val canMergeSchema: Boolean = conf.getConf(DeltaSQLConf.DELTA_SCHEMA_AUTO_MIGRATE) - override val canOverwriteSchema: Boolean = false + with ClassicMergeExecutor { override val output: Seq[Attribute] = Seq( AttributeReference("num_affected_rows", LongType)(), @@ -91,38 +71,6 @@ case class MergeIntoCommand( AttributeReference("num_deleted_rows", LongType)(), AttributeReference("num_inserted_rows", LongType)()) - // We over-count numTargetRowsDeleted when there are multiple matches; - // this is the amount of the overcount, so we can subtract it to get a correct final metric. 
- private var multipleMatchDeleteOnlyOvercount: Option[Long] = None - override def run(spark: SparkSession): Seq[Row] = { - metrics("executionTimeMs").set(0) - metrics("scanTimeMs").set(0) - metrics("rewriteTimeMs").set(0) - - if (migratedSchema.isDefined) { - // Block writes of void columns in the Delta log. Currently void columns are not properly - // supported and are dropped on read, but this is not enough for merge command that is also - // reading the schema from the Delta log. Until proper support we prefer to fail merge - // queries that add void columns. - val newNullColumn = SchemaUtils.findNullTypeColumn(migratedSchema.get) - if (newNullColumn.isDefined) { - throw new AnalysisException( - s"""Cannot add column '${newNullColumn.get}' with type 'void'. Please explicitly specify a - |non-void type.""".stripMargin.replaceAll("\n", " ") - ) - } - } - val (materializeSource, _) = shouldMaterializeSource(spark, source, isInsertOnly) - if (!materializeSource) { - runMerge(spark) - } else { - // If it is determined that source should be materialized, wrap the execution with retries, - // in case the data of the materialized source is lost. - runWithMaterializedSourceLostRetries( - spark, targetFileIndex.deltaLog, metrics, runMerge) - } - } - protected def runMerge(spark: SparkSession): Seq[Row] = { recordDeltaOperation(targetDeltaLog, "delta.dml.merge") { val startTime = System.nanoTime() @@ -160,10 +108,10 @@ case class MergeIntoCommand( writeOnlyInserts( spark, deltaTxn, filterMatchedRows = true, numSourceRowsMetric = "numSourceRows") } else { - val filesToRewrite = findTouchedFiles(spark, deltaTxn) + val (filesToRewrite, deduplicateCDFDeletes) = findTouchedFiles(spark, deltaTxn) if (filesToRewrite.nonEmpty) { val newWrittenFiles = withStatusCode("DELTA", "Writing merged data") { - writeAllChanges(spark, deltaTxn, filesToRewrite) + writeAllChanges(spark, deltaTxn, filesToRewrite, deduplicateCDFDeletes) } filesToRewrite.map(_.remove) ++ newWrittenFiles } else { @@ -193,621 +141,4 @@ case class MergeIntoCommand( metrics("numTargetRowsInserted").value, metrics("numTargetRowsUpdated").value, metrics("numTargetRowsDeleted").value, metrics("numTargetRowsInserted").value)) } - - /** - * Find the target table files that contain the rows that satisfy the merge condition. This is - * implemented as an inner-join between the source query/table and the target table using - * the merge condition. - */ - private def findTouchedFiles( - spark: SparkSession, - deltaTxn: OptimisticTransaction) - : Seq[AddFile] = recordMergeOperation( - extraOpType = "findTouchedFiles", - status = "MERGE operation - scanning files for matches", - sqlMetricName = "scanTimeMs") { - - val columnComparator = spark.sessionState.analyzer.resolver - - // Accumulator to collect all the distinct touched files - val touchedFilesAccum = new SetAccumulator[String]() - spark.sparkContext.register(touchedFilesAccum, TOUCHED_FILES_ACCUM_NAME) - - // UDFs to records touched files names and add them to the accumulator - val recordTouchedFileName = - DeltaUDF.intFromStringBoolean { (fileName, shouldRecord) => { - if (shouldRecord) { - touchedFilesAccum.add(fileName) - } - 1 - }}.asNondeterministic() - - // Prune non-matching files if we don't need to collect them for NOT MATCHED BY SOURCE clauses. 
- val dataSkippedFiles = - if (notMatchedBySourceClauses.isEmpty) { - deltaTxn.filterFiles(getTargetOnlyPredicates(spark)) - } else { - deltaTxn.filterFiles() - } - - val incrSourceRowCountExpr = incrementMetricAndReturnBool("numSourceRows", valueToReturn = true) - - // Join the source and target table using the merge condition to find touched files. An inner - // join collects all candidate files for MATCHED clauses, a right outer join also includes - // candidates for NOT MATCHED BY SOURCE clauses. - // In addition, we attach two columns - // - a monotonically increasing row id for target rows to later identify whether the same - // target row is modified by multiple user or not - // - the target file name the row is from to later identify the files touched by matched rows - val joinType = if (notMatchedBySourceClauses.isEmpty) "inner" else "right_outer" - - // When they are only MATCHED clauses, after the join we prune files that have no rows that - // satisfy any of the clause conditions. - val matchedPredicate = - if (isMatchedOnly) { - matchedClauses - .map(clause => clause.condition.getOrElse(Literal(true))) - .reduce((a, b) => Or(a, b)) - } else Literal(true) - - // Compute the columns needed for the inner join. - val targetColsNeeded = { - condition.references.map(_.name) ++ deltaTxn.snapshot.metadata.partitionColumns ++ - matchedPredicate.references.map(_.name) - } - - val columnsToDrop = deltaTxn.snapshot.metadata.schema.map(_.name) - .filterNot { field => - targetColsNeeded.exists { name => columnComparator(name, field) } - } - - // We can't use filter() directly on the expression because that will prevent - // column pruning. We don't need the SOURCE_ROW_PRESENT_COL so we immediately drop it. - val sourceDF = getSourceDF() - .withColumn(SOURCE_ROW_PRESENT_COL, new Column(incrSourceRowCountExpr)) - .filter(SOURCE_ROW_PRESENT_COL) - .drop(SOURCE_ROW_PRESENT_COL) - val targetPlan = - buildTargetPlanWithFiles( - spark, - deltaTxn, - dataSkippedFiles, - columnsToDrop) - val targetDF = Dataset.ofRows(spark, targetPlan) - .withColumn(ROW_ID_COL, monotonically_increasing_id()) - .withColumn(FILE_NAME_COL, input_file_name()) - - val joinToFindTouchedFiles = sourceDF.join(targetDF, new Column(condition), joinType) - - // Process the matches from the inner join to record touched files and find multiple matches - val collectTouchedFiles = joinToFindTouchedFiles - .select(col(ROW_ID_COL), - recordTouchedFileName(col(FILE_NAME_COL), new Column(matchedPredicate)).as("one")) - - // Calculate frequency of matches per source row - val matchedRowCounts = collectTouchedFiles.groupBy(ROW_ID_COL).agg(sum("one").as("count")) - - // Get multiple matches and simultaneously collect (using touchedFilesAccum) the file names - // multipleMatchCount = # of target rows with more than 1 matching source row (duplicate match) - // multipleMatchSum = total # of duplicate matched rows - import org.apache.spark.sql.delta.implicits._ - val (multipleMatchCount, multipleMatchSum) = matchedRowCounts - .filter("count > 1") - .select(coalesce(count(new Column("*")), lit(0)), coalesce(sum("count"), lit(0))) - .as[(Long, Long)] - .collect() - .head - - val hasMultipleMatches = multipleMatchCount > 0 - - // Throw error if multiple matches are ambiguous or cannot be computed correctly. - val canBeComputedUnambiguously = { - // Multiple matches are not ambiguous when there is only one unconditional delete as - // all the matched row pairs in the 2nd join in `writeAllChanges` will get deleted. 
- val isUnconditionalDelete = matchedClauses.headOption match { - case Some(DeltaMergeIntoMatchedDeleteClause(None)) => true - case _ => false - } - matchedClauses.size == 1 && isUnconditionalDelete - } - - if (hasMultipleMatches && !canBeComputedUnambiguously) { - throw DeltaErrors.multipleSourceRowMatchingTargetRowInMergeException(spark) - } - - if (hasMultipleMatches) { - // This is only allowed for delete-only queries. - // This query will count the duplicates for numTargetRowsDeleted in Job 2, - // because we count matches after the join and not just the target rows. - // We have to compensate for this by subtracting the duplicates later, - // so we need to record them here. - val duplicateCount = multipleMatchSum - multipleMatchCount - multipleMatchDeleteOnlyOvercount = Some(duplicateCount) - } - - // Get the AddFiles using the touched file names. - val touchedFileNames = touchedFilesAccum.value.iterator().asScala.toSeq - logTrace(s"findTouchedFiles: matched files:\n\t${touchedFileNames.mkString("\n\t")}") - - val nameToAddFileMap = generateCandidateFileMap(targetDeltaLog.dataPath, dataSkippedFiles) - val touchedAddFiles = touchedFileNames.map(f => - getTouchedFile(targetDeltaLog.dataPath, f, nameToAddFileMap)) - - // When the target table is empty, and the optimizer optimized away the join entirely - // numSourceRows will be incorrectly 0. We need to scan the source table once to get the correct - // metric here. - if (metrics("numSourceRows").value == 0 && - (dataSkippedFiles.isEmpty || targetDF.take(1).isEmpty)) { - val numSourceRows = sourceDF.count() - metrics("numSourceRows").set(numSourceRows) - } - - // Update metrics - metrics("numTargetFilesBeforeSkipping") += deltaTxn.snapshot.numOfFiles - metrics("numTargetBytesBeforeSkipping") += deltaTxn.snapshot.sizeInBytes - val (afterSkippingBytes, afterSkippingPartitions) = - totalBytesAndDistinctPartitionValues(dataSkippedFiles) - metrics("numTargetFilesAfterSkipping") += dataSkippedFiles.size - metrics("numTargetBytesAfterSkipping") += afterSkippingBytes - metrics("numTargetPartitionsAfterSkipping") += afterSkippingPartitions - val (removedBytes, removedPartitions) = totalBytesAndDistinctPartitionValues(touchedAddFiles) - metrics("numTargetFilesRemoved") += touchedAddFiles.size - metrics("numTargetBytesRemoved") += removedBytes - metrics("numTargetPartitionsRemovedFrom") += removedPartitions - touchedAddFiles - } - - /** - * Write new files by reading the touched files and updating/inserting data using the source - * query/table. This is implemented using a full|right-outer-join using the merge condition. - * - * Note that unlike the insert-only code paths with just one control column INCR_ROW_COUNT_COL, - * this method has two additional control columns ROW_DROPPED_COL for dropping deleted rows and - * CDC_TYPE_COL_NAME used for handling CDC when enabled. 
- */ - private def writeAllChanges( - spark: SparkSession, - deltaTxn: OptimisticTransaction, - filesToRewrite: Seq[AddFile]) - : Seq[FileAction] = recordMergeOperation( - extraOpType = - if (shouldOptimizeMatchedOnlyMerge(spark)) "writeAllUpdatesAndDeletes" - else "writeAllChanges", - status = s"MERGE operation - Rewriting ${filesToRewrite.size} files", - sqlMetricName = "rewriteTimeMs") { - import org.apache.spark.sql.catalyst.expressions.Literal.{TrueLiteral, FalseLiteral} - - val cdcEnabled = DeltaConfigs.CHANGE_DATA_FEED.fromMetaData(deltaTxn.metadata) - - var targetOutputCols = getTargetOutputCols(deltaTxn, makeNullable = true) - var outputRowSchema = deltaTxn.metadata.schema - - // When we have duplicate matches (only allowed when the whenMatchedCondition is a delete with - // no match condition) we will incorrectly generate duplicate CDC rows. - // Duplicate matches can be due to: - // - Duplicate rows in the source w.r.t. the merge condition - // - A target-only or source-only merge condition, which essentially turns our join into a cross - // join with the target/source satisfiying the merge condition. - // These duplicate matches are dropped from the main data output since this is a delete - // operation, but the duplicate CDC rows are not removed by default. - // See https://github.com/delta-io/delta/issues/1274 - - // We address this specific scenario by adding row ids to the target before performing our join. - // There should only be one CDC delete row per target row so we can use these row ids to dedupe - // the duplicate CDC delete rows. - - // We also need to address the scenario when there are duplicate matches with delete and we - // insert duplicate rows. Here we need to additionally add row ids to the source before the - // join to avoid dropping these valid duplicate inserted rows and their corresponding cdc rows. - - // When there is an insert clause, we set SOURCE_ROW_ID_COL=null for all delete rows because we - // need to drop the duplicate matches. - val isDeleteWithDuplicateMatchesAndCdc = multipleMatchDeleteOnlyOvercount.nonEmpty && cdcEnabled - - // Generate a new target dataframe that has same output attributes exprIds as the target plan. - // This allows us to apply the existing resolved update/insert expressions. 
- val targetPlan = buildTargetPlanWithFiles( - spark, - deltaTxn, - filesToRewrite, - columnsToDrop = Nil) - val baseTargetDF = Dataset.ofRows(spark, targetPlan) - val joinType = if (shouldOptimizeMatchedOnlyMerge(spark)) { - "rightOuter" - } else { - "fullOuter" - } - - logDebug(s"""writeAllChanges using $joinType join: - | source.output: ${source.outputSet} - | target.output: ${target.outputSet} - | condition: $condition - | newTarget.output: ${baseTargetDF.queryExecution.logical.outputSet} - """.stripMargin) - - // Expressions to update metrics - val incrSourceRowCountExpr = - incrementMetricAndReturnBool("numSourceRowsInSecondScan", valueToReturn = true) - val incrUpdatedCountExpr = - incrementMetricAndReturnBool("numTargetRowsUpdated", valueToReturn = true) - val incrUpdatedMatchedCountExpr = - incrementMetricAndReturnBool("numTargetRowsMatchedUpdated", valueToReturn = true) - val incrUpdatedNotMatchedBySourceCountExpr = - incrementMetricAndReturnBool("numTargetRowsNotMatchedBySourceUpdated", valueToReturn = true) - val incrInsertedCountExpr = - incrementMetricAndReturnBool("numTargetRowsInserted", valueToReturn = true) - val incrNoopCountExpr = - incrementMetricAndReturnBool("numTargetRowsCopied", valueToReturn = true) - val incrDeletedCountExpr = - incrementMetricAndReturnBool("numTargetRowsDeleted", valueToReturn = true) - val incrDeletedMatchedCountExpr = - incrementMetricAndReturnBool("numTargetRowsMatchedDeleted", valueToReturn = true) - val incrDeletedNotMatchedBySourceCountExpr = - incrementMetricAndReturnBool("numTargetRowsNotMatchedBySourceDeleted", valueToReturn = true) - - // Apply an outer join to find both, matches and non-matches. We are adding two boolean fields - // with value `true`, one to each side of the join. Whether this field is null or not after - // the outer join, will allow us to identify whether the resultant joined row was a - // matched inner result or an unmatched result with null on one side. - // We add row IDs to the targetDF if we have a delete-when-matched clause with duplicate - // matches and CDC is enabled, and additionally add row IDs to the source if we also have an - // insert clause. See above at isDeleteWithDuplicateMatchesAndCdc definition for more details. 
- var sourceDF = getSourceDF() - .withColumn(SOURCE_ROW_PRESENT_COL, new Column(incrSourceRowCountExpr)) - var targetDF = baseTargetDF - .withColumn(TARGET_ROW_PRESENT_COL, lit(true)) - if (isDeleteWithDuplicateMatchesAndCdc) { - targetDF = targetDF.withColumn(TARGET_ROW_ID_COL, monotonically_increasing_id()) - if (notMatchedClauses.nonEmpty) { // insert clause - sourceDF = sourceDF.withColumn(SOURCE_ROW_ID_COL, monotonically_increasing_id()) - } - } - val joinedDF = sourceDF.join(targetDF, new Column(condition), joinType) - val joinedPlan = joinedDF.queryExecution.analyzed - - def resolveOnJoinedPlan(exprs: Seq[Expression]): Seq[Expression] = { - tryResolveReferencesForExpressions(spark, exprs, joinedPlan) - } - - // ==== Generate the expressions to process full-outer join output and generate target rows ==== - // If there are N columns in the target table, there will be N + 3 columns after processing - // - N columns for target table - // - ROW_DROPPED_COL to define whether the generated row should dropped or written - // - INCR_ROW_COUNT_COL containing an expression to update the output row row counter - // - CDC_TYPE_COLUMN_NAME containing the type of change being performed in a particular row - - // To generate these N + 3 columns, we will generate N + 3 expressions and apply them to the - // rows in the joinedDF. The CDC column will be either used for CDC generation or dropped before - // performing the final write, and the other two will always be dropped after executing the - // metrics expressions and filtering on ROW_DROPPED_COL. - - // We produce rows for both the main table data (with CDC_TYPE_COLUMN_NAME = CDC_TYPE_NOT_CDC), - // and rows for the CDC data which will be output to CDCReader.CDC_LOCATION. - // See [[CDCReader]] for general details on how partitioning on the CDC type column works. - - // In the following functions `updateOutput`, `deleteOutput` and `insertOutput`, we - // produce a Seq[Expression] for each intended output row. - // Depending on the clause and whether CDC is enabled, we output between 0 and 3 rows, as a - // Seq[Seq[Expression]] - - // There is one corner case outlined above at isDeleteWithDuplicateMatchesAndCdc definition. - // When we have a delete-ONLY merge with duplicate matches we have N + 4 columns: - // N target cols, TARGET_ROW_ID_COL, ROW_DROPPED_COL, INCR_ROW_COUNT_COL, CDC_TYPE_COLUMN_NAME - // When we have a delete-when-matched merge with duplicate matches + an insert clause, we have - // N + 5 columns: - // N target cols, TARGET_ROW_ID_COL, SOURCE_ROW_ID_COL, ROW_DROPPED_COL, INCR_ROW_COUNT_COL, - // CDC_TYPE_COLUMN_NAME - // These ROW_ID_COL will always be dropped before the final write. 
- - if (isDeleteWithDuplicateMatchesAndCdc) { - targetOutputCols = targetOutputCols :+ UnresolvedAttribute(TARGET_ROW_ID_COL) - outputRowSchema = outputRowSchema.add(TARGET_ROW_ID_COL, DataTypes.LongType) - if (notMatchedClauses.nonEmpty) { // there is an insert clause, make SRC_ROW_ID_COL=null - targetOutputCols = targetOutputCols :+ Alias(Literal(null), SOURCE_ROW_ID_COL)() - outputRowSchema = outputRowSchema.add(SOURCE_ROW_ID_COL, DataTypes.LongType) - } - } - - if (cdcEnabled) { - outputRowSchema = outputRowSchema - .add(ROW_DROPPED_COL, DataTypes.BooleanType) - .add(INCR_ROW_COUNT_COL, DataTypes.BooleanType) - .add(CDC_TYPE_COLUMN_NAME, DataTypes.StringType) - } - - def updateOutput(resolvedActions: Seq[DeltaMergeAction], incrMetricExpr: Expression) - : Seq[Seq[Expression]] = { - val updateExprs = { - // Generate update expressions and set ROW_DELETED_COL = false and - // CDC_TYPE_COLUMN_NAME = CDC_TYPE_NOT_CDC - val mainDataOutput = resolvedActions.map(_.expr) :+ FalseLiteral :+ - incrMetricExpr :+ CDC_TYPE_NOT_CDC - if (cdcEnabled) { - // For update preimage, we have do a no-op copy with ROW_DELETED_COL = false and - // CDC_TYPE_COLUMN_NAME = CDC_TYPE_UPDATE_PREIMAGE and INCR_ROW_COUNT_COL as a no-op - // (because the metric will be incremented in `mainDataOutput`) - val preImageOutput = targetOutputCols :+ FalseLiteral :+ TrueLiteral :+ - Literal(CDC_TYPE_UPDATE_PREIMAGE) - // For update postimage, we have the same expressions as for mainDataOutput but with - // INCR_ROW_COUNT_COL as a no-op (because the metric will be incremented in - // `mainDataOutput`), and CDC_TYPE_COLUMN_NAME = CDC_TYPE_UPDATE_POSTIMAGE - val postImageOutput = mainDataOutput.dropRight(2) :+ TrueLiteral :+ - Literal(CDC_TYPE_UPDATE_POSTIMAGE) - Seq(mainDataOutput, preImageOutput, postImageOutput) - } else { - Seq(mainDataOutput) - } - } - updateExprs.map(resolveOnJoinedPlan) - } - - def deleteOutput(incrMetricExpr: Expression): Seq[Seq[Expression]] = { - val deleteExprs = { - // Generate expressions to set the ROW_DELETED_COL = true and CDC_TYPE_COLUMN_NAME = - // CDC_TYPE_NOT_CDC - val mainDataOutput = targetOutputCols :+ TrueLiteral :+ incrMetricExpr :+ - CDC_TYPE_NOT_CDC - if (cdcEnabled) { - // For delete we do a no-op copy with ROW_DELETED_COL = false, INCR_ROW_COUNT_COL as a - // no-op (because the metric will be incremented in `mainDataOutput`) and - // CDC_TYPE_COLUMN_NAME = CDC_TYPE_DELETE - val deleteCdcOutput = targetOutputCols :+ FalseLiteral :+ TrueLiteral :+ CDC_TYPE_DELETE - Seq(mainDataOutput, deleteCdcOutput) - } else { - Seq(mainDataOutput) - } - } - deleteExprs.map(resolveOnJoinedPlan) - } - - def insertOutput(resolvedActions: Seq[DeltaMergeAction], incrMetricExpr: Expression) - : Seq[Seq[Expression]] = { - // Generate insert expressions and set ROW_DELETED_COL = false and - // CDC_TYPE_COLUMN_NAME = CDC_TYPE_NOT_CDC - val insertExprs = resolvedActions.map(_.expr) - val mainDataOutput = resolveOnJoinedPlan( - if (isDeleteWithDuplicateMatchesAndCdc) { - // Must be delete-when-matched merge with duplicate matches + insert clause - // Therefore we must keep the target row id and source row id. Since this is a not-matched - // clause we know the target row-id will be null. See above at - // isDeleteWithDuplicateMatchesAndCdc definition for more details. 
- insertExprs :+ - Alias(Literal(null), TARGET_ROW_ID_COL)() :+ UnresolvedAttribute(SOURCE_ROW_ID_COL) :+ - FalseLiteral :+ incrMetricExpr :+ CDC_TYPE_NOT_CDC - } else { - insertExprs :+ FalseLiteral :+ incrMetricExpr :+ CDC_TYPE_NOT_CDC - } - ) - if (cdcEnabled) { - // For insert we have the same expressions as for mainDataOutput, but with - // INCR_ROW_COUNT_COL as a no-op (because the metric will be incremented in - // `mainDataOutput`), and CDC_TYPE_COLUMN_NAME = CDC_TYPE_INSERT - val insertCdcOutput = mainDataOutput.dropRight(2) :+ TrueLiteral :+ Literal(CDC_TYPE_INSERT) - Seq(mainDataOutput, insertCdcOutput) - } else { - Seq(mainDataOutput) - } - } - - def clauseOutput(clause: DeltaMergeIntoClause): Seq[Seq[Expression]] = clause match { - case u: DeltaMergeIntoMatchedUpdateClause => - updateOutput(u.resolvedActions, And(incrUpdatedCountExpr, incrUpdatedMatchedCountExpr)) - case _: DeltaMergeIntoMatchedDeleteClause => - deleteOutput(And(incrDeletedCountExpr, incrDeletedMatchedCountExpr)) - case i: DeltaMergeIntoNotMatchedInsertClause => - insertOutput(i.resolvedActions, incrInsertedCountExpr) - case u: DeltaMergeIntoNotMatchedBySourceUpdateClause => - updateOutput( - u.resolvedActions, - And(incrUpdatedCountExpr, incrUpdatedNotMatchedBySourceCountExpr)) - case _: DeltaMergeIntoNotMatchedBySourceDeleteClause => - deleteOutput(And(incrDeletedCountExpr, incrDeletedNotMatchedBySourceCountExpr)) - } - - def clauseCondition(clause: DeltaMergeIntoClause): Expression = { - // if condition is None, then expression always evaluates to true - val condExpr = clause.condition.getOrElse(TrueLiteral) - resolveOnJoinedPlan(Seq(condExpr)).head - } - - val joinedRowEncoder = RowEncoder(joinedPlan.schema) - val outputRowEncoder = RowEncoder(outputRowSchema).resolveAndBind() - - val processor = new JoinedRowProcessor( - targetRowHasNoMatch = resolveOnJoinedPlan(Seq(col(SOURCE_ROW_PRESENT_COL).isNull.expr)).head, - sourceRowHasNoMatch = resolveOnJoinedPlan(Seq(col(TARGET_ROW_PRESENT_COL).isNull.expr)).head, - matchedConditions = matchedClauses.map(clauseCondition), - matchedOutputs = matchedClauses.map(clauseOutput), - notMatchedConditions = notMatchedClauses.map(clauseCondition), - notMatchedOutputs = notMatchedClauses.map(clauseOutput), - notMatchedBySourceConditions = notMatchedBySourceClauses.map(clauseCondition), - notMatchedBySourceOutputs = notMatchedBySourceClauses.map(clauseOutput), - noopCopyOutput = - resolveOnJoinedPlan(targetOutputCols :+ FalseLiteral :+ incrNoopCountExpr :+ - CDC_TYPE_NOT_CDC), - deleteRowOutput = - resolveOnJoinedPlan(targetOutputCols :+ TrueLiteral :+ TrueLiteral :+ CDC_TYPE_NOT_CDC), - joinedAttributes = joinedPlan.output, - joinedRowEncoder = joinedRowEncoder, - outputRowEncoder = outputRowEncoder) - - var outputDF = - Dataset.ofRows(spark, joinedPlan).mapPartitions(processor.processPartition)(outputRowEncoder) - - if (isDeleteWithDuplicateMatchesAndCdc) { - // When we have a delete when matched clause with duplicate matches we have to remove - // duplicate CDC rows. This scenario is further explained at - // isDeleteWithDuplicateMatchesAndCdc definition. - - // To remove duplicate CDC rows generated by the duplicate matches we dedupe by - // TARGET_ROW_ID_COL since there should only be one CDC delete row per target row. - // When there is an insert clause in addition to the delete clause we additionally dedupe by - // SOURCE_ROW_ID_COL and CDC_TYPE_COLUMN_NAME to avoid dropping valid duplicate inserted rows - // and their corresponding CDC rows. 
- val columnsToDedupeBy = if (notMatchedClauses.nonEmpty) { // insert clause - Seq(TARGET_ROW_ID_COL, SOURCE_ROW_ID_COL, CDC_TYPE_COLUMN_NAME) - } else { - Seq(TARGET_ROW_ID_COL) - } - outputDF = outputDF - .dropDuplicates(columnsToDedupeBy) - .drop(ROW_DROPPED_COL, INCR_ROW_COUNT_COL, TARGET_ROW_ID_COL, SOURCE_ROW_ID_COL) - } else { - outputDF = outputDF.drop(ROW_DROPPED_COL, INCR_ROW_COUNT_COL) - } - - logDebug("writeAllChanges: join output plan:\n" + outputDF.queryExecution) - - // Write to Delta - val newFiles = writeFiles(spark, deltaTxn, outputDF) - - // Update metrics - val (addedBytes, addedPartitions) = totalBytesAndDistinctPartitionValues(newFiles) - metrics("numTargetFilesAdded") += newFiles.count(_.isInstanceOf[AddFile]) - metrics("numTargetChangeFilesAdded") += newFiles.count(_.isInstanceOf[AddCDCFile]) - metrics("numTargetChangeFileBytes") += newFiles.collect{ case f: AddCDCFile => f.size }.sum - metrics("numTargetBytesAdded") += addedBytes - metrics("numTargetPartitionsAddedTo") += addedPartitions - if (multipleMatchDeleteOnlyOvercount.isDefined) { - // Compensate for counting duplicates during the query. - val actualRowsDeleted = - metrics("numTargetRowsDeleted").value - multipleMatchDeleteOnlyOvercount.get - assert(actualRowsDeleted >= 0) - metrics("numTargetRowsDeleted").set(actualRowsDeleted) - val actualRowsMatchedDeleted = - metrics("numTargetRowsMatchedDeleted").value - multipleMatchDeleteOnlyOvercount.get - assert(actualRowsMatchedDeleted >= 0) - metrics("numTargetRowsMatchedDeleted").set(actualRowsMatchedDeleted) - } - - newFiles - } -} - -object MergeIntoCommand { - /** - * Spark UI will track all normal accumulators along with Spark tasks to show them on Web UI. - * However, the accumulator used by `MergeIntoCommand` can store a very large value since it - * tracks all files that need to be rewritten. We should ask Spark UI to not remember it, - * otherwise, the UI data may consume lots of memory. Hence, we use the prefix `internal.metrics.` - * to make this accumulator become an internal accumulator, so that it will not be tracked by - * Spark UI. - */ - val TOUCHED_FILES_ACCUM_NAME = "internal.metrics.MergeIntoDelta.touchedFiles" - - val ROW_ID_COL = "_row_id_" - val TARGET_ROW_ID_COL = "_target_row_id_" - val SOURCE_ROW_ID_COL = "_source_row_id_" - val FILE_NAME_COL = "_file_name_" - val SOURCE_ROW_PRESENT_COL = "_source_row_present_" - val TARGET_ROW_PRESENT_COL = "_target_row_present_" - val INCR_ROW_COUNT_COL = "_incr_row_count_" - - /** - * @param targetRowHasNoMatch whether a joined row is a target row with no match in the source - * table - * @param sourceRowHasNoMatch whether a joined row is a source row with no match in the target - * table - * @param matchedConditions condition for each match clause - * @param matchedOutputs corresponding output for each match clause. for each clause, we - * have 1-3 output rows, each of which is a sequence of expressions - * to apply to the joined row - * @param notMatchedConditions condition for each not-matched clause - * @param notMatchedOutputs corresponding output for each not-matched clause. for each clause, - * we have 1-2 output rows, each of which is a sequence of - * expressions to apply to the joined row - * @param notMatchedBySourceConditions condition for each not-matched-by-source clause - * @param notMatchedBySourceOutputs corresponding output for each not-matched-by-source - * clause. 
for each clause, we have 1-3 output rows, each of - * which is a sequence of expressions to apply to the joined - * row - * @param noopCopyOutput no-op expression to copy a target row to the output - * @param deleteRowOutput expression to drop a row from the final output. this is used for - * source rows that don't match any not-matched clauses - * @param joinedAttributes schema of our outer-joined dataframe - * @param joinedRowEncoder joinedDF row encoder - * @param outputRowEncoder final output row encoder - */ - class JoinedRowProcessor( - targetRowHasNoMatch: Expression, - sourceRowHasNoMatch: Expression, - matchedConditions: Seq[Expression], - matchedOutputs: Seq[Seq[Seq[Expression]]], - notMatchedConditions: Seq[Expression], - notMatchedOutputs: Seq[Seq[Seq[Expression]]], - notMatchedBySourceConditions: Seq[Expression], - notMatchedBySourceOutputs: Seq[Seq[Seq[Expression]]], - noopCopyOutput: Seq[Expression], - deleteRowOutput: Seq[Expression], - joinedAttributes: Seq[Attribute], - joinedRowEncoder: ExpressionEncoder[Row], - outputRowEncoder: ExpressionEncoder[Row]) extends Serializable { - - private def generateProjection(exprs: Seq[Expression]): UnsafeProjection = { - UnsafeProjection.create(exprs, joinedAttributes) - } - - private def generatePredicate(expr: Expression): BasePredicate = { - GeneratePredicate.generate(expr, joinedAttributes) - } - - def processPartition(rowIterator: Iterator[Row]): Iterator[Row] = { - - val targetRowHasNoMatchPred = generatePredicate(targetRowHasNoMatch) - val sourceRowHasNoMatchPred = generatePredicate(sourceRowHasNoMatch) - val matchedPreds = matchedConditions.map(generatePredicate) - val matchedProjs = matchedOutputs.map(_.map(generateProjection)) - val notMatchedPreds = notMatchedConditions.map(generatePredicate) - val notMatchedProjs = notMatchedOutputs.map(_.map(generateProjection)) - val notMatchedBySourcePreds = notMatchedBySourceConditions.map(generatePredicate) - val notMatchedBySourceProjs = notMatchedBySourceOutputs.map(_.map(generateProjection)) - val noopCopyProj = generateProjection(noopCopyOutput) - val deleteRowProj = generateProjection(deleteRowOutput) - val outputProj = UnsafeProjection.create(outputRowEncoder.schema) - - // this is accessing ROW_DROPPED_COL. If ROW_DROPPED_COL is not in outputRowEncoder.schema - // then CDC must be disabled and it's the column after our output cols - def shouldDeleteRow(row: InternalRow): Boolean = { - row.getBoolean( - outputRowEncoder.schema.getFieldIndex(ROW_DROPPED_COL) - .getOrElse(outputRowEncoder.schema.fields.size) - ) - } - - def processRow(inputRow: InternalRow): Iterator[InternalRow] = { - // Identify which set of clauses to execute: matched, not-matched or not-matched-by-source - val (predicates, projections, noopAction) = if (targetRowHasNoMatchPred.eval(inputRow)) { - // Target row did not match any source row, so update the target row. 
- (notMatchedBySourcePreds, notMatchedBySourceProjs, noopCopyProj) - } else if (sourceRowHasNoMatchPred.eval(inputRow)) { - // Source row did not match with any target row, so insert the new source row - (notMatchedPreds, notMatchedProjs, deleteRowProj) - } else { - // Source row matched with target row, so update the target row - (matchedPreds, matchedProjs, noopCopyProj) - } - - // find (predicate, projection) pair whose predicate satisfies inputRow - val pair = (predicates zip projections).find { - case (predicate, _) => predicate.eval(inputRow) - } - - pair match { - case Some((_, projections)) => - projections.map(_.apply(inputRow)).iterator - case None => Iterator(noopAction.apply(inputRow)) - } - } - - val toRow = joinedRowEncoder.createSerializer() - val fromRow = outputRowEncoder.createDeserializer() - rowIterator - .map(toRow) - .flatMap(processRow) - .filter(!shouldDeleteRow(_)) - .map { notDeletedInternalRow => - fromRow(outputProj(notDeletedInternalRow)) - } - } - } } diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/commands/MergeIntoCommandBase.scala b/spark/src/main/scala/org/apache/spark/sql/delta/commands/MergeIntoCommandBase.scala index 610ccf1e90e..d354ac006cd 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/commands/MergeIntoCommandBase.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/commands/MergeIntoCommandBase.scala @@ -26,10 +26,11 @@ import org.apache.spark.sql.delta.actions.{Action, AddFile, FileAction} import org.apache.spark.sql.delta.commands.merge.{MergeIntoMaterializeSource, MergeIntoMaterializeSourceReason, MergeStats} import org.apache.spark.sql.delta.files.{TahoeBatchFileIndex, TahoeFileIndex} import org.apache.spark.sql.delta.metering.DeltaLogging +import org.apache.spark.sql.delta.schema.{ImplicitMetadataOperation, SchemaUtils} import org.apache.spark.sql.delta.sources.DeltaSQLConf import org.apache.spark.SparkContext -import org.apache.spark.sql.{DataFrame, SparkSession} +import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SparkSession} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap @@ -42,6 +43,7 @@ abstract class MergeIntoCommandBase extends LeafRunnableCommand with DeltaCommand with DeltaLogging with PredicateHelper + with ImplicitMetadataOperation with MergeIntoMaterializeSource { @transient val source: LogicalPlan @@ -53,6 +55,13 @@ abstract class MergeIntoCommandBase extends LeafRunnableCommand val notMatchedBySourceClauses: Seq[DeltaMergeIntoNotMatchedBySourceClause] val migratedSchema: Option[StructType] + override val (canMergeSchema, canOverwriteSchema) = { + // Delta options can't be passed to MERGE INTO currently, so they'll always be empty. + // The methods in options check if the auto migration flag is on, in which case schema evolution + // will be allowed. + val options = new DeltaOptions(Map.empty[String, String], conf) + (options.canMergeSchema, options.canOverwriteSchema) + } @transient protected lazy val sc: SparkContext = SparkContext.getOrCreate() @transient protected lazy val targetDeltaLog: DeltaLog = targetFileIndex.deltaLog @@ -80,6 +89,43 @@ abstract class MergeIntoCommandBase extends LeafRunnableCommand protected def isInsertOnly: Boolean = matchedClauses.isEmpty && notMatchedClauses.nonEmpty && notMatchedBySourceClauses.isEmpty + /** Whether this merge statement includes inserts statements. 
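+ * This is the case when the statement contains at least one `WHEN NOT MATCHED ... THEN INSERT`
+ * clause, i.e. when `notMatchedClauses` is non-empty.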
*/ + protected def includesInserts: Boolean = notMatchedClauses.nonEmpty + + protected def isCdcEnabled(deltaTxn: OptimisticTransaction): Boolean = + DeltaConfigs.CHANGE_DATA_FEED.fromMetaData(deltaTxn.metadata) + + protected def runMerge(spark: SparkSession): Seq[Row] + + override def run(spark: SparkSession): Seq[Row] = { + metrics("executionTimeMs").set(0) + metrics("scanTimeMs").set(0) + metrics("rewriteTimeMs").set(0) + if (migratedSchema.isDefined) { + // Block writes of void columns in the Delta log. Currently void columns are not properly + // supported and are dropped on read, but this is not enough for merge command that is also + // reading the schema from the Delta log. Until proper support we prefer to fail merge + // queries that add void columns. + val newNullColumn = SchemaUtils.findNullTypeColumn(migratedSchema.get) + if (newNullColumn.isDefined) { + throw new AnalysisException( + s"""Cannot add column '${newNullColumn.get}' with type 'void'. Please explicitly specify a + |non-void type.""".stripMargin.replaceAll("\n", " ") + ) + } + } + + val (materializeSource, _) = shouldMaterializeSource(spark, source, isInsertOnly) + if (!materializeSource) { + runMerge(spark) + } else { + // If it is determined that source should be materialized, wrap the execution with retries, + // in case the data of the materialized source is lost. + runWithMaterializedSourceLostRetries( + spark, targetFileIndex.deltaLog, metrics, runMerge) + } + } + import SQLMetrics._ override lazy val metrics: Map[String, SQLMetric] = baseMetrics @@ -178,6 +224,25 @@ abstract class MergeIntoCommandBase extends LeafRunnableCommand protected def shouldOptimizeMatchedOnlyMerge(spark: SparkSession): Boolean = { isMatchedOnly && spark.conf.get(DeltaSQLConf.MERGE_MATCHED_ONLY_ENABLED) } + + // There is only one when matched clause and it's a Delete and it does not have a condition. + protected val isOnlyOneUnconditionalDelete: Boolean = + matchedClauses == Seq(DeltaMergeIntoMatchedDeleteClause(None)) + + // We over-count numTargetRowsDeleted when there are multiple matches; + // this is the amount of the overcount, so we can subtract it to get a correct final metric. + protected var multipleMatchDeleteOnlyOvercount: Option[Long] = None + + // Throw error if multiple matches are ambiguous or cannot be computed correctly. + protected def throwErrorOnMultipleMatches( + hasMultipleMatches: Boolean, spark: SparkSession): Unit = { + // Multiple matches are not ambiguous when there is only one unconditional delete as + // all the matched row pairs in the 2nd join in `writeAllChanges` will get deleted. + if (hasMultipleMatches && !isOnlyOneUnconditionalDelete) { + throw DeltaErrors.multipleSourceRowMatchingTargetRowInMergeException(spark) + } + } + /** * Write the output data to files, repartitioning the output DataFrame by the partition columns * if table is partitioned and `merge.repartitionBeforeWrite.enabled` is set to true. @@ -374,9 +439,24 @@ abstract class MergeIntoCommandBase extends LeafRunnableCommand } object MergeIntoCommandBase { + val ROW_ID_COL = "_row_id_" + val FILE_NAME_COL = "_file_name_" + val SOURCE_ROW_PRESENT_COL = "_source_row_present_" + val TARGET_ROW_PRESENT_COL = "_target_row_present_" val ROW_DROPPED_COL = "_row_dropped_" val PRECOMPUTED_CONDITION_COL = "_condition_" + /** + * Spark UI will track all normal accumulators along with Spark tasks to show them on Web UI. 
+ * However, the accumulator used by `MergeIntoCommand` can store a very large value since it + * tracks all files that need to be rewritten. We should ask Spark UI to not remember it, + * otherwise, the UI data may consume lots of memory. Hence, we use the prefix `internal.metrics.` + * to make this accumulator become an internal accumulator, so that it will not be tracked by + * Spark UI. + */ + val TOUCHED_FILES_ACCUM_NAME = "internal.metrics.MergeIntoDelta.touchedFiles" + + /** Count the number of distinct partition values among the AddFiles in the given set. */ def totalBytesAndDistinctPartitionValues(files: Seq[FileAction]): (Long, Int) = { val distinctValues = new mutable.HashSet[Map[String, String]]() diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/commands/merge/ClassicMergeExecutor.scala b/spark/src/main/scala/org/apache/spark/sql/delta/commands/merge/ClassicMergeExecutor.scala new file mode 100644 index 00000000000..7f80c3ddbb4 --- /dev/null +++ b/spark/src/main/scala/org/apache/spark/sql/delta/commands/merge/ClassicMergeExecutor.scala @@ -0,0 +1,359 @@ +/* + * Copyright (2021) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.delta.commands.merge + +import scala.collection.JavaConverters._ + +import org.apache.spark.sql.delta._ +import org.apache.spark.sql.delta.actions.{AddCDCFile, AddFile, FileAction} +import org.apache.spark.sql.delta.commands.MergeIntoCommandBase +import org.apache.spark.sql.delta.commands.cdc.CDCReader.{CDC_TYPE_COLUMN_NAME, CDC_TYPE_NOT_CDC} +import org.apache.spark.sql.delta.commands.merge.MergeOutputGeneration.{SOURCE_ROW_INDEX_COL, TARGET_ROW_INDEX_COL} +import org.apache.spark.sql.delta.util.SetAccumulator + +import org.apache.spark.sql.{Column, Dataset, SparkSession} +import org.apache.spark.sql.catalyst.expressions.{Literal, Or} +import org.apache.spark.sql.functions.{coalesce, col, count, input_file_name, lit, monotonically_increasing_id, sum} + +/** + * Trait with merge execution in two phases: + * + * Phase 1: Find the input files in target that are touched by the rows that satisfy + * the condition and verify that no two source rows match with the same target row. + * This is implemented as an inner-join using the given condition. See [[findTouchedFiles]] + * for more details. In the special case that there is no update clause we write all the non- + * matching source data as new files and skip phase 2. + * Issues an error message when the ON search_condition of the MERGE statement can match + * a single row from the target table with multiple rows of the source table-reference. + * + * Phase 2: Read the touched files again and write new files with updated and/or inserted rows. + * If there are updates, then use a outer join using the given condition to write the + * updates and inserts (see [[writeAllChanges()]]. If there are no matches for updates, + * only inserts, then write them directy (see [[writeInsertsOnlyWhenNoMatches()]]. 
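+ * Along with the touched files, [[findTouchedFiles]] returns a [[DeduplicateCDFDeletes]] flag
+ * that tells [[writeAllChanges]] whether duplicate CDF delete rows may have been generated and
+ * need to be deduplicated.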
+ * + * See [[InsertOnlyMergeExecutor]] for the optimized executor used in case there are only inserts. + */ +trait ClassicMergeExecutor extends MergeIntoMaterializeSource with MergeOutputGeneration { + self: MergeIntoCommandBase => + import MergeIntoCommandBase._ + + /** + * Find the target table files that contain the rows that satisfy the merge condition. This is + * implemented as an inner-join between the source query/table and the target table using + * the merge condition. + */ + protected def findTouchedFiles( + spark: SparkSession, + deltaTxn: OptimisticTransaction + ): (Seq[AddFile], DeduplicateCDFDeletes) = recordMergeOperation( + extraOpType = "findTouchedFiles", + status = "MERGE operation - scanning files for matches", + sqlMetricName = "scanTimeMs") { + + val columnComparator = spark.sessionState.analyzer.resolver + + // Accumulator to collect all the distinct touched files + val touchedFilesAccum = new SetAccumulator[String]() + spark.sparkContext.register(touchedFilesAccum, TOUCHED_FILES_ACCUM_NAME) + + // UDFs to records touched files names and add them to the accumulator + val recordTouchedFileName = + DeltaUDF.intFromStringBoolean { (fileName, shouldRecord) => { + if (shouldRecord) { + touchedFilesAccum.add(fileName) + } + 1 + }}.asNondeterministic() + + // Prune non-matching files if we don't need to collect them for NOT MATCHED BY SOURCE clauses. + val dataSkippedFiles = + if (notMatchedBySourceClauses.isEmpty) { + deltaTxn.filterFiles(getTargetOnlyPredicates(spark)) + } else { + deltaTxn.filterFiles() + } + + // Join the source and target table using the merge condition to find touched files. An inner + // join collects all candidate files for MATCHED clauses, a right outer join also includes + // candidates for NOT MATCHED BY SOURCE clauses. + // In addition, we attach two columns + // - a monotonically increasing row id for target rows to later identify whether the same + // target row is modified by multiple user or not + // - the target file name the row is from to later identify the files touched by matched rows + val joinType = if (notMatchedBySourceClauses.isEmpty) "inner" else "right_outer" + + // When they are only MATCHED clauses, after the join we prune files that have no rows that + // satisfy any of the clause conditions. + val matchedPredicate = + if (isMatchedOnly) { + matchedClauses + .map(clause => clause.condition.getOrElse(Literal(true))) + .reduce((a, b) => Or(a, b)) + } else Literal(true) + + // Compute the columns needed for the inner join. + val targetColsNeeded = { + condition.references.map(_.name) ++ deltaTxn.snapshot.metadata.partitionColumns ++ + matchedPredicate.references.map(_.name) + } + + val columnsToDrop = deltaTxn.snapshot.metadata.schema.map(_.name) + .filterNot { field => + targetColsNeeded.exists { name => columnComparator(name, field) } + } + val incrSourceRowCountExpr = incrementMetricAndReturnBool("numSourceRows", true) + // We can't use filter() directly on the expression because that will prevent + // column pruning. We don't need the SOURCE_ROW_PRESENT_COL so we immediately drop it. 
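+ // Evaluating SOURCE_ROW_PRESENT_COL increments the numSourceRows metric as a side effect; the
+ // expression always returns true, so the filter below keeps every source row.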
+ val sourceDF = getSourceDF() + .withColumn(SOURCE_ROW_PRESENT_COL, new Column(incrSourceRowCountExpr)) + .filter(SOURCE_ROW_PRESENT_COL) + .drop(SOURCE_ROW_PRESENT_COL) + val targetPlan = + buildTargetPlanWithFiles( + spark, + deltaTxn, + dataSkippedFiles, + columnsToDrop) + val targetDF = Dataset.ofRows(spark, targetPlan) + .withColumn(ROW_ID_COL, monotonically_increasing_id()) + .withColumn(FILE_NAME_COL, input_file_name()) + + val joinToFindTouchedFiles = + sourceDF.join(targetDF, new Column(condition), joinType) + + // Process the matches from the inner join to record touched files and find multiple matches + val collectTouchedFiles = joinToFindTouchedFiles + .select(col(ROW_ID_COL), + recordTouchedFileName(col(FILE_NAME_COL), new Column(matchedPredicate)).as("one")) + + // Calculate frequency of matches per source row + val matchedRowCounts = collectTouchedFiles.groupBy(ROW_ID_COL).agg(sum("one").as("count")) + + // Get multiple matches and simultaneously collect (using touchedFilesAccum) the file names + import org.apache.spark.sql.delta.implicits._ + val (multipleMatchCount, multipleMatchSum) = matchedRowCounts + .filter("count > 1") + .select(coalesce(count(new Column("*")), lit(0)), coalesce(sum("count"), lit(0))) + .as[(Long, Long)] + .collect() + .head + + val hasMultipleMatches = multipleMatchCount > 0 + throwErrorOnMultipleMatches(hasMultipleMatches, spark) + if (hasMultipleMatches) { + // This is only allowed for delete-only queries. + // This query will count the duplicates for numTargetRowsDeleted in Job 2, + // because we count matches after the join and not just the target rows. + // We have to compensate for this by subtracting the duplicates later, + // so we need to record them here. + val duplicateCount = multipleMatchSum - multipleMatchCount + multipleMatchDeleteOnlyOvercount = Some(duplicateCount) + } + + // Get the AddFiles using the touched file names. + val touchedFileNames = touchedFilesAccum.value.iterator().asScala.toSeq + logTrace(s"findTouchedFiles: matched files:\n\t${touchedFileNames.mkString("\n\t")}") + + val nameToAddFileMap = generateCandidateFileMap(targetDeltaLog.dataPath, dataSkippedFiles) + val touchedAddFiles = touchedFileNames.map(f => + getTouchedFile(targetDeltaLog.dataPath, f, nameToAddFileMap)) + + if (metrics("numSourceRows").value == 0 && (dataSkippedFiles.isEmpty || + dataSkippedFiles.forall(_.numLogicalRecords.getOrElse(0) == 0))) { + // The target table is empty, and the optimizer optimized away the join entirely OR the + // source table is truly empty. In that case, scanning the source table once is the only + // way to get the correct metric. 
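+ // Note that count() performs one more scan over the source, but only in the corner case above
+ // where the metric would otherwise remain zero.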
+ val numSourceRows = sourceDF.count() + metrics("numSourceRows").set(numSourceRows) + } + + metrics("numTargetFilesBeforeSkipping") += deltaTxn.snapshot.numOfFiles + metrics("numTargetBytesBeforeSkipping") += deltaTxn.snapshot.sizeInBytes + val (afterSkippingBytes, afterSkippingPartitions) = + totalBytesAndDistinctPartitionValues(dataSkippedFiles) + metrics("numTargetFilesAfterSkipping") += dataSkippedFiles.size + metrics("numTargetBytesAfterSkipping") += afterSkippingBytes + metrics("numTargetPartitionsAfterSkipping") += afterSkippingPartitions + val (removedBytes, removedPartitions) = totalBytesAndDistinctPartitionValues(touchedAddFiles) + metrics("numTargetFilesRemoved") += touchedAddFiles.size + metrics("numTargetBytesRemoved") += removedBytes + metrics("numTargetPartitionsRemovedFrom") += removedPartitions + val dedupe = DeduplicateCDFDeletes( + hasMultipleMatches && isCdcEnabled(deltaTxn), + includesInserts) + (touchedAddFiles, dedupe) + } + + /** + * Write new files by reading the touched files and updating/inserting data using the source + * query/table. This is implemented using a full-outer-join using the merge condition. + * + * Note that unlike the insert-only code paths with just one control column ROW_DROPPED_COL, this + * method has a second control column CDC_TYPE_COL_NAME used for handling CDC when enabled. + */ + protected def writeAllChanges( + spark: SparkSession, + deltaTxn: OptimisticTransaction, + filesToRewrite: Seq[AddFile], + deduplicateCDFDeletes: DeduplicateCDFDeletes) + : Seq[FileAction] = recordMergeOperation( + extraOpType = + if (shouldOptimizeMatchedOnlyMerge(spark)) "writeAllUpdatesAndDeletes" + else "writeAllChanges", + status = s"MERGE operation - Rewriting ${filesToRewrite.size} files", + sqlMetricName = "rewriteTimeMs") { + + require(!deduplicateCDFDeletes.enabled || isCdcEnabled(deltaTxn), + "CDF delete duplication is enabled but overall the CDF generation is disabled") + + // Generate a new target dataframe that has same output attributes exprIds as the target plan. + // This allows us to apply the existing resolved update/insert expressions. + val targetPlan = buildTargetPlanWithFiles( + spark, + deltaTxn, + filesToRewrite, + columnsToDrop = Nil) + val baseTargetDF = Dataset.ofRows(spark, targetPlan) + val joinType = if (shouldOptimizeMatchedOnlyMerge(spark)) { + "rightOuter" + } else { + "fullOuter" + } + + logDebug(s"""writeAllChanges using $joinType join: + | source.output: ${source.outputSet} + | target.output: ${target.outputSet} + | condition: $condition + | newTarget.output: ${baseTargetDF.queryExecution.logical.outputSet} + """.stripMargin) + + // Expressions to update metrics + val incrSourceRowCountExpr = incrementMetricAndReturnBool("numSourceRowsInSecondScan", true) + val incrNoopCountExpr = incrementMetricAndReturnBool("numTargetRowsCopied", false) + + // Apply an outer join to find both, matches and non-matches. We are adding two boolean fields + // with value `true`, one to each side of the join. Whether this field is null or not after + // the outer join, will allow us to identify whether the resultant joined row was a + // matched inner result or an unmatched result with null on one side. + val joinedDF = { + val sourceDF = if (deduplicateCDFDeletes.enabled && deduplicateCDFDeletes.includesInserts) { + // Add row index for the source rows to identify inserted rows during the cdf deleted rows + // deduplication. 
See [[deduplicateCDFDeletes()]] + getSourceDF() + .withColumn(SOURCE_ROW_INDEX_COL, monotonically_increasing_id()) + } else { + getSourceDF() + } + val targetDF = baseTargetDF + .withColumn(TARGET_ROW_PRESENT_COL, lit(true)) + sourceDF + .withColumn(SOURCE_ROW_PRESENT_COL, new Column(incrSourceRowCountExpr)) + .join( + if (deduplicateCDFDeletes.enabled) { + targetDF.withColumn(TARGET_ROW_INDEX_COL, monotonically_increasing_id()) + } else targetDF, + new Column(condition), joinType) + } + + // Precompute conditions in matched and not matched clauses and generate + // the joinedDF with precomputed columns and clauses with rewritten conditions. + val (joinedAndPrecomputedConditionsDF, clausesWithPrecompConditions) = + generatePrecomputedConditionsAndDF( + joinedDF, + clauses = matchedClauses ++ notMatchedClauses ++ notMatchedBySourceClauses) + + val cdcEnabled = DeltaConfigs.CHANGE_DATA_FEED.fromMetaData(deltaTxn.metadata) + + + // The target output columns need to be marked as nullable here, as they are going to be used + // to reference the output of an outer join. + val targetOutputCols = getTargetOutputCols(deltaTxn, makeNullable = true) + + // If there are N columns in the target table, the full outer join output will have: + // - N columns for target table + // - ROW_DROPPED_COL to define whether the generated row should dropped or written + // - if CDC is enabled, also CDC_TYPE_COLUMN_NAME containing the type of change being performed + // in a particular row + // (N+1 or N+2 or N+3 columns depending on CDC disabled / enabled and if Row IDs are preserved) + val outputColNames = + targetOutputCols.map(_.name) ++ + Seq(ROW_DROPPED_COL) ++ + (if (cdcEnabled) Seq(CDC_TYPE_COLUMN_NAME) else Seq()) + + // Copy expressions to copy the existing target row and not drop it (ROW_DROPPED_COL=false), + // and in case CDC is enabled, set it to CDC_TYPE_NOT_CDC. + // (N+1 or N+2 or N+3 columns depending on CDC disabled / enabled and if Row IDs are preserved) + val noopCopyExprs = + targetOutputCols ++ + Seq(incrNoopCountExpr) ++ + (if (cdcEnabled) Seq(CDC_TYPE_NOT_CDC) else Seq()) + + // Generate output columns. + val outputCols = generateWriteAllChangesOutputCols( + targetOutputCols, + outputColNames, + noopCopyExprs, + clausesWithPrecompConditions, + cdcEnabled + ) + + val outputDF = if (cdcEnabled) { + generateCdcAndOutputRows( + joinedAndPrecomputedConditionsDF, + outputCols, + outputColNames, + noopCopyExprs, + deduplicateCDFDeletes) + .filter(s"$ROW_DROPPED_COL = false") + .drop(ROW_DROPPED_COL) + } else { // change data capture is off, just output the normal data + // Apply the expressions on the join output. The filter ensures we only consider rows + // that are not dropped. And the drop ensures that the dropped flag does not leak out + // to the output. 
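+ // Without CDC, outputCols holds just the N target columns plus ROW_DROPPED_COL, which is
+ // removed again right after the filter.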
+ joinedAndPrecomputedConditionsDF + .select(outputCols: _*) + .filter(s"$ROW_DROPPED_COL = false") + .drop(ROW_DROPPED_COL) + } + + logDebug("writeAllChanges: join output plan:\n" + outputDF.queryExecution) + + // Write to Delta + val newFiles = writeFiles(spark, deltaTxn, outputDF) + + // Update metrics + val (addedBytes, addedPartitions) = totalBytesAndDistinctPartitionValues(newFiles) + metrics("numTargetFilesAdded") += newFiles.count(_.isInstanceOf[AddFile]) + metrics("numTargetChangeFilesAdded") += newFiles.count(_.isInstanceOf[AddCDCFile]) + metrics("numTargetChangeFileBytes") += newFiles.collect{ case f: AddCDCFile => f.size }.sum + metrics("numTargetBytesAdded") += addedBytes + metrics("numTargetPartitionsAddedTo") += addedPartitions + if (multipleMatchDeleteOnlyOvercount.isDefined) { + // Compensate for counting duplicates during the query. + val actualRowsDeleted = + metrics("numTargetRowsDeleted").value - multipleMatchDeleteOnlyOvercount.get + assert(actualRowsDeleted >= 0) + metrics("numTargetRowsDeleted").set(actualRowsDeleted) + val actualRowsMatchedDeleted = + metrics("numTargetRowsMatchedDeleted").value - multipleMatchDeleteOnlyOvercount.get + assert(actualRowsMatchedDeleted >= 0) + metrics("numTargetRowsMatchedDeleted").set(actualRowsMatchedDeleted) + } + + newFiles + } +} diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/commands/merge/MergeOutputGeneration.scala b/spark/src/main/scala/org/apache/spark/sql/delta/commands/merge/MergeOutputGeneration.scala index 069a969dcf7..a0a467ff33a 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/commands/merge/MergeOutputGeneration.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/commands/merge/MergeOutputGeneration.scala @@ -18,7 +18,9 @@ package org.apache.spark.sql.delta.commands.merge import scala.collection.mutable +import org.apache.spark.sql.delta.metric.IncrementMetric import org.apache.spark.sql.delta.commands.MergeIntoCommandBase +import org.apache.spark.sql.delta.commands.cdc.CDCReader import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute @@ -31,7 +33,9 @@ import org.apache.spark.sql.functions._ * the output of the merge operation. */ trait MergeOutputGeneration { self: MergeIntoCommandBase => + import CDCReader._ import MergeIntoCommandBase._ + import MergeOutputGeneration._ /** * Precompute conditions in MATCHED and NOT MATCHED clauses and generate the source @@ -85,4 +89,438 @@ trait MergeOutputGeneration { self: MergeIntoCommandBase => (sourceWithPrecompConditions, clausesWithPrecompConditions) } + /** + * Generate the expressions to process full-outer join output and generate target rows. + * + * To generate these N + 2 columns, we will be generate N + 2 expressions and apply them + * on the joinedDF. The CDC column will be either used for CDC generation or dropped before + * performing the final write, and the other column will always be dropped after executing the + * increment metric expression and filtering on ROW_DROPPED_COL. 
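+ * @param targetOutputCols output column expressions of the target table
+ * @param outputColNames names of all generated output columns, including the control columns
+ * @param noopCopyExprs expressions that copy the existing target row without dropping it
+ * @param clausesWithPrecompConditions merge clauses whose conditions have been precomputed
+ * @param cdcEnabled whether Change Data Feed is enabled for the target table
+ * @param shouldCountDeletedRows whether the deleted-rows metrics should be incremented here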
+ */ + protected def generateWriteAllChangesOutputCols( + targetOutputCols: Seq[Expression], + outputColNames: Seq[String], + noopCopyExprs: Seq[Expression], + clausesWithPrecompConditions: Seq[DeltaMergeIntoClause], + cdcEnabled: Boolean, + shouldCountDeletedRows: Boolean = true) + : IndexedSeq[Column] = { + + val numOutputCols = outputColNames.size + + // ==== Generate N + 2 (N + 4 preserving Row Tracking) expressions for MATCHED clauses ==== + val processedMatchClauses: Seq[ProcessedClause] = generateAllActionExprs( + targetOutputCols, + clausesWithPrecompConditions.collect{ case c: DeltaMergeIntoMatchedClause => c }, + cdcEnabled, + shouldCountDeletedRows) + val matchedExprs: Seq[Expression] = generateClauseOutputExprs( + numOutputCols, + processedMatchClauses, + noopCopyExprs) + + // N + 1 (or N + 2 with CDC, N + 4 preserving Row Tracking and CDC) expressions to delete the + // unmatched source row when it should not be inserted. `target.output` will produce NULLs + // which will get deleted eventually. + val deleteSourceRowExprs = + (targetOutputCols ++ + Seq(Literal(true))) ++ + (if (cdcEnabled) Seq(CDC_TYPE_NOT_CDC) else Seq()) + + // ==== Generate N + 2 (N + 4 preserving Row Tracking) expressions for NOT MATCHED clause ==== + val processedNotMatchClauses: Seq[ProcessedClause] = generateAllActionExprs( + targetOutputCols, + clausesWithPrecompConditions.collect{ case c: DeltaMergeIntoNotMatchedClause => c }, + cdcEnabled, + shouldCountDeletedRows) + val notMatchedExprs: Seq[Expression] = generateClauseOutputExprs( + numOutputCols, + processedNotMatchClauses, + deleteSourceRowExprs) + + // === Generate N + 2 (N + 4 with Row Tracking) expressions for NOT MATCHED BY SOURCE clause === + val processedNotMatchBySourceClauses: Seq[ProcessedClause] = generateAllActionExprs( + targetOutputCols, + clausesWithPrecompConditions.collect{ case c: DeltaMergeIntoNotMatchedBySourceClause => c }, + cdcEnabled, + shouldCountDeletedRows) + val notMatchedBySourceExprs: Seq[Expression] = generateClauseOutputExprs( + numOutputCols, + processedNotMatchBySourceClauses, + noopCopyExprs) + + // ==== Generate N + 2 (N + 4 preserving Row Tracking) expressions that invokes the MATCHED, + // NOT MATCHED and NOT MATCHED BY SOURCE expressions ==== + // That is, conditionally invokes them based on whether there was a match in the outer join. + + // Predicates to check whether there was a match in the full outer join. + val ifSourceRowNull = col(SOURCE_ROW_PRESENT_COL).isNull.expr + val ifTargetRowNull = col(TARGET_ROW_PRESENT_COL).isNull.expr + + val outputCols = (0 until numOutputCols).map { i => + // Coupled with the clause conditions, the resultant possibly-nested CaseWhens can + // be the following for every i-th column. (In the case with single matched/not-matched + // clauses, instead of nested CaseWhens, there will be If/Else.) + // + // CASE WHEN (source row is null) + // CASE WHEN + // THEN + // WHEN + // THEN + // ... + // ELSE + // + // WHEN (target row is null) + // THEN + // CASE WHEN + // THEN + // WHEN + // THEN + // ... + // ELSE + // + // ELSE (both source and target row are not null) + // CASE WHEN + // THEN + // WHEN + // THEN + // ... 
+ // ELSE + // + val caseWhen = CaseWhen(Seq( + ifSourceRowNull -> notMatchedBySourceExprs(i), + ifTargetRowNull -> notMatchedExprs(i)), + /* otherwise */ matchedExprs(i)) + new Column(Alias(caseWhen, outputColNames(i))()) + } + logDebug("writeAllChanges: join output expressions\n\t" + seqToString(outputCols.map(_.expr))) + outputCols + } + + /** + * Represents a merge clause after its condition and action expressions have been processed before + * generating the final output expression. + * @param condition Optional precomputed condition. + * @param actions List of output expressions generated from every action of the clause. + */ + protected case class ProcessedClause(condition: Option[Expression], actions: Seq[Expression]) + + /** + * Generate expressions for every output column and every merge clause based on the corresponding + * UPDATE, DELETE and/or INSERT action(s). + * @param targetOutputCols List of output column expressions from the target table. Used to + * generate CDC data for DELETE. + * @param clausesWithPrecompConditions List of merge clauses with precomputed conditions. Action + * expressions are generated for each of these clauses. + * @param cdcEnabled Whether the generated expressions should include CDC information. + * @param shouldCountDeletedRows Whether metrics for number of deleted rows should be incremented + * here. + * @return For each merge clause, a list of [[ProcessedClause] each with a precomputed + * condition and N+2 action expressions (N output columns + [[ROW_DROPPED_COL]] + + * [[CDC_TYPE_COLUMN_NAME]]) to apply on a row when that clause matches. + */ + protected def generateAllActionExprs( + targetOutputCols: Seq[Expression], + clausesWithPrecompConditions: Seq[DeltaMergeIntoClause], + cdcEnabled: Boolean, + shouldCountDeletedRows: Boolean) + : Seq[ProcessedClause] = clausesWithPrecompConditions.map (clause => { + val incrUpdatedCountExpr = + incrementMetricAndReturnBool(name = "numTargetRowsUpdated", valueToReturn = false) + val incrDeletedCountExpr = + incrementMetricAndReturnBool(name = "numTargetRowsDeleted", valueToReturn = true) + + val actions = clause match { + // Seq of up to N+3 expressions to generate output rows based on the UPDATE, DELETE and/or + // INSERT action(s) + case u: DeltaMergeIntoMatchedUpdateClause => + val incrCountExpr = + IncrementMetric(incrUpdatedCountExpr, metrics("numTargetRowsMatchedUpdated")) + // Generate update expressions and set ROW_DROPPED_COL = false + u.resolvedActions.map(_.expr) ++ + Seq(incrCountExpr) ++ + (if (cdcEnabled) Seq(Literal(CDC_TYPE_UPDATE_POSTIMAGE)) else Seq()) + case u: DeltaMergeIntoNotMatchedBySourceUpdateClause => + val incrCountExpr = + IncrementMetric(incrUpdatedCountExpr, metrics("numTargetRowsNotMatchedBySourceUpdated")) + // Generate update expressions and set ROW_DROPPED_COL = false + u.resolvedActions.map(_.expr) ++ + Seq(incrCountExpr) ++ + (if (cdcEnabled) Seq(Literal(CDC_TYPE_UPDATE_POSTIMAGE)) else Seq()) + case _: DeltaMergeIntoMatchedDeleteClause => + val incrCountExpr = + if (shouldCountDeletedRows) { + IncrementMetric(incrDeletedCountExpr, metrics("numTargetRowsMatchedDeleted")) + } else { + lit(true).expr + } + // Generate expressions to set the ROW_DROPPED_COL = true and mark as a DELETE + targetOutputCols ++ + Seq(incrCountExpr) ++ + (if (cdcEnabled) Seq(CDC_TYPE_DELETE) else Seq()) + case _: DeltaMergeIntoNotMatchedBySourceDeleteClause => + val incrCountExpr = + if (shouldCountDeletedRows) { + IncrementMetric(incrDeletedCountExpr, 
metrics("numTargetRowsNotMatchedBySourceDeleted")) + } else { + lit(true).expr + } + // Generate expressions to set the ROW_DROPPED_COL = true and mark as a DELETE + targetOutputCols ++ + Seq(incrCountExpr) ++ + (if (cdcEnabled) Seq(CDC_TYPE_DELETE) else Seq()) + case i: DeltaMergeIntoNotMatchedInsertClause => + val incrInsertedCountExpr = + incrementMetricAndReturnBool("numTargetRowsInserted", valueToReturn = false) + i.resolvedActions.map(_.expr) ++ + Seq(incrInsertedCountExpr) ++ + (if (cdcEnabled) Seq(Literal(CDC_TYPE_INSERT)) else Seq()) + } + ProcessedClause(clause.condition, actions) + }) + + /** + * Generate the output expression for each output column to apply the correct action for a type of + * merge clause. For each output column, the resulting expression dispatches the correct action + * based on all clause conditions. + * @param numOutputCols Number of output columns. + * @param clauses List of preprocessed merge clauses to bind together. + * @param noopExprs Default expression to apply when no condition holds. + * @return A list of one expression per output column to apply for a type of merge clause. + */ + protected def generateClauseOutputExprs( + numOutputCols: Integer, + clauses: Seq[ProcessedClause], + noopExprs: Seq[Expression]) + : Seq[Expression] = { + val clauseExprs = if (clauses.isEmpty) { + // Nothing to update or delete + noopExprs + } else { + + if (clauses.head.condition.isEmpty) { + // Only one clause without any condition, so the corresponding action expressions + // can be evaluated directly to generate the output columns. + clauses.head.actions + + } else if (clauses.length == 1) { + // Only one clause _with_ a condition, so generate IF/THEN instead of CASE WHEN. + // + // For the i-th output column, generate + // IF THEN + // ELSE + // + val condition = clauses.head.condition.get + clauses.head.actions.zip(noopExprs).map { case (a, noop) => If(condition, a, noop) } + + } else { + // There are multiple clauses. Use `CaseWhen` to conditionally evaluate the right + // action expressions to output columns + (0 until numOutputCols).map { i => + // For the i-th output column, generate + // CASE + // WHEN THEN + // WHEN THEN + // ... + // ELSE + // + val conditionalBranches = clauses.map(precomp => { + precomp.condition.getOrElse(Literal(true)) -> precomp.actions(i) + }) + CaseWhen(conditionalBranches, Some(noopExprs(i))) + } + } + } + assert(clauseExprs.size == numOutputCols, + s"incorrect # expressions:\n\t" + seqToString(clauseExprs)) + logDebug(s"writeAllChanges: expressions\n\t" + seqToString(clauseExprs)) + clauseExprs + } + + /** + * Build the full output as an array of packed rows, then explode into the final result. Based + * on the CDC type as originally marked, we produce both rows for the CDC_TYPE_NOT_CDC partition + * to be written to the main table and rows for the CDC partitions to be written as CDC files. + * + * See [[CDCReader]] for general details on how partitioning on the CDC type column works. + */ + protected def generateCdcAndOutputRows( + sourceDf: DataFrame, + outputCols: Seq[Column], + outputColNames: Seq[String], + noopCopyExprs: Seq[Expression], + deduplicateDeletes: DeduplicateCDFDeletes): DataFrame = { + import org.apache.spark.sql.delta.commands.cdc.CDCReader._ + // The main partition just needs to swap in the CDC_TYPE_NOT_CDC value. + val mainDataOutput = + outputCols.dropRight(1) :+ new Column(CDC_TYPE_NOT_CDC).as(CDC_TYPE_COLUMN_NAME) + + // Deleted rows are sent to the CDC partition instead of the main partition. 
These rows are + // marked as dropped, we need to retain them while incrementing the original metric column + // ourselves. + val keepRowAndIncrDeletedCountExpr = !outputCols(outputCols.length - 2) + val deleteCdcOutput = outputCols + .updated(outputCols.length - 2, keepRowAndIncrDeletedCountExpr.as(ROW_DROPPED_COL)) + + // Update preimages need special handling. This is conceptually the same as the + // transformation for cdcOutputCols, but we have to transform the noop exprs to columns + // ourselves because it hasn't already been done. + val cdcNoopExprs = noopCopyExprs.dropRight(2) :+ + Literal(false) :+ Literal(CDC_TYPE_UPDATE_PREIMAGE) + val updatePreimageCdcOutput = cdcNoopExprs.zipWithIndex.map { + case (e, i) => new Column(Alias(e, outputColNames(i))()) + } + + // To avoid duplicate evaluation of nondeterministic column values such as + // [[GenerateIdentityValues]], we EXPLODE CDC rows first, from which we EXPLODE again, + // and for each of "insert" and "update_postimage" rows, generate main data rows. + // The first EXPLODE will force evaluation all nondeterministic expressions, + // and the second EXPLODE will just copy the generated value from CDC rows + // to main data. By doing so we ensure nondeterministic column values in CDC and + // main data rows stay the same. + + val cdcTypeCol = outputCols.last + val cdcArray = new Column(CaseWhen(Seq( + EqualNullSafe(cdcTypeCol.expr, Literal(CDC_TYPE_INSERT)) -> array( + struct(outputCols: _*)).expr, + + EqualNullSafe(cdcTypeCol.expr, Literal(CDC_TYPE_UPDATE_POSTIMAGE)) -> array( + struct(updatePreimageCdcOutput: _*), + struct(outputCols: _*)).expr, + + EqualNullSafe(cdcTypeCol.expr, CDC_TYPE_DELETE) -> array( + struct(deleteCdcOutput: _*)).expr), + + // If none of the CDC cases apply (true for purely rewritten target rows, dropped source + // rows, etc.) just stick to the normal output. + array(struct(mainDataOutput: _*)).expr + )) + + val cdcToMainDataArray = new Column(If( + Or( + EqualNullSafe(col(s"packedCdc.$CDC_TYPE_COLUMN_NAME").expr, + Literal(CDC_TYPE_INSERT)), + EqualNullSafe(col(s"packedCdc.$CDC_TYPE_COLUMN_NAME").expr, + Literal(CDC_TYPE_UPDATE_POSTIMAGE))), + array( + col("packedCdc"), + struct(outputColNames.dropRight(1).map { n => col(s"packedCdc.`$n`") } :+ + new Column(CDC_TYPE_NOT_CDC).as(CDC_TYPE_COLUMN_NAME) + : _*) + ).expr, + array(col("packedCdc")).expr + )) + + if (deduplicateDeletes.enabled) { + deduplicateCDFDeletes( + deduplicateDeletes, + sourceDf, + cdcArray, + cdcToMainDataArray, + outputColNames) + } else { + packAndExplodeCDCOutput( + sourceDf, + cdcArray, + cdcToMainDataArray, + outputColNames, + dedupColumns = Nil) + } + } + + /** + * Applies the transformations to generate the CDC output: + * 1. Transform each input row into its corresponding array of CDC rows, e.g. an updated row + * yields: array(update_preimage, update_postimage). + * 2. Add the main data output for inserted/updated rows to the previously packed CDC data. + * 3. Explode the result to flatten the packed arrays. + * + * @param sourceDf The dataframe generated after processing the merge output. + * @param cdcArray Transforms the merge output into the corresponding packed CDC data that will be + * written to the CDC partition. + * @param cdcToMainDataArray Transforms the packed CDC data to add the main data output, i.e. rows + * that are inserted or updated and will be written to the main + * partition. + * @param outputColNames All the main and CDC columns to use in the output. 
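Steps 1 and 3 above (pack each merge output row into an array of CDC structs, then explode) can be sketched independently of the merge machinery; the column names and change types in this sketch are made up and it is not the Delta implementation:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{array, col, explode, lit, struct, when}

object PackAndExplodeSketch extends App {
  val spark = SparkSession.builder().master("local[*]").appName("cdc-pack-sketch").getOrCreate()
  import spark.implicits._

  // Hypothetical merge output: one row per target row with its old/new value and a change type.
  val merged = Seq(
    (1, "old1", "new1", "update_postimage"),
    (2, "old2", "old2", "insert"))
    .toDF("key", "oldValue", "newValue", "_change_type")

  // Pack each row into an array of CDC structs: an update expands into preimage + postimage,
  // everything else stays a single packed row.
  val packed = merged.select(
    explode(
      when(col("_change_type") === "update_postimage",
        array(
          struct(col("key"), col("oldValue").as("value"), lit("update_preimage").as("_change_type")),
          struct(col("key"), col("newValue").as("value"), lit("update_postimage").as("_change_type"))))
        .otherwise(array(struct(col("key"), col("newValue").as("value"), col("_change_type"))))
    ).as("packedCdc"))

  // Unpack the exploded struct fields back into top-level columns.
  packed.select(col("packedCdc.key"), col("packedCdc.value"), col("packedCdc._change_type")).show()
  spark.stop()
}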
+ * @param dedupColumns Additional columns to add to enable deduplication. + */ + private def packAndExplodeCDCOutput( + sourceDf: DataFrame, + cdcArray: Column, + cdcToMainDataArray: Column, + outputColNames: Seq[String], + dedupColumns: Seq[Column]): DataFrame = { + val unpackedCols = outputColNames.map { name => + col(s"packedData.`$name`").as(name) + } + sourceDf + // `explode()` creates a [[Generator]] which can't handle non-deterministic expressions that + // we use to increment metric counters. We first project the CDC array so that the expressions + // are evaluated before we explode the array, + .select(cdcArray.as("projectedCDC") +: dedupColumns: _*) + .select(explode(col("projectedCDC")).as("packedCdc") +: dedupColumns: _*) + .select(explode(cdcToMainDataArray).as("packedData") +: dedupColumns: _*) + .select(unpackedCols++ dedupColumns: _*) + } + + /** + * This method deduplicates CDF deletes where a target row has potentially multiple matches. It + * assumes that the input dataframe contains the [[TARGET_ROW_INDEX_COL]] and + * to detect inserts the [[SOURCE_ROW_INDEX_COL]] column to track the origin of the row. + * + * All duplicates of deleted rows have the same [[TARGET_ROW_INDEX_COL]] and + * [[CDC_TYPE_COLUMN_NAME]] therefore we use both columns as compound deduplication key. + * In case the input data frame contains additional insert rows we leave them untouched by using + * the [[SOURCE_ROW_INDEX_COL]] to fill the null values of the [[TARGET_ROW_INDEX_COL]]. This + * may lead to duplicates as part of the final row index but this is not a problem since if + * an insert and a delete have the same [[TARGET_ROW_INDEX_COL]] they definitely have a + * different [[CDC_TYPE_COLUMN_NAME]]. + */ + private def deduplicateCDFDeletes( + deduplicateDeletes: DeduplicateCDFDeletes, + df: DataFrame, + cdcArray: Column, + cdcToMainDataArray: Column, + outputColNames: Seq[String]): DataFrame = { + val dedupColumns = (if (deduplicateDeletes.includesInserts) { + Seq(col(TARGET_ROW_INDEX_COL), col(SOURCE_ROW_INDEX_COL)) + } else { + Seq(col(TARGET_ROW_INDEX_COL)) + }) + + val cdcDf = packAndExplodeCDCOutput( + df, + cdcArray, + cdcToMainDataArray, + outputColNames, + dedupColumns + ) + + val cdcDfWithIncreasingIds = if (deduplicateDeletes.includesInserts) { + cdcDf.withColumn( + TARGET_ROW_INDEX_COL, + coalesce(col(TARGET_ROW_INDEX_COL), col(SOURCE_ROW_INDEX_COL))) + } else { + cdcDf + } + cdcDfWithIncreasingIds + .dropDuplicates(TARGET_ROW_INDEX_COL, CDC_TYPE_COLUMN_NAME) + .drop(TARGET_ROW_INDEX_COL, SOURCE_ROW_INDEX_COL) + } +} + +/** + * This class enables and configures the deduplication of CDF deletes in case the merge statement + * contains an unconditional delete statement that matches multiple target rows. 
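Since the deduplication logic here is compact but subtle, a self-contained sketch with made-up data may help; it reuses the two row-index column names defined further below as plain strings:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{coalesce, col}

object DedupCdfDeletesSketch extends App {
  val spark = SparkSession.builder().master("local[*]").appName("cdf-dedup-sketch").getOrCreate()
  import spark.implicits._

  // Hypothetical CDC output: target row 10 was matched (and deleted) twice by the source,
  // plus one inserted row that only carries a source row index.
  val cdc = Seq[(Option[Long], Option[Long], String, String)](
    (Some(10L), None, "k1", "delete"),
    (Some(10L), None, "k1", "delete"), // duplicate delete produced by the second match
    (None, Some(7L), "k2", "insert"))
    .toDF("_target_row_index_", "_source_row_index", "key", "_change_type")

  // Inserts have no target row index, so borrow the source row index, then deduplicate on the
  // compound key (row index, change type) and drop the helper columns.
  val deduped = cdc
    .withColumn("_target_row_index_", coalesce(col("_target_row_index_"), col("_source_row_index")))
    .dropDuplicates("_target_row_index_", "_change_type")
    .drop("_target_row_index_", "_source_row_index")

  deduped.show() // one delete for k1 remains; the insert for k2 is untouched
  spark.stop()
}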
+ * + * @param enabled CDF generation should be enabled and duplicate target matches are detected + * @param includesInserts in addition to the unconditional deletes the merge also inserts rows + */ +case class DeduplicateCDFDeletes( + enabled: Boolean, + includesInserts: Boolean) + +object MergeOutputGeneration { + final val TARGET_ROW_INDEX_COL = "_target_row_index_" + final val SOURCE_ROW_INDEX_COL = "_source_row_index" } diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/MergeIntoAccumulatorSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/MergeIntoAccumulatorSuite.scala index dcc1c0502d7..a2b7d915088 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/MergeIntoAccumulatorSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/MergeIntoAccumulatorSuite.scala @@ -20,7 +20,7 @@ import java.util.concurrent.atomic.AtomicReference import scala.collection.JavaConverters._ -import org.apache.spark.sql.delta.commands.MergeIntoCommand +import org.apache.spark.sql.delta.commands.MergeIntoCommandBase import org.apache.spark.sql.delta.test.DeltaSQLCommandTest import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent, SparkListenerNodeExcluded, SparkListenerTaskEnd} @@ -71,7 +71,7 @@ class MergeIntoAccumulatorSuite task.accumulatorUpdates.map(_.name) }.toSet // Verify accumulators used by MergeIntoCommand are not tracked. - assert(!accumNames.contains(MergeIntoCommand.TOUCHED_FILES_ACCUM_NAME)) + assert(!accumNames.contains(MergeIntoCommandBase.TOUCHED_FILES_ACCUM_NAME)) } finally { iter.close() }
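As a usage-level companion to this refactor (not part of the patch; the path and data are placeholders, and the session settings are the standard Delta Lake setup), the code paths above can be exercised end to end with a MERGE into a CDF-enabled table followed by a change-feed read:

import io.delta.tables.DeltaTable
import org.apache.spark.sql.SparkSession

object MergeCdfUsageSketch extends App {
  val path = "/tmp/merge_cdf_sketch"
  val spark = SparkSession.builder()
    .master("local[*]")
    .appName("merge-cdf-sketch")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    // Tables created in this session get the Change Data Feed enabled by default.
    .config("spark.databricks.delta.properties.defaults.enableChangeDataFeed", "true")
    .getOrCreate()
  import spark.implicits._

  Seq((1, "a"), (2, "b"), (3, "c")).toDF("key", "value")
    .write.format("delta").mode("overwrite").save(path) // creates the table

  val source = Seq((2, "b2"), (4, "d")).toDF("key", "value")

  DeltaTable.forPath(spark, path).as("t")
    .merge(source.as("s"), "t.key = s.key")
    .whenMatched().updateAll()    // yields update_preimage / update_postimage CDC rows
    .whenNotMatched().insertAll() // yields insert CDC rows
    .execute()

  spark.read.format("delta")
    .option("readChangeFeed", "true")
    .option("startingVersion", 1)
    .load(path)
    .orderBy("key", "_change_type")
    .show()

  spark.stop()
}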