Skip to content

Commit 8e4cb76

Browse files
committed
Manage main branch for paimon
1 parent 4b86018 commit 8e4cb76

File tree

12 files changed

+290
-29
lines changed

12 files changed

+290
-29
lines changed

paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -97,10 +97,7 @@ public Optional<TableSchema> latest() {
9797
}
9898

9999
public Optional<TableSchema> latest(String branchName) {
100-
Path directoryPath =
101-
branchName.equals(DEFAULT_MAIN_BRANCH)
102-
? schemaDirectory()
103-
: branchSchemaDirectory(branchName);
100+
Path directoryPath = schemaDirectory(branchName);
104101
try {
105102
return listVersionedFiles(fileIO, directoryPath, SCHEMA_PREFIX)
106103
.reduce(Math::max)
@@ -498,21 +495,24 @@ public static TableSchema fromPath(FileIO fileIO, Path path) {
498495
}
499496

500497
private Path schemaDirectory() {
501-
return new Path(tableRoot + "/schema");
498+
return schemaDirectory(DEFAULT_MAIN_BRANCH);
502499
}
503500

504501
@VisibleForTesting
505502
public Path toSchemaPath(long id) {
506-
return new Path(tableRoot + "/schema/" + SCHEMA_PREFIX + id);
503+
return toSchemaPath(DEFAULT_MAIN_BRANCH, id);
507504
}
508505

509-
public Path branchSchemaDirectory(String branchName) {
510-
return new Path(getBranchPath(tableRoot, branchName) + "/schema");
506+
public Path schemaDirectory(String branchName) {
507+
return new Path(getBranchPath(fileIO, tableRoot, branchName) + "/schema");
511508
}
512509

513-
public Path branchSchemaPath(String branchName, long schemaId) {
510+
public Path toSchemaPath(String branchName, long schemaId) {
514511
return new Path(
515-
getBranchPath(tableRoot, branchName) + "/schema/" + SCHEMA_PREFIX + schemaId);
512+
getBranchPath(fileIO, tableRoot, branchName)
513+
+ "/schema/"
514+
+ SCHEMA_PREFIX
515+
+ schemaId);
516516
}
517517

518518
/**

paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -544,6 +544,21 @@ public void deleteBranch(String branchName) {
544544
branchManager().deleteBranch(branchName);
545545
}
546546

547+
@Override
548+
public void cleanMainBranchFile() {
549+
branchManager().cleanMainBranchFile();
550+
}
551+
552+
@Override
553+
public void replaceMainBranch(String branchName) {
554+
branchManager().commitMainBranch(branchName);
555+
}
556+
557+
@Override
558+
public void mainBranch() {
559+
branchManager().mainBranch();
560+
}
561+
547562
@Override
548563
public void rollbackTo(String tagName) {
549564
TagManager tagManager = tagManager();

paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,30 @@ default void deleteBranch(String branchName) {
182182
this.getClass().getSimpleName()));
183183
}
184184

185+
@Override
186+
default void mainBranch() {
187+
throw new UnsupportedOperationException(
188+
String.format(
189+
"Readonly Table %s does not support mainBranch.",
190+
this.getClass().getSimpleName()));
191+
}
192+
193+
@Override
194+
default void cleanMainBranchFile() {
195+
throw new UnsupportedOperationException(
196+
String.format(
197+
"Readonly Table %s does not support cleanMainBranchFile.",
198+
this.getClass().getSimpleName()));
199+
}
200+
201+
@Override
202+
default void replaceMainBranch(String branchName) {
203+
throw new UnsupportedOperationException(
204+
String.format(
205+
"Readonly Table %s does not support setMainBranch.",
206+
this.getClass().getSimpleName()));
207+
}
208+
185209
@Override
186210
default ExpireSnapshots newExpireSnapshots() {
187211
throw new UnsupportedOperationException(

paimon-core/src/main/java/org/apache/paimon/table/Table.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,14 @@ public interface Table extends Serializable {
111111
@Experimental
112112
void deleteBranch(String branchName);
113113

114+
/** Replace main branch. */
115+
@Experimental
116+
void replaceMainBranch(String branchName);
117+
118+
void cleanMainBranchFile();
119+
120+
void mainBranch();
121+
114122
/** Manually expire snapshots, parameters can be controlled independently of table options. */
115123
@Experimental
116124
ExpireSnapshots newExpireSnapshots();

paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java

Lines changed: 68 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ public class BranchManager {
4949

5050
public static final String BRANCH_PREFIX = "branch-";
5151
public static final String DEFAULT_MAIN_BRANCH = "main";
52+
public static final String MAIN_BRANCH_FILE = "MAIN-BRANCH";
5253

5354
private final FileIO fileIO;
5455
private final Path tablePath;
@@ -75,13 +76,69 @@ public Path branchDirectory() {
7576
}
7677

7778
/** Return the path string of a branch. */
78-
public static String getBranchPath(Path tablePath, String branchName) {
79+
public static String getBranchPath(FileIO fileIO, Path tablePath, String branchName) {
80+
if (branchName.equals(DEFAULT_MAIN_BRANCH)) {
81+
Path path = new Path(tablePath, MAIN_BRANCH_FILE);
82+
try {
83+
if (fileIO.exists(path)) {
84+
String data = fileIO.readFileUtf8(path);
85+
if (StringUtils.isBlank(data)) {
86+
return tablePath.toString();
87+
} else {
88+
return tablePath.toString() + "/branch/" + BRANCH_PREFIX + data;
89+
}
90+
} else {
91+
return tablePath.toString();
92+
}
93+
} catch (IOException e) {
94+
throw new RuntimeException(e);
95+
}
96+
}
7997
return tablePath.toString() + "/branch/" + BRANCH_PREFIX + branchName;
8098
}
8199

100+
/** Get main branch. */
101+
public String mainBranch() {
102+
Path path = new Path(tablePath, MAIN_BRANCH_FILE);
103+
try {
104+
if (fileIO.exists(path)) {
105+
return fileIO.readFileUtf8(path);
106+
} else {
107+
return DEFAULT_MAIN_BRANCH;
108+
}
109+
} catch (IOException e) {
110+
throw new RuntimeException(e);
111+
}
112+
}
113+
82114
/** Return the path of a branch. */
83115
public Path branchPath(String branchName) {
84-
return new Path(getBranchPath(tablePath, branchName));
116+
return new Path(getBranchPath(fileIO, tablePath, branchName));
117+
}
118+
119+
/** Replace main by specify branch. */
120+
public void commitMainBranch(String branchName) {
121+
Path mainBranchFile = new Path(tablePath, MAIN_BRANCH_FILE);
122+
try {
123+
fileIO.delete(mainBranchFile, false);
124+
fileIO.overwriteFileUtf8(mainBranchFile, branchName);
125+
} catch (IOException e) {
126+
throw new RuntimeException(
127+
String.format(
128+
"Exception occurs when set main branch '%s' (directory in %s).",
129+
branchName, tablePath.toString()),
130+
e);
131+
}
132+
}
133+
134+
/** Clean the main branch file and use default. */
135+
public void cleanMainBranchFile() {
136+
Path mainBranchFile = new Path(tablePath, MAIN_BRANCH_FILE);
137+
try {
138+
fileIO.delete(mainBranchFile, false);
139+
} catch (IOException e) {
140+
throw new RuntimeException("Exception occurs when clean main branch file.", e);
141+
}
85142
}
86143

87144
/** Create empty branch. */
@@ -101,12 +158,12 @@ public void createBranch(String branchName) {
101158
TableSchema latestSchema = schemaManager.latest().get();
102159
fileIO.copyFileUtf8(
103160
schemaManager.toSchemaPath(latestSchema.id()),
104-
schemaManager.branchSchemaPath(branchName, latestSchema.id()));
161+
schemaManager.toSchemaPath(branchName, latestSchema.id()));
105162
} catch (IOException e) {
106163
throw new RuntimeException(
107164
String.format(
108165
"Exception occurs when create branch '%s' (directory in %s).",
109-
branchName, getBranchPath(tablePath, branchName)),
166+
branchName, getBranchPath(fileIO, tablePath, branchName)),
110167
e);
111168
}
112169
}
@@ -133,12 +190,12 @@ public void createBranch(String branchName, long snapshotId) {
133190
snapshotManager.branchSnapshotPath(branchName, snapshot.id()));
134191
fileIO.copyFileUtf8(
135192
schemaManager.toSchemaPath(snapshot.schemaId()),
136-
schemaManager.branchSchemaPath(branchName, snapshot.schemaId()));
193+
schemaManager.toSchemaPath(branchName, snapshot.schemaId()));
137194
} catch (IOException e) {
138195
throw new RuntimeException(
139196
String.format(
140197
"Exception occurs when create branch '%s' (directory in %s).",
141-
branchName, getBranchPath(tablePath, branchName)),
198+
branchName, getBranchPath(fileIO, tablePath, branchName)),
142199
e);
143200
}
144201
}
@@ -162,18 +219,18 @@ public void createBranch(String branchName, String tagName) {
162219
try {
163220
// Copy the corresponding tag, snapshot and schema files into the branch directory
164221
fileIO.copyFileUtf8(
165-
tagManager.tagPath(tagName), tagManager.branchTagPath(branchName, tagName));
222+
tagManager.tagPath(tagName), tagManager.tagPath(branchName, tagName));
166223
fileIO.copyFileUtf8(
167224
snapshotManager.snapshotPath(snapshot.id()),
168225
snapshotManager.branchSnapshotPath(branchName, snapshot.id()));
169226
fileIO.copyFileUtf8(
170227
schemaManager.toSchemaPath(snapshot.schemaId()),
171-
schemaManager.branchSchemaPath(branchName, snapshot.schemaId()));
228+
schemaManager.toSchemaPath(branchName, snapshot.schemaId()));
172229
} catch (IOException e) {
173230
throw new RuntimeException(
174231
String.format(
175232
"Exception occurs when create branch '%s' (directory in %s).",
176-
branchName, getBranchPath(tablePath, branchName)),
233+
branchName, getBranchPath(fileIO, tablePath, branchName)),
177234
e);
178235
}
179236
}
@@ -187,7 +244,7 @@ public void deleteBranch(String branchName) {
187244
LOG.info(
188245
String.format(
189246
"Deleting the branch failed due to an exception in deleting the directory %s. Please try again.",
190-
getBranchPath(tablePath, branchName)),
247+
getBranchPath(fileIO, tablePath, branchName)),
191248
e);
192249
}
193250
}
@@ -239,8 +296,7 @@ public List<TableBranch> branches() {
239296
}
240297
FileStoreTable branchTable =
241298
FileStoreTableFactory.create(
242-
fileIO, new Path(getBranchPath(tablePath, branchName)));
243-
299+
fileIO, new Path(getBranchPath(fileIO, tablePath, branchName)));
244300
SortedMap<Snapshot, List<String>> snapshotTags = branchTable.tagManager().tags();
245301
Long earliestSnapshotId = branchTable.snapshotManager().earliestSnapshotId();
246302
if (snapshotTags.isEmpty()) {

paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -94,12 +94,15 @@ public Path snapshotPath(long snapshotId) {
9494
}
9595

9696
public Path branchSnapshotDirectory(String branchName) {
97-
return new Path(getBranchPath(tablePath, branchName) + "/snapshot");
97+
return new Path(getBranchPath(fileIO, tablePath, branchName) + "/snapshot");
9898
}
9999

100100
public Path branchSnapshotPath(String branchName, long snapshotId) {
101101
return new Path(
102-
getBranchPath(tablePath, branchName) + "/snapshot/" + SNAPSHOT_PREFIX + snapshotId);
102+
getBranchPath(fileIO, tablePath, branchName)
103+
+ "/snapshot/"
104+
+ SNAPSHOT_PREFIX
105+
+ snapshotId);
103106
}
104107

105108
public Path snapshotPathByBranch(String branchName, long snapshotId) {

paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import java.util.function.Predicate;
4646
import java.util.stream.Collectors;
4747

48+
import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;
4849
import static org.apache.paimon.utils.BranchManager.getBranchPath;
4950
import static org.apache.paimon.utils.FileUtils.listVersionedFileStatus;
5051
import static org.apache.paimon.utils.Preconditions.checkArgument;
@@ -66,7 +67,11 @@ public TagManager(FileIO fileIO, Path tablePath) {
6667

6768
/** Return the root Directory of tags. */
6869
public Path tagDirectory() {
69-
return new Path(tablePath + "/tag");
70+
return tagDirectory(DEFAULT_MAIN_BRANCH);
71+
}
72+
73+
public Path tagDirectory(String branchName) {
74+
return new Path(getBranchPath(fileIO, tablePath, branchName) + "/tag");
7075
}
7176

7277
/** Return the path of a tag. */
@@ -75,8 +80,9 @@ public Path tagPath(String tagName) {
7580
}
7681

7782
/** Return the path of a tag in branch. */
78-
public Path branchTagPath(String branchName, String tagName) {
79-
return new Path(getBranchPath(tablePath, branchName) + "/tag/" + TAG_PREFIX + tagName);
83+
public Path tagPath(String branchName, String tagName) {
84+
return new Path(
85+
getBranchPath(fileIO, tablePath, branchName) + "/tag/" + TAG_PREFIX + tagName);
8086
}
8187

8288
/** Create a tag from given snapshot and save it in the storage. */

paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1019,7 +1019,7 @@ public void testCreateBranch() throws Exception {
10191019
// verify test-tag in test-branch is equal to snapshot 2
10201020
Snapshot branchTag =
10211021
Snapshot.fromPath(
1022-
new TraceableFileIO(), tagManager.branchTagPath("test-branch", "test-tag"));
1022+
new TraceableFileIO(), tagManager.tagPath("test-branch", "test-tag"));
10231023
assertThat(branchTag.equals(snapshot2)).isTrue();
10241024

10251025
// verify snapshot in test-branch is equal to snapshot 2
@@ -1034,7 +1034,7 @@ public void testCreateBranch() throws Exception {
10341034
SchemaManager schemaManager = new SchemaManager(new TraceableFileIO(), tablePath);
10351035
TableSchema branchSchema =
10361036
SchemaManager.fromPath(
1037-
new TraceableFileIO(), schemaManager.branchSchemaPath("test-branch", 0));
1037+
new TraceableFileIO(), schemaManager.toSchemaPath("test-branch", 0));
10381038
TableSchema schema0 = schemaManager.schema(0);
10391039
assertThat(branchSchema.equals(schema0)).isTrue();
10401040
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.flink.procedure;
20+
21+
import org.apache.paimon.catalog.Catalog;
22+
import org.apache.paimon.catalog.Identifier;
23+
import org.apache.paimon.table.Table;
24+
25+
import org.apache.flink.table.procedure.ProcedureContext;
26+
27+
/**
28+
* Clean main branch procedure. Usage:
29+
*
30+
* <pre><code>
31+
* CALL sys.clean_main_branch('tableId')
32+
* </code></pre>
33+
*/
34+
public class CleanMainBranchProcedure extends ProcedureBase {
35+
36+
public static final String IDENTIFIER = "clean_main_branch";
37+
38+
@Override
39+
public String identifier() {
40+
return IDENTIFIER;
41+
}
42+
43+
public String[] call(ProcedureContext procedureContext, String tableId)
44+
throws Catalog.TableNotExistException {
45+
Table table = catalog.getTable(Identifier.fromString(tableId));
46+
table.cleanMainBranchFile();
47+
48+
return new String[] {"Success"};
49+
}
50+
}

0 commit comments

Comments
 (0)