Skip to content

Commit

Permalink
feat(torture): rollback handles ENOSPC
Browse files Browse the repository at this point in the history
  • Loading branch information
gabriele-0201 committed Feb 12, 2025
1 parent 560fe73 commit 262e8da
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 23 deletions.
42 changes: 27 additions & 15 deletions torture/src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,12 +110,12 @@ pub async fn run(input: UnixStream) -> Result<()> {
should_crash: None,
}) => {
let start = std::time::Instant::now();
agent.rollback(n_commits).await?;
let outcome = agent.rollback(n_commits).await;
tracing::info!("rollback took {}ms", start.elapsed().as_millis());
stream
.send(Envelope {
reqno,
message: ToSupervisor::Ack,
message: ToSupervisor::RollbackResponse { outcome },
})
.await?;
}
Expand Down Expand Up @@ -295,27 +295,30 @@ impl Agent {

// Perform the commit.
let commit_result = tokio::task::block_in_place(|| session.finish(actuals)?.commit(&nomt));

// Classify the result into one of the outcome bins.
let outcome = match commit_result {
Ok(()) => Outcome::Success,
Err(ref err) if is_enospc(err) => Outcome::StorageFull,
Err(err) => Outcome::UnknownFailure(err.to_string()),
};
let commit_outcome = classify_result(commit_result);

// Log the outcome if it was not successful.
if !matches!(outcome, Outcome::Success) {
trace!("unsuccessful commit: {:?}", outcome);
if !matches!(commit_outcome, Outcome::Success) {
trace!("unsuccessful commit: {:?}", commit_outcome);
}

outcome
commit_outcome
}

async fn rollback(&mut self, n_commits: usize) -> Result<()> {
async fn rollback(&mut self, n_commits: usize) -> Outcome {
// UNWRAP: `nomt` is always `Some` except recreation.
let nomt = self.nomt.as_ref().unwrap();
tokio::task::block_in_place(|| nomt.rollback(n_commits))?;
Ok(())

// Perform the rollback.
let rollback_result = tokio::task::block_in_place(|| nomt.rollback(n_commits));
let rollback_outcome = classify_result(rollback_result);

// Log the outcome if it was not successful.
if !matches!(rollback_outcome, Outcome::Success) {
trace!("unsuccessful rollback: {:?}", rollback_outcome);
}

rollback_outcome
}

fn query(&mut self, key: message::Key) -> Result<Option<message::Value>> {
Expand All @@ -332,6 +335,15 @@ impl Agent {
}
}

/// Classify an operation result into one of the outcome.
fn classify_result(operation_result: anyhow::Result<()>) -> Outcome {
match operation_result {
Ok(()) => Outcome::Success,
Err(ref err) if is_enospc(err) => Outcome::StorageFull,
Err(err) => Outcome::UnknownFailure(err.to_string()),
}
}

/// Examines the given error to determine if it is an `ENOSPC` IO error.
fn is_enospc(err: &anyhow::Error) -> bool {
let Some(io_err) = err.downcast_ref::<std::io::Error>() else {
Expand Down
7 changes: 6 additions & 1 deletion torture/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,12 @@ pub enum ToSupervisor {
CommitResponse {
/// The time it took for the operation to complete.
elapsed: Duration,
/// The outcome of the operation.
/// The outcome of the commit.
outcome: Outcome,
},
/// The response to a completed rollback request.
RollbackResponse {
/// The outcome of the rollback.
outcome: Outcome,
},
/// The response to a query for a key-value pair.
Expand Down
22 changes: 15 additions & 7 deletions torture/src/supervisor/workload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -615,13 +615,21 @@ impl Workload {
snapshot: Snapshot,
) -> anyhow::Result<()> {
trace!("exercising rollback of {} commits", n_commits_to_rollback);
rr.send_request(crate::message::ToAgent::Rollback(
crate::message::RollbackPayload {
n_commits: n_commits_to_rollback,
should_crash: None,
},
))
.await?;
let ToSupervisor::RollbackResponse { outcome } = rr
.send_request(crate::message::ToAgent::Rollback(
crate::message::RollbackPayload {
n_commits: n_commits_to_rollback,
should_crash: None,
},
))
.await?
else {
return Err(anyhow::anyhow!(
"RollbackCommit did not execute successfully"
));
};

self.ensure_outcome_validity(rr, &outcome).await?;

let agent_sync_seqn = rr.send_query_sync_seqn().await?;
if agent_sync_seqn != self.state.committed.sync_seqn + 1 {
Expand Down

0 comments on commit 262e8da

Please sign in to comment.