Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(torture): integrate with trickfs #768

Open
wants to merge 1 commit into
base: pep-trickfs-enospc
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions torture/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,5 @@ hex = "0.4.3"
futures-util = "0.3.31"
clap = { version = "4.5.23", features = ["derive"] }

[target.'cfg(target_os = "linux")'.dependencies]
trickfs = { path = "../trickfs" }
49 changes: 35 additions & 14 deletions torture/src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec};
use tracing::trace;

use crate::message::{
self, CommitPayload, Envelope, InitPayload, KeyValueChange, RollbackPayload, ToAgent,
ToSupervisor, MAX_ENVELOPE_SIZE,
self, CommitOutcome, CommitPayload, Envelope, InitPayload, KeyValueChange, RollbackPayload,
ToAgent, ToSupervisor, MAX_ENVELOPE_SIZE,
};

/// The entrypoint for the agent.
Expand Down Expand Up @@ -77,13 +77,13 @@ pub async fn run(input: UnixStream) -> Result<()> {
should_crash: None,
}) => {
let start = std::time::Instant::now();
agent.commit(changeset).await?;
let outcome = agent.commit(changeset).await;
let elapsed = start.elapsed();
tracing::info!("commit took {}ms", elapsed.as_millis());
stream
.send(Envelope {
reqno,
message: ToSupervisor::CommitSuccessful(elapsed),
message: ToSupervisor::CommitOutcome { elapsed, outcome },
})
.await?;
}
Expand Down Expand Up @@ -221,7 +221,9 @@ async fn recv_init(stream: &mut Stream) -> Result<Agent> {

struct Agent {
workdir: PathBuf,
nomt: Nomt<Blake3Hasher>,
// We use `Option` here because we want to be able to recreate the Nomt instance after it has
// been poisoned.
nomt: Option<Nomt<Blake3Hasher>>,
id: String,
bitbox_seed: [u8; 16],
}
Expand All @@ -245,14 +247,16 @@ impl Agent {
let nomt = Nomt::open(o)?;
Ok(Self {
workdir,
nomt,
nomt: Some(nomt),
id: init.id,
bitbox_seed: init.bitbox_seed,
})
}

async fn commit(&mut self, changeset: Vec<KeyValueChange>) -> Result<()> {
let session = self.nomt.begin_session(SessionParams::default());
async fn commit(&mut self, changeset: Vec<KeyValueChange>) -> CommitOutcome {
// UNWRAP: `nomt` is always `Some` except recreation.
let nomt = self.nomt.as_ref().unwrap();
let session = nomt.begin_session(SessionParams::default());
let mut actuals = Vec::with_capacity(changeset.len());
for change in changeset {
match change {
Expand All @@ -265,23 +269,40 @@ impl Agent {
}
}

tokio::task::block_in_place(|| session.finish(actuals).commit(&self.nomt))?;

Ok(())
match tokio::task::block_in_place(|| session.finish(actuals).commit(&nomt)) {
Ok(()) => CommitOutcome::Success,
Err(err) => {
// Classify error.
if let Some(io_err) = err.downcast_ref::<std::io::Error>() {
match io_err.raw_os_error() {
Some(errno) if errno == libc::ENOSPC => CommitOutcome::StorageFull,
_ => CommitOutcome::UnknownFailure,
}
} else {
CommitOutcome::UnknownFailure
}
}
}
}

async fn rollback(&mut self, n_commits: usize) -> Result<()> {
tokio::task::block_in_place(|| self.nomt.rollback(n_commits))?;
// UNWRAP: `nomt` is always `Some` except recreation.
let nomt = self.nomt.as_ref().unwrap();
tokio::task::block_in_place(|| nomt.rollback(n_commits))?;
Ok(())
}

fn query(&mut self, key: message::Key) -> Result<Option<message::Value>> {
let value = self.nomt.read(key)?;
// UNWRAP: `nomt` is always `Some` except recreation.
let nomt = self.nomt.as_ref().unwrap();
let value = nomt.read(key)?;
Ok(value)
}

fn query_sync_seqn(&mut self) -> u32 {
self.nomt.sync_seqn()
// UNWRAP: `nomt` is always `Some` except recreation.
let nomt = self.nomt.as_ref().unwrap();
nomt.sync_seqn()
}
}

Expand Down
20 changes: 18 additions & 2 deletions torture/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,13 +112,29 @@ pub enum ToAgent {
GracefulShutdown,
}

/// The possible outcomes of a commit operation, as reported by the agent to the supervisor.
///
/// This is carried in the `outcome` field of [`ToSupervisor::CommitOutcome`].
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
pub enum CommitOutcome {
/// The commit completed without an error.
Success,
/// The commit failed because the storage is full, i.e. the commit error was an I/O error
/// whose OS error code is `ENOSPC`.
StorageFull,
/// The commit failed for any reason other than `ENOSPC`.
UnknownFailure,
}

/// Messages sent from the agent to the supervisor.
#[derive(Debug, Serialize, Deserialize)]
pub enum ToSupervisor {
/// A generic acknowledgment message.
Ack,
/// The response to a successful commit, it contains the elapsed time to perform the commit.
CommitSuccessful(Duration),
/// The response to a completed commit request.
CommitOutcome {
/// The time it took for the operation to complete.
elapsed: Duration,
/// The outcome of the operation.
outcome: CommitOutcome,
},
/// The response to a query for a key-value pair.
QueryValue(Option<Value>),
/// The response to a query for the current sequence number of the database.
Expand Down
7 changes: 7 additions & 0 deletions torture/src/supervisor/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,4 +110,11 @@ pub struct WorkloadParams {
#[clap(default_value = "false")]
#[arg(long = "sample-snapshot")]
pub sample_snapshot: bool,

/// Whether to enable testing using the trickfs.
///
/// Supported on Linux only.
#[clap(default_value = "false")]
#[arg(long = "trickfs")]
pub trickfs: bool,
}
10 changes: 8 additions & 2 deletions torture/src/supervisor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use workload::Workload;
mod cli;
mod comms;
mod controller;
mod trickfs;
mod workload;

/// The entrypoint for the supervisor part of the program.
Expand Down Expand Up @@ -172,7 +173,7 @@ async fn run_workload(
.suffix(format!("-workload-{}", workload_id).as_str())
.tempdir()
.expect("Failed to create a temp dir");
let mut workload = Workload::new(seed, workdir, workload_params, workload_id);
let mut workload = Workload::new(seed, workdir, workload_params, workload_id)?;
let result = workload.run(cancel_token).await;
match result {
Ok(()) => Ok(None),
Expand Down Expand Up @@ -206,9 +207,14 @@ const NON_DETERMINISM_DISCLAIMER: &str = "torture is a non-deterministic fuzzer.
async fn control_loop(
cancel_token: CancellationToken,
seed: u64,
workload_params: WorkloadParams,
mut workload_params: WorkloadParams,
flag_num_limit: usize,
) -> Result<()> {
if workload_params.trickfs && !trickfs::is_supported() {
warn!("TrickFS is not supported on this system. Disabling.");
workload_params.trickfs = false;
}

let mut flags = Vec::new();
let mut workload_cnt = 0;
// TODO: Run workloads in parallel. Make the concurrency factor configurable.
Expand Down
27 changes: 27 additions & 0 deletions torture/src/supervisor/trickfs.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// A wrapper needed to make it compile on non-Linux platforms.

// Selects, at compile time, either the real trickfs handle (Linux) or a stand-in with the same
// method names (every other target), so that code using `TrickHandle` compiles everywhere.
cfg_if::cfg_if! {
if #[cfg(target_os = "linux")] {
// On Linux the real implementation from the `trickfs` crate is re-exported as is.
pub use trickfs::TrickHandle;

/// Returns whether trickfs can be used on this platform. Always `true` on Linux.
pub fn is_supported() -> bool {
true
}
} else {
/// Placeholder for the trickfs handle on platforms other than Linux.
///
/// It only exists so that code referring to `TrickHandle` type-checks; callers are expected
/// to consult [`is_supported`] before relying on it.
pub struct TrickHandle;

impl TrickHandle {
/// Stub for toggling `ENOSPC` injection.
///
/// # Panics
///
/// Always panics via `unimplemented!`, since there is no trickfs on this platform.
pub fn set_trigger_enospc(&self, enabled: bool) {
// Explicitly discard the argument so that it does not trigger an unused-variable warning.
let _ = enabled;
unimplemented!("TrickHandle::set_trigger_enospc");
}

/// Stub for tearing down the filesystem. Consumes the handle and does nothing.
pub fn unmount_and_join(self) {
}
}

/// Returns whether trickfs can be used on this platform. Always `false` off Linux.
pub fn is_supported() -> bool {
false
}
}
}
77 changes: 73 additions & 4 deletions torture/src/supervisor/workload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ struct Biases {
commit_crash: f64,
/// When executing a rollback this is the probability of causing it to crash.
rollback_crash: f64,
/// The probability of turning on the `ENOSPC` error.
enospc_on: f64,
/// The probability of turning off the `ENOSPC` error.
enospc_off: f64,
}

impl Biases {
Expand Down Expand Up @@ -72,6 +76,8 @@ impl Biases {
commit_crash: (commit_crash as f64) / 100.0,
rollback_crash: (rollback_crash as f64) / 100.0,
new_key_distribution,
enospc_on: 0.1,
enospc_off: 0.2,
}
}
}
Expand Down Expand Up @@ -235,6 +241,10 @@ impl WorkloadState {
pub struct Workload {
/// Working directory for this particular workload.
workdir: TempDir,
/// The handle to the trickfs FUSE FS.
///
/// `Some` until the workload is torn down.
trick_handle: Option<super::trickfs::TrickHandle>,
/// The currently spawned agent.
///
/// Initially `None`.
Expand All @@ -254,6 +264,8 @@ pub struct Workload {
ensure_changeset: bool,
/// Whether to ensure the correctness of the entire state after every crash or rollback.
ensure_snapshot: bool,
/// Whether the trickfs is currently configured to return `ENOSPC` errors for every write.
enabled_enospc: bool,
/// Whether to randomly sample the state after every crash or rollback.
sample_snapshot: bool,
/// The max number of commits involved in a rollback.
Expand All @@ -279,7 +291,7 @@ impl Workload {
workdir: TempDir,
workload_params: &WorkloadParams,
workload_id: u64,
) -> Self {
) -> anyhow::Result<Self> {
// TODO: Make the workload size configurable and more sophisticated.
//
// Right now the workload size is a synonym for the number of iterations. We probably
Expand All @@ -302,8 +314,19 @@ impl Workload {
workload_params.random_size,
);
let bitbox_seed = state.gen_bitbox_seed();
Self {

#[cfg(target_os = "linux")]
let trick_handle = workload_params
.trickfs
.then(|| trickfs::spawn_trick(&workdir.path()))
.transpose()?;

#[cfg(not(target_os = "linux"))]
let trick_handle = None;

Ok(Self {
workdir,
trick_handle,
agent: None,
iterations: workload_params.iterations,
state,
Expand All @@ -316,7 +339,8 @@ impl Workload {
sample_snapshot: workload_params.sample_snapshot,
max_rollback_commits: workload_params.max_rollback_commits,
scheduled_rollback: None,
}
enabled_enospc: false,
})
}

/// Run the workload.
Expand Down Expand Up @@ -359,6 +383,27 @@ impl Workload {
return Ok(());
}

if self.trick_handle.is_some() {
if self.enabled_enospc {
let should_turn_off = self.state.rng.gen_bool(self.state.biases.enospc_off);
if should_turn_off {
info!("unsetting ENOSPC");
self.enabled_enospc = false;
self.trick_handle
.as_ref()
.unwrap()
.set_trigger_enospc(false);
}
} else {
let should_turn_on = self.state.rng.gen_bool(self.state.biases.enospc_on);
if should_turn_on {
info!("setting ENOSPC");
self.enabled_enospc = true;
self.trick_handle.as_ref().unwrap().set_trigger_enospc(true);
}
}
}

// Do not schedule new rollbacks if they are already scheduled.
let is_rollback_scheduled = self.scheduled_rollback.is_some();
if !is_rollback_scheduled && self.state.rng.gen_bool(self.state.biases.rollback) {
Expand All @@ -381,7 +426,7 @@ impl Workload {
/// Commit a changeset.
async fn exercise_commit(&mut self, rr: &comms::RequestResponse) -> anyhow::Result<()> {
let (snapshot, changeset) = self.state.gen_commit();
let ToSupervisor::CommitSuccessful(elapsed) = rr
let ToSupervisor::CommitOutcome { elapsed, outcome } = rr
.send_request(crate::message::ToAgent::Commit(
crate::message::CommitPayload {
changeset: changeset.clone(),
Expand All @@ -393,6 +438,25 @@ impl Workload {
return Err(anyhow::anyhow!("Commit did not execute successfully"));
};

if self.enabled_enospc {
// TODO: the same handling should be extended to the rollback.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The code should be very similar, so I think this TODO should be resolved in this PR before it is merged. Otherwise, we would have a broken torture because rollbacks could fail while torture expects them to succeed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I kindly disagree:

  1. trickfs is hidden behind a flag. The behavior is unchanged if the flag is off.
  2. I wish it were easy. I think I mentioned to you before that I would like to unify the crashing and non-crashing exercise paths. The non-crashing path is very similar to the crashing one, except that it just does not crash. Now we have this multiplicity due to ENOSPC!


// If we are in the `ENOSPC` mode, the commit should have failed.
if !matches!(outcome, crate::message::CommitOutcome::StorageFull) {
return Err(anyhow::anyhow!("Commit did not return ENOSPC"));
}

let agent_sync_seqn = rr.send_query_sync_seqn().await?;
if self.state.committed.sync_seqn != agent_sync_seqn {
return Err(anyhow::anyhow!(
"Unexpected sync_seqn after failed commit with ENOSPC"
));
}
self.ensure_changeset_reverted(rr, &changeset).await?;

return Ok(());
}

self.n_successfull_commit += 1;
self.tot_commit_time += elapsed;

Expand Down Expand Up @@ -750,6 +814,11 @@ impl Workload {
if let Some(agent) = self.agent.take() {
agent.teardown().await;
}
if let Some(trick_handle) = self.trick_handle.take() {
tokio::task::block_in_place(move || {
trick_handle.unmount_and_join();
});
}
}

/// Return the working directory.
Expand Down