diff --git a/Cargo.toml b/Cargo.toml index 4726092fa2..adf37e8061 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -132,7 +132,7 @@ async = [ "snarkvm-ledger/async", "snarkvm-synthesizer/async" ] cuda = [ "snarkvm-algorithms/cuda" ] parameters_no_std_out = [ "snarkvm-parameters/no_std_out" ] noconfig = [ ] -rocks = [ "snarkvm-ledger/rocks" ] +rocks = [ "snarkvm-ledger/rocks", "snarkvm-synthesizer/rocks" ] test = [ "snarkvm-ledger/test" ] test-helpers = [ "snarkvm-ledger/test-helpers" ] timer = [ "snarkvm-ledger/timer" ] diff --git a/ledger/store/src/block/mod.rs b/ledger/store/src/block/mod.rs index fdeac2ceb3..5ee1c5e467 100644 --- a/ledger/store/src/block/mod.rs +++ b/ledger/store/src/block/mod.rs @@ -393,6 +393,20 @@ pub trait BlockStorage: 'static + Clone + Send + Sync { self.transaction_store().finish_atomic() } + /// Pauses atomic writes. + fn pause_atomic_writes(&self) -> Result<()> { + // Since this applies to the entire storage, any map can be used; this + // one is just the first one in the list. + self.state_root_map().pause_atomic_writes() + } + + /// Unpauses atomic writes. + fn unpause_atomic_writes(&self) -> Result<()> { + // Since this applies to the entire storage, any map can be used; this + // one is just the first one in the list. + self.state_root_map().unpause_atomic_writes::() + } + /// Stores the given `(state root, block)` pair into storage. fn insert(&self, state_root: N::StateRoot, block: &Block) -> Result<()> { // Prepare the confirmed transactions. @@ -1049,6 +1063,20 @@ impl> BlockStore { Ok(()) } + /// Reverts the Merkle tree to its shape before the insertion of the last 'n' blocks. + pub fn remove_last_n_from_tree_only(&self, n: u32) -> Result<()> { + // Ensure 'n' is non-zero. + ensure!(n > 0, "Cannot remove zero blocks"); + // Acquire the write lock on the block tree. + let mut tree = self.tree.write(); + // Prepare an updated Merkle tree removing the last 'n' block hashes. + let updated_tree = tree.prepare_remove_last_n(usize::try_from(n)?)?; + // Update the block tree. + *tree = updated_tree; + // Return success. + Ok(()) + } + /// Removes the last 'n' blocks from storage. pub fn remove_last_n(&self, n: u32) -> Result<()> { // Ensure 'n' is non-zero. @@ -1073,7 +1101,6 @@ impl> BlockStore { } None => bail!("Failed to remove last '{n}' blocks: no blocks in storage"), }; - // Fetch the block hashes to remove. let hashes = cfg_into_iter!(heights) .map(|height| match self.storage.get_block_hash(height)? { @@ -1148,6 +1175,16 @@ impl> BlockStore { pub fn storage_mode(&self) -> &StorageMode { self.storage.storage_mode() } + + /// Pauses atomic writes. + pub fn pause_atomic_writes(&self) -> Result<()> { + self.storage.pause_atomic_writes() + } + + /// Unpauses atomic writes. + pub fn unpause_atomic_writes(&self) -> Result<()> { + self.storage.unpause_atomic_writes::() + } } impl> BlockStore { diff --git a/ledger/store/src/helpers/memory/internal/map.rs b/ledger/store/src/helpers/memory/internal/map.rs index 211d7e11a1..514e4d551c 100644 --- a/ledger/store/src/helpers/memory/internal/map.rs +++ b/ledger/store/src/helpers/memory/internal/map.rs @@ -228,6 +228,25 @@ impl< Ok(()) } + + /// + /// Once called, the subsequent atomic write batches will be queued instead of being executed + /// at the end of their scope. `unpause_atomic_writes` needs to be called in order to + /// restore the usual behavior. + /// + fn pause_atomic_writes(&self) -> Result<()> { + // No effect. + Ok(()) + } + + /// + /// Executes all of the queued writes as a single atomic operation and restores the usual + /// behavior of atomic write batches that was altered by calling `pause_atomic_writes`. + /// + fn unpause_atomic_writes(&self) -> Result<()> { + // No effect. + Ok(()) + } } impl< diff --git a/ledger/store/src/helpers/rocksdb/internal/map.rs b/ledger/store/src/helpers/rocksdb/internal/map.rs index 105d220121..bd868d4bc6 100644 --- a/ledger/store/src/helpers/rocksdb/internal/map.rs +++ b/ledger/store/src/helpers/rocksdb/internal/map.rs @@ -107,8 +107,11 @@ impl< // Ensure that the atomic batch is empty. assert!(self.atomic_batch.lock().is_empty()); - // Ensure that the database atomic batch is empty. - assert!(self.database.atomic_batch.lock().is_empty()); + // Ensure that the database atomic batch is empty; skip this check if the atomic + // writes are paused, as there may be pending operations. + if !self.database.are_atomic_writes_paused() { + assert!(self.database.atomic_batch.lock().is_empty()); + } } /// @@ -217,8 +220,9 @@ impl< assert!(previous_atomic_depth != 0); // If we're at depth 0, it is the final call to `finish_atomic` and the - // atomic write batch can be physically executed. - if previous_atomic_depth == 1 { + // atomic write batch can be physically executed. This is skipped if the + // atomic writes are paused. + if previous_atomic_depth == 1 && !self.database.are_atomic_writes_paused() { // Empty the collection of pending operations. let batch = mem::take(&mut *self.database.atomic_batch.lock()); // Execute all the operations atomically. @@ -229,6 +233,23 @@ impl< Ok(()) } + + /// + /// Once called, the subsequent atomic write batches will be queued instead of being executed + /// at the end of their scope. `unpause_atomic_writes` needs to be called in order to + /// restore the usual behavior. + /// + fn pause_atomic_writes(&self) -> Result<()> { + self.database.pause_atomic_writes() + } + + /// + /// Executes all of the queued writes as a single atomic operation and restores the usual + /// behavior of atomic write batches that was altered by calling `pause_atomic_writes`. + /// + fn unpause_atomic_writes(&self) -> Result<()> { + self.database.unpause_atomic_writes::() + } } impl< diff --git a/ledger/store/src/helpers/rocksdb/internal/mod.rs b/ledger/store/src/helpers/rocksdb/internal/mod.rs index a6411f6bd3..20222de928 100644 --- a/ledger/store/src/helpers/rocksdb/internal/mod.rs +++ b/ledger/store/src/helpers/rocksdb/internal/mod.rs @@ -25,16 +25,17 @@ pub use nested_map::*; mod tests; use aleo_std_storage::StorageMode; -use anyhow::{bail, Result}; +use anyhow::{bail, ensure, Result}; use once_cell::sync::OnceCell; use parking_lot::Mutex; use serde::{de::DeserializeOwned, Serialize}; use std::{ borrow::Borrow, marker::PhantomData, + mem, ops::Deref, sync::{ - atomic::{AtomicBool, AtomicUsize}, + atomic::{AtomicBool, AtomicUsize, Ordering}, Arc, }, }; @@ -88,6 +89,8 @@ pub struct RocksDB { /// The depth of the current atomic write batch; it gets incremented with every call /// to `start_atomic` and decremented with each call to `finish_atomic`. pub(super) atomic_depth: Arc, + /// A flag indicating whether the atomic writes are currently paused. + pub(super) atomic_writes_paused: Arc, } impl Deref for RocksDB { @@ -132,6 +135,7 @@ impl Database for RocksDB { storage_mode: storage.clone().into(), atomic_batch: Default::default(), atomic_depth: Default::default(), + atomic_writes_paused: Default::default(), }) })? .clone(); @@ -202,6 +206,56 @@ impl Database for RocksDB { } impl RocksDB { + /// Pause the execution of atomic writes for the entire database. + fn pause_atomic_writes(&self) -> Result<()> { + // This operation is only intended to be performed before or after + // atomic batches - never in the middle of them. + assert_eq!(self.atomic_depth.load(Ordering::SeqCst), 0); + + // Set the flag indicating that the pause is in effect. + let already_paused = self.atomic_writes_paused.swap(true, Ordering::SeqCst); + // Make sure that we haven't already paused atomic writes (which would + // indicate a logic bug). + assert!(!already_paused); + + Ok(()) + } + + /// Unpause the execution of atomic writes for the entire database; this + /// executes all the writes that have been queued since they were paused. + fn unpause_atomic_writes(&self) -> Result<()> { + // Ensure the call to unpause is only performed before or after an atomic batch scope + // - and never in the middle of one (otherwise there is a fundamental logic bug). + // Note: In production, this `ensure` is a safety-critical invariant that never fails. + ensure!(self.atomic_depth.load(Ordering::SeqCst) == 0, "Atomic depth must be 0 to unpause atomic writes"); + + // https://github.com/rust-lang/rust/issues/98485 + let currently_paused = self.atomic_writes_paused.load(Ordering::SeqCst); + // Ensure the database is paused (otherwise there is a fundamental logic bug). + // Note: In production, this `ensure` is a safety-critical invariant that never fails. + ensure!(currently_paused, "Atomic writes must be paused to unpause them"); + + // In order to ensure that all the operations that are intended + // to be atomic via the usual macro approach are still performed + // atomically (just as a part of a larger batch), every atomic + // storage operation that has accumulated from the moment the + // writes have been paused becomes executed as a single atomic batch. + let batch = mem::take(&mut *self.atomic_batch.lock()); + if !DISCARD_BATCH { + self.rocksdb.write(batch)?; + } + + // Unset the flag indicating that the pause is in effect. + self.atomic_writes_paused.store(false, Ordering::SeqCst); + + Ok(()) + } + + /// Checks whether the atomic writes are currently paused. + fn are_atomic_writes_paused(&self) -> bool { + self.atomic_writes_paused.load(Ordering::SeqCst) + } + /// Opens the test database. #[cfg(any(test, feature = "test"))] pub fn open_testing(temp_dir: std::path::PathBuf, dev: Option) -> Result { @@ -254,6 +308,7 @@ impl RocksDB { storage_mode: storage_mode.clone(), atomic_batch: Default::default(), atomic_depth: Default::default(), + atomic_writes_paused: Default::default(), }) }?; diff --git a/ledger/store/src/helpers/rocksdb/internal/nested_map.rs b/ledger/store/src/helpers/rocksdb/internal/nested_map.rs index a86e64d97e..86366628e9 100644 --- a/ledger/store/src/helpers/rocksdb/internal/nested_map.rs +++ b/ledger/store/src/helpers/rocksdb/internal/nested_map.rs @@ -201,8 +201,11 @@ impl< // Ensure that the atomic batch is empty. assert!(self.atomic_batch.lock().is_empty()); - // Ensure that the database atomic batch is empty. - assert!(self.database.atomic_batch.lock().is_empty()); + // Ensure that the database atomic batch is empty; skip this check if the atomic + // writes are paused, as there may be pending operations. + if !self.database.are_atomic_writes_paused() { + assert!(self.database.atomic_batch.lock().is_empty()); + } } /// @@ -323,8 +326,9 @@ impl< assert!(previous_atomic_depth != 0); // If we're at depth 0, it is the final call to `finish_atomic` and the - // atomic write batch can be physically executed. - if previous_atomic_depth == 1 { + // atomic write batch can be physically executed. This is skipped if the + // atomic writes are paused. + if previous_atomic_depth == 1 && !self.database.are_atomic_writes_paused() { // Empty the collection of pending operations. let batch = mem::take(&mut *self.database.atomic_batch.lock()); // Execute all the operations atomically. diff --git a/ledger/store/src/helpers/traits/map.rs b/ledger/store/src/helpers/traits/map.rs index 3f3c15d298..b0eef6a4ac 100644 --- a/ledger/store/src/helpers/traits/map.rs +++ b/ledger/store/src/helpers/traits/map.rs @@ -73,6 +73,19 @@ pub trait Map< /// Finishes an atomic operation, performing all the queued writes. /// fn finish_atomic(&self) -> Result<()>; + + /// + /// Once called, the subsequent atomic write batches will be queued instead of being executed + /// at the end of their scope. `unpause_atomic_writes` needs to be called in order to + /// restore the usual behavior. + /// + fn pause_atomic_writes(&self) -> Result<()>; + + /// + /// Executes all of the queued writes as a single atomic operation and restores the usual + /// behavior of atomic write batches that was altered by calling `pause_atomic_writes`. + /// + fn unpause_atomic_writes(&self) -> Result<()>; } /// A trait representing map-like storage operations with read-only capabilities. diff --git a/synthesizer/Cargo.toml b/synthesizer/Cargo.toml index 40f9fcc0d2..7e9b95e809 100644 --- a/synthesizer/Cargo.toml +++ b/synthesizer/Cargo.toml @@ -31,6 +31,7 @@ snark = [ "synthesizer-snark" ] aleo-cli = [ ] async = [ "ledger-query/async", "synthesizer-process/async" ] cuda = [ "algorithms/cuda" ] +rocks = [ "ledger-store/rocks" ] serial = [ "console/serial", "ledger-block/serial", diff --git a/synthesizer/src/vm/mod.rs b/synthesizer/src/vm/mod.rs index d5762da826..d7f7912c9a 100644 --- a/synthesizer/src/vm/mod.rs +++ b/synthesizer/src/vm/mod.rs @@ -344,22 +344,45 @@ impl> VM { block.previous_hash(), )?; - // Attention: The following order is crucial because if 'finalize' fails, we can rollback the block. - // If one first calls 'finalize', then calls 'insert(block)' and it fails, there is no way to rollback 'finalize'. + // Pause the atomic writes, so that both the insertion and finalization belong to a single batch. + #[cfg(feature = "rocks")] + self.block_store().pause_atomic_writes()?; // First, insert the block. self.block_store().insert(block)?; // Next, finalize the transactions. match self.finalize(state, block.ratifications(), block.solutions(), block.transactions()) { - Ok(_ratified_finalize_operations) => Ok(()), + Ok(_ratified_finalize_operations) => { + // Unpause the atomic writes, executing the ones queued from block insertion and finalization. + #[cfg(feature = "rocks")] + self.block_store().unpause_atomic_writes::()?; + Ok(()) + } Err(finalize_error) => { - // Rollback the block. - self.block_store().remove_last_n(1).map_err(|removal_error| { - // Log the finalize error. - error!("Failed to finalize block {} - {finalize_error}", block.height()); - // Return the removal error. - removal_error - })?; + if cfg!(feature = "rocks") { + // Clear all pending atomic operations so that unpausing the atomic writes + // doesn't execute any of the queued storage operations. + self.block_store().abort_atomic(); + self.finalize_store().abort_atomic(); + // Disable the atomic batch override. + // Note: This call is guaranteed to succeed (without error), because `DISCARD_BATCH == true`. + self.block_store().unpause_atomic_writes::()?; + // Rollback the Merkle tree. + self.block_store().remove_last_n_from_tree_only(1).map_err(|removal_error| { + // Log the finalize error. + error!("Failed to finalize block {} - {finalize_error}", block.height()); + // Return the removal error. + removal_error + })?; + } else { + // Rollback the block. + self.block_store().remove_last_n(1).map_err(|removal_error| { + // Log the finalize error. + error!("Failed to finalize block {} - {finalize_error}", block.height()); + // Return the removal error. + removal_error + })?; + } // Return the finalize error. Err(finalize_error) }