diff --git a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java index 10e6ecea98..00bd1d9b7b 100644 --- a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java +++ b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java @@ -49,7 +49,6 @@ import org.apache.amoro.server.table.DefaultTableService; import org.apache.amoro.server.table.RuntimeHandlerChain; import org.apache.amoro.server.table.TableRuntime; -import org.apache.amoro.server.table.TableRuntimeMeta; import org.apache.amoro.server.table.TableService; import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions; import org.apache.amoro.shade.guava32.com.google.common.collect.ImmutableList; @@ -121,17 +120,17 @@ public RuntimeHandlerChain getTableRuntimeHandler() { return tableHandlerChain; } - private void loadOptimizingQueues(List tableRuntimeMetaList) { + private void loadOptimizingQueues(List tableRuntimeMetaList) { List optimizerGroups = getAs(ResourceMapper.class, ResourceMapper::selectResourceGroups); List optimizers = getAs(OptimizerMapper.class, OptimizerMapper::selectAll); - Map> groupToTableRuntimes = + Map> groupToTableRuntimes = tableRuntimeMetaList.stream() - .collect(Collectors.groupingBy(TableRuntimeMeta::getOptimizerGroup)); + .collect(Collectors.groupingBy(TableRuntime::getOptimizerGroup)); optimizerGroups.forEach( group -> { String groupName = group.getName(); - List tableRuntimeMetas = groupToTableRuntimes.remove(groupName); + List tableRuntimeMetas = groupToTableRuntimes.remove(groupName); OptimizingQueue optimizingQueue = new OptimizingQueue( tableService, @@ -455,7 +454,7 @@ public void handleTableRemoved(TableRuntime tableRuntime) { } @Override - protected void initHandler(List tableRuntimeMetaList) { + protected void initHandler(List tableRuntimeMetaList) { LOG.info("OptimizerManagementService begin initializing"); loadOptimizingQueues(tableRuntimeMetaList); optimizerKeeper.start(); diff --git a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/controller/OptimizerController.java b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/controller/OptimizerController.java index aef413d694..fca5226710 100644 --- a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/controller/OptimizerController.java +++ b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/controller/OptimizerController.java @@ -19,6 +19,7 @@ package org.apache.amoro.server.dashboard.controller; import io.javalin.http.Context; +import org.apache.amoro.TableFormat; import org.apache.amoro.api.ServerTableIdentifier; import org.apache.amoro.api.resource.Resource; import org.apache.amoro.api.resource.ResourceGroup; @@ -30,6 +31,7 @@ import org.apache.amoro.server.dashboard.response.OkResponse; import org.apache.amoro.server.dashboard.response.PageResult; import org.apache.amoro.server.dashboard.utils.OptimizingUtil; +import org.apache.amoro.server.persistence.TableRuntimeMeta; import org.apache.amoro.server.resource.ContainerMetadata; import org.apache.amoro.server.resource.OptimizerInstance; import org.apache.amoro.server.resource.ResourceContainers; @@ -67,8 +69,22 @@ public void getOptimizerTables(Context ctx) { Integer pageSize = ctx.queryParamAsClass("pageSize", Integer.class).getOrDefault(20); int offset = (page - 1) * pageSize; + String optimizerGroupUsedInDbFilter = "all".equals(optimizerGroup) ? null : optimizerGroup; + // get all info from underlying table table_runtime + List tableRuntimeBeans = + tableService.getTableRuntimes(optimizerGroupUsedInDbFilter, pageSize, offset); + List tableRuntimes = new ArrayList<>(); - List tables = tableService.listManagedTables(); + List tables = new ArrayList<>(); + for (TableRuntimeMeta bean : tableRuntimeBeans) { + tables.add( + ServerTableIdentifier.of( + bean.getTableId(), + bean.getCatalogName(), + bean.getDbName(), + bean.getTableName(), + TableFormat.ICEBERG)); // current hard code for this. + } for (ServerTableIdentifier identifier : tables) { TableRuntime tableRuntime = tableService.getRuntime(identifier); if (tableRuntime == null) { diff --git a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java index 4752ca62ca..76ce5cb3c2 100644 --- a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java +++ b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java @@ -36,7 +36,6 @@ import org.apache.amoro.server.resource.QuotaProvider; import org.apache.amoro.server.table.TableManager; import org.apache.amoro.server.table.TableRuntime; -import org.apache.amoro.server.table.TableRuntimeMeta; import org.apache.amoro.shade.guava32.com.google.common.annotations.VisibleForTesting; import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions; import org.apache.amoro.shade.guava32.com.google.common.collect.Lists; @@ -93,7 +92,7 @@ public OptimizingQueue( ResourceGroup optimizerGroup, QuotaProvider quotaProvider, Executor planExecutor, - List tableRuntimeMetaList, + List tableRuntimeMetaList, int maxPlanningParallelism) { Preconditions.checkNotNull(optimizerGroup, "Optimizer group can not be null"); this.planExecutor = planExecutor; @@ -109,11 +108,11 @@ public OptimizingQueue( tableRuntimeMetaList.forEach(this::initTableRuntime); } - private void initTableRuntime(TableRuntimeMeta tableRuntimeMeta) { - TableRuntime tableRuntime = tableRuntimeMeta.getTableRuntime(); + private void initTableRuntime(TableRuntime tableRuntime) { + // TableRuntime tableRuntime = tableRuntimeMeta.getTableRuntime(); if (tableRuntime.getOptimizingStatus().isProcessing() - && tableRuntimeMeta.getOptimizingProcessId() != 0) { - tableRuntime.recover(new TableOptimizingProcess(tableRuntimeMeta)); + && tableRuntime.getOptimizingProcess().getProcessId() != 0) { + tableRuntime.recover(new TableOptimizingProcess(tableRuntime)); } if (tableRuntime.isOptimizingEnabled()) { @@ -122,7 +121,7 @@ private void initTableRuntime(TableRuntimeMeta tableRuntimeMeta) { if (!tableRuntime.getOptimizingStatus().isProcessing()) { scheduler.addTable(tableRuntime); } else if (tableRuntime.getOptimizingStatus() != OptimizingStatus.COMMITTING) { - tableQueue.offer(new TableOptimizingProcess(tableRuntimeMeta)); + tableQueue.offer(new TableOptimizingProcess(tableRuntime)); } } else { OptimizingProcess process = tableRuntime.getOptimizingProcess(); @@ -376,21 +375,21 @@ public TableOptimizingProcess(OptimizingPlanner planner) { beginAndPersistProcess(); } - public TableOptimizingProcess(TableRuntimeMeta tableRuntimeMeta) { - processId = tableRuntimeMeta.getOptimizingProcessId(); - tableRuntime = tableRuntimeMeta.getTableRuntime(); - optimizingType = tableRuntimeMeta.getOptimizingType(); - targetSnapshotId = tableRuntimeMeta.getTargetSnapshotId(); - targetChangeSnapshotId = tableRuntimeMeta.getTargetChangeSnapshotId(); - planTime = tableRuntimeMeta.getPlanTime(); - if (tableRuntimeMeta.getFromSequence() != null) { - fromSequence = tableRuntimeMeta.getFromSequence(); + public TableOptimizingProcess(TableRuntime tableRuntime) { + processId = tableRuntime.getOptimizingProcess().getProcessId(); + this.tableRuntime = tableRuntime; + optimizingType = tableRuntime.getOptimizingProcess().getOptimizingType(); + targetSnapshotId = tableRuntime.getTargetSnapshotId(); + targetChangeSnapshotId = tableRuntime.getTargetChangeSnapshotId(); + planTime = tableRuntime.getLastPlanTime(); + if (tableRuntime.getFromSequence() != null) { + fromSequence = tableRuntime.getFromSequence(); } - if (tableRuntimeMeta.getToSequence() != null) { - toSequence = tableRuntimeMeta.getToSequence(); + if (tableRuntime.getToSequence() != null) { + toSequence = tableRuntime.getToSequence(); } if (this.status != OptimizingProcess.Status.CLOSED) { - tableRuntimeMeta.getTableRuntime().recover(this); + tableRuntime.recover(this); } loadTaskRuntimes(this); } diff --git a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/TableRuntimeMeta.java b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/persistence/TableRuntimeMeta.java similarity index 94% rename from amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/TableRuntimeMeta.java rename to amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/persistence/TableRuntimeMeta.java index 92ca2e6a36..ad4c513947 100644 --- a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/TableRuntimeMeta.java +++ b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/persistence/TableRuntimeMeta.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.amoro.server.table; +package org.apache.amoro.server.persistence; import org.apache.amoro.TableFormat; import org.apache.amoro.api.config.TableConfiguration; @@ -27,6 +27,7 @@ import java.util.Map; +/** The bean class for table used to when transfer data from/to database. */ public class TableRuntimeMeta { private long tableId; private String catalogName; @@ -57,24 +58,8 @@ public class TableRuntimeMeta { private Map fromSequence; private Map toSequence; - private TableRuntime tableRuntime; - public TableRuntimeMeta() {} - public TableRuntime constructTableRuntime(TableManager initializer) { - if (tableRuntime == null) { - tableRuntime = new TableRuntime(this, initializer); - } - return tableRuntime; - } - - public TableRuntime getTableRuntime() { - if (tableRuntime == null) { - throw new IllegalStateException("TableRuntime is not constructed yet."); - } - return tableRuntime; - } - public long getTargetSnapshotId() { return targetSnapshotId; } diff --git a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/persistence/mapper/TableMetaMapper.java b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/persistence/mapper/TableMetaMapper.java index b01e4ff4c4..28c7264de4 100644 --- a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/persistence/mapper/TableMetaMapper.java +++ b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/persistence/mapper/TableMetaMapper.java @@ -19,13 +19,13 @@ package org.apache.amoro.server.persistence.mapper; import org.apache.amoro.api.ServerTableIdentifier; +import org.apache.amoro.server.persistence.TableRuntimeMeta; import org.apache.amoro.server.persistence.converter.JsonObjectConverter; import org.apache.amoro.server.persistence.converter.Long2TsConverter; import org.apache.amoro.server.persistence.converter.Map2StringConverter; import org.apache.amoro.server.persistence.converter.MapLong2StringConverter; import org.apache.amoro.server.table.TableMetadata; import org.apache.amoro.server.table.TableRuntime; -import org.apache.amoro.server.table.TableRuntimeMeta; import org.apache.ibatis.annotations.Delete; import org.apache.ibatis.annotations.Insert; import org.apache.ibatis.annotations.Options; @@ -424,4 +424,71 @@ List selectTableIdentifiersByCatalog( typeHandler = MapLong2StringConverter.class) }) List selectTableRuntimeMetas(); + + @Select( + "") + @Results({ + @Result(property = "tableId", column = "table_id"), + @Result(property = "catalogName", column = "catalog_name"), + @Result(property = "dbName", column = "db_name"), + @Result(property = "tableName", column = "table_name"), + @Result(property = "currentSnapshotId", column = "current_snapshot_id"), + @Result(property = "currentChangeSnapshotId", column = "current_change_snapshotId"), + @Result(property = "lastOptimizedSnapshotId", column = "last_optimized_snapshotId"), + @Result( + property = "lastOptimizedChangeSnapshotId", + column = "last_optimized_change_snapshotId"), + @Result( + property = "lastMajorOptimizingTime", + column = "last_major_optimizing_time", + typeHandler = Long2TsConverter.class), + @Result( + property = "lastMinorOptimizingTime", + column = "last_minor_optimizing_time", + typeHandler = Long2TsConverter.class), + @Result( + property = "lastFullOptimizingTime", + column = "last_full_optimizing_time", + typeHandler = Long2TsConverter.class), + @Result(property = "tableStatus", column = "optimizing_status"), + @Result( + property = "currentStatusStartTime", + column = "optimizing_status_start_time", + typeHandler = Long2TsConverter.class), + @Result(property = "optimizingProcessId", column = "optimizing_process_id"), + @Result(property = "optimizerGroup", column = "optimizer_group"), + @Result( + property = "pendingInput", + column = "pending_input", + typeHandler = JsonObjectConverter.class), + @Result( + property = "tableConfig", + column = "table_config", + typeHandler = JsonObjectConverter.class), + }) + List selectTableRuntimesForOptimizerGroup( + @Param("optimizerGroup") String optimizerGroup, + @Param("limitCount") int limitCount, + @Param("offsetNum") int offset); } diff --git a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/DefaultTableService.java b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/DefaultTableService.java index b3266b9d7c..0679f5ae0e 100644 --- a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/DefaultTableService.java +++ b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/DefaultTableService.java @@ -40,6 +40,7 @@ import org.apache.amoro.server.manager.MetricManager; import org.apache.amoro.server.optimizing.OptimizingStatus; import org.apache.amoro.server.persistence.StatedPersistentBase; +import org.apache.amoro.server.persistence.TableRuntimeMeta; import org.apache.amoro.server.persistence.mapper.CatalogMetaMapper; import org.apache.amoro.server.persistence.mapper.TableMetaMapper; import org.apache.amoro.server.table.blocker.TableBlocker; @@ -55,6 +56,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -138,12 +140,6 @@ public ServerCatalog getServerCatalog(String catalogName) { .orElseThrow(() -> new ObjectNotExistsException("Catalog " + catalogName)); } - @Override - public InternalCatalog getInternalCatalog(String catalogName) { - return Optional.ofNullable(internalCatalogMap.get(catalogName)) - .orElseThrow(() -> new ObjectNotExistsException("Catalog " + catalogName)); - } - @Override public void createCatalog(CatalogMeta catalogMeta) { checkStarted(); @@ -281,6 +277,19 @@ public List getBlockers(TableIdentifier tableIdentifier) { .collect(Collectors.toList()); } + @Override + public List getTableRuntimes(String optimizerGroup, int limit, int offset) { + checkStarted(); + return getAs( + TableMetaMapper.class, + mapper -> mapper.selectTableRuntimesForOptimizerGroup(optimizerGroup, limit, offset)); + } + + public InternalCatalog getInternalCatalog(String catalogName) { + return Optional.ofNullable(internalCatalogMap.get(catalogName)) + .orElseThrow(() -> new ObjectNotExistsException("Catalog " + catalogName)); + } + @Override public void addHandlerChain(RuntimeHandlerChain handler) { checkNotStarted(); @@ -313,15 +322,19 @@ public void initialize() { List tableRuntimeMetaList = getAs(TableMetaMapper.class, TableMetaMapper::selectTableRuntimeMetas); + List tableRuntimes = new ArrayList<>(tableRuntimeMetaList.size()); tableRuntimeMetaList.forEach( tableRuntimeMeta -> { - TableRuntime tableRuntime = tableRuntimeMeta.constructTableRuntime(this); + TableRuntime tableRuntime = + new TableRuntime( + tableRuntimeMeta, this); // tableRuntimeMeta.constructTableRuntime(this); tableRuntimeMap.put(tableRuntime.getTableIdentifier(), tableRuntime); tableRuntime.registerMetric(MetricManager.getInstance().getGlobalRegistry()); + tableRuntimes.add(tableRuntime); }); if (headHandler != null) { - headHandler.initialize(tableRuntimeMetaList); + headHandler.initialize(tableRuntimes); } if (tableExplorerExecutors == null) { int threadCount = diff --git a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/RuntimeHandlerChain.java b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/RuntimeHandlerChain.java index bb3e64ceef..3331b427c1 100644 --- a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/RuntimeHandlerChain.java +++ b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/RuntimeHandlerChain.java @@ -51,16 +51,15 @@ protected void appendNext(RuntimeHandlerChain handler) { } } - public final void initialize(List tableRuntimeMetaList) { - List supportedtableRuntimeMetaList = - tableRuntimeMetaList.stream() - .filter( - tableRuntimeMeta -> formatSupported(tableRuntimeMeta.getTableRuntime().getFormat())) + public final void initialize(List tableRuntimes) { + List supportedtableRuntimeMetaList = + tableRuntimes.stream() + .filter(runtime -> formatSupported(runtime.getFormat())) .collect(Collectors.toList()); initHandler(supportedtableRuntimeMetaList); initialized = true; if (next != null) { - next.initialize(tableRuntimeMetaList); + next.initialize(tableRuntimes); } } @@ -147,7 +146,7 @@ protected abstract void handleConfigChanged( protected abstract void handleTableRemoved(TableRuntime tableRuntime); - protected abstract void initHandler(List tableRuntimeMetaList); + protected abstract void initHandler(List tableRuntimeMetaList); protected abstract void doDispose(); } diff --git a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/TableRuntime.java b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/TableRuntime.java index e97143133c..e37ce5151f 100644 --- a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/TableRuntime.java +++ b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/TableRuntime.java @@ -35,6 +35,7 @@ import org.apache.amoro.server.optimizing.TaskRuntime; import org.apache.amoro.server.optimizing.plan.OptimizingEvaluator; import org.apache.amoro.server.persistence.StatedPersistentBase; +import org.apache.amoro.server.persistence.TableRuntimeMeta; import org.apache.amoro.server.persistence.mapper.OptimizingMapper; import org.apache.amoro.server.persistence.mapper.TableBlockerMapper; import org.apache.amoro.server.persistence.mapper.TableMetaMapper; @@ -97,7 +98,14 @@ public class TableRuntime extends StatedPersistentBase { private final TableOptimizingMetrics optimizingMetrics; private final ReentrantLock blockerLock = new ReentrantLock(); - protected TableRuntime( + private long targetSnapshotId; + + private long targetChangeSnapshotId; + + private Map fromSequence; + private Map toSequence; + + public TableRuntime( ServerTableIdentifier tableIdentifier, TableRuntimeHandler tableHandler, Map properties) { @@ -111,9 +119,10 @@ protected TableRuntime( optimizingMetrics = new TableOptimizingMetrics(tableIdentifier); } - protected TableRuntime(TableRuntimeMeta tableRuntimeMeta, TableRuntimeHandler tableHandler) { + public TableRuntime(TableRuntimeMeta tableRuntimeMeta, TableRuntimeHandler tableHandler) { Preconditions.checkNotNull(tableRuntimeMeta, "TableRuntimeMeta must not be null."); Preconditions.checkNotNull(tableHandler, "TableRuntimeHandler must not be null."); + this.tableHandler = tableHandler; this.tableIdentifier = ServerTableIdentifier.of( @@ -140,6 +149,10 @@ protected TableRuntime(TableRuntimeMeta tableRuntimeMeta, TableRuntimeHandler ta this.pendingInput = tableRuntimeMeta.getPendingInput(); optimizingMetrics = new TableOptimizingMetrics(tableIdentifier); optimizingMetrics.statusChanged(optimizingStatus, this.currentStatusStartTime); + this.targetSnapshotId = tableRuntimeMeta.getTargetSnapshotId(); + this.targetChangeSnapshotId = tableRuntimeMeta.getTargetChangeSnapshotId(); + this.fromSequence = tableRuntimeMeta.getFromSequence(); + this.toSequence = tableRuntimeMeta.getToSequence(); } public void recover(OptimizingProcess optimizingProcess) { @@ -474,6 +487,38 @@ public void setLastPlanTime(long lastPlanTime) { this.lastPlanTime = lastPlanTime; } + public long getTargetSnapshotId() { + return targetSnapshotId; + } + + public void setTargetSnapshotId(long targetSnapshotId) { + this.targetSnapshotId = targetSnapshotId; + } + + public long getTargetChangeSnapshotId() { + return targetChangeSnapshotId; + } + + public void setTargetChangeSnapshotId(long targetChangeSnapshotId) { + this.targetChangeSnapshotId = targetChangeSnapshotId; + } + + public Map getFromSequence() { + return fromSequence; + } + + public void setFromSequence(Map fromSequence) { + this.fromSequence = fromSequence; + } + + public Map getToSequence() { + return toSequence; + } + + public void setToSequence(Map toSequence) { + this.toSequence = toSequence; + } + @Override public String toString() { return MoreObjects.toStringHelper(this) diff --git a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/TableService.java b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/TableService.java index 6b87db18b6..44a453882a 100644 --- a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/TableService.java +++ b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/TableService.java @@ -23,6 +23,7 @@ import org.apache.amoro.api.ServerTableIdentifier; import org.apache.amoro.api.TableIdentifier; import org.apache.amoro.server.catalog.CatalogService; +import org.apache.amoro.server.persistence.TableRuntimeMeta; import java.util.List; import java.util.Map; @@ -88,4 +89,12 @@ Blocker block( * @return block list */ List getBlockers(TableIdentifier tableIdentifier); + + /** + * @param optimizerGroup The optimizer group info used to retrieve the table infos. will be null + * if we want the info for all groups. + * @param limit How many entries we want to retrieve. + * @param offset The entries we'll skip when retrieving the entries. + */ + List getTableRuntimes(String optimizerGroup, int limit, int offset); } diff --git a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/executor/BaseTableExecutor.java b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/executor/BaseTableExecutor.java index 29edacac92..879e6dcd30 100644 --- a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/executor/BaseTableExecutor.java +++ b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/executor/BaseTableExecutor.java @@ -25,7 +25,6 @@ import org.apache.amoro.server.table.RuntimeHandlerChain; import org.apache.amoro.server.table.TableManager; import org.apache.amoro.server.table.TableRuntime; -import org.apache.amoro.server.table.TableRuntimeMeta; import org.apache.amoro.shade.guava32.com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; @@ -63,9 +62,9 @@ protected BaseTableExecutor(TableManager tableManager, int poolSize) { } @Override - protected void initHandler(List tableRuntimeMetaList) { + protected void initHandler(List tableRuntimeMetaList) { tableRuntimeMetaList.stream() - .map(tableRuntimeMeta -> tableRuntimeMeta.getTableRuntime()) + // .map(tableRuntimeMeta -> tableRuntimeMeta.getTableRuntime()) .filter(tableRuntime -> enabled(tableRuntime)) .forEach( tableRuntime -> { diff --git a/amoro-ams/amoro-ams-server/src/main/resources/log4j2.xml b/amoro-ams/amoro-ams-server/src/main/resources/log4j2.xml index 7602b18332..6a03ab2b31 100644 --- a/amoro-ams/amoro-ams-server/src/main/resources/log4j2.xml +++ b/amoro-ams/amoro-ams-server/src/main/resources/log4j2.xml @@ -40,6 +40,7 @@ + diff --git a/amoro-ams/amoro-ams-server/src/test/java/org/apache/amoro/server/optimizing/TestOptimizingQueue.java b/amoro-ams/amoro-ams-server/src/test/java/org/apache/amoro/server/optimizing/TestOptimizingQueue.java index 25e24565ee..62bd0fba53 100644 --- a/amoro-ams/amoro-ams-server/src/test/java/org/apache/amoro/server/optimizing/TestOptimizingQueue.java +++ b/amoro-ams/amoro-ams-server/src/test/java/org/apache/amoro/server/optimizing/TestOptimizingQueue.java @@ -45,12 +45,12 @@ import org.apache.amoro.optimizing.TableOptimizing; import org.apache.amoro.server.manager.MetricManager; import org.apache.amoro.server.metrics.MetricRegistry; +import org.apache.amoro.server.persistence.TableRuntimeMeta; import org.apache.amoro.server.resource.OptimizerInstance; import org.apache.amoro.server.resource.OptimizerThread; import org.apache.amoro.server.resource.QuotaProvider; import org.apache.amoro.server.table.AMSTableTestBase; import org.apache.amoro.server.table.TableRuntime; -import org.apache.amoro.server.table.TableRuntimeMeta; import org.apache.amoro.shade.guava32.com.google.common.collect.ImmutableMap; import org.apache.amoro.shade.guava32.com.google.common.collect.Lists; import org.apache.amoro.table.MixedTable; @@ -103,7 +103,7 @@ protected static ResourceGroup testResourceGroup() { return new ResourceGroup.Builder("test", "local").build(); } - protected OptimizingQueue buildOptimizingGroupService(TableRuntimeMeta tableRuntimeMeta) { + protected OptimizingQueue buildOptimizingGroupService(TableRuntime tableRuntimeMeta) { return new OptimizingQueue( tableService(), testResourceGroup(), @@ -125,7 +125,7 @@ private OptimizingQueue buildOptimizingGroupService() { @Test public void testPollNoTask() { - TableRuntimeMeta tableRuntimeMeta = + TableRuntime tableRuntimeMeta = buildTableRuntimeMeta(OptimizingStatus.PENDING, defaultResourceGroup()); OptimizingQueue queue = buildOptimizingGroupService(tableRuntimeMeta); Assert.assertNull(queue.pollTask(0)); @@ -136,24 +136,24 @@ public void testPollNoTask() { public void testRefreshAndReleaseTable() { OptimizingQueue queue = buildOptimizingGroupService(); Assert.assertEquals(0, queue.getSchedulingPolicy().getTableRuntimeMap().size()); - TableRuntimeMeta tableRuntimeMeta = + TableRuntime tableRuntimeMeta = buildTableRuntimeMeta(OptimizingStatus.IDLE, defaultResourceGroup()); - queue.refreshTable(tableRuntimeMeta.getTableRuntime()); + queue.refreshTable(tableRuntimeMeta); Assert.assertEquals(1, queue.getSchedulingPolicy().getTableRuntimeMap().size()); Assert.assertTrue( queue.getSchedulingPolicy().getTableRuntimeMap().containsKey(serverTableIdentifier())); - queue.releaseTable(tableRuntimeMeta.getTableRuntime()); + queue.releaseTable(tableRuntimeMeta); Assert.assertEquals(0, queue.getSchedulingPolicy().getTableRuntimeMap().size()); - queue.refreshTable(tableRuntimeMeta.getTableRuntime()); + queue.refreshTable(tableRuntimeMeta); Assert.assertEquals(1, queue.getSchedulingPolicy().getTableRuntimeMap().size()); queue.dispose(); } @Test public void testPollTask() { - TableRuntimeMeta tableRuntimeMeta = initTableWithFiles(); + TableRuntime tableRuntimeMeta = initTableWithFiles(); OptimizingQueue queue = buildOptimizingGroupService(tableRuntimeMeta); // 1.poll task @@ -167,7 +167,7 @@ public void testPollTask() { @Test public void testRetryTask() { - TableRuntimeMeta tableRuntimeMeta = initTableWithFiles(); + TableRuntime tableRuntimeMeta = initTableWithFiles(); OptimizingQueue queue = buildOptimizingGroupService(tableRuntimeMeta); // 1.poll task @@ -200,7 +200,7 @@ public void testRetryTask() { @Test public void testCommitTask() { - TableRuntimeMeta tableRuntimeMeta = initTableWithFiles(); + TableRuntime tableRuntimeMeta = initTableWithFiles(); OptimizingQueue queue = buildOptimizingGroupService(tableRuntimeMeta); Assert.assertEquals(0, queue.collectTasks().size()); @@ -216,11 +216,11 @@ public void testCommitTask() { Assert.assertEquals(TaskRuntime.Status.SUCCESS, task.getStatus()); // 7.commit - OptimizingProcess optimizingProcess = tableRuntimeMeta.getTableRuntime().getOptimizingProcess(); + OptimizingProcess optimizingProcess = tableRuntimeMeta.getOptimizingProcess(); Assert.assertEquals(OptimizingProcess.Status.RUNNING, optimizingProcess.getStatus()); optimizingProcess.commit(); Assert.assertEquals(OptimizingProcess.Status.SUCCESS, optimizingProcess.getStatus()); - Assert.assertNull(tableRuntimeMeta.getTableRuntime().getOptimizingProcess()); + Assert.assertNull(tableRuntimeMeta.getOptimizingProcess()); // 8.commit again, throw exceptions, and status not changed. Assert.assertThrows(IllegalStateException.class, optimizingProcess::commit); @@ -232,7 +232,7 @@ public void testCommitTask() { @Test public void testCollectingTasks() { - TableRuntimeMeta tableRuntimeMeta = initTableWithFiles(); + TableRuntime tableRuntimeMeta = initTableWithFiles(); OptimizingQueue queue = buildOptimizingGroupService(tableRuntimeMeta); Assert.assertEquals(0, queue.collectTasks().size()); @@ -247,7 +247,7 @@ public void testCollectingTasks() { @Test public void testTaskAndTableMetrics() { - TableRuntimeMeta tableRuntimeMeta = initTableWithFiles(); + TableRuntime tableRuntimeMeta = initTableWithFiles(); OptimizingQueue queue = buildOptimizingGroupService(tableRuntimeMeta); MetricRegistry registry = MetricManager.getInstance().getGlobalRegistry(); Map tagValues = ImmutableMap.of(GROUP_TAG, testResourceGroup().getName()); @@ -299,7 +299,7 @@ public void testTaskAndTableMetrics() { Assert.assertEquals(0, pendingTablesGauge.getValue().longValue()); Assert.assertEquals(1, executingTablesGauge.getValue().longValue()); - OptimizingProcess optimizingProcess = tableRuntimeMeta.getTableRuntime().getOptimizingProcess(); + OptimizingProcess optimizingProcess = tableRuntimeMeta.getOptimizingProcess(); optimizingProcess.commit(); Assert.assertEquals(0, queueTasksGauge.getValue().longValue()); Assert.assertEquals(0, executingTasksGauge.getValue().longValue()); @@ -345,21 +345,20 @@ public void testAddAndRemoveOptimizers() { queue.dispose(); } - protected TableRuntimeMeta initTableWithFiles() { + protected TableRuntime initTableWithFiles() { MixedTable mixedTable = (MixedTable) tableService().loadTable(serverTableIdentifier()).originalTable(); appendData(mixedTable.asUnkeyedTable(), 1); appendData(mixedTable.asUnkeyedTable(), 2); - TableRuntimeMeta tableRuntimeMeta = + TableRuntime tableRuntimeMeta = buildTableRuntimeMeta(OptimizingStatus.PENDING, defaultResourceGroup()); - TableRuntime runtime = tableRuntimeMeta.getTableRuntime(); + // TableRuntime runtime = tableRuntimeMeta.getTableRuntime(); - runtime.refresh(tableService().loadTable(serverTableIdentifier())); + tableRuntimeMeta.refresh(tableService().loadTable(serverTableIdentifier())); return tableRuntimeMeta; } - private TableRuntimeMeta buildTableRuntimeMeta( - OptimizingStatus status, ResourceGroup resourceGroup) { + private TableRuntime buildTableRuntimeMeta(OptimizingStatus status, ResourceGroup resourceGroup) { MixedTable mixedTable = (MixedTable) tableService().loadTable(serverTableIdentifier()).originalTable(); TableRuntimeMeta tableRuntimeMeta = new TableRuntimeMeta(); @@ -371,8 +370,9 @@ private TableRuntimeMeta buildTableRuntimeMeta( tableRuntimeMeta.setTableStatus(status); tableRuntimeMeta.setTableConfig(TableConfiguration.parseConfig(mixedTable.properties())); tableRuntimeMeta.setOptimizerGroup(resourceGroup.getName()); - tableRuntimeMeta.constructTableRuntime(tableService()); - return tableRuntimeMeta; + // tableRuntimeMeta.constructTableRuntime(tableService()); + TableRuntime runtime = new TableRuntime(tableRuntimeMeta, tableService()); + return runtime; } private void appendData(UnkeyedTable table, int id) { diff --git a/amoro-ams/amoro-ams-server/src/test/java/org/apache/amoro/server/table/TestTableRuntimeHandler.java b/amoro-ams/amoro-ams-server/src/test/java/org/apache/amoro/server/table/TestTableRuntimeHandler.java index c641fb258d..a5b5d5a02b 100644 --- a/amoro-ams/amoro-ams-server/src/test/java/org/apache/amoro/server/table/TestTableRuntimeHandler.java +++ b/amoro-ams/amoro-ams-server/src/test/java/org/apache/amoro/server/table/TestTableRuntimeHandler.java @@ -92,7 +92,8 @@ public void testInitialize() throws Exception { tableService.initialize(); Assert.assertEquals(1, handler.getInitTables().size()); Assert.assertEquals( - createTableId.getId().longValue(), handler.getInitTables().get(0).getTableId()); + (Long) createTableId.getId().longValue(), + handler.getInitTables().get(0).getTableIdentifier().getId()); // test change properties MixedTable mixedTable = (MixedTable) tableService().loadTable(createTableId).originalTable(); @@ -131,7 +132,7 @@ protected DefaultTableService tableService() { static class TestHandler extends RuntimeHandlerChain { - private final List initTables = Lists.newArrayList(); + private final List initTables = Lists.newArrayList(); private final List> statusChangedTables = Lists.newArrayList(); private final List> configChangedTables = @@ -162,7 +163,7 @@ protected void handleTableRemoved(TableRuntime tableRuntime) { } @Override - protected void initHandler(List tableRuntimeMetaList) { + protected void initHandler(List tableRuntimeMetaList) { initTables.addAll(tableRuntimeMetaList); } @@ -171,7 +172,7 @@ protected void doDispose() { disposed = true; } - public List getInitTables() { + public List getInitTables() { return initTables; } diff --git a/amoro-ams/dist/src/main/amoro-bin/conf/config.yaml b/amoro-ams/dist/src/main/amoro-bin/conf/config.yaml index cdea700842..6eed3a2f4b 100644 --- a/amoro-ams/dist/src/main/amoro-bin/conf/config.yaml +++ b/amoro-ams/dist/src/main/amoro-bin/conf/config.yaml @@ -88,9 +88,6 @@ ams: type: derby jdbc-driver-class: org.apache.derby.jdbc.EmbeddedDriver url: jdbc:derby:/tmp/amoro/derby;create=true - connection-pool-max-total: 20 - connection-pool-max-idle: 16 - connection-pool-max-wait-millis: 1000 # MySQL database configuration. # database: