Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(census): keep track of liveness checks #1575

Merged
merged 2 commits into from
Nov 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions portal-bridge/src/census/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use crate::cli::BridgeConfig;
use network::{Network, NetworkAction, NetworkInitializationConfig, NetworkManager};

mod network;
mod peer;
mod peers;

/// The error that occured in [Census].
Expand All @@ -32,6 +33,7 @@ pub const ENR_OFFER_LIMIT: usize = 4;
/// The census is responsible for maintaining a list of known peers in the network,
/// checking their liveness, updating their data radius, iterating through their
/// rfn to find new peers, and providing interested enrs for a given content id.
#[derive(Clone)]
pub struct Census {
history: Network,
state: Network,
Expand Down
94 changes: 41 additions & 53 deletions portal-bridge/src/census/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,15 @@ use crate::{

use super::peers::Peers;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
/// The result of the liveness check.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum LivenessResult {
/// We pinged the peer successfully
Pass,
/// We failed to ping peer
Fail,
/// Peer is already registered and not expired (we didn't try to ping the peer)
AlreadyRegistered,
/// Peer is already known and doesn't need liveness check
Fresh,
}

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -98,7 +98,7 @@ impl Network {
NetworkManager::new(self.clone())
}

// Look up all known interested enrs for a given content id
/// Look up interested enrs for a given content id
pub fn get_interested_enrs(&self, content_id: &[u8; 32]) -> Result<Vec<Enr>, CensusError> {
if self.peers.is_empty() {
error!(
Expand Down Expand Up @@ -174,8 +174,8 @@ impl Network {
.collect_vec();

// Concurrent execution of liveness check
let new_peers = enrs
.iter()
let starting_peers = self.peers.len() as f64;
enrs.iter()
.map(|enr| async {
if let Ok(_permit) = semaphore.acquire().await {
self.liveness_check(enr.clone()).await
Expand All @@ -188,20 +188,17 @@ impl Network {
}
})
.collect::<JoinAll<_>>()
.await
.into_iter()
.filter(|liveness_result| liveness_result == &LivenessResult::Pass)
.count();

let total_peers = self.peers.len();
.await;
let ending_peers = self.peers.len() as f64;
let new_peers = ending_peers - starting_peers;

debug!(
subnetwork = %self.subnetwork,
"init: added {new_peers} / {total_peers} peers",
"init: added {new_peers} / {ending_peers} peers",
);

// Stop if number of new peers is less than a threshold fraction of all peers
if (new_peers as f64) < (total_peers as f64) * config.stop_fraction_threshold {
if new_peers < ending_peers * config.stop_fraction_threshold {
break;
}
}
Expand All @@ -225,17 +222,19 @@ impl Network {
/// Performs liveness check.
///
/// Liveness check will pass if peer respond to a Ping request. It returns
/// `LivenessResult::AlreadyRegistered` if peer is already registered and not expired.
/// `LivenessResult::Fresh` if peer is already known and doesn't need liveness check.
async fn liveness_check(&self, enr: Enr) -> LivenessResult {
// if enr is already registered, check if delay map deadline has expired
if let Some(deadline) = self.peers.deadline(&enr) {
if Instant::now() < deadline {
return LivenessResult::AlreadyRegistered;
}
// check if peer needs liveness check
if self
.peers
.next_liveness_check(&enr.node_id())
.is_some_and(|next_liveness_check| Instant::now() < next_liveness_check)
{
return LivenessResult::Fresh;
}

let Ok(pong_info) = self.ping(&enr).await else {
self.peers.record_failed_liveness_check(&enr);
self.peers.record_failed_liveness_check(enr);
return LivenessResult::Fail;
};

Expand All @@ -244,7 +243,7 @@ impl Network {
// If ENR seq is not the latest one, fetch fresh ENR
let enr = if enr.seq() < pong_info.enr_seq {
let Ok(enr) = self.fetch_enr(&enr).await else {
self.peers.record_failed_liveness_check(&enr);
self.peers.record_failed_liveness_check(enr);
return LivenessResult::Fail;
};
enr
Expand Down Expand Up @@ -367,29 +366,21 @@ impl NetworkManager {

/// Returns next action that should be executed.
pub async fn next_action(&mut self) -> NetworkAction {
loop {
tokio::select! {
_ = self.peer_discovery_interval.tick() => {
return NetworkAction::PeerDiscovery;
}
peer = self.network.peers.next() => {
match peer {
Some(Ok(enr)) => {
return NetworkAction::LivenessCheck(enr);
}
Some(Err(err)) => {
error!(
subnetwork = %self.network.subnetwork,
"next-action: error getting peer - err: {err}",
);
}
None => {
warn!(
subnetwork = %self.network.subnetwork,
"next-action: no pending peers - re-initializing",
);
return NetworkAction::ReInitialization;
}
tokio::select! {
_ = self.peer_discovery_interval.tick() => {
NetworkAction::PeerDiscovery
}
peer = self.network.peers.next() => {
match peer {
Some(enr) => {
NetworkAction::LivenessCheck(enr)
}
None => {
warn!(
subnetwork = %self.network.subnetwork,
"next-action: no pending peers - re-initializing",
);
NetworkAction::ReInitialization
}
}
}
Expand All @@ -415,7 +406,7 @@ impl NetworkManager {
}
NetworkAction::PeerDiscovery => self.peer_discovery().await,
NetworkAction::LivenessCheck(enr) => {
if self.network.liveness_check(enr).await == LivenessResult::AlreadyRegistered {
if self.network.liveness_check(enr).await == LivenessResult::Fresh {
warn!(
subnetwork = %self.network.subnetwork,
"execute-action: liveness check on already registered peer",
Expand All @@ -438,17 +429,14 @@ impl NetworkManager {
}
};

let mut new_peers = 0;
let starting_peers = self.network.peers.len();
for enr in enrs {
if self.network.liveness_check(enr).await == LivenessResult::Pass {
new_peers += 1;
}
self.network.liveness_check(enr).await;
}

let total_peers = self.network.peers.len();
let ending_peers = self.network.peers.len();
info!(
subnetwork = %self.network.subnetwork,
"peer-discovery: finished - discovered {new_peers} / {total_peers} peers",
"peer-discovery: finished - peers: {starting_peers} -> {ending_peers}",
);
}
}
116 changes: 116 additions & 0 deletions portal-bridge/src/census/peer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
use std::{
collections::VecDeque,
time::{Duration, Instant},
};

use discv5::Enr;
use ethportal_api::types::distance::{Distance, Metric, XorMetric};
use tracing::error;

#[derive(Debug, Clone)]
pub struct LivenessCheck {
success: bool,
#[allow(dead_code)]
timestamp: Instant,
}

#[allow(dead_code)]
#[derive(Debug, Clone)]
pub struct OfferEvent {
success: bool,
timestamp: Instant,
content_value_size: usize,
duration: Duration,
}

#[derive(Debug)]
/// Stores information about peer and its most recent interactions.
pub struct Peer {
enr: Enr,
radius: Distance,
/// Liveness checks, ordered from most recent (index `0`), to the earliest.
///
/// Contains at most [Self::MAX_LIVENESS_CHECKS] entries.
liveness_checks: VecDeque<LivenessCheck>,
}

impl Peer {
/// The maximum number of liveness checks that we store. Value chosen arbitrarily.
const MAX_LIVENESS_CHECKS: usize = 10;

pub fn new(enr: Enr) -> Self {
Self {
enr,
radius: Distance::ZERO,
liveness_checks: VecDeque::with_capacity(Self::MAX_LIVENESS_CHECKS + 1),
}
}

pub fn enr(&self) -> Enr {
self.enr.clone()
}

/// Returns true if latest liveness check was successful and content is within radius.
pub fn is_interested_in_content(&self, content_id: &[u8; 32]) -> bool {
// check that most recent liveness check was successful
if !self
.liveness_checks
.front()
.is_some_and(|liveness_check| liveness_check.success)
{
return false;
}

let distance = XorMetric::distance(&self.enr.node_id().raw(), content_id);
distance <= self.radius
}

/// Returns true if all latest [Self::MAX_LIVENESS_CHECKS] liveness checks failed.
pub fn is_obsolete(&self) -> bool {
if self.liveness_checks.len() < Self::MAX_LIVENESS_CHECKS {
return false;
}
self.liveness_checks
.iter()
.all(|liveness_check| !liveness_check.success)
}

pub fn record_successful_liveness_check(&mut self, enr: Enr, radius: Distance) {
assert_eq!(
self.enr.node_id(),
enr.node_id(),
"Received enr for different peer. Expected node-id: {}, received enr: {enr}",
self.enr.node_id(),
);

if self.enr.seq() > enr.seq() {
error!(
"successful_liveness_check: received outdated enr: {enr} (existing enr: {})",
self.enr.seq()
);
} else {
self.enr = enr;
}
self.radius = radius;
self.liveness_checks.push_front(LivenessCheck {
success: true,
timestamp: Instant::now(),
});
self.purge();
}

pub fn record_failed_liveness_check(&mut self) {
self.liveness_checks.push_front(LivenessCheck {
success: false,
timestamp: Instant::now(),
});
self.purge();
}

/// Removes oldest liveness checks and offer events, if we exceeded capacity.
fn purge(&mut self) {
if self.liveness_checks.len() > Self::MAX_LIVENESS_CHECKS {
self.liveness_checks.drain(Self::MAX_LIVENESS_CHECKS..);
}
}
}
Loading
Loading