From 589ffcf35d4d288dd8d3b73f0b264ce2a72792d3 Mon Sep 17 00:00:00 2001 From: klion26 Date: Wed, 31 Jul 2024 17:35:21 +0800 Subject: [PATCH 1/2] [AMORO-2893] Using tableId as key for cache in TableService tableId is unique for every table we can simplify the key using tableId, so that we don't need pass other parameters(maybe need retrieve from db) when retrieve info from cache --- .../dashboard/controller/TableController.java | 6 ++++-- .../server/table/DefaultTableService.java | 21 +++++++++---------- .../amoro/server/table/TableManager.java | 6 +++--- .../table/executor/BaseTableExecutor.java | 9 ++++---- .../server/RestCatalogServiceTestBase.java | 2 +- .../server/TestDefaultOptimizingService.java | 11 ++++++---- .../server/table/TestTableRuntimeHandler.java | 5 +++-- .../server/table/TestTableRuntimeManager.java | 8 +++---- .../amoro/server/table/TestTableService.java | 2 +- 9 files changed, 37 insertions(+), 33 deletions(-) diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/controller/TableController.java b/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/controller/TableController.java index 43a71e785d..754329fa7f 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/controller/TableController.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/controller/TableController.java @@ -148,7 +148,7 @@ public void getTableDetail(Context ctx) { tableService.getServerTableIdentifier( TableIdentifier.of(catalog, database, tableName).buildTableIdentifier())); if (serverTableIdentifier.isPresent()) { - TableRuntime tableRuntime = tableService.getRuntime(serverTableIdentifier.get()); + TableRuntime tableRuntime = tableService.getRuntime(serverTableIdentifier.get().getId()); tableSummary.setOptimizingStatus(tableRuntime.getOptimizingStatus().name()); OptimizingEvaluator.PendingInput tableRuntimeSummary = tableRuntime.getTableSummary(); if (tableRuntimeSummary != null) { @@ -656,7 +656,9 @@ public void cancelOptimizingProcess(Context ctx) { tableService.getServerTableIdentifier( TableIdentifier.of(catalog, db, table).buildTableIdentifier()); TableRuntime tableRuntime = - serverTableIdentifier != null ? tableService.getRuntime(serverTableIdentifier) : null; + serverTableIdentifier != null + ? tableService.getRuntime(serverTableIdentifier.getId()) + : null; Preconditions.checkArgument( tableRuntime != null diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableService.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableService.java index d872f3ce3c..d513c8c9a2 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableService.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableService.java @@ -85,8 +85,7 @@ public class DefaultTableService extends StatedPersistentBase implements TableSe private final Map internalCatalogMap = new ConcurrentHashMap<>(); private final Map externalCatalogMap = new ConcurrentHashMap<>(); - private final Map tableRuntimeMap = - new ConcurrentHashMap<>(); + private final Map tableRuntimeMap = new ConcurrentHashMap<>(); private final ScheduledExecutorService tableExplorerScheduler = Executors.newSingleThreadScheduledExecutor( @@ -213,7 +212,7 @@ public void dropTableMetadata(TableIdentifier tableIdentifier, boolean deleteDat } ServerTableIdentifier serverTableIdentifier = internalCatalog.dropTable(database, table); - Optional.ofNullable(tableRuntimeMap.remove(serverTableIdentifier)) + Optional.ofNullable(tableRuntimeMap.remove(serverTableIdentifier.getId())) .ifPresent( tableRuntime -> { if (headHandler != null) { @@ -411,7 +410,7 @@ public void initialize() { private TableRuntime getAndCheckExist(ServerTableIdentifier tableIdentifier) { Preconditions.checkArgument(tableIdentifier != null, "tableIdentifier cannot be null"); - TableRuntime tableRuntime = getRuntime(tableIdentifier); + TableRuntime tableRuntime = getRuntime(tableIdentifier.getId()); if (tableRuntime == null) { throw new ObjectNotExistsException(tableIdentifier); } @@ -447,15 +446,15 @@ private ServerTableIdentifier getOrSyncServerTableIdentifier(TableIdentifier id) } @Override - public TableRuntime getRuntime(ServerTableIdentifier tableIdentifier) { + public TableRuntime getRuntime(Long tableId) { checkStarted(); - return tableRuntimeMap.get(tableIdentifier); + return tableRuntimeMap.get(tableId); } @Override - public boolean contains(ServerTableIdentifier tableIdentifier) { + public boolean contains(Long tableId) { checkStarted(); - return tableRuntimeMap.containsKey(tableIdentifier); + return tableRuntimeMap.containsKey(tableId); } public void dispose() { @@ -645,7 +644,7 @@ private boolean triggerTableAdded( } } TableRuntime tableRuntime = new TableRuntime(serverTableIdentifier, this, table.properties()); - tableRuntimeMap.put(serverTableIdentifier, tableRuntime); + tableRuntimeMap.put(serverTableIdentifier.getId(), tableRuntime); tableRuntime.registerMetric(MetricManager.getInstance().getGlobalRegistry()); if (headHandler != null) { headHandler.fireTableAdded(table, tableRuntime); @@ -659,7 +658,7 @@ private void revertTableRuntimeAdded( externalCatalog.getServerTableIdentifier( tableIdentity.getDatabase(), tableIdentity.getTableName()); if (tableIdentifier != null) { - tableRuntimeMap.remove(tableIdentifier); + tableRuntimeMap.remove(tableIdentifier.getId()); } } @@ -671,7 +670,7 @@ private void disposeTable(ServerTableIdentifier tableIdentifier) { tableIdentifier.getCatalog(), tableIdentifier.getDatabase(), tableIdentifier.getTableName())); - Optional.ofNullable(tableRuntimeMap.remove(tableIdentifier)) + Optional.ofNullable(tableRuntimeMap.remove(tableIdentifier.getId())) .ifPresent( tableRuntime -> { if (headHandler != null) { diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/TableManager.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/TableManager.java index 4c8c3a8d01..4735bca2d7 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/table/TableManager.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/table/TableManager.java @@ -31,9 +31,9 @@ public interface TableManager extends TableRuntimeHandler { */ AmoroTable loadTable(ServerTableIdentifier tableIdentifier); - TableRuntime getRuntime(ServerTableIdentifier tableIdentifier); + TableRuntime getRuntime(Long tableId); - default boolean contains(ServerTableIdentifier tableIdentifier) { - return getRuntime(tableIdentifier) != null; + default boolean contains(Long tableId) { + return getRuntime(tableId) != null; } } diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/executor/BaseTableExecutor.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/executor/BaseTableExecutor.java index e2125fbfb4..1d9002c357 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/table/executor/BaseTableExecutor.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/table/executor/BaseTableExecutor.java @@ -25,7 +25,6 @@ import org.apache.amoro.server.table.RuntimeHandlerChain; import org.apache.amoro.server.table.TableManager; import org.apache.amoro.server.table.TableRuntime; -import org.apache.amoro.server.table.TableRuntimeMeta; import org.apache.amoro.shade.guava32.com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; @@ -63,10 +62,9 @@ protected BaseTableExecutor(TableManager tableManager, int poolSize) { } @Override - protected void initHandler(List tableRuntimeMetaList) { + protected void initHandler(List tableRuntimeMetaList) { tableRuntimeMetaList.stream() - .map(tableRuntimeMeta -> tableRuntimeMeta.getTableRuntime()) - .filter(tableRuntime -> enabled(tableRuntime)) + .filter(this::enabled) .forEach( tableRuntime -> { if (scheduledTables.add(tableRuntime.getTableIdentifier())) { @@ -109,7 +107,8 @@ protected String getThreadName() { } private boolean isExecutable(TableRuntime tableRuntime) { - return tableManager.contains(tableRuntime.getTableIdentifier()) && enabled(tableRuntime); + return tableManager.contains(tableRuntime.getTableIdentifier().getId()) + && enabled(tableRuntime); } @Override diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/RestCatalogServiceTestBase.java b/amoro-ams/src/test/java/org/apache/amoro/server/RestCatalogServiceTestBase.java index 63b2073aaf..99b1853fa7 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/RestCatalogServiceTestBase.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/RestCatalogServiceTestBase.java @@ -120,7 +120,7 @@ protected TableMetadata getTableMetadata(TableIdentifier identifier) { protected TableRuntime getTableRuntime(TableIdentifier identifier) { ServerTableIdentifier serverTableIdentifier = getServerTableIdentifier(identifier); - return tableService.getRuntime(serverTableIdentifier); + return tableService.getRuntime(serverTableIdentifier.getId()); } protected void assertTableRuntime(TableIdentifier identifier, TableFormat format) { diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/TestDefaultOptimizingService.java b/amoro-ams/src/test/java/org/apache/amoro/server/TestDefaultOptimizingService.java index 776a7a72cb..5a237700d7 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/TestDefaultOptimizingService.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/TestDefaultOptimizingService.java @@ -114,7 +114,7 @@ private void initTableWithFiles() { (MixedTable) tableService().loadTable(serverTableIdentifier()).originalTable(); appendData(mixedTable.asUnkeyedTable(), 1); appendData(mixedTable.asUnkeyedTable(), 2); - TableRuntime runtime = tableService().getRuntime(serverTableIdentifier()); + TableRuntime runtime = tableService().getRuntime(serverTableIdentifier().getId()); runtime.refresh(tableService().loadTable(serverTableIdentifier())); } @@ -387,10 +387,13 @@ private void assertTaskCompleted(TaskRuntime taskRuntime) { 0, optimizingService().listTasks(defaultResourceGroup().getName()).size()); Assertions.assertEquals( OptimizingProcess.Status.RUNNING, - tableService().getRuntime(serverTableIdentifier()).getOptimizingProcess().getStatus()); + tableService() + .getRuntime(serverTableIdentifier().getId()) + .getOptimizingProcess() + .getStatus()); Assertions.assertEquals( OptimizingStatus.COMMITTING, - tableService().getRuntime(serverTableIdentifier()).getOptimizingStatus()); + tableService().getRuntime(serverTableIdentifier().getId()).getOptimizingStatus()); } protected void reload() { @@ -415,7 +418,7 @@ public TableRuntimeRefresher() { } void refreshPending() { - execute(tableService().getRuntime(serverTableIdentifier())); + execute(tableService().getRuntime(serverTableIdentifier().getId())); } } diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/table/TestTableRuntimeHandler.java b/amoro-ams/src/test/java/org/apache/amoro/server/table/TestTableRuntimeHandler.java index f629012981..30aea4055d 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/table/TestTableRuntimeHandler.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/table/TestTableRuntimeHandler.java @@ -92,14 +92,15 @@ public void testInitialize() throws Exception { tableService.initialize(); Assert.assertEquals(1, handler.getInitTables().size()); Assert.assertEquals( - createTableId.getId().longValue(), handler.getInitTables().get(0).getTableId()); + (Long) createTableId.getId().longValue(), + handler.getInitTables().get(0).getTableIdentifier().getId()); // test change properties MixedTable mixedTable = (MixedTable) tableService().loadTable(createTableId).originalTable(); mixedTable.updateProperties().set(TableProperties.ENABLE_ORPHAN_CLEAN, "true").commit(); tableService() - .getRuntime(createTableId) + .getRuntime(createTableId.getId()) .refresh(tableService.loadTable(serverTableIdentifier())); Assert.assertEquals(1, handler.getConfigChangedTables().size()); validateTableRuntime(handler.getConfigChangedTables().get(0).first()); diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/table/TestTableRuntimeManager.java b/amoro-ams/src/test/java/org/apache/amoro/server/table/TestTableRuntimeManager.java index 277e8e0e34..4549c676f9 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/table/TestTableRuntimeManager.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/table/TestTableRuntimeManager.java @@ -71,7 +71,7 @@ public void testLoadTable() { @Test public void testTableContains() { - Assert.assertTrue(tableService().contains(serverTableIdentifier())); + Assert.assertTrue(tableService().contains(serverTableIdentifier().getId())); ServerTableIdentifier copyId = ServerTableIdentifier.of( null, @@ -79,7 +79,7 @@ public void testTableContains() { serverTableIdentifier().getDatabase(), serverTableIdentifier().getTableName(), serverTableIdentifier().getFormat()); - Assert.assertFalse(tableService().contains(copyId)); + Assert.assertFalse(tableService().contains(copyId.getId())); copyId = ServerTableIdentifier.of( serverTableIdentifier().getId(), @@ -87,12 +87,12 @@ public void testTableContains() { serverTableIdentifier().getDatabase(), "unknown", serverTableIdentifier().getFormat()); - Assert.assertFalse(tableService().contains(copyId)); + Assert.assertFalse(tableService().contains(copyId.getId())); } @Test public void testTableRuntime() { - TableRuntime tableRuntime = tableService().getRuntime(serverTableIdentifier()); + TableRuntime tableRuntime = tableService().getRuntime(serverTableIdentifier().getId()); validateTableRuntime(tableRuntime); } } diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/table/TestTableService.java b/amoro-ams/src/test/java/org/apache/amoro/server/table/TestTableService.java index dea85f1a73..cbf8010c6e 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/table/TestTableService.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/table/TestTableService.java @@ -341,7 +341,7 @@ private boolean isBlocked(BlockableOperation operation) { } private boolean isTableRuntimeBlocked(BlockableOperation operation) { - return tableService().getRuntime(serverTableIdentifier()).isBlocked(operation); + return tableService().getRuntime(serverTableIdentifier().getId()).isBlocked(operation); } private void assertBlockerCnt(int i) { From a75639f2046a57848a405fe783db10eb58c4a370 Mon Sep 17 00:00:00 2001 From: klion26 Date: Wed, 31 Jul 2024 17:36:10 +0800 Subject: [PATCH 2/2] [AMORO-2893] Optimize the efficiency for restapi retrieve optimizing tables Befor the change we'll retrieve all tables from db and sort in the memory, this will be slow if there are many entriesin the db, after this change we sort in the db and only retrieve the needed entries. --- .../amoro/server/AmoroServiceContainer.java | 7 +- .../server/DefaultOptimizingService.java | 15 ++- .../controller/OptimizerController.java | 42 +++----- .../server/optimizing/OptimizingQueue.java | 39 ++++---- .../server/optimizing/OptimizingStatus.java | 32 ++++-- .../SqlSessionFactoryProvider.java | 14 ++- .../TableRuntimeMeta.java | 23 +---- .../converter/OptimizingStatusConverter.java | 61 ++++++++++++ .../persistence/mapper/TableMetaMapper.java | 98 ++++++++++++++++--- .../server/table/DefaultTableService.java | 38 +++++-- .../server/table/RuntimeHandlerChain.java | 15 ++- .../amoro/server/table/TableRuntime.java | 65 +++++++++++- .../amoro/server/table/TableService.java | 21 ++++ .../table/executor/BaseTableExecutor.java | 4 +- .../main/resources/derby/ams-derby-init.sql | 2 +- .../main/resources/mysql/ams-mysql-init.sql | 4 +- .../src/main/resources/mysql/upgrade.sql | 27 ++++- .../resources/postgres/ams-postgres-init.sql | 6 +- .../postgres/upgrade-0.7.0-to-0.7.1.sql | 39 ++++++++ .../server/dashboard/TestOverviewCache.java | 4 +- .../optimizing/OptimizingStatusTest.java | 38 +++++++ .../optimizing/TestOptimizingQueue.java | 55 +++++------ .../amoro/server/table/DerbyPersistence.java | 6 +- .../server/table/TestTableRuntimeHandler.java | 8 +- .../server/table/TestTableRuntimeManager.java | 21 ---- .../server/table/TestTableSummaryMetrics.java | 4 +- 26 files changed, 496 insertions(+), 192 deletions(-) rename amoro-ams/src/main/java/org/apache/amoro/server/{table => persistence}/TableRuntimeMeta.java (93%) create mode 100644 amoro-ams/src/main/java/org/apache/amoro/server/persistence/converter/OptimizingStatusConverter.java create mode 100644 amoro-ams/src/main/resources/postgres/upgrade-0.7.0-to-0.7.1.sql create mode 100644 amoro-ams/src/test/java/org/apache/amoro/server/optimizing/OptimizingStatusTest.java diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java index 1dd763f324..20904ec268 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java @@ -66,7 +66,6 @@ import org.slf4j.LoggerFactory; import org.yaml.snakeyaml.Yaml; -import java.io.IOException; import java.net.InetSocketAddress; import java.nio.file.Files; import java.nio.file.Paths; @@ -215,7 +214,7 @@ public void dispose() { MetricManager.dispose(); } - private void initConfig() throws IOException { + private void initConfig() throws Exception { LOG.info("initializing configurations..."); new ConfigurationHelper().init(); } @@ -407,14 +406,14 @@ private class ConfigurationHelper { private JsonNode yamlConfig; - public void init() throws IOException { + public void init() throws Exception { Map envConfig = initEnvConfig(); initServiceConfig(envConfig); setIcebergSystemProperties(); initContainerConfig(); } - private void initServiceConfig(Map envConfig) throws IOException { + private void initServiceConfig(Map envConfig) throws Exception { LOG.info("initializing service configuration..."); String configPath = Environments.getConfigPath() + "/" + SERVER_CONFIG_FILENAME; LOG.info("load config from path: {}", configPath); diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java b/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java index d77b7f9170..1b206ca6e9 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java @@ -49,7 +49,6 @@ import org.apache.amoro.server.table.DefaultTableService; import org.apache.amoro.server.table.RuntimeHandlerChain; import org.apache.amoro.server.table.TableRuntime; -import org.apache.amoro.server.table.TableRuntimeMeta; import org.apache.amoro.server.table.TableService; import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions; import org.apache.amoro.shade.guava32.com.google.common.collect.ImmutableList; @@ -121,24 +120,24 @@ public RuntimeHandlerChain getTableRuntimeHandler() { return tableHandlerChain; } - private void loadOptimizingQueues(List tableRuntimeMetaList) { + private void loadOptimizingQueues(List tableRuntimeMetaList) { List optimizerGroups = getAs(ResourceMapper.class, ResourceMapper::selectResourceGroups); List optimizers = getAs(OptimizerMapper.class, OptimizerMapper::selectAll); - Map> groupToTableRuntimes = + Map> groupToTableRuntimes = tableRuntimeMetaList.stream() - .collect(Collectors.groupingBy(TableRuntimeMeta::getOptimizerGroup)); + .collect(Collectors.groupingBy(TableRuntime::getOptimizerGroup)); optimizerGroups.forEach( group -> { String groupName = group.getName(); - List tableRuntimeMetas = groupToTableRuntimes.remove(groupName); + List tableRuntimes = groupToTableRuntimes.remove(groupName); OptimizingQueue optimizingQueue = new OptimizingQueue( tableService, group, this, planExecutor, - Optional.ofNullable(tableRuntimeMetas).orElseGet(ArrayList::new), + Optional.ofNullable(tableRuntimes).orElseGet(ArrayList::new), maxPlanningParallelism); optimizingQueueByGroup.put(groupName, optimizingQueue); }); @@ -456,9 +455,9 @@ public void handleTableRemoved(TableRuntime tableRuntime) { } @Override - protected void initHandler(List tableRuntimeMetaList) { + protected void initHandler(List tableRuntimeList) { LOG.info("OptimizerManagementService begin initializing"); - loadOptimizingQueues(tableRuntimeMetaList); + loadOptimizingQueues(tableRuntimeList); optimizerKeeper.start(); LOG.info("SuspendingDetector for Optimizer has been started."); LOG.info("OptimizerManagementService initializing has completed"); diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/controller/OptimizerController.java b/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/controller/OptimizerController.java index aa5abf386d..c5770be20b 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/controller/OptimizerController.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/controller/OptimizerController.java @@ -19,7 +19,6 @@ package org.apache.amoro.server.dashboard.controller; import io.javalin.http.Context; -import org.apache.amoro.ServerTableIdentifier; import org.apache.amoro.resource.Resource; import org.apache.amoro.resource.ResourceGroup; import org.apache.amoro.resource.ResourceType; @@ -30,13 +29,13 @@ import org.apache.amoro.server.dashboard.response.OkResponse; import org.apache.amoro.server.dashboard.response.PageResult; import org.apache.amoro.server.dashboard.utils.OptimizingUtil; +import org.apache.amoro.server.persistence.TableRuntimeMeta; import org.apache.amoro.server.resource.ContainerMetadata; import org.apache.amoro.server.resource.OptimizerInstance; import org.apache.amoro.server.resource.ResourceContainers; import org.apache.amoro.server.table.TableRuntime; import org.apache.amoro.server.table.TableService; import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions; -import org.apache.commons.lang3.StringUtils; import javax.ws.rs.BadRequestException; @@ -67,34 +66,17 @@ public void getOptimizerTables(Context ctx) { Integer pageSize = ctx.queryParamAsClass("pageSize", Integer.class).getOrDefault(20); int offset = (page - 1) * pageSize; - List tableRuntimes = new ArrayList<>(); - List tables = tableService.listManagedTables(); - for (ServerTableIdentifier identifier : tables) { - TableRuntime tableRuntime = tableService.getRuntime(identifier); - if (tableRuntime == null) { - continue; - } - if ((ALL_GROUP.equals(optimizerGroup) - || tableRuntime.getOptimizerGroup().equals(optimizerGroup)) - && (StringUtils.isEmpty(dbFilterStr) - || StringUtils.containsIgnoreCase(identifier.getDatabase(), dbFilterStr)) - && (StringUtils.isEmpty(tableFilterStr) - || StringUtils.containsIgnoreCase(identifier.getTableName(), tableFilterStr))) { - tableRuntimes.add(tableRuntime); - } - } - tableRuntimes.sort( - (o1, o2) -> { - // first we compare the status , and then we compare the start time when status are equal; - int statDiff = o1.getOptimizingStatus().compareTo(o2.getOptimizingStatus()); - // status order is asc, startTime order is desc - if (statDiff == 0) { - long timeDiff = o1.getCurrentStatusStartTime() - o2.getCurrentStatusStartTime(); - return timeDiff >= 0 ? (timeDiff == 0 ? 0 : -1) : 1; - } else { - return statDiff; - } - }); + String optimizerGroupUsedInDbFilter = ALL_GROUP.equals(optimizerGroup) ? null : optimizerGroup; + // get all info from underlying table table_runtime + List tableRuntimeBeans = + tableService.getTableRuntimes( + optimizerGroupUsedInDbFilter, dbFilterStr, tableFilterStr, pageSize, offset); + + List tableRuntimes = + tableRuntimeBeans.stream() + .map(meta -> tableService.getRuntime(meta.getTableId())) + .collect(Collectors.toList()); + PageResult amsPageResult = PageResult.of(tableRuntimes, offset, pageSize, OptimizingUtil::buildTableOptimizeInfo); ctx.json(OkResponse.of(amsPageResult)); diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java index dd192c1298..77ec96cfc1 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java @@ -36,7 +36,6 @@ import org.apache.amoro.server.resource.QuotaProvider; import org.apache.amoro.server.table.TableManager; import org.apache.amoro.server.table.TableRuntime; -import org.apache.amoro.server.table.TableRuntimeMeta; import org.apache.amoro.shade.guava32.com.google.common.annotations.VisibleForTesting; import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions; import org.apache.amoro.shade.guava32.com.google.common.collect.Lists; @@ -93,7 +92,7 @@ public OptimizingQueue( ResourceGroup optimizerGroup, QuotaProvider quotaProvider, Executor planExecutor, - List tableRuntimeMetaList, + List tableRuntimeList, int maxPlanningParallelism) { Preconditions.checkNotNull(optimizerGroup, "Optimizer group can not be null"); this.planExecutor = planExecutor; @@ -106,14 +105,12 @@ public OptimizingQueue( new OptimizerGroupMetrics( optimizerGroup.getName(), MetricManager.getInstance().getGlobalRegistry(), this); this.metrics.register(); - tableRuntimeMetaList.forEach(this::initTableRuntime); + tableRuntimeList.forEach(this::initTableRuntime); } - private void initTableRuntime(TableRuntimeMeta tableRuntimeMeta) { - TableRuntime tableRuntime = tableRuntimeMeta.getTableRuntime(); - if (tableRuntime.getOptimizingStatus().isProcessing() - && tableRuntimeMeta.getOptimizingProcessId() != 0) { - tableRuntime.recover(new TableOptimizingProcess(tableRuntimeMeta)); + private void initTableRuntime(TableRuntime tableRuntime) { + if (tableRuntime.getOptimizingStatus().isProcessing() && tableRuntime.getProcessId() != 0) { + tableRuntime.recover(new TableOptimizingProcess(tableRuntime)); } if (tableRuntime.isOptimizingEnabled()) { @@ -122,7 +119,7 @@ private void initTableRuntime(TableRuntimeMeta tableRuntimeMeta) { if (!tableRuntime.getOptimizingStatus().isProcessing()) { scheduler.addTable(tableRuntime); } else if (tableRuntime.getOptimizingStatus() != OptimizingStatus.COMMITTING) { - tableQueue.offer(new TableOptimizingProcess(tableRuntimeMeta)); + tableQueue.offer(new TableOptimizingProcess(tableRuntime)); } } else { OptimizingProcess process = tableRuntime.getOptimizingProcess(); @@ -387,21 +384,21 @@ public TableOptimizingProcess(OptimizingPlanner planner) { beginAndPersistProcess(); } - public TableOptimizingProcess(TableRuntimeMeta tableRuntimeMeta) { - processId = tableRuntimeMeta.getOptimizingProcessId(); - tableRuntime = tableRuntimeMeta.getTableRuntime(); - optimizingType = tableRuntimeMeta.getOptimizingType(); - targetSnapshotId = tableRuntimeMeta.getTargetSnapshotId(); - targetChangeSnapshotId = tableRuntimeMeta.getTargetChangeSnapshotId(); - planTime = tableRuntimeMeta.getPlanTime(); - if (tableRuntimeMeta.getFromSequence() != null) { - fromSequence = tableRuntimeMeta.getFromSequence(); + public TableOptimizingProcess(TableRuntime tableRuntime) { + processId = tableRuntime.getProcessId(); + this.tableRuntime = tableRuntime; + optimizingType = tableRuntime.getOptimizingType(); + targetSnapshotId = tableRuntime.getTargetSnapshotId(); + targetChangeSnapshotId = tableRuntime.getTargetChangeSnapshotId(); + planTime = tableRuntime.getLastPlanTime(); + if (tableRuntime.getFromSequence() != null) { + fromSequence = tableRuntime.getFromSequence(); } - if (tableRuntimeMeta.getToSequence() != null) { - toSequence = tableRuntimeMeta.getToSequence(); + if (tableRuntime.getToSequence() != null) { + toSequence = tableRuntime.getToSequence(); } if (this.status != OptimizingProcess.Status.CLOSED) { - tableRuntimeMeta.getTableRuntime().recover(this); + tableRuntime.recover(this); } loadTaskRuntimes(this); } diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingStatus.java b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingStatus.java index 7faca66261..5e34be9ea9 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingStatus.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingStatus.java @@ -19,20 +19,23 @@ package org.apache.amoro.server.optimizing; public enum OptimizingStatus { - FULL_OPTIMIZING("full", true), - MAJOR_OPTIMIZING("major", true), - MINOR_OPTIMIZING("minor", true), - COMMITTING("committing", true), - PLANNING("planning", false), - PENDING("pending", false), - IDLE("idle", false); + FULL_OPTIMIZING("full", true, 100), + MAJOR_OPTIMIZING("major", true, 200), + MINOR_OPTIMIZING("minor", true, 300), + COMMITTING("committing", true, 400), + PLANNING("planning", false, 500), + PENDING("pending", false, 600), + IDLE("idle", false, 700); private final String displayValue; private final boolean isProcessing; - OptimizingStatus(String displayValue, boolean isProcessing) { + private final int code; + + OptimizingStatus(String displayValue, boolean isProcessing, int code) { this.displayValue = displayValue; this.isProcessing = isProcessing; + this.code = code; } public boolean isProcessing() { @@ -42,4 +45,17 @@ public boolean isProcessing() { public String displayValue() { return displayValue; } + + public int getCode() { + return code; + } + + public static OptimizingStatus ofCode(int code) { + for (OptimizingStatus status : values()) { + if (status.getCode() == code) { + return status; + } + } + return null; + } } diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/persistence/SqlSessionFactoryProvider.java b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/SqlSessionFactoryProvider.java index 6a59d34a8b..4560e5a4e7 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/persistence/SqlSessionFactoryProvider.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/SqlSessionFactoryProvider.java @@ -33,7 +33,9 @@ import org.apache.commons.lang3.StringUtils; import org.apache.commons.pool2.impl.BaseObjectPoolConfig; import org.apache.ibatis.jdbc.ScriptRunner; +import org.apache.ibatis.mapping.DatabaseIdProvider; import org.apache.ibatis.mapping.Environment; +import org.apache.ibatis.mapping.VendorDatabaseIdProvider; import org.apache.ibatis.session.Configuration; import org.apache.ibatis.session.SqlSession; import org.apache.ibatis.session.SqlSessionFactory; @@ -53,8 +55,10 @@ import java.nio.file.Paths; import java.sql.Connection; import java.sql.ResultSet; +import java.sql.SQLException; import java.sql.Statement; import java.time.Duration; +import java.util.Properties; public class SqlSessionFactoryProvider { private static final Logger LOG = LoggerFactory.getLogger(SqlSessionFactoryProvider.class); @@ -73,7 +77,7 @@ public static SqlSessionFactoryProvider getInstance() { private volatile SqlSessionFactory sqlSessionFactory; - public void init(Configurations config) { + public void init(Configurations config) throws SQLException { BasicDataSource dataSource = new BasicDataSource(); dataSource.setUrl(config.getString(AmoroManagementConf.DB_CONNECTION_URL)); dataSource.setDriverClassName(config.getString(AmoroManagementConf.DB_DRIVER_CLASS_NAME)); @@ -111,6 +115,14 @@ public void init(Configurations config) { configuration.addMapper(PlatformFileMapper.class); configuration.addMapper(ResourceMapper.class); configuration.addMapper(TableBlockerMapper.class); + + DatabaseIdProvider provider = new VendorDatabaseIdProvider(); + Properties properties = new Properties(); + properties.setProperty("MySQL", "mysql"); + properties.setProperty("PostgreSQL", "postgres"); + properties.setProperty("Derby", "derby"); + provider.setProperties(properties); + configuration.setDatabaseId(provider.getDatabaseId(dataSource)); if (sqlSessionFactory == null) { synchronized (this) { if (sqlSessionFactory == null) { diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/TableRuntimeMeta.java b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/TableRuntimeMeta.java similarity index 93% rename from amoro-ams/src/main/java/org/apache/amoro/server/table/TableRuntimeMeta.java rename to amoro-ams/src/main/java/org/apache/amoro/server/persistence/TableRuntimeMeta.java index 9fd8c05ea5..d94db0a25d 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/table/TableRuntimeMeta.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/TableRuntimeMeta.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.amoro.server.table; +package org.apache.amoro.server.persistence; import org.apache.amoro.TableFormat; import org.apache.amoro.config.TableConfiguration; @@ -27,6 +27,7 @@ import java.util.Map; +/** The class for table used when transfer data from/to database. */ public class TableRuntimeMeta { private long tableId; private String catalogName; @@ -58,24 +59,8 @@ public class TableRuntimeMeta { private Map fromSequence; private Map toSequence; - private TableRuntime tableRuntime; - public TableRuntimeMeta() {} - public TableRuntime constructTableRuntime(TableManager initializer) { - if (tableRuntime == null) { - tableRuntime = new TableRuntime(this, initializer); - } - return tableRuntime; - } - - public TableRuntime getTableRuntime() { - if (tableRuntime == null) { - throw new IllegalStateException("TableRuntime is not constructed yet."); - } - return tableRuntime; - } - public long getTargetSnapshotId() { return targetSnapshotId; } @@ -212,10 +197,6 @@ public void setTableSummary(OptimizingEvaluator.PendingInput tableSummary) { this.tableSummary = tableSummary; } - public void setTableRuntime(TableRuntime tableRuntime) { - this.tableRuntime = tableRuntime; - } - public void setLastOptimizedChangeSnapshotId(long lastOptimizedChangeSnapshotId) { this.lastOptimizedChangeSnapshotId = lastOptimizedChangeSnapshotId; } diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/persistence/converter/OptimizingStatusConverter.java b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/converter/OptimizingStatusConverter.java new file mode 100644 index 0000000000..d314bb41bc --- /dev/null +++ b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/converter/OptimizingStatusConverter.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.server.persistence.converter; + +import org.apache.amoro.server.optimizing.OptimizingStatus; +import org.apache.ibatis.type.BaseTypeHandler; +import org.apache.ibatis.type.JdbcType; +import org.apache.ibatis.type.MappedJdbcTypes; +import org.apache.ibatis.type.MappedTypes; + +import java.sql.CallableStatement; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; + +@MappedJdbcTypes(JdbcType.INTEGER) +@MappedTypes(Enum.class) +public class OptimizingStatusConverter extends BaseTypeHandler { + + @Override + public void setNonNullParameter( + PreparedStatement ps, int i, OptimizingStatus parameter, JdbcType jdbcType) + throws SQLException { + ps.setInt(i, parameter.getCode()); + } + + @Override + public OptimizingStatus getNullableResult(ResultSet rs, String columnName) throws SQLException { + String s = rs.getString(columnName); + return s == null ? null : OptimizingStatus.ofCode(Integer.parseInt(s)); + } + + @Override + public OptimizingStatus getNullableResult(ResultSet rs, int columnIndex) throws SQLException { + String s = rs.getString(columnIndex); + return s == null ? null : OptimizingStatus.ofCode(Integer.parseInt(s)); + } + + @Override + public OptimizingStatus getNullableResult(CallableStatement cs, int columnIndex) + throws SQLException { + String s = cs.getString(columnIndex); + return s == null ? null : OptimizingStatus.ofCode(Integer.parseInt(s)); + } +} diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/TableMetaMapper.java b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/TableMetaMapper.java index f056dcb5e8..b6db210dc5 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/TableMetaMapper.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/TableMetaMapper.java @@ -19,13 +19,14 @@ package org.apache.amoro.server.persistence.mapper; import org.apache.amoro.ServerTableIdentifier; +import org.apache.amoro.server.persistence.TableRuntimeMeta; import org.apache.amoro.server.persistence.converter.JsonObjectConverter; import org.apache.amoro.server.persistence.converter.Long2TsConverter; import org.apache.amoro.server.persistence.converter.Map2StringConverter; import org.apache.amoro.server.persistence.converter.MapLong2StringConverter; +import org.apache.amoro.server.persistence.converter.OptimizingStatusConverter; import org.apache.amoro.server.table.TableMetadata; import org.apache.amoro.server.table.TableRuntime; -import org.apache.amoro.server.table.TableRuntimeMeta; import org.apache.ibatis.annotations.Delete; import org.apache.ibatis.annotations.Insert; import org.apache.ibatis.annotations.Options; @@ -321,7 +322,8 @@ List selectTableIdentifiersByCatalog( + " typeHandler=org.apache.amoro.server.persistence.converter.Long2TsConverter}," + " last_full_optimizing_time = #{runtime.lastFullOptimizingTime," + " typeHandler=org.apache.amoro.server.persistence.converter.Long2TsConverter}," - + " optimizing_status = #{runtime.optimizingStatus}," + + " optimizing_status_code = #{runtime.optimizingStatus," + + "typeHandler=org.apache.amoro.server.persistence.converter.OptimizingStatusConverter}," + " optimizing_status_start_time = #{runtime.currentStatusStartTime," + " typeHandler=org.apache.amoro.server.persistence.converter.Long2TsConverter}," + " optimizing_process_id = #{runtime.processId}," @@ -342,7 +344,7 @@ List selectTableIdentifiersByCatalog( "INSERT INTO table_runtime (table_id, catalog_name, db_name, table_name, current_snapshot_id," + " current_change_snapshotId, last_optimized_snapshotId, last_optimized_change_snapshotId," + " last_major_optimizing_time, last_minor_optimizing_time," - + " last_full_optimizing_time, optimizing_status, optimizing_status_start_time, optimizing_process_id," + + " last_full_optimizing_time, optimizing_status_code, optimizing_status_start_time, optimizing_process_id," + " optimizer_group, table_config, pending_input, table_summary) VALUES" + " (#{runtime.tableIdentifier.id}, #{runtime.tableIdentifier.catalog}," + " #{runtime.tableIdentifier.database}, #{runtime.tableIdentifier.tableName}, #{runtime" @@ -354,7 +356,8 @@ List selectTableIdentifiersByCatalog( + " typeHandler=org.apache.amoro.server.persistence.converter.Long2TsConverter}," + " #{runtime.lastFullOptimizingTime," + " typeHandler=org.apache.amoro.server.persistence.converter.Long2TsConverter}," - + " #{runtime.optimizingStatus}," + + " #{runtime.optimizingStatus," + + " typeHandler=org.apache.amoro.server.persistence.converter.OptimizingStatusConverter}," + " #{runtime.currentStatusStartTime, " + " typeHandler=org.apache.amoro.server.persistence.converter.Long2TsConverter}," + " #{runtime.processId}, #{runtime.optimizerGroup}," @@ -367,11 +370,11 @@ List selectTableIdentifiersByCatalog( void insertTableRuntime(@Param("runtime") TableRuntime runtime); @Select( - "SELECT a.table_id, a.catalog_name, a.db_name, a.table_name, i.format, a.current_snapshot_id, a" - + ".current_change_snapshotId, a.last_optimized_snapshotId, a.last_optimized_change_snapshotId," - + " a.last_major_optimizing_time, a.last_minor_optimizing_time, a.last_full_optimizing_time, a.optimizing_status," - + " a.optimizing_status_start_time, a.optimizing_process_id," - + " a.optimizer_group, a.table_config, a.pending_input, a.table_summary, b.optimizing_type, b.target_snapshot_id," + "SELECT a.table_id, a.catalog_name, a.db_name, a.table_name, i.format, a.current_snapshot_id," + + " a.current_change_snapshotId, a.last_optimized_snapshotId, a.last_optimized_change_snapshotId," + + " a.last_major_optimizing_time, a.last_minor_optimizing_time, a.last_full_optimizing_time," + + " a.optimizing_status_code, a.optimizing_status_start_time, a.optimizing_process_id," + + " a.optimizer_group, a.table_config, a.pending_input, b.optimizing_type, b.target_snapshot_id," + " b.target_change_snapshot_id, b.plan_time, b.from_sequence, b.to_sequence FROM table_runtime a" + " INNER JOIN table_identifier i ON a.table_id = i.table_id " + " LEFT JOIN table_optimizing_process b ON a.optimizing_process_id = b.process_id") @@ -399,7 +402,10 @@ List selectTableIdentifiersByCatalog( property = "lastFullOptimizingTime", column = "last_full_optimizing_time", typeHandler = Long2TsConverter.class), - @Result(property = "tableStatus", column = "optimizing_status"), + @Result( + property = "tableStatus", + column = "optimizing_status_code", + typeHandler = OptimizingStatusConverter.class), @Result( property = "currentStatusStartTime", column = "optimizing_status_start_time", @@ -420,7 +426,7 @@ List selectTableIdentifiersByCatalog( typeHandler = JsonObjectConverter.class), @Result(property = "optimizingType", column = "optimizing_type"), @Result(property = "targetSnapshotId", column = "target_snapshot_id"), - @Result(property = "targetChangeSnapshotId", column = "target_change_napshot_id"), + @Result(property = "targetChangeSnapshotId", column = "target_change_snapshot_id"), @Result(property = "planTime", column = "plan_time", typeHandler = Long2TsConverter.class), @Result( property = "fromSequence", @@ -432,4 +438,74 @@ List selectTableIdentifiersByCatalog( typeHandler = MapLong2StringConverter.class) }) List selectTableRuntimeMetas(); + + @Select( + "") + @Results({ + @Result(property = "tableId", column = "table_id"), + @Result(property = "catalogName", column = "catalog_name"), + @Result(property = "dbName", column = "db_name"), + @Result(property = "tableName", column = "table_name"), + @Result(property = "currentSnapshotId", column = "current_snapshot_id"), + @Result(property = "currentChangeSnapshotId", column = "current_change_snapshotId"), + @Result(property = "lastOptimizedSnapshotId", column = "last_optimized_snapshotId"), + @Result( + property = "lastOptimizedChangeSnapshotId", + column = "last_optimized_change_snapshotId"), + @Result( + property = "lastMajorOptimizingTime", + column = "last_major_optimizing_time", + typeHandler = Long2TsConverter.class), + @Result( + property = "lastMinorOptimizingTime", + column = "last_minor_optimizing_time", + typeHandler = Long2TsConverter.class), + @Result( + property = "lastFullOptimizingTime", + column = "last_full_optimizing_time", + typeHandler = Long2TsConverter.class), + @Result( + property = "tableStatus", + column = "optimizing_status_code", + typeHandler = OptimizingStatusConverter.class), + @Result( + property = "currentStatusStartTime", + column = "optimizing_status_start_time", + typeHandler = Long2TsConverter.class), + @Result(property = "optimizingProcessId", column = "optimizing_process_id"), + @Result(property = "optimizerGroup", column = "optimizer_group"), + @Result( + property = "pendingInput", + column = "pending_input", + typeHandler = JsonObjectConverter.class), + @Result( + property = "tableConfig", + column = "table_config", + typeHandler = JsonObjectConverter.class), + }) + List selectTableRuntimesForOptimizerGroup( + @Param("optimizerGroup") String optimizerGroup, + @Param("fuzzyDbName") String fuzzyDbName, + @Param("fuzzyTableName") String fuzzyTableName, + @Param("limitCount") int limitCount, + @Param("offsetNum") int offset); } diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableService.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableService.java index d513c8c9a2..c764d3e584 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableService.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableService.java @@ -42,6 +42,7 @@ import org.apache.amoro.server.manager.MetricManager; import org.apache.amoro.server.optimizing.OptimizingStatus; import org.apache.amoro.server.persistence.StatedPersistentBase; +import org.apache.amoro.server.persistence.TableRuntimeMeta; import org.apache.amoro.server.persistence.mapper.CatalogMetaMapper; import org.apache.amoro.server.persistence.mapper.TableBlockerMapper; import org.apache.amoro.server.persistence.mapper.TableMetaMapper; @@ -58,6 +59,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nullable; + +import java.util.ArrayList; import java.util.Comparator; import java.util.HashSet; import java.util.List; @@ -142,12 +146,6 @@ public ServerCatalog getServerCatalog(String catalogName) { .orElseThrow(() -> new ObjectNotExistsException("Catalog " + catalogName)); } - @Override - public InternalCatalog getInternalCatalog(String catalogName) { - return Optional.ofNullable(internalCatalogMap.get(catalogName)) - .orElseThrow(() -> new ObjectNotExistsException("Catalog " + catalogName)); - } - @Override public void createCatalog(CatalogMeta catalogMeta) { checkStarted(); @@ -343,6 +341,26 @@ public List getBlockers(TableIdentifier tableIdentifier) { .collect(Collectors.toList()); } + @Override + public List getTableRuntimes( + String optimizerGroup, + @Nullable String fuzzyDbName, + @Nullable String fuzzyTableName, + int limit, + int offset) { + checkStarted(); + return getAs( + TableMetaMapper.class, + mapper -> + mapper.selectTableRuntimesForOptimizerGroup( + optimizerGroup, fuzzyDbName, fuzzyTableName, limit, offset)); + } + + public InternalCatalog getInternalCatalog(String catalogName) { + return Optional.ofNullable(internalCatalogMap.get(catalogName)) + .orElseThrow(() -> new ObjectNotExistsException("Catalog " + catalogName)); + } + @Override public void addHandlerChain(RuntimeHandlerChain handler) { checkNotStarted(); @@ -375,15 +393,17 @@ public void initialize() { List tableRuntimeMetaList = getAs(TableMetaMapper.class, TableMetaMapper::selectTableRuntimeMetas); + List tableRuntimes = new ArrayList<>(tableRuntimeMetaList.size()); tableRuntimeMetaList.forEach( tableRuntimeMeta -> { - TableRuntime tableRuntime = tableRuntimeMeta.constructTableRuntime(this); - tableRuntimeMap.put(tableRuntime.getTableIdentifier(), tableRuntime); + TableRuntime tableRuntime = new TableRuntime(tableRuntimeMeta, this); + tableRuntimeMap.put(tableRuntimeMeta.getTableId(), tableRuntime); tableRuntime.registerMetric(MetricManager.getInstance().getGlobalRegistry()); + tableRuntimes.add(tableRuntime); }); if (headHandler != null) { - headHandler.initialize(tableRuntimeMetaList); + headHandler.initialize(tableRuntimes); } if (tableExplorerExecutors == null) { int threadCount = diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/RuntimeHandlerChain.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/RuntimeHandlerChain.java index 51419878d6..7554eca0ae 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/table/RuntimeHandlerChain.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/table/RuntimeHandlerChain.java @@ -51,16 +51,15 @@ protected void appendNext(RuntimeHandlerChain handler) { } } - public final void initialize(List tableRuntimeMetaList) { - List supportedtableRuntimeMetaList = - tableRuntimeMetaList.stream() - .filter( - tableRuntimeMeta -> formatSupported(tableRuntimeMeta.getTableRuntime().getFormat())) + public final void initialize(List tableRuntimes) { + List supportedtableRuntimeList = + tableRuntimes.stream() + .filter(runtime -> formatSupported(runtime.getFormat())) .collect(Collectors.toList()); - initHandler(supportedtableRuntimeMetaList); + initHandler(supportedtableRuntimeList); initialized = true; if (next != null) { - next.initialize(tableRuntimeMetaList); + next.initialize(tableRuntimes); } } @@ -147,7 +146,7 @@ protected abstract void handleConfigChanged( protected abstract void handleTableRemoved(TableRuntime tableRuntime); - protected abstract void initHandler(List tableRuntimeMetaList); + protected abstract void initHandler(List tableRuntimeList); protected abstract void doDispose(); } diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/TableRuntime.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/TableRuntime.java index 475ab28f1d..3499b9306f 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/table/TableRuntime.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/table/TableRuntime.java @@ -33,6 +33,7 @@ import org.apache.amoro.server.optimizing.TaskRuntime; import org.apache.amoro.server.optimizing.plan.OptimizingEvaluator; import org.apache.amoro.server.persistence.StatedPersistentBase; +import org.apache.amoro.server.persistence.TableRuntimeMeta; import org.apache.amoro.server.persistence.mapper.OptimizingMapper; import org.apache.amoro.server.persistence.mapper.TableBlockerMapper; import org.apache.amoro.server.persistence.mapper.TableMetaMapper; @@ -94,7 +95,16 @@ public class TableRuntime extends StatedPersistentBase { private final TableOrphanFilesCleaningMetrics orphanFilesCleaningMetrics; private final TableSummaryMetrics tableSummaryMetrics; - protected TableRuntime( + private long targetSnapshotId; + + private long targetChangeSnapshotId; + + private Map fromSequence; + private Map toSequence; + + private OptimizingType optimizingType; + + public TableRuntime( ServerTableIdentifier tableIdentifier, TableRuntimeHandler tableHandler, Map properties) { @@ -110,9 +120,10 @@ protected TableRuntime( tableSummaryMetrics = new TableSummaryMetrics(tableIdentifier); } - protected TableRuntime(TableRuntimeMeta tableRuntimeMeta, TableRuntimeHandler tableHandler) { + public TableRuntime(TableRuntimeMeta tableRuntimeMeta, TableRuntimeHandler tableHandler) { Preconditions.checkNotNull(tableRuntimeMeta, "TableRuntimeMeta must not be null."); Preconditions.checkNotNull(tableHandler, "TableRuntimeHandler must not be null."); + this.tableHandler = tableHandler; this.tableIdentifier = ServerTableIdentifier.of( @@ -146,6 +157,12 @@ protected TableRuntime(TableRuntimeMeta tableRuntimeMeta, TableRuntimeHandler ta orphanFilesCleaningMetrics = new TableOrphanFilesCleaningMetrics(tableIdentifier); tableSummaryMetrics = new TableSummaryMetrics(tableIdentifier); tableSummaryMetrics.refresh(tableSummary); + + this.targetSnapshotId = tableRuntimeMeta.getTargetSnapshotId(); + this.targetChangeSnapshotId = tableRuntimeMeta.getTargetChangeSnapshotId(); + this.fromSequence = tableRuntimeMeta.getFromSequence(); + this.toSequence = tableRuntimeMeta.getToSequence(); + this.optimizingType = tableRuntimeMeta.getOptimizingType(); } public void recover(OptimizingProcess optimizingProcess) { @@ -531,6 +548,46 @@ public void setLastPlanTime(long lastPlanTime) { this.lastPlanTime = lastPlanTime; } + public long getTargetSnapshotId() { + return targetSnapshotId; + } + + public void setTargetSnapshotId(long targetSnapshotId) { + this.targetSnapshotId = targetSnapshotId; + } + + public long getTargetChangeSnapshotId() { + return targetChangeSnapshotId; + } + + public void setTargetChangeSnapshotId(long targetChangeSnapshotId) { + this.targetChangeSnapshotId = targetChangeSnapshotId; + } + + public Map getFromSequence() { + return fromSequence; + } + + public void setFromSequence(Map fromSequence) { + this.fromSequence = fromSequence; + } + + public Map getToSequence() { + return toSequence; + } + + public void setToSequence(Map toSequence) { + this.toSequence = toSequence; + } + + public long getProcessId() { + return processId; + } + + public OptimizingType getOptimizingType() { + return optimizingType; + } + @Override public String toString() { return MoreObjects.toStringHelper(this) @@ -544,6 +601,10 @@ public String toString() { .add("lastFullOptimizingTime", lastFullOptimizingTime) .add("lastMinorOptimizingTime", lastMinorOptimizingTime) .add("tableConfiguration", tableConfiguration) + .add("targetSnapshotId", targetSnapshotId) + .add("targetChangeSnapshotId", targetChangeSnapshotId) + .add("fromSequence", fromSequence) + .add("toSequence", toSequence) .toString(); } diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/TableService.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/TableService.java index 1cb564ac18..5cbd257018 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/table/TableService.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/table/TableService.java @@ -23,6 +23,9 @@ import org.apache.amoro.api.Blocker; import org.apache.amoro.api.TableIdentifier; import org.apache.amoro.server.catalog.CatalogService; +import org.apache.amoro.server.persistence.TableRuntimeMeta; + +import javax.annotation.Nullable; import java.util.List; import java.util.Map; @@ -88,4 +91,22 @@ Blocker block( * @return block list */ List getBlockers(TableIdentifier tableIdentifier); + + /** + * Get the table info from database for given parameters. + * + * @param optimizerGroup The optimizer group of the table associated to. will be if we want the + * info for all groups. + * @param fuzzyDbName the fuzzy db name used to filter the result, will be null if no filter set. + * @param fuzzyTableName the fuzzy table name used to filter the result, will be null if no filter + * set. + * @param limit How many entries we want to retrieve. + * @param offset The entries we'll skip when retrieving the entries. + */ + List getTableRuntimes( + String optimizerGroup, + @Nullable String fuzzyDbName, + @Nullable String fuzzyTableName, + int limit, + int offset); } diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/executor/BaseTableExecutor.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/executor/BaseTableExecutor.java index 1d9002c357..ae70cadaa5 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/table/executor/BaseTableExecutor.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/table/executor/BaseTableExecutor.java @@ -62,8 +62,8 @@ protected BaseTableExecutor(TableManager tableManager, int poolSize) { } @Override - protected void initHandler(List tableRuntimeMetaList) { - tableRuntimeMetaList.stream() + protected void initHandler(List tableRuntimeList) { + tableRuntimeList.stream() .filter(this::enabled) .forEach( tableRuntime -> { diff --git a/amoro-ams/src/main/resources/derby/ams-derby-init.sql b/amoro-ams/src/main/resources/derby/ams-derby-init.sql index f41cdaf276..ca874b317e 100644 --- a/amoro-ams/src/main/resources/derby/ams-derby-init.sql +++ b/amoro-ams/src/main/resources/derby/ams-derby-init.sql @@ -111,7 +111,7 @@ CREATE TABLE table_runtime ( last_major_optimizing_time TIMESTAMP, last_minor_optimizing_time TIMESTAMP, last_full_optimizing_time TIMESTAMP, - optimizing_status VARCHAR(20) DEFAULT 'IDLE', + optimizing_status_code INT DEFAULT 700, optimizing_status_start_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP, optimizing_process_id BIGINT NOT NULL, optimizer_group VARCHAR(64) NOT NULL, diff --git a/amoro-ams/src/main/resources/mysql/ams-mysql-init.sql b/amoro-ams/src/main/resources/mysql/ams-mysql-init.sql index 76e16147a1..b2e41cf903 100644 --- a/amoro-ams/src/main/resources/mysql/ams-mysql-init.sql +++ b/amoro-ams/src/main/resources/mysql/ams-mysql-init.sql @@ -122,7 +122,8 @@ CREATE TABLE `table_runtime` `last_major_optimizing_time` timestamp NULL DEFAULT NULL COMMENT 'Latest Major Optimize time for all partitions', `last_minor_optimizing_time` timestamp NULL DEFAULT NULL COMMENT 'Latest Minor Optimize time for all partitions', `last_full_optimizing_time` timestamp NULL DEFAULT NULL COMMENT 'Latest Full Optimize time for all partitions', - `optimizing_status` varchar(20) DEFAULT 'IDLE' COMMENT 'Table optimize status: FULL_OPTIMIZING, MAJOR_OPTIMIZING, MINOR_OPTIMIZING, COMMITTING, PENDING, IDLE', + `optimizing_status_code` int DEFAULT 700 COMMENT 'Table optimize status code: 100(FULL_OPTIMIZING),' || + ' 200(MAJOR_OPTIMIZING), 300(MINOR_OPTIMIZING), 400(COMMITTING), 500(PLANING), 600(PENDING), 700(IDLE)', `optimizing_status_start_time` timestamp default CURRENT_TIMESTAMP COMMENT 'Table optimize status start time', `optimizing_process_id` bigint(20) NOT NULL COMMENT 'optimizing_procedure UUID', `optimizer_group` varchar(64) NOT NULL, @@ -132,6 +133,7 @@ CREATE TABLE `table_runtime` `table_summary` mediumtext, PRIMARY KEY (`table_id`), UNIQUE KEY `table_index` (`catalog_name`,`db_name`,`table_name`) + INDEX idx_optimizer_status_and_time (optimizing_status_code, optimizing_status_start_time DESC); ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT 'Optimize running information of each table' ROW_FORMAT=DYNAMIC; CREATE TABLE `table_optimizing_process` diff --git a/amoro-ams/src/main/resources/mysql/upgrade.sql b/amoro-ams/src/main/resources/mysql/upgrade.sql index 31ab7604da..d075e0f7ca 100644 --- a/amoro-ams/src/main/resources/mysql/upgrade.sql +++ b/amoro-ams/src/main/resources/mysql/upgrade.sql @@ -23,4 +23,29 @@ ALTER TABLE `table_blocker` ADD COLUMN `prev_blocker_id` bigint(20) NOT NULL DEF ALTER TABLE `table_blocker` ADD UNIQUE KEY `uq_prev` (`catalog_name`,`db_name`,`table_name`, `prev_blocker_id`); -- ADD COLUMN table_summary FOR TABLE_RUNTIME -ALTER TABLE `table_runtime` ADD COLUMN `table_summary` mediumtext AFTER `pending_input`; \ No newline at end of file +ALTER TABLE `table_runtime` ADD COLUMN `table_summary` mediumtext AFTER `pending_input`; + +RENAME TABLE table_runtime TO table_runtime_backup; +CREATE TABLE table_runtime LIKE table_runtime_backup; + +ALTER TABLE table_runtime CHANGE COLUMN optimizing_status optimizing_status_code INT DEFAULT 700; +CREATE INDEX idx_optimizer_status_and_time ON table_runtime(optimizing_status_code, optimizing_status_start_time DESC); + +INSERT INTO table_runtime( + `table_id`,`catalog_name`, `db_name`, `table_name`, `current_snapshot_id`,`current_change_snapshotId`, `last_optimized_snapshotId`, + `last_optimized_change_snapshotId`, `last_major_optimizing_time`, `last_minor_optimizing_time`, `last_full_optimizing_time`, + `optimizing_status_code`, `optimizing_status_start_time`, `optimizing_process_id`, `optimizer_group`, `table_config`, + `optimizing_config`, `pending_input`, `table_summary`) +SELECT `table_id`,`catalog_name`, `db_name`, `table_name`, `current_snapshot_id`,`current_change_snapshotId`, `last_optimized_snapshotId`, + `last_optimized_change_snapshotId`, `last_major_optimizing_time`, `last_minor_optimizing_time`, `last_full_optimizing_time`, + CASE + WHEN `optimizing_status` = 'IDLE' THEN 700 + WHEN `optimizing_status` = 'PENDING' THEN 600 + WHEN `optimizing_status` = 'PLANNING' THEN 500 + WHEN `optimizing_status` = 'COMMITTING' THEN 400 + WHEN `optimizing_status` = 'MINOR_OPTIMIZING' THEN 300 + WHEN `optimizing_status` = 'MAJOR_OPTIMIZING' THEN 200 + WHEN `optimizing_status` = 'FULL_OPTIMIZING' THEN 100 + END, + `optimizing_status_start_time`, `optimizing_process_id`, `optimizer_group`, `table_config`, `optimizing_config`, `pending_input`, `table_summary` +FROM table_runtime_backup; \ No newline at end of file diff --git a/amoro-ams/src/main/resources/postgres/ams-postgres-init.sql b/amoro-ams/src/main/resources/postgres/ams-postgres-init.sql index 83c3f6d0fe..a7d0ccf6cf 100644 --- a/amoro-ams/src/main/resources/postgres/ams-postgres-init.sql +++ b/amoro-ams/src/main/resources/postgres/ams-postgres-init.sql @@ -182,7 +182,7 @@ CREATE TABLE table_runtime last_major_optimizing_time TIMESTAMP, last_minor_optimizing_time TIMESTAMP, last_full_optimizing_time TIMESTAMP, - optimizing_status VARCHAR(20) DEFAULT 'IDLE', + optimizing_status_code INT DEFAULT 700, optimizing_status_start_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP, optimizing_process_id BIGINT NOT NULL, optimizer_group VARCHAR(64) NOT NULL, @@ -205,7 +205,8 @@ COMMENT ON COLUMN table_runtime.last_optimized_change_snapshotId IS 'Last optimi COMMENT ON COLUMN table_runtime.last_major_optimizing_time IS 'Latest Major Optimize time for all partitions'; COMMENT ON COLUMN table_runtime.last_minor_optimizing_time IS 'Latest Minor Optimize time for all partitions'; COMMENT ON COLUMN table_runtime.last_full_optimizing_time IS 'Latest Full Optimize time for all partitions'; -COMMENT ON COLUMN table_runtime.optimizing_status IS 'Table optimize status: FULL_OPTIMIZING, MAJOR_OPTIMIZING, MINOR_OPTIMIZING, COMMITTING, PENDING, IDLE'; +COMMENT ON COLUMN table_runtime.optimizing_status_code IS 'Table optimize status code: 100(FULL_OPTIMIZING),' || + ' 200(MAJOR_OPTIMIZING), 300(MINOR_OPTIMIZING), 400(COMMITTING), 500(PLANING), 600(PENDING), 700(IDLE)'; COMMENT ON COLUMN table_runtime.optimizing_status_start_time IS 'Table optimize status start time'; COMMENT ON COLUMN table_runtime.optimizing_process_id IS 'Optimizing procedure UUID'; COMMENT ON COLUMN table_runtime.optimizer_group IS 'Optimizer group'; @@ -213,6 +214,7 @@ COMMENT ON COLUMN table_runtime.table_config IS 'Table-specific configuration'; COMMENT ON COLUMN table_runtime.optimizing_config IS 'Optimizing configuration'; COMMENT ON COLUMN table_runtime.pending_input IS 'Pending input data'; COMMENT ON COLUMN table_runtime.table_summary IS 'Table summary data'; +CREATE INDEX idx_optimizer_status_and_time ON table_runtime(optimizing_status_code, optimizing_status_start_time DESC); CREATE TABLE table_optimizing_process ( diff --git a/amoro-ams/src/main/resources/postgres/upgrade-0.7.0-to-0.7.1.sql b/amoro-ams/src/main/resources/postgres/upgrade-0.7.0-to-0.7.1.sql new file mode 100644 index 0000000000..96ef432160 --- /dev/null +++ b/amoro-ams/src/main/resources/postgres/upgrade-0.7.0-to-0.7.1.sql @@ -0,0 +1,39 @@ +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. + +RENAME TABLE table_runtime TO table_runtime_backup; +CREATE TABLE table_runtime LIKE table_runtime_backup; + +ALTER TABLE table_runtime CHANGE COLUMN optimizing_status optimizing_status_code INT DEFAULT 7; +CREATE INDEX idx_optimizer_status_and_time ON table_runtime(optimizing_status_code, optimizing_status_start_time DESC); + +INSERT INTO table_runtime( + `table_id`,`catalog_name`, `db_name`, `table_name`, `current_snapshot_id`,`current_change_snapshotId`, `last_optimized_snapshotId`, + `last_optimized_change_snapshotId`, `last_major_optimizing_time`, `last_minor_optimizing_time`, `last_full_optimizing_time`, + `optimizing_status_code`, `optimizing_status_start_time`, `optimizing_process_id`, `optimizer_group`, `table_config`, + `optimizing_config`, `pending_input`) +SELECT `table_id`,`catalog_name`, `db_name`, `table_name`, `current_snapshot_id`,`current_change_snapshotId`, `last_optimized_snapshotId`, + `last_optimized_change_snapshotId`, `last_major_optimizing_time`, `last_minor_optimizing_time`, `last_full_optimizing_time`, + CASE + WHEN `optimizing_status` = 'IDLE' THEN 700 + WHEN `optimizing_status` = 'PENDING' THEN 600 + WHEN `optimizing_status` = 'PLANNING' THEN 500 + WHEN `optimizing_status` = 'COMMITTING' THEN 400 + WHEN `optimizing_status` = 'MINOR_OPTIMIZING' THEN 300 + WHEN `optimizing_status` = 'MAJOR_OPTIMIZING' THEN 200 + WHEN `optimizing_status` = 'FULL_OPTIMIZING' THEN 100 + END, + `optimizing_status_start_time`, `optimizing_process_id`, `optimizer_group`, `table_config`, `optimizing_config`, `pending_input` +FROM table_runtime_backup; \ No newline at end of file diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/dashboard/TestOverviewCache.java b/amoro-ams/src/test/java/org/apache/amoro/server/dashboard/TestOverviewCache.java index 96a8d789c1..1b59308b9d 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/dashboard/TestOverviewCache.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/dashboard/TestOverviewCache.java @@ -92,7 +92,7 @@ private void initTableWithFiles() { .asUnkeyedTable(); appendData(table, 1); appendData(table, 2); - TableRuntime runtime = tableService().getRuntime(serverTableIdentifier()); + TableRuntime runtime = tableService().getRuntime(serverTableIdentifier().getId()); runtime.refresh(tableService().loadTable(serverTableIdentifier())); } @@ -110,7 +110,7 @@ private void appendData(UnkeyedTable table, int id) { void refreshPending() { TableRuntimeRefreshExecutor refresher = new TableRuntimeRefreshExecutor(tableService(), 1, Integer.MAX_VALUE); - refresher.execute(tableService().getRuntime(serverTableIdentifier())); + refresher.execute(tableService().getRuntime(serverTableIdentifier().getId())); refresher.dispose(); } diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/OptimizingStatusTest.java b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/OptimizingStatusTest.java new file mode 100644 index 0000000000..cadce03273 --- /dev/null +++ b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/OptimizingStatusTest.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.server.optimizing; + +import static org.junit.Assert.assertEquals; + +import org.junit.Test; + +public class OptimizingStatusTest { + @Test + public void testOptimizingStatusCodeValue() { + assertEquals(7, OptimizingStatus.values().length); + + assertEquals(OptimizingStatus.FULL_OPTIMIZING, OptimizingStatus.ofCode(100)); + assertEquals(OptimizingStatus.MAJOR_OPTIMIZING, OptimizingStatus.ofCode(200)); + assertEquals(OptimizingStatus.MINOR_OPTIMIZING, OptimizingStatus.ofCode(300)); + assertEquals(OptimizingStatus.COMMITTING, OptimizingStatus.ofCode(400)); + assertEquals(OptimizingStatus.PLANNING, OptimizingStatus.ofCode(500)); + assertEquals(OptimizingStatus.PENDING, OptimizingStatus.ofCode(600)); + assertEquals(OptimizingStatus.IDLE, OptimizingStatus.ofCode(700)); + } +} diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/TestOptimizingQueue.java b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/TestOptimizingQueue.java index 0032af9bc1..24ce7c0366 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/TestOptimizingQueue.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/TestOptimizingQueue.java @@ -46,13 +46,13 @@ import org.apache.amoro.resource.ResourceGroup; import org.apache.amoro.server.manager.MetricManager; import org.apache.amoro.server.metrics.MetricRegistry; +import org.apache.amoro.server.persistence.TableRuntimeMeta; import org.apache.amoro.server.resource.OptimizerInstance; import org.apache.amoro.server.resource.OptimizerThread; import org.apache.amoro.server.resource.QuotaProvider; import org.apache.amoro.server.table.AMSTableTestBase; import org.apache.amoro.server.table.TableConfigurations; import org.apache.amoro.server.table.TableRuntime; -import org.apache.amoro.server.table.TableRuntimeMeta; import org.apache.amoro.shade.guava32.com.google.common.collect.ImmutableMap; import org.apache.amoro.shade.guava32.com.google.common.collect.Lists; import org.apache.amoro.table.MixedTable; @@ -105,13 +105,13 @@ protected static ResourceGroup testResourceGroup() { return new ResourceGroup.Builder("test", "local").build(); } - protected OptimizingQueue buildOptimizingGroupService(TableRuntimeMeta tableRuntimeMeta) { + protected OptimizingQueue buildOptimizingGroupService(TableRuntime tableRuntime) { return new OptimizingQueue( tableService(), testResourceGroup(), quotaProvider, planExecutor, - Collections.singletonList(tableRuntimeMeta), + Collections.singletonList(tableRuntime), 1); } @@ -127,7 +127,7 @@ private OptimizingQueue buildOptimizingGroupService() { @Test public void testPollNoTask() { - TableRuntimeMeta tableRuntimeMeta = + TableRuntime tableRuntimeMeta = buildTableRuntimeMeta(OptimizingStatus.PENDING, defaultResourceGroup()); OptimizingQueue queue = buildOptimizingGroupService(tableRuntimeMeta); Assert.assertNull(queue.pollTask(0)); @@ -138,25 +138,25 @@ public void testPollNoTask() { public void testRefreshAndReleaseTable() { OptimizingQueue queue = buildOptimizingGroupService(); Assert.assertEquals(0, queue.getSchedulingPolicy().getTableRuntimeMap().size()); - TableRuntimeMeta tableRuntimeMeta = + TableRuntime tableRuntime = buildTableRuntimeMeta(OptimizingStatus.IDLE, defaultResourceGroup()); - queue.refreshTable(tableRuntimeMeta.getTableRuntime()); + queue.refreshTable(tableRuntime); Assert.assertEquals(1, queue.getSchedulingPolicy().getTableRuntimeMap().size()); Assert.assertTrue( queue.getSchedulingPolicy().getTableRuntimeMap().containsKey(serverTableIdentifier())); - queue.releaseTable(tableRuntimeMeta.getTableRuntime()); + queue.releaseTable(tableRuntime); Assert.assertEquals(0, queue.getSchedulingPolicy().getTableRuntimeMap().size()); - queue.refreshTable(tableRuntimeMeta.getTableRuntime()); + queue.refreshTable(tableRuntime); Assert.assertEquals(1, queue.getSchedulingPolicy().getTableRuntimeMap().size()); queue.dispose(); } @Test public void testPollTask() { - TableRuntimeMeta tableRuntimeMeta = initTableWithFiles(); - OptimizingQueue queue = buildOptimizingGroupService(tableRuntimeMeta); + TableRuntime tableRuntime = initTableWithFiles(); + OptimizingQueue queue = buildOptimizingGroupService(tableRuntime); // 1.poll task TaskRuntime task = queue.pollTask(MAX_POLLING_TIME); @@ -169,7 +169,7 @@ public void testPollTask() { @Test public void testRetryTask() { - TableRuntimeMeta tableRuntimeMeta = initTableWithFiles(); + TableRuntime tableRuntimeMeta = initTableWithFiles(); OptimizingQueue queue = buildOptimizingGroupService(tableRuntimeMeta); // 1.poll task @@ -202,8 +202,8 @@ public void testRetryTask() { @Test public void testCommitTask() { - TableRuntimeMeta tableRuntimeMeta = initTableWithFiles(); - OptimizingQueue queue = buildOptimizingGroupService(tableRuntimeMeta); + TableRuntime tableRuntime = initTableWithFiles(); + OptimizingQueue queue = buildOptimizingGroupService(tableRuntime); Assert.assertEquals(0, queue.collectTasks().size()); TaskRuntime task = queue.pollTask(MAX_POLLING_TIME); @@ -218,11 +218,11 @@ public void testCommitTask() { Assert.assertEquals(TaskRuntime.Status.SUCCESS, task.getStatus()); // 7.commit - OptimizingProcess optimizingProcess = tableRuntimeMeta.getTableRuntime().getOptimizingProcess(); + OptimizingProcess optimizingProcess = tableRuntime.getOptimizingProcess(); Assert.assertEquals(OptimizingProcess.Status.RUNNING, optimizingProcess.getStatus()); optimizingProcess.commit(); Assert.assertEquals(OptimizingProcess.Status.SUCCESS, optimizingProcess.getStatus()); - Assert.assertNull(tableRuntimeMeta.getTableRuntime().getOptimizingProcess()); + Assert.assertNull(tableRuntime.getOptimizingProcess()); // 8.commit again, throw exceptions, and status not changed. Assert.assertThrows(IllegalStateException.class, optimizingProcess::commit); @@ -234,8 +234,8 @@ public void testCommitTask() { @Test public void testCollectingTasks() { - TableRuntimeMeta tableRuntimeMeta = initTableWithFiles(); - OptimizingQueue queue = buildOptimizingGroupService(tableRuntimeMeta); + TableRuntime tableRuntime = initTableWithFiles(); + OptimizingQueue queue = buildOptimizingGroupService(tableRuntime); Assert.assertEquals(0, queue.collectTasks().size()); TaskRuntime task = queue.pollTask(MAX_POLLING_TIME); @@ -249,8 +249,8 @@ public void testCollectingTasks() { @Test public void testTaskAndTableMetrics() { - TableRuntimeMeta tableRuntimeMeta = initTableWithFiles(); - OptimizingQueue queue = buildOptimizingGroupService(tableRuntimeMeta); + TableRuntime tableRuntime = initTableWithFiles(); + OptimizingQueue queue = buildOptimizingGroupService(tableRuntime); MetricRegistry registry = MetricManager.getInstance().getGlobalRegistry(); Map tagValues = ImmutableMap.of(GROUP_TAG, testResourceGroup().getName()); @@ -315,7 +315,7 @@ public void testTaskAndTableMetrics() { Assert.assertEquals(0, idleTablesGauge.getValue().longValue()); Assert.assertEquals(1, committingTablesGauge.getValue().longValue()); - OptimizingProcess optimizingProcess = tableRuntimeMeta.getTableRuntime().getOptimizingProcess(); + OptimizingProcess optimizingProcess = tableRuntime.getOptimizingProcess(); optimizingProcess.commit(); Assert.assertEquals(0, queueTasksGauge.getValue().longValue()); Assert.assertEquals(0, executingTasksGauge.getValue().longValue()); @@ -363,21 +363,19 @@ public void testAddAndRemoveOptimizers() { queue.dispose(); } - protected TableRuntimeMeta initTableWithFiles() { + protected TableRuntime initTableWithFiles() { MixedTable mixedTable = (MixedTable) tableService().loadTable(serverTableIdentifier()).originalTable(); appendData(mixedTable.asUnkeyedTable(), 1); appendData(mixedTable.asUnkeyedTable(), 2); - TableRuntimeMeta tableRuntimeMeta = + TableRuntime tableRuntime = buildTableRuntimeMeta(OptimizingStatus.PENDING, defaultResourceGroup()); - TableRuntime runtime = tableRuntimeMeta.getTableRuntime(); - runtime.refresh(tableService().loadTable(serverTableIdentifier())); - return tableRuntimeMeta; + tableRuntime.refresh(tableService().loadTable(serverTableIdentifier())); + return tableRuntime; } - private TableRuntimeMeta buildTableRuntimeMeta( - OptimizingStatus status, ResourceGroup resourceGroup) { + private TableRuntime buildTableRuntimeMeta(OptimizingStatus status, ResourceGroup resourceGroup) { MixedTable mixedTable = (MixedTable) tableService().loadTable(serverTableIdentifier()).originalTable(); TableRuntimeMeta tableRuntimeMeta = new TableRuntimeMeta(); @@ -389,8 +387,7 @@ private TableRuntimeMeta buildTableRuntimeMeta( tableRuntimeMeta.setTableStatus(status); tableRuntimeMeta.setTableConfig(TableConfigurations.parseTableConfig(mixedTable.properties())); tableRuntimeMeta.setOptimizerGroup(resourceGroup.getName()); - tableRuntimeMeta.constructTableRuntime(tableService()); - return tableRuntimeMeta; + return new TableRuntime(tableRuntimeMeta, tableService()); } private void appendData(UnkeyedTable table, int id) { diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/table/DerbyPersistence.java b/amoro-ams/src/test/java/org/apache/amoro/server/table/DerbyPersistence.java index 7c88fbf8b7..96ca0b9aed 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/table/DerbyPersistence.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/table/DerbyPersistence.java @@ -28,8 +28,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.io.UncheckedIOException; import java.sql.Connection; import java.sql.ResultSet; import java.sql.SQLException; @@ -62,8 +60,8 @@ public class DerbyPersistence extends ExternalResource { LOG.info("Deleted resources in derby persistent."); })); truncateAllTables(); - } catch (IOException e) { - throw new UncheckedIOException(e); + } catch (Exception e) { + throw new RuntimeException(e); } } diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/table/TestTableRuntimeHandler.java b/amoro-ams/src/test/java/org/apache/amoro/server/table/TestTableRuntimeHandler.java index 30aea4055d..641ac0e866 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/table/TestTableRuntimeHandler.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/table/TestTableRuntimeHandler.java @@ -132,7 +132,7 @@ protected DefaultTableService tableService() { static class TestHandler extends RuntimeHandlerChain { - private final List initTables = Lists.newArrayList(); + private final List initTables = Lists.newArrayList(); private final List> statusChangedTables = Lists.newArrayList(); private final List> configChangedTables = @@ -163,8 +163,8 @@ protected void handleTableRemoved(TableRuntime tableRuntime) { } @Override - protected void initHandler(List tableRuntimeMetaList) { - initTables.addAll(tableRuntimeMetaList); + protected void initHandler(List tableRuntimeList) { + initTables.addAll(tableRuntimeList); } @Override @@ -172,7 +172,7 @@ protected void doDispose() { disposed = true; } - public List getInitTables() { + public List getInitTables() { return initTables; } diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/table/TestTableRuntimeManager.java b/amoro-ams/src/test/java/org/apache/amoro/server/table/TestTableRuntimeManager.java index 4549c676f9..b6b8ce1e77 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/table/TestTableRuntimeManager.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/table/TestTableRuntimeManager.java @@ -69,27 +69,6 @@ public void testLoadTable() { "unknown", "unknown", "unknown", serverTableIdentifier().getFormat()))); } - @Test - public void testTableContains() { - Assert.assertTrue(tableService().contains(serverTableIdentifier().getId())); - ServerTableIdentifier copyId = - ServerTableIdentifier.of( - null, - serverTableIdentifier().getCatalog(), - serverTableIdentifier().getDatabase(), - serverTableIdentifier().getTableName(), - serverTableIdentifier().getFormat()); - Assert.assertFalse(tableService().contains(copyId.getId())); - copyId = - ServerTableIdentifier.of( - serverTableIdentifier().getId(), - serverTableIdentifier().getCatalog(), - serverTableIdentifier().getDatabase(), - "unknown", - serverTableIdentifier().getFormat()); - Assert.assertFalse(tableService().contains(copyId.getId())); - } - @Test public void testTableRuntime() { TableRuntime tableRuntime = tableService().getRuntime(serverTableIdentifier().getId()); diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/table/TestTableSummaryMetrics.java b/amoro-ams/src/test/java/org/apache/amoro/server/table/TestTableSummaryMetrics.java index 9bc914ef78..d8d3b9d196 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/table/TestTableSummaryMetrics.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/table/TestTableSummaryMetrics.java @@ -104,7 +104,7 @@ private void initTableWithFiles() { .asUnkeyedTable(); appendData(table); appendPosDelete(table); - TableRuntime runtime = tableService().getRuntime(serverTableIdentifier()); + TableRuntime runtime = tableService().getRuntime(serverTableIdentifier().getId()); runtime.refresh(tableService().loadTable(serverTableIdentifier())); } @@ -142,7 +142,7 @@ private void appendPosDelete(UnkeyedTable table) { void refreshPending() { TableRuntimeRefreshExecutor refresher = new TableRuntimeRefreshExecutor(tableService(), 1, Integer.MAX_VALUE); - refresher.execute(tableService().getRuntime(serverTableIdentifier())); + refresher.execute(tableService().getRuntime(serverTableIdentifier().getId())); refresher.dispose(); }