Skip to content

Commit

Permalink
[ADH-5111] Return action response to the end user
Browse files Browse the repository at this point in the history
  • Loading branch information
tigrulya-exe committed Oct 11, 2024
1 parent ca012b4 commit 0239cc4
Show file tree
Hide file tree
Showing 23 changed files with 409 additions and 444 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,6 @@ public class EchoAction extends SmartAction {

@Override
protected void execute() throws Exception {
appendLog(getArguments().get(PRINT_MESSAGE));
appendResult(getArguments().get(PRINT_MESSAGE));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,19 @@
)
public class SleepAction extends SmartAction {
public static final String TIME_IN_MS = "-ms";

private boolean started = false;
private long toSleep;
private long startTm;

private volatile long toSleep;

@Override
protected void execute() throws Exception {
if (!getArguments().containsKey(TIME_IN_MS)) {
throw new IllegalArgumentException("Time to sleep not specified (through option '"
+ TIME_IN_MS + "').");
}
toSleep = Long.valueOf(getArguments().get(TIME_IN_MS));
toSleep = Long.parseLong(getArguments().get(TIME_IN_MS));
if (toSleep == 0) {
return;
}
Expand Down
10 changes: 5 additions & 5 deletions smart-action/src/main/java/org/smartdata/action/SmartAction.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import org.smartdata.protocol.message.ActionStatus;

import java.io.PrintStream;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.util.Map;

/**
Expand Down Expand Up @@ -126,14 +126,14 @@ public float getProgress() {
return 0.0F;
}

public ActionStatus getActionStatus() throws UnsupportedEncodingException {
public ActionStatus getActionStatus() {
return new ActionStatus(
cmdletId,
lastAction,
actionId,
getProgress(),
resultOutputStream.toString("UTF-8"),
logOutputStream.toString("UTF-8"),
resultOutputStream.toString(StandardCharsets.UTF_8),
logOutputStream.toString(StandardCharsets.UTF_8),
startTime,
finishTime,
throwable,
Expand All @@ -146,7 +146,7 @@ private void stop() {
}

@VisibleForTesting
public boolean getExpectedAfterRun() throws UnsupportedEncodingException {
public boolean getExpectedAfterRun() {
ActionStatus actionStatus = getActionStatus();
return actionStatus.isFinished() && actionStatus.getThrowable() == null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,11 @@
*/
package org.smartdata.protocol.message;

import lombok.Data;

import java.util.List;

@Data
public class StatusReport implements StatusMessage {
private List<ActionStatus> actionStatuses;

public StatusReport(List<ActionStatus> actionStatuses) {
this.actionStatuses = actionStatuses;
}

public List<ActionStatus> getActionStatuses() {
return actionStatuses;
}
private final List<ActionStatus> actionStatuses;
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.smartdata.model.CmdletState;
import org.smartdata.protocol.message.ActionStatus;

import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
Expand Down Expand Up @@ -103,12 +102,12 @@ private void runAllActions() {
act.init(act.getArguments());
act.run();
if (!act.isSuccessful()) {
while (iter.hasNext()) {
SmartAction nextAct = iter.next();
synchronized (this) {
actionReportList.remove(nextAct);
}
while (iter.hasNext()) {
SmartAction nextAct = iter.next();
synchronized (this) {
actionReportList.remove(nextAct);
}
}
state = CmdletState.FAILED;
stateUpdateTime = System.currentTimeMillis();
LOG.error("Executing Cmdlet [id={}] meets failed.", getId());
Expand All @@ -125,7 +124,7 @@ public void run() {
runAllActions();
}

public synchronized List<ActionStatus> getActionStatuses() throws UnsupportedEncodingException {
public synchronized List<ActionStatus> getActionStatuses() {
if (actionReportList.isEmpty()) {
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,12 @@
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.smartdata.conf.SmartConf;
import org.smartdata.conf.SmartConfKeys;
import org.smartdata.model.CmdletState;
import org.smartdata.protocol.message.ActionStatus;
import org.smartdata.protocol.message.StatusReport;

import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
Expand All @@ -42,45 +39,42 @@
//Todo: 1. make this a interface so that we could have different executor implementation
// 2. add api providing available resource
public class CmdletExecutor {
static final Logger LOG = LoggerFactory.getLogger(CmdletExecutor.class);
private final Map<Long, Future<?>> listenableFutures;
private final Map<Long, Cmdlet> runningCmdlets;
private final Map<Long, Cmdlet> idToReportCmdlet;

private final SmartConf smartConf;
private Map<Long, Future> listenableFutures;
private Map<Long, Cmdlet> runningCmdlets;
private Map<Long, Cmdlet> idToReportCmdlet;

private ListeningExecutorService executorService;
private final ListeningExecutorService executorService;

public CmdletExecutor(SmartConf smartConf) {
this.smartConf = smartConf;
this.listenableFutures = new ConcurrentHashMap<>();
this.runningCmdlets = new ConcurrentHashMap<>();
this.idToReportCmdlet = new ConcurrentHashMap<>();
int nThreads =
int cmdletExecutorsNum =
smartConf.getInt(
SmartConfKeys.SMART_CMDLET_EXECUTORS_KEY,
SmartConfKeys.SMART_CMDLET_EXECUTORS_DEFAULT);
this.executorService = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(nThreads));
this.executorService = MoreExecutors.listeningDecorator(
Executors.newFixedThreadPool(cmdletExecutorsNum));
}

public void execute(Cmdlet cmdlet) {
ListenableFuture<?> future = this.executorService.submit(cmdlet);
ListenableFuture<?> future = executorService.submit(cmdlet);
Futures.addCallback(future, new CmdletCallBack(cmdlet), executorService);
this.listenableFutures.put(cmdlet.getId(), future);
this.runningCmdlets.put(cmdlet.getId(), cmdlet);
listenableFutures.put(cmdlet.getId(), future);
runningCmdlets.put(cmdlet.getId(), cmdlet);
idToReportCmdlet.put(cmdlet.getId(), cmdlet);
}

public void stop(Long cmdletId) {
if (this.listenableFutures.containsKey(cmdletId)) {
if (listenableFutures.containsKey(cmdletId)) {
runningCmdlets.get(cmdletId).setState(CmdletState.FAILED);
this.listenableFutures.get(cmdletId).cancel(true);
listenableFutures.get(cmdletId).cancel(true);
}
removeCmdlet(cmdletId);
}

public void shutdown() {
this.executorService.shutdown();
executorService.shutdown();
}

public StatusReport getStatusReport() {
Expand All @@ -92,24 +86,20 @@ public StatusReport getStatusReport() {
Iterator<Cmdlet> iter = idToReportCmdlet.values().iterator();
while (iter.hasNext()) {
Cmdlet cmdlet = iter.next();
try {
List<ActionStatus> statuses = cmdlet.getActionStatuses();
if (statuses != null) {
actionStatusList.addAll(statuses);
} else {
iter.remove();
}
} catch (UnsupportedEncodingException e) {
LOG.error("Get actionStatus for cmdlet [id={}] error", cmdlet.getId(), e);
List<ActionStatus> statuses = cmdlet.getActionStatuses();
if (statuses != null) {
actionStatusList.addAll(statuses);
} else {
iter.remove();
}
}

return new StatusReport(actionStatusList);
}

private void removeCmdlet(long cmdletId) {
this.runningCmdlets.remove(cmdletId);
this.listenableFutures.remove(cmdletId);
runningCmdlets.remove(cmdletId);
listenableFutures.remove(cmdletId);
}

private class CmdletCallBack implements FutureCallback<Object> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,18 @@
import org.smartdata.protocol.message.StatusReporter;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

public class StatusReportTask implements Runnable {
private StatusReporter statusReporter;
private CmdletExecutor cmdletExecutor;
private final StatusReporter statusReporter;
private final CmdletExecutor cmdletExecutor;
private final int interval;
private final double ratio;
private final Map<Long, ActionStatus> idToActionStatus;
private long lastReportTime;
private int interval;
public double ratio;
private Map<Long, ActionStatus> idToActionStatus;

public StatusReportTask(
StatusReporter statusReporter, CmdletExecutor cmdletExecutor, SmartConf conf) {
Expand All @@ -51,29 +52,26 @@ public StatusReportTask(
this.idToActionStatus = new HashMap<>();
}

@Override
public void run() {
StatusReport statusReport = cmdletExecutor.getStatusReport();
if (statusReport != null) {
List<ActionStatus> actionStatuses = statusReport.getActionStatuses();
for (ActionStatus actionStatus : actionStatuses) {
idToActionStatus.put(actionStatus.getActionId(), actionStatus);
}
if (!idToActionStatus.values().isEmpty()) {
int finishedNum = 0;
for (ActionStatus actionStatus : idToActionStatus.values()) {
if (actionStatus.isFinished()) {
finishedNum++;
}
}
long currentTime = System.currentTimeMillis();
if (currentTime - lastReportTime >= interval
|| (double) finishedNum / idToActionStatus.size() >= ratio) {
statusReporter.report(new StatusReport(new ArrayList(idToActionStatus.values())));
idToActionStatus.clear();
lastReportTime = currentTime;
}
}
Optional.ofNullable(cmdletExecutor.getStatusReport())
.map(StatusReport::getActionStatuses)
.orElseGet(Collections::emptyList)
.forEach(status -> idToActionStatus.put(status.getActionId(), status));

if (idToActionStatus.isEmpty()) {
return;
}
long finishedNum = idToActionStatus.values()
.stream()
.filter(ActionStatus::isFinished)
.count();

long currentTime = System.currentTimeMillis();
if (currentTime - lastReportTime >= interval
|| (double) finishedNum / idToActionStatus.size() >= ratio) {
statusReporter.report(new StatusReport(new ArrayList<>(idToActionStatus.values())));
idToActionStatus.clear();
lastReportTime = currentTime;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@
*/
package org.smartdata.hdfs.action;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.smartdata.action.annotation.ActionSignature;

import java.util.Map;
import java.util.Optional;

/**
* An action to check the EC policy for a file or dir.
Expand All @@ -43,12 +45,15 @@ public void init(Map<String, String> args) {

@Override
public void execute() throws Exception {
ErasureCodingPolicy srcEcPolicy = dfsClient.getErasureCodingPolicy(srcPath);
if (srcEcPolicy == null) {
appendLog(RESULT_OF_NULL_EC_POLICY);
} else {
appendLog(srcEcPolicy.toString());
if (StringUtils.isBlank(srcPath)) {
throw new IllegalArgumentException("File parameter is missing! ");
}

String result = Optional.ofNullable(dfsClient.getErasureCodingPolicy(srcPath))
.map(ErasureCodingPolicy::toString)
.orElse(RESULT_OF_NULL_EC_POLICY);

appendResult(result);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public void init(Map<String, String> args) {
@Override
public void execute() throws Exception {
for (ErasureCodingPolicyInfo policyInfo : dfsClient.getErasureCodingPolicies()) {
appendLog("{" + policyInfo.toString() + "}");
appendResult("{" + policyInfo.toString() + "}");
}
}

Expand Down

This file was deleted.

Loading

0 comments on commit 0239cc4

Please sign in to comment.