Skip to content

Commit

Permalink
[AMORO-2893] Optimizing the efficiency for restapi retrieve optimizin…
Browse files Browse the repository at this point in the history
…g tables

Befor the change we'll retrieve all tables from db and sort in the memory,
this will be slow if there are many entriesin the db, after this change
we sort in the db and only retrieve the returned entries.
  • Loading branch information
klion26 committed Jul 30, 2024
1 parent f5d2e85 commit 779fc06
Show file tree
Hide file tree
Showing 24 changed files with 448 additions and 170 deletions.
11 changes: 11 additions & 0 deletions amoro-ams/amoro-ams-dashboard/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -140,5 +140,16 @@
</pluginManagement>
</build>
</profile>
<profile>
<id>use-alibaba-mirror</id>
<activation>
<property>
<name>use-alibaba-mirror</name>
</property>
</activation>
<properties>
<npm.proxy>--registry http://172.17.0.1:8888/repository/npm/</npm.proxy>
</properties>
</profile>
</profiles>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@
import org.apache.amoro.server.table.DefaultTableService;
import org.apache.amoro.server.table.RuntimeHandlerChain;
import org.apache.amoro.server.table.TableRuntime;
import org.apache.amoro.server.table.TableRuntimeMeta;
import org.apache.amoro.server.table.TableService;
import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
import org.apache.amoro.shade.guava32.com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -121,17 +120,17 @@ public RuntimeHandlerChain getTableRuntimeHandler() {
return tableHandlerChain;
}

private void loadOptimizingQueues(List<TableRuntimeMeta> tableRuntimeMetaList) {
private void loadOptimizingQueues(List<TableRuntime> tableRuntimeMetaList) {
List<ResourceGroup> optimizerGroups =
getAs(ResourceMapper.class, ResourceMapper::selectResourceGroups);
List<OptimizerInstance> optimizers = getAs(OptimizerMapper.class, OptimizerMapper::selectAll);
Map<String, List<TableRuntimeMeta>> groupToTableRuntimes =
Map<String, List<TableRuntime>> groupToTableRuntimes =
tableRuntimeMetaList.stream()
.collect(Collectors.groupingBy(TableRuntimeMeta::getOptimizerGroup));
.collect(Collectors.groupingBy(TableRuntime::getOptimizerGroup));
optimizerGroups.forEach(
group -> {
String groupName = group.getName();
List<TableRuntimeMeta> tableRuntimeMetas = groupToTableRuntimes.remove(groupName);
List<TableRuntime> tableRuntimeMetas = groupToTableRuntimes.remove(groupName);
OptimizingQueue optimizingQueue =
new OptimizingQueue(
tableService,
Expand Down Expand Up @@ -455,7 +454,7 @@ public void handleTableRemoved(TableRuntime tableRuntime) {
}

@Override
protected void initHandler(List<TableRuntimeMeta> tableRuntimeMetaList) {
protected void initHandler(List<TableRuntime> tableRuntimeMetaList) {
LOG.info("OptimizerManagementService begin initializing");
loadOptimizingQueues(tableRuntimeMetaList);
optimizerKeeper.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.amoro.server.dashboard.controller;

import io.javalin.http.Context;
import org.apache.amoro.api.ServerTableIdentifier;
import org.apache.amoro.api.resource.Resource;
import org.apache.amoro.api.resource.ResourceGroup;
import org.apache.amoro.api.resource.ResourceType;
Expand All @@ -30,13 +29,13 @@
import org.apache.amoro.server.dashboard.response.OkResponse;
import org.apache.amoro.server.dashboard.response.PageResult;
import org.apache.amoro.server.dashboard.utils.OptimizingUtil;
import org.apache.amoro.server.persistence.TableRuntimeMeta;
import org.apache.amoro.server.resource.ContainerMetadata;
import org.apache.amoro.server.resource.OptimizerInstance;
import org.apache.amoro.server.resource.ResourceContainers;
import org.apache.amoro.server.table.TableRuntime;
import org.apache.amoro.server.table.TableService;
import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
import org.apache.commons.lang3.StringUtils;

import javax.ws.rs.BadRequestException;

Expand Down Expand Up @@ -67,34 +66,17 @@ public void getOptimizerTables(Context ctx) {
Integer pageSize = ctx.queryParamAsClass("pageSize", Integer.class).getOrDefault(20);
int offset = (page - 1) * pageSize;

List<TableRuntime> tableRuntimes = new ArrayList<>();
List<ServerTableIdentifier> tables = tableService.listManagedTables();
for (ServerTableIdentifier identifier : tables) {
TableRuntime tableRuntime = tableService.getRuntime(identifier);
if (tableRuntime == null) {
continue;
}
if ((ALL_GROUP.equals(optimizerGroup)
|| tableRuntime.getOptimizerGroup().equals(optimizerGroup))
&& (StringUtils.isEmpty(dbFilterStr)
|| StringUtils.containsIgnoreCase(identifier.getDatabase(), dbFilterStr))
&& (StringUtils.isEmpty(tableFilterStr)
|| StringUtils.containsIgnoreCase(identifier.getTableName(), tableFilterStr))) {
tableRuntimes.add(tableRuntime);
}
}
tableRuntimes.sort(
(o1, o2) -> {
// first we compare the status , and then we compare the start time when status are equal;
int statDiff = o1.getOptimizingStatus().compareTo(o2.getOptimizingStatus());
// status order is asc, startTime order is desc
if (statDiff == 0) {
long timeDiff = o1.getCurrentStatusStartTime() - o2.getCurrentStatusStartTime();
return timeDiff >= 0 ? (timeDiff == 0 ? 0 : -1) : 1;
} else {
return statDiff;
}
});
String optimizerGroupUsedInDbFilter = "all".equals(optimizerGroup) ? null : optimizerGroup;
// get all info from underlying table table_runtime
List<TableRuntimeMeta> tableRuntimeBeans =
tableService.getTableRuntimes(
optimizerGroupUsedInDbFilter, dbFilterStr, tableFilterStr, pageSize, offset);

List<TableRuntime> tableRuntimes =
tableRuntimeBeans.stream()
.map(meta -> tableService.getRuntime(meta.getTableId()))
.collect(Collectors.toList());

PageResult<TableOptimizingInfo> amsPageResult =
PageResult.of(tableRuntimes, offset, pageSize, OptimizingUtil::buildTableOptimizeInfo);
ctx.json(OkResponse.of(amsPageResult));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ public void getTableDetail(Context ctx) {
if (serverTableIdentifier.isPresent()) {
tableSummary.put(
"optimizingStatus",
tableService.getRuntime(serverTableIdentifier.get()).getOptimizingStatus());
tableService.getRuntime(serverTableIdentifier.get().getId()).getOptimizingStatus());
} else {
tableSummary.put("optimizingStatus", OptimizingStatus.IDLE);
}
Expand Down Expand Up @@ -650,7 +650,9 @@ public void cancelOptimizingProcess(Context ctx) {
tableService.getServerTableIdentifier(
TableIdentifier.of(catalog, db, table).buildTableIdentifier());
TableRuntime tableRuntime =
serverTableIdentifier != null ? tableService.getRuntime(serverTableIdentifier) : null;
serverTableIdentifier != null
? tableService.getRuntime(serverTableIdentifier.getId())
: null;

Preconditions.checkArgument(
tableRuntime != null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import org.apache.amoro.server.resource.QuotaProvider;
import org.apache.amoro.server.table.TableManager;
import org.apache.amoro.server.table.TableRuntime;
import org.apache.amoro.server.table.TableRuntimeMeta;
import org.apache.amoro.shade.guava32.com.google.common.annotations.VisibleForTesting;
import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
import org.apache.amoro.shade.guava32.com.google.common.collect.Lists;
Expand Down Expand Up @@ -93,7 +92,7 @@ public OptimizingQueue(
ResourceGroup optimizerGroup,
QuotaProvider quotaProvider,
Executor planExecutor,
List<TableRuntimeMeta> tableRuntimeMetaList,
List<TableRuntime> tableRuntimeMetaList,
int maxPlanningParallelism) {
Preconditions.checkNotNull(optimizerGroup, "Optimizer group can not be null");
this.planExecutor = planExecutor;
Expand All @@ -109,11 +108,10 @@ public OptimizingQueue(
tableRuntimeMetaList.forEach(this::initTableRuntime);
}

private void initTableRuntime(TableRuntimeMeta tableRuntimeMeta) {
TableRuntime tableRuntime = tableRuntimeMeta.getTableRuntime();
private void initTableRuntime(TableRuntime tableRuntime) {
if (tableRuntime.getOptimizingStatus().isProcessing()
&& tableRuntimeMeta.getOptimizingProcessId() != 0) {
tableRuntime.recover(new TableOptimizingProcess(tableRuntimeMeta));
&& tableRuntime.getOptimizingProcess().getProcessId() != 0) {
tableRuntime.recover(new TableOptimizingProcess(tableRuntime));
}

if (tableRuntime.isOptimizingEnabled()) {
Expand All @@ -122,7 +120,7 @@ private void initTableRuntime(TableRuntimeMeta tableRuntimeMeta) {
if (!tableRuntime.getOptimizingStatus().isProcessing()) {
scheduler.addTable(tableRuntime);
} else if (tableRuntime.getOptimizingStatus() != OptimizingStatus.COMMITTING) {
tableQueue.offer(new TableOptimizingProcess(tableRuntimeMeta));
tableQueue.offer(new TableOptimizingProcess(tableRuntime));
}
} else {
OptimizingProcess process = tableRuntime.getOptimizingProcess();
Expand Down Expand Up @@ -376,21 +374,21 @@ public TableOptimizingProcess(OptimizingPlanner planner) {
beginAndPersistProcess();
}

public TableOptimizingProcess(TableRuntimeMeta tableRuntimeMeta) {
processId = tableRuntimeMeta.getOptimizingProcessId();
tableRuntime = tableRuntimeMeta.getTableRuntime();
optimizingType = tableRuntimeMeta.getOptimizingType();
targetSnapshotId = tableRuntimeMeta.getTargetSnapshotId();
targetChangeSnapshotId = tableRuntimeMeta.getTargetChangeSnapshotId();
planTime = tableRuntimeMeta.getPlanTime();
if (tableRuntimeMeta.getFromSequence() != null) {
fromSequence = tableRuntimeMeta.getFromSequence();
public TableOptimizingProcess(TableRuntime tableRuntime) {
processId = tableRuntime.getOptimizingProcess().getProcessId();
this.tableRuntime = tableRuntime;
optimizingType = tableRuntime.getOptimizingProcess().getOptimizingType();
targetSnapshotId = tableRuntime.getTargetSnapshotId();
targetChangeSnapshotId = tableRuntime.getTargetChangeSnapshotId();
planTime = tableRuntime.getLastPlanTime();
if (tableRuntime.getFromSequence() != null) {
fromSequence = tableRuntime.getFromSequence();
}
if (tableRuntimeMeta.getToSequence() != null) {
toSequence = tableRuntimeMeta.getToSequence();
if (tableRuntime.getToSequence() != null) {
toSequence = tableRuntime.getToSequence();
}
if (this.status != OptimizingProcess.Status.CLOSED) {
tableRuntimeMeta.getTableRuntime().recover(this);
tableRuntime.recover(this);
}
loadTaskRuntimes(this);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,23 @@
package org.apache.amoro.server.optimizing;

public enum OptimizingStatus {
FULL_OPTIMIZING("full", true),
MAJOR_OPTIMIZING("major", true),
MINOR_OPTIMIZING("minor", true),
COMMITTING("committing", true),
PLANNING("planning", false),
PENDING("pending", false),
IDLE("idle", false);
FULL_OPTIMIZING("full", true, 0),
MAJOR_OPTIMIZING("major", true, 1),
MINOR_OPTIMIZING("minor", true, 2),
COMMITTING("committing", true, 3),
PLANNING("planning", false, 4),
PENDING("pending", false, 5),
IDLE("idle", false, 6);
private final String displayValue;

private final boolean isProcessing;

OptimizingStatus(String displayValue, boolean isProcessing) {
private final int code;

OptimizingStatus(String displayValue, boolean isProcessing, int code) {
this.displayValue = displayValue;
this.isProcessing = isProcessing;
this.code = code;
}

public boolean isProcessing() {
Expand All @@ -42,4 +45,16 @@ public boolean isProcessing() {
public String displayValue() {
return displayValue;
}

public int getCode() {
return code;
}

public static OptimizingStatus ofCode(int code) {
if (code >= 0 && code <= values().length) {
return values()[code];
}

return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* limitations under the License.
*/

package org.apache.amoro.server.table;
package org.apache.amoro.server.persistence;

import org.apache.amoro.TableFormat;
import org.apache.amoro.api.config.TableConfiguration;
Expand All @@ -27,6 +27,7 @@

import java.util.Map;

/** The class for table used when transfer data from/to database. */
public class TableRuntimeMeta {
private long tableId;
private String catalogName;
Expand Down Expand Up @@ -57,24 +58,8 @@ public class TableRuntimeMeta {
private Map<String, Long> fromSequence;
private Map<String, Long> toSequence;

private TableRuntime tableRuntime;

public TableRuntimeMeta() {}

public TableRuntime constructTableRuntime(TableManager initializer) {
if (tableRuntime == null) {
tableRuntime = new TableRuntime(this, initializer);
}
return tableRuntime;
}

public TableRuntime getTableRuntime() {
if (tableRuntime == null) {
throw new IllegalStateException("TableRuntime is not constructed yet.");
}
return tableRuntime;
}

public long getTargetSnapshotId() {
return targetSnapshotId;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.amoro.server.persistence.converter;

import org.apache.amoro.server.optimizing.OptimizingStatus;
import org.apache.ibatis.type.BaseTypeHandler;
import org.apache.ibatis.type.JdbcType;
import org.apache.ibatis.type.MappedJdbcTypes;
import org.apache.ibatis.type.MappedTypes;

import java.sql.CallableStatement;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

@MappedJdbcTypes(JdbcType.INTEGER)
@MappedTypes(Enum.class)
public class OptimizingStatusConverter extends BaseTypeHandler<OptimizingStatus> {

@Override
public void setNonNullParameter(
PreparedStatement ps, int i, OptimizingStatus parameter, JdbcType jdbcType)
throws SQLException {
ps.setInt(i, parameter.getCode());
}

@Override
public OptimizingStatus getNullableResult(ResultSet rs, String columnName) throws SQLException {
String s = rs.getString(columnName);
return s == null ? null : OptimizingStatus.ofCode(Integer.parseInt(s));
}

@Override
public OptimizingStatus getNullableResult(ResultSet rs, int columnIndex) throws SQLException {
String s = rs.getString(columnIndex);
return s == null ? null : OptimizingStatus.ofCode(Integer.parseInt(s));
}

@Override
public OptimizingStatus getNullableResult(CallableStatement cs, int columnIndex)
throws SQLException {
String s = cs.getString(columnIndex);
return s == null ? null : OptimizingStatus.ofCode(Integer.parseInt(s));
}
}
Loading

0 comments on commit 779fc06

Please sign in to comment.