From 9808453837fe53d961c6589509eab0bd31ba2f6d Mon Sep 17 00:00:00 2001
From: Johan Lasperas
Date: Fri, 23 Jun 2023 23:34:10 +0200
Subject: [PATCH] Improve generating and writing out changes in Merge

This change is part of a larger effort to improve merge performance, see
https://github.com/delta-io/delta/issues/1827

## Description
This change rewrites the way modified data is written out in merge to improve performance.
`writeAllChanges` now generates a dataframe containing all the updated and copied rows to
write out by building a large expression that selectively applies the right merge action to
each row. This replaces the previous method that relied on applying a function to individual
rows.

Changes:
- Move `findTouchedFiles` and `writeAllChanges` to a dedicated new trait `ClassicMergeExecutor`
  implementing the regular merge path when `InsertOnlyMergeExecutor` is not used.
- Introduce methods in `MergeOutputGeneration` to transform the merge clauses into expressions
  that can be applied to generate the output of the merge operation (both main data and CDC
  data).

This change fully preserves the behavior of merge, which is extensively tested in
`MergeIntoSuiteBase`, `MergeIntoSQLSuite`, `MergeIntoScalaSuite`, `MergeCDCSuite`,
`MergeIntoMetricsBase`, `MergeIntoNotMatchedBySourceSuite`.

Closes delta-io/delta#1854

GitOrigin-RevId: d8c8a0e9439c6710978f2ec345cb94b2b9b19e0e
---
 .../sql/delta/commands/MergeIntoCommand.scala | 681 +-----------------
 .../delta/commands/MergeIntoCommandBase.scala |  82 ++-
 .../commands/merge/ClassicMergeExecutor.scala | 359 +++++++++
 .../merge/MergeOutputGeneration.scala         | 438 +++++++++++
 .../sql/delta/MergeIntoAccumulatorSuite.scala |   4 +-
 5 files changed, 886 insertions(+), 678 deletions(-)
 create mode 100644 spark/src/main/scala/org/apache/spark/sql/delta/commands/merge/ClassicMergeExecutor.scala

diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/commands/MergeIntoCommand.scala b/spark/src/main/scala/org/apache/spark/sql/delta/commands/MergeIntoCommand.scala
index 368d005fc7a..4743be6b0dc 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/commands/MergeIntoCommand.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/commands/MergeIntoCommand.scala
@@ -16,26 +16,15 @@ package org.apache.spark.sql.delta.commands
-import scala.collection.JavaConverters._
-
 import org.apache.spark.sql.delta._
-import org.apache.spark.sql.delta.actions.{AddCDCFile, AddFile, FileAction}
-import org.apache.spark.sql.delta.commands.MergeIntoCommandBase.ROW_DROPPED_COL
-import org.apache.spark.sql.delta.commands.merge.InsertOnlyMergeExecutor
+import org.apache.spark.sql.delta.commands.merge.{ClassicMergeExecutor, InsertOnlyMergeExecutor}
 import org.apache.spark.sql.delta.files._
-import org.apache.spark.sql.delta.schema.{ImplicitMetadataOperation, SchemaUtils}
 import org.apache.spark.sql.delta.sources.DeltaSQLConf
-import org.apache.spark.sql.delta.util.{AnalysisHelper, SetAccumulator}

 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
-import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder}
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.expressions.codegen._
 import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.functions._
-import org.apache.spark.sql.types.{DataTypes, LongType, StructType}
+import org.apache.spark.sql.types.{LongType, StructType}

 /**
  * Performs a merge of a
source query/table into a Delta table. @@ -47,7 +36,7 @@ import org.apache.spark.sql.types.{DataTypes, LongType, StructType} * * Phase 1: Find the input files in target that are touched by the rows that satisfy * the condition and verify that no two source rows match with the same target row. - * This is implemented as an inner-join using the given condition. See [[findTouchedFiles]] + * This is implemented as an inner-join using the given condition. See [[ClassicMergeExecutor]] * for more details. * * Phase 2: Read the touched files again and write new files with updated and/or inserted rows. @@ -73,17 +62,8 @@ case class MergeIntoCommand( notMatchedClauses: Seq[DeltaMergeIntoNotMatchedClause], notMatchedBySourceClauses: Seq[DeltaMergeIntoNotMatchedBySourceClause], migratedSchema: Option[StructType]) extends MergeIntoCommandBase - with AnalysisHelper with InsertOnlyMergeExecutor - with ImplicitMetadataOperation { - - import MergeIntoCommand._ - import MergeIntoCommandBase.totalBytesAndDistinctPartitionValues - - import org.apache.spark.sql.delta.commands.cdc.CDCReader._ - - override val canMergeSchema: Boolean = conf.getConf(DeltaSQLConf.DELTA_SCHEMA_AUTO_MIGRATE) - override val canOverwriteSchema: Boolean = false + with ClassicMergeExecutor { override val output: Seq[Attribute] = Seq( AttributeReference("num_affected_rows", LongType)(), @@ -91,38 +71,6 @@ case class MergeIntoCommand( AttributeReference("num_deleted_rows", LongType)(), AttributeReference("num_inserted_rows", LongType)()) - // We over-count numTargetRowsDeleted when there are multiple matches; - // this is the amount of the overcount, so we can subtract it to get a correct final metric. - private var multipleMatchDeleteOnlyOvercount: Option[Long] = None - override def run(spark: SparkSession): Seq[Row] = { - metrics("executionTimeMs").set(0) - metrics("scanTimeMs").set(0) - metrics("rewriteTimeMs").set(0) - - if (migratedSchema.isDefined) { - // Block writes of void columns in the Delta log. Currently void columns are not properly - // supported and are dropped on read, but this is not enough for merge command that is also - // reading the schema from the Delta log. Until proper support we prefer to fail merge - // queries that add void columns. - val newNullColumn = SchemaUtils.findNullTypeColumn(migratedSchema.get) - if (newNullColumn.isDefined) { - throw new AnalysisException( - s"""Cannot add column '${newNullColumn.get}' with type 'void'. Please explicitly specify a - |non-void type.""".stripMargin.replaceAll("\n", " ") - ) - } - } - val (materializeSource, _) = shouldMaterializeSource(spark, source, isInsertOnly) - if (!materializeSource) { - runMerge(spark) - } else { - // If it is determined that source should be materialized, wrap the execution with retries, - // in case the data of the materialized source is lost. 
- runWithMaterializedSourceLostRetries( - spark, targetFileIndex.deltaLog, metrics, runMerge) - } - } - protected def runMerge(spark: SparkSession): Seq[Row] = { recordDeltaOperation(targetDeltaLog, "delta.dml.merge") { val startTime = System.nanoTime() @@ -160,10 +108,10 @@ case class MergeIntoCommand( writeOnlyInserts( spark, deltaTxn, filterMatchedRows = true, numSourceRowsMetric = "numSourceRows") } else { - val filesToRewrite = findTouchedFiles(spark, deltaTxn) + val (filesToRewrite, deduplicateCDFDeletes) = findTouchedFiles(spark, deltaTxn) if (filesToRewrite.nonEmpty) { val newWrittenFiles = withStatusCode("DELTA", "Writing merged data") { - writeAllChanges(spark, deltaTxn, filesToRewrite) + writeAllChanges(spark, deltaTxn, filesToRewrite, deduplicateCDFDeletes) } filesToRewrite.map(_.remove) ++ newWrittenFiles } else { @@ -193,621 +141,4 @@ case class MergeIntoCommand( metrics("numTargetRowsInserted").value, metrics("numTargetRowsUpdated").value, metrics("numTargetRowsDeleted").value, metrics("numTargetRowsInserted").value)) } - - /** - * Find the target table files that contain the rows that satisfy the merge condition. This is - * implemented as an inner-join between the source query/table and the target table using - * the merge condition. - */ - private def findTouchedFiles( - spark: SparkSession, - deltaTxn: OptimisticTransaction) - : Seq[AddFile] = recordMergeOperation( - extraOpType = "findTouchedFiles", - status = "MERGE operation - scanning files for matches", - sqlMetricName = "scanTimeMs") { - - val columnComparator = spark.sessionState.analyzer.resolver - - // Accumulator to collect all the distinct touched files - val touchedFilesAccum = new SetAccumulator[String]() - spark.sparkContext.register(touchedFilesAccum, TOUCHED_FILES_ACCUM_NAME) - - // UDFs to records touched files names and add them to the accumulator - val recordTouchedFileName = - DeltaUDF.intFromStringBoolean { (fileName, shouldRecord) => { - if (shouldRecord) { - touchedFilesAccum.add(fileName) - } - 1 - }}.asNondeterministic() - - // Prune non-matching files if we don't need to collect them for NOT MATCHED BY SOURCE clauses. - val dataSkippedFiles = - if (notMatchedBySourceClauses.isEmpty) { - deltaTxn.filterFiles(getTargetOnlyPredicates(spark)) - } else { - deltaTxn.filterFiles() - } - - val incrSourceRowCountExpr = incrementMetricAndReturnBool("numSourceRows", valueToReturn = true) - - // Join the source and target table using the merge condition to find touched files. An inner - // join collects all candidate files for MATCHED clauses, a right outer join also includes - // candidates for NOT MATCHED BY SOURCE clauses. - // In addition, we attach two columns - // - a monotonically increasing row id for target rows to later identify whether the same - // target row is modified by multiple user or not - // - the target file name the row is from to later identify the files touched by matched rows - val joinType = if (notMatchedBySourceClauses.isEmpty) "inner" else "right_outer" - - // When they are only MATCHED clauses, after the join we prune files that have no rows that - // satisfy any of the clause conditions. - val matchedPredicate = - if (isMatchedOnly) { - matchedClauses - .map(clause => clause.condition.getOrElse(Literal(true))) - .reduce((a, b) => Or(a, b)) - } else Literal(true) - - // Compute the columns needed for the inner join. 
- val targetColsNeeded = { - condition.references.map(_.name) ++ deltaTxn.snapshot.metadata.partitionColumns ++ - matchedPredicate.references.map(_.name) - } - - val columnsToDrop = deltaTxn.snapshot.metadata.schema.map(_.name) - .filterNot { field => - targetColsNeeded.exists { name => columnComparator(name, field) } - } - - // We can't use filter() directly on the expression because that will prevent - // column pruning. We don't need the SOURCE_ROW_PRESENT_COL so we immediately drop it. - val sourceDF = getSourceDF() - .withColumn(SOURCE_ROW_PRESENT_COL, new Column(incrSourceRowCountExpr)) - .filter(SOURCE_ROW_PRESENT_COL) - .drop(SOURCE_ROW_PRESENT_COL) - val targetPlan = - buildTargetPlanWithFiles( - spark, - deltaTxn, - dataSkippedFiles, - columnsToDrop) - val targetDF = Dataset.ofRows(spark, targetPlan) - .withColumn(ROW_ID_COL, monotonically_increasing_id()) - .withColumn(FILE_NAME_COL, input_file_name()) - - val joinToFindTouchedFiles = sourceDF.join(targetDF, new Column(condition), joinType) - - // Process the matches from the inner join to record touched files and find multiple matches - val collectTouchedFiles = joinToFindTouchedFiles - .select(col(ROW_ID_COL), - recordTouchedFileName(col(FILE_NAME_COL), new Column(matchedPredicate)).as("one")) - - // Calculate frequency of matches per source row - val matchedRowCounts = collectTouchedFiles.groupBy(ROW_ID_COL).agg(sum("one").as("count")) - - // Get multiple matches and simultaneously collect (using touchedFilesAccum) the file names - // multipleMatchCount = # of target rows with more than 1 matching source row (duplicate match) - // multipleMatchSum = total # of duplicate matched rows - import org.apache.spark.sql.delta.implicits._ - val (multipleMatchCount, multipleMatchSum) = matchedRowCounts - .filter("count > 1") - .select(coalesce(count(new Column("*")), lit(0)), coalesce(sum("count"), lit(0))) - .as[(Long, Long)] - .collect() - .head - - val hasMultipleMatches = multipleMatchCount > 0 - - // Throw error if multiple matches are ambiguous or cannot be computed correctly. - val canBeComputedUnambiguously = { - // Multiple matches are not ambiguous when there is only one unconditional delete as - // all the matched row pairs in the 2nd join in `writeAllChanges` will get deleted. - val isUnconditionalDelete = matchedClauses.headOption match { - case Some(DeltaMergeIntoMatchedDeleteClause(None)) => true - case _ => false - } - matchedClauses.size == 1 && isUnconditionalDelete - } - - if (hasMultipleMatches && !canBeComputedUnambiguously) { - throw DeltaErrors.multipleSourceRowMatchingTargetRowInMergeException(spark) - } - - if (hasMultipleMatches) { - // This is only allowed for delete-only queries. - // This query will count the duplicates for numTargetRowsDeleted in Job 2, - // because we count matches after the join and not just the target rows. - // We have to compensate for this by subtracting the duplicates later, - // so we need to record them here. - val duplicateCount = multipleMatchSum - multipleMatchCount - multipleMatchDeleteOnlyOvercount = Some(duplicateCount) - } - - // Get the AddFiles using the touched file names. 
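// [Editor's note, not part of the original patch] Worked example for the overcount recorded
// above: in a delete-only merge where one target row is matched by three source rows, the join
// produces three matched pairs, so multipleMatchSum = 3 and multipleMatchCount = 1. The rewrite
// job later counts three deleted rows even though only one target row is deleted, so the metric
// is corrected afterwards by duplicateCount = multipleMatchSum - multipleMatchCount = 3 - 1 = 2.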
- val touchedFileNames = touchedFilesAccum.value.iterator().asScala.toSeq - logTrace(s"findTouchedFiles: matched files:\n\t${touchedFileNames.mkString("\n\t")}") - - val nameToAddFileMap = generateCandidateFileMap(targetDeltaLog.dataPath, dataSkippedFiles) - val touchedAddFiles = touchedFileNames.map(f => - getTouchedFile(targetDeltaLog.dataPath, f, nameToAddFileMap)) - - // When the target table is empty, and the optimizer optimized away the join entirely - // numSourceRows will be incorrectly 0. We need to scan the source table once to get the correct - // metric here. - if (metrics("numSourceRows").value == 0 && - (dataSkippedFiles.isEmpty || targetDF.take(1).isEmpty)) { - val numSourceRows = sourceDF.count() - metrics("numSourceRows").set(numSourceRows) - } - - // Update metrics - metrics("numTargetFilesBeforeSkipping") += deltaTxn.snapshot.numOfFiles - metrics("numTargetBytesBeforeSkipping") += deltaTxn.snapshot.sizeInBytes - val (afterSkippingBytes, afterSkippingPartitions) = - totalBytesAndDistinctPartitionValues(dataSkippedFiles) - metrics("numTargetFilesAfterSkipping") += dataSkippedFiles.size - metrics("numTargetBytesAfterSkipping") += afterSkippingBytes - metrics("numTargetPartitionsAfterSkipping") += afterSkippingPartitions - val (removedBytes, removedPartitions) = totalBytesAndDistinctPartitionValues(touchedAddFiles) - metrics("numTargetFilesRemoved") += touchedAddFiles.size - metrics("numTargetBytesRemoved") += removedBytes - metrics("numTargetPartitionsRemovedFrom") += removedPartitions - touchedAddFiles - } - - /** - * Write new files by reading the touched files and updating/inserting data using the source - * query/table. This is implemented using a full|right-outer-join using the merge condition. - * - * Note that unlike the insert-only code paths with just one control column INCR_ROW_COUNT_COL, - * this method has two additional control columns ROW_DROPPED_COL for dropping deleted rows and - * CDC_TYPE_COL_NAME used for handling CDC when enabled. - */ - private def writeAllChanges( - spark: SparkSession, - deltaTxn: OptimisticTransaction, - filesToRewrite: Seq[AddFile]) - : Seq[FileAction] = recordMergeOperation( - extraOpType = - if (shouldOptimizeMatchedOnlyMerge(spark)) "writeAllUpdatesAndDeletes" - else "writeAllChanges", - status = s"MERGE operation - Rewriting ${filesToRewrite.size} files", - sqlMetricName = "rewriteTimeMs") { - import org.apache.spark.sql.catalyst.expressions.Literal.{TrueLiteral, FalseLiteral} - - val cdcEnabled = DeltaConfigs.CHANGE_DATA_FEED.fromMetaData(deltaTxn.metadata) - - var targetOutputCols = getTargetOutputCols(deltaTxn, makeNullable = true) - var outputRowSchema = deltaTxn.metadata.schema - - // When we have duplicate matches (only allowed when the whenMatchedCondition is a delete with - // no match condition) we will incorrectly generate duplicate CDC rows. - // Duplicate matches can be due to: - // - Duplicate rows in the source w.r.t. the merge condition - // - A target-only or source-only merge condition, which essentially turns our join into a cross - // join with the target/source satisfiying the merge condition. - // These duplicate matches are dropped from the main data output since this is a delete - // operation, but the duplicate CDC rows are not removed by default. - // See https://github.com/delta-io/delta/issues/1274 - - // We address this specific scenario by adding row ids to the target before performing our join. 
- // There should only be one CDC delete row per target row so we can use these row ids to dedupe - // the duplicate CDC delete rows. - - // We also need to address the scenario when there are duplicate matches with delete and we - // insert duplicate rows. Here we need to additionally add row ids to the source before the - // join to avoid dropping these valid duplicate inserted rows and their corresponding cdc rows. - - // When there is an insert clause, we set SOURCE_ROW_ID_COL=null for all delete rows because we - // need to drop the duplicate matches. - val isDeleteWithDuplicateMatchesAndCdc = multipleMatchDeleteOnlyOvercount.nonEmpty && cdcEnabled - - // Generate a new target dataframe that has same output attributes exprIds as the target plan. - // This allows us to apply the existing resolved update/insert expressions. - val targetPlan = buildTargetPlanWithFiles( - spark, - deltaTxn, - filesToRewrite, - columnsToDrop = Nil) - val baseTargetDF = Dataset.ofRows(spark, targetPlan) - val joinType = if (shouldOptimizeMatchedOnlyMerge(spark)) { - "rightOuter" - } else { - "fullOuter" - } - - logDebug(s"""writeAllChanges using $joinType join: - | source.output: ${source.outputSet} - | target.output: ${target.outputSet} - | condition: $condition - | newTarget.output: ${baseTargetDF.queryExecution.logical.outputSet} - """.stripMargin) - - // Expressions to update metrics - val incrSourceRowCountExpr = - incrementMetricAndReturnBool("numSourceRowsInSecondScan", valueToReturn = true) - val incrUpdatedCountExpr = - incrementMetricAndReturnBool("numTargetRowsUpdated", valueToReturn = true) - val incrUpdatedMatchedCountExpr = - incrementMetricAndReturnBool("numTargetRowsMatchedUpdated", valueToReturn = true) - val incrUpdatedNotMatchedBySourceCountExpr = - incrementMetricAndReturnBool("numTargetRowsNotMatchedBySourceUpdated", valueToReturn = true) - val incrInsertedCountExpr = - incrementMetricAndReturnBool("numTargetRowsInserted", valueToReturn = true) - val incrNoopCountExpr = - incrementMetricAndReturnBool("numTargetRowsCopied", valueToReturn = true) - val incrDeletedCountExpr = - incrementMetricAndReturnBool("numTargetRowsDeleted", valueToReturn = true) - val incrDeletedMatchedCountExpr = - incrementMetricAndReturnBool("numTargetRowsMatchedDeleted", valueToReturn = true) - val incrDeletedNotMatchedBySourceCountExpr = - incrementMetricAndReturnBool("numTargetRowsNotMatchedBySourceDeleted", valueToReturn = true) - - // Apply an outer join to find both, matches and non-matches. We are adding two boolean fields - // with value `true`, one to each side of the join. Whether this field is null or not after - // the outer join, will allow us to identify whether the resultant joined row was a - // matched inner result or an unmatched result with null on one side. - // We add row IDs to the targetDF if we have a delete-when-matched clause with duplicate - // matches and CDC is enabled, and additionally add row IDs to the source if we also have an - // insert clause. See above at isDeleteWithDuplicateMatchesAndCdc definition for more details. 
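// [Editor's illustrative sketch, not part of the original patch] A minimal, self-contained
// example of the row-id based deduplication described above, on toy data. The column name
// `_target_row_id_` stands in for TARGET_ROW_ID_COL; the same idea carries over to the
// DeduplicateCDFDeletes path introduced by this patch.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.monotonically_increasing_id

object CdcDeleteDedupSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("cdc-dedup-sketch").getOrCreate()
    import spark.implicits._
    // One target row matched by three source rows in an unconditional-delete merge would
    // otherwise produce three identical CDC delete rows for that single target row.
    val targetWithRowId = Seq(("key1", 1)).toDF("key", "value")
      .withColumn("_target_row_id_", monotonically_increasing_id())
    val duplicatedCdcDeletes = targetWithRowId
      .crossJoin(Seq(1, 2, 3).toDF("source_dup"))
      .drop("source_dup")
    // Deduplicating on the synthetic row id keeps exactly one CDC delete row per target row.
    assert(duplicatedCdcDeletes.dropDuplicates(Seq("_target_row_id_")).count() == 1L)
    spark.stop()
  }
}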
- var sourceDF = getSourceDF() - .withColumn(SOURCE_ROW_PRESENT_COL, new Column(incrSourceRowCountExpr)) - var targetDF = baseTargetDF - .withColumn(TARGET_ROW_PRESENT_COL, lit(true)) - if (isDeleteWithDuplicateMatchesAndCdc) { - targetDF = targetDF.withColumn(TARGET_ROW_ID_COL, monotonically_increasing_id()) - if (notMatchedClauses.nonEmpty) { // insert clause - sourceDF = sourceDF.withColumn(SOURCE_ROW_ID_COL, monotonically_increasing_id()) - } - } - val joinedDF = sourceDF.join(targetDF, new Column(condition), joinType) - val joinedPlan = joinedDF.queryExecution.analyzed - - def resolveOnJoinedPlan(exprs: Seq[Expression]): Seq[Expression] = { - tryResolveReferencesForExpressions(spark, exprs, joinedPlan) - } - - // ==== Generate the expressions to process full-outer join output and generate target rows ==== - // If there are N columns in the target table, there will be N + 3 columns after processing - // - N columns for target table - // - ROW_DROPPED_COL to define whether the generated row should dropped or written - // - INCR_ROW_COUNT_COL containing an expression to update the output row row counter - // - CDC_TYPE_COLUMN_NAME containing the type of change being performed in a particular row - - // To generate these N + 3 columns, we will generate N + 3 expressions and apply them to the - // rows in the joinedDF. The CDC column will be either used for CDC generation or dropped before - // performing the final write, and the other two will always be dropped after executing the - // metrics expressions and filtering on ROW_DROPPED_COL. - - // We produce rows for both the main table data (with CDC_TYPE_COLUMN_NAME = CDC_TYPE_NOT_CDC), - // and rows for the CDC data which will be output to CDCReader.CDC_LOCATION. - // See [[CDCReader]] for general details on how partitioning on the CDC type column works. - - // In the following functions `updateOutput`, `deleteOutput` and `insertOutput`, we - // produce a Seq[Expression] for each intended output row. - // Depending on the clause and whether CDC is enabled, we output between 0 and 3 rows, as a - // Seq[Seq[Expression]] - - // There is one corner case outlined above at isDeleteWithDuplicateMatchesAndCdc definition. - // When we have a delete-ONLY merge with duplicate matches we have N + 4 columns: - // N target cols, TARGET_ROW_ID_COL, ROW_DROPPED_COL, INCR_ROW_COUNT_COL, CDC_TYPE_COLUMN_NAME - // When we have a delete-when-matched merge with duplicate matches + an insert clause, we have - // N + 5 columns: - // N target cols, TARGET_ROW_ID_COL, SOURCE_ROW_ID_COL, ROW_DROPPED_COL, INCR_ROW_COUNT_COL, - // CDC_TYPE_COLUMN_NAME - // These ROW_ID_COL will always be dropped before the final write. 
- - if (isDeleteWithDuplicateMatchesAndCdc) { - targetOutputCols = targetOutputCols :+ UnresolvedAttribute(TARGET_ROW_ID_COL) - outputRowSchema = outputRowSchema.add(TARGET_ROW_ID_COL, DataTypes.LongType) - if (notMatchedClauses.nonEmpty) { // there is an insert clause, make SRC_ROW_ID_COL=null - targetOutputCols = targetOutputCols :+ Alias(Literal(null), SOURCE_ROW_ID_COL)() - outputRowSchema = outputRowSchema.add(SOURCE_ROW_ID_COL, DataTypes.LongType) - } - } - - if (cdcEnabled) { - outputRowSchema = outputRowSchema - .add(ROW_DROPPED_COL, DataTypes.BooleanType) - .add(INCR_ROW_COUNT_COL, DataTypes.BooleanType) - .add(CDC_TYPE_COLUMN_NAME, DataTypes.StringType) - } - - def updateOutput(resolvedActions: Seq[DeltaMergeAction], incrMetricExpr: Expression) - : Seq[Seq[Expression]] = { - val updateExprs = { - // Generate update expressions and set ROW_DELETED_COL = false and - // CDC_TYPE_COLUMN_NAME = CDC_TYPE_NOT_CDC - val mainDataOutput = resolvedActions.map(_.expr) :+ FalseLiteral :+ - incrMetricExpr :+ CDC_TYPE_NOT_CDC - if (cdcEnabled) { - // For update preimage, we have do a no-op copy with ROW_DELETED_COL = false and - // CDC_TYPE_COLUMN_NAME = CDC_TYPE_UPDATE_PREIMAGE and INCR_ROW_COUNT_COL as a no-op - // (because the metric will be incremented in `mainDataOutput`) - val preImageOutput = targetOutputCols :+ FalseLiteral :+ TrueLiteral :+ - Literal(CDC_TYPE_UPDATE_PREIMAGE) - // For update postimage, we have the same expressions as for mainDataOutput but with - // INCR_ROW_COUNT_COL as a no-op (because the metric will be incremented in - // `mainDataOutput`), and CDC_TYPE_COLUMN_NAME = CDC_TYPE_UPDATE_POSTIMAGE - val postImageOutput = mainDataOutput.dropRight(2) :+ TrueLiteral :+ - Literal(CDC_TYPE_UPDATE_POSTIMAGE) - Seq(mainDataOutput, preImageOutput, postImageOutput) - } else { - Seq(mainDataOutput) - } - } - updateExprs.map(resolveOnJoinedPlan) - } - - def deleteOutput(incrMetricExpr: Expression): Seq[Seq[Expression]] = { - val deleteExprs = { - // Generate expressions to set the ROW_DELETED_COL = true and CDC_TYPE_COLUMN_NAME = - // CDC_TYPE_NOT_CDC - val mainDataOutput = targetOutputCols :+ TrueLiteral :+ incrMetricExpr :+ - CDC_TYPE_NOT_CDC - if (cdcEnabled) { - // For delete we do a no-op copy with ROW_DELETED_COL = false, INCR_ROW_COUNT_COL as a - // no-op (because the metric will be incremented in `mainDataOutput`) and - // CDC_TYPE_COLUMN_NAME = CDC_TYPE_DELETE - val deleteCdcOutput = targetOutputCols :+ FalseLiteral :+ TrueLiteral :+ CDC_TYPE_DELETE - Seq(mainDataOutput, deleteCdcOutput) - } else { - Seq(mainDataOutput) - } - } - deleteExprs.map(resolveOnJoinedPlan) - } - - def insertOutput(resolvedActions: Seq[DeltaMergeAction], incrMetricExpr: Expression) - : Seq[Seq[Expression]] = { - // Generate insert expressions and set ROW_DELETED_COL = false and - // CDC_TYPE_COLUMN_NAME = CDC_TYPE_NOT_CDC - val insertExprs = resolvedActions.map(_.expr) - val mainDataOutput = resolveOnJoinedPlan( - if (isDeleteWithDuplicateMatchesAndCdc) { - // Must be delete-when-matched merge with duplicate matches + insert clause - // Therefore we must keep the target row id and source row id. Since this is a not-matched - // clause we know the target row-id will be null. See above at - // isDeleteWithDuplicateMatchesAndCdc definition for more details. 
- insertExprs :+ - Alias(Literal(null), TARGET_ROW_ID_COL)() :+ UnresolvedAttribute(SOURCE_ROW_ID_COL) :+ - FalseLiteral :+ incrMetricExpr :+ CDC_TYPE_NOT_CDC - } else { - insertExprs :+ FalseLiteral :+ incrMetricExpr :+ CDC_TYPE_NOT_CDC - } - ) - if (cdcEnabled) { - // For insert we have the same expressions as for mainDataOutput, but with - // INCR_ROW_COUNT_COL as a no-op (because the metric will be incremented in - // `mainDataOutput`), and CDC_TYPE_COLUMN_NAME = CDC_TYPE_INSERT - val insertCdcOutput = mainDataOutput.dropRight(2) :+ TrueLiteral :+ Literal(CDC_TYPE_INSERT) - Seq(mainDataOutput, insertCdcOutput) - } else { - Seq(mainDataOutput) - } - } - - def clauseOutput(clause: DeltaMergeIntoClause): Seq[Seq[Expression]] = clause match { - case u: DeltaMergeIntoMatchedUpdateClause => - updateOutput(u.resolvedActions, And(incrUpdatedCountExpr, incrUpdatedMatchedCountExpr)) - case _: DeltaMergeIntoMatchedDeleteClause => - deleteOutput(And(incrDeletedCountExpr, incrDeletedMatchedCountExpr)) - case i: DeltaMergeIntoNotMatchedInsertClause => - insertOutput(i.resolvedActions, incrInsertedCountExpr) - case u: DeltaMergeIntoNotMatchedBySourceUpdateClause => - updateOutput( - u.resolvedActions, - And(incrUpdatedCountExpr, incrUpdatedNotMatchedBySourceCountExpr)) - case _: DeltaMergeIntoNotMatchedBySourceDeleteClause => - deleteOutput(And(incrDeletedCountExpr, incrDeletedNotMatchedBySourceCountExpr)) - } - - def clauseCondition(clause: DeltaMergeIntoClause): Expression = { - // if condition is None, then expression always evaluates to true - val condExpr = clause.condition.getOrElse(TrueLiteral) - resolveOnJoinedPlan(Seq(condExpr)).head - } - - val joinedRowEncoder = RowEncoder(joinedPlan.schema) - val outputRowEncoder = RowEncoder(outputRowSchema).resolveAndBind() - - val processor = new JoinedRowProcessor( - targetRowHasNoMatch = resolveOnJoinedPlan(Seq(col(SOURCE_ROW_PRESENT_COL).isNull.expr)).head, - sourceRowHasNoMatch = resolveOnJoinedPlan(Seq(col(TARGET_ROW_PRESENT_COL).isNull.expr)).head, - matchedConditions = matchedClauses.map(clauseCondition), - matchedOutputs = matchedClauses.map(clauseOutput), - notMatchedConditions = notMatchedClauses.map(clauseCondition), - notMatchedOutputs = notMatchedClauses.map(clauseOutput), - notMatchedBySourceConditions = notMatchedBySourceClauses.map(clauseCondition), - notMatchedBySourceOutputs = notMatchedBySourceClauses.map(clauseOutput), - noopCopyOutput = - resolveOnJoinedPlan(targetOutputCols :+ FalseLiteral :+ incrNoopCountExpr :+ - CDC_TYPE_NOT_CDC), - deleteRowOutput = - resolveOnJoinedPlan(targetOutputCols :+ TrueLiteral :+ TrueLiteral :+ CDC_TYPE_NOT_CDC), - joinedAttributes = joinedPlan.output, - joinedRowEncoder = joinedRowEncoder, - outputRowEncoder = outputRowEncoder) - - var outputDF = - Dataset.ofRows(spark, joinedPlan).mapPartitions(processor.processPartition)(outputRowEncoder) - - if (isDeleteWithDuplicateMatchesAndCdc) { - // When we have a delete when matched clause with duplicate matches we have to remove - // duplicate CDC rows. This scenario is further explained at - // isDeleteWithDuplicateMatchesAndCdc definition. - - // To remove duplicate CDC rows generated by the duplicate matches we dedupe by - // TARGET_ROW_ID_COL since there should only be one CDC delete row per target row. - // When there is an insert clause in addition to the delete clause we additionally dedupe by - // SOURCE_ROW_ID_COL and CDC_TYPE_COLUMN_NAME to avoid dropping valid duplicate inserted rows - // and their corresponding CDC rows. 
- val columnsToDedupeBy = if (notMatchedClauses.nonEmpty) { // insert clause - Seq(TARGET_ROW_ID_COL, SOURCE_ROW_ID_COL, CDC_TYPE_COLUMN_NAME) - } else { - Seq(TARGET_ROW_ID_COL) - } - outputDF = outputDF - .dropDuplicates(columnsToDedupeBy) - .drop(ROW_DROPPED_COL, INCR_ROW_COUNT_COL, TARGET_ROW_ID_COL, SOURCE_ROW_ID_COL) - } else { - outputDF = outputDF.drop(ROW_DROPPED_COL, INCR_ROW_COUNT_COL) - } - - logDebug("writeAllChanges: join output plan:\n" + outputDF.queryExecution) - - // Write to Delta - val newFiles = writeFiles(spark, deltaTxn, outputDF) - - // Update metrics - val (addedBytes, addedPartitions) = totalBytesAndDistinctPartitionValues(newFiles) - metrics("numTargetFilesAdded") += newFiles.count(_.isInstanceOf[AddFile]) - metrics("numTargetChangeFilesAdded") += newFiles.count(_.isInstanceOf[AddCDCFile]) - metrics("numTargetChangeFileBytes") += newFiles.collect{ case f: AddCDCFile => f.size }.sum - metrics("numTargetBytesAdded") += addedBytes - metrics("numTargetPartitionsAddedTo") += addedPartitions - if (multipleMatchDeleteOnlyOvercount.isDefined) { - // Compensate for counting duplicates during the query. - val actualRowsDeleted = - metrics("numTargetRowsDeleted").value - multipleMatchDeleteOnlyOvercount.get - assert(actualRowsDeleted >= 0) - metrics("numTargetRowsDeleted").set(actualRowsDeleted) - val actualRowsMatchedDeleted = - metrics("numTargetRowsMatchedDeleted").value - multipleMatchDeleteOnlyOvercount.get - assert(actualRowsMatchedDeleted >= 0) - metrics("numTargetRowsMatchedDeleted").set(actualRowsMatchedDeleted) - } - - newFiles - } -} - -object MergeIntoCommand { - /** - * Spark UI will track all normal accumulators along with Spark tasks to show them on Web UI. - * However, the accumulator used by `MergeIntoCommand` can store a very large value since it - * tracks all files that need to be rewritten. We should ask Spark UI to not remember it, - * otherwise, the UI data may consume lots of memory. Hence, we use the prefix `internal.metrics.` - * to make this accumulator become an internal accumulator, so that it will not be tracked by - * Spark UI. - */ - val TOUCHED_FILES_ACCUM_NAME = "internal.metrics.MergeIntoDelta.touchedFiles" - - val ROW_ID_COL = "_row_id_" - val TARGET_ROW_ID_COL = "_target_row_id_" - val SOURCE_ROW_ID_COL = "_source_row_id_" - val FILE_NAME_COL = "_file_name_" - val SOURCE_ROW_PRESENT_COL = "_source_row_present_" - val TARGET_ROW_PRESENT_COL = "_target_row_present_" - val INCR_ROW_COUNT_COL = "_incr_row_count_" - - /** - * @param targetRowHasNoMatch whether a joined row is a target row with no match in the source - * table - * @param sourceRowHasNoMatch whether a joined row is a source row with no match in the target - * table - * @param matchedConditions condition for each match clause - * @param matchedOutputs corresponding output for each match clause. for each clause, we - * have 1-3 output rows, each of which is a sequence of expressions - * to apply to the joined row - * @param notMatchedConditions condition for each not-matched clause - * @param notMatchedOutputs corresponding output for each not-matched clause. for each clause, - * we have 1-2 output rows, each of which is a sequence of - * expressions to apply to the joined row - * @param notMatchedBySourceConditions condition for each not-matched-by-source clause - * @param notMatchedBySourceOutputs corresponding output for each not-matched-by-source - * clause. 
for each clause, we have 1-3 output rows, each of - * which is a sequence of expressions to apply to the joined - * row - * @param noopCopyOutput no-op expression to copy a target row to the output - * @param deleteRowOutput expression to drop a row from the final output. this is used for - * source rows that don't match any not-matched clauses - * @param joinedAttributes schema of our outer-joined dataframe - * @param joinedRowEncoder joinedDF row encoder - * @param outputRowEncoder final output row encoder - */ - class JoinedRowProcessor( - targetRowHasNoMatch: Expression, - sourceRowHasNoMatch: Expression, - matchedConditions: Seq[Expression], - matchedOutputs: Seq[Seq[Seq[Expression]]], - notMatchedConditions: Seq[Expression], - notMatchedOutputs: Seq[Seq[Seq[Expression]]], - notMatchedBySourceConditions: Seq[Expression], - notMatchedBySourceOutputs: Seq[Seq[Seq[Expression]]], - noopCopyOutput: Seq[Expression], - deleteRowOutput: Seq[Expression], - joinedAttributes: Seq[Attribute], - joinedRowEncoder: ExpressionEncoder[Row], - outputRowEncoder: ExpressionEncoder[Row]) extends Serializable { - - private def generateProjection(exprs: Seq[Expression]): UnsafeProjection = { - UnsafeProjection.create(exprs, joinedAttributes) - } - - private def generatePredicate(expr: Expression): BasePredicate = { - GeneratePredicate.generate(expr, joinedAttributes) - } - - def processPartition(rowIterator: Iterator[Row]): Iterator[Row] = { - - val targetRowHasNoMatchPred = generatePredicate(targetRowHasNoMatch) - val sourceRowHasNoMatchPred = generatePredicate(sourceRowHasNoMatch) - val matchedPreds = matchedConditions.map(generatePredicate) - val matchedProjs = matchedOutputs.map(_.map(generateProjection)) - val notMatchedPreds = notMatchedConditions.map(generatePredicate) - val notMatchedProjs = notMatchedOutputs.map(_.map(generateProjection)) - val notMatchedBySourcePreds = notMatchedBySourceConditions.map(generatePredicate) - val notMatchedBySourceProjs = notMatchedBySourceOutputs.map(_.map(generateProjection)) - val noopCopyProj = generateProjection(noopCopyOutput) - val deleteRowProj = generateProjection(deleteRowOutput) - val outputProj = UnsafeProjection.create(outputRowEncoder.schema) - - // this is accessing ROW_DROPPED_COL. If ROW_DROPPED_COL is not in outputRowEncoder.schema - // then CDC must be disabled and it's the column after our output cols - def shouldDeleteRow(row: InternalRow): Boolean = { - row.getBoolean( - outputRowEncoder.schema.getFieldIndex(ROW_DROPPED_COL) - .getOrElse(outputRowEncoder.schema.fields.size) - ) - } - - def processRow(inputRow: InternalRow): Iterator[InternalRow] = { - // Identify which set of clauses to execute: matched, not-matched or not-matched-by-source - val (predicates, projections, noopAction) = if (targetRowHasNoMatchPred.eval(inputRow)) { - // Target row did not match any source row, so update the target row. 
- (notMatchedBySourcePreds, notMatchedBySourceProjs, noopCopyProj) - } else if (sourceRowHasNoMatchPred.eval(inputRow)) { - // Source row did not match with any target row, so insert the new source row - (notMatchedPreds, notMatchedProjs, deleteRowProj) - } else { - // Source row matched with target row, so update the target row - (matchedPreds, matchedProjs, noopCopyProj) - } - - // find (predicate, projection) pair whose predicate satisfies inputRow - val pair = (predicates zip projections).find { - case (predicate, _) => predicate.eval(inputRow) - } - - pair match { - case Some((_, projections)) => - projections.map(_.apply(inputRow)).iterator - case None => Iterator(noopAction.apply(inputRow)) - } - } - - val toRow = joinedRowEncoder.createSerializer() - val fromRow = outputRowEncoder.createDeserializer() - rowIterator - .map(toRow) - .flatMap(processRow) - .filter(!shouldDeleteRow(_)) - .map { notDeletedInternalRow => - fromRow(outputProj(notDeletedInternalRow)) - } - } - } } diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/commands/MergeIntoCommandBase.scala b/spark/src/main/scala/org/apache/spark/sql/delta/commands/MergeIntoCommandBase.scala index 610ccf1e90e..d354ac006cd 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/commands/MergeIntoCommandBase.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/commands/MergeIntoCommandBase.scala @@ -26,10 +26,11 @@ import org.apache.spark.sql.delta.actions.{Action, AddFile, FileAction} import org.apache.spark.sql.delta.commands.merge.{MergeIntoMaterializeSource, MergeIntoMaterializeSourceReason, MergeStats} import org.apache.spark.sql.delta.files.{TahoeBatchFileIndex, TahoeFileIndex} import org.apache.spark.sql.delta.metering.DeltaLogging +import org.apache.spark.sql.delta.schema.{ImplicitMetadataOperation, SchemaUtils} import org.apache.spark.sql.delta.sources.DeltaSQLConf import org.apache.spark.SparkContext -import org.apache.spark.sql.{DataFrame, SparkSession} +import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SparkSession} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap @@ -42,6 +43,7 @@ abstract class MergeIntoCommandBase extends LeafRunnableCommand with DeltaCommand with DeltaLogging with PredicateHelper + with ImplicitMetadataOperation with MergeIntoMaterializeSource { @transient val source: LogicalPlan @@ -53,6 +55,13 @@ abstract class MergeIntoCommandBase extends LeafRunnableCommand val notMatchedBySourceClauses: Seq[DeltaMergeIntoNotMatchedBySourceClause] val migratedSchema: Option[StructType] + override val (canMergeSchema, canOverwriteSchema) = { + // Delta options can't be passed to MERGE INTO currently, so they'll always be empty. + // The methods in options check if the auto migration flag is on, in which case schema evolution + // will be allowed. + val options = new DeltaOptions(Map.empty[String, String], conf) + (options.canMergeSchema, options.canOverwriteSchema) + } @transient protected lazy val sc: SparkContext = SparkContext.getOrCreate() @transient protected lazy val targetDeltaLog: DeltaLog = targetFileIndex.deltaLog @@ -80,6 +89,43 @@ abstract class MergeIntoCommandBase extends LeafRunnableCommand protected def isInsertOnly: Boolean = matchedClauses.isEmpty && notMatchedClauses.nonEmpty && notMatchedBySourceClauses.isEmpty + /** Whether this merge statement includes inserts statements. 
*/ + protected def includesInserts: Boolean = notMatchedClauses.nonEmpty + + protected def isCdcEnabled(deltaTxn: OptimisticTransaction): Boolean = + DeltaConfigs.CHANGE_DATA_FEED.fromMetaData(deltaTxn.metadata) + + protected def runMerge(spark: SparkSession): Seq[Row] + + override def run(spark: SparkSession): Seq[Row] = { + metrics("executionTimeMs").set(0) + metrics("scanTimeMs").set(0) + metrics("rewriteTimeMs").set(0) + if (migratedSchema.isDefined) { + // Block writes of void columns in the Delta log. Currently void columns are not properly + // supported and are dropped on read, but this is not enough for merge command that is also + // reading the schema from the Delta log. Until proper support we prefer to fail merge + // queries that add void columns. + val newNullColumn = SchemaUtils.findNullTypeColumn(migratedSchema.get) + if (newNullColumn.isDefined) { + throw new AnalysisException( + s"""Cannot add column '${newNullColumn.get}' with type 'void'. Please explicitly specify a + |non-void type.""".stripMargin.replaceAll("\n", " ") + ) + } + } + + val (materializeSource, _) = shouldMaterializeSource(spark, source, isInsertOnly) + if (!materializeSource) { + runMerge(spark) + } else { + // If it is determined that source should be materialized, wrap the execution with retries, + // in case the data of the materialized source is lost. + runWithMaterializedSourceLostRetries( + spark, targetFileIndex.deltaLog, metrics, runMerge) + } + } + import SQLMetrics._ override lazy val metrics: Map[String, SQLMetric] = baseMetrics @@ -178,6 +224,25 @@ abstract class MergeIntoCommandBase extends LeafRunnableCommand protected def shouldOptimizeMatchedOnlyMerge(spark: SparkSession): Boolean = { isMatchedOnly && spark.conf.get(DeltaSQLConf.MERGE_MATCHED_ONLY_ENABLED) } + + // There is only one when matched clause and it's a Delete and it does not have a condition. + protected val isOnlyOneUnconditionalDelete: Boolean = + matchedClauses == Seq(DeltaMergeIntoMatchedDeleteClause(None)) + + // We over-count numTargetRowsDeleted when there are multiple matches; + // this is the amount of the overcount, so we can subtract it to get a correct final metric. + protected var multipleMatchDeleteOnlyOvercount: Option[Long] = None + + // Throw error if multiple matches are ambiguous or cannot be computed correctly. + protected def throwErrorOnMultipleMatches( + hasMultipleMatches: Boolean, spark: SparkSession): Unit = { + // Multiple matches are not ambiguous when there is only one unconditional delete as + // all the matched row pairs in the 2nd join in `writeAllChanges` will get deleted. + if (hasMultipleMatches && !isOnlyOneUnconditionalDelete) { + throw DeltaErrors.multipleSourceRowMatchingTargetRowInMergeException(spark) + } + } + /** * Write the output data to files, repartitioning the output DataFrame by the partition columns * if table is partitioned and `merge.repartitionBeforeWrite.enabled` is set to true. @@ -374,9 +439,24 @@ abstract class MergeIntoCommandBase extends LeafRunnableCommand } object MergeIntoCommandBase { + val ROW_ID_COL = "_row_id_" + val FILE_NAME_COL = "_file_name_" + val SOURCE_ROW_PRESENT_COL = "_source_row_present_" + val TARGET_ROW_PRESENT_COL = "_target_row_present_" val ROW_DROPPED_COL = "_row_dropped_" val PRECOMPUTED_CONDITION_COL = "_condition_" + /** + * Spark UI will track all normal accumulators along with Spark tasks to show them on Web UI. 
+   * However, the accumulator used by `MergeIntoCommand` can store a very large value since it
+   * tracks all files that need to be rewritten. We should ask Spark UI to not remember it,
+   * otherwise, the UI data may consume lots of memory. Hence, we use the prefix
+   * `internal.metrics.` to make this accumulator become an internal accumulator, so that it
+   * will not be tracked by Spark UI.
+   */
+  val TOUCHED_FILES_ACCUM_NAME = "internal.metrics.MergeIntoDelta.touchedFiles"
+
+  /** Total size in bytes and number of distinct partition values of the given AddFiles. */
   def totalBytesAndDistinctPartitionValues(files: Seq[FileAction]): (Long, Int) = {
     val distinctValues = new mutable.HashSet[Map[String, String]]()
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/commands/merge/ClassicMergeExecutor.scala b/spark/src/main/scala/org/apache/spark/sql/delta/commands/merge/ClassicMergeExecutor.scala
new file mode 100644
index 00000000000..7f80c3ddbb4
--- /dev/null
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/commands/merge/ClassicMergeExecutor.scala
@@ -0,0 +1,359 @@
+/*
+ * Copyright (2021) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.delta.commands.merge
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.delta._
+import org.apache.spark.sql.delta.actions.{AddCDCFile, AddFile, FileAction}
+import org.apache.spark.sql.delta.commands.MergeIntoCommandBase
+import org.apache.spark.sql.delta.commands.cdc.CDCReader.{CDC_TYPE_COLUMN_NAME, CDC_TYPE_NOT_CDC}
+import org.apache.spark.sql.delta.commands.merge.MergeOutputGeneration.{SOURCE_ROW_INDEX_COL, TARGET_ROW_INDEX_COL}
+import org.apache.spark.sql.delta.util.SetAccumulator
+
+import org.apache.spark.sql.{Column, Dataset, SparkSession}
+import org.apache.spark.sql.catalyst.expressions.{Literal, Or}
+import org.apache.spark.sql.functions.{coalesce, col, count, input_file_name, lit, monotonically_increasing_id, sum}
+
+/**
+ * Trait with merge execution in two phases:
+ *
+ * Phase 1: Find the input files in target that are touched by the rows that satisfy
+ *    the condition and verify that no two source rows match with the same target row.
+ *    This is implemented as an inner-join using the given condition. See [[findTouchedFiles]]
+ *    for more details. In the special case that there is no update clause, we write all the
+ *    non-matching source data as new files and skip phase 2.
+ *    An error is raised when the ON search condition of the MERGE statement can match
+ *    a single row from the target table with multiple rows of the source table-reference.
+ *
+ * Phase 2: Read the touched files again and write new files with updated and/or inserted rows.
+ *    If there are updates, then use an outer join using the given condition to write the
+ *    updates and inserts (see [[writeAllChanges()]]). If there are no matches for updates,
+ *    only inserts, then write them directly (see [[writeInsertsOnlyWhenNoMatches()]]).
+ * + * See [[InsertOnlyMergeExecutor]] for the optimized executor used in case there are only inserts. + */ +trait ClassicMergeExecutor extends MergeIntoMaterializeSource with MergeOutputGeneration { + self: MergeIntoCommandBase => + import MergeIntoCommandBase._ + + /** + * Find the target table files that contain the rows that satisfy the merge condition. This is + * implemented as an inner-join between the source query/table and the target table using + * the merge condition. + */ + protected def findTouchedFiles( + spark: SparkSession, + deltaTxn: OptimisticTransaction + ): (Seq[AddFile], DeduplicateCDFDeletes) = recordMergeOperation( + extraOpType = "findTouchedFiles", + status = "MERGE operation - scanning files for matches", + sqlMetricName = "scanTimeMs") { + + val columnComparator = spark.sessionState.analyzer.resolver + + // Accumulator to collect all the distinct touched files + val touchedFilesAccum = new SetAccumulator[String]() + spark.sparkContext.register(touchedFilesAccum, TOUCHED_FILES_ACCUM_NAME) + + // UDFs to records touched files names and add them to the accumulator + val recordTouchedFileName = + DeltaUDF.intFromStringBoolean { (fileName, shouldRecord) => { + if (shouldRecord) { + touchedFilesAccum.add(fileName) + } + 1 + }}.asNondeterministic() + + // Prune non-matching files if we don't need to collect them for NOT MATCHED BY SOURCE clauses. + val dataSkippedFiles = + if (notMatchedBySourceClauses.isEmpty) { + deltaTxn.filterFiles(getTargetOnlyPredicates(spark)) + } else { + deltaTxn.filterFiles() + } + + // Join the source and target table using the merge condition to find touched files. An inner + // join collects all candidate files for MATCHED clauses, a right outer join also includes + // candidates for NOT MATCHED BY SOURCE clauses. + // In addition, we attach two columns + // - a monotonically increasing row id for target rows to later identify whether the same + // target row is modified by multiple user or not + // - the target file name the row is from to later identify the files touched by matched rows + val joinType = if (notMatchedBySourceClauses.isEmpty) "inner" else "right_outer" + + // When they are only MATCHED clauses, after the join we prune files that have no rows that + // satisfy any of the clause conditions. + val matchedPredicate = + if (isMatchedOnly) { + matchedClauses + .map(clause => clause.condition.getOrElse(Literal(true))) + .reduce((a, b) => Or(a, b)) + } else Literal(true) + + // Compute the columns needed for the inner join. + val targetColsNeeded = { + condition.references.map(_.name) ++ deltaTxn.snapshot.metadata.partitionColumns ++ + matchedPredicate.references.map(_.name) + } + + val columnsToDrop = deltaTxn.snapshot.metadata.schema.map(_.name) + .filterNot { field => + targetColsNeeded.exists { name => columnComparator(name, field) } + } + val incrSourceRowCountExpr = incrementMetricAndReturnBool("numSourceRows", true) + // We can't use filter() directly on the expression because that will prevent + // column pruning. We don't need the SOURCE_ROW_PRESENT_COL so we immediately drop it. 
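// [Editor's illustrative sketch, not part of the original patch] The pattern used just below,
// with a plain literal standing in for the metric-incrementing expression returned by
// incrementMetricAndReturnBool: the expression is attached as a column, evaluated for every row
// through filter(), and dropped again so it does not widen the columns fed into the join.
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.lit

object PerRowMarkerSketch {
  def evaluatePerRowMarker(df: DataFrame): DataFrame = {
    val marker = "_marker_"            // hypothetical column name
    df.withColumn(marker, lit(true))   // stand-in for the always-true counting expression
      .filter(marker)                  // forces evaluation for every source row
      .drop(marker)                    // the marker column is not needed downstream
  }
}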
+ val sourceDF = getSourceDF() + .withColumn(SOURCE_ROW_PRESENT_COL, new Column(incrSourceRowCountExpr)) + .filter(SOURCE_ROW_PRESENT_COL) + .drop(SOURCE_ROW_PRESENT_COL) + val targetPlan = + buildTargetPlanWithFiles( + spark, + deltaTxn, + dataSkippedFiles, + columnsToDrop) + val targetDF = Dataset.ofRows(spark, targetPlan) + .withColumn(ROW_ID_COL, monotonically_increasing_id()) + .withColumn(FILE_NAME_COL, input_file_name()) + + val joinToFindTouchedFiles = + sourceDF.join(targetDF, new Column(condition), joinType) + + // Process the matches from the inner join to record touched files and find multiple matches + val collectTouchedFiles = joinToFindTouchedFiles + .select(col(ROW_ID_COL), + recordTouchedFileName(col(FILE_NAME_COL), new Column(matchedPredicate)).as("one")) + + // Calculate frequency of matches per source row + val matchedRowCounts = collectTouchedFiles.groupBy(ROW_ID_COL).agg(sum("one").as("count")) + + // Get multiple matches and simultaneously collect (using touchedFilesAccum) the file names + import org.apache.spark.sql.delta.implicits._ + val (multipleMatchCount, multipleMatchSum) = matchedRowCounts + .filter("count > 1") + .select(coalesce(count(new Column("*")), lit(0)), coalesce(sum("count"), lit(0))) + .as[(Long, Long)] + .collect() + .head + + val hasMultipleMatches = multipleMatchCount > 0 + throwErrorOnMultipleMatches(hasMultipleMatches, spark) + if (hasMultipleMatches) { + // This is only allowed for delete-only queries. + // This query will count the duplicates for numTargetRowsDeleted in Job 2, + // because we count matches after the join and not just the target rows. + // We have to compensate for this by subtracting the duplicates later, + // so we need to record them here. + val duplicateCount = multipleMatchSum - multipleMatchCount + multipleMatchDeleteOnlyOvercount = Some(duplicateCount) + } + + // Get the AddFiles using the touched file names. + val touchedFileNames = touchedFilesAccum.value.iterator().asScala.toSeq + logTrace(s"findTouchedFiles: matched files:\n\t${touchedFileNames.mkString("\n\t")}") + + val nameToAddFileMap = generateCandidateFileMap(targetDeltaLog.dataPath, dataSkippedFiles) + val touchedAddFiles = touchedFileNames.map(f => + getTouchedFile(targetDeltaLog.dataPath, f, nameToAddFileMap)) + + if (metrics("numSourceRows").value == 0 && (dataSkippedFiles.isEmpty || + dataSkippedFiles.forall(_.numLogicalRecords.getOrElse(0) == 0))) { + // The target table is empty, and the optimizer optimized away the join entirely OR the + // source table is truly empty. In that case, scanning the source table once is the only + // way to get the correct metric. 
+ val numSourceRows = sourceDF.count() + metrics("numSourceRows").set(numSourceRows) + } + + metrics("numTargetFilesBeforeSkipping") += deltaTxn.snapshot.numOfFiles + metrics("numTargetBytesBeforeSkipping") += deltaTxn.snapshot.sizeInBytes + val (afterSkippingBytes, afterSkippingPartitions) = + totalBytesAndDistinctPartitionValues(dataSkippedFiles) + metrics("numTargetFilesAfterSkipping") += dataSkippedFiles.size + metrics("numTargetBytesAfterSkipping") += afterSkippingBytes + metrics("numTargetPartitionsAfterSkipping") += afterSkippingPartitions + val (removedBytes, removedPartitions) = totalBytesAndDistinctPartitionValues(touchedAddFiles) + metrics("numTargetFilesRemoved") += touchedAddFiles.size + metrics("numTargetBytesRemoved") += removedBytes + metrics("numTargetPartitionsRemovedFrom") += removedPartitions + val dedupe = DeduplicateCDFDeletes( + hasMultipleMatches && isCdcEnabled(deltaTxn), + includesInserts) + (touchedAddFiles, dedupe) + } + + /** + * Write new files by reading the touched files and updating/inserting data using the source + * query/table. This is implemented using a full-outer-join using the merge condition. + * + * Note that unlike the insert-only code paths with just one control column ROW_DROPPED_COL, this + * method has a second control column CDC_TYPE_COL_NAME used for handling CDC when enabled. + */ + protected def writeAllChanges( + spark: SparkSession, + deltaTxn: OptimisticTransaction, + filesToRewrite: Seq[AddFile], + deduplicateCDFDeletes: DeduplicateCDFDeletes) + : Seq[FileAction] = recordMergeOperation( + extraOpType = + if (shouldOptimizeMatchedOnlyMerge(spark)) "writeAllUpdatesAndDeletes" + else "writeAllChanges", + status = s"MERGE operation - Rewriting ${filesToRewrite.size} files", + sqlMetricName = "rewriteTimeMs") { + + require(!deduplicateCDFDeletes.enabled || isCdcEnabled(deltaTxn), + "CDF delete duplication is enabled but overall the CDF generation is disabled") + + // Generate a new target dataframe that has same output attributes exprIds as the target plan. + // This allows us to apply the existing resolved update/insert expressions. + val targetPlan = buildTargetPlanWithFiles( + spark, + deltaTxn, + filesToRewrite, + columnsToDrop = Nil) + val baseTargetDF = Dataset.ofRows(spark, targetPlan) + val joinType = if (shouldOptimizeMatchedOnlyMerge(spark)) { + "rightOuter" + } else { + "fullOuter" + } + + logDebug(s"""writeAllChanges using $joinType join: + | source.output: ${source.outputSet} + | target.output: ${target.outputSet} + | condition: $condition + | newTarget.output: ${baseTargetDF.queryExecution.logical.outputSet} + """.stripMargin) + + // Expressions to update metrics + val incrSourceRowCountExpr = incrementMetricAndReturnBool("numSourceRowsInSecondScan", true) + val incrNoopCountExpr = incrementMetricAndReturnBool("numTargetRowsCopied", false) + + // Apply an outer join to find both, matches and non-matches. We are adding two boolean fields + // with value `true`, one to each side of the join. Whether this field is null or not after + // the outer join, will allow us to identify whether the resultant joined row was a + // matched inner result or an unmatched result with null on one side. + val joinedDF = { + val sourceDF = if (deduplicateCDFDeletes.enabled && deduplicateCDFDeletes.includesInserts) { + // Add row index for the source rows to identify inserted rows during the cdf deleted rows + // deduplication. 
See [[deduplicateCDFDeletes()]] + getSourceDF() + .withColumn(SOURCE_ROW_INDEX_COL, monotonically_increasing_id()) + } else { + getSourceDF() + } + val targetDF = baseTargetDF + .withColumn(TARGET_ROW_PRESENT_COL, lit(true)) + sourceDF + .withColumn(SOURCE_ROW_PRESENT_COL, new Column(incrSourceRowCountExpr)) + .join( + if (deduplicateCDFDeletes.enabled) { + targetDF.withColumn(TARGET_ROW_INDEX_COL, monotonically_increasing_id()) + } else targetDF, + new Column(condition), joinType) + } + + // Precompute conditions in matched and not matched clauses and generate + // the joinedDF with precomputed columns and clauses with rewritten conditions. + val (joinedAndPrecomputedConditionsDF, clausesWithPrecompConditions) = + generatePrecomputedConditionsAndDF( + joinedDF, + clauses = matchedClauses ++ notMatchedClauses ++ notMatchedBySourceClauses) + + val cdcEnabled = DeltaConfigs.CHANGE_DATA_FEED.fromMetaData(deltaTxn.metadata) + + + // The target output columns need to be marked as nullable here, as they are going to be used + // to reference the output of an outer join. + val targetOutputCols = getTargetOutputCols(deltaTxn, makeNullable = true) + + // If there are N columns in the target table, the full outer join output will have: + // - N columns for target table + // - ROW_DROPPED_COL to define whether the generated row should dropped or written + // - if CDC is enabled, also CDC_TYPE_COLUMN_NAME containing the type of change being performed + // in a particular row + // (N+1 or N+2 or N+3 columns depending on CDC disabled / enabled and if Row IDs are preserved) + val outputColNames = + targetOutputCols.map(_.name) ++ + Seq(ROW_DROPPED_COL) ++ + (if (cdcEnabled) Seq(CDC_TYPE_COLUMN_NAME) else Seq()) + + // Copy expressions to copy the existing target row and not drop it (ROW_DROPPED_COL=false), + // and in case CDC is enabled, set it to CDC_TYPE_NOT_CDC. + // (N+1 or N+2 or N+3 columns depending on CDC disabled / enabled and if Row IDs are preserved) + val noopCopyExprs = + targetOutputCols ++ + Seq(incrNoopCountExpr) ++ + (if (cdcEnabled) Seq(CDC_TYPE_NOT_CDC) else Seq()) + + // Generate output columns. + val outputCols = generateWriteAllChangesOutputCols( + targetOutputCols, + outputColNames, + noopCopyExprs, + clausesWithPrecompConditions, + cdcEnabled + ) + + val outputDF = if (cdcEnabled) { + generateCdcAndOutputRows( + joinedAndPrecomputedConditionsDF, + outputCols, + outputColNames, + noopCopyExprs, + deduplicateCDFDeletes) + .filter(s"$ROW_DROPPED_COL = false") + .drop(ROW_DROPPED_COL) + } else { // change data capture is off, just output the normal data + // Apply the expressions on the join output. The filter ensures we only consider rows + // that are not dropped. And the drop ensures that the dropped flag does not leak out + // to the output. 
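// [Editor's illustrative sketch, not part of the original patch] Rough shape of a single
// element of `outputCols` selected below, for the simple case of one MATCHED, one NOT MATCHED
// and one NOT MATCHED BY SOURCE clause. All parameters are hypothetical stand-ins for the
// precomputed clause conditions and the per-clause value of this particular output column.
object OutputColumnShapeSketch {
  import org.apache.spark.sql.Column
  import org.apache.spark.sql.functions.when

  def outputColumnShape(
      sourceRowIsNull: Column,   // no source match: apply the NOT MATCHED BY SOURCE clause
      targetRowIsNull: Column,   // no target match: apply the NOT MATCHED (insert) clause
      matchedCond: Column, matchedValue: Column,
      notMatchedCond: Column, insertValue: Column,
      notMatchedBySourceCond: Column, notMatchedBySourceValue: Column,
      noopCopyValue: Column,     // copy the target row unchanged
      dropSourceRowValue: Column): Column = {
    when(sourceRowIsNull,
        when(notMatchedBySourceCond, notMatchedBySourceValue).otherwise(noopCopyValue))
      .when(targetRowIsNull,
        when(notMatchedCond, insertValue).otherwise(dropSourceRowValue))
      .otherwise(
        when(matchedCond, matchedValue).otherwise(noopCopyValue))
  }
}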
+ joinedAndPrecomputedConditionsDF + .select(outputCols: _*) + .filter(s"$ROW_DROPPED_COL = false") + .drop(ROW_DROPPED_COL) + } + + logDebug("writeAllChanges: join output plan:\n" + outputDF.queryExecution) + + // Write to Delta + val newFiles = writeFiles(spark, deltaTxn, outputDF) + + // Update metrics + val (addedBytes, addedPartitions) = totalBytesAndDistinctPartitionValues(newFiles) + metrics("numTargetFilesAdded") += newFiles.count(_.isInstanceOf[AddFile]) + metrics("numTargetChangeFilesAdded") += newFiles.count(_.isInstanceOf[AddCDCFile]) + metrics("numTargetChangeFileBytes") += newFiles.collect{ case f: AddCDCFile => f.size }.sum + metrics("numTargetBytesAdded") += addedBytes + metrics("numTargetPartitionsAddedTo") += addedPartitions + if (multipleMatchDeleteOnlyOvercount.isDefined) { + // Compensate for counting duplicates during the query. + val actualRowsDeleted = + metrics("numTargetRowsDeleted").value - multipleMatchDeleteOnlyOvercount.get + assert(actualRowsDeleted >= 0) + metrics("numTargetRowsDeleted").set(actualRowsDeleted) + val actualRowsMatchedDeleted = + metrics("numTargetRowsMatchedDeleted").value - multipleMatchDeleteOnlyOvercount.get + assert(actualRowsMatchedDeleted >= 0) + metrics("numTargetRowsMatchedDeleted").set(actualRowsMatchedDeleted) + } + + newFiles + } +} diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/commands/merge/MergeOutputGeneration.scala b/spark/src/main/scala/org/apache/spark/sql/delta/commands/merge/MergeOutputGeneration.scala index 069a969dcf7..a0a467ff33a 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/commands/merge/MergeOutputGeneration.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/commands/merge/MergeOutputGeneration.scala @@ -18,7 +18,9 @@ package org.apache.spark.sql.delta.commands.merge import scala.collection.mutable +import org.apache.spark.sql.delta.metric.IncrementMetric import org.apache.spark.sql.delta.commands.MergeIntoCommandBase +import org.apache.spark.sql.delta.commands.cdc.CDCReader import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute @@ -31,7 +33,9 @@ import org.apache.spark.sql.functions._ * the output of the merge operation. */ trait MergeOutputGeneration { self: MergeIntoCommandBase => + import CDCReader._ import MergeIntoCommandBase._ + import MergeOutputGeneration._ /** * Precompute conditions in MATCHED and NOT MATCHED clauses and generate the source @@ -85,4 +89,438 @@ trait MergeOutputGeneration { self: MergeIntoCommandBase => (sourceWithPrecompConditions, clausesWithPrecompConditions) } + /** + * Generate the expressions to process full-outer join output and generate target rows. + * + * To generate these N + 2 columns, we will be generate N + 2 expressions and apply them + * on the joinedDF. The CDC column will be either used for CDC generation or dropped before + * performing the final write, and the other column will always be dropped after executing the + * increment metric expression and filtering on ROW_DROPPED_COL. 
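A minimal sketch of the per-output-column dispatch this doc comment describes, assuming toy column names and hand-written action expressions in place of the resolved clause actions:

```scala
// For every output column, one conditional expression picks the right action for the row kind;
// a final boolean control column decides whether the row is written at all.
import org.apache.spark.sql.{Column, SparkSession}
import org.apache.spark.sql.functions._

object PerColumnDispatchSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("dispatch-sketch").getOrCreate()
    import spark.implicits._

    // Shape of the presence-column outer join output: nulls mark the side without a match.
    val joined = Seq(
      (Option("s1"), Option.empty[String], Option(true), Option.empty[Boolean]), // source only
      (Option("s2"), Option("t2"), Option(true), Option(true)),                  // matched
      (Option.empty[String], Option("t3"), Option.empty[Boolean], Option(true))  // target only
    ).toDF("srcValue", "tgtValue", "_source_present_", "_target_present_")

    // Dispatch order mirrors the clause groups: source row null -> NOT MATCHED BY SOURCE action,
    // target row null -> NOT MATCHED action, otherwise -> MATCHED action.
    def dispatch(notMatchedBySource: Column, notMatched: Column, matched: Column): Column =
      when(col("_source_present_").isNull, notMatchedBySource)
        .when(col("_target_present_").isNull, notMatched)
        .otherwise(matched)

    val output = joined.select(
      dispatch(col("tgtValue"), col("srcValue"), col("srcValue")).as("value"), // copy/INSERT/UPDATE
      dispatch(lit(false), lit(false), lit(false)).as("_row_dropped_"))        // no DELETE clause here

    // Rows flagged as dropped are filtered out; the control column never reaches the output.
    output.filter(!col("_row_dropped_")).drop("_row_dropped_").show()
    spark.stop()
  }
}
```

Because the whole output is one projection of ordinary Catalyst expressions, Spark can apply its usual optimizations and code generation to it.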
+ */ + protected def generateWriteAllChangesOutputCols( + targetOutputCols: Seq[Expression], + outputColNames: Seq[String], + noopCopyExprs: Seq[Expression], + clausesWithPrecompConditions: Seq[DeltaMergeIntoClause], + cdcEnabled: Boolean, + shouldCountDeletedRows: Boolean = true) + : IndexedSeq[Column] = { + + val numOutputCols = outputColNames.size + + // ==== Generate N + 2 (N + 4 preserving Row Tracking) expressions for MATCHED clauses ==== + val processedMatchClauses: Seq[ProcessedClause] = generateAllActionExprs( + targetOutputCols, + clausesWithPrecompConditions.collect{ case c: DeltaMergeIntoMatchedClause => c }, + cdcEnabled, + shouldCountDeletedRows) + val matchedExprs: Seq[Expression] = generateClauseOutputExprs( + numOutputCols, + processedMatchClauses, + noopCopyExprs) + + // N + 1 (or N + 2 with CDC, N + 4 preserving Row Tracking and CDC) expressions to delete the + // unmatched source row when it should not be inserted. `target.output` will produce NULLs + // which will get deleted eventually. + val deleteSourceRowExprs = + (targetOutputCols ++ + Seq(Literal(true))) ++ + (if (cdcEnabled) Seq(CDC_TYPE_NOT_CDC) else Seq()) + + // ==== Generate N + 2 (N + 4 preserving Row Tracking) expressions for NOT MATCHED clause ==== + val processedNotMatchClauses: Seq[ProcessedClause] = generateAllActionExprs( + targetOutputCols, + clausesWithPrecompConditions.collect{ case c: DeltaMergeIntoNotMatchedClause => c }, + cdcEnabled, + shouldCountDeletedRows) + val notMatchedExprs: Seq[Expression] = generateClauseOutputExprs( + numOutputCols, + processedNotMatchClauses, + deleteSourceRowExprs) + + // === Generate N + 2 (N + 4 with Row Tracking) expressions for NOT MATCHED BY SOURCE clause === + val processedNotMatchBySourceClauses: Seq[ProcessedClause] = generateAllActionExprs( + targetOutputCols, + clausesWithPrecompConditions.collect{ case c: DeltaMergeIntoNotMatchedBySourceClause => c }, + cdcEnabled, + shouldCountDeletedRows) + val notMatchedBySourceExprs: Seq[Expression] = generateClauseOutputExprs( + numOutputCols, + processedNotMatchBySourceClauses, + noopCopyExprs) + + // ==== Generate N + 2 (N + 4 preserving Row Tracking) expressions that invokes the MATCHED, + // NOT MATCHED and NOT MATCHED BY SOURCE expressions ==== + // That is, conditionally invokes them based on whether there was a match in the outer join. + + // Predicates to check whether there was a match in the full outer join. + val ifSourceRowNull = col(SOURCE_ROW_PRESENT_COL).isNull.expr + val ifTargetRowNull = col(TARGET_ROW_PRESENT_COL).isNull.expr + + val outputCols = (0 until numOutputCols).map { i => + // Coupled with the clause conditions, the resultant possibly-nested CaseWhens can + // be the following for every i-th column. (In the case with single matched/not-matched + // clauses, instead of nested CaseWhens, there will be If/Else.) + // + // CASE WHEN (source row is null) + // CASE WHEN + // THEN + // WHEN + // THEN + // ... + // ELSE + // + // WHEN (target row is null) + // THEN + // CASE WHEN + // THEN + // WHEN + // THEN + // ... + // ELSE + // + // ELSE (both source and target row are not null) + // CASE WHEN + // THEN + // WHEN + // THEN + // ... 
+ // ELSE + // + val caseWhen = CaseWhen(Seq( + ifSourceRowNull -> notMatchedBySourceExprs(i), + ifTargetRowNull -> notMatchedExprs(i)), + /* otherwise */ matchedExprs(i)) + new Column(Alias(caseWhen, outputColNames(i))()) + } + logDebug("writeAllChanges: join output expressions\n\t" + seqToString(outputCols.map(_.expr))) + outputCols + } + + /** + * Represents a merge clause after its condition and action expressions have been processed before + * generating the final output expression. + * @param condition Optional precomputed condition. + * @param actions List of output expressions generated from every action of the clause. + */ + protected case class ProcessedClause(condition: Option[Expression], actions: Seq[Expression]) + + /** + * Generate expressions for every output column and every merge clause based on the corresponding + * UPDATE, DELETE and/or INSERT action(s). + * @param targetOutputCols List of output column expressions from the target table. Used to + * generate CDC data for DELETE. + * @param clausesWithPrecompConditions List of merge clauses with precomputed conditions. Action + * expressions are generated for each of these clauses. + * @param cdcEnabled Whether the generated expressions should include CDC information. + * @param shouldCountDeletedRows Whether metrics for number of deleted rows should be incremented + * here. + * @return For each merge clause, a list of [[ProcessedClause] each with a precomputed + * condition and N+2 action expressions (N output columns + [[ROW_DROPPED_COL]] + + * [[CDC_TYPE_COLUMN_NAME]]) to apply on a row when that clause matches. + */ + protected def generateAllActionExprs( + targetOutputCols: Seq[Expression], + clausesWithPrecompConditions: Seq[DeltaMergeIntoClause], + cdcEnabled: Boolean, + shouldCountDeletedRows: Boolean) + : Seq[ProcessedClause] = clausesWithPrecompConditions.map (clause => { + val incrUpdatedCountExpr = + incrementMetricAndReturnBool(name = "numTargetRowsUpdated", valueToReturn = false) + val incrDeletedCountExpr = + incrementMetricAndReturnBool(name = "numTargetRowsDeleted", valueToReturn = true) + + val actions = clause match { + // Seq of up to N+3 expressions to generate output rows based on the UPDATE, DELETE and/or + // INSERT action(s) + case u: DeltaMergeIntoMatchedUpdateClause => + val incrCountExpr = + IncrementMetric(incrUpdatedCountExpr, metrics("numTargetRowsMatchedUpdated")) + // Generate update expressions and set ROW_DROPPED_COL = false + u.resolvedActions.map(_.expr) ++ + Seq(incrCountExpr) ++ + (if (cdcEnabled) Seq(Literal(CDC_TYPE_UPDATE_POSTIMAGE)) else Seq()) + case u: DeltaMergeIntoNotMatchedBySourceUpdateClause => + val incrCountExpr = + IncrementMetric(incrUpdatedCountExpr, metrics("numTargetRowsNotMatchedBySourceUpdated")) + // Generate update expressions and set ROW_DROPPED_COL = false + u.resolvedActions.map(_.expr) ++ + Seq(incrCountExpr) ++ + (if (cdcEnabled) Seq(Literal(CDC_TYPE_UPDATE_POSTIMAGE)) else Seq()) + case _: DeltaMergeIntoMatchedDeleteClause => + val incrCountExpr = + if (shouldCountDeletedRows) { + IncrementMetric(incrDeletedCountExpr, metrics("numTargetRowsMatchedDeleted")) + } else { + lit(true).expr + } + // Generate expressions to set the ROW_DROPPED_COL = true and mark as a DELETE + targetOutputCols ++ + Seq(incrCountExpr) ++ + (if (cdcEnabled) Seq(CDC_TYPE_DELETE) else Seq()) + case _: DeltaMergeIntoNotMatchedBySourceDeleteClause => + val incrCountExpr = + if (shouldCountDeletedRows) { + IncrementMetric(incrDeletedCountExpr, 
metrics("numTargetRowsNotMatchedBySourceDeleted")) + } else { + lit(true).expr + } + // Generate expressions to set the ROW_DROPPED_COL = true and mark as a DELETE + targetOutputCols ++ + Seq(incrCountExpr) ++ + (if (cdcEnabled) Seq(CDC_TYPE_DELETE) else Seq()) + case i: DeltaMergeIntoNotMatchedInsertClause => + val incrInsertedCountExpr = + incrementMetricAndReturnBool("numTargetRowsInserted", valueToReturn = false) + i.resolvedActions.map(_.expr) ++ + Seq(incrInsertedCountExpr) ++ + (if (cdcEnabled) Seq(Literal(CDC_TYPE_INSERT)) else Seq()) + } + ProcessedClause(clause.condition, actions) + }) + + /** + * Generate the output expression for each output column to apply the correct action for a type of + * merge clause. For each output column, the resulting expression dispatches the correct action + * based on all clause conditions. + * @param numOutputCols Number of output columns. + * @param clauses List of preprocessed merge clauses to bind together. + * @param noopExprs Default expression to apply when no condition holds. + * @return A list of one expression per output column to apply for a type of merge clause. + */ + protected def generateClauseOutputExprs( + numOutputCols: Integer, + clauses: Seq[ProcessedClause], + noopExprs: Seq[Expression]) + : Seq[Expression] = { + val clauseExprs = if (clauses.isEmpty) { + // Nothing to update or delete + noopExprs + } else { + + if (clauses.head.condition.isEmpty) { + // Only one clause without any condition, so the corresponding action expressions + // can be evaluated directly to generate the output columns. + clauses.head.actions + + } else if (clauses.length == 1) { + // Only one clause _with_ a condition, so generate IF/THEN instead of CASE WHEN. + // + // For the i-th output column, generate + // IF THEN + // ELSE + // + val condition = clauses.head.condition.get + clauses.head.actions.zip(noopExprs).map { case (a, noop) => If(condition, a, noop) } + + } else { + // There are multiple clauses. Use `CaseWhen` to conditionally evaluate the right + // action expressions to output columns + (0 until numOutputCols).map { i => + // For the i-th output column, generate + // CASE + // WHEN THEN + // WHEN THEN + // ... + // ELSE + // + val conditionalBranches = clauses.map(precomp => { + precomp.condition.getOrElse(Literal(true)) -> precomp.actions(i) + }) + CaseWhen(conditionalBranches, Some(noopExprs(i))) + } + } + } + assert(clauseExprs.size == numOutputCols, + s"incorrect # expressions:\n\t" + seqToString(clauseExprs)) + logDebug(s"writeAllChanges: expressions\n\t" + seqToString(clauseExprs)) + clauseExprs + } + + /** + * Build the full output as an array of packed rows, then explode into the final result. Based + * on the CDC type as originally marked, we produce both rows for the CDC_TYPE_NOT_CDC partition + * to be written to the main table and rows for the CDC partitions to be written as CDC files. + * + * See [[CDCReader]] for general details on how partitioning on the CDC type column works. + */ + protected def generateCdcAndOutputRows( + sourceDf: DataFrame, + outputCols: Seq[Column], + outputColNames: Seq[String], + noopCopyExprs: Seq[Expression], + deduplicateDeletes: DeduplicateCDFDeletes): DataFrame = { + import org.apache.spark.sql.delta.commands.cdc.CDCReader._ + // The main partition just needs to swap in the CDC_TYPE_NOT_CDC value. + val mainDataOutput = + outputCols.dropRight(1) :+ new Column(CDC_TYPE_NOT_CDC).as(CDC_TYPE_COLUMN_NAME) + + // Deleted rows are sent to the CDC partition instead of the main partition. 
These rows are + // marked as dropped, we need to retain them while incrementing the original metric column + // ourselves. + val keepRowAndIncrDeletedCountExpr = !outputCols(outputCols.length - 2) + val deleteCdcOutput = outputCols + .updated(outputCols.length - 2, keepRowAndIncrDeletedCountExpr.as(ROW_DROPPED_COL)) + + // Update preimages need special handling. This is conceptually the same as the + // transformation for cdcOutputCols, but we have to transform the noop exprs to columns + // ourselves because it hasn't already been done. + val cdcNoopExprs = noopCopyExprs.dropRight(2) :+ + Literal(false) :+ Literal(CDC_TYPE_UPDATE_PREIMAGE) + val updatePreimageCdcOutput = cdcNoopExprs.zipWithIndex.map { + case (e, i) => new Column(Alias(e, outputColNames(i))()) + } + + // To avoid duplicate evaluation of nondeterministic column values such as + // [[GenerateIdentityValues]], we EXPLODE CDC rows first, from which we EXPLODE again, + // and for each of "insert" and "update_postimage" rows, generate main data rows. + // The first EXPLODE will force evaluation all nondeterministic expressions, + // and the second EXPLODE will just copy the generated value from CDC rows + // to main data. By doing so we ensure nondeterministic column values in CDC and + // main data rows stay the same. + + val cdcTypeCol = outputCols.last + val cdcArray = new Column(CaseWhen(Seq( + EqualNullSafe(cdcTypeCol.expr, Literal(CDC_TYPE_INSERT)) -> array( + struct(outputCols: _*)).expr, + + EqualNullSafe(cdcTypeCol.expr, Literal(CDC_TYPE_UPDATE_POSTIMAGE)) -> array( + struct(updatePreimageCdcOutput: _*), + struct(outputCols: _*)).expr, + + EqualNullSafe(cdcTypeCol.expr, CDC_TYPE_DELETE) -> array( + struct(deleteCdcOutput: _*)).expr), + + // If none of the CDC cases apply (true for purely rewritten target rows, dropped source + // rows, etc.) just stick to the normal output. + array(struct(mainDataOutput: _*)).expr + )) + + val cdcToMainDataArray = new Column(If( + Or( + EqualNullSafe(col(s"packedCdc.$CDC_TYPE_COLUMN_NAME").expr, + Literal(CDC_TYPE_INSERT)), + EqualNullSafe(col(s"packedCdc.$CDC_TYPE_COLUMN_NAME").expr, + Literal(CDC_TYPE_UPDATE_POSTIMAGE))), + array( + col("packedCdc"), + struct(outputColNames.dropRight(1).map { n => col(s"packedCdc.`$n`") } :+ + new Column(CDC_TYPE_NOT_CDC).as(CDC_TYPE_COLUMN_NAME) + : _*) + ).expr, + array(col("packedCdc")).expr + )) + + if (deduplicateDeletes.enabled) { + deduplicateCDFDeletes( + deduplicateDeletes, + sourceDf, + cdcArray, + cdcToMainDataArray, + outputColNames) + } else { + packAndExplodeCDCOutput( + sourceDf, + cdcArray, + cdcToMainDataArray, + outputColNames, + dedupColumns = Nil) + } + } + + /** + * Applies the transformations to generate the CDC output: + * 1. Transform each input row into its corresponding array of CDC rows, e.g. an updated row + * yields: array(update_preimage, update_postimage). + * 2. Add the main data output for inserted/updated rows to the previously packed CDC data. + * 3. Explode the result to flatten the packed arrays. + * + * @param sourceDf The dataframe generated after processing the merge output. + * @param cdcArray Transforms the merge output into the corresponding packed CDC data that will be + * written to the CDC partition. + * @param cdcToMainDataArray Transforms the packed CDC data to add the main data output, i.e. rows + * that are inserted or updated and will be written to the main + * partition. + * @param outputColNames All the main and CDC columns to use in the output. 
+ * @param dedupColumns Additional columns to add to enable deduplication. + */ + private def packAndExplodeCDCOutput( + sourceDf: DataFrame, + cdcArray: Column, + cdcToMainDataArray: Column, + outputColNames: Seq[String], + dedupColumns: Seq[Column]): DataFrame = { + val unpackedCols = outputColNames.map { name => + col(s"packedData.`$name`").as(name) + } + sourceDf + // `explode()` creates a [[Generator]] which can't handle non-deterministic expressions that + // we use to increment metric counters. We first project the CDC array so that the expressions + // are evaluated before we explode the array, + .select(cdcArray.as("projectedCDC") +: dedupColumns: _*) + .select(explode(col("projectedCDC")).as("packedCdc") +: dedupColumns: _*) + .select(explode(cdcToMainDataArray).as("packedData") +: dedupColumns: _*) + .select(unpackedCols++ dedupColumns: _*) + } + + /** + * This method deduplicates CDF deletes where a target row has potentially multiple matches. It + * assumes that the input dataframe contains the [[TARGET_ROW_INDEX_COL]] and + * to detect inserts the [[SOURCE_ROW_INDEX_COL]] column to track the origin of the row. + * + * All duplicates of deleted rows have the same [[TARGET_ROW_INDEX_COL]] and + * [[CDC_TYPE_COLUMN_NAME]] therefore we use both columns as compound deduplication key. + * In case the input data frame contains additional insert rows we leave them untouched by using + * the [[SOURCE_ROW_INDEX_COL]] to fill the null values of the [[TARGET_ROW_INDEX_COL]]. This + * may lead to duplicates as part of the final row index but this is not a problem since if + * an insert and a delete have the same [[TARGET_ROW_INDEX_COL]] they definitely have a + * different [[CDC_TYPE_COLUMN_NAME]]. + */ + private def deduplicateCDFDeletes( + deduplicateDeletes: DeduplicateCDFDeletes, + df: DataFrame, + cdcArray: Column, + cdcToMainDataArray: Column, + outputColNames: Seq[String]): DataFrame = { + val dedupColumns = (if (deduplicateDeletes.includesInserts) { + Seq(col(TARGET_ROW_INDEX_COL), col(SOURCE_ROW_INDEX_COL)) + } else { + Seq(col(TARGET_ROW_INDEX_COL)) + }) + + val cdcDf = packAndExplodeCDCOutput( + df, + cdcArray, + cdcToMainDataArray, + outputColNames, + dedupColumns + ) + + val cdcDfWithIncreasingIds = if (deduplicateDeletes.includesInserts) { + cdcDf.withColumn( + TARGET_ROW_INDEX_COL, + coalesce(col(TARGET_ROW_INDEX_COL), col(SOURCE_ROW_INDEX_COL))) + } else { + cdcDf + } + cdcDfWithIncreasingIds + .dropDuplicates(TARGET_ROW_INDEX_COL, CDC_TYPE_COLUMN_NAME) + .drop(TARGET_ROW_INDEX_COL, SOURCE_ROW_INDEX_COL) + } +} + +/** + * This class enables and configures the deduplication of CDF deletes in case the merge statement + * contains an unconditional delete statement that matches multiple target rows. 
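A self-contained sketch of this deduplication, assuming illustrative `_target_row_index_`, `_source_row_index_` and `_change_type` columns and toy data:

```scala
// Duplicate CDF DELETE rows produced when several source rows match the same target row share
// the same target row index and change type, so dropping duplicates on that pair keeps exactly
// one of them; insert rows fall back to their source row index and are left untouched.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object DedupCdfDeletesSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("dedup-sketch").getOrCreate()
    import spark.implicits._

    val cdcRows = Seq(
      (Option(0L), Option(10L), 1, "delete"),        // same target row deleted twice ...
      (Option(0L), Option(11L), 1, "delete"),        // ... because two source rows matched it
      (Option.empty[Long], Option(12L), 5, "insert") // inserted row: no target row index
    ).toDF("_target_row_index_", "_source_row_index_", "key", "_change_type")

    val deduped = cdcRows
      .withColumn("_target_row_index_",
        coalesce(col("_target_row_index_"), col("_source_row_index_")))
      .dropDuplicates("_target_row_index_", "_change_type")
      .drop("_target_row_index_", "_source_row_index_")

    deduped.show() // one delete for key 1 plus the insert for key 5
    spark.stop()
  }
}
```

Keying the deduplication on the (row index, change type) pair leaves inserts untouched even if one happens to share an index with a delete, since their change types differ.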
+ *
+ * @param enabled whether CDF generation is enabled and duplicate target matches were detected
+ * @param includesInserts whether, in addition to the unconditional deletes, the merge also
+ *                        inserts rows
+ */
+case class DeduplicateCDFDeletes(
+    enabled: Boolean,
+    includesInserts: Boolean)
+
+object MergeOutputGeneration {
+  final val TARGET_ROW_INDEX_COL = "_target_row_index_"
+  final val SOURCE_ROW_INDEX_COL = "_source_row_index"
 }
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/MergeIntoAccumulatorSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/MergeIntoAccumulatorSuite.scala
index dcc1c0502d7..a2b7d915088 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/MergeIntoAccumulatorSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/MergeIntoAccumulatorSuite.scala
@@ -20,7 +20,7 @@ import java.util.concurrent.atomic.AtomicReference
 
 import scala.collection.JavaConverters._
 
-import org.apache.spark.sql.delta.commands.MergeIntoCommand
+import org.apache.spark.sql.delta.commands.MergeIntoCommandBase
 import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
 
 import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent, SparkListenerNodeExcluded, SparkListenerTaskEnd}
@@ -71,7 +71,7 @@ class MergeIntoAccumulatorSuite
           task.accumulatorUpdates.map(_.name)
         }.toSet
       // Verify accumulators used by MergeIntoCommand are not tracked.
-      assert(!accumNames.contains(MergeIntoCommand.TOUCHED_FILES_ACCUM_NAME))
+      assert(!accumNames.contains(MergeIntoCommandBase.TOUCHED_FILES_ACCUM_NAME))
     } finally {
       iter.close()
     }
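For context, a hedged end-to-end usage sketch exercising the three clause groups handled above (assumes the delta-spark artifact is on the classpath; the path, table contents and session config shown are illustrative):

```scala
// A MERGE with MATCHED, NOT MATCHED and NOT MATCHED BY SOURCE clauses, i.e. the clause groups
// whose output expressions are generated by the code paths in this patch.
import io.delta.tables.DeltaTable
import org.apache.spark.sql.SparkSession

object MergeUsageSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("merge-usage-sketch")
      .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
      .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
      .getOrCreate()
    import spark.implicits._

    val path = "/tmp/merge-usage-sketch"
    Seq((1, "a"), (2, "b"), (3, "c")).toDF("key", "value")
      .write.format("delta").mode("overwrite").save(path)
    val source = Seq((2, "b2"), (4, "d")).toDF("key", "value")

    DeltaTable.forPath(spark, path).as("t")
      .merge(source.as("s"), "s.key = t.key")
      .whenMatched().updateExpr(Map("value" -> "s.value"))                        // key 2 updated
      .whenNotMatched().insertExpr(Map("key" -> "s.key", "value" -> "s.value"))   // key 4 inserted
      .whenNotMatchedBySource().delete()                                          // keys 1 and 3 deleted
      .execute()

    // Expected result: (2, "b2") and (4, "d").
    spark.read.format("delta").load(path).orderBy("key").show()
    spark.stop()
  }
}
```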