Skip to content

Commit

Permalink
fix: read/write locking on NOMT session/commit APIs
Browse files Browse the repository at this point in the history
  • Loading branch information
rphmeier committed Feb 3, 2025
1 parent 6ff0f15 commit b667d00
Showing 1 changed file with 38 additions and 6 deletions.
44 changes: 38 additions & 6 deletions nomt/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use nomt_core::{
};
use overlay::{LiveOverlay, OverlayMarker};
use page_cache::PageCache;
use parking_lot::Mutex;
use parking_lot::{ArcRwLockReadGuard, Mutex, RwLock};
use store::{Store, ValueTransaction};

// CARGO HACK: silence lint; this is used in integration tests
Expand Down Expand Up @@ -249,6 +249,8 @@ pub struct Nomt<T: HashAlgorithm> {
page_pool: PagePool,
store: Store,
shared: Arc<Mutex<Shared>>,
/// Used to protect the multiple-readers-one-writer API
access_lock: Arc<RwLock<()>>,
metrics: Metrics,
_marker: std::marker::PhantomData<T>,
}
Expand Down Expand Up @@ -280,6 +282,7 @@ impl<T: HashAlgorithm> Nomt<T> {
root: Root(root),
last_commit_marker: None,
})),
access_lock: Arc::new(RwLock::new(())),
metrics,
_marker: std::marker::PhantomData,
})
Expand All @@ -302,6 +305,7 @@ impl<T: HashAlgorithm> Nomt<T> {
/// This is used for testing for now.
#[doc(hidden)]
pub fn read(&self, path: KeyPath) -> anyhow::Result<Option<Value>> {
let _guard = self.access_lock.read();
self.store.load_value(path)
}

Expand All @@ -326,6 +330,9 @@ impl<T: HashAlgorithm> Nomt<T> {

/// Create a new [`Session`] object with the given parameters.
///
/// This will block if there are any ongoing commits or rollbacks. Multiple sessions may
/// coexist.
///
/// The [`Session`] is a read-only handle on the database and is used to create a changeset to
/// be applied to the database. Sessions provide read interfaces and additionally coordinate
/// work such as proving and rollback preimages which make committing more efficient.
Expand All @@ -345,6 +352,7 @@ impl<T: HashAlgorithm> Nomt<T> {
} else {
None
};

Session {
store,
merkle_updater: self.merkle_update_pool.begin::<T>(
Expand All @@ -360,17 +368,26 @@ impl<T: HashAlgorithm> Nomt<T> {
rollback_delta,
overlay: live_overlay,
witness_mode: params.witness,
access_guard: params
.take_global_guard
.then(|| RwLock::read_arc(&self.access_lock)),
_marker: std::marker::PhantomData,
}
}

/// Perform a rollback of the last `n` commits.
///
/// This function assumes no sessions are active and panics otherwise.
/// This function will block until all ongoing commits or [`Session`]s are finished.
///
/// Fails if the DB is not configured for rollback or doesn't have enough commits logged to
/// rollback.
pub fn rollback(&self, n: usize) -> anyhow::Result<()> {
if n == 0 {
return Ok(());
}

let _write_guard = self.access_lock.write();

let Some(rollback) = self.store.rollback() else {
anyhow::bail!("rollback: not enabled");
};
Expand All @@ -381,9 +398,11 @@ impl<T: HashAlgorithm> Nomt<T> {
// Begin a new session. We do not allow rollback for this operation because that would
// interfere with the rollback log: if another rollback were to be issued, it must rollback
// the changes in the rollback log and not the changes performed by the current rollback.
// UNWRAP: `None` ancestors are always valid.
let mut session_params = SessionParams::default();
session_params.record_rollback_delta = false;

// We hold a write guard and don't need the session to take any other.
session_params.take_global_guard = false;
let sess = self.begin_session(session_params);

// Convert the traceback into a series of write commands.
Expand Down Expand Up @@ -429,7 +448,11 @@ impl WitnessMode {

/// Parameters for instantiating a session.
pub struct SessionParams {
// INTERNAL: only false during rollback. determines whether the rollback delta is built
record_rollback_delta: bool,
// INTERNAL: only false during rollback. determines whether a global read lock is taken
take_global_guard: bool,

witness: WitnessMode,
overlay: LiveOverlay,
}
Expand All @@ -438,6 +461,7 @@ impl Default for SessionParams {
fn default() -> Self {
SessionParams {
record_rollback_delta: true,
take_global_guard: true,
witness: WitnessMode::disabled(),
// UNWRAP: empty live overlay always valid.
overlay: LiveOverlay::new(None).unwrap(),
Expand Down Expand Up @@ -488,6 +512,7 @@ pub struct Session<T: HashAlgorithm> {
rollback_delta: Option<rollback::ReverseDeltaBuilder>,
overlay: LiveOverlay,
witness_mode: WitnessMode,
access_guard: Option<ArcRwLockReadGuard<parking_lot::RawRwLock, ()>>,
_marker: std::marker::PhantomData<T>,
}

Expand Down Expand Up @@ -572,7 +597,6 @@ impl<T: HashAlgorithm> Session<T> {
compact_actuals.push((path.clone(), read_write.to_compact::<T>()));
}

// UNWRAP: merkle_updater always `Some` during lifecycle.
let merkle_update_handle = self
.merkle_updater
.update_and_prove::<T>(compact_actuals, self.witness_mode.0);
Expand All @@ -589,8 +613,8 @@ impl<T: HashAlgorithm> Session<T> {
value_transaction: tx,
merkle_output,
rollback_delta,
// UNWRAP: session overlay is always `Some` during lifecycle.
parent_overlay: self.overlay,
take_global_guard: self.access_guard.is_some(),
}
}
}
Expand All @@ -607,6 +631,8 @@ pub struct FinishedSession {
merkle_output: merkle::Output,
rollback_delta: Option<rollback::Delta>,
parent_overlay: LiveOverlay,
// INTERNAL: whether to take a write guard while committing. always true except during rollback.
take_global_guard: bool,
}

impl FinishedSession {
Expand Down Expand Up @@ -643,12 +669,16 @@ impl FinishedSession {

/// Commit this session to disk directly.
///
/// This function will block until all ongoing sessions and commits have finished.
///
/// This will return an error if I/O fails or if the changeset is no longer valid.
/// The changeset may be invalidated if another competing session, overlay, or rollback was
/// committed.
pub fn commit<T: HashAlgorithm>(self, nomt: &Nomt<T>) -> Result<(), anyhow::Error> {
// TODO: do a prev_root check or something to ensure continuity?

let _write_guard = self.take_global_guard.then(|| nomt.access_lock.write());

{
let mut shared = nomt.shared.lock();
shared.root = Root(self.merkle_output.root);
Expand All @@ -674,7 +704,7 @@ impl FinishedSession {
impl Overlay {
/// Commit the changes from this overlay to the underlying database.
///
/// This assumes no sessions are active and will panic otherwise.
/// This function will block until all ongoing sessions and commits have finished.
///
/// This will return an error if I/O fails or if the changeset is no longer valid, or if the
/// overlay has an uncommitted parent. An overlay may be invalidated by a competing commit or
Expand All @@ -697,6 +727,8 @@ impl Overlay {
.collect();
let rollback_delta = self.rollback_delta().map(|delta| delta.clone());

let _write_guard = nomt.access_lock.write();

let marker = self.mark_committed();

{
Expand Down

0 comments on commit b667d00

Please sign in to comment.