From 34ce5ccccdc880c62f068a090647b9b0a495783f Mon Sep 17 00:00:00 2001 From: Milos Stankovic <82043364+morph-dev@users.noreply.github.com> Date: Wed, 13 Mar 2024 08:51:04 +0200 Subject: [PATCH] feat: add state peertests (#1210) --- Cargo.lock | 1 + ethportal-peertest/Cargo.toml | 1 + ethportal-peertest/src/scenarios/mod.rs | 1 + ethportal-peertest/src/scenarios/state.rs | 97 ++++++++++++ ethportal-peertest/src/utils.rs | 176 +++++++++++++++++----- tests/self_peertest.rs | 37 +++++ 6 files changed, 279 insertions(+), 34 deletions(-) create mode 100644 ethportal-peertest/src/scenarios/state.rs diff --git a/Cargo.lock b/Cargo.lock index bb71a557e..9afd32c8c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2593,6 +2593,7 @@ dependencies = [ "rand 0.8.5", "reth-ipc", "rpc", + "serde", "serde_json", "serde_yaml", "tempfile", diff --git a/ethportal-peertest/Cargo.toml b/ethportal-peertest/Cargo.toml index 56e26c6dd..d6ac47a48 100644 --- a/ethportal-peertest/Cargo.toml +++ b/ethportal-peertest/Cargo.toml @@ -23,6 +23,7 @@ jsonrpsee = {version="0.20.0", features = ["async-client", "client", "macros", " rand = "0.8.4" reth-ipc = { tag = "v0.1.0-alpha.10", git = "https://github.com/paradigmxyz/reth.git"} rpc = { path = "../rpc" } +serde = { version = "1.0", features = ["derive"] } serde_json = "1.0.89" serde_yaml = "0.9" tempfile = "3.3.0" diff --git a/ethportal-peertest/src/scenarios/mod.rs b/ethportal-peertest/src/scenarios/mod.rs index 767a9245c..345d93b7a 100644 --- a/ethportal-peertest/src/scenarios/mod.rs +++ b/ethportal-peertest/src/scenarios/mod.rs @@ -5,5 +5,6 @@ pub mod find; pub mod gossip; pub mod offer_accept; pub mod paginate; +pub mod state; pub mod utp; pub mod validation; diff --git a/ethportal-peertest/src/scenarios/state.rs b/ethportal-peertest/src/scenarios/state.rs new file mode 100644 index 000000000..13d0caed9 --- /dev/null +++ b/ethportal-peertest/src/scenarios/state.rs @@ -0,0 +1,97 @@ +use crate::{ + utils::{ + fixtures_state_account_trie_node, fixtures_state_contract_bytecode, + fixtures_state_contract_storage_trie_node, fixtures_state_recursive_gossip, + wait_for_state_content, StateFixture, + }, + Peertest, PeertestNode, +}; +use ethportal_api::{ + jsonrpsee::async_client::Client, types::content_value::state::TrieNode, StateContentValue, + StateNetworkApiClient, +}; +use tracing::info; + +pub async fn test_state_offer_account_trie_node(peertest: &Peertest, target: &Client) { + for fixture in fixtures_state_account_trie_node() { + info!( + "Testing offering AccountTrieNode for key: {:?}", + fixture.content_data.key + ); + test_state_offer(&fixture, target, &peertest.bootnode).await; + } +} + +pub async fn test_state_gossip_contract_storage_trie_node(peertest: &Peertest, target: &Client) { + for fixture in fixtures_state_contract_storage_trie_node() { + info!( + "Testing offering ContractStorageTrieNode for key: {:?}", + fixture.content_data.key + ); + test_state_offer(&fixture, target, &peertest.bootnode).await; + } +} + +pub async fn test_state_gossip_contract_bytecode(peertest: &Peertest, target: &Client) { + for fixture in fixtures_state_contract_bytecode() { + info!( + "Testing offering ContractBytecode for key: {:?}", + fixture.content_data.key + ); + test_state_offer(&fixture, target, &peertest.bootnode).await; + } +} + +async fn test_state_offer(fixture: &StateFixture, target: &Client, peer: &PeertestNode) { + target + .offer( + peer.enr.clone(), + fixture.content_data.key.clone(), + Some(fixture.content_data.offer_value.clone()), + ) + .await + .unwrap(); + + let lookup_content_value = + wait_for_state_content(&peer.ipc_client, fixture.content_data.key.clone()).await; + assert_eq!(lookup_content_value, fixture.content_data.lookup_value); +} + +pub async fn test_state_recursive_gossip(peertest: &Peertest, target: &Client) { + let _ = target.ping(peertest.bootnode.enr.clone()).await.unwrap(); + + for fixture in fixtures_state_recursive_gossip().unwrap() { + let (first_key, first_value) = &fixture.key_value_pairs.first().unwrap(); + info!( + "Testing recursive gossip starting with key: {:?}", + first_key + ); + + target + .gossip(first_key.clone(), first_value.clone()) + .await + .unwrap(); + + // Verify that every key/value is fully propagated + for (key, value) in fixture.key_value_pairs { + let expected_lookup_trie_node = match value { + StateContentValue::AccountTrieNodeWithProof(value) => { + value.proof.last().unwrap().clone() + } + StateContentValue::ContractStorageTrieNodeWithProof(value) => { + value.storage_proof.last().unwrap().clone() + } + _ => panic!("Unexpected state content value: {value:?}"), + }; + let expected_lookup_value = StateContentValue::TrieNode(TrieNode { + node: expected_lookup_trie_node, + }); + + assert_eq!( + wait_for_state_content(target, key.clone()).await, + expected_lookup_value, + "Expecting lookup for {key:?} to return {expected_lookup_value:?}" + ); + } + } +} diff --git a/ethportal-peertest/src/utils.rs b/ethportal-peertest/src/utils.rs index 40107f38d..aa81ab8e3 100644 --- a/ethportal-peertest/src/utils.rs +++ b/ethportal-peertest/src/utils.rs @@ -1,36 +1,50 @@ +use std::fs; + +use futures::{Future, TryFutureExt}; use tracing::error; use anyhow::Result; use serde_yaml::Value; -use std::fs; +use ureq::serde::Deserialize; use ethportal_api::{ BeaconContentKey, BeaconContentValue, BeaconNetworkApiClient, HistoryContentKey, - HistoryContentValue, HistoryNetworkApiClient, + HistoryContentValue, HistoryNetworkApiClient, StateContentKey, StateContentValue, + StateNetworkApiClient, }; -/// Wait for the history content to be transferred -pub async fn wait_for_history_content( - ipc_client: &P, - content_key: HistoryContentKey, -) -> HistoryContentValue { - let mut received_content_value = ipc_client.local_content(content_key.clone()).await; - - let mut counter = 0; - +pub async fn wait_for_successful_result(f: impl Fn() -> Fut) -> O +where + Fut: Future>, +{ // If content is absent an error will be returned. - while counter < 5 { - let message = match received_content_value { + for counter in 0..60 { + let result = f().await; + match result { Ok(val) => return val, - Err(e) => format!("received an error {e}"), + Err(e) => { + if counter > 0 { + error!("Retry attempt #{counter} after 0.5s, because we received an error {e}") + } + } }; - error!("Retrying after 0.5s, because {message}"); tokio::time::sleep(tokio::time::Duration::from_millis(500)).await; - received_content_value = ipc_client.local_content(content_key.clone()).await; - counter += 1; } - received_content_value.unwrap() + panic!("Retried too many times"); +} + +/// Wait for the history content to be transferred +pub async fn wait_for_history_content( + ipc_client: &P, + content_key: HistoryContentKey, +) -> HistoryContentValue { + wait_for_successful_result(|| { + ipc_client + .local_content(content_key.clone()) + .map_err(anyhow::Error::from) + }) + .await } /// Wait for the beacon content to be transferred @@ -38,23 +52,25 @@ pub async fn wait_for_beacon_content BeaconContentValue { - let mut received_content_value = ipc_client.local_content(content_key.clone()).await; - - let mut counter = 0; - - // If content is absent an error will be returned. - while counter < 60 { - let message = match received_content_value { - Ok(val) => return val, - Err(e) => format!("received an error {e}"), - }; - counter += 1; - error!("Retry attempt #{counter} in 0.5s to find beacon content, because {message}"); - tokio::time::sleep(tokio::time::Duration::from_millis(500)).await; - received_content_value = ipc_client.local_content(content_key.clone()).await; - } + wait_for_successful_result(|| { + ipc_client + .local_content(content_key.clone()) + .map_err(anyhow::Error::from) + }) + .await +} - received_content_value.unwrap() +/// Wait for the state content to be transferred +pub async fn wait_for_state_content( + ipc_client: &P, + content_key: StateContentKey, +) -> StateContentValue { + wait_for_successful_result(|| { + ipc_client + .local_content(content_key.clone()) + .map_err(anyhow::Error::from) + }) + .await } fn read_history_content_key_value( @@ -101,3 +117,95 @@ pub fn fixture_block_body() -> (HistoryContentKey, HistoryContentValue) { pub fn fixture_receipts() -> (HistoryContentKey, HistoryContentValue) { read_fixture("portal-spec-tests/tests/mainnet/history/receipts/14764013.yaml") } + +#[derive(Debug, Clone, Deserialize)] +pub struct StateContentData { + #[serde(rename = "content_key")] + pub key: StateContentKey, + #[serde(rename = "content_value_offer")] + pub offer_value: StateContentValue, + #[serde(rename = "content_value_retrieval")] + pub lookup_value: StateContentValue, +} + +#[derive(Debug, Clone)] +pub struct StateFixture { + pub content_data: StateContentData, + pub recursive_gossip: Option, +} + +fn read_state_fixture_from_file(file_name: &str) -> Result> { + let yaml_content = fs::read_to_string(file_name)?; + let value: Value = serde_yaml::from_str(&yaml_content)?; + + let mut result = vec![]; + + for fixture in value.as_sequence().unwrap() { + result.push(StateFixture { + content_data: StateContentData::deserialize(fixture)?, + recursive_gossip: fixture.get("recursive_gossip").and_then(|value| { + if value.is_null() { + None + } else { + Some(StateContentData::deserialize(value).unwrap()) + } + }), + }); + } + Ok(result) +} + +fn read_state_fixture(file_name: &str) -> Vec { + read_state_fixture_from_file(file_name) + .unwrap_or_else(|err| panic!("Error reading fixture: {err}")) +} + +pub fn fixtures_state_account_trie_node() -> Vec { + read_state_fixture("portal-spec-tests/tests/mainnet/state/validation/account_trie_node.yaml") +} + +pub fn fixtures_state_contract_storage_trie_node() -> Vec { + read_state_fixture( + "portal-spec-tests/tests/mainnet/state/validation/contract_storage_trie_node.yaml", + ) +} + +pub fn fixtures_state_contract_bytecode() -> Vec { + read_state_fixture("portal-spec-tests/tests/mainnet/state/validation/contract_bytecode.yaml") +} + +pub struct StateRecursiveGossipFixture { + pub key_value_pairs: Vec<(StateContentKey, StateContentValue)>, +} + +pub fn fixtures_state_recursive_gossip() -> Result> { + let yaml_content = fs::read_to_string( + "portal-spec-tests/tests/mainnet/state/validation/recursive_gossip.yaml", + )?; + let value: Value = serde_yaml::from_str(&yaml_content)?; + + let mut result = vec![]; + + for fixture in value.as_sequence().unwrap() { + result.push(StateRecursiveGossipFixture { + key_value_pairs: fixture + .as_sequence() + .unwrap() + .iter() + .map(|key_value_container| { + let key = StateContentKey::deserialize( + key_value_container.get("content_key").unwrap(), + ) + .unwrap(); + let value = StateContentValue::deserialize( + key_value_container.get("content_value").unwrap(), + ) + .unwrap(); + (key, value) + }) + .collect(), + }) + } + + Ok(result) +} diff --git a/tests/self_peertest.rs b/tests/self_peertest.rs index 87bd79d3a..95e7477fd 100644 --- a/tests/self_peertest.rs +++ b/tests/self_peertest.rs @@ -175,6 +175,43 @@ async fn peertest_trace_recursive_utp() { handle.stop().unwrap(); } +#[tokio::test(flavor = "multi_thread")] +#[serial] +async fn peertest_state_offer_account_trie_node() { + let (peertest, target, handle) = setup_peertest().await; + peertest::scenarios::state::test_state_offer_account_trie_node(&peertest, &target).await; + peertest.exit_all_nodes(); + handle.stop().unwrap(); +} + +#[tokio::test(flavor = "multi_thread")] +#[serial] +async fn peertest_state_offer_contract_storage_trie_node() { + let (peertest, target, handle) = setup_peertest().await; + peertest::scenarios::state::test_state_gossip_contract_storage_trie_node(&peertest, &target) + .await; + peertest.exit_all_nodes(); + handle.stop().unwrap(); +} + +#[tokio::test(flavor = "multi_thread")] +#[serial] +async fn peertest_state_offer_contract_bytecode() { + let (peertest, target, handle) = setup_peertest().await; + peertest::scenarios::state::test_state_gossip_contract_bytecode(&peertest, &target).await; + peertest.exit_all_nodes(); + handle.stop().unwrap(); +} + +#[tokio::test(flavor = "multi_thread")] +#[serial] +async fn peertest_state_recursive_gossip() { + let (peertest, target, handle) = setup_peertest().await; + peertest::scenarios::state::test_state_recursive_gossip(&peertest, &target).await; + peertest.exit_all_nodes(); + handle.stop().unwrap(); +} + async fn setup_peertest() -> (peertest::Peertest, Client, RpcServerHandle) { utils::init_tracing(); // Run a client, as a buddy peer for ping tests, etc.