add config to run MERGE source materialization eagerly
larsk-db committed Jul 17, 2023
1 parent a2b73be · commit ff0ea0b
Showing 3 changed files with 93 additions and 54 deletions.
@@ -242,12 +242,11 @@ trait MergeIntoMaterializeSource extends DeltaLogging {
val sourceWithSelectedColumns = Project(referencedSourceColumns, source)
val baseSourcePlanDF = Dataset.ofRows(spark, sourceWithSelectedColumns)

// Caches the source in RDD cache using localCheckpopoint, which cuts away the RDD lineage,
// Caches the source in RDD cache using localCheckpoint, which cuts away the RDD lineage,
// which shall ensure that the source cannot be recomputed and thus become inconsistent.
val checkpointedSourcePlanDF = baseSourcePlanDF
// eager = false makes it be executed and materialized first time it's used.
// Doing it lazily inside the query lets it interleave this work better with other work.
// On the other hand, it makes it impossible to measure the time it took in a metric.
// Set eager=false for now, even if we should be doing eager, so that we can set the storage
// level before executing.
.localCheckpoint(eager = false)

// We have to reach through the crust and into the plan of the checkpointed DF
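
For orientation, a minimal standalone sketch of the lazy localCheckpoint behavior the comments above rely on; the session setup and DataFrame are illustrative only, not part of this change:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    val df = spark.range(0, 1000).toDF("id")

    // eager = false only marks the plan's RDD for checkpointing; no job runs yet,
    // which leaves a window to adjust the underlying RDD (e.g. its storage level)
    // before the first execution, as the MERGE code does below.
    val checkpointed = df.localCheckpoint(eager = false)

    // The first action both executes the plan and finalizes the local checkpoint.
    checkpointed.count()
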
@@ -284,6 +283,20 @@ trait MergeIntoMaterializeSource extends DeltaLogging {
)
rdd.persist(storageLevel)

// WARNING: if eager == false, the source used during the first Spark Job that uses this may
// still be inconsistent with source materialized afterwards.
// This is because doCheckpoint that finalizes the lazy checkpoint is called after the Job
// that triggered the lazy checkpointing finished.
// If blocks were lost during that job, they may still get recomputed and changed compared
// to how they were used during the execution of the job.
if (spark.conf.get(DeltaSQLConf.MERGE_MATERIALIZE_SOURCE_EAGER)) {
// Force the evaluation of the `rdd`, since we cannot access `doCheckpoint()` from here.
rdd
.mapPartitions(_ => Iterator.empty.asInstanceOf[Iterator[InternalRow]])
.foreach((_: InternalRow) => ())
assert(rdd.isCheckpointed)
}

logDebug(s"Materializing MERGE with pruned columns $referencedSourceColumns. ")
logDebug(s"Materialized MERGE source plan:\n${sourceDF.get.queryExecution}")
materializeReason
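
For readers unfamiliar with the forcing trick above: any job that computes every partition of a lazily checkpointed RDD finalizes the pending checkpoint once that job completes. A self-contained sketch under that assumption (the helper name is illustrative, not the Delta implementation):

    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.SparkSession

    // Run a no-op job over all partitions so doCheckpoint() fires when the job finishes,
    // closing the inconsistency window described in the WARNING comment above.
    def forceLocalCheckpoint[T](rdd: RDD[T]): Unit = {
      rdd.foreachPartition(_ => ())
      assert(rdd.isCheckpointed)
    }

    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    val marked = spark.sparkContext.parallelize(1 to 1000).localCheckpoint()
    forceLocalCheckpoint(marked)
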
@@ -504,6 +504,13 @@ trait DeltaSQLConfBase {
.intConf
.createWithDefault(4)

val MERGE_MATERIALIZE_SOURCE_EAGER =
buildConf("merge.materializeSource.eager")
.internal()
.doc("Materialize the source eagerly before Job 1")
.booleanConf
.createWithDefault(true)

val DELTA_LAST_COMMIT_VERSION_IN_SESSION =
buildConf("lastCommitVersionInSession")
.doc("The version of the last commit made in the SparkSession for any table.")
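
A usage sketch for the new flag; the fully qualified key below assumes the spark.databricks.delta prefix that DeltaSQLConf.buildConf applies to the other Delta confs, and is not spelled out in this diff:

    // Assumed full key: "spark.databricks.delta." + "merge.materializeSource.eager"
    spark.conf.set("spark.databricks.delta.merge.materializeSource.eager", "false")

    // From Delta code and tests, the conf object itself provides the key:
    spark.conf.set(DeltaSQLConf.MERGE_MATERIALIZE_SOURCE_EAGER.key, "true")
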
@@ -76,33 +76,36 @@ trait MergeIntoMaterializeSourceTests
}


test("merge logs out of disk errors") {
for (eager <- BOOLEAN_DOMAIN)
test(s"merge logs out of disk errors - eager=$eager") {
{
val tblName = "target"
withTable(tblName) {
val targetDF = spark.range(10).toDF("id").withColumn("value", rand())
targetDF.write.format("delta").saveAsTable(tblName)
spark.range(10).mapPartitions { x =>
throw new java.io.IOException("No space left on device")
x
}.toDF("id").withColumn("value", rand()).createOrReplaceTempView("s")
val events = Log4jUsageLogger.track {
val ex = intercept[SparkException] {
sql(s"MERGE INTO $tblName t USING s ON t.id = s.id " +
s"WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT *")
withSQLConf(DeltaSQLConf.MERGE_MATERIALIZE_SOURCE_EAGER.key -> eager.toString) {
val targetDF = spark.range(10).toDF("id").withColumn("value", rand())
targetDF.write.format("delta").saveAsTable(tblName)
spark.range(10).mapPartitions { x =>
throw new java.io.IOException("No space left on device")
x
}.toDF("id").withColumn("value", rand()).createOrReplaceTempView("s")
val events = Log4jUsageLogger.track {
val ex = intercept[SparkException] {
sql(s"MERGE INTO $tblName t USING s ON t.id = s.id " +
s"WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT *")
}
}.filter { e =>
e.metric == MetricDefinitions.EVENT_TAHOE.name &&
e.tags.get("opType").contains(MergeIntoMaterializeSourceError.OP_TYPE)
}
assert(events.length == 1)
val error = JsonUtils.fromJson[MergeIntoMaterializeSourceError](events(0).blob)
assert(error.errorType == MergeIntoMaterializeSourceErrorType.OUT_OF_DISK.toString)
assert(error.attempt == 1)
val storageLevel = StorageLevel.fromString(
spark.conf.get(DeltaSQLConf.MERGE_MATERIALIZE_SOURCE_RDD_STORAGE_LEVEL))
assert(error.materializedSourceRDDStorageLevel == storageLevel.toString)
}
}.filter { e =>
e.metric == MetricDefinitions.EVENT_TAHOE.name &&
e.tags.get("opType").contains(MergeIntoMaterializeSourceError.OP_TYPE)
}
assert(events.length == 1)
val error = JsonUtils.fromJson[MergeIntoMaterializeSourceError](events(0).blob)
assert(error.errorType == MergeIntoMaterializeSourceErrorType.OUT_OF_DISK.toString)
assert(error.attempt == 1)
val storageLevel = StorageLevel.fromString(
spark.conf.get(DeltaSQLConf.MERGE_MATERIALIZE_SOURCE_RDD_STORAGE_LEVEL))
assert(error.materializedSourceRDDStorageLevel == storageLevel.toString)
}
}
}
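
The suite now generates this test once per value of BOOLEAN_DOMAIN so both the eager and the lazy materialization paths are exercised. A minimal sketch of the pattern, assuming BOOLEAN_DOMAIN is the usual Seq(true, false) test helper:

    val BOOLEAN_DOMAIN: Seq[Boolean] = Seq(true, false)

    for (eager <- BOOLEAN_DOMAIN) {
      // In the real suite each iteration registers a separate ScalaTest case.
      println(s"merge logs out of disk errors - eager=$eager")
    }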

@@ -161,10 +164,11 @@ trait MergeIntoMaterializeSourceTests
spark.conf.get(DeltaSQLConf.MERGE_MATERIALIZE_SOURCE_RDD_STORAGE_LEVEL_RETRY)
}
)
if (rdd.getStorageLevel != expectedStorageLevel) {
val rddStorageLevel = rdd.getStorageLevel
if (rddStorageLevel != expectedStorageLevel) {
invalidStorageLevel =
Some(s"For attempt ${seenSources.size} of materialized source expected " +
s"$expectedStorageLevel but got ${rdd.getStorageLevel}")
s"$expectedStorageLevel but got ${rddStorageLevel}")
finished = true
}
logInfo(s"Unpersisting mergeMaterializedSource with id=$rddId")
@@ -209,36 +213,48 @@ trait MergeIntoMaterializeSourceTests
val tblName = "target"

// For 1 to maxAttempts - 1 RDD block lost failures, merge should retry and succeed.
(1 to maxAttempts - 1).foreach { kills =>
test(s"materialize source unpersist with $kills kill attempts succeeds") {
for {
eager <- BOOLEAN_DOMAIN
kills <- 1 to maxAttempts - 1
} {
test(s"materialize source unpersist with $kills kill attempts succeeds - eager=$eager") {
withTable(tblName) {
val allDeltaEvents = testMergeMaterializedSourceUnpersist(tblName, kills)
val events = allDeltaEvents.filter(_.tags.get("opType").contains("delta.dml.merge.stats"))
assert(events.length == 1, s"allDeltaEvents:\n$allDeltaEvents")
val mergeStats = JsonUtils.fromJson[MergeStats](events(0).blob)
assert(mergeStats.materializeSourceAttempts.isDefined,
s"MergeStats:\n$mergeStats")
assert(mergeStats.materializeSourceAttempts.get == kills + 1,
s"MergeStats:\n$mergeStats")

// Check query result after merge
val tab = sql(s"select * from $tblName order by id")
.collect().map(row => row.getLong(0)).toSeq
assert(tab == (0L until 90L) ++ (100L until 120L))
withSQLConf(DeltaSQLConf.MERGE_MATERIALIZE_SOURCE_EAGER.key -> eager.toString) {
val allDeltaEvents = testMergeMaterializedSourceUnpersist(tblName, kills)
val events =
allDeltaEvents.filter(_.tags.get("opType").contains("delta.dml.merge.stats"))
assert(events.length == 1, s"allDeltaEvents:\n$allDeltaEvents")
val mergeStats = JsonUtils.fromJson[MergeStats](events(0).blob)
assert(mergeStats.materializeSourceAttempts.isDefined, s"MergeStats:\n$mergeStats")
assert(
mergeStats.materializeSourceAttempts.get == kills + 1,
s"MergeStats:\n$mergeStats")

// Check query result after merge
val tab = sql(s"select * from $tblName order by id")
.collect()
.map(row => row.getLong(0))
.toSeq
assert(tab == (0L until 90L) ++ (100L until 120L))
}
}
}
}

// Eventually it should fail after exceeding maximum number of attempts.
test(s"materialize source unpersist with $maxAttempts kill attempts fails") {
withTable(tblName) {
val allDeltaEvents = testMergeMaterializedSourceUnpersist(tblName, maxAttempts)
val events = allDeltaEvents
.filter(_.tags.get("opType").contains(MergeIntoMaterializeSourceError.OP_TYPE))
assert(events.length == 1, s"allDeltaEvents:\n$allDeltaEvents")
val error = JsonUtils.fromJson[MergeIntoMaterializeSourceError](events(0).blob)
assert(error.errorType == MergeIntoMaterializeSourceErrorType.RDD_BLOCK_LOST.toString)
assert(error.attempt == maxAttempts)
for (eager <- BOOLEAN_DOMAIN) {
test(s"materialize source unpersist with $maxAttempts kill attempts fails - eager=$eager") {
withSQLConf(DeltaSQLConf.MERGE_MATERIALIZE_SOURCE_EAGER.key -> eager.toString) {
withTable(tblName) {
val allDeltaEvents = testMergeMaterializedSourceUnpersist(tblName, maxAttempts)
val events = allDeltaEvents
.filter(_.tags.get("opType").contains(MergeIntoMaterializeSourceError.OP_TYPE))
assert(events.length == 1, s"allDeltaEvents:\n$allDeltaEvents")
val error = JsonUtils.fromJson[MergeIntoMaterializeSourceError](events(0).blob)
assert(error.errorType == MergeIntoMaterializeSourceErrorType.RDD_BLOCK_LOST.toString)
assert(error.attempt == maxAttempts)
}
}
}
}
}
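
The two tests above pin down the retry contract for both eager and lazy materialization: up to maxAttempts - 1 losses of the materialized source are retried transparently, while the maxAttempts-th loss surfaces as an error. A rough sketch of that contract, with illustrative names that are not the Delta implementation:

    // Retry `body` when it throws, up to maxAttempts attempts in total.
    def withMaterializationRetries[T](maxAttempts: Int)(body: Int => T): T = {
      var attempt = 1
      var result: Option[T] = None
      while (result.isEmpty) {
        try {
          result = Some(body(attempt))
        } catch {
          case _: Exception if attempt < maxAttempts =>
            attempt += 1 // e.g. materialized RDD blocks were lost; start over
        }
      }
      result.get
    }
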
@@ -292,14 +308,17 @@ trait MergeIntoMaterializeSourceTests
hints
}

test("materialize source preserves dataframe hints") {
for (eager <- BOOLEAN_DOMAIN)
test(s"materialize source preserves dataframe hints - eager=$eager") {
withTable("A", "B", "T") {
sql("select id, id as v from range(50000)").write.format("delta").saveAsTable("T")
sql("select id, id+2 as v from range(10000)").write.format("csv").saveAsTable("A")
sql("select id, id*2 as v from range(1000)").write.format("csv").saveAsTable("B")

// Manually added broadcast hint will mess up the expected hints hence disable it
withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
withSQLConf(
DeltaSQLConf.MERGE_MATERIALIZE_SOURCE_EAGER.key -> eager.toString,
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
// Simple BROADCAST hint
val hSimple = getHints(
sql("MERGE INTO T USING (SELECT /*+ BROADCAST */ * FROM A) s ON T.id = s.id" +