Skip to content

Commit

Permalink
refactor(store): add atomic write for sequencer and executor (#2776)
Browse files Browse the repository at this point in the history
* feat(rooch-store): add pop_unsaved_nodes and save_sequenced_tx

Introduce methods to handle unsaved nodes for atomic updates.
Enhanced sequencer and accumulator logic to support batch saves.

* fix(rooch-sequencer): correct sequencer info initialization

Fix initialization of SequencerInfo by passing correct tx_order variable. Removed unnecessary flush call on tx_accumulator to prevent redundant operations.

* feat(rooch-store): add test for accumulator popping unsaved nodes

Introduce a new test that validates the behavior of MerkleAccumulator when popping unsaved nodes and ensures correct leaf retrieval. This enhances the robustness of accumulator operations within RoochStore.

* refactor(moveos-store): decouple change set processing for atomic updates

Refactored the `apply_change_set` method by extracting the node processing into `change_set_to_nodes`. Updated relevant metrics and streamlined transaction handling for node, config, and transaction store updates. This enhances modularity and maintainability.

* fix(moveos-store): reorder column family initialization

Reorders column family names initialization to ensure proper sequence for write batch operations. This change improves the readability and maintainability of the code by grouping related commands together.

* fix(moveos-store): fix write batch column family handling

* fix(rooch-store): reorder column family name push

Reorder the pushing of column family names to ensure the write batch is created correctly.
  • Loading branch information
popcnt1 authored Oct 17, 2024
1 parent 184b636 commit d9080d3
Show file tree
Hide file tree
Showing 10 changed files with 175 additions and 71 deletions.
18 changes: 10 additions & 8 deletions crates/rooch-genesis/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -414,16 +414,15 @@ impl RoochGenesis {
.ctx
.get::<moveos_types::moveos_std::genesis::GenesisContext>()?
.expect("Moveos Genesis context should exist");
let tx_ledger_data = LedgerTxData::L2Tx(self.genesis_tx());

let mut tx_ledger_data = LedgerTxData::L2Tx(self.genesis_tx());
let tx_hash = tx_ledger_data.tx_hash();
// Init tx accumulator
let genesis_tx_accumulator = MerkleAccumulator::new_with_info(
AccumulatorInfo::default(),
rooch_db.rooch_store.get_transaction_accumulator_store(),
);
let _genesis_accumulator_root =
genesis_tx_accumulator.append(vec![tx_ledger_data.clone().tx_hash()].as_slice())?;
genesis_tx_accumulator.flush()?;
let _genesis_accumulator_root = genesis_tx_accumulator.append(vec![tx_hash].as_slice())?;
let genesis_accumulator_unsaved_nodes = genesis_tx_accumulator.pop_unsaved_nodes();

let genesis_tx_accmulator_info = genesis_tx_accumulator.get_info();
let ledger_tx = LedgerTransaction::build_ledger_transaction(
Expand All @@ -434,9 +433,12 @@ impl RoochGenesis {
genesis_tx_accmulator_info.clone(),
);
let sequencer_info = SequencerInfo::new(genesis_tx_order, genesis_tx_accmulator_info);
rooch_db
.rooch_store
.save_last_transaction_with_sequence_info(ledger_tx.clone(), sequencer_info)?;
rooch_db.rooch_store.save_sequenced_tx(
tx_hash,
ledger_tx.clone(),
sequencer_info,
genesis_accumulator_unsaved_nodes,
)?;

// Save genesis tx state change set
let state_change_set_ext = StateChangeSetExt::new(
Expand Down
23 changes: 15 additions & 8 deletions crates/rooch-sequencer/src/actor/sequencer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,17 +132,17 @@ impl SequencerActor {

let tx_order = self.last_sequencer_info.last_order + 1;

let hash = tx_data.tx_hash();
let mut witness_data = hash.as_ref().to_vec();
let tx_hash = tx_data.tx_hash();
let mut witness_data = tx_hash.as_ref().to_vec();
witness_data.extend(tx_order.to_le_bytes().iter());
let witness_hash = h256::sha3_256_of(&witness_data);
let tx_order_signature = Signature::sign(&witness_hash.0, &self.sequencer_key)
.as_ref()
.to_vec();

// Calc transaction accumulator
let _tx_accumulator_root = self.tx_accumulator.append(vec![hash].as_slice())?;
self.tx_accumulator.flush()?;
let _tx_accumulator_root = self.tx_accumulator.append(vec![tx_hash].as_slice())?;
let tx_accumulator_unsaved_nodes = self.tx_accumulator.pop_unsaved_nodes();

let tx_accumulator_info = self.tx_accumulator.get_info();
let tx = LedgerTransaction::build_ledger_transaction(
Expand All @@ -153,10 +153,17 @@ impl SequencerActor {
tx_accumulator_info.clone(),
);

let sequencer_info = SequencerInfo::new(tx.sequence_info.tx_order, tx_accumulator_info);
self.rooch_store
.save_last_transaction_with_sequence_info(tx.clone(), sequencer_info.clone())?;
info!("sequencer tx: {} order: {:?}", hash, tx_order);
let sequencer_info = SequencerInfo::new(tx_order, tx_accumulator_info);
self.rooch_store.save_sequenced_tx(
tx_hash,
tx.clone(),
sequencer_info.clone(),
tx_accumulator_unsaved_nodes,
)?;
info!(
"sequencer sequenced tx_hash: {} tx_order: {:?}",
tx_hash, tx_order
);
self.last_sequencer_info = sequencer_info;

Ok(tx)
Expand Down
35 changes: 19 additions & 16 deletions crates/rooch-store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::da_store::{DAMetaDBStore, DAMetaStore};
use crate::meta_store::{MetaDBStore, MetaStore, SEQUENCER_INFO_KEY};
use crate::state_store::{StateDBStore, StateStore};
use crate::transaction_store::{TransactionDBStore, TransactionStore};
use accumulator::AccumulatorTreeStore;
use accumulator::{AccumulatorNode, AccumulatorTreeStore};
use anyhow::Result;
use moveos_common::utils::to_bytes;
use moveos_config::store_config::RocksdbConfig;
Expand Down Expand Up @@ -138,12 +138,15 @@ impl RoochStore {
&self.da_meta_store
}

pub fn save_last_transaction_with_sequence_info(
/// Atomically save the updates made by Sequencer.sequence(tx) to the store.
pub fn save_sequenced_tx(
&self,
mut tx: LedgerTransaction,
tx_hash: H256,
tx: LedgerTransaction,
sequencer_info: SequencerInfo,
accumulator_nodes: Option<Vec<AccumulatorNode>>,
) -> Result<()> {
// TODO use txn GetForUpdate to guard against Read-Write Conflicts
// TODO use txn GetForUpdate to guard against Read-Write Conflicts (need open rocksdb with TransactionDB)
let pre_sequencer_info = self.get_sequencer_info()?;
if let Some(pre_sequencer_info) = pre_sequencer_info {
if sequencer_info.last_order != pre_sequencer_info.last_order + 1 {
Expand All @@ -152,26 +155,26 @@ impl RoochStore {
}

let inner_store = &self.store_instance;

let tx_hash = tx.tx_hash();
let tx_order = tx.sequence_info.tx_order;

let mut write_batch = WriteBatch::new();
let mut cf_names = vec![
TRANSACTION_COLUMN_FAMILY_NAME,
TX_SEQUENCE_INFO_MAPPING_COLUMN_FAMILY_NAME,
META_SEQUENCER_INFO_COLUMN_FAMILY_NAME,
];
write_batch.put(to_bytes(&tx_hash).unwrap(), to_bytes(&tx).unwrap())?;
write_batch.put(to_bytes(&tx_order).unwrap(), to_bytes(&tx_hash).unwrap())?;
write_batch.put(
to_bytes(SEQUENCER_INFO_KEY).unwrap(),
to_bytes(&sequencer_info).unwrap(),
)?;

inner_store.write_batch_sync_across_cfs(
vec![
TRANSACTION_COLUMN_FAMILY_NAME,
TX_SEQUENCE_INFO_MAPPING_COLUMN_FAMILY_NAME,
META_SEQUENCER_INFO_COLUMN_FAMILY_NAME,
],
write_batch,
)?;
if let Some(accumulator_nodes) = accumulator_nodes {
for node in accumulator_nodes {
write_batch.put(to_bytes(&node.hash()).unwrap(), to_bytes(&node).unwrap())?;
cf_names.push(TX_ACCUMULATOR_NODE_COLUMN_FAMILY_NAME);
}
}
inner_store.write_batch_sync_across_cfs(cf_names, write_batch)?;
Ok(())
}
}
Expand Down
46 changes: 45 additions & 1 deletion crates/rooch-store/src/tests/test_accumulator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

use crate::RoochStore;
use accumulator::node_index::NodeIndex;
use accumulator::{AccumulatorNode, AccumulatorTreeStore};
use accumulator::{Accumulator, AccumulatorNode, AccumulatorTreeStore, MerkleAccumulator};
use moveos_types::h256::H256;

#[tokio::test]
Expand All @@ -23,3 +23,47 @@ async fn test_accumulator_store() {
.unwrap();
assert_eq!(acc_node, acc_node2);
}

/// Checks that nodes appended to a `MerkleAccumulator` only become readable
/// through the store after the nodes returned by `pop_unsaved_nodes` are saved.
#[tokio::test]
async fn accumulator_pop_unsaved() {
    let (rooch_store, _) = RoochStore::mock_rooch_store().unwrap();

    // Append three random leaves; nothing is flushed to the store here.
    let writer = MerkleAccumulator::new_empty(rooch_store.get_transaction_accumulator_store());
    let leaves = vec![H256::random(), H256::random(), H256::random()];
    let _root = writer.append(&leaves).unwrap();
    let accumulator_info = writer.get_info();
    let num_leaves = accumulator_info.num_leaves;

    // A second accumulator built from the same info reads through the store,
    // where the freshly appended nodes have not been saved yet.
    let reader_before_save = MerkleAccumulator::new_with_info(
        accumulator_info.clone(),
        rooch_store.get_transaction_accumulator_store(),
    );
    for index in 0..num_leaves - 1 {
        assert!(reader_before_save.get_leaf(index).is_err());
    }
    // last leaf should be in frozen_subtree_roots, so it should be found.
    let last_leaf = reader_before_save
        .get_leaf(num_leaves - 1)
        .unwrap()
        .unwrap();
    assert_eq!(leaves[num_leaves as usize - 1], last_leaf);

    // Persist the popped nodes, then every leaf must be retrievable.
    let unsaved_nodes = writer.pop_unsaved_nodes();
    rooch_store
        .get_transaction_accumulator_store()
        .save_nodes(unsaved_nodes.unwrap())
        .unwrap();
    let reader_after_save = MerkleAccumulator::new_with_info(
        accumulator_info,
        rooch_store.get_transaction_accumulator_store(),
    );
    for index in 0..num_leaves {
        let fetched = reader_after_save.get_leaf(index);
        assert!(fetched.is_ok());
        assert_eq!(leaves[index as usize], fetched.unwrap().unwrap());
    }
}
6 changes: 6 additions & 0 deletions moveos/moveos-commons/accumulator/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ pub trait Accumulator {
fn get_proof(&self, leaf_index: u64) -> Result<Option<AccumulatorProof>>;
/// Flush node to storage.
fn flush(&self) -> Result<()>;
/// Pop unsaved nodes for atomic updates with other operations.
fn pop_unsaved_nodes(&self) -> Option<Vec<AccumulatorNode>>;
/// Get current accumulator tree root hash.
fn root_hash(&self) -> H256;
/// Get current accumulator tree number of leaves.
Expand Down Expand Up @@ -193,6 +195,10 @@ impl Accumulator for MerkleAccumulator {
self.tree.lock().flush()
}

/// Locks the inner tree and returns whatever its own `pop_unsaved_nodes` yields.
fn pop_unsaved_nodes(&self) -> Option<Vec<AccumulatorNode>> {
    let mut tree = self.tree.lock();
    tree.pop_unsaved_nodes()
}

fn root_hash(&self) -> H256 {
self.tree.lock().root_hash
}
Expand Down
15 changes: 15 additions & 0 deletions moveos/moveos-commons/accumulator/src/tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,21 @@ impl AccumulatorTree {
Ok(())
}

/// Pop unsaved nodes for atomic updates with other operations.
///
/// Returns `None` when `update_nodes` is empty. Otherwise returns clones of
/// every node currently held in `update_nodes` and clears that collection.
pub fn pop_unsaved_nodes(&mut self) -> Option<Vec<AccumulatorNode>> {
    // Nothing pending: signal that with `None` rather than an empty vector.
    if self.update_nodes.is_empty() {
        return None;
    }

    let mut popped: Vec<AccumulatorNode> = Vec::with_capacity(self.update_nodes.len());
    for (_, node) in self.update_nodes.iter() {
        popped.push(node.clone());
    }
    // The nodes are now owned by the caller; forget them locally.
    self.update_nodes.clear();
    Some(popped)
}

fn scan_frozen_subtree_roots(&mut self) -> Result<Vec<H256>> {
FrozenSubTreeIterator::new(self.num_leaves)
.map(|p| {
Expand Down
70 changes: 40 additions & 30 deletions moveos/moveos-store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@
// Copyright (c) The Starcoin Core Contributors
// SPDX-License-Identifier: Apache-2.0

use crate::config_store::{ConfigDBStore, ConfigStore};
use crate::config_store::{ConfigDBStore, ConfigStore, STARTUP_INFO_KEY};
use crate::event_store::{EventDBStore, EventStore};
use crate::state_store::statedb::StateDBStore;
use crate::state_store::NodeDBStore;
use crate::state_store::{nodes_to_write_batch, NodeDBStore};
use crate::transaction_store::{TransactionDBStore, TransactionStore};
use accumulator::inmemory::InMemoryAccumulator;
use anyhow::{Error, Result};
use bcs::to_bytes;
use move_core_types::language_storage::StructTag;
use moveos_config::store_config::RocksdbConfig;
use moveos_config::DataDirPath;
Expand All @@ -28,7 +29,8 @@ use once_cell::sync::Lazy;
use prometheus::Registry;
use raw_store::metrics::DBMetrics;
use raw_store::rocks::RocksDB;
use raw_store::{ColumnFamilyName, StoreInstance};
use raw_store::traits::DBStore;
use raw_store::{ColumnFamilyName, SchemaStore, StoreInstance};
use smt::NodeReader;
use std::fmt::{Debug, Display, Formatter};
use std::path::Path;
Expand Down Expand Up @@ -151,53 +153,61 @@ impl MoveOSStore {
is_gas_upgrade: _,
} = output;

self.state_store.apply_change_set(&mut changeset)?;
// node_store updates
let changed_nodes = self.state_store.change_set_to_nodes(&mut changeset)?;
// transaction_store updates
let new_state_root = changeset.state_root;
let size = changeset.global_size;
let event_ids = self.event_store.save_events(tx_events.clone())?;
let events = tx_events
.clone()
.into_iter()
.zip(event_ids)
.map(|(event, event_id)| Event::new_with_event_id(event_id, event))
.collect::<Vec<_>>();

let new_state_root = changeset.state_root;
let size = changeset.global_size;

self.config_store
.save_startup_info(StartupInfo::new(new_state_root, size))?;
let event_hashes: Vec<_> = events.iter().map(|e| e.hash()).collect();
let event_root = InMemoryAccumulator::from_leaves(event_hashes.as_slice()).root_hash();
let transaction_info = TransactionExecutionInfo::new(
tx_hash,
new_state_root,
size,
event_root,
gas_used,
status.clone(),
);
// config_store updates
let new_startup_info = StartupInfo::new(new_state_root, size);

if log::log_enabled!(log::Level::Debug) {
log::debug!(
"tx_hash: {}, state_root: {}, size: {}, gas_used: {}, status: {:?}",
"handle_tx_output: tx_hash: {}, state_root: {}, size: {}, gas_used: {}, status: {:?}",
tx_hash,
new_state_root,
size,
gas_used,
status
);
}
let event_hashes: Vec<_> = events.iter().map(|e| e.hash()).collect();
let event_root = InMemoryAccumulator::from_leaves(event_hashes.as_slice()).root_hash();

let transaction_info = TransactionExecutionInfo::new(
tx_hash,
new_state_root,
size,
event_root,
gas_used,
status.clone(),
);
self.transaction_store
.save_tx_execution_info(transaction_info.clone())
.map_err(|e| {
anyhow::anyhow!(
"ExecuteTransactionMessage handler save tx info failed: {:?} {}",
transaction_info,
e
)
})?;
// atomic save updates
let inner_store = &self.node_store.get_store().store();
let mut write_batch = nodes_to_write_batch(changed_nodes);
let mut cf_names = vec![STATE_NODE_COLUMN_FAMILY_NAME; write_batch.rows.len()];
write_batch.put(
to_bytes(&STARTUP_INFO_KEY).unwrap(),
to_bytes(&new_startup_info).unwrap(),
)?;
cf_names.push(CONFIG_STARTUP_INFO_COLUMN_FAMILY_NAME);
write_batch.put(
to_bytes(&tx_hash).unwrap(),
to_bytes(&transaction_info).unwrap(),
)?;
cf_names.push(TRANSACTION_EXECUTION_INFO_COLUMN_FAMILY_NAME);
// TODO: a non-sync write could be used here, because transactions can be replayed at startup from the rooch store (which is written synchronously after sequencing).
inner_store.write_batch_sync_across_cfs(cf_names, write_batch)?;

let out = TransactionOutput::new(status, changeset, events, gas_used, is_upgrade);

Ok((out, transaction_info))
}
}
Expand Down
8 changes: 4 additions & 4 deletions moveos/moveos-store/src/state_store/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ pub struct StateDBMetrics {
pub state_update_fields_bytes: HistogramVec,
pub state_update_nodes_latency_seconds: HistogramVec,
pub state_update_nodes_bytes: HistogramVec,
pub state_apply_change_set_latency_seconds: HistogramVec,
pub state_apply_change_set_bytes: HistogramVec,
pub state_change_set_to_nodes_latency_seconds: HistogramVec,
pub state_change_set_to_nodes_bytes: HistogramVec,
pub state_iter_latency_seconds: HistogramVec,
pub state_get_field_at_latency_seconds: HistogramVec,
pub state_get_field_at_bytes: HistogramVec,
Expand Down Expand Up @@ -57,15 +57,15 @@ impl StateDBMetrics {
registry
)
.unwrap(),
state_apply_change_set_latency_seconds: register_histogram_vec_with_registry!(
state_change_set_to_nodes_latency_seconds: register_histogram_vec_with_registry!(
"state_apply_change_set_latency_seconds",
"State apply change set latency in seconds",
&["fn_name"],
LATENCY_SEC_BUCKETS.to_vec(),
registry,
)
.unwrap(),
state_apply_change_set_bytes: register_histogram_vec_with_registry!(
state_change_set_to_nodes_bytes: register_histogram_vec_with_registry!(
"state_apply_change_set_bytes",
"State apply change set data size in bytes",
&["fn_name"],
Expand Down
Loading

0 comments on commit d9080d3

Please sign in to comment.