Skip to content

Commit

Permalink
Merge branch 'master' into dynamic-format
Browse files Browse the repository at this point in the history
# Conflicts:
#	amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/TableMetaMapper.java
  • Loading branch information
baiyangtx committed Sep 23, 2024
2 parents 8308a6b + 3f627ec commit 0e86f3d
Show file tree
Hide file tree
Showing 60 changed files with 822 additions and 1,150 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@
import org.slf4j.LoggerFactory;
import org.yaml.snakeyaml.Yaml;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.file.Files;
import java.nio.file.Paths;
Expand Down Expand Up @@ -216,7 +215,7 @@ public void dispose() {
MetricManager.dispose();
}

private void initConfig() throws IOException {
private void initConfig() throws Exception {
LOG.info("initializing configurations...");
new ConfigurationHelper().init();
}
Expand Down Expand Up @@ -409,14 +408,14 @@ private class ConfigurationHelper {

private JsonNode yamlConfig;

public void init() throws IOException {
public void init() throws Exception {
Map<String, Object> envConfig = initEnvConfig();
initServiceConfig(envConfig);
setIcebergSystemProperties();
initContainerConfig();
}

private void initServiceConfig(Map<String, Object> envConfig) throws IOException {
private void initServiceConfig(Map<String, Object> envConfig) throws Exception {
LOG.info("initializing service configuration...");
String configPath = Environments.getConfigPath() + "/" + SERVER_CONFIG_FILENAME;
LOG.info("load config from path: {}", configPath);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@
import org.apache.amoro.server.table.DefaultTableService;
import org.apache.amoro.server.table.RuntimeHandlerChain;
import org.apache.amoro.server.table.TableRuntime;
import org.apache.amoro.server.table.TableRuntimeMeta;
import org.apache.amoro.server.table.TableService;
import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
import org.apache.amoro.shade.guava32.com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -121,24 +120,24 @@ public RuntimeHandlerChain getTableRuntimeHandler() {
return tableHandlerChain;
}

private void loadOptimizingQueues(List<TableRuntimeMeta> tableRuntimeMetaList) {
private void loadOptimizingQueues(List<TableRuntime> tableRuntimeMetaList) {
List<ResourceGroup> optimizerGroups =
getAs(ResourceMapper.class, ResourceMapper::selectResourceGroups);
List<OptimizerInstance> optimizers = getAs(OptimizerMapper.class, OptimizerMapper::selectAll);
Map<String, List<TableRuntimeMeta>> groupToTableRuntimes =
Map<String, List<TableRuntime>> groupToTableRuntimes =
tableRuntimeMetaList.stream()
.collect(Collectors.groupingBy(TableRuntimeMeta::getOptimizerGroup));
.collect(Collectors.groupingBy(TableRuntime::getOptimizerGroup));
optimizerGroups.forEach(
group -> {
String groupName = group.getName();
List<TableRuntimeMeta> tableRuntimeMetas = groupToTableRuntimes.remove(groupName);
List<TableRuntime> tableRuntimes = groupToTableRuntimes.remove(groupName);
OptimizingQueue optimizingQueue =
new OptimizingQueue(
tableService,
group,
this,
planExecutor,
Optional.ofNullable(tableRuntimeMetas).orElseGet(ArrayList::new),
Optional.ofNullable(tableRuntimes).orElseGet(ArrayList::new),
maxPlanningParallelism);
optimizingQueueByGroup.put(groupName, optimizingQueue);
});
Expand Down Expand Up @@ -456,9 +455,9 @@ public void handleTableRemoved(TableRuntime tableRuntime) {
}

@Override
protected void initHandler(List<TableRuntimeMeta> tableRuntimeMetaList) {
protected void initHandler(List<TableRuntime> tableRuntimeList) {
LOG.info("OptimizerManagementService begin initializing");
loadOptimizingQueues(tableRuntimeMetaList);
loadOptimizingQueues(tableRuntimeList);
optimizerKeeper.start();
LOG.info("SuspendingDetector for Optimizer has been started.");
LOG.info("OptimizerManagementService initializing has completed");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,8 @@ private void updateTableDetail(long ts, Map<MetricDefine, List<MetricKey>> metri
long totalTableSize = 0L;

// table size
List<MetricKey> metricKeys = metricDefineMap.get(TABLE_SUMMARY_TOTAL_FILES_SIZE);
List<MetricKey> metricKeys =
metricDefineMap.getOrDefault(TABLE_SUMMARY_TOTAL_FILES_SIZE, ImmutableList.of());
for (MetricKey metricKey : metricKeys) {
String tableName = fullTableName(metricKey);
allCatalogs.add(catalog(metricKey));
Expand All @@ -197,7 +198,7 @@ private void updateTableDetail(long ts, Map<MetricDefine, List<MetricKey>> metri
}

// file count
metricKeys = metricDefineMap.get(TABLE_SUMMARY_TOTAL_FILES);
metricKeys = metricDefineMap.getOrDefault(TABLE_SUMMARY_TOTAL_FILES, ImmutableList.of());
for (MetricKey metricKey : metricKeys) {
String tableName = fullTableName(metricKey);
allCatalogs.add(catalog(metricKey));
Expand All @@ -209,7 +210,7 @@ private void updateTableDetail(long ts, Map<MetricDefine, List<MetricKey>> metri
}

// health score
metricKeys = metricDefineMap.get(TABLE_SUMMARY_HEALTH_SCORE);
metricKeys = metricDefineMap.getOrDefault(TABLE_SUMMARY_HEALTH_SCORE, ImmutableList.of());
for (MetricKey metricKey : metricKeys) {
String tableName = fullTableName(metricKey);
allCatalogs.add(catalog(metricKey));
Expand All @@ -221,7 +222,7 @@ private void updateTableDetail(long ts, Map<MetricDefine, List<MetricKey>> metri

this.totalDataSize.set(totalTableSize);
this.totalCatalog.set(allCatalogs.size());
this.totalTableCount.set(metricKeys.size());
this.totalTableCount.set(topTableItemMap.size());
this.allTopTableItem = new ArrayList<>(topTableItemMap.values());
addAndCheck(new OverviewDataSizeItem(ts, totalTableSize));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.amoro.server.dashboard.controller;

import io.javalin.http.Context;
import org.apache.amoro.ServerTableIdentifier;
import org.apache.amoro.resource.Resource;
import org.apache.amoro.resource.ResourceGroup;
import org.apache.amoro.resource.ResourceType;
Expand All @@ -30,13 +29,13 @@
import org.apache.amoro.server.dashboard.response.OkResponse;
import org.apache.amoro.server.dashboard.response.PageResult;
import org.apache.amoro.server.dashboard.utils.OptimizingUtil;
import org.apache.amoro.server.persistence.TableRuntimeMeta;
import org.apache.amoro.server.resource.ContainerMetadata;
import org.apache.amoro.server.resource.OptimizerInstance;
import org.apache.amoro.server.resource.ResourceContainers;
import org.apache.amoro.server.table.TableRuntime;
import org.apache.amoro.server.table.TableService;
import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
import org.apache.commons.lang3.StringUtils;

import javax.ws.rs.BadRequestException;

Expand Down Expand Up @@ -67,34 +66,17 @@ public void getOptimizerTables(Context ctx) {
Integer pageSize = ctx.queryParamAsClass("pageSize", Integer.class).getOrDefault(20);
int offset = (page - 1) * pageSize;

List<TableRuntime> tableRuntimes = new ArrayList<>();
List<ServerTableIdentifier> tables = tableService.listManagedTables();
for (ServerTableIdentifier identifier : tables) {
TableRuntime tableRuntime = tableService.getRuntime(identifier);
if (tableRuntime == null) {
continue;
}
if ((ALL_GROUP.equals(optimizerGroup)
|| tableRuntime.getOptimizerGroup().equals(optimizerGroup))
&& (StringUtils.isEmpty(dbFilterStr)
|| StringUtils.containsIgnoreCase(identifier.getDatabase(), dbFilterStr))
&& (StringUtils.isEmpty(tableFilterStr)
|| StringUtils.containsIgnoreCase(identifier.getTableName(), tableFilterStr))) {
tableRuntimes.add(tableRuntime);
}
}
tableRuntimes.sort(
(o1, o2) -> {
// first we compare the status , and then we compare the start time when status are equal;
int statDiff = o1.getOptimizingStatus().compareTo(o2.getOptimizingStatus());
// status order is asc, startTime order is desc
if (statDiff == 0) {
long timeDiff = o1.getCurrentStatusStartTime() - o2.getCurrentStatusStartTime();
return timeDiff >= 0 ? (timeDiff == 0 ? 0 : -1) : 1;
} else {
return statDiff;
}
});
String optimizerGroupUsedInDbFilter = ALL_GROUP.equals(optimizerGroup) ? null : optimizerGroup;
// get all info from underlying table table_runtime
List<TableRuntimeMeta> tableRuntimeBeans =
tableService.getTableRuntimes(
optimizerGroupUsedInDbFilter, dbFilterStr, tableFilterStr, pageSize, offset);

List<TableRuntime> tableRuntimes =
tableRuntimeBeans.stream()
.map(meta -> tableService.getRuntime(meta.getTableId()))
.collect(Collectors.toList());

PageResult<TableOptimizingInfo> amsPageResult =
PageResult.of(tableRuntimes, offset, pageSize, OptimizingUtil::buildTableOptimizeInfo);
ctx.json(OkResponse.of(amsPageResult));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ public void getTableDetail(Context ctx) {
tableService.getServerTableIdentifier(
TableIdentifier.of(catalog, database, tableName).buildTableIdentifier()));
if (serverTableIdentifier.isPresent()) {
TableRuntime tableRuntime = tableService.getRuntime(serverTableIdentifier.get());
TableRuntime tableRuntime = tableService.getRuntime(serverTableIdentifier.get().getId());
tableSummary.setOptimizingStatus(tableRuntime.getOptimizingStatus().name());
OptimizingEvaluator.PendingInput tableRuntimeSummary = tableRuntime.getTableSummary();
if (tableRuntimeSummary != null) {
Expand Down Expand Up @@ -654,7 +654,9 @@ public void cancelOptimizingProcess(Context ctx) {
tableService.getServerTableIdentifier(
TableIdentifier.of(catalog, db, table).buildTableIdentifier());
TableRuntime tableRuntime =
serverTableIdentifier != null ? tableService.getRuntime(serverTableIdentifier) : null;
serverTableIdentifier != null
? tableService.getRuntime(serverTableIdentifier.getId())
: null;

Preconditions.checkArgument(
tableRuntime != null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import org.apache.amoro.server.resource.QuotaProvider;
import org.apache.amoro.server.table.TableManager;
import org.apache.amoro.server.table.TableRuntime;
import org.apache.amoro.server.table.TableRuntimeMeta;
import org.apache.amoro.shade.guava32.com.google.common.annotations.VisibleForTesting;
import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
import org.apache.amoro.shade.guava32.com.google.common.collect.Lists;
Expand Down Expand Up @@ -93,7 +92,7 @@ public OptimizingQueue(
ResourceGroup optimizerGroup,
QuotaProvider quotaProvider,
Executor planExecutor,
List<TableRuntimeMeta> tableRuntimeMetaList,
List<TableRuntime> tableRuntimeList,
int maxPlanningParallelism) {
Preconditions.checkNotNull(optimizerGroup, "Optimizer group can not be null");
this.planExecutor = planExecutor;
Expand All @@ -106,14 +105,12 @@ public OptimizingQueue(
new OptimizerGroupMetrics(
optimizerGroup.getName(), MetricManager.getInstance().getGlobalRegistry(), this);
this.metrics.register();
tableRuntimeMetaList.forEach(this::initTableRuntime);
tableRuntimeList.forEach(this::initTableRuntime);
}

private void initTableRuntime(TableRuntimeMeta tableRuntimeMeta) {
TableRuntime tableRuntime = tableRuntimeMeta.getTableRuntime();
if (tableRuntime.getOptimizingStatus().isProcessing()
&& tableRuntimeMeta.getOptimizingProcessId() != 0) {
tableRuntime.recover(new TableOptimizingProcess(tableRuntimeMeta));
private void initTableRuntime(TableRuntime tableRuntime) {
if (tableRuntime.getOptimizingStatus().isProcessing() && tableRuntime.getProcessId() != 0) {
tableRuntime.recover(new TableOptimizingProcess(tableRuntime));
}

if (tableRuntime.isOptimizingEnabled()) {
Expand All @@ -122,7 +119,7 @@ private void initTableRuntime(TableRuntimeMeta tableRuntimeMeta) {
if (!tableRuntime.getOptimizingStatus().isProcessing()) {
scheduler.addTable(tableRuntime);
} else if (tableRuntime.getOptimizingStatus() != OptimizingStatus.COMMITTING) {
tableQueue.offer(new TableOptimizingProcess(tableRuntimeMeta));
tableQueue.offer(new TableOptimizingProcess(tableRuntime));
}
} else {
OptimizingProcess process = tableRuntime.getOptimizingProcess();
Expand Down Expand Up @@ -235,12 +232,20 @@ private void triggerAsyncPlanning(
planningTables.remove(tableRuntime.getTableIdentifier());
if (process != null) {
tableQueue.offer(process);
String skipIds =
skipTables.stream()
.map(ServerTableIdentifier::getId)
.sorted()
.map(item -> item + "")
.collect(Collectors.joining(","));
LOG.info(
"Completed planning on table {} with {} tasks with a total cost of {} ms, skipping tables {}",
"Completed planning on table {} with {} tasks with a total cost of {} ms, skipping {} tables,"
+ " id list:{}",
tableRuntime.getTableIdentifier(),
process.getTaskMap().size(),
currentTime - startTime,
skipTables);
skipTables.size(),
skipIds);
} else if (throwable == null) {
LOG.info(
"Skip planning table {} with a total cost of {} ms.",
Expand Down Expand Up @@ -379,21 +384,21 @@ public TableOptimizingProcess(OptimizingPlanner planner) {
beginAndPersistProcess();
}

public TableOptimizingProcess(TableRuntimeMeta tableRuntimeMeta) {
processId = tableRuntimeMeta.getOptimizingProcessId();
tableRuntime = tableRuntimeMeta.getTableRuntime();
optimizingType = tableRuntimeMeta.getOptimizingType();
targetSnapshotId = tableRuntimeMeta.getTargetSnapshotId();
targetChangeSnapshotId = tableRuntimeMeta.getTargetChangeSnapshotId();
planTime = tableRuntimeMeta.getPlanTime();
if (tableRuntimeMeta.getFromSequence() != null) {
fromSequence = tableRuntimeMeta.getFromSequence();
public TableOptimizingProcess(TableRuntime tableRuntime) {
processId = tableRuntime.getProcessId();
this.tableRuntime = tableRuntime;
optimizingType = tableRuntime.getOptimizingType();
targetSnapshotId = tableRuntime.getTargetSnapshotId();
targetChangeSnapshotId = tableRuntime.getTargetChangeSnapshotId();
planTime = tableRuntime.getLastPlanTime();
if (tableRuntime.getFromSequence() != null) {
fromSequence = tableRuntime.getFromSequence();
}
if (tableRuntimeMeta.getToSequence() != null) {
toSequence = tableRuntimeMeta.getToSequence();
if (tableRuntime.getToSequence() != null) {
toSequence = tableRuntime.getToSequence();
}
if (this.status != OptimizingProcess.Status.CLOSED) {
tableRuntimeMeta.getTableRuntime().recover(this);
tableRuntime.recover(this);
}
loadTaskRuntimes(this);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,23 @@
package org.apache.amoro.server.optimizing;

public enum OptimizingStatus {
FULL_OPTIMIZING("full", true),
MAJOR_OPTIMIZING("major", true),
MINOR_OPTIMIZING("minor", true),
COMMITTING("committing", true),
PLANNING("planning", false),
PENDING("pending", false),
IDLE("idle", false);
FULL_OPTIMIZING("full", true, 100),
MAJOR_OPTIMIZING("major", true, 200),
MINOR_OPTIMIZING("minor", true, 300),
COMMITTING("committing", true, 400),
PLANNING("planning", false, 500),
PENDING("pending", false, 600),
IDLE("idle", false, 700);
private final String displayValue;

private final boolean isProcessing;

OptimizingStatus(String displayValue, boolean isProcessing) {
private final int code;

OptimizingStatus(String displayValue, boolean isProcessing, int code) {
this.displayValue = displayValue;
this.isProcessing = isProcessing;
this.code = code;
}

public boolean isProcessing() {
Expand All @@ -42,4 +45,17 @@ public boolean isProcessing() {
public String displayValue() {
return displayValue;
}

public int getCode() {
return code;
}

public static OptimizingStatus ofCode(int code) {
for (OptimizingStatus status : values()) {
if (status.getCode() == code) {
return status;
}
}
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ public List<TaskDescriptor> planTasks() {
return cacheAndReturnTasks(Collections.emptyList());
}

List<PartitionEvaluator> evaluators = new ArrayList<>(partitionPlanMap.values());
List<PartitionEvaluator> evaluators = new ArrayList<>(needOptimizingPlanMap.values());
// prioritize partitions with high cost to avoid starvation
evaluators.sort(Comparator.comparing(PartitionEvaluator::getWeight, Comparator.reverseOrder()));

Expand Down
Loading

0 comments on commit 0e86f3d

Please sign in to comment.