Skip to content

Commit

Permalink
chore: Use shotgun for shutdown signal
Browse files Browse the repository at this point in the history
  • Loading branch information
its-laika committed Feb 4, 2025
1 parent ffa2aa9 commit 7355251
Show file tree
Hide file tree
Showing 6 changed files with 24 additions and 54 deletions.
6 changes: 6 additions & 0 deletions backend/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions backend/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,4 @@ serde_json = "1.0"
tokio = { version = "1", features = ["full"] }
tokio-util = { version = "0.7.13", features = ["io"] }
uuid = { version = "1.12.1", features = ["v4"] }
shotgun = { git = "https://github.com/its-laika/shotgun.git", version = "0.1.2" }
4 changes: 2 additions & 2 deletions backend/src/api/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@ use axum::{
};
use sea_orm::DatabaseConnection;
use std::io;
use tokio::{net::TcpListener, sync::broadcast};
use tokio::net::TcpListener;

pub async fn listen(
connection: DatabaseConnection,
mut shutdown: broadcast::Receiver<()>,
shutdown: shotgun::Receiver<()>,
) -> io::Result<()> {
let app = Router::new()
.route("/files", post(routes::upload::handler))
Expand Down
51 changes: 10 additions & 41 deletions backend/src/cleanup.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,7 @@
use crate::{
database,
error::{Error, Result},
file,
};
use crate::{database, error::Result, file};
use sea_orm::DatabaseConnection;
use std::time::Duration;
use tokio::{
sync::broadcast::{self, error::TryRecvError},
time,
};
use tokio::{select, time};
use uuid::Uuid;

/// The interval in seconds between each cleanup operation.
Expand All @@ -20,20 +13,23 @@ const CLEANUP_INTERVAL_SECONDS: u64 = 10 * 60; /* 10 minutes */
/// # Arguments
///
/// * `database_connection` - A connection to the database.
/// * `shutdown` - A broadcast receiver to listen for shutdown signals.
/// * `shutdown` - A shotgun receiver to listen for shutdown signal.
///
/// # Returns
///
/// * `Result<()>` - Returns an Ok result if the cleanup process runs successfully,
/// or an error if something goes wrong.
pub async fn run(
database_connection: DatabaseConnection,
mut shutdown: broadcast::Receiver<()>,
shutdown: shotgun::Receiver<()>,
) -> Result<()> {
loop {
if !wait(&mut shutdown).await? {
return Ok(());
}
let shutdown = shutdown.clone();

select! {
_ = time::sleep(Duration::from_secs(CLEANUP_INTERVAL_SECONDS)) => (),
_ = shutdown => return Ok(()),
};

log::info!("Cleaning up outdating files...");

Expand All @@ -42,33 +38,6 @@ pub async fn run(
}
}

/// Waits for defined cleanup interval, returning early if `shutdown` is received.
///
/// # Arguments
///
/// * `shutdown` - A broadcast receiver to listen for shutdown signals.
///
/// # Returns
///
/// * `Result<bool>` - Returns an Ok result if the wait completes successfully,
/// with `true` if the wait was not interrupted by a shutdown signal, or `false` otherwise.
async fn wait(shutdown: &mut broadcast::Receiver<()>) -> Result<bool> {
let sleep_time = Duration::from_secs(1);

for _ in 0..CLEANUP_INTERVAL_SECONDS {
/* Prevent locking for 10 mins after a SIGINT like a spin lock */
match shutdown.try_recv() {
Err(TryRecvError::Empty) => (),
Err(_) => return Err(Error::BroadcastRecvFailed),
Ok(()) => return Ok(false),
};

time::sleep(sleep_time).await;
}

Ok(true)
}

/// Deletes outdated files from the file system.
///
/// # Arguments
Expand Down
2 changes: 0 additions & 2 deletions backend/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ pub enum Error {
InvalidEncryptionData(String),
HashingFailure(String),
HashVerificationFailure(String),
BroadcastRecvFailed,
}

impl fmt::Debug for Error {
Expand All @@ -44,7 +43,6 @@ impl fmt::Debug for Error {
Self::InvalidEncryptionData(inner) => write!(f, "Invalid encryption data: {inner}"),
Self::HashingFailure(inner) => write!(f, "Hashing failure: {inner}"),
Self::HashVerificationFailure(inner) => write!(f, "Hash verification failure: {inner}"),
Self::BroadcastRecvFailed => write!(f, "Broadcast recv error"),
}
}
}
14 changes: 5 additions & 9 deletions backend/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use configuration::CONFIGURATION;
use migration::{Migrator, MigratorTrait};
use sea_orm::{ConnectOptions, Database, DatabaseConnection};
use std::{process, time::Duration};
use tokio::{signal::ctrl_c, sync::broadcast, task::JoinSet};
use tokio::{signal::ctrl_c, task::JoinSet};

mod api;
mod cleanup;
Expand Down Expand Up @@ -31,10 +31,10 @@ async fn main() {
};

let mut join_set = JoinSet::new();
let (shutdown_tx, shutdown_rx) = broadcast::channel(1);
let (shotgun_tx, shotgun_rx) = shotgun::channel();

let api_database_connection = database_connection.clone();
let api_shutdown_rx = shutdown_rx.resubscribe();
let api_shutdown_rx = shotgun_rx.clone();

join_set.spawn(async move {
if let Err(error) = api::listen(api_database_connection, api_shutdown_rx).await {
Expand All @@ -43,7 +43,7 @@ async fn main() {
});

let cleanup_database_connection = database_connection.clone();
let cleanup_shutdown_rx = shutdown_rx.resubscribe();
let cleanup_shutdown_rx = shotgun_rx.clone();

join_set.spawn(async move {
if let Err(error) = cleanup::run(cleanup_database_connection, cleanup_shutdown_rx).await {
Expand All @@ -62,11 +62,7 @@ async fn main() {

info!("Received ctrl+c (SIGINT)");

if let Err(error) = shutdown_tx.send(()) {
error!("Could not inform about shutdown: {error}");
error!("Exiting process. Bye.");
process::exit(1);
};
shotgun_tx.send(());
});

join_set.join_all().await;
Expand Down

0 comments on commit 7355251

Please sign in to comment.