From edefd937994014c319cf732482253370f46dde7f Mon Sep 17 00:00:00 2001 From: "Spencer C. Imbleau" Date: Sun, 1 Dec 2024 16:37:38 -0500 Subject: [PATCH] feat: bevy_async_task 0.3 --- .clippy.toml | 8 ++ CHANGELOG.md | 22 ++++ Cargo.toml | 79 ++++++++++++-- README.md | 37 +++---- examples/blocking.rs | 22 ---- examples/cross_system.rs | 47 +++++++++ examples/pool.rs | 11 +- examples/run_wasm/Cargo.toml | 1 + examples/simple.rs | 32 +++--- examples/timeout.rs | 54 ++++++++-- src/error.rs | 8 ++ src/lib.rs | 15 +-- src/receiver.rs | 28 ++--- src/task.rs | 195 +++++++++++++++++++++-------------- src/task_pool.rs | 20 ++-- src/task_runner.rs | 42 +++----- 16 files changed, 400 insertions(+), 221 deletions(-) create mode 100644 .clippy.toml delete mode 100644 examples/blocking.rs create mode 100644 examples/cross_system.rs create mode 100644 src/error.rs diff --git a/.clippy.toml b/.clippy.toml new file mode 100644 index 0000000..6259fd9 --- /dev/null +++ b/.clippy.toml @@ -0,0 +1,8 @@ +# LINEBENDER LINT SET - .clippy.toml - v1 +# See https://linebender.org/wiki/canonical-lints/ +# The default Clippy value is capped at 8 bytes, which was chosen to improve performance on 32-bit. +# Given that we are building for the future and even low-end mobile phones have 64-bit CPUs, +# it makes sense to optimize for 64-bit and accept the performance hits on 32-bit. +# 16 bytes is the number of bytes that fits into two 64-bit CPU registers. +trivial-copy-size-limit = 16 +# END LINEBENDER LINT SET diff --git a/CHANGELOG.md b/CHANGELOG.md index aa152b5..6afb9e0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,28 @@ Subheadings to categorize changes are `added, changed, deprecated, removed, fixe ## Unreleased +## 0.3.0 + +### Added + +- Many types now implement `Debug`. + +### Changed + +- Updated to bevy 0.15. +- `AsyncTaskStatus` was replaced with `std::task::Poll`. If you wish to check if a task runner or pool is idle, you can still do so with `.is_idle()`/`.is_idle()`. +- Replaced `TimeoutError` with a non-exhaustive `TaskError` for future proofing. + +### Removed + +- `blocking_recv()` functions were removed. You should now use `bevy::tasks::block_on(fut)`. +- `AsyncTask.with_timeout()` until further rework is done to return this functionality. Please use `AsyncTask::new_with_duration(Dur, F)` instead. + +### Fixed + +- Timeouts now work correctly on wasm32. +- `AsyncReceiver` now uses an `AtomicWaker` to ensure the sender is never dropped before receiving. + ## 0.2.0 ### Changed diff --git a/Cargo.toml b/Cargo.toml index 0d50a3c..1bd73e0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,30 +9,89 @@ license = "Apache-2.0 OR MIT" repository = "https://github.com/loopystudios/bevy_async_task" authors = ["Spencer C. Imbleau"] keywords = ["gamedev", "async"] -version = "0.2.0" +categories = ["game-development", "asynchronous"] +version = "0.3.0" edition = "2021" +[lints] +# LINEBENDER LINT SET - Cargo.toml - v2 +# See https://linebender.org/wiki/canonical-lints/ +rust.keyword_idents_2024 = "forbid" +rust.non_ascii_idents = "forbid" +rust.non_local_definitions = "forbid" +rust.unsafe_op_in_unsafe_fn = "forbid" +rust.elided_lifetimes_in_paths = "warn" +rust.let_underscore_drop = "warn" +rust.missing_debug_implementations = "warn" +rust.missing_docs = "warn" +rust.single_use_lifetimes = "warn" +rust.trivial_numeric_casts = "warn" +rust.unexpected_cfgs = "warn" +rust.unit_bindings = "warn" +rust.unnameable_types = "warn" +rust.unreachable_pub = "warn" +rust.unused_import_braces = "warn" +rust.unused_lifetimes = "warn" +rust.unused_macro_rules = "warn" +rust.unused_qualifications = "warn" +rust.variant_size_differences = "warn" +clippy.allow_attributes = "warn" +clippy.allow_attributes_without_reason = "warn" +clippy.cast_possible_truncation = "warn" +clippy.collection_is_never_read = "warn" +clippy.dbg_macro = "warn" +clippy.debug_assert_with_mut_call = "warn" +clippy.doc_markdown = "warn" +clippy.exhaustive_enums = "warn" +clippy.fn_to_numeric_cast_any = "forbid" +clippy.infinite_loop = "warn" +clippy.large_include_file = "warn" +clippy.large_stack_arrays = "warn" +clippy.match_same_arms = "warn" +clippy.mismatching_type_param_order = "warn" +clippy.missing_assert_message = "warn" +clippy.missing_errors_doc = "warn" +clippy.missing_fields_in_debug = "warn" +clippy.missing_panics_doc = "warn" +clippy.partial_pub_fields = "warn" +clippy.return_self_not_must_use = "warn" +clippy.same_functions_in_if_condition = "warn" +clippy.semicolon_if_nothing_returned = "warn" +clippy.shadow_unrelated = "warn" +clippy.should_panic_without_expect = "warn" +clippy.todo = "warn" +clippy.trivially_copy_pass_by_ref = "warn" +clippy.unseparated_literal_suffix = "warn" +clippy.use_self = "warn" +clippy.wildcard_imports = "warn" +clippy.cargo_common_metadata = "warn" +clippy.negative_feature_names = "warn" +clippy.redundant_feature_names = "warn" +clippy.wildcard_dependencies = "warn" +# END LINEBENDER LINT SET + [lib] [dependencies] -tokio = { version = "1.38.0", default-features = false, features = ["sync"] } -bevy = { version = "0.14.0", default-features = false, features = [ +tokio = { version = "1.41.1", default-features = false, features = ["sync"] } +bevy = { version = "0.15.0", default-features = false, features = [ "multi_threaded", ] } -async-std = "1.12.0" +async-std = "1.13.0" +thiserror = "2.0.3" +futures = "0.3.31" [target.'cfg(not(target_arch = "wasm32"))'.dependencies] async-compat = "0.2.4" [dev-dependencies] -futures = "0.3.30" futures-timer = "3.0.3" [target.'cfg(not(target_arch = "wasm32"))'.dev-dependencies] -tokio = { version = "1.38.0", features = ["full"] } +tokio = { version = "1.41.1", features = ["full"] } [target.'cfg(target_arch = "wasm32")'.dev-dependencies] -wasm-bindgen-futures = "0.4.42" -wasm-bindgen-test = "0.3.42" -js-sys = "0.3.69" -wasm-bindgen = "0.2.92" +wasm-bindgen-futures = "0.4.47" +wasm-bindgen-test = "0.3.47" +js-sys = "0.3.74" +wasm-bindgen = "0.2.97" diff --git a/README.md b/README.md index 84e4351..3fe31f4 100644 --- a/README.md +++ b/README.md @@ -13,7 +13,7 @@ A minimum crate for ergonomic abstractions to async programming in Bevy. There i -Bevy Async Task provides Bevy system parameters to run asyncronous tasks in the background with timeout support and future output in the same system. It also provides syntactic sugar to reduce boiler plate when blocking on futures within synchronous contexts. +Bevy Async Task provides Bevy system parameters to run asynchronous tasks in the background on web and native with timeouts and output capture. ## Bevy version support @@ -46,18 +46,19 @@ async fn long_task() -> u32 { 5 } -fn my_system(mut task_executor: AsyncTaskRunner) { - match task_executor.poll() { - AsyncTaskStatus::Idle => { - task_executor.start(long_task()); - info!("Started new task!"); - } - AsyncTaskStatus::Pending => { - // - } - AsnycTaskStatus::Finished(v) => { +fn my_system(mut task_runner: AsyncTaskRunner) { + if task_runner.is_idle() { + task_executor.start(long_task()); + info!("Started!"); + } + + match task_runner.poll() { + Poll::Ready(v) => { info!("Received {v}"); } + Poll::Pending => { + // Waiting... + } } } ``` @@ -84,19 +85,7 @@ fn my_system(mut task_pool: AsyncTaskPool) { } ``` -Also, you may use timeouts or block on an `AsyncTask`: - -```rust -// Blocking: -let task = AsyncTask::new(async { 5 }); -assert_eq!(5, task.blocking_recv()); - -// Timeout: -let task = AsyncTask::<()>::pending().with_timeout(Duration::from_millis(10)); -assert!(task.blocking_recv().is_err()); -``` - -Need to steer manually? Break the task into parts. +Need to steer manually? Break the task into parts. Also see our [`cross_system` example](./examples/cross_system.rs). ```rust let task = AsyncTask::new(async move { diff --git a/examples/blocking.rs b/examples/blocking.rs deleted file mode 100644 index 42ebd8d..0000000 --- a/examples/blocking.rs +++ /dev/null @@ -1,22 +0,0 @@ -use bevy::{app::PanicHandlerPlugin, log::LogPlugin, prelude::*}; -use bevy_async_task::{AsyncTask, AsyncTaskRunner}; - -/// You can block with a task runner -fn system1(mut task_executor: AsyncTaskRunner) { - let result = task_executor.blocking_recv(async { 1 }); - info!("Received {result}"); -} - -/// Or block on a task, without the need of a system parameter. -fn system2() { - let result = AsyncTask::new(async { 2 }).blocking_recv(); - info!("Received {result}"); -} - -pub fn main() { - App::new() - .add_plugins((MinimalPlugins, LogPlugin::default(), PanicHandlerPlugin)) - .add_systems(Startup, system2) - .add_systems(Startup, system1) - .run(); -} diff --git a/examples/cross_system.rs b/examples/cross_system.rs new file mode 100644 index 0000000..ccf57ac --- /dev/null +++ b/examples/cross_system.rs @@ -0,0 +1,47 @@ +//! Cross system example - This example shows how to start a task from one system and poll it from +//! another through a resource. + +use async_std::task::sleep; +use bevy::{app::PanicHandlerPlugin, log::LogPlugin, prelude::*, tasks::AsyncComputeTaskPool}; +use bevy_async_task::{AsyncReceiver, AsyncTask}; +use std::time::Duration; + +#[derive(Resource, DerefMut, Deref, Default)] +struct MyTask(Option>); + +/// An async task that takes time to compute! +async fn long_task() -> u32 { + sleep(Duration::from_millis(1000)).await; + 5 +} + +fn system1_start(mut my_task: ResMut<'_, MyTask>) { + let (fut, receiver) = AsyncTask::new(long_task()).into_parts(); + my_task.replace(receiver); + AsyncComputeTaskPool::get().spawn_local(fut).detach(); + info!("Started!"); +} + +fn system2_poll(mut my_task: ResMut<'_, MyTask>) { + let Some(receiver) = my_task.0.as_mut() else { + return; + }; + match receiver.try_recv() { + Some(v) => { + info!("Received {v}"); + } + None => { + // Waiting... + } + } +} + +/// Entry point +pub fn main() { + App::new() + .init_resource::() + .add_plugins((MinimalPlugins, LogPlugin::default(), PanicHandlerPlugin)) + .add_systems(Startup, system1_start) + .add_systems(Update, system2_poll) + .run(); +} diff --git a/examples/pool.rs b/examples/pool.rs index a19078e..31d6116 100644 --- a/examples/pool.rs +++ b/examples/pool.rs @@ -1,9 +1,11 @@ +//! Task pool example - this demonstrates running several async tasks concurrently. + use async_std::task::sleep; use bevy::{app::PanicHandlerPlugin, log::LogPlugin, prelude::*}; -use bevy_async_task::{AsyncTaskPool, AsyncTaskStatus}; -use std::time::Duration; +use bevy_async_task::AsyncTaskPool; +use std::{task::Poll, time::Duration}; -fn system1(mut task_pool: AsyncTaskPool) { +fn system1(mut task_pool: AsyncTaskPool<'_, u64>) { if task_pool.is_idle() { info!("Queueing 5 tasks..."); for i in 1..=5 { @@ -15,12 +17,13 @@ fn system1(mut task_pool: AsyncTaskPool) { } for status in task_pool.iter_poll() { - if let AsyncTaskStatus::Finished(t) = status { + if let Poll::Ready(t) = status { info!("Received {t}"); } } } +/// Entry point pub fn main() { App::new() .add_plugins((MinimalPlugins, LogPlugin::default(), PanicHandlerPlugin)) diff --git a/examples/run_wasm/Cargo.toml b/examples/run_wasm/Cargo.toml index 0f5c799..f2e0783 100644 --- a/examples/run_wasm/Cargo.toml +++ b/examples/run_wasm/Cargo.toml @@ -1,5 +1,6 @@ [package] name = "run_wasm" +edition = "2021" publish = false [dependencies] diff --git a/examples/simple.rs b/examples/simple.rs index 6cd8363..b68146c 100644 --- a/examples/simple.rs +++ b/examples/simple.rs @@ -1,7 +1,9 @@ +//! Simple example - this demonstrates running one async task continuously. + use async_std::task::sleep; use bevy::{app::PanicHandlerPlugin, log::LogPlugin, prelude::*}; -use bevy_async_task::{AsyncTaskRunner, AsyncTaskStatus}; -use std::time::Duration; +use bevy_async_task::AsyncTaskRunner; +use std::{task::Poll, time::Duration}; /// An async task that takes time to compute! async fn long_task() -> u32 { @@ -9,24 +11,26 @@ async fn long_task() -> u32 { 5 } -fn my_system(mut task_executor: AsyncTaskRunner) { - match task_executor.poll() { - AsyncTaskStatus::Idle => { - // Start an async task! - task_executor.start(long_task()); - // Closures also work: - // task_executor.start(async { 5 }); - info!("Started!"); +fn my_system(mut task_runner: AsyncTaskRunner<'_, u32>) { + if task_runner.is_idle() { + // Start an async task! + task_runner.start(long_task()); + // Closures also work: + // task_executor.start(async { 5 }); + info!("Started!"); + } + + match task_runner.poll() { + Poll::Ready(v) => { + info!("Received {v}"); } - AsyncTaskStatus::Pending => { + Poll::Pending => { // Waiting... } - AsyncTaskStatus::Finished(v) => { - info!("Received {v}"); - } } } +/// Entry point pub fn main() { App::new() .add_plugins((MinimalPlugins, LogPlugin::default(), PanicHandlerPlugin)) diff --git a/examples/timeout.rs b/examples/timeout.rs index 6856a31..8a40124 100644 --- a/examples/timeout.rs +++ b/examples/timeout.rs @@ -1,20 +1,54 @@ +//! Timeout example - this demonstrates running one task with a timeout continuously. + +use async_std::{future::pending, task::sleep}; use bevy::{app::PanicHandlerPlugin, log::LogPlugin, prelude::*}; -use bevy_async_task::AsyncTask; -use std::time::Duration; +use bevy_async_task::{AsyncTask, AsyncTaskRunner, TaskError}; +use std::{task::Poll, time::Duration}; + +fn system_does_timeout(mut task_executor: AsyncTaskRunner<'_, Result<(), TaskError>>) { + if task_executor.is_idle() { + let timeout_task = AsyncTask::new_with_timeout(Duration::from_secs(1), pending()); + task_executor.start(timeout_task); + info!("Started A!"); + } + + match task_executor.poll() { + Poll::Ready(Err(TaskError::Timeout(_))) => { + info!("Timeout on A!"); + } + Poll::Pending => { + // Waiting... + } + _ => unreachable!(), + } +} -/// Use a timeout -fn system() { - AsyncTask::<()>::pending() - .with_timeout(Duration::from_millis(1000)) - .blocking_recv() - .unwrap_err(); +fn system_doesnt_timeout(mut task_executor: AsyncTaskRunner<'_, Result>) { + if task_executor.is_idle() { + let timeout_task = AsyncTask::new_with_timeout(Duration::from_secs(10), async { + sleep(Duration::from_secs(2)).await; + 5 + }); + task_executor.start(timeout_task); + info!("Started B!"); + } - info!("Timeout!"); + match task_executor.poll() { + Poll::Ready(Ok(v)) => { + info!("Received B: {v}!"); + } + Poll::Pending => { + // Waiting... + } + _ => unreachable!(), + } } +/// Entry point pub fn main() { App::new() .add_plugins((MinimalPlugins, LogPlugin::default(), PanicHandlerPlugin)) - .add_systems(Update, system) + .add_systems(Update, system_doesnt_timeout) + .add_systems(Update, system_does_timeout) .run(); } diff --git a/src/error.rs b/src/error.rs new file mode 100644 index 0000000..114128b --- /dev/null +++ b/src/error.rs @@ -0,0 +1,8 @@ +/// Errors that may occur from running asynchronous tasks. +#[derive(thiserror::Error, Debug, Clone, PartialEq, Eq)] +#[non_exhaustive] +pub enum TaskError { + /// Timeout occurred. + #[error(transparent)] + Timeout(#[from] async_std::future::TimeoutError), +} diff --git a/src/lib.rs b/src/lib.rs index 68929c0..05f16be 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,22 +1,13 @@ -#![deny(missing_docs)] //! Ergonomic abstractions to async programming in Bevy for all platforms. +mod error; mod receiver; mod task; mod task_pool; mod task_runner; +pub use error::TaskError; pub use receiver::AsyncReceiver; -pub use task::{AsyncTask, TimeoutError}; +pub use task::AsyncTask; pub use task_pool::AsyncTaskPool; pub use task_runner::AsyncTaskRunner; - -/// A poll status for an [`AsyncTask`]. -pub enum AsyncTaskStatus { - /// No task is currently being polled. - Idle, - /// The task is currently working. - Pending, - /// The task is finished. - Finished(T), -} diff --git a/src/receiver.rs b/src/receiver.rs index 27b0083..da2ac74 100644 --- a/src/receiver.rs +++ b/src/receiver.rs @@ -1,27 +1,29 @@ -use tokio::sync::oneshot::{self, error::TryRecvError}; +use futures::task::AtomicWaker; +use std::sync::{ + atomic::{AtomicBool, Ordering}, + Arc, +}; +use tokio::sync::oneshot::{self}; /// A channel that catches an [`AsyncTask`](crate::AsyncTask) result. +#[derive(Debug)] pub struct AsyncReceiver { - pub(crate) received: bool, - pub(crate) buffer: oneshot::Receiver, + pub(crate) received: Arc, + pub(crate) waker: Arc, // Waker to wake the sender + pub(crate) receiver: oneshot::Receiver, } impl AsyncReceiver { /// Poll the current thread waiting for the async result. - /// - /// # Panics - /// Panics if the sender was dropped without sending pub fn try_recv(&mut self) -> Option { - match self.buffer.try_recv() { + match self.receiver.try_recv() { Ok(t) => { - self.received = true; - self.buffer.close(); + self.receiver.close(); + self.received.store(true, Ordering::Relaxed); + self.waker.wake(); // Wake the sender to drop Some(t) } - Err(TryRecvError::Empty) => None, - Err(TryRecvError::Closed) => { - panic!("the sender was dropped without sending") - } + Err(_) => None, } } } diff --git a/src/task.rs b/src/task.rs index 5a49f75..4c6d487 100644 --- a/src/task.rs +++ b/src/task.rs @@ -1,14 +1,22 @@ -use crate::AsyncReceiver; +use crate::{AsyncReceiver, TaskError}; #[cfg(not(target_arch = "wasm32"))] use async_compat::CompatExt; use async_std::future::timeout; use bevy::utils::{ConditionalSend, ConditionalSendFuture}; -use std::{future::pending, pin::Pin, time::Duration}; +use futures::task::AtomicWaker; +use std::{ + fmt::Debug, + future::pending, + pin::Pin, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, + task::Poll, + time::Duration, +}; use tokio::sync::oneshot; -// Re-export timeout error -pub use async_std::future::TimeoutError; - /// A wrapper type around an async future. The future may be executed /// asynchronously by an [`AsyncTaskRunner`](crate::AsyncTaskRunner) or /// [`AsyncTaskPool`](crate::AsyncTaskPool), or it may be blocked on the current @@ -18,30 +26,25 @@ pub struct AsyncTask { receiver: AsyncReceiver, } +impl Debug for AsyncTask +where + T: Debug, +{ + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("AsyncTask") + .field("fut", &"") + .field("receiver", &self.receiver) + .finish() + } +} + impl AsyncTask where T: ConditionalSend + 'static, { /// Never resolves to a value or finishes. - pub fn pending() -> AsyncTask { - AsyncTask::new(pending()) - } - - /// Add a timeout to the task. - pub fn with_timeout(mut self, dur: Duration) -> AsyncTask> { - let (tx, rx) = oneshot::channel(); - let new_fut = async move { - let result = timeout(dur, self.fut) - .await - .map(|_| self.receiver.try_recv().unwrap()); - _ = tx.send(result); - }; - let fut = Box::pin(new_fut); - let receiver = AsyncReceiver { - received: false, - buffer: rx, - }; - AsyncTask::> { fut, receiver } + pub fn pending() -> Self { + Self::new(pending()) } } @@ -53,54 +56,81 @@ impl AsyncTask { F::Output: ConditionalSend + 'static, { let (tx, rx) = oneshot::channel(); - let new_fut = async move { - #[cfg(target_arch = "wasm32")] - let result = fut.await; - #[cfg(not(target_arch = "wasm32"))] - let result = fut.compat().await; - _ = tx.send(result); + let waker = Arc::new(AtomicWaker::new()); + let received = Arc::new(AtomicBool::new(false)); + let fut = { + let waker = waker.clone(); + let received = received.clone(); + async move { + #[cfg(target_arch = "wasm32")] + let result = fut.await; + #[cfg(not(target_arch = "wasm32"))] + let result = fut.compat().await; + if let Ok(()) = tx.send(result) { + // Wait for the receiver to get the result before dropping. + futures::future::poll_fn(|cx| { + waker.register(cx.waker()); + if received.load(Ordering::Relaxed) { + Poll::Ready(()) + } else { + Poll::Pending::<()> + } + }) + .await; + } + } }; - let fut = Box::pin(new_fut); + let fut = Box::pin(fut); let receiver = AsyncReceiver { - received: false, - buffer: rx, + received, + waker, + receiver: rx, }; Self { fut, receiver } } /// Create an async task from a future with a timeout. - pub fn new_with_timeout(dur: Duration, fut: F) -> AsyncTask> + pub fn new_with_timeout(dur: Duration, fut: F) -> AsyncTask> where F: ConditionalSendFuture + 'static, F::Output: ConditionalSend + 'static, { let (tx, rx) = oneshot::channel(); - let new_fut = async move { - #[cfg(target_arch = "wasm32")] - let result = timeout(dur, fut).await; - #[cfg(not(target_arch = "wasm32"))] - let result = timeout(dur, fut.compat()).await; - _ = tx.send(result); + let waker = Arc::new(AtomicWaker::new()); + let received = Arc::new(AtomicBool::new(false)); + let fut = { + let waker = waker.clone(); + let received = received.clone(); + async move { + #[cfg(target_arch = "wasm32")] + let result = timeout(dur, fut).await.map_err(TaskError::Timeout); + #[cfg(not(target_arch = "wasm32"))] + let result = timeout(dur, fut.compat()).await.map_err(TaskError::Timeout); + if let Ok(()) = tx.send(result) { + // Wait for the receiver to get the result before dropping. + futures::future::poll_fn(|cx| { + waker.register(cx.waker()); + if received.load(Ordering::Relaxed) { + Poll::Ready(()) + } else { + Poll::Pending::<()> + } + }) + .await; + } + } }; - let fut = Box::pin(new_fut); + let fut = Box::pin(fut); let receiver = AsyncReceiver { - received: false, - buffer: rx, + received, + waker, + receiver: rx, }; - AsyncTask::> { fut, receiver } - } - - /// Block awaiting the task result. Can only be used outside of async - /// contexts. - pub fn blocking_recv(self) -> T { - let (fut, mut rx) = self.into_parts(); - bevy::tasks::block_on(fut); - rx.buffer.try_recv().unwrap() + AsyncTask::> { fut, receiver } } /// Break apart the task into a runnable future and the receiver. The /// receiver is used to catch the output when the runnable is polled. - #[allow(clippy::type_complexity)] #[must_use] pub fn into_parts( self, @@ -118,7 +148,7 @@ where Fnc::Output: ConditionalSend + 'static, { fn from(value: Fnc) -> Self { - AsyncTask::new(value) + Self::new(value) } } @@ -147,12 +177,6 @@ mod test { } } - #[test] - fn test_blocking_recv() { - let task = AsyncTask::new(async move { 5 }); - assert_eq!(5, task.blocking_recv()); - } - #[tokio::test] async fn test_try_recv() { let task = AsyncTask::new(async move { 5 }); @@ -202,8 +226,12 @@ mod test { select! { _ = (&mut fetch).fuse() => { if let Some(v) = rx.try_recv() { - assert!(v.is_err(), "timeout should have triggered!"); - break 'result; + if matches!(v, Err(TaskError::Timeout(_))) { + // Good ending + break 'result; + } else { + panic!("timeout should have triggered!"); + } } else { // Reset the clock fetch.reset(Duration::from_millis(1)); @@ -216,7 +244,7 @@ mod test { #[tokio::test] async fn test_with_timeout() { - let task = AsyncTask::<()>::pending().with_timeout(Duration::from_millis(5)); + let task = AsyncTask::new_with_timeout(Duration::from_millis(5), pending::<()>()); let (fut, mut rx) = task.into_parts(); assert_eq!(None, rx.try_recv()); @@ -232,8 +260,12 @@ mod test { select! { _ = (&mut fetch).fuse() => { if let Some(v) = rx.try_recv() { - assert!(v.is_err(), "timeout should have triggered!"); - break 'result; + if matches!(v, Err(TaskError::Timeout(_))) { + // Good ending + break 'result; + } else { + panic!("timeout should have triggered!"); + } } else { // Reset the clock fetch.reset(Duration::from_millis(1)); @@ -271,13 +303,9 @@ mod test { Ok(JsValue::NULL) })) .await - .unwrap(); - } - - #[wasm_bindgen_test] - fn test_blocking_recv() { - let task = AsyncTask::new(async move { 5 }); - assert_eq!(5, task.blocking_recv()); + .unwrap_or_else(|e| { + panic!("awaiting promise failed: {e:?}"); + }); } #[wasm_bindgen_test] @@ -293,7 +321,9 @@ mod test { Ok(JsValue::NULL) })) .await - .unwrap(); + .unwrap_or_else(|e| { + panic!("awaiting promise failed: {e:?}"); + }); // Spawn assert_eq!(Some(5), rx.try_recv()); @@ -312,16 +342,20 @@ mod test { Ok(JsValue::NULL) })) .await - .unwrap(); + .unwrap_or_else(|e| { + panic!("awaiting promise failed: {e:?}"); + }); // Spawn - let v = rx.try_recv().expect("future loaded no value"); + let v = rx.try_recv().unwrap_or_else(|| { + panic!("expected result after await"); + }); assert!(v.is_err(), "timeout should have triggered!"); } #[wasm_bindgen_test] async fn test_with_timeout() { - let task = AsyncTask::<()>::pending().with_timeout(Duration::from_millis(5)); + let task = AsyncTask::new_with_timeout(Duration::from_millis(5), pending::<()>()); let (fut, mut rx) = task.into_parts(); assert_eq!(None, rx.try_recv()); @@ -332,10 +366,15 @@ mod test { Ok(JsValue::NULL) })) .await - .unwrap(); + .unwrap_or_else(|e| { + panic!("awaiting promise failed: {e:?}"); + }); // Spawn - let v = rx.try_recv().expect("future loaded no value"); + let v = rx.try_recv().unwrap_or_else(|| { + panic!("expected result after await"); + }); + assert!(matches!(v, Err(TaskError::Timeout(_))), ""); assert!(v.is_err(), "timeout should have triggered!"); } } diff --git a/src/task_pool.rs b/src/task_pool.rs index 6295f34..4e6c02a 100644 --- a/src/task_pool.rs +++ b/src/task_pool.rs @@ -1,4 +1,4 @@ -use crate::{AsyncReceiver, AsyncTask, AsyncTaskStatus}; +use crate::{AsyncReceiver, AsyncTask}; use bevy::{ ecs::{ component::Tick, @@ -9,12 +9,14 @@ use bevy::{ tasks::AsyncComputeTaskPool, utils::synccell::SyncCell, }; +use std::task::Poll; /// A Bevy [`SystemParam`] to execute many similar [`AsyncTask`]s in the /// background simultaneously. +#[derive(Debug)] pub struct AsyncTaskPool<'s, T>(pub(crate) &'s mut Vec>>); -impl<'s, T> AsyncTaskPool<'s, T> { +impl AsyncTaskPool<'_, T> { /// Returns whether the task pool is idle. pub fn is_idle(&self) -> bool { self.0.is_empty() || !self.0.iter().any(Option::is_some) @@ -32,20 +34,20 @@ impl<'s, T> AsyncTaskPool<'s, T> { /// Iterate and poll the task pool for the current task statuses. A task can /// yield `Idle`, `Pending`, or `Finished(T)`. - pub fn iter_poll(&mut self) -> impl Iterator> { + pub fn iter_poll(&mut self) -> impl Iterator> { let mut statuses = vec![]; self.0.retain_mut(|task| match task { Some(rx) => { if let Some(v) = rx.try_recv() { - statuses.push(AsyncTaskStatus::Finished(v)); + statuses.push(Poll::Ready(v)); false } else { - statuses.push(AsyncTaskStatus::Pending); + statuses.push(Poll::Pending); true } } None => { - statuses.push(AsyncTaskStatus::Idle); + statuses.push(Poll::Pending); true } }); @@ -53,7 +55,7 @@ impl<'s, T> AsyncTaskPool<'s, T> { } } -impl<'_s, T: Send + 'static> ExclusiveSystemParam for AsyncTaskPool<'_s, T> { +impl ExclusiveSystemParam for AsyncTaskPool<'_, T> { type State = SyncCell>>>; type Item<'s> = AsyncTaskPool<'s, T>; @@ -68,10 +70,10 @@ impl<'_s, T: Send + 'static> ExclusiveSystemParam for AsyncTaskPool<'_s, T> { } // SAFETY: only local state is accessed -unsafe impl<'s, T: Send + 'static> ReadOnlySystemParam for AsyncTaskPool<'s, T> {} +unsafe impl ReadOnlySystemParam for AsyncTaskPool<'_, T> {} // SAFETY: only local state is accessed -unsafe impl<'a, T: Send + 'static> SystemParam for AsyncTaskPool<'a, T> { +unsafe impl SystemParam for AsyncTaskPool<'_, T> { type State = SyncCell>>>; type Item<'w, 's> = AsyncTaskPool<'s, T>; diff --git a/src/task_runner.rs b/src/task_runner.rs index 3c23b82..bf874a1 100644 --- a/src/task_runner.rs +++ b/src/task_runner.rs @@ -1,4 +1,4 @@ -use crate::{AsyncReceiver, AsyncTask, AsyncTaskStatus}; +use crate::{AsyncReceiver, AsyncTask}; use bevy::{ ecs::{ component::Tick, @@ -9,12 +9,14 @@ use bevy::{ tasks::AsyncComputeTaskPool, utils::synccell::SyncCell, }; +use std::{sync::atomic::Ordering, task::Poll}; /// A Bevy [`SystemParam`] to execute [`AsyncTask`]s individually in the /// background. +#[derive(Debug)] pub struct AsyncTaskRunner<'s, T>(pub(crate) &'s mut Option>); -impl<'s, T> AsyncTaskRunner<'s, T> { +impl AsyncTaskRunner<'_, T> { /// Returns whether the task runner is idle. pub fn is_idle(&self) -> bool { self.0.is_none() @@ -23,7 +25,7 @@ impl<'s, T> AsyncTaskRunner<'s, T> { /// Returns whether the task runner is pending (running, but not finished). pub fn is_pending(&self) -> bool { if let Some(ref rx) = self.0 { - !rx.received + !rx.received.load(Ordering::Relaxed) } else { false } @@ -32,22 +34,12 @@ impl<'s, T> AsyncTaskRunner<'s, T> { /// Returns whether the task runner is finished. pub fn is_finished(&self) -> bool { if let Some(ref rx) = self.0 { - rx.received + rx.received.load(Ordering::Relaxed) } else { false } } - /// Block awaiting the task result. Can only be used outside of async - /// contexts. - /// - /// # Panics - /// Panics if called within an async context. - pub fn blocking_recv(&mut self, task: impl Into>) -> T { - let task = task.into(); - task.blocking_recv() - } - /// Start an async task in the background. If there is an existing task /// pending, it will be dropped and replaced with the given task. If you /// need to run multiple tasks, use the [`AsyncTaskPool`]. @@ -60,25 +52,25 @@ impl<'s, T> AsyncTaskRunner<'s, T> { self.0.replace(rx); } - /// Poll the task runner for the current task status. If no task has begun, - /// this will return `Idle`. Possible returns are `Idle`, `Pending`, or - /// `Finished(T)`. - #[must_use] - pub fn poll(&mut self) -> AsyncTaskStatus { + /// Poll the task runner for the current task status. Possible returns are `Pending` or `Ready(T)`. + pub fn poll(&mut self) -> Poll { match self.0.as_mut() { Some(rx) => match rx.try_recv() { Some(v) => { self.0.take(); - AsyncTaskStatus::Finished(v) + Poll::Ready(v) } - None => AsyncTaskStatus::Pending, + None => Poll::Pending, }, - None => AsyncTaskStatus::Idle, + None => { + warn!("You are polling a task runner before a task was started"); + Poll::Pending + } } } } -impl<'_s, T: Send + 'static> ExclusiveSystemParam for AsyncTaskRunner<'_s, T> { +impl ExclusiveSystemParam for AsyncTaskRunner<'_, T> { type State = SyncCell>>; type Item<'s> = AsyncTaskRunner<'s, T>; @@ -92,10 +84,10 @@ impl<'_s, T: Send + 'static> ExclusiveSystemParam for AsyncTaskRunner<'_s, T> { } // SAFETY: only local state is accessed -unsafe impl<'s, T: Send + 'static> ReadOnlySystemParam for AsyncTaskRunner<'s, T> {} +unsafe impl ReadOnlySystemParam for AsyncTaskRunner<'_, T> {} // SAFETY: only local state is accessed -unsafe impl<'a, T: Send + 'static> SystemParam for AsyncTaskRunner<'a, T> { +unsafe impl SystemParam for AsyncTaskRunner<'_, T> { type State = SyncCell>>; type Item<'w, 's> = AsyncTaskRunner<'s, T>;