Skip to content

Commit

Permalink
feat: Allow retryable error handles (#7)
Browse files Browse the repository at this point in the history
* feat: Allow retryable error handles

Signed-off-by: Xuanwo <github@xuanwo.io>

* Add docs

Signed-off-by: Xuanwo <github@xuanwo.io>

* Update overview

Signed-off-by: Xuanwo <github@xuanwo.io>

* Fix typo

Signed-off-by: Xuanwo <github@xuanwo.io>
  • Loading branch information
Xuanwo committed Apr 14, 2022
1 parent 3fc925b commit a215593
Show file tree
Hide file tree
Showing 4 changed files with 170 additions and 23 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
authors = ["Xuanwo <github@xuanwo.io>"]
description = "Backoff policies"
description = "Retry futures in backoff without effort."
documentation = "https://docs.rs/backon"
edition = "2021"
license = "Apache-2.0"
Expand Down
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@
[Latest Version]: https://img.shields.io/crates/v/backon.svg
[crates.io]: https://crates.io/crates/backon

Retry futures in backoff without effort.

---

The opposite backoff implementation of the popular [backoff](https://docs.rs/backoff).

- Newer: developed by Rust edition 2021 and latest stable.
Expand Down
25 changes: 25 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
//!
//! # Examples
//!
//! Retry with default settings.
//!
//! ```no_run
//! use backon::Retryable;
//! use backon::ExponentialBackoff;
Expand All @@ -27,8 +29,30 @@
//! #[tokio::main]
//! async fn main() -> Result<()> {
//! let content = fetch.retry(ExponentialBackoff::default()).await?;
//!
//! println!("fetch succeeded: {}", content);
//! Ok(())
//! }
//! ```
//!
//! Retry with specify retryable error.
//!
//! ```no_run
//! use backon::Retryable;
//! use backon::ExponentialBackoff;
//! use anyhow::Result;
//!
//! async fn fetch() -> Result<String> {
//! Ok(reqwest::get("https://www.rust-lang.org").await?.text().await?)
//! }
//!
//! #[tokio::main]
//! async fn main() -> Result<()> {
//! let content = fetch
//! .retry(ExponentialBackoff::default())
//! .with_error_fn(|e| e.to_string() == "retryable").await?;
//!
//! println!("fetch succeeded: {}", content);
//! Ok(())
//! }
//! ```
Expand All @@ -43,4 +67,5 @@ mod exponential;
pub use exponential::ExponentialBackoff;

mod retry;
pub use retry::Retry;
pub use retry::Retryable;
162 changes: 140 additions & 22 deletions src/retry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,48 +55,109 @@ use crate::Backoff;
/// }
/// ```
pub trait Retryable<
P: Backoff,
B: Backoff,
T,
E,
Fut: Future<Output = std::result::Result<T, E>>,
FutureFn: FnMut() -> Fut,
>
{
fn retry(self, policy: P) -> Retry<P, T, E, Fut, FutureFn>;
fn retry(self, backoff: B) -> Retry<B, T, E, Fut, FutureFn>;
}

impl<P, T, E, Fut, FutureFn> Retryable<P, T, E, Fut, FutureFn> for FutureFn
impl<B, T, E, Fut, FutureFn> Retryable<B, T, E, Fut, FutureFn> for FutureFn
where
P: Backoff,
B: Backoff,
Fut: Future<Output = std::result::Result<T, E>>,
FutureFn: FnMut() -> Fut,
{
fn retry(self, policy: P) -> Retry<P, T, E, Fut, FutureFn> {
Retry {
backoff: policy,
error_fn: |_: &E| true,
future_fn: self,
state: State::Idle,
}
fn retry(self, backoff: B) -> Retry<B, T, E, Fut, FutureFn> {
Retry::new(self, backoff)
}
}

#[pin_project]
pub struct Retry<
P: Backoff,
B: Backoff,
T,
E,
Fut: Future<Output = std::result::Result<T, E>>,
FutureFn: FnMut() -> Fut,
> {
backoff: P,
backoff: B,
error_fn: fn(&E) -> bool,
future_fn: FutureFn,

#[pin]
state: State<T, E, Fut>,
}

impl<B, T, E, Fut, FutureFn> Retry<B, T, E, Fut, FutureFn>
where
B: Backoff,
Fut: Future<Output = std::result::Result<T, E>>,
FutureFn: FnMut() -> Fut,
{
/// Create a new retry.
///
/// # Examples
///
/// ```no_run
/// use backon::Retryable;
/// use backon::Retry;
/// use backon::ExponentialBackoff;
/// use anyhow::Result;
///
/// async fn fetch() -> Result<String> {
/// Ok(reqwest::get("https://www.rust-lang.org").await?.text().await?)
/// }
///
/// #[tokio::main]
/// async fn main() -> Result<()> {
/// let content = Retry::new(fetch, ExponentialBackoff::default()).await?;
/// println!("fetch succeeded: {}", content);
///
/// Ok(())
/// }
/// ```
pub fn new(future_fn: FutureFn, backoff: B) -> Self {
Retry {
backoff,
error_fn: |_: &E| true,
future_fn,
state: State::Idle,
}
}

/// Set error_fn of retry
///
/// # Examples
///
/// ```no_run
/// use backon::Retry;
/// use backon::ExponentialBackoff;
/// use anyhow::Result;
///
/// async fn fetch() -> Result<String> {
/// Ok(reqwest::get("https://www.rust-lang.org").await?.text().await?)
/// }
///
/// #[tokio::main]
/// async fn main() -> Result<()> {
/// let retry = Retry::new(fetch, ExponentialBackoff::default())
/// .with_error_fn(|e| e.to_string() == "EOF");
/// let content = retry.await?;
/// println!("fetch succeeded: {}", content);
///
/// Ok(())
/// }
/// ```
pub fn with_error_fn(mut self, error_fn: fn(&E) -> bool) -> Self {
self.error_fn = error_fn;
self
}
}

/// State maintains internal state of retry.
///
/// # Notes
Expand All @@ -120,9 +181,9 @@ where
}
}

impl<P, T, E, Fut, FutureFn> Future for Retry<P, T, E, Fut, FutureFn>
impl<B, T, E, Fut, FutureFn> Future for Retry<B, T, E, Fut, FutureFn>
where
P: Backoff,
B: Backoff,
Fut: Future<Output = std::result::Result<T, E>>,
FutureFn: FnMut() -> Fut,
{
Expand All @@ -140,14 +201,20 @@ where
}
StateProject::Polling(fut) => match ready!(fut.poll(cx)) {
Ok(v) => return Poll::Ready(Ok(v)),
Err(err) => match this.backoff.next() {
None => return Poll::Ready(Err(err)),
Some(dur) => {
this.state
.set(State::Sleeping(Box::pin(tokio::time::sleep(dur))));
continue;
Err(err) => {
// If input error is not retryable, return error directly.
if !(this.error_fn)(&err) {
return Poll::Ready(Err(err));
}
match this.backoff.next() {
None => return Poll::Ready(Err(err)),
Some(dur) => {
this.state
.set(State::Sleeping(Box::pin(tokio::time::sleep(dur))));
continue;
}
}
},
}
},
StateProject::Sleeping(sl) => {
ready!(sl.poll(cx));
Expand All @@ -162,6 +229,7 @@ where
#[cfg(test)]
mod tests {
use std::time::Duration;
use tokio::sync::Mutex;

use super::*;
use crate::exponential::ExponentialBackoff;
Expand All @@ -180,4 +248,54 @@ mod tests {
assert_eq!("test_query meets error", result.unwrap_err().to_string());
Ok(())
}

#[tokio::test]
async fn test_retry_with_not_retryable_error() -> anyhow::Result<()> {
let error_times = Mutex::new(0);

let f = || async {
let mut x = error_times.lock().await;
*x += 1;
Err::<(), anyhow::Error>(anyhow::anyhow!("not retryable"))
};

let backoff = ExponentialBackoff::default().with_min_delay(Duration::from_millis(1));
let result = f
.retry(backoff)
// Only retry If error message is `retryable`
.with_error_fn(|e| e.to_string() == "retryable")
.await;

assert!(result.is_err());
assert_eq!("not retryable", result.unwrap_err().to_string());
// `f` always returns error "not retryable", so it should be executed
// only once.
assert_eq!(*error_times.lock().await, 1);
Ok(())
}

#[tokio::test]
async fn test_retry_with_retryable_error() -> anyhow::Result<()> {
let error_times = Mutex::new(0);

let f = || async {
let mut x = error_times.lock().await;
*x += 1;
Err::<(), anyhow::Error>(anyhow::anyhow!("retryable"))
};

let backoff = ExponentialBackoff::default().with_min_delay(Duration::from_millis(1));
let result = f
.retry(backoff)
// Only retry If error message is `retryable`
.with_error_fn(|e| e.to_string() == "retryable")
.await;

assert!(result.is_err());
assert_eq!("retryable", result.unwrap_err().to_string());
// `f` always returns error "retryable", so it should be executed
// 4 times (retry 3 times).
assert_eq!(*error_times.lock().await, 4);
Ok(())
}
}

0 comments on commit a215593

Please sign in to comment.