diff --git a/beacon_node/beacon_chain/src/eth1_chain.rs b/beacon_node/beacon_chain/src/eth1_chain.rs index ee50e3b3843..62aad558eeb 100644 --- a/beacon_node/beacon_chain/src/eth1_chain.rs +++ b/beacon_node/beacon_chain/src/eth1_chain.rs @@ -546,12 +546,20 @@ impl Eth1ChainBackend for CachingEth1Backend { state.eth1_data().deposit_count }; - match deposit_index.cmp(&deposit_count) { + // [New in Electra:EIP6110] + let deposit_index_limit = + if let Ok(deposit_receipts_start_index) = state.deposit_requests_start_index() { + std::cmp::min(deposit_count, deposit_receipts_start_index) + } else { + deposit_count + }; + + match deposit_index.cmp(&deposit_index_limit) { Ordering::Greater => Err(Error::DepositIndexTooHigh), Ordering::Equal => Ok(vec![]), Ordering::Less => { let next = deposit_index; - let last = std::cmp::min(deposit_count, next + E::MaxDeposits::to_u64()); + let last = std::cmp::min(deposit_index_limit, next + E::MaxDeposits::to_u64()); self.core .deposits() diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index 7c8f64a722c..2d017d65391 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -31,6 +31,7 @@ mod validators; mod version; use crate::produce_block::{produce_blinded_block_v2, produce_block_v2, produce_block_v3}; +use crate::version::fork_versioned_response; use beacon_chain::{ attestation_verification::VerifiedAttestation, observed_operations::ObservationOutcome, validator_monitor::timestamp_now, AttestationError as AttnError, BeaconChain, BeaconChainError, @@ -256,12 +257,15 @@ pub fn prometheus_metrics() -> warp::filters::log::Log( ); // GET beacon/blocks/{block_id}/attestations - let get_beacon_block_attestations = beacon_blocks_path_v1 + let get_beacon_block_attestations = beacon_blocks_path_any .clone() .and(warp::path("attestations")) .and(warp::path::end()) .then( - |block_id: BlockId, + |endpoint_version: EndpointVersion, + block_id: BlockId, task_spawner: TaskSpawner, chain: Arc>| { - task_spawner.blocking_json_task(Priority::P1, move || { + task_spawner.blocking_response_task(Priority::P1, move || { let (block, execution_optimistic, finalized) = block_id.blinded_block(&chain)?; - Ok(api_types::GenericResponse::from( - block - .message() - .body() - .attestations() - .map(|att| att.clone_as_attestation()) - .collect::>(), - ) - .add_execution_optimistic_finalized(execution_optimistic, finalized)) + let fork_name = block + .fork_name(&chain.spec) + .map_err(inconsistent_fork_rejection)?; + let atts = block + .message() + .body() + .attestations() + .map(|att| att.clone_as_attestation()) + .collect::>(); + let res = execution_optimistic_finalized_fork_versioned_response( + endpoint_version, + fork_name, + execution_optimistic, + finalized, + &atts, + )?; + Ok(add_consensus_version_header( + warp::reply::json(&res).into_response(), + fork_name, + )) }) }, ); @@ -1750,8 +1766,14 @@ pub fn serve( .and(task_spawner_filter.clone()) .and(chain_filter.clone()); + let beacon_pool_path_any = any_version + .and(warp::path("beacon")) + .and(warp::path("pool")) + .and(task_spawner_filter.clone()) + .and(chain_filter.clone()); + // POST beacon/pool/attestations - let post_beacon_pool_attestations = beacon_pool_path + let post_beacon_pool_attestations = beacon_pool_path_any .clone() .and(warp::path("attestations")) .and(warp::path::end()) @@ -1760,7 +1782,11 @@ pub fn serve( .and(reprocess_send_filter) .and(log_filter.clone()) .then( - |task_spawner: TaskSpawner, + // V1 and V2 are identical except V2 has a consensus version header in the request. + // We only require this header for SSZ deserialization, which isn't supported for + // this endpoint presently. + |_endpoint_version: EndpointVersion, + task_spawner: TaskSpawner, chain: Arc>, attestations: Vec>, network_tx: UnboundedSender>, @@ -1781,16 +1807,17 @@ pub fn serve( ); // GET beacon/pool/attestations?committee_index,slot - let get_beacon_pool_attestations = beacon_pool_path + let get_beacon_pool_attestations = beacon_pool_path_any .clone() .and(warp::path("attestations")) .and(warp::path::end()) .and(warp::query::()) .then( - |task_spawner: TaskSpawner, + |endpoint_version: EndpointVersion, + task_spawner: TaskSpawner, chain: Arc>, query: api_types::AttestationPoolQuery| { - task_spawner.blocking_json_task(Priority::P1, move || { + task_spawner.blocking_response_task(Priority::P1, move || { let query_filter = |data: &AttestationData| { query.slot.map_or(true, |slot| slot == data.slot) && query @@ -1807,20 +1834,48 @@ pub fn serve( .filter(|&att| query_filter(att.data())) .cloned(), ); - Ok(api_types::GenericResponse::from(attestations)) + // Use the current slot to find the fork version, and convert all messages to the + // current fork's format. This is to ensure consistent message types matching + // `Eth-Consensus-Version`. + let current_slot = + chain + .slot_clock + .now() + .ok_or(warp_utils::reject::custom_server_error( + "unable to read slot clock".to_string(), + ))?; + let fork_name = chain.spec.fork_name_at_slot::(current_slot); + let attestations = attestations + .into_iter() + .filter(|att| { + (fork_name.electra_enabled() && matches!(att, Attestation::Electra(_))) + || (!fork_name.electra_enabled() + && matches!(att, Attestation::Base(_))) + }) + .collect::>(); + + let res = fork_versioned_response(endpoint_version, fork_name, &attestations)?; + Ok(add_consensus_version_header( + warp::reply::json(&res).into_response(), + fork_name, + )) }) }, ); // POST beacon/pool/attester_slashings - let post_beacon_pool_attester_slashings = beacon_pool_path + let post_beacon_pool_attester_slashings = beacon_pool_path_any .clone() .and(warp::path("attester_slashings")) .and(warp::path::end()) .and(warp_utils::json::json()) .and(network_tx_filter.clone()) .then( - |task_spawner: TaskSpawner, + // V1 and V2 are identical except V2 has a consensus version header in the request. + // We only require this header for SSZ deserialization, which isn't supported for + // this endpoint presently. + |_endpoint_version: EndpointVersion, + task_spawner: TaskSpawner, chain: Arc>, slashing: AttesterSlashing, network_tx: UnboundedSender>| { @@ -1857,18 +1912,45 @@ pub fn serve( ); // GET beacon/pool/attester_slashings - let get_beacon_pool_attester_slashings = beacon_pool_path - .clone() - .and(warp::path("attester_slashings")) - .and(warp::path::end()) - .then( - |task_spawner: TaskSpawner, chain: Arc>| { - task_spawner.blocking_json_task(Priority::P1, move || { - let attestations = chain.op_pool.get_all_attester_slashings(); - Ok(api_types::GenericResponse::from(attestations)) - }) - }, - ); + let get_beacon_pool_attester_slashings = + beacon_pool_path_any + .clone() + .and(warp::path("attester_slashings")) + .and(warp::path::end()) + .then( + |endpoint_version: EndpointVersion, + task_spawner: TaskSpawner, + chain: Arc>| { + task_spawner.blocking_response_task(Priority::P1, move || { + let slashings = chain.op_pool.get_all_attester_slashings(); + + // Use the current slot to find the fork version, and convert all messages to the + // current fork's format. This is to ensure consistent message types matching + // `Eth-Consensus-Version`. + let current_slot = chain.slot_clock.now().ok_or( + warp_utils::reject::custom_server_error( + "unable to read slot clock".to_string(), + ), + )?; + let fork_name = chain.spec.fork_name_at_slot::(current_slot); + let slashings = slashings + .into_iter() + .filter(|slashing| { + (fork_name.electra_enabled() + && matches!(slashing, AttesterSlashing::Electra(_))) + || (!fork_name.electra_enabled() + && matches!(slashing, AttesterSlashing::Base(_))) + }) + .collect::>(); + + let res = fork_versioned_response(endpoint_version, fork_name, &slashings)?; + Ok(add_consensus_version_header( + warp::reply::json(&res).into_response(), + fork_name, + )) + }) + }, + ); // POST beacon/pool/proposer_slashings let post_beacon_pool_proposer_slashings = beacon_pool_path @@ -3175,7 +3257,7 @@ pub fn serve( ); // GET validator/aggregate_attestation?attestation_data_root,slot - let get_validator_aggregate_attestation = eth_v1 + let get_validator_aggregate_attestation = any_version .and(warp::path("validator")) .and(warp::path("aggregate_attestation")) .and(warp::path::end()) @@ -3184,29 +3266,45 @@ pub fn serve( .and(task_spawner_filter.clone()) .and(chain_filter.clone()) .then( - |query: api_types::ValidatorAggregateAttestationQuery, + |endpoint_version: EndpointVersion, + query: api_types::ValidatorAggregateAttestationQuery, not_synced_filter: Result<(), Rejection>, task_spawner: TaskSpawner, chain: Arc>| { task_spawner.blocking_json_task(Priority::P0, move || { not_synced_filter?; - chain - .get_pre_electra_aggregated_attestation_by_slot_and_root( + let res = if endpoint_version == V2 { + let Some(committee_index) = query.committee_index else { + return Err(warp_utils::reject::custom_bad_request( + "missing committee index".to_string(), + )); + }; + chain.get_aggregated_attestation_electra( query.slot, &query.attestation_data_root, + committee_index, ) - .map_err(|e| { - warp_utils::reject::custom_bad_request(format!( - "unable to fetch aggregate: {:?}", - e - )) - })? - .map(api_types::GenericResponse::from) - .ok_or_else(|| { - warp_utils::reject::custom_not_found( - "no matching aggregate found".to_string(), - ) - }) + } else if endpoint_version == V1 { + // Do nothing + chain.get_pre_electra_aggregated_attestation_by_slot_and_root( + query.slot, + &query.attestation_data_root, + ) + } else { + return Err(unsupported_version_rejection(endpoint_version)); + }; + res.map_err(|e| { + warp_utils::reject::custom_bad_request(format!( + "unable to fetch aggregate: {:?}", + e + )) + })? + .map(api_types::GenericResponse::from) + .ok_or_else(|| { + warp_utils::reject::custom_not_found( + "no matching aggregate found".to_string(), + ) + }) }) }, ); @@ -3302,7 +3400,7 @@ pub fn serve( ); // POST validator/aggregate_and_proofs - let post_validator_aggregate_and_proofs = eth_v1 + let post_validator_aggregate_and_proofs = any_version .and(warp::path("validator")) .and(warp::path("aggregate_and_proofs")) .and(warp::path::end()) @@ -3313,7 +3411,11 @@ pub fn serve( .and(network_tx_filter.clone()) .and(log_filter.clone()) .then( - |not_synced_filter: Result<(), Rejection>, + // V1 and V2 are identical except V2 has a consensus version header in the request. + // We only require this header for SSZ deserialization, which isn't supported for + // this endpoint presently. + |_endpoint_version: EndpointVersion, + not_synced_filter: Result<(), Rejection>, task_spawner: TaskSpawner, chain: Arc>, aggregates: Vec>, diff --git a/beacon_node/http_api/tests/fork_tests.rs b/beacon_node/http_api/tests/fork_tests.rs index 1e20280da11..db8a0ab2b54 100644 --- a/beacon_node/http_api/tests/fork_tests.rs +++ b/beacon_node/http_api/tests/fork_tests.rs @@ -150,8 +150,13 @@ async fn attestations_across_fork_with_skip_slots() { .collect::>(); assert!(!unaggregated_attestations.is_empty()); + let fork_name = harness.spec.fork_name_at_slot::(fork_slot); client - .post_beacon_pool_attestations(&unaggregated_attestations) + .post_beacon_pool_attestations_v1(&unaggregated_attestations) + .await + .unwrap(); + client + .post_beacon_pool_attestations_v2(&unaggregated_attestations, fork_name) .await .unwrap(); @@ -162,7 +167,11 @@ async fn attestations_across_fork_with_skip_slots() { assert!(!signed_aggregates.is_empty()); client - .post_validator_aggregate_and_proof(&signed_aggregates) + .post_validator_aggregate_and_proof_v1(&signed_aggregates) + .await + .unwrap(); + client + .post_validator_aggregate_and_proof_v2(&signed_aggregates, fork_name) .await .unwrap(); } diff --git a/beacon_node/http_api/tests/interactive_tests.rs b/beacon_node/http_api/tests/interactive_tests.rs index 14673d23e12..2f417cf7ba5 100644 --- a/beacon_node/http_api/tests/interactive_tests.rs +++ b/beacon_node/http_api/tests/interactive_tests.rs @@ -893,9 +893,10 @@ async fn queue_attestations_from_http() { .flat_map(|attestations| attestations.into_iter().map(|(att, _subnet)| att)) .collect::>(); + let fork_name = tester.harness.spec.fork_name_at_slot::(attestation_slot); let attestation_future = tokio::spawn(async move { client - .post_beacon_pool_attestations(&attestations) + .post_beacon_pool_attestations_v2(&attestations, fork_name) .await .expect("attestations should be processed successfully") }); diff --git a/beacon_node/http_api/tests/tests.rs b/beacon_node/http_api/tests/tests.rs index cb4ce346827..f511f25d327 100644 --- a/beacon_node/http_api/tests/tests.rs +++ b/beacon_node/http_api/tests/tests.rs @@ -1668,7 +1668,7 @@ impl ApiTester { for block_id in self.interesting_block_ids() { let result = self .client - .get_beacon_blocks_attestations(block_id.0) + .get_beacon_blocks_attestations_v2(block_id.0) .await .unwrap() .map(|res| res.data); @@ -1699,9 +1699,9 @@ impl ApiTester { self } - pub async fn test_post_beacon_pool_attestations_valid(mut self) -> Self { + pub async fn test_post_beacon_pool_attestations_valid_v1(mut self) -> Self { self.client - .post_beacon_pool_attestations(self.attestations.as_slice()) + .post_beacon_pool_attestations_v1(self.attestations.as_slice()) .await .unwrap(); @@ -1713,7 +1713,25 @@ impl ApiTester { self } - pub async fn test_post_beacon_pool_attestations_invalid(mut self) -> Self { + pub async fn test_post_beacon_pool_attestations_valid_v2(mut self) -> Self { + let fork_name = self + .attestations + .first() + .map(|att| self.chain.spec.fork_name_at_slot::(att.data().slot)) + .unwrap(); + self.client + .post_beacon_pool_attestations_v2(self.attestations.as_slice(), fork_name) + .await + .unwrap(); + assert!( + self.network_rx.network_recv.recv().await.is_some(), + "valid attestation should be sent to network" + ); + + self + } + + pub async fn test_post_beacon_pool_attestations_invalid_v1(mut self) -> Self { let mut attestations = Vec::new(); for attestation in &self.attestations { let mut invalid_attestation = attestation.clone(); @@ -1726,7 +1744,7 @@ impl ApiTester { let err = self .client - .post_beacon_pool_attestations(attestations.as_slice()) + .post_beacon_pool_attestations_v1(attestations.as_slice()) .await .unwrap_err(); @@ -1749,6 +1767,48 @@ impl ApiTester { self } + pub async fn test_post_beacon_pool_attestations_invalid_v2(mut self) -> Self { + let mut attestations = Vec::new(); + for attestation in &self.attestations { + let mut invalid_attestation = attestation.clone(); + invalid_attestation.data_mut().slot += 1; + + // add both to ensure we only fail on invalid attestations + attestations.push(attestation.clone()); + attestations.push(invalid_attestation); + } + + let fork_name = self + .attestations + .first() + .map(|att| self.chain.spec.fork_name_at_slot::(att.data().slot)) + .unwrap(); + + let err_v2 = self + .client + .post_beacon_pool_attestations_v2(attestations.as_slice(), fork_name) + .await + .unwrap_err(); + + match err_v2 { + Error::ServerIndexedMessage(IndexedErrorMessage { + code, + message: _, + failures, + }) => { + assert_eq!(code, 400); + assert_eq!(failures.len(), self.attestations.len()); + } + _ => panic!("query did not fail correctly"), + } + + assert!( + self.network_rx.network_recv.recv().await.is_some(), + "if some attestations are valid, we should send them to the network" + ); + + self + } pub async fn test_get_beacon_light_client_bootstrap(self) -> Self { let block_id = BlockId(CoreBlockId::Finalized); @@ -1812,7 +1872,7 @@ impl ApiTester { pub async fn test_get_beacon_pool_attestations(self) -> Self { let result = self .client - .get_beacon_pool_attestations(None, None) + .get_beacon_pool_attestations_v1(None, None) .await .unwrap() .data; @@ -1822,12 +1882,38 @@ impl ApiTester { assert_eq!(result, expected); + let result = self + .client + .get_beacon_pool_attestations_v2(None, None) + .await + .unwrap() + .data; + assert_eq!(result, expected); + + self + } + + pub async fn test_post_beacon_pool_attester_slashings_valid_v1(mut self) -> Self { + self.client + .post_beacon_pool_attester_slashings_v1(&self.attester_slashing) + .await + .unwrap(); + + assert!( + self.network_rx.network_recv.recv().await.is_some(), + "valid attester slashing should be sent to network" + ); + self } - pub async fn test_post_beacon_pool_attester_slashings_valid(mut self) -> Self { + pub async fn test_post_beacon_pool_attester_slashings_valid_v2(mut self) -> Self { + let fork_name = self + .chain + .spec + .fork_name_at_slot::(self.attester_slashing.attestation_1().data().slot); self.client - .post_beacon_pool_attester_slashings(&self.attester_slashing) + .post_beacon_pool_attester_slashings_v2(&self.attester_slashing, fork_name) .await .unwrap(); @@ -1839,7 +1925,31 @@ impl ApiTester { self } - pub async fn test_post_beacon_pool_attester_slashings_invalid(mut self) -> Self { + pub async fn test_post_beacon_pool_attester_slashings_invalid_v1(mut self) -> Self { + let mut slashing = self.attester_slashing.clone(); + match &mut slashing { + AttesterSlashing::Base(ref mut slashing) => { + slashing.attestation_1.data.slot += 1; + } + AttesterSlashing::Electra(ref mut slashing) => { + slashing.attestation_1.data.slot += 1; + } + } + + self.client + .post_beacon_pool_attester_slashings_v1(&slashing) + .await + .unwrap_err(); + + assert!( + self.network_rx.network_recv.recv().now_or_never().is_none(), + "invalid attester slashing should not be sent to network" + ); + + self + } + + pub async fn test_post_beacon_pool_attester_slashings_invalid_v2(mut self) -> Self { let mut slashing = self.attester_slashing.clone(); match &mut slashing { AttesterSlashing::Base(ref mut slashing) => { @@ -1850,8 +1960,12 @@ impl ApiTester { } } + let fork_name = self + .chain + .spec + .fork_name_at_slot::(self.attester_slashing.attestation_1().data().slot); self.client - .post_beacon_pool_attester_slashings(&slashing) + .post_beacon_pool_attester_slashings_v2(&slashing, fork_name) .await .unwrap_err(); @@ -1866,7 +1980,7 @@ impl ApiTester { pub async fn test_get_beacon_pool_attester_slashings(self) -> Self { let result = self .client - .get_beacon_pool_attester_slashings() + .get_beacon_pool_attester_slashings_v1() .await .unwrap() .data; @@ -1875,6 +1989,14 @@ impl ApiTester { assert_eq!(result, expected); + let result = self + .client + .get_beacon_pool_attester_slashings_v2() + .await + .unwrap() + .data; + assert_eq!(result, expected); + self } @@ -3233,30 +3355,52 @@ impl ApiTester { } pub async fn test_get_validator_aggregate_attestation(self) -> Self { - let attestation = self + if self .chain - .head_beacon_block() - .message() - .body() - .attestations() - .next() - .unwrap() - .clone_as_attestation(); - - let result = self - .client - .get_validator_aggregate_attestation( - attestation.data().slot, - attestation.data().tree_hash_root(), - ) - .await - .unwrap() - .unwrap() - .data; + .spec + .fork_name_at_slot::(self.chain.slot().unwrap()) + .electra_enabled() + { + for attestation in self.chain.naive_aggregation_pool.read().iter() { + let result = self + .client + .get_validator_aggregate_attestation_v2( + attestation.data().slot, + attestation.data().tree_hash_root(), + attestation.committee_index().unwrap(), + ) + .await + .unwrap() + .unwrap() + .data; + let expected = attestation; - let expected = attestation; + assert_eq!(&result, expected); + } + } else { + let attestation = self + .chain + .head_beacon_block() + .message() + .body() + .attestations() + .next() + .unwrap() + .clone_as_attestation(); + let result = self + .client + .get_validator_aggregate_attestation_v1( + attestation.data().slot, + attestation.data().tree_hash_root(), + ) + .await + .unwrap() + .unwrap() + .data; + let expected = attestation; - assert_eq!(result, expected); + assert_eq!(result, expected); + } self } @@ -3355,11 +3499,11 @@ impl ApiTester { ) } - pub async fn test_get_validator_aggregate_and_proofs_valid(mut self) -> Self { + pub async fn test_get_validator_aggregate_and_proofs_valid_v1(mut self) -> Self { let aggregate = self.get_aggregate().await; self.client - .post_validator_aggregate_and_proof::(&[aggregate]) + .post_validator_aggregate_and_proof_v1::(&[aggregate]) .await .unwrap(); @@ -3368,7 +3512,7 @@ impl ApiTester { self } - pub async fn test_get_validator_aggregate_and_proofs_invalid(mut self) -> Self { + pub async fn test_get_validator_aggregate_and_proofs_invalid_v1(mut self) -> Self { let mut aggregate = self.get_aggregate().await; match &mut aggregate { SignedAggregateAndProof::Base(ref mut aggregate) => { @@ -3380,7 +3524,7 @@ impl ApiTester { } self.client - .post_validator_aggregate_and_proof::(&[aggregate]) + .post_validator_aggregate_and_proof_v1::(&[aggregate.clone()]) .await .unwrap_err(); @@ -3389,6 +3533,46 @@ impl ApiTester { self } + pub async fn test_get_validator_aggregate_and_proofs_valid_v2(mut self) -> Self { + let aggregate = self.get_aggregate().await; + let fork_name = self + .chain + .spec + .fork_name_at_slot::(aggregate.message().aggregate().data().slot); + self.client + .post_validator_aggregate_and_proof_v2::(&[aggregate], fork_name) + .await + .unwrap(); + + assert!(self.network_rx.network_recv.recv().await.is_some()); + + self + } + + pub async fn test_get_validator_aggregate_and_proofs_invalid_v2(mut self) -> Self { + let mut aggregate = self.get_aggregate().await; + match &mut aggregate { + SignedAggregateAndProof::Base(ref mut aggregate) => { + aggregate.message.aggregate.data.slot += 1; + } + SignedAggregateAndProof::Electra(ref mut aggregate) => { + aggregate.message.aggregate.data.slot += 1; + } + } + + let fork_name = self + .chain + .spec + .fork_name_at_slot::(aggregate.message().aggregate().data().slot); + self.client + .post_validator_aggregate_and_proof_v2::(&[aggregate], fork_name) + .await + .unwrap_err(); + assert!(self.network_rx.network_recv.recv().now_or_never().is_none()); + + self + } + pub async fn test_get_validator_beacon_committee_subscriptions(mut self) -> Self { let subscription = BeaconCommitteeSubscription { validator_index: 0, @@ -3484,7 +3668,7 @@ impl ApiTester { pub async fn test_post_validator_register_validator_slashed(self) -> Self { // slash a validator self.client - .post_beacon_pool_attester_slashings(&self.attester_slashing) + .post_beacon_pool_attester_slashings_v1(&self.attester_slashing) .await .unwrap(); @@ -3597,7 +3781,7 @@ impl ApiTester { // Attest to the current slot self.client - .post_beacon_pool_attestations(self.attestations.as_slice()) + .post_beacon_pool_attestations_v1(self.attestations.as_slice()) .await .unwrap(); @@ -5237,7 +5421,7 @@ impl ApiTester { // Attest to the current slot self.client - .post_beacon_pool_attestations(self.attestations.as_slice()) + .post_beacon_pool_attestations_v1(self.attestations.as_slice()) .await .unwrap(); @@ -5292,7 +5476,7 @@ impl ApiTester { let expected_attestation_len = self.attestations.len(); self.client - .post_beacon_pool_attestations(self.attestations.as_slice()) + .post_beacon_pool_attestations_v1(self.attestations.as_slice()) .await .unwrap(); @@ -5801,34 +5985,66 @@ async fn post_beacon_blocks_duplicate() { } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] -async fn beacon_pools_post_attestations_valid() { +async fn beacon_pools_post_attestations_valid_v1() { + ApiTester::new() + .await + .test_post_beacon_pool_attestations_valid_v1() + .await; +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn beacon_pools_post_attestations_invalid_v1() { + ApiTester::new() + .await + .test_post_beacon_pool_attestations_invalid_v1() + .await; +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn beacon_pools_post_attestations_valid_v2() { ApiTester::new() .await - .test_post_beacon_pool_attestations_valid() + .test_post_beacon_pool_attestations_valid_v2() .await; } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] -async fn beacon_pools_post_attestations_invalid() { +async fn beacon_pools_post_attestations_invalid_v2() { ApiTester::new() .await - .test_post_beacon_pool_attestations_invalid() + .test_post_beacon_pool_attestations_invalid_v2() .await; } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] -async fn beacon_pools_post_attester_slashings_valid() { +async fn beacon_pools_post_attester_slashings_valid_v1() { ApiTester::new() .await - .test_post_beacon_pool_attester_slashings_valid() + .test_post_beacon_pool_attester_slashings_valid_v1() .await; } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] -async fn beacon_pools_post_attester_slashings_invalid() { +async fn beacon_pools_post_attester_slashings_invalid_v1() { ApiTester::new() .await - .test_post_beacon_pool_attester_slashings_invalid() + .test_post_beacon_pool_attester_slashings_invalid_v1() + .await; +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn beacon_pools_post_attester_slashings_valid_v2() { + ApiTester::new() + .await + .test_post_beacon_pool_attester_slashings_valid_v2() + .await; +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn beacon_pools_post_attester_slashings_invalid_v2() { + ApiTester::new() + .await + .test_post_beacon_pool_attester_slashings_invalid_v2() .await; } @@ -6156,36 +6372,70 @@ async fn get_validator_aggregate_attestation_with_skip_slots() { } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] -async fn get_validator_aggregate_and_proofs_valid() { +async fn get_validator_aggregate_and_proofs_valid_v1() { + ApiTester::new() + .await + .test_get_validator_aggregate_and_proofs_valid_v1() + .await; +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn get_validator_aggregate_and_proofs_valid_with_skip_slots_v1() { + ApiTester::new() + .await + .skip_slots(E::slots_per_epoch() * 2) + .test_get_validator_aggregate_and_proofs_valid_v1() + .await; +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn get_validator_aggregate_and_proofs_valid_v2() { + ApiTester::new() + .await + .test_get_validator_aggregate_and_proofs_valid_v2() + .await; +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn get_validator_aggregate_and_proofs_valid_with_skip_slots_v2() { + ApiTester::new() + .await + .skip_slots(E::slots_per_epoch() * 2) + .test_get_validator_aggregate_and_proofs_valid_v2() + .await; +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn get_validator_aggregate_and_proofs_invalid_v1() { ApiTester::new() .await - .test_get_validator_aggregate_and_proofs_valid() + .test_get_validator_aggregate_and_proofs_invalid_v1() .await; } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] -async fn get_validator_aggregate_and_proofs_valid_with_skip_slots() { +async fn get_validator_aggregate_and_proofs_invalid_with_skip_slots_v1() { ApiTester::new() .await .skip_slots(E::slots_per_epoch() * 2) - .test_get_validator_aggregate_and_proofs_valid() + .test_get_validator_aggregate_and_proofs_invalid_v1() .await; } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] -async fn get_validator_aggregate_and_proofs_invalid() { +async fn get_validator_aggregate_and_proofs_invalid_v2() { ApiTester::new() .await - .test_get_validator_aggregate_and_proofs_invalid() + .test_get_validator_aggregate_and_proofs_invalid_v2() .await; } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] -async fn get_validator_aggregate_and_proofs_invalid_with_skip_slots() { +async fn get_validator_aggregate_and_proofs_invalid_with_skip_slots_v2() { ApiTester::new() .await .skip_slots(E::slots_per_epoch() * 2) - .test_get_validator_aggregate_and_proofs_invalid() + .test_get_validator_aggregate_and_proofs_invalid_v2() .await; } diff --git a/common/eth2/src/lib.rs b/common/eth2/src/lib.rs index 5a51aaec5a2..6d000f576f9 100644 --- a/common/eth2/src/lib.rs +++ b/common/eth2/src/lib.rs @@ -346,6 +346,19 @@ impl BeaconNodeHttpClient { Ok(()) } + /// Perform a HTTP POST request with a custom timeout and consensus header. + async fn post_with_timeout_and_consensus_header( + &self, + url: U, + body: &T, + timeout: Duration, + fork_name: ForkName, + ) -> Result<(), Error> { + self.post_generic_with_consensus_version(url, body, Some(timeout), fork_name) + .await?; + Ok(()) + } + /// Perform a HTTP POST request with a custom timeout, returning a JSON response. async fn post_with_timeout_and_response( &self, @@ -376,25 +389,6 @@ impl BeaconNodeHttpClient { ok_or_error(response).await } - /// Generic POST function supporting arbitrary responses and timeouts. - /// Does not include Content-Type application/json in the request header. - async fn post_generic_json_without_content_type_header( - &self, - url: U, - body: &T, - timeout: Option, - ) -> Result { - let mut builder = self.client.post(url); - if let Some(timeout) = timeout { - builder = builder.timeout(timeout); - } - - let serialized_body = serde_json::to_vec(body).map_err(Error::InvalidJson)?; - - let response = builder.body(serialized_body).send().await?; - ok_or_error(response).await - } - /// Generic POST function supporting arbitrary responses and timeouts. async fn post_generic_with_consensus_version( &self, @@ -1228,10 +1222,10 @@ impl BeaconNodeHttpClient { self.get_opt(path).await } - /// `GET beacon/blocks/{block_id}/attestations` + /// `GET v1/beacon/blocks/{block_id}/attestations` /// /// Returns `Ok(None)` on a 404 error. - pub async fn get_beacon_blocks_attestations( + pub async fn get_beacon_blocks_attestations_v1( &self, block_id: BlockId, ) -> Result>>>, Error> { @@ -1247,8 +1241,28 @@ impl BeaconNodeHttpClient { self.get_opt(path).await } - /// `POST beacon/pool/attestations` - pub async fn post_beacon_pool_attestations( + /// `GET v2/beacon/blocks/{block_id}/attestations` + /// + /// Returns `Ok(None)` on a 404 error. + pub async fn get_beacon_blocks_attestations_v2( + &self, + block_id: BlockId, + ) -> Result>>>, Error> + { + let mut path = self.eth_path(V2)?; + + path.path_segments_mut() + .map_err(|()| Error::InvalidUrl(self.server.clone()))? + .push("beacon") + .push("blocks") + .push(&block_id.to_string()) + .push("attestations"); + + self.get_opt(path).await + } + + /// `POST v1/beacon/pool/attestations` + pub async fn post_beacon_pool_attestations_v1( &self, attestations: &[Attestation], ) -> Result<(), Error> { @@ -1266,8 +1280,33 @@ impl BeaconNodeHttpClient { Ok(()) } - /// `GET beacon/pool/attestations?slot,committee_index` - pub async fn get_beacon_pool_attestations( + /// `POST v2/beacon/pool/attestations` + pub async fn post_beacon_pool_attestations_v2( + &self, + attestations: &[Attestation], + fork_name: ForkName, + ) -> Result<(), Error> { + let mut path = self.eth_path(V2)?; + + path.path_segments_mut() + .map_err(|()| Error::InvalidUrl(self.server.clone()))? + .push("beacon") + .push("pool") + .push("attestations"); + + self.post_with_timeout_and_consensus_header( + path, + &attestations, + self.timeouts.attestation, + fork_name, + ) + .await?; + + Ok(()) + } + + /// `GET v1/beacon/pool/attestations?slot,committee_index` + pub async fn get_beacon_pool_attestations_v1( &self, slot: Option, committee_index: Option, @@ -1293,8 +1332,35 @@ impl BeaconNodeHttpClient { self.get(path).await } - /// `POST beacon/pool/attester_slashings` - pub async fn post_beacon_pool_attester_slashings( + /// `GET v2/beacon/pool/attestations?slot,committee_index` + pub async fn get_beacon_pool_attestations_v2( + &self, + slot: Option, + committee_index: Option, + ) -> Result>>, Error> { + let mut path = self.eth_path(V2)?; + + path.path_segments_mut() + .map_err(|()| Error::InvalidUrl(self.server.clone()))? + .push("beacon") + .push("pool") + .push("attestations"); + + if let Some(slot) = slot { + path.query_pairs_mut() + .append_pair("slot", &slot.to_string()); + } + + if let Some(index) = committee_index { + path.query_pairs_mut() + .append_pair("committee_index", &index.to_string()); + } + + self.get(path).await + } + + /// `POST v1/beacon/pool/attester_slashings` + pub async fn post_beacon_pool_attester_slashings_v1( &self, slashing: &AttesterSlashing, ) -> Result<(), Error> { @@ -1306,14 +1372,33 @@ impl BeaconNodeHttpClient { .push("pool") .push("attester_slashings"); - self.post_generic_json_without_content_type_header(path, slashing, None) + self.post_generic(path, slashing, None).await?; + + Ok(()) + } + + /// `POST v2/beacon/pool/attester_slashings` + pub async fn post_beacon_pool_attester_slashings_v2( + &self, + slashing: &AttesterSlashing, + fork_name: ForkName, + ) -> Result<(), Error> { + let mut path = self.eth_path(V2)?; + + path.path_segments_mut() + .map_err(|()| Error::InvalidUrl(self.server.clone()))? + .push("beacon") + .push("pool") + .push("attester_slashings"); + + self.post_generic_with_consensus_version(path, slashing, None, fork_name) .await?; Ok(()) } - /// `GET beacon/pool/attester_slashings` - pub async fn get_beacon_pool_attester_slashings( + /// `GET v1/beacon/pool/attester_slashings` + pub async fn get_beacon_pool_attester_slashings_v1( &self, ) -> Result>>, Error> { let mut path = self.eth_path(V1)?; @@ -1327,6 +1412,21 @@ impl BeaconNodeHttpClient { self.get(path).await } + /// `GET v2/beacon/pool/attester_slashings` + pub async fn get_beacon_pool_attester_slashings_v2( + &self, + ) -> Result>>, Error> { + let mut path = self.eth_path(V2)?; + + path.path_segments_mut() + .map_err(|()| Error::InvalidUrl(self.server.clone()))? + .push("beacon") + .push("pool") + .push("attester_slashings"); + + self.get(path).await + } + /// `POST beacon/pool/proposer_slashings` pub async fn post_beacon_pool_proposer_slashings( &self, @@ -2216,8 +2316,8 @@ impl BeaconNodeHttpClient { self.get_with_timeout(path, self.timeouts.attestation).await } - /// `GET validator/aggregate_attestation?slot,attestation_data_root` - pub async fn get_validator_aggregate_attestation( + /// `GET v1/validator/aggregate_attestation?slot,attestation_data_root` + pub async fn get_validator_aggregate_attestation_v1( &self, slot: Slot, attestation_data_root: Hash256, @@ -2240,6 +2340,32 @@ impl BeaconNodeHttpClient { .await } + /// `GET v2/validator/aggregate_attestation?slot,attestation_data_root,committee_index` + pub async fn get_validator_aggregate_attestation_v2( + &self, + slot: Slot, + attestation_data_root: Hash256, + committee_index: CommitteeIndex, + ) -> Result>>, Error> { + let mut path = self.eth_path(V2)?; + + path.path_segments_mut() + .map_err(|()| Error::InvalidUrl(self.server.clone()))? + .push("validator") + .push("aggregate_attestation"); + + path.query_pairs_mut() + .append_pair("slot", &slot.to_string()) + .append_pair( + "attestation_data_root", + &format!("{:?}", attestation_data_root), + ) + .append_pair("committee_index", &committee_index.to_string()); + + self.get_opt_with_timeout(path, self.timeouts.attestation) + .await + } + /// `GET validator/sync_committee_contribution` pub async fn get_validator_sync_committee_contribution( &self, @@ -2335,8 +2461,8 @@ impl BeaconNodeHttpClient { .await } - /// `POST validator/aggregate_and_proofs` - pub async fn post_validator_aggregate_and_proof( + /// `POST v1/validator/aggregate_and_proofs` + pub async fn post_validator_aggregate_and_proof_v1( &self, aggregates: &[SignedAggregateAndProof], ) -> Result<(), Error> { @@ -2353,6 +2479,30 @@ impl BeaconNodeHttpClient { Ok(()) } + /// `POST v2/validator/aggregate_and_proofs` + pub async fn post_validator_aggregate_and_proof_v2( + &self, + aggregates: &[SignedAggregateAndProof], + fork_name: ForkName, + ) -> Result<(), Error> { + let mut path = self.eth_path(V2)?; + + path.path_segments_mut() + .map_err(|()| Error::InvalidUrl(self.server.clone()))? + .push("validator") + .push("aggregate_and_proofs"); + + self.post_with_timeout_and_consensus_header( + path, + &aggregates, + self.timeouts.attestation, + fork_name, + ) + .await?; + + Ok(()) + } + /// `POST validator/beacon_committee_subscriptions` pub async fn post_validator_beacon_committee_subscriptions( &self, diff --git a/common/eth2/src/types.rs b/common/eth2/src/types.rs index 70aa5aab3e2..d399bc2bd01 100644 --- a/common/eth2/src/types.rs +++ b/common/eth2/src/types.rs @@ -780,6 +780,8 @@ pub struct ValidatorAttestationDataQuery { pub struct ValidatorAggregateAttestationQuery { pub attestation_data_root: Hash256, pub slot: Slot, + #[serde(skip_serializing_if = "Option::is_none")] + pub committee_index: Option, } #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Hash)] diff --git a/consensus/types/src/attestation.rs b/consensus/types/src/attestation.rs index 88993267a94..7b53a98caa1 100644 --- a/consensus/types/src/attestation.rs +++ b/consensus/types/src/attestation.rs @@ -1,6 +1,6 @@ use crate::slot_data::SlotData; -use crate::Checkpoint; use crate::{test_utils::TestRandom, Hash256, Slot}; +use crate::{Checkpoint, ForkVersionDeserialize}; use derivative::Derivative; use safe_arith::ArithError; use serde::{Deserialize, Serialize}; @@ -26,6 +26,12 @@ pub enum Error { InvalidCommitteeIndex, } +impl From for Error { + fn from(e: ssz_types::Error) -> Self { + Error::SszTypesError(e) + } +} + #[superstruct( variants(Base, Electra), variant_attributes( @@ -487,6 +493,46 @@ impl<'a, E: EthSpec> From> for AttestationRef<'a, E> } } +impl ForkVersionDeserialize for Attestation { + fn deserialize_by_fork<'de, D: serde::Deserializer<'de>>( + value: serde_json::Value, + fork_name: crate::ForkName, + ) -> Result { + if fork_name.electra_enabled() { + let attestation: AttestationElectra = + serde_json::from_value(value).map_err(serde::de::Error::custom)?; + Ok(Attestation::Electra(attestation)) + } else { + let attestation: AttestationBase = + serde_json::from_value(value).map_err(serde::de::Error::custom)?; + Ok(Attestation::Base(attestation)) + } + } +} + +impl ForkVersionDeserialize for Vec> { + fn deserialize_by_fork<'de, D: serde::Deserializer<'de>>( + value: serde_json::Value, + fork_name: crate::ForkName, + ) -> Result { + if fork_name.electra_enabled() { + let attestations: Vec> = + serde_json::from_value(value).map_err(serde::de::Error::custom)?; + Ok(attestations + .into_iter() + .map(Attestation::Electra) + .collect::>()) + } else { + let attestations: Vec> = + serde_json::from_value(value).map_err(serde::de::Error::custom)?; + Ok(attestations + .into_iter() + .map(Attestation::Base) + .collect::>()) + } + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/consensus/types/src/attester_slashing.rs b/consensus/types/src/attester_slashing.rs index c8e2fb4f821..f6aa654d445 100644 --- a/consensus/types/src/attester_slashing.rs +++ b/consensus/types/src/attester_slashing.rs @@ -171,6 +171,29 @@ impl TestRandom for AttesterSlashing { } } +impl crate::ForkVersionDeserialize for Vec> { + fn deserialize_by_fork<'de, D: serde::Deserializer<'de>>( + value: serde_json::Value, + fork_name: crate::ForkName, + ) -> Result { + if fork_name.electra_enabled() { + let slashings: Vec> = + serde_json::from_value(value).map_err(serde::de::Error::custom)?; + Ok(slashings + .into_iter() + .map(AttesterSlashing::Electra) + .collect::>()) + } else { + let slashings: Vec> = + serde_json::from_value(value).map_err(serde::de::Error::custom)?; + Ok(slashings + .into_iter() + .map(AttesterSlashing::Base) + .collect::>()) + } + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/validator_client/src/attestation_service.rs b/validator_client/src/attestation_service.rs index 0cff39546dd..30fe508a2c2 100644 --- a/validator_client/src/attestation_service.rs +++ b/validator_client/src/attestation_service.rs @@ -287,17 +287,21 @@ impl AttestationService { // Then download, sign and publish a `SignedAggregateAndProof` for each // validator that is elected to aggregate for this `slot` and // `committee_index`. - self.produce_and_publish_aggregates(&attestation_data, &validator_duties) - .await - .map_err(move |e| { - crit!( - log, - "Error during attestation routine"; - "error" => format!("{:?}", e), - "committee_index" => committee_index, - "slot" => slot.as_u64(), - ) - })?; + self.produce_and_publish_aggregates( + &attestation_data, + committee_index, + &validator_duties, + ) + .await + .map_err(move |e| { + crit!( + log, + "Error during attestation routine"; + "error" => format!("{:?}", e), + "committee_index" => committee_index, + "slot" => slot.as_u64(), + ) + })?; } Ok(()) @@ -445,6 +449,11 @@ impl AttestationService { warn!(log, "No attestations were published"); return Ok(None); } + let fork_name = self + .context + .eth2_config + .spec + .fork_name_at_slot::(attestation_data.slot); // Post the attestations to the BN. match self @@ -458,9 +467,15 @@ impl AttestationService { &metrics::ATTESTATION_SERVICE_TIMES, &[metrics::ATTESTATIONS_HTTP_POST], ); - beacon_node - .post_beacon_pool_attestations(attestations) - .await + if fork_name.electra_enabled() { + beacon_node + .post_beacon_pool_attestations_v2(attestations, fork_name) + .await + } else { + beacon_node + .post_beacon_pool_attestations_v1(attestations) + .await + } }, ) .await @@ -504,6 +519,7 @@ impl AttestationService { async fn produce_and_publish_aggregates( &self, attestation_data: &AttestationData, + committee_index: CommitteeIndex, validator_duties: &[DutyAndProof], ) -> Result<(), String> { let log = self.context.log(); @@ -516,6 +532,12 @@ impl AttestationService { return Ok(()); } + let fork_name = self + .context + .eth2_config + .spec + .fork_name_at_slot::(attestation_data.slot); + let aggregated_attestation = &self .beacon_nodes .first_success( @@ -526,17 +548,36 @@ impl AttestationService { &metrics::ATTESTATION_SERVICE_TIMES, &[metrics::AGGREGATES_HTTP_GET], ); - beacon_node - .get_validator_aggregate_attestation( - attestation_data.slot, - attestation_data.tree_hash_root(), - ) - .await - .map_err(|e| { - format!("Failed to produce an aggregate attestation: {:?}", e) - })? - .ok_or_else(|| format!("No aggregate available for {:?}", attestation_data)) - .map(|result| result.data) + if fork_name.electra_enabled() { + beacon_node + .get_validator_aggregate_attestation_v2( + attestation_data.slot, + attestation_data.tree_hash_root(), + committee_index, + ) + .await + .map_err(|e| { + format!("Failed to produce an aggregate attestation: {:?}", e) + })? + .ok_or_else(|| { + format!("No aggregate available for {:?}", attestation_data) + }) + .map(|result| result.data) + } else { + beacon_node + .get_validator_aggregate_attestation_v1( + attestation_data.slot, + attestation_data.tree_hash_root(), + ) + .await + .map_err(|e| { + format!("Failed to produce an aggregate attestation: {:?}", e) + })? + .ok_or_else(|| { + format!("No aggregate available for {:?}", attestation_data) + }) + .map(|result| result.data) + } }, ) .await @@ -604,9 +645,20 @@ impl AttestationService { &metrics::ATTESTATION_SERVICE_TIMES, &[metrics::AGGREGATES_HTTP_POST], ); - beacon_node - .post_validator_aggregate_and_proof(signed_aggregate_and_proofs_slice) - .await + if fork_name.electra_enabled() { + beacon_node + .post_validator_aggregate_and_proof_v2( + signed_aggregate_and_proofs_slice, + fork_name, + ) + .await + } else { + beacon_node + .post_validator_aggregate_and_proof_v1( + signed_aggregate_and_proofs_slice, + ) + .await + } }, ) .await