Skip to content

Commit

Permalink
Merge pull request #304 from madgik/bug/359_iterative_algorithms_getting_stuck
Browse files Browse the repository at this point in the history

Fix for exareme hanging when containers fail.
  • Loading branch information
ThanKarab authored Jan 21, 2021
2 parents 0f900af + 3b1991f commit 219e2b6
Show file tree
Hide file tree
Showing 18 changed files with 73 additions and 72 deletions.
19 changes: 7 additions & 12 deletions Exareme-Docker/files/root/exareme/bootstrap.sh
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ CONSUL_WAIT_FOR_MASTER_IP_MAX_ATTEMPTS=20
EXAREME_NODE_STARTUP_HEALTH_CHECK_MAX_ATTEMPTS=10
EXAREME_NODE_HEALTH_CHECK_TIMEOUT=60
MASTER_NODE_REACHABLE_TIMEOUT=5
PERIODIC_EXAREME_NODES_HEALTH_CHECK_MAX_RETRIES=3
PERIODIC_EXAREME_NODES_HEALTH_CHECK_MAX_RETRIES=5
PERIODIC_EXAREME_NODES_HEALTH_CHECK_INTERVAL=120
EXAREME_HEALTH_CHECK_AWAIT_TIME=60
PERIODIC_TEMP_FILES_REMOVAL=300
Expand Down Expand Up @@ -118,7 +118,7 @@ exaremeNodesHealthCheck() {
echo "$(timestamp) HEALTH CHECK for node with IP ${NODE_IP} and name ${NODE_NAME} ."

if [[ "${FEDERATION_ROLE}" == "master" ]]; then
check=$(curl -s --max-time ${EXAREME_NODE_HEALTH_CHECK_TIMEOUT} "${NODE_IP}:9092/check/worker?NODE_IP=${NODE_IP}&NODE_NAME=${NODE_NAME}")
check=$(curl -s -X POST --max-time ${EXAREME_NODE_HEALTH_CHECK_TIMEOUT} ${NODE_IP}:9090/mining/query/HEALTH_CHECK)
else
check=$(curl -s --max-time ${EXAREME_NODE_HEALTH_CHECK_TIMEOUT} "${MASTER_IP}:9092/check/worker?NODE_IP=${NODE_IP}&NODE_NAME=${NODE_NAME}")
fi
Expand Down Expand Up @@ -255,11 +255,6 @@ if [[ "${FEDERATION_ROLE}" == "master" ]]; then
curl -s -X PUT -d @- ${CONSULURL}/v1/kv/${CONSUL_MASTER_PATH}/${NODE_NAME} <<<${NODE_IP}
curl -s -X PUT -d @- ${CONSULURL}/v1/kv/${CONSUL_ACTIVE_WORKERS_PATH}/${NODE_NAME} <<<${NODE_IP}

if ! startupExaremeNodesHealthCheck; then
echo "$(timestamp) HEALTH CHECK algorithm failed. Switch ENVIRONMENT_TYPE to 'DEV' to see error messages coming from EXAREME. Exiting..."
exit 1
fi

periodicExaremeNodesHealthCheck &

else ##### Running bootstrap on a worker node #####
Expand All @@ -271,16 +266,16 @@ else ##### Running bootstrap on a worker node #####

echo "$(timestamp) Starting Exareme on worker node with IP: ${NODE_IP} and nodeName: ${NODE_NAME}"
. ./start-worker.sh

# Updating consul with node IP
echo -e "\n$(timestamp) Updating consul with worker node IP."
curl -s -X PUT -d @- ${CONSULURL}/v1/kv/${CONSUL_ACTIVE_WORKERS_PATH}/${NODE_NAME} <<<${NODE_IP}


if ! startupExaremeNodesHealthCheck; then
echo "$(timestamp) HEALTH CHECK algorithm failed. Switch ENVIRONMENT_TYPE to 'DEV' to see error messages coming from EXAREME. Exiting..."
exit 1
fi

# Updating consul with node IP
echo -e "\n$(timestamp) Updating consul with worker node IP."
curl -s -X PUT -d @- ${CONSULURL}/v1/kv/${CONSUL_ACTIVE_WORKERS_PATH}/${NODE_NAME} <<<${NODE_IP}

periodicExaremeNodesHealthCheck &

periodicReachableMasterNodeCheck &
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ public interface AdpDBClientQueryStatus {

void registerListener(AdpDBQueryListener listener) throws RemoteException;

InputStream getResult() throws RemoteException;
String getResult() throws RemoteException;

InputStream getResult(DataSerialization ds) throws RemoteException;
String getResult(DataSerialization ds) throws RemoteException;

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.rmi.RemoteException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.concurrent.FutureTask;

/**
* @author alex
Expand All @@ -35,7 +37,8 @@ public class RmiAdpDBClientQueryStatus implements AdpDBClientQueryStatus {
private String lastStatus;
private TimeFormat timeF;
private boolean finished;
private InputStream result;
private boolean error;
private String result;

public RmiAdpDBClientQueryStatus(AdpDBQueryID queryId, AdpDBClientProperties properties,
AdpDBQueryExecutionPlan plan, AdpDBStatus status) {
Expand All @@ -45,6 +48,7 @@ public RmiAdpDBClientQueryStatus(AdpDBQueryID queryId, AdpDBClientProperties pro
this.lastStatus = null;
this.timeF = new TimeFormat(TimeUnit.min);
this.finished = false;
this.error = false;
result = null;
}

Expand All @@ -61,13 +65,6 @@ public boolean hasFinished() throws RemoteException {
if (!status.hasFinished() && !status.hasError())
return false;

try {
String algorithmResult = IOUtils.toString(getResult(DataSerialization.summary), StandardCharsets.UTF_8);
log.info("Algorithm with queryId " + getQueryID().getQueryID() + " terminated. Result: \n " + algorithmResult);
} catch (IOException e) {
log.error("Could not read the algorithm result table." + getQueryID());
}

finished = true;
return true;
}
Expand Down Expand Up @@ -113,7 +110,7 @@ public void registerListener(AdpDBQueryListener listener) throws RemoteException
}

@Override
public InputStream getResult() throws RemoteException {
public String getResult() throws RemoteException {
return getResult(DataSerialization.ldjson);
}

Expand All @@ -129,14 +126,27 @@ public InputStream getResult() throws RemoteException {
* @throws RemoteException
*/
@Override
public InputStream getResult(DataSerialization ds) throws RemoteException {
public String getResult(DataSerialization ds) throws RemoteException {

// The registry should be updated the 1st time we fetch a result stream.
if (result == null) {
updateRegistry();
}
result = new RmiAdpDBClient(AdpDBManagerLocator.getDBManager(), properties)
InputStream resultStream = new RmiAdpDBClient(AdpDBManagerLocator.getDBManager(), properties)
.readTable(plan.getResultTables().get(0).getName(), ds);

FutureTask<String> getResultFromStream;
try {
getResultFromStream = new FutureTask<>(() ->
IOUtils.toString(resultStream, StandardCharsets.UTF_8));

new Thread(getResultFromStream).start();
result = getResultFromStream.get(30, java.util.concurrent.TimeUnit.SECONDS);
} catch (Exception e) {
log.error("Error reading the result table! QueryID:" + status.getQueryID().getQueryID(), e);
throw new RemoteException("Could not read the result table!");
}

return result;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,9 @@ public void run() {
statusManager.getStatistics(status.getId()).setTotalDataTransfers(stats.getTotalData());

while (!sessionManager.hasFinished() && !sessionManager.hasError()) {

boolean updateProgressStatistics = updateProgressStatistics();
if (updateProgressStatistics) {
log.info("Session is running... ID: " + sessionPlan.getSessionID().getLongId()
log.info("Session is updating... ID: " + sessionPlan.getSessionID().getLongId()
+ " , QueryID: " + queryID.getQueryID());
log.debug("Update listeners ...");
synchronized (listeners) {
Expand All @@ -89,13 +88,15 @@ public void run() {
.setAdpEngineStatistics(statsManager.getStatistics());

if (sessionManager != null && !sessionManager.hasError()) {
log.debug("Session finished, closing! ID: " + sessionPlan.getSessionID().getLongId()
log.info("Session finished, closing! ID: " + sessionPlan.getSessionID().getLongId()
+ " , QueryID: " + queryID.getQueryID());
statusManager.setFinished(status.getId());
} else {
log.info("Session error! ID: " + sessionPlan.getSessionID().getLongId()
+ " , QueryID: " + queryID.getQueryID());
statusManager.setError(status.getId(), sessionManager.getErrorList().get(0));
}
log.debug("Session closing! ID: "+ sessionPlan.getSessionID().getLongId()
log.debug("Session closing! ID: " + sessionPlan.getSessionID().getLongId()
+ " , QueryID: " + queryID.getQueryID());
sessionPlan.close();

Expand All @@ -104,7 +105,6 @@ public void run() {
log.error("Cannot monitor job, sessionID: " + sessionPlan.getSessionID().getLongId());
log.error("Cannot monitor job, queryID: " + status.getQueryID().getQueryID(), e);
} finally {
log.debug("Terminate listeners ( " + listeners.size() + ")...");
synchronized (listeners) {
for (AdpDBQueryListener l : listeners) {
l.terminated(queryID, status);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,6 @@ public AdpDBStatus executeScript(AdpDBQueryExecutionPlan execPlan, AdpDBClientPr
AdpDBArtJobMonitor monitor =
new AdpDBArtJobMonitor(sessionPlan, status, statusManager, execPlan.getQueryID());
monitors.put(execPlan.getQueryID(), monitor);

executor.submit(monitor);
statusArray.add(status);
return status;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,6 @@ public NIterativeAlgorithmResultEntity(IterativeAlgorithmState iterativeAlgorith
this.dataSerialization = dataSerialization;
}

private final static String user_error = new String("text/plain+user_error");
private final static String error = new String("text/plain+error");
private final static String warning = new String("text/plain+warning");


/**
* @param encoder is used to save the output
* @param ioctrl will be used from the iterativeAlgorithmState, when the algorithm is complete,
Expand Down Expand Up @@ -87,9 +82,11 @@ public void produceContent(ContentEncoder encoder, IOControl ioctrl) throws IOEx
if (!finalizeQueryStatus.hasError() &&
finalizeQueryStatus.hasFinished()) {
if (channel == null) {
String result = iterativeAlgorithmState.getAdpDBClientQueryStatus().getResult(dataSerialization);
log.info("Iterative algorithm with key " + iterativeAlgorithmState.getAlgorithmKey()
+ " terminated. Result: \n " + result);
channel = Channels.newChannel(
iterativeAlgorithmState.getAdpDBClientQueryStatus()
.getResult(dataSerialization));
new ByteArrayInputStream(result.getBytes(StandardCharsets.UTF_8)));
}
// Reading from the channel to the buffer, flip is required by the API
channel.read(buffer);
Expand Down Expand Up @@ -183,11 +180,13 @@ public void closeQuery() throws IOException {
finalizeQueryStatus.close();
finalizeQueryStatus = null;
}
if (iterativeAlgorithmState != null)
iterativeAlgorithmState.releaseLock();
iterativeAlgorithmState = null;
}

@Override
public void close() throws IOException {
public void close() {

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import madgik.exareme.common.app.engine.AdpDBQueryID;
import madgik.exareme.master.client.AdpDBClientQueryStatus;
import madgik.exareme.master.engine.iterations.scheduler.IterationsDispatcher;
import madgik.exareme.master.engine.iterations.scheduler.events.phaseCompletion.PhaseCompletionEventHandler;
import madgik.exareme.master.engine.iterations.state.IterationsStateManager;
import madgik.exareme.master.engine.iterations.state.IterativeAlgorithmState;
import madgik.exareme.utils.eventProcessor.EventHandler;
Expand Down Expand Up @@ -56,8 +55,9 @@ protected AdpDBClientQueryStatus submitQueryAndUpdateExecutionPhase(
ias.getAlgorithmKey(),
dflScript);

log.info("New Iterative phase: " + currentPhase);
log.info("Executing Iterative DFL Script: \n" + dflScript);
log.info("New Iterative phase: " + currentPhase + " for algorithm: " + ias.getAlgorithmKey() +
" with queryID: " + queryStatus.getQueryID().getQueryID());
log.debug("Executing Iterative DFL Script: \n" + dflScript);

ias.setCurrentExecutionPhase(currentPhase);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,7 @@ public void handle(PhaseCompletionEvent event, EventProcessor proc) throws Remot

String terminationConditionResult;
try {
InputStream previousResultStream = ias.getAdpDBClientQueryStatus().getResult();
terminationConditionResult = IOUtils.toString(previousResultStream, StandardCharsets.UTF_8);
terminationConditionResult = ias.getAdpDBClientQueryStatus().getResult();
} catch (IOException e) {
throw new IterationsStateFatalException(
"Could not read the termination_condition result table.", ias.getAlgorithmKey());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import madgik.exareme.utils.eventProcessor.EventProcessor;

import java.rmi.RemoteException;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,14 @@
import madgik.exareme.common.app.engine.AdpDBStatistics;
import madgik.exareme.master.engine.AdpDBManagerLocator;
import madgik.exareme.worker.art.executionEngine.session.ExecutionEngineSessionPlan;
import org.apache.log4j.Logger;

import java.rmi.RemoteException;
import java.util.Arrays;
import java.util.Map;

/**
* @author herald
*/
public class AdpDBJobSession {
private static final Logger log = Logger.getLogger(AdpDBJobSession.class);
private boolean finished = false;
private boolean error = false;
private Exception exception = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
import madgik.exareme.master.engine.iterations.handler.NIterativeAlgorithmResultEntity;
import madgik.exareme.master.engine.iterations.state.IterativeAlgorithmState;
import madgik.exareme.master.gateway.ExaremeGatewayUtils;
import madgik.exareme.master.gateway.async.handler.HBP.Exceptions.RequestException;
import madgik.exareme.master.gateway.async.handler.HBP.Exceptions.BadUserInputException;
import madgik.exareme.master.gateway.async.handler.HBP.Exceptions.RequestException;
import madgik.exareme.master.gateway.async.handler.entity.NQueryResultEntity;
import madgik.exareme.master.queryProcessor.HBP.AlgorithmProperties;
import madgik.exareme.master.queryProcessor.HBP.Algorithms;
Expand Down Expand Up @@ -160,7 +160,7 @@ private void handleHBPAlgorithmExecution(HttpRequest request, HttpResponse respo
queryStatus = dbClient.query(algorithmKey, dfl);

log.info("Algorithm " + algorithmKey + " with queryID "
+ queryStatus.getQueryID() + " execution started.");
+ queryStatus.getQueryID().getQueryID() + " execution started.");
log.debug("DFL Script: \n " + dfl);

BasicHttpEntity entity = new NQueryResultEntity(queryStatus, ds,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,13 @@
import org.apache.http.nio.entity.HttpAsyncContentProducer;
import org.apache.log4j.Logger;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.charset.StandardCharsets;

/**
* TODO flush output before suspend
*/
public class NQueryResultEntity extends BasicHttpEntity implements HttpAsyncContentProducer {

private static final Logger log = Logger.getLogger(NQueryResultEntity.class);
Expand Down Expand Up @@ -55,7 +54,11 @@ public void produceContent(ContentEncoder encoder, IOControl iocontrol)

if (!queryStatus.hasError()) {
if (channel == null) {
channel = Channels.newChannel(queryStatus.getResult(format));
String result = queryStatus.getResult(format);
log.info("Algorithm with queryId " + queryStatus.getQueryID().getQueryID()
+ " terminated. Result: \n " + result);
channel = Channels.newChannel(
new ByteArrayInputStream(result.getBytes(StandardCharsets.UTF_8)));
}
channel.read(buffer);
buffer.flip();
Expand All @@ -68,32 +71,27 @@ public void produceContent(ContentEncoder encoder, IOControl iocontrol)
close();
}
} else {
log.trace("|" + queryStatus.getError() + "|");
if (queryStatus.getError().contains("ExaremeError:")) {
String data = queryStatus.getError().substring(queryStatus.getError().lastIndexOf("ExaremeError:") + "ExaremeError:".length()).replaceAll("\\s", " ");
//type could be error, user_error, warning regarding the error occurred along the process
String result = HBPQueryHelper.ErrorResponse.createErrorResponse(data, user_error);
logErrorMessage(result);
encoder.write(ByteBuffer.wrap(result.getBytes()));
encoder.complete();
} else if (queryStatus.getError().contains("PrivacyError")) {
String data = "The Experiment could not run with the input provided because there are insufficient data.";
//type could be error, user_error, warning regarding the error occurred along the process
String result = HBPQueryHelper.ErrorResponse.createErrorResponse(data, warning);
logErrorMessage(result);
encoder.write(ByteBuffer.wrap(result.getBytes()));
encoder.complete();
} else if (queryStatus.getError().contains("java.rmi.RemoteException")) {
String data = "One or more containers are not responding. Please inform the system administrator.";
//type could be error, user_error, warning regarding the error occurred along the process
String result = HBPQueryHelper.ErrorResponse.createErrorResponse(data, error);
logErrorMessage(result);
encoder.write(ByteBuffer.wrap(result.getBytes()));
encoder.complete();
} else {
log.info("Exception from madis: " + queryStatus.getError());
log.info("Exception when running the query: " + queryStatus.getError());
String data = "Something went wrong. Please inform the system administrator.";
//type could be error, user_error, warning regarding the error occurred along the process
String result = HBPQueryHelper.ErrorResponse.createErrorResponse(data, error);
logErrorMessage(result);
encoder.write(ByteBuffer.wrap(result.getBytes()));
Expand All @@ -110,12 +108,11 @@ public boolean isRepeatable() {
}

public void closeQuery() throws IOException {
log.info("Closing from Query Result : " + queryStatus.getQueryID().getQueryID());
queryStatus.close();
}

@Override
public void close() throws IOException {
public void close() {

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public void produceContent(ContentEncoder encoder, IOControl ioctrl)
}

@Override
public void close() throws IOException {
public void close() {

}

Expand Down
Loading

0 comments on commit 219e2b6

Please sign in to comment.