Skip to content

Commit

Permalink
refactor(state): move update_starknet_state to pathfinder_merkle_tree
Browse files Browse the repository at this point in the history
  • Loading branch information
CHr15F0x committed Dec 5, 2024
1 parent a88f518 commit ca85d64
Show file tree
Hide file tree
Showing 10 changed files with 153 additions and 154 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/merkle-tree/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ pathfinder-common = { path = "../common" }
pathfinder-crypto = { path = "../crypto" }
pathfinder-storage = { path = "../storage" }
rand = { workspace = true }
rayon = { workspace = true }
starknet-gateway-types = { path = "../gateway-types" }
thiserror = { workspace = true }
tracing = { workspace = true }
Expand Down
1 change: 1 addition & 0 deletions crates/merkle-tree/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
pub mod contract_state;
pub mod merkle_node;
pub mod starknet_state;
pub mod storage;
pub mod tree;

Expand Down
143 changes: 143 additions & 0 deletions crates/merkle-tree/src/starknet_state.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
use anyhow::Context;
use pathfinder_common::state_update::StateUpdateRef;
use pathfinder_common::{BlockNumber, ClassCommitment, StorageCommitment};
use pathfinder_storage::{Storage, Transaction};

use crate::contract_state::update_contract_state;
use crate::{ClassCommitmentTree, StorageCommitmentTree};

pub fn update_starknet_state(
transaction: &Transaction<'_>,
state_update: StateUpdateRef<'_>,
verify_hashes: bool,
block: BlockNumber,
// we need this so that we can create extra read-only transactions for
// parallel contract state updates
storage: Storage,
) -> anyhow::Result<(StorageCommitment, ClassCommitment)> {
use rayon::prelude::*;

let mut storage_commitment_tree = match block.parent() {
Some(parent) => StorageCommitmentTree::load(transaction, parent)
.context("Loading storage commitment tree")?,
None => StorageCommitmentTree::empty(transaction),
}
.with_verify_hashes(verify_hashes);

let (send, recv) = std::sync::mpsc::channel();

rayon::scope(|s| {
s.spawn(|_| {
let result: Result<Vec<_>, _> = state_update
.contract_updates
.par_iter()
.map_init(
|| storage.clone().connection(),
|connection, (contract_address, update)| {
let connection = match connection {
Ok(connection) => connection,
Err(e) => anyhow::bail!(
"Failed to create database connection in rayon thread: {}",
e
),
};
let transaction = connection.transaction()?;
update_contract_state(
**contract_address,
update.storage,
*update.nonce,
update.class.as_ref().map(|x| x.class_hash()),
&transaction,
verify_hashes,
block,
)
},
)
.collect();
let _ = send.send(result);
})
});

let contract_update_results = recv.recv().context("Panic on rayon thread")??;

for contract_update_result in contract_update_results.into_iter() {
storage_commitment_tree
.set(
contract_update_result.contract_address,
contract_update_result.state_hash,
)
.context("Updating storage commitment tree")?;
contract_update_result
.insert(block, transaction)
.context("Inserting contract update result")?;
}

for (contract, update) in state_update.system_contract_updates {
let update_result = update_contract_state(
*contract,
update.storage,
None,
None,
transaction,
verify_hashes,
block,
)
.context("Update system contract state")?;

storage_commitment_tree
.set(*contract, update_result.state_hash)
.context("Updating system contract storage commitment tree")?;

update_result
.insert(block, transaction)
.context("Persisting system contract trie updates")?;
}

// Apply storage commitment tree changes.
let (storage_commitment, trie_update) = storage_commitment_tree
.commit()
.context("Apply storage commitment tree updates")?;

let root_idx = transaction
.insert_storage_trie(&trie_update, block)
.context("Persisting storage trie")?;

transaction
.insert_storage_root(block, root_idx)
.context("Inserting storage root index")?;

// Add new Sierra classes to class commitment tree.
let mut class_commitment_tree = match block.parent() {
Some(parent) => ClassCommitmentTree::load(transaction, parent)
.context("Loading class commitment tree")?,
None => ClassCommitmentTree::empty(transaction),
}
.with_verify_hashes(verify_hashes);

for (sierra, casm) in state_update.declared_sierra_classes {
let leaf_hash = pathfinder_common::calculate_class_commitment_leaf_hash(*casm);

transaction
.insert_class_commitment_leaf(block, &leaf_hash, casm)
.context("Adding class commitment leaf")?;

class_commitment_tree
.set(*sierra, leaf_hash)
.context("Update class commitment tree")?;
}

// Apply all class commitment tree changes.
let (class_commitment, trie_update) = class_commitment_tree
.commit()
.context("Apply class commitment tree updates")?;

let class_root_idx = transaction
.insert_class_trie(&trie_update, block)
.context("Persisting class trie")?;

transaction
.insert_class_root(block, class_root_idx)
.context("Inserting class root index")?;

Ok((storage_commitment, class_commitment))
}
11 changes: 1 addition & 10 deletions crates/pathfinder/src/state.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,4 @@
pub mod block_hash;
mod sync;

pub use sync::{
l1,
l2,
revert,
sync,
update_starknet_state,
Gossiper,
SyncContext,
RESET_DELAY_ON_FAILURE,
};
pub use sync::{l1, l2, revert, sync, Gossiper, SyncContext, RESET_DELAY_ON_FAILURE};
142 changes: 2 additions & 140 deletions crates/pathfinder/src/state/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ use std::time::{Duration, Instant};

use anyhow::Context;
use pathfinder_common::prelude::*;
use pathfinder_common::state_update::StateUpdateRef;
use pathfinder_common::{
BlockCommitmentSignature,
Chain,
Expand All @@ -20,11 +19,10 @@ use pathfinder_common::{
};
use pathfinder_crypto::Felt;
use pathfinder_ethereum::{EthereumApi, EthereumStateUpdate};
use pathfinder_merkle_tree::contract_state::update_contract_state;
use pathfinder_merkle_tree::{ClassCommitmentTree, StorageCommitmentTree};
use pathfinder_merkle_tree::starknet_state::update_starknet_state;
use pathfinder_rpc::types::syncing::{self, NumberedBlock, Syncing};
use pathfinder_rpc::{Notifications, PendingData, Reorg, SyncState, TopicBroadcasters};
use pathfinder_storage::{Connection, Storage, Transaction, TransactionBehavior};
use pathfinder_storage::{Connection, Storage, TransactionBehavior};
use primitive_types::H160;
use starknet_gateway_client::GatewayApi;
use starknet_gateway_types::reply::{Block, PendingBlock};
Expand Down Expand Up @@ -1126,142 +1124,6 @@ async fn l2_reorg(
})
}

pub fn update_starknet_state(
transaction: &Transaction<'_>,
state_update: StateUpdateRef<'_>,
verify_hashes: bool,
block: BlockNumber,
// we need this so that we can create extra read-only transactions for
// parallel contract state updates
storage: Storage,
) -> anyhow::Result<(StorageCommitment, ClassCommitment)> {
use rayon::prelude::*;

let mut storage_commitment_tree = match block.parent() {
Some(parent) => StorageCommitmentTree::load(transaction, parent)
.context("Loading storage commitment tree")?,
None => StorageCommitmentTree::empty(transaction),
}
.with_verify_hashes(verify_hashes);

let (send, recv) = std::sync::mpsc::channel();

rayon::scope(|s| {
s.spawn(|_| {
let result: Result<Vec<_>, _> = state_update
.contract_updates
.par_iter()
.map_init(
|| storage.clone().connection(),
|connection, (contract_address, update)| {
let connection = match connection {
Ok(connection) => connection,
Err(e) => anyhow::bail!(
"Failed to create database connection in rayon thread: {}",
e
),
};
let transaction = connection.transaction()?;
update_contract_state(
**contract_address,
update.storage,
*update.nonce,
update.class.as_ref().map(|x| x.class_hash()),
&transaction,
verify_hashes,
block,
)
},
)
.collect();
let _ = send.send(result);
})
});

let contract_update_results = recv.recv().context("Panic on rayon thread")??;

for contract_update_result in contract_update_results.into_iter() {
storage_commitment_tree
.set(
contract_update_result.contract_address,
contract_update_result.state_hash,
)
.context("Updating storage commitment tree")?;
contract_update_result
.insert(block, transaction)
.context("Inserting contract update result")?;
}

for (contract, update) in state_update.system_contract_updates {
let update_result = update_contract_state(
*contract,
update.storage,
None,
None,
transaction,
verify_hashes,
block,
)
.context("Update system contract state")?;

storage_commitment_tree
.set(*contract, update_result.state_hash)
.context("Updating system contract storage commitment tree")?;

update_result
.insert(block, transaction)
.context("Persisting system contract trie updates")?;
}

// Apply storage commitment tree changes.
let (storage_commitment, trie_update) = storage_commitment_tree
.commit()
.context("Apply storage commitment tree updates")?;

let root_idx = transaction
.insert_storage_trie(&trie_update, block)
.context("Persisting storage trie")?;

transaction
.insert_storage_root(block, root_idx)
.context("Inserting storage root index")?;

// Add new Sierra classes to class commitment tree.
let mut class_commitment_tree = match block.parent() {
Some(parent) => ClassCommitmentTree::load(transaction, parent)
.context("Loading class commitment tree")?,
None => ClassCommitmentTree::empty(transaction),
}
.with_verify_hashes(verify_hashes);

for (sierra, casm) in state_update.declared_sierra_classes {
let leaf_hash = pathfinder_common::calculate_class_commitment_leaf_hash(*casm);

transaction
.insert_class_commitment_leaf(block, &leaf_hash, casm)
.context("Adding class commitment leaf")?;

class_commitment_tree
.set(*sierra, leaf_hash)
.context("Update class commitment tree")?;
}

// Apply all class commitment tree changes.
let (class_commitment, trie_update) = class_commitment_tree
.commit()
.context("Apply class commitment tree updates")?;

let class_root_idx = transaction
.insert_class_trie(&trie_update, block)
.context("Persisting class trie")?;

transaction
.insert_class_root(block, class_root_idx)
.context("Inserting class root index")?;

Ok((storage_commitment, class_commitment))
}

#[cfg(test)]
mod tests {
use std::sync::Arc;
Expand Down
2 changes: 1 addition & 1 deletion crates/pathfinder/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,7 @@ mod tests {
use pathfinder_crypto::signature::ecdsa_sign;
use pathfinder_crypto::Felt;
use pathfinder_ethereum::EthereumClient;
use pathfinder_merkle_tree::starknet_state::update_starknet_state;
use pathfinder_storage::fake::{generate, Block, Config};
use pathfinder_storage::{Storage, StorageBuilder};
use rand::Rng;
Expand All @@ -345,7 +346,6 @@ mod tests {
compute_final_hash,
BlockHeaderData,
};
use crate::state::update_starknet_state;

/// Generate a fake chain of blocks as in
/// [`pathfinder_storage::fake::generate`] but with additional
Expand Down
2 changes: 1 addition & 1 deletion crates/pathfinder/src/sync/checkpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1236,12 +1236,12 @@ mod tests {
use pathfinder_common::transaction::DeployTransactionV0;
use pathfinder_common::TransactionHash;
use pathfinder_crypto::Felt;
use pathfinder_merkle_tree::starknet_state::update_starknet_state;
use pathfinder_storage::fake::{self as fake_storage, Block, Config};
use pathfinder_storage::StorageBuilder;

use super::super::handle_state_diff_stream;
use super::*;
use crate::state::update_starknet_state;

struct Setup {
pub streamed_state_diffs: Vec<StreamItem<(StateUpdateData, BlockNumber)>>,
Expand Down
2 changes: 1 addition & 1 deletion crates/pathfinder/src/sync/state_updates.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,14 @@ use pathfinder_common::{
StorageCommitment,
};
use pathfinder_merkle_tree::contract_state::ContractStateUpdateResult;
use pathfinder_merkle_tree::starknet_state::update_starknet_state;
use pathfinder_merkle_tree::StorageCommitmentTree;
use pathfinder_storage::{Storage, TrieUpdate};
use tokio::sync::mpsc;
use tokio::task::spawn_blocking;
use tokio_stream::wrappers::ReceiverStream;

use super::storage_adapters;
use crate::state::update_starknet_state;
use crate::sync::error::SyncError;
use crate::sync::stream::ProcessStage;

Expand Down
Loading

0 comments on commit ca85d64

Please sign in to comment.