From 5f12a00eb7998dfd616b0bce05333011b8e92e7d Mon Sep 17 00:00:00 2001
From: sunxiaojian <sunxiaojian926@163.com>
Date: Thu, 7 Mar 2024 16:41:44 +0800
Subject: [PATCH] Manage main branch for paimon

---
 .../privilege/PrivilegedFileStoreTable.java   | 23 +++++++++
 .../paimon/table/AbstractFileStoreTable.java  | 14 ++++++
 .../apache/paimon/table/ReadonlyTable.java    | 23 +++++++++
 .../java/org/apache/paimon/table/Table.java   | 10 ++++
 .../apache/paimon/utils/BranchManager.java    | 32 +++++++++++-
 .../procedure/CleanMainBranchProcedure.java   | 50 +++++++++++++++++++
 .../procedure/SetMainBranchProcedure.java     | 50 +++++++++++++++++++
 .../org.apache.paimon.factories.Factory       |  2 +
 .../flink/action/BranchActionITCase.java      | 47 +++++++++++++++++
 9 files changed, 249 insertions(+), 2 deletions(-)
 create mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CleanMainBranchProcedure.java
 create mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/SetMainBranchProcedure.java

diff --git a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java
index e4b09df3893b..635587e22e40 100644
--- a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java
@@ -204,6 +204,29 @@ public void deleteBranch(String branchName) {
         wrapped.deleteBranch(branchName);
     }
 
+    /**
+     * Replace main branch.
+     *
+     * @param branchName
+     */
+    @Override
+    public void replaceMainBranch(String branchName) {
+        privilegeChecker.assertCanInsert(identifier);
+        wrapped.replaceMainBranch(branchName);
+    }
+
+    @Override
+    public void cleanMainBranchFile() {
+        privilegeChecker.assertCanInsert(identifier);
+        wrapped.cleanMainBranchFile();
+    }
+
+    @Override
+    public void mainBranch() {
+        privilegeChecker.assertCanInsert(identifier);
+        wrapped.mainBranch();
+    }
+
     @Override
     public void replaceBranch(String fromBranch) {
         privilegeChecker.assertCanInsert(identifier);
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index 82cc47ad5a47..57b0e5588cc2 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -530,6 +530,20 @@ public void replaceBranch(String fromBranch) {
         branchManager().replaceBranch(fromBranch);
     }
 
+    public void cleanMainBranchFile() {
+        branchManager().cleanMainBranchFile();
+    }
+
+    @Override
+    public void replaceMainBranch(String branchName) {
+        branchManager().commitMainBranch(branchName);
+    }
+
+    @Override
+    public void mainBranch() {
+        branchManager().mainBranch();
+    }
+
     @Override
     public void rollbackTo(String tagName) {
         TagManager tagManager = tagManager();
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java b/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java
index dcb62dfcbea6..3ace8160e3ed 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java
@@ -190,6 +190,29 @@ default void replaceBranch(String fromBranch) {
                         this.getClass().getSimpleName()));
     }
 
+    default void mainBranch() {
+        throw new UnsupportedOperationException(
+                String.format(
+                        "Readonly Table %s does not support mainBranch.",
+                        this.getClass().getSimpleName()));
+    }
+
+    @Override
+    default void cleanMainBranchFile() {
+        throw new UnsupportedOperationException(
+                String.format(
+                        "Readonly Table %s does not support cleanMainBranchFile.",
+                        this.getClass().getSimpleName()));
+    }
+
+    @Override
+    default void replaceMainBranch(String branchName) {
+        throw new UnsupportedOperationException(
+                String.format(
+                        "Readonly Table %s does not support setMainBranch.",
+                        this.getClass().getSimpleName()));
+    }
+
     @Override
     default ExpireSnapshots newExpireSnapshots() {
         throw new UnsupportedOperationException(
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/Table.java b/paimon-core/src/main/java/org/apache/paimon/table/Table.java
index d01ecc95cdb2..16d50891c637 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/Table.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/Table.java
@@ -114,6 +114,16 @@ public interface Table extends Serializable {
     @Experimental
     void replaceBranch(String fromBranch);
 
+    /** Replace main branch. */
+    @Experimental
+    void replaceMainBranch(String branchName);
+
+    @Experimental
+    void cleanMainBranchFile();
+
+    @Experimental
+    void mainBranch();
+
     /** Manually expire snapshots, parameters can be controlled independently of table options. */
     @Experimental
     ExpireSnapshots newExpireSnapshots();
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java
index 9742d63ac57d..c665fe9d67a6 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java
@@ -74,9 +74,13 @@ public BranchManager(
     }
 
     /** Commit specify branch to main. */
-    public void commitMainBranch(String branchName) throws IOException {
+    public void commitMainBranch(String branchName) {
         Path mainBranchFile = new Path(tablePath, MAIN_BRANCH_FILE);
-        fileIO.overwriteFileUtf8(mainBranchFile, branchName);
+        try {
+            fileIO.overwriteFileUtf8(mainBranchFile, branchName);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
     }
 
     /** Return the root Directory of branch. */
@@ -125,11 +129,35 @@ public String defaultMainBranch() {
         }
     }
 
+    /** Get main branch. */
+    public String mainBranch() {
+        Path path = new Path(tablePath, MAIN_BRANCH_FILE);
+        try {
+            if (fileIO.exists(path)) {
+                return fileIO.readFileUtf8(path);
+            } else {
+                return DEFAULT_MAIN_BRANCH;
+            }
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
     /** Return the path of a branch. */
     public Path branchPath(String branchName) {
         return new Path(getBranchPath(fileIO, tablePath, branchName));
     }
 
+    /** Clean the main branch file and use default. */
+    public void cleanMainBranchFile() {
+        Path mainBranchFile = new Path(tablePath, MAIN_BRANCH_FILE);
+        try {
+            fileIO.delete(mainBranchFile, false);
+        } catch (IOException e) {
+            throw new RuntimeException("Exception occurs when clean main branch file.", e);
+        }
+    }
+
     /** Create empty branch. */
     public void createBranch(String branchName) {
         checkArgument(
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CleanMainBranchProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CleanMainBranchProcedure.java
new file mode 100644
index 000000000000..bae7820d1012
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CleanMainBranchProcedure.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.table.Table;
+
+import org.apache.flink.table.procedure.ProcedureContext;
+
+/**
+ * Clean main branch procedure. Usage:
+ *
+ * <pre><code>
+ *  CALL sys.clean_main_branch('tableId')
+ * </code></pre>
+ */
+public class CleanMainBranchProcedure extends ProcedureBase {
+
+    public static final String IDENTIFIER = "clean_main_branch";
+
+    @Override
+    public String identifier() {
+        return IDENTIFIER;
+    }
+
+    public String[] call(ProcedureContext procedureContext, String tableId)
+            throws Catalog.TableNotExistException {
+        Table table = catalog.getTable(Identifier.fromString(tableId));
+        table.cleanMainBranchFile();
+
+        return new String[] {"Success"};
+    }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/SetMainBranchProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/SetMainBranchProcedure.java
new file mode 100644
index 000000000000..ac8dc3b48da8
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/SetMainBranchProcedure.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.table.Table;
+
+import org.apache.flink.table.procedure.ProcedureContext;
+
+/**
+ * Replace main branch procedure. Usage:
+ *
+ * <pre><code>
+ *  CALL sys.replace_main_branch('tableId', 'branchName')
+ * </code></pre>
+ */
+public class SetMainBranchProcedure extends ProcedureBase {
+
+    public static final String IDENTIFIER = "set_main_branch";
+
+    @Override
+    public String identifier() {
+        return IDENTIFIER;
+    }
+
+    public String[] call(ProcedureContext procedureContext, String tableId, String branchName)
+            throws Catalog.TableNotExistException {
+        Table table = catalog.getTable(Identifier.fromString(tableId));
+        table.replaceMainBranch(branchName);
+
+        return new String[] {"Success"};
+    }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
index 33a43009d69b..732be3d432ef 100644
--- a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
+++ b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
@@ -52,3 +52,5 @@ org.apache.paimon.flink.procedure.privilege.GrantPrivilegeToUserProcedure
 org.apache.paimon.flink.procedure.privilege.RevokePrivilegeFromUserProcedure
 org.apache.paimon.flink.procedure.RepairProcedure
 org.apache.paimon.flink.procedure.ReplaceBranchProcedure
+org.apache.paimon.flink.procedure.SetMainBranchProcedure
+org.apache.paimon.flink.procedure.CleanMainBranchProcedure
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java
index 209b0d2e7bda..3ada0a20039d 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java
@@ -255,4 +255,51 @@ void testReplaceBranch() throws Exception {
                 String.format("CALL sys.create_tag('%s.%s', 'tag3', 3)", database, tableName));
         assertThat(tagManager.tagExists("tag3")).isTrue();
     }
+
+    @Test
+    void testReplaceMainBranchAndCleanMainBranch() throws Exception {
+
+        init(warehouse);
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {DataTypes.BIGINT(), DataTypes.STRING()},
+                        new String[] {"k", "v"});
+        FileStoreTable table =
+                createFileStoreTable(
+                        rowType,
+                        Collections.emptyList(),
+                        Collections.singletonList("k"),
+                        Collections.emptyList(),
+                        Collections.emptyMap());
+
+        StreamWriteBuilder writeBuilder = table.newStreamWriteBuilder().withCommitUser(commitUser);
+        write = writeBuilder.newWrite();
+        commit = writeBuilder.newCommit();
+
+        // 3 snapshots
+        writeData(rowData(1L, BinaryString.fromString("Hi")));
+        writeData(rowData(2L, BinaryString.fromString("Hello")));
+        writeData(rowData(3L, BinaryString.fromString("Paimon")));
+
+        // Create tag2
+        TagManager tagManager = new TagManager(table.fileIO(), table.location());
+        callProcedure(
+                String.format("CALL sys.create_tag('%s.%s', 'tag2', 2)", database, tableName));
+        assertThat(tagManager.tagExists("tag2")).isTrue();
+
+        BranchManager branchManager = table.branchManager();
+        callProcedure(
+                String.format(
+                        "CALL sys.create_branch('%s.%s', 'branch_name', 'tag2')",
+                        database, tableName));
+        assertThat(branchManager.branchExists("branch_name")).isTrue();
+
+        callProcedure(
+                String.format(
+                        "CALL sys.set_main_branch('%s.%s', 'branch_name')", database, tableName));
+        assertThat(branchManager.mainBranch()).isEqualTo("branch_name");
+
+        callProcedure(String.format("CALL sys.clean_main_branch('%s.%s')", database, tableName));
+        assertThat(branchManager.mainBranch()).isEqualTo("main");
+    }
 }