Skip to content

najamelan/async_nursery

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

80 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

async_nursery

standard-readme compliant Build Status Docs crates.io

Primitive for structured concurrency.

The nursery allows writing concurrent programs adhering to structured concurrency. If you are new to the concept, there are some excellent resources on the dedicated structured concurrency forum. The name of the library is inspired by the excellent python Trio library.

Table of Contents

Description

async_nursery brings a structured concurrency primitive to Rust. There are three main goals in structured concurrency:

1. A sane control flow for concurrent programs.

Notes on structured concurrency, or: Go statement considered harmful by Nathaniel J. Smith explains this exquisitely. To summarize, if a function wants to split off and do some work concurrently, make sure that all its child tasks are finished when the function returns. That way it functions as the black box we are used to from synchronous code. A function has inputs and a return value, and when it is done, no code it created is running anymore.

You could already do this by stuffing JoinHandles from async_executors in a FuturesUnordered, but as we will see below, async_nursery is a bit more flexible and convenient. As opposed to the JoinHandles from tokio or async-std directly, the ones from async_executors do not detach on drop by default.

2. Prevent resource leakage.

Orphaned tasks, spawned without a JoinHandle can potentially stay alive forever, either running in loops, or deadlocking. Structured concurrency makes sure there are no leaks, putting all resources neatly in a call tree, very similar to a call stack. In a call tree, a stack frame can be several stack frames sitting side by side doing things concurrently, but when we return to the previous stack frame, all of them are done.

3. Propagate errors

In Rust it is common to propagate errors up the call stack. If you spawn a task and let it run off into the void, you need out-of-band error handling like channels. In structured concurrency, since all tasks get joined before their parent returns, you can return the errors just like in sync code. It is also possible to cancel all sibling tasks if one task runs into an error.

Properties of async_nursery:

  • Nursery acts as spawner.
  • NurseryStream implements Stream<Out> of the results of all the futures it nurses.
  • NurseryStream implements Future<Output=()> if you just want to wait for everything to finish, but don't care for returned values.
  • NurseryStream basically manages JoinHandles for you.
  • Can be backed by any executor that implements SpawnHandle or LocalSpawnHandle.
  • Cancels all running futures on dropping NurseryStream.
  • Nursery implements Sink for FutureObj and/or LocalFutureObj as well as Nurse and NurseExt.
  • Nursery forwards async_executor traits from the wrapped executor. This works for Timer, TokioIo, YieldNow and SpawnBlocking. Note that when using SpawnBlocking like this, the nursery does not manage the tasks, it just let's you use the wrapped executor.

Missing features

  • No API provided for cooperative cancellation. Since there is no support for that in std::task::Context, you must basically pass some cancellation token into a task that needs to do cleanup and doesn't support being dropped at all await points. Since it requires specific support of the spawned task, I leave this to the user. An example using an AtomicBool is included in the examples directory. The advantage is flexibility. You could cancel just certain tasks in the nursery and leave others running, or let the others be canceled by drop if they support it, etc. Async drop will most likely alleviate this pain one day, but it's not there yet.

  • No API is provided for running non-'static futures. This is not possible in safe rust because std::mem::forget could be used to leak the nursery and trick it to outlive it's parent stack frame, at which point it would hold an invalid reference. If you really want to go there, I suggest you look at the async-scoped crate which allows it by requiring you to use unsafe.

Install

With cargo add: cargo add async_nursery

With cargo yaml:

dependencies:

   async_nursery: ^0.5

With Cargo.toml

[dependencies]

   async_nursery = "0.5"

Upgrade

Please check out the changelog when upgrading.

Dependencies

This crate has few dependencies (futures and async_executors). Cargo will automatically handle it's dependencies for you. You will have to choose executors from the async_executors crate and set the correct feature on that crate to enable it.

There are no optional features.

Security

The crate uses forbid(unsafe), but depends on futures which has quite some unsafe. There are no security issues I'm aware of specific to using this crate.

Performance

Currently the implementation is simple. Nursery just sends the JoinHandle to NurseryStream over an unbounded channel. This is convenient, because it means NurseExt::nurse doesn't have to be async, but it has some overhead compared to using the underlying executor directly. In the future I hope to optimize the implementation.

Usage

Warning: If ever you wait on the stream to finish, remember it will only finish if there are no Nursery's alive anymore. You must drop the Nursery before awaiting the NurseryStream. If your program deadlocks, this should be the first place to look.

All tasks spawned on a nursery must have the same Future::Output type.

Basic example

There is an extensive list of examples for all kinds of patterns of using async_nursery in the examples directory. Please have a look at them.

use
{
   async_nursery   :: { Nursery, NurseExt } ,
   async_executors :: { AsyncStd          } ,
};

pub type DynResult<T> = Result<T, Box< dyn std::error::Error + Send + Sync + 'static >>;

async fn self_contained() -> DynResult<()>
{
   let (nursery, output) = Nursery::new( AsyncStd );

   for _ in 0..5
   {
      nursery.nurse( async { /* do something useful */ } )?;
   }

   // This is necessary. Since we could keep spawning tasks even after starting to poll
   // the output, it can't know that we are done, unless we drop all senders or call
   // `close_nursery`. If we don't, the await below deadlocks.
   //
   drop(nursery);

   // Resolves when all spawned tasks are done.
   //
   output.await;

   Ok(())
}

Returning errors

The functionality of TryStreamExt::try_next can be used to bail early if all concurrent tasks need to complete successfully. You can now drop the NurseryStream and cancel all running sibling tasks.

Recover other return types

It's possible to return useful data sometimes from spawned tasks. You can effectively see them as function calls or closures that can run concurrently. The nursery let's you recover these as you go. It could be used to implement a progress bar for example.

Another possibility is using collect to gain a collection of all returned values when everything is done.

Panics

Nursery has no special handling of panics. If your task panics, it depends on the executor what happens. Currently tokio is different from other executors in that it will catch_unwind your spawned tasks. Other executors propagate the panic to the thread that awaits the JoinHandles (eg. that awaits the NurseryStream). If you want a resilient application that works on all executors, use the catch_unwind combinator from the futures library. Again using TryStreamExt::try_next you can bail early if one task panics.

Differences with FuturesUnordered

Nursery and NurseryStream wrap a FuturesUnordered internally. The main feature this gives us is that it allows to us to start polling the stream of outputs and still continue to spawn more subtasks. FuturesUnordered has a very strict two phase API. First spawn, then get output. This allows us to use NuseryStream as a long-lived container. Eg. if you are going to spawn network requests, you can continuously listen to NurseryStream for errors that happened during processing while continuing to spawn further requests. Then when the connection closes, we want to stop processing outstanding requests for this connection. By dropping the NurseryStream, we can do that.

Further a few conveniences are added:

  • Nursery does the spawning for you, never need to handle JoinHandles manually.
  • NurseryStream not only implements Stream, but also Future, if you just want to wait for everything to finish and don't care for the Outputs.
  • Nursery can be cloned and send around, into function calls and spawned subtasks. You don't have to send back the JoinHandles through a channel manually to push them into the FuturesUnordered.

API

API documentation can be found on docs.rs.

Contributing

Please check out the contribution guidelines.

Testing

cargo test and wasm-pack test --firefox --headless -- -Z features=itarget --no-default-features although the latter requires nightly and doesn't work until rustwasm/wasm-pack#698 is resolved or you patch wasm-pack. You could use wasm-bindgen-cli.

Code of conduct

Any of the behaviors described in point 4 "Unacceptable Behavior" of the Citizens Code of Conduct are not welcome here and might get you banned. If anyone, including maintainers and moderators of the project, fail to respect these/your limits, you are entitled to call them out.

License

Unlicence