diff --git a/core/src/main/scala/org/apache/spark/sql/delta/commands/merge/MergeIntoMaterializeSource.scala b/core/src/main/scala/org/apache/spark/sql/delta/commands/merge/MergeIntoMaterializeSource.scala
index fafcac617f8..f3a1274c28a 100644
--- a/core/src/main/scala/org/apache/spark/sql/delta/commands/merge/MergeIntoMaterializeSource.scala
+++ b/core/src/main/scala/org/apache/spark/sql/delta/commands/merge/MergeIntoMaterializeSource.scala
@@ -242,12 +242,11 @@ trait MergeIntoMaterializeSource extends DeltaLogging {
     val sourceWithSelectedColumns = Project(referencedSourceColumns, source)
     val baseSourcePlanDF = Dataset.ofRows(spark, sourceWithSelectedColumns)

-    // Caches the source in RDD cache using localCheckpopoint, which cuts away the RDD lineage,
+    // Caches the source in RDD cache using localCheckpoint, which cuts away the RDD lineage,
     // which shall ensure that the source cannot be recomputed and thus become inconsistent.
     val checkpointedSourcePlanDF = baseSourcePlanDF
-      // eager = false makes it be executed and materialized first time it's used.
-      // Doing it lazily inside the query lets it interleave this work better with other work.
-      // On the other hand, it makes it impossible to measure the time it took in a metric.
+      // Set eager=false for now, even if we should be doing eager, so that we can set the storage
+      // level before executing.
       .localCheckpoint(eager = false)

     // We have to reach through the crust and into the plan of the checkpointed DF
@@ -284,6 +283,20 @@ trait MergeIntoMaterializeSource extends DeltaLogging {
     )
     rdd.persist(storageLevel)

+    // WARNING: if eager == false, the source used during the first Spark Job that uses this may
+    // still be inconsistent with source materialized afterwards.
+    // This is because doCheckpoint that finalizes the lazy checkpoint is called after the Job
+    // that triggered the lazy checkpointing finished.
+    // If blocks were lost during that job, they may still get recomputed and changed compared
+    // to how they were used during the execution of the job.
+    if (spark.conf.get(DeltaSQLConf.MERGE_MATERIALIZE_SOURCE_EAGER)) {
+      // Force the evaluation of the `rdd`, since we cannot access `doCheckpoint()` from here.
+      rdd
+        .mapPartitions(_ => Iterator.empty.asInstanceOf[Iterator[InternalRow]])
+        .foreach((_: InternalRow) => ())
+      assert(rdd.isCheckpointed)
+    }
+
     logDebug(s"Materializing MERGE with pruned columns $referencedSourceColumns. ")
     logDebug(s"Materialized MERGE source plan:\n${sourceDF.get.queryExecution}")

     materializeReason
diff --git a/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala b/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala
index 49eef7dbc57..21ce4dd9a70 100644
--- a/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala
+++ b/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala
@@ -504,6 +504,13 @@ trait DeltaSQLConfBase {
       .intConf
       .createWithDefault(4)

+  val MERGE_MATERIALIZE_SOURCE_EAGER =
+    buildConf("merge.materializeSource.eager")
+      .internal()
+      .doc("Materialize the source eagerly before Job 1")
+      .booleanConf
+      .createWithDefault(true)
+
   val DELTA_LAST_COMMIT_VERSION_IN_SESSION =
     buildConf("lastCommitVersionInSession")
       .doc("The version of the last commit made in the SparkSession for any table.")
diff --git a/core/src/test/scala/org/apache/spark/sql/delta/MergeIntoMaterializeSourceSuite.scala b/core/src/test/scala/org/apache/spark/sql/delta/MergeIntoMaterializeSourceSuite.scala
index 453c4156337..f6bf0838cf8 100644
--- a/core/src/test/scala/org/apache/spark/sql/delta/MergeIntoMaterializeSourceSuite.scala
+++ b/core/src/test/scala/org/apache/spark/sql/delta/MergeIntoMaterializeSourceSuite.scala
@@ -76,33 +76,36 @@ trait MergeIntoMaterializeSourceTests
   }

-  test("merge logs out of disk errors") {
+  for (eager <- BOOLEAN_DOMAIN)
+  test(s"merge logs out of disk errors - eager=$eager") {
     {
       val tblName = "target"
       withTable(tblName) {
-        val targetDF = spark.range(10).toDF("id").withColumn("value", rand())
-        targetDF.write.format("delta").saveAsTable(tblName)
-        spark.range(10).mapPartitions { x =>
-          throw new java.io.IOException("No space left on device")
-          x
-        }.toDF("id").withColumn("value", rand()).createOrReplaceTempView("s")
-        val events = Log4jUsageLogger.track {
-          val ex = intercept[SparkException] {
-            sql(s"MERGE INTO $tblName t USING s ON t.id = s.id " +
-              s"WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT *")
+        withSQLConf(DeltaSQLConf.MERGE_MATERIALIZE_SOURCE_EAGER.key -> eager.toString) {
+          val targetDF = spark.range(10).toDF("id").withColumn("value", rand())
+          targetDF.write.format("delta").saveAsTable(tblName)
+          spark.range(10).mapPartitions { x =>
+            throw new java.io.IOException("No space left on device")
+            x
+          }.toDF("id").withColumn("value", rand()).createOrReplaceTempView("s")
+          val events = Log4jUsageLogger.track {
+            val ex = intercept[SparkException] {
+              sql(s"MERGE INTO $tblName t USING s ON t.id = s.id " +
+                s"WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT *")
+            }
+          }.filter { e =>
+            e.metric == MetricDefinitions.EVENT_TAHOE.name &&
+              e.tags.get("opType").contains(MergeIntoMaterializeSourceError.OP_TYPE)
+          }
+          assert(events.length == 1)
+          val error = JsonUtils.fromJson[MergeIntoMaterializeSourceError](events(0).blob)
+          assert(error.errorType == MergeIntoMaterializeSourceErrorType.OUT_OF_DISK.toString)
+          assert(error.attempt == 1)
+          val storageLevel = StorageLevel.fromString(
+            spark.conf.get(DeltaSQLConf.MERGE_MATERIALIZE_SOURCE_RDD_STORAGE_LEVEL))
+          assert(error.materializedSourceRDDStorageLevel == storageLevel.toString)
         }
-        }.filter { e =>
-          e.metric == MetricDefinitions.EVENT_TAHOE.name &&
-            e.tags.get("opType").contains(MergeIntoMaterializeSourceError.OP_TYPE)
         }
-        assert(events.length == 1)
-        val error = JsonUtils.fromJson[MergeIntoMaterializeSourceError](events(0).blob)
-        assert(error.errorType == MergeIntoMaterializeSourceErrorType.OUT_OF_DISK.toString)
-        assert(error.attempt == 1)
-        val storageLevel = StorageLevel.fromString(
-          spark.conf.get(DeltaSQLConf.MERGE_MATERIALIZE_SOURCE_RDD_STORAGE_LEVEL))
-        assert(error.materializedSourceRDDStorageLevel == storageLevel.toString)
-      }
     }
   }

@@ -161,10 +164,11 @@ trait MergeIntoMaterializeSourceTests
             spark.conf.get(DeltaSQLConf.MERGE_MATERIALIZE_SOURCE_RDD_STORAGE_LEVEL_RETRY)
           }
         )
-        if (rdd.getStorageLevel != expectedStorageLevel) {
+        val rddStorageLevel = rdd.getStorageLevel
+        if (rddStorageLevel != expectedStorageLevel) {
          invalidStorageLevel =
            Some(s"For attempt ${seenSources.size} of materialized source expected " +
-              s"$expectedStorageLevel but got ${rdd.getStorageLevel}")
+              s"$expectedStorageLevel but got ${rddStorageLevel}")
          finished = true
        }
        logInfo(s"Unpersisting mergeMaterializedSource with id=$rddId")
@@ -209,36 +213,48 @@ trait MergeIntoMaterializeSourceTests
     val tblName = "target"

     // For 1 to maxAttempts - 1 RDD block lost failures, merge should retry and succeed.
-    (1 to maxAttempts - 1).foreach { kills =>
-      test(s"materialize source unpersist with $kills kill attempts succeeds") {
+    for {
+      eager <- BOOLEAN_DOMAIN
+      kills <- 1 to maxAttempts - 1
+    } {
+      test(s"materialize source unpersist with $kills kill attempts succeeds - eager=$eager") {
         withTable(tblName) {
-          val allDeltaEvents = testMergeMaterializedSourceUnpersist(tblName, kills)
-          val events = allDeltaEvents.filter(_.tags.get("opType").contains("delta.dml.merge.stats"))
-          assert(events.length == 1, s"allDeltaEvents:\n$allDeltaEvents")
-          val mergeStats = JsonUtils.fromJson[MergeStats](events(0).blob)
-          assert(mergeStats.materializeSourceAttempts.isDefined,
-            s"MergeStats:\n$mergeStats")
-          assert(mergeStats.materializeSourceAttempts.get == kills + 1,
-            s"MergeStats:\n$mergeStats")
-
-          // Check query result after merge
-          val tab = sql(s"select * from $tblName order by id")
-            .collect().map(row => row.getLong(0)).toSeq
-          assert(tab == (0L until 90L) ++ (100L until 120L))
+          withSQLConf(DeltaSQLConf.MERGE_MATERIALIZE_SOURCE_EAGER.key -> eager.toString) {
+            val allDeltaEvents = testMergeMaterializedSourceUnpersist(tblName, kills)
+            val events =
+              allDeltaEvents.filter(_.tags.get("opType").contains("delta.dml.merge.stats"))
+            assert(events.length == 1, s"allDeltaEvents:\n$allDeltaEvents")
+            val mergeStats = JsonUtils.fromJson[MergeStats](events(0).blob)
+            assert(mergeStats.materializeSourceAttempts.isDefined, s"MergeStats:\n$mergeStats")
+            assert(
+              mergeStats.materializeSourceAttempts.get == kills + 1,
+              s"MergeStats:\n$mergeStats")
+
+            // Check query result after merge
+            val tab = sql(s"select * from $tblName order by id")
+              .collect()
+              .map(row => row.getLong(0))
+              .toSeq
+            assert(tab == (0L until 90L) ++ (100L until 120L))
+          }
         }
       }
     }

     // Eventually it should fail after exceeding maximum number of attempts.
-    test(s"materialize source unpersist with $maxAttempts kill attempts fails") {
-      withTable(tblName) {
-        val allDeltaEvents = testMergeMaterializedSourceUnpersist(tblName, maxAttempts)
-        val events = allDeltaEvents
-          .filter(_.tags.get("opType").contains(MergeIntoMaterializeSourceError.OP_TYPE))
-        assert(events.length == 1, s"allDeltaEvents:\n$allDeltaEvents")
-        val error = JsonUtils.fromJson[MergeIntoMaterializeSourceError](events(0).blob)
-        assert(error.errorType == MergeIntoMaterializeSourceErrorType.RDD_BLOCK_LOST.toString)
-        assert(error.attempt == maxAttempts)
+    for (eager <- BOOLEAN_DOMAIN) {
+      test(s"materialize source unpersist with $maxAttempts kill attempts fails - eager=$eager") {
+        withSQLConf(DeltaSQLConf.MERGE_MATERIALIZE_SOURCE_EAGER.key -> eager.toString) {
+          withTable(tblName) {
+            val allDeltaEvents = testMergeMaterializedSourceUnpersist(tblName, maxAttempts)
+            val events = allDeltaEvents
+              .filter(_.tags.get("opType").contains(MergeIntoMaterializeSourceError.OP_TYPE))
+            assert(events.length == 1, s"allDeltaEvents:\n$allDeltaEvents")
+            val error = JsonUtils.fromJson[MergeIntoMaterializeSourceError](events(0).blob)
+            assert(error.errorType == MergeIntoMaterializeSourceErrorType.RDD_BLOCK_LOST.toString)
+            assert(error.attempt == maxAttempts)
+          }
+        }
       }
     }
   }
@@ -292,14 +308,17 @@ trait MergeIntoMaterializeSourceTests
     hints
   }

-  test("materialize source preserves dataframe hints") {
+  for (eager <- BOOLEAN_DOMAIN)
+  test(s"materialize source preserves dataframe hints - eager=$eager") {
     withTable("A", "B", "T") {
       sql("select id, id as v from range(50000)").write.format("delta").saveAsTable("T")
       sql("select id, id+2 as v from range(10000)").write.format("csv").saveAsTable("A")
       sql("select id, id*2 as v from range(1000)").write.format("csv").saveAsTable("B")

       // Manually added broadcast hint will mess up the expected hints hence disable it
-      withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
+      withSQLConf(
+        DeltaSQLConf.MERGE_MATERIALIZE_SOURCE_EAGER.key -> eager.toString,
+        SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
         // Simple BROADCAST hint
         val hSimple = getHints(
           sql("MERGE INTO T USING (SELECT /*+ BROADCAST */ * FROM A) s ON T.id = s.id" +