From c3d28ba976588b6130bb732a1d2871adb98c703a Mon Sep 17 00:00:00 2001 From: Sergei Shulepov Date: Mon, 20 Jan 2025 16:44:42 +0000 Subject: [PATCH] feat(torture): integrate with trickfs and as the first thing to test, just toggle on the ENOSPC from run to run. --- Cargo.lock | 1 + torture/Cargo.toml | 2 + torture/src/agent.rs | 49 +++++++++++++------ torture/src/message.rs | 20 +++++++- torture/src/supervisor/cli.rs | 7 +++ torture/src/supervisor/mod.rs | 10 +++- torture/src/supervisor/trickfs.rs | 27 +++++++++++ torture/src/supervisor/workload.rs | 77 ++++++++++++++++++++++++++++-- 8 files changed, 171 insertions(+), 22 deletions(-) create mode 100644 torture/src/supervisor/trickfs.rs diff --git a/Cargo.lock b/Cargo.lock index 9a9be6bf..b567a1b8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1818,6 +1818,7 @@ dependencies = [ "tokio-util", "tracing", "tracing-subscriber", + "trickfs", ] [[package]] diff --git a/torture/Cargo.toml b/torture/Cargo.toml index 78728e5f..714908bc 100644 --- a/torture/Cargo.toml +++ b/torture/Cargo.toml @@ -26,3 +26,5 @@ hex = "0.4.3" futures-util = "0.3.31" clap = { version = "4.5.23", features = ["derive"] } +[target.'cfg(target_os = "linux")'.dependencies] +trickfs = { path = "../trickfs" } diff --git a/torture/src/agent.rs b/torture/src/agent.rs index f7230602..4f00f75b 100644 --- a/torture/src/agent.rs +++ b/torture/src/agent.rs @@ -17,8 +17,8 @@ use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec}; use tracing::trace; use crate::message::{ - self, CommitPayload, Envelope, InitPayload, KeyValueChange, RollbackPayload, ToAgent, - ToSupervisor, MAX_ENVELOPE_SIZE, + self, CommitOutcome, CommitPayload, Envelope, InitPayload, KeyValueChange, RollbackPayload, + ToAgent, ToSupervisor, MAX_ENVELOPE_SIZE, }; /// The entrypoint for the agent. @@ -77,13 +77,13 @@ pub async fn run(input: UnixStream) -> Result<()> { should_crash: None, }) => { let start = std::time::Instant::now(); - agent.commit(changeset).await?; + let outcome = agent.commit(changeset).await; let elapsed = start.elapsed(); tracing::info!("commit took {}ms", elapsed.as_millis()); stream .send(Envelope { reqno, - message: ToSupervisor::CommitSuccessful(elapsed), + message: ToSupervisor::CommitOutcome { elapsed, outcome }, }) .await?; } @@ -221,7 +221,9 @@ async fn recv_init(stream: &mut Stream) -> Result { struct Agent { workdir: PathBuf, - nomt: Nomt, + // We use `Option` here because we want to be able to recreate Nomt instance after it is + // being poisoned. + nomt: Option>, id: String, bitbox_seed: [u8; 16], } @@ -245,14 +247,16 @@ impl Agent { let nomt = Nomt::open(o)?; Ok(Self { workdir, - nomt, + nomt: Some(nomt), id: init.id, bitbox_seed: init.bitbox_seed, }) } - async fn commit(&mut self, changeset: Vec) -> Result<()> { - let session = self.nomt.begin_session(SessionParams::default()); + async fn commit(&mut self, changeset: Vec) -> CommitOutcome { + // UNWRAP: `nomt` is always `Some` except recreation. + let nomt = self.nomt.as_ref().unwrap(); + let session = nomt.begin_session(SessionParams::default()); let mut actuals = Vec::with_capacity(changeset.len()); for change in changeset { match change { @@ -265,23 +269,40 @@ impl Agent { } } - tokio::task::block_in_place(|| session.finish(actuals).commit(&self.nomt))?; - - Ok(()) + match tokio::task::block_in_place(|| session.finish(actuals).commit(&nomt)) { + Ok(()) => CommitOutcome::Success, + Err(err) => { + // Classify error. + if let Some(io_err) = err.downcast_ref::() { + match io_err.raw_os_error() { + Some(errno) if errno == libc::ENOSPC => CommitOutcome::StorageFull, + _ => CommitOutcome::UnknownFailure, + } + } else { + CommitOutcome::UnknownFailure + } + } + } } async fn rollback(&mut self, n_commits: usize) -> Result<()> { - tokio::task::block_in_place(|| self.nomt.rollback(n_commits))?; + // UNWRAP: `nomt` is always `Some` except recreation. + let nomt = self.nomt.as_ref().unwrap(); + tokio::task::block_in_place(|| nomt.rollback(n_commits))?; Ok(()) } fn query(&mut self, key: message::Key) -> Result> { - let value = self.nomt.read(key)?; + // UNWRAP: `nomt` is always `Some` except recreation. + let nomt = self.nomt.as_ref().unwrap(); + let value = nomt.read(key)?; Ok(value) } fn query_sync_seqn(&mut self) -> u32 { - self.nomt.sync_seqn() + // UNWRAP: `nomt` is always `Some` except recreation. + let nomt = self.nomt.as_ref().unwrap(); + nomt.sync_seqn() } } diff --git a/torture/src/message.rs b/torture/src/message.rs index 1e188d06..576e1a1a 100644 --- a/torture/src/message.rs +++ b/torture/src/message.rs @@ -112,13 +112,29 @@ pub enum ToAgent { GracefulShutdown, } +/// Different outcomes of a commit operation. +#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)] +pub enum CommitOutcome { + /// The commit was successful. + Success, + /// The commit failed because the storage is full, dubbed ENOSPC. + StorageFull, + /// Some other failure occurred. + UnknownFailure, +} + /// Messages sent from the agent to the supervisor. #[derive(Debug, Serialize, Deserialize)] pub enum ToSupervisor { /// A generic acknowledgment message. Ack, - /// The response to a successful commit, it contains the elapsed time to perform the commit. - CommitSuccessful(Duration), + /// The response to a completed commit request. + CommitOutcome { + /// The time it took for the operation to complete. + elapsed: Duration, + /// The outcome of the operation. + outcome: CommitOutcome, + }, /// The response to a query for a key-value pair. QueryValue(Option), /// The response to a query for the current sequence number of the database. diff --git a/torture/src/supervisor/cli.rs b/torture/src/supervisor/cli.rs index 0bd2b366..afc1c088 100644 --- a/torture/src/supervisor/cli.rs +++ b/torture/src/supervisor/cli.rs @@ -110,4 +110,11 @@ pub struct WorkloadParams { #[clap(default_value = "false")] #[arg(long = "sample-snapshot")] pub sample_snapshot: bool, + + /// Whether to enable testing using the trickfs. + /// + /// Supported on Linux only. + #[clap(default_value = "false")] + #[arg(long = "trickfs")] + pub trickfs: bool, } diff --git a/torture/src/supervisor/mod.rs b/torture/src/supervisor/mod.rs index 6262f95e..b8e64bd8 100644 --- a/torture/src/supervisor/mod.rs +++ b/torture/src/supervisor/mod.rs @@ -17,6 +17,7 @@ use workload::Workload; mod cli; mod comms; mod controller; +mod trickfs; mod workload; /// The entrypoint for the supervisor part of the program. @@ -172,7 +173,7 @@ async fn run_workload( .suffix(format!("-workload-{}", workload_id).as_str()) .tempdir() .expect("Failed to create a temp dir"); - let mut workload = Workload::new(seed, workdir, workload_params, workload_id); + let mut workload = Workload::new(seed, workdir, workload_params, workload_id)?; let result = workload.run(cancel_token).await; match result { Ok(()) => Ok(None), @@ -206,9 +207,14 @@ const NON_DETERMINISM_DISCLAIMER: &str = "torture is a non-deterministic fuzzer. async fn control_loop( cancel_token: CancellationToken, seed: u64, - workload_params: WorkloadParams, + mut workload_params: WorkloadParams, flag_num_limit: usize, ) -> Result<()> { + if workload_params.trickfs && !trickfs::is_supported() { + warn!("TrickFS is not supported on this system. Disabling."); + workload_params.trickfs = false; + } + let mut flags = Vec::new(); let mut workload_cnt = 0; // TODO: Run workloads in parallel. Make the concurrency factor configurable. diff --git a/torture/src/supervisor/trickfs.rs b/torture/src/supervisor/trickfs.rs new file mode 100644 index 00000000..fa55f818 --- /dev/null +++ b/torture/src/supervisor/trickfs.rs @@ -0,0 +1,27 @@ +// A wrapper needed to make it compile on non-Linux platforms. + +cfg_if::cfg_if! { + if #[cfg(target_os = "linux")] { + pub use trickfs::TrickHandle; + + pub fn is_supported() -> bool { + true + } + } else { + pub struct TrickHandle; + + impl TrickHandle { + pub fn set_trigger_enospc(&self, enabled: bool) { + let _ = enabled; + unimplemented!("TrickHandle::set_trigger_enospc"); + } + + pub fn unmount_and_join(self) { + } + } + + pub fn is_supported() -> bool { + false + } + } +} diff --git a/torture/src/supervisor/workload.rs b/torture/src/supervisor/workload.rs index ef533f55..783d18ae 100644 --- a/torture/src/supervisor/workload.rs +++ b/torture/src/supervisor/workload.rs @@ -37,6 +37,10 @@ struct Biases { commit_crash: f64, /// When executing a rollback this is the probability of causing it to crash. rollback_crash: f64, + /// The probability of turning on the `ENOSPC` error. + enospc_on: f64, + /// The probability of turning off the `ENOSPC` error. + enospc_off: f64, } impl Biases { @@ -72,6 +76,8 @@ impl Biases { commit_crash: (commit_crash as f64) / 100.0, rollback_crash: (rollback_crash as f64) / 100.0, new_key_distribution, + enospc_on: 0.1, + enospc_off: 0.2, } } } @@ -235,6 +241,10 @@ impl WorkloadState { pub struct Workload { /// Working directory for this particular workload. workdir: TempDir, + /// The handle to the trickfs FUSE FS. + /// + /// `Some` until the workload is torn down. + trick_handle: Option, /// The currently spawned agent. /// /// Initially `None`. @@ -254,6 +264,8 @@ pub struct Workload { ensure_changeset: bool, /// Whether to ensure the correctness of the entire state after every crash or rollback. ensure_snapshot: bool, + /// Whether the trickfs is currently configured to return `ENOSPC` errors for every write. + enabled_enospc: bool, /// Whether to randomly sample the state after every crash or rollback. sample_snapshot: bool, /// The max number of commits involved in a rollback. @@ -279,7 +291,7 @@ impl Workload { workdir: TempDir, workload_params: &WorkloadParams, workload_id: u64, - ) -> Self { + ) -> anyhow::Result { // TODO: Make the workload size configurable and more sophisticated. // // Right now the workload size is a synonym for the number of iterations. We probably @@ -302,8 +314,19 @@ impl Workload { workload_params.random_size, ); let bitbox_seed = state.gen_bitbox_seed(); - Self { + + #[cfg(target_os = "linux")] + let trick_handle = workload_params + .trickfs + .then(|| trickfs::spawn_trick(&workdir.path())) + .transpose()?; + + #[cfg(not(target_os = "linux"))] + let trick_handle = None; + + Ok(Self { workdir, + trick_handle, agent: None, iterations: workload_params.iterations, state, @@ -316,7 +339,8 @@ impl Workload { sample_snapshot: workload_params.sample_snapshot, max_rollback_commits: workload_params.max_rollback_commits, scheduled_rollback: None, - } + enabled_enospc: false, + }) } /// Run the workload. @@ -359,6 +383,27 @@ impl Workload { return Ok(()); } + if self.trick_handle.is_some() { + if self.enabled_enospc { + let should_turn_off = self.state.rng.gen_bool(self.state.biases.enospc_off); + if should_turn_off { + info!("unsetting ENOSPC"); + self.enabled_enospc = false; + self.trick_handle + .as_ref() + .unwrap() + .set_trigger_enospc(false); + } + } else { + let should_turn_on = self.state.rng.gen_bool(self.state.biases.enospc_on); + if should_turn_on { + info!("setting ENOSPC"); + self.enabled_enospc = true; + self.trick_handle.as_ref().unwrap().set_trigger_enospc(true); + } + } + } + // Do not schedule new rollbacks if they are already scheduled. let is_rollback_scheduled = self.scheduled_rollback.is_some(); if !is_rollback_scheduled && self.state.rng.gen_bool(self.state.biases.rollback) { @@ -381,7 +426,7 @@ impl Workload { /// Commit a changeset. async fn exercise_commit(&mut self, rr: &comms::RequestResponse) -> anyhow::Result<()> { let (snapshot, changeset) = self.state.gen_commit(); - let ToSupervisor::CommitSuccessful(elapsed) = rr + let ToSupervisor::CommitOutcome { elapsed, outcome } = rr .send_request(crate::message::ToAgent::Commit( crate::message::CommitPayload { changeset: changeset.clone(), @@ -393,6 +438,25 @@ impl Workload { return Err(anyhow::anyhow!("Commit did not execute successfully")); }; + if self.enabled_enospc { + // TODO: the same handling should be extended to the rollback. + + // If we are in the `ENOSPC` mode, the commit should have failed. + if !matches!(outcome, crate::message::CommitOutcome::StorageFull) { + return Err(anyhow::anyhow!("Commit did not return ENOSPC")); + } + + let agent_sync_seqn = rr.send_query_sync_seqn().await?; + if self.state.committed.sync_seqn != agent_sync_seqn { + return Err(anyhow::anyhow!( + "Unexpected sync_seqn after failed commit with ENOSPC" + )); + } + self.ensure_changeset_reverted(rr, &changeset).await?; + + return Ok(()); + } + self.n_successfull_commit += 1; self.tot_commit_time += elapsed; @@ -750,6 +814,11 @@ impl Workload { if let Some(agent) = self.agent.take() { agent.teardown().await; } + if let Some(trick_handle) = self.trick_handle.take() { + tokio::task::block_in_place(move || { + trick_handle.unmount_and_join(); + }); + } } /// Return the working directory.