Skip to content

Commit

Permalink
Implement Subnet Sampling for PeerDAS (#6410)
Browse files Browse the repository at this point in the history
* Add `SAMPLES_PER_SLOT` config.

* Rename `sampling` module to `peer_sampling`

* Implement subnet sampling.

* Update lookup test.

* Merge branch 'unstable' into subnet-sampling

* Merge branch 'unstable' into subnet-sampling

# Conflicts:
#	beacon_node/beacon_chain/src/data_availability_checker.rs
#	beacon_node/http_api/src/publish_blocks.rs
#	beacon_node/lighthouse_network/src/types/globals.rs
#	beacon_node/network/src/sync/manager.rs

* Merge branch 'unstable' into subnet-sampling
  • Loading branch information
jimmygchen authored Oct 4, 2024
1 parent a4a673b commit f3a5e25
Show file tree
Hide file tree
Showing 20 changed files with 122 additions and 80 deletions.
2 changes: 2 additions & 0 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3851,6 +3851,8 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}

if let Some(data_columns) = data_columns {
// TODO(das): `available_block` includes all sampled columns, but we only need to store
// custody columns. To be clarified in spec.
if !data_columns.is_empty() {
debug!(
self.log, "Writing data_columns to store";
Expand Down
22 changes: 11 additions & 11 deletions beacon_node/beacon_chain/src/data_availability_checker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,13 +108,15 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
spec.custody_requirement as usize
};

let custody_column_count =
custody_subnet_count.saturating_mul(spec.data_columns_per_subnet());
let subnet_sampling_size =
std::cmp::max(custody_subnet_count, spec.samples_per_slot as usize);
let sampling_column_count =
subnet_sampling_size.saturating_mul(spec.data_columns_per_subnet());

let inner = DataAvailabilityCheckerInner::new(
OVERFLOW_LRU_CAPACITY,
store,
custody_column_count,
sampling_column_count,
spec.clone(),
)?;
Ok(Self {
Expand All @@ -125,10 +127,8 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
})
}

pub fn get_custody_columns_count(&self) -> usize {
self.availability_cache
.custody_subnet_count()
.saturating_mul(self.spec.data_columns_per_subnet())
pub fn get_sampling_column_count(&self) -> usize {
self.availability_cache.sampling_column_count()
}

/// Checks if the block root is currently in the availability cache awaiting import because
Expand All @@ -141,9 +141,9 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
.get_execution_valid_block(block_root)
}

/// Return the set of imported blob indexes for `block_root`. Returns None if there is no block
/// Return the set of cached blob indexes for `block_root`. Returns None if there is no block
/// component for `block_root`.
pub fn imported_blob_indexes(&self, block_root: &Hash256) -> Option<Vec<u64>> {
pub fn cached_blob_indexes(&self, block_root: &Hash256) -> Option<Vec<u64>> {
self.availability_cache
.peek_pending_components(block_root, |components| {
components.map(|components| {
Expand All @@ -156,9 +156,9 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
})
}

/// Return the set of imported custody column indexes for `block_root`. Returns None if there is
/// Return the set of cached custody column indexes for `block_root`. Returns None if there is
/// no block component for `block_root`.
pub fn imported_custody_column_indexes(&self, block_root: &Hash256) -> Option<Vec<u64>> {
pub fn cached_data_column_indexes(&self, block_root: &Hash256) -> Option<Vec<u64>> {
self.availability_cache
.peek_pending_components(block_root, |components| {
components.map(|components| components.get_cached_data_columns_indices())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ pub struct PendingComponents<E: EthSpec> {

pub enum BlockImportRequirement {
AllBlobs,
CustodyColumns(usize),
ColumnSampling(usize),
}

impl<E: EthSpec> PendingComponents<E> {
Expand Down Expand Up @@ -210,7 +210,7 @@ impl<E: EthSpec> PendingComponents<E> {
.map_or(false, |num_expected_blobs| {
num_expected_blobs == self.num_received_blobs()
}),
BlockImportRequirement::CustodyColumns(num_expected_columns) => {
BlockImportRequirement::ColumnSampling(num_expected_columns) => {
let num_received_data_columns = self.num_received_data_columns();
// No data columns when there are 0 blobs
self.num_expected_blobs()
Expand Down Expand Up @@ -281,7 +281,7 @@ impl<E: EthSpec> PendingComponents<E> {
};
(Some(VariableList::new(verified_blobs)?), None)
}
BlockImportRequirement::CustodyColumns(_) => {
BlockImportRequirement::ColumnSampling(_) => {
let verified_data_columns = verified_data_columns
.into_iter()
.map(|d| d.into_inner())
Expand Down Expand Up @@ -353,28 +353,28 @@ pub struct DataAvailabilityCheckerInner<T: BeaconChainTypes> {
/// This cache holds a limited number of states in memory and reconstructs them
/// from disk when necessary. This is needed until we merge tree-states
state_cache: StateLRUCache<T>,
/// The number of data columns the node is custodying.
custody_column_count: usize,
/// The number of data columns the node is sampling via subnet sampling.
sampling_column_count: usize,
spec: Arc<ChainSpec>,
}

impl<T: BeaconChainTypes> DataAvailabilityCheckerInner<T> {
pub fn new(
capacity: NonZeroUsize,
beacon_store: BeaconStore<T>,
custody_column_count: usize,
sampling_column_count: usize,
spec: Arc<ChainSpec>,
) -> Result<Self, AvailabilityCheckError> {
Ok(Self {
critical: RwLock::new(LruCache::new(capacity)),
state_cache: StateLRUCache::new(beacon_store, spec.clone()),
custody_column_count,
sampling_column_count,
spec,
})
}

pub fn custody_subnet_count(&self) -> usize {
self.custody_column_count
pub fn sampling_column_count(&self) -> usize {
self.sampling_column_count
}

/// Returns true if the block root is known, without altering the LRU ordering
Expand Down Expand Up @@ -440,8 +440,8 @@ impl<T: BeaconChainTypes> DataAvailabilityCheckerInner<T> {
) -> Result<BlockImportRequirement, AvailabilityCheckError> {
let peer_das_enabled = self.spec.is_peer_das_enabled_for_epoch(epoch);
if peer_das_enabled {
Ok(BlockImportRequirement::CustodyColumns(
self.custody_column_count,
Ok(BlockImportRequirement::ColumnSampling(
self.sampling_column_count,
))
} else {
Ok(BlockImportRequirement::AllBlobs)
Expand All @@ -456,7 +456,7 @@ impl<T: BeaconChainTypes> DataAvailabilityCheckerInner<T> {
block_import_requirement: &BlockImportRequirement,
pending_components: &PendingComponents<T::EthSpec>,
) -> bool {
let BlockImportRequirement::CustodyColumns(num_expected_columns) = block_import_requirement
let BlockImportRequirement::ColumnSampling(num_expected_columns) = block_import_requirement
else {
return false;
};
Expand Down
4 changes: 3 additions & 1 deletion beacon_node/beacon_chain/src/historical_blocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,9 @@ impl<T: BeaconChainTypes> BeaconChain<T> {

// Blobs are stored per block, and data columns are each stored individually
let n_blob_ops_per_block = if self.spec.is_peer_das_scheduled() {
self.data_availability_checker.get_custody_columns_count()
// TODO(das): `available_block` includes all sampled columns, but we only need to store
// custody columns. To be clarified in spec PR.
self.data_availability_checker.get_sampling_column_count()
} else {
1
};
Expand Down
9 changes: 4 additions & 5 deletions beacon_node/http_api/src/publish_blocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -389,18 +389,17 @@ pub async fn publish_block<T: BeaconChainTypes, B: IntoGossipVerifiedBlock<T>>(
.count()
> 0
{
let custody_columns_indices = &network_globals.custody_columns;

let custody_columns = gossip_verified_data_columns
let sampling_columns_indices = &network_globals.sampling_columns;
let sampling_columns = gossip_verified_data_columns
.into_iter()
.flatten()
.filter(|data_column| custody_columns_indices.contains(&data_column.index()))
.filter(|data_column| sampling_columns_indices.contains(&data_column.index()))
.collect();

// Importing the columns could trigger block import and network publication in the case
// where the block was already seen on gossip.
if let Err(e) =
Box::pin(chain.process_gossip_data_columns(custody_columns, publish_fn)).await
Box::pin(chain.process_gossip_data_columns(sampling_columns, publish_fn)).await
{
let msg = format!("Invalid data column: {e}");
return if let BroadcastValidation::Gossip = validation_level {
Expand Down
48 changes: 31 additions & 17 deletions beacon_node/lighthouse_network/src/types/globals.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ pub struct NetworkGlobals<E: EthSpec> {
pub sync_state: RwLock<SyncState>,
/// The current state of the backfill sync.
pub backfill_state: RwLock<BackFillState>,
/// The computed custody subnets and columns is stored to avoid re-computing.
pub custody_subnets: Vec<DataColumnSubnetId>,
pub custody_columns: Vec<ColumnIndex>,
/// The computed sampling subnets and columns are stored to avoid re-computing.
pub sampling_subnets: Vec<DataColumnSubnetId>,
pub sampling_columns: Vec<ColumnIndex>,
/// Network-related configuration. Immutable after initialization.
pub config: Arc<NetworkConfig>,
/// Ethereum chain configuration. Immutable after initialization.
Expand All @@ -45,24 +45,31 @@ impl<E: EthSpec> NetworkGlobals<E> {
config: Arc<NetworkConfig>,
spec: Arc<ChainSpec>,
) -> Self {
let (custody_subnets, custody_columns) = if spec.is_peer_das_scheduled() {
let (sampling_subnets, sampling_columns) = if spec.is_peer_das_scheduled() {
let node_id = enr.node_id().raw();

let custody_subnet_count = local_metadata
.custody_subnet_count()
.copied()
.expect("custody subnet count must be set if PeerDAS is scheduled");
let custody_subnets = DataColumnSubnetId::compute_custody_subnets::<E>(
enr.node_id().raw(),
custody_subnet_count,

let subnet_sampling_size = std::cmp::max(custody_subnet_count, spec.samples_per_slot);

let sampling_subnets = DataColumnSubnetId::compute_custody_subnets::<E>(
node_id,
subnet_sampling_size,
&spec,
)
.expect("custody subnet count must be valid")
.expect("sampling subnet count must be valid")
.collect::<Vec<_>>();
let custody_columns = custody_subnets

let sampling_columns = sampling_subnets
.iter()
.flat_map(|subnet| subnet.columns::<E>(&spec))
.sorted()
.collect();
(custody_subnets, custody_columns)

(sampling_subnets, sampling_columns)
} else {
(vec![], vec![])
};
Expand All @@ -76,8 +83,8 @@ impl<E: EthSpec> NetworkGlobals<E> {
gossipsub_subscriptions: RwLock::new(HashSet::new()),
sync_state: RwLock::new(SyncState::Stalled),
backfill_state: RwLock::new(BackFillState::NotRequired),
custody_subnets,
custody_columns,
sampling_subnets,
sampling_columns,
config,
spec,
}
Expand Down Expand Up @@ -197,12 +204,13 @@ mod test {
use types::{Epoch, EthSpec, MainnetEthSpec as E};

#[test]
fn test_custody_subnets() {
fn test_sampling_subnets() {
let log = logging::test_logger();
let mut spec = E::default_spec();
spec.eip7594_fork_epoch = Some(Epoch::new(0));

let custody_subnet_count = spec.data_column_sidecar_subnet_count / 2;
let subnet_sampling_size = std::cmp::max(custody_subnet_count, spec.samples_per_slot);
let metadata = get_metadata(custody_subnet_count);
let config = Arc::new(NetworkConfig::default());

Expand All @@ -213,17 +221,20 @@ mod test {
config,
Arc::new(spec),
);
assert_eq!(globals.custody_subnets.len(), custody_subnet_count as usize);
assert_eq!(
globals.sampling_subnets.len(),
subnet_sampling_size as usize
);
}

#[test]
fn test_custody_columns() {
fn test_sampling_columns() {
let log = logging::test_logger();
let mut spec = E::default_spec();
spec.eip7594_fork_epoch = Some(Epoch::new(0));

let custody_subnet_count = spec.data_column_sidecar_subnet_count / 2;
let custody_columns_count = spec.number_of_columns / 2;
let subnet_sampling_size = std::cmp::max(custody_subnet_count, spec.samples_per_slot);
let metadata = get_metadata(custody_subnet_count);
let config = Arc::new(NetworkConfig::default());

Expand All @@ -234,7 +245,10 @@ mod test {
config,
Arc::new(spec),
);
assert_eq!(globals.custody_columns.len(), custody_columns_count);
assert_eq!(
globals.sampling_columns.len(),
subnet_sampling_size as usize
);
}

fn get_metadata(custody_subnet_count: u64) -> MetaData<E> {
Expand Down
2 changes: 1 addition & 1 deletion beacon_node/network/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -813,7 +813,7 @@ impl<T: BeaconChainTypes> NetworkService<T> {
}
}
} else {
for column_subnet in &self.network_globals.custody_subnets {
for column_subnet in &self.network_globals.sampling_subnets {
for fork_digest in self.required_gossip_fork_digests() {
let gossip_kind = Subnet::DataColumn(*column_subnet).into();
let topic =
Expand Down
7 changes: 4 additions & 3 deletions beacon_node/network/src/sync/block_lookups/tests.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::network_beacon_processor::NetworkBeaconProcessor;
use crate::sync::manager::{BlockProcessType, SyncManager};
use crate::sync::sampling::SamplingConfig;
use crate::sync::peer_sampling::SamplingConfig;
use crate::sync::{SamplingId, SyncMessage};
use crate::NetworkMessage;
use std::sync::Arc;
Expand Down Expand Up @@ -2037,9 +2037,10 @@ fn custody_lookup_happy_path() {
// Should not request blobs
let id = r.expect_block_lookup_request(block.canonical_root());
r.complete_valid_block_request(id, block.into(), true);
let custody_column_count = spec.custody_requirement * spec.data_columns_per_subnet() as u64;
// for each slot we download `samples_per_slot` columns
let sample_column_count = spec.samples_per_slot * spec.data_columns_per_subnet() as u64;
let custody_ids =
r.expect_only_data_columns_by_root_requests(block_root, custody_column_count as usize);
r.expect_only_data_columns_by_root_requests(block_root, sample_column_count as usize);
r.complete_valid_custody_request(custody_ids, data_columns, false);
r.expect_no_active_lookups();
}
Expand Down
2 changes: 1 addition & 1 deletion beacon_node/network/src/sync/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@ use super::block_lookups::BlockLookups;
use super::network_context::{
BlockOrBlob, CustodyByRootResult, RangeRequestId, RpcEvent, SyncNetworkContext,
};
use super::peer_sampling::{Sampling, SamplingConfig, SamplingResult};
use super::peer_sync_info::{remote_sync_type, PeerSyncType};
use super::range_sync::{RangeSync, RangeSyncType, EPOCHS_PER_BATCH};
use super::sampling::{Sampling, SamplingConfig, SamplingResult};
use crate::network_beacon_processor::{ChainSegmentProcessId, NetworkBeaconProcessor};
use crate::service::NetworkMessage;
use crate::status::ToStatusMessage;
Expand Down
2 changes: 1 addition & 1 deletion beacon_node/network/src/sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ mod block_lookups;
mod block_sidecar_coupling;
pub mod manager;
mod network_context;
mod peer_sampling;
mod peer_sync_info;
mod range_sync;
mod sampling;

pub use lighthouse_network::service::api_types::SamplingId;
pub use manager::{BatchProcessResult, SyncMessage};
Expand Down
Loading

0 comments on commit f3a5e25

Please sign in to comment.