diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/AutoCompactSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/AutoCompactSuite.scala index 01467883054..87f35ff0cc9 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/AutoCompactSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/AutoCompactSuite.scala @@ -21,6 +21,7 @@ import java.io.File // scalastyle:off import.ordering.noEmptyLine import com.databricks.spark.util.{Log4jUsageLogger, UsageRecord} +import org.apache.spark.sql.delta.DeltaExcludedBySparkVersionTestMixinShims import org.apache.spark.sql.delta.actions.AddFile import org.apache.spark.sql.delta.commands.optimize._ import org.apache.spark.sql.delta.hooks.{AutoCompact, AutoCompactType} @@ -32,7 +33,7 @@ import org.apache.spark.sql.delta.test.DeltaTestImplicits._ import org.apache.spark.sql.delta.util.JsonUtils import org.apache.hadoop.fs.Path -import org.apache.spark.sql.Column +import org.apache.spark.sql.{Column, DataFrame} import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.expressions.Literal import org.apache.spark.sql.test.SharedSparkSession @@ -60,7 +61,8 @@ class AutoCompactSuite extends CompactionTestHelperForAutoCompaction with DeltaSQLCommandTest with SharedSparkSession - with AutoCompactTestUtils { + with AutoCompactTestUtils + with DeltaExcludedBySparkVersionTestMixinShims { test("auto-compact-type: test table properties") { withTempDir { tempDir => @@ -184,15 +186,20 @@ class AutoCompactSuite extends } } - private def checkAutoCompactionWorks(dir: String): Unit = { - spark.range(10).write.format("delta").mode("append").save(dir) + /** + * Writes `df` twice to the same location and checks that + * 1. There is only one resultant file. + * 2. The result is equal to `df` unioned with itself. + */ + private def checkAutoCompactionWorks(dir: String, df: DataFrame): Unit = { + df.write.format("delta").mode("append").save(dir) val deltaLog = DeltaLog.forTable(spark, dir) val newSnapshot = deltaLog.update() assert(newSnapshot.version === 1) // 0 is the first commit, 1 is optimize assert(deltaLog.update().numOfFiles === 1) val isLogged = checkAutoOptimizeLogging { - spark.range(10).write.format("delta").mode("append").save(dir) + df.write.format("delta").mode("append").save(dir) } assert(isLogged) @@ -202,17 +209,40 @@ class AutoCompactSuite extends assert(deltaLog.update().numOfFiles === 1, "Files should be optimized into a single one") checkAnswer( - spark.range(10).union(spark.range(10)).toDF(), + df.union(df).toDF(), spark.read.format("delta").load(dir) ) } testBothModesViaProperty("auto compact should kick in when enabled - table config") { dir => - checkAutoCompactionWorks(dir) + checkAutoCompactionWorks(dir, spark.range(10).toDF("id")) } testBothModesViaConf("auto compact should kick in when enabled - session config") { dir => - checkAutoCompactionWorks(dir) + checkAutoCompactionWorks(dir, spark.range(10).toDF("id")) + } + + testSparkMasterOnly("variant auto compact kicks in when enabled - table config") { + withTempDir { dir => + withSQLConf( + "spark.databricks.delta.properties.defaults.autoOptimize.autoCompact" -> "true", + DeltaSQLConf.DELTA_AUTO_COMPACT_MIN_NUM_FILES.key -> "0", + DeltaSQLConf.DELTA_AUTO_COMPACT_MODIFIED_PARTITIONS_ONLY_ENABLED.key -> "false") { + checkAutoCompactionWorks( + dir.getCanonicalPath, spark.range(10).selectExpr("parse_json(cast(id as string)) as v")) + } + } + } + + testSparkMasterOnly("variant auto compact kicks in when enabled - session config") { + withTempDir { dir => + withSQLConf( + DeltaSQLConf.DELTA_AUTO_COMPACT_ENABLED.key -> "true", + DeltaSQLConf.DELTA_AUTO_COMPACT_MIN_NUM_FILES.key -> "0") { + checkAutoCompactionWorks( + dir.getCanonicalPath, spark.range(10).selectExpr("parse_json(cast(id as string)) as v")) + } + } } testBothModesViaProperty("auto compact should not kick in when session config is off") { dir => diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/deletionvectors/DeletionVectorsSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/deletionvectors/DeletionVectorsSuite.scala index 4f31a1761d0..a500cd515a9 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/deletionvectors/DeletionVectorsSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/deletionvectors/DeletionVectorsSuite.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.delta.deletionvectors import java.io.{File, FileNotFoundException} import java.net.URISyntaxException -import org.apache.spark.sql.delta.{DeletionVectorsTableFeature, DeletionVectorsTestUtils, DeltaChecksumException, DeltaConfigs, DeltaLog, DeltaMetricsUtils, DeltaTestUtilsForTempViews} +import org.apache.spark.sql.delta.{DeletionVectorsTableFeature, DeletionVectorsTestUtils, DeltaChecksumException, DeltaConfigs, DeltaExcludedBySparkVersionTestMixinShims, DeltaLog, DeltaMetricsUtils, DeltaTestUtilsForTempViews} import org.apache.spark.sql.delta.DeltaTestUtils.createTestAddFile import org.apache.spark.sql.delta.actions.{AddFile, DeletionVectorDescriptor, RemoveFile} import org.apache.spark.sql.delta.actions.DeletionVectorDescriptor.EMPTY @@ -46,7 +46,8 @@ class DeletionVectorsSuite extends QueryTest with DeltaSQLCommandTest with DeletionVectorsTestUtils with DeltaTestUtilsForTempViews - with DeltaExceptionTestUtils { + with DeltaExceptionTestUtils + with DeltaExcludedBySparkVersionTestMixinShims { import testImplicits._ override def beforeAll(): Unit = { @@ -270,7 +271,7 @@ class DeletionVectorsSuite extends QueryTest } } - Seq("name", "id").foreach(mode => + Seq("name", "id").foreach { mode => test(s"DELETE with DVs with column mapping mode=$mode") { withSQLConf("spark.databricks.delta.properties.defaults.columnMapping.mode" -> mode) { withTempDir { dirName => @@ -286,7 +287,26 @@ class DeletionVectorsSuite extends QueryTest } } } - ) + + testSparkMasterOnly(s"variant types DELETE with DVs with column mapping mode=$mode") { + withSQLConf("spark.databricks.delta.properties.defaults.columnMapping.mode" -> mode) { + withTempDir { dirName => + val path = dirName.getAbsolutePath + val df = spark.range(0, 50).selectExpr( + "id % 10 as part", + "id", + "parse_json(cast(id as string)) as v" + ) + df.write.format("delta").partitionBy("part").save(path) + val tableLog = DeltaLog.forTable(spark, path) + enableDeletionVectorsInTable(tableLog, true) + spark.sql(s"DELETE FROM delta.`$path` WHERE v::int = 2") + checkAnswer(spark.sql(s"select * from delta.`$path` WHERE v::int = 2"), Seq()) + verifyDVsExist(tableLog, 1) + } + } + } + } test("DELETE with DVs - existing table already has DVs") { withSQLConf(DeltaSQLConf.DELETE_USE_PERSISTENT_DELETION_VECTORS.key -> "true") {