diff --git a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java index 1881c7350900a..04bd393bc841c 100644 --- a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java @@ -204,6 +204,29 @@ public void deleteBranch(String branchName) { wrapped.deleteBranch(branchName); } + /** + * Replace main branch. + * + * @param branchName + */ + @Override + public void replaceMainBranch(String branchName) { + privilegeChecker.assertCanInsert(identifier); + wrapped.replaceMainBranch(branchName); + } + + @Override + public void cleanMainBranchFile() { + privilegeChecker.assertCanInsert(identifier); + wrapped.cleanMainBranchFile(); + } + + @Override + public void mainBranch() { + privilegeChecker.assertCanInsert(identifier); + wrapped.mainBranch(); + } + @Override public ExpireSnapshots newExpireSnapshots() { privilegeChecker.assertCanInsert(identifier); diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java index 228d30d5cb089..cb478d7bad285 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java @@ -68,7 +68,6 @@ import static org.apache.paimon.catalog.Identifier.UNKNOWN_DATABASE; import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH; import static org.apache.paimon.utils.BranchManager.getBranchPath; -import static org.apache.paimon.utils.BranchManager.isMainBranch; import static org.apache.paimon.utils.FileUtils.listVersionedFiles; import static org.apache.paimon.utils.Preconditions.checkState; @@ -500,17 +499,13 @@ public static TableSchema fromPath(FileIO fileIO, Path path) { } public Path schemaDirectory() { - return isMainBranch(branch) - ? new Path(tableRoot + "/schema") - : new Path(getBranchPath(tableRoot, branch) + "/schema"); + return new Path(getBranchPath(fileIO, tableRoot, branch) + "/schema"); } @VisibleForTesting public Path toSchemaPath(long schemaId) { - return isMainBranch(branch) - ? new Path(tableRoot + "/schema/" + SCHEMA_PREFIX + schemaId) - : new Path( - getBranchPath(tableRoot, branch) + "/schema/" + SCHEMA_PREFIX + schemaId); + return new Path( + getBranchPath(fileIO, tableRoot, branch) + "/schema/" + SCHEMA_PREFIX + schemaId); } /** diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java index 655e814316903..2223d1dc2938b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java @@ -525,6 +525,21 @@ public void deleteBranch(String branchName) { branchManager().deleteBranch(branchName); } + @Override + public void cleanMainBranchFile() { + branchManager().cleanMainBranchFile(); + } + + @Override + public void replaceMainBranch(String branchName) { + branchManager().commitMainBranch(branchName); + } + + @Override + public void mainBranch() { + branchManager().mainBranch(); + } + @Override public void rollbackTo(String tagName) { TagManager tagManager = tagManager(); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java b/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java index 42bea3f6813eb..2577ab2bcc486 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java @@ -182,6 +182,30 @@ default void deleteBranch(String branchName) { this.getClass().getSimpleName())); } + @Override + default void mainBranch() { + throw new UnsupportedOperationException( + String.format( + "Readonly Table %s does not support mainBranch.", + this.getClass().getSimpleName())); + } + + @Override + default void cleanMainBranchFile() { + throw new UnsupportedOperationException( + String.format( + "Readonly Table %s does not support cleanMainBranchFile.", + this.getClass().getSimpleName())); + } + + @Override + default void replaceMainBranch(String branchName) { + throw new UnsupportedOperationException( + String.format( + "Readonly Table %s does not support setMainBranch.", + this.getClass().getSimpleName())); + } + @Override default ExpireSnapshots newExpireSnapshots() { throw new UnsupportedOperationException( diff --git a/paimon-core/src/main/java/org/apache/paimon/table/Table.java b/paimon-core/src/main/java/org/apache/paimon/table/Table.java index 876908394f2dc..3ecc00205b866 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/Table.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/Table.java @@ -111,6 +111,14 @@ public interface Table extends Serializable { @Experimental void deleteBranch(String branchName); + /** Replace main branch. */ + @Experimental + void replaceMainBranch(String branchName); + + void cleanMainBranchFile(); + + void mainBranch(); + /** Manually expire snapshots, parameters can be controlled independently of table options. */ @Experimental ExpireSnapshots newExpireSnapshots(); diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java index 41099dfac46b6..61f6e1a43cad1 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java @@ -49,6 +49,7 @@ public class BranchManager { public static final String BRANCH_PREFIX = "branch-"; public static final String DEFAULT_MAIN_BRANCH = "main"; + public static final String MAIN_BRANCH_FILE = "MAIN-BRANCH"; private final FileIO fileIO; private final Path tablePath; @@ -79,13 +80,69 @@ public static boolean isMainBranch(String branch) { } /** Return the path string of a branch. */ - public static String getBranchPath(Path tablePath, String branchName) { - return tablePath.toString() + "/branch/" + BRANCH_PREFIX + branchName; + public static String getBranchPath(FileIO fileIO, Path tablePath, String branch) { + if (isMainBranch(branch)) { + Path path = new Path(tablePath, MAIN_BRANCH_FILE); + try { + if (fileIO.exists(path)) { + String data = fileIO.readFileUtf8(path); + if (StringUtils.isBlank(data)) { + return tablePath.toString(); + } else { + return tablePath.toString() + "/branch/" + BRANCH_PREFIX + data; + } + } else { + return tablePath.toString(); + } + } catch (IOException e) { + throw new RuntimeException(e); + } + } + return tablePath.toString() + "/branch/" + BRANCH_PREFIX + branch; + } + + /** Get main branch. */ + public String mainBranch() { + Path path = new Path(tablePath, MAIN_BRANCH_FILE); + try { + if (fileIO.exists(path)) { + return fileIO.readFileUtf8(path); + } else { + return DEFAULT_MAIN_BRANCH; + } + } catch (IOException e) { + throw new RuntimeException(e); + } } /** Return the path of a branch. */ public Path branchPath(String branchName) { - return new Path(getBranchPath(tablePath, branchName)); + return new Path(getBranchPath(fileIO, tablePath, branchName)); + } + + /** Replace main by specify branch. */ + public void commitMainBranch(String branchName) { + Path mainBranchFile = new Path(tablePath, MAIN_BRANCH_FILE); + try { + fileIO.delete(mainBranchFile, false); + fileIO.overwriteFileUtf8(mainBranchFile, branchName); + } catch (IOException e) { + throw new RuntimeException( + String.format( + "Exception occurs when set main branch '%s' (directory in %s).", + branchName, tablePath.toString()), + e); + } + } + + /** Clean the main branch file and use default. */ + public void cleanMainBranchFile() { + Path mainBranchFile = new Path(tablePath, MAIN_BRANCH_FILE); + try { + fileIO.delete(mainBranchFile, false); + } catch (IOException e) { + throw new RuntimeException("Exception occurs when clean main branch file.", e); + } } /** Create empty branch. */ @@ -111,7 +168,7 @@ public void createBranch(String branchName) { throw new RuntimeException( String.format( "Exception occurs when create branch '%s' (directory in %s).", - branchName, getBranchPath(tablePath, branchName)), + branchName, getBranchPath(fileIO, tablePath, branchName)), e); } } @@ -143,7 +200,7 @@ public void createBranch(String branchName, long snapshotId) { throw new RuntimeException( String.format( "Exception occurs when create branch '%s' (directory in %s).", - branchName, getBranchPath(tablePath, branchName)), + branchName, getBranchPath(fileIO, tablePath, branchName)), e); } } @@ -179,7 +236,7 @@ public void createBranch(String branchName, String tagName) { throw new RuntimeException( String.format( "Exception occurs when create branch '%s' (directory in %s).", - branchName, getBranchPath(tablePath, branchName)), + branchName, getBranchPath(fileIO, tablePath, branchName)), e); } } @@ -193,7 +250,7 @@ public void deleteBranch(String branchName) { LOG.info( String.format( "Deleting the branch failed due to an exception in deleting the directory %s. Please try again.", - getBranchPath(tablePath, branchName)), + getBranchPath(fileIO, tablePath, branchName)), e); } } @@ -246,8 +303,7 @@ public List branches() { } FileStoreTable branchTable = FileStoreTableFactory.create( - fileIO, new Path(getBranchPath(tablePath, branchName))); - + fileIO, new Path(getBranchPath(fileIO, tablePath, branchName))); SortedMap> snapshotTags = branchTable.tagManager().tags(); Long earliestSnapshotId = branchTable.snapshotManager().earliestSnapshotId(); if (snapshotTags.isEmpty()) { diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java index 63679b86acea3..9813debe4761e 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java @@ -47,7 +47,6 @@ import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH; import static org.apache.paimon.utils.BranchManager.getBranchPath; -import static org.apache.paimon.utils.BranchManager.isMainBranch; import static org.apache.paimon.utils.FileUtils.listVersionedFiles; /** Manager for {@link Snapshot}, providing utility methods related to paths and snapshot hints. */ @@ -90,35 +89,27 @@ public Path tablePath() { } public Path changelogDirectory() { - return isMainBranch(branch) - ? new Path(tablePath + "/changelog") - : new Path(getBranchPath(tablePath, branch) + "/changelog"); + return new Path(getBranchPath(fileIO, tablePath, branch) + "/changelog"); } public Path longLivedChangelogPath(long snapshotId) { - return isMainBranch(branch) - ? new Path(tablePath + "/changelog/" + CHANGELOG_PREFIX + snapshotId) - : new Path( - getBranchPath(tablePath, branch) - + "/changelog/" - + CHANGELOG_PREFIX - + snapshotId); + return new Path( + getBranchPath(fileIO, tablePath, branch) + + "/changelog/" + + CHANGELOG_PREFIX + + snapshotId); } public Path snapshotPath(long snapshotId) { - return isMainBranch(branch) - ? new Path(tablePath + "/snapshot/" + SNAPSHOT_PREFIX + snapshotId) - : new Path( - getBranchPath(tablePath, branch) - + "/snapshot/" - + SNAPSHOT_PREFIX - + snapshotId); + return new Path( + getBranchPath(fileIO, tablePath, branch) + + "/snapshot/" + + SNAPSHOT_PREFIX + + snapshotId); } public Path snapshotDirectory() { - return isMainBranch(branch) - ? new Path(tablePath + "/snapshot") - : new Path(getBranchPath(tablePath, branch) + "/snapshot"); + return new Path(getBranchPath(fileIO, tablePath, branch) + "/snapshot"); } public Snapshot snapshot(long snapshotId) { diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java index c96bbdd568a7a..01e9d94146fb6 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java @@ -47,7 +47,6 @@ import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH; import static org.apache.paimon.utils.BranchManager.getBranchPath; -import static org.apache.paimon.utils.BranchManager.isMainBranch; import static org.apache.paimon.utils.FileUtils.listVersionedFileStatus; import static org.apache.paimon.utils.Preconditions.checkArgument; @@ -79,16 +78,12 @@ public TagManager copyWithBranch(String branchName) { /** Return the root Directory of tags. */ public Path tagDirectory() { - return isMainBranch(branch) - ? new Path(tablePath + "/tag") - : new Path(getBranchPath(tablePath, branch) + "/tag"); + return new Path(getBranchPath(fileIO, tablePath, branch) + "/tag"); } - /** Return the path of a tag in branch. */ + /** Return the path of a tag. */ public Path tagPath(String tagName) { - return isMainBranch(branch) - ? new Path(tablePath + "/tag/" + TAG_PREFIX + tagName) - : new Path(getBranchPath(tablePath, branch) + "/tag/" + TAG_PREFIX + tagName); + return new Path(getBranchPath(fileIO, tablePath, branch) + "/tag/" + TAG_PREFIX + tagName); } /** Create a tag from given snapshot and save it in the storage. */ diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CleanMainBranchProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CleanMainBranchProcedure.java new file mode 100644 index 0000000000000..bae7820d10124 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CleanMainBranchProcedure.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.procedure; + +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.table.Table; + +import org.apache.flink.table.procedure.ProcedureContext; + +/** + * Clean main branch procedure. Usage: + * + *

+ *  CALL sys.clean_main_branch('tableId')
+ * 
+ */ +public class CleanMainBranchProcedure extends ProcedureBase { + + public static final String IDENTIFIER = "clean_main_branch"; + + @Override + public String identifier() { + return IDENTIFIER; + } + + public String[] call(ProcedureContext procedureContext, String tableId) + throws Catalog.TableNotExistException { + Table table = catalog.getTable(Identifier.fromString(tableId)); + table.cleanMainBranchFile(); + + return new String[] {"Success"}; + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ReplaceMainBranchProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ReplaceMainBranchProcedure.java new file mode 100644 index 0000000000000..33fdd99142569 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ReplaceMainBranchProcedure.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.procedure; + +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.table.Table; + +import org.apache.flink.table.procedure.ProcedureContext; + +/** + * Replace main branch procedure. Usage: + * + *

+ *  CALL sys.replace_main_branch('tableId', 'branchName')
+ * 
+ */ +public class ReplaceMainBranchProcedure extends ProcedureBase { + + public static final String IDENTIFIER = "replace_main_branch"; + + @Override + public String identifier() { + return IDENTIFIER; + } + + public String[] call(ProcedureContext procedureContext, String tableId, String branchName) + throws Catalog.TableNotExistException { + Table table = catalog.getTable(Identifier.fromString(tableId)); + table.replaceMainBranch(branchName); + + return new String[] {"Success"}; + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory index 848dd317d43bd..58d327ad81858 100644 --- a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory +++ b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory @@ -51,3 +51,5 @@ org.apache.paimon.flink.procedure.privilege.DropPrivilegedUserProcedure org.apache.paimon.flink.procedure.privilege.GrantPrivilegeToUserProcedure org.apache.paimon.flink.procedure.privilege.RevokePrivilegeFromUserProcedure org.apache.paimon.flink.procedure.RepairProcedure +org.apache.paimon.flink.procedure.ReplaceMainBranchProcedure +org.apache.paimon.flink.procedure.CleanMainBranchProcedure diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java index 007d1ac5c1475..30fdfe144396d 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java @@ -163,4 +163,52 @@ void testCreateAndDeleteEmptyBranch() throws Exception { database, tableName)); assertThat(branchManager.branchExists("empty_branch_name")).isFalse(); } + + @Test + void testReplaceMainBranchAndCleanMainBranch() throws Exception { + + init(warehouse); + + RowType rowType = + RowType.of( + new DataType[] {DataTypes.BIGINT(), DataTypes.STRING()}, + new String[] {"k", "v"}); + FileStoreTable table = + createFileStoreTable( + rowType, + Collections.emptyList(), + Collections.singletonList("k"), + Collections.emptyList(), + Collections.emptyMap()); + + StreamWriteBuilder writeBuilder = table.newStreamWriteBuilder().withCommitUser(commitUser); + write = writeBuilder.newWrite(); + commit = writeBuilder.newCommit(); + + // 3 snapshots + writeData(rowData(1L, BinaryString.fromString("Hi"))); + writeData(rowData(2L, BinaryString.fromString("Hello"))); + writeData(rowData(3L, BinaryString.fromString("Paimon"))); + + TagManager tagManager = new TagManager(table.fileIO(), table.location()); + callProcedure( + String.format("CALL sys.create_tag('%s.%s', 'tag2', 2)", database, tableName)); + assertThat(tagManager.tagExists("tag2")).isTrue(); + + BranchManager branchManager = table.branchManager(); + callProcedure( + String.format( + "CALL sys.create_branch('%s.%s', 'branch_name', 'tag2')", + database, tableName)); + assertThat(branchManager.branchExists("branch_name")).isTrue(); + + callProcedure( + String.format( + "CALL sys.replace_main_branch('%s.%s', 'branch_name')", + database, tableName)); + assertThat(branchManager.mainBranch()).isEqualTo("branch_name"); + + callProcedure(String.format("CALL sys.clean_main_branch('%s.%s')", database, tableName)); + assertThat(branchManager.mainBranch()).isEqualTo("main"); + } }