Skip to content

Commit 1ca62ef

Browse files
committed
merge branch
1 parent f1fa6d6 commit 1ca62ef

File tree

5 files changed

+37
-31
lines changed

5 files changed

+37
-31
lines changed

paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,17 @@ public void deleteBranch(String branchName) {
204204
wrapped.deleteBranch(branchName);
205205
}
206206

207+
/**
208+
* Merge a branch to main branch.
209+
*
210+
* @param branchName
211+
*/
212+
@Override
213+
public void mergeBranch(String branchName) {
214+
privilegeChecker.assertCanInsert(identifier);
215+
wrapped.mergeBranch(branchName);
216+
}
217+
207218
@Override
208219
public void replaceBranch(String fromBranch) {
209220
privilegeChecker.assertCanInsert(identifier);

paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import java.util.stream.Stream;
4646

4747
import static org.apache.paimon.utils.FileUtils.listVersionedDirectories;
48+
import static org.apache.paimon.utils.FileUtils.listVersionedFileStatus;
4849
import static org.apache.paimon.utils.Preconditions.checkArgument;
4950

5051
/** Manager for {@code Branch}. */
@@ -357,12 +358,13 @@ public boolean fileExists(Path path) {
357358

358359
public void mergeBranch(String branchName) {
359360
checkArgument(
360-
!branchName.equals(DEFAULT_MAIN_BRANCH),
361+
!isMainBranch(branchName),
361362
"Branch name '%s' do not use in merge branch.",
362363
branchName);
363364
checkArgument(branchExists(branchName), "Branch name '%s' doesn't exist.", branchName);
364-
Long earliestSnapshotId = snapshotManager.earliestSnapshotId(branchName);
365-
Snapshot earliestSnapshot = snapshotManager.snapshot(branchName, earliestSnapshotId);
365+
Long earliestSnapshotId = snapshotManager.copyWithBranch(branchName).earliestSnapshotId();
366+
Snapshot earliestSnapshot =
367+
snapshotManager.copyWithBranch(branchName).snapshot(earliestSnapshotId);
366368
long earliestSchemaId = earliestSnapshot.schemaId();
367369

368370
try {
@@ -404,21 +406,23 @@ public void mergeBranch(String branchName) {
404406

405407
// Delete latest snapshot
406408
snapshotManager.deleteLatestHint();
407-
408409
fileIO.deleteFilesQuietly(deletePaths);
410+
409411
fileIO.copyFilesUtf8(
410-
snapshotManager.branchSnapshotDirectory(branchName),
412+
snapshotManager.copyWithBranch(branchName).snapshotDirectory(),
411413
snapshotManager.snapshotDirectory());
412414
fileIO.copyFilesUtf8(
413-
schemaManager.branchSchemaDirectory(branchName),
415+
schemaManager.copyWithBranch(branchName).schemaDirectory(),
414416
schemaManager.schemaDirectory());
415417
fileIO.copyFilesUtf8(
416-
tagManager.branchTagDirectory(branchName), tagManager.tagDirectory());
418+
tagManager.copyWithBranch(branchName).tagDirectory(),
419+
tagManager.tagDirectory());
420+
417421
} catch (IOException e) {
418422
throw new RuntimeException(
419423
String.format(
420424
"Exception occurs when merge branch '%s' (directory in %s).",
421-
branchName, getBranchPath(tablePath, branchName)),
425+
branchName, getBranchPath(fileIO, tablePath, branchName)),
422426
e);
423427
}
424428
}

paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -608,11 +608,7 @@ private Long findByListFiles(BinaryOperator<Long> reducer, Path dir, String pref
608608
}
609609

610610
public void deleteLatestHint() throws IOException {
611-
deleteLatestHint(DEFAULT_MAIN_BRANCH);
612-
}
613-
614-
public void deleteLatestHint(String branchName) throws IOException {
615-
Path snapshotDir = snapshotDirByBranch(branchName);
611+
Path snapshotDir = snapshotDirectory();
616612
Path hintFile = new Path(snapshotDir, LATEST);
617613
fileIO.delete(hintFile, false);
618614
}

paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ public TagManager copyWithBranch(String branchName) {
8080
public Path tagDirectory() {
8181
return new Path(getBranchPath(fileIO, tablePath, branch) + "/tag");
8282
}
83-
83+
8484
/** Return the path of a tag. */
8585
public Path tagPath(String tagName) {
8686
return new Path(getBranchPath(fileIO, tablePath, branch) + "/tag/" + TAG_PREFIX + tagName);

paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java

Lines changed: 12 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1113,14 +1113,6 @@ public void testMergeBranch() throws Exception {
11131113

11141114
generateBranch(table);
11151115

1116-
// Verify branch1 and the main branch have the same data
1117-
assertThat(
1118-
getResult(
1119-
table.newRead(),
1120-
toSplits(table.newSnapshotReader("branch1").read().dataSplits()),
1121-
BATCH_ROW_TO_STRING))
1122-
.containsExactlyInAnyOrder("0|0|0|binary|varbinary|mapKey:mapVal|multiset");
1123-
11241116
// Test for unsupported branch name
11251117
assertThatThrownBy(() -> table.mergeBranch("test-branch"))
11261118
.satisfies(
@@ -1134,18 +1126,19 @@ public void testMergeBranch() throws Exception {
11341126
IllegalArgumentException.class,
11351127
"Branch name 'main' do not use in merge branch."));
11361128

1129+
FileStoreTable tableBranch = createFileStoreTable("branch1");
11371130
// Write data to branch1
1138-
try (StreamTableWrite write = table.newWrite(commitUser);
1139-
StreamTableCommit commit = table.newCommit(commitUser, "branch1")) {
1131+
try (StreamTableWrite write = tableBranch.newWrite(commitUser);
1132+
StreamTableCommit commit = tableBranch.newCommit(commitUser)) {
11401133
write.write(rowData(2, 20, 200L));
11411134
commit.commit(1, write.prepareCommit(false, 2));
11421135
}
11431136

11441137
// Validate data in branch1
11451138
assertThat(
11461139
getResult(
1147-
table.newRead(),
1148-
toSplits(table.newSnapshotReader("branch1").read().dataSplits()),
1140+
tableBranch.newRead(),
1141+
toSplits(tableBranch.newSnapshotReader().read().dataSplits()),
11491142
BATCH_ROW_TO_STRING))
11501143
.containsExactlyInAnyOrder(
11511144
"0|0|0|binary|varbinary|mapKey:mapVal|multiset",
@@ -1176,7 +1169,8 @@ public void testMergeBranch() throws Exception {
11761169
SnapshotManager snapshotManager = new SnapshotManager(new TraceableFileIO(), tablePath);
11771170
Snapshot branchSnapshot =
11781171
Snapshot.fromPath(
1179-
new TraceableFileIO(), snapshotManager.branchSnapshotPath("branch1", 2));
1172+
new TraceableFileIO(),
1173+
snapshotManager.copyWithBranch("branch1").snapshotPath(2));
11801174
Snapshot snapshot =
11811175
Snapshot.fromPath(new TraceableFileIO(), snapshotManager.snapshotPath(2));
11821176
assertThat(branchSnapshot.equals(snapshot)).isTrue();
@@ -1185,13 +1179,14 @@ public void testMergeBranch() throws Exception {
11851179
SchemaManager schemaManager = new SchemaManager(new TraceableFileIO(), tablePath);
11861180
TableSchema branchSchema =
11871181
SchemaManager.fromPath(
1188-
new TraceableFileIO(), schemaManager.branchSchemaPath("branch1", 0));
1182+
new TraceableFileIO(),
1183+
schemaManager.copyWithBranch("branch1").toSchemaPath(0));
11891184
TableSchema schema0 = schemaManager.schema(0);
11901185
assertThat(branchSchema.equals(schema0)).isTrue();
11911186

11921187
// Write two rows data to branch1 again
1193-
try (StreamTableWrite write = table.newWrite(commitUser);
1194-
StreamTableCommit commit = table.newCommit(commitUser, "branch1")) {
1188+
try (StreamTableWrite write = tableBranch.newWrite(commitUser);
1189+
StreamTableCommit commit = tableBranch.newCommit(commitUser)) {
11951190
write.write(rowData(3, 30, 300L));
11961191
write.write(rowData(4, 40, 400L));
11971192
commit.commit(2, write.prepareCommit(false, 3));
@@ -1201,7 +1196,7 @@ public void testMergeBranch() throws Exception {
12011196
assertThat(
12021197
getResult(
12031198
table.newRead(),
1204-
toSplits(table.newSnapshotReader("branch1").read().dataSplits()),
1199+
toSplits(tableBranch.newSnapshotReader().read().dataSplits()),
12051200
BATCH_ROW_TO_STRING))
12061201
.containsExactlyInAnyOrder(
12071202
"0|0|0|binary|varbinary|mapKey:mapVal|multiset",

0 commit comments

Comments
 (0)