Skip to content

Commit

Permalink
Transition block lookup sync to range sync (#6122)
Browse files Browse the repository at this point in the history
* Transition block lookup sync to range sync

* Log unexpected state

* Merge remote-tracking branch 'sigp/unstable' into lookup-to-range

* Add docs

* Merge remote-tracking branch 'sigp/unstable' into lookup-to-range
  • Loading branch information
dapplion authored Oct 8, 2024
1 parent 48dd3f3 commit 71c5388
Show file tree
Hide file tree
Showing 8 changed files with 206 additions and 30 deletions.
4 changes: 2 additions & 2 deletions beacon_node/beacon_chain/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2790,12 +2790,12 @@ pub fn build_log(level: slog::Level, logger_type: LoggerType) -> Logger {
match logger_type {
LoggerType::Test => {
let drain = FullFormat::new(TermDecorator::new().build()).build().fuse();
let drain = Async::new(drain).build().fuse();
let drain = Async::new(drain).chan_size(10_000).build().fuse();
Logger::root(drain.filter_level(level).fuse(), o!())
}
LoggerType::CI => {
let drain = FullFormat::new(ci_decorator()).build().fuse();
let drain = Async::new(drain).build().fuse();
let drain = Async::new(drain).chan_size(10_000).build().fuse();
Logger::root(drain.filter_level(level).fuse(), o!())
}
LoggerType::Null => {
Expand Down
5 changes: 3 additions & 2 deletions beacon_node/network/src/network_beacon_processor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use std::sync::Arc;
use std::time::Duration;
use store::MemoryStore;
use task_executor::TaskExecutor;
use tokio::sync::mpsc::UnboundedSender;
use tokio::sync::mpsc::{self, error::TrySendError};
use types::*;

Expand Down Expand Up @@ -831,7 +832,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
/// Send a message to `sync_tx`.
///
/// Creates a log if there is an internal error.
fn send_sync_message(&self, message: SyncMessage<T::EthSpec>) {
pub(crate) fn send_sync_message(&self, message: SyncMessage<T::EthSpec>) {
self.sync_tx.send(message).unwrap_or_else(|e| {
debug!(self.log, "Could not send message to the sync service";
"error" => %e)
Expand Down Expand Up @@ -859,6 +860,7 @@ impl<E: EthSpec> NetworkBeaconProcessor<TestBeaconChainType<E>> {
// processor (but not much else).
pub fn null_for_testing(
network_globals: Arc<NetworkGlobals<E>>,
sync_tx: UnboundedSender<SyncMessage<E>>,
chain: Arc<BeaconChain<TestBeaconChainType<E>>>,
executor: TaskExecutor,
log: Logger,
Expand All @@ -871,7 +873,6 @@ impl<E: EthSpec> NetworkBeaconProcessor<TestBeaconChainType<E>> {
} = <_>::default();

let (network_tx, _network_rx) = mpsc::unbounded_channel();
let (sync_tx, _sync_rx) = mpsc::unbounded_channel();

let network_beacon_processor = Self {
beacon_processor_send: beacon_processor_tx,
Expand Down
67 changes: 54 additions & 13 deletions beacon_node/network/src/sync/block_lookups/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use super::network_context::{PeerGroup, RpcResponseError, SyncNetworkContext};
use crate::metrics;
use crate::sync::block_lookups::common::ResponseType;
use crate::sync::block_lookups::parent_chain::find_oldest_fork_ancestor;
use crate::sync::SyncMessage;
use beacon_chain::block_verification_types::AsBlock;
use beacon_chain::data_availability_checker::{
AvailabilityCheckError, AvailabilityCheckErrorCategory,
Expand Down Expand Up @@ -55,7 +56,10 @@ mod tests;
/// The maximum depth we will search for a parent block. In principle we should have sync'd any
/// canonical chain to its head once the peer connects. A chain should not appear where it's depth
/// is further back than the most recent head slot.
pub(crate) const PARENT_DEPTH_TOLERANCE: usize = SLOT_IMPORT_TOLERANCE * 2;
///
/// Have the same value as range's sync tolerance to consider a peer synced. Once sync lookup
/// reaches the maximum depth it will force trigger range sync.
pub(crate) const PARENT_DEPTH_TOLERANCE: usize = SLOT_IMPORT_TOLERANCE;

const FAILED_CHAINS_CACHE_EXPIRY_SECONDS: u64 = 60;
pub const SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS: u8 = 4;
Expand Down Expand Up @@ -254,22 +258,59 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
// blocks on top of A forming A -> C. The malicious peer forces us to fetch C
// from it, which will result in parent A hitting the chain_too_long error. Then
// the valid chain A -> B is dropped too.
if let Ok(block_to_drop) = find_oldest_fork_ancestor(parent_chains, chain_idx) {
// Drop all lookups descending from the child of the too long parent chain
if let Some((lookup_id, lookup)) = self
//
// `find_oldest_fork_ancestor` should never return Err, unwrapping to tip for
// complete-ness
let parent_chain_tip = parent_chain.tip;
let block_to_drop =
find_oldest_fork_ancestor(parent_chains, chain_idx).unwrap_or(parent_chain_tip);
// Drop all lookups descending from the child of the too long parent chain
if let Some((lookup_id, lookup)) = self
.single_block_lookups
.iter()
.find(|(_, l)| l.block_root() == block_to_drop)
{
// If a lookup chain is too long, we can't distinguish a valid chain from a
// malicious one. We must attempt to sync this chain to not lose liveness. If
// the chain grows too long, we stop lookup sync and transition this head to
// forward range sync. We need to tell range sync which head to sync to, and
// from which peers. The lookup of the very tip of this chain may contain zero
// peers if it's the parent-child lookup. So we do a bit of a trick here:
// - Tell range sync to sync to the tip's root (if available, else its ancestor)
// - But use all peers in the ancestor lookup, which should have at least one
// peer, and its peer set is a strict superset of the tip's lookup.
if let Some((_, tip_lookup)) = self
.single_block_lookups
.iter()
.find(|(_, l)| l.block_root() == block_to_drop)
.find(|(_, l)| l.block_root() == parent_chain_tip)
{
for &peer_id in lookup.all_peers() {
cx.report_peer(
peer_id,
PeerAction::LowToleranceError,
"chain_too_long",
);
}
self.drop_lookup_and_children(*lookup_id);
cx.send_sync_message(SyncMessage::AddPeersForceRangeSync {
peers: lookup.all_peers().copied().collect(),
head_slot: tip_lookup.peek_downloaded_block_slot(),
head_root: parent_chain_tip,
});
} else {
// Should never happen, log error and continue the lookup drop
error!(self.log, "Unable to transition lookup to range sync";
"error" => "Parent chain tip lookup not found",
"block_root" => ?parent_chain_tip
);
}

// Do not downscore peers here. Because we can't distinguish a valid chain from
// a malicious one we may penalize honest peers for attempting to discover us a
// valid chain. Until blocks_by_range allows to specify a tip, for example with
// https://github.com/ethereum/consensus-specs/pull/3845 we will have poor
// attributability. A peer can send us garbage blocks over blocks_by_root, and
// then correct blocks via blocks_by_range.

self.drop_lookup_and_children(*lookup_id);
} else {
// Should never happen
error!(self.log, "Unable to transition lookup to range sync";
"error" => "Block to drop lookup not found",
"block_root" => ?block_to_drop
);
}

return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use std::time::{Duration, Instant};
use store::Hash256;
use strum::IntoStaticStr;
use types::blob_sidecar::FixedBlobSidecarList;
use types::{DataColumnSidecarList, EthSpec, SignedBeaconBlock};
use types::{DataColumnSidecarList, EthSpec, SignedBeaconBlock, Slot};

// Dedicated enum for LookupResult to force its usage
#[must_use = "LookupResult must be handled with on_lookup_result"]
Expand Down Expand Up @@ -91,6 +91,14 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
}
}

/// Return the slot of this lookup's block if it's currently cached as `AwaitingProcessing`
pub fn peek_downloaded_block_slot(&self) -> Option<Slot> {
self.block_request_state
.state
.peek_downloaded_data()
.map(|block| block.slot())
}

/// Get the block root that is being requested.
pub fn block_root(&self) -> Hash256 {
self.block_root
Expand Down
48 changes: 43 additions & 5 deletions beacon_node/network/src/sync/block_lookups/tests.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::network_beacon_processor::NetworkBeaconProcessor;
use crate::sync::manager::{BlockProcessType, SyncManager};
use crate::sync::peer_sampling::SamplingConfig;
use crate::sync::range_sync::RangeSyncType;
use crate::sync::{SamplingId, SyncMessage};
use crate::NetworkMessage;
use std::sync::Arc;
Expand Down Expand Up @@ -78,6 +79,8 @@ struct TestRig {
network_rx: mpsc::UnboundedReceiver<NetworkMessage<E>>,
/// Stores all `NetworkMessage`s received from `network_recv`. (e.g. outgoing RPC requests)
network_rx_queue: Vec<NetworkMessage<E>>,
/// Receiver for `SyncMessage` from the network
sync_rx: mpsc::UnboundedReceiver<SyncMessage<E>>,
/// To send `SyncMessage`. For sending RPC responses or block processing results to sync.
sync_manager: SyncManager<T>,
/// To manipulate sync state and peer connection status
Expand Down Expand Up @@ -137,6 +140,7 @@ impl TestRig {
let chain = harness.chain.clone();

let (network_tx, network_rx) = mpsc::unbounded_channel();
let (sync_tx, sync_rx) = mpsc::unbounded_channel::<SyncMessage<E>>();
// TODO(das): make the generation of the ENR use the deterministic rng to have consistent
// column assignments
let network_config = Arc::new(NetworkConfig::default());
Expand All @@ -148,13 +152,12 @@ impl TestRig {
));
let (beacon_processor, beacon_processor_rx) = NetworkBeaconProcessor::null_for_testing(
globals,
sync_tx,
chain.clone(),
harness.runtime.task_executor.clone(),
log.clone(),
);

let (_sync_send, sync_recv) = mpsc::unbounded_channel::<SyncMessage<E>>();

let fork_name = chain.spec.fork_name_at_slot::<E>(chain.slot().unwrap());

// All current tests expect synced and EL online state
Expand All @@ -168,13 +171,15 @@ impl TestRig {
beacon_processor_rx_queue: vec![],
network_rx,
network_rx_queue: vec![],
sync_rx,
rng,
network_globals: beacon_processor.network_globals.clone(),
sync_manager: SyncManager::new(
chain,
network_tx,
beacon_processor.into(),
sync_recv,
// Pass empty recv not tied to any tx
mpsc::unbounded_channel().1,
SamplingConfig::Custom {
required_successes: vec![SAMPLING_REQUIRED_SUCCESSES],
},
Expand Down Expand Up @@ -237,6 +242,13 @@ impl TestRig {
self.send_sync_message(SyncMessage::SampleBlock(block_root, block_slot))
}

/// Drain all sync messages in the sync_rx attached to the beacon processor
fn drain_sync_rx(&mut self) {
while let Ok(sync_message) = self.sync_rx.try_recv() {
self.send_sync_message(sync_message);
}
}

fn rand_block(&mut self) -> SignedBeaconBlock<E> {
self.rand_block_and_blobs(NumBlobs::None).0
}
Expand Down Expand Up @@ -293,6 +305,10 @@ impl TestRig {
self.sync_manager.active_parent_lookups().len()
}

fn active_range_sync_chain(&self) -> (RangeSyncType, Slot, Slot) {
self.sync_manager.get_range_sync_chains().unwrap().unwrap()
}

fn assert_single_lookups_count(&self, count: usize) {
assert_eq!(
self.active_single_lookups_count(),
Expand Down Expand Up @@ -1696,7 +1712,18 @@ fn test_parent_lookup_too_deep_grow_ancestor() {
)
}

rig.expect_penalty(peer_id, "chain_too_long");
// Should create a new syncing chain
rig.drain_sync_rx();
assert_eq!(
rig.active_range_sync_chain(),
(
RangeSyncType::Head,
Slot::new(0),
Slot::new(PARENT_DEPTH_TOLERANCE as u64 - 1)
)
);
// Should not penalize peer, but network is not clear because of the blocks_by_range requests
rig.expect_no_penalty_for(peer_id);
rig.assert_failed_chain(chain_hash);
}

Expand All @@ -1723,7 +1750,18 @@ fn test_parent_lookup_too_deep_grow_tip() {
);
}

rig.expect_penalty(peer_id, "chain_too_long");
// Should create a new syncing chain
rig.drain_sync_rx();
assert_eq!(
rig.active_range_sync_chain(),
(
RangeSyncType::Head,
Slot::new(0),
Slot::new(PARENT_DEPTH_TOLERANCE as u64 - 2)
)
);
// Should not penalize peer, but network is not clear because of the blocks_by_range requests
rig.expect_no_penalty_for(peer_id);
rig.assert_failed_chain(tip.canonical_root());
}

Expand Down
Loading

0 comments on commit 71c5388

Please sign in to comment.