Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

discovery: append quic multiaddrs along with tcp, #6

Merged
merged 3 commits into from
Aug 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 13 additions & 34 deletions beacon_node/lighthouse_network/src/discovery/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ const DURATION_DIFFERENCE: Duration = Duration::from_millis(1);
/// of the peer if it is specified.
#[derive(Debug)]
pub struct DiscoveredPeers {
pub peers: HashMap<PeerId, Option<Instant>>,
pub peers: HashMap<Enr, Option<Instant>>,
}

#[derive(Clone, PartialEq)]
Expand Down Expand Up @@ -787,7 +787,7 @@ impl<TSpec: EthSpec> Discovery<TSpec> {
fn process_completed_queries(
&mut self,
query: QueryResult,
) -> Option<HashMap<PeerId, Option<Instant>>> {
) -> Option<HashMap<Enr, Option<Instant>>> {
match query.query_type {
QueryType::FindPeers => {
self.find_peer_active = false;
Expand All @@ -798,10 +798,10 @@ impl<TSpec: EthSpec> Discovery<TSpec> {
Ok(r) => {
debug!(self.log, "Discovery query completed"; "peers_found" => r.len());
let mut results: HashMap<_, Option<Instant>> = HashMap::new();
r.iter().for_each(|enr| {
r.into_iter().for_each(|enr| {
// cache the found ENR's
self.cached_enrs.put(enr.peer_id(), enr.clone());
results.insert(enr.peer_id(), None);
results.insert(enr, None);
});
return Some(results);
}
Expand Down Expand Up @@ -850,17 +850,17 @@ impl<TSpec: EthSpec> Discovery<TSpec> {
let subnet_predicate =
subnet_predicate::<TSpec>(vec![query.subnet], &self.log);

r.iter()
r.clone()
.into_iter()
.filter(|enr| subnet_predicate(enr))
.map(|enr| enr.peer_id())
.for_each(|peer_id| {
.for_each(|enr| {
if let Some(v) = metrics::get_int_counter(
&metrics::SUBNET_PEERS_FOUND,
&[query_str],
) {
v.inc();
}
let other_min_ttl = mapped_results.get_mut(&peer_id);
let other_min_ttl = mapped_results.get_mut(&enr);

// map peer IDs to the min_ttl furthest in the future
match (query.min_ttl, other_min_ttl) {
Expand All @@ -878,15 +878,11 @@ impl<TSpec: EthSpec> Discovery<TSpec> {
}
// update the mapping if we have a specified min_ttl
(Some(min_ttl), Some(None)) => {
mapped_results.insert(peer_id, Some(min_ttl));
mapped_results.insert(enr, Some(min_ttl));
}
// first seen min_ttl for this enr
(Some(min_ttl), None) => {
mapped_results.insert(peer_id, Some(min_ttl));
}
// first seen min_ttl for this enr
(None, None) => {
mapped_results.insert(peer_id, None);
(min_ttl, None) => {
mapped_results.insert(enr, min_ttl);
}
(None, Some(Some(_))) => {} // Don't replace the existing specific min_ttl
(None, Some(None)) => {} // No-op because this is a duplicate
Expand All @@ -910,7 +906,7 @@ impl<TSpec: EthSpec> Discovery<TSpec> {
}

/// Drives the queries returning any results from completed queries.
fn poll_queries(&mut self, cx: &mut Context) -> Option<HashMap<PeerId, Option<Instant>>> {
fn poll_queries(&mut self, cx: &mut Context) -> Option<HashMap<Enr, Option<Instant>>> {
while let Poll::Ready(Some(query_result)) = self.active_queries.poll_next_unpin(cx) {
let result = self.process_completed_queries(query_result);
if result.is_some() {
Expand Down Expand Up @@ -957,23 +953,6 @@ impl<TSpec: EthSpec> NetworkBehaviour for Discovery<TSpec> {
) {
}

fn handle_pending_outbound_connection(
&mut self,
_connection_id: ConnectionId,
maybe_peer: Option<PeerId>,
_addresses: &[Multiaddr],
_effective_role: libp2p::core::Endpoint,
) -> Result<Vec<Multiaddr>, libp2p::swarm::ConnectionDenied> {
if let Some(enr) = maybe_peer.and_then(|peer_id| self.enr_of_peer(&peer_id)) {
// ENR's may have multiple Multiaddrs. The multi-addr associated with the UDP
// port is removed, which is assumed to be associated with the discv5 protocol (and
// therefore irrelevant for other libp2p components).
Ok(enr.multiaddr_tcp())
} else {
Ok(vec![])
}
}

// Main execution loop to drive the behaviour
fn poll(
&mut self,
Expand Down Expand Up @@ -1263,4 +1242,4 @@ mod tests {
// when a peer belongs to multiple subnet ids, we use the highest ttl.
assert_eq!(results.get(&enr1.peer_id()).unwrap(), &instant1);
}
}
}
24 changes: 14 additions & 10 deletions beacon_node/lighthouse_network/src/peer_manager/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
//! Implementation of Lighthouse's peer management system.

use crate::discovery::enr_ext::EnrExt;
use crate::rpc::{GoodbyeReason, MetaData, Protocol, RPCError, RPCResponseErrorCode};
use crate::service::TARGET_SUBNET_PEERS;
use crate::{error, metrics, Gossipsub};
Expand All @@ -13,7 +14,6 @@ use peerdb::{client::ClientKind, BanOperation, BanResult, ScoreUpdateResult};
use rand::seq::SliceRandom;
use slog::{debug, error, trace, warn};
use smallvec::SmallVec;
use std::collections::BTreeMap;
use std::{
sync::Arc,
time::{Duration, Instant},
Expand Down Expand Up @@ -78,7 +78,7 @@ pub struct PeerManager<TSpec: EthSpec> {
/// The target number of peers we would like to connect to.
target_peers: usize,
/// Peers queued to be dialed.
peers_to_dial: BTreeMap<PeerId, Option<Enr>>,
peers_to_dial: Vec<Enr>,
/// The number of temporarily banned peers. This is used to prevent instantaneous
/// reconnection.
// NOTE: This just prevents re-connections. The state of the peer is otherwise unaffected. A
Expand Down Expand Up @@ -317,11 +317,11 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
/// multiaddr here, however this could relate to duplicate PeerId's etc. If the lookup
/// proves resource constraining, we should switch to multiaddr dialling here.
#[allow(clippy::mutable_key_type)]
pub fn peers_discovered(&mut self, results: HashMap<PeerId, Option<Instant>>) -> Vec<PeerId> {
pub fn peers_discovered(&mut self, results: HashMap<Enr, Option<Instant>>) -> Vec<Enr> {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice. I'll also update the doc comments to match.

let mut to_dial_peers = Vec::with_capacity(4);

let connected_or_dialing = self.network_globals.connected_or_dialing_peers();
for (peer_id, min_ttl) in results {
for (enr, min_ttl) in results {
// There are two conditions in deciding whether to dial this peer.
// 1. If we are less than our max connections. Discovery queries are executed to reach
// our target peers, so its fine to dial up to our max peers (which will get pruned
Expand All @@ -333,17 +333,21 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
if (min_ttl.is_some()
&& connected_or_dialing + to_dial_peers.len() < self.max_priority_peers()
|| connected_or_dialing + to_dial_peers.len() < self.max_peers())
&& self.network_globals.peers.read().should_dial(&peer_id)
&& self
.network_globals
.peers
.read()
.should_dial(&enr.peer_id())
{
// This should be updated with the peer dialing. In fact created once the peer is
// dialed
if let Some(min_ttl) = min_ttl {
self.network_globals
.peers
.write()
.update_min_ttl(&peer_id, min_ttl);
.update_min_ttl(&enr.peer_id(), min_ttl);
}
to_dial_peers.push(peer_id);
to_dial_peers.push(enr);
}
}

Expand Down Expand Up @@ -407,8 +411,8 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
/* Notifications from the Swarm */

// A peer is being dialed.
pub fn dial_peer(&mut self, peer_id: &PeerId, enr: Option<Enr>) {
self.peers_to_dial.insert(*peer_id, enr);
pub fn dial_peers(&mut self, mut peers: Vec<Enr>) {
self.peers_to_dial.append(&mut peers);
}

/// Reports if a peer is banned or not.
Expand Down Expand Up @@ -2331,4 +2335,4 @@ mod tests {
})
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use libp2p::swarm::{ConnectionId, NetworkBehaviour, PollParameters, ToSwarm};
use slog::{debug, error};
use types::EthSpec;

use crate::discovery::enr_ext::EnrExt;
use crate::metrics;
use crate::rpc::GoodbyeReason;
use crate::types::SyncState;
Expand Down Expand Up @@ -95,11 +96,18 @@ impl<TSpec: EthSpec> NetworkBehaviour for PeerManager<TSpec> {
self.events.shrink_to_fit();
}

if let Some((peer_id, maybe_enr)) = self.peers_to_dial.pop_first() {
self.inject_peer_connection(&peer_id, ConnectingType::Dialing, maybe_enr);
if let Some(enr) = self.peers_to_dial.pop() {
self.inject_peer_connection(&enr.peer_id(), ConnectingType::Dialing, Some(enr.clone()));
// Prioritize Quic connections over Tcp ones.
let multiaddrs = enr
.multiaddr_quic()
.into_iter()
.chain(enr.multiaddr_tcp())
.collect();
return Poll::Ready(ToSwarm::Dial {
opts: DialOpts::peer_id(peer_id)
opts: DialOpts::peer_id(enr.peer_id())
.condition(PeerCondition::Disconnected)
.addresses(multiaddrs)
.build(),
});
}
Expand Down Expand Up @@ -295,4 +303,4 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
}
}
}
}
}
38 changes: 16 additions & 22 deletions beacon_node/lighthouse_network/src/service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -341,8 +341,9 @@ impl<AppReqId: ReqId, TSpec: EthSpec> Network<AppReqId, TSpec> {

let (swarm, bandwidth) = {
// Set up the transport - tcp/ws with noise and mplex
let (transport, bandwidth) = build_transport(local_keypair.clone(), !config.disable_quic_support)
.map_err(|e| format!("Failed to build transport: {:?}", e))?;
let (transport, bandwidth) =
build_transport(local_keypair.clone(), !config.disable_quic_support)
.map_err(|e| format!("Failed to build transport: {:?}", e))?;

// use the executor for libp2p
struct Executor(task_executor::TaskExecutor);
Expand Down Expand Up @@ -400,7 +401,6 @@ impl<AppReqId: ReqId, TSpec: EthSpec> Network<AppReqId, TSpec> {
debug!(self.log, "Attempting to open listening ports"; config.listen_addrs(), "discovery_enabled" => !config.disable_discovery, "quic_enabled" => !config.disable_quic_support);

for listen_multiaddr in config.listen_addrs().listen_addresses() {

// If QUIC is disabled, ignore listening on QUIC ports
if config.disable_quic_support {
if listen_multiaddr.iter().any(|v| v == MProtocol::QuicV1) {
Expand Down Expand Up @@ -1049,27 +1049,26 @@ impl<AppReqId: ReqId, TSpec: EthSpec> Network<AppReqId, TSpec> {
/// in Connected, Dialing or Banned state.
fn dial_cached_enrs_in_subnet(&mut self, subnet: Subnet) {
let predicate = subnet_predicate::<TSpec>(vec![subnet], &self.log);
let peers_to_dial: Vec<PeerId> = self
let peers_to_dial: Vec<Enr> = self
.discovery()
.cached_enrs()
.filter_map(|(peer_id, enr)| {
let peers = self.network_globals.peers.read();
if predicate(enr) && peers.should_dial(peer_id) {
Some(*peer_id)
Some(enr.clone())
} else {
None
}
})
.collect();
for peer_id in peers_to_dial {
debug!(self.log, "Dialing cached ENR peer"; "peer_id" => %peer_id);
// Remove the ENR from the cache to prevent continual re-dialing on disconnects

self.discovery_mut().remove_cached_enr(&peer_id);
// For any dial event, inform the peer manager
let enr = self.discovery_mut().enr_of_peer(&peer_id);
self.peer_manager_mut().dial_peer(&peer_id, enr);
}

// Remove the ENR from the cache to prevent continual re-dialing on disconnects
// TODO: add this step to the iteration above.
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll add this step as well.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a bit trickier because the iterator borrows the cached peers by reference.

The second iteration isn't that bad. Feel free to update if you find a nice way to group them and avoid the ENR clone.

peers_to_dial.iter().for_each(|enr| {
self.discovery_mut().remove_cached_enr(&enr.peer_id());
});

self.peer_manager_mut().dial_peers(peers_to_dial);
}

/* Sub-behaviour event handling functions */
Expand Down Expand Up @@ -1359,14 +1358,9 @@ impl<AppReqId: ReqId, TSpec: EthSpec> Network<AppReqId, TSpec> {
&mut self,
event: DiscoveredPeers,
) -> Option<NetworkEvent<AppReqId, TSpec>> {
let DiscoveredPeers { peers } = event;
let to_dial_peers = self.peer_manager_mut().peers_discovered(peers);
for peer_id in to_dial_peers {
debug!(self.log, "Dialing discovered peer"; "peer_id" => %peer_id);
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some of the logs we emit are quite important: trace logs not so much, but debug logs are. This one in particular is very useful for tracking errors throughout the stack, since it records when we dial, when connections fail, etc.

I'll add this log to the peer manager's poll, at the point where we submit the actual dial.

// For any dial event, inform the peer manager
let enr = self.discovery_mut().enr_of_peer(&peer_id);
self.peer_manager_mut().dial_peer(&peer_id, enr);
}
let to_dial_peers = self.peer_manager_mut().peers_discovered(event.peers);
// For any dial event, inform the peer manager
self.peer_manager_mut().dial_peers(to_dial_peers);
None
}

Expand Down
Loading