Skip to content

Commit

Permalink
feat: warm-up worker properly propagates errors
Browse files Browse the repository at this point in the history
Also includes changes ensuring that every I/O error
is correctly propagated, while any other failure causes a panic.
  • Loading branch information
gabriele-0201 authored and pepyakin committed Feb 7, 2025
1 parent 9dfdbd3 commit f175544
Show file tree
Hide file tree
Showing 15 changed files with 130 additions and 121 deletions.
4 changes: 2 additions & 2 deletions benchtop/src/nomt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ impl NomtDB {
let mut actual_access: Vec<_> = access.into_iter().collect();
actual_access.sort_by_key(|(k, _)| *k);

let finished = session.finish(actual_access);
let finished = session.finish(actual_access).unwrap();
if self.overlay_window_capacity == 0 {
finished.commit(&self.nomt).unwrap();
} else {
Expand Down Expand Up @@ -183,7 +183,7 @@ impl NomtDB {
.collect();
actual_access.sort_by_key(|(k, _)| *k);

let finished = session.finish(actual_access);
let finished = session.finish(actual_access).unwrap();
if self.overlay_window_capacity == 0 {
finished.commit(&self.nomt).unwrap();
} else {
Expand Down
2 changes: 1 addition & 1 deletion examples/commit_batch/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ impl NomtDB {
// The final step in handling a session involves committing all changes
// to update the trie structure and obtaining the new root of the trie,
// along with a witness and the witnessed operations.
let mut finished = session.finish(actual_access);
let mut finished = session.finish(actual_access).unwrap();

// This field is set because the finished session was configured with
// `WitnessMode::read_write`.
Expand Down
4 changes: 3 additions & 1 deletion examples/read_value/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ fn main() -> Result<()> {
// we will prove the read.
session.warm_up(key_path);

let mut finished = session.finish(vec![(key_path, KeyReadWrite::Read(value))]);
let mut finished = session
.finish(vec![(key_path, KeyReadWrite::Read(value))])
.unwrap();
let _witness = finished.take_witness();
finished.commit(&nomt)?;

Expand Down
2 changes: 1 addition & 1 deletion fuzz/fuzz_targets/api_surface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ fuzz_target!(|run: Run| {
session.warm_up(key_path);
}
SessionCall::CommitAndProve { keys } => {
let _ = session.finish(keys).commit(&db);
let _ = session.finish(keys).unwrap().commit(&db).unwrap();
break;
}
SessionCall::Drop => {
Expand Down
20 changes: 6 additions & 14 deletions nomt/src/bitbox/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -558,16 +558,11 @@ impl PageLoader {
/// Note that the page loaded by the I/O pool may be a misprobe. You must use
/// [`PageLoad::try_complete`] to verify whether the hash-table probe has completed or must be
/// tried again.
pub fn probe(
&self,
load: &mut PageLoad,
io_handle: &IoHandle,
user_data: u64,
) -> anyhow::Result<bool> {
pub fn probe(&self, load: &mut PageLoad, io_handle: &IoHandle, user_data: u64) -> bool {
let bucket = loop {
match load.probe_sequence.next(&self.meta_map) {
ProbeResult::Tombstone(_) => continue,
ProbeResult::Empty(_) => return Ok(false),
ProbeResult::Empty(_) => return false,
ProbeResult::PossibleHit(bucket) => break BucketIndex(bucket),
}
};
Expand All @@ -580,13 +575,10 @@ impl PageLoader {
user_data,
};

match io_handle.send(command) {
Ok(()) => {
load.state = PageLoadState::Submitted;
Ok(true)
}
Err(_) => anyhow::bail!("I/O pool hangup"),
}
// UNWRAP: I/O pool is not expected to hangup.
io_handle.send(command).unwrap();
load.state = PageLoadState::Submitted;
true
}
}

Expand Down
13 changes: 8 additions & 5 deletions nomt/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,7 @@ impl<T: HashAlgorithm> Nomt<T> {
actuals.push((key, value));
}

sess.finish(actuals).commit(&self)?;
sess.finish(actuals)?.commit(&self)?;

Ok(())
}
Expand Down Expand Up @@ -583,7 +583,10 @@ impl<T: HashAlgorithm> Session<T> {
/// considered within the finished session.
///
/// This function blocks until the merkle root and changeset are computed.
pub fn finish(mut self, actuals: Vec<(KeyPath, KeyReadWrite)>) -> FinishedSession {
pub fn finish(
mut self,
actuals: Vec<(KeyPath, KeyReadWrite)>,
) -> std::io::Result<FinishedSession> {
if cfg!(debug_assertions) {
// Check that the actuals are sorted by key path.
for i in 1..actuals.len() {
Expand All @@ -606,7 +609,7 @@ impl<T: HashAlgorithm> Session<T> {

let merkle_update_handle = self
.merkle_updater
.update_and_prove::<T>(compact_actuals, self.witness_mode.0);
.update_and_prove::<T>(compact_actuals, self.witness_mode.0)?;

let mut tx = self.store.new_value_tx();
for (path, read_write) in actuals {
Expand All @@ -616,14 +619,14 @@ impl<T: HashAlgorithm> Session<T> {
}

let merkle_output = merkle_update_handle.join();
FinishedSession {
Ok(FinishedSession {
value_transaction: tx,
merkle_output,
rollback_delta,
parent_overlay: self.overlay,
prev_root: self.prev_root,
take_global_guard: self.access_guard.is_some(),
}
})
}
}

Expand Down
18 changes: 11 additions & 7 deletions nomt/src/merkle/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use crate::{
page_cache::{Page, PageCache, ShardIndex},
rw_pass_cell::WritePassEnvelope,
store::{BucketIndex, DirtyPage, SharedMaybeBucketIndex, Store},
task::{join_task, spawn_task, TaskResult},
HashAlgorithm, Witness, WitnessedOperations, WitnessedPath, WitnessedRead, WitnessedWrite,
};
use threadpool::ThreadPool;
Expand Down Expand Up @@ -214,7 +215,7 @@ impl Updater {
self,
read_write: Vec<(KeyPath, KeyReadWrite)>,
witness: bool,
) -> UpdateHandle {
) -> std::io::Result<UpdateHandle> {
if let Some(ref warm_up) = self.warm_up {
let _ = warm_up.finish_tx.send(());
}
Expand All @@ -229,9 +230,8 @@ impl Updater {
let shard_regions = (0..num_workers).map(ShardIndex::Shard).collect::<Vec<_>>();

// receive warm-ups from worker.
// TODO: handle error better.
let (warm_ups, warm_page_set) = if let Some(ref warm_up) = self.warm_up {
let output = warm_up.output_rx.recv().unwrap();
let output = join_task(&warm_up.output_rx)?;
(output.paths, Some(output.pages))
} else {
(HashMap::new(), None)
Expand Down Expand Up @@ -262,11 +262,11 @@ impl Updater {
spawn_updater::<H>(&self.worker_tp, params, worker_tx.clone());
}

UpdateHandle {
Ok(UpdateHandle {
shared,
worker_rx,
num_workers,
}
})
}
}

Expand Down Expand Up @@ -462,7 +462,7 @@ impl UpdateShared {
struct WarmUpHandle {
finish_tx: Sender<()>,
warmup_tx: Sender<WarmUpCommand>,
output_rx: Receiver<WarmUpOutput>,
output_rx: Receiver<TaskResult<std::io::Result<WarmUpOutput>>>,
}

fn spawn_warm_up<H: HashAlgorithm>(
Expand All @@ -473,7 +473,11 @@ fn spawn_warm_up<H: HashAlgorithm>(
let (output_tx, output_rx) = channel::bounded(1);
let (finish_tx, finish_rx) = channel::bounded(1);

worker_tp.execute(move || worker::run_warm_up::<H>(params, warmup_rx, finish_rx, output_tx));
spawn_task(
&worker_tp,
move || worker::run_warm_up::<H>(params, warmup_rx, finish_rx),
output_tx,
);

WarmUpHandle {
warmup_tx,
Expand Down
56 changes: 23 additions & 33 deletions nomt/src/merkle/seek.rs
Original file line number Diff line number Diff line change
Expand Up @@ -329,14 +329,12 @@ impl<H: HashAlgorithm> Seeker<H> {
}

/// Try to submit as many requests as possible.
pub fn submit_all(&mut self, page_set: &mut PageSet) -> anyhow::Result<()> {
pub fn submit_all(&mut self, page_set: &mut PageSet) {
if !self.has_room() {
return Ok(());
return;
}
self.submit_idle_page_loads()?;
self.submit_idle_key_path_requests(page_set)?;

Ok(())
self.submit_idle_page_loads();
self.submit_idle_key_path_requests(page_set);
}

/// Take the result of a complete request.
Expand Down Expand Up @@ -365,7 +363,7 @@ impl<H: HashAlgorithm> Seeker<H> {
}

/// Try to process the next I/O. Does not block the current thread.
pub fn try_recv_page(&mut self, page_set: &mut PageSet) -> anyhow::Result<()> {
pub fn try_recv_page(&mut self, page_set: &mut PageSet) -> std::io::Result<()> {
if let Ok(io) = self.io_handle.try_recv() {
self.handle_completion(page_set, io)?;
}
Expand All @@ -374,8 +372,11 @@ impl<H: HashAlgorithm> Seeker<H> {
}

/// Block on processing the next I/O. Blocks the current thread.
pub fn recv_page(&mut self, page_set: &mut PageSet) -> anyhow::Result<()> {
let io = self.io_handle.recv()?;
///
/// Panics if the I/O pool is down.
pub fn recv_page(&mut self, page_set: &mut PageSet) -> std::io::Result<()> {
// UNWRAP: The I/O pool is not expected to fail before completion is returned.
let io = self.io_handle.recv().unwrap();
self.handle_completion(page_set, io)?;
Ok(())
}
Expand All @@ -393,35 +394,31 @@ impl<H: HashAlgorithm> Seeker<H> {
}

// resubmit all idle page loads until no more remain.
fn submit_idle_page_loads(&mut self) -> anyhow::Result<()> {
fn submit_idle_page_loads(&mut self) {
while let Some(slab_index) = self.idle_page_loads.pop_front() {
self.submit_idle_page_load(slab_index)?;
self.submit_idle_page_load(slab_index);
}

Ok(())
}

// submit the next page for each idle key path request until backpressuring or no more progress
// can be made.
fn submit_idle_key_path_requests(&mut self, page_set: &mut PageSet) -> anyhow::Result<()> {
fn submit_idle_key_path_requests(&mut self, page_set: &mut PageSet) {
while self.has_room() {
match self.idle_requests.pop_front() {
None => return Ok(()),
None => return,
Some(request_index) => {
self.submit_key_path_request(page_set, request_index)?;
self.submit_key_path_request(page_set, request_index);
}
}
}

Ok(())
}

// submit a page load which is currently in the slab, but idle.
fn submit_idle_page_load(&mut self, slab_index: usize) -> anyhow::Result<()> {
fn submit_idle_page_load(&mut self, slab_index: usize) {
if let IoRequest::Merkle(ref mut page_load) = self.io_slab[slab_index] {
if !self
.page_loader
.probe(page_load, &self.io_handle, slab_index as u64)?
.probe(page_load, &self.io_handle, slab_index as u64)
{
// PANIC: seek should really never reach fresh pages. we only request a page if
// we seek to an internal node above it, and in that case the page really should
Expand All @@ -434,18 +431,12 @@ impl<H: HashAlgorithm> Seeker<H> {
unreachable!()
}
}

Ok(())
}

// submit the next page for this key path request.
fn submit_key_path_request(
&mut self,
page_set: &mut PageSet,
request_index: usize,
) -> anyhow::Result<()> {
fn submit_key_path_request(&mut self, page_set: &mut PageSet, request_index: usize) {
let i = if request_index < self.processed {
return Ok(());
return;
} else {
request_index - self.processed
};
Expand Down Expand Up @@ -484,7 +475,8 @@ impl<H: HashAlgorithm> Seeker<H> {
let load = self.page_loader.start_load(page_id.clone());
vacant_entry.insert(vec![request_index]);
let slab_index = self.io_slab.insert(IoRequest::Merkle(load));
return self.submit_idle_page_load(slab_index);
self.submit_idle_page_load(slab_index);
return;
}
IoQuery::LeafPage(page_number) => {
let vacant_entry = match self.io_waiters.entry(IoQuery::LeafPage(page_number)) {
Expand All @@ -511,17 +503,15 @@ impl<H: HashAlgorithm> Seeker<H> {

vacant_entry.insert(vec![request_index]);
assert_eq!(slab_index, self.io_slab.insert(IoRequest::Leaf(leaf_load)));
return Ok(());
return;
}
}
}
}
}

Ok(())
}

fn handle_completion(&mut self, page_set: &mut PageSet, io: CompleteIo) -> anyhow::Result<()> {
fn handle_completion(&mut self, page_set: &mut PageSet, io: CompleteIo) -> std::io::Result<()> {
io.result?;
let slab_index = io.command.user_data as usize;

Expand Down
Loading

0 comments on commit f175544

Please sign in to comment.