Skip to content

Commit

Permalink
fix(consensus): dont transition to error state with invalid foreign p…
Browse files Browse the repository at this point in the history
…roposal
  • Loading branch information
sdbondi committed Feb 5, 2024
1 parent c29d5b8 commit 5e5b656
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 19 deletions.
15 changes: 12 additions & 3 deletions dan_layer/consensus/src/hotstuff/on_receive_foreign_proposal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,14 +73,23 @@ where TConsensusSpec: ConsensusSpec
.await?;

let local_shard = self.epoch_manager.get_local_committee_shard(block.epoch()).await?;
self.validate_proposed_block(
if let Err(err) = self.validate_proposed_block(
&from,
&block,
committee_shard.shard(),
local_shard.shard(),
&foreign_receive_counter,
)?;
// Is this ok? Can foreign node send invalid block that should still increment the counter?
) {
warn!(
target: LOG_TARGET,
"🔥 FOREIGN PROPOSAL: Invalid proposal from {}: {}. Ignoring.",
from,
err
);
// Invalid blocks should not cause the state machine to transition to Error
return Ok(());
}

foreign_receive_counter.increment(&committee_shard.shard());

let tx_ids = block
Expand Down
4 changes: 2 additions & 2 deletions dan_layer/consensus_tests/src/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -405,12 +405,12 @@ async fn leader_failure_node_goes_down() {
test.assert_clean_shutdown().await;
}

#[tokio::test(flavor = "multi_thread")]
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn foreign_block_distribution() {
setup_logger();
let mut test = Test::builder()
.with_test_timeout(Duration::from_secs(60))
.with_hotstuff_filter(Box::new(|from: &TestAddress, to: &TestAddress, _| {
.with_message_filter(Box::new(move |from: &TestAddress, to: &TestAddress, _| {
match from.0.as_str() {
// We filter our message from each leader to the foreign committees. So we will rely on other members of
// the local committees to send the message to the foreign committee members. And then on
Expand Down
12 changes: 6 additions & 6 deletions dan_layer/consensus_tests/src/support/harness.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use tari_shutdown::{Shutdown, ShutdownSignal};
use tari_transaction::TransactionId;
use tokio::{sync::broadcast, task};

use super::HotstuffFilter;
use super::MessageFilter;
use crate::support::{
address::TestAddress,
epoch_manager::TestEpochManager,
Expand Down Expand Up @@ -283,7 +283,7 @@ pub struct TestBuilder {
sql_address: String,
timeout: Option<Duration>,
debug_sql_file: Option<String>,
hotstuff_filter: Option<HotstuffFilter>,
message_filter: Option<MessageFilter>,
}

impl TestBuilder {
Expand All @@ -293,7 +293,7 @@ impl TestBuilder {
sql_address: ":memory:".to_string(),
timeout: Some(Duration::from_secs(10)),
debug_sql_file: None,
hotstuff_filter: None,
message_filter: None,
}
}

Expand Down Expand Up @@ -337,8 +337,8 @@ impl TestBuilder {
self
}

pub fn with_hotstuff_filter(mut self, hotstuff_filter: HotstuffFilter) -> Self {
self.hotstuff_filter = Some(hotstuff_filter);
pub fn with_message_filter(mut self, message_filter: MessageFilter) -> Self {
self.message_filter = Some(message_filter.into());
self
}

Expand Down Expand Up @@ -389,7 +389,7 @@ impl TestBuilder {
let (channels, validators) = self
.build_validators(&leader_strategy, &epoch_manager, shutdown.to_signal())
.await;
let network = spawn_network(channels, shutdown.to_signal(), self.hotstuff_filter);
let network = spawn_network(channels, shutdown.to_signal(), self.message_filter);

Test {
validators,
Expand Down
16 changes: 8 additions & 8 deletions dan_layer/consensus_tests/src/support/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@ use tokio::sync::{

use crate::support::{address::TestAddress, ValidatorChannels};

pub type HotstuffFilter = Box<dyn Fn(&TestAddress, &TestAddress, &HotstuffMessage) -> bool + Sync + Send + 'static>;
pub type MessageFilter = Box<dyn Fn(&TestAddress, &TestAddress, &HotstuffMessage) -> bool + Sync + Send + 'static>;

pub fn spawn_network(
channels: Vec<ValidatorChannels>,
shutdown_signal: ShutdownSignal,
hotstuff_filter: Option<HotstuffFilter>,
message_filter: Option<MessageFilter>,
) -> TestNetwork {
let tx_new_transactions = channels
.iter()
Expand Down Expand Up @@ -77,7 +77,7 @@ pub fn spawn_network(
transaction_store: Arc::new(Default::default()),
offline_destinations: offline_destinations.clone(),
shutdown_signal,
hotstuff_filter,
message_filter,
}
.spawn();

Expand Down Expand Up @@ -195,7 +195,7 @@ pub struct TestNetworkWorker {

offline_destinations: Arc<RwLock<Vec<TestNetworkDestination>>>,
shutdown_signal: ShutdownSignal,
hotstuff_filter: Option<HotstuffFilter>,
message_filter: Option<MessageFilter>,
}

impl TestNetworkWorker {
Expand Down Expand Up @@ -292,8 +292,8 @@ impl TestNetworkWorker {
pub async fn handle_broadcast(&mut self, from: TestAddress, to: Vec<TestAddress>, msg: HotstuffMessage) {
log::debug!("🌎️ Broadcast {} from {} to {}", msg, from, to.iter().join(", "));
for vn in to {
if let Some(hotstuff_filter) = &self.hotstuff_filter {
if !hotstuff_filter(&from, &vn, &msg) {
if let Some(message_filter) = &self.message_filter {
if !message_filter(&from, &vn, &msg) {
self.num_filtered_messages
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
continue;
Expand All @@ -317,8 +317,8 @@ impl TestNetworkWorker {
}

async fn handle_leader(&mut self, from: TestAddress, to: TestAddress, msg: HotstuffMessage) {
if let Some(hotstuff_filter) = &self.hotstuff_filter {
if !hotstuff_filter(&from, &to, &msg) {
if let Some(message_filter) = &self.message_filter {
if !message_filter(&from, &to, &msg) {
self.num_filtered_messages
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
return;
Expand Down

0 comments on commit 5e5b656

Please sign in to comment.