diff --git a/nomt/src/lib.rs b/nomt/src/lib.rs index 6b62a782..100cdcea 100644 --- a/nomt/src/lib.rs +++ b/nomt/src/lib.rs @@ -16,7 +16,7 @@ use nomt_core::{ }; use overlay::{LiveOverlay, OverlayMarker}; use page_cache::PageCache; -use parking_lot::Mutex; +use parking_lot::{ArcRwLockReadGuard, Mutex, RwLock}; use store::{Store, ValueTransaction}; // CARGO HACK: silence lint; this is used in integration tests @@ -249,6 +249,8 @@ pub struct Nomt { page_pool: PagePool, store: Store, shared: Arc>, + /// Used to protect the multiple-readers-one-writer API + access_lock: Arc>, metrics: Metrics, _marker: std::marker::PhantomData, } @@ -280,6 +282,7 @@ impl Nomt { root: Root(root), last_commit_marker: None, })), + access_lock: Arc::new(RwLock::new(())), metrics, _marker: std::marker::PhantomData, }) @@ -302,6 +305,7 @@ impl Nomt { /// This is used for testing for now. #[doc(hidden)] pub fn read(&self, path: KeyPath) -> anyhow::Result> { + let _guard = self.access_lock.read(); self.store.load_value(path) } @@ -326,6 +330,9 @@ impl Nomt { /// Create a new [`Session`] object with the given parameters. /// + /// This will block if there are any ongoing commits or rollbacks. Multiple sessions may + /// coexist. + /// /// The [`Session`] is a read-only handle on the database and is used to create a changeset to /// be applied to the database. Sessions provide read interfaces and additionally coordinate /// work such as proving and rollback preimages which make committing more efficient. @@ -345,6 +352,7 @@ impl Nomt { } else { None }; + Session { store, merkle_updater: self.merkle_update_pool.begin::( @@ -360,17 +368,26 @@ impl Nomt { rollback_delta, overlay: live_overlay, witness_mode: params.witness, + access_guard: params + .take_global_guard + .then(|| RwLock::read_arc(&self.access_lock)), _marker: std::marker::PhantomData, } } /// Perform a rollback of the last `n` commits. /// - /// This function assumes no sessions are active and panics otherwise. + /// This function will block until all ongoing commits or [`Session`]s are finished. + /// + /// Fails if the DB is not configured for rollback or doesn't have enough commits logged to + /// rollback. pub fn rollback(&self, n: usize) -> anyhow::Result<()> { if n == 0 { return Ok(()); } + + let _write_guard = self.access_lock.write(); + let Some(rollback) = self.store.rollback() else { anyhow::bail!("rollback: not enabled"); }; @@ -381,9 +398,11 @@ impl Nomt { // Begin a new session. We do not allow rollback for this operation because that would // interfere with the rollback log: if another rollback were to be issued, it must rollback // the changes in the rollback log and not the changes performed by the current rollback. - // UNWRAP: `None` ancestors are always valid. let mut session_params = SessionParams::default(); session_params.record_rollback_delta = false; + + // We hold a write guard and don't need the session to take any other. + session_params.take_global_guard = false; let sess = self.begin_session(session_params); // Convert the traceback into a series of write commands. @@ -429,7 +448,11 @@ impl WitnessMode { /// Parameters for instantiating a session. pub struct SessionParams { + // INTERNAL: only false during rollback. determines whether the rollback delta is built record_rollback_delta: bool, + // INTERNAL: only false during rollback. determines whether a global read lock is taken + take_global_guard: bool, + witness: WitnessMode, overlay: LiveOverlay, } @@ -438,6 +461,7 @@ impl Default for SessionParams { fn default() -> Self { SessionParams { record_rollback_delta: true, + take_global_guard: true, witness: WitnessMode::disabled(), // UNWRAP: empty live overlay always valid. overlay: LiveOverlay::new(None).unwrap(), @@ -488,6 +512,7 @@ pub struct Session { rollback_delta: Option, overlay: LiveOverlay, witness_mode: WitnessMode, + access_guard: Option>, _marker: std::marker::PhantomData, } @@ -572,7 +597,6 @@ impl Session { compact_actuals.push((path.clone(), read_write.to_compact::())); } - // UNWRAP: merkle_updater always `Some` during lifecycle. let merkle_update_handle = self .merkle_updater .update_and_prove::(compact_actuals, self.witness_mode.0); @@ -589,8 +613,8 @@ impl Session { value_transaction: tx, merkle_output, rollback_delta, - // UNWRAP: session overlay is always `Some` during lifecycle. parent_overlay: self.overlay, + take_global_guard: self.access_guard.is_some(), } } } @@ -607,6 +631,8 @@ pub struct FinishedSession { merkle_output: merkle::Output, rollback_delta: Option, parent_overlay: LiveOverlay, + // INTERNAL: whether to take a write guard while committing. always true except during rollback. + take_global_guard: bool, } impl FinishedSession { @@ -643,12 +669,16 @@ impl FinishedSession { /// Commit this session to disk directly. /// + /// This function will block until all ongoing sessions and commits have finished. + /// /// This will return an error if I/O fails or if the changeset is no longer valid. /// The changeset may be invalidated if another competing session, overlay, or rollback was /// committed. pub fn commit(self, nomt: &Nomt) -> Result<(), anyhow::Error> { // TODO: do a prev_root check or something to ensure continuity? + let _write_guard = self.take_global_guard.then(|| nomt.access_lock.write()); + { let mut shared = nomt.shared.lock(); shared.root = Root(self.merkle_output.root); @@ -674,7 +704,7 @@ impl FinishedSession { impl Overlay { /// Commit the changes from this overlay to the underlying database. /// - /// This assumes no sessions are active and will panic otherwise. + /// This function will block until all ongoing sessions and commits have finished. /// /// This will return an error if I/O fails or if the changeset is no longer valid, or if the /// overlay has an uncommitted parent. An overlay may be invalidated by a competing commit or @@ -697,6 +727,8 @@ impl Overlay { .collect(); let rollback_delta = self.rollback_delta().map(|delta| delta.clone()); + let _write_guard = nomt.access_lock.write(); + let marker = self.mark_committed(); {