Commit

init
richardc-db committed May 20, 2024
1 parent 57df2c0 commit 01c9241
Showing 2 changed files with 62 additions and 12 deletions.
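Both suites in this commit mix in DeltaExcludedBySparkVersionTestMixinShims, which provides testSparkMasterOnly so the new variant-type tests only run against Spark master builds (where parse_json and the variant type are available). As orientation only, a minimal sketch of what such a shim can look like; the trait name and the isSparkMasterBuild flag below are illustrative assumptions, not the repo's actual implementation:

```scala
// Illustrative sketch only; the real trait lives elsewhere in the Delta repo
// and may differ in name, wiring, and how the Spark version is detected.
import org.scalatest.funsuite.AnyFunSuiteLike

trait SparkMasterOnlyShimSketch extends AnyFunSuiteLike {
  // Assumption: the build exposes whether we compile against Spark master,
  // e.g. via a build-time flag or a version check.
  protected def isSparkMasterBuild: Boolean = false

  /** Runs `body` as a normal test on Spark master builds, otherwise registers it as ignored. */
  def testSparkMasterOnly(name: String)(body: => Unit): Unit = {
    if (isSparkMasterBuild) test(name)(body) else ignore(name)(body)
  }
}
```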
AutoCompactSuite.scala
@@ -21,6 +21,7 @@ import java.io.File

// scalastyle:off import.ordering.noEmptyLine
import com.databricks.spark.util.{Log4jUsageLogger, UsageRecord}
import org.apache.spark.sql.delta.DeltaExcludedBySparkVersionTestMixinShims
import org.apache.spark.sql.delta.actions.AddFile
import org.apache.spark.sql.delta.commands.optimize._
import org.apache.spark.sql.delta.hooks.{AutoCompact, AutoCompactType}
@@ -32,7 +33,7 @@ import org.apache.spark.sql.delta.test.DeltaTestImplicits._
import org.apache.spark.sql.delta.util.JsonUtils
import org.apache.hadoop.fs.Path

import org.apache.spark.sql.Column
import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.test.SharedSparkSession
@@ -60,7 +61,8 @@ class AutoCompactSuite extends
CompactionTestHelperForAutoCompaction
with DeltaSQLCommandTest
with SharedSparkSession
with AutoCompactTestUtils {
with AutoCompactTestUtils
with DeltaExcludedBySparkVersionTestMixinShims {

test("auto-compact-type: test table properties") {
withTempDir { tempDir =>
@@ -184,15 +186,20 @@ class AutoCompactSuite extends
}
}

private def checkAutoCompactionWorks(dir: String): Unit = {
spark.range(10).write.format("delta").mode("append").save(dir)
/**
* Writes `df` twice to the same location and checks that
* 1. There is only one resultant file.
* 2. The result is equal to `df` unioned with itself.
*/
private def checkAutoCompactionWorks(dir: String, df: DataFrame): Unit = {
df.write.format("delta").mode("append").save(dir)
val deltaLog = DeltaLog.forTable(spark, dir)
val newSnapshot = deltaLog.update()
assert(newSnapshot.version === 1) // 0 is the first commit, 1 is optimize
assert(deltaLog.update().numOfFiles === 1)

val isLogged = checkAutoOptimizeLogging {
spark.range(10).write.format("delta").mode("append").save(dir)
df.write.format("delta").mode("append").save(dir)
}

assert(isLogged)
@@ -202,17 +209,40 @@

assert(deltaLog.update().numOfFiles === 1, "Files should be optimized into a single one")
checkAnswer(
spark.range(10).union(spark.range(10)).toDF(),
df.union(df).toDF(),
spark.read.format("delta").load(dir)
)
}

testBothModesViaProperty("auto compact should kick in when enabled - table config") { dir =>
checkAutoCompactionWorks(dir)
checkAutoCompactionWorks(dir, spark.range(10).toDF("id"))
}

testBothModesViaConf("auto compact should kick in when enabled - session config") { dir =>
checkAutoCompactionWorks(dir)
checkAutoCompactionWorks(dir, spark.range(10).toDF("id"))
}

testSparkMasterOnly("variant auto compact kicks in when enabled - table config") {
withTempDir { dir =>
withSQLConf(
"spark.databricks.delta.properties.defaults.autoOptimize.autoCompact" -> "true",
DeltaSQLConf.DELTA_AUTO_COMPACT_MIN_NUM_FILES.key -> "0",
DeltaSQLConf.DELTA_AUTO_COMPACT_MODIFIED_PARTITIONS_ONLY_ENABLED.key -> "false") {
checkAutoCompactionWorks(
dir.getCanonicalPath, spark.range(10).selectExpr("parse_json(cast(id as string)) as v"))
}
}
}

testSparkMasterOnly("variant auto compact kicks in when enabled - session config") {
withTempDir { dir =>
withSQLConf(
DeltaSQLConf.DELTA_AUTO_COMPACT_ENABLED.key -> "true",
DeltaSQLConf.DELTA_AUTO_COMPACT_MIN_NUM_FILES.key -> "0") {
checkAutoCompactionWorks(
dir.getCanonicalPath, spark.range(10).selectExpr("parse_json(cast(id as string)) as v"))
}
}
}

testBothModesViaProperty("auto compact should not kick in when session config is off") { dir =>
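The hunks above refactor checkAutoCompactionWorks to accept the DataFrame under test and add two testSparkMasterOnly cases that feed it variant data built with parse_json(cast(id as string)). For trying the same write pattern outside the suite, a standalone sketch follows; the local session setup, the /tmp path, and running it as a main method are illustrative assumptions, and the string conf keys are assumed to be what DELTA_AUTO_COMPACT_ENABLED and DELTA_AUTO_COMPACT_MIN_NUM_FILES resolve to:

```scala
// Illustrative sketch, not part of the diff. Assumes a Spark master build
// (for parse_json/VARIANT) and delta-spark on the classpath.
import org.apache.spark.sql.SparkSession

object VariantAutoCompactSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("variant-auto-compact-sketch")
      .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
      .config("spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.delta.catalog.DeltaCatalog")
      // Enable auto compaction for this session and let it fire on any file count.
      .config("spark.databricks.delta.autoCompact.enabled", "true")
      .config("spark.databricks.delta.autoCompact.minNumFiles", "0")
      .getOrCreate()

    val path = "/tmp/variant-auto-compact-sketch" // illustrative location
    val df = spark.range(10).selectExpr("parse_json(cast(id as string)) as v")

    // Two appends to the same table; with auto compaction on, each append
    // commit should be followed by a compaction commit, leaving one data file.
    df.write.format("delta").mode("append").save(path)
    df.write.format("delta").mode("append").save(path)

    spark.read.format("delta").load(path).show(truncate = false)
    spark.stop()
  }
}
```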
DeletionVectorsSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.delta.deletionvectors
import java.io.{File, FileNotFoundException}
import java.net.URISyntaxException

import org.apache.spark.sql.delta.{DeletionVectorsTableFeature, DeletionVectorsTestUtils, DeltaChecksumException, DeltaConfigs, DeltaLog, DeltaMetricsUtils, DeltaTestUtilsForTempViews}
import org.apache.spark.sql.delta.{DeletionVectorsTableFeature, DeletionVectorsTestUtils, DeltaChecksumException, DeltaConfigs, DeltaExcludedBySparkVersionTestMixinShims, DeltaLog, DeltaMetricsUtils, DeltaTestUtilsForTempViews}
import org.apache.spark.sql.delta.DeltaTestUtils.createTestAddFile
import org.apache.spark.sql.delta.actions.{AddFile, DeletionVectorDescriptor, RemoveFile}
import org.apache.spark.sql.delta.actions.DeletionVectorDescriptor.EMPTY
@@ -46,7 +46,8 @@ class DeletionVectorsSuite extends QueryTest
with DeltaSQLCommandTest
with DeletionVectorsTestUtils
with DeltaTestUtilsForTempViews
with DeltaExceptionTestUtils {
with DeltaExceptionTestUtils
with DeltaExcludedBySparkVersionTestMixinShims {
import testImplicits._

override def beforeAll(): Unit = {
@@ -270,7 +271,7 @@ class DeletionVectorsSuite extends QueryTest
}
}

Seq("name", "id").foreach(mode =>
Seq("name", "id").foreach { mode =>
test(s"DELETE with DVs with column mapping mode=$mode") {
withSQLConf("spark.databricks.delta.properties.defaults.columnMapping.mode" -> mode) {
withTempDir { dirName =>
@@ -286,7 +287,26 @@
}
}
}
)

testSparkMasterOnly(s"variant types DELETE with DVs with column mapping mode=$mode") {
withSQLConf("spark.databricks.delta.properties.defaults.columnMapping.mode" -> mode) {
withTempDir { dirName =>
val path = dirName.getAbsolutePath
val df = spark.range(0, 50).selectExpr(
"id % 10 as part",
"id",
"parse_json(cast(id as string)) as v"
)
df.write.format("delta").partitionBy("part").save(path)
val tableLog = DeltaLog.forTable(spark, path)
enableDeletionVectorsInTable(tableLog, true)
spark.sql(s"DELETE FROM delta.`$path` WHERE v::int = 2")
checkAnswer(spark.sql(s"select * from delta.`$path` WHERE v::int = 2"), Seq())
verifyDVsExist(tableLog, 1)
}
}
}
}

test("DELETE with DVs - existing table already has DVs") {
withSQLConf(DeltaSQLConf.DELETE_USE_PERSISTENT_DELETION_VECTORS.key -> "true") {
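The new DeletionVectorsSuite test writes a partitioned table with a variant column, enables DVs, and deletes rows by casting the variant back to int (v::int = 2); deletion vectors mask the deleted rows instead of rewriting the data files. A standalone sketch of the same flow; the session setup and path are illustrative, delta.enableDeletionVectors is the documented table property, and the :: cast plus parse_json assume a Spark master build:

```scala
// Illustrative sketch, not part of the diff. Assumes a Spark master build
// (variant support) and delta-spark with deletion vectors available.
import org.apache.spark.sql.SparkSession

object VariantDeleteWithDVSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("variant-dv-delete-sketch")
      .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
      .config("spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.delta.catalog.DeltaCatalog")
      .getOrCreate()

    val path = "/tmp/variant-dv-delete-sketch" // illustrative location
    spark.range(0, 50)
      .selectExpr("id % 10 as part", "id", "parse_json(cast(id as string)) as v")
      .write.format("delta").partitionBy("part").save(path)

    // Turn on deletion vectors for the table, then delete by the variant value.
    spark.sql(s"ALTER TABLE delta.`$path` " +
      "SET TBLPROPERTIES ('delta.enableDeletionVectors' = 'true')")
    spark.sql(s"DELETE FROM delta.`$path` WHERE v::int = 2")

    // The deleted row is no longer visible, even though the underlying data
    // file was not rewritten; it is masked by a deletion vector.
    spark.sql(s"SELECT * FROM delta.`$path` WHERE v::int = 2").show()
    spark.stop()
  }
}
```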
