diff --git a/Cargo.lock b/Cargo.lock index 1ad880641445..bb9c305c1084 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7936,6 +7936,7 @@ dependencies = [ "pallet-transaction-payment-rpc-runtime-api", "parity-db", "parity-scale-codec", + "parking_lot 0.12.1", "polkadot-approval-distribution", "polkadot-availability-bitfield-distribution", "polkadot-availability-distribution", diff --git a/node/network/bridge/Cargo.toml b/node/network/bridge/Cargo.toml index 4f3d6306aa8e..56b506df627d 100644 --- a/node/network/bridge/Cargo.toml +++ b/node/network/bridge/Cargo.toml @@ -6,7 +6,6 @@ edition.workspace = true license.workspace = true [dependencies] -always-assert = "0.1" async-trait = "0.1.57" futures = "0.3.21" gum = { package = "tracing-gum", path = "../../gum" } diff --git a/node/network/bridge/src/network.rs b/node/network/bridge/src/network.rs index 04026197f6eb..42f734b9ff67 100644 --- a/node/network/bridge/src/network.rs +++ b/node/network/bridge/src/network.rs @@ -14,21 +14,24 @@ // You should have received a copy of the GNU General Public License // along with Polkadot. If not, see . -use std::{collections::HashSet, sync::Arc}; +use std::{ + collections::{HashMap, HashSet}, + sync::Arc, +}; use async_trait::async_trait; -use futures::{prelude::*, stream::BoxStream}; +use parking_lot::Mutex; use parity_scale_codec::Encode; use sc_network::{ - config::parse_addr, multiaddr::Multiaddr, types::ProtocolName, Event as NetworkEvent, - IfDisconnected, NetworkEventStream, NetworkNotification, NetworkPeers, NetworkRequest, - NetworkService, OutboundFailure, ReputationChange, RequestFailure, + config::parse_addr, multiaddr::Multiaddr, service::traits::MessageSink, types::ProtocolName, + IfDisconnected, NetworkPeers, NetworkRequest, NetworkService, ObservedRole, OutboundFailure, + ReputationChange, RequestFailure, }; use polkadot_node_network_protocol::{ - peer_set::{PeerSet, PeerSetProtocolNames, ProtocolVersion}, + peer_set::{PeerSet, ProtocolVersion}, request_response::{OutgoingRequest, Recipient, ReqProtocolNames, Requests}, PeerId, }; @@ -45,13 +48,12 @@ const LOG_TARGET: &'static str = "parachain::network-bridge-net"; /// messages that are compatible with the passed peer set, as that is currently not enforced by /// this function. These are messages of type `WireMessage` parameterized on the matching type. pub(crate) fn send_message( - net: &mut impl Network, mut peers: Vec, peer_set: PeerSet, version: ProtocolVersion, - protocol_names: &PeerSetProtocolNames, message: M, metrics: &super::Metrics, + network_notification_sinks: &Arc>>>, ) where M: Encode + Clone, { @@ -61,29 +63,31 @@ pub(crate) fn send_message( encoded }; + let notification_sinks = network_notification_sinks.lock(); + // optimization: avoid cloning the message for the last peer in the // list. The message payload can be quite large. If the underlying // network used `Bytes` this would not be necessary. + // + // peer may have gotten disconnect by the time `send_message()` is called + // at which point the the sink is not available. let last_peer = peers.pop(); - // optimization: generate the protocol name once. - let protocol_name = protocol_names.get_name(peer_set, version); peers.into_iter().for_each(|peer| { - net.write_notification(peer, protocol_name.clone(), message.clone()); + if let Some(sink) = notification_sinks.get(&(peer_set, peer)) { + sink.send_sync_notification(message.clone()); + } }); + if let Some(peer) = last_peer { - net.write_notification(peer, protocol_name, message); + if let Some(sink) = notification_sinks.get(&(peer_set, peer)) { + sink.send_sync_notification(message.clone()); + } } } /// An abstraction over networking for the purposes of this subsystem. #[async_trait] pub trait Network: Clone + Send + 'static { - /// Get a stream of all events occurring on the network. This may include events unrelated - /// to the Polkadot protocol - the user of this function should filter only for events related - /// to the [`VALIDATION_PROTOCOL_NAME`](VALIDATION_PROTOCOL_NAME) - /// or [`COLLATION_PROTOCOL_NAME`](COLLATION_PROTOCOL_NAME) - fn event_stream(&mut self) -> BoxStream<'static, NetworkEvent>; - /// Ask the network to keep a substream open with these nodes and not disconnect from them /// until removed from the protocol's peer set. /// Note that `out_peers` setting has no effect on this. @@ -115,16 +119,12 @@ pub trait Network: Clone + Send + 'static { /// Disconnect a given peer from the protocol specified without harming reputation. fn disconnect_peer(&self, who: PeerId, protocol: ProtocolName); - /// Write a notification to a peer on the given protocol. - fn write_notification(&self, who: PeerId, protocol: ProtocolName, message: Vec); + /// Get peer role. + fn peer_role(&self, who: PeerId, handshake: Vec) -> Option; } #[async_trait] impl Network for Arc> { - fn event_stream(&mut self) -> BoxStream<'static, NetworkEvent> { - NetworkService::event_stream(self, "polkadot-network-bridge").boxed() - } - async fn set_reserved_peers( &mut self, protocol: ProtocolName, @@ -149,10 +149,6 @@ impl Network for Arc> { NetworkService::disconnect_peer(&**self, who, protocol); } - fn write_notification(&self, who: PeerId, protocol: ProtocolName, message: Vec) { - NetworkService::write_notification(&**self, who, protocol, message); - } - async fn start_request( &self, authority_discovery: &mut AD, @@ -224,6 +220,10 @@ impl Network for Arc> { if_disconnected, ); } + + fn peer_role(&self, who: PeerId, handshake: Vec) -> Option { + NetworkService::peer_role(self, who, handshake) + } } /// We assume one `peer_id` per `authority_id`. diff --git a/node/network/bridge/src/rx/mod.rs b/node/network/bridge/src/rx/mod.rs index 11a2dc6be83a..72ffbd9252ac 100644 --- a/node/network/bridge/src/rx/mod.rs +++ b/node/network/bridge/src/rx/mod.rs @@ -17,21 +17,18 @@ //! The Network Bridge Subsystem - handles _incoming_ messages from the network, forwarded to the relevant subsystems. use super::*; -use always_assert::never; use bytes::Bytes; -use futures::stream::BoxStream; use parity_scale_codec::{Decode, DecodeAll}; -use sc_network::Event as NetworkEvent; +use sc_network::service::traits::{ + MessageSink, NotificationEvent, NotificationService, ValidationResult, +}; use sp_consensus::SyncOracle; use polkadot_node_network_protocol::{ self as net_protocol, grid_topology::{SessionGridTopology, TopologyPeerInfo}, - peer_set::{ - CollationVersion, PeerSet, PeerSetProtocolNames, PerPeerSet, ProtocolVersion, - ValidationVersion, - }, + peer_set::{CollationVersion, PeerSet, PeerSetProtocolNames, ValidationVersion}, v1 as protocol_v1, ObservedRole, OurView, PeerId, UnifiedReputationChange as Rep, View, }; @@ -83,6 +80,9 @@ pub struct NetworkBridgeRx { shared: Shared, metrics: Metrics, peerset_protocol_names: PeerSetProtocolNames, + validation_service: Box, + collation_service: Box, + network_notification_sinks: Arc>>>, } impl NetworkBridgeRx { @@ -96,8 +96,18 @@ impl NetworkBridgeRx { sync_oracle: Box, metrics: Metrics, peerset_protocol_names: PeerSetProtocolNames, + mut notification_services: HashMap>, + network_notification_sinks: Arc>>>, ) -> Self { let shared = Shared::default(); + + let validation_service = notification_services + .remove(&PeerSet::Validation) + .expect("validation protocol was enabled so `NotificationService` for it exists. qed"); + let collation_service = notification_services + .remove(&PeerSet::Collation) + .expect("collation protocol was enabled so `NotificationService` for it exists. qed"); + Self { network_service, authority_discovery_service, @@ -105,6 +115,9 @@ impl NetworkBridgeRx { shared, metrics, peerset_protocol_names, + validation_service, + collation_service, + network_notification_sinks, } } } @@ -115,371 +128,481 @@ where Net: Network + Sync, AD: validator_discovery::AuthorityDiscovery + Clone + Sync, { - fn start(mut self, ctx: Context) -> SpawnedSubsystem { - // The stream of networking events has to be created at initialization, otherwise the - // networking might open connections before the stream of events has been grabbed. - let network_stream = self.network_service.event_stream(); - + fn start(self, ctx: Context) -> SpawnedSubsystem { // Swallow error because failure is fatal to the node and we log with more precision // within `run_network`. - let future = run_network_in(self, ctx, network_stream) + let future = run_network_in(self, ctx) .map_err(|e| SubsystemError::with_origin("network-bridge", e)) .boxed(); SpawnedSubsystem { name: "network-bridge-rx-subsystem", future } } } -async fn handle_network_messages( - mut sender: impl overseer::NetworkBridgeRxSenderTrait, - mut network_service: impl Network, - network_stream: BoxStream<'static, NetworkEvent>, - mut authority_discovery_service: AD, - metrics: Metrics, - shared: Shared, - peerset_protocol_names: PeerSetProtocolNames, -) -> Result<(), Error> +/// Handle notification event received over the validation protocol. +async fn handle_validation_message( + event: NotificationEvent, + network_service: &mut impl Network, + sender: &mut impl overseer::NetworkBridgeRxSenderTrait, + authority_discovery_service: &mut AD, + metrics: &Metrics, + shared: &Shared, + peerset_protocol_names: &PeerSetProtocolNames, + notification_service: &mut Box, + network_notification_sinks: &mut Arc>>>, +) -> Result<(), ()> where AD: validator_discovery::AuthorityDiscovery + Send, { - let mut network_stream = network_stream.fuse(); - loop { - match network_stream.next().await { - None => return Err(Error::EventStreamConcluded), - Some(NetworkEvent::Dht(_)) => {}, - Some(NetworkEvent::NotificationStreamOpened { - remote: peer, - protocol, - role, - negotiated_fallback, - received_handshake: _, - }) => { - let role = ObservedRole::from(role); + match event { + NotificationEvent::ValidateInboundSubstream { result_tx, .. } => { + let _ = result_tx.send(ValidationResult::Accept); + }, + NotificationEvent::NotificationStreamOpened { + peer, + handshake, + negotiated_fallback, + .. + } => { + let role = match network_service.peer_role(peer, handshake) { + Some(role) => ObservedRole::from(role), + None => { + gum::debug!( + target: LOG_TARGET, + ?peer, + "Failed to determine peer role", + ); + return Ok(()) + }, + }; + + let (peer_set, version) = { let (peer_set, version) = { - let (peer_set, version) = - match peerset_protocol_names.try_get_protocol(&protocol) { - None => continue, - Some(p) => p, - }; - - if let Some(fallback) = negotiated_fallback { - match peerset_protocol_names.try_get_protocol(&fallback) { - None => { + let name = peerset_protocol_names.get_main_name(PeerSet::Validation); + peerset_protocol_names + .try_get_protocol(&name) + .expect("validation protocol exists since it's enabled. qed") + }; + + if let Some(fallback) = negotiated_fallback { + match peerset_protocol_names.try_get_protocol(&fallback) { + None => { + gum::debug!( + target: LOG_TARGET, + fallback = &*fallback, + ?peer, + ?peer_set, + "Unknown fallback", + ); + + return Ok(()) + }, + Some((p2, v2)) => { + if p2 != peer_set { gum::debug!( target: LOG_TARGET, fallback = &*fallback, - ?peer, - ?peer_set, - "Unknown fallback", + fallback_peerset = ?p2, + peerset = ?peer_set, + "Fallback mismatched peer-set", ); - continue - }, - Some((p2, v2)) => { - if p2 != peer_set { - gum::debug!( - target: LOG_TARGET, - fallback = &*fallback, - fallback_peerset = ?p2, - protocol = &*protocol, - peerset = ?peer_set, - "Fallback mismatched peer-set", - ); - - continue - } - - (p2, v2) - }, - } - } else { - (peer_set, version) - } - }; - - gum::debug!( - target: LOG_TARGET, - action = "PeerConnected", - peer_set = ?peer_set, - version = %version, - peer = ?peer, - role = ?role - ); - - let local_view = { - let mut shared = shared.0.lock(); - let peer_map = match peer_set { - PeerSet::Validation => &mut shared.validation_peers, - PeerSet::Collation => &mut shared.collation_peers, - }; + return Ok(()) + } - match peer_map.entry(peer) { - hash_map::Entry::Occupied(_) => continue, - hash_map::Entry::Vacant(vacant) => { - vacant.insert(PeerData { view: View::default(), version }); + (p2, v2) }, } - - metrics.on_peer_connected(peer_set, version); - metrics.note_peer_count(peer_set, version, peer_map.len()); - - shared.local_view.clone().unwrap_or(View::default()) - }; - - let maybe_authority = - authority_discovery_service.get_authority_ids_by_peer_id(peer).await; - - match peer_set { - PeerSet::Validation => { - dispatch_validation_events_to_all( - vec![ - NetworkBridgeEvent::PeerConnected( - peer, - role, - version, - maybe_authority, - ), - NetworkBridgeEvent::PeerViewChange(peer, View::default()), - ], - &mut sender, - ) - .await; - - send_message( - &mut network_service, - vec![peer], - PeerSet::Validation, - version, - &peerset_protocol_names, - WireMessage::::ViewUpdate(local_view), - &metrics, - ); - }, - PeerSet::Collation => { - dispatch_collation_events_to_all( - vec![ - NetworkBridgeEvent::PeerConnected( - peer, - role, - version, - maybe_authority, - ), - NetworkBridgeEvent::PeerViewChange(peer, View::default()), - ], - &mut sender, - ) - .await; - - send_message( - &mut network_service, - vec![peer], - PeerSet::Collation, - version, - &peerset_protocol_names, - WireMessage::::ViewUpdate(local_view), - &metrics, - ); + } else { + (peer_set, version) + } + }; + + // store the notification sink to `network_notification_sinks` so both `NetworkBridgeRx` + // and `NetworkBridgeTx` can send messages to the peer. + match notification_service.message_sink(&peer) { + Some(sink) => { + network_notification_sinks.lock().insert((peer_set, peer), sink); + }, + None => { + gum::warn!( + target: LOG_TARGET, + peer_set = ?peer_set, + version = %version, + peer = ?peer, + role = ?role, + "Message sink not available for peer", + ); + return Ok(()) + }, + } + + gum::debug!( + target: LOG_TARGET, + action = "PeerConnected", + peer_set = ?peer_set, + version = %version, + peer = ?peer, + role = ?role + ); + + let local_view = { + let mut shared = shared.0.lock(); + let peer_map = &mut shared.validation_peers; + + match peer_map.entry(peer) { + hash_map::Entry::Occupied(_) => return Ok(()), + hash_map::Entry::Vacant(vacant) => { + vacant.insert(PeerData { view: View::default(), version }); }, } - }, - Some(NetworkEvent::NotificationStreamClosed { remote: peer, protocol }) => { - let (peer_set, version) = match peerset_protocol_names.try_get_protocol(&protocol) { - None => continue, - Some(peer_set) => peer_set, - }; - gum::debug!( - target: LOG_TARGET, - action = "PeerDisconnected", - peer_set = ?peer_set, - peer = ?peer - ); + metrics.on_peer_connected(peer_set, version); + metrics.note_peer_count(peer_set, version, peer_map.len()); - let was_connected = { - let mut shared = shared.0.lock(); - let peer_map = match peer_set { - PeerSet::Validation => &mut shared.validation_peers, - PeerSet::Collation => &mut shared.collation_peers, - }; - - let w = peer_map.remove(&peer).is_some(); + shared.local_view.clone().unwrap_or(View::default()) + }; - metrics.on_peer_disconnected(peer_set, version); - metrics.note_peer_count(peer_set, version, peer_map.len()); + let maybe_authority = + authority_discovery_service.get_authority_ids_by_peer_id(peer).await; - w - }; + // TODO: store sink to networkservice? + dispatch_validation_events_to_all( + vec![ + NetworkBridgeEvent::PeerConnected(peer, role, version, maybe_authority), + NetworkBridgeEvent::PeerViewChange(peer, View::default()), + ], + sender, + ) + .await; - if was_connected && version == peer_set.get_main_version() { - match peer_set { - PeerSet::Validation => - dispatch_validation_event_to_all( - NetworkBridgeEvent::PeerDisconnected(peer), - &mut sender, - ) - .await, - PeerSet::Collation => - dispatch_collation_event_to_all( - NetworkBridgeEvent::PeerDisconnected(peer), - &mut sender, - ) - .await, - } - } - }, - Some(NetworkEvent::NotificationsReceived { remote, messages }) => { - let expected_versions = { - let mut versions = PerPeerSet::>::default(); - let shared = shared.0.lock(); - if let Some(peer_data) = shared.validation_peers.get(&remote) { - versions[PeerSet::Validation] = Some(peer_data.version); - } + send_message( + vec![peer], + PeerSet::Validation, + version, + WireMessage::::ViewUpdate(local_view), + metrics, + &network_notification_sinks, + ); + }, + NotificationEvent::NotificationStreamClosed { peer } => { + let (peer_set, version) = { + let name = peerset_protocol_names.get_main_name(PeerSet::Validation); + peerset_protocol_names + .try_get_protocol(&name) + .expect("validation protocol exists since it's enabled. qed") + }; + + gum::debug!( + target: LOG_TARGET, + action = "PeerDisconnected", + peer_set = ?peer_set, + peer = ?peer + ); + + let was_connected = { + let mut shared = shared.0.lock(); + let peer_map = &mut shared.validation_peers; + + let w = peer_map.remove(&peer).is_some(); + + metrics.on_peer_disconnected(peer_set, version); + metrics.note_peer_count(peer_set, version, peer_map.len()); + + w + }; + + network_notification_sinks.lock().remove(&(peer_set, peer)); + + if was_connected && version == peer_set.get_main_version() { + dispatch_validation_event_to_all(NetworkBridgeEvent::PeerDisconnected(peer), sender) + .await + } + }, + NotificationEvent::NotificationReceived { peer, notification } => { + if shared.0.lock().validation_peers.get(&peer).is_none() { + // TODO: does this check make any sense? + gum::debug!(target: LOG_TARGET, action = "ReportPeer"); + network_service.report_peer(peer, UNCONNECTED_PEERSET_COST.into()); + return Ok(()) + } + + gum::trace!(target: LOG_TARGET, action = "PeerMessage", ?peer); + + let (events, reports) = handle_v1_peer_messages::( + peer, + PeerSet::Validation, + &mut shared.0.lock().validation_peers, + vec![Bytes::from(notification)], + &metrics, + ); + + for report in reports { + network_service.report_peer(peer, report.into()); + } + + dispatch_validation_events_to_all(events, sender).await; + }, + } - if let Some(peer_data) = shared.collation_peers.get(&remote) { - versions[PeerSet::Collation] = Some(peer_data.version); - } + Ok(()) +} - versions +/// Handle notification event received over the collation protocol. +async fn handle_collation_message( + event: NotificationEvent, + network_service: &mut impl Network, + sender: &mut impl overseer::NetworkBridgeRxSenderTrait, + authority_discovery_service: &mut AD, + metrics: &Metrics, + shared: &Shared, + peerset_protocol_names: &PeerSetProtocolNames, + notification_service: &mut Box, + network_notification_sinks: &mut Arc>>>, +) -> Result<(), ()> +where + AD: validator_discovery::AuthorityDiscovery + Send, +{ + match event { + NotificationEvent::ValidateInboundSubstream { result_tx, .. } => { + let _ = result_tx.send(ValidationResult::Accept); + }, + NotificationEvent::NotificationStreamOpened { + peer, + handshake, + negotiated_fallback, + .. + } => { + let role = match network_service.peer_role(peer, handshake) { + Some(role) => ObservedRole::from(role), + None => { + gum::debug!( + target: LOG_TARGET, + ?peer, + "Failed to determine peer role", + ); + return Ok(()) + }, + }; + + let (peer_set, version) = { + let (peer_set, version) = { + let name = peerset_protocol_names.get_main_name(PeerSet::Collation); + peerset_protocol_names + .try_get_protocol(&name) + .expect("validation protocol exists since it's enabled. qed") }; - // non-decoded, but version-checked validation messages. - let v_messages: Result, _> = messages - .iter() - .filter_map(|(protocol, msg_bytes)| { - // version doesn't matter because we always receive on the 'correct' - // protocol name, not the negotiated fallback. - let (peer_set, _version) = - peerset_protocol_names.try_get_protocol(protocol)?; - if peer_set == PeerSet::Validation { - if expected_versions[PeerSet::Validation].is_none() { - return Some(Err(UNCONNECTED_PEERSET_COST)) - } - - Some(Ok(msg_bytes.clone())) - } else { - None - } - }) - .collect(); - - let v_messages = match v_messages { - Err(rep) => { - gum::debug!(target: LOG_TARGET, action = "ReportPeer"); - network_service.report_peer(remote, rep.into()); + if let Some(fallback) = negotiated_fallback { + match peerset_protocol_names.try_get_protocol(&fallback) { + None => { + gum::debug!( + target: LOG_TARGET, + fallback = &*fallback, + ?peer, + ?peer_set, + "Unknown fallback", + ); - continue - }, - Ok(v) => v, - }; + return Ok(()) + }, + Some((p2, v2)) => { + if p2 != peer_set { + gum::debug!( + target: LOG_TARGET, + fallback = &*fallback, + fallback_peerset = ?p2, + peerset = ?peer_set, + "Fallback mismatched peer-set", + ); - // non-decoded, but version-checked collation messages. - let c_messages: Result, _> = messages - .iter() - .filter_map(|(protocol, msg_bytes)| { - // version doesn't matter because we always receive on the 'correct' - // protocol name, not the negotiated fallback. - let (peer_set, _version) = - peerset_protocol_names.try_get_protocol(protocol)?; - - if peer_set == PeerSet::Collation { - if expected_versions[PeerSet::Collation].is_none() { - return Some(Err(UNCONNECTED_PEERSET_COST)) + return Ok(()) } - Some(Ok(msg_bytes.clone())) - } else { - None - } - }) - .collect(); - - let c_messages = match c_messages { - Err(rep) => { - gum::debug!(target: LOG_TARGET, action = "ReportPeer"); - network_service.report_peer(remote, rep.into()); - - continue + (p2, v2) + }, + } + } else { + (peer_set, version) + } + }; + + // store the notification sink to `network_notification_sinks` so both `NetworkBridgeRx` + // and `NetworkBridgeTx` can send messages to the peer. + match notification_service.message_sink(&peer) { + Some(sink) => { + network_notification_sinks.lock().insert((peer_set, peer), sink); + }, + None => { + gum::warn!( + target: LOG_TARGET, + peer_set = ?peer_set, + version = %version, + peer = ?peer, + role = ?role, + "Message sink not available for peer", + ); + return Ok(()) + }, + } + + gum::debug!( + target: LOG_TARGET, + action = "PeerConnected", + peer_set = ?peer_set, + version = %version, + peer = ?peer, + role = ?role + ); + + let local_view = { + let mut shared = shared.0.lock(); + let peer_map = &mut shared.collation_peers; + + match peer_map.entry(peer) { + hash_map::Entry::Occupied(_) => return Ok(()), + hash_map::Entry::Vacant(vacant) => { + vacant.insert(PeerData { view: View::default(), version }); }, - Ok(v) => v, - }; - - if v_messages.is_empty() && c_messages.is_empty() { - continue } - gum::trace!( - target: LOG_TARGET, - action = "PeerMessages", - peer = ?remote, - num_validation_messages = %v_messages.len(), - num_collation_messages = %c_messages.len() - ); + metrics.on_peer_connected(peer_set, version); + metrics.note_peer_count(peer_set, version, peer_map.len()); - if !v_messages.is_empty() { - let (events, reports) = - if expected_versions[PeerSet::Validation] == - Some(ValidationVersion::V1.into()) - { - handle_v1_peer_messages::( - remote, - PeerSet::Validation, - &mut shared.0.lock().validation_peers, - v_messages, - &metrics, - ) - } else { - gum::warn!( - target: LOG_TARGET, - version = ?expected_versions[PeerSet::Validation], - "Major logic bug. Peer somehow has unsupported validation protocol version." - ); - - never!("Only version 1 is supported; peer set connection checked above; qed"); - - // If a peer somehow triggers this, we'll disconnect them - // eventually. - (Vec::new(), vec![UNCONNECTED_PEERSET_COST]) - }; + shared.local_view.clone().unwrap_or(View::default()) + }; - for report in reports { - network_service.report_peer(remote, report.into()); - } - - dispatch_validation_events_to_all(events, &mut sender).await; - } + let maybe_authority = + authority_discovery_service.get_authority_ids_by_peer_id(peer).await; - if !c_messages.is_empty() { - let (events, reports) = - if expected_versions[PeerSet::Collation] == - Some(CollationVersion::V1.into()) - { - handle_v1_peer_messages::( - remote, - PeerSet::Collation, - &mut shared.0.lock().collation_peers, - c_messages, - &metrics, - ) - } else { - gum::warn!( - target: LOG_TARGET, - version = ?expected_versions[PeerSet::Collation], - "Major logic bug. Peer somehow has unsupported collation protocol version." - ); - - never!("Only version 1 is supported; peer set connection checked above; qed"); + dispatch_collation_events_to_all( + vec![ + NetworkBridgeEvent::PeerConnected(peer, role, version, maybe_authority), + NetworkBridgeEvent::PeerViewChange(peer, View::default()), + ], + sender, + ) + .await; - // If a peer somehow triggers this, we'll disconnect them - // eventually. - (Vec::new(), vec![UNCONNECTED_PEERSET_COST]) - }; + send_message( + vec![peer], + PeerSet::Collation, + version, + WireMessage::::ViewUpdate(local_view), + metrics, + &network_notification_sinks, + ); + }, + NotificationEvent::NotificationStreamClosed { peer } => { + let (peer_set, version) = { + let name = peerset_protocol_names.get_main_name(PeerSet::Collation); + peerset_protocol_names + .try_get_protocol(&name) + .expect("validation protocol exists since it's enabled. qed") + }; + + gum::debug!( + target: LOG_TARGET, + action = "PeerDisconnected", + peer_set = ?peer_set, + peer = ?peer + ); + + let was_connected = { + let mut shared = shared.0.lock(); + let peer_map = &mut shared.collation_peers; + + let w = peer_map.remove(&peer).is_some(); + + metrics.on_peer_disconnected(peer_set, version); + metrics.note_peer_count(peer_set, version, peer_map.len()); + + w + }; + + network_notification_sinks.lock().remove(&(peer_set, peer)); + + if was_connected && version == peer_set.get_main_version() { + dispatch_collation_event_to_all(NetworkBridgeEvent::PeerDisconnected(peer), sender) + .await + } + }, + NotificationEvent::NotificationReceived { peer, notification } => { + // TODO: does this check make any sense? + if shared.0.lock().collation_peers.get(&peer).is_none() { + gum::debug!(target: LOG_TARGET, action = "ReportPeer"); + network_service.report_peer(peer, UNCONNECTED_PEERSET_COST.into()); + return Ok(()) + } + + gum::trace!(target: LOG_TARGET, action = "PeerMessages", ?peer); + + let (events, reports) = handle_v1_peer_messages::( + peer, + PeerSet::Collation, + &mut shared.0.lock().collation_peers, + vec![Bytes::from(notification)], + &metrics, + ); + + for report in reports { + network_service.report_peer(peer, report.into()); + } + + dispatch_collation_events_to_all(events, sender).await; + }, + } - for report in reports { - network_service.report_peer(remote, report.into()); - } + Ok(()) +} - dispatch_collation_events_to_all(events, &mut sender).await; +async fn handle_network_messages( + mut sender: impl overseer::NetworkBridgeRxSenderTrait, + mut network_service: impl Network, + mut validation_service: Box, + mut collation_service: Box, + mut authority_discovery_service: AD, + metrics: Metrics, + shared: Shared, + peerset_protocol_names: PeerSetProtocolNames, + mut network_notification_sinks: Arc>>>, +) -> Result<(), Error> +where + AD: validator_discovery::AuthorityDiscovery + Send, +{ + loop { + futures::select! { + event = validation_service.next_event().fuse() => match event { + Some(event) => if let Err(_error) = handle_validation_message( + event, + &mut network_service, + &mut sender, + &mut authority_discovery_service, + &metrics, + &shared, + &peerset_protocol_names, + &mut validation_service, + &mut network_notification_sinks, + ).await { + // TODO: log error } + None => return Err(Error::EventStreamConcluded), }, + event = collation_service.next_event().fuse() => match event { + Some(event) => if let Err(_error) = handle_collation_message( + event, + &mut network_service, + &mut sender, + &mut authority_discovery_service, + &metrics, + &shared, + &peerset_protocol_names, + &mut collation_service, + &mut network_notification_sinks, + ).await { + // TODO: log error + } + None => return Err(Error::EventStreamConcluded), + } } } } @@ -505,17 +628,15 @@ where } #[overseer::contextbounds(NetworkBridgeRx, prefix = self::overseer)] -async fn run_incoming_orchestra_signals( +async fn run_incoming_orchestra_signals( mut ctx: Context, - mut network_service: N, mut authority_discovery_service: AD, shared: Shared, sync_oracle: Box, metrics: Metrics, - peerset_protocol_names: PeerSetProtocolNames, + network_notification_sinks: Arc>>>, ) -> Result<(), Error> where - N: Network, AD: validator_discovery::AuthorityDiscovery + Clone, { // This is kept sorted, descending, by block number. @@ -607,13 +728,12 @@ where mode = Mode::Active; update_our_view( - &mut network_service, &mut ctx, &live_heads, &shared, finalized_number, &metrics, - &peerset_protocol_names, + &network_notification_sinks, ); } } @@ -638,16 +758,10 @@ where /// and `parking_lot` currently does not provide `Send`, which helps us enforce that. /// If this breaks, we need to find another way to protect ourselves. /// -/// ```compile_fail -/// #use parking_lot::MutexGuard; -/// #fn is_send(); -/// #is_send::(); -/// ``` #[overseer::contextbounds(NetworkBridgeRx, prefix = self::overseer)] async fn run_network_in( bridge: NetworkBridgeRx, mut ctx: Context, - network_stream: BoxStream<'static, NetworkEvent>, ) -> Result<(), Error> where N: Network, @@ -660,16 +774,21 @@ where sync_oracle, shared, peerset_protocol_names, + validation_service, + collation_service, + network_notification_sinks, } = bridge; let (task, network_event_handler) = handle_network_messages( ctx.sender().clone(), network_service.clone(), - network_stream, + validation_service, + collation_service, authority_discovery_service.clone(), metrics.clone(), shared.clone(), peerset_protocol_names.clone(), + network_notification_sinks.clone(), ) .remote_handle(); @@ -678,12 +797,11 @@ where let orchestra_signal_handler = run_incoming_orchestra_signals( ctx, - network_service, authority_discovery_service, shared, sync_oracle, metrics, - peerset_protocol_names, + network_notification_sinks, ); futures::pin_mut!(orchestra_signal_handler); @@ -703,17 +821,14 @@ fn construct_view( } #[overseer::contextbounds(NetworkBridgeRx, prefix = self::overseer)] -fn update_our_view( - net: &mut Net, +fn update_our_view( ctx: &mut Context, live_heads: &[ActivatedLeaf], shared: &Shared, finalized_number: BlockNumber, metrics: &Metrics, - peerset_protocol_names: &PeerSetProtocolNames, -) where - Net: Network, -{ + network_notification_sinks: &Arc>>>, +) { let new_view = construct_view(live_heads.iter().map(|v| v.hash), finalized_number); let (validation_peers, collation_peers) = { @@ -742,19 +857,17 @@ fn update_our_view( }; send_validation_message_v1( - net, validation_peers, - peerset_protocol_names, WireMessage::ViewUpdate(new_view.clone()), metrics, + network_notification_sinks, ); send_collation_message_v1( - net, collation_peers, - peerset_protocol_names, WireMessage::ViewUpdate(new_view), metrics, + network_notification_sinks, ); let our_view = OurView::new( @@ -827,38 +940,34 @@ fn handle_v1_peer_messages>( } fn send_validation_message_v1( - net: &mut impl Network, peers: Vec, - peerset_protocol_names: &PeerSetProtocolNames, message: WireMessage, metrics: &Metrics, + network_notification_sinks: &Arc>>>, ) { send_message( - net, peers, PeerSet::Validation, ValidationVersion::V1.into(), - peerset_protocol_names, message, metrics, + network_notification_sinks, ); } fn send_collation_message_v1( - net: &mut impl Network, peers: Vec, - peerset_protocol_names: &PeerSetProtocolNames, message: WireMessage, metrics: &Metrics, + network_notification_sinks: &Arc>>>, ) { send_message( - net, peers, PeerSet::Collation, CollationVersion::V1.into(), - peerset_protocol_names, message, metrics, + &network_notification_sinks, ); } diff --git a/node/network/bridge/src/rx/tests.rs b/node/network/bridge/src/rx/tests.rs index 078f6591ae2a..4690a336e097 100644 --- a/node/network/bridge/src/rx/tests.rs +++ b/node/network/bridge/src/rx/tests.rs @@ -15,7 +15,7 @@ // along with Polkadot. If not, see . use super::*; -use futures::{channel::oneshot, executor, stream::BoxStream}; +use futures::{channel::oneshot, executor}; use polkadot_node_network_protocol::{self as net_protocol, OurView}; use polkadot_node_subsystem::{messages::NetworkBridgeEvent, ActivatedLeaf}; @@ -27,7 +27,11 @@ use std::{ sync::atomic::{AtomicBool, Ordering}, }; -use sc_network::{Event as NetworkEvent, IfDisconnected, ProtocolName, ReputationChange}; +use sc_network::{ + service::traits::{Direction, MessageSink, NotificationService}, + IfDisconnected, Multiaddr, ObservedRole as SubstrateObservedRole, ProtocolName, + ReputationChange, Roles, +}; use polkadot_node_network_protocol::{ peer_set::PeerSetProtocolNames, @@ -48,7 +52,6 @@ use polkadot_node_subsystem_test_helpers::{ use polkadot_node_subsystem_util::metered; use polkadot_primitives::{AuthorityDiscoveryId, Hash}; -use sc_network::Multiaddr; use sp_keyring::Sr25519Keyring; use crate::{network::Network, validator_discovery::AuthorityDiscovery}; @@ -63,10 +66,9 @@ pub enum NetworkAction { WriteNotification(PeerId, PeerSet, Vec), } -// The subsystem's view of the network - only supports a single call to `event_stream`. +// The subsystem's view of the network. #[derive(Clone)] struct TestNetwork { - net_events: Arc>>>, action_tx: Arc>>, protocol_names: Arc, } @@ -78,37 +80,42 @@ struct TestAuthorityDiscovery; // of `NetworkAction`s. struct TestNetworkHandle { action_rx: metered::UnboundedMeteredReceiver, - net_tx: SingleItemSink, - protocol_names: PeerSetProtocolNames, + validation_tx: SingleItemSink, + collation_tx: SingleItemSink, } fn new_test_network( protocol_names: PeerSetProtocolNames, -) -> (TestNetwork, TestNetworkHandle, TestAuthorityDiscovery) { - let (net_tx, net_rx) = polkadot_node_subsystem_test_helpers::single_item_sink(); +) -> ( + TestNetwork, + TestNetworkHandle, + TestAuthorityDiscovery, + Box, + Box, +) { let (action_tx, action_rx) = metered::unbounded(); + let (validation_tx, validation_rx) = polkadot_node_subsystem_test_helpers::single_item_sink(); + let (collation_tx, collation_rx) = polkadot_node_subsystem_test_helpers::single_item_sink(); + let action_tx = Arc::new(Mutex::new(action_tx)); ( TestNetwork { - net_events: Arc::new(Mutex::new(Some(net_rx))), - action_tx: Arc::new(Mutex::new(action_tx)), + action_tx: action_tx.clone(), protocol_names: Arc::new(protocol_names.clone()), }, - TestNetworkHandle { action_rx, net_tx, protocol_names }, + TestNetworkHandle { action_rx, validation_tx, collation_tx }, TestAuthorityDiscovery, + Box::new(TestNotificationService::new( + PeerSet::Validation, + action_tx.clone(), + validation_rx, + )), + Box::new(TestNotificationService::new(PeerSet::Collation, action_tx, collation_rx)), ) } #[async_trait] impl Network for TestNetwork { - fn event_stream(&mut self) -> BoxStream<'static, NetworkEvent> { - self.net_events - .lock() - .take() - .expect("Subsystem made more than one call to `event_stream`") - .boxed() - } - async fn set_reserved_peers( &mut self, _protocol: ProtocolName, @@ -151,14 +158,10 @@ impl Network for TestNetwork { .unwrap(); } - fn write_notification(&self, who: PeerId, protocol: ProtocolName, message: Vec) { - let (peer_set, version) = self.protocol_names.try_get_protocol(&protocol).unwrap(); - assert_eq!(version, peer_set.get_main_version()); - - self.action_tx - .lock() - .unbounded_send(NetworkAction::WriteNotification(who, peer_set, message)) - .unwrap(); + fn peer_role(&self, _peer_id: PeerId, handshake: Vec) -> Option { + Roles::decode_all(&mut &handshake[..]) + .ok() + .and_then(|role| Some(SubstrateObservedRole::from(role))) } } @@ -179,6 +182,7 @@ impl validator_discovery::AuthorityDiscovery for TestAuthorityDiscovery { } } +// TODO: convert this to use `NotificationService` impl TestNetworkHandle { // Get the next network action. async fn next_network_action(&mut self) -> NetworkAction { @@ -196,34 +200,68 @@ impl TestNetworkHandle { } async fn connect_peer(&mut self, peer: PeerId, peer_set: PeerSet, role: ObservedRole) { - self.send_network_event(NetworkEvent::NotificationStreamOpened { - remote: peer, - protocol: self.protocol_names.get_main_name(peer_set), - negotiated_fallback: None, - role: role.into(), - received_handshake: vec![], - }) - .await; + fn observed_role_to_handshake(role: &ObservedRole) -> Vec { + match role { + &ObservedRole::Light => Roles::LIGHT.encode(), + &ObservedRole::Authority => Roles::AUTHORITY.encode(), + &ObservedRole::Full => Roles::FULL.encode(), + } + } + + match peer_set { + PeerSet::Validation => { + self.validation_tx + .send(NotificationEvent::NotificationStreamOpened { + peer, + direction: Direction::Inbound, + handshake: observed_role_to_handshake(&role), + negotiated_fallback: None, + }) + .await + .expect("subsystem concluded early"); + }, + PeerSet::Collation => { + self.collation_tx + .send(NotificationEvent::NotificationStreamOpened { + peer, + direction: Direction::Inbound, + handshake: observed_role_to_handshake(&role), + negotiated_fallback: None, + }) + .await + .expect("subsystem concluded early"); + }, + } } async fn disconnect_peer(&mut self, peer: PeerId, peer_set: PeerSet) { - self.send_network_event(NetworkEvent::NotificationStreamClosed { - remote: peer, - protocol: self.protocol_names.get_main_name(peer_set), - }) - .await; + match peer_set { + PeerSet::Validation => self + .validation_tx + .send(NotificationEvent::NotificationStreamClosed { peer }) + .await + .expect("subsystem concluded early"), + PeerSet::Collation => self + .collation_tx + .send(NotificationEvent::NotificationStreamClosed { peer }) + .await + .expect("subsystem concluded early"), + } } async fn peer_message(&mut self, peer: PeerId, peer_set: PeerSet, message: Vec) { - self.send_network_event(NetworkEvent::NotificationsReceived { - remote: peer, - messages: vec![(self.protocol_names.get_main_name(peer_set), message.into())], - }) - .await; - } - - async fn send_network_event(&mut self, event: NetworkEvent) { - self.net_tx.send(event).await.expect("subsystem concluded early"); + match peer_set { + PeerSet::Validation => self + .validation_tx + .send(NotificationEvent::NotificationReceived { peer, notification: message }) + .await + .expect("subsystem concluded early"), + PeerSet::Collation => self + .collation_tx + .send(NotificationEvent::NotificationReceived { peer, notification: message }) + .await + .expect("subsystem concluded early"), + } } } @@ -234,6 +272,119 @@ fn assert_network_actions_contains(actions: &[NetworkAction], action: &NetworkAc } } +struct TestNotificationService { + peer_set: PeerSet, + action_tx: Arc>>, + rx: SingleItemStream, +} + +impl std::fmt::Debug for TestNotificationService { + fn fmt(&self, _: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + Ok(()) + } +} + +impl TestNotificationService { + pub fn new( + peer_set: PeerSet, + action_tx: Arc>>, + rx: SingleItemStream, + ) -> Self { + Self { peer_set, action_tx, rx } + } +} + +struct TestMessageSink { + peer: PeerId, + peer_set: PeerSet, + action_tx: Arc>>, +} + +impl TestMessageSink { + fn new( + peer: PeerId, + peer_set: PeerSet, + action_tx: Arc>>, + ) -> TestMessageSink { + Self { peer, peer_set, action_tx } + } +} + +#[async_trait::async_trait] +impl MessageSink for TestMessageSink { + fn send_sync_notification(&self, notification: Vec) { + self.action_tx + .lock() + .unbounded_send(NetworkAction::WriteNotification( + self.peer, + self.peer_set, + notification, + )) + .unwrap(); + } + + async fn send_async_notification( + &self, + _notification: Vec, + ) -> Result<(), sc_network::error::Error> { + unimplemented!(); + } +} + +// TODO: how to implement this? +#[async_trait::async_trait] +impl NotificationService for TestNotificationService { + /// Instruct `Notifications` to open a new substream for `peer`. + async fn open_substream(&mut self, _peer: PeerId) -> Result<(), ()> { + unimplemented!(); + } + + /// Instruct `Notifications` to close substream for `peer`. + async fn close_substream(&mut self, _peer: PeerId) -> Result<(), ()> { + unimplemented!(); + } + + /// Send synchronous `notification` to `peer`. + fn send_sync_notification(&self, _peer: &PeerId, _notification: Vec) { + unimplemented!(); + } + + /// Send asynchronous `notification` to `peer`, allowing sender to exercise backpressure. + async fn send_async_notification( + &self, + _peer: &PeerId, + _notification: Vec, + ) -> Result<(), sc_network::error::Error> { + unimplemented!(); + } + + /// Set handshake for the notification protocol replacing the old handshake. + async fn set_handshake(&mut self, _handshake: Vec) -> Result<(), ()> { + unimplemented!(); + } + + /// Get next event from the `Notifications` event stream. + async fn next_event(&mut self) -> Option { + self.rx.next().await + } + + // Clone [`NotificationService`] + fn clone(&mut self) -> Result, ()> { + unimplemented!(); + } + + /// Get protocol name. + fn protocol(&self) -> &ProtocolName { + unimplemented!(); + } + + /// Get notification sink of the peer. + fn message_sink(&self, peer: &PeerId) -> Option> { + // TODO: verify that peer exists? + Some(Box::new(TestMessageSink::new(*peer, self.peer_set, self.action_tx.clone()))) + } +} + #[derive(Clone)] struct TestSyncOracle { is_major_syncing: Arc, @@ -329,10 +480,11 @@ fn test_harness>( let peerset_protocol_names = PeerSetProtocolNames::new(genesis_hash, fork_id); let pool = sp_core::testing::TaskExecutor::new(); - let (mut network, network_handle, discovery) = new_test_network(peerset_protocol_names.clone()); + let (network, network_handle, discovery, validation_service, collation_service) = + new_test_network(peerset_protocol_names.clone()); let (context, virtual_overseer) = polkadot_node_subsystem_test_helpers::make_subsystem_context(pool); - let network_stream = network.event_stream(); + let network_notification_sinks = Arc::new(Mutex::new(HashMap::new())); let shared = Shared::default(); let bridge = NetworkBridgeRx { @@ -342,9 +494,12 @@ fn test_harness>( sync_oracle, shared: shared.clone(), peerset_protocol_names, + validation_service, + collation_service, + network_notification_sinks, }; - let network_bridge = run_network_in(bridge, context, network_stream) + let network_bridge = run_network_in(bridge, context) .map_err(|_| panic!("subsystem execution failed")) .map(|_| ()); @@ -632,6 +787,7 @@ fn do_not_send_view_update_until_synced() { } #[test] +// #[ignore] fn do_not_send_view_update_when_only_finalized_block_changed() { test_harness(done_syncing_oracle(), |test_harness| async move { let TestHarness { mut network_handle, mut virtual_overseer, shared } = test_harness; @@ -914,6 +1070,7 @@ fn peer_disconnect_from_just_one_peerset() { } #[test] +// #[ignore] fn relays_collation_protocol_messages() { test_harness(done_syncing_oracle(), |test_harness| async move { let TestHarness { mut network_handle, mut virtual_overseer, shared } = test_harness; @@ -1109,6 +1266,7 @@ fn different_views_on_different_peer_sets() { } #[test] +// #[ignore] fn sent_views_include_finalized_number_update() { test_harness(done_syncing_oracle(), |test_harness| async move { let TestHarness { mut network_handle, mut virtual_overseer, shared } = test_harness; diff --git a/node/network/bridge/src/tx/mod.rs b/node/network/bridge/src/tx/mod.rs index 2b54f6f0f06d..988293fb3256 100644 --- a/node/network/bridge/src/tx/mod.rs +++ b/node/network/bridge/src/tx/mod.rs @@ -33,7 +33,7 @@ use polkadot_node_subsystem::{ /// /// To be passed to [`FullNetworkConfiguration::add_notification_protocol`](). pub use polkadot_node_network_protocol::peer_set::{peer_sets_info, IsAuthority}; -use sc_network::ReputationChange; +use sc_network::{service::traits::MessageSink, ReputationChange}; use crate::validator_discovery; @@ -44,6 +44,9 @@ use crate::network::{send_message, Network}; use crate::metrics::Metrics; +use parking_lot::Mutex; +use std::sync::Arc; + #[cfg(test)] mod tests; @@ -58,6 +61,7 @@ pub struct NetworkBridgeTx { metrics: Metrics, req_protocol_names: ReqProtocolNames, peerset_protocol_names: PeerSetProtocolNames, + network_notification_sinks: Arc>>>, } impl NetworkBridgeTx { @@ -71,6 +75,7 @@ impl NetworkBridgeTx { metrics: Metrics, req_protocol_names: ReqProtocolNames, peerset_protocol_names: PeerSetProtocolNames, + network_notification_sinks: Arc>>>, ) -> Self { Self { network_service, @@ -78,6 +83,7 @@ impl NetworkBridgeTx { metrics, req_protocol_names, peerset_protocol_names, + network_notification_sinks, } } } @@ -104,6 +110,7 @@ async fn handle_subsystem_messages( metrics: Metrics, req_protocol_names: ReqProtocolNames, peerset_protocol_names: PeerSetProtocolNames, + network_notification_sinks: Arc>>>, ) -> Result<(), Error> where N: Network, @@ -127,6 +134,7 @@ where &metrics, &req_protocol_names, &peerset_protocol_names, + &network_notification_sinks, ) .await; }, @@ -137,13 +145,14 @@ where #[overseer::contextbounds(NetworkBridgeTx, prefix = self::overseer)] async fn handle_incoming_subsystem_communication( _ctx: &mut Context, - mut network_service: N, + network_service: N, validator_discovery: &mut validator_discovery::Service, mut authority_discovery_service: AD, msg: NetworkBridgeTxMessage, metrics: &Metrics, req_protocol_names: &ReqProtocolNames, peerset_protocol_names: &PeerSetProtocolNames, + network_notification_sinks: &Arc>>>, ) -> (N, AD) where N: Network, @@ -190,11 +199,10 @@ where match msg { Versioned::V1(msg) => send_validation_message_v1( - &mut network_service, peers, - peerset_protocol_names, WireMessage::ProtocolMessage(msg), &metrics, + network_notification_sinks, ), } }, @@ -208,11 +216,10 @@ where for (peers, msg) in msgs { match msg { Versioned::V1(msg) => send_validation_message_v1( - &mut network_service, peers, - peerset_protocol_names, WireMessage::ProtocolMessage(msg), &metrics, + network_notification_sinks, ), } } @@ -226,11 +233,10 @@ where match msg { Versioned::V1(msg) => send_collation_message_v1( - &mut network_service, peers, - peerset_protocol_names, WireMessage::ProtocolMessage(msg), &metrics, + network_notification_sinks, ), } }, @@ -244,11 +250,10 @@ where for (peers, msg) in msgs { match msg { Versioned::V1(msg) => send_collation_message_v1( - &mut network_service, peers, - peerset_protocol_names, WireMessage::ProtocolMessage(msg), &metrics, + network_notification_sinks, ), } } @@ -330,6 +335,7 @@ where metrics, req_protocol_names, peerset_protocol_names, + network_notification_sinks, } = bridge; handle_subsystem_messages( @@ -339,6 +345,7 @@ where metrics, req_protocol_names, peerset_protocol_names, + network_notification_sinks, ) .await?; @@ -346,37 +353,33 @@ where } fn send_validation_message_v1( - net: &mut impl Network, peers: Vec, - protocol_names: &PeerSetProtocolNames, message: WireMessage, metrics: &Metrics, + network_notification_sinks: &Arc>>>, ) { send_message( - net, peers, PeerSet::Validation, ValidationVersion::V1.into(), - protocol_names, message, metrics, + network_notification_sinks, ); } fn send_collation_message_v1( - net: &mut impl Network, peers: Vec, - protocol_names: &PeerSetProtocolNames, message: WireMessage, metrics: &Metrics, + network_notification_sinks: &Arc>>>, ) { send_message( - net, peers, PeerSet::Collation, CollationVersion::V1.into(), - protocol_names, message, metrics, + network_notification_sinks, ); } diff --git a/node/network/bridge/src/tx/tests.rs b/node/network/bridge/src/tx/tests.rs index 520218d3c481..875a4277df2b 100644 --- a/node/network/bridge/src/tx/tests.rs +++ b/node/network/bridge/src/tx/tests.rs @@ -15,15 +15,18 @@ // along with Polkadot. If not, see . use super::*; -use futures::{executor, stream::BoxStream}; +use futures::executor; use polkadot_node_subsystem_util::TimeoutExt; use async_trait::async_trait; use parking_lot::Mutex; use std::collections::HashSet; -use sc_network::{Event as NetworkEvent, IfDisconnected, ProtocolName, ReputationChange}; +use sc_network::{ + IfDisconnected, ObservedRole as SubstrateObservedRole, ProtocolName, ReputationChange, Roles, +}; +use parity_scale_codec::DecodeAll; use polkadot_node_network_protocol::{ peer_set::PeerSetProtocolNames, request_response::{outgoing::Requests, ReqProtocolNames}, @@ -51,10 +54,9 @@ pub enum NetworkAction { WriteNotification(PeerId, PeerSet, Vec), } -// The subsystem's view of the network - only supports a single call to `event_stream`. +// The subsystem's view of the network. #[derive(Clone)] struct TestNetwork { - net_events: Arc>>>, action_tx: Arc>>, peerset_protocol_names: Arc, } @@ -66,37 +68,78 @@ struct TestAuthorityDiscovery; // of `NetworkAction`s. struct TestNetworkHandle { action_rx: metered::UnboundedMeteredReceiver, - net_tx: metered::MeteredSender, - peerset_protocol_names: PeerSetProtocolNames, + _peerset_protocol_names: PeerSetProtocolNames, + notification_sinks: Arc>>>, + action_tx: Arc>>, +} + +struct TestMessageSink { + peer: PeerId, + peer_set: PeerSet, + action_tx: Arc>>, +} + +impl TestMessageSink { + fn new( + peer: PeerId, + peer_set: PeerSet, + action_tx: Arc>>, + ) -> TestMessageSink { + Self { peer, peer_set, action_tx } + } +} + +#[async_trait::async_trait] +impl MessageSink for TestMessageSink { + fn send_sync_notification(&self, notification: Vec) { + self.action_tx + .lock() + .unbounded_send(NetworkAction::WriteNotification( + self.peer, + self.peer_set, + notification, + )) + .unwrap(); + } + + async fn send_async_notification( + &self, + _notification: Vec, + ) -> Result<(), sc_network::error::Error> { + unimplemented!(); + } } fn new_test_network( peerset_protocol_names: PeerSetProtocolNames, -) -> (TestNetwork, TestNetworkHandle, TestAuthorityDiscovery) { - let (net_tx, net_rx) = metered::channel(10); +) -> ( + TestNetwork, + TestNetworkHandle, + TestAuthorityDiscovery, + Arc>>>, +) { let (action_tx, action_rx) = metered::unbounded(); + let notification_sinks = Arc::new(Mutex::new(HashMap::new())); + let action_tx = Arc::new(Mutex::new(action_tx)); ( TestNetwork { - net_events: Arc::new(Mutex::new(Some(net_rx))), - action_tx: Arc::new(Mutex::new(action_tx)), + action_tx: action_tx.clone(), peerset_protocol_names: Arc::new(peerset_protocol_names.clone()), }, - TestNetworkHandle { action_rx, net_tx, peerset_protocol_names }, + TestNetworkHandle { + action_rx, + _peerset_protocol_names: peerset_protocol_names, + action_tx, + notification_sinks: notification_sinks.clone(), + }, TestAuthorityDiscovery, + notification_sinks, ) } #[async_trait] impl Network for TestNetwork { - fn event_stream(&mut self) -> BoxStream<'static, NetworkEvent> { - self.net_events - .lock() - .take() - .expect("Subsystem made more than one call to `event_stream`") - .boxed() - } - async fn set_reserved_peers( &mut self, _protocol: ProtocolName, @@ -139,14 +182,10 @@ impl Network for TestNetwork { .unwrap(); } - fn write_notification(&self, who: PeerId, protocol: ProtocolName, message: Vec) { - let (peer_set, version) = self.peerset_protocol_names.try_get_protocol(&protocol).unwrap(); - assert_eq!(version, peer_set.get_main_version()); - - self.action_tx - .lock() - .unbounded_send(NetworkAction::WriteNotification(who, peer_set, message)) - .unwrap(); + fn peer_role(&self, _peer_id: PeerId, handshake: Vec) -> Option { + Roles::decode_all(&mut &handshake[..]) + .ok() + .and_then(|role| Some(SubstrateObservedRole::from(role))) } } @@ -173,19 +212,11 @@ impl TestNetworkHandle { self.action_rx.next().await.expect("subsystem concluded early") } - async fn connect_peer(&mut self, peer: PeerId, peer_set: PeerSet, role: ObservedRole) { - self.send_network_event(NetworkEvent::NotificationStreamOpened { - remote: peer, - protocol: self.peerset_protocol_names.get_main_name(peer_set), - negotiated_fallback: None, - role: role.into(), - received_handshake: vec![], - }) - .await; - } - - async fn send_network_event(&mut self, event: NetworkEvent) { - self.net_tx.send(event).await.expect("subsystem concluded early"); + async fn connect_peer(&mut self, peer: PeerId, peer_set: PeerSet, _role: ObservedRole) { + self.notification_sinks.lock().insert( + (peer_set, peer), + Box::new(TestMessageSink::new(peer, peer_set, self.action_tx.clone())), + ); } } @@ -203,7 +234,8 @@ fn test_harness>(test: impl FnOnce(TestHarne let peerset_protocol_names = PeerSetProtocolNames::new(genesis_hash, fork_id); let pool = sp_core::testing::TaskExecutor::new(); - let (network, network_handle, discovery) = new_test_network(peerset_protocol_names.clone()); + let (network, network_handle, discovery, network_notification_sinks) = + new_test_network(peerset_protocol_names.clone()); let (context, virtual_overseer) = polkadot_node_subsystem_test_helpers::make_subsystem_context(pool); @@ -214,6 +246,7 @@ fn test_harness>(test: impl FnOnce(TestHarne Metrics(None), req_protocol_names, peerset_protocol_names, + network_notification_sinks, ); let network_bridge_out_fut = run_network_out(bridge_out, context) diff --git a/node/network/bridge/src/validator_discovery.rs b/node/network/bridge/src/validator_discovery.rs index 098416c5b88d..a4dfcf0081b5 100644 --- a/node/network/bridge/src/validator_discovery.rs +++ b/node/network/bridge/src/validator_discovery.rs @@ -168,13 +168,13 @@ mod tests { use crate::network::Network; use async_trait::async_trait; - use futures::stream::BoxStream; + use parity_scale_codec::DecodeAll; use polkadot_node_network_protocol::{ request_response::{outgoing::Requests, ReqProtocolNames}, PeerId, }; use polkadot_primitives::Hash; - use sc_network::{Event as NetworkEvent, IfDisconnected, ProtocolName, ReputationChange}; + use sc_network::{IfDisconnected, ObservedRole, ProtocolName, ReputationChange, Roles}; use sp_keyring::Sr25519Keyring; use std::collections::{HashMap, HashSet}; @@ -223,10 +223,6 @@ mod tests { #[async_trait] impl Network for TestNetwork { - fn event_stream(&mut self) -> BoxStream<'static, NetworkEvent> { - panic!() - } - async fn set_reserved_peers( &mut self, _protocol: ProtocolName, @@ -262,8 +258,10 @@ mod tests { panic!() } - fn write_notification(&self, _: PeerId, _: ProtocolName, _: Vec) { - panic!() + fn peer_role(&self, _peer_id: PeerId, handshake: Vec) -> Option { + Roles::decode_all(&mut &handshake[..]) + .ok() + .and_then(|role| Some(ObservedRole::from(role))) } } diff --git a/node/network/protocol/src/peer_set.rs b/node/network/protocol/src/peer_set.rs index ce47ac30811a..a037ba2559b3 100644 --- a/node/network/protocol/src/peer_set.rs +++ b/node/network/protocol/src/peer_set.rs @@ -20,6 +20,7 @@ use derive_more::Display; use polkadot_primitives::Hash; use sc_network::{ config::{NonDefaultSetConfig, SetConfig}, + service::traits::NotificationService, types::ProtocolName, }; use std::{ @@ -68,7 +69,7 @@ impl PeerSet { self, is_authority: IsAuthority, peerset_protocol_names: &PeerSetProtocolNames, - ) -> NonDefaultSetConfig { + ) -> (NonDefaultSetConfig, (PeerSet, Box)) { // Networking layer relies on `get_main_name()` being the main name of the protocol // for peersets and connection management. let protocol = peerset_protocol_names.get_main_name(self); @@ -76,38 +77,46 @@ impl PeerSet { let max_notification_size = self.get_max_notification_size(is_authority); match self { - PeerSet::Validation => NonDefaultSetConfig { - notifications_protocol: protocol, - fallback_names, - max_notification_size, - handshake: None, - set_config: SetConfig { - // we allow full nodes to connect to validators for gossip - // to ensure any `MIN_GOSSIP_PEERS` always include reserved peers - // we limit the amount of non-reserved slots to be less - // than `MIN_GOSSIP_PEERS` in total - in_peers: super::MIN_GOSSIP_PEERS as u32 / 2 - 1, - out_peers: super::MIN_GOSSIP_PEERS as u32 / 2 - 1, - reserved_nodes: Vec::new(), - non_reserved_mode: sc_network::config::NonReservedPeerMode::Accept, - }, + PeerSet::Validation => { + let (config, notification_service) = NonDefaultSetConfig::new( + protocol, + fallback_names, + max_notification_size, + None, + SetConfig { + // we allow full nodes to connect to validators for gossip + // to ensure any `MIN_GOSSIP_PEERS` always include reserved peers + // we limit the amount of non-reserved slots to be less + // than `MIN_GOSSIP_PEERS` in total + in_peers: super::MIN_GOSSIP_PEERS as u32 / 2 - 1, + out_peers: super::MIN_GOSSIP_PEERS as u32 / 2 - 1, + reserved_nodes: Vec::new(), + non_reserved_mode: sc_network::config::NonReservedPeerMode::Accept, + }, + ); + + (config, (PeerSet::Validation, notification_service)) }, - PeerSet::Collation => NonDefaultSetConfig { - notifications_protocol: protocol, - fallback_names, - max_notification_size, - handshake: None, - set_config: SetConfig { - // Non-authority nodes don't need to accept incoming connections on this peer set: - in_peers: if is_authority == IsAuthority::Yes { 100 } else { 0 }, - out_peers: 0, - reserved_nodes: Vec::new(), - non_reserved_mode: if is_authority == IsAuthority::Yes { - sc_network::config::NonReservedPeerMode::Accept - } else { - sc_network::config::NonReservedPeerMode::Deny + PeerSet::Collation => { + let (config, notification_service) = NonDefaultSetConfig::new( + protocol, + fallback_names, + max_notification_size, + None, + SetConfig { + // Non-authority nodes don't need to accept incoming connections on this peer set: + in_peers: if is_authority == IsAuthority::Yes { 100 } else { 0 }, + out_peers: 0, + reserved_nodes: Vec::new(), + non_reserved_mode: if is_authority == IsAuthority::Yes { + sc_network::config::NonReservedPeerMode::Accept + } else { + sc_network::config::NonReservedPeerMode::Deny + }, }, - }, + ); + + (config, (PeerSet::Collation, notification_service)) }, } } @@ -190,7 +199,7 @@ impl IndexMut for PerPeerSet { pub fn peer_sets_info( is_authority: IsAuthority, peerset_protocol_names: &PeerSetProtocolNames, -) -> Vec { +) -> Vec<(NonDefaultSetConfig, (PeerSet, Box))> { PeerSet::iter() .map(|s| s.get_info(is_authority, &peerset_protocol_names)) .collect() diff --git a/node/service/Cargo.toml b/node/service/Cargo.toml index e373dd4f0011..19f790c0e3e9 100644 --- a/node/service/Cargo.toml +++ b/node/service/Cargo.toml @@ -84,6 +84,7 @@ thiserror = "1.0.31" kvdb = "0.13.0" kvdb-rocksdb = { version = "0.19.0", optional = true } parity-db = { version = "0.4.8", optional = true } +parking_lot = "0.12.0" codec = { package = "parity-scale-codec", version = "3.6.1" } async-trait = "0.1.57" diff --git a/node/service/src/lib.rs b/node/service/src/lib.rs index 457b5488ea14..2ac0d27dd499 100644 --- a/node/service/src/lib.rs +++ b/node/service/src/lib.rs @@ -51,7 +51,8 @@ use { }, polkadot_node_core_dispute_coordinator::Config as DisputeCoordinatorConfig, polkadot_node_network_protocol::{ - peer_set::PeerSetProtocolNames, request_response::ReqProtocolNames, + peer_set::{PeerSet, PeerSetProtocolNames}, + request_response::ReqProtocolNames, }, sc_client_api::BlockBackend, sc_transaction_pool_api::OffchainTransactionPoolFactory, @@ -75,7 +76,7 @@ pub use { #[cfg(feature = "full-node")] use polkadot_node_subsystem::jaeger; -use std::{path::PathBuf, sync::Arc, time::Duration}; +use std::{collections::HashMap, path::PathBuf, sync::Arc, time::Duration}; use prometheus_endpoint::Registry; #[cfg(feature = "full-node")] @@ -806,9 +807,9 @@ pub fn new_full( // anything in terms of behaviour, but makes the logs more consistent with the other // Substrate nodes. let grandpa_protocol_name = grandpa::protocol_standard_name(&genesis_hash, &config.chain_spec); - net_config.add_notification_protocol(grandpa::grandpa_peers_set_config( - grandpa_protocol_name.clone(), - )); + let (grandpa_protocol_config, grandpa_notification_service) = + grandpa::grandpa_peers_set_config(grandpa_protocol_name.clone()); + net_config.add_notification_protocol(grandpa_protocol_config); let beefy_gossip_proto_name = beefy::gossip_protocol_name(&genesis_hash, config.chain_spec.fork_id()); @@ -821,23 +822,33 @@ pub fn new_full( client.clone(), prometheus_registry.clone(), ); - if enable_beefy { - net_config.add_notification_protocol(beefy::communication::beefy_peers_set_config( - beefy_gossip_proto_name.clone(), - )); - net_config.add_request_response_protocol(beefy_req_resp_cfg); - } + + let beefy_notification_service = match enable_beefy { + false => None, + true => { + let (beefy_protocol_config, beefy_notification_service) = + beefy::communication::beefy_peers_set_config(beefy_gossip_proto_name.clone()); + net_config.add_notification_protocol(beefy_protocol_config); + net_config.add_request_response_protocol(beefy_req_resp_cfg); + Some(beefy_notification_service) + }, + }; let peerset_protocol_names = PeerSetProtocolNames::new(genesis_hash, config.chain_spec.fork_id()); - { + let notification_services = { use polkadot_network_bridge::{peer_sets_info, IsAuthority}; let is_authority = if role.is_authority() { IsAuthority::Yes } else { IsAuthority::No }; - for config in peer_sets_info(is_authority, &peerset_protocol_names) { - net_config.add_notification_protocol(config); - } - } + peer_sets_info(is_authority, &peerset_protocol_names) + .into_iter() + .map(|(config, (peerset, service))| { + net_config.add_notification_protocol(config); + (peerset, service) + }) + .collect::>>( + ) + }; let req_protocol_names = ReqProtocolNames::new(&genesis_hash, config.chain_spec.fork_id()); @@ -1051,6 +1062,7 @@ pub fn new_full( offchain_transaction_pool_factory: OffchainTransactionPoolFactory::new( transaction_pool.clone(), ), + notification_services, }, ) .map_err(|e| { @@ -1159,6 +1171,8 @@ pub fn new_full( sync: sync_service.clone(), gossip_protocol_name: beefy_gossip_proto_name, justifications_protocol_name, + notification_service: beefy_notification_service + .expect("beefy is enabled so `NotificationService` exists. qed"), _phantom: core::marker::PhantomData::, }; let payload_provider = beefy_primitives::mmr::MmrRootProvider::new(client.clone()); @@ -1255,6 +1269,7 @@ pub fn new_full( voting_rule, prometheus_registry: prometheus_registry.clone(), shared_voter_state, + notification_service: grandpa_notification_service, telemetry: telemetry.as_ref().map(|x| x.handle()), offchain_tx_pool_factory: OffchainTransactionPoolFactory::new(transaction_pool.clone()), }; diff --git a/node/service/src/overseer.rs b/node/service/src/overseer.rs index 29122ddca162..fb67b817010a 100644 --- a/node/service/src/overseer.rs +++ b/node/service/src/overseer.rs @@ -20,6 +20,8 @@ use sc_transaction_pool_api::OffchainTransactionPoolFactory; use sp_core::traits::SpawnNamed; use lru::LruCache; +use parking_lot::Mutex; + use polkadot_availability_distribution::IncomingRequestReceivers; use polkadot_node_core_approval_voting::Config as ApprovalVotingConfig; use polkadot_node_core_av_store::Config as AvailabilityConfig; @@ -27,7 +29,7 @@ use polkadot_node_core_candidate_validation::Config as CandidateValidationConfig use polkadot_node_core_chain_selection::Config as ChainSelectionConfig; use polkadot_node_core_dispute_coordinator::Config as DisputeCoordinatorConfig; use polkadot_node_network_protocol::{ - peer_set::PeerSetProtocolNames, + peer_set::{PeerSet, PeerSetProtocolNames}, request_response::{v1 as request_v1, IncomingRequestReceiver, ReqProtocolNames}, }; #[cfg(any(feature = "malus", test))] @@ -44,11 +46,11 @@ use polkadot_primitives::runtime_api::ParachainHost; use sc_authority_discovery::Service as AuthorityDiscoveryService; use sc_client_api::AuxStore; use sc_keystore::LocalKeystore; -use sc_network::NetworkStateInfo; +use sc_network::{NetworkStateInfo, NotificationService}; use sp_api::ProvideRuntimeApi; use sp_blockchain::HeaderBackend; use sp_consensus_babe::BabeApi; -use std::sync::Arc; +use std::{collections::HashMap, sync::Arc}; pub use polkadot_approval_distribution::ApprovalDistribution as ApprovalDistributionSubsystem; pub use polkadot_availability_bitfield_distribution::BitfieldDistribution as BitfieldDistributionSubsystem; @@ -129,6 +131,8 @@ where pub peerset_protocol_names: PeerSetProtocolNames, /// The offchain transaction pool factory. pub offchain_transaction_pool_factory: OffchainTransactionPoolFactory, + /// Notification services. + pub notification_services: HashMap>, } /// Obtain a prepared `OverseerBuilder`, that is initialized @@ -160,6 +164,7 @@ pub fn prepared_overseer_builder( req_protocol_names, peerset_protocol_names, offchain_transaction_pool_factory, + notification_services, }: OverseerGenArgs, ) -> Result< InitializedOverseerBuilder< @@ -208,6 +213,7 @@ where let spawner = SpawnGlue(spawner); let network_bridge_metrics: NetworkBridgeMetrics = Metrics::register(registry)?; + let network_notification_sinks = Arc::new(Mutex::new(HashMap::new())); let runtime_api_client = Arc::new(DefaultSubsystemClient::new( runtime_client.clone(), @@ -221,6 +227,7 @@ where network_bridge_metrics.clone(), req_protocol_names, peerset_protocol_names.clone(), + network_notification_sinks.clone(), )) .network_bridge_rx(NetworkBridgeRxSubsystem::new( network_service.clone(), @@ -228,6 +235,8 @@ where Box::new(sync_service.clone()), network_bridge_metrics, peerset_protocol_names, + notification_services, + network_notification_sinks.clone(), )) .availability_distribution(AvailabilityDistributionSubsystem::new( keystore.clone(),