From 408b8948073ae3515a52ca8d5ba93926e436e4aa Mon Sep 17 00:00:00 2001 From: Christos Stavrakakis Date: Thu, 19 Sep 2024 18:30:07 +0200 Subject: [PATCH] Exclude metadata only updates from DV check (#3686) ## Description During commit we validate that `AddFile` actions cannot contain Deletion Vectors when DVs are not enabled for a table (table property). This restriction is incorrect for actions that update metadata of existing files, e.g. `ComputeStatistics` or `RowTrackingBackfill`. The current code skips the check for `ComputeStatistics` operation but not for other operations that perform in-place-metadata updates. The new `isInPlaceFileMetadataUpdate` method is added to Delta operations so that we can easily distinguish such operations. The `getAssertDeletionVectorWellFormedFunc` function is slightly refactor to be more readable. ## How was this patch tested? Existing tests provide coverage. --- .../spark/sql/delta/DeltaOperations.scala | 97 ++++++++++++++++++- .../sql/delta/OptimisticTransaction.scala | 31 +++--- 2 files changed, 116 insertions(+), 12 deletions(-) diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaOperations.scala b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaOperations.scala index 841d950b7f..6fa7ad297c 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaOperations.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaOperations.scala @@ -91,6 +91,21 @@ object DeltaOperations { */ def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean + /** + * Whether the transaction is updating metadata of existing files. + * + * The Delta protocol allows committing AddFile actions for files that already exist on the + * latest version of the table, without committing corresponding RemoveFile actions. This is + * used to update the metadata of existing files, e.g. to recompute statistics or add tags. + * + * Such operations need special handling during conflict checking, especially against + * no-data-change transactions, because the read/delete conflict can be resolved with + * read-file-remapping and because there is no RemoveFile action to trigger a delete/delete + * conflict. In case you are adding such operation, make sure to include a test for conflicts + * with business *and* no-data-change transactions, e.g. optimize. + */ + def isInPlaceFileMetadataUpdate: Option[Boolean] + } abstract class OperationWithPredicates(name: String, val predicates: Seq[Expression]) @@ -156,6 +171,8 @@ object DeltaOperations { // This operation shouldn't be introducing AddFile actions with DVs and non tight bounds stats. // DVs can be introduced by the replaceWhere operation. override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true + + override val isInPlaceFileMetadataUpdate: Option[Boolean] = Some(false) } case class RemoveColumnMapping( @@ -166,6 +183,8 @@ object DeltaOperations { // This operation shouldn't be introducing AddFile actions at all. This check should be trivial. override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true + + override val isInPlaceFileMetadataUpdate: Option[Boolean] = Some(false) } /** Recorded during streaming inserts. */ @@ -183,6 +202,8 @@ object DeltaOperations { // This operation shouldn't be introducing AddFile actions with DVs and non tight bounds stats. override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true + + override val isInPlaceFileMetadataUpdate: Option[Boolean] = Some(false) } /** Recorded while deleting certain partitions. */ case class Delete(predicate: Seq[Expression]) @@ -207,6 +228,8 @@ object DeltaOperations { // This operation shouldn't be introducing AddFile actions with DVs and non tight bounds stats. override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true + + override val isInPlaceFileMetadataUpdate: Option[Boolean] = Some(false) } /** Recorded when truncating the table. */ case class Truncate() extends Operation("TRUNCATE") { @@ -216,6 +239,8 @@ object DeltaOperations { // This operation shouldn't be introducing AddFile actions with DVs and non-tight bounds stats. override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true + + override val isInPlaceFileMetadataUpdate: Option[Boolean] = Some(false) } /** Recorded when converting a table into a Delta table. */ @@ -236,6 +261,8 @@ object DeltaOperations { // This operation shouldn't be introducing AddFile actions with DVs and non-tight bounds stats. override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true + + override val isInPlaceFileMetadataUpdate: Option[Boolean] = Some(false) } /** Represents the predicates and action type (insert, update, delete) for a Merge clause */ @@ -306,6 +333,8 @@ object DeltaOperations { // This operation shouldn't be introducing AddFile actions with DVs and non-tight bounds stats. override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true + + override val isInPlaceFileMetadataUpdate: Option[Boolean] = Some(false) } object Merge { @@ -340,6 +369,8 @@ object DeltaOperations { // This operation shouldn't be introducing AddFile actions with DVs and non-tight bounds stats. override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true + + override val isInPlaceFileMetadataUpdate: Option[Boolean] = Some(false) } /** Recorded when the table is created. */ case class CreateTable( @@ -364,6 +395,8 @@ object DeltaOperations { // This operation shouldn't be introducing AddFile actions with DVs and non-tight bounds stats. override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true + + override val isInPlaceFileMetadataUpdate: Option[Boolean] = Some(false) } /** Recorded when the table is replaced. */ case class ReplaceTable( @@ -391,6 +424,8 @@ object DeltaOperations { // This operation shouldn't be introducing AddFile actions with DVs and non-tight bounds stats. override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true + + override val isInPlaceFileMetadataUpdate: Option[Boolean] = Some(false) } /** Recorded when the table properties are set. */ val OP_SET_TBLPROPERTIES = "SET TBLPROPERTIES" @@ -400,6 +435,8 @@ object DeltaOperations { // This operation shouldn't be introducing AddFile actions at all. This check should be trivial. override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true + + override val isInPlaceFileMetadataUpdate: Option[Boolean] = Some(false) } /** Recorded when the table properties are unset. */ case class UnsetTableProperties( @@ -411,6 +448,8 @@ object DeltaOperations { // This operation shouldn't be introducing AddFile actions at all. This check should be trivial. override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true + + override val isInPlaceFileMetadataUpdate: Option[Boolean] = Some(false) } /** Recorded when dropping a table feature. */ case class DropTableFeature( @@ -422,6 +461,8 @@ object DeltaOperations { // This operation shouldn't be introducing AddFile actions at all. This check should be trivial. override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true + + override val isInPlaceFileMetadataUpdate: Option[Boolean] = Some(false) } /** Recorded when columns are added. */ case class AddColumns( @@ -437,6 +478,8 @@ object DeltaOperations { // This operation shouldn't be introducing AddFile actions at all. This check should be trivial. override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true + + override val isInPlaceFileMetadataUpdate: Option[Boolean] = Some(false) } /** Recorded when columns are dropped. */ @@ -449,6 +492,8 @@ object DeltaOperations { // This operation shouldn't be introducing AddFile actions at all. This check should be trivial. override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true + + override val isInPlaceFileMetadataUpdate: Option[Boolean] = Some(false) } /** Recorded when column is renamed */ @@ -462,6 +507,8 @@ object DeltaOperations { // This operation shouldn't be introducing AddFile actions at all. This check should be trivial. override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true + + override val isInPlaceFileMetadataUpdate: Option[Boolean] = Some(false) } /** Recorded when columns are changed. */ @@ -477,6 +524,8 @@ object DeltaOperations { // This operation shouldn't be introducing AddFile actions at all. This check should be trivial. override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true + + override val isInPlaceFileMetadataUpdate: Option[Boolean] = Some(false) } /** Recorded when columns are replaced. */ case class ReplaceColumns( @@ -487,6 +536,8 @@ object DeltaOperations { // This operation shouldn't be introducing AddFile actions at all. This check should be trivial. override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true + + override val isInPlaceFileMetadataUpdate: Option[Boolean] = Some(false) } case class UpgradeProtocol(newProtocol: Protocol) extends Operation("UPGRADE PROTOCOL") { @@ -499,6 +550,8 @@ object DeltaOperations { // This operation shouldn't be introducing AddFile actions at all. This check should be trivial. override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true + + override val isInPlaceFileMetadataUpdate: Option[Boolean] = Some(false) } object ManualUpdate extends Operation("Manual Update") { @@ -506,6 +559,10 @@ object DeltaOperations { // Unsafe manual update disables checks. override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = false + + // Manual update operations can commit arbitrary actions. In case this field is needed consider + // adding a new Delta operation. For test-only code use TestOperation. + override val isInPlaceFileMetadataUpdate: Option[Boolean] = None } /** A commit without any actions. Could be used to force creation of new checkpoints. */ @@ -514,6 +571,8 @@ object DeltaOperations { // This operation shouldn't be introducing AddFile actions at all. This check should be trivial. override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true + + override val isInPlaceFileMetadataUpdate: Option[Boolean] = Some(false) } case class UpdateColumnMetadata( @@ -528,6 +587,8 @@ object DeltaOperations { // This operation shouldn't be introducing AddFile actions at all. This check should be trivial. override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true + + override val isInPlaceFileMetadataUpdate: Option[Boolean] = Some(false) } case class UpdateSchema(oldSchema: StructType, newSchema: StructType) @@ -538,6 +599,8 @@ object DeltaOperations { // This operation shouldn't be introducing AddFile actions at all. This check should be trivial. override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true + + override val isInPlaceFileMetadataUpdate: Option[Boolean] = Some(false) } case class AddConstraint( @@ -546,6 +609,8 @@ object DeltaOperations { // This operation shouldn't be introducing AddFile actions at all. This check should be trivial. override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true + + override val isInPlaceFileMetadataUpdate: Option[Boolean] = Some(false) } case class DropConstraint( @@ -560,6 +625,8 @@ object DeltaOperations { // This operation shouldn't be introducing AddFile actions at all. This check should be trivial. override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true + + override val isInPlaceFileMetadataUpdate: Option[Boolean] = Some(false) } /** Recorded when recomputing stats on the table. */ @@ -569,6 +636,9 @@ object DeltaOperations { // ComputeStats operation commits AddFiles with recomputed stats which are always tight bounds, // even when DVs are present. This check should be disabled. override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = false + + // ComputeStats operation only updates statistics of existing files. + override val isInPlaceFileMetadataUpdate: Option[Boolean] = Some(true) } /** Recorded when restoring a Delta table to an older version. */ @@ -586,6 +656,11 @@ object DeltaOperations { // Restore operation commits AddFiles with files, DVs and stats from the version it restores to. // It can happen that tight bound stats were recomputed before by ComputeStats. override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = false + + // The restore operation could perform in-place file metadata updates. However, the difference + // between the current and the restored state is computed using only the (path, DV) pairs as + // identifiers, meaning that metadata differences are ignored. + override val isInPlaceFileMetadataUpdate: Option[Boolean] = Some(false) } sealed abstract class OptimizeOrReorg(override val name: String, predicates: Seq[Expression]) @@ -624,6 +699,8 @@ object DeltaOperations { // This operation shouldn't be introducing AddFile actions with DVs and non tight bounds stats. override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true + + override val isInPlaceFileMetadataUpdate: Option[Boolean] = Some(false) } /** Recorded when cloning a Delta table into a new location. */ @@ -642,6 +719,8 @@ object DeltaOperations { // Clone operation commits AddFiles with files, DVs and stats copied over from the source table. // It can happen that tight bound stats were recomputed before by ComputeStats. override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = false + + override val isInPlaceFileMetadataUpdate: Option[Boolean] = Some(false) } /** @@ -662,6 +741,8 @@ object DeltaOperations { // This operation shouldn't be introducing AddFile actions at all. This check should be trivial. override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true + + override val isInPlaceFileMetadataUpdate: Option[Boolean] = Some(false) } /** @@ -676,6 +757,8 @@ object DeltaOperations { // This operation shouldn't be introducing AddFile actions at all. This check should be trivial. override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true + + override val isInPlaceFileMetadataUpdate: Option[Boolean] = Some(false) } /** Recorded when running REORG on the table. */ @@ -690,6 +773,8 @@ object DeltaOperations { // This operation shouldn't be introducing AddFile actions with DVs and non tight bounds stats. override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true + + override val isInPlaceFileMetadataUpdate: Option[Boolean] = Some(false) } /** Recorded when clustering columns are changed on clustered tables. */ @@ -702,6 +787,8 @@ object DeltaOperations { // This operation shouldn't be introducing AddFile actions at all. This check should be trivial. override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true + + override val isInPlaceFileMetadataUpdate: Option[Boolean] = Some(false) } /** Recorded when we backfill a Delta table's existing AddFiles with row tracking data. */ @@ -714,6 +801,9 @@ object DeltaOperations { // RowTrackingBackfill operation commits AddFiles with files, DVs and stats copied over. // It can happen that tight bound stats were recomputed before by ComputeStats. override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = false + + // RowTrackingBackfill only updates tags of existing files. + override val isInPlaceFileMetadataUpdate: Option[Boolean] = Some(true) } private def structFieldToMap(colPath: Seq[String], field: StructField): Map[String, Any] = { @@ -735,7 +825,10 @@ object DeltaOperations { colPosition: Option[String]) /** Dummy operation only for testing with arbitrary operation names */ - case class TestOperation(operationName: String = "TEST") extends Operation(operationName) { + case class TestOperation( + operationName: String = "TEST", + override val isInPlaceFileMetadataUpdate: Option[Boolean] = None + ) extends Operation(operationName) { override val parameters: Map[String, Any] = Map.empty // Perform the check for testing. @@ -763,6 +856,8 @@ object DeltaOperations { // This operation shouldn't be introducing AddFile actions with DVs and non tight bounds stats. override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true + + override val isInPlaceFileMetadataUpdate: Option[Boolean] = Some(false) } } diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala b/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala index f5e15d94f2..ec7090d7a0 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala @@ -881,25 +881,35 @@ trait OptimisticTransactionImpl extends TransactionalWrite protected def getAssertDeletionVectorWellFormedFunc( spark: SparkSession, op: DeltaOperations.Operation): (Action => Unit) = { - val deletionVectorCreationAllowed = - DeletionVectorUtils.deletionVectorsWritable(snapshot, newProtocol, newMetadata) - val isComputeStatsOperation = op.isInstanceOf[DeltaOperations.ComputeStats] val commitCheckEnabled = spark.conf.get(DeltaSQLConf.DELETION_VECTORS_COMMIT_CHECK_ENABLED) + if (!commitCheckEnabled) { + return _ => {} + } + + // Whether DVs are supported, i.e. the table is allowed to contain any DVs. + val deletionVectorsSupported = + DeletionVectorUtils.deletionVectorsReadable(snapshot, newProtocol, newMetadata) + // Whether DVs are enabled, i.e. operations are allowed to create new DVs. + val deletionVectorsEnabled = + DeletionVectorUtils.deletionVectorsWritable(snapshot, newProtocol, newMetadata) - val deletionVectorDisallowedForAddFiles = - commitCheckEnabled && !isComputeStatsOperation && !deletionVectorCreationAllowed + // If the operation does not define whether it performs in-place metadata updates, we are + // conservative and assume that it is not, which makes the check stricter. + val isInPlaceFileMetadataUpdate = op.isInPlaceFileMetadataUpdate.getOrElse(false) + val deletionVectorAllowedForAddFiles = + deletionVectorsSupported && (deletionVectorsEnabled || isInPlaceFileMetadataUpdate) - val addFileMustHaveWideBounds = deletionVectorCreationAllowed && - op.checkAddFileWithDeletionVectorStatsAreNotTightBounds + val addFileMustHaveWideBounds = op.checkAddFileWithDeletionVectorStatsAreNotTightBounds action => action match { - case a: AddFile => - if (deletionVectorDisallowedForAddFiles && a.deletionVector != null) { + case a: AddFile if a.deletionVector != null => + if (!deletionVectorAllowedForAddFiles) { throw DeltaErrors.addingDeletionVectorsDisallowedException() } + // Protocol requirement checks: // 1. All files with DVs must have `stats` with `numRecords`. - if (a.deletionVector != null && (a.stats == null || a.numPhysicalRecords.isEmpty)) { + if (a.stats == null || a.numPhysicalRecords.isEmpty) { throw DeltaErrors.addFileWithDVsMissingNumRecordsException } @@ -909,7 +919,6 @@ trait OptimisticTransactionImpl extends TransactionalWrite // See that field comment in DeltaOperation for more details. // Note, the absence of the tightBounds column when DVs exist is also an illegal state. if (addFileMustHaveWideBounds && - a.deletionVector != null && // Extra inversion to also catch absent `tightBounds`. !a.tightBounds.contains(false)) { throw DeltaErrors.addFileWithDVsAndTightBoundsException()