Skip to content

Commit

Permalink
Exclude metadata only updates from DV check (#3686)
Browse files Browse the repository at this point in the history
## Description

During commit we validate that `AddFile` actions cannot contain Deletion
Vectors when DVs are not enabled for a table (table property). This
restriction is incorrect for actions that update metadata of existing
files, e.g. `ComputeStatistics` or `RowTrackingBackfill`. The current
code skips the check for `ComputeStatistics` operation but not for other
operations that perform in-place-metadata updates. The new
`isInPlaceFileMetadataUpdate` method is added to Delta operations so
that we can easily distinguish such operations.

The `getAssertDeletionVectorWellFormedFunc` function is slightly
refactor to be more readable.

## How was this patch tested?
Existing tests provide coverage.
  • Loading branch information
cstavr committed Sep 19, 2024
1 parent d07a7bd commit 408b894
Show file tree
Hide file tree
Showing 2 changed files with 116 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,21 @@ object DeltaOperations {
*/
def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean

/**
* Whether the transaction is updating metadata of existing files.
*
* The Delta protocol allows committing AddFile actions for files that already exist on the
* latest version of the table, without committing corresponding RemoveFile actions. This is
* used to update the metadata of existing files, e.g. to recompute statistics or add tags.
*
* Such operations need special handling during conflict checking, especially against
* no-data-change transactions, because the read/delete conflict can be resolved with
* read-file-remapping and because there is no RemoveFile action to trigger a delete/delete
* conflict. In case you are adding such operation, make sure to include a test for conflicts
* with business *and* no-data-change transactions, e.g. optimize.
*/
def isInPlaceFileMetadataUpdate: Option[Boolean]

}

abstract class OperationWithPredicates(name: String, val predicates: Seq[Expression])
Expand Down Expand Up @@ -156,6 +171,8 @@ object DeltaOperations {
// This operation shouldn't be introducing AddFile actions with DVs and non tight bounds stats.
// DVs can be introduced by the replaceWhere operation.
override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true

override val isInPlaceFileMetadataUpdate: Option[Boolean] = Some(false)
}

case class RemoveColumnMapping(
Expand All @@ -166,6 +183,8 @@ object DeltaOperations {

// This operation shouldn't be introducing AddFile actions at all. This check should be trivial.
override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true

override val isInPlaceFileMetadataUpdate: Option[Boolean] = Some(false)
}

/** Recorded during streaming inserts. */
Expand All @@ -183,6 +202,8 @@ object DeltaOperations {

// This operation shouldn't be introducing AddFile actions with DVs and non tight bounds stats.
override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true

override val isInPlaceFileMetadataUpdate: Option[Boolean] = Some(false)
}
/** Recorded while deleting certain partitions. */
case class Delete(predicate: Seq[Expression])
Expand All @@ -207,6 +228,8 @@ object DeltaOperations {

// This operation shouldn't be introducing AddFile actions with DVs and non tight bounds stats.
override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true

override val isInPlaceFileMetadataUpdate: Option[Boolean] = Some(false)
}
/** Recorded when truncating the table. */
case class Truncate() extends Operation("TRUNCATE") {
Expand All @@ -216,6 +239,8 @@ object DeltaOperations {

// This operation shouldn't be introducing AddFile actions with DVs and non-tight bounds stats.
override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true

override val isInPlaceFileMetadataUpdate: Option[Boolean] = Some(false)
}

/** Recorded when converting a table into a Delta table. */
Expand All @@ -236,6 +261,8 @@ object DeltaOperations {

// This operation shouldn't be introducing AddFile actions with DVs and non-tight bounds stats.
override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true

override val isInPlaceFileMetadataUpdate: Option[Boolean] = Some(false)
}

/** Represents the predicates and action type (insert, update, delete) for a Merge clause */
Expand Down Expand Up @@ -306,6 +333,8 @@ object DeltaOperations {

// This operation shouldn't be introducing AddFile actions with DVs and non-tight bounds stats.
override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true

override val isInPlaceFileMetadataUpdate: Option[Boolean] = Some(false)
}

object Merge {
Expand Down Expand Up @@ -340,6 +369,8 @@ object DeltaOperations {

// This operation shouldn't be introducing AddFile actions with DVs and non-tight bounds stats.
override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true

override val isInPlaceFileMetadataUpdate: Option[Boolean] = Some(false)
}
/** Recorded when the table is created. */
case class CreateTable(
Expand All @@ -364,6 +395,8 @@ object DeltaOperations {

// This operation shouldn't be introducing AddFile actions with DVs and non-tight bounds stats.
override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true

override val isInPlaceFileMetadataUpdate: Option[Boolean] = Some(false)
}
/** Recorded when the table is replaced. */
case class ReplaceTable(
Expand Down Expand Up @@ -391,6 +424,8 @@ object DeltaOperations {

// This operation shouldn't be introducing AddFile actions with DVs and non-tight bounds stats.
override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true

override val isInPlaceFileMetadataUpdate: Option[Boolean] = Some(false)
}
/** Recorded when the table properties are set. */
val OP_SET_TBLPROPERTIES = "SET TBLPROPERTIES"
Expand All @@ -400,6 +435,8 @@ object DeltaOperations {

// This operation shouldn't be introducing AddFile actions at all. This check should be trivial.
override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true

override val isInPlaceFileMetadataUpdate: Option[Boolean] = Some(false)
}
/** Recorded when the table properties are unset. */
case class UnsetTableProperties(
Expand All @@ -411,6 +448,8 @@ object DeltaOperations {

// This operation shouldn't be introducing AddFile actions at all. This check should be trivial.
override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true

override val isInPlaceFileMetadataUpdate: Option[Boolean] = Some(false)
}
/** Recorded when dropping a table feature. */
case class DropTableFeature(
Expand All @@ -422,6 +461,8 @@ object DeltaOperations {

// This operation shouldn't be introducing AddFile actions at all. This check should be trivial.
override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true

override val isInPlaceFileMetadataUpdate: Option[Boolean] = Some(false)
}
/** Recorded when columns are added. */
case class AddColumns(
Expand All @@ -437,6 +478,8 @@ object DeltaOperations {

// This operation shouldn't be introducing AddFile actions at all. This check should be trivial.
override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true

override val isInPlaceFileMetadataUpdate: Option[Boolean] = Some(false)
}

/** Recorded when columns are dropped. */
Expand All @@ -449,6 +492,8 @@ object DeltaOperations {

// This operation shouldn't be introducing AddFile actions at all. This check should be trivial.
override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true

override val isInPlaceFileMetadataUpdate: Option[Boolean] = Some(false)
}

/** Recorded when column is renamed */
Expand All @@ -462,6 +507,8 @@ object DeltaOperations {

// This operation shouldn't be introducing AddFile actions at all. This check should be trivial.
override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true

override val isInPlaceFileMetadataUpdate: Option[Boolean] = Some(false)
}

/** Recorded when columns are changed. */
Expand All @@ -477,6 +524,8 @@ object DeltaOperations {

// This operation shouldn't be introducing AddFile actions at all. This check should be trivial.
override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true

override val isInPlaceFileMetadataUpdate: Option[Boolean] = Some(false)
}
/** Recorded when columns are replaced. */
case class ReplaceColumns(
Expand All @@ -487,6 +536,8 @@ object DeltaOperations {

// This operation shouldn't be introducing AddFile actions at all. This check should be trivial.
override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true

override val isInPlaceFileMetadataUpdate: Option[Boolean] = Some(false)
}

case class UpgradeProtocol(newProtocol: Protocol) extends Operation("UPGRADE PROTOCOL") {
Expand All @@ -499,13 +550,19 @@ object DeltaOperations {

// This operation shouldn't be introducing AddFile actions at all. This check should be trivial.
override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true

override val isInPlaceFileMetadataUpdate: Option[Boolean] = Some(false)
}

object ManualUpdate extends Operation("Manual Update") {
override val parameters: Map[String, Any] = Map.empty

// Unsafe manual update disables checks.
override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = false

// Manual update operations can commit arbitrary actions. In case this field is needed consider
// adding a new Delta operation. For test-only code use TestOperation.
override val isInPlaceFileMetadataUpdate: Option[Boolean] = None
}

/** A commit without any actions. Could be used to force creation of new checkpoints. */
Expand All @@ -514,6 +571,8 @@ object DeltaOperations {

// This operation shouldn't be introducing AddFile actions at all. This check should be trivial.
override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true

override val isInPlaceFileMetadataUpdate: Option[Boolean] = Some(false)
}

case class UpdateColumnMetadata(
Expand All @@ -528,6 +587,8 @@ object DeltaOperations {

// This operation shouldn't be introducing AddFile actions at all. This check should be trivial.
override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true

override val isInPlaceFileMetadataUpdate: Option[Boolean] = Some(false)
}

case class UpdateSchema(oldSchema: StructType, newSchema: StructType)
Expand All @@ -538,6 +599,8 @@ object DeltaOperations {

// This operation shouldn't be introducing AddFile actions at all. This check should be trivial.
override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true

override val isInPlaceFileMetadataUpdate: Option[Boolean] = Some(false)
}

case class AddConstraint(
Expand All @@ -546,6 +609,8 @@ object DeltaOperations {

// This operation shouldn't be introducing AddFile actions at all. This check should be trivial.
override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true

override val isInPlaceFileMetadataUpdate: Option[Boolean] = Some(false)
}

case class DropConstraint(
Expand All @@ -560,6 +625,8 @@ object DeltaOperations {

// This operation shouldn't be introducing AddFile actions at all. This check should be trivial.
override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true

override val isInPlaceFileMetadataUpdate: Option[Boolean] = Some(false)
}

/** Recorded when recomputing stats on the table. */
Expand All @@ -569,6 +636,9 @@ object DeltaOperations {
// ComputeStats operation commits AddFiles with recomputed stats which are always tight bounds,
// even when DVs are present. This check should be disabled.
override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = false

// ComputeStats operation only updates statistics of existing files.
override val isInPlaceFileMetadataUpdate: Option[Boolean] = Some(true)
}

/** Recorded when restoring a Delta table to an older version. */
Expand All @@ -586,6 +656,11 @@ object DeltaOperations {
// Restore operation commits AddFiles with files, DVs and stats from the version it restores to.
// It can happen that tight bound stats were recomputed before by ComputeStats.
override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = false

// The restore operation could perform in-place file metadata updates. However, the difference
// between the current and the restored state is computed using only the (path, DV) pairs as
// identifiers, meaning that metadata differences are ignored.
override val isInPlaceFileMetadataUpdate: Option[Boolean] = Some(false)
}

sealed abstract class OptimizeOrReorg(override val name: String, predicates: Seq[Expression])
Expand Down Expand Up @@ -624,6 +699,8 @@ object DeltaOperations {

// This operation shouldn't be introducing AddFile actions with DVs and non tight bounds stats.
override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true

override val isInPlaceFileMetadataUpdate: Option[Boolean] = Some(false)
}

/** Recorded when cloning a Delta table into a new location. */
Expand All @@ -642,6 +719,8 @@ object DeltaOperations {
// Clone operation commits AddFiles with files, DVs and stats copied over from the source table.
// It can happen that tight bound stats were recomputed before by ComputeStats.
override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = false

override val isInPlaceFileMetadataUpdate: Option[Boolean] = Some(false)
}

/**
Expand All @@ -662,6 +741,8 @@ object DeltaOperations {

// This operation shouldn't be introducing AddFile actions at all. This check should be trivial.
override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true

override val isInPlaceFileMetadataUpdate: Option[Boolean] = Some(false)
}

/**
Expand All @@ -676,6 +757,8 @@ object DeltaOperations {

// This operation shouldn't be introducing AddFile actions at all. This check should be trivial.
override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true

override val isInPlaceFileMetadataUpdate: Option[Boolean] = Some(false)
}

/** Recorded when running REORG on the table. */
Expand All @@ -690,6 +773,8 @@ object DeltaOperations {

// This operation shouldn't be introducing AddFile actions with DVs and non tight bounds stats.
override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true

override val isInPlaceFileMetadataUpdate: Option[Boolean] = Some(false)
}

/** Recorded when clustering columns are changed on clustered tables. */
Expand All @@ -702,6 +787,8 @@ object DeltaOperations {

// This operation shouldn't be introducing AddFile actions at all. This check should be trivial.
override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true

override val isInPlaceFileMetadataUpdate: Option[Boolean] = Some(false)
}

/** Recorded when we backfill a Delta table's existing AddFiles with row tracking data. */
Expand All @@ -714,6 +801,9 @@ object DeltaOperations {
// RowTrackingBackfill operation commits AddFiles with files, DVs and stats copied over.
// It can happen that tight bound stats were recomputed before by ComputeStats.
override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = false

// RowTrackingBackfill only updates tags of existing files.
override val isInPlaceFileMetadataUpdate: Option[Boolean] = Some(true)
}

private def structFieldToMap(colPath: Seq[String], field: StructField): Map[String, Any] = {
Expand All @@ -735,7 +825,10 @@ object DeltaOperations {
colPosition: Option[String])

/** Dummy operation only for testing with arbitrary operation names */
case class TestOperation(operationName: String = "TEST") extends Operation(operationName) {
case class TestOperation(
operationName: String = "TEST",
override val isInPlaceFileMetadataUpdate: Option[Boolean] = None
) extends Operation(operationName) {
override val parameters: Map[String, Any] = Map.empty

// Perform the check for testing.
Expand Down Expand Up @@ -763,6 +856,8 @@ object DeltaOperations {

// This operation shouldn't be introducing AddFile actions with DVs and non tight bounds stats.
override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true

override val isInPlaceFileMetadataUpdate: Option[Boolean] = Some(false)
}
}

Expand Down
Loading

0 comments on commit 408b894

Please sign in to comment.