Skip to content

fix: read/write locking on NOMT session/commit APIs #762

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 3, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 38 additions & 6 deletions nomt/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use nomt_core::{
};
use overlay::{LiveOverlay, OverlayMarker};
use page_cache::PageCache;
use parking_lot::Mutex;
use parking_lot::{ArcRwLockReadGuard, Mutex, RwLock};
use store::{Store, ValueTransaction};

// CARGO HACK: silence lint; this is used in integration tests
Expand Down Expand Up @@ -249,6 +249,8 @@ pub struct Nomt<T: HashAlgorithm> {
page_pool: PagePool,
store: Store,
shared: Arc<Mutex<Shared>>,
/// Used to protect the multiple-readers-one-writer API
access_lock: Arc<RwLock<()>>,
metrics: Metrics,
_marker: std::marker::PhantomData<T>,
}
Expand Down Expand Up @@ -280,6 +282,7 @@ impl<T: HashAlgorithm> Nomt<T> {
root: Root(root),
last_commit_marker: None,
})),
access_lock: Arc::new(RwLock::new(())),
metrics,
_marker: std::marker::PhantomData,
})
Expand All @@ -302,6 +305,7 @@ impl<T: HashAlgorithm> Nomt<T> {
/// This is used for testing for now.
#[doc(hidden)]
pub fn read(&self, path: KeyPath) -> anyhow::Result<Option<Value>> {
let _guard = self.access_lock.read();
self.store.load_value(path)
}

Expand All @@ -313,6 +317,9 @@ impl<T: HashAlgorithm> Nomt<T> {

/// Create a new [`Session`] object with the given parameters.
///
/// This will block if there are any ongoing commits or rollbacks. Multiple sessions may
/// coexist.
///
/// The [`Session`] is a read-only handle on the database and is used to create a changeset to
/// be applied to the database. Sessions provide read interfaces and additionally coordinate
/// work such as proving and rollback preimages which make committing more efficient.
Expand All @@ -332,6 +339,7 @@ impl<T: HashAlgorithm> Nomt<T> {
} else {
None
};

Session {
store,
merkle_updater: self.merkle_update_pool.begin::<T>(
Expand All @@ -347,17 +355,26 @@ impl<T: HashAlgorithm> Nomt<T> {
rollback_delta,
overlay: live_overlay,
witness_mode: params.witness,
access_guard: params
.take_global_guard
.then(|| RwLock::read_arc(&self.access_lock)),
_marker: std::marker::PhantomData,
}
}

/// Perform a rollback of the last `n` commits.
///
/// This function assumes no sessions are active and panics otherwise.
/// This function will block until all ongoing commits or [`Session`]s are finished.
///
/// Fails if the DB is not configured for rollback or doesn't have enough commits logged to
/// rollback.
pub fn rollback(&self, n: usize) -> anyhow::Result<()> {
if n == 0 {
return Ok(());
}

let _write_guard = self.access_lock.write();

let Some(rollback) = self.store.rollback() else {
anyhow::bail!("rollback: not enabled");
};
Expand All @@ -368,9 +385,11 @@ impl<T: HashAlgorithm> Nomt<T> {
// Begin a new session. We do not allow rollback for this operation because that would
// interfere with the rollback log: if another rollback were to be issued, it must rollback
// the changes in the rollback log and not the changes performed by the current rollback.
// UNWRAP: `None` ancestors are always valid.
let mut session_params = SessionParams::default();
session_params.record_rollback_delta = false;

// We hold a write guard and don't need the session to take any other.
session_params.take_global_guard = false;
let sess = self.begin_session(session_params);

// Convert the traceback into a series of write commands.
Expand Down Expand Up @@ -416,7 +435,11 @@ impl WitnessMode {

/// Parameters for instantiating a session.
pub struct SessionParams {
// INTERNAL: only false during rollback. determines whether the rollback delta is built
record_rollback_delta: bool,
// INTERNAL: only false during rollback. determines whether a global read lock is taken
take_global_guard: bool,

witness: WitnessMode,
overlay: LiveOverlay,
}
Expand All @@ -425,6 +448,7 @@ impl Default for SessionParams {
fn default() -> Self {
SessionParams {
record_rollback_delta: true,
take_global_guard: true,
witness: WitnessMode::disabled(),
// UNWRAP: empty live overlay always valid.
overlay: LiveOverlay::new(None).unwrap(),
Expand Down Expand Up @@ -475,6 +499,7 @@ pub struct Session<T: HashAlgorithm> {
rollback_delta: Option<rollback::ReverseDeltaBuilder>,
overlay: LiveOverlay,
witness_mode: WitnessMode,
access_guard: Option<ArcRwLockReadGuard<parking_lot::RawRwLock, ()>>,
_marker: std::marker::PhantomData<T>,
}

Expand Down Expand Up @@ -559,7 +584,6 @@ impl<T: HashAlgorithm> Session<T> {
compact_actuals.push((path.clone(), read_write.to_compact::<T>()));
}

// UNWRAP: merkle_updater always `Some` during lifecycle.
let merkle_update_handle = self
.merkle_updater
.update_and_prove::<T>(compact_actuals, self.witness_mode.0);
Expand All @@ -576,8 +600,8 @@ impl<T: HashAlgorithm> Session<T> {
value_transaction: tx,
merkle_output,
rollback_delta,
// UNWRAP: session overlay is always `Some` during lifecycle.
parent_overlay: self.overlay,
take_global_guard: self.access_guard.is_some(),
}
}
}
Expand All @@ -594,6 +618,8 @@ pub struct FinishedSession {
merkle_output: merkle::Output,
rollback_delta: Option<rollback::Delta>,
parent_overlay: LiveOverlay,
// INTERNAL: whether to take a write guard while committing. always true except during rollback.
take_global_guard: bool,
}

impl FinishedSession {
Expand Down Expand Up @@ -630,12 +656,16 @@ impl FinishedSession {

/// Commit this session to disk directly.
///
/// This function will block until all ongoing sessions and commits have finished.
///
/// This will return an error if I/O fails or if the changeset is no longer valid.
/// The changeset may be invalidated if another competing session, overlay, or rollback was
/// committed.
pub fn commit<T: HashAlgorithm>(self, nomt: &Nomt<T>) -> Result<(), anyhow::Error> {
// TODO: do a prev_root check or something to ensure continuity?

let _write_guard = self.take_global_guard.then(|| nomt.access_lock.write());

{
let mut shared = nomt.shared.lock();
shared.root = Root(self.merkle_output.root);
Expand All @@ -661,7 +691,7 @@ impl FinishedSession {
impl Overlay {
/// Commit the changes from this overlay to the underlying database.
///
/// This assumes no sessions are active and will panic otherwise.
/// This function will block until all ongoing sessions and commits have finished.
///
/// This will return an error if I/O fails or if the changeset is no longer valid, or if the
/// overlay has an uncommitted parent. An overlay may be invalidated by a competing commit or
Expand All @@ -684,6 +714,8 @@ impl Overlay {
.collect();
let rollback_delta = self.rollback_delta().map(|delta| delta.clone());

let _write_guard = nomt.access_lock.write();

let marker = self.mark_committed();

{
Expand Down