Skip to content

Commit

Permalink
support flink write branch
Browse files Browse the repository at this point in the history
  • Loading branch information
sunxiaojian committed May 18, 2024
1 parent a232302 commit 750797f
Show file tree
Hide file tree
Showing 36 changed files with 422 additions and 274 deletions.
6 changes: 6 additions & 0 deletions docs/layouts/shortcodes/generated/core_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,12 @@
<td>Boolean</td>
<td>Whether to create underlying storage when reading and writing the table.</td>
</tr>
<tr>
<td><h5>branch</h5></td>
<td style="word-wrap: break-word;">"main"</td>
<td>String</td>
<td>Specify branch name.</td>
</tr>
<tr>
<td><h5>bucket</h5></td>
<td style="word-wrap: break-word;">-1</td>
Expand Down
14 changes: 14 additions & 0 deletions paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,9 @@ public class CoreOptions implements Serializable {
.noDefaultValue()
.withDescription("The file path of this table in the filesystem.");

public static final ConfigOption<String> BRANCH =
key("branch").stringType().defaultValue("main").withDescription("Specify branch name.");

public static final String FILE_FORMAT_ORC = "orc";
public static final String FILE_FORMAT_AVRO = "avro";
public static final String FILE_FORMAT_PARQUET = "parquet";
Expand Down Expand Up @@ -1178,6 +1181,17 @@ public Path path() {
return path(options.toMap());
}

public String branch() {
return branch(options.toMap());
}

public static String branch(Map<String, String> options) {
if (options.containsKey(BRANCH.key())) {
return options.get(BRANCH.key());
}
return BRANCH.defaultValue();
}

public static Path path(Map<String, String> options) {
return new Path(options.get(PATH.key()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,6 @@
import java.util.Comparator;
import java.util.List;

import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;

/**
* Base {@link FileStore} implementation.
*
Expand Down Expand Up @@ -104,7 +102,7 @@ public FileStorePathFactory pathFactory() {

@Override
public SnapshotManager snapshotManager() {
return new SnapshotManager(fileIO, options.path());
return new SnapshotManager(fileIO, options.path(), options.branch());
}

@Override
Expand Down Expand Up @@ -175,10 +173,6 @@ public boolean mergeSchema(RowType rowType, boolean allowExplicitCast) {

@Override
public FileStoreCommitImpl newCommit(String commitUser) {
return newCommit(commitUser, DEFAULT_MAIN_BRANCH);
}

public FileStoreCommitImpl newCommit(String commitUser, String branchName) {
return new FileStoreCommitImpl(
fileIO,
schemaManager,
Expand All @@ -196,7 +190,7 @@ public FileStoreCommitImpl newCommit(String commitUser, String branchName) {
options.manifestMergeMinCount(),
partitionType.getFieldCount() > 0 && options.dynamicPartitionOverwrite(),
newKeyComparator(),
branchName,
options.branch(),
newStatsFileHandler());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import static org.apache.paimon.predicate.PredicateBuilder.and;
import static org.apache.paimon.predicate.PredicateBuilder.pickTransformFieldMapping;
import static org.apache.paimon.predicate.PredicateBuilder.splitAnd;
import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;

/** {@link FileStore} for reading and writing {@link InternalRow}. */
public class AppendOnlyFileStore extends AbstractFileStore<InternalRow> {
Expand Down Expand Up @@ -71,11 +70,7 @@ public BucketMode bucketMode() {

@Override
public AppendOnlyFileStoreScan newScan() {
return newScan(DEFAULT_MAIN_BRANCH);
}

public AppendOnlyFileStoreScan newScan(String branchName) {
return newScan(false, branchName);
return newScan(false);
}

@Override
Expand Down Expand Up @@ -106,12 +101,12 @@ public AppendOnlyFileStoreWrite newWrite(
rowType,
pathFactory(),
snapshotManager(),
newScan(true, DEFAULT_MAIN_BRANCH).withManifestCacheFilter(manifestFilter),
newScan(true).withManifestCacheFilter(manifestFilter),
options,
tableName);
}

private AppendOnlyFileStoreScan newScan(boolean forWrite, String branchName) {
private AppendOnlyFileStoreScan newScan(boolean forWrite) {
ScanBucketFilter bucketFilter =
new ScanBucketFilter(bucketKeyType) {
@Override
Expand Down Expand Up @@ -146,7 +141,6 @@ public void pushdown(Predicate predicate) {
options.bucket(),
forWrite,
options.scanManifestParallelism(),
branchName,
options.fileIndexReadEnabled());
}

Expand Down
4 changes: 0 additions & 4 deletions paimon-core/src/main/java/org/apache/paimon/FileStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,6 @@ public interface FileStore<T> extends Serializable {

FileStoreScan newScan();

FileStoreScan newScan(String branchName);

ManifestList.Factory manifestListFactory();

ManifestFile.Factory manifestFileFactory();
Expand All @@ -81,8 +79,6 @@ public interface FileStore<T> extends Serializable {

FileStoreCommit newCommit(String commitUser);

FileStoreCommit newCommit(String commitUser, String branchName);

SnapshotDeletion newSnapshotDeletion();

TagManager newTagManager();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@
import static org.apache.paimon.predicate.PredicateBuilder.and;
import static org.apache.paimon.predicate.PredicateBuilder.pickTransformFieldMapping;
import static org.apache.paimon.predicate.PredicateBuilder.splitAnd;
import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;
import static org.apache.paimon.utils.Preconditions.checkArgument;

/** {@link FileStore} for querying and updating {@link KeyValue}s. */
Expand Down Expand Up @@ -112,11 +111,7 @@ public BucketMode bucketMode() {

@Override
public KeyValueFileStoreScan newScan() {
return newScan(DEFAULT_MAIN_BRANCH);
}

public KeyValueFileStoreScan newScan(String branchName) {
return newScan(false, branchName);
return newScan(false);
}

@Override
Expand Down Expand Up @@ -185,7 +180,7 @@ public KeyValueFileStoreWrite newWrite(String commitUser, ManifestCacheFilter ma
pathFactory(),
format2PathFactory(),
snapshotManager(),
newScan(true, DEFAULT_MAIN_BRANCH).withManifestCacheFilter(manifestFilter),
newScan(true).withManifestCacheFilter(manifestFilter),
indexFactory,
deletionVectorsMaintainerFactory,
options,
Expand All @@ -209,7 +204,7 @@ private Map<String, FileStorePathFactory> format2PathFactory() {
return pathFactoryMap;
}

private KeyValueFileStoreScan newScan(boolean forWrite, String branchName) {
private KeyValueFileStoreScan newScan(boolean forWrite) {
ScanBucketFilter bucketFilter =
new ScanBucketFilter(bucketKeyType) {
@Override
Expand Down Expand Up @@ -240,7 +235,6 @@ public void pushdown(Predicate keyFilter) {
options.bucket(),
forWrite,
options.scanManifestParallelism(),
branchName,
options.deletionVectorsEnabled(),
options.mergeEngine());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,8 @@ public Table getTable(Identifier identifier) throws TableNotExistException {
}
return table;
} else {
return getDataTable(identifier);
Table table = getDataTable(identifier);
return table;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@ public abstract class AbstractFileStoreScan implements FileStoreScan {
private final SchemaManager schemaManager;
private final TableSchema schema;
protected final ScanBucketFilter bucketKeyFilter;
private final String branchName;

private PartitionPredicate partitionFilter;
private Snapshot specifiedSnapshot = null;
Expand All @@ -102,8 +101,7 @@ public AbstractFileStoreScan(
ManifestList.Factory manifestListFactory,
int numOfBuckets,
boolean checkNumOfBuckets,
Integer scanManifestParallelism,
String branchName) {
Integer scanManifestParallelism) {
this.partitionType = partitionType;
this.bucketKeyFilter = bucketKeyFilter;
this.snapshotManager = snapshotManager;
Expand All @@ -115,7 +113,6 @@ public AbstractFileStoreScan(
this.checkNumOfBuckets = checkNumOfBuckets;
this.tableSchemas = new ConcurrentHashMap<>();
this.scanManifestParallelism = scanManifestParallelism;
this.branchName = branchName;
}

@Override
Expand Down Expand Up @@ -397,7 +394,7 @@ private Pair<Snapshot, List<ManifestFileMeta>> readManifests() {
if (manifests == null) {
snapshot =
specifiedSnapshot == null
? snapshotManager.latestSnapshot(branchName)
? snapshotManager.latestSnapshot()
: specifiedSnapshot;
if (snapshot == null) {
manifests = Collections.emptyList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ public AppendOnlyFileStoreScan(
int numOfBuckets,
boolean checkNumOfBuckets,
Integer scanManifestParallelism,
String branchName,
boolean fileIndexReadEnabled) {
super(
partitionType,
Expand All @@ -74,8 +73,7 @@ public AppendOnlyFileStoreScan(
manifestListFactory,
numOfBuckets,
checkNumOfBuckets,
scanManifestParallelism,
branchName);
scanManifestParallelism);
this.simpleStatsConverters =
new SimpleStatsConverters(sid -> scanTableSchema(sid).fields(), schema.id());
this.fileIndexReadEnabled = fileIndexReadEnabled;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ public void commit(
// we can skip conflict checking in tryCommit method.
// This optimization is mainly used to decrease the number of times we read from
// files.
latestSnapshot = snapshotManager.latestSnapshot(branchName);
latestSnapshot = snapshotManager.latestSnapshot();
if (latestSnapshot != null && checkAppendFiles) {
// it is possible that some partitions only have compact changes,
// so we need to contain all changes
Expand Down Expand Up @@ -654,7 +654,7 @@ private int tryCommit(
@Nullable String statsFileName) {
int cnt = 0;
while (true) {
Snapshot latestSnapshot = snapshotManager.latestSnapshot(branchName);
Snapshot latestSnapshot = snapshotManager.latestSnapshot();
cnt++;
if (tryCommitOnce(
tableFiles,
Expand Down Expand Up @@ -754,7 +754,7 @@ public boolean tryCommitOnce(
Path newSnapshotPath =
branchName.equals(DEFAULT_MAIN_BRANCH)
? snapshotManager.snapshotPath(newSnapshotId)
: snapshotManager.branchSnapshotPath(branchName, newSnapshotId);
: snapshotManager.copyWithBranch(branchName).snapshotPath(newSnapshotId);

if (LOG.isDebugEnabled()) {
LOG.debug("Ready to commit table files to snapshot #" + newSnapshotId);
Expand Down Expand Up @@ -839,7 +839,7 @@ public boolean tryCommitOnce(
newIndexManifest = indexManifest;
}

long latestSchemaId = schemaManager.latest(branchName).get().id();
long latestSchemaId = schemaManager.latest().get().id();

// write new stats or inherit from the previous snapshot
String statsFileName = null;
Expand Down Expand Up @@ -904,7 +904,7 @@ public boolean tryCommitOnce(
boolean committed =
fileIO.writeFileUtf8(newSnapshotPath, newSnapshot.toJson());
if (committed) {
snapshotManager.commitLatestHint(newSnapshotId, branchName);
snapshotManager.commitLatestHint(newSnapshotId);
}
return committed;
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ public KeyValueFileStoreScan(
int numOfBuckets,
boolean checkNumOfBuckets,
Integer scanManifestParallelism,
String branchName,
boolean deletionVectorsEnabled,
MergeEngine mergeEngine) {
super(
Expand All @@ -75,8 +74,7 @@ public KeyValueFileStoreScan(
manifestListFactory,
numOfBuckets,
checkNumOfBuckets,
scanManifestParallelism,
branchName);
scanManifestParallelism);
this.fieldKeyStatsConverters =
new SimpleStatsConverters(
sid -> keyValueFieldsExtractor.keyFields(scanTableSchema(sid)),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,12 +94,6 @@ public FileStoreScan newScan() {
return wrapped.newScan();
}

@Override
public FileStoreScan newScan(String branchName) {
privilegeChecker.assertCanSelect(identifier);
return wrapped.newScan(branchName);
}

@Override
public ManifestList.Factory manifestListFactory() {
return wrapped.manifestListFactory();
Expand Down Expand Up @@ -144,12 +138,6 @@ public FileStoreCommit newCommit(String commitUser) {
return wrapped.newCommit(commitUser);
}

@Override
public FileStoreCommit newCommit(String commitUser, String branchName) {
privilegeChecker.assertCanInsert(identifier);
return wrapped.newCommit(commitUser, branchName);
}

@Override
public SnapshotDeletion newSnapshotDeletion() {
privilegeChecker.assertCanInsert(identifier);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,6 @@ public SnapshotReader newSnapshotReader() {
return wrapped.newSnapshotReader();
}

@Override
public SnapshotReader newSnapshotReader(String branchName) {
privilegeChecker.assertCanSelect(identifier);
return wrapped.newSnapshotReader(branchName);
}

@Override
public CoreOptions coreOptions() {
return wrapped.coreOptions();
Expand Down Expand Up @@ -270,12 +264,6 @@ public TableCommitImpl newCommit(String commitUser) {
return wrapped.newCommit(commitUser);
}

@Override
public TableCommitImpl newCommit(String commitUser, String branchName) {
privilegeChecker.assertCanInsert(identifier);
return wrapped.newCommit(commitUser, branchName);
}

@Override
public LocalTableQuery newLocalTableQuery() {
privilegeChecker.assertCanSelect(identifier);
Expand Down
Loading

0 comments on commit 750797f

Please sign in to comment.