Skip to content

Commit

Permalink
[Spark] Passing TableIdentifier from CheckpointHook to commit coordin…
Browse files Browse the repository at this point in the history
…ator client (#3695)

## Description
As part of the effort to make the commit coordinator client aware of the
table identifier, this PR plumbs the table identifier through the code path
that runs from the checkpoint hook to the commit coordinator client.

## How was this patch tested?
Ran existing unit tests
  • Loading branch information
ctringdb authored Sep 21, 2024
1 parent 5bfd99e commit 1753cb5
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 10 deletions.
22 changes: 13 additions & 9 deletions spark/src/main/scala/org/apache/spark/sql/delta/Checkpoints.scala
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import org.apache.spark.internal.MDC
import org.apache.spark.paths.SparkPath
import org.apache.spark.sql.ColumnImplicitsShim._
import org.apache.spark.sql.{Column, DataFrame, Dataset, Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions.{Cast, ElementAt, Literal}
import org.apache.spark.sql.execution.SQLExecution
Expand Down Expand Up @@ -298,13 +299,15 @@ trait Checkpoints extends DeltaLogging {
* Note that this function captures and logs all exceptions, since the checkpoint shouldn't fail
* the overall commit operation.
*/
def checkpoint(snapshotToCheckpoint: Snapshot): Unit = recordDeltaOperation(
this, "delta.checkpoint") {
def checkpoint(
snapshotToCheckpoint: Snapshot,
tableIdentifierOpt: Option[TableIdentifier] = None): Unit =
recordDeltaOperation(this, "delta.checkpoint") {
withCheckpointExceptionHandling(snapshotToCheckpoint.deltaLog, "delta.checkpoint.sync.error") {
if (snapshotToCheckpoint.version < 0) {
throw DeltaErrors.checkpointNonExistTable(dataPath)
}
checkpointAndCleanUpDeltaLog(snapshotToCheckpoint)
checkpointAndCleanUpDeltaLog(snapshotToCheckpoint, tableIdentifierOpt)
}
}

Expand All @@ -324,8 +327,9 @@ trait Checkpoints extends DeltaLogging {
}

def checkpointAndCleanUpDeltaLog(
snapshotToCheckpoint: Snapshot): Unit = {
val lastCheckpointInfo = writeCheckpointFiles(snapshotToCheckpoint)
snapshotToCheckpoint: Snapshot,
tableIdentifierOpt: Option[TableIdentifier] = None): Unit = {
val lastCheckpointInfo = writeCheckpointFiles(snapshotToCheckpoint, tableIdentifierOpt)
writeLastCheckpointFile(
snapshotToCheckpoint.deltaLog, lastCheckpointInfo, LastCheckpointInfo.checksumEnabled(spark))
doLogCleanup(snapshotToCheckpoint)
Expand All @@ -347,7 +351,9 @@ trait Checkpoints extends DeltaLogging {
}
}

protected def writeCheckpointFiles(snapshotToCheckpoint: Snapshot): LastCheckpointInfo = {
protected def writeCheckpointFiles(
snapshotToCheckpoint: Snapshot,
tableIdentifierOpt: Option[TableIdentifier] = None): LastCheckpointInfo = {
// With Coordinated-Commits, commit files are not guaranteed to be backfilled immediately in the
// _delta_log dir. While it is possible to compute a checkpoint file without backfilling,
// writing the checkpoint file in the log directory before backfilling the relevant commits
Expand All @@ -362,9 +368,7 @@ trait Checkpoints extends DeltaLogging {
// 00015.json
// 00016.json
// 00018.checkpoint.parquet
// TODO(table-identifier-plumbing): Plumb the right tableIdentifier from the Checkpoint Hook
// and pass it to `ensureCommitFilesBackfilled`.
snapshotToCheckpoint.ensureCommitFilesBackfilled(tableIdentifierOpt = None)
snapshotToCheckpoint.ensureCommitFilesBackfilled(tableIdentifierOpt)
Checkpoints.writeCheckpoint(spark, this, snapshotToCheckpoint)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,6 @@ object CheckpointHook extends PostCommitHook {
committedVersion,
lastCheckpointHint = None,
lastCheckpointProvider = Some(cp))
txn.deltaLog.checkpoint(snapshotToCheckpoint)
txn.deltaLog.checkpoint(snapshotToCheckpoint, txn.catalogTable.map(_.identifier))
}
}

0 comments on commit 1753cb5

Please sign in to comment.