Skip to content

Commit

Permalink
feat(comms): allow multiple messaging protocol instances (#5748)
Browse files Browse the repository at this point in the history
Description
---
Allow multiple message protocol instances

Motivation and Context
---
In L2 we want to have 2 messaging substreams, one for hotstuff messages
and one for the rest (transactions, network messages) to prevent the
queue of incoming transactions from causing HS leader failures.

In the base layer we spawn a single messaging protocol with the same
identitfier, so nothing changes
 
How Has This Been Tested?
---
Existing tests and manually

What process can a PR reviewer use to test or verify this change?
---
Node and wallet function as before

<!-- Checklist -->
<!-- 1. Is the title of your PR in the form that would make nice release
notes? The title, excluding the conventional commit
tag, will be included exactly as is in the CHANGELOG, so please think
about it carefully. -->


Breaking Changes
---

- [x] None
- [ ] Requires data directory on base node to be deleted
- [ ] Requires hard fork
- [ ] Other - Please specify

<!-- Does this include a breaking change? If so, include this line as a
footer -->
<!-- BREAKING CHANGE: Description what the user should do, e.g. delete a
database, resync the chain -->

Co-authored-by: SW van Heerden <swvheerden@gmail.com>
  • Loading branch information
sdbondi and SWvheerden authored Sep 7, 2023
1 parent c53ec06 commit 3fba04e
Show file tree
Hide file tree
Showing 13 changed files with 79 additions and 32 deletions.
15 changes: 12 additions & 3 deletions base_layer/p2p/src/initialization.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ use tari_comms::{
messaging::{MessagingEventSender, MessagingProtocolExtension},
rpc::RpcServer,
NodeNetworkInfo,
ProtocolId,
},
tor,
tor::HiddenServiceControllerError,
Expand Down Expand Up @@ -81,6 +82,9 @@ use crate::{
};
const LOG_TARGET: &str = "p2p::initialization";

/// ProtocolId for minotari messaging protocol
pub static MESSAGING_PROTOCOL_ID: ProtocolId = ProtocolId::from_static(b"t/msg/0.1");

#[derive(Debug, Error)]
pub enum CommsInitializationError {
#[error("Comms builder error: `{0}`")]
Expand Down Expand Up @@ -199,7 +203,8 @@ pub async fn initialize_local_test_comms<P: AsRef<Path>>(

let comms = comms
.add_protocol_extension(
MessagingProtocolExtension::new(event_sender.clone(), pipeline).enable_message_received_event(),
MessagingProtocolExtension::new(MESSAGING_PROTOCOL_ID.clone(), event_sender.clone(), pipeline)
.enable_message_received_event(),
)
.spawn_with_transport(MemoryTransport)
.await?;
Expand Down Expand Up @@ -374,8 +379,12 @@ async fn configure_comms_and_dht(

let (messaging_events_sender, _) = broadcast::channel(1);
comms = comms.add_protocol_extension(
MessagingProtocolExtension::new(messaging_events_sender, messaging_pipeline)
.with_ban_duration(config.dht.ban_duration_short),
MessagingProtocolExtension::new(
MESSAGING_PROTOCOL_ID.clone(),
messaging_events_sender,
messaging_pipeline,
)
.with_ban_duration(config.dht.ban_duration_short),
);

Ok((comms, dht))
Expand Down
6 changes: 5 additions & 1 deletion comms/core/examples/stress/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use tari_comms::{
multiaddr::Multiaddr,
pipeline,
pipeline::SinkService,
protocol::{messaging::MessagingProtocolExtension, ProtocolNotification, Protocols},
protocol::{messaging::MessagingProtocolExtension, ProtocolId, ProtocolNotification, Protocols},
tor,
tor::TorIdentity,
transports::{predicate::FalsePredicate, SocksConfig, TcpWithTorTransport},
Expand All @@ -47,6 +47,8 @@ use tokio::sync::{broadcast, mpsc};

use super::{error::Error, STRESS_PROTOCOL_NAME, TOR_CONTROL_PORT_ADDR, TOR_SOCKS_ADDR};

static MSG_PROTOCOL_ID: ProtocolId = ProtocolId::from_static(b"example/msg/1.0");

pub async fn create(
node_identity: Option<Arc<NodeIdentity>>,
database_path: &Path,
Expand Down Expand Up @@ -115,6 +117,7 @@ pub async fn create(
.build()?
.add_protocol_extensions(protocols.into())
.add_protocol_extension(MessagingProtocolExtension::new(
MSG_PROTOCOL_ID.clone(),
event_tx,
pipeline::Builder::new()
.with_inbound_pipeline(SinkService::new(inbound_tx))
Expand Down Expand Up @@ -148,6 +151,7 @@ pub async fn create(
.with_hidden_service_controller(hs_ctl)
.add_protocol_extensions(protocols.into())
.add_protocol_extension(MessagingProtocolExtension::new(
MSG_PROTOCOL_ID.clone(),
event_tx,
pipeline::Builder::new()
.with_inbound_pipeline(SinkService::new(inbound_tx))
Expand Down
5 changes: 4 additions & 1 deletion comms/core/examples/tor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use tari_comms::{
peer_manager::{NodeId, NodeIdentity, Peer, PeerFeatures},
pipeline,
pipeline::SinkService,
protocol::messaging::MessagingProtocolExtension,
protocol::{messaging::MessagingProtocolExtension, ProtocolId},
tor,
CommsBuilder,
CommsNode,
Expand All @@ -34,6 +34,8 @@ use tokio::{
//
// _Note:_ A running tor proxy with `ControlPort` set is required for this example to work.

static MSG_PROTOCOL_ID: ProtocolId = ProtocolId::from_static(b"example/tor/msg/1.0");

type Error = anyhow::Error;

#[tokio::main]
Expand Down Expand Up @@ -195,6 +197,7 @@ async fn setup_node_with_tor<P: Into<tor::PortMapping>>(
let (event_tx, _) = broadcast::channel(1);
let comms_node = comms_node
.add_protocol_extension(MessagingProtocolExtension::new(
MSG_PROTOCOL_ID.clone(),
event_tx,
pipeline::Builder::new()
// Outbound messages will be forwarded "as is" to outbound messaging
Expand Down
2 changes: 2 additions & 0 deletions comms/core/src/builder/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ use crate::{
protocol::{
messaging::{MessagingEvent, MessagingEventSender, MessagingProtocolExtension},
ProtocolEvent,
ProtocolId,
Protocols,
},
test_utils::node_identity::build_node_identity,
Expand Down Expand Up @@ -91,6 +92,7 @@ async fn spawn_node(
.add_protocol_extensions(protocols.into())
.add_protocol_extension(
MessagingProtocolExtension::new(
ProtocolId::from_static(b"test/msg"),
messaging_events_sender.clone(),
pipeline::Builder::new()
// Outbound messages will be forwarded "as is" to outbound messaging
Expand Down
14 changes: 11 additions & 3 deletions comms/core/src/protocol/messaging/extension.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,11 @@ use crate::{
message::InboundMessage,
pipeline,
protocol::{
messaging::{protocol::MESSAGING_PROTOCOL_ID, MessagingEventSender},
messaging::MessagingEventSender,
ProtocolExtension,
ProtocolExtensionContext,
ProtocolExtensionError,
ProtocolId,
},
};

Expand All @@ -52,11 +53,17 @@ pub struct MessagingProtocolExtension<TInPipe, TOutPipe, TOutReq> {
pipeline: pipeline::Config<TInPipe, TOutPipe, TOutReq>,
enable_message_received_event: bool,
ban_duration: Duration,
protocol_id: ProtocolId,
}

impl<TInPipe, TOutPipe, TOutReq> MessagingProtocolExtension<TInPipe, TOutPipe, TOutReq> {
pub fn new(event_tx: MessagingEventSender, pipeline: pipeline::Config<TInPipe, TOutPipe, TOutReq>) -> Self {
pub fn new(
protocol_id: ProtocolId,
event_tx: MessagingEventSender,
pipeline: pipeline::Config<TInPipe, TOutPipe, TOutReq>,
) -> Self {
Self {
protocol_id,
event_tx,
pipeline,
enable_message_received_event: false,
Expand Down Expand Up @@ -91,12 +98,13 @@ where
{
fn install(mut self: Box<Self>, context: &mut ProtocolExtensionContext) -> Result<(), ProtocolExtensionError> {
let (proto_tx, proto_rx) = mpsc::channel(MESSAGING_PROTOCOL_EVENTS_BUFFER_SIZE);
context.add_protocol(&[MESSAGING_PROTOCOL_ID.clone()], &proto_tx);
context.add_protocol(&[self.protocol_id.clone()], &proto_tx);

let (inbound_message_tx, inbound_message_rx) = mpsc::channel(INBOUND_MESSAGE_BUFFER_SIZE);

let message_receiver = self.pipeline.outbound.out_receiver.take().unwrap();
let messaging = MessagingProtocol::new(
self.protocol_id.clone(),
context.connectivity(),
proto_rx,
message_receiver,
Expand Down
9 changes: 1 addition & 8 deletions comms/core/src/protocol/messaging/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,7 @@ mod inbound;
mod metrics;
mod outbound;
mod protocol;
pub use protocol::{
MessagingEvent,
MessagingEventReceiver,
MessagingEventSender,
MessagingProtocol,
SendFailReason,
MESSAGING_PROTOCOL_ID,
};
pub use protocol::{MessagingEvent, MessagingEventReceiver, MessagingEventSender, MessagingProtocol, SendFailReason};

#[cfg(test)]
mod test;
7 changes: 5 additions & 2 deletions comms/core/src/protocol/messaging/outbound.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use crate::{
message::OutboundMessage,
multiplexing::Substream,
peer_manager::NodeId,
protocol::messaging::protocol::MESSAGING_PROTOCOL_ID,
protocol::ProtocolId,
stream_id::StreamId,
};

Expand All @@ -50,6 +50,7 @@ pub struct OutboundMessaging {
messaging_events_tx: mpsc::Sender<MessagingEvent>,
retry_queue_tx: mpsc::UnboundedSender<OutboundMessage>,
peer_node_id: NodeId,
protocol_id: ProtocolId,
}

impl OutboundMessaging {
Expand All @@ -59,13 +60,15 @@ impl OutboundMessaging {
messages_rx: mpsc::UnboundedReceiver<OutboundMessage>,
retry_queue_tx: mpsc::UnboundedSender<OutboundMessage>,
peer_node_id: NodeId,
protocol_id: ProtocolId,
) -> Self {
Self {
connectivity,
messages_rx,
messaging_events_tx,
retry_queue_tx,
peer_node_id,
protocol_id,
}
}

Expand Down Expand Up @@ -223,7 +226,7 @@ impl OutboundMessaging {
&mut self,
conn: &mut PeerConnection,
) -> Result<NegotiatedSubstream<Substream>, MessagingProtocolError> {
match conn.open_substream(&MESSAGING_PROTOCOL_ID).await {
match conn.open_substream(&self.protocol_id).await {
Ok(substream) => Ok(substream),
Err(err) => {
debug!(
Expand Down
17 changes: 14 additions & 3 deletions comms/core/src/protocol/messaging/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ use std::{
time::Duration,
};

use bytes::Bytes;
use log::*;
use tari_shutdown::{Shutdown, ShutdownSignal};
use thiserror::Error;
Expand All @@ -48,12 +47,12 @@ use crate::{
protocol::{
messaging::{inbound::InboundMessaging, outbound::OutboundMessaging},
ProtocolEvent,
ProtocolId,
ProtocolNotification,
},
};

const LOG_TARGET: &str = "comms::protocol::messaging";
pub static MESSAGING_PROTOCOL_ID: Bytes = Bytes::from_static(b"t/msg/0.1");
const INTERNAL_MESSAGING_EVENT_CHANNEL_SIZE: usize = 10;

const MAX_FRAME_LENGTH: usize = 8 * 1_024 * 1_024;
Expand Down Expand Up @@ -102,6 +101,7 @@ impl fmt::Display for MessagingEvent {

/// Actor responsible for lazily spawning inbound (protocol notifications) and outbound (mpsc channel) messaging actors.
pub struct MessagingProtocol {
protocol_id: ProtocolId,
connectivity: ConnectivityRequester,
proto_notification: mpsc::Receiver<ProtocolNotification<Substream>>,
active_queues: HashMap<NodeId, mpsc::UnboundedSender<OutboundMessage>>,
Expand All @@ -122,6 +122,7 @@ pub struct MessagingProtocol {
impl MessagingProtocol {
/// Create a new messaging protocol actor.
pub(super) fn new(
protocol_id: ProtocolId,
connectivity: ConnectivityRequester,
proto_notification: mpsc::Receiver<ProtocolNotification<Substream>>,
outbound_message_rx: mpsc::UnboundedReceiver<OutboundMessage>,
Expand All @@ -134,6 +135,7 @@ impl MessagingProtocol {
let (retry_queue_tx, retry_queue_rx) = mpsc::unbounded_channel();

Self {
protocol_id,
connectivity,
proto_notification,
outbound_message_rx,
Expand Down Expand Up @@ -287,6 +289,7 @@ impl MessagingProtocol {
self.internal_messaging_event_tx.clone(),
peer_node_id,
self.retry_queue_tx.clone(),
self.protocol_id.clone(),
);
break entry.insert(sender);
},
Expand Down Expand Up @@ -315,9 +318,17 @@ impl MessagingProtocol {
events_tx: mpsc::Sender<MessagingEvent>,
peer_node_id: NodeId,
retry_queue_tx: mpsc::UnboundedSender<OutboundMessage>,
protocol_id: ProtocolId,
) -> mpsc::UnboundedSender<OutboundMessage> {
let (msg_tx, msg_rx) = mpsc::unbounded_channel();
let outbound_messaging = OutboundMessaging::new(connectivity, events_tx, msg_rx, retry_queue_tx, peer_node_id);
let outbound_messaging = OutboundMessaging::new(
connectivity,
events_tx,
msg_rx,
retry_queue_tx,
peer_node_id,
protocol_id,
);
tokio::spawn(outbound_messaging.run());
msg_tx
}
Expand Down
12 changes: 10 additions & 2 deletions comms/core/src/protocol/messaging/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,18 @@ use tokio::{
time,
};

use super::protocol::{MessagingEvent, MessagingEventReceiver, MessagingProtocol, MESSAGING_PROTOCOL_ID};
use super::protocol::{MessagingEventReceiver, MessagingProtocol};
use crate::{
message::{InboundMessage, MessageTag, MessagingReplyRx, OutboundMessage},
multiplexing::Substream,
net_address::MultiaddressesWithStats,
peer_manager::{NodeId, NodeIdentity, Peer, PeerFeatures, PeerFlags, PeerManager},
protocol::{messaging::SendFailReason, ProtocolEvent, ProtocolNotification},
protocol::{
messaging::{MessagingEvent, SendFailReason},
ProtocolEvent,
ProtocolId,
ProtocolNotification,
},
test_utils::{
mocks::{create_connectivity_mock, create_peer_connection_mock_pair, ConnectivityManagerMockState},
node_id,
Expand All @@ -51,6 +56,8 @@ use crate::{

static TEST_MSG1: Bytes = Bytes::from_static(b"TEST_MSG1");

static MESSAGING_PROTOCOL_ID: ProtocolId = ProtocolId::from_static(b"test/msg");

async fn spawn_messaging_protocol() -> (
Arc<PeerManager>,
Arc<NodeIdentity>,
Expand All @@ -75,6 +82,7 @@ async fn spawn_messaging_protocol() -> (
let (events_tx, events_rx) = broadcast::channel(100);

let msg_proto = MessagingProtocol::new(
MESSAGING_PROTOCOL_ID.clone(),
requester,
proto_rx,
request_rx,
Expand Down
5 changes: 4 additions & 1 deletion comms/dht/examples/memory_net/utilities.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ use tari_comms::{
protocol::{
messaging::{MessagingEvent, MessagingEventReceiver, MessagingEventSender, MessagingProtocolExtension},
rpc::RpcServer,
ProtocolId,
},
transports::MemoryTransport,
types::CommsDatabase,
Expand Down Expand Up @@ -75,6 +76,7 @@ use tower::ServiceBuilder;

use crate::memory_net::DrainBurst;

pub static MEMORYNET_MSG_PROTOCOL_ID: ProtocolId = ProtocolId::from_static(b"t/msg/1.0");
pub type NodeEventRx = mpsc::UnboundedReceiver<(NodeId, NodeId)>;
pub type NodeEventTx = mpsc::UnboundedSender<(NodeId, NodeId)>;

Expand Down Expand Up @@ -969,7 +971,8 @@ async fn setup_comms_dht(
let comms = comms
.add_rpc_server(RpcServer::new().add_service(dht.rpc_service()))
.add_protocol_extension(
MessagingProtocolExtension::new(messaging_events_tx.clone(), pipeline).enable_message_received_event(),
MessagingProtocolExtension::new(MEMORYNET_MSG_PROTOCOL_ID.clone(), messaging_events_tx.clone(), pipeline)
.enable_message_received_event(),
)
.spawn_with_transport(MemoryTransport)
.await
Expand Down
5 changes: 4 additions & 1 deletion comms/dht/examples/propagation/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use tari_comms::{
peer_manager::PeerFeatures,
pipeline,
pipeline::SinkService,
protocol::{messaging::MessagingProtocolExtension, NodeNetworkInfo},
protocol::{messaging::MessagingProtocolExtension, NodeNetworkInfo, ProtocolId},
tor,
tor::TorIdentity,
CommsBuilder,
Expand All @@ -46,6 +46,8 @@ use tower::ServiceBuilder;

use crate::parse_from_short_str;

pub static MEMORYNET_MSG_PROTOCOL_ID: ProtocolId = ProtocolId::from_static(b"t/msg/1.0");

pub const TOR_CONTROL_PORT_ADDR: &str = "/ip4/127.0.0.1/tcp/9051";

pub async fn create<P: AsRef<Path>>(
Expand Down Expand Up @@ -132,6 +134,7 @@ pub async fn create<P: AsRef<Path>>(
let comms_node = comms_node
.with_hidden_service_controller(hs_ctl)
.add_protocol_extension(MessagingProtocolExtension::new(
MEMORYNET_MSG_PROTOCOL_ID.clone(),
event_tx,
pipeline::Builder::new()
.with_inbound_pipeline(
Expand Down
Loading

0 comments on commit 3fba04e

Please sign in to comment.