From 5d6309477ab51be5f88d421e868a4489723f400d Mon Sep 17 00:00:00 2001 From: Juliusz Sompolski Date: Tue, 3 Sep 2024 06:51:25 +0200 Subject: [PATCH] [Spark] Fix stats with tightBounds check for AddFiles with deletionVectors (#3633) ## Description Check `DeletionVectorFilesHaveWideBoundsCheck` has been disabled for COMPUTE STATS because it reintroduces stats with tight bound to files with Deletion Vectors. However, there are other operations that can then copy over these AddFile actions with DVs and tight stats. These operations resulted in DELTA_ADDING_DELETION_VECTORS_WITH_TIGHT_BOUNDS_DISALLOWED error, which was a false positive. In this PR we also attempt to introduce and discuss a "framework" for checks like that as a property of DeltaOperations, with DeltaOperations declaring as a member method whether a certain property and check should be performed. This is opposed to current practice, where many places in the code feature special cases like matching against a certain DeltaOperation and doing something special; this kind of code is very decentralized, and it's easy to miss if any new place or new operation needs such central handling. If this was centralized in DeltaOperations, this could lead to better discoverability of special cases and edge cases when implementing new operations. ## How was this patch tested? Tests added. Co-authored-by: Julek Sompolski --- .../spark/sql/delta/DeltaOperations.scala | 135 +++++++++++++++++- .../sql/delta/OptimisticTransaction.scala | 18 +-- .../spark/sql/delta/TightBoundsSuite.scala | 85 ++++++++++- 3 files changed, 221 insertions(+), 17 deletions(-) diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaOperations.scala b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaOperations.scala index 30680baacc6..841d950b7f3 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaOperations.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaOperations.scala @@ -72,6 +72,25 @@ object DeltaOperations { transformer.transformToString(metric, allMetrics) } } + + /** + * A transaction that commits AddFile actions with deletionVector should have column stats that + * are not tight bounds. An exception to this is ComputeStats operation, which recomputes stats + * on these files, and the new stats are tight bounds. Some other operations that merely take an + * existing AddFile action and commit a copy of it, not changing the deletionVector or stats, + * can then also recommit AddFile with deletionVector and tight bound stats that were recomputed + * before. + * + * An operation for which this can happen, and there is no way that it could be committing + * new deletion vectors, should set this to false to bypass this check. + * All other operations should set this to true, so that this is validated during commit. + * + * This is abstract to force the implementers of all operations to think about this setting. + * All operations should add a comment justifying this setting. + * Any operation that sets this to false should add a test in TightBoundsSuite. + */ + def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean + } abstract class OperationWithPredicates(name: String, val predicates: Seq[Expression]) @@ -133,6 +152,10 @@ object DeltaOperations { DeltaOperationMetrics.WRITE_REPLACE_WHERE } override def changesData: Boolean = true + + // This operation shouldn't be introducing AddFile actions with DVs and non tight bounds stats. + // DVs can be introduced by the replaceWhere operation. + override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true } case class RemoveColumnMapping( @@ -140,6 +163,9 @@ object DeltaOperations { override def parameters: Map[String, Any] = Map() override val operationMetrics: Set[String] = DeltaOperationMetrics.REMOVE_COLUMN_MAPPING + + // This operation shouldn't be introducing AddFile actions at all. This check should be trivial. + override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true } /** Recorded during streaming inserts. */ @@ -154,6 +180,9 @@ object DeltaOperations { ) override val operationMetrics: Set[String] = DeltaOperationMetrics.STREAMING_UPDATE override def changesData: Boolean = true + + // This operation shouldn't be introducing AddFile actions with DVs and non tight bounds stats. + override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true } /** Recorded while deleting certain partitions. */ case class Delete(predicate: Seq[Expression]) @@ -175,12 +204,18 @@ object DeltaOperations { strMetrics ++ dvMetrics } override def changesData: Boolean = true + + // This operation shouldn't be introducing AddFile actions with DVs and non tight bounds stats. + override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true } /** Recorded when truncating the table. */ case class Truncate() extends Operation("TRUNCATE") { override val parameters: Map[String, Any] = Map.empty override val operationMetrics: Set[String] = DeltaOperationMetrics.TRUNCATE override def changesData: Boolean = true + + // This operation shouldn't be introducing AddFile actions with DVs and non-tight bounds stats. + override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true } /** Recorded when converting a table into a Delta table. */ @@ -198,6 +233,9 @@ object DeltaOperations { sourceFormat.map("sourceFormat" -> _) override val operationMetrics: Set[String] = DeltaOperationMetrics.CONVERT override def changesData: Boolean = true + + // This operation shouldn't be introducing AddFile actions with DVs and non-tight bounds stats. + override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true } /** Represents the predicates and action type (insert, update, delete) for a Merge clause */ @@ -265,6 +303,9 @@ object DeltaOperations { } override def changesData: Boolean = true + + // This operation shouldn't be introducing AddFile actions with DVs and non-tight bounds stats. + override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true } object Merge { @@ -296,6 +337,9 @@ object DeltaOperations { val dvMetrics = transformDeletionVectorMetrics(metrics) super.transformMetrics(metrics) ++ dvMetrics } + + // This operation shouldn't be introducing AddFile actions with DVs and non-tight bounds stats. + override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true } /** Recorded when the table is created. */ case class CreateTable( @@ -317,6 +361,9 @@ object DeltaOperations { DeltaOperationMetrics.WRITE } override def changesData: Boolean = asSelect + + // This operation shouldn't be introducing AddFile actions with DVs and non-tight bounds stats. + override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true } /** Recorded when the table is replaced. */ case class ReplaceTable( @@ -341,12 +388,18 @@ object DeltaOperations { DeltaOperationMetrics.WRITE } override def changesData: Boolean = true + + // This operation shouldn't be introducing AddFile actions with DVs and non-tight bounds stats. + override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true } /** Recorded when the table properties are set. */ val OP_SET_TBLPROPERTIES = "SET TBLPROPERTIES" case class SetTableProperties( properties: Map[String, String]) extends Operation(OP_SET_TBLPROPERTIES) { override val parameters: Map[String, Any] = Map("properties" -> JsonUtils.toJson(properties)) + + // This operation shouldn't be introducing AddFile actions at all. This check should be trivial. + override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true } /** Recorded when the table properties are unset. */ case class UnsetTableProperties( @@ -355,6 +408,9 @@ object DeltaOperations { override val parameters: Map[String, Any] = Map( "properties" -> JsonUtils.toJson(propKeys), "ifExists" -> ifExists) + + // This operation shouldn't be introducing AddFile actions at all. This check should be trivial. + override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true } /** Recorded when dropping a table feature. */ case class DropTableFeature( @@ -363,6 +419,9 @@ object DeltaOperations { override val parameters: Map[String, Any] = Map( "featureName" -> featureName, "truncateHistory" -> truncateHistory) + + // This operation shouldn't be introducing AddFile actions at all. This check should be trivial. + override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true } /** Recorded when columns are added. */ case class AddColumns( @@ -375,6 +434,9 @@ object DeltaOperations { "column" -> structFieldToMap(columnPath, column) ) ++ colPosition.map("position" -> _.toString) })) + + // This operation shouldn't be introducing AddFile actions at all. This check should be trivial. + override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true } /** Recorded when columns are dropped. */ @@ -384,6 +446,9 @@ object DeltaOperations { override val parameters: Map[String, Any] = Map( "columns" -> JsonUtils.toJson(colsToDrop.map(UnresolvedAttribute(_).name))) + + // This operation shouldn't be introducing AddFile actions at all. This check should be trivial. + override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true } /** Recorded when column is renamed */ @@ -394,6 +459,9 @@ object DeltaOperations { "oldColumnPath" -> UnresolvedAttribute(oldColumnPath).name, "newColumnPath" -> UnresolvedAttribute(newColumnPath).name ) + + // This operation shouldn't be introducing AddFile actions at all. This check should be trivial. + override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true } /** Recorded when columns are changed. */ @@ -406,6 +474,9 @@ object DeltaOperations { override val parameters: Map[String, Any] = Map( "column" -> JsonUtils.toJson(structFieldToMap(columnPath, newColumn)) ) ++ colPosition.map("position" -> _) + + // This operation shouldn't be introducing AddFile actions at all. This check should be trivial. + override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true } /** Recorded when columns are replaced. */ case class ReplaceColumns( @@ -413,6 +484,9 @@ object DeltaOperations { override val parameters: Map[String, Any] = Map( "columns" -> JsonUtils.toJson(columns.map(structFieldToMap(Seq.empty, _)))) + + // This operation shouldn't be introducing AddFile actions at all. This check should be trivial. + override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true } case class UpgradeProtocol(newProtocol: Protocol) extends Operation("UPGRADE PROTOCOL") { @@ -422,15 +496,24 @@ object DeltaOperations { "readerFeatures" -> newProtocol.readerFeatures, "writerFeatures" -> newProtocol.writerFeatures ))) + + // This operation shouldn't be introducing AddFile actions at all. This check should be trivial. + override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true } object ManualUpdate extends Operation("Manual Update") { override val parameters: Map[String, Any] = Map.empty + + // Unsafe manual update disables checks. + override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = false } /** A commit without any actions. Could be used to force creation of new checkpoints. */ object EmptyCommit extends Operation("Empty Commit") { override val parameters: Map[String, Any] = Map.empty + + // This operation shouldn't be introducing AddFile actions at all. This check should be trivial. + override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true } case class UpdateColumnMetadata( @@ -442,6 +525,9 @@ object DeltaOperations { case (path, field) => structFieldToMap(path, field) })) } + + // This operation shouldn't be introducing AddFile actions at all. This check should be trivial. + override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true } case class UpdateSchema(oldSchema: StructType, newSchema: StructType) @@ -449,11 +535,17 @@ object DeltaOperations { override val parameters: Map[String, Any] = Map( "oldSchema" -> JsonUtils.toJson(oldSchema), "newSchema" -> JsonUtils.toJson(newSchema)) + + // This operation shouldn't be introducing AddFile actions at all. This check should be trivial. + override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true } case class AddConstraint( constraintName: String, expr: String) extends Operation("ADD CONSTRAINT") { override val parameters: Map[String, Any] = Map("name" -> constraintName, "expr" -> expr) + + // This operation shouldn't be introducing AddFile actions at all. This check should be trivial. + override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true } case class DropConstraint( @@ -465,11 +557,19 @@ object DeltaOperations { Map("name" -> constraintName, "existed" -> "false") } } + + // This operation shouldn't be introducing AddFile actions at all. This check should be trivial. + override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true } /** Recorded when recomputing stats on the table. */ case class ComputeStats(predicate: Seq[Expression]) - extends OperationWithPredicates("COMPUTE STATS", predicate) + extends OperationWithPredicates("COMPUTE STATS", predicate) { + + // ComputeStats operation commits AddFiles with recomputed stats which are always tight bounds, + // even when DVs are present. This check should be disabled. + override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = false + } /** Recorded when restoring a Delta table to an older version. */ val OP_RESTORE = "RESTORE" @@ -482,6 +582,10 @@ object DeltaOperations { override def changesData: Boolean = true override val operationMetrics: Set[String] = DeltaOperationMetrics.RESTORE + + // Restore operation commits AddFiles with files, DVs and stats from the version it restores to. + // It can happen that tight bound stats were recomputed before by ComputeStats. + override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = false } sealed abstract class OptimizeOrReorg(override val name: String, predicates: Seq[Expression]) @@ -517,6 +621,9 @@ object DeltaOperations { ) override val operationMetrics: Set[String] = DeltaOperationMetrics.OPTIMIZE + + // This operation shouldn't be introducing AddFile actions with DVs and non tight bounds stats. + override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true } /** Recorded when cloning a Delta table into a new location. */ @@ -531,6 +638,10 @@ object DeltaOperations { ) override def changesData: Boolean = true override val operationMetrics: Set[String] = DeltaOperationMetrics.CLONE + + // Clone operation commits AddFiles with files, DVs and stats copied over from the source table. + // It can happen that tight bound stats were recomputed before by ComputeStats. + override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = false } /** @@ -548,6 +659,9 @@ object DeltaOperations { ) ++ specifiedRetentionMillis.map("specifiedRetentionMillis" -> _) override val operationMetrics: Set[String] = DeltaOperationMetrics.VACUUM_START + + // This operation shouldn't be introducing AddFile actions at all. This check should be trivial. + override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true } /** @@ -559,6 +673,9 @@ object DeltaOperations { ) override val operationMetrics: Set[String] = DeltaOperationMetrics.VACUUM_END + + // This operation shouldn't be introducing AddFile actions at all. This check should be trivial. + override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true } /** Recorded when running REORG on the table. */ @@ -570,6 +687,9 @@ object DeltaOperations { ) override val operationMetrics: Set[String] = DeltaOperationMetrics.OPTIMIZE + + // This operation shouldn't be introducing AddFile actions with DVs and non tight bounds stats. + override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true } /** Recorded when clustering columns are changed on clustered tables. */ @@ -579,6 +699,9 @@ object DeltaOperations { override val parameters: Map[String, Any] = Map( "oldClusteringColumns" -> oldClusteringColumns, "newClusteringColumns" -> newClusteringColumns) + + // This operation shouldn't be introducing AddFile actions at all. This check should be trivial. + override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true } /** Recorded when we backfill a Delta table's existing AddFiles with row tracking data. */ @@ -587,6 +710,10 @@ object DeltaOperations { override val parameters: Map[String, Any] = Map( "batchId" -> JsonUtils.toJson(batchId) ) + + // RowTrackingBackfill operation commits AddFiles with files, DVs and stats copied over. + // It can happen that tight bound stats were recomputed before by ComputeStats. + override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = false } private def structFieldToMap(colPath: Seq[String], field: StructField): Map[String, Any] = { @@ -610,6 +737,9 @@ object DeltaOperations { /** Dummy operation only for testing with arbitrary operation names */ case class TestOperation(operationName: String = "TEST") extends Operation(operationName) { override val parameters: Map[String, Any] = Map.empty + + // Perform the check for testing. + override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true } /** @@ -630,6 +760,9 @@ object DeltaOperations { case class UpgradeUniformProperties(properties: Map[String, String]) extends Operation( OP_UPGRADE_UNIFORM_BY_REORG) { override val parameters: Map[String, Any] = Map("properties" -> JsonUtils.toJson(properties)) + + // This operation shouldn't be introducing AddFile actions with DVs and non tight bounds stats. + override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true } } diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala b/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala index 611a31f0b3f..109e4726d59 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala @@ -375,18 +375,6 @@ trait OptimisticTransactionImpl extends TransactionalWrite protected var checkUnsupportedDataType: Boolean = spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_SCHEMA_TYPE_CHECK) - // Some operations (e.g. stats collection) may set files with DVs back to tight bounds. - // In that case they need to skip this check. - protected var checkDeletionVectorFilesHaveWideBounds: Boolean = true - /** - * Disable the check that ensures that all files with DVs added have tightBounds set to false. - * - * This is necessary when recomputing the stats on a table with DVs. - */ - def disableDeletionVectorFilesHaveWideBoundsCheck(): Unit = { - checkDeletionVectorFilesHaveWideBounds = false - } - // An array of tuples where each tuple represents a pair (colName, newHighWatermark). // This is collected after a write into Delta table with IDENTITY columns. If it's not // empty, we will update the high water marks during transaction commit. Note that the same @@ -856,7 +844,7 @@ trait OptimisticTransactionImpl extends TransactionalWrite commitCheckEnabled && !isComputeStatsOperation && !deletionVectorCreationAllowed val addFileMustHaveWideBounds = deletionVectorCreationAllowed && - checkDeletionVectorFilesHaveWideBounds + op.checkAddFileWithDeletionVectorStatsAreNotTightBounds action => action match { case a: AddFile => @@ -871,8 +859,8 @@ trait OptimisticTransactionImpl extends TransactionalWrite // 2. All operations that add new DVs should always turn bounds to wide. // Operations that only update files with existing DVs may opt-out from this rule - // via `disableDeletionVectorFilesHaveWideBoundsCheck()`. - // (e.g. stats collection, metadata-only updates.) + // via `checkAddFileWithDeletionVectorStatsAreNotTightBounds`. + // See that field comment in DeltaOperation for more details. // Note, the absence of the tightBounds column when DVs exist is also an illegal state. if (addFileMustHaveWideBounds && a.deletionVector != null && diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/TightBoundsSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/TightBoundsSuite.scala index f9e515bda8b..1b771351c37 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/TightBoundsSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/TightBoundsSuite.scala @@ -25,6 +25,8 @@ import org.apache.spark.sql.delta.stats.DeltaStatistics.{MIN, NULL_COUNT, NUM_RE import org.apache.spark.sql.delta.stats.StatisticsCollection import org.apache.spark.sql.delta.test.DeltaSQLCommandTest import org.apache.spark.sql.delta.test.DeltaTestImplicits._ +import org.apache.spark.sql.delta.util.JsonUtils +import com.fasterxml.jackson.databind.node.ObjectNode import org.apache.spark.sql.{DataFrame, QueryTest, Row} import org.apache.spark.sql.functions.{col, lit, map_values, when} @@ -113,13 +115,21 @@ class TightBoundsSuite } val exception = intercept[DeltaIllegalStateException] { - txn.commitManually(addFiles: _*) + txn.commitActions(DeltaOperations.TestOperation(), addFiles: _*) } assert(exception.getErrorClass === "DELTA_ADDING_DELETION_VECTORS_WITH_TIGHT_BOUNDS_DISALLOWED") } } + protected def getStats(snapshot: Snapshot, statName: String): Array[Row] = { + val statsColumnName = snapshot.getBaseStatsColumnName + snapshot + .withStatsDeduplicated + .select(s"$statsColumnName.$statName") + .collect() + } + protected def getStatFromLastFile(snapshot: Snapshot, statName: String): Row = { val statsColumnName = snapshot.getBaseStatsColumnName snapshot @@ -284,6 +294,79 @@ class TightBoundsSuite assert(statsAfterDelete === expectedStatsAfterDelete) } } + + def tableAddDVAndTightStats( + targetTable: () => io.delta.tables.DeltaTable, + targetLog: DeltaLog, + deleteCond: String): Unit = { + // Add DVs. Stats should have tightBounds = false afterwards. + targetTable().delete(deleteCond) + val initialStats = getStats(targetLog.update(), "*") + assert(initialStats.forall(_.get(4) === false)) // tightBounds + + // Other systems may support Compute Stats that recomputes tightBounds stats on tables with DVs. + // Simulate this with a manual update commit that introduces tight stats. + val txn = targetLog.startTransaction() + val addFiles = txn.snapshot.allFiles.collect().toSeq.map { action => + val node = JsonUtils.mapper.readTree(action.stats).asInstanceOf[ObjectNode] + assert(node.has("numRecords")) + val numRecords = node.get("numRecords").asInt() + action.copy(stats = s"""{ "numRecords" : $numRecords, "tightBounds" : true }""") + } + txn.commitActions(DeltaOperations.ManualUpdate, addFiles: _*) + } + + test("CLONE on table with DVs and tightBound stats") { + val targetDF = spark.range(0, 100, 1, 1).toDF() + withTempDeltaTable(targetDF) { (targetTable, targetLog) => + val targetPath = targetLog.dataPath.toString + tableAddDVAndTightStats(targetTable, targetLog, "id >= 80") + // CLONE shouldn't throw + // DELTA_ADDING_DELETION_VECTORS_WITH_TIGHT_BOUNDS_DISALLOWED + withTempPath("cloned") { clonedPath => + sql(s"CREATE TABLE delta.`$clonedPath` SHALLOW CLONE delta.`$targetPath`") + } + } + } + + test("RESTORE TABLE on table with DVs and tightBound stats") { + val targetDF = spark.range(0, 100, 1, 1).toDF() + withTempDeltaTable(targetDF) { (targetTable, targetLog) => + val targetPath = targetLog.dataPath.toString + // adds version 1 (delete) and 2 (compute stats) + tableAddDVAndTightStats(targetTable, targetLog, "id >= 80") + // adds version 3 (delete more) + targetTable().delete("id < 20") + // Restore back to version 2 (after compute stats) + // After 2nd delete, new DVs are added to the file, so the restore will + // have to recommit the file with old DVs. + targetTable().restoreToVersion(2) + // Verify that the restored table has DVs and tight bounds. + val stats = getStatFromLastFileWithDVs(targetLog.update(), "*") + assert(stats.get(4) === true) // tightBounds + } + } + + test("Row Tracking backfill on table with DVs and tightBound stats") { + // Enabling Row Tracking and backfill shouldn't throw + // DELTA_ADDING_DELETION_VECTORS_WITH_TIGHT_BOUNDS_DISALLOWED + withSQLConf(DeltaConfigs.ROW_TRACKING_ENABLED.defaultTablePropertyKey -> "false") { + val targetDF = spark.range(0, 100, 1, 1).toDF() + withTempDeltaTable(targetDF) { (targetTable, targetLog) => + val targetPath = targetLog.dataPath.toString + tableAddDVAndTightStats(targetTable, targetLog, "id >= 80") + // Make sure that we start with no RowTracking feature. + assert(!RowTracking.isSupported(targetLog.unsafeVolatileSnapshot.protocol)) + assert(!RowId.isEnabled(targetLog.unsafeVolatileSnapshot.protocol, + targetLog.unsafeVolatileSnapshot.metadata)) + + sql(s"ALTER TABLE delta.`$targetPath` SET TBLPROPERTIES " + + "('delta.enableRowTracking' = 'true')") + assert(targetLog.history.getHistory(None) + .count(_.operation == DeltaOperations.ROW_TRACKING_BACKFILL_OPERATION_NAME) == 1) + } + } + } } class TightBoundsColumnMappingSuite extends TightBoundsSuite with DeltaColumnMappingEnableIdMode