diff --git a/amoro-ams/amoro-ams-dashboard/pom.xml b/amoro-ams/amoro-ams-dashboard/pom.xml index 1a5e2fb20a..9d8f31c6b1 100644 --- a/amoro-ams/amoro-ams-dashboard/pom.xml +++ b/amoro-ams/amoro-ams-dashboard/pom.xml @@ -140,5 +140,16 @@ + + use-alibaba-mirror + + + use-alibaba-mirror + + + + --registry http://172.17.0.1:8888/repository/npm/ + + diff --git a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java index 10e6ecea98..00bd1d9b7b 100644 --- a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java +++ b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java @@ -49,7 +49,6 @@ import org.apache.amoro.server.table.DefaultTableService; import org.apache.amoro.server.table.RuntimeHandlerChain; import org.apache.amoro.server.table.TableRuntime; -import org.apache.amoro.server.table.TableRuntimeMeta; import org.apache.amoro.server.table.TableService; import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions; import org.apache.amoro.shade.guava32.com.google.common.collect.ImmutableList; @@ -121,17 +120,17 @@ public RuntimeHandlerChain getTableRuntimeHandler() { return tableHandlerChain; } - private void loadOptimizingQueues(List tableRuntimeMetaList) { + private void loadOptimizingQueues(List tableRuntimeMetaList) { List optimizerGroups = getAs(ResourceMapper.class, ResourceMapper::selectResourceGroups); List optimizers = getAs(OptimizerMapper.class, OptimizerMapper::selectAll); - Map> groupToTableRuntimes = + Map> groupToTableRuntimes = tableRuntimeMetaList.stream() - .collect(Collectors.groupingBy(TableRuntimeMeta::getOptimizerGroup)); + .collect(Collectors.groupingBy(TableRuntime::getOptimizerGroup)); optimizerGroups.forEach( group -> { String groupName = group.getName(); - List tableRuntimeMetas = groupToTableRuntimes.remove(groupName); + List tableRuntimeMetas = groupToTableRuntimes.remove(groupName); OptimizingQueue optimizingQueue = new OptimizingQueue( tableService, @@ -455,7 +454,7 @@ public void handleTableRemoved(TableRuntime tableRuntime) { } @Override - protected void initHandler(List tableRuntimeMetaList) { + protected void initHandler(List tableRuntimeMetaList) { LOG.info("OptimizerManagementService begin initializing"); loadOptimizingQueues(tableRuntimeMetaList); optimizerKeeper.start(); diff --git a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/controller/OptimizerController.java b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/controller/OptimizerController.java index aef413d694..e57638f8b4 100644 --- a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/controller/OptimizerController.java +++ b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/controller/OptimizerController.java @@ -19,7 +19,6 @@ package org.apache.amoro.server.dashboard.controller; import io.javalin.http.Context; -import org.apache.amoro.api.ServerTableIdentifier; import org.apache.amoro.api.resource.Resource; import org.apache.amoro.api.resource.ResourceGroup; import org.apache.amoro.api.resource.ResourceType; @@ -30,13 +29,13 @@ import org.apache.amoro.server.dashboard.response.OkResponse; import org.apache.amoro.server.dashboard.response.PageResult; import org.apache.amoro.server.dashboard.utils.OptimizingUtil; +import org.apache.amoro.server.persistence.TableRuntimeMeta; import org.apache.amoro.server.resource.ContainerMetadata; import org.apache.amoro.server.resource.OptimizerInstance; import org.apache.amoro.server.resource.ResourceContainers; import org.apache.amoro.server.table.TableRuntime; import org.apache.amoro.server.table.TableService; import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions; -import org.apache.commons.lang3.StringUtils; import javax.ws.rs.BadRequestException; @@ -67,34 +66,17 @@ public void getOptimizerTables(Context ctx) { Integer pageSize = ctx.queryParamAsClass("pageSize", Integer.class).getOrDefault(20); int offset = (page - 1) * pageSize; - List tableRuntimes = new ArrayList<>(); - List tables = tableService.listManagedTables(); - for (ServerTableIdentifier identifier : tables) { - TableRuntime tableRuntime = tableService.getRuntime(identifier); - if (tableRuntime == null) { - continue; - } - if ((ALL_GROUP.equals(optimizerGroup) - || tableRuntime.getOptimizerGroup().equals(optimizerGroup)) - && (StringUtils.isEmpty(dbFilterStr) - || StringUtils.containsIgnoreCase(identifier.getDatabase(), dbFilterStr)) - && (StringUtils.isEmpty(tableFilterStr) - || StringUtils.containsIgnoreCase(identifier.getTableName(), tableFilterStr))) { - tableRuntimes.add(tableRuntime); - } - } - tableRuntimes.sort( - (o1, o2) -> { - // first we compare the status , and then we compare the start time when status are equal; - int statDiff = o1.getOptimizingStatus().compareTo(o2.getOptimizingStatus()); - // status order is asc, startTime order is desc - if (statDiff == 0) { - long timeDiff = o1.getCurrentStatusStartTime() - o2.getCurrentStatusStartTime(); - return timeDiff >= 0 ? (timeDiff == 0 ? 0 : -1) : 1; - } else { - return statDiff; - } - }); + String optimizerGroupUsedInDbFilter = "all".equals(optimizerGroup) ? null : optimizerGroup; + // get all info from underlying table table_runtime + List tableRuntimeBeans = + tableService.getTableRuntimes( + optimizerGroupUsedInDbFilter, dbFilterStr, tableFilterStr, pageSize, offset); + + List tableRuntimes = + tableRuntimeBeans.stream() + .map(meta -> tableService.getRuntime(meta.getTableId())) + .collect(Collectors.toList()); + PageResult amsPageResult = PageResult.of(tableRuntimes, offset, pageSize, OptimizingUtil::buildTableOptimizeInfo); ctx.json(OkResponse.of(amsPageResult)); diff --git a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/controller/TableController.java b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/controller/TableController.java index 8d6884f47f..dfcea2982b 100644 --- a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/controller/TableController.java +++ b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/controller/TableController.java @@ -148,7 +148,7 @@ public void getTableDetail(Context ctx) { if (serverTableIdentifier.isPresent()) { tableSummary.put( "optimizingStatus", - tableService.getRuntime(serverTableIdentifier.get()).getOptimizingStatus()); + tableService.getRuntime(serverTableIdentifier.get().getId()).getOptimizingStatus()); } else { tableSummary.put("optimizingStatus", OptimizingStatus.IDLE); } @@ -650,7 +650,9 @@ public void cancelOptimizingProcess(Context ctx) { tableService.getServerTableIdentifier( TableIdentifier.of(catalog, db, table).buildTableIdentifier()); TableRuntime tableRuntime = - serverTableIdentifier != null ? tableService.getRuntime(serverTableIdentifier) : null; + serverTableIdentifier != null + ? tableService.getRuntime(serverTableIdentifier.getId()) + : null; Preconditions.checkArgument( tableRuntime != null diff --git a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java index 4752ca62ca..e0cf814abd 100644 --- a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java +++ b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java @@ -36,7 +36,6 @@ import org.apache.amoro.server.resource.QuotaProvider; import org.apache.amoro.server.table.TableManager; import org.apache.amoro.server.table.TableRuntime; -import org.apache.amoro.server.table.TableRuntimeMeta; import org.apache.amoro.shade.guava32.com.google.common.annotations.VisibleForTesting; import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions; import org.apache.amoro.shade.guava32.com.google.common.collect.Lists; @@ -93,7 +92,7 @@ public OptimizingQueue( ResourceGroup optimizerGroup, QuotaProvider quotaProvider, Executor planExecutor, - List tableRuntimeMetaList, + List tableRuntimeMetaList, int maxPlanningParallelism) { Preconditions.checkNotNull(optimizerGroup, "Optimizer group can not be null"); this.planExecutor = planExecutor; @@ -109,11 +108,10 @@ public OptimizingQueue( tableRuntimeMetaList.forEach(this::initTableRuntime); } - private void initTableRuntime(TableRuntimeMeta tableRuntimeMeta) { - TableRuntime tableRuntime = tableRuntimeMeta.getTableRuntime(); + private void initTableRuntime(TableRuntime tableRuntime) { if (tableRuntime.getOptimizingStatus().isProcessing() - && tableRuntimeMeta.getOptimizingProcessId() != 0) { - tableRuntime.recover(new TableOptimizingProcess(tableRuntimeMeta)); + && tableRuntime.getOptimizingProcess().getProcessId() != 0) { + tableRuntime.recover(new TableOptimizingProcess(tableRuntime)); } if (tableRuntime.isOptimizingEnabled()) { @@ -122,7 +120,7 @@ private void initTableRuntime(TableRuntimeMeta tableRuntimeMeta) { if (!tableRuntime.getOptimizingStatus().isProcessing()) { scheduler.addTable(tableRuntime); } else if (tableRuntime.getOptimizingStatus() != OptimizingStatus.COMMITTING) { - tableQueue.offer(new TableOptimizingProcess(tableRuntimeMeta)); + tableQueue.offer(new TableOptimizingProcess(tableRuntime)); } } else { OptimizingProcess process = tableRuntime.getOptimizingProcess(); @@ -376,21 +374,21 @@ public TableOptimizingProcess(OptimizingPlanner planner) { beginAndPersistProcess(); } - public TableOptimizingProcess(TableRuntimeMeta tableRuntimeMeta) { - processId = tableRuntimeMeta.getOptimizingProcessId(); - tableRuntime = tableRuntimeMeta.getTableRuntime(); - optimizingType = tableRuntimeMeta.getOptimizingType(); - targetSnapshotId = tableRuntimeMeta.getTargetSnapshotId(); - targetChangeSnapshotId = tableRuntimeMeta.getTargetChangeSnapshotId(); - planTime = tableRuntimeMeta.getPlanTime(); - if (tableRuntimeMeta.getFromSequence() != null) { - fromSequence = tableRuntimeMeta.getFromSequence(); + public TableOptimizingProcess(TableRuntime tableRuntime) { + processId = tableRuntime.getOptimizingProcess().getProcessId(); + this.tableRuntime = tableRuntime; + optimizingType = tableRuntime.getOptimizingProcess().getOptimizingType(); + targetSnapshotId = tableRuntime.getTargetSnapshotId(); + targetChangeSnapshotId = tableRuntime.getTargetChangeSnapshotId(); + planTime = tableRuntime.getLastPlanTime(); + if (tableRuntime.getFromSequence() != null) { + fromSequence = tableRuntime.getFromSequence(); } - if (tableRuntimeMeta.getToSequence() != null) { - toSequence = tableRuntimeMeta.getToSequence(); + if (tableRuntime.getToSequence() != null) { + toSequence = tableRuntime.getToSequence(); } if (this.status != OptimizingProcess.Status.CLOSED) { - tableRuntimeMeta.getTableRuntime().recover(this); + tableRuntime.recover(this); } loadTaskRuntimes(this); } diff --git a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/optimizing/OptimizingStatus.java b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/optimizing/OptimizingStatus.java index 7faca66261..c9d3c152ab 100644 --- a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/optimizing/OptimizingStatus.java +++ b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/optimizing/OptimizingStatus.java @@ -19,20 +19,23 @@ package org.apache.amoro.server.optimizing; public enum OptimizingStatus { - FULL_OPTIMIZING("full", true), - MAJOR_OPTIMIZING("major", true), - MINOR_OPTIMIZING("minor", true), - COMMITTING("committing", true), - PLANNING("planning", false), - PENDING("pending", false), - IDLE("idle", false); + FULL_OPTIMIZING("full", true, 0), + MAJOR_OPTIMIZING("major", true, 1), + MINOR_OPTIMIZING("minor", true, 2), + COMMITTING("committing", true, 3), + PLANNING("planning", false, 4), + PENDING("pending", false, 5), + IDLE("idle", false, 6); private final String displayValue; private final boolean isProcessing; - OptimizingStatus(String displayValue, boolean isProcessing) { + private final int code; + + OptimizingStatus(String displayValue, boolean isProcessing, int code) { this.displayValue = displayValue; this.isProcessing = isProcessing; + this.code = code; } public boolean isProcessing() { @@ -42,4 +45,16 @@ public boolean isProcessing() { public String displayValue() { return displayValue; } + + public int getCode() { + return code; + } + + public static OptimizingStatus ofCode(int code) { + if (code >= 0 && code <= values().length) { + return values()[code]; + } + + return null; + } } diff --git a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/TableRuntimeMeta.java b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/persistence/TableRuntimeMeta.java similarity index 94% rename from amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/TableRuntimeMeta.java rename to amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/persistence/TableRuntimeMeta.java index 92ca2e6a36..51d7f0fda1 100644 --- a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/TableRuntimeMeta.java +++ b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/persistence/TableRuntimeMeta.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.amoro.server.table; +package org.apache.amoro.server.persistence; import org.apache.amoro.TableFormat; import org.apache.amoro.api.config.TableConfiguration; @@ -27,6 +27,7 @@ import java.util.Map; +/** The class for table used when transfer data from/to database. */ public class TableRuntimeMeta { private long tableId; private String catalogName; @@ -57,24 +58,8 @@ public class TableRuntimeMeta { private Map fromSequence; private Map toSequence; - private TableRuntime tableRuntime; - public TableRuntimeMeta() {} - public TableRuntime constructTableRuntime(TableManager initializer) { - if (tableRuntime == null) { - tableRuntime = new TableRuntime(this, initializer); - } - return tableRuntime; - } - - public TableRuntime getTableRuntime() { - if (tableRuntime == null) { - throw new IllegalStateException("TableRuntime is not constructed yet."); - } - return tableRuntime; - } - public long getTargetSnapshotId() { return targetSnapshotId; } diff --git a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/persistence/converter/OptimizingStatusConverter.java b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/persistence/converter/OptimizingStatusConverter.java new file mode 100644 index 0000000000..d314bb41bc --- /dev/null +++ b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/persistence/converter/OptimizingStatusConverter.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.server.persistence.converter; + +import org.apache.amoro.server.optimizing.OptimizingStatus; +import org.apache.ibatis.type.BaseTypeHandler; +import org.apache.ibatis.type.JdbcType; +import org.apache.ibatis.type.MappedJdbcTypes; +import org.apache.ibatis.type.MappedTypes; + +import java.sql.CallableStatement; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; + +@MappedJdbcTypes(JdbcType.INTEGER) +@MappedTypes(Enum.class) +public class OptimizingStatusConverter extends BaseTypeHandler { + + @Override + public void setNonNullParameter( + PreparedStatement ps, int i, OptimizingStatus parameter, JdbcType jdbcType) + throws SQLException { + ps.setInt(i, parameter.getCode()); + } + + @Override + public OptimizingStatus getNullableResult(ResultSet rs, String columnName) throws SQLException { + String s = rs.getString(columnName); + return s == null ? null : OptimizingStatus.ofCode(Integer.parseInt(s)); + } + + @Override + public OptimizingStatus getNullableResult(ResultSet rs, int columnIndex) throws SQLException { + String s = rs.getString(columnIndex); + return s == null ? null : OptimizingStatus.ofCode(Integer.parseInt(s)); + } + + @Override + public OptimizingStatus getNullableResult(CallableStatement cs, int columnIndex) + throws SQLException { + String s = cs.getString(columnIndex); + return s == null ? null : OptimizingStatus.ofCode(Integer.parseInt(s)); + } +} diff --git a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/persistence/mapper/TableMetaMapper.java b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/persistence/mapper/TableMetaMapper.java index b01e4ff4c4..5a9dc52a99 100644 --- a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/persistence/mapper/TableMetaMapper.java +++ b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/persistence/mapper/TableMetaMapper.java @@ -19,13 +19,14 @@ package org.apache.amoro.server.persistence.mapper; import org.apache.amoro.api.ServerTableIdentifier; +import org.apache.amoro.server.persistence.TableRuntimeMeta; import org.apache.amoro.server.persistence.converter.JsonObjectConverter; import org.apache.amoro.server.persistence.converter.Long2TsConverter; import org.apache.amoro.server.persistence.converter.Map2StringConverter; import org.apache.amoro.server.persistence.converter.MapLong2StringConverter; +import org.apache.amoro.server.persistence.converter.OptimizingStatusConverter; import org.apache.amoro.server.table.TableMetadata; import org.apache.amoro.server.table.TableRuntime; -import org.apache.amoro.server.table.TableRuntimeMeta; import org.apache.ibatis.annotations.Delete; import org.apache.ibatis.annotations.Insert; import org.apache.ibatis.annotations.Options; @@ -321,7 +322,8 @@ List selectTableIdentifiersByCatalog( + " typeHandler=org.apache.amoro.server.persistence.converter.Long2TsConverter}," + " last_full_optimizing_time = #{runtime.lastFullOptimizingTime," + " typeHandler=org.apache.amoro.server.persistence.converter.Long2TsConverter}," - + " optimizing_status = #{runtime.optimizingStatus}," + + " optimizing_status_code = #{runtime.optimizingStatu," + + "typeHandler=org.apache.amoro.server.persistence.converter.OptimizingStatusConverter}," + " optimizing_status_start_time = #{runtime.currentStatusStartTime," + " typeHandler=org.apache.amoro.server.persistence.converter.Long2TsConverter}," + " optimizing_process_id = #{runtime.processId}," @@ -340,7 +342,7 @@ List selectTableIdentifiersByCatalog( "INSERT INTO table_runtime (table_id, catalog_name, db_name, table_name, current_snapshot_id," + " current_change_snapshotId, last_optimized_snapshotId, last_optimized_change_snapshotId," + " last_major_optimizing_time, last_minor_optimizing_time," - + " last_full_optimizing_time, optimizing_status, optimizing_status_start_time, optimizing_process_id," + + " last_full_optimizing_time, optimizing_status_code, optimizing_status_start_time, optimizing_process_id," + " optimizer_group, table_config, pending_input) VALUES" + " (#{runtime.tableIdentifier.id}, #{runtime.tableIdentifier.catalog}," + " #{runtime.tableIdentifier.database}, #{runtime.tableIdentifier.tableName}, #{runtime" @@ -352,7 +354,8 @@ List selectTableIdentifiersByCatalog( + " typeHandler=org.apache.amoro.server.persistence.converter.Long2TsConverter}," + " #{runtime.lastFullOptimizingTime," + " typeHandler=org.apache.amoro.server.persistence.converter.Long2TsConverter}," - + " #{runtime.optimizingStatus}," + + " #{runtime.optimizingStatus," + + " typeHandler=org.apache.amoro.server.persistence.converter.OptimizingStatusConverter}," + " #{runtime.currentStatusStartTime, " + " typeHandler=org.apache.amoro.server.persistence.converter.Long2TsConverter}," + " #{runtime.processId}, #{runtime.optimizerGroup}," @@ -365,7 +368,7 @@ List selectTableIdentifiersByCatalog( @Select( "SELECT a.table_id, a.catalog_name, a.db_name, a.table_name, i.format, a.current_snapshot_id, a" + ".current_change_snapshotId, a.last_optimized_snapshotId, a.last_optimized_change_snapshotId," - + " a.last_major_optimizing_time, a.last_minor_optimizing_time, a.last_full_optimizing_time, a.optimizing_status," + + " a.last_major_optimizing_time, a.last_minor_optimizing_time, a.last_full_optimizing_time, a.optimizing_status_code," + " a.optimizing_status_start_time, a.optimizing_process_id," + " a.optimizer_group, a.table_config, a.pending_input, b.optimizing_type, b.target_snapshot_id," + " b.target_change_snapshot_id, b.plan_time, b.from_sequence, b.to_sequence FROM table_runtime a" @@ -395,7 +398,10 @@ List selectTableIdentifiersByCatalog( property = "lastFullOptimizingTime", column = "last_full_optimizing_time", typeHandler = Long2TsConverter.class), - @Result(property = "tableStatus", column = "optimizing_status"), + @Result( + property = "tableStatus", + column = "optimizing_status_code", + typeHandler = OptimizingStatusConverter.class), @Result( property = "currentStatusStartTime", column = "optimizing_status_start_time", @@ -424,4 +430,73 @@ List selectTableIdentifiersByCatalog( typeHandler = MapLong2StringConverter.class) }) List selectTableRuntimeMetas(); + + @Select( + "") + @Results({ + @Result(property = "tableId", column = "table_id"), + @Result(property = "catalogName", column = "catalog_name"), + @Result(property = "dbName", column = "db_name"), + @Result(property = "tableName", column = "table_name"), + @Result(property = "currentSnapshotId", column = "current_snapshot_id"), + @Result(property = "currentChangeSnapshotId", column = "current_change_snapshotId"), + @Result(property = "lastOptimizedSnapshotId", column = "last_optimized_snapshotId"), + @Result( + property = "lastOptimizedChangeSnapshotId", + column = "last_optimized_change_snapshotId"), + @Result( + property = "lastMajorOptimizingTime", + column = "last_major_optimizing_time", + typeHandler = Long2TsConverter.class), + @Result( + property = "lastMinorOptimizingTime", + column = "last_minor_optimizing_time", + typeHandler = Long2TsConverter.class), + @Result( + property = "lastFullOptimizingTime", + column = "last_full_optimizing_time", + typeHandler = Long2TsConverter.class), + @Result( + property = "tableStatus", + column = "optimizing_status_code", + typeHandler = OptimizingStatusConverter.class), + @Result( + property = "currentStatusStartTime", + column = "optimizing_status_start_time", + typeHandler = Long2TsConverter.class), + @Result(property = "optimizingProcessId", column = "optimizing_process_id"), + @Result(property = "optimizerGroup", column = "optimizer_group"), + @Result( + property = "pendingInput", + column = "pending_input", + typeHandler = JsonObjectConverter.class), + @Result( + property = "tableConfig", + column = "table_config", + typeHandler = JsonObjectConverter.class), + }) + List selectTableRuntimesForOptimizerGroup( + @Param("optimizerGroup") String optimizerGroup, + @Param("fuzzyDbName") String fuzzyDbName, + @Param("fuzzyTableName") String fuzzyTableName, + @Param("limitCount") int limitCount, + @Param("offsetNum") int offset); } diff --git a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/DefaultTableService.java b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/DefaultTableService.java index b3266b9d7c..f18c524bde 100644 --- a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/DefaultTableService.java +++ b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/DefaultTableService.java @@ -40,6 +40,7 @@ import org.apache.amoro.server.manager.MetricManager; import org.apache.amoro.server.optimizing.OptimizingStatus; import org.apache.amoro.server.persistence.StatedPersistentBase; +import org.apache.amoro.server.persistence.TableRuntimeMeta; import org.apache.amoro.server.persistence.mapper.CatalogMetaMapper; import org.apache.amoro.server.persistence.mapper.TableMetaMapper; import org.apache.amoro.server.table.blocker.TableBlocker; @@ -55,6 +56,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nullable; + +import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -80,8 +84,7 @@ public class DefaultTableService extends StatedPersistentBase implements TableSe private final Map internalCatalogMap = new ConcurrentHashMap<>(); private final Map externalCatalogMap = new ConcurrentHashMap<>(); - private final Map tableRuntimeMap = - new ConcurrentHashMap<>(); + private final Map tableRuntimeMap = new ConcurrentHashMap<>(); private final ScheduledExecutorService tableExplorerScheduler = Executors.newSingleThreadScheduledExecutor( @@ -138,12 +141,6 @@ public ServerCatalog getServerCatalog(String catalogName) { .orElseThrow(() -> new ObjectNotExistsException("Catalog " + catalogName)); } - @Override - public InternalCatalog getInternalCatalog(String catalogName) { - return Optional.ofNullable(internalCatalogMap.get(catalogName)) - .orElseThrow(() -> new ObjectNotExistsException("Catalog " + catalogName)); - } - @Override public void createCatalog(CatalogMeta catalogMeta) { checkStarted(); @@ -208,7 +205,7 @@ public void dropTableMetadata(TableIdentifier tableIdentifier, boolean deleteDat } ServerTableIdentifier serverTableIdentifier = internalCatalog.dropTable(database, table); - Optional.ofNullable(tableRuntimeMap.remove(serverTableIdentifier)) + Optional.ofNullable(tableRuntimeMap.remove(serverTableIdentifier.getId())) .ifPresent( tableRuntime -> { if (headHandler != null) { @@ -260,7 +257,7 @@ public Blocker block( @Override public void releaseBlocker(TableIdentifier tableIdentifier, String blockerId) { checkStarted(); - TableRuntime tableRuntime = getRuntime(getServerTableIdentifier(tableIdentifier)); + TableRuntime tableRuntime = getRuntime(getServerTableIdentifier(tableIdentifier).getId()); if (tableRuntime != null) { tableRuntime.release(blockerId); } @@ -281,6 +278,26 @@ public List getBlockers(TableIdentifier tableIdentifier) { .collect(Collectors.toList()); } + @Override + public List getTableRuntimes( + String optimizerGroup, + @Nullable String fuzzyDbName, + @Nullable String fuzzyTableName, + int limit, + int offset) { + checkStarted(); + return getAs( + TableMetaMapper.class, + mapper -> + mapper.selectTableRuntimesForOptimizerGroup( + optimizerGroup, fuzzyDbName, fuzzyTableName, limit, offset)); + } + + public InternalCatalog getInternalCatalog(String catalogName) { + return Optional.ofNullable(internalCatalogMap.get(catalogName)) + .orElseThrow(() -> new ObjectNotExistsException("Catalog " + catalogName)); + } + @Override public void addHandlerChain(RuntimeHandlerChain handler) { checkNotStarted(); @@ -313,15 +330,17 @@ public void initialize() { List tableRuntimeMetaList = getAs(TableMetaMapper.class, TableMetaMapper::selectTableRuntimeMetas); + List tableRuntimes = new ArrayList<>(tableRuntimeMetaList.size()); tableRuntimeMetaList.forEach( tableRuntimeMeta -> { - TableRuntime tableRuntime = tableRuntimeMeta.constructTableRuntime(this); - tableRuntimeMap.put(tableRuntime.getTableIdentifier(), tableRuntime); + TableRuntime tableRuntime = new TableRuntime(tableRuntimeMeta, this); + tableRuntimeMap.put(tableRuntimeMeta.getTableId(), tableRuntime); tableRuntime.registerMetric(MetricManager.getInstance().getGlobalRegistry()); + tableRuntimes.add(tableRuntime); }); if (headHandler != null) { - headHandler.initialize(tableRuntimeMetaList); + headHandler.initialize(tableRuntimes); } if (tableExplorerExecutors == null) { int threadCount = @@ -348,7 +367,7 @@ public void initialize() { private TableRuntime getAndCheckExist(ServerTableIdentifier tableIdentifier) { Preconditions.checkArgument(tableIdentifier != null, "tableIdentifier cannot be null"); - TableRuntime tableRuntime = getRuntime(tableIdentifier); + TableRuntime tableRuntime = getRuntime(tableIdentifier.getId()); if (tableRuntime == null) { throw new ObjectNotExistsException(tableIdentifier); } @@ -384,15 +403,15 @@ private ServerTableIdentifier getOrSyncServerTableIdentifier(TableIdentifier id) } @Override - public TableRuntime getRuntime(ServerTableIdentifier tableIdentifier) { + public TableRuntime getRuntime(Long tableId) { checkStarted(); - return tableRuntimeMap.get(tableIdentifier); + return tableRuntimeMap.get(tableId); } @Override - public boolean contains(ServerTableIdentifier tableIdentifier) { + public boolean contains(Long tableId) { checkStarted(); - return tableRuntimeMap.containsKey(tableIdentifier); + return tableRuntimeMap.containsKey(tableId); } public void dispose() { @@ -582,7 +601,7 @@ private boolean triggerTableAdded( } } TableRuntime tableRuntime = new TableRuntime(serverTableIdentifier, this, table.properties()); - tableRuntimeMap.put(serverTableIdentifier, tableRuntime); + tableRuntimeMap.put(serverTableIdentifier.getId(), tableRuntime); tableRuntime.registerMetric(MetricManager.getInstance().getGlobalRegistry()); if (headHandler != null) { headHandler.fireTableAdded(table, tableRuntime); @@ -596,7 +615,7 @@ private void revertTableRuntimeAdded( externalCatalog.getServerTableIdentifier( tableIdentity.getDatabase(), tableIdentity.getTableName()); if (tableIdentifier != null) { - tableRuntimeMap.remove(tableIdentifier); + tableRuntimeMap.remove(tableIdentifier.getId()); } } @@ -608,7 +627,7 @@ private void disposeTable(ServerTableIdentifier tableIdentifier) { tableIdentifier.getCatalog(), tableIdentifier.getDatabase(), tableIdentifier.getTableName())); - Optional.ofNullable(tableRuntimeMap.remove(tableIdentifier)) + Optional.ofNullable(tableRuntimeMap.remove(tableIdentifier.getId())) .ifPresent( tableRuntime -> { if (headHandler != null) { diff --git a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/RuntimeHandlerChain.java b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/RuntimeHandlerChain.java index bb3e64ceef..3331b427c1 100644 --- a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/RuntimeHandlerChain.java +++ b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/RuntimeHandlerChain.java @@ -51,16 +51,15 @@ protected void appendNext(RuntimeHandlerChain handler) { } } - public final void initialize(List tableRuntimeMetaList) { - List supportedtableRuntimeMetaList = - tableRuntimeMetaList.stream() - .filter( - tableRuntimeMeta -> formatSupported(tableRuntimeMeta.getTableRuntime().getFormat())) + public final void initialize(List tableRuntimes) { + List supportedtableRuntimeMetaList = + tableRuntimes.stream() + .filter(runtime -> formatSupported(runtime.getFormat())) .collect(Collectors.toList()); initHandler(supportedtableRuntimeMetaList); initialized = true; if (next != null) { - next.initialize(tableRuntimeMetaList); + next.initialize(tableRuntimes); } } @@ -147,7 +146,7 @@ protected abstract void handleConfigChanged( protected abstract void handleTableRemoved(TableRuntime tableRuntime); - protected abstract void initHandler(List tableRuntimeMetaList); + protected abstract void initHandler(List tableRuntimeMetaList); protected abstract void doDispose(); } diff --git a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/TableManager.java b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/TableManager.java index d5fffcdf82..65a1aa3456 100644 --- a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/TableManager.java +++ b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/TableManager.java @@ -31,9 +31,9 @@ public interface TableManager extends TableRuntimeHandler { */ AmoroTable loadTable(ServerTableIdentifier tableIdentifier); - TableRuntime getRuntime(ServerTableIdentifier tableIdentifier); + TableRuntime getRuntime(Long tableId); - default boolean contains(ServerTableIdentifier tableIdentifier) { - return getRuntime(tableIdentifier) != null; + default boolean contains(Long tableId) { + return getRuntime(tableId) != null; } } diff --git a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/TableRuntime.java b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/TableRuntime.java index e97143133c..fd335bf3ce 100644 --- a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/TableRuntime.java +++ b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/TableRuntime.java @@ -35,6 +35,7 @@ import org.apache.amoro.server.optimizing.TaskRuntime; import org.apache.amoro.server.optimizing.plan.OptimizingEvaluator; import org.apache.amoro.server.persistence.StatedPersistentBase; +import org.apache.amoro.server.persistence.TableRuntimeMeta; import org.apache.amoro.server.persistence.mapper.OptimizingMapper; import org.apache.amoro.server.persistence.mapper.TableBlockerMapper; import org.apache.amoro.server.persistence.mapper.TableMetaMapper; @@ -97,7 +98,14 @@ public class TableRuntime extends StatedPersistentBase { private final TableOptimizingMetrics optimizingMetrics; private final ReentrantLock blockerLock = new ReentrantLock(); - protected TableRuntime( + private long targetSnapshotId; + + private long targetChangeSnapshotId; + + private Map fromSequence; + private Map toSequence; + + public TableRuntime( ServerTableIdentifier tableIdentifier, TableRuntimeHandler tableHandler, Map properties) { @@ -111,9 +119,10 @@ protected TableRuntime( optimizingMetrics = new TableOptimizingMetrics(tableIdentifier); } - protected TableRuntime(TableRuntimeMeta tableRuntimeMeta, TableRuntimeHandler tableHandler) { + public TableRuntime(TableRuntimeMeta tableRuntimeMeta, TableRuntimeHandler tableHandler) { Preconditions.checkNotNull(tableRuntimeMeta, "TableRuntimeMeta must not be null."); Preconditions.checkNotNull(tableHandler, "TableRuntimeHandler must not be null."); + this.tableHandler = tableHandler; this.tableIdentifier = ServerTableIdentifier.of( @@ -140,6 +149,10 @@ protected TableRuntime(TableRuntimeMeta tableRuntimeMeta, TableRuntimeHandler ta this.pendingInput = tableRuntimeMeta.getPendingInput(); optimizingMetrics = new TableOptimizingMetrics(tableIdentifier); optimizingMetrics.statusChanged(optimizingStatus, this.currentStatusStartTime); + this.targetSnapshotId = tableRuntimeMeta.getTargetSnapshotId(); + this.targetChangeSnapshotId = tableRuntimeMeta.getTargetChangeSnapshotId(); + this.fromSequence = tableRuntimeMeta.getFromSequence(); + this.toSequence = tableRuntimeMeta.getToSequence(); } public void recover(OptimizingProcess optimizingProcess) { @@ -474,6 +487,38 @@ public void setLastPlanTime(long lastPlanTime) { this.lastPlanTime = lastPlanTime; } + public long getTargetSnapshotId() { + return targetSnapshotId; + } + + public void setTargetSnapshotId(long targetSnapshotId) { + this.targetSnapshotId = targetSnapshotId; + } + + public long getTargetChangeSnapshotId() { + return targetChangeSnapshotId; + } + + public void setTargetChangeSnapshotId(long targetChangeSnapshotId) { + this.targetChangeSnapshotId = targetChangeSnapshotId; + } + + public Map getFromSequence() { + return fromSequence; + } + + public void setFromSequence(Map fromSequence) { + this.fromSequence = fromSequence; + } + + public Map getToSequence() { + return toSequence; + } + + public void setToSequence(Map toSequence) { + this.toSequence = toSequence; + } + @Override public String toString() { return MoreObjects.toStringHelper(this) @@ -487,6 +532,10 @@ public String toString() { .add("lastFullOptimizingTime", lastFullOptimizingTime) .add("lastMinorOptimizingTime", lastMinorOptimizingTime) .add("tableConfiguration", tableConfiguration) + .add("targetSnapshotId", targetSnapshotId) + .add("targetChangeSnapshotId", targetChangeSnapshotId) + .add("fromSequence", fromSequence) + .add("toSequence", toSequence) .toString(); } diff --git a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/TableService.java b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/TableService.java index 6b87db18b6..9726776c0d 100644 --- a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/TableService.java +++ b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/TableService.java @@ -23,6 +23,9 @@ import org.apache.amoro.api.ServerTableIdentifier; import org.apache.amoro.api.TableIdentifier; import org.apache.amoro.server.catalog.CatalogService; +import org.apache.amoro.server.persistence.TableRuntimeMeta; + +import javax.annotation.Nullable; import java.util.List; import java.util.Map; @@ -88,4 +91,22 @@ Blocker block( * @return block list */ List getBlockers(TableIdentifier tableIdentifier); + + /** + * Get the table info from database for given parameters. + * + * @param optimizerGroup The optimizer group of the table associated to. will be if we want the + * info for all groups. + * @param fuzzyDbName the fuzzy db name used to filter the result, will be null if no filter set. + * @param fuzzyTableName the fuzzy table name used to filter the result, will be null if no filter + * set. + * @param limit How many entries we want to retrieve. + * @param offset The entries we'll skip when retrieving the entries. + */ + List getTableRuntimes( + String optimizerGroup, + @Nullable String fuzzyDbName, + @Nullable String fuzzyTableName, + int limit, + int offset); } diff --git a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/executor/BaseTableExecutor.java b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/executor/BaseTableExecutor.java index 29edacac92..b85445cf0b 100644 --- a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/executor/BaseTableExecutor.java +++ b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/executor/BaseTableExecutor.java @@ -25,7 +25,6 @@ import org.apache.amoro.server.table.RuntimeHandlerChain; import org.apache.amoro.server.table.TableManager; import org.apache.amoro.server.table.TableRuntime; -import org.apache.amoro.server.table.TableRuntimeMeta; import org.apache.amoro.shade.guava32.com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; @@ -63,10 +62,9 @@ protected BaseTableExecutor(TableManager tableManager, int poolSize) { } @Override - protected void initHandler(List tableRuntimeMetaList) { + protected void initHandler(List tableRuntimeMetaList) { tableRuntimeMetaList.stream() - .map(tableRuntimeMeta -> tableRuntimeMeta.getTableRuntime()) - .filter(tableRuntime -> enabled(tableRuntime)) + .filter(this::enabled) .forEach( tableRuntime -> { if (scheduledTables.add(tableRuntime.getTableIdentifier())) { @@ -109,7 +107,8 @@ protected String getThreadName() { } private boolean isExecutable(TableRuntime tableRuntime) { - return tableManager.contains(tableRuntime.getTableIdentifier()) && enabled(tableRuntime); + return tableManager.contains(tableRuntime.getTableIdentifier().getId()) + && enabled(tableRuntime); } @Override diff --git a/amoro-ams/amoro-ams-server/src/main/resources/derby/ams-derby-init.sql b/amoro-ams/amoro-ams-server/src/main/resources/derby/ams-derby-init.sql index e7856a57df..09dc615d07 100644 --- a/amoro-ams/amoro-ams-server/src/main/resources/derby/ams-derby-init.sql +++ b/amoro-ams/amoro-ams-server/src/main/resources/derby/ams-derby-init.sql @@ -111,7 +111,7 @@ CREATE TABLE table_runtime ( last_major_optimizing_time TIMESTAMP, last_minor_optimizing_time TIMESTAMP, last_full_optimizing_time TIMESTAMP, - optimizing_status VARCHAR(20) DEFAULT 'IDLE', + optimizing_status_code INT DEFAULT 7, optimizing_status_start_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP, optimizing_process_id BIGINT NOT NULL, optimizer_group VARCHAR(64) NOT NULL, diff --git a/amoro-ams/amoro-ams-server/src/main/resources/mysql/upgrade-0.7.0-to-0.7.1.sql b/amoro-ams/amoro-ams-server/src/main/resources/mysql/upgrade-0.7.0-to-0.7.1.sql new file mode 100644 index 0000000000..dd2312f388 --- /dev/null +++ b/amoro-ams/amoro-ams-server/src/main/resources/mysql/upgrade-0.7.0-to-0.7.1.sql @@ -0,0 +1,24 @@ +RENAME TABLE table_runtime TO table_runtime_backup; +CREATE TABLE table_runtime LIKE table_runtime_backup; + +ALTER TABLE table_runtime CHANGE COLUMN optimizing_status optimizing_status_code INT DEFAULT 7; +CREATE INDEX idx_optimizer_status_and_time ON table_runtime(optimizing_status_code, optimizing_status_start_time DESC); + +INSERT INTO table_runtime( + `table_id`,`catalog_name`, `db_name`, `table_name`, `current_snapshot_id`,`current_change_snapshotId`, `last_optimized_snapshotId`, + `last_optimized_change_snapshotId`, `last_major_optimizing_time`, `last_minor_optimizing_time`, `last_full_optimizing_time`, + `optimizing_status_code`, `optimizing_status_start_time`, `optimizing_process_id`, `optimizer_group`, `table_config`, + `optimizing_config`, `pending_input`) +SELECT `table_id`,`catalog_name`, `db_name`, `table_name`, `current_snapshot_id`,`current_change_snapshotId`, `last_optimized_snapshotId`, + `last_optimized_change_snapshotId`, `last_major_optimizing_time`, `last_minor_optimizing_time`, `last_full_optimizing_time`, + CASE + WHEN `optimizing_status` = 'IDLE' THEN 6 + WHEN `optimizing_status` = 'PENDING' THEN 5 + WHEN `optimizing_status` = 'PLANNING' THEN 4 + WHEN `optimizing_status` = 'COMMITTING' THEN 3 + WHEN `optimizing_status` = 'MINOR_OPTIMIZING' THEN 2 + WHEN `optimizing_status` = 'MAJOR_OPTIMIZING' THEN 1 + WHEN `optimizing_status` = 'FULL_OPTIMIZING' THEN 0 + END, + `optimizing_status_start_time`, `optimizing_process_id`, `optimizer_group`, `table_config`, `optimizing_config`, `pending_input` +FROM table_runtime_backup; \ No newline at end of file diff --git a/amoro-ams/amoro-ams-server/src/test/java/org/apache/amoro/server/RestCatalogServiceTestBase.java b/amoro-ams/amoro-ams-server/src/test/java/org/apache/amoro/server/RestCatalogServiceTestBase.java index 8f83f4ff98..05b21b7838 100644 --- a/amoro-ams/amoro-ams-server/src/test/java/org/apache/amoro/server/RestCatalogServiceTestBase.java +++ b/amoro-ams/amoro-ams-server/src/test/java/org/apache/amoro/server/RestCatalogServiceTestBase.java @@ -121,7 +121,7 @@ protected TableMetadata getTableMetadata(TableIdentifier identifier) { protected TableRuntime getTableRuntime(TableIdentifier identifier) { ServerTableIdentifier serverTableIdentifier = getServerTableIdentifier(identifier); - return tableService.getRuntime(serverTableIdentifier); + return tableService.getRuntime(serverTableIdentifier.getId()); } protected void assertTableRuntime(TableIdentifier identifier, TableFormat format) { diff --git a/amoro-ams/amoro-ams-server/src/test/java/org/apache/amoro/server/TestDefaultOptimizingService.java b/amoro-ams/amoro-ams-server/src/test/java/org/apache/amoro/server/TestDefaultOptimizingService.java index 01631f073c..ea21898d0c 100644 --- a/amoro-ams/amoro-ams-server/src/test/java/org/apache/amoro/server/TestDefaultOptimizingService.java +++ b/amoro-ams/amoro-ams-server/src/test/java/org/apache/amoro/server/TestDefaultOptimizingService.java @@ -114,7 +114,7 @@ private void initTableWithFiles() { (MixedTable) tableService().loadTable(serverTableIdentifier()).originalTable(); appendData(mixedTable.asUnkeyedTable(), 1); appendData(mixedTable.asUnkeyedTable(), 2); - TableRuntime runtime = tableService().getRuntime(serverTableIdentifier()); + TableRuntime runtime = tableService().getRuntime(serverTableIdentifier().getId()); runtime.refresh(tableService().loadTable(serverTableIdentifier())); } @@ -387,10 +387,13 @@ private void assertTaskCompleted(TaskRuntime taskRuntime) { 0, optimizingService().listTasks(defaultResourceGroup().getName()).size()); Assertions.assertEquals( OptimizingProcess.Status.RUNNING, - tableService().getRuntime(serverTableIdentifier()).getOptimizingProcess().getStatus()); + tableService() + .getRuntime(serverTableIdentifier().getId()) + .getOptimizingProcess() + .getStatus()); Assertions.assertEquals( OptimizingStatus.COMMITTING, - tableService().getRuntime(serverTableIdentifier()).getOptimizingStatus()); + tableService().getRuntime(serverTableIdentifier().getId()).getOptimizingStatus()); } protected void reload() { @@ -415,7 +418,7 @@ public TableRuntimeRefresher() { } void refreshPending() { - execute(tableService().getRuntime(serverTableIdentifier())); + execute(tableService().getRuntime(serverTableIdentifier().getId())); } } diff --git a/amoro-ams/amoro-ams-server/src/test/java/org/apache/amoro/server/optimizing/OptimizingStatusTest.java b/amoro-ams/amoro-ams-server/src/test/java/org/apache/amoro/server/optimizing/OptimizingStatusTest.java new file mode 100644 index 0000000000..77c08a9a0e --- /dev/null +++ b/amoro-ams/amoro-ams-server/src/test/java/org/apache/amoro/server/optimizing/OptimizingStatusTest.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.server.optimizing; + +import static org.junit.Assert.assertEquals; + +import org.junit.Test; + +public class OptimizingStatusTest { + @Test + public void testOptimizingStatusCodeValue() { + assertEquals(7, OptimizingStatus.values().length); + + assertEquals(OptimizingStatus.FULL_OPTIMIZING, OptimizingStatus.ofCode(0)); + assertEquals(OptimizingStatus.MAJOR_OPTIMIZING, OptimizingStatus.ofCode(1)); + assertEquals(OptimizingStatus.MINOR_OPTIMIZING, OptimizingStatus.ofCode(2)); + assertEquals(OptimizingStatus.COMMITTING, OptimizingStatus.ofCode(3)); + assertEquals(OptimizingStatus.PLANNING, OptimizingStatus.ofCode(4)); + assertEquals(OptimizingStatus.PENDING, OptimizingStatus.ofCode(5)); + assertEquals(OptimizingStatus.IDLE, OptimizingStatus.ofCode(6)); + } +} diff --git a/amoro-ams/amoro-ams-server/src/test/java/org/apache/amoro/server/optimizing/TestOptimizingQueue.java b/amoro-ams/amoro-ams-server/src/test/java/org/apache/amoro/server/optimizing/TestOptimizingQueue.java index 25e24565ee..8722267a45 100644 --- a/amoro-ams/amoro-ams-server/src/test/java/org/apache/amoro/server/optimizing/TestOptimizingQueue.java +++ b/amoro-ams/amoro-ams-server/src/test/java/org/apache/amoro/server/optimizing/TestOptimizingQueue.java @@ -45,12 +45,12 @@ import org.apache.amoro.optimizing.TableOptimizing; import org.apache.amoro.server.manager.MetricManager; import org.apache.amoro.server.metrics.MetricRegistry; +import org.apache.amoro.server.persistence.TableRuntimeMeta; import org.apache.amoro.server.resource.OptimizerInstance; import org.apache.amoro.server.resource.OptimizerThread; import org.apache.amoro.server.resource.QuotaProvider; import org.apache.amoro.server.table.AMSTableTestBase; import org.apache.amoro.server.table.TableRuntime; -import org.apache.amoro.server.table.TableRuntimeMeta; import org.apache.amoro.shade.guava32.com.google.common.collect.ImmutableMap; import org.apache.amoro.shade.guava32.com.google.common.collect.Lists; import org.apache.amoro.table.MixedTable; @@ -103,7 +103,7 @@ protected static ResourceGroup testResourceGroup() { return new ResourceGroup.Builder("test", "local").build(); } - protected OptimizingQueue buildOptimizingGroupService(TableRuntimeMeta tableRuntimeMeta) { + protected OptimizingQueue buildOptimizingGroupService(TableRuntime tableRuntimeMeta) { return new OptimizingQueue( tableService(), testResourceGroup(), @@ -125,7 +125,7 @@ private OptimizingQueue buildOptimizingGroupService() { @Test public void testPollNoTask() { - TableRuntimeMeta tableRuntimeMeta = + TableRuntime tableRuntimeMeta = buildTableRuntimeMeta(OptimizingStatus.PENDING, defaultResourceGroup()); OptimizingQueue queue = buildOptimizingGroupService(tableRuntimeMeta); Assert.assertNull(queue.pollTask(0)); @@ -136,25 +136,25 @@ public void testPollNoTask() { public void testRefreshAndReleaseTable() { OptimizingQueue queue = buildOptimizingGroupService(); Assert.assertEquals(0, queue.getSchedulingPolicy().getTableRuntimeMap().size()); - TableRuntimeMeta tableRuntimeMeta = + TableRuntime tableRuntime = buildTableRuntimeMeta(OptimizingStatus.IDLE, defaultResourceGroup()); - queue.refreshTable(tableRuntimeMeta.getTableRuntime()); + queue.refreshTable(tableRuntime); Assert.assertEquals(1, queue.getSchedulingPolicy().getTableRuntimeMap().size()); Assert.assertTrue( queue.getSchedulingPolicy().getTableRuntimeMap().containsKey(serverTableIdentifier())); - queue.releaseTable(tableRuntimeMeta.getTableRuntime()); + queue.releaseTable(tableRuntime); Assert.assertEquals(0, queue.getSchedulingPolicy().getTableRuntimeMap().size()); - queue.refreshTable(tableRuntimeMeta.getTableRuntime()); + queue.refreshTable(tableRuntime); Assert.assertEquals(1, queue.getSchedulingPolicy().getTableRuntimeMap().size()); queue.dispose(); } @Test public void testPollTask() { - TableRuntimeMeta tableRuntimeMeta = initTableWithFiles(); - OptimizingQueue queue = buildOptimizingGroupService(tableRuntimeMeta); + TableRuntime tableRuntime = initTableWithFiles(); + OptimizingQueue queue = buildOptimizingGroupService(tableRuntime); // 1.poll task TaskRuntime task = queue.pollTask(MAX_POLLING_TIME); @@ -167,7 +167,7 @@ public void testPollTask() { @Test public void testRetryTask() { - TableRuntimeMeta tableRuntimeMeta = initTableWithFiles(); + TableRuntime tableRuntimeMeta = initTableWithFiles(); OptimizingQueue queue = buildOptimizingGroupService(tableRuntimeMeta); // 1.poll task @@ -200,8 +200,8 @@ public void testRetryTask() { @Test public void testCommitTask() { - TableRuntimeMeta tableRuntimeMeta = initTableWithFiles(); - OptimizingQueue queue = buildOptimizingGroupService(tableRuntimeMeta); + TableRuntime tableRuntime = initTableWithFiles(); + OptimizingQueue queue = buildOptimizingGroupService(tableRuntime); Assert.assertEquals(0, queue.collectTasks().size()); TaskRuntime task = queue.pollTask(MAX_POLLING_TIME); @@ -216,11 +216,11 @@ public void testCommitTask() { Assert.assertEquals(TaskRuntime.Status.SUCCESS, task.getStatus()); // 7.commit - OptimizingProcess optimizingProcess = tableRuntimeMeta.getTableRuntime().getOptimizingProcess(); + OptimizingProcess optimizingProcess = tableRuntime.getOptimizingProcess(); Assert.assertEquals(OptimizingProcess.Status.RUNNING, optimizingProcess.getStatus()); optimizingProcess.commit(); Assert.assertEquals(OptimizingProcess.Status.SUCCESS, optimizingProcess.getStatus()); - Assert.assertNull(tableRuntimeMeta.getTableRuntime().getOptimizingProcess()); + Assert.assertNull(tableRuntime.getOptimizingProcess()); // 8.commit again, throw exceptions, and status not changed. Assert.assertThrows(IllegalStateException.class, optimizingProcess::commit); @@ -232,8 +232,8 @@ public void testCommitTask() { @Test public void testCollectingTasks() { - TableRuntimeMeta tableRuntimeMeta = initTableWithFiles(); - OptimizingQueue queue = buildOptimizingGroupService(tableRuntimeMeta); + TableRuntime tableRuntime = initTableWithFiles(); + OptimizingQueue queue = buildOptimizingGroupService(tableRuntime); Assert.assertEquals(0, queue.collectTasks().size()); TaskRuntime task = queue.pollTask(MAX_POLLING_TIME); @@ -247,8 +247,8 @@ public void testCollectingTasks() { @Test public void testTaskAndTableMetrics() { - TableRuntimeMeta tableRuntimeMeta = initTableWithFiles(); - OptimizingQueue queue = buildOptimizingGroupService(tableRuntimeMeta); + TableRuntime tableRuntime = initTableWithFiles(); + OptimizingQueue queue = buildOptimizingGroupService(tableRuntime); MetricRegistry registry = MetricManager.getInstance().getGlobalRegistry(); Map tagValues = ImmutableMap.of(GROUP_TAG, testResourceGroup().getName()); @@ -299,7 +299,7 @@ public void testTaskAndTableMetrics() { Assert.assertEquals(0, pendingTablesGauge.getValue().longValue()); Assert.assertEquals(1, executingTablesGauge.getValue().longValue()); - OptimizingProcess optimizingProcess = tableRuntimeMeta.getTableRuntime().getOptimizingProcess(); + OptimizingProcess optimizingProcess = tableRuntime.getOptimizingProcess(); optimizingProcess.commit(); Assert.assertEquals(0, queueTasksGauge.getValue().longValue()); Assert.assertEquals(0, executingTasksGauge.getValue().longValue()); @@ -345,21 +345,19 @@ public void testAddAndRemoveOptimizers() { queue.dispose(); } - protected TableRuntimeMeta initTableWithFiles() { + protected TableRuntime initTableWithFiles() { MixedTable mixedTable = (MixedTable) tableService().loadTable(serverTableIdentifier()).originalTable(); appendData(mixedTable.asUnkeyedTable(), 1); appendData(mixedTable.asUnkeyedTable(), 2); - TableRuntimeMeta tableRuntimeMeta = + TableRuntime tableRuntime = buildTableRuntimeMeta(OptimizingStatus.PENDING, defaultResourceGroup()); - TableRuntime runtime = tableRuntimeMeta.getTableRuntime(); - runtime.refresh(tableService().loadTable(serverTableIdentifier())); - return tableRuntimeMeta; + tableRuntime.refresh(tableService().loadTable(serverTableIdentifier())); + return tableRuntime; } - private TableRuntimeMeta buildTableRuntimeMeta( - OptimizingStatus status, ResourceGroup resourceGroup) { + private TableRuntime buildTableRuntimeMeta(OptimizingStatus status, ResourceGroup resourceGroup) { MixedTable mixedTable = (MixedTable) tableService().loadTable(serverTableIdentifier()).originalTable(); TableRuntimeMeta tableRuntimeMeta = new TableRuntimeMeta(); @@ -371,8 +369,7 @@ private TableRuntimeMeta buildTableRuntimeMeta( tableRuntimeMeta.setTableStatus(status); tableRuntimeMeta.setTableConfig(TableConfiguration.parseConfig(mixedTable.properties())); tableRuntimeMeta.setOptimizerGroup(resourceGroup.getName()); - tableRuntimeMeta.constructTableRuntime(tableService()); - return tableRuntimeMeta; + return new TableRuntime(tableRuntimeMeta, tableService()); } private void appendData(UnkeyedTable table, int id) { diff --git a/amoro-ams/amoro-ams-server/src/test/java/org/apache/amoro/server/table/TestTableRuntimeHandler.java b/amoro-ams/amoro-ams-server/src/test/java/org/apache/amoro/server/table/TestTableRuntimeHandler.java index c641fb258d..6c0eef31fc 100644 --- a/amoro-ams/amoro-ams-server/src/test/java/org/apache/amoro/server/table/TestTableRuntimeHandler.java +++ b/amoro-ams/amoro-ams-server/src/test/java/org/apache/amoro/server/table/TestTableRuntimeHandler.java @@ -92,14 +92,15 @@ public void testInitialize() throws Exception { tableService.initialize(); Assert.assertEquals(1, handler.getInitTables().size()); Assert.assertEquals( - createTableId.getId().longValue(), handler.getInitTables().get(0).getTableId()); + (Long) createTableId.getId().longValue(), + handler.getInitTables().get(0).getTableIdentifier().getId()); // test change properties MixedTable mixedTable = (MixedTable) tableService().loadTable(createTableId).originalTable(); mixedTable.updateProperties().set(TableProperties.ENABLE_ORPHAN_CLEAN, "true").commit(); tableService() - .getRuntime(createTableId) + .getRuntime(createTableId.getId()) .refresh(tableService.loadTable(serverTableIdentifier())); Assert.assertEquals(1, handler.getConfigChangedTables().size()); validateTableRuntime(handler.getConfigChangedTables().get(0).first()); @@ -131,7 +132,7 @@ protected DefaultTableService tableService() { static class TestHandler extends RuntimeHandlerChain { - private final List initTables = Lists.newArrayList(); + private final List initTables = Lists.newArrayList(); private final List> statusChangedTables = Lists.newArrayList(); private final List> configChangedTables = @@ -162,7 +163,7 @@ protected void handleTableRemoved(TableRuntime tableRuntime) { } @Override - protected void initHandler(List tableRuntimeMetaList) { + protected void initHandler(List tableRuntimeMetaList) { initTables.addAll(tableRuntimeMetaList); } @@ -171,7 +172,7 @@ protected void doDispose() { disposed = true; } - public List getInitTables() { + public List getInitTables() { return initTables; } diff --git a/amoro-ams/amoro-ams-server/src/test/java/org/apache/amoro/server/table/TestTableRuntimeManager.java b/amoro-ams/amoro-ams-server/src/test/java/org/apache/amoro/server/table/TestTableRuntimeManager.java index 423e09795c..35b1e8c435 100644 --- a/amoro-ams/amoro-ams-server/src/test/java/org/apache/amoro/server/table/TestTableRuntimeManager.java +++ b/amoro-ams/amoro-ams-server/src/test/java/org/apache/amoro/server/table/TestTableRuntimeManager.java @@ -71,7 +71,7 @@ public void testLoadTable() { @Test public void testTableContains() { - Assert.assertTrue(tableService().contains(serverTableIdentifier())); + Assert.assertTrue(tableService().contains(serverTableIdentifier().getId())); ServerTableIdentifier copyId = ServerTableIdentifier.of( null, @@ -79,7 +79,7 @@ public void testTableContains() { serverTableIdentifier().getDatabase(), serverTableIdentifier().getTableName(), serverTableIdentifier().getFormat()); - Assert.assertFalse(tableService().contains(copyId)); + Assert.assertFalse(tableService().contains(copyId.getId())); copyId = ServerTableIdentifier.of( serverTableIdentifier().getId(), @@ -87,12 +87,12 @@ public void testTableContains() { serverTableIdentifier().getDatabase(), "unknown", serverTableIdentifier().getFormat()); - Assert.assertFalse(tableService().contains(copyId)); + Assert.assertFalse(tableService().contains(copyId.getId())); } @Test public void testTableRuntime() { - TableRuntime tableRuntime = tableService().getRuntime(serverTableIdentifier()); + TableRuntime tableRuntime = tableService().getRuntime(serverTableIdentifier().getId()); validateTableRuntime(tableRuntime); } } diff --git a/amoro-ams/amoro-ams-server/src/test/java/org/apache/amoro/server/table/TestTableService.java b/amoro-ams/amoro-ams-server/src/test/java/org/apache/amoro/server/table/TestTableService.java index dea85f1a73..cbf8010c6e 100644 --- a/amoro-ams/amoro-ams-server/src/test/java/org/apache/amoro/server/table/TestTableService.java +++ b/amoro-ams/amoro-ams-server/src/test/java/org/apache/amoro/server/table/TestTableService.java @@ -341,7 +341,7 @@ private boolean isBlocked(BlockableOperation operation) { } private boolean isTableRuntimeBlocked(BlockableOperation operation) { - return tableService().getRuntime(serverTableIdentifier()).isBlocked(operation); + return tableService().getRuntime(serverTableIdentifier().getId()).isBlocked(operation); } private void assertBlockerCnt(int i) {