
Commit 0036037

[SPARK-54157][SQL] Fix refresh of DSv2 tables in Dataset

1 parent 8c76795  commit 0036037

13 files changed: +1082 −14 lines

common/utils/src/main/resources/error/error-conditions.json

Lines changed: 25 additions & 0 deletions

@@ -2165,6 +2165,31 @@
     ],
     "sqlState" : "42000"
   },
+  "INCOMPATIBLE_TABLE_CHANGE_AFTER_ANALYSIS" : {
+    "message" : [
+      "Detected incompatible changes to table <tableName> after DataFrame/Dataset has been resolved and analyzed, meaning the underlying plan is out of sync. Please, re-create DataFrame/Dataset before attempting to execute the query again."
+    ],
+    "subClass" : {
+      "COLUMNS_MISMATCH" : {
+        "message" : [
+          "Data columns have changed:",
+          "<errors>"
+        ]
+      },
+      "METADATA_COLUMNS_MISMATCH" : {
+        "message" : [
+          "Metadata columns have changed:",
+          "<errors>"
+        ]
+      },
+      "TABLE_ID_MISMATCH" : {
+        "message" : [
+          "Table ID has changed from <capturedTableId> to <detectedTableId>."
+        ]
+      }
+    },
+    "sqlState" : "51024"
+  },
   "INCOMPATIBLE_VIEW_SCHEMA_CHANGE" : {
     "message" : [
       "The SQL query of view <viewName> has an incompatible schema change and column <colName> cannot be resolved. Expected <expectedNum> columns named <colName> but got <actualCols>.",
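
For context, this error class surfaces when a table captured during analysis no longer matches the catalog at execution time. Below is a hypothetical repro sketch, not part of the commit: the catalog and table names are illustrative, and whether the error actually fires depends on the connector exposing table IDs.

// Hypothetical repro, assuming a DSv2 catalog "cat" whose tables implement id().
val df = spark.table("cat.db.t")            // analysis captures the table's id and columns
spark.sql("DROP TABLE cat.db.t")
spark.sql("CREATE TABLE cat.db.t (x INT)")  // same name, but a new metastore entity
df.collect()                                // may now fail with
// INCOMPATIBLE_TABLE_CHANGE_AFTER_ANALYSIS.TABLE_ID_MISMATCH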

sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/Table.java

Lines changed: 9 additions & 0 deletions

@@ -50,6 +50,15 @@ public interface Table {
    */
   String name();
 
+  /**
+   * An ID of the table that can be used to reliably check whether two table objects refer to the
+   * same metastore entity. If a table is dropped and recreated with the same name, the new table
+   * ID must be different. This method must return null if the connector does not support table IDs.
+   */
+  default String id() {
+    return null;
+  }
+
   /**
    * Returns the schema of this table. If the table is not readable and doesn't have a schema, an
    * empty schema can be returned here.
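
To illustrate the new contract, here is a minimal sketch of a connector Table that exposes id(). The class and field names are illustrative and not part of this commit; only the minimum abstract methods are implemented.

import java.util.UUID

import org.apache.spark.sql.connector.catalog.{Table, TableCapability}
import org.apache.spark.sql.types.StructType

class MyTable(tableName: String, tableSchema: StructType) extends Table {
  // One UUID per metastore entity: dropping and recreating the table produces
  // a new MyTable instance and therefore a different id.
  private val tableId: String = UUID.randomUUID().toString

  override def name(): String = tableName
  override def id(): String = tableId
  override def schema(): StructType = tableSchema
  override def capabilities(): java.util.Set[TableCapability] =
    java.util.Collections.emptySet[TableCapability]()
}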

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RelationResolution.scala

Lines changed: 4 additions & 4 deletions

@@ -203,10 +203,10 @@ class RelationResolution(override val catalogManager: CatalogManager)
         )
       )
     } else {
-      SubqueryAlias(
-        catalog.name +: ident.asMultipartIdentifier,
-        DataSourceV2Relation.create(table, Some(catalog), Some(ident), options, timeTravelSpec)
-      )
+      val relation = DataSourceV2Relation.create(
+        table, Some(catalog), Some(ident), options, timeTravelSpec)
+      relation.setLoadTimeNanos(System.nanoTime())
+      SubqueryAlias(catalog.name +: ident.asMultipartIdentifier, relation)
     }
   }
 }
(New file defining object V2TableUtil in package org.apache.spark.sql.connector.catalog; the full file path is not shown in this view.)

Lines changed: 151 additions & 0 deletions

@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.catalog
+
+import java.util.Locale
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.catalyst.SQLConfHelper
+import org.apache.spark.sql.catalyst.expressions.AttributeReference
+import org.apache.spark.sql.catalyst.util.quoteIfNeeded
+import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.IdentifierHelper
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+import org.apache.spark.sql.types.DataType
+import org.apache.spark.util.ArrayImplicits._
+
+private[sql] object V2TableUtil extends SQLConfHelper {
+
+  def toQualifiedName(catalog: CatalogPlugin, ident: Identifier): String = {
+    s"${quoteIfNeeded(catalog.name)}.${ident.quoted}"
+  }
+
+  /**
+   * Validates that captured data columns match the current table schema.
+   *
+   * @param table the current table
+   * @param relation the relation with captured columns
+   * @return validation errors, or empty sequence if valid
+   */
+  def validateCapturedColumns(table: Table, relation: DataSourceV2Relation): Seq[String] = {
+    validateCapturedColumns(table, relation.table.columns.toImmutableArraySeq)
+  }
+
+  /**
+   * Validates that captured data columns match the current table schema.
+   *
+   * Checks for:
+   * - Column type or nullability changes
+   * - Removed columns (missing from the current table schema)
+   * - Added columns (new in the current table schema)
+   *
+   * @param table the current table
+   * @param originCols the originally captured columns
+   * @return validation errors, or empty sequence if valid
+   */
+  def validateCapturedColumns(table: Table, originCols: Seq[Column]): Seq[String] = {
+    val errors = mutable.ArrayBuffer[String]()
+
+    val colsByNormalizedName = indexByNormalizedName(table.columns.toImmutableArraySeq)
+    val originColsByNormalizedName = indexByNormalizedName(originCols)
+
+    originColsByNormalizedName.foreach { case (normalizedName, originCol) =>
+      colsByNormalizedName.get(normalizedName) match {
+        case Some(col) =>
+          if (originCol.dataType != col.dataType || originCol.nullable != col.nullable) {
+            val oldType = formatType(originCol.dataType, originCol.nullable)
+            val newType = formatType(col.dataType, col.nullable)
+            errors += s"`${originCol.name}` type has changed from $oldType to $newType"
+          }
+        case None =>
+          errors += s"${formatColumn(originCol)} has been removed"
+      }
+    }
+
+    colsByNormalizedName.foreach { case (normalizedName, col) =>
+      if (!originColsByNormalizedName.contains(normalizedName)) {
+        errors += s"${formatColumn(col)} has been added"
+      }
+    }
+
+    errors.toSeq
+  }
+
+  /**
+   * Validates that captured metadata columns match the current table schema.
+   *
+   * Checks for:
+   * - Metadata column type or nullability changes
+   * - Removed metadata columns (missing from current table)
+   *
+   * @param table the current table
+   * @param metaAttrs the originally captured metadata column attributes
+   * @return validation errors, or empty sequence if valid
+   */
+  def validateCapturedMetadataColumns(
+      table: Table,
+      metaAttrs: Seq[AttributeReference]): Seq[String] = {
+    val errors = mutable.ArrayBuffer[String]()
+    val metaColsByNormalizedName = metadataColumnsByNormalizedName(table)
+
+    metaAttrs.foreach { metaAttr =>
+      val normalizedName = normalize(metaAttr.name)
+      metaColsByNormalizedName.get(normalizedName) match {
+        case Some(metaCol) =>
+          if (metaAttr.dataType != metaCol.dataType || metaAttr.nullable != metaCol.isNullable) {
+            val oldType = formatType(metaAttr.dataType, metaAttr.nullable)
+            val newType = formatType(metaCol.dataType, metaCol.isNullable)
+            errors += s"`${metaAttr.name}` type has changed from $oldType to $newType"
+          }
+        case None =>
+          errors += s"${formatAttr(metaAttr)} has been removed"
+      }
+    }
+
+    errors.toSeq
+  }
+
+  private def metadataColumnsByNormalizedName(table: Table): Map[String, MetadataColumn] = {
+    table match {
+      case hasMeta: SupportsMetadataColumns =>
+        hasMeta.metadataColumns.map(col => normalize(col.name) -> col).toMap
+      case _ =>
+        Map.empty
+    }
+  }
+
+  private def formatColumn(col: Column): String = {
+    s"`${col.name}` ${formatType(col.dataType, col.nullable)}"
+  }
+
+  private def formatAttr(attr: AttributeReference): String = {
+    s"`${attr.name}` ${formatType(attr.dataType, attr.nullable)}"
+  }
+
+  private def formatType(dataType: DataType, nullable: Boolean): String = {
+    if (nullable) dataType.sql else s"${dataType.sql} NOT NULL"
+  }
+
+  private def indexByNormalizedName(cols: Seq[Column]): Map[String, Column] = {
+    cols.map(col => normalize(col.name) -> col).toMap
+  }
+
+  private def normalize(name: String): String = {
+    if (conf.caseSensitiveAnalysis) name else name.toLowerCase(Locale.ROOT)
+  }
+}
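
A hypothetical usage sketch of the new helper from inside Spark's sql package (the object is private[sql]); `catalog`, `ident`, and `relation` are assumed to be in scope, and the actual refresh rule that drives this check is not part of this excerpt.

// Re-load the table and compare it against the columns captured at analysis time.
val freshTable = catalog.loadTable(ident)
val errors = V2TableUtil.validateCapturedColumns(freshTable, relation)
if (errors.nonEmpty) {
  throw QueryCompilationErrors.columnsChangedAfterAnalysis(
    V2TableUtil.toQualifiedName(catalog, ident), errors)
}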

sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala

Lines changed: 32 additions & 0 deletions

@@ -2113,6 +2113,38 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat
     }
   }
 
+  def tableIdChangedAfterAnalysis(
+      tableName: String,
+      capturedTableId: String,
+      detectedTableId: String): Throwable = {
+    new AnalysisException(
+      errorClass = "INCOMPATIBLE_TABLE_CHANGE_AFTER_ANALYSIS.TABLE_ID_MISMATCH",
+      messageParameters = Map(
+        "tableName" -> toSQLId(tableName),
+        "capturedTableId" -> capturedTableId,
+        "detectedTableId" -> detectedTableId))
+  }
+
+  def columnsChangedAfterAnalysis(
+      tableName: String,
+      errors: Seq[String]): Throwable = {
+    new AnalysisException(
+      errorClass = "INCOMPATIBLE_TABLE_CHANGE_AFTER_ANALYSIS.COLUMNS_MISMATCH",
+      messageParameters = Map(
+        "tableName" -> toSQLId(tableName),
+        "errors" -> errors.mkString("\n- ", "\n- ", "")))
+  }
+
+  def metadataColumnsChangedAfterAnalysis(
+      tableName: String,
+      errors: Seq[String]): Throwable = {
+    new AnalysisException(
+      errorClass = "INCOMPATIBLE_TABLE_CHANGE_AFTER_ANALYSIS.METADATA_COLUMNS_MISMATCH",
+      messageParameters = Map(
+        "tableName" -> toSQLId(tableName),
+        "errors" -> errors.mkString("\n- ", "\n- ", "")))
+  }
+
   def numberOfPartitionsNotAllowedWithUnspecifiedDistributionError(): Throwable = {
     new AnalysisException(
       errorClass = "INVALID_WRITE_DISTRIBUTION.PARTITION_NUM_WITH_UNSPECIFIED_DISTRIBUTION",

sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala

Lines changed: 34 additions & 6 deletions

@@ -22,9 +22,11 @@ import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, NamedRelat
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference, Expression, SortOrder}
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, ExposesMetadataColumns, Histogram, HistogramBin, LeafNode, LogicalPlan, Statistics}
+import org.apache.spark.sql.catalyst.trees.TreeNodeTag
 import org.apache.spark.sql.catalyst.types.DataTypeUtils.toAttributes
-import org.apache.spark.sql.catalyst.util.{quoteIfNeeded, truncatedString, CharVarcharUtils}
-import org.apache.spark.sql.connector.catalog.{CatalogPlugin, FunctionCatalog, Identifier, SupportsMetadataColumns, Table, TableCapability}
+import org.apache.spark.sql.catalyst.util.{truncatedString, CharVarcharUtils}
+import org.apache.spark.sql.connector.catalog.{CatalogPlugin, FunctionCatalog, Identifier, SupportsMetadataColumns, Table, TableCapability, TableCatalog, V2TableUtil}
+import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.CatalogHelper
 import org.apache.spark.sql.connector.read.{Scan, Statistics => V2Statistics, SupportsReportStatistics}
 import org.apache.spark.sql.connector.read.streaming.{Offset, SparkDataStream}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap

@@ -57,9 +59,8 @@ abstract class DataSourceV2RelationBase(
   }
 
   override def name: String = {
-    import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
     (catalog, identifier) match {
-      case (Some(cat), Some(ident)) => s"${quoteIfNeeded(cat.name())}.${ident.quoted}"
+      case (Some(cat), Some(ident)) => V2TableUtil.toQualifiedName(cat, ident)
       case _ => table.name()
     }
   }

@@ -133,6 +134,27 @@ case class DataSourceV2Relation(
 
   def autoSchemaEvolution(): Boolean =
     table.capabilities().contains(TableCapability.AUTOMATIC_SCHEMA_EVOLUTION)
+
+  /**
+   * Sets the load time (in nanoseconds) for the table in this relation.
+   * This is used to track when the table metadata was loaded from the catalog,
+   * allowing refresh logic to determine if the table information is stale.
+   *
+   * @param nanos the load time in nanoseconds (typically from System.nanoTime())
+   */
+  def setLoadTimeNanos(nanos: Long): Unit = {
+    setTagValue(DataSourceV2Relation.TABLE_LOAD_TIME_TAG, nanos)
+  }
+
+  /**
+   * Returns the load time (in nanoseconds) for the table in this relation, if available.
+   * Returns None if the load time has not been set.
+   *
+   * @return load time if available, None otherwise
+   */
+  def loadTimeNanos: Option[Long] = {
+    getTagValue(DataSourceV2Relation.TABLE_LOAD_TIME_TAG)
+  }
 }
 
 /**

@@ -259,17 +281,23 @@ object ExtractV2Table {
 }
 
 object ExtractV2CatalogAndIdentifier {
-  def unapply(relation: DataSourceV2Relation): Option[(CatalogPlugin, Identifier)] = {
+  def unapply(relation: DataSourceV2Relation): Option[(TableCatalog, Identifier)] = {
     relation match {
      case DataSourceV2Relation(_, _, Some(catalog), Some(identifier), _, _) =>
-        Some((catalog, identifier))
+        Some((catalog.asTableCatalog, identifier))
      case _ =>
        None
    }
  }
 }
 
 object DataSourceV2Relation {
+  /**
+   * Tag for tracking when the table metadata was loaded from the catalog.
+   * Used by version refresh logic to determine if table information is stale.
+   */
+  private[sql] val TABLE_LOAD_TIME_TAG = TreeNodeTag[Long]("table_load_time")
+
   def create(
       table: Table,
       catalog: Option[CatalogPlugin],
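
The refresh rule that consumes this tag is not shown in this excerpt. A hypothetical staleness check, combining loadTimeNanos with the spark.sql.analyzer.tableMetadataMaxAge config added below, might look like the following; `relation` is assumed to be an analyzed DataSourceV2Relation whose load time was stamped during resolution.

import java.util.concurrent.TimeUnit

import org.apache.spark.sql.internal.SQLConf

// Treat a missing load time as "not stale"; otherwise compare the age of the
// captured metadata against the configured maximum age.
val maxAgeNanos = TimeUnit.MILLISECONDS.toNanos(SQLConf.get.tableMetadataMaxAge)
val isStale = relation.loadTimeNanos.exists { loadedAt =>
  System.nanoTime() - loadedAt > maxAgeNanos
}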

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 13 additions & 0 deletions

@@ -2060,6 +2060,17 @@ object SQLConf {
     .booleanConf
     .createWithDefault(false)
 
+  val TABLE_METADATA_MAX_AGE =
+    buildConf("spark.sql.analyzer.tableMetadataMaxAge")
+      .doc("Maximum age in milliseconds for analyzed table metadata. Table metadata is " +
+        "considered valid for this duration after being loaded during analysis. If execution " +
+        "is delayed beyond this threshold, metadata will be refreshed to ensure consistency " +
+        "and detect schema changes. Note this config is only supported for versioned tables " +
+        "that expose their versions to Spark and is only checked during the first execution.")
+      .version("4.1.0")
+      .timeConf(TimeUnit.MILLISECONDS)
+      .createWithDefault(100L)
+
   val BUCKETING_MAX_BUCKETS = buildConf("spark.sql.sources.bucketing.maxBuckets")
     .doc("The maximum number of buckets allowed.")
     .version("2.4.0")

@@ -7137,6 +7148,8 @@ class SQLConf extends Serializable with Logging with SqlApiConf {
 
   def bucketingMaxBuckets: Int = getConf(SQLConf.BUCKETING_MAX_BUCKETS)
 
+  def tableMetadataMaxAge: Long = getConf(SQLConf.TABLE_METADATA_MAX_AGE)
+
   def autoBucketedScanEnabled: Boolean = getConf(SQLConf.AUTO_BUCKETED_SCAN_ENABLED)
 
   def v2BucketingEnabled: Boolean = getConf(SQLConf.V2_BUCKETING_ENABLED)
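
As a usage note (not part of the commit), the new threshold should be tunable per session like other non-static SQL configs; the value below is illustrative.

// Allow analyzed DSv2 table metadata to be reused for up to 5 seconds before
// the first execution triggers a refresh check.
spark.conf.set("spark.sql.analyzer.tableMetadataMaxAge", "5s")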
