Skip to content

Commit

Permalink
[AMORO-2893] Optimizing the efficiency for restapi retrieve optimizin…
Browse files Browse the repository at this point in the history
…g tables

Befor the change we'll retrieve all tables from db and sort in the memory,
this will be slow if there are many entriesin the db, after this change
we sort in the db and only retrieve the returned entries.
  • Loading branch information
klion26 committed Jul 29, 2024
1 parent f5d2e85 commit 956f91d
Show file tree
Hide file tree
Showing 18 changed files with 274 additions and 156 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@
import org.apache.amoro.server.table.DefaultTableService;
import org.apache.amoro.server.table.RuntimeHandlerChain;
import org.apache.amoro.server.table.TableRuntime;
import org.apache.amoro.server.table.TableRuntimeMeta;
import org.apache.amoro.server.table.TableService;
import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
import org.apache.amoro.shade.guava32.com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -121,17 +120,17 @@ public RuntimeHandlerChain getTableRuntimeHandler() {
return tableHandlerChain;
}

private void loadOptimizingQueues(List<TableRuntimeMeta> tableRuntimeMetaList) {
private void loadOptimizingQueues(List<TableRuntime> tableRuntimeMetaList) {
List<ResourceGroup> optimizerGroups =
getAs(ResourceMapper.class, ResourceMapper::selectResourceGroups);
List<OptimizerInstance> optimizers = getAs(OptimizerMapper.class, OptimizerMapper::selectAll);
Map<String, List<TableRuntimeMeta>> groupToTableRuntimes =
Map<String, List<TableRuntime>> groupToTableRuntimes =
tableRuntimeMetaList.stream()
.collect(Collectors.groupingBy(TableRuntimeMeta::getOptimizerGroup));
.collect(Collectors.groupingBy(TableRuntime::getOptimizerGroup));
optimizerGroups.forEach(
group -> {
String groupName = group.getName();
List<TableRuntimeMeta> tableRuntimeMetas = groupToTableRuntimes.remove(groupName);
List<TableRuntime> tableRuntimeMetas = groupToTableRuntimes.remove(groupName);
OptimizingQueue optimizingQueue =
new OptimizingQueue(
tableService,
Expand Down Expand Up @@ -455,7 +454,7 @@ public void handleTableRemoved(TableRuntime tableRuntime) {
}

@Override
protected void initHandler(List<TableRuntimeMeta> tableRuntimeMetaList) {
protected void initHandler(List<TableRuntime> tableRuntimeMetaList) {
LOG.info("OptimizerManagementService begin initializing");
loadOptimizingQueues(tableRuntimeMetaList);
optimizerKeeper.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.amoro.server.dashboard.controller;

import io.javalin.http.Context;
import org.apache.amoro.api.ServerTableIdentifier;
import org.apache.amoro.api.resource.Resource;
import org.apache.amoro.api.resource.ResourceGroup;
import org.apache.amoro.api.resource.ResourceType;
Expand All @@ -30,13 +29,13 @@
import org.apache.amoro.server.dashboard.response.OkResponse;
import org.apache.amoro.server.dashboard.response.PageResult;
import org.apache.amoro.server.dashboard.utils.OptimizingUtil;
import org.apache.amoro.server.persistence.TableRuntimeMeta;
import org.apache.amoro.server.resource.ContainerMetadata;
import org.apache.amoro.server.resource.OptimizerInstance;
import org.apache.amoro.server.resource.ResourceContainers;
import org.apache.amoro.server.table.TableRuntime;
import org.apache.amoro.server.table.TableService;
import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
import org.apache.commons.lang3.StringUtils;

import javax.ws.rs.BadRequestException;

Expand Down Expand Up @@ -67,34 +66,16 @@ public void getOptimizerTables(Context ctx) {
Integer pageSize = ctx.queryParamAsClass("pageSize", Integer.class).getOrDefault(20);
int offset = (page - 1) * pageSize;

List<TableRuntime> tableRuntimes = new ArrayList<>();
List<ServerTableIdentifier> tables = tableService.listManagedTables();
for (ServerTableIdentifier identifier : tables) {
TableRuntime tableRuntime = tableService.getRuntime(identifier);
if (tableRuntime == null) {
continue;
}
if ((ALL_GROUP.equals(optimizerGroup)
|| tableRuntime.getOptimizerGroup().equals(optimizerGroup))
&& (StringUtils.isEmpty(dbFilterStr)
|| StringUtils.containsIgnoreCase(identifier.getDatabase(), dbFilterStr))
&& (StringUtils.isEmpty(tableFilterStr)
|| StringUtils.containsIgnoreCase(identifier.getTableName(), tableFilterStr))) {
tableRuntimes.add(tableRuntime);
}
}
tableRuntimes.sort(
(o1, o2) -> {
// first we compare the status , and then we compare the start time when status are equal;
int statDiff = o1.getOptimizingStatus().compareTo(o2.getOptimizingStatus());
// status order is asc, startTime order is desc
if (statDiff == 0) {
long timeDiff = o1.getCurrentStatusStartTime() - o2.getCurrentStatusStartTime();
return timeDiff >= 0 ? (timeDiff == 0 ? 0 : -1) : 1;
} else {
return statDiff;
}
});
String optimizerGroupUsedInDbFilter = "all".equals(optimizerGroup) ? null : optimizerGroup;
// get all info from underlying table table_runtime
List<TableRuntimeMeta> tableRuntimeBeans =
tableService.getTableRuntimes(optimizerGroupUsedInDbFilter, dbFilterStr, tableFilterStr, pageSize, offset);

List<TableRuntime> tableRuntimes =
tableRuntimeBeans.stream()
.map(meta -> tableService.getRuntime(meta.getTableId()))
.collect(Collectors.toList());

PageResult<TableOptimizingInfo> amsPageResult =
PageResult.of(tableRuntimes, offset, pageSize, OptimizingUtil::buildTableOptimizeInfo);
ctx.json(OkResponse.of(amsPageResult));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ public void getTableDetail(Context ctx) {
if (serverTableIdentifier.isPresent()) {
tableSummary.put(
"optimizingStatus",
tableService.getRuntime(serverTableIdentifier.get()).getOptimizingStatus());
tableService.getRuntime(serverTableIdentifier.get().getId()).getOptimizingStatus());
} else {
tableSummary.put("optimizingStatus", OptimizingStatus.IDLE);
}
Expand Down Expand Up @@ -650,7 +650,9 @@ public void cancelOptimizingProcess(Context ctx) {
tableService.getServerTableIdentifier(
TableIdentifier.of(catalog, db, table).buildTableIdentifier());
TableRuntime tableRuntime =
serverTableIdentifier != null ? tableService.getRuntime(serverTableIdentifier) : null;
serverTableIdentifier != null
? tableService.getRuntime(serverTableIdentifier.getId())
: null;

Preconditions.checkArgument(
tableRuntime != null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import org.apache.amoro.server.resource.QuotaProvider;
import org.apache.amoro.server.table.TableManager;
import org.apache.amoro.server.table.TableRuntime;
import org.apache.amoro.server.table.TableRuntimeMeta;
import org.apache.amoro.shade.guava32.com.google.common.annotations.VisibleForTesting;
import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
import org.apache.amoro.shade.guava32.com.google.common.collect.Lists;
Expand Down Expand Up @@ -93,7 +92,7 @@ public OptimizingQueue(
ResourceGroup optimizerGroup,
QuotaProvider quotaProvider,
Executor planExecutor,
List<TableRuntimeMeta> tableRuntimeMetaList,
List<TableRuntime> tableRuntimeMetaList,
int maxPlanningParallelism) {
Preconditions.checkNotNull(optimizerGroup, "Optimizer group can not be null");
this.planExecutor = planExecutor;
Expand All @@ -109,11 +108,10 @@ public OptimizingQueue(
tableRuntimeMetaList.forEach(this::initTableRuntime);
}

private void initTableRuntime(TableRuntimeMeta tableRuntimeMeta) {
TableRuntime tableRuntime = tableRuntimeMeta.getTableRuntime();
private void initTableRuntime(TableRuntime tableRuntime) {
if (tableRuntime.getOptimizingStatus().isProcessing()
&& tableRuntimeMeta.getOptimizingProcessId() != 0) {
tableRuntime.recover(new TableOptimizingProcess(tableRuntimeMeta));
&& tableRuntime.getOptimizingProcess().getProcessId() != 0) {
tableRuntime.recover(new TableOptimizingProcess(tableRuntime));
}

if (tableRuntime.isOptimizingEnabled()) {
Expand All @@ -122,7 +120,7 @@ private void initTableRuntime(TableRuntimeMeta tableRuntimeMeta) {
if (!tableRuntime.getOptimizingStatus().isProcessing()) {
scheduler.addTable(tableRuntime);
} else if (tableRuntime.getOptimizingStatus() != OptimizingStatus.COMMITTING) {
tableQueue.offer(new TableOptimizingProcess(tableRuntimeMeta));
tableQueue.offer(new TableOptimizingProcess(tableRuntime));
}
} else {
OptimizingProcess process = tableRuntime.getOptimizingProcess();
Expand Down Expand Up @@ -376,21 +374,21 @@ public TableOptimizingProcess(OptimizingPlanner planner) {
beginAndPersistProcess();
}

public TableOptimizingProcess(TableRuntimeMeta tableRuntimeMeta) {
processId = tableRuntimeMeta.getOptimizingProcessId();
tableRuntime = tableRuntimeMeta.getTableRuntime();
optimizingType = tableRuntimeMeta.getOptimizingType();
targetSnapshotId = tableRuntimeMeta.getTargetSnapshotId();
targetChangeSnapshotId = tableRuntimeMeta.getTargetChangeSnapshotId();
planTime = tableRuntimeMeta.getPlanTime();
if (tableRuntimeMeta.getFromSequence() != null) {
fromSequence = tableRuntimeMeta.getFromSequence();
public TableOptimizingProcess(TableRuntime tableRuntime) {
processId = tableRuntime.getOptimizingProcess().getProcessId();
this.tableRuntime = tableRuntime;
optimizingType = tableRuntime.getOptimizingProcess().getOptimizingType();
targetSnapshotId = tableRuntime.getTargetSnapshotId();
targetChangeSnapshotId = tableRuntime.getTargetChangeSnapshotId();
planTime = tableRuntime.getLastPlanTime();
if (tableRuntime.getFromSequence() != null) {
fromSequence = tableRuntime.getFromSequence();
}
if (tableRuntimeMeta.getToSequence() != null) {
toSequence = tableRuntimeMeta.getToSequence();
if (tableRuntime.getToSequence() != null) {
toSequence = tableRuntime.getToSequence();
}
if (this.status != OptimizingProcess.Status.CLOSED) {
tableRuntimeMeta.getTableRuntime().recover(this);
tableRuntime.recover(this);
}
loadTaskRuntimes(this);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* limitations under the License.
*/

package org.apache.amoro.server.table;
package org.apache.amoro.server.persistence;

import org.apache.amoro.TableFormat;
import org.apache.amoro.api.config.TableConfiguration;
Expand All @@ -27,6 +27,7 @@

import java.util.Map;

/** The class for table used when transfer data from/to database. */
public class TableRuntimeMeta {
private long tableId;
private String catalogName;
Expand Down Expand Up @@ -57,24 +58,8 @@ public class TableRuntimeMeta {
private Map<String, Long> fromSequence;
private Map<String, Long> toSequence;

private TableRuntime tableRuntime;

public TableRuntimeMeta() {}

public TableRuntime constructTableRuntime(TableManager initializer) {
if (tableRuntime == null) {
tableRuntime = new TableRuntime(this, initializer);
}
return tableRuntime;
}

public TableRuntime getTableRuntime() {
if (tableRuntime == null) {
throw new IllegalStateException("TableRuntime is not constructed yet.");
}
return tableRuntime;
}

public long getTargetSnapshotId() {
return targetSnapshotId;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@
package org.apache.amoro.server.persistence.mapper;

import org.apache.amoro.api.ServerTableIdentifier;
import org.apache.amoro.server.persistence.TableRuntimeMeta;
import org.apache.amoro.server.persistence.converter.JsonObjectConverter;
import org.apache.amoro.server.persistence.converter.Long2TsConverter;
import org.apache.amoro.server.persistence.converter.Map2StringConverter;
import org.apache.amoro.server.persistence.converter.MapLong2StringConverter;
import org.apache.amoro.server.table.TableMetadata;
import org.apache.amoro.server.table.TableRuntime;
import org.apache.amoro.server.table.TableRuntimeMeta;
import org.apache.ibatis.annotations.Delete;
import org.apache.ibatis.annotations.Insert;
import org.apache.ibatis.annotations.Options;
Expand Down Expand Up @@ -424,4 +424,75 @@ List<ServerTableIdentifier> selectTableIdentifiersByCatalog(
typeHandler = MapLong2StringConverter.class)
})
List<TableRuntimeMeta> selectTableRuntimeMetas();

@Select(
"<script>"
+ "<bind name=\"isMySQL\" value=\"_databaseId == 'MySQL'\" />"
+ "<bind name=\"isPostgreSQL\" value=\"_databaseId == 'PostgreSQL'\" />"
+ "<bind name=\"isDerby\" value=\"_databaseId == 'Derby'\" />"
+ "SELECT table_id, catalog_name, db_name, table_name, current_snapshot_id, "
+ "current_change_snapshotId, last_optimized_snapshotId, last_optimized_change_snapshotId, "
+ "last_major_optimizing_time, last_minor_optimizing_time, last_full_optimizing_time, optimizing_status, "
+ "optimizing_status_start_time, optimizing_process_id, "
+ "optimizer_group, table_config, pending_input FROM table_runtime "
+ "WHERE 1=1 "
+ "<if test='optimizerGroup != null'> AND optimizer_group = #{optimizerGroup} </if> "
+ "<if test='fuzzyDbName != null'> AND db_name like '%#{fuzzyDbName}%' </if>"
+ "<if test='fuzzyTableName != null'> AND table_name like '%#{fuzzyTableName}%' </if>"
+ "ORDER BY CASE optimizing_status "
+ "WHEN 'MAJOR_OPTIMIZING' THEN 1 "
+ "WHEN 'MINOR_OPTIMIZING' THEN 2 "
+ "WHEN 'COMMITTING' THEN 3 "
+ "WHEN 'PLANNING' THEN 4 "
+ "WHEN 'PENDING' THEN 5 "
+ "ELSE 6 END, "
+ "optimizing_status_start_time DESC "
+ "<if test='isMySQL or isPostgreSQL'> LIMIT #{limitCount} OFFSET #{offsetNum} </if>"
+ "<if test='isDerby'> OFFSET #{offsetNum} FETCH NEXT #{limitCount} ROWS ONLY </if>"
+ "</script>")
@Results({
@Result(property = "tableId", column = "table_id"),
@Result(property = "catalogName", column = "catalog_name"),
@Result(property = "dbName", column = "db_name"),
@Result(property = "tableName", column = "table_name"),
@Result(property = "currentSnapshotId", column = "current_snapshot_id"),
@Result(property = "currentChangeSnapshotId", column = "current_change_snapshotId"),
@Result(property = "lastOptimizedSnapshotId", column = "last_optimized_snapshotId"),
@Result(
property = "lastOptimizedChangeSnapshotId",
column = "last_optimized_change_snapshotId"),
@Result(
property = "lastMajorOptimizingTime",
column = "last_major_optimizing_time",
typeHandler = Long2TsConverter.class),
@Result(
property = "lastMinorOptimizingTime",
column = "last_minor_optimizing_time",
typeHandler = Long2TsConverter.class),
@Result(
property = "lastFullOptimizingTime",
column = "last_full_optimizing_time",
typeHandler = Long2TsConverter.class),
@Result(property = "tableStatus", column = "optimizing_status"),
@Result(
property = "currentStatusStartTime",
column = "optimizing_status_start_time",
typeHandler = Long2TsConverter.class),
@Result(property = "optimizingProcessId", column = "optimizing_process_id"),
@Result(property = "optimizerGroup", column = "optimizer_group"),
@Result(
property = "pendingInput",
column = "pending_input",
typeHandler = JsonObjectConverter.class),
@Result(
property = "tableConfig",
column = "table_config",
typeHandler = JsonObjectConverter.class),
})
List<TableRuntimeMeta> selectTableRuntimesForOptimizerGroup(
@Param("optimizerGroup") String optimizerGroup,
@Param("fuzzyDbName") String fuzzyDbName,
@Param("fuzzyTableName") String fuzzyTableName,
@Param("limitCount") int limitCount,
@Param("offsetNum") int offset);
}
Loading

0 comments on commit 956f91d

Please sign in to comment.