
Commit

Merge branch 'unstable' of https://github.com/sigp/lighthouse into single_attestation_electra
eserilev committed Oct 10, 2024
2 parents 3b3065e + da290e8 commit b99161d
Showing 39 changed files with 598 additions and 122 deletions.
15 changes: 15 additions & 0 deletions .github/workflows/test-suite.yml
@@ -54,6 +54,20 @@ jobs:
done
echo "skip_ci=$SKIP_CI" >> $GITHUB_OUTPUT
lockbud:
name: lockbud
runs-on: ubuntu-latest
container:
image: sigmaprime/lockbud:latest
steps:
- name: Checkout repository
uses: actions/checkout@v3
- name: Install dependencies
run: apt update && apt install -y cmake
- name: Run lockbud
run: |
cargo lockbud -k deadlock -b -l tokio_util
target-branch-check:
name: target-branch-check
runs-on: ubuntu-latest
@@ -433,6 +447,7 @@ jobs:
'cargo-udeps',
'compile-with-beta-compiler',
'cli-check',
'lockbud',
]
steps:
- uses: actions/checkout@v4
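The lockbud job above adds static deadlock detection to CI. As a minimal illustration (a hypothetical double-lock, not code from this repository), this is the kind of bug such a detector is meant to flag:

use std::sync::Mutex;

struct Counter {
    inner: Mutex<u64>,
}

impl Counter {
    fn bump(&self) {
        let mut guard = self.inner.lock().unwrap();
        *guard += 1;
        // Re-locking `inner` here while `guard` is still alive would block
        // forever on std's non-reentrant Mutex; a static detector flags this
        // path without having to hit it at runtime.
        // self.read(); // deadlock if uncommented
    }

    fn read(&self) -> u64 {
        *self.inner.lock().unwrap()
    }
}

fn main() {
    let counter = Counter { inner: Mutex::new(0) };
    counter.bump();
    println!("count = {}", counter.read());
}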
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 2 additions & 0 deletions beacon_node/beacon_chain/src/beacon_chain.rs
@@ -3968,6 +3968,8 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}

if let Some(data_columns) = data_columns {
// TODO(das): `available_block` includes all sampled columns, but we only need to store
// custody columns. To be clarified in spec.
if !data_columns.is_empty() {
debug!(
self.log, "Writing data_columns to store";
22 changes: 11 additions & 11 deletions beacon_node/beacon_chain/src/data_availability_checker.rs
@@ -108,13 +108,15 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
spec.custody_requirement as usize
};

let custody_column_count =
custody_subnet_count.saturating_mul(spec.data_columns_per_subnet());
let subnet_sampling_size =
std::cmp::max(custody_subnet_count, spec.samples_per_slot as usize);
let sampling_column_count =
subnet_sampling_size.saturating_mul(spec.data_columns_per_subnet());

let inner = DataAvailabilityCheckerInner::new(
OVERFLOW_LRU_CAPACITY,
store,
custody_column_count,
sampling_column_count,
spec.clone(),
)?;
Ok(Self {
@@ -125,10 +127,8 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
})
}

pub fn get_custody_columns_count(&self) -> usize {
self.availability_cache
.custody_subnet_count()
.saturating_mul(self.spec.data_columns_per_subnet())
pub fn get_sampling_column_count(&self) -> usize {
self.availability_cache.sampling_column_count()
}

/// Checks if the block root is currently in the availability cache awaiting import because
@@ -141,9 +141,9 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
.get_execution_valid_block(block_root)
}

/// Return the set of imported blob indexes for `block_root`. Returns None if there is no block
/// Return the set of cached blob indexes for `block_root`. Returns None if there is no block
/// component for `block_root`.
pub fn imported_blob_indexes(&self, block_root: &Hash256) -> Option<Vec<u64>> {
pub fn cached_blob_indexes(&self, block_root: &Hash256) -> Option<Vec<u64>> {
self.availability_cache
.peek_pending_components(block_root, |components| {
components.map(|components| {
@@ -156,9 +156,9 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
})
}

/// Return the set of imported custody column indexes for `block_root`. Returns None if there is
/// Return the set of cached custody column indexes for `block_root`. Returns None if there is
/// no block component for `block_root`.
pub fn imported_custody_column_indexes(&self, block_root: &Hash256) -> Option<Vec<u64>> {
pub fn cached_data_column_indexes(&self, block_root: &Hash256) -> Option<Vec<u64>> {
self.availability_cache
.peek_pending_components(block_root, |components| {
components.map(|components| components.get_cached_data_columns_indices())
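For reference, a standalone sketch of the new sizing rule above: take the larger of the custody subnet count and `samples_per_slot`, then multiply by the columns per subnet (the numbers below are illustrative placeholders, not real chain-spec values):

fn sampling_column_count(
    custody_subnet_count: usize,
    samples_per_slot: usize,
    data_columns_per_subnet: usize,
) -> usize {
    // Sample at least as many subnets as are custodied, and never fewer than
    // `samples_per_slot`, then convert subnets to a column count.
    let subnet_sampling_size = std::cmp::max(custody_subnet_count, samples_per_slot);
    subnet_sampling_size.saturating_mul(data_columns_per_subnet)
}

fn main() {
    // Placeholder values for illustration only.
    assert_eq!(sampling_column_count(4, 8, 4), 32); // max(4, 8) * 4
    assert_eq!(sampling_column_count(16, 8, 4), 64); // custody dominates
}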
@@ -40,7 +40,7 @@ pub struct PendingComponents<E: EthSpec> {

pub enum BlockImportRequirement {
AllBlobs,
CustodyColumns(usize),
ColumnSampling(usize),
}

impl<E: EthSpec> PendingComponents<E> {
@@ -210,7 +210,7 @@ impl<E: EthSpec> PendingComponents<E> {
.map_or(false, |num_expected_blobs| {
num_expected_blobs == self.num_received_blobs()
}),
BlockImportRequirement::CustodyColumns(num_expected_columns) => {
BlockImportRequirement::ColumnSampling(num_expected_columns) => {
let num_received_data_columns = self.num_received_data_columns();
// No data columns when there are 0 blobs
self.num_expected_blobs()
@@ -281,7 +281,7 @@ impl<E: EthSpec> PendingComponents<E> {
};
(Some(VariableList::new(verified_blobs)?), None)
}
BlockImportRequirement::CustodyColumns(_) => {
BlockImportRequirement::ColumnSampling(_) => {
let verified_data_columns = verified_data_columns
.into_iter()
.map(|d| d.into_inner())
@@ -353,28 +353,28 @@ pub struct DataAvailabilityCheckerInner<T: BeaconChainTypes> {
/// This cache holds a limited number of states in memory and reconstructs them
/// from disk when necessary. This is necessary until we merge tree-states
state_cache: StateLRUCache<T>,
/// The number of data columns the node is custodying.
custody_column_count: usize,
/// The number of data columns the node is sampling via subnet sampling.
sampling_column_count: usize,
spec: Arc<ChainSpec>,
}

impl<T: BeaconChainTypes> DataAvailabilityCheckerInner<T> {
pub fn new(
capacity: NonZeroUsize,
beacon_store: BeaconStore<T>,
custody_column_count: usize,
sampling_column_count: usize,
spec: Arc<ChainSpec>,
) -> Result<Self, AvailabilityCheckError> {
Ok(Self {
critical: RwLock::new(LruCache::new(capacity)),
state_cache: StateLRUCache::new(beacon_store, spec.clone()),
custody_column_count,
sampling_column_count,
spec,
})
}

pub fn custody_subnet_count(&self) -> usize {
self.custody_column_count
pub fn sampling_column_count(&self) -> usize {
self.sampling_column_count
}

/// Returns true if the block root is known, without altering the LRU ordering
@@ -440,8 +440,8 @@ impl<T: BeaconChainTypes> DataAvailabilityCheckerInner<T> {
) -> Result<BlockImportRequirement, AvailabilityCheckError> {
let peer_das_enabled = self.spec.is_peer_das_enabled_for_epoch(epoch);
if peer_das_enabled {
Ok(BlockImportRequirement::CustodyColumns(
self.custody_column_count,
Ok(BlockImportRequirement::ColumnSampling(
self.sampling_column_count,
))
} else {
Ok(BlockImportRequirement::AllBlobs)
@@ -456,7 +456,7 @@ impl<T: BeaconChainTypes> DataAvailabilityCheckerInner<T> {
block_import_requirement: &BlockImportRequirement,
pending_components: &PendingComponents<T::EthSpec>,
) -> bool {
let BlockImportRequirement::CustodyColumns(num_expected_columns) = block_import_requirement
let BlockImportRequirement::ColumnSampling(num_expected_columns) = block_import_requirement
else {
return false;
};
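A reduced model of how the renamed requirement is consumed (simplified types, not the actual Lighthouse structs): pre-PeerDAS blocks need every expected blob, post-PeerDAS blocks need the node's full sampling column set unless the block has zero blobs.

enum BlockImportRequirement {
    AllBlobs,
    ColumnSampling(usize),
}

struct PendingComponents {
    expected_blobs: Option<usize>,
    received_blobs: usize,
    received_data_columns: usize,
}

impl PendingComponents {
    fn is_available(&self, requirement: &BlockImportRequirement) -> bool {
        match requirement {
            // Every expected blob must have arrived.
            BlockImportRequirement::AllBlobs => self
                .expected_blobs
                .map_or(false, |expected| expected == self.received_blobs),
            // Columns are only required when the block actually has blobs.
            BlockImportRequirement::ColumnSampling(expected_columns) => {
                self.expected_blobs.map_or(false, |expected| {
                    expected == 0 || self.received_data_columns == *expected_columns
                })
            }
        }
    }
}

fn main() {
    let pending = PendingComponents {
        expected_blobs: Some(3),
        received_blobs: 0,
        received_data_columns: 8,
    };
    assert!(pending.is_available(&BlockImportRequirement::ColumnSampling(8)));
    assert!(!pending.is_available(&BlockImportRequirement::AllBlobs));
}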
4 changes: 3 additions & 1 deletion beacon_node/beacon_chain/src/historical_blocks.rs
@@ -94,7 +94,9 @@ impl<T: BeaconChainTypes> BeaconChain<T> {

// Blobs are stored per block, and data columns are each stored individually
let n_blob_ops_per_block = if self.spec.is_peer_das_scheduled() {
self.data_availability_checker.get_custody_columns_count()
// TODO(das): `available_block` includes all sampled columns, but we only need to store
// custody columns. To be clarified in spec PR.
self.data_availability_checker.get_sampling_column_count()
} else {
1
};
4 changes: 2 additions & 2 deletions beacon_node/beacon_chain/src/test_utils.rs
@@ -2792,12 +2792,12 @@ pub fn build_log(level: slog::Level, logger_type: LoggerType) -> Logger {
match logger_type {
LoggerType::Test => {
let drain = FullFormat::new(TermDecorator::new().build()).build().fuse();
let drain = Async::new(drain).build().fuse();
let drain = Async::new(drain).chan_size(10_000).build().fuse();
Logger::root(drain.filter_level(level).fuse(), o!())
}
LoggerType::CI => {
let drain = FullFormat::new(ci_decorator()).build().fuse();
let drain = Async::new(drain).build().fuse();
let drain = Async::new(drain).chan_size(10_000).build().fuse();
Logger::root(drain.filter_level(level).fuse(), o!())
}
LoggerType::Null => {
57 changes: 56 additions & 1 deletion beacon_node/beacon_chain/tests/store_tests.rs
@@ -2514,7 +2514,7 @@ async fn pruning_test(
}

#[tokio::test]
async fn garbage_collect_temp_states_from_failed_block() {
async fn garbage_collect_temp_states_from_failed_block_on_startup() {
let db_path = tempdir().unwrap();

// Wrap these functions to ensure the variables are dropped before we try to open another
@@ -2571,6 +2571,61 @@ async fn garbage_collect_temp_states_from_failed_block() {
assert_eq!(store.iter_temporary_state_roots().count(), 0);
}

#[tokio::test]
async fn garbage_collect_temp_states_from_failed_block_on_finalization() {
let db_path = tempdir().unwrap();

let store = get_store(&db_path);
let harness = get_harness(store.clone(), LOW_VALIDATOR_COUNT);

let slots_per_epoch = E::slots_per_epoch();

let genesis_state = harness.get_current_state();
let block_slot = Slot::new(2 * slots_per_epoch);
let ((signed_block, _), state) = harness.make_block(genesis_state, block_slot).await;

let (mut block, _) = (*signed_block).clone().deconstruct();

// Mutate the block to make it invalid, and re-sign it.
*block.state_root_mut() = Hash256::repeat_byte(0xff);
let proposer_index = block.proposer_index() as usize;
let block = Arc::new(block.sign(
&harness.validator_keypairs[proposer_index].sk,
&state.fork(),
state.genesis_validators_root(),
&harness.spec,
));

// The block should be rejected, but should store a bunch of temporary states.
harness.set_current_slot(block_slot);
harness
.process_block_result((block, None))
.await
.unwrap_err();

assert_eq!(
store.iter_temporary_state_roots().count(),
block_slot.as_usize() - 1
);

// Finalize the chain without the block, which should result in pruning of all temporary states.
let blocks_required_to_finalize = 3 * slots_per_epoch;
harness.advance_slot();
harness
.extend_chain(
blocks_required_to_finalize as usize,
BlockStrategy::OnCanonicalHead,
AttestationStrategy::AllValidators,
)
.await;

// Check that the finalization migration ran.
assert_ne!(store.get_split_slot(), 0);

// Check that temporary states have been pruned.
assert_eq!(store.iter_temporary_state_roots().count(), 0);
}

#[tokio::test]
async fn weak_subjectivity_sync_easy() {
let num_initial_slots = E::slots_per_epoch() * 11;
33 changes: 31 additions & 2 deletions beacon_node/beacon_processor/src/lib.rs
@@ -94,6 +94,11 @@ const DEFAULT_MAX_SCHEDULED_WORK_QUEUE_LEN: usize = 3 * DEFAULT_MAX_WORK_EVENT_Q
/// slightly, we don't need to adjust the queues during the lifetime of a process.
const ACTIVE_VALIDATOR_COUNT_OVERPROVISION_PERCENT: usize = 110;

/// Minimum size of dynamically sized queues. Due to integer division we don't want zero-length
/// queues, as the processor won't process that message type. 128 is an arbitrary value >= 1 that
/// seems reasonable.
const MIN_QUEUE_LEN: usize = 128;

/// Maximum number of queued items that will be stored before dropping them
pub struct BeaconProcessorQueueLengths {
aggregate_queue: usize,
@@ -156,9 +161,15 @@ impl BeaconProcessorQueueLengths {
aggregate_queue: 4096,
unknown_block_aggregate_queue: 1024,
// Capacity for a full slot's worth of attestations if subscribed to all subnets
attestation_queue: active_validator_count / slots_per_epoch,
attestation_queue: std::cmp::max(
active_validator_count / slots_per_epoch,
MIN_QUEUE_LEN,
),
// Capacity for a full slot's worth of attestations if subscribed to all subnets
unknown_block_attestation_queue: active_validator_count / slots_per_epoch,
unknown_block_attestation_queue: std::cmp::max(
active_validator_count / slots_per_epoch,
MIN_QUEUE_LEN,
),
sync_message_queue: 2048,
sync_contribution_queue: 1024,
gossip_voluntary_exit_queue: 4096,
@@ -1729,3 +1740,21 @@ impl Drop for SendOnDrop {
}
}
}

#[cfg(test)]
mod tests {
use super::*;
use types::{BeaconState, ChainSpec, Eth1Data, ForkName, MainnetEthSpec};

#[test]
fn min_queue_len() {
// State with no validators.
let spec = ForkName::latest().make_genesis_spec(ChainSpec::mainnet());
let genesis_time = 0;
let state = BeaconState::<MainnetEthSpec>::new(genesis_time, Eth1Data::default(), &spec);
assert_eq!(state.validators().len(), 0);
let queue_lengths = BeaconProcessorQueueLengths::from_state(&state, &spec).unwrap();
assert_eq!(queue_lengths.attestation_queue, MIN_QUEUE_LEN);
assert_eq!(queue_lengths.unknown_block_attestation_queue, MIN_QUEUE_LEN);
}
}
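A minimal sketch of the floored queue sizing introduced above, assuming only the division-plus-floor shown in the diff (the validator counts below are illustrative):

const MIN_QUEUE_LEN: usize = 128;

/// Size a queue to one slot's worth of attestations, but never let integer
/// division produce a zero-length (and therefore never-serviced) queue.
fn attestation_queue_len(active_validator_count: usize, slots_per_epoch: usize) -> usize {
    std::cmp::max(active_validator_count / slots_per_epoch, MIN_QUEUE_LEN)
}

fn main() {
    // A state with no active validators still gets a usable queue.
    assert_eq!(attestation_queue_len(0, 32), MIN_QUEUE_LEN);
    // A large validator set is sized by the per-slot attestation volume.
    assert_eq!(attestation_queue_len(1_000_000, 32), 31_250);
}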
3 changes: 2 additions & 1 deletion beacon_node/eth1/src/service.rs
@@ -549,10 +549,11 @@ impl Service {

/// Returns the number of deposits with valid signatures that have been observed.
pub fn get_valid_signature_count(&self) -> Option<usize> {
let highest_safe_block = self.highest_safe_block()?;
self.deposits()
.read()
.cache
.get_valid_signature_count(self.highest_safe_block()?)
.get_valid_signature_count(highest_safe_block)
}

/// Returns the number of deposits with valid signatures that have been observed, without
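The eth1 change above hoists `highest_safe_block()` out of the locked call chain, so its result is computed before the deposit-cache read guard is taken. A minimal sketch of that pattern with hypothetical types (not the real `Service`):

use std::sync::RwLock;

struct Service {
    highest_safe_block: RwLock<Option<u64>>,
    deposit_blocks: RwLock<Vec<u64>>,
}

impl Service {
    fn valid_signature_count(&self) -> Option<usize> {
        // Read (and release) the first lock before acquiring the second,
        // rather than evaluating it while the deposit guard is still held.
        let highest_safe_block = (*self.highest_safe_block.read().unwrap())?;
        let deposits = self.deposit_blocks.read().unwrap();
        Some(deposits.iter().filter(|block| **block <= highest_safe_block).count())
    }
}

fn main() {
    let service = Service {
        highest_safe_block: RwLock::new(Some(10)),
        deposit_blocks: RwLock::new(vec![1, 5, 12]),
    };
    assert_eq!(service.valid_signature_count(), Some(2));
}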
9 changes: 4 additions & 5 deletions beacon_node/http_api/src/publish_blocks.rs
@@ -389,18 +389,17 @@ pub async fn publish_block<T: BeaconChainTypes, B: IntoGossipVerifiedBlock<T>>(
.count()
> 0
{
let custody_columns_indices = &network_globals.custody_columns;

let custody_columns = gossip_verified_data_columns
let sampling_columns_indices = &network_globals.sampling_columns;
let sampling_columns = gossip_verified_data_columns
.into_iter()
.flatten()
.filter(|data_column| custody_columns_indices.contains(&data_column.index()))
.filter(|data_column| sampling_columns_indices.contains(&data_column.index()))
.collect();

// Importing the columns could trigger block import and network publication in the case
// where the block was already seen on gossip.
if let Err(e) =
Box::pin(chain.process_gossip_data_columns(custody_columns, publish_fn)).await
Box::pin(chain.process_gossip_data_columns(sampling_columns, publish_fn)).await
{
let msg = format!("Invalid data column: {e}");
return if let BroadcastValidation::Gossip = validation_level {
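A simplified sketch of the filtering step above: keep only the gossip-verified columns whose index is in the node's sampling set (the column type here is a hypothetical stand-in, not the real network globals):

use std::collections::HashSet;

// Hypothetical stand-in for a gossip-verified data column.
struct DataColumn {
    index: u64,
}

/// Keep only the columns this node samples, dropping the rest before import.
fn filter_to_sampling_columns(
    columns: Vec<DataColumn>,
    sampling_indices: &HashSet<u64>,
) -> Vec<DataColumn> {
    columns
        .into_iter()
        .filter(|column| sampling_indices.contains(&column.index))
        .collect()
}

fn main() {
    let sampling_indices: HashSet<u64> = [0, 2].into_iter().collect();
    let columns = vec![
        DataColumn { index: 0 },
        DataColumn { index: 1 },
        DataColumn { index: 2 },
    ];
    let kept = filter_to_sampling_columns(columns, &sampling_indices);
    assert_eq!(kept.len(), 2);
}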