Skip to content

Commit

Permalink
refactor: create a DB transaction for the whole subscribe process
Browse files Browse the repository at this point in the history
  • Loading branch information
alexsavio committed Apr 3, 2024
1 parent 3988725 commit e77fd23
Showing 1 changed file with 20 additions and 17 deletions.
37 changes: 20 additions & 17 deletions src/routes/subscriptions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use actix_web::{web, HttpResponse};
use chrono::Utc;
use rand::distributions::Alphanumeric;
use rand::{thread_rng, Rng};
use sqlx::PgPool;
use sqlx::{Executor, PgPool, Postgres, Transaction};
use uuid::Uuid;

#[derive(serde::Deserialize)]
Expand Down Expand Up @@ -52,17 +52,24 @@ pub async fn subscribe(
Ok(subscriber) => subscriber,
Err(_) => return HttpResponse::BadRequest().finish(),
};
let subscriber_id = match insert_subscriber(&pool, &new_subscriber).await {
let mut transaction = match pool.begin().await {
Ok(transaction) => transaction,
Err(_) => return HttpResponse::InternalServerError().finish(),
};
let subscriber_id = match insert_subscriber(&mut transaction, &new_subscriber).await {
Ok(subscriber_id) => subscriber_id,
Err(_) => return HttpResponse::InternalServerError().finish(),
};
let subscription_token = generate_subscription_token();
if store_token(&pool, subscriber_id, &subscription_token)
if store_token(&mut transaction, subscriber_id, &subscription_token)
.await
.is_err()
{
return HttpResponse::InternalServerError().finish();
}
if transaction.commit().await.is_err() {
return HttpResponse::InternalServerError().finish();
}
if send_confirmation_email(
&email_client,
new_subscriber,
Expand Down Expand Up @@ -106,22 +113,20 @@ pub async fn send_confirmation_email(

#[tracing::instrument(
name = "Store subscription token in the database",
skip(subscription_token, pool)
skip(subscription_token, transaction)
)]
pub async fn store_token(
pool: &PgPool,
transaction: &mut Transaction<'_, Postgres>,
subscriber_id: Uuid,
subscription_token: &str,
) -> Result<(), sqlx::Error> {
sqlx::query!(
let query = sqlx::query!(
r#"INSERT INTO subscription_tokens (subscription_token, subscriber_id)
VALUES ($1, $2)"#,
subscription_token,
subscriber_id
)
.execute(pool)
.await
.map_err(|e| {
);
transaction.execute(query).await.map_err(|e| {
tracing::error!("Failed to execute query: {:?}", e);
e
})?;
Expand All @@ -130,25 +135,23 @@ pub async fn store_token(

#[tracing::instrument(
name = "Saving new subscriber details in the database",
skip(new_subscriber, pool)
skip(new_subscriber, transaction)
)]
pub async fn insert_subscriber(
pool: &PgPool,
transaction: &mut Transaction<'_, Postgres>,
new_subscriber: &NewSubscriber,
) -> Result<Uuid, sqlx::Error> {
let subscriber_id = Uuid::new_v4();
sqlx::query!(
let query = sqlx::query!(
r#"
INSERT INTO subscriptions (id, email, name, subscribed_at, status)
VALUES ($1, $2, $3, $4, 'pending_confirmation')"#,
subscriber_id,
new_subscriber.email.as_ref(),
new_subscriber.name.as_ref(),
Utc::now()
)
.execute(pool)
.await
.map_err(|e| {
);
transaction.execute(query).await.map_err(|e| {
tracing::error!("Failed to execute query: {:?}", e);
e
})?;
Expand Down

0 comments on commit e77fd23

Please sign in to comment.