diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AppendOnlyTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AppendOnlyTableITCase.java index 95defcd4253d4..61a464c2f1db6 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AppendOnlyTableITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AppendOnlyTableITCase.java @@ -259,6 +259,22 @@ public void testDynamicOptions() throws Exception { assertThat(iterator.collect(1)).containsExactlyInAnyOrder(Row.of(2)); } + @Test + public void testReadWriteBranch() throws Exception { + // create table + sql("CREATE TABLE T (id INT)"); + // insert data + batchSql("INSERT INTO T VALUES (1)"); + // create tag + paimonTable("T").createTag("tag1", 1); + // create branch + paimonTable("T").createBranch("branch1", "tag1"); + // insert data to branch + batchSql("INSERT INTO T/*+ OPTIONS('branch' = 'branch1') */ VALUES (2)"); + List rows = batchSql("select * from T /*+ OPTIONS('branch' = 'branch1') */"); + assertThat(rows).containsExactlyInAnyOrder(Row.of(2), Row.of(1)); + } + @Override protected List ddl() { return Arrays.asList( diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java index ccd1eafca6b3c..e2f3d8b50eb80 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java @@ -691,7 +691,8 @@ public void testTagsTable() throws Exception { List result = sql("SELECT tag_name, snapshot_id, schema_id, record_count FROM T$tags"); - assertThat(result).containsExactlyInAnyOrder(Row.of("tag1", 1L, 0L, 1L), Row.of("tag2", 2L, 0L, 2L)); + assertThat(result) + .containsExactlyInAnyOrder(Row.of("tag1", 1L, 0L, 1L), Row.of("tag2", 2L, 0L, 2L)); } @Test diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java index e495ad3da5d4c..09e2dbac54a60 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java @@ -168,6 +168,61 @@ public void testLookupChangelog() throws Exception { innerTestChangelogProducing(Collections.singletonList("'changelog-producer' = 'lookup'")); } + @Test + public void testAlterTableReadWriteBranch() throws Exception { + TableEnvironment sEnv = + tableEnvironmentBuilder() + .streamingMode() + .checkpointIntervalMs(ThreadLocalRandom.current().nextInt(900) + 100) + .parallelism(1) + .build(); + + sEnv.executeSql(createCatalogSql("testCatalog", path + "/warehouse")); + sEnv.executeSql("USE CATALOG testCatalog"); + sEnv.executeSql( + "CREATE TABLE T2 ( k INT, v STRING, PRIMARY KEY (k) NOT ENFORCED ) " + + "WITH ( " + + "'bucket' = '2'" + + ")"); + + CloseableIterator it = sEnv.executeSql("SELECT * FROM T2").collect(); + + // insert data + sEnv.executeSql("INSERT INTO T2 VALUES (1, 'A')").await(); + // read initial data + List actual = new ArrayList<>(); + for (int i = 0; i < 1; i++) { + actual.add(it.next().toString()); + } + + assertThat(actual).containsExactlyInAnyOrder("+I[1, A]"); + + // create tag + sEnv.executeSql( + String.format("CALL sys.create_tag('%s.%s', 'tag2', 1, '5 d')", "default", "T2")); + // create branch + sEnv.executeSql( + String.format( + "CALL sys.create_branch('%s.%s', 'branch1', 'tag2')", "default", "T2")); + // alter table + sEnv.executeSql("ALTER TABLE T2 SET ('changelog-producer'='full-compaction')"); + + CloseableIterator branchIt = + sEnv.executeSql("select * from T2 /*+ OPTIONS('branch' = 'branch1') */").collect(); + // insert data to branch + sEnv.executeSql( + "INSERT INTO T2/*+ OPTIONS('branch' = 'branch1') */ VALUES (10, 'v10'),(11, 'v11'),(12, 'v12')") + .await(); + + // read initial data + List actualBranch = new ArrayList<>(); + for (int i = 0; i < 4; i++) { + actualBranch.add(branchIt.next().toString()); + } + assertThat(actualBranch) + .containsExactlyInAnyOrder("+I[1, A]", "+I[10, v10]", "+I[11, v11]", "+I[12, v12]"); + } + private void innerTestChangelogProducing(List options) throws Exception { TableEnvironment sEnv = tableEnvironmentBuilder()