From a0fcf208896a814942f22e6742cc4e36a83af517 Mon Sep 17 00:00:00 2001
From: Cuong Nguyen
Date: Tue, 1 Oct 2024 16:53:50 -0700
Subject: [PATCH 1/2] Pass table identifier from DeltaLog constructors to the
 commit coordinator

---
 .../org/apache/spark/sql/delta/DeltaLog.scala | 24 ++++---
 .../spark/sql/delta/SnapshotManagement.scala  | 65 +++++++++++++++----
 2 files changed, 69 insertions(+), 20 deletions(-)

diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala
index 7d23ec134d..090422ecb1 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala
@@ -69,6 +69,7 @@ import org.apache.spark.util._
  * @param options Filesystem options filtered from `allOptions`.
  * @param allOptions All options provided by the user, for example via `df.write.option()`. This
  *                   includes but is not limited to filesystem and table properties.
+ * @param initialTableIdentifier Identifier of the table when the log is initialized.
  * @param clock Clock to be used when starting a new transaction.
  */
 class DeltaLog private(
     val logPath: Path,
     val dataPath: Path,
     val options: Map[String, String],
     val allOptions: Map[String, String],
+    val initialTableIdentifier: Option[TableIdentifier],
     val clock: Clock
   ) extends Checkpoints
     with MetadataCleanup
@@ -750,22 +752,22 @@ object DeltaLog extends DeltaLogging {
 
   /** Helper for creating a log when it is stored at the root of the data. */
   def forTable(spark: SparkSession, dataPath: String): DeltaLog = {
-    apply(spark, logPathFor(dataPath), Map.empty, new SystemClock)
+    apply(spark, logPathFor(dataPath), Map.empty, None, new SystemClock)
   }
 
   /** Helper for creating a log when it is stored at the root of the data. */
   def forTable(spark: SparkSession, dataPath: Path): DeltaLog = {
-    apply(spark, logPathFor(dataPath), new SystemClock)
+    apply(spark, logPathFor(dataPath), None, new SystemClock)
   }
 
   /** Helper for creating a log when it is stored at the root of the data. */
   def forTable(spark: SparkSession, dataPath: Path, options: Map[String, String]): DeltaLog = {
-    apply(spark, logPathFor(dataPath), options, new SystemClock)
+    apply(spark, logPathFor(dataPath), options, None, new SystemClock)
   }
 
   /** Helper for creating a log when it is stored at the root of the data. */
   def forTable(spark: SparkSession, dataPath: Path, clock: Clock): DeltaLog = {
-    apply(spark, logPathFor(dataPath), clock)
+    apply(spark, logPathFor(dataPath), None, clock)
   }
 
   /** Helper for creating a log for the table. */
@@ -789,11 +791,15 @@ object DeltaLog extends DeltaLogging {
 
   /** Helper for creating a log for the table. */
   def forTable(spark: SparkSession, table: CatalogTable, clock: Clock): DeltaLog = {
-    apply(spark, logPathFor(new Path(table.location)), clock)
+    apply(spark, logPathFor(new Path(table.location)), Some(table.identifier), clock)
   }
 
-  private def apply(spark: SparkSession, rawPath: Path, clock: Clock = new SystemClock): DeltaLog =
-    apply(spark, rawPath, Map.empty, clock)
+  private def apply(
+      spark: SparkSession,
+      rawPath: Path,
+      initialTableIdentifier: Option[TableIdentifier],
+      clock: Clock): DeltaLog =
+    apply(spark, rawPath, Map.empty, initialTableIdentifier, clock)
 
   /** Helper for getting a log, as well as the latest snapshot, of the table */
@@ -815,7 +821,7 @@
       spark: SparkSession,
       dataPath: Path,
       options: Map[String, String]): (DeltaLog, Snapshot) =
-    withFreshSnapshot { apply(spark, logPathFor(dataPath), options, _) }
+    withFreshSnapshot { apply(spark, logPathFor(dataPath), options, None, _) }
 
   /**
    * Helper function to be used with the forTableWithSnapshot calls. Thunk is a
@@ -834,6 +840,7 @@
       spark: SparkSession,
       rawPath: Path,
       options: Map[String, String],
+      initialTableIdentifier: Option[TableIdentifier],
       clock: Clock
   ): DeltaLog = {
     val fileSystemOptions: Map[String, String] =
@@ -862,6 +869,7 @@
           dataPath = path.getParent,
           options = fileSystemOptions,
           allOptions = options,
+          initialTableIdentifier = initialTableIdentifier,
           clock = clock
         )
       }
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/SnapshotManagement.scala b/spark/src/main/scala/org/apache/spark/sql/delta/SnapshotManagement.scala
index cfb7bb6351..86d71c7eda 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/SnapshotManagement.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/SnapshotManagement.scala
@@ -42,6 +42,7 @@ import org.apache.hadoop.fs.{BlockLocation, FileStatus, LocatedFileStatus, Path}
 import org.apache.spark.{SparkContext, SparkException}
 import org.apache.spark.internal.MDC
 import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.util.{ThreadUtils, Utils}
 
 /**
@@ -155,6 +156,8 @@ trait SnapshotManagement { self: DeltaLog =>
    * @param startVersion the version to start. Inclusive.
    * @param tableCommitCoordinatorClientOpt the optional commit coordinator to use for fetching
    *                                        un-backfilled commits.
+   * @param tableIdentifierOpt the optional table identifier to pass to the commit coordinator
+   *                           client.
    * @param versionToLoad the optional parameter to set the max version we should return. Inclusive.
   * @param includeMinorCompactions Whether to include minor compaction files in the result
   * @return A tuple where the first element is an array of log files (possibly empty, if no
@@ -164,6 +167,7 @@ trait SnapshotManagement { self: DeltaLog =>
   protected def listDeltaCompactedDeltaCheckpointFilesAndLatestChecksumFile(
       startVersion: Long,
       tableCommitCoordinatorClientOpt: Option[TableCommitCoordinatorClient],
+      tableIdentifierOpt: Option[TableIdentifier],
       versionToLoad: Option[Long],
       includeMinorCompactions: Boolean): (Option[Array[FileStatus]], Option[FileStatus]) = {
     val tableCommitCoordinatorClient = tableCommitCoordinatorClientOpt.getOrElse {
@@ -174,9 +178,6 @@ trait SnapshotManagement { self: DeltaLog =>
 
     // Submit a potential async call to get commits from commit coordinator if available
     val threadPool = SnapshotManagement.commitCoordinatorGetCommitsThreadPool
-    // TODO(table-identifier-plumbing): Plumb the right tableIdentifier from the deltaLog.update and
-    // Cold deltaLog initialization codepath.
-    val tableIdentifierOpt = None
     def getCommitsTask(isAsyncRequest: Boolean): GetCommitsResponse = {
       CoordinatedCommitsUtils.getCommitsFromCommitCoordinatorWithUsageLogs(
         this, tableCommitCoordinatorClient, tableIdentifierOpt,
@@ -314,6 +315,7 @@ trait SnapshotManagement { self: DeltaLog =>
    * @param startVersion the version to start. Inclusive.
    * @param tableCommitCoordinatorClientOpt the optional commit-coordinator client to use for
    *                                        fetching un-backfilled commits.
+   * @param tableIdentifierOpt the optional identifier of the target delta table.
    * @param versionToLoad the optional parameter to set the max version we should return. Inclusive.
    * @param includeMinorCompactions Whether to include minor compaction files in the result
    * @return Some array of files found (possibly empty, if no usable commit files are present), or
@@ -322,12 +324,18 @@ trait SnapshotManagement { self: DeltaLog =>
   protected final def listDeltaCompactedDeltaAndCheckpointFiles(
       startVersion: Long,
       tableCommitCoordinatorClientOpt: Option[TableCommitCoordinatorClient],
+      tableIdentifierOpt: Option[TableIdentifier],
       versionToLoad: Option[Long],
       includeMinorCompactions: Boolean): Option[Array[FileStatus]] = {
     recordDeltaOperation(self, "delta.deltaLog.listDeltaAndCheckpointFiles") {
       val (logTuplesOpt, latestChecksumOpt) =
         listDeltaCompactedDeltaCheckpointFilesAndLatestChecksumFile(
-          startVersion, tableCommitCoordinatorClientOpt, versionToLoad, includeMinorCompactions)
+          startVersion,
+          tableCommitCoordinatorClientOpt,
+          tableIdentifierOpt,
+          versionToLoad,
+          includeMinorCompactions
+        )
       lastSeenChecksumFileStatusOpt = latestChecksumOpt
       logTuplesOpt
     }
@@ -354,6 +362,7 @@ trait SnapshotManagement { self: DeltaLog =>
       versionToLoad: Option[Long] = None,
       oldCheckpointProviderOpt: Option[UninitializedCheckpointProvider] = None,
       tableCommitCoordinatorClientOpt: Option[TableCommitCoordinatorClient] = None,
+      tableIdentifierOpt: Option[TableIdentifier] = None,
       lastCheckpointInfo: Option[LastCheckpointInfo] = None): Option[LogSegment] = {
     // List based on the last known checkpoint version.
     // if that is -1, list from version 0L
@@ -362,21 +371,31 @@ trait SnapshotManagement { self: DeltaLog =>
     val includeMinorCompactions =
       spark.conf.get(DeltaSQLConf.DELTALOG_MINOR_COMPACTION_USE_FOR_READS)
     val newFiles = listDeltaCompactedDeltaAndCheckpointFiles(
-      listingStartVersion, tableCommitCoordinatorClientOpt, versionToLoad, includeMinorCompactions)
+      listingStartVersion,
+      tableCommitCoordinatorClientOpt,
+      tableIdentifierOpt,
+      versionToLoad,
+      includeMinorCompactions
+    )
     getLogSegmentForVersion(
       versionToLoad,
       newFiles,
       validateLogSegmentWithoutCompactedDeltas = true,
       oldCheckpointProviderOpt = oldCheckpointProviderOpt,
       tableCommitCoordinatorClientOpt = tableCommitCoordinatorClientOpt,
+      tableIdentifierOpt = tableIdentifierOpt,
       lastCheckpointInfo = lastCheckpointInfo
     )
   }
 
-  private def createLogSegment(previousSnapshot: Snapshot): Option[LogSegment] = {
+  private def createLogSegment(
+      previousSnapshot: Snapshot,
+      tableIdentifierOpt: Option[TableIdentifier]): Option[LogSegment] = {
     createLogSegment(
       oldCheckpointProviderOpt = Some(previousSnapshot.checkpointProvider),
-      tableCommitCoordinatorClientOpt = previousSnapshot.tableCommitCoordinatorClientOpt)
+      tableCommitCoordinatorClientOpt = previousSnapshot.tableCommitCoordinatorClientOpt,
+      tableIdentifierOpt = tableIdentifierOpt
+    )
   }
 
   /**
@@ -439,6 +458,7 @@ trait SnapshotManagement { self: DeltaLog =>
       files: Option[Array[FileStatus]],
       validateLogSegmentWithoutCompactedDeltas: Boolean,
       tableCommitCoordinatorClientOpt: Option[TableCommitCoordinatorClient],
+      tableIdentifierOpt: Option[TableIdentifier] = None,
       oldCheckpointProviderOpt: Option[UninitializedCheckpointProvider],
       lastCheckpointInfo: Option[LastCheckpointInfo]): Option[LogSegment] = {
     recordFrameProfile("Delta", "SnapshotManagement.getLogSegmentForVersion") {
@@ -464,6 +484,7 @@ trait SnapshotManagement { self: DeltaLog =>
       } else if (newFiles.isEmpty) {
         // The directory may be deleted and recreated and we may have stale state in our DeltaLog
         // singleton, so try listing from the first version
+        // TODO(table-identifier-plumbing): Pass the tableIdentifierOpt here
         return createLogSegment(versionToLoad = versionToLoad)
       }
       val (checkpoints, deltasAndCompactedDeltas) = newFiles.partition(isCheckpointFile)
@@ -483,8 +504,13 @@ trait SnapshotManagement { self: DeltaLog =>
         recordDeltaEvent(this, "delta.checkpoint.error.partial")
         val snapshotVersion = versionToLoad.getOrElse(deltaVersion(deltas.last))
         getLogSegmentWithMaxExclusiveCheckpointVersion(
-          snapshotVersion, lastCheckpointVersion, tableCommitCoordinatorClientOpt)
-          .foreach { alternativeLogSegment => return Some(alternativeLogSegment) }
+          snapshotVersion,
+          lastCheckpointVersion,
+          tableCommitCoordinatorClientOpt,
+          tableIdentifierOpt
+        ).foreach { alternativeLogSegment =>
+          return Some(alternativeLogSegment)
+        }
 
         // No alternative found, but the directory contains files so we cannot return None.
         throw DeltaErrors.missingPartFilesException(
@@ -638,11 +664,13 @@ trait SnapshotManagement { self: DeltaLog =>
         val lastCheckpointOpt = readLastCheckpointFile()
         val initialSegmentForNewSnapshot = createLogSegment(
           versionToLoad = None,
+          tableIdentifierOpt = initialTableIdentifier,
           lastCheckpointInfo = lastCheckpointOpt)
         val snapshot = getUpdatedSnapshot(
           oldSnapshotOpt = None,
           initialSegmentForNewSnapshot = initialSegmentForNewSnapshot,
           initialTableCommitCoordinatorClient = None,
+          tableIdentifierOpt = initialTableIdentifier,
           isAsync = false)
         CapturedSnapshot(snapshot, snapshotInitWallclockTime)
       }
@@ -712,7 +740,8 @@ trait SnapshotManagement { self: DeltaLog =>
   private def getLogSegmentWithMaxExclusiveCheckpointVersion(
       snapshotVersion: Long,
       maxExclusiveCheckpointVersion: Long,
-      tableCommitCoordinatorClientOpt: Option[TableCommitCoordinatorClient]): Option[LogSegment] = {
+      tableCommitCoordinatorClientOpt: Option[TableCommitCoordinatorClient],
+      tableIdentifierOpt: Option[TableIdentifier] = None): Option[LogSegment] = {
     assert(
       snapshotVersion >= maxExclusiveCheckpointVersion,
       s"snapshotVersion($snapshotVersion) is less than " +
@@ -725,6 +754,7 @@ trait SnapshotManagement { self: DeltaLog =>
         val filesSinceCheckpointVersion = listDeltaCompactedDeltaAndCheckpointFiles(
           startVersion = cp.version,
           tableCommitCoordinatorClientOpt = tableCommitCoordinatorClientOpt,
+          tableIdentifierOpt = tableIdentifierOpt,
           versionToLoad = Some(snapshotVersion),
           includeMinorCompactions = false
         ).getOrElse(Array.empty)
@@ -767,6 +797,7 @@ trait SnapshotManagement { self: DeltaLog =>
           listDeltaCompactedDeltaAndCheckpointFiles(
             startVersion = 0,
             tableCommitCoordinatorClientOpt = tableCommitCoordinatorClientOpt,
+            tableIdentifierOpt = tableIdentifierOpt,
             versionToLoad = Some(snapshotVersion),
             includeMinorCompactions = false)
         val (deltas, deltaVersions) =
@@ -830,6 +861,7 @@ trait SnapshotManagement { self: DeltaLog =>
          * Instead, just do a general update to the latest available version. The racing commits
          * can then use the version check short-circuit to avoid constructing a new snapshot.
          */
+        // TODO(table-identifier-plumbing): Pass the tableIdentifierOpt here
        createLogSegment(
          oldCheckpointProviderOpt = Some(oldCheckpointProvider),
          tableCommitCoordinatorClientOpt = tableCommitCoordinatorClientOpt
@@ -867,6 +899,7 @@ trait SnapshotManagement { self: DeltaLog =>
         }
         logWarning(log"Failed to create a snapshot from log segment " +
           log"${MDC(DeltaLogKeys.SEGMENT, segment)}. Trying a different checkpoint.", e)
+        // TODO(table-identifier-plumbing): Plumb the right tableIdentifier here
         segment = getLogSegmentWithMaxExclusiveCheckpointVersion(
           segment.version,
           segment.checkpointProvider.version,
@@ -901,9 +934,11 @@ trait SnapshotManagement { self: DeltaLog =>
       tableCommitCoordinatorClientOpt: Option[TableCommitCoordinatorClient]
   ): (LogSegment, Seq[FileStatus]) = {
     val includeCompactions = spark.conf.get(DeltaSQLConf.DELTALOG_MINOR_COMPACTION_USE_FOR_READS)
+    // TODO(table-identifier-plumbing): Plumb the right tableIdentifier here
     val newFilesOpt = listDeltaCompactedDeltaAndCheckpointFiles(
       startVersion = oldLogSegment.version + 1,
       tableCommitCoordinatorClientOpt = tableCommitCoordinatorClientOpt,
+      tableIdentifierOpt = None,
       versionToLoad = None,
       includeMinorCompactions = includeCompactions)
     val newFiles = newFilesOpt.getOrElse {
@@ -917,6 +952,7 @@ trait SnapshotManagement { self: DeltaLog =>
       newFiles
     ).toArray
     val lastCheckpointInfo = Option.empty[LastCheckpointInfo]
+    // TODO(table-identifier-plumbing): Plumb the right tableIdentifier here
     val newLogSegment = getLogSegmentForVersion(
       versionToLoad = None,
       files = Some(allFiles),
@@ -1049,11 +1085,14 @@ trait SnapshotManagement { self: DeltaLog =>
     recordDeltaOperation(this, "delta.log.update", Map(TAG_ASYNC -> isAsync.toString)) {
       val updateStartTimeMs = clock.getTimeMillis()
       val previousSnapshot = currentSnapshot.snapshot
-      val segmentOpt = createLogSegment(previousSnapshot)
+      // TODO(table-identifier-plumbing): Pass the right tableIdentifier here
+      val segmentOpt = createLogSegment(previousSnapshot, tableIdentifierOpt = None)
+      // TODO(table-identifier-plumbing): Pass the right tableIdentifier here
       val newSnapshot = getUpdatedSnapshot(
         oldSnapshotOpt = Some(previousSnapshot),
         initialSegmentForNewSnapshot = segmentOpt,
         initialTableCommitCoordinatorClient = previousSnapshot.tableCommitCoordinatorClientOpt,
+        tableIdentifierOpt = None,
         isAsync = isAsync)
       installSnapshot(newSnapshot, updateStartTimeMs)
     }
@@ -1073,6 +1112,7 @@ trait SnapshotManagement { self: DeltaLog =>
       oldSnapshotOpt: Option[Snapshot],
       initialSegmentForNewSnapshot: Option[LogSegment],
       initialTableCommitCoordinatorClient: Option[TableCommitCoordinatorClient],
+      tableIdentifierOpt: Option[TableIdentifier],
       isAsync: Boolean): Snapshot = {
     var newSnapshot = getSnapshotForLogSegmentInternal(
       oldSnapshotOpt,
@@ -1089,7 +1129,7 @@ trait SnapshotManagement { self: DeltaLog =>
       initialTableCommitCoordinatorClient.forall(!_.semanticsEquals(newStore))
     }
     if (usedStaleCommitCoordinator) {
-      val segmentOpt = createLogSegment(newSnapshot)
+      val segmentOpt = createLogSegment(newSnapshot, tableIdentifierOpt)
       newSnapshot = getSnapshotForLogSegmentInternal(
         Some(newSnapshot), segmentOpt,
         newSnapshot.tableCommitCoordinatorClientOpt, isAsync)
@@ -1274,6 +1314,7 @@ trait SnapshotManagement { self: DeltaLog =>
         .map(manuallyLoadCheckpoint)
       lastCheckpointInfoForListing -> None
     }
+    // TODO(table-identifier-plumbing): Pass the right tableIdentifier here
    val logSegmentOpt = createLogSegment(
      versionToLoad = Some(version),
      oldCheckpointProviderOpt = lastCheckpointProviderOpt,

From 8b01b6d48cdd8e62cc5591bca990e2db06e205d8 Mon Sep 17 00:00:00 2001
From: Cuong Nguyen
Date: Wed, 2 Oct 2024 10:21:36 -0700
Subject: [PATCH 2/2] Update documentation

---
 .../scala/org/apache/spark/sql/delta/SnapshotManagement.scala | 1 +
 1 file changed, 1 insertion(+)

diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/SnapshotManagement.scala b/spark/src/main/scala/org/apache/spark/sql/delta/SnapshotManagement.scala
index 86d71c7eda..87435b2e6f 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/SnapshotManagement.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/SnapshotManagement.scala
@@ -1105,6 +1105,7 @@ trait SnapshotManagement { self: DeltaLog =>
    * @param initialSegmentForNewSnapshot the log segment constructed for the new snapshot
    * @param initialTableCommitCoordinatorClient the commit-coordinator used for constructing the
    *                                            `initialSegmentForNewSnapshot`
+   * @param tableIdentifierOpt the optional identifier of the target table.
    * @param isAsync Whether the update is async.
    * @return The new snapshot.
    */
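
Reviewer note (not part of the patches above): the sketch below is a self-contained toy model of
the plumbing these two commits introduce. Every name in it is an illustrative stand-in; ToyDeltaLog,
CommitCoordinatorClient, and the simplified TableIdentifier are hypothetical, not the real Delta
classes. The shape, however, mirrors the diffs: an optional identifier captured at DeltaLog
construction time (initialTableIdentifier) is threaded through the cold-start listing path down to
the commit coordinator, while call sites that do not yet have an identifier (the remaining
TODO(table-identifier-plumbing) sites, such as update()) pass None.

    // Toy model of the table-identifier plumbing; all names here are hypothetical stand-ins.
    case class TableIdentifier(table: String, database: Option[String] = None)

    trait CommitCoordinatorClient {
      // Stand-in for the real call that fetches un-backfilled commits
      // (CoordinatedCommitsUtils.getCommitsFromCommitCoordinatorWithUsageLogs in the diff).
      def getCommits(
          logPath: String,
          tableIdentifierOpt: Option[TableIdentifier],
          startVersion: Long): Seq[Long]
    }

    class ToyDeltaLog(
        logPath: String,
        initialTableIdentifier: Option[TableIdentifier], // new constructor field from patch 1
        coordinator: CommitCoordinatorClient) {

      // Cold-start path (createSnapshotAtInit in the patch): the identifier captured at
      // construction time reaches the coordinator when listing commits.
      def snapshotAtInit(): Seq[Long] =
        createLogSegment(startVersion = 0L, tableIdentifierOpt = initialTableIdentifier)

      // update() path: not plumbed yet, so None is passed, mirroring the patch's TODOs.
      def update(): Seq[Long] =
        createLogSegment(startVersion = 0L, tableIdentifierOpt = None)

      private def createLogSegment(
          startVersion: Long,
          tableIdentifierOpt: Option[TableIdentifier]): Seq[Long] =
        coordinator.getCommits(logPath, tableIdentifierOpt, startVersion)
    }

    object PlumbingDemo extends App {
      val recordingCoordinator = new CommitCoordinatorClient {
        def getCommits(
            logPath: String,
            tableIdentifierOpt: Option[TableIdentifier],
            startVersion: Long): Seq[Long] = {
          println(s"getCommits(logPath=$logPath, tableId=$tableIdentifierOpt, start=$startVersion)")
          Seq.empty
        }
      }
      val log = new ToyDeltaLog(
        "/table/_delta_log",
        Some(TableIdentifier("events", Some("default"))),
        recordingCoordinator)
      log.snapshotAtInit() // prints tableId = Some(TableIdentifier(events,Some(default)))
      log.update()         // prints tableId = None (still a TODO in the patch)
    }

Running the demo prints one getCommits call carrying Some(...) for the cold-start path and one
carrying None for update(), which is the observable change patch 1 makes and the TODOs defer.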