Skip to content

Commit 33077b9

Browse files
committed
fixed
1 parent 8f22611 commit 33077b9

File tree

8 files changed

+191
-43
lines changed

8 files changed

+191
-43
lines changed

paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java

Lines changed: 27 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import org.apache.paimon.table.Table;
3939
import org.apache.paimon.table.sink.BatchWriteBuilder;
4040
import org.apache.paimon.table.system.SystemTableLoader;
41+
import org.apache.paimon.utils.BranchManager;
4142
import org.apache.paimon.utils.StringUtils;
4243

4344
import javax.annotation.Nullable;
@@ -166,9 +167,10 @@ public Map<String, String> loadDatabaseProperties(String name)
166167
protected abstract Map<String, String> loadDatabasePropertiesImpl(String name);
167168

168169
@Override
169-
public void dropPartition(Identifier identifier, Map<String, String> partitionSpec)
170+
public void dropPartition(
171+
Identifier identifier, Map<String, String> partitionSpec, String branch)
170172
throws TableNotExistException {
171-
Table table = getTable(identifier);
173+
Table table = getTable(identifier, branch);
172174
FileStoreTable fileStoreTable = (FileStoreTable) table;
173175
FileStoreCommit commit = fileStoreTable.store().newCommit(UUID.randomUUID().toString());
174176
commit.dropPartitions(
@@ -280,23 +282,32 @@ public void renameTable(Identifier fromTable, Identifier toTable, boolean ignore
280282
public void alterTable(
281283
Identifier identifier, List<SchemaChange> changes, boolean ignoreIfNotExists)
282284
throws TableNotExistException, ColumnAlreadyExistException, ColumnNotExistException {
285+
alterTableImpl(identifier, changes, BranchManager.DEFAULT_MAIN_BRANCH);
286+
}
287+
288+
protected abstract void alterTableImpl(
289+
Identifier identifier, List<SchemaChange> changes, String branch)
290+
throws TableNotExistException, ColumnAlreadyExistException, ColumnNotExistException;
291+
292+
@Override
293+
public void alterTable(
294+
Identifier identifier,
295+
List<SchemaChange> changes,
296+
boolean ignoreIfNotExists,
297+
String branch)
298+
throws TableNotExistException, ColumnAlreadyExistException, ColumnNotExistException {
283299
checkNotSystemTable(identifier, "alterTable");
284300
validateIdentifierNameCaseInsensitive(identifier);
285301
validateFieldNameCaseInsensitiveInSchemaChange(changes);
286-
287302
if (!tableExists(identifier)) {
288303
if (ignoreIfNotExists) {
289304
return;
290305
}
291306
throw new TableNotExistException(identifier);
292307
}
293-
294-
alterTableImpl(identifier, changes);
308+
alterTableImpl(identifier, changes, branch);
295309
}
296310

297-
protected abstract void alterTableImpl(Identifier identifier, List<SchemaChange> changes)
298-
throws TableNotExistException, ColumnAlreadyExistException, ColumnNotExistException;
299-
300311
@Nullable
301312
private LineageMetaFactory findAndCreateLineageMeta(Options options, ClassLoader classLoader) {
302313
return options.getOptional(LINEAGE_META)
@@ -308,7 +319,7 @@ private LineageMetaFactory findAndCreateLineageMeta(Options options, ClassLoader
308319
}
309320

310321
@Override
311-
public Table getTable(Identifier identifier) throws TableNotExistException {
322+
public Table getTable(Identifier identifier, String branch) throws TableNotExistException {
312323
if (isSystemDatabase(identifier.getDatabaseName())) {
313324
String tableName = identifier.getObjectName();
314325
Table table =
@@ -327,28 +338,29 @@ public Table getTable(Identifier identifier) throws TableNotExistException {
327338
String tableName = splits[0];
328339
String type = splits[1];
329340
FileStoreTable originTable =
330-
getDataTable(new Identifier(identifier.getDatabaseName(), tableName));
341+
getDataTable(new Identifier(identifier.getDatabaseName(), tableName), branch);
331342
Table table = SystemTableLoader.load(type, fileIO, originTable);
332343
if (table == null) {
333344
throw new TableNotExistException(identifier);
334345
}
335346
return table;
336347
} else {
337-
Table table = getDataTable(identifier);
348+
Table table = getDataTable(identifier, branch);
338349
return table;
339350
}
340351
}
341352

342-
private FileStoreTable getDataTable(Identifier identifier) throws TableNotExistException {
343-
TableSchema tableSchema = getDataTableSchema(identifier);
353+
private FileStoreTable getDataTable(Identifier identifier, String branch)
354+
throws TableNotExistException {
355+
TableSchema tableSchema = getDataTableSchema(identifier, branch);
344356
return FileStoreTableFactory.create(
345357
fileIO,
346358
getDataTableLocation(identifier),
347359
tableSchema,
348360
new CatalogEnvironment(
349361
Lock.factory(
350362
lockFactory().orElse(null), lockContext().orElse(null), identifier),
351-
metastoreClientFactory(identifier).orElse(null),
363+
metastoreClientFactory(identifier, branch).orElse(null),
352364
lineageMetaFactory));
353365
}
354366

@@ -379,7 +391,7 @@ public Map<String, Map<String, Path>> allTablePaths() {
379391
}
380392
}
381393

382-
protected abstract TableSchema getDataTableSchema(Identifier identifier)
394+
protected abstract TableSchema getDataTableSchema(Identifier identifier, String branch)
383395
throws TableNotExistException;
384396

385397
@VisibleForTesting

paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java

Lines changed: 59 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.paimon.schema.Schema;
2525
import org.apache.paimon.schema.SchemaChange;
2626
import org.apache.paimon.table.Table;
27+
import org.apache.paimon.utils.BranchManager;
2728

2829
import java.io.Serializable;
2930
import java.util.Collections;
@@ -66,7 +67,8 @@ default Optional<CatalogLockContext> lockContext() {
6667
}
6768

6869
/** Get metastore client factory for the table specified by {@code identifier}. */
69-
default Optional<MetastoreClient.Factory> metastoreClientFactory(Identifier identifier) {
70+
default Optional<MetastoreClient.Factory> metastoreClientFactory(
71+
Identifier identifier, String branch) {
7072
return Optional.empty();
7173
}
7274

@@ -140,7 +142,20 @@ void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade)
140142
* @return The requested table
141143
* @throws TableNotExistException if the target does not exist
142144
*/
143-
Table getTable(Identifier identifier) throws TableNotExistException;
145+
default Table getTable(Identifier identifier) throws TableNotExistException {
146+
return getTable(identifier, BranchManager.DEFAULT_MAIN_BRANCH);
147+
}
148+
149+
/**
150+
* Return a {@link Table} identified by the given {@link Identifier}.
151+
*
152+
* <p>System tables can be got by '$' splitter.
153+
*
154+
* @param identifier Path of the table
155+
* @return The requested table
156+
* @throws TableNotExistException if the target does not exist
157+
*/
158+
Table getTable(Identifier identifier, String branch) throws TableNotExistException;
144159

145160
/**
146161
* Get names of all tables under this database. An empty list is returned if none exists.
@@ -226,6 +241,27 @@ void renameTable(Identifier fromTable, Identifier toTable, boolean ignoreIfNotEx
226241
void alterTable(Identifier identifier, List<SchemaChange> changes, boolean ignoreIfNotExists)
227242
throws TableNotExistException, ColumnAlreadyExistException, ColumnNotExistException;
228243

244+
/**
245+
* Modify an existing table from {@link SchemaChange}s.
246+
*
247+
* <p>NOTE: System tables can not be altered.
248+
*
249+
* @param identifier path of the table to be modified
250+
* @param changes the schema changes
251+
* @param ignoreIfNotExists flag to specify behavior when the table does not exist: if set to
252+
* false, throw an exception, if set to true, do nothing.
253+
* @param branch specify branch
254+
* @throws TableNotExistException if the table does not exist
255+
* @throws ColumnAlreadyExistException
256+
* @throws ColumnNotExistException
257+
*/
258+
void alterTable(
259+
Identifier identifier,
260+
List<SchemaChange> changes,
261+
boolean ignoreIfNotExists,
262+
String branch)
263+
throws TableNotExistException, ColumnAlreadyExistException, ColumnNotExistException;
264+
229265
/**
230266
* Drop the partition of the specify table.
231267
*
@@ -234,7 +270,7 @@ void alterTable(Identifier identifier, List<SchemaChange> changes, boolean ignor
234270
* @throws TableNotExistException if the table does not exist
235271
* @throws PartitionNotExistException if the partition does not exist
236272
*/
237-
void dropPartition(Identifier identifier, Map<String, String> partitions)
273+
void dropPartition(Identifier identifier, Map<String, String> partitions, String branch)
238274
throws TableNotExistException, PartitionNotExistException;
239275

240276
/**
@@ -253,6 +289,26 @@ default void alterTable(Identifier identifier, SchemaChange change, boolean igno
253289
alterTable(identifier, Collections.singletonList(change), ignoreIfNotExists);
254290
}
255291

292+
/**
293+
* Modify an existing table from {@link SchemaChange}.
294+
*
295+
* <p>NOTE: System tables can not be altered.
296+
*
297+
* @param identifier path of the table to be modified
298+
* @param changes the schema change
299+
* @param ignoreIfNotExists flag to specify behavior when the table does not exist: if set to
300+
* false, throw an exception, if set to true, do nothing.
301+
* @param branch specify branch
302+
* @throws TableNotExistException if the table does not exist
303+
* @throws ColumnAlreadyExistException
304+
* @throws ColumnNotExistException
305+
*/
306+
default void alterTable(
307+
Identifier identifier, SchemaChange change, boolean ignoreIfNotExists, String branch)
308+
throws TableNotExistException, ColumnAlreadyExistException, ColumnNotExistException {
309+
alterTable(identifier, Collections.singletonList(change), ignoreIfNotExists, branch);
310+
}
311+
256312
/** Return a boolean that indicates whether this catalog is case-sensitive. */
257313
default boolean caseSensitive() {
258314
return true;

paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.apache.paimon.schema.SchemaChange;
2727
import org.apache.paimon.schema.SchemaManager;
2828
import org.apache.paimon.schema.TableSchema;
29+
import org.apache.paimon.utils.BranchManager;
2930

3031
import org.slf4j.Logger;
3132
import org.slf4j.LoggerFactory;
@@ -121,6 +122,10 @@ public void createTableImpl(Identifier identifier, Schema schema) {
121122
}
122123

123124
private SchemaManager schemaManager(Identifier identifier) {
125+
return schemaManager(identifier, BranchManager.DEFAULT_MAIN_BRANCH);
126+
}
127+
128+
private SchemaManager schemaManager(Identifier identifier, String branch) {
124129
Path path = getDataTableLocation(identifier);
125130
CatalogLock catalogLock =
126131
lockFactory()
@@ -133,7 +138,7 @@ private SchemaManager schemaManager(Identifier identifier) {
133138
new RuntimeException(
134139
"No lock context when lock is enabled."))))
135140
.orElse(null);
136-
return new SchemaManager(fileIO, path)
141+
return new SchemaManager(fileIO, path, branch)
137142
.withLock(catalogLock == null ? null : Lock.fromCatalog(catalogLock, identifier));
138143
}
139144

@@ -145,9 +150,9 @@ public void renameTableImpl(Identifier fromTable, Identifier toTable) {
145150
}
146151

147152
@Override
148-
protected void alterTableImpl(Identifier identifier, List<SchemaChange> changes)
153+
protected void alterTableImpl(Identifier identifier, List<SchemaChange> changes, String branch)
149154
throws TableNotExistException, ColumnAlreadyExistException, ColumnNotExistException {
150-
schemaManager(identifier).commitChanges(changes);
155+
schemaManager(identifier, branch).commitChanges(changes);
151156
}
152157

153158
protected static <T> T uncheck(Callable<T> callable) {

paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.apache.paimon.schema.SchemaChange;
3333
import org.apache.paimon.schema.SchemaManager;
3434
import org.apache.paimon.schema.TableSchema;
35+
import org.apache.paimon.utils.BranchManager;
3536
import org.apache.paimon.utils.Preconditions;
3637

3738
import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;
@@ -248,7 +249,7 @@ protected void dropTableImpl(Identifier identifier) {
248249
protected void createTableImpl(Identifier identifier, Schema schema) {
249250
try {
250251
// create table file
251-
getSchemaManager(identifier).createTable(schema);
252+
schemaManager(identifier).createTable(schema);
252253
// Update schema metadata
253254
Path path = getDataTableLocation(identifier);
254255
int insertRecord =
@@ -308,22 +309,23 @@ protected void renameTableImpl(Identifier fromTable, Identifier toTable) {
308309
}
309310

310311
@Override
311-
protected void alterTableImpl(Identifier identifier, List<SchemaChange> changes)
312+
protected void alterTableImpl(Identifier identifier, List<SchemaChange> changes, String branch)
312313
throws TableNotExistException, ColumnAlreadyExistException, ColumnNotExistException {
313314
if (!tableExists(identifier)) {
314315
throw new RuntimeException("Table is not exists " + identifier.getFullName());
315316
}
316-
SchemaManager schemaManager = getSchemaManager(identifier);
317+
SchemaManager schemaManager = schemaManager(identifier, branch);
317318
schemaManager.commitChanges(changes);
318319
}
319320

320321
@Override
321-
protected TableSchema getDataTableSchema(Identifier identifier) throws TableNotExistException {
322+
protected TableSchema getDataTableSchema(Identifier identifier, String branch)
323+
throws TableNotExistException {
322324
if (!tableExists(identifier)) {
323325
throw new TableNotExistException(identifier);
324326
}
325327
Path tableLocation = getDataTableLocation(identifier);
326-
return new SchemaManager(fileIO, tableLocation)
328+
return new SchemaManager(fileIO, tableLocation, branch)
327329
.latest()
328330
.orElseThrow(
329331
() -> new RuntimeException("There is no paimon table in " + tableLocation));
@@ -373,8 +375,12 @@ public void close() throws Exception {
373375
}
374376
}
375377

376-
private SchemaManager getSchemaManager(Identifier identifier) {
377-
return new SchemaManager(fileIO, getDataTableLocation(identifier))
378+
private SchemaManager schemaManager(Identifier identifier) {
379+
return schemaManager(identifier, BranchManager.DEFAULT_MAIN_BRANCH);
380+
}
381+
382+
private SchemaManager schemaManager(Identifier identifier, String branch) {
383+
return new SchemaManager(fileIO, getDataTableLocation(identifier), branch)
378384
.withLock(lock(identifier));
379385
}
380386

paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedCatalog.java

Lines changed: 51 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -93,8 +93,9 @@ public Optional<CatalogLockContext> lockContext() {
9393
}
9494

9595
@Override
96-
public Optional<MetastoreClient.Factory> metastoreClientFactory(Identifier identifier) {
97-
return wrapped.metastoreClientFactory(identifier);
96+
public Optional<MetastoreClient.Factory> metastoreClientFactory(
97+
Identifier identifier, String branch) {
98+
return wrapped.metastoreClientFactory(identifier, branch);
9899
}
99100

100101
@Override
@@ -189,6 +190,31 @@ public void alterTable(
189190
wrapped.alterTable(identifier, changes, ignoreIfNotExists);
190191
}
191192

193+
/**
194+
* Modify an existing table from {@link SchemaChange}s.
195+
*
196+
* <p>NOTE: System tables can not be altered.
197+
*
198+
* @param identifier path of the table to be modified
199+
* @param changes the schema changes
200+
* @param ignoreIfNotExists flag to specify behavior when the table does not exist: if set to
201+
* false, throw an exception, if set to true, do nothing.
202+
* @param branch specify branch
203+
* @throws TableNotExistException if the table does not exist
204+
* @throws ColumnAlreadyExistException
205+
* @throws ColumnNotExistException
206+
*/
207+
@Override
208+
public void alterTable(
209+
Identifier identifier,
210+
List<SchemaChange> changes,
211+
boolean ignoreIfNotExists,
212+
String branch)
213+
throws TableNotExistException, ColumnAlreadyExistException, ColumnNotExistException {
214+
privilegeManager.getPrivilegeChecker().assertCanAlterTable(identifier);
215+
wrapped.alterTable(identifier, changes, ignoreIfNotExists, branch);
216+
}
217+
192218
@Override
193219
public Table getTable(Identifier identifier) throws TableNotExistException {
194220
Table table = wrapped.getTable(identifier);
@@ -200,11 +226,32 @@ public Table getTable(Identifier identifier) throws TableNotExistException {
200226
}
201227
}
202228

229+
/**
230+
* Return a {@link Table} identified by the given {@link Identifier}.
231+
*
232+
* <p>System tables can be got by '$' splitter.
233+
*
234+
* @param identifier Path of the table
235+
* @param branch
236+
* @return The requested table
237+
* @throws TableNotExistException if the target does not exist
238+
*/
239+
@Override
240+
public Table getTable(Identifier identifier, String branch) throws TableNotExistException {
241+
Table table = wrapped.getTable(identifier, branch);
242+
if (table instanceof FileStoreTable) {
243+
return new PrivilegedFileStoreTable(
244+
(FileStoreTable) table, privilegeManager.getPrivilegeChecker(), identifier);
245+
} else {
246+
return table;
247+
}
248+
}
249+
203250
@Override
204-
public void dropPartition(Identifier identifier, Map<String, String> partitions)
251+
public void dropPartition(Identifier identifier, Map<String, String> partitions, String branch)
205252
throws TableNotExistException, PartitionNotExistException {
206253
privilegeManager.getPrivilegeChecker().assertCanInsert(identifier);
207-
wrapped.dropPartition(identifier, partitions);
254+
wrapped.dropPartition(identifier, partitions, branch);
208255
}
209256

210257
@Override

0 commit comments

Comments
 (0)