Strip column mapping metadata when feature is disabled (#3688)
## Description

Transactions might try to create or update the schema of a Delta table
with columns that contain column mapping metadata, even when column
mapping is not enabled. For example, this can happen when transactions
copy the schema from another table without stripping metadata.
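
As a rough illustration of how that metadata travels along with a copied
schema (a minimal sketch; the literal key strings below are spelled out for
readability, whereas Delta refers to them through constants such as
`COLUMN_MAPPING_METADATA_ID_KEY` and `COLUMN_MAPPING_PHYSICAL_NAME_KEY`):

```scala
import org.apache.spark.sql.types.{LongType, MetadataBuilder, StructField, StructType}

object CopiedSchemaSketch {
  def main(args: Array[String]): Unit = {
    // A field as it might look when read from a table that has column mapping enabled:
    // the per-column metadata carries the column ID and physical name.
    // (Key strings are written out literally here for illustration only.)
    val mappedField = StructField(
      "id",
      LongType,
      nullable = true,
      metadata = new MetadataBuilder()
        .putLong("delta.columnMapping.id", 1L)
        .putString("delta.columnMapping.physicalName", "col-1")
        .build())

    // Copying this schema verbatim into a table that does not have column mapping
    // enabled carries the reserved metadata along with it.
    val copiedSchema = new StructType().add(mappedField)
    println(copiedSchema.prettyJson)
  }
}
```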

To avoid such issues, we automatically strip column mapping metadata
when column mapping is disabled. We do this only for new tables or for
transactions that add column mapping metadata for the first time. If
column mapping metadata already exists in the schema, we cannot strip it,
because doing so would break the table. A usage log is emitted so we can
understand the impact on existing tables.

Note that this change covers only the cases where `txn.updateMetadata` is
called (the "proper API"), not the cases where a `Metadata` action is
committed directly to the table.
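
For context, here is a hedged sketch of the two paths, mirroring the
transaction APIs used by the new test (the `spark`, `path`, and
`newMetadata` arguments are placeholders):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.delta.{DeltaLog, DeltaOperations}
import org.apache.spark.sql.delta.actions.Metadata

object MetadataUpdatePathsSketch {
  // Sketch only: assumes an active SparkSession with Delta enabled and an existing table.
  def run(spark: SparkSession, path: String, newMetadata: Metadata): Unit = {
    val deltaLog = DeltaLog.forTable(spark, path)

    // Covered by this change: updateMetadata routes through
    // DeltaColumnMapping.verifyAndUpdateMetadataChange, which can now strip the metadata.
    deltaLog.withNewTransaction(catalogTableOpt = None) { txn =>
      txn.updateMetadata(newMetadata)
      txn.commit(Seq.empty, DeltaOperations.ManualUpdate)
    }

    // Not covered: committing the Metadata action directly bypasses updateMetadata,
    // so the stripping added by this change does not apply.
    deltaLog.withNewTransaction(catalogTableOpt = None) { txn =>
      txn.commit(Seq(newMetadata), DeltaOperations.ManualUpdate)
    }
  }
}
```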

Finally, this commit changes the drop column mapping command to check
that no column mapping metadata exists at all, rather than checking only
the physical column name and column ID.
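
The change leans on two helpers that appear in the diff below,
`schemaHasColumnMappingMetadata` and `dropColumnMappingMetadata`. A minimal
sketch of how they compose (again, the literal key strings are for
illustration only):

```scala
import org.apache.spark.sql.delta.DeltaColumnMapping
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.types.{LongType, MetadataBuilder, StructType}

object StripMetadataSketch {
  def main(args: Array[String]): Unit = {
    // Incoming schema copied from a column-mapping-enabled table.
    val incoming = new StructType().add(
      "id", LongType, true,
      new MetadataBuilder()
        .putLong("delta.columnMapping.id", 1L)
        .putString("delta.columnMapping.physicalName", "col-1")
        .build())

    // New guard: only act when reserved column mapping metadata is present.
    if (DeltaColumnMapping.schemaHasColumnMappingMetadata(incoming)) {
      // dropColumnMappingMetadata removes every key in COLUMN_MAPPING_METADATA_KEYS
      // from every field; verifyAndUpdateMetadataChange now does this when column
      // mapping is disabled and the new config is enabled.
      val stripped = DeltaColumnMapping.dropColumnMappingMetadata(incoming)
      assert(!DeltaColumnMapping.schemaHasColumnMappingMetadata(stripped))
    }

    // The stripping behavior is gated by this internal config (enabled by default).
    println(DeltaSQLConf.DELTA_COLUMN_MAPPING_STRIP_METADATA.key)
  }
}
```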

## How was this patch tested?
Added new UT.
cstavr committed Sep 19, 2024
1 parent 93eef11 commit d07a7bd
Showing 5 changed files with 117 additions and 16 deletions.
@@ -48,6 +48,17 @@ trait DeltaColumnMappingBase extends DeltaLogging {
val PARQUET_MAP_KEY_FIELD_NAME = "key"
val PARQUET_MAP_VALUE_FIELD_NAME = "value"

/**
* The set of metadata keys reserved for column mapping on each column in the schema.
*/
val COLUMN_MAPPING_METADATA_KEYS: Set[String] = Set(
COLUMN_MAPPING_METADATA_ID_KEY,
COLUMN_MAPPING_PHYSICAL_NAME_KEY,
COLUMN_MAPPING_METADATA_NESTED_IDS_KEY,
PARQUET_FIELD_ID_METADATA_KEY,
PARQUET_FIELD_NESTED_IDS_METADATA_KEY
)

/**
* This list of internal columns (and only this list) is allowed to have missing
* column mapping metadata such as field id and physical name because
@@ -116,6 +127,7 @@ trait DeltaColumnMappingBase extends DeltaLogging {
* - upgrading to the column mapping Protocol through configurations
*/
def verifyAndUpdateMetadataChange(
spark: SparkSession,
deltaLog: DeltaLog,
oldProtocol: Protocol,
oldMetadata: Metadata,
@@ -136,8 +148,34 @@
oldMappingMode.name, newMappingMode.name)
}

val updatedMetadata = updateColumnMappingMetadata(
oldMetadata, newMetadata, isChangingModeOnExistingTable, isOverwriteSchema)
var updatedMetadata = newMetadata

// If column mapping is disabled, we need to strip any column mapping metadata from the schema,
// because Delta code will otherwise use it even though column mapping is not enabled. However,
// we cannot strip column mapping metadata that already exists in the schema, because this would
// break the table.
if (newMappingMode == NoMapping &&
schemaHasColumnMappingMetadata(newMetadata.schema)) {
val addsColumnMappingMetadata = !schemaHasColumnMappingMetadata(oldMetadata.schema)
if (addsColumnMappingMetadata &&
spark.conf.get(DeltaSQLConf.DELTA_COLUMN_MAPPING_STRIP_METADATA)) {
recordDeltaEvent(deltaLog, opType = "delta.columnMapping.stripMetadata")
val strippedSchema = dropColumnMappingMetadata(newMetadata.schema)
updatedMetadata = newMetadata.copy(schemaString = strippedSchema.json)
} else {
recordDeltaEvent(
deltaLog,
opType = "delta.columnMapping.updateSchema.metadataPresentButFeatureDisabled",
data = Map(
"addsColumnMappingMetadata" -> addsColumnMappingMetadata.toString,
"isCreatingNewTable" -> isCreatingNewTable.toString,
"isOverwriteSchema" -> isOverwriteSchema.toString)
)
}
}

updatedMetadata = updateColumnMappingMetadata(
oldMetadata, updatedMetadata, isChangingModeOnExistingTable, isOverwriteSchema)

// record column mapping table creation/upgrade
if (newMappingMode != NoMapping) {
@@ -455,16 +493,12 @@ trait DeltaColumnMappingBase extends DeltaLogging {

def dropColumnMappingMetadata(schema: StructType): StructType = {
SchemaMergingUtils.transformColumns(schema) { (_, field, _) =>
field.copy(
metadata = new MetadataBuilder()
.withMetadata(field.metadata)
.remove(COLUMN_MAPPING_METADATA_ID_KEY)
.remove(COLUMN_MAPPING_METADATA_NESTED_IDS_KEY)
.remove(COLUMN_MAPPING_PHYSICAL_NAME_KEY)
.remove(PARQUET_FIELD_ID_METADATA_KEY)
.remove(PARQUET_FIELD_NESTED_IDS_METADATA_KEY)
.build()
)
var strippedMetadataBuilder = new MetadataBuilder().withMetadata(field.metadata)
for (key <- COLUMN_MAPPING_METADATA_KEYS) {
strippedMetadataBuilder = strippedMetadataBuilder.remove(key)
}
val strippedMetadata = strippedMetadataBuilder.build()
field.copy(metadata = strippedMetadata)
}
}

@@ -784,6 +818,15 @@ trait DeltaColumnMappingBase extends DeltaLogging {

(transform(schema, new MetadataBuilder(), Seq.empty), currFieldId)
}

/**
* Returns whether the schema contains any metadata reserved for column mapping.
*/
def schemaHasColumnMappingMetadata(schema: StructType): Boolean = {
SchemaMergingUtils.explode(schema).exists { case (_, col) =>
COLUMN_MAPPING_METADATA_KEYS.exists(k => col.metadata.contains(k))
}
}
}

object DeltaColumnMapping extends DeltaColumnMappingBase
@@ -555,6 +555,7 @@ trait OptimisticTransactionImpl extends TransactionalWrite
// The `.schema` cannot be generated correctly unless the column mapping metadata is correctly
// filled for all the fields. Therefore, the column mapping changes need to happen first.
newMetadataTmp = DeltaColumnMapping.verifyAndUpdateMetadataChange(
spark,
deltaLog,
protocolBeforeUpdate,
snapshot.metadata,
@@ -540,10 +540,7 @@ object ColumnMappingTableFeature

override def validateRemoval(snapshot: Snapshot): Boolean = {
val schemaHasNoColumnMappingMetadata =
SchemaMergingUtils.explode(snapshot.schema).forall { case (_, col) =>
!DeltaColumnMapping.hasPhysicalName(col) &&
!DeltaColumnMapping.hasColumnId(col)
}
!DeltaColumnMapping.schemaHasColumnMappingMetadata(snapshot.schema)
val metadataHasNoMappingMode = snapshot.metadata.columnMappingMode match {
case NoMapping => true
case _ => false
@@ -1606,6 +1606,21 @@ trait DeltaSQLConfBase {
.booleanConf
.createWithDefault(true)

val DELTA_COLUMN_MAPPING_STRIP_METADATA =
buildConf("columnMapping.stripMetadata")
.doc(
"""
|Transactions might try to update the schema of a table with columns that contain
|column mapping metadata, even when column mapping is not enabled. For example, this
|can happen when transactions copy the schema from another table. When this setting is
|enabled, we will strip the column mapping metadata from the schema before applying it.
|Note that this config applies only when the existing schema of the table does not
|contain any column mapping metadata.
|""".stripMargin)
.internal()
.booleanConf
.createWithDefault(true)

val DYNAMIC_PARTITION_OVERWRITE_ENABLED =
buildConf("dynamicPartitionOverwrite.enabled")
.doc("Whether to overwrite partitions dynamically when 'partitionOverwriteMode' is set to " +
@@ -24,6 +24,7 @@ import scala.collection.JavaConverters._
import scala.collection.mutable

import org.apache.spark.sql.delta.DeltaOperations.ManualUpdate
import org.apache.spark.sql.delta.DeltaTestUtils.BOOLEAN_DOMAIN
import org.apache.spark.sql.delta.actions.{Action, AddCDCFile, AddFile, Metadata => MetadataAction, Protocol, SetTransaction}
import org.apache.spark.sql.delta.catalog.DeltaTableV2
import org.apache.spark.sql.delta.schema.SchemaMergingUtils
@@ -2046,4 +2047,48 @@ class DeltaColumnMappingSuite extends QueryTest
}
}
}

for (txnIntroducesMetadata <- BOOLEAN_DOMAIN) {
test("column mapping metadata are stripped when feature is disabled - " +
s"txnIntroducesMetadata=$txnIntroducesMetadata") {
withTempDir { dir =>
val tablePath = dir.getCanonicalPath
val deltaLog = DeltaLog.forTable(spark, tablePath)
// Create the original table.
val schemaV0 = if (txnIntroducesMetadata) {
new StructType().add("id", LongType, true)
} else {
new StructType().add("id", LongType, true, withIdAndPhysicalName(0, "col-0"))
}
withSQLConf(DeltaSQLConf.DELTA_COLUMN_MAPPING_STRIP_METADATA.key -> "false") {
deltaLog.withNewTransaction(catalogTableOpt = None) { txn =>
val metadata = actions.Metadata(
name = "testTable",
schemaString = schemaV0.json,
configuration = Map(DeltaConfigs.COLUMN_MAPPING_MODE.key -> NoMapping.name)
)
txn.updateMetadata(metadata)
txn.commit(Seq.empty, ManualUpdate)
}
}
val metadataV0 = deltaLog.update().metadata
assert(DeltaColumnMapping.schemaHasColumnMappingMetadata(metadataV0.schema) ===
!txnIntroducesMetadata)

// Update the schema of the existing table.
withSQLConf(DeltaSQLConf.DELTA_COLUMN_MAPPING_STRIP_METADATA.key -> "true") {
deltaLog.withNewTransaction(catalogTableOpt = None) { txn =>
val schemaV1 =
schemaV0.add("value", LongType, true, withIdAndPhysicalName(0, "col-0"))
val metadata = metadataV0.copy(schemaString = schemaV1.json)
txn.updateMetadata(metadata)
txn.commit(Seq.empty, ManualUpdate)
}
val metadataV1 = deltaLog.update().metadata
assert(DeltaColumnMapping.schemaHasColumnMappingMetadata(metadataV1.schema) ===
!txnIntroducesMetadata)
}
}
}
}
}
