Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(census): add peer scoring #1579

Merged
merged 1 commit into from
Nov 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 4 additions & 8 deletions portal-bridge/src/bridge/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -291,20 +291,16 @@ impl StateBridge {
Ok(())
}

// request enrs interested in the content key from Census
fn request_enrs(&self, content_key: &StateContentKey) -> anyhow::Result<Vec<Enr>> {
Ok(self
.census
.get_interested_enrs(Subnetwork::State, &content_key.content_id())?)
}

// spawn individual offer tasks of the content key for each interested enr found in Census
async fn spawn_offer_tasks(
&self,
content_key: StateContentKey,
content_value: StateContentValue,
) {
let Ok(enrs) = self.request_enrs(&content_key) else {
let Ok(enrs) = self
.census
.select_peers(Subnetwork::State, &content_key.content_id())
else {
error!("Failed to request enrs for content key, skipping offer: {content_key:?}");
return;
};
Expand Down
11 changes: 6 additions & 5 deletions portal-bridge/src/census/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use network::{Network, NetworkAction, NetworkInitializationConfig, NetworkManage
mod network;
mod peer;
mod peers;
mod scoring;

/// The error that occured in [Census].
#[derive(Error, Debug)]
Expand Down Expand Up @@ -59,16 +60,16 @@ impl Census {
}
}

/// Returns ENRs interested in provided content id.
pub fn get_interested_enrs(
/// Selects peers to receive content.
pub fn select_peers(
&self,
subnetwork: Subnetwork,
content_id: &[u8; 32],
) -> Result<Vec<Enr>, CensusError> {
match subnetwork {
Subnetwork::History => self.history.get_interested_enrs(content_id),
Subnetwork::State => self.state.get_interested_enrs(content_id),
Subnetwork::Beacon => self.beacon.get_interested_enrs(content_id),
Subnetwork::History => self.history.select_peers(content_id),
Subnetwork::State => self.state.select_peers(content_id),
Subnetwork::Beacon => self.beacon.select_peers(content_id),
_ => Err(CensusError::UnsupportedSubnetwork(subnetwork)),
}
}
Expand Down
22 changes: 12 additions & 10 deletions portal-bridge/src/census/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@ use crate::{
cli::{BridgeConfig, ClientType},
};

use super::peers::Peers;
use super::{
peers::Peers,
scoring::{AdditiveWeight, PeerSelector},
};

/// The result of the liveness check.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
Expand Down Expand Up @@ -69,11 +72,10 @@ impl Default for NetworkInitializationConfig {
/// should be used in a background task to keep it up-to-date.
#[derive(Clone)]
pub(super) struct Network {
peers: Peers,
peers: Peers<AdditiveWeight>,
client: HttpClient,
subnetwork: Subnetwork,
filter_clients: Vec<ClientType>,
enr_offer_limit: usize,
}

impl Network {
Expand All @@ -86,30 +88,30 @@ impl Network {
}

Self {
peers: Peers::new(),
peers: Peers::new(PeerSelector::new(
AdditiveWeight::default(),
bridge_config.enr_offer_limit,
)),
client,
subnetwork,
filter_clients: bridge_config.filter_clients.to_vec(),
enr_offer_limit: bridge_config.enr_offer_limit,
}
}

pub fn create_manager(&self) -> NetworkManager {
NetworkManager::new(self.clone())
}

/// Look up interested enrs for a given content id
pub fn get_interested_enrs(&self, content_id: &[u8; 32]) -> Result<Vec<Enr>, CensusError> {
/// Selects peers to receive content.
pub fn select_peers(&self, content_id: &[u8; 32]) -> Result<Vec<Enr>, CensusError> {
if self.peers.is_empty() {
error!(
subnetwork = %self.subnetwork,
"No known peers, unable to look up interested enrs",
);
return Err(CensusError::NoPeers);
}
Ok(self
.peers
.get_interested_enrs(content_id, self.enr_offer_limit))
Ok(self.peers.select_peers(content_id))
}

/// Records the status of the most recent `Offer` request to one of the peers.
Expand Down
35 changes: 17 additions & 18 deletions portal-bridge/src/census/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,18 @@ use tracing::error;

#[derive(Debug, Clone)]
pub struct LivenessCheck {
success: bool,
#[allow(dead_code)]
timestamp: Instant,
pub success: bool,
pub timestamp: Instant,
}

#[allow(dead_code)]
#[derive(Debug, Clone)]
pub struct OfferEvent {
success: bool,
timestamp: Instant,
content_value_size: usize,
duration: Duration,
pub success: bool,
pub timestamp: Instant,
#[allow(dead_code)]
pub content_value_size: usize,
#[allow(dead_code)]
pub duration: Duration,
Comment on lines +20 to +23
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are there plans to use these in the future?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes.
Idea is to measure bandwidth and prefer peers that are faster than average.

But I didn't want to complicate initial work, and I don't think it works well with current implementation of "Weight" because peers that are working would quickly increase their weight to maximum and this wouldn't make a difference.

}

#[derive(Debug)]
Expand Down Expand Up @@ -57,17 +57,8 @@ impl Peer {
self.enr.clone()
}

/// Returns true if latest liveness check was successful and content is within radius.
/// Returns true if content is within radius.
pub fn is_interested_in_content(&self, content_id: &[u8; 32]) -> bool {
// check that most recent liveness check was successful
if !self
.liveness_checks
.front()
.is_some_and(|liveness_check| liveness_check.success)
{
return false;
}

let distance = XorMetric::distance(&self.enr.node_id().raw(), content_id);
distance <= self.radius
}
Expand Down Expand Up @@ -138,4 +129,12 @@ impl Peer {
self.offer_events.drain(Self::MAX_OFFER_EVENTS..);
}
}

pub fn iter_liveness_checks(&self) -> impl Iterator<Item = &LivenessCheck> {
self.liveness_checks.iter()
}

pub fn iter_offer_events(&self) -> impl Iterator<Item = &OfferEvent> {
self.offer_events.iter()
}
}
34 changes: 14 additions & 20 deletions portal-bridge/src/census/peers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,13 @@ use ethportal_api::{
Enr,
};
use futures::Stream;
use rand::seq::IteratorRandom;
use tokio::time::Instant;
use tracing::error;

use super::peer::Peer;
use super::{
peer::Peer,
scoring::{PeerSelector, Weight},
};

/// How frequently liveness check should be done.
///
Expand All @@ -40,23 +42,19 @@ struct PeersWithLivenessChecks {
/// It provides thread safe access to peers and is responsible for deciding when they should be
/// pinged for liveness.
#[derive(Clone, Debug)]
pub(super) struct Peers {
pub(super) struct Peers<W: Weight> {
peers: Arc<RwLock<PeersWithLivenessChecks>>,
selector: PeerSelector<W>,
}

impl Default for Peers {
fn default() -> Self {
Self::new()
}
}

impl Peers {
pub fn new() -> Self {
impl<W: Weight> Peers<W> {
pub fn new(selector: PeerSelector<W>) -> Self {
Self {
peers: Arc::new(RwLock::new(PeersWithLivenessChecks {
peers: HashMap::new(),
liveness_checks: HashSetDelay::new(LIVENESS_CHECK_DELAY),
})),
selector,
}
}

Expand Down Expand Up @@ -121,14 +119,10 @@ impl Peers {
}
}

/// Selects random `limit` peers that should be interested in content.
pub fn get_interested_enrs(&self, content_id: &[u8; 32], limit: usize) -> Vec<Enr> {
self.read()
.peers
.values()
.filter(|peer| peer.is_interested_in_content(content_id))
.map(Peer::enr)
.choose_multiple(&mut rand::thread_rng(), limit)
/// Selects peers to receive content.
pub fn select_peers(&self, content_id: &[u8; 32]) -> Vec<Enr> {
self.selector
.select_peers(content_id, self.read().peers.values())
}

fn read(&self) -> RwLockReadGuard<'_, PeersWithLivenessChecks> {
Expand All @@ -140,7 +134,7 @@ impl Peers {
}
}

impl Stream for Peers {
impl<W: Weight> Stream for Peers<W> {
type Item = Enr;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Expand Down
115 changes: 115 additions & 0 deletions portal-bridge/src/census/scoring.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
use std::time::Duration;

use ethportal_api::Enr;
use itertools::Itertools;
use rand::{seq::SliceRandom, thread_rng};

use super::peer::Peer;

/// A trait for calculating peer's weight.
pub trait Weight: Send + Sync {
fn weight(&self, content_id: &[u8; 32], peer: &Peer) -> u32;

fn weight_all<'a>(
&self,
content_id: &[u8; 32],
peers: impl IntoIterator<Item = &'a Peer>,
) -> impl Iterator<Item = (&'a Peer, u32)> {
peers
.into_iter()
.map(|peer| (peer, self.weight(content_id, peer)))
}
}

/// Calculates peer's weight by adding/subtracting weights of recent events.
///
/// Weight is calculated using following rules:
/// 1. If peer is not interested in content, `0` is returned
/// 2. Weight's starting value is `starting_weight`
/// 3. All recent events (based on `timeframe` parameter) are scored separately:
/// - successful events increase weight by `success_weight`
/// - failed events decrease weight by `failure_weight`
/// 4. Final weight is restricted to `[0, maximum_weight]` range.
#[derive(Debug, Clone)]
pub struct AdditiveWeight {
pub timeframe: Duration,
pub starting_weight: u32,
pub maximum_weight: u32,
pub success_weight: i32,
pub failure_weight: i32,
}

impl Default for AdditiveWeight {
fn default() -> Self {
Self {
timeframe: Duration::from_secs(15 * 60), // 15 min
starting_weight: 200,
maximum_weight: 400,
success_weight: 5,
failure_weight: -10,
}
}
}

impl Weight for AdditiveWeight {
fn weight(&self, content_id: &[u8; 32], peer: &Peer) -> u32 {
if !peer.is_interested_in_content(content_id) {
return 0;
}
let weight = self.starting_weight as i32
+ Iterator::chain(
peer.iter_liveness_checks()
.map(|liveness_check| (liveness_check.success, liveness_check.timestamp)),
peer.iter_offer_events()
.map(|offer_event| (offer_event.success, offer_event.timestamp)),
)
.map(|(success, timestamp)| {
if timestamp.elapsed() > self.timeframe {
return 0;
}
if success {
self.success_weight
} else {
self.failure_weight
}
})
.sum::<i32>();
weight.clamp(0, self.maximum_weight as i32) as u32
}
}

/// Selects peers based on their weight provided by [Weight] trait.
///
/// Selection is done using [SliceRandom::choose_multiple_weighted]. Peers are ranked so that
/// probability of peer A being ranked higher than peer B is proportional to their weights.
/// The top ranked peers are then selected and returned.
#[derive(Debug, Clone)]
pub struct PeerSelector<W: Weight> {
weight: W,
/// The maximum number of peers to select
limit: usize,
}

impl<W: Weight> PeerSelector<W> {
pub fn new(rank: W, limit: usize) -> Self {
Self {
weight: rank,
limit,
}
}

/// Selects up to `self.limit` peers based on their weights.
pub fn select_peers<'a>(
&self,
content_id: &[u8; 32],
peers: impl IntoIterator<Item = &'a Peer>,
) -> Vec<Enr> {
let weighted_peers = self.weight.weight_all(content_id, peers).collect_vec();

weighted_peers
.choose_multiple_weighted(&mut thread_rng(), self.limit, |(_peer, weight)| *weight)
.expect("choosing random sample shouldn't fail")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this fail if there are zero peers?

Copy link
Collaborator Author

@morph-dev morph-dev Nov 1, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, it doesn't.
That can fail if weight is negative (api allows any type that can be converted to f64), which can't happen in our case because we use u32.

Weight 0 would also be picked, but only if there isn't enough weights that are non-zero, which is what we want.

One thing that we don't differentiate at the moment is that weight can be 0 if:

  • content doesn't fall within radius
  • peer has very negative weight

It might be better to never select peer whose radius doesn't include content. But I left that for future optimization

.map(|(peer, _weight)| peer.enr())
.collect()
}
}
Loading