Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[AMORO-3195]: OptimizerController decoupling to OptimizerGroupController #3216

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.amoro.server.dashboard.controller.HealthCheckController;
import org.apache.amoro.server.dashboard.controller.LoginController;
import org.apache.amoro.server.dashboard.controller.OptimizerController;
import org.apache.amoro.server.dashboard.controller.OptimizerGroupController;
import org.apache.amoro.server.dashboard.controller.OverviewController;
import org.apache.amoro.server.dashboard.controller.PlatformFileInfoController;
import org.apache.amoro.server.dashboard.controller.SettingController;
Expand Down Expand Up @@ -77,6 +78,7 @@ public class DashboardServer {
private final CatalogController catalogController;
private final HealthCheckController healthCheckController;
private final LoginController loginController;
private final OptimizerGroupController optimizerGroupController;
private final OptimizerController optimizerController;
private final PlatformFileInfoController platformFileInfoController;
private final SettingController settingController;
Expand All @@ -98,7 +100,8 @@ public DashboardServer(
this.catalogController = new CatalogController(tableService, platformFileManager);
this.healthCheckController = new HealthCheckController();
this.loginController = new LoginController(serviceConfig);
this.optimizerController = new OptimizerController(tableService, optimizerManager);
this.optimizerGroupController = new OptimizerGroupController(tableService, optimizerManager);
this.optimizerController = new OptimizerController(optimizerManager);
this.platformFileInfoController = new PlatformFileInfoController(platformFileManager);
this.settingController = new SettingController(serviceConfig, optimizerManager);
ServerTableDescriptor tableDescriptor = new ServerTableDescriptor(tableService, serviceConfig);
Expand Down Expand Up @@ -274,26 +277,29 @@ private EndpointGroup apiGroup() {
() -> {
get(
"/optimizerGroups/{optimizerGroup}/tables",
optimizerController::getOptimizerTables);
get("/optimizerGroups/{optimizerGroup}/optimizers", optimizerController::getOptimizers);
get("/optimizerGroups", optimizerController::getOptimizerGroups);
optimizerGroupController::getOptimizerTables);
get(
"/optimizerGroups/{optimizerGroup}/optimizers",
optimizerGroupController::getOptimizers);
get("/optimizerGroups", optimizerGroupController::getOptimizerGroups);
get(
"/optimizerGroups/{optimizerGroup}/info",
optimizerController::getOptimizerGroupInfo);
delete(
"/optimizerGroups/{optimizerGroup}/optimizers/{jobId}",
optimizerController::releaseOptimizer);
optimizerGroupController::getOptimizerGroupInfo);
post(
"/optimizerGroups/{optimizerGroup}/optimizers",
optimizerController::scaleOutOptimizer);
get("/resourceGroups", optimizerController::getResourceGroup);
post("/resourceGroups", optimizerController::createResourceGroup);
put("/resourceGroups", optimizerController::updateResourceGroup);
delete("/resourceGroups/{resourceGroupName}", optimizerController::deleteResourceGroup);
optimizerGroupController::scaleOutOptimizer);
post("/optimizers", optimizerController::createOptimizer);
delete("/optimizers/{jobId}", optimizerController::releaseOptimizer);
get("/resourceGroups", optimizerGroupController::getResourceGroup);
post("/resourceGroups", optimizerGroupController::createResourceGroup);
put("/resourceGroups", optimizerGroupController::updateResourceGroup);
delete(
"/resourceGroups/{resourceGroupName}",
optimizerGroupController::deleteResourceGroup);
get(
"/resourceGroups/{resourceGroupName}/delete/check",
optimizerController::deleteCheckResourceGroup);
get("/containers/get", optimizerController::getContainers);
optimizerGroupController::deleteCheckResourceGroup);
get("/containers/get", optimizerGroupController::getContainers);
});

// console apis
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,137 +23,25 @@
import org.apache.amoro.resource.ResourceGroup;
import org.apache.amoro.resource.ResourceType;
import org.apache.amoro.server.DefaultOptimizingService;
import org.apache.amoro.server.dashboard.model.OptimizerInstanceInfo;
import org.apache.amoro.server.dashboard.model.OptimizerResourceInfo;
import org.apache.amoro.server.dashboard.model.TableOptimizingInfo;
import org.apache.amoro.server.dashboard.response.OkResponse;
import org.apache.amoro.server.dashboard.response.PageResult;
import org.apache.amoro.server.dashboard.utils.OptimizingUtil;
import org.apache.amoro.server.persistence.TableRuntimeMeta;
import org.apache.amoro.server.resource.ContainerMetadata;
import org.apache.amoro.server.resource.OptimizerInstance;
import org.apache.amoro.server.resource.ResourceContainers;
import org.apache.amoro.server.table.TableRuntime;
import org.apache.amoro.server.table.TableService;
import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;

import javax.ws.rs.BadRequestException;

import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/** The controller that handles optimizer requests. */
public class OptimizerController {
private static final String ALL_GROUP = "all";
private final TableService tableService;

private final DefaultOptimizingService optimizerManager;

public OptimizerController(TableService tableService, DefaultOptimizingService optimizerManager) {
this.tableService = tableService;
public OptimizerController(DefaultOptimizingService optimizerManager) {
this.optimizerManager = optimizerManager;
}

/** Get optimize tables. * @return List of {@link TableOptimizingInfo} */
public void getOptimizerTables(Context ctx) {
String optimizerGroup = ctx.pathParam("optimizerGroup");
String dbFilterStr = ctx.queryParam("dbSearchInput");
String tableFilterStr = ctx.queryParam("tableSearchInput");
Integer page = ctx.queryParamAsClass("page", Integer.class).getOrDefault(1);
Integer pageSize = ctx.queryParamAsClass("pageSize", Integer.class).getOrDefault(20);
int offset = (page - 1) * pageSize;

String optimizerGroupUsedInDbFilter = ALL_GROUP.equals(optimizerGroup) ? null : optimizerGroup;
// get all info from underlying table table_runtime
List<TableRuntimeMeta> tableRuntimeBeans =
tableService.getTableRuntimes(
optimizerGroupUsedInDbFilter, dbFilterStr, tableFilterStr, pageSize, offset);

List<TableRuntime> tableRuntimes =
tableRuntimeBeans.stream()
.map(meta -> tableService.getRuntime(meta.getTableId()))
.collect(Collectors.toList());

PageResult<TableOptimizingInfo> amsPageResult =
PageResult.of(tableRuntimes, offset, pageSize, OptimizingUtil::buildTableOptimizeInfo);
ctx.json(OkResponse.of(amsPageResult));
}

/** get optimizers. */
public void getOptimizers(Context ctx) {
String optimizerGroup = ctx.pathParam("optimizerGroup");
Integer page = ctx.queryParamAsClass("page", Integer.class).getOrDefault(1);
Integer pageSize = ctx.queryParamAsClass("pageSize", Integer.class).getOrDefault(20);

int offset = (page - 1) * pageSize;
List<OptimizerInstance> optimizers;
if (optimizerGroup.equals("all")) {
optimizers = optimizerManager.listOptimizers();
} else {
optimizers = optimizerManager.listOptimizers(optimizerGroup);
}
List<OptimizerInstance> optimizerList = new ArrayList<>(optimizers);
optimizerList.sort(Comparator.comparingLong(OptimizerInstance::getStartTime).reversed());
List<OptimizerInstanceInfo> result =
optimizerList.stream()
.map(
e ->
OptimizerInstanceInfo.builder()
.token(e.getToken())
.startTime(e.getStartTime())
.touchTime(e.getTouchTime())
.jobId(e.getResourceId())
.groupName(e.getGroupName())
.coreNumber(e.getThreadCount())
.memory(e.getMemoryMb())
.jobStatus("RUNNING")
.container(e.getContainerName())
.build())
.collect(Collectors.toList());

PageResult<OptimizerInstanceInfo> amsPageResult = PageResult.of(result, offset, pageSize);
ctx.json(OkResponse.of(amsPageResult));
}

/** get optimizerGroup: optimizerGroupId, optimizerGroupName url = /optimizerGroups. */
public void getOptimizerGroups(Context ctx) {
List<Map<String, String>> result =
optimizerManager.listResourceGroups().stream()
.filter(
resourceGroup ->
!ResourceContainers.EXTERNAL_CONTAINER_NAME.equals(
resourceGroup.getContainer()))
.map(
e -> {
Map<String, String> mapObj = new HashMap<>();
mapObj.put("optimizerGroupName", e.getName());
return mapObj;
})
.collect(Collectors.toList());
ctx.json(OkResponse.of(result));
}

/** get optimizer info: occupationCore, occupationMemory */
public void getOptimizerGroupInfo(Context ctx) {
String optimizerGroup = ctx.pathParam("optimizerGroup");
List<OptimizerInstance> optimizers;
if (optimizerGroup.equals("all")) {
optimizers = optimizerManager.listOptimizers();
} else {
optimizers = optimizerManager.listOptimizers(optimizerGroup);
}
OptimizerResourceInfo optimizerResourceInfo = new OptimizerResourceInfo();
optimizers.forEach(
e -> {
optimizerResourceInfo.addOccupationCore(e.getThreadCount());
optimizerResourceInfo.addOccupationMemory(e.getMemoryMb());
});
ctx.json(OkResponse.of(optimizerResourceInfo));
}

/**
* release optimizer.
*
Expand All @@ -180,12 +68,11 @@ public void releaseOptimizer(Context ctx) {
ctx.json(OkResponse.of("Success to release optimizer"));
}

/** scale out optimizers, url:/optimizerGroups/{optimizerGroup}/optimizers. */
public void scaleOutOptimizer(Context ctx) {
String optimizerGroup = ctx.pathParam("optimizerGroup");
Map<String, Integer> map = ctx.bodyAsClass(Map.class);
int parallelism = map.get("parallelism");

/** scale out optimizers, url:/optimizers. */
public void createOptimizer(Context ctx) {
Map<String, Object> map = ctx.bodyAsClass(Map.class);
int parallelism = Integer.parseInt(map.get("parallelism").toString());
String optimizerGroup = map.get("optimizerGroup").toString();
ResourceGroup resourceGroup = optimizerManager.getResourceGroup(optimizerGroup);
Resource resource =
new Resource.Builder(
Expand All @@ -195,75 +82,7 @@ public void scaleOutOptimizer(Context ctx) {
.build();
ResourceContainers.get(resource.getContainerName()).requestResource(resource);
optimizerManager.createResource(resource);
ctx.json(OkResponse.of("success to scaleOut optimizer"));
}

/** get {@link List<OptimizerResourceInfo>} url = /optimize/resourceGroups */
public void getResourceGroup(Context ctx) {
List<OptimizerResourceInfo> result =
optimizerManager.listResourceGroups().stream()
.map(
group -> {
List<OptimizerInstance> optimizers =
optimizerManager.listOptimizers(group.getName());
OptimizerResourceInfo optimizerResourceInfo = new OptimizerResourceInfo();
optimizerResourceInfo.setResourceGroup(
optimizerManager.getResourceGroup(group.getName()));
optimizers.forEach(
optimizer -> {
optimizerResourceInfo.addOccupationCore(optimizer.getThreadCount());
optimizerResourceInfo.addOccupationMemory(optimizer.getMemoryMb());
});
return optimizerResourceInfo;
})
.collect(Collectors.toList());
ctx.json(OkResponse.of(result));
}

/**
* create optimizeGroup: name, container, schedulePolicy, properties url =
* /optimize/resourceGroups/create
*/
public void createResourceGroup(Context ctx) {
Map<String, Object> map = ctx.bodyAsClass(Map.class);
String name = (String) map.get("name");
String container = (String) map.get("container");
Map<String, String> properties = (Map) map.get("properties");
if (optimizerManager.getResourceGroup(name) != null) {
throw new BadRequestException(String.format("Optimizer group:%s already existed.", name));
}
ResourceGroup.Builder builder = new ResourceGroup.Builder(name, container);
builder.addProperties(properties);
optimizerManager.createResourceGroup(builder.build());
ctx.json(OkResponse.of("The optimizer group has been successfully created."));
}

/**
* update optimizeGroup: name, container, schedulePolicy, properties url =
* /optimize/resourceGroups/update
*/
public void updateResourceGroup(Context ctx) {
Map<String, Object> map = ctx.bodyAsClass(Map.class);
String name = (String) map.get("name");
String container = (String) map.get("container");
Map<String, String> properties = (Map) map.get("properties");
ResourceGroup.Builder builder = new ResourceGroup.Builder(name, container);
builder.addProperties(properties);
optimizerManager.updateResourceGroup(builder.build());
ctx.json(OkResponse.of("The optimizer group has been successfully updated."));
}

/** delete optimizeGroup url = /optimize/resourceGroups/{resourceGroupName} */
public void deleteResourceGroup(Context ctx) {
String name = ctx.pathParam("resourceGroupName");
optimizerManager.deleteResourceGroup(name);
ctx.json(OkResponse.of("The optimizer group has been successfully deleted."));
}

/** check if optimizerGroup can be deleted url = /optimize/resourceGroups/delete/check */
public void deleteCheckResourceGroup(Context ctx) {
String name = ctx.pathParam("resourceGroupName");
ctx.json(OkResponse.of(optimizerManager.canDeleteResourceGroup(name)));
ctx.json(OkResponse.of("success to create optimizer"));
}

/** check if optimizerGroup can be deleted url = /optimize/containers/get */
Expand Down
Loading
Loading