Skip to content

Commit dd95a52

Browse files
paritytech-release-backport-bot[bot]lexnvgithub-actions[bot]bkchrEgorPopelyaev
authored
[stable2509] Backport #10362: net/peerset: Optimize substream opening duration for SetReservedPeers (#10408)
Backport #10362 into `stable2509` from lexnv. See the [documentation](https://github.com/paritytech/polkadot-sdk/blob/master/docs/BACKPORT.md) on how to use this bot. <!-- # To be used by other automation, do not modify: original-pr-number: #${pull_number} --> --------- Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io> Co-authored-by: Alexandru Vasile <60601340+lexnv@users.noreply.github.com> Co-authored-by: cmd[bot] <41898282+github-actions[bot]@users.noreply.github.com> Co-authored-by: Bastian Köcher <git@kchr.de> Co-authored-by: Egor_P <egor@parity.io>
1 parent e8e7b48 commit dd95a52

File tree

5 files changed

+253
-179
lines changed

5 files changed

+253
-179
lines changed

prdoc/pr_10362.prdoc

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
title: 'net/peerset: Optimize substream opening duration for `SetReservedPeers`'
2+
doc:
3+
- audience: Node Dev
4+
description: |-
5+
While triaging the Versi-net, I've discovered that the connection between collators and validators sometimes takes less than 20ms, while at other times it takes more than 500ms.
6+
7+
In both cases, the validators are already connected to a different protocol. Therefore, opening and negotiating substreams must be almost instant.
8+
9+
The slot timer of the peerset artificially introduces the delay:
10+
- The `SetReservedPeers` command is received by the peerset. At this step, the peerset propagates the `CloseSubstream` command to signal that it wants to disconnect previously reserved peers.
11+
- At the next slot allocation timer tick (after 1s), the newly added reserved peers are requested to be connected
12+
13+
This can introduce an artificial delay of up to 1s, which is unnecessary.
14+
15+
To mitigate this behavior, this PR:
16+
- Transforms the `enum PeersetNotificationCommand` into a structure. Effectively, the peerset can directly specify which substreams to close and which substreams to open
17+
- Upon receiving the `SetReservedPeers` command, peers are moved into the `Opening` state and the request is propagated to litep2p to open substreams.
18+
- The behavior of the slot allocation timer remains identical. This is needed to capture the following edge cases:
19+
- The reserved peer of the `SetReservedPeers` is not disconnected, but backoff / pending closing.
20+
- The reserved peer is banned
21+
22+
cc @paritytech/networking
23+
24+
Detected during versi-net triaging of elastic scaling: https://github.com/paritytech/polkadot-sdk/issues/10310#issuecomment-3543395157
25+
crates:
26+
- name: sc-network
27+
bump: patch
28+

substrate/client/network/src/litep2p/shim/notification/mod.rs

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -173,17 +173,19 @@ impl NotificationProtocol {
173173

174174
/// Handle `Peerset` command.
175175
async fn on_peerset_command(&mut self, command: PeersetNotificationCommand) {
176-
match command {
177-
PeersetNotificationCommand::OpenSubstream { peers } => {
178-
log::debug!(target: LOG_TARGET, "{}: open substreams to {peers:?}", self.protocol);
179-
180-
let _ = self.handle.open_substream_batch(peers.into_iter().map(From::from)).await;
181-
},
182-
PeersetNotificationCommand::CloseSubstream { peers } => {
183-
log::debug!(target: LOG_TARGET, "{}: close substreams to {peers:?}", self.protocol);
176+
if !command.open_peers.is_empty() {
177+
log::trace!(target: LOG_TARGET, "{}: open substreams to {:?}", self.protocol, command.open_peers);
178+
let _ = self
179+
.handle
180+
.open_substream_batch(command.open_peers.into_iter().map(From::from))
181+
.await;
182+
}
184183

185-
self.handle.close_substream_batch(peers.into_iter().map(From::from)).await;
186-
},
184+
if !command.close_peers.is_empty() {
185+
log::trace!(target: LOG_TARGET, "{}: close substreams to {:?}", self.protocol, command.close_peers);
186+
self.handle
187+
.close_substream_batch(command.close_peers.into_iter().map(From::from))
188+
.await;
187189
}
188190
}
189191
}

substrate/client/network/src/litep2p/shim/notification/peerset.rs

Lines changed: 69 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -197,18 +197,24 @@ pub enum PeersetCommand {
197197

198198
/// Commands emitted by [`Peerset`] to the notification protocol.
199199
#[derive(Debug)]
200-
pub enum PeersetNotificationCommand {
200+
pub struct PeersetNotificationCommand {
201201
/// Open substreams to one or more peers.
202-
OpenSubstream {
203-
/// Peer IDs.
204-
peers: Vec<PeerId>,
205-
},
202+
pub open_peers: Vec<PeerId>,
206203

207204
/// Close substream to one or more peers.
208-
CloseSubstream {
209-
/// Peer IDs.
210-
peers: Vec<PeerId>,
211-
},
205+
pub close_peers: Vec<PeerId>,
206+
}
207+
208+
impl PeersetNotificationCommand {
209+
/// Open substream to peers.
210+
pub fn open_substream(peers: Vec<PeerId>) -> Self {
211+
Self { open_peers: peers, close_peers: vec![] }
212+
}
213+
214+
/// Close substream to peers.
215+
pub fn close_substream(peers: Vec<PeerId>) -> Self {
216+
Self { open_peers: vec![], close_peers: peers }
217+
}
212218
}
213219

214220
/// Peer state.
@@ -924,6 +930,37 @@ impl Peerset {
924930
}
925931
}
926932

933+
/// Connect to all reserved peers.
934+
///
935+
/// Under the following conditions:
936+
/// 1. The peer must be present in the current set of peers.
937+
/// 2. The peer must be disconnected.
938+
/// 3. The peer must not be banned.
939+
///
940+
/// All reserved peers returned are transitioned to the `PeerState::Opening` state.
941+
fn connect_reserved_peers(&mut self) -> Vec<PeerId> {
942+
self.reserved_peers
943+
.iter()
944+
.filter_map(|peer| {
945+
let peer_state = self.peers.get(peer);
946+
if peer_state != Some(&PeerState::Disconnected) {
947+
return None;
948+
}
949+
950+
if self.peerstore_handle.is_banned(peer) {
951+
return None;
952+
}
953+
954+
// Transition peer to the opening state.
955+
self.peers.insert(
956+
*peer,
957+
PeerState::Opening { direction: Direction::Outbound(Reserved::Yes) },
958+
);
959+
Some(*peer)
960+
})
961+
.collect::<Vec<_>>()
962+
}
963+
927964
/// Get the number of inbound peers.
928965
#[cfg(test)]
929966
pub fn num_in(&self) -> usize {
@@ -984,9 +1021,9 @@ impl Stream for Peerset {
9841021
);
9851022

9861023
self.peers.insert(peer, PeerState::Closing { direction });
987-
return Poll::Ready(Some(PeersetNotificationCommand::CloseSubstream {
988-
peers: vec![peer],
989-
}))
1024+
return Poll::Ready(Some(PeersetNotificationCommand::close_substream(
1025+
vec![peer],
1026+
)));
9901027
},
9911028
Some(PeerState::Backoff) => {
9921029
log::trace!(
@@ -1152,15 +1189,22 @@ impl Stream for Peerset {
11521189
})
11531190
.collect();
11541191

1192+
// Open substreams to the new reserved peers that are disconnected.
1193+
// This ensures we are not relying on the slot allocation timer to connect to
1194+
// the new reserved peers. Therefore, we start connecting to them immediately.
1195+
let connect_to = self.connect_reserved_peers();
1196+
let command = PeersetNotificationCommand {
1197+
open_peers: connect_to,
1198+
close_peers: peers_to_remove,
1199+
};
1200+
11551201
log::trace!(
11561202
target: LOG_TARGET,
1157-
"{}: close substreams to {peers_to_remove:?}",
1203+
"{}: SetReservedPeers result {command:?}",
11581204
self.protocol,
11591205
);
11601206

1161-
return Poll::Ready(Some(PeersetNotificationCommand::CloseSubstream {
1162-
peers: peers_to_remove,
1163-
}))
1207+
return Poll::Ready(Some(command));
11641208
},
11651209
PeersetCommand::AddReservedPeers { peers } => {
11661210
log::debug!(target: LOG_TARGET, "{}: add reserved peers {peers:?}", self.protocol);
@@ -1205,7 +1249,7 @@ impl Stream for Peerset {
12051249

12061250
log::debug!(target: LOG_TARGET, "{}: start connecting to {peers:?}", self.protocol);
12071251

1208-
return Poll::Ready(Some(PeersetNotificationCommand::OpenSubstream { peers }))
1252+
return Poll::Ready(Some(PeersetNotificationCommand::open_substream(peers)));
12091253
},
12101254
PeersetCommand::RemoveReservedPeers { peers } => {
12111255
log::debug!(target: LOG_TARGET, "{}: remove reserved peers {peers:?}", self.protocol);
@@ -1380,9 +1424,9 @@ impl Stream for Peerset {
13801424
self.protocol,
13811425
);
13821426

1383-
return Poll::Ready(Some(PeersetNotificationCommand::CloseSubstream {
1384-
peers: peers_to_remove,
1385-
}))
1427+
return Poll::Ready(Some(PeersetNotificationCommand::close_substream(
1428+
peers_to_remove,
1429+
)));
13861430
},
13871431
PeersetCommand::SetReservedOnly { reserved_only } => {
13881432
log::debug!(target: LOG_TARGET, "{}: set reserved only mode to {reserved_only}", self.protocol);
@@ -1418,9 +1462,9 @@ impl Stream for Peerset {
14181462
_ => {},
14191463
});
14201464

1421-
return Poll::Ready(Some(PeersetNotificationCommand::CloseSubstream {
1422-
peers: peers_to_remove,
1423-
}))
1465+
return Poll::Ready(Some(PeersetNotificationCommand::close_substream(
1466+
peers_to_remove,
1467+
)));
14241468
}
14251469
},
14261470
PeersetCommand::GetReservedPeers { tx } => {
@@ -1435,23 +1479,7 @@ impl Stream for Peerset {
14351479
// also check if there are free outbound slots and if so, fetch peers with highest
14361480
// reputations from `Peerstore` and start opening substreams to these peers
14371481
if let Poll::Ready(()) = Pin::new(&mut self.next_slot_allocation).poll(cx) {
1438-
let mut connect_to = self
1439-
.peers
1440-
.iter()
1441-
.filter_map(|(peer, state)| {
1442-
(self.reserved_peers.contains(peer) &&
1443-
std::matches!(state, PeerState::Disconnected) &&
1444-
!self.peerstore_handle.is_banned(peer))
1445-
.then_some(*peer)
1446-
})
1447-
.collect::<Vec<_>>();
1448-
1449-
connect_to.iter().for_each(|peer| {
1450-
self.peers.insert(
1451-
*peer,
1452-
PeerState::Opening { direction: Direction::Outbound(Reserved::Yes) },
1453-
);
1454-
});
1482+
let mut connect_to = self.connect_reserved_peers();
14551483

14561484
// if the number of outbound peers is lower than the desired amount of outbound peers,
14571485
// query `PeerStore` and try to get a new outbound candidate.
@@ -1496,9 +1524,7 @@ impl Stream for Peerset {
14961524
self.protocol,
14971525
);
14981526

1499-
return Poll::Ready(Some(PeersetNotificationCommand::OpenSubstream {
1500-
peers: connect_to,
1501-
}))
1527+
return Poll::Ready(Some(PeersetNotificationCommand::open_substream(connect_to)));
15021528
}
15031529
}
15041530

substrate/client/network/src/litep2p/shim/notification/tests/fuzz.rs

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,7 @@
2222
use crate::{
2323
litep2p::{
2424
peerstore::Peerstore,
25-
shim::notification::peerset::{
26-
OpenResult, Peerset, PeersetCommand, PeersetNotificationCommand,
27-
},
25+
shim::notification::peerset::{OpenResult, Peerset, PeersetCommand},
2826
},
2927
service::traits::{Direction, PeerStore, ValidationResult},
3028
ProtocolName,
@@ -126,22 +124,23 @@ async fn test_once() {
126124

127125
match WeightedIndex::new(&action_weights).unwrap().sample(&mut rng) {
128126
0 => match peerset.next().now_or_never() {
129-
// open substreams to `peers`
130-
Some(Some(PeersetNotificationCommand::OpenSubstream { peers })) =>
131-
for peer in peers {
127+
Some(Some(command)) => {
128+
// open substreams to `peers`
129+
for peer in command.open_peers {
132130
opening.insert(peer, Direction::Outbound);
133131
closed.remove(&peer);
134132

135133
assert!(!closing.contains(&peer));
136134
assert!(!open.contains_key(&peer));
137-
},
138-
// close substreams to `peers`
139-
Some(Some(PeersetNotificationCommand::CloseSubstream { peers })) =>
140-
for peer in peers {
135+
}
136+
137+
// close substreams to `peers`
138+
for peer in command.close_peers {
141139
assert!(closing.insert(peer));
142140
assert!(open.remove(&peer).is_some());
143141
assert!(!opening.contains_key(&peer));
144-
},
142+
}
143+
},
145144
Some(None) => panic!("peerset exited"),
146145
None => {},
147146
},

0 commit comments

Comments
 (0)