From 750797fc9f5dd1c98608b4e3927bddbc61acc4d6 Mon Sep 17 00:00:00 2001 From: sunxiaojian Date: Tue, 5 Mar 2024 19:54:54 +0800 Subject: [PATCH] support flink write branch --- .../generated/core_configuration.html | 6 + .../java/org/apache/paimon/CoreOptions.java | 14 ++ .../org/apache/paimon/AbstractFileStore.java | 10 +- .../apache/paimon/AppendOnlyFileStore.java | 12 +- .../java/org/apache/paimon/FileStore.java | 4 - .../org/apache/paimon/KeyValueFileStore.java | 12 +- .../paimon/catalog/AbstractCatalog.java | 3 +- .../operation/AbstractFileStoreScan.java | 7 +- .../operation/AppendOnlyFileStoreScan.java | 4 +- .../paimon/operation/FileStoreCommitImpl.java | 10 +- .../operation/KeyValueFileStoreScan.java | 4 +- .../paimon/privilege/PrivilegedFileStore.java | 12 -- .../privilege/PrivilegedFileStoreTable.java | 12 -- .../apache/paimon/schema/SchemaManager.java | 54 ++++---- .../paimon/table/AbstractFileStoreTable.java | 22 +--- .../org/apache/paimon/table/DataTable.java | 2 - .../apache/paimon/table/FileStoreTable.java | 2 - .../paimon/table/FileStoreTableFactory.java | 3 +- .../paimon/table/system/AuditLogTable.java | 5 - .../paimon/table/system/BucketsTable.java | 5 - .../paimon/table/system/FileMonitorTable.java | 5 - .../table/system/ReadOptimizedTable.java | 5 - .../apache/paimon/utils/BranchManager.java | 27 ++-- .../apache/paimon/utils/SnapshotManager.java | 120 ++++++++---------- .../org/apache/paimon/utils/TagManager.java | 38 ++++-- .../table/AppendOnlyFileStoreTableTest.java | 41 +++++- .../paimon/table/FileStoreTableTestBase.java | 49 ++++--- .../table/PrimaryKeyFileStoreTableTest.java | 44 ++++++- .../table/SchemaEvolutionTableTestBase.java | 6 - .../paimon/table/WritePreemptMemoryTest.java | 25 ++++ .../paimon/flink/FlinkTableFactory.java | 5 +- .../apache/paimon/flink/sink/FlinkSink.java | 1 + .../paimon/flink/AppendOnlyTableITCase.java | 16 +++ .../apache/paimon/flink/FileStoreITCase.java | 13 +- .../flink/FileStoreWithBranchITCase.java | 43 +++++++ .../flink/PrimaryKeyFileStoreTableITCase.java | 55 ++++++++ 36 files changed, 422 insertions(+), 274 deletions(-) create mode 100644 paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileStoreWithBranchITCase.java diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html index 77710afabe21..5e7db5c553c4 100644 --- a/docs/layouts/shortcodes/generated/core_configuration.html +++ b/docs/layouts/shortcodes/generated/core_configuration.html @@ -32,6 +32,12 @@ Boolean Whether to create underlying storage when reading and writing the table. + +
branch
+ "main" + String + Specify branch name. +
bucket
-1 diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java index c9f9833dccd8..a82166ad7504 100644 --- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java @@ -112,6 +112,9 @@ public class CoreOptions implements Serializable { .noDefaultValue() .withDescription("The file path of this table in the filesystem."); + public static final ConfigOption BRANCH = + key("branch").stringType().defaultValue("main").withDescription("Specify branch name."); + public static final String FILE_FORMAT_ORC = "orc"; public static final String FILE_FORMAT_AVRO = "avro"; public static final String FILE_FORMAT_PARQUET = "parquet"; @@ -1178,6 +1181,17 @@ public Path path() { return path(options.toMap()); } + public String branch() { + return branch(options.toMap()); + } + + public static String branch(Map options) { + if (options.containsKey(BRANCH.key())) { + return options.get(BRANCH.key()); + } + return BRANCH.defaultValue(); + } + public static Path path(Map options) { return new Path(options.get(PATH.key())); } diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java index e893c0525978..8cbf433f61f9 100644 --- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java @@ -55,8 +55,6 @@ import java.util.Comparator; import java.util.List; -import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH; - /** * Base {@link FileStore} implementation. * @@ -104,7 +102,7 @@ public FileStorePathFactory pathFactory() { @Override public SnapshotManager snapshotManager() { - return new SnapshotManager(fileIO, options.path()); + return new SnapshotManager(fileIO, options.path(), options.branch()); } @Override @@ -175,10 +173,6 @@ public boolean mergeSchema(RowType rowType, boolean allowExplicitCast) { @Override public FileStoreCommitImpl newCommit(String commitUser) { - return newCommit(commitUser, DEFAULT_MAIN_BRANCH); - } - - public FileStoreCommitImpl newCommit(String commitUser, String branchName) { return new FileStoreCommitImpl( fileIO, schemaManager, @@ -196,7 +190,7 @@ public FileStoreCommitImpl newCommit(String commitUser, String branchName) { options.manifestMergeMinCount(), partitionType.getFieldCount() > 0 && options.dynamicPartitionOverwrite(), newKeyComparator(), - branchName, + options.branch(), newStatsFileHandler()); } diff --git a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java index d2fefc5f98ed..3cd7bb3b6959 100644 --- a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java @@ -39,7 +39,6 @@ import static org.apache.paimon.predicate.PredicateBuilder.and; import static org.apache.paimon.predicate.PredicateBuilder.pickTransformFieldMapping; import static org.apache.paimon.predicate.PredicateBuilder.splitAnd; -import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH; /** {@link FileStore} for reading and writing {@link InternalRow}. */ public class AppendOnlyFileStore extends AbstractFileStore { @@ -71,11 +70,7 @@ public BucketMode bucketMode() { @Override public AppendOnlyFileStoreScan newScan() { - return newScan(DEFAULT_MAIN_BRANCH); - } - - public AppendOnlyFileStoreScan newScan(String branchName) { - return newScan(false, branchName); + return newScan(false); } @Override @@ -106,12 +101,12 @@ public AppendOnlyFileStoreWrite newWrite( rowType, pathFactory(), snapshotManager(), - newScan(true, DEFAULT_MAIN_BRANCH).withManifestCacheFilter(manifestFilter), + newScan(true).withManifestCacheFilter(manifestFilter), options, tableName); } - private AppendOnlyFileStoreScan newScan(boolean forWrite, String branchName) { + private AppendOnlyFileStoreScan newScan(boolean forWrite) { ScanBucketFilter bucketFilter = new ScanBucketFilter(bucketKeyType) { @Override @@ -146,7 +141,6 @@ public void pushdown(Predicate predicate) { options.bucket(), forWrite, options.scanManifestParallelism(), - branchName, options.fileIndexReadEnabled()); } diff --git a/paimon-core/src/main/java/org/apache/paimon/FileStore.java b/paimon-core/src/main/java/org/apache/paimon/FileStore.java index 870feffdef68..66bf3363de01 100644 --- a/paimon-core/src/main/java/org/apache/paimon/FileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/FileStore.java @@ -63,8 +63,6 @@ public interface FileStore extends Serializable { FileStoreScan newScan(); - FileStoreScan newScan(String branchName); - ManifestList.Factory manifestListFactory(); ManifestFile.Factory manifestFileFactory(); @@ -81,8 +79,6 @@ public interface FileStore extends Serializable { FileStoreCommit newCommit(String commitUser); - FileStoreCommit newCommit(String commitUser, String branchName); - SnapshotDeletion newSnapshotDeletion(); TagManager newTagManager(); diff --git a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java index edd45d1c5487..b1b7fc211c1a 100644 --- a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java @@ -56,7 +56,6 @@ import static org.apache.paimon.predicate.PredicateBuilder.and; import static org.apache.paimon.predicate.PredicateBuilder.pickTransformFieldMapping; import static org.apache.paimon.predicate.PredicateBuilder.splitAnd; -import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH; import static org.apache.paimon.utils.Preconditions.checkArgument; /** {@link FileStore} for querying and updating {@link KeyValue}s. */ @@ -112,11 +111,7 @@ public BucketMode bucketMode() { @Override public KeyValueFileStoreScan newScan() { - return newScan(DEFAULT_MAIN_BRANCH); - } - - public KeyValueFileStoreScan newScan(String branchName) { - return newScan(false, branchName); + return newScan(false); } @Override @@ -185,7 +180,7 @@ public KeyValueFileStoreWrite newWrite(String commitUser, ManifestCacheFilter ma pathFactory(), format2PathFactory(), snapshotManager(), - newScan(true, DEFAULT_MAIN_BRANCH).withManifestCacheFilter(manifestFilter), + newScan(true).withManifestCacheFilter(manifestFilter), indexFactory, deletionVectorsMaintainerFactory, options, @@ -209,7 +204,7 @@ private Map format2PathFactory() { return pathFactoryMap; } - private KeyValueFileStoreScan newScan(boolean forWrite, String branchName) { + private KeyValueFileStoreScan newScan(boolean forWrite) { ScanBucketFilter bucketFilter = new ScanBucketFilter(bucketKeyType) { @Override @@ -240,7 +235,6 @@ public void pushdown(Predicate keyFilter) { options.bucket(), forWrite, options.scanManifestParallelism(), - branchName, options.deletionVectorsEnabled(), options.mergeEngine()); } diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java index 7d26a2197c10..605fd14f644d 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java @@ -335,7 +335,8 @@ public Table getTable(Identifier identifier) throws TableNotExistException { } return table; } else { - return getDataTable(identifier); + Table table = getDataTable(identifier); + return table; } } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java index 073490694777..8abf1f18b311 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java @@ -79,7 +79,6 @@ public abstract class AbstractFileStoreScan implements FileStoreScan { private final SchemaManager schemaManager; private final TableSchema schema; protected final ScanBucketFilter bucketKeyFilter; - private final String branchName; private PartitionPredicate partitionFilter; private Snapshot specifiedSnapshot = null; @@ -102,8 +101,7 @@ public AbstractFileStoreScan( ManifestList.Factory manifestListFactory, int numOfBuckets, boolean checkNumOfBuckets, - Integer scanManifestParallelism, - String branchName) { + Integer scanManifestParallelism) { this.partitionType = partitionType; this.bucketKeyFilter = bucketKeyFilter; this.snapshotManager = snapshotManager; @@ -115,7 +113,6 @@ public AbstractFileStoreScan( this.checkNumOfBuckets = checkNumOfBuckets; this.tableSchemas = new ConcurrentHashMap<>(); this.scanManifestParallelism = scanManifestParallelism; - this.branchName = branchName; } @Override @@ -397,7 +394,7 @@ private Pair> readManifests() { if (manifests == null) { snapshot = specifiedSnapshot == null - ? snapshotManager.latestSnapshot(branchName) + ? snapshotManager.latestSnapshot() : specifiedSnapshot; if (snapshot == null) { manifests = Collections.emptyList(); diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java index bbc4487caf7b..056c82e82e59 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java @@ -62,7 +62,6 @@ public AppendOnlyFileStoreScan( int numOfBuckets, boolean checkNumOfBuckets, Integer scanManifestParallelism, - String branchName, boolean fileIndexReadEnabled) { super( partitionType, @@ -74,8 +73,7 @@ public AppendOnlyFileStoreScan( manifestListFactory, numOfBuckets, checkNumOfBuckets, - scanManifestParallelism, - branchName); + scanManifestParallelism); this.simpleStatsConverters = new SimpleStatsConverters(sid -> scanTableSchema(sid).fields(), schema.id()); this.fileIndexReadEnabled = fileIndexReadEnabled; diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java index dab336b81871..7ddf8c210284 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java @@ -251,7 +251,7 @@ public void commit( // we can skip conflict checking in tryCommit method. // This optimization is mainly used to decrease the number of times we read from // files. - latestSnapshot = snapshotManager.latestSnapshot(branchName); + latestSnapshot = snapshotManager.latestSnapshot(); if (latestSnapshot != null && checkAppendFiles) { // it is possible that some partitions only have compact changes, // so we need to contain all changes @@ -654,7 +654,7 @@ private int tryCommit( @Nullable String statsFileName) { int cnt = 0; while (true) { - Snapshot latestSnapshot = snapshotManager.latestSnapshot(branchName); + Snapshot latestSnapshot = snapshotManager.latestSnapshot(); cnt++; if (tryCommitOnce( tableFiles, @@ -754,7 +754,7 @@ public boolean tryCommitOnce( Path newSnapshotPath = branchName.equals(DEFAULT_MAIN_BRANCH) ? snapshotManager.snapshotPath(newSnapshotId) - : snapshotManager.branchSnapshotPath(branchName, newSnapshotId); + : snapshotManager.copyWithBranch(branchName).snapshotPath(newSnapshotId); if (LOG.isDebugEnabled()) { LOG.debug("Ready to commit table files to snapshot #" + newSnapshotId); @@ -839,7 +839,7 @@ public boolean tryCommitOnce( newIndexManifest = indexManifest; } - long latestSchemaId = schemaManager.latest(branchName).get().id(); + long latestSchemaId = schemaManager.latest().get().id(); // write new stats or inherit from the previous snapshot String statsFileName = null; @@ -904,7 +904,7 @@ public boolean tryCommitOnce( boolean committed = fileIO.writeFileUtf8(newSnapshotPath, newSnapshot.toJson()); if (committed) { - snapshotManager.commitLatestHint(newSnapshotId, branchName); + snapshotManager.commitLatestHint(newSnapshotId); } return committed; }; diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java index 5363336eacdd..ab19cc49f8da 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java @@ -62,7 +62,6 @@ public KeyValueFileStoreScan( int numOfBuckets, boolean checkNumOfBuckets, Integer scanManifestParallelism, - String branchName, boolean deletionVectorsEnabled, MergeEngine mergeEngine) { super( @@ -75,8 +74,7 @@ public KeyValueFileStoreScan( manifestListFactory, numOfBuckets, checkNumOfBuckets, - scanManifestParallelism, - branchName); + scanManifestParallelism); this.fieldKeyStatsConverters = new SimpleStatsConverters( sid -> keyValueFieldsExtractor.keyFields(scanTableSchema(sid)), diff --git a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java index cf8e4c357e66..e71a0cde1cb8 100644 --- a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java @@ -94,12 +94,6 @@ public FileStoreScan newScan() { return wrapped.newScan(); } - @Override - public FileStoreScan newScan(String branchName) { - privilegeChecker.assertCanSelect(identifier); - return wrapped.newScan(branchName); - } - @Override public ManifestList.Factory manifestListFactory() { return wrapped.manifestListFactory(); @@ -144,12 +138,6 @@ public FileStoreCommit newCommit(String commitUser) { return wrapped.newCommit(commitUser); } - @Override - public FileStoreCommit newCommit(String commitUser, String branchName) { - privilegeChecker.assertCanInsert(identifier); - return wrapped.newCommit(commitUser, branchName); - } - @Override public SnapshotDeletion newSnapshotDeletion() { privilegeChecker.assertCanInsert(identifier); diff --git a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java index 5820e46b4a09..1881c7350900 100644 --- a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java @@ -67,12 +67,6 @@ public SnapshotReader newSnapshotReader() { return wrapped.newSnapshotReader(); } - @Override - public SnapshotReader newSnapshotReader(String branchName) { - privilegeChecker.assertCanSelect(identifier); - return wrapped.newSnapshotReader(branchName); - } - @Override public CoreOptions coreOptions() { return wrapped.coreOptions(); @@ -270,12 +264,6 @@ public TableCommitImpl newCommit(String commitUser) { return wrapped.newCommit(commitUser); } - @Override - public TableCommitImpl newCommit(String commitUser, String branchName) { - privilegeChecker.assertCanInsert(identifier); - return wrapped.newCommit(commitUser, branchName); - } - @Override public LocalTableQuery newLocalTableQuery() { privilegeChecker.assertCanSelect(identifier); diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java index d94da91ef4c5..228d30d5cb08 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java @@ -44,6 +44,7 @@ import org.apache.paimon.types.RowType; import org.apache.paimon.utils.JsonSerdeUtil; import org.apache.paimon.utils.Preconditions; +import org.apache.paimon.utils.StringUtils; import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; @@ -67,6 +68,7 @@ import static org.apache.paimon.catalog.Identifier.UNKNOWN_DATABASE; import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH; import static org.apache.paimon.utils.BranchManager.getBranchPath; +import static org.apache.paimon.utils.BranchManager.isMainBranch; import static org.apache.paimon.utils.FileUtils.listVersionedFiles; import static org.apache.paimon.utils.Preconditions.checkState; @@ -81,9 +83,21 @@ public class SchemaManager implements Serializable { @Nullable private transient Lock lock; + private final String branch; + public SchemaManager(FileIO fileIO, Path tableRoot) { + this(fileIO, tableRoot, DEFAULT_MAIN_BRANCH); + } + + /** Specify the default branch for data writing. */ + public SchemaManager(FileIO fileIO, Path tableRoot, String branch) { this.fileIO = fileIO; this.tableRoot = tableRoot; + this.branch = StringUtils.isBlank(branch) ? DEFAULT_MAIN_BRANCH : branch; + } + + public SchemaManager copyWithBranch(String branchName) { + return new SchemaManager(fileIO, tableRoot, branchName); } public SchemaManager withLock(@Nullable Lock lock) { @@ -91,28 +105,18 @@ public SchemaManager withLock(@Nullable Lock lock) { return this; } - /** @return latest schema. */ public Optional latest() { - return latest(DEFAULT_MAIN_BRANCH); - } - - public Optional latest(String branchName) { - Path directoryPath = - branchName.equals(DEFAULT_MAIN_BRANCH) - ? schemaDirectory() - : branchSchemaDirectory(branchName); try { - return listVersionedFiles(fileIO, directoryPath, SCHEMA_PREFIX) + return listVersionedFiles(fileIO, schemaDirectory(), SCHEMA_PREFIX) .reduce(Math::max) - .map(this::schema); + .map(id -> schema(id)); } catch (IOException e) { throw new UncheckedIOException(e); } } - /** List all schema. */ public List listAll() { - return listAllIds().stream().map(this::schema).collect(Collectors.toList()); + return listAllIds().stream().map(id -> schema(id)).collect(Collectors.toList()); } /** List all schema IDs. */ @@ -125,7 +129,6 @@ public List listAllIds() { } } - /** Create a new schema from {@link Schema}. */ public TableSchema createTable(Schema schema) throws Exception { return createTable(schema, false); } @@ -471,7 +474,6 @@ private void updateColumn( @VisibleForTesting boolean commit(TableSchema newSchema) throws Exception { SchemaValidation.validateTableSchema(newSchema); - Path schemaPath = toSchemaPath(newSchema.id()); Callable callable = () -> fileIO.writeFileUtf8(schemaPath, newSchema.toString()); if (lock == null) { @@ -497,22 +499,18 @@ public static TableSchema fromPath(FileIO fileIO, Path path) { } } - private Path schemaDirectory() { - return new Path(tableRoot + "/schema"); + public Path schemaDirectory() { + return isMainBranch(branch) + ? new Path(tableRoot + "/schema") + : new Path(getBranchPath(tableRoot, branch) + "/schema"); } @VisibleForTesting - public Path toSchemaPath(long id) { - return new Path(tableRoot + "/schema/" + SCHEMA_PREFIX + id); - } - - public Path branchSchemaDirectory(String branchName) { - return new Path(getBranchPath(tableRoot, branchName) + "/schema"); - } - - public Path branchSchemaPath(String branchName, long schemaId) { - return new Path( - getBranchPath(tableRoot, branchName) + "/schema/" + SCHEMA_PREFIX + schemaId); + public Path toSchemaPath(long schemaId) { + return isMainBranch(branch) + ? new Path(tableRoot + "/schema/" + SCHEMA_PREFIX + schemaId) + : new Path( + getBranchPath(tableRoot, branch) + "/schema/" + SCHEMA_PREFIX + schemaId); } /** diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java index a8241e51fc2b..434264dbbd09 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java @@ -71,7 +71,6 @@ import java.util.function.BiConsumer; import static org.apache.paimon.CoreOptions.PATH; -import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH; import static org.apache.paimon.utils.Preconditions.checkArgument; import static org.apache.paimon.utils.Preconditions.checkNotNull; @@ -139,13 +138,8 @@ public RowKeyExtractor createRowKeyExtractor() { @Override public SnapshotReader newSnapshotReader() { - return newSnapshotReader(DEFAULT_MAIN_BRANCH); - } - - @Override - public SnapshotReader newSnapshotReader(String branchName) { return new SnapshotReaderImpl( - store().newScan(branchName), + store().newScan(), tableSchema, coreOptions(), snapshotManager(), @@ -246,7 +240,8 @@ private FileStoreTable copyInternal(Map dynamicOptions, boolean @Override public FileStoreTable copyWithLatestSchema() { Map options = tableSchema.options(); - SchemaManager schemaManager = new SchemaManager(fileIO(), location()); + SchemaManager schemaManager = + new SchemaManager(fileIO(), location(), CoreOptions.branch(options())); Optional optionalLatestSchema = schemaManager.latest(); if (optionalLatestSchema.isPresent()) { TableSchema newTableSchema = optionalLatestSchema.get(); @@ -259,7 +254,7 @@ public FileStoreTable copyWithLatestSchema() { } protected SchemaManager schemaManager() { - return new SchemaManager(fileIO(), path); + return new SchemaManager(fileIO(), path, CoreOptions.branch(options())); } @Override @@ -308,11 +303,6 @@ public ExpireSnapshots newExpireChangelog() { @Override public TableCommitImpl newCommit(String commitUser) { - // Compatibility with previous design, the main branch is written by default - return newCommit(commitUser, DEFAULT_MAIN_BRANCH); - } - - public TableCommitImpl newCommit(String commitUser, String branchName) { CoreOptions options = coreOptions(); Runnable snapshotExpire = null; if (!options.writeOnly()) { @@ -340,7 +330,7 @@ public TableCommitImpl newCommit(String commitUser, String branchName) { } return new TableCommitImpl( - store().newCommit(commitUser, branchName), + store().newCommit(commitUser), createCommitCallbacks(), snapshotExpire, options.writeOnly() ? null : store().newPartitionExpire(commitUser), @@ -571,7 +561,7 @@ public void rollbackTo(String tagName) { @Override public TagManager tagManager() { - return new TagManager(fileIO, path); + return new TagManager(fileIO, path, CoreOptions.branch(options())); } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/table/DataTable.java b/paimon-core/src/main/java/org/apache/paimon/table/DataTable.java index 1d892130499c..b5bebe2a72d4 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/DataTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/DataTable.java @@ -31,8 +31,6 @@ public interface DataTable extends InnerTable { SnapshotReader newSnapshotReader(); - SnapshotReader newSnapshotReader(String branchName); - CoreOptions coreOptions(); SnapshotManager snapshotManager(); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java index b8685f329bc5..212555d7b1d6 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java @@ -97,8 +97,6 @@ default Optional comment() { @Override TableCommitImpl newCommit(String commitUser); - TableCommitImpl newCommit(String commitUser, String branchName); - LocalTableQuery newLocalTableQuery(); default SimpleStats getSchemaFieldStats(DataFileMeta dataFileMeta) { diff --git a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java index 638253585867..e941a89e01b3 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java @@ -53,8 +53,9 @@ public static FileStoreTable create(FileIO fileIO, Path path) { public static FileStoreTable create(FileIO fileIO, Options options) { Path tablePath = CoreOptions.path(options); + String branchName = CoreOptions.branch(options.toMap()); TableSchema tableSchema = - new SchemaManager(fileIO, tablePath) + new SchemaManager(fileIO, tablePath, branchName) .latest() .orElseThrow( () -> diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java index b6cfa5c8f280..7ff7f936e104 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java @@ -132,11 +132,6 @@ public SnapshotReader newSnapshotReader() { return new AuditLogDataReader(dataTable.newSnapshotReader()); } - @Override - public SnapshotReader newSnapshotReader(String branchName) { - return new AuditLogDataReader(dataTable.newSnapshotReader(branchName)); - } - @Override public InnerTableScan newScan() { return new AuditLogBatchScan(dataTable.newScan()); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/BucketsTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/BucketsTable.java index 2ab88a34609e..4c9b9a6015f7 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/BucketsTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/BucketsTable.java @@ -148,11 +148,6 @@ public SnapshotReader newSnapshotReader() { return wrapped.newSnapshotReader(); } - @Override - public SnapshotReader newSnapshotReader(String branchName) { - return wrapped.newSnapshotReader(branchName); - } - @Override public InnerTableScan newScan() { return wrapped.newScan(); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/FileMonitorTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/FileMonitorTable.java index bedc19ac3557..7825b93e4fad 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/FileMonitorTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/FileMonitorTable.java @@ -135,11 +135,6 @@ public SnapshotReader newSnapshotReader() { return wrapped.newSnapshotReader(); } - @Override - public SnapshotReader newSnapshotReader(String branchName) { - return wrapped.newSnapshotReader(branchName); - } - @Override public InnerTableScan newScan() { return wrapped.newScan(); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java index 3c6910fbec16..0b0b0e586489 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java @@ -97,11 +97,6 @@ public SnapshotReader newSnapshotReader() { } } - @Override - public SnapshotReader newSnapshotReader(String branchName) { - return dataTable.newSnapshotReader(branchName); - } - @Override public InnerTableScan newScan() { return new InnerTableScanImpl( diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java index f3f06f89208a..41099dfac46b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java @@ -74,6 +74,10 @@ public Path branchDirectory() { return new Path(tablePath + "/branch"); } + public static boolean isMainBranch(String branch) { + return branch.equals(DEFAULT_MAIN_BRANCH); + } + /** Return the path string of a branch. */ public static String getBranchPath(Path tablePath, String branchName) { return tablePath.toString() + "/branch/" + BRANCH_PREFIX + branchName; @@ -87,7 +91,7 @@ public Path branchPath(String branchName) { /** Create empty branch. */ public void createBranch(String branchName) { checkArgument( - !branchName.equals(DEFAULT_MAIN_BRANCH), + !isMainBranch(branchName), String.format( "Branch name '%s' is the default branch and cannot be used.", DEFAULT_MAIN_BRANCH)); @@ -97,11 +101,12 @@ public void createBranch(String branchName) { !branchName.chars().allMatch(Character::isDigit), "Branch name cannot be pure numeric string but is '%s'.", branchName); + try { TableSchema latestSchema = schemaManager.latest().get(); fileIO.copyFileUtf8( schemaManager.toSchemaPath(latestSchema.id()), - schemaManager.branchSchemaPath(branchName, latestSchema.id())); + schemaManager.copyWithBranch(branchName).toSchemaPath(latestSchema.id())); } catch (IOException e) { throw new RuntimeException( String.format( @@ -113,7 +118,7 @@ branchName, getBranchPath(tablePath, branchName)), public void createBranch(String branchName, long snapshotId) { checkArgument( - !branchName.equals(DEFAULT_MAIN_BRANCH), + !isMainBranch(branchName), String.format( "Branch name '%s' is the default branch and cannot be used.", DEFAULT_MAIN_BRANCH)); @@ -130,10 +135,10 @@ public void createBranch(String branchName, long snapshotId) { // Copy the corresponding snapshot and schema files into the branch directory fileIO.copyFileUtf8( snapshotManager.snapshotPath(snapshotId), - snapshotManager.branchSnapshotPath(branchName, snapshot.id())); + snapshotManager.copyWithBranch(branchName).snapshotPath(snapshot.id())); fileIO.copyFileUtf8( schemaManager.toSchemaPath(snapshot.schemaId()), - schemaManager.branchSchemaPath(branchName, snapshot.schemaId())); + schemaManager.copyWithBranch(branchName).toSchemaPath(snapshot.schemaId())); } catch (IOException e) { throw new RuntimeException( String.format( @@ -145,7 +150,7 @@ branchName, getBranchPath(tablePath, branchName)), public void createBranch(String branchName, String tagName) { checkArgument( - !branchName.equals(DEFAULT_MAIN_BRANCH), + !isMainBranch(branchName), String.format( "Branch name '%s' is the default branch and cannot be used.", DEFAULT_MAIN_BRANCH)); @@ -162,13 +167,14 @@ public void createBranch(String branchName, String tagName) { try { // Copy the corresponding tag, snapshot and schema files into the branch directory fileIO.copyFileUtf8( - tagManager.tagPath(tagName), tagManager.branchTagPath(branchName, tagName)); + tagManager.tagPath(tagName), + tagManager.copyWithBranch(branchName).tagPath(tagName)); fileIO.copyFileUtf8( snapshotManager.snapshotPath(snapshot.id()), - snapshotManager.branchSnapshotPath(branchName, snapshot.id())); + snapshotManager.copyWithBranch(branchName).snapshotPath(snapshot.id())); fileIO.copyFileUtf8( schemaManager.toSchemaPath(snapshot.schemaId()), - schemaManager.branchSchemaPath(branchName, snapshot.schemaId())); + schemaManager.copyWithBranch(branchName).toSchemaPath(snapshot.schemaId())); } catch (IOException e) { throw new RuntimeException( String.format( @@ -231,7 +237,8 @@ public List branches() { new PriorityQueue<>(Comparator.comparingLong(TableBranch::getCreateTime)); for (Pair path : paths) { String branchName = path.getLeft().getName().substring(BRANCH_PREFIX.length()); - Optional tableSchema = schemaManager.latest(branchName); + Optional tableSchema = + schemaManager.copyWithBranch(branchName).latest(); if (!tableSchema.isPresent()) { // Support empty branch. pq.add(new TableBranch(branchName, path.getValue())); diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java index dbbc8fffdc05..63679b86acea 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java @@ -47,6 +47,7 @@ import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH; import static org.apache.paimon.utils.BranchManager.getBranchPath; +import static org.apache.paimon.utils.BranchManager.isMainBranch; import static org.apache.paimon.utils.FileUtils.listVersionedFiles; /** Manager for {@link Snapshot}, providing utility methods related to paths and snapshot hints. */ @@ -63,10 +64,21 @@ public class SnapshotManager implements Serializable { private final FileIO fileIO; private final Path tablePath; + private final String branch; public SnapshotManager(FileIO fileIO, Path tablePath) { + this(fileIO, tablePath, DEFAULT_MAIN_BRANCH); + } + + /** Specify the default branch for data writing. */ + public SnapshotManager(FileIO fileIO, Path tablePath, String branchName) { this.fileIO = fileIO; this.tablePath = tablePath; + this.branch = StringUtils.isBlank(branchName) ? DEFAULT_MAIN_BRANCH : branchName; + } + + public SnapshotManager copyWithBranch(String branchName) { + return new SnapshotManager(fileIO, tablePath, branchName); } public FileIO fileIO() { @@ -77,45 +89,41 @@ public Path tablePath() { return tablePath; } - public Path snapshotDirectory() { - return new Path(tablePath + "/snapshot"); - } - public Path changelogDirectory() { - return new Path(tablePath + "/changelog"); + return isMainBranch(branch) + ? new Path(tablePath + "/changelog") + : new Path(getBranchPath(tablePath, branch) + "/changelog"); } public Path longLivedChangelogPath(long snapshotId) { - return new Path(tablePath + "/changelog/" + CHANGELOG_PREFIX + snapshotId); + return isMainBranch(branch) + ? new Path(tablePath + "/changelog/" + CHANGELOG_PREFIX + snapshotId) + : new Path( + getBranchPath(tablePath, branch) + + "/changelog/" + + CHANGELOG_PREFIX + + snapshotId); } public Path snapshotPath(long snapshotId) { - return new Path(tablePath + "/snapshot/" + SNAPSHOT_PREFIX + snapshotId); - } - - public Path branchSnapshotDirectory(String branchName) { - return new Path(getBranchPath(tablePath, branchName) + "/snapshot"); - } - - public Path branchSnapshotPath(String branchName, long snapshotId) { - return new Path( - getBranchPath(tablePath, branchName) + "/snapshot/" + SNAPSHOT_PREFIX + snapshotId); - } - - public Path snapshotPathByBranch(String branchName, long snapshotId) { - return branchName.equals(DEFAULT_MAIN_BRANCH) - ? snapshotPath(snapshotId) - : branchSnapshotPath(branchName, snapshotId); + return isMainBranch(branch) + ? new Path(tablePath + "/snapshot/" + SNAPSHOT_PREFIX + snapshotId) + : new Path( + getBranchPath(tablePath, branch) + + "/snapshot/" + + SNAPSHOT_PREFIX + + snapshotId); } - public Path snapshotDirByBranch(String branchName) { - return branchName.equals(DEFAULT_MAIN_BRANCH) - ? snapshotDirectory() - : branchSnapshotDirectory(branchName); + public Path snapshotDirectory() { + return isMainBranch(branch) + ? new Path(tablePath + "/snapshot") + : new Path(getBranchPath(tablePath, branch) + "/snapshot"); } public Snapshot snapshot(long snapshotId) { - return snapshot(DEFAULT_MAIN_BRANCH, snapshotId); + Path snapshotPath = snapshotPath(snapshotId); + return Snapshot.fromPath(fileIO, snapshotPath); } public Changelog changelog(long snapshotId) { @@ -127,11 +135,6 @@ public Changelog longLivedChangelog(long snapshotId) { return Changelog.fromPath(fileIO, longLivedChangelogPath(snapshotId)); } - public Snapshot snapshot(String branchName, long snapshotId) { - Path snapshotPath = snapshotPathByBranch(branchName, snapshotId); - return Snapshot.fromPath(fileIO, snapshotPath); - } - public boolean snapshotExists(long snapshotId) { Path path = snapshotPath(snapshotId); try { @@ -155,21 +158,13 @@ public boolean longLivedChangelogExists(long snapshotId) { } public @Nullable Snapshot latestSnapshot() { - return latestSnapshot(DEFAULT_MAIN_BRANCH); - } - - public @Nullable Snapshot latestSnapshot(String branchName) { - Long snapshotId = latestSnapshotId(branchName); - return snapshotId == null ? null : snapshot(branchName, snapshotId); + Long snapshotId = latestSnapshotId(); + return snapshotId == null ? null : snapshot(snapshotId); } public @Nullable Long latestSnapshotId() { - return latestSnapshotId(DEFAULT_MAIN_BRANCH); - } - - public @Nullable Long latestSnapshotId(String branchName) { try { - return findLatest(snapshotDirByBranch(branchName), SNAPSHOT_PREFIX, this::snapshotPath); + return findLatest(snapshotDirectory(), SNAPSHOT_PREFIX, this::snapshotPath); } catch (IOException e) { throw new RuntimeException("Failed to find latest snapshot id", e); } @@ -181,7 +176,11 @@ public boolean longLivedChangelogExists(long snapshotId) { } public @Nullable Long earliestSnapshotId() { - return earliestSnapshotId(DEFAULT_MAIN_BRANCH); + try { + return findEarliest(snapshotDirectory(), SNAPSHOT_PREFIX, this::snapshotPath); + } catch (IOException e) { + throw new RuntimeException("Failed to find earliest snapshot id", e); + } } public @Nullable Long earliestLongLivedChangelogId() { @@ -205,15 +204,6 @@ public boolean longLivedChangelogExists(long snapshotId) { return latestSnapshotId(); } - public @Nullable Long earliestSnapshotId(String branchName) { - try { - return findEarliest( - snapshotDirByBranch(branchName), SNAPSHOT_PREFIX, this::snapshotPath); - } catch (IOException e) { - throw new RuntimeException("Failed to find earliest snapshot id", e); - } - } - public @Nullable Long pickOrLatest(Predicate predicate) { Long latestId = latestSnapshotId(); Long earliestId = earliestSnapshotId(); @@ -255,7 +245,6 @@ private Snapshot changelogOrSnapshot(long snapshotId) { earliest = earliestSnapshot; } Long latest = latestSnapshotId(); - if (earliest == null || latest == null) { return null; } @@ -370,14 +359,14 @@ public long snapshotCount() throws IOException { public Iterator snapshots() throws IOException { return listVersionedFiles(fileIO, snapshotDirectory(), SNAPSHOT_PREFIX) - .map(this::snapshot) + .map(id -> snapshot(id)) .sorted(Comparator.comparingLong(Snapshot::id)) .iterator(); } public Iterator changelogs() throws IOException { return listVersionedFiles(fileIO, changelogDirectory(), CHANGELOG_PREFIX) - .map(this::changelog) + .map(snapshotId -> changelog(snapshotId)) .sorted(Comparator.comparingLong(Changelog::id)) .iterator(); } @@ -389,7 +378,7 @@ public Iterator changelogs() throws IOException { public List safelyGetAllSnapshots() throws IOException { List paths = listVersionedFiles(fileIO, snapshotDirectory(), SNAPSHOT_PREFIX) - .map(this::snapshotPath) + .map(id -> snapshotPath(id)) .collect(Collectors.toList()); List snapshots = new ArrayList<>(); @@ -406,7 +395,7 @@ public List safelyGetAllSnapshots() throws IOException { public List safelyGetAllChangelogs() throws IOException { List paths = listVersionedFiles(fileIO, changelogDirectory(), CHANGELOG_PREFIX) - .map(this::longLivedChangelogPath) + .map(id -> longLivedChangelogPath(id)) .collect(Collectors.toList()); List changelogs = new ArrayList<>(); @@ -582,7 +571,6 @@ public Snapshot traversalSnapshotsFromLatestSafely(Filter checker) { return snapshotId; } } - return findByListFiles(Math::max, dir, prefix); } @@ -602,7 +590,7 @@ public Snapshot traversalSnapshotsFromLatestSafely(Filter checker) { } public Long readHint(String fileName) { - return readHint(fileName, snapshotDirByBranch(DEFAULT_MAIN_BRANCH)); + return readHint(fileName, snapshotDirectory()); } public Long readHint(String fileName, Path dir) { @@ -629,11 +617,7 @@ private Long findByListFiles(BinaryOperator reducer, Path dir, String pref } public void commitLatestHint(long snapshotId) throws IOException { - commitLatestHint(snapshotId, DEFAULT_MAIN_BRANCH); - } - - public void commitLatestHint(long snapshotId, String branchName) throws IOException { - commitHint(snapshotId, LATEST, snapshotDirByBranch(branchName)); + commitHint(snapshotId, LATEST, snapshotDirectory()); } public void commitLongLivedChangelogLatestHint(long snapshotId) throws IOException { @@ -645,11 +629,7 @@ public void commitLongLivedChangelogEarliestHint(long snapshotId) throws IOExcep } public void commitEarliestHint(long snapshotId) throws IOException { - commitEarliestHint(snapshotId, DEFAULT_MAIN_BRANCH); - } - - public void commitEarliestHint(long snapshotId, String branchName) throws IOException { - commitHint(snapshotId, EARLIEST, snapshotDirByBranch(branchName)); + commitHint(snapshotId, EARLIEST, snapshotDirectory()); } private void commitHint(long snapshotId, String fileName, Path dir) throws IOException { diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java index 8b7818fed782..c96bbdd568a7 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java @@ -45,7 +45,9 @@ import java.util.function.Predicate; import java.util.stream.Collectors; +import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH; import static org.apache.paimon.utils.BranchManager.getBranchPath; +import static org.apache.paimon.utils.BranchManager.isMainBranch; import static org.apache.paimon.utils.FileUtils.listVersionedFileStatus; import static org.apache.paimon.utils.Preconditions.checkArgument; @@ -58,25 +60,35 @@ public class TagManager { private final FileIO fileIO; private final Path tablePath; + private final String branch; public TagManager(FileIO fileIO, Path tablePath) { + this(fileIO, tablePath, DEFAULT_MAIN_BRANCH); + } + + /** Specify the default branch for data writing. */ + public TagManager(FileIO fileIO, Path tablePath, String branch) { this.fileIO = fileIO; this.tablePath = tablePath; + this.branch = StringUtils.isBlank(branch) ? DEFAULT_MAIN_BRANCH : branch; } - /** Return the root Directory of tags. */ - public Path tagDirectory() { - return new Path(tablePath + "/tag"); + public TagManager copyWithBranch(String branchName) { + return new TagManager(fileIO, tablePath, branchName); } - /** Return the path of a tag. */ - public Path tagPath(String tagName) { - return new Path(tablePath + "/tag/" + TAG_PREFIX + tagName); + /** Return the root Directory of tags. */ + public Path tagDirectory() { + return isMainBranch(branch) + ? new Path(tablePath + "/tag") + : new Path(getBranchPath(tablePath, branch) + "/tag"); } /** Return the path of a tag in branch. */ - public Path branchTagPath(String branchName, String tagName) { - return new Path(getBranchPath(tablePath, branchName) + "/tag/" + TAG_PREFIX + tagName); + public Path tagPath(String tagName) { + return isMainBranch(branch) + ? new Path(tablePath + "/tag/" + TAG_PREFIX + tagName) + : new Path(getBranchPath(tablePath, branch) + "/tag/" + TAG_PREFIX + tagName); } /** Create a tag from given snapshot and save it in the storage. */ @@ -152,14 +164,13 @@ public void deleteTag( List taggedSnapshots; // skip file deletion if snapshot exists - if (snapshotManager.snapshotExists(taggedSnapshot.id())) { + if (snapshotManager.copyWithBranch(branch).snapshotExists(taggedSnapshot.id())) { deleteTagMetaFile(tagName, callbacks); return; } else { // FileIO discovers tags by tag file, so we should read all tags before we delete tag SortedMap> tags = tags(); deleteTagMetaFile(tagName, callbacks); - // skip data file clean if more than 1 tags are created based on this snapshot if (tags.get(taggedSnapshot).size() > 1) { return; @@ -196,7 +207,7 @@ private void doClean( skippedSnapshots.add(taggedSnapshots.get(index - 1)); } // the nearest right neighbor - Snapshot right = snapshotManager.earliestSnapshot(); + Snapshot right = snapshotManager.copyWithBranch(branch).earliestSnapshot(); if (index + 1 < taggedSnapshots.size()) { Snapshot rightTag = taggedSnapshots.get(index + 1); right = right.id() < rightTag.id() ? right : rightTag; @@ -226,7 +237,7 @@ private void doClean( taggedSnapshot, tagDeletion.manifestSkippingSet(skippedSnapshots)); } - /** Check if a tag exists. */ + /** Check if a branch tag exists. */ public boolean tagExists(String tagName) { Path path = tagPath(tagName); try { @@ -242,7 +253,6 @@ public boolean tagExists(String tagName) { /** Get the tagged snapshot by name. */ public Snapshot taggedSnapshot(String tagName) { checkArgument(tagExists(tagName), "Tag '%s' doesn't exist.", tagName); - // Trim to snapshot to avoid equals and compare snapshot. return Tag.fromPath(fileIO, tagPath(tagName)).trimToSnapshot(); } @@ -259,7 +269,7 @@ public List taggedSnapshots() { return new ArrayList<>(tags().keySet()); } - /** Get all tagged snapshots with tag names sorted by snapshot id. */ + /** Get all tagged snapshots with names sorted by snapshot id. */ public SortedMap> tags() { return tags(tagName -> true); } diff --git a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java index 62124a617085..b569a76a8a08 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java @@ -128,9 +128,11 @@ public void testBatchReadWrite() throws Exception { public void testBranchBatchReadWrite() throws Exception { FileStoreTable table = createFileStoreTable(); generateBranch(table); - writeBranchData(table); - List splits = toSplits(table.newSnapshotReader(BRANCH_NAME).read().dataSplits()); - TableRead read = table.newRead(); + + FileStoreTable tableBranch = createFileStoreTable(BRANCH_NAME); + writeBranchData(tableBranch); + List splits = toSplits(tableBranch.newSnapshotReader().read().dataSplits()); + TableRead read = tableBranch.newRead(); assertThat(getResult(read, splits, binaryRow(1), 0, BATCH_ROW_TO_STRING)) .hasSameElementsAs( Arrays.asList( @@ -308,15 +310,18 @@ public void testBatchSplitOrderByPartition() throws Exception { public void testBranchStreamingReadWrite() throws Exception { FileStoreTable table = createFileStoreTable(); generateBranch(table); - writeBranchData(table); + + FileStoreTable tableBranch = createFileStoreTable(BRANCH_NAME); + writeBranchData(tableBranch); List splits = toSplits( - table.newSnapshotReader(BRANCH_NAME) + tableBranch + .newSnapshotReader() .withMode(ScanMode.DELTA) .read() .dataSplits()); - TableRead read = table.newRead(); + TableRead read = tableBranch.newRead(); assertThat(getResult(read, splits, binaryRow(1), 0, STREAMING_ROW_TO_STRING)) .isEqualTo( @@ -804,7 +809,7 @@ private void writeData() throws Exception { private void writeBranchData(FileStoreTable table) throws Exception { StreamTableWrite write = table.newWrite(commitUser); - StreamTableCommit commit = table.newCommit(commitUser, BRANCH_NAME); + StreamTableCommit commit = table.newCommit(commitUser); write.write(rowData(1, 10, 100L)); write.write(rowData(2, 20, 200L)); @@ -845,6 +850,28 @@ protected FileStoreTable createFileStoreTable(Consumer configure) throw return new AppendOnlyFileStoreTable(FileIOFinder.find(tablePath), tablePath, tableSchema); } + @Override + protected FileStoreTable createFileStoreTable(String branch, Consumer configure) + throws Exception { + Options conf = new Options(); + conf.set(CoreOptions.PATH, tablePath.toString()); + conf.set(CoreOptions.BRANCH, branch); + configure.accept(conf); + if (!conf.contains(BUCKET_KEY) && conf.get(BUCKET) != -1) { + conf.set(BUCKET_KEY, "a"); + } + TableSchema tableSchema = + SchemaUtils.forceCommit( + new SchemaManager(LocalFileIO.create(), tablePath, branch), + new Schema( + ROW_TYPE.getFields(), + Collections.singletonList("pt"), + Collections.emptyList(), + conf.toMap(), + "")); + return new AppendOnlyFileStoreTable(FileIOFinder.find(tablePath), tablePath, tableSchema); + } + @Override protected FileStoreTable overwriteTestFileStoreTable() throws Exception { Options conf = new Options(); diff --git a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java index 778c871f7798..b30dab420708 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java @@ -1019,22 +1019,22 @@ public void testCreateBranch() throws Exception { // verify test-tag in test-branch is equal to snapshot 2 Snapshot branchTag = Snapshot.fromPath( - new TraceableFileIO(), tagManager.branchTagPath("test-branch", "test-tag")); + new TraceableFileIO(), + tagManager.copyWithBranch("test-branch").tagPath("test-tag")); assertThat(branchTag.equals(snapshot2)).isTrue(); // verify snapshot in test-branch is equal to snapshot 2 - SnapshotManager snapshotManager = new SnapshotManager(new TraceableFileIO(), tablePath); + SnapshotManager snapshotManager = + new SnapshotManager(new TraceableFileIO(), tablePath, "test-branch"); Snapshot branchSnapshot = - Snapshot.fromPath( - new TraceableFileIO(), - snapshotManager.branchSnapshotPath("test-branch", 2)); + Snapshot.fromPath(new TraceableFileIO(), snapshotManager.snapshotPath(2)); assertThat(branchSnapshot.equals(snapshot2)).isTrue(); // verify schema in test-branch is equal to schema 0 - SchemaManager schemaManager = new SchemaManager(new TraceableFileIO(), tablePath); + SchemaManager schemaManager = + new SchemaManager(new TraceableFileIO(), tablePath, "test-branch"); TableSchema branchSchema = - SchemaManager.fromPath( - new TraceableFileIO(), schemaManager.branchSchemaPath("test-branch", 0)); + SchemaManager.fromPath(new TraceableFileIO(), schemaManager.toSchemaPath(0)); TableSchema schema0 = schemaManager.schema(0); assertThat(branchSchema.equals(schema0)).isTrue(); } @@ -1312,9 +1312,10 @@ public void testBranchWriteAndRead() throws Exception { generateBranch(table); + FileStoreTable tableBranch = createFileStoreTable(BRANCH_NAME); // Write data to branch1 - try (StreamTableWrite write = table.newWrite(commitUser); - StreamTableCommit commit = table.newCommit(commitUser, BRANCH_NAME)) { + try (StreamTableWrite write = tableBranch.newWrite(commitUser); + StreamTableCommit commit = tableBranch.newCommit(commitUser)) { write.write(rowData(2, 20, 200L)); commit.commit(1, write.prepareCommit(false, 2)); } @@ -1330,16 +1331,16 @@ public void testBranchWriteAndRead() throws Exception { // Validate data in branch1 assertThat( getResult( - table.newRead(), - toSplits(table.newSnapshotReader(BRANCH_NAME).read().dataSplits()), + tableBranch.newRead(), + toSplits(tableBranch.newSnapshotReader().read().dataSplits()), BATCH_ROW_TO_STRING)) .containsExactlyInAnyOrder( "0|0|0|binary|varbinary|mapKey:mapVal|multiset", "2|20|200|binary|varbinary|mapKey:mapVal|multiset"); // Write two rows data to branch1 again - try (StreamTableWrite write = table.newWrite(commitUser); - StreamTableCommit commit = table.newCommit(commitUser, BRANCH_NAME)) { + try (StreamTableWrite write = tableBranch.newWrite(commitUser); + StreamTableCommit commit = tableBranch.newCommit(commitUser)) { write.write(rowData(3, 30, 300L)); write.write(rowData(4, 40, 400L)); commit.commit(2, write.prepareCommit(false, 3)); @@ -1356,8 +1357,8 @@ public void testBranchWriteAndRead() throws Exception { // Verify data in branch1 assertThat( getResult( - table.newRead(), - toSplits(table.newSnapshotReader(BRANCH_NAME).read().dataSplits()), + tableBranch.newRead(), + toSplits(tableBranch.newSnapshotReader().read().dataSplits()), BATCH_ROW_TO_STRING)) .containsExactlyInAnyOrder( "0|0|0|binary|varbinary|mapKey:mapVal|multiset", @@ -1446,6 +1447,14 @@ protected FileStoreTable createFileStoreTable(int numOfBucket) throws Exception return createFileStoreTable(conf -> conf.set(BUCKET, numOfBucket)); } + protected FileStoreTable createFileStoreTable(String branch, int numOfBucket) throws Exception { + return createFileStoreTable(branch, conf -> conf.set(BUCKET, numOfBucket)); + } + + protected FileStoreTable createFileStoreTable(String branch) throws Exception { + return createFileStoreTable(branch, 1); + } + protected FileStoreTable createFileStoreTable() throws Exception { return createFileStoreTable(1); } @@ -1453,6 +1462,9 @@ protected FileStoreTable createFileStoreTable() throws Exception { protected abstract FileStoreTable createFileStoreTable(Consumer configure) throws Exception; + protected abstract FileStoreTable createFileStoreTable( + String branch, Consumer configure) throws Exception; + protected abstract FileStoreTable overwriteTestFileStoreTable() throws Exception; private static InternalRow overwriteRow(Object... values) { @@ -1571,10 +1583,11 @@ protected void generateBranch(FileStoreTable table) throws Exception { assertThat(branchManager.branchExists(BRANCH_NAME)).isTrue(); // Verify branch1 and the main branch have the same data + FileStoreTable tableBranch = createFileStoreTable(BRANCH_NAME); assertThat( getResult( - table.newRead(), - toSplits(table.newSnapshotReader(BRANCH_NAME).read().dataSplits()), + tableBranch.newRead(), + toSplits(tableBranch.newSnapshotReader().read().dataSplits()), BATCH_ROW_TO_STRING)) .containsExactlyInAnyOrder("0|0|0|binary|varbinary|mapKey:mapVal|multiset"); } diff --git a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java index 4c33e9893cfb..e4a6b1d3f306 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java @@ -81,6 +81,7 @@ import java.util.function.Function; import java.util.stream.Collectors; +import static org.apache.paimon.CoreOptions.BRANCH; import static org.apache.paimon.CoreOptions.BUCKET; import static org.apache.paimon.CoreOptions.CHANGELOG_NUM_RETAINED_MAX; import static org.apache.paimon.CoreOptions.CHANGELOG_NUM_RETAINED_MIN; @@ -257,9 +258,10 @@ public void testBatchReadWrite() throws Exception { public void testBranchBatchReadWrite() throws Exception { FileStoreTable table = createFileStoreTable(); generateBranch(table); - writeBranchData(table); - List splits = toSplits(table.newSnapshotReader(BRANCH_NAME).read().dataSplits()); - TableRead read = table.newRead(); + FileStoreTable tableBranch = createFileStoreTable(BRANCH_NAME); + writeBranchData(tableBranch); + List splits = toSplits(tableBranch.newSnapshotReader().read().dataSplits()); + TableRead read = tableBranch.newRead(); assertThat(getResult(read, splits, binaryRow(1), 0, BATCH_ROW_TO_STRING)) .isEqualTo( Collections.singletonList( @@ -328,15 +330,18 @@ public void testStreamingReadWrite() throws Exception { public void testBranchStreamingReadWrite() throws Exception { FileStoreTable table = createFileStoreTable(); generateBranch(table); - writeBranchData(table); + + FileStoreTable tableBranch = createFileStoreTable(BRANCH_NAME); + writeBranchData(tableBranch); List splits = toSplits( - table.newSnapshotReader(BRANCH_NAME) + tableBranch + .newSnapshotReader() .withMode(ScanMode.DELTA) .read() .dataSplits()); - TableRead read = table.newRead(); + TableRead read = tableBranch.newRead(); assertThat(getResult(read, splits, binaryRow(1), 0, STREAMING_ROW_TO_STRING)) .isEqualTo( Collections.singletonList( @@ -648,7 +653,7 @@ private void writeData() throws Exception { private void writeBranchData(FileStoreTable table) throws Exception { StreamTableWrite write = table.newWrite(commitUser); - StreamTableCommit commit = table.newCommit(commitUser, BRANCH_NAME); + StreamTableCommit commit = table.newCommit(commitUser); write.write(rowData(1, 10, 100L)); write.write(rowData(2, 20, 200L)); @@ -1645,4 +1650,29 @@ private FileStoreTable createFileStoreTable(Consumer configure, RowType "")); return new PrimaryKeyFileStoreTable(FileIOFinder.find(tablePath), tablePath, tableSchema); } + + @Override + protected FileStoreTable createFileStoreTable(String branch, Consumer configure) + throws Exception { + return createFileStoreTable(branch, configure, ROW_TYPE); + } + + private FileStoreTable createFileStoreTable( + String branch, Consumer configure, RowType rowType) throws Exception { + Options options = new Options(); + options.set(CoreOptions.PATH, tablePath.toString()); + options.set(BUCKET, 1); + options.set(BRANCH, branch); + configure.accept(options); + TableSchema tableSchema = + SchemaUtils.forceCommit( + new SchemaManager(LocalFileIO.create(), tablePath), + new Schema( + rowType.getFields(), + Collections.singletonList("pt"), + Arrays.asList("pt", "a"), + options.toMap(), + "")); + return new PrimaryKeyFileStoreTable(FileIOFinder.find(tablePath), tablePath, tableSchema); + } } diff --git a/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTableTestBase.java b/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTableTestBase.java index a55bda9118a5..f5874bed7eac 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTableTestBase.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTableTestBase.java @@ -502,12 +502,6 @@ public Optional latest() { .orElseThrow(IllegalStateException::new))); } - @Override - public Optional latest(String branchName) { - // for compatibility test - return latest(); - } - @Override public List listAll() { return new ArrayList<>(tableSchemas.values()); diff --git a/paimon-core/src/test/java/org/apache/paimon/table/WritePreemptMemoryTest.java b/paimon-core/src/test/java/org/apache/paimon/table/WritePreemptMemoryTest.java index 548dc7205f43..ee74a9e8f213 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/WritePreemptMemoryTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/WritePreemptMemoryTest.java @@ -110,6 +110,31 @@ protected FileStoreTable createFileStoreTable(Consumer configure) throw return new PrimaryKeyFileStoreTable(FileIOFinder.find(tablePath), tablePath, schema); } + @Override + protected FileStoreTable createFileStoreTable(String branch, Consumer configure) + throws Exception { + Options options = new Options(); + options.set(CoreOptions.BUCKET, 1); + options.set(CoreOptions.PATH, tablePath.toString()); + // Run with minimal memory to ensure a more intense preempt + // Currently a writer needs at least one page + int pages = 10; + options.set(CoreOptions.WRITE_BUFFER_SIZE, new MemorySize(pages * 1024)); + options.set(CoreOptions.PAGE_SIZE, new MemorySize(1024)); + options.set(CoreOptions.BRANCH, branch); + configure.accept(options); + TableSchema schema = + SchemaUtils.forceCommit( + new SchemaManager(LocalFileIO.create(), tablePath, branch), + new Schema( + ROW_TYPE.getFields(), + Collections.singletonList("pt"), + Arrays.asList("pt", "a"), + options.toMap(), + "")); + return new PrimaryKeyFileStoreTable(FileIOFinder.find(tablePath), tablePath, schema); + } + @Override protected FileStoreTable overwriteTestFileStoreTable() throws Exception { Options conf = new Options(); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkTableFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkTableFactory.java index 4c9f8ff9b55d..1617b4cc3327 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkTableFactory.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkTableFactory.java @@ -58,9 +58,12 @@ private void createTableIfNeeded(Context context) { if (options.get(AUTO_CREATE)) { try { Path tablePath = CoreOptions.path(table.getOptions()); + String branch = CoreOptions.branch(table.getOptions()); SchemaManager schemaManager = new SchemaManager( - FileIO.get(tablePath, createCatalogContext(context)), tablePath); + FileIO.get(tablePath, createCatalogContext(context)), + tablePath, + branch); if (!schemaManager.latest().isPresent()) { schemaManager.createTable(FlinkCatalog.fromCatalogTable(table)); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java index cf9bd487a0ce..c3460f39cf0b 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java @@ -235,6 +235,7 @@ protected DataStreamSink doCommit(DataStream written, String com commitUser, createCommitterFactory(), createCommittableStateManager()); + if (options.get(SINK_AUTO_TAG_FOR_SAVEPOINT)) { committerOperator = new AutoTagForSavepointCommitterOperator<>( diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AppendOnlyTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AppendOnlyTableITCase.java index ad432d7cdb84..11f5b2313d62 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AppendOnlyTableITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AppendOnlyTableITCase.java @@ -259,6 +259,22 @@ public void testDynamicOptions() throws Exception { assertThat(iterator.collect(1)).containsExactlyInAnyOrder(Row.of(2)); } + @Test + public void testReadWriteBranch() throws Exception { + // create table + sql("CREATE TABLE T (id INT)"); + // insert data + batchSql("INSERT INTO T VALUES (1)"); + // create tag + paimonTable("T").createTag("tag1", 1); + // create branch + paimonTable("T").createBranch("branch1", "tag1"); + // insert data to branch + batchSql("INSERT INTO T/*+ OPTIONS('branch' = 'branch1') */ VALUES (2)"); + List rows = batchSql("select * from T /*+ OPTIONS('branch' = 'branch1') */"); + assertThat(rows).containsExactlyInAnyOrder(Row.of(2), Row.of(1)); + } + @Override protected List ddl() { return Arrays.asList( diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileStoreITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileStoreITCase.java index 980f3fb906dd..225c7f63f69f 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileStoreITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileStoreITCase.java @@ -33,6 +33,7 @@ import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.FileStoreTableFactory; import org.apache.paimon.utils.BlockingIterator; +import org.apache.paimon.utils.BranchManager; import org.apache.paimon.utils.FailingFileIO; import org.apache.flink.api.common.functions.MapFunction; @@ -62,6 +63,7 @@ import org.apache.flink.types.Row; import org.apache.flink.types.RowKind; import org.apache.flink.util.CloseableIterator; +import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.TestTemplate; import org.junit.jupiter.api.extension.ExtendWith; @@ -76,6 +78,7 @@ import java.util.stream.Collectors; import java.util.stream.Stream; +import static org.apache.paimon.CoreOptions.BRANCH; import static org.apache.paimon.CoreOptions.BUCKET; import static org.apache.paimon.CoreOptions.BUCKET_KEY; import static org.apache.paimon.CoreOptions.FILE_FORMAT; @@ -125,11 +128,18 @@ public class FileStoreITCase extends AbstractTestBase { private final StreamExecutionEnvironment env; + protected static String branch; + public FileStoreITCase(boolean isBatch) { this.isBatch = isBatch; this.env = isBatch ? buildBatchEnv() : buildStreamEnv(); } + @BeforeAll + public static void before() { + branch = BranchManager.DEFAULT_MAIN_BRANCH; + } + @Parameters(name = "isBatch-{0}") public static List getVarSeg() { return Arrays.asList(true, false); @@ -468,7 +478,7 @@ public static FileStoreTable buildFileStoreTable( ""); return retryArtificialException( () -> { - new SchemaManager(LocalFileIO.create(), tablePath).createTable(schema); + new SchemaManager(LocalFileIO.create(), tablePath, branch).createTable(schema); return FileStoreTableFactory.create(LocalFileIO.create(), options); }); } @@ -484,6 +494,7 @@ public static Options buildConfiguration(boolean noFail, String temporaryPath) { options.set(PATH, FailingFileIO.getFailingPath(failingName, temporaryPath)); } options.set(FILE_FORMAT, CoreOptions.FILE_FORMAT_AVRO); + options.set(BRANCH, branch); return options; } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileStoreWithBranchITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileStoreWithBranchITCase.java new file mode 100644 index 000000000000..8a2374b3c367 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileStoreWithBranchITCase.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink; + +import org.apache.paimon.flink.sink.FixedBucketSink; +import org.apache.paimon.flink.source.ContinuousFileStoreSource; +import org.apache.paimon.flink.source.StaticFileStoreSource; + +import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.extension.ExtendWith; + +/** + * ITCase for {@link StaticFileStoreSource}, {@link ContinuousFileStoreSource} and {@link + * FixedBucketSink}. + */ +@ExtendWith(ParameterizedTestExtension.class) +public class FileStoreWithBranchITCase extends FileStoreITCase { + public FileStoreWithBranchITCase(boolean isBatch) { + super(isBatch); + } + + @BeforeAll + public static void before() { + branch = "testBranch"; + } +} diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java index e495ad3da5d4..229517e763a7 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java @@ -168,6 +168,61 @@ public void testLookupChangelog() throws Exception { innerTestChangelogProducing(Collections.singletonList("'changelog-producer' = 'lookup'")); } + @Test + public void testTableReadWriteBranch() throws Exception { + TableEnvironment sEnv = + tableEnvironmentBuilder() + .streamingMode() + .checkpointIntervalMs(ThreadLocalRandom.current().nextInt(900) + 100) + .parallelism(1) + .build(); + + sEnv.executeSql(createCatalogSql("testCatalog", path + "/warehouse")); + sEnv.executeSql("USE CATALOG testCatalog"); + sEnv.executeSql( + "CREATE TABLE T2 ( k INT, v STRING, PRIMARY KEY (k) NOT ENFORCED ) " + + "WITH ( " + + "'bucket' = '2'" + + ")"); + + CloseableIterator it = sEnv.executeSql("SELECT * FROM T2").collect(); + + // insert data + sEnv.executeSql("INSERT INTO T2 VALUES (1, 'A')").await(); + // read initial data + List actual = new ArrayList<>(); + for (int i = 0; i < 1; i++) { + actual.add(it.next().toString()); + } + + assertThat(actual).containsExactlyInAnyOrder("+I[1, A]"); + + // create tag + sEnv.executeSql( + String.format("CALL sys.create_tag('%s.%s', 'tag2', 1, '5 d')", "default", "T2")); + // create branch + sEnv.executeSql( + String.format( + "CALL sys.create_branch('%s.%s', 'branch1', 'tag2')", "default", "T2")); + // alter table + sEnv.executeSql("ALTER TABLE T2 SET ('changelog-producer'='full-compaction')"); + + CloseableIterator branchIt = + sEnv.executeSql("select * from T2 /*+ OPTIONS('branch' = 'branch1') */").collect(); + // insert data to branch + sEnv.executeSql( + "INSERT INTO T2/*+ OPTIONS('branch' = 'branch1') */ VALUES (10, 'v10'),(11, 'v11'),(12, 'v12')") + .await(); + + // read initial data + List actualBranch = new ArrayList<>(); + for (int i = 0; i < 4; i++) { + actualBranch.add(branchIt.next().toString()); + } + assertThat(actualBranch) + .containsExactlyInAnyOrder("+I[1, A]", "+I[10, v10]", "+I[11, v11]", "+I[12, v12]"); + } + private void innerTestChangelogProducing(List options) throws Exception { TableEnvironment sEnv = tableEnvironmentBuilder()