Skip to content

Commit

Permalink
chore: migrate lower-level components to io err
Browse files Browse the repository at this point in the history
This reduces the scope of errors that could be thrown by the components
deliberately.
  • Loading branch information
pepyakin committed Jan 29, 2025
1 parent 594f8e7 commit 859377b
Show file tree
Hide file tree
Showing 9 changed files with 47 additions and 36 deletions.
2 changes: 1 addition & 1 deletion nomt/src/beatree/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -457,7 +457,7 @@ impl SyncController {
/// with updating the manifest.
///
/// This must be called after [`Self::begin_sync`].
pub fn wait_pre_meta(&mut self) -> anyhow::Result<SyncData> {
pub fn wait_pre_meta(&mut self) -> std::io::Result<SyncData> {
self.inner.sync.bbn_fsync.wait()?;
self.inner.sync.ln_fsync.wait()?;

Expand Down
10 changes: 5 additions & 5 deletions nomt/src/bitbox/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,9 +273,9 @@ impl DB {
pub struct SyncController {
db: DB,
/// The channel to send the result of the WAL writeout. Option is to allow `take`.
wal_result_tx: Option<Sender<anyhow::Result<()>>>,
wal_result_tx: Option<Sender<std::io::Result<()>>>,
/// The channel to receive the result of the WAL writeout.
wal_result_rx: Receiver<anyhow::Result<()>>,
wal_result_rx: Receiver<std::io::Result<()>>,
/// The pages along with their page numbers to write out to the HT file.
ht_to_write: Arc<Mutex<Option<Vec<(u64, Arc<FatPage>)>>>>,
}
Expand Down Expand Up @@ -323,7 +323,7 @@ impl SyncController {
});
}

fn spawn_wal_writeout(wal_result_tx: Sender<anyhow::Result<()>>, bitbox: DB) {
fn spawn_wal_writeout(wal_result_tx: Sender<std::io::Result<()>>, bitbox: DB) {
let bitbox = bitbox.clone();
let tp = bitbox.shared.sync_tp.clone();
tp.execute(move || {
Expand All @@ -337,7 +337,7 @@ impl SyncController {
/// Wait for the pre-meta WAL file to be written out.
///
/// Must be invoked by the sync thread. Blocking.
pub fn wait_pre_meta(&self) -> anyhow::Result<()> {
pub fn wait_pre_meta(&self) -> std::io::Result<()> {
match self.wal_result_rx.recv() {
Ok(wal_result) => wal_result,
Err(_) => panic!("unexpected hungup"),
Expand All @@ -348,7 +348,7 @@ impl SyncController {
///
/// Has to be called after the manifest is updated. Must be invoked by the sync
/// thread. Blocking.
pub fn post_meta(&self, io_handle: IoHandle) -> anyhow::Result<()> {
pub fn post_meta(&self, io_handle: IoHandle) -> std::io::Result<()> {
let ht_pages = self.ht_to_write.lock().take().unwrap();
// Writeout the HT pages and truncate the WAL file.
//
Expand Down
6 changes: 3 additions & 3 deletions nomt/src/bitbox/writeout.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use std::{

use crate::io::{FatPage, IoCommand, IoHandle, IoKind};

pub(super) fn write_wal(mut wal_fd: &File, wal_blob: &[u8]) -> anyhow::Result<()> {
pub(super) fn write_wal(mut wal_fd: &File, wal_blob: &[u8]) -> std::io::Result<()> {
wal_fd.set_len(0)?;
wal_fd.seek(SeekFrom::Start(0))?;
wal_fd.write_all(wal_blob)?;
Expand All @@ -25,7 +25,7 @@ pub(super) fn write_wal(mut wal_fd: &File, wal_blob: &[u8]) -> anyhow::Result<()
/// Truncates the WAL file to zero length.
///
/// Conditionally syncs the file to disk.
pub(super) fn truncate_wal(mut wal_fd: &File, do_sync: bool) -> anyhow::Result<()> {
pub(super) fn truncate_wal(mut wal_fd: &File, do_sync: bool) -> std::io::Result<()> {
wal_fd.set_len(0)?;
wal_fd.seek(SeekFrom::Start(0))?;
if do_sync {
Expand All @@ -38,7 +38,7 @@ pub(super) fn write_ht(
io_handle: IoHandle,
ht_fd: &File,
mut ht: Vec<(u64, Arc<FatPage>)>,
) -> anyhow::Result<()> {
) -> std::io::Result<()> {
let mut sent = 0;

ht.sort_unstable_by_key(|item| item.0);
Expand Down
6 changes: 3 additions & 3 deletions nomt/src/rollback/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ impl Rollback {
&self,
new_start_live: Option<u64>,
new_end_live: Option<u64>,
) -> anyhow::Result<()> {
) -> std::io::Result<()> {
if let Some(new_start_live) = new_start_live {
let mut seglog = self.shared.seglog.lock();
seglog.prune_back(new_start_live.into())?;
Expand All @@ -279,7 +279,7 @@ pub struct SyncController {
rollback: Rollback,
wd: Arc<Mutex<Option<WriteoutData>>>,
wd_cv: Arc<Condvar>,
post_meta: Arc<Mutex<Option<anyhow::Result<()>>>>,
post_meta: Arc<Mutex<Option<std::io::Result<()>>>>,
post_meta_cv: Arc<Condvar>,
}

Expand Down Expand Up @@ -343,7 +343,7 @@ impl SyncController {
}

/// Wait until the post-meta writeout completes.
pub fn wait_post_meta(&self) -> anyhow::Result<()> {
pub fn wait_post_meta(&self) -> std::io::Result<()> {
let mut post_meta = self.post_meta.lock();
self.post_meta_cv
.wait_while(&mut post_meta, |post_meta| post_meta.is_none());
Expand Down
18 changes: 11 additions & 7 deletions nomt/src/seglog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ impl SegmentedLog {
/// # Panics
///
/// The new live range must be a subset of the old live range.
pub fn prune_back(&mut self, new_start_live: RecordId) -> Result<()> {
pub fn prune_back(&mut self, new_start_live: RecordId) -> std::io::Result<()> {
if new_start_live.is_nil() {
self.start_live = RecordId::nil();
self.end_live = RecordId::nil();
Expand Down Expand Up @@ -333,7 +333,7 @@ impl SegmentedLog {
///
/// It's possible to return remove all items from the log, i.e. to reset the log to an empty
/// state, by setting the new live end to zero. That would update the live range to `(0, 0)`.
pub fn prune_front(&mut self, new_end_live: RecordId) -> Result<()> {
pub fn prune_front(&mut self, new_end_live: RecordId) -> std::io::Result<()> {
if new_end_live.is_nil() {
self.start_live = RecordId::nil();
self.end_live = RecordId::nil();
Expand Down Expand Up @@ -390,7 +390,7 @@ impl SegmentedLog {
Ok(())
}

fn remove_all_segments(&mut self) -> Result<()> {
fn remove_all_segments(&mut self) -> std::io::Result<()> {
let _ = self.head_segment_writer.take();

for segment in &self.segments {
Expand Down Expand Up @@ -602,7 +602,7 @@ impl Recovery {
}

/// Scans the segment file and returns the file offset of the end of the specified record.
fn scan_record_end(path: &Path, end_live: RecordId) -> Result<Option<u64>> {
fn scan_record_end(path: &Path, end_live: RecordId) -> std::io::Result<Option<u64>> {
let mut seg_reader = SegmentFileReader::new(File::open(path)?, None)?;
loop {
let header = match seg_reader.read_header()? {
Expand All @@ -622,11 +622,15 @@ fn scan_record_end(path: &Path, end_live: RecordId) -> Result<Option<u64>> {
}
}

fn truncate_head_segment(path: &Path, new_end_live: RecordId) -> Result<SegmentFileWriter> {
fn truncate_head_segment(
path: &Path,
new_end_live: RecordId,
) -> std::io::Result<SegmentFileWriter> {
let end = match scan_record_end(path, new_end_live)? {
None => {
return Err(anyhow::anyhow!(
"Failed to find the last live record in the head segment"
return Err(std::io::Error::new(
std::io::ErrorKind::Other,
anyhow::anyhow!("Failed to find the last live record in the head segment"),
));
}
Some(offset) => offset,
Expand Down
33 changes: 20 additions & 13 deletions nomt/src/seglog/segment_rw.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use anyhow::Result;
use std::{
fs::File,
io::{BufReader, Read, Seek, SeekFrom, Write},
Expand All @@ -25,7 +24,11 @@ impl SegmentFileWriter {
Self { file, file_size }
}

pub fn write_header(&mut self, payload_length: u32, record_id: RecordId) -> Result<()> {
pub fn write_header(
&mut self,
payload_length: u32,
record_id: RecordId,
) -> std::io::Result<()> {
let mut header = [0u8; HEADER_SIZE as usize];
{
let mut header = RecordHeaderMut::new(&mut header);
Expand All @@ -37,7 +40,7 @@ impl SegmentFileWriter {
Ok(())
}

pub fn write_payload(&mut self, payload: &[u8]) -> Result<()> {
pub fn write_payload(&mut self, payload: &[u8]) -> std::io::Result<()> {
self.file.write_all(payload)?;
// Calculate the next aligned position.
let record_alignment = RECORD_ALIGNMENT as u64;
Expand All @@ -55,7 +58,7 @@ impl SegmentFileWriter {
Ok(())
}

pub fn fsync(&mut self) -> Result<()> {
pub fn fsync(&mut self) -> std::io::Result<()> {
self.file.sync_data()?;
Ok(())
}
Expand Down Expand Up @@ -85,7 +88,7 @@ pub struct SegmentFileReader {
}

impl SegmentFileReader {
pub fn new(file: File, file_size: Option<u64>) -> Result<Self> {
pub fn new(file: File, file_size: Option<u64>) -> std::io::Result<Self> {
let file_size = if let Some(file_size) = file_size {
file_size
} else {
Expand All @@ -103,7 +106,7 @@ impl SegmentFileReader {
/// Reads the header of the record.
///
/// Returns `None` if the end of the file is reached.
pub fn read_header(&mut self) -> Result<Option<RecordHeader>> {
pub fn read_header(&mut self) -> std::io::Result<Option<RecordHeader>> {
if self.next_pos.unwrap_or(0) >= self.file_size {
return Ok(None);
}
Expand All @@ -114,9 +117,13 @@ impl SegmentFileReader {
// will read the next header.
self.buf_reader
.seek(SeekFrom::Current(-(HEADER_SIZE as i64)))?;
return Err(anyhow::anyhow!(
"Record payload length is too large: {}",
header.payload_length()

return Err(std::io::Error::new(
std::io::ErrorKind::Other,
anyhow::anyhow!(
"Record payload length is too large: {}",
header.payload_length()
),
));
}
self.payload_length = Some(header.payload_length());
Expand All @@ -134,15 +141,15 @@ impl SegmentFileReader {
/// Skips the payload of the record.
///
/// Must be called after `read_header`.
pub fn skip_payload(&mut self) -> Result<()> {
pub fn skip_payload(&mut self) -> std::io::Result<()> {
self.seek_next()?;
Ok(())
}

/// Reads the payload of the record.
///
/// Must be called after `read_header`.
pub fn read_payload(&mut self, payload_buf: &mut Vec<u8>) -> Result<()> {
pub fn read_payload(&mut self, payload_buf: &mut Vec<u8>) -> std::io::Result<()> {
payload_buf.resize(self.payload_length.unwrap() as usize, 0);
self.buf_reader.read_exact(payload_buf)?;
self.seek_next()?;
Expand All @@ -152,14 +159,14 @@ impl SegmentFileReader {
/// Positions the reader at the beginning of the next record.
///
/// Must be called after `read_header`.
pub fn seek_next(&mut self) -> Result<()> {
pub fn seek_next(&mut self) -> std::io::Result<()> {
self.buf_reader
.seek(SeekFrom::Start(self.next_pos.unwrap()))?;
Ok(())
}

/// Returns the current position of the reader.
pub fn pos(&mut self) -> Result<u64> {
pub fn pos(&mut self) -> std::io::Result<u64> {
Ok(self.buf_reader.stream_position()?)
}
}
Expand Down
4 changes: 2 additions & 2 deletions nomt/src/store/meta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,13 +137,13 @@ impl Meta {
}
}

pub fn read(page_pool: &PagePool, fd: &File) -> Result<Self> {
pub fn read(page_pool: &PagePool, fd: &File) -> std::io::Result<Self> {
let page = io::read_page(page_pool, fd, 0)?;
let meta = Meta::decode(&page[..META_SIZE]);
Ok(meta)
}

pub fn write(page_pool: &PagePool, fd: &File, meta: &Meta) -> Result<()> {
pub fn write(page_pool: &PagePool, fd: &File, meta: &Meta) -> std::io::Result<()> {
let mut page = page_pool.alloc_fat_page();
meta.encode_to(&mut page.as_mut()[..META_SIZE]);
fd.write_all_at(&page[..], 0)?;
Expand Down
2 changes: 1 addition & 1 deletion nomt/src/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ impl Store {
self.shared
.poisoned
.store(true, std::sync::atomic::Ordering::Relaxed);
return Err(e);
anyhow::bail!(e);
}
Ok(())
}
Expand Down
2 changes: 1 addition & 1 deletion nomt/src/store/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ impl Sync {
rollback: Option<rollback::Rollback>,
page_cache: PageCache,
updated_pages: impl IntoIterator<Item = (PageId, DirtyPage)> + Send + 'static,
) -> anyhow::Result<()> {
) -> std::io::Result<()> {
self.sync_seqn += 1;
let sync_seqn = self.sync_seqn;

Expand Down

0 comments on commit 859377b

Please sign in to comment.