Skip to content

Commit

Permalink
fix consensus test setup
Browse files Browse the repository at this point in the history
  • Loading branch information
mrnaveira committed Sep 23, 2024
1 parent 1959777 commit 5400231
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 11 deletions.
31 changes: 27 additions & 4 deletions dan_layer/consensus_tests/src/support/messaging_impls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,27 +7,32 @@ use tari_consensus::{
traits::{InboundMessaging, InboundMessagingError, OutboundMessaging, OutboundMessagingError},
};
use tari_dan_common_types::ShardGroup;
use tari_epoch_manager::EpochManagerReader;
use tokio::sync::mpsc;

use super::epoch_manager::TestEpochManager;
use crate::support::TestAddress;

#[derive(Debug, Clone)]
pub struct TestOutboundMessaging {
epoch_manager: TestEpochManager,
tx_leader: mpsc::Sender<(TestAddress, HotstuffMessage)>,
_tx_broadcast: mpsc::Sender<(Vec<TestAddress>, HotstuffMessage)>,
tx_broadcast: mpsc::Sender<(Vec<TestAddress>, HotstuffMessage)>,
loopback_sender: mpsc::Sender<HotstuffMessage>,
}

impl TestOutboundMessaging {
pub fn create(
epoch_manager: TestEpochManager,
tx_leader: mpsc::Sender<(TestAddress, HotstuffMessage)>,
tx_broadcast: mpsc::Sender<(Vec<TestAddress>, HotstuffMessage)>,
) -> (Self, mpsc::Receiver<HotstuffMessage>) {
let (loopback_sender, loopback_receiver) = mpsc::channel(100);
(
Self {
epoch_manager,
tx_leader,
_tx_broadcast: tx_broadcast,
tx_broadcast,
loopback_sender,
},
loopback_receiver,
Expand Down Expand Up @@ -61,12 +66,30 @@ impl OutboundMessaging for TestOutboundMessaging {
})
}

async fn multicast<'a, T>(&mut self, _shard_group: ShardGroup, _message: T) -> Result<(), OutboundMessagingError>
async fn multicast<'a, T>(&mut self, shard_group: ShardGroup, message: T) -> Result<(), OutboundMessagingError>
where
Self::Addr: 'a,
T: Into<HotstuffMessage> + Send,
{
Ok(())
let epoch = self
.epoch_manager
.current_epoch()
.await
.map_err(|e| OutboundMessagingError::UpstreamError(e.into()))?;
let peers: Vec<TestAddress> = self
.epoch_manager
.get_committees_by_shard_group(epoch, shard_group)
.await
.map_err(|e| OutboundMessagingError::UpstreamError(e.into()))?
.values()
.flat_map(|c| c.addresses().cloned())
.collect();

self.tx_broadcast.send((peers, message.into())).await.map_err(|_| {
OutboundMessagingError::FailedToEnqueueMessage {
reason: "broadcast channel closed".to_string(),
}
})
}
}

Expand Down
15 changes: 8 additions & 7 deletions dan_layer/consensus_tests/src/support/validator/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,20 +109,21 @@ impl ValidatorBuilder {
let (tx_hs_message, rx_hs_message) = mpsc::channel(100);
let (tx_leader, rx_leader) = mpsc::channel(100);

let (outbound_messaging, rx_loopback) = TestOutboundMessaging::create(tx_leader, tx_broadcast);
let epoch_manager = self.epoch_manager.as_ref().unwrap().clone_for(
self.address.clone(),
self.public_key.clone(),
self.shard_address,
);

let (outbound_messaging, rx_loopback) =
TestOutboundMessaging::create(epoch_manager.clone(), tx_leader, tx_broadcast);
let inbound_messaging = TestInboundMessaging::new(self.address.clone(), rx_hs_message, rx_loopback);

let store = SqliteStateStore::connect(&self.sql_url).unwrap();
let signing_service = TestVoteSignatureService::new(self.address.clone());
let transaction_pool = TransactionPool::new();
let (tx_events, _) = broadcast::channel(100);

let epoch_manager = self.epoch_manager.as_ref().unwrap().clone_for(
self.address.clone(),
self.public_key.clone(),
self.shard_address,
);

let transaction_executor = TestBlockTransactionProcessor::new(self.transaction_executions.clone());

let worker = HotstuffWorker::<TestConsensusSpec>::new(
Expand Down

0 comments on commit 5400231

Please sign in to comment.