diff --git a/dan_layer/consensus/src/hotstuff/on_receive_foreign_proposal.rs b/dan_layer/consensus/src/hotstuff/on_receive_foreign_proposal.rs index 59c1c6cde..3d5cffb3c 100644 --- a/dan_layer/consensus/src/hotstuff/on_receive_foreign_proposal.rs +++ b/dan_layer/consensus/src/hotstuff/on_receive_foreign_proposal.rs @@ -73,14 +73,23 @@ where TConsensusSpec: ConsensusSpec .await?; let local_shard = self.epoch_manager.get_local_committee_shard(block.epoch()).await?; - self.validate_proposed_block( + if let Err(err) = self.validate_proposed_block( &from, &block, committee_shard.shard(), local_shard.shard(), &foreign_receive_counter, - )?; - // Is this ok? Can foreign node send invalid block that should still increment the counter? + ) { + warn!( + target: LOG_TARGET, + "🔥 FOREIGN PROPOSAL: Invalid proposal from {}: {}. Ignoring.", + from, + err + ); + // Invalid blocks should not cause the state machine to transition to Error + return Ok(()); + } + foreign_receive_counter.increment(&committee_shard.shard()); let tx_ids = block diff --git a/dan_layer/consensus_tests/src/consensus.rs b/dan_layer/consensus_tests/src/consensus.rs index a9f9560cf..d6169f0bf 100644 --- a/dan_layer/consensus_tests/src/consensus.rs +++ b/dan_layer/consensus_tests/src/consensus.rs @@ -405,12 +405,12 @@ async fn leader_failure_node_goes_down() { test.assert_clean_shutdown().await; } -#[tokio::test(flavor = "multi_thread")] +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] async fn foreign_block_distribution() { setup_logger(); let mut test = Test::builder() .with_test_timeout(Duration::from_secs(60)) - .with_hotstuff_filter(Box::new(|from: &TestAddress, to: &TestAddress, _| { + .with_message_filter(Box::new(move |from: &TestAddress, to: &TestAddress, _| { match from.0.as_str() { // We filter our message from each leader to the foreign committees. So we will rely on other members of // the local committees to send the message to the foreign committee members. And then on diff --git a/dan_layer/consensus_tests/src/support/harness.rs b/dan_layer/consensus_tests/src/support/harness.rs index f773bfca9..37fb6433c 100644 --- a/dan_layer/consensus_tests/src/support/harness.rs +++ b/dan_layer/consensus_tests/src/support/harness.rs @@ -21,7 +21,7 @@ use tari_shutdown::{Shutdown, ShutdownSignal}; use tari_transaction::TransactionId; use tokio::{sync::broadcast, task}; -use super::HotstuffFilter; +use super::MessageFilter; use crate::support::{ address::TestAddress, epoch_manager::TestEpochManager, @@ -283,7 +283,7 @@ pub struct TestBuilder { sql_address: String, timeout: Option, debug_sql_file: Option, - hotstuff_filter: Option, + message_filter: Option, } impl TestBuilder { @@ -293,7 +293,7 @@ impl TestBuilder { sql_address: ":memory:".to_string(), timeout: Some(Duration::from_secs(10)), debug_sql_file: None, - hotstuff_filter: None, + message_filter: None, } } @@ -337,8 +337,8 @@ impl TestBuilder { self } - pub fn with_hotstuff_filter(mut self, hotstuff_filter: HotstuffFilter) -> Self { - self.hotstuff_filter = Some(hotstuff_filter); + pub fn with_message_filter(mut self, message_filter: MessageFilter) -> Self { + self.message_filter = Some(message_filter.into()); self } @@ -389,7 +389,7 @@ impl TestBuilder { let (channels, validators) = self .build_validators(&leader_strategy, &epoch_manager, shutdown.to_signal()) .await; - let network = spawn_network(channels, shutdown.to_signal(), self.hotstuff_filter); + let network = spawn_network(channels, shutdown.to_signal(), self.message_filter); Test { validators, diff --git a/dan_layer/consensus_tests/src/support/network.rs b/dan_layer/consensus_tests/src/support/network.rs index 00a0dc6e9..186dd039d 100644 --- a/dan_layer/consensus_tests/src/support/network.rs +++ b/dan_layer/consensus_tests/src/support/network.rs @@ -25,12 +25,12 @@ use tokio::sync::{ use crate::support::{address::TestAddress, ValidatorChannels}; -pub type HotstuffFilter = Box bool + Sync + Send + 'static>; +pub type MessageFilter = Box bool + Sync + Send + 'static>; pub fn spawn_network( channels: Vec, shutdown_signal: ShutdownSignal, - hotstuff_filter: Option, + message_filter: Option, ) -> TestNetwork { let tx_new_transactions = channels .iter() @@ -77,7 +77,7 @@ pub fn spawn_network( transaction_store: Arc::new(Default::default()), offline_destinations: offline_destinations.clone(), shutdown_signal, - hotstuff_filter, + message_filter, } .spawn(); @@ -195,7 +195,7 @@ pub struct TestNetworkWorker { offline_destinations: Arc>>, shutdown_signal: ShutdownSignal, - hotstuff_filter: Option, + message_filter: Option, } impl TestNetworkWorker { @@ -292,8 +292,8 @@ impl TestNetworkWorker { pub async fn handle_broadcast(&mut self, from: TestAddress, to: Vec, msg: HotstuffMessage) { log::debug!("🌎️ Broadcast {} from {} to {}", msg, from, to.iter().join(", ")); for vn in to { - if let Some(hotstuff_filter) = &self.hotstuff_filter { - if !hotstuff_filter(&from, &vn, &msg) { + if let Some(message_filter) = &self.message_filter { + if !message_filter(&from, &vn, &msg) { self.num_filtered_messages .fetch_add(1, std::sync::atomic::Ordering::Relaxed); continue; @@ -317,8 +317,8 @@ impl TestNetworkWorker { } async fn handle_leader(&mut self, from: TestAddress, to: TestAddress, msg: HotstuffMessage) { - if let Some(hotstuff_filter) = &self.hotstuff_filter { - if !hotstuff_filter(&from, &to, &msg) { + if let Some(message_filter) = &self.message_filter { + if !message_filter(&from, &to, &msg) { self.num_filtered_messages .fetch_add(1, std::sync::atomic::Ordering::Relaxed); return;