Skip to content

Commit

Permalink
fix(torture): handle poisoning
Browse files Browse the repository at this point in the history
When the database gets ENOSPC it becomes poisoned, and it must be
recreated before any further changes can be made to the database.

This commit does that.

To get there, it was necessary to split initializing the agent (happens at
most once per agent process) from opening a NOMT instance (can happen
multiple times). Note that opening carries fields such as `bitbox_seed`
and `rollback`, which might not be useful each time. This is by design,
because we might want to test what happens if these values change from
one open to the next.
  • Loading branch information
pepyakin committed Feb 4, 2025
1 parent 1a8284f commit 75dd790
Show file tree
Hide file tree
Showing 5 changed files with 263 additions and 91 deletions.
145 changes: 91 additions & 54 deletions torture/src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use anyhow::{anyhow, bail, Result};
use futures::SinkExt as _;
use nomt::{Blake3Hasher, Nomt, SessionParams};
use std::future::Future;
use std::path::Path;
use std::{path::PathBuf, sync::Arc, time::Duration};
use tokio::{
io::{BufReader, BufWriter},
Expand All @@ -17,14 +18,17 @@ use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec};
use tracing::trace;

use crate::message::{
self, CommitOutcome, CommitPayload, Envelope, InitPayload, KeyValueChange, RollbackPayload,
ToAgent, ToSupervisor, MAX_ENVELOPE_SIZE,
self, CommitOutcome, CommitPayload, Envelope, InitOutcome, KeyValueChange, OpenOutcome,
OpenPayload, RollbackPayload, ToAgent, ToSupervisor, MAX_ENVELOPE_SIZE,
};

/// The entrypoint for the agent.
///
/// `input` is the UnixStream that the agent should use to communicate with its supervisor.
pub async fn run(input: UnixStream) -> Result<()> {
let pid = std::process::id();
trace!(pid, "Child process started");

// Make the process non-dumpable.
//
// We expect this process to abort on a crash, so we don't want to leave lots of core dumps
Expand All @@ -33,15 +37,8 @@ pub async fn run(input: UnixStream) -> Result<()> {
nix::sys::prctl::set_dumpable(false)?;

let mut stream = Stream::new(input);
let mut agent = recv_init(&mut stream).await?;

crate::logging::init_agent(&agent.id, &agent.workdir);
let pid = std::process::id();
trace!(
pid,
bitbox_seed = hex::encode(&agent.bitbox_seed),
"Child process started"
);
let workdir = initialize(&mut stream).await?;
let mut agent = Agent::new();

loop {
// TODO: make the message processing non-blocking.
Expand Down Expand Up @@ -83,7 +80,7 @@ pub async fn run(input: UnixStream) -> Result<()> {
stream
.send(Envelope {
reqno,
message: ToSupervisor::CommitOutcome { elapsed, outcome },
message: ToSupervisor::CommitResponse { elapsed, outcome },
})
.await?;
}
Expand Down Expand Up @@ -150,7 +147,19 @@ pub async fn run(input: UnixStream) -> Result<()> {
drop(agent);
break;
}
ToAgent::Init(init) => bail!("unexpected init message, id={}", init.id),
ToAgent::Open(open_params) => {
tracing::info!("opening the database");
let outcome = agent.perform_open(&workdir, open_params).await;
stream
.send(Envelope {
reqno,
message: ToSupervisor::OpenResponse(outcome),
})
.await?;
}
ToAgent::Init(_) => {
bail!("Unexpected Init message");
}
}
}
Ok(())
Expand Down Expand Up @@ -191,14 +200,16 @@ async fn crash_task(
unreachable!();
}

/// Receives the [`ToAgent::Init`] message from the supervisor and returns the initialized agent. Sends
/// an Ack message back to the supervisor.
/// Performs the initialization of the agent.
///
/// Receives the [`ToAgent::Init`] message from the supervisor, initializes the logging, confirms
/// the receipt of the message, and returns the path to the working directory.
///
/// # Errors
///
/// Returns an error if the message is not an Init message or if the message is not received
/// within a certain time limit.
async fn recv_init(stream: &mut Stream) -> Result<Agent> {
async fn initialize(stream: &mut Stream) -> Result<PathBuf> {
const DEADLINE: Duration = Duration::from_secs(1);
let Envelope { reqno, message } = match timeout(DEADLINE, stream.recv()).await {
Ok(envelope) => envelope?,
Expand All @@ -209,48 +220,61 @@ async fn recv_init(stream: &mut Stream) -> Result<Agent> {
let ToAgent::Init(init) = message else {
anyhow::bail!("Expected Init message");
};
let agent = Agent::new(init)?;
stream
.send(Envelope {
reqno,
message: ToSupervisor::Ack,
})
.await?;
Ok(agent)

crate::logging::init_agent(&init.id, &init.workdir);

let workdir = PathBuf::from(&init.workdir);
if !workdir.exists() {
stream
.send(Envelope {
reqno,
message: ToSupervisor::InitResponse(InitOutcome::WorkdirDoesNotExist),
})
.await?;
bail!("Workdir does not exist");
} else {
stream
.send(Envelope {
reqno,
message: ToSupervisor::InitResponse(InitOutcome::Success),
})
.await?;
Ok(workdir)
}
}

struct Agent {
workdir: PathBuf,
// We use `Option` here because we want to be able to recreate the Nomt instance after it
// has been poisoned.
nomt: Option<Nomt<Blake3Hasher>>,
id: String,
bitbox_seed: [u8; 16],
}

impl Agent {
fn new(init: InitPayload) -> Result<Self> {
let workdir = PathBuf::from(&init.workdir);
if !workdir.exists() {
bail!("workdir does not exist: {:?}", workdir);
fn new() -> Self {
Self { nomt: None }
}

async fn perform_open(&mut self, workdir: &Path, open_params: OpenPayload) -> OpenOutcome {
if let Some(nomt) = self.nomt.take() {
tracing::trace!("dropping the existing NOMT instance");
drop(nomt);
}

let mut o = nomt::Options::new();
o.path(workdir.join("nomt_db"));
o.bitbox_seed(init.bitbox_seed);
o.bitbox_seed(open_params.bitbox_seed);
o.hashtable_buckets(500_000);
if let Some(n_commits) = init.rollback {
if let Some(n_commits) = open_params.rollback {
o.rollback(true);
o.max_rollback_log_len(n_commits);
} else {
o.rollback(false);
}
let nomt = Nomt::open(o)?;
Ok(Self {
workdir,
nomt: Some(nomt),
id: init.id,
bitbox_seed: init.bitbox_seed,
})
let nomt = match tokio::task::block_in_place(|| Nomt::open(o)) {
Ok(nomt) => nomt,
Err(ref err) if is_enospc(err) => return OpenOutcome::StorageFull,
Err(ref err) => return OpenOutcome::UnknownFailure(err.to_string()),
};
self.nomt = Some(nomt);
OpenOutcome::Success
}

async fn commit(&mut self, changeset: Vec<KeyValueChange>) -> CommitOutcome {
Expand All @@ -269,20 +293,22 @@ impl Agent {
}
}

match tokio::task::block_in_place(|| session.finish(actuals).commit(&nomt)) {
// Perform the commit.
let commit_result = tokio::task::block_in_place(|| session.finish(actuals).commit(&nomt));

// Classify the result into one of the outcome bins.
let outcome = match commit_result {
Ok(()) => CommitOutcome::Success,
Err(err) => {
// Classify error.
if let Some(io_err) = err.downcast_ref::<std::io::Error>() {
match io_err.raw_os_error() {
Some(errno) if errno == libc::ENOSPC => CommitOutcome::StorageFull,
_ => CommitOutcome::UnknownFailure,
}
} else {
CommitOutcome::UnknownFailure
}
}
Err(ref err) if is_enospc(err) => CommitOutcome::StorageFull,
Err(_) => CommitOutcome::UnknownFailure,
};

// Log the outcome if it was not successful.
if !matches!(outcome, CommitOutcome::Success) {
trace!("unsuccessful commit: {:?}", outcome);
}

outcome
}

async fn rollback(&mut self, n_commits: usize) -> Result<()> {
Expand All @@ -306,6 +332,17 @@ impl Agent {
}
}

/// Reports whether `err` is an IO error whose OS error code is `ENOSPC`.
///
/// Returns `false` when `err` does not downcast to [`std::io::Error`], or when the IO error
/// carries no raw OS error code.
fn is_enospc(err: &anyhow::Error) -> bool {
    let raw_code = err
        .downcast_ref::<std::io::Error>()
        .and_then(std::io::Error::raw_os_error);
    raw_code == Some(libc::ENOSPC)
}

/// Abstraction over the stream of messages from the supervisor.
struct Stream {
rd_stream: SymmetricallyFramed<
Expand Down
4 changes: 2 additions & 2 deletions torture/src/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ pub fn init_supervisor() {
.expect("Failed to set supervisor subscriber");
}

pub fn init_agent(agent_id: &str, workdir: &Path) {
pub fn init_agent(agent_id: &str, workdir: &impl AsRef<Path>) {
// Console layer with ANSI colors
let console_layer = fmt::layer()
.with_writer(io::stdout)
Expand All @@ -98,7 +98,7 @@ pub fn init_agent(agent_id: &str, workdir: &Path) {
let file = std::fs::File::options()
.create(true)
.append(true)
.open(workdir.join("agent.log"))
.open(workdir.as_ref().join("agent.log"))
.unwrap();

// TODO: this has an issue currently. While the ANSI is false the colors are not disabled
Expand Down
40 changes: 37 additions & 3 deletions torture/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,19 @@ impl KeyValueChange {
}

/// The parameters for the [`ToAgent::Init`] message.
#[derive(Debug, Serialize, Deserialize)]
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct InitPayload {
/// ID string that can be used to identify the agent. This is used for logging and debugging.
pub id: String,
/// The directory where the child should store the data and other files (such as logs).
///
/// The directory must exist.
pub workdir: String,
}

/// The parameters for the [`ToAgent::Open`] message.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct OpenPayload {
/// The seed that should be used for bitbox.
///
/// Only used upon creation of a new NOMT db.
Expand Down Expand Up @@ -94,8 +99,11 @@ pub struct Envelope<T> {
#[derive(Debug, Serialize, Deserialize)]
pub enum ToAgent {
/// The first message sent by the supervisor to the child process. Contains the parameters the
/// supervisor expects the child to use.
/// supervisor expects the child to use. Usually sent only once per child process.
Init(InitPayload),
/// The supervisor sends this message to the child process to instruct it to open a database
/// with the given parameters.
Open(OpenPayload),
/// The supervisor sends this message to the child process to indicate that the child should
/// commit.
Commit(CommitPayload),
Expand Down Expand Up @@ -123,13 +131,39 @@ pub enum CommitOutcome {
UnknownFailure,
}

/// Elaboration on the agent initialization result inside of [`ToSupervisor::InitResponse`].
#[derive(Debug, Serialize, Deserialize)]
pub enum InitOutcome {
/// The agent successfully initialized.
Success,
/// The agent failed to initialize because the workdir does not exist.
///
/// This is the supervisor's failure: the workdir named in the init request must already exist.
WorkdirDoesNotExist,
}

/// Elaboration on the result of opening the database inside of [`ToSupervisor::OpenResponse`].
#[derive(Debug, Serialize, Deserialize)]
pub enum OpenOutcome {
/// The agent successfully opened the database.
Success,
/// The agent failed to open the database because the storage is full (`ENOSPC`).
StorageFull,
/// An uncategorised failure has happened; the string carries the error message.
UnknownFailure(String),
}

/// Messages sent from the agent to the supervisor.
#[derive(Debug, Serialize, Deserialize)]
pub enum ToSupervisor {
/// A generic acknowledgment message.
Ack,
/// The response to the [`ToAgent::Init`] request.
InitResponse(InitOutcome),
/// The response to the [`ToAgent::Open`] request.
OpenResponse(OpenOutcome),
/// The response to a completed commit request.
CommitOutcome {
CommitResponse {
/// The time it took for the operation to complete.
elapsed: Duration,
/// The outcome of the operation.
Expand Down
Loading

0 comments on commit 75dd790

Please sign in to comment.