Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ADH-5111] Return action response to the end user #122

Merged
merged 1 commit into from
Nov 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,6 @@ public class EchoAction extends SmartAction {

@Override
protected void execute() throws Exception {
appendLog(getArguments().get(PRINT_MESSAGE));
appendResult(getArguments().get(PRINT_MESSAGE));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,19 @@
)
public class SleepAction extends SmartAction {
public static final String TIME_IN_MS = "-ms";

private boolean started = false;
private long toSleep;
private long startTm;

private volatile long toSleep;

@Override
protected void execute() throws Exception {
if (!getArguments().containsKey(TIME_IN_MS)) {
throw new IllegalArgumentException("Time to sleep not specified (through option '"
+ TIME_IN_MS + "').");
}
toSleep = Long.valueOf(getArguments().get(TIME_IN_MS));
toSleep = Long.parseLong(getArguments().get(TIME_IN_MS));
if (toSleep == 0) {
return;
}
Expand Down
10 changes: 5 additions & 5 deletions smart-action/src/main/java/org/smartdata/action/SmartAction.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import org.smartdata.protocol.message.ActionStatus;

import java.io.PrintStream;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.util.Map;

/**
Expand Down Expand Up @@ -126,14 +126,14 @@ public float getProgress() {
return 0.0F;
}

public ActionStatus getActionStatus() throws UnsupportedEncodingException {
public ActionStatus getActionStatus() {
return new ActionStatus(
cmdletId,
lastAction,
actionId,
getProgress(),
resultOutputStream.toString("UTF-8"),
logOutputStream.toString("UTF-8"),
resultOutputStream.toString(StandardCharsets.UTF_8),
logOutputStream.toString(StandardCharsets.UTF_8),
startTime,
finishTime,
throwable,
Expand All @@ -146,7 +146,7 @@ private void stop() {
}

@VisibleForTesting
public boolean getExpectedAfterRun() throws UnsupportedEncodingException {
public boolean getExpectedAfterRun() {
ActionStatus actionStatus = getActionStatus();
return actionStatus.isFinished() && actionStatus.getThrowable() == null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,11 @@
*/
package org.smartdata.protocol.message;

import lombok.Data;

import java.util.List;

@Data
public class StatusReport implements StatusMessage {
private List<ActionStatus> actionStatuses;

public StatusReport(List<ActionStatus> actionStatuses) {
this.actionStatuses = actionStatuses;
}

public List<ActionStatus> getActionStatuses() {
return actionStatuses;
}
private final List<ActionStatus> actionStatuses;
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.smartdata.model.CmdletState;
import org.smartdata.protocol.message.ActionStatus;

import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
Expand Down Expand Up @@ -103,12 +102,12 @@ private void runAllActions() {
act.init(act.getArguments());
act.run();
if (!act.isSuccessful()) {
while (iter.hasNext()) {
SmartAction nextAct = iter.next();
synchronized (this) {
actionReportList.remove(nextAct);
}
while (iter.hasNext()) {
SmartAction nextAct = iter.next();
synchronized (this) {
actionReportList.remove(nextAct);
}
}
state = CmdletState.FAILED;
stateUpdateTime = System.currentTimeMillis();
LOG.error("Executing Cmdlet [id={}] meets failed.", getId());
Expand All @@ -125,7 +124,7 @@ public void run() {
runAllActions();
}

public synchronized List<ActionStatus> getActionStatuses() throws UnsupportedEncodingException {
public synchronized List<ActionStatus> getActionStatuses() {
if (actionReportList.isEmpty()) {
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,12 @@
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.smartdata.conf.SmartConf;
import org.smartdata.conf.SmartConfKeys;
import org.smartdata.model.CmdletState;
import org.smartdata.protocol.message.ActionStatus;
import org.smartdata.protocol.message.StatusReport;

import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
Expand All @@ -42,45 +39,42 @@
//Todo: 1. make this a interface so that we could have different executor implementation
// 2. add api providing available resource
public class CmdletExecutor {
static final Logger LOG = LoggerFactory.getLogger(CmdletExecutor.class);
private final Map<Long, Future<?>> listenableFutures;
private final Map<Long, Cmdlet> runningCmdlets;
private final Map<Long, Cmdlet> idToReportCmdlet;

private final SmartConf smartConf;
private Map<Long, Future> listenableFutures;
private Map<Long, Cmdlet> runningCmdlets;
private Map<Long, Cmdlet> idToReportCmdlet;

private ListeningExecutorService executorService;
private final ListeningExecutorService executorService;

public CmdletExecutor(SmartConf smartConf) {
this.smartConf = smartConf;
this.listenableFutures = new ConcurrentHashMap<>();
this.runningCmdlets = new ConcurrentHashMap<>();
this.idToReportCmdlet = new ConcurrentHashMap<>();
int nThreads =
int cmdletExecutorsNum =
smartConf.getInt(
SmartConfKeys.SMART_CMDLET_EXECUTORS_KEY,
SmartConfKeys.SMART_CMDLET_EXECUTORS_DEFAULT);
this.executorService = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(nThreads));
this.executorService = MoreExecutors.listeningDecorator(
Executors.newFixedThreadPool(cmdletExecutorsNum));
}

public void execute(Cmdlet cmdlet) {
ListenableFuture<?> future = this.executorService.submit(cmdlet);
ListenableFuture<?> future = executorService.submit(cmdlet);
Futures.addCallback(future, new CmdletCallBack(cmdlet), executorService);
this.listenableFutures.put(cmdlet.getId(), future);
this.runningCmdlets.put(cmdlet.getId(), cmdlet);
listenableFutures.put(cmdlet.getId(), future);
runningCmdlets.put(cmdlet.getId(), cmdlet);
idToReportCmdlet.put(cmdlet.getId(), cmdlet);
}

public void stop(Long cmdletId) {
if (this.listenableFutures.containsKey(cmdletId)) {
if (listenableFutures.containsKey(cmdletId)) {
runningCmdlets.get(cmdletId).setState(CmdletState.FAILED);
this.listenableFutures.get(cmdletId).cancel(true);
listenableFutures.get(cmdletId).cancel(true);
}
removeCmdlet(cmdletId);
}

public void shutdown() {
this.executorService.shutdown();
executorService.shutdown();
}

public StatusReport getStatusReport() {
Expand All @@ -92,24 +86,20 @@ public StatusReport getStatusReport() {
Iterator<Cmdlet> iter = idToReportCmdlet.values().iterator();
while (iter.hasNext()) {
Cmdlet cmdlet = iter.next();
try {
List<ActionStatus> statuses = cmdlet.getActionStatuses();
if (statuses != null) {
actionStatusList.addAll(statuses);
} else {
iter.remove();
}
} catch (UnsupportedEncodingException e) {
LOG.error("Get actionStatus for cmdlet [id={}] error", cmdlet.getId(), e);
List<ActionStatus> statuses = cmdlet.getActionStatuses();
if (statuses != null) {
actionStatusList.addAll(statuses);
} else {
iter.remove();
}
}

return new StatusReport(actionStatusList);
}

private void removeCmdlet(long cmdletId) {
this.runningCmdlets.remove(cmdletId);
this.listenableFutures.remove(cmdletId);
runningCmdlets.remove(cmdletId);
listenableFutures.remove(cmdletId);
}

private class CmdletCallBack implements FutureCallback<Object> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,18 @@
import org.smartdata.protocol.message.StatusReporter;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

public class StatusReportTask implements Runnable {
private StatusReporter statusReporter;
private CmdletExecutor cmdletExecutor;
private final StatusReporter statusReporter;
private final CmdletExecutor cmdletExecutor;
private final int interval;
private final double ratio;
private final Map<Long, ActionStatus> idToActionStatus;
private long lastReportTime;
private int interval;
public double ratio;
private Map<Long, ActionStatus> idToActionStatus;

public StatusReportTask(
StatusReporter statusReporter, CmdletExecutor cmdletExecutor, SmartConf conf) {
Expand All @@ -51,29 +52,26 @@ public StatusReportTask(
this.idToActionStatus = new HashMap<>();
}

@Override
public void run() {
StatusReport statusReport = cmdletExecutor.getStatusReport();
if (statusReport != null) {
List<ActionStatus> actionStatuses = statusReport.getActionStatuses();
for (ActionStatus actionStatus : actionStatuses) {
idToActionStatus.put(actionStatus.getActionId(), actionStatus);
}
if (!idToActionStatus.values().isEmpty()) {
int finishedNum = 0;
for (ActionStatus actionStatus : idToActionStatus.values()) {
if (actionStatus.isFinished()) {
finishedNum++;
}
}
long currentTime = System.currentTimeMillis();
if (currentTime - lastReportTime >= interval
|| (double) finishedNum / idToActionStatus.size() >= ratio) {
statusReporter.report(new StatusReport(new ArrayList(idToActionStatus.values())));
idToActionStatus.clear();
lastReportTime = currentTime;
}
}
Optional.ofNullable(cmdletExecutor.getStatusReport())
.map(StatusReport::getActionStatuses)
.orElseGet(Collections::emptyList)
.forEach(status -> idToActionStatus.put(status.getActionId(), status));

if (idToActionStatus.isEmpty()) {
return;
}
long finishedNum = idToActionStatus.values()
.stream()
.filter(ActionStatus::isFinished)
.count();

long currentTime = System.currentTimeMillis();
if (currentTime - lastReportTime >= interval
|| (double) finishedNum / idToActionStatus.size() >= ratio) {
statusReporter.report(new StatusReport(new ArrayList<>(idToActionStatus.values())));
idToActionStatus.clear();
lastReportTime = currentTime;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@
*/
package org.smartdata.hdfs.action;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.smartdata.action.annotation.ActionSignature;

import java.util.Map;
import java.util.Optional;

/**
* An action to check the EC policy for a file or dir.
Expand All @@ -43,12 +45,15 @@ public void init(Map<String, String> args) {

@Override
public void execute() throws Exception {
ErasureCodingPolicy srcEcPolicy = dfsClient.getErasureCodingPolicy(srcPath);
if (srcEcPolicy == null) {
appendLog(RESULT_OF_NULL_EC_POLICY);
} else {
appendLog(srcEcPolicy.toString());
if (StringUtils.isBlank(srcPath)) {
throw new IllegalArgumentException("File parameter is missing! ");
}

String result = Optional.ofNullable(dfsClient.getErasureCodingPolicy(srcPath))
.map(ErasureCodingPolicy::toString)
.orElse(RESULT_OF_NULL_EC_POLICY);

appendResult(result);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public void init(Map<String, String> args) {
@Override
public void execute() throws Exception {
for (ErasureCodingPolicyInfo policyInfo : dfsClient.getErasureCodingPolicies()) {
appendLog("{" + policyInfo.toString() + "}");
appendResult("{" + policyInfo.toString() + "}");
}
}

Expand Down

This file was deleted.

Loading
Loading