From d57bb847929c875f116b366ec5178e9f290aa336 Mon Sep 17 00:00:00 2001
From: Jimmy Chen
Date: Mon, 1 Jul 2024 17:38:16 +1000
Subject: [PATCH 1/2] Fix incorrect inbound request count causing rate
 limiting. (#6025)

Squashed commit of the following:

commit b711f09eadbe709f72ae46f93989a90364df310e
Author: Jimmy Chen
Date:   Mon Jul 1 15:12:33 2024 +1000

    Fix incorrect inbound request count causing rate limiting.
---
 beacon_node/lighthouse_network/src/rpc/methods.rs | 4 +---
 beacon_node/lighthouse_network/src/rpc/mod.rs     | 2 ++
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/beacon_node/lighthouse_network/src/rpc/methods.rs b/beacon_node/lighthouse_network/src/rpc/methods.rs
index f7a92ac29f7..4c394629e4c 100644
--- a/beacon_node/lighthouse_network/src/rpc/methods.rs
+++ b/beacon_node/lighthouse_network/src/rpc/methods.rs
@@ -308,9 +308,7 @@ pub struct DataColumnsByRangeRequest {
 
 impl DataColumnsByRangeRequest {
     pub fn max_requested<E: EthSpec>(&self) -> u64 {
-        self.count
-            .saturating_mul(E::max_blobs_per_block() as u64)
-            .saturating_mul(self.columns.len() as u64)
+        self.count.saturating_mul(self.columns.len() as u64)
     }
 
     pub fn ssz_min_len() -> usize {
diff --git a/beacon_node/lighthouse_network/src/rpc/mod.rs b/beacon_node/lighthouse_network/src/rpc/mod.rs
index 666cbe6fbcc..c40f976e7a1 100644
--- a/beacon_node/lighthouse_network/src/rpc/mod.rs
+++ b/beacon_node/lighthouse_network/src/rpc/mod.rs
@@ -366,8 +366,10 @@ where
                 protocol,
                 Protocol::BlocksByRange
                     | Protocol::BlobsByRange
+                    | Protocol::DataColumnsByRange
                     | Protocol::BlocksByRoot
                     | Protocol::BlobsByRoot
+                    | Protocol::DataColumnsByRoot
             ) {
                 debug!(self.log, "Request too large to process"; "request" => %req, "protocol" => %protocol);
             } else {

From 022b848cfb86e0e949aaddb0429a2b6206b1c9ea Mon Sep 17 00:00:00 2001
From: Jimmy Chen
Date: Mon, 1 Jul 2024 17:38:41 +1000
Subject: [PATCH 2/2] Avoid spamming blocks-by-range requests until there are
 available peers on all custody subnets (#6004)

Squashed commit of the following:

commit cfb3ebcfe7dd497b3dd9a2b12871be17a9b639f4
Author: Jimmy Chen
Date:   Mon Jul 1 12:49:03 2024 +1000

    Add metrics and update code comment.

commit 03a9dce58c2d75132df4592f41b9cd8431b2eb29
Author: Jimmy Chen
Date:   Mon Jul 1 12:29:30 2024 +1000

    Fall back to the default custody requirement if the peer ENR is not present.

commit 4373a280e89968f15b0d5ccb138c30d0e3e2725a
Author: Jimmy Chen
Date:   Sat Jun 29 01:05:52 2024 +1000

    Revert epoch parameter refactor.

commit 876ea3b3dbfe70c5949c6a9f63d4202df7f8e32b
Author: Jimmy Chen
Date:   Thu Jun 27 17:21:04 2024 +1000

    Add test for `get_custody_peers_for_column`.

commit de0535599f9a3777d8cdc53848f411b2a6520ae6
Merge: 4079d2e65 7206909fb
Author: Jimmy Chen
Date:   Thu Jun 27 16:06:28 2024 +1000

    Merge branch 'das' into custody-sync-peers

commit 4079d2e65117aa460bf37d79fb3ab9a9082e1ac0
Author: Jimmy Chen
Date:   Thu Jun 27 15:35:39 2024 +1000

    Fix `good_peers_on_subnet` always returning false for `DataColumnSubnet`.

commit 9f20029db961c7ea35af4dcea5fe158222b1ac2e
Author: Jimmy Chen
Date:   Thu Jun 27 14:27:04 2024 +1000

    Fix lint and logic error.

commit 05608b09551fddfcc3c53b7f2a5689491a13529d
Author: Jimmy Chen
Date:   Thu Jun 27 13:51:33 2024 +1000

    Add a check to cover the case where a batch is not processed while
    waiting for custody peers to become available.

commit 9f82497b05a2cf65e5b856d0723786907be9e4a1
Author: Jimmy Chen
Date:   Thu Jun 27 12:57:06 2024 +1000

    Add custody peer check before mutating `BatchInfo` to avoid
    inconsistent state.
commit edc584af2c584b83099c3d8857e7fd47deee72b0
Author: Jimmy Chen
Date:   Thu Jun 27 10:32:19 2024 +1000

    Only start requesting batches when there are good peers across all
    custody columns, to avoid spamming block requests.
---
 .../src/peer_manager/peerdb.rs                |  32 +++++
 .../src/peer_manager/peerdb/peer_info.rs      |   6 +-
 .../lighthouse_network/src/types/globals.rs   | 123 +++++++++++++++++-
 beacon_node/network/src/metrics.rs            |   5 +
 .../network/src/sync/network_context.rs       |  32 +----
 .../network/src/sync/range_sync/chain.rs      |  55 +++++++-
 6 files changed, 221 insertions(+), 32 deletions(-)

diff --git a/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs b/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs
index d0c6710e8a7..10c4cb231aa 100644
--- a/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs
+++ b/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs
@@ -1,5 +1,7 @@
 use crate::discovery::enr::PEERDAS_CUSTODY_SUBNET_COUNT_ENR_KEY;
 use crate::discovery::CombinedKey;
+#[cfg(test)]
+use crate::EnrExt;
 use crate::{metrics, multiaddr::Multiaddr, types::Subnet, Enr, Gossipsub, PeerId};
 use peer_info::{ConnectionDirection, PeerConnectionStatus, PeerInfo};
 use rand::seq::SliceRandom;
@@ -702,6 +704,36 @@ impl<E: EthSpec> PeerDB<E> {
         )
     }
 
+    /// Updates the connection state. MUST ONLY BE USED IN TESTS.
+    #[cfg(test)]
+    pub(crate) fn __add_connected_peer_enr_testing_only(
+        &mut self,
+        enr: Enr,
+    ) -> Option<BanOperation> {
+        let peer_id = enr.peer_id();
+        let new_state = NewConnectionState::Connected {
+            enr: Some(enr),
+            seen_address: Multiaddr::empty(),
+            direction: ConnectionDirection::Outgoing,
+        };
+        self.update_connection_state(&peer_id, new_state)
+    }
+
+    /// Updates the connection state. MUST ONLY BE USED IN TESTS.
+    #[cfg(test)]
+    pub(crate) fn __add_connected_peer_multiaddr_testing_only(
+        &mut self,
+        peer_id: &PeerId,
+        multiaddr: Multiaddr,
+    ) -> Option<BanOperation> {
+        let new_state = NewConnectionState::Connected {
+            enr: None,
+            seen_address: multiaddr,
+            direction: ConnectionDirection::Outgoing,
+        };
+        self.update_connection_state(peer_id, new_state)
+    }
+
     /// The connection state of the peer has been changed. Modify the peer in the db to ensure all
     /// variables are in sync with libp2p.
     /// Updating the state can lead to a `BanOperation` which needs to be processed via the peer
diff --git a/beacon_node/lighthouse_network/src/peer_manager/peerdb/peer_info.rs b/beacon_node/lighthouse_network/src/peer_manager/peerdb/peer_info.rs
index 59053b19292..b0d28cc835e 100644
--- a/beacon_node/lighthouse_network/src/peer_manager/peerdb/peer_info.rs
+++ b/beacon_node/lighthouse_network/src/peer_manager/peerdb/peer_info.rs
@@ -94,8 +94,10 @@ impl<E: EthSpec> PeerInfo<E> {
                     .syncnets()
                     .map_or(false, |s| s.get(**id as usize).unwrap_or(false))
             }
-            // TODO(das) Add data column nets bitfield
-            Subnet::DataColumn(_) => return false,
+            Subnet::DataColumn(_) => {
+                // TODO(das): Pending spec PR https://github.com/ethereum/consensus-specs/pull/3821
+                return true;
+            }
         }
     }
     false
diff --git a/beacon_node/lighthouse_network/src/types/globals.rs b/beacon_node/lighthouse_network/src/types/globals.rs
index b3d37e23104..83ff9be5421 100644
--- a/beacon_node/lighthouse_network/src/types/globals.rs
+++ b/beacon_node/lighthouse_network/src/types/globals.rs
@@ -5,6 +5,8 @@ use crate::types::{BackFillState, SyncState};
 use crate::EnrExt;
 use crate::{Client, Eth2Enr};
 use crate::{Enr, GossipTopic, Multiaddr, PeerId};
+use discv5::handler::NodeContact;
+use itertools::Itertools;
 use parking_lot::RwLock;
 use std::collections::HashSet;
 use types::data_column_sidecar::ColumnIndex;
@@ -120,6 +122,55 @@ impl<E: EthSpec> NetworkGlobals<E> {
             .collect()
     }
 
+    /// Compute the custody data column subnets this node is assigned to.
+    pub fn custody_subnets(&self, spec: &ChainSpec) -> impl Iterator<Item = DataColumnSubnetId> {
+        let enr = self.local_enr();
+        let node_id = enr.node_id().raw().into();
+        let custody_subnet_count = enr.custody_subnet_count::<E>(spec);
+        DataColumnSubnetId::compute_custody_subnets::<E>(node_id, custody_subnet_count, spec)
+    }
+
+    pub fn custody_peers_for_column(
+        &self,
+        column_index: ColumnIndex,
+        spec: &ChainSpec,
+    ) -> Vec<PeerId> {
+        self.peers
+            .read()
+            .connected_peers()
+            .filter_map(|(peer_id, peer_info)| {
+                let node_id_and_csc = if let Some(enr) = peer_info.enr() {
+                    let custody_subnet_count = enr.custody_subnet_count::<E>(spec);
+                    Some((enr.node_id(), custody_subnet_count))
+                } else if let Some(node_contact) = peer_info
+                    .seen_multiaddrs()
+                    .last()
+                    .cloned()
+                    .and_then(|multiaddr| NodeContact::try_from_multiaddr(multiaddr).ok())
+                {
+                    let node_id = node_contact.node_id();
+                    // TODO(das): Use `custody_subnet_count` from `MetaDataV3` before
+                    // falling back to the minimum custody requirement.
+                    Some((node_id, spec.custody_requirement))
+                } else {
+                    None
+                };
+
+                node_id_and_csc.and_then(|(node_id, custody_subnet_count)| {
+                    // TODO(das): consider caching a map of subnet -> Vec<PeerId> and invalidating
+                    // it whenever a peer connect or disconnect event is received
+                    DataColumnSubnetId::compute_custody_columns::<E>(
+                        node_id.raw().into(),
+                        custody_subnet_count,
+                        spec,
+                    )
+                    .contains(&column_index)
+                    .then_some(*peer_id)
+                })
+            })
+            .collect::<Vec<_>>()
+    }
+
     /// TESTING ONLY. Build a dummy NetworkGlobals instance.
     pub fn new_test_globals(trusted_peers: Vec<PeerId>, log: &slog::Logger) -> NetworkGlobals<E> {
         use crate::CombinedKeyExt;
@@ -142,7 +193,8 @@ impl<E: EthSpec> NetworkGlobals<E> {
 
 #[cfg(test)]
 mod test {
-    use crate::NetworkGlobals;
+    use super::*;
+    use std::str::FromStr;
     use types::{Epoch, EthSpec, MainnetEthSpec as E};
 
     #[test]
@@ -160,4 +212,73 @@ mod test {
             default_custody_requirement_column_count as usize
         );
     }
+
+    #[test]
+    fn custody_peers_for_column_enr_present() {
+        let spec = E::default_spec();
+        let log = logging::test_logger();
+        let globals = NetworkGlobals::<E>::new_test_globals(vec![], &log);
+
+        let mut peers_db_write_lock = globals.peers.write();
+        let valid_enrs = [
+            "enr:-Mm4QDJpcg5mZ8EFeYuDcUX78tOTigHLz4_zJlCY7vOTd2-XPPqlAoWM02Us69c4ov85pHgTgeo77Z3_nAhJ4yF1y30Bh2F0dG5ldHOIAAAAAAAAAACDY3NjIIRldGgykAHMVa1gAAA4AOH1BQAAAACCaWSCdjSCaXCEiPMgroRxdWljgpR0iXNlY3AyNTZrMaECvF7Y-fD1MEEVQq3y5qW7C8UoTsq6J_tfwvQIJ5fo1TGIc3luY25ldHMAg3RjcIKUc4N1ZHCClHM",
+            "enr:-Mm4QBw4saycbk-Up2PvppJOv0KzBqgFFHl6_OfFlh8_HxtwWkZpSFgJ0hFV3qOelh_Ai4L9HhSAEJSG48LE8YJ-7WABh2F0dG5ldHOIAAAAAAAAAACDY3NjIIRldGgykAHMVa1gAAA4AOH1BQAAAACCaWSCdjSCaXCEiPMgroRxdWljgpR1iXNlY3AyNTZrMaECsRjhgRrAuRWelB9VTTzTa0tHtcWyLTLSReL4RNWhJgGIc3luY25ldHMAg3RjcIKUdIN1ZHCClHQ",
+            "enr:-Mm4QMFlqbpGrmN21EM-70_hDW9c3MrulhIZElmsP3kb7XSLOEmV7-Msj2jlwGR5C_TicwOXYsZrN6eEIJlGgluM_XgBh2F0dG5ldHOIAAAAAAAAAACDY3NjAYRldGgykAHMVa1gAAA4AOH1BQAAAACCaWSCdjSCaXCEiPMgroRxdWljgpR2iXNlY3AyNTZrMaECpAOonvUcYbBX8Tf0ErNPKwJeeidKzJftLTryBZUusMSIc3luY25ldHMAg3RjcIKUdYN1ZHCClHU",
+            "enr:-Mm4QEHdVjmQ7mH2qIX7_6SDablQUcrZuA4Sxjprh9WGbipfHUjPrELtBaRIRJUrpI8cgJRoAF1wMwoeRS7j3d8xviRGh2F0dG5ldHOIAAAAAAAAAACDY3NjAYRldGgykAHMVa1gAAA4AOH1BQAAAACCaWSCdjSCaXCEiPMgroRxdWljgpR2iXNlY3AyNTZrMaECpAOonvUcYbBX8Tf0ErNPKwJeeidKzJftLTryBZUusMSIc3luY25ldHMAg3RjcIKUdYN1ZHCClHU"
+        ];
+        let peers = valid_enrs
+            .into_iter()
+            .map(|enr_str| {
+                let enr = Enr::from_str(enr_str).unwrap();
+                let peer_id = enr.peer_id();
+                peers_db_write_lock.__add_connected_peer_enr_testing_only(enr);
+                peer_id
+            })
+            .collect::<Vec<_>>();
+
+        drop(peers_db_write_lock);
+
+        let [supernode_peer_1, supernode_peer_2, _, _] =
+            peers.try_into().expect("expected exactly 4 peer ids");
+
+        for col_index in 0..spec.number_of_columns {
+            let custody_peers = globals.custody_peers_for_column(col_index as ColumnIndex, &spec);
+            assert!(
+                custody_peers.contains(&supernode_peer_1),
+                "must at least return supernode peer"
+            );
+            assert!(
+                custody_peers.contains(&supernode_peer_2),
+                "must at least return supernode peer"
+            );
+        }
+    }
+
+    // If the ENR is not present, fall back to deriving the node id and use
+    // `spec.custody_requirement`.
+    #[test]
+    fn custody_peers_for_column_no_enr_use_default() {
+        let spec = E::default_spec();
+        let log = logging::test_logger();
+        let globals = NetworkGlobals::<E>::new_test_globals(vec![], &log);
+
+        // Add a peer without an ENR.
+        let peer_id_str = "16Uiu2HAm86zWajwnBFD8uxkRpxhRzeUEf6Brfz2VBxGAaWx9ejyr";
+        let peer_id = PeerId::from_str(peer_id_str).unwrap();
+        let multiaddr =
+            Multiaddr::from_str(&format!("/ip4/0.0.0.0/udp/9000/p2p/{peer_id_str}")).unwrap();
+
+        let mut peers_db_write_lock = globals.peers.write();
+        peers_db_write_lock.__add_connected_peer_multiaddr_testing_only(&peer_id, multiaddr);
+        drop(peers_db_write_lock);
+
+        let custody_subnets = (0..spec.data_column_sidecar_subnet_count)
+            .filter(|col_index| {
+                !globals
+                    .custody_peers_for_column(*col_index, &spec)
+                    .is_empty()
+            })
+            .count();
+
+        // The number of subnets with this peer should match the default custody_requirement.
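+        // Since the peer was added without an ENR, `custody_peers_for_column` falls
+        // back to deriving the node id from the multiaddr and assumes the minimum
+        // `spec.custody_requirement`, so the peer is counted on exactly that many
+        // subnets.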
+        assert_eq!(custody_subnets, spec.custody_requirement as usize);
+    }
 }
diff --git a/beacon_node/network/src/metrics.rs b/beacon_node/network/src/metrics.rs
index 56af9833f1a..dd5e359af37 100644
--- a/beacon_node/network/src/metrics.rs
+++ b/beacon_node/network/src/metrics.rs
@@ -239,6 +239,11 @@
         "Number of connected peers per sync status type",
         &["sync_status"]
     );
+    pub static ref PEERS_PER_COLUMN_SUBNET: Result<IntGaugeVec> = try_create_int_gauge_vec(
+        "peers_per_column_subnet",
+        "Number of connected peers per column subnet",
+        &["subnet_id"]
+    );
     pub static ref SYNCING_CHAINS_COUNT: Result<IntGaugeVec> = try_create_int_gauge_vec(
         "sync_range_chains",
         "Number of Syncing chains in range, per range type",
diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs
index d9b2a9444f6..eec055fd281 100644
--- a/beacon_node/network/src/sync/network_context.rs
+++ b/beacon_node/network/src/sync/network_context.rs
@@ -25,9 +25,7 @@ use beacon_chain::{BeaconChain, BeaconChainTypes, BlockProcessStatus, EngineStat
 use fnv::FnvHashMap;
 use lighthouse_network::rpc::methods::{BlobsByRangeRequest, DataColumnsByRangeRequest};
 use lighthouse_network::rpc::{BlocksByRangeRequest, GoodbyeReason, RPCError};
-use lighthouse_network::{
-    Client, Eth2Enr, NetworkGlobals, PeerAction, PeerId, ReportSource, Request,
-};
+use lighthouse_network::{Client, NetworkGlobals, PeerAction, PeerId, ReportSource, Request};
 pub use requests::LookupVerifyError;
 use slog::{debug, error, warn};
 use slot_clock::SlotClock;
@@ -38,8 +36,7 @@ use std::time::Duration;
 use tokio::sync::mpsc;
 use types::blob_sidecar::FixedBlobSidecarList;
 use types::{
-    BlobSidecar, ColumnIndex, DataColumnSidecar, DataColumnSubnetId, Epoch, EthSpec, Hash256,
-    SignedBeaconBlock, Slot,
+    BlobSidecar, ColumnIndex, DataColumnSidecar, Epoch, EthSpec, Hash256, SignedBeaconBlock, Slot,
 };
 
 pub mod custody;
@@ -240,29 +237,8 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
 
     // TODO(das): epoch argument left here in case custody rotation is implemented
     pub fn get_custodial_peers(&self, _epoch: Epoch, column_index: ColumnIndex) -> Vec<PeerId> {
-        let mut peer_ids = vec![];
-
-        for (peer_id, peer_info) in self.network_globals().peers.read().connected_peers() {
-            if let Some(enr) = peer_info.enr() {
-                let custody_subnet_count = enr.custody_subnet_count::<T::EthSpec>(&self.chain.spec);
-                // TODO(das): consider caching a map of subnet -> Vec<PeerId> and invalidating
-                // whenever a peer connected or disconnect event in received
-                let mut subnets = DataColumnSubnetId::compute_custody_subnets::<T::EthSpec>(
-                    enr.node_id().raw().into(),
-                    custody_subnet_count,
-                    &self.chain.spec,
-                );
-                if subnets.any(|subnet| {
-                    subnet
-                        .columns::<T::EthSpec>(&self.chain.spec)
-                        .any(|index| index == column_index)
-                }) {
-                    peer_ids.push(*peer_id)
-                }
-            }
-        }
-
-        peer_ids
+        self.network_globals()
+            .custody_peers_for_column(column_index, &self.chain.spec)
     }
 
     pub fn network_globals(&self) -> &NetworkGlobals<T::EthSpec> {
diff --git a/beacon_node/network/src/sync/range_sync/chain.rs b/beacon_node/network/src/sync/range_sync/chain.rs
index b6390f1a07f..071d634a39f 100644
--- a/beacon_node/network/src/sync/range_sync/chain.rs
+++ b/beacon_node/network/src/sync/range_sync/chain.rs
@@ -1,4 +1,5 @@
 use super::batch::{BatchInfo, BatchProcessingResult, BatchState};
+use crate::metrics::PEERS_PER_COLUMN_SUBNET;
 use crate::network_beacon_processor::ChainSegmentProcessId;
 use crate::sync::network_context::RangeRequestId;
 use crate::sync::{
@@ -7,7 +8,8 @@ use crate::sync::{
 use beacon_chain::block_verification_types::RpcBlock;
 use beacon_chain::BeaconChainTypes;
 use fnv::FnvHashMap;
-use lighthouse_network::{PeerAction, PeerId};
+use lighthouse_metrics::set_int_gauge;
+use lighthouse_network::{PeerAction, PeerId, Subnet};
 use rand::seq::SliceRandom;
 use slog::{crit, debug, o, warn};
 use std::collections::{btree_map::Entry, BTreeMap, HashSet};
@@ -396,6 +398,11 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
                     self.request_batches(network)?;
                 }
             }
+        } else if !self.good_peers_on_custody_subnets(self.processing_target, network) {
+            // This handles the case where no batch was sent for the current processing
+            // target because no custody peers were available. This is a valid state and
+            // should not return an error.
+            return Ok(KeepChain);
         } else {
             return Err(RemoveChain::WrongChainState(format!(
                 "Batch not found for current processing target {}",
@@ -994,6 +1001,14 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
         // check if we have the batch for our optimistic start. If not, request it first.
         // We wait for this batch before requesting any other batches.
         if let Some(epoch) = self.optimistic_start {
+            if !self.good_peers_on_custody_subnets(epoch, network) {
+                debug!(
+                    self.log,
+                    "Waiting for peers to be available on custody column subnets"
+                );
+                return Ok(KeepChain);
+            }
+
             if let Entry::Vacant(entry) = self.batches.entry(epoch) {
                 if let Some(peer) = idle_peers.pop() {
                     let batch_type = network.batch_type(epoch);
@@ -1018,6 +1033,35 @@
         Ok(KeepChain)
     }
 
+    /// Checks all custody column subnets for peers. Returns `true` if there is at least one peer
+    /// in every custody column subnet.
+    fn good_peers_on_custody_subnets(&self, epoch: Epoch, network: &SyncNetworkContext<T>) -> bool {
+        if network.chain.spec.is_peer_das_enabled_for_epoch(epoch) {
+            // Require peers on all custody column subnets before sending batches.
+            let peers_on_all_custody_subnets = network
+                .network_globals()
+                .custody_subnets(&network.chain.spec)
+                .all(|subnet_id| {
+                    let peer_count = network
+                        .network_globals()
+                        .peers
+                        .read()
+                        .good_peers_on_subnet(Subnet::DataColumn(subnet_id))
+                        .count();
+
+                    set_int_gauge(
+                        &PEERS_PER_COLUMN_SUBNET,
+                        &[&subnet_id.to_string()],
+                        peer_count as i64,
+                    );
+                    peer_count > 0
+                });
+            peers_on_all_custody_subnets
+        } else {
+            true
+        }
+    }
+
     /// Creates the next required batch from the chain. If there are no more batches required,
     /// `false` is returned.
     fn include_next_batch(&mut self, network: &mut SyncNetworkContext<T>) -> Option<BatchId> {
@@ -1048,6 +1092,15 @@
             return None;
         }
 
+        // Don't send batch requests until we have peers on custody subnets.
+        if !self.good_peers_on_custody_subnets(self.to_be_downloaded, network) {
+            debug!(
+                self.log,
+                "Waiting for peers to be available on custody column subnets"
+            );
+            return None;
+        }
+
         let batch_id = self.to_be_downloaded;
         // this batch could have been included already being an optimistic batch
         match self.batches.entry(batch_id) {
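
The core idea of the second patch is the all-or-nothing gate in `good_peers_on_custody_subnets`: range sync pauses (returning `KeepChain` or `None` rather than an error) until every custody column subnet has at least one good peer. Below is a minimal standalone sketch of that rule, assuming simplified stand-ins (a `Peer` struct, plain `u64` subnet ids, and a local count map) rather than the Lighthouse types used above:

use std::collections::HashMap;

/// Hypothetical stand-in for a connected peer advertising its custody subnets.
struct Peer {
    custody_subnets: Vec<u64>,
}

/// Returns true only if every subnet this node custodies has at least one
/// connected peer, mirroring the all-or-nothing gate added before range-sync
/// batch requests.
fn good_peers_on_custody_subnets(our_subnets: &[u64], peers: &[Peer]) -> bool {
    // Count peers per subnet; the patch exports these counts via the
    // `peers_per_column_subnet` gauge.
    let mut peers_per_subnet: HashMap<u64, usize> = HashMap::new();
    for peer in peers {
        for subnet in &peer.custody_subnets {
            *peers_per_subnet.entry(*subnet).or_default() += 1;
        }
    }
    our_subnets
        .iter()
        .all(|subnet| peers_per_subnet.get(subnet).copied().unwrap_or(0) > 0)
}

fn main() {
    let our_subnets = [0u64, 5, 9];
    let peers = vec![
        Peer { custody_subnets: vec![0, 5] },
        Peer { custody_subnets: vec![9] },
    ];
    // All custody subnets are covered, so batch requests may proceed.
    assert!(good_peers_on_custody_subnets(&our_subnets, &peers));
    // Without the only peer on subnet 9, further batch requests must pause.
    assert!(!good_peers_on_custody_subnets(&our_subnets, &peers[..1]));
}

Requesting a blocks-by-range batch is only useful if the matching data columns can be retrieved as well, which is why the patch gates the request instead of failing the chain when custody peers are temporarily missing.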