diff --git a/beacon_node/lighthouse_network/src/discovery/mod.rs b/beacon_node/lighthouse_network/src/discovery/mod.rs index 0f8ddc53c1b..51b3770f9a8 100644 --- a/beacon_node/lighthouse_network/src/discovery/mod.rs +++ b/beacon_node/lighthouse_network/src/discovery/mod.rs @@ -75,7 +75,7 @@ const DURATION_DIFFERENCE: Duration = Duration::from_millis(1); /// of the peer if it is specified. #[derive(Debug)] pub struct DiscoveredPeers { - pub peers: HashMap>, + pub peers: HashMap>, } #[derive(Clone, PartialEq)] @@ -787,7 +787,7 @@ impl Discovery { fn process_completed_queries( &mut self, query: QueryResult, - ) -> Option>> { + ) -> Option>> { match query.query_type { QueryType::FindPeers => { self.find_peer_active = false; @@ -798,10 +798,10 @@ impl Discovery { Ok(r) => { debug!(self.log, "Discovery query completed"; "peers_found" => r.len()); let mut results: HashMap<_, Option> = HashMap::new(); - r.iter().for_each(|enr| { + r.into_iter().for_each(|enr| { // cache the found ENR's self.cached_enrs.put(enr.peer_id(), enr.clone()); - results.insert(enr.peer_id(), None); + results.insert(enr, None); }); return Some(results); } @@ -850,17 +850,17 @@ impl Discovery { let subnet_predicate = subnet_predicate::(vec![query.subnet], &self.log); - r.iter() + r.clone() + .into_iter() .filter(|enr| subnet_predicate(enr)) - .map(|enr| enr.peer_id()) - .for_each(|peer_id| { + .for_each(|enr| { if let Some(v) = metrics::get_int_counter( &metrics::SUBNET_PEERS_FOUND, &[query_str], ) { v.inc(); } - let other_min_ttl = mapped_results.get_mut(&peer_id); + let other_min_ttl = mapped_results.get_mut(&enr); // map peer IDs to the min_ttl furthest in the future match (query.min_ttl, other_min_ttl) { @@ -878,15 +878,11 @@ impl Discovery { } // update the mapping if we have a specified min_ttl (Some(min_ttl), Some(None)) => { - mapped_results.insert(peer_id, Some(min_ttl)); + mapped_results.insert(enr, Some(min_ttl)); } // first seen min_ttl for this enr - (Some(min_ttl), None) => { - mapped_results.insert(peer_id, Some(min_ttl)); - } - // first seen min_ttl for this enr - (None, None) => { - mapped_results.insert(peer_id, None); + (min_ttl, None) => { + mapped_results.insert(enr, min_ttl); } (None, Some(Some(_))) => {} // Don't replace the existing specific min_ttl (None, Some(None)) => {} // No-op because this is a duplicate @@ -910,7 +906,7 @@ impl Discovery { } /// Drives the queries returning any results from completed queries. - fn poll_queries(&mut self, cx: &mut Context) -> Option>> { + fn poll_queries(&mut self, cx: &mut Context) -> Option>> { while let Poll::Ready(Some(query_result)) = self.active_queries.poll_next_unpin(cx) { let result = self.process_completed_queries(query_result); if result.is_some() { @@ -957,23 +953,6 @@ impl NetworkBehaviour for Discovery { ) { } - fn handle_pending_outbound_connection( - &mut self, - _connection_id: ConnectionId, - maybe_peer: Option, - _addresses: &[Multiaddr], - _effective_role: libp2p::core::Endpoint, - ) -> Result, libp2p::swarm::ConnectionDenied> { - if let Some(enr) = maybe_peer.and_then(|peer_id| self.enr_of_peer(&peer_id)) { - // ENR's may have multiple Multiaddrs. The multi-addr associated with the UDP - // port is removed, which is assumed to be associated with the discv5 protocol (and - // therefore irrelevant for other libp2p components). - Ok(enr.multiaddr_tcp()) - } else { - Ok(vec![]) - } - } - // Main execution loop to drive the behaviour fn poll( &mut self, @@ -1263,4 +1242,4 @@ mod tests { // when a peer belongs to multiple subnet ids, we use the highest ttl. assert_eq!(results.get(&enr1.peer_id()).unwrap(), &instant1); } -} +} \ No newline at end of file diff --git a/beacon_node/lighthouse_network/src/peer_manager/mod.rs b/beacon_node/lighthouse_network/src/peer_manager/mod.rs index 4f3454f4033..3f5a088f48a 100644 --- a/beacon_node/lighthouse_network/src/peer_manager/mod.rs +++ b/beacon_node/lighthouse_network/src/peer_manager/mod.rs @@ -1,5 +1,6 @@ //! Implementation of Lighthouse's peer management system. +use crate::discovery::enr_ext::EnrExt; use crate::rpc::{GoodbyeReason, MetaData, Protocol, RPCError, RPCResponseErrorCode}; use crate::service::TARGET_SUBNET_PEERS; use crate::{error, metrics, Gossipsub}; @@ -13,7 +14,6 @@ use peerdb::{client::ClientKind, BanOperation, BanResult, ScoreUpdateResult}; use rand::seq::SliceRandom; use slog::{debug, error, trace, warn}; use smallvec::SmallVec; -use std::collections::BTreeMap; use std::{ sync::Arc, time::{Duration, Instant}, @@ -78,7 +78,7 @@ pub struct PeerManager { /// The target number of peers we would like to connect to. target_peers: usize, /// Peers queued to be dialed. - peers_to_dial: BTreeMap>, + peers_to_dial: Vec, /// The number of temporarily banned peers. This is used to prevent instantaneous /// reconnection. // NOTE: This just prevents re-connections. The state of the peer is otherwise unaffected. A @@ -317,11 +317,11 @@ impl PeerManager { /// multiaddr here, however this could relate to duplicate PeerId's etc. If the lookup /// proves resource constraining, we should switch to multiaddr dialling here. #[allow(clippy::mutable_key_type)] - pub fn peers_discovered(&mut self, results: HashMap>) -> Vec { + pub fn peers_discovered(&mut self, results: HashMap>) -> Vec { let mut to_dial_peers = Vec::with_capacity(4); let connected_or_dialing = self.network_globals.connected_or_dialing_peers(); - for (peer_id, min_ttl) in results { + for (enr, min_ttl) in results { // There are two conditions in deciding whether to dial this peer. // 1. If we are less than our max connections. Discovery queries are executed to reach // our target peers, so its fine to dial up to our max peers (which will get pruned @@ -333,7 +333,11 @@ impl PeerManager { if (min_ttl.is_some() && connected_or_dialing + to_dial_peers.len() < self.max_priority_peers() || connected_or_dialing + to_dial_peers.len() < self.max_peers()) - && self.network_globals.peers.read().should_dial(&peer_id) + && self + .network_globals + .peers + .read() + .should_dial(&enr.peer_id()) { // This should be updated with the peer dialing. In fact created once the peer is // dialed @@ -341,9 +345,9 @@ impl PeerManager { self.network_globals .peers .write() - .update_min_ttl(&peer_id, min_ttl); + .update_min_ttl(&enr.peer_id(), min_ttl); } - to_dial_peers.push(peer_id); + to_dial_peers.push(enr); } } @@ -407,8 +411,8 @@ impl PeerManager { /* Notifications from the Swarm */ // A peer is being dialed. - pub fn dial_peer(&mut self, peer_id: &PeerId, enr: Option) { - self.peers_to_dial.insert(*peer_id, enr); + pub fn dial_peers(&mut self, mut peers: Vec) { + self.peers_to_dial.append(&mut peers); } /// Reports if a peer is banned or not. @@ -2331,4 +2335,4 @@ mod tests { }) } } -} +} \ No newline at end of file diff --git a/beacon_node/lighthouse_network/src/peer_manager/network_behaviour.rs b/beacon_node/lighthouse_network/src/peer_manager/network_behaviour.rs index ce374bb9ab4..eefb0af6b9d 100644 --- a/beacon_node/lighthouse_network/src/peer_manager/network_behaviour.rs +++ b/beacon_node/lighthouse_network/src/peer_manager/network_behaviour.rs @@ -12,6 +12,7 @@ use libp2p::swarm::{ConnectionId, NetworkBehaviour, PollParameters, ToSwarm}; use slog::{debug, error}; use types::EthSpec; +use crate::discovery::enr_ext::EnrExt; use crate::metrics; use crate::rpc::GoodbyeReason; use crate::types::SyncState; @@ -95,11 +96,18 @@ impl NetworkBehaviour for PeerManager { self.events.shrink_to_fit(); } - if let Some((peer_id, maybe_enr)) = self.peers_to_dial.pop_first() { - self.inject_peer_connection(&peer_id, ConnectingType::Dialing, maybe_enr); + if let Some(enr) = self.peers_to_dial.pop() { + self.inject_peer_connection(&enr.peer_id(), ConnectingType::Dialing, Some(enr.clone())); + // Prioritize Quic connections over Tcp ones. + let multiaddrs = enr + .multiaddr_quic() + .into_iter() + .chain(enr.multiaddr_tcp()) + .collect(); return Poll::Ready(ToSwarm::Dial { - opts: DialOpts::peer_id(peer_id) + opts: DialOpts::peer_id(enr.peer_id()) .condition(PeerCondition::Disconnected) + .addresses(multiaddrs) .build(), }); } @@ -295,4 +303,4 @@ impl PeerManager { } } } -} +} \ No newline at end of file diff --git a/beacon_node/lighthouse_network/src/service/mod.rs b/beacon_node/lighthouse_network/src/service/mod.rs index 809b05c55d6..9de750ea8a5 100644 --- a/beacon_node/lighthouse_network/src/service/mod.rs +++ b/beacon_node/lighthouse_network/src/service/mod.rs @@ -341,8 +341,9 @@ impl Network { let (swarm, bandwidth) = { // Set up the transport - tcp/ws with noise and mplex - let (transport, bandwidth) = build_transport(local_keypair.clone(), !config.disable_quic_support) - .map_err(|e| format!("Failed to build transport: {:?}", e))?; + let (transport, bandwidth) = + build_transport(local_keypair.clone(), !config.disable_quic_support) + .map_err(|e| format!("Failed to build transport: {:?}", e))?; // use the executor for libp2p struct Executor(task_executor::TaskExecutor); @@ -400,7 +401,6 @@ impl Network { debug!(self.log, "Attempting to open listening ports"; config.listen_addrs(), "discovery_enabled" => !config.disable_discovery, "quic_enabled" => !config.disable_quic_support); for listen_multiaddr in config.listen_addrs().listen_addresses() { - // If QUIC is disabled, ignore listening on QUIC ports if config.disable_quic_support { if listen_multiaddr.iter().any(|v| v == MProtocol::QuicV1) { @@ -1049,27 +1049,26 @@ impl Network { /// in Connected, Dialing or Banned state. fn dial_cached_enrs_in_subnet(&mut self, subnet: Subnet) { let predicate = subnet_predicate::(vec![subnet], &self.log); - let peers_to_dial: Vec = self + let peers_to_dial: Vec = self .discovery() .cached_enrs() .filter_map(|(peer_id, enr)| { let peers = self.network_globals.peers.read(); if predicate(enr) && peers.should_dial(peer_id) { - Some(*peer_id) + Some(enr.clone()) } else { None } }) .collect(); - for peer_id in peers_to_dial { - debug!(self.log, "Dialing cached ENR peer"; "peer_id" => %peer_id); - // Remove the ENR from the cache to prevent continual re-dialing on disconnects - - self.discovery_mut().remove_cached_enr(&peer_id); - // For any dial event, inform the peer manager - let enr = self.discovery_mut().enr_of_peer(&peer_id); - self.peer_manager_mut().dial_peer(&peer_id, enr); - } + + // Remove the ENR from the cache to prevent continual re-dialing on disconnects + // TODO: add this step to the iteration above. + peers_to_dial.iter().for_each(|enr| { + self.discovery_mut().remove_cached_enr(&enr.peer_id()); + }); + + self.peer_manager_mut().dial_peers(peers_to_dial); } /* Sub-behaviour event handling functions */ @@ -1359,14 +1358,9 @@ impl Network { &mut self, event: DiscoveredPeers, ) -> Option> { - let DiscoveredPeers { peers } = event; - let to_dial_peers = self.peer_manager_mut().peers_discovered(peers); - for peer_id in to_dial_peers { - debug!(self.log, "Dialing discovered peer"; "peer_id" => %peer_id); - // For any dial event, inform the peer manager - let enr = self.discovery_mut().enr_of_peer(&peer_id); - self.peer_manager_mut().dial_peer(&peer_id, enr); - } + let to_dial_peers = self.peer_manager_mut().peers_discovered(event.peers); + // For any dial event, inform the peer manager + self.peer_manager_mut().dial_peers(to_dial_peers); None }