diff --git a/core/src/main/scala/org/apache/spark/sql/delta/commands/merge/MergeIntoMaterializeSource.scala b/core/src/main/scala/org/apache/spark/sql/delta/commands/merge/MergeIntoMaterializeSource.scala index fe28f1f2818..8f573e50d02 100644 --- a/core/src/main/scala/org/apache/spark/sql/delta/commands/merge/MergeIntoMaterializeSource.scala +++ b/core/src/main/scala/org/apache/spark/sql/delta/commands/merge/MergeIntoMaterializeSource.scala @@ -257,12 +257,11 @@ trait MergeIntoMaterializeSource extends DeltaLogging { val sourceWithSelectedColumns = Project(referencedSourceColumns, source) val baseSourcePlanDF = Dataset.ofRows(spark, sourceWithSelectedColumns) - // Caches the source in RDD cache using localCheckpopoint, which cuts away the RDD lineage, + // Caches the source in RDD cache using localCheckpoint, which cuts away the RDD lineage, // which shall ensure that the source cannot be recomputed and thus become inconsistent. val checkpointedSourcePlanDF = baseSourcePlanDF - // eager = false makes it be executed and materialized first time it's used. - // Doing it lazily inside the query lets it interleave this work better with other work. - // On the other hand, it makes it impossible to measure the time it took in a metric. + // Set eager=false for now, even if we should be doing eager, so that we can set the storage + // level before executing. .localCheckpoint(eager = false) // We have to reach through the crust and into the plan of the checkpointed DF @@ -289,6 +288,20 @@ trait MergeIntoMaterializeSource extends DeltaLogging { ) rdd.persist(storageLevel) + // WARNING: if eager == false, the source used during the first Spark Job that uses this may + // still be inconsistent with source materialized afterwards. + // This is because doCheckpoint that finalizes the lazy checkpoint is called after the Job + // that triggered the lazy checkpointing finished. + // If blocks were lost during that job, they may still get recomputed and changed compared + // to how they were used during the execution of the job. + if (spark.conf.get(DeltaSQLConf.MERGE_MATERIALIZE_SOURCE_EAGER)) { + // Force the evaluation of the `rdd`, since we cannot access `doCheckpoint()` from here. + rdd + .mapPartitions(_ => Iterator.empty.asInstanceOf[Iterator[InternalRow]]) + .foreach((_: InternalRow) => ()) + assert(rdd.isCheckpointed) + } + logDebug(s"Materializing MERGE with pruned columns $referencedSourceColumns. ") logDebug(s"Materialized MERGE source plan:\n${sourceDF.get.queryExecution}") materializeReason diff --git a/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala b/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala index 7fdc74f7adf..7cb6150284d 100644 --- a/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala +++ b/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala @@ -510,6 +510,13 @@ trait DeltaSQLConfBase { .intConf .createWithDefault(4) + val MERGE_MATERIALIZE_SOURCE_EAGER = + buildConf("merge.materializeSource.eager") + .internal() + .doc("Materialize the source eagerly before Job 1") + .booleanConf + .createWithDefault(true) + val DELTA_LAST_COMMIT_VERSION_IN_SESSION = buildConf("lastCommitVersionInSession") .doc("The version of the last commit made in the SparkSession for any table.") diff --git a/core/src/test/scala/org/apache/spark/sql/delta/MergeIntoMaterializeSourceSuite.scala b/core/src/test/scala/org/apache/spark/sql/delta/MergeIntoMaterializeSourceSuite.scala index b7627dafdb7..6fb92c10e3a 100644 --- a/core/src/test/scala/org/apache/spark/sql/delta/MergeIntoMaterializeSourceSuite.scala +++ b/core/src/test/scala/org/apache/spark/sql/delta/MergeIntoMaterializeSourceSuite.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.delta import scala.collection.mutable +import scala.concurrent.duration._ import scala.reflect.ClassTag import scala.util.control.NonFatal @@ -77,19 +78,22 @@ trait MergeIntoMaterializeSourceTests - test("merge logs out of disk errors") { - val injectEx = new java.io.IOException("No space left on device") - testWithCustomErrorInjected[SparkException](injectEx) { (thrownEx, errorOpt) => - // Compare messages instead of instances, since the equals method for these exceptions - // takes more into account. - assert(thrownEx.getCause.getMessage === injectEx.getMessage) - assert(errorOpt.isDefined) - val error = errorOpt.get - assert(error.errorType == MergeIntoMaterializeSourceErrorType.OUT_OF_DISK.toString) - assert(error.attempt == 1) - val storageLevel = StorageLevel.fromString( - spark.conf.get(DeltaSQLConf.MERGE_MATERIALIZE_SOURCE_RDD_STORAGE_LEVEL)) - assert(error.materializedSourceRDDStorageLevel == storageLevel.toString) + for (eager <- BOOLEAN_DOMAIN) + test(s"merge logs out of disk errors - eager=$eager") { + withSQLConf(DeltaSQLConf.MERGE_MATERIALIZE_SOURCE_EAGER.key -> eager.toString) { + val injectEx = new java.io.IOException("No space left on device") + testWithCustomErrorInjected[SparkException](injectEx) { (thrownEx, errorOpt) => + // Compare messages instead of instances, since the equals method for these exceptions + // takes more into account. + assert(thrownEx.getCause.getMessage === injectEx.getMessage) + assert(errorOpt.isDefined) + val error = errorOpt.get + assert(error.errorType == MergeIntoMaterializeSourceErrorType.OUT_OF_DISK.toString) + assert(error.attempt == 1) + val storageLevel = StorageLevel.fromString( + spark.conf.get(DeltaSQLConf.MERGE_MATERIALIZE_SOURCE_RDD_STORAGE_LEVEL)) + assert(error.materializedSourceRDDStorageLevel == storageLevel.toString) + } } } @@ -196,10 +200,11 @@ trait MergeIntoMaterializeSourceTests spark.conf.get(DeltaSQLConf.MERGE_MATERIALIZE_SOURCE_RDD_STORAGE_LEVEL_RETRY) } ) - if (rdd.getStorageLevel != expectedStorageLevel) { + val rddStorageLevel = rdd.getStorageLevel + if (rddStorageLevel != expectedStorageLevel) { invalidStorageLevel = Some(s"For attempt ${seenSources.size} of materialized source expected " + - s"$expectedStorageLevel but got ${rdd.getStorageLevel}") + s"$expectedStorageLevel but got ${rddStorageLevel}") finished = true } logInfo(s"Unpersisting mergeMaterializedSource with id=$rddId") @@ -244,36 +249,48 @@ trait MergeIntoMaterializeSourceTests val tblName = "target" // For 1 to maxAttempts - 1 RDD block lost failures, merge should retry and succeed. - (1 to maxAttempts - 1).foreach { kills => - test(s"materialize source unpersist with $kills kill attempts succeeds") { + for { + eager <- BOOLEAN_DOMAIN + kills <- 1 to maxAttempts - 1 + } { + test(s"materialize source unpersist with $kills kill attempts succeeds - eager=$eager") { withTable(tblName) { - val allDeltaEvents = testMergeMaterializedSourceUnpersist(tblName, kills) - val events = allDeltaEvents.filter(_.tags.get("opType").contains("delta.dml.merge.stats")) - assert(events.length == 1, s"allDeltaEvents:\n$allDeltaEvents") - val mergeStats = JsonUtils.fromJson[MergeStats](events(0).blob) - assert(mergeStats.materializeSourceAttempts.isDefined, - s"MergeStats:\n$mergeStats") - assert(mergeStats.materializeSourceAttempts.get == kills + 1, - s"MergeStats:\n$mergeStats") - - // Check query result after merge - val tab = sql(s"select * from $tblName order by id") - .collect().map(row => row.getLong(0)).toSeq - assert(tab == (0L until 90L) ++ (100L until 120L)) + withSQLConf(DeltaSQLConf.MERGE_MATERIALIZE_SOURCE_EAGER.key -> eager.toString) { + val allDeltaEvents = testMergeMaterializedSourceUnpersist(tblName, kills) + val events = + allDeltaEvents.filter(_.tags.get("opType").contains("delta.dml.merge.stats")) + assert(events.length == 1, s"allDeltaEvents:\n$allDeltaEvents") + val mergeStats = JsonUtils.fromJson[MergeStats](events(0).blob) + assert(mergeStats.materializeSourceAttempts.isDefined, s"MergeStats:\n$mergeStats") + assert( + mergeStats.materializeSourceAttempts.get == kills + 1, + s"MergeStats:\n$mergeStats") + + // Check query result after merge + val tab = sql(s"select * from $tblName order by id") + .collect() + .map(row => row.getLong(0)) + .toSeq + assert(tab == (0L until 90L) ++ (100L until 120L)) + } } } } // Eventually it should fail after exceeding maximum number of attempts. - test(s"materialize source unpersist with $maxAttempts kill attempts fails") { - withTable(tblName) { - val allDeltaEvents = testMergeMaterializedSourceUnpersist(tblName, maxAttempts) - val events = allDeltaEvents - .filter(_.tags.get("opType").contains(MergeIntoMaterializeSourceError.OP_TYPE)) - assert(events.length == 1, s"allDeltaEvents:\n$allDeltaEvents") - val error = JsonUtils.fromJson[MergeIntoMaterializeSourceError](events(0).blob) - assert(error.errorType == MergeIntoMaterializeSourceErrorType.RDD_BLOCK_LOST.toString) - assert(error.attempt == maxAttempts) + for (eager <- BOOLEAN_DOMAIN) { + test(s"materialize source unpersist with $maxAttempts kill attempts fails - eager=$eager") { + withSQLConf(DeltaSQLConf.MERGE_MATERIALIZE_SOURCE_EAGER.key -> eager.toString) { + withTable(tblName) { + val allDeltaEvents = testMergeMaterializedSourceUnpersist(tblName, maxAttempts) + val events = allDeltaEvents + .filter(_.tags.get("opType").contains(MergeIntoMaterializeSourceError.OP_TYPE)) + assert(events.length == 1, s"allDeltaEvents:\n$allDeltaEvents") + val error = JsonUtils.fromJson[MergeIntoMaterializeSourceError](events(0).blob) + assert(error.errorType == MergeIntoMaterializeSourceErrorType.RDD_BLOCK_LOST.toString) + assert(error.attempt == maxAttempts) + } + } } } } @@ -327,14 +344,17 @@ trait MergeIntoMaterializeSourceTests hints } - test("materialize source preserves dataframe hints") { + for (eager <- BOOLEAN_DOMAIN) + test(s"materialize source preserves dataframe hints - eager=$eager") { withTable("A", "B", "T") { sql("select id, id as v from range(50000)").write.format("delta").saveAsTable("T") sql("select id, id+2 as v from range(10000)").write.format("csv").saveAsTable("A") sql("select id, id*2 as v from range(1000)").write.format("csv").saveAsTable("B") // Manually added broadcast hint will mess up the expected hints hence disable it - withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") { + withSQLConf( + DeltaSQLConf.MERGE_MATERIALIZE_SOURCE_EAGER.key -> eager.toString, + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") { // Simple BROADCAST hint val hSimple = getHints( sql("MERGE INTO T USING (SELECT /*+ BROADCAST */ * FROM A) s ON T.id = s.id" +