From 934ebcbb035090fe44a8881f27a1e5810dfadc1c Mon Sep 17 00:00:00 2001 From: Jimmy Chen Date: Tue, 17 Sep 2024 14:28:48 +1000 Subject: [PATCH 1/9] Move reconstruction logic out of `overflow_lru_cache` to simplify the code and avoids having to pass `DataColumnsToPublish` around and blocking other processing. --- beacon_node/beacon_chain/src/beacon_chain.rs | 111 +++++++------ beacon_node/beacon_chain/src/builder.rs | 1 + .../src/data_availability_checker.rs | 147 +++++++++++++++--- .../overflow_lru_cache.rs | 102 +++--------- .../src/data_column_verification.rs | 11 +- beacon_node/beacon_chain/src/metrics.rs | 16 ++ .../gossip_methods.rs | 35 +---- .../src/network_beacon_processor/mod.rs | 75 ++++++++- .../network_beacon_processor/sync_methods.rs | 47 +++--- 9 files changed, 327 insertions(+), 218 deletions(-) diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index bf660c9eaf9..e1b4972285b 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -22,7 +22,6 @@ pub use crate::canonical_head::CanonicalHead; use crate::chain_config::ChainConfig; use crate::data_availability_checker::{ Availability, AvailabilityCheckError, AvailableBlock, DataAvailabilityChecker, - DataColumnsToPublish, }; use crate::data_column_verification::{GossipDataColumnError, GossipVerifiedDataColumn}; use crate::early_attester_cache::EarlyAttesterCache; @@ -3012,13 +3011,7 @@ impl BeaconChain { pub async fn process_gossip_data_columns( self: &Arc, data_columns: Vec>, - ) -> Result< - ( - AvailabilityProcessingStatus, - DataColumnsToPublish, - ), - BlockError, - > { + ) -> Result { let Ok((slot, block_root)) = data_columns .iter() .map(|c| (c.slot(), c.block_root())) @@ -3102,13 +3095,7 @@ impl BeaconChain { pub async fn process_rpc_custody_columns( self: &Arc, custody_columns: DataColumnSidecarList, - ) -> Result< - ( - AvailabilityProcessingStatus, - DataColumnsToPublish, - ), - BlockError, - > { + ) -> Result { let Ok((slot, block_root)) = custody_columns .iter() .map(|c| (c.slot(), c.block_root())) @@ -3149,6 +3136,45 @@ impl BeaconChain { self.remove_notified_custody_columns(&block_root, r) } + pub async fn reconstruct_data_columns( + self: &Arc, + block_root: Hash256, + ) -> Result< + Option<( + AvailabilityProcessingStatus, + DataColumnSidecarList, + )>, + BlockError, + > { + // As of now we only reconstruct data columns on supernodes, so if the block is already + // available on a supernode, there's no need to reconstruct as the node must already have + // all columns. + if self + .canonical_head + .fork_choice_read_lock() + .contains_block(&block_root) + { + return Ok(None); + } + + let Some((availability, data_columns_to_publish)) = self + .data_availability_checker + .reconstruct_data_columns(&block_root)? + else { + return Ok(None); + }; + + let Some(slot) = data_columns_to_publish.first().map(|d| d.slot()) else { + return Ok(None); + }; + + let r = self.process_availability(slot, availability).await; + self.remove_notified_custody_columns(&block_root, r) + .map(|availability_processing_status| { + Some((availability_processing_status, data_columns_to_publish)) + }) + } + /// Remove any block components from the *processing cache* if we no longer require them. If the /// block was imported full or erred, we no longer require them. fn remove_notified( @@ -3166,15 +3192,13 @@ impl BeaconChain { /// Remove any block components from the *processing cache* if we no longer require them. 
If the /// block was imported full or erred, we no longer require them. - fn remove_notified_custody_columns<P>
( + fn remove_notified_custody_columns( &self, block_root: &Hash256, - r: Result<(AvailabilityProcessingStatus, P), BlockError>, - ) -> Result<(AvailabilityProcessingStatus, P), BlockError> { - let has_missing_components = matches!( - r, - Ok((AvailabilityProcessingStatus::MissingComponents(_, _), _)) - ); + r: Result, + ) -> Result { + let has_missing_components = + matches!(r, Ok(AvailabilityProcessingStatus::MissingComponents(_, _))); if !has_missing_components { self.reqresp_pre_import_cache.write().remove(block_root); } @@ -3432,26 +3456,20 @@ impl BeaconChain { slot: Slot, block_root: Hash256, data_columns: Vec>, - ) -> Result< - ( - AvailabilityProcessingStatus, - DataColumnsToPublish, - ), - BlockError, - > { + ) -> Result { if let Some(slasher) = self.slasher.as_ref() { for data_colum in &data_columns { slasher.accept_block_header(data_colum.signed_block_header()); } } - let (availability, data_columns_to_publish) = self - .data_availability_checker - .put_gossip_data_columns(slot, block_root, data_columns)?; + let availability = self.data_availability_checker.put_gossip_data_columns( + slot, + block_root, + data_columns, + )?; - self.process_availability(slot, availability) - .await - .map(|result| (result, data_columns_to_publish)) + self.process_availability(slot, availability).await } /// Checks if the provided blobs can make any cached blocks available, and imports immediately @@ -3500,13 +3518,7 @@ impl BeaconChain { slot: Slot, block_root: Hash256, custody_columns: DataColumnSidecarList, - ) -> Result< - ( - AvailabilityProcessingStatus, - DataColumnsToPublish, - ), - BlockError, - > { + ) -> Result { // Need to scope this to ensure the lock is dropped before calling `process_availability` // Even an explicit drop is not enough to convince the borrow checker. { @@ -3531,16 +3543,13 @@ impl BeaconChain { // This slot value is purely informative for the consumers of // `AvailabilityProcessingStatus::MissingComponents` to log an error with a slot. - let (availability, data_columns_to_publish) = - self.data_availability_checker.put_rpc_custody_columns( - block_root, - slot.epoch(T::EthSpec::slots_per_epoch()), - custody_columns, - )?; + let availability = self.data_availability_checker.put_rpc_custody_columns( + block_root, + slot.epoch(T::EthSpec::slots_per_epoch()), + custody_columns, + )?; - self.process_availability(slot, availability) - .await - .map(|result| (result, data_columns_to_publish)) + self.process_availability(slot, availability).await } /// Imports a fully available block. 
Otherwise, returns `AvailabilityProcessingStatus::MissingComponents` diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index d38530b9049..21591e56b75 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -989,6 +989,7 @@ where store, self.import_all_data_columns, self.spec, + log.new(o!("service" => "data_availability_checker")), ) .map_err(|e| format!("Error initializing DataAvailabilityChecker: {:?}", e))?, ), diff --git a/beacon_node/beacon_chain/src/data_availability_checker.rs b/beacon_node/beacon_chain/src/data_availability_checker.rs index 470cee713fa..aa366f489a5 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker.rs @@ -2,10 +2,12 @@ use crate::blob_verification::{verify_kzg_for_blob_list, GossipVerifiedBlob, Kzg use crate::block_verification_types::{ AvailabilityPendingExecutedBlock, AvailableExecutedBlock, RpcBlock, }; -use crate::data_availability_checker::overflow_lru_cache::DataAvailabilityCheckerInner; -use crate::{BeaconChain, BeaconChainTypes, BeaconStore}; +use crate::data_availability_checker::overflow_lru_cache::{ + DataAvailabilityCheckerInner, PendingComponents, +}; +use crate::{metrics, BeaconChain, BeaconChainTypes, BeaconStore}; use kzg::Kzg; -use slog::{debug, error}; +use slog::{debug, error, Logger}; use slot_clock::SlotClock; use std::fmt; use std::fmt::Debug; @@ -27,11 +29,12 @@ use crate::data_column_verification::{ verify_kzg_for_data_column_list, CustodyDataColumn, GossipVerifiedDataColumn, KzgVerifiedCustodyDataColumn, KzgVerifiedDataColumn, }; +use crate::metrics::{ + KZG_DATA_COLUMN_RECONSTRUCTION_ATTEMPTS, KZG_DATA_COLUMN_RECONSTRUCTION_FAILURES, +}; pub use error::{Error as AvailabilityCheckError, ErrorCategory as AvailabilityCheckErrorCategory}; use types::non_zero_usize::new_non_zero_usize; -pub use self::overflow_lru_cache::DataColumnsToPublish; - /// The LRU Cache stores `PendingComponents` which can store up to /// `MAX_BLOBS_PER_BLOCK = 6` blobs each. A `BlobSidecar` is 0.131256 MB. So /// the maximum size of a `PendingComponents` is ~ 0.787536 MB. Setting this @@ -71,8 +74,11 @@ pub struct DataAvailabilityChecker { slot_clock: T::SlotClock, kzg: Option>, spec: Arc, + log: Logger, } +pub type AvailabilityAndReconstructedColumns = (Availability, DataColumnSidecarList); + /// This type is returned after adding a block / blob to the `DataAvailabilityChecker`. 
/// /// Indicates if the block is fully `Available` or if we need blobs or blocks @@ -101,6 +107,7 @@ impl DataAvailabilityChecker { store: BeaconStore, import_all_data_columns: bool, spec: ChainSpec, + log: Logger, ) -> Result { let spec = Arc::new(spec); let custody_subnet_count = if import_all_data_columns { @@ -123,6 +130,7 @@ impl DataAvailabilityChecker { slot_clock, kzg, spec, + log, }) } @@ -215,8 +223,7 @@ impl DataAvailabilityChecker { block_root: Hash256, epoch: Epoch, custody_columns: DataColumnSidecarList, - ) -> Result<(Availability, DataColumnsToPublish), AvailabilityCheckError> - { + ) -> Result, AvailabilityCheckError> { let Some(kzg) = self.kzg.as_ref() else { return Err(AvailabilityCheckError::KzgNotInitialized); }; @@ -233,7 +240,6 @@ impl DataAvailabilityChecker { .collect::, AvailabilityCheckError>>()?; self.availability_cache.put_kzg_verified_data_columns( - kzg, block_root, epoch, verified_custody_columns, @@ -267,11 +273,7 @@ impl DataAvailabilityChecker { slot: Slot, block_root: Hash256, gossip_data_columns: Vec>, - ) -> Result<(Availability, DataColumnsToPublish), AvailabilityCheckError> - { - let Some(kzg) = self.kzg.as_ref() else { - return Err(AvailabilityCheckError::KzgNotInitialized); - }; + ) -> Result, AvailabilityCheckError> { let epoch = slot.epoch(T::EthSpec::slots_per_epoch()); let custody_columns = gossip_data_columns @@ -279,12 +281,8 @@ impl DataAvailabilityChecker { .map(|c| KzgVerifiedCustodyDataColumn::from_asserted_custody(c.into_inner())) .collect::>(); - self.availability_cache.put_kzg_verified_data_columns( - kzg, - block_root, - epoch, - custody_columns, - ) + self.availability_cache + .put_kzg_verified_data_columns(block_root, epoch, custody_columns) } /// Check if we have all the blobs for a block. Returns `Availability` which has information @@ -529,6 +527,117 @@ impl DataAvailabilityChecker { block_cache_size: self.availability_cache.block_cache_size(), } } + + pub fn reconstruct_data_columns( + &self, + block_root: &Hash256, + ) -> Result>, AvailabilityCheckError> + { + let Some(kzg) = self.kzg.as_ref() else { + return Err(AvailabilityCheckError::KzgNotInitialized); + }; + + // Clone the pending components, so we don't hold the read lock during reconstruction + let Some(pending_components) = self + .availability_cache + .peek_pending_components(block_root, |pending_components_opt| { + pending_components_opt.cloned() + }) + else { + // Block may have been imported as it does not exist in availability cache. + return Ok(None); + }; + + if !self.should_reconstruct(&pending_components) { + return Ok(None); + } + + self.availability_cache + .set_reconstruction_started(block_root); + + metrics::inc_counter(&KZG_DATA_COLUMN_RECONSTRUCTION_ATTEMPTS); + let timer = metrics::start_timer(&metrics::DATA_AVAILABILITY_RECONSTRUCTION_TIME); + + let all_data_columns = KzgVerifiedCustodyDataColumn::reconstruct_columns( + kzg, + &pending_components.verified_data_columns, + &self.spec, + ) + .inspect_err(|e| { + error!( + self.log, + "Error reconstructing data columns"; + "block_root" => ?block_root, + "error" => ?e + ); + metrics::inc_counter(&KZG_DATA_COLUMN_RECONSTRUCTION_FAILURES); + })?; + + // Check indices from cache again to make sure we don't publish components we've already received. + let Some(existing_column_indices) = self.imported_custody_column_indexes(block_root) else { + // If block is already imported (no longer in cache), abort publishing data columns + // TODO(das) This assumes only supernodes do reconstructions (i.e. 
custody + // requirement = all columns). This behaviour is likely to change in the future when we + // get to 2D PeerDAS. + return Ok(None); + }; + + let data_columns_to_publish = all_data_columns + .into_iter() + .filter(|d| !existing_column_indices.contains(&d.index())) + .collect::>(); + + // Return if there's no data columns to publish. + let Some(slot) = data_columns_to_publish + .first() + .map(|d| d.as_data_column().slot()) + else { + return Ok(None); + }; + + metrics::stop_timer(timer); + metrics::inc_counter_by( + &metrics::DATA_AVAILABILITY_RECONSTRUCTED_COLUMNS, + data_columns_to_publish.len() as u64, + ); + + debug!(self.log, "Reconstructed columns"; + "count" => data_columns_to_publish.len(), + "block_root" => ?block_root, + "slot" => slot, + ); + + self.availability_cache + .put_kzg_verified_data_columns( + *block_root, + slot.epoch(T::EthSpec::slots_per_epoch()), + data_columns_to_publish.clone(), + ) + .map(|availability| { + ( + availability, + data_columns_to_publish + .into_iter() + .map(|d| d.clone_arc()) + .collect::>(), + ) + }) + .map(Some) + } + + /// Potentially trigger reconstruction if: + /// - Our custody requirement is all columns, and we haven't got all columns + /// - We have >= 50% of columns, but not all columns + /// - Reconstruction hasn't been started for the block + fn should_reconstruct(&self, pending_components: &PendingComponents) -> bool { + let received_column_count = pending_components.verified_data_columns.len(); + let custody_column_count = self.get_custody_columns_count(); + let total_column_count = self.spec.number_of_columns; + custody_column_count == total_column_count + && received_column_count < total_column_count + && received_column_count >= total_column_count / 2 + && !pending_components.reconstruction_started + } } /// Helper struct to group data availability checker metrics. diff --git a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs index 36c5a9359dd..f58444866e5 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs @@ -6,23 +6,18 @@ use crate::block_verification_types::{ }; use crate::data_availability_checker::{Availability, AvailabilityCheckError}; use crate::data_column_verification::KzgVerifiedCustodyDataColumn; -use crate::metrics; use crate::BeaconChainTypes; -use kzg::Kzg; use lru::LruCache; use parking_lot::RwLock; use ssz_types::{FixedVector, VariableList}; -use std::collections::HashSet; use std::num::NonZeroUsize; use std::sync::Arc; use types::blob_sidecar::BlobIdentifier; use types::{ - BlobSidecar, ChainSpec, ColumnIndex, DataColumnIdentifier, DataColumnSidecar, - DataColumnSidecarList, Epoch, EthSpec, Hash256, SignedBeaconBlock, + BlobSidecar, ChainSpec, ColumnIndex, DataColumnIdentifier, DataColumnSidecar, Epoch, EthSpec, + Hash256, SignedBeaconBlock, }; -pub type DataColumnsToPublish = Option>; - /// This represents the components of a partially available block /// /// The blobs are all gossip and kzg verified. @@ -311,6 +306,11 @@ impl PendingComponents { ))) } + /// Mark reconstruction as started for this `PendingComponent`. + /// + /// NOTE: currently this value never reverts to false once it's set here. This means + /// reconstruction will only be attempted once. 
This is intentional because currently + /// reconstruction could only fail due to code errors or kzg errors, which shouldn't be retried. pub fn reconstruction_started(&mut self) { self.reconstruction_started = true; } @@ -448,28 +448,6 @@ impl DataAvailabilityCheckerInner { } } - /// Potentially trigger reconstruction if: - /// - Our custody requirement is all columns - /// - We >= 50% of columns, but not all columns - fn should_reconstruct( - &self, - block_import_requirement: &BlockImportRequirement, - pending_components: &PendingComponents, - ) -> bool { - let BlockImportRequirement::CustodyColumns(num_expected_columns) = block_import_requirement - else { - return false; - }; - - let num_of_columns = self.spec.number_of_columns; - let has_missing_columns = pending_components.verified_data_columns.len() < num_of_columns; - - has_missing_columns - && !pending_components.reconstruction_started - && *num_expected_columns == num_of_columns - && pending_components.verified_data_columns.len() >= num_of_columns / 2 - } - pub fn put_kzg_verified_blobs>>( &self, block_root: Hash256, @@ -514,12 +492,10 @@ impl DataAvailabilityCheckerInner { I: IntoIterator>, >( &self, - kzg: &Kzg, block_root: Hash256, epoch: Epoch, kzg_verified_data_columns: I, - ) -> Result<(Availability, DataColumnsToPublish), AvailabilityCheckError> - { + ) -> Result, AvailabilityCheckError> { let mut write_lock = self.critical.write(); // Grab existing entry or create a new entry. @@ -533,64 +509,22 @@ impl DataAvailabilityCheckerInner { let block_import_requirement = self.block_import_requirement(epoch)?; - // Potentially trigger reconstruction if: - // - Our custody requirement is all columns - // - We >= 50% of columns - let data_columns_to_publish = - if self.should_reconstruct(&block_import_requirement, &pending_components) { - pending_components.reconstruction_started(); - - let timer = metrics::start_timer(&metrics::DATA_AVAILABILITY_RECONSTRUCTION_TIME); - - let existing_column_indices = pending_components - .verified_data_columns - .iter() - .map(|d| d.index()) - .collect::>(); - - // Will only return an error if: - // - < 50% of columns - // - There are duplicates - let all_data_columns = KzgVerifiedCustodyDataColumn::reconstruct_columns( - kzg, - pending_components.verified_data_columns.as_slice(), - &self.spec, - )?; - - let data_columns_to_publish = all_data_columns - .iter() - .filter(|d| !existing_column_indices.contains(&d.index())) - .map(|d| d.clone_arc()) - .collect::>(); - - pending_components.verified_data_columns = all_data_columns; - - metrics::stop_timer(timer); - metrics::inc_counter_by( - &metrics::DATA_AVAILABILITY_RECONSTRUCTED_COLUMNS, - data_columns_to_publish.len() as u64, - ); - - Some(data_columns_to_publish) - } else { - None - }; - if pending_components.is_available(&block_import_requirement) { write_lock.put(block_root, pending_components.clone()); // No need to hold the write lock anymore drop(write_lock); - pending_components - .make_available(block_import_requirement, &self.spec, |diet_block| { - self.state_cache.recover_pending_executed_block(diet_block) - }) - .map(|availability| (availability, data_columns_to_publish)) + pending_components.make_available(block_import_requirement, &self.spec, |diet_block| { + self.state_cache.recover_pending_executed_block(diet_block) + }) } else { write_lock.put(block_root, pending_components); - Ok(( - Availability::MissingComponents(block_root), - data_columns_to_publish, - )) + Ok(Availability::MissingComponents(block_root)) + } + } + + pub fn 
set_reconstruction_started(&self, block_root: &Hash256) { + if let Some(pending_components_mut) = self.critical.write().get_mut(block_root) { + pending_components_mut.reconstruction_started(); } } diff --git a/beacon_node/beacon_chain/src/data_column_verification.rs b/beacon_node/beacon_chain/src/data_column_verification.rs index f4a5feaee2a..43a9e5128f1 100644 --- a/beacon_node/beacon_chain/src/data_column_verification.rs +++ b/beacon_node/beacon_chain/src/data_column_verification.rs @@ -300,10 +300,7 @@ impl KzgVerifiedCustodyDataColumn { kzg: &Kzg, partial_set_of_columns: &[Self], spec: &ChainSpec, - ) -> Result, KzgError> { - // Will only return an error if: - // - < 50% of columns - // - There are duplicates + ) -> Result>, KzgError> { let all_data_columns = reconstruct_data_columns( kzg, &partial_set_of_columns @@ -315,10 +312,8 @@ impl KzgVerifiedCustodyDataColumn { Ok(all_data_columns .into_iter() - .map(|d| { - KzgVerifiedCustodyDataColumn::from_asserted_custody(KzgVerifiedDataColumn { - data: d, - }) + .map(|data| { + KzgVerifiedCustodyDataColumn::from_asserted_custody(KzgVerifiedDataColumn { data }) }) .collect::>()) } diff --git a/beacon_node/beacon_chain/src/metrics.rs b/beacon_node/beacon_chain/src/metrics.rs index f15b46fc4bf..55327d0fb05 100644 --- a/beacon_node/beacon_chain/src/metrics.rs +++ b/beacon_node/beacon_chain/src/metrics.rs @@ -1887,6 +1887,22 @@ pub static DATA_AVAILABILITY_RECONSTRUCTED_COLUMNS: LazyLock> ) }); +pub static KZG_DATA_COLUMN_RECONSTRUCTION_ATTEMPTS: LazyLock> = + LazyLock::new(|| { + try_create_int_counter( + "kzg_data_column_reconstruction_attempts", + "Count of times data column reconstruction has been attempted", + ) + }); + +pub static KZG_DATA_COLUMN_RECONSTRUCTION_FAILURES: LazyLock> = + LazyLock::new(|| { + try_create_int_counter( + "kzg_data_column_reconstruction_failures", + "Count of times data column reconstruction has failed", + ) + }); + /* * light_client server metrics */ diff --git a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs index 62f1371c811..52aa4bc329b 100644 --- a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs @@ -4,6 +4,7 @@ use crate::{ service::NetworkMessage, sync::SyncMessage, }; +use beacon_chain::blob_verification::{GossipBlobError, GossipVerifiedBlob}; use beacon_chain::block_verification_types::AsBlock; use beacon_chain::data_column_verification::{GossipDataColumnError, GossipVerifiedDataColumn}; use beacon_chain::store::Error; @@ -18,13 +19,7 @@ use beacon_chain::{ AvailabilityProcessingStatus, BeaconChainError, BeaconChainTypes, BlockError, ForkChoiceError, GossipVerifiedBlock, NotifyExecutionLayer, }; -use beacon_chain::{ - blob_verification::{GossipBlobError, GossipVerifiedBlob}, - data_availability_checker::DataColumnsToPublish, -}; -use lighthouse_network::{ - Client, MessageAcceptance, MessageId, PeerAction, PeerId, PubsubMessage, ReportSource, -}; +use lighthouse_network::{Client, MessageAcceptance, MessageId, PeerAction, PeerId, ReportSource}; use operation_pool::ReceivedPreCapella; use slog::{crit, debug, error, info, trace, warn, Logger}; use slot_clock::SlotClock; @@ -171,26 +166,6 @@ impl NetworkBeaconProcessor { }) } - pub(crate) fn handle_data_columns_to_publish( - &self, - data_columns_to_publish: DataColumnsToPublish, - ) { - if let Some(data_columns_to_publish) = data_columns_to_publish { - 
self.send_network_message(NetworkMessage::Publish { - messages: data_columns_to_publish - .iter() - .map(|d| { - let subnet = DataColumnSubnetId::from_column_index::( - d.index as usize, - &self.chain.spec, - ); - PubsubMessage::DataColumnSidecar(Box::new((subnet, d.clone()))) - }) - .collect(), - }); - } - } - /// Send a message on `message_tx` that the `message_id` sent by `peer_id` should be propagated on /// the gossip network. /// @@ -1019,9 +994,7 @@ impl NetworkBeaconProcessor { .process_gossip_data_columns(vec![verified_data_column]) .await { - Ok((availability, data_columns_to_publish)) => { - self.handle_data_columns_to_publish(data_columns_to_publish); - + Ok(availability) => { match availability { AvailabilityProcessingStatus::Imported(block_root) => { // Note: Reusing block imported metric here @@ -1049,7 +1022,7 @@ impl NetworkBeaconProcessor { "block_root" => %block_root, ); - // Potentially trigger reconstruction + self.attempt_data_column_reconstruction(block_root).await; } } } diff --git a/beacon_node/network/src/network_beacon_processor/mod.rs b/beacon_node/network/src/network_beacon_processor/mod.rs index 7f551c544c7..f82470ecc2b 100644 --- a/beacon_node/network/src/network_beacon_processor/mod.rs +++ b/beacon_node/network/src/network_beacon_processor/mod.rs @@ -2,7 +2,9 @@ use crate::sync::manager::BlockProcessType; use crate::sync::SamplingId; use crate::{service::NetworkMessage, sync::manager::SyncMessage}; use beacon_chain::block_verification_types::RpcBlock; -use beacon_chain::{builder::Witness, eth1_chain::CachingEth1Backend, BeaconChain}; +use beacon_chain::{ + builder::Witness, eth1_chain::CachingEth1Backend, AvailabilityProcessingStatus, BeaconChain, +}; use beacon_chain::{BeaconChainTypes, NotifyExecutionLayer}; use beacon_processor::{ work_reprocessing_queue::ReprocessQueueMessage, BeaconProcessorChannels, BeaconProcessorSend, @@ -14,9 +16,9 @@ use lighthouse_network::rpc::methods::{ }; use lighthouse_network::{ rpc::{BlocksByRangeRequest, BlocksByRootRequest, LightClientBootstrapRequest, StatusMessage}, - Client, MessageId, NetworkGlobals, PeerId, PeerRequestId, + Client, MessageId, NetworkGlobals, PeerId, PeerRequestId, PubsubMessage, }; -use slog::{debug, Logger}; +use slog::{debug, error, trace, Logger}; use slot_clock::ManualSlotClock; use std::path::PathBuf; use std::sync::Arc; @@ -767,6 +769,73 @@ impl NetworkBeaconProcessor { "error" => %e) }); } + + /// Attempt to reconstruct all data columns if the following conditions satisfies: + /// - Our custody requirement is all columns + /// - We have >= 50% of columns, but not all columns + /// + /// Returns `Some(AvailabilityProcessingStatus)` if reconstruction is successfully performed, + /// otherwise returns `None`. 
+ async fn attempt_data_column_reconstruction( + &self, + block_root: Hash256, + ) -> Option { + let result = self.chain.reconstruct_data_columns(block_root).await; + match result { + Ok(Some((availability_processing_status, data_columns_to_publish))) => { + match &availability_processing_status { + AvailabilityProcessingStatus::Imported(hash) => { + debug!( + self.log, + "Block components available via reconstruction"; + "result" => "imported block and custody columns", + "block_hash" => %hash, + ); + self.chain.recompute_head_at_current_slot().await; + } + AvailabilityProcessingStatus::MissingComponents(_, _) => { + debug!( + self.log, + "Block components still missing block after reconstruction"; + "result" => "imported all custody columns", + "block_hash" => %block_root, + ); + } + } + + self.send_network_message(NetworkMessage::Publish { + messages: data_columns_to_publish + .iter() + .map(|d| { + let subnet = DataColumnSubnetId::from_column_index::( + d.index as usize, + &self.chain.spec, + ); + PubsubMessage::DataColumnSidecar(Box::new((subnet, d.clone()))) + }) + .collect(), + }); + + Some(availability_processing_status) + } + Ok(None) => { + trace!( + self.log, + "Reconstruction not required for block"; + "block_hash" => %block_root, + ); + None + } + Err(e) => { + error!( + self.log, + "Error during data column reconstruction"; + "error" => ?e + ); + None + } + } + } } type TestBeaconChainType = diff --git a/beacon_node/network/src/network_beacon_processor/sync_methods.rs b/beacon_node/network/src/network_beacon_processor/sync_methods.rs index c21054dab50..d3dfbc7a42d 100644 --- a/beacon_node/network/src/network_beacon_processor/sync_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/sync_methods.rs @@ -327,34 +327,37 @@ impl NetworkBeaconProcessor { _seen_timestamp: Duration, process_type: BlockProcessType, ) { - let result = self + let mut result = self .chain .process_rpc_custody_columns(custody_columns) .await; match &result { - Ok((availability, data_columns_to_publish)) => { - self.handle_data_columns_to_publish(data_columns_to_publish.clone()); - - match availability { - AvailabilityProcessingStatus::Imported(hash) => { - debug!( - self.log, - "Block components retrieved"; - "result" => "imported block and custody columns", - "block_hash" => %hash, - ); - self.chain.recompute_head_at_current_slot().await; - } - AvailabilityProcessingStatus::MissingComponents(_, _) => { - debug!( - self.log, - "Missing components over rpc"; - "block_hash" => %block_root, - ); + Ok(availability) => match availability { + AvailabilityProcessingStatus::Imported(hash) => { + debug!( + self.log, + "Block components retrieved"; + "result" => "imported block and custody columns", + "block_hash" => %hash, + ); + self.chain.recompute_head_at_current_slot().await; + } + AvailabilityProcessingStatus::MissingComponents(_, _) => { + debug!( + self.log, + "Missing components over rpc"; + "block_hash" => %block_root, + ); + // Attempt reconstruction here before notifying sync, to avoid sending out more requests + // that we may no longer need. 
+ if let Some(availability) = + self.attempt_data_column_reconstruction(block_root).await + { + result = Ok(availability) } } - } + }, Err(BlockError::BlockIsAlreadyKnown(_)) => { debug!( self.log, @@ -374,7 +377,7 @@ impl NetworkBeaconProcessor { self.send_sync_message(SyncMessage::BlockComponentProcessed { process_type, - result: result.map(|(r, _)| r).into(), + result: result.into(), }); } From 2e641908a83732dcf7279aaa35fd9fd3806e0d17 Mon Sep 17 00:00:00 2001 From: Jimmy Chen Date: Wed, 18 Sep 2024 16:26:23 +1000 Subject: [PATCH 2/9] Publish reconstructed cells before recomputing head. Remove duplicate functions. --- beacon_node/beacon_chain/src/beacon_chain.rs | 21 +++------------ .../src/network_beacon_processor/mod.rs | 26 +++++++++---------- 2 files changed, 16 insertions(+), 31 deletions(-) diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index e1b4972285b..36f25b4e027 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -3036,7 +3036,7 @@ impl BeaconChain { let r = self .check_gossip_data_columns_availability_and_import(slot, block_root, data_columns) .await; - self.remove_notified_custody_columns(&block_root, r) + self.remove_notified(&block_root, r) } /// Cache the blobs in the processing cache, process it, then evict it from the cache if it was @@ -3133,7 +3133,7 @@ impl BeaconChain { let r = self .check_rpc_custody_columns_availability_and_import(slot, block_root, custody_columns) .await; - self.remove_notified_custody_columns(&block_root, r) + self.remove_notified(&block_root, r) } pub async fn reconstruct_data_columns( @@ -3169,7 +3169,7 @@ impl BeaconChain { }; let r = self.process_availability(slot, availability).await; - self.remove_notified_custody_columns(&block_root, r) + self.remove_notified(&block_root, r) .map(|availability_processing_status| { Some((availability_processing_status, data_columns_to_publish)) }) @@ -3190,21 +3190,6 @@ impl BeaconChain { r } - /// Remove any block components from the *processing cache* if we no longer require them. If the - /// block was imported full or erred, we no longer require them. - fn remove_notified_custody_columns( - &self, - block_root: &Hash256, - r: Result, - ) -> Result { - let has_missing_components = - matches!(r, Ok(AvailabilityProcessingStatus::MissingComponents(_, _))); - if !has_missing_components { - self.reqresp_pre_import_cache.write().remove(block_root); - } - r - } - /// Wraps `process_block` in logic to cache the block's commitments in the processing cache /// and evict if the block was imported or errored. 
pub async fn process_block_with_early_caching>( diff --git a/beacon_node/network/src/network_beacon_processor/mod.rs b/beacon_node/network/src/network_beacon_processor/mod.rs index f82470ecc2b..9a52844225a 100644 --- a/beacon_node/network/src/network_beacon_processor/mod.rs +++ b/beacon_node/network/src/network_beacon_processor/mod.rs @@ -783,6 +783,19 @@ impl NetworkBeaconProcessor { let result = self.chain.reconstruct_data_columns(block_root).await; match result { Ok(Some((availability_processing_status, data_columns_to_publish))) => { + self.send_network_message(NetworkMessage::Publish { + messages: data_columns_to_publish + .iter() + .map(|d| { + let subnet = DataColumnSubnetId::from_column_index::( + d.index as usize, + &self.chain.spec, + ); + PubsubMessage::DataColumnSidecar(Box::new((subnet, d.clone()))) + }) + .collect(), + }); + match &availability_processing_status { AvailabilityProcessingStatus::Imported(hash) => { debug!( @@ -803,19 +816,6 @@ impl NetworkBeaconProcessor { } } - self.send_network_message(NetworkMessage::Publish { - messages: data_columns_to_publish - .iter() - .map(|d| { - let subnet = DataColumnSubnetId::from_column_index::( - d.index as usize, - &self.chain.spec, - ); - PubsubMessage::DataColumnSidecar(Box::new((subnet, d.clone()))) - }) - .collect(), - }); - Some(availability_processing_status) } Ok(None) => { From d3c84e8573eb66b4682ae0a129c595daa2ceaa08 Mon Sep 17 00:00:00 2001 From: Jimmy Chen Date: Mon, 30 Sep 2024 15:24:42 +1000 Subject: [PATCH 3/9] Spawn a blocking task for reconstruction. --- beacon_node/beacon_chain/src/beacon_chain.rs | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 975e009d3f1..68c308e16d9 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -3169,9 +3169,16 @@ impl BeaconChain { return Ok(None); } + let data_availability_checker = self.data_availability_checker.clone(); let Some((availability, data_columns_to_publish)) = self - .data_availability_checker - .reconstruct_data_columns(&block_root)? + .task_executor + .spawn_blocking_handle( + move || data_availability_checker.reconstruct_data_columns(&block_root), + "reconstruct_data_columns", + ) + .ok_or(BeaconChainError::RuntimeShutdown)? + .await + .map_err(BeaconChainError::TokioJoin)?? 
else { return Ok(None); }; From 634d1374f904698abcc2b19aecb26c485d8041e5 Mon Sep 17 00:00:00 2001 From: Jimmy Chen Date: Thu, 3 Oct 2024 11:08:50 +1000 Subject: [PATCH 4/9] Fix fmt --- beacon_node/network/src/network_beacon_processor/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/beacon_node/network/src/network_beacon_processor/mod.rs b/beacon_node/network/src/network_beacon_processor/mod.rs index 5758c4c9ac0..1b995dcadf0 100644 --- a/beacon_node/network/src/network_beacon_processor/mod.rs +++ b/beacon_node/network/src/network_beacon_processor/mod.rs @@ -18,7 +18,7 @@ use lighthouse_network::rpc::methods::{ use lighthouse_network::rpc::{RequestId, SubstreamId}; use lighthouse_network::{ rpc::{BlocksByRangeRequest, BlocksByRootRequest, LightClientBootstrapRequest, StatusMessage}, - Client, MessageId, NetworkGlobals, PeerId, PubsubMessage + Client, MessageId, NetworkGlobals, PeerId, PubsubMessage, }; use slog::{debug, error, trace, Logger}; use slot_clock::ManualSlotClock; From b0c337946de2a33c08eab8515d9612a27ca9c452 Mon Sep 17 00:00:00 2001 From: Jimmy Chen Date: Thu, 10 Oct 2024 12:22:25 +1100 Subject: [PATCH 5/9] Fix race condition by making check and mutation atomic as suggested by Lion. Also added error handling to reconstruction failure. --- .../src/data_availability_checker.rs | 35 ++---------- .../overflow_lru_cache.rs | 54 +++++++++++++++---- 2 files changed, 48 insertions(+), 41 deletions(-) diff --git a/beacon_node/beacon_chain/src/data_availability_checker.rs b/beacon_node/beacon_chain/src/data_availability_checker.rs index cb20e014a26..6b1a1d9e08f 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker.rs @@ -2,9 +2,7 @@ use crate::blob_verification::{verify_kzg_for_blob_list, GossipVerifiedBlob, Kzg use crate::block_verification_types::{ AvailabilityPendingExecutedBlock, AvailableExecutedBlock, RpcBlock, }; -use crate::data_availability_checker::overflow_lru_cache::{ - DataAvailabilityCheckerInner, PendingComponents, -}; +use crate::data_availability_checker::overflow_lru_cache::DataAvailabilityCheckerInner; use crate::{metrics, BeaconChain, BeaconChainTypes, BeaconStore}; use kzg::Kzg; use slog::{debug, error, Logger}; @@ -518,24 +516,14 @@ impl DataAvailabilityChecker { block_root: &Hash256, ) -> Result>, AvailabilityCheckError> { - // Clone the pending components, so we don't hold the read lock during reconstruction let Some(pending_components) = self .availability_cache - .peek_pending_components(block_root, |pending_components_opt| { - pending_components_opt.cloned() - }) + .check_and_set_reconstruction_started(block_root) else { - // Block may have been imported as it does not exist in availability cache. 
+ // Reconstruction not required or already in progress return Ok(None); }; - if !self.should_reconstruct(&pending_components) { - return Ok(None); - } - - self.availability_cache - .set_reconstruction_started(block_root); - metrics::inc_counter(&KZG_DATA_COLUMN_RECONSTRUCTION_ATTEMPTS); let timer = metrics::start_timer(&metrics::DATA_AVAILABILITY_RECONSTRUCTION_TIME); @@ -551,6 +539,8 @@ impl DataAvailabilityChecker { "block_root" => ?block_root, "error" => ?e ); + self.availability_cache + .handle_reconstruction_failure(block_root); metrics::inc_counter(&KZG_DATA_COLUMN_RECONSTRUCTION_FAILURES); AvailabilityCheckError::ReconstructColumnsError(e) })?; @@ -606,21 +596,6 @@ impl DataAvailabilityChecker { }) .map(Some) } - - /// Potentially trigger reconstruction if: - /// - Our custody requirement is all columns, and we haven't got all columns - /// - We have >= 50% of columns, but not all columns - /// - Reconstruction hasn't been started for the block - fn should_reconstruct(&self, pending_components: &PendingComponents) -> bool { - let received_column_count = pending_components.verified_data_columns.len(); - // If we're sampling all columns, it means we must be custodying all columns. - let custody_column_count = self.availability_cache.sampling_column_count(); - let total_column_count = self.spec.number_of_columns; - custody_column_count == total_column_count - && received_column_count < total_column_count - && received_column_count >= total_column_count / 2 - && !pending_components.reconstruction_started - } } /// Helper struct to group data availability checker metrics. diff --git a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs index 4126e78ad9d..cb95be6b8a9 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs @@ -306,15 +306,6 @@ impl PendingComponents { ))) } - /// Mark reconstruction as started for this `PendingComponent`. - /// - /// NOTE: currently this value never reverts to false once it's set here. This means - /// reconstruction will only be attempted once. This is intentional because currently - /// reconstruction could only fail due to code errors or kzg errors, which shouldn't be retried. - pub fn reconstruction_started(&mut self) { - self.reconstruction_started = true; - } - /// Returns the epoch of the block if it is cached, otherwise returns the epoch of the first blob. pub fn epoch(&self) -> Option { self.executed_block @@ -522,9 +513,50 @@ impl DataAvailabilityCheckerInner { } } - pub fn set_reconstruction_started(&self, block_root: &Hash256) { + /// Check whether data column reconstruction should be attempted. + /// + /// If reconstruction is required, returns `Some(PendingComponents)` which contains the + /// components to be used as inputs to reconstruction, otherwise return `None`. + pub fn check_and_set_reconstruction_started( + &self, + block_root: &Hash256, + ) -> Option> { + let mut write_lock = self.critical.write(); + let Some(pending_components) = write_lock.get_mut(block_root) else { + // Block may have been imported as it does not exist in availability cache. 
+ return None; + }; + + // Potentially trigger reconstruction if: + // - Our custody requirement is all columns, and we haven't got all columns + // - We have >= 50% of columns, but not all columns + // - Reconstruction hasn't been started for the block + let should_reconstruct = { + let received_column_count = pending_components.verified_data_columns.len(); + // If we're sampling all columns, it means we must be custodying all columns. + let custody_column_count = self.sampling_column_count(); + let total_column_count = self.spec.number_of_columns; + custody_column_count == total_column_count + && received_column_count < total_column_count + && received_column_count >= total_column_count / 2 + && !pending_components.reconstruction_started + }; + + if should_reconstruct { + pending_components.reconstruction_started = true; + Some(pending_components.clone()) + } else { + None + } + } + + /// This could mean some invalid data columns made it through to the `DataAvailabilityChecker`. + /// In this case, we remove all data columns in `PendingComponents`, reset reconstruction + /// status so that we can attempt to retrieve columns from peers again. + pub fn handle_reconstruction_failure(&self, block_root: &Hash256) { if let Some(pending_components_mut) = self.critical.write().get_mut(block_root) { - pending_components_mut.reconstruction_started(); + pending_components_mut.verified_data_columns = vec![]; + pending_components_mut.reconstruction_started = false; } } From 0e6eaa227890194983cd523d5e6be498907dfdad Mon Sep 17 00:00:00 2001 From: Jimmy Chen Date: Fri, 11 Oct 2024 18:49:06 +1100 Subject: [PATCH 6/9] Add reconstruction reason metric and more debug logging to da checker. --- beacon_node/beacon_chain/src/beacon_chain.rs | 43 ++++-- .../src/data_availability_checker.rs | 57 ++++--- .../overflow_lru_cache.rs | 142 ++++++++++++------ beacon_node/beacon_chain/src/metrics.rs | 9 ++ .../src/network_beacon_processor/mod.rs | 1 + 5 files changed, 172 insertions(+), 80 deletions(-) diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index caf48ee2cff..2ca79d92ac0 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -22,6 +22,7 @@ pub use crate::canonical_head::CanonicalHead; use crate::chain_config::ChainConfig; use crate::data_availability_checker::{ Availability, AvailabilityCheckError, AvailableBlock, DataAvailabilityChecker, + DataColumnReconstructionResult, }; use crate::data_column_verification::{GossipDataColumnError, GossipVerifiedDataColumn}; use crate::early_attester_cache::EarlyAttesterCache; @@ -3166,7 +3167,8 @@ impl BeaconChain { } let data_availability_checker = self.data_availability_checker.clone(); - let Some((availability, data_columns_to_publish)) = self + + let result = self .task_executor .spawn_blocking_handle( move || data_availability_checker.reconstruct_data_columns(&block_root), @@ -3174,22 +3176,33 @@ impl BeaconChain { ) .ok_or(BeaconChainError::RuntimeShutdown)? .await - .map_err(BeaconChainError::TokioJoin)?? 
- else { - return Ok(None); - }; + .map_err(BeaconChainError::TokioJoin)??; - let Some(slot) = data_columns_to_publish.first().map(|d| d.slot()) else { - return Ok(None); - }; + match result { + DataColumnReconstructionResult::Success((availability, data_columns_to_publish)) => { + let Some(slot) = data_columns_to_publish.first().map(|d| d.slot()) else { + // This should be unreachable because empty result would return `RecoveredColumnsNotImported` instead of success. + return Ok(None); + }; - let r = self - .process_availability(slot, availability, || Ok(())) - .await; - self.remove_notified(&block_root, r) - .map(|availability_processing_status| { - Some((availability_processing_status, data_columns_to_publish)) - }) + let r = self + .process_availability(slot, availability, || Ok(())) + .await; + self.remove_notified(&block_root, r) + .map(|availability_processing_status| { + Some((availability_processing_status, data_columns_to_publish)) + }) + } + DataColumnReconstructionResult::NotRequired(reason) + | DataColumnReconstructionResult::RecoveredColumnsNotImported(reason) => { + // We use metric here because logging this would be *very* noisy. + metrics::inc_counter_vec( + &metrics::KZG_DATA_COLUMN_RECONSTRUCTION_INCOMPLETE_TOTAL, + &[reason], + ); + Ok(None) + } + } } /// Remove any block components from the *processing cache* if we no longer require them. If the diff --git a/beacon_node/beacon_chain/src/data_availability_checker.rs b/beacon_node/beacon_chain/src/data_availability_checker.rs index 6b1a1d9e08f..29e613da4eb 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker.rs @@ -2,7 +2,9 @@ use crate::blob_verification::{verify_kzg_for_blob_list, GossipVerifiedBlob, Kzg use crate::block_verification_types::{ AvailabilityPendingExecutedBlock, AvailableExecutedBlock, RpcBlock, }; -use crate::data_availability_checker::overflow_lru_cache::DataAvailabilityCheckerInner; +use crate::data_availability_checker::overflow_lru_cache::{ + DataAvailabilityCheckerInner, ReconstructColumnsDecision, +}; use crate::{metrics, BeaconChain, BeaconChainTypes, BeaconStore}; use kzg::Kzg; use slog::{debug, error, Logger}; @@ -77,6 +79,13 @@ pub struct DataAvailabilityChecker { pub type AvailabilityAndReconstructedColumns = (Availability, DataColumnSidecarList); +#[derive(Debug)] +pub enum DataColumnReconstructionResult { + Success(AvailabilityAndReconstructedColumns), + NotRequired(&'static str), + RecoveredColumnsNotImported(&'static str), +} + /// This type is returned after adding a block / blob to the `DataAvailabilityChecker`. /// /// Indicates if the block is fully `Available` or if we need blobs or blocks @@ -211,7 +220,7 @@ impl DataAvailabilityChecker { .map_err(AvailabilityCheckError::InvalidBlobs)?; self.availability_cache - .put_kzg_verified_blobs(block_root, epoch, verified_blobs) + .put_kzg_verified_blobs(block_root, epoch, verified_blobs, &self.log) } /// Put a list of custody columns received via RPC into the availability cache. 
This performs KZG @@ -241,6 +250,7 @@ impl DataAvailabilityChecker { block_root, epoch, verified_custody_columns, + &self.log, ) } @@ -257,6 +267,7 @@ impl DataAvailabilityChecker { gossip_blob.block_root(), gossip_blob.epoch(), vec![gossip_blob.into_inner()], + &self.log, ) } @@ -279,8 +290,12 @@ impl DataAvailabilityChecker { .map(|c| KzgVerifiedCustodyDataColumn::from_asserted_custody(c.into_inner())) .collect::>(); - self.availability_cache - .put_kzg_verified_data_columns(block_root, epoch, custody_columns) + self.availability_cache.put_kzg_verified_data_columns( + block_root, + epoch, + custody_columns, + &self.log, + ) } /// Check if we have all the blobs for a block. Returns `Availability` which has information @@ -290,7 +305,7 @@ impl DataAvailabilityChecker { executed_block: AvailabilityPendingExecutedBlock, ) -> Result, AvailabilityCheckError> { self.availability_cache - .put_pending_executed_block(executed_block) + .put_pending_executed_block(executed_block, &self.log) } pub fn remove_pending_components(&self, block_root: Hash256) { @@ -514,14 +529,15 @@ impl DataAvailabilityChecker { pub fn reconstruct_data_columns( &self, block_root: &Hash256, - ) -> Result>, AvailabilityCheckError> - { - let Some(pending_components) = self + ) -> Result, AvailabilityCheckError> { + let pending_components = match self .availability_cache .check_and_set_reconstruction_started(block_root) - else { - // Reconstruction not required or already in progress - return Ok(None); + { + ReconstructColumnsDecision::Yes(pending_components) => pending_components, + ReconstructColumnsDecision::No(reason) => { + return Ok(DataColumnReconstructionResult::NotRequired(reason)); + } }; metrics::inc_counter(&KZG_DATA_COLUMN_RECONSTRUCTION_ATTEMPTS); @@ -547,11 +563,9 @@ impl DataAvailabilityChecker { // Check indices from cache again to make sure we don't publish components we've already received. let Some(existing_column_indices) = self.cached_data_column_indexes(block_root) else { - // If block is already imported (no longer in cache), abort publishing data columns - // TODO(das) This assumes only supernodes do reconstructions (i.e. custody - // requirement = all columns). This behaviour is likely to change in the future when we - // get to 2D PeerDAS. - return Ok(None); + return Ok(DataColumnReconstructionResult::RecoveredColumnsNotImported( + "block already imported", + )); }; let data_columns_to_publish = all_data_columns @@ -559,12 +573,13 @@ impl DataAvailabilityChecker { .filter(|d| !existing_column_indices.contains(&d.index())) .collect::>(); - // Return if there's no data columns to publish. 
let Some(slot) = data_columns_to_publish .first() .map(|d| d.as_data_column().slot()) else { - return Ok(None); + return Ok(DataColumnReconstructionResult::RecoveredColumnsNotImported( + "No new columns to import and publish", + )); }; metrics::stop_timer(timer); @@ -584,17 +599,17 @@ impl DataAvailabilityChecker { *block_root, slot.epoch(T::EthSpec::slots_per_epoch()), data_columns_to_publish.clone(), + &self.log, ) .map(|availability| { - ( + DataColumnReconstructionResult::Success(( availability, data_columns_to_publish .into_iter() .map(|d| d.clone_arc()) .collect::>(), - ) + )) }) - .map(Some) } } diff --git a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs index cb95be6b8a9..7bdd33e2ee7 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs @@ -9,6 +9,7 @@ use crate::data_column_verification::KzgVerifiedCustodyDataColumn; use crate::BeaconChainTypes; use lru::LruCache; use parking_lot::RwLock; +use slog::{debug, Logger}; use ssz_types::{FixedVector, VariableList}; use std::num::NonZeroUsize; use std::sync::Arc; @@ -90,7 +91,7 @@ impl PendingComponents { /// block. /// /// This corresponds to the number of commitments that are present in a block. - pub fn num_expected_blobs(&self) -> Option { + pub fn block_kzg_commitments_count(&self) -> Option { self.get_cached_block() .as_ref() .map(|b| b.get_commitments().len()) @@ -198,21 +199,61 @@ impl PendingComponents { /// /// Returns `true` if both the block exists and the number of received blobs / custody columns /// matches the number of expected blobs / custody columns. - pub fn is_available(&self, block_import_requirement: &BlockImportRequirement) -> bool { + pub fn is_available( + &self, + block_import_requirement: &BlockImportRequirement, + log: &Logger, + ) -> bool { + let block_kzg_commitments_count_opt = self.block_kzg_commitments_count(); + match block_import_requirement { - BlockImportRequirement::AllBlobs => self - .num_expected_blobs() - .map_or(false, |num_expected_blobs| { - num_expected_blobs == self.num_received_blobs() - }), + BlockImportRequirement::AllBlobs => { + let received_blobs = self.num_received_blobs(); + let expected_blobs_msg = block_kzg_commitments_count_opt + .as_ref() + .map(|num| num.to_string()) + .unwrap_or("unknown".to_string()); + + debug!(log, + "Component(s) added to data availability checker"; + "block_root" => ?self.block_root, + "received_block" => block_kzg_commitments_count_opt.is_some(), + "received_blobs" => received_blobs, + "expected_blobs" => expected_blobs_msg, + ); + + block_kzg_commitments_count_opt.map_or(false, |num_expected_blobs| { + num_expected_blobs == received_blobs + }) + } BlockImportRequirement::ColumnSampling(num_expected_columns) => { - let num_received_data_columns = self.num_received_data_columns(); // No data columns when there are 0 blobs - self.num_expected_blobs() - .map_or(false, |num_expected_blobs| { - num_expected_blobs == 0 - || *num_expected_columns == num_received_data_columns - }) + let expected_columns_opt = block_kzg_commitments_count_opt.map(|blob_count| { + if blob_count > 0 { + *num_expected_columns + } else { + 0 + } + }); + + let expected_columns_msg = expected_columns_opt + .as_ref() + .map(|num| num.to_string()) + .unwrap_or("unknown".to_string()); + + let num_received_columns = self.num_received_data_columns(); + + debug!(log, + 
"Component(s) added to data availability checker"; + "block_root" => ?self.block_root, + "received_block" => block_kzg_commitments_count_opt.is_some(), + "received_columns" => num_received_columns, + "expected_columns" => expected_columns_msg, + ); + + expected_columns_opt.map_or(false, |num_expected_columns| { + num_expected_columns == num_received_columns + }) } } } @@ -349,6 +390,12 @@ pub struct DataAvailabilityCheckerInner { spec: Arc, } +#[allow(clippy::large_enum_variant)] +pub(crate) enum ReconstructColumnsDecision { + Yes(PendingComponents), + No(&'static str), +} + impl DataAvailabilityCheckerInner { pub fn new( capacity: NonZeroUsize, @@ -444,6 +491,7 @@ impl DataAvailabilityCheckerInner { block_root: Hash256, epoch: Epoch, kzg_verified_blobs: I, + log: &Logger, ) -> Result, AvailabilityCheckError> { let mut fixed_blobs = FixedVector::default(); @@ -465,7 +513,7 @@ impl DataAvailabilityCheckerInner { pending_components.merge_blobs(fixed_blobs); let block_import_requirement = self.block_import_requirement(epoch)?; - if pending_components.is_available(&block_import_requirement) { + if pending_components.is_available(&block_import_requirement, log) { write_lock.put(block_root, pending_components.clone()); // No need to hold the write lock anymore drop(write_lock); @@ -486,6 +534,7 @@ impl DataAvailabilityCheckerInner { block_root: Hash256, epoch: Epoch, kzg_verified_data_columns: I, + log: &Logger, ) -> Result, AvailabilityCheckError> { let mut write_lock = self.critical.write(); @@ -500,7 +549,7 @@ impl DataAvailabilityCheckerInner { let block_import_requirement = self.block_import_requirement(epoch)?; - if pending_components.is_available(&block_import_requirement) { + if pending_components.is_available(&block_import_requirement, log) { write_lock.put(block_root, pending_components.clone()); // No need to hold the write lock anymore drop(write_lock); @@ -515,39 +564,43 @@ impl DataAvailabilityCheckerInner { /// Check whether data column reconstruction should be attempted. /// - /// If reconstruction is required, returns `Some(PendingComponents)` which contains the - /// components to be used as inputs to reconstruction, otherwise return `None`. + /// Potentially trigger reconstruction if: + /// - Our custody requirement is all columns (supernode), and we haven't got all columns + /// - We have >= 50% of columns, but not all columns + /// - Reconstruction hasn't been started for the block + /// + /// If reconstruction is required, returns `PendingComponents` which contains the + /// components to be used as inputs to reconstruction, otherwise returns a `reason`. pub fn check_and_set_reconstruction_started( &self, block_root: &Hash256, - ) -> Option> { + ) -> ReconstructColumnsDecision { let mut write_lock = self.critical.write(); let Some(pending_components) = write_lock.get_mut(block_root) else { // Block may have been imported as it does not exist in availability cache. - return None; + return ReconstructColumnsDecision::No("block already imported"); }; - // Potentially trigger reconstruction if: - // - Our custody requirement is all columns, and we haven't got all columns - // - We have >= 50% of columns, but not all columns - // - Reconstruction hasn't been started for the block - let should_reconstruct = { - let received_column_count = pending_components.verified_data_columns.len(); - // If we're sampling all columns, it means we must be custodying all columns. 
- let custody_column_count = self.sampling_column_count(); - let total_column_count = self.spec.number_of_columns; - custody_column_count == total_column_count - && received_column_count < total_column_count - && received_column_count >= total_column_count / 2 - && !pending_components.reconstruction_started - }; + // If we're sampling all columns, it means we must be custodying all columns. + let custody_column_count = self.sampling_column_count(); + let total_column_count = self.spec.number_of_columns; + let received_column_count = pending_components.verified_data_columns.len(); - if should_reconstruct { - pending_components.reconstruction_started = true; - Some(pending_components.clone()) - } else { - None + if pending_components.reconstruction_started { + return ReconstructColumnsDecision::No("already started"); + } + if custody_column_count != total_column_count { + return ReconstructColumnsDecision::No("not required for full node"); } + if received_column_count == self.spec.number_of_columns { + return ReconstructColumnsDecision::No("all columns received"); + } + if received_column_count < total_column_count / 2 { + return ReconstructColumnsDecision::No("not enough columns"); + } + + pending_components.reconstruction_started = true; + ReconstructColumnsDecision::Yes(pending_components.clone()) } /// This could mean some invalid data columns made it through to the `DataAvailabilityChecker`. @@ -565,6 +618,7 @@ impl DataAvailabilityCheckerInner { pub fn put_pending_executed_block( &self, executed_block: AvailabilityPendingExecutedBlock, + log: &Logger, ) -> Result, AvailabilityCheckError> { let mut write_lock = self.critical.write(); let block_root = executed_block.import_data.block_root; @@ -586,7 +640,7 @@ impl DataAvailabilityCheckerInner { // Check if we have all components and entire set is consistent. 
let block_import_requirement = self.block_import_requirement(epoch)?; - if pending_components.is_available(&block_import_requirement) { + if pending_components.is_available(&block_import_requirement, log) { write_lock.put(block_root, pending_components.clone()); // No need to hold the write lock anymore drop(write_lock); @@ -884,7 +938,7 @@ mod test { ); assert!(cache.critical.read().is_empty(), "cache should be empty"); let availability = cache - .put_pending_executed_block(pending_block) + .put_pending_executed_block(pending_block, harness.logger()) .expect("should put block"); if blobs_expected == 0 { assert!( @@ -923,7 +977,7 @@ mod test { for (blob_index, gossip_blob) in blobs.into_iter().enumerate() { kzg_verified_blobs.push(gossip_blob.into_inner()); let availability = cache - .put_kzg_verified_blobs(root, epoch, kzg_verified_blobs.clone()) + .put_kzg_verified_blobs(root, epoch, kzg_verified_blobs.clone(), harness.logger()) .expect("should put blob"); if blob_index == blobs_expected - 1 { assert!(matches!(availability, Availability::Available(_))); @@ -950,7 +1004,7 @@ mod test { for gossip_blob in blobs { kzg_verified_blobs.push(gossip_blob.into_inner()); let availability = cache - .put_kzg_verified_blobs(root, epoch, kzg_verified_blobs.clone()) + .put_kzg_verified_blobs(root, epoch, kzg_verified_blobs.clone(), harness.logger()) .expect("should put blob"); assert_eq!( availability, @@ -960,7 +1014,7 @@ mod test { assert_eq!(cache.critical.read().len(), 1); } let availability = cache - .put_pending_executed_block(pending_block) + .put_pending_executed_block(pending_block, harness.logger()) .expect("should put block"); assert!( matches!(availability, Availability::Available(_)), @@ -1028,7 +1082,7 @@ mod test { // put the block in the cache let availability = cache - .put_pending_executed_block(pending_block) + .put_pending_executed_block(pending_block, harness.logger()) .expect("should put block"); // grab the diet block from the cache for later testing diff --git a/beacon_node/beacon_chain/src/metrics.rs b/beacon_node/beacon_chain/src/metrics.rs index 55327d0fb05..0b5608f0843 100644 --- a/beacon_node/beacon_chain/src/metrics.rs +++ b/beacon_node/beacon_chain/src/metrics.rs @@ -1903,6 +1903,15 @@ pub static KZG_DATA_COLUMN_RECONSTRUCTION_FAILURES: LazyLock> ) }); +pub static KZG_DATA_COLUMN_RECONSTRUCTION_INCOMPLETE_TOTAL: LazyLock> = + LazyLock::new(|| { + try_create_int_counter_vec( + "kzg_data_column_reconstruction_incomplete_total", + "Count of times data column reconstruction attempts did not result in an import", + &["reason"], + ) + }); + /* * light_client server metrics */ diff --git a/beacon_node/network/src/network_beacon_processor/mod.rs b/beacon_node/network/src/network_beacon_processor/mod.rs index 1b995dcadf0..295d6491a3c 100644 --- a/beacon_node/network/src/network_beacon_processor/mod.rs +++ b/beacon_node/network/src/network_beacon_processor/mod.rs @@ -899,6 +899,7 @@ impl NetworkBeaconProcessor { Some(availability_processing_status) } Ok(None) => { + // reason is tracked via the `KZG_DATA_COLUMN_RECONSTRUCTION_INCOMPLETE_TOTAL` metric trace!( self.log, "Reconstruction not required for block"; From 9efa7680f42a6e1b7dd4174f97dcfb09192b52bc Mon Sep 17 00:00:00 2001 From: Jimmy Chen Date: Fri, 11 Oct 2024 23:22:12 +1100 Subject: [PATCH 7/9] Add comment and logging. 
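Context for this change: the `Ok(None)` branch above deliberately avoids logging a per-block reason; the static reason strings returned by `ReconstructColumnsDecision::No(reason)` are instead counted on the `reason`-labelled counter added to `metrics.rs`. Below is a minimal sketch of that pattern using the standard `prometheus` crate directly rather than Lighthouse's `try_create_int_counter_vec` / `inc_counter_vec` wrappers; the registry setup and the loop over example reason strings are illustrative only.

use prometheus::{IntCounterVec, Opts, Registry};

fn main() -> Result<(), prometheus::Error> {
    let registry = Registry::new();

    // Same name, help text and label as the metric added in `metrics.rs` above.
    let incomplete_total = IntCounterVec::new(
        Opts::new(
            "kzg_data_column_reconstruction_incomplete_total",
            "Count of times data column reconstruction attempts did not result in an import",
        ),
        &["reason"],
    )?;
    registry.register(Box::new(incomplete_total.clone()))?;

    // Each `ReconstructColumnsDecision::No(reason)` bumps one labelled series
    // instead of emitting a noisy per-block log line.
    for reason in ["already started", "not enough columns", "block already imported"] {
        incomplete_total.with_label_values(&[reason]).inc();
    }

    assert_eq!(incomplete_total.with_label_values(&["already started"]).get(), 1);
    Ok(())
}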
--- .../src/data_availability_checker/overflow_lru_cache.rs | 3 +++ beacon_node/network/src/network_beacon_processor/mod.rs | 1 + 2 files changed, 4 insertions(+) diff --git a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs index 7bdd33e2ee7..6d4636e8ed8 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs @@ -390,6 +390,9 @@ pub struct DataAvailabilityCheckerInner { spec: Arc, } +// This enum is only used internally within the crate in the reconstruction function to improve +// readability, so it's OK to not box the variant value, and it shouldn't impact memory much with +// the current usage, as it's deconstructed immediately. #[allow(clippy::large_enum_variant)] pub(crate) enum ReconstructColumnsDecision { Yes(PendingComponents), diff --git a/beacon_node/network/src/network_beacon_processor/mod.rs b/beacon_node/network/src/network_beacon_processor/mod.rs index 295d6491a3c..42af32f08b6 100644 --- a/beacon_node/network/src/network_beacon_processor/mod.rs +++ b/beacon_node/network/src/network_beacon_processor/mod.rs @@ -911,6 +911,7 @@ impl NetworkBeaconProcessor { error!( self.log, "Error during data column reconstruction"; + "block_root" => %block_root, "error" => ?e ); None From 2710d3c3db2776b6cc83706630baa57cf68e3bfb Mon Sep 17 00:00:00 2001 From: Jimmy Chen Date: Thu, 17 Oct 2024 14:00:51 +1100 Subject: [PATCH 8/9] Rename `NotRequired` to `NotStarted`. --- beacon_node/beacon_chain/src/beacon_chain.rs | 2 +- beacon_node/beacon_chain/src/data_availability_checker.rs | 4 ++-- beacon_node/network/src/subnet_service/attestation_subnets.rs | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 2ca79d92ac0..0a0f751d0a5 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -3193,7 +3193,7 @@ impl BeaconChain { Some((availability_processing_status, data_columns_to_publish)) }) } - DataColumnReconstructionResult::NotRequired(reason) + DataColumnReconstructionResult::NotStarted(reason) | DataColumnReconstructionResult::RecoveredColumnsNotImported(reason) => { // We use metric here because logging this would be *very* noisy. 
metrics::inc_counter_vec( diff --git a/beacon_node/beacon_chain/src/data_availability_checker.rs b/beacon_node/beacon_chain/src/data_availability_checker.rs index 29e613da4eb..047764d705c 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker.rs @@ -82,7 +82,7 @@ pub type AvailabilityAndReconstructedColumns = (Availability, DataColumnSi #[derive(Debug)] pub enum DataColumnReconstructionResult { Success(AvailabilityAndReconstructedColumns), - NotRequired(&'static str), + NotStarted(&'static str), RecoveredColumnsNotImported(&'static str), } @@ -536,7 +536,7 @@ impl DataAvailabilityChecker { { ReconstructColumnsDecision::Yes(pending_components) => pending_components, ReconstructColumnsDecision::No(reason) => { - return Ok(DataColumnReconstructionResult::NotRequired(reason)); + return Ok(DataColumnReconstructionResult::NotStarted(reason)); } }; diff --git a/beacon_node/network/src/subnet_service/attestation_subnets.rs b/beacon_node/network/src/subnet_service/attestation_subnets.rs index 432a2b7fb7c..ab6e77876f8 100644 --- a/beacon_node/network/src/subnet_service/attestation_subnets.rs +++ b/beacon_node/network/src/subnet_service/attestation_subnets.rs @@ -434,7 +434,7 @@ impl AttestationService { // duration of the subscription. let min_ttl = self .beacon_chain - .slot_clock + i .slot_clock .duration_to_slot(exact_subnet.slot + 1) .map(|duration| std::time::Instant::now() + duration); Some(SubnetDiscovery { From e66a75037545b9ec337983b6724c85cdd6a87da1 Mon Sep 17 00:00:00 2001 From: Jimmy Chen Date: Thu, 17 Oct 2024 14:18:08 +1100 Subject: [PATCH 9/9] Remove extra character added. --- beacon_node/network/src/subnet_service/attestation_subnets.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/beacon_node/network/src/subnet_service/attestation_subnets.rs b/beacon_node/network/src/subnet_service/attestation_subnets.rs index ab6e77876f8..432a2b7fb7c 100644 --- a/beacon_node/network/src/subnet_service/attestation_subnets.rs +++ b/beacon_node/network/src/subnet_service/attestation_subnets.rs @@ -434,7 +434,7 @@ impl AttestationService { // duration of the subscription. let min_ttl = self .beacon_chain - i .slot_clock + .slot_clock .duration_to_slot(exact_subnet.slot + 1) .map(|duration| std::time::Instant::now() + duration); Some(SubnetDiscovery {
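Taken together, the series reduces `check_and_set_reconstruction_started` to a chain of early returns, each naming the reason reconstruction was not started. The following is a condensed, self-contained sketch of that decision with the LRU cache, write lock and `PendingComponents` handling stripped out; the simplified enum, the free-function signature and the 128-column figures in the driver are illustrative, not Lighthouse's actual types.

// Mirrors the thresholds in `check_and_set_reconstruction_started` above.
enum ReconstructDecision {
    Yes,
    No(&'static str),
}

fn should_reconstruct(
    custody_column_count: usize,
    total_column_count: usize,
    received_column_count: usize,
    reconstruction_started: bool,
) -> ReconstructDecision {
    if reconstruction_started {
        return ReconstructDecision::No("already started");
    }
    // Only supernodes (custodying every column) attempt reconstruction.
    if custody_column_count != total_column_count {
        return ReconstructDecision::No("not required for full node");
    }
    if received_column_count == total_column_count {
        return ReconstructDecision::No("all columns received");
    }
    // Reconstruction needs at least 50% of the columns.
    if received_column_count < total_column_count / 2 {
        return ReconstructDecision::No("not enough columns");
    }
    ReconstructDecision::Yes
}

fn main() {
    // A supernode holding half of the columns triggers reconstruction.
    assert!(matches!(
        should_reconstruct(128, 128, 64, false),
        ReconstructDecision::Yes
    ));
    // A node custodying only a subset never does.
    assert!(matches!(
        should_reconstruct(8, 128, 64, false),
        ReconstructDecision::No("not required for full node")
    ));
}

In the real implementation the decision is made while holding the cache's write lock: `reconstruction_started` is flipped and the `PendingComponents` are cloned out before the lock is released, so a concurrent caller for the same block root gets the "already started" reason rather than kicking off a second reconstruction.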