Skip to content

Commit

Permalink
Add a heartbeat between spfs-monitor and spfs-fuse
Browse files Browse the repository at this point in the history
As mentioned in #895, there are cases where spfs-fuse never shuts down.
An easy way to repro this is to `kill -9` the spfs-monitor process, so
it doesn't get a chance to clean up the runtime normally.

We observe spfs-fuse processes accumulating over time on our CI runners.
It could be from users canceling pipelines (maybe the runner does a
`kill -9` of all the child processes?). It is also extremely common for
zombie runtimes to accumulate over time (not just on CI runners); perhaps
spfs-monitor is commonly crashing or failing to clean up?

This is a proof of concept. Enabling the heartbeat, and tuning the timing of
the heartbeat and its timeout, should be configurable, but these are hard
coded for now.

Signed-off-by: J Robert Ray <jrray@jrray.org>
  • Loading branch information
jrray committed Aug 21, 2024
1 parent 8151a88 commit f53ccfc
Show file tree
Hide file tree
Showing 6 changed files with 132 additions and 14 deletions.
74 changes: 62 additions & 12 deletions crates/spfs-cli/cmd-fuse/src/cmd_fuse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
// SPDX-License-Identifier: Apache-2.0
// https://github.com/spkenv/spk

use std::sync::Arc;

use clap::Parser;
use fuser::MountOption;
use miette::{bail, miette, Context, IntoDiagnostic, Result};
Expand Down Expand Up @@ -198,13 +200,10 @@ impl CmdFuse {
// introspected and unmounted by other parts of spfs such as the monitor
tracing::debug!("Establishing fuse session...");
let mount_opts = opts.mount_options.iter().cloned().collect::<Vec<_>>();
let mut session = fuser::Session::new(
Session::new(self.reference.clone(), opts.clone()),
&mountpoint,
&mount_opts,
)
.into_diagnostic()
.wrap_err("Failed to create a FUSE session")?;
let session = Session::new(self.reference.clone(), opts.clone());
let mut fuser_session = fuser::Session::new(session.clone(), &mountpoint, &mount_opts)
.into_diagnostic()
.wrap_err("Failed to create a FUSE session")?;

if opts.gid != calling_gid {
nix::unistd::setgid(opts.gid)
Expand Down Expand Up @@ -250,16 +249,38 @@ impl CmdFuse {
let mut interrupt = signal(SignalKind::interrupt()).into_diagnostic().wrap_err("interrupt signal handler")?;
let mut quit = signal(SignalKind::quit()).into_diagnostic().wrap_err("quit signal handler")?;
let mut terminate = signal(SignalKind::terminate()).into_diagnostic().wrap_err("terminate signal handler")?;
let (heartbeat_send, mut heartbeat_recv) = tokio::sync::mpsc::channel(1);

tracing::info!("Starting FUSE filesystem");
// Although the filesystem could run in the current thread, we prefer to
// create a blocking future that can move into tokio and be managed/scheduled
// as desired, otherwise this thread will block and may affect the runtime
// operation unpredictably
let join_handle = tokio::task::spawn_blocking(move || session.run());
let abort_handle = join_handle.abort_handle();
let unmount_callable = Arc::new(std::sync::Mutex::new(fuser_session.unmount_callable()));
let mut join_handle = tokio::task::spawn_blocking(move || fuser_session.run());

let heartbeat_monitor =
tokio::task::spawn(async move {
let mut interval = tokio::time::interval(std::time::Duration::from_secs(30));

loop {
interval.tick().await;
let seconds = session.seconds_since_last_heartbeat();
tracing::trace!(seconds_since_last_heartbeat = ?seconds, "heartbeat monitor");
if seconds >= 300 {
tracing::warn!("loss of heartbeat, shutting down filesystem");
// XXX: Calling unmount here has no apparent effect!
heartbeat_send.send(()).await.into_diagnostic().wrap_err("Failed to send unmount signal")?;
break;
}
}

Ok::<_, miette::Report>(())
});

let mut heartbeat_failed = false;
let res = tokio::select!{
res = join_handle => {
res = &mut join_handle => {
tracing::info!("Filesystem shutting down");
res.into_diagnostic().wrap_err("FUSE session failed")
}
Expand All @@ -268,11 +289,40 @@ impl CmdFuse {
_ = terminate.recv() => Err(miette!("Terminate signal received, filesystem shutting down")),
_ = interrupt.recv() => Err(miette!("Interrupt signal received, filesystem shutting down")),
_ = quit.recv() => Err(miette!("Quit signal received, filesystem shutting down")),
_ = heartbeat_recv.recv() => {
heartbeat_failed = true;
Err(miette!("Heartbeat monitor triggered, filesystem shutting down"))
}
};
// the filesystem task must be fully terminated in order for the subsequent unmount

heartbeat_monitor.abort_handle().abort();
if let Err(err) = heartbeat_monitor.await {
tracing::warn!("Heartbeat monitor failed: {err:?}");
}

// The filesystem task must be fully terminated in order for the subsequent unmount
// process to function. Otherwise, the background task will keep this process alive
// forever.
abort_handle.abort();
//
// Exception: if spfs-monitor died without cleaning up the runtime,
// fuser::Session::run will not exit and there's no way to tell it
// to break out of its loop. When the fuse filesystem is included
// in the lowerdir of an overlayfs, even though `fusermount -u`
// appears to succeed, and the mount disappears from /etc/mtab,
// spfs-fuse stays alive and still functions when accessed via the
// overlayfs. As a last-ditch effort to avoid leaving a spfs-fuse
// process running forever, we will return without attempting any
// cleanup.
if heartbeat_failed {
return res;
}
if !join_handle.is_finished() {
// XXX: Calling unmount has no apparent effect!
unmount_callable.lock().unwrap().unmount().into_diagnostic().wrap_err("FUSE unmount failed")?;
tracing::trace!("Joining FUSE session");
join_handle.await.into_diagnostic().wrap_err("FUSE join_handle await failed")?.into_diagnostic().wrap_err("FUSE session failed after unmount")?;
tracing::trace!("FUSE session joined");
}
res
});

Expand Down
8 changes: 6 additions & 2 deletions crates/spfs-cli/common/src/args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ pub struct Logging {
#[clap(skip)]
pub syslog: bool,

/// Enables timestamp in logging
/// Enables timestamp in logging (always enabled in file log)
#[clap(long, global = true, env = "SPFS_LOG_TIMESTAMP")]
pub timestamp: bool,
}
Expand Down Expand Up @@ -403,7 +403,11 @@ impl Logging {
})
.map(|log_file| {
let layer = fmt_layer().with_writer(log_file);
let layer = configure_timestamp!(layer, self.timestamp).with_filter(env_filter());
let layer = configure_timestamp!(layer, {
// file logs should always have a timestamp (fight me!)
true
})
.with_filter(env_filter());
without_sentry_target!(layer)
});

Expand Down
25 changes: 25 additions & 0 deletions crates/spfs-vfs/src/fuse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use std::ffi::{OsStr, OsString};
use std::io::{Seek, SeekFrom};
use std::mem::ManuallyDrop;
use std::os::fd::{AsRawFd, FromRawFd};
use std::os::unix::ffi::OsStrExt;
use std::os::unix::prelude::FileExt;
#[cfg(feature = "fuse-backend-abi-7-31")]
use std::pin::Pin;
Expand Down Expand Up @@ -686,6 +687,7 @@ impl Filesystem {
/// This implements the [`fuser::Filesystem`] trait, receives
/// all requests and arranges for their async execution in the
/// spfs virtual filesystem.
#[derive(Clone)]
pub struct Session {
inner: Arc<SessionInner>,
}
Expand All @@ -694,20 +696,35 @@ impl Session {
/// Create a session that serves the given `reference` through its
/// filesystem, configured by `opts`.
///
/// The heartbeat clock starts here: the session start instant is captured
/// at construction and the last-heartbeat offset begins at zero seconds.
pub fn new(reference: EnvSpec, opts: Config) -> Self {
    let inner = SessionInner {
        opts,
        reference,
        fs: tokio::sync::OnceCell::new(),
        session_start: tokio::time::Instant::now(),
        last_heartbeat_seconds_since_session_start: AtomicU64::new(0),
    };

    Self {
        inner: Arc::new(inner),
    }
}

/// Return the number of seconds since the last heartbeat was received
///
/// Both the current time and the last heartbeat are expressed as whole
/// seconds elapsed since the session was constructed. If no heartbeat has
/// been recorded yet, this is the number of seconds since the session
/// started.
pub fn seconds_since_last_heartbeat(&self) -> u64 {
    let elapsed = self.inner.session_start.elapsed().as_secs();
    let last_heartbeat = self
        .inner
        .last_heartbeat_seconds_since_session_start
        .load(Ordering::Relaxed);
    // A heartbeat may be stored by the filesystem thread after `elapsed`
    // was sampled but before the load above, making `last_heartbeat`
    // one second larger than `elapsed`. A plain subtraction would then
    // panic in debug builds or wrap to a huge value in release builds
    // (which callers would interpret as a lost heartbeat), so clamp at 0.
    elapsed.saturating_sub(last_heartbeat)
}
}

/// State shared by every clone of a [`Session`] (held behind an `Arc`).
struct SessionInner {
/// Configuration passed to `Session::new`.
opts: Config,
/// The environment reference passed to `Session::new`.
reference: EnvSpec,
/// The filesystem instance; created empty in `Session::new`.
// NOTE(review): presumably populated lazily on first use — confirm at the
// initialization site, which is not visible here.
fs: tokio::sync::OnceCell<Arc<Filesystem>>,
/// The instant captured when the session was constructed; heartbeat
/// times are measured relative to this.
session_start: tokio::time::Instant,
/// Whole seconds between `session_start` and the most recently observed
/// heartbeat. Starts at 0 and is overwritten by `lookup` whenever a
/// `.fuse-heartbeat*` name is looked up.
last_heartbeat_seconds_since_session_start: AtomicU64,
}

impl SessionInner {
Expand Down Expand Up @@ -790,6 +807,14 @@ impl fuser::Filesystem for Session {
}

fn lookup(&mut self, _req: &Request<'_>, parent: u64, name: &OsStr, reply: ReplyEntry) {
if name.as_bytes().starts_with(b".fuse-heartbeat") {
let seconds_since_session_start = self.inner.session_start.elapsed().as_secs();
tracing::trace!(?seconds_since_session_start, "heard heartbeat");
self.inner
.last_heartbeat_seconds_since_session_start
.store(seconds_since_session_start, Ordering::Relaxed);
}

let name = name.to_owned();
let session = Arc::clone(&self.inner);
tokio::task::spawn(async move {
Expand Down
17 changes: 17 additions & 0 deletions crates/spfs/src/monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,11 @@ pub async fn wait_for_empty_runtime(rt: &runtime::Runtime) -> Result<()> {
const LOG_UPDATE_INTERVAL: tokio::time::Duration = tokio::time::Duration::from_secs(5);
let mut log_update_deadline = tokio::time::Instant::now() + LOG_UPDATE_INTERVAL;

const SPFS_HEARTBEAT_INTERVAL: tokio::time::Duration = tokio::time::Duration::from_secs(60);
let mut spfs_heartbeat_deadline = tokio::time::Instant::now() + SPFS_HEARTBEAT_INTERVAL;

let rt_is_fuse = rt.is_backend_fuse();

while let Some(event) = events_stream.next().await {
let no_more_processes = tracked_processes.is_empty();

Expand All @@ -307,6 +312,18 @@ pub async fn wait_for_empty_runtime(rt: &runtime::Runtime) -> Result<()> {
log_update_deadline = now + LOG_UPDATE_INTERVAL;
}

if rt_is_fuse && now >= spfs_heartbeat_deadline {
// Tickle the spfs filesystem to let `spfs-fuse` know we're still
// alive. This is a read operation to avoid issues with ro mounts
// or modifying any content in /spfs.
// The filename has a unique component to avoid any caching.
let _ =
tokio::fs::symlink_metadata(format!("/spfs/.fuse-heartbeat-{}", ulid::Ulid::new()))
.await;

spfs_heartbeat_deadline = now + SPFS_HEARTBEAT_INTERVAL;
}

if no_more_processes {
// If the mount namespace is known, verify there really aren't any
// more processes in the namespace. Since the polling might not deliver
Expand Down
21 changes: 21 additions & 0 deletions crates/spfs/src/runtime/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,10 @@ impl Config {
self.csh_startup_file = root.join(Self::CSH_STARTUP_FILE);
self.runtime_dir = Some(root);
}

/// Reports whether the configured mount backend is FUSE-based,
/// as determined by [`MountBackend::is_fuse`].
pub fn is_backend_fuse(&self) -> bool {
self.mount_backend.is_fuse()
}
}

/// Identifies a filesystem backend for spfs
Expand Down Expand Up @@ -510,6 +514,15 @@ impl MountBackend {
matches!(self, Self::WinFsp)
}

pub fn is_fuse(&self) -> bool {
match self {
MountBackend::OverlayFsWithRenders => false,
MountBackend::OverlayFsWithFuse => true,
MountBackend::FuseOnly => true,
MountBackend::WinFsp => false,
}
}

/// Reports whether this mount backend requires that all
/// data be synced to the local repository before being executed
pub fn requires_localization(&self) -> bool {
Expand Down Expand Up @@ -554,6 +567,10 @@ impl Data {
&self.config.upper_dir
}

/// Reports whether this runtime's configured mount backend is FUSE-based
/// (delegates to the runtime config).
pub fn is_backend_fuse(&self) -> bool {
self.config.is_backend_fuse()
}

/// Whether to keep the runtime when the process exits
pub fn is_durable(&self) -> bool {
self.config.durable
Expand Down Expand Up @@ -693,6 +710,10 @@ impl Runtime {
&self.storage
}

/// Reports whether this runtime's mount backend is FUSE-based
/// (delegates to the runtime data).
pub fn is_backend_fuse(&self) -> bool {
self.data.is_backend_fuse()
}

pub fn is_durable(&self) -> bool {
self.data.is_durable()
}
Expand Down
1 change: 1 addition & 0 deletions cspell.json
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,7 @@
"MRTM",
"msvc",
"MSVC",
"mtab",
"mthreads",
"multilib",
"MWXAJUULAJMFTGZPODQ",
Expand Down

0 comments on commit f53ccfc

Please sign in to comment.