Skip to content

Commit

Permalink
Fix preloading
Browse files Browse the repository at this point in the history
* Set Status.PRESENT after synchronous preload

Fixes #949

* single underlying get method in AsyncLoader

The nonblocking version of get just runs getBlocking in a thread.
NetworkPreloader has corresponding preload and preloadBlocking methods.

* inline setError method

Method comments said it was called in the buildValue implementation
methods but it was actually only ever called in AsyncLoader error
handling code.

* add comments explaining current exception behavior

* move initial message back to async runnable

Just to keep behavior more similar to previous versions.
This could be changed in the future if we do a more significant refactor
of how exceptions are handled and passed up to the backend.

---------

Co-authored-by: Andrew Byrd <andrew@fastmail.net>
  • Loading branch information
ansoncfit and abyrd authored Dec 5, 2024
1 parent 9ba72c3 commit 5648f0b
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 27 deletions.
11 changes: 6 additions & 5 deletions src/main/java/com/conveyal/r5/analyst/NetworkPreloader.java
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public NetworkPreloader(TransportNetworkCache transportNetworkCache) {
this.transportNetworkCache = transportNetworkCache;
}

public LoaderState<TransportNetwork> preloadData (AnalysisWorkerTask task) {
public LoaderState<TransportNetwork> preload (AnalysisWorkerTask task) {
if (task.scenario != null) {
transportNetworkCache.rememberScenario(task.scenario);
}
Expand All @@ -94,10 +94,11 @@ public LoaderState<TransportNetwork> preloadData (AnalysisWorkerTask task) {
* similar tasks will make interleaved calls to setProgress (with superficial map synchronization). Other than
* causing a value to briefly revert from PRESENT to BUILDING this doesn't seem deeply problematic.
* This is provided specifically for regional tasks, to ensure that they remain in preloading mode while all this
* data is prepared.
* data is prepared.
* Any exceptions that occur while building the network will escape this method, leaving the status as BUILDING.
*/
public TransportNetwork synchronousPreload (AnalysisWorkerTask task) {
return buildValue(Key.forTask(task));
public TransportNetwork preloadBlocking (AnalysisWorkerTask task) {
return getBlocking(Key.forTask(task));
}

@Override
Expand Down Expand Up @@ -140,7 +141,7 @@ protected TransportNetwork buildValue(Key key) {
linkedPointSet.getEgressCostTable(progressListener);
}
}
// Finished building all needed inputs for analysis, return the completed network to the AsyncLoader code.
// Finished building all needed inputs for analysis, return the completed network
return scenarioNetwork;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ public static void sleepSeconds (int seconds) {
protected byte[] handleAndSerializeOneSinglePointTask (TravelTimeSurfaceTask task) throws IOException {
LOG.debug("Handling single-point task {}", task.toString());
// Get all the data needed to run one analysis task, or at least begin preparing it.
final AsyncLoader.LoaderState<TransportNetwork> networkLoaderState = networkPreloader.preloadData(task);
final AsyncLoader.LoaderState<TransportNetwork> networkLoaderState = networkPreloader.preload(task);

// If loading is not complete, bail out of this function.
// Ideally we'd stall briefly using something like Future.get(timeout) in case loading finishes quickly.
Expand Down Expand Up @@ -467,7 +467,7 @@ protected void handleOneRegionalTask (RegionalTask task) throws Throwable {
// LoadingCaches behind it. Specifically, the TransportNetworkCache and LinkageCache enforce turn-taking and
// prevent redundant work. If those are ever removed, we would need either async regional task preparation, or
// a synchronous mode with per-key blocking on AsyncLoader (kind of reinventing the wheel of LoadingCache).
TransportNetwork transportNetwork = networkPreloader.synchronousPreload(task);
TransportNetwork transportNetwork = networkPreloader.preloadBlocking(task);

// If we are generating a static site, there must be a single metadata file for an entire batch of results.
// Arbitrarily we create this metadata as part of the first task in the job.
Expand Down
45 changes: 25 additions & 20 deletions src/main/java/com/conveyal/r5/util/AsyncLoader.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
* "value is present in map".
*
* Potential problem: if we try to return immediately saying whether the needed data are available,
* there are some cases where preparing the reqeusted object might take only a few hundred milliseconds or less.
* there are some cases where preparing the requested object might take only a few hundred milliseconds or less.
* In that case then we don't want the caller to have to re-poll. In this case a Future.get() with timeout is good.
*
* Created by abyrd on 2018-09-14
Expand Down Expand Up @@ -97,10 +97,23 @@ public String toString() {
}
}

/**
* This has been factored out of the executor runnables so subclasses can force a blocking (non-async) load.
* Any exceptions that occur while building the value will escape this method, leaving the status as BUILDING.
*/
protected V getBlocking (K key) {
V value = buildValue(key);
synchronized (map) {
map.put(key, new LoaderState(Status.PRESENT, "Loaded", 100, value));
}
return value;
}

/**
* Attempt to fetch the value for the supplied key.
* If the value is not yet present, and not yet being computed / fetched, enqueue a task to do so.
* Return a response that reports status, and may or may not contain the value.
* Any exception that occurs while building the value is caught and associated with the key with a status of ERROR.
*/
public LoaderState<V> get (K key) {
LoaderState<V> state = null;
Expand All @@ -109,7 +122,7 @@ public LoaderState<V> get (K key) {
state = map.get(key);
if (state == null) {
// Only enqueue a task to load the value for this key if another call hasn't already done it.
state = new LoaderState<V>(Status.WAITING, "Enqueued task...", 0, null);
state = new LoaderState<>(Status.WAITING, "Enqueued task...", 0, null);
map.put(key, state);
enqueueLoadTask = true;
}
Expand All @@ -120,16 +133,16 @@ public LoaderState<V> get (K key) {
// Enqueue task outside the above block (synchronizing the fewest lines possible).
if (enqueueLoadTask) {
executor.execute(() -> {
setProgress(key, 0, "Starting...");
try {
V value = buildValue(key);
synchronized (map) {
map.put(key, new LoaderState(Status.PRESENT, null, 100, value));
}
setProgress(key, 0, "Starting...");
getBlocking(key);
} catch (Throwable t) {
// It's essential to trap Throwable rather than just Exception. Otherwise the executor
// threads can be killed by any Error that happens, stalling the executor.
setError(key, t);
// threads can be killed by any Error that happens, stalling the executor. The below permanently
// associates an error with the key. No further attempt will ever be made to create the value.
synchronized (map) {
map.put(key, new LoaderState(t));
}
LOG.error("Async load failed: " + ExceptionUtils.stackTraceString(t));
}
});
Expand All @@ -139,12 +152,13 @@ public LoaderState<V> get (K key) {

/**
* Override this method in concrete subclasses to specify the logic to build/calculate/fetch a value.
* Implementations may call setProgress to report progress on long operations.
* Implementations may call setProgress to report progress on long operations; if they do so, any callers of this
* method are responsible for also calling setComplete() to ensure loaded objects are marked as PRESENT.
* Throw an exception to indicate an error has occurred and the building process cannot complete.
* It's not entirely clear this should return a value - might be better to call setValue within the overridden
* method, just as we call setProgress or setError.
*/
protected abstract V buildValue(K key) throws Exception;
protected abstract V buildValue(K key);

/**
* Call this method inside the buildValue method to indicate progress.
Expand All @@ -155,13 +169,4 @@ public void setProgress(K key, int percentComplete, String message) {
}
}

/**
* Call this method inside the buildValue method to indicate that an unrecoverable error has happened.
* FIXME this will permanently associate an error with the key. No further attempt will ever be made to create the value.
*/
protected void setError (K key, Throwable throwable) {
synchronized (map) {
map.put(key, new LoaderState(throwable));
}
}
}

0 comments on commit 5648f0b

Please sign in to comment.