From 6d4123004fd9e4a57b47f1db4b1db2bf514cded0 Mon Sep 17 00:00:00 2001 From: klion26 Date: Thu, 6 Jun 2024 11:26:16 +0800 Subject: [PATCH] [AMORO-2893] Optimizing the efficiency for restapi retrieve optimizing tables Befor the change we'll retrieve all tables from db and sort in the memory, this will be slow if there are many entriesin the db, after this change we sort in the db and only retrieve the returned entries. --- .../server/DefaultOptimizingService.java | 11 ++- .../controller/OptimizerController.java | 41 +++-------- .../dashboard/controller/TableController.java | 6 +- .../server/optimizing/OptimizingQueue.java | 36 +++++----- .../TableRuntimeMeta.java | 19 +---- .../persistence/mapper/TableMetaMapper.java | 69 ++++++++++++++++++- .../server/table/DefaultTableService.java | 52 ++++++++------ .../server/table/RuntimeHandlerChain.java | 13 ++-- .../amoro/server/table/TableManager.java | 6 +- .../amoro/server/table/TableRuntime.java | 53 +++++++++++++- .../amoro/server/table/TableService.java | 10 +++ .../table/executor/BaseTableExecutor.java | 9 ++- .../server/RestCatalogServiceTestBase.java | 2 +- .../server/TestDefaultOptimizingService.java | 11 +-- .../optimizing/TestOptimizingQueue.java | 53 +++++++------- .../server/table/TestTableRuntimeHandler.java | 11 +-- .../server/table/TestTableRuntimeManager.java | 8 +-- .../amoro/server/table/TestTableService.java | 2 +- 18 files changed, 256 insertions(+), 156 deletions(-) rename amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/{table => persistence}/TableRuntimeMeta.java (94%) diff --git a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java index 10e6ecea98..00bd1d9b7b 100644 --- a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java +++ b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java @@ -49,7 +49,6 @@ import org.apache.amoro.server.table.DefaultTableService; import org.apache.amoro.server.table.RuntimeHandlerChain; import org.apache.amoro.server.table.TableRuntime; -import org.apache.amoro.server.table.TableRuntimeMeta; import org.apache.amoro.server.table.TableService; import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions; import org.apache.amoro.shade.guava32.com.google.common.collect.ImmutableList; @@ -121,17 +120,17 @@ public RuntimeHandlerChain getTableRuntimeHandler() { return tableHandlerChain; } - private void loadOptimizingQueues(List tableRuntimeMetaList) { + private void loadOptimizingQueues(List tableRuntimeMetaList) { List optimizerGroups = getAs(ResourceMapper.class, ResourceMapper::selectResourceGroups); List optimizers = getAs(OptimizerMapper.class, OptimizerMapper::selectAll); - Map> groupToTableRuntimes = + Map> groupToTableRuntimes = tableRuntimeMetaList.stream() - .collect(Collectors.groupingBy(TableRuntimeMeta::getOptimizerGroup)); + .collect(Collectors.groupingBy(TableRuntime::getOptimizerGroup)); optimizerGroups.forEach( group -> { String groupName = group.getName(); - List tableRuntimeMetas = groupToTableRuntimes.remove(groupName); + List tableRuntimeMetas = groupToTableRuntimes.remove(groupName); OptimizingQueue optimizingQueue = new OptimizingQueue( tableService, @@ -455,7 +454,7 @@ public void handleTableRemoved(TableRuntime tableRuntime) { } @Override - protected void initHandler(List tableRuntimeMetaList) { + protected void initHandler(List tableRuntimeMetaList) { LOG.info("OptimizerManagementService begin initializing"); loadOptimizingQueues(tableRuntimeMetaList); optimizerKeeper.start(); diff --git a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/controller/OptimizerController.java b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/controller/OptimizerController.java index aef413d694..3a02ac8482 100644 --- a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/controller/OptimizerController.java +++ b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/controller/OptimizerController.java @@ -19,7 +19,6 @@ package org.apache.amoro.server.dashboard.controller; import io.javalin.http.Context; -import org.apache.amoro.api.ServerTableIdentifier; import org.apache.amoro.api.resource.Resource; import org.apache.amoro.api.resource.ResourceGroup; import org.apache.amoro.api.resource.ResourceType; @@ -30,13 +29,13 @@ import org.apache.amoro.server.dashboard.response.OkResponse; import org.apache.amoro.server.dashboard.response.PageResult; import org.apache.amoro.server.dashboard.utils.OptimizingUtil; +import org.apache.amoro.server.persistence.TableRuntimeMeta; import org.apache.amoro.server.resource.ContainerMetadata; import org.apache.amoro.server.resource.OptimizerInstance; import org.apache.amoro.server.resource.ResourceContainers; import org.apache.amoro.server.table.TableRuntime; import org.apache.amoro.server.table.TableService; import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions; -import org.apache.commons.lang3.StringUtils; import javax.ws.rs.BadRequestException; @@ -67,34 +66,16 @@ public void getOptimizerTables(Context ctx) { Integer pageSize = ctx.queryParamAsClass("pageSize", Integer.class).getOrDefault(20); int offset = (page - 1) * pageSize; - List tableRuntimes = new ArrayList<>(); - List tables = tableService.listManagedTables(); - for (ServerTableIdentifier identifier : tables) { - TableRuntime tableRuntime = tableService.getRuntime(identifier); - if (tableRuntime == null) { - continue; - } - if ((ALL_GROUP.equals(optimizerGroup) - || tableRuntime.getOptimizerGroup().equals(optimizerGroup)) - && (StringUtils.isEmpty(dbFilterStr) - || StringUtils.containsIgnoreCase(identifier.getDatabase(), dbFilterStr)) - && (StringUtils.isEmpty(tableFilterStr) - || StringUtils.containsIgnoreCase(identifier.getTableName(), tableFilterStr))) { - tableRuntimes.add(tableRuntime); - } - } - tableRuntimes.sort( - (o1, o2) -> { - // first we compare the status , and then we compare the start time when status are equal; - int statDiff = o1.getOptimizingStatus().compareTo(o2.getOptimizingStatus()); - // status order is asc, startTime order is desc - if (statDiff == 0) { - long timeDiff = o1.getCurrentStatusStartTime() - o2.getCurrentStatusStartTime(); - return timeDiff >= 0 ? (timeDiff == 0 ? 0 : -1) : 1; - } else { - return statDiff; - } - }); + String optimizerGroupUsedInDbFilter = "all".equals(optimizerGroup) ? null : optimizerGroup; + // get all info from underlying table table_runtime + List tableRuntimeBeans = + tableService.getTableRuntimes(optimizerGroupUsedInDbFilter, pageSize, offset); + + List tableRuntimes = + tableRuntimeBeans.stream() + .map(meta -> tableService.getRuntime(meta.getTableId())) + .collect(Collectors.toList()); + PageResult amsPageResult = PageResult.of(tableRuntimes, offset, pageSize, OptimizingUtil::buildTableOptimizeInfo); ctx.json(OkResponse.of(amsPageResult)); diff --git a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/controller/TableController.java b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/controller/TableController.java index 8d6884f47f..dfcea2982b 100644 --- a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/controller/TableController.java +++ b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/controller/TableController.java @@ -148,7 +148,7 @@ public void getTableDetail(Context ctx) { if (serverTableIdentifier.isPresent()) { tableSummary.put( "optimizingStatus", - tableService.getRuntime(serverTableIdentifier.get()).getOptimizingStatus()); + tableService.getRuntime(serverTableIdentifier.get().getId()).getOptimizingStatus()); } else { tableSummary.put("optimizingStatus", OptimizingStatus.IDLE); } @@ -650,7 +650,9 @@ public void cancelOptimizingProcess(Context ctx) { tableService.getServerTableIdentifier( TableIdentifier.of(catalog, db, table).buildTableIdentifier()); TableRuntime tableRuntime = - serverTableIdentifier != null ? tableService.getRuntime(serverTableIdentifier) : null; + serverTableIdentifier != null + ? tableService.getRuntime(serverTableIdentifier.getId()) + : null; Preconditions.checkArgument( tableRuntime != null diff --git a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java index 4752ca62ca..e0cf814abd 100644 --- a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java +++ b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java @@ -36,7 +36,6 @@ import org.apache.amoro.server.resource.QuotaProvider; import org.apache.amoro.server.table.TableManager; import org.apache.amoro.server.table.TableRuntime; -import org.apache.amoro.server.table.TableRuntimeMeta; import org.apache.amoro.shade.guava32.com.google.common.annotations.VisibleForTesting; import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions; import org.apache.amoro.shade.guava32.com.google.common.collect.Lists; @@ -93,7 +92,7 @@ public OptimizingQueue( ResourceGroup optimizerGroup, QuotaProvider quotaProvider, Executor planExecutor, - List tableRuntimeMetaList, + List tableRuntimeMetaList, int maxPlanningParallelism) { Preconditions.checkNotNull(optimizerGroup, "Optimizer group can not be null"); this.planExecutor = planExecutor; @@ -109,11 +108,10 @@ public OptimizingQueue( tableRuntimeMetaList.forEach(this::initTableRuntime); } - private void initTableRuntime(TableRuntimeMeta tableRuntimeMeta) { - TableRuntime tableRuntime = tableRuntimeMeta.getTableRuntime(); + private void initTableRuntime(TableRuntime tableRuntime) { if (tableRuntime.getOptimizingStatus().isProcessing() - && tableRuntimeMeta.getOptimizingProcessId() != 0) { - tableRuntime.recover(new TableOptimizingProcess(tableRuntimeMeta)); + && tableRuntime.getOptimizingProcess().getProcessId() != 0) { + tableRuntime.recover(new TableOptimizingProcess(tableRuntime)); } if (tableRuntime.isOptimizingEnabled()) { @@ -122,7 +120,7 @@ private void initTableRuntime(TableRuntimeMeta tableRuntimeMeta) { if (!tableRuntime.getOptimizingStatus().isProcessing()) { scheduler.addTable(tableRuntime); } else if (tableRuntime.getOptimizingStatus() != OptimizingStatus.COMMITTING) { - tableQueue.offer(new TableOptimizingProcess(tableRuntimeMeta)); + tableQueue.offer(new TableOptimizingProcess(tableRuntime)); } } else { OptimizingProcess process = tableRuntime.getOptimizingProcess(); @@ -376,21 +374,21 @@ public TableOptimizingProcess(OptimizingPlanner planner) { beginAndPersistProcess(); } - public TableOptimizingProcess(TableRuntimeMeta tableRuntimeMeta) { - processId = tableRuntimeMeta.getOptimizingProcessId(); - tableRuntime = tableRuntimeMeta.getTableRuntime(); - optimizingType = tableRuntimeMeta.getOptimizingType(); - targetSnapshotId = tableRuntimeMeta.getTargetSnapshotId(); - targetChangeSnapshotId = tableRuntimeMeta.getTargetChangeSnapshotId(); - planTime = tableRuntimeMeta.getPlanTime(); - if (tableRuntimeMeta.getFromSequence() != null) { - fromSequence = tableRuntimeMeta.getFromSequence(); + public TableOptimizingProcess(TableRuntime tableRuntime) { + processId = tableRuntime.getOptimizingProcess().getProcessId(); + this.tableRuntime = tableRuntime; + optimizingType = tableRuntime.getOptimizingProcess().getOptimizingType(); + targetSnapshotId = tableRuntime.getTargetSnapshotId(); + targetChangeSnapshotId = tableRuntime.getTargetChangeSnapshotId(); + planTime = tableRuntime.getLastPlanTime(); + if (tableRuntime.getFromSequence() != null) { + fromSequence = tableRuntime.getFromSequence(); } - if (tableRuntimeMeta.getToSequence() != null) { - toSequence = tableRuntimeMeta.getToSequence(); + if (tableRuntime.getToSequence() != null) { + toSequence = tableRuntime.getToSequence(); } if (this.status != OptimizingProcess.Status.CLOSED) { - tableRuntimeMeta.getTableRuntime().recover(this); + tableRuntime.recover(this); } loadTaskRuntimes(this); } diff --git a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/TableRuntimeMeta.java b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/persistence/TableRuntimeMeta.java similarity index 94% rename from amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/TableRuntimeMeta.java rename to amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/persistence/TableRuntimeMeta.java index 92ca2e6a36..51d7f0fda1 100644 --- a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/TableRuntimeMeta.java +++ b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/persistence/TableRuntimeMeta.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.amoro.server.table; +package org.apache.amoro.server.persistence; import org.apache.amoro.TableFormat; import org.apache.amoro.api.config.TableConfiguration; @@ -27,6 +27,7 @@ import java.util.Map; +/** The class for table used when transfer data from/to database. */ public class TableRuntimeMeta { private long tableId; private String catalogName; @@ -57,24 +58,8 @@ public class TableRuntimeMeta { private Map fromSequence; private Map toSequence; - private TableRuntime tableRuntime; - public TableRuntimeMeta() {} - public TableRuntime constructTableRuntime(TableManager initializer) { - if (tableRuntime == null) { - tableRuntime = new TableRuntime(this, initializer); - } - return tableRuntime; - } - - public TableRuntime getTableRuntime() { - if (tableRuntime == null) { - throw new IllegalStateException("TableRuntime is not constructed yet."); - } - return tableRuntime; - } - public long getTargetSnapshotId() { return targetSnapshotId; } diff --git a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/persistence/mapper/TableMetaMapper.java b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/persistence/mapper/TableMetaMapper.java index b01e4ff4c4..28c7264de4 100644 --- a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/persistence/mapper/TableMetaMapper.java +++ b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/persistence/mapper/TableMetaMapper.java @@ -19,13 +19,13 @@ package org.apache.amoro.server.persistence.mapper; import org.apache.amoro.api.ServerTableIdentifier; +import org.apache.amoro.server.persistence.TableRuntimeMeta; import org.apache.amoro.server.persistence.converter.JsonObjectConverter; import org.apache.amoro.server.persistence.converter.Long2TsConverter; import org.apache.amoro.server.persistence.converter.Map2StringConverter; import org.apache.amoro.server.persistence.converter.MapLong2StringConverter; import org.apache.amoro.server.table.TableMetadata; import org.apache.amoro.server.table.TableRuntime; -import org.apache.amoro.server.table.TableRuntimeMeta; import org.apache.ibatis.annotations.Delete; import org.apache.ibatis.annotations.Insert; import org.apache.ibatis.annotations.Options; @@ -424,4 +424,71 @@ List selectTableIdentifiersByCatalog( typeHandler = MapLong2StringConverter.class) }) List selectTableRuntimeMetas(); + + @Select( + "") + @Results({ + @Result(property = "tableId", column = "table_id"), + @Result(property = "catalogName", column = "catalog_name"), + @Result(property = "dbName", column = "db_name"), + @Result(property = "tableName", column = "table_name"), + @Result(property = "currentSnapshotId", column = "current_snapshot_id"), + @Result(property = "currentChangeSnapshotId", column = "current_change_snapshotId"), + @Result(property = "lastOptimizedSnapshotId", column = "last_optimized_snapshotId"), + @Result( + property = "lastOptimizedChangeSnapshotId", + column = "last_optimized_change_snapshotId"), + @Result( + property = "lastMajorOptimizingTime", + column = "last_major_optimizing_time", + typeHandler = Long2TsConverter.class), + @Result( + property = "lastMinorOptimizingTime", + column = "last_minor_optimizing_time", + typeHandler = Long2TsConverter.class), + @Result( + property = "lastFullOptimizingTime", + column = "last_full_optimizing_time", + typeHandler = Long2TsConverter.class), + @Result(property = "tableStatus", column = "optimizing_status"), + @Result( + property = "currentStatusStartTime", + column = "optimizing_status_start_time", + typeHandler = Long2TsConverter.class), + @Result(property = "optimizingProcessId", column = "optimizing_process_id"), + @Result(property = "optimizerGroup", column = "optimizer_group"), + @Result( + property = "pendingInput", + column = "pending_input", + typeHandler = JsonObjectConverter.class), + @Result( + property = "tableConfig", + column = "table_config", + typeHandler = JsonObjectConverter.class), + }) + List selectTableRuntimesForOptimizerGroup( + @Param("optimizerGroup") String optimizerGroup, + @Param("limitCount") int limitCount, + @Param("offsetNum") int offset); } diff --git a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/DefaultTableService.java b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/DefaultTableService.java index b3266b9d7c..82757f99d1 100644 --- a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/DefaultTableService.java +++ b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/DefaultTableService.java @@ -40,6 +40,7 @@ import org.apache.amoro.server.manager.MetricManager; import org.apache.amoro.server.optimizing.OptimizingStatus; import org.apache.amoro.server.persistence.StatedPersistentBase; +import org.apache.amoro.server.persistence.TableRuntimeMeta; import org.apache.amoro.server.persistence.mapper.CatalogMetaMapper; import org.apache.amoro.server.persistence.mapper.TableMetaMapper; import org.apache.amoro.server.table.blocker.TableBlocker; @@ -55,6 +56,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -80,8 +82,7 @@ public class DefaultTableService extends StatedPersistentBase implements TableSe private final Map internalCatalogMap = new ConcurrentHashMap<>(); private final Map externalCatalogMap = new ConcurrentHashMap<>(); - private final Map tableRuntimeMap = - new ConcurrentHashMap<>(); + private final Map tableRuntimeMap = new ConcurrentHashMap<>(); private final ScheduledExecutorService tableExplorerScheduler = Executors.newSingleThreadScheduledExecutor( @@ -138,12 +139,6 @@ public ServerCatalog getServerCatalog(String catalogName) { .orElseThrow(() -> new ObjectNotExistsException("Catalog " + catalogName)); } - @Override - public InternalCatalog getInternalCatalog(String catalogName) { - return Optional.ofNullable(internalCatalogMap.get(catalogName)) - .orElseThrow(() -> new ObjectNotExistsException("Catalog " + catalogName)); - } - @Override public void createCatalog(CatalogMeta catalogMeta) { checkStarted(); @@ -208,7 +203,7 @@ public void dropTableMetadata(TableIdentifier tableIdentifier, boolean deleteDat } ServerTableIdentifier serverTableIdentifier = internalCatalog.dropTable(database, table); - Optional.ofNullable(tableRuntimeMap.remove(serverTableIdentifier)) + Optional.ofNullable(tableRuntimeMap.remove(serverTableIdentifier.getId())) .ifPresent( tableRuntime -> { if (headHandler != null) { @@ -260,7 +255,7 @@ public Blocker block( @Override public void releaseBlocker(TableIdentifier tableIdentifier, String blockerId) { checkStarted(); - TableRuntime tableRuntime = getRuntime(getServerTableIdentifier(tableIdentifier)); + TableRuntime tableRuntime = getRuntime(getServerTableIdentifier(tableIdentifier).getId()); if (tableRuntime != null) { tableRuntime.release(blockerId); } @@ -281,6 +276,19 @@ public List getBlockers(TableIdentifier tableIdentifier) { .collect(Collectors.toList()); } + @Override + public List getTableRuntimes(String optimizerGroup, int limit, int offset) { + checkStarted(); + return getAs( + TableMetaMapper.class, + mapper -> mapper.selectTableRuntimesForOptimizerGroup(optimizerGroup, limit, offset)); + } + + public InternalCatalog getInternalCatalog(String catalogName) { + return Optional.ofNullable(internalCatalogMap.get(catalogName)) + .orElseThrow(() -> new ObjectNotExistsException("Catalog " + catalogName)); + } + @Override public void addHandlerChain(RuntimeHandlerChain handler) { checkNotStarted(); @@ -313,15 +321,17 @@ public void initialize() { List tableRuntimeMetaList = getAs(TableMetaMapper.class, TableMetaMapper::selectTableRuntimeMetas); + List tableRuntimes = new ArrayList<>(tableRuntimeMetaList.size()); tableRuntimeMetaList.forEach( tableRuntimeMeta -> { - TableRuntime tableRuntime = tableRuntimeMeta.constructTableRuntime(this); - tableRuntimeMap.put(tableRuntime.getTableIdentifier(), tableRuntime); + TableRuntime tableRuntime = new TableRuntime(tableRuntimeMeta, this); + tableRuntimeMap.put(tableRuntimeMeta.getTableId(), tableRuntime); tableRuntime.registerMetric(MetricManager.getInstance().getGlobalRegistry()); + tableRuntimes.add(tableRuntime); }); if (headHandler != null) { - headHandler.initialize(tableRuntimeMetaList); + headHandler.initialize(tableRuntimes); } if (tableExplorerExecutors == null) { int threadCount = @@ -348,7 +358,7 @@ public void initialize() { private TableRuntime getAndCheckExist(ServerTableIdentifier tableIdentifier) { Preconditions.checkArgument(tableIdentifier != null, "tableIdentifier cannot be null"); - TableRuntime tableRuntime = getRuntime(tableIdentifier); + TableRuntime tableRuntime = getRuntime(tableIdentifier.getId()); if (tableRuntime == null) { throw new ObjectNotExistsException(tableIdentifier); } @@ -384,15 +394,15 @@ private ServerTableIdentifier getOrSyncServerTableIdentifier(TableIdentifier id) } @Override - public TableRuntime getRuntime(ServerTableIdentifier tableIdentifier) { + public TableRuntime getRuntime(Long tableId) { checkStarted(); - return tableRuntimeMap.get(tableIdentifier); + return tableRuntimeMap.get(tableId); } @Override - public boolean contains(ServerTableIdentifier tableIdentifier) { + public boolean contains(Long tableId) { checkStarted(); - return tableRuntimeMap.containsKey(tableIdentifier); + return tableRuntimeMap.containsKey(tableId); } public void dispose() { @@ -582,7 +592,7 @@ private boolean triggerTableAdded( } } TableRuntime tableRuntime = new TableRuntime(serverTableIdentifier, this, table.properties()); - tableRuntimeMap.put(serverTableIdentifier, tableRuntime); + tableRuntimeMap.put(serverTableIdentifier.getId(), tableRuntime); tableRuntime.registerMetric(MetricManager.getInstance().getGlobalRegistry()); if (headHandler != null) { headHandler.fireTableAdded(table, tableRuntime); @@ -596,7 +606,7 @@ private void revertTableRuntimeAdded( externalCatalog.getServerTableIdentifier( tableIdentity.getDatabase(), tableIdentity.getTableName()); if (tableIdentifier != null) { - tableRuntimeMap.remove(tableIdentifier); + tableRuntimeMap.remove(tableIdentifier.getId()); } } @@ -608,7 +618,7 @@ private void disposeTable(ServerTableIdentifier tableIdentifier) { tableIdentifier.getCatalog(), tableIdentifier.getDatabase(), tableIdentifier.getTableName())); - Optional.ofNullable(tableRuntimeMap.remove(tableIdentifier)) + Optional.ofNullable(tableRuntimeMap.remove(tableIdentifier.getId())) .ifPresent( tableRuntime -> { if (headHandler != null) { diff --git a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/RuntimeHandlerChain.java b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/RuntimeHandlerChain.java index bb3e64ceef..3331b427c1 100644 --- a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/RuntimeHandlerChain.java +++ b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/RuntimeHandlerChain.java @@ -51,16 +51,15 @@ protected void appendNext(RuntimeHandlerChain handler) { } } - public final void initialize(List tableRuntimeMetaList) { - List supportedtableRuntimeMetaList = - tableRuntimeMetaList.stream() - .filter( - tableRuntimeMeta -> formatSupported(tableRuntimeMeta.getTableRuntime().getFormat())) + public final void initialize(List tableRuntimes) { + List supportedtableRuntimeMetaList = + tableRuntimes.stream() + .filter(runtime -> formatSupported(runtime.getFormat())) .collect(Collectors.toList()); initHandler(supportedtableRuntimeMetaList); initialized = true; if (next != null) { - next.initialize(tableRuntimeMetaList); + next.initialize(tableRuntimes); } } @@ -147,7 +146,7 @@ protected abstract void handleConfigChanged( protected abstract void handleTableRemoved(TableRuntime tableRuntime); - protected abstract void initHandler(List tableRuntimeMetaList); + protected abstract void initHandler(List tableRuntimeMetaList); protected abstract void doDispose(); } diff --git a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/TableManager.java b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/TableManager.java index d5fffcdf82..65a1aa3456 100644 --- a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/TableManager.java +++ b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/TableManager.java @@ -31,9 +31,9 @@ public interface TableManager extends TableRuntimeHandler { */ AmoroTable loadTable(ServerTableIdentifier tableIdentifier); - TableRuntime getRuntime(ServerTableIdentifier tableIdentifier); + TableRuntime getRuntime(Long tableId); - default boolean contains(ServerTableIdentifier tableIdentifier) { - return getRuntime(tableIdentifier) != null; + default boolean contains(Long tableId) { + return getRuntime(tableId) != null; } } diff --git a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/TableRuntime.java b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/TableRuntime.java index e97143133c..fd335bf3ce 100644 --- a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/TableRuntime.java +++ b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/TableRuntime.java @@ -35,6 +35,7 @@ import org.apache.amoro.server.optimizing.TaskRuntime; import org.apache.amoro.server.optimizing.plan.OptimizingEvaluator; import org.apache.amoro.server.persistence.StatedPersistentBase; +import org.apache.amoro.server.persistence.TableRuntimeMeta; import org.apache.amoro.server.persistence.mapper.OptimizingMapper; import org.apache.amoro.server.persistence.mapper.TableBlockerMapper; import org.apache.amoro.server.persistence.mapper.TableMetaMapper; @@ -97,7 +98,14 @@ public class TableRuntime extends StatedPersistentBase { private final TableOptimizingMetrics optimizingMetrics; private final ReentrantLock blockerLock = new ReentrantLock(); - protected TableRuntime( + private long targetSnapshotId; + + private long targetChangeSnapshotId; + + private Map fromSequence; + private Map toSequence; + + public TableRuntime( ServerTableIdentifier tableIdentifier, TableRuntimeHandler tableHandler, Map properties) { @@ -111,9 +119,10 @@ protected TableRuntime( optimizingMetrics = new TableOptimizingMetrics(tableIdentifier); } - protected TableRuntime(TableRuntimeMeta tableRuntimeMeta, TableRuntimeHandler tableHandler) { + public TableRuntime(TableRuntimeMeta tableRuntimeMeta, TableRuntimeHandler tableHandler) { Preconditions.checkNotNull(tableRuntimeMeta, "TableRuntimeMeta must not be null."); Preconditions.checkNotNull(tableHandler, "TableRuntimeHandler must not be null."); + this.tableHandler = tableHandler; this.tableIdentifier = ServerTableIdentifier.of( @@ -140,6 +149,10 @@ protected TableRuntime(TableRuntimeMeta tableRuntimeMeta, TableRuntimeHandler ta this.pendingInput = tableRuntimeMeta.getPendingInput(); optimizingMetrics = new TableOptimizingMetrics(tableIdentifier); optimizingMetrics.statusChanged(optimizingStatus, this.currentStatusStartTime); + this.targetSnapshotId = tableRuntimeMeta.getTargetSnapshotId(); + this.targetChangeSnapshotId = tableRuntimeMeta.getTargetChangeSnapshotId(); + this.fromSequence = tableRuntimeMeta.getFromSequence(); + this.toSequence = tableRuntimeMeta.getToSequence(); } public void recover(OptimizingProcess optimizingProcess) { @@ -474,6 +487,38 @@ public void setLastPlanTime(long lastPlanTime) { this.lastPlanTime = lastPlanTime; } + public long getTargetSnapshotId() { + return targetSnapshotId; + } + + public void setTargetSnapshotId(long targetSnapshotId) { + this.targetSnapshotId = targetSnapshotId; + } + + public long getTargetChangeSnapshotId() { + return targetChangeSnapshotId; + } + + public void setTargetChangeSnapshotId(long targetChangeSnapshotId) { + this.targetChangeSnapshotId = targetChangeSnapshotId; + } + + public Map getFromSequence() { + return fromSequence; + } + + public void setFromSequence(Map fromSequence) { + this.fromSequence = fromSequence; + } + + public Map getToSequence() { + return toSequence; + } + + public void setToSequence(Map toSequence) { + this.toSequence = toSequence; + } + @Override public String toString() { return MoreObjects.toStringHelper(this) @@ -487,6 +532,10 @@ public String toString() { .add("lastFullOptimizingTime", lastFullOptimizingTime) .add("lastMinorOptimizingTime", lastMinorOptimizingTime) .add("tableConfiguration", tableConfiguration) + .add("targetSnapshotId", targetSnapshotId) + .add("targetChangeSnapshotId", targetChangeSnapshotId) + .add("fromSequence", fromSequence) + .add("toSequence", toSequence) .toString(); } diff --git a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/TableService.java b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/TableService.java index 6b87db18b6..8881b9bdbc 100644 --- a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/TableService.java +++ b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/TableService.java @@ -23,6 +23,7 @@ import org.apache.amoro.api.ServerTableIdentifier; import org.apache.amoro.api.TableIdentifier; import org.apache.amoro.server.catalog.CatalogService; +import org.apache.amoro.server.persistence.TableRuntimeMeta; import java.util.List; import java.util.Map; @@ -88,4 +89,13 @@ Blocker block( * @return block list */ List getBlockers(TableIdentifier tableIdentifier); + + /** + * Get the table info from database for given parameters. + * @param optimizerGroup The optimizer group of the table associated to. + * will be if we want the info for all groups. + * @param limit How many entries we want to retrieve. + * @param offset The entries we'll skip when retrieving the entries. + */ + List getTableRuntimes(String optimizerGroup, int limit, int offset); } diff --git a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/executor/BaseTableExecutor.java b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/executor/BaseTableExecutor.java index 29edacac92..b85445cf0b 100644 --- a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/executor/BaseTableExecutor.java +++ b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/executor/BaseTableExecutor.java @@ -25,7 +25,6 @@ import org.apache.amoro.server.table.RuntimeHandlerChain; import org.apache.amoro.server.table.TableManager; import org.apache.amoro.server.table.TableRuntime; -import org.apache.amoro.server.table.TableRuntimeMeta; import org.apache.amoro.shade.guava32.com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; @@ -63,10 +62,9 @@ protected BaseTableExecutor(TableManager tableManager, int poolSize) { } @Override - protected void initHandler(List tableRuntimeMetaList) { + protected void initHandler(List tableRuntimeMetaList) { tableRuntimeMetaList.stream() - .map(tableRuntimeMeta -> tableRuntimeMeta.getTableRuntime()) - .filter(tableRuntime -> enabled(tableRuntime)) + .filter(this::enabled) .forEach( tableRuntime -> { if (scheduledTables.add(tableRuntime.getTableIdentifier())) { @@ -109,7 +107,8 @@ protected String getThreadName() { } private boolean isExecutable(TableRuntime tableRuntime) { - return tableManager.contains(tableRuntime.getTableIdentifier()) && enabled(tableRuntime); + return tableManager.contains(tableRuntime.getTableIdentifier().getId()) + && enabled(tableRuntime); } @Override diff --git a/amoro-ams/amoro-ams-server/src/test/java/org/apache/amoro/server/RestCatalogServiceTestBase.java b/amoro-ams/amoro-ams-server/src/test/java/org/apache/amoro/server/RestCatalogServiceTestBase.java index 8f83f4ff98..05b21b7838 100644 --- a/amoro-ams/amoro-ams-server/src/test/java/org/apache/amoro/server/RestCatalogServiceTestBase.java +++ b/amoro-ams/amoro-ams-server/src/test/java/org/apache/amoro/server/RestCatalogServiceTestBase.java @@ -121,7 +121,7 @@ protected TableMetadata getTableMetadata(TableIdentifier identifier) { protected TableRuntime getTableRuntime(TableIdentifier identifier) { ServerTableIdentifier serverTableIdentifier = getServerTableIdentifier(identifier); - return tableService.getRuntime(serverTableIdentifier); + return tableService.getRuntime(serverTableIdentifier.getId()); } protected void assertTableRuntime(TableIdentifier identifier, TableFormat format) { diff --git a/amoro-ams/amoro-ams-server/src/test/java/org/apache/amoro/server/TestDefaultOptimizingService.java b/amoro-ams/amoro-ams-server/src/test/java/org/apache/amoro/server/TestDefaultOptimizingService.java index 01631f073c..ea21898d0c 100644 --- a/amoro-ams/amoro-ams-server/src/test/java/org/apache/amoro/server/TestDefaultOptimizingService.java +++ b/amoro-ams/amoro-ams-server/src/test/java/org/apache/amoro/server/TestDefaultOptimizingService.java @@ -114,7 +114,7 @@ private void initTableWithFiles() { (MixedTable) tableService().loadTable(serverTableIdentifier()).originalTable(); appendData(mixedTable.asUnkeyedTable(), 1); appendData(mixedTable.asUnkeyedTable(), 2); - TableRuntime runtime = tableService().getRuntime(serverTableIdentifier()); + TableRuntime runtime = tableService().getRuntime(serverTableIdentifier().getId()); runtime.refresh(tableService().loadTable(serverTableIdentifier())); } @@ -387,10 +387,13 @@ private void assertTaskCompleted(TaskRuntime taskRuntime) { 0, optimizingService().listTasks(defaultResourceGroup().getName()).size()); Assertions.assertEquals( OptimizingProcess.Status.RUNNING, - tableService().getRuntime(serverTableIdentifier()).getOptimizingProcess().getStatus()); + tableService() + .getRuntime(serverTableIdentifier().getId()) + .getOptimizingProcess() + .getStatus()); Assertions.assertEquals( OptimizingStatus.COMMITTING, - tableService().getRuntime(serverTableIdentifier()).getOptimizingStatus()); + tableService().getRuntime(serverTableIdentifier().getId()).getOptimizingStatus()); } protected void reload() { @@ -415,7 +418,7 @@ public TableRuntimeRefresher() { } void refreshPending() { - execute(tableService().getRuntime(serverTableIdentifier())); + execute(tableService().getRuntime(serverTableIdentifier().getId())); } } diff --git a/amoro-ams/amoro-ams-server/src/test/java/org/apache/amoro/server/optimizing/TestOptimizingQueue.java b/amoro-ams/amoro-ams-server/src/test/java/org/apache/amoro/server/optimizing/TestOptimizingQueue.java index 25e24565ee..8722267a45 100644 --- a/amoro-ams/amoro-ams-server/src/test/java/org/apache/amoro/server/optimizing/TestOptimizingQueue.java +++ b/amoro-ams/amoro-ams-server/src/test/java/org/apache/amoro/server/optimizing/TestOptimizingQueue.java @@ -45,12 +45,12 @@ import org.apache.amoro.optimizing.TableOptimizing; import org.apache.amoro.server.manager.MetricManager; import org.apache.amoro.server.metrics.MetricRegistry; +import org.apache.amoro.server.persistence.TableRuntimeMeta; import org.apache.amoro.server.resource.OptimizerInstance; import org.apache.amoro.server.resource.OptimizerThread; import org.apache.amoro.server.resource.QuotaProvider; import org.apache.amoro.server.table.AMSTableTestBase; import org.apache.amoro.server.table.TableRuntime; -import org.apache.amoro.server.table.TableRuntimeMeta; import org.apache.amoro.shade.guava32.com.google.common.collect.ImmutableMap; import org.apache.amoro.shade.guava32.com.google.common.collect.Lists; import org.apache.amoro.table.MixedTable; @@ -103,7 +103,7 @@ protected static ResourceGroup testResourceGroup() { return new ResourceGroup.Builder("test", "local").build(); } - protected OptimizingQueue buildOptimizingGroupService(TableRuntimeMeta tableRuntimeMeta) { + protected OptimizingQueue buildOptimizingGroupService(TableRuntime tableRuntimeMeta) { return new OptimizingQueue( tableService(), testResourceGroup(), @@ -125,7 +125,7 @@ private OptimizingQueue buildOptimizingGroupService() { @Test public void testPollNoTask() { - TableRuntimeMeta tableRuntimeMeta = + TableRuntime tableRuntimeMeta = buildTableRuntimeMeta(OptimizingStatus.PENDING, defaultResourceGroup()); OptimizingQueue queue = buildOptimizingGroupService(tableRuntimeMeta); Assert.assertNull(queue.pollTask(0)); @@ -136,25 +136,25 @@ public void testPollNoTask() { public void testRefreshAndReleaseTable() { OptimizingQueue queue = buildOptimizingGroupService(); Assert.assertEquals(0, queue.getSchedulingPolicy().getTableRuntimeMap().size()); - TableRuntimeMeta tableRuntimeMeta = + TableRuntime tableRuntime = buildTableRuntimeMeta(OptimizingStatus.IDLE, defaultResourceGroup()); - queue.refreshTable(tableRuntimeMeta.getTableRuntime()); + queue.refreshTable(tableRuntime); Assert.assertEquals(1, queue.getSchedulingPolicy().getTableRuntimeMap().size()); Assert.assertTrue( queue.getSchedulingPolicy().getTableRuntimeMap().containsKey(serverTableIdentifier())); - queue.releaseTable(tableRuntimeMeta.getTableRuntime()); + queue.releaseTable(tableRuntime); Assert.assertEquals(0, queue.getSchedulingPolicy().getTableRuntimeMap().size()); - queue.refreshTable(tableRuntimeMeta.getTableRuntime()); + queue.refreshTable(tableRuntime); Assert.assertEquals(1, queue.getSchedulingPolicy().getTableRuntimeMap().size()); queue.dispose(); } @Test public void testPollTask() { - TableRuntimeMeta tableRuntimeMeta = initTableWithFiles(); - OptimizingQueue queue = buildOptimizingGroupService(tableRuntimeMeta); + TableRuntime tableRuntime = initTableWithFiles(); + OptimizingQueue queue = buildOptimizingGroupService(tableRuntime); // 1.poll task TaskRuntime task = queue.pollTask(MAX_POLLING_TIME); @@ -167,7 +167,7 @@ public void testPollTask() { @Test public void testRetryTask() { - TableRuntimeMeta tableRuntimeMeta = initTableWithFiles(); + TableRuntime tableRuntimeMeta = initTableWithFiles(); OptimizingQueue queue = buildOptimizingGroupService(tableRuntimeMeta); // 1.poll task @@ -200,8 +200,8 @@ public void testRetryTask() { @Test public void testCommitTask() { - TableRuntimeMeta tableRuntimeMeta = initTableWithFiles(); - OptimizingQueue queue = buildOptimizingGroupService(tableRuntimeMeta); + TableRuntime tableRuntime = initTableWithFiles(); + OptimizingQueue queue = buildOptimizingGroupService(tableRuntime); Assert.assertEquals(0, queue.collectTasks().size()); TaskRuntime task = queue.pollTask(MAX_POLLING_TIME); @@ -216,11 +216,11 @@ public void testCommitTask() { Assert.assertEquals(TaskRuntime.Status.SUCCESS, task.getStatus()); // 7.commit - OptimizingProcess optimizingProcess = tableRuntimeMeta.getTableRuntime().getOptimizingProcess(); + OptimizingProcess optimizingProcess = tableRuntime.getOptimizingProcess(); Assert.assertEquals(OptimizingProcess.Status.RUNNING, optimizingProcess.getStatus()); optimizingProcess.commit(); Assert.assertEquals(OptimizingProcess.Status.SUCCESS, optimizingProcess.getStatus()); - Assert.assertNull(tableRuntimeMeta.getTableRuntime().getOptimizingProcess()); + Assert.assertNull(tableRuntime.getOptimizingProcess()); // 8.commit again, throw exceptions, and status not changed. Assert.assertThrows(IllegalStateException.class, optimizingProcess::commit); @@ -232,8 +232,8 @@ public void testCommitTask() { @Test public void testCollectingTasks() { - TableRuntimeMeta tableRuntimeMeta = initTableWithFiles(); - OptimizingQueue queue = buildOptimizingGroupService(tableRuntimeMeta); + TableRuntime tableRuntime = initTableWithFiles(); + OptimizingQueue queue = buildOptimizingGroupService(tableRuntime); Assert.assertEquals(0, queue.collectTasks().size()); TaskRuntime task = queue.pollTask(MAX_POLLING_TIME); @@ -247,8 +247,8 @@ public void testCollectingTasks() { @Test public void testTaskAndTableMetrics() { - TableRuntimeMeta tableRuntimeMeta = initTableWithFiles(); - OptimizingQueue queue = buildOptimizingGroupService(tableRuntimeMeta); + TableRuntime tableRuntime = initTableWithFiles(); + OptimizingQueue queue = buildOptimizingGroupService(tableRuntime); MetricRegistry registry = MetricManager.getInstance().getGlobalRegistry(); Map tagValues = ImmutableMap.of(GROUP_TAG, testResourceGroup().getName()); @@ -299,7 +299,7 @@ public void testTaskAndTableMetrics() { Assert.assertEquals(0, pendingTablesGauge.getValue().longValue()); Assert.assertEquals(1, executingTablesGauge.getValue().longValue()); - OptimizingProcess optimizingProcess = tableRuntimeMeta.getTableRuntime().getOptimizingProcess(); + OptimizingProcess optimizingProcess = tableRuntime.getOptimizingProcess(); optimizingProcess.commit(); Assert.assertEquals(0, queueTasksGauge.getValue().longValue()); Assert.assertEquals(0, executingTasksGauge.getValue().longValue()); @@ -345,21 +345,19 @@ public void testAddAndRemoveOptimizers() { queue.dispose(); } - protected TableRuntimeMeta initTableWithFiles() { + protected TableRuntime initTableWithFiles() { MixedTable mixedTable = (MixedTable) tableService().loadTable(serverTableIdentifier()).originalTable(); appendData(mixedTable.asUnkeyedTable(), 1); appendData(mixedTable.asUnkeyedTable(), 2); - TableRuntimeMeta tableRuntimeMeta = + TableRuntime tableRuntime = buildTableRuntimeMeta(OptimizingStatus.PENDING, defaultResourceGroup()); - TableRuntime runtime = tableRuntimeMeta.getTableRuntime(); - runtime.refresh(tableService().loadTable(serverTableIdentifier())); - return tableRuntimeMeta; + tableRuntime.refresh(tableService().loadTable(serverTableIdentifier())); + return tableRuntime; } - private TableRuntimeMeta buildTableRuntimeMeta( - OptimizingStatus status, ResourceGroup resourceGroup) { + private TableRuntime buildTableRuntimeMeta(OptimizingStatus status, ResourceGroup resourceGroup) { MixedTable mixedTable = (MixedTable) tableService().loadTable(serverTableIdentifier()).originalTable(); TableRuntimeMeta tableRuntimeMeta = new TableRuntimeMeta(); @@ -371,8 +369,7 @@ private TableRuntimeMeta buildTableRuntimeMeta( tableRuntimeMeta.setTableStatus(status); tableRuntimeMeta.setTableConfig(TableConfiguration.parseConfig(mixedTable.properties())); tableRuntimeMeta.setOptimizerGroup(resourceGroup.getName()); - tableRuntimeMeta.constructTableRuntime(tableService()); - return tableRuntimeMeta; + return new TableRuntime(tableRuntimeMeta, tableService()); } private void appendData(UnkeyedTable table, int id) { diff --git a/amoro-ams/amoro-ams-server/src/test/java/org/apache/amoro/server/table/TestTableRuntimeHandler.java b/amoro-ams/amoro-ams-server/src/test/java/org/apache/amoro/server/table/TestTableRuntimeHandler.java index c641fb258d..6c0eef31fc 100644 --- a/amoro-ams/amoro-ams-server/src/test/java/org/apache/amoro/server/table/TestTableRuntimeHandler.java +++ b/amoro-ams/amoro-ams-server/src/test/java/org/apache/amoro/server/table/TestTableRuntimeHandler.java @@ -92,14 +92,15 @@ public void testInitialize() throws Exception { tableService.initialize(); Assert.assertEquals(1, handler.getInitTables().size()); Assert.assertEquals( - createTableId.getId().longValue(), handler.getInitTables().get(0).getTableId()); + (Long) createTableId.getId().longValue(), + handler.getInitTables().get(0).getTableIdentifier().getId()); // test change properties MixedTable mixedTable = (MixedTable) tableService().loadTable(createTableId).originalTable(); mixedTable.updateProperties().set(TableProperties.ENABLE_ORPHAN_CLEAN, "true").commit(); tableService() - .getRuntime(createTableId) + .getRuntime(createTableId.getId()) .refresh(tableService.loadTable(serverTableIdentifier())); Assert.assertEquals(1, handler.getConfigChangedTables().size()); validateTableRuntime(handler.getConfigChangedTables().get(0).first()); @@ -131,7 +132,7 @@ protected DefaultTableService tableService() { static class TestHandler extends RuntimeHandlerChain { - private final List initTables = Lists.newArrayList(); + private final List initTables = Lists.newArrayList(); private final List> statusChangedTables = Lists.newArrayList(); private final List> configChangedTables = @@ -162,7 +163,7 @@ protected void handleTableRemoved(TableRuntime tableRuntime) { } @Override - protected void initHandler(List tableRuntimeMetaList) { + protected void initHandler(List tableRuntimeMetaList) { initTables.addAll(tableRuntimeMetaList); } @@ -171,7 +172,7 @@ protected void doDispose() { disposed = true; } - public List getInitTables() { + public List getInitTables() { return initTables; } diff --git a/amoro-ams/amoro-ams-server/src/test/java/org/apache/amoro/server/table/TestTableRuntimeManager.java b/amoro-ams/amoro-ams-server/src/test/java/org/apache/amoro/server/table/TestTableRuntimeManager.java index 423e09795c..35b1e8c435 100644 --- a/amoro-ams/amoro-ams-server/src/test/java/org/apache/amoro/server/table/TestTableRuntimeManager.java +++ b/amoro-ams/amoro-ams-server/src/test/java/org/apache/amoro/server/table/TestTableRuntimeManager.java @@ -71,7 +71,7 @@ public void testLoadTable() { @Test public void testTableContains() { - Assert.assertTrue(tableService().contains(serverTableIdentifier())); + Assert.assertTrue(tableService().contains(serverTableIdentifier().getId())); ServerTableIdentifier copyId = ServerTableIdentifier.of( null, @@ -79,7 +79,7 @@ public void testTableContains() { serverTableIdentifier().getDatabase(), serverTableIdentifier().getTableName(), serverTableIdentifier().getFormat()); - Assert.assertFalse(tableService().contains(copyId)); + Assert.assertFalse(tableService().contains(copyId.getId())); copyId = ServerTableIdentifier.of( serverTableIdentifier().getId(), @@ -87,12 +87,12 @@ public void testTableContains() { serverTableIdentifier().getDatabase(), "unknown", serverTableIdentifier().getFormat()); - Assert.assertFalse(tableService().contains(copyId)); + Assert.assertFalse(tableService().contains(copyId.getId())); } @Test public void testTableRuntime() { - TableRuntime tableRuntime = tableService().getRuntime(serverTableIdentifier()); + TableRuntime tableRuntime = tableService().getRuntime(serverTableIdentifier().getId()); validateTableRuntime(tableRuntime); } } diff --git a/amoro-ams/amoro-ams-server/src/test/java/org/apache/amoro/server/table/TestTableService.java b/amoro-ams/amoro-ams-server/src/test/java/org/apache/amoro/server/table/TestTableService.java index dea85f1a73..cbf8010c6e 100644 --- a/amoro-ams/amoro-ams-server/src/test/java/org/apache/amoro/server/table/TestTableService.java +++ b/amoro-ams/amoro-ams-server/src/test/java/org/apache/amoro/server/table/TestTableService.java @@ -341,7 +341,7 @@ private boolean isBlocked(BlockableOperation operation) { } private boolean isTableRuntimeBlocked(BlockableOperation operation) { - return tableService().getRuntime(serverTableIdentifier()).isBlocked(operation); + return tableService().getRuntime(serverTableIdentifier().getId()).isBlocked(operation); } private void assertBlockerCnt(int i) {