Skip to content

Commit

Permalink
create a trait and factorize
Browse files Browse the repository at this point in the history
  • Loading branch information
doubleailes committed Sep 11, 2024
1 parent 97489fe commit ee4331d
Show file tree
Hide file tree
Showing 4 changed files with 107 additions and 46 deletions.
37 changes: 19 additions & 18 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/spfs-cli/cmd-monitor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,4 @@ spfs-cli-common = { workspace = true }
tokio = { version = "1.20", features = ["rt", "rt-multi-thread"] }
tracing = { workspace = true }
url = "2.2"
futures = "0.3.30"
48 changes: 20 additions & 28 deletions crates/spfs-cli/cmd-monitor/src/cmd_monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,16 @@ use tokio::signal::unix::{signal, SignalKind};
use tokio::signal::windows::ctrl_c;
use tokio::time::timeout;

mod signal;
#[cfg(unix)]
use signal::unix_signal_handler::UnixSignalHandler as SignalHandlerImpl;
#[cfg(windows)]
use windows_signal_handler::WindowsSignalHandler as SignalHandlerImpl;

use signal::SignalHandler;



fn main() -> Result<()> {
// because this function exits right away it does not
// properly handle destruction of data, so we put the actual
Expand Down Expand Up @@ -144,32 +154,16 @@ impl CmdMonitor {
}

pub async fn run_async(&mut self, config: &spfs::Config) -> Result<i32> {
#[cfg(unix)]
let mut interrupt = signal(SignalKind::interrupt())
.map_err(|err| Error::process_spawn_error("signal()", err, None))?;
#[cfg(windows)]
let mut interrupt =
ctrl_c().map_err(|err| Error::process_spawn_error("ctrl_c()", err, None))?;
#[cfg(unix)]
let mut quit = signal(SignalKind::quit())
.map_err(|err| Error::process_spawn_error("signal()", err, None))?;
#[cfg(windows)]
let mut quit = ctrl_c().map_err(|err| Error::process_spawn_error("ctrl_c()", err, None))?;
#[cfg(unix)]
let mut terminate = signal(SignalKind::terminate())
.map_err(|err| Error::process_spawn_error("signal()", err, None))?;
#[cfg(windows)]
let mut terminate =
ctrl_c().map_err(|err| Error::process_spawn_error("ctrl_c()", err, None))?;

let signal_future = SignalHandlerImpl::build_signal_future();

let repo = spfs::open_repository(&self.runtime_storage).await?;
let storage = spfs::runtime::Storage::new(repo)?;
let runtime = storage.read_runtime(&self.runtime).await?;
tracing::trace!("read runtime from storage repo");

let mut owned = spfs::runtime::OwnedRuntime::upgrade_as_monitor(runtime).await?;
tracing::trace!("upgraded to owned runtime, waiting for empty runtime");

let fut = spfs::monitor::wait_for_empty_runtime(&owned, config);
let res = tokio::select! {
res = fut => {
Expand All @@ -178,18 +172,16 @@ impl CmdMonitor {
}
// we explicitly catch any signal related to interruption
// and will act by cleaning up the runtime early
_ = terminate.recv() => Err(spfs::Error::String("Terminate signal received, cleaning up runtime early".to_string())),
_ = interrupt.recv() => Err(spfs::Error::String("Interrupt signal received, cleaning up runtime early".to_string())),
_ = quit.recv() => Err(spfs::Error::String("Quit signal received, cleaning up runtime early".to_string())),
_ = signal_future => Err(spfs::Error::String("Signal received, cleaning up runtime early".to_string())),
};
tracing::trace!("runtime empty of processes ");

// need to reload the runtime here to get any changes made to
// the runtime while it was running so we don't blast them the
// next time this process saves the runtime state.
tracing::trace!("reloading runtime data before cleanup");
owned.reload_state_from_storage().await?;

// try to set the running to false to make this
// runtime easier to identify as safe to delete
// if the automatic cleanup fails. Any error
Expand All @@ -198,12 +190,12 @@ impl CmdMonitor {
if let Err(err) = owned.save_state_to_storage().await {
tracing::error!("failed to save runtime: {err:?}");
}

tracing::trace!("tearing down and exiting");
if let Err(err) = spfs::exit_runtime(&owned).await {
tracing::error!("failed to tear down runtime: {err:?}");
}

tracing::trace!(
"{} runtime data",
if owned.is_durable() {
Expand All @@ -224,7 +216,7 @@ impl CmdMonitor {
} else if let Err(err) = owned.delete().await {
tracing::error!("failed to clean up runtime data: {err:?}")
}

res?;
Ok(0)
}
Expand Down
67 changes: 67 additions & 0 deletions crates/spfs-cli/cmd-monitor/src/signal.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
use futures::future::Future;
use std::pin::Pin;
use spfs::Error;

pub trait SignalHandler {
fn build_signal_future() -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send>>;
}

#[cfg(unix)]
pub mod unix_signal_handler {
use super::*;
use tokio::signal::unix::{signal, SignalKind};

pub struct UnixSignalHandler;

impl SignalHandler for UnixSignalHandler {
fn build_signal_future() -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send>> {
Box::pin(async move {
let mut interrupt = signal(SignalKind::interrupt())
.map_err(|err| Error::process_spawn_error("signal()", err, None))?;
let mut quit = signal(SignalKind::quit())
.map_err(|err| Error::process_spawn_error("signal()", err, None))?;
let mut terminate = signal(SignalKind::terminate())
.map_err(|err| Error::process_spawn_error("signal()", err, None))?;

futures::future::select_all(vec![
Box::pin(interrupt.recv()),
Box::pin(quit.recv()),
Box::pin(terminate.recv()),
])
.await;

Ok(())
})
}
}
}

#[cfg(windows)]
pub mod windows_signal_handler {
use super::*;
use tokio::signal::ctrl_c;

pub struct WindowsSignalHandler;

impl SignalHandler for WindowsSignalHandler {
fn build_signal_future() -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send>> {
Box::pin(async move {
let mut interrupt = ctrl_c()
.map_err(|err| Error::process_spawn_error("ctrl_c()", err, None))?;
let mut quit = ctrl_c()
.map_err(|err| Error::process_spawn_error("ctrl_c()", err, None))?;
let mut terminate = ctrl_c()
.map_err(|err| Error::process_spawn_error("ctrl_c()", err, None))?;

futures::future::select_all(vec![
Box::pin(interrupt),
Box::pin(quit),
Box::pin(terminate),
])
.await;

Ok(())
})
}
}
}

0 comments on commit ee4331d

Please sign in to comment.