Skip to content

Commit

Permalink
Move sync active requests to own modules (#6272)
Browse files Browse the repository at this point in the history
* Move sync active requests to own modules

* Merge branch 'unstable' into sync-requests-modules
  • Loading branch information
dapplion authored Sep 12, 2024
1 parent 351dd6c commit e0ccadb
Show file tree
Hide file tree
Showing 3 changed files with 161 additions and 154 deletions.
159 changes: 5 additions & 154 deletions beacon_node/network/src/sync/network_context/requests.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,14 @@
use beacon_chain::get_block_root;
use lighthouse_network::{
rpc::{methods::BlobsByRootRequest, BlocksByRootRequest},
PeerId,
};
use std::sync::Arc;
use strum::IntoStaticStr;
use types::{
blob_sidecar::BlobIdentifier, BlobSidecar, ChainSpec, EthSpec, Hash256, SignedBeaconBlock,
};
use types::Hash256;

pub use blobs_by_root::{ActiveBlobsByRootRequest, BlobsByRootSingleBlockRequest};
pub use blocks_by_root::{ActiveBlocksByRootRequest, BlocksByRootSingleRequest};
pub use data_columns_by_root::{
ActiveDataColumnsByRootRequest, DataColumnsByRootSingleBlockRequest,
};

mod blobs_by_root;
mod blocks_by_root;
mod data_columns_by_root;

#[derive(Debug, PartialEq, Eq, IntoStaticStr)]
Expand All @@ -25,148 +21,3 @@ pub enum LookupVerifyError {
InvalidInclusionProof,
DuplicateData,
}

pub struct ActiveBlocksByRootRequest {
request: BlocksByRootSingleRequest,
resolved: bool,
pub(crate) peer_id: PeerId,
}

impl ActiveBlocksByRootRequest {
pub fn new(request: BlocksByRootSingleRequest, peer_id: PeerId) -> Self {
Self {
request,
resolved: false,
peer_id,
}
}

/// Append a response to the single chunk request. If the chunk is valid, the request is
/// resolved immediately.
/// The active request SHOULD be dropped after `add_response` returns an error
pub fn add_response<E: EthSpec>(
&mut self,
block: Arc<SignedBeaconBlock<E>>,
) -> Result<Arc<SignedBeaconBlock<E>>, LookupVerifyError> {
if self.resolved {
return Err(LookupVerifyError::TooManyResponses);
}

let block_root = get_block_root(&block);
if self.request.0 != block_root {
return Err(LookupVerifyError::UnrequestedBlockRoot(block_root));
}

// Valid data, blocks by root expects a single response
self.resolved = true;
Ok(block)
}

pub fn terminate(self) -> Result<(), LookupVerifyError> {
if self.resolved {
Ok(())
} else {
Err(LookupVerifyError::NoResponseReturned)
}
}
}

#[derive(Debug, Copy, Clone)]
pub struct BlocksByRootSingleRequest(pub Hash256);

impl BlocksByRootSingleRequest {
pub fn into_request(self, spec: &ChainSpec) -> BlocksByRootRequest {
BlocksByRootRequest::new(vec![self.0], spec)
}
}

#[derive(Debug, Clone)]
pub struct BlobsByRootSingleBlockRequest {
pub block_root: Hash256,
pub indices: Vec<u64>,
}

impl BlobsByRootSingleBlockRequest {
pub fn into_request(self, spec: &ChainSpec) -> BlobsByRootRequest {
BlobsByRootRequest::new(
self.indices
.into_iter()
.map(|index| BlobIdentifier {
block_root: self.block_root,
index,
})
.collect(),
spec,
)
}
}

pub struct ActiveBlobsByRootRequest<E: EthSpec> {
request: BlobsByRootSingleBlockRequest,
blobs: Vec<Arc<BlobSidecar<E>>>,
resolved: bool,
pub(crate) peer_id: PeerId,
}

impl<E: EthSpec> ActiveBlobsByRootRequest<E> {
pub fn new(request: BlobsByRootSingleBlockRequest, peer_id: PeerId) -> Self {
Self {
request,
blobs: vec![],
resolved: false,
peer_id,
}
}

/// Appends a chunk to this multi-item request. If all expected chunks are received, this
/// method returns `Some`, resolving the request before the stream terminator.
/// The active request SHOULD be dropped after `add_response` returns an error
pub fn add_response(
&mut self,
blob: Arc<BlobSidecar<E>>,
) -> Result<Option<Vec<Arc<BlobSidecar<E>>>>, LookupVerifyError> {
if self.resolved {
return Err(LookupVerifyError::TooManyResponses);
}

let block_root = blob.block_root();
if self.request.block_root != block_root {
return Err(LookupVerifyError::UnrequestedBlockRoot(block_root));
}
if !blob.verify_blob_sidecar_inclusion_proof() {
return Err(LookupVerifyError::InvalidInclusionProof);
}
if !self.request.indices.contains(&blob.index) {
return Err(LookupVerifyError::UnrequestedIndex(blob.index));
}
if self.blobs.iter().any(|b| b.index == blob.index) {
return Err(LookupVerifyError::DuplicateData);
}

self.blobs.push(blob);
if self.blobs.len() >= self.request.indices.len() {
// All expected chunks received, return result early
self.resolved = true;
Ok(Some(std::mem::take(&mut self.blobs)))
} else {
Ok(None)
}
}

pub fn terminate(self) -> Result<(), LookupVerifyError> {
if self.resolved {
Ok(())
} else {
Err(LookupVerifyError::NotEnoughResponsesReturned {
expected: self.request.indices.len(),
actual: self.blobs.len(),
})
}
}

/// Mark request as resolved (= has returned something downstream) while marking this status as
/// true for future calls.
pub fn resolve(&mut self) -> bool {
std::mem::replace(&mut self.resolved, true)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
use lighthouse_network::{rpc::methods::BlobsByRootRequest, PeerId};
use std::sync::Arc;
use types::{blob_sidecar::BlobIdentifier, BlobSidecar, ChainSpec, EthSpec, Hash256};

use super::LookupVerifyError;

#[derive(Debug, Clone)]
pub struct BlobsByRootSingleBlockRequest {
pub block_root: Hash256,
pub indices: Vec<u64>,
}

impl BlobsByRootSingleBlockRequest {
pub fn into_request(self, spec: &ChainSpec) -> BlobsByRootRequest {
BlobsByRootRequest::new(
self.indices
.into_iter()
.map(|index| BlobIdentifier {
block_root: self.block_root,
index,
})
.collect(),
spec,
)
}
}

pub struct ActiveBlobsByRootRequest<E: EthSpec> {
request: BlobsByRootSingleBlockRequest,
blobs: Vec<Arc<BlobSidecar<E>>>,
resolved: bool,
pub(crate) peer_id: PeerId,
}

impl<E: EthSpec> ActiveBlobsByRootRequest<E> {
pub fn new(request: BlobsByRootSingleBlockRequest, peer_id: PeerId) -> Self {
Self {
request,
blobs: vec![],
resolved: false,
peer_id,
}
}

/// Appends a chunk to this multi-item request. If all expected chunks are received, this
/// method returns `Some`, resolving the request before the stream terminator.
/// The active request SHOULD be dropped after `add_response` returns an error
pub fn add_response(
&mut self,
blob: Arc<BlobSidecar<E>>,
) -> Result<Option<Vec<Arc<BlobSidecar<E>>>>, LookupVerifyError> {
if self.resolved {
return Err(LookupVerifyError::TooManyResponses);
}

let block_root = blob.block_root();
if self.request.block_root != block_root {
return Err(LookupVerifyError::UnrequestedBlockRoot(block_root));
}
if !blob.verify_blob_sidecar_inclusion_proof() {
return Err(LookupVerifyError::InvalidInclusionProof);
}
if !self.request.indices.contains(&blob.index) {
return Err(LookupVerifyError::UnrequestedIndex(blob.index));
}
if self.blobs.iter().any(|b| b.index == blob.index) {
return Err(LookupVerifyError::DuplicateData);
}

self.blobs.push(blob);
if self.blobs.len() >= self.request.indices.len() {
// All expected chunks received, return result early
self.resolved = true;
Ok(Some(std::mem::take(&mut self.blobs)))
} else {
Ok(None)
}
}

pub fn terminate(self) -> Result<(), LookupVerifyError> {
if self.resolved {
Ok(())
} else {
Err(LookupVerifyError::NotEnoughResponsesReturned {
expected: self.request.indices.len(),
actual: self.blobs.len(),
})
}
}

/// Mark request as resolved (= has returned something downstream) while marking this status as
/// true for future calls.
pub fn resolve(&mut self) -> bool {
std::mem::replace(&mut self.resolved, true)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
use beacon_chain::get_block_root;
use lighthouse_network::{rpc::BlocksByRootRequest, PeerId};
use std::sync::Arc;
use types::{ChainSpec, EthSpec, Hash256, SignedBeaconBlock};

use super::LookupVerifyError;

#[derive(Debug, Copy, Clone)]
pub struct BlocksByRootSingleRequest(pub Hash256);

impl BlocksByRootSingleRequest {
pub fn into_request(self, spec: &ChainSpec) -> BlocksByRootRequest {
BlocksByRootRequest::new(vec![self.0], spec)
}
}

pub struct ActiveBlocksByRootRequest {
request: BlocksByRootSingleRequest,
resolved: bool,
pub(crate) peer_id: PeerId,
}

impl ActiveBlocksByRootRequest {
pub fn new(request: BlocksByRootSingleRequest, peer_id: PeerId) -> Self {
Self {
request,
resolved: false,
peer_id,
}
}

/// Append a response to the single chunk request. If the chunk is valid, the request is
/// resolved immediately.
/// The active request SHOULD be dropped after `add_response` returns an error
pub fn add_response<E: EthSpec>(
&mut self,
block: Arc<SignedBeaconBlock<E>>,
) -> Result<Arc<SignedBeaconBlock<E>>, LookupVerifyError> {
if self.resolved {
return Err(LookupVerifyError::TooManyResponses);
}

let block_root = get_block_root(&block);
if self.request.0 != block_root {
return Err(LookupVerifyError::UnrequestedBlockRoot(block_root));
}

// Valid data, blocks by root expects a single response
self.resolved = true;
Ok(block)
}

pub fn terminate(self) -> Result<(), LookupVerifyError> {
if self.resolved {
Ok(())
} else {
Err(LookupVerifyError::NoResponseReturned)
}
}
}

0 comments on commit e0ccadb

Please sign in to comment.