Skip to content

Commit ff53453

Browse files
committed
feat: add exponential backoff with jitter in the worker loop retrials
1 parent f041454 commit ff53453

File tree

5 files changed

+36
-12
lines changed

5 files changed

+36
-12
lines changed

Cargo.lock

Lines changed: 10 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ htmlescape = "0.3.1"
3737
actix-web-flash-messages = { version = "0.4", features = ["cookies"] }
3838
actix-session = { version = "0.9", features = ["redis-rs-tls-session"] }
3939
serde_json = "1.0"
40+
exponential-backoff = "1.2.0"
4041

4142
[dependencies.reqwest]
4243
version = "0.12"

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ Add these validation checks to our POST /admin/password endpoint.
2929

3030
### Section 11
3131
- [ ] There is no retry if the email delivery attempt fails. We could enhance this by adding a `n_retries` and `execute_after` columns to keep track of how many attemps have already taken place and when the next attempt should be executed.
32-
- [ ] Add an exponential backoff with jitter in the `issue_delivery_worker::worker_loop` function.
32+
- [X] Add an exponential backoff with jitter in the `issue_delivery_worker::worker_loop` function.
3333
- [ ] Add an expiry mechanism for the idempotency keys using background workers as reference.
3434

3535
## Troubleshooting

src/idempotency/persistence.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -59,10 +59,7 @@ pub async fn save_response(
5959
http_response: HttpResponse,
6060
) -> Result<HttpResponse, anyhow::Error> {
6161
let (response_head, body) = http_response.into_parts();
62-
let body = to_bytes(body)
63-
.await
64-
.map_err(|e| anyhow::anyhow!("{}", e))?
65-
;
62+
let body = to_bytes(body).await.map_err(|e| anyhow::anyhow!("{}", e))?;
6663
let status_code = response_head.status().as_u16() as i16;
6764
let headers = {
6865
let mut h = Vec::with_capacity(response_head.headers().len());

src/issue_delivery_worker.rs

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
use crate::domain::SubscriberEmail;
22
use crate::email_client::EmailClient;
33
use crate::{configuration::Settings, startup::get_connection_pool};
4+
use exponential_backoff::Backoff;
45
use sqlx::{Executor, PgPool, Postgres, Transaction};
6+
use std::ops::Add;
57
use std::time::Duration;
68
use tracing::{field::display, Span};
79
use uuid::Uuid;
@@ -34,16 +36,30 @@ async fn get_issue(pool: &PgPool, issue_id: Uuid) -> Result<NewsletterIssue, any
3436
Ok(issue)
3537
}
3638

39+
fn get_backoff() -> Backoff {
40+
let retries = 8;
41+
let min = Duration::from_secs(1);
42+
let max = Duration::from_secs(60);
43+
let mut backoff = Backoff::new(retries, min, max);
44+
backoff.set_jitter(0.25);
45+
backoff
46+
}
47+
3748
async fn worker_loop(pool: PgPool, email_client: EmailClient) -> Result<(), anyhow::Error> {
49+
let backoff = get_backoff();
3850
loop {
39-
match try_execute_task(&pool, &email_client).await {
40-
Ok(ExecutionOutcome::EmptyQueue) => {
41-
tokio::time::sleep(Duration::from_secs(10)).await;
42-
}
43-
Err(_) => {
44-
tokio::time::sleep(Duration::from_secs(1)).await;
51+
for duration in &backoff {
52+
match try_execute_task(&pool, &email_client).await {
53+
Ok(ExecutionOutcome::EmptyQueue) => {
54+
tokio::time::sleep(duration.add(Duration::from_secs(10))).await;
55+
}
56+
Err(_) => {
57+
tokio::time::sleep(duration).await;
58+
}
59+
Ok(ExecutionOutcome::TaskCompleted) => {
60+
break;
61+
}
4562
}
46-
Ok(ExecutionOutcome::TaskCompleted) => {}
4763
}
4864
}
4965
}

0 commit comments

Comments
 (0)