
Commit dda4199

gabriele-0201 authored and pepyakin committed
feat: bitbox sync properly propagates errors
1 parent a8e9a7b commit dda4199

File tree

1 file changed: +45 −46 lines


nomt/src/bitbox/mod.rs

Lines changed: 45 additions & 46 deletions
@@ -17,6 +17,7 @@ use crate::{
     io::{self, page_pool::FatPage, IoCommand, IoHandle, IoKind, PagePool, PAGE_SIZE},
     page_cache::{Page, PageCache},
     store::{BucketInfo, DirtyPage},
+    task::{join_task, spawn_task, TaskResult},
 };
 
 use self::{ht_file::HTOffsets, meta_map::MetaMap};
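The commit leans on three helpers from `crate::task` (`spawn_task`, `join_task`, and `TaskResult`) whose definitions are not part of this diff. Based only on how they are called below, and assuming `sync_tp` is a `threadpool::ThreadPool` (the removed code calls `.execute` on it) with crossbeam channels carrying one result per task, a compatible sketch might look like this; the real module may differ.

```rust
// Hypothetical sketch of the `crate::task` helpers used in this commit; only
// their call sites appear in the diff, so this is an assumption, not NOMT's code.
use std::panic::{catch_unwind, AssertUnwindSafe};

use crossbeam_channel::{Receiver, Sender};
use threadpool::ThreadPool;

/// The outcome of a task: its return value, or the payload of a panic.
pub type TaskResult<T> = std::thread::Result<T>;

/// Run `task` on the pool and ship its outcome (including a panic) over `tx`.
pub fn spawn_task<T, F>(tp: &ThreadPool, task: F, tx: Sender<TaskResult<T>>)
where
    T: Send + 'static,
    F: FnOnce() -> T + Send + 'static,
{
    tp.execute(move || {
        let result = catch_unwind(AssertUnwindSafe(task));
        // The receiver may be gone during shutdown; losing the result is fine then.
        let _ = tx.send(result);
    });
}

/// Block until the task finishes, resuming its panic on the joining thread.
pub fn join_task<T>(rx: &Receiver<TaskResult<T>>) -> T {
    match rx.recv() {
        Ok(Ok(value)) => value,
        Ok(Err(panic_payload)) => std::panic::resume_unwind(panic_payload),
        Err(_) => panic!("task dropped its result channel unexpectedly"),
    }
}
```

Under this reading, `join_task(&rx)?` in `wait_pre_meta` below returns the task's own `Result`, and `?` then lifts its error into `anyhow::Error`.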
@@ -29,26 +30,18 @@ mod meta_map;
 mod wal;
 pub(crate) mod writeout;
 
-/// An error that can happen during bitbox's sync.
-#[derive(Debug)]
-pub enum SyncError {
-    /// During assigning a bucket to a page, the allocator gave up, meaning that the occupancy rate
-    /// is too high.
-    BucketExhaustion,
-    /// An error occurred while writing to the WAL file.
-    WalWrite(std::io::Error),
-}
+/// During assigning a bucket to a page, the allocator gave up, meaning that the occupancy rate
+/// is too high.
+#[derive(fmt::Debug)]
+pub struct BucketExhaustion;
 
-impl fmt::Display for SyncError {
+impl fmt::Display for BucketExhaustion {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-        match self {
-            SyncError::BucketExhaustion => write!(f, "bucket exhaustion"),
-            SyncError::WalWrite(e) => write!(f, "wal write error: {}", e),
-        }
+        write!(f, "bucket exhaustion")
     }
 }
 
-impl std::error::Error for SyncError {}
+impl std::error::Error for BucketExhaustion {}
 
 /// The index of a bucket within the map.
 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
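The two-variant `SyncError` enum disappears because its `WalWrite` case is now carried as a plain `std::io::Result<()>` through the WAL task's channel, leaving bucket exhaustion as the only bespoke error. A small illustration (not part of the commit, assuming the `BucketExhaustion` type above and the `anyhow` crate) of why a unit struct implementing `std::error::Error` is enough:

```rust
// Illustration only: both error types convert into `anyhow::Error` with `?`,
// which is what `wait_pre_meta` relies on further down.
fn allocate(bucket: Option<u64>) -> Result<u64, BucketExhaustion> {
    bucket.ok_or(BucketExhaustion)
}

fn sync_step() -> anyhow::Result<()> {
    let _bucket = allocate(Some(7))?; // BucketExhaustion -> anyhow::Error via `?`
    std::fs::metadata(".")?;          // std::io::Error   -> anyhow::Error via `?`
    Ok(())
}
```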
@@ -189,7 +182,7 @@ impl DB {
             Vec<(u64, Arc<FatPage>)>,
             Vec<(PageId, Option<(Page, BucketIndex)>)>,
         ),
-        SyncError,
+        BucketExhaustion,
     > {
         wal_blob_builder.reset(sync_seqn);
 
@@ -224,15 +217,15 @@ impl DB {
                 BucketInfo::Known(bucket) => (false, bucket),
                 BucketInfo::FreshWithNoDependents => {
                     let bucket = allocate_bucket(&page_id, &mut meta_map, &self.shared.seed)
-                        .ok_or(SyncError::BucketExhaustion)?;
+                        .ok_or(BucketExhaustion)?;
                     (true, bucket)
                 }
                 BucketInfo::FreshOrDependent(maybe_bucket) => match maybe_bucket.get() {
                     Some(bucket) => (false, bucket),
                     None => {
                         let bucket =
                             allocate_bucket(&page_id, &mut meta_map, &self.shared.seed)
-                                .ok_or(SyncError::BucketExhaustion)?;
+                                .ok_or(BucketExhaustion)?;
                         // Propagate changes to dependents.
                         maybe_bucket.set(bucket);
                         (true, bucket)
@@ -302,20 +295,27 @@ impl DB {
 pub struct SyncController {
     db: DB,
     /// The channel to send the result of the pre-meta sync errors. Option is to allow `take`.
-    pre_meta_result_tx: Option<Sender<Result<(), SyncError>>>,
+    pre_meta_result_tx: Option<Sender<TaskResult<std::io::Result<()>>>>,
     /// he channel to receive the result of the pre-meta sync errors.
-    pre_meta_result_rx: Receiver<Result<(), SyncError>>,
+    pre_meta_result_rx: Receiver<TaskResult<std::io::Result<()>>>,
+    /// The channel to send the result of the begin_sync task. Option is to allow `take`.
+    begin_sync_result_tx: Option<Sender<TaskResult<Result<(), BucketExhaustion>>>>,
+    /// The channel to receive the result of the the begin_sync task.
+    begin_sync_result_rx: Receiver<TaskResult<Result<(), BucketExhaustion>>>,
     /// The pages along with their page numbers to write out to the HT file.
     ht_to_write: Arc<Mutex<Option<Vec<(u64, Arc<FatPage>)>>>>,
 }
 
 impl SyncController {
     fn new(db: DB) -> Self {
         let (pre_meta_result_tx, pre_meta_result_rx) = crossbeam_channel::bounded(1);
+        let (begin_sync_result_tx, begin_sync_result_rx) = crossbeam_channel::bounded(1);
         Self {
             db,
             pre_meta_result_tx: Some(pre_meta_result_tx),
             pre_meta_result_rx,
+            begin_sync_result_tx: Some(begin_sync_result_tx),
+            begin_sync_result_rx,
             ht_to_write: Arc::new(Mutex::new(None)),
         }
     }
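`SyncController` now owns one result channel per background task. Each is a `crossbeam_channel::bounded(1)` pair, and the sender side sits in an `Option` so it can be moved out with `take()` exactly once when the task is spawned. A standalone sketch of that one-shot pattern (illustrative names, not the crate's API):

```rust
// One-shot result channel: a bounded(1) channel holds exactly one outcome, and
// the `Option<Sender>` mirrors the "UNWRAP: safe because called only once"
// comments in the diff.
use crossbeam_channel::{bounded, Receiver, Sender};

struct OneShot<T> {
    tx: Option<Sender<T>>,
    rx: Receiver<T>,
}

impl<T> OneShot<T> {
    fn new() -> Self {
        let (tx, rx) = bounded(1);
        Self { tx: Some(tx), rx }
    }

    /// Take the sender; panics if taken twice.
    fn sender(&mut self) -> Sender<T> {
        self.tx.take().expect("sender already taken")
    }
}

fn main() {
    let mut slot = OneShot::<u32>::new();
    let tx = slot.sender();
    std::thread::spawn(move || {
        // The spawned task reports its single result here.
        let _ = tx.send(42);
    });
    assert_eq!(slot.rx.recv().unwrap(), 42);
}
```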
@@ -335,25 +335,17 @@ impl SyncController {
         let wal_blob_builder = self.db.shared.wal_blob_builder.clone();
         // UNWRAP: safe because begin_sync is called only once.
         let pre_meta_result_tx = self.pre_meta_result_tx.take().unwrap();
-        self.db.shared.sync_tp.execute(move || {
+        let begin_sync_task = move || {
             let mut wal_blob_builder = wal_blob_builder.lock();
-            let (ht_pages, cache_updates) = match bitbox.prepare_sync(
+
+            // if fails The sync coordinator will poison the database and all further commits will
+            // be rejected. Therefore, there is no need to perform cleanup.
+            let (ht_pages, cache_updates) = bitbox.prepare_sync(
                 sync_seqn,
                 &page_pool,
                 updated_pages,
                 &mut *wal_blob_builder,
-            ) {
-                Ok(v) => v,
-                Err(SyncError::BucketExhaustion) => {
-                    // Bail the commit.
-                    //
-                    // The sync coordinator will poison the database and all further commits will
-                    // be rejected. Therefore, there is no need to perform cleanup.
-                    let _ = pre_meta_result_tx.send(Err(SyncError::BucketExhaustion));
-                    return;
-                }
-                Err(SyncError::WalWrite(_)) => unreachable!(),
-            };
+            )?;
             drop(wal_blob_builder);
 
             // Set the hash-table pages before spawning WAL writeout so they don't race with it.
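Because the spawned work is now an ordinary closure whose return type is inferred from the trailing `Ok(())` added in the next hunk, `prepare_sync(...)?` can bail early and the error simply becomes the task's result, replacing the inline `match` and early `return`. A minimal illustration (not the crate's code):

```rust
// Hypothetical names; shows only the fallible-closure pattern the diff adopts.
#[derive(Debug)]
struct BucketExhaustion;

fn prepare(ok: bool) -> Result<u32, BucketExhaustion> {
    if ok { Ok(1) } else { Err(BucketExhaustion) }
}

fn main() {
    let task = move || -> Result<(), BucketExhaustion> {
        let _pages = prepare(false)?; // bails here; the Err becomes the task's result
        Ok(())
    };
    assert!(task().is_err());
}
```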
@@ -364,31 +356,38 @@ impl SyncController {
             // evict and drop old pages outside of the critical path.
             page_cache.batch_update(cache_updates);
             page_cache.evict();
-        });
+            Ok(())
+        };
+        // UNWRAP: safe because begin_sync is called only once.
+        let begin_sync_result_tx = self.begin_sync_result_tx.take().unwrap();
+        spawn_task(
+            &self.db.shared.sync_tp,
+            begin_sync_task,
+            begin_sync_result_tx,
+        );
     }
 
-    fn spawn_wal_writeout(pre_meta_result_tx: Sender<Result<(), SyncError>>, bitbox: DB) {
+    fn spawn_wal_writeout(pre_meta_result_tx: Sender<TaskResult<std::io::Result<()>>>, bitbox: DB) {
         let bitbox = bitbox.clone();
         let tp = bitbox.shared.sync_tp.clone();
-        tp.execute(move || {
+        let wal_writeout_task = move || {
             let wal_blob_builder = bitbox.shared.wal_blob_builder.lock();
             let wal_slice = wal_blob_builder.as_slice();
-            let wal_result =
-                writeout::write_wal(&bitbox.shared.wal_fd, wal_slice).map_err(SyncError::WalWrite);
-            let _ = pre_meta_result_tx.send(wal_result);
-        });
+            writeout::write_wal(&bitbox.shared.wal_fd, wal_slice)
+        };
+
+        spawn_task(&tp, wal_writeout_task, pre_meta_result_tx);
     }
 
     /// Wait for the pre-meta operations to complete.
     ///
     /// This includes WAL file to be written out.
     ///
     /// Must be invoked by the sync thread. Blocking.
-    pub fn wait_pre_meta(&self) -> Result<(), SyncError> {
-        match self.pre_meta_result_rx.recv() {
-            Ok(wal_result) => wal_result,
-            Err(_) => panic!("unexpected hungup"),
-        }
+    pub fn wait_pre_meta(&self) -> anyhow::Result<()> {
+        join_task(&self.begin_sync_result_rx)?;
+        join_task(&self.pre_meta_result_rx)?;
+        Ok(())
     }
 
     /// Write out the HT pages and truncate the WAL file.
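With `wait_pre_meta` returning `anyhow::Result<()>`, both failure modes, bucket exhaustion from `prepare_sync` and an I/O error from the WAL writeout, reach the caller as `anyhow::Error`, and the concrete type is still recoverable by downcasting. A hypothetical caller, for illustration only (the real sync coordinator lives elsewhere in NOMT):

```rust
use anyhow::Context as _;

// `run_sync` is an illustrative name, not the crate's API.
fn run_sync(controller: &SyncController) -> anyhow::Result<()> {
    match controller.wait_pre_meta() {
        Ok(()) => Ok(()),
        Err(err) if err.is::<BucketExhaustion>() => {
            // Occupancy is too high; the coordinator is expected to poison the
            // database, so no cleanup is attempted here.
            Err(err.context("bitbox bucket exhaustion during sync"))
        }
        Err(err) => Err(err.context("bitbox pre-meta sync failed")),
    }
}
```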
