-
Notifications
You must be signed in to change notification settings - Fork 148
refactor: split census into multiple types #1510
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,16 +1,15 @@ | ||
use ethportal_api::{ | ||
jsonrpsee::http_client::HttpClient, types::network::Subnetwork, BeaconContentKey, Enr, | ||
HistoryContentKey, OverlayContentKey, StateContentKey, | ||
}; | ||
use futures::{channel::oneshot, StreamExt}; | ||
use std::collections::HashSet; | ||
|
||
use ethportal_api::{jsonrpsee::http_client::HttpClient, types::network::Subnetwork, Enr}; | ||
use thiserror::Error; | ||
use tokio::sync::mpsc; | ||
use tracing::{error, info}; | ||
use tokio::task::JoinHandle; | ||
use tracing::{error, info, Instrument}; | ||
|
||
use crate::cli::BridgeConfig; | ||
use network::Network; | ||
|
||
mod network; | ||
mod peers; | ||
|
||
/// The error that occured in [Census]. | ||
#[derive(Error, Debug)] | ||
|
@@ -19,20 +18,10 @@ pub enum CensusError { | |
NoPeers, | ||
#[error("Failed to initialize Census")] | ||
FailedInitialization, | ||
} | ||
|
||
/// The request for ENRs that should be offered the content. | ||
pub struct EnrsRequest { | ||
pub content_key: ContentKey, | ||
pub resp_tx: oneshot::Sender<Vec<Enr>>, | ||
} | ||
|
||
/// The enum for network specific content key | ||
#[derive(Debug, Clone)] | ||
pub enum ContentKey { | ||
History(HistoryContentKey), | ||
State(StateContentKey), | ||
Beacon(BeaconContentKey), | ||
#[error("Subnetwork {0} is not supported")] | ||
UnsupportedSubnetwork(Subnetwork), | ||
#[error("Census already initialized")] | ||
AlreadyInitialized, | ||
} | ||
|
||
/// The maximum number of enrs to return in a response, | ||
|
@@ -42,91 +31,87 @@ pub const ENR_OFFER_LIMIT: usize = 4; | |
|
||
/// The census is responsible for maintaining a list of known peers in the network, | ||
/// checking their liveness, updating their data radius, iterating through their | ||
/// rfn to find new peers, and providing interested enrs for a given content key. | ||
/// rfn to find new peers, and providing interested enrs for a given content id. | ||
pub struct Census { | ||
history: Network, | ||
state: Network, | ||
beacon: Network, | ||
census_rx: mpsc::UnboundedReceiver<EnrsRequest>, | ||
initialized: bool, | ||
} | ||
|
||
impl Census { | ||
pub fn new( | ||
client: HttpClient, | ||
census_rx: mpsc::UnboundedReceiver<EnrsRequest>, | ||
bridge_config: &BridgeConfig, | ||
) -> Self { | ||
pub fn new(client: HttpClient, bridge_config: &BridgeConfig) -> Self { | ||
Self { | ||
history: Network::new(client.clone(), Subnetwork::History, bridge_config), | ||
state: Network::new(client.clone(), Subnetwork::State, bridge_config), | ||
beacon: Network::new(client.clone(), Subnetwork::Beacon, bridge_config), | ||
census_rx, | ||
initialized: false, | ||
} | ||
} | ||
} | ||
|
||
impl Census { | ||
pub async fn init(&mut self) -> Result<(), CensusError> { | ||
// currently, the census is only initialized for the state network | ||
// only initialized networks will yield inside `run()` loop | ||
self.state.init().await; | ||
if self.state.peers.is_empty() { | ||
return Err(CensusError::FailedInitialization); | ||
/// Returns ENRs interested into provided content id. | ||
pub fn get_interested_enrs( | ||
&self, | ||
subnetwork: Subnetwork, | ||
content_id: &[u8; 32], | ||
) -> Result<Vec<Enr>, CensusError> { | ||
match subnetwork { | ||
Subnetwork::History => self.history.get_interested_enrs(content_id), | ||
Subnetwork::State => self.state.get_interested_enrs(content_id), | ||
Subnetwork::Beacon => self.beacon.get_interested_enrs(content_id), | ||
_ => Err(CensusError::UnsupportedSubnetwork(subnetwork)), | ||
} | ||
Ok(()) | ||
} | ||
|
||
pub async fn run(&mut self) { | ||
loop { | ||
// Randomly selects between what available task is ready | ||
// and executes it. Ensures that the census will continue | ||
// to update while it handles a stream of enr requests. | ||
tokio::select! { | ||
// handle enrs request | ||
Some(request) = self.census_rx.recv() => { | ||
match self.get_interested_enrs(request.content_key) { | ||
Ok(enrs) => { | ||
if let Err(err) = request.resp_tx.send(enrs) { | ||
error!("Error sending enrs response: {err:?}"); | ||
} | ||
} | ||
Err(_) => { | ||
error!("No peers found in census, restarting initialization."); | ||
self.state.init().await; | ||
if let Err(err) = request.resp_tx.send(Vec::new()) { | ||
error!("Error sending enrs response: {err:?}"); | ||
} | ||
} | ||
} | ||
} | ||
Some(Ok(known_enr)) = self.history.peers.next() => { | ||
self.history.process_enr(known_enr.1.0).await; | ||
info!("Updated history census: found peers: {}", self.history.peers.len()); | ||
} | ||
// yield next known state peer and ping for liveness | ||
Some(Ok(known_enr)) = self.state.peers.next() => { | ||
self.state.process_enr(known_enr.1.0).await; | ||
info!("Updated state census: found peers: {}", self.state.peers.len()); | ||
} | ||
Some(Ok(known_enr)) = self.beacon.peers.next() => { | ||
self.beacon.process_enr(known_enr.1.0).await; | ||
info!("Updated beacon census: found peers: {}", self.beacon.peers.len()); | ||
} | ||
/// Initialize subnetworks and starts background service that will keep our view of the network | ||
/// up to date. | ||
/// | ||
/// Returns JoinHandle of the background service. | ||
pub async fn init( | ||
&mut self, | ||
subnetworks: impl IntoIterator<Item = Subnetwork>, | ||
) -> Result<JoinHandle<()>, CensusError> { | ||
if self.initialized { | ||
return Err(CensusError::AlreadyInitialized); | ||
} | ||
self.initialized = true; | ||
|
||
let subnetworks = HashSet::from_iter(subnetworks); | ||
for subnetwork in &subnetworks { | ||
info!("Initializing {subnetwork} subnetwork"); | ||
match subnetwork { | ||
Subnetwork::History => self.history.init().await?, | ||
Subnetwork::State => self.state.init().await?, | ||
Subnetwork::Beacon => self.beacon.init().await?, | ||
_ => return Err(CensusError::UnsupportedSubnetwork(*subnetwork)), | ||
} | ||
} | ||
|
||
Ok(self.start_background_service(subnetworks)) | ||
} | ||
|
||
pub fn get_interested_enrs(&self, content_key: ContentKey) -> Result<Vec<Enr>, CensusError> { | ||
match content_key { | ||
ContentKey::History(content_key) => { | ||
self.history.get_interested_enrs(content_key.content_id()) | ||
} | ||
ContentKey::State(content_key) => { | ||
self.state.get_interested_enrs(content_key.content_id()) | ||
} | ||
ContentKey::Beacon(content_key) => { | ||
self.beacon.get_interested_enrs(content_key.content_id()) | ||
/// Starts background service that is responsible for keeping view of the network up to date. | ||
/// | ||
/// Selects available tasks and runs them. Tasks are provided by enabled subnetworks. | ||
fn start_background_service(&self, subnetworks: HashSet<Subnetwork>) -> JoinHandle<()> { | ||
let mut history_network = self.history.clone(); | ||
let mut state_network = self.state.clone(); | ||
let mut beacon_network = self.beacon.clone(); | ||
let service = async move { | ||
loop { | ||
tokio::select! { | ||
peer = history_network.peer_to_process(), if subnetworks.contains(&Subnetwork::History) => { | ||
history_network.process_peer(peer).await; | ||
} | ||
peer = state_network.peer_to_process(), if subnetworks.contains(&Subnetwork::State) => { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit is it worth adding an There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think there might be misunderstanding. The The The original implementation was using selectors like this:
(basically ignoring those I believe that re-initialization should be detected and done as part of the regular census maintenance (and not be triggered by I'm not sure if my explanation is clear. I think good understanding of what |
||
state_network.process_peer(peer).await; | ||
} | ||
peer = beacon_network.peer_to_process(), if subnetworks.contains(&Subnetwork::Beacon) => { | ||
beacon_network.process_peer(peer).await; | ||
} | ||
} | ||
} | ||
} | ||
}; | ||
tokio::spawn(service.instrument(tracing::trace_span!("census").or_current())) | ||
} | ||
} |
Uh oh!
There was an error while loading. Please reload this page.