Skip to content

Commit

Permalink
Fix tests
Browse files Browse the repository at this point in the history
  • Loading branch information
dapplion committed Apr 13, 2024
1 parent c6ac5d3 commit 38d21f0
Show file tree
Hide file tree
Showing 7 changed files with 301 additions and 344 deletions.
24 changes: 2 additions & 22 deletions beacon_node/network/src/sync/block_lookups/common.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::sync::block_lookups::parent_lookup::PARENT_FAIL_TOLERANCE;
use crate::sync::block_lookups::single_block_lookup::{
LookupRequestError, LookupVerifyError, SingleBlockLookup, SingleLookupRequestState, State,
LookupRequestError, SingleBlockLookup, SingleLookupRequestState, State,
};
use crate::sync::block_lookups::{
BlobRequestState, BlockLookups, BlockRequestState, PeerId, SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS,
Expand Down Expand Up @@ -158,26 +158,6 @@ pub trait RequestState<L: Lookup, T: BeaconChainTypes> {

/* Response handling methods */

/// Check a response against the current state of this request.
///
/// A response is accepted only while the request is in `State::Downloading`; it is then
/// handed back unchanged as `Ok(response)`.
///
/// # Errors
///
/// Returns `LookupVerifyError::ExtraBlocksReturned` when the request is in
/// `State::AwaitingDownload` or `State::Processing`. Before returning the error,
/// `register_failure_downloading()` is called on the request state.
fn verify_response(
    &mut self,
    response: Self::VerifiedResponseType,
) -> Result<Self::VerifiedResponseType, LookupVerifyError> {
    let request_state = self.get_state_mut();
    match request_state.state {
        // The peer id carried by the state is not needed here, so it is not bound.
        State::Downloading { .. } => Ok(response),
        // Not in the `Downloading` state: the response is treated as an extra block.
        // For `Processing`, we already sent the block for processing and received
        // another one on top of it.
        State::AwaitingDownload | State::Processing { .. } => {
            request_state.register_failure_downloading();
            Err(LookupVerifyError::ExtraBlocksReturned)
        }
    }
}

/// A getter for the parent root of the response. Returns an `Option` because we won't know
/// the blob parent if we don't end up getting any blobs in the response.
fn get_parent_root(verified_response: &Self::VerifiedResponseType) -> Option<Hash256>;
Expand Down Expand Up @@ -231,7 +211,7 @@ impl<L: Lookup, T: BeaconChainTypes> RequestState<L, T> for BlockRequestState<L>
type ReconstructedResponseType = RpcBlock<T::EthSpec>;

fn new_request(&self) -> Self::RequestType {
self.requested_block_root
BlocksByRootSingleRequest(self.requested_block_root)
}

fn make_request(
Expand Down
201 changes: 92 additions & 109 deletions beacon_node/network/src/sync/block_lookups/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use self::parent_lookup::ParentVerifyError;
use self::single_block_lookup::SingleBlockLookup;
use super::manager::BlockProcessingResult;
use super::BatchProcessResult;
Expand Down Expand Up @@ -315,7 +314,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
let id = lookup_id.id;
let response_type = R::response_type();

let Some(lookup) = self.get_single_lookup::<R>(lookup_id) else {
let Some(mut lookup) = self.get_single_lookup::<R>(lookup_id) else {
// We don't have the ability to cancel in-flight RPC requests. So this can happen
// if we started this RPC request, and later saw the block/blobs via gossip.
debug!(
Expand All @@ -335,9 +334,14 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
"response_type" => ?response_type,
);

match self.single_lookup_response_inner::<R>(peer_id, response, seen_timestamp, cx, lookup)
{
Ok(lookup) => {
match self.handle_verified_response::<Current, R>(
seen_timestamp,
cx,
BlockProcessType::SingleBlock { id: lookup.id },
response,
&mut lookup,
) {
Ok(_) => {
self.single_block_lookups.insert(id, lookup);
}
Err(e) => {
Expand All @@ -357,48 +361,6 @@ impl<T: BeaconChainTypes> BlockLookups<T> {

/// Shared response path for `single_lookup_response`, so that every failure surfaces
/// through one `Result`. An `Err` here should always mean the lookup is dropped.
///
/// On success the (possibly updated) `lookup` is handed back to the caller.
///
/// # Errors
///
/// Propagates the `LookupRequestError` from `handle_verified_response` or from
/// `request_block_and_blobs` when the retry cannot be issued.
fn single_lookup_response_inner<R: RequestState<Current, T>>(
    &self,
    peer_id: PeerId,
    response: R::VerifiedResponseType,
    seen_timestamp: Duration,
    cx: &mut SyncNetworkContext<T>,
    mut lookup: SingleBlockLookup<Current, T>,
) -> Result<SingleBlockLookup<Current, T>, LookupRequestError> {
    let response_type = R::response_type();
    let log = self.log.clone();
    let expected_block_root = lookup.block_root();
    let request_state = R::request_state_mut(&mut lookup);

    let verified_response = match request_state.verify_response(response) {
        Ok(verified) => verified,
        Err(e) => {
            // Verification failed: log, penalize the peer, and retry the download.
            debug!(
                log,
                "Single lookup response verification failed, retrying";
                "block_root" => ?expected_block_root,
                "peer_id" => %peer_id,
                "response_type" => ?response_type,
                "error" => ?e
            );
            cx.report_peer(peer_id, PeerAction::LowToleranceError, e.into());

            // NOTE(review): `verify_response` appears to register the download failure
            // itself before returning an error — confirm this second registration is
            // intended.
            request_state.register_failure_downloading();
            lookup.request_block_and_blobs(cx)?;
            return Ok(lookup);
        }
    };

    self.handle_verified_response::<Current, R>(
        seen_timestamp,
        cx,
        BlockProcessType::SingleBlock { id: lookup.id },
        verified_response,
        &mut lookup,
    )?;

    Ok(lookup)
}

fn handle_verified_response<L: Lookup, R: RequestState<L, T>>(
&self,
seen_timestamp: Duration,
Expand Down Expand Up @@ -449,14 +411,21 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
}
lookup.request_block_and_blobs(cx)?;
}
CachedChild::NotRequired => R::send_reconstructed_for_processing(
id,
self,
block_root,
R::verified_to_reconstructed(block_root, verified_response),
seen_timestamp,
cx,
)?,
CachedChild::NotRequired => {
R::request_state_mut(lookup)
.get_state_mut()
.into_processing()
.map_err(LookupRequestError::BadState)?;

R::send_reconstructed_for_processing(
id,
self,
block_root,
R::verified_to_reconstructed(block_root, verified_response),
seen_timestamp,
cx,
)?
}
CachedChild::Err(e) => {
warn!(self.log, "Consistency error in cached block";
"error" => ?e,
Expand Down Expand Up @@ -550,53 +519,12 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
cx: &mut SyncNetworkContext<T>,
parent_lookup: &mut ParentLookup<T>,
) -> Result<(), RequestError> {
match parent_lookup.verify_response::<R>(response, &mut self.failed_chains) {
Ok(verified_response) => {
self.handle_verified_response::<Parent, R>(
seen_timestamp,
cx,
BlockProcessType::ParentLookup {
chain_hash: parent_lookup.chain_hash(),
},
verified_response,
&mut parent_lookup.current_parent_request,
)?;
}
Err(e) => self.handle_parent_verify_error::<R>(peer_id, parent_lookup, e, cx)?,
};
Ok(())
}

/// Handle logging and peer scoring for `ParentVerifyError`s during parent lookup requests.
fn handle_parent_verify_error<R: RequestState<Parent, T>>(
&mut self,
peer_id: PeerId,
parent_lookup: &mut ParentLookup<T>,
e: ParentVerifyError,
cx: &mut SyncNetworkContext<T>,
) -> Result<(), RequestError> {
match e {
ParentVerifyError::RootMismatch
| ParentVerifyError::NoBlockReturned
| ParentVerifyError::NotEnoughBlobsReturned
| ParentVerifyError::ExtraBlocksReturned
| ParentVerifyError::UnrequestedBlobId(_)
| ParentVerifyError::InvalidInclusionProof
| ParentVerifyError::UnrequestedHeader
| ParentVerifyError::ExtraBlobsReturned
| ParentVerifyError::InvalidIndex(_) => {
let e = e.into();
warn!(self.log, "Peer sent invalid response to parent request";
"peer_id" => %peer_id, "reason" => %e);

// We do not tolerate these kinds of errors. We will accept a few but these are signs
// of a faulty peer.
cx.report_peer(peer_id, PeerAction::LowToleranceError, e);

// We try again if possible.
parent_lookup.request_parent(cx)?;
}
ParentVerifyError::PreviousFailure { parent_root } => {
// Check whether the parent of this block is in the failed chains cache. If it is, this chain
// should be dropped and the peer downscored.
if let Some(parent_root) = R::get_parent_root(&response) {
if self.failed_chains.contains(&parent_root) {
let request_state = R::request_state_mut(&mut parent_lookup.current_parent_request);
request_state.register_failure_downloading();
debug!(
self.log,
"Parent chain ignored due to past failure";
Expand All @@ -610,8 +538,20 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
PeerAction::MidToleranceError,
"bbroot_failed_chains",
);
return Ok(());
}
}

self.handle_verified_response::<Parent, R>(
seen_timestamp,
cx,
BlockProcessType::ParentLookup {
chain_hash: parent_lookup.chain_hash(),
},
response,
&mut parent_lookup.current_parent_request,
)?;

Ok(())
}

Expand Down Expand Up @@ -651,6 +591,9 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
// This happens if the peer disconnects while the block is being
// processed. Drop the request without extra penalty
}
RequestError::BadState(..) => {
// Internal error, should never happen
}
}
}

Expand Down Expand Up @@ -685,18 +628,22 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
cx: &mut SyncNetworkContext<T>,
error: RPCError,
) {
let msg = error.as_static_str();
// Downscore peer even if lookup is not known
self.downscore_on_rpc_error(peer_id, &error, cx);

let Some(mut parent_lookup) = self.get_parent_lookup::<R>(id) else {
debug!(self.log,
"RPC failure for a block parent lookup request that was not found";
"peer_id" => %peer_id,
"error" => msg
"error" => %error
);
return;
};
R::request_state_mut(&mut parent_lookup.current_parent_request)
.register_failure_downloading();
trace!(self.log, "Parent lookup block request failed"; &parent_lookup, "error" => msg);
debug!(self.log, "Parent lookup block request failed";
"chain_hash" => %parent_lookup.chain_hash(), "id" => ?id, "error" => %error
);

self.request_parent(parent_lookup, cx);

Expand All @@ -714,7 +661,9 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
cx: &mut SyncNetworkContext<T>,
error: RPCError,
) {
let msg = error.as_static_str();
// Downscore peer even if lookup is not known
self.downscore_on_rpc_error(peer_id, &error, cx);

let log = self.log.clone();
let Some(mut lookup) = self.get_single_lookup::<R>(id) else {
debug!(log, "Error response to dropped lookup"; "error" => ?error);
Expand All @@ -726,7 +675,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
trace!(log,
"Single lookup failed";
"block_root" => ?block_root,
"error" => msg,
"error" => %error,
"peer_id" => %peer_id,
"response_type" => ?response_type
);
Expand Down Expand Up @@ -1129,8 +1078,12 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
mut parent_lookup: ParentLookup<T>,
) {
// We should always have a block peer.
let Ok(block_peer_id) = parent_lookup.block_processing_peer() else {
return;
let block_peer_id = match parent_lookup.block_processing_peer() {
Ok(peer_id) => peer_id,
Err(e) => {
warn!(self.log, "Parent lookup in bad state"; "chain_hash" => %parent_lookup.chain_hash(), "error" => e);
return;
}
};

// We may not have a blob peer, if there were no blobs required for this block.
Expand Down Expand Up @@ -1370,4 +1323,34 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
pub fn drop_parent_chain_requests(&mut self) -> usize {
self.parent_lookups.drain(..).len()
}

pub fn downscore_on_rpc_error(
&self,
peer_id: &PeerId,
error: &RPCError,
cx: &SyncNetworkContext<T>,
) {
// Note: logging the report event here with the full error display. The log inside
// `report_peer` only includes a smaller string, like "invalid_data"
debug!(self.log, "reporting peer for sync lookup error"; "error" => %error);
if let Some(action) = match error {
// Protocol errors are heavily penalized
RPCError::SSZDecodeError(..)
| RPCError::IoError(..)
| RPCError::ErrorResponse(..)
| RPCError::InvalidData(..)
| RPCError::HandlerRejected => Some(PeerAction::LowToleranceError),
// Timing / network errors are less penalized
// TODO: Is IoError a protocol error or network error?
RPCError::StreamTimeout | RPCError::IncompleteStream | RPCError::NegotiationTimeout => {
Some(PeerAction::MidToleranceError)
}
// Not supporting a specific protocol is tolerated. TODO: Are you sure?
RPCError::UnsupportedProtocol => None,
// Our fault, don't penalize peer
RPCError::InternalError(..) | RPCError::Disconnected => None,
} {
cx.report_peer(*peer_id, action, error.into());
}
}
}
Loading

0 comments on commit 38d21f0

Please sign in to comment.