diff --git a/applications/tari_indexer/src/dry_run/processor.rs b/applications/tari_indexer/src/dry_run/processor.rs index 52f1423c5..b15d8fc7e 100644 --- a/applications/tari_indexer/src/dry_run/processor.rs +++ b/applications/tari_indexer/src/dry_run/processor.rs @@ -251,7 +251,7 @@ where TSubstateCache: SubstateCache + 'static for (epoch, public_key) in claim_instructions { let vn = self .epoch_manager - .get_validator_node_by_public_key(epoch, &public_key) + .get_validator_node_by_public_key(epoch, public_key.clone()) .await?; let address = VirtualSubstateId::UnclaimedValidatorFee { epoch: epoch.as_u64(), diff --git a/applications/tari_swarm_daemon/src/webserver/rpc/instances.rs b/applications/tari_swarm_daemon/src/webserver/rpc/instances.rs index 054167539..521603049 100644 --- a/applications/tari_swarm_daemon/src/webserver/rpc/instances.rs +++ b/applications/tari_swarm_daemon/src/webserver/rpc/instances.rs @@ -8,6 +8,27 @@ use serde::{Deserialize, Serialize}; use crate::{config::InstanceType, process_manager::InstanceId, webserver::context::HandlerContext}; +#[derive(Debug, Clone, Deserialize)] +pub struct StartAllRequest { + instance_type: Option, +} + +#[derive(Debug, Clone, Serialize)] +pub struct StartAllResponse { + pub num_instances: u32, +} + +pub async fn start_all(context: &HandlerContext, req: StartAllRequest) -> Result { + let instances = context.process_manager().list_instances(req.instance_type).await?; + + let num_instances = instances.len() as u32; + for instance in instances { + context.process_manager().start_instance(instance.id).await?; + } + + Ok(StartAllResponse { num_instances }) +} + pub type StartInstanceRequest = String; #[derive(Debug, Clone, Serialize)] @@ -65,6 +86,27 @@ pub async fn stop(context: &HandlerContext, req: StopInstanceRequest) -> Result< Ok(StopInstanceResponse { success: true }) } +#[derive(Debug, Clone, Deserialize)] +pub struct StopAllRequest { + instance_type: Option, +} + +#[derive(Debug, Clone, Serialize)] +pub struct StopAllResponse { + pub num_instances: u32, +} + +pub async fn stop_all(context: &HandlerContext, req: StopAllRequest) -> Result { + let instances = context.process_manager().list_instances(req.instance_type).await?; + + let num_instances = instances.len() as u32; + for instance in instances { + context.process_manager().stop_instance(instance.id).await?; + } + + Ok(StopAllResponse { num_instances }) +} + #[derive(Debug, Clone, Deserialize)] pub struct ListInstancesRequest { pub by_type: Option, diff --git a/applications/tari_swarm_daemon/src/webserver/server.rs b/applications/tari_swarm_daemon/src/webserver/server.rs index 044535969..c01c8c31a 100644 --- a/applications/tari_swarm_daemon/src/webserver/server.rs +++ b/applications/tari_swarm_daemon/src/webserver/server.rs @@ -111,7 +111,9 @@ async fn json_rpc_handler(Extension(context): Extension>, va "add_indexer" => call_handler(context, value, rpc::indexers::create).await, "add_validator_node" => call_handler(context, value, rpc::validator_nodes::create).await, "start" => call_handler(context, value, rpc::instances::start).await, + "start_all" => call_handler(context, value, rpc::instances::start_all).await, "stop" => call_handler(context, value, rpc::instances::stop).await, + "stop_all" => call_handler(context, value, rpc::instances::stop_all).await, "list_instances" => call_handler(context, value, rpc::instances::list).await, "delete_data" => call_handler(context, value, rpc::instances::delete_data).await, "burn_funds" => call_handler(context, value, 
rpc::minotari_wallets::burn_funds).await,
diff --git a/applications/tari_swarm_daemon/webui/src/routes/Main.tsx b/applications/tari_swarm_daemon/webui/src/routes/Main.tsx
index 0b8957817..3f1715b4d 100644
--- a/applications/tari_swarm_daemon/webui/src/routes/Main.tsx
+++ b/applications/tari_swarm_daemon/webui/src/routes/Main.tsx
@@ -173,7 +173,7 @@ function ExtraInfoVN({ name, url, addTxToPool, autoRefresh, state, horizontal }:
   }
   return (<>
-          Pool transaction
+          Pool transactions {pool.length}
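The `start_all` / `stop_all` handlers and their routing above, together with the UI buttons wired up below, boil down to two new JSON-RPC methods that take an optional `instance_type` filter and return how many instances were affected. A minimal client-side sketch of invoking one of them from Rust — the endpoint URL is hypothetical and the example assumes `reqwest` (with the `json` feature), `serde_json`, and `tokio` are available:

```rust
use serde_json::json;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Hypothetical address; the real one comes from the swarm daemon's webserver config.
    let endpoint = "http://127.0.0.1:8080/json_rpc";

    // JSON-RPC 2.0 envelope for the new `stop_all` method. The params mirror the
    // web UI call: jsonRpc("stop_all", { instance_type: "TariValidatorNode" }).
    let request = json!({
        "jsonrpc": "2.0",
        "id": 1,
        "method": "stop_all",
        "params": { "instance_type": "TariValidatorNode" }
    });

    let response: serde_json::Value = reqwest::Client::new()
        .post(endpoint)
        .json(&request)
        .send()
        .await?
        .json()
        .await?;

    // StopAllResponse serializes to { "num_instances": <u32> }, so with a standard
    // JSON-RPC envelope the count is expected under "result".
    println!("stopped {} instance(s)", response["result"]["num_instances"]);
    Ok(())
}
```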
@@ -493,8 +493,19 @@ export default function Main() { console.log("resp", resp); }); }; + + const stopAll = () => { + jsonRpc("stop_all", { instance_type: "TariValidatorNode" }).then(getInfo); + }; + + const startAll = () => { + jsonRpc("start_all", { instance_type: "TariValidatorNode" }).then(getInfo); + }; + return (
+ + diff --git a/applications/tari_validator_node/src/bootstrap.rs b/applications/tari_validator_node/src/bootstrap.rs index 8b673a2f8..ef844873e 100644 --- a/applications/tari_validator_node/src/bootstrap.rs +++ b/applications/tari_validator_node/src/bootstrap.rs @@ -352,6 +352,7 @@ pub async fn spawn_services( state_store.clone(), mempool.clone(), virtual_substate_manager, + consensus_handle.clone(), ) .await?; // Save final node identity after comms has initialized. This is required because the public_address can be @@ -447,6 +448,7 @@ async fn spawn_p2p_rpc( shard_store_store: SqliteStateStore, mempool: MempoolHandle, virtual_substate_manager: VirtualSubstateManager, EpochManagerHandle>, + consensus: ConsensusHandle, ) -> anyhow::Result<()> { let rpc_server = RpcServer::builder() .with_maximum_simultaneous_sessions(config.validator_node.rpc.max_simultaneous_sessions) @@ -457,6 +459,7 @@ async fn spawn_p2p_rpc( shard_store_store, mempool, virtual_substate_manager, + consensus, )); let (notify_tx, notify_rx) = mpsc::unbounded_channel(); diff --git a/applications/tari_validator_node/src/consensus/handle.rs b/applications/tari_validator_node/src/consensus/handle.rs index 1acfc8f85..c46ce0d1f 100644 --- a/applications/tari_validator_node/src/consensus/handle.rs +++ b/applications/tari_validator_node/src/consensus/handle.rs @@ -2,6 +2,7 @@ // SPDX-License-Identifier: BSD-3-Clause use tari_consensus::hotstuff::{ConsensusCurrentState, CurrentView, HotstuffEvent}; +use tari_dan_common_types::Epoch; use tari_transaction::Transaction; use tokio::sync::{broadcast, mpsc, watch}; @@ -30,6 +31,10 @@ impl ConsensusHandle { } } + pub fn current_epoch(&self) -> Epoch { + self.current_view.get_epoch() + } + pub async fn notify_new_transaction( &self, transaction: Transaction, diff --git a/applications/tari_validator_node/src/p2p/rpc/mod.rs b/applications/tari_validator_node/src/p2p/rpc/mod.rs index 98a283a64..88e9629cf 100644 --- a/applications/tari_validator_node/src/p2p/rpc/mod.rs +++ b/applications/tari_validator_node/src/p2p/rpc/mod.rs @@ -30,18 +30,24 @@ use tari_epoch_manager::base_layer::EpochManagerHandle; use tari_state_store_sqlite::SqliteStateStore; use tari_validator_node_rpc::rpc_service::ValidatorNodeRpcServer; -use crate::{p2p::services::mempool::MempoolHandle, virtual_substate::VirtualSubstateManager}; +use crate::{ + consensus::ConsensusHandle, + p2p::services::mempool::MempoolHandle, + virtual_substate::VirtualSubstateManager, +}; pub fn create_tari_validator_node_rpc_service( epoch_manager: EpochManagerHandle, shard_store_store: SqliteStateStore, mempool: MempoolHandle, virtual_substate_manager: VirtualSubstateManager, EpochManagerHandle>, + consensus: ConsensusHandle, ) -> ValidatorNodeRpcServer { ValidatorNodeRpcServer::new(ValidatorNodeRpcServiceImpl::new( epoch_manager, shard_store_store, mempool, virtual_substate_manager, + consensus, )) } diff --git a/applications/tari_validator_node/src/p2p/rpc/service_impl.rs b/applications/tari_validator_node/src/p2p/rpc/service_impl.rs index 8b190cbbb..9e1af4097 100644 --- a/applications/tari_validator_node/src/p2p/rpc/service_impl.rs +++ b/applications/tari_validator_node/src/p2p/rpc/service_impl.rs @@ -66,6 +66,7 @@ use tari_validator_node_rpc::rpc_service::ValidatorNodeRpcService; use tokio::{sync::mpsc, task}; use crate::{ + consensus::ConsensusHandle, p2p::{ rpc::{block_sync_task::BlockSyncTask, state_sync_task::StateSyncTask}, services::mempool::MempoolHandle, @@ -80,6 +81,7 @@ pub struct ValidatorNodeRpcServiceImpl { shard_state_store: 
SqliteStateStore, mempool: MempoolHandle, virtual_substate_manager: VirtualSubstateManager, EpochManagerHandle>, + consensus: ConsensusHandle, } impl ValidatorNodeRpcServiceImpl { @@ -91,12 +93,14 @@ impl ValidatorNodeRpcServiceImpl { SqliteStateStore, EpochManagerHandle, >, + consensus: ConsensusHandle, ) -> Self { Self { epoch_manager, shard_state_store, mempool, virtual_substate_manager, + consensus, } } } @@ -340,11 +344,7 @@ impl ValidatorNodeRpcService for ValidatorNodeRpcServiceImpl { request: Request, ) -> Result, RpcStatus> { let msg = request.into_message(); - let current_epoch = self - .epoch_manager - .current_epoch() - .await - .map_err(RpcStatus::log_internal_error(LOG_TARGET))?; + let current_epoch = self.consensus.current_epoch(); if msg.current_epoch != current_epoch { // This may occur if one of the nodes has not fully scanned the base layer return Err(RpcStatus::bad_request(format!( diff --git a/dan_layer/common_types/src/committee.rs b/dan_layer/common_types/src/committee.rs index 6624bfa3d..b2fddc899 100644 --- a/dan_layer/common_types/src/committee.rs +++ b/dan_layer/common_types/src/committee.rs @@ -259,18 +259,6 @@ impl CommitteeInfo { .into_iter() .filter(|substate_address| self.includes_substate_address(substate_address.borrow())) } - - /// Calculates the number of distinct shard groups for the given addresses - pub fn count_distinct_shard_groups, I: IntoIterator>( - &self, - addresses: I, - ) -> usize { - addresses - .into_iter() - .map(|addr| addr.borrow().to_shard_group(self.num_shards, self.num_committees)) - .collect::>() - .len() - } } #[derive(Debug, Clone, Serialize)] diff --git a/dan_layer/common_types/src/versioned_substate_id.rs b/dan_layer/common_types/src/versioned_substate_id.rs index a27e26b18..702a030f4 100644 --- a/dan_layer/common_types/src/versioned_substate_id.rs +++ b/dan_layer/common_types/src/versioned_substate_id.rs @@ -59,6 +59,10 @@ impl SubstateRequirement { .map(|v| SubstateAddress::from_substate_id(self.substate_id(), v)) } + pub fn to_substate_address_zero_version(&self) -> SubstateAddress { + SubstateAddress::from_substate_id(self.substate_id(), 0) + } + /// Calculates and returns the shard number that this SubstateAddress belongs. /// A shard is a fixed division of the 256-bit shard space. /// If the substate version is not known, None is returned. @@ -118,7 +122,7 @@ impl Display for SubstateRequirement { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self.version { Some(v) => write!(f, "{}:{}", self.substate_id, v), - None => write!(f, "{}", self.substate_id), + None => write!(f, "{}:?", self.substate_id), } } } @@ -180,16 +184,6 @@ impl VersionedSubstateId { self.version } - /// Calculates and returns the shard number that this SubstateAddress belongs. - /// A shard is an equal division of the 256-bit shard space. 
- pub fn to_shard(&self, num_shards: NumPreshards) -> Shard { - self.to_substate_address().to_shard(num_shards) - } - - pub fn to_shard_group(&self, num_shards: NumPreshards, num_committees: u32) -> ShardGroup { - self.to_substate_address().to_shard_group(num_shards, num_committees) - } - pub fn to_previous_version(&self) -> Option { self.version .checked_sub(1) diff --git a/dan_layer/consensus/src/block_validations.rs b/dan_layer/consensus/src/block_validations.rs index 506682bb9..120f2a058 100644 --- a/dan_layer/consensus/src/block_validations.rs +++ b/dan_layer/consensus/src/block_validations.rs @@ -27,7 +27,7 @@ pub async fn check_proposal( check_sidechain_id(block, config)?; check_hash_and_height(block)?; let committee_for_block = epoch_manager - .get_committee_by_validator_public_key(block.epoch(), block.proposed_by()) + .get_committee_by_validator_public_key(block.epoch(), block.proposed_by().clone()) .await?; check_proposed_by_leader(leader_strategy, &committee_for_block, block)?; check_signature(block)?; @@ -181,7 +181,7 @@ pub async fn check_quorum_certificate( let mut vns = vec![]; for signature in qc.signatures() { let vn = epoch_manager - .get_validator_node_by_public_key(qc.epoch(), signature.public_key()) + .get_validator_node_by_public_key(qc.epoch(), signature.public_key().clone()) .await?; let committee_info = epoch_manager .get_committee_info_for_substate(qc.epoch(), vn.shard_key) @@ -209,7 +209,8 @@ pub async fn check_quorum_certificate( qc.signatures() .first() .ok_or::(ProposalValidationError::QuorumWasNotReached { qc: qc.clone() }.into())? - .public_key(), + .public_key() + .clone(), ) .await?; diff --git a/dan_layer/consensus/src/hotstuff/block_change_set.rs b/dan_layer/consensus/src/hotstuff/block_change_set.rs index f6b54e193..2a9a9ce92 100644 --- a/dan_layer/consensus/src/hotstuff/block_change_set.rs +++ b/dan_layer/consensus/src/hotstuff/block_change_set.rs @@ -46,6 +46,13 @@ use crate::tracing::TraceTimer; const LOG_TARGET: &str = "tari::dan::consensus::block_change_set"; +const MEM_MAX_BLOCK_DIFF_CHANGES: usize = 10000; +const MEM_MAX_STATE_TREE_DIFF_SIZE: usize = 1000; +const MEM_MAX_SUBSTATE_LOCK_SIZE: usize = 100000; +const MEM_MAX_TRANSACTION_CHANGE_SIZE: usize = 1000; +const MEM_MAX_PROPOSED_FOREIGN_PROPOSALS_SIZE: usize = 1000; +const MEM_MAX_PROPOSED_UTXO_MINTS_SIZE: usize = 1000; + #[derive(Debug, Clone)] pub struct BlockDecision { pub quorum_decision: Option, @@ -56,6 +63,12 @@ pub struct BlockDecision { pub end_of_epoch: Option, } +impl BlockDecision { + pub fn is_accept(&self) -> bool { + matches!(self.quorum_decision, Some(QuorumDecision::Accept)) + } +} + #[derive(Debug, Clone)] pub struct ProposedBlockChangeSet { block: LeafBlock, @@ -84,20 +97,84 @@ impl ProposedBlockChangeSet { } } + pub fn set_block(&mut self, block: LeafBlock) -> &mut Self { + self.block = block; + self + } + pub fn no_vote(&mut self, no_vote_reason: NoVoteReason) -> &mut Self { + self.clear(); self.no_vote_reason = Some(no_vote_reason); - // This means no vote - self.quorum_decision = None; - // The remaining info discarded (not strictly necessary) - self.block_diff = Vec::new(); - self.transaction_changes = IndexMap::new(); - self.state_tree_diffs = IndexMap::new(); - self.substate_locks = IndexMap::new(); - self.proposed_foreign_proposals = Vec::new(); - self.proposed_utxo_mints = Vec::new(); self } + pub fn clear(&mut self) { + self.quorum_decision = None; + + self.block_diff.clear(); + if self.block_diff.capacity() > MEM_MAX_BLOCK_DIFF_CHANGES { + debug!( + target: 
LOG_TARGET, + "Shrinking block_diff from {} to {}", + self.block_diff.capacity(), + MEM_MAX_BLOCK_DIFF_CHANGES + ); + self.block_diff.shrink_to(MEM_MAX_BLOCK_DIFF_CHANGES); + } + self.transaction_changes.clear(); + if self.transaction_changes.capacity() > MEM_MAX_TRANSACTION_CHANGE_SIZE { + debug!( + target: LOG_TARGET, + "Shrinking transaction_changes from {} to {}", + self.transaction_changes.capacity(), + MEM_MAX_TRANSACTION_CHANGE_SIZE + ); + self.transaction_changes.shrink_to(MEM_MAX_TRANSACTION_CHANGE_SIZE); + } + self.state_tree_diffs.clear(); + if self.state_tree_diffs.capacity() > MEM_MAX_STATE_TREE_DIFF_SIZE { + debug!( + target: LOG_TARGET, + "Shrinking state_tree_diffs from {} to {}", + self.state_tree_diffs.capacity(), + MEM_MAX_STATE_TREE_DIFF_SIZE + ); + self.state_tree_diffs.shrink_to(MEM_MAX_STATE_TREE_DIFF_SIZE); + } + self.substate_locks.clear(); + if self.substate_locks.capacity() > MEM_MAX_SUBSTATE_LOCK_SIZE { + debug!( + target: LOG_TARGET, + "Shrinking substate_locks from {} to {}", + self.substate_locks.capacity(), + MEM_MAX_SUBSTATE_LOCK_SIZE + ); + self.substate_locks.shrink_to(MEM_MAX_SUBSTATE_LOCK_SIZE); + } + self.proposed_foreign_proposals.clear(); + if self.proposed_foreign_proposals.capacity() > MEM_MAX_PROPOSED_FOREIGN_PROPOSALS_SIZE { + debug!( + target: LOG_TARGET, + "Shrinking proposed_foreign_proposals from {} to {}", + self.proposed_foreign_proposals.capacity(), + MEM_MAX_PROPOSED_FOREIGN_PROPOSALS_SIZE + ); + self.proposed_foreign_proposals + .shrink_to(MEM_MAX_PROPOSED_FOREIGN_PROPOSALS_SIZE); + } + self.proposed_utxo_mints.clear(); + if self.proposed_utxo_mints.capacity() > MEM_MAX_PROPOSED_UTXO_MINTS_SIZE { + debug!( + target: LOG_TARGET, + "Shrinking proposed_utxo_mints from {} to {}", + self.proposed_utxo_mints.capacity(), + MEM_MAX_PROPOSED_UTXO_MINTS_SIZE + ); + self.proposed_utxo_mints.shrink_to(MEM_MAX_PROPOSED_UTXO_MINTS_SIZE); + } + self.no_vote_reason = None; + } + pub fn set_state_tree_diffs(&mut self, diffs: IndexMap) -> &mut Self { self.state_tree_diffs = diffs; self @@ -222,21 +299,21 @@ impl ProposedBlockChangeSet { .entry(*transaction.transaction_id()) .or_default(); - let ready_now = transaction.is_ready_for_next_stage(); + let ready_now = transaction.is_ready_for_pending_stage(); change_mut.next_update = Some(TransactionPoolStatusUpdate::new(transaction, ready_now)); Ok(self) } } impl ProposedBlockChangeSet { - pub fn save(self, tx: &mut TTx) -> Result<(), StorageError> + pub fn save(&self, tx: &mut TTx) -> Result<(), StorageError> where TTx: StateStoreWriteTransaction + Deref, TTx::Target: StateStoreReadTransaction, { - if let Some(reason) = self.no_vote_reason { + if let Some(ref reason) = self.no_vote_reason { warn!(target: LOG_TARGET, "❌ No vote: {}", reason); - if let Err(err) = tx.diagnostics_add_no_vote(self.block.block_id, reason) { + if let Err(err) = tx.diagnostics_add_no_vote(self.block.block_id, reason.clone()) { error!(target: LOG_TARGET, "Failed to save no vote reason: {}", err); } // No vote @@ -244,20 +321,18 @@ impl ProposedBlockChangeSet { } let _timer = TraceTimer::debug(LOG_TARGET, "ProposedBlockChangeSet::save"); - let block_diff = BlockDiff::new(self.block.block_id, self.block_diff); // Store the block diff - block_diff.insert(tx)?; + BlockDiff::insert_record(tx, &self.block.block_id, &self.block_diff)?; // Store the tree diffs for each effected shard - let shard_tree_diffs = self.state_tree_diffs; - for (shard, diff) in shard_tree_diffs { - PendingShardStateTreeDiff::create(tx, *self.block.block_id(), shard, 
diff)?; + for (shard, diff) in &self.state_tree_diffs { + PendingShardStateTreeDiff::create(tx, *self.block.block_id(), *shard, diff)?; } // Save locks - SubstateRecord::lock_all(tx, self.block.block_id, self.substate_locks)?; + SubstateRecord::lock_all(tx, &self.block.block_id, &self.substate_locks)?; - for (transaction_id, change) in self.transaction_changes { + for (transaction_id, change) in &self.transaction_changes { // Save any transaction executions for the block if let Some(ref execution) = change.execution { // This may already exist if we proposed the block @@ -283,17 +358,17 @@ impl ProposedBlockChangeSet { update.insert_for_block(tx, self.block.block_id())?; } - for (shard_group, pledges) in change.foreign_pledges { - tx.foreign_substate_pledges_save(transaction_id, shard_group, pledges)?; + for (shard_group, pledges) in &change.foreign_pledges { + tx.foreign_substate_pledges_save(transaction_id, *shard_group, pledges)?; } } - for block_id in self.proposed_foreign_proposals { - ForeignProposal::set_proposed_in(tx, &block_id, &self.block.block_id)?; + for block_id in &self.proposed_foreign_proposals { + ForeignProposal::set_proposed_in(tx, block_id, &self.block.block_id)?; } - for mint in self.proposed_utxo_mints { - BurntUtxo::set_proposed_in_block(tx, &mint, &self.block.block_id)? + for mint in &self.proposed_utxo_mints { + BurntUtxo::set_proposed_in_block(tx, mint, &self.block.block_id)? } Ok(()) @@ -347,3 +422,38 @@ impl TransactionChangeSet { } } } + +#[cfg(test)] +mod tests { + use std::mem::size_of; + + use super::*; + + #[test] + fn check_max_mem_usage() { + let sz = size_of::(); + eprintln!("ProposedBlockChangeSet: {}", sz); + const TARGET_MAX_MEM_USAGE: usize = 22_112_000; + let mem_block_diff = size_of::() * MEM_MAX_BLOCK_DIFF_CHANGES; + eprintln!("mem_block_diff: {}MiB", mem_block_diff / 1024 / 1024); + let mem_state_tree_diffs = + size_of::() * size_of::() * MEM_MAX_STATE_TREE_DIFF_SIZE; + eprintln!("mem_state_tree_diffs: {}", mem_state_tree_diffs); + let mem_substate_locks = (size_of::() + size_of::()) * MEM_MAX_SUBSTATE_LOCK_SIZE; + eprintln!("mem_substate_locks: {}", mem_substate_locks); + let mem_transaction_changes = + (size_of::() + size_of::()) * MEM_MAX_TRANSACTION_CHANGE_SIZE; + eprintln!("mem_transaction_changes: {}", mem_transaction_changes); + let mem_proposed_foreign_proposals = size_of::() * MEM_MAX_PROPOSED_FOREIGN_PROPOSALS_SIZE; + eprintln!("mem_proposed_foreign_proposals: {}", mem_proposed_foreign_proposals); + let mem_proposed_utxo_mints = size_of::() * MEM_MAX_PROPOSED_UTXO_MINTS_SIZE; + eprintln!("mem_proposed_utxo_mints: {}", mem_proposed_utxo_mints); + let total_mem = mem_block_diff + + mem_state_tree_diffs + + mem_substate_locks + + mem_transaction_changes + + mem_proposed_foreign_proposals + + mem_proposed_utxo_mints; + assert_eq!(total_mem, TARGET_MAX_MEM_USAGE); + } +} diff --git a/dan_layer/consensus/src/hotstuff/common.rs b/dan_layer/consensus/src/hotstuff/common.rs index 592ab3af1..503dbc1d1 100644 --- a/dan_layer/consensus/src/hotstuff/common.rs +++ b/dan_layer/consensus/src/hotstuff/common.rs @@ -21,6 +21,7 @@ use tari_dan_common_types::{ use tari_dan_storage::{ consensus_models::{ Block, + BlockId, EpochCheckpoint, LeafBlock, PendingShardStateTreeDiff, @@ -86,28 +87,35 @@ pub fn calculate_last_dummy_block>( - candidate_block: &Block, - justify_block: &Block, + network: Network, + epoch: Epoch, + shard_group: ShardGroup, + high_qc: &QuorumCertificate, + expected_parent_block_id: &BlockId, + parent_merkle_root: FixedHash, + 
new_height: NodeHeight, leader_strategy: &TLeaderStrategy, local_committee: &Committee, + parent_timestamp: u64, + parent_base_layer_block_height: u64, + parent_base_layer_block_hash: FixedHash, ) -> Vec { let mut dummies = Vec::new(); with_dummy_blocks( - candidate_block.network(), - candidate_block.epoch(), - candidate_block.shard_group(), - candidate_block.justify(), - *justify_block.merkle_root(), - candidate_block.height(), + network, + epoch, + shard_group, + high_qc, + parent_merkle_root, + new_height, leader_strategy, local_committee, - justify_block.timestamp(), - justify_block.base_layer_block_height(), - *justify_block.base_layer_block_hash(), + parent_timestamp, + parent_base_layer_block_height, + parent_base_layer_block_hash, |dummy_block| { - if dummy_block.id() == candidate_block.parent() { + if dummy_block.id() == expected_parent_block_id { dummies.push(dummy_block); ControlFlow::Break(()) } else { @@ -120,6 +128,29 @@ pub fn calculate_dummy_blocks>( + candidate_block: &Block, + justify_block: &Block, + leader_strategy: &TLeaderStrategy, + local_committee: &Committee, +) -> Vec { + calculate_dummy_blocks( + candidate_block.network(), + candidate_block.epoch(), + candidate_block.shard_group(), + candidate_block.justify(), + candidate_block.parent(), + *justify_block.merkle_root(), + candidate_block.height(), + leader_strategy, + local_committee, + justify_block.timestamp(), + justify_block.base_layer_block_height(), + *justify_block.base_layer_block_hash(), + ) +} + fn with_dummy_blocks( network: Network, epoch: Epoch, @@ -139,8 +170,8 @@ fn with_dummy_blocks( F: FnMut(Block) -> ControlFlow<()>, { let mut parent_block = high_qc.as_leaf_block(); - let mut current_height = high_qc.block_height() + NodeHeight(1); - if current_height > new_height { + let mut current_height = high_qc.block_height(); + if current_height >= new_height { error!( target: LOG_TARGET, "BUG: 🍼 no dummy blocks to calculate. 
current height {} is greater than new height {}", @@ -158,7 +189,12 @@ fn with_dummy_blocks( new_height, ); loop { + if current_height == new_height { + break; + } + current_height += NodeHeight(1); let leader = leader_strategy.get_leader_public_key(local_committee, current_height); + let dummy_block = Block::dummy_block( network, *parent_block.block_id(), @@ -182,11 +218,6 @@ fn with_dummy_blocks( if callback(dummy_block).is_break() { break; } - - if current_height == new_height { - break; - } - current_height += NodeHeight(1); } } diff --git a/dan_layer/consensus/src/hotstuff/foreign_proposal_processor.rs b/dan_layer/consensus/src/hotstuff/foreign_proposal_processor.rs index f2823f416..395e1b784 100644 --- a/dan_layer/consensus/src/hotstuff/foreign_proposal_processor.rs +++ b/dan_layer/consensus/src/hotstuff/foreign_proposal_processor.rs @@ -46,7 +46,7 @@ pub fn process_foreign_block( ); info!( target: LOG_TARGET, - "🧩 Processing FOREIGN PROPOSAL for block {}, justify_qc: {}", + "🧩 Processing FOREIGN PROPOSAL {}, justify_qc: {}", proposal.block(), proposal.justify_qc(), ); @@ -62,7 +62,11 @@ pub fn process_foreign_block( for cmd in block.commands() { match cmd { Command::LocalPrepare(atom) => { - if !local_committee_info.includes_any_address(atom.evidence.substate_addresses_iter()) { + if atom + .evidence + .shard_groups_iter() + .all(|sg| *sg != local_committee_info.shard_group()) + { debug!( target: LOG_TARGET, "🧩 FOREIGN PROPOSAL: Command: LocalPrepare({}, {}), block: {} not relevant to local committee", @@ -213,7 +217,11 @@ pub fn process_foreign_block( } }, Command::LocalAccept(atom) => { - if !local_committee_info.includes_any_address(atom.evidence.substate_addresses_iter()) { + if atom + .evidence + .shard_groups_iter() + .all(|sg| *sg != local_committee_info.shard_group()) + { continue; } @@ -325,7 +333,7 @@ pub fn process_foreign_block( } proposed_block_change_set.set_next_transaction_update(tx_rec)?; } - } else if tx_rec.current_stage().is_local_prepared() && tx_rec.is_ready_for_next_stage() { + } else if tx_rec.current_stage().is_local_prepared() && tx_rec.is_ready_for_pending_stage() { info!( target: LOG_TARGET, "🧩 FOREIGN PROPOSAL: Transaction is ready for propose ALL_PREPARED({}, {}) Local Stage: {}", @@ -336,7 +344,7 @@ pub fn process_foreign_block( tx_rec.set_next_stage(TransactionPoolStage::LocalPrepared)?; proposed_block_change_set.set_next_transaction_update(tx_rec)?; - } else if tx_rec.current_stage().is_local_accepted() && tx_rec.is_ready_for_next_stage() { + } else if tx_rec.current_stage().is_local_accepted() && tx_rec.is_ready_for_pending_stage() { info!( target: LOG_TARGET, "🧩 FOREIGN PROPOSAL: Transaction is ready for propose ALL_ACCEPT({}, {}) Local Stage: {}", diff --git a/dan_layer/consensus/src/hotstuff/on_catch_up_sync.rs b/dan_layer/consensus/src/hotstuff/on_catch_up_sync.rs index f4f7a1e2b..d3ba7899f 100644 --- a/dan_layer/consensus/src/hotstuff/on_catch_up_sync.rs +++ b/dan_layer/consensus/src/hotstuff/on_catch_up_sync.rs @@ -32,7 +32,7 @@ impl OnCatchUpSync { } } - pub async fn request_sync(&mut self, epoch: Epoch, from: &TConsensusSpec::Addr) -> Result<(), HotStuffError> { + pub async fn request_sync(&mut self, epoch: Epoch, from: TConsensusSpec::Addr) -> Result<(), HotStuffError> { let high_qc = self.store.with_read_tx(|tx| HighQc::get(tx, epoch))?; info!( target: LOG_TARGET, @@ -41,6 +41,7 @@ impl OnCatchUpSync { from, self.pacemaker.current_view() ); + // Reset leader timeout to previous height since we're behind and need to process catch up blocks. 
This is the // only case where the view is non-monotonic. TODO: is this correct? self.pacemaker @@ -51,8 +52,8 @@ impl OnCatchUpSync { if self .outbound_messaging .send( - from.clone(), - HotstuffMessage::CatchUpSyncRequest(SyncRequestMessage { epoch, high_qc }), + from, + HotstuffMessage::CatchUpSyncRequest(SyncRequestMessage { high_qc }), ) .await .is_err() diff --git a/dan_layer/consensus/src/hotstuff/on_message_validate.rs b/dan_layer/consensus/src/hotstuff/on_message_validate.rs index f8791c010..4b7e33993 100644 --- a/dan_layer/consensus/src/hotstuff/on_message_validate.rs +++ b/dan_layer/consensus/src/hotstuff/on_message_validate.rs @@ -162,46 +162,40 @@ impl OnMessageValidate { .await } - pub fn update_local_parked_blocks( + pub fn update_local_parked_blocks<'a, I: IntoIterator + ExactSizeIterator>( &self, current_height: NodeHeight, - transaction_id: &TransactionId, - ) -> Result, HotStuffError> { - let maybe_unparked_block = self - .store - .with_write_tx(|tx| tx.missing_transactions_remove(current_height, transaction_id))?; - - let Some((unparked_block, foreign_proposals)) = maybe_unparked_block else { - return Ok(None); - }; - - info!(target: LOG_TARGET, "♻️ all transactions for block {unparked_block} are ready for consensus"); - - let _ignore = self.tx_events.send(HotstuffEvent::ParkedBlockReady { - block: unparked_block.as_leaf_block(), - }); - - Ok(Some(ProposalMessage { - block: unparked_block, - foreign_proposals, - })) - } - - pub fn update_foreign_parked_blocks( - &self, - transaction_id: &TransactionId, - ) -> Result, HotStuffError> { - let unparked_foreign_blocks = self - .store - .with_write_tx(|tx| ForeignParkedProposal::remove_by_transaction_id(tx, transaction_id))?; - - if unparked_foreign_blocks.is_empty() { - return Ok(vec![]); - }; - - info!(target: LOG_TARGET, "♻️ all transactions for {} foreign block(s) are ready for consensus", unparked_foreign_blocks.len()); + transaction_ids: I, + ) -> Result<(Vec, Vec), HotStuffError> { + let _timer = TraceTimer::debug(LOG_TARGET, "update_local_parked_blocks").with_iterations(transaction_ids.len()); + self.store.with_write_tx(|tx| { + // TODO(perf) + let mut unparked_blocks = Vec::new(); + let mut foreign_unparked_blocks = Vec::new(); + for transaction_id in transaction_ids { + if let Some((unparked_block, foreign_proposals)) = + tx.missing_transactions_remove(current_height, transaction_id)? + { + info!(target: LOG_TARGET, "♻️ all transactions for local block {unparked_block} are ready for consensus"); + + let _ignore = self.tx_events.send(HotstuffEvent::ParkedBlockReady { + block: unparked_block.as_leaf_block(), + }); + + unparked_blocks.push(ProposalMessage { + block: unparked_block, + foreign_proposals, + }); + } - Ok(unparked_foreign_blocks) + let foreign_unparked = ForeignParkedProposal::remove_by_transaction_id(tx, transaction_id)?; + if !foreign_unparked.is_empty() { + info!(target: LOG_TARGET, "♻️ all transactions for {} foreign block(s) are ready for consensus", foreign_unparked.len()); + foreign_unparked_blocks.extend(foreign_unparked.into_iter().map(Into::into)); + } + } + Ok((unparked_blocks, foreign_unparked_blocks)) + }) } async fn check_proposal(&self, block: &Block) -> Result<(), HotStuffError> { @@ -365,8 +359,9 @@ impl OnMessageValidate { ); let parked_block = ForeignParkedProposal::from(msg); - parked_block.insert(tx)?; - parked_block.add_missing_transactions(tx, &missing_tx_ids)?; + if parked_block.save(tx)? 
{ + parked_block.add_missing_transactions(tx, &missing_tx_ids)?; + } Ok(MessageValidationResult::ParkedProposal { block_id: *parked_block.block().id(), diff --git a/dan_layer/consensus/src/hotstuff/on_propose.rs b/dan_layer/consensus/src/hotstuff/on_propose.rs index ecfcb42c8..874d0975f 100644 --- a/dan_layer/consensus/src/hotstuff/on_propose.rs +++ b/dan_layer/consensus/src/hotstuff/on_propose.rs @@ -129,6 +129,7 @@ where TConsensusSpec: ConsensusSpec is_newview_propose: bool, propose_epoch_end: bool, ) -> Result<(), HotStuffError> { + let _timer = TraceTimer::info(LOG_TARGET, "OnPropose"); if let Some(last_proposed) = self.store.with_read_tx(|tx| LastProposed::get(tx)).optional()? { if last_proposed.epoch == leaf_block.epoch && last_proposed.height > leaf_block.height { // is_newview_propose means that a NEWVIEW has reached quorum and nodes are expecting us to propose. @@ -179,6 +180,13 @@ where TConsensusSpec: ConsensusSpec let high_qc = HighQc::get(&**tx, epoch)?; let high_qc_cert = high_qc.get_quorum_certificate(&**tx)?; + info!( + target: LOG_TARGET, + "🌿 PROPOSE local block with parent {}. HighQC: {}", + leaf_block, + high_qc_cert, + ); + let next_block = on_propose.build_next_block( tx, epoch, @@ -302,7 +310,7 @@ where TConsensusSpec: ConsensusSpec // Leader thinks that all local nodes agree that all shard groups have prepared, we are ready to accept // locally TransactionPoolStage::AllPrepared => Ok(Some(Command::LocalAccept( - self.get_transaction_atom_with_leader_fee(local_committee_info, &mut tx_rec)?, + self.get_transaction_atom_with_leader_fee(&mut tx_rec)?, ))), // Leader thinks local nodes are ready to accept an ABORT TransactionPoolStage::SomePrepared => Ok(Some(Command::LocalAccept(tx_rec.get_current_transaction_atom()))), @@ -373,7 +381,9 @@ where TConsensusSpec: ConsensusSpec high_qc.qc_id ); - change_set.set_next_transaction_update(pool_tx)?; + if cmd.is_local_prepare() || cmd.is_local_accept() { + change_set.set_next_transaction_update(pool_tx)?; + } } Ok(()) @@ -396,19 +406,24 @@ where TConsensusSpec: ConsensusSpec // TODO: Configure const TARGET_BLOCK_SIZE: usize = 500; + let justifies_parent = high_qc_certificate.block_id() == parent_block.block_id(); let next_height = parent_block.height() + NodeHeight(1); + let start_of_chain_id = if justifies_parent || high_qc_certificate.is_zero() { + // Parent is justified - we can include its state in the MR calc, foreign propose etc + parent_block.block_id() + } else { + // Parent is not justified which means we have dummy blocks between the parent and the justified block so we + // can exclude them from the query. Also note that the query will fail if we used the parent + // block id, since the dummy blocks do not exist yet. + high_qc_certificate.block_id() + }; let mut total_leader_fee = 0; let foreign_proposals = if propose_epoch_end { vec![] } else { - ForeignProposal::get_all_new( - tx, - base_layer_block_height, - parent_block.block_id(), - TARGET_BLOCK_SIZE / 4, - )? + ForeignProposal::get_all_new(tx, base_layer_block_height, start_of_chain_id, TARGET_BLOCK_SIZE / 4)? }; if !foreign_proposals.is_empty() { @@ -425,7 +440,7 @@ where TConsensusSpec: ConsensusSpec TARGET_BLOCK_SIZE .checked_sub(foreign_proposals.len() * 4) .filter(|n| *n > 0) - .map(|size| BurntUtxo::get_all_unproposed(tx, parent_block.block_id(), size)) + .map(|size| BurntUtxo::get_all_unproposed(tx, start_of_chain_id, size)) .transpose()? 
.unwrap_or_default() }; @@ -443,7 +458,7 @@ where TConsensusSpec: ConsensusSpec // Each foreign proposal is "heavier" than a transaction command .checked_sub(foreign_proposals.len() * 4 + burnt_utxos.len()) .filter(|n| *n > 0) - .map(|size| self.transaction_pool.get_batch_for_next_block(tx, size)) + .map(|size| self.transaction_pool.get_batch_for_next_block(tx, size, parent_block.block_id())) .transpose()? .unwrap_or_default() }; @@ -498,6 +513,12 @@ where TConsensusSpec: ConsensusSpec } } + debug!( + target: LOG_TARGET, + "🌿 PROPOSE: {} (or less) transaction(s), {} foreign proposal(s), {} UTXOs for next block (justifies_parent = {})", + batch.len(), foreign_proposals.len() , burnt_utxos.len(), justifies_parent + ); + // batch is empty for is_empty, is_epoch_end and is_epoch_start blocks let mut substate_store = PendingSubstateStore::new(tx, *parent_block.block_id(), self.config.num_preshards); let mut executed_transactions = HashMap::new(); @@ -526,7 +547,7 @@ where TConsensusSpec: ConsensusSpec } timer.done(); - // This relies on the UTXO commands being ordered last + // This relies on the UTXO commands being ordered after transaction commands for utxo in burnt_utxos { let id = VersionedSubstateId::new(utxo.substate_id.clone(), 0); let shard = id.to_substate_address().to_shard(local_committee_info.num_preshards()); @@ -548,8 +569,8 @@ where TConsensusSpec: ConsensusSpec ); let timer = TraceTimer::info(LOG_TARGET, "Propose calculate state root"); - let pending_tree_diffs = - PendingShardStateTreeDiff::get_all_up_to_commit_block(tx, high_qc_certificate.block_id())?; + + let pending_tree_diffs = PendingShardStateTreeDiff::get_all_up_to_commit_block(tx, start_of_chain_id)?; let (state_root, _) = calculate_state_merkle_root( tx, @@ -727,15 +748,14 @@ where TConsensusSpec: ConsensusSpec // foreign inputs/outputs. tx_rec.set_local_decision(Decision::Commit); // Set partial evidence using local inputs and known outputs. 
- tx_rec.set_evidence(multishard.to_initial_evidence( - local_committee_info.num_preshards(), - local_committee_info.num_committees(), - )); + tx_rec + .evidence_mut() + .update(&multishard.to_initial_evidence(local_committee_info)); } }, Decision::Abort => { // CASE: The transaction was ABORTed due to a lock conflict - let execution = multishard.into_execution().expect("Abort should have execution"); + let execution = multishard.into_execution().expect("Abort must have execution"); tx_rec.update_from_execution( local_committee_info.num_preshards(), local_committee_info.num_committees(), @@ -849,18 +869,16 @@ where TConsensusSpec: ConsensusSpec *tx_rec.transaction_id(), &filter_diff_for_committee(local_committee_info, diff), )?; - let atom = self.get_transaction_atom_with_leader_fee(local_committee_info, tx_rec)?; + let atom = self.get_transaction_atom_with_leader_fee(tx_rec)?; Ok(Some(Command::AllAccept(atom))) } fn get_transaction_atom_with_leader_fee( &self, - local_committee_info: &CommitteeInfo, tx_rec: &mut TransactionPoolRecord, ) -> Result { if tx_rec.current_decision().is_commit() { - let num_involved_shard_groups = - local_committee_info.count_distinct_shard_groups(tx_rec.evidence().substate_addresses_iter()); + let num_involved_shard_groups = tx_rec.evidence().num_shard_groups(); let involved = NonZeroU64::new(num_involved_shard_groups as u64).ok_or_else(|| { HotStuffError::InvariantError(format!( "PROPOSE: Transaction {} involves zero shard groups", diff --git a/dan_layer/consensus/src/hotstuff/on_ready_to_vote_on_local_block.rs b/dan_layer/consensus/src/hotstuff/on_ready_to_vote_on_local_block.rs index 45aab9d6b..47985da61 100644 --- a/dan_layer/consensus/src/hotstuff/on_ready_to_vote_on_local_block.rs +++ b/dan_layer/consensus/src/hotstuff/on_ready_to_vote_on_local_block.rs @@ -9,6 +9,7 @@ use tari_dan_common_types::{ committee::CommitteeInfo, optional::Optional, Epoch, + ShardGroup, ToSubstateAddress, VersionedSubstateId, }; @@ -108,7 +109,8 @@ where TConsensusSpec: ConsensusSpec valid_block: &ValidBlock, local_committee_info: &CommitteeInfo, can_propose_epoch_end: bool, - foreign_committee_infos: HashMap, + foreign_committee_infos: HashMap, + change_set: &mut ProposedBlockChangeSet, ) -> Result { let _timer = TraceTimer::info(LOG_TARGET, "Decide on local block").with_iterations(valid_block.block().commands().len()); @@ -118,12 +120,11 @@ where TConsensusSpec: ConsensusSpec valid_block, ); - self.store.with_write_tx(|tx| { - let mut change_set = ProposedBlockChangeSet::new(valid_block.block().as_leaf_block()); + let block_decision = self.store.with_write_tx(|tx| { let mut justified_block = valid_block.justify().get_block(&**tx)?; // This comes before decide so that all evidence can be in place before LocalPrepare and LocalAccept if !justified_block.is_justified() { - self.process_newly_justified_block(tx, &justified_block, local_committee_info, &mut change_set)?; + self.process_newly_justified_block(tx, &justified_block, local_committee_info, change_set)?; justified_block.set_as_justified(tx)?; } @@ -133,7 +134,7 @@ where TConsensusSpec: ConsensusSpec valid_block, can_propose_epoch_end, &foreign_committee_infos, - &mut change_set, + change_set, )?; let mut locked_blocks = Vec::new(); @@ -181,7 +182,9 @@ where TConsensusSpec: ConsensusSpec finalized_transactions, end_of_epoch, }) - }) + })?; + + Ok(block_decision) } fn process_newly_justified_block( @@ -191,8 +194,7 @@ where TConsensusSpec: ConsensusSpec local_committee_info: &CommitteeInfo, change_set: &mut 
ProposedBlockChangeSet, ) -> Result<(), HotStuffError> { - let _timer = TraceTimer::info(LOG_TARGET, "Process newly justified block") - .with_iterations(new_leaf_block.commands().len()); + let timer = TraceTimer::info(LOG_TARGET, "Process newly justified block"); let locked_block = LockedBlock::get(tx, new_leaf_block.epoch())?; info!( target: LOG_TARGET, @@ -200,6 +202,7 @@ where TConsensusSpec: ConsensusSpec new_leaf_block, ); + let mut num_applicable_commands = 0; let leaf = new_leaf_block.as_leaf_block(); let justify_id = *new_leaf_block.justify().id(); for cmd in new_leaf_block.commands() { @@ -207,6 +210,8 @@ where TConsensusSpec: ConsensusSpec continue; } + num_applicable_commands += 1; + let atom = cmd.transaction().expect("Command must be a transaction"); let Some(mut pool_tx) = change_set.get_transaction(tx, &locked_block, &leaf, atom.id())? else { @@ -226,18 +231,22 @@ where TConsensusSpec: ConsensusSpec } if !pool_tx.is_ready() { - if pool_tx.current_stage().is_local_prepared() && pool_tx.is_ready_for_next_stage() { + if pool_tx.current_stage().is_local_prepared() && pool_tx.is_ready_for_pending_stage() { pool_tx.set_next_stage(TransactionPoolStage::LocalPrepared)?; - } else if pool_tx.current_stage().is_local_accepted() && pool_tx.is_ready_for_next_stage() { + } else if pool_tx.current_stage().is_local_accepted() && pool_tx.is_ready_for_pending_stage() { pool_tx.set_next_stage(TransactionPoolStage::LocalAccepted)?; } else { // Nothing } } - change_set.set_next_transaction_update(pool_tx)?; + if cmd.is_local_prepare() || cmd.is_local_accept() { + change_set.set_next_transaction_update(pool_tx)?; + } } + timer.with_iterations(num_applicable_commands); + Ok(()) } @@ -247,7 +256,7 @@ where TConsensusSpec: ConsensusSpec local_committee_info: &CommitteeInfo, valid_block: &ValidBlock, can_propose_epoch_end: bool, - foreign_committee_infos: &HashMap, + foreign_committee_infos: &HashMap, proposed_block_change_set: &mut ProposedBlockChangeSet, ) -> Result<(), HotStuffError> { if !self.should_vote(tx, valid_block.block())? { @@ -299,7 +308,7 @@ where TConsensusSpec: ConsensusSpec block: &Block, local_committee_info: &CommitteeInfo, can_propose_epoch_end: bool, - foreign_committee_infos: &HashMap, + foreign_committee_infos: &HashMap, proposed_block_change_set: &mut ProposedBlockChangeSet, ) -> Result<(), HotStuffError> { // Store used for transactions that have inputs without specific versions. @@ -371,14 +380,9 @@ where TConsensusSpec: ConsensusSpec } }, Command::LocalAccept(atom) => { - if let Some(reason) = self.evaluate_local_accept_command( - tx, - block, - &locked_block, - atom, - local_committee_info, - proposed_block_change_set, - )? { + if let Some(reason) = + self.evaluate_local_accept_command(tx, block, &locked_block, atom, proposed_block_change_set)? 
+ { proposed_block_change_set.no_vote(reason); return Ok(()); } @@ -407,11 +411,12 @@ where TConsensusSpec: ConsensusSpec } }, Command::ForeignProposal(fp_atom) => { - let Some(foreign_committee_info) = foreign_committee_infos.get(&fp_atom.block_id) else { + let Some(foreign_committee_info) = foreign_committee_infos.get(&fp_atom.shard_group) else { warn!( target: LOG_TARGET, - "❌ NO VOTE: ForeignProposal command in block {} but no foreign proposal found", + "❌ NO VOTE: ForeignProposal command in block {} {} but no foreign proposal found", fp_atom.block_id, + fp_atom.shard_group, ); proposed_block_change_set.no_vote(NoVoteReason::ForeignProposalCommandInBlockMissing); return Ok(()); @@ -481,7 +486,7 @@ where TConsensusSpec: ConsensusSpec return Ok(()); } - let pending = PendingShardStateTreeDiff::get_all_up_to_commit_block(tx, block.justify().block_id())?; + let pending = PendingShardStateTreeDiff::get_all_up_to_commit_block(tx, block.parent())?; let (expected_merkle_root, tree_diffs) = calculate_state_merkle_root( tx, block.shard_group(), @@ -713,7 +718,7 @@ where TConsensusSpec: ConsensusSpec info!( target: LOG_TARGET, - "πŸ‘¨β€πŸ”§ PREPARE: Executing transaction {} in block {}", + "πŸ‘¨β€πŸ”§ PREPARE: Transaction {} in block {}", tx_rec.transaction_id(), block, ); @@ -785,10 +790,9 @@ where TConsensusSpec: ConsensusSpec // foreign inputs/outputs. tx_rec.set_local_decision(Decision::Commit); // Set partial evidence for local inputs using what we know. - tx_rec.set_evidence(multishard.to_initial_evidence( - local_committee_info.num_preshards(), - local_committee_info.num_committees(), - )); + tx_rec + .evidence_mut() + .update(&multishard.to_initial_evidence(local_committee_info)); } }, Decision::Abort => { @@ -1152,7 +1156,6 @@ where TConsensusSpec: ConsensusSpec block: &Block, locked_block: &LockedBlock, atom: &TransactionAtom, - local_committee_info: &CommitteeInfo, proposed_block_change_set: &mut ProposedBlockChangeSet, ) -> Result, HotStuffError> { let Some(mut tx_rec) = @@ -1225,8 +1228,7 @@ where TConsensusSpec: ConsensusSpec // Check the leader fee in the local accept phase. The fee only applied (is added to the block fee) for // AllAccept - let num_involved_shard_groups = - local_committee_info.count_distinct_shard_groups(tx_rec.evidence().substate_addresses_iter()); + let num_involved_shard_groups = tx_rec.evidence().num_shard_groups(); let involved = NonZeroU64::new(num_involved_shard_groups as u64) .ok_or_else(|| HotStuffError::InvariantError("Number of involved shard groups is 0".to_string()))?; let calculated_leader_fee = tx_rec.calculate_leader_fee(involved, EXHAUST_DIVISOR); @@ -1481,7 +1483,7 @@ where TConsensusSpec: ConsensusSpec { warn!( target: LOG_TARGET, - "❌ NO VOTE: Foreign proposal for block {block_id} has already been proposed in this block.", + "❌ NO VOTE: Foreign proposal {block_id} has already been proposed in this block.", block_id = fp_atom.block_id, ); return Ok(Some(NoVoteReason::ForeignProposalAlreadyProposed)); @@ -1490,7 +1492,7 @@ where TConsensusSpec: ConsensusSpec let Some(fp) = fp_atom.get_proposal(tx).optional()? 
else { warn!( target: LOG_TARGET, - "❌ NO VOTE: Foreign proposal for block {block_id} has not been received.", + "❌ NO VOTE: Foreign proposal {block_id} has not been received.", block_id = fp_atom.block_id, ); return Ok(Some(NoVoteReason::ForeignProposalNotReceived)); @@ -1502,7 +1504,7 @@ where TConsensusSpec: ConsensusSpec if matches!(fp.status(), ForeignProposalStatus::Confirmed) { warn!( target: LOG_TARGET, - "❌ NO VOTE: Foreign proposal for block {block_id} has status {status}.", + "❌ NO VOTE: Foreign proposal {block_id} has status {status}.", block_id = fp_atom.block_id, status = fp.status(), ); diff --git a/dan_layer/consensus/src/hotstuff/on_receive_foreign_proposal.rs b/dan_layer/consensus/src/hotstuff/on_receive_foreign_proposal.rs index 70b2178d7..e462faaba 100644 --- a/dan_layer/consensus/src/hotstuff/on_receive_foreign_proposal.rs +++ b/dan_layer/consensus/src/hotstuff/on_receive_foreign_proposal.rs @@ -46,20 +46,6 @@ where TConsensusSpec: ConsensusSpec local_committee_info: &CommitteeInfo, ) -> Result<(), HotStuffError> { let _timer = TraceTimer::debug(LOG_TARGET, "OnReceiveForeignProposal"); - let foreign_committee_info = self - .epoch_manager - .get_committee_info_by_validator_public_key(message.block.epoch(), message.block.proposed_by()) - .await?; - self.validate_and_save(message, local_committee_info, &foreign_committee_info)?; - Ok(()) - } - - pub fn validate_and_save( - &mut self, - message: ForeignProposalMessage, - local_committee_info: &CommitteeInfo, - foreign_committee_info: &CommitteeInfo, - ) -> Result<(), HotStuffError> { let proposal = ForeignProposal::from(message); if self.store.with_read_tx(|tx| proposal.exists(tx))? { @@ -72,10 +58,24 @@ where TConsensusSpec: ConsensusSpec return Ok(()); } + let foreign_committee_info = self + .epoch_manager + .get_committee_info_by_validator_public_key(proposal.block.epoch(), proposal.block.proposed_by().clone()) + .await?; + self.store + .with_write_tx(|tx| self.validate_and_save(tx, proposal, local_committee_info, &foreign_committee_info))?; + Ok(()) + } + + pub fn validate_and_save( + &self, + tx: &mut ::WriteTransaction<'_>, + proposal: ForeignProposal, + local_committee_info: &CommitteeInfo, + foreign_committee_info: &CommitteeInfo, + ) -> Result<(), HotStuffError> { // TODO: validate justify_qc - let mut foreign_receive_counter = self - .store - .with_read_tx(|tx| ForeignReceiveCounters::get_or_default(tx))?; + let mut foreign_receive_counter = ForeignReceiveCounters::get_or_default(&**tx)?; if let Err(err) = self.validate_proposed_block( proposal.block(), @@ -107,15 +107,13 @@ where TConsensusSpec: ConsensusSpec info!( target: LOG_TARGET, - "🧩 Receive FOREIGN PROPOSAL for block {}, justify_qc: {}", + "🧩 Receive FOREIGN PROPOSAL {}, justify_qc: {}", proposal.block(), proposal.justify_qc(), ); - self.store.with_write_tx(|tx| { - foreign_receive_counter.save(tx)?; - proposal.upsert(tx, None) - })?; + foreign_receive_counter.save(tx)?; + proposal.upsert(tx, None)?; // Foreign proposals to propose self.pacemaker.on_beat(); diff --git a/dan_layer/consensus/src/hotstuff/on_receive_local_proposal.rs b/dan_layer/consensus/src/hotstuff/on_receive_local_proposal.rs index b6144fd98..d9da6128c 100644 --- a/dan_layer/consensus/src/hotstuff/on_receive_local_proposal.rs +++ b/dan_layer/consensus/src/hotstuff/on_receive_local_proposal.rs @@ -9,18 +9,10 @@ use tari_dan_common_types::{ optional::Optional, Epoch, NumPreshards, + ShardGroup, }; use tari_dan_storage::{ - consensus_models::{ - Block, - BlockId, - HighQc, - LastSentVote, 
- QuorumCertificate, - QuorumDecision, - TransactionPool, - ValidBlock, - }, + consensus_models::{Block, HighQc, LastSentVote, QuorumCertificate, QuorumDecision, TransactionPool, ValidBlock}, StateStore, }; use tari_epoch_manager::EpochManagerReader; @@ -28,7 +20,8 @@ use tokio::{sync::broadcast, task}; use crate::{ hotstuff::{ - calculate_dummy_blocks, + block_change_set::ProposedBlockChangeSet, + calculate_dummy_blocks_from_justify, create_epoch_checkpoint, error::HotStuffError, on_ready_to_vote_on_local_block::OnReadyToVoteOnLocalBlock, @@ -60,6 +53,7 @@ pub struct OnReceiveLocalProposalHandler { leader_strategy: TConsensusSpec::LeaderStrategy, pacemaker: PaceMakerHandle, on_ready_to_vote_on_local_block: OnReadyToVoteOnLocalBlock, + change_set: Option, outbound_messaging: TConsensusSpec::OutboundMessaging, vote_signing_service: TConsensusSpec::SignatureService, on_receive_foreign_proposal: OnReceiveForeignProposalHandler, @@ -103,6 +97,7 @@ impl OnReceiveLocalProposalHandler OnReceiveLocalProposalHandler Result<(), HotStuffError> { let _timer = TraceTimer::debug(LOG_TARGET, "OnReceiveLocalProposalHandler"); - let exists = self.store.with_read_tx(|tx| msg.block.exists(tx))?; - if exists { - info!(target: LOG_TARGET, "🧊 Block {} already exists", msg.block); + let is_justified = self + .store + .with_read_tx(|tx| Block::has_been_justified(tx, msg.block.id())) + .optional()? + .unwrap_or(false); + if is_justified { + info!(target: LOG_TARGET, "🧊 Block {} has already been processed", msg.block); return Ok(()); } + // Do not trigger leader failures while processing a proposal. // Leader failures will be resumed after the proposal has been processed. // If we vote ACCEPT for the proposal, the leader failure timer will be reset and resume, otherwise (no vote) @@ -130,41 +130,98 @@ impl OnReceiveLocalProposalHandler(Some((high_qc, valid_block))) + })?; + + let Some((high_qc, valid_block)) = maybe_high_qc_and_block else { + // Validation failed, this is already logged so we can exit here + return Ok(()); + }; + // First validate and save the attached foreign proposals - let mut foreign_committees = HashMap::with_capacity(msg.foreign_proposals.len()); - for foreign_proposal in msg.foreign_proposals { - let block_id = *foreign_proposal.block.id(); + let mut foreign_committees = HashMap::with_capacity(foreign_proposals.len()); + // TODO(perf): fetch committee info in single call + for foreign_proposal in &foreign_proposals { + let shard_group = foreign_proposal.block.shard_group(); + if foreign_committees.contains_key(&shard_group) { + continue; + } let foreign_committee_info = self .epoch_manager .get_committee_info_by_validator_public_key( foreign_proposal.block.epoch(), - foreign_proposal.block.proposed_by(), + foreign_proposal.block.proposed_by().clone(), ) .await?; - self.on_receive_foreign_proposal.validate_and_save( - foreign_proposal.into(), - local_committee_info, - &foreign_committee_info, - )?; - foreign_committees.insert(block_id, foreign_committee_info); + foreign_committees.insert(shard_group, foreign_committee_info); } + self.store.with_write_tx(|tx| { + for foreign_proposal in foreign_proposals { + if foreign_proposal.exists(&**tx)? 
{ + // This is expected behaviour, we may receive the same foreign proposal multiple times + debug!( + target: LOG_TARGET, + "FOREIGN PROPOSAL: Already received proposal for block {}", + foreign_proposal.block().id(), + ); + + continue; + } + let shard_group = foreign_proposal.block().shard_group(); + + self.on_receive_foreign_proposal.validate_and_save( + tx, + foreign_proposal, + local_committee_info, + foreign_committees.get(&shard_group).unwrap(), + )?; + } + Ok::<_, HotStuffError>(()) + })?; + let result = self - .process_block(current_epoch, local_committee_info, msg.block, foreign_committees) + .process_block( + current_epoch, + local_committee_info, + valid_block, + high_qc, + foreign_committees, + ) .await; - if let Err(err) = self.pacemaker.resume_leader_failure().await { - error!(target: LOG_TARGET, "Error resuming leader failure: {:?}", err); - } - match result { Ok(()) => Ok(()), - Err(err @ HotStuffError::ProposalValidationError(_)) => { - self.hooks.on_block_validation_failed(&err); + Err(err) => { + if let Err(err) = self.pacemaker.resume_leader_failure().await { + error!(target: LOG_TARGET, "Error resuming leader failure: {:?}", err); + } + if matches!(err, HotStuffError::ProposalValidationError(_)) { + self.hooks.on_block_validation_failed(&err); + } Err(err) }, - Err(err) => Err(err), } } @@ -173,36 +230,25 @@ impl OnReceiveLocalProposalHandler, + valid_block: ValidBlock, + high_qc: HighQc, + foreign_committees: HashMap, ) -> Result<(), HotStuffError> { - let local_committee = self - .epoch_manager - .get_committee_by_validator_public_key(block.epoch(), block.proposed_by()) - .await?; - - let maybe_high_qc_and_block = self.store.with_write_tx(|tx| { - let Some(valid_block) = self.validate_block_header(&*tx, block, &local_committee, local_committee_info)? - else { - return Ok(None); - }; - - // Save the block as soon as it is valid to ensure we have a valid pacemaker height. - let high_qc = self.save_block(tx, &valid_block)?; - info!(target: LOG_TARGET, "βœ… Block {} is valid and persisted. HighQc({})", valid_block, high_qc); - Ok::<_, HotStuffError>(Some((high_qc, valid_block))) - })?; - - let Some((high_qc, valid_block)) = maybe_high_qc_and_block else { - // Validation failed, this is already logged so we can exit here - return Ok(()); - }; - let em_epoch = self.epoch_manager.current_epoch().await?; let can_propose_epoch_end = em_epoch > current_epoch; + let is_epoch_end = valid_block.block().is_epoch_end(); let mut on_ready_to_vote_on_local_block = self.on_ready_to_vote_on_local_block.clone(); - let (block_decision, valid_block) = task::spawn_blocking({ + // Reusing the change set allocated memory. 
+ let mut change_set = self + .change_set + .take() + .map(|mut c| { + c.set_block(valid_block.block().as_leaf_block()); + c + }) + .unwrap_or_else(|| ProposedBlockChangeSet::new(valid_block.block().as_leaf_block())); + let (block_decision, valid_block, mut change_set) = task::spawn_blocking({ // Move into task let local_committee_info = *local_committee_info; move || { @@ -211,12 +257,17 @@ impl OnReceiveLocalProposalHandler((decision, valid_block)) + Ok::<_, HotStuffError>((decision, valid_block, change_set)) } }) .await??; + change_set.clear(); + self.change_set = Some(change_set); + + let is_accept_decision = block_decision.is_accept(); if let Some(decision) = block_decision.quorum_decision { self.pacemaker .update_view(valid_block.epoch(), valid_block.height(), high_qc.block_height()) @@ -234,6 +285,8 @@ impl OnReceiveLocalProposalHandler OnReceiveLocalProposalHandler OnReceiveLocalProposalHandler OnReceiveLocalProposalHandler OnReceiveLocalProposalHandler) { + fn propose_newly_locked_blocks( + &mut self, + local_committee_info: CommitteeInfo, + blocks: Vec<(Block, QuorumCertificate)>, + ) { if blocks.is_empty() { return; } @@ -339,6 +400,7 @@ impl OnReceiveLocalProposalHandler OnReceiveLocalProposalHandler OnReceiveLocalProposalHandler OnReceiveLocalProposalHandler, _local_committee_info: &CommitteeInfo, ) -> Result { - if Block::has_been_processed(tx, candidate_block.id())? { + if Block::has_been_justified(tx, candidate_block.id())? { return Err(ProposalValidationError::BlockAlreadyProcessed { block_id: *candidate_block.id(), height: candidate_block.height(), @@ -535,6 +597,8 @@ impl OnReceiveLocalProposalHandler OnReceiveLocalProposalHandler OnReceiveLocalProposalHandler( outbound_messaging: TConsensusSpec::OutboundMessaging, store: TConsensusSpec::StateStore, num_preshards: NumPreshards, + local_committee_info: CommitteeInfo, blocks: Vec<(Block, QuorumCertificate)>, ) { let _timer = TraceTimer::debug(LOG_TARGET, "ProposeNewlyLockedBlocks").with_iterations(blocks.len()); @@ -638,6 +705,7 @@ async fn propose_newly_locked_blocks_task( outbound_messaging, store, num_preshards, + &local_committee_info, blocks, ) .await @@ -652,6 +720,7 @@ async fn propose_newly_locked_blocks_task_inner( mut outbound_messaging: TConsensusSpec::OutboundMessaging, store: TConsensusSpec::StateStore, num_preshards: NumPreshards, + local_committee_info: &CommitteeInfo, blocks: Vec<(Block, QuorumCertificate)>, ) -> Result<(), HotStuffError> { for (block, justify_qc) in blocks.into_iter().rev() { @@ -665,7 +734,7 @@ async fn propose_newly_locked_blocks_task_inner( }; let local_committee = epoch_manager - .get_committee_by_validator_public_key(block.epoch(), block.proposed_by()) + .get_committee_by_validator_public_key(block.epoch(), block.proposed_by().clone()) .await?; let leader_index = leader_strategy.calculate_leader(&local_committee, block.height()) as usize; let my_index = local_committee @@ -689,6 +758,7 @@ async fn propose_newly_locked_blocks_task_inner( &epoch_manager, &store, num_preshards, + local_committee_info, block, justify_qc, ) @@ -703,6 +773,7 @@ async fn broadcast_foreign_proposal_if_required( epoch_manager: &TConsensusSpec::EpochManager, store: &TConsensusSpec::StateStore, num_preshards: NumPreshards, + _local_committee_info: &CommitteeInfo, block: Block, justify_qc: QuorumCertificate, ) -> Result<(), HotStuffError> { @@ -713,9 +784,14 @@ async fn broadcast_foreign_proposal_if_required( let non_local_shard_groups = block .commands() .iter() - .filter_map(|c| c.local_prepare().or_else(|| 
c.local_accept())) - .flat_map(|p| p.evidence.substate_addresses_iter()) - .map(|addr| addr.to_shard_group(num_preshards, num_committees)) + .filter_map(|c| { + c.local_prepare() + // No need to broadcast LocalPrepare if the committee is output only + // FIXME: this breaks free_coins transaction (see transaction_generator) + // .filter(|atom| !atom.evidence.is_committee_output_only(local_committee_info)) + .or_else(|| c.local_accept()) + }) + .flat_map(|p| p.evidence.shard_groups_iter().copied()) .filter(|shard_group| local_shard_group != *shard_group) .collect::>(); if non_local_shard_groups.is_empty() { @@ -723,18 +799,13 @@ async fn broadcast_foreign_proposal_if_required( } info!( target: LOG_TARGET, - "🌿 PROPOSING new locked block {} to {} foreign shard groups. justify: {} ({}), parent: {}", + "🌐 BROADCASTING new locked block {} to {} foreign shard groups. justify: {} ({}), parent: {}", block, non_local_shard_groups.len(), justify_qc.block_id(), justify_qc.block_height(), block.parent() ); - debug!( - target: LOG_TARGET, - "non_local_shards : [{}]", - non_local_shard_groups.iter().map(|s|s.to_string()).collect::>().join(","), - ); let block_pledge = store .with_read_tx(|tx| block.get_block_pledge(tx)) @@ -751,7 +822,7 @@ async fn broadcast_foreign_proposal_if_required( for shard_group in non_local_shard_groups { info!( target: LOG_TARGET, - "🌿 FOREIGN PROPOSE: Broadcasting locked block {} with {} pledge(s) to shard group {}.", + "🌐 FOREIGN PROPOSE: Broadcasting locked block {} with {} pledge(s) to shard group {}.", &block, &block_pledge.num_substates_pledged(), shard_group, diff --git a/dan_layer/consensus/src/hotstuff/on_receive_new_transaction.rs b/dan_layer/consensus/src/hotstuff/on_receive_new_transaction.rs index 3a5ebefbc..92da44b1e 100644 --- a/dan_layer/consensus/src/hotstuff/on_receive_new_transaction.rs +++ b/dan_layer/consensus/src/hotstuff/on_receive_new_transaction.rs @@ -2,13 +2,13 @@ // SPDX-License-Identifier: BSD-3-Clause use log::*; -use tari_dan_common_types::{committee::CommitteeInfo, optional::Optional, Epoch}; +use tari_dan_common_types::{committee::CommitteeInfo, Epoch}; use tari_dan_storage::{ consensus_models::{TransactionPool, TransactionRecord}, StateStore, }; use tari_engine_types::commit_result::RejectReason; -use tari_transaction::{Transaction, TransactionId}; +use tari_transaction::TransactionId; use tokio::sync::mpsc; use crate::{ @@ -24,7 +24,7 @@ pub struct OnReceiveNewTransaction { store: TConsensusSpec::StateStore, transaction_pool: TransactionPool, executor: TConsensusSpec::TransactionExecutor, - tx_missing_transactions: mpsc::UnboundedSender, + tx_missing_transactions: mpsc::UnboundedSender>, } impl OnReceiveNewTransaction @@ -34,7 +34,7 @@ where TConsensusSpec: ConsensusSpec store: TConsensusSpec::StateStore, transaction_pool: TransactionPool, executor: TConsensusSpec::TransactionExecutor, - tx_missing_transactions: mpsc::UnboundedSender, + tx_missing_transactions: mpsc::UnboundedSender>, ) -> Self { Self { store, @@ -52,22 +52,29 @@ where TConsensusSpec: ConsensusSpec local_committee_info: &CommitteeInfo, ) -> Result<(), HotStuffError> { let _timer = TraceTimer::debug(LOG_TARGET, "OnReceiveRequestedTransactions"); - info!(target: LOG_TARGET, "Receiving {} requested transactions for block {} from {:?}", msg.transactions.len(), msg.block_id, from, ); + info!(target: LOG_TARGET, "Receiving {} requested transactions for block {} from {:?}", msg.transactions.len(), msg.block_id, from); self.store.with_write_tx(|tx| { - for transaction in 
msg.transactions { - if let Some(rec) = - self.validate_and_sequence_transaction(tx, current_epoch, transaction, local_committee_info)? + let recs = TransactionRecord::get_any_or_build(&**tx, msg.transactions)?; + let mut batch = Vec::with_capacity(recs.len()); + for transaction in recs { + if let Some(transaction_and_is_ready) = + self.validate_new_transaction(tx, current_epoch, transaction, local_committee_info)? { - // TODO: Could this cause a race-condition? Transaction could be proposed as Prepare before the - // unparked block is processed (however, if there's a parked block it's probably not our turn to - // propose). Ideally we remove this channel because it's a work around - self.tx_missing_transactions - .send(*rec.id()) - .map_err(|_| HotStuffError::InternalChannelClosed { - context: "process_requested", - })?; + batch.push(transaction_and_is_ready); } } + + self.transaction_pool + .insert_new_batched(tx, batch.iter().map(|(t, is_ready)| (t, *is_ready)))?; + + // TODO: Could this cause a race-condition? Transaction could be proposed as Prepare before the + // unparked block is processed (however, if there's a parked block it's probably not our turn to + // propose). Ideally we remove this channel because it's a work around + self.tx_missing_transactions + .send(batch.iter().map(|(t, _)| *t.id()).collect()) + .map_err(|_| HotStuffError::InternalChannelClosed { + context: "process_requested", + })?; Ok(()) }) } @@ -75,29 +82,32 @@ where TConsensusSpec: ConsensusSpec pub fn try_sequence_transaction( &mut self, current_epoch: Epoch, - transaction: Transaction, + transaction: TransactionRecord, local_committee_info: &CommitteeInfo, ) -> Result, HotStuffError> { self.store.with_write_tx(|tx| { - self.validate_and_sequence_transaction(tx, current_epoch, transaction, local_committee_info) + let Some((transaction, is_ready)) = + self.validate_new_transaction(tx, current_epoch, transaction, local_committee_info)? + else { + return Ok(None); + }; + + self.add_to_pool(tx, &transaction, is_ready)?; + Ok(Some(transaction)) }) } - fn validate_and_sequence_transaction( + fn validate_new_transaction( &self, tx: &mut <::StateStore as StateStore>::WriteTransaction<'_>, current_epoch: Epoch, - transaction: Transaction, + mut rec: TransactionRecord, local_committee_info: &CommitteeInfo, - ) -> Result, HotStuffError> { - if self.transaction_pool.exists(&**tx, transaction.id())? { + ) -> Result, HotStuffError> { + if self.transaction_pool.exists(&**tx, rec.id())? { return Ok(None); } - let mut rec = TransactionRecord::get(&**tx, transaction.id()) - .optional()? 
- .unwrap_or_else(|| TransactionRecord::new(transaction)); - // Edge case: a validator sends a transaction that is already finalized as a missing transaction or via // propagation if rec.is_finalized() { @@ -115,9 +125,9 @@ where TConsensusSpec: ConsensusSpec "Transaction {} failed validation: {}", rec.id(), err ); rec.set_abort_reason(RejectReason::InvalidTransaction(err.to_string())) - .insert(tx)?; - self.add_to_pool(tx, &rec, true)?; - return Ok(Some(rec)); + .save(tx)?; + // self.add_to_pool(tx, &rec, true)?; + return Ok(Some((rec, true))); } rec.save(tx)?; @@ -126,9 +136,8 @@ where TConsensusSpec: ConsensusSpec .includes_substate_id(&rec.to_receipt_id().into()) || rec.has_any_local_inputs(local_committee_info) || rec.has_all_foreign_input_pledges(&**tx, local_committee_info)?; - self.add_to_pool(tx, &rec, has_some_local_inputs_or_all_foreign_inputs)?; - Ok(Some(rec)) + Ok(Some((rec, has_some_local_inputs_or_all_foreign_inputs))) } fn add_to_pool( diff --git a/dan_layer/consensus/src/hotstuff/on_receive_new_view.rs b/dan_layer/consensus/src/hotstuff/on_receive_new_view.rs index 6a1afec70..3ce20f32a 100644 --- a/dan_layer/consensus/src/hotstuff/on_receive_new_view.rs +++ b/dan_layer/consensus/src/hotstuff/on_receive_new_view.rs @@ -89,17 +89,12 @@ where TConsensusSpec: ConsensusSpec let epoch = high_qc.epoch(); debug!( target: LOG_TARGET, - "🌟 Received NEWVIEW for qc {} new height {} from {}", - high_qc, + "🌟 Received NEWVIEW {} for qc {} from {}", new_height, + high_qc, from ); - if !self.epoch_manager.is_this_validator_registered_for_epoch(epoch).await? { - warn!(target: LOG_TARGET, "❌ Ignoring NEWVIEW for epoch {} because the epoch is invalid or we are not registered for that epoch", epoch); - return Ok(()); - } - if epoch != current_epoch { warn!(target: LOG_TARGET, "❌ Ignoring NEWVIEW for epoch {} because the epoch is not the current epoch", epoch); return Ok(()); @@ -115,17 +110,17 @@ where TConsensusSpec: ConsensusSpec // } // We can never accept NEWVIEWS for heights that are lower than the locked block height - let locked = self.store.with_read_tx(|tx| LockedBlock::get(tx, epoch))?; - if new_height < locked.height() { - warn!(target: LOG_TARGET, "❌ Ignoring NEWVIEW for height less than the locked block, locked block: {} new height: {}", locked, new_height); - return Ok(()); - } + self.store.with_read_tx(|tx| { + let locked = LockedBlock::get(tx, epoch)?; + if new_height < locked.height() { + warn!(target: LOG_TARGET, "❌ Ignoring NEWVIEW for height less than the locked block, locked block: {} new height: {}", locked, new_height); + return Ok(()); + } - self.validate_qc(&high_qc)?; + self.validate_qc(&high_qc)?; - // Sync if we do not have the block for this valid QC - self.store.with_read_tx(|tx| { if !Block::record_exists(tx, high_qc.block_id())? { + // Sync if we do not have the block for this valid QC let local_height = LeafBlock::get(tx, epoch) .optional()? 
.map(|leaf| leaf.height()) @@ -135,6 +130,7 @@ where TConsensusSpec: ConsensusSpec qc_height: high_qc.block_height(), }); } + Ok(()) })?; @@ -146,18 +142,13 @@ where TConsensusSpec: ConsensusSpec if *leader != our_node.address { warn!(target: LOG_TARGET, "❌ New View failed, leader is {} at height:{}", leader, new_height); - return Err(HotStuffError::NotTheLeader { - details: format!( - "Received NEWVIEW height {} but this not is not the leader for that height", - new_height - ), - }); + return Ok(()); } if let Some(vote) = last_vote { debug!( target: LOG_TARGET, - "πŸ”₯ Receive VOTE with NEWVIEW for node {} from {}", vote.block_id, from, + "πŸ”₯ Receive VOTE with NEWVIEW for node {} {} from {}", vote.block_height, vote.block_id, from, ); self.vote_receiver .handle(from.clone(), vote, false, local_committee_info) @@ -167,16 +158,13 @@ where TConsensusSpec: ConsensusSpec // Are nodes requesting to create more than the minimum number of dummy blocks? let height_diff = high_qc.block_height().saturating_sub(new_height).as_u64(); if height_diff > local_committee.len() as u64 { - return Err(HotStuffError::BadNewViewMessage { - details: format!( - "Validator {from} sent NEWVIEW that attempts to create a larger than necessary number of dummy \ - blocks. Expected requested {} < local committee size {}", - height_diff, - local_committee.len() - ), - high_qc_height: high_qc.block_height(), - received_new_height: new_height, - }); + warn!( + target: LOG_TARGET, + "❌ Validator {from} sent NEWVIEW that attempts to create a larger than necessary number of dummy blocks. Expected requested {} < local committee size {}", + height_diff, + local_committee.len() + ); + return Ok(()); } // Take note of unique NEWVIEWs so that we can count them @@ -197,21 +185,23 @@ where TConsensusSpec: ConsensusSpec info!( target: LOG_TARGET, - "🌟 Received NEWVIEW for height {} (QC: {}) has {} votes out of {}", - new_height, - latest_high_qc, + "🌟 Received NEWVIEW (QUORUM: {}/{}) {} (QC: {})", newview_count, threshold, + new_height, + latest_high_qc, ); // Once we have received enough (quorum) NEWVIEWS, we can create the dummy block(s) and propose the next block. // Any subsequent NEWVIEWs for this height/view are ignored. if newview_count == threshold { info!(target: LOG_TARGET, "πŸŒŸβœ… NEWVIEW for block {} (high_qc: {}) has reached quorum ({}/{})", new_height, latest_high_qc.as_high_qc(), newview_count, threshold); + self.pacemaker + .update_view(epoch, new_height, high_qc.block_height()) + .await?; if latest_high_qc.block_height() + NodeHeight(1) > new_height { // CASE: the votes received from NEWVIEWS created a new high QC, so there are no dummy blocks to create - // We can force beat with our current leaf. - let leaf_block = self.store.with_read_tx(|tx| LeafBlock::get(tx, epoch))?; - self.pacemaker.force_beat(leaf_block); + // We can force beat with our current leaf and the justified block is the parent. 
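Once the NEWVIEW count hits the quorum threshold, the view is advanced and the leader either force-beats on the current leaf (when the collected votes already yield a QC at the new height) or falls through to dummy-block creation. A rough sketch of that decision, with simplified, illustrative types:

```rust
// Rough sketch of the NEWVIEW quorum decision; types are simplified and illustrative.
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
struct NodeHeight(u64);

#[derive(Debug, PartialEq, Eq)]
enum NewViewAction {
    // Not at the threshold (or already past it): ignore.
    Ignore,
    // Quorum reached and the collected votes formed a QC at (or above) the new height:
    // propose on the current leaf immediately, no dummy blocks needed.
    ForceBeatCurrentLeaf,
    // Quorum reached but the high QC is behind: dummy blocks are created up to the
    // new height before proposing.
    CreateDummyBlocks,
}

fn on_newview_quorum(
    newview_count: usize,
    threshold: usize,
    high_qc_height: NodeHeight,
    new_height: NodeHeight,
) -> NewViewAction {
    // Act exactly once, when the count reaches the threshold; later NEWVIEWs are ignored.
    if newview_count != threshold {
        return NewViewAction::Ignore;
    }
    if NodeHeight(high_qc_height.0 + 1) > new_height {
        NewViewAction::ForceBeatCurrentLeaf
    } else {
        NewViewAction::CreateDummyBlocks
    }
}

fn main() {
    assert_eq!(
        on_newview_quorum(3, 3, NodeHeight(10), NodeHeight(10)),
        NewViewAction::ForceBeatCurrentLeaf
    );
    assert_eq!(
        on_newview_quorum(3, 3, NodeHeight(5), NodeHeight(10)),
        NewViewAction::CreateDummyBlocks
    );
}
```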
+ self.pacemaker.force_beat_current_leaf(); return Ok(()); } diff --git a/dan_layer/consensus/src/hotstuff/on_sync_request.rs b/dan_layer/consensus/src/hotstuff/on_sync_request.rs index 32b922bd0..e25a54bd5 100644 --- a/dan_layer/consensus/src/hotstuff/on_sync_request.rs +++ b/dan_layer/consensus/src/hotstuff/on_sync_request.rs @@ -2,9 +2,9 @@ // SPDX-License-Identifier: BSD-3-Clause use log::*; -use tari_dan_common_types::{optional::Optional, Epoch}; +use tari_dan_common_types::{committee::CommitteeInfo, optional::Optional, Epoch}; use tari_dan_storage::{ - consensus_models::{Block, LastSentVote, LeafBlock}, + consensus_models::{Block, LastProposed, LastSentVote, LeafBlock}, StateStore, }; use tokio::task; @@ -32,13 +32,19 @@ impl OnSyncRequest { } #[allow(clippy::too_many_lines)] - pub fn handle(&self, from: TConsensusSpec::Addr, epoch: Epoch, msg: SyncRequestMessage) { - if msg.epoch != epoch { + pub fn handle( + &self, + from: TConsensusSpec::Addr, + local_committee_info: CommitteeInfo, + epoch: Epoch, + msg: SyncRequestMessage, + ) { + if msg.high_qc.epoch() != epoch { warn!( target: LOG_TARGET, "Received SyncRequest from {} for epoch {} but our epoch is {}. Ignoring request.", from, - msg.epoch, + msg.high_qc.epoch(), epoch ); return; @@ -49,7 +55,11 @@ impl OnSyncRequest { task::spawn(async move { let result = store.with_read_tx(|tx| { - let leaf_block = LeafBlock::get(tx, epoch)?.get_block(tx)?; + let mut leaf_block = LeafBlock::get(tx, epoch)?; + let last_proposed = LastProposed::get(tx)?; + if last_proposed.height > leaf_block.height() { + leaf_block = last_proposed.as_leaf_block(); + } if leaf_block.height() < msg.high_qc.block_height() { return Err(HotStuffError::InvalidSyncRequest { @@ -73,9 +83,9 @@ impl OnSyncRequest { let blocks = Block::get_all_blocks_between( tx, leaf_block.epoch(), - leaf_block.shard_group(), + local_committee_info.shard_group(), msg.high_qc.block_id(), - leaf_block.id(), + leaf_block.block_id(), true, )?; @@ -84,7 +94,9 @@ impl OnSyncRequest { let blocks = match result { Ok(mut blocks) => { - blocks.retain(|b| !b.is_genesis()); + if let Some(pos) = blocks.iter().position(|b| b.is_genesis()) { + blocks.remove(pos); + } blocks }, Err(err) => { diff --git a/dan_layer/consensus/src/hotstuff/pacemaker_handle.rs b/dan_layer/consensus/src/hotstuff/pacemaker_handle.rs index 3015eb05f..f2dbc165e 100644 --- a/dan_layer/consensus/src/hotstuff/pacemaker_handle.rs +++ b/dan_layer/consensus/src/hotstuff/pacemaker_handle.rs @@ -79,6 +79,10 @@ impl PaceMakerHandle { self.on_force_beat.beat(Some(parent_block)); } + pub fn force_beat_current_leaf(&self) { + self.on_force_beat.beat(None); + } + pub fn get_on_beat(&self) -> OnBeat { self.on_beat.clone() } diff --git a/dan_layer/consensus/src/hotstuff/substate_store/sharded_state_tree.rs b/dan_layer/consensus/src/hotstuff/substate_store/sharded_state_tree.rs index c0159b176..c451aeff9 100644 --- a/dan_layer/consensus/src/hotstuff/substate_store/sharded_state_tree.rs +++ b/dan_layer/consensus/src/hotstuff/substate_store/sharded_state_tree.rs @@ -96,10 +96,17 @@ impl ShardedStateTree<&TTx> { let mut store = StagedTreeStore::new(&scoped_store); // Apply pending (not yet committed) diffs to the staged store if let Some(diffs) = self.pending_diffs.get(&shard) { - debug!(target: LOG_TARGET, "Applying {num_diffs} pending diff(s) to shard {shard} (version={version})", num_diffs = diffs.len(), version = diffs.last().map(|d| d.version).unwrap_or(0)); + let mut num_changes = 0usize; for diff in diffs { + num_changes += 
diff.diff.new_nodes.len() + diff.diff.stale_tree_nodes.len(); store.apply_pending_diff(diff.diff.clone()); } + debug!( + target: LOG_TARGET, + "Applied {num_diffs} pending diff(s) ({num_changes} change(s)) to shard {shard} (version={version})", + num_diffs = diffs.len(), + version = diffs.last().map(|d| d.version).unwrap_or(0) + ); } // Apply state updates to the state tree that is backed by the staged shard-scoped store diff --git a/dan_layer/consensus/src/hotstuff/transaction_manager/manager.rs b/dan_layer/consensus/src/hotstuff/transaction_manager/manager.rs index 884a7bf68..e45e8eeb1 100644 --- a/dan_layer/consensus/src/hotstuff/transaction_manager/manager.rs +++ b/dan_layer/consensus/src/hotstuff/transaction_manager/manager.rs @@ -121,7 +121,7 @@ impl> Ok(executed) } - fn execute_or_fetch( + pub fn execute_or_fetch( &self, store: &mut PendingSubstateStore, transaction: Transaction, diff --git a/dan_layer/consensus/src/hotstuff/transaction_manager/prepared.rs b/dan_layer/consensus/src/hotstuff/transaction_manager/prepared.rs index a7d33c47a..609d959bb 100644 --- a/dan_layer/consensus/src/hotstuff/transaction_manager/prepared.rs +++ b/dan_layer/consensus/src/hotstuff/transaction_manager/prepared.rs @@ -4,7 +4,7 @@ use std::collections::HashSet; use indexmap::IndexMap; -use tari_dan_common_types::{NumPreshards, SubstateRequirement, VersionedSubstateId}; +use tari_dan_common_types::{committee::CommitteeInfo, SubstateRequirement, VersionedSubstateId}; use tari_dan_storage::consensus_models::{Decision, Evidence, TransactionExecution, VersionedSubstateIdLockIntent}; use crate::hotstuff::substate_store::LockStatus; @@ -101,7 +101,7 @@ impl MultiShardPreparedTransaction { &self.local_inputs } - pub fn outputs(&self) -> &HashSet { + pub fn known_outputs(&self) -> &HashSet { &self.outputs } @@ -109,24 +109,7 @@ impl MultiShardPreparedTransaction { self.execution } - pub fn to_initial_evidence(&self, num_preshards: NumPreshards, num_committees: u32) -> Evidence { - // if let Some(ref execution) = self.execution { - // return Evidence::from_inputs_and_outputs(execution.resolved_inputs(), execution.resulting_outputs()); - // } - // - // // CASE: One or more local inputs are not found, so the transaction is aborted. - // if self.current_decision().is_abort() { - // return Evidence::from_inputs_and_outputs( - // self.execution - // .transaction() - // .all_inputs_iter() - // .map(|input| VersionedSubstateIdLockIntent::from_requirement(input, SubstateLockType::Read)), - // self.outputs - // .iter() - // .map(|id| VersionedSubstateIdLockIntent::output(id.clone())), - // ); - // } - + pub fn to_initial_evidence(&self, local_committee_info: &CommitteeInfo) -> Evidence { // TODO: We do not know if the inputs locks required are Read/Write. Either we allow the user to // specify this or we can correct the locks after execution. Currently, this limitation // prevents concurrent multi-shard read locks. @@ -134,17 +117,35 @@ impl MultiShardPreparedTransaction { .local_inputs() .iter() .map(|(requirement, version)| VersionedSubstateId::new(requirement.substate_id.clone(), *version)) - // TODO(correctness): to_zero_version is error prone when used in evidence and the correctness depends how it is used. - // e.g. 
using it to determining which shard is involved is fine, but loading substate by the address is incorrect (v0 may or may not be the actual pledged substate) - .chain(self.foreign_inputs().iter().map(|r| r.clone().or_zero_version())) .map(|id| VersionedSubstateIdLockIntent::write(id, true)); let outputs = self - .outputs() + .known_outputs() .iter() .cloned() .map(VersionedSubstateIdLockIntent::output); - Evidence::from_inputs_and_outputs(num_preshards, num_committees, inputs, outputs) + let mut evidence = Evidence::from_inputs_and_outputs( + local_committee_info.num_preshards(), + local_committee_info.num_committees(), + inputs, + outputs, + ); + + // Add foreign involved shard groups without adding any substates (because we do not know the pledged version + // yet) + self.foreign_inputs() + .iter() + .map(|r| { + r.to_substate_address_zero_version().to_shard_group( + local_committee_info.num_preshards(), + local_committee_info.num_committees(), + ) + }) + .for_each(|sg| { + evidence.add_shard_group(sg); + }); + + evidence } } diff --git a/dan_layer/consensus/src/hotstuff/vote_receiver.rs b/dan_layer/consensus/src/hotstuff/vote_receiver.rs index b8b199beb..e8404826f 100644 --- a/dan_layer/consensus/src/hotstuff/vote_receiver.rs +++ b/dan_layer/consensus/src/hotstuff/vote_receiver.rs @@ -92,7 +92,7 @@ where TConsensusSpec: ConsensusSpec // Is a local committee member that signed this vote? let sender_vn = self .epoch_manager - .get_validator_node_by_public_key(message.epoch, &message.signature.public_key) + .get_validator_node_by_public_key(message.epoch, message.signature.public_key.clone()) .await .optional()?; let Some(sender_vn) = sender_vn else { diff --git a/dan_layer/consensus/src/hotstuff/worker.rs b/dan_layer/consensus/src/hotstuff/worker.rs index 97c6d4b02..3e9f0d6c0 100644 --- a/dan_layer/consensus/src/hotstuff/worker.rs +++ b/dan_layer/consensus/src/hotstuff/worker.rs @@ -1,12 +1,24 @@ // Copyright 2023 The Tari Project // SPDX-License-Identifier: BSD-3-Clause -use std::fmt::{Debug, Formatter}; +use std::{ + fmt::{Debug, Formatter}, + iter, +}; use log::*; use tari_dan_common_types::{committee::CommitteeInfo, Epoch, NodeHeight, ShardGroup}; use tari_dan_storage::{ - consensus_models::{Block, BlockDiff, BurntUtxo, ForeignProposal, HighQc, LeafBlock, TransactionPool}, + consensus_models::{ + Block, + BlockDiff, + BurntUtxo, + ForeignProposal, + HighQc, + LeafBlock, + TransactionPool, + TransactionRecord, + }, StateStore, }; use tari_epoch_manager::{EpochManagerEvent, EpochManagerReader}; @@ -35,7 +47,7 @@ use crate::{ transaction_manager::ConsensusTransactionManager, vote_receiver::VoteReceiver, }, - messages::HotstuffMessage, + messages::{HotstuffMessage, ProposalMessage}, tracing::TraceTimer, traits::{hooks::ConsensusHooks, ConsensusSpec, LeaderStrategy}, }; @@ -49,7 +61,7 @@ pub struct HotstuffWorker { tx_events: broadcast::Sender, rx_new_transactions: mpsc::Receiver<(Transaction, usize)>, - rx_missing_transactions: mpsc::UnboundedReceiver, + rx_missing_transactions: mpsc::UnboundedReceiver>, on_inbound_message: OnInboundMessage, on_next_sync_view: OnNextSyncViewHandler, @@ -291,7 +303,7 @@ impl HotstuffWorker { Some(result) = self.on_inbound_message.next_message(current_epoch, current_height) => { if let Err(e) = self.on_unvalidated_message(current_epoch, current_height, result, &local_committee_info).await { - self.on_failure("on_beat", &e).await; + self.on_failure("on_inbound_message", &e).await; return Err(e); } }, @@ -300,8 +312,8 @@ impl HotstuffWorker { // We cannot 
simply call check_if_block_can_be_unparked in dispatch_hotstuff_message as that creates a cycle. // One suggestion is to refactor consensus to emit events (kinda like libp2p does) and handle those events. // This should be easy to reason about and avoid a large depth of async calls and "callback channels". - Some(tx_id) = self.rx_missing_transactions.recv() => { - if let Err(err) = self.check_if_block_can_be_unparked(current_epoch, current_height, &tx_id, &local_committee_info).await { + Some(batch) = self.rx_missing_transactions.recv() => { + if let Err(err) = self.check_if_block_can_be_unparked(current_epoch, current_height, batch.iter(), &local_committee_info).await { self.hooks.on_error(&err); error!(target: LOG_TARGET, "🚨Error handling missing transaction: {}", err); } @@ -415,7 +427,7 @@ impl HotstuffWorker { let _timer = TraceTimer::info(LOG_TARGET, "on_new_transaction"); let maybe_transaction = self.on_receive_new_transaction.try_sequence_transaction( current_epoch, - transaction, + TransactionRecord::new(transaction), local_committee_info, )?; @@ -433,7 +445,12 @@ impl HotstuffWorker { self.hooks.on_transaction_ready(transaction.id()); if self - .check_if_block_can_be_unparked(current_epoch, current_height, transaction.id(), local_committee_info) + .check_if_block_can_be_unparked( + current_epoch, + current_height, + iter::once(transaction.id()), + local_committee_info, + ) .await? { // No need to call on_beat, a block was unparked so on_beat will be called as needed @@ -450,57 +467,33 @@ impl HotstuffWorker { } /// Returns true if a block was unparked, otherwise false - async fn check_if_block_can_be_unparked( + async fn check_if_block_can_be_unparked< + 'a, + I: IntoIterator + ExactSizeIterator + Clone, + >( &mut self, current_epoch: Epoch, current_height: NodeHeight, - tx_id: &TransactionId, + transaction_ids: I, local_committee_info: &CommitteeInfo, ) -> Result { - let mut is_any_block_unparked = false; - if let Some(msg) = self + let (local_proposals, foreign_proposals) = self .on_message_validate - .update_local_parked_blocks(current_height, tx_id)? 
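The missing-transaction channel now carries whole batches (`Vec<TransactionId>`) and the unpark check takes an iterator of IDs. A minimal sketch of that shape, assuming tokio's unbounded mpsc channel (rt + macros features) and a simplified ID type:

```rust
// Sketch of the batched missing-transaction channel; the ID type and method
// signatures are simplified stand-ins.
use tokio::sync::mpsc;

type TransactionId = [u8; 4];

// Stand-in for the worker method, which now takes an iterator of IDs rather than a single ID.
fn check_if_block_can_be_unparked<'a, I: IntoIterator<Item = &'a TransactionId>>(ids: I) {
    for id in ids {
        println!("checking parked blocks waiting on {:?}", id);
    }
}

#[tokio::main]
async fn main() {
    // The channel payload is now a whole batch rather than one ID per message.
    let (tx_missing_transactions, mut rx_missing_transactions) =
        mpsc::unbounded_channel::<Vec<TransactionId>>();

    // Sender side: collect all validated IDs from a request and send them in one go.
    let batch: Vec<TransactionId> = vec![[1, 0, 0, 0], [2, 0, 0, 0]];
    tx_missing_transactions.send(batch).expect("receiver dropped");
    drop(tx_missing_transactions);

    // Receiver side (worker loop): one wake-up unparks blocks for the whole batch.
    while let Some(batch) = rx_missing_transactions.recv().await {
        check_if_block_can_be_unparked(batch.iter());
    }
}
```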
- { - let vn = self - .epoch_manager - .get_validator_node_by_public_key(msg.block.epoch(), msg.block.proposed_by()) - .await?; + .update_local_parked_blocks(current_height, transaction_ids)?; - if let Err(e) = self - .dispatch_hotstuff_message( - current_epoch, - vn.address, - HotstuffMessage::Proposal(msg), - local_committee_info, - ) - .await - { - self.on_failure("on_new_transaction -> dispatch_hotstuff_message", &e) + let is_any_block_unparked = !local_proposals.is_empty() || !foreign_proposals.is_empty(); + + for msg in foreign_proposals { + if let Err(e) = self.on_receive_foreign_proposal.handle(msg, local_committee_info).await { + self.on_failure("check_if_block_can_be_unparked -> on_receive_foreign_proposal", &e) .await; return Err(e); } - is_any_block_unparked = true; } - let unparked_foreign_blocks = self.on_message_validate.update_foreign_parked_blocks(tx_id)?; - is_any_block_unparked |= !unparked_foreign_blocks.is_empty(); - for parked in unparked_foreign_blocks { - let vn = self - .epoch_manager - .get_validator_node_by_public_key(parked.block().epoch(), parked.block().proposed_by()) - .await?; - - if let Err(e) = self - .dispatch_hotstuff_message( - current_epoch, - vn.address, - HotstuffMessage::ForeignProposal(parked.into()), - local_committee_info, - ) - .await - { - self.on_failure("on_new_transaction -> dispatch_hotstuff_message", &e) + for msg in local_proposals { + if let Err(e) = self.on_proposal_message(current_epoch, local_committee_info, msg).await { + self.on_failure("check_if_block_can_be_unparked -> on_proposal_message", &e) .await; return Err(e); } @@ -529,7 +522,7 @@ impl HotstuffWorker { // } // If we can propose a block end, let's not wait for the block time to do it - self.pacemaker.beat(); + // self.pacemaker.beat(); }, EpochManagerEvent::ThisValidatorIsRegistered { .. } => {}, } @@ -538,9 +531,10 @@ impl HotstuffWorker { } async fn request_initial_catch_up_sync(&mut self, current_epoch: Epoch) -> Result<(), HotStuffError> { - let committee = self.epoch_manager.get_local_committee(current_epoch).await?; - for member in committee.shuffled() { - if *member != self.local_validator_addr { + let mut committee = self.epoch_manager.get_local_committee(current_epoch).await?; + committee.shuffle(); + for (member, _) in committee { + if member != self.local_validator_addr { self.on_catch_up_sync.request_sync(current_epoch, member).await?; break; } @@ -675,29 +669,10 @@ impl HotstuffWorker { .handle(current_epoch, from, message, local_committee_info) .await, ), - HotstuffMessage::Proposal(msg) => { - match log_err( - "on_receive_local_proposal", - self.on_receive_local_proposal - .handle(current_epoch, local_committee_info, msg) - .await, - ) { - Ok(_) => Ok(()), - Err( - err @ HotStuffError::ProposalValidationError(ProposalValidationError::JustifyBlockNotFound { - .. 
- }), - ) => { - warn!( - target: LOG_TARGET, - "⚠️This node has fallen behind due to a missing justified block: {err}" - ); - self.on_catch_up_sync.request_sync(current_epoch, &from).await?; - Ok(()) - }, - Err(err) => Err(err), - } - }, + HotstuffMessage::Proposal(msg) => log_err( + "on_receive_local_proposal", + self.on_proposal_message(current_epoch, local_committee_info, msg).await, + ), HotstuffMessage::ForeignProposal(msg) => log_err( "on_receive_foreign_proposal", self.on_receive_foreign_proposal.handle(msg, local_committee_info).await, @@ -717,7 +692,8 @@ impl HotstuffWorker { .await, ), HotstuffMessage::CatchUpSyncRequest(msg) => { - self.on_sync_request.handle(from, current_epoch, msg); + self.on_sync_request + .handle(from, *local_committee_info, current_epoch, msg); Ok(()) }, HotstuffMessage::SyncResponse(_) => { @@ -730,6 +706,36 @@ impl HotstuffWorker { } } + async fn on_proposal_message( + &mut self, + current_epoch: Epoch, + local_committee_info: &CommitteeInfo, + msg: ProposalMessage, + ) -> Result<(), HotStuffError> { + let proposed_by = msg.block.proposed_by().clone(); + match log_err( + "on_receive_local_proposal", + self.on_receive_local_proposal + .handle(current_epoch, local_committee_info, msg) + .await, + ) { + Ok(_) => Ok(()), + Err(err @ HotStuffError::ProposalValidationError(ProposalValidationError::JustifyBlockNotFound { .. })) => { + let vn = self + .epoch_manager + .get_validator_node_by_public_key(current_epoch, proposed_by) + .await?; + warn!( + target: LOG_TARGET, + "⚠️This node has fallen behind due to a missing justified block: {err}" + ); + self.on_catch_up_sync.request_sync(current_epoch, vn.address).await?; + Ok(()) + }, + Err(err) => Err(err), + } + } + fn create_zero_block_if_required(&self, epoch: Epoch, shard_group: ShardGroup) -> Result<(), HotStuffError> { self.state_store.with_write_tx(|tx| { // The parent for genesis blocks refer to this zero block diff --git a/dan_layer/consensus/src/messages/message.rs b/dan_layer/consensus/src/messages/message.rs index 64c07564c..dc67c5560 100644 --- a/dan_layer/consensus/src/messages/message.rs +++ b/dan_layer/consensus/src/messages/message.rs @@ -45,7 +45,7 @@ impl HotstuffMessage { Self::Vote(msg) => msg.epoch, Self::MissingTransactionsRequest(msg) => msg.epoch, Self::MissingTransactionsResponse(msg) => msg.epoch, - Self::CatchUpSyncRequest(msg) => msg.epoch, + Self::CatchUpSyncRequest(msg) => msg.high_qc.epoch(), Self::SyncResponse(msg) => msg.epoch, } } @@ -61,7 +61,14 @@ impl HotstuffMessage { impl Display for HotstuffMessage { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { - HotstuffMessage::NewView(msg) => write!(f, "NewView({})", msg.new_height), + HotstuffMessage::NewView(msg) => { + write!( + f, + "NewView({}, high-qc: {})", + msg.new_height, + msg.high_qc.block_height() + ) + }, HotstuffMessage::Proposal(msg) => { write!(f, "Proposal(Epoch={},Height={})", msg.block.epoch(), msg.block.height(),) }, diff --git a/dan_layer/consensus/src/messages/sync.rs b/dan_layer/consensus/src/messages/sync.rs index d6f284940..3131ff927 100644 --- a/dan_layer/consensus/src/messages/sync.rs +++ b/dan_layer/consensus/src/messages/sync.rs @@ -8,7 +8,6 @@ use tari_transaction::Transaction; #[derive(Debug, Clone, Serialize)] pub struct SyncRequestMessage { - pub epoch: Epoch, pub high_qc: HighQc, } diff --git a/dan_layer/consensus_tests/src/consensus.rs b/dan_layer/consensus_tests/src/consensus.rs index d681b0714..846495cab 100644 --- a/dan_layer/consensus_tests/src/consensus.rs +++ 
b/dan_layer/consensus_tests/src/consensus.rs @@ -509,6 +509,7 @@ async fn multishard_local_inputs_and_outputs_foreign_outputs() { .await; test.send_transaction_to_destination(TestVnDestination::Committee(1), tx1.clone()) .await; + // Don't send to committee 2 since they are not involved in inputs test.start_epoch(Epoch(1)).await; @@ -866,25 +867,33 @@ async fn leader_failure_node_goes_down() { let mut test = Test::builder() // Allow enough time for leader failures .with_test_timeout(Duration::from_secs(60)) + .with_block_time(Duration::from_secs(2)) .add_committee(0, vec!["1", "2", "3", "4", "5"]) .start() .await; - let failure_node = TestAddress::new("2"); + let failure_node = TestAddress::new("4"); for _ in 0..10 { test.send_transaction_to_all(Decision::Commit, 1, 2, 1).await; } + + // Take the VN offline - if we do it in the loop below, all transactions may have already been finalized (local + // only) by committed block 1 + log::info!("😴 {failure_node} is offline"); + test.network() + .go_offline(TestVnDestination::Address(failure_node.clone())) + .await; + test.start_epoch(Epoch(1)).await; loop { let (_, _, _, committed_height) = test.on_block_committed().await; if committed_height == NodeHeight(1) { - log::info!("😴 Node 2 goes offline"); - test.network() - .go_offline(TestVnDestination::Address(failure_node.clone())) - .await; + // This allows a few more leader failures to occur + test.send_transaction_to_all(Decision::Commit, 1, 2, 1).await; + test.wait_for_pool_count(TestVnDestination::All, 1).await; } if test.validators().filter(|vn| vn.address != failure_node).all(|v| { @@ -895,7 +904,7 @@ async fn leader_failure_node_goes_down() { break; } - if committed_height > NodeHeight(100) { + if committed_height > NodeHeight(50) { panic!("Not all transaction committed after {} blocks", committed_height); } } @@ -908,7 +917,7 @@ async fn leader_failure_node_goes_down() { }); log::info!("total messages sent: {}", test.network().total_messages_sent()); - test.assert_clean_shutdown().await; + test.assert_clean_shutdown_except(&[failure_node]).await; } #[tokio::test(flavor = "multi_thread", worker_threads = 4)] diff --git a/dan_layer/consensus_tests/src/support/epoch_manager.rs b/dan_layer/consensus_tests/src/support/epoch_manager.rs index 927d67986..74b98c99e 100644 --- a/dan_layer/consensus_tests/src/support/epoch_manager.rs +++ b/dan_layer/consensus_tests/src/support/epoch_manager.rs @@ -314,13 +314,13 @@ impl EpochManagerReader for TestEpochManager { async fn get_validator_node_by_public_key( &self, _epoch: Epoch, - public_key: &PublicKey, + public_key: PublicKey, ) -> Result, EpochManagerError> { let lock = self.state_lock().await; let (address, (_shard, shard_key, public_key, sidechain_id, registered_at, start_epoch, end_epoch)) = lock .validator_shards .iter() - .find(|(_, (_, _, pk, _, _, _, _))| pk == public_key) + .find(|(_, (_, _, pk, _, _, _, _))| *pk == public_key) .unwrap(); Ok(ValidatorNode { diff --git a/dan_layer/consensus_tests/src/support/harness.rs b/dan_layer/consensus_tests/src/support/harness.rs index b96d83d71..3b18e79b0 100644 --- a/dan_layer/consensus_tests/src/support/harness.rs +++ b/dan_layer/consensus_tests/src/support/harness.rs @@ -206,6 +206,9 @@ impl Test { } pub async fn on_hotstuff_event(&mut self) -> (TestAddress, HotstuffEvent) { + if self.network.task_handle().is_finished() { + panic!("Network task exited while waiting for Hotstuff event"); + } self.validators .values_mut() .map(|v| { @@ -332,6 +335,18 @@ impl Test { .await } + pub async fn 
wait_for_pool_count(&self, dest: TestVnDestination, count: usize) { + self.wait_all_for_predicate("waiting for pool count", |vn| { + if !dest.is_for_vn(vn) { + return true; + } + let c = vn.get_transaction_pool_count(); + log::info!("{} has {} transactions in pool", vn.address, c); + c >= count + }) + .await + } + pub fn with_all_validators(&self, f: impl FnMut(&Validator)) { self.validators.values().for_each(f); } @@ -459,6 +474,15 @@ impl Test { v.handle.await.unwrap(); } } + + pub async fn assert_clean_shutdown_except(&mut self, except: &[TestAddress]) { + self.shutdown.trigger(); + for (_, v) in self.validators.drain() { + if !except.contains(&v.address) { + v.handle.await.unwrap(); + } + } + } } pub struct TestBuilder { @@ -466,6 +490,7 @@ pub struct TestBuilder { sql_address: String, timeout: Option, debug_sql_file: Option, + block_time: Duration, message_filter: Option, } @@ -475,6 +500,7 @@ impl TestBuilder { committees: HashMap::new(), sql_address: ":memory:".to_string(), timeout: Some(Duration::from_secs(10)), + block_time: Duration::from_secs(5), debug_sql_file: None, message_filter: None, } @@ -522,10 +548,16 @@ impl TestBuilder { self } + pub fn with_block_time(mut self, block_time: Duration) -> Self { + self.block_time = block_time; + self + } + async fn build_validators( leader_strategy: &RoundRobinLeaderStrategy, epoch_manager: &TestEpochManager, sql_address: String, + block_time: Duration, shutdown_signal: ShutdownSignal, ) -> (Vec, HashMap) { let num_committees = epoch_manager.get_num_committees(Epoch(0)).await.unwrap(); @@ -539,6 +571,7 @@ impl TestBuilder { let (channels, validator) = Validator::builder() .with_sql_url(sql_address) + .with_block_time(block_time) .with_address_and_secret_key(address.clone(), sk) .with_shard(shard_addr) .with_shard_group(shard_group) @@ -573,8 +606,14 @@ impl TestBuilder { let epoch_manager = TestEpochManager::new(tx_epoch_events); epoch_manager.add_committees(committees).await; let shutdown = Shutdown::new(); - let (channels, validators) = - Self::build_validators(&leader_strategy, &epoch_manager, self.sql_address, shutdown.to_signal()).await; + let (channels, validators) = Self::build_validators( + &leader_strategy, + &epoch_manager, + self.sql_address, + self.block_time, + shutdown.to_signal(), + ) + .await; let network = spawn_network(channels, shutdown.to_signal(), self.message_filter); Test { diff --git a/dan_layer/consensus_tests/src/support/leader_strategy.rs b/dan_layer/consensus_tests/src/support/leader_strategy.rs index 44857ffc8..5867caf07 100644 --- a/dan_layer/consensus_tests/src/support/leader_strategy.rs +++ b/dan_layer/consensus_tests/src/support/leader_strategy.rs @@ -14,6 +14,6 @@ impl RoundRobinLeaderStrategy { impl LeaderStrategy for RoundRobinLeaderStrategy { fn calculate_leader(&self, committee: &Committee, height: NodeHeight) -> u32 { - (height.0 % committee.members.len() as u64) as u32 + (height.as_u64() % committee.members.len() as u64) as u32 } } diff --git a/dan_layer/consensus_tests/src/support/network.rs b/dan_layer/consensus_tests/src/support/network.rs index 17ed9745c..22b0d3116 100644 --- a/dan_layer/consensus_tests/src/support/network.rs +++ b/dan_layer/consensus_tests/src/support/network.rs @@ -309,26 +309,26 @@ impl TestNetworkWorker { log::info!("πŸ›‘ Network stopped"); } - pub async fn handle_broadcast(&mut self, from: TestAddress, to: Vec, msg: HotstuffMessage) { - log::debug!("βœ‰οΈ Broadcast {} from {} to {}", msg, from, to.iter().join(", ")); - for vn in to { + pub async fn handle_broadcast(&mut 
self, from: TestAddress, to_addrs: Vec, msg: HotstuffMessage) { + log::debug!("βœ‰οΈ Broadcast {} from {} to {}", msg, from, to_addrs.iter().join(", ")); + for to in to_addrs { if let Some(message_filter) = &self.message_filter { - if !message_filter(&from, &vn, &msg) { + if !message_filter(&from, &to, &msg) { self.num_filtered_messages .fetch_add(1, std::sync::atomic::Ordering::Relaxed); continue; } } // TODO: support for taking a whole committee bucket offline - if vn != from && - self.is_offline_destination(&vn, ShardGroup::all_shards(TEST_NUM_PRESHARDS)) + if to != from && + self.is_offline_destination(&from, &to, ShardGroup::all_shards(TEST_NUM_PRESHARDS)) .await { continue; } self.tx_hs_message - .get(&vn) + .get(&to) .unwrap() .send((from.clone(), msg.clone())) .await @@ -349,10 +349,10 @@ impl TestNetworkWorker { } log::debug!("βœ‰οΈ Message {} from {} to {}", msg, from, to); if from != to && - self.is_offline_destination(&from, ShardGroup::all_shards(TEST_NUM_PRESHARDS)) + self.is_offline_destination(&from, &to, ShardGroup::all_shards(TEST_NUM_PRESHARDS)) .await { - log::info!("πŸ›‘ Discarding message {msg}. Leader {from} is offline"); + log::info!("πŸ—‘οΈ Discarding message {msg}. Leader {from} is offline"); return; } self.on_message.send(Some(msg.clone())).unwrap(); @@ -361,9 +361,10 @@ impl TestNetworkWorker { self.tx_hs_message.get(&to).unwrap().send((from, msg)).await.unwrap(); } - async fn is_offline_destination(&self, addr: &TestAddress, shard: ShardGroup) -> bool { + async fn is_offline_destination(&self, from: &TestAddress, to: &TestAddress, shard: ShardGroup) -> bool { let lock = self.offline_destinations.read().await; // 99999 is not used TODO: support for taking entire shard group offline - lock.iter().any(|d| d.is_for(addr, shard, 99999)) + lock.iter() + .any(|d| d.is_for(from, shard, 99999) || d.is_for(to, shard, 99999)) } } diff --git a/dan_layer/consensus_tests/src/support/transaction_executor.rs b/dan_layer/consensus_tests/src/support/transaction_executor.rs index e81ca68c9..0d6e70ac7 100644 --- a/dan_layer/consensus_tests/src/support/transaction_executor.rs +++ b/dan_layer/consensus_tests/src/support/transaction_executor.rs @@ -82,7 +82,13 @@ impl BlockTransactionExecutor for TestBloc .inputs .into_iter() .map(|spec| { - let substate = resolved_inputs[spec.substate_requirement()].clone(); + let substate = resolved_inputs.get(spec.substate_requirement()).unwrap_or_else(|| { + panic!( + "Missing input substate for transaction {} with requirement {}", + id, + spec.substate_requirement() + ) + }); VersionedSubstateIdLockIntent::new( VersionedSubstateId::new(spec.substate_id().clone(), substate.version()), spec.lock_type(), diff --git a/dan_layer/consensus_tests/src/support/validator/builder.rs b/dan_layer/consensus_tests/src/support/validator/builder.rs index bf807f46f..a6ac393bf 100644 --- a/dan_layer/consensus_tests/src/support/validator/builder.rs +++ b/dan_layer/consensus_tests/src/support/validator/builder.rs @@ -40,6 +40,7 @@ pub struct ValidatorBuilder { pub sql_url: String, pub leader_strategy: RoundRobinLeaderStrategy, pub num_committees: u32, + pub block_time: Duration, pub epoch_manager: Option, pub transaction_executions: TestExecutionSpecStore, } @@ -55,6 +56,7 @@ impl ValidatorBuilder { shard_group: ShardGroup::all_shards(TEST_NUM_PRESHARDS), sql_url: ":memory".to_string(), leader_strategy: RoundRobinLeaderStrategy::new(), + block_time: Duration::from_secs(5), epoch_manager: None, transaction_executions: TestExecutionSpecStore::new(), } @@ -67,6 
+69,11 @@ impl ValidatorBuilder { self } + pub fn with_block_time(&mut self, block_time: Duration) -> &mut Self { + self.block_time = block_time; + self + } + pub fn with_shard_group(&mut self, shard_group: ShardGroup) -> &mut Self { self.shard_group = shard_group; self @@ -132,7 +139,7 @@ impl ValidatorBuilder { max_base_layer_blocks_ahead: 5, max_base_layer_blocks_behind: 5, network: Network::LocalNet, - pacemaker_max_base_time: Duration::from_secs(10), + pacemaker_max_base_time: self.block_time, sidechain_id: None, }, self.address.clone(), diff --git a/dan_layer/epoch_manager/src/base_layer/handle.rs b/dan_layer/epoch_manager/src/base_layer/handle.rs index 6f3f2dbc1..21b11ec68 100644 --- a/dan_layer/epoch_manager/src/base_layer/handle.rs +++ b/dan_layer/epoch_manager/src/base_layer/handle.rs @@ -256,13 +256,13 @@ impl EpochManagerReader for EpochManagerHandle { async fn get_validator_node_by_public_key( &self, epoch: Epoch, - public_key: &PublicKey, + public_key: PublicKey, ) -> Result, EpochManagerError> { let (tx, rx) = oneshot::channel(); self.tx_request .send(EpochManagerRequest::GetValidatorNodeByPublicKey { epoch, - public_key: public_key.clone(), + public_key, reply: tx, }) .await diff --git a/dan_layer/epoch_manager/src/traits.rs b/dan_layer/epoch_manager/src/traits.rs index 03134306c..b37371868 100644 --- a/dan_layer/epoch_manager/src/traits.rs +++ b/dan_layer/epoch_manager/src/traits.rs @@ -70,7 +70,7 @@ pub trait EpochManagerReader: Send + Sync { async fn get_validator_node_by_public_key( &self, epoch: Epoch, - public_key: &PublicKey, + public_key: PublicKey, ) -> Result, EpochManagerError>; /// Returns a list of validator nodes with the given epoch and public key. If any validator node is not found, an @@ -82,7 +82,7 @@ pub trait EpochManagerReader: Send + Sync { #[allow(clippy::mutable_key_type)] let mut results = HashMap::with_capacity(query.len()); for (epoch, public_key) in query { - let vn = self.get_validator_node_by_public_key(epoch, &public_key).await?; + let vn = self.get_validator_node_by_public_key(epoch, public_key.clone()).await?; results.insert((epoch, public_key), vn); } Ok(results) @@ -99,7 +99,7 @@ pub trait EpochManagerReader: Send + Sync { async fn get_committee_info_by_validator_public_key( &self, epoch: Epoch, - public_key: &PublicKey, + public_key: PublicKey, ) -> Result { let validator = self.get_validator_node_by_public_key(epoch, public_key).await?; self.get_committee_info_for_substate(epoch, validator.shard_key).await @@ -128,7 +128,7 @@ pub trait EpochManagerReader: Send + Sync { async fn get_committee_by_validator_public_key( &self, epoch: Epoch, - public_key: &PublicKey, + public_key: PublicKey, ) -> Result, EpochManagerError> { let validator = self.get_validator_node_by_public_key(epoch, public_key).await?; let committee = self.get_committee_for_substate(epoch, validator.shard_key).await?; diff --git a/dan_layer/p2p/proto/consensus.proto b/dan_layer/p2p/proto/consensus.proto index e533c2adb..b1be9c0a9 100644 --- a/dan_layer/p2p/proto/consensus.proto +++ b/dan_layer/p2p/proto/consensus.proto @@ -240,8 +240,7 @@ message SubstateDestroyed { } message SyncRequest { - uint64 epoch = 1; - HighQc high_qc = 2; + HighQc high_qc = 1; } message HighQc { diff --git a/dan_layer/p2p/proto/rpc.proto b/dan_layer/p2p/proto/rpc.proto index 0339037eb..342dc64d0 100644 --- a/dan_layer/p2p/proto/rpc.proto +++ b/dan_layer/p2p/proto/rpc.proto @@ -250,7 +250,6 @@ message SyncStateResponse { message StateTransition { StateTransitionId id = 1; SubstateUpdate update = 2; 
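`SyncRequest` now carries only the `HighQc`, and the epoch is read via `msg.high_qc.epoch()` instead of a separate field, so the two can never disagree. A small sketch of that shape, with simplified, illustrative types:

```rust
// Sketch of deriving the sync-request epoch from the embedded high QC; types are
// simplified and illustrative.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct Epoch(u64);

#[derive(Debug, Clone)]
struct HighQc {
    epoch: Epoch,
}

impl HighQc {
    fn epoch(&self) -> Epoch {
        self.epoch
    }
}

// After the change the request carries only the high QC; the epoch is implied by it,
// so a request can never claim one epoch while its QC belongs to another.
#[derive(Debug, Clone)]
struct SyncRequestMessage {
    high_qc: HighQc,
}

fn handle(local_epoch: Epoch, msg: &SyncRequestMessage) -> Result<(), String> {
    // The epoch check reads the epoch off the QC rather than a separate field.
    if msg.high_qc.epoch() != local_epoch {
        return Err(format!(
            "SyncRequest for epoch {:?} but local epoch is {:?}; ignoring",
            msg.high_qc.epoch(),
            local_epoch
        ));
    }
    // ... serve blocks between the requester's high QC and our leaf/last-proposed block ...
    Ok(())
}

fn main() {
    let msg = SyncRequestMessage {
        high_qc: HighQc { epoch: Epoch(3) },
    };
    assert!(handle(Epoch(3), &msg).is_ok());
    assert!(handle(Epoch(4), &msg).is_err());
}
```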
- uint64 state_tree_version = 3; } message StateTransitionId { diff --git a/dan_layer/p2p/src/conversions/consensus.rs b/dan_layer/p2p/src/conversions/consensus.rs index 99237365f..a66ea3b57 100644 --- a/dan_layer/p2p/src/conversions/consensus.rs +++ b/dan_layer/p2p/src/conversions/consensus.rs @@ -888,11 +888,10 @@ impl From for proto::consensus::SubstateDestroyed { impl From<&SyncRequestMessage> for proto::consensus::SyncRequest { fn from(value: &SyncRequestMessage) -> Self { Self { - epoch: value.epoch.as_u64(), high_qc: Some(proto::consensus::HighQc { block_id: value.high_qc.block_id.as_bytes().to_vec(), block_height: value.high_qc.block_height.as_u64(), - epoch: value.epoch.as_u64(), + epoch: value.high_qc.epoch.as_u64(), qc_id: value.high_qc.qc_id.as_bytes().to_vec(), }), } @@ -904,7 +903,6 @@ impl TryFrom for SyncRequestMessage { fn try_from(value: proto::consensus::SyncRequest) -> Result { Ok(Self { - epoch: Epoch(value.epoch), high_qc: value .high_qc .map(|value| { diff --git a/dan_layer/p2p/src/conversions/rpc.rs b/dan_layer/p2p/src/conversions/rpc.rs index e3b6d2e05..aee2f0405 100644 --- a/dan_layer/p2p/src/conversions/rpc.rs +++ b/dan_layer/p2p/src/conversions/rpc.rs @@ -137,11 +137,7 @@ impl TryFrom for StateTransition { .update .ok_or_else(|| anyhow::anyhow!("Missing state transition update"))?; let update = SubstateUpdate::try_from(update)?; - Ok(Self { - id, - update, - state_tree_version: value.state_tree_version, - }) + Ok(Self { id, update }) } } @@ -150,7 +146,6 @@ impl From for proto::rpc::StateTransition { Self { id: Some(value.id.into()), update: Some(value.update.into()), - state_tree_version: value.state_tree_version, } } } diff --git a/dan_layer/rpc_state_sync/src/manager.rs b/dan_layer/rpc_state_sync/src/manager.rs index 65a10a0e8..e0276724b 100644 --- a/dan_layer/rpc_state_sync/src/manager.rs +++ b/dan_layer/rpc_state_sync/src/manager.rs @@ -1,6 +1,8 @@ // Copyright 2023 The Tari Project // SPDX-License-Identifier: BSD-3-Clause +use std::cmp; + use anyhow::anyhow; use async_trait::async_trait; use futures::StreamExt; @@ -55,6 +57,7 @@ use tari_validator_node_rpc::{ use crate::error::CommsRpcConsensusSyncError; +const BATCH_SIZE: usize = 100; const LOG_TARGET: &str = "tari::dan::comms_rpc_state_sync"; pub struct RpcStateSyncManager { @@ -137,7 +140,8 @@ where TConsensusSpec: ConsensusSpec info!( target: LOG_TARGET, - "πŸ›œSyncing from state transition {last_state_transition_id}" + "πŸ›œSyncing from v{} to state transition {last_state_transition_id}", + current_version.unwrap_or(0), ); let mut state_stream = client @@ -149,6 +153,8 @@ where TConsensusSpec: ConsensusSpec }) .await?; + let mut tree_changes = vec![]; + while let Some(result) = state_stream.next().await { let msg = match result { Ok(msg) => msg, @@ -166,20 +172,18 @@ where TConsensusSpec: ConsensusSpec ))); } + tree_changes.reserve_exact(cmp::min(msg.transitions.len(), BATCH_SIZE)); + self.state_store.with_write_tx(|tx| { - let mut next_version = msg.transitions.first().expect("non-empty batch already checked").state_tree_version; info!( target: LOG_TARGET, - "πŸ›œ Next state updates batch of size {} (v{}-v{})", + "πŸ›œ Next state updates batch of size {} from v{}", msg.transitions.len(), current_version.unwrap_or(0), - msg.transitions.last().unwrap().state_tree_version, ); let mut store = ShardScopedTreeStoreWriter::new(tx, shard); - let mut tree_changes = vec![]; - for transition in msg.transitions { let transition = @@ -192,15 +196,6 @@ where TConsensusSpec: ConsensusSpec ))); } - if 
current_version.map_or(false, |v| transition.state_tree_version < v) { - return Err(CommsRpcConsensusSyncError::InvalidResponse(anyhow!( - "Received state transition with version {} that is not monotonically increasing (expected \ - >= {})", - transition.state_tree_version, - persisted_version.unwrap_or(0) - ))); - } - if transition.id.epoch().is_zero() { return Err(CommsRpcConsensusSyncError::InvalidResponse(anyhow!( "Received state transition with epoch 0." @@ -225,13 +220,15 @@ where TConsensusSpec: ConsensusSpec }, }; - info!(target: LOG_TARGET, "πŸ›œ Applying state update {transition} (v{} to v{})", current_version.unwrap_or(0), transition.state_tree_version); - if next_version != transition.state_tree_version { + if tree_changes.len() + 1 == BATCH_SIZE { let mut state_tree = SpreadPrefixStateTree::new(&mut store); + info!(target: LOG_TARGET, "πŸ›œ Committing {} state tree changes v{} to v{}", tree_changes.len(), current_version.unwrap_or(0), current_version.unwrap_or(0) + 1); + let next_version = current_version.unwrap_or(0) + 1; state_tree.put_substate_changes(current_version, next_version, tree_changes.drain(..))?; current_version = Some(next_version); - next_version = transition.state_tree_version; } + + info!(target: LOG_TARGET, "πŸ›œ Applying state update {transition} v{}", current_version.unwrap_or(0)); tree_changes.push(change); self.commit_update(store.transaction(), checkpoint, transition)?; @@ -239,12 +236,11 @@ where TConsensusSpec: ConsensusSpec if !tree_changes.is_empty() { let mut state_tree = SpreadPrefixStateTree::new(&mut store); + let next_version = current_version.unwrap_or(0) + 1; + info!(target: LOG_TARGET, "πŸ›œ Committing final {} state tree changes v{} to v{}", tree_changes.len(), current_version.unwrap_or(0), next_version); state_tree.put_substate_changes(current_version, next_version, tree_changes.drain(..))?; - } - current_version = Some(next_version); - - if let Some(v) = current_version { - store.set_version(v)?; + current_version = Some(next_version); + store.set_version(next_version)?; } Ok::<_, CommsRpcConsensusSyncError>(()) @@ -419,8 +415,8 @@ where TConsensusSpec: ConsensusSpec + Send + Sync + 'static let checkpoint = match self.fetch_epoch_checkpoint(&mut client, current_epoch).await { Ok(Some(cp)) => cp, Ok(None) => { - // EDGE-CASE: This may occur because the previous epoch had not started consensus, typically - // in testing cases where transactions + // EDGE-CASE: This may occur because the previous epoch had not started at the consensus + // level. warn!( target: LOG_TARGET, "❓No checkpoint for epoch {current_epoch}. This may mean that this is the first epoch in the network" diff --git a/dan_layer/state_store_sqlite/migrations/2023-06-08-091819_create_state_store/up.sql b/dan_layer/state_store_sqlite/migrations/2023-06-08-091819_create_state_store/up.sql index 73af01963..a4882cec4 100644 --- a/dan_layer/state_store_sqlite/migrations/2023-06-08-091819_create_state_store/up.sql +++ b/dan_layer/state_store_sqlite/migrations/2023-06-08-091819_create_state_store/up.sql @@ -92,9 +92,8 @@ create table foreign_parked_blocks justify_qc text not NULL, created_at timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ); - -- block_id must be unique. 
Optimise fetching by block_id -create unique index foreign_parked_blocks_uniq_idx_id on parked_blocks (block_id); +create unique index foreign_parked_blocks_uniq_idx_id on foreign_parked_blocks (block_id); CREATE TABLE foreign_missing_transactions ( @@ -439,7 +438,7 @@ CREATE TABLE state_tree -- Scoping by shard CREATE INDEX state_tree_idx_shard_key on state_tree (shard); -- Duplicate keys are not allowed -CREATE UNIQUE INDEX state_tree_uniq_idx_key on state_tree (shard, key); +-- CREATE UNIQUE INDEX state_tree_uniq_idx_key on state_tree (shard, key); create table state_tree_shard_versions ( diff --git a/dan_layer/state_store_sqlite/src/reader.rs b/dan_layer/state_store_sqlite/src/reader.rs index cd7eaaa79..4d056a93d 100644 --- a/dan_layer/state_store_sqlite/src/reader.rs +++ b/dan_layer/state_store_sqlite/src/reader.rs @@ -261,6 +261,7 @@ impl<'a, TAddr: NodeAddressable + Serialize + DeserializeOwned + 'a> SqliteState FROM blocks JOIN tree ON block_id = tree.parent AND tree.bid != ? + AND tree.parent != '0000000000000000000000000000000000000000000000000000000000000000' LIMIT 1000 ) SELECT bid FROM tree"#, @@ -297,6 +298,7 @@ impl<'a, TAddr: NodeAddressable + Serialize + DeserializeOwned + 'a> SqliteState FROM blocks JOIN tree ON block_id = tree.parent AND tree.bid != ? + AND tree.parent != '0000000000000000000000000000000000000000000000000000000000000000' LIMIT 1000 ) SELECT bid FROM tree where is_dummy = 0 AND command_count > 0"#, @@ -335,18 +337,12 @@ impl<'a, TAddr: NodeAddressable + Serialize + DeserializeOwned + 'a> SqliteState } pub(crate) fn get_commit_block_id(&self) -> Result { - use crate::schema::{blocks, locked_block}; + use crate::schema::blocks; let block_id = blocks::table - .select(blocks::parent_block_id) - .filter( - blocks::block_id.eq(locked_block::table - .select(locked_block::block_id) - .order_by(locked_block::id.desc()) - .limit(1) - .single_value() - .assume_not_null()), - ) + .select(blocks::block_id) + .filter(blocks::is_committed.eq(true)) + .order_by(blocks::id.desc()) .first::(self.connection()) .map_err(|e| SqliteStorageError::DieselError { operation: "get_commit_block_id", @@ -588,6 +584,13 @@ impl<'tx, TAddr: NodeAddressable + Serialize + DeserializeOwned + 'tx> StateStor ) -> Result, StorageError> { use crate::schema::{foreign_proposals, quorum_certificates}; + if !self.blocks_exists(block_id)? 
{ + return Err(StorageError::NotFound { + item: "foreign_proposals_get_all_new: Block".to_string(), + key: block_id.to_string(), + }); + } + let locked = self.get_current_locked_block()?; let pending_block_ids = self.get_block_ids_with_commands_between(&locked.block_id, block_id)?; @@ -1538,7 +1541,11 @@ impl<'tx, TAddr: NodeAddressable + Serialize + DeserializeOwned + 'tx> StateStor txs.into_iter().map(|tx| tx.try_convert(None)).collect() } - fn transaction_pool_get_many_ready(&self, max_txs: usize) -> Result, StorageError> { + fn transaction_pool_get_many_ready( + &self, + max_txs: usize, + block_id: &BlockId, + ) -> Result, StorageError> { use crate::schema::{lock_conflicts, transaction_pool}; let mut ready_txs = transaction_pool::table @@ -1575,11 +1582,10 @@ impl<'tx, TAddr: NodeAddressable + Serialize + DeserializeOwned + 'tx> StateStor // Fetch all applicable block ids between the locked block and the given block let locked = self.get_current_locked_block()?; - let leaf = self.leaf_block_get(locked.epoch)?; let mut updates = self.get_transaction_atom_state_updates_between_blocks( &locked.block_id, - &leaf.block_id, + block_id, ready_txs.iter().map(|s| s.transaction_id.as_str()), )?; @@ -1587,7 +1593,7 @@ impl<'tx, TAddr: NodeAddressable + Serialize + DeserializeOwned + 'tx> StateStor target: LOG_TARGET, "transaction_pool_get_many_ready: locked.block_id={}, leaf.block_id={}, len(ready_txs)={}, updates={}", locked.block_id, - leaf.block_id, + block_id, ready_txs.len(), updates.len() ); @@ -2084,10 +2090,18 @@ impl<'tx, TAddr: NodeAddressable + Serialize + DeserializeOwned + 'tx> StateStor ) -> Result>, StorageError> { use crate::schema::pending_state_tree_diffs; + if !self.blocks_exists(block_id)? { + return Err(StorageError::NotFound { + item: "pending_state_tree_diffs_get_all_up_to_commit_block: Block".to_string(), + key: block_id.to_string(), + }); + } + // Get the last committed block let committed_block_id = self.get_commit_block_id()?; - let block_ids = self.get_block_ids_with_commands_between(&committed_block_id, block_id)?; + // Blocks may modify state with zero commands because they justify a block that changes state + let block_ids = self.get_block_ids_between(&committed_block_id, block_id)?; if block_ids.is_empty() { return Ok(HashMap::new()); @@ -2111,10 +2125,7 @@ impl<'tx, TAddr: NodeAddressable + Serialize + DeserializeOwned + 'tx> StateStor .or_insert_with(Vec::new) //PendingStateTreeDiff::default) .push(diff); } - // diffs - // .into_iter() - // .map(|diff| Ok((Shard::from(diff.shard as u32), diff.try_into()?))) - // .collect() + Ok(diffs) } @@ -2300,6 +2311,13 @@ impl<'tx, TAddr: NodeAddressable + Serialize + DeserializeOwned + 'tx> StateStor limit: usize, ) -> Result, StorageError> { use crate::schema::burnt_utxos; + if !self.blocks_exists(leaf_block)?
{ + return Err(StorageError::NotFound { + item: "Block".to_string(), + key: leaf_block.to_string(), + }); + } + if limit == 0 { return Ok(Vec::new()); } @@ -2338,6 +2356,21 @@ impl<'tx, TAddr: NodeAddressable + Serialize + DeserializeOwned + 'tx> StateStor Ok(count as u64) } + + fn foreign_parked_blocks_exists(&self, block_id: &BlockId) -> Result { + use crate::schema::foreign_parked_blocks; + + let count = foreign_parked_blocks::table + .count() + .filter(foreign_parked_blocks::block_id.eq(serialize_hex(block_id))) + .get_result::(self.connection()) + .map_err(|e| SqliteStorageError::DieselError { + operation: "foreign_parked_blocks_exists", + source: e, + })?; + + Ok(count > 0) + } } #[derive(QueryableByName)] diff --git a/dan_layer/state_store_sqlite/src/sql_models/state_transition.rs b/dan_layer/state_store_sqlite/src/sql_models/state_transition.rs index 734dcf502..5259c395a 100644 --- a/dan_layer/state_store_sqlite/src/sql_models/state_transition.rs +++ b/dan_layer/state_store_sqlite/src/sql_models/state_transition.rs @@ -72,7 +72,6 @@ impl StateTransition { Ok(consensus_models::StateTransition { id: StateTransitionId::new(epoch, shard, seq), update, - state_tree_version: self.state_version as u64, }) } } diff --git a/dan_layer/state_store_sqlite/src/sql_models/transaction_pool.rs b/dan_layer/state_store_sqlite/src/sql_models/transaction_pool.rs index 24893274b..c6fd0b5ad 100644 --- a/dan_layer/state_store_sqlite/src/sql_models/transaction_pool.rs +++ b/dan_layer/state_store_sqlite/src/sql_models/transaction_pool.rs @@ -51,7 +51,7 @@ impl TransactionPoolRecord { let mut transaction_fee = self.transaction_fee; if let Some(update) = update { - evidence.merge(deserialize_json(&update.evidence)?); + evidence = deserialize_json(&update.evidence)?; is_ready = update.is_ready; pending_stage = Some(parse_from_string(&update.stage)?); local_decision = Some(update.local_decision); diff --git a/dan_layer/state_store_sqlite/src/writer.rs b/dan_layer/state_store_sqlite/src/writer.rs index a733a2d52..da3feae5e 100644 --- a/dan_layer/state_store_sqlite/src/writer.rs +++ b/dan_layer/state_store_sqlite/src/writer.rs @@ -5,6 +5,7 @@ use std::{iter::Peekable, ops::Deref}; use diesel::{ dsl, + dsl::count_star, sql_types::Text, AsChangeset, ExpressionMethods, @@ -30,7 +31,6 @@ use tari_dan_common_types::{ use tari_dan_storage::{ consensus_models::{ Block, - BlockDiff, BlockId, BlockTransactionExecution, BurntUtxo, @@ -53,6 +53,7 @@ use tari_dan_storage::{ PendingShardStateTreeDiff, QcId, QuorumCertificate, + SubstateChange, SubstateLock, SubstatePledge, SubstatePledges, @@ -334,12 +335,12 @@ impl<'tx, TAddr: NodeAddressable + 'tx> StateStoreWriteTransaction for SqliteSta Ok(()) } - fn block_diffs_insert(&mut self, block_diff: &BlockDiff) -> Result<(), StorageError> { + fn block_diffs_insert(&mut self, block_id: &BlockId, changes: &[SubstateChange]) -> Result<(), StorageError> { use crate::schema::block_diffs; - let block_id = serialize_hex(block_diff.block_id); + let block_id = serialize_hex(block_id); // We commit in chunks because we can hit the SQL variable limit - for chunk in block_diff.changes.chunks(1000) { + for chunk in changes.chunks(1000) { let values = chunk .iter() .map(|ch| { @@ -1378,23 +1379,34 @@ impl<'tx, TAddr: NodeAddressable + 'tx> StateStoreWriteTransaction for SqliteSta if removed_ids.is_empty() { return Ok(vec![]); } - - let num_remaining = foreign_missing_transactions::table - .filter(foreign_missing_transactions::parked_block_id.eq_any(&removed_ids)) - .count() - 
.get_result::(self.connection()) + let counts = foreign_parked_blocks::table + .select(( + foreign_parked_blocks::id, + foreign_missing_transactions::table + .select(count_star()) + .filter(foreign_missing_transactions::parked_block_id.eq(foreign_parked_blocks::id)) + .single_value(), + )) + .filter(foreign_parked_blocks::id.eq_any(&removed_ids)) + .get_results::<(i32, Option)>(self.connection()) .map_err(|e| SqliteStorageError::DieselError { operation: "foreign_parked_blocks_remove_all_by_transaction", source: e, })?; - // If there are still missing transactions for the parked block, it is not yet unparked - if num_remaining > 0 { + let mut remaining = counts + .iter() + .filter(|(_, count)| count.map_or(true, |c| c == 0)) + .map(|(id, _)| *id) + .peekable(); + + // If there are still missing transactions for ALL parked blocks, then we exit early + if remaining.peek().is_none() { return Ok(vec![]); } let blocks = diesel::delete(foreign_parked_blocks::table) - .filter(foreign_parked_blocks::id.eq_any(&removed_ids)) + .filter(foreign_parked_blocks::id.eq_any(remaining)) .get_results::(self.connection()) .map_err(|e| SqliteStorageError::DieselError { operation: "foreign_parked_blocks_remove_all_by_transaction", @@ -1427,9 +1439,9 @@ impl<'tx, TAddr: NodeAddressable + 'tx> StateStoreWriteTransaction for SqliteSta Ok(()) } - fn substate_locks_insert_all)>>( + fn substate_locks_insert_all<'a, I: IntoIterator)>>( &mut self, - block_id: BlockId, + block_id: &BlockId, locks: I, ) -> Result<(), StorageError> { use crate::schema::substate_locks; @@ -1442,9 +1454,10 @@ impl<'tx, TAddr: NodeAddressable + 'tx> StateStoreWriteTransaction for SqliteSta .by_ref() .take(CHUNK_SIZE) .flat_map(|(id, locks)| { - locks.into_iter().map(move |lock| { + let block_id = serialize_hex(block_id); + locks.iter().map(move |lock| { ( - substate_locks::block_id.eq(serialize_hex(block_id)), + substate_locks::block_id.eq(block_id.clone()), substate_locks::substate_id.eq(id.to_string()), substate_locks::version.eq(lock.version() as i32), substate_locks::transaction_id.eq(serialize_hex(lock.transaction_id())), @@ -1648,20 +1661,20 @@ impl<'tx, TAddr: NodeAddressable + 'tx> StateStoreWriteTransaction for SqliteSta fn foreign_substate_pledges_save( &mut self, - transaction_id: TransactionId, + transaction_id: &TransactionId, shard_group: ShardGroup, - pledges: SubstatePledges, + pledges: &SubstatePledges, ) -> Result<(), StorageError> { use crate::schema::foreign_substate_pledges; let tx_id_hex = serialize_hex(transaction_id); - let values = pledges.into_iter().map(|pledge| match pledge { + let values = pledges.iter().map(|pledge| match pledge { SubstatePledge::Input { substate_id, is_write, substate, } => { - let lock_type = if is_write { + let lock_type = if *is_write { SubstateLockType::Write } else { SubstateLockType::Read @@ -1728,7 +1741,7 @@ impl<'tx, TAddr: NodeAddressable + 'tx> StateStoreWriteTransaction for SqliteSta &mut self, block_id: BlockId, shard: Shard, - diff: VersionedStateHashTreeDiff, + diff: &VersionedStateHashTreeDiff, ) -> Result<(), StorageError> { use crate::schema::{blocks, pending_state_tree_diffs}; diff --git a/dan_layer/storage/src/consensus_models/block.rs b/dan_layer/storage/src/consensus_models/block.rs index ec66276a5..80f1150cf 100644 --- a/dan_layer/storage/src/consensus_models/block.rs +++ b/dan_layer/storage/src/consensus_models/block.rs @@ -391,8 +391,8 @@ impl Block { .filter_map(|cmd| cmd.transaction()) .filter(|t| { t.evidence - .substate_addresses_iter() - .any(|addr| 
committee_info.includes_substate_address(addr)) + .shard_groups_iter() + .any(|sg| *sg == committee_info.shard_group()) }) .map(|t| t.id()) } @@ -615,7 +615,7 @@ impl Block { Self::record_exists(tx, self.parent()) } - pub fn has_been_processed( + pub fn has_been_justified( tx: &TTx, block_id: &BlockId, ) -> Result { @@ -676,6 +676,13 @@ impl Block { if block_id == *self.id() { continue; } + info!( + target: LOG_TARGET, + "❗️🔗 Removing parallel chain block {} from epoch {} height {}", + block_id, + self.epoch(), + self.height() + ); delete_block_and_children(tx, &block_id)?; } Ok(()) } @@ -1004,15 +1011,14 @@ impl Block { /// lockedQC. The predicate is true as long as either one of two rules holds. pub fn is_safe(&self, tx: &TTx) -> Result { let locked = LockedBlock::get(tx, self.epoch())?; - let locked_block = locked.get_block(tx)?; // Liveness rules - if self.justify().block_height() > locked_block.height() { + if self.justify().block_height() > locked.height() { return Ok(true); } // Safety rule - if self.extends(tx, locked_block.id())? { + if self.extends(tx, locked.block_id())? { return Ok(true); } @@ -1020,7 +1026,7 @@ impl Block { target: LOG_TARGET, "❌ Block {} does satisfy the liveness or safety rules of the safeNode predicate. Locked block {}", self, - locked_block, + locked, ); Ok(false) } @@ -1053,15 +1059,14 @@ impl Block { continue; } - let evidence = atom - .evidence - .get(&self.shard_group) - .ok_or_else(|| StorageError::DataInconsistency { - details: format!( - "invariant get_block_pledge: Local evidence for atom {} in block {} is missing", - atom.id, self.id - ), - })?; + let Some(evidence) = atom.evidence.get(&self.shard_group) else { + // CASE: The output-only shard group has sequenced this transaction + debug!( + "get_block_pledge: Local evidence for atom {} is missing in block {}", + atom.id, self + ); + continue; + }; // TODO(perf): O(n) queries let locked_values = tx.substate_locks_get_locked_substates_for_transaction(&atom.id)?; @@ -1098,14 +1103,20 @@ impl Block { impl Display for Block { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + if self.is_dummy() { + write!(f, "Dummy")?; + } write!( f, - "[{}, {}, {}, {} cmd(s), {}]", + "[{}, justify: {} ({}), {}, {}, {} cmd(s), {}->{}]", self.height(), + self.justify().block_height(), + if self.justifies_parent() { "🟒" } else { "🟑" }, self.epoch(), self.shard_group(), self.commands().len(), self.id(), + self.parent() ) } } @@ -1220,7 +1231,7 @@ where Ok(()) } -/// Deletes everything related to a block and any children +/// Deletes everything related to a block as well as any child blocks fn delete_block_and_children(tx: &mut TTx, block_id: &BlockId) -> Result<(), StorageError> where TTx: StateStoreWriteTransaction + Deref, diff --git a/dan_layer/storage/src/consensus_models/block_diff.rs b/dan_layer/storage/src/consensus_models/block_diff.rs index
fec839303..670fd51e8 100644 --- a/dan_layer/storage/src/consensus_models/evidence.rs +++ b/dan_layer/storage/src/consensus_models/evidence.rs @@ -76,22 +76,18 @@ impl Evidence { // may be implicit (null) if the local node is only involved in outputs (and therefore sequences using the LocalAccept // foreign proposal) .all(|e| { - if e.is_prepare_justified() || e.is_accept_justified() { - true - } else { - // TODO: we should only include input evidence in transactions, so we would only need to check justifies - // At this point output-only shards may not be justified - e.substates.values().all(|lock| lock.is_output()) - } + e.is_prepare_justified() || e.is_accept_justified() + }) } + /// Returns true if all substates in the given shard group are output locks. + /// This assumes the provided evidence is complete before this is called. + /// If no evidence is present for the shard group, false is returned. pub fn is_committee_output_only(&self, committee_info: &CommitteeInfo) -> bool { - self.evidence - .iter() - .filter(|(sg, _)| committee_info.shard_group() == **sg) - .flat_map(|(_, e)| e.substates().values()) - .all(|lock| lock.is_output()) + self.evidence.get(&committee_info.shard_group()).map_or(false, |e| { + !e.substates().is_empty() && e.substates().values().all(|lock| lock.is_output()) + }) } pub fn is_empty(&self) -> bool { @@ -150,28 +146,22 @@ impl Evidence { .map(|(_, e)| e) } - /// Returns an iterator over the substate addresses in this Evidence object. - /// NOTE: not all substates involved in the final transaction are necessarily included in this Evidence object until - /// the transaction has reached AllAccepted state. - pub fn substate_addresses_iter(&self) -> impl Iterator + '_ { - self.evidence.values().flat_map(|e| e.substates.keys()) - } - pub fn qc_ids_iter(&self) -> impl Iterator + '_ { self.evidence .values() .flat_map(|e| e.prepare_qc.iter().chain(e.accept_qc.iter())) } - pub fn add_shard_group_evidence( - &mut self, - shard_group: ShardGroup, - address: SubstateAddress, - lock_type: SubstateLockType, - ) -> &mut Self { - let entry = self.evidence.entry(shard_group).or_default(); - entry.substates.insert_sorted(address, lock_type); - self + pub fn add_shard_group(&mut self, shard_group: ShardGroup) -> &mut ShardGroupEvidence { + self.evidence.entry(shard_group).or_default() + } + + pub fn shard_groups_iter(&self) -> impl Iterator { + self.evidence.keys() + } + + pub fn num_shard_groups(&self) -> usize { + self.evidence.len() } /// Add or update shard groups, substates and locks into Evidence. Existing prepare/accept QC IDs are not changed. @@ -186,23 +176,6 @@ impl Evidence { self.evidence.sort_keys(); self } - - /// Merges the other Evidence into this Evidence. 
- pub fn merge(&mut self, other: Evidence) -> &mut Self { - for (sg, evidence) in other.evidence { - let evidence_mut = self.evidence.entry(sg).or_default(); - evidence_mut.substates.extend(evidence.substates); - evidence_mut.sort_substates(); - if let Some(qc_id) = evidence.prepare_qc { - evidence_mut.prepare_qc = Some(qc_id); - } - if let Some(qc_id) = evidence.accept_qc { - evidence_mut.accept_qc = Some(qc_id); - } - } - self.evidence.sort_keys(); - self - } } impl FromIterator<(ShardGroup, ShardGroupEvidence)> for Evidence { @@ -241,12 +214,19 @@ pub struct ShardGroupEvidence { } impl ShardGroupEvidence { + pub fn insert(&mut self, address: SubstateAddress, lock: SubstateLockType) -> &mut Self { + self.substates.insert_sorted(address, lock); + self + } + pub fn is_prepare_justified(&self) -> bool { - self.prepare_qc.is_some() + // No substates means that we have no pledges yet, so we cannot count this as justified + !self.substates.is_empty() && self.prepare_qc.is_some() } pub fn is_accept_justified(&self) -> bool { - self.accept_qc.is_some() + // No substates means that we have no pledges yet, so we cannot count this as justified + !self.substates.is_empty() && self.accept_qc.is_some() } pub fn substates(&self) -> &IndexMap { @@ -301,15 +281,25 @@ mod tests { let sg3 = ShardGroup::new(4, 5); let mut evidence1 = Evidence::empty(); - evidence1.add_shard_group_evidence(sg1, seed_substate_address(1), SubstateLockType::Write); - evidence1.add_shard_group_evidence(sg1, seed_substate_address(2), SubstateLockType::Read); + evidence1 + .add_shard_group(sg1) + .insert(seed_substate_address(1), SubstateLockType::Write); + evidence1 + .add_shard_group(sg1) + .insert(seed_substate_address(2), SubstateLockType::Read); let mut evidence2 = Evidence::empty(); - evidence2.add_shard_group_evidence(sg1, seed_substate_address(2), SubstateLockType::Output); - evidence2.add_shard_group_evidence(sg2, seed_substate_address(3), SubstateLockType::Output); - evidence2.add_shard_group_evidence(sg3, seed_substate_address(4), SubstateLockType::Output); - - evidence1.merge(evidence2); + evidence2 + .add_shard_group(sg1) + .insert(seed_substate_address(2), SubstateLockType::Output); + evidence2 + .add_shard_group(sg2) + .insert(seed_substate_address(3), SubstateLockType::Output); + evidence2 + .add_shard_group(sg3) + .insert(seed_substate_address(4), SubstateLockType::Output); + + evidence1.update(&evidence2); assert_eq!(evidence1.len(), 3); assert_eq!( diff --git a/dan_layer/storage/src/consensus_models/foreign_parked_proposal.rs b/dan_layer/storage/src/consensus_models/foreign_parked_proposal.rs index 908bcc3d8..68330d96d 100644 --- a/dan_layer/storage/src/consensus_models/foreign_parked_proposal.rs +++ b/dan_layer/storage/src/consensus_models/foreign_parked_proposal.rs @@ -1,12 +1,13 @@ // Copyright 2024 The Tari Project // SPDX-License-Identifier: BSD-3-Clause -use std::fmt::Display; +use std::{fmt::Display, ops::Deref}; use tari_transaction::TransactionId; use crate::{ consensus_models::{Block, BlockPledge, ForeignProposal, QuorumCertificate}, + StateStoreReadTransaction, StateStoreWriteTransaction, StorageError, }; @@ -39,10 +40,27 @@ impl ForeignParkedProposal { } impl ForeignParkedProposal { + pub fn save(&self, tx: &mut TTx) -> Result + where + TTx: StateStoreWriteTransaction + Deref, + TTx::Target: StateStoreReadTransaction, + { + if self.exists(&**tx)? 
{ + return Ok(false); + } + + self.insert(tx)?; + Ok(true) + } + pub fn insert(&self, tx: &mut TTx) -> Result<(), StorageError> { tx.foreign_parked_blocks_insert(self) } + pub fn exists(&self, tx: &TTx) -> Result { + tx.foreign_parked_blocks_exists(self.block().id()) + } + pub fn add_missing_transactions<'a, TTx: StateStoreWriteTransaction, I: IntoIterator>( &self, tx: &mut TTx, diff --git a/dan_layer/storage/src/consensus_models/last_proposed.rs b/dan_layer/storage/src/consensus_models/last_proposed.rs index 7278ccb58..66a69476b 100644 --- a/dan_layer/storage/src/consensus_models/last_proposed.rs +++ b/dan_layer/storage/src/consensus_models/last_proposed.rs @@ -4,7 +4,7 @@ use tari_dan_common_types::{Epoch, NodeHeight}; use crate::{ - consensus_models::{Block, BlockId}, + consensus_models::{Block, BlockId, LeafBlock}, StateStoreReadTransaction, StateStoreWriteTransaction, StorageError, @@ -15,6 +15,15 @@ pub struct LastProposed { pub block_id: BlockId, pub epoch: Epoch, } +impl LastProposed { + pub fn as_leaf_block(&self) -> LeafBlock { + LeafBlock { + block_id: self.block_id, + height: self.height, + epoch: self.epoch, + } + } +} impl LastProposed { pub fn get(tx: &TTx) -> Result { diff --git a/dan_layer/storage/src/consensus_models/state_transition.rs b/dan_layer/storage/src/consensus_models/state_transition.rs index 311676e9c..3c766f841 100644 --- a/dan_layer/storage/src/consensus_models/state_transition.rs +++ b/dan_layer/storage/src/consensus_models/state_transition.rs @@ -8,7 +8,6 @@ use std::{ }; use tari_dan_common_types::{shard::Shard, Epoch}; -use tari_state_tree::Version; use crate::{consensus_models::SubstateUpdate, StateStoreReadTransaction, StorageError}; @@ -16,7 +15,6 @@ use crate::{consensus_models::SubstateUpdate, StateStoreReadTransaction, Storage pub struct StateTransition { pub id: StateTransitionId, pub update: SubstateUpdate, - pub state_tree_version: Version, } impl StateTransition { diff --git a/dan_layer/storage/src/consensus_models/state_tree_diff.rs b/dan_layer/storage/src/consensus_models/state_tree_diff.rs index fcf2aace6..4c2834c08 100644 --- a/dan_layer/storage/src/consensus_models/state_tree_diff.rs +++ b/dan_layer/storage/src/consensus_models/state_tree_diff.rs @@ -48,7 +48,7 @@ impl PendingShardStateTreeDiff { tx: &mut TTx, block_id: BlockId, shard: Shard, - diff: VersionedStateHashTreeDiff, + diff: &VersionedStateHashTreeDiff, ) -> Result<(), StorageError> where TTx: Deref + StateStoreWriteTransaction, diff --git a/dan_layer/storage/src/consensus_models/substate.rs b/dan_layer/storage/src/consensus_models/substate.rs index 221919e15..6e2509fd7 100644 --- a/dan_layer/storage/src/consensus_models/substate.rs +++ b/dan_layer/storage/src/consensus_models/substate.rs @@ -157,9 +157,13 @@ impl SubstateRecord { } impl SubstateRecord { - pub fn lock_all)>>( + pub fn lock_all< + 'a, + TTx: StateStoreWriteTransaction, + I: IntoIterator)>, + >( tx: &mut TTx, - block_id: BlockId, + block_id: &BlockId, locks: I, ) -> Result<(), StorageError> { tx.substate_locks_insert_all(block_id, locks) diff --git a/dan_layer/storage/src/consensus_models/transaction.rs b/dan_layer/storage/src/consensus_models/transaction.rs index c208debb1..006bf7832 100644 --- a/dan_layer/storage/src/consensus_models/transaction.rs +++ b/dan_layer/storage/src/consensus_models/transaction.rs @@ -1,7 +1,11 @@ // Copyright 2023 The Tari Project // SPDX-License-Identifier: BSD-3-Clause -use std::{collections::HashSet, ops::Deref, time::Duration}; +use std::{ + collections::{HashMap, HashSet}, + 
ops::Deref, + time::Duration, +}; use log::*; use serde::Deserialize; @@ -307,6 +311,24 @@ impl TransactionRecord { Ok((recs, tx_ids)) } + pub fn get_any_or_build + Clone>( + tx: &TTx, + transactions: I, + ) -> Result, StorageError> { + let mut tx_ids = transactions + .clone() + .into_iter() + .map(|t| (*t.id(), t)) + .collect::>(); + let mut recs = tx.transactions_get_any(tx_ids.keys())?; + for rec in &recs { + tx_ids.remove(rec.transaction.id()); + } + recs.extend(tx_ids.into_values().map(Self::new)); + + Ok(recs) + } + pub fn get_missing<'a, TTx: StateStoreReadTransaction, I: IntoIterator>( tx: &TTx, tx_ids: I, diff --git a/dan_layer/storage/src/consensus_models/transaction_pool.rs b/dan_layer/storage/src/consensus_models/transaction_pool.rs index 9f9033210..354ea34d5 100644 --- a/dan_layer/storage/src/consensus_models/transaction_pool.rs +++ b/dan_layer/storage/src/consensus_models/transaction_pool.rs @@ -15,7 +15,6 @@ use tari_dan_common_types::{ committee::CommitteeInfo, optional::{IsNotFoundError, Optional}, NumPreshards, - ShardGroup, ToSubstateAddress, }; use tari_engine_types::transaction_receipt::TransactionReceiptAddress; @@ -31,7 +30,6 @@ use crate::{ LeafBlock, LockedBlock, QcId, - SubstatePledges, TransactionAtom, TransactionExecution, TransactionRecord, @@ -93,12 +91,25 @@ impl TransactionPool { Ok(()) } + pub fn insert_new_batched<'a, I: IntoIterator>( + &self, + tx: &mut TStateStore::WriteTransaction<'_>, + transactions: I, + ) -> Result<(), TransactionPoolError> { + // TODO(perf) + for (transaction, is_ready) in transactions { + tx.transaction_pool_insert_new(*transaction.id(), transaction.current_decision(), is_ready)?; + } + Ok(()) + } + pub fn get_batch_for_next_block( &self, tx: &TStateStore::ReadTransaction<'_>, max: usize, + block_id: &BlockId, ) -> Result, TransactionPoolError> { - let recs = tx.transaction_pool_get_many_ready(max)?; + let recs = tx.transaction_pool_get_many_ready(max, block_id)?; Ok(recs) } @@ -380,7 +391,7 @@ impl TransactionPoolRecord { } } - pub fn is_ready_for_next_stage(&self) -> bool { + pub fn is_ready_for_pending_stage(&self) -> bool { self.can_continue_to(self.current_stage()) } @@ -540,7 +551,8 @@ impl TransactionPoolRecord { let addr = lock.to_substate_address(); let shard_group = addr.to_shard_group(num_preshards, num_committees); self.evidence_mut() - .add_shard_group_evidence(shard_group, addr, lock.lock_type()); + .add_shard_group(shard_group) + .insert(addr, lock.lock_type()); } // Only change the local decision if we haven't already decided to ABORT if self.local_decision().map_or(true, |d| d.is_commit()) { @@ -631,17 +643,6 @@ impl TransactionPoolRecord { } impl TransactionPoolRecord { - #[allow(clippy::mutable_key_type)] - pub fn add_foreign_pledges( - &self, - tx: &mut TTx, - shard_group: ShardGroup, - foreign_pledges: SubstatePledges, - ) -> Result<(), TransactionPoolError> { - tx.foreign_substate_pledges_save(self.transaction_id, shard_group, foreign_pledges)?; - Ok(()) - } - pub fn remove(&self, tx: &mut TTx) -> Result<(), TransactionPoolError> { tx.transaction_pool_remove(&self.transaction_id)?; Ok(()) diff --git a/dan_layer/storage/src/consensus_models/validated_block.rs b/dan_layer/storage/src/consensus_models/validated_block.rs index 21552deea..34d8185b5 100644 --- a/dan_layer/storage/src/consensus_models/validated_block.rs +++ b/dan_layer/storage/src/consensus_models/validated_block.rs @@ -65,6 +65,7 @@ impl ValidBlock { TTx: StateStoreWriteTransaction + Deref, TTx::Target: StateStoreReadTransaction, { + // TODO(perf) 
for block in &self.dummy_blocks { block.save(tx)?; } diff --git a/dan_layer/storage/src/state_store/mod.rs b/dan_layer/storage/src/state_store/mod.rs index 542aac15c..f43c78482 100644 --- a/dan_layer/storage/src/state_store/mod.rs +++ b/dan_layer/storage/src/state_store/mod.rs @@ -235,7 +235,11 @@ pub trait StateStoreReadTransaction: Sized { ) -> Result; fn transaction_pool_exists(&self, transaction_id: &TransactionId) -> Result; fn transaction_pool_get_all(&self) -> Result, StorageError>; - fn transaction_pool_get_many_ready(&self, max_txs: usize) -> Result, StorageError>; + fn transaction_pool_get_many_ready( + &self, + max_txs: usize, + block_id: &BlockId, + ) -> Result, StorageError>; fn transaction_pool_count( &self, stage: Option, @@ -343,6 +347,9 @@ pub trait StateStoreReadTransaction: Sized { ) -> Result, StorageError>; fn burnt_utxos_count(&self) -> Result; + + // -------------------------------- Foreign parked block -------------------------------- // + fn foreign_parked_blocks_exists(&self, block_id: &BlockId) -> Result; } pub trait StateStoreWriteTransaction { @@ -362,7 +369,7 @@ pub trait StateStoreWriteTransaction { ) -> Result<(), StorageError>; // -------------------------------- BlockDiff -------------------------------- // - fn block_diffs_insert(&mut self, block_diff: &BlockDiff) -> Result<(), StorageError>; + fn block_diffs_insert(&mut self, block_id: &BlockId, changes: &[SubstateChange]) -> Result<(), StorageError>; fn block_diffs_remove(&mut self, block_id: &BlockId) -> Result<(), StorageError>; // -------------------------------- QuorumCertificate -------------------------------- // @@ -481,9 +488,9 @@ pub trait StateStoreWriteTransaction { fn votes_insert(&mut self, vote: &Vote) -> Result<(), StorageError>; //---------------------------------- Substates --------------------------------------------// - fn substate_locks_insert_all)>>( + fn substate_locks_insert_all<'a, I: IntoIterator)>>( &mut self, - block_id: BlockId, + block_id: &BlockId, locks: I, ) -> Result<(), StorageError>; @@ -510,9 +517,9 @@ pub trait StateStoreWriteTransaction { #[allow(clippy::mutable_key_type)] fn foreign_substate_pledges_save( &mut self, - transaction_id: TransactionId, + transaction_id: &TransactionId, shard_group: ShardGroup, - pledges: SubstatePledges, + pledges: &SubstatePledges, ) -> Result<(), StorageError>; fn foreign_substate_pledges_remove_many<'a, I: IntoIterator>( @@ -525,7 +532,7 @@ pub trait StateStoreWriteTransaction { &mut self, block_id: BlockId, shard: Shard, - diff: VersionedStateHashTreeDiff, + diff: &VersionedStateHashTreeDiff, ) -> Result<(), StorageError>; fn pending_state_tree_diffs_remove_by_block(&mut self, block_id: &BlockId) -> Result<(), StorageError>; fn pending_state_tree_diffs_remove_and_return_by_block( diff --git a/integration_tests/tests/features/state_sync.feature b/integration_tests/tests/features/state_sync.feature index 63a00187e..0deb37d03 100644 --- a/integration_tests/tests/features/state_sync.feature +++ b/integration_tests/tests/features/state_sync.feature @@ -45,6 +45,7 @@ Feature: State Sync Then VN2 has scanned to height 37 Then the validator node VN2 is listed as registered + When I wait for validator VN has leaf block height of at least 1 at epoch 3 When I wait for validator VN2 has leaf block height of at least 1 at epoch 3 When I create an account UNUSED4 via the wallet daemon WALLET_D diff --git a/networking/core/src/worker.rs b/networking/core/src/worker.rs index 77861417a..38da25796 100644 --- a/networking/core/src/worker.rs +++ 
b/networking/core/src/worker.rs @@ -901,7 +901,7 @@ where } } - /// Establishes a relay circuit for the given peer if the it is the selected relay peer. Returns true if the circuit + /// Establishes a relay circuit for the given peer if it is the selected relay peer. Returns true if the circuit /// was established from this call. fn establish_relay_circuit_on_connect(&mut self, peer_id: &PeerId) -> bool { let Some(relay) = self.relays.selected_relay() else { diff --git a/utilities/transaction_generator/src/transaction_builders/free_coins.rs b/utilities/transaction_generator/src/transaction_builders/free_coins.rs index 309e38586..efd5306bb 100644 --- a/utilities/transaction_generator/src/transaction_builders/free_coins.rs +++ b/utilities/transaction_generator/src/transaction_builders/free_coins.rs @@ -8,7 +8,7 @@ use tari_engine_types::{component::new_component_address_from_public_key, instru use tari_template_builtin::ACCOUNT_TEMPLATE_ADDRESS; use tari_template_lib::{ args, - constants::{XTR_FAUCET_COMPONENT_ADDRESS, XTR_FAUCET_VAULT_ADDRESS}, + constants::{XTR, XTR_FAUCET_COMPONENT_ADDRESS, XTR_FAUCET_VAULT_ADDRESS}, models::Amount, }; use tari_transaction::Transaction; @@ -31,6 +31,7 @@ pub fn builder(_: u64) -> Transaction { .call_method(account_address, "pay_fee", args![Amount(1000)]) }) .with_inputs([ + SubstateRequirement::unversioned(XTR), SubstateRequirement::unversioned(XTR_FAUCET_COMPONENT_ADDRESS), SubstateRequirement::unversioned(XTR_FAUCET_VAULT_ADDRESS), ])
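
Reviewer note (illustration only, not part of the patch): the Evidence rework above replaces add_shard_group_evidence and merge with add_shard_group(..).insert(..) plus the update(..) method shown in the evidence.rs hunk. The sketch below shows how a call site migrates; it assumes it sits next to the existing evidence.rs tests so that seed_substate_address and the imported types are in scope.

#[cfg(test)]
mod evidence_api_migration_sketch {
    use super::*;

    #[test]
    fn builds_and_updates_evidence_with_the_new_api() {
        let sg1 = ShardGroup::new(0, 1);
        let sg2 = ShardGroup::new(2, 3);

        let mut evidence = Evidence::empty();
        // Previously: evidence.add_shard_group_evidence(sg1, addr, lock_type).
        // Now: fetch (or create) the per-shard-group evidence, then insert substate locks into it.
        evidence
            .add_shard_group(sg1)
            .insert(seed_substate_address(1), SubstateLockType::Write);
        evidence
            .add_shard_group(sg2)
            .insert(seed_substate_address(2), SubstateLockType::Output);

        // Previously: evidence.merge(other) (now removed). update adds or updates shard groups,
        // substates and locks without changing any existing prepare/accept QC IDs.
        let mut other = Evidence::empty();
        other
            .add_shard_group(sg2)
            .insert(seed_substate_address(3), SubstateLockType::Read);
        evidence.update(&other);

        assert_eq!(evidence.num_shard_groups(), 2);
    }
}

One consequence of the change worth noting: because is_prepare_justified and is_accept_justified now also require a non-empty substate set, an entry created via add_shard_group but never populated is not counted as justified.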