From ee4331d2c1126de2fba6aa93b16f6f5cbcb820be Mon Sep 17 00:00:00 2001 From: Philippe Llerena Date: Thu, 5 Sep 2024 17:44:03 +0200 Subject: [PATCH] create a trait and factorize --- Cargo.lock | 37 +++++----- crates/spfs-cli/cmd-monitor/Cargo.toml | 1 + .../spfs-cli/cmd-monitor/src/cmd_monitor.rs | 48 ++++++------- crates/spfs-cli/cmd-monitor/src/signal.rs | 67 +++++++++++++++++++ 4 files changed, 107 insertions(+), 46 deletions(-) create mode 100644 crates/spfs-cli/cmd-monitor/src/signal.rs diff --git a/Cargo.lock b/Cargo.lock index 10e357aad..ca30a51c0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1270,9 +1270,9 @@ dependencies = [ [[package]] name = "futures" -version = "0.3.29" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "da0290714b38af9b4a7b094b8a37086d1b4e61f2df9122c3cad2577669145335" +checksum = "645c6916888f6cb6350d2550b80fb63e734897a8498abe35cfb732b6487804b0" dependencies = [ "futures-channel", "futures-core", @@ -1285,9 +1285,9 @@ dependencies = [ [[package]] name = "futures-channel" -version = "0.3.29" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ff4dd66668b557604244583e3e1e1eada8c5c2e96a6d0d6653ede395b78bbacb" +checksum = "eac8f7d7865dcb88bd4373ab671c8cf4508703796caa2b1985a9ca867b3fcb78" dependencies = [ "futures-core", "futures-sink", @@ -1295,15 +1295,15 @@ dependencies = [ [[package]] name = "futures-core" -version = "0.3.29" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eb1d22c66e66d9d72e1758f0bd7d4fd0bee04cad842ee34587d68c07e45d088c" +checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d" [[package]] name = "futures-executor" -version = "0.3.29" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0f4fb8693db0cf099eadcca0efe2a5a22e4550f98ed16aba6c48700da29597bc" +checksum = "a576fc72ae164fca6b9db127eaa9a9dda0d61316034f33a0a0d4eda41f02b01d" dependencies = [ "futures-core", "futures-task", @@ -1312,15 +1312,15 @@ dependencies = [ [[package]] name = "futures-io" -version = "0.3.29" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8bf34a163b5c4c52d0478a4d757da8fb65cabef42ba90515efee0f6f9fa45aaa" +checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1" [[package]] name = "futures-macro" -version = "0.3.29" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "53b153fd91e4b0147f4aced87be237c98248656bb01050b96bf3ee89220a8ddb" +checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" dependencies = [ "proc-macro2", "quote", @@ -1329,15 +1329,15 @@ dependencies = [ [[package]] name = "futures-sink" -version = "0.3.29" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e36d3378ee38c2a36ad710c5d30c2911d752cb941c00c72dbabfb786a7970817" +checksum = "9fb8e00e87438d937621c1c6269e53f536c14d3fbd6a042bb24879e57d474fb5" [[package]] name = "futures-task" -version = "0.3.29" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "efd193069b0ddadc69c46389b740bbccdd97203899b48d09c5f7969591d6bae2" +checksum = "38d84fa142264698cdce1a9f9172cf383a0c82de1bddcf3092901442c4097004" [[package]] name = "futures-timer" @@ -1347,9 +1347,9 @@ checksum = "e64b03909df88034c26dc1547e8970b91f98bdb65165d6a4e9110d94263dbb2c" [[package]] name = "futures-util" -version = "0.3.29" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a19526d624e703a3179b3d322efec918b6246ea0fa51d41124525f00f1cc8104" +checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48" dependencies = [ "futures-channel", "futures-core", @@ -3768,6 +3768,7 @@ name = "spfs-cli-monitor" version = "0.42.0" dependencies = [ "clap 4.5.0", + "futures", "miette", "nix", "spfs", diff --git a/crates/spfs-cli/cmd-monitor/Cargo.toml b/crates/spfs-cli/cmd-monitor/Cargo.toml index 834bf3740..db5cf8b73 100644 --- a/crates/spfs-cli/cmd-monitor/Cargo.toml +++ b/crates/spfs-cli/cmd-monitor/Cargo.toml @@ -28,3 +28,4 @@ spfs-cli-common = { workspace = true } tokio = { version = "1.20", features = ["rt", "rt-multi-thread"] } tracing = { workspace = true } url = "2.2" +futures = "0.3.30" diff --git a/crates/spfs-cli/cmd-monitor/src/cmd_monitor.rs b/crates/spfs-cli/cmd-monitor/src/cmd_monitor.rs index 3eecfa565..9808af0cf 100644 --- a/crates/spfs-cli/cmd-monitor/src/cmd_monitor.rs +++ b/crates/spfs-cli/cmd-monitor/src/cmd_monitor.rs @@ -18,6 +18,16 @@ use tokio::signal::unix::{signal, SignalKind}; use tokio::signal::windows::ctrl_c; use tokio::time::timeout; +mod signal; +#[cfg(unix)] +use signal::unix_signal_handler::UnixSignalHandler as SignalHandlerImpl; +#[cfg(windows)] +use windows_signal_handler::WindowsSignalHandler as SignalHandlerImpl; + +use signal::SignalHandler; + + + fn main() -> Result<()> { // because this function exits right away it does not // properly handle destruction of data, so we put the actual @@ -144,32 +154,16 @@ impl CmdMonitor { } pub async fn run_async(&mut self, config: &spfs::Config) -> Result { - #[cfg(unix)] - let mut interrupt = signal(SignalKind::interrupt()) - .map_err(|err| Error::process_spawn_error("signal()", err, None))?; - #[cfg(windows)] - let mut interrupt = - ctrl_c().map_err(|err| Error::process_spawn_error("ctrl_c()", err, None))?; - #[cfg(unix)] - let mut quit = signal(SignalKind::quit()) - .map_err(|err| Error::process_spawn_error("signal()", err, None))?; - #[cfg(windows)] - let mut quit = ctrl_c().map_err(|err| Error::process_spawn_error("ctrl_c()", err, None))?; - #[cfg(unix)] - let mut terminate = signal(SignalKind::terminate()) - .map_err(|err| Error::process_spawn_error("signal()", err, None))?; - #[cfg(windows)] - let mut terminate = - ctrl_c().map_err(|err| Error::process_spawn_error("ctrl_c()", err, None))?; - + let signal_future = SignalHandlerImpl::build_signal_future(); + let repo = spfs::open_repository(&self.runtime_storage).await?; let storage = spfs::runtime::Storage::new(repo)?; let runtime = storage.read_runtime(&self.runtime).await?; tracing::trace!("read runtime from storage repo"); - + let mut owned = spfs::runtime::OwnedRuntime::upgrade_as_monitor(runtime).await?; tracing::trace!("upgraded to owned runtime, waiting for empty runtime"); - + let fut = spfs::monitor::wait_for_empty_runtime(&owned, config); let res = tokio::select! { res = fut => { @@ -178,18 +172,16 @@ impl CmdMonitor { } // we explicitly catch any signal related to interruption // and will act by cleaning up the runtime early - _ = terminate.recv() => Err(spfs::Error::String("Terminate signal received, cleaning up runtime early".to_string())), - _ = interrupt.recv() => Err(spfs::Error::String("Interrupt signal received, cleaning up runtime early".to_string())), - _ = quit.recv() => Err(spfs::Error::String("Quit signal received, cleaning up runtime early".to_string())), + _ = signal_future => Err(spfs::Error::String("Signal received, cleaning up runtime early".to_string())), }; tracing::trace!("runtime empty of processes "); - + // need to reload the runtime here to get any changes made to // the runtime while it was running so we don't blast them the // next time this process saves the runtime state. tracing::trace!("reloading runtime data before cleanup"); owned.reload_state_from_storage().await?; - + // try to set the running to false to make this // runtime easier to identify as safe to delete // if the automatic cleanup fails. Any error @@ -198,12 +190,12 @@ impl CmdMonitor { if let Err(err) = owned.save_state_to_storage().await { tracing::error!("failed to save runtime: {err:?}"); } - + tracing::trace!("tearing down and exiting"); if let Err(err) = spfs::exit_runtime(&owned).await { tracing::error!("failed to tear down runtime: {err:?}"); } - + tracing::trace!( "{} runtime data", if owned.is_durable() { @@ -224,7 +216,7 @@ impl CmdMonitor { } else if let Err(err) = owned.delete().await { tracing::error!("failed to clean up runtime data: {err:?}") } - + res?; Ok(0) } diff --git a/crates/spfs-cli/cmd-monitor/src/signal.rs b/crates/spfs-cli/cmd-monitor/src/signal.rs new file mode 100644 index 000000000..bcc6027c1 --- /dev/null +++ b/crates/spfs-cli/cmd-monitor/src/signal.rs @@ -0,0 +1,67 @@ +use futures::future::Future; +use std::pin::Pin; +use spfs::Error; + +pub trait SignalHandler { + fn build_signal_future() -> Pin> + Send>>; +} + +#[cfg(unix)] +pub mod unix_signal_handler { + use super::*; + use tokio::signal::unix::{signal, SignalKind}; + + pub struct UnixSignalHandler; + + impl SignalHandler for UnixSignalHandler { + fn build_signal_future() -> Pin> + Send>> { + Box::pin(async move { + let mut interrupt = signal(SignalKind::interrupt()) + .map_err(|err| Error::process_spawn_error("signal()", err, None))?; + let mut quit = signal(SignalKind::quit()) + .map_err(|err| Error::process_spawn_error("signal()", err, None))?; + let mut terminate = signal(SignalKind::terminate()) + .map_err(|err| Error::process_spawn_error("signal()", err, None))?; + + futures::future::select_all(vec![ + Box::pin(interrupt.recv()), + Box::pin(quit.recv()), + Box::pin(terminate.recv()), + ]) + .await; + + Ok(()) + }) + } + } +} + +#[cfg(windows)] +pub mod windows_signal_handler { + use super::*; + use tokio::signal::ctrl_c; + + pub struct WindowsSignalHandler; + + impl SignalHandler for WindowsSignalHandler { + fn build_signal_future() -> Pin> + Send>> { + Box::pin(async move { + let mut interrupt = ctrl_c() + .map_err(|err| Error::process_spawn_error("ctrl_c()", err, None))?; + let mut quit = ctrl_c() + .map_err(|err| Error::process_spawn_error("ctrl_c()", err, None))?; + let mut terminate = ctrl_c() + .map_err(|err| Error::process_spawn_error("ctrl_c()", err, None))?; + + futures::future::select_all(vec![ + Box::pin(interrupt), + Box::pin(quit), + Box::pin(terminate), + ]) + .await; + + Ok(()) + }) + } + } +} \ No newline at end of file