Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
klion26 committed Jul 26, 2024
1 parent f5d2e85 commit c0045d9
Show file tree
Hide file tree
Showing 14 changed files with 224 additions and 94 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@
import org.apache.amoro.server.table.DefaultTableService;
import org.apache.amoro.server.table.RuntimeHandlerChain;
import org.apache.amoro.server.table.TableRuntime;
import org.apache.amoro.server.table.TableRuntimeMeta;
import org.apache.amoro.server.table.TableService;
import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
import org.apache.amoro.shade.guava32.com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -121,17 +120,17 @@ public RuntimeHandlerChain getTableRuntimeHandler() {
return tableHandlerChain;
}

private void loadOptimizingQueues(List<TableRuntimeMeta> tableRuntimeMetaList) {
private void loadOptimizingQueues(List<TableRuntime> tableRuntimeMetaList) {
List<ResourceGroup> optimizerGroups =
getAs(ResourceMapper.class, ResourceMapper::selectResourceGroups);
List<OptimizerInstance> optimizers = getAs(OptimizerMapper.class, OptimizerMapper::selectAll);
Map<String, List<TableRuntimeMeta>> groupToTableRuntimes =
Map<String, List<TableRuntime>> groupToTableRuntimes =
tableRuntimeMetaList.stream()
.collect(Collectors.groupingBy(TableRuntimeMeta::getOptimizerGroup));
.collect(Collectors.groupingBy(TableRuntime::getOptimizerGroup));
optimizerGroups.forEach(
group -> {
String groupName = group.getName();
List<TableRuntimeMeta> tableRuntimeMetas = groupToTableRuntimes.remove(groupName);
List<TableRuntime> tableRuntimeMetas = groupToTableRuntimes.remove(groupName);
OptimizingQueue optimizingQueue =
new OptimizingQueue(
tableService,
Expand Down Expand Up @@ -455,7 +454,7 @@ public void handleTableRemoved(TableRuntime tableRuntime) {
}

@Override
protected void initHandler(List<TableRuntimeMeta> tableRuntimeMetaList) {
protected void initHandler(List<TableRuntime> tableRuntimeMetaList) {
LOG.info("OptimizerManagementService begin initializing");
loadOptimizingQueues(tableRuntimeMetaList);
optimizerKeeper.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.amoro.server.dashboard.controller;

import io.javalin.http.Context;
import org.apache.amoro.TableFormat;
import org.apache.amoro.api.ServerTableIdentifier;
import org.apache.amoro.api.resource.Resource;
import org.apache.amoro.api.resource.ResourceGroup;
Expand All @@ -30,6 +31,7 @@
import org.apache.amoro.server.dashboard.response.OkResponse;
import org.apache.amoro.server.dashboard.response.PageResult;
import org.apache.amoro.server.dashboard.utils.OptimizingUtil;
import org.apache.amoro.server.persistence.TableRuntimeMeta;
import org.apache.amoro.server.resource.ContainerMetadata;
import org.apache.amoro.server.resource.OptimizerInstance;
import org.apache.amoro.server.resource.ResourceContainers;
Expand Down Expand Up @@ -67,8 +69,22 @@ public void getOptimizerTables(Context ctx) {
Integer pageSize = ctx.queryParamAsClass("pageSize", Integer.class).getOrDefault(20);
int offset = (page - 1) * pageSize;

String optimizerGroupUsedInDbFilter = "all".equals(optimizerGroup) ? null : optimizerGroup;
// get all info from underlying table table_runtime
List<TableRuntimeMeta> tableRuntimeBeans =
tableService.getTableRuntimes(optimizerGroupUsedInDbFilter, pageSize, offset);

List<TableRuntime> tableRuntimes = new ArrayList<>();
List<ServerTableIdentifier> tables = tableService.listManagedTables();
List<ServerTableIdentifier> tables = new ArrayList<>();
for (TableRuntimeMeta bean : tableRuntimeBeans) {
tables.add(
ServerTableIdentifier.of(
bean.getTableId(),
bean.getCatalogName(),
bean.getDbName(),
bean.getTableName(),
TableFormat.ICEBERG)); // current hard code for this.
}
for (ServerTableIdentifier identifier : tables) {
TableRuntime tableRuntime = tableService.getRuntime(identifier);
if (tableRuntime == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import org.apache.amoro.server.resource.QuotaProvider;
import org.apache.amoro.server.table.TableManager;
import org.apache.amoro.server.table.TableRuntime;
import org.apache.amoro.server.table.TableRuntimeMeta;
import org.apache.amoro.shade.guava32.com.google.common.annotations.VisibleForTesting;
import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
import org.apache.amoro.shade.guava32.com.google.common.collect.Lists;
Expand Down Expand Up @@ -93,7 +92,7 @@ public OptimizingQueue(
ResourceGroup optimizerGroup,
QuotaProvider quotaProvider,
Executor planExecutor,
List<TableRuntimeMeta> tableRuntimeMetaList,
List<TableRuntime> tableRuntimeMetaList,
int maxPlanningParallelism) {
Preconditions.checkNotNull(optimizerGroup, "Optimizer group can not be null");
this.planExecutor = planExecutor;
Expand All @@ -109,11 +108,11 @@ public OptimizingQueue(
tableRuntimeMetaList.forEach(this::initTableRuntime);
}

private void initTableRuntime(TableRuntimeMeta tableRuntimeMeta) {
TableRuntime tableRuntime = tableRuntimeMeta.getTableRuntime();
private void initTableRuntime(TableRuntime tableRuntime) {
// TableRuntime tableRuntime = tableRuntimeMeta.getTableRuntime();
if (tableRuntime.getOptimizingStatus().isProcessing()
&& tableRuntimeMeta.getOptimizingProcessId() != 0) {
tableRuntime.recover(new TableOptimizingProcess(tableRuntimeMeta));
&& tableRuntime.getOptimizingProcess().getProcessId() != 0) {
tableRuntime.recover(new TableOptimizingProcess(tableRuntime));
}

if (tableRuntime.isOptimizingEnabled()) {
Expand All @@ -122,7 +121,7 @@ private void initTableRuntime(TableRuntimeMeta tableRuntimeMeta) {
if (!tableRuntime.getOptimizingStatus().isProcessing()) {
scheduler.addTable(tableRuntime);
} else if (tableRuntime.getOptimizingStatus() != OptimizingStatus.COMMITTING) {
tableQueue.offer(new TableOptimizingProcess(tableRuntimeMeta));
tableQueue.offer(new TableOptimizingProcess(tableRuntime));
}
} else {
OptimizingProcess process = tableRuntime.getOptimizingProcess();
Expand Down Expand Up @@ -376,21 +375,21 @@ public TableOptimizingProcess(OptimizingPlanner planner) {
beginAndPersistProcess();
}

public TableOptimizingProcess(TableRuntimeMeta tableRuntimeMeta) {
processId = tableRuntimeMeta.getOptimizingProcessId();
tableRuntime = tableRuntimeMeta.getTableRuntime();
optimizingType = tableRuntimeMeta.getOptimizingType();
targetSnapshotId = tableRuntimeMeta.getTargetSnapshotId();
targetChangeSnapshotId = tableRuntimeMeta.getTargetChangeSnapshotId();
planTime = tableRuntimeMeta.getPlanTime();
if (tableRuntimeMeta.getFromSequence() != null) {
fromSequence = tableRuntimeMeta.getFromSequence();
public TableOptimizingProcess(TableRuntime tableRuntime) {
processId = tableRuntime.getOptimizingProcess().getProcessId();
this.tableRuntime = tableRuntime;
optimizingType = tableRuntime.getOptimizingProcess().getOptimizingType();
targetSnapshotId = tableRuntime.getTargetSnapshotId();
targetChangeSnapshotId = tableRuntime.getTargetChangeSnapshotId();
planTime = tableRuntime.getLastPlanTime();
if (tableRuntime.getFromSequence() != null) {
fromSequence = tableRuntime.getFromSequence();
}
if (tableRuntimeMeta.getToSequence() != null) {
toSequence = tableRuntimeMeta.getToSequence();
if (tableRuntime.getToSequence() != null) {
toSequence = tableRuntime.getToSequence();
}
if (this.status != OptimizingProcess.Status.CLOSED) {
tableRuntimeMeta.getTableRuntime().recover(this);
tableRuntime.recover(this);
}
loadTaskRuntimes(this);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* limitations under the License.
*/

package org.apache.amoro.server.table;
package org.apache.amoro.server.persistence;

import org.apache.amoro.TableFormat;
import org.apache.amoro.api.config.TableConfiguration;
Expand All @@ -27,6 +27,7 @@

import java.util.Map;

/** The bean class for table used to when transfer data from/to database. */
public class TableRuntimeMeta {
private long tableId;
private String catalogName;
Expand Down Expand Up @@ -57,24 +58,8 @@ public class TableRuntimeMeta {
private Map<String, Long> fromSequence;
private Map<String, Long> toSequence;

private TableRuntime tableRuntime;

public TableRuntimeMeta() {}

public TableRuntime constructTableRuntime(TableManager initializer) {
if (tableRuntime == null) {
tableRuntime = new TableRuntime(this, initializer);
}
return tableRuntime;
}

public TableRuntime getTableRuntime() {
if (tableRuntime == null) {
throw new IllegalStateException("TableRuntime is not constructed yet.");
}
return tableRuntime;
}

public long getTargetSnapshotId() {
return targetSnapshotId;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@
package org.apache.amoro.server.persistence.mapper;

import org.apache.amoro.api.ServerTableIdentifier;
import org.apache.amoro.server.persistence.TableRuntimeMeta;
import org.apache.amoro.server.persistence.converter.JsonObjectConverter;
import org.apache.amoro.server.persistence.converter.Long2TsConverter;
import org.apache.amoro.server.persistence.converter.Map2StringConverter;
import org.apache.amoro.server.persistence.converter.MapLong2StringConverter;
import org.apache.amoro.server.table.TableMetadata;
import org.apache.amoro.server.table.TableRuntime;
import org.apache.amoro.server.table.TableRuntimeMeta;
import org.apache.ibatis.annotations.Delete;
import org.apache.ibatis.annotations.Insert;
import org.apache.ibatis.annotations.Options;
Expand Down Expand Up @@ -424,4 +424,71 @@ List<ServerTableIdentifier> selectTableIdentifiersByCatalog(
typeHandler = MapLong2StringConverter.class)
})
List<TableRuntimeMeta> selectTableRuntimeMetas();

@Select(
"<script>"
+ "<bind name=\"isMySQL\" value=\"_databaseId == 'MySQL'\" />"
+ "<bind name=\"isPostgreSQL\" value=\"_databaseId == 'PostgreSQL'\" />"
+ "<bind name=\"isDerby\" value=\"_databaseId == 'Derby'\" />"
+ "SELECT table_id, catalog_name, db_name, table_name, current_snapshot_id, "
+ "current_change_snapshotId, last_optimized_snapshotId, last_optimized_change_snapshotId, "
+ "last_major_optimizing_time, last_minor_optimizing_time, last_full_optimizing_time, optimizing_status, "
+ "optimizing_status_start_time, optimizing_process_id, "
+ "optimizer_group, table_config, pending_input FROM table_runtime "
+ "WHERE 1=1 "
+ "<if test='optimizerGroup != null'> AND optimizer_group = #{optimizerGroup} </if> "
+ "ORDER BY CASE optimizing_status "
+ "WHEN 'MAJOR_OPTIMIZING' THEN 1 "
+ "WHEN 'MINOR_OPTIMIZING' THEN 2 "
+ "WHEN 'COMMITTING' THEN 3 "
+ "WHEN 'PLANNING' THEN 4 "
+ "WHEN 'PENDING' THEN 5 "
+ "ELSE 6 END, "
+ "optimizing_status_start_time DESC "
+ "<if test='isMySQL or isPostgreSQL'> LIMIT #{limitCount} OFFSET #{offsetNum} </if>"
+ "<if test='isDerby'> OFFSET #{offsetNum} FETCH NEXT #{limitCount} ROWS ONLY </if>"
+ "</script>")
@Results({
@Result(property = "tableId", column = "table_id"),
@Result(property = "catalogName", column = "catalog_name"),
@Result(property = "dbName", column = "db_name"),
@Result(property = "tableName", column = "table_name"),
@Result(property = "currentSnapshotId", column = "current_snapshot_id"),
@Result(property = "currentChangeSnapshotId", column = "current_change_snapshotId"),
@Result(property = "lastOptimizedSnapshotId", column = "last_optimized_snapshotId"),
@Result(
property = "lastOptimizedChangeSnapshotId",
column = "last_optimized_change_snapshotId"),
@Result(
property = "lastMajorOptimizingTime",
column = "last_major_optimizing_time",
typeHandler = Long2TsConverter.class),
@Result(
property = "lastMinorOptimizingTime",
column = "last_minor_optimizing_time",
typeHandler = Long2TsConverter.class),
@Result(
property = "lastFullOptimizingTime",
column = "last_full_optimizing_time",
typeHandler = Long2TsConverter.class),
@Result(property = "tableStatus", column = "optimizing_status"),
@Result(
property = "currentStatusStartTime",
column = "optimizing_status_start_time",
typeHandler = Long2TsConverter.class),
@Result(property = "optimizingProcessId", column = "optimizing_process_id"),
@Result(property = "optimizerGroup", column = "optimizer_group"),
@Result(
property = "pendingInput",
column = "pending_input",
typeHandler = JsonObjectConverter.class),
@Result(
property = "tableConfig",
column = "table_config",
typeHandler = JsonObjectConverter.class),
})
List<TableRuntimeMeta> selectTableRuntimesForOptimizerGroup(
@Param("optimizerGroup") String optimizerGroup,
@Param("limitCount") int limitCount,
@Param("offsetNum") int offset);
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.amoro.server.manager.MetricManager;
import org.apache.amoro.server.optimizing.OptimizingStatus;
import org.apache.amoro.server.persistence.StatedPersistentBase;
import org.apache.amoro.server.persistence.TableRuntimeMeta;
import org.apache.amoro.server.persistence.mapper.CatalogMetaMapper;
import org.apache.amoro.server.persistence.mapper.TableMetaMapper;
import org.apache.amoro.server.table.blocker.TableBlocker;
Expand All @@ -55,6 +56,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -138,12 +140,6 @@ public ServerCatalog getServerCatalog(String catalogName) {
.orElseThrow(() -> new ObjectNotExistsException("Catalog " + catalogName));
}

@Override
public InternalCatalog getInternalCatalog(String catalogName) {
return Optional.ofNullable(internalCatalogMap.get(catalogName))
.orElseThrow(() -> new ObjectNotExistsException("Catalog " + catalogName));
}

@Override
public void createCatalog(CatalogMeta catalogMeta) {
checkStarted();
Expand Down Expand Up @@ -281,6 +277,19 @@ public List<Blocker> getBlockers(TableIdentifier tableIdentifier) {
.collect(Collectors.toList());
}

@Override
public List<TableRuntimeMeta> getTableRuntimes(String optimizerGroup, int limit, int offset) {
checkStarted();
return getAs(
TableMetaMapper.class,
mapper -> mapper.selectTableRuntimesForOptimizerGroup(optimizerGroup, limit, offset));
}

public InternalCatalog getInternalCatalog(String catalogName) {
return Optional.ofNullable(internalCatalogMap.get(catalogName))
.orElseThrow(() -> new ObjectNotExistsException("Catalog " + catalogName));
}

@Override
public void addHandlerChain(RuntimeHandlerChain handler) {
checkNotStarted();
Expand Down Expand Up @@ -313,15 +322,19 @@ public void initialize() {

List<TableRuntimeMeta> tableRuntimeMetaList =
getAs(TableMetaMapper.class, TableMetaMapper::selectTableRuntimeMetas);
List<TableRuntime> tableRuntimes = new ArrayList<>(tableRuntimeMetaList.size());
tableRuntimeMetaList.forEach(
tableRuntimeMeta -> {
TableRuntime tableRuntime = tableRuntimeMeta.constructTableRuntime(this);
TableRuntime tableRuntime =
new TableRuntime(
tableRuntimeMeta, this); // tableRuntimeMeta.constructTableRuntime(this);
tableRuntimeMap.put(tableRuntime.getTableIdentifier(), tableRuntime);
tableRuntime.registerMetric(MetricManager.getInstance().getGlobalRegistry());
tableRuntimes.add(tableRuntime);
});

if (headHandler != null) {
headHandler.initialize(tableRuntimeMetaList);
headHandler.initialize(tableRuntimes);
}
if (tableExplorerExecutors == null) {
int threadCount =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,16 +51,15 @@ protected void appendNext(RuntimeHandlerChain handler) {
}
}

public final void initialize(List<TableRuntimeMeta> tableRuntimeMetaList) {
List<TableRuntimeMeta> supportedtableRuntimeMetaList =
tableRuntimeMetaList.stream()
.filter(
tableRuntimeMeta -> formatSupported(tableRuntimeMeta.getTableRuntime().getFormat()))
public final void initialize(List<TableRuntime> tableRuntimes) {
List<TableRuntime> supportedtableRuntimeMetaList =
tableRuntimes.stream()
.filter(runtime -> formatSupported(runtime.getFormat()))
.collect(Collectors.toList());
initHandler(supportedtableRuntimeMetaList);
initialized = true;
if (next != null) {
next.initialize(tableRuntimeMetaList);
next.initialize(tableRuntimes);
}
}

Expand Down Expand Up @@ -147,7 +146,7 @@ protected abstract void handleConfigChanged(

protected abstract void handleTableRemoved(TableRuntime tableRuntime);

protected abstract void initHandler(List<TableRuntimeMeta> tableRuntimeMetaList);
protected abstract void initHandler(List<TableRuntime> tableRuntimeMetaList);

protected abstract void doDispose();
}
Loading

0 comments on commit c0045d9

Please sign in to comment.