Skip to content

Commit 4050f94

Browse files
committed
More cleanup
1 parent ea00e3c commit 4050f94

File tree

2 files changed

+39
-35
lines changed

2 files changed

+39
-35
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1704,9 +1704,11 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
17041704
resolveAssignments(assignments, m, MergeResolvePolicy.BOTH,
17051705
throws = throws))
17061706
case UpdateStarAction(updateCondition) =>
1707-
// Use only source columns. Missing columns in target will be handled in
1708-
// ResolveRowLevelCommandAssignments.
1707+
// Expand star to top-level source columns. If source has fewer columns than target,
1708+
// assignments will be added by ResolveRowLevelCommandAssignments later.
17091709
val assignments = if (m.schemaEvolutionEnabled) {
1710+
// For schema evolution case, generate assignments for missing target columns.
1711+
// These columns will be added by ResolveMergeIntoTableSchemaEvolution later.
17101712
sourceTable.output.map(sourceAttr =>
17111713
findAttrInTarget(sourceAttr.name).map(
17121714
targetAttr => Assignment(targetAttr, sourceAttr))
@@ -1719,10 +1721,6 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
17191721
targetAttr => Assignment(targetAttr, sourceAttr))
17201722
}
17211723
}
1722-
1723-
// sourceTable.output.find(
1724-
// sourceCol => conf.resolver(sourceCol.name, targetAttr.name))
1725-
// .map(Assignment(targetAttr, _))}
17261724
UpdateAction(
17271725
updateCondition.map(resolveExpressionByPlanChildren(_, m)),
17281726
// For UPDATE *, the value must be from source table.
@@ -1745,9 +1743,11 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
17451743
// access columns from the source table.
17461744
val resolvedInsertCondition = insertCondition.map(
17471745
resolveExpressionByPlanOutput(_, m.sourceTable))
1748-
// Use only source columns. Missing columns in target will be handled in
1749-
// ResolveRowLevelCommandAssignments.
1746+
// Expand star to top-level source columns. If source has fewer columns than target,
1747+
// assignments will be added by ResolveRowLevelCommandAssignments later.
17501748
val assignments = if (m.schemaEvolutionEnabled) {
1749+
// For schema evolution case, generate assignments for missing target columns.
1750+
// These columns will be added by ResolveMergeIntoTableSchemaEvolution later.
17511751
sourceTable.output.map(sourceAttr =>
17521752
findAttrInTarget(sourceAttr.name).map(
17531753
targetAttr => Assignment(targetAttr, sourceAttr))

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala

Lines changed: 31 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -895,17 +895,17 @@ case class MergeIntoTable(
895895
private lazy val sourceSchemaForEvolution: StructType =
896896
MergeIntoTable.sourceSchemaForSchemaEvolution(this)
897897

898-
lazy val schemaChangesNonEmpty =
898+
lazy val schemaChangesNonEmpty: Boolean =
899899
MergeIntoTable.schemaChanges(targetTable.schema, sourceSchemaForEvolution).nonEmpty
900900

901901
lazy val needSchemaEvolution: Boolean =
902902
schemaEvolutionEnabled &&
903903
canEvaluateSchemaEvolution &&
904904
schemaChangesNonEmpty
905905

906-
// Guard that assignments are resolved or candidates for evolution before evaluating schema
907-
// evolution. We need to use resolved assignment values to check candidates, see
908-
// MergeIntoTable.assignmentForEvolutionCandidate.
906+
// Guard that assignments are either resolved or candidates for evolution before
907+
// evaluating schema evolution. We need to use resolved assignment values to check
908+
// candidates, see MergeIntoTable.sourceSchemaForSchemaEvolution for details.
909909
lazy val canEvaluateSchemaEvolution: Boolean = {
910910
if ((!targetTable.resolved) || (!sourceTable.resolved)) {
911911
false
@@ -916,10 +916,10 @@ case class MergeIntoTable(
916916
case a: InsertAction => a.assignments
917917
}.flatten
918918

919-
val evolutionPaths = MergeIntoTable.extractAllFieldPaths(sourceSchemaForEvolution)
919+
val sourcePaths = MergeIntoTable.extractAllFieldPaths(sourceTable.schema)
920920
assignments.forall { assignment =>
921921
assignment.resolved ||
922-
evolutionPaths.exists { path => MergeIntoTable.isEqual(assignment, path) }
922+
sourcePaths.exists { path => MergeIntoTable.isEqual(assignment, path) }
923923
}
924924
}
925925
}
@@ -978,11 +978,12 @@ object MergeIntoTable {
978978
case currentField: StructField if newFieldMap.contains(currentField.name) =>
979979
schemaChanges(currentField.dataType, newFieldMap(currentField.name).dataType,
980980
originalTarget, originalSource, fieldPath ++ Seq(currentField.name))
981-
}}.flatten
981+
}
982+
}.flatten
982983

983984
// Identify the newly added fields and append to the end
984985
val currentFieldMap = toFieldMap(currentFields)
985-
val adds = newFields.filterNot (f => currentFieldMap.contains (f.name))
986+
val adds = newFields.filterNot(f => currentFieldMap.contains(f.name))
986987
.map(f => TableChange.addColumn(fieldPath ++ Set(f.name), f.dataType))
987988

988989
updates ++ adds
@@ -1024,10 +1025,13 @@ object MergeIntoTable {
10241025
// A pruned version of source schema that only contains columns/nested fields
10251026
// explicitly and directly assigned to a target counterpart in MERGE INTO actions,
10261027
// which are relevant for schema evolution.
1028+
// Examples:
1029+
// * UPDATE SET target.a = source.a
1030+
// * UPDATE SET nested.a = source.nested.a
1031+
// * INSERT (a, nested.b) VALUES (source.a, source.nested.b)
10271032
// New columns/nested fields in this schema that do not exist in the target schema
10281033
// will be added for schema evolution.
10291034
def sourceSchemaForSchemaEvolution(merge: MergeIntoTable): StructType = {
1030-
10311035
val actions = merge.matchedActions ++ merge.notMatchedActions
10321036
val assignments = actions.collect {
10331037
case a: UpdateAction => a.assignments
@@ -1051,7 +1055,7 @@ object MergeIntoTable {
10511055
// If this is a struct and one of the children is being assigned to in a merge clause,
10521056
// keep it and continue filtering children.
10531057
case struct: StructType if assignments.exists(assign =>
1054-
isPrefix(fieldPath, extractFieldPath(assign.key))) =>
1058+
isPrefix(fieldPath, extractFieldPath(assign.key, allowUnresolved = true))) =>
10551059
Some(field.copy(dataType = filterSchema(struct, fieldPath)))
10561060
// The field isn't assigned to directly or indirectly (i.e. its children) in any non-*
10571061
// clause. Check if it should be kept with any * action.
@@ -1080,12 +1084,14 @@ object MergeIntoTable {
10801084
}
10811085

10821086
// Helper method to extract field path from an Expression.
1083-
private def extractFieldPath(expr: Expression): Seq[String] = expr match {
1084-
case UnresolvedAttribute(nameParts) => nameParts
1085-
case a: AttributeReference => Seq(a.name)
1086-
case GetStructField(child, ordinal, nameOpt) =>
1087-
extractFieldPath(child) :+ nameOpt.getOrElse(s"col$ordinal")
1088-
case _ => Seq.empty
1087+
private def extractFieldPath(expr: Expression, allowUnresolved: Boolean): Seq[String] = {
1088+
expr match {
1089+
case UnresolvedAttribute(nameParts) if allowUnresolved => nameParts
1090+
case a: AttributeReference => Seq(a.name)
1091+
case GetStructField(child, ordinal, nameOpt) =>
1092+
extractFieldPath(child, allowUnresolved) :+ nameOpt.getOrElse(s"col$ordinal")
1093+
case _ => Seq.empty
1094+
}
10891095
}
10901096

10911097
// Helper method to check if a given field path is a prefix of another path.
@@ -1095,18 +1101,16 @@ object MergeIntoTable {
10951101
SQLConf.get.resolver(prefixNamePart, pathNamePart)
10961102
}
10971103

1098-
// Helper method to check if a given field path is a suffix of another path.
1099-
private def isSuffix(suffix: Seq[String], path: Seq[String]): Boolean =
1100-
isPrefix(suffix.reverse, path.reverse)
1101-
11021104
// Helper method to check if an assignment key is equal to a source column
1103-
// and if the assignment value is the corresponding source column directly
1104-
private def isEqual(assignment: Assignment, path: Seq[String]): Boolean = {
1105-
val assignmenKeyExpr = extractFieldPath(assignment.key)
1106-
val assignmentValueExpr = extractFieldPath(assignment.value)
1107-
// Valid assignments are: col = s.col or col.nestedField = s.col.nestedField
1108-
assignmenKeyExpr.length == path.length && isPrefix(assignmenKeyExpr, path) &&
1109-
isSuffix(path, assignmentValueExpr)
1105+
// and if the assignment value is that same source column.
1106+
// Example: UPDATE SET target.a = source.a
1107+
private def isEqual(assignment: Assignment, sourceFieldPath: Seq[String]): Boolean = {
1108+
// key must be a non-qualified field path that may be added to target schema via evolution
1109+
val assignmenKeyExpr = extractFieldPath(assignment.key, allowUnresolved = true)
1110+
// value should always be resolved (from source)
1111+
val assignmentValueExpr = extractFieldPath(assignment.value, allowUnresolved = false)
1112+
assignmenKeyExpr == assignmentValueExpr &&
1113+
assignmenKeyExpr == sourceFieldPath
11101114
}
11111115
}
11121116

0 commit comments

Comments
 (0)