Skip to content

Commit

Permalink
[AMORO-2893] Optimize the efficiency for restapi retrieve optimizing …
Browse files Browse the repository at this point in the history
…tables

Befor the change we'll retrieve all tables from db and sort in the memory,
this will be slow if there are many entriesin the db, after this change
we sort in the db and only retrieve the returned entries.
  • Loading branch information
klion26 committed Sep 20, 2024
1 parent 6f14e10 commit 5cc731f
Show file tree
Hide file tree
Showing 26 changed files with 509 additions and 191 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@
import org.slf4j.LoggerFactory;
import org.yaml.snakeyaml.Yaml;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.file.Files;
import java.nio.file.Paths;
Expand Down Expand Up @@ -215,7 +214,7 @@ public void dispose() {
MetricManager.dispose();
}

private void initConfig() throws IOException {
private void initConfig() throws Exception {
LOG.info("initializing configurations...");
new ConfigurationHelper().init();
}
Expand Down Expand Up @@ -407,14 +406,14 @@ private class ConfigurationHelper {

private JsonNode yamlConfig;

public void init() throws IOException {
public void init() throws Exception {
Map<String, Object> envConfig = initEnvConfig();
initServiceConfig(envConfig);
setIcebergSystemProperties();
initContainerConfig();
}

private void initServiceConfig(Map<String, Object> envConfig) throws IOException {
private void initServiceConfig(Map<String, Object> envConfig) throws Exception {
LOG.info("initializing service configuration...");
String configPath = Environments.getConfigPath() + "/" + SERVER_CONFIG_FILENAME;
LOG.info("load config from path: {}", configPath);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@
import org.apache.amoro.server.table.DefaultTableService;
import org.apache.amoro.server.table.RuntimeHandlerChain;
import org.apache.amoro.server.table.TableRuntime;
import org.apache.amoro.server.table.TableRuntimeMeta;
import org.apache.amoro.server.table.TableService;
import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
import org.apache.amoro.shade.guava32.com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -121,24 +120,24 @@ public RuntimeHandlerChain getTableRuntimeHandler() {
return tableHandlerChain;
}

private void loadOptimizingQueues(List<TableRuntimeMeta> tableRuntimeMetaList) {
private void loadOptimizingQueues(List<TableRuntime> tableRuntimeMetaList) {
List<ResourceGroup> optimizerGroups =
getAs(ResourceMapper.class, ResourceMapper::selectResourceGroups);
List<OptimizerInstance> optimizers = getAs(OptimizerMapper.class, OptimizerMapper::selectAll);
Map<String, List<TableRuntimeMeta>> groupToTableRuntimes =
Map<String, List<TableRuntime>> groupToTableRuntimes =
tableRuntimeMetaList.stream()
.collect(Collectors.groupingBy(TableRuntimeMeta::getOptimizerGroup));
.collect(Collectors.groupingBy(TableRuntime::getOptimizerGroup));
optimizerGroups.forEach(
group -> {
String groupName = group.getName();
List<TableRuntimeMeta> tableRuntimeMetas = groupToTableRuntimes.remove(groupName);
List<TableRuntime> tableRuntimes = groupToTableRuntimes.remove(groupName);
OptimizingQueue optimizingQueue =
new OptimizingQueue(
tableService,
group,
this,
planExecutor,
Optional.ofNullable(tableRuntimeMetas).orElseGet(ArrayList::new),
Optional.ofNullable(tableRuntimes).orElseGet(ArrayList::new),
maxPlanningParallelism);
optimizingQueueByGroup.put(groupName, optimizingQueue);
});
Expand Down Expand Up @@ -456,9 +455,9 @@ public void handleTableRemoved(TableRuntime tableRuntime) {
}

@Override
protected void initHandler(List<TableRuntimeMeta> tableRuntimeMetaList) {
protected void initHandler(List<TableRuntime> tableRuntimeList) {
LOG.info("OptimizerManagementService begin initializing");
loadOptimizingQueues(tableRuntimeMetaList);
loadOptimizingQueues(tableRuntimeList);
optimizerKeeper.start();
LOG.info("SuspendingDetector for Optimizer has been started.");
LOG.info("OptimizerManagementService initializing has completed");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.amoro.server.dashboard.controller;

import io.javalin.http.Context;
import org.apache.amoro.ServerTableIdentifier;
import org.apache.amoro.resource.Resource;
import org.apache.amoro.resource.ResourceGroup;
import org.apache.amoro.resource.ResourceType;
Expand All @@ -30,13 +29,13 @@
import org.apache.amoro.server.dashboard.response.OkResponse;
import org.apache.amoro.server.dashboard.response.PageResult;
import org.apache.amoro.server.dashboard.utils.OptimizingUtil;
import org.apache.amoro.server.persistence.TableRuntimeMeta;
import org.apache.amoro.server.resource.ContainerMetadata;
import org.apache.amoro.server.resource.OptimizerInstance;
import org.apache.amoro.server.resource.ResourceContainers;
import org.apache.amoro.server.table.TableRuntime;
import org.apache.amoro.server.table.TableService;
import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
import org.apache.commons.lang3.StringUtils;

import javax.ws.rs.BadRequestException;

Expand Down Expand Up @@ -67,34 +66,17 @@ public void getOptimizerTables(Context ctx) {
Integer pageSize = ctx.queryParamAsClass("pageSize", Integer.class).getOrDefault(20);
int offset = (page - 1) * pageSize;

List<TableRuntime> tableRuntimes = new ArrayList<>();
List<ServerTableIdentifier> tables = tableService.listManagedTables();
for (ServerTableIdentifier identifier : tables) {
TableRuntime tableRuntime = tableService.getRuntime(identifier);
if (tableRuntime == null) {
continue;
}
if ((ALL_GROUP.equals(optimizerGroup)
|| tableRuntime.getOptimizerGroup().equals(optimizerGroup))
&& (StringUtils.isEmpty(dbFilterStr)
|| StringUtils.containsIgnoreCase(identifier.getDatabase(), dbFilterStr))
&& (StringUtils.isEmpty(tableFilterStr)
|| StringUtils.containsIgnoreCase(identifier.getTableName(), tableFilterStr))) {
tableRuntimes.add(tableRuntime);
}
}
tableRuntimes.sort(
(o1, o2) -> {
// first we compare the status , and then we compare the start time when status are equal;
int statDiff = o1.getOptimizingStatus().compareTo(o2.getOptimizingStatus());
// status order is asc, startTime order is desc
if (statDiff == 0) {
long timeDiff = o1.getCurrentStatusStartTime() - o2.getCurrentStatusStartTime();
return timeDiff >= 0 ? (timeDiff == 0 ? 0 : -1) : 1;
} else {
return statDiff;
}
});
String optimizerGroupUsedInDbFilter = ALL_GROUP.equals(optimizerGroup) ? null : optimizerGroup;
// get all info from underlying table table_runtime
List<TableRuntimeMeta> tableRuntimeBeans =
tableService.getTableRuntimes(
optimizerGroupUsedInDbFilter, dbFilterStr, tableFilterStr, pageSize, offset);

List<TableRuntime> tableRuntimes =
tableRuntimeBeans.stream()
.map(meta -> tableService.getRuntime(meta.getTableId()))
.collect(Collectors.toList());

PageResult<TableOptimizingInfo> amsPageResult =
PageResult.of(tableRuntimes, offset, pageSize, OptimizingUtil::buildTableOptimizeInfo);
ctx.json(OkResponse.of(amsPageResult));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import org.apache.amoro.server.resource.QuotaProvider;
import org.apache.amoro.server.table.TableManager;
import org.apache.amoro.server.table.TableRuntime;
import org.apache.amoro.server.table.TableRuntimeMeta;
import org.apache.amoro.shade.guava32.com.google.common.annotations.VisibleForTesting;
import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
import org.apache.amoro.shade.guava32.com.google.common.collect.Lists;
Expand Down Expand Up @@ -93,7 +92,7 @@ public OptimizingQueue(
ResourceGroup optimizerGroup,
QuotaProvider quotaProvider,
Executor planExecutor,
List<TableRuntimeMeta> tableRuntimeMetaList,
List<TableRuntime> tableRuntimeList,
int maxPlanningParallelism) {
Preconditions.checkNotNull(optimizerGroup, "Optimizer group can not be null");
this.planExecutor = planExecutor;
Expand All @@ -106,14 +105,12 @@ public OptimizingQueue(
new OptimizerGroupMetrics(
optimizerGroup.getName(), MetricManager.getInstance().getGlobalRegistry(), this);
this.metrics.register();
tableRuntimeMetaList.forEach(this::initTableRuntime);
tableRuntimeList.forEach(this::initTableRuntime);
}

private void initTableRuntime(TableRuntimeMeta tableRuntimeMeta) {
TableRuntime tableRuntime = tableRuntimeMeta.getTableRuntime();
if (tableRuntime.getOptimizingStatus().isProcessing()
&& tableRuntimeMeta.getOptimizingProcessId() != 0) {
tableRuntime.recover(new TableOptimizingProcess(tableRuntimeMeta));
private void initTableRuntime(TableRuntime tableRuntime) {
if (tableRuntime.getOptimizingStatus().isProcessing() && tableRuntime.getProcessId() != 0) {
tableRuntime.recover(new TableOptimizingProcess(tableRuntime));
}

if (tableRuntime.isOptimizingEnabled()) {
Expand All @@ -122,7 +119,7 @@ private void initTableRuntime(TableRuntimeMeta tableRuntimeMeta) {
if (!tableRuntime.getOptimizingStatus().isProcessing()) {
scheduler.addTable(tableRuntime);
} else if (tableRuntime.getOptimizingStatus() != OptimizingStatus.COMMITTING) {
tableQueue.offer(new TableOptimizingProcess(tableRuntimeMeta));
tableQueue.offer(new TableOptimizingProcess(tableRuntime));
}
} else {
OptimizingProcess process = tableRuntime.getOptimizingProcess();
Expand Down Expand Up @@ -379,21 +376,21 @@ public TableOptimizingProcess(OptimizingPlanner planner) {
beginAndPersistProcess();
}

public TableOptimizingProcess(TableRuntimeMeta tableRuntimeMeta) {
processId = tableRuntimeMeta.getOptimizingProcessId();
tableRuntime = tableRuntimeMeta.getTableRuntime();
optimizingType = tableRuntimeMeta.getOptimizingType();
targetSnapshotId = tableRuntimeMeta.getTargetSnapshotId();
targetChangeSnapshotId = tableRuntimeMeta.getTargetChangeSnapshotId();
planTime = tableRuntimeMeta.getPlanTime();
if (tableRuntimeMeta.getFromSequence() != null) {
fromSequence = tableRuntimeMeta.getFromSequence();
public TableOptimizingProcess(TableRuntime tableRuntime) {
processId = tableRuntime.getProcessId();
this.tableRuntime = tableRuntime;
optimizingType = tableRuntime.getOptimizingType();
targetSnapshotId = tableRuntime.getTargetSnapshotId();
targetChangeSnapshotId = tableRuntime.getTargetChangeSnapshotId();
planTime = tableRuntime.getLastPlanTime();
if (tableRuntime.getFromSequence() != null) {
fromSequence = tableRuntime.getFromSequence();
}
if (tableRuntimeMeta.getToSequence() != null) {
toSequence = tableRuntimeMeta.getToSequence();
if (tableRuntime.getToSequence() != null) {
toSequence = tableRuntime.getToSequence();
}
if (this.status != OptimizingProcess.Status.CLOSED) {
tableRuntimeMeta.getTableRuntime().recover(this);
tableRuntime.recover(this);
}
loadTaskRuntimes(this);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,23 @@
package org.apache.amoro.server.optimizing;

public enum OptimizingStatus {
FULL_OPTIMIZING("full", true),
MAJOR_OPTIMIZING("major", true),
MINOR_OPTIMIZING("minor", true),
COMMITTING("committing", true),
PLANNING("planning", false),
PENDING("pending", false),
IDLE("idle", false);
FULL_OPTIMIZING("full", true, 100),
MAJOR_OPTIMIZING("major", true, 200),
MINOR_OPTIMIZING("minor", true, 300),
COMMITTING("committing", true, 400),
PLANNING("planning", false, 500),
PENDING("pending", false, 600),
IDLE("idle", false, 700);
private final String displayValue;

private final boolean isProcessing;

OptimizingStatus(String displayValue, boolean isProcessing) {
private final int code;

OptimizingStatus(String displayValue, boolean isProcessing, int code) {
this.displayValue = displayValue;
this.isProcessing = isProcessing;
this.code = code;
}

public boolean isProcessing() {
Expand All @@ -42,4 +45,17 @@ public boolean isProcessing() {
public String displayValue() {
return displayValue;
}

public int getCode() {
return code;
}

public static OptimizingStatus ofCode(int code) {
for (OptimizingStatus status : values()) {
if (status.getCode() == code) {
return status;
}
}
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.pool2.impl.BaseObjectPoolConfig;
import org.apache.ibatis.jdbc.ScriptRunner;
import org.apache.ibatis.mapping.DatabaseIdProvider;
import org.apache.ibatis.mapping.Environment;
import org.apache.ibatis.mapping.VendorDatabaseIdProvider;
import org.apache.ibatis.session.Configuration;
import org.apache.ibatis.session.SqlSession;
import org.apache.ibatis.session.SqlSessionFactory;
Expand All @@ -53,8 +55,10 @@
import java.nio.file.Paths;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.Duration;
import java.util.Properties;

public class SqlSessionFactoryProvider {
private static final Logger LOG = LoggerFactory.getLogger(SqlSessionFactoryProvider.class);
Expand All @@ -73,7 +77,7 @@ public static SqlSessionFactoryProvider getInstance() {

private volatile SqlSessionFactory sqlSessionFactory;

public void init(Configurations config) {
public void init(Configurations config) throws SQLException {
BasicDataSource dataSource = new BasicDataSource();
dataSource.setUrl(config.getString(AmoroManagementConf.DB_CONNECTION_URL));
dataSource.setDriverClassName(config.getString(AmoroManagementConf.DB_DRIVER_CLASS_NAME));
Expand Down Expand Up @@ -111,6 +115,14 @@ public void init(Configurations config) {
configuration.addMapper(PlatformFileMapper.class);
configuration.addMapper(ResourceMapper.class);
configuration.addMapper(TableBlockerMapper.class);

DatabaseIdProvider provider = new VendorDatabaseIdProvider();
Properties properties = new Properties();
properties.setProperty("MySQL", "mysql");
properties.setProperty("PostgreSQL", "postgres");
properties.setProperty("Derby", "derby");
provider.setProperties(properties);
configuration.setDatabaseId(provider.getDatabaseId(dataSource));
if (sqlSessionFactory == null) {
synchronized (this) {
if (sqlSessionFactory == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* limitations under the License.
*/

package org.apache.amoro.server.table;
package org.apache.amoro.server.persistence;

import org.apache.amoro.TableFormat;
import org.apache.amoro.config.TableConfiguration;
Expand All @@ -27,6 +27,7 @@

import java.util.Map;

/** The class for table used when transfer data from/to database. */
public class TableRuntimeMeta {
private long tableId;
private String catalogName;
Expand Down Expand Up @@ -58,24 +59,8 @@ public class TableRuntimeMeta {
private Map<String, Long> fromSequence;
private Map<String, Long> toSequence;

private TableRuntime tableRuntime;

public TableRuntimeMeta() {}

public TableRuntime constructTableRuntime(TableManager initializer) {
if (tableRuntime == null) {
tableRuntime = new TableRuntime(this, initializer);
}
return tableRuntime;
}

public TableRuntime getTableRuntime() {
if (tableRuntime == null) {
throw new IllegalStateException("TableRuntime is not constructed yet.");
}
return tableRuntime;
}

public long getTargetSnapshotId() {
return targetSnapshotId;
}
Expand Down Expand Up @@ -212,10 +197,6 @@ public void setTableSummary(OptimizingEvaluator.PendingInput tableSummary) {
this.tableSummary = tableSummary;
}

public void setTableRuntime(TableRuntime tableRuntime) {
this.tableRuntime = tableRuntime;
}

public void setLastOptimizedChangeSnapshotId(long lastOptimizedChangeSnapshotId) {
this.lastOptimizedChangeSnapshotId = lastOptimizedChangeSnapshotId;
}
Expand Down
Loading

0 comments on commit 5cc731f

Please sign in to comment.