diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index dcfc8e8412a..5610e6aa630 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -3033,6 +3033,41 @@ impl BeaconChain { self.remove_notified(&block_root, r) } + /// Cache the columns in the processing cache, process it, then evict it from the cache if it was + /// imported or errors. + pub async fn process_rpc_custody_columns( + self: &Arc, + custody_columns: DataColumnSidecarList, + ) -> Result> { + let Ok((slot, block_root)) = custody_columns + .iter() + .map(|c| (c.slot(), c.block_root())) + .unique() + .exactly_one() + else { + return Err(BlockError::InternalError( + "Columns should be from the same block".to_string(), + )); + }; + + // If this block has already been imported to forkchoice it must have been available, so + // we don't need to process its columns again. + if self + .canonical_head + .fork_choice_read_lock() + .contains_block(&block_root) + { + return Err(BlockError::BlockIsAlreadyKnown(block_root)); + } + + // TODO(das): custody column SSE event + + let r = self + .check_rpc_custody_columns_availability_and_import(slot, block_root, custody_columns) + .await; + self.remove_notified(&block_root, r) + } + /// Remove any block components from the *processing cache* if we no longer require them. If the /// block was imported full or erred, we no longer require them. fn remove_notified( @@ -3369,6 +3404,47 @@ impl BeaconChain { self.process_availability(slot, availability).await } + /// Checks if the provided columns can make any cached blocks available, and imports immediately + /// if so, otherwise caches the columns in the data availability checker. + async fn check_rpc_custody_columns_availability_and_import( + self: &Arc, + slot: Slot, + block_root: Hash256, + custody_columns: DataColumnSidecarList, + ) -> Result> { + // Need to scope this to ensure the lock is dropped before calling `process_availability` + // Even an explicit drop is not enough to convince the borrow checker. + { + let mut slashable_cache = self.observed_slashable.write(); + // Assumes all items in custody_columns are for the same block_root + if let Some(column) = custody_columns.first() { + let header = &column.signed_block_header; + if verify_header_signature::>(self, header).is_ok() { + slashable_cache + .observe_slashable( + header.message.slot, + header.message.proposer_index, + block_root, + ) + .map_err(|e| BlockError::BeaconChainError(e.into()))?; + if let Some(slasher) = self.slasher.as_ref() { + slasher.accept_block_header(header.clone()); + } + } + } + } + + // This slot value is purely informative for the consumers of + // `AvailabilityProcessingStatus::MissingComponents` to log an error with a slot. + let availability = self.data_availability_checker.put_rpc_custody_columns( + block_root, + slot.epoch(T::EthSpec::slots_per_epoch()), + custody_columns, + )?; + + self.process_availability(slot, availability).await + } + /// Imports a fully available block. Otherwise, returns `AvailabilityProcessingStatus::MissingComponents` /// /// An error is returned if the block was unable to be imported. It may be partially imported diff --git a/beacon_node/beacon_chain/src/data_availability_checker.rs b/beacon_node/beacon_chain/src/data_availability_checker.rs index 2178798bb60..c4ce93d266d 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker.rs @@ -23,7 +23,9 @@ mod error; mod overflow_lru_cache; mod state_lru_cache; -use crate::data_column_verification::{GossipVerifiedDataColumn, KzgVerifiedCustodyDataColumn}; +use crate::data_column_verification::{ + GossipVerifiedDataColumn, KzgVerifiedCustodyDataColumn, KzgVerifiedDataColumn, +}; pub use error::{Error as AvailabilityCheckError, ErrorCategory as AvailabilityCheckErrorCategory}; use types::non_zero_usize::new_non_zero_usize; @@ -187,6 +189,37 @@ impl DataAvailabilityChecker { .put_kzg_verified_blobs(block_root, epoch, verified_blobs) } + /// Put a list of custody columns received via RPC into the availability cache. This performs KZG + /// verification on the blobs in the list. + pub fn put_rpc_custody_columns( + &self, + block_root: Hash256, + epoch: Epoch, + custody_columns: DataColumnSidecarList, + ) -> Result, AvailabilityCheckError> { + let Some(kzg) = self.kzg.as_ref() else { + return Err(AvailabilityCheckError::KzgNotInitialized); + }; + + // TODO(das): report which column is invalid for proper peer scoring + // TODO(das): batch KZG verification here + let verified_custody_columns = custody_columns + .iter() + .map(|column| { + Ok(KzgVerifiedCustodyDataColumn::from_asserted_custody( + KzgVerifiedDataColumn::new(column.clone(), kzg) + .map_err(AvailabilityCheckError::Kzg)?, + )) + }) + .collect::, AvailabilityCheckError>>()?; + + self.availability_cache.put_kzg_verified_data_columns( + block_root, + epoch, + verified_custody_columns, + ) + } + /// Check if we've cached other blobs for this block. If it completes a set and we also /// have a block cached, return the `Availability` variant triggering block import. /// Otherwise cache the blob sidecar. diff --git a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs index 34a40282737..a85ef6476f5 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs @@ -442,8 +442,6 @@ impl DataAvailabilityCheckerInner { } } - // TODO(das): rpc code paths to be implemented. - #[allow(dead_code)] pub fn put_kzg_verified_data_columns< I: IntoIterator>, >( diff --git a/beacon_node/beacon_chain/src/data_column_verification.rs b/beacon_node/beacon_chain/src/data_column_verification.rs index af3fbab6aed..6a4efd605df 100644 --- a/beacon_node/beacon_chain/src/data_column_verification.rs +++ b/beacon_node/beacon_chain/src/data_column_verification.rs @@ -177,7 +177,7 @@ impl GossipVerifiedDataColumn { pub fn id(&self) -> DataColumnIdentifier { DataColumnIdentifier { block_root: self.block_root, - index: self.data_column.data_column_index(), + index: self.data_column.index(), } } @@ -221,7 +221,7 @@ impl KzgVerifiedDataColumn { self.data.clone() } - pub fn data_column_index(&self) -> u64 { + pub fn index(&self) -> ColumnIndex { self.data.index } } diff --git a/beacon_node/beacon_processor/src/lib.rs b/beacon_node/beacon_processor/src/lib.rs index 68c33e99baf..6ce3b64acfe 100644 --- a/beacon_node/beacon_processor/src/lib.rs +++ b/beacon_node/beacon_processor/src/lib.rs @@ -108,6 +108,7 @@ pub struct BeaconProcessorQueueLengths { unknown_light_client_update_queue: usize, rpc_block_queue: usize, rpc_blob_queue: usize, + rpc_custody_column_queue: usize, chain_segment_queue: usize, backfill_chain_segment: usize, gossip_block_queue: usize, @@ -163,6 +164,7 @@ impl BeaconProcessorQueueLengths { unknown_light_client_update_queue: 128, rpc_block_queue: 1024, rpc_blob_queue: 1024, + rpc_custody_column_queue: 1024, chain_segment_queue: 64, backfill_chain_segment: 64, gossip_block_queue: 1024, @@ -228,6 +230,7 @@ pub const GOSSIP_LIGHT_CLIENT_OPTIMISTIC_UPDATE: &str = "light_client_optimistic pub const RPC_BLOCK: &str = "rpc_block"; pub const IGNORED_RPC_BLOCK: &str = "ignored_rpc_block"; pub const RPC_BLOBS: &str = "rpc_blob"; +pub const RPC_CUSTODY_COLUMN: &str = "rpc_custody_column"; pub const CHAIN_SEGMENT: &str = "chain_segment"; pub const CHAIN_SEGMENT_BACKFILL: &str = "chain_segment_backfill"; pub const STATUS_PROCESSING: &str = "status_processing"; @@ -606,6 +609,7 @@ pub enum Work { RpcBlobs { process_fn: AsyncFn, }, + RpcCustodyColumn(AsyncFn), IgnoredRpcBlock { process_fn: BlockingFn, }, @@ -653,6 +657,7 @@ impl Work { Work::GossipLightClientOptimisticUpdate(_) => GOSSIP_LIGHT_CLIENT_OPTIMISTIC_UPDATE, Work::RpcBlock { .. } => RPC_BLOCK, Work::RpcBlobs { .. } => RPC_BLOBS, + Work::RpcCustodyColumn { .. } => RPC_CUSTODY_COLUMN, Work::IgnoredRpcBlock { .. } => IGNORED_RPC_BLOCK, Work::ChainSegment { .. } => CHAIN_SEGMENT, Work::ChainSegmentBackfill(_) => CHAIN_SEGMENT_BACKFILL, @@ -815,6 +820,7 @@ impl BeaconProcessor { // Using a FIFO queue since blocks need to be imported sequentially. let mut rpc_block_queue = FifoQueue::new(queue_lengths.rpc_block_queue); let mut rpc_blob_queue = FifoQueue::new(queue_lengths.rpc_blob_queue); + let mut rpc_custody_column_queue = FifoQueue::new(queue_lengths.rpc_custody_column_queue); let mut chain_segment_queue = FifoQueue::new(queue_lengths.chain_segment_queue); let mut backfill_chain_segment = FifoQueue::new(queue_lengths.backfill_chain_segment); let mut gossip_block_queue = FifoQueue::new(queue_lengths.gossip_block_queue); @@ -970,6 +976,8 @@ impl BeaconProcessor { self.spawn_worker(item, idle_tx); } else if let Some(item) = rpc_blob_queue.pop() { self.spawn_worker(item, idle_tx); + } else if let Some(item) = rpc_custody_column_queue.pop() { + self.spawn_worker(item, idle_tx); // Check delayed blocks before gossip blocks, the gossip blocks might rely // on the delayed ones. } else if let Some(item) = delayed_block_queue.pop() { @@ -1262,6 +1270,9 @@ impl BeaconProcessor { rpc_block_queue.push(work, work_id, &self.log) } Work::RpcBlobs { .. } => rpc_blob_queue.push(work, work_id, &self.log), + Work::RpcCustodyColumn { .. } => { + rpc_custody_column_queue.push(work, work_id, &self.log) + } Work::ChainSegment { .. } => { chain_segment_queue.push(work, work_id, &self.log) } @@ -1497,9 +1508,9 @@ impl BeaconProcessor { beacon_block_root: _, process_fn, } => task_spawner.spawn_async(process_fn), - Work::RpcBlock { process_fn } | Work::RpcBlobs { process_fn } => { - task_spawner.spawn_async(process_fn) - } + Work::RpcBlock { process_fn } + | Work::RpcBlobs { process_fn } + | Work::RpcCustodyColumn(process_fn) => task_spawner.spawn_async(process_fn), Work::IgnoredRpcBlock { process_fn } => task_spawner.spawn_blocking(process_fn), Work::GossipBlock(work) | Work::GossipBlobSidecar(work) diff --git a/beacon_node/network/src/network_beacon_processor/mod.rs b/beacon_node/network/src/network_beacon_processor/mod.rs index 9fb14fdcb8c..cb21b6dfb50 100644 --- a/beacon_node/network/src/network_beacon_processor/mod.rs +++ b/beacon_node/network/src/network_beacon_processor/mod.rs @@ -476,6 +476,30 @@ impl NetworkBeaconProcessor { }) } + /// Create a new `Work` event for some custody columns. `process_rpc_custody_columns` reports + /// the result back to sync. + pub fn send_rpc_custody_columns( + self: &Arc, + block_root: Hash256, + custody_columns: DataColumnSidecarList, + seen_timestamp: Duration, + process_type: BlockProcessType, + ) -> Result<(), Error> { + let s = self.clone(); + self.try_send(BeaconWorkEvent { + drop_during_sync: false, + work: Work::RpcCustodyColumn(Box::pin(async move { + s.process_rpc_custody_columns( + block_root, + custody_columns, + seen_timestamp, + process_type, + ) + .await; + })), + }) + } + /// Create a new work event to import `blocks` as a beacon chain segment. pub fn send_chain_segment( self: &Arc, diff --git a/beacon_node/network/src/network_beacon_processor/sync_methods.rs b/beacon_node/network/src/network_beacon_processor/sync_methods.rs index 68bd6745144..495d1cd92be 100644 --- a/beacon_node/network/src/network_beacon_processor/sync_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/sync_methods.rs @@ -24,7 +24,7 @@ use store::KzgCommitment; use tokio::sync::mpsc; use types::beacon_block_body::format_kzg_commitments; use types::blob_sidecar::FixedBlobSidecarList; -use types::BlockImportSource; +use types::{BlockImportSource, DataColumnSidecarList}; use types::{Epoch, Hash256}; /// Id associated to a batch processing request, either a sync batch or a parent lookup. @@ -307,6 +307,60 @@ impl NetworkBeaconProcessor { }); } + pub async fn process_rpc_custody_columns( + self: Arc>, + block_root: Hash256, + custody_columns: DataColumnSidecarList, + _seen_timestamp: Duration, + process_type: BlockProcessType, + ) { + let result = self + .chain + .process_rpc_custody_columns(custody_columns) + .await; + + match &result { + Ok(availability) => match availability { + AvailabilityProcessingStatus::Imported(hash) => { + debug!( + self.log, + "Block components retrieved"; + "result" => "imported block and custody columns", + "block_hash" => %hash, + ); + self.chain.recompute_head_at_current_slot().await; + } + AvailabilityProcessingStatus::MissingComponents(_, _) => { + debug!( + self.log, + "Missing components over rpc"; + "block_hash" => %block_root, + ); + } + }, + Err(BlockError::BlockIsAlreadyKnown(_)) => { + debug!( + self.log, + "Custody columns have already been imported"; + "block_hash" => %block_root, + ); + } + Err(e) => { + warn!( + self.log, + "Error when importing rpc custody columns"; + "error" => ?e, + "block_hash" => %block_root, + ); + } + } + + self.send_sync_message(SyncMessage::BlockComponentProcessed { + process_type, + result: result.into(), + }); + } + /// Attempt to import the chain segment (`blocks`) to the beacon chain, informing the sync /// thread if more blocks are needed to process it. pub async fn process_chain_segment(