Skip to content

Commit 435906a

Browse files
pepyakingabriele-0201
authored andcommitted
feat(torture): integrate with trickfs
and as the first thing to test, just toggle on the ENOSPC from run to run.
1 parent cd088f1 commit 435906a

File tree

8 files changed

+171
-22
lines changed

8 files changed

+171
-22
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

torture/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,3 +26,5 @@ hex = "0.4.3"
2626
futures-util = "0.3.31"
2727
clap = { version = "4.5.23", features = ["derive"] }
2828

29+
[target.'cfg(target_os = "linux")'.dependencies]
30+
trickfs = { path = "../trickfs" }

torture/src/agent.rs

Lines changed: 35 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@ use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec};
1717
use tracing::trace;
1818

1919
use crate::message::{
20-
self, CommitPayload, Envelope, InitPayload, KeyValueChange, RollbackPayload, ToAgent,
21-
ToSupervisor, MAX_ENVELOPE_SIZE,
20+
self, CommitOutcome, CommitPayload, Envelope, InitPayload, KeyValueChange, RollbackPayload,
21+
ToAgent, ToSupervisor, MAX_ENVELOPE_SIZE,
2222
};
2323

2424
/// The entrypoint for the agent.
@@ -77,13 +77,13 @@ pub async fn run(input: UnixStream) -> Result<()> {
7777
should_crash: None,
7878
}) => {
7979
let start = std::time::Instant::now();
80-
agent.commit(changeset).await?;
80+
let outcome = agent.commit(changeset).await;
8181
let elapsed = start.elapsed();
8282
tracing::info!("commit took {}ms", elapsed.as_millis());
8383
stream
8484
.send(Envelope {
8585
reqno,
86-
message: ToSupervisor::CommitSuccessful(elapsed),
86+
message: ToSupervisor::CommitOutcome { elapsed, outcome },
8787
})
8888
.await?;
8989
}
@@ -221,7 +221,9 @@ async fn recv_init(stream: &mut Stream) -> Result<Agent> {
221221

222222
struct Agent {
223223
workdir: PathBuf,
224-
nomt: Nomt<Blake3Hasher>,
224+
// We use `Option` here because we want to be able to recreate Nomt instance after it is
225+
// being poisoned.
226+
nomt: Option<Nomt<Blake3Hasher>>,
225227
id: String,
226228
bitbox_seed: [u8; 16],
227229
}
@@ -245,14 +247,16 @@ impl Agent {
245247
let nomt = Nomt::open(o)?;
246248
Ok(Self {
247249
workdir,
248-
nomt,
250+
nomt: Some(nomt),
249251
id: init.id,
250252
bitbox_seed: init.bitbox_seed,
251253
})
252254
}
253255

254-
async fn commit(&mut self, changeset: Vec<KeyValueChange>) -> Result<()> {
255-
let session = self.nomt.begin_session(SessionParams::default());
256+
async fn commit(&mut self, changeset: Vec<KeyValueChange>) -> CommitOutcome {
257+
// UNWRAP: `nomt` is always `Some` except recreation.
258+
let nomt = self.nomt.as_ref().unwrap();
259+
let session = nomt.begin_session(SessionParams::default());
256260
let mut actuals = Vec::with_capacity(changeset.len());
257261
for change in changeset {
258262
match change {
@@ -265,23 +269,40 @@ impl Agent {
265269
}
266270
}
267271

268-
tokio::task::block_in_place(|| session.finish(actuals).commit(&self.nomt))?;
269-
270-
Ok(())
272+
match tokio::task::block_in_place(|| session.finish(actuals).commit(&nomt)) {
273+
Ok(()) => CommitOutcome::Success,
274+
Err(err) => {
275+
// Classify error.
276+
if let Some(io_err) = err.downcast_ref::<std::io::Error>() {
277+
match io_err.raw_os_error() {
278+
Some(errno) if errno == libc::ENOSPC => CommitOutcome::StorageFull,
279+
_ => CommitOutcome::UnknownFailure,
280+
}
281+
} else {
282+
CommitOutcome::UnknownFailure
283+
}
284+
}
285+
}
271286
}
272287

273288
async fn rollback(&mut self, n_commits: usize) -> Result<()> {
274-
tokio::task::block_in_place(|| self.nomt.rollback(n_commits))?;
289+
// UNWRAP: `nomt` is always `Some` except recreation.
290+
let nomt = self.nomt.as_ref().unwrap();
291+
tokio::task::block_in_place(|| nomt.rollback(n_commits))?;
275292
Ok(())
276293
}
277294

278295
fn query(&mut self, key: message::Key) -> Result<Option<message::Value>> {
279-
let value = self.nomt.read(key)?;
296+
// UNWRAP: `nomt` is always `Some` except recreation.
297+
let nomt = self.nomt.as_ref().unwrap();
298+
let value = nomt.read(key)?;
280299
Ok(value)
281300
}
282301

283302
fn query_sync_seqn(&mut self) -> u32 {
284-
self.nomt.sync_seqn()
303+
// UNWRAP: `nomt` is always `Some` except recreation.
304+
let nomt = self.nomt.as_ref().unwrap();
305+
nomt.sync_seqn()
285306
}
286307
}
287308

torture/src/message.rs

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -112,13 +112,29 @@ pub enum ToAgent {
112112
GracefulShutdown,
113113
}
114114

115+
/// Different outcomes of a commit operation.
116+
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
117+
pub enum CommitOutcome {
118+
/// The commit was successful.
119+
Success,
120+
/// The commit failed because the storage is full, dubbed ENOSPC.
121+
StorageFull,
122+
/// Some other failure occurred.
123+
UnknownFailure,
124+
}
125+
115126
/// Messages sent from the agent to the supervisor.
116127
#[derive(Debug, Serialize, Deserialize)]
117128
pub enum ToSupervisor {
118129
/// A generic acknowledgment message.
119130
Ack,
120-
/// The response to a successful commit, it contains the elapsed time to perform the commit.
121-
CommitSuccessful(Duration),
131+
/// The response to a completed commit request.
132+
CommitOutcome {
133+
/// The time it took for the operation to complete.
134+
elapsed: Duration,
135+
/// The outcome of the operation.
136+
outcome: CommitOutcome,
137+
},
122138
/// The response to a query for a key-value pair.
123139
QueryValue(Option<Value>),
124140
/// The response to a query for the current sequence number of the database.

torture/src/supervisor/cli.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,4 +110,11 @@ pub struct WorkloadParams {
110110
#[clap(default_value = "false")]
111111
#[arg(long = "sample-snapshot")]
112112
pub sample_snapshot: bool,
113+
114+
/// Whether to enable testing using the trickfs.
115+
///
116+
/// Supported on Linux only.
117+
#[clap(default_value = "false")]
118+
#[arg(long = "trickfs")]
119+
pub trickfs: bool,
113120
}

torture/src/supervisor/mod.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ use workload::Workload;
1717
mod cli;
1818
mod comms;
1919
mod controller;
20+
mod trickfs;
2021
mod workload;
2122

2223
/// The entrypoint for the supervisor part of the program.
@@ -172,7 +173,7 @@ async fn run_workload(
172173
.suffix(format!("-workload-{}", workload_id).as_str())
173174
.tempdir()
174175
.expect("Failed to create a temp dir");
175-
let mut workload = Workload::new(seed, workdir, workload_params, workload_id);
176+
let mut workload = Workload::new(seed, workdir, workload_params, workload_id)?;
176177
let result = workload.run(cancel_token).await;
177178
match result {
178179
Ok(()) => Ok(None),
@@ -206,9 +207,14 @@ const NON_DETERMINISM_DISCLAIMER: &str = "torture is a non-deterministic fuzzer.
206207
async fn control_loop(
207208
cancel_token: CancellationToken,
208209
seed: u64,
209-
workload_params: WorkloadParams,
210+
mut workload_params: WorkloadParams,
210211
flag_num_limit: usize,
211212
) -> Result<()> {
213+
if workload_params.trickfs && !trickfs::is_supported() {
214+
warn!("TrickFS is not supported on this system. Disabling.");
215+
workload_params.trickfs = false;
216+
}
217+
212218
let mut flags = Vec::new();
213219
let mut workload_cnt = 0;
214220
// TODO: Run workloads in parallel. Make the concurrency factor configurable.

torture/src/supervisor/trickfs.rs

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
// A wrapper needed to make it compile on non-Linux platforms.
2+
3+
cfg_if::cfg_if! {
4+
if #[cfg(target_os = "linux")] {
5+
pub use trickfs::TrickHandle;
6+
7+
pub fn is_supported() -> bool {
8+
true
9+
}
10+
} else {
11+
pub struct TrickHandle;
12+
13+
impl TrickHandle {
14+
pub fn set_trigger_enospc(&self, enabled: bool) {
15+
let _ = enabled;
16+
unimplemented!("TrickHandle::set_trigger_enospc");
17+
}
18+
19+
pub fn unmount_and_join(self) {
20+
}
21+
}
22+
23+
pub fn is_supported() -> bool {
24+
false
25+
}
26+
}
27+
}

torture/src/supervisor/workload.rs

Lines changed: 73 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,10 @@ struct Biases {
3737
commit_crash: f64,
3838
/// When executing a rollback this is the probability of causing it to crash.
3939
rollback_crash: f64,
40+
/// The probability of turning on the `ENOSPC` error.
41+
enospc_on: f64,
42+
/// The probability of turning off the `ENOSPC` error.
43+
enospc_off: f64,
4044
}
4145

4246
impl Biases {
@@ -72,6 +76,8 @@ impl Biases {
7276
commit_crash: (commit_crash as f64) / 100.0,
7377
rollback_crash: (rollback_crash as f64) / 100.0,
7478
new_key_distribution,
79+
enospc_on: 0.1,
80+
enospc_off: 0.2,
7581
}
7682
}
7783
}
@@ -235,6 +241,10 @@ impl WorkloadState {
235241
pub struct Workload {
236242
/// Working directory for this particular workload.
237243
workdir: TempDir,
244+
/// The handle to the trickfs FUSE FS.
245+
///
246+
/// `Some` until the workload is torn down.
247+
trick_handle: Option<super::trickfs::TrickHandle>,
238248
/// The currently spawned agent.
239249
///
240250
/// Initially `None`.
@@ -254,6 +264,8 @@ pub struct Workload {
254264
ensure_changeset: bool,
255265
/// Whether to ensure the correctness of the entire state after every crash or rollback.
256266
ensure_snapshot: bool,
267+
/// Whether the trickfs is currently configured to return `ENOSPC` errors for every write.
268+
enabled_enospc: bool,
257269
/// Whether to randomly sample the state after every crash or rollback.
258270
sample_snapshot: bool,
259271
/// The max number of commits involved in a rollback.
@@ -279,7 +291,7 @@ impl Workload {
279291
workdir: TempDir,
280292
workload_params: &WorkloadParams,
281293
workload_id: u64,
282-
) -> Self {
294+
) -> anyhow::Result<Self> {
283295
// TODO: Make the workload size configurable and more sophisticated.
284296
//
285297
// Right now the workload size is a synonym for the number of iterations. We probably
@@ -302,8 +314,19 @@ impl Workload {
302314
workload_params.random_size,
303315
);
304316
let bitbox_seed = state.gen_bitbox_seed();
305-
Self {
317+
318+
#[cfg(target_os = "linux")]
319+
let trick_handle = workload_params
320+
.trickfs
321+
.then(|| trickfs::spawn_trick(&workdir.path()))
322+
.transpose()?;
323+
324+
#[cfg(not(target_os = "linux"))]
325+
let trick_handle = None;
326+
327+
Ok(Self {
306328
workdir,
329+
trick_handle,
307330
agent: None,
308331
iterations: workload_params.iterations,
309332
state,
@@ -316,7 +339,8 @@ impl Workload {
316339
sample_snapshot: workload_params.sample_snapshot,
317340
max_rollback_commits: workload_params.max_rollback_commits,
318341
scheduled_rollback: None,
319-
}
342+
enabled_enospc: false,
343+
})
320344
}
321345

322346
/// Run the workload.
@@ -359,6 +383,27 @@ impl Workload {
359383
return Ok(());
360384
}
361385

386+
if self.trick_handle.is_some() {
387+
if self.enabled_enospc {
388+
let should_turn_off = self.state.rng.gen_bool(self.state.biases.enospc_off);
389+
if should_turn_off {
390+
info!("unsetting ENOSPC");
391+
self.enabled_enospc = false;
392+
self.trick_handle
393+
.as_ref()
394+
.unwrap()
395+
.set_trigger_enospc(false);
396+
}
397+
} else {
398+
let should_turn_on = self.state.rng.gen_bool(self.state.biases.enospc_on);
399+
if should_turn_on {
400+
info!("setting ENOSPC");
401+
self.enabled_enospc = true;
402+
self.trick_handle.as_ref().unwrap().set_trigger_enospc(true);
403+
}
404+
}
405+
}
406+
362407
// Do not schedule new rollbacks if they are already scheduled.
363408
let is_rollback_scheduled = self.scheduled_rollback.is_some();
364409
if !is_rollback_scheduled && self.state.rng.gen_bool(self.state.biases.rollback) {
@@ -381,7 +426,7 @@ impl Workload {
381426
/// Commit a changeset.
382427
async fn exercise_commit(&mut self, rr: &comms::RequestResponse) -> anyhow::Result<()> {
383428
let (snapshot, changeset) = self.state.gen_commit();
384-
let ToSupervisor::CommitSuccessful(elapsed) = rr
429+
let ToSupervisor::CommitOutcome { elapsed, outcome } = rr
385430
.send_request(crate::message::ToAgent::Commit(
386431
crate::message::CommitPayload {
387432
changeset: changeset.clone(),
@@ -393,6 +438,25 @@ impl Workload {
393438
return Err(anyhow::anyhow!("Commit did not execute successfully"));
394439
};
395440

441+
if self.enabled_enospc {
442+
// TODO: the same handling should be extended to the rollback.
443+
444+
// If we are in the `ENOSPC` mode, the commit should have failed.
445+
if !matches!(outcome, crate::message::CommitOutcome::StorageFull) {
446+
return Err(anyhow::anyhow!("Commit did not return ENOSPC"));
447+
}
448+
449+
let agent_sync_seqn = rr.send_query_sync_seqn().await?;
450+
if self.state.committed.sync_seqn != agent_sync_seqn {
451+
return Err(anyhow::anyhow!(
452+
"Unexpected sync_seqn after failed commit with ENOSPC"
453+
));
454+
}
455+
self.ensure_changeset_reverted(rr, &changeset).await?;
456+
457+
return Ok(());
458+
}
459+
396460
self.n_successfull_commit += 1;
397461
self.tot_commit_time += elapsed;
398462

@@ -750,6 +814,11 @@ impl Workload {
750814
if let Some(agent) = self.agent.take() {
751815
agent.teardown().await;
752816
}
817+
if let Some(trick_handle) = self.trick_handle.take() {
818+
tokio::task::block_in_place(move || {
819+
trick_handle.unmount_and_join();
820+
});
821+
}
753822
}
754823

755824
/// Return the working directory.

0 commit comments

Comments
 (0)