diff --git a/beacon_node/lighthouse_network/src/service/api_types.rs b/beacon_node/lighthouse_network/src/service/api_types.rs
index 800d988d1a9..e69c7aa5f78 100644
--- a/beacon_node/lighthouse_network/src/service/api_types.rs
+++ b/beacon_node/lighthouse_network/src/service/api_types.rs
@@ -1,16 +1,15 @@
-use std::sync::Arc;
-
+use crate::rpc::{
+    methods::{ResponseTermination, RpcResponse, RpcSuccessResponse, StatusMessage},
+    SubstreamId,
+};
 use libp2p::swarm::ConnectionId;
+use std::fmt::{Display, Formatter};
+use std::sync::Arc;
 use types::{
     BlobSidecar, DataColumnSidecar, Epoch, EthSpec, Hash256, LightClientBootstrap,
     LightClientFinalityUpdate, LightClientOptimisticUpdate, LightClientUpdate, SignedBeaconBlock,
 };

-use crate::rpc::{
-    methods::{ResponseTermination, RpcResponse, RpcSuccessResponse, StatusMessage},
-    SubstreamId,
-};
-
 /// Identifier of requests sent by a peer.
 pub type PeerRequestId = (ConnectionId, SubstreamId);

@@ -235,9 +234,108 @@ impl slog::Value for RequestId {
     }
 }

-// This custom impl reduces log boilerplate not printing `DataColumnsByRootRequestId` on each id log
-impl std::fmt::Display for DataColumnsByRootRequestId {
-    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        write!(f, "{} {:?}", self.id, self.requester)
+macro_rules! impl_display {
+    ($structname: ty, $format: literal, $($field:ident),*) => {
+        impl Display for $structname {
+            fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+                write!(f, $format, $(self.$field,)*)
+            }
+        }
+    };
+}
+
+// Since each request Id is deeply nested with various types, rendering them with Debug in logs
+// takes too much visual space. These custom Display implementations keep the overall Id short
+// while not losing information.
+impl_display!(BlocksByRangeRequestId, "{}/{}", id, parent_request_id);
+impl_display!(BlobsByRangeRequestId, "{}/{}", id, parent_request_id);
+impl_display!(DataColumnsByRangeRequestId, "{}/{}", id, parent_request_id);
+impl_display!(ComponentsByRangeRequestId, "{}/{}", id, requester);
+impl_display!(DataColumnsByRootRequestId, "{}/{}", id, requester);
+impl_display!(SingleLookupReqId, "{}/Lookup/{}", req_id, lookup_id);
+impl_display!(CustodyId, "{}", requester);
+impl_display!(SamplingId, "{}/{}", sampling_request_id, id);
+
+impl Display for DataColumnsByRootRequester {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        match self {
+            Self::Custody(id) => write!(f, "Custody/{id}"),
+            Self::Sampling(id) => write!(f, "Sampling/{id}"),
+        }
+    }
+}
+
+impl Display for CustodyRequester {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        write!(f, "{}", self.0)
+    }
+}
+
+impl Display for RangeRequestId {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        match self {
+            Self::RangeSync { chain_id, batch_id } => write!(f, "RangeSync/{batch_id}/{chain_id}"),
+            Self::BackfillSync { batch_id } => write!(f, "BackfillSync/{batch_id}"),
+        }
+    }
+}
+
+impl Display for SamplingRequestId {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        write!(f, "{}", self.0)
+    }
+}
+
+impl Display for SamplingRequester {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        match self {
+            Self::ImportedBlock(block) => write!(f, "ImportedBlock/{block}"),
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn display_id_data_columns_by_root_custody() {
+        let id = DataColumnsByRootRequestId {
+            id: 123,
+            requester: DataColumnsByRootRequester::Custody(CustodyId {
+                requester: CustodyRequester(SingleLookupReqId {
+                    req_id: 121,
+                    lookup_id: 101,
+                }),
+            }),
+        };
+        assert_eq!(format!("{id}"), "123/Custody/121/Lookup/101");
+    }
+
+    #[test]
+    fn display_id_data_columns_by_root_sampling() {
+        let id = DataColumnsByRootRequestId {
+            id: 123,
+            requester: DataColumnsByRootRequester::Sampling(SamplingId {
+                id: SamplingRequester::ImportedBlock(Hash256::ZERO),
+                sampling_request_id: SamplingRequestId(101),
+            }),
+        };
+        assert_eq!(format!("{id}"), "123/Sampling/101/ImportedBlock/0x0000000000000000000000000000000000000000000000000000000000000000");
+    }
+
+    #[test]
+    fn display_id_data_columns_by_range() {
+        let id = DataColumnsByRangeRequestId {
+            id: 123,
+            parent_request_id: ComponentsByRangeRequestId {
+                id: 122,
+                requester: RangeRequestId::RangeSync {
+                    chain_id: 54,
+                    batch_id: Epoch::new(0),
+                },
+            },
+        };
+        assert_eq!(format!("{id}"), "123/122/RangeSync/0/54");
     }
 }
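As a rough illustration of the `impl_display!` macro above, here is a self-contained sketch (using a stand-in struct, not code from this patch) of what a single invocation expands to; the format string and field names mirror `impl_display!(SingleLookupReqId, "{}/Lookup/{}", req_id, lookup_id)`:

use std::fmt::{Display, Formatter};

// Stand-in with the same fields as `SingleLookupReqId`; the real type lives in api_types.rs.
struct LookupReqId {
    req_id: u32,
    lookup_id: u32,
}

// Roughly the code the macro generates for this type.
impl Display for LookupReqId {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}/Lookup/{}", self.req_id, self.lookup_id)
    }
}

fn main() {
    let id = LookupReqId { req_id: 121, lookup_id: 101 };
    // Renders as "121/Lookup/101", the middle segment of the "123/Custody/121/Lookup/101"
    // string asserted in the tests above.
    assert_eq!(id.to_string(), "121/Lookup/101");
}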
diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs
index 0cd21de7f41..b03a446add2 100644
--- a/beacon_node/network/src/sync/network_context.rs
+++ b/beacon_node/network/src/sync/network_context.rs
@@ -37,6 +37,7 @@ use requests::{
 use slog::{debug, error, warn};
 use std::collections::hash_map::Entry;
 use std::collections::{HashMap, HashSet};
+use std::fmt::Debug;
 use std::sync::Arc;
 use std::time::Duration;
 use tokio::sync::mpsc;
@@ -535,17 +536,10 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
             }
         }

-        let req_id = self.next_id();
-        let id = SingleLookupReqId { lookup_id, req_id };
-
-        debug!(
-            self.log,
-            "Sending BlocksByRoot Request";
-            "method" => "BlocksByRoot",
-            "block_root" => ?block_root,
-            "peer" => %peer_id,
-            "id" => ?id
-        );
+        let id = SingleLookupReqId {
+            lookup_id,
+            req_id: self.next_id(),
+        };

         let request = BlocksByRootSingleRequest(block_root);

@@ -563,6 +557,15 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
             })
             .map_err(|_| RpcRequestSendError::NetworkSendError)?;

+        debug!(
+            self.log,
+            "Sync RPC request sent";
+            "method" => "BlocksByRoot",
+            "block_root" => ?block_root,
+            "peer" => %peer_id,
+            "id" => %id
+        );
+
         self.blocks_by_root_requests.insert(
             id,
             peer_id,
@@ -572,7 +575,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
             BlocksByRootRequestItems::new(request),
         );

-        Ok(LookupRequestResult::RequestSent(req_id))
+        Ok(LookupRequestResult::RequestSent(id.req_id))
     }

     /// Request necessary blobs for `block_root`. Requests only the necessary blobs by checking:
@@ -618,22 +621,14 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
             return Ok(LookupRequestResult::NoRequestNeeded("no indices to fetch"));
         }

-        let req_id = self.next_id();
-        let id = SingleLookupReqId { lookup_id, req_id };
-
-        debug!(
-            self.log,
-            "Sending BlobsByRoot Request";
-            "method" => "BlobsByRoot",
-            "block_root" => ?block_root,
-            "blob_indices" => ?indices,
-            "peer" => %peer_id,
-            "id" => ?id
-        );
+        let id = SingleLookupReqId {
+            lookup_id,
+            req_id: self.next_id(),
+        };

         let request = BlobsByRootSingleBlockRequest {
             block_root,
-            indices,
+            indices: indices.clone(),
         };

         // Lookup sync event safety: Refer to `Self::block_lookup_request` `network_send.send` call
@@ -645,6 +640,16 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
             })
             .map_err(|_| RpcRequestSendError::NetworkSendError)?;

+        debug!(
+            self.log,
+            "Sync RPC request sent";
+            "method" => "BlobsByRoot",
+            "block_root" => ?block_root,
+            "blob_indices" => ?indices,
+            "peer" => %peer_id,
+            "id" => %id
+        );
+
         self.blobs_by_root_requests.insert(
             id,
             peer_id,
@@ -655,7 +660,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
             BlobsByRootRequestItems::new(request),
         );

-        Ok(LookupRequestResult::RequestSent(req_id))
+        Ok(LookupRequestResult::RequestSent(id.req_id))
     }

     /// Request to send a single `data_columns_by_root` request to the network.
@@ -666,35 +671,35 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
         request: DataColumnsByRootSingleBlockRequest,
         expect_max_responses: bool,
     ) -> Result<LookupRequestResult<DataColumnsByRootRequestId>, &'static str> {
-        let req_id = DataColumnsByRootRequestId {
+        let id = DataColumnsByRootRequestId {
             id: self.next_id(),
             requester,
         };
+
+        self.send_network_msg(NetworkMessage::SendRequest {
+            peer_id,
+            request: RequestType::DataColumnsByRoot(request.clone().into_request(&self.chain.spec)),
+            request_id: AppRequestId::Sync(SyncRequestId::DataColumnsByRoot(id)),
+        })?;
+
         debug!(
             self.log,
-            "Sending DataColumnsByRoot Request";
+            "Sync RPC request sent";
             "method" => "DataColumnsByRoot",
             "block_root" => ?request.block_root,
             "indices" => ?request.indices,
             "peer" => %peer_id,
-            "requester" => ?requester,
-            "req_id" => %req_id,
+            "id" => %id,
         );

-        self.send_network_msg(NetworkMessage::SendRequest {
-            peer_id,
-            request: RequestType::DataColumnsByRoot(request.clone().into_request(&self.chain.spec)),
-            request_id: AppRequestId::Sync(SyncRequestId::DataColumnsByRoot(req_id)),
-        })?;
-
         self.data_columns_by_root_requests.insert(
-            req_id,
+            id,
             peer_id,
             expect_max_responses,
             DataColumnsByRootRequestItems::new(request),
         );

-        Ok(LookupRequestResult::RequestSent(req_id))
+        Ok(LookupRequestResult::RequestSent(id))
     }

     /// Request to fetch all needed custody columns of a specific block. This function may not send
@@ -727,15 +732,17 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
             return Ok(LookupRequestResult::NoRequestNeeded("no indices to fetch"));
         }

-        let req_id = self.next_id();
-        let id = SingleLookupReqId { lookup_id, req_id };
+        let id = SingleLookupReqId {
+            lookup_id,
+            req_id: self.next_id(),
+        };

         debug!(
             self.log,
             "Starting custody columns request";
             "block_root" => ?block_root,
             "indices" => ?custody_indexes_to_fetch,
-            "id" => ?id
+            "id" => %id
         );

         let requester = CustodyRequester(id);
@@ -754,7 +761,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
                 // created cannot return data immediately, it must send some request to the network
                 // first. And there must exist some request, `custody_indexes_to_fetch` is not empty.
                 self.custody_by_root_requests.insert(requester, request);
-                Ok(LookupRequestResult::RequestSent(req_id))
+                Ok(LookupRequestResult::RequestSent(id.req_id))
             }
             Err(e) => Err(RpcRequestSendError::CustodyRequestError(e)),
         }
@@ -770,15 +777,6 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
             id: self.next_id(),
             parent_request_id,
         };
-        debug!(
-            self.log,
-            "Sending BlocksByRange request";
-            "method" => "BlocksByRange",
-            "count" => request.count(),
-            "epoch" => Slot::new(*request.start_slot()).epoch(T::EthSpec::slots_per_epoch()),
-            "peer" => %peer_id,
-            "id" => ?id,
-        );
         self.network_send
             .send(NetworkMessage::SendRequest {
                 peer_id,
@@ -787,6 +785,16 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
             })
             .map_err(|_| RpcRequestSendError::NetworkSendError)?;

+        debug!(
+            self.log,
+            "Sync RPC request sent";
+            "method" => "BlocksByRange",
+            "slots" => request.count(),
+            "epoch" => Slot::new(*request.start_slot()).epoch(T::EthSpec::slots_per_epoch()),
+            "peer" => %peer_id,
+            "id" => %id,
+        );
+
         self.blocks_by_range_requests.insert(
             id,
             peer_id,
@@ -809,15 +817,6 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
             parent_request_id,
         };
         let request_epoch = Slot::new(request.start_slot).epoch(T::EthSpec::slots_per_epoch());
-        debug!(
-            self.log,
-            "Sending BlobsByRange requests";
-            "method" => "BlobsByRange",
-            "count" => request.count,
-            "epoch" => request_epoch,
-            "peer" => %peer_id,
-            "id" => ?id,
-        );

         // Create the blob request based on the blocks request.
         self.network_send
@@ -828,6 +827,16 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
             })
             .map_err(|_| RpcRequestSendError::NetworkSendError)?;

+        debug!(
+            self.log,
+            "Sync RPC request sent";
+            "method" => "BlobsByRange",
+            "slots" => request.count,
+            "epoch" => request_epoch,
+            "peer" => %peer_id,
+            "id" => %id,
+        );
+
         let max_blobs_per_block = self.chain.spec.max_blobs_per_block(request_epoch);
         self.blobs_by_range_requests.insert(
             id,
@@ -850,16 +859,6 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
             id: self.next_id(),
             parent_request_id,
         };
-        debug!(
-            self.log,
-            "Sending DataColumnsByRange requests";
-            "method" => "DataColumnsByRange",
-            "count" => request.count,
-            "epoch" => Slot::new(request.start_slot).epoch(T::EthSpec::slots_per_epoch()),
-            "columns" => ?request.columns,
-            "peer" => %peer_id,
-            "id" => ?id,
-        );

         self.send_network_msg(NetworkMessage::SendRequest {
             peer_id,
@@ -868,6 +867,17 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
         })
         .map_err(|_| RpcRequestSendError::NetworkSendError)?;

+        debug!(
+            self.log,
+            "Sync RPC request sent";
+            "method" => "DataColumnsByRange",
+            "slots" => request.count,
+            "epoch" => Slot::new(request.start_slot).epoch(T::EthSpec::slots_per_epoch()),
+            "columns" => ?request.columns,
+            "peer" => %peer_id,
+            "id" => %id,
+        );
+
         self.data_columns_by_range_requests.insert(
             id,
             peer_id,
@@ -1011,8 +1021,8 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
         peer_id: PeerId,
         rpc_event: RpcEvent<Arc<SignedBeaconBlock<T::EthSpec>>>,
     ) -> Option<RpcResponseResult<Arc<SignedBeaconBlock<T::EthSpec>>>> {
-        let response = self.blocks_by_root_requests.on_response(id, rpc_event);
-        let response = response.map(|res| {
+        let resp = self.blocks_by_root_requests.on_response(id, rpc_event);
+        let resp = resp.map(|res| {
             res.and_then(|(mut blocks, seen_timestamp)| {
                 // Enforce that exactly one chunk = one block is returned. ReqResp behavior limits the
                 // response count to at most 1.
@@ -1024,10 +1034,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
                 }
             })
         });
-        if let Some(Err(RpcResponseError::VerifyError(e))) = &response {
-            self.report_peer(peer_id, PeerAction::LowToleranceError, e.into());
-        }
-        response
+        self.on_rpc_response_result(id, "BlocksByRoot", resp, peer_id, |_| 1)
     }

     pub(crate) fn on_single_blob_response(
@@ -1036,8 +1043,8 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
         peer_id: PeerId,
         rpc_event: RpcEvent<Arc<BlobSidecar<T::EthSpec>>>,
     ) -> Option<RpcResponseResult<FixedBlobSidecarList<T::EthSpec>>> {
-        let response = self.blobs_by_root_requests.on_response(id, rpc_event);
-        let response = response.map(|res| {
+        let resp = self.blobs_by_root_requests.on_response(id, rpc_event);
+        let resp = resp.map(|res| {
             res.and_then(|(blobs, seen_timestamp)| {
                 if let Some(max_len) = blobs
                     .first()
@@ -1056,10 +1063,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
                 }
             })
         });
-        if let Some(Err(RpcResponseError::VerifyError(e))) = &response {
-            self.report_peer(peer_id, PeerAction::LowToleranceError, e.into());
-        }
-        response
+        self.on_rpc_response_result(id, "BlobsByRoot", resp, peer_id, |_| 1)
     }

     #[allow(clippy::type_complexity)]
@@ -1072,7 +1076,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
         let resp = self
             .data_columns_by_root_requests
             .on_response(id, rpc_event);
-        self.report_rpc_response_errors(resp, peer_id)
+        self.on_rpc_response_result(id, "DataColumnsByRoot", resp, peer_id, |_| 1)
     }

     #[allow(clippy::type_complexity)]
@@ -1083,7 +1087,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
         rpc_event: RpcEvent<Arc<SignedBeaconBlock<T::EthSpec>>>,
     ) -> Option<RpcResponseResult<Vec<Arc<SignedBeaconBlock<T::EthSpec>>>>> {
         let resp = self.blocks_by_range_requests.on_response(id, rpc_event);
-        self.report_rpc_response_errors(resp, peer_id)
+        self.on_rpc_response_result(id, "BlocksByRange", resp, peer_id, |b| b.len())
     }

     #[allow(clippy::type_complexity)]
@@ -1094,7 +1098,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
         rpc_event: RpcEvent<Arc<BlobSidecar<T::EthSpec>>>,
     ) -> Option<RpcResponseResult<Vec<Arc<BlobSidecar<T::EthSpec>>>>> {
         let resp = self.blobs_by_range_requests.on_response(id, rpc_event);
-        self.report_rpc_response_errors(resp, peer_id)
+        self.on_rpc_response_result(id, "BlobsByRange", resp, peer_id, |b| b.len())
     }

     #[allow(clippy::type_complexity)]
@@ -1107,14 +1111,38 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
         let resp = self
             .data_columns_by_range_requests
             .on_response(id, rpc_event);
-        self.report_rpc_response_errors(resp, peer_id)
+        self.on_rpc_response_result(id, "DataColumnsByRange", resp, peer_id, |d| d.len())
     }

-    fn report_rpc_response_errors<T>(
+    fn on_rpc_response_result<I: Display, T, F: Fn(&T) -> usize>(
         &mut self,
+        id: I,
+        method: &'static str,
         resp: Option<RpcResponseResult<T>>,
         peer_id: PeerId,
+        get_count: F,
     ) -> Option<RpcResponseResult<T>> {
+        match &resp {
+            None => {}
+            Some(Ok((v, _))) => {
+                debug!(
+                    self.log,
+                    "Sync RPC request completed";
+                    "id" => %id,
+                    "method" => method,
+                    "count" => get_count(v)
+                );
+            }
+            Some(Err(e)) => {
+                debug!(
+                    self.log,
+                    "Sync RPC request error";
+                    "id" => %id,
+                    "method" => method,
+                    "error" => ?e
+                );
+            }
+        }
         if let Some(Err(RpcResponseError::VerifyError(e))) = &resp {
             self.report_peer(peer_id, PeerAction::LowToleranceError, e.into());
         }
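For context on the response side, below is a minimal, self-contained sketch of the logging pattern that `on_rpc_response_result` introduces, with `slog` and the Lighthouse request plumbing replaced by stand-ins (plain `println!`, a local `RpcResponseResult` alias, and no peer reporting); it only illustrates how the completed/error branches and the `get_count` closure fit together and is not the crate's actual API:

use std::fmt::Display;
use std::time::Duration;

// Stand-in for the crate's RpcResponseError; only its Debug formatting matters here.
#[derive(Debug)]
enum RpcResponseError {
    VerifyError(String),
}

// Stand-in alias mirroring the `(value, seen_timestamp)` tuple matched as `Some(Ok((v, _)))` above.
type RpcResponseResult<T> = Result<(T, Duration), RpcResponseError>;

// Log the outcome of a sync RPC request and pass the result through unchanged.
fn on_rpc_response_result<I: Display, T, F: Fn(&T) -> usize>(
    id: I,
    method: &'static str,
    resp: Option<RpcResponseResult<T>>,
    get_count: F,
) -> Option<RpcResponseResult<T>> {
    match &resp {
        // The request stream is not finished yet: nothing to log.
        None => {}
        Some(Ok((v, _))) => {
            println!("Sync RPC request completed id={id} method={method} count={}", get_count(v))
        }
        Some(Err(e)) => println!("Sync RPC request error id={id} method={method} error={e:?}"),
    }
    resp
}

fn main() {
    // A completed range response logs the number of returned chunks via the closure.
    let blocks = vec!["block_a", "block_b"];
    on_rpc_response_result("123/122/RangeSync/0/54", "BlocksByRange", Some(Ok((blocks, Duration::ZERO))), |b| b.len());

    // A failed by-root response is logged with the error's Debug representation.
    let err = RpcResponseError::VerifyError("bad inclusion proof".into());
    on_rpc_response_result::<_, Vec<&str>, _>("124/Lookup/7", "BlobsByRoot", Some(Err(err)), |b| b.len());
}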