diff --git a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java index e4b09df3893b6..cc89852cf0d7c 100644 --- a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java @@ -204,6 +204,17 @@ public void deleteBranch(String branchName) { wrapped.deleteBranch(branchName); } + /** + * Merge a branch to main branch. + * + * @param branchName + */ + @Override + public void mergeBranch(String branchName) { + privilegeChecker.assertCanInsert(identifier); + wrapped.mergeBranch(branchName); + } + @Override public void replaceBranch(String fromBranch) { privilegeChecker.assertCanInsert(identifier); diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java index 785de6008cbe9..d21fa304e67c7 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java @@ -45,6 +45,7 @@ import java.util.stream.Stream; import static org.apache.paimon.utils.FileUtils.listVersionedDirectories; +import static org.apache.paimon.utils.FileUtils.listVersionedFileStatus; import static org.apache.paimon.utils.Preconditions.checkArgument; /** Manager for {@code Branch}. */ @@ -357,12 +358,13 @@ public boolean fileExists(Path path) { public void mergeBranch(String branchName) { checkArgument( - !branchName.equals(DEFAULT_MAIN_BRANCH), + !isMainBranch(branchName), "Branch name '%s' do not use in merge branch.", branchName); checkArgument(branchExists(branchName), "Branch name '%s' doesn't exist.", branchName); - Long earliestSnapshotId = snapshotManager.earliestSnapshotId(branchName); - Snapshot earliestSnapshot = snapshotManager.snapshot(branchName, earliestSnapshotId); + Long earliestSnapshotId = snapshotManager.copyWithBranch(branchName).earliestSnapshotId(); + Snapshot earliestSnapshot = + snapshotManager.copyWithBranch(branchName).snapshot(earliestSnapshotId); long earliestSchemaId = earliestSnapshot.schemaId(); try { @@ -407,18 +409,19 @@ public void mergeBranch(String branchName) { fileIO.deleteFilesQuietly(deletePaths); fileIO.copyFilesUtf8( - snapshotManager.branchSnapshotDirectory(branchName), + snapshotManager.copyWithBranch(branchName).snapshotDirectory(), snapshotManager.snapshotDirectory()); fileIO.copyFilesUtf8( - schemaManager.branchSchemaDirectory(branchName), + schemaManager.copyWithBranch(branchName).schemaDirectory(), schemaManager.schemaDirectory()); fileIO.copyFilesUtf8( - tagManager.branchTagDirectory(branchName), tagManager.tagDirectory()); + tagManager.copyWithBranch(branchName).tagDirectory(), + tagManager.tagDirectory()); } catch (IOException e) { throw new RuntimeException( String.format( "Exception occurs when merge branch '%s' (directory in %s).", - branchName, getBranchPath(tablePath, branchName)), + branchName, getBranchPath(fileIO, tablePath, branchName)), e); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java index 51c915d9cd38c..6c022602584ba 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java @@ -608,11 +608,7 @@ private Long findByListFiles(BinaryOperator reducer, Path dir, String pref } public void deleteLatestHint() throws IOException { - deleteLatestHint(DEFAULT_MAIN_BRANCH); - } - - public void deleteLatestHint(String branchName) throws IOException { - Path snapshotDir = snapshotDirByBranch(branchName); + Path snapshotDir = snapshotDirectory(); Path hintFile = new Path(snapshotDir, LATEST); fileIO.delete(hintFile, false); } diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java index 59f3545c60afd..6c59ef53cc725 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java @@ -80,7 +80,7 @@ public TagManager copyWithBranch(String branchName) { public Path tagDirectory() { return new Path(getBranchPath(fileIO, tablePath, branch) + "/tag"); } - + /** Return the path of a tag. */ public Path tagPath(String tagName) { return new Path(getBranchPath(fileIO, tablePath, branch) + "/tag/" + TAG_PREFIX + tagName); diff --git a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java index 1d64e0271324d..b6014107a9a9a 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java @@ -1113,11 +1113,12 @@ public void testMergeBranch() throws Exception { generateBranch(table); + FileStoreTable tableBranch = createFileStoreTable("branch1"); // Verify branch1 and the main branch have the same data assertThat( getResult( table.newRead(), - toSplits(table.newSnapshotReader("branch1").read().dataSplits()), + toSplits(tableBranch.newSnapshotReader().read().dataSplits()), BATCH_ROW_TO_STRING)) .containsExactlyInAnyOrder("0|0|0|binary|varbinary|mapKey:mapVal|multiset"); @@ -1135,8 +1136,8 @@ public void testMergeBranch() throws Exception { "Branch name 'main' do not use in merge branch.")); // Write data to branch1 - try (StreamTableWrite write = table.newWrite(commitUser); - StreamTableCommit commit = table.newCommit(commitUser, "branch1")) { + try (StreamTableWrite write = tableBranch.newWrite(commitUser); + StreamTableCommit commit = tableBranch.newCommit(commitUser)) { write.write(rowData(2, 20, 200L)); commit.commit(1, write.prepareCommit(false, 2)); } @@ -1144,8 +1145,8 @@ public void testMergeBranch() throws Exception { // Validate data in branch1 assertThat( getResult( - table.newRead(), - toSplits(table.newSnapshotReader("branch1").read().dataSplits()), + tableBranch.newRead(), + toSplits(tableBranch.newSnapshotReader().read().dataSplits()), BATCH_ROW_TO_STRING)) .containsExactlyInAnyOrder( "0|0|0|binary|varbinary|mapKey:mapVal|multiset", @@ -1176,7 +1177,8 @@ public void testMergeBranch() throws Exception { SnapshotManager snapshotManager = new SnapshotManager(new TraceableFileIO(), tablePath); Snapshot branchSnapshot = Snapshot.fromPath( - new TraceableFileIO(), snapshotManager.branchSnapshotPath("branch1", 2)); + new TraceableFileIO(), + snapshotManager.copyWithBranch("branch1").snapshotPath(2)); Snapshot snapshot = Snapshot.fromPath(new TraceableFileIO(), snapshotManager.snapshotPath(2)); assertThat(branchSnapshot.equals(snapshot)).isTrue(); @@ -1185,13 +1187,14 @@ public void testMergeBranch() throws Exception { SchemaManager schemaManager = new SchemaManager(new TraceableFileIO(), tablePath); TableSchema branchSchema = SchemaManager.fromPath( - new TraceableFileIO(), schemaManager.branchSchemaPath("branch1", 0)); + new TraceableFileIO(), + schemaManager.copyWithBranch("branch1").toSchemaPath(0)); TableSchema schema0 = schemaManager.schema(0); assertThat(branchSchema.equals(schema0)).isTrue(); // Write two rows data to branch1 again - try (StreamTableWrite write = table.newWrite(commitUser); - StreamTableCommit commit = table.newCommit(commitUser, "branch1")) { + try (StreamTableWrite write = tableBranch.newWrite(commitUser); + StreamTableCommit commit = tableBranch.newCommit(commitUser)) { write.write(rowData(3, 30, 300L)); write.write(rowData(4, 40, 400L)); commit.commit(2, write.prepareCommit(false, 3)); @@ -1201,7 +1204,7 @@ public void testMergeBranch() throws Exception { assertThat( getResult( table.newRead(), - toSplits(table.newSnapshotReader("branch1").read().dataSplits()), + toSplits(tableBranch.newSnapshotReader().read().dataSplits()), BATCH_ROW_TO_STRING)) .containsExactlyInAnyOrder( "0|0|0|binary|varbinary|mapKey:mapVal|multiset",