From 73552511bb6c5e720b59b30471a6eef858ed5ff8 Mon Sep 17 00:00:00 2001 From: Laika <36010519+its-laika@users.noreply.github.com> Date: Tue, 4 Feb 2025 22:11:52 +0100 Subject: [PATCH] chore: Use `shotgun` for shutdown signal --- backend/Cargo.lock | 6 +++++ backend/Cargo.toml | 1 + backend/src/api/server.rs | 4 +-- backend/src/cleanup.rs | 51 ++++++++------------------------------- backend/src/error.rs | 2 -- backend/src/main.rs | 14 ++++------- 6 files changed, 24 insertions(+), 54 deletions(-) diff --git a/backend/Cargo.lock b/backend/Cargo.lock index 58fcccb..a04218b 100644 --- a/backend/Cargo.lock +++ b/backend/Cargo.lock @@ -2698,6 +2698,11 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" +[[package]] +name = "shotgun" +version = "0.1.2" +source = "git+https://github.com/its-laika/shotgun.git#c5db12f41911ed1f545951f5fe953bf733c8ed51" + [[package]] name = "signal-hook-registry" version = "1.4.2" @@ -3391,6 +3396,7 @@ dependencies = [ "sea-orm", "serde", "serde_json", + "shotgun", "tokio", "tokio-util", "uuid", diff --git a/backend/Cargo.toml b/backend/Cargo.toml index 0c11615..ec71f76 100644 --- a/backend/Cargo.toml +++ b/backend/Cargo.toml @@ -31,3 +31,4 @@ serde_json = "1.0" tokio = { version = "1", features = ["full"] } tokio-util = { version = "0.7.13", features = ["io"] } uuid = { version = "1.12.1", features = ["v4"] } +shotgun = { git = "https://github.com/its-laika/shotgun.git", version = "0.1.2" } diff --git a/backend/src/api/server.rs b/backend/src/api/server.rs index a633f15..2e19e66 100644 --- a/backend/src/api/server.rs +++ b/backend/src/api/server.rs @@ -6,11 +6,11 @@ use axum::{ }; use sea_orm::DatabaseConnection; use std::io; -use tokio::{net::TcpListener, sync::broadcast}; +use tokio::net::TcpListener; pub async fn listen( connection: DatabaseConnection, - mut shutdown: broadcast::Receiver<()>, + shutdown: shotgun::Receiver<()>, ) -> io::Result<()> { let app = Router::new() .route("/files", post(routes::upload::handler)) diff --git a/backend/src/cleanup.rs b/backend/src/cleanup.rs index f517738..6e77430 100644 --- a/backend/src/cleanup.rs +++ b/backend/src/cleanup.rs @@ -1,14 +1,7 @@ -use crate::{ - database, - error::{Error, Result}, - file, -}; +use crate::{database, error::Result, file}; use sea_orm::DatabaseConnection; use std::time::Duration; -use tokio::{ - sync::broadcast::{self, error::TryRecvError}, - time, -}; +use tokio::{select, time}; use uuid::Uuid; /// The interval in seconds between each cleanup operation. @@ -20,7 +13,7 @@ const CLEANUP_INTERVAL_SECONDS: u64 = 10 * 60; /* 10 minutes */ /// # Arguments /// /// * `database_connection` - A connection to the database. -/// * `shutdown` - A broadcast receiver to listen for shutdown signals. +/// * `shutdown` - A shotgun receiver to listen for shutdown signal. /// /// # Returns /// @@ -28,12 +21,15 @@ const CLEANUP_INTERVAL_SECONDS: u64 = 10 * 60; /* 10 minutes */ /// or an error if something goes wrong. pub async fn run( database_connection: DatabaseConnection, - mut shutdown: broadcast::Receiver<()>, + shutdown: shotgun::Receiver<()>, ) -> Result<()> { loop { - if !wait(&mut shutdown).await? { - return Ok(()); - } + let shutdown = shutdown.clone(); + + select! { + _ = time::sleep(Duration::from_secs(CLEANUP_INTERVAL_SECONDS)) => (), + _ = shutdown => return Ok(()), + }; log::info!("Cleaning up outdating files..."); @@ -42,33 +38,6 @@ pub async fn run( } } -/// Waits for defined cleanup interval, returning early if `shutdown` is received. -/// -/// # Arguments -/// -/// * `shutdown` - A broadcast receiver to listen for shutdown signals. -/// -/// # Returns -/// -/// * `Result` - Returns an Ok result if the wait completes successfully, -/// with `true` if the wait was not interrupted by a shutdown signal, or `false` otherwise. -async fn wait(shutdown: &mut broadcast::Receiver<()>) -> Result { - let sleep_time = Duration::from_secs(1); - - for _ in 0..CLEANUP_INTERVAL_SECONDS { - /* Prevent locking for 10 mins after a SIGINT like a spin lock */ - match shutdown.try_recv() { - Err(TryRecvError::Empty) => (), - Err(_) => return Err(Error::BroadcastRecvFailed), - Ok(()) => return Ok(false), - }; - - time::sleep(sleep_time).await; - } - - Ok(true) -} - /// Deletes outdated files from the file system. /// /// # Arguments diff --git a/backend/src/error.rs b/backend/src/error.rs index 0a0e496..5a79a72 100644 --- a/backend/src/error.rs +++ b/backend/src/error.rs @@ -21,7 +21,6 @@ pub enum Error { InvalidEncryptionData(String), HashingFailure(String), HashVerificationFailure(String), - BroadcastRecvFailed, } impl fmt::Debug for Error { @@ -44,7 +43,6 @@ impl fmt::Debug for Error { Self::InvalidEncryptionData(inner) => write!(f, "Invalid encryption data: {inner}"), Self::HashingFailure(inner) => write!(f, "Hashing failure: {inner}"), Self::HashVerificationFailure(inner) => write!(f, "Hash verification failure: {inner}"), - Self::BroadcastRecvFailed => write!(f, "Broadcast recv error"), } } } diff --git a/backend/src/main.rs b/backend/src/main.rs index b913f0b..af2b0a5 100644 --- a/backend/src/main.rs +++ b/backend/src/main.rs @@ -2,7 +2,7 @@ use configuration::CONFIGURATION; use migration::{Migrator, MigratorTrait}; use sea_orm::{ConnectOptions, Database, DatabaseConnection}; use std::{process, time::Duration}; -use tokio::{signal::ctrl_c, sync::broadcast, task::JoinSet}; +use tokio::{signal::ctrl_c, task::JoinSet}; mod api; mod cleanup; @@ -31,10 +31,10 @@ async fn main() { }; let mut join_set = JoinSet::new(); - let (shutdown_tx, shutdown_rx) = broadcast::channel(1); + let (shotgun_tx, shotgun_rx) = shotgun::channel(); let api_database_connection = database_connection.clone(); - let api_shutdown_rx = shutdown_rx.resubscribe(); + let api_shutdown_rx = shotgun_rx.clone(); join_set.spawn(async move { if let Err(error) = api::listen(api_database_connection, api_shutdown_rx).await { @@ -43,7 +43,7 @@ async fn main() { }); let cleanup_database_connection = database_connection.clone(); - let cleanup_shutdown_rx = shutdown_rx.resubscribe(); + let cleanup_shutdown_rx = shotgun_rx.clone(); join_set.spawn(async move { if let Err(error) = cleanup::run(cleanup_database_connection, cleanup_shutdown_rx).await { @@ -62,11 +62,7 @@ async fn main() { info!("Received ctrl+c (SIGINT)"); - if let Err(error) = shutdown_tx.send(()) { - error!("Could not inform about shutdown: {error}"); - error!("Exiting process. Bye."); - process::exit(1); - }; + shotgun_tx.send(()); }); join_set.join_all().await;