[SPARK-54157][SQL] Fix refresh of DSv2 tables in Dataset #52920
base: master
Conversation
    }
  }

  // refresh table versions before looking up cache
I believe we need a new stage because this must be done after analysis but before we normalize the plan for cache lookup. It would be great to use FinishAnalysis in the optimizer, but I feel like that is too late.
I also worry about just doing this refresh all the time. We could remember the analysis finish time and only refresh if it is older than 100ms or so, to avoid unnecessary refreshes.
Thoughts?
+1
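For illustration, a minimal sketch of the guard described above, assuming the analysis finish time is recorded on the query execution; the class name, method names, and the 100ms default are hypothetical and not code from this PR:

```scala
import java.util.concurrent.TimeUnit

// Hypothetical helper (not part of this PR): remember when analysis finished
// and only allow a table-version refresh once that timestamp is older than a
// small threshold, so analyze-then-execute-immediately flows skip the refresh.
class RefreshGuard(thresholdMs: Long = 100L) {

  @volatile private var analysisFinishNanos: Long = System.nanoTime()

  // Called when analysis completes.
  def markAnalysisFinished(): Unit = {
    analysisFinishNanos = System.nanoTime()
  }

  // Called right before normalizing the plan for the cache lookup.
  def shouldRefresh(): Boolean = {
    val elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - analysisFinishNanos)
    elapsedMs > thresholdMs
  }
}
```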
          errors += s"`${originCol.name}` type has changed from $oldType to $newType"
        }
      case None =>
        errors += s"${formatColumn(originCol)} is missing"
should it be s"${formatColumn(originCol)} has been deleted" to be consistent with the other error message?
  }

  // refresh table versions before looking up cache
  private val lazyTableVersionsPinned = LazyTry {
Suggested change:
-  private val lazyTableVersionsPinned = LazyTry {
+  private val lazyTableVersionsRefreshed = LazyTry {
| "tableName" -> "`testcat`.`ns1`.`ns2`.`tbl`", | ||
| "errors" -> | ||
| ("\n- `person` type has changed from STRUCT<name: STRING, age: INT> " + | ||
| "to STRUCT<name: STRING, age: INT, city: STRING>"))) |
the error message will be hard to read with super wide or deeply nested struct types. I think we should perform the check recursively and point to the exact nested fields in the error message.
I agree but it would be error-prone to iterate field by field. I would probably address this in a separate PR as this one is already pretty large and tricky.
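For illustration only, a rough sketch of what a recursive comparison could look like, reporting a dotted path to the exact nested field that changed; this is not code from this PR, and integration with the existing `formatColumn`/error plumbing is omitted:

```scala
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.sql.types.{DataType, StructType}

// Hypothetical sketch (not part of this PR): walk struct types recursively and
// report the dotted path of the exact nested field that changed or disappeared.
object TypeChangeChecker {
  def collectTypeChanges(
      path: String,
      oldType: DataType,
      newType: DataType,
      errors: ArrayBuffer[String]): Unit = (oldType, newType) match {
    case (o: StructType, n: StructType) =>
      val newFields = n.fields.map(f => f.name -> f).toMap
      o.fields.foreach { oldField =>
        val fieldPath = s"$path.${oldField.name}"
        newFields.get(oldField.name) match {
          case Some(newField) =>
            collectTypeChanges(fieldPath, oldField.dataType, newField.dataType, errors)
          case None =>
            errors += s"`$fieldPath` has been deleted"
        }
      }
    case (o, n) if o != n =>
      errors += s"`$path` type has changed from ${o.sql} to ${n.sql}"
    case _ => // types match, nothing to report
  }
}
```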
Force-pushed from 7b1fade to 0036037.
  // refresh table versions before cache lookup
  private val lazyTableVersionsRefreshed = LazyTry {
    if (QueryExecution.lastExecutionId != id || TableRefreshUtil.shouldRefresh(commandExecuted)) {
what does QueryExecution.lastExecutionId != id indicate here?
It triggers a refresh if there were any query executions between this Dataset's analysis and its execution. For instance, we must always refresh if there was an ALTER in between.
The logic here is to always refresh unless the Dataset is created and executed immediately, without any intermediate steps in between.
Does this make sense, @cloud-fan?
It kind of makes sense, but I don't fully agree. I think the chance is low that we need to refresh the tables after a new execution: it may be a scan query, or it may be altering other tables. This hurts perf a lot for a busy cluster serving many short queries at the same time.
I think a simple time-based refresh policy is good enough.
      val freshTable = cache.getOrElseUpdate((catalog, ident), {
        val tableName = V2TableUtil.toQualifiedName(catalog, ident)
        logDebug(s"Refreshing table metadata for $tableName")
        catalog.loadTable(ident)
If any table needs a refresh, we refresh all the tables in the plan. Is that intentional? Shall we record the tables that need to be refreshed?
Yep, this was intentional to be safe in case there are dependent operations that modify multiple tables at the same time. It is safer to always refresh everything (keep in mind this is only for versioned tables where refresh is cheap).
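For context, a stripped-down sketch of the pattern in the snippet above: one cache per refresh pass, keyed by (catalog, identifier), so each table is reloaded at most once even though every versioned table in the plan gets refreshed; the class name and shape are illustrative, not the PR's actual code:

```scala
import scala.collection.mutable

import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog}

// Hypothetical sketch (not part of this PR): reload each (catalog, ident) pair
// at most once per refresh pass, even if the table appears several times in
// the plan, while still refreshing every versioned table for safety.
class RefreshPass {
  private val cache = mutable.Map.empty[(TableCatalog, Identifier), Table]

  def freshTable(catalog: TableCatalog, ident: Identifier): Table =
    cache.getOrElseUpdate((catalog, ident), catalog.loadTable(ident))
}
```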
Force-pushed from 0036037 to e85e600.
What changes were proposed in this pull request?
This PR fixes refresh of DSv2 tables in Dataset.
Why are the changes needed?
Prior to this change, Spark would pin the version of DSv2 tables at load/resolution time. Any changes made within the session would not be propagated to an analyzed but not yet executed Dataset, breaking the behavior compared to DSv1 tables.
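For illustration, a minimal sequence showing the kind of behavior being fixed (the catalog and table names here are made up for the example):

```scala
// The DSv2 table is resolved and its version pinned when the Dataset is analyzed...
val df = spark.table("testcat.ns.tbl")

// ...the table is then modified within the same session...
spark.sql("INSERT INTO testcat.ns.tbl VALUES (1, 'a')")

// ...and, as with DSv1 tables, the already-analyzed Dataset is expected to see
// the current state of the table when it is finally executed.
df.collect()
```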
Does this PR introduce any user-facing change?
Yes, but this PR makes DSv2 Table behavior match the expected Spark semantics.
How was this patch tested?
This PR comes with tests.
Was this patch authored or co-authored using generative AI tooling?
No.