Skip to content

Commit

Permalink
[AMORO-3195] OptimizerController decoupling to OptimizerGroupController
Browse files Browse the repository at this point in the history
  • Loading branch information
mansonliwh committed Sep 24, 2024
1 parent 3f627ec commit 0d57b60
Show file tree
Hide file tree
Showing 8 changed files with 473 additions and 206 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.amoro.server.dashboard.controller.HealthCheckController;
import org.apache.amoro.server.dashboard.controller.LoginController;
import org.apache.amoro.server.dashboard.controller.OptimizerController;
import org.apache.amoro.server.dashboard.controller.OptimizerGroupController;
import org.apache.amoro.server.dashboard.controller.OverviewController;
import org.apache.amoro.server.dashboard.controller.PlatformFileInfoController;
import org.apache.amoro.server.dashboard.controller.SettingController;
Expand Down Expand Up @@ -77,6 +78,7 @@ public class DashboardServer {
private final CatalogController catalogController;
private final HealthCheckController healthCheckController;
private final LoginController loginController;
private final OptimizerGroupController optimizerGroupController;
private final OptimizerController optimizerController;
private final PlatformFileInfoController platformFileInfoController;
private final SettingController settingController;
Expand All @@ -98,7 +100,8 @@ public DashboardServer(
this.catalogController = new CatalogController(tableService, platformFileManager);
this.healthCheckController = new HealthCheckController();
this.loginController = new LoginController(serviceConfig);
this.optimizerController = new OptimizerController(tableService, optimizerManager);
this.optimizerGroupController = new OptimizerGroupController(tableService, optimizerManager);
this.optimizerController = new OptimizerController(optimizerManager);
this.platformFileInfoController = new PlatformFileInfoController(platformFileManager);
this.settingController = new SettingController(serviceConfig, optimizerManager);
ServerTableDescriptor tableDescriptor = new ServerTableDescriptor(tableService, serviceConfig);
Expand Down Expand Up @@ -274,26 +277,29 @@ private EndpointGroup apiGroup() {
() -> {
get(
"/optimizerGroups/{optimizerGroup}/tables",
optimizerController::getOptimizerTables);
get("/optimizerGroups/{optimizerGroup}/optimizers", optimizerController::getOptimizers);
get("/optimizerGroups", optimizerController::getOptimizerGroups);
optimizerGroupController::getOptimizerTables);
get(
"/optimizerGroups/{optimizerGroup}/optimizers",
optimizerGroupController::getOptimizers);
get("/optimizerGroups", optimizerGroupController::getOptimizerGroups);
get(
"/optimizerGroups/{optimizerGroup}/info",
optimizerController::getOptimizerGroupInfo);
delete(
"/optimizerGroups/{optimizerGroup}/optimizers/{jobId}",
optimizerController::releaseOptimizer);
optimizerGroupController::getOptimizerGroupInfo);
post(
"/optimizerGroups/{optimizerGroup}/optimizers",
optimizerController::scaleOutOptimizer);
get("/resourceGroups", optimizerController::getResourceGroup);
post("/resourceGroups", optimizerController::createResourceGroup);
put("/resourceGroups", optimizerController::updateResourceGroup);
delete("/resourceGroups/{resourceGroupName}", optimizerController::deleteResourceGroup);
optimizerGroupController::scaleOutOptimizer);
post("/optimizers", optimizerController::createOptimizer);
delete("/optimizers/{jobId}", optimizerController::releaseOptimizer);
get("/resourceGroups", optimizerGroupController::getResourceGroup);
post("/resourceGroups", optimizerGroupController::createResourceGroup);
put("/resourceGroups", optimizerGroupController::updateResourceGroup);
delete(
"/resourceGroups/{resourceGroupName}",
optimizerGroupController::deleteResourceGroup);
get(
"/resourceGroups/{resourceGroupName}/delete/check",
optimizerController::deleteCheckResourceGroup);
get("/containers/get", optimizerController::getContainers);
optimizerGroupController::deleteCheckResourceGroup);
get("/containers/get", optimizerGroupController::getContainers);
});

// console apis
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,137 +23,25 @@
import org.apache.amoro.resource.ResourceGroup;
import org.apache.amoro.resource.ResourceType;
import org.apache.amoro.server.DefaultOptimizingService;
import org.apache.amoro.server.dashboard.model.OptimizerInstanceInfo;
import org.apache.amoro.server.dashboard.model.OptimizerResourceInfo;
import org.apache.amoro.server.dashboard.model.TableOptimizingInfo;
import org.apache.amoro.server.dashboard.response.OkResponse;
import org.apache.amoro.server.dashboard.response.PageResult;
import org.apache.amoro.server.dashboard.utils.OptimizingUtil;
import org.apache.amoro.server.persistence.TableRuntimeMeta;
import org.apache.amoro.server.resource.ContainerMetadata;
import org.apache.amoro.server.resource.OptimizerInstance;
import org.apache.amoro.server.resource.ResourceContainers;
import org.apache.amoro.server.table.TableRuntime;
import org.apache.amoro.server.table.TableService;
import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;

import javax.ws.rs.BadRequestException;

import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/** The controller that handles optimizer requests. */
public class OptimizerController {
private static final String ALL_GROUP = "all";
private final TableService tableService;

private final DefaultOptimizingService optimizerManager;

public OptimizerController(TableService tableService, DefaultOptimizingService optimizerManager) {
this.tableService = tableService;
public OptimizerController(DefaultOptimizingService optimizerManager) {
this.optimizerManager = optimizerManager;
}

/** Get optimize tables. * @return List of {@link TableOptimizingInfo} */
public void getOptimizerTables(Context ctx) {
String optimizerGroup = ctx.pathParam("optimizerGroup");
String dbFilterStr = ctx.queryParam("dbSearchInput");
String tableFilterStr = ctx.queryParam("tableSearchInput");
Integer page = ctx.queryParamAsClass("page", Integer.class).getOrDefault(1);
Integer pageSize = ctx.queryParamAsClass("pageSize", Integer.class).getOrDefault(20);
int offset = (page - 1) * pageSize;

String optimizerGroupUsedInDbFilter = ALL_GROUP.equals(optimizerGroup) ? null : optimizerGroup;
// get all info from underlying table table_runtime
List<TableRuntimeMeta> tableRuntimeBeans =
tableService.getTableRuntimes(
optimizerGroupUsedInDbFilter, dbFilterStr, tableFilterStr, pageSize, offset);

List<TableRuntime> tableRuntimes =
tableRuntimeBeans.stream()
.map(meta -> tableService.getRuntime(meta.getTableId()))
.collect(Collectors.toList());

PageResult<TableOptimizingInfo> amsPageResult =
PageResult.of(tableRuntimes, offset, pageSize, OptimizingUtil::buildTableOptimizeInfo);
ctx.json(OkResponse.of(amsPageResult));
}

/** get optimizers. */
public void getOptimizers(Context ctx) {
String optimizerGroup = ctx.pathParam("optimizerGroup");
Integer page = ctx.queryParamAsClass("page", Integer.class).getOrDefault(1);
Integer pageSize = ctx.queryParamAsClass("pageSize", Integer.class).getOrDefault(20);

int offset = (page - 1) * pageSize;
List<OptimizerInstance> optimizers;
if (optimizerGroup.equals("all")) {
optimizers = optimizerManager.listOptimizers();
} else {
optimizers = optimizerManager.listOptimizers(optimizerGroup);
}
List<OptimizerInstance> optimizerList = new ArrayList<>(optimizers);
optimizerList.sort(Comparator.comparingLong(OptimizerInstance::getStartTime).reversed());
List<OptimizerInstanceInfo> result =
optimizerList.stream()
.map(
e ->
OptimizerInstanceInfo.builder()
.token(e.getToken())
.startTime(e.getStartTime())
.touchTime(e.getTouchTime())
.jobId(e.getResourceId())
.groupName(e.getGroupName())
.coreNumber(e.getThreadCount())
.memory(e.getMemoryMb())
.jobStatus("RUNNING")
.container(e.getContainerName())
.build())
.collect(Collectors.toList());

PageResult<OptimizerInstanceInfo> amsPageResult = PageResult.of(result, offset, pageSize);
ctx.json(OkResponse.of(amsPageResult));
}

/** get optimizerGroup: optimizerGroupId, optimizerGroupName url = /optimizerGroups. */
public void getOptimizerGroups(Context ctx) {
List<Map<String, String>> result =
optimizerManager.listResourceGroups().stream()
.filter(
resourceGroup ->
!ResourceContainers.EXTERNAL_CONTAINER_NAME.equals(
resourceGroup.getContainer()))
.map(
e -> {
Map<String, String> mapObj = new HashMap<>();
mapObj.put("optimizerGroupName", e.getName());
return mapObj;
})
.collect(Collectors.toList());
ctx.json(OkResponse.of(result));
}

/** get optimizer info: occupationCore, occupationMemory */
public void getOptimizerGroupInfo(Context ctx) {
String optimizerGroup = ctx.pathParam("optimizerGroup");
List<OptimizerInstance> optimizers;
if (optimizerGroup.equals("all")) {
optimizers = optimizerManager.listOptimizers();
} else {
optimizers = optimizerManager.listOptimizers(optimizerGroup);
}
OptimizerResourceInfo optimizerResourceInfo = new OptimizerResourceInfo();
optimizers.forEach(
e -> {
optimizerResourceInfo.addOccupationCore(e.getThreadCount());
optimizerResourceInfo.addOccupationMemory(e.getMemoryMb());
});
ctx.json(OkResponse.of(optimizerResourceInfo));
}

/**
* release optimizer.
*
Expand All @@ -180,12 +68,11 @@ public void releaseOptimizer(Context ctx) {
ctx.json(OkResponse.of("Success to release optimizer"));
}

/** scale out optimizers, url:/optimizerGroups/{optimizerGroup}/optimizers. */
public void scaleOutOptimizer(Context ctx) {
String optimizerGroup = ctx.pathParam("optimizerGroup");
Map<String, Integer> map = ctx.bodyAsClass(Map.class);
int parallelism = map.get("parallelism");

/** scale out optimizers, url:/optimizers. */
public void createOptimizer(Context ctx) {
Map<String, Object> map = ctx.bodyAsClass(Map.class);
int parallelism = Integer.parseInt(map.get("parallelism").toString());
String optimizerGroup = map.get("optimizerGroup").toString();
ResourceGroup resourceGroup = optimizerManager.getResourceGroup(optimizerGroup);
Resource resource =
new Resource.Builder(
Expand All @@ -195,75 +82,7 @@ public void scaleOutOptimizer(Context ctx) {
.build();
ResourceContainers.get(resource.getContainerName()).requestResource(resource);
optimizerManager.createResource(resource);
ctx.json(OkResponse.of("success to scaleOut optimizer"));
}

/** get {@link List<OptimizerResourceInfo>} url = /optimize/resourceGroups */
public void getResourceGroup(Context ctx) {
List<OptimizerResourceInfo> result =
optimizerManager.listResourceGroups().stream()
.map(
group -> {
List<OptimizerInstance> optimizers =
optimizerManager.listOptimizers(group.getName());
OptimizerResourceInfo optimizerResourceInfo = new OptimizerResourceInfo();
optimizerResourceInfo.setResourceGroup(
optimizerManager.getResourceGroup(group.getName()));
optimizers.forEach(
optimizer -> {
optimizerResourceInfo.addOccupationCore(optimizer.getThreadCount());
optimizerResourceInfo.addOccupationMemory(optimizer.getMemoryMb());
});
return optimizerResourceInfo;
})
.collect(Collectors.toList());
ctx.json(OkResponse.of(result));
}

/**
* create optimizeGroup: name, container, schedulePolicy, properties url =
* /optimize/resourceGroups/create
*/
public void createResourceGroup(Context ctx) {
Map<String, Object> map = ctx.bodyAsClass(Map.class);
String name = (String) map.get("name");
String container = (String) map.get("container");
Map<String, String> properties = (Map) map.get("properties");
if (optimizerManager.getResourceGroup(name) != null) {
throw new BadRequestException(String.format("Optimizer group:%s already existed.", name));
}
ResourceGroup.Builder builder = new ResourceGroup.Builder(name, container);
builder.addProperties(properties);
optimizerManager.createResourceGroup(builder.build());
ctx.json(OkResponse.of("The optimizer group has been successfully created."));
}

/**
* update optimizeGroup: name, container, schedulePolicy, properties url =
* /optimize/resourceGroups/update
*/
public void updateResourceGroup(Context ctx) {
Map<String, Object> map = ctx.bodyAsClass(Map.class);
String name = (String) map.get("name");
String container = (String) map.get("container");
Map<String, String> properties = (Map) map.get("properties");
ResourceGroup.Builder builder = new ResourceGroup.Builder(name, container);
builder.addProperties(properties);
optimizerManager.updateResourceGroup(builder.build());
ctx.json(OkResponse.of("The optimizer group has been successfully updated."));
}

/** delete optimizeGroup url = /optimize/resourceGroups/{resourceGroupName} */
public void deleteResourceGroup(Context ctx) {
String name = ctx.pathParam("resourceGroupName");
optimizerManager.deleteResourceGroup(name);
ctx.json(OkResponse.of("The optimizer group has been successfully deleted."));
}

/** check if optimizerGroup can be deleted url = /optimize/resourceGroups/delete/check */
public void deleteCheckResourceGroup(Context ctx) {
String name = ctx.pathParam("resourceGroupName");
ctx.json(OkResponse.of(optimizerManager.canDeleteResourceGroup(name)));
ctx.json(OkResponse.of("success to create optimizer"));
}

/** check if optimizerGroup can be deleted url = /optimize/containers/get */
Expand Down
Loading

0 comments on commit 0d57b60

Please sign in to comment.