Skip to content

Commit

Permalink
Refactor data column reconstruction and avoid blocking processing (#6403
Browse files Browse the repository at this point in the history
)
  • Loading branch information
jimmygchen committed Oct 3, 2024
1 parent 251409e commit 9647326
Show file tree
Hide file tree
Showing 9 changed files with 329 additions and 224 deletions.
123 changes: 64 additions & 59 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ pub use crate::canonical_head::CanonicalHead;
use crate::chain_config::ChainConfig;
use crate::data_availability_checker::{
Availability, AvailabilityCheckError, AvailableBlock, DataAvailabilityChecker,
DataColumnsToPublish,
};
use crate::data_column_verification::{GossipDataColumnError, GossipVerifiedDataColumn};
use crate::early_attester_cache::EarlyAttesterCache;
Expand Down Expand Up @@ -3019,13 +3018,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
self: &Arc<Self>,
data_columns: Vec<GossipVerifiedDataColumn<T>>,
publish_fn: impl FnOnce() -> Result<(), BlockError>,
) -> Result<
(
AvailabilityProcessingStatus,
DataColumnsToPublish<T::EthSpec>,
),
BlockError,
> {
) -> Result<AvailabilityProcessingStatus, BlockError> {
let Ok((slot, block_root)) = data_columns
.iter()
.map(|c| (c.slot(), c.block_root()))
Expand Down Expand Up @@ -3055,7 +3048,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
publish_fn,
)
.await;
self.remove_notified_custody_columns(&block_root, r)
self.remove_notified(&block_root, r)
}

/// Cache the blobs in the processing cache, process it, then evict it from the cache if it was
Expand Down Expand Up @@ -3114,13 +3107,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
pub async fn process_rpc_custody_columns(
self: &Arc<Self>,
custody_columns: DataColumnSidecarList<T::EthSpec>,
) -> Result<
(
AvailabilityProcessingStatus,
DataColumnsToPublish<T::EthSpec>,
),
BlockError,
> {
) -> Result<AvailabilityProcessingStatus, BlockError> {
let Ok((slot, block_root)) = custody_columns
.iter()
.map(|c| (c.slot(), c.block_root()))
Expand Down Expand Up @@ -3158,7 +3145,55 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
let r = self
.check_rpc_custody_columns_availability_and_import(slot, block_root, custody_columns)
.await;
self.remove_notified_custody_columns(&block_root, r)
self.remove_notified(&block_root, r)
}

pub async fn reconstruct_data_columns(
self: &Arc<Self>,
block_root: Hash256,
) -> Result<
Option<(
AvailabilityProcessingStatus,
DataColumnSidecarList<T::EthSpec>,
)>,
BlockError,
> {
// As of now we only reconstruct data columns on supernodes, so if the block is already
// available on a supernode, there's no need to reconstruct as the node must already have
// all columns.
if self
.canonical_head
.fork_choice_read_lock()
.contains_block(&block_root)
{
return Ok(None);
}

let data_availability_checker = self.data_availability_checker.clone();
let Some((availability, data_columns_to_publish)) = self
.task_executor
.spawn_blocking_handle(
move || data_availability_checker.reconstruct_data_columns(&block_root),
"reconstruct_data_columns",
)
.ok_or(BeaconChainError::RuntimeShutdown)?
.await
.map_err(BeaconChainError::TokioJoin)??
else {
return Ok(None);
};

let Some(slot) = data_columns_to_publish.first().map(|d| d.slot()) else {
return Ok(None);
};

let r = self
.process_availability(slot, availability, || Ok(()))
.await;
self.remove_notified(&block_root, r)
.map(|availability_processing_status| {
Some((availability_processing_status, data_columns_to_publish))
})
}

/// Remove any block components from the *processing cache* if we no longer require them. If the
Expand All @@ -3176,23 +3211,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
r
}

/// Remove any block components from the *processing cache* if we no longer require them. If the
/// block was imported full or erred, we no longer require them.
fn remove_notified_custody_columns<P>(
&self,
block_root: &Hash256,
r: Result<(AvailabilityProcessingStatus, P), BlockError>,
) -> Result<(AvailabilityProcessingStatus, P), BlockError> {
let has_missing_components = matches!(
r,
Ok((AvailabilityProcessingStatus::MissingComponents(_, _), _))
);
if !has_missing_components {
self.reqresp_pre_import_cache.write().remove(block_root);
}
r
}

/// Wraps `process_block` in logic to cache the block's commitments in the processing cache
/// and evict if the block was imported or errored.
pub async fn process_block_with_early_caching<B: IntoExecutionPendingBlock<T>>(
Expand Down Expand Up @@ -3448,26 +3466,21 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
block_root: Hash256,
data_columns: Vec<GossipVerifiedDataColumn<T>>,
publish_fn: impl FnOnce() -> Result<(), BlockError>,
) -> Result<
(
AvailabilityProcessingStatus,
DataColumnsToPublish<T::EthSpec>,
),
BlockError,
> {
) -> Result<AvailabilityProcessingStatus, BlockError> {
if let Some(slasher) = self.slasher.as_ref() {
for data_colum in &data_columns {
slasher.accept_block_header(data_colum.signed_block_header());
}
}

let (availability, data_columns_to_publish) = self
.data_availability_checker
.put_gossip_data_columns(slot, block_root, data_columns)?;
let availability = self.data_availability_checker.put_gossip_data_columns(
slot,
block_root,
data_columns,
)?;

self.process_availability(slot, availability, publish_fn)
.await
.map(|result| (result, data_columns_to_publish))
}

/// Checks if the provided blobs can make any cached blocks available, and imports immediately
Expand Down Expand Up @@ -3517,13 +3530,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
slot: Slot,
block_root: Hash256,
custody_columns: DataColumnSidecarList<T::EthSpec>,
) -> Result<
(
AvailabilityProcessingStatus,
DataColumnsToPublish<T::EthSpec>,
),
BlockError,
> {
) -> Result<AvailabilityProcessingStatus, BlockError> {
// Need to scope this to ensure the lock is dropped before calling `process_availability`
// Even an explicit drop is not enough to convince the borrow checker.
{
Expand All @@ -3548,16 +3555,14 @@ impl<T: BeaconChainTypes> BeaconChain<T> {

// This slot value is purely informative for the consumers of
// `AvailabilityProcessingStatus::MissingComponents` to log an error with a slot.
let (availability, data_columns_to_publish) =
self.data_availability_checker.put_rpc_custody_columns(
block_root,
slot.epoch(T::EthSpec::slots_per_epoch()),
custody_columns,
)?;
let availability = self.data_availability_checker.put_rpc_custody_columns(
block_root,
slot.epoch(T::EthSpec::slots_per_epoch()),
custody_columns,
)?;

self.process_availability(slot, availability, || Ok(()))
.await
.map(|result| (result, data_columns_to_publish))
}

/// Imports a fully available block. Otherwise, returns `AvailabilityProcessingStatus::MissingComponents`
Expand Down
1 change: 1 addition & 0 deletions beacon_node/beacon_chain/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -984,6 +984,7 @@ where
store,
self.import_all_data_columns,
self.spec,
log.new(o!("service" => "data_availability_checker")),
)
.map_err(|e| format!("Error initializing DataAvailabilityChecker: {:?}", e))?,
),
Expand Down
Loading

0 comments on commit 9647326

Please sign in to comment.