Skip to content

Commit

Permalink
Do not split commits that contain DVs in CDF streaming
Browse files Browse the repository at this point in the history
Cherry-pick of 9f70ee5 for branch-2.4.

- When the add and remove file entries for a commit that contains DVs gets split into different batches, we don't recognise that they belong together and we need to produce CDF by calculating the diff of the two DVS.
- This PR prevents splitting of these commits, just like we do for AddCDCFile actions to prevent splitting the update_{pre/post}_image information.

GitOrigin-RevId: 11438c01ecc69f7c55c3a8826fa540f6b984e4c4
  • Loading branch information
larsk-db committed Aug 22, 2023
1 parent de19540 commit 9ef19f1
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1033,12 +1033,12 @@ case class DeltaSource(
var commitProcessedInBatch = false

/**
* This overloaded method checks if all the AddCDCFiles for a commit can be accommodated by
* This overloaded method checks if all the FileActions for a commit can be accommodated by
* the rate limit.
*/
def admit(fileActions: Seq[AddCDCFile]): Boolean = {
def getSize(actions: Seq[AddCDCFile]): Long = {
actions.foldLeft(0L) { (l, r) => l + r.size }
def admit(fileActions: Seq[FileAction]): Boolean = {
def getSize(actions: Seq[FileAction]): Long = {
actions.foldLeft(0L) { (l, r) => l + r.getFileSize }
}
if (fileActions.isEmpty) {
true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,13 +148,31 @@ trait DeltaSourceCDCSupport { self: DeltaSource =>
// We allow entries with no file actions and index as [[DeltaSourceOffset.BASE_INDEX]]
// that are used primarily to update latest offset when no other
// file action based entries are present.
fileActions.filter(indexedFile => hasAddsOrRemoves(indexedFile) ||
hasNoFileActionAndStartIndex(indexedFile))
.filter(
isValidIndexedFile(_, fromVersion, fromIndex, endOffset)
).takeWhile { indexedFile =>
admissionControl.admit(Option(indexedFile.getFileAction))
}.toIterator
val filteredFiles = fileActions
.filter { indexedFile =>
hasAddsOrRemoves(indexedFile) || hasNoFileActionAndStartIndex(indexedFile)
}
.filter(isValidIndexedFile(_, fromVersion, fromIndex, endOffset))
val filteredFileActions = filteredFiles.flatMap(f => Option(f.getFileAction))
val hasDeletionVectors = filteredFileActions.exists {
case add: AddFile => add.deletionVector != null
case remove: RemoveFile => remove.deletionVector != null
case _ => false
}
if (hasDeletionVectors) {
// We cannot split up add/remove pairs with Deletion Vectors, because we will get the
// wrong result.
// So in this case we behave as above with CDC files and either admit all or none.
if (admissionControl.admit(filteredFileActions)) {
filteredFiles.toIterator
} else {
Iterator()
}
} else {
filteredFiles.takeWhile { indexedFile =>
admissionControl.admit(Option(indexedFile.getFileAction))
}.toIterator
}
}
}
}
Expand Down Expand Up @@ -279,7 +297,6 @@ trait DeltaSourceCDCSupport { self: DeltaSource =>
} else {
filterAndIndexDeltaLogs(fromVersion)
}

// In this case, filterFiles will consume the available capacity. We use takeWhile
// to stop the iteration when we reach the limit which will save us from reading
// unnecessary log files.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1006,6 +1006,40 @@ trait DeltaCDCStreamSuiteBase extends StreamTest with DeltaSQLCommandTest
}
}
}

// LC-1281: Ensure that when we would split batches into one file at a time, we still produce
// correct CDF even in cases where the CDF may need to compare multiple file actions from the
// same commit to be correct, such as with persistent deletion vectors.
test("double delete-only on the same file") {
withTempDir { tableDir =>
val tablePath = tableDir.toString
spark.range(start = 0L, end = 10L, step = 1L, numPartitions = 1).toDF("id")
.write.format("delta").save(tablePath)

spark.sql(s"DELETE FROM delta.`$tablePath` WHERE id IN (1, 3, 6)")
spark.sql(s"DELETE FROM delta.`$tablePath` WHERE id IN (2, 4, 7)")

val stream = spark.readStream
.format("delta")
.option(DeltaOptions.CDC_READ_OPTION, true)
.option(DeltaOptions.MAX_FILES_PER_TRIGGER_OPTION, 1)
.option(DeltaOptions.STARTING_VERSION_OPTION, 1)
.load(tablePath)
.drop(CDCReader.CDC_COMMIT_TIMESTAMP)

testStream(stream)(
ProcessAllAvailable(),
CheckAnswer(
(1L, "delete", 1L),
(3L, "delete", 1L),
(6L, "delete", 1L),
(2L, "delete", 2L),
(4L, "delete", 2L),
(7L, "delete", 2L)
)
)
}
}
}

class DeltaCDCStreamDeletionVectorSuite extends DeltaCDCStreamSuite
Expand Down

0 comments on commit 9ef19f1

Please sign in to comment.