From 859377b9de773b8aaafb16b8d2d4c7dc6d4dfed7 Mon Sep 17 00:00:00 2001 From: Sergei Shulepov Date: Wed, 29 Jan 2025 15:57:03 +0000 Subject: [PATCH] chore: migrate lower-level components to io err This reduces the scope of errors that could be thrown by the components deliberately. --- nomt/src/beatree/mod.rs | 2 +- nomt/src/bitbox/mod.rs | 10 +++++----- nomt/src/bitbox/writeout.rs | 6 +++--- nomt/src/rollback/mod.rs | 6 +++--- nomt/src/seglog/mod.rs | 18 +++++++++++------- nomt/src/seglog/segment_rw.rs | 33 ++++++++++++++++++++------------- nomt/src/store/meta.rs | 4 ++-- nomt/src/store/mod.rs | 2 +- nomt/src/store/sync.rs | 2 +- 9 files changed, 47 insertions(+), 36 deletions(-) diff --git a/nomt/src/beatree/mod.rs b/nomt/src/beatree/mod.rs index b24bbe55..14afbc11 100644 --- a/nomt/src/beatree/mod.rs +++ b/nomt/src/beatree/mod.rs @@ -457,7 +457,7 @@ impl SyncController { /// with updating the manifest. /// /// This must be called after [`Self::begin_sync`]. - pub fn wait_pre_meta(&mut self) -> anyhow::Result { + pub fn wait_pre_meta(&mut self) -> std::io::Result { self.inner.sync.bbn_fsync.wait()?; self.inner.sync.ln_fsync.wait()?; diff --git a/nomt/src/bitbox/mod.rs b/nomt/src/bitbox/mod.rs index b954d0d4..016ba3e3 100644 --- a/nomt/src/bitbox/mod.rs +++ b/nomt/src/bitbox/mod.rs @@ -273,9 +273,9 @@ impl DB { pub struct SyncController { db: DB, /// The channel to send the result of the WAL writeout. Option is to allow `take`. - wal_result_tx: Option>>, + wal_result_tx: Option>>, /// The channel to receive the result of the WAL writeout. - wal_result_rx: Receiver>, + wal_result_rx: Receiver>, /// The pages along with their page numbers to write out to the HT file. ht_to_write: Arc)>>>>, } @@ -323,7 +323,7 @@ impl SyncController { }); } - fn spawn_wal_writeout(wal_result_tx: Sender>, bitbox: DB) { + fn spawn_wal_writeout(wal_result_tx: Sender>, bitbox: DB) { let bitbox = bitbox.clone(); let tp = bitbox.shared.sync_tp.clone(); tp.execute(move || { @@ -337,7 +337,7 @@ impl SyncController { /// Wait for the pre-meta WAL file to be written out. /// /// Must be invoked by the sync thread. Blocking. - pub fn wait_pre_meta(&self) -> anyhow::Result<()> { + pub fn wait_pre_meta(&self) -> std::io::Result<()> { match self.wal_result_rx.recv() { Ok(wal_result) => wal_result, Err(_) => panic!("unexpected hungup"), @@ -348,7 +348,7 @@ impl SyncController { /// /// Has to be called after the manifest is updated. Must be invoked by the sync /// thread. Blocking. - pub fn post_meta(&self, io_handle: IoHandle) -> anyhow::Result<()> { + pub fn post_meta(&self, io_handle: IoHandle) -> std::io::Result<()> { let ht_pages = self.ht_to_write.lock().take().unwrap(); // Writeout the HT pages and truncate the WAL file. // diff --git a/nomt/src/bitbox/writeout.rs b/nomt/src/bitbox/writeout.rs index db678355..ab42eb12 100644 --- a/nomt/src/bitbox/writeout.rs +++ b/nomt/src/bitbox/writeout.rs @@ -14,7 +14,7 @@ use std::{ use crate::io::{FatPage, IoCommand, IoHandle, IoKind}; -pub(super) fn write_wal(mut wal_fd: &File, wal_blob: &[u8]) -> anyhow::Result<()> { +pub(super) fn write_wal(mut wal_fd: &File, wal_blob: &[u8]) -> std::io::Result<()> { wal_fd.set_len(0)?; wal_fd.seek(SeekFrom::Start(0))?; wal_fd.write_all(wal_blob)?; @@ -25,7 +25,7 @@ pub(super) fn write_wal(mut wal_fd: &File, wal_blob: &[u8]) -> anyhow::Result<() /// Truncates the WAL file to zero length. /// /// Conditionally syncs the file to disk. -pub(super) fn truncate_wal(mut wal_fd: &File, do_sync: bool) -> anyhow::Result<()> { +pub(super) fn truncate_wal(mut wal_fd: &File, do_sync: bool) -> std::io::Result<()> { wal_fd.set_len(0)?; wal_fd.seek(SeekFrom::Start(0))?; if do_sync { @@ -38,7 +38,7 @@ pub(super) fn write_ht( io_handle: IoHandle, ht_fd: &File, mut ht: Vec<(u64, Arc)>, -) -> anyhow::Result<()> { +) -> std::io::Result<()> { let mut sent = 0; ht.sort_unstable_by_key(|item| item.0); diff --git a/nomt/src/rollback/mod.rs b/nomt/src/rollback/mod.rs index 3c3514e7..ca74e4ab 100644 --- a/nomt/src/rollback/mod.rs +++ b/nomt/src/rollback/mod.rs @@ -262,7 +262,7 @@ impl Rollback { &self, new_start_live: Option, new_end_live: Option, - ) -> anyhow::Result<()> { + ) -> std::io::Result<()> { if let Some(new_start_live) = new_start_live { let mut seglog = self.shared.seglog.lock(); seglog.prune_back(new_start_live.into())?; @@ -279,7 +279,7 @@ pub struct SyncController { rollback: Rollback, wd: Arc>>, wd_cv: Arc, - post_meta: Arc>>>, + post_meta: Arc>>>, post_meta_cv: Arc, } @@ -343,7 +343,7 @@ impl SyncController { } /// Wait until the post-meta writeout completes. - pub fn wait_post_meta(&self) -> anyhow::Result<()> { + pub fn wait_post_meta(&self) -> std::io::Result<()> { let mut post_meta = self.post_meta.lock(); self.post_meta_cv .wait_while(&mut post_meta, |post_meta| post_meta.is_none()); diff --git a/nomt/src/seglog/mod.rs b/nomt/src/seglog/mod.rs index 5bb5cd05..46bb1d3e 100644 --- a/nomt/src/seglog/mod.rs +++ b/nomt/src/seglog/mod.rs @@ -277,7 +277,7 @@ impl SegmentedLog { /// # Panics /// /// The new live range must be a subset of the old live range. - pub fn prune_back(&mut self, new_start_live: RecordId) -> Result<()> { + pub fn prune_back(&mut self, new_start_live: RecordId) -> std::io::Result<()> { if new_start_live.is_nil() { self.start_live = RecordId::nil(); self.end_live = RecordId::nil(); @@ -333,7 +333,7 @@ impl SegmentedLog { /// /// It's possible to return remove all items from the log, i.e. to reset the log to an empty /// state, by setting the new live end to zero. That would update the live range to `(0, 0)`. - pub fn prune_front(&mut self, new_end_live: RecordId) -> Result<()> { + pub fn prune_front(&mut self, new_end_live: RecordId) -> std::io::Result<()> { if new_end_live.is_nil() { self.start_live = RecordId::nil(); self.end_live = RecordId::nil(); @@ -390,7 +390,7 @@ impl SegmentedLog { Ok(()) } - fn remove_all_segments(&mut self) -> Result<()> { + fn remove_all_segments(&mut self) -> std::io::Result<()> { let _ = self.head_segment_writer.take(); for segment in &self.segments { @@ -602,7 +602,7 @@ impl Recovery { } /// Scans the segment file and returns the file offset of the end of the specified record. -fn scan_record_end(path: &Path, end_live: RecordId) -> Result> { +fn scan_record_end(path: &Path, end_live: RecordId) -> std::io::Result> { let mut seg_reader = SegmentFileReader::new(File::open(path)?, None)?; loop { let header = match seg_reader.read_header()? { @@ -622,11 +622,15 @@ fn scan_record_end(path: &Path, end_live: RecordId) -> Result> { } } -fn truncate_head_segment(path: &Path, new_end_live: RecordId) -> Result { +fn truncate_head_segment( + path: &Path, + new_end_live: RecordId, +) -> std::io::Result { let end = match scan_record_end(path, new_end_live)? { None => { - return Err(anyhow::anyhow!( - "Failed to find the last live record in the head segment" + return Err(std::io::Error::new( + std::io::ErrorKind::Other, + anyhow::anyhow!("Failed to find the last live record in the head segment"), )); } Some(offset) => offset, diff --git a/nomt/src/seglog/segment_rw.rs b/nomt/src/seglog/segment_rw.rs index 52670d3d..4ca285fa 100644 --- a/nomt/src/seglog/segment_rw.rs +++ b/nomt/src/seglog/segment_rw.rs @@ -1,4 +1,3 @@ -use anyhow::Result; use std::{ fs::File, io::{BufReader, Read, Seek, SeekFrom, Write}, @@ -25,7 +24,11 @@ impl SegmentFileWriter { Self { file, file_size } } - pub fn write_header(&mut self, payload_length: u32, record_id: RecordId) -> Result<()> { + pub fn write_header( + &mut self, + payload_length: u32, + record_id: RecordId, + ) -> std::io::Result<()> { let mut header = [0u8; HEADER_SIZE as usize]; { let mut header = RecordHeaderMut::new(&mut header); @@ -37,7 +40,7 @@ impl SegmentFileWriter { Ok(()) } - pub fn write_payload(&mut self, payload: &[u8]) -> Result<()> { + pub fn write_payload(&mut self, payload: &[u8]) -> std::io::Result<()> { self.file.write_all(payload)?; // Calculate the next aligned position. let record_alignment = RECORD_ALIGNMENT as u64; @@ -55,7 +58,7 @@ impl SegmentFileWriter { Ok(()) } - pub fn fsync(&mut self) -> Result<()> { + pub fn fsync(&mut self) -> std::io::Result<()> { self.file.sync_data()?; Ok(()) } @@ -85,7 +88,7 @@ pub struct SegmentFileReader { } impl SegmentFileReader { - pub fn new(file: File, file_size: Option) -> Result { + pub fn new(file: File, file_size: Option) -> std::io::Result { let file_size = if let Some(file_size) = file_size { file_size } else { @@ -103,7 +106,7 @@ impl SegmentFileReader { /// Reads the header of the record. /// /// Returns `None` if the end of the file is reached. - pub fn read_header(&mut self) -> Result> { + pub fn read_header(&mut self) -> std::io::Result> { if self.next_pos.unwrap_or(0) >= self.file_size { return Ok(None); } @@ -114,9 +117,13 @@ impl SegmentFileReader { // will read the next header. self.buf_reader .seek(SeekFrom::Current(-(HEADER_SIZE as i64)))?; - return Err(anyhow::anyhow!( - "Record payload length is too large: {}", - header.payload_length() + + return Err(std::io::Error::new( + std::io::ErrorKind::Other, + anyhow::anyhow!( + "Record payload length is too large: {}", + header.payload_length() + ), )); } self.payload_length = Some(header.payload_length()); @@ -134,7 +141,7 @@ impl SegmentFileReader { /// Skips the payload of the record. /// /// Must be called after `read_header`. - pub fn skip_payload(&mut self) -> Result<()> { + pub fn skip_payload(&mut self) -> std::io::Result<()> { self.seek_next()?; Ok(()) } @@ -142,7 +149,7 @@ impl SegmentFileReader { /// Reads the payload of the record. /// /// Must be called after `read_header`. - pub fn read_payload(&mut self, payload_buf: &mut Vec) -> Result<()> { + pub fn read_payload(&mut self, payload_buf: &mut Vec) -> std::io::Result<()> { payload_buf.resize(self.payload_length.unwrap() as usize, 0); self.buf_reader.read_exact(payload_buf)?; self.seek_next()?; @@ -152,14 +159,14 @@ impl SegmentFileReader { /// Positions the reader at the beginning of the next record. /// /// Must be called after `read_header`. - pub fn seek_next(&mut self) -> Result<()> { + pub fn seek_next(&mut self) -> std::io::Result<()> { self.buf_reader .seek(SeekFrom::Start(self.next_pos.unwrap()))?; Ok(()) } /// Returns the current position of the reader. - pub fn pos(&mut self) -> Result { + pub fn pos(&mut self) -> std::io::Result { Ok(self.buf_reader.stream_position()?) } } diff --git a/nomt/src/store/meta.rs b/nomt/src/store/meta.rs index 4b8a71a3..94d125f1 100644 --- a/nomt/src/store/meta.rs +++ b/nomt/src/store/meta.rs @@ -137,13 +137,13 @@ impl Meta { } } - pub fn read(page_pool: &PagePool, fd: &File) -> Result { + pub fn read(page_pool: &PagePool, fd: &File) -> std::io::Result { let page = io::read_page(page_pool, fd, 0)?; let meta = Meta::decode(&page[..META_SIZE]); Ok(meta) } - pub fn write(page_pool: &PagePool, fd: &File, meta: &Meta) -> Result<()> { + pub fn write(page_pool: &PagePool, fd: &File, meta: &Meta) -> std::io::Result<()> { let mut page = page_pool.alloc_fat_page(); meta.encode_to(&mut page.as_mut()[..META_SIZE]); fd.write_all_at(&page[..], 0)?; diff --git a/nomt/src/store/mod.rs b/nomt/src/store/mod.rs index ea302019..62f8e27e 100644 --- a/nomt/src/store/mod.rs +++ b/nomt/src/store/mod.rs @@ -306,7 +306,7 @@ impl Store { self.shared .poisoned .store(true, std::sync::atomic::Ordering::Relaxed); - return Err(e); + anyhow::bail!(e); } Ok(()) } diff --git a/nomt/src/store/sync.rs b/nomt/src/store/sync.rs index d8a19e11..f955c729 100644 --- a/nomt/src/store/sync.rs +++ b/nomt/src/store/sync.rs @@ -37,7 +37,7 @@ impl Sync { rollback: Option, page_cache: PageCache, updated_pages: impl IntoIterator + Send + 'static, - ) -> anyhow::Result<()> { + ) -> std::io::Result<()> { self.sync_seqn += 1; let sync_seqn = self.sync_seqn;