diff --git a/.github/dependabot.yml b/.github/dependabot.yml new file mode 100644 index 0000000..52f7945 --- /dev/null +++ b/.github/dependabot.yml @@ -0,0 +1,9 @@ +version: 2 +updates: + - package-ecosystem: cargo + directory: / + schedule: + interval: weekly + commit-message: + prefix: '' + labels: [] diff --git a/.github/workflows/build-and-test.yaml b/.github/workflows/build-and-test.yaml deleted file mode 100644 index 00e00cc..0000000 --- a/.github/workflows/build-and-test.yaml +++ /dev/null @@ -1,43 +0,0 @@ -name: Build and test - -on: - push: - branches: - - master - pull_request: - -jobs: - build_and_test: - runs-on: ${{ matrix.os }} - strategy: - fail-fast: false - matrix: - os: [ubuntu-latest] - rust: [nightly, beta, stable] - steps: - - uses: actions/checkout@v2 - - - name: Install latest ${{ matrix.rust }} - uses: actions-rs/toolchain@v1 - with: - toolchain: ${{ matrix.rust }} - profile: minimal - override: true - - - name: Run cargo check - uses: actions-rs/cargo@v1 - with: - command: check - args: --all --bins --examples --tests --all-features - - - name: Run cargo check (without dev-dependencies to catch missing feature flags) - if: startsWith(matrix.rust, 'nightly') - uses: actions-rs/cargo@v1 - with: - command: check - args: -Z features=dev_dep - - - name: Run cargo test - uses: actions-rs/cargo@v1 - with: - command: test diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 0000000..7a0ba04 --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,91 @@ +name: CI + +permissions: + contents: read + +on: + pull_request: + push: + branches: + - master + schedule: + - cron: '0 2 * * 0' + +env: + CARGO_INCREMENTAL: 0 + CARGO_NET_GIT_FETCH_WITH_CLI: true + CARGO_NET_RETRY: 10 + CARGO_TERM_COLOR: always + RUST_BACKTRACE: 1 + RUSTFLAGS: -D warnings + RUSTDOCFLAGS: -D warnings + RUSTUP_MAX_RETRIES: 10 + +defaults: + run: + shell: bash + +jobs: + fmt: + uses: smol-rs/.github/.github/workflows/fmt.yml@main + security_audit: + uses: smol-rs/.github/.github/workflows/security_audit.yml@main + permissions: + checks: write + contents: read + issues: write + secrets: inherit + + test: + runs-on: ${{ matrix.os }} + strategy: + fail-fast: false + matrix: + os: [ubuntu-latest] + rust: [nightly, beta, stable] + steps: + - uses: actions/checkout@v4 + - name: Install Rust + run: rustup update ${{ matrix.rust }} && rustup default ${{ matrix.rust }} + - run: rustup target add wasm32-unknown-unknown + - uses: taiki-e/install-action@cargo-hack + - run: cargo build --all --all-features --all-targets + if: startsWith(matrix.rust, 'nightly') + - name: Run cargo check (without dev-dependencies to catch missing feature flags) + if: startsWith(matrix.rust, 'nightly') + run: cargo check -Z features=dev_dep + - run: cargo test + - run: cargo test --all-features + - run: cargo check --all --all-features --target wasm32-unknown-unknown + - run: cargo hack build --all --all-features --target wasm32-unknown-unknown --no-dev-deps + + msrv: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - name: Install cargo-hack + uses: taiki-e/install-action@cargo-hack + - run: cargo hack build --rust-version + + clippy: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - name: Install Rust + run: rustup update stable + - run: cargo clippy --all-features --all-targets + + miri: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - name: Install Rust + run: rustup toolchain install nightly --component miri && rustup default nightly + - run: cargo miri test + env: + MIRIFLAGS: -Zmiri-strict-provenance -Zmiri-symbolic-alignment-check -Zmiri-disable-isolation + RUSTFLAGS: ${{ env.RUSTFLAGS }} -Z randomize-layout + - run: cargo miri test --all-features + env: + MIRIFLAGS: -Zmiri-strict-provenance -Zmiri-symbolic-alignment-check -Zmiri-disable-isolation -Zmiri-ignore-leaks + RUSTFLAGS: ${{ env.RUSTFLAGS }} -Z randomize-layout diff --git a/.github/workflows/lint.yaml b/.github/workflows/lint.yaml deleted file mode 100644 index b6017f1..0000000 --- a/.github/workflows/lint.yaml +++ /dev/null @@ -1,23 +0,0 @@ -name: Lint - -on: - push: - branches: - - master - pull_request: - -jobs: - clippy: - runs-on: ubuntu-latest - steps: - - uses: actions/checkout@v2 - - - uses: actions-rs/toolchain@v1 - with: - toolchain: stable - profile: minimal - components: clippy - - uses: actions-rs/clippy-check@v1 - with: - token: ${{ secrets.GITHUB_TOKEN }} - args: --all-features -- -W clippy::all diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml new file mode 100644 index 0000000..59fad1f --- /dev/null +++ b/.github/workflows/release.yml @@ -0,0 +1,22 @@ +name: Release + +permissions: + contents: write + +on: + push: + tags: + - v[0-9]+.* + +jobs: + create-release: + if: github.repository_owner == 'smol-rs' + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - uses: taiki-e/create-gh-release-action@v1 + with: + changelog: CHANGELOG.md + branch: master + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} diff --git a/.github/workflows/security.yaml b/.github/workflows/security.yaml deleted file mode 100644 index c4f7947..0000000 --- a/.github/workflows/security.yaml +++ /dev/null @@ -1,17 +0,0 @@ -name: Security audit - -on: - push: - branches: - - master - pull_request: - -jobs: - security_audit: - runs-on: ubuntu-latest - steps: - - uses: actions/checkout@v2 - - - uses: actions-rs/audit-check@v1 - with: - token: ${{ secrets.GITHUB_TOKEN }} diff --git a/CHANGELOG.md b/CHANGELOG.md index 3098544..94cb58d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,106 @@ +# Version 1.14.0 + +- Bump MSRV to 1.65. (#146) +- Fix docs.rs build. (#152) +- Upstreaming parts of the Hermit `no_std` patchset: + - Use `Self` where possible (#155) + - Import items from `core` and `alloc` if possible (#160) + +# Version 1.13.3 + +- Avoid places where the code had a possibility to block or panic. (#147) + +# Version 1.13.2 + +- Fix build failure with minimal-versions. (#132) +- Prevent executor from becoming unusable by panic of the iterator passed by the user to the `spawn_many`. (#136) +- Reduce memory footprint. (#137) + +# Version 1.13.1 + +- Fix docs.rs build. (#125) + +# Version 1.13.0 + +- Relax the `Send` bound on `LocalExecutor::spawn_many`. (#120) +- Ensure all features are documented on `docs.rs`. (#122) + +# Version 1.12.0 + +- Add static executors, which are an optimization over executors that are kept + around forever. (#112) + +# Version 1.11.0 + +- Re-export the `async_task::FallibleTask` primitive. (#113) +- Support racy initialization of the executor state. This should allow the executor to be + initialized on web targets without any issues. (#108) + +# Version 1.10.0 + +- Add a function `spawn_batch` that allows users to spawn multiple tasks while only locking the executor once. (#92) + +# Version 1.9.1 + +- Remove the thread-local optimization due to the bugs that it introduces. (#106) + +# Version 1.9.0 + +- Re-introduce the thread-local task push optimization to the executor. (#93) +- Bump `async-task` to v4.4.0. (#90) +- Replace some unnecessary atomic operations with non-atomic operations. (#94) +- Use weaker atomic orderings for notifications. (#95) +- When spawning a future, avoid looking up the ID to assign to that future twice. (#96) + +# Version 1.8.0 + +- When spawned tasks panic, the panic is caught and then surfaced in the spawned + `Task`. Previously, the panic would be surfaced in `tick()` or `run()`. (#78) + +# Version 1.7.2 + +- Fix compilation under WebAssembly targets (#77). + +# Version 1.7.1 + +- Fix compilation under WebAssembly targets (#75). +- Add a disclaimer indicating that this is a reference executor (#74). + +# Version 1.7.0 + +- Bump `async-lock` and `futures-lite` to their latest versions. (#70) + +# Version 1.6.0 + +- Remove the thread-local queue optimization, as it caused a number of bugs in production use cases. (#61) + +# Version 1.5.4 + +- Fix a panic that could happen when two concurrent `run()` calls are made and the thread local task slot is left as `None`. (#55) + +# Version 1.5.3 + +- Fix an accidental breaking change in v1.5.2, where `ex.run()` was no longer `Send`. (#50) +- Remove the unused `memchr` dependency. (#51) + +# Version 1.5.2 + +- Add thread-local task queue optimizations, allowing new tasks to avoid using the global queue. (#37) +- Update `fastrand` to v2. (#45) + +# Version 1.5.1 + +- Implement a better form of debug output for Executor and LocalExecutor. (#33) + +# Version 1.5.0 + +- Remove the dependency on the `once_cell` crate to restore the MSRV. (#29) +- Update `concurrent-queue` to v2. + +# Version 1.4.1 + +- Remove dependency on deprecated `vec-arena`. (#23) + # Version 1.4.0 - Add `Executor::is_empty()` and `LocalExecutor::is_empty()`. diff --git a/Cargo.toml b/Cargo.toml index 2c58175..827438e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,26 +1,48 @@ [package] name = "async-executor" -version = "1.4.0" -authors = ["Stjepan Glavina "] -edition = "2018" +# When publishing a new version: +# - Update CHANGELOG.md +# - Create "v1.x.y" git tag +version = "1.14.0" +authors = ["Stjepan Glavina ", "John Nunley "] +edition = "2021" +rust-version = "1.65" description = "Async executor" license = "Apache-2.0 OR MIT" repository = "https://github.com/smol-rs/async-executor" -homepage = "https://github.com/smol-rs/async-executor" -documentation = "https://docs.rs/async-executor" keywords = ["asynchronous", "executor", "single", "multi", "spawn"] categories = ["asynchronous", "concurrency"] +exclude = ["/.*"] + +[features] +# Adds support for executors optimized for use in static variables. +static = [] [dependencies] -async-task = "4.0.0" -concurrent-queue = "1.2.2" -fastrand = "1.3.4" -futures-lite = "1.11.0" -once_cell = "1.4.1" -vec-arena = "1.0.0" +async-task = "4.4.0" +concurrent-queue = "2.5.0" +fastrand = "2.0.0" +futures-lite = { version = "2.0.0", default-features = false } +pin-project-lite = "0.2" +slab = "0.4.7" + +[target.'cfg(target_family = "wasm")'.dependencies] +futures-lite = { version = "2.0.0", default-features = false, features = ["std"] } [dev-dependencies] -async-channel = "1.4.1" -async-io = "1.1.9" +async-channel = "2.0.0" +async-io = "2.1.0" +async-lock = "3.0.0" +criterion = { version = "0.8", default-features = false, features = ["cargo_bench_support"] } easy-parallel = "3.1.0" -num_cpus = "1.13.0" +fastrand = "2.0.0" +futures-lite = "2.0.0" +once_cell = "1.16.0" + +[[bench]] +name = "executor" +harness = false +required-features = ["static"] + +[package.metadata.docs.rs] +all-features = true diff --git a/README.md b/README.md index c330bd6..fed40b9 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # async-executor -[![Build](https://github.com/smol-rs/async-executor/workflows/Build%20and%20test/badge.svg)]( +[![Build](https://github.com/smol-rs/async-executor/actions/workflows/ci.yml/badge.svg)]( https://github.com/smol-rs/async-executor/actions) [![License](https://img.shields.io/badge/license-Apache--2.0_OR_MIT-blue.svg)]( https://github.com/smol-rs/async-executor) @@ -11,6 +11,13 @@ https://docs.rs/async-executor) Async executors. +This crate provides two reference executors that trade performance for +functionality. They should be considered reference executors that are "good +enough" for most use cases. For more specialized use cases, consider writing +your own executor on top of [`async-task`]. + +[`async-task`]: https://crates.io/crates/async-task + ## Examples ```rust diff --git a/benches/executor.rs b/benches/executor.rs index 98f1cb5..5fc140f 100644 --- a/benches/executor.rs +++ b/benches/executor.rs @@ -1,10 +1,11 @@ -#![feature(test)] +#![allow(clippy::incompatible_msrv)] // false positive: https://github.com/rust-lang/rust-clippy/issues/12257#issuecomment-2093667187 -extern crate test; +use std::hint::black_box; +use std::mem; +use std::thread::available_parallelism; -use std::future::Future; - -use async_executor::Executor; +use async_executor::{Executor, StaticExecutor}; +use criterion::{criterion_group, criterion_main, Criterion}; use futures_lite::{future, prelude::*}; const TASKS: usize = 300; @@ -12,98 +13,486 @@ const STEPS: usize = 300; const LIGHT_TASKS: usize = 25_000; static EX: Executor<'_> = Executor::new(); +static STATIC_EX: StaticExecutor = StaticExecutor::new(); + +fn run(f: impl FnOnce(), multithread: bool) { + let limit = if multithread { + available_parallelism().unwrap().get() + } else { + 1 + }; -fn run(f: impl FnOnce()) { let (s, r) = async_channel::bounded::<()>(1); easy_parallel::Parallel::new() - .each(0..num_cpus::get(), |_| future::block_on(EX.run(r.recv()))) + .each(0..limit, |_| future::block_on(EX.run(r.recv()))) .finish(move || { let _s = s; f() }); } -#[bench] -fn create(b: &mut test::Bencher) { - b.iter(move || { - let ex = Executor::new(); - let task = ex.spawn(async {}); - future::block_on(ex.run(task)); - }); -} +fn run_static(f: impl FnOnce(), multithread: bool) { + let limit = if multithread { + available_parallelism().unwrap().get() + } else { + 1 + }; -#[bench] -fn spawn_one(b: &mut test::Bencher) { - run(|| { - b.iter(move || { - future::block_on(async { EX.spawn(async {}).await }); + let (s, r) = async_channel::bounded::<()>(1); + easy_parallel::Parallel::new() + .each(0..limit, |_| future::block_on(STATIC_EX.run(r.recv()))) + .finish(move || { + let _s = s; + f() }); +} + +fn create(c: &mut Criterion) { + c.bench_function("executor::create", |b| { + b.iter(|| { + let ex = Executor::new(); + let task = ex.spawn(async {}); + future::block_on(ex.run(task)); + }) }); } -#[bench] -fn spawn_many(b: &mut test::Bencher) { - run(|| { - b.iter(move || { - future::block_on(async { - let mut tasks = Vec::new(); - for _ in 0..LIGHT_TASKS { - tasks.push(EX.spawn(async {})); - } - for task in tasks { - task.await; +fn running_benches(c: &mut Criterion) { + for (prefix, with_static) in [("executor", false), ("static_executor", true)] { + for (group_name, multithread) in [("single_thread", false), ("multi_thread", true)].iter() { + let mut group = c.benchmark_group(group_name.to_string()); + + group.bench_function(format!("{prefix}::spawn_one"), |b| { + if with_static { + run_static( + || { + b.iter(|| { + future::block_on(async { STATIC_EX.spawn(async {}).await }); + }); + }, + *multithread, + ); + } else { + run( + || { + b.iter(|| { + future::block_on(async { EX.spawn(async {}).await }); + }); + }, + *multithread, + ); } }); - }); - }); -} -#[bench] -fn spawn_recursively(b: &mut test::Bencher) { - fn go(i: usize) -> impl Future + Send + 'static { - async move { - if i != 0 { - EX.spawn(async move { - let fut = go(i - 1).boxed(); - fut.await; - }) - .await; + if !with_static { + group.bench_function("executor::spawn_batch", |b| { + run( + || { + let mut handles = vec![]; + + b.iter(|| { + EX.spawn_many((0..250).map(|_| future::yield_now()), &mut handles); + }); + + handles.clear(); + }, + *multithread, + ) + }); } - } - } - run(|| { - b.iter(move || { - future::block_on(async { - let mut tasks = Vec::new(); - for _ in 0..TASKS { - tasks.push(EX.spawn(go(STEPS))); - } - for task in tasks { - task.await; + group.bench_function(format!("{prefix}::spawn_many_local"), |b| { + if with_static { + run_static( + || { + b.iter(move || { + future::block_on(async { + let mut tasks = Vec::new(); + for _ in 0..LIGHT_TASKS { + tasks.push(STATIC_EX.spawn(async {})); + } + for task in tasks { + task.await; + } + }); + }); + }, + *multithread, + ); + } else { + run( + || { + b.iter(move || { + future::block_on(async { + let mut tasks = Vec::new(); + for _ in 0..LIGHT_TASKS { + tasks.push(EX.spawn(async {})); + } + for task in tasks { + task.await; + } + }); + }); + }, + *multithread, + ); } }); - }); - }); -} -#[bench] -fn yield_now(b: &mut test::Bencher) { - run(|| { - b.iter(move || { - future::block_on(async { - let mut tasks = Vec::new(); - for _ in 0..TASKS { - tasks.push(EX.spawn(async move { - for _ in 0..STEPS { - future::yield_now().await; + group.bench_function(format!("{prefix}::spawn_recursively"), |b| { + #[allow(clippy::manual_async_fn)] + fn go(i: usize) -> impl Future + Send + 'static { + async move { + if i != 0 { + EX.spawn(async move { + let fut = go(i - 1).boxed(); + fut.await; + }) + .await; + } + } + } + + #[allow(clippy::manual_async_fn)] + fn go_static(i: usize) -> impl Future + Send + 'static { + async move { + if i != 0 { + STATIC_EX + .spawn(async move { + let fut = go_static(i - 1).boxed(); + fut.await; + }) + .await; } - })); + } } - for task in tasks { - task.await; + + if with_static { + run_static( + || { + b.iter(move || { + future::block_on(async { + let mut tasks = Vec::new(); + for _ in 0..TASKS { + tasks.push(STATIC_EX.spawn(go_static(STEPS))); + } + for task in tasks { + task.await; + } + }); + }); + }, + *multithread, + ); + } else { + run( + || { + b.iter(move || { + future::block_on(async { + let mut tasks = Vec::new(); + for _ in 0..TASKS { + tasks.push(EX.spawn(go(STEPS))); + } + for task in tasks { + task.await; + } + }); + }); + }, + *multithread, + ); } }); - }); - }); + + group.bench_function(format!("{prefix}::yield_now"), |b| { + if with_static { + run_static( + || { + b.iter(move || { + future::block_on(async { + let mut tasks = Vec::new(); + for _ in 0..TASKS { + tasks.push(STATIC_EX.spawn(async move { + for _ in 0..STEPS { + future::yield_now().await; + } + })); + } + for task in tasks { + task.await; + } + }); + }); + }, + *multithread, + ); + } else { + run( + || { + b.iter(move || { + future::block_on(async { + let mut tasks = Vec::new(); + for _ in 0..TASKS { + tasks.push(EX.spawn(async move { + for _ in 0..STEPS { + future::yield_now().await; + } + })); + } + for task in tasks { + task.await; + } + }); + }); + }, + *multithread, + ); + } + }); + + group.bench_function(format!("{prefix}::channels"), |b| { + if with_static { + run_static( + || { + b.iter(move || { + future::block_on(async { + // Create channels. + let mut tasks = Vec::new(); + let (first_send, first_recv) = async_channel::bounded(1); + let mut current_recv = first_recv; + + for _ in 0..TASKS { + let (next_send, next_recv) = async_channel::bounded(1); + let current_recv = + mem::replace(&mut current_recv, next_recv); + + tasks.push(STATIC_EX.spawn(async move { + // Send a notification on to the next task. + for _ in 0..STEPS { + current_recv.recv().await.unwrap(); + next_send.send(()).await.unwrap(); + } + })); + } + + for _ in 0..STEPS { + first_send.send(()).await.unwrap(); + current_recv.recv().await.unwrap(); + } + + for task in tasks { + task.await; + } + }); + }); + }, + *multithread, + ) + } else { + run( + || { + b.iter(move || { + future::block_on(async { + // Create channels. + let mut tasks = Vec::new(); + let (first_send, first_recv) = async_channel::bounded(1); + let mut current_recv = first_recv; + + for _ in 0..TASKS { + let (next_send, next_recv) = async_channel::bounded(1); + let current_recv = + mem::replace(&mut current_recv, next_recv); + + tasks.push(EX.spawn(async move { + // Send a notification on to the next task. + for _ in 0..STEPS { + current_recv.recv().await.unwrap(); + next_send.send(()).await.unwrap(); + } + })); + } + + for _ in 0..STEPS { + first_send.send(()).await.unwrap(); + current_recv.recv().await.unwrap(); + } + + for task in tasks { + task.await; + } + }); + }); + }, + *multithread, + ) + } + }); + + group.bench_function(format!("{prefix}::web_server"), |b| { + if with_static { + run_static( + || { + b.iter(move || { + future::block_on(async { + let (db_send, db_recv) = + async_channel::bounded::>( + TASKS / 5, + ); + let mut db_rng = fastrand::Rng::with_seed(0x12345678); + let mut web_rng = db_rng.fork(); + + // This task simulates a database. + let db_task = STATIC_EX.spawn(async move { + loop { + // Wait for a new task. + let incoming = match db_recv.recv().await { + Ok(incoming) => incoming, + Err(_) => break, + }; + + // Process the task. Maybe it takes a while. + for _ in 0..db_rng.usize(..10) { + future::yield_now().await; + } + + // Send the data back. + incoming.send(db_rng.usize(..)).await.ok(); + } + }); + + // This task simulates a web server waiting for new tasks. + let server_task = STATIC_EX.spawn(async move { + for i in 0..TASKS { + // Get a new connection. + if web_rng.usize(..=16) == 16 { + future::yield_now().await; + } + + let mut web_rng = web_rng.fork(); + let db_send = db_send.clone(); + let task = STATIC_EX.spawn(async move { + // Check if the data is cached... + if web_rng.bool() { + // ...it's in cache! + future::yield_now().await; + return; + } + + // Otherwise we have to make a DB call or two. + for _ in 0..web_rng.usize(STEPS / 2..STEPS) { + let (resp_send, resp_recv) = + async_channel::bounded(1); + db_send.send(resp_send).await.unwrap(); + black_box(resp_recv.recv().await.unwrap()); + } + + // Send the data back... + for _ in 0..web_rng.usize(3..16) { + future::yield_now().await; + } + }); + + task.detach(); + + if i & 16 == 0 { + future::yield_now().await; + } + } + }); + + // Spawn and wait for it to stop. + server_task.await; + db_task.await; + }); + }) + }, + *multithread, + ) + } else { + run( + || { + b.iter(move || { + future::block_on(async { + let (db_send, db_recv) = + async_channel::bounded::>( + TASKS / 5, + ); + let mut db_rng = fastrand::Rng::with_seed(0x12345678); + let mut web_rng = db_rng.fork(); + + // This task simulates a database. + let db_task = EX.spawn(async move { + loop { + // Wait for a new task. + let incoming = match db_recv.recv().await { + Ok(incoming) => incoming, + Err(_) => break, + }; + + // Process the task. Maybe it takes a while. + for _ in 0..db_rng.usize(..10) { + future::yield_now().await; + } + + // Send the data back. + incoming.send(db_rng.usize(..)).await.ok(); + } + }); + + // This task simulates a web server waiting for new tasks. + let server_task = EX.spawn(async move { + for i in 0..TASKS { + // Get a new connection. + if web_rng.usize(..=16) == 16 { + future::yield_now().await; + } + + let mut web_rng = web_rng.fork(); + let db_send = db_send.clone(); + let task = EX.spawn(async move { + // Check if the data is cached... + if web_rng.bool() { + // ...it's in cache! + future::yield_now().await; + return; + } + + // Otherwise we have to make a DB call or two. + for _ in 0..web_rng.usize(STEPS / 2..STEPS) { + let (resp_send, resp_recv) = + async_channel::bounded(1); + db_send.send(resp_send).await.unwrap(); + black_box(resp_recv.recv().await.unwrap()); + } + + // Send the data back... + for _ in 0..web_rng.usize(3..16) { + future::yield_now().await; + } + }); + + task.detach(); + + if i & 16 == 0 { + future::yield_now().await; + } + } + }); + + // Spawn and wait for it to stop. + server_task.await; + db_task.await; + }); + }) + }, + *multithread, + ) + } + }); + } + } } + +criterion_group!(benches, create, running_benches); + +criterion_main!(benches); diff --git a/examples/limit.rs b/examples/limit.rs new file mode 100644 index 0000000..4342c79 --- /dev/null +++ b/examples/limit.rs @@ -0,0 +1,95 @@ +//! An executor where you can only push a limited number of tasks. + +use async_executor::{Executor, Task}; +use async_lock::Semaphore; +use std::{future::Future, sync::Arc, time::Duration}; + +/// An executor where you can only push a limited number of tasks. +struct LimitedExecutor { + /// Inner running executor. + executor: Executor<'static>, + + /// Semaphore limiting the number of tasks. + semaphore: Arc, +} + +impl LimitedExecutor { + fn new(max: usize) -> Self { + Self { + executor: Executor::new(), + semaphore: Semaphore::new(max).into(), + } + } + + /// Spawn a task, waiting until there is a slot available. + async fn spawn(&self, future: F) -> Task + where + F::Output: Send + 'static, + { + // Wait for a semaphore permit. + let permit = self.semaphore.acquire_arc().await; + + // Wrap it into a new future. + let future = async move { + let result = future.await; + drop(permit); + result + }; + + // Spawn the task. + self.executor.spawn(future) + } + + /// Run a future to completion. + async fn run(&self, future: F) -> F::Output { + self.executor.run(future).await + } +} + +fn main() { + futures_lite::future::block_on(async { + let ex = Arc::new(LimitedExecutor::new(10)); + ex.run({ + let ex = ex.clone(); + async move { + // Spawn a bunch of tasks that wait for a while. + for i in 0..15 { + ex.spawn(async move { + async_io::Timer::after(Duration::from_millis(fastrand::u64(1..3))).await; + println!("Waiting task #{i} finished!"); + }) + .await + .detach(); + } + + let (start_tx, start_rx) = async_channel::bounded::<()>(1); + let mut current_rx = start_rx; + + // Send the first message. + start_tx.send(()).await.unwrap(); + + // Spawn a bunch of channel tasks that wake eachother up. + for i in 0..25 { + let (next_tx, next_rx) = async_channel::bounded::<()>(1); + + ex.spawn(async move { + current_rx.recv().await.unwrap(); + println!("Channel task {i} woken up!"); + next_tx.send(()).await.unwrap(); + println!("Channel task {i} finished!"); + }) + .await + .detach(); + + current_rx = next_rx; + } + + // Wait for the last task to finish. + current_rx.recv().await.unwrap(); + + println!("All tasks finished!"); + } + }) + .await; + }); +} diff --git a/examples/priority.rs b/examples/priority.rs index df77dd1..b1dc01e 100644 --- a/examples/priority.rs +++ b/examples/priority.rs @@ -1,6 +1,5 @@ //! An executor with task priorities. -use std::future::Future; use std::thread; use async_executor::{Executor, Task}; @@ -73,9 +72,9 @@ fn main() { // Spawn a task with this priority. tasks.push(EX.spawn(priority, async move { - println!("{:?}", priority); + println!("{priority:?}"); future::yield_now().await; - println!("{:?}", priority); + println!("{priority:?}"); })); } diff --git a/rustfmt.toml b/rustfmt.toml deleted file mode 100644 index 1082fd8..0000000 --- a/rustfmt.toml +++ /dev/null @@ -1 +0,0 @@ -version = "Two" diff --git a/src/lib.rs b/src/lib.rs index a43a498..89f95d3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,5 +1,12 @@ //! Async executors. //! +//! This crate provides two reference executors that trade performance for +//! functionality. They should be considered reference executors that are "good +//! enough" for most use cases. For more specialized use cases, consider writing +//! your own executor on top of [`async-task`]. +//! +//! [`async-task`]: https://crates.io/crates/async-task +//! //! # Examples //! //! ``` @@ -18,23 +25,48 @@ //! future::block_on(ex.run(task)); //! ``` -#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)] - -use std::future::Future; -use std::marker::PhantomData; -use std::panic::{RefUnwindSafe, UnwindSafe}; -use std::rc::Rc; -use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; -use std::sync::{Arc, Mutex, RwLock}; -use std::task::{Poll, Waker}; - -use async_task::Runnable; +#![warn( + missing_docs, + missing_debug_implementations, + rust_2018_idioms, + clippy::undocumented_unsafe_blocks +)] +#![doc( + html_favicon_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png" +)] +#![doc( + html_logo_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png" +)] +#![cfg_attr(docsrs, feature(doc_cfg))] +#![allow(clippy::unused_unit)] // false positive fixed in Rust 1.89 + +extern crate alloc; + +use alloc::rc::Rc; +use alloc::sync::Arc; +use alloc::vec::Vec; +use core::fmt; +use core::marker::PhantomData; +use core::panic::{RefUnwindSafe, UnwindSafe}; +use core::pin::Pin; +use core::sync::atomic::{AtomicBool, AtomicPtr, Ordering}; +use core::task::{Context, Poll, Waker}; +use std::sync::{Mutex, MutexGuard, PoisonError, RwLock, TryLockError}; + +use async_task::{Builder, Runnable}; use concurrent_queue::ConcurrentQueue; use futures_lite::{future, prelude::*}; -use vec_arena::Arena; +use pin_project_lite::pin_project; +use slab::Slab; + +#[cfg(feature = "static")] +mod static_executors; #[doc(no_inline)] -pub use async_task::Task; +pub use async_task::{FallibleTask, Task}; +#[cfg(feature = "static")] +#[cfg_attr(docsrs, doc(cfg(any(feature = "static"))))] +pub use static_executors::*; /// An async executor. /// @@ -60,21 +92,28 @@ pub use async_task::Task; /// drop(signal); /// })); /// ``` -#[derive(Debug)] pub struct Executor<'a> { /// The executor state. - state: once_cell::sync::OnceCell>, + state: AtomicPtr, /// Makes the `'a` lifetime invariant. - _marker: PhantomData>, + _marker: PhantomData>, } +// SAFETY: Executor stores no thread local state that can be accessed via other thread. unsafe impl Send for Executor<'_> {} +// SAFETY: Executor internally synchronizes all of it's operations internally. unsafe impl Sync for Executor<'_> {} impl UnwindSafe for Executor<'_> {} impl RefUnwindSafe for Executor<'_> {} +impl fmt::Debug for Executor<'_> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + debug_executor(self, "Executor", f) + } +} + impl<'a> Executor<'a> { /// Creates a new executor. /// @@ -85,9 +124,9 @@ impl<'a> Executor<'a> { /// /// let ex = Executor::new(); /// ``` - pub const fn new() -> Executor<'a> { - Executor { - state: once_cell::sync::OnceCell::new(), + pub const fn new() -> Self { + Self { + state: AtomicPtr::new(core::ptr::null_mut()), _marker: PhantomData, } } @@ -111,7 +150,7 @@ impl<'a> Executor<'a> { /// assert!(ex.is_empty()); /// ``` pub fn is_empty(&self) -> bool { - self.state().active.lock().unwrap().is_empty() + self.state().active().is_empty() } /// Spawns a task onto the executor. @@ -128,19 +167,124 @@ impl<'a> Executor<'a> { /// }); /// ``` pub fn spawn(&self, future: impl Future + Send + 'a) -> Task { - let mut active = self.state().active.lock().unwrap(); + let state = self.state(); + let mut active = state.active(); + // SAFETY: `T` and the future are `Send`. + unsafe { Self::spawn_inner(state, future, &mut active) } + } + + /// Spawns many tasks onto the executor. + /// + /// As opposed to the [`spawn`] method, this locks the executor's inner task lock once and + /// spawns all of the tasks in one go. With large amounts of tasks this can improve + /// contention. + /// + /// For very large numbers of tasks the lock is occasionally dropped and re-acquired to + /// prevent runner thread starvation. It is assumed that the iterator provided does not + /// block; blocking iterators can lock up the internal mutex and therefore the entire + /// executor. + /// + /// ## Example + /// + /// ``` + /// use async_executor::Executor; + /// use futures_lite::{stream, prelude::*}; + /// use core::future::ready; + /// + /// # futures_lite::future::block_on(async { + /// let mut ex = Executor::new(); + /// + /// let futures = [ + /// ready(1), + /// ready(2), + /// ready(3) + /// ]; + /// + /// // Spawn all of the futures onto the executor at once. + /// let mut tasks = vec![]; + /// ex.spawn_many(futures, &mut tasks); + /// + /// // Await all of them. + /// let results = ex.run(async move { + /// stream::iter(tasks).then(|x| x).collect::>().await + /// }).await; + /// assert_eq!(results, [1, 2, 3]); + /// # }); + /// ``` + /// + /// [`spawn`]: Executor::spawn + pub fn spawn_many + Send + 'a>( + &self, + futures: impl IntoIterator, + handles: &mut impl Extend>, + ) { + let state = self.state(); + let mut active = Some(state.as_ref().active()); + + // Convert the futures into tasks. + let tasks = futures.into_iter().enumerate().map(move |(i, future)| { + // SAFETY: `T` and the future are `Send`. + let task = unsafe { Self::spawn_inner(state, future, active.as_mut().unwrap()) }; + + // Yield the lock every once in a while to ease contention. + if i.wrapping_sub(1) % 500 == 0 { + drop(active.take()); + active = Some(self.state().active()); + } + + task + }); + + // Push the tasks to the user's collection. + handles.extend(tasks); + } + + /// Spawn a future while holding the inner lock. + /// + /// # Safety + /// + /// If this is an `Executor`, `F` and `T` must be `Send`. + unsafe fn spawn_inner( + state: Pin<&'a State>, + future: impl Future + 'a, + active: &mut Slab, + ) -> Task { // Remove the task from the set of active tasks when the future finishes. - let index = active.next_vacant(); - let state = self.state().clone(); - let future = async move { - let _guard = CallOnDrop(move || drop(state.active.lock().unwrap().remove(index))); - future.await - }; + let entry = active.vacant_entry(); + let index = entry.key(); + let future = AsyncCallOnDrop::new(future, move || drop(state.active().try_remove(index))); // Create the task and register it in the set of active tasks. - let (runnable, task) = unsafe { async_task::spawn_unchecked(future, self.schedule()) }; - active.insert(runnable.waker()); + // + // SAFETY: + // + // If `future` is not `Send`, this must be a `LocalExecutor` as per this + // function's unsafe precondition. Since `LocalExecutor` is `!Sync`, + // `try_tick`, `tick` and `run` can only be called from the origin + // thread of the `LocalExecutor`. Similarly, `spawn` can only be called + // from the origin thread, ensuring that `future` and the executor share + // the same origin thread. The `Runnable` can be scheduled from other + // threads, but because of the above `Runnable` can only be called or + // dropped on the origin thread. + // + // `future` is not `'static`, but we make sure that the `Runnable` does + // not outlive `'a`. When the executor is dropped, the `active` field is + // drained and all of the `Waker`s are woken. Then, the queue inside of + // the `Executor` is drained of all of its runnables. This ensures that + // runnables are dropped and this precondition is satisfied. + // + // `Self::schedule` is `Send` and `Sync`, as checked below. + // Therefore we do not need to worry about which thread the `Waker` is used + // and dropped on. + // + // `Self::schedule` may not be `'static`, but we make sure that the `Waker` does + // not outlive `'a`. When the executor is dropped, the `active` field is + // drained and all of the `Waker`s are woken. + let (runnable, task) = Builder::new() + .propagate_panic(true) + .spawn_unchecked(|()| future, Self::schedule(state)); + entry.insert(runnable.waker()); runnable.schedule(); task @@ -164,18 +308,7 @@ impl<'a> Executor<'a> { /// assert!(ex.try_tick()); // a task was found /// ``` pub fn try_tick(&self) -> bool { - match self.state().queue.pop() { - Err(_) => false, - Ok(runnable) => { - // Notify another ticker now to pick up where this ticker left off, just in case - // running the task takes a long time. - self.state().notify(); - - // Run the task. - runnable.run(); - true - } - } + self.state().try_tick() } /// Runs a single task. @@ -198,9 +331,7 @@ impl<'a> Executor<'a> { /// future::block_on(ex.tick()); // runs the task /// ``` pub async fn tick(&self) { - let state = self.state(); - let runnable = Ticker::new(state).runnable().await; - runnable.run(); + self.state().tick().await; } /// Runs the executor until the given future completes. @@ -219,59 +350,75 @@ impl<'a> Executor<'a> { /// assert_eq!(res, 6); /// ``` pub async fn run(&self, future: impl Future) -> T { - let runner = Runner::new(self.state()); - - // A future that runs tasks forever. - let run_forever = async { - loop { - for _ in 0..200 { - let runnable = runner.runnable().await; - runnable.run(); - } - future::yield_now().await; - } - }; - - // Run `future` and `run_forever` concurrently until `future` completes. - future.or(run_forever).await + self.state().run(future).await } /// Returns a function that schedules a runnable task when it gets woken up. - fn schedule(&self) -> impl Fn(Runnable) + Send + Sync + 'static { - let state = self.state().clone(); - - // TODO(stjepang): If possible, push into the current local queue and notify the ticker. + fn schedule(state: Pin<&'a State>) -> impl Fn(Runnable) + Send + Sync + 'a { + // TODO: If possible, push into the current local queue and notify the ticker. move |runnable| { - state.queue.push(runnable).unwrap(); + let result = state.queue.push(runnable); + debug_assert!(result.is_ok()); // Since we use unbounded queue, push will never fail. state.notify(); } } - /// Returns a reference to the inner state. - fn state(&self) -> &Arc { - self.state.get_or_init(|| Arc::new(State::new())) + /// Returns a pointer to the inner state. + #[inline] + fn state(&self) -> Pin<&'a State> { + #[cold] + fn alloc_state(atomic_ptr: &AtomicPtr) -> *mut State { + let state = Arc::new(State::new()); + let ptr = Arc::into_raw(state).cast_mut(); + if let Err(actual) = atomic_ptr.compare_exchange( + core::ptr::null_mut(), + ptr, + Ordering::AcqRel, + Ordering::Acquire, + ) { + // SAFETY: This was just created from Arc::into_raw. + drop(unsafe { Arc::from_raw(ptr) }); + actual + } else { + ptr + } + } + + let mut ptr = self.state.load(Ordering::Acquire); + if ptr.is_null() { + ptr = alloc_state(&self.state); + } + + // SAFETY: So long as an Executor lives, it's state pointer will always be valid + // and will never be moved until it's dropped. + Pin::new(unsafe { &*ptr }) } } impl Drop for Executor<'_> { fn drop(&mut self) { - if let Some(state) = self.state.get() { - let mut active = state.active.lock().unwrap(); - for i in 0..active.capacity() { - if let Some(w) = active.remove(i) { - w.wake(); - } - } - drop(active); + let ptr = *self.state.get_mut(); + if ptr.is_null() { + return; + } + + // SAFETY: As ptr is not null, it was allocated via Arc::new and converted + // via Arc::into_raw in state_ptr. + let state = unsafe { Arc::from_raw(ptr) }; - while state.queue.pop().is_ok() {} + let mut active = state.pin().active(); + for w in active.drain() { + w.wake(); } + drop(active); + + while state.queue.pop().is_ok() {} } } impl<'a> Default for Executor<'a> { - fn default() -> Executor<'a> { - Executor::new() + fn default() -> Self { + Self::new() } } @@ -291,10 +438,9 @@ impl<'a> Default for Executor<'a> { /// println!("Hello world!"); /// })); /// ``` -#[derive(Debug)] pub struct LocalExecutor<'a> { /// The inner executor. - inner: once_cell::unsync::OnceCell>, + inner: Executor<'a>, /// Makes the type `!Send` and `!Sync`. _marker: PhantomData>, @@ -303,6 +449,12 @@ pub struct LocalExecutor<'a> { impl UnwindSafe for LocalExecutor<'_> {} impl RefUnwindSafe for LocalExecutor<'_> {} +impl fmt::Debug for LocalExecutor<'_> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + debug_executor(&self.inner, "LocalExecutor", f) + } +} + impl<'a> LocalExecutor<'a> { /// Creates a single-threaded executor. /// @@ -313,9 +465,9 @@ impl<'a> LocalExecutor<'a> { /// /// let local_ex = LocalExecutor::new(); /// ``` - pub const fn new() -> LocalExecutor<'a> { - LocalExecutor { - inner: once_cell::unsync::OnceCell::new(), + pub const fn new() -> Self { + Self { + inner: Executor::new(), _marker: PhantomData, } } @@ -356,22 +508,73 @@ impl<'a> LocalExecutor<'a> { /// }); /// ``` pub fn spawn(&self, future: impl Future + 'a) -> Task { - let mut active = self.inner().state().active.lock().unwrap(); - - // Remove the task from the set of active tasks when the future finishes. - let index = active.next_vacant(); - let state = self.inner().state().clone(); - let future = async move { - let _guard = CallOnDrop(move || drop(state.active.lock().unwrap().remove(index))); - future.await - }; + let state = self.inner().state(); + let mut active = state.active(); - // Create the task and register it in the set of active tasks. - let (runnable, task) = unsafe { async_task::spawn_unchecked(future, self.schedule()) }; - active.insert(runnable.waker()); + // SAFETY: This executor is not thread safe, so the future and its result + // cannot be sent to another thread. + unsafe { Executor::spawn_inner(state, future, &mut active) } + } - runnable.schedule(); - task + /// Spawns many tasks onto the executor. + /// + /// As opposed to the [`spawn`] method, this locks the executor's inner task lock once and + /// spawns all of the tasks in one go. With large amounts of tasks this can improve + /// contention. + /// + /// It is assumed that the iterator provided does not block; blocking iterators can lock up + /// the internal mutex and therefore the entire executor. Unlike [`Executor::spawn`], the + /// mutex is not released, as there are no other threads that can poll this executor. + /// + /// ## Example + /// + /// ``` + /// use async_executor::LocalExecutor; + /// use futures_lite::{stream, prelude::*}; + /// use core::future::ready; + /// + /// # futures_lite::future::block_on(async { + /// let mut ex = LocalExecutor::new(); + /// + /// let futures = [ + /// ready(1), + /// ready(2), + /// ready(3) + /// ]; + /// + /// // Spawn all of the futures onto the executor at once. + /// let mut tasks = vec![]; + /// ex.spawn_many(futures, &mut tasks); + /// + /// // Await all of them. + /// let results = ex.run(async move { + /// stream::iter(tasks).then(|x| x).collect::>().await + /// }).await; + /// assert_eq!(results, [1, 2, 3]); + /// # }); + /// ``` + /// + /// [`spawn`]: LocalExecutor::spawn + pub fn spawn_many + 'a>( + &self, + futures: impl IntoIterator, + handles: &mut impl Extend>, + ) { + let state = self.inner().state(); + let mut active = state.active(); + + // Convert all of the futures to tasks. + let tasks = futures.into_iter().map(|future| { + // SAFETY: This executor is not thread safe, so the future and its result + // cannot be sent to another thread. + unsafe { Executor::spawn_inner(state, future, &mut active) } + + // As only one thread can spawn or poll tasks at a time, there is no need + // to release lock contention here. + }); + + // Push them to the user's collection. + handles.extend(tasks); } /// Attempts to run a task if at least one is scheduled. @@ -437,30 +640,19 @@ impl<'a> LocalExecutor<'a> { self.inner().run(future).await } - /// Returns a function that schedules a runnable task when it gets woken up. - fn schedule(&self) -> impl Fn(Runnable) + Send + Sync + 'static { - let state = self.inner().state().clone(); - - move |runnable| { - state.queue.push(runnable).unwrap(); - state.notify(); - } - } - /// Returns a reference to the inner executor. fn inner(&self) -> &Executor<'a> { - self.inner.get_or_init(|| Executor::new()) + &self.inner } } impl<'a> Default for LocalExecutor<'a> { - fn default() -> LocalExecutor<'a> { - LocalExecutor::new() + fn default() -> Self { + Self::new() } } /// The state of a executor. -#[derive(Debug)] struct State { /// The global queue. queue: ConcurrentQueue, @@ -475,13 +667,13 @@ struct State { sleepers: Mutex, /// Currently active tasks. - active: Mutex>, + active: Mutex>, } impl State { /// Creates state for a new executor. - fn new() -> State { - State { + const fn new() -> Self { + Self { queue: ConcurrentQueue::unbounded(), local_queues: RwLock::new(Vec::new()), notified: AtomicBool::new(true), @@ -490,28 +682,82 @@ impl State { wakers: Vec::new(), free_ids: Vec::new(), }), - active: Mutex::new(Arena::new()), + active: Mutex::new(Slab::new()), } } + fn pin(&self) -> Pin<&Self> { + Pin::new(self) + } + + /// Returns a reference to currently active tasks. + fn active(self: Pin<&Self>) -> MutexGuard<'_, Slab> { + self.get_ref() + .active + .lock() + .unwrap_or_else(PoisonError::into_inner) + } + /// Notifies a sleeping ticker. #[inline] fn notify(&self) { if self .notified - .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst) + .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire) .is_ok() { - let waker = self.sleepers.lock().unwrap().notify(); + let waker = self + .sleepers + .lock() + .unwrap_or_else(PoisonError::into_inner) + .notify(); if let Some(w) = waker { w.wake(); } } } + + pub(crate) fn try_tick(&self) -> bool { + match self.queue.pop() { + Err(_) => false, + Ok(runnable) => { + // Notify another ticker now to pick up where this ticker left off, just in case + // running the task takes a long time. + self.notify(); + + // Run the task. + runnable.run(); + true + } + } + } + + pub(crate) async fn tick(&self) { + let runnable = Ticker::new(self).runnable().await; + runnable.run(); + } + + pub async fn run(&self, future: impl Future) -> T { + let mut runner = Runner::new(self); + let mut rng = fastrand::Rng::new(); + + // A future that runs tasks forever. + let run_forever = async { + loop { + for _ in 0..200 { + let runnable = runner.runnable(&mut rng).await; + runnable.run(); + } + future::yield_now().await; + } + }; + + // Run `future` and `run_forever` concurrently until `future` completes. + future.or(run_forever).await + } } /// A list of sleeping tickers. -#[derive(Debug)] struct Sleepers { /// Number of sleeping tickers (both notified and unnotified). count: usize, @@ -543,9 +789,7 @@ impl Sleepers { fn update(&mut self, id: usize, waker: &Waker) -> bool { for item in &mut self.wakers { if item.0 == id { - if !item.1.will_wake(waker) { - item.1 = waker.clone(); - } + item.1.clone_from(waker); return false; } } @@ -588,7 +832,6 @@ impl Sleepers { } /// Runs task one by one. -#[derive(Debug)] struct Ticker<'a> { /// The executor state. state: &'a State, @@ -597,31 +840,32 @@ struct Ticker<'a> { /// /// States a ticker can be in: /// 1) Woken. - /// 2a) Sleeping and unnotified. - /// 2b) Sleeping and notified. - sleeping: AtomicUsize, + /// 2a) Sleeping and unnotified. + /// 2b) Sleeping and notified. + sleeping: usize, } -impl Ticker<'_> { +impl<'a> Ticker<'a> { /// Creates a ticker. - fn new(state: &State) -> Ticker<'_> { - Ticker { - state, - sleeping: AtomicUsize::new(0), - } + fn new(state: &'a State) -> Self { + Self { state, sleeping: 0 } } /// Moves the ticker into sleeping and unnotified state. /// /// Returns `false` if the ticker was already sleeping and unnotified. - fn sleep(&self, waker: &Waker) -> bool { - let mut sleepers = self.state.sleepers.lock().unwrap(); - - match self.sleeping.load(Ordering::SeqCst) { + fn sleep(&mut self, waker: &Waker) -> bool { + let mut sleepers = self + .state + .sleepers + .lock() + .unwrap_or_else(PoisonError::into_inner); + + match self.sleeping { // Move to sleeping state. - 0 => self - .sleeping - .store(sleepers.insert(waker), Ordering::SeqCst), + 0 => { + self.sleeping = sleepers.insert(waker); + } // Already sleeping, check if notified. id => { @@ -633,31 +877,35 @@ impl Ticker<'_> { self.state .notified - .swap(sleepers.is_notified(), Ordering::SeqCst); + .store(sleepers.is_notified(), Ordering::Release); true } /// Moves the ticker into woken state. - fn wake(&self) { - let id = self.sleeping.swap(0, Ordering::SeqCst); - if id != 0 { - let mut sleepers = self.state.sleepers.lock().unwrap(); - sleepers.remove(id); + fn wake(&mut self) { + if self.sleeping != 0 { + let mut sleepers = self + .state + .sleepers + .lock() + .unwrap_or_else(PoisonError::into_inner); + sleepers.remove(self.sleeping); self.state .notified - .swap(sleepers.is_notified(), Ordering::SeqCst); + .store(sleepers.is_notified(), Ordering::Release); } + self.sleeping = 0; } /// Waits for the next runnable task to run. - async fn runnable(&self) -> Runnable { + async fn runnable(&mut self) -> Runnable { self.runnable_with(|| self.state.queue.pop().ok()).await } /// Waits for the next runnable task to run, given a function that searches for a task. - async fn runnable_with(&self, mut search: impl FnMut() -> Option) -> Runnable { + async fn runnable_with(&mut self, mut search: impl FnMut() -> Option) -> Runnable { future::poll_fn(|cx| { loop { match search() { @@ -688,14 +936,17 @@ impl Ticker<'_> { impl Drop for Ticker<'_> { fn drop(&mut self) { // If this ticker is in sleeping state, it must be removed from the sleepers list. - let id = self.sleeping.swap(0, Ordering::SeqCst); - if id != 0 { - let mut sleepers = self.state.sleepers.lock().unwrap(); - let notified = sleepers.remove(id); + if self.sleeping != 0 { + let mut sleepers = self + .state + .sleepers + .lock() + .unwrap_or_else(PoisonError::into_inner); + let notified = sleepers.remove(self.sleeping); self.state .notified - .swap(sleepers.is_notified(), Ordering::SeqCst); + .store(sleepers.is_notified(), Ordering::Release); // If this ticker was notified, then notify another ticker. if notified { @@ -709,7 +960,6 @@ impl Drop for Ticker<'_> { /// A worker in a work-stealing executor. /// /// This is just a ticker that also has an associated local queue for improved cache locality. -#[derive(Debug)] struct Runner<'a> { /// The executor state. state: &'a State, @@ -721,28 +971,28 @@ struct Runner<'a> { local: Arc>, /// Bumped every time a runnable task is found. - ticks: AtomicUsize, + ticks: usize, } -impl Runner<'_> { +impl<'a> Runner<'a> { /// Creates a runner and registers it in the executor state. - fn new(state: &State) -> Runner<'_> { - let runner = Runner { + fn new(state: &'a State) -> Self { + let runner = Self { state, ticker: Ticker::new(state), local: Arc::new(ConcurrentQueue::bounded(512)), - ticks: AtomicUsize::new(0), + ticks: 0, }; state .local_queues .write() - .unwrap() + .unwrap_or_else(PoisonError::into_inner) .push(runner.local.clone()); runner } /// Waits for the next runnable task to run. - async fn runnable(&self) -> Runnable { + async fn runnable(&mut self, rng: &mut fastrand::Rng) -> Runnable { let runnable = self .ticker .runnable_with(|| { @@ -758,25 +1008,25 @@ impl Runner<'_> { } // Try stealing from other runners. - let local_queues = self.state.local_queues.read().unwrap(); - - // Pick a random starting point in the iterator list and rotate the list. - let n = local_queues.len(); - let start = fastrand::usize(..n); - let iter = local_queues - .iter() - .chain(local_queues.iter()) - .skip(start) - .take(n); - - // Remove this runner's local queue. - let iter = iter.filter(|local| !Arc::ptr_eq(local, &self.local)); - - // Try stealing from each local queue in the list. - for local in iter { - steal(local, &self.local); - if let Ok(r) = self.local.pop() { - return Some(r); + if let Ok(local_queues) = self.state.local_queues.try_read() { + // Pick a random starting point in the iterator list and rotate the list. + let n = local_queues.len(); + let start = rng.usize(..n); + let iter = local_queues + .iter() + .chain(local_queues.iter()) + .skip(start) + .take(n); + + // Remove this runner's local queue. + let iter = iter.filter(|local| !Arc::ptr_eq(local, &self.local)); + + // Try stealing from each local queue in the list. + for local in iter { + steal(local, &self.local); + if let Ok(r) = self.local.pop() { + return Some(r); + } } } @@ -785,9 +1035,9 @@ impl Runner<'_> { .await; // Bump the tick counter. - let ticks = self.ticks.fetch_add(1, Ordering::SeqCst); + self.ticks = self.ticks.wrapping_add(1); - if ticks % 64 == 0 { + if self.ticks % 64 == 0 { // Steal tasks from the global queue to ensure fair task scheduling. steal(&self.state.queue, &self.local); } @@ -802,7 +1052,7 @@ impl Drop for Runner<'_> { self.state .local_queues .write() - .unwrap() + .unwrap_or_else(PoisonError::into_inner) .retain(|local| !Arc::ptr_eq(local, &self.local)); // Re-schedule remaining tasks in the local queue. @@ -834,11 +1084,153 @@ fn steal(src: &ConcurrentQueue, dest: &ConcurrentQueue) { } } +/// Debug implementation for `Executor` and `LocalExecutor`. +fn debug_executor(executor: &Executor<'_>, name: &str, f: &mut fmt::Formatter<'_>) -> fmt::Result { + // Get a reference to the state. + let ptr = executor.state.load(Ordering::Acquire); + if ptr.is_null() { + // The executor has not been initialized. + struct Uninitialized; + + impl fmt::Debug for Uninitialized { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str("") + } + } + + return f.debug_tuple(name).field(&Uninitialized).finish(); + } + + // SAFETY: If the state pointer is not null, it must have been + // allocated properly by Arc::new and converted via Arc::into_raw + // in state_ptr. + let state = unsafe { &*ptr }; + + debug_state(state, name, f) +} + +/// Debug implementation for `Executor` and `LocalExecutor`. +fn debug_state(state: &State, name: &str, f: &mut fmt::Formatter<'_>) -> fmt::Result { + /// Debug wrapper for the number of active tasks. + struct ActiveTasks<'a>(&'a Mutex>); + + impl fmt::Debug for ActiveTasks<'_> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self.0.try_lock() { + Ok(lock) => fmt::Debug::fmt(&lock.len(), f), + Err(TryLockError::WouldBlock) => f.write_str(""), + Err(TryLockError::Poisoned(err)) => fmt::Debug::fmt(&err.into_inner().len(), f), + } + } + } + + /// Debug wrapper for the local runners. + struct LocalRunners<'a>(&'a RwLock>>>); + + impl fmt::Debug for LocalRunners<'_> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self.0.try_read() { + Ok(lock) => f + .debug_list() + .entries(lock.iter().map(|queue| queue.len())) + .finish(), + Err(TryLockError::WouldBlock) => f.write_str(""), + Err(TryLockError::Poisoned(_)) => f.write_str(""), + } + } + } + + /// Debug wrapper for the sleepers. + struct SleepCount<'a>(&'a Mutex); + + impl fmt::Debug for SleepCount<'_> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self.0.try_lock() { + Ok(lock) => fmt::Debug::fmt(&lock.count, f), + Err(TryLockError::WouldBlock) => f.write_str(""), + Err(TryLockError::Poisoned(_)) => f.write_str(""), + } + } + } + + f.debug_struct(name) + .field("active", &ActiveTasks(&state.active)) + .field("global_tasks", &state.queue.len()) + .field("local_runners", &LocalRunners(&state.local_queues)) + .field("sleepers", &SleepCount(&state.sleepers)) + .finish() +} + /// Runs a closure when dropped. -struct CallOnDrop(F); +struct CallOnDrop(F); -impl Drop for CallOnDrop { +impl Drop for CallOnDrop { fn drop(&mut self) { (self.0)(); } } + +pin_project! { + /// A wrapper around a future, running a closure when dropped. + struct AsyncCallOnDrop { + #[pin] + future: Fut, + cleanup: CallOnDrop, + } +} + +impl AsyncCallOnDrop { + fn new(future: Fut, cleanup: Cleanup) -> Self { + Self { + future, + cleanup: CallOnDrop(cleanup), + } + } +} + +impl Future for AsyncCallOnDrop { + type Output = Fut::Output; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + self.project().future.poll(cx) + } +} + +fn _ensure_send_and_sync() { + use futures_lite::future::pending; + + fn is_send(_: T) {} + fn is_sync(_: T) {} + fn is_static(_: T) {} + + is_send::>(Executor::new()); + is_sync::>(Executor::new()); + + let ex = Executor::new(); + let state = ex.state(); + is_send(ex.run(pending::<()>())); + is_sync(ex.run(pending::<()>())); + is_send(ex.tick()); + is_sync(ex.tick()); + is_send(Executor::schedule(state)); + is_sync(Executor::schedule(state)); + is_static(Executor::schedule(state)); + + /// ```compile_fail + /// use async_executor::LocalExecutor; + /// use futures_lite::future::pending; + /// + /// fn is_send(_: T) {} + /// fn is_sync(_: T) {} + /// + /// is_send::>(LocalExecutor::new()); + /// is_sync::>(LocalExecutor::new()); + /// + /// let ex = LocalExecutor::new(); + /// is_send(ex.run(pending::<()>())); + /// is_sync(ex.run(pending::<()>())); + /// is_send(ex.tick()); + /// is_sync(ex.tick()); + /// ``` + fn _negative_test() {} +} diff --git a/src/static_executors.rs b/src/static_executors.rs new file mode 100644 index 0000000..3f9ec89 --- /dev/null +++ b/src/static_executors.rs @@ -0,0 +1,496 @@ +use crate::{debug_state, Executor, LocalExecutor, State}; +use alloc::boxed::Box; +use async_task::{Builder, Runnable, Task}; +use core::{ + cell::UnsafeCell, + fmt, + future::Future, + marker::PhantomData, + panic::{RefUnwindSafe, UnwindSafe}, + sync::atomic::Ordering, +}; +use slab::Slab; +use std::sync::PoisonError; + +impl Executor<'static> { + /// Consumes the [`Executor`] and intentionally leaks it. + /// + /// Largely equivalent to calling `Box::leak(Box::new(executor))`, but the produced + /// [`StaticExecutor`]'s functions are optimized to require fewer synchronizing operations + /// when spawning, running, and finishing tasks. + /// + /// `StaticExecutor` cannot be converted back into a `Executor`, so this operation is + /// irreversible without the use of unsafe. + /// + /// # Example + /// + /// ``` + /// use async_executor::Executor; + /// use futures_lite::future; + /// + /// let ex = Executor::new().leak(); + /// + /// let task = ex.spawn(async { + /// println!("Hello world"); + /// }); + /// + /// future::block_on(ex.run(task)); + /// ``` + pub fn leak(self) -> &'static StaticExecutor { + let ptr = self.state.load(Ordering::Relaxed); + + let state: &'static State = if ptr.is_null() { + Box::leak(Box::new(State::new())) + } else { + // SAFETY: So long as an Executor lives, it's state pointer will always be valid + // when accessed through state_ptr. This executor will live for the full 'static + // lifetime so this isn't an arbitrary lifetime extension. + unsafe { &*ptr } + }; + + core::mem::forget(self); + + let mut active = state.active.lock().unwrap_or_else(PoisonError::into_inner); + if !active.is_empty() { + // Reschedule all of the active tasks. + for waker in active.drain() { + waker.wake(); + } + // Overwrite to ensure that the slab is deallocated. + *active = Slab::new(); + } + + // SAFETY: StaticExecutor has the same memory layout as State as it's repr(transparent). + // The lifetime is not altered: 'static -> 'static. + let static_executor: &'static StaticExecutor = unsafe { core::mem::transmute(state) }; + static_executor + } +} + +impl LocalExecutor<'static> { + /// Consumes the [`LocalExecutor`] and intentionally leaks it. + /// + /// Largely equivalent to calling `Box::leak(Box::new(executor))`, but the produced + /// [`StaticLocalExecutor`]'s functions are optimized to require fewer synchronizing operations + /// when spawning, running, and finishing tasks. + /// + /// `StaticLocalExecutor` cannot be converted back into a `Executor`, so this operation is + /// irreversible without the use of unsafe. + /// + /// # Example + /// + /// ``` + /// use async_executor::LocalExecutor; + /// use futures_lite::future; + /// + /// let ex = LocalExecutor::new().leak(); + /// + /// let task = ex.spawn(async { + /// println!("Hello world"); + /// }); + /// + /// future::block_on(ex.run(task)); + /// ``` + pub fn leak(self) -> &'static StaticLocalExecutor { + let ptr = self.inner.state.load(Ordering::Relaxed); + + let state: &'static State = if ptr.is_null() { + Box::leak(Box::new(State::new())) + } else { + // SAFETY: So long as an Executor lives, it's state pointer will always be valid + // when accessed through state_ptr. This executor will live for the full 'static + // lifetime so this isn't an arbitrary lifetime extension. + unsafe { &*ptr } + }; + + core::mem::forget(self); + + let mut active = state.active.lock().unwrap_or_else(PoisonError::into_inner); + if !active.is_empty() { + // Reschedule all of the active tasks. + for waker in active.drain() { + waker.wake(); + } + // Overwrite to ensure that the slab is deallocated. + *active = Slab::new(); + } + + // SAFETY: StaticLocalExecutor has the same memory layout as State as it's repr(transparent). + // The lifetime is not altered: 'static -> 'static. + let static_executor: &'static StaticLocalExecutor = unsafe { core::mem::transmute(state) }; + static_executor + } +} + +/// A static-lifetimed async [`Executor`]. +/// +/// This is primarily intended to be used in [`static`] variables, or types intended to be used, or can be created in non-static +/// contexts via [`Executor::leak`]. +/// +/// Spawning, running, and finishing tasks are optimized with the assumption that the executor will never be `Drop`'ed. +/// A static executor may require signficantly less overhead in both single-threaded and mulitthreaded use cases. +/// +/// As this type does not implement `Drop`, losing the handle to the executor or failing +/// to consistently drive the executor with [`StaticExecutor::tick`] or +/// [`StaticExecutor::run`] will cause the all spawned tasks to permanently leak. Any +/// tasks at the time will not be cancelled. +/// +/// [`static`]: https://doc.rust-lang.org/core/keyword.static.html +#[repr(transparent)] +pub struct StaticExecutor { + state: State, +} + +// SAFETY: Executor stores no thread local state that can be accessed via other thread. +unsafe impl Send for StaticExecutor {} +// SAFETY: Executor internally synchronizes all of it's operations internally. +unsafe impl Sync for StaticExecutor {} + +impl UnwindSafe for StaticExecutor {} +impl RefUnwindSafe for StaticExecutor {} + +impl fmt::Debug for StaticExecutor { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + debug_state(&self.state, "StaticExecutor", f) + } +} + +impl StaticExecutor { + /// Creates a new StaticExecutor. + /// + /// # Examples + /// + /// ``` + /// use async_executor::StaticExecutor; + /// + /// static EXECUTOR: StaticExecutor = StaticExecutor::new(); + /// ``` + pub const fn new() -> Self { + Self { + state: State::new(), + } + } + + /// Spawns a task onto the executor. + /// + /// Note: unlike [`Executor::spawn`], this function requires being called with a `'static` + /// borrow on the executor. + /// + /// # Examples + /// + /// ``` + /// use async_executor::StaticExecutor; + /// + /// static EXECUTOR: StaticExecutor = StaticExecutor::new(); + /// + /// let task = EXECUTOR.spawn(async { + /// println!("Hello world"); + /// }); + /// ``` + pub fn spawn( + &'static self, + future: impl Future + Send + 'static, + ) -> Task { + let (runnable, task) = Builder::new() + .propagate_panic(true) + .spawn(|()| future, self.schedule()); + runnable.schedule(); + task + } + + /// Spawns a non-`'static` task onto the executor. + /// + /// ## Safety + /// + /// The caller must ensure that the returned task terminates + /// or is cancelled before the end of 'a. + pub unsafe fn spawn_scoped<'a, T: Send + 'a>( + &'static self, + future: impl Future + Send + 'a, + ) -> Task { + // SAFETY: + // + // - `future` is `Send` + // - `future` is not `'static`, but the caller guarantees that the + // task, and thus its `Runnable` must not live longer than `'a`. + // - `self.schedule()` is `Send`, `Sync` and `'static`, as checked below. + // Therefore we do not need to worry about what is done with the + // `Waker`. + let (runnable, task) = unsafe { + Builder::new() + .propagate_panic(true) + .spawn_unchecked(|()| future, self.schedule()) + }; + runnable.schedule(); + task + } + + /// Attempts to run a task if at least one is scheduled. + /// + /// Running a scheduled task means simply polling its future once. + /// + /// # Examples + /// + /// ``` + /// use async_executor::StaticExecutor; + /// + /// static EXECUTOR: StaticExecutor = StaticExecutor::new(); + /// + /// assert!(!EXECUTOR.try_tick()); // no tasks to run + /// + /// let task = EXECUTOR.spawn(async { + /// println!("Hello world"); + /// }); + /// + /// assert!(EXECUTOR.try_tick()); // a task was found + /// ``` + pub fn try_tick(&self) -> bool { + self.state.try_tick() + } + + /// Runs a single task. + /// + /// Running a task means simply polling its future once. + /// + /// If no tasks are scheduled when this method is called, it will wait until one is scheduled. + /// + /// # Examples + /// + /// ``` + /// use async_executor::StaticExecutor; + /// use futures_lite::future; + /// + /// static EXECUTOR: StaticExecutor = StaticExecutor::new(); + /// + /// let task = EXECUTOR.spawn(async { + /// println!("Hello world"); + /// }); + /// + /// future::block_on(EXECUTOR.tick()); // runs the task + /// ``` + pub async fn tick(&self) { + self.state.tick().await; + } + + /// Runs the executor until the given future completes. + /// + /// # Examples + /// + /// ``` + /// use async_executor::StaticExecutor; + /// use futures_lite::future; + /// + /// static EXECUTOR: StaticExecutor = StaticExecutor::new(); + /// + /// let task = EXECUTOR.spawn(async { 1 + 2 }); + /// let res = future::block_on(EXECUTOR.run(async { task.await * 2 })); + /// + /// assert_eq!(res, 6); + /// ``` + pub async fn run(&self, future: impl Future) -> T { + self.state.run(future).await + } + + /// Returns a function that schedules a runnable task when it gets woken up. + fn schedule(&'static self) -> impl Fn(Runnable) + Send + Sync + 'static { + let state: &'static State = &self.state; + // TODO: If possible, push into the current local queue and notify the ticker. + move |runnable| { + let result = state.queue.push(runnable); + debug_assert!(result.is_ok()); // Since we use unbounded queue, push will never fail. + state.notify(); + } + } +} + +impl Default for StaticExecutor { + fn default() -> Self { + Self::new() + } +} + +/// A static async [`LocalExecutor`] created from [`LocalExecutor::leak`]. +/// +/// This is primarily intended to be used in [`thread_local`] variables, or can be created in non-static +/// contexts via [`LocalExecutor::leak`]. +/// +/// Spawning, running, and finishing tasks are optimized with the assumption that the executor will never be `Drop`'ed. +/// A static executor may require signficantly less overhead in both single-threaded and mulitthreaded use cases. +/// +/// As this type does not implement `Drop`, losing the handle to the executor or failing +/// to consistently drive the executor with [`StaticLocalExecutor::tick`] or +/// [`StaticLocalExecutor::run`] will cause the all spawned tasks to permanently leak. Any +/// tasks at the time will not be cancelled. +/// +/// [`thread_local]: https://doc.rust-lang.org/std/macro.thread_local.html +#[repr(transparent)] +pub struct StaticLocalExecutor { + state: State, + marker_: PhantomData>, +} + +impl UnwindSafe for StaticLocalExecutor {} +impl RefUnwindSafe for StaticLocalExecutor {} + +impl fmt::Debug for StaticLocalExecutor { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + debug_state(&self.state, "StaticLocalExecutor", f) + } +} + +impl StaticLocalExecutor { + /// Creates a new StaticLocalExecutor. + /// + /// # Examples + /// + /// ``` + /// use async_executor::StaticLocalExecutor; + /// + /// thread_local! { + /// static EXECUTOR: StaticLocalExecutor = StaticLocalExecutor::new(); + /// } + /// ``` + pub const fn new() -> Self { + Self { + state: State::new(), + marker_: PhantomData, + } + } + + /// Spawns a task onto the executor. + /// + /// Note: unlike [`LocalExecutor::spawn`], this function requires being called with a `'static` + /// borrow on the executor. + /// + /// # Examples + /// + /// ``` + /// use async_executor::LocalExecutor; + /// + /// let ex = LocalExecutor::new().leak(); + /// + /// let task = ex.spawn(async { + /// println!("Hello world"); + /// }); + /// ``` + pub fn spawn(&'static self, future: impl Future + 'static) -> Task { + let (runnable, task) = Builder::new() + .propagate_panic(true) + .spawn_local(|()| future, self.schedule()); + runnable.schedule(); + task + } + + /// Spawns a non-`'static` task onto the executor. + /// + /// ## Safety + /// + /// The caller must ensure that the returned task terminates + /// or is cancelled before the end of 'a. + pub unsafe fn spawn_scoped<'a, T: 'a>( + &'static self, + future: impl Future + 'a, + ) -> Task { + // SAFETY: + // + // - `future` is not `Send` but `StaticLocalExecutor` is `!Sync`, + // `try_tick`, `tick` and `run` can only be called from the origin + // thread of the `StaticLocalExecutor`. Similarly, `spawn_scoped` can only + // be called from the origin thread, ensuring that `future` and the executor + // share the same origin thread. The `Runnable` can be scheduled from other + // threads, but because of the above `Runnable` can only be called or + // dropped on the origin thread. + // - `future` is not `'static`, but the caller guarantees that the + // task, and thus its `Runnable` must not live longer than `'a`. + // - `self.schedule()` is `Send`, `Sync` and `'static`, as checked below. + // Therefore we do not need to worry about what is done with the + // `Waker`. + let (runnable, task) = unsafe { + Builder::new() + .propagate_panic(true) + .spawn_unchecked(|()| future, self.schedule()) + }; + runnable.schedule(); + task + } + + /// Attempts to run a task if at least one is scheduled. + /// + /// Running a scheduled task means simply polling its future once. + /// + /// # Examples + /// + /// ``` + /// use async_executor::LocalExecutor; + /// + /// let ex = LocalExecutor::new().leak(); + /// assert!(!ex.try_tick()); // no tasks to run + /// + /// let task = ex.spawn(async { + /// println!("Hello world"); + /// }); + /// assert!(ex.try_tick()); // a task was found + /// ``` + pub fn try_tick(&self) -> bool { + self.state.try_tick() + } + + /// Runs a single task. + /// + /// Running a task means simply polling its future once. + /// + /// If no tasks are scheduled when this method is called, it will wait until one is scheduled. + /// + /// # Examples + /// + /// ``` + /// use async_executor::LocalExecutor; + /// use futures_lite::future; + /// + /// let ex = LocalExecutor::new().leak(); + /// + /// let task = ex.spawn(async { + /// println!("Hello world"); + /// }); + /// future::block_on(ex.tick()); // runs the task + /// ``` + pub async fn tick(&self) { + self.state.tick().await; + } + + /// Runs the executor until the given future completes. + /// + /// # Examples + /// + /// ``` + /// use async_executor::LocalExecutor; + /// use futures_lite::future; + /// + /// let ex = LocalExecutor::new().leak(); + /// + /// let task = ex.spawn(async { 1 + 2 }); + /// let res = future::block_on(ex.run(async { task.await * 2 })); + /// + /// assert_eq!(res, 6); + /// ``` + pub async fn run(&self, future: impl Future) -> T { + self.state.run(future).await + } + + /// Returns a function that schedules a runnable task when it gets woken up. + fn schedule(&'static self) -> impl Fn(Runnable) + Send + Sync + 'static { + let state: &'static State = &self.state; + // TODO: If possible, push into the current local queue and notify the ticker. + move |runnable| { + let result = state.queue.push(runnable); + debug_assert!(result.is_ok()); // Since we use unbounded queue, push will never fail. + state.notify(); + } + } +} + +impl Default for StaticLocalExecutor { + fn default() -> Self { + Self::new() + } +} diff --git a/tests/different_executors.rs b/tests/different_executors.rs new file mode 100644 index 0000000..afef3be --- /dev/null +++ b/tests/different_executors.rs @@ -0,0 +1,34 @@ +use async_executor::LocalExecutor; +use futures_lite::future::{block_on, pending, poll_once}; +use futures_lite::pin; +use std::cell::Cell; + +#[test] +fn shared_queue_slot() { + block_on(async { + let was_polled = Cell::new(false); + let future = async { + was_polled.set(true); + pending::<()>().await; + }; + + let ex1 = LocalExecutor::new(); + let ex2 = LocalExecutor::new(); + + // Start the futures for running forever. + let (run1, run2) = (ex1.run(pending::<()>()), ex2.run(pending::<()>())); + pin!(run1); + pin!(run2); + assert!(poll_once(run1.as_mut()).await.is_none()); + assert!(poll_once(run2.as_mut()).await.is_none()); + + // Spawn the future on executor one and then poll executor two. + ex1.spawn(future).detach(); + assert!(poll_once(run2).await.is_none()); + assert!(!was_polled.get()); + + // Poll the first one. + assert!(poll_once(run1).await.is_none()); + assert!(was_polled.get()); + }); +} diff --git a/tests/drop.rs b/tests/drop.rs index 7293cae..5d089b5 100644 --- a/tests/drop.rs +++ b/tests/drop.rs @@ -1,3 +1,4 @@ +#[cfg(not(miri))] use std::mem; use std::panic::catch_unwind; use std::sync::atomic::{AtomicUsize, Ordering}; @@ -11,7 +12,7 @@ use once_cell::sync::Lazy; #[test] fn executor_cancels_everything() { static DROP: AtomicUsize = AtomicUsize::new(0); - static WAKER: Lazy>> = Lazy::new(|| Default::default()); + static WAKER: Lazy>> = Lazy::new(Default::default); let ex = Executor::new(); @@ -38,10 +39,11 @@ fn executor_cancels_everything() { assert_eq!(DROP.load(Ordering::SeqCst), 1); } +#[cfg(not(miri))] #[test] fn leaked_executor_leaks_everything() { static DROP: AtomicUsize = AtomicUsize::new(0); - static WAKER: Lazy>> = Lazy::new(|| Default::default()); + static WAKER: Lazy>> = Lazy::new(Default::default); let ex = Executor::new(); @@ -119,6 +121,23 @@ fn drop_finished_task_and_then_drop_executor() { assert_eq!(DROP.load(Ordering::SeqCst), 1); } +#[test] +fn iterator_panics_mid_run() { + let ex = Executor::new(); + + let panic = std::panic::catch_unwind(|| { + let mut handles = vec![]; + ex.spawn_many( + (0..50).map(|i| if i == 25 { panic!() } else { future::ready(i) }), + &mut handles, + ) + }); + assert!(panic.is_err()); + + let task = ex.spawn(future::ready(0)); + assert_eq!(future::block_on(ex.run(task)), 0); +} + struct CallOnDrop(F); impl Drop for CallOnDrop { diff --git a/tests/larger_tasks.rs b/tests/larger_tasks.rs new file mode 100644 index 0000000..cc57988 --- /dev/null +++ b/tests/larger_tasks.rs @@ -0,0 +1,99 @@ +//! Test for larger tasks. + +use async_executor::Executor; +use futures_lite::future::{self, block_on}; +use futures_lite::prelude::*; + +use std::sync::Arc; +use std::thread; +use std::time::Duration; + +fn do_run>(mut f: impl FnMut(Arc>) -> Fut) { + // This should not run for longer than two minutes. + #[cfg(not(miri))] + let _stop_timeout = { + let (stop_timeout, stopper) = async_channel::bounded::<()>(1); + thread::spawn(move || { + block_on(async move { + let timeout = async { + async_io::Timer::after(Duration::from_secs(2 * 60)).await; + eprintln!("test timed out after 2m"); + std::process::exit(1) + }; + + let _ = stopper.recv().or(timeout).await; + }) + }); + stop_timeout + }; + + let ex = Arc::new(Executor::new()); + + // Test 1: Use the `run` command. + block_on(ex.run(f(ex.clone()))); + + // Test 2: Loop on `tick`. + block_on(async { + let ticker = async { + loop { + ex.tick().await; + } + }; + + f(ex.clone()).or(ticker).await + }); + + // Test 3: Run on many threads. + thread::scope(|scope| { + let (_signal, shutdown) = async_channel::bounded::<()>(1); + + for _ in 0..16 { + let shutdown = shutdown.clone(); + let ex = &ex; + scope.spawn(move || block_on(ex.run(shutdown.recv()))); + } + + block_on(f(ex.clone())); + }); + + // Test 4: Tick loop on many threads. + thread::scope(|scope| { + let (_signal, shutdown) = async_channel::bounded::<()>(1); + + for _ in 0..16 { + let shutdown = shutdown.clone(); + let ex = &ex; + scope.spawn(move || { + block_on(async move { + let ticker = async { + loop { + ex.tick().await; + } + }; + + shutdown.recv().or(ticker).await + }) + }); + } + + block_on(f(ex.clone())); + }); +} + +#[test] +fn smoke() { + do_run(|ex| async move { ex.spawn(async {}).await }); +} + +#[test] +fn yield_now() { + do_run(|ex| async move { ex.spawn(future::yield_now()).await }) +} + +#[test] +fn timer() { + do_run(|ex| async move { + ex.spawn(async_io::Timer::after(Duration::from_millis(5))) + .await; + }) +} diff --git a/tests/local_queue.rs b/tests/local_queue.rs new file mode 100644 index 0000000..4678366 --- /dev/null +++ b/tests/local_queue.rs @@ -0,0 +1,24 @@ +use async_executor::Executor; +use futures_lite::{future, pin}; + +#[test] +fn two_queues() { + future::block_on(async { + // Create an executor with two runners. + let ex = Executor::new(); + let (run1, run2) = ( + ex.run(future::pending::<()>()), + ex.run(future::pending::<()>()), + ); + let mut run1 = Box::pin(run1); + pin!(run2); + + // Poll them both. + assert!(future::poll_once(run1.as_mut()).await.is_none()); + assert!(future::poll_once(run2.as_mut()).await.is_none()); + + // Drop the first one, which should leave the local queue in the `None` state. + drop(run1); + assert!(future::poll_once(run2.as_mut()).await.is_none()); + }); +} diff --git a/tests/panic_prop.rs b/tests/panic_prop.rs new file mode 100644 index 0000000..eab4901 --- /dev/null +++ b/tests/panic_prop.rs @@ -0,0 +1,14 @@ +use async_executor::Executor; +use futures_lite::{future, prelude::*}; + +#[test] +fn test_panic_propagation() { + let ex = Executor::new(); + let task = ex.spawn(async { panic!("should be caught by the task") }); + + // Running the executor should not panic. + assert!(ex.try_tick()); + + // Polling the task should. + assert!(future::block_on(task.catch_unwind()).is_err()); +} diff --git a/tests/spawn_many.rs b/tests/spawn_many.rs new file mode 100644 index 0000000..cebe2d3 --- /dev/null +++ b/tests/spawn_many.rs @@ -0,0 +1,45 @@ +use async_executor::{Executor, LocalExecutor}; +use futures_lite::future; + +#[cfg(not(miri))] +const READY_COUNT: usize = 50_000; +#[cfg(miri)] +const READY_COUNT: usize = 505; + +#[test] +fn spawn_many() { + future::block_on(async { + let ex = Executor::new(); + + // Spawn a lot of tasks. + let mut tasks = vec![]; + ex.spawn_many((0..READY_COUNT).map(future::ready), &mut tasks); + + // Run all of the tasks in parallel. + ex.run(async move { + for (i, task) in tasks.into_iter().enumerate() { + assert_eq!(task.await, i); + } + }) + .await; + }); +} + +#[test] +fn spawn_many_local() { + future::block_on(async { + let ex = LocalExecutor::new(); + + // Spawn a lot of tasks. + let mut tasks = vec![]; + ex.spawn_many((0..READY_COUNT).map(future::ready), &mut tasks); + + // Run all of the tasks in parallel. + ex.run(async move { + for (i, task) in tasks.into_iter().enumerate() { + assert_eq!(task.await, i); + } + }) + .await; + }); +}