diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaColumnMapping.scala b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaColumnMapping.scala
index 2ce3cc1b4c..11ea6e513d 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaColumnMapping.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaColumnMapping.scala
@@ -48,6 +48,17 @@ trait DeltaColumnMappingBase extends DeltaLogging {
   val PARQUET_MAP_KEY_FIELD_NAME = "key"
   val PARQUET_MAP_VALUE_FIELD_NAME = "value"
 
+  /**
+   * The set of metadata keys that carry column mapping metadata on each column in the schema.
+   */
+  val COLUMN_MAPPING_METADATA_KEYS: Set[String] = Set(
+    COLUMN_MAPPING_METADATA_ID_KEY,
+    COLUMN_MAPPING_PHYSICAL_NAME_KEY,
+    COLUMN_MAPPING_METADATA_NESTED_IDS_KEY,
+    PARQUET_FIELD_ID_METADATA_KEY,
+    PARQUET_FIELD_NESTED_IDS_METADATA_KEY
+  )
+
   /**
    * This list of internal columns (and only this list) is allowed to have missing
    * column mapping metadata such as field id and physical name because
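For orientation, a minimal sketch of what a column annotated with these keys looks like. The literal key strings below ("delta.columnMapping.id", "delta.columnMapping.physicalName") are the usual Delta protocol values behind the first two constants; the diff only shows the constant names, so treat the literals as assumptions:

```scala
import org.apache.spark.sql.types.{LongType, MetadataBuilder, StructField, StructType}

// A field carrying column mapping metadata (key strings assumed, see above).
val annotated = StructField("id", LongType, nullable = true,
  metadata = new MetadataBuilder()
    .putLong("delta.columnMapping.id", 1L)
    .putString("delta.columnMapping.physicalName", "col-0")
    .build())

// A schema counts as "having column mapping metadata" as soon as any field
// carries one of COLUMN_MAPPING_METADATA_KEYS; the helper
// schemaHasColumnMappingMetadata added later in this diff checks exactly that.
val schema = StructType(Seq(annotated))
```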
@@ -116,6 +127,7 @@ trait DeltaColumnMappingBase extends DeltaLogging {
    * - upgrading to the column mapping Protocol through configurations
    */
   def verifyAndUpdateMetadataChange(
+      spark: SparkSession,
       deltaLog: DeltaLog,
       oldProtocol: Protocol,
       oldMetadata: Metadata,
@@ -136,8 +148,34 @@ trait DeltaColumnMappingBase extends DeltaLogging {
         oldMappingMode.name, newMappingMode.name)
     }
 
-    val updatedMetadata = updateColumnMappingMetadata(
-      oldMetadata, newMetadata, isChangingModeOnExistingTable, isOverwriteSchema)
+    var updatedMetadata = newMetadata
+
+    // If column mapping is disabled, strip any column mapping metadata from the new schema,
+    // because Delta code will still honor it even when column mapping is not enabled. However,
+    // we must not strip metadata that already exists in the table's current schema, because
+    // doing so would break the table.
+    if (newMappingMode == NoMapping &&
+        schemaHasColumnMappingMetadata(newMetadata.schema)) {
+      val addsColumnMappingMetadata = !schemaHasColumnMappingMetadata(oldMetadata.schema)
+      if (addsColumnMappingMetadata &&
+          spark.conf.get(DeltaSQLConf.DELTA_COLUMN_MAPPING_STRIP_METADATA)) {
+        recordDeltaEvent(deltaLog, opType = "delta.columnMapping.stripMetadata")
+        val strippedSchema = dropColumnMappingMetadata(newMetadata.schema)
+        updatedMetadata = newMetadata.copy(schemaString = strippedSchema.json)
+      } else {
+        recordDeltaEvent(
+          deltaLog,
+          opType = "delta.columnMapping.updateSchema.metadataPresentButFeatureDisabled",
+          data = Map(
+            "addsColumnMappingMetadata" -> addsColumnMappingMetadata.toString,
+            "isCreatingNewTable" -> isCreatingNewTable.toString,
+            "isOverwriteSchema" -> isOverwriteSchema.toString)
+        )
+      }
+    }
+
+    updatedMetadata = updateColumnMappingMetadata(
+      oldMetadata, updatedMetadata, isChangingModeOnExistingTable, isOverwriteSchema)
 
     // record column mapping table creation/upgrade
     if (newMappingMode != NoMapping) {
@@ -455,16 +493,12 @@ trait DeltaColumnMappingBase extends DeltaLogging {
 
   def dropColumnMappingMetadata(schema: StructType): StructType = {
     SchemaMergingUtils.transformColumns(schema) { (_, field, _) =>
-      field.copy(
-        metadata = new MetadataBuilder()
-          .withMetadata(field.metadata)
-          .remove(COLUMN_MAPPING_METADATA_ID_KEY)
-          .remove(COLUMN_MAPPING_METADATA_NESTED_IDS_KEY)
-          .remove(COLUMN_MAPPING_PHYSICAL_NAME_KEY)
-          .remove(PARQUET_FIELD_ID_METADATA_KEY)
-          .remove(PARQUET_FIELD_NESTED_IDS_METADATA_KEY)
-          .build()
-      )
+      var strippedMetadataBuilder = new MetadataBuilder().withMetadata(field.metadata)
+      for (key <- COLUMN_MAPPING_METADATA_KEYS) {
+        strippedMetadataBuilder = strippedMetadataBuilder.remove(key)
+      }
+      val strippedMetadata = strippedMetadataBuilder.build()
+      field.copy(metadata = strippedMetadata)
     }
   }
@@ -784,6 +818,15 @@ trait DeltaColumnMappingBase extends DeltaLogging {
 
     (transform(schema, new MetadataBuilder(), Seq.empty), currFieldId)
   }
+
+  /**
+   * Returns whether the schema contains any metadata reserved for column mapping.
+   */
+  def schemaHasColumnMappingMetadata(schema: StructType): Boolean = {
+    SchemaMergingUtils.explode(schema).exists { case (_, col) =>
+      COLUMN_MAPPING_METADATA_KEYS.exists(k => col.metadata.contains(k))
+    }
+  }
 }
 
 object DeltaColumnMapping extends DeltaColumnMappingBase
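To restate the gating in verifyAndUpdateMetadataChange: metadata is stripped only when the new mode is NoMapping, the incoming schema carries the metadata, the existing table schema does not, and the config is enabled. A simplified, hypothetical distillation of that decision with plain booleans in place of the real Metadata objects:

```scala
// Sketch only: mirrors the condition in verifyAndUpdateMetadataChange.
def shouldStripMetadata(
    newModeIsNoMapping: Boolean,   // newMappingMode == NoMapping
    newSchemaHasMetadata: Boolean, // schemaHasColumnMappingMetadata(newMetadata.schema)
    oldSchemaHasMetadata: Boolean, // schemaHasColumnMappingMetadata(oldMetadata.schema)
    stripConfEnabled: Boolean      // DELTA_COLUMN_MAPPING_STRIP_METADATA
): Boolean = {
  newModeIsNoMapping &&
    newSchemaHasMetadata &&
    !oldSchemaHasMetadata && // never strip metadata the table already depends on
    stripConfEnabled
}
```

When the incoming schema carries the metadata but stripping does not apply, the transaction proceeds unchanged and only emits the metadataPresentButFeatureDisabled event for observability.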
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala b/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala
index 54ca25bb27..f5e15d94f2 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala
@@ -555,6 +555,7 @@ trait OptimisticTransactionImpl extends TransactionalWrite
     // The `.schema` cannot be generated correctly unless the column mapping metadata is correctly
     // filled for all the fields. Therefore, the column mapping changes need to happen first.
     newMetadataTmp = DeltaColumnMapping.verifyAndUpdateMetadataChange(
+      spark,
       deltaLog,
       protocolBeforeUpdate,
       snapshot.metadata,
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/TableFeature.scala b/spark/src/main/scala/org/apache/spark/sql/delta/TableFeature.scala
index 1edf8d8c02..1c15086b76 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/TableFeature.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/TableFeature.scala
@@ -540,10 +540,7 @@ object ColumnMappingTableFeature
 
   override def validateRemoval(snapshot: Snapshot): Boolean = {
     val schemaHasNoColumnMappingMetadata =
-      SchemaMergingUtils.explode(snapshot.schema).forall { case (_, col) =>
-        !DeltaColumnMapping.hasPhysicalName(col) &&
-          !DeltaColumnMapping.hasColumnId(col)
-      }
+      !DeltaColumnMapping.schemaHasColumnMappingMetadata(snapshot.schema)
     val metadataHasNoMappingMode = snapshot.metadata.columnMappingMode match {
       case NoMapping => true
       case _ => false
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala
index aa46bd9bd0..857656b16a 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala
@@ -1606,6 +1606,21 @@ trait DeltaSQLConfBase {
       .booleanConf
       .createWithDefault(true)
 
+  val DELTA_COLUMN_MAPPING_STRIP_METADATA =
+    buildConf("columnMapping.stripMetadata")
+      .doc(
+        """
+          |Transactions might try to update the schema of a table with columns that contain
+          |column mapping metadata, even when column mapping is not enabled. For example, this
+          |can happen when transactions copy the schema from another table. When this setting is
+          |enabled, we will strip the column mapping metadata from the schema before applying it.
+          |Note that this config applies only when the existing schema of the table does not
+          |contain any column mapping metadata.
+          |""".stripMargin)
+      .internal()
+      .booleanConf
+      .createWithDefault(true)
+
   val DYNAMIC_PARTITION_OVERWRITE_ENABLED =
     buildConf("dynamicPartitionOverwrite.enabled")
       .doc("Whether to overwrite partitions dynamically when 'partitionOverwriteMode' is set to " +
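A usage note for the new flag, assuming the conventional "spark.databricks.delta." prefix that DeltaSQLConf.buildConf prepends to config names:

```scala
// Opt out of stripping for the current session (full key assumes the
// standard DeltaSQLConf prefix).
spark.conf.set("spark.databricks.delta.columnMapping.stripMetadata", "false")
```

Because the config is internal and defaults to true, the stripping behavior is on without any user action.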
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaColumnMappingSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaColumnMappingSuite.scala
index e6663feb17..54ba60ba65 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaColumnMappingSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaColumnMappingSuite.scala
@@ -24,6 +24,7 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable
 
 import org.apache.spark.sql.delta.DeltaOperations.ManualUpdate
+import org.apache.spark.sql.delta.DeltaTestUtils.BOOLEAN_DOMAIN
 import org.apache.spark.sql.delta.actions.{Action, AddCDCFile, AddFile, Metadata => MetadataAction, Protocol, SetTransaction}
 import org.apache.spark.sql.delta.catalog.DeltaTableV2
 import org.apache.spark.sql.delta.schema.SchemaMergingUtils
@@ -2046,4 +2047,48 @@ class DeltaColumnMappingSuite extends QueryTest
       }
     }
   }
+
+  for (txnIntroducesMetadata <- BOOLEAN_DOMAIN) {
+    test("column mapping metadata are stripped when feature is disabled - " +
+        s"txnIntroducesMetadata=$txnIntroducesMetadata") {
+      withTempDir { dir =>
+        val tablePath = dir.getCanonicalPath
+        val deltaLog = DeltaLog.forTable(spark, tablePath)
+        // Create the original table.
+        val schemaV0 = if (txnIntroducesMetadata) {
+          new StructType().add("id", LongType, true)
+        } else {
+          new StructType().add("id", LongType, true, withIdAndPhysicalName(0, "col-0"))
+        }
+        withSQLConf(DeltaSQLConf.DELTA_COLUMN_MAPPING_STRIP_METADATA.key -> "false") {
+          deltaLog.withNewTransaction(catalogTableOpt = None) { txn =>
+            val metadata = actions.Metadata(
+              name = "testTable",
+              schemaString = schemaV0.json,
+              configuration = Map(DeltaConfigs.COLUMN_MAPPING_MODE.key -> NoMapping.name)
+            )
+            txn.updateMetadata(metadata)
+            txn.commit(Seq.empty, ManualUpdate)
+          }
+        }
+        val metadataV0 = deltaLog.update().metadata
+        assert(DeltaColumnMapping.schemaHasColumnMappingMetadata(metadataV0.schema) ===
+          !txnIntroducesMetadata)
+
+        // Update the schema of the existing table.
+        withSQLConf(DeltaSQLConf.DELTA_COLUMN_MAPPING_STRIP_METADATA.key -> "true") {
+          deltaLog.withNewTransaction(catalogTableOpt = None) { txn =>
+            val schemaV1 =
+              schemaV0.add("value", LongType, true, withIdAndPhysicalName(0, "col-0"))
+            val metadata = metadataV0.copy(schemaString = schemaV1.json)
+            txn.updateMetadata(metadata)
+            txn.commit(Seq.empty, ManualUpdate)
+          }
+          val metadataV1 = deltaLog.update().metadata
+          assert(DeltaColumnMapping.schemaHasColumnMappingMetadata(metadataV1.schema) ===
+            !txnIntroducesMetadata)
+        }
+      }
+    }
+  }
 }
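The test leans on a withIdAndPhysicalName helper that is not part of this diff. A hypothetical reconstruction of its shape, assuming it simply packs the two column mapping keys into field metadata:

```scala
import org.apache.spark.sql.types.{Metadata, MetadataBuilder}

// Assumed shape of the suite helper used above (not shown in the diff).
def withIdAndPhysicalName(id: Int, physicalName: String): Metadata =
  new MetadataBuilder()
    .putLong(DeltaColumnMapping.COLUMN_MAPPING_METADATA_ID_KEY, id)
    .putString(DeltaColumnMapping.COLUMN_MAPPING_PHYSICAL_NAME_KEY, physicalName)
    .build()
```

Note that the test reuses id 0 and "col-0" for the added column; since column mapping stays disabled throughout, the duplicate values presumably do not affect what the assertions check.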