Skip to content

Commit 17d32c9

Browse files
committed
support flink write branch
1 parent 85bd8a3 commit 17d32c9

38 files changed

+779
-265
lines changed

docs/layouts/shortcodes/generated/core_configuration.html

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,12 @@
3232
<td>Boolean</td>
3333
<td>Whether to create underlying storage when reading and writing the table.</td>
3434
</tr>
35+
<tr>
36+
<td><h5>branch</h5></td>
37+
<td style="word-wrap: break-word;">"main"</td>
38+
<td>String</td>
39+
<td>Specify branch name.</td>
40+
</tr>
3541
<tr>
3642
<td><h5>bucket</h5></td>
3743
<td style="word-wrap: break-word;">-1</td>

paimon-common/src/main/java/org/apache/paimon/CoreOptions.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,9 @@ public class CoreOptions implements Serializable {
112112
.noDefaultValue()
113113
.withDescription("The file path of this table in the filesystem.");
114114

115+
public static final ConfigOption<String> BRANCH =
116+
key("branch").stringType().defaultValue("main").withDescription("Specify branch name.");
117+
115118
public static final ConfigOption<FileFormatType> FILE_FORMAT =
116119
key("file.format")
117120
.enumType(FileFormatType.class)
@@ -1149,6 +1152,17 @@ public Path path() {
11491152
return path(options.toMap());
11501153
}
11511154

1155+
public String branch() {
1156+
return branch(options.toMap());
1157+
}
1158+
1159+
public static String branch(Map<String, String> options) {
1160+
if (options.containsKey(BRANCH.key())) {
1161+
return options.get(BRANCH.key());
1162+
}
1163+
return BRANCH.defaultValue();
1164+
}
1165+
11521166
public static Path path(Map<String, String> options) {
11531167
return new Path(options.get(PATH.key()));
11541168
}

paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ public FileStorePathFactory pathFactory() {
104104

105105
@Override
106106
public SnapshotManager snapshotManager() {
107-
return new SnapshotManager(fileIO, options.path());
107+
return new SnapshotManager(fileIO, options.path(), options.branch());
108108
}
109109

110110
@Override

paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@
3939
import static org.apache.paimon.predicate.PredicateBuilder.and;
4040
import static org.apache.paimon.predicate.PredicateBuilder.pickTransformFieldMapping;
4141
import static org.apache.paimon.predicate.PredicateBuilder.splitAnd;
42-
import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;
4342

4443
/** {@link FileStore} for reading and writing {@link InternalRow}. */
4544
public class AppendOnlyFileStore extends AbstractFileStore<InternalRow> {
@@ -71,11 +70,7 @@ public BucketMode bucketMode() {
7170

7271
@Override
7372
public AppendOnlyFileStoreScan newScan() {
74-
return newScan(DEFAULT_MAIN_BRANCH);
75-
}
76-
77-
public AppendOnlyFileStoreScan newScan(String branchName) {
78-
return newScan(false, branchName);
73+
return newScan(false);
7974
}
8075

8176
@Override
@@ -106,12 +101,12 @@ public AppendOnlyFileStoreWrite newWrite(
106101
rowType,
107102
pathFactory(),
108103
snapshotManager(),
109-
newScan(true, DEFAULT_MAIN_BRANCH).withManifestCacheFilter(manifestFilter),
104+
newScan(true).withManifestCacheFilter(manifestFilter),
110105
options,
111106
tableName);
112107
}
113108

114-
private AppendOnlyFileStoreScan newScan(boolean forWrite, String branchName) {
109+
private AppendOnlyFileStoreScan newScan(boolean forWrite) {
115110
ScanBucketFilter bucketFilter =
116111
new ScanBucketFilter(bucketKeyType) {
117112
@Override
@@ -146,7 +141,6 @@ public void pushdown(Predicate predicate) {
146141
options.bucket(),
147142
forWrite,
148143
options.scanManifestParallelism(),
149-
branchName,
150144
options.fileIndexReadEnabled());
151145
}
152146

paimon-core/src/main/java/org/apache/paimon/FileStore.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,8 +63,6 @@ public interface FileStore<T> extends Serializable {
6363

6464
FileStoreScan newScan();
6565

66-
FileStoreScan newScan(String branchName);
67-
6866
ManifestList.Factory manifestListFactory();
6967

7068
ManifestFile.Factory manifestFileFactory();

paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,6 @@
5656
import static org.apache.paimon.predicate.PredicateBuilder.and;
5757
import static org.apache.paimon.predicate.PredicateBuilder.pickTransformFieldMapping;
5858
import static org.apache.paimon.predicate.PredicateBuilder.splitAnd;
59-
import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;
6059
import static org.apache.paimon.utils.Preconditions.checkArgument;
6160

6261
/** {@link FileStore} for querying and updating {@link KeyValue}s. */
@@ -112,11 +111,7 @@ public BucketMode bucketMode() {
112111

113112
@Override
114113
public KeyValueFileStoreScan newScan() {
115-
return newScan(DEFAULT_MAIN_BRANCH);
116-
}
117-
118-
public KeyValueFileStoreScan newScan(String branchName) {
119-
return newScan(false, branchName);
114+
return newScan(false);
120115
}
121116

122117
@Override
@@ -185,7 +180,7 @@ public KeyValueFileStoreWrite newWrite(String commitUser, ManifestCacheFilter ma
185180
pathFactory(),
186181
format2PathFactory(),
187182
snapshotManager(),
188-
newScan(true, DEFAULT_MAIN_BRANCH).withManifestCacheFilter(manifestFilter),
183+
newScan(true).withManifestCacheFilter(manifestFilter),
189184
indexFactory,
190185
deletionVectorsMaintainerFactory,
191186
options,
@@ -209,7 +204,7 @@ private Map<String, FileStorePathFactory> format2PathFactory() {
209204
return pathFactoryMap;
210205
}
211206

212-
private KeyValueFileStoreScan newScan(boolean forWrite, String branchName) {
207+
private KeyValueFileStoreScan newScan(boolean forWrite) {
213208
ScanBucketFilter bucketFilter =
214209
new ScanBucketFilter(bucketKeyType) {
215210
@Override
@@ -240,7 +235,6 @@ public void pushdown(Predicate keyFilter) {
240235
options.bucket(),
241236
forWrite,
242237
options.scanManifestParallelism(),
243-
branchName,
244238
options.deletionVectorsEnabled(),
245239
options.mergeEngine());
246240
}

paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.apache.paimon.table.Table;
3737
import org.apache.paimon.table.sink.BatchWriteBuilder;
3838
import org.apache.paimon.table.system.SystemTableLoader;
39+
import org.apache.paimon.utils.BranchManager;
3940
import org.apache.paimon.utils.StringUtils;
4041

4142
import javax.annotation.Nullable;
@@ -66,6 +67,7 @@ public abstract class AbstractCatalog implements Catalog {
6667
protected final FileIO fileIO;
6768
protected final Map<String, String> tableDefaultOptions;
6869
protected final Options catalogOptions;
70+
protected final String branchName;
6971

7072
@Nullable protected final LineageMetaFactory lineageMetaFactory;
7173

@@ -74,6 +76,7 @@ protected AbstractCatalog(FileIO fileIO) {
7476
this.lineageMetaFactory = null;
7577
this.tableDefaultOptions = new HashMap<>();
7678
this.catalogOptions = new Options();
79+
branchName = BranchManager.DEFAULT_MAIN_BRANCH;
7780
}
7881

7982
protected AbstractCatalog(FileIO fileIO, Options options) {
@@ -83,6 +86,7 @@ protected AbstractCatalog(FileIO fileIO, Options options) {
8386
this.tableDefaultOptions =
8487
convertToPropertiesPrefixKey(options.toMap(), TABLE_DEFAULT_OPTION_PREFIX);
8588
this.catalogOptions = options;
89+
this.branchName = options.get(CoreOptions.BRANCH);
8690
}
8791

8892
@Override
@@ -325,7 +329,12 @@ public Table getTable(Identifier identifier) throws TableNotExistException {
325329
}
326330
return table;
327331
} else {
328-
return getDataTable(identifier);
332+
Table table = getDataTable(identifier);
333+
// Override branch option
334+
if (!branchName.equals(BranchManager.DEFAULT_MAIN_BRANCH)) {
335+
table.options().put(CoreOptions.BRANCH.key(), branchName);
336+
}
337+
return table;
329338
}
330339
}
331340

paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ public boolean tableExists(Identifier identifier) {
119119
}
120120

121121
private boolean tableExists(Path tablePath) {
122-
return new SchemaManager(fileIO, tablePath).listAllIds().size() > 0;
122+
return new SchemaManager(fileIO, tablePath, branchName).listAllIds().size() > 0;
123123
}
124124

125125
@Override
@@ -153,7 +153,7 @@ private SchemaManager schemaManager(Identifier identifier) {
153153
new RuntimeException(
154154
"No lock context when lock is enabled."))))
155155
.orElse(null);
156-
return new SchemaManager(fileIO, path)
156+
return new SchemaManager(fileIO, path, branchName)
157157
.withLock(catalogLock == null ? null : Lock.fromCatalog(catalogLock, identifier));
158158
}
159159

paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -288,7 +288,7 @@ protected void renameTableImpl(Identifier fromTable, Identifier toTable) {
288288
updateTable(connections, catalogKey, fromTable, toTable);
289289

290290
Path fromPath = getDataTableLocation(fromTable);
291-
if (new SchemaManager(fileIO, fromPath).listAllIds().size() > 0) {
291+
if (new SchemaManager(fileIO, fromPath, branchName).listAllIds().size() > 0) {
292292
// Rename the file system's table directory. Maintain consistency between tables in
293293
// the file system and tables in the Hive Metastore.
294294
Path toPath = getDataTableLocation(toTable);
@@ -323,7 +323,7 @@ protected TableSchema getDataTableSchema(Identifier identifier) throws TableNotE
323323
throw new TableNotExistException(identifier);
324324
}
325325
Path tableLocation = getDataTableLocation(identifier);
326-
return new SchemaManager(fileIO, tableLocation)
326+
return new SchemaManager(fileIO, tableLocation, branchName)
327327
.latest()
328328
.orElseThrow(
329329
() -> new RuntimeException("There is no paimon table in " + tableLocation));
@@ -374,7 +374,7 @@ public void close() throws Exception {
374374
}
375375

376376
private SchemaManager getSchemaManager(Identifier identifier) {
377-
return new SchemaManager(fileIO, getDataTableLocation(identifier))
377+
return new SchemaManager(fileIO, getDataTableLocation(identifier), branchName)
378378
.withLock(lock(identifier));
379379
}
380380

paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,6 @@ public abstract class AbstractFileStoreScan implements FileStoreScan {
7575
private final SchemaManager schemaManager;
7676
private final TableSchema schema;
7777
protected final ScanBucketFilter bucketKeyFilter;
78-
private final String branchName;
7978

8079
private PartitionPredicate partitionFilter;
8180
private Snapshot specifiedSnapshot = null;
@@ -98,8 +97,7 @@ public AbstractFileStoreScan(
9897
ManifestList.Factory manifestListFactory,
9998
int numOfBuckets,
10099
boolean checkNumOfBuckets,
101-
Integer scanManifestParallelism,
102-
String branchName) {
100+
Integer scanManifestParallelism) {
103101
this.partitionType = partitionType;
104102
this.bucketKeyFilter = bucketKeyFilter;
105103
this.snapshotManager = snapshotManager;
@@ -111,7 +109,6 @@ public AbstractFileStoreScan(
111109
this.checkNumOfBuckets = checkNumOfBuckets;
112110
this.tableSchemas = new ConcurrentHashMap<>();
113111
this.scanManifestParallelism = scanManifestParallelism;
114-
this.branchName = branchName;
115112
}
116113

117114
@Override
@@ -368,7 +365,7 @@ private Pair<Snapshot, List<ManifestFileMeta>> readManifests() {
368365
if (manifests == null) {
369366
snapshot =
370367
specifiedSnapshot == null
371-
? snapshotManager.latestSnapshot(branchName)
368+
? snapshotManager.latestSnapshot()
372369
: specifiedSnapshot;
373370
if (snapshot == null) {
374371
manifests = Collections.emptyList();

paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,6 @@ public AppendOnlyFileStoreScan(
6262
int numOfBuckets,
6363
boolean checkNumOfBuckets,
6464
Integer scanManifestParallelism,
65-
String branchName,
6665
boolean fileIndexReadEnabled) {
6766
super(
6867
partitionType,
@@ -74,8 +73,7 @@ public AppendOnlyFileStoreScan(
7473
manifestListFactory,
7574
numOfBuckets,
7675
checkNumOfBuckets,
77-
scanManifestParallelism,
78-
branchName);
76+
scanManifestParallelism);
7977
this.fieldStatsConverters =
8078
new FieldStatsConverters(sid -> scanTableSchema(sid).fields(), schema.id());
8179
this.fileIndexReadEnabled = fileIndexReadEnabled;

paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -251,7 +251,7 @@ public void commit(
251251
// we can skip conflict checking in tryCommit method.
252252
// This optimization is mainly used to decrease the number of times we read from
253253
// files.
254-
latestSnapshot = snapshotManager.latestSnapshot(branchName);
254+
latestSnapshot = snapshotManager.latestSnapshot();
255255
if (latestSnapshot != null && checkAppendFiles) {
256256
// it is possible that some partitions only have compact changes,
257257
// so we need to contain all changes
@@ -654,7 +654,7 @@ private int tryCommit(
654654
@Nullable String statsFileName) {
655655
int cnt = 0;
656656
while (true) {
657-
Snapshot latestSnapshot = snapshotManager.latestSnapshot(branchName);
657+
Snapshot latestSnapshot = snapshotManager.latestSnapshot();
658658
cnt++;
659659
if (tryCommitOnce(
660660
tableFiles,
@@ -754,7 +754,7 @@ public boolean tryCommitOnce(
754754
Path newSnapshotPath =
755755
branchName.equals(DEFAULT_MAIN_BRANCH)
756756
? snapshotManager.snapshotPath(newSnapshotId)
757-
: snapshotManager.branchSnapshotPath(branchName, newSnapshotId);
757+
: snapshotManager.snapshotPath(branchName, newSnapshotId);
758758

759759
if (LOG.isDebugEnabled()) {
760760
LOG.debug("Ready to commit table files to snapshot #" + newSnapshotId);
@@ -836,7 +836,7 @@ public boolean tryCommitOnce(
836836
newIndexManifest = indexManifest;
837837
}
838838

839-
long latestSchemaId = schemaManager.latest(branchName).get().id();
839+
long latestSchemaId = schemaManager.latest().get().id();
840840

841841
// write new stats or inherit from the previous snapshot
842842
String statsFileName = null;

paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,6 @@ public KeyValueFileStoreScan(
6262
int numOfBuckets,
6363
boolean checkNumOfBuckets,
6464
Integer scanManifestParallelism,
65-
String branchName,
6665
boolean deletionVectorsEnabled,
6766
MergeEngine mergeEngine) {
6867
super(
@@ -75,8 +74,7 @@ public KeyValueFileStoreScan(
7574
manifestListFactory,
7675
numOfBuckets,
7776
checkNumOfBuckets,
78-
scanManifestParallelism,
79-
branchName);
77+
scanManifestParallelism);
8078
this.fieldKeyStatsConverters =
8179
new FieldStatsConverters(
8280
sid -> keyValueFieldsExtractor.keyFields(scanTableSchema(sid)),

0 commit comments

Comments
 (0)