Skip to content

Commit ccd330c

Browse files
committed
[WIP][SPARK-53924] Reload DSv2 tables in views created using plans on each view access
1 parent 8c76795 commit ccd330c

File tree

7 files changed

+282
-1
lines changed

7 files changed

+282
-1
lines changed

common/utils/src/main/resources/error/error-conditions.json

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5700,6 +5700,15 @@
57005700
],
57015701
"sqlState" : "42P01"
57025702
},
5703+
"TABLE_REFERENCE_TEMP_VIEW_COLUMN_MISMATCH" : {
5704+
"message" : [
5705+
"View <viewName> plan references table <tableName> whose <colType> columns changed since the view plan was initially captured.",
5706+
"Mismatched columns:",
5707+
"<mismatchDetails>",
5708+
"This indicates the table has evolved and the view based on the plan must be recreated."
5709+
],
5710+
"sqlState" : "KD000"
5711+
},
57035712
"TABLE_VALUED_ARGUMENTS_NOT_YET_IMPLEMENTED_FOR_SQL_FUNCTIONS" : {
57045713
"message" : [
57055714
"Cannot <action> SQL user-defined function <functionName> with TABLE arguments because this functionality is not yet implemented."

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1224,6 +1224,9 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
12241224
case u: UnresolvedRelation =>
12251225
resolveRelation(u).map(resolveViews(_, u.options)).getOrElse(u)
12261226

1227+
case r: TableReference =>
1228+
relationResolution.resolveReference(r)
1229+
12271230
case r @ RelationTimeTravel(u: UnresolvedRelation, timestamp, version)
12281231
if timestamp.forall(ts => ts.resolved && !SubqueryExpression.hasSubquery(ts)) =>
12291232
val timeTravelSpec = TimeTravelSpec.create(timestamp, version, conf.sessionLocalTimeZone)

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RelationResolution.scala

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -227,6 +227,43 @@ class RelationResolution(override val catalogManager: CatalogManager)
227227
}
228228
}
229229

230+
def resolveReference(ref: TableReference): LogicalPlan = {
231+
val relation = getOrLoadRelation(ref)
232+
val planId = ref.getTagValue(LogicalPlan.PLAN_ID_TAG)
233+
cloneWithPlanId(relation, planId)
234+
}
235+
236+
private def getOrLoadRelation(ref: TableReference): LogicalPlan = {
237+
val key = toCacheKey(ref.catalog, ref.identifier)
238+
relationCache.get(key) match {
239+
case Some(cached) =>
240+
alignCachedRelationState(cached, ref)
241+
case None =>
242+
val relation = loadRelation(ref)
243+
relationCache.update(key, relation)
244+
relation
245+
}
246+
}
247+
248+
private def loadRelation(ref: TableReference): LogicalPlan = {
249+
val table = ref.catalog.loadTable(ref.identifier)
250+
val tableName = ref.identifier.toQualifiedNameParts(ref.catalog)
251+
SubqueryAlias(tableName, ref.toRelation(table))
252+
}
253+
254+
private def alignCachedRelationState(cached: LogicalPlan, ref: TableReference): LogicalPlan = {
255+
cached transform {
256+
case r: DataSourceV2Relation if matchesReference(r, ref) =>
257+
r.copy(output = ref.output, options = ref.options)
258+
}
259+
}
260+
261+
private def matchesReference(
262+
relation: DataSourceV2Relation,
263+
ref: TableReference): Boolean = {
264+
relation.catalog.contains(ref.catalog) && relation.identifier.contains(ref.identifier)
265+
}
266+
230267
private def isResolvingView: Boolean = AnalysisContext.get.catalogAndNamespace.nonEmpty
231268

232269
private def isReferredTempViewName(nameParts: Seq[String]): Boolean = {
Lines changed: 190 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,190 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.catalyst.analysis
19+
20+
import scala.collection.mutable
21+
22+
import org.apache.spark.SparkException
23+
import org.apache.spark.sql.catalyst.SQLConfHelper
24+
import org.apache.spark.sql.catalyst.analysis.TableReference.Context
25+
import org.apache.spark.sql.catalyst.analysis.TableReference.TableInfo
26+
import org.apache.spark.sql.catalyst.analysis.TableReference.TemporaryViewContext
27+
import org.apache.spark.sql.catalyst.expressions.AttributeReference
28+
import org.apache.spark.sql.catalyst.plans.logical.LeafNode
29+
import org.apache.spark.sql.catalyst.plans.logical.Statistics
30+
import org.apache.spark.sql.catalyst.util.{truncatedString, MetadataColumnHelper}
31+
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.CatalogHelper
32+
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.IdentifierHelper
33+
import org.apache.spark.sql.connector.catalog.Column
34+
import org.apache.spark.sql.connector.catalog.Identifier
35+
import org.apache.spark.sql.connector.catalog.MetadataColumn
36+
import org.apache.spark.sql.connector.catalog.SupportsMetadataColumns
37+
import org.apache.spark.sql.connector.catalog.Table
38+
import org.apache.spark.sql.connector.catalog.TableCatalog
39+
import org.apache.spark.sql.errors.QueryCompilationErrors
40+
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
41+
import org.apache.spark.sql.types.DataType
42+
import org.apache.spark.sql.util.CaseInsensitiveStringMap
43+
import org.apache.spark.util.ArrayImplicits._
44+
45+
case class TableReference private (
46+
catalog: TableCatalog,
47+
identifier: Identifier,
48+
options: CaseInsensitiveStringMap,
49+
info: TableInfo,
50+
output: Seq[AttributeReference],
51+
context: Context)
52+
extends LeafNode with MultiInstanceRelation with NamedRelation {
53+
54+
override def name: String = {
55+
s"${catalog.name()}.${identifier.quoted}"
56+
}
57+
58+
override def newInstance(): TableReference = {
59+
copy(output = output.map(_.newInstance()))
60+
}
61+
62+
override def computeStats(): Statistics = Statistics.DUMMY
63+
64+
override def simpleString(maxFields: Int): String = {
65+
val outputString = truncatedString(output, "[", ", ", "]", maxFields)
66+
s"TableReference$outputString $name"
67+
}
68+
69+
def toRelation(table: Table): DataSourceV2Relation = {
70+
TableReferenceUtils.validateLoadedTable(table, this)
71+
DataSourceV2Relation(table, output, Some(catalog), Some(identifier), options)
72+
}
73+
}
74+
75+
object TableReference {
76+
77+
case class TableInfo(columns: Seq[Column])
78+
79+
sealed trait Context
80+
case class TemporaryViewContext(viewName: Seq[String]) extends Context
81+
82+
def createForTempView(relation: DataSourceV2Relation, viewName: Seq[String]): TableReference = {
83+
create(relation, TemporaryViewContext(viewName))
84+
}
85+
86+
private def create(relation: DataSourceV2Relation, context: Context): TableReference = {
87+
val ref = TableReference(
88+
relation.catalog.get.asTableCatalog,
89+
relation.identifier.get,
90+
relation.options,
91+
TableInfo(relation.table.columns.toImmutableArraySeq),
92+
relation.output,
93+
context)
94+
ref.copyTagsFrom(relation)
95+
ref
96+
}
97+
}
98+
99+
object TableReferenceUtils extends SQLConfHelper {
100+
101+
def validateLoadedTable(table: Table, ref: TableReference): Unit = {
102+
ref.context match {
103+
case ctx: TemporaryViewContext =>
104+
validateLoadedTableInTempView(table, ref, ctx)
105+
case ctx =>
106+
throw SparkException.internalError(s"Unknown table ref context: ${ctx.getClass.getName}")
107+
}
108+
}
109+
110+
private def validateLoadedTableInTempView(
111+
table: Table,
112+
ref: TableReference,
113+
ctx: TemporaryViewContext): Unit = {
114+
val dataColMismatches = validateDataColumns(table, ref.info.columns)
115+
if (dataColMismatches.nonEmpty) {
116+
throw QueryCompilationErrors.tableReferenceTempViewColumnMismatchError(
117+
ctx.viewName,
118+
ref.identifier.toQualifiedNameParts(ref.catalog),
119+
dataColMismatches.mkString("\n"))
120+
}
121+
122+
val metaOutput = ref.output.filter(_.isMetadataCol)
123+
if (metaOutput.nonEmpty) {
124+
val metaColMismatches = validateMetadataColumns(table, metaOutput)
125+
if (metaColMismatches.nonEmpty) {
126+
throw QueryCompilationErrors.tableReferenceTempViewMetadataColumnMismatchError(
127+
ctx.viewName,
128+
ref.identifier.toQualifiedNameParts(ref.catalog),
129+
metaColMismatches.mkString("\n"))
130+
}
131+
}
132+
}
133+
134+
private def validateDataColumns(table: Table, refDataCols: Seq[Column]): Seq[String] = {
135+
val mismatches = mutable.ArrayBuffer[String]()
136+
137+
refDataCols.foreach { refCol =>
138+
table.columns.find(c => conf.resolver(c.name, refCol.name)) match {
139+
case Some(col) =>
140+
if (refCol.dataType != col.dataType || refCol.nullable != col.nullable) {
141+
val refType = formatType(refCol.dataType, refCol.nullable)
142+
val newType = formatType(col.dataType, col.nullable)
143+
mismatches += s"- ${refCol.name} has evolved from $refType to $newType"
144+
}
145+
case None =>
146+
mismatches += s"- ${refCol.name} is missing"
147+
}
148+
}
149+
150+
table.columns.foreach { col =>
151+
if (!refDataCols.exists(refCol => conf.resolver(col.name, refCol.name))) {
152+
mismatches += s"- ${col.name} has been added"
153+
}
154+
}
155+
156+
mismatches.toSeq
157+
}
158+
159+
private def validateMetadataColumns(
160+
table: Table,
161+
metaOutput: Seq[AttributeReference]): Seq[String] = {
162+
val mismatches = mutable.ArrayBuffer[String]()
163+
val metaCols = metadataColumns(table)
164+
165+
metaOutput.foreach { metaAttr =>
166+
metaCols.find(c => conf.resolver(c.name, metaAttr.name)) match {
167+
case Some(col) =>
168+
if (metaAttr.dataType != col.dataType || metaAttr.nullable != col.isNullable) {
169+
val refType = formatType(metaAttr.dataType, metaAttr.nullable)
170+
val newType = formatType(col.dataType, col.isNullable)
171+
mismatches += s"- ${metaAttr.name} has evolved from $refType to $newType"
172+
}
173+
case None =>
174+
mismatches += s"- ${metaAttr.name} is missing (metadata column)"
175+
}
176+
}
177+
178+
mismatches.toSeq
179+
}
180+
181+
private def metadataColumns(table: Table): Array[MetadataColumn] = table match {
182+
case hasMeta: SupportsMetadataColumns => hasMeta.metadataColumns
183+
case _ => Array.empty
184+
}
185+
186+
private def formatType(dataType: DataType, nullable: Boolean): String = {
187+
val nullableDDL = if (nullable) "" else " NOT NULL"
188+
s"${dataType.sql}$nullableDDL"
189+
}
190+
}

sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4365,4 +4365,30 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat
43654365
origin = origin
43664366
)
43674367
}
4368+
4369+
def tableReferenceTempViewColumnMismatchError(
4370+
viewName: Seq[String],
4371+
tableName: Seq[String],
4372+
mismatchDetails: String): Throwable = {
4373+
new AnalysisException(
4374+
errorClass = "TABLE_REFERENCE_TEMP_VIEW_COLUMN_MISMATCH",
4375+
messageParameters = Map(
4376+
"viewName" -> toSQLId(viewName),
4377+
"tableName" -> toSQLId(tableName),
4378+
"colType" -> "data",
4379+
"mismatchDetails" -> mismatchDetails))
4380+
}
4381+
4382+
def tableReferenceTempViewMetadataColumnMismatchError(
4383+
viewName: Seq[String],
4384+
tableName: Seq[String],
4385+
mismatchDetails: String): Throwable = {
4386+
new AnalysisException(
4387+
errorClass = "TABLE_REFERENCE_TEMP_VIEW_COLUMN_MISMATCH",
4388+
messageParameters = Map(
4389+
"viewName" -> toSQLId(viewName),
4390+
"tableName" -> toSQLId(tableName),
4391+
"colType" -> "metadata",
4392+
"mismatchDetails" -> mismatchDetails))
4393+
}
43684394
}

sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import org.apache.hadoop.fs.{FileSystem, Path}
2222
import org.apache.spark.internal.{Logging, MessageWithContext}
2323
import org.apache.spark.internal.LogKeys._
2424
import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
25+
import org.apache.spark.sql.catalyst.analysis.TableReference
2526
import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
2627
import org.apache.spark.sql.catalyst.expressions.{Attribute, SubqueryExpression}
2728
import org.apache.spark.sql.catalyst.optimizer.EliminateResolvedHint
@@ -250,6 +251,9 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper {
250251
val nameInCache = v2Ident.toQualifiedNameParts(catalog)
251252
isSameName(nameInCache) && (includeTimeTravel || timeTravelSpec.isEmpty)
252253

254+
case r: TableReference =>
255+
isSameName(r.identifier.toQualifiedNameParts(r.catalog))
256+
253257
case v: View =>
254258
isSameName(v.desc.identifier.nameParts)
255259

sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,13 +27,15 @@ import org.apache.spark.internal.Logging
2727
import org.apache.spark.sql.{Row, SparkSession}
2828
import org.apache.spark.sql.catalyst.{CapturesConfig, SQLConfHelper, TableIdentifier}
2929
import org.apache.spark.sql.catalyst.analysis.{AnalysisContext, GlobalTempView, LocalTempView, SchemaEvolution, SchemaUnsupported, ViewSchemaMode, ViewType}
30+
import org.apache.spark.sql.catalyst.analysis.TableReference
3031
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType, TemporaryViewRelation}
3132
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, SubqueryExpression, VariableReference}
3233
import org.apache.spark.sql.catalyst.plans.logical.{AnalysisOnlyCommand, CreateTempView, CTEInChildren, CTERelationDef, LogicalPlan, Project, View, WithCTE}
3334
import org.apache.spark.sql.catalyst.util.CharVarcharUtils
3435
import org.apache.spark.sql.classic.ClassicConversions.castToImpl
3536
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.NamespaceHelper
3637
import org.apache.spark.sql.errors.QueryCompilationErrors
38+
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
3739
import org.apache.spark.sql.internal.StaticSQLConf
3840
import org.apache.spark.sql.types.{MetadataBuilder, StructType}
3941
import org.apache.spark.sql.util.SchemaUtils
@@ -733,7 +735,17 @@ object ViewHelper extends SQLConfHelper with Logging with CapturesConfig {
733735
} else {
734736
TemporaryViewRelation(
735737
prepareTemporaryViewStoringAnalyzedPlan(name, aliasedPlan, defaultCollation),
736-
Some(aliasedPlan))
738+
Some(prepareTemporaryViewPlan(name, aliasedPlan)))
739+
}
740+
}
741+
742+
private def prepareTemporaryViewPlan(
743+
viewName: TableIdentifier,
744+
plan: LogicalPlan): LogicalPlan = {
745+
plan transform {
746+
case r: DataSourceV2Relation
747+
if r.catalog.isDefined && r.identifier.isDefined && r.timeTravelSpec.isEmpty =>
748+
TableReference.createForTempView(r, viewName.nameParts)
737749
}
738750
}
739751

0 commit comments

Comments
 (0)