From 779fc062e4724142ed6148ce6a993878bce6c9ec Mon Sep 17 00:00:00 2001 From: klion26 Date: Thu, 6 Jun 2024 11:26:16 +0800 Subject: [PATCH] [AMORO-2893] Optimizing the efficiency for restapi retrieve optimizing tables Befor the change we'll retrieve all tables from db and sort in the memory, this will be slow if there are many entriesin the db, after this change we sort in the db and only retrieve the returned entries. --- amoro-ams/amoro-ams-dashboard/pom.xml | 11 +++ .../server/DefaultOptimizingService.java | 11 ++- .../controller/OptimizerController.java | 42 +++------ .../dashboard/controller/TableController.java | 6 +- .../server/optimizing/OptimizingQueue.java | 36 ++++---- .../server/optimizing/OptimizingStatus.java | 31 +++++-- .../TableRuntimeMeta.java | 19 +--- .../converter/OptimizingStatusConverter.java | 61 +++++++++++++ .../persistence/mapper/TableMetaMapper.java | 87 +++++++++++++++++-- .../server/table/DefaultTableService.java | 61 ++++++++----- .../server/table/RuntimeHandlerChain.java | 13 ++- .../amoro/server/table/TableManager.java | 6 +- .../amoro/server/table/TableRuntime.java | 53 ++++++++++- .../amoro/server/table/TableService.java | 21 +++++ .../table/executor/BaseTableExecutor.java | 9 +- .../main/resources/derby/ams-derby-init.sql | 2 +- .../mysql/upgrade-0.7.0-to-0.7.1.sql | 24 +++++ .../server/RestCatalogServiceTestBase.java | 2 +- .../server/TestDefaultOptimizingService.java | 11 ++- .../optimizing/OptimizingStatusTest.java | 38 ++++++++ .../optimizing/TestOptimizingQueue.java | 53 ++++++----- .../server/table/TestTableRuntimeHandler.java | 11 +-- .../server/table/TestTableRuntimeManager.java | 8 +- .../amoro/server/table/TestTableService.java | 2 +- 24 files changed, 448 insertions(+), 170 deletions(-) rename amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/{table => persistence}/TableRuntimeMeta.java (94%) create mode 100644 amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/persistence/converter/OptimizingStatusConverter.java create mode 100644 amoro-ams/amoro-ams-server/src/main/resources/mysql/upgrade-0.7.0-to-0.7.1.sql create mode 100644 amoro-ams/amoro-ams-server/src/test/java/org/apache/amoro/server/optimizing/OptimizingStatusTest.java diff --git a/amoro-ams/amoro-ams-dashboard/pom.xml b/amoro-ams/amoro-ams-dashboard/pom.xml index 1a5e2fb20a..9d8f31c6b1 100644 --- a/amoro-ams/amoro-ams-dashboard/pom.xml +++ b/amoro-ams/amoro-ams-dashboard/pom.xml @@ -140,5 +140,16 @@ + + use-alibaba-mirror + + + use-alibaba-mirror + + + + --registry http://172.17.0.1:8888/repository/npm/ + + diff --git a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java index 10e6ecea98..00bd1d9b7b 100644 --- a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java +++ b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java @@ -49,7 +49,6 @@ import org.apache.amoro.server.table.DefaultTableService; import org.apache.amoro.server.table.RuntimeHandlerChain; import org.apache.amoro.server.table.TableRuntime; -import org.apache.amoro.server.table.TableRuntimeMeta; import org.apache.amoro.server.table.TableService; import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions; import org.apache.amoro.shade.guava32.com.google.common.collect.ImmutableList; @@ -121,17 +120,17 @@ public RuntimeHandlerChain getTableRuntimeHandler() { return tableHandlerChain; } - private void loadOptimizingQueues(List tableRuntimeMetaList) { + private void loadOptimizingQueues(List tableRuntimeMetaList) { List optimizerGroups = getAs(ResourceMapper.class, ResourceMapper::selectResourceGroups); List optimizers = getAs(OptimizerMapper.class, OptimizerMapper::selectAll); - Map> groupToTableRuntimes = + Map> groupToTableRuntimes = tableRuntimeMetaList.stream() - .collect(Collectors.groupingBy(TableRuntimeMeta::getOptimizerGroup)); + .collect(Collectors.groupingBy(TableRuntime::getOptimizerGroup)); optimizerGroups.forEach( group -> { String groupName = group.getName(); - List tableRuntimeMetas = groupToTableRuntimes.remove(groupName); + List tableRuntimeMetas = groupToTableRuntimes.remove(groupName); OptimizingQueue optimizingQueue = new OptimizingQueue( tableService, @@ -455,7 +454,7 @@ public void handleTableRemoved(TableRuntime tableRuntime) { } @Override - protected void initHandler(List tableRuntimeMetaList) { + protected void initHandler(List tableRuntimeMetaList) { LOG.info("OptimizerManagementService begin initializing"); loadOptimizingQueues(tableRuntimeMetaList); optimizerKeeper.start(); diff --git a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/controller/OptimizerController.java b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/controller/OptimizerController.java index aef413d694..e57638f8b4 100644 --- a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/controller/OptimizerController.java +++ b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/controller/OptimizerController.java @@ -19,7 +19,6 @@ package org.apache.amoro.server.dashboard.controller; import io.javalin.http.Context; -import org.apache.amoro.api.ServerTableIdentifier; import org.apache.amoro.api.resource.Resource; import org.apache.amoro.api.resource.ResourceGroup; import org.apache.amoro.api.resource.ResourceType; @@ -30,13 +29,13 @@ import org.apache.amoro.server.dashboard.response.OkResponse; import org.apache.amoro.server.dashboard.response.PageResult; import org.apache.amoro.server.dashboard.utils.OptimizingUtil; +import org.apache.amoro.server.persistence.TableRuntimeMeta; import org.apache.amoro.server.resource.ContainerMetadata; import org.apache.amoro.server.resource.OptimizerInstance; import org.apache.amoro.server.resource.ResourceContainers; import org.apache.amoro.server.table.TableRuntime; import org.apache.amoro.server.table.TableService; import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions; -import org.apache.commons.lang3.StringUtils; import javax.ws.rs.BadRequestException; @@ -67,34 +66,17 @@ public void getOptimizerTables(Context ctx) { Integer pageSize = ctx.queryParamAsClass("pageSize", Integer.class).getOrDefault(20); int offset = (page - 1) * pageSize; - List tableRuntimes = new ArrayList<>(); - List tables = tableService.listManagedTables(); - for (ServerTableIdentifier identifier : tables) { - TableRuntime tableRuntime = tableService.getRuntime(identifier); - if (tableRuntime == null) { - continue; - } - if ((ALL_GROUP.equals(optimizerGroup) - || tableRuntime.getOptimizerGroup().equals(optimizerGroup)) - && (StringUtils.isEmpty(dbFilterStr) - || StringUtils.containsIgnoreCase(identifier.getDatabase(), dbFilterStr)) - && (StringUtils.isEmpty(tableFilterStr) - || StringUtils.containsIgnoreCase(identifier.getTableName(), tableFilterStr))) { - tableRuntimes.add(tableRuntime); - } - } - tableRuntimes.sort( - (o1, o2) -> { - // first we compare the status , and then we compare the start time when status are equal; - int statDiff = o1.getOptimizingStatus().compareTo(o2.getOptimizingStatus()); - // status order is asc, startTime order is desc - if (statDiff == 0) { - long timeDiff = o1.getCurrentStatusStartTime() - o2.getCurrentStatusStartTime(); - return timeDiff >= 0 ? (timeDiff == 0 ? 0 : -1) : 1; - } else { - return statDiff; - } - }); + String optimizerGroupUsedInDbFilter = "all".equals(optimizerGroup) ? null : optimizerGroup; + // get all info from underlying table table_runtime + List tableRuntimeBeans = + tableService.getTableRuntimes( + optimizerGroupUsedInDbFilter, dbFilterStr, tableFilterStr, pageSize, offset); + + List tableRuntimes = + tableRuntimeBeans.stream() + .map(meta -> tableService.getRuntime(meta.getTableId())) + .collect(Collectors.toList()); + PageResult amsPageResult = PageResult.of(tableRuntimes, offset, pageSize, OptimizingUtil::buildTableOptimizeInfo); ctx.json(OkResponse.of(amsPageResult)); diff --git a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/controller/TableController.java b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/controller/TableController.java index 8d6884f47f..dfcea2982b 100644 --- a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/controller/TableController.java +++ b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/controller/TableController.java @@ -148,7 +148,7 @@ public void getTableDetail(Context ctx) { if (serverTableIdentifier.isPresent()) { tableSummary.put( "optimizingStatus", - tableService.getRuntime(serverTableIdentifier.get()).getOptimizingStatus()); + tableService.getRuntime(serverTableIdentifier.get().getId()).getOptimizingStatus()); } else { tableSummary.put("optimizingStatus", OptimizingStatus.IDLE); } @@ -650,7 +650,9 @@ public void cancelOptimizingProcess(Context ctx) { tableService.getServerTableIdentifier( TableIdentifier.of(catalog, db, table).buildTableIdentifier()); TableRuntime tableRuntime = - serverTableIdentifier != null ? tableService.getRuntime(serverTableIdentifier) : null; + serverTableIdentifier != null + ? tableService.getRuntime(serverTableIdentifier.getId()) + : null; Preconditions.checkArgument( tableRuntime != null diff --git a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java index 4752ca62ca..e0cf814abd 100644 --- a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java +++ b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java @@ -36,7 +36,6 @@ import org.apache.amoro.server.resource.QuotaProvider; import org.apache.amoro.server.table.TableManager; import org.apache.amoro.server.table.TableRuntime; -import org.apache.amoro.server.table.TableRuntimeMeta; import org.apache.amoro.shade.guava32.com.google.common.annotations.VisibleForTesting; import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions; import org.apache.amoro.shade.guava32.com.google.common.collect.Lists; @@ -93,7 +92,7 @@ public OptimizingQueue( ResourceGroup optimizerGroup, QuotaProvider quotaProvider, Executor planExecutor, - List tableRuntimeMetaList, + List tableRuntimeMetaList, int maxPlanningParallelism) { Preconditions.checkNotNull(optimizerGroup, "Optimizer group can not be null"); this.planExecutor = planExecutor; @@ -109,11 +108,10 @@ public OptimizingQueue( tableRuntimeMetaList.forEach(this::initTableRuntime); } - private void initTableRuntime(TableRuntimeMeta tableRuntimeMeta) { - TableRuntime tableRuntime = tableRuntimeMeta.getTableRuntime(); + private void initTableRuntime(TableRuntime tableRuntime) { if (tableRuntime.getOptimizingStatus().isProcessing() - && tableRuntimeMeta.getOptimizingProcessId() != 0) { - tableRuntime.recover(new TableOptimizingProcess(tableRuntimeMeta)); + && tableRuntime.getOptimizingProcess().getProcessId() != 0) { + tableRuntime.recover(new TableOptimizingProcess(tableRuntime)); } if (tableRuntime.isOptimizingEnabled()) { @@ -122,7 +120,7 @@ private void initTableRuntime(TableRuntimeMeta tableRuntimeMeta) { if (!tableRuntime.getOptimizingStatus().isProcessing()) { scheduler.addTable(tableRuntime); } else if (tableRuntime.getOptimizingStatus() != OptimizingStatus.COMMITTING) { - tableQueue.offer(new TableOptimizingProcess(tableRuntimeMeta)); + tableQueue.offer(new TableOptimizingProcess(tableRuntime)); } } else { OptimizingProcess process = tableRuntime.getOptimizingProcess(); @@ -376,21 +374,21 @@ public TableOptimizingProcess(OptimizingPlanner planner) { beginAndPersistProcess(); } - public TableOptimizingProcess(TableRuntimeMeta tableRuntimeMeta) { - processId = tableRuntimeMeta.getOptimizingProcessId(); - tableRuntime = tableRuntimeMeta.getTableRuntime(); - optimizingType = tableRuntimeMeta.getOptimizingType(); - targetSnapshotId = tableRuntimeMeta.getTargetSnapshotId(); - targetChangeSnapshotId = tableRuntimeMeta.getTargetChangeSnapshotId(); - planTime = tableRuntimeMeta.getPlanTime(); - if (tableRuntimeMeta.getFromSequence() != null) { - fromSequence = tableRuntimeMeta.getFromSequence(); + public TableOptimizingProcess(TableRuntime tableRuntime) { + processId = tableRuntime.getOptimizingProcess().getProcessId(); + this.tableRuntime = tableRuntime; + optimizingType = tableRuntime.getOptimizingProcess().getOptimizingType(); + targetSnapshotId = tableRuntime.getTargetSnapshotId(); + targetChangeSnapshotId = tableRuntime.getTargetChangeSnapshotId(); + planTime = tableRuntime.getLastPlanTime(); + if (tableRuntime.getFromSequence() != null) { + fromSequence = tableRuntime.getFromSequence(); } - if (tableRuntimeMeta.getToSequence() != null) { - toSequence = tableRuntimeMeta.getToSequence(); + if (tableRuntime.getToSequence() != null) { + toSequence = tableRuntime.getToSequence(); } if (this.status != OptimizingProcess.Status.CLOSED) { - tableRuntimeMeta.getTableRuntime().recover(this); + tableRuntime.recover(this); } loadTaskRuntimes(this); } diff --git a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/optimizing/OptimizingStatus.java b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/optimizing/OptimizingStatus.java index 7faca66261..c9d3c152ab 100644 --- a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/optimizing/OptimizingStatus.java +++ b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/optimizing/OptimizingStatus.java @@ -19,20 +19,23 @@ package org.apache.amoro.server.optimizing; public enum OptimizingStatus { - FULL_OPTIMIZING("full", true), - MAJOR_OPTIMIZING("major", true), - MINOR_OPTIMIZING("minor", true), - COMMITTING("committing", true), - PLANNING("planning", false), - PENDING("pending", false), - IDLE("idle", false); + FULL_OPTIMIZING("full", true, 0), + MAJOR_OPTIMIZING("major", true, 1), + MINOR_OPTIMIZING("minor", true, 2), + COMMITTING("committing", true, 3), + PLANNING("planning", false, 4), + PENDING("pending", false, 5), + IDLE("idle", false, 6); private final String displayValue; private final boolean isProcessing; - OptimizingStatus(String displayValue, boolean isProcessing) { + private final int code; + + OptimizingStatus(String displayValue, boolean isProcessing, int code) { this.displayValue = displayValue; this.isProcessing = isProcessing; + this.code = code; } public boolean isProcessing() { @@ -42,4 +45,16 @@ public boolean isProcessing() { public String displayValue() { return displayValue; } + + public int getCode() { + return code; + } + + public static OptimizingStatus ofCode(int code) { + if (code >= 0 && code <= values().length) { + return values()[code]; + } + + return null; + } } diff --git a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/TableRuntimeMeta.java b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/persistence/TableRuntimeMeta.java similarity index 94% rename from amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/TableRuntimeMeta.java rename to amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/persistence/TableRuntimeMeta.java index 92ca2e6a36..51d7f0fda1 100644 --- a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/TableRuntimeMeta.java +++ b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/persistence/TableRuntimeMeta.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.amoro.server.table; +package org.apache.amoro.server.persistence; import org.apache.amoro.TableFormat; import org.apache.amoro.api.config.TableConfiguration; @@ -27,6 +27,7 @@ import java.util.Map; +/** The class for table used when transfer data from/to database. */ public class TableRuntimeMeta { private long tableId; private String catalogName; @@ -57,24 +58,8 @@ public class TableRuntimeMeta { private Map fromSequence; private Map toSequence; - private TableRuntime tableRuntime; - public TableRuntimeMeta() {} - public TableRuntime constructTableRuntime(TableManager initializer) { - if (tableRuntime == null) { - tableRuntime = new TableRuntime(this, initializer); - } - return tableRuntime; - } - - public TableRuntime getTableRuntime() { - if (tableRuntime == null) { - throw new IllegalStateException("TableRuntime is not constructed yet."); - } - return tableRuntime; - } - public long getTargetSnapshotId() { return targetSnapshotId; } diff --git a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/persistence/converter/OptimizingStatusConverter.java b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/persistence/converter/OptimizingStatusConverter.java new file mode 100644 index 0000000000..d314bb41bc --- /dev/null +++ b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/persistence/converter/OptimizingStatusConverter.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.server.persistence.converter; + +import org.apache.amoro.server.optimizing.OptimizingStatus; +import org.apache.ibatis.type.BaseTypeHandler; +import org.apache.ibatis.type.JdbcType; +import org.apache.ibatis.type.MappedJdbcTypes; +import org.apache.ibatis.type.MappedTypes; + +import java.sql.CallableStatement; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; + +@MappedJdbcTypes(JdbcType.INTEGER) +@MappedTypes(Enum.class) +public class OptimizingStatusConverter extends BaseTypeHandler { + + @Override + public void setNonNullParameter( + PreparedStatement ps, int i, OptimizingStatus parameter, JdbcType jdbcType) + throws SQLException { + ps.setInt(i, parameter.getCode()); + } + + @Override + public OptimizingStatus getNullableResult(ResultSet rs, String columnName) throws SQLException { + String s = rs.getString(columnName); + return s == null ? null : OptimizingStatus.ofCode(Integer.parseInt(s)); + } + + @Override + public OptimizingStatus getNullableResult(ResultSet rs, int columnIndex) throws SQLException { + String s = rs.getString(columnIndex); + return s == null ? null : OptimizingStatus.ofCode(Integer.parseInt(s)); + } + + @Override + public OptimizingStatus getNullableResult(CallableStatement cs, int columnIndex) + throws SQLException { + String s = cs.getString(columnIndex); + return s == null ? null : OptimizingStatus.ofCode(Integer.parseInt(s)); + } +} diff --git a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/persistence/mapper/TableMetaMapper.java b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/persistence/mapper/TableMetaMapper.java index b01e4ff4c4..5a9dc52a99 100644 --- a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/persistence/mapper/TableMetaMapper.java +++ b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/persistence/mapper/TableMetaMapper.java @@ -19,13 +19,14 @@ package org.apache.amoro.server.persistence.mapper; import org.apache.amoro.api.ServerTableIdentifier; +import org.apache.amoro.server.persistence.TableRuntimeMeta; import org.apache.amoro.server.persistence.converter.JsonObjectConverter; import org.apache.amoro.server.persistence.converter.Long2TsConverter; import org.apache.amoro.server.persistence.converter.Map2StringConverter; import org.apache.amoro.server.persistence.converter.MapLong2StringConverter; +import org.apache.amoro.server.persistence.converter.OptimizingStatusConverter; import org.apache.amoro.server.table.TableMetadata; import org.apache.amoro.server.table.TableRuntime; -import org.apache.amoro.server.table.TableRuntimeMeta; import org.apache.ibatis.annotations.Delete; import org.apache.ibatis.annotations.Insert; import org.apache.ibatis.annotations.Options; @@ -321,7 +322,8 @@ List selectTableIdentifiersByCatalog( + " typeHandler=org.apache.amoro.server.persistence.converter.Long2TsConverter}," + " last_full_optimizing_time = #{runtime.lastFullOptimizingTime," + " typeHandler=org.apache.amoro.server.persistence.converter.Long2TsConverter}," - + " optimizing_status = #{runtime.optimizingStatus}," + + " optimizing_status_code = #{runtime.optimizingStatu," + + "typeHandler=org.apache.amoro.server.persistence.converter.OptimizingStatusConverter}," + " optimizing_status_start_time = #{runtime.currentStatusStartTime," + " typeHandler=org.apache.amoro.server.persistence.converter.Long2TsConverter}," + " optimizing_process_id = #{runtime.processId}," @@ -340,7 +342,7 @@ List selectTableIdentifiersByCatalog( "INSERT INTO table_runtime (table_id, catalog_name, db_name, table_name, current_snapshot_id," + " current_change_snapshotId, last_optimized_snapshotId, last_optimized_change_snapshotId," + " last_major_optimizing_time, last_minor_optimizing_time," - + " last_full_optimizing_time, optimizing_status, optimizing_status_start_time, optimizing_process_id," + + " last_full_optimizing_time, optimizing_status_code, optimizing_status_start_time, optimizing_process_id," + " optimizer_group, table_config, pending_input) VALUES" + " (#{runtime.tableIdentifier.id}, #{runtime.tableIdentifier.catalog}," + " #{runtime.tableIdentifier.database}, #{runtime.tableIdentifier.tableName}, #{runtime" @@ -352,7 +354,8 @@ List selectTableIdentifiersByCatalog( + " typeHandler=org.apache.amoro.server.persistence.converter.Long2TsConverter}," + " #{runtime.lastFullOptimizingTime," + " typeHandler=org.apache.amoro.server.persistence.converter.Long2TsConverter}," - + " #{runtime.optimizingStatus}," + + " #{runtime.optimizingStatus," + + " typeHandler=org.apache.amoro.server.persistence.converter.OptimizingStatusConverter}," + " #{runtime.currentStatusStartTime, " + " typeHandler=org.apache.amoro.server.persistence.converter.Long2TsConverter}," + " #{runtime.processId}, #{runtime.optimizerGroup}," @@ -365,7 +368,7 @@ List selectTableIdentifiersByCatalog( @Select( "SELECT a.table_id, a.catalog_name, a.db_name, a.table_name, i.format, a.current_snapshot_id, a" + ".current_change_snapshotId, a.last_optimized_snapshotId, a.last_optimized_change_snapshotId," - + " a.last_major_optimizing_time, a.last_minor_optimizing_time, a.last_full_optimizing_time, a.optimizing_status," + + " a.last_major_optimizing_time, a.last_minor_optimizing_time, a.last_full_optimizing_time, a.optimizing_status_code," + " a.optimizing_status_start_time, a.optimizing_process_id," + " a.optimizer_group, a.table_config, a.pending_input, b.optimizing_type, b.target_snapshot_id," + " b.target_change_snapshot_id, b.plan_time, b.from_sequence, b.to_sequence FROM table_runtime a" @@ -395,7 +398,10 @@ List selectTableIdentifiersByCatalog( property = "lastFullOptimizingTime", column = "last_full_optimizing_time", typeHandler = Long2TsConverter.class), - @Result(property = "tableStatus", column = "optimizing_status"), + @Result( + property = "tableStatus", + column = "optimizing_status_code", + typeHandler = OptimizingStatusConverter.class), @Result( property = "currentStatusStartTime", column = "optimizing_status_start_time", @@ -424,4 +430,73 @@ List selectTableIdentifiersByCatalog( typeHandler = MapLong2StringConverter.class) }) List selectTableRuntimeMetas(); + + @Select( + "") + @Results({ + @Result(property = "tableId", column = "table_id"), + @Result(property = "catalogName", column = "catalog_name"), + @Result(property = "dbName", column = "db_name"), + @Result(property = "tableName", column = "table_name"), + @Result(property = "currentSnapshotId", column = "current_snapshot_id"), + @Result(property = "currentChangeSnapshotId", column = "current_change_snapshotId"), + @Result(property = "lastOptimizedSnapshotId", column = "last_optimized_snapshotId"), + @Result( + property = "lastOptimizedChangeSnapshotId", + column = "last_optimized_change_snapshotId"), + @Result( + property = "lastMajorOptimizingTime", + column = "last_major_optimizing_time", + typeHandler = Long2TsConverter.class), + @Result( + property = "lastMinorOptimizingTime", + column = "last_minor_optimizing_time", + typeHandler = Long2TsConverter.class), + @Result( + property = "lastFullOptimizingTime", + column = "last_full_optimizing_time", + typeHandler = Long2TsConverter.class), + @Result( + property = "tableStatus", + column = "optimizing_status_code", + typeHandler = OptimizingStatusConverter.class), + @Result( + property = "currentStatusStartTime", + column = "optimizing_status_start_time", + typeHandler = Long2TsConverter.class), + @Result(property = "optimizingProcessId", column = "optimizing_process_id"), + @Result(property = "optimizerGroup", column = "optimizer_group"), + @Result( + property = "pendingInput", + column = "pending_input", + typeHandler = JsonObjectConverter.class), + @Result( + property = "tableConfig", + column = "table_config", + typeHandler = JsonObjectConverter.class), + }) + List selectTableRuntimesForOptimizerGroup( + @Param("optimizerGroup") String optimizerGroup, + @Param("fuzzyDbName") String fuzzyDbName, + @Param("fuzzyTableName") String fuzzyTableName, + @Param("limitCount") int limitCount, + @Param("offsetNum") int offset); } diff --git a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/DefaultTableService.java b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/DefaultTableService.java index b3266b9d7c..f18c524bde 100644 --- a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/DefaultTableService.java +++ b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/DefaultTableService.java @@ -40,6 +40,7 @@ import org.apache.amoro.server.manager.MetricManager; import org.apache.amoro.server.optimizing.OptimizingStatus; import org.apache.amoro.server.persistence.StatedPersistentBase; +import org.apache.amoro.server.persistence.TableRuntimeMeta; import org.apache.amoro.server.persistence.mapper.CatalogMetaMapper; import org.apache.amoro.server.persistence.mapper.TableMetaMapper; import org.apache.amoro.server.table.blocker.TableBlocker; @@ -55,6 +56,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nullable; + +import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -80,8 +84,7 @@ public class DefaultTableService extends StatedPersistentBase implements TableSe private final Map internalCatalogMap = new ConcurrentHashMap<>(); private final Map externalCatalogMap = new ConcurrentHashMap<>(); - private final Map tableRuntimeMap = - new ConcurrentHashMap<>(); + private final Map tableRuntimeMap = new ConcurrentHashMap<>(); private final ScheduledExecutorService tableExplorerScheduler = Executors.newSingleThreadScheduledExecutor( @@ -138,12 +141,6 @@ public ServerCatalog getServerCatalog(String catalogName) { .orElseThrow(() -> new ObjectNotExistsException("Catalog " + catalogName)); } - @Override - public InternalCatalog getInternalCatalog(String catalogName) { - return Optional.ofNullable(internalCatalogMap.get(catalogName)) - .orElseThrow(() -> new ObjectNotExistsException("Catalog " + catalogName)); - } - @Override public void createCatalog(CatalogMeta catalogMeta) { checkStarted(); @@ -208,7 +205,7 @@ public void dropTableMetadata(TableIdentifier tableIdentifier, boolean deleteDat } ServerTableIdentifier serverTableIdentifier = internalCatalog.dropTable(database, table); - Optional.ofNullable(tableRuntimeMap.remove(serverTableIdentifier)) + Optional.ofNullable(tableRuntimeMap.remove(serverTableIdentifier.getId())) .ifPresent( tableRuntime -> { if (headHandler != null) { @@ -260,7 +257,7 @@ public Blocker block( @Override public void releaseBlocker(TableIdentifier tableIdentifier, String blockerId) { checkStarted(); - TableRuntime tableRuntime = getRuntime(getServerTableIdentifier(tableIdentifier)); + TableRuntime tableRuntime = getRuntime(getServerTableIdentifier(tableIdentifier).getId()); if (tableRuntime != null) { tableRuntime.release(blockerId); } @@ -281,6 +278,26 @@ public List getBlockers(TableIdentifier tableIdentifier) { .collect(Collectors.toList()); } + @Override + public List getTableRuntimes( + String optimizerGroup, + @Nullable String fuzzyDbName, + @Nullable String fuzzyTableName, + int limit, + int offset) { + checkStarted(); + return getAs( + TableMetaMapper.class, + mapper -> + mapper.selectTableRuntimesForOptimizerGroup( + optimizerGroup, fuzzyDbName, fuzzyTableName, limit, offset)); + } + + public InternalCatalog getInternalCatalog(String catalogName) { + return Optional.ofNullable(internalCatalogMap.get(catalogName)) + .orElseThrow(() -> new ObjectNotExistsException("Catalog " + catalogName)); + } + @Override public void addHandlerChain(RuntimeHandlerChain handler) { checkNotStarted(); @@ -313,15 +330,17 @@ public void initialize() { List tableRuntimeMetaList = getAs(TableMetaMapper.class, TableMetaMapper::selectTableRuntimeMetas); + List tableRuntimes = new ArrayList<>(tableRuntimeMetaList.size()); tableRuntimeMetaList.forEach( tableRuntimeMeta -> { - TableRuntime tableRuntime = tableRuntimeMeta.constructTableRuntime(this); - tableRuntimeMap.put(tableRuntime.getTableIdentifier(), tableRuntime); + TableRuntime tableRuntime = new TableRuntime(tableRuntimeMeta, this); + tableRuntimeMap.put(tableRuntimeMeta.getTableId(), tableRuntime); tableRuntime.registerMetric(MetricManager.getInstance().getGlobalRegistry()); + tableRuntimes.add(tableRuntime); }); if (headHandler != null) { - headHandler.initialize(tableRuntimeMetaList); + headHandler.initialize(tableRuntimes); } if (tableExplorerExecutors == null) { int threadCount = @@ -348,7 +367,7 @@ public void initialize() { private TableRuntime getAndCheckExist(ServerTableIdentifier tableIdentifier) { Preconditions.checkArgument(tableIdentifier != null, "tableIdentifier cannot be null"); - TableRuntime tableRuntime = getRuntime(tableIdentifier); + TableRuntime tableRuntime = getRuntime(tableIdentifier.getId()); if (tableRuntime == null) { throw new ObjectNotExistsException(tableIdentifier); } @@ -384,15 +403,15 @@ private ServerTableIdentifier getOrSyncServerTableIdentifier(TableIdentifier id) } @Override - public TableRuntime getRuntime(ServerTableIdentifier tableIdentifier) { + public TableRuntime getRuntime(Long tableId) { checkStarted(); - return tableRuntimeMap.get(tableIdentifier); + return tableRuntimeMap.get(tableId); } @Override - public boolean contains(ServerTableIdentifier tableIdentifier) { + public boolean contains(Long tableId) { checkStarted(); - return tableRuntimeMap.containsKey(tableIdentifier); + return tableRuntimeMap.containsKey(tableId); } public void dispose() { @@ -582,7 +601,7 @@ private boolean triggerTableAdded( } } TableRuntime tableRuntime = new TableRuntime(serverTableIdentifier, this, table.properties()); - tableRuntimeMap.put(serverTableIdentifier, tableRuntime); + tableRuntimeMap.put(serverTableIdentifier.getId(), tableRuntime); tableRuntime.registerMetric(MetricManager.getInstance().getGlobalRegistry()); if (headHandler != null) { headHandler.fireTableAdded(table, tableRuntime); @@ -596,7 +615,7 @@ private void revertTableRuntimeAdded( externalCatalog.getServerTableIdentifier( tableIdentity.getDatabase(), tableIdentity.getTableName()); if (tableIdentifier != null) { - tableRuntimeMap.remove(tableIdentifier); + tableRuntimeMap.remove(tableIdentifier.getId()); } } @@ -608,7 +627,7 @@ private void disposeTable(ServerTableIdentifier tableIdentifier) { tableIdentifier.getCatalog(), tableIdentifier.getDatabase(), tableIdentifier.getTableName())); - Optional.ofNullable(tableRuntimeMap.remove(tableIdentifier)) + Optional.ofNullable(tableRuntimeMap.remove(tableIdentifier.getId())) .ifPresent( tableRuntime -> { if (headHandler != null) { diff --git a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/RuntimeHandlerChain.java b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/RuntimeHandlerChain.java index bb3e64ceef..3331b427c1 100644 --- a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/RuntimeHandlerChain.java +++ b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/RuntimeHandlerChain.java @@ -51,16 +51,15 @@ protected void appendNext(RuntimeHandlerChain handler) { } } - public final void initialize(List tableRuntimeMetaList) { - List supportedtableRuntimeMetaList = - tableRuntimeMetaList.stream() - .filter( - tableRuntimeMeta -> formatSupported(tableRuntimeMeta.getTableRuntime().getFormat())) + public final void initialize(List tableRuntimes) { + List supportedtableRuntimeMetaList = + tableRuntimes.stream() + .filter(runtime -> formatSupported(runtime.getFormat())) .collect(Collectors.toList()); initHandler(supportedtableRuntimeMetaList); initialized = true; if (next != null) { - next.initialize(tableRuntimeMetaList); + next.initialize(tableRuntimes); } } @@ -147,7 +146,7 @@ protected abstract void handleConfigChanged( protected abstract void handleTableRemoved(TableRuntime tableRuntime); - protected abstract void initHandler(List tableRuntimeMetaList); + protected abstract void initHandler(List tableRuntimeMetaList); protected abstract void doDispose(); } diff --git a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/TableManager.java b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/TableManager.java index d5fffcdf82..65a1aa3456 100644 --- a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/TableManager.java +++ b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/TableManager.java @@ -31,9 +31,9 @@ public interface TableManager extends TableRuntimeHandler { */ AmoroTable loadTable(ServerTableIdentifier tableIdentifier); - TableRuntime getRuntime(ServerTableIdentifier tableIdentifier); + TableRuntime getRuntime(Long tableId); - default boolean contains(ServerTableIdentifier tableIdentifier) { - return getRuntime(tableIdentifier) != null; + default boolean contains(Long tableId) { + return getRuntime(tableId) != null; } } diff --git a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/TableRuntime.java b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/TableRuntime.java index e97143133c..fd335bf3ce 100644 --- a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/TableRuntime.java +++ b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/TableRuntime.java @@ -35,6 +35,7 @@ import org.apache.amoro.server.optimizing.TaskRuntime; import org.apache.amoro.server.optimizing.plan.OptimizingEvaluator; import org.apache.amoro.server.persistence.StatedPersistentBase; +import org.apache.amoro.server.persistence.TableRuntimeMeta; import org.apache.amoro.server.persistence.mapper.OptimizingMapper; import org.apache.amoro.server.persistence.mapper.TableBlockerMapper; import org.apache.amoro.server.persistence.mapper.TableMetaMapper; @@ -97,7 +98,14 @@ public class TableRuntime extends StatedPersistentBase { private final TableOptimizingMetrics optimizingMetrics; private final ReentrantLock blockerLock = new ReentrantLock(); - protected TableRuntime( + private long targetSnapshotId; + + private long targetChangeSnapshotId; + + private Map fromSequence; + private Map toSequence; + + public TableRuntime( ServerTableIdentifier tableIdentifier, TableRuntimeHandler tableHandler, Map properties) { @@ -111,9 +119,10 @@ protected TableRuntime( optimizingMetrics = new TableOptimizingMetrics(tableIdentifier); } - protected TableRuntime(TableRuntimeMeta tableRuntimeMeta, TableRuntimeHandler tableHandler) { + public TableRuntime(TableRuntimeMeta tableRuntimeMeta, TableRuntimeHandler tableHandler) { Preconditions.checkNotNull(tableRuntimeMeta, "TableRuntimeMeta must not be null."); Preconditions.checkNotNull(tableHandler, "TableRuntimeHandler must not be null."); + this.tableHandler = tableHandler; this.tableIdentifier = ServerTableIdentifier.of( @@ -140,6 +149,10 @@ protected TableRuntime(TableRuntimeMeta tableRuntimeMeta, TableRuntimeHandler ta this.pendingInput = tableRuntimeMeta.getPendingInput(); optimizingMetrics = new TableOptimizingMetrics(tableIdentifier); optimizingMetrics.statusChanged(optimizingStatus, this.currentStatusStartTime); + this.targetSnapshotId = tableRuntimeMeta.getTargetSnapshotId(); + this.targetChangeSnapshotId = tableRuntimeMeta.getTargetChangeSnapshotId(); + this.fromSequence = tableRuntimeMeta.getFromSequence(); + this.toSequence = tableRuntimeMeta.getToSequence(); } public void recover(OptimizingProcess optimizingProcess) { @@ -474,6 +487,38 @@ public void setLastPlanTime(long lastPlanTime) { this.lastPlanTime = lastPlanTime; } + public long getTargetSnapshotId() { + return targetSnapshotId; + } + + public void setTargetSnapshotId(long targetSnapshotId) { + this.targetSnapshotId = targetSnapshotId; + } + + public long getTargetChangeSnapshotId() { + return targetChangeSnapshotId; + } + + public void setTargetChangeSnapshotId(long targetChangeSnapshotId) { + this.targetChangeSnapshotId = targetChangeSnapshotId; + } + + public Map getFromSequence() { + return fromSequence; + } + + public void setFromSequence(Map fromSequence) { + this.fromSequence = fromSequence; + } + + public Map getToSequence() { + return toSequence; + } + + public void setToSequence(Map toSequence) { + this.toSequence = toSequence; + } + @Override public String toString() { return MoreObjects.toStringHelper(this) @@ -487,6 +532,10 @@ public String toString() { .add("lastFullOptimizingTime", lastFullOptimizingTime) .add("lastMinorOptimizingTime", lastMinorOptimizingTime) .add("tableConfiguration", tableConfiguration) + .add("targetSnapshotId", targetSnapshotId) + .add("targetChangeSnapshotId", targetChangeSnapshotId) + .add("fromSequence", fromSequence) + .add("toSequence", toSequence) .toString(); } diff --git a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/TableService.java b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/TableService.java index 6b87db18b6..9726776c0d 100644 --- a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/TableService.java +++ b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/TableService.java @@ -23,6 +23,9 @@ import org.apache.amoro.api.ServerTableIdentifier; import org.apache.amoro.api.TableIdentifier; import org.apache.amoro.server.catalog.CatalogService; +import org.apache.amoro.server.persistence.TableRuntimeMeta; + +import javax.annotation.Nullable; import java.util.List; import java.util.Map; @@ -88,4 +91,22 @@ Blocker block( * @return block list */ List getBlockers(TableIdentifier tableIdentifier); + + /** + * Get the table info from database for given parameters. + * + * @param optimizerGroup The optimizer group of the table associated to. will be if we want the + * info for all groups. + * @param fuzzyDbName the fuzzy db name used to filter the result, will be null if no filter set. + * @param fuzzyTableName the fuzzy table name used to filter the result, will be null if no filter + * set. + * @param limit How many entries we want to retrieve. + * @param offset The entries we'll skip when retrieving the entries. + */ + List getTableRuntimes( + String optimizerGroup, + @Nullable String fuzzyDbName, + @Nullable String fuzzyTableName, + int limit, + int offset); } diff --git a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/executor/BaseTableExecutor.java b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/executor/BaseTableExecutor.java index 29edacac92..b85445cf0b 100644 --- a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/executor/BaseTableExecutor.java +++ b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/executor/BaseTableExecutor.java @@ -25,7 +25,6 @@ import org.apache.amoro.server.table.RuntimeHandlerChain; import org.apache.amoro.server.table.TableManager; import org.apache.amoro.server.table.TableRuntime; -import org.apache.amoro.server.table.TableRuntimeMeta; import org.apache.amoro.shade.guava32.com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; @@ -63,10 +62,9 @@ protected BaseTableExecutor(TableManager tableManager, int poolSize) { } @Override - protected void initHandler(List tableRuntimeMetaList) { + protected void initHandler(List tableRuntimeMetaList) { tableRuntimeMetaList.stream() - .map(tableRuntimeMeta -> tableRuntimeMeta.getTableRuntime()) - .filter(tableRuntime -> enabled(tableRuntime)) + .filter(this::enabled) .forEach( tableRuntime -> { if (scheduledTables.add(tableRuntime.getTableIdentifier())) { @@ -109,7 +107,8 @@ protected String getThreadName() { } private boolean isExecutable(TableRuntime tableRuntime) { - return tableManager.contains(tableRuntime.getTableIdentifier()) && enabled(tableRuntime); + return tableManager.contains(tableRuntime.getTableIdentifier().getId()) + && enabled(tableRuntime); } @Override diff --git a/amoro-ams/amoro-ams-server/src/main/resources/derby/ams-derby-init.sql b/amoro-ams/amoro-ams-server/src/main/resources/derby/ams-derby-init.sql index e7856a57df..09dc615d07 100644 --- a/amoro-ams/amoro-ams-server/src/main/resources/derby/ams-derby-init.sql +++ b/amoro-ams/amoro-ams-server/src/main/resources/derby/ams-derby-init.sql @@ -111,7 +111,7 @@ CREATE TABLE table_runtime ( last_major_optimizing_time TIMESTAMP, last_minor_optimizing_time TIMESTAMP, last_full_optimizing_time TIMESTAMP, - optimizing_status VARCHAR(20) DEFAULT 'IDLE', + optimizing_status_code INT DEFAULT 7, optimizing_status_start_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP, optimizing_process_id BIGINT NOT NULL, optimizer_group VARCHAR(64) NOT NULL, diff --git a/amoro-ams/amoro-ams-server/src/main/resources/mysql/upgrade-0.7.0-to-0.7.1.sql b/amoro-ams/amoro-ams-server/src/main/resources/mysql/upgrade-0.7.0-to-0.7.1.sql new file mode 100644 index 0000000000..dd2312f388 --- /dev/null +++ b/amoro-ams/amoro-ams-server/src/main/resources/mysql/upgrade-0.7.0-to-0.7.1.sql @@ -0,0 +1,24 @@ +RENAME TABLE table_runtime TO table_runtime_backup; +CREATE TABLE table_runtime LIKE table_runtime_backup; + +ALTER TABLE table_runtime CHANGE COLUMN optimizing_status optimizing_status_code INT DEFAULT 7; +CREATE INDEX idx_optimizer_status_and_time ON table_runtime(optimizing_status_code, optimizing_status_start_time DESC); + +INSERT INTO table_runtime( + `table_id`,`catalog_name`, `db_name`, `table_name`, `current_snapshot_id`,`current_change_snapshotId`, `last_optimized_snapshotId`, + `last_optimized_change_snapshotId`, `last_major_optimizing_time`, `last_minor_optimizing_time`, `last_full_optimizing_time`, + `optimizing_status_code`, `optimizing_status_start_time`, `optimizing_process_id`, `optimizer_group`, `table_config`, + `optimizing_config`, `pending_input`) +SELECT `table_id`,`catalog_name`, `db_name`, `table_name`, `current_snapshot_id`,`current_change_snapshotId`, `last_optimized_snapshotId`, + `last_optimized_change_snapshotId`, `last_major_optimizing_time`, `last_minor_optimizing_time`, `last_full_optimizing_time`, + CASE + WHEN `optimizing_status` = 'IDLE' THEN 6 + WHEN `optimizing_status` = 'PENDING' THEN 5 + WHEN `optimizing_status` = 'PLANNING' THEN 4 + WHEN `optimizing_status` = 'COMMITTING' THEN 3 + WHEN `optimizing_status` = 'MINOR_OPTIMIZING' THEN 2 + WHEN `optimizing_status` = 'MAJOR_OPTIMIZING' THEN 1 + WHEN `optimizing_status` = 'FULL_OPTIMIZING' THEN 0 + END, + `optimizing_status_start_time`, `optimizing_process_id`, `optimizer_group`, `table_config`, `optimizing_config`, `pending_input` +FROM table_runtime_backup; \ No newline at end of file diff --git a/amoro-ams/amoro-ams-server/src/test/java/org/apache/amoro/server/RestCatalogServiceTestBase.java b/amoro-ams/amoro-ams-server/src/test/java/org/apache/amoro/server/RestCatalogServiceTestBase.java index 8f83f4ff98..05b21b7838 100644 --- a/amoro-ams/amoro-ams-server/src/test/java/org/apache/amoro/server/RestCatalogServiceTestBase.java +++ b/amoro-ams/amoro-ams-server/src/test/java/org/apache/amoro/server/RestCatalogServiceTestBase.java @@ -121,7 +121,7 @@ protected TableMetadata getTableMetadata(TableIdentifier identifier) { protected TableRuntime getTableRuntime(TableIdentifier identifier) { ServerTableIdentifier serverTableIdentifier = getServerTableIdentifier(identifier); - return tableService.getRuntime(serverTableIdentifier); + return tableService.getRuntime(serverTableIdentifier.getId()); } protected void assertTableRuntime(TableIdentifier identifier, TableFormat format) { diff --git a/amoro-ams/amoro-ams-server/src/test/java/org/apache/amoro/server/TestDefaultOptimizingService.java b/amoro-ams/amoro-ams-server/src/test/java/org/apache/amoro/server/TestDefaultOptimizingService.java index 01631f073c..ea21898d0c 100644 --- a/amoro-ams/amoro-ams-server/src/test/java/org/apache/amoro/server/TestDefaultOptimizingService.java +++ b/amoro-ams/amoro-ams-server/src/test/java/org/apache/amoro/server/TestDefaultOptimizingService.java @@ -114,7 +114,7 @@ private void initTableWithFiles() { (MixedTable) tableService().loadTable(serverTableIdentifier()).originalTable(); appendData(mixedTable.asUnkeyedTable(), 1); appendData(mixedTable.asUnkeyedTable(), 2); - TableRuntime runtime = tableService().getRuntime(serverTableIdentifier()); + TableRuntime runtime = tableService().getRuntime(serverTableIdentifier().getId()); runtime.refresh(tableService().loadTable(serverTableIdentifier())); } @@ -387,10 +387,13 @@ private void assertTaskCompleted(TaskRuntime taskRuntime) { 0, optimizingService().listTasks(defaultResourceGroup().getName()).size()); Assertions.assertEquals( OptimizingProcess.Status.RUNNING, - tableService().getRuntime(serverTableIdentifier()).getOptimizingProcess().getStatus()); + tableService() + .getRuntime(serverTableIdentifier().getId()) + .getOptimizingProcess() + .getStatus()); Assertions.assertEquals( OptimizingStatus.COMMITTING, - tableService().getRuntime(serverTableIdentifier()).getOptimizingStatus()); + tableService().getRuntime(serverTableIdentifier().getId()).getOptimizingStatus()); } protected void reload() { @@ -415,7 +418,7 @@ public TableRuntimeRefresher() { } void refreshPending() { - execute(tableService().getRuntime(serverTableIdentifier())); + execute(tableService().getRuntime(serverTableIdentifier().getId())); } } diff --git a/amoro-ams/amoro-ams-server/src/test/java/org/apache/amoro/server/optimizing/OptimizingStatusTest.java b/amoro-ams/amoro-ams-server/src/test/java/org/apache/amoro/server/optimizing/OptimizingStatusTest.java new file mode 100644 index 0000000000..77c08a9a0e --- /dev/null +++ b/amoro-ams/amoro-ams-server/src/test/java/org/apache/amoro/server/optimizing/OptimizingStatusTest.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.server.optimizing; + +import static org.junit.Assert.assertEquals; + +import org.junit.Test; + +public class OptimizingStatusTest { + @Test + public void testOptimizingStatusCodeValue() { + assertEquals(7, OptimizingStatus.values().length); + + assertEquals(OptimizingStatus.FULL_OPTIMIZING, OptimizingStatus.ofCode(0)); + assertEquals(OptimizingStatus.MAJOR_OPTIMIZING, OptimizingStatus.ofCode(1)); + assertEquals(OptimizingStatus.MINOR_OPTIMIZING, OptimizingStatus.ofCode(2)); + assertEquals(OptimizingStatus.COMMITTING, OptimizingStatus.ofCode(3)); + assertEquals(OptimizingStatus.PLANNING, OptimizingStatus.ofCode(4)); + assertEquals(OptimizingStatus.PENDING, OptimizingStatus.ofCode(5)); + assertEquals(OptimizingStatus.IDLE, OptimizingStatus.ofCode(6)); + } +} diff --git a/amoro-ams/amoro-ams-server/src/test/java/org/apache/amoro/server/optimizing/TestOptimizingQueue.java b/amoro-ams/amoro-ams-server/src/test/java/org/apache/amoro/server/optimizing/TestOptimizingQueue.java index 25e24565ee..8722267a45 100644 --- a/amoro-ams/amoro-ams-server/src/test/java/org/apache/amoro/server/optimizing/TestOptimizingQueue.java +++ b/amoro-ams/amoro-ams-server/src/test/java/org/apache/amoro/server/optimizing/TestOptimizingQueue.java @@ -45,12 +45,12 @@ import org.apache.amoro.optimizing.TableOptimizing; import org.apache.amoro.server.manager.MetricManager; import org.apache.amoro.server.metrics.MetricRegistry; +import org.apache.amoro.server.persistence.TableRuntimeMeta; import org.apache.amoro.server.resource.OptimizerInstance; import org.apache.amoro.server.resource.OptimizerThread; import org.apache.amoro.server.resource.QuotaProvider; import org.apache.amoro.server.table.AMSTableTestBase; import org.apache.amoro.server.table.TableRuntime; -import org.apache.amoro.server.table.TableRuntimeMeta; import org.apache.amoro.shade.guava32.com.google.common.collect.ImmutableMap; import org.apache.amoro.shade.guava32.com.google.common.collect.Lists; import org.apache.amoro.table.MixedTable; @@ -103,7 +103,7 @@ protected static ResourceGroup testResourceGroup() { return new ResourceGroup.Builder("test", "local").build(); } - protected OptimizingQueue buildOptimizingGroupService(TableRuntimeMeta tableRuntimeMeta) { + protected OptimizingQueue buildOptimizingGroupService(TableRuntime tableRuntimeMeta) { return new OptimizingQueue( tableService(), testResourceGroup(), @@ -125,7 +125,7 @@ private OptimizingQueue buildOptimizingGroupService() { @Test public void testPollNoTask() { - TableRuntimeMeta tableRuntimeMeta = + TableRuntime tableRuntimeMeta = buildTableRuntimeMeta(OptimizingStatus.PENDING, defaultResourceGroup()); OptimizingQueue queue = buildOptimizingGroupService(tableRuntimeMeta); Assert.assertNull(queue.pollTask(0)); @@ -136,25 +136,25 @@ public void testPollNoTask() { public void testRefreshAndReleaseTable() { OptimizingQueue queue = buildOptimizingGroupService(); Assert.assertEquals(0, queue.getSchedulingPolicy().getTableRuntimeMap().size()); - TableRuntimeMeta tableRuntimeMeta = + TableRuntime tableRuntime = buildTableRuntimeMeta(OptimizingStatus.IDLE, defaultResourceGroup()); - queue.refreshTable(tableRuntimeMeta.getTableRuntime()); + queue.refreshTable(tableRuntime); Assert.assertEquals(1, queue.getSchedulingPolicy().getTableRuntimeMap().size()); Assert.assertTrue( queue.getSchedulingPolicy().getTableRuntimeMap().containsKey(serverTableIdentifier())); - queue.releaseTable(tableRuntimeMeta.getTableRuntime()); + queue.releaseTable(tableRuntime); Assert.assertEquals(0, queue.getSchedulingPolicy().getTableRuntimeMap().size()); - queue.refreshTable(tableRuntimeMeta.getTableRuntime()); + queue.refreshTable(tableRuntime); Assert.assertEquals(1, queue.getSchedulingPolicy().getTableRuntimeMap().size()); queue.dispose(); } @Test public void testPollTask() { - TableRuntimeMeta tableRuntimeMeta = initTableWithFiles(); - OptimizingQueue queue = buildOptimizingGroupService(tableRuntimeMeta); + TableRuntime tableRuntime = initTableWithFiles(); + OptimizingQueue queue = buildOptimizingGroupService(tableRuntime); // 1.poll task TaskRuntime task = queue.pollTask(MAX_POLLING_TIME); @@ -167,7 +167,7 @@ public void testPollTask() { @Test public void testRetryTask() { - TableRuntimeMeta tableRuntimeMeta = initTableWithFiles(); + TableRuntime tableRuntimeMeta = initTableWithFiles(); OptimizingQueue queue = buildOptimizingGroupService(tableRuntimeMeta); // 1.poll task @@ -200,8 +200,8 @@ public void testRetryTask() { @Test public void testCommitTask() { - TableRuntimeMeta tableRuntimeMeta = initTableWithFiles(); - OptimizingQueue queue = buildOptimizingGroupService(tableRuntimeMeta); + TableRuntime tableRuntime = initTableWithFiles(); + OptimizingQueue queue = buildOptimizingGroupService(tableRuntime); Assert.assertEquals(0, queue.collectTasks().size()); TaskRuntime task = queue.pollTask(MAX_POLLING_TIME); @@ -216,11 +216,11 @@ public void testCommitTask() { Assert.assertEquals(TaskRuntime.Status.SUCCESS, task.getStatus()); // 7.commit - OptimizingProcess optimizingProcess = tableRuntimeMeta.getTableRuntime().getOptimizingProcess(); + OptimizingProcess optimizingProcess = tableRuntime.getOptimizingProcess(); Assert.assertEquals(OptimizingProcess.Status.RUNNING, optimizingProcess.getStatus()); optimizingProcess.commit(); Assert.assertEquals(OptimizingProcess.Status.SUCCESS, optimizingProcess.getStatus()); - Assert.assertNull(tableRuntimeMeta.getTableRuntime().getOptimizingProcess()); + Assert.assertNull(tableRuntime.getOptimizingProcess()); // 8.commit again, throw exceptions, and status not changed. Assert.assertThrows(IllegalStateException.class, optimizingProcess::commit); @@ -232,8 +232,8 @@ public void testCommitTask() { @Test public void testCollectingTasks() { - TableRuntimeMeta tableRuntimeMeta = initTableWithFiles(); - OptimizingQueue queue = buildOptimizingGroupService(tableRuntimeMeta); + TableRuntime tableRuntime = initTableWithFiles(); + OptimizingQueue queue = buildOptimizingGroupService(tableRuntime); Assert.assertEquals(0, queue.collectTasks().size()); TaskRuntime task = queue.pollTask(MAX_POLLING_TIME); @@ -247,8 +247,8 @@ public void testCollectingTasks() { @Test public void testTaskAndTableMetrics() { - TableRuntimeMeta tableRuntimeMeta = initTableWithFiles(); - OptimizingQueue queue = buildOptimizingGroupService(tableRuntimeMeta); + TableRuntime tableRuntime = initTableWithFiles(); + OptimizingQueue queue = buildOptimizingGroupService(tableRuntime); MetricRegistry registry = MetricManager.getInstance().getGlobalRegistry(); Map tagValues = ImmutableMap.of(GROUP_TAG, testResourceGroup().getName()); @@ -299,7 +299,7 @@ public void testTaskAndTableMetrics() { Assert.assertEquals(0, pendingTablesGauge.getValue().longValue()); Assert.assertEquals(1, executingTablesGauge.getValue().longValue()); - OptimizingProcess optimizingProcess = tableRuntimeMeta.getTableRuntime().getOptimizingProcess(); + OptimizingProcess optimizingProcess = tableRuntime.getOptimizingProcess(); optimizingProcess.commit(); Assert.assertEquals(0, queueTasksGauge.getValue().longValue()); Assert.assertEquals(0, executingTasksGauge.getValue().longValue()); @@ -345,21 +345,19 @@ public void testAddAndRemoveOptimizers() { queue.dispose(); } - protected TableRuntimeMeta initTableWithFiles() { + protected TableRuntime initTableWithFiles() { MixedTable mixedTable = (MixedTable) tableService().loadTable(serverTableIdentifier()).originalTable(); appendData(mixedTable.asUnkeyedTable(), 1); appendData(mixedTable.asUnkeyedTable(), 2); - TableRuntimeMeta tableRuntimeMeta = + TableRuntime tableRuntime = buildTableRuntimeMeta(OptimizingStatus.PENDING, defaultResourceGroup()); - TableRuntime runtime = tableRuntimeMeta.getTableRuntime(); - runtime.refresh(tableService().loadTable(serverTableIdentifier())); - return tableRuntimeMeta; + tableRuntime.refresh(tableService().loadTable(serverTableIdentifier())); + return tableRuntime; } - private TableRuntimeMeta buildTableRuntimeMeta( - OptimizingStatus status, ResourceGroup resourceGroup) { + private TableRuntime buildTableRuntimeMeta(OptimizingStatus status, ResourceGroup resourceGroup) { MixedTable mixedTable = (MixedTable) tableService().loadTable(serverTableIdentifier()).originalTable(); TableRuntimeMeta tableRuntimeMeta = new TableRuntimeMeta(); @@ -371,8 +369,7 @@ private TableRuntimeMeta buildTableRuntimeMeta( tableRuntimeMeta.setTableStatus(status); tableRuntimeMeta.setTableConfig(TableConfiguration.parseConfig(mixedTable.properties())); tableRuntimeMeta.setOptimizerGroup(resourceGroup.getName()); - tableRuntimeMeta.constructTableRuntime(tableService()); - return tableRuntimeMeta; + return new TableRuntime(tableRuntimeMeta, tableService()); } private void appendData(UnkeyedTable table, int id) { diff --git a/amoro-ams/amoro-ams-server/src/test/java/org/apache/amoro/server/table/TestTableRuntimeHandler.java b/amoro-ams/amoro-ams-server/src/test/java/org/apache/amoro/server/table/TestTableRuntimeHandler.java index c641fb258d..6c0eef31fc 100644 --- a/amoro-ams/amoro-ams-server/src/test/java/org/apache/amoro/server/table/TestTableRuntimeHandler.java +++ b/amoro-ams/amoro-ams-server/src/test/java/org/apache/amoro/server/table/TestTableRuntimeHandler.java @@ -92,14 +92,15 @@ public void testInitialize() throws Exception { tableService.initialize(); Assert.assertEquals(1, handler.getInitTables().size()); Assert.assertEquals( - createTableId.getId().longValue(), handler.getInitTables().get(0).getTableId()); + (Long) createTableId.getId().longValue(), + handler.getInitTables().get(0).getTableIdentifier().getId()); // test change properties MixedTable mixedTable = (MixedTable) tableService().loadTable(createTableId).originalTable(); mixedTable.updateProperties().set(TableProperties.ENABLE_ORPHAN_CLEAN, "true").commit(); tableService() - .getRuntime(createTableId) + .getRuntime(createTableId.getId()) .refresh(tableService.loadTable(serverTableIdentifier())); Assert.assertEquals(1, handler.getConfigChangedTables().size()); validateTableRuntime(handler.getConfigChangedTables().get(0).first()); @@ -131,7 +132,7 @@ protected DefaultTableService tableService() { static class TestHandler extends RuntimeHandlerChain { - private final List initTables = Lists.newArrayList(); + private final List initTables = Lists.newArrayList(); private final List> statusChangedTables = Lists.newArrayList(); private final List> configChangedTables = @@ -162,7 +163,7 @@ protected void handleTableRemoved(TableRuntime tableRuntime) { } @Override - protected void initHandler(List tableRuntimeMetaList) { + protected void initHandler(List tableRuntimeMetaList) { initTables.addAll(tableRuntimeMetaList); } @@ -171,7 +172,7 @@ protected void doDispose() { disposed = true; } - public List getInitTables() { + public List getInitTables() { return initTables; } diff --git a/amoro-ams/amoro-ams-server/src/test/java/org/apache/amoro/server/table/TestTableRuntimeManager.java b/amoro-ams/amoro-ams-server/src/test/java/org/apache/amoro/server/table/TestTableRuntimeManager.java index 423e09795c..35b1e8c435 100644 --- a/amoro-ams/amoro-ams-server/src/test/java/org/apache/amoro/server/table/TestTableRuntimeManager.java +++ b/amoro-ams/amoro-ams-server/src/test/java/org/apache/amoro/server/table/TestTableRuntimeManager.java @@ -71,7 +71,7 @@ public void testLoadTable() { @Test public void testTableContains() { - Assert.assertTrue(tableService().contains(serverTableIdentifier())); + Assert.assertTrue(tableService().contains(serverTableIdentifier().getId())); ServerTableIdentifier copyId = ServerTableIdentifier.of( null, @@ -79,7 +79,7 @@ public void testTableContains() { serverTableIdentifier().getDatabase(), serverTableIdentifier().getTableName(), serverTableIdentifier().getFormat()); - Assert.assertFalse(tableService().contains(copyId)); + Assert.assertFalse(tableService().contains(copyId.getId())); copyId = ServerTableIdentifier.of( serverTableIdentifier().getId(), @@ -87,12 +87,12 @@ public void testTableContains() { serverTableIdentifier().getDatabase(), "unknown", serverTableIdentifier().getFormat()); - Assert.assertFalse(tableService().contains(copyId)); + Assert.assertFalse(tableService().contains(copyId.getId())); } @Test public void testTableRuntime() { - TableRuntime tableRuntime = tableService().getRuntime(serverTableIdentifier()); + TableRuntime tableRuntime = tableService().getRuntime(serverTableIdentifier().getId()); validateTableRuntime(tableRuntime); } } diff --git a/amoro-ams/amoro-ams-server/src/test/java/org/apache/amoro/server/table/TestTableService.java b/amoro-ams/amoro-ams-server/src/test/java/org/apache/amoro/server/table/TestTableService.java index dea85f1a73..cbf8010c6e 100644 --- a/amoro-ams/amoro-ams-server/src/test/java/org/apache/amoro/server/table/TestTableService.java +++ b/amoro-ams/amoro-ams-server/src/test/java/org/apache/amoro/server/table/TestTableService.java @@ -341,7 +341,7 @@ private boolean isBlocked(BlockableOperation operation) { } private boolean isTableRuntimeBlocked(BlockableOperation operation) { - return tableService().getRuntime(serverTableIdentifier()).isBlocked(operation); + return tableService().getRuntime(serverTableIdentifier().getId()).isBlocked(operation); } private void assertBlockerCnt(int i) {