Merged
27 changes: 27 additions & 0 deletions prdoc/pr_10362.prdoc
@@ -0,0 +1,27 @@
title: 'net/peerset: Optimize substream opening duration for `SetReservedPeers`'
doc:
- audience: Node Dev
  description: |-
    While triaging Versi-net, I discovered that establishing a connection between collators and validators sometimes takes less than 20ms, while at other times it takes more than 500ms.

    In both cases, the validators are already connected on a different protocol, so opening and negotiating substreams should be almost instant.

    The peerset's slot timer artificially introduces the delay:
    - `SetReservedPeers` is received by the peerset. At this step, the peerset propagates a `CloseSubstream` command to signal that it wants to disconnect the previously reserved peers.
    - At the next slot allocation timer tick (after up to 1s), the newly added reserved peers are asked to connect.

    This can introduce an artificial delay of up to 1s, which is unnecessary.

    To mitigate this behavior, this PR:
    - Transforms `enum PeersetNotificationCommand` into a struct, so the peerset can specify directly which substreams to close and which to open in a single command.
    - Upon receiving the `SetReservedPeers` command, moves the new reserved peers into the `Opening` state and propagates the request to litep2p to open the substreams immediately.
    - Keeps the behavior of the slot allocation timer identical. This is needed to capture the following edge cases:
      - a reserved peer from `SetReservedPeers` is not yet disconnected, but in backoff or pending close;
      - a reserved peer is banned.

    cc @paritytech/networking

    Detected during Versi-net triaging of elastic scaling: https://github.com/paritytech/polkadot-sdk/issues/10310#issuecomment-3543395157
crates:
- name: sc-network
  bump: patch
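
Condensed from the diffs below, the command type changes shape as follows:

```rust
// Before: a command could either open or close substreams, never both,
// so `SetReservedPeers` had to leave the "open" half to the slot timer.
pub enum PeersetNotificationCommand {
    OpenSubstream { peers: Vec<PeerId> },
    CloseSubstream { peers: Vec<PeerId> },
}

// After: one command carries both lists, so closing the old reserved
// peers and opening the new ones happens in a single step.
pub struct PeersetNotificationCommand {
    pub open_peers: Vec<PeerId>,
    pub close_peers: Vec<PeerId>,
}
```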
22 changes: 12 additions & 10 deletions substrate/client/network/src/litep2p/shim/notification/mod.rs
@@ -173,17 +173,19 @@ impl NotificationProtocol {
 
 	/// Handle `Peerset` command.
 	async fn on_peerset_command(&mut self, command: PeersetNotificationCommand) {
-		match command {
-			PeersetNotificationCommand::OpenSubstream { peers } => {
-				log::debug!(target: LOG_TARGET, "{}: open substreams to {peers:?}", self.protocol);
-
-				let _ = self.handle.open_substream_batch(peers.into_iter().map(From::from)).await;
-			},
-			PeersetNotificationCommand::CloseSubstream { peers } => {
-				log::debug!(target: LOG_TARGET, "{}: close substreams to {peers:?}", self.protocol);
-
-				self.handle.close_substream_batch(peers.into_iter().map(From::from)).await;
-			},
-		}
+		if !command.open_peers.is_empty() {
+			log::trace!(target: LOG_TARGET, "{}: open substreams to {:?}", self.protocol, command.open_peers);
+			let _ = self
+				.handle
+				.open_substream_batch(command.open_peers.into_iter().map(From::from))
+				.await;
+		}
+
+		if !command.close_peers.is_empty() {
+			log::trace!(target: LOG_TARGET, "{}: close substreams to {:?}", self.protocol, command.close_peers);
+			self.handle
+				.close_substream_batch(command.close_peers.into_iter().map(From::from))
+				.await;
+		}
 	}
 }
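
With the struct form, a command where one of the lists is empty degenerates to the old single-purpose behavior: the corresponding `is_empty()` guard skips the batch call entirely, so no extra work is done compared to the previous enum variants.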
Expand Down
112 changes: 69 additions & 43 deletions substrate/client/network/src/litep2p/shim/notification/peerset.rs
@@ -197,18 +197,24 @@ pub enum PeersetCommand {
 
 /// Commands emitted by [`Peerset`] to the notification protocol.
 #[derive(Debug)]
-pub enum PeersetNotificationCommand {
+pub struct PeersetNotificationCommand {
 	/// Open substreams to one or more peers.
-	OpenSubstream {
-		/// Peer IDs.
-		peers: Vec<PeerId>,
-	},
+	pub open_peers: Vec<PeerId>,
 
 	/// Close substream to one or more peers.
-	CloseSubstream {
-		/// Peer IDs.
-		peers: Vec<PeerId>,
-	},
+	pub close_peers: Vec<PeerId>,
 }
+
+impl PeersetNotificationCommand {
+	/// Open substream to peers.
+	pub fn open_substream(peers: Vec<PeerId>) -> Self {
+		Self { open_peers: peers, close_peers: vec![] }
+	}
+
+	/// Close substream to peers.
+	pub fn close_substream(peers: Vec<PeerId>) -> Self {
+		Self { open_peers: vec![], close_peers: peers }
+	}
+}
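
A minimal usage sketch of the new constructors (a hypothetical call site; `peer_a` and `peer_b` stand for `PeerId` values already in scope):

```rust
// Equivalent to the old single-purpose enum variants.
let close_only = PeersetNotificationCommand::close_substream(vec![peer_a]);
let open_only = PeersetNotificationCommand::open_substream(vec![peer_b]);

// The new capability: close and open in the same command, as the
// `SetReservedPeers` handling below does.
let combined = PeersetNotificationCommand {
    open_peers: vec![peer_b],
    close_peers: vec![peer_a],
};
```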

/// Peer state.
@@ -924,6 +930,37 @@ impl Peerset {
 		}
 	}
 
+	/// Connect to all reserved peers.
+	///
+	/// A peer is connected only under the following conditions:
+	/// 1. The peer is present in the current set of peers.
+	/// 2. The peer is disconnected.
+	/// 3. The peer is not banned.
+	///
+	/// All reserved peers returned are transitioned to the `PeerState::Opening` state.
+	fn connect_reserved_peers(&mut self) -> Vec<PeerId> {
+		self.reserved_peers
+			.iter()
+			.filter_map(|peer| {
+				let peer_state = self.peers.get(peer);
+				if peer_state != Some(&PeerState::Disconnected) {
+					return None;
+				}
+
+				if self.peerstore_handle.is_banned(peer) {
+					return None;
+				}
+
+				// Transition peer to the opening state.
+				self.peers.insert(
+					*peer,
+					PeerState::Opening { direction: Direction::Outbound(Reserved::Yes) },
+				);
+				Some(*peer)
+			})
+			.collect::<Vec<_>>()
+	}
+
 	/// Get the number of inbound peers.
 	#[cfg(test)]
 	pub fn num_in(&self) -> usize {
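
For illustration, here is a self-contained model of the same filter-and-transition logic, with simplified, hypothetical types (the real implementation lives on `Peerset` and uses litep2p's `PeerId`, the `Direction`/`Reserved` markers, and the peerstore handle):

```rust
use std::collections::{HashMap, HashSet};

// Hypothetical stand-ins for the real peerset types.
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
struct PeerId(u32);

#[derive(Clone, Copy, PartialEq, Eq, Debug)]
enum PeerState {
    Disconnected,
    Connected,
    Opening,
}

/// Returns the reserved peers that should be dialed and transitions them
/// to `Opening` in the same pass.
fn connect_reserved_peers(
    reserved: &HashSet<PeerId>,
    peers: &mut HashMap<PeerId, PeerState>,
    banned: &HashSet<PeerId>,
) -> Vec<PeerId> {
    reserved
        .iter()
        .filter_map(|peer| {
            // Conditions 1 + 2: known to the peerset and currently disconnected.
            if peers.get(peer) != Some(&PeerState::Disconnected) {
                return None;
            }
            // Condition 3: not banned.
            if banned.contains(peer) {
                return None;
            }
            // Single pass: the state transition happens while filtering.
            peers.insert(*peer, PeerState::Opening);
            Some(*peer)
        })
        .collect()
}

fn main() {
    let (dialable, connected, shunned) = (PeerId(1), PeerId(2), PeerId(3));
    let reserved = HashSet::from([dialable, connected, shunned]);
    let mut peers = HashMap::from([
        (dialable, PeerState::Disconnected),
        (connected, PeerState::Connected),
        (shunned, PeerState::Disconnected),
    ]);
    let banned = HashSet::from([shunned]);

    // Only the disconnected, non-banned reserved peer is returned.
    assert_eq!(connect_reserved_peers(&reserved, &mut peers, &banned), vec![dialable]);
    assert_eq!(peers[&dialable], PeerState::Opening);
}
```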
@@ -984,9 +1021,9 @@ impl Stream for Peerset {
 					);
 
 					self.peers.insert(peer, PeerState::Closing { direction });
-					return Poll::Ready(Some(PeersetNotificationCommand::CloseSubstream {
-						peers: vec![peer],
-					}))
+					return Poll::Ready(Some(PeersetNotificationCommand::close_substream(
+						vec![peer],
+					)));
 				},
 				Some(PeerState::Backoff) => {
 					log::trace!(
@@ -1152,15 +1189,22 @@
 					})
 					.collect();
 
+				// Open substreams to the new reserved peers that are disconnected.
+				// This ensures we are not relying on the slot allocation timer to connect to
+				// the new reserved peers. Therefore, we start connecting to them immediately.
+				let connect_to = self.connect_reserved_peers();
Contributor:

nit: `connect_reserved_peers` goes over all peers (not only reserved ones) to find disconnected reserved peers. We could handle only the newly added reserved peers here, like it's done with `reserved_peers_maybe_remove`, and let old reserved peers that got disconnected be handled later on slot allocation. This should optimize things for the case when the list of reserved peers is updated many times per second.

This is minor though, feel free to ignore.

Contributor Author:

Totally makes sense; I copy-pasted the code without thinking too much about it:

  • Now we only iterate over the provided reserved peers.
  • In a single iteration I also change the state to `Opening`, since there's no real reason to pay for a second pass.
  • I slightly adjusted the suggestion and picked iterating through all reserved peers, since they aren't expected to be large, and I wanted to connect as fast as possible.
+				let command = PeersetNotificationCommand {
+					open_peers: connect_to,
+					close_peers: peers_to_remove,
+				};
 
 				log::trace!(
 					target: LOG_TARGET,
-					"{}: close substreams to {peers_to_remove:?}",
+					"{}: SetReservedPeers result {command:?}",
 					self.protocol,
 				);
 
-				return Poll::Ready(Some(PeersetNotificationCommand::CloseSubstream {
-					peers: peers_to_remove,
-				}))
+				return Poll::Ready(Some(command));
 			},
 			PeersetCommand::AddReservedPeers { peers } => {
 				log::debug!(target: LOG_TARGET, "{}: add reserved peers {peers:?}", self.protocol);
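
Taken together, the `SetReservedPeers` arm now emits one command that both closes substreams to the peers leaving the reserved set and opens substreams to the peers entering it, instead of emitting only the close half and deferring the open half to the next slot allocation tick.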
@@ -1205,7 +1249,7 @@
 
 				log::debug!(target: LOG_TARGET, "{}: start connecting to {peers:?}", self.protocol);
 
-				return Poll::Ready(Some(PeersetNotificationCommand::OpenSubstream { peers }))
+				return Poll::Ready(Some(PeersetNotificationCommand::open_substream(peers)));
 			},
 			PeersetCommand::RemoveReservedPeers { peers } => {
 				log::debug!(target: LOG_TARGET, "{}: remove reserved peers {peers:?}", self.protocol);
@@ -1380,9 +1424,9 @@ impl Stream for Peerset {
 						self.protocol,
 					);
 
-					return Poll::Ready(Some(PeersetNotificationCommand::CloseSubstream {
-						peers: peers_to_remove,
-					}))
+					return Poll::Ready(Some(PeersetNotificationCommand::close_substream(
+						peers_to_remove,
+					)));
 				},
 			PeersetCommand::SetReservedOnly { reserved_only } => {
 				log::debug!(target: LOG_TARGET, "{}: set reserved only mode to {reserved_only}", self.protocol);
@@ -1418,9 +1462,9 @@ impl Stream for Peerset {
 							_ => {},
 						});
 
-					return Poll::Ready(Some(PeersetNotificationCommand::CloseSubstream {
-						peers: peers_to_remove,
-					}))
+					return Poll::Ready(Some(PeersetNotificationCommand::close_substream(
+						peers_to_remove,
+					)));
 				}
 			},
 			PeersetCommand::GetReservedPeers { tx } => {
@@ -1435,23 +1479,7 @@
 		// also check if there are free outbound slots and if so, fetch peers with highest
 		// reputations from `Peerstore` and start opening substreams to these peers
 		if let Poll::Ready(()) = Pin::new(&mut self.next_slot_allocation).poll(cx) {
-			let mut connect_to = self
-				.peers
-				.iter()
-				.filter_map(|(peer, state)| {
-					(self.reserved_peers.contains(peer) &&
-						std::matches!(state, PeerState::Disconnected) &&
-						!self.peerstore_handle.is_banned(peer))
-					.then_some(*peer)
-				})
-				.collect::<Vec<_>>();
-
-			connect_to.iter().for_each(|peer| {
-				self.peers.insert(
-					*peer,
-					PeerState::Opening { direction: Direction::Outbound(Reserved::Yes) },
-				);
-			});
+			let mut connect_to = self.connect_reserved_peers();
 
 			// if the number of outbound peers is lower than the desired amount of outbound peers,
 			// query `PeerStore` and try to get a new outbound candidate.
@@ -1496,9 +1524,7 @@
 				self.protocol,
 			);
 
-			return Poll::Ready(Some(PeersetNotificationCommand::OpenSubstream {
-				peers: connect_to,
-			}))
+			return Poll::Ready(Some(PeersetNotificationCommand::open_substream(connect_to)));
 		}
 	}
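
The slot allocation timer path above now reuses the `connect_reserved_peers` helper in place of its previous inline filter-and-transition pass, so the immediate `SetReservedPeers` path and the periodic path share a single implementation, and the timer still picks up the backoff/pending-close and banned edge cases listed in the prdoc.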

@@ -22,9 +22,7 @@
 use crate::{
 	litep2p::{
 		peerstore::Peerstore,
-		shim::notification::peerset::{
-			OpenResult, Peerset, PeersetCommand, PeersetNotificationCommand,
-		},
+		shim::notification::peerset::{OpenResult, Peerset, PeersetCommand},
 	},
 	service::traits::{Direction, PeerStore, ValidationResult},
 	ProtocolName,
@@ -126,22 +124,23 @@ async fn test_once() {
 
 		match WeightedIndex::new(&action_weights).unwrap().sample(&mut rng) {
 			0 => match peerset.next().now_or_never() {
-				// open substreams to `peers`
-				Some(Some(PeersetNotificationCommand::OpenSubstream { peers })) =>
-					for peer in peers {
+				Some(Some(command)) => {
+					// open substreams to `peers`
+					for peer in command.open_peers {
 						opening.insert(peer, Direction::Outbound);
 						closed.remove(&peer);
 
 						assert!(!closing.contains(&peer));
 						assert!(!open.contains_key(&peer));
-					},
-				// close substreams to `peers`
-				Some(Some(PeersetNotificationCommand::CloseSubstream { peers })) =>
-					for peer in peers {
+					}
+
+					// close substreams to `peers`
+					for peer in command.close_peers {
 						assert!(closing.insert(peer));
 						assert!(open.remove(&peer).is_some());
 						assert!(!opening.contains_key(&peer));
-					},
+					}
+				},
 				Some(None) => panic!("peerset exited"),
 				None => {},
 			},
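
With the struct-based command, the randomized test consumes a single polled command and checks both halves: every peer in `open_peers` must move into the opening set without being tracked as closing or open, and every peer in `close_peers` must currently be open. A command with an empty list on either side runs the corresponding loop zero times, matching the `is_empty()` no-op guards in the shim.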