Skip to content

Commit

Permalink
fix!: honor timeouts for long sync check
Browse files Browse the repository at this point in the history
Signed-off-by: vados <vados@vadosware.io>
  • Loading branch information
t3hmrman committed Jul 12, 2023
1 parent 75de595 commit 68d3252
Show file tree
Hide file tree
Showing 10 changed files with 311 additions and 83 deletions.
1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ version = "0.1.0"
edition = "2021"
description = "Run a closure continuously, until is succeeds or times out."
license = "MIT"
license-file = "LICENSE"
readme = "README.md"
rust-version = "1.70"
authors = [
Expand Down
9 changes: 7 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -128,11 +128,16 @@ cargo run --example async-std --features=async-std
If you'd like to control more finely the intervals and how many times a check will occur, you can create the `Waiter` object(s) yourself:

```rust
use situwaition::runtime::AsyncWaiter;
use situwaition::runtime::SyncWaiter;

// Synchronous code
situwaition::sync::SyncWaiter::with_timeout(|| { ... }, Duration::from_millis(500));
SyncWaiter::with_timeout(|| { ... }, Duration::from_millis(500))?;

// Asynchronous code (either tokio or async-std)
situwaition::runtime::AsyncWaiter::with_timeout(|| async { ... }, Duration::from_millis(500)).exect().await;
AsyncWaiter::with_timeout(|| async { ... }, Duration::from_millis(500))?
.exec()
.await;
```

See the methods on [`SyncWaiter`](./src/sync.rs) and [`AsyncWaiter`](./src/runtime/mod.rs) for more options.
Expand Down
6 changes: 3 additions & 3 deletions examples/async_std.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,11 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
assert!(matches!(result, 42));
eprintln!("resulting value is: {}", result);

// An always failing example
let result = AsyncWaiter::with_timeout(
// This async waiter always fails, so it will resolve to a failure in 500ms
let _ = AsyncWaiter::with_timeout(
|| async { Err(ExampleError::NotDoneCountingError) as Result<(), ExampleError> },
Duration::from_millis(500),
)
)?
.exec()
.await;
eprintln!("asynchronous always-failling result: {:?}", result);
Expand Down
2 changes: 1 addition & 1 deletion examples/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
let result = situwaition::sync::SyncWaiter::with_timeout(
|| Err(ExampleError::NotDoneCountingError) as Result<(), ExampleError>,
Duration::from_millis(500),
)
)?
.exec();
eprintln!("synchronous always-failling result: {:?}", result);

Expand Down
6 changes: 3 additions & 3 deletions examples/tokio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,11 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
assert!(matches!(result, 42));
eprintln!("resulting value is: {}", result);

// An always failing example
let result = AsyncWaiter::with_timeout(
// This async waiter always fails, so it will resolve to a failure in 500ms
let _ = AsyncWaiter::with_timeout(
|| async { Err(ExampleError::NotDoneCountingError) as Result<(), ExampleError> },
Duration::from_millis(500),
)
)?
.exec()
.await;
eprintln!("asynchronous always-failling result: {:?}", result);
Expand Down
34 changes: 26 additions & 8 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,32 +45,40 @@ pub enum SituwaitionError<E> {
#[error("failed repeatedly until the timeout: {0}")]
TimeoutError(E),

#[error("check fn run exceeded the timeout")]
CheckTimeoutError,

/// A single conditoin failure
#[error("condition check failed: {0}")]
ConditionFailed(E),

#[cfg(feature = "tokio")]
#[error("error joining tokio task: {0}")]
TokioJoinError(tokio::task::JoinError),

#[cfg(feature = "tokio")]
#[error("unexpected error")]
UnexpectedError,
#[error("unexpected error: {0}")]
UnexpectedError(String),
}

/// Options for a given situwaition
#[allow(dead_code)]
#[derive(Debug, Clone, Builder)]
pub struct SituwaitionOpts {
/// The maximum time to wait for a situwaition
pub timeout: Duration,

/// How often to check for a passing condition.
/// Note that in the synchronous case, this determines how quickly
/// you can return *before* a check actually completes (i.e. timing in 100ms when check_fn takes 500ms)
pub check_interval: Duration,

/// Time to wait after a check has been performed.
/// Use this to avoid running resource-intensive checks too frequently
pub check_cooldown: Option<Duration>,
}

impl Default for SituwaitionOpts {
fn default() -> Self {
SituwaitionOpts {
timeout: Duration::from_millis(DEFAULT_SITUWAITION_TIMEOUT_MS),
check_interval: Duration::from_millis(DEFAULT_SITUWAITION_CHECK_INTERVAL_MS),
check_cooldown: None,
}
}
}
Expand All @@ -94,7 +102,7 @@ pub trait SituwaitionBase {
pub trait SyncSituwaition: SituwaitionBase {
/// Execute the situwaition, and wait until it resolves
/// or fails with a timeout
fn exec(&self) -> Result<Self::Result, SituwaitionError<Self::Error>>;
fn exec(&mut self) -> Result<Self::Result, SituwaitionError<Self::Error>>;
}

/// This trait represents a "situwaition" that can be a"waited".
Expand All @@ -106,3 +114,13 @@ pub trait AsyncSituwaition: SituwaitionBase {
/// or fails with a timeout
async fn exec(&mut self) -> Result<Self::Result, SituwaitionError<Self::Error>>;
}

/// Errors that are thrown during waiter creation
#[derive(Debug, Clone, Error)]
pub enum WaiterCreationError {
#[error("invalid timeout: {0}")]
InvalidTimeout(String),

#[error("invalid interval: {0}")]
InvalidInterval(String),
}
57 changes: 48 additions & 9 deletions src/runtime/async_std.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

use std::{error::Error, future::Future, time::Instant};

use async_std::task::sleep;
use async_std::{future::timeout, task::sleep};
use async_trait::async_trait;

use crate::{AsyncSituwaition, SituwaitionError};
Expand All @@ -19,12 +19,22 @@ where
{
async fn exec(&mut self) -> Result<R, SituwaitionError<E>> {
let start = Instant::now();
let check_timeout = self.opts.timeout;
let cooldown = self.opts.check_cooldown;

loop {
let fut = (self.factory)();
match fut.await {
Ok(v) => return Ok(v),
Err(e) => {
match timeout(check_timeout, fut).await {
// Check completed in time and successfully and we can return
Ok(Ok(v)) => return Ok(v),
// Check timed out
Err(_) => return Err(SituwaitionError::CheckTimeoutError),
// Check completed in time but failed
Ok(Err(e)) => {
if let Some(t) = cooldown {
sleep(t).await;
}

if Instant::now() - start > self.opts.timeout {
return Err(SituwaitionError::TimeoutError(e));
}
Expand Down Expand Up @@ -70,7 +80,7 @@ mod tests {
}

#[async_std::test]
async fn test_unit_async_std_async_executor_from_fn() {
async fn test_unit_async_std_from_fn() {
assert!(
matches!(
AsyncWaiter::from_factory(|| async { Ok::<bool, std::io::Error>(true) })
Expand All @@ -83,28 +93,30 @@ mod tests {
}

#[async_std::test]
async fn test_unit_async_std_async_executor_exec_fail() {
async fn test_unit_async_std_exec_fail() {
assert!(matches!(
AsyncWaiter::with_timeout(
|| async {
Err::<(), std::io::Error>(std::io::Error::new(ErrorKind::Other, "test"))
},
Duration::from_millis(500)
)
.expect("failed to create")
.exec()
.await,
Err(SituwaitionError::TimeoutError(std::io::Error { .. })),
),);
}

#[async_std::test]
async fn test_unit_async_std_async_executor_exec_pass() {
async fn test_unit_async_std_exec_pass() {
assert!(
matches!(
AsyncWaiter::with_check_interval(
|| async { Ok::<bool, std::io::Error>(true) },
Duration::from_millis(100),
)
.expect("failed to create")
.exec()
.await,
Ok(true)
Expand All @@ -114,7 +126,7 @@ mod tests {
}

#[async_std::test]
async fn test_unit_async_std_wait_for_async_executor_with_timeout() {
async fn test_unit_async_std_wait_for_with_timeout() {
let start = Instant::now();

assert!(
Expand All @@ -125,6 +137,7 @@ mod tests {
},
Duration::from_millis(500),
)
.expect("failed to create")
.exec()
.await,
Err(SituwaitionError::TimeoutError(std::io::Error { .. })),
Expand All @@ -138,7 +151,7 @@ mod tests {
}

#[async_std::test]
async fn test_unit_async_std_async_executor_with_check_interval() {
async fn test_unit_async_std_with_check_interval() {
let start = Instant::now();

assert!(
Expand All @@ -147,6 +160,7 @@ mod tests {
|| async { Ok::<bool, std::io::Error>(true) },
Duration::from_millis(100)
)
.expect("failed to create")
.exec()
.await,
Ok(true)
Expand All @@ -158,4 +172,29 @@ mod tests {
"passed faster than default interval (250ms) w/ shorter interval"
);
}

#[async_std::test]
async fn test_unit_async_std_with_long_check() {
let start = Instant::now();
assert!(
matches!(
AsyncWaiter::with_timeout(
|| async {
sleep(Duration::from_millis(500)).await;
Ok::<bool, std::io::Error>(true)
},
Duration::from_millis(250)
)
.expect("failed to create")
.exec()
.await,
Err(SituwaitionError::CheckTimeoutError),
),
"check that finishes in 500ms times out in 100ms as configured"
);
assert!(
Instant::now() - start < Duration::from_millis(500),
"timed out before the check completed"
);
}
}
34 changes: 26 additions & 8 deletions src/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,16 @@ use std::{error::Error, future::Future, time::Duration};

use derive_builder::Builder;

use crate::{SituwaitionBase, SituwaitionError, SituwaitionOpts};
use crate::{
SituwaitionBase, SituwaitionError, SituwaitionOpts, WaiterCreationError,
DEFAULT_SITUWAITION_CHECK_INTERVAL_MS, DEFAULT_SITUWAITION_TIMEOUT_MS,
};

#[cfg(feature = "async-std")]
pub mod async_std;
#[cfg(feature = "tokio")]
pub mod tokio;


#[derive(Builder)]
pub struct AsyncWaiter<F, A, R, E>
where
Expand Down Expand Up @@ -72,25 +74,41 @@ where

/// Create a SyncExecutor with only timeout customized
#[allow(dead_code)]
pub fn with_timeout(factory: A, timeout: Duration) -> AsyncWaiter<F, A, R, E> {
Self::with_opts(
pub fn with_timeout(
factory: A,
timeout: Duration,
) -> Result<AsyncWaiter<F, A, R, E>, WaiterCreationError> {
if timeout < Duration::from_millis(DEFAULT_SITUWAITION_CHECK_INTERVAL_MS) {
return Err(WaiterCreationError::InvalidTimeout(
format!("supplied timeout ({}ms) is shorter the default timeout ({DEFAULT_SITUWAITION_CHECK_INTERVAL_MS}ms)", timeout.as_millis())
));
}
Ok(Self::with_opts(
factory,
SituwaitionOpts {
timeout,
..SituwaitionOpts::default()
},
)
))
}

/// Create a SyncExecutor with only check interval customized
#[allow(dead_code)]
pub fn with_check_interval(factory: A, check_interval: Duration) -> AsyncWaiter<F, A, R, E> {
Self::with_opts(
pub fn with_check_interval(
factory: A,
check_interval: Duration,
) -> Result<AsyncWaiter<F, A, R, E>, WaiterCreationError> {
if check_interval > Duration::from_millis(DEFAULT_SITUWAITION_TIMEOUT_MS) {
return Err(WaiterCreationError::InvalidTimeout(
format!("supplied check interval ({}ms) is larger than the default timeout ({DEFAULT_SITUWAITION_TIMEOUT_MS}ms)", check_interval.as_millis())
));
}
Ok(Self::with_opts(
factory,
SituwaitionOpts {
check_interval,
..SituwaitionOpts::default()
},
)
))
}
}
Loading

0 comments on commit 68d3252

Please sign in to comment.