Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor data column reconstruction and avoid blocking processing #6403

Merged
merged 13 commits into from
Oct 17, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 64 additions & 59 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ pub use crate::canonical_head::CanonicalHead;
use crate::chain_config::ChainConfig;
use crate::data_availability_checker::{
Availability, AvailabilityCheckError, AvailableBlock, DataAvailabilityChecker,
DataColumnsToPublish,
};
use crate::data_column_verification::{GossipDataColumnError, GossipVerifiedDataColumn};
use crate::early_attester_cache::EarlyAttesterCache;
Expand Down Expand Up @@ -3019,13 +3018,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
self: &Arc<Self>,
data_columns: Vec<GossipVerifiedDataColumn<T>>,
publish_fn: impl FnOnce() -> Result<(), BlockError>,
) -> Result<
(
AvailabilityProcessingStatus,
DataColumnsToPublish<T::EthSpec>,
),
BlockError,
> {
) -> Result<AvailabilityProcessingStatus, BlockError> {
let Ok((slot, block_root)) = data_columns
.iter()
.map(|c| (c.slot(), c.block_root()))
Expand Down Expand Up @@ -3055,7 +3048,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
publish_fn,
)
.await;
self.remove_notified_custody_columns(&block_root, r)
self.remove_notified(&block_root, r)
}

/// Cache the blobs in the processing cache, process it, then evict it from the cache if it was
Expand Down Expand Up @@ -3114,13 +3107,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
pub async fn process_rpc_custody_columns(
self: &Arc<Self>,
custody_columns: DataColumnSidecarList<T::EthSpec>,
) -> Result<
(
AvailabilityProcessingStatus,
DataColumnsToPublish<T::EthSpec>,
),
BlockError,
> {
) -> Result<AvailabilityProcessingStatus, BlockError> {
let Ok((slot, block_root)) = custody_columns
.iter()
.map(|c| (c.slot(), c.block_root()))
Expand Down Expand Up @@ -3158,7 +3145,55 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
let r = self
.check_rpc_custody_columns_availability_and_import(slot, block_root, custody_columns)
.await;
self.remove_notified_custody_columns(&block_root, r)
self.remove_notified(&block_root, r)
}

pub async fn reconstruct_data_columns(
self: &Arc<Self>,
block_root: Hash256,
) -> Result<
Option<(
AvailabilityProcessingStatus,
DataColumnSidecarList<T::EthSpec>,
)>,
BlockError,
> {
// As of now we only reconstruct data columns on supernodes, so if the block is already
// available on a supernode, there's no need to reconstruct as the node must already have
// all columns.
if self
.canonical_head
.fork_choice_read_lock()
.contains_block(&block_root)
{
return Ok(None);
}

let data_availability_checker = self.data_availability_checker.clone();
let Some((availability, data_columns_to_publish)) = self
.task_executor
.spawn_blocking_handle(
move || data_availability_checker.reconstruct_data_columns(&block_root),
"reconstruct_data_columns",
)
.ok_or(BeaconChainError::RuntimeShutdown)?
.await
.map_err(BeaconChainError::TokioJoin)??
else {
return Ok(None);
};

let Some(slot) = data_columns_to_publish.first().map(|d| d.slot()) else {
return Ok(None);
};

let r = self
.process_availability(slot, availability, || Ok(()))
.await;
self.remove_notified(&block_root, r)
.map(|availability_processing_status| {
Some((availability_processing_status, data_columns_to_publish))
})
}

/// Remove any block components from the *processing cache* if we no longer require them. If the
Expand All @@ -3176,23 +3211,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
r
}

/// Remove any block components from the *processing cache* if we no longer require them. If the
/// block was imported full or erred, we no longer require them.
fn remove_notified_custody_columns<P>(
&self,
block_root: &Hash256,
r: Result<(AvailabilityProcessingStatus, P), BlockError>,
) -> Result<(AvailabilityProcessingStatus, P), BlockError> {
let has_missing_components = matches!(
r,
Ok((AvailabilityProcessingStatus::MissingComponents(_, _), _))
);
if !has_missing_components {
self.reqresp_pre_import_cache.write().remove(block_root);
}
r
}

/// Wraps `process_block` in logic to cache the block's commitments in the processing cache
/// and evict if the block was imported or errored.
pub async fn process_block_with_early_caching<B: IntoExecutionPendingBlock<T>>(
Expand Down Expand Up @@ -3448,26 +3466,21 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
block_root: Hash256,
data_columns: Vec<GossipVerifiedDataColumn<T>>,
publish_fn: impl FnOnce() -> Result<(), BlockError>,
) -> Result<
(
AvailabilityProcessingStatus,
DataColumnsToPublish<T::EthSpec>,
),
BlockError,
> {
) -> Result<AvailabilityProcessingStatus, BlockError> {
if let Some(slasher) = self.slasher.as_ref() {
for data_colum in &data_columns {
slasher.accept_block_header(data_colum.signed_block_header());
}
}

let (availability, data_columns_to_publish) = self
.data_availability_checker
.put_gossip_data_columns(slot, block_root, data_columns)?;
let availability = self.data_availability_checker.put_gossip_data_columns(
slot,
block_root,
data_columns,
)?;

self.process_availability(slot, availability, publish_fn)
.await
.map(|result| (result, data_columns_to_publish))
}

/// Checks if the provided blobs can make any cached blocks available, and imports immediately
Expand Down Expand Up @@ -3517,13 +3530,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
slot: Slot,
block_root: Hash256,
custody_columns: DataColumnSidecarList<T::EthSpec>,
) -> Result<
(
AvailabilityProcessingStatus,
DataColumnsToPublish<T::EthSpec>,
),
BlockError,
> {
) -> Result<AvailabilityProcessingStatus, BlockError> {
// Need to scope this to ensure the lock is dropped before calling `process_availability`
// Even an explicit drop is not enough to convince the borrow checker.
{
Expand All @@ -3548,16 +3555,14 @@ impl<T: BeaconChainTypes> BeaconChain<T> {

// This slot value is purely informative for the consumers of
// `AvailabilityProcessingStatus::MissingComponents` to log an error with a slot.
let (availability, data_columns_to_publish) =
self.data_availability_checker.put_rpc_custody_columns(
block_root,
slot.epoch(T::EthSpec::slots_per_epoch()),
custody_columns,
)?;
let availability = self.data_availability_checker.put_rpc_custody_columns(
block_root,
slot.epoch(T::EthSpec::slots_per_epoch()),
custody_columns,
)?;

self.process_availability(slot, availability, || Ok(()))
.await
.map(|result| (result, data_columns_to_publish))
}

/// Imports a fully available block. Otherwise, returns `AvailabilityProcessingStatus::MissingComponents`
Expand Down
1 change: 1 addition & 0 deletions beacon_node/beacon_chain/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -984,6 +984,7 @@ where
store,
self.import_all_data_columns,
self.spec,
log.new(o!("service" => "data_availability_checker")),
)
.map_err(|e| format!("Error initializing DataAvailabilityChecker: {:?}", e))?,
),
Expand Down
Loading
Loading