diff --git a/Cargo.lock b/Cargo.lock index 434a38a1c3..507d029327 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7527,6 +7527,7 @@ dependencies = [ "pathfinder-storage", "pretty_assertions_sorted", "rand", + "rayon", "starknet-gateway-types", "thiserror", "tracing", diff --git a/crates/merkle-tree/Cargo.toml b/crates/merkle-tree/Cargo.toml index ac2359fdb4..54daf4e1c6 100644 --- a/crates/merkle-tree/Cargo.toml +++ b/crates/merkle-tree/Cargo.toml @@ -14,6 +14,7 @@ pathfinder-common = { path = "../common" } pathfinder-crypto = { path = "../crypto" } pathfinder-storage = { path = "../storage" } rand = { workspace = true } +rayon = { workspace = true } starknet-gateway-types = { path = "../gateway-types" } thiserror = { workspace = true } tracing = { workspace = true } diff --git a/crates/merkle-tree/src/lib.rs b/crates/merkle-tree/src/lib.rs index 0bf2bbcdce..3059ee558a 100644 --- a/crates/merkle-tree/src/lib.rs +++ b/crates/merkle-tree/src/lib.rs @@ -1,5 +1,6 @@ pub mod contract_state; pub mod merkle_node; +pub mod starknet_state; pub mod storage; pub mod tree; diff --git a/crates/merkle-tree/src/starknet_state.rs b/crates/merkle-tree/src/starknet_state.rs new file mode 100644 index 0000000000..7a3c614ceb --- /dev/null +++ b/crates/merkle-tree/src/starknet_state.rs @@ -0,0 +1,143 @@ +use anyhow::Context; +use pathfinder_common::state_update::StateUpdateRef; +use pathfinder_common::{BlockNumber, ClassCommitment, StorageCommitment}; +use pathfinder_storage::{Storage, Transaction}; + +use crate::contract_state::update_contract_state; +use crate::{ClassCommitmentTree, StorageCommitmentTree}; + +pub fn update_starknet_state( + transaction: &Transaction<'_>, + state_update: StateUpdateRef<'_>, + verify_hashes: bool, + block: BlockNumber, + // we need this so that we can create extra read-only transactions for + // parallel contract state updates + storage: Storage, +) -> anyhow::Result<(StorageCommitment, ClassCommitment)> { + use rayon::prelude::*; + + let mut storage_commitment_tree = match block.parent() { + Some(parent) => StorageCommitmentTree::load(transaction, parent) + .context("Loading storage commitment tree")?, + None => StorageCommitmentTree::empty(transaction), + } + .with_verify_hashes(verify_hashes); + + let (send, recv) = std::sync::mpsc::channel(); + + rayon::scope(|s| { + s.spawn(|_| { + let result: Result, _> = state_update + .contract_updates + .par_iter() + .map_init( + || storage.clone().connection(), + |connection, (contract_address, update)| { + let connection = match connection { + Ok(connection) => connection, + Err(e) => anyhow::bail!( + "Failed to create database connection in rayon thread: {}", + e + ), + }; + let transaction = connection.transaction()?; + update_contract_state( + **contract_address, + update.storage, + *update.nonce, + update.class.as_ref().map(|x| x.class_hash()), + &transaction, + verify_hashes, + block, + ) + }, + ) + .collect(); + let _ = send.send(result); + }) + }); + + let contract_update_results = recv.recv().context("Panic on rayon thread")??; + + for contract_update_result in contract_update_results.into_iter() { + storage_commitment_tree + .set( + contract_update_result.contract_address, + contract_update_result.state_hash, + ) + .context("Updating storage commitment tree")?; + contract_update_result + .insert(block, transaction) + .context("Inserting contract update result")?; + } + + for (contract, update) in state_update.system_contract_updates { + let update_result = update_contract_state( + *contract, + update.storage, + None, + None, + transaction, + verify_hashes, + block, + ) + .context("Update system contract state")?; + + storage_commitment_tree + .set(*contract, update_result.state_hash) + .context("Updating system contract storage commitment tree")?; + + update_result + .insert(block, transaction) + .context("Persisting system contract trie updates")?; + } + + // Apply storage commitment tree changes. + let (storage_commitment, trie_update) = storage_commitment_tree + .commit() + .context("Apply storage commitment tree updates")?; + + let root_idx = transaction + .insert_storage_trie(&trie_update, block) + .context("Persisting storage trie")?; + + transaction + .insert_storage_root(block, root_idx) + .context("Inserting storage root index")?; + + // Add new Sierra classes to class commitment tree. + let mut class_commitment_tree = match block.parent() { + Some(parent) => ClassCommitmentTree::load(transaction, parent) + .context("Loading class commitment tree")?, + None => ClassCommitmentTree::empty(transaction), + } + .with_verify_hashes(verify_hashes); + + for (sierra, casm) in state_update.declared_sierra_classes { + let leaf_hash = pathfinder_common::calculate_class_commitment_leaf_hash(*casm); + + transaction + .insert_class_commitment_leaf(block, &leaf_hash, casm) + .context("Adding class commitment leaf")?; + + class_commitment_tree + .set(*sierra, leaf_hash) + .context("Update class commitment tree")?; + } + + // Apply all class commitment tree changes. + let (class_commitment, trie_update) = class_commitment_tree + .commit() + .context("Apply class commitment tree updates")?; + + let class_root_idx = transaction + .insert_class_trie(&trie_update, block) + .context("Persisting class trie")?; + + transaction + .insert_class_root(block, class_root_idx) + .context("Inserting class root index")?; + + Ok((storage_commitment, class_commitment)) +} diff --git a/crates/pathfinder/src/state.rs b/crates/pathfinder/src/state.rs index a827ac7f33..7451691523 100644 --- a/crates/pathfinder/src/state.rs +++ b/crates/pathfinder/src/state.rs @@ -1,13 +1,4 @@ pub mod block_hash; mod sync; -pub use sync::{ - l1, - l2, - revert, - sync, - update_starknet_state, - Gossiper, - SyncContext, - RESET_DELAY_ON_FAILURE, -}; +pub use sync::{l1, l2, revert, sync, Gossiper, SyncContext, RESET_DELAY_ON_FAILURE}; diff --git a/crates/pathfinder/src/state/sync.rs b/crates/pathfinder/src/state/sync.rs index 9715085e4f..b2702eefba 100644 --- a/crates/pathfinder/src/state/sync.rs +++ b/crates/pathfinder/src/state/sync.rs @@ -10,7 +10,6 @@ use std::time::{Duration, Instant}; use anyhow::Context; use pathfinder_common::prelude::*; -use pathfinder_common::state_update::StateUpdateRef; use pathfinder_common::{ BlockCommitmentSignature, Chain, @@ -20,11 +19,10 @@ use pathfinder_common::{ }; use pathfinder_crypto::Felt; use pathfinder_ethereum::{EthereumApi, EthereumStateUpdate}; -use pathfinder_merkle_tree::contract_state::update_contract_state; -use pathfinder_merkle_tree::{ClassCommitmentTree, StorageCommitmentTree}; +use pathfinder_merkle_tree::starknet_state::update_starknet_state; use pathfinder_rpc::types::syncing::{self, NumberedBlock, Syncing}; use pathfinder_rpc::{Notifications, PendingData, Reorg, SyncState, TopicBroadcasters}; -use pathfinder_storage::{Connection, Storage, Transaction, TransactionBehavior}; +use pathfinder_storage::{Connection, Storage, TransactionBehavior}; use primitive_types::H160; use starknet_gateway_client::GatewayApi; use starknet_gateway_types::reply::{Block, PendingBlock}; @@ -1126,142 +1124,6 @@ async fn l2_reorg( }) } -pub fn update_starknet_state( - transaction: &Transaction<'_>, - state_update: StateUpdateRef<'_>, - verify_hashes: bool, - block: BlockNumber, - // we need this so that we can create extra read-only transactions for - // parallel contract state updates - storage: Storage, -) -> anyhow::Result<(StorageCommitment, ClassCommitment)> { - use rayon::prelude::*; - - let mut storage_commitment_tree = match block.parent() { - Some(parent) => StorageCommitmentTree::load(transaction, parent) - .context("Loading storage commitment tree")?, - None => StorageCommitmentTree::empty(transaction), - } - .with_verify_hashes(verify_hashes); - - let (send, recv) = std::sync::mpsc::channel(); - - rayon::scope(|s| { - s.spawn(|_| { - let result: Result, _> = state_update - .contract_updates - .par_iter() - .map_init( - || storage.clone().connection(), - |connection, (contract_address, update)| { - let connection = match connection { - Ok(connection) => connection, - Err(e) => anyhow::bail!( - "Failed to create database connection in rayon thread: {}", - e - ), - }; - let transaction = connection.transaction()?; - update_contract_state( - **contract_address, - update.storage, - *update.nonce, - update.class.as_ref().map(|x| x.class_hash()), - &transaction, - verify_hashes, - block, - ) - }, - ) - .collect(); - let _ = send.send(result); - }) - }); - - let contract_update_results = recv.recv().context("Panic on rayon thread")??; - - for contract_update_result in contract_update_results.into_iter() { - storage_commitment_tree - .set( - contract_update_result.contract_address, - contract_update_result.state_hash, - ) - .context("Updating storage commitment tree")?; - contract_update_result - .insert(block, transaction) - .context("Inserting contract update result")?; - } - - for (contract, update) in state_update.system_contract_updates { - let update_result = update_contract_state( - *contract, - update.storage, - None, - None, - transaction, - verify_hashes, - block, - ) - .context("Update system contract state")?; - - storage_commitment_tree - .set(*contract, update_result.state_hash) - .context("Updating system contract storage commitment tree")?; - - update_result - .insert(block, transaction) - .context("Persisting system contract trie updates")?; - } - - // Apply storage commitment tree changes. - let (storage_commitment, trie_update) = storage_commitment_tree - .commit() - .context("Apply storage commitment tree updates")?; - - let root_idx = transaction - .insert_storage_trie(&trie_update, block) - .context("Persisting storage trie")?; - - transaction - .insert_storage_root(block, root_idx) - .context("Inserting storage root index")?; - - // Add new Sierra classes to class commitment tree. - let mut class_commitment_tree = match block.parent() { - Some(parent) => ClassCommitmentTree::load(transaction, parent) - .context("Loading class commitment tree")?, - None => ClassCommitmentTree::empty(transaction), - } - .with_verify_hashes(verify_hashes); - - for (sierra, casm) in state_update.declared_sierra_classes { - let leaf_hash = pathfinder_common::calculate_class_commitment_leaf_hash(*casm); - - transaction - .insert_class_commitment_leaf(block, &leaf_hash, casm) - .context("Adding class commitment leaf")?; - - class_commitment_tree - .set(*sierra, leaf_hash) - .context("Update class commitment tree")?; - } - - // Apply all class commitment tree changes. - let (class_commitment, trie_update) = class_commitment_tree - .commit() - .context("Apply class commitment tree updates")?; - - let class_root_idx = transaction - .insert_class_trie(&trie_update, block) - .context("Persisting class trie")?; - - transaction - .insert_class_root(block, class_root_idx) - .context("Inserting class root index")?; - - Ok((storage_commitment, class_commitment)) -} - #[cfg(test)] mod tests { use std::sync::Arc; diff --git a/crates/pathfinder/src/sync.rs b/crates/pathfinder/src/sync.rs index 0f20ec2695..da688c8cd2 100644 --- a/crates/pathfinder/src/sync.rs +++ b/crates/pathfinder/src/sync.rs @@ -329,6 +329,7 @@ mod tests { use pathfinder_crypto::signature::ecdsa_sign; use pathfinder_crypto::Felt; use pathfinder_ethereum::EthereumClient; + use pathfinder_merkle_tree::starknet_state::update_starknet_state; use pathfinder_storage::fake::{generate, Block, Config}; use pathfinder_storage::{Storage, StorageBuilder}; use rand::Rng; @@ -345,7 +346,6 @@ mod tests { compute_final_hash, BlockHeaderData, }; - use crate::state::update_starknet_state; /// Generate a fake chain of blocks as in /// [`pathfinder_storage::fake::generate`] but with additional diff --git a/crates/pathfinder/src/sync/checkpoint.rs b/crates/pathfinder/src/sync/checkpoint.rs index 2a76e12685..740427f8e2 100644 --- a/crates/pathfinder/src/sync/checkpoint.rs +++ b/crates/pathfinder/src/sync/checkpoint.rs @@ -1236,12 +1236,12 @@ mod tests { use pathfinder_common::transaction::DeployTransactionV0; use pathfinder_common::TransactionHash; use pathfinder_crypto::Felt; + use pathfinder_merkle_tree::starknet_state::update_starknet_state; use pathfinder_storage::fake::{self as fake_storage, Block, Config}; use pathfinder_storage::StorageBuilder; use super::super::handle_state_diff_stream; use super::*; - use crate::state::update_starknet_state; struct Setup { pub streamed_state_diffs: Vec>, diff --git a/crates/pathfinder/src/sync/state_updates.rs b/crates/pathfinder/src/sync/state_updates.rs index 26ba3b2e8f..8c4254cc78 100644 --- a/crates/pathfinder/src/sync/state_updates.rs +++ b/crates/pathfinder/src/sync/state_updates.rs @@ -29,6 +29,7 @@ use pathfinder_common::{ StorageCommitment, }; use pathfinder_merkle_tree::contract_state::ContractStateUpdateResult; +use pathfinder_merkle_tree::starknet_state::update_starknet_state; use pathfinder_merkle_tree::StorageCommitmentTree; use pathfinder_storage::{Storage, TrieUpdate}; use tokio::sync::mpsc; @@ -36,7 +37,6 @@ use tokio::task::spawn_blocking; use tokio_stream::wrappers::ReceiverStream; use super::storage_adapters; -use crate::state::update_starknet_state; use crate::sync::error::SyncError; use crate::sync::stream::ProcessStage; diff --git a/crates/pathfinder/src/sync/track.rs b/crates/pathfinder/src/sync/track.rs index 0b69f909ee..88c4bb9582 100644 --- a/crates/pathfinder/src/sync/track.rs +++ b/crates/pathfinder/src/sync/track.rs @@ -40,13 +40,13 @@ use pathfinder_common::{ TransactionCommitment, TransactionHash, }; +use pathfinder_merkle_tree::starknet_state::update_starknet_state; use pathfinder_storage::Storage; use starknet_gateway_client::GatewayApi; use tokio_stream::wrappers::ReceiverStream; use super::class_definitions::CompiledClass; use super::{state_updates, transactions}; -use crate::state::update_starknet_state; use crate::sync::class_definitions::{self, ClassWithLayout}; use crate::sync::error::SyncError; use crate::sync::stream::{ProcessStage, SyncReceiver, SyncResult};