peerdas-devnet-1 testing branch #6023

Closed · wants to merge 2 commits
32 changes: 32 additions & 0 deletions beacon_node/lighthouse_network/src/peer_manager/peerdb.rs
@@ -1,5 +1,7 @@
use crate::discovery::enr::PEERDAS_CUSTODY_SUBNET_COUNT_ENR_KEY;
use crate::discovery::CombinedKey;
#[cfg(test)]
use crate::EnrExt;
use crate::{metrics, multiaddr::Multiaddr, types::Subnet, Enr, Gossipsub, PeerId};
use peer_info::{ConnectionDirection, PeerConnectionStatus, PeerInfo};
use rand::seq::SliceRandom;
@@ -702,6 +704,36 @@ impl<E: EthSpec> PeerDB<E> {
)
}

/// Updates the connection state. MUST ONLY BE USED IN TESTS.
#[cfg(test)]
pub(crate) fn __add_connected_peer_enr_testing_only(
&mut self,
enr: Enr,
) -> Option<BanOperation> {
let peer_id = enr.peer_id();
let new_state = NewConnectionState::Connected {
enr: Some(enr),
seen_address: Multiaddr::empty(),
direction: ConnectionDirection::Outgoing,
};
self.update_connection_state(&peer_id, new_state)
}

/// Updates the connection state. MUST ONLY BE USED IN TESTS.
#[cfg(test)]
pub(crate) fn __add_connected_peer_multiaddr_testing_only(
&mut self,
peer_id: &PeerId,
multiaddr: Multiaddr,
) -> Option<BanOperation> {
let new_state = NewConnectionState::Connected {
enr: None,
seen_address: multiaddr,
direction: ConnectionDirection::Outgoing,
};
self.update_connection_state(peer_id, new_state)
}

/// The connection state of the peer has been changed. Modify the peer in the db to ensure all
/// variables are in sync with libp2p.
/// Updating the state can lead to a `BanOperation` which needs to be processed via the peer
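These test-only helpers let unit tests seed the PeerDB with connected peers without driving the full libp2p connection flow. A condensed sketch of the intended usage, mirroring the tests added to `globals.rs` further down (the generic bound, error handling, and `FromStr` import are assumptions of this sketch, not part of the diff):

use std::str::FromStr;

#[cfg(test)]
fn seed_connected_peer<E: EthSpec>(db: &mut PeerDB<E>, enr_str: &str) -> PeerId {
    // Parse the ENR, derive the libp2p peer id, then register the peer as connected.
    let enr = Enr::from_str(enr_str).unwrap();
    let peer_id = enr.peer_id();
    db.__add_connected_peer_enr_testing_only(enr);
    peer_id
}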
@@ -94,8 +94,10 @@ impl<E: EthSpec> PeerInfo<E> {
.syncnets()
.map_or(false, |s| s.get(**id as usize).unwrap_or(false))
}
// TODO(das) Add data column nets bitfield
Subnet::DataColumn(_) => return false,
Subnet::DataColumn(_) => {
// TODO(das): Pending spec PR https://github.com/ethereum/consensus-specs/pull/3821
return true;
}
}
}
false
4 changes: 1 addition & 3 deletions beacon_node/lighthouse_network/src/rpc/methods.rs
@@ -308,9 +308,7 @@ pub struct DataColumnsByRangeRequest {

impl DataColumnsByRangeRequest {
pub fn max_requested<E: EthSpec>(&self) -> u64 {
self.count
.saturating_mul(E::max_blobs_per_block() as u64)
.saturating_mul(self.columns.len() as u64)
self.count.saturating_mul(self.columns.len() as u64)
}

pub fn ssz_min_len() -> usize {
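The dropped `max_blobs_per_block` factor reflects that a `DataColumnsByRange` response carries at most one data column sidecar per requested column per slot, however many blobs each block contains. A rough worked example of the bound, with illustrative numbers only (the blob limit is network-dependent and assumed here):

// Hypothetical request: 64 slots, 4 column indices, 6 blobs per block.
let count = 64u64;
let columns_len = 4u64;
let max_blobs = 6u64;
let old_bound = count * max_blobs * columns_len; // 1536
let new_bound = count * columns_len; // 256 sidecars at most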
2 changes: 2 additions & 0 deletions beacon_node/lighthouse_network/src/rpc/mod.rs
@@ -366,8 +366,10 @@ where
protocol,
Protocol::BlocksByRange
| Protocol::BlobsByRange
| Protocol::DataColumnsByRange
| Protocol::BlocksByRoot
| Protocol::BlobsByRoot
| Protocol::DataColumnsByRoot
) {
debug!(self.log, "Request too large to process"; "request" => %req, "protocol" => %protocol);
} else {
123 changes: 122 additions & 1 deletion beacon_node/lighthouse_network/src/types/globals.rs
@@ -5,6 +5,8 @@ use crate::types::{BackFillState, SyncState};
use crate::EnrExt;
use crate::{Client, Eth2Enr};
use crate::{Enr, GossipTopic, Multiaddr, PeerId};
use discv5::handler::NodeContact;
use itertools::Itertools;
use parking_lot::RwLock;
use std::collections::HashSet;
use types::data_column_sidecar::ColumnIndex;
@@ -120,6 +122,55 @@ impl<E: EthSpec> NetworkGlobals<E> {
.collect()
}

/// Computes the data column subnets this node is assigned to custody.
pub fn custody_subnets(&self, spec: &ChainSpec) -> impl Iterator<Item = DataColumnSubnetId> {
let enr = self.local_enr();
let node_id = enr.node_id().raw().into();
let custody_subnet_count = enr.custody_subnet_count::<E>(spec);
DataColumnSubnetId::compute_custody_subnets::<E>(node_id, custody_subnet_count, spec)
}

pub fn custody_peers_for_column(
&self,
column_index: ColumnIndex,
spec: &ChainSpec,
) -> Vec<PeerId> {
self.peers
.read()
.connected_peers()
.filter_map(|(peer_id, peer_info)| {
let node_id_and_csc = if let Some(enr) = peer_info.enr() {
let custody_subnet_count = enr.custody_subnet_count::<E>(spec);
Some((enr.node_id(), custody_subnet_count))
} else if let Some(node_contact) = peer_info
.seen_multiaddrs()
.last()
.cloned()
.and_then(|multiaddr| NodeContact::try_from_multiaddr(multiaddr).ok())
{
let node_id = node_contact.node_id();
// TODO(das): Use `custody_subnet_count` from `MetaDataV3` before
// falling back to minimum custody requirement.
Some((node_id, spec.custody_requirement))
} else {
None
};

node_id_and_csc.and_then(|(node_id, custody_subnet_count)| {
// TODO(das): consider caching a map of subnet -> Vec<PeerId> and invalidating
// it whenever a peer connect or disconnect event is received
DataColumnSubnetId::compute_custody_columns::<E>(
node_id.raw().into(),
custody_subnet_count,
spec,
)
.contains(&column_index)
.then_some(*peer_id)
})
})
.collect::<Vec<_>>()
}

/// TESTING ONLY. Build a dummy NetworkGlobals instance.
pub fn new_test_globals(trusted_peers: Vec<PeerId>, log: &slog::Logger) -> NetworkGlobals<E> {
use crate::CombinedKeyExt;
@@ -142,7 +193,8 @@ impl<E: EthSpec> NetworkGlobals<E> {

#[cfg(test)]
mod test {
use crate::NetworkGlobals;
use super::*;
use std::str::FromStr;
use types::{Epoch, EthSpec, MainnetEthSpec as E};

#[test]
@@ -160,4 +212,73 @@ mod test {
default_custody_requirement_column_count as usize
);
}

#[test]
fn custody_peers_for_column_enr_present() {
let spec = E::default_spec();
let log = logging::test_logger();
let globals = NetworkGlobals::<E>::new_test_globals(vec![], &log);

let mut peers_db_write_lock = globals.peers.write();
let valid_enrs = [
"enr:-Mm4QDJpcg5mZ8EFeYuDcUX78tOTigHLz4_zJlCY7vOTd2-XPPqlAoWM02Us69c4ov85pHgTgeo77Z3_nAhJ4yF1y30Bh2F0dG5ldHOIAAAAAAAAAACDY3NjIIRldGgykAHMVa1gAAA4AOH1BQAAAACCaWSCdjSCaXCEiPMgroRxdWljgpR0iXNlY3AyNTZrMaECvF7Y-fD1MEEVQq3y5qW7C8UoTsq6J_tfwvQIJ5fo1TGIc3luY25ldHMAg3RjcIKUc4N1ZHCClHM",
"enr:-Mm4QBw4saycbk-Up2PvppJOv0KzBqgFFHl6_OfFlh8_HxtwWkZpSFgJ0hFV3qOelh_Ai4L9HhSAEJSG48LE8YJ-7WABh2F0dG5ldHOIAAAAAAAAAACDY3NjIIRldGgykAHMVa1gAAA4AOH1BQAAAACCaWSCdjSCaXCEiPMgroRxdWljgpR1iXNlY3AyNTZrMaECsRjhgRrAuRWelB9VTTzTa0tHtcWyLTLSReL4RNWhJgGIc3luY25ldHMAg3RjcIKUdIN1ZHCClHQ",
"enr:-Mm4QMFlqbpGrmN21EM-70_hDW9c3MrulhIZElmsP3kb7XSLOEmV7-Msj2jlwGR5C_TicwOXYsZrN6eEIJlGgluM_XgBh2F0dG5ldHOIAAAAAAAAAACDY3NjAYRldGgykAHMVa1gAAA4AOH1BQAAAACCaWSCdjSCaXCEiPMgroRxdWljgpR2iXNlY3AyNTZrMaECpAOonvUcYbBX8Tf0ErNPKwJeeidKzJftLTryBZUusMSIc3luY25ldHMAg3RjcIKUdYN1ZHCClHU",
"enr:-Mm4QEHdVjmQ7mH2qIX7_6SDablQUcrZuA4Sxjprh9WGbipfHUjPrELtBaRIRJUrpI8cgJRoAF1wMwoeRS7j3d8xviRGh2F0dG5ldHOIAAAAAAAAAACDY3NjAYRldGgykAHMVa1gAAA4AOH1BQAAAACCaWSCdjSCaXCEiPMgroRxdWljgpR2iXNlY3AyNTZrMaECpAOonvUcYbBX8Tf0ErNPKwJeeidKzJftLTryBZUusMSIc3luY25ldHMAg3RjcIKUdYN1ZHCClHU"
];
let peers = valid_enrs
.into_iter()
.map(|enr_str| {
let enr = Enr::from_str(enr_str).unwrap();
let peer_id = enr.peer_id();
peers_db_write_lock.__add_connected_peer_enr_testing_only(enr);
peer_id
})
.collect::<Vec<_>>();

drop(peers_db_write_lock);
let [supernode_peer_1, supernode_peer_2, _, _] =
peers.try_into().expect("expected exactly 4 peer ids");

for col_index in 0..spec.number_of_columns {
let custody_peers = globals.custody_peers_for_column(col_index as ColumnIndex, &spec);
assert!(
custody_peers.contains(&supernode_peer_1),
"must at least return supernode peer"
);
assert!(
custody_peers.contains(&supernode_peer_2),
"must at least return supernode peer"
);
}
}

// If the ENR is not present, fall back to deriving the node_id and use `spec.custody_requirement`.
#[test]
fn custody_peers_for_column_no_enr_use_default() {
let spec = E::default_spec();
let log = logging::test_logger();
let globals = NetworkGlobals::<E>::new_test_globals(vec![], &log);

// Add peer without enr
let peer_id_str = "16Uiu2HAm86zWajwnBFD8uxkRpxhRzeUEf6Brfz2VBxGAaWx9ejyr";
let peer_id = PeerId::from_str(peer_id_str).unwrap();
let multiaddr =
Multiaddr::from_str(&format!("/ip4/0.0.0.0/udp/9000/p2p/{peer_id_str}")).unwrap();

let mut peers_db_write_lock = globals.peers.write();
peers_db_write_lock.__add_connected_peer_multiaddr_testing_only(&peer_id, multiaddr);
drop(peers_db_write_lock);

let custody_subnets = (0..spec.data_column_sidecar_subnet_count)
.filter(|col_index| {
!globals
.custody_peers_for_column(*col_index, &spec)
.is_empty()
})
.count();

// The single peer's custody subnet should match custody_requirement.
assert_eq!(custody_subnets, spec.custody_requirement as usize);
}
}
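As the second test exercises, `custody_peers_for_column` falls back to deriving a node id from the peer's last seen multiaddr when no ENR is known, and assumes the minimum `spec.custody_requirement` for that peer. A minimal sketch of that derivation as a standalone helper (hypothetical; the real logic lives inline in `custody_peers_for_column` above):

// Hypothetical helper, extracted for illustration only.
fn node_id_from_multiaddr(multiaddr: Multiaddr) -> Option<discv5::enr::NodeId> {
    // A NodeContact can be built from a multiaddr that embeds a peer id.
    NodeContact::try_from_multiaddr(multiaddr)
        .ok()
        .map(|contact| contact.node_id())
}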
5 changes: 5 additions & 0 deletions beacon_node/network/src/metrics.rs
@@ -239,6 +239,11 @@ lazy_static! {
"Number of connected peers per sync status type",
&["sync_status"]
);
pub static ref PEERS_PER_COLUMN_SUBNET: Result<IntGaugeVec> = try_create_int_gauge_vec(
"peers_per_column_subnet",
"Number of connected peers per column subnet",
&["subnet_id"]
);
pub static ref SYNCING_CHAINS_COUNT: Result<IntGaugeVec> = try_create_int_gauge_vec(
"sync_range_chains",
"Number of Syncing chains in range, per range type",
32 changes: 4 additions & 28 deletions beacon_node/network/src/sync/network_context.rs
@@ -25,9 +25,7 @@ use beacon_chain::{BeaconChain, BeaconChainTypes, BlockProcessStatus, EngineStat
use fnv::FnvHashMap;
use lighthouse_network::rpc::methods::{BlobsByRangeRequest, DataColumnsByRangeRequest};
use lighthouse_network::rpc::{BlocksByRangeRequest, GoodbyeReason, RPCError};
use lighthouse_network::{
Client, Eth2Enr, NetworkGlobals, PeerAction, PeerId, ReportSource, Request,
};
use lighthouse_network::{Client, NetworkGlobals, PeerAction, PeerId, ReportSource, Request};
pub use requests::LookupVerifyError;
use slog::{debug, error, warn};
use slot_clock::SlotClock;
@@ -38,8 +36,7 @@ use std::time::Duration;
use tokio::sync::mpsc;
use types::blob_sidecar::FixedBlobSidecarList;
use types::{
BlobSidecar, ColumnIndex, DataColumnSidecar, DataColumnSubnetId, Epoch, EthSpec, Hash256,
SignedBeaconBlock, Slot,
BlobSidecar, ColumnIndex, DataColumnSidecar, Epoch, EthSpec, Hash256, SignedBeaconBlock, Slot,
};

pub mod custody;
@@ -240,29 +237,8 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {

// TODO(das): epoch argument left here in case custody rotation is implemented
pub fn get_custodial_peers(&self, _epoch: Epoch, column_index: ColumnIndex) -> Vec<PeerId> {
let mut peer_ids = vec![];

for (peer_id, peer_info) in self.network_globals().peers.read().connected_peers() {
if let Some(enr) = peer_info.enr() {
let custody_subnet_count = enr.custody_subnet_count::<T::EthSpec>(&self.chain.spec);
// TODO(das): consider caching a map of subnet -> Vec<PeerId> and invalidating
// whenever a peer connected or disconnect event in received
let mut subnets = DataColumnSubnetId::compute_custody_subnets::<T::EthSpec>(
enr.node_id().raw().into(),
custody_subnet_count,
&self.chain.spec,
);
if subnets.any(|subnet| {
subnet
.columns::<T::EthSpec>(&self.chain.spec)
.any(|index| index == column_index)
}) {
peer_ids.push(*peer_id)
}
}
}

peer_ids
self.network_globals()
.custody_peers_for_column(column_index, &self.chain.spec)
}

pub fn network_globals(&self) -> &NetworkGlobals<T::EthSpec> {
55 changes: 54 additions & 1 deletion beacon_node/network/src/sync/range_sync/chain.rs
@@ -1,4 +1,5 @@
use super::batch::{BatchInfo, BatchProcessingResult, BatchState};
use crate::metrics::PEERS_PER_COLUMN_SUBNET;
use crate::network_beacon_processor::ChainSegmentProcessId;
use crate::sync::network_context::RangeRequestId;
use crate::sync::{
@@ -7,7 +8,8 @@ use crate::sync::{
use beacon_chain::block_verification_types::RpcBlock;
use beacon_chain::BeaconChainTypes;
use fnv::FnvHashMap;
use lighthouse_network::{PeerAction, PeerId};
use lighthouse_metrics::set_int_gauge;
use lighthouse_network::{PeerAction, PeerId, Subnet};
use rand::seq::SliceRandom;
use slog::{crit, debug, o, warn};
use std::collections::{btree_map::Entry, BTreeMap, HashSet};
@@ -396,6 +398,11 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
self.request_batches(network)?;
}
}
} else if !self.good_peers_on_custody_subnets(self.processing_target, network) {
// This handles the case where no batch was sent for the current processing
// target because no custody peers were available. This is a valid state and
// should not return an error.
return Ok(KeepChain);
} else {
return Err(RemoveChain::WrongChainState(format!(
"Batch not found for current processing target {}",
@@ -994,6 +1001,14 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
// check if we have the batch for our optimistic start. If not, request it first.
// We wait for this batch before requesting any other batches.
if let Some(epoch) = self.optimistic_start {
if !self.good_peers_on_custody_subnets(epoch, network) {
debug!(
self.log,
"Waiting for peers to be available on custody column subnets"
);
return Ok(KeepChain);
}

if let Entry::Vacant(entry) = self.batches.entry(epoch) {
if let Some(peer) = idle_peers.pop() {
let batch_type = network.batch_type(epoch);
@@ -1018,6 +1033,35 @@
Ok(KeepChain)
}

/// Checks all custody column subnets for peers. Returns `true` if there is at least one peer in
/// every custody column subnet.
fn good_peers_on_custody_subnets(&self, epoch: Epoch, network: &SyncNetworkContext<T>) -> bool {
if network.chain.spec.is_peer_das_enabled_for_epoch(epoch) {
// Require peers on all custody column subnets before sending batches
let peers_on_all_custody_subnets = network
.network_globals()
.custody_subnets(&network.chain.spec)
.all(|subnet_id| {
let peer_count = network
.network_globals()
.peers
.read()
.good_peers_on_subnet(Subnet::DataColumn(subnet_id))
.count();

set_int_gauge(
&PEERS_PER_COLUMN_SUBNET,
&[&subnet_id.to_string()],
peer_count as i64,
);
peer_count > 0
});
peers_on_all_custody_subnets
} else {
true
}
}

/// Creates the next required batch from the chain. If there are no more batches required,
/// `false` is returned.
fn include_next_batch(&mut self, network: &mut SyncNetworkContext<T>) -> Option<BatchId> {
@@ -1048,6 +1092,15 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
return None;
}

// don't send batch requests until we have peers on custody subnets
if !self.good_peers_on_custody_subnets(self.to_be_downloaded, network) {
debug!(
self.log,
"Waiting for peers to be available on custody column subnets"
);
return None;
}

let batch_id = self.to_be_downloaded;
// this batch could already have been included as an optimistic batch
match self.batches.entry(batch_id) {