Skip to content

Commit

Permalink
feat(torture): unify rollback and rollback crash
Browse files Browse the repository at this point in the history
Unifying the two has the side effect that ENOSPC errors are now
also handled for rollbacks that are expected to crash.
  • Loading branch information
gabriele-0201 committed Feb 7, 2025
1 parent d704d59 commit 1885e40
Showing 1 changed file with 45 additions and 53 deletions.
98 changes: 45 additions & 53 deletions torture/src/supervisor/workload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -407,11 +407,8 @@ impl Workload {
// Do not schedule new rollbacks if they are already scheduled.
let is_rollback_scheduled = self.scheduled_rollback.is_some();
if !is_rollback_scheduled && self.state.rng.gen_bool(self.state.biases.rollback) {
if self.state.rng.gen_bool(self.state.biases.rollback_crash) {
self.schedule_rollback(true /*should_crash*/).await?
} else {
self.schedule_rollback(false /*should_crash*/).await?
}
let should_crash = self.state.rng.gen_bool(self.state.biases.rollback_crash);
self.schedule_rollback(should_crash).await?
}

let should_crash = self.state.rng.gen_bool(self.state.biases.commit_crash);
Expand Down Expand Up @@ -592,9 +589,9 @@ impl Workload {
} = scheduled_rollback;

match should_crash {
None => self.exercise_rollback(&rr, n_commits, snapshot).await,
None => self.exercise_rollback(&rr, n_commits, snapshot, None).await,
Some(crash_delay) => {
self.exercise_rollback_crashing(&rr, n_commits, snapshot, crash_delay)
self.exercise_rollback(&rr, n_commits, snapshot, Some(crash_delay))
.await
}
}
Expand All @@ -605,69 +602,64 @@ impl Workload {
rr: &comms::RequestResponse,
n_commits_to_rollback: usize,
snapshot: Snapshot,
should_crash: Option<Duration>,
) -> anyhow::Result<()> {
trace!("exercising rollback of {} commits", n_commits_to_rollback);
let ToSupervisor::RollbackResponse { outcome } = rr
let maybe_crash_text = if should_crash.is_some() { " crash" } else { "" };
trace!(
"exercising rollback{} of {} commits",
maybe_crash_text,
n_commits_to_rollback
);

let rollback_outcome = rr
.send_request(crate::message::ToAgent::Rollback(
crate::message::RollbackPayload {
n_commits: n_commits_to_rollback,
should_crash: None,
should_crash: should_crash.clone(),
},
))
.await?
else {
return Err(anyhow::anyhow!(
"RollbackCommit did not execute successfully"
));
};
.await?;

self.ensure_outcome_validity(rr, &outcome).await?;
if should_crash.is_some() {
let ToSupervisor::Ack = rollback_outcome else {
return Err(anyhow::anyhow!(
"RollbackCommit crash did not execute successfully"
));
};

let agent_sync_seqn = rr.send_query_sync_seqn().await?;
if agent_sync_seqn != self.state.committed.sync_seqn + 1 {
return Err(anyhow::anyhow!("Unexpected sync_seqn after rollback"));
self.wait_for_crash().await?;

// During a rollback crash, any kind of error could happen.
// However, the agent will be respawned, so we just need to
// check whether the rollback was applied or not.
self.spawn_new_agent().await?;
} else {
let ToSupervisor::RollbackResponse { outcome } = rollback_outcome else {
return Err(anyhow::anyhow!(
"RollbackCommit did not execute successfully"
));
};

self.ensure_outcome_validity(rr, &outcome).await?;
}

self.state.rollback(snapshot);
self.ensure_snapshot_validity(rr).await?;
Ok(())
}
// Update RequestResponse, the agent could have been respawned.
let agent = self.agent.as_ref().unwrap();
let rr = agent.rr().clone();

async fn exercise_rollback_crashing(
&mut self,
rr: &comms::RequestResponse,
n_commits_to_rollback: usize,
snapshot: Snapshot,
crash_delay: Duration,
) -> anyhow::Result<()> {
trace!(
"exercising rollback crash of {} commits",
n_commits_to_rollback
);
rr.send_request(crate::message::ToAgent::Rollback(
crate::message::RollbackPayload {
n_commits: n_commits_to_rollback,
should_crash: Some(crash_delay),
},
))
.await?;

self.wait_for_crash().await?;

// Spawns a new agent and checks whether the rollback was applied to the database and if so
// we rollback to the correct snapshot in the state.
self.spawn_new_agent().await?;
let rr = self.agent.as_ref().unwrap().rr().clone();
let sync_seqn = rr.send_query_sync_seqn().await?;
let agent_sync_seqn = rr.send_query_sync_seqn().await?;
let last_sync_seqn = self.state.committed.sync_seqn;
if sync_seqn == last_sync_seqn + 1 {
if agent_sync_seqn == last_sync_seqn + 1 {
// sync_seqn has increased, so the rollback is expected to be applied correctly
self.state.rollback(snapshot);
} else if sync_seqn == last_sync_seqn {
} else if agent_sync_seqn == last_sync_seqn {
// The rollback successfully crashed.
info!("rollback crashed, seqno: {}", last_sync_seqn);
} else {
return Err(anyhow::anyhow!("Unexpected sync_seqn after rollback crash"));
return Err(anyhow::anyhow!(
"Unexpected sync_seqn after rollback{}",
maybe_crash_text
));
}

self.ensure_snapshot_validity(&rr).await?;
Expand Down

0 comments on commit 1885e40

Please sign in to comment.