From d07a7bd50c0ea0af84adc51bc3652030d99d3b00 Mon Sep 17 00:00:00 2001 From: Christos Stavrakakis Date: Thu, 19 Sep 2024 18:29:44 +0200 Subject: [PATCH] Strip column mapping metadata when feature is disabled (#3688) ## Description Transactions might try to create or update the schema of a Delta table with columns that contain column mapping metadata, even when column mapping is not enabled. For example, this can happen when transactions copy the schema from another table without stripping metadata. To avoid such issues, we automatically strip column mapping metadata when column mapping is disabled. We are doing this only for new tables or for transactions that add column mapping metadata for the first time. If column metadata already exist, we cannot strip them because this would break the table. A usage log is emitted so we can understand the impact on existing tables. Note that this change covers the cases where txn.updateMetadata is called (the "proper API") and not the cases where a Metadata action is directly committed to the table. Finally, this commit changes drop column mapping command to check that all column mapping metadata do not exist, and not only physical column name and ID. ## How was this patch tested? Added new UT. 
--- .../spark/sql/delta/DeltaColumnMapping.scala | 67 +++++++++++++++---- .../sql/delta/OptimisticTransaction.scala | 1 + .../apache/spark/sql/delta/TableFeature.scala | 5 +- .../sql/delta/sources/DeltaSQLConf.scala | 15 +++++ .../sql/delta/DeltaColumnMappingSuite.scala | 45 +++++++++++++ 5 files changed, 117 insertions(+), 16 deletions(-) diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaColumnMapping.scala b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaColumnMapping.scala index 2ce3cc1b4c..11ea6e513d 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaColumnMapping.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaColumnMapping.scala @@ -48,6 +48,17 @@ trait DeltaColumnMappingBase extends DeltaLogging { val PARQUET_MAP_KEY_FIELD_NAME = "key" val PARQUET_MAP_VALUE_FIELD_NAME = "value" + /** + * The list of column mapping metadata for each column in the schema. + */ + val COLUMN_MAPPING_METADATA_KEYS: Set[String] = Set( + COLUMN_MAPPING_METADATA_ID_KEY, + COLUMN_MAPPING_PHYSICAL_NAME_KEY, + COLUMN_MAPPING_METADATA_NESTED_IDS_KEY, + PARQUET_FIELD_ID_METADATA_KEY, + PARQUET_FIELD_NESTED_IDS_METADATA_KEY + ) + /** * This list of internal columns (and only this list) is allowed to have missing * column mapping metadata such as field id and physical name because @@ -116,6 +127,7 @@ trait DeltaColumnMappingBase extends DeltaLogging { * - upgrading to the column mapping Protocol through configurations */ def verifyAndUpdateMetadataChange( + spark: SparkSession, deltaLog: DeltaLog, oldProtocol: Protocol, oldMetadata: Metadata, @@ -136,8 +148,34 @@ trait DeltaColumnMappingBase extends DeltaLogging { oldMappingMode.name, newMappingMode.name) } - val updatedMetadata = updateColumnMappingMetadata( - oldMetadata, newMetadata, isChangingModeOnExistingTable, isOverwriteSchema) + var updatedMetadata = newMetadata + + // If column mapping is disabled, we need to strip any column mapping metadata from the schema, + // because Delta code
will use them even when column mapping is not enabled. However, we cannot + // strip column mapping metadata that already exist in the schema, because this would break + // the table. + if (newMappingMode == NoMapping && + schemaHasColumnMappingMetadata(newMetadata.schema)) { + val addsColumnMappingMetadata = !schemaHasColumnMappingMetadata(oldMetadata.schema) + if (addsColumnMappingMetadata && + spark.conf.get(DeltaSQLConf.DELTA_COLUMN_MAPPING_STRIP_METADATA)) { + recordDeltaEvent(deltaLog, opType = "delta.columnMapping.stripMetadata") + val strippedSchema = dropColumnMappingMetadata(newMetadata.schema) + updatedMetadata = newMetadata.copy(schemaString = strippedSchema.json) + } else { + recordDeltaEvent( + deltaLog, + opType = "delta.columnMapping.updateSchema.metadataPresentButFeatureDisabled", + data = Map( + "addsColumnMappingMetadata" -> addsColumnMappingMetadata.toString, + "isCreatingNewTable" -> isCreatingNewTable.toString, + "isOverwriteSchema" -> isOverwriteSchema.toString) + ) + } + } + + updatedMetadata = updateColumnMappingMetadata( + oldMetadata, updatedMetadata, isChangingModeOnExistingTable, isOverwriteSchema) // record column mapping table creation/upgrade if (newMappingMode != NoMapping) { @@ -455,16 +493,12 @@ trait DeltaColumnMappingBase extends DeltaLogging { def dropColumnMappingMetadata(schema: StructType): StructType = { SchemaMergingUtils.transformColumns(schema) { (_, field, _) => - field.copy( - metadata = new MetadataBuilder() - .withMetadata(field.metadata) - .remove(COLUMN_MAPPING_METADATA_ID_KEY) - .remove(COLUMN_MAPPING_METADATA_NESTED_IDS_KEY) - .remove(COLUMN_MAPPING_PHYSICAL_NAME_KEY) - .remove(PARQUET_FIELD_ID_METADATA_KEY) - .remove(PARQUET_FIELD_NESTED_IDS_METADATA_KEY) - .build() - ) + var strippedMetadataBuilder = new MetadataBuilder().withMetadata(field.metadata) + for (key <- COLUMN_MAPPING_METADATA_KEYS) { + strippedMetadataBuilder = strippedMetadataBuilder.remove(key) + } + val strippedMetadata = 
strippedMetadataBuilder.build() + field.copy(metadata = strippedMetadata) } } @@ -784,6 +818,15 @@ trait DeltaColumnMappingBase extends DeltaLogging { (transform(schema, new MetadataBuilder(), Seq.empty), currFieldId) } + + /** + * Returns whether the schema contains any metadata reserved for column mapping. + */ + def schemaHasColumnMappingMetadata(schema: StructType): Boolean = { + SchemaMergingUtils.explode(schema).exists { case (_, col) => + COLUMN_MAPPING_METADATA_KEYS.exists(k => col.metadata.contains(k)) + } + } } object DeltaColumnMapping extends DeltaColumnMappingBase diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala b/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala index 54ca25bb27..f5e15d94f2 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala @@ -555,6 +555,7 @@ trait OptimisticTransactionImpl extends TransactionalWrite // The `.schema` cannot be generated correctly unless the column mapping metadata is correctly // filled for all the fields. Therefore, the column mapping changes need to happen first. 
newMetadataTmp = DeltaColumnMapping.verifyAndUpdateMetadataChange( + spark, deltaLog, protocolBeforeUpdate, snapshot.metadata, diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/TableFeature.scala b/spark/src/main/scala/org/apache/spark/sql/delta/TableFeature.scala index 1edf8d8c02..1c15086b76 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/TableFeature.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/TableFeature.scala @@ -540,10 +540,7 @@ object ColumnMappingTableFeature override def validateRemoval(snapshot: Snapshot): Boolean = { val schemaHasNoColumnMappingMetadata = - SchemaMergingUtils.explode(snapshot.schema).forall { case (_, col) => - !DeltaColumnMapping.hasPhysicalName(col) && - !DeltaColumnMapping.hasColumnId(col) - } + !DeltaColumnMapping.schemaHasColumnMappingMetadata(snapshot.schema) val metadataHasNoMappingMode = snapshot.metadata.columnMappingMode match { case NoMapping => true case _ => false diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala index aa46bd9bd0..857656b16a 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala @@ -1606,6 +1606,21 @@ trait DeltaSQLConfBase { .booleanConf .createWithDefault(true) + val DELTA_COLUMN_MAPPING_STRIP_METADATA = + buildConf("columnMapping.stripMetadata") + .doc( + """ + |Transactions might try to update the schema of a table with columns that contain + |column mapping metadata, even when column mapping is not enabled. For example, this + |can happen when transactions copy the schema from another table. When this setting is + |enabled, we will strip the column mapping metadata from the schema before applying it. + |Note that this config applies only when the existing schema of the table does not + |contain any column mapping metadata. 
+ |""".stripMargin) + .internal() + .booleanConf + .createWithDefault(true) + val DYNAMIC_PARTITION_OVERWRITE_ENABLED = buildConf("dynamicPartitionOverwrite.enabled") .doc("Whether to overwrite partitions dynamically when 'partitionOverwriteMode' is set to " + diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaColumnMappingSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaColumnMappingSuite.scala index e6663feb17..54ba60ba65 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaColumnMappingSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaColumnMappingSuite.scala @@ -24,6 +24,7 @@ import scala.collection.JavaConverters._ import scala.collection.mutable import org.apache.spark.sql.delta.DeltaOperations.ManualUpdate +import org.apache.spark.sql.delta.DeltaTestUtils.BOOLEAN_DOMAIN import org.apache.spark.sql.delta.actions.{Action, AddCDCFile, AddFile, Metadata => MetadataAction, Protocol, SetTransaction} import org.apache.spark.sql.delta.catalog.DeltaTableV2 import org.apache.spark.sql.delta.schema.SchemaMergingUtils @@ -2046,4 +2047,48 @@ class DeltaColumnMappingSuite extends QueryTest } } } + + for (txnIntroducesMetadata <- BOOLEAN_DOMAIN) { + test("column mapping metadata are stripped when feature is disabled - " + + s"txnIntroducesMetadata=$txnIntroducesMetadata") { + withTempDir { dir => + val tablePath = dir.getCanonicalPath + val deltaLog = DeltaLog.forTable(spark, tablePath) + // Create the original table. 
+ val schemaV0 = if (txnIntroducesMetadata) { + new StructType().add("id", LongType, true) + } else { + new StructType().add("id", LongType, true, withIdAndPhysicalName(0, "col-0")) + } + withSQLConf(DeltaSQLConf.DELTA_COLUMN_MAPPING_STRIP_METADATA.key -> "false") { + deltaLog.withNewTransaction(catalogTableOpt = None) { txn => + val metadata = actions.Metadata( + name = "testTable", + schemaString = schemaV0.json, + configuration = Map(DeltaConfigs.COLUMN_MAPPING_MODE.key -> NoMapping.name) + ) + txn.updateMetadata(metadata) + txn.commit(Seq.empty, ManualUpdate) + } + } + val metadataV0 = deltaLog.update().metadata + assert(DeltaColumnMapping.schemaHasColumnMappingMetadata(metadataV0.schema) === + !txnIntroducesMetadata) + + // Update the schema of the existing table. + withSQLConf(DeltaSQLConf.DELTA_COLUMN_MAPPING_STRIP_METADATA.key -> "true") { + deltaLog.withNewTransaction(catalogTableOpt = None) { txn => + val schemaV1 = + schemaV0.add("value", LongType, true, withIdAndPhysicalName(0, "col-0")) + val metadata = metadataV0.copy(schemaString = schemaV1.json) + txn.updateMetadata(metadata) + txn.commit(Seq.empty, ManualUpdate) + } + val metadataV1 = deltaLog.update().metadata + assert(DeltaColumnMapping.schemaHasColumnMappingMetadata(metadataV1.schema) === + !txnIntroducesMetadata) + } + } + } + } }