Skip to content

Commit

Permalink
feat(torture): integrate with trickfs
Browse files Browse the repository at this point in the history
and as the first thing to test, just toggle on the ENOSPC
from run to run.
  • Loading branch information
pepyakin committed Jan 31, 2025
1 parent 041938b commit 184fd76
Show file tree
Hide file tree
Showing 8 changed files with 171 additions and 22 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions torture/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,5 @@ hex = "0.4.3"
futures-util = "0.3.31"
clap = { version = "4.5.23", features = ["derive"] }

[target.'cfg(target_os = "linux")'.dependencies]
trickfs = { path = "../trickfs" }
49 changes: 35 additions & 14 deletions torture/src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec};
use tracing::trace;

use crate::message::{
self, CommitPayload, Envelope, InitPayload, KeyValueChange, RollbackPayload, ToAgent,
ToSupervisor, MAX_ENVELOPE_SIZE,
self, CommitOutcome, CommitPayload, Envelope, InitPayload, KeyValueChange, RollbackPayload,
ToAgent, ToSupervisor, MAX_ENVELOPE_SIZE,
};

/// The entrypoint for the agent.
Expand Down Expand Up @@ -77,13 +77,13 @@ pub async fn run(input: UnixStream) -> Result<()> {
should_crash: None,
}) => {
let start = std::time::Instant::now();
agent.commit(changeset).await?;
let outcome = agent.commit(changeset).await;
let elapsed = start.elapsed();
tracing::info!("commit took {}ms", elapsed.as_millis());
stream
.send(Envelope {
reqno,
message: ToSupervisor::CommitSuccessful(elapsed),
message: ToSupervisor::CommitOutcome { elapsed, outcome },
})
.await?;
}
Expand Down Expand Up @@ -221,7 +221,9 @@ async fn recv_init(stream: &mut Stream) -> Result<Agent> {

struct Agent {
workdir: PathBuf,
nomt: Nomt<Blake3Hasher>,
// We use `Option` here because we want to be able to recreate Nomt instance after it is
// being poisoned.
nomt: Option<Nomt<Blake3Hasher>>,
id: String,
bitbox_seed: [u8; 16],
}
Expand All @@ -245,14 +247,16 @@ impl Agent {
let nomt = Nomt::open(o)?;
Ok(Self {
workdir,
nomt,
nomt: Some(nomt),
id: init.id,
bitbox_seed: init.bitbox_seed,
})
}

async fn commit(&mut self, changeset: Vec<KeyValueChange>) -> Result<()> {
let session = self.nomt.begin_session(SessionParams::default());
async fn commit(&mut self, changeset: Vec<KeyValueChange>) -> CommitOutcome {
// UNWRAP: `nomt` is always `Some` except recreation.
let nomt = self.nomt.as_ref().unwrap();
let session = nomt.begin_session(SessionParams::default());
let mut actuals = Vec::with_capacity(changeset.len());
for change in changeset {
match change {
Expand All @@ -265,23 +269,40 @@ impl Agent {
}
}

tokio::task::block_in_place(|| session.finish(actuals).commit(&self.nomt))?;

Ok(())
match tokio::task::block_in_place(|| session.finish(actuals).commit(&nomt)) {
Ok(()) => CommitOutcome::Success,
Err(err) => {
// Classify error.
if let Some(io_err) = err.downcast_ref::<std::io::Error>() {
match io_err.raw_os_error() {
Some(errno) if errno == libc::ENOSPC => CommitOutcome::StorageFull,
_ => CommitOutcome::UnknownFailure,
}
} else {
CommitOutcome::UnknownFailure
}
}
}
}

async fn rollback(&mut self, n_commits: usize) -> Result<()> {
tokio::task::block_in_place(|| self.nomt.rollback(n_commits))?;
// UNWRAP: `nomt` is always `Some` except recreation.
let nomt = self.nomt.as_ref().unwrap();
tokio::task::block_in_place(|| nomt.rollback(n_commits))?;
Ok(())
}

fn query(&mut self, key: message::Key) -> Result<Option<message::Value>> {
let value = self.nomt.read(key)?;
// UNWRAP: `nomt` is always `Some` except recreation.
let nomt = self.nomt.as_ref().unwrap();
let value = nomt.read(key)?;
Ok(value)
}

fn query_sync_seqn(&mut self) -> u32 {
self.nomt.sync_seqn()
// UNWRAP: `nomt` is always `Some` except recreation.
let nomt = self.nomt.as_ref().unwrap();
nomt.sync_seqn()
}
}

Expand Down
20 changes: 18 additions & 2 deletions torture/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,13 +112,29 @@ pub enum ToAgent {
GracefulShutdown,
}

/// Different outcomes of a commit operation.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
pub enum CommitOutcome {
/// The commit was successful.
Success,
/// The commit failed because the storage is full, dubbed ENOSPC.
StorageFull,
/// Some other failure occurred.
UnknownFailure,
}

/// Messages sent from the agent to the supervisor.
#[derive(Debug, Serialize, Deserialize)]
pub enum ToSupervisor {
/// A generic acknowledgment message.
Ack,
/// The response to a successful commit, it contains the elapsed time to perform the commit.
CommitSuccessful(Duration),
/// The response to a completed commit request.
CommitOutcome {
/// The time it took for the operation to complete.
elapsed: Duration,
/// The outcome of the operation.
outcome: CommitOutcome,
},
/// The response to a query for a key-value pair.
QueryValue(Option<Value>),
/// The response to a query for the current sequence number of the database.
Expand Down
7 changes: 7 additions & 0 deletions torture/src/supervisor/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,4 +110,11 @@ pub struct WorkloadParams {
#[clap(default_value = "false")]
#[arg(long = "sample-snapshot")]
pub sample_snapshot: bool,

/// Whether to enable testing using the trickfs.
///
/// Supported on Linux only.
#[clap(default_value = "false")]
#[arg(long = "trickfs")]
pub trickfs: bool,
}
10 changes: 8 additions & 2 deletions torture/src/supervisor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use workload::Workload;
mod cli;
mod comms;
mod controller;
mod trickfs;
mod workload;

/// The entrypoint for the supervisor part of the program.
Expand Down Expand Up @@ -172,7 +173,7 @@ async fn run_workload(
.suffix(format!("-workload-{}", workload_id).as_str())
.tempdir()
.expect("Failed to create a temp dir");
let mut workload = Workload::new(seed, workdir, workload_params, workload_id);
let mut workload = Workload::new(seed, workdir, workload_params, workload_id)?;
let result = workload.run(cancel_token).await;
match result {
Ok(()) => Ok(None),
Expand Down Expand Up @@ -206,9 +207,14 @@ const NON_DETERMINISM_DISCLAIMER: &str = "torture is a non-deterministic fuzzer.
async fn control_loop(
cancel_token: CancellationToken,
seed: u64,
workload_params: WorkloadParams,
mut workload_params: WorkloadParams,
flag_num_limit: usize,
) -> Result<()> {
if workload_params.trickfs && !trickfs::is_supported() {
warn!("TrickFS is not supported on this system. Disabling.");
workload_params.trickfs = false;
}

let mut flags = Vec::new();
let mut workload_cnt = 0;
// TODO: Run workloads in parallel. Make the concurrency factor configurable.
Expand Down
27 changes: 27 additions & 0 deletions torture/src/supervisor/trickfs.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// A wrapper needed to make it compile on non-Linux platforms.

cfg_if::cfg_if! {
if #[cfg(target_os = "linux")] {
pub use trickfs::TrickHandle;

pub fn is_supported() -> bool {
true
}
} else {
pub struct TrickHandle;

impl TrickHandle {
pub fn set_trigger_enospc(&self, enabled: bool) {
let _ = enabled;
unimplemented!("TrickHandle::set_trigger_enospc");
}

pub fn unmount_and_join(self) {
}
}

pub fn is_supported() -> bool {
false
}
}
}
77 changes: 73 additions & 4 deletions torture/src/supervisor/workload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ struct Biases {
commit_crash: f64,
/// When executing a rollback this is the probability of causing it to crash.
rollback_crash: f64,
/// The probability of turning on the `ENOSPC` error.
enospc_on: f64,
/// The probability of turning off the `ENOSPC` error.
enospc_off: f64,
}

impl Biases {
Expand Down Expand Up @@ -72,6 +76,8 @@ impl Biases {
commit_crash: (commit_crash as f64) / 100.0,
rollback_crash: (rollback_crash as f64) / 100.0,
new_key_distribution,
enospc_on: 0.1,
enospc_off: 0.2,
}
}
}
Expand Down Expand Up @@ -235,6 +241,10 @@ impl WorkloadState {
pub struct Workload {
/// Working directory for this particular workload.
workdir: TempDir,
/// The handle to the trickfs FUSE FS.
///
/// `Some` until the workload is torn down.
trick_handle: Option<super::trickfs::TrickHandle>,
/// The currently spawned agent.
///
/// Initially `None`.
Expand All @@ -254,6 +264,8 @@ pub struct Workload {
ensure_changeset: bool,
/// Whether to ensure the correctness of the entire state after every crash or rollback.
ensure_snapshot: bool,
/// Whether the trickfs is currently configured to return `ENOSPC` errors for every write.
enabled_enospc: bool,
/// Whether to randomly sample the state after every crash or rollback.
sample_snapshot: bool,
/// The max number of commits involved in a rollback.
Expand All @@ -279,7 +291,7 @@ impl Workload {
workdir: TempDir,
workload_params: &WorkloadParams,
workload_id: u64,
) -> Self {
) -> anyhow::Result<Self> {
// TODO: Make the workload size configurable and more sophisticated.
//
// Right now the workload size is a synonym for the number of iterations. We probably
Expand All @@ -302,8 +314,19 @@ impl Workload {
workload_params.random_size,
);
let bitbox_seed = state.gen_bitbox_seed();
Self {

#[cfg(target_os = "linux")]
let trick_handle = workload_params
.trickfs
.then(|| trickfs::spawn_trick(&workdir.path()))
.transpose()?;

#[cfg(not(target_os = "linux"))]
let trick_handle = None;

Ok(Self {
workdir,
trick_handle,
agent: None,
iterations: workload_params.iterations,
state,
Expand All @@ -316,7 +339,8 @@ impl Workload {
sample_snapshot: workload_params.sample_snapshot,
max_rollback_commits: workload_params.max_rollback_commits,
scheduled_rollback: None,
}
enabled_enospc: false,
})
}

/// Run the workload.
Expand Down Expand Up @@ -359,6 +383,27 @@ impl Workload {
return Ok(());
}

if self.trick_handle.is_some() {
if self.enabled_enospc {
let should_turn_off = self.state.rng.gen_bool(self.state.biases.enospc_off);
if should_turn_off {
info!("unsetting ENOSPC");
self.enabled_enospc = false;
self.trick_handle
.as_ref()
.unwrap()
.set_trigger_enospc(false);
}
} else {
let should_turn_on = self.state.rng.gen_bool(self.state.biases.enospc_on);
if should_turn_on {
info!("setting ENOSPC");
self.enabled_enospc = true;
self.trick_handle.as_ref().unwrap().set_trigger_enospc(true);
}
}
}

// Do not schedule new rollbacks if they are already scheduled.
let is_rollback_scheduled = self.scheduled_rollback.is_some();
if !is_rollback_scheduled && self.state.rng.gen_bool(self.state.biases.rollback) {
Expand All @@ -381,7 +426,7 @@ impl Workload {
/// Commit a changeset.
async fn exercise_commit(&mut self, rr: &comms::RequestResponse) -> anyhow::Result<()> {
let (snapshot, changeset) = self.state.gen_commit();
let ToSupervisor::CommitSuccessful(elapsed) = rr
let ToSupervisor::CommitOutcome { elapsed, outcome } = rr
.send_request(crate::message::ToAgent::Commit(
crate::message::CommitPayload {
changeset: changeset.clone(),
Expand All @@ -393,6 +438,25 @@ impl Workload {
return Err(anyhow::anyhow!("Commit did not execute successfully"));
};

if self.enabled_enospc {
// TODO: the same handling should be extended to the rollback.

// If we are in the `ENOSPC` mode, the commit should have failed.
if !matches!(outcome, crate::message::CommitOutcome::StorageFull) {
return Err(anyhow::anyhow!("Commit did not return ENOSPC"));
}

let agent_sync_seqn = rr.send_query_sync_seqn().await?;
if self.state.committed.sync_seqn != agent_sync_seqn {
return Err(anyhow::anyhow!(
"Unexpected sync_seqn after failed commit with ENOSPC"
));
}
self.ensure_changeset_reverted(rr, &changeset).await?;

return Ok(());
}

self.n_successfull_commit += 1;
self.tot_commit_time += elapsed;

Expand Down Expand Up @@ -750,6 +814,11 @@ impl Workload {
if let Some(agent) = self.agent.take() {
agent.teardown().await;
}
if let Some(trick_handle) = self.trick_handle.take() {
tokio::task::block_in_place(move || {
trick_handle.unmount_and_join();
});
}
}

/// Return the working directory.
Expand Down

0 comments on commit 184fd76

Please sign in to comment.