Skip to content

Commit

Permalink
Manage main branch for paimon
Browse files Browse the repository at this point in the history
  • Loading branch information
sunxiaojian committed Apr 24, 2024
1 parent 9f9e46a commit 5f880e3
Showing 12 changed files with 291 additions and 30 deletions.
Original file line number Diff line number Diff line change
@@ -97,10 +97,7 @@ public Optional<TableSchema> latest() {
}

public Optional<TableSchema> latest(String branchName) {
Path directoryPath =
branchName.equals(DEFAULT_MAIN_BRANCH)
? schemaDirectory()
: branchSchemaDirectory(branchName);
Path directoryPath = schemaDirectory(branchName);
try {
return listVersionedFiles(fileIO, directoryPath, SCHEMA_PREFIX)
.reduce(Math::max)
@@ -498,21 +495,24 @@ public static TableSchema fromPath(FileIO fileIO, Path path) {
}

private Path schemaDirectory() {
return new Path(tableRoot + "/schema");
return schemaDirectory(DEFAULT_MAIN_BRANCH);
}

@VisibleForTesting
public Path toSchemaPath(long id) {
return new Path(tableRoot + "/schema/" + SCHEMA_PREFIX + id);
return toSchemaPath(DEFAULT_MAIN_BRANCH, id);
}

public Path branchSchemaDirectory(String branchName) {
return new Path(getBranchPath(tableRoot, branchName) + "/schema");
public Path schemaDirectory(String branchName) {
return new Path(getBranchPath(fileIO, tableRoot, branchName) + "/schema");
}

public Path branchSchemaPath(String branchName, long schemaId) {
public Path toSchemaPath(String branchName, long schemaId) {
return new Path(
getBranchPath(tableRoot, branchName) + "/schema/" + SCHEMA_PREFIX + schemaId);
getBranchPath(fileIO, tableRoot, branchName)
+ "/schema/"
+ SCHEMA_PREFIX
+ schemaId);
}

/**
Original file line number Diff line number Diff line change
@@ -544,6 +544,21 @@ public void deleteBranch(String branchName) {
branchManager().deleteBranch(branchName);
}

@Override
public void cleanMainBranchFile() {
branchManager().cleanMainBranchFile();
}

@Override
public void replaceMainBranch(String branchName) {
branchManager().commitMainBranch(branchName);
}

@Override
public void mainBranch() {
branchManager().mainBranch();
}

@Override
public void rollbackTo(String tagName) {
TagManager tagManager = tagManager();
Original file line number Diff line number Diff line change
@@ -182,6 +182,30 @@ default void deleteBranch(String branchName) {
this.getClass().getSimpleName()));
}

@Override
default void mainBranch() {
throw new UnsupportedOperationException(
String.format(
"Readonly Table %s does not support mainBranch.",
this.getClass().getSimpleName()));
}

@Override
default void cleanMainBranchFile() {
throw new UnsupportedOperationException(
String.format(
"Readonly Table %s does not support cleanMainBranchFile.",
this.getClass().getSimpleName()));
}

@Override
default void replaceMainBranch(String branchName) {
throw new UnsupportedOperationException(
String.format(
"Readonly Table %s does not support setMainBranch.",
this.getClass().getSimpleName()));
}

@Override
default ExpireSnapshots newExpireSnapshots() {
throw new UnsupportedOperationException(
8 changes: 8 additions & 0 deletions paimon-core/src/main/java/org/apache/paimon/table/Table.java
Original file line number Diff line number Diff line change
@@ -111,6 +111,14 @@ public interface Table extends Serializable {
@Experimental
void deleteBranch(String branchName);

/** Replace main branch. */
@Experimental
void replaceMainBranch(String branchName);

void cleanMainBranchFile();

void mainBranch();

/** Manually expire snapshots, parameters can be controlled independently of table options. */
@Experimental
ExpireSnapshots newExpireSnapshots();
Original file line number Diff line number Diff line change
@@ -49,6 +49,7 @@ public class BranchManager {

public static final String BRANCH_PREFIX = "branch-";
public static final String DEFAULT_MAIN_BRANCH = "main";
public static final String MAIN_BRANCH_FILE = "MAIN-BRANCH";

private final FileIO fileIO;
private final Path tablePath;
@@ -75,13 +76,69 @@ public Path branchDirectory() {
}

/** Return the path string of a branch. */
public static String getBranchPath(Path tablePath, String branchName) {
public static String getBranchPath(FileIO fileIO, Path tablePath, String branchName) {
if (branchName.equals(DEFAULT_MAIN_BRANCH)) {
Path path = new Path(tablePath, MAIN_BRANCH_FILE);
try {
if (fileIO.exists(path)) {
String data = fileIO.readFileUtf8(path);
if (StringUtils.isBlank(data)) {
return tablePath.toString();
} else {
return tablePath.toString() + "/branch/" + BRANCH_PREFIX + data;
}
} else {
return tablePath.toString();
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}
return tablePath.toString() + "/branch/" + BRANCH_PREFIX + branchName;
}

/** Get main branch. */
public String mainBranch() {
Path path = new Path(tablePath, MAIN_BRANCH_FILE);
try {
if (fileIO.exists(path)) {
return fileIO.readFileUtf8(path);
} else {
return DEFAULT_MAIN_BRANCH;
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}

/** Return the path of a branch. */
public Path branchPath(String branchName) {
return new Path(getBranchPath(tablePath, branchName));
return new Path(getBranchPath(fileIO, tablePath, branchName));
}

/** Replace main by specify branch. */
public void commitMainBranch(String branchName) {
Path mainBranchFile = new Path(tablePath, MAIN_BRANCH_FILE);
try {
fileIO.delete(mainBranchFile, false);
fileIO.overwriteFileUtf8(mainBranchFile, branchName);
} catch (IOException e) {
throw new RuntimeException(
String.format(
"Exception occurs when set main branch '%s' (directory in %s).",
branchName, tablePath.toString()),
e);
}
}

/** Clean the main branch file and use default. */
public void cleanMainBranchFile() {
Path mainBranchFile = new Path(tablePath, MAIN_BRANCH_FILE);
try {
fileIO.delete(mainBranchFile, false);
} catch (IOException e) {
throw new RuntimeException("Exception occurs when clean main branch file.", e);
}
}

/** Create empty branch. */
@@ -101,12 +158,12 @@ public void createBranch(String branchName) {
TableSchema latestSchema = schemaManager.latest().get();
fileIO.copyFileUtf8(
schemaManager.toSchemaPath(latestSchema.id()),
schemaManager.branchSchemaPath(branchName, latestSchema.id()));
schemaManager.toSchemaPath(branchName, latestSchema.id()));
} catch (IOException e) {
throw new RuntimeException(
String.format(
"Exception occurs when create branch '%s' (directory in %s).",
branchName, getBranchPath(tablePath, branchName)),
branchName, getBranchPath(fileIO, tablePath, branchName)),
e);
}
}
@@ -133,12 +190,12 @@ public void createBranch(String branchName, long snapshotId) {
snapshotManager.branchSnapshotPath(branchName, snapshot.id()));
fileIO.copyFileUtf8(
schemaManager.toSchemaPath(snapshot.schemaId()),
schemaManager.branchSchemaPath(branchName, snapshot.schemaId()));
schemaManager.toSchemaPath(branchName, snapshot.schemaId()));
} catch (IOException e) {
throw new RuntimeException(
String.format(
"Exception occurs when create branch '%s' (directory in %s).",
branchName, getBranchPath(tablePath, branchName)),
branchName, getBranchPath(fileIO, tablePath, branchName)),
e);
}
}
@@ -162,18 +219,18 @@ public void createBranch(String branchName, String tagName) {
try {
// Copy the corresponding tag, snapshot and schema files into the branch directory
fileIO.copyFileUtf8(
tagManager.tagPath(tagName), tagManager.branchTagPath(branchName, tagName));
tagManager.tagPath(tagName), tagManager.tagPath(branchName, tagName));
fileIO.copyFileUtf8(
snapshotManager.snapshotPath(snapshot.id()),
snapshotManager.branchSnapshotPath(branchName, snapshot.id()));
fileIO.copyFileUtf8(
schemaManager.toSchemaPath(snapshot.schemaId()),
schemaManager.branchSchemaPath(branchName, snapshot.schemaId()));
schemaManager.toSchemaPath(branchName, snapshot.schemaId()));
} catch (IOException e) {
throw new RuntimeException(
String.format(
"Exception occurs when create branch '%s' (directory in %s).",
branchName, getBranchPath(tablePath, branchName)),
branchName, getBranchPath(fileIO, tablePath, branchName)),
e);
}
}
@@ -187,7 +244,7 @@ public void deleteBranch(String branchName) {
LOG.info(
String.format(
"Deleting the branch failed due to an exception in deleting the directory %s. Please try again.",
getBranchPath(tablePath, branchName)),
getBranchPath(fileIO, tablePath, branchName)),
e);
}
}
@@ -239,8 +296,7 @@ public List<TableBranch> branches() {
}
FileStoreTable branchTable =
FileStoreTableFactory.create(
fileIO, new Path(getBranchPath(tablePath, branchName)));

fileIO, new Path(getBranchPath(fileIO, tablePath, branchName)));
SortedMap<Snapshot, List<String>> snapshotTags = branchTable.tagManager().tags();
Long earliestSnapshotId = branchTable.snapshotManager().earliestSnapshotId();
if (snapshotTags.isEmpty()) {
Original file line number Diff line number Diff line change
@@ -94,12 +94,15 @@ public Path snapshotPath(long snapshotId) {
}

public Path branchSnapshotDirectory(String branchName) {
return new Path(getBranchPath(tablePath, branchName) + "/snapshot");
return new Path(getBranchPath(fileIO, tablePath, branchName) + "/snapshot");
}

public Path branchSnapshotPath(String branchName, long snapshotId) {
return new Path(
getBranchPath(tablePath, branchName) + "/snapshot/" + SNAPSHOT_PREFIX + snapshotId);
getBranchPath(fileIO, tablePath, branchName)
+ "/snapshot/"
+ SNAPSHOT_PREFIX
+ snapshotId);
}

public Path snapshotPathByBranch(String branchName, long snapshotId) {
Original file line number Diff line number Diff line change
@@ -45,6 +45,7 @@
import java.util.function.Predicate;
import java.util.stream.Collectors;

import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;
import static org.apache.paimon.utils.BranchManager.getBranchPath;
import static org.apache.paimon.utils.FileUtils.listVersionedFileStatus;
import static org.apache.paimon.utils.Preconditions.checkArgument;
@@ -66,7 +67,11 @@ public TagManager(FileIO fileIO, Path tablePath) {

/** Return the root Directory of tags. */
public Path tagDirectory() {
return new Path(tablePath + "/tag");
return tagDirectory(DEFAULT_MAIN_BRANCH);
}

public Path tagDirectory(String branchName) {
return new Path(getBranchPath(fileIO, tablePath, branchName) + "/tag");
}

/** Return the path of a tag. */
@@ -75,8 +80,9 @@ public Path tagPath(String tagName) {
}

/** Return the path of a tag in branch. */
public Path branchTagPath(String branchName, String tagName) {
return new Path(getBranchPath(tablePath, branchName) + "/tag/" + TAG_PREFIX + tagName);
public Path tagPath(String branchName, String tagName) {
return new Path(
getBranchPath(fileIO, tablePath, branchName) + "/tag/" + TAG_PREFIX + tagName);
}

/** Create a tag from given snapshot and save it in the storage. */
Original file line number Diff line number Diff line change
@@ -1019,7 +1019,7 @@ public void testCreateBranch() throws Exception {
// verify test-tag in test-branch is equal to snapshot 2
Snapshot branchTag =
Snapshot.fromPath(
new TraceableFileIO(), tagManager.branchTagPath("test-branch", "test-tag"));
new TraceableFileIO(), tagManager.tagPath("test-branch", "test-tag"));
assertThat(branchTag.equals(snapshot2)).isTrue();

// verify snapshot in test-branch is equal to snapshot 2
@@ -1034,7 +1034,7 @@ public void testCreateBranch() throws Exception {
SchemaManager schemaManager = new SchemaManager(new TraceableFileIO(), tablePath);
TableSchema branchSchema =
SchemaManager.fromPath(
new TraceableFileIO(), schemaManager.branchSchemaPath("test-branch", 0));
new TraceableFileIO(), schemaManager.toSchemaPath("test-branch", 0));
TableSchema schema0 = schemaManager.schema(0);
assertThat(branchSchema.equals(schema0)).isTrue();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.procedure;

import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.table.Table;

import org.apache.flink.table.procedure.ProcedureContext;

/**
* Clean main branch procedure. Usage:
*
* <pre><code>
* CALL sys.clean_main_branch('tableId')
* </code></pre>
*/
public class CleanMainBranchProcedure extends ProcedureBase {

public static final String IDENTIFIER = "clean_main_branch";

@Override
public String identifier() {
return IDENTIFIER;
}

public String[] call(ProcedureContext procedureContext, String tableId)
throws Catalog.TableNotExistException {
Table table = catalog.getTable(Identifier.fromString(tableId));
table.cleanMainBranchFile();

return new String[] {"Success"};
}
}
Loading

0 comments on commit 5f880e3

Please sign in to comment.