[Cherry-pick] Do not split commits that contain DVs in CDF streaming #1904
- When the add and remove file entries of a commit that contains DVs (deletion vectors) are split across different batches, we don't recognise that they belong together, so we miss that the CDF has to be produced by calculating the diff of the two DVs.
- This PR prevents such commits from being split, just as we already do for AddCDCFile actions to avoid splitting the update_{pre/post}_image information.

GitOrigin-RevId: 11438c01ecc69f7c55c3a8826fa540f6b984e4c4
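The batching rule described above can be sketched roughly as follows. This is a minimal illustration, not the DeltaSource code: `Action`, `BatchPlanner`, `planBatches` and the byte budget `maxBytes` are hypothetical names introduced only for this example, and the real source applies its own rate limits.

```scala
// Hypothetical model for illustration only; these are not the Delta Lake types.
final case class Action(version: Long, sizeBytes: Long, hasDV: Boolean)

object BatchPlanner {
  /** Splits `actions` (assumed ordered by commit version) into batches of at most
    * `maxBytes`, never splitting a commit in which any action carries a DV.
    * A group larger than the budget still gets a batch of its own. */
  def planBatches(actions: Seq[Action], maxBytes: Long): Vector[Vector[Action]] = {
    // Unit of admission: the whole commit if it has DVs, otherwise single actions.
    val groups: Vector[Vector[Action]] =
      groupConsecutiveByVersion(actions).flatMap { commit =>
        if (commit.exists(_.hasDV)) Vector(commit) else commit.map(Vector(_))
      }

    val start = (Vector.empty[Vector[Action]], Vector.empty[Action], 0L)
    val (finished, last, _) = groups.foldLeft(start) {
      case ((acc, batch, used), group) =>
        val groupSize = group.map(_.sizeBytes).sum
        if (batch.isEmpty || used + groupSize <= maxBytes) {
          (acc, batch ++ group, used + groupSize) // admit the whole group
        } else {
          (acc :+ batch, group, groupSize)        // close the batch, start a new one
        }
    }
    if (last.isEmpty) finished else finished :+ last
  }

  // Groups adjacent actions that share the same commit version.
  private def groupConsecutiveByVersion(actions: Seq[Action]): Vector[Vector[Action]] =
    actions.foldLeft(Vector.empty[Vector[Action]]) { (groups, action) =>
      groups.lastOption match {
        case Some(g) if g.head.version == action.version => groups.init :+ (g :+ action)
        case _ => groups :+ Vector(action)
      }
    }
}
```

With a 130-byte budget and four 40-byte actions, where the last two belong to a commit with DVs, a per-action limiter would admit the third action and leave the fourth for the next batch. The sketch instead moves both DV actions into the second batch together.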
@@ -1011,12 +1011,12 @@ case class DeltaSource(
     var commitProcessedInBatch = false

     /**
-     * This overloaded method checks if all the AddCDCFiles for a commit can be accommodated by
+     * This overloaded method checks if all the FileActions for a commit can be accommodated by
nit: Remove "This overloaded method". Move the description to @return and describe what it does (returns).
-        actions.foldLeft(0L) { (l, r) => l + r.size }
+    def admit(fileActions: Seq[FileAction]): Boolean = {
+      def getSize(actions: Seq[FileAction]): Long = {
+        actions.foldLeft(0L) { (l, r) => l + r.getFileSize }
nit: Replace l with totalFileSize (or at least totalFileSizeSoFar 😉)
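For illustration, the suggested renaming might look like the sketch below. `SizedFile` and `SizeSum` are hypothetical stand-ins for the real FileAction types, assumed here only to keep the example self-contained.

```scala
// Hypothetical stand-in for a file action that exposes its size in bytes.
final case class SizedFile(path: String, sizeBytes: Long)

object SizeSum {
  // Same fold as in the diff, with a descriptive accumulator name instead of `l`.
  def totalSize(files: Seq[SizedFile]): Long =
    files.foldLeft(0L) { (totalFileSize, file) => totalFileSize + file.sizeBytes }

  // An equivalent formulation that avoids naming the accumulator at all.
  def totalSizeViaSum(files: Seq[SizedFile]): Long =
    files.iterator.map(_.sizeBytes).sum
}
```

Both variants return 0 for an empty sequence, so either could serve as the size check's input under these assumptions.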
    hasAddsOrRemoves(indexedFile) || hasNoFileActionAndStartIndex(indexedFile)
  }
  .filter(isValidIndexedFile(_, fromVersion, fromIndex, endOffset))
val filteredFileActions = filteredFiles.flatMap(f => Option(f.getFileAction))
This flatMap(... => Option(...)) construct makes it hard(er) to get the gist of the change. Why not simply filter(_.getFileAction != null)? It looks clear(er) and allocates fewer objects on the JVM.
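To make the comparison concrete, here is a small sketch of the two styles. `Indexed` and `NullFiltering` are hypothetical names, with `action` standing in for a Java-style field that may be null. One point the sketch brings out: the two forms are not drop-in replacements for each other, because flatMap yields the unwrapped actions while filter keeps the wrapping elements, so a map step is needed to get the same result type.

```scala
// Hypothetical wrapper whose `action` field may be null, as with Java-style getters.
final case class Indexed(index: Long, action: String)

object NullFiltering {
  // Style used in the diff: wrap in Option, which drops nulls and unwraps in one step.
  def viaFlatMap(files: Seq[Indexed]): Seq[String] =
    files.flatMap(f => Option(f.action))

  // Style suggested in the review: keeps the wrapper elements, not the actions.
  def viaFilter(files: Seq[Indexed]): Seq[Indexed] =
    files.filter(_.action != null)

  // Filter followed by map produces the same values as viaFlatMap.
  def viaFilterThenMap(files: Seq[Indexed]): Seq[String] =
    files.filter(_.action != null).map(_.action)
}
```

Which form reads better is a matter of taste; the sketch only shows that the result types differ unless the map step is added.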
test("double delete-only on the same file") {
  withTempDir { tableDir =>
    val tablePath = tableDir.toString
    spark.range(start = 0L, end = 10L, step = 1L, numPartitions = 1).toDF("id")
Why toDF("id")?
Just making the column name explicit, rather than relying on the reader knowing what spark.range happens to name its column.
@jaceklaskowski I appreciate the comments, but this is a cherry-pick from master. I don't think anything less than a correctness issue justifies diverging the branches, since that would just make future backports harder.
Cherry-pick of 9f70ee5 for branch-2.4.
GitOrigin-RevId: 11438c01ecc69f7c55c3a8826fa540f6b984e4c4