diff --git a/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/PaimonSinkTest.scala b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/PaimonSinkTest.scala index ab4a9bcd9dbf..5c4077180bb3 100644 --- a/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/PaimonSinkTest.scala +++ b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/PaimonSinkTest.scala @@ -29,51 +29,55 @@ import java.sql.Date class PaimonSinkTest extends PaimonSparkTestBase with StreamTest { override protected def sparkConf: SparkConf = { - super.sparkConf.set("spark.sql.catalog.paimon.cache-enabled", "false") + super.sparkConf + .set("spark.sql.catalog.paimon.cache-enabled", "false") + .set("spark.paimon.write.use-v2-write", "false") } import testImplicits._ test("Paimon Sink: forEachBatch") { - failAfter(streamingTimeout) { - withTempDir { - checkpointDir => - // define a pk table and test `forEachBatch` api - spark.sql(s""" - |CREATE TABLE T (a INT, b STRING) - |TBLPROPERTIES ('primary-key'='a', 'bucket'='3') - |""".stripMargin) - val location = loadTable("T").location().toString - - val inputData = MemoryStream[(Int, String)] - val stream = inputData - .toDS() - .toDF("a", "b") - .writeStream - .option("checkpointLocation", checkpointDir.getCanonicalPath) - .foreachBatch { - (batch: Dataset[Row], id: Long) => - batch.write.format("paimon").mode("append").save(location) + withSparkSQLConf(("spark.paimon.write.use-v2-write", "false")) { + failAfter(streamingTimeout) { + withTempDir { + checkpointDir => + // define a pk table and test `forEachBatch` api + spark.sql(s""" + |CREATE TABLE T (a INT, b STRING) + |TBLPROPERTIES ('primary-key'='a', 'bucket'='3') + |""".stripMargin) + val location = loadTable("T").location().toString + + val inputData = MemoryStream[(Int, String)] + val stream = inputData + .toDS() + .toDF("a", "b") + .writeStream + .option("checkpointLocation", checkpointDir.getCanonicalPath) + .foreachBatch { + (batch: Dataset[Row], id: Long) => + batch.write.format("paimon").mode("append").save(location) + } + .start() + + val query = () => spark.sql("SELECT * FROM T ORDER BY a") + + try { + inputData.addData((1, "a")) + stream.processAllAvailable() + checkAnswer(query(), Row(1, "a") :: Nil) + + inputData.addData((2, "b")) + stream.processAllAvailable() + checkAnswer(query(), Row(1, "a") :: Row(2, "b") :: Nil) + + inputData.addData((2, "b2")) + stream.processAllAvailable() + checkAnswer(query(), Row(1, "a") :: Row(2, "b2") :: Nil) + } finally { + stream.stop() } - .start() - - val query = () => spark.sql("SELECT * FROM T ORDER BY a") - - try { - inputData.addData((1, "a")) - stream.processAllAvailable() - checkAnswer(query(), Row(1, "a") :: Nil) - - inputData.addData((2, "b")) - stream.processAllAvailable() - checkAnswer(query(), Row(1, "a") :: Row(2, "b") :: Nil) - - inputData.addData((2, "b2")) - stream.processAllAvailable() - checkAnswer(query(), Row(1, "a") :: Row(2, "b2") :: Nil) - } finally { - stream.stop() - } + } } } } diff --git a/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/CreateAndDeleteTagProcedureTest.scala b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/CreateAndDeleteTagProcedureTest.scala index 3f59e897ec6c..2ee09f749e45 100644 --- a/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/CreateAndDeleteTagProcedureTest.scala +++ b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/CreateAndDeleteTagProcedureTest.scala @@ -29,107 
+29,109 @@ class CreateAndDeleteTagProcedureTest extends PaimonSparkTestBase with StreamTes import testImplicits._ test("Paimon Procedure: create and delete tag") { - failAfter(streamingTimeout) { - withTempDir { - checkpointDir => - // define a pk table and test `forEachBatch` api - spark.sql(s""" - |CREATE TABLE T (a INT, b STRING) - |TBLPROPERTIES ('primary-key'='a', 'bucket'='3') - |""".stripMargin) - val location = loadTable("T").location().toString - - val inputData = MemoryStream[(Int, String)] - val stream = inputData - .toDS() - .toDF("a", "b") - .writeStream - .option("checkpointLocation", checkpointDir.getCanonicalPath) - .foreachBatch { - (batch: Dataset[Row], _: Long) => - batch.write.format("paimon").mode("append").save(location) + withSparkSQLConf(("spark.paimon.write.use-v2-write", "false")) { + failAfter(streamingTimeout) { + withTempDir { + checkpointDir => + // define a pk table and test `forEachBatch` api + spark.sql(s""" + |CREATE TABLE T (a INT, b STRING) + |TBLPROPERTIES ('primary-key'='a', 'bucket'='3') + |""".stripMargin) + val location = loadTable("T").location().toString + + val inputData = MemoryStream[(Int, String)] + val stream = inputData + .toDS() + .toDF("a", "b") + .writeStream + .option("checkpointLocation", checkpointDir.getCanonicalPath) + .foreachBatch { + (batch: Dataset[Row], _: Long) => + batch.write.format("paimon").mode("append").save(location) + } + .start() + + val query = () => spark.sql("SELECT * FROM T ORDER BY a") + + try { + // snapshot-1 + inputData.addData((1, "a")) + stream.processAllAvailable() + checkAnswer(query(), Row(1, "a") :: Nil) + + // snapshot-2 + inputData.addData((2, "b")) + stream.processAllAvailable() + checkAnswer(query(), Row(1, "a") :: Row(2, "b") :: Nil) + + // snapshot-3 + inputData.addData((2, "b2")) + stream.processAllAvailable() + checkAnswer(query(), Row(1, "a") :: Row(2, "b2") :: Nil) + checkAnswer( + spark.sql( + "CALL paimon.sys.create_tag(table => 'test.T', tag => 'test_tag', snapshot => 2)"), + Row(true) :: Nil) + checkAnswer( + spark.sql("SELECT tag_name FROM paimon.test.`T$tags`"), + Row("test_tag") :: Nil) + // test rename_tag + checkAnswer( + spark.sql( + "CALL paimon.sys.rename_tag(table => 'test.T', tag => 'test_tag', target_tag => 'test_tag_1')"), + Row(true) :: Nil) + checkAnswer( + spark.sql("SELECT tag_name FROM paimon.test.`T$tags`"), + Row("test_tag_1") :: Nil) + checkAnswer( + spark.sql("CALL paimon.sys.delete_tag(table => 'test.T', tag => 'test_tag_1')"), + Row(true) :: Nil) + checkAnswer(spark.sql("SELECT tag_name FROM paimon.test.`T$tags`"), Nil) + checkAnswer( + spark.sql( + "CALL paimon.sys.create_tag(table => 'test.T', tag => 'test_latestSnapshot_tag')"), + Row(true) :: Nil) + checkAnswer( + spark.sql("SELECT tag_name FROM paimon.test.`T$tags`"), + Row("test_latestSnapshot_tag") :: Nil) + checkAnswer( + spark.sql( + "CALL paimon.sys.delete_tag(table => 'test.T', tag => 'test_latestSnapshot_tag')"), + Row(true) :: Nil) + checkAnswer(spark.sql("SELECT tag_name FROM paimon.test.`T$tags`"), Nil) + + // snapshot-4 + inputData.addData((2, "c1")) + stream.processAllAvailable() + checkAnswer(query(), Row(1, "a") :: Row(2, "c1") :: Nil) + + checkAnswer( + spark.sql("CALL paimon.sys.create_tag(table => 'test.T', tag => 's4')"), + Row(true) :: Nil) + + // snapshot-5 + inputData.addData((3, "c2")) + stream.processAllAvailable() + checkAnswer(query(), Row(1, "a") :: Row(2, "c1") :: Row(3, "c2") :: Nil) + + checkAnswer( + spark.sql("CALL paimon.sys.create_tag(table => 'test.T', tag => 's5')"), + Row(true) 
:: Nil) + + checkAnswer( + spark.sql("SELECT tag_name FROM paimon.test.`T$tags`"), + Row("s4") :: Row("s5") :: Nil) + + checkAnswer( + spark.sql("CALL paimon.sys.delete_tag(table => 'test.T', tag => 's4,s5')"), + Row(true) :: Nil) + + checkAnswer(spark.sql("SELECT tag_name FROM paimon.test.`T$tags`"), Nil) + } finally { + stream.stop() } - .start() - - val query = () => spark.sql("SELECT * FROM T ORDER BY a") - - try { - // snapshot-1 - inputData.addData((1, "a")) - stream.processAllAvailable() - checkAnswer(query(), Row(1, "a") :: Nil) - - // snapshot-2 - inputData.addData((2, "b")) - stream.processAllAvailable() - checkAnswer(query(), Row(1, "a") :: Row(2, "b") :: Nil) - - // snapshot-3 - inputData.addData((2, "b2")) - stream.processAllAvailable() - checkAnswer(query(), Row(1, "a") :: Row(2, "b2") :: Nil) - checkAnswer( - spark.sql( - "CALL paimon.sys.create_tag(table => 'test.T', tag => 'test_tag', snapshot => 2)"), - Row(true) :: Nil) - checkAnswer( - spark.sql("SELECT tag_name FROM paimon.test.`T$tags`"), - Row("test_tag") :: Nil) - // test rename_tag - checkAnswer( - spark.sql( - "CALL paimon.sys.rename_tag(table => 'test.T', tag => 'test_tag', target_tag => 'test_tag_1')"), - Row(true) :: Nil) - checkAnswer( - spark.sql("SELECT tag_name FROM paimon.test.`T$tags`"), - Row("test_tag_1") :: Nil) - checkAnswer( - spark.sql("CALL paimon.sys.delete_tag(table => 'test.T', tag => 'test_tag_1')"), - Row(true) :: Nil) - checkAnswer(spark.sql("SELECT tag_name FROM paimon.test.`T$tags`"), Nil) - checkAnswer( - spark.sql( - "CALL paimon.sys.create_tag(table => 'test.T', tag => 'test_latestSnapshot_tag')"), - Row(true) :: Nil) - checkAnswer( - spark.sql("SELECT tag_name FROM paimon.test.`T$tags`"), - Row("test_latestSnapshot_tag") :: Nil) - checkAnswer( - spark.sql( - "CALL paimon.sys.delete_tag(table => 'test.T', tag => 'test_latestSnapshot_tag')"), - Row(true) :: Nil) - checkAnswer(spark.sql("SELECT tag_name FROM paimon.test.`T$tags`"), Nil) - - // snapshot-4 - inputData.addData((2, "c1")) - stream.processAllAvailable() - checkAnswer(query(), Row(1, "a") :: Row(2, "c1") :: Nil) - - checkAnswer( - spark.sql("CALL paimon.sys.create_tag(table => 'test.T', tag => 's4')"), - Row(true) :: Nil) - - // snapshot-5 - inputData.addData((3, "c2")) - stream.processAllAvailable() - checkAnswer(query(), Row(1, "a") :: Row(2, "c1") :: Row(3, "c2") :: Nil) - - checkAnswer( - spark.sql("CALL paimon.sys.create_tag(table => 'test.T', tag => 's5')"), - Row(true) :: Nil) - - checkAnswer( - spark.sql("SELECT tag_name FROM paimon.test.`T$tags`"), - Row("s4") :: Row("s5") :: Nil) - - checkAnswer( - spark.sql("CALL paimon.sys.delete_tag(table => 'test.T', tag => 's4,s5')"), - Row(true) :: Nil) - - checkAnswer(spark.sql("SELECT tag_name FROM paimon.test.`T$tags`"), Nil) - } finally { - stream.stop() - } + } } } } diff --git a/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/RollbackProcedureTest.scala b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/RollbackProcedureTest.scala index 605b4cadf6e8..6ba778dde179 100644 --- a/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/RollbackProcedureTest.scala +++ b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/RollbackProcedureTest.scala @@ -29,68 +29,70 @@ class RollbackProcedureTest extends PaimonSparkTestBase with StreamTest { import testImplicits._ test("Paimon Procedure: rollback to snapshot and tag") { - failAfter(streamingTimeout) { - withTempDir { - checkpointDir => - // 
define a pk table and test `forEachBatch` api - spark.sql(s""" - |CREATE TABLE T (a INT, b STRING) - |TBLPROPERTIES ('primary-key'='a', 'bucket'='3') - |""".stripMargin) - val location = loadTable("T").location().toString + withSparkSQLConf(("spark.paimon.write.use-v2-write", "false")) { + failAfter(streamingTimeout) { + withTempDir { + checkpointDir => + // define a pk table and test `forEachBatch` api + spark.sql(s""" + |CREATE TABLE T (a INT, b STRING) + |TBLPROPERTIES ('primary-key'='a', 'bucket'='3') + |""".stripMargin) + val location = loadTable("T").location().toString - val inputData = MemoryStream[(Int, String)] - val stream = inputData - .toDS() - .toDF("a", "b") - .writeStream - .option("checkpointLocation", checkpointDir.getCanonicalPath) - .foreachBatch { - (batch: Dataset[Row], _: Long) => - batch.write.format("paimon").mode("append").save(location) - } - .start() + val inputData = MemoryStream[(Int, String)] + val stream = inputData + .toDS() + .toDF("a", "b") + .writeStream + .option("checkpointLocation", checkpointDir.getCanonicalPath) + .foreachBatch { + (batch: Dataset[Row], _: Long) => + batch.write.format("paimon").mode("append").save(location) + } + .start() - val table = loadTable("T") - val query = () => spark.sql("SELECT * FROM T ORDER BY a") + val table = loadTable("T") + val query = () => spark.sql("SELECT * FROM T ORDER BY a") - try { - // snapshot-1 - inputData.addData((1, "a")) - stream.processAllAvailable() - checkAnswer(query(), Row(1, "a") :: Nil) + try { + // snapshot-1 + inputData.addData((1, "a")) + stream.processAllAvailable() + checkAnswer(query(), Row(1, "a") :: Nil) - checkAnswer( - spark.sql( - "CALL paimon.sys.create_tag(table => 'test.T', tag => 'test_tag', snapshot => 1)"), - Row(true) :: Nil) + checkAnswer( + spark.sql( + "CALL paimon.sys.create_tag(table => 'test.T', tag => 'test_tag', snapshot => 1)"), + Row(true) :: Nil) - // snapshot-2 - inputData.addData((2, "b")) - stream.processAllAvailable() - checkAnswer(query(), Row(1, "a") :: Row(2, "b") :: Nil) + // snapshot-2 + inputData.addData((2, "b")) + stream.processAllAvailable() + checkAnswer(query(), Row(1, "a") :: Row(2, "b") :: Nil) - // snapshot-3 - inputData.addData((2, "b2")) - stream.processAllAvailable() - checkAnswer(query(), Row(1, "a") :: Row(2, "b2") :: Nil) - assertThrows[RuntimeException] { - spark.sql("CALL paimon.sys.rollback(table => 'test.T_exception', version => '2')") - } - // rollback to snapshot - checkAnswer( - spark.sql("CALL paimon.sys.rollback(table => 'test.T', version => '2')"), - Row(table.latestSnapshot().get().id, 2) :: Nil) - checkAnswer(query(), Row(1, "a") :: Row(2, "b") :: Nil) + // snapshot-3 + inputData.addData((2, "b2")) + stream.processAllAvailable() + checkAnswer(query(), Row(1, "a") :: Row(2, "b2") :: Nil) + assertThrows[RuntimeException] { + spark.sql("CALL paimon.sys.rollback(table => 'test.T_exception', version => '2')") + } + // rollback to snapshot + checkAnswer( + spark.sql("CALL paimon.sys.rollback(table => 'test.T', version => '2')"), + Row(table.latestSnapshot().get().id, 2) :: Nil) + checkAnswer(query(), Row(1, "a") :: Row(2, "b") :: Nil) - // rollback to tag - checkAnswer( - spark.sql("CALL paimon.sys.rollback(table => 'test.T', version => 'test_tag')"), - Row(table.latestSnapshot().get().id, 1) :: Nil) - checkAnswer(query(), Row(1, "a") :: Nil) - } finally { - stream.stop() - } + // rollback to tag + checkAnswer( + spark.sql("CALL paimon.sys.rollback(table => 'test.T', version => 'test_tag')"), + Row(table.latestSnapshot().get().id, 1) :: 
Nil)
+            checkAnswer(query(), Row(1, "a") :: Nil)
+          } finally {
+            stream.stop()
+          }
+        }
       }
     }
   }
diff --git a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkWriteITCase.java b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkWriteITCase.java
index d889c1ac66b5..522ac290e9a8 100644
--- a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkWriteITCase.java
+++ b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkWriteITCase.java
@@ -346,7 +346,7 @@ public void testWriteWithClustering(String clusterStrategy) {
                         + "'clustering.columns'='a,b',"
                         + String.format("'clustering.strategy'='%s')", clusterStrategy));
         spark.sql("INSERT INTO T VALUES (2, 2), (1, 1), (3, 3)").collectAsList();
-        List<Row> rows = spark.sql("SELECT * FROM T").collectAsList();
+        List<Row> rows = spark.sql("SELECT * FROM T order by a").collectAsList();
         assertThat(rows.toString()).isEqualTo("[[1,1], [2,2], [3,3]]");
     }

diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonCDCSourceTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonCDCSourceTest.scala
index e103429559ba..8dee9c2b841c 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonCDCSourceTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonCDCSourceTest.scala
@@ -134,83 +134,88 @@ class PaimonCDCSourceTest extends PaimonSparkTestBase with StreamTest {
   }

   test("Paimon CDC Source: streaming write and streaming read change-log") {
-    withTempDirs {
-      (checkpointDir1, checkpointDir2) =>
-        val tableName = "T"
-        spark.sql(s"DROP TABLE IF EXISTS $tableName")
-        spark.sql(s"""
-                     |CREATE TABLE $tableName (a INT, b STRING)
-                     |TBLPROPERTIES (
-                     |  'primary-key'='a',
-                     |  'bucket'='2',
-                     |  'changelog-producer' = 'lookup')
-                     |""".stripMargin)
-
-        val table = loadTable(tableName)
-        val location = table.location().toString
+    withSparkSQLConf(("spark.paimon.write.use-v2-write", "false")) {
+      withTempDirs {
+        (checkpointDir1, checkpointDir2) =>
+          val tableName = "T"
+          spark.sql(s"DROP TABLE IF EXISTS $tableName")
+          spark.sql(s"""
+                       |CREATE TABLE $tableName (a INT, b STRING)
+                       |TBLPROPERTIES (
+                       |  'primary-key'='a',
+                       |  'bucket'='2',
+                       |  'changelog-producer' = 'lookup')
+                       |""".stripMargin)

-        // streaming write
-        val inputData = MemoryStream[(Int, String)]
-        val writeStream = inputData
-          .toDS()
-          .toDF("a", "b")
-          .writeStream
-          .option("checkpointLocation", checkpointDir1.getCanonicalPath)
-          .foreachBatch {
-            (batch: Dataset[Row], _: Long) =>
-              batch.write.format("paimon").mode("append").save(location)
-          }
-          .start()
+          val table = loadTable(tableName)
+          val location = table.location().toString

-        // streaming read
-        val readStream = spark.readStream
-          .format("paimon")
-          .option("read.changelog", "true")
-          .option("scan.mode", "from-snapshot")
-          .option("scan.snapshot-id", 1)
-          .load(location)
-          .writeStream
-          .format("memory")
-          .option("checkpointLocation", checkpointDir2.getCanonicalPath)
-          .queryName("mem_table")
-          .outputMode("append")
-          .start()
+          // streaming write
+          val inputData = MemoryStream[(Int, String)]
+          val writeStream = inputData
+            .toDS()
+            .toDF("a", "b")
+            .writeStream
+            .option("checkpointLocation", checkpointDir1.getCanonicalPath)
+            .foreachBatch {
+              (batch: Dataset[Row], _: Long) =>
+                batch.write.format("paimon").mode("append").save(location)
+            }
+            .start()

-        val currentResult = () => spark.sql("SELECT * FROM mem_table")
-        try {
-          inputData.addData((1, "v_1"))
-
writeStream.processAllAvailable() - readStream.processAllAvailable() - val expertResult1 = Row("+I", 1, "v_1") :: Nil - checkAnswer(currentResult(), expertResult1) + // streaming read + val readStream = spark.readStream + .format("paimon") + .option("read.changelog", "true") + .option("scan.mode", "from-snapshot") + .option("scan.snapshot-id", 1) + .load(location) + .writeStream + .format("memory") + .option("checkpointLocation", checkpointDir2.getCanonicalPath) + .queryName("mem_table") + .outputMode("append") + .start() - inputData.addData((2, "v_2")) - writeStream.processAllAvailable() - readStream.processAllAvailable() - val expertResult2 = Row("+I", 1, "v_1") :: Row("+I", 2, "v_2") :: Nil - checkAnswer(currentResult(), expertResult2) + val currentResult = () => spark.sql("SELECT * FROM mem_table") + try { + inputData.addData((1, "v_1")) + writeStream.processAllAvailable() + readStream.processAllAvailable() + val expertResult1 = Row("+I", 1, "v_1") :: Nil + checkAnswer(currentResult(), expertResult1) - inputData.addData((2, "v_2_new")) - writeStream.processAllAvailable() - readStream.processAllAvailable() - val expertResult3 = Row("+I", 1, "v_1") :: Row("+I", 2, "v_2") :: Row( - "-U", - 2, - "v_2") :: Row("+U", 2, "v_2_new") :: Nil - checkAnswer(currentResult(), expertResult3) + inputData.addData((2, "v_2")) + writeStream.processAllAvailable() + readStream.processAllAvailable() + val expertResult2 = Row("+I", 1, "v_1") :: Row("+I", 2, "v_2") :: Nil + checkAnswer(currentResult(), expertResult2) - inputData.addData((1, "v_1_new"), (3, "v_3")) - writeStream.processAllAvailable() - readStream.processAllAvailable() - val expertResult4 = - Row("+I", 1, "v_1") :: Row("-U", 1, "v_1") :: Row("+U", 1, "v_1_new") :: Row( - "+I", + inputData.addData((2, "v_2_new")) + writeStream.processAllAvailable() + readStream.processAllAvailable() + val expertResult3 = Row("+I", 1, "v_1") :: Row("+I", 2, "v_2") :: Row( + "-U", 2, - "v_2") :: Row("-U", 2, "v_2") :: Row("+U", 2, "v_2_new") :: Row("+I", 3, "v_3") :: Nil - checkAnswer(currentResult(), expertResult4) - } finally { - readStream.stop() - } + "v_2") :: Row("+U", 2, "v_2_new") :: Nil + checkAnswer(currentResult(), expertResult3) + + inputData.addData((1, "v_1_new"), (3, "v_3")) + writeStream.processAllAvailable() + readStream.processAllAvailable() + val expertResult4 = + Row("+I", 1, "v_1") :: Row("-U", 1, "v_1") :: Row("+U", 1, "v_1_new") :: Row( + "+I", + 2, + "v_2") :: Row("-U", 2, "v_2") :: Row("+U", 2, "v_2_new") :: Row( + "+I", + 3, + "v_3") :: Nil + checkAnswer(currentResult(), expertResult4) + } finally { + readStream.stop() + } + } } } diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonCommitTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonCommitTest.scala index 8b1958d7de38..b60b9a04978f 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonCommitTest.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonCommitTest.scala @@ -30,30 +30,32 @@ import java.util.List class PaimonCommitTest extends PaimonSparkTestBase { test("test commit callback parameter compatibility") { - withTable("tb") { - spark.sql(""" - |CREATE TABLE tb (id int, dt string) using paimon - |TBLPROPERTIES ('file.format'='parquet', 'primary-key'='id', 'bucket'='1') - |""".stripMargin) + withSparkSQLConf(("spark.paimon.write.use-v2-write", "false")) { + withTable("tb") { + spark.sql(""" + |CREATE TABLE tb (id int, dt string) using paimon + 
|TBLPROPERTIES ('file.format'='parquet', 'primary-key'='id', 'bucket'='1') + |""".stripMargin) - val table = loadTable("tb") - val location = table.location().toString + val table = loadTable("tb") + val location = table.location().toString - val _spark = spark - import _spark.implicits._ - val df = Seq((1, "a"), (2, "b")).toDF("a", "b") - df.write - .format("paimon") - .option(CoreOptions.COMMIT_CALLBACKS.key(), classOf[CustomCommitCallback].getName) - .option( - CoreOptions.COMMIT_CALLBACK_PARAM - .key() - .replace("#", classOf[CustomCommitCallback].getName), - "testid-100") - .mode("append") - .save(location) + val _spark = spark + import _spark.implicits._ + val df = Seq((1, "a"), (2, "b")).toDF("a", "b") + df.write + .format("paimon") + .option(CoreOptions.COMMIT_CALLBACKS.key(), classOf[CustomCommitCallback].getName) + .option( + CoreOptions.COMMIT_CALLBACK_PARAM + .key() + .replace("#", classOf[CustomCommitCallback].getName), + "testid-100") + .mode("append") + .save(location) - Assertions.assertEquals(PaimonCommitTest.id, "testid-100") + Assertions.assertEquals(PaimonCommitTest.id, "testid-100") + } } } } diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSinkTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSinkTest.scala index 61bf5524942d..c46bb46cd1f0 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSinkTest.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSinkTest.scala @@ -29,51 +29,55 @@ import java.sql.Date class PaimonSinkTest extends PaimonSparkTestBase with StreamTest { override protected def sparkConf: SparkConf = { - super.sparkConf.set("spark.sql.catalog.paimon.cache-enabled", "false") + super.sparkConf + .set("spark.sql.catalog.paimon.cache-enabled", "false") + .set("spark.paimon.write.use-v2-write", "false") } import testImplicits._ test("Paimon Sink: forEachBatch") { - failAfter(streamingTimeout) { - withTempDir { - checkpointDir => - // define a change-log table and test `forEachBatch` api - spark.sql(s""" - |CREATE TABLE T (a INT, b STRING) - |TBLPROPERTIES ('primary-key'='a', 'bucket'='3') - |""".stripMargin) - val location = loadTable("T").location().toString - - val inputData = MemoryStream[(Int, String)] - val stream = inputData - .toDS() - .toDF("a", "b") - .writeStream - .option("checkpointLocation", checkpointDir.getCanonicalPath) - .foreachBatch { - (batch: Dataset[Row], id: Long) => - batch.write.format("paimon").mode("append").save(location) + withSparkSQLConf(("spark.paimon.write.use-v2-write", "false")) { + failAfter(streamingTimeout) { + withTempDir { + checkpointDir => + // define a change-log table and test `forEachBatch` api + spark.sql(s""" + |CREATE TABLE T (a INT, b STRING) + |TBLPROPERTIES ('primary-key'='a', 'bucket'='3') + |""".stripMargin) + val location = loadTable("T").location().toString + + val inputData = MemoryStream[(Int, String)] + val stream = inputData + .toDS() + .toDF("a", "b") + .writeStream + .option("checkpointLocation", checkpointDir.getCanonicalPath) + .foreachBatch { + (batch: Dataset[Row], id: Long) => + batch.write.format("paimon").mode("append").save(location) + } + .start() + + val query = () => spark.sql("SELECT * FROM T ORDER BY a") + + try { + inputData.addData((1, "a")) + stream.processAllAvailable() + checkAnswer(query(), Row(1, "a") :: Nil) + + inputData.addData((2, "b")) + stream.processAllAvailable() + checkAnswer(query(), Row(1, "a") :: Row(2, "b") :: Nil) + + 
inputData.addData((2, "b2")) + stream.processAllAvailable() + checkAnswer(query(), Row(1, "a") :: Row(2, "b2") :: Nil) + } finally { + stream.stop() } - .start() - - val query = () => spark.sql("SELECT * FROM T ORDER BY a") - - try { - inputData.addData((1, "a")) - stream.processAllAvailable() - checkAnswer(query(), Row(1, "a") :: Nil) - - inputData.addData((2, "b")) - stream.processAllAvailable() - checkAnswer(query(), Row(1, "a") :: Row(2, "b") :: Nil) - - inputData.addData((2, "b2")) - stream.processAllAvailable() - checkAnswer(query(), Row(1, "a") :: Row(2, "b2") :: Nil) - } finally { - stream.stop() - } + } } } } diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSourceTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSourceTest.scala index 58cf9868dc80..4ccc96ad8b0f 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSourceTest.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSourceTest.scala @@ -20,6 +20,7 @@ package org.apache.paimon.spark import org.apache.paimon.spark.sources.PaimonSourceOffset +import org.apache.spark.SparkConf import org.apache.spark.sql.Row import org.apache.spark.sql.streaming.{StreamingQueryException, StreamTest, Trigger} import org.junit.jupiter.api.Assertions @@ -28,6 +29,11 @@ import java.util.concurrent.TimeUnit class PaimonSourceTest extends PaimonSparkTestBase with StreamTest { + override protected def sparkConf: SparkConf = { + super.sparkConf + .set("spark.paimon.write.use-v2-write", "false") + } + import testImplicits._ test("Paimon Source: default scan mode") { diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/AlterBranchProcedureTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/AlterBranchProcedureTest.scala index 316c36c40c56..5bd0036e3a29 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/AlterBranchProcedureTest.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/AlterBranchProcedureTest.scala @@ -28,70 +28,73 @@ class AlterBranchProcedureTest extends PaimonSparkTestBase with StreamTest { import testImplicits._ test("Paimon Procedure: alter schema structure and test $branch syntax.") { - withTempDir { - checkpointDir => - // define a change-log table and test `forEachBatch` api - spark.sql(s""" - |CREATE TABLE T (a INT, b STRING) - |TBLPROPERTIES ('primary-key'='a', 'bucket'='3') - |""".stripMargin) - val location = loadTable("T").location().toString + withSparkSQLConf(("spark.paimon.write.use-v2-write", "false")) { + withTempDir { + checkpointDir => + // define a change-log table and test `forEachBatch` api + spark.sql(s""" + |CREATE TABLE T (a INT, b STRING) + |TBLPROPERTIES ('primary-key'='a', 'bucket'='3') + |""".stripMargin) + val location = loadTable("T").location().toString - val inputData = MemoryStream[(Int, String)] - val stream = inputData - .toDS() - .toDF("a", "b") - .writeStream - .option("checkpointLocation", checkpointDir.getCanonicalPath) - .foreachBatch { - (batch: Dataset[Row], _: Long) => - batch.write.format("paimon").mode("append").save(location) - } - .start() + val inputData = MemoryStream[(Int, String)] + val stream = inputData + .toDS() + .toDF("a", "b") + .writeStream + .option("checkpointLocation", checkpointDir.getCanonicalPath) + .foreachBatch { + (batch: Dataset[Row], _: Long) => + 
batch.write.format("paimon").mode("append").save(location) + } + .start() - val query = () => spark.sql("SELECT * FROM T ORDER BY a") - try { - // snapshot-1 - inputData.addData((1, "a")) - stream.processAllAvailable() - checkAnswer(query(), Row(1, "a") :: Nil) + val query = () => spark.sql("SELECT * FROM T ORDER BY a") + try { + // snapshot-1 + inputData.addData((1, "a")) + stream.processAllAvailable() + checkAnswer(query(), Row(1, "a") :: Nil) - // snapshot-2 - inputData.addData((2, "b")) - stream.processAllAvailable() - checkAnswer(query(), Row(1, "a") :: Row(2, "b") :: Nil) + // snapshot-2 + inputData.addData((2, "b")) + stream.processAllAvailable() + checkAnswer(query(), Row(1, "a") :: Row(2, "b") :: Nil) - // snapshot-3 - inputData.addData((2, "b2")) - stream.processAllAvailable() - checkAnswer(query(), Row(1, "a") :: Row(2, "b2") :: Nil) + // snapshot-3 + inputData.addData((2, "b2")) + stream.processAllAvailable() + checkAnswer(query(), Row(1, "a") :: Row(2, "b2") :: Nil) - val table = loadTable("T") - val branchManager = table.branchManager() + val table = loadTable("T") + val branchManager = table.branchManager() - // create branch with tag - checkAnswer( - spark.sql("CALL paimon.sys.create_tag(table => 'test.T', tag => 's_2', snapshot => 2)"), - Row(true) :: Nil) - checkAnswer( - spark.sql( - "CALL paimon.sys.create_branch(table => 'test.T', branch => 'snapshot_branch', tag => 's_2')"), - Row(true) :: Nil) - assert(branchManager.branchExists("snapshot_branch")) + // create branch with tag + checkAnswer( + spark.sql( + "CALL paimon.sys.create_tag(table => 'test.T', tag => 's_2', snapshot => 2)"), + Row(true) :: Nil) + checkAnswer( + spark.sql( + "CALL paimon.sys.create_branch(table => 'test.T', branch => 'snapshot_branch', tag => 's_2')"), + Row(true) :: Nil) + assert(branchManager.branchExists("snapshot_branch")) - spark.sql("INSERT INTO T VALUES (1, 'APPLE'), (2,'DOG'), (2, 'horse')") - spark.sql("ALTER TABLE `T$branch_snapshot_branch` ADD COLUMNS(c INT)") - spark.sql( - "INSERT INTO `T$branch_snapshot_branch` VALUES " + "(1,'cherry', 100), (2,'bird', 200), (3, 'wolf', 400)") + spark.sql("INSERT INTO T VALUES (1, 'APPLE'), (2,'DOG'), (2, 'horse')") + spark.sql("ALTER TABLE `T$branch_snapshot_branch` ADD COLUMNS(c INT)") + spark.sql( + "INSERT INTO `T$branch_snapshot_branch` VALUES " + "(1,'cherry', 100), (2,'bird', 200), (3, 'wolf', 400)") - checkAnswer( - spark.sql("SELECT * FROM T ORDER BY a, b"), - Row(1, "APPLE") :: Row(2, "horse") :: Nil) - checkAnswer( - spark.sql("SELECT * FROM `T$branch_snapshot_branch` ORDER BY a, b,c"), - Row(1, "cherry", 100) :: Row(2, "bird", 200) :: Row(3, "wolf", 400) :: Nil) - assert(branchManager.branchExists("snapshot_branch")) - } + checkAnswer( + spark.sql("SELECT * FROM T ORDER BY a, b"), + Row(1, "APPLE") :: Row(2, "horse") :: Nil) + checkAnswer( + spark.sql("SELECT * FROM `T$branch_snapshot_branch` ORDER BY a, b,c"), + Row(1, "cherry", 100) :: Row(2, "bird", 200) :: Row(3, "wolf", 400) :: Nil) + assert(branchManager.branchExists("snapshot_branch")) + } + } } } } diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/BranchProcedureTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/BranchProcedureTest.scala index 735806b5a6f3..8715e08bf5d7 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/BranchProcedureTest.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/BranchProcedureTest.scala @@ 
-28,120 +28,122 @@ class BranchProcedureTest extends PaimonSparkTestBase with StreamTest { import testImplicits._ test("Paimon Procedure: create, query, write and delete branch") { - failAfter(streamingTimeout) { - withTempDir { - checkpointDir => - // define a change-log table and test `forEachBatch` api - spark.sql(s""" - |CREATE TABLE T (a INT, b STRING) - |TBLPROPERTIES ('primary-key'='a', 'bucket'='3') - |""".stripMargin) - val location = loadTable("T").location().toString - - val inputData = MemoryStream[(Int, String)] - val stream = inputData - .toDS() - .toDF("a", "b") - .writeStream - .option("checkpointLocation", checkpointDir.getCanonicalPath) - .foreachBatch { - (batch: Dataset[Row], _: Long) => - batch.write.format("paimon").mode("append").save(location) + withSparkSQLConf(("spark.paimon.write.use-v2-write", "false")) { + failAfter(streamingTimeout) { + withTempDir { + checkpointDir => + // define a change-log table and test `forEachBatch` api + spark.sql(s""" + |CREATE TABLE T (a INT, b STRING) + |TBLPROPERTIES ('primary-key'='a', 'bucket'='3') + |""".stripMargin) + val location = loadTable("T").location().toString + + val inputData = MemoryStream[(Int, String)] + val stream = inputData + .toDS() + .toDF("a", "b") + .writeStream + .option("checkpointLocation", checkpointDir.getCanonicalPath) + .foreachBatch { + (batch: Dataset[Row], _: Long) => + batch.write.format("paimon").mode("append").save(location) + } + .start() + + val query = () => spark.sql("SELECT * FROM T ORDER BY a") + + try { + // snapshot-1 + inputData.addData((1, "a")) + stream.processAllAvailable() + checkAnswer(query(), Row(1, "a") :: Nil) + + // snapshot-2 + inputData.addData((2, "b")) + stream.processAllAvailable() + checkAnswer(query(), Row(1, "a") :: Row(2, "b") :: Nil) + + // snapshot-3 + inputData.addData((2, "b2")) + stream.processAllAvailable() + checkAnswer(query(), Row(1, "a") :: Row(2, "b2") :: Nil) + + // create tags + checkAnswer( + spark.sql( + "CALL paimon.sys.create_tag(table => 'test.T', tag => 'test_tag', snapshot => 2)"), + Row(true) :: Nil) + checkAnswer( + spark.sql("SELECT tag_name FROM paimon.test.`T$tags`"), + Row("test_tag") :: Nil) + + // create branch with tag + checkAnswer( + spark.sql( + "CALL paimon.sys.create_branch(table => 'test.T', branch => 'test_branch', tag => 'test_tag')"), + Row(true) :: Nil) + val table = loadTable("T") + val branchManager = table.branchManager() + assert(branchManager.branchExists("test_branch")) + + // query from branch + checkAnswer( + spark.sql("SELECT * FROM `T$branch_test_branch` ORDER BY a"), + Row(1, "a") :: Row(2, "b") :: Nil + ) + checkAnswer( + spark.read.format("paimon").option("branch", "test_branch").table("T").orderBy("a"), + Row(1, "a") :: Row(2, "b") :: Nil + ) + + // update branch + spark.sql("INSERT INTO `T$branch_test_branch` VALUES (3, 'c')") + checkAnswer( + spark.sql("SELECT * FROM `T$branch_test_branch` ORDER BY a"), + Row(1, "a") :: Row(2, "b") :: Row(3, "c") :: Nil + ) + // create tags + checkAnswer( + spark.sql( + "CALL paimon.sys.create_tag(table => 'test.`T$branch_test_branch`', tag => 'test_tag2', snapshot => 3)"), + Row(true) :: Nil) + + // create branch from another branch. 
+ checkAnswer( + spark.sql( + "CALL paimon.sys.create_branch(table => 'test.`T$branch_test_branch`', branch => 'test_branch2', tag => 'test_tag2')"), + Row(true) :: Nil) + checkAnswer( + spark.sql("SELECT * FROM `T$branch_test_branch2` ORDER BY a"), + Row(1, "a") :: Row(2, "b") :: Row(3, "c") :: Nil + ) + + // create empty branch + checkAnswer( + spark.sql( + "CALL paimon.sys.create_branch(table => 'test.T', branch => 'empty_branch')"), + Row(true) :: Nil) + assert(branchManager.branchExists("empty_branch")) + checkAnswer( + spark.sql("SELECT * FROM `T$branch_empty_branch` ORDER BY a"), + Nil + ) + + // delete branch + checkAnswer( + spark.sql( + "CALL paimon.sys.delete_branch(table => 'test.T', branch => 'test_branch')"), + Row(true) :: Nil) + assert(!branchManager.branchExists("test_branch")) + intercept[Exception] { + spark.sql("SELECT * FROM `T$branch_test_branch` ORDER BY a") + } + + } finally { + stream.stop() } - .start() - - val query = () => spark.sql("SELECT * FROM T ORDER BY a") - - try { - // snapshot-1 - inputData.addData((1, "a")) - stream.processAllAvailable() - checkAnswer(query(), Row(1, "a") :: Nil) - - // snapshot-2 - inputData.addData((2, "b")) - stream.processAllAvailable() - checkAnswer(query(), Row(1, "a") :: Row(2, "b") :: Nil) - - // snapshot-3 - inputData.addData((2, "b2")) - stream.processAllAvailable() - checkAnswer(query(), Row(1, "a") :: Row(2, "b2") :: Nil) - - // create tags - checkAnswer( - spark.sql( - "CALL paimon.sys.create_tag(table => 'test.T', tag => 'test_tag', snapshot => 2)"), - Row(true) :: Nil) - checkAnswer( - spark.sql("SELECT tag_name FROM paimon.test.`T$tags`"), - Row("test_tag") :: Nil) - - // create branch with tag - checkAnswer( - spark.sql( - "CALL paimon.sys.create_branch(table => 'test.T', branch => 'test_branch', tag => 'test_tag')"), - Row(true) :: Nil) - val table = loadTable("T") - val branchManager = table.branchManager() - assert(branchManager.branchExists("test_branch")) - - // query from branch - checkAnswer( - spark.sql("SELECT * FROM `T$branch_test_branch` ORDER BY a"), - Row(1, "a") :: Row(2, "b") :: Nil - ) - checkAnswer( - spark.read.format("paimon").option("branch", "test_branch").table("T").orderBy("a"), - Row(1, "a") :: Row(2, "b") :: Nil - ) - - // update branch - spark.sql("INSERT INTO `T$branch_test_branch` VALUES (3, 'c')") - checkAnswer( - spark.sql("SELECT * FROM `T$branch_test_branch` ORDER BY a"), - Row(1, "a") :: Row(2, "b") :: Row(3, "c") :: Nil - ) - // create tags - checkAnswer( - spark.sql( - "CALL paimon.sys.create_tag(table => 'test.`T$branch_test_branch`', tag => 'test_tag2', snapshot => 3)"), - Row(true) :: Nil) - - // create branch from another branch. 
- checkAnswer( - spark.sql( - "CALL paimon.sys.create_branch(table => 'test.`T$branch_test_branch`', branch => 'test_branch2', tag => 'test_tag2')"), - Row(true) :: Nil) - checkAnswer( - spark.sql("SELECT * FROM `T$branch_test_branch2` ORDER BY a"), - Row(1, "a") :: Row(2, "b") :: Row(3, "c") :: Nil - ) - - // create empty branch - checkAnswer( - spark.sql( - "CALL paimon.sys.create_branch(table => 'test.T', branch => 'empty_branch')"), - Row(true) :: Nil) - assert(branchManager.branchExists("empty_branch")) - checkAnswer( - spark.sql("SELECT * FROM `T$branch_empty_branch` ORDER BY a"), - Nil - ) - - // delete branch - checkAnswer( - spark.sql( - "CALL paimon.sys.delete_branch(table => 'test.T', branch => 'test_branch')"), - Row(true) :: Nil) - assert(!branchManager.branchExists("test_branch")) - intercept[Exception] { - spark.sql("SELECT * FROM `T$branch_test_branch` ORDER BY a") - } - - } finally { - stream.stop() - } + } } } } diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTestBase.scala index 8d1b35cc1202..b14e541d5b73 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTestBase.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTestBase.scala @@ -90,213 +90,219 @@ abstract class CompactProcedureTestBase extends PaimonSparkTestBase with StreamT // ----------------------- Sort Compact ----------------------- test("Paimon Procedure: sort compact") { - failAfter(streamingTimeout) { - withTempDir { - checkpointDir => - spark.sql(s""" - |CREATE TABLE T (a INT, b INT) - |TBLPROPERTIES ('bucket'='-1') - |""".stripMargin) - val location = loadTable("T").location().toString - - val inputData = MemoryStream[(Int, Int)] - val stream = inputData - .toDS() - .toDF("a", "b") - .writeStream - .option("checkpointLocation", checkpointDir.getCanonicalPath) - .foreachBatch { - (batch: Dataset[Row], _: Long) => - batch.write.format("paimon").mode("append").save(location) - } - .start() - - val query = () => spark.sql("SELECT * FROM T") - - try { - // test zorder sort - inputData.addData((0, 0)) - inputData.addData((0, 1)) - inputData.addData((0, 2)) - inputData.addData((1, 0)) - inputData.addData((1, 1)) - inputData.addData((1, 2)) - inputData.addData((2, 0)) - inputData.addData((2, 1)) - inputData.addData((2, 2)) - stream.processAllAvailable() - - val result = new util.ArrayList[Row]() - for (a <- 0 until 3) { - for (b <- 0 until 3) { - result.add(Row(a, b)) + withSparkSQLConf(("spark.paimon.write.use-v2-write", "false")) { + + failAfter(streamingTimeout) { + withTempDir { + checkpointDir => + spark.sql(s""" + |CREATE TABLE T (a INT, b INT) + |TBLPROPERTIES ('bucket'='-1') + |""".stripMargin) + val location = loadTable("T").location().toString + + val inputData = MemoryStream[(Int, Int)] + val stream = inputData + .toDS() + .toDF("a", "b") + .writeStream + .option("checkpointLocation", checkpointDir.getCanonicalPath) + .foreachBatch { + (batch: Dataset[Row], _: Long) => + batch.write.format("paimon").mode("append").save(location) + } + .start() + + val query = () => spark.sql("SELECT * FROM T") + + try { + // test zorder sort + inputData.addData((0, 0)) + inputData.addData((0, 1)) + inputData.addData((0, 2)) + inputData.addData((1, 0)) + inputData.addData((1, 1)) + inputData.addData((1, 2)) + inputData.addData((2, 0)) + 
inputData.addData((2, 1)) + inputData.addData((2, 2)) + stream.processAllAvailable() + + val result = new util.ArrayList[Row]() + for (a <- 0 until 3) { + for (b <- 0 until 3) { + result.add(Row(a, b)) + } } + Assertions.assertThat(query().collect()).containsExactlyElementsOf(result) + + checkAnswer( + spark.sql( + "CALL paimon.sys.compact(table => 'T', order_strategy => 'zorder', order_by => 'a,b')"), + Row(true) :: Nil) + + val result2 = new util.ArrayList[Row]() + result2.add(0, Row(0, 0)) + result2.add(1, Row(0, 1)) + result2.add(2, Row(1, 0)) + result2.add(3, Row(1, 1)) + result2.add(4, Row(0, 2)) + result2.add(5, Row(1, 2)) + result2.add(6, Row(2, 0)) + result2.add(7, Row(2, 1)) + result2.add(8, Row(2, 2)) + + Assertions.assertThat(query().collect()).containsExactlyElementsOf(result2) + + // test hilbert sort + val result3 = new util.ArrayList[Row]() + result3.add(0, Row(0, 0)) + result3.add(1, Row(0, 1)) + result3.add(2, Row(1, 1)) + result3.add(3, Row(1, 0)) + result3.add(4, Row(2, 0)) + result3.add(5, Row(2, 1)) + result3.add(6, Row(2, 2)) + result3.add(7, Row(1, 2)) + result3.add(8, Row(0, 2)) + + checkAnswer( + spark.sql( + "CALL paimon.sys.compact(table => 'T', order_strategy => 'hilbert', order_by => 'a,b')"), + Row(true) :: Nil) + + Assertions.assertThat(query().collect()).containsExactlyElementsOf(result3) + + // test order sort + checkAnswer( + spark.sql( + "CALL paimon.sys.compact(table => 'T', order_strategy => 'order', order_by => 'a,b')"), + Row(true) :: Nil) + Assertions.assertThat(query().collect()).containsExactlyElementsOf(result) + } finally { + stream.stop() } - Assertions.assertThat(query().collect()).containsExactlyElementsOf(result) - - checkAnswer( - spark.sql( - "CALL paimon.sys.compact(table => 'T', order_strategy => 'zorder', order_by => 'a,b')"), - Row(true) :: Nil) - - val result2 = new util.ArrayList[Row]() - result2.add(0, Row(0, 0)) - result2.add(1, Row(0, 1)) - result2.add(2, Row(1, 0)) - result2.add(3, Row(1, 1)) - result2.add(4, Row(0, 2)) - result2.add(5, Row(1, 2)) - result2.add(6, Row(2, 0)) - result2.add(7, Row(2, 1)) - result2.add(8, Row(2, 2)) - - Assertions.assertThat(query().collect()).containsExactlyElementsOf(result2) - - // test hilbert sort - val result3 = new util.ArrayList[Row]() - result3.add(0, Row(0, 0)) - result3.add(1, Row(0, 1)) - result3.add(2, Row(1, 1)) - result3.add(3, Row(1, 0)) - result3.add(4, Row(2, 0)) - result3.add(5, Row(2, 1)) - result3.add(6, Row(2, 2)) - result3.add(7, Row(1, 2)) - result3.add(8, Row(0, 2)) - - checkAnswer( - spark.sql( - "CALL paimon.sys.compact(table => 'T', order_strategy => 'hilbert', order_by => 'a,b')"), - Row(true) :: Nil) - - Assertions.assertThat(query().collect()).containsExactlyElementsOf(result3) - - // test order sort - checkAnswer( - spark.sql( - "CALL paimon.sys.compact(table => 'T', order_strategy => 'order', order_by => 'a,b')"), - Row(true) :: Nil) - Assertions.assertThat(query().collect()).containsExactlyElementsOf(result) - } finally { - stream.stop() - } + } } } } test("Paimon Procedure: sort compact with partition") { - failAfter(streamingTimeout) { - withTempDir { - checkpointDir => - spark.sql(s""" - |CREATE TABLE T (p INT, a INT, b INT) - |TBLPROPERTIES ('bucket'='-1') - |PARTITIONED BY (p) - |""".stripMargin) - val location = loadTable("T").location().toString - - val inputData = MemoryStream[(Int, Int, Int)] - val stream = inputData - .toDS() - .toDF("p", "a", "b") - .writeStream - .option("checkpointLocation", checkpointDir.getCanonicalPath) - .foreachBatch { - (batch: 
Dataset[Row], _: Long) => - batch.write.format("paimon").mode("append").save(location) - } - .start() - - val query0 = () => spark.sql("SELECT * FROM T WHERE p=0") - val query1 = () => spark.sql("SELECT * FROM T WHERE p=1") - - try { - // test zorder sort - inputData.addData((0, 0, 0)) - inputData.addData((0, 0, 1)) - inputData.addData((0, 0, 2)) - inputData.addData((0, 1, 0)) - inputData.addData((0, 1, 1)) - inputData.addData((0, 1, 2)) - inputData.addData((0, 2, 0)) - inputData.addData((0, 2, 1)) - inputData.addData((0, 2, 2)) - - inputData.addData((1, 0, 0)) - inputData.addData((1, 0, 1)) - inputData.addData((1, 0, 2)) - inputData.addData((1, 1, 0)) - inputData.addData((1, 1, 1)) - inputData.addData((1, 1, 2)) - inputData.addData((1, 2, 0)) - inputData.addData((1, 2, 1)) - inputData.addData((1, 2, 2)) - stream.processAllAvailable() - - val result0 = new util.ArrayList[Row]() - for (a <- 0 until 3) { - for (b <- 0 until 3) { - result0.add(Row(0, a, b)) + withSparkSQLConf(("spark.paimon.write.use-v2-write", "false")) { + + failAfter(streamingTimeout) { + withTempDir { + checkpointDir => + spark.sql(s""" + |CREATE TABLE T (p INT, a INT, b INT) + |TBLPROPERTIES ('bucket'='-1') + |PARTITIONED BY (p) + |""".stripMargin) + val location = loadTable("T").location().toString + + val inputData = MemoryStream[(Int, Int, Int)] + val stream = inputData + .toDS() + .toDF("p", "a", "b") + .writeStream + .option("checkpointLocation", checkpointDir.getCanonicalPath) + .foreachBatch { + (batch: Dataset[Row], _: Long) => + batch.write.format("paimon").mode("append").save(location) } - } - val result1 = new util.ArrayList[Row]() - for (a <- 0 until 3) { - for (b <- 0 until 3) { - result1.add(Row(1, a, b)) + .start() + + val query0 = () => spark.sql("SELECT * FROM T WHERE p=0") + val query1 = () => spark.sql("SELECT * FROM T WHERE p=1") + + try { + // test zorder sort + inputData.addData((0, 0, 0)) + inputData.addData((0, 0, 1)) + inputData.addData((0, 0, 2)) + inputData.addData((0, 1, 0)) + inputData.addData((0, 1, 1)) + inputData.addData((0, 1, 2)) + inputData.addData((0, 2, 0)) + inputData.addData((0, 2, 1)) + inputData.addData((0, 2, 2)) + + inputData.addData((1, 0, 0)) + inputData.addData((1, 0, 1)) + inputData.addData((1, 0, 2)) + inputData.addData((1, 1, 0)) + inputData.addData((1, 1, 1)) + inputData.addData((1, 1, 2)) + inputData.addData((1, 2, 0)) + inputData.addData((1, 2, 1)) + inputData.addData((1, 2, 2)) + stream.processAllAvailable() + + val result0 = new util.ArrayList[Row]() + for (a <- 0 until 3) { + for (b <- 0 until 3) { + result0.add(Row(0, a, b)) + } + } + val result1 = new util.ArrayList[Row]() + for (a <- 0 until 3) { + for (b <- 0 until 3) { + result1.add(Row(1, a, b)) + } } + Assertions.assertThat(query0().collect()).containsExactlyElementsOf(result0) + Assertions.assertThat(query1().collect()).containsExactlyElementsOf(result1) + + checkAnswer( + spark.sql( + "CALL paimon.sys.compact(table => 'T', partitions => 'p=0', order_strategy => 'zorder', order_by => 'a,b')"), + Row(true) :: Nil) + + val result2 = new util.ArrayList[Row]() + result2.add(0, Row(0, 0, 0)) + result2.add(1, Row(0, 0, 1)) + result2.add(2, Row(0, 1, 0)) + result2.add(3, Row(0, 1, 1)) + result2.add(4, Row(0, 0, 2)) + result2.add(5, Row(0, 1, 2)) + result2.add(6, Row(0, 2, 0)) + result2.add(7, Row(0, 2, 1)) + result2.add(8, Row(0, 2, 2)) + + Assertions.assertThat(query0().collect()).containsExactlyElementsOf(result2) + Assertions.assertThat(query1().collect()).containsExactlyElementsOf(result1) + + // test hilbert 
sort + val result3 = new util.ArrayList[Row]() + result3.add(0, Row(0, 0, 0)) + result3.add(1, Row(0, 0, 1)) + result3.add(2, Row(0, 1, 1)) + result3.add(3, Row(0, 1, 0)) + result3.add(4, Row(0, 2, 0)) + result3.add(5, Row(0, 2, 1)) + result3.add(6, Row(0, 2, 2)) + result3.add(7, Row(0, 1, 2)) + result3.add(8, Row(0, 0, 2)) + + checkAnswer( + spark.sql( + "CALL paimon.sys.compact(table => 'T', partitions => 'p=0', order_strategy => 'hilbert', order_by => 'a,b')"), + Row(true) :: Nil) + + Assertions.assertThat(query0().collect()).containsExactlyElementsOf(result3) + Assertions.assertThat(query1().collect()).containsExactlyElementsOf(result1) + + // test order sort + checkAnswer( + spark.sql( + "CALL paimon.sys.compact(table => 'T', partitions => 'p=0', order_strategy => 'order', order_by => 'a,b')"), + Row(true) :: Nil) + Assertions.assertThat(query0().collect()).containsExactlyElementsOf(result0) + Assertions.assertThat(query1().collect()).containsExactlyElementsOf(result1) + } finally { + stream.stop() } - Assertions.assertThat(query0().collect()).containsExactlyElementsOf(result0) - Assertions.assertThat(query1().collect()).containsExactlyElementsOf(result1) - - checkAnswer( - spark.sql( - "CALL paimon.sys.compact(table => 'T', partitions => 'p=0', order_strategy => 'zorder', order_by => 'a,b')"), - Row(true) :: Nil) - - val result2 = new util.ArrayList[Row]() - result2.add(0, Row(0, 0, 0)) - result2.add(1, Row(0, 0, 1)) - result2.add(2, Row(0, 1, 0)) - result2.add(3, Row(0, 1, 1)) - result2.add(4, Row(0, 0, 2)) - result2.add(5, Row(0, 1, 2)) - result2.add(6, Row(0, 2, 0)) - result2.add(7, Row(0, 2, 1)) - result2.add(8, Row(0, 2, 2)) - - Assertions.assertThat(query0().collect()).containsExactlyElementsOf(result2) - Assertions.assertThat(query1().collect()).containsExactlyElementsOf(result1) - - // test hilbert sort - val result3 = new util.ArrayList[Row]() - result3.add(0, Row(0, 0, 0)) - result3.add(1, Row(0, 0, 1)) - result3.add(2, Row(0, 1, 1)) - result3.add(3, Row(0, 1, 0)) - result3.add(4, Row(0, 2, 0)) - result3.add(5, Row(0, 2, 1)) - result3.add(6, Row(0, 2, 2)) - result3.add(7, Row(0, 1, 2)) - result3.add(8, Row(0, 0, 2)) - - checkAnswer( - spark.sql( - "CALL paimon.sys.compact(table => 'T', partitions => 'p=0', order_strategy => 'hilbert', order_by => 'a,b')"), - Row(true) :: Nil) - - Assertions.assertThat(query0().collect()).containsExactlyElementsOf(result3) - Assertions.assertThat(query1().collect()).containsExactlyElementsOf(result1) - - // test order sort - checkAnswer( - spark.sql( - "CALL paimon.sys.compact(table => 'T', partitions => 'p=0', order_strategy => 'order', order_by => 'a,b')"), - Row(true) :: Nil) - Assertions.assertThat(query0().collect()).containsExactlyElementsOf(result0) - Assertions.assertThat(query1().collect()).containsExactlyElementsOf(result1) - } finally { - stream.stop() - } + } } } } @@ -343,53 +349,55 @@ abstract class CompactProcedureTestBase extends PaimonSparkTestBase with StreamT } test("Paimon Procedure: compact for pk") { - failAfter(streamingTimeout) { - withTempDir { - checkpointDir => - spark.sql(s""" - |CREATE TABLE T (a INT, b INT) - |TBLPROPERTIES ('primary-key'='a,b', 'bucket'='1') - |""".stripMargin) - val location = loadTable("T").location().toString - - val inputData = MemoryStream[(Int, Int)] - val stream = inputData - .toDS() - .toDF("a", "b") - .writeStream - .option("checkpointLocation", checkpointDir.getCanonicalPath) - .foreachBatch { - (batch: Dataset[Row], _: Long) => - 
batch.write.format("paimon").mode("append").save(location) - } - .start() - - val query = () => spark.sql("SELECT * FROM T") - - try { - inputData.addData((0, 0)) - inputData.addData((0, 1)) - inputData.addData((0, 2)) - inputData.addData((1, 0)) - inputData.addData((1, 1)) - inputData.addData((1, 2)) - inputData.addData((2, 0)) - inputData.addData((2, 1)) - inputData.addData((2, 2)) - stream.processAllAvailable() - - val result = new util.ArrayList[Row]() - for (a <- 0 until 3) { - for (b <- 0 until 3) { - result.add(Row(a, b)) + withSparkSQLConf(("spark.paimon.write.use-v2-write", "false")) { + failAfter(streamingTimeout) { + withTempDir { + checkpointDir => + spark.sql(s""" + |CREATE TABLE T (a INT, b INT) + |TBLPROPERTIES ('primary-key'='a,b', 'bucket'='1') + |""".stripMargin) + val location = loadTable("T").location().toString + + val inputData = MemoryStream[(Int, Int)] + val stream = inputData + .toDS() + .toDF("a", "b") + .writeStream + .option("checkpointLocation", checkpointDir.getCanonicalPath) + .foreachBatch { + (batch: Dataset[Row], _: Long) => + batch.write.format("paimon").mode("append").save(location) + } + .start() + + val query = () => spark.sql("SELECT * FROM T") + + try { + inputData.addData((0, 0)) + inputData.addData((0, 1)) + inputData.addData((0, 2)) + inputData.addData((1, 0)) + inputData.addData((1, 1)) + inputData.addData((1, 2)) + inputData.addData((2, 0)) + inputData.addData((2, 1)) + inputData.addData((2, 2)) + stream.processAllAvailable() + + val result = new util.ArrayList[Row]() + for (a <- 0 until 3) { + for (b <- 0 until 3) { + result.add(Row(a, b)) + } } + Assertions.assertThat(query().collect()).containsExactlyElementsOf(result) + checkAnswer(spark.sql("CALL paimon.sys.compact(table => 'T')"), Row(true) :: Nil) + Assertions.assertThat(query().collect()).containsExactlyElementsOf(result) + } finally { + stream.stop() } - Assertions.assertThat(query().collect()).containsExactlyElementsOf(result) - checkAnswer(spark.sql("CALL paimon.sys.compact(table => 'T')"), Row(true) :: Nil) - Assertions.assertThat(query().collect()).containsExactlyElementsOf(result) - } finally { - stream.stop() - } + } } } } diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CreateAndDeleteTagProcedureTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CreateAndDeleteTagProcedureTest.scala index 4a4c7ae215df..926314e57dae 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CreateAndDeleteTagProcedureTest.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CreateAndDeleteTagProcedureTest.scala @@ -29,161 +29,162 @@ class CreateAndDeleteTagProcedureTest extends PaimonSparkTestBase with StreamTes import testImplicits._ test("Paimon Procedure: create and delete tag") { - failAfter(streamingTimeout) { - withTempDir { - checkpointDir => - // define a change-log table and test `forEachBatch` api - spark.sql(s""" - |CREATE TABLE T (a INT, b STRING) - |TBLPROPERTIES ('primary-key'='a', 'bucket'='3') - |""".stripMargin) - val location = loadTable("T").location().toString - - val inputData = MemoryStream[(Int, String)] - val stream = inputData - .toDS() - .toDF("a", "b") - .writeStream - .option("checkpointLocation", checkpointDir.getCanonicalPath) - .foreachBatch { - (batch: Dataset[Row], _: Long) => - batch.write.format("paimon").mode("append").save(location) - } - .start() - - val query = () => spark.sql("SELECT * FROM T ORDER BY a") - - 
try { - // snapshot-1 - inputData.addData((1, "a")) - stream.processAllAvailable() - checkAnswer(query(), Row(1, "a") :: Nil) - - // snapshot-2 - inputData.addData((2, "b")) - stream.processAllAvailable() - checkAnswer(query(), Row(1, "a") :: Row(2, "b") :: Nil) - - // snapshot-3 - inputData.addData((2, "b2")) - stream.processAllAvailable() - checkAnswer(query(), Row(1, "a") :: Row(2, "b2") :: Nil) - checkAnswer( - spark.sql( - "CALL paimon.sys.create_tag(" + + withSparkSQLConf(("spark.paimon.write.use-v2-write", "false")) { + + failAfter(streamingTimeout) { + withTempDir { + checkpointDir => + // define a change-log table and test `forEachBatch` api + spark.sql(s""" + |CREATE TABLE T (a INT, b STRING) + |TBLPROPERTIES ('primary-key'='a', 'bucket'='3') + |""".stripMargin) + val location = loadTable("T").location().toString + + val inputData = MemoryStream[(Int, String)] + val stream = inputData + .toDS() + .toDF("a", "b") + .writeStream + .option("checkpointLocation", checkpointDir.getCanonicalPath) + .foreachBatch { + (batch: Dataset[Row], _: Long) => + batch.write.format("paimon").mode("append").save(location) + } + .start() + + val query = () => spark.sql("SELECT * FROM T ORDER BY a") + + try { + // snapshot-1 + inputData.addData((1, "a")) + stream.processAllAvailable() + checkAnswer(query(), Row(1, "a") :: Nil) + + // snapshot-2 + inputData.addData((2, "b")) + stream.processAllAvailable() + checkAnswer(query(), Row(1, "a") :: Row(2, "b") :: Nil) + + // snapshot-3 + inputData.addData((2, "b2")) + stream.processAllAvailable() + checkAnswer(query(), Row(1, "a") :: Row(2, "b2") :: Nil) + checkAnswer( + spark.sql("CALL paimon.sys.create_tag(" + "table => 'test.T', tag => 'test_tag', time_retained => '5 d', snapshot => 2)"), - Row(true) :: Nil) - checkAnswer( - spark.sql("SELECT tag_name FROM paimon.test.`T$tags`"), - Row("test_tag") :: Nil) - checkAnswer( - spark.sql("CALL paimon.sys.delete_tag(table => 'test.T', tag => 'test_tag')"), - Row(true) :: Nil) - checkAnswer(spark.sql("SELECT tag_name FROM paimon.test.`T$tags`"), Nil) - checkAnswer( - spark.sql( - "CALL paimon.sys.create_tag(table => 'test.T', tag => 'test_latestSnapshot_tag')"), - Row(true) :: Nil) - checkAnswer( - spark.sql("SELECT tag_name FROM paimon.test.`T$tags`"), - Row("test_latestSnapshot_tag") :: Nil) - checkAnswer( - spark.sql( - "CALL paimon.sys.delete_tag(table => 'test.T', tag => 'test_latestSnapshot_tag')"), - Row(true) :: Nil) - checkAnswer(spark.sql("SELECT tag_name FROM paimon.test.`T$tags`"), Nil) - - // create test_tag_1 and test_tag_2 - checkAnswer( - spark.sql( - "CALL paimon.sys.create_tag(" + + Row(true) :: Nil) + checkAnswer( + spark.sql("SELECT tag_name FROM paimon.test.`T$tags`"), + Row("test_tag") :: Nil) + checkAnswer( + spark.sql("CALL paimon.sys.delete_tag(table => 'test.T', tag => 'test_tag')"), + Row(true) :: Nil) + checkAnswer(spark.sql("SELECT tag_name FROM paimon.test.`T$tags`"), Nil) + checkAnswer( + spark.sql( + "CALL paimon.sys.create_tag(table => 'test.T', tag => 'test_latestSnapshot_tag')"), + Row(true) :: Nil) + checkAnswer( + spark.sql("SELECT tag_name FROM paimon.test.`T$tags`"), + Row("test_latestSnapshot_tag") :: Nil) + checkAnswer( + spark.sql( + "CALL paimon.sys.delete_tag(table => 'test.T', tag => 'test_latestSnapshot_tag')"), + Row(true) :: Nil) + checkAnswer(spark.sql("SELECT tag_name FROM paimon.test.`T$tags`"), Nil) + + // create test_tag_1 and test_tag_2 + checkAnswer( + spark.sql("CALL paimon.sys.create_tag(" + "table => 'test.T', tag => 'test_tag_1', snapshot => 1)"), - 
Row(true) :: Nil) + Row(true) :: Nil) - checkAnswer( - spark.sql( - "CALL paimon.sys.create_tag(" + + checkAnswer( + spark.sql("CALL paimon.sys.create_tag(" + "table => 'test.T', tag => 'test_tag_2', snapshot => 2)"), - Row(true) :: Nil) - - checkAnswer( - spark.sql("SELECT tag_name FROM paimon.test.`T$tags`"), - Row("test_tag_1") :: Row("test_tag_2") :: Nil) - - // test rename_tag - checkAnswer( - spark.sql( - "CALL paimon.sys.rename_tag(table => 'test.T', tag => 'test_tag_1', target_tag => 'test_tag_3')"), - Row(true) :: Nil - ) - checkAnswer( - spark.sql("SELECT tag_name FROM paimon.test.`T$tags`"), - Row("test_tag_2") :: Row("test_tag_3") :: Nil) - - // delete test_tag_2 and test_tag_3 - checkAnswer( - spark.sql( - "CALL paimon.sys.delete_tag(table => 'test.T', tag => 'test_tag_2,test_tag_3')"), - Row(true) :: Nil) - - checkAnswer(spark.sql("SELECT tag_name FROM paimon.test.`T$tags`"), Nil) - - } finally { - stream.stop() - } + Row(true) :: Nil) + + checkAnswer( + spark.sql("SELECT tag_name FROM paimon.test.`T$tags`"), + Row("test_tag_1") :: Row("test_tag_2") :: Nil) + + // test rename_tag + checkAnswer( + spark.sql( + "CALL paimon.sys.rename_tag(table => 'test.T', tag => 'test_tag_1', target_tag => 'test_tag_3')"), + Row(true) :: Nil + ) + checkAnswer( + spark.sql("SELECT tag_name FROM paimon.test.`T$tags`"), + Row("test_tag_2") :: Row("test_tag_3") :: Nil) + + // delete test_tag_2 and test_tag_3 + checkAnswer( + spark.sql( + "CALL paimon.sys.delete_tag(table => 'test.T', tag => 'test_tag_2,test_tag_3')"), + Row(true) :: Nil) + + checkAnswer(spark.sql("SELECT tag_name FROM paimon.test.`T$tags`"), Nil) + + } finally { + stream.stop() + } + } } } } test("Paimon Procedure: create same tag with same snapshot") { - failAfter(streamingTimeout) { - withTempDir { - checkpointDir => - // define a change-log table and test `forEachBatch` api - spark.sql(s""" - |CREATE TABLE T (a INT, b STRING) - |TBLPROPERTIES ('primary-key'='a', 'bucket'='3') - |""".stripMargin) - val location = loadTable("T").location().toString - - val inputData = MemoryStream[(Int, String)] - val stream = inputData - .toDS() - .toDF("a", "b") - .writeStream - .option("checkpointLocation", checkpointDir.getCanonicalPath) - .foreachBatch { - (batch: Dataset[Row], _: Long) => - batch.write.format("paimon").mode("append").save(location) - } - .start() - - val query = () => spark.sql("SELECT * FROM T ORDER BY a") - - try { - // snapshot-1 - inputData.addData((1, "a")) - stream.processAllAvailable() - checkAnswer(query(), Row(1, "a") :: Nil) - - checkAnswer( - spark.sql( - "CALL paimon.sys.create_tag(" + + withSparkSQLConf(("spark.paimon.write.use-v2-write", "false")) { + + failAfter(streamingTimeout) { + withTempDir { + checkpointDir => + // define a change-log table and test `forEachBatch` api + spark.sql(s""" + |CREATE TABLE T (a INT, b STRING) + |TBLPROPERTIES ('primary-key'='a', 'bucket'='3') + |""".stripMargin) + val location = loadTable("T").location().toString + + val inputData = MemoryStream[(Int, String)] + val stream = inputData + .toDS() + .toDF("a", "b") + .writeStream + .option("checkpointLocation", checkpointDir.getCanonicalPath) + .foreachBatch { + (batch: Dataset[Row], _: Long) => + batch.write.format("paimon").mode("append").save(location) + } + .start() + + val query = () => spark.sql("SELECT * FROM T ORDER BY a") + + try { + // snapshot-1 + inputData.addData((1, "a")) + stream.processAllAvailable() + checkAnswer(query(), Row(1, "a") :: Nil) + + checkAnswer( + spark.sql("CALL paimon.sys.create_tag(" + "table => 
'test.T', tag => 'test_tag', snapshot => 1)"), - Row(true) :: Nil) - checkAnswer( - spark.sql("SELECT count(*) FROM paimon.test.`T$tags` where tag_name = 'test_tag'"), - Row(1) :: Nil) - - // throw exception "Tag test_tag already exists" - assertThrows[IllegalArgumentException] { - spark.sql( - "CALL paimon.sys.create_tag(" + + Row(true) :: Nil) + checkAnswer( + spark.sql("SELECT count(*) FROM paimon.test.`T$tags` where tag_name = 'test_tag'"), + Row(1) :: Nil) + + // throw exception "Tag test_tag already exists" + assertThrows[IllegalArgumentException] { + spark.sql("CALL paimon.sys.create_tag(" + "table => 'test.T', tag => 'test_tag', time_retained => '5 d', snapshot => 1)") + } + } finally { + stream.stop() } - } finally { - stream.stop() - } + } } } } diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CreateTagFromTimestampProcedureTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CreateTagFromTimestampProcedureTest.scala index e9b00298e492..f8b55e2856e0 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CreateTagFromTimestampProcedureTest.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CreateTagFromTimestampProcedureTest.scala @@ -30,151 +30,155 @@ class CreateTagFromTimestampProcedureTest extends PaimonSparkTestBase with Strea import testImplicits._ test("Paimon Procedure: Create tags from snapshots commit-time ") { - failAfter(streamingTimeout) { - withTempDir { - checkpointDir => - spark.sql(s""" - |CREATE TABLE T (a INT, b STRING) - |TBLPROPERTIES ('primary-key'='a', 'bucket'='3') - |""".stripMargin) - val location = loadTable("T").location().toString - - val inputData = MemoryStream[(Int, String)] - val stream = inputData - .toDS() - .toDF("a", "b") - .writeStream - .option("checkpointLocation", checkpointDir.getCanonicalPath) - .foreachBatch { - (batch: Dataset[Row], _: Long) => - batch.write.format("paimon").mode("append").save(location) + withSparkSQLConf(("spark.paimon.write.use-v2-write", "false")) { + + failAfter(streamingTimeout) { + withTempDir { + checkpointDir => + spark.sql(s""" + |CREATE TABLE T (a INT, b STRING) + |TBLPROPERTIES ('primary-key'='a', 'bucket'='3') + |""".stripMargin) + val location = loadTable("T").location().toString + + val inputData = MemoryStream[(Int, String)] + val stream = inputData + .toDS() + .toDF("a", "b") + .writeStream + .option("checkpointLocation", checkpointDir.getCanonicalPath) + .foreachBatch { + (batch: Dataset[Row], _: Long) => + batch.write.format("paimon").mode("append").save(location) + } + .start() + + try { + + for (i <- 1 to 4) { + inputData.addData((i, "a")) + stream.processAllAvailable() + Thread.sleep(500L) + } + + val table = loadTable("T") + val earliestCommitTime = table.snapshotManager.earliestSnapshot.timeMillis + val commitTime3 = table.snapshotManager.snapshot(3).timeMillis + val commitTime4 = table.snapshotManager.snapshot(4).timeMillis + + // create tag from timestamp that earlier than the earliest snapshot commit time. + checkAnswer( + spark.sql(s"""CALL paimon.sys.create_tag_from_timestamp( + |table => 'test.T', + | tag => 'test_tag', + | timestamp => ${earliestCommitTime - 1})""".stripMargin), + Row("test_tag", 1, earliestCommitTime, "null") :: Nil + ) + + // create tag from timestamp that equals to snapshot-3 commit time. 
+ checkAnswer( + spark.sql(s"""CALL paimon.sys.create_tag_from_timestamp( + |table => 'test.T', + | tag => 'test_tag2', + | timestamp => $commitTime3)""".stripMargin), + Row("test_tag2", 3, commitTime3, "null") :: Nil + ) + + // create tag from timestamp that later than snapshot-3 commit time. + checkAnswer( + spark.sql(s"""CALL paimon.sys.create_tag_from_timestamp( + |table => 'test.T', + |tag => 'test_tag3', + |timestamp => ${commitTime3 + 1})""".stripMargin), + Row("test_tag3", 4, commitTime4, "null") :: Nil + ) + + // create tag from timestamp that later than the latest snapshot commit time and throw SnapshotNotExistException. + assertThrows[SnapshotNotExistException] { + spark.sql(s"""CALL paimon.sys.create_tag_from_timestamp( + |table => 'test.T', + |tag => 'test_tag3', + |timestamp => ${Long.MaxValue})""".stripMargin) + } + + } finally { + stream.stop() } - .start() - - try { - - for (i <- 1 to 4) { - inputData.addData((i, "a")) - stream.processAllAvailable() - Thread.sleep(500L) - } - - val table = loadTable("T") - val earliestCommitTime = table.snapshotManager.earliestSnapshot.timeMillis - val commitTime3 = table.snapshotManager.snapshot(3).timeMillis - val commitTime4 = table.snapshotManager.snapshot(4).timeMillis - - // create tag from timestamp that earlier than the earliest snapshot commit time. - checkAnswer( - spark.sql(s"""CALL paimon.sys.create_tag_from_timestamp( - |table => 'test.T', - | tag => 'test_tag', - | timestamp => ${earliestCommitTime - 1})""".stripMargin), - Row("test_tag", 1, earliestCommitTime, "null") :: Nil - ) - - // create tag from timestamp that equals to snapshot-3 commit time. - checkAnswer( - spark.sql(s"""CALL paimon.sys.create_tag_from_timestamp( - |table => 'test.T', - | tag => 'test_tag2', - | timestamp => $commitTime3)""".stripMargin), - Row("test_tag2", 3, commitTime3, "null") :: Nil - ) - - // create tag from timestamp that later than snapshot-3 commit time. - checkAnswer( - spark.sql(s"""CALL paimon.sys.create_tag_from_timestamp( - |table => 'test.T', - |tag => 'test_tag3', - |timestamp => ${commitTime3 + 1})""".stripMargin), - Row("test_tag3", 4, commitTime4, "null") :: Nil - ) - - // create tag from timestamp that later than the latest snapshot commit time and throw SnapshotNotExistException. 
- assertThrows[SnapshotNotExistException] { - spark.sql(s"""CALL paimon.sys.create_tag_from_timestamp( - |table => 'test.T', - |tag => 'test_tag3', - |timestamp => ${Long.MaxValue})""".stripMargin) - } - - } finally { - stream.stop() - } + } } } } test("Paimon Procedure: Create tags from tags commit-time") { - failAfter(streamingTimeout) { - withTempDir { - checkpointDir => - spark.sql(s""" - |CREATE TABLE T (a INT, b STRING) - |TBLPROPERTIES ('primary-key'='a', 'bucket'='3') - |""".stripMargin) - val location = loadTable("T").location().toString - - val inputData = MemoryStream[(Int, String)] - val stream = inputData - .toDS() - .toDF("a", "b") - .writeStream - .option("checkpointLocation", checkpointDir.getCanonicalPath) - .foreachBatch { - (batch: Dataset[Row], _: Long) => - batch.write.format("paimon").mode("append").save(location) - } - .start() - - try { - for (i <- 1 to 2) { - inputData.addData((i, "a")) - stream.processAllAvailable() - Thread.sleep(500L) - } - - checkAnswer( - spark.sql( - "CALL paimon.sys.create_tag(" + + withSparkSQLConf(("spark.paimon.write.use-v2-write", "false")) { + + failAfter(streamingTimeout) { + withTempDir { + checkpointDir => + spark.sql(s""" + |CREATE TABLE T (a INT, b STRING) + |TBLPROPERTIES ('primary-key'='a', 'bucket'='3') + |""".stripMargin) + val location = loadTable("T").location().toString + + val inputData = MemoryStream[(Int, String)] + val stream = inputData + .toDS() + .toDF("a", "b") + .writeStream + .option("checkpointLocation", checkpointDir.getCanonicalPath) + .foreachBatch { + (batch: Dataset[Row], _: Long) => + batch.write.format("paimon").mode("append").save(location) + } + .start() + + try { + for (i <- 1 to 2) { + inputData.addData((i, "a")) + stream.processAllAvailable() + Thread.sleep(500L) + } + + checkAnswer( + spark.sql("CALL paimon.sys.create_tag(" + "table => 'test.T', tag => 'test_tag', snapshot => 1)"), - Row(true) :: Nil) - - val table = loadTable("T") - val latestCommitTime = table.snapshotManager.latestSnapshot().timeMillis - val tagsCommitTime = table.tagManager().getOrThrow("test_tag").timeMillis - assert(latestCommitTime > tagsCommitTime) - - // make snapshot 1 expire. - checkAnswer( - spark.sql( - "CALL paimon.sys.expire_snapshots(table => 'test.T', retain_max => 1, retain_min => 1)"), - Row(1) :: Nil) - - // create tag from timestamp that earlier than the expired snapshot 1. - checkAnswer( - spark.sql(s"""CALL paimon.sys.create_tag_from_timestamp( - |table => 'test.T', - | tag => 'test_tag1', - | timestamp => ${tagsCommitTime - 1})""".stripMargin), - Row("test_tag1", 1, tagsCommitTime, "null") :: Nil - ) - - // create tag from timestamp that later than the expired snapshot 1. - checkAnswer( - spark.sql(s"""CALL paimon.sys.create_tag_from_timestamp( - |table => 'test.T', - |tag => 'test_tag2', - |timestamp => ${tagsCommitTime + 1})""".stripMargin), - Row("test_tag2", 2, latestCommitTime, "null") :: Nil - ) - - } finally { - stream.stop() - } + Row(true) :: Nil) + + val table = loadTable("T") + val latestCommitTime = table.snapshotManager.latestSnapshot().timeMillis + val tagsCommitTime = table.tagManager().getOrThrow("test_tag").timeMillis + assert(latestCommitTime > tagsCommitTime) + + // make snapshot 1 expire. + checkAnswer( + spark.sql( + "CALL paimon.sys.expire_snapshots(table => 'test.T', retain_max => 1, retain_min => 1)"), + Row(1) :: Nil) + + // create tag from timestamp that earlier than the expired snapshot 1. 
+ checkAnswer( + spark.sql(s"""CALL paimon.sys.create_tag_from_timestamp( + |table => 'test.T', + | tag => 'test_tag1', + | timestamp => ${tagsCommitTime - 1})""".stripMargin), + Row("test_tag1", 1, tagsCommitTime, "null") :: Nil + ) + + // create tag from timestamp that later than the expired snapshot 1. + checkAnswer( + spark.sql(s"""CALL paimon.sys.create_tag_from_timestamp( + |table => 'test.T', + |tag => 'test_tag2', + |timestamp => ${tagsCommitTime + 1})""".stripMargin), + Row("test_tag2", 2, latestCommitTime, "null") :: Nil + ) + + } finally { + stream.stop() + } + } } } } - } diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/ExpirePartitionsProcedureTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/ExpirePartitionsProcedureTest.scala index 586f2e6c2d72..8d6a01025a96 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/ExpirePartitionsProcedureTest.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/ExpirePartitionsProcedureTest.scala @@ -20,6 +20,7 @@ package org.apache.paimon.spark.procedure import org.apache.paimon.spark.PaimonSparkTestBase +import org.apache.spark.SparkConf import org.apache.spark.sql.{Dataset, Row} import org.apache.spark.sql.execution.streaming.MemoryStream import org.apache.spark.sql.streaming.StreamTest @@ -28,6 +29,11 @@ import org.assertj.core.api.Assertions.assertThatThrownBy /** IT Case for [[ExpirePartitionsProcedure]]. */ class ExpirePartitionsProcedureTest extends PaimonSparkTestBase with StreamTest { + override protected def sparkConf: SparkConf = { + super.sparkConf + .set("spark.paimon.write.use-v2-write", "false") + } + import testImplicits._ test("Paimon Procedure: expire partitions") { diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/ExpireSnapshotsProcedureTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/ExpireSnapshotsProcedureTest.scala index b39aa5d058ca..c0347151076b 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/ExpireSnapshotsProcedureTest.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/ExpireSnapshotsProcedureTest.scala @@ -21,6 +21,7 @@ package org.apache.paimon.spark.procedure import org.apache.paimon.spark.PaimonSparkTestBase import org.apache.paimon.utils.SnapshotManager +import org.apache.spark.SparkConf import org.apache.spark.sql.{Dataset, Row} import org.apache.spark.sql.execution.streaming.MemoryStream import org.apache.spark.sql.streaming.StreamTest @@ -30,6 +31,11 @@ import java.sql.Timestamp class ExpireSnapshotsProcedureTest extends PaimonSparkTestBase with StreamTest { + override protected def sparkConf: SparkConf = { + super.sparkConf + .set("spark.paimon.write.use-v2-write", "false") + } + import testImplicits._ test("Paimon Procedure: expire snapshots") { diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RollbackProcedureTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RollbackProcedureTest.scala index 66f2d57e02bc..131212d7b190 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RollbackProcedureTest.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RollbackProcedureTest.scala @@ -29,69 +29,72 @@ class RollbackProcedureTest extends 
PaimonSparkTestBase with StreamTest { import testImplicits._ test("Paimon Procedure: rollback to snapshot and tag") { - failAfter(streamingTimeout) { - withTempDir { - checkpointDir => - // define a change-log table and test `forEachBatch` api - spark.sql(s""" - |CREATE TABLE T (a INT, b STRING) - |TBLPROPERTIES ('primary-key'='a', 'bucket'='3') - |""".stripMargin) - val table = loadTable("T") - val location = table.location().toString - - val inputData = MemoryStream[(Int, String)] - val stream = inputData - .toDS() - .toDF("a", "b") - .writeStream - .option("checkpointLocation", checkpointDir.getCanonicalPath) - .foreachBatch { - (batch: Dataset[Row], _: Long) => - batch.write.format("paimon").mode("append").save(location) + withSparkSQLConf(("spark.paimon.write.use-v2-write", "false")) { + + failAfter(streamingTimeout) { + withTempDir { + checkpointDir => + // define a change-log table and test `forEachBatch` api + spark.sql(s""" + |CREATE TABLE T (a INT, b STRING) + |TBLPROPERTIES ('primary-key'='a', 'bucket'='3') + |""".stripMargin) + val table = loadTable("T") + val location = table.location().toString + + val inputData = MemoryStream[(Int, String)] + val stream = inputData + .toDS() + .toDF("a", "b") + .writeStream + .option("checkpointLocation", checkpointDir.getCanonicalPath) + .foreachBatch { + (batch: Dataset[Row], _: Long) => + batch.write.format("paimon").mode("append").save(location) + } + .start() + + val query = () => spark.sql("SELECT * FROM T ORDER BY a") + + try { + // snapshot-1 + inputData.addData((1, "a")) + stream.processAllAvailable() + checkAnswer(query(), Row(1, "a") :: Nil) + + checkAnswer( + spark.sql( + "CALL paimon.sys.create_tag(table => 'test.T', tag => 'test_tag', snapshot => 1)"), + Row(true) :: Nil) + + // snapshot-2 + inputData.addData((2, "b")) + stream.processAllAvailable() + checkAnswer(query(), Row(1, "a") :: Row(2, "b") :: Nil) + + // snapshot-3 + inputData.addData((2, "b2")) + stream.processAllAvailable() + checkAnswer(query(), Row(1, "a") :: Row(2, "b2") :: Nil) + assertThrows[RuntimeException] { + spark.sql("CALL paimon.sys.rollback(table => 'test.T_exception', version => '2')") + } + // rollback to snapshot + checkAnswer( + spark.sql("CALL paimon.sys.rollback(table => 'test.T', version => '2')"), + Row(table.latestSnapshot().get().id, 2) :: Nil) + checkAnswer(query(), Row(1, "a") :: Row(2, "b") :: Nil) + + // rollback to tag + val taggedSnapshotId = table.tagManager().getOrThrow("test_tag").trimToSnapshot().id + checkAnswer( + spark.sql("CALL paimon.sys.rollback(table => 'test.T', version => 'test_tag')"), + Row(table.latestSnapshot().get().id, taggedSnapshotId) :: Nil) + checkAnswer(query(), Row(1, "a") :: Nil) + } finally { + stream.stop() } - .start() - - val query = () => spark.sql("SELECT * FROM T ORDER BY a") - - try { - // snapshot-1 - inputData.addData((1, "a")) - stream.processAllAvailable() - checkAnswer(query(), Row(1, "a") :: Nil) - - checkAnswer( - spark.sql( - "CALL paimon.sys.create_tag(table => 'test.T', tag => 'test_tag', snapshot => 1)"), - Row(true) :: Nil) - - // snapshot-2 - inputData.addData((2, "b")) - stream.processAllAvailable() - checkAnswer(query(), Row(1, "a") :: Row(2, "b") :: Nil) - - // snapshot-3 - inputData.addData((2, "b2")) - stream.processAllAvailable() - checkAnswer(query(), Row(1, "a") :: Row(2, "b2") :: Nil) - assertThrows[RuntimeException] { - spark.sql("CALL paimon.sys.rollback(table => 'test.T_exception', version => '2')") - } - // rollback to snapshot - checkAnswer( - spark.sql("CALL 
paimon.sys.rollback(table => 'test.T', version => '2')"), - Row(table.latestSnapshot().get().id, 2) :: Nil) - checkAnswer(query(), Row(1, "a") :: Row(2, "b") :: Nil) - - // rollback to tag - val taggedSnapshotId = table.tagManager().getOrThrow("test_tag").trimToSnapshot().id - checkAnswer( - spark.sql("CALL paimon.sys.rollback(table => 'test.T', version => 'test_tag')"), - Row(table.latestSnapshot().get().id, taggedSnapshotId) :: Nil) - checkAnswer(query(), Row(1, "a") :: Nil) - } finally { - stream.stop() - } + } } } } @@ -159,60 +162,63 @@ class RollbackProcedureTest extends PaimonSparkTestBase with StreamTest { } test("Paimon Procedure: rollback to timestamp") { - failAfter(streamingTimeout) { - withTempDir { - checkpointDir => - // define a change-log table and test `forEachBatch` api - spark.sql(s""" - |CREATE TABLE T (a INT, b STRING) - |TBLPROPERTIES ('primary-key'='a', 'bucket'='3') - |""".stripMargin) - val location = loadTable("T").location().toString - - val inputData = MemoryStream[(Int, String)] - val stream = inputData - .toDS() - .toDF("a", "b") - .writeStream - .option("checkpointLocation", checkpointDir.getCanonicalPath) - .foreachBatch { - (batch: Dataset[Row], _: Long) => - batch.write.format("paimon").mode("append").save(location) + withSparkSQLConf(("spark.paimon.write.use-v2-write", "false")) { + + failAfter(streamingTimeout) { + withTempDir { + checkpointDir => + // define a change-log table and test `forEachBatch` api + spark.sql(s""" + |CREATE TABLE T (a INT, b STRING) + |TBLPROPERTIES ('primary-key'='a', 'bucket'='3') + |""".stripMargin) + val location = loadTable("T").location().toString + + val inputData = MemoryStream[(Int, String)] + val stream = inputData + .toDS() + .toDF("a", "b") + .writeStream + .option("checkpointLocation", checkpointDir.getCanonicalPath) + .foreachBatch { + (batch: Dataset[Row], _: Long) => + batch.write.format("paimon").mode("append").save(location) + } + .start() + + val table = loadTable("T") + + val query = () => spark.sql("SELECT * FROM T ORDER BY a") + + try { + // snapshot-1 + inputData.addData((1, "a")) + stream.processAllAvailable() + checkAnswer(query(), Row(1, "a") :: Nil) + + // snapshot-2 + inputData.addData((2, "b")) + stream.processAllAvailable() + checkAnswer(query(), Row(1, "a") :: Row(2, "b") :: Nil) + + val timestamp = System.currentTimeMillis() + + // snapshot-3 + inputData.addData((2, "b2")) + stream.processAllAvailable() + checkAnswer(query(), Row(1, "a") :: Row(2, "b2") :: Nil) + + // rollback to timestamp + checkAnswer( + spark.sql( + s"CALL paimon.sys.rollback_to_timestamp(table => 'test.T', timestamp => $timestamp)"), + Row(table.latestSnapshot().get().id, 2) :: Nil) + checkAnswer(query(), Row(1, "a") :: Row(2, "b") :: Nil) + + } finally { + stream.stop() } - .start() - - val table = loadTable("T") - - val query = () => spark.sql("SELECT * FROM T ORDER BY a") - - try { - // snapshot-1 - inputData.addData((1, "a")) - stream.processAllAvailable() - checkAnswer(query(), Row(1, "a") :: Nil) - - // snapshot-2 - inputData.addData((2, "b")) - stream.processAllAvailable() - checkAnswer(query(), Row(1, "a") :: Row(2, "b") :: Nil) - - val timestamp = System.currentTimeMillis() - - // snapshot-3 - inputData.addData((2, "b2")) - stream.processAllAvailable() - checkAnswer(query(), Row(1, "a") :: Row(2, "b2") :: Nil) - - // rollback to timestamp - checkAnswer( - spark.sql( - s"CALL paimon.sys.rollback_to_timestamp(table => 'test.T', timestamp => $timestamp)"), - Row(table.latestSnapshot().get().id, 2) :: Nil) - 
checkAnswer(query(), Row(1, "a") :: Row(2, "b") :: Nil) - - } finally { - stream.stop() - } + } } } } diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DataFrameWriteTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DataFrameWriteTestBase.scala index b25e41a3fb42..5524571aaf74 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DataFrameWriteTestBase.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DataFrameWriteTestBase.scala @@ -184,22 +184,24 @@ abstract class DataFrameWriteTestBase extends PaimonSparkTestBase { } test("Paimon: DataFrameWrite partition table") { - withTable("t") { - spark.sql(s""" - |CREATE TABLE t (a INT, b STRING, dt STRING) PARTITIONED BY(dt) - |TBLPROPERTIES ('file.format' = 'avro', 'bucket' = 2, 'bucket-key' = 'b') - |""".stripMargin) - - val table = loadTable("t") - val location = table.location().toString - - Seq((1, "x1", "a"), (2, "x2", "b")) - .toDF("a", "b", "c") - .write - .format("paimon") - .mode("append") - .save(location) - checkAnswer(sql("SELECT * FROM t"), Row(1, "x1", "a") :: Row(2, "x2", "b") :: Nil) + withSparkSQLConf(("spark.paimon.write.use-v2-write", "false")) { + withTable("t") { + spark.sql(s""" + |CREATE TABLE t (a INT, b STRING, dt STRING) PARTITIONED BY(dt) + |TBLPROPERTIES ('file.format' = 'avro', 'bucket' = 2, 'bucket-key' = 'b') + |""".stripMargin) + + val table = loadTable("t") + val location = table.location().toString + + Seq((1, "x1", "a"), (2, "x2", "b")) + .toDF("a", "b", "c") + .write + .format("paimon") + .mode("append") + .save(location) + checkAnswer(sql("SELECT * FROM t"), Row(1, "x1", "a") :: Row(2, "x2", "b") :: Nil) + } } } @@ -285,43 +287,44 @@ abstract class DataFrameWriteTestBase extends PaimonSparkTestBase { bucketModes.foreach { bucket => test(s"Write data into Paimon directly: has-pk: $hasPk, bucket: $bucket") { + withSparkSQLConf(("spark.paimon.write.use-v2-write", "false")) { + val prop = if (hasPk) { + s"'primary-key'='a', 'bucket' = '$bucket' " + } else if (bucket != -1) { + s"'bucket-key'='a', 'bucket' = '$bucket' " + } else { + "'write-only'='true'" + } - val prop = if (hasPk) { - s"'primary-key'='a', 'bucket' = '$bucket' " - } else if (bucket != -1) { - s"'bucket-key'='a', 'bucket' = '$bucket' " - } else { - "'write-only'='true'" - } + spark.sql(s""" + |CREATE TABLE T (a INT, b STRING) + |TBLPROPERTIES ($prop) + |""".stripMargin) + + val paimonTable = loadTable("T") + val location = paimonTable.location().toString + + val df1 = Seq((1, "a"), (2, "b")).toDF("a", "b") + df1.write.format("paimon").mode("append").save(location) + checkAnswer( + spark.sql("SELECT * FROM T ORDER BY a, b"), + Row(1, "a") :: Row(2, "b") :: Nil) + + val df2 = Seq((1, "a2"), (3, "c")).toDF("a", "b") + df2.write.format("paimon").mode("append").save(location) + val expected = if (hasPk) { + Row(1, "a2") :: Row(2, "b") :: Row(3, "c") :: Nil + } else { + Row(1, "a") :: Row(1, "a2") :: Row(2, "b") :: Row(3, "c") :: Nil + } + checkAnswer(spark.sql("SELECT * FROM T ORDER BY a, b"), expected) - spark.sql(s""" - |CREATE TABLE T (a INT, b STRING) - |TBLPROPERTIES ($prop) - |""".stripMargin) - - val paimonTable = loadTable("T") - val location = paimonTable.location().toString - - val df1 = Seq((1, "a"), (2, "b")).toDF("a", "b") - df1.write.format("paimon").mode("append").save(location) - checkAnswer( - spark.sql("SELECT * FROM T ORDER BY a, b"), - Row(1, "a") :: Row(2, "b") :: Nil) - - val df2 = Seq((1, 
"a2"), (3, "c")).toDF("a", "b") - df2.write.format("paimon").mode("append").save(location) - val expected = if (hasPk) { - Row(1, "a2") :: Row(2, "b") :: Row(3, "c") :: Nil - } else { - Row(1, "a") :: Row(1, "a2") :: Row(2, "b") :: Row(3, "c") :: Nil + val df3 = Seq((4, "d"), (5, "e")).toDF("a", "b") + df3.write.format("paimon").mode("overwrite").save(location) + checkAnswer( + spark.sql("SELECT * FROM T ORDER BY a, b"), + Row(4, "d") :: Row(5, "e") :: Nil) } - checkAnswer(spark.sql("SELECT * FROM T ORDER BY a, b"), expected) - - val df3 = Seq((4, "d"), (5, "e")).toDF("a", "b") - df3.write.format("paimon").mode("overwrite").save(location) - checkAnswer( - spark.sql("SELECT * FROM T ORDER BY a, b"), - Row(4, "d") :: Row(5, "e") :: Nil) } } } @@ -334,111 +337,113 @@ abstract class DataFrameWriteTestBase extends PaimonSparkTestBase { bucket => test( s"Schema evolution: write data into Paimon: $hasPk, bucket: $bucket, format: $format") { - val _spark = spark - import _spark.implicits._ - - val prop = if (hasPk) { - s"'primary-key'='a', 'bucket' = '$bucket', 'file.format' = '$format'" - } else if (bucket != -1) { - s"'bucket-key'='a', 'bucket' = '$bucket', 'file.format' = '$format'" - } else { - s"'write-only'='true', 'file.format' = '$format'" + withSparkSQLConf(("spark.paimon.write.use-v2-write", "false")) { + val _spark = spark + import _spark.implicits._ + + val prop = if (hasPk) { + s"'primary-key'='a', 'bucket' = '$bucket', 'file.format' = '$format'" + } else if (bucket != -1) { + s"'bucket-key'='a', 'bucket' = '$bucket', 'file.format' = '$format'" + } else { + s"'write-only'='true', 'file.format' = '$format'" + } + + spark.sql(s""" + |CREATE TABLE T (a INT, b STRING) + |TBLPROPERTIES ($prop) + |""".stripMargin) + + val paimonTable = loadTable("T") + val location = paimonTable.location().toString + + val df1 = Seq((1, "a"), (2, "b")).toDF("a", "b") + df1.write.format("paimon").mode("append").save(location) + checkAnswer( + spark.sql("SELECT * FROM T ORDER BY a, b"), + Row(1, "a") :: Row(2, "b") :: Nil) + + // Case 1: two additional fields + val df2 = Seq((1, "a2", 123L, Map("k" -> 11.1)), (3, "c", 345L, Map("k" -> 33.3))) + .toDF("a", "b", "c", "d") + df2.write + .format("paimon") + .mode("append") + .option("write.merge-schema", "true") + .save(location) + val expected2 = if (hasPk) { + Row(1, "a2", 123L, Map("k" -> 11.1)) :: + Row(2, "b", null, null) :: Row(3, "c", 345L, Map("k" -> 33.3)) :: Nil + } else { + Row(1, "a", null, null) :: Row(1, "a2", 123L, Map("k" -> 11.1)) :: Row( + 2, + "b", + null, + null) :: Row(3, "c", 345L, Map("k" -> 33.3)) :: Nil + } + checkAnswer(spark.sql("SELECT * FROM T ORDER BY a, b"), expected2) + + // Case 2: two fields with the evolved types: Int -> Long, Long -> Decimal + val df3 = Seq( + (2L, "b2", BigDecimal.decimal(234), Map("k" -> 22.2)), + (4L, "d", BigDecimal.decimal(456), Map("k" -> 44.4))).toDF("a", "b", "c", "d") + df3.write + .format("paimon") + .mode("append") + .option("write.merge-schema", "true") + .save(location) + val expected3 = if (hasPk) { + Row(1L, "a2", BigDecimal.decimal(123), Map("k" -> 11.1)) :: Row( + 2L, + "b2", + BigDecimal.decimal(234), + Map("k" -> 22.2)) :: Row( + 3L, + "c", + BigDecimal.decimal(345), + Map("k" -> 33.3)) :: Row( + 4L, + "d", + BigDecimal.decimal(456), + Map("k" -> 44.4)) :: Nil + } else { + Row(1L, "a", null, null) :: Row( + 1L, + "a2", + BigDecimal.decimal(123), + Map("k" -> 11.1)) :: Row(2L, "b", null, null) :: Row( + 2L, + "b2", + BigDecimal.decimal(234), + Map("k" -> 22.2)) :: Row( + 3L, + "c", + 
BigDecimal.decimal(345), + Map("k" -> 33.3)) :: Row( + 4L, + "d", + BigDecimal.decimal(456), + Map("k" -> 44.4)) :: Nil + } + checkAnswer(spark.sql("SELECT * FROM T ORDER BY a, b"), expected3) + + // Case 3: insert Decimal(20,18) to Decimal(38,18) + val df4 = Seq((99L, "df4", BigDecimal.decimal(4.0), Map("4" -> 4.1))) + .toDF("a", "b", "c", "d") + .selectExpr("a", "b", "cast(c as decimal(20,18)) as c", "d") + df4.write + .format("paimon") + .mode("append") + .option("write.merge-schema", "true") + .save(location) + val expected4 = + expected3 ++ Seq(Row(99L, "df4", BigDecimal.decimal(4.0), Map("4" -> 4.1))) + checkAnswer(spark.sql("SELECT * FROM T ORDER BY a, b"), expected4) + val decimalType = + spark.table("T").schema.apply(2).dataType.asInstanceOf[DecimalType] + assert(decimalType.precision == 38) + assert(decimalType.scale == 18) } - - spark.sql(s""" - |CREATE TABLE T (a INT, b STRING) - |TBLPROPERTIES ($prop) - |""".stripMargin) - - val paimonTable = loadTable("T") - val location = paimonTable.location().toString - - val df1 = Seq((1, "a"), (2, "b")).toDF("a", "b") - df1.write.format("paimon").mode("append").save(location) - checkAnswer( - spark.sql("SELECT * FROM T ORDER BY a, b"), - Row(1, "a") :: Row(2, "b") :: Nil) - - // Case 1: two additional fields - val df2 = Seq((1, "a2", 123L, Map("k" -> 11.1)), (3, "c", 345L, Map("k" -> 33.3))) - .toDF("a", "b", "c", "d") - df2.write - .format("paimon") - .mode("append") - .option("write.merge-schema", "true") - .save(location) - val expected2 = if (hasPk) { - Row(1, "a2", 123L, Map("k" -> 11.1)) :: - Row(2, "b", null, null) :: Row(3, "c", 345L, Map("k" -> 33.3)) :: Nil - } else { - Row(1, "a", null, null) :: Row(1, "a2", 123L, Map("k" -> 11.1)) :: Row( - 2, - "b", - null, - null) :: Row(3, "c", 345L, Map("k" -> 33.3)) :: Nil - } - checkAnswer(spark.sql("SELECT * FROM T ORDER BY a, b"), expected2) - - // Case 2: two fields with the evolved types: Int -> Long, Long -> Decimal - val df3 = Seq( - (2L, "b2", BigDecimal.decimal(234), Map("k" -> 22.2)), - (4L, "d", BigDecimal.decimal(456), Map("k" -> 44.4))).toDF("a", "b", "c", "d") - df3.write - .format("paimon") - .mode("append") - .option("write.merge-schema", "true") - .save(location) - val expected3 = if (hasPk) { - Row(1L, "a2", BigDecimal.decimal(123), Map("k" -> 11.1)) :: Row( - 2L, - "b2", - BigDecimal.decimal(234), - Map("k" -> 22.2)) :: Row( - 3L, - "c", - BigDecimal.decimal(345), - Map("k" -> 33.3)) :: Row( - 4L, - "d", - BigDecimal.decimal(456), - Map("k" -> 44.4)) :: Nil - } else { - Row(1L, "a", null, null) :: Row( - 1L, - "a2", - BigDecimal.decimal(123), - Map("k" -> 11.1)) :: Row(2L, "b", null, null) :: Row( - 2L, - "b2", - BigDecimal.decimal(234), - Map("k" -> 22.2)) :: Row( - 3L, - "c", - BigDecimal.decimal(345), - Map("k" -> 33.3)) :: Row( - 4L, - "d", - BigDecimal.decimal(456), - Map("k" -> 44.4)) :: Nil - } - checkAnswer(spark.sql("SELECT * FROM T ORDER BY a, b"), expected3) - - // Case 3: insert Decimal(20,18) to Decimal(38,18) - val df4 = Seq((99L, "df4", BigDecimal.decimal(4.0), Map("4" -> 4.1))) - .toDF("a", "b", "c", "d") - .selectExpr("a", "b", "cast(c as decimal(20,18)) as c", "d") - df4.write - .format("paimon") - .mode("append") - .option("write.merge-schema", "true") - .save(location) - val expected4 = - expected3 ++ Seq(Row(99L, "df4", BigDecimal.decimal(4.0), Map("4" -> 4.1))) - checkAnswer(spark.sql("SELECT * FROM T ORDER BY a, b"), expected4) - val decimalType = - spark.table("T").schema.apply(2).dataType.asInstanceOf[DecimalType] - assert(decimalType.precision 
== 38) - assert(decimalType.scale == 18) } } } @@ -450,102 +455,103 @@ abstract class DataFrameWriteTestBase extends PaimonSparkTestBase { bucket => test( s"Schema evolution: write data into Paimon with allowExplicitCast = true: $hasPk, bucket: $bucket") { + withSparkSQLConf(("spark.paimon.write.use-v2-write", "false")) { + val prop = if (hasPk) { + s"'primary-key'='a', 'bucket' = '$bucket' " + } else if (bucket != -1) { + s"'bucket-key'='a', 'bucket' = '$bucket' " + } else { + "'write-only'='true'" + } - val prop = if (hasPk) { - s"'primary-key'='a', 'bucket' = '$bucket' " - } else if (bucket != -1) { - s"'bucket-key'='a', 'bucket' = '$bucket' " - } else { - "'write-only'='true'" - } - - spark.sql(s""" - |CREATE TABLE T (a INT, b STRING) - |TBLPROPERTIES ($prop) - |""".stripMargin) - - val paimonTable = loadTable("T") - val location = paimonTable.location().toString - - val df1 = Seq((1, "2023-08-01"), (2, "2023-08-02")).toDF("a", "b") - df1.write.format("paimon").mode("append").save(location) - checkAnswer( - spark.sql("SELECT * FROM T ORDER BY a, b"), - Row(1, "2023-08-01") :: Row(2, "2023-08-02") :: Nil) - - // Case 1: two additional fields: DoubleType and TimestampType - val ts = java.sql.Timestamp.valueOf("2023-08-01 10:00:00.0") - val df2 = Seq((1, "2023-08-01", 12.3d, ts), (3, "2023-08-03", 34.5d, ts)) - .toDF("a", "b", "c", "d") - df2.write - .format("paimon") - .mode("append") - .option("write.merge-schema", "true") - .save(location) - val expected2 = if (hasPk) { - Row(1, "2023-08-01", 12.3d, ts) :: - Row(2, "2023-08-02", null, null) :: Row(3, "2023-08-03", 34.5d, ts) :: Nil - } else { - Row(1, "2023-08-01", null, null) :: Row(1, "2023-08-01", 12.3d, ts) :: Row( - 2, - "2023-08-02", - null, - null) :: Row(3, "2023-08-03", 34.5d, ts) :: Nil - } - checkAnswer(spark.sql("SELECT * FROM T ORDER BY a, b"), expected2) - - // Case 2: a: Int -> Long, b: String -> Date, c: Long -> Int, d: Map -> String - val date = java.sql.Date.valueOf("2023-07-31") - val df3 = Seq((2L, date, 234, null), (4L, date, 456, "2023-08-01 11:00:00.0")).toDF( - "a", - "b", - "c", - "d") - - // throw UnsupportedOperationException if write.merge-schema.explicit-cast = false - assertThrows[UnsupportedOperationException] { + spark.sql(s""" + |CREATE TABLE T (a INT, b STRING) + |TBLPROPERTIES ($prop) + |""".stripMargin) + + val paimonTable = loadTable("T") + val location = paimonTable.location().toString + + val df1 = Seq((1, "2023-08-01"), (2, "2023-08-02")).toDF("a", "b") + df1.write.format("paimon").mode("append").save(location) + checkAnswer( + spark.sql("SELECT * FROM T ORDER BY a, b"), + Row(1, "2023-08-01") :: Row(2, "2023-08-02") :: Nil) + + // Case 1: two additional fields: DoubleType and TimestampType + val ts = java.sql.Timestamp.valueOf("2023-08-01 10:00:00.0") + val df2 = Seq((1, "2023-08-01", 12.3d, ts), (3, "2023-08-03", 34.5d, ts)) + .toDF("a", "b", "c", "d") + df2.write + .format("paimon") + .mode("append") + .option("write.merge-schema", "true") + .save(location) + val expected2 = if (hasPk) { + Row(1, "2023-08-01", 12.3d, ts) :: + Row(2, "2023-08-02", null, null) :: Row(3, "2023-08-03", 34.5d, ts) :: Nil + } else { + Row(1, "2023-08-01", null, null) :: Row(1, "2023-08-01", 12.3d, ts) :: Row( + 2, + "2023-08-02", + null, + null) :: Row(3, "2023-08-03", 34.5d, ts) :: Nil + } + checkAnswer(spark.sql("SELECT * FROM T ORDER BY a, b"), expected2) + + // Case 2: a: Int -> Long, b: String -> Date, c: Long -> Int, d: Map -> String + val date = java.sql.Date.valueOf("2023-07-31") + val df3 = Seq((2L, date, 
234, null), (4L, date, 456, "2023-08-01 11:00:00.0")).toDF( + "a", + "b", + "c", + "d") + + // throw UnsupportedOperationException if write.merge-schema.explicit-cast = false + assertThrows[UnsupportedOperationException] { + df3.write + .format("paimon") + .mode("append") + .option("write.merge-schema", "true") + .save(location) + } + // merge schema and write data when write.merge-schema.explicit-cast = true df3.write .format("paimon") .mode("append") .option("write.merge-schema", "true") + .option("write.merge-schema.explicit-cast", "true") .save(location) - } - // merge schema and write data when write.merge-schema.explicit-cast = true - df3.write - .format("paimon") - .mode("append") - .option("write.merge-schema", "true") - .option("write.merge-schema.explicit-cast", "true") - .save(location) - val expected3 = if (hasPk) { - Row(1L, Date.valueOf("2023-08-01"), 12, ts.toString) :: Row( - 2L, - date, - 234, - null) :: Row(3L, Date.valueOf("2023-08-03"), 34, ts.toString) :: Row( - 4L, - date, - 456, - "2023-08-01 11:00:00.0") :: Nil - } else { - Row(1L, Date.valueOf("2023-08-01"), null, null) :: Row( - 1L, - Date.valueOf("2023-08-01"), - 12, - ts.toString) :: Row(2L, date, 234, null) :: Row( - 2L, - Date.valueOf("2023-08-02"), - null, - null) :: Row(3L, Date.valueOf("2023-08-03"), 34, ts.toString) :: Row( - 4L, - date, - 456, - "2023-08-01 11:00:00.0") :: Nil - } - checkAnswer( - spark.sql("SELECT a, b, c, substring(d, 0, 21) FROM T ORDER BY a, b"), - expected3) + val expected3 = if (hasPk) { + Row(1L, Date.valueOf("2023-08-01"), 12, ts.toString) :: Row( + 2L, + date, + 234, + null) :: Row(3L, Date.valueOf("2023-08-03"), 34, ts.toString) :: Row( + 4L, + date, + 456, + "2023-08-01 11:00:00.0") :: Nil + } else { + Row(1L, Date.valueOf("2023-08-01"), null, null) :: Row( + 1L, + Date.valueOf("2023-08-01"), + 12, + ts.toString) :: Row(2L, date, 234, null) :: Row( + 2L, + Date.valueOf("2023-08-02"), + null, + null) :: Row(3L, Date.valueOf("2023-08-03"), 34, ts.toString) :: Row( + 4L, + date, + 456, + "2023-08-01 11:00:00.0") :: Nil + } + checkAnswer( + spark.sql("SELECT a, b, c, substring(d, 0, 21) FROM T ORDER BY a, b"), + expected3) + } } } } diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonMetricTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonMetricTest.scala index 9656a3caa65c..83646f85a5b1 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonMetricTest.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonMetricTest.scala @@ -86,28 +86,31 @@ class PaimonMetricTest extends PaimonSparkTestBase with ScanPlanHelper { } test("Paimon Metric: report output metric") { - sql(s"CREATE TABLE T (id int)") + withSparkSQLConf(("spark.paimon.write.use-v2-write", "false")) { - var recordsWritten = 0L - var bytesWritten = 0L + sql(s"CREATE TABLE T (id int)") - val listener = new SparkListener() { - override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = { - val outputMetrics = taskEnd.taskMetrics.outputMetrics - recordsWritten += outputMetrics.recordsWritten - bytesWritten += outputMetrics.bytesWritten + var recordsWritten = 0L + var bytesWritten = 0L + + val listener = new SparkListener() { + override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = { + val outputMetrics = taskEnd.taskMetrics.outputMetrics + recordsWritten += outputMetrics.recordsWritten + bytesWritten += outputMetrics.bytesWritten + } } - } - try { - 
spark.sparkContext.addSparkListener(listener) - sql(s"INSERT INTO T VALUES 1, 2, 3") - } finally { - spark.sparkContext.removeSparkListener(listener) - } + try { + spark.sparkContext.addSparkListener(listener) + sql(s"INSERT INTO T VALUES 1, 2, 3") + } finally { + spark.sparkContext.removeSparkListener(listener) + } - Assertions.assertEquals(3, recordsWritten) - Assertions.assertTrue(bytesWritten > 0) + Assertions.assertEquals(3, recordsWritten) + Assertions.assertTrue(bytesWritten > 0) + } } def metric(metrics: Array[CustomTaskMetric], name: String): Long = {