
Conversation

@szehon-ho
Member

@szehon-ho szehon-ho commented Nov 4, 2025

What changes were proposed in this pull request?

Change the scope of MERGE INTO schema evolution: limit it to only adding columns/nested fields that exist in the source and are directly assigned from the corresponding source column without transformation.

For example:

UPDATE SET new_col = source.new_col 
UPDATE SET struct.new_field = source.struct.new_field
INSERT (old_col, new_col) VALUES (s.old_col, s.new_col)
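
For illustration, a minimal sketch of such a statement (the `target`/`source` table names and `new_col`/`extra_col` columns are hypothetical, and the `WITH SCHEMA EVOLUTION` clause follows the syntax added in #51698):

```scala
// Hypothetical schemas: target(pk, dep), source(pk, dep, new_col, extra_col).
// Under this PR, only `new_col` is added to the target: it is the only missing
// column directly assigned from the same-named source column. `extra_col` is
// never referenced by an assignment, so it is not added.
spark.sql(
  """MERGE WITH SCHEMA EVOLUTION INTO target t
    |USING source s
    |ON t.pk = s.pk
    |WHEN MATCHED THEN
    |  UPDATE SET new_col = s.new_col
    |WHEN NOT MATCHED THEN
    |  INSERT (pk, dep, new_col) VALUES (s.pk, s.dep, s.new_col)
    |""".stripMargin)
```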

Why are the changes needed?

#51698 added schema evolution support for MERGE INTO statements. However, it is a bit too broad: in some instances the source table may have many more fields than the target table, but the user may only need a few new ones added to the target for the MERGE INTO statement.

Does this PR introduce any user-facing change?

No, MERGE INTO schema evolution is not yet released in Spark 4.1.

How was this patch tested?

Added many unit tests in MergeIntoTableSuiteBase.

Was this patch authored or co-authored using generative AI tooling?

No

@github-actions github-actions bot added the SQL label Nov 4, 2025
@szehon-ho szehon-ho force-pushed the merge_schema_evolution_limit_cols branch 3 times, most recently from 41731d2 to 6c6de51 Compare November 4, 2025 20:02
@szehon-ho
Member Author

@cloud-fan @aokolnychyi can you take a look? I think this is an important improvement to get in before we release the MERGE INTO WITH SCHEMA EVOLUTION feature in Spark 4.1, thanks!

@szehon-ho szehon-ho force-pushed the merge_schema_evolution_limit_cols branch from 6c6de51 to 24b1a51 Compare November 4, 2025 20:06
|USING source s
|ON t.pk = s.pk
|WHEN MATCHED THEN
| UPDATE SET dep='software'
Contributor

This test is weird: dep is an existing column in the target table, and we certainly do not need to do schema evolution. What was the behavior before this PR?

Member Author

Oh, it's because the source table has more columns, but they are not used.
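
Restated as a sketch (schemas are hypothetical): the source is wider than the target, but none of the extra columns is assigned, so the target schema should stay unchanged:

```scala
// target: (pk, salary, dep); source: (pk, salary, dep, extra1, extra2).
// The extra source columns are never assigned, so even with schema evolution
// enabled the target schema must not change.
spark.sql(
  """MERGE WITH SCHEMA EVOLUTION INTO target t
    |USING source s
    |ON t.pk = s.pk
    |WHEN MATCHED THEN
    |  UPDATE SET dep = 'software'
    |""".stripMargin)
```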

|ON t.pk = s.pk
|WHEN NOT MATCHED THEN
| INSERT (pk, info, dep) VALUES (s.pk,
| named_struct('salary', s.info.salary, 'status', 'active'), 'marketing')
Contributor

Why do we trigger schema evolution for this case?

Member Author

Discussed offline; refined the logic to be more selective (only direct assignment from a source column).
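
To illustrate the refined rule (hypothetical names; `new_col` does not exist in the target): a plain column-to-column assignment evolves the schema, while a transformed value does not:

```scala
// Triggers evolution: `new_col` is missing from the target and is assigned
// directly from the same-named source column.
spark.sql(
  """MERGE WITH SCHEMA EVOLUTION INTO target t USING source s ON t.pk = s.pk
    |WHEN MATCHED THEN UPDATE SET new_col = s.new_col""".stripMargin)

// Does not trigger evolution: the value is a transformed expression, so
// `new_col` is never added and this statement fails analysis.
spark.sql(
  """MERGE WITH SCHEMA EVOLUTION INTO target t USING source s ON t.pk = s.pk
    |WHEN MATCHED THEN UPDATE SET new_col = upper(s.new_col)""".stripMargin)
```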

@szehon-ho szehon-ho force-pushed the merge_schema_evolution_limit_cols branch from 448bfdf to 8ecc4ad Compare November 6, 2025 22:59
@szehon-ho szehon-ho force-pushed the merge_schema_evolution_limit_cols branch from 8ecc4ad to abbeb1e Compare November 6, 2025 23:02
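// Source schema restricted to the columns/nested fields that schema evolution
// will add to the target, per the refined condition in this PR.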
private lazy val sourceSchemaForEvolution: StructType =
MergeIntoTable.sourceSchemaForSchemaEvolution(this)

lazy val needSchemaEvolution: Boolean = {
Contributor

@cloud-fan cloud-fan Nov 7, 2025

I think the rule ResolveMergeIntoSchemaEvolution should be triggered as long as MergeIntoTable#schemaEvolutionEnabled is true. This complicated logic should be moved into ResolveMergeIntoSchemaEvolution, and the rule should return the merge command unchanged if schema evolution is not needed.

To make ResolveMergeIntoSchemaEvolution more reliable with respect to rule ordering, we should wait for the merge assignment values to be resolved before entering the rule. At the beginning of the rule, resolve the merge assignment keys again so that rule order does not matter. We can stop early if the assignment values are not pure field references and there is no star.
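
A rough sketch of the suggested shape (only ResolveMergeIntoSchemaEvolution, MergeIntoTable, and schemaEvolutionEnabled come from the discussion; `needsEvolution`/`evolve` are hypothetical placeholders):

```scala
object ResolveMergeIntoSchemaEvolution extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
    // Always match when the user asked for schema evolution...
    case m: MergeIntoTable if m.schemaEvolutionEnabled =>
      // ...re-resolve the assignment keys here so rule order does not matter,
      // and return the command unchanged when no evolution is needed.
      if (needsEvolution(m)) evolve(m) else m
  }
}
```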

val assignmentValueExpr = extractFieldPath(assignment.value)
// Valid assignments are: col = s.col or col.nestedField = s.col.nestedField
assignmentKeyExpr.length == path.length && isPrefix(assignmentKeyExpr, path) &&
  isSuffix(path, assignmentValueExpr)
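
For context, a minimal sketch of what these path helpers might look like (the names isPrefix/isSuffix/extractFieldPath appear in the diff; the implementations below are assumptions, using the analyzer's case-sensitivity-aware resolver):

```scala
// Does `prefix` match the leading name parts of `path`?
private def isPrefix(prefix: Seq[String], path: Seq[String]): Boolean =
  prefix.length <= path.length &&
    prefix.zip(path).forall { case (a, b) => conf.resolver(a, b) }

// Does `suffix` match the trailing name parts of `path`?
private def isSuffix(path: Seq[String], suffix: Seq[String]): Boolean =
  isPrefix(suffix.reverse, path.reverse)
```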
Contributor

Is this only to skip the source table qualifier? It seems wrong to trigger schema evolution for col = wrong_table.col, which should fail analysis without schema evolution.

@szehon-ho szehon-ho force-pushed the merge_schema_evolution_limit_cols branch from 1a782ff to f23a985 Compare November 9, 2025 03:16
@szehon-ho szehon-ho force-pushed the merge_schema_evolution_limit_cols branch from fb77395 to 218b1c9 Compare November 10, 2025 06:07
@szehon-ho
Member Author

szehon-ho commented Nov 10, 2025

Discussed offline with @cloud-fan. The new condition unfortunately forces us to move ResolveMergeIntoSchemaEvolution to run after an initial pass of ResolveReferences. Because we are checking assignment values (from the source table), it is important that these are resolved, so we can be sure the assignment really comes from the source. The workflow is now a bit more complex:

Overall the logic is:

  1. ResolveReferences resolves all columns it can, but leaves an assignment unresolved when its key may be a column that does not yet exist in the target schema and will be added later by schema evolution.
  2. ResolveMergeIntoSchemaEvolution now runs after ResolveReferences. It must unresolve all expressions, because they were resolved against the old table; this triggers another run of ResolveReferences.
  3. The final run of ResolveReferences resolves all references based on the new target table.

Many changes:

  1. Change ResolveReferences to no longer eagerly throw an exception on the first run, but to throw on the second run, after schema evolution has been evaluated.
  2. Change ResolveReferences to expand UPDATE SET * and INSERT * to fill in missing assignments for columns that are in the source but not the target (to trigger the ResolveMergeIntoSchemaEvolution condition).
  3. ResolveMergeIntoSchemaEvolution: add a guard MergeIntoTable.canEvaluateSchemaEvolution so the rule does not trigger until ResolveReferences has run the first time (it checks that all assignments are either resolved or potentially resolvable by schema evolution); see the sketch after this list.
  4. ResolveMergeIntoSchemaEvolution: calculate sourceSchemaForEvolution, the subset of the source schema that will be added to the target, keeping only columns/nested fields that are the direct subject of an assignment whose key does not exist in the target but is assigned from a source column/field of the same name.
  5. ResolveMergeIntoSchemaEvolution: unresolve everything. Because the rule now runs after ResolveReferences, everything must be re-resolved once the target table changes. This triggers another run of ResolveReferences.
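
For item 3, a rough sketch of what the guard might look like (the exact predicate in the PR may differ; `resolvableBySchemaEvolution` is a hypothetical helper):

```scala
// Sketch: schema evolution may only be evaluated once ResolveReferences has
// had a first pass, i.e. every assignment is either fully resolved, or
// unresolved only because its key names a column/nested field that schema
// evolution could add to the target.
def canEvaluateSchemaEvolution(m: MergeIntoTable): Boolean = {
  val assignments = (m.matchedActions ++ m.notMatchedActions).flatMap {
    case u: UpdateAction => u.assignments
    case i: InsertAction => i.assignments
    case _               => Nil
  }
  assignments.forall(a => a.resolved || resolvableBySchemaEvolution(a, m))
}
```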

@szehon-ho szehon-ho force-pushed the merge_schema_evolution_limit_cols branch from 218b1c9 to 4050f94 Compare November 10, 2025 06:31
}
m.copy(targetTable = newTarget)

// Unresolve all references based on old target output
Contributor

This is tricky. I think we can just update their attribute ids? See QueryPlan#transformUpWithNewOutput.
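
A minimal sketch of the suggested approach (the relation-matching guard and helper below are assumptions):

```scala
m transformUpWithNewOutput {
  case r: DataSourceV2Relation if isEvolvedTarget(r) => // hypothetical guard
    val newRelation = relationWithEvolvedSchema(r)      // hypothetical helper
    // Return the new plan together with an old -> new attribute mapping;
    // Catalyst then rewrites references to the old ids throughout the plan.
    newRelation -> r.output.zip(newRelation.output.take(r.output.length))
}
```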

Member Author

ok done

Member Author

@szehon-ho szehon-ho Nov 11, 2025

Actually, originally I selectively targeted the DataSourceV2Relation under the targetTable field:

m.targetTable transform { 
  case r: DataSourceV2Relation => ... 
}

But now I need to run the rule on the top object (m), because the attributes to rewrite are in children of m.

I could not figure out how to get it to rewrite attributes if I match m itself, i.e.:

m transformWithNewOutput {
  case m: MergeIntoTable =>
    m.targetTable transform { case r: DataSourceV2Relation => ... }
}

because I think this method only populates attributeMap if the rule targets a child and not m itself? https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala#L349 unless I missed it. So instead I match the relation directly:

m transformWithNewOutput {
  case r @ DataSourceV2Relation(_: SupportsRowLevelOperationTable, ...) => ...
}

So now I make the assumption that the DataSourceV2Relation is the target table. Currently that is the case, because only the target table has a SupportsRowLevelOperationTable object, but just calling this out.

// This allows unresolved assignment keys a chance to be resolved in a second
// pass by new columns/nested fields added by schema evolution.
// If schema evolution has already had a chance to run, this will be the final pass.
val throws = !m.schemaEvolutionEnabled ||
Contributor

"Not throw" is not a big deal, CheckAnalysis will fail it correctly at the end. I think it's OK to swallow the error if the target table supports schema evolution.

Member Author

OK, but I have to keep it to preserve the behavior for the non-schema-evolution case.

- schemaEvolutionEnabled &&
-   MergeIntoTable.schemaChanges(targetTable.schema, sourceTable.schema).nonEmpty
+ canEvaluateSchemaEvolution &&
+   schemaChangesNonEmpty
Contributor

Do we really need this condition? If there is no schema change, the rule ResolveMergeIntoSchemaEvolution returns the original merge command. And if we do need a schema change, we calculate it twice with the current code.

Member Author

Moved. Actually, later rules still need this condition in order to run only when no schema evolution is needed, so I kept the flag for those, but simplified the ResolveMergeIntoSchemaEvolution condition.

@szehon-ho szehon-ho force-pushed the merge_schema_evolution_limit_cols branch from dada2b9 to ebe84ac Compare November 11, 2025 01:17