diff --git a/bin/ream/src/main.rs b/bin/ream/src/main.rs index 699d24a3b..0e2abd4d1 100644 --- a/bin/ream/src/main.rs +++ b/bin/ream/src/main.rs @@ -1428,11 +1428,22 @@ mod tests { head_state_1.slot, head_state_1.latest_finalized.slot ); + let head_slot_delta = head_state.slot.abs_diff(head_state_1.slot); + let finalized_slot_lag = head_state_1 + .latest_finalized + .slot + .saturating_sub(head_state.latest_finalized.slot); assert!( - head_state.slot == head_state_1.slot, - "Node 3 is too far behind Node 1. Node 3: {}, Node 1: {}", - head_state.slot, - head_state_1.slot + head_slot_delta <= 2, + "Node 3 head diverged too much from Node 1. Node 3: {node_3_slot}, Node 1: {node_1_slot}, delta: {head_slot_delta}", + node_3_slot = head_state.slot, + node_1_slot = head_state_1.slot + ); + assert!( + finalized_slot_lag <= 4, + "Node 3 finalized slot lagged too far behind Node 1. Node 3: {node_3_finalized}, Node 1: {node_1_finalized}, lag: {finalized_slot_lag}", + node_3_finalized = head_state.latest_finalized.slot, + node_1_finalized = head_state_1.latest_finalized.slot ); }); } diff --git a/crates/common/chain/lean/src/service.rs b/crates/common/chain/lean/src/service.rs index 3e2b16fea..6de7852f0 100644 --- a/crates/common/chain/lean/src/service.rs +++ b/crates/common/chain/lean/src/service.rs @@ -1,8 +1,8 @@ use std::{ - collections::HashSet, + collections::{HashMap, HashSet, VecDeque}, pin::Pin, sync::Arc, - time::{Duration, SystemTime, UNIX_EPOCH}, + time::{Duration, Instant, SystemTime, UNIX_EPOCH}, }; use anyhow::anyhow; @@ -41,10 +41,78 @@ use crate::{ SyncStatus, forward_background_syncer::{ForwardBackgroundSyncer, ForwardSyncResults}, job::{pending::PendingJobRequest, request::JobRequest}, + strategy::{ + BackfillTimeoutStrategy, HandoffInputs, HandoffStrategy, NearHeadBackfillStrategy, + NearHeadFanoutStrategy, PeerSelectionStrategy, PendingRequestDedupStrategy, + should_fanout_near_head, should_switch_to_synced, + }, }, }; const STATE_RETENTION_SLOTS: u64 = 128; +const NEAR_HEAD_BRIDGE_MAX_GAP_SLOTS: u64 = 3; +const NEAR_HEAD_FANOUT_MAX_GAP_SLOTS: u64 = 4; +const RECENT_SYNC_BLOCK_RETENTION: Duration = Duration::from_secs(16); +const BACKFILL_PROGRESS_LOG_INTERVAL: Duration = Duration::from_secs(2); +const BACKFILL_HEDGE_DELAY: Duration = Duration::from_millis(250); +const FINALITY_SYNC_MAX_LAG_SLOTS: u64 = 4; + +fn should_queue_peer_checkpoint(checkpoint: Checkpoint, local_head_slot: u64) -> bool { + checkpoint.root != alloy_primitives::B256::ZERO && checkpoint.slot > local_head_slot +} + +fn is_genesis_anchor(slot: u64, parent_root: alloy_primitives::B256) -> bool { + slot == 0 && parent_root == alloy_primitives::B256::ZERO +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum SyncBlockSource { + ReqResp, + Gossip, +} + +#[derive(Debug, Clone, Copy)] +struct RecentSyncBlock { + parent_root: alloy_primitives::B256, + slot: u64, + seen_at: Instant, + source: SyncBlockSource, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum CallbackLossMode { + None, + DropFirstPerRoot, +} + +impl CallbackLossMode { + fn from_env() -> Self { + match std::env::var("REAM_LEAN_PACKET_LOSS_MODE") { + Ok(value) if value.eq_ignore_ascii_case("drop-first-per-root") => { + Self::DropFirstPerRoot + } + _ => Self::None, + } + } +} + +#[derive(Debug, Default, Clone, Copy)] +struct BackfillTelemetry { + requests_sent: u64, + request_retries: u64, + callbacks_processed: u64, + callbacks_dropped: u64, + callback_latency_ms_total: u128, + callback_latency_samples: u64, +} + +#[derive(Debug, Clone, Copy)] +struct InflightRootRequest { + primary_peer: PeerId, + backup_peer: Option, + requested_at: Instant, + backup_sent: bool, +} type CallbackFuture = Pin< Box< @@ -67,10 +135,23 @@ pub struct LeanChainService { network_state: Arc, sync_status: SyncStatus, peers_in_use: HashSet, - pending_job_requests: Vec, + pending_job_requests: VecDeque, forward_syncer: Option>>, checkpoints_to_queue: Vec<(Checkpoint, bool)>, pending_callbacks: FuturesUnordered, + near_head_backfill_strategy: NearHeadBackfillStrategy, + near_head_fanout_strategy: NearHeadFanoutStrategy, + handoff_strategy: HandoffStrategy, + backfill_timeout_strategy: BackfillTimeoutStrategy, + pending_dedup_strategy: PendingRequestDedupStrategy, + peer_selection_strategy: PeerSelectionStrategy, + recent_sync_blocks: Vec, + callback_loss_mode: CallbackLossMode, + dropped_callback_roots: HashSet, + backfill_telemetry: BackfillTelemetry, + last_backfill_progress_log: Option, + inflight_roots: HashMap, + peer_avg_latency_ms: HashMap, } impl LeanChainService { @@ -90,13 +171,33 @@ impl LeanChainService { forward_syncer: None, checkpoints_to_queue: Vec::new(), pending_callbacks: FuturesUnordered::new(), - pending_job_requests: Vec::new(), + pending_job_requests: VecDeque::new(), + near_head_backfill_strategy: NearHeadBackfillStrategy::from_env(), + near_head_fanout_strategy: NearHeadFanoutStrategy::from_env(), + handoff_strategy: HandoffStrategy::from_env(), + backfill_timeout_strategy: BackfillTimeoutStrategy::from_env(), + pending_dedup_strategy: PendingRequestDedupStrategy::from_env(), + peer_selection_strategy: PeerSelectionStrategy::from_env(), + recent_sync_blocks: Vec::new(), + callback_loss_mode: CallbackLossMode::from_env(), + dropped_callback_roots: HashSet::new(), + backfill_telemetry: BackfillTelemetry::default(), + last_backfill_progress_log: None, + inflight_roots: HashMap::new(), + peer_avg_latency_ms: HashMap::new(), } } pub async fn start(mut self) -> anyhow::Result<()> { info!( genesis_time = lean_network_spec().genesis_time, + near_head_backfill_strategy = ?self.near_head_backfill_strategy, + near_head_fanout_strategy = ?self.near_head_fanout_strategy, + handoff_strategy = ?self.handoff_strategy, + backfill_timeout_strategy = ?self.backfill_timeout_strategy, + pending_dedup_strategy = ?self.pending_dedup_strategy, + peer_selection_strategy = ?self.peer_selection_strategy, + callback_loss_mode = ?self.callback_loss_mode, "LeanChainService started", ); @@ -244,7 +345,14 @@ impl LeanChainService { } LeanChainServiceMessage::ProcessBlock { signed_block_with_attestation, need_gossip } => { if self.sync_status != SyncStatus::Synced { - trace!("Received ProcessBlock request while syncing. Ignoring."); + if let Err(err) = self + .handle_syncing_process_block(&signed_block_with_attestation) + .await + { + warn!( + "Failed to handle ProcessBlock while backfill syncing: {err:?}" + ); + } continue; } @@ -515,10 +623,23 @@ impl LeanChainService { } async fn step_backfill_sync(&mut self) -> anyhow::Result<()> { - info!( - slot = get_current_slot(), - "Node is syncing; backfill sync step executed", - ); + self.maybe_log_backfill_progress(); + self.prune_recent_sync_blocks(); + let backfill_job_timeout = self.current_backfill_job_timeout().await; + for timed_out_job in self.sync_status.reset_timed_out_jobs(backfill_job_timeout) { + self.backfill_telemetry.request_retries += 1; + self.inflight_roots.remove(&timed_out_job.root); + warn!( + peer_id = ?timed_out_job.peer_id, + root = ?timed_out_job.root, + timeout_seconds = backfill_job_timeout.as_secs_f64(), + "Backfill job request timed out; scheduling peer reassignment" + ); + self.network_state + .failed_response_from_peer(timed_out_job.peer_id); + self.peers_in_use.remove(&timed_out_job.peer_id); + self.queue_pending_reset(timed_out_job.peer_id); + } // If a queue has reached the stored head, execute that queue in a background thread, // blocking any other threads from processing until it returns. The thread can @@ -544,23 +665,75 @@ impl LeanChainService { } // queue unqueued jobs + let peer_gap_slots = self.current_peer_gap_slots().await; + let enable_near_head_fanout = should_fanout_near_head( + self.near_head_fanout_strategy, + peer_gap_slots, + NEAR_HEAD_FANOUT_MAX_GAP_SLOTS, + ); + self.queue_pending_job_requests().await?; + self.process_delayed_hedges(); let unqueued_jobs = self.sync_status.unqueued_jobs(); for job in unqueued_jobs { - let (callback, rx) = mpsc::channel(100); - if let Err(err) = self.outbound_p2p.send(LeanP2PRequest::Request { - peer_id: job.peer_id, - callback, - message: P2PCallbackRequest::BlocksByRoot { - roots: vec![job.root], - }, + if self.pending_job_requests.iter().any(|request| { + matches!( + request, + PendingJobRequest::Reset { + peer_id: existing_peer_id + } if *existing_peer_id == job.peer_id + ) }) { - warn!( - "Failed to send block request to peer {:?} for root {:?}: {err:?}", - job.peer_id, job.root + trace!( + peer_id = ?job.peer_id, + root = ?job.root, + "Skipping unqueued job while peer reset remains pending" ); continue; } - self.push_callback_receiver(rx); + + if matches!( + self.near_head_backfill_strategy, + NearHeadBackfillStrategy::GossipPreferred + ) && self.try_advance_job_with_cached_block(job.root).await? + { + continue; + } + + if self.inflight_roots.contains_key(&job.root) { + continue; + } + + let backup_peer = if enable_near_head_fanout { + self.alternate_peer_for_fanout(job.peer_id) + } else { + None + }; + + if !self.request_block_by_root_from_peer(job.peer_id, job.root) { + continue; + } + + let mut inflight_request = InflightRootRequest { + primary_peer: job.peer_id, + backup_peer, + requested_at: Instant::now(), + backup_sent: false, + }; + if self.near_head_fanout_strategy == NearHeadFanoutStrategy::DualPeer + && let Some(backup_peer_id) = backup_peer + { + inflight_request.backup_sent = + self.request_block_by_root_from_peer(backup_peer_id, job.root); + if inflight_request.backup_sent { + trace!( + root = ?job.root, + primary_peer_id = ?job.peer_id, + backup_peer_id = ?backup_peer_id, + "Fanout backfill request sent to backup peer" + ); + } + } + self.inflight_roots.insert(job.root, inflight_request); self.sync_status.mark_job_as_requested(job.root); } @@ -575,6 +748,30 @@ impl LeanChainService { return Ok(()); } }; + let (local_head_root, local_head_slot) = { + let fork_choice = self.store.read().await; + let store = fork_choice.store.lock().await; + let head = store.head_provider().get()?; + let head_slot = store + .block_provider() + .get(head)? + .ok_or_else(|| anyhow!("Block not found for head: {head}"))? + .message + .block + .slot; + (head, head_slot) + }; + + if !should_queue_peer_checkpoint(common_highest_checkpoint, local_head_slot) { + trace!( + checkpoint_slot = common_highest_checkpoint.slot, + checkpoint_root = ?common_highest_checkpoint.root, + local_head_slot, + local_head_root = ?local_head_root, + "Skipping checkpoint queue that is not ahead of local head" + ); + return Ok(()); + } if self .sync_status @@ -625,7 +822,9 @@ impl LeanChainService { .filter(|(peer_id, _)| !self.peers_in_use.contains(peer_id)) .collect(); - match candidates.choose_weighted(&mut rand::rng(), |(_, score)| f64::from(*score)) { + match candidates.choose_weighted(&mut rand::rng(), |(peer_id, score)| { + self.peer_weight(*peer_id, *score) + }) { Ok((peer_id, _)) => Some(*peer_id), Err(err) => { warn!("Failed to choose weighted peer: {err}"); @@ -634,15 +833,130 @@ impl LeanChainService { } } - async fn update_sync_status(&self) -> anyhow::Result { + fn alternate_peer_for_fanout(&self, primary_peer_id: PeerId) -> Option { + let candidates: Vec<(PeerId, u8)> = self + .network_state + .connected_peer_ids_with_scores() + .into_iter() + .filter(|(peer_id, _)| *peer_id != primary_peer_id) + .collect(); + + match candidates.choose_weighted(&mut rand::rng(), |(peer_id, score)| { + self.peer_weight(*peer_id, *score) + }) { + Ok((peer_id, _)) => Some(*peer_id), + Err(_) => None, + } + } + + fn peer_weight(&self, peer_id: PeerId, score: u8) -> f64 { + let score_weight = f64::from(score.max(1)); + match self.peer_selection_strategy { + PeerSelectionStrategy::ScoreOnly => score_weight, + PeerSelectionStrategy::LatencyWeighted => { + let latency_penalty = self + .peer_avg_latency_ms + .get(&peer_id) + .map(|latency_ms| 1.0 / (1.0 + (latency_ms / 1500.0))) + .unwrap_or(1.0); + (score_weight * latency_penalty).max(0.1) + } + } + } + + fn request_block_by_root_from_peer( + &mut self, + peer_id: PeerId, + root: alloy_primitives::B256, + ) -> bool { + let (callback, rx) = mpsc::channel(100); + if let Err(err) = self.outbound_p2p.send(LeanP2PRequest::Request { + peer_id, + callback, + message: P2PCallbackRequest::BlocksByRoot { roots: vec![root] }, + }) { + warn!( + "Failed to send block request to peer {:?} for root {:?}: {err:?}", + peer_id, root + ); + self.network_state.failed_response_from_peer(peer_id); + return false; + } + self.push_callback_receiver(rx); + self.backfill_telemetry.requests_sent += 1; + true + } + + fn process_delayed_hedges(&mut self) { + if self.near_head_fanout_strategy != NearHeadFanoutStrategy::DelayedHedge { + return; + } + + let now = Instant::now(); + let roots_to_hedge: Vec<(alloy_primitives::B256, PeerId, PeerId)> = self + .inflight_roots + .iter() + .filter_map(|(root, inflight)| { + let backup_peer = inflight.backup_peer?; + if inflight.backup_sent + || now.saturating_duration_since(inflight.requested_at) < BACKFILL_HEDGE_DELAY + { + return None; + } + Some((*root, inflight.primary_peer, backup_peer)) + }) + .collect(); + + for (root, primary_peer_id, backup_peer_id) in roots_to_hedge { + let backup_sent = self.request_block_by_root_from_peer(backup_peer_id, root); + if backup_sent { + if let Some(inflight) = self.inflight_roots.get_mut(&root) { + inflight.backup_sent = true; + } + trace!( + root = ?root, + primary_peer_id = ?primary_peer_id, + backup_peer_id = ?backup_peer_id, + "Delayed hedge backfill request sent to backup peer" + ); + } + } + } + + async fn current_peer_gap_slots(&self) -> u64 { + let local_head_slot = { + let fork_choice = self.store.read().await; + let store = fork_choice.store.lock().await; + let head = match store.head_provider().get() { + Ok(head) => head, + Err(_) => return 0, + }; + match store.block_provider().get(head) { + Ok(Some(block)) => block.message.block.slot, + _ => return 0, + } + }; + let highest_peer_head_slot = self + .network_state + .common_highest_checkpoint() + .map(|checkpoint| checkpoint.slot) + .unwrap_or(local_head_slot); + highest_peer_head_slot.saturating_sub(local_head_slot) + } + + async fn update_sync_status(&mut self) -> anyhow::Result { if self.forward_syncer.is_some() { return Ok(self.sync_status.clone()); } - let (head, block_provider) = { + let (head, block_provider, state_provider) = { let fork_choice = self.store.read().await; let store = fork_choice.store.lock().await; - (store.head_provider().get()?, store.block_provider()) + ( + store.head_provider().get()?, + store.block_provider(), + store.state_provider(), + ) }; let current_head_slot = block_provider .get(head)? @@ -650,6 +964,11 @@ impl LeanChainService { .message .block .slot; + let local_finalized_slot = state_provider + .get(head)? + .ok_or_else(|| anyhow!("State not found for head: {head}"))? + .latest_finalized + .slot; let tolerance = std::cmp::max(8, (lean_network_spec().num_validators * 2) / 3); let highest_peer_head_slot = self @@ -657,42 +976,75 @@ impl LeanChainService { .common_highest_checkpoint() .map(|c| c.slot) .unwrap_or(0); + let highest_peer_finalized_slot = self + .network_state + .common_highest_finalized_checkpoint() + .map(|c| c.slot) + .unwrap_or(local_finalized_slot); let is_synced_by_time = get_current_slot() <= current_head_slot + tolerance; let is_behind_peers = highest_peer_head_slot > current_head_slot + 2; + let is_behind_finality = + highest_peer_finalized_slot > local_finalized_slot + FINALITY_SYNC_MAX_LAG_SLOTS; + let has_pending_backfill_work = self.has_pending_backfill_work(); + let has_active_backfill_jobs = self.has_active_backfill_jobs(); + let has_inflight_backfill_requests = !self.inflight_roots.is_empty(); + let has_near_head_bridge = self.has_recent_near_head_gossip_bridge( + head, + current_head_slot, + highest_peer_head_slot, + ); + let should_be_synced = should_switch_to_synced( + self.handoff_strategy, + HandoffInputs { + is_behind_peers, + is_behind_finality, + has_pending_backfill_work, + has_near_head_bridge, + has_active_backfill_jobs, + has_inflight_backfill_requests, + }, + ); - let sync_status = if is_behind_peers { - if self.sync_status == SyncStatus::Synced { - info!( - slot = get_current_slot(), - head_slot = current_head_slot, - peer_head_slot = highest_peer_head_slot, - "Node fell behind peers; switching to Syncing" - ); - SyncStatus::Syncing { jobs: Vec::new() } - } else { - self.sync_status.clone() - } - } else if is_synced_by_time { + let sync_status = if should_be_synced { if self.sync_status != SyncStatus::Synced { - info!( - slot = get_current_slot(), - head_slot = current_head_slot, - "Node has synced to the head" - ); + if is_synced_by_time { + info!( + slot = get_current_slot(), + head_slot = current_head_slot, + "Node has synced to the head" + ); + } else { + info!( + slot = get_current_slot(), + head_slot = current_head_slot, + "Node is behind time but caught up to peers (stall detected); switching to Synced" + ); + } } SyncStatus::Synced + } else if self.sync_status == SyncStatus::Synced { + info!( + slot = get_current_slot(), + head_slot = current_head_slot, + peer_head_slot = highest_peer_head_slot, + local_finalized_slot, + peer_finalized_slot = highest_peer_finalized_slot, + has_pending_backfill_work, + has_near_head_bridge, + has_active_backfill_jobs, + has_inflight_backfill_requests, + is_behind_finality, + handoff_strategy = ?self.handoff_strategy, + "Node remains in backfill syncing mode" + ); + SyncStatus::Syncing { jobs: Vec::new() } } else { - if self.sync_status != SyncStatus::Synced { - info!( - slot = get_current_slot(), - head_slot = current_head_slot, - "Node is behind time but caught up to peers (stall detected); switching to Synced" - ); - } - SyncStatus::Synced + self.sync_status.clone() }; if sync_status == SyncStatus::Synced { + self.dropped_callback_roots.clear(); + self.inflight_roots.clear(); let now = SystemTime::now() .duration_since(UNIX_EPOCH) .map_err(|err| anyhow!("System time before epoch: {err:?}"))? @@ -706,6 +1058,152 @@ impl LeanChainService { Ok(sync_status) } + async fn current_backfill_job_timeout(&self) -> Duration { + let peer_gap = self.current_peer_gap_slots().await; + self.backfill_timeout_strategy + .timeout_for_peer_gap(peer_gap) + } + + fn has_pending_backfill_work(&self) -> bool { + let has_queued_jobs = + matches!(&self.sync_status, SyncStatus::Syncing { jobs } if !jobs.is_empty()); + let has_busy_peers = has_queued_jobs && !self.peers_in_use.is_empty(); + has_queued_jobs + || !self.pending_job_requests.is_empty() + || !self.checkpoints_to_queue.is_empty() + || has_busy_peers + || self.forward_syncer.is_some() + } + + fn has_active_backfill_jobs(&self) -> bool { + matches!( + &self.sync_status, + SyncStatus::Syncing { jobs } if jobs.iter().any(|queue| !queue.jobs.is_empty()) + ) + } + + fn sync_queue_stats(&self) -> (usize, usize) { + if let SyncStatus::Syncing { jobs } = &self.sync_status { + let queue_count = jobs.len(); + let total_jobs = jobs.iter().map(|queue| queue.jobs.len()).sum(); + return (queue_count, total_jobs); + } + (0, 0) + } + + fn maybe_log_backfill_progress(&mut self) { + let now = Instant::now(); + if let Some(last_log_time) = self.last_backfill_progress_log + && now.saturating_duration_since(last_log_time) < BACKFILL_PROGRESS_LOG_INTERVAL + { + return; + } + self.last_backfill_progress_log = Some(now); + + let (queue_count, total_jobs) = self.sync_queue_stats(); + let avg_callback_latency_ms = if self.backfill_telemetry.callback_latency_samples == 0 { + 0.0 + } else { + self.backfill_telemetry.callback_latency_ms_total as f64 + / self.backfill_telemetry.callback_latency_samples as f64 + }; + info!( + slot = get_current_slot(), + queue_count, + total_jobs, + pending_requests = self.pending_job_requests.len(), + inflight_roots = self.inflight_roots.len(), + peers_in_use = self.peers_in_use.len(), + recent_sync_blocks = self.recent_sync_blocks.len(), + requests_sent = self.backfill_telemetry.requests_sent, + request_retries = self.backfill_telemetry.request_retries, + callbacks_processed = self.backfill_telemetry.callbacks_processed, + callbacks_dropped = self.backfill_telemetry.callbacks_dropped, + avg_callback_latency_ms, + peer_latency_entries = self.peer_avg_latency_ms.len(), + "Node is syncing; backfill progress" + ); + } + + fn queue_pending_reset(&mut self, peer_id: PeerId) { + if self.pending_dedup_strategy == PendingRequestDedupStrategy::Dedup + && self + .pending_job_requests + .iter() + .any(|request| matches!(request, PendingJobRequest::Reset { peer_id: existing_peer_id } if *existing_peer_id == peer_id)) + { + return; + } + + self.pending_job_requests + .push_back(PendingJobRequest::new_reset(peer_id)); + } + + fn queue_pending_initial( + &mut self, + root: alloy_primitives::B256, + slot: u64, + parent_root: alloy_primitives::B256, + ) { + if self.pending_dedup_strategy == PendingRequestDedupStrategy::Dedup + && self + .pending_job_requests + .iter() + .any(|request| matches!(request, PendingJobRequest::Initial { root: existing_root, .. } if *existing_root == root)) + { + return; + } + + self.pending_job_requests + .push_back(PendingJobRequest::new_initial(root, slot, parent_root)); + } + + fn has_recent_near_head_gossip_bridge( + &self, + head: alloy_primitives::B256, + current_head_slot: u64, + highest_peer_head_slot: u64, + ) -> bool { + let gap = highest_peer_head_slot.saturating_sub(current_head_slot); + if gap <= 1 { + return true; + } + if gap > NEAR_HEAD_BRIDGE_MAX_GAP_SLOTS { + return false; + } + + let now = Instant::now(); + self.recent_sync_blocks.iter().any(|block| { + block.source == SyncBlockSource::Gossip + && now.saturating_duration_since(block.seen_at) <= RECENT_SYNC_BLOCK_RETENTION + && block.parent_root == head + && block.slot > current_head_slot + && block.slot <= highest_peer_head_slot.saturating_add(1) + }) + } + + fn record_recent_sync_block( + &mut self, + parent_root: alloy_primitives::B256, + slot: u64, + source: SyncBlockSource, + ) { + self.recent_sync_blocks.push(RecentSyncBlock { + parent_root, + slot, + seen_at: Instant::now(), + source, + }); + self.prune_recent_sync_blocks(); + } + + fn prune_recent_sync_blocks(&mut self) { + let now = Instant::now(); + self.recent_sync_blocks.retain(|block| { + now.saturating_duration_since(block.seen_at) <= RECENT_SYNC_BLOCK_RETENTION + }); + } + async fn handle_network_event(&mut self, event: NetworkEvent) -> anyhow::Result<()> { match event { NetworkEvent::RequestMessage { @@ -777,9 +1275,14 @@ impl LeanChainService { async fn handle_failed_job_request(&mut self, peer_id: PeerId) -> anyhow::Result<()> { self.network_state.failed_response_from_peer(peer_id); + if !self.sync_status.has_job_for_peer(peer_id) { + return Ok(()); + } self.peers_in_use.remove(&peer_id); - self.pending_job_requests - .push(PendingJobRequest::Reset { peer_id }); + self.inflight_roots.retain(|_, inflight| { + inflight.primary_peer != peer_id && inflight.backup_peer != Some(peer_id) + }); + self.queue_pending_reset(peer_id); Ok(()) } @@ -788,60 +1291,35 @@ impl LeanChainService { peer_id: PeerId, message: Arc, ) -> anyhow::Result<()> { - self.network_state.successful_response_from_peer(peer_id); - match &*message { LeanResponseMessage::BlocksByRoot(signed_block_with_attestation) => { - let last_root = signed_block_with_attestation.message.block.tree_hash_root(); - // if the parent root is present in pending blocks or is local head, we mark the - // queue as complete - let (head, pending_blocks_provider) = { - let fork_choice = self.store.read().await; - let store = fork_choice.store.lock().await; - ( - store.head_provider().get()?, - store.pending_blocks_provider(), - ) - }; - - pending_blocks_provider - .insert(last_root, signed_block_with_attestation.as_ref().clone())?; - self.peers_in_use.remove(&peer_id); - - // We have 3 scenarios where we can mark the job queue as complete - // 1. The parent root is the local head - // 2. The parent root is already present in the pending blocks (we have already - // requested it) - // 3. The parent root is the starting root of any existing job queue - let parent_root_is_local_head = - signed_block_with_attestation.message.block.parent_root == head; - let parent_root_in_pending_blocks = pending_blocks_provider - .get(signed_block_with_attestation.message.block.parent_root)? - .is_some(); - let parent_root_is_start_of_any_queue = - self.sync_status.is_root_start_of_any_queue( - &signed_block_with_attestation.message.block.parent_root, - ); - if parent_root_is_local_head - || parent_root_in_pending_blocks - || parent_root_is_start_of_any_queue + let block_root = signed_block_with_attestation.message.block.tree_hash_root(); + if !self.inflight_roots.contains_key(&block_root) + && !self.sync_status.contains_job_root(block_root) { trace!( - "Marking job queue as complete for block from peer {peer_id:?} with root {last_root:?}." + peer_id = ?peer_id, + block_root = ?block_root, + "Ignoring stale backfill callback for completed root" ); - self.sync_status.mark_job_queue_as_complete(last_root); - return Ok(()); } - - self.pending_job_requests - .push(PendingJobRequest::new_initial( - last_root, - signed_block_with_attestation.message.block.slot, - signed_block_with_attestation.message.block.parent_root, - )); - - self.queue_pending_job_requests().await?; + if self.should_drop_callback_response(block_root) { + self.backfill_telemetry.callbacks_dropped += 1; + warn!( + peer_id = ?peer_id, + block_root = ?block_root, + callback_loss_mode = ?self.callback_loss_mode, + "Dropping req/resp block callback to simulate packet loss" + ); + return Ok(()); + } + self.handle_backfill_block( + Some(peer_id), + signed_block_with_attestation.as_ref().clone(), + SyncBlockSource::ReqResp, + ) + .await?; } _ => warn!( "We handle these messages elsewhere, received unexpected LeanRequestMessage: {:?}", @@ -851,8 +1329,139 @@ impl LeanChainService { Ok(()) } + fn should_drop_callback_response(&mut self, root: alloy_primitives::B256) -> bool { + match self.callback_loss_mode { + CallbackLossMode::None => false, + CallbackLossMode::DropFirstPerRoot => self.dropped_callback_roots.insert(root), + } + } + + async fn handle_syncing_process_block( + &mut self, + signed_block_with_attestation: &SignedBlockWithAttestation, + ) -> anyhow::Result<()> { + let root = signed_block_with_attestation.message.block.tree_hash_root(); + trace!( + root = ?root, + slot = signed_block_with_attestation.message.block.slot, + "Received gossiped block while backfill syncing" + ); + self.handle_backfill_block( + None, + signed_block_with_attestation.clone(), + SyncBlockSource::Gossip, + ) + .await + } + + async fn try_advance_job_with_cached_block( + &mut self, + root: alloy_primitives::B256, + ) -> anyhow::Result { + let pending_block = { + let fork_choice = self.store.read().await; + let store = fork_choice.store.lock().await; + store.pending_blocks_provider().get(root)? + }; + + if let Some(block) = pending_block { + trace!( + root = ?root, + "Using cached pending block to advance backfill queue" + ); + self.handle_backfill_block(None, block, SyncBlockSource::ReqResp) + .await?; + return Ok(true); + } + + Ok(false) + } + + async fn handle_backfill_block( + &mut self, + source_peer_id: Option, + signed_block_with_attestation: SignedBlockWithAttestation, + source: SyncBlockSource, + ) -> anyhow::Result<()> { + let last_root = signed_block_with_attestation.message.block.tree_hash_root(); + let parent_root = signed_block_with_attestation.message.block.parent_root; + let slot = signed_block_with_attestation.message.block.slot; + self.inflight_roots.remove(&last_root); + let mut request_latency_ms: Option = None; + if source == SyncBlockSource::ReqResp { + self.backfill_telemetry.callbacks_processed += 1; + if let Some(latency) = self.sync_status.request_latency_for_root(last_root) { + self.backfill_telemetry.callback_latency_ms_total += latency.as_millis(); + self.backfill_telemetry.callback_latency_samples += 1; + request_latency_ms = Some(latency.as_secs_f64() * 1_000.0); + } + } + let job_peer_id = self.sync_status.peer_for_job_root(last_root); + let (head, pending_blocks_provider, block_provider) = { + let fork_choice = self.store.read().await; + let store = fork_choice.store.lock().await; + ( + store.head_provider().get()?, + store.pending_blocks_provider(), + store.block_provider(), + ) + }; + pending_blocks_provider.insert(last_root, signed_block_with_attestation)?; + self.record_recent_sync_block(parent_root, slot, source); + + if let Some(job_peer_id) = job_peer_id { + self.peers_in_use.remove(&job_peer_id); + } + + if let Some(peer_id) = source_peer_id { + self.network_state.successful_response_from_peer(peer_id); + if let Some(latency_ms) = request_latency_ms { + self.peer_avg_latency_ms + .entry(peer_id) + .and_modify(|avg_ms| *avg_ms = (*avg_ms * 0.8) + (latency_ms * 0.2)) + .or_insert(latency_ms); + } + self.peers_in_use.remove(&peer_id); + } + + if is_genesis_anchor(slot, parent_root) { + trace!( + root = ?last_root, + "Reached genesis anchor while backfilling; marking queue complete" + ); + self.sync_status.mark_job_queue_as_complete(last_root); + return Ok(()); + } + + let parent_root_is_local_head = parent_root == head; + let parent_root_in_pending_blocks = pending_blocks_provider.get(parent_root)?.is_some(); + let parent_root_in_block_store = block_provider.get(parent_root)?.is_some(); + let parent_root_is_start_of_any_queue = + self.sync_status.is_root_start_of_any_queue(&parent_root); + if parent_root_is_local_head + || parent_root_in_pending_blocks + || parent_root_in_block_store + || parent_root_is_start_of_any_queue + { + trace!( + root = ?last_root, + parent_root = ?parent_root, + "Marking backfill queue as complete" + ); + self.sync_status.mark_job_queue_as_complete(last_root); + return Ok(()); + } + + if self.sync_status.contains_job_root(last_root) { + self.queue_pending_initial(last_root, slot, parent_root); + self.queue_pending_job_requests().await?; + } + + Ok(()) + } + async fn queue_pending_job_requests(&mut self) -> anyhow::Result<()> { - while let Some(pending_job_request) = self.pending_job_requests.pop() { + while let Some(pending_job_request) = self.pending_job_requests.pop_front() { let non_queued_peer_id = match self.non_queued_peer_id().await { Some(id) => id, None => { @@ -860,7 +1469,7 @@ impl LeanChainService { "No connected peers available to assign pending job request. {:?} || {:?}", self.sync_status, self.pending_job_requests ); - self.pending_job_requests.push(pending_job_request); + self.pending_job_requests.push_back(pending_job_request); return Ok(()); } }; @@ -1036,3 +1645,46 @@ impl LeanChainService { self.pending_callbacks.push(future); } } + +#[cfg(test)] +mod tests { + use alloy_primitives::B256; + use ream_consensus_lean::checkpoint::Checkpoint; + + use super::{is_genesis_anchor, should_queue_peer_checkpoint}; + + #[test] + fn queueing_skips_zero_root_checkpoint() { + let checkpoint = Checkpoint { + root: B256::ZERO, + slot: 0, + }; + assert!(!should_queue_peer_checkpoint(checkpoint, 0)); + } + + #[test] + fn queueing_skips_stale_checkpoint() { + let checkpoint = Checkpoint { + root: B256::from([7; 32]), + slot: 4, + }; + assert!(!should_queue_peer_checkpoint(checkpoint, 4)); + assert!(!should_queue_peer_checkpoint(checkpoint, 5)); + } + + #[test] + fn queueing_accepts_ahead_non_zero_checkpoint() { + let checkpoint = Checkpoint { + root: B256::from([9; 32]), + slot: 12, + }; + assert!(should_queue_peer_checkpoint(checkpoint, 11)); + } + + #[test] + fn detects_genesis_anchor_only_for_slot_zero_zero_parent() { + assert!(is_genesis_anchor(0, B256::ZERO)); + assert!(!is_genesis_anchor(1, B256::ZERO)); + assert!(!is_genesis_anchor(0, B256::from([1; 32]))); + } +} diff --git a/crates/common/chain/lean/src/sync/forward_background_syncer.rs b/crates/common/chain/lean/src/sync/forward_background_syncer.rs index 3011c739b..bdebec029 100644 --- a/crates/common/chain/lean/src/sync/forward_background_syncer.rs +++ b/crates/common/chain/lean/src/sync/forward_background_syncer.rs @@ -32,12 +32,13 @@ impl ForwardBackgroundSyncer { pub async fn start(&mut self) -> anyhow::Result { let timer = Instant::now(); - let (head, pending_blocks_provider) = { + let (head, pending_blocks_provider, block_provider) = { let fork_choice = self.store.read().await; let store = fork_choice.store.lock().await; ( store.head_provider().get()?, store.pending_blocks_provider(), + store.block_provider(), ) }; let mut next_root = self.job_queue.starting_root; @@ -46,18 +47,23 @@ impl ForwardBackgroundSyncer { while next_root != head { let current_block = match pending_blocks_provider.get(next_root)? { Some(block) => block, - None => { - let last_block = last_block.ok_or_else(|| { - anyhow!("Failed to find block with root {next_root:?} in pending blocks") - })?; - return Ok(ForwardSyncResults::ChainIncomplete { - prevous_queue: self.job_queue.clone(), - checkpoint_for_new_queue: Checkpoint { - root: last_block.message.block.tree_hash_root(), - slot: last_block.message.block.slot, - }, - }); - } + None => match block_provider.get(next_root)? { + Some(block) => block, + None => { + let last_block = last_block.ok_or_else(|| { + anyhow!( + "Failed to find block with root {next_root:?} in pending blocks" + ) + })?; + return Ok(ForwardSyncResults::ChainIncomplete { + prevous_queue: self.job_queue.clone(), + checkpoint_for_new_queue: Checkpoint { + root: last_block.message.block.tree_hash_root(), + slot: last_block.message.block.slot, + }, + }); + } + }, }; ensure!( current_block.message.block.tree_hash_root() == next_root, @@ -70,9 +76,15 @@ impl ForwardBackgroundSyncer { } chained_roots.reverse(); - let blocks_synced = chained_roots.len(); + let mut blocks_synced = 0usize; + let mut store_writer = self.store.write().await; for root in chained_roots { + if block_provider.get(root)?.is_some() { + let _ = pending_blocks_provider.remove(root)?; + continue; + } + let block = pending_blocks_provider.get(root)?.ok_or_else(|| { anyhow!( "Failed to find block with root {root:?} in pending blocks during insertion" @@ -81,10 +93,14 @@ impl ForwardBackgroundSyncer { let time = lean_network_spec().genesis_time + (block.message.block.slot * lean_network_spec().seconds_per_slot); #[cfg(feature = "devnet2")] - self.store.write().await.on_tick(time, false).await?; + store_writer.on_tick(time, false).await?; #[cfg(feature = "devnet3")] - self.store.write().await.on_tick(time, false, true).await?; - self.store.write().await.on_block(&block, true).await?; + store_writer.on_tick(time, false, true).await?; + store_writer.on_block(&block, true).await?; + blocks_synced += 1; + // Remove blocks that have been applied to canonical storage to prevent unbounded growth + // of the pending-blocks table. + let _ = pending_blocks_provider.remove(root)?; } Ok(ForwardSyncResults::Completed { diff --git a/crates/common/chain/lean/src/sync/mod.rs b/crates/common/chain/lean/src/sync/mod.rs index 7953e5022..1b7f1a7d8 100644 --- a/crates/common/chain/lean/src/sync/mod.rs +++ b/crates/common/chain/lean/src/sync/mod.rs @@ -1,5 +1,8 @@ pub mod forward_background_syncer; pub mod job; +pub mod strategy; + +use std::time::{Duration, Instant}; use alloy_primitives::B256; use libp2p_identity::PeerId; @@ -193,6 +196,72 @@ impl SyncStatus { unqueued_jobs } + pub fn contains_job_root(&self, root: B256) -> bool { + if let SyncStatus::Syncing { jobs } = self { + return jobs.iter().any(|queue| queue.jobs.contains_key(&root)); + } + + false + } + + pub fn peer_for_job_root(&self, root: B256) -> Option { + if let SyncStatus::Syncing { jobs } = self { + for queue in jobs { + if let Some(job) = queue.jobs.get(&root) { + return Some(job.peer_id); + } + } + } + + None + } + + pub fn reset_timed_out_jobs(&mut self, timeout: Duration) -> Vec { + let mut timed_out_jobs = Vec::new(); + if let SyncStatus::Syncing { jobs } = self { + for queue in jobs { + for job in queue.jobs.values_mut() { + if job.has_been_requested + && let Some(time_requested) = job.time_requested + && time_requested.elapsed() >= timeout + { + timed_out_jobs.push(job.clone()); + job.has_been_requested = false; + job.time_requested = None; + } + } + } + } + timed_out_jobs + } + + pub fn request_latency_for_root(&self, root: B256) -> Option { + if let SyncStatus::Syncing { jobs } = self { + for queue in jobs { + if let Some(job) = queue.jobs.get(&root) + && let Some(requested_at) = job.time_requested + { + let now = Instant::now(); + return Some(now.saturating_duration_since(requested_at)); + } + } + } + + None + } + + pub fn has_job_for_peer(&self, peer_id: PeerId) -> bool { + if let SyncStatus::Syncing { jobs } = self { + for queue in jobs { + if queue.jobs.values().any(|job| job.peer_id == peer_id) { + return true; + } + } + } + + false + } + pub fn mark_job_as_requested(&mut self, root: B256) { if let SyncStatus::Syncing { jobs } = self { for queue in jobs.iter_mut() { @@ -371,4 +440,91 @@ mod tests { let unqueued_jobs_after = status.unqueued_jobs(); assert!(unqueued_jobs_after.is_empty()); } + + #[test] + fn test_contains_job_root() { + let mut status = SyncStatus::Syncing { jobs: Vec::new() }; + let peer_id = PeerId::random(); + let root = mock_root(7); + + status.add_new_job_queue( + Checkpoint { root, slot: 15 }, + JobRequest::new(peer_id, root), + false, + ); + + assert!(status.contains_job_root(root)); + assert!(!status.contains_job_root(mock_root(9))); + } + + #[test] + fn test_reset_timed_out_jobs() { + let mut status = SyncStatus::Syncing { jobs: Vec::new() }; + let peer_id = PeerId::random(); + let root = mock_root(1); + + status.add_new_job_queue( + Checkpoint { root, slot: 100 }, + JobRequest::new(peer_id, root), + false, + ); + status.mark_job_as_requested(root); + + let timed_out_jobs = status.reset_timed_out_jobs(Duration::from_millis(0)); + assert_eq!(timed_out_jobs.len(), 1); + assert_eq!(timed_out_jobs[0].root, root); + + let unqueued_jobs = status.unqueued_jobs(); + assert_eq!(unqueued_jobs.len(), 1); + assert!(!unqueued_jobs[0].has_been_requested); + } + + #[test] + fn test_peer_for_job_root() { + let mut status = SyncStatus::Syncing { jobs: Vec::new() }; + let peer_id = PeerId::random(); + let root = mock_root(42); + status.add_new_job_queue( + Checkpoint { root, slot: 10 }, + JobRequest::new(peer_id, root), + false, + ); + + assert_eq!(status.peer_for_job_root(root), Some(peer_id)); + assert_eq!(status.peer_for_job_root(mock_root(100)), None); + } + + #[test] + fn test_request_latency_for_root_after_mark_requested() { + let mut status = SyncStatus::Syncing { jobs: Vec::new() }; + let peer_id = PeerId::random(); + let root = mock_root(9); + status.add_new_job_queue( + Checkpoint { root, slot: 12 }, + JobRequest::new(peer_id, root), + false, + ); + + assert_eq!(status.request_latency_for_root(root), None); + + status.mark_job_as_requested(root); + let latency = status.request_latency_for_root(root); + assert!(latency.is_some()); + } + + #[test] + fn test_has_job_for_peer() { + let mut status = SyncStatus::Syncing { jobs: Vec::new() }; + let peer_id = PeerId::random(); + let other_peer = PeerId::random(); + let root = mock_root(77); + status.add_new_job_queue( + Checkpoint { root, slot: 1 }, + JobRequest::new(peer_id, root), + false, + ); + + assert!(status.has_job_for_peer(peer_id)); + assert!(!status.has_job_for_peer(other_peer)); + } } diff --git a/crates/common/chain/lean/src/sync/strategy.rs b/crates/common/chain/lean/src/sync/strategy.rs new file mode 100644 index 000000000..96a110666 --- /dev/null +++ b/crates/common/chain/lean/src/sync/strategy.rs @@ -0,0 +1,426 @@ +use std::{env, time::Duration}; + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum NearHeadBackfillStrategy { + RequestOnly, + GossipPreferred, +} + +impl NearHeadBackfillStrategy { + fn parse(value: &str) -> Option { + if value.eq_ignore_ascii_case("request-only") { + Some(Self::RequestOnly) + } else if value.eq_ignore_ascii_case("gossip-preferred") { + Some(Self::GossipPreferred) + } else { + None + } + } + + pub fn from_env() -> Self { + env::var("REAM_LEAN_BACKFILL_AB_STRATEGY") + .ok() + .as_deref() + .and_then(Self::parse) + .unwrap_or(Self::GossipPreferred) + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum NearHeadFanoutStrategy { + SinglePeer, + DualPeer, + DelayedHedge, +} + +impl NearHeadFanoutStrategy { + fn parse(value: &str) -> Option { + if value.eq_ignore_ascii_case("single-peer") { + Some(Self::SinglePeer) + } else if value.eq_ignore_ascii_case("dual-peer") { + Some(Self::DualPeer) + } else if value.eq_ignore_ascii_case("delayed-hedge") { + Some(Self::DelayedHedge) + } else { + None + } + } + + pub fn from_env() -> Self { + env::var("REAM_LEAN_FANOUT_AB_STRATEGY") + .ok() + .as_deref() + .and_then(Self::parse) + .unwrap_or(Self::SinglePeer) + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum HandoffStrategy { + Legacy, + RobustBridge, +} + +impl HandoffStrategy { + fn parse(value: &str) -> Option { + if value.eq_ignore_ascii_case("legacy") { + Some(Self::Legacy) + } else if value.eq_ignore_ascii_case("robust-bridge") { + Some(Self::RobustBridge) + } else { + None + } + } + + pub fn from_env() -> Self { + env::var("REAM_LEAN_HANDOFF_AB_STRATEGY") + .ok() + .as_deref() + .and_then(Self::parse) + .unwrap_or(Self::RobustBridge) + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum BackfillTimeoutStrategy { + Fixed, + AdaptiveGap, +} + +impl BackfillTimeoutStrategy { + const FIXED_TIMEOUT: Duration = Duration::from_secs(2); + + fn parse(value: &str) -> Option { + if value.eq_ignore_ascii_case("fixed") { + Some(Self::Fixed) + } else if value.eq_ignore_ascii_case("adaptive-gap") { + Some(Self::AdaptiveGap) + } else { + None + } + } + + pub fn from_env() -> Self { + env::var("REAM_LEAN_TIMEOUT_AB_STRATEGY") + .ok() + .as_deref() + .and_then(Self::parse) + .unwrap_or(Self::AdaptiveGap) + } + + pub fn timeout_for_peer_gap(self, peer_gap_slots: u64) -> Duration { + match self { + Self::Fixed => Self::FIXED_TIMEOUT, + // Near-head: retry faster. Far from head: allow slower peers more time. + Self::AdaptiveGap => { + if peer_gap_slots <= 2 { + Duration::from_millis(750) + } else if peer_gap_slots <= 8 { + Self::FIXED_TIMEOUT + } else { + Duration::from_secs(4) + } + } + } + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum PendingRequestDedupStrategy { + Legacy, + Dedup, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum PeerSelectionStrategy { + ScoreOnly, + LatencyWeighted, +} + +impl PeerSelectionStrategy { + fn parse(value: &str) -> Option { + if value.eq_ignore_ascii_case("score-only") { + Some(Self::ScoreOnly) + } else if value.eq_ignore_ascii_case("latency-weighted") { + Some(Self::LatencyWeighted) + } else { + None + } + } + + pub fn from_env() -> Self { + env::var("REAM_LEAN_PEER_SELECT_AB_STRATEGY") + .ok() + .as_deref() + .and_then(Self::parse) + .unwrap_or(Self::LatencyWeighted) + } +} + +impl PendingRequestDedupStrategy { + fn parse(value: &str) -> Option { + if value.eq_ignore_ascii_case("legacy") { + Some(Self::Legacy) + } else if value.eq_ignore_ascii_case("dedup") { + Some(Self::Dedup) + } else { + None + } + } + + pub fn from_env() -> Self { + env::var("REAM_LEAN_PENDING_DEDUP_AB_STRATEGY") + .ok() + .as_deref() + .and_then(Self::parse) + .unwrap_or(Self::Dedup) + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct HandoffInputs { + pub is_behind_peers: bool, + pub is_behind_finality: bool, + pub has_pending_backfill_work: bool, + pub has_near_head_bridge: bool, + pub has_active_backfill_jobs: bool, + pub has_inflight_backfill_requests: bool, +} + +pub fn should_switch_to_synced(strategy: HandoffStrategy, inputs: HandoffInputs) -> bool { + if inputs.is_behind_peers || inputs.is_behind_finality { + return false; + } + + match strategy { + HandoffStrategy::Legacy => true, + HandoffStrategy::RobustBridge => { + if !inputs.has_pending_backfill_work { + return true; + } + + // Allow bridge-based handoff only when remaining work is passive bookkeeping. + inputs.has_near_head_bridge + && !inputs.has_active_backfill_jobs + && !inputs.has_inflight_backfill_requests + } + } +} + +pub fn should_fanout_near_head( + strategy: NearHeadFanoutStrategy, + peer_gap_slots: u64, + max_near_head_gap_slots: u64, +) -> bool { + matches!( + strategy, + NearHeadFanoutStrategy::DualPeer | NearHeadFanoutStrategy::DelayedHedge + ) && peer_gap_slots > 0 + && peer_gap_slots <= max_near_head_gap_slots +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn handoff_strategy_legacy_ignores_pending_backfill_work() { + assert!(should_switch_to_synced( + HandoffStrategy::Legacy, + HandoffInputs { + is_behind_peers: false, + is_behind_finality: false, + has_pending_backfill_work: true, + has_near_head_bridge: false, + has_active_backfill_jobs: true, + has_inflight_backfill_requests: true, + }, + )); + } + + #[test] + fn handoff_strategy_robust_requires_bridge_and_no_active_work() { + assert!(!should_switch_to_synced( + HandoffStrategy::RobustBridge, + HandoffInputs { + is_behind_peers: false, + is_behind_finality: false, + has_pending_backfill_work: true, + has_near_head_bridge: false, + has_active_backfill_jobs: true, + has_inflight_backfill_requests: true, + }, + )); + + assert!(!should_switch_to_synced( + HandoffStrategy::RobustBridge, + HandoffInputs { + is_behind_peers: false, + is_behind_finality: false, + has_pending_backfill_work: true, + has_near_head_bridge: true, + has_active_backfill_jobs: true, + has_inflight_backfill_requests: false, + }, + )); + + assert!(!should_switch_to_synced( + HandoffStrategy::RobustBridge, + HandoffInputs { + is_behind_peers: false, + is_behind_finality: false, + has_pending_backfill_work: true, + has_near_head_bridge: true, + has_active_backfill_jobs: false, + has_inflight_backfill_requests: true, + }, + )); + + assert!(should_switch_to_synced( + HandoffStrategy::RobustBridge, + HandoffInputs { + is_behind_peers: false, + is_behind_finality: false, + has_pending_backfill_work: true, + has_near_head_bridge: true, + has_active_backfill_jobs: false, + has_inflight_backfill_requests: false, + }, + )); + } + + #[test] + fn handoff_stays_syncing_when_behind_peers() { + assert!(!should_switch_to_synced( + HandoffStrategy::Legacy, + HandoffInputs { + is_behind_peers: true, + is_behind_finality: false, + has_pending_backfill_work: false, + has_near_head_bridge: true, + has_active_backfill_jobs: false, + has_inflight_backfill_requests: false, + }, + )); + } + + #[test] + fn handoff_stays_syncing_when_finality_is_far_behind() { + assert!(!should_switch_to_synced( + HandoffStrategy::RobustBridge, + HandoffInputs { + is_behind_peers: false, + is_behind_finality: true, + has_pending_backfill_work: false, + has_near_head_bridge: true, + has_active_backfill_jobs: false, + has_inflight_backfill_requests: false, + }, + )); + } + + #[test] + fn strategy_parsing_supports_ab_variants() { + assert_eq!( + NearHeadBackfillStrategy::parse("request-only"), + Some(NearHeadBackfillStrategy::RequestOnly) + ); + assert_eq!( + NearHeadBackfillStrategy::parse("gossip-preferred"), + Some(NearHeadBackfillStrategy::GossipPreferred) + ); + assert_eq!( + NearHeadFanoutStrategy::parse("single-peer"), + Some(NearHeadFanoutStrategy::SinglePeer) + ); + assert_eq!( + NearHeadFanoutStrategy::parse("dual-peer"), + Some(NearHeadFanoutStrategy::DualPeer) + ); + assert_eq!( + NearHeadFanoutStrategy::parse("delayed-hedge"), + Some(NearHeadFanoutStrategy::DelayedHedge) + ); + assert_eq!( + HandoffStrategy::parse("legacy"), + Some(HandoffStrategy::Legacy) + ); + assert_eq!( + HandoffStrategy::parse("robust-bridge"), + Some(HandoffStrategy::RobustBridge) + ); + assert_eq!( + BackfillTimeoutStrategy::parse("fixed"), + Some(BackfillTimeoutStrategy::Fixed) + ); + assert_eq!( + BackfillTimeoutStrategy::parse("adaptive-gap"), + Some(BackfillTimeoutStrategy::AdaptiveGap) + ); + assert_eq!( + PendingRequestDedupStrategy::parse("legacy"), + Some(PendingRequestDedupStrategy::Legacy) + ); + assert_eq!( + PendingRequestDedupStrategy::parse("dedup"), + Some(PendingRequestDedupStrategy::Dedup) + ); + assert_eq!( + PeerSelectionStrategy::parse("score-only"), + Some(PeerSelectionStrategy::ScoreOnly) + ); + assert_eq!( + PeerSelectionStrategy::parse("latency-weighted"), + Some(PeerSelectionStrategy::LatencyWeighted) + ); + } + + #[test] + fn adaptive_timeout_ab_behaves_as_expected() { + assert_eq!( + BackfillTimeoutStrategy::Fixed.timeout_for_peer_gap(1), + Duration::from_secs(2) + ); + assert_eq!( + BackfillTimeoutStrategy::AdaptiveGap.timeout_for_peer_gap(1), + Duration::from_millis(750) + ); + assert_eq!( + BackfillTimeoutStrategy::AdaptiveGap.timeout_for_peer_gap(6), + Duration::from_secs(2) + ); + assert_eq!( + BackfillTimeoutStrategy::AdaptiveGap.timeout_for_peer_gap(30), + Duration::from_secs(4) + ); + } + + #[test] + fn near_head_fanout_ab_behaves_as_expected() { + assert!(!should_fanout_near_head( + NearHeadFanoutStrategy::SinglePeer, + 1, + 4 + )); + assert!(should_fanout_near_head( + NearHeadFanoutStrategy::DualPeer, + 1, + 4 + )); + assert!(!should_fanout_near_head( + NearHeadFanoutStrategy::DualPeer, + 0, + 4 + )); + assert!(!should_fanout_near_head( + NearHeadFanoutStrategy::DualPeer, + 9, + 4 + )); + assert!(should_fanout_near_head( + NearHeadFanoutStrategy::DelayedHedge, + 2, + 4 + )); + } +} diff --git a/crates/networking/network_state/lean/src/lib.rs b/crates/networking/network_state/lean/src/lib.rs index b69ffad31..da1d77ab9 100644 --- a/crates/networking/network_state/lean/src/lib.rs +++ b/crates/networking/network_state/lean/src/lib.rs @@ -97,6 +97,23 @@ impl NetworkState { .map(|(checkpoint, _)| checkpoint) } + pub fn common_highest_finalized_checkpoint(&self) -> Option { + let peer_table = self.peer_table.lock(); + let mut checkpoint_tally: HashMap = HashMap::new(); + for peer in peer_table.values() { + if let (ConnectionState::Connected, Some(finalized_checkpoint)) = + (&peer.state, &peer.finalized_checkpoint) + { + *checkpoint_tally.entry(*finalized_checkpoint).or_insert(0) += 1; + } + } + + checkpoint_tally + .into_iter() + .max_by_key(|(checkpoint, tally)| (*tally, checkpoint.slot)) + .map(|(checkpoint, _)| checkpoint) + } + pub fn successful_response_from_peer(&self, peer_id: PeerId) { if let Some(cached_peer) = self.peer_table.lock().get_mut(&peer_id) { cached_peer.peer_score = cached_peer.peer_score.saturating_add(10);