Skip to content

Commit 8318629

Browse files
committed
chore: migrate lower-level components to io err
This reduces the scope of errors that could be thrown by the components deliberately.
1 parent 594f8e7 commit 8318629

File tree

9 files changed

+47
-36
lines changed

9 files changed

+47
-36
lines changed

nomt/src/beatree/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -457,7 +457,7 @@ impl SyncController {
457457
/// with updating the manifest.
458458
///
459459
/// This must be called after [`Self::begin_sync`].
460-
pub fn wait_pre_meta(&mut self) -> anyhow::Result<SyncData> {
460+
pub fn wait_pre_meta(&mut self) -> std::io::Result<SyncData> {
461461
self.inner.sync.bbn_fsync.wait()?;
462462
self.inner.sync.ln_fsync.wait()?;
463463

nomt/src/bitbox/mod.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -273,9 +273,9 @@ impl DB {
273273
pub struct SyncController {
274274
db: DB,
275275
/// The channel to send the result of the WAL writeout. Option is to allow `take`.
276-
wal_result_tx: Option<Sender<anyhow::Result<()>>>,
276+
wal_result_tx: Option<Sender<std::io::Result<()>>>,
277277
/// The channel to receive the result of the WAL writeout.
278-
wal_result_rx: Receiver<anyhow::Result<()>>,
278+
wal_result_rx: Receiver<std::io::Result<()>>,
279279
/// The pages along with their page numbers to write out to the HT file.
280280
ht_to_write: Arc<Mutex<Option<Vec<(u64, Arc<FatPage>)>>>>,
281281
}
@@ -323,7 +323,7 @@ impl SyncController {
323323
});
324324
}
325325

326-
fn spawn_wal_writeout(wal_result_tx: Sender<anyhow::Result<()>>, bitbox: DB) {
326+
fn spawn_wal_writeout(wal_result_tx: Sender<std::io::Result<()>>, bitbox: DB) {
327327
let bitbox = bitbox.clone();
328328
let tp = bitbox.shared.sync_tp.clone();
329329
tp.execute(move || {
@@ -337,7 +337,7 @@ impl SyncController {
337337
/// Wait for the pre-meta WAL file to be written out.
338338
///
339339
/// Must be invoked by the sync thread. Blocking.
340-
pub fn wait_pre_meta(&self) -> anyhow::Result<()> {
340+
pub fn wait_pre_meta(&self) -> std::io::Result<()> {
341341
match self.wal_result_rx.recv() {
342342
Ok(wal_result) => wal_result,
343343
Err(_) => panic!("unexpected hungup"),
@@ -348,7 +348,7 @@ impl SyncController {
348348
///
349349
/// Has to be called after the manifest is updated. Must be invoked by the sync
350350
/// thread. Blocking.
351-
pub fn post_meta(&self, io_handle: IoHandle) -> anyhow::Result<()> {
351+
pub fn post_meta(&self, io_handle: IoHandle) -> std::io::Result<()> {
352352
let ht_pages = self.ht_to_write.lock().take().unwrap();
353353
// Writeout the HT pages and truncate the WAL file.
354354
//

nomt/src/bitbox/writeout.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ use std::{
1414

1515
use crate::io::{FatPage, IoCommand, IoHandle, IoKind};
1616

17-
pub(super) fn write_wal(mut wal_fd: &File, wal_blob: &[u8]) -> anyhow::Result<()> {
17+
pub(super) fn write_wal(mut wal_fd: &File, wal_blob: &[u8]) -> std::io::Result<()> {
1818
wal_fd.set_len(0)?;
1919
wal_fd.seek(SeekFrom::Start(0))?;
2020
wal_fd.write_all(wal_blob)?;
@@ -25,7 +25,7 @@ pub(super) fn write_wal(mut wal_fd: &File, wal_blob: &[u8]) -> anyhow::Result<()
2525
/// Truncates the WAL file to zero length.
2626
///
2727
/// Conditionally syncs the file to disk.
28-
pub(super) fn truncate_wal(mut wal_fd: &File, do_sync: bool) -> anyhow::Result<()> {
28+
pub(super) fn truncate_wal(mut wal_fd: &File, do_sync: bool) -> std::io::Result<()> {
2929
wal_fd.set_len(0)?;
3030
wal_fd.seek(SeekFrom::Start(0))?;
3131
if do_sync {
@@ -38,7 +38,7 @@ pub(super) fn write_ht(
3838
io_handle: IoHandle,
3939
ht_fd: &File,
4040
mut ht: Vec<(u64, Arc<FatPage>)>,
41-
) -> anyhow::Result<()> {
41+
) -> std::io::Result<()> {
4242
let mut sent = 0;
4343

4444
ht.sort_unstable_by_key(|item| item.0);

nomt/src/rollback/mod.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -262,7 +262,7 @@ impl Rollback {
262262
&self,
263263
new_start_live: Option<u64>,
264264
new_end_live: Option<u64>,
265-
) -> anyhow::Result<()> {
265+
) -> std::io::Result<()> {
266266
if let Some(new_start_live) = new_start_live {
267267
let mut seglog = self.shared.seglog.lock();
268268
seglog.prune_back(new_start_live.into())?;
@@ -279,7 +279,7 @@ pub struct SyncController {
279279
rollback: Rollback,
280280
wd: Arc<Mutex<Option<WriteoutData>>>,
281281
wd_cv: Arc<Condvar>,
282-
post_meta: Arc<Mutex<Option<anyhow::Result<()>>>>,
282+
post_meta: Arc<Mutex<Option<std::io::Result<()>>>>,
283283
post_meta_cv: Arc<Condvar>,
284284
}
285285

@@ -343,7 +343,7 @@ impl SyncController {
343343
}
344344

345345
/// Wait until the post-meta writeout completes.
346-
pub fn wait_post_meta(&self) -> anyhow::Result<()> {
346+
pub fn wait_post_meta(&self) -> std::io::Result<()> {
347347
let mut post_meta = self.post_meta.lock();
348348
self.post_meta_cv
349349
.wait_while(&mut post_meta, |post_meta| post_meta.is_none());

nomt/src/seglog/mod.rs

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -277,7 +277,7 @@ impl SegmentedLog {
277277
/// # Panics
278278
///
279279
/// The new live range must be a subset of the old live range.
280-
pub fn prune_back(&mut self, new_start_live: RecordId) -> Result<()> {
280+
pub fn prune_back(&mut self, new_start_live: RecordId) -> std::io::Result<()> {
281281
if new_start_live.is_nil() {
282282
self.start_live = RecordId::nil();
283283
self.end_live = RecordId::nil();
@@ -333,7 +333,7 @@ impl SegmentedLog {
333333
///
334334
/// It's possible to return remove all items from the log, i.e. to reset the log to an empty
335335
/// state, by setting the new live end to zero. That would update the live range to `(0, 0)`.
336-
pub fn prune_front(&mut self, new_end_live: RecordId) -> Result<()> {
336+
pub fn prune_front(&mut self, new_end_live: RecordId) -> std::io::Result<()> {
337337
if new_end_live.is_nil() {
338338
self.start_live = RecordId::nil();
339339
self.end_live = RecordId::nil();
@@ -390,7 +390,7 @@ impl SegmentedLog {
390390
Ok(())
391391
}
392392

393-
fn remove_all_segments(&mut self) -> Result<()> {
393+
fn remove_all_segments(&mut self) -> std::io::Result<()> {
394394
let _ = self.head_segment_writer.take();
395395

396396
for segment in &self.segments {
@@ -602,7 +602,7 @@ impl Recovery {
602602
}
603603

604604
/// Scans the segment file and returns the file offset of the end of the specified record.
605-
fn scan_record_end(path: &Path, end_live: RecordId) -> Result<Option<u64>> {
605+
fn scan_record_end(path: &Path, end_live: RecordId) -> std::io::Result<Option<u64>> {
606606
let mut seg_reader = SegmentFileReader::new(File::open(path)?, None)?;
607607
loop {
608608
let header = match seg_reader.read_header()? {
@@ -622,11 +622,15 @@ fn scan_record_end(path: &Path, end_live: RecordId) -> Result<Option<u64>> {
622622
}
623623
}
624624

625-
fn truncate_head_segment(path: &Path, new_end_live: RecordId) -> Result<SegmentFileWriter> {
625+
fn truncate_head_segment(
626+
path: &Path,
627+
new_end_live: RecordId,
628+
) -> std::io::Result<SegmentFileWriter> {
626629
let end = match scan_record_end(path, new_end_live)? {
627630
None => {
628-
return Err(anyhow::anyhow!(
629-
"Failed to find the last live record in the head segment"
631+
return Err(std::io::Error::new(
632+
std::io::ErrorKind::Other,
633+
anyhow::anyhow!("Failed to find the last live record in the head segment"),
630634
));
631635
}
632636
Some(offset) => offset,

nomt/src/seglog/segment_rw.rs

Lines changed: 20 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
use anyhow::Result;
21
use std::{
32
fs::File,
43
io::{BufReader, Read, Seek, SeekFrom, Write},
@@ -25,7 +24,11 @@ impl SegmentFileWriter {
2524
Self { file, file_size }
2625
}
2726

28-
pub fn write_header(&mut self, payload_length: u32, record_id: RecordId) -> Result<()> {
27+
pub fn write_header(
28+
&mut self,
29+
payload_length: u32,
30+
record_id: RecordId,
31+
) -> std::io::Result<()> {
2932
let mut header = [0u8; HEADER_SIZE as usize];
3033
{
3134
let mut header = RecordHeaderMut::new(&mut header);
@@ -37,7 +40,7 @@ impl SegmentFileWriter {
3740
Ok(())
3841
}
3942

40-
pub fn write_payload(&mut self, payload: &[u8]) -> Result<()> {
43+
pub fn write_payload(&mut self, payload: &[u8]) -> std::io::Result<()> {
4144
self.file.write_all(payload)?;
4245
// Calculate the next aligned position.
4346
let record_alignment = RECORD_ALIGNMENT as u64;
@@ -55,7 +58,7 @@ impl SegmentFileWriter {
5558
Ok(())
5659
}
5760

58-
pub fn fsync(&mut self) -> Result<()> {
61+
pub fn fsync(&mut self) -> std::io::Result<()> {
5962
self.file.sync_data()?;
6063
Ok(())
6164
}
@@ -85,7 +88,7 @@ pub struct SegmentFileReader {
8588
}
8689

8790
impl SegmentFileReader {
88-
pub fn new(file: File, file_size: Option<u64>) -> Result<Self> {
91+
pub fn new(file: File, file_size: Option<u64>) -> std::io::Result<Self> {
8992
let file_size = if let Some(file_size) = file_size {
9093
file_size
9194
} else {
@@ -103,7 +106,7 @@ impl SegmentFileReader {
103106
/// Reads the header of the record.
104107
///
105108
/// Returns `None` if the end of the file is reached.
106-
pub fn read_header(&mut self) -> Result<Option<RecordHeader>> {
109+
pub fn read_header(&mut self) -> std::io::Result<Option<RecordHeader>> {
107110
if self.next_pos.unwrap_or(0) >= self.file_size {
108111
return Ok(None);
109112
}
@@ -114,9 +117,13 @@ impl SegmentFileReader {
114117
// will read the next header.
115118
self.buf_reader
116119
.seek(SeekFrom::Current(-(HEADER_SIZE as i64)))?;
117-
return Err(anyhow::anyhow!(
118-
"Record payload length is too large: {}",
119-
header.payload_length()
120+
121+
return Err(std::io::Error::new(
122+
std::io::ErrorKind::Other,
123+
anyhow::anyhow!(
124+
"Record payload length is too large: {}",
125+
header.payload_length()
126+
),
120127
));
121128
}
122129
self.payload_length = Some(header.payload_length());
@@ -134,15 +141,15 @@ impl SegmentFileReader {
134141
/// Skips the payload of the record.
135142
///
136143
/// Must be called after `read_header`.
137-
pub fn skip_payload(&mut self) -> Result<()> {
144+
pub fn skip_payload(&mut self) -> std::io::Result<()> {
138145
self.seek_next()?;
139146
Ok(())
140147
}
141148

142149
/// Reads the payload of the record.
143150
///
144151
/// Must be called after `read_header`.
145-
pub fn read_payload(&mut self, payload_buf: &mut Vec<u8>) -> Result<()> {
152+
pub fn read_payload(&mut self, payload_buf: &mut Vec<u8>) -> std::io::Result<()> {
146153
payload_buf.resize(self.payload_length.unwrap() as usize, 0);
147154
self.buf_reader.read_exact(payload_buf)?;
148155
self.seek_next()?;
@@ -152,14 +159,14 @@ impl SegmentFileReader {
152159
/// Positions the reader at the beginning of the next record.
153160
///
154161
/// Must be called after `read_header`.
155-
pub fn seek_next(&mut self) -> Result<()> {
162+
pub fn seek_next(&mut self) -> std::io::Result<()> {
156163
self.buf_reader
157164
.seek(SeekFrom::Start(self.next_pos.unwrap()))?;
158165
Ok(())
159166
}
160167

161168
/// Returns the current position of the reader.
162-
pub fn pos(&mut self) -> Result<u64> {
169+
pub fn pos(&mut self) -> std::io::Result<u64> {
163170
Ok(self.buf_reader.stream_position()?)
164171
}
165172
}

nomt/src/store/meta.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -137,13 +137,13 @@ impl Meta {
137137
}
138138
}
139139

140-
pub fn read(page_pool: &PagePool, fd: &File) -> Result<Self> {
140+
pub fn read(page_pool: &PagePool, fd: &File) -> std::io::Result<Self> {
141141
let page = io::read_page(page_pool, fd, 0)?;
142142
let meta = Meta::decode(&page[..META_SIZE]);
143143
Ok(meta)
144144
}
145145

146-
pub fn write(page_pool: &PagePool, fd: &File, meta: &Meta) -> Result<()> {
146+
pub fn write(page_pool: &PagePool, fd: &File, meta: &Meta) -> std::io::Result<()> {
147147
let mut page = page_pool.alloc_fat_page();
148148
meta.encode_to(&mut page.as_mut()[..META_SIZE]);
149149
fd.write_all_at(&page[..], 0)?;

nomt/src/store/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -306,7 +306,7 @@ impl Store {
306306
self.shared
307307
.poisoned
308308
.store(true, std::sync::atomic::Ordering::Relaxed);
309-
return Err(e);
309+
anyhow::bail!(e);
310310
}
311311
Ok(())
312312
}

nomt/src/store/sync.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ impl Sync {
3737
rollback: Option<rollback::Rollback>,
3838
page_cache: PageCache,
3939
updated_pages: impl IntoIterator<Item = (PageId, DirtyPage)> + Send + 'static,
40-
) -> anyhow::Result<()> {
40+
) -> std::io::Result<()> {
4141
self.sync_seqn += 1;
4242
let sync_seqn = self.sync_seqn;
4343

0 commit comments

Comments
 (0)