Skip to content

Commit

Permalink
replace std::Mutex with parking_lot::Mutex
Browse files Browse the repository at this point in the history
Closes #204
  • Loading branch information
glendc committed May 27, 2024
1 parent 0e0e153 commit 53bbb23
Show file tree
Hide file tree
Showing 6 changed files with 29 additions and 22 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ iri-string = "0.7.0"
escargot = "0.5.10"
divan = "0.1.14"
webpki-roots = "0.26.1"
parking_lot = "0.12.3"

[package]
name = "rama"
Expand Down Expand Up @@ -129,6 +130,7 @@ opentelemetry = { workspace = true, optional = true }
opentelemetry-prometheus = { workspace = true, optional = true }
opentelemetry-semantic-conventions = { workspace = true, optional = true }
opentelemetry_sdk = { workspace = true, optional = true }
parking_lot = { workspace = true }
paste = { workspace = true }
percent-encoding = { workspace = true }
pin-project-lite = { workspace = true }
Expand Down
5 changes: 3 additions & 2 deletions src/http/layer/retry/policy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ use std::future::Future;
/// use rama::service::Context;
/// use rama::http::Request;
/// use rama::http::layer::retry::{Policy, PolicyResult, RetryBody};
/// use std::sync::{Arc, Mutex};
/// use std::sync::Arc;
/// use parking_lot::Mutex;
///
/// struct Attempts(Arc<Mutex<usize>>);
///
Expand All @@ -30,7 +31,7 @@ use std::future::Future;
/// Err(_) => {
/// // Treat all errors as failures...
/// // But we limit the number of attempts...
/// let mut attempts = self.0.lock().unwrap();
/// let mut attempts = self.0.lock();
/// if *attempts > 0 {
/// // Try again!
/// *attempts -= 1;
Expand Down
7 changes: 4 additions & 3 deletions src/http/layer/retry/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@ use crate::error::{error, OpaqueError};
use crate::http::{response::IntoResponse, BodyExtractExt};
use crate::http::{Request, Response};
use crate::service::{Service, ServiceBuilder};
use parking_lot::Mutex;
use std::sync::{
atomic::{AtomicBool, AtomicUsize, Ordering},
Arc, Mutex,
Arc,
};

#[tokio::test]
Expand Down Expand Up @@ -281,7 +282,7 @@ impl Policy<State, Response, Error> for Limit {
req: Request<RetryBody>,
result: Result<Response, Error>,
) -> PolicyResult<State, Response, Error> {
let mut attempts = self.0.lock().unwrap();
let mut attempts = self.0.lock();
if result.is_err() && *attempts > 0 {
*attempts -= 1;
PolicyResult::Retry { ctx, req }
Expand Down Expand Up @@ -369,7 +370,7 @@ where
_req: Request<RetryBody>,
_result: Result<Response, Error>,
) -> PolicyResult<State, Response, Error> {
let mut remaining = self.remaining.lock().unwrap();
let mut remaining = self.remaining.lock();
if *remaining == 0 {
PolicyResult::Abort(Err(error!("out of retries")))
} else {
Expand Down
7 changes: 4 additions & 3 deletions src/service/layer/limit/policy/concurrent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@
use super::{Policy, PolicyOutput, PolicyResult};
use crate::service::{util::backoff::Backoff, Context};
use std::sync::{Arc, Mutex};
use parking_lot::Mutex;
use std::sync::Arc;

/// A [`Policy`] that limits the number of concurrent requests.
#[derive(Debug)]
Expand Down Expand Up @@ -256,7 +257,7 @@ impl ConcurrentTracker for ConcurrentCounter {
type Error = LimitReached;

fn try_access(&self) -> Result<Self::Guard, Self::Error> {
let mut current = self.current.lock().unwrap();
let mut current = self.current.lock();
if *current < self.max {
*current += 1;
Ok(ConcurrentCounterGuard {
Expand All @@ -276,7 +277,7 @@ pub struct ConcurrentCounterGuard {

impl Drop for ConcurrentCounterGuard {
fn drop(&mut self) {
let mut current = self.current.lock().unwrap();
let mut current = self.current.lock();
*current -= 1;
}
}
Expand Down
29 changes: 15 additions & 14 deletions src/service/util/backoff/exponential.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use parking_lot::Mutex;
use std::fmt::Display;
use std::time::Duration;
use std::{fmt::Display, sync::Mutex};
use tokio::time;

use crate::service::util::rng::{HasherRng, Rng};
Expand Down Expand Up @@ -134,7 +135,7 @@ impl<F, R: Rng> ExponentialBackoff<F, R> {
"Maximum backoff must be non-zero"
);
self.min
.checked_mul(2_u32.saturating_pow(self.state.lock().unwrap().iterations))
.checked_mul(2_u32.saturating_pow(self.state.lock().iterations))
.unwrap_or(self.max)
.min(self.max)
}
Expand All @@ -145,7 +146,7 @@ impl<F, R: Rng> ExponentialBackoff<F, R> {
if self.jitter <= 0.0 {
None
} else {
let jitter_factor = self.state.lock().unwrap().rng.next_f64();
let jitter_factor = self.state.lock().rng.next_f64();
debug_assert!(
jitter_factor > 0.0,
"rng returns values between 0.0 and 1.0"
Expand Down Expand Up @@ -181,14 +182,14 @@ where

let next = base + jitter;

self.state.lock().unwrap().iterations += 1;
self.state.lock().iterations += 1;

tokio::time::sleep(next).await;
true
}

async fn reset(&self) {
self.state.lock().unwrap().iterations = 0;
self.state.lock().iterations = 0;
}
}

Expand Down Expand Up @@ -232,26 +233,26 @@ mod tests {
async fn backoff_reset() {
let backoff = ExponentialBackoff::default();
assert!(backoff.next_backoff().await);
assert!(backoff.state.lock().unwrap().iterations == 1);
assert!(backoff.state.lock().iterations == 1);
backoff.reset().await;
assert!(backoff.state.lock().unwrap().iterations == 0);
assert!(backoff.state.lock().iterations == 0);
}

#[tokio::test]
async fn backoff_clone() {
let backoff = ExponentialBackoff::default();

assert!(backoff.state.lock().unwrap().iterations == 0);
assert!(backoff.state.lock().iterations == 0);
assert!(backoff.next_backoff().await);
assert!(backoff.state.lock().unwrap().iterations == 1);
assert!(backoff.state.lock().iterations == 1);

let cloned = backoff.clone();
assert!(cloned.state.lock().unwrap().iterations == 0);
assert!(backoff.state.lock().unwrap().iterations == 1);
assert!(cloned.state.lock().iterations == 0);
assert!(backoff.state.lock().iterations == 1);

assert!(cloned.next_backoff().await);
assert!(cloned.state.lock().unwrap().iterations == 1);
assert!(backoff.state.lock().unwrap().iterations == 1);
assert!(cloned.state.lock().iterations == 1);
assert!(backoff.state.lock().iterations == 1);
}

quickcheck! {
Expand All @@ -275,7 +276,7 @@ mod tests {
Ok(backoff) => backoff,
};

backoff.state.lock().unwrap().iterations = iterations;
backoff.state.lock().iterations = iterations;
let delay = backoff.base();
TestResult::from_bool(min <= delay && delay <= max)
}
Expand Down

0 comments on commit 53bbb23

Please sign in to comment.