Skip to content

Commit e3236ab

Browse files
committed
feat(census): add peer scoring
1 parent e3d987e commit e3236ab

File tree

6 files changed

+168
-61
lines changed

6 files changed

+168
-61
lines changed

portal-bridge/src/bridge/state.rs

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -291,20 +291,16 @@ impl StateBridge {
291291
Ok(())
292292
}
293293

294-
// request enrs interested in the content key from Census
295-
fn request_enrs(&self, content_key: &StateContentKey) -> anyhow::Result<Vec<Enr>> {
296-
Ok(self
297-
.census
298-
.get_interested_enrs(Subnetwork::State, &content_key.content_id())?)
299-
}
300-
301294
// spawn individual offer tasks of the content key for each interested enr found in Census
302295
async fn spawn_offer_tasks(
303296
&self,
304297
content_key: StateContentKey,
305298
content_value: StateContentValue,
306299
) {
307-
let Ok(enrs) = self.request_enrs(&content_key) else {
300+
let Ok(enrs) = self
301+
.census
302+
.select_peers(Subnetwork::State, &content_key.content_id())
303+
else {
308304
error!("Failed to request enrs for content key, skipping offer: {content_key:?}");
309305
return;
310306
};

portal-bridge/src/census/mod.rs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ use network::{Network, NetworkAction, NetworkInitializationConfig, NetworkManage
1616
mod network;
1717
mod peer;
1818
mod peers;
19+
mod scoring;
1920

2021
/// The error that occured in [Census].
2122
#[derive(Error, Debug)]
@@ -59,16 +60,16 @@ impl Census {
5960
}
6061
}
6162

62-
/// Returns ENRs interested in provided content id.
63-
pub fn get_interested_enrs(
63+
/// Selects peers to receive content.
64+
pub fn select_peers(
6465
&self,
6566
subnetwork: Subnetwork,
6667
content_id: &[u8; 32],
6768
) -> Result<Vec<Enr>, CensusError> {
6869
match subnetwork {
69-
Subnetwork::History => self.history.get_interested_enrs(content_id),
70-
Subnetwork::State => self.state.get_interested_enrs(content_id),
71-
Subnetwork::Beacon => self.beacon.get_interested_enrs(content_id),
70+
Subnetwork::History => self.history.select_peers(content_id),
71+
Subnetwork::State => self.state.select_peers(content_id),
72+
Subnetwork::Beacon => self.beacon.select_peers(content_id),
7273
_ => Err(CensusError::UnsupportedSubnetwork(subnetwork)),
7374
}
7475
}

portal-bridge/src/census/network.rs

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,10 @@ use crate::{
2121
cli::{BridgeConfig, ClientType},
2222
};
2323

24-
use super::peers::Peers;
24+
use super::{
25+
peers::Peers,
26+
scoring::{AdditiveWeight, PeerSelector},
27+
};
2528

2629
/// The result of the liveness check.
2730
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
@@ -69,11 +72,10 @@ impl Default for NetworkInitializationConfig {
6972
/// should be used in a background task to keep it up-to-date.
7073
#[derive(Clone)]
7174
pub(super) struct Network {
72-
peers: Peers,
75+
peers: Peers<AdditiveWeight>,
7376
client: HttpClient,
7477
subnetwork: Subnetwork,
7578
filter_clients: Vec<ClientType>,
76-
enr_offer_limit: usize,
7779
}
7880

7981
impl Network {
@@ -86,30 +88,30 @@ impl Network {
8688
}
8789

8890
Self {
89-
peers: Peers::new(),
91+
peers: Peers::new(PeerSelector::new(
92+
AdditiveWeight::default(),
93+
bridge_config.enr_offer_limit,
94+
)),
9095
client,
9196
subnetwork,
9297
filter_clients: bridge_config.filter_clients.to_vec(),
93-
enr_offer_limit: bridge_config.enr_offer_limit,
9498
}
9599
}
96100

97101
pub fn create_manager(&self) -> NetworkManager {
98102
NetworkManager::new(self.clone())
99103
}
100104

101-
/// Look up interested enrs for a given content id
102-
pub fn get_interested_enrs(&self, content_id: &[u8; 32]) -> Result<Vec<Enr>, CensusError> {
105+
/// Selects peers to receive content.
106+
pub fn select_peers(&self, content_id: &[u8; 32]) -> Result<Vec<Enr>, CensusError> {
103107
if self.peers.is_empty() {
104108
error!(
105109
subnetwork = %self.subnetwork,
106110
"No known peers, unable to look up interested enrs",
107111
);
108112
return Err(CensusError::NoPeers);
109113
}
110-
Ok(self
111-
.peers
112-
.get_interested_enrs(content_id, self.enr_offer_limit))
114+
Ok(self.peers.select_peers(content_id))
113115
}
114116

115117
/// Records the status of the most recent `Offer` request to one of the peers.

portal-bridge/src/census/peer.rs

Lines changed: 17 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -9,18 +9,18 @@ use tracing::error;
99

1010
#[derive(Debug, Clone)]
1111
pub struct LivenessCheck {
12-
success: bool,
13-
#[allow(dead_code)]
14-
timestamp: Instant,
12+
pub success: bool,
13+
pub timestamp: Instant,
1514
}
1615

17-
#[allow(dead_code)]
1816
#[derive(Debug, Clone)]
1917
pub struct OfferEvent {
20-
success: bool,
21-
timestamp: Instant,
22-
content_value_size: usize,
23-
duration: Duration,
18+
pub success: bool,
19+
pub timestamp: Instant,
20+
#[allow(dead_code)]
21+
pub content_value_size: usize,
22+
#[allow(dead_code)]
23+
pub duration: Duration,
2424
}
2525

2626
#[derive(Debug)]
@@ -57,17 +57,8 @@ impl Peer {
5757
self.enr.clone()
5858
}
5959

60-
/// Returns true if latest liveness check was successful and content is within radius.
60+
/// Returns true if content is within radius.
6161
pub fn is_interested_in_content(&self, content_id: &[u8; 32]) -> bool {
62-
// check that most recent liveness check was successful
63-
if !self
64-
.liveness_checks
65-
.front()
66-
.is_some_and(|liveness_check| liveness_check.success)
67-
{
68-
return false;
69-
}
70-
7162
let distance = XorMetric::distance(&self.enr.node_id().raw(), content_id);
7263
distance <= self.radius
7364
}
@@ -138,4 +129,12 @@ impl Peer {
138129
self.offer_events.drain(Self::MAX_OFFER_EVENTS..);
139130
}
140131
}
132+
133+
pub fn iter_liveness_checks(&self) -> impl Iterator<Item = &LivenessCheck> {
134+
self.liveness_checks.iter()
135+
}
136+
137+
pub fn iter_offer_events(&self) -> impl Iterator<Item = &OfferEvent> {
138+
self.offer_events.iter()
139+
}
141140
}

portal-bridge/src/census/peers.rs

Lines changed: 14 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,13 @@ use ethportal_api::{
1313
Enr,
1414
};
1515
use futures::Stream;
16-
use rand::seq::IteratorRandom;
1716
use tokio::time::Instant;
1817
use tracing::error;
1918

20-
use super::peer::Peer;
19+
use super::{
20+
peer::Peer,
21+
scoring::{PeerSelector, Weight},
22+
};
2123

2224
/// How frequently liveness check should be done.
2325
///
@@ -40,23 +42,19 @@ struct PeersWithLivenessChecks {
4042
/// It provides thread safe access to peers and is responsible for deciding when they should be
4143
/// pinged for liveness.
4244
#[derive(Clone, Debug)]
43-
pub(super) struct Peers {
45+
pub(super) struct Peers<W: Weight> {
4446
peers: Arc<RwLock<PeersWithLivenessChecks>>,
47+
selector: PeerSelector<W>,
4548
}
4649

47-
impl Default for Peers {
48-
fn default() -> Self {
49-
Self::new()
50-
}
51-
}
52-
53-
impl Peers {
54-
pub fn new() -> Self {
50+
impl<W: Weight> Peers<W> {
51+
pub fn new(selector: PeerSelector<W>) -> Self {
5552
Self {
5653
peers: Arc::new(RwLock::new(PeersWithLivenessChecks {
5754
peers: HashMap::new(),
5855
liveness_checks: HashSetDelay::new(LIVENESS_CHECK_DELAY),
5956
})),
57+
selector,
6058
}
6159
}
6260

@@ -121,14 +119,10 @@ impl Peers {
121119
}
122120
}
123121

124-
/// Selects random `limit` peers that should be interested in content.
125-
pub fn get_interested_enrs(&self, content_id: &[u8; 32], limit: usize) -> Vec<Enr> {
126-
self.read()
127-
.peers
128-
.values()
129-
.filter(|peer| peer.is_interested_in_content(content_id))
130-
.map(Peer::enr)
131-
.choose_multiple(&mut rand::thread_rng(), limit)
122+
/// Selects peers to receive content.
123+
pub fn select_peers(&self, content_id: &[u8; 32]) -> Vec<Enr> {
124+
self.selector
125+
.select_peers(content_id, self.read().peers.values())
132126
}
133127

134128
fn read(&self) -> RwLockReadGuard<'_, PeersWithLivenessChecks> {
@@ -140,7 +134,7 @@ impl Peers {
140134
}
141135
}
142136

143-
impl Stream for Peers {
137+
impl<W: Weight> Stream for Peers<W> {
144138
type Item = Enr;
145139

146140
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {

portal-bridge/src/census/scoring.rs

Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
use std::time::Duration;
2+
3+
use ethportal_api::Enr;
4+
use itertools::Itertools;
5+
use rand::{seq::SliceRandom, thread_rng};
6+
7+
use super::peer::Peer;
8+
9+
/// A trait for calculating peer's weight.
10+
pub trait Weight: Send + Sync {
11+
fn weight(&self, content_id: &[u8; 32], peer: &Peer) -> u32;
12+
13+
fn weight_all<'a>(
14+
&self,
15+
content_id: &[u8; 32],
16+
peers: impl IntoIterator<Item = &'a Peer>,
17+
) -> impl Iterator<Item = (&'a Peer, u32)> {
18+
peers
19+
.into_iter()
20+
.map(|peer| (peer, self.weight(content_id, peer)))
21+
}
22+
}
23+
24+
/// Calculates peer's weight by adding/subtracting weights of recent events.
25+
///
26+
/// Weight is calculated using following rules:
27+
/// 1. If peer is not interested in content, `0` is returned
28+
/// 2. Weight's starting value is `starting_weight`
29+
/// 3. All recent events (based on `timeframe` parameter) are scored separately:
30+
/// - successful events increase weight by `success_weight`
31+
/// - failed events decrease weight by `failure_weight`
32+
/// 4. Final weight is restricted to `[0, maximum_weight]` range.
33+
#[derive(Debug, Clone)]
34+
pub struct AdditiveWeight {
35+
pub timeframe: Duration,
36+
pub starting_weight: u32,
37+
pub maximum_weight: u32,
38+
pub success_weight: i32,
39+
pub failure_weight: i32,
40+
}
41+
42+
impl Default for AdditiveWeight {
43+
fn default() -> Self {
44+
Self {
45+
timeframe: Duration::from_secs(15 * 60), // 15 min
46+
starting_weight: 200,
47+
maximum_weight: 400,
48+
success_weight: 5,
49+
failure_weight: -10,
50+
}
51+
}
52+
}
53+
54+
impl Weight for AdditiveWeight {
55+
fn weight(&self, content_id: &[u8; 32], peer: &Peer) -> u32 {
56+
if !peer.is_interested_in_content(content_id) {
57+
return 0;
58+
}
59+
let weight = self.starting_weight as i32
60+
+ Iterator::chain(
61+
peer.iter_liveness_checks()
62+
.map(|liveness_check| (liveness_check.success, liveness_check.timestamp)),
63+
peer.iter_offer_events()
64+
.map(|offer_event| (offer_event.success, offer_event.timestamp)),
65+
)
66+
.map(|(success, timestamp)| {
67+
if timestamp.elapsed() < self.timeframe {
68+
return 0;
69+
}
70+
if success {
71+
self.success_weight
72+
} else {
73+
self.failure_weight
74+
}
75+
})
76+
.sum::<i32>();
77+
weight.clamp(0, self.maximum_weight as i32) as u32
78+
}
79+
}
80+
81+
/// Selects peers based on their weight provided by [Weight] trait.
82+
///
83+
/// Selection is done using [SliceRandom::choose_multiple_weighted]. Peers are ranked so that
84+
/// probability of peer A being ranked higher than peer B is proportional to their weights.
85+
/// The top ranked peers are then selected and returned.
86+
#[derive(Debug, Clone)]
87+
pub struct PeerSelector<W: Weight> {
88+
weight: W,
89+
/// The maximum number of peers to select
90+
limit: usize,
91+
}
92+
93+
impl<W: Weight> PeerSelector<W> {
94+
pub fn new(rank: W, limit: usize) -> Self {
95+
Self {
96+
weight: rank,
97+
limit,
98+
}
99+
}
100+
101+
/// Selects up to `self.limit` peers based on their weights.
102+
pub fn select_peers<'a>(
103+
&self,
104+
content_id: &[u8; 32],
105+
peers: impl IntoIterator<Item = &'a Peer>,
106+
) -> Vec<Enr> {
107+
let weighted_peers = self.weight.weight_all(content_id, peers).collect_vec();
108+
109+
weighted_peers
110+
.choose_multiple_weighted(&mut thread_rng(), self.limit, |(_peer, weight)| *weight)
111+
.expect("choosing random sample shouldn't fail")
112+
.map(|(peer, _weight)| peer.enr())
113+
.collect()
114+
}
115+
}

0 commit comments

Comments
 (0)