diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaOperations.scala b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaOperations.scala index 30680baacc..841d950b7f 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaOperations.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaOperations.scala @@ -72,6 +72,25 @@ object DeltaOperations { transformer.transformToString(metric, allMetrics) } } + + /** + * A transaction that commits AddFile actions with deletionVector should have column stats that + * are not tight bounds. An exception to this is ComputeStats operation, which recomputes stats + * on these files, and the new stats are tight bounds. Some other operations that merely take an + * existing AddFile action and commit a copy of it, not changing the deletionVector or stats, + * can then also recommit AddFile with deletionVector and tight bound stats that were recomputed + * before. + * + * An operation for which this can happen, and there is no way that it could be committing + * new deletion vectors, should set this to false to bypass this check. + * All other operations should set this to true, so that this is validated during commit. + * + * This is abstract to force the implementers of all operations to think about this setting. + * All operations should add a comment justifying this setting. + * Any operation that sets this to false should add a test in TightBoundsSuite. + */ + def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean + } abstract class OperationWithPredicates(name: String, val predicates: Seq[Expression]) @@ -133,6 +152,10 @@ object DeltaOperations { DeltaOperationMetrics.WRITE_REPLACE_WHERE } override def changesData: Boolean = true + + // This operation shouldn't be introducing AddFile actions with DVs and non tight bounds stats. + // DVs can be introduced by the replaceWhere operation. + override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true } case class RemoveColumnMapping( @@ -140,6 +163,9 @@ object DeltaOperations { override def parameters: Map[String, Any] = Map() override val operationMetrics: Set[String] = DeltaOperationMetrics.REMOVE_COLUMN_MAPPING + + // This operation shouldn't be introducing AddFile actions at all. This check should be trivial. + override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true } /** Recorded during streaming inserts. */ @@ -154,6 +180,9 @@ object DeltaOperations { ) override val operationMetrics: Set[String] = DeltaOperationMetrics.STREAMING_UPDATE override def changesData: Boolean = true + + // This operation shouldn't be introducing AddFile actions with DVs and non tight bounds stats. + override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true } /** Recorded while deleting certain partitions. */ case class Delete(predicate: Seq[Expression]) @@ -175,12 +204,18 @@ object DeltaOperations { strMetrics ++ dvMetrics } override def changesData: Boolean = true + + // This operation shouldn't be introducing AddFile actions with DVs and non tight bounds stats. + override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true } /** Recorded when truncating the table. */ case class Truncate() extends Operation("TRUNCATE") { override val parameters: Map[String, Any] = Map.empty override val operationMetrics: Set[String] = DeltaOperationMetrics.TRUNCATE override def changesData: Boolean = true + + // This operation shouldn't be introducing AddFile actions with DVs and non-tight bounds stats. + override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true } /** Recorded when converting a table into a Delta table. */ @@ -198,6 +233,9 @@ object DeltaOperations { sourceFormat.map("sourceFormat" -> _) override val operationMetrics: Set[String] = DeltaOperationMetrics.CONVERT override def changesData: Boolean = true + + // This operation shouldn't be introducing AddFile actions with DVs and non-tight bounds stats. + override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true } /** Represents the predicates and action type (insert, update, delete) for a Merge clause */ @@ -265,6 +303,9 @@ object DeltaOperations { } override def changesData: Boolean = true + + // This operation shouldn't be introducing AddFile actions with DVs and non-tight bounds stats. + override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true } object Merge { @@ -296,6 +337,9 @@ object DeltaOperations { val dvMetrics = transformDeletionVectorMetrics(metrics) super.transformMetrics(metrics) ++ dvMetrics } + + // This operation shouldn't be introducing AddFile actions with DVs and non-tight bounds stats. + override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true } /** Recorded when the table is created. */ case class CreateTable( @@ -317,6 +361,9 @@ object DeltaOperations { DeltaOperationMetrics.WRITE } override def changesData: Boolean = asSelect + + // This operation shouldn't be introducing AddFile actions with DVs and non-tight bounds stats. + override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true } /** Recorded when the table is replaced. */ case class ReplaceTable( @@ -341,12 +388,18 @@ object DeltaOperations { DeltaOperationMetrics.WRITE } override def changesData: Boolean = true + + // This operation shouldn't be introducing AddFile actions with DVs and non-tight bounds stats. + override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true } /** Recorded when the table properties are set. */ val OP_SET_TBLPROPERTIES = "SET TBLPROPERTIES" case class SetTableProperties( properties: Map[String, String]) extends Operation(OP_SET_TBLPROPERTIES) { override val parameters: Map[String, Any] = Map("properties" -> JsonUtils.toJson(properties)) + + // This operation shouldn't be introducing AddFile actions at all. This check should be trivial. + override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true } /** Recorded when the table properties are unset. */ case class UnsetTableProperties( @@ -355,6 +408,9 @@ object DeltaOperations { override val parameters: Map[String, Any] = Map( "properties" -> JsonUtils.toJson(propKeys), "ifExists" -> ifExists) + + // This operation shouldn't be introducing AddFile actions at all. This check should be trivial. + override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true } /** Recorded when dropping a table feature. */ case class DropTableFeature( @@ -363,6 +419,9 @@ object DeltaOperations { override val parameters: Map[String, Any] = Map( "featureName" -> featureName, "truncateHistory" -> truncateHistory) + + // This operation shouldn't be introducing AddFile actions at all. This check should be trivial. + override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true } /** Recorded when columns are added. */ case class AddColumns( @@ -375,6 +434,9 @@ object DeltaOperations { "column" -> structFieldToMap(columnPath, column) ) ++ colPosition.map("position" -> _.toString) })) + + // This operation shouldn't be introducing AddFile actions at all. This check should be trivial. + override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true } /** Recorded when columns are dropped. */ @@ -384,6 +446,9 @@ object DeltaOperations { override val parameters: Map[String, Any] = Map( "columns" -> JsonUtils.toJson(colsToDrop.map(UnresolvedAttribute(_).name))) + + // This operation shouldn't be introducing AddFile actions at all. This check should be trivial. + override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true } /** Recorded when column is renamed */ @@ -394,6 +459,9 @@ object DeltaOperations { "oldColumnPath" -> UnresolvedAttribute(oldColumnPath).name, "newColumnPath" -> UnresolvedAttribute(newColumnPath).name ) + + // This operation shouldn't be introducing AddFile actions at all. This check should be trivial. + override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true } /** Recorded when columns are changed. */ @@ -406,6 +474,9 @@ object DeltaOperations { override val parameters: Map[String, Any] = Map( "column" -> JsonUtils.toJson(structFieldToMap(columnPath, newColumn)) ) ++ colPosition.map("position" -> _) + + // This operation shouldn't be introducing AddFile actions at all. This check should be trivial. + override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true } /** Recorded when columns are replaced. */ case class ReplaceColumns( @@ -413,6 +484,9 @@ object DeltaOperations { override val parameters: Map[String, Any] = Map( "columns" -> JsonUtils.toJson(columns.map(structFieldToMap(Seq.empty, _)))) + + // This operation shouldn't be introducing AddFile actions at all. This check should be trivial. + override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true } case class UpgradeProtocol(newProtocol: Protocol) extends Operation("UPGRADE PROTOCOL") { @@ -422,15 +496,24 @@ object DeltaOperations { "readerFeatures" -> newProtocol.readerFeatures, "writerFeatures" -> newProtocol.writerFeatures ))) + + // This operation shouldn't be introducing AddFile actions at all. This check should be trivial. + override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true } object ManualUpdate extends Operation("Manual Update") { override val parameters: Map[String, Any] = Map.empty + + // Unsafe manual update disables checks. + override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = false } /** A commit without any actions. Could be used to force creation of new checkpoints. */ object EmptyCommit extends Operation("Empty Commit") { override val parameters: Map[String, Any] = Map.empty + + // This operation shouldn't be introducing AddFile actions at all. This check should be trivial. + override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true } case class UpdateColumnMetadata( @@ -442,6 +525,9 @@ object DeltaOperations { case (path, field) => structFieldToMap(path, field) })) } + + // This operation shouldn't be introducing AddFile actions at all. This check should be trivial. + override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true } case class UpdateSchema(oldSchema: StructType, newSchema: StructType) @@ -449,11 +535,17 @@ object DeltaOperations { override val parameters: Map[String, Any] = Map( "oldSchema" -> JsonUtils.toJson(oldSchema), "newSchema" -> JsonUtils.toJson(newSchema)) + + // This operation shouldn't be introducing AddFile actions at all. This check should be trivial. + override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true } case class AddConstraint( constraintName: String, expr: String) extends Operation("ADD CONSTRAINT") { override val parameters: Map[String, Any] = Map("name" -> constraintName, "expr" -> expr) + + // This operation shouldn't be introducing AddFile actions at all. This check should be trivial. + override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true } case class DropConstraint( @@ -465,11 +557,19 @@ object DeltaOperations { Map("name" -> constraintName, "existed" -> "false") } } + + // This operation shouldn't be introducing AddFile actions at all. This check should be trivial. + override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true } /** Recorded when recomputing stats on the table. */ case class ComputeStats(predicate: Seq[Expression]) - extends OperationWithPredicates("COMPUTE STATS", predicate) + extends OperationWithPredicates("COMPUTE STATS", predicate) { + + // ComputeStats operation commits AddFiles with recomputed stats which are always tight bounds, + // even when DVs are present. This check should be disabled. + override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = false + } /** Recorded when restoring a Delta table to an older version. */ val OP_RESTORE = "RESTORE" @@ -482,6 +582,10 @@ object DeltaOperations { override def changesData: Boolean = true override val operationMetrics: Set[String] = DeltaOperationMetrics.RESTORE + + // Restore operation commits AddFiles with files, DVs and stats from the version it restores to. + // It can happen that tight bound stats were recomputed before by ComputeStats. + override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = false } sealed abstract class OptimizeOrReorg(override val name: String, predicates: Seq[Expression]) @@ -517,6 +621,9 @@ object DeltaOperations { ) override val operationMetrics: Set[String] = DeltaOperationMetrics.OPTIMIZE + + // This operation shouldn't be introducing AddFile actions with DVs and non tight bounds stats. + override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true } /** Recorded when cloning a Delta table into a new location. */ @@ -531,6 +638,10 @@ object DeltaOperations { ) override def changesData: Boolean = true override val operationMetrics: Set[String] = DeltaOperationMetrics.CLONE + + // Clone operation commits AddFiles with files, DVs and stats copied over from the source table. + // It can happen that tight bound stats were recomputed before by ComputeStats. + override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = false } /** @@ -548,6 +659,9 @@ object DeltaOperations { ) ++ specifiedRetentionMillis.map("specifiedRetentionMillis" -> _) override val operationMetrics: Set[String] = DeltaOperationMetrics.VACUUM_START + + // This operation shouldn't be introducing AddFile actions at all. This check should be trivial. + override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true } /** @@ -559,6 +673,9 @@ object DeltaOperations { ) override val operationMetrics: Set[String] = DeltaOperationMetrics.VACUUM_END + + // This operation shouldn't be introducing AddFile actions at all. This check should be trivial. + override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true } /** Recorded when running REORG on the table. */ @@ -570,6 +687,9 @@ object DeltaOperations { ) override val operationMetrics: Set[String] = DeltaOperationMetrics.OPTIMIZE + + // This operation shouldn't be introducing AddFile actions with DVs and non tight bounds stats. + override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true } /** Recorded when clustering columns are changed on clustered tables. */ @@ -579,6 +699,9 @@ object DeltaOperations { override val parameters: Map[String, Any] = Map( "oldClusteringColumns" -> oldClusteringColumns, "newClusteringColumns" -> newClusteringColumns) + + // This operation shouldn't be introducing AddFile actions at all. This check should be trivial. + override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true } /** Recorded when we backfill a Delta table's existing AddFiles with row tracking data. */ @@ -587,6 +710,10 @@ object DeltaOperations { override val parameters: Map[String, Any] = Map( "batchId" -> JsonUtils.toJson(batchId) ) + + // RowTrackingBackfill operation commits AddFiles with files, DVs and stats copied over. + // It can happen that tight bound stats were recomputed before by ComputeStats. + override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = false } private def structFieldToMap(colPath: Seq[String], field: StructField): Map[String, Any] = { @@ -610,6 +737,9 @@ object DeltaOperations { /** Dummy operation only for testing with arbitrary operation names */ case class TestOperation(operationName: String = "TEST") extends Operation(operationName) { override val parameters: Map[String, Any] = Map.empty + + // Perform the check for testing. + override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true } /** @@ -630,6 +760,9 @@ object DeltaOperations { case class UpgradeUniformProperties(properties: Map[String, String]) extends Operation( OP_UPGRADE_UNIFORM_BY_REORG) { override val parameters: Map[String, Any] = Map("properties" -> JsonUtils.toJson(properties)) + + // This operation shouldn't be introducing AddFile actions with DVs and non tight bounds stats. + override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true } } diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala b/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala index 611a31f0b3..109e4726d5 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala @@ -375,18 +375,6 @@ trait OptimisticTransactionImpl extends TransactionalWrite protected var checkUnsupportedDataType: Boolean = spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_SCHEMA_TYPE_CHECK) - // Some operations (e.g. stats collection) may set files with DVs back to tight bounds. - // In that case they need to skip this check. - protected var checkDeletionVectorFilesHaveWideBounds: Boolean = true - /** - * Disable the check that ensures that all files with DVs added have tightBounds set to false. - * - * This is necessary when recomputing the stats on a table with DVs. - */ - def disableDeletionVectorFilesHaveWideBoundsCheck(): Unit = { - checkDeletionVectorFilesHaveWideBounds = false - } - // An array of tuples where each tuple represents a pair (colName, newHighWatermark). // This is collected after a write into Delta table with IDENTITY columns. If it's not // empty, we will update the high water marks during transaction commit. Note that the same @@ -856,7 +844,7 @@ trait OptimisticTransactionImpl extends TransactionalWrite commitCheckEnabled && !isComputeStatsOperation && !deletionVectorCreationAllowed val addFileMustHaveWideBounds = deletionVectorCreationAllowed && - checkDeletionVectorFilesHaveWideBounds + op.checkAddFileWithDeletionVectorStatsAreNotTightBounds action => action match { case a: AddFile => @@ -871,8 +859,8 @@ trait OptimisticTransactionImpl extends TransactionalWrite // 2. All operations that add new DVs should always turn bounds to wide. // Operations that only update files with existing DVs may opt-out from this rule - // via `disableDeletionVectorFilesHaveWideBoundsCheck()`. - // (e.g. stats collection, metadata-only updates.) + // via `checkAddFileWithDeletionVectorStatsAreNotTightBounds`. + // See that field comment in DeltaOperation for more details. // Note, the absence of the tightBounds column when DVs exist is also an illegal state. if (addFileMustHaveWideBounds && a.deletionVector != null && diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/TightBoundsSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/TightBoundsSuite.scala index f9e515bda8..1b771351c3 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/TightBoundsSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/TightBoundsSuite.scala @@ -25,6 +25,8 @@ import org.apache.spark.sql.delta.stats.DeltaStatistics.{MIN, NULL_COUNT, NUM_RE import org.apache.spark.sql.delta.stats.StatisticsCollection import org.apache.spark.sql.delta.test.DeltaSQLCommandTest import org.apache.spark.sql.delta.test.DeltaTestImplicits._ +import org.apache.spark.sql.delta.util.JsonUtils +import com.fasterxml.jackson.databind.node.ObjectNode import org.apache.spark.sql.{DataFrame, QueryTest, Row} import org.apache.spark.sql.functions.{col, lit, map_values, when} @@ -113,13 +115,21 @@ class TightBoundsSuite } val exception = intercept[DeltaIllegalStateException] { - txn.commitManually(addFiles: _*) + txn.commitActions(DeltaOperations.TestOperation(), addFiles: _*) } assert(exception.getErrorClass === "DELTA_ADDING_DELETION_VECTORS_WITH_TIGHT_BOUNDS_DISALLOWED") } } + protected def getStats(snapshot: Snapshot, statName: String): Array[Row] = { + val statsColumnName = snapshot.getBaseStatsColumnName + snapshot + .withStatsDeduplicated + .select(s"$statsColumnName.$statName") + .collect() + } + protected def getStatFromLastFile(snapshot: Snapshot, statName: String): Row = { val statsColumnName = snapshot.getBaseStatsColumnName snapshot @@ -284,6 +294,79 @@ class TightBoundsSuite assert(statsAfterDelete === expectedStatsAfterDelete) } } + + def tableAddDVAndTightStats( + targetTable: () => io.delta.tables.DeltaTable, + targetLog: DeltaLog, + deleteCond: String): Unit = { + // Add DVs. Stats should have tightBounds = false afterwards. + targetTable().delete(deleteCond) + val initialStats = getStats(targetLog.update(), "*") + assert(initialStats.forall(_.get(4) === false)) // tightBounds + + // Other systems may support Compute Stats that recomputes tightBounds stats on tables with DVs. + // Simulate this with a manual update commit that introduces tight stats. + val txn = targetLog.startTransaction() + val addFiles = txn.snapshot.allFiles.collect().toSeq.map { action => + val node = JsonUtils.mapper.readTree(action.stats).asInstanceOf[ObjectNode] + assert(node.has("numRecords")) + val numRecords = node.get("numRecords").asInt() + action.copy(stats = s"""{ "numRecords" : $numRecords, "tightBounds" : true }""") + } + txn.commitActions(DeltaOperations.ManualUpdate, addFiles: _*) + } + + test("CLONE on table with DVs and tightBound stats") { + val targetDF = spark.range(0, 100, 1, 1).toDF() + withTempDeltaTable(targetDF) { (targetTable, targetLog) => + val targetPath = targetLog.dataPath.toString + tableAddDVAndTightStats(targetTable, targetLog, "id >= 80") + // CLONE shouldn't throw + // DELTA_ADDING_DELETION_VECTORS_WITH_TIGHT_BOUNDS_DISALLOWED + withTempPath("cloned") { clonedPath => + sql(s"CREATE TABLE delta.`$clonedPath` SHALLOW CLONE delta.`$targetPath`") + } + } + } + + test("RESTORE TABLE on table with DVs and tightBound stats") { + val targetDF = spark.range(0, 100, 1, 1).toDF() + withTempDeltaTable(targetDF) { (targetTable, targetLog) => + val targetPath = targetLog.dataPath.toString + // adds version 1 (delete) and 2 (compute stats) + tableAddDVAndTightStats(targetTable, targetLog, "id >= 80") + // adds version 3 (delete more) + targetTable().delete("id < 20") + // Restore back to version 2 (after compute stats) + // After 2nd delete, new DVs are added to the file, so the restore will + // have to recommit the file with old DVs. + targetTable().restoreToVersion(2) + // Verify that the restored table has DVs and tight bounds. + val stats = getStatFromLastFileWithDVs(targetLog.update(), "*") + assert(stats.get(4) === true) // tightBounds + } + } + + test("Row Tracking backfill on table with DVs and tightBound stats") { + // Enabling Row Tracking and backfill shouldn't throw + // DELTA_ADDING_DELETION_VECTORS_WITH_TIGHT_BOUNDS_DISALLOWED + withSQLConf(DeltaConfigs.ROW_TRACKING_ENABLED.defaultTablePropertyKey -> "false") { + val targetDF = spark.range(0, 100, 1, 1).toDF() + withTempDeltaTable(targetDF) { (targetTable, targetLog) => + val targetPath = targetLog.dataPath.toString + tableAddDVAndTightStats(targetTable, targetLog, "id >= 80") + // Make sure that we start with no RowTracking feature. + assert(!RowTracking.isSupported(targetLog.unsafeVolatileSnapshot.protocol)) + assert(!RowId.isEnabled(targetLog.unsafeVolatileSnapshot.protocol, + targetLog.unsafeVolatileSnapshot.metadata)) + + sql(s"ALTER TABLE delta.`$targetPath` SET TBLPROPERTIES " + + "('delta.enableRowTracking' = 'true')") + assert(targetLog.history.getHistory(None) + .count(_.operation == DeltaOperations.ROW_TRACKING_BACKFILL_OPERATION_NAME) == 1) + } + } + } } class TightBoundsColumnMappingSuite extends TightBoundsSuite with DeltaColumnMappingEnableIdMode