diff --git a/benchtop/src/nomt.rs b/benchtop/src/nomt.rs index 61a09cb2..0555207f 100644 --- a/benchtop/src/nomt.rs +++ b/benchtop/src/nomt.rs @@ -109,7 +109,7 @@ impl NomtDB { let mut actual_access: Vec<_> = access.into_iter().collect(); actual_access.sort_by_key(|(k, _)| *k); - let finished = session.finish(actual_access); + let finished = session.finish(actual_access).unwrap(); if self.overlay_window_capacity == 0 { finished.commit(&self.nomt).unwrap(); } else { @@ -183,7 +183,7 @@ impl NomtDB { .collect(); actual_access.sort_by_key(|(k, _)| *k); - let finished = session.finish(actual_access); + let finished = session.finish(actual_access).unwrap(); if self.overlay_window_capacity == 0 { finished.commit(&self.nomt).unwrap(); } else { diff --git a/examples/commit_batch/src/lib.rs b/examples/commit_batch/src/lib.rs index 8a492ba4..c31f7210 100644 --- a/examples/commit_batch/src/lib.rs +++ b/examples/commit_batch/src/lib.rs @@ -58,7 +58,7 @@ impl NomtDB { // The final step in handling a session involves committing all changes // to update the trie structure and obtaining the new root of the trie, // along with a witness and the witnessed operations. - let mut finished = session.finish(actual_access); + let mut finished = session.finish(actual_access).unwrap(); // This field is set because the finished session was configured with // `WitnessMode::read_write`. diff --git a/examples/read_value/src/main.rs b/examples/read_value/src/main.rs index 7b49b593..3e3036f3 100644 --- a/examples/read_value/src/main.rs +++ b/examples/read_value/src/main.rs @@ -26,7 +26,9 @@ fn main() -> Result<()> { // we will prove the read. session.warm_up(key_path); - let mut finished = session.finish(vec![(key_path, KeyReadWrite::Read(value))]); + let mut finished = session + .finish(vec![(key_path, KeyReadWrite::Read(value))]) + .unwrap(); let _witness = finished.take_witness(); finished.commit(&nomt)?; diff --git a/fuzz/fuzz_targets/api_surface.rs b/fuzz/fuzz_targets/api_surface.rs index e2682cba..dd62963d 100644 --- a/fuzz/fuzz_targets/api_surface.rs +++ b/fuzz/fuzz_targets/api_surface.rs @@ -33,7 +33,7 @@ fuzz_target!(|run: Run| { session.warm_up(key_path); } SessionCall::CommitAndProve { keys } => { - let _ = session.finish(keys).commit(&db); + let _ = session.finish(keys).unwrap().commit(&db).unwrap(); break; } SessionCall::Drop => { diff --git a/nomt/src/bitbox/mod.rs b/nomt/src/bitbox/mod.rs index 117bc721..18bd5dcc 100644 --- a/nomt/src/bitbox/mod.rs +++ b/nomt/src/bitbox/mod.rs @@ -558,16 +558,11 @@ impl PageLoader { /// Note that the page loaded by the I/O pool may be a misprobe. You must use /// [`PageLoad::try_complete`] to verify whether the hash-table probe has completed or must be /// tried again. - pub fn probe( - &self, - load: &mut PageLoad, - io_handle: &IoHandle, - user_data: u64, - ) -> anyhow::Result { + pub fn probe(&self, load: &mut PageLoad, io_handle: &IoHandle, user_data: u64) -> bool { let bucket = loop { match load.probe_sequence.next(&self.meta_map) { ProbeResult::Tombstone(_) => continue, - ProbeResult::Empty(_) => return Ok(false), + ProbeResult::Empty(_) => return false, ProbeResult::PossibleHit(bucket) => break BucketIndex(bucket), } }; @@ -580,13 +575,10 @@ impl PageLoader { user_data, }; - match io_handle.send(command) { - Ok(()) => { - load.state = PageLoadState::Submitted; - Ok(true) - } - Err(_) => anyhow::bail!("I/O pool hangup"), - } + // UNWRAP: I/O pool is not expected to hangup. + io_handle.send(command).unwrap(); + load.state = PageLoadState::Submitted; + true } } diff --git a/nomt/src/lib.rs b/nomt/src/lib.rs index 8f3373a0..8bafb874 100644 --- a/nomt/src/lib.rs +++ b/nomt/src/lib.rs @@ -419,7 +419,7 @@ impl Nomt { actuals.push((key, value)); } - sess.finish(actuals).commit(&self)?; + sess.finish(actuals)?.commit(&self)?; Ok(()) } @@ -583,7 +583,10 @@ impl Session { /// considered within the finished session. /// /// This function blocks until the merkle root and changeset are computed. - pub fn finish(mut self, actuals: Vec<(KeyPath, KeyReadWrite)>) -> FinishedSession { + pub fn finish( + mut self, + actuals: Vec<(KeyPath, KeyReadWrite)>, + ) -> std::io::Result { if cfg!(debug_assertions) { // Check that the actuals are sorted by key path. for i in 1..actuals.len() { @@ -606,7 +609,7 @@ impl Session { let merkle_update_handle = self .merkle_updater - .update_and_prove::(compact_actuals, self.witness_mode.0); + .update_and_prove::(compact_actuals, self.witness_mode.0)?; let mut tx = self.store.new_value_tx(); for (path, read_write) in actuals { @@ -616,14 +619,14 @@ impl Session { } let merkle_output = merkle_update_handle.join(); - FinishedSession { + Ok(FinishedSession { value_transaction: tx, merkle_output, rollback_delta, parent_overlay: self.overlay, prev_root: self.prev_root, take_global_guard: self.access_guard.is_some(), - } + }) } } diff --git a/nomt/src/merkle/mod.rs b/nomt/src/merkle/mod.rs index cfba43a0..e9570953 100644 --- a/nomt/src/merkle/mod.rs +++ b/nomt/src/merkle/mod.rs @@ -22,6 +22,7 @@ use crate::{ page_cache::{Page, PageCache, ShardIndex}, rw_pass_cell::WritePassEnvelope, store::{BucketIndex, DirtyPage, SharedMaybeBucketIndex, Store}, + task::{join_task, spawn_task, TaskResult}, HashAlgorithm, Witness, WitnessedOperations, WitnessedPath, WitnessedRead, WitnessedWrite, }; use threadpool::ThreadPool; @@ -214,7 +215,7 @@ impl Updater { self, read_write: Vec<(KeyPath, KeyReadWrite)>, witness: bool, - ) -> UpdateHandle { + ) -> std::io::Result { if let Some(ref warm_up) = self.warm_up { let _ = warm_up.finish_tx.send(()); } @@ -229,9 +230,8 @@ impl Updater { let shard_regions = (0..num_workers).map(ShardIndex::Shard).collect::>(); // receive warm-ups from worker. - // TODO: handle error better. let (warm_ups, warm_page_set) = if let Some(ref warm_up) = self.warm_up { - let output = warm_up.output_rx.recv().unwrap(); + let output = join_task(&warm_up.output_rx)?; (output.paths, Some(output.pages)) } else { (HashMap::new(), None) @@ -262,11 +262,11 @@ impl Updater { spawn_updater::(&self.worker_tp, params, worker_tx.clone()); } - UpdateHandle { + Ok(UpdateHandle { shared, worker_rx, num_workers, - } + }) } } @@ -462,7 +462,7 @@ impl UpdateShared { struct WarmUpHandle { finish_tx: Sender<()>, warmup_tx: Sender, - output_rx: Receiver, + output_rx: Receiver>>, } fn spawn_warm_up( @@ -473,7 +473,11 @@ fn spawn_warm_up( let (output_tx, output_rx) = channel::bounded(1); let (finish_tx, finish_rx) = channel::bounded(1); - worker_tp.execute(move || worker::run_warm_up::(params, warmup_rx, finish_rx, output_tx)); + spawn_task( + &worker_tp, + move || worker::run_warm_up::(params, warmup_rx, finish_rx), + output_tx, + ); WarmUpHandle { warmup_tx, diff --git a/nomt/src/merkle/seek.rs b/nomt/src/merkle/seek.rs index 6c58f900..be7e8897 100644 --- a/nomt/src/merkle/seek.rs +++ b/nomt/src/merkle/seek.rs @@ -329,14 +329,12 @@ impl Seeker { } /// Try to submit as many requests as possible. - pub fn submit_all(&mut self, page_set: &mut PageSet) -> anyhow::Result<()> { + pub fn submit_all(&mut self, page_set: &mut PageSet) { if !self.has_room() { - return Ok(()); + return; } - self.submit_idle_page_loads()?; - self.submit_idle_key_path_requests(page_set)?; - - Ok(()) + self.submit_idle_page_loads(); + self.submit_idle_key_path_requests(page_set); } /// Take the result of a complete request. @@ -365,7 +363,7 @@ impl Seeker { } /// Try to process the next I/O. Does not block the current thread. - pub fn try_recv_page(&mut self, page_set: &mut PageSet) -> anyhow::Result<()> { + pub fn try_recv_page(&mut self, page_set: &mut PageSet) -> std::io::Result<()> { if let Ok(io) = self.io_handle.try_recv() { self.handle_completion(page_set, io)?; } @@ -374,8 +372,11 @@ impl Seeker { } /// Block on processing the next I/O. Blocks the current thread. - pub fn recv_page(&mut self, page_set: &mut PageSet) -> anyhow::Result<()> { - let io = self.io_handle.recv()?; + /// + /// Panics if the I/O pool is down. + pub fn recv_page(&mut self, page_set: &mut PageSet) -> std::io::Result<()> { + // UNWRAP: The I/O pool is not expected to fail before completion is returned. + let io = self.io_handle.recv().unwrap(); self.handle_completion(page_set, io)?; Ok(()) } @@ -393,35 +394,31 @@ impl Seeker { } // resubmit all idle page loads until no more remain. - fn submit_idle_page_loads(&mut self) -> anyhow::Result<()> { + fn submit_idle_page_loads(&mut self) { while let Some(slab_index) = self.idle_page_loads.pop_front() { - self.submit_idle_page_load(slab_index)?; + self.submit_idle_page_load(slab_index); } - - Ok(()) } // submit the next page for each idle key path request until backpressuring or no more progress // can be made. - fn submit_idle_key_path_requests(&mut self, page_set: &mut PageSet) -> anyhow::Result<()> { + fn submit_idle_key_path_requests(&mut self, page_set: &mut PageSet) { while self.has_room() { match self.idle_requests.pop_front() { - None => return Ok(()), + None => return, Some(request_index) => { - self.submit_key_path_request(page_set, request_index)?; + self.submit_key_path_request(page_set, request_index); } } } - - Ok(()) } // submit a page load which is currently in the slab, but idle. - fn submit_idle_page_load(&mut self, slab_index: usize) -> anyhow::Result<()> { + fn submit_idle_page_load(&mut self, slab_index: usize) { if let IoRequest::Merkle(ref mut page_load) = self.io_slab[slab_index] { if !self .page_loader - .probe(page_load, &self.io_handle, slab_index as u64)? + .probe(page_load, &self.io_handle, slab_index as u64) { // PANIC: seek should really never reach fresh pages. we only request a page if // we seek to an internal node above it, and in that case the page really should @@ -434,18 +431,12 @@ impl Seeker { unreachable!() } } - - Ok(()) } // submit the next page for this key path request. - fn submit_key_path_request( - &mut self, - page_set: &mut PageSet, - request_index: usize, - ) -> anyhow::Result<()> { + fn submit_key_path_request(&mut self, page_set: &mut PageSet, request_index: usize) { let i = if request_index < self.processed { - return Ok(()); + return; } else { request_index - self.processed }; @@ -484,7 +475,8 @@ impl Seeker { let load = self.page_loader.start_load(page_id.clone()); vacant_entry.insert(vec![request_index]); let slab_index = self.io_slab.insert(IoRequest::Merkle(load)); - return self.submit_idle_page_load(slab_index); + self.submit_idle_page_load(slab_index); + return; } IoQuery::LeafPage(page_number) => { let vacant_entry = match self.io_waiters.entry(IoQuery::LeafPage(page_number)) { @@ -511,17 +503,15 @@ impl Seeker { vacant_entry.insert(vec![request_index]); assert_eq!(slab_index, self.io_slab.insert(IoRequest::Leaf(leaf_load))); - return Ok(()); + return; } } } } } - - Ok(()) } - fn handle_completion(&mut self, page_set: &mut PageSet, io: CompleteIo) -> anyhow::Result<()> { + fn handle_completion(&mut self, page_set: &mut PageSet, io: CompleteIo) -> std::io::Result<()> { io.result?; let slab_index = io.command.user_data as usize; diff --git a/nomt/src/merkle/worker.rs b/nomt/src/merkle/worker.rs index cebd4e69..c50c90e4 100644 --- a/nomt/src/merkle/worker.rs +++ b/nomt/src/merkle/worker.rs @@ -10,7 +10,7 @@ //! Updates are performed while the next fetch is pending, unless all fetches in //! the range have completed. -use crossbeam::channel::{Receiver, Select, Sender, TryRecvError}; +use crossbeam::channel::{Receiver, Select, TryRecvError}; use nomt_core::{ page_id::ROOT_PAGE_ID, @@ -62,8 +62,7 @@ pub(super) fn run_warm_up( params: WarmUpParams, warmup_rx: Receiver, finish_rx: Receiver<()>, - output_tx: Sender, -) { +) -> std::io::Result { let page_loader = params.store.page_loader(); let io_handle = params.store.io_pool().make_handle(); let page_io_receiver = io_handle.receiver().clone(); @@ -81,14 +80,7 @@ pub(super) fn run_warm_up( true, ); - let result = warm_up_phase(page_io_receiver, seeker, page_set, warmup_rx, finish_rx); - - match result { - Err(_) => return, - Ok(res) => { - let _ = output_tx.send(res); - } - } + warm_up_phase(page_io_receiver, seeker, page_set, warmup_rx, finish_rx) } pub(super) fn run_update(params: UpdateParams) -> anyhow::Result { @@ -130,7 +122,7 @@ fn warm_up_phase( mut page_set: PageSet, warmup_rx: Receiver, finish_rx: Receiver<()>, -) -> anyhow::Result { +) -> std::io::Result { let mut select_all = Select::new(); let warmup_idx = select_all.recv(&warmup_rx); let finish_idx = select_all.recv(&finish_rx); @@ -148,14 +140,17 @@ fn warm_up_phase( continue; } - seeker.submit_all(&mut page_set)?; + seeker.submit_all(&mut page_set); if !seeker.has_room() { // block on interrupt or next page ready. let index = select_no_work.ready(); if index == finish_no_work_idx { match finish_rx.try_recv() { Err(TryRecvError::Empty) => continue, - Err(e) => anyhow::bail!(e), + Err(e) => panic!( + "Wamr-Up worker, unexpected failure of the finish channel: {:?}", + e + ), Ok(()) => break, } } else if index == page_no_work_idx { @@ -169,14 +164,20 @@ fn warm_up_phase( if index == finish_idx { match finish_rx.try_recv() { Err(TryRecvError::Empty) => continue, - Err(e) => anyhow::bail!(e), + Err(e) => panic!( + "Wamr-Up worker, unexpected failure of the finish channel: {:?}", + e + ), Ok(()) => break, } } else if index == warmup_idx { let warm_up_command = match warmup_rx.try_recv() { Ok(command) => command, Err(TryRecvError::Empty) => continue, - Err(e) => anyhow::bail!(e), + Err(e) => panic!( + "Wamr-Up worker, unexpected failure of the warmup channel: {:?}", + e + ), }; seeker.push(warm_up_command.key_path); @@ -193,7 +194,7 @@ fn warm_up_phase( warm_ups.insert(result.key, result); continue; } - seeker.submit_all(&mut page_set)?; + seeker.submit_all(&mut page_set); if seeker.has_live_requests() { seeker.recv_page(&mut page_set)?; } @@ -496,7 +497,7 @@ impl RangeUpdater { } } - seeker.submit_all(page_set)?; + seeker.submit_all(page_set); if !seeker.has_room() && seeker.has_live_requests() { // no way to push work until at least one page fetch has concluded. seeker.recv_page(page_set)?; @@ -515,7 +516,7 @@ impl RangeUpdater { } } else { seeker.push(self.shared.read_write[next_push].0); - seeker.submit_all(page_set)?; + seeker.submit_all(page_set); } } diff --git a/nomt/src/store/mod.rs b/nomt/src/store/mod.rs index ea302019..d02a5011 100644 --- a/nomt/src/store/mod.rs +++ b/nomt/src/store/mod.rs @@ -231,7 +231,7 @@ impl Store { let io_handle = self.io_pool().make_handle(); let mut page_load = page_loader.start_load(page_id); loop { - if !page_loader.probe(&mut page_load, &io_handle, 0)? { + if !page_loader.probe(&mut page_load, &io_handle, 0) { return Ok(None); } diff --git a/nomt/src/store/page_loader.rs b/nomt/src/store/page_loader.rs index 477f9407..a45c4348 100644 --- a/nomt/src/store/page_loader.rs +++ b/nomt/src/store/page_loader.rs @@ -20,12 +20,7 @@ impl PageLoader { /// /// This returns `Ok(true)` if the page request has been submitted and a completion will be /// coming. `Ok(false)` means that the page is guaranteed to be fresh. - pub fn probe( - &self, - load: &mut PageLoad, - io_handle: &IoHandle, - user_data: u64, - ) -> anyhow::Result { + pub fn probe(&self, load: &mut PageLoad, io_handle: &IoHandle, user_data: u64) -> bool { self.inner.probe(load, io_handle, user_data) } } diff --git a/nomt/tests/common/mod.rs b/nomt/tests/common/mod.rs index 8ebe7c76..1aa395bd 100644 --- a/nomt/tests/common/mod.rs +++ b/nomt/tests/common/mod.rs @@ -121,7 +121,7 @@ impl Test { let session = mem::take(&mut self.session).unwrap(); let mut actual_access: Vec<_> = mem::take(&mut self.access).into_iter().collect(); actual_access.sort_by_key(|(k, _)| *k); - let mut finished = session.finish(actual_access); + let mut finished = session.finish(actual_access).unwrap(); let root = finished.root(); let witness = finished.take_witness().unwrap(); finished.commit(&self.nomt).unwrap(); @@ -136,7 +136,7 @@ impl Test { let session = mem::take(&mut self.session).unwrap(); let mut actual_access: Vec<_> = mem::take(&mut self.access).into_iter().collect(); actual_access.sort_by_key(|(k, _)| *k); - let mut finished = session.finish(actual_access); + let mut finished = session.finish(actual_access).unwrap(); let witness = finished.take_witness().unwrap(); self.session = Some( diff --git a/nomt/tests/prev_root_check.rs b/nomt/tests/prev_root_check.rs index 7468aa0f..031abf47 100644 --- a/nomt/tests/prev_root_check.rs +++ b/nomt/tests/prev_root_check.rs @@ -23,10 +23,14 @@ fn setup_nomt(path: &str) -> Nomt { fn test_prev_root_commits() { let nomt = setup_nomt("prev_root_commits"); let session1 = nomt.begin_session(SessionParams::default()); - let finished1 = session1.finish(vec![([1; 32], KeyReadWrite::Write(Some(vec![1, 2, 3])))]); + let finished1 = session1 + .finish(vec![([1; 32], KeyReadWrite::Write(Some(vec![1, 2, 3])))]) + .unwrap(); let session2 = nomt.begin_session(SessionParams::default()); - let finished2 = session2.finish(vec![([1; 32], KeyReadWrite::Write(Some(vec![1, 2, 3])))]); + let finished2 = session2 + .finish(vec![([1; 32], KeyReadWrite::Write(Some(vec![1, 2, 3])))]) + .unwrap(); finished1.commit(&nomt).unwrap(); @@ -37,11 +41,15 @@ fn test_prev_root_commits() { fn test_prev_root_overlay_invalidated() { let nomt = setup_nomt("prev_root_overlay_invalidated"); let session1 = nomt.begin_session(SessionParams::default()); - let finished1 = session1.finish(vec![([1; 32], KeyReadWrite::Write(Some(vec![1, 2, 3])))]); + let finished1 = session1 + .finish(vec![([1; 32], KeyReadWrite::Write(Some(vec![1, 2, 3])))]) + .unwrap(); let overlay1 = finished1.into_overlay(); let session2 = nomt.begin_session(SessionParams::default()); - let finished2 = session2.finish(vec![([1; 32], KeyReadWrite::Write(Some(vec![1, 2, 3])))]); + let finished2 = session2 + .finish(vec![([1; 32], KeyReadWrite::Write(Some(vec![1, 2, 3])))]) + .unwrap(); finished2.commit(&nomt).unwrap(); @@ -52,11 +60,15 @@ fn test_prev_root_overlay_invalidated() { fn test_prev_root_overlay_invalidates_session() { let nomt = setup_nomt("prev_root_overlays"); let session1 = nomt.begin_session(SessionParams::default()); - let finished1 = session1.finish(vec![([1; 32], KeyReadWrite::Write(Some(vec![1, 2, 3])))]); + let finished1 = session1 + .finish(vec![([1; 32], KeyReadWrite::Write(Some(vec![1, 2, 3])))]) + .unwrap(); let overlay1 = finished1.into_overlay(); let session2 = nomt.begin_session(SessionParams::default()); - let finished2 = session2.finish(vec![([1; 32], KeyReadWrite::Write(Some(vec![1, 2, 3])))]); + let finished2 = session2 + .finish(vec![([1; 32], KeyReadWrite::Write(Some(vec![1, 2, 3])))]) + .unwrap(); overlay1.commit(&nomt).unwrap(); diff --git a/nomt/tests/rollback.rs b/nomt/tests/rollback.rs index 227bcf57..a3216ca5 100644 --- a/nomt/tests/rollback.rs +++ b/nomt/tests/rollback.rs @@ -42,10 +42,12 @@ fn test_rollback_disabled() { ); let session = nomt.begin_session(SessionParams::default()); - let finished = session.finish(vec![( - hex!("0000000000000000000000000000000000000000000000000000000000000001"), - KeyReadWrite::Write(Some(vec![1])), - )]); + let finished = session + .finish(vec![( + hex!("0000000000000000000000000000000000000000000000000000000000000001"), + KeyReadWrite::Write(Some(vec![1])), + )]) + .unwrap(); finished.commit(&nomt).unwrap(); let result = nomt.rollback(1); @@ -63,10 +65,12 @@ fn test_rollback_to_initial() { ); let session = nomt.begin_session(SessionParams::default()); - let finished = session.finish(vec![( - hex!("0000000000000000000000000000000000000000000000000000000000000001"), - KeyReadWrite::Write(Some(vec![1])), - )]); + let finished = session + .finish(vec![( + hex!("0000000000000000000000000000000000000000000000000000000000000001"), + KeyReadWrite::Write(Some(vec![1])), + )]) + .unwrap(); finished.commit(&nomt).unwrap(); assert_eq!( @@ -186,7 +190,7 @@ impl TestPlan { operations.push((key.clone(), KeyReadWrite::Write(None))); } operations.sort_by_key(|(key, _)| key.clone()); - let finished = session.finish(operations); + let finished = session.finish(operations).unwrap(); finished.commit(&nomt).unwrap(); let post_root = nomt.root(); expected_roots.push(post_root.into_inner()); @@ -212,7 +216,7 @@ impl TestPlan { operations.push((key.clone(), KeyReadWrite::Write(None))); } operations.sort_by_key(|(key, _)| key.clone()); - let finished = session.finish(operations); + let finished = session.finish(operations).unwrap(); finished.commit(&nomt).unwrap(); } } @@ -428,10 +432,12 @@ fn test_rollback_change_history() { let session = nomt.begin_session(SessionParams::default()); let new_key = KeyPath::from([0xAA; 32]); let new_value = vec![0xBB; 32]; - let finished = session.finish(vec![( - new_key, - KeyReadWrite::Write(Some(new_value.clone())), - )]); + let finished = session + .finish(vec![( + new_key, + KeyReadWrite::Write(Some(new_value.clone())), + )]) + .unwrap(); finished.commit(&nomt).unwrap(); // Verify the new state @@ -461,10 +467,12 @@ fn test_rollback_read_then_write() { let session = nomt.begin_session(SessionParams::default()); let key = KeyPath::from([0xAA; 32]); let original_value = vec![0xBB; 32]; - let finished = session.finish(vec![( - key, - KeyReadWrite::Write(Some(original_value.clone())), - )]); + let finished = session + .finish(vec![( + key, + KeyReadWrite::Write(Some(original_value.clone())), + )]) + .unwrap(); finished.commit(&nomt).unwrap(); // Then, create a new commit with a read-then-write actual specifying the **wrong** prior value. @@ -474,10 +482,12 @@ fn test_rollback_read_then_write() { let session = nomt.begin_session(SessionParams::default()); assert_eq!(session.read(key).unwrap(), Some(original_value.clone())); let new_value = vec![0xCC; 32]; - let finished = session.finish(vec![( - key, - KeyReadWrite::ReadThenWrite(None, Some(new_value.clone())), - )]); + let finished = session + .finish(vec![( + key, + KeyReadWrite::ReadThenWrite(None, Some(new_value.clone())), + )]) + .unwrap(); finished.commit(&nomt).unwrap(); // Rollback and expect the value from the ReadThenWrite operation to be restored. diff --git a/torture/src/agent.rs b/torture/src/agent.rs index 4d6010ed..4f6c7ebe 100644 --- a/torture/src/agent.rs +++ b/torture/src/agent.rs @@ -294,7 +294,7 @@ impl Agent { } // Perform the commit. - let commit_result = tokio::task::block_in_place(|| session.finish(actuals).commit(&nomt)); + let commit_result = tokio::task::block_in_place(|| session.finish(actuals)?.commit(&nomt)); // Classify the result into one of the outcome bins. let outcome = match commit_result {