Skip to content

Commit

Permalink
[Feature] Support create a empty branch and create a branch based on …
Browse files Browse the repository at this point in the history
…snapshotId (apache#2938)

* support create empty branch and create a branch based on snapshotId
  • Loading branch information
sunxiaojian authored Apr 24, 2024
1 parent 9969b37 commit 4b878fe
Show file tree
Hide file tree
Showing 8 changed files with 235 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,20 @@ public class TableBranch {

private final long createTime;

public TableBranch(String branchName, Long createdFromSnapshot, long createTime) {
this.branchName = branchName;
this.createdFromTag = null;
this.createdFromSnapshot = createdFromSnapshot;
this.createTime = createTime;
}

public TableBranch(String branchName, long createTime) {
this.branchName = branchName;
this.createdFromTag = null;
this.createdFromSnapshot = null;
this.createTime = createTime;
}

public TableBranch(
String branchName, String createdFromTag, Long createdFromSnapshot, long createTime) {
this.branchName = branchName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -524,6 +524,16 @@ public void deleteTag(String tagName) {
store().createTagCallbacks());
}

@Override
public void createBranch(String branchName) {
branchManager().createBranch(branchName);
}

@Override
public void createBranch(String branchName, long snapshotId) {
branchManager().createBranch(branchName, snapshotId);
}

@Override
public void createBranch(String branchName, String tagName) {
branchManager().createBranch(branchName, tagName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,22 @@ default void rollbackTo(String tagName) {
this.getClass().getSimpleName()));
}

@Override
default void createBranch(String branchName) {
throw new UnsupportedOperationException(
String.format(
"Readonly Table %s does not support create empty branch.",
this.getClass().getSimpleName()));
}

@Override
default void createBranch(String branchName, long snapshotId) {
throw new UnsupportedOperationException(
String.format(
"Readonly Table %s does not support createBranch with snapshotId.",
this.getClass().getSimpleName()));
}

@Override
default void createBranch(String branchName, String tagName) {
throw new UnsupportedOperationException(
Expand Down
8 changes: 8 additions & 0 deletions paimon-core/src/main/java/org/apache/paimon/table/Table.java
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,14 @@ public interface Table extends Serializable {
@Experimental
void rollbackTo(String tagName);

/** Create a empty branch. */
@Experimental
void createBranch(String branchName);

/** Create a branch from given snapshot. */
@Experimental
void createBranch(String branchName, long snapshotId);

/** Create a branch from given tag. */
@Experimental
void createBranch(String branchName, String tagName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;

Expand All @@ -33,6 +34,7 @@
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.PriorityQueue;
import java.util.SortedMap;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -82,6 +84,65 @@ public Path branchPath(String branchName) {
return new Path(getBranchPath(tablePath, branchName));
}

/** Create empty branch. */
public void createBranch(String branchName) {
checkArgument(
!branchName.equals(DEFAULT_MAIN_BRANCH),
String.format(
"Branch name '%s' is the default branch and cannot be used.",
DEFAULT_MAIN_BRANCH));
checkArgument(!StringUtils.isBlank(branchName), "Branch name '%s' is blank.", branchName);
checkArgument(!branchExists(branchName), "Branch name '%s' already exists.", branchName);
checkArgument(
!branchName.chars().allMatch(Character::isDigit),
"Branch name cannot be pure numeric string but is '%s'.",
branchName);
try {
TableSchema latestSchema = schemaManager.latest().get();
fileIO.copyFileUtf8(
schemaManager.toSchemaPath(latestSchema.id()),
schemaManager.branchSchemaPath(branchName, latestSchema.id()));
} catch (IOException e) {
throw new RuntimeException(
String.format(
"Exception occurs when create branch '%s' (directory in %s).",
branchName, getBranchPath(tablePath, branchName)),
e);
}
}

public void createBranch(String branchName, long snapshotId) {
checkArgument(
!branchName.equals(DEFAULT_MAIN_BRANCH),
String.format(
"Branch name '%s' is the default branch and cannot be used.",
DEFAULT_MAIN_BRANCH));
checkArgument(!StringUtils.isBlank(branchName), "Branch name '%s' is blank.", branchName);
checkArgument(!branchExists(branchName), "Branch name '%s' already exists.", branchName);
checkArgument(
!branchName.chars().allMatch(Character::isDigit),
"Branch name cannot be pure numeric string but is '%s'.",
branchName);

Snapshot snapshot = snapshotManager.snapshot(snapshotId);

try {
// Copy the corresponding snapshot and schema files into the branch directory
fileIO.copyFileUtf8(
snapshotManager.snapshotPath(snapshotId),
snapshotManager.branchSnapshotPath(branchName, snapshot.id()));
fileIO.copyFileUtf8(
schemaManager.toSchemaPath(snapshot.schemaId()),
schemaManager.branchSchemaPath(branchName, snapshot.schemaId()));
} catch (IOException e) {
throw new RuntimeException(
String.format(
"Exception occurs when create branch '%s' (directory in %s).",
branchName, getBranchPath(tablePath, branchName)),
e);
}
}

public void createBranch(String branchName, String tagName) {
checkArgument(
!branchName.equals(DEFAULT_MAIN_BRANCH),
Expand Down Expand Up @@ -170,15 +231,34 @@ public List<TableBranch> branches() {
new PriorityQueue<>(Comparator.comparingLong(TableBranch::getCreateTime));
for (Pair<Path, Long> path : paths) {
String branchName = path.getLeft().getName().substring(BRANCH_PREFIX.length());
Optional<TableSchema> tableSchema = schemaManager.latest(branchName);
if (!tableSchema.isPresent()) {
// Support empty branch.
pq.add(new TableBranch(branchName, path.getValue()));
continue;
}
FileStoreTable branchTable =
FileStoreTableFactory.create(
fileIO, new Path(getBranchPath(tablePath, branchName)));

SortedMap<Snapshot, List<String>> snapshotTags = branchTable.tagManager().tags();
checkArgument(!snapshotTags.isEmpty());
Snapshot snapshot = snapshotTags.firstKey();
List<String> tags = snapshotTags.get(snapshot);
checkArgument(tags.size() == 1);
pq.add(new TableBranch(branchName, tags.get(0), snapshot.id(), path.getValue()));
Long earliestSnapshotId = branchTable.snapshotManager().earliestSnapshotId();
if (snapshotTags.isEmpty()) {
// Create based on snapshotId.
pq.add(new TableBranch(branchName, earliestSnapshotId, path.getValue()));
} else {
Snapshot snapshot = snapshotTags.firstKey();
if (earliestSnapshotId == snapshot.id()) {
List<String> tags = snapshotTags.get(snapshot);
checkArgument(tags.size() == 1);
pq.add(
new TableBranch(
branchName, tags.get(0), snapshot.id(), path.getValue()));
} else {
// Create based on snapshotId.
pq.add(new TableBranch(branchName, earliestSnapshotId, path.getValue()));
}
}
}

List<TableBranch> branches = new ArrayList<>(pq.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.table.Table;

import org.apache.commons.lang3.StringUtils;
import org.apache.flink.table.procedure.ProcedureContext;

/**
Expand All @@ -43,13 +44,30 @@ public String identifier() {
public String[] call(
ProcedureContext procedureContext, String tableId, String branchName, String tagName)
throws Catalog.TableNotExistException {
return innerCall(tableId, branchName, tagName);
return innerCall(tableId, branchName, tagName, 0);
}

private String[] innerCall(String tableId, String branchName, String tagName)
public String[] call(ProcedureContext procedureContext, String tableId, String branchName)
throws Catalog.TableNotExistException {
return innerCall(tableId, branchName, null, 0);
}

public String[] call(
ProcedureContext procedureContext, String tableId, String branchName, long snapshotId)
throws Catalog.TableNotExistException {
return innerCall(tableId, branchName, null, snapshotId);
}

private String[] innerCall(String tableId, String branchName, String tagName, long snapshotId)
throws Catalog.TableNotExistException {
Table table = catalog.getTable(Identifier.fromString(tableId));
table.createBranch(branchName, tagName);
if (!StringUtils.isBlank(tagName)) {
table.createBranch(branchName, tagName);
} else if (snapshotId > 0) {
table.createBranch(branchName, snapshotId);
} else {
table.createBranch(branchName);
}
return new String[] {"Success"};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,4 @@ org.apache.paimon.flink.procedure.MigrateDatabaseProcedure
org.apache.paimon.flink.procedure.MigrateFileProcedure
org.apache.paimon.flink.procedure.RemoveOrphanFilesProcedure
org.apache.paimon.flink.procedure.QueryServiceProcedure
org.apache.paimon.flink.procedure.ExpireSnapshotsProcedure
org.apache.paimon.flink.procedure.ExpireSnapshotsProcedure
Original file line number Diff line number Diff line change
Expand Up @@ -80,4 +80,84 @@ void testCreateAndDeleteBranch() throws Exception {
"CALL sys.delete_branch('%s.%s', 'branch_name')", database, tableName));
assertThat(branchManager.branchExists("branch_name")).isFalse();
}

@Test
void testCreateAndDeleteBranchWithSnapshotId() throws Exception {

init(warehouse);

RowType rowType =
RowType.of(
new DataType[] {DataTypes.BIGINT(), DataTypes.STRING()},
new String[] {"k", "v"});
FileStoreTable table =
createFileStoreTable(
rowType,
Collections.emptyList(),
Collections.singletonList("k"),
Collections.emptyMap());

StreamWriteBuilder writeBuilder = table.newStreamWriteBuilder().withCommitUser(commitUser);
write = writeBuilder.newWrite();
commit = writeBuilder.newCommit();

// 3 snapshots
writeData(rowData(1L, BinaryString.fromString("Hi")));
writeData(rowData(2L, BinaryString.fromString("Hello")));
writeData(rowData(3L, BinaryString.fromString("Paimon")));

BranchManager branchManager = table.branchManager();

callProcedure(
String.format(
"CALL sys.create_branch('%s.%s', 'branch_name_with_snapshotId', 2)",
database, tableName));
assertThat(branchManager.branchExists("branch_name_with_snapshotId")).isTrue();
branchManager.branches();

callProcedure(
String.format(
"CALL sys.delete_branch('%s.%s', 'branch_name_with_snapshotId')",
database, tableName));
assertThat(branchManager.branchExists("branch_name_with_snapshotId")).isFalse();
}

@Test
void testCreateAndDeleteEmptyBranch() throws Exception {

init(warehouse);

RowType rowType =
RowType.of(
new DataType[] {DataTypes.BIGINT(), DataTypes.STRING()},
new String[] {"k", "v"});
FileStoreTable table =
createFileStoreTable(
rowType,
Collections.emptyList(),
Collections.singletonList("k"),
Collections.emptyMap());

StreamWriteBuilder writeBuilder = table.newStreamWriteBuilder().withCommitUser(commitUser);
write = writeBuilder.newWrite();
commit = writeBuilder.newCommit();

// 3 snapshots
writeData(rowData(1L, BinaryString.fromString("Hi")));
writeData(rowData(2L, BinaryString.fromString("Hello")));
writeData(rowData(3L, BinaryString.fromString("Paimon")));

BranchManager branchManager = table.branchManager();
callProcedure(
String.format(
"CALL sys.create_branch('%s.%s', 'empty_branch_name')",
database, tableName));
assertThat(branchManager.branchExists("empty_branch_name")).isTrue();

callProcedure(
String.format(
"CALL sys.delete_branch('%s.%s', 'empty_branch_name')",
database, tableName));
assertThat(branchManager.branchExists("empty_branch_name")).isFalse();
}
}

0 comments on commit 4b878fe

Please sign in to comment.