Skip to content

Commit

Permalink
Merge branch 'dev' into freeform-guardrails
Browse files Browse the repository at this point in the history
  • Loading branch information
abyrd authored Dec 28, 2023
2 parents 2481a93 + f349656 commit 06e479f
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 32 deletions.
38 changes: 25 additions & 13 deletions src/main/java/com/conveyal/analysis/components/broker/Broker.java
Original file line number Diff line number Diff line change
Expand Up @@ -177,19 +177,23 @@ public synchronized void enqueueTasksForRegionalJob (RegionalAnalysis regionalAn
LOG.error("Someone tried to enqueue job {} but it already exists.", templateTask.jobId);
throw new RuntimeException("Enqueued duplicate job " + templateTask.jobId);
}
// Create the Job object to share with the MultiOriginAssembler, but defer adding this job to the Multimap of
// active jobs until we're sure the result assembler was constructed without any errors. Always add and remove
// the Job and corresponding MultiOriginAssembler as a unit in the same synchronized block of code (see #887).
WorkerTags workerTags = WorkerTags.fromRegionalAnalysis(regionalAnalysis);
Job job = new Job(templateTask, workerTags);
jobs.put(job.workerCategory, job);

// Register the regional job so results received from multiple workers can be assembled into one file.
// If any parameters fail checks here, an exception may cause this method to exit early.
// TODO encapsulate MultiOriginAssemblers in a new Component
// Note: if this fails with an exception we'll have a job enqueued, possibly being processed, with no assembler.
// That is not catastrophic, but the user may need to recognize and delete the stalled regional job.
MultiOriginAssembler assembler = new MultiOriginAssembler(regionalAnalysis, job, fileStorage);
resultAssemblers.put(templateTask.jobId, assembler);

// A MultiOriginAssembler was successfully put in place. It's now safe to register and start the Job.
jobs.put(job.workerCategory, job);

// If this is a fake job for testing, don't confuse the worker startup code below with its null graph ID.
if (config.testTaskRedelivery()) {
// This is a fake job for testing, don't confuse the worker startup code below with null graph ID.
return;
}

Expand Down Expand Up @@ -385,14 +389,20 @@ public synchronized void markTaskCompleted (Job job, int taskId) {
}

/**
* When job.errors is non-empty, job.isErrored() becomes true and job.isActive() becomes false.
* Record an error that happened while a worker was processing a task on the given job. This method is tolerant
* of job being null, because it's called on a code path where any number of things could be wrong or missing.
* This method also ensures synchronization of writes to Jobs from any non-synchronized sections of an HTTP handler.
* Once job.errors is non-empty, job.isErrored() becomes true and job.isActive() becomes false.
* The Job will stop delivering tasks, allowing workers to shut down, but will continue to exist allowing the user
* to see the error message. User will then need to manually delete it, which will remove the result assembler.
* This method ensures synchronization of writes to Jobs from the unsynchronized worker poll HTTP handler.
*/
private synchronized void recordJobError (Job job, String error) {
if (job != null) {
job.errors.add(error);
// Limit the number of errors recorded to one.
// Still using a Set<String> instead of just String since the set of errors is exposed in a UI-facing API.
if (job.errors.isEmpty()) {
job.errors.add(error);
}
}
}

Expand Down Expand Up @@ -488,21 +498,23 @@ public void handleRegionalWorkResult(RegionalWorkResult workResult) {
// Once the job is retrieved, it can be used below to requestExtraWorkersIfAppropriate without synchronization,
// because that method only uses final fields of the job.
Job job = null;
MultiOriginAssembler assembler;
try {
MultiOriginAssembler assembler;
synchronized (this) {
job = findJob(workResult.jobId);
// Record any error reported by the worker and don't pass bad results on to regional result assembly.
// This will mark the job as errored and not-active, stopping distribution of tasks to workers.
// To ensure that happens, record errors before any other conditional that could exit this method.
if (workResult.error != null) {
recordJobError(job, workResult.error);
return;
}
assembler = resultAssemblers.get(workResult.jobId);
if (job == null || assembler == null || !job.isActive()) {
// This will happen naturally for all delivered tasks after a job is deleted or it errors out.
LOG.debug("Ignoring result for unrecognized, deleted, or inactive job ID {}.", workResult.jobId);
return;
}
if (workResult.error != null) {
// Record any error reported by the worker and don't pass bad results on to regional result assembly.
recordJobError(job, workResult.error);
return;
}
// Mark tasks completed first before passing results to the assembler. On the final result received,
// this will minimize the risk of race conditions by quickly making the job invisible to incoming stray
// results from spurious redeliveries, before the assembler is busy finalizing and uploading results.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,11 @@ private RegionalTask makeOneTask (int taskNumber) {
public int deliveryPass = 0;

/**
* If any error compromises the usabilty or quality of results from any origin, it is recorded here.
* If any error compromises the usability or quality of results from any origin, it is recorded here.
* This is a Set because identical errors are likely to be reported from many workers or individual tasks.
* The presence of an error here causes the job to be considered "errored" and "inactive" and stop delivering tasks.
* There is some risk here of accumulating unbounded amounts of large error messages (see #919).
* The field type could be changed to a single String instead of Set, but it's exposed on a UI-facing API as a Set.
*/
public final Set<String> errors = new HashSet();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import com.conveyal.r5.util.ExceptionUtils;

import static com.conveyal.r5.util.ExceptionUtils.filterStackTrace;

/**
* This Event is fired each time a Throwable (usually an Exception or Error) occurs on the backend. It can then be
* recorded or tracked in various places - the console logs, Slack, etc. This could eventually be used for errors on
Expand Down Expand Up @@ -59,20 +61,4 @@ public String traceWithContext (boolean verbose) {
return builder.toString();
}

private static String filterStackTrace (String stackTrace) {
if (stackTrace == null) return null;
final String unknownFrame = "Unknown stack frame, probably optimized out by JVM.";
String error = stackTrace.lines().findFirst().get();
String frame = stackTrace.lines()
.map(String::strip)
.filter(s -> s.startsWith("at "))
.findFirst().orElse(unknownFrame);
String conveyalFrame = stackTrace.lines()
.map(String::strip)
.filter(s -> s.startsWith("at com.conveyal."))
.filter(s -> !frame.equals(s))
.findFirst().orElse("");
return String.join("\n", error, frame, conveyalFrame);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,17 @@ public RegionalWorkResult(OneOriginResult result, RegionalTask task) {
// TODO checkTravelTimeInvariants, checkAccessibilityInvariants to verify that values are monotonically increasing
}

/** Constructor used when results for this origin are considered unusable due to an unhandled error. */
/**
* Constructor used when results for this origin are considered unusable due to an unhandled error. Besides the
* short-form exception, most result fields are left null. There is no information to communicate, and because
* errors are often produced faster than valid results, we don't want to flood the backend with unnecessarily
* voluminous error reports. The short-form exception message is used for a similar reason, to limit the total size
* of error messages.
*/
public RegionalWorkResult(Throwable t, RegionalTask task) {
this.jobId = task.jobId;
this.taskId = task.taskId;
this.error = ExceptionUtils.shortAndLongString(t);
this.error = ExceptionUtils.filterStackTrace(t);
}

}
25 changes: 25 additions & 0 deletions src/main/java/com/conveyal/r5/util/ExceptionUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,29 @@ public static String shortAndLongString (Throwable throwable) {
return shortCauseString(throwable) + "\n[detail follows]\n" + stackTraceString(throwable);
}

/**
* Given a full stack trace string with one frame per line, keep only the exception name, the first stack frame,
* and all additional frames that come from Conveyal packages. This yields a much shorter stack trace that still
* shows where the exception was thrown and where the problem originates in our own code.
*/
public static String filterStackTrace (String stackTrace) {
if (stackTrace == null) return null;
final String unknownFrame = "Unknown stack frame, probably optimized out by JVM.";
String error = stackTrace.lines().findFirst().get();
String frame = stackTrace.lines()
.map(String::strip)
.filter(s -> s.startsWith("at "))
.findFirst().orElse(unknownFrame);
String conveyalFrame = stackTrace.lines()
.map(String::strip)
.filter(s -> s.startsWith("at com.conveyal."))
.filter(s -> !frame.equals(s))
.findFirst().orElse("");
return String.join("\n", error, frame, conveyalFrame);
}

public static String filterStackTrace (Throwable throwable) {
return filterStackTrace(stackTraceString(throwable));
}

}

0 comments on commit 06e479f

Please sign in to comment.