diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java index 1dd763f324..20904ec268 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java @@ -66,7 +66,6 @@ import org.slf4j.LoggerFactory; import org.yaml.snakeyaml.Yaml; -import java.io.IOException; import java.net.InetSocketAddress; import java.nio.file.Files; import java.nio.file.Paths; @@ -215,7 +214,7 @@ public void dispose() { MetricManager.dispose(); } - private void initConfig() throws IOException { + private void initConfig() throws Exception { LOG.info("initializing configurations..."); new ConfigurationHelper().init(); } @@ -407,14 +406,14 @@ private class ConfigurationHelper { private JsonNode yamlConfig; - public void init() throws IOException { + public void init() throws Exception { Map envConfig = initEnvConfig(); initServiceConfig(envConfig); setIcebergSystemProperties(); initContainerConfig(); } - private void initServiceConfig(Map envConfig) throws IOException { + private void initServiceConfig(Map envConfig) throws Exception { LOG.info("initializing service configuration..."); String configPath = Environments.getConfigPath() + "/" + SERVER_CONFIG_FILENAME; LOG.info("load config from path: {}", configPath); diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java b/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java index d77b7f9170..1b206ca6e9 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java @@ -49,7 +49,6 @@ import org.apache.amoro.server.table.DefaultTableService; import org.apache.amoro.server.table.RuntimeHandlerChain; import org.apache.amoro.server.table.TableRuntime; -import org.apache.amoro.server.table.TableRuntimeMeta; import org.apache.amoro.server.table.TableService; import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions; import org.apache.amoro.shade.guava32.com.google.common.collect.ImmutableList; @@ -121,24 +120,24 @@ public RuntimeHandlerChain getTableRuntimeHandler() { return tableHandlerChain; } - private void loadOptimizingQueues(List tableRuntimeMetaList) { + private void loadOptimizingQueues(List tableRuntimeMetaList) { List optimizerGroups = getAs(ResourceMapper.class, ResourceMapper::selectResourceGroups); List optimizers = getAs(OptimizerMapper.class, OptimizerMapper::selectAll); - Map> groupToTableRuntimes = + Map> groupToTableRuntimes = tableRuntimeMetaList.stream() - .collect(Collectors.groupingBy(TableRuntimeMeta::getOptimizerGroup)); + .collect(Collectors.groupingBy(TableRuntime::getOptimizerGroup)); optimizerGroups.forEach( group -> { String groupName = group.getName(); - List tableRuntimeMetas = groupToTableRuntimes.remove(groupName); + List tableRuntimes = groupToTableRuntimes.remove(groupName); OptimizingQueue optimizingQueue = new OptimizingQueue( tableService, group, this, planExecutor, - Optional.ofNullable(tableRuntimeMetas).orElseGet(ArrayList::new), + Optional.ofNullable(tableRuntimes).orElseGet(ArrayList::new), maxPlanningParallelism); optimizingQueueByGroup.put(groupName, optimizingQueue); }); @@ -456,9 +455,9 @@ public void handleTableRemoved(TableRuntime tableRuntime) { } @Override - protected void initHandler(List tableRuntimeMetaList) { + protected void initHandler(List tableRuntimeList) { LOG.info("OptimizerManagementService begin initializing"); - loadOptimizingQueues(tableRuntimeMetaList); + loadOptimizingQueues(tableRuntimeList); optimizerKeeper.start(); LOG.info("SuspendingDetector for Optimizer has been started."); LOG.info("OptimizerManagementService initializing has completed"); diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/controller/OptimizerController.java b/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/controller/OptimizerController.java index aa5abf386d..c5770be20b 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/controller/OptimizerController.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/controller/OptimizerController.java @@ -19,7 +19,6 @@ package org.apache.amoro.server.dashboard.controller; import io.javalin.http.Context; -import org.apache.amoro.ServerTableIdentifier; import org.apache.amoro.resource.Resource; import org.apache.amoro.resource.ResourceGroup; import org.apache.amoro.resource.ResourceType; @@ -30,13 +29,13 @@ import org.apache.amoro.server.dashboard.response.OkResponse; import org.apache.amoro.server.dashboard.response.PageResult; import org.apache.amoro.server.dashboard.utils.OptimizingUtil; +import org.apache.amoro.server.persistence.TableRuntimeMeta; import org.apache.amoro.server.resource.ContainerMetadata; import org.apache.amoro.server.resource.OptimizerInstance; import org.apache.amoro.server.resource.ResourceContainers; import org.apache.amoro.server.table.TableRuntime; import org.apache.amoro.server.table.TableService; import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions; -import org.apache.commons.lang3.StringUtils; import javax.ws.rs.BadRequestException; @@ -67,34 +66,17 @@ public void getOptimizerTables(Context ctx) { Integer pageSize = ctx.queryParamAsClass("pageSize", Integer.class).getOrDefault(20); int offset = (page - 1) * pageSize; - List tableRuntimes = new ArrayList<>(); - List tables = tableService.listManagedTables(); - for (ServerTableIdentifier identifier : tables) { - TableRuntime tableRuntime = tableService.getRuntime(identifier); - if (tableRuntime == null) { - continue; - } - if ((ALL_GROUP.equals(optimizerGroup) - || tableRuntime.getOptimizerGroup().equals(optimizerGroup)) - && (StringUtils.isEmpty(dbFilterStr) - || StringUtils.containsIgnoreCase(identifier.getDatabase(), dbFilterStr)) - && (StringUtils.isEmpty(tableFilterStr) - || StringUtils.containsIgnoreCase(identifier.getTableName(), tableFilterStr))) { - tableRuntimes.add(tableRuntime); - } - } - tableRuntimes.sort( - (o1, o2) -> { - // first we compare the status , and then we compare the start time when status are equal; - int statDiff = o1.getOptimizingStatus().compareTo(o2.getOptimizingStatus()); - // status order is asc, startTime order is desc - if (statDiff == 0) { - long timeDiff = o1.getCurrentStatusStartTime() - o2.getCurrentStatusStartTime(); - return timeDiff >= 0 ? (timeDiff == 0 ? 0 : -1) : 1; - } else { - return statDiff; - } - }); + String optimizerGroupUsedInDbFilter = ALL_GROUP.equals(optimizerGroup) ? null : optimizerGroup; + // get all info from underlying table table_runtime + List tableRuntimeBeans = + tableService.getTableRuntimes( + optimizerGroupUsedInDbFilter, dbFilterStr, tableFilterStr, pageSize, offset); + + List tableRuntimes = + tableRuntimeBeans.stream() + .map(meta -> tableService.getRuntime(meta.getTableId())) + .collect(Collectors.toList()); + PageResult amsPageResult = PageResult.of(tableRuntimes, offset, pageSize, OptimizingUtil::buildTableOptimizeInfo); ctx.json(OkResponse.of(amsPageResult)); diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java index dd192c1298..77ec96cfc1 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java @@ -36,7 +36,6 @@ import org.apache.amoro.server.resource.QuotaProvider; import org.apache.amoro.server.table.TableManager; import org.apache.amoro.server.table.TableRuntime; -import org.apache.amoro.server.table.TableRuntimeMeta; import org.apache.amoro.shade.guava32.com.google.common.annotations.VisibleForTesting; import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions; import org.apache.amoro.shade.guava32.com.google.common.collect.Lists; @@ -93,7 +92,7 @@ public OptimizingQueue( ResourceGroup optimizerGroup, QuotaProvider quotaProvider, Executor planExecutor, - List tableRuntimeMetaList, + List tableRuntimeList, int maxPlanningParallelism) { Preconditions.checkNotNull(optimizerGroup, "Optimizer group can not be null"); this.planExecutor = planExecutor; @@ -106,14 +105,12 @@ public OptimizingQueue( new OptimizerGroupMetrics( optimizerGroup.getName(), MetricManager.getInstance().getGlobalRegistry(), this); this.metrics.register(); - tableRuntimeMetaList.forEach(this::initTableRuntime); + tableRuntimeList.forEach(this::initTableRuntime); } - private void initTableRuntime(TableRuntimeMeta tableRuntimeMeta) { - TableRuntime tableRuntime = tableRuntimeMeta.getTableRuntime(); - if (tableRuntime.getOptimizingStatus().isProcessing() - && tableRuntimeMeta.getOptimizingProcessId() != 0) { - tableRuntime.recover(new TableOptimizingProcess(tableRuntimeMeta)); + private void initTableRuntime(TableRuntime tableRuntime) { + if (tableRuntime.getOptimizingStatus().isProcessing() && tableRuntime.getProcessId() != 0) { + tableRuntime.recover(new TableOptimizingProcess(tableRuntime)); } if (tableRuntime.isOptimizingEnabled()) { @@ -122,7 +119,7 @@ private void initTableRuntime(TableRuntimeMeta tableRuntimeMeta) { if (!tableRuntime.getOptimizingStatus().isProcessing()) { scheduler.addTable(tableRuntime); } else if (tableRuntime.getOptimizingStatus() != OptimizingStatus.COMMITTING) { - tableQueue.offer(new TableOptimizingProcess(tableRuntimeMeta)); + tableQueue.offer(new TableOptimizingProcess(tableRuntime)); } } else { OptimizingProcess process = tableRuntime.getOptimizingProcess(); @@ -387,21 +384,21 @@ public TableOptimizingProcess(OptimizingPlanner planner) { beginAndPersistProcess(); } - public TableOptimizingProcess(TableRuntimeMeta tableRuntimeMeta) { - processId = tableRuntimeMeta.getOptimizingProcessId(); - tableRuntime = tableRuntimeMeta.getTableRuntime(); - optimizingType = tableRuntimeMeta.getOptimizingType(); - targetSnapshotId = tableRuntimeMeta.getTargetSnapshotId(); - targetChangeSnapshotId = tableRuntimeMeta.getTargetChangeSnapshotId(); - planTime = tableRuntimeMeta.getPlanTime(); - if (tableRuntimeMeta.getFromSequence() != null) { - fromSequence = tableRuntimeMeta.getFromSequence(); + public TableOptimizingProcess(TableRuntime tableRuntime) { + processId = tableRuntime.getProcessId(); + this.tableRuntime = tableRuntime; + optimizingType = tableRuntime.getOptimizingType(); + targetSnapshotId = tableRuntime.getTargetSnapshotId(); + targetChangeSnapshotId = tableRuntime.getTargetChangeSnapshotId(); + planTime = tableRuntime.getLastPlanTime(); + if (tableRuntime.getFromSequence() != null) { + fromSequence = tableRuntime.getFromSequence(); } - if (tableRuntimeMeta.getToSequence() != null) { - toSequence = tableRuntimeMeta.getToSequence(); + if (tableRuntime.getToSequence() != null) { + toSequence = tableRuntime.getToSequence(); } if (this.status != OptimizingProcess.Status.CLOSED) { - tableRuntimeMeta.getTableRuntime().recover(this); + tableRuntime.recover(this); } loadTaskRuntimes(this); } diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingStatus.java b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingStatus.java index 7faca66261..5e34be9ea9 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingStatus.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingStatus.java @@ -19,20 +19,23 @@ package org.apache.amoro.server.optimizing; public enum OptimizingStatus { - FULL_OPTIMIZING("full", true), - MAJOR_OPTIMIZING("major", true), - MINOR_OPTIMIZING("minor", true), - COMMITTING("committing", true), - PLANNING("planning", false), - PENDING("pending", false), - IDLE("idle", false); + FULL_OPTIMIZING("full", true, 100), + MAJOR_OPTIMIZING("major", true, 200), + MINOR_OPTIMIZING("minor", true, 300), + COMMITTING("committing", true, 400), + PLANNING("planning", false, 500), + PENDING("pending", false, 600), + IDLE("idle", false, 700); private final String displayValue; private final boolean isProcessing; - OptimizingStatus(String displayValue, boolean isProcessing) { + private final int code; + + OptimizingStatus(String displayValue, boolean isProcessing, int code) { this.displayValue = displayValue; this.isProcessing = isProcessing; + this.code = code; } public boolean isProcessing() { @@ -42,4 +45,17 @@ public boolean isProcessing() { public String displayValue() { return displayValue; } + + public int getCode() { + return code; + } + + public static OptimizingStatus ofCode(int code) { + for (OptimizingStatus status : values()) { + if (status.getCode() == code) { + return status; + } + } + return null; + } } diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/persistence/SqlSessionFactoryProvider.java b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/SqlSessionFactoryProvider.java index 6a59d34a8b..4560e5a4e7 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/persistence/SqlSessionFactoryProvider.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/SqlSessionFactoryProvider.java @@ -33,7 +33,9 @@ import org.apache.commons.lang3.StringUtils; import org.apache.commons.pool2.impl.BaseObjectPoolConfig; import org.apache.ibatis.jdbc.ScriptRunner; +import org.apache.ibatis.mapping.DatabaseIdProvider; import org.apache.ibatis.mapping.Environment; +import org.apache.ibatis.mapping.VendorDatabaseIdProvider; import org.apache.ibatis.session.Configuration; import org.apache.ibatis.session.SqlSession; import org.apache.ibatis.session.SqlSessionFactory; @@ -53,8 +55,10 @@ import java.nio.file.Paths; import java.sql.Connection; import java.sql.ResultSet; +import java.sql.SQLException; import java.sql.Statement; import java.time.Duration; +import java.util.Properties; public class SqlSessionFactoryProvider { private static final Logger LOG = LoggerFactory.getLogger(SqlSessionFactoryProvider.class); @@ -73,7 +77,7 @@ public static SqlSessionFactoryProvider getInstance() { private volatile SqlSessionFactory sqlSessionFactory; - public void init(Configurations config) { + public void init(Configurations config) throws SQLException { BasicDataSource dataSource = new BasicDataSource(); dataSource.setUrl(config.getString(AmoroManagementConf.DB_CONNECTION_URL)); dataSource.setDriverClassName(config.getString(AmoroManagementConf.DB_DRIVER_CLASS_NAME)); @@ -111,6 +115,14 @@ public void init(Configurations config) { configuration.addMapper(PlatformFileMapper.class); configuration.addMapper(ResourceMapper.class); configuration.addMapper(TableBlockerMapper.class); + + DatabaseIdProvider provider = new VendorDatabaseIdProvider(); + Properties properties = new Properties(); + properties.setProperty("MySQL", "mysql"); + properties.setProperty("PostgreSQL", "postgres"); + properties.setProperty("Derby", "derby"); + provider.setProperties(properties); + configuration.setDatabaseId(provider.getDatabaseId(dataSource)); if (sqlSessionFactory == null) { synchronized (this) { if (sqlSessionFactory == null) { diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/TableRuntimeMeta.java b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/TableRuntimeMeta.java similarity index 93% rename from amoro-ams/src/main/java/org/apache/amoro/server/table/TableRuntimeMeta.java rename to amoro-ams/src/main/java/org/apache/amoro/server/persistence/TableRuntimeMeta.java index 9fd8c05ea5..d94db0a25d 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/table/TableRuntimeMeta.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/TableRuntimeMeta.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.amoro.server.table; +package org.apache.amoro.server.persistence; import org.apache.amoro.TableFormat; import org.apache.amoro.config.TableConfiguration; @@ -27,6 +27,7 @@ import java.util.Map; +/** The class for table used when transfer data from/to database. */ public class TableRuntimeMeta { private long tableId; private String catalogName; @@ -58,24 +59,8 @@ public class TableRuntimeMeta { private Map fromSequence; private Map toSequence; - private TableRuntime tableRuntime; - public TableRuntimeMeta() {} - public TableRuntime constructTableRuntime(TableManager initializer) { - if (tableRuntime == null) { - tableRuntime = new TableRuntime(this, initializer); - } - return tableRuntime; - } - - public TableRuntime getTableRuntime() { - if (tableRuntime == null) { - throw new IllegalStateException("TableRuntime is not constructed yet."); - } - return tableRuntime; - } - public long getTargetSnapshotId() { return targetSnapshotId; } @@ -212,10 +197,6 @@ public void setTableSummary(OptimizingEvaluator.PendingInput tableSummary) { this.tableSummary = tableSummary; } - public void setTableRuntime(TableRuntime tableRuntime) { - this.tableRuntime = tableRuntime; - } - public void setLastOptimizedChangeSnapshotId(long lastOptimizedChangeSnapshotId) { this.lastOptimizedChangeSnapshotId = lastOptimizedChangeSnapshotId; } diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/persistence/converter/OptimizingStatusConverter.java b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/converter/OptimizingStatusConverter.java new file mode 100644 index 0000000000..d314bb41bc --- /dev/null +++ b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/converter/OptimizingStatusConverter.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.server.persistence.converter; + +import org.apache.amoro.server.optimizing.OptimizingStatus; +import org.apache.ibatis.type.BaseTypeHandler; +import org.apache.ibatis.type.JdbcType; +import org.apache.ibatis.type.MappedJdbcTypes; +import org.apache.ibatis.type.MappedTypes; + +import java.sql.CallableStatement; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; + +@MappedJdbcTypes(JdbcType.INTEGER) +@MappedTypes(Enum.class) +public class OptimizingStatusConverter extends BaseTypeHandler { + + @Override + public void setNonNullParameter( + PreparedStatement ps, int i, OptimizingStatus parameter, JdbcType jdbcType) + throws SQLException { + ps.setInt(i, parameter.getCode()); + } + + @Override + public OptimizingStatus getNullableResult(ResultSet rs, String columnName) throws SQLException { + String s = rs.getString(columnName); + return s == null ? null : OptimizingStatus.ofCode(Integer.parseInt(s)); + } + + @Override + public OptimizingStatus getNullableResult(ResultSet rs, int columnIndex) throws SQLException { + String s = rs.getString(columnIndex); + return s == null ? null : OptimizingStatus.ofCode(Integer.parseInt(s)); + } + + @Override + public OptimizingStatus getNullableResult(CallableStatement cs, int columnIndex) + throws SQLException { + String s = cs.getString(columnIndex); + return s == null ? null : OptimizingStatus.ofCode(Integer.parseInt(s)); + } +} diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/TableMetaMapper.java b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/TableMetaMapper.java index f056dcb5e8..b6db210dc5 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/TableMetaMapper.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/TableMetaMapper.java @@ -19,13 +19,14 @@ package org.apache.amoro.server.persistence.mapper; import org.apache.amoro.ServerTableIdentifier; +import org.apache.amoro.server.persistence.TableRuntimeMeta; import org.apache.amoro.server.persistence.converter.JsonObjectConverter; import org.apache.amoro.server.persistence.converter.Long2TsConverter; import org.apache.amoro.server.persistence.converter.Map2StringConverter; import org.apache.amoro.server.persistence.converter.MapLong2StringConverter; +import org.apache.amoro.server.persistence.converter.OptimizingStatusConverter; import org.apache.amoro.server.table.TableMetadata; import org.apache.amoro.server.table.TableRuntime; -import org.apache.amoro.server.table.TableRuntimeMeta; import org.apache.ibatis.annotations.Delete; import org.apache.ibatis.annotations.Insert; import org.apache.ibatis.annotations.Options; @@ -321,7 +322,8 @@ List selectTableIdentifiersByCatalog( + " typeHandler=org.apache.amoro.server.persistence.converter.Long2TsConverter}," + " last_full_optimizing_time = #{runtime.lastFullOptimizingTime," + " typeHandler=org.apache.amoro.server.persistence.converter.Long2TsConverter}," - + " optimizing_status = #{runtime.optimizingStatus}," + + " optimizing_status_code = #{runtime.optimizingStatus," + + "typeHandler=org.apache.amoro.server.persistence.converter.OptimizingStatusConverter}," + " optimizing_status_start_time = #{runtime.currentStatusStartTime," + " typeHandler=org.apache.amoro.server.persistence.converter.Long2TsConverter}," + " optimizing_process_id = #{runtime.processId}," @@ -342,7 +344,7 @@ List selectTableIdentifiersByCatalog( "INSERT INTO table_runtime (table_id, catalog_name, db_name, table_name, current_snapshot_id," + " current_change_snapshotId, last_optimized_snapshotId, last_optimized_change_snapshotId," + " last_major_optimizing_time, last_minor_optimizing_time," - + " last_full_optimizing_time, optimizing_status, optimizing_status_start_time, optimizing_process_id," + + " last_full_optimizing_time, optimizing_status_code, optimizing_status_start_time, optimizing_process_id," + " optimizer_group, table_config, pending_input, table_summary) VALUES" + " (#{runtime.tableIdentifier.id}, #{runtime.tableIdentifier.catalog}," + " #{runtime.tableIdentifier.database}, #{runtime.tableIdentifier.tableName}, #{runtime" @@ -354,7 +356,8 @@ List selectTableIdentifiersByCatalog( + " typeHandler=org.apache.amoro.server.persistence.converter.Long2TsConverter}," + " #{runtime.lastFullOptimizingTime," + " typeHandler=org.apache.amoro.server.persistence.converter.Long2TsConverter}," - + " #{runtime.optimizingStatus}," + + " #{runtime.optimizingStatus," + + " typeHandler=org.apache.amoro.server.persistence.converter.OptimizingStatusConverter}," + " #{runtime.currentStatusStartTime, " + " typeHandler=org.apache.amoro.server.persistence.converter.Long2TsConverter}," + " #{runtime.processId}, #{runtime.optimizerGroup}," @@ -367,11 +370,11 @@ List selectTableIdentifiersByCatalog( void insertTableRuntime(@Param("runtime") TableRuntime runtime); @Select( - "SELECT a.table_id, a.catalog_name, a.db_name, a.table_name, i.format, a.current_snapshot_id, a" - + ".current_change_snapshotId, a.last_optimized_snapshotId, a.last_optimized_change_snapshotId," - + " a.last_major_optimizing_time, a.last_minor_optimizing_time, a.last_full_optimizing_time, a.optimizing_status," - + " a.optimizing_status_start_time, a.optimizing_process_id," - + " a.optimizer_group, a.table_config, a.pending_input, a.table_summary, b.optimizing_type, b.target_snapshot_id," + "SELECT a.table_id, a.catalog_name, a.db_name, a.table_name, i.format, a.current_snapshot_id," + + " a.current_change_snapshotId, a.last_optimized_snapshotId, a.last_optimized_change_snapshotId," + + " a.last_major_optimizing_time, a.last_minor_optimizing_time, a.last_full_optimizing_time," + + " a.optimizing_status_code, a.optimizing_status_start_time, a.optimizing_process_id," + + " a.optimizer_group, a.table_config, a.pending_input, b.optimizing_type, b.target_snapshot_id," + " b.target_change_snapshot_id, b.plan_time, b.from_sequence, b.to_sequence FROM table_runtime a" + " INNER JOIN table_identifier i ON a.table_id = i.table_id " + " LEFT JOIN table_optimizing_process b ON a.optimizing_process_id = b.process_id") @@ -399,7 +402,10 @@ List selectTableIdentifiersByCatalog( property = "lastFullOptimizingTime", column = "last_full_optimizing_time", typeHandler = Long2TsConverter.class), - @Result(property = "tableStatus", column = "optimizing_status"), + @Result( + property = "tableStatus", + column = "optimizing_status_code", + typeHandler = OptimizingStatusConverter.class), @Result( property = "currentStatusStartTime", column = "optimizing_status_start_time", @@ -420,7 +426,7 @@ List selectTableIdentifiersByCatalog( typeHandler = JsonObjectConverter.class), @Result(property = "optimizingType", column = "optimizing_type"), @Result(property = "targetSnapshotId", column = "target_snapshot_id"), - @Result(property = "targetChangeSnapshotId", column = "target_change_napshot_id"), + @Result(property = "targetChangeSnapshotId", column = "target_change_snapshot_id"), @Result(property = "planTime", column = "plan_time", typeHandler = Long2TsConverter.class), @Result( property = "fromSequence", @@ -432,4 +438,74 @@ List selectTableIdentifiersByCatalog( typeHandler = MapLong2StringConverter.class) }) List selectTableRuntimeMetas(); + + @Select( + "") + @Results({ + @Result(property = "tableId", column = "table_id"), + @Result(property = "catalogName", column = "catalog_name"), + @Result(property = "dbName", column = "db_name"), + @Result(property = "tableName", column = "table_name"), + @Result(property = "currentSnapshotId", column = "current_snapshot_id"), + @Result(property = "currentChangeSnapshotId", column = "current_change_snapshotId"), + @Result(property = "lastOptimizedSnapshotId", column = "last_optimized_snapshotId"), + @Result( + property = "lastOptimizedChangeSnapshotId", + column = "last_optimized_change_snapshotId"), + @Result( + property = "lastMajorOptimizingTime", + column = "last_major_optimizing_time", + typeHandler = Long2TsConverter.class), + @Result( + property = "lastMinorOptimizingTime", + column = "last_minor_optimizing_time", + typeHandler = Long2TsConverter.class), + @Result( + property = "lastFullOptimizingTime", + column = "last_full_optimizing_time", + typeHandler = Long2TsConverter.class), + @Result( + property = "tableStatus", + column = "optimizing_status_code", + typeHandler = OptimizingStatusConverter.class), + @Result( + property = "currentStatusStartTime", + column = "optimizing_status_start_time", + typeHandler = Long2TsConverter.class), + @Result(property = "optimizingProcessId", column = "optimizing_process_id"), + @Result(property = "optimizerGroup", column = "optimizer_group"), + @Result( + property = "pendingInput", + column = "pending_input", + typeHandler = JsonObjectConverter.class), + @Result( + property = "tableConfig", + column = "table_config", + typeHandler = JsonObjectConverter.class), + }) + List selectTableRuntimesForOptimizerGroup( + @Param("optimizerGroup") String optimizerGroup, + @Param("fuzzyDbName") String fuzzyDbName, + @Param("fuzzyTableName") String fuzzyTableName, + @Param("limitCount") int limitCount, + @Param("offsetNum") int offset); } diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableService.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableService.java index d513c8c9a2..c764d3e584 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableService.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableService.java @@ -42,6 +42,7 @@ import org.apache.amoro.server.manager.MetricManager; import org.apache.amoro.server.optimizing.OptimizingStatus; import org.apache.amoro.server.persistence.StatedPersistentBase; +import org.apache.amoro.server.persistence.TableRuntimeMeta; import org.apache.amoro.server.persistence.mapper.CatalogMetaMapper; import org.apache.amoro.server.persistence.mapper.TableBlockerMapper; import org.apache.amoro.server.persistence.mapper.TableMetaMapper; @@ -58,6 +59,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nullable; + +import java.util.ArrayList; import java.util.Comparator; import java.util.HashSet; import java.util.List; @@ -142,12 +146,6 @@ public ServerCatalog getServerCatalog(String catalogName) { .orElseThrow(() -> new ObjectNotExistsException("Catalog " + catalogName)); } - @Override - public InternalCatalog getInternalCatalog(String catalogName) { - return Optional.ofNullable(internalCatalogMap.get(catalogName)) - .orElseThrow(() -> new ObjectNotExistsException("Catalog " + catalogName)); - } - @Override public void createCatalog(CatalogMeta catalogMeta) { checkStarted(); @@ -343,6 +341,26 @@ public List getBlockers(TableIdentifier tableIdentifier) { .collect(Collectors.toList()); } + @Override + public List getTableRuntimes( + String optimizerGroup, + @Nullable String fuzzyDbName, + @Nullable String fuzzyTableName, + int limit, + int offset) { + checkStarted(); + return getAs( + TableMetaMapper.class, + mapper -> + mapper.selectTableRuntimesForOptimizerGroup( + optimizerGroup, fuzzyDbName, fuzzyTableName, limit, offset)); + } + + public InternalCatalog getInternalCatalog(String catalogName) { + return Optional.ofNullable(internalCatalogMap.get(catalogName)) + .orElseThrow(() -> new ObjectNotExistsException("Catalog " + catalogName)); + } + @Override public void addHandlerChain(RuntimeHandlerChain handler) { checkNotStarted(); @@ -375,15 +393,17 @@ public void initialize() { List tableRuntimeMetaList = getAs(TableMetaMapper.class, TableMetaMapper::selectTableRuntimeMetas); + List tableRuntimes = new ArrayList<>(tableRuntimeMetaList.size()); tableRuntimeMetaList.forEach( tableRuntimeMeta -> { - TableRuntime tableRuntime = tableRuntimeMeta.constructTableRuntime(this); - tableRuntimeMap.put(tableRuntime.getTableIdentifier(), tableRuntime); + TableRuntime tableRuntime = new TableRuntime(tableRuntimeMeta, this); + tableRuntimeMap.put(tableRuntimeMeta.getTableId(), tableRuntime); tableRuntime.registerMetric(MetricManager.getInstance().getGlobalRegistry()); + tableRuntimes.add(tableRuntime); }); if (headHandler != null) { - headHandler.initialize(tableRuntimeMetaList); + headHandler.initialize(tableRuntimes); } if (tableExplorerExecutors == null) { int threadCount = diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/RuntimeHandlerChain.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/RuntimeHandlerChain.java index 51419878d6..7554eca0ae 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/table/RuntimeHandlerChain.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/table/RuntimeHandlerChain.java @@ -51,16 +51,15 @@ protected void appendNext(RuntimeHandlerChain handler) { } } - public final void initialize(List tableRuntimeMetaList) { - List supportedtableRuntimeMetaList = - tableRuntimeMetaList.stream() - .filter( - tableRuntimeMeta -> formatSupported(tableRuntimeMeta.getTableRuntime().getFormat())) + public final void initialize(List tableRuntimes) { + List supportedtableRuntimeList = + tableRuntimes.stream() + .filter(runtime -> formatSupported(runtime.getFormat())) .collect(Collectors.toList()); - initHandler(supportedtableRuntimeMetaList); + initHandler(supportedtableRuntimeList); initialized = true; if (next != null) { - next.initialize(tableRuntimeMetaList); + next.initialize(tableRuntimes); } } @@ -147,7 +146,7 @@ protected abstract void handleConfigChanged( protected abstract void handleTableRemoved(TableRuntime tableRuntime); - protected abstract void initHandler(List tableRuntimeMetaList); + protected abstract void initHandler(List tableRuntimeList); protected abstract void doDispose(); } diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/TableRuntime.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/TableRuntime.java index 475ab28f1d..3499b9306f 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/table/TableRuntime.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/table/TableRuntime.java @@ -33,6 +33,7 @@ import org.apache.amoro.server.optimizing.TaskRuntime; import org.apache.amoro.server.optimizing.plan.OptimizingEvaluator; import org.apache.amoro.server.persistence.StatedPersistentBase; +import org.apache.amoro.server.persistence.TableRuntimeMeta; import org.apache.amoro.server.persistence.mapper.OptimizingMapper; import org.apache.amoro.server.persistence.mapper.TableBlockerMapper; import org.apache.amoro.server.persistence.mapper.TableMetaMapper; @@ -94,7 +95,16 @@ public class TableRuntime extends StatedPersistentBase { private final TableOrphanFilesCleaningMetrics orphanFilesCleaningMetrics; private final TableSummaryMetrics tableSummaryMetrics; - protected TableRuntime( + private long targetSnapshotId; + + private long targetChangeSnapshotId; + + private Map fromSequence; + private Map toSequence; + + private OptimizingType optimizingType; + + public TableRuntime( ServerTableIdentifier tableIdentifier, TableRuntimeHandler tableHandler, Map properties) { @@ -110,9 +120,10 @@ protected TableRuntime( tableSummaryMetrics = new TableSummaryMetrics(tableIdentifier); } - protected TableRuntime(TableRuntimeMeta tableRuntimeMeta, TableRuntimeHandler tableHandler) { + public TableRuntime(TableRuntimeMeta tableRuntimeMeta, TableRuntimeHandler tableHandler) { Preconditions.checkNotNull(tableRuntimeMeta, "TableRuntimeMeta must not be null."); Preconditions.checkNotNull(tableHandler, "TableRuntimeHandler must not be null."); + this.tableHandler = tableHandler; this.tableIdentifier = ServerTableIdentifier.of( @@ -146,6 +157,12 @@ protected TableRuntime(TableRuntimeMeta tableRuntimeMeta, TableRuntimeHandler ta orphanFilesCleaningMetrics = new TableOrphanFilesCleaningMetrics(tableIdentifier); tableSummaryMetrics = new TableSummaryMetrics(tableIdentifier); tableSummaryMetrics.refresh(tableSummary); + + this.targetSnapshotId = tableRuntimeMeta.getTargetSnapshotId(); + this.targetChangeSnapshotId = tableRuntimeMeta.getTargetChangeSnapshotId(); + this.fromSequence = tableRuntimeMeta.getFromSequence(); + this.toSequence = tableRuntimeMeta.getToSequence(); + this.optimizingType = tableRuntimeMeta.getOptimizingType(); } public void recover(OptimizingProcess optimizingProcess) { @@ -531,6 +548,46 @@ public void setLastPlanTime(long lastPlanTime) { this.lastPlanTime = lastPlanTime; } + public long getTargetSnapshotId() { + return targetSnapshotId; + } + + public void setTargetSnapshotId(long targetSnapshotId) { + this.targetSnapshotId = targetSnapshotId; + } + + public long getTargetChangeSnapshotId() { + return targetChangeSnapshotId; + } + + public void setTargetChangeSnapshotId(long targetChangeSnapshotId) { + this.targetChangeSnapshotId = targetChangeSnapshotId; + } + + public Map getFromSequence() { + return fromSequence; + } + + public void setFromSequence(Map fromSequence) { + this.fromSequence = fromSequence; + } + + public Map getToSequence() { + return toSequence; + } + + public void setToSequence(Map toSequence) { + this.toSequence = toSequence; + } + + public long getProcessId() { + return processId; + } + + public OptimizingType getOptimizingType() { + return optimizingType; + } + @Override public String toString() { return MoreObjects.toStringHelper(this) @@ -544,6 +601,10 @@ public String toString() { .add("lastFullOptimizingTime", lastFullOptimizingTime) .add("lastMinorOptimizingTime", lastMinorOptimizingTime) .add("tableConfiguration", tableConfiguration) + .add("targetSnapshotId", targetSnapshotId) + .add("targetChangeSnapshotId", targetChangeSnapshotId) + .add("fromSequence", fromSequence) + .add("toSequence", toSequence) .toString(); } diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/TableService.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/TableService.java index 1cb564ac18..5cbd257018 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/table/TableService.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/table/TableService.java @@ -23,6 +23,9 @@ import org.apache.amoro.api.Blocker; import org.apache.amoro.api.TableIdentifier; import org.apache.amoro.server.catalog.CatalogService; +import org.apache.amoro.server.persistence.TableRuntimeMeta; + +import javax.annotation.Nullable; import java.util.List; import java.util.Map; @@ -88,4 +91,22 @@ Blocker block( * @return block list */ List getBlockers(TableIdentifier tableIdentifier); + + /** + * Get the table info from database for given parameters. + * + * @param optimizerGroup The optimizer group of the table associated to. will be if we want the + * info for all groups. + * @param fuzzyDbName the fuzzy db name used to filter the result, will be null if no filter set. + * @param fuzzyTableName the fuzzy table name used to filter the result, will be null if no filter + * set. + * @param limit How many entries we want to retrieve. + * @param offset The entries we'll skip when retrieving the entries. + */ + List getTableRuntimes( + String optimizerGroup, + @Nullable String fuzzyDbName, + @Nullable String fuzzyTableName, + int limit, + int offset); } diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/executor/BaseTableExecutor.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/executor/BaseTableExecutor.java index 1d9002c357..ae70cadaa5 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/table/executor/BaseTableExecutor.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/table/executor/BaseTableExecutor.java @@ -62,8 +62,8 @@ protected BaseTableExecutor(TableManager tableManager, int poolSize) { } @Override - protected void initHandler(List tableRuntimeMetaList) { - tableRuntimeMetaList.stream() + protected void initHandler(List tableRuntimeList) { + tableRuntimeList.stream() .filter(this::enabled) .forEach( tableRuntime -> { diff --git a/amoro-ams/src/main/resources/derby/ams-derby-init.sql b/amoro-ams/src/main/resources/derby/ams-derby-init.sql index f41cdaf276..ca874b317e 100644 --- a/amoro-ams/src/main/resources/derby/ams-derby-init.sql +++ b/amoro-ams/src/main/resources/derby/ams-derby-init.sql @@ -111,7 +111,7 @@ CREATE TABLE table_runtime ( last_major_optimizing_time TIMESTAMP, last_minor_optimizing_time TIMESTAMP, last_full_optimizing_time TIMESTAMP, - optimizing_status VARCHAR(20) DEFAULT 'IDLE', + optimizing_status_code INT DEFAULT 700, optimizing_status_start_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP, optimizing_process_id BIGINT NOT NULL, optimizer_group VARCHAR(64) NOT NULL, diff --git a/amoro-ams/src/main/resources/mysql/ams-mysql-init.sql b/amoro-ams/src/main/resources/mysql/ams-mysql-init.sql index 76e16147a1..b2e41cf903 100644 --- a/amoro-ams/src/main/resources/mysql/ams-mysql-init.sql +++ b/amoro-ams/src/main/resources/mysql/ams-mysql-init.sql @@ -122,7 +122,8 @@ CREATE TABLE `table_runtime` `last_major_optimizing_time` timestamp NULL DEFAULT NULL COMMENT 'Latest Major Optimize time for all partitions', `last_minor_optimizing_time` timestamp NULL DEFAULT NULL COMMENT 'Latest Minor Optimize time for all partitions', `last_full_optimizing_time` timestamp NULL DEFAULT NULL COMMENT 'Latest Full Optimize time for all partitions', - `optimizing_status` varchar(20) DEFAULT 'IDLE' COMMENT 'Table optimize status: FULL_OPTIMIZING, MAJOR_OPTIMIZING, MINOR_OPTIMIZING, COMMITTING, PENDING, IDLE', + `optimizing_status_code` int DEFAULT 700 COMMENT 'Table optimize status code: 100(FULL_OPTIMIZING),' || + ' 200(MAJOR_OPTIMIZING), 300(MINOR_OPTIMIZING), 400(COMMITTING), 500(PLANING), 600(PENDING), 700(IDLE)', `optimizing_status_start_time` timestamp default CURRENT_TIMESTAMP COMMENT 'Table optimize status start time', `optimizing_process_id` bigint(20) NOT NULL COMMENT 'optimizing_procedure UUID', `optimizer_group` varchar(64) NOT NULL, @@ -132,6 +133,7 @@ CREATE TABLE `table_runtime` `table_summary` mediumtext, PRIMARY KEY (`table_id`), UNIQUE KEY `table_index` (`catalog_name`,`db_name`,`table_name`) + INDEX idx_optimizer_status_and_time (optimizing_status_code, optimizing_status_start_time DESC); ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT 'Optimize running information of each table' ROW_FORMAT=DYNAMIC; CREATE TABLE `table_optimizing_process` diff --git a/amoro-ams/src/main/resources/mysql/upgrade.sql b/amoro-ams/src/main/resources/mysql/upgrade.sql index 31ab7604da..d075e0f7ca 100644 --- a/amoro-ams/src/main/resources/mysql/upgrade.sql +++ b/amoro-ams/src/main/resources/mysql/upgrade.sql @@ -23,4 +23,29 @@ ALTER TABLE `table_blocker` ADD COLUMN `prev_blocker_id` bigint(20) NOT NULL DEF ALTER TABLE `table_blocker` ADD UNIQUE KEY `uq_prev` (`catalog_name`,`db_name`,`table_name`, `prev_blocker_id`); -- ADD COLUMN table_summary FOR TABLE_RUNTIME -ALTER TABLE `table_runtime` ADD COLUMN `table_summary` mediumtext AFTER `pending_input`; \ No newline at end of file +ALTER TABLE `table_runtime` ADD COLUMN `table_summary` mediumtext AFTER `pending_input`; + +RENAME TABLE table_runtime TO table_runtime_backup; +CREATE TABLE table_runtime LIKE table_runtime_backup; + +ALTER TABLE table_runtime CHANGE COLUMN optimizing_status optimizing_status_code INT DEFAULT 700; +CREATE INDEX idx_optimizer_status_and_time ON table_runtime(optimizing_status_code, optimizing_status_start_time DESC); + +INSERT INTO table_runtime( + `table_id`,`catalog_name`, `db_name`, `table_name`, `current_snapshot_id`,`current_change_snapshotId`, `last_optimized_snapshotId`, + `last_optimized_change_snapshotId`, `last_major_optimizing_time`, `last_minor_optimizing_time`, `last_full_optimizing_time`, + `optimizing_status_code`, `optimizing_status_start_time`, `optimizing_process_id`, `optimizer_group`, `table_config`, + `optimizing_config`, `pending_input`, `table_summary`) +SELECT `table_id`,`catalog_name`, `db_name`, `table_name`, `current_snapshot_id`,`current_change_snapshotId`, `last_optimized_snapshotId`, + `last_optimized_change_snapshotId`, `last_major_optimizing_time`, `last_minor_optimizing_time`, `last_full_optimizing_time`, + CASE + WHEN `optimizing_status` = 'IDLE' THEN 700 + WHEN `optimizing_status` = 'PENDING' THEN 600 + WHEN `optimizing_status` = 'PLANNING' THEN 500 + WHEN `optimizing_status` = 'COMMITTING' THEN 400 + WHEN `optimizing_status` = 'MINOR_OPTIMIZING' THEN 300 + WHEN `optimizing_status` = 'MAJOR_OPTIMIZING' THEN 200 + WHEN `optimizing_status` = 'FULL_OPTIMIZING' THEN 100 + END, + `optimizing_status_start_time`, `optimizing_process_id`, `optimizer_group`, `table_config`, `optimizing_config`, `pending_input`, `table_summary` +FROM table_runtime_backup; \ No newline at end of file diff --git a/amoro-ams/src/main/resources/postgres/ams-postgres-init.sql b/amoro-ams/src/main/resources/postgres/ams-postgres-init.sql index 83c3f6d0fe..a7d0ccf6cf 100644 --- a/amoro-ams/src/main/resources/postgres/ams-postgres-init.sql +++ b/amoro-ams/src/main/resources/postgres/ams-postgres-init.sql @@ -182,7 +182,7 @@ CREATE TABLE table_runtime last_major_optimizing_time TIMESTAMP, last_minor_optimizing_time TIMESTAMP, last_full_optimizing_time TIMESTAMP, - optimizing_status VARCHAR(20) DEFAULT 'IDLE', + optimizing_status_code INT DEFAULT 700, optimizing_status_start_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP, optimizing_process_id BIGINT NOT NULL, optimizer_group VARCHAR(64) NOT NULL, @@ -205,7 +205,8 @@ COMMENT ON COLUMN table_runtime.last_optimized_change_snapshotId IS 'Last optimi COMMENT ON COLUMN table_runtime.last_major_optimizing_time IS 'Latest Major Optimize time for all partitions'; COMMENT ON COLUMN table_runtime.last_minor_optimizing_time IS 'Latest Minor Optimize time for all partitions'; COMMENT ON COLUMN table_runtime.last_full_optimizing_time IS 'Latest Full Optimize time for all partitions'; -COMMENT ON COLUMN table_runtime.optimizing_status IS 'Table optimize status: FULL_OPTIMIZING, MAJOR_OPTIMIZING, MINOR_OPTIMIZING, COMMITTING, PENDING, IDLE'; +COMMENT ON COLUMN table_runtime.optimizing_status_code IS 'Table optimize status code: 100(FULL_OPTIMIZING),' || + ' 200(MAJOR_OPTIMIZING), 300(MINOR_OPTIMIZING), 400(COMMITTING), 500(PLANING), 600(PENDING), 700(IDLE)'; COMMENT ON COLUMN table_runtime.optimizing_status_start_time IS 'Table optimize status start time'; COMMENT ON COLUMN table_runtime.optimizing_process_id IS 'Optimizing procedure UUID'; COMMENT ON COLUMN table_runtime.optimizer_group IS 'Optimizer group'; @@ -213,6 +214,7 @@ COMMENT ON COLUMN table_runtime.table_config IS 'Table-specific configuration'; COMMENT ON COLUMN table_runtime.optimizing_config IS 'Optimizing configuration'; COMMENT ON COLUMN table_runtime.pending_input IS 'Pending input data'; COMMENT ON COLUMN table_runtime.table_summary IS 'Table summary data'; +CREATE INDEX idx_optimizer_status_and_time ON table_runtime(optimizing_status_code, optimizing_status_start_time DESC); CREATE TABLE table_optimizing_process ( diff --git a/amoro-ams/src/main/resources/postgres/upgrade-0.7.0-to-0.7.1.sql b/amoro-ams/src/main/resources/postgres/upgrade-0.7.0-to-0.7.1.sql new file mode 100644 index 0000000000..96ef432160 --- /dev/null +++ b/amoro-ams/src/main/resources/postgres/upgrade-0.7.0-to-0.7.1.sql @@ -0,0 +1,39 @@ +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. + +RENAME TABLE table_runtime TO table_runtime_backup; +CREATE TABLE table_runtime LIKE table_runtime_backup; + +ALTER TABLE table_runtime CHANGE COLUMN optimizing_status optimizing_status_code INT DEFAULT 7; +CREATE INDEX idx_optimizer_status_and_time ON table_runtime(optimizing_status_code, optimizing_status_start_time DESC); + +INSERT INTO table_runtime( + `table_id`,`catalog_name`, `db_name`, `table_name`, `current_snapshot_id`,`current_change_snapshotId`, `last_optimized_snapshotId`, + `last_optimized_change_snapshotId`, `last_major_optimizing_time`, `last_minor_optimizing_time`, `last_full_optimizing_time`, + `optimizing_status_code`, `optimizing_status_start_time`, `optimizing_process_id`, `optimizer_group`, `table_config`, + `optimizing_config`, `pending_input`) +SELECT `table_id`,`catalog_name`, `db_name`, `table_name`, `current_snapshot_id`,`current_change_snapshotId`, `last_optimized_snapshotId`, + `last_optimized_change_snapshotId`, `last_major_optimizing_time`, `last_minor_optimizing_time`, `last_full_optimizing_time`, + CASE + WHEN `optimizing_status` = 'IDLE' THEN 700 + WHEN `optimizing_status` = 'PENDING' THEN 600 + WHEN `optimizing_status` = 'PLANNING' THEN 500 + WHEN `optimizing_status` = 'COMMITTING' THEN 400 + WHEN `optimizing_status` = 'MINOR_OPTIMIZING' THEN 300 + WHEN `optimizing_status` = 'MAJOR_OPTIMIZING' THEN 200 + WHEN `optimizing_status` = 'FULL_OPTIMIZING' THEN 100 + END, + `optimizing_status_start_time`, `optimizing_process_id`, `optimizer_group`, `table_config`, `optimizing_config`, `pending_input` +FROM table_runtime_backup; \ No newline at end of file diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/dashboard/TestOverviewCache.java b/amoro-ams/src/test/java/org/apache/amoro/server/dashboard/TestOverviewCache.java index 96a8d789c1..1b59308b9d 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/dashboard/TestOverviewCache.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/dashboard/TestOverviewCache.java @@ -92,7 +92,7 @@ private void initTableWithFiles() { .asUnkeyedTable(); appendData(table, 1); appendData(table, 2); - TableRuntime runtime = tableService().getRuntime(serverTableIdentifier()); + TableRuntime runtime = tableService().getRuntime(serverTableIdentifier().getId()); runtime.refresh(tableService().loadTable(serverTableIdentifier())); } @@ -110,7 +110,7 @@ private void appendData(UnkeyedTable table, int id) { void refreshPending() { TableRuntimeRefreshExecutor refresher = new TableRuntimeRefreshExecutor(tableService(), 1, Integer.MAX_VALUE); - refresher.execute(tableService().getRuntime(serverTableIdentifier())); + refresher.execute(tableService().getRuntime(serverTableIdentifier().getId())); refresher.dispose(); } diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/OptimizingStatusTest.java b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/OptimizingStatusTest.java new file mode 100644 index 0000000000..cadce03273 --- /dev/null +++ b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/OptimizingStatusTest.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.server.optimizing; + +import static org.junit.Assert.assertEquals; + +import org.junit.Test; + +public class OptimizingStatusTest { + @Test + public void testOptimizingStatusCodeValue() { + assertEquals(7, OptimizingStatus.values().length); + + assertEquals(OptimizingStatus.FULL_OPTIMIZING, OptimizingStatus.ofCode(100)); + assertEquals(OptimizingStatus.MAJOR_OPTIMIZING, OptimizingStatus.ofCode(200)); + assertEquals(OptimizingStatus.MINOR_OPTIMIZING, OptimizingStatus.ofCode(300)); + assertEquals(OptimizingStatus.COMMITTING, OptimizingStatus.ofCode(400)); + assertEquals(OptimizingStatus.PLANNING, OptimizingStatus.ofCode(500)); + assertEquals(OptimizingStatus.PENDING, OptimizingStatus.ofCode(600)); + assertEquals(OptimizingStatus.IDLE, OptimizingStatus.ofCode(700)); + } +} diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/TestOptimizingQueue.java b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/TestOptimizingQueue.java index 0032af9bc1..24ce7c0366 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/TestOptimizingQueue.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/TestOptimizingQueue.java @@ -46,13 +46,13 @@ import org.apache.amoro.resource.ResourceGroup; import org.apache.amoro.server.manager.MetricManager; import org.apache.amoro.server.metrics.MetricRegistry; +import org.apache.amoro.server.persistence.TableRuntimeMeta; import org.apache.amoro.server.resource.OptimizerInstance; import org.apache.amoro.server.resource.OptimizerThread; import org.apache.amoro.server.resource.QuotaProvider; import org.apache.amoro.server.table.AMSTableTestBase; import org.apache.amoro.server.table.TableConfigurations; import org.apache.amoro.server.table.TableRuntime; -import org.apache.amoro.server.table.TableRuntimeMeta; import org.apache.amoro.shade.guava32.com.google.common.collect.ImmutableMap; import org.apache.amoro.shade.guava32.com.google.common.collect.Lists; import org.apache.amoro.table.MixedTable; @@ -105,13 +105,13 @@ protected static ResourceGroup testResourceGroup() { return new ResourceGroup.Builder("test", "local").build(); } - protected OptimizingQueue buildOptimizingGroupService(TableRuntimeMeta tableRuntimeMeta) { + protected OptimizingQueue buildOptimizingGroupService(TableRuntime tableRuntime) { return new OptimizingQueue( tableService(), testResourceGroup(), quotaProvider, planExecutor, - Collections.singletonList(tableRuntimeMeta), + Collections.singletonList(tableRuntime), 1); } @@ -127,7 +127,7 @@ private OptimizingQueue buildOptimizingGroupService() { @Test public void testPollNoTask() { - TableRuntimeMeta tableRuntimeMeta = + TableRuntime tableRuntimeMeta = buildTableRuntimeMeta(OptimizingStatus.PENDING, defaultResourceGroup()); OptimizingQueue queue = buildOptimizingGroupService(tableRuntimeMeta); Assert.assertNull(queue.pollTask(0)); @@ -138,25 +138,25 @@ public void testPollNoTask() { public void testRefreshAndReleaseTable() { OptimizingQueue queue = buildOptimizingGroupService(); Assert.assertEquals(0, queue.getSchedulingPolicy().getTableRuntimeMap().size()); - TableRuntimeMeta tableRuntimeMeta = + TableRuntime tableRuntime = buildTableRuntimeMeta(OptimizingStatus.IDLE, defaultResourceGroup()); - queue.refreshTable(tableRuntimeMeta.getTableRuntime()); + queue.refreshTable(tableRuntime); Assert.assertEquals(1, queue.getSchedulingPolicy().getTableRuntimeMap().size()); Assert.assertTrue( queue.getSchedulingPolicy().getTableRuntimeMap().containsKey(serverTableIdentifier())); - queue.releaseTable(tableRuntimeMeta.getTableRuntime()); + queue.releaseTable(tableRuntime); Assert.assertEquals(0, queue.getSchedulingPolicy().getTableRuntimeMap().size()); - queue.refreshTable(tableRuntimeMeta.getTableRuntime()); + queue.refreshTable(tableRuntime); Assert.assertEquals(1, queue.getSchedulingPolicy().getTableRuntimeMap().size()); queue.dispose(); } @Test public void testPollTask() { - TableRuntimeMeta tableRuntimeMeta = initTableWithFiles(); - OptimizingQueue queue = buildOptimizingGroupService(tableRuntimeMeta); + TableRuntime tableRuntime = initTableWithFiles(); + OptimizingQueue queue = buildOptimizingGroupService(tableRuntime); // 1.poll task TaskRuntime task = queue.pollTask(MAX_POLLING_TIME); @@ -169,7 +169,7 @@ public void testPollTask() { @Test public void testRetryTask() { - TableRuntimeMeta tableRuntimeMeta = initTableWithFiles(); + TableRuntime tableRuntimeMeta = initTableWithFiles(); OptimizingQueue queue = buildOptimizingGroupService(tableRuntimeMeta); // 1.poll task @@ -202,8 +202,8 @@ public void testRetryTask() { @Test public void testCommitTask() { - TableRuntimeMeta tableRuntimeMeta = initTableWithFiles(); - OptimizingQueue queue = buildOptimizingGroupService(tableRuntimeMeta); + TableRuntime tableRuntime = initTableWithFiles(); + OptimizingQueue queue = buildOptimizingGroupService(tableRuntime); Assert.assertEquals(0, queue.collectTasks().size()); TaskRuntime task = queue.pollTask(MAX_POLLING_TIME); @@ -218,11 +218,11 @@ public void testCommitTask() { Assert.assertEquals(TaskRuntime.Status.SUCCESS, task.getStatus()); // 7.commit - OptimizingProcess optimizingProcess = tableRuntimeMeta.getTableRuntime().getOptimizingProcess(); + OptimizingProcess optimizingProcess = tableRuntime.getOptimizingProcess(); Assert.assertEquals(OptimizingProcess.Status.RUNNING, optimizingProcess.getStatus()); optimizingProcess.commit(); Assert.assertEquals(OptimizingProcess.Status.SUCCESS, optimizingProcess.getStatus()); - Assert.assertNull(tableRuntimeMeta.getTableRuntime().getOptimizingProcess()); + Assert.assertNull(tableRuntime.getOptimizingProcess()); // 8.commit again, throw exceptions, and status not changed. Assert.assertThrows(IllegalStateException.class, optimizingProcess::commit); @@ -234,8 +234,8 @@ public void testCommitTask() { @Test public void testCollectingTasks() { - TableRuntimeMeta tableRuntimeMeta = initTableWithFiles(); - OptimizingQueue queue = buildOptimizingGroupService(tableRuntimeMeta); + TableRuntime tableRuntime = initTableWithFiles(); + OptimizingQueue queue = buildOptimizingGroupService(tableRuntime); Assert.assertEquals(0, queue.collectTasks().size()); TaskRuntime task = queue.pollTask(MAX_POLLING_TIME); @@ -249,8 +249,8 @@ public void testCollectingTasks() { @Test public void testTaskAndTableMetrics() { - TableRuntimeMeta tableRuntimeMeta = initTableWithFiles(); - OptimizingQueue queue = buildOptimizingGroupService(tableRuntimeMeta); + TableRuntime tableRuntime = initTableWithFiles(); + OptimizingQueue queue = buildOptimizingGroupService(tableRuntime); MetricRegistry registry = MetricManager.getInstance().getGlobalRegistry(); Map tagValues = ImmutableMap.of(GROUP_TAG, testResourceGroup().getName()); @@ -315,7 +315,7 @@ public void testTaskAndTableMetrics() { Assert.assertEquals(0, idleTablesGauge.getValue().longValue()); Assert.assertEquals(1, committingTablesGauge.getValue().longValue()); - OptimizingProcess optimizingProcess = tableRuntimeMeta.getTableRuntime().getOptimizingProcess(); + OptimizingProcess optimizingProcess = tableRuntime.getOptimizingProcess(); optimizingProcess.commit(); Assert.assertEquals(0, queueTasksGauge.getValue().longValue()); Assert.assertEquals(0, executingTasksGauge.getValue().longValue()); @@ -363,21 +363,19 @@ public void testAddAndRemoveOptimizers() { queue.dispose(); } - protected TableRuntimeMeta initTableWithFiles() { + protected TableRuntime initTableWithFiles() { MixedTable mixedTable = (MixedTable) tableService().loadTable(serverTableIdentifier()).originalTable(); appendData(mixedTable.asUnkeyedTable(), 1); appendData(mixedTable.asUnkeyedTable(), 2); - TableRuntimeMeta tableRuntimeMeta = + TableRuntime tableRuntime = buildTableRuntimeMeta(OptimizingStatus.PENDING, defaultResourceGroup()); - TableRuntime runtime = tableRuntimeMeta.getTableRuntime(); - runtime.refresh(tableService().loadTable(serverTableIdentifier())); - return tableRuntimeMeta; + tableRuntime.refresh(tableService().loadTable(serverTableIdentifier())); + return tableRuntime; } - private TableRuntimeMeta buildTableRuntimeMeta( - OptimizingStatus status, ResourceGroup resourceGroup) { + private TableRuntime buildTableRuntimeMeta(OptimizingStatus status, ResourceGroup resourceGroup) { MixedTable mixedTable = (MixedTable) tableService().loadTable(serverTableIdentifier()).originalTable(); TableRuntimeMeta tableRuntimeMeta = new TableRuntimeMeta(); @@ -389,8 +387,7 @@ private TableRuntimeMeta buildTableRuntimeMeta( tableRuntimeMeta.setTableStatus(status); tableRuntimeMeta.setTableConfig(TableConfigurations.parseTableConfig(mixedTable.properties())); tableRuntimeMeta.setOptimizerGroup(resourceGroup.getName()); - tableRuntimeMeta.constructTableRuntime(tableService()); - return tableRuntimeMeta; + return new TableRuntime(tableRuntimeMeta, tableService()); } private void appendData(UnkeyedTable table, int id) { diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/table/DerbyPersistence.java b/amoro-ams/src/test/java/org/apache/amoro/server/table/DerbyPersistence.java index 7c88fbf8b7..96ca0b9aed 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/table/DerbyPersistence.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/table/DerbyPersistence.java @@ -28,8 +28,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.io.UncheckedIOException; import java.sql.Connection; import java.sql.ResultSet; import java.sql.SQLException; @@ -62,8 +60,8 @@ public class DerbyPersistence extends ExternalResource { LOG.info("Deleted resources in derby persistent."); })); truncateAllTables(); - } catch (IOException e) { - throw new UncheckedIOException(e); + } catch (Exception e) { + throw new RuntimeException(e); } } diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/table/TestTableRuntimeHandler.java b/amoro-ams/src/test/java/org/apache/amoro/server/table/TestTableRuntimeHandler.java index 30aea4055d..641ac0e866 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/table/TestTableRuntimeHandler.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/table/TestTableRuntimeHandler.java @@ -132,7 +132,7 @@ protected DefaultTableService tableService() { static class TestHandler extends RuntimeHandlerChain { - private final List initTables = Lists.newArrayList(); + private final List initTables = Lists.newArrayList(); private final List> statusChangedTables = Lists.newArrayList(); private final List> configChangedTables = @@ -163,8 +163,8 @@ protected void handleTableRemoved(TableRuntime tableRuntime) { } @Override - protected void initHandler(List tableRuntimeMetaList) { - initTables.addAll(tableRuntimeMetaList); + protected void initHandler(List tableRuntimeList) { + initTables.addAll(tableRuntimeList); } @Override @@ -172,7 +172,7 @@ protected void doDispose() { disposed = true; } - public List getInitTables() { + public List getInitTables() { return initTables; } diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/table/TestTableRuntimeManager.java b/amoro-ams/src/test/java/org/apache/amoro/server/table/TestTableRuntimeManager.java index 4549c676f9..b6b8ce1e77 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/table/TestTableRuntimeManager.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/table/TestTableRuntimeManager.java @@ -69,27 +69,6 @@ public void testLoadTable() { "unknown", "unknown", "unknown", serverTableIdentifier().getFormat()))); } - @Test - public void testTableContains() { - Assert.assertTrue(tableService().contains(serverTableIdentifier().getId())); - ServerTableIdentifier copyId = - ServerTableIdentifier.of( - null, - serverTableIdentifier().getCatalog(), - serverTableIdentifier().getDatabase(), - serverTableIdentifier().getTableName(), - serverTableIdentifier().getFormat()); - Assert.assertFalse(tableService().contains(copyId.getId())); - copyId = - ServerTableIdentifier.of( - serverTableIdentifier().getId(), - serverTableIdentifier().getCatalog(), - serverTableIdentifier().getDatabase(), - "unknown", - serverTableIdentifier().getFormat()); - Assert.assertFalse(tableService().contains(copyId.getId())); - } - @Test public void testTableRuntime() { TableRuntime tableRuntime = tableService().getRuntime(serverTableIdentifier().getId()); diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/table/TestTableSummaryMetrics.java b/amoro-ams/src/test/java/org/apache/amoro/server/table/TestTableSummaryMetrics.java index 9bc914ef78..d8d3b9d196 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/table/TestTableSummaryMetrics.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/table/TestTableSummaryMetrics.java @@ -104,7 +104,7 @@ private void initTableWithFiles() { .asUnkeyedTable(); appendData(table); appendPosDelete(table); - TableRuntime runtime = tableService().getRuntime(serverTableIdentifier()); + TableRuntime runtime = tableService().getRuntime(serverTableIdentifier().getId()); runtime.refresh(tableService().loadTable(serverTableIdentifier())); } @@ -142,7 +142,7 @@ private void appendPosDelete(UnkeyedTable table) { void refreshPending() { TableRuntimeRefreshExecutor refresher = new TableRuntimeRefreshExecutor(tableService(), 1, Integer.MAX_VALUE); - refresher.execute(tableService().getRuntime(serverTableIdentifier())); + refresher.execute(tableService().getRuntime(serverTableIdentifier().getId())); refresher.dispose(); }