Skip to content

Commit f57a473

Browse files
szehon-ho authored and cloud-fan committed
[SPARK-53546][TESTS][FOLLOW-UP] Fix nested array schema evolution and style for InMemoryBaseTable
### What changes were proposed in this pull request?

Fix nested array struct schema evolution case for InMemoryDataSource, also style fixes.

### Why are the changes needed?

Address style review comment (#52299 (comment)) and a small bug fix for #52299 (test-only)

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Ran affected unit test

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #52359 from szehon-ho/in_memory_schema_evolution_fix.

Authored-by: Szehon Ho <szehon.apache@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
1 parent 33e40b7 commit f57a473

File tree

2 files changed

+25
-22
lines changed

2 files changed

+25
-22
lines changed

sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala

Lines changed: 20 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -351,8 +351,9 @@ abstract class InMemoryBaseTable(
351351
this
352352
}
353353

354-
def alterTableWithData(data: Array[BufferedRows],
355-
newSchema: StructType): InMemoryBaseTable = {
354+
def alterTableWithData(
355+
data: Array[BufferedRows],
356+
newSchema: StructType): InMemoryBaseTable = {
356357
data.foreach { bufferedRow =>
357358
val oldSchema = bufferedRow.schema
358359
bufferedRow.rows.foreach { row =>
@@ -757,8 +758,7 @@ object InMemoryBaseTable {
757758
* @param key partition key
758759
* @param schema schema used to write the rows
759760
*/
760-
class BufferedRows(val key: Seq[Any],
761-
val schema: StructType)
761+
class BufferedRows(val key: Seq[Any], val schema: StructType)
762762
extends WriterCommitMessage
763763
with InputPartition with HasPartitionKey with HasPartitionStatistics with Serializable {
764764
val log = new mutable.ArrayBuffer[InternalRow]()
@@ -892,8 +892,8 @@ private class BufferedRowsReader(
892892
if (arrayData == null) {
893893
null
894894
} else {
895-
extractArrayValue(arrayData, elementType,
896-
readSchema.fields(readIndex).dataType)
895+
val writeType = writeSchema.fields(writeIndex).dataType.asInstanceOf[ArrayType]
896+
extractArrayValue(arrayData, elementType, writeType.elementType)
897897
}
898898

899899
case dt =>
@@ -906,19 +906,21 @@ private class BufferedRowsReader(
906906
}
907907
}
908908

909-
private def extractArrayValue(arrayData: ArrayData,
910-
readType: DataType,
911-
writeType: DataType): ArrayData = {
909+
private def extractArrayValue(
910+
arrayData: ArrayData,
911+
readType: DataType,
912+
writeType: DataType): ArrayData = {
912913
val elements = arrayData.toArray[Any](readType)
913914
val convertedElements = extractCollection(elements, readType, writeType)
914915
new GenericArrayData(convertedElements)
915916
}
916917

917-
private def extractMapValue(mapData: MapData,
918-
readKeyType: DataType,
919-
readValueType: DataType,
920-
writeKeyType: DataType,
921-
writeValueType: DataType): MapData = {
918+
private def extractMapValue(
919+
mapData: MapData,
920+
readKeyType: DataType,
921+
readValueType: DataType,
922+
writeKeyType: DataType,
923+
writeValueType: DataType): MapData = {
922924
val keys = mapData.keyArray().toArray[Any](readKeyType)
923925
val values = mapData.valueArray().toArray[Any](readValueType)
924926

@@ -927,9 +929,10 @@ private class BufferedRowsReader(
927929
ArrayBasedMapData(convertedKeys, convertedValues)
928930
}
929931

930-
private def extractCollection(elements: Array[Any],
931-
readType: DataType,
932-
writeType: DataType) = {
932+
private def extractCollection(
933+
elements: Array[Any],
934+
readType: DataType,
935+
writeType: DataType) = {
933936
(readType, writeType) match {
934937
case (readSt: StructType, writeSt: StructType) =>
935938
elements.map { elem =>

sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2511,7 +2511,7 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase
25112511
}
25122512
}
25132513

2514-
test("merge into schema evolution add column with nested field and set all columns") {
2514+
test("merge into schema evolution add column with nested struct and set all columns") {
25152515
Seq(true, false).foreach { withSchemaEvolution =>
25162516
withTempView("source") {
25172517
createAndInitTable(
@@ -2570,7 +2570,7 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase
25702570
}
25712571
}
25722572

2573-
test("merge into schema evolution replace column with nested field and set explicit columns") {
2573+
test("merge into schema evolution replace column with nested struct and set explicit columns") {
25742574
Seq(true, false).foreach { withSchemaEvolution =>
25752575
withTempView("source") {
25762576
createAndInitTable(
@@ -2631,7 +2631,8 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase
26312631
}
26322632
}
26332633

2634-
// TODO- support schema evolution for missing nested types using UPDATE SET * and INSERT *
2634+
// currently the source struct needs to be fully compatible with target struct
2635+
// i.e. cannot remove a nested field
26352636
test("merge into schema evolution replace column with nested field and set all columns") {
26362637
Seq(true, false).foreach { withSchemaEvolution =>
26372638
withTempView("source") {
@@ -2723,8 +2724,7 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase
27232724
sql(mergeStmt)
27242725
checkAnswer(
27252726
sql(s"SELECT * FROM $tableNameAsString"),
2726-
// TODO- InMemoryBaseTable does not return null for nested schema evolution.
2727-
Seq(Row(0, Array(Row(1, "a", true), Row(2, "b", true)), "sales"),
2727+
Seq(Row(0, Array(Row(1, "a", null), Row(2, "b", null)), "sales"),
27282728
Row(1, Array(Row(10, "c", true), Row(20, "d", false)), "hr"),
27292729
Row(2, Array(Row(30, "d", false), Row(40, "e", true)), "engineering")))
27302730
} else {

0 commit comments

Comments (0)