diff --git a/.config/nextest.toml b/.config/nextest.toml index d0246ed52d..6e83f34a16 100644 --- a/.config/nextest.toml +++ b/.config/nextest.toml @@ -19,11 +19,6 @@ threads-required = 8 filter = "package(tempo-node) & binary(it)" threads-required = 4 -[[profile.ci.overrides]] -filter = "test(sync::can_restart_after_joining_from_snapshot)" -slow-timeout = { period = "60s", terminate-after = 4, on-timeout = "pass" } -success-output = "final" - # Local development defaults (same constraints apply) [[profile.default.overrides]] filter = "package(tempo-e2e)" @@ -32,8 +27,3 @@ threads-required = 8 [[profile.default.overrides]] filter = "package(tempo-node) & binary(it)" threads-required = 4 - -[[profile.default.overrides]] -filter = "test(sync::can_restart_after_joining_from_snapshot)" -slow-timeout = { period = "60s", terminate-after = 4, on-timeout = "pass" } -success-output = "final" diff --git a/Cargo.lock b/Cargo.lock index d7141e7439..5e3d85354a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -11942,6 +11942,7 @@ dependencies = [ "itertools 0.14.0", "jsonrpsee", "rand 0.8.5", + "reth-chainspec", "reth-db", "reth-ethereum", "reth-network-peers", diff --git a/crates/commonware-node/src/dkg/manager/actor/mod.rs b/crates/commonware-node/src/dkg/manager/actor/mod.rs index 7517890fcf..b46c20c3e8 100644 --- a/crates/commonware-node/src/dkg/manager/actor/mod.rs +++ b/crates/commonware-node/src/dkg/manager/actor/mod.rs @@ -1755,8 +1755,8 @@ fn determine_next_players( fields( %digest, ), - err, - ret(level = Level::WARN) + err(level = Level::WARN) + ret, )] pub(crate) fn read_re_dkg_epoch( node: &TempoFullNode, diff --git a/crates/commonware-node/src/dkg/manager/actor/state.rs b/crates/commonware-node/src/dkg/manager/actor/state.rs index dd0097c10b..60f026b362 100644 --- a/crates/commonware-node/src/dkg/manager/actor/state.rs +++ b/crates/commonware-node/src/dkg/manager/actor/state.rs @@ -766,14 +766,13 @@ impl Read for State { buf: &mut impl bytes::Buf, cfg: &Self::Cfg, ) -> 
Result { - let range_cfg = RangeCfg::from(1..=(u16::MAX as usize)); Ok(Self { epoch: ReadExt::read(buf)?, seed: ReadExt::read(buf)?, output: Read::read_cfg(buf, cfg)?, share: ReadExt::read(buf)?, - players: Read::read_cfg(buf, &(range_cfg, ()))?, - syncers: Read::read_cfg(buf, &(range_cfg, ()))?, + players: Read::read_cfg(buf, &(RangeCfg::from(1..=(u16::MAX as usize)), ()))?, + syncers: Read::read_cfg(buf, &(RangeCfg::from(0..=(u16::MAX as usize)), ()))?, is_full_dkg: ReadExt::read(buf)?, }) } @@ -1338,6 +1337,7 @@ impl ReducedBlock { #[cfg(test)] mod tests { use super::*; + use commonware_codec::Encode as _; use commonware_cryptography::{ bls12381::{dkg, primitives::sharing::Mode}, ed25519::PrivateKey, @@ -1394,6 +1394,30 @@ mod tests { } } + #[test] + fn state_round_trip_with() { + let executor = deterministic::Runner::default(); + executor.start(|mut context| async move { + let state = make_test_state(&mut context, 0); + let mut bytes = state.encode(); + assert_eq!( + state, + State::read_cfg(&mut bytes, &NZU32!(u32::MAX)).unwrap(), + ); + + let state_without_syncers = { + let mut s = make_test_state(&mut context, 0); + s.syncers = Default::default(); + s + }; + let mut bytes = state_without_syncers.encode(); + assert_eq!( + state_without_syncers, + State::read_cfg(&mut bytes, &NZU32!(u32::MAX)).unwrap(), + ); + }); + } + #[test] fn states_migration_migrates_last_two() { let executor = deterministic::Runner::default(); diff --git a/crates/e2e/Cargo.toml b/crates/e2e/Cargo.toml index 66d17b35be..a6dcab7c9e 100644 --- a/crates/e2e/Cargo.toml +++ b/crates/e2e/Cargo.toml @@ -32,6 +32,7 @@ commonware-utils.workspace = true itertools.workspace = true eyre.workspace = true +reth-chainspec.workspace = true reth-db.workspace = true reth-ethereum = { workspace = true, features = [ "node", diff --git a/crates/e2e/src/execution_runtime.rs b/crates/e2e/src/execution_runtime.rs index 62275c8883..0e2b55526d 100644 --- a/crates/e2e/src/execution_runtime.rs +++ 
b/crates/e2e/src/execution_runtime.rs @@ -1,6 +1,6 @@ //! The environment to launch tempo execution nodes in. use std::{ - net::SocketAddr, + net::{IpAddr, SocketAddr}, path::{Path, PathBuf}, sync::Arc, time::Duration, @@ -14,12 +14,17 @@ use alloy::{ }; use alloy_evm::{EvmFactory as _, revm::inspector::JournalExt as _}; use alloy_genesis::{Genesis, GenesisAccount}; -use alloy_primitives::{Address, B256}; +use alloy_primitives::{Address, B256, Keccak256, U256}; use commonware_codec::Encode; -use commonware_cryptography::ed25519::PublicKey; +use commonware_cryptography::{ + Signer, + ed25519::{PrivateKey, PublicKey, Signature}, +}; +use commonware_runtime::Clock; use commonware_utils::ordered; use eyre::{OptionExt as _, WrapErr as _}; use futures::{StreamExt, future::BoxFuture}; +use reth_chainspec::EthChainSpec; use reth_db::mdbx::DatabaseEnv; use reth_ethereum::{ evm::{ @@ -55,10 +60,16 @@ use tempo_node::{ rpc::consensus::{TempoConsensusApiServer, TempoConsensusRpc}, }; use tempo_precompiles::{ - VALIDATOR_CONFIG_ADDRESS, + VALIDATOR_CONFIG_ADDRESS, VALIDATOR_CONFIG_V2_ADDRESS, storage::StorageCtx, validator_config::{IValidatorConfig, ValidatorConfig}, + validator_config_v2::{ + IValidatorConfigV2, VALIDATOR_NS_ADD, VALIDATOR_NS_ROTATE, ValidatorConfigV2, + }, }; +use tokio::sync::oneshot; + +use crate::{ConsensusNodeConfig, TestingNode}; const ADMIN_INDEX: u32 = 0; const VALIDATOR_START_INDEX: u32 = 1; @@ -70,7 +81,8 @@ pub const TEST_MNEMONIC: &str = "test test test test test test test test test te pub struct Builder { epoch_length: Option, initial_dkg_outcome: Option, - validators: Option>, + t2_time: Option, + validators: Option>, } impl Builder { @@ -78,6 +90,7 @@ impl Builder { Self { epoch_length: None, initial_dkg_outcome: None, + t2_time: None, validators: None, } } @@ -96,41 +109,62 @@ impl Builder { } } - pub fn with_validators( - self, - validators: ordered::Map, - ) -> Self { + pub fn with_validators(self, validators: ordered::Map) -> Self { Self { 
validators: Some(validators), ..self } } + pub fn with_t2_time(self, t2_time: u64) -> Self { + Self { + t2_time: Some(t2_time), + ..self + } + } + pub fn launch(self) -> eyre::Result { let Self { epoch_length, initial_dkg_outcome, + t2_time, validators, } = self; let epoch_length = epoch_length.ok_or_eyre("must specify epoch length")?; let initial_dkg_outcome = initial_dkg_outcome.ok_or_eyre("must specify initial DKG outcome")?; + let t2_time = t2_time.ok_or_eyre("must specify t2 time")?; let validators = validators.ok_or_eyre("must specify validators")?; - assert!(initial_dkg_outcome.next_players() == validators.keys(),); + assert_eq!( + initial_dkg_outcome.next_players(), + &ordered::Set::from_iter_dedup( + validators + .iter_pairs() + .filter_map(|(key, val)| val.share.is_some().then_some(key.clone())) + ) + ); let mut genesis = genesis(); genesis .config .extra_fields .insert_value("epochLength".to_string(), epoch_length) - .wrap_err("failed to insert epoch length into genesis")?; + .unwrap(); + genesis + .config + .extra_fields + .insert_value("t2Time".to_string(), t2_time) + .unwrap(); genesis.extra_data = initial_dkg_outcome.encode().to_vec().into(); - let mut evm = setup_tempo_evm(); + // Just remove whatever is already written into chainspec. 
+ genesis.alloc.remove(&VALIDATOR_CONFIG_ADDRESS); + genesis.alloc.remove(&VALIDATOR_CONFIG_V2_ADDRESS); + let mut evm = setup_tempo_evm(genesis.config.chain_id); { let cx = evm.ctx_mut(); StorageCtx::enter_evm(&mut cx.journaled_state, &cx.block, &cx.cfg, &cx.tx, || { @@ -138,23 +172,64 @@ impl Builder { let mut validator_config = ValidatorConfig::new(); validator_config .initialize(admin()) - .wrap_err("Failed to initialize validator config") + .wrap_err("failed to initialize validator config v1") .unwrap(); - for (peer, (net_addr, chain_addr)) in validators.iter_pairs() { - validator_config - .add_validator( - admin(), - IValidatorConfig::addValidatorCall { - newValidatorAddress: *chain_addr, - publicKey: peer.encode().as_ref().try_into().unwrap(), - active: true, - inboundAddress: net_addr.to_string(), - outboundAddress: net_addr.to_string(), - }, - ) + let mut validator_config_v2 = ValidatorConfigV2::new(); + if t2_time == 0 { + validator_config_v2 + .initialize(admin()) + .wrap_err("failed to initialize validator config v2") .unwrap(); } + + for (public_key, validator) in validators { + if let ConsensusNodeConfig { + address, + ingress, + egress, + private_key, + share: Some(_), + } = validator + { + validator_config + .add_validator( + admin(), + IValidatorConfig::addValidatorCall { + newValidatorAddress: address, + publicKey: public_key.encode().as_ref().try_into().unwrap(), + active: true, + inboundAddress: ingress.to_string(), + outboundAddress: egress.to_string(), + }, + ) + .unwrap(); + + if t2_time == 0 { + validator_config_v2 + .add_validator( + admin(), + IValidatorConfigV2::addValidatorCall { + validatorAddress: address, + publicKey: public_key.encode().as_ref().try_into().unwrap(), + ingress: ingress.to_string(), + egress: egress.ip().to_string(), + signature: sign_add_validator_args( + genesis.config.chain_id, + &private_key, + address, + ingress, + egress.ip(), + ) + .encode() + .to_vec() + .into(), + }, + ) + .unwrap() + } + } + } }) } @@ -348,7 
+423,7 @@ impl ExecutionRuntime { public_key, addr, response, - } = *add_validator; + } = add_validator; let provider = ProviderBuilder::new() .wallet(wallet.clone()) .connect_http(http_url); @@ -370,13 +445,57 @@ impl ExecutionRuntime { .unwrap(); let _ = response.send(receipt); } + Message::AddValidatorV2(add_validator_v2) => { + let AddValidatorV2 { + http_url, + private_key, + address, + ingress, + egress, + response, + } = add_validator_v2; + let provider = ProviderBuilder::new() + .wallet(wallet.clone()) + .connect_http(http_url); + let validator_config = + IValidatorConfigV2::new(VALIDATOR_CONFIG_V2_ADDRESS, provider); + let receipt = validator_config + .addValidator( + address, + private_key + .public_key() + .encode() + .as_ref() + .try_into() + .unwrap(), + ingress.to_string(), + egress.to_string(), + sign_add_validator_args( + EthChainSpec::chain(&chain_spec).id(), + &private_key, + address, + ingress, + egress, + ) + .encode() + .to_vec() + .into(), + ) + .send() + .await + .unwrap() + .get_receipt() + .await + .unwrap(); + let _ = response.send(receipt); + } Message::ChangeValidatorStatus(change_validator_status) => { let ChangeValidatorStatus { http_url, active, index, response, - } = *change_validator_status; + } = change_validator_status; let provider = ProviderBuilder::new() .wallet(wallet.clone()) .connect_http(http_url); @@ -392,12 +511,138 @@ impl ExecutionRuntime { .unwrap(); let _ = response.send(receipt); } + Message::DeactivateValidatorV2(deacivate_validator_v2) => { + let DeactivateValidatorV2 { + http_url, + address, + response, + } = deacivate_validator_v2; + let provider = ProviderBuilder::new() + .wallet(wallet.clone()) + .connect_http(http_url); + let validator_config_v2 = + IValidatorConfigV2::new(VALIDATOR_CONFIG_V2_ADDRESS, provider); + let receipt = validator_config_v2 + .deactivateValidator(address) + .send() + .await + .unwrap() + .get_receipt() + .await + .unwrap(); + let _ = response.send(receipt); + } + 
Message::GetV1Validators(get_v1_validators) => { + let GetV1Validators { http_url, response } = get_v1_validators; + let provider = ProviderBuilder::new() + .wallet(wallet.clone()) + .connect_http(http_url); + let validator_config = + IValidatorConfig::new(VALIDATOR_CONFIG_ADDRESS, provider); + let validators = validator_config.getValidators().call().await.unwrap(); + let _ = response.send(validators); + } + Message::GetV2Validators(get_v2_validators) => { + let GetV2Validators { http_url, response } = get_v2_validators; + let provider = ProviderBuilder::new() + .wallet(wallet.clone()) + .connect_http(http_url); + let validator_config = + IValidatorConfigV2::new(VALIDATOR_CONFIG_V2_ADDRESS, provider); + let validators = + validator_config.getAllValidators().call().await.unwrap(); + let _ = response.send(validators); + } + Message::InitializeIfMigrated(InitializeIfMigrated { + http_url, + response, + }) => { + let provider = ProviderBuilder::new() + .wallet(wallet.clone()) + .connect_http(http_url); + let validator_config_v2 = + IValidatorConfigV2::new(VALIDATOR_CONFIG_V2_ADDRESS, provider); + let receipt = validator_config_v2 + .initializeIfMigrated() + .send() + .await + .unwrap() + .get_receipt() + .await + .unwrap(); + let _ = response.send(receipt); + } + Message::MigrateValidator(migrate_validator) => { + let MigrateValidator { + http_url, + index, + response, + } = migrate_validator; + let provider = ProviderBuilder::new() + .wallet(wallet.clone()) + .connect_http(http_url); + let validator_config_v2 = + IValidatorConfigV2::new(VALIDATOR_CONFIG_V2_ADDRESS, provider); + let receipt = validator_config_v2 + .migrateValidator(index) + .send() + .await + .unwrap() + .get_receipt() + .await + .unwrap(); + let _ = response.send(receipt); + } + Message::RotateValidator(rotate_validator) => { + let RotateValidator { + http_url, + private_key, + address, + ingress, + egress, + response, + } = rotate_validator; + let provider = ProviderBuilder::new() + 
.wallet(wallet.clone()) + .connect_http(http_url); + let validator_config = + IValidatorConfigV2::new(VALIDATOR_CONFIG_V2_ADDRESS, provider); + let receipt = validator_config + .rotateValidator( + address, + private_key + .public_key() + .encode() + .as_ref() + .try_into() + .unwrap(), + ingress.to_string(), + egress.to_string(), + sign_rotate_validator_args( + EthChainSpec::chain(&chain_spec).id(), + &private_key, + address, + ingress, + egress, + ) + .encode() + .to_vec() + .into(), + ) + .send() + .await + .unwrap() + .get_receipt() + .await + .unwrap(); + let _ = response.send(receipt); + } Message::SetNextFullDkgCeremony(set_next_full_dkg_ceremony) => { let SetNextFullDkgCeremony { http_url, epoch, response, - } = *set_next_full_dkg_ceremony; + } = set_next_full_dkg_ceremony; let provider = ProviderBuilder::new() .wallet(wallet.clone()) .connect_http(http_url); @@ -413,6 +658,27 @@ impl ExecutionRuntime { .unwrap(); let _ = response.send(receipt); } + Message::SetNextFullDkgCeremonyV2(set_next_full_dkg_ceremony_v2) => { + let SetNextFullDkgCeremonyV2 { + http_url, + epoch, + response, + } = set_next_full_dkg_ceremony_v2; + let provider = ProviderBuilder::new() + .wallet(wallet.clone()) + .connect_http(http_url); + let validator_config = + IValidatorConfigV2::new(VALIDATOR_CONFIG_V2_ADDRESS, provider); + let receipt = validator_config + .setNextFullDkgCeremony(epoch) + .send() + .await + .unwrap() + .get_receipt() + .await + .unwrap(); + let _ = response.send(receipt); + } Message::SpawnNode { name, config, @@ -467,7 +733,7 @@ impl ExecutionRuntime { public_key: PublicKey, addr: SocketAddr, ) -> eyre::Result { - let (tx, rx) = tokio::sync::oneshot::channel(); + let (tx, rx) = oneshot::channel(); self.to_runtime .send( AddValidator { @@ -484,13 +750,36 @@ impl ExecutionRuntime { .wrap_err("the execution runtime dropped the response channel before sending a receipt") } + pub async fn add_validator_v2( + &self, + http_url: Url, + validator: &TestingNode, + ) -> 
eyre::Result { + let (tx, rx) = oneshot::channel(); + self.to_runtime + .send( + AddValidatorV2 { + http_url, + private_key: validator.private_key().clone(), + address: validator.chain_address, + ingress: validator.ingress(), + egress: validator.egress(), + response: tx, + } + .into(), + ) + .map_err(|_| eyre::eyre!("the execution runtime went away"))?; + rx.await + .wrap_err("the execution runtime dropped the response channel before sending a receipt") + } + pub async fn change_validator_status( &self, http_url: Url, index: u64, active: bool, ) -> eyre::Result { - let (tx, rx) = tokio::sync::oneshot::channel(); + let (tx, rx) = oneshot::channel(); self.to_runtime .send( ChangeValidatorStatus { @@ -506,12 +795,120 @@ impl ExecutionRuntime { .wrap_err("the execution runtime dropped the response channel before sending a receipt") } + pub async fn deactivate_validator_v2( + &self, + http_url: Url, + validator: &TestingNode, + ) -> eyre::Result { + let (tx, rx) = oneshot::channel(); + self.to_runtime + .send( + DeactivateValidatorV2 { + http_url, + address: validator.chain_address, + response: tx, + } + .into(), + ) + .map_err(|_| eyre::eyre!("the execution runtime went away"))?; + rx.await + .wrap_err("the execution runtime dropped the response channel before sending a receipt") + } + + pub async fn get_v1_validators( + &self, + http_url: Url, + ) -> eyre::Result> { + let (tx, rx) = oneshot::channel(); + self.to_runtime + .send( + GetV1Validators { + http_url, + response: tx, + } + .into(), + ) + .map_err(|_| eyre::eyre!("the execution runtime went away"))?; + rx.await + .wrap_err("the execution runtime dropped the response channel before sending a receipt") + } + + pub async fn get_v2_validators( + &self, + http_url: Url, + ) -> eyre::Result> { + let (tx, rx) = oneshot::channel(); + self.to_runtime + .send( + GetV2Validators { + http_url, + response: tx, + } + .into(), + ) + .map_err(|_| eyre::eyre!("the execution runtime went away"))?; + rx.await + .wrap_err("the 
execution runtime dropped the response channel before sending a receipt") + } + + pub async fn initialize_if_migrated(&self, http_url: Url) -> eyre::Result { + let (response, rx) = oneshot::channel(); + self.to_runtime + .send(InitializeIfMigrated { http_url, response }.into()) + .map_err(|_| eyre::eyre!("the execution runtime went away"))?; + rx.await + .wrap_err("the execution runtime dropped the response channel before sending a receipt") + } + + pub async fn migrate_validator( + &self, + http_url: Url, + index: u64, + ) -> eyre::Result { + let (response, rx) = oneshot::channel(); + self.to_runtime + .send( + MigrateValidator { + http_url, + index, + response, + } + .into(), + ) + .map_err(|_| eyre::eyre!("the execution runtime went away"))?; + rx.await + .wrap_err("the execution runtime dropped the response channel before sending a receipt") + } + + pub async fn rotate_validator( + &self, + http_url: Url, + validator: &TestingNode, + ) -> eyre::Result { + let (response, rx) = oneshot::channel(); + self.to_runtime + .send( + RotateValidator { + http_url, + private_key: validator.private_key().clone(), + address: validator.chain_address, + ingress: validator.ingress(), + egress: validator.egress(), + response, + } + .into(), + ) + .map_err(|_| eyre::eyre!("the execution runtime went away"))?; + rx.await + .wrap_err("the execution runtime dropped the response channel before sending a receipt") + } + pub async fn set_next_full_dkg_ceremony( &self, http_url: Url, epoch: u64, ) -> eyre::Result { - let (tx, rx) = tokio::sync::oneshot::channel(); + let (tx, rx) = oneshot::channel(); self.to_runtime .send( SetNextFullDkgCeremony { @@ -526,6 +923,26 @@ impl ExecutionRuntime { .wrap_err("the execution runtime dropped the response channel before sending a receipt") } + pub async fn set_next_full_dkg_ceremony_v2( + &self, + http_url: Url, + epoch: u64, + ) -> eyre::Result { + let (tx, rx) = oneshot::channel(); + self.to_runtime + .send( + SetNextFullDkgCeremonyV2 { + 
http_url, + epoch, + response: tx, + } + .into(), + ) + .map_err(|_| eyre::eyre!("the execution runtime went away"))?; + rx.await + .wrap_err("the execution runtime dropped the response channel before sending a receipt") + } + pub async fn remove_validator( &self, http_url: Url, @@ -533,7 +950,7 @@ impl ExecutionRuntime { public_key: PublicKey, addr: SocketAddr, ) -> eyre::Result { - let (tx, rx) = tokio::sync::oneshot::channel(); + let (tx, rx) = oneshot::channel(); self.to_runtime .send( AddValidator { @@ -559,7 +976,7 @@ impl ExecutionRuntime { Fut: std::future::Future + Send + 'static, T: Send + 'static, { - let (tx, rx) = tokio::sync::oneshot::channel(); + let (tx, rx) = oneshot::channel(); self.to_runtime .send(Message::RunAsync(Box::pin(async move { let result = fut.await; @@ -604,7 +1021,7 @@ impl ExecutionRuntimeHandle { config: ExecutionNodeConfig, database: DatabaseEnv, ) -> eyre::Result { - let (tx, rx) = tokio::sync::oneshot::channel(); + let (tx, rx) = oneshot::channel(); self.to_runtime .send(Message::SpawnNode { name: name.to_string(), @@ -773,14 +1190,22 @@ pub async fn launch_execution_node>( } enum Message { - AddValidator(Box), - ChangeValidatorStatus(Box), - SetNextFullDkgCeremony(Box), + AddValidator(AddValidator), + AddValidatorV2(AddValidatorV2), + ChangeValidatorStatus(ChangeValidatorStatus), + DeactivateValidatorV2(DeactivateValidatorV2), + GetV1Validators(GetV1Validators), + GetV2Validators(GetV2Validators), + InitializeIfMigrated(InitializeIfMigrated), + MigrateValidator(MigrateValidator), + RotateValidator(RotateValidator), + SetNextFullDkgCeremony(SetNextFullDkgCeremony), + SetNextFullDkgCeremonyV2(SetNextFullDkgCeremonyV2), SpawnNode { name: String, config: ExecutionNodeConfig, database: DatabaseEnv, - response: tokio::sync::oneshot::Sender, + response: oneshot::Sender, }, RunAsync(BoxFuture<'static, ()>), Stop, @@ -788,19 +1213,67 @@ enum Message { impl From for Message { fn from(value: AddValidator) -> Self { - 
Self::AddValidator(value.into()) + Self::AddValidator(value) + } +} + +impl From for Message { + fn from(value: AddValidatorV2) -> Self { + Self::AddValidatorV2(value) } } impl From for Message { fn from(value: ChangeValidatorStatus) -> Self { - Self::ChangeValidatorStatus(value.into()) + Self::ChangeValidatorStatus(value) + } +} + +impl From for Message { + fn from(value: DeactivateValidatorV2) -> Self { + Self::DeactivateValidatorV2(value) + } +} + +impl From for Message { + fn from(value: GetV1Validators) -> Self { + Self::GetV1Validators(value) + } +} + +impl From for Message { + fn from(value: GetV2Validators) -> Self { + Self::GetV2Validators(value) + } +} + +impl From for Message { + fn from(value: InitializeIfMigrated) -> Self { + Self::InitializeIfMigrated(value) + } +} + +impl From for Message { + fn from(value: MigrateValidator) -> Self { + Self::MigrateValidator(value) + } +} + +impl From for Message { + fn from(value: RotateValidator) -> Self { + Self::RotateValidator(value) } } impl From for Message { fn from(value: SetNextFullDkgCeremony) -> Self { - Self::SetNextFullDkgCeremony(value.into()) + Self::SetNextFullDkgCeremony(value) + } +} + +impl From for Message { + fn from(value: SetNextFullDkgCeremonyV2) -> Self { + Self::SetNextFullDkgCeremonyV2(value) } } @@ -811,7 +1284,18 @@ struct AddValidator { address: Address, public_key: PublicKey, addr: SocketAddr, - response: tokio::sync::oneshot::Sender, + response: oneshot::Sender, +} + +#[derive(Debug)] +struct AddValidatorV2 { + /// URL of the node to send this to. + http_url: Url, + private_key: PrivateKey, + address: Address, + ingress: SocketAddr, + egress: IpAddr, + response: oneshot::Sender, } #[derive(Debug)] @@ -820,7 +1304,51 @@ struct ChangeValidatorStatus { http_url: Url, index: u64, active: bool, - response: tokio::sync::oneshot::Sender, + response: oneshot::Sender, +} + +#[derive(Debug)] +struct DeactivateValidatorV2 { + /// URL of the node to send this to. 
+ http_url: Url, + address: Address, + response: oneshot::Sender, +} + +struct GetV1Validators { + http_url: Url, + response: oneshot::Sender>, +} + +struct GetV2Validators { + http_url: Url, + response: oneshot::Sender>, +} + +#[derive(Debug)] +struct InitializeIfMigrated { + /// URL of the node to send this to. + http_url: Url, + response: oneshot::Sender, +} + +#[derive(Debug)] +struct MigrateValidator { + /// URL of the node to send this to. + http_url: Url, + index: u64, + response: oneshot::Sender, +} + +#[derive(Debug)] +struct RotateValidator { + /// URL of the node to send this to. + http_url: Url, + private_key: PrivateKey, + address: Address, + ingress: SocketAddr, + egress: IpAddr, + response: oneshot::Sender, } #[derive(Debug)] @@ -828,7 +1356,15 @@ struct SetNextFullDkgCeremony { /// URL of the node to send this to. http_url: Url, epoch: u64, - response: tokio::sync::oneshot::Sender, + response: oneshot::Sender, +} + +#[derive(Debug)] +struct SetNextFullDkgCeremonyV2 { + /// URL of the node to send this to. 
+ http_url: Url, + epoch: u64, + response: oneshot::Sender, } pub fn admin() -> Address { @@ -843,9 +1379,46 @@ pub fn address(index: u32) -> Address { secret_key_to_address(MnemonicBuilder::from_phrase_nth(TEST_MNEMONIC, index).credential()) } -fn setup_tempo_evm() -> TempoEvm> { +fn setup_tempo_evm(chain_id: u64) -> TempoEvm> { let db = CacheDB::default(); - let env = EvmEnv::default(); + // revm sets timestamp to 1 by default, override it to 0 for genesis initializations + let mut env = EvmEnv::default().with_timestamp(U256::ZERO); + env.cfg_env.chain_id = chain_id; + let factory = TempoEvmFactory::default(); factory.create_evm(db, env) } + +fn sign_add_validator_args( + chain_id: u64, + key: &PrivateKey, + address: Address, + ingress: SocketAddr, + egress: IpAddr, +) -> Signature { + let mut hasher = Keccak256::new(); + hasher.update(chain_id.to_be_bytes()); + hasher.update(VALIDATOR_CONFIG_V2_ADDRESS.as_slice()); + hasher.update(address.as_slice()); + hasher.update(ingress.to_string().as_bytes()); + hasher.update(egress.to_string().as_bytes()); + let msg = hasher.finalize(); + key.sign(VALIDATOR_NS_ADD, msg.as_slice()) +} + +fn sign_rotate_validator_args( + chain_id: u64, + key: &PrivateKey, + address: Address, + ingress: SocketAddr, + egress: IpAddr, +) -> Signature { + let mut hasher = Keccak256::new(); + hasher.update(chain_id.to_be_bytes()); + hasher.update(VALIDATOR_CONFIG_V2_ADDRESS.as_slice()); + hasher.update(address.as_slice()); + hasher.update(ingress.to_string().as_bytes()); + hasher.update(egress.to_string().as_bytes()); + let msg = hasher.finalize(); + key.sign(VALIDATOR_NS_ROTATE, msg.as_slice()) +} diff --git a/crates/e2e/src/lib.rs b/crates/e2e/src/lib.rs index 26bde61d2d..2b28866de0 100644 --- a/crates/e2e/src/lib.rs +++ b/crates/e2e/src/lib.rs @@ -12,10 +12,15 @@ use std::{iter::repeat_with, net::SocketAddr, time::Duration}; +use alloy::signers::k256::schnorr::CryptoRngCore; +use alloy_primitives::Address; use 
commonware_consensus::types::Epoch; use commonware_cryptography::{ Signer as _, - bls12381::{dkg, primitives::sharing::Mode}, + bls12381::{ + dkg::{self}, + primitives::{group::Share, sharing::Mode}, + }, ed25519::{PrivateKey, PublicKey}, }; use commonware_math::algebra::Random as _; @@ -45,6 +50,68 @@ mod tests; pub const CONSENSUS_NODE_PREFIX: &str = "consensus"; pub const EXECUTION_NODE_PREFIX: &str = "execution"; +fn generate_consensus_node_config( + rng: &mut impl CryptoRngCore, + signers: u32, + verifiers: u32, +) -> ( + OnchainDkgOutcome, + ordered::Map, +) { + let signer_keys = repeat_with(|| PrivateKey::random(&mut *rng)) + .take(signers as usize) + .collect::>(); + + let (initial_dkg_outcome, shares) = dkg::deal::<_, _, N3f1>( + &mut *rng, + Mode::NonZeroCounter, + ordered::Set::try_from_iter(signer_keys.iter().map(|key| key.public_key())).unwrap(), + ) + .unwrap(); + + let onchain_dkg_outcome = OnchainDkgOutcome { + epoch: Epoch::zero(), + output: initial_dkg_outcome, + next_players: shares.keys().clone(), + is_next_full_dkg: false, + }; + + let verifier_keys = repeat_with(|| PrivateKey::random(&mut *rng)) + .take(verifiers as usize) + .collect::>(); + + let validators = ordered::Map::try_from_iter( + signer_keys + .into_iter() + .chain(verifier_keys) + .enumerate() + .map(|(i, private_key)| { + let public_key = private_key.public_key(); + let config = ConsensusNodeConfig { + address: crate::execution_runtime::validator(i as u32), + ingress: SocketAddr::from(([127, 0, 0, (i + 1) as u8], 8000)), + egress: SocketAddr::from(([127, 0, 0, (i + 1) as u8], 0)), + private_key, + share: shares.get_value(&public_key).cloned(), + }; + (public_key, config) + }), + ) + .unwrap(); + + (onchain_dkg_outcome, validators) +} + +/// Configuration for a validator. 
+#[derive(Clone, Debug)] +pub struct ConsensusNodeConfig { + pub address: Address, + pub ingress: SocketAddr, + pub egress: SocketAddr, + pub private_key: PrivateKey, + pub share: Option, +} + /// The test setup run by [`run`]. #[derive(Clone)] pub struct Setup { @@ -66,6 +133,13 @@ pub struct Setup { /// Whether to connect execution layer nodes directly. pub connect_execution_layer_nodes: bool, + + /// The t2 hardfork time. + /// + /// Validators will only be written into the V2 contract if t2_time == 0. + /// + /// Default: 1. + pub t2_time: u64, } impl Setup { @@ -81,6 +155,7 @@ impl Setup { }, epoch_length: 20, connect_execution_layer_nodes: false, + t2_time: 1, } } @@ -119,6 +194,10 @@ impl Setup { ..self } } + + pub fn t2_time(self, t2_time: u64) -> Self { + Self { t2_time, ..self } + } } impl Default for Setup { @@ -141,6 +220,7 @@ pub async fn setup_validators( connect_execution_layer_nodes, linkage, epoch_length, + t2_time, .. }: Setup, ) -> (Vec>, ExecutionRuntime) { @@ -154,52 +234,14 @@ pub async fn setup_validators( ); network.start(); - let mut signer_keys = repeat_with(|| PrivateKey::random(&mut *context)) - .take(how_many_signers as usize) - .collect::>(); - signer_keys.sort_by_key(|key| key.public_key()); - let (initial_dkg_outcome, shares) = dkg::deal::<_, _, N3f1>( - &mut *context, - Mode::NonZeroCounter, - ordered::Set::try_from_iter(signer_keys.iter().map(|key| key.public_key())).unwrap(), - ) - .unwrap(); - - let onchain_dkg_outcome = OnchainDkgOutcome { - epoch: Epoch::zero(), - output: initial_dkg_outcome, - next_players: shares.keys().clone(), - is_next_full_dkg: false, - }; - let mut verifier_keys = repeat_with(|| PrivateKey::random(&mut *context)) - .take(how_many_verifiers as usize) - .collect::>(); - verifier_keys.sort_by_key(|key| key.public_key()); - - // The port here does not matter because it will be ignored in simulated p2p. - // Still nice, because sometimes nodes can be better identified in logs. - let network_addresses = (1..) 
- .map(|port| SocketAddr::from(([127, 0, 0, 1], port))) - .take((how_many_signers + how_many_verifiers) as usize) - .collect::>(); - let chain_addresses = (0..) - .map(crate::execution_runtime::validator) - .take((how_many_signers + how_many_verifiers) as usize) - .collect::>(); - - let validators = ordered::Map::try_from_iter( - shares - .iter() - .zip(&network_addresses) - .zip(&chain_addresses) - .map(|((key, net_addr), chain_addr)| (key.clone(), (*net_addr, *chain_addr))), - ) - .unwrap(); + let (onchain_dkg_outcome, validators) = + generate_consensus_node_config(context, how_many_signers, how_many_verifiers); let execution_runtime = ExecutionRuntime::builder() .with_epoch_length(epoch_length) .with_initial_dkg_outcome(onchain_dkg_outcome) - .with_validators(validators) + .with_t2_time(t2_time) + .with_validators(validators.clone()) .launch() .unwrap(); @@ -209,31 +251,22 @@ pub async fn setup_validators( .generate(); let mut nodes = vec![]; - for ((((private_key, share), mut execution_config), network_address), chain_address) in - signer_keys - .into_iter() - .zip_eq(shares) - .map(|(signing_key, (verifying_key, share))| { - assert_eq!(signing_key.public_key(), verifying_key); - (signing_key, Some(share)) - }) - .chain(verifier_keys.into_iter().map(|key| (key, None))) - .zip_eq(execution_configs) - .zip_eq(network_addresses) - .zip_eq(chain_addresses) + + for ((public_key, consensus_node_config), mut execution_config) in + validators.into_iter().zip_eq(execution_configs) { + let ConsensusNodeConfig { + address, + ingress, + private_key, + share, + .. 
+ } = consensus_node_config; let oracle = oracle.clone(); - let uid = format!("{CONSENSUS_NODE_PREFIX}_{}", private_key.public_key()); + let uid = format!("{CONSENSUS_NODE_PREFIX}_{public_key}"); let feed_state = FeedStateHandle::new(); - execution_config.validator_key = Some( - private_key - .public_key() - .encode() - .as_ref() - .try_into() - .unwrap(), - ); + execution_config.validator_key = Some(public_key.encode().as_ref().try_into().unwrap()); execution_config.feed_state = Some(feed_state.clone()); let engine_config = consensus::Builder { @@ -261,13 +294,13 @@ pub async fn setup_validators( nodes.push(TestingNode::new( uid, - private_key.public_key(), + private_key, oracle.clone(), engine_config, execution_runtime.handle(), execution_config, - network_address, - chain_address, + ingress, + address, )); } @@ -283,7 +316,7 @@ pub fn run(setup: Setup, mut stop_condition: impl FnMut(&str, &str) -> bool) -> executor.start(|mut context| async move { // Setup and run all validators. - let (mut nodes, _execution_runtime) = setup_validators(&mut context, setup).await; + let (mut nodes, _execution_runtime) = setup_validators(&mut context, setup.clone()).await; join_all(nodes.iter_mut().map(|node| node.start(&context))).await; @@ -305,6 +338,18 @@ pub fn run(setup: Setup, mut stop_condition: impl FnMut(&str, &str) -> bool) -> assert_eq!(value, 0); } + if setup.t2_time == 0 { + if metric.ends_with("_dkg_manager_read_players_from_v1_contract_total") { + assert_eq!(0, value.parse::().unwrap()); + } + if metric.ends_with("_dkg_manager_syncing_players") { + assert_eq!(0, value.parse::().unwrap()); + } + if metric.ends_with("_dkg_manager_read_re_dkg_epoch_from_v1_contract_total") { + assert_eq!(0, value.parse::().unwrap()); + } + } + if stop_condition(metric, value) { success = true; break; diff --git a/crates/e2e/src/testing_node.rs b/crates/e2e/src/testing_node.rs index c6f2988c21..75ddb42aae 100644 --- a/crates/e2e/src/testing_node.rs +++ b/crates/e2e/src/testing_node.rs 
@@ -2,7 +2,10 @@ use crate::execution_runtime::{self, ExecutionNode, ExecutionNodeConfig, ExecutionRuntimeHandle}; use alloy_primitives::Address; -use commonware_cryptography::ed25519::PublicKey; +use commonware_cryptography::{ + Signer as _, + ed25519::{PrivateKey, PublicKey}, +}; use commonware_p2p::simulated::{Control, Oracle, SocketManager}; use commonware_runtime::{Handle, Metrics as _, deterministic::Context}; use reth_db::{Database, DatabaseEnv, mdbx::DatabaseArguments, open_db_read_only}; @@ -14,7 +17,11 @@ use reth_ethereum::{ storage::BlockNumReader, }; use reth_node_builder::NodeTypesWithDBAdapter; -use std::{net::SocketAddr, path::PathBuf, sync::Arc}; +use std::{ + net::{IpAddr, SocketAddr}, + path::PathBuf, + sync::Arc, +}; use tempo_commonware_node::{ BROADCASTER_CHANNEL_IDENT, BROADCASTER_LIMIT, CERTIFICATES_CHANNEL_IDENT, CERTIFICATES_LIMIT, DKG_CHANNEL_IDENT, DKG_LIMIT, MARSHAL_CHANNEL_IDENT, MARSHAL_LIMIT, RESOLVER_CHANNEL_IDENT, @@ -32,7 +39,7 @@ where /// Unique identifier for this node pub uid: String, /// Public key of the validator - pub public_key: PublicKey, + pub private_key: PrivateKey, /// Simulated network oracle for test environments pub oracle: Oracle, /// Consensus configuration used to start the consensus engine @@ -76,7 +83,7 @@ where #[expect(clippy::too_many_arguments, reason = "quickly threw this together")] pub fn new( uid: String, - public_key: PublicKey, + private_key: PrivateKey, oracle: Oracle, consensus_config: consensus::Builder< Control, @@ -87,6 +94,7 @@ where network_address: SocketAddr, chain_address: Address, ) -> Self { + let public_key = private_key.public_key(); let execution_node_datadir = execution_runtime .nodes_dir() .join(execution_runtime::execution_node_name(&public_key)); @@ -94,7 +102,7 @@ where let execution_node_name = execution_runtime::execution_node_name(&public_key); Self { uid, - public_key, + private_key, oracle, consensus_config, consensus_handle: None, @@ -112,9 +120,13 @@ where } } + pub fn 
private_key(&self) -> &PrivateKey { + &self.private_key + } + /// Get the validator public key of this node. - pub fn public_key(&self) -> &PublicKey { - &self.public_key + pub fn public_key(&self) -> PublicKey { + self.private_key.public_key() } /// Get the unique identifier of this node. @@ -141,6 +153,24 @@ where &self.oracle } + pub fn ingress(&self) -> SocketAddr { + self.network_address + } + + pub fn egress(&self) -> IpAddr { + self.network_address.ip() + } + + /// A verifier is a node that has a share. + pub fn is_signer(&self) -> bool { + self.consensus_config.share.is_some() + } + + /// A verifier is a node that has no share. + pub fn is_verifier(&self) -> bool { + self.consensus_config.share.is_none() + } + /// Start both consensus and execution layers. /// /// @@ -225,43 +255,43 @@ where let votes = self .oracle - .control(self.public_key.clone()) + .control(self.public_key()) .register(VOTES_CHANNEL_IDENT, VOTES_LIMIT) .await .unwrap(); let certificates = self .oracle - .control(self.public_key.clone()) + .control(self.public_key()) .register(CERTIFICATES_CHANNEL_IDENT, CERTIFICATES_LIMIT) .await .unwrap(); let resolver = self .oracle - .control(self.public_key.clone()) + .control(self.public_key()) .register(RESOLVER_CHANNEL_IDENT, RESOLVER_LIMIT) .await .unwrap(); let broadcast = self .oracle - .control(self.public_key.clone()) + .control(self.public_key()) .register(BROADCASTER_CHANNEL_IDENT, BROADCASTER_LIMIT) .await .unwrap(); let marshal = self .oracle - .control(self.public_key.clone()) + .control(self.public_key()) .register(MARSHAL_CHANNEL_IDENT, MARSHAL_LIMIT) .await .unwrap(); let dkg = self .oracle - .control(self.public_key.clone()) + .control(self.public_key()) .register(DKG_CHANNEL_IDENT, DKG_LIMIT) .await .unwrap(); let subblocks = self .oracle - .control(self.public_key.clone()) + .control(self.public_key()) .register(SUBBLOCKS_CHANNEL_IDENT, SUBBLOCKS_LIMIT) .await .unwrap(); diff --git a/crates/e2e/src/tests/consensus_rpc.rs 
b/crates/e2e/src/tests/consensus_rpc.rs index 0a0e4bafa6..68a41e383f 100644 --- a/crates/e2e/src/tests/consensus_rpc.rs +++ b/crates/e2e/src/tests/consensus_rpc.rs @@ -5,7 +5,9 @@ use std::{net::SocketAddr, time::Duration}; -use super::dkg::common::{assert_no_dkg_failures, wait_for_validators_to_reach_epoch, wait_for_outcome}; +use super::dkg::common::{ + assert_no_dkg_failures, wait_for_outcome, wait_for_validators_to_reach_epoch, +}; use crate::{CONSENSUS_NODE_PREFIX, Setup, setup_validators}; use alloy::transports::http::reqwest::Url; use alloy_primitives::hex; @@ -33,7 +35,10 @@ async fn consensus_subscribe_and_query_finalization() { let _ = tempo_eyre::install(); let initial_height = 3; - let setup = Setup::new().how_many_signers(1).epoch_length(100); + let setup = Setup::new() + .how_many_signers(1) + .t2_time(0) + .epoch_length(100); let cfg = deterministic::Config::default().with_seed(setup.seed); let (addr_tx, addr_rx) = oneshot::channel::<(SocketAddr, SocketAddr)>(); @@ -156,6 +161,7 @@ fn get_identity_transition_proof_after_full_dkg() { let setup = Setup::new() .how_many_signers(how_many_signers) + .t2_time(0) .epoch_length(epoch_length); let seed = setup.seed; diff --git a/crates/e2e/src/tests/dkg/common.rs b/crates/e2e/src/tests/dkg/common.rs index 62edc35bcd..4c07db1b5d 100644 --- a/crates/e2e/src/tests/dkg/common.rs +++ b/crates/e2e/src/tests/dkg/common.rs @@ -11,6 +11,21 @@ use tempo_dkg_onchain_artifacts::OnchainDkgOutcome; use crate::{CONSENSUS_NODE_PREFIX, TestingNode}; +/// Returns the target epoch to wait for depending on `event_height`. +/// +/// If `event_height` is less than a boundary height, then the next epoch is +/// returned. Otherwise, the one *after* the next is returned. 
+pub(crate) fn target_epoch(epoch_length: u64, event_height: u64) -> Epoch { + let strat = FixedEpocher::new(NZU64!(epoch_length)); + let event_height = Height::new(event_height); + let info = strat.containing(event_height).unwrap(); + if info.last() == event_height { + info.epoch().next().next() + } else { + info.epoch().next() + } +} + /// Reads the DKG outcome from a block, returns None if block doesn't exist or has no outcome. pub(crate) fn read_outcome_from_validator( validator: &TestingNode, diff --git a/crates/e2e/src/tests/dkg/full_ceremony.rs b/crates/e2e/src/tests/dkg/full_ceremony.rs index 99322aac39..d84e7fceed 100644 --- a/crates/e2e/src/tests/dkg/full_ceremony.rs +++ b/crates/e2e/src/tests/dkg/full_ceremony.rs @@ -8,7 +8,7 @@ use commonware_runtime::{ }; use futures::future::join_all; -use super::common::{assert_no_dkg_failures, wait_for_validators_to_reach_epoch, wait_for_outcome}; +use super::common::{assert_no_dkg_failures, wait_for_outcome, wait_for_validators_to_reach_epoch}; use crate::{Setup, setup_validators}; #[test_traced] @@ -77,7 +77,12 @@ impl FullDkgTest { tracing::info!(?pubkey_before, "Group public key BEFORE full DKG"); // Step 2: Wait for full DKG to complete (epoch N+1) - wait_for_validators_to_reach_epoch(&context, self.full_dkg_epoch + 1, self.how_many_signers).await; + wait_for_validators_to_reach_epoch( + &context, + self.full_dkg_epoch + 1, + self.how_many_signers, + ) + .await; assert_no_dkg_failures(&context); // Step 3: Verify full DKG created a NEW polynomial (different public key) @@ -99,7 +104,12 @@ impl FullDkgTest { tracing::info!("Verified: full DKG created independent polynomial"); // Step 4: Wait for reshare (epoch N+2) and verify it PRESERVES the public key - wait_for_validators_to_reach_epoch(&context, self.full_dkg_epoch + 2, self.how_many_signers).await; + wait_for_validators_to_reach_epoch( + &context, + self.full_dkg_epoch + 2, + self.how_many_signers, + ) + .await; assert_no_dkg_failures(&context); let 
outcome_after_reshare = wait_for_outcome( diff --git a/crates/e2e/src/tests/dkg/static_transitions/mod.rs b/crates/e2e/src/tests/dkg/static_transitions.rs similarity index 100% rename from crates/e2e/src/tests/dkg/static_transitions/mod.rs rename to crates/e2e/src/tests/dkg/static_transitions.rs diff --git a/crates/e2e/src/tests/migration_from_v1_to_v2/dkg/mod.rs b/crates/e2e/src/tests/migration_from_v1_to_v2/dkg/mod.rs new file mode 100644 index 0000000000..57ff963a13 --- /dev/null +++ b/crates/e2e/src/tests/migration_from_v1_to_v2/dkg/mod.rs @@ -0,0 +1 @@ +mod static_sets; diff --git a/crates/e2e/src/tests/migration_from_v1_to_v2/dkg/static_sets.rs b/crates/e2e/src/tests/migration_from_v1_to_v2/dkg/static_sets.rs new file mode 100644 index 0000000000..a2f857f3be --- /dev/null +++ b/crates/e2e/src/tests/migration_from_v1_to_v2/dkg/static_sets.rs @@ -0,0 +1,204 @@ +use std::time::Duration; + +use alloy::transports::http::reqwest::Url; +use commonware_consensus::types::{Epocher, FixedEpocher, Height}; +use commonware_macros::test_traced; +use commonware_runtime::{ + Clock, Metrics as _, Runner as _, + deterministic::{Config, Runner}, +}; +use commonware_utils::NZU64; +use futures::future::join_all; + +use crate::{ + CONSENSUS_NODE_PREFIX, Setup, setup_validators, + tests::dkg::common::wait_for_validators_to_reach_epoch, +}; + +#[test_traced] +fn single_node_transitions_once() { + AssertTransition { + how_many_signers: 1, + epoch_length: 10, + how_many_epochs: 1, + } + .run() +} + +#[test_traced] +fn single_node_transitions_twice() { + AssertTransition { + how_many_signers: 1, + epoch_length: 10, + how_many_epochs: 2, + } + .run() +} + +#[test_traced] +fn two_nodes_transition_once() { + AssertTransition { + how_many_signers: 2, + epoch_length: 10, + how_many_epochs: 1, + } + .run() +} + +#[test_traced] +fn two_nodes_transition_twice() { + AssertTransition { + how_many_signers: 2, + epoch_length: 10, + how_many_epochs: 1, + } + .run() +} + +#[test_traced] +fn 
four_nodes_transition_once() { + AssertTransition { + how_many_signers: 4, + epoch_length: 20, + how_many_epochs: 1, + } + .run() +} + +#[test_traced] +fn four_nodes_transition_twice() { + AssertTransition { + how_many_signers: 4, + epoch_length: 20, + how_many_epochs: 2, + } + .run() +} + +struct AssertTransition { + how_many_signers: u32, + epoch_length: u64, + how_many_epochs: u64, +} + +impl AssertTransition { + fn run(self) { + let Self { + how_many_signers, + epoch_length, + how_many_epochs, + } = self; + let _ = tempo_eyre::install(); + let setup = Setup::new() + .how_many_signers(how_many_signers) + .epoch_length(epoch_length); + + let executor = Runner::from(Config::default().with_seed(setup.seed)); + + executor.start(|mut context| async move { + // HACK: Sleep 1 second to ensure the deterministic runtime returns + // .current().epoch_millis() > 1000. + context.sleep(Duration::from_secs(1)).await; + + let (mut validators, execution_runtime) = setup_validators(&mut context, setup).await; + + join_all(validators.iter_mut().map(|v| v.start(&context))).await; + + let http_url = validators[0] + .execution() + .rpc_server_handle() + .http_url() + .unwrap() + .parse::() + .unwrap(); + + for i in 0..how_many_signers { + tracing::debug!( + block.number = execution_runtime + .migrate_validator(http_url.clone(), i as u64) + .await + .unwrap() + .block_number, + "migrateValidator returned receipt", + ); + } + let initialization_height = execution_runtime + .initialize_if_migrated(http_url.clone()) + .await + .unwrap() + .block_number + .unwrap(); + + let epoch_strat = FixedEpocher::new(NZU64!(epoch_length)); + let info = epoch_strat + .containing(Height::new(initialization_height)) + .unwrap(); + let initialization_epoch = info.epoch(); + tracing::debug!( + initialization_height, + %initialization_epoch, + "initializeIfMigrated completed", + ); + + // The epoch at which we start checking nodes for transitions. 
+ // + // If the migration completed in epoch 0, we need to wait for + // all nodes to enter epoch 1 before their metrics make sense. + let start_epoch = if info.last().get() == initialization_height { + initialization_epoch.next().next() + } else { + initialization_epoch.next() + } + .get(); + let mut epoch_count = 0; + while epoch_count < how_many_epochs { + tracing::error!("waiting for epoch {}", start_epoch + epoch_count); + wait_for_validators_to_reach_epoch( + &context, + start_epoch + epoch_count, + how_many_signers, + ) + .await; + + for line in context.encode().lines() { + if !line.starts_with(CONSENSUS_NODE_PREFIX) { + continue; + } + let mut parts = line.split_whitespace(); + let metric = parts.next().unwrap(); + let value = parts.next().unwrap(); + if metric.ends_with("_dkg_manager_read_players_from_v1_contract_total") { + assert_eq!( + initialization_epoch.get(), + value.parse::().unwrap(), + "v1 contract must only have been read for however \ + many epochs it took to initialize the v2 contract" + ); + } + if metric.ends_with("_dkg_manager_read_players_from_v2_contract_total") { + assert!(value.parse::().unwrap() > 0); + } + if metric.ends_with("_dkg_manager_read_re_dkg_epoch_from_v1_contract_total") { + assert_eq!( + initialization_epoch.get(), + value.parse::().unwrap(), + "v1 contract must only have been read for however \ + many epochs it took to initialize the v2 contract" + ); + } + if metric.ends_with("_dkg_manager_read_re_dkg_epoch_from_v2_contract_total") { + assert!(value.parse::().unwrap() > 0); + } + if metric.ends_with("_dkg_manager_syncing_players") { + assert_eq!( + 0, + value.parse::().unwrap(), + "once migrated, the node should no longer consider syncing players", + ); + } + } + epoch_count += 1; + } + }) + } +} diff --git a/crates/e2e/src/tests/migration_from_v1_to_v2/mod.rs b/crates/e2e/src/tests/migration_from_v1_to_v2/mod.rs new file mode 100644 index 0000000000..b91c7149ce --- /dev/null +++ 
b/crates/e2e/src/tests/migration_from_v1_to_v2/mod.rs @@ -0,0 +1 @@ +mod dkg; diff --git a/crates/e2e/src/tests/mod.rs b/crates/e2e/src/tests/mod.rs index b4e545e4e0..ebb77caac7 100644 --- a/crates/e2e/src/tests/mod.rs +++ b/crates/e2e/src/tests/mod.rs @@ -8,9 +8,11 @@ mod consensus_rpc; mod dkg; mod linkage; mod metrics; +mod migration_from_v1_to_v2; mod restart; mod subblocks; mod sync; +mod v2_at_genesis; #[test_traced] fn spawning_execution_node_works() { @@ -26,7 +28,7 @@ fn spawning_execution_node_works() { // #[test] // fn spawning_execution_node_works() { // let _telemetry = tracing_subscriber::fmt() - // .with_max_level(Level::DEBUG) + // .with_max_level(tracing::Level::DEBUG) // .with_test_writer() // .try_init(); // diff --git a/crates/e2e/src/tests/sync.rs b/crates/e2e/src/tests/sync.rs index e3aed54384..68a871e59a 100644 --- a/crates/e2e/src/tests/sync.rs +++ b/crates/e2e/src/tests/sync.rs @@ -112,7 +112,7 @@ fn joins_from_snapshot() { // Now turn the receiver into the donor - except for the database dir and // env. This simulates a start from a snapshot. receiver.uid = donor.uid; - receiver.public_key = donor.public_key; + receiver.private_key = donor.private_key; { let peer_manager = receiver.consensus_config.peer_manager.clone(); receiver.consensus_config = donor.consensus_config; @@ -273,7 +273,7 @@ fn can_restart_after_joining_from_snapshot() { // Now turn the receiver into the donor - except for the database dir and // env. This simulates a start from a snapshot. 
receiver.uid = donor.uid; - receiver.public_key = donor.public_key; + receiver.private_key = donor.private_key; { let peer_manager = receiver.consensus_config.peer_manager.clone(); receiver.consensus_config = donor.consensus_config; diff --git a/crates/e2e/src/tests/v2_at_genesis/backfill.rs b/crates/e2e/src/tests/v2_at_genesis/backfill.rs new file mode 100644 index 0000000000..4afbdfabc5 --- /dev/null +++ b/crates/e2e/src/tests/v2_at_genesis/backfill.rs @@ -0,0 +1,134 @@ +use std::time::Duration; + +use commonware_macros::test_traced; +use commonware_runtime::{ + Clock, Metrics, Runner as _, + deterministic::{Config, Runner}, +}; +use futures::future::join_all; +use reth_ethereum::storage::BlockNumReader; +use reth_node_metrics::recorder::install_prometheus_recorder; + +use crate::{ + CONSENSUS_NODE_PREFIX, Setup, get_pipeline_runs, setup_validators, + tests::v2_at_genesis::assert_no_v1, +}; + +#[test_traced] +fn validator_can_join_later_with_live_sync() { + AssertJoinsLate { + blocks_before_join: 5, + blocks_after_join: 10, + should_pipeline_sync: false, + } + .run(); +} + +#[test_traced] +fn validator_can_join_later_with_pipeline_sync() { + AssertJoinsLate { + blocks_before_join: 65, + blocks_after_join: 70, + should_pipeline_sync: false, + } + .run(); + let _ = tempo_eyre::install(); +} + +#[track_caller] +fn assert_no_new_epoch(context: &impl Metrics, max_epoch: u64) { + let metrics = context.encode(); + for line in metrics.lines() { + let mut parts = line.split_whitespace(); + let metric = parts.next().unwrap(); + let value = parts.next().unwrap(); + + if metrics.ends_with("_peers_blocked") { + let value = value.parse::().unwrap(); + assert_eq!(value, 0); + } + + if metric.ends_with("_epoch_manager_latest_epoch") { + let value = value.parse::().unwrap(); + assert!(value <= max_epoch, "epoch progressed; sync likely failed"); + } + } +} + +struct AssertJoinsLate { + blocks_before_join: u64, + blocks_after_join: u64, + should_pipeline_sync: bool, +} +impl 
AssertJoinsLate { + fn run(self) { + let Self { + blocks_before_join, + blocks_after_join, + should_pipeline_sync, + } = self; + + let _ = tempo_eyre::install(); + let metrics_recorder = install_prometheus_recorder(); + + let setup = Setup::new() + .epoch_length(100) + .t2_time(0) + .connect_execution_layer_nodes(should_pipeline_sync); + + Runner::from(Config::default().with_seed(setup.seed)).start(|mut context| async move { + let (mut nodes, _execution_runtime) = + setup_validators(&mut context, setup.clone()).await; + + // Start all nodes except the last one + let mut last = nodes.pop().unwrap(); + join_all(nodes.iter_mut().map(|node| node.start(&context))).await; + + // Wait for chain to advance before starting the last node + while nodes[0].execution_provider().last_block_number().unwrap() < blocks_before_join { + context.sleep(Duration::from_secs(1)).await; + } + + last.start(&context).await; + assert_eq!(last.execution_provider().last_block_number().unwrap(), 0); + + tracing::debug!("last node started"); + + // Assert that last node is able to catch up and progress + while last.execution_provider().last_block_number().unwrap() < blocks_after_join { + context.sleep(Duration::from_millis(100)).await; + assert_no_new_epoch(&context, 0); + } + for line in context.encode().lines() { + if line.starts_with(CONSENSUS_NODE_PREFIX) { + continue; + } + let mut parts = line.split_whitespace(); + let metric = parts.next().unwrap(); + let value = parts.next().unwrap(); + assert_no_v1(metric, value); + } + // Verify backfill behavior + let actual_runs = get_pipeline_runs(metrics_recorder); + if should_pipeline_sync { + assert!( + actual_runs > 0, + "at least one backfill must have been triggered" + ); + } else { + assert_eq!( + 0, actual_runs, + "expected no backfill, got {actual_runs} runs" + ); + } + + // Verify that the node is still progressing after sync + let last_block = last.execution_provider().last_block_number().unwrap(); + 
context.sleep(Duration::from_secs(10)).await; + assert!( + last.execution_provider().last_block_number().unwrap() > last_block, + "node should still be progressing after sync" + ); + }); + } +} diff --git a/crates/e2e/src/tests/v2_at_genesis/consensus_rpc.rs b/crates/e2e/src/tests/v2_at_genesis/consensus_rpc.rs new file mode 100644 index 0000000000..e26c8d711d --- /dev/null +++ b/crates/e2e/src/tests/v2_at_genesis/consensus_rpc.rs @@ -0,0 +1,299 @@ +//! Tests for the consensus RPC namespace. +//! +//! These tests verify that the consensus RPC endpoints work correctly, +//! including subscriptions and queries. + +use std::{net::SocketAddr, time::Duration}; + +use super::dkg::common::{ + assert_no_dkg_failures, wait_for_outcome, wait_for_validators_to_reach_epoch, +}; +use crate::{CONSENSUS_NODE_PREFIX, Setup, setup_validators}; +use alloy::transports::http::reqwest::Url; +use alloy_primitives::hex; +use commonware_codec::ReadExt as _; +use commonware_consensus::simplex::{scheme::bls12381_threshold::vrf::Scheme, types::Finalization}; +use commonware_cryptography::{ + bls12381::primitives::variant::{MinSig, Variant}, + ed25519::PublicKey, +}; +use commonware_macros::test_traced; +use commonware_runtime::{ + Clock, Metrics as _, Runner as _, + deterministic::{self, Context, Runner}, +}; +use futures::{channel::oneshot, future::join_all}; +use jsonrpsee::{http_client::HttpClientBuilder, ws_client::WsClientBuilder}; +use tempo_commonware_node::consensus::Digest; +use tempo_node::rpc::consensus::{Event, Query, TempoConsensusApiClient}; + +/// Test that subscribing to consensus events works and that finalization +/// can be queried via HTTP after receiving a finalization event. 
+#[tokio::test] +#[test_traced] +async fn consensus_subscribe_and_query_finalization() { + let _ = tempo_eyre::install(); + + let initial_height = 3; + let setup = Setup::new().how_many_signers(1).epoch_length(100); + let cfg = deterministic::Config::default().with_seed(setup.seed); + + let (addr_tx, addr_rx) = oneshot::channel::<(SocketAddr, SocketAddr)>(); + let (done_tx, done_rx) = oneshot::channel::<()>(); + + let executor_handle = std::thread::spawn(move || { + let executor = Runner::from(cfg); + executor.start(|mut context| async move { + let (mut validators, _execution_runtime) = setup_validators(&mut context, setup).await; + validators[0].start(&context).await; + wait_for_height(&context, initial_height).await; + + let execution = validators[0].execution(); + + addr_tx + .send(( + execution.rpc_server_handles.rpc.http_local_addr().unwrap(), + execution.rpc_server_handles.rpc.ws_local_addr().unwrap(), + )) + .unwrap(); + + let _ = done_rx.await; + }); + }); + + let (http_addr, ws_addr) = addr_rx.await.unwrap(); + let ws_url = format!("ws://{ws_addr}"); + let http_url = format!("http://{http_addr}"); + let ws_client = WsClientBuilder::default().build(&ws_url).await.unwrap(); + let mut subscription = ws_client.subscribe_events().await.unwrap(); + + let http_client = HttpClientBuilder::default().build(&http_url).unwrap(); + + let mut saw_notarized = false; + let mut saw_finalized = false; + let mut current_height = initial_height; + + while !saw_notarized || !saw_finalized { + let event = tokio::time::timeout(Duration::from_secs(10), subscription.next()) + .await + .unwrap() + .unwrap() + .unwrap(); + + match event { + Event::Notarized { .. } => { + saw_notarized = true; + } + Event::Finalized { block, .. 
} => { + let height = block.height.unwrap(); + assert!( + height > current_height, + "finalized height should be > {current_height}" + ); + + let queried_block = http_client + .get_finalization(Query::Height(height)) + .await + .unwrap() + .unwrap(); + + assert_eq!(queried_block, block); + + current_height = height; + saw_finalized = true; + } + Event::Nullified { .. } => {} + } + } + + let _ = http_client + .get_finalization(Query::Latest) + .await + .unwrap() + .unwrap(); + + let state = http_client.get_latest().await.unwrap(); + + assert!(state.finalized.is_some()); + + drop(done_tx); + executor_handle.join().unwrap(); +} + +/// Wait for a validator to reach a target height by checking metrics. +async fn wait_for_height(context: &Context, target_height: u64) { + loop { + let metrics = context.encode(); + for line in metrics.lines() { + if !line.starts_with(CONSENSUS_NODE_PREFIX) { + continue; + } + let mut parts = line.split_whitespace(); + let metric = parts.next().unwrap(); + let value = parts.next().unwrap(); + if metric.ends_with("_marshal_processed_height") { + let height = value.parse::().unwrap(); + if height >= target_height { + return; + } + } + } + context.sleep(Duration::from_millis(100)).await; + } +} + +/// Test that `get_identity_transition_proof` returns valid proofs after a full DKG ceremony. +/// +/// This verifies: +/// 1. After a full DKG, the RPC returns a transition with different old/new public keys +/// 2. The transition epoch matches where the full DKG occurred +/// 3. 
The proof contains a valid header and certificate +#[test_traced] +fn get_identity_transition_proof_after_full_dkg() { + let _ = tempo_eyre::install(); + + let how_many_signers = 1; + let epoch_length = 10; + let full_dkg_epoch: u64 = 1; + + let setup = Setup::new() + .how_many_signers(how_many_signers) + .epoch_length(epoch_length); + + let seed = setup.seed; + let cfg = deterministic::Config::default().with_seed(seed); + let executor = Runner::from(cfg); + + executor.start(|mut context| async move { + let (mut validators, execution_runtime) = setup_validators(&mut context, setup).await; + + join_all(validators.iter_mut().map(|v| v.start(&context))).await; + + // Get HTTP URL for RPC + let http_url: Url = validators[0] + .execution() + .rpc_server_handle() + .http_url() + .unwrap() + .parse() + .unwrap(); + + // Schedule full DKG for epoch 1 + execution_runtime + .set_next_full_dkg_ceremony(http_url.clone(), full_dkg_epoch) + .await + .unwrap(); + + // Wait for is_next_full_dkg flag + let outcome_before = + wait_for_outcome(&context, &validators, full_dkg_epoch - 1, epoch_length).await; + assert!( + outcome_before.is_next_full_dkg, + "Epoch {} outcome should have is_next_full_dkg=true", + full_dkg_epoch - 1 + ); + let pubkey_before = *outcome_before.sharing().public(); + + // Wait for full DKG to complete + wait_for_validators_to_reach_epoch(&context, full_dkg_epoch + 1, how_many_signers).await; + assert_no_dkg_failures(&context); + + // Verify the full DKG created a new public key + let outcome_after = + wait_for_outcome(&context, &validators, full_dkg_epoch, epoch_length).await; + let pubkey_after = *outcome_after.sharing().public(); + assert_ne!( + pubkey_before, pubkey_after, + "Full DKG must produce a DIFFERENT group public key" + ); + + // Test 1: Query from latest epoch (after full DKG) - should have transition + // Run on execution runtime's tokio runtime since jsonrpsee requires tokio + let http_url_str = http_url.to_string(); + let response = 
execution_runtime + .run_async(async move { + let http_client = HttpClientBuilder::default().build(&http_url_str).unwrap(); + http_client + .get_identity_transition_proof(None, Some(false)) + .await + .unwrap() + }) + .await + .unwrap(); + + assert!( + !response.identity.is_empty(), + "Identity should always be present" + ); + assert_eq!( + response.transitions.len(), + 1, + "Expected exactly one transition" + ); + + let transition = &response.transitions[0]; + assert_eq!( + transition.transition_epoch, full_dkg_epoch, + "Transition epoch should match full DKG epoch" + ); + assert_ne!( + transition.old_identity, transition.new_identity, + "Old and new public keys should be different" + ); + assert_eq!( + response.identity, transition.new_identity, + "Identity should match the new public key from the latest transition" + ); + + // Decode and verify the BLS signature + let old_pubkey_bytes = hex::decode(&transition.old_identity).unwrap(); + let old_pubkey = ::Public::read(&mut old_pubkey_bytes.as_slice()) + .expect("valid BLS public key"); + let proof = transition + .proof + .as_ref() + .expect("non-genesis transition should have proof"); + let finalization = Finalization::, Digest>::read( + &mut hex::decode(&proof.finalization_certificate) + .unwrap() + .as_slice(), + ) + .expect("valid finalization"); + + assert!( + finalization.verify( + &mut context, + &Scheme::certificate_verifier(tempo_commonware_node::NAMESPACE, old_pubkey), + &commonware_parallel::Sequential + ), + "BLS signature verification failed" + ); + + // Test 2: Query from epoch 0 (before full DKG) - should have identity but no transitions + let old_identity = transition.old_identity.clone(); + let http_url_str = http_url.to_string(); + let response_epoch0 = execution_runtime + .run_async(async move { + let http_client = HttpClientBuilder::default().build(&http_url_str).unwrap(); + http_client + .get_identity_transition_proof(Some(0), Some(false)) + .await + .unwrap() + }) + .await + .unwrap(); + + 
assert!( + !response_epoch0.identity.is_empty(), + "Identity should be present even at epoch 0" + ); + assert!( + response_epoch0.transitions.is_empty(), + "Should have no transitions when querying from epoch 0" + ); + assert_eq!( + response_epoch0.identity, old_identity, + "Identity at epoch 0 should be the old public key (before full DKG)" + ); + }); +} diff --git a/crates/e2e/src/tests/v2_at_genesis/dkg/common.rs b/crates/e2e/src/tests/v2_at_genesis/dkg/common.rs new file mode 100644 index 0000000000..62edc35bcd --- /dev/null +++ b/crates/e2e/src/tests/v2_at_genesis/dkg/common.rs @@ -0,0 +1,140 @@ +//! Common helpers for DKG tests. + +use std::time::Duration; + +use commonware_codec::ReadExt as _; +use commonware_consensus::types::{Epoch, Epocher as _, FixedEpocher, Height}; +use commonware_runtime::{Clock as _, Metrics as _, deterministic::Context}; +use commonware_utils::NZU64; +use reth_ethereum::provider::BlockReader as _; +use tempo_dkg_onchain_artifacts::OnchainDkgOutcome; + +use crate::{CONSENSUS_NODE_PREFIX, TestingNode}; + +/// Reads the DKG outcome from a block, returns None if block doesn't exist or has no outcome. +pub(crate) fn read_outcome_from_validator( + validator: &TestingNode, + block_num: Height, +) -> Option { + let provider = validator.execution_provider(); + let block = provider.block_by_number(block_num.get()).ok()??; + let extra_data = &block.header.inner.extra_data; + + if extra_data.is_empty() { + return None; + } + + Some(OnchainDkgOutcome::read(&mut extra_data.as_ref()).expect("valid DKG outcome")) +} + +/// Parses a metric line, returning (metric_name, value) if valid. +pub(crate) fn parse_metric_line(line: &str) -> Option<(&str, u64)> { + if !line.starts_with(CONSENSUS_NODE_PREFIX) { + return None; + } + + let mut parts = line.split_whitespace(); + let metric = parts.next()?; + let value = parts.next()?.parse().ok()?; + + Some((metric, value)) +} + +/// Waits for and reads the DKG outcome from the last block of the given epoch. 
+pub(crate) async fn wait_for_outcome( + context: &Context, + validators: &[TestingNode], + epoch: u64, + epoch_length: u64, +) -> OnchainDkgOutcome { + let height = FixedEpocher::new(NZU64!(epoch_length)) + .last(Epoch::new(epoch)) + .expect("valid epoch"); + + tracing::info!(epoch, %height, "Waiting for DKG outcome"); + + loop { + context.sleep(Duration::from_secs(1)).await; + + if let Some(outcome) = read_outcome_from_validator(&validators[0], height) { + tracing::info!( + epoch, + %height, + outcome_epoch = %outcome.epoch, + is_next_full_dkg = outcome.is_next_full_dkg, + "Read DKG outcome" + ); + return outcome; + } + } +} + +/// Counts how many validators have reached the target epoch. +pub(crate) fn count_validators_at_epoch(context: &Context, target_epoch: u64) -> u32 { + let metrics = context.encode(); + let mut at_epoch = 0; + + for line in metrics.lines() { + let Some((metric, value)) = parse_metric_line(line) else { + continue; + }; + + if metric.ends_with("_epoch_manager_latest_epoch") && value >= target_epoch { + at_epoch += 1; + } + } + + at_epoch +} + +/// Waits until at least `min_validators` have reached the target epoch. +pub(crate) async fn wait_for_validators_to_reach_epoch( + context: &Context, + target_epoch: u64, + min_validators: u32, +) { + tracing::info!(target_epoch, min_validators, "Waiting for epoch"); + + loop { + context.sleep(Duration::from_secs(1)).await; + + if count_validators_at_epoch(context, target_epoch) >= min_validators { + tracing::info!(target_epoch, "Validators reached epoch"); + return; + } + } +} + +/// Asserts that no DKG ceremony failures have occurred. 
+#[track_caller] +pub(crate) fn assert_no_dkg_failures(context: &Context) { + let metrics = context.encode(); + + for line in metrics.lines() { + let Some((metric, value)) = parse_metric_line(line) else { + continue; + }; + + if metric.ends_with("_dkg_manager_ceremony_failures_total") { + assert_eq!(0, value, "DKG ceremony failed: {metric}"); + } + } +} + +/// Asserts that at least one validator has skipped rounds (indicating sync occurred). +#[track_caller] +pub(crate) fn assert_skipped_rounds(context: &Context) { + let metrics = context.encode(); + + for line in metrics.lines() { + let Some((metric, value)) = parse_metric_line(line) else { + continue; + }; + + if metric.ends_with("_rounds_skipped_total") && value > 0 { + return; + } + } + + panic!("Expected at least one validator to have skipped rounds during sync"); +} diff --git a/crates/e2e/src/tests/v2_at_genesis/dkg/dynamic.rs b/crates/e2e/src/tests/v2_at_genesis/dkg/dynamic.rs new file mode 100644 index 0000000000..e7c007beea --- /dev/null +++ b/crates/e2e/src/tests/v2_at_genesis/dkg/dynamic.rs @@ -0,0 +1,302 @@ +use std::time::Duration; + +use alloy::transports::http::reqwest::Url; +use commonware_macros::test_traced; +use commonware_runtime::{ + Clock as _, Metrics as _, Runner as _, + deterministic::{Config, Runner}, +}; +use futures::future::join_all; + +use crate::{ + CONSENSUS_NODE_PREFIX, Setup, setup_validators, + tests::{ + dkg::common::target_epoch, + v2_at_genesis::{assert_no_dkg_failure, assert_no_v1}, + }, +}; + +#[test_traced] +fn validator_is_added_to_a_set_of_one() { + AssertValidatorIsAdded { + how_many_initial: 1, + epoch_length: 10, + } + .run(); +} + +#[test_traced] +fn validator_is_added_to_a_set_of_three() { + AssertValidatorIsAdded { + how_many_initial: 3, + epoch_length: 30, + } + .run(); +} + +#[test_traced] +fn validator_is_removed_from_set_of_two() { + AssertValidatorIsRemoved { + how_many_initial: 2, + epoch_length: 20, + } + .run(); +} + +#[test_traced] +fn 
validator_is_removed_from_set_of_four() { + AssertValidatorIsRemoved { + how_many_initial: 4, + epoch_length: 40, + } + .run(); +} + +struct AssertValidatorIsAdded { + how_many_initial: u32, + epoch_length: u64, +} + +impl AssertValidatorIsAdded { + fn run(self) { + let Self { + how_many_initial, + epoch_length, + } = self; + let _ = tempo_eyre::install(); + let setup = Setup::new() + .how_many_signers(how_many_initial) + .how_many_verifiers(1) + .t2_time(0) + .epoch_length(epoch_length); + + let cfg = Config::default().with_seed(setup.seed); + let executor = Runner::from(cfg); + + executor.start(|mut context| async move { + let (mut validators, execution_runtime) = setup_validators(&mut context, setup).await; + + let added_uid = validators + .iter() + .find(|v| v.is_verifier()) + .unwrap() + .uid + .clone(); + join_all(validators.iter_mut().map(|v| v.start(&context))).await; + + // We will send an arbitrary node of the initial validator set the smart + // contract call. + let http_url = validators + .iter() + .find(|v| v.is_signer()) + .unwrap() + .execution() + .rpc_server_handle() + .http_url() + .unwrap() + .parse::() + .unwrap(); + + let receipt = execution_runtime + .add_validator_v2( + http_url.clone(), + validators.iter().find(|v| v.is_verifier()).unwrap(), + ) + .await + .unwrap(); + + tracing::debug!( + block.number = receipt.block_number, + "addValidator call returned receipt" + ); + + let player_epoch = target_epoch(epoch_length, receipt.block_number.unwrap()); + let dealer_epoch = player_epoch.next(); + + 'becomes_signer: loop { + context.sleep(Duration::from_secs(1)).await; + + let mut entered_player_epoch = false; + let mut entered_dealer_epoch = false; + for line in context.encode().lines() { + if !line.starts_with(CONSENSUS_NODE_PREFIX) { + continue; + } + + let mut parts = line.split_whitespace(); + let key = parts.next().unwrap(); + let value = parts.next().unwrap(); + + assert_no_v1(key, value); + assert_no_dkg_failure(key, value); + + if 
key.ends_with("peer_manager_peers") { + assert_eq!( + how_many_initial + 1, + value.parse::().unwrap(), + "peers are registered on the next finalized block; this should have happened almost immediately", + ); + } + + if key.ends_with("_epoch_manager_latest_epoch") { + let epoch = value.parse::().unwrap(); + + if key.contains(&added_uid) { + entered_player_epoch |= epoch >= player_epoch.get(); + entered_dealer_epoch |= epoch >= dealer_epoch.get(); + } + + assert!( + epoch < dealer_epoch.next().get(), + "network reached epoch `{}` without added validator getting a share", + dealer_epoch.next(), + ); + } + + if entered_player_epoch && !entered_dealer_epoch { + if key.ends_with("_dkg_manager_ceremony_players") { + assert_eq!(how_many_initial + 1, value.parse::().unwrap(),) + } + if key.ends_with("_dkg_manager_ceremony_dealers") { + assert_eq!(how_many_initial, value.parse::().unwrap(),) + } + } + + if entered_dealer_epoch { + if key.ends_with("_dkg_manager_ceremony_dealers") { + assert_eq!(how_many_initial + 1, value.parse::().unwrap(),) + } + + if key.ends_with("_epoch_manager_how_often_signer_total") { + assert!(value.parse::().unwrap() > 0,); + break 'becomes_signer; + } + } + } + } + }) + } +} + +struct AssertValidatorIsRemoved { + how_many_initial: u32, + epoch_length: u64, +} + +impl AssertValidatorIsRemoved { + fn run(self) { + let Self { + how_many_initial, + epoch_length, + } = self; + let _ = tempo_eyre::install(); + let setup = Setup::new() + .how_many_signers(how_many_initial) + .t2_time(0) + .epoch_length(epoch_length); + + let cfg = Config::default().with_seed(setup.seed); + let executor = Runner::from(cfg); + + executor.start(|mut context| async move { + let (mut validators, execution_runtime) = setup_validators(&mut context, setup).await; + + join_all(validators.iter_mut().map(|v| v.start(&context))).await; + + // We will send an arbitrary node of the initial validator set the smart + // contract call. 
+ let http_url = validators + .iter() + .find(|v| v.is_signer()) + .unwrap() + .execution() + .rpc_server_handle() + .http_url() + .unwrap() + .parse::() + .unwrap(); + + let removed_validator = validators.pop().unwrap(); + + let receipt = execution_runtime + .deactivate_validator_v2(http_url, &removed_validator) + .await + .unwrap(); + + tracing::debug!( + block.number = receipt.block_number, + "deactivateValidator call returned receipt" + ); + + let removal_epoch = target_epoch(epoch_length, receipt.block_number.unwrap()); + let removed_epoch = removal_epoch.next(); + + 'is_removed: loop { + context.sleep(Duration::from_secs(1)).await; + + let mut entered_removal_epoch = false; + let mut entered_removed_epoch = false; + for line in context.encode().lines() { + if !line.starts_with(CONSENSUS_NODE_PREFIX) { + continue; + } + + let mut parts = line.split_whitespace(); + let key = parts.next().unwrap(); + let value = parts.next().unwrap(); + + assert_no_v1(key, value); + assert_no_dkg_failure(key, value); + + if key.ends_with("ceremony_failures_total") { + assert_eq!(0, value.parse::().unwrap(),); + } + + if key.ends_with("_epoch_manager_latest_epoch") { + let epoch = value.parse::().unwrap(); + + assert!( + epoch < removed_epoch.next().get(), + "validator removal should have happened by epoch \ + `{removed_epoch}`, but network is already in epoch \ + {}", + removed_epoch.next(), + ); + + if key.contains(&removed_validator.uid) { + entered_removal_epoch |= epoch >= removal_epoch.get(); + } + + entered_removed_epoch |= epoch >= removed_epoch.get(); + } + + if entered_removal_epoch && !entered_removed_epoch { + if key.ends_with("_dkg_manager_ceremony_players") { + assert_eq!(how_many_initial - 1, value.parse::().unwrap(),) + } + if key.ends_with("_dkg_manager_ceremony_dealers") { + assert_eq!(how_many_initial, value.parse::().unwrap(),) + } + } + + if entered_removed_epoch && !key.contains(&removed_validator.uid) { + if key.ends_with("peer_manager_peers") { + 
assert_eq!( + how_many_initial - 1, + value.parse::().unwrap(), + "once the peer is deactivated and no longer a \ + dealer, it should be removed from the list of \ + peers immediately" + ); + } + + if key.ends_with("_dkg_manager_ceremony_dealers") { + assert_eq!(how_many_initial - 1, value.parse::().unwrap(),); + break 'is_removed; + } + } + } + } + }) + } +} diff --git a/crates/e2e/src/tests/v2_at_genesis/dkg/fast_sync_after_full_dkg.rs b/crates/e2e/src/tests/v2_at_genesis/dkg/fast_sync_after_full_dkg.rs new file mode 100644 index 0000000000..134ceb19ee --- /dev/null +++ b/crates/e2e/src/tests/v2_at_genesis/dkg/fast_sync_after_full_dkg.rs @@ -0,0 +1,133 @@ +//! Tests for fast sync after a full DKG ceremony. + +use alloy::transports::http::reqwest::Url; +use commonware_macros::test_traced; +use commonware_runtime::{ + Clock as _, Runner as _, + deterministic::{Config, Runner}, +}; +use futures::future::join_all; +use reth_ethereum::storage::BlockNumReader as _; +use std::time::Duration; +use tracing::info; + +use super::common::{ + assert_no_dkg_failures, assert_skipped_rounds, wait_for_outcome, + wait_for_validators_to_reach_epoch, +}; +use crate::{Setup, setup_validators}; + +/// Tests that a late-joining validator can sync and participate after a full DKG ceremony. +/// +/// This verifies: +/// 1. A full DKG ceremony completes successfully (new polynomial, different public key) +/// 2. A validator that joins late (after full DKG) can sync the chain +/// 3. The late validator uses fast-sync to jump epoch boundaries (including the full DKG epoch) +/// 4. 
The late validator continues progressing after sync +#[test_traced] +fn validator_can_fast_sync_after_full_dkg() { + let _ = tempo_eyre::install(); + + let how_many_signers = 4; + let epoch_length = 20; + let full_dkg_epoch = 1; + let blocks_before_late_join = 3 * epoch_length + 1; + + let setup = Setup::new() + .how_many_signers(how_many_signers) + .epoch_length(epoch_length) + .t2_time(0) + .connect_execution_layer_nodes(true); + + let cfg = Config::default().with_seed(setup.seed); + let executor = Runner::from(cfg); + + executor.start(|mut context| async move { + let (mut validators, execution_runtime) = setup_validators(&mut context, setup).await; + + let mut late_validator = validators.pop().unwrap(); + join_all(validators.iter_mut().map(|v| v.start(&context))).await; + + let http_url: Url = validators[0] + .execution() + .rpc_server_handle() + .http_url() + .unwrap() + .parse() + .unwrap(); + + execution_runtime + .set_next_full_dkg_ceremony_v2(http_url, full_dkg_epoch) + .await + .unwrap(); + + let outcome_before = + wait_for_outcome(&context, &validators, full_dkg_epoch - 1, epoch_length).await; + assert!( + outcome_before.is_next_full_dkg, + "outcome.is_next_full_dkg should be `true`" + ); + + // wait for full DKG completion (-1 because late validator not started yet) + wait_for_validators_to_reach_epoch(&context, full_dkg_epoch + 1, how_many_signers - 1) + .await; + + let outcome_after = + wait_for_outcome(&context, &validators, full_dkg_epoch, epoch_length).await; + assert_ne!( + outcome_before.sharing().public(), + outcome_after.sharing().public(), + "full DKG must create different public key" + ); + + // wait for chain to advance + while validators[0] + .execution_provider() + .last_block_number() + .unwrap() + < blocks_before_late_join + { + context.sleep(Duration::from_secs(1)).await; + } + + // start late validator + late_validator.start(&context).await; + info!(id = late_validator.uid, "started late validator",); + assert_eq!( + late_validator + 
.execution_provider() + .last_block_number() + .unwrap(), + 0, + "Late validator should start at block 0" + ); + + // wait for late validator to catch up + while late_validator + .execution_provider() + .last_block_number() + .unwrap() + < blocks_before_late_join + { + context.sleep(Duration::from_millis(100)).await; + } + // ensure fast-sync was used to jump epoch boundaries (including from old to new sharing) + assert_skipped_rounds(&context); + + // verify continued progress + let block_after_sync = late_validator + .execution_provider() + .last_block_number() + .unwrap(); + context.sleep(Duration::from_secs(2)).await; + let block_later = late_validator + .execution_provider() + .last_block_number() + .unwrap(); + assert!( + block_later > block_after_sync, + "Late validator should keep progressing after sync" + ); + assert_no_dkg_failures(&context); + }) +} diff --git a/crates/e2e/src/tests/v2_at_genesis/dkg/full_ceremony.rs b/crates/e2e/src/tests/v2_at_genesis/dkg/full_ceremony.rs new file mode 100644 index 0000000000..cf67c40f64 --- /dev/null +++ b/crates/e2e/src/tests/v2_at_genesis/dkg/full_ceremony.rs @@ -0,0 +1,140 @@ +//! Tests for full DKG ceremonies triggered by `setNextFullDkgCeremony`. 
+ +use alloy::transports::http::reqwest::Url; +use commonware_macros::test_traced; +use commonware_runtime::{ + Runner as _, + deterministic::{Config, Runner}, +}; +use futures::future::join_all; + +use super::common::{assert_no_dkg_failures, wait_for_outcome, wait_for_validators_to_reach_epoch}; +use crate::{Setup, setup_validators}; + +#[test_traced] +fn full_dkg_ceremony() { + FullDkgTest { + how_many_signers: 1, + epoch_length: 10, + full_dkg_epoch: 1, + } + .run(); +} + +struct FullDkgTest { + how_many_signers: u32, + epoch_length: u64, + full_dkg_epoch: u64, +} + +impl FullDkgTest { + fn run(self) { + let _ = tempo_eyre::install(); + + let setup = Setup::new() + .how_many_signers(self.how_many_signers) + .t2_time(0) + .epoch_length(self.epoch_length); + + let cfg = Config::default().with_seed(setup.seed); + let executor = Runner::from(cfg); + + executor.start(|mut context| async move { + let (mut validators, execution_runtime) = setup_validators(&mut context, setup).await; + + join_all(validators.iter_mut().map(|v| v.start(&context))).await; + + // Schedule full DKG for the specified epoch + let http_url: Url = validators[0] + .execution() + .rpc_server_handle() + .http_url() + .unwrap() + .parse() + .unwrap(); + + execution_runtime + .set_next_full_dkg_ceremony_v2(http_url, self.full_dkg_epoch) + .await + .unwrap(); + + tracing::info!(full_dkg_epoch = self.full_dkg_epoch, "Scheduled full DKG"); + + // Step 1: Wait for and verify the is_next_full_dkg flag in epoch N-1 + let outcome_before = wait_for_outcome( + &context, + &validators, + self.full_dkg_epoch - 1, + self.epoch_length, + ) + .await; + + assert!( + outcome_before.is_next_full_dkg, + "Epoch {} outcome should have is_next_full_dkg=true", + self.full_dkg_epoch - 1 + ); + let pubkey_before = *outcome_before.sharing().public(); + tracing::info!(?pubkey_before, "Group public key BEFORE full DKG"); + + // Step 2: Wait for full DKG to complete (epoch N+1) + wait_for_validators_to_reach_epoch( + &context, 
+ self.full_dkg_epoch + 1, + self.how_many_signers, + ) + .await; + assert_no_dkg_failures(&context); + + // Step 3: Verify full DKG created a NEW polynomial (different public key) + let outcome_after_full = wait_for_outcome( + &context, + &validators, + self.full_dkg_epoch, + self.epoch_length, + ) + .await; + + let pubkey_after_full = *outcome_after_full.sharing().public(); + tracing::info!(?pubkey_after_full, "Group public key AFTER full DKG"); + + assert_ne!( + pubkey_before, pubkey_after_full, + "Full DKG must produce a DIFFERENT group public key" + ); + tracing::info!("Verified: full DKG created independent polynomial"); + + // Step 4: Wait for reshare (epoch N+2) and verify it PRESERVES the public key + wait_for_validators_to_reach_epoch( + &context, + self.full_dkg_epoch + 2, + self.how_many_signers, + ) + .await; + assert_no_dkg_failures(&context); + + let outcome_after_reshare = wait_for_outcome( + &context, + &validators, + self.full_dkg_epoch + 1, + self.epoch_length, + ) + .await; + + assert!( + !outcome_after_reshare.is_next_full_dkg, + "Epoch {} should NOT have is_next_full_dkg flag", + self.full_dkg_epoch + 1 + ); + + let pubkey_after_reshare = *outcome_after_reshare.sharing().public(); + tracing::info!(?pubkey_after_reshare, "Group public key AFTER reshare"); + + assert_eq!( + pubkey_after_full, pubkey_after_reshare, + "Reshare must PRESERVE the group public key" + ); + tracing::info!("Verified: reshare preserved polynomial (full DKG only ran once)"); + }) + } +} diff --git a/crates/e2e/src/tests/v2_at_genesis/dkg/mod.rs b/crates/e2e/src/tests/v2_at_genesis/dkg/mod.rs new file mode 100644 index 0000000000..b0bfed2b84 --- /dev/null +++ b/crates/e2e/src/tests/v2_at_genesis/dkg/mod.rs @@ -0,0 +1,8 @@ +//! 
Tests on chain DKG and epoch transition
+
+pub(crate) mod common;
+mod dynamic;
+mod fast_sync_after_full_dkg;
+mod full_ceremony;
+mod share_loss;
+mod static_transitions;
diff --git a/crates/e2e/src/tests/v2_at_genesis/dkg/share_loss.rs b/crates/e2e/src/tests/v2_at_genesis/dkg/share_loss.rs
new file mode 100644
index 0000000000..bb5e651683
--- /dev/null
+++ b/crates/e2e/src/tests/v2_at_genesis/dkg/share_loss.rs
@@ -0,0 +1,93 @@
+use std::time::Duration;
+
+use commonware_macros::test_traced;
+use commonware_runtime::{
+    Clock as _, Metrics as _, Runner as _,
+    deterministic::{Config, Runner},
+};
+use futures::future::join_all;
+
+use crate::{CONSENSUS_NODE_PREFIX, Setup, setup_validators};
+
+#[test_traced]
+fn validator_lost_share_but_gets_share_in_next_epoch() {
+    let _ = tempo_eyre::install();
+
+    let seed = 0;
+
+    let cfg = Config::default().with_seed(seed);
+    let executor = Runner::from(cfg);
+
+    executor.start(|mut context| async move {
+        let epoch_length = 20;
+        let setup = Setup::new()
+            .seed(seed)
+            .epoch_length(epoch_length)
+            .t2_time(0)
+            .connect_execution_layer_nodes(true);
+
+        let (mut validators, _execution_runtime) =
+            setup_validators(&mut context, setup.clone()).await;
+        let uid = {
+            let last_node = validators
+                .last_mut()
+                .expect("we just asked for a couple of validators");
+            last_node
+                .consensus_config_mut()
+                .share
+                .take()
+                .expect("the node must have had a share");
+            last_node.uid().to_string()
+        };
+
+        join_all(validators.iter_mut().map(|v| v.start(&context))).await;
+
+        let mut node_forgot_share = false;
+
+        'acquire_share: loop {
+            context.sleep(Duration::from_secs(1)).await;
+
+            let metrics = context.encode();
+
+            'metrics: for line in metrics.lines() {
+                if !line.starts_with(CONSENSUS_NODE_PREFIX) {
+                    continue 'metrics;
+                }
+
+                let mut parts = line.split_whitespace();
+                let metric = parts.next().unwrap();
+                let value = parts.next().unwrap();
+
+                if metric.ends_with("_peers_blocked") { // was `metrics.ends_with`: tested the whole dump, never matched
+                    let value = value.parse::<u64>().unwrap();
+                    assert_eq!(value, 0);
+                }
+
+                if metric.ends_with("_epoch_manager_latest_epoch") {
+                    let value = value.parse::<u64>().unwrap();
+                    assert!(value < 2, "reached 2nd epoch without recovering new share");
+                }
+
+                // Ensures that node has no share.
+                if !node_forgot_share
+                    && metric.contains(&uid)
+                    && metric.ends_with("_epoch_manager_how_often_verifier_total")
+                {
+                    let value = value.parse::<u64>().unwrap();
+                    node_forgot_share = value > 0;
+                }
+
+                // Ensure that the node gets a share by becoming a signer.
+                if node_forgot_share
+                    && metric.contains(&uid)
+                    && metric.ends_with("_epoch_manager_how_often_signer_total")
+                {
+                    let value = value.parse::<u64>().unwrap();
+                    if value > 0 {
+                        break 'acquire_share;
+                    }
+                }
+            }
+        }
+    });
+}
diff --git a/crates/e2e/src/tests/v2_at_genesis/dkg/static_transitions.rs b/crates/e2e/src/tests/v2_at_genesis/dkg/static_transitions.rs
new file mode 100644
index 0000000000..9d142b2695
--- /dev/null
+++ b/crates/e2e/src/tests/v2_at_genesis/dkg/static_transitions.rs
@@ -0,0 +1,121 @@
+//! Tests for successful DKG ceremonies with static sets of validators.
+//!
+//! Contains test for DKG transition logic
+//! at genesis.
+use commonware_macros::test_traced; + +use crate::{Setup, run, tests::v2_at_genesis::assert_no_v1}; + +#[test_traced] +fn single_validator_can_transition_once() { + AssertStaticTransitions { + how_many: 1, + epoch_length: 5, + transitions: 1, + } + .run(); +} + +#[test_traced] +fn single_validator_can_transition_twice() { + AssertStaticTransitions { + how_many: 1, + epoch_length: 5, + transitions: 2, + } + .run(); +} + +#[test_traced] +fn single_validator_can_transition_four_times() { + AssertStaticTransitions { + how_many: 1, + epoch_length: 5, + transitions: 4, + } + .run(); +} + +#[test_traced] +fn two_validators_can_transition_once() { + AssertStaticTransitions { + how_many: 2, + epoch_length: 20, + transitions: 1, + } + .run(); +} + +#[test_traced] +fn two_validators_can_transition_twice() { + AssertStaticTransitions { + how_many: 2, + epoch_length: 20, + transitions: 2, + } + .run(); +} + +#[test_traced] +fn four_validators_can_transition_once() { + AssertStaticTransitions { + how_many: 4, + epoch_length: 20, + transitions: 1, + } + .run(); +} + +#[test_traced] +fn four_validators_can_transition_twice() { + AssertStaticTransitions { + how_many: 4, + epoch_length: 20, + transitions: 2, + } + .run(); +} + +struct AssertStaticTransitions { + how_many: u32, + epoch_length: u64, + transitions: u64, +} + +impl AssertStaticTransitions { + fn run(self) { + let Self { + how_many, + epoch_length, + transitions, + } = self; + let _ = tempo_eyre::install(); + + let setup = Setup::new() + .how_many_signers(how_many) + .epoch_length(epoch_length) + .t2_time(0); + + let mut epoch_reached = false; + let mut dkg_successful = false; + let _first = run(setup, move |metric, value| { + assert_no_v1(metric, value); + + if metric.ends_with("_dkg_manager_ceremony_failures_total") { + let value = value.parse::().unwrap(); + assert_eq!(0, value); + } + + if metric.ends_with("_epoch_manager_latest_epoch") { + let value = value.parse::().unwrap(); + epoch_reached |= value >= 
transitions; + } + if metric.ends_with("_dkg_manager_ceremony_successes_total") { + let value = value.parse::().unwrap(); + dkg_successful |= value >= transitions; + } + + epoch_reached && dkg_successful + }); + } +} diff --git a/crates/e2e/src/tests/v2_at_genesis/mod.rs b/crates/e2e/src/tests/v2_at_genesis/mod.rs new file mode 100644 index 0000000000..a603decfc0 --- /dev/null +++ b/crates/e2e/src/tests/v2_at_genesis/mod.rs @@ -0,0 +1,30 @@ +//! Tests on chain DKG and epoch transition + +mod backfill; +mod consensus_rpc; +mod dkg; +mod restart; +mod simple; +mod snapshot; + +// FIXME: subblocks are currently flaky. Don't want to add extra flaky tests +// right now. +// mod subblocks; + +fn assert_no_v1(metric: &str, value: &str) { + if metric.ends_with("_dkg_manager_read_players_from_v1_contract_total") { + assert_eq!(0, value.parse::().unwrap()); + } + if metric.ends_with("_dkg_manager_syncing_players") { + assert_eq!(0, value.parse::().unwrap()); + } + if metric.ends_with("_dkg_manager_read_re_dkg_epoch_from_v1_contract_total") { + assert_eq!(0, value.parse::().unwrap()); + } +} + +fn assert_no_dkg_failure(metric: &str, value: &str) { + if metric.ends_with("_dkg_manager_ceremony_failures_total") { + assert_eq!(0, value.parse::().unwrap(),); + } +} diff --git a/crates/e2e/src/tests/v2_at_genesis/restart.rs b/crates/e2e/src/tests/v2_at_genesis/restart.rs new file mode 100644 index 0000000000..3fe221f8af --- /dev/null +++ b/crates/e2e/src/tests/v2_at_genesis/restart.rs @@ -0,0 +1,544 @@ +//! Tests for validator restart/kill scenarios +//! +//! These tests verify that validators can be killed and restarted, and that they +//! properly catch up to the rest of the network after restart. 
+ +use std::time::Duration; + +use commonware_consensus::types::{Epocher, FixedEpocher, Height}; +use commonware_macros::test_traced; +use commonware_runtime::{ + Clock, Metrics as _, Runner as _, + deterministic::{self, Context, Runner}, +}; +use commonware_utils::NZU64; +use futures::future::join_all; +use rand_08::Rng; +use tracing::debug; + +use crate::{CONSENSUS_NODE_PREFIX, Setup, setup_validators}; + +#[test_traced("WARN")] +fn committee_of_one() { + SimpleRestart { + committee_size: 1, + epoch_length: 10, + restart_after: 5, + stop_at: 10, + connect_execution_layer: false, + } + .run() +} + +#[test_traced("WARN")] +fn committee_of_three() { + SimpleRestart { + committee_size: 3, + epoch_length: 10, + restart_after: 5, + stop_at: 10, + connect_execution_layer: false, + } + .run() +} + +struct SimpleRestart { + committee_size: u32, + epoch_length: u64, + restart_after: u64, + stop_at: u64, + connect_execution_layer: bool, +} + +impl SimpleRestart { + #[track_caller] + fn run(self) { + let Self { + committee_size, + epoch_length, + restart_after, + stop_at, + connect_execution_layer, + } = self; + let _ = tempo_eyre::install(); + + let setup = Setup::new() + .how_many_signers(committee_size) + .seed(0) + .epoch_length(epoch_length) + .t2_time(0) + .connect_execution_layer_nodes(connect_execution_layer); + + let cfg = deterministic::Config::default().with_seed(setup.seed); + let executor = Runner::from(cfg); + + executor.start(|mut context| async move { + let (mut validators, _execution_runtime) = + setup_validators(&mut context, setup.clone()).await; + + join_all(validators.iter_mut().map(|v| v.start(&context))).await; + + debug!( + height = restart_after, + "waiting for network to reach target height before stopping a validator", + ); + wait_for_height(&context, setup.how_many_signers, restart_after, false).await; + + validators[0].stop().await; + debug!(public_key = %validators[0].public_key(), "stopped validator"); + + // wait a bit to let the network 
settle; some finalizations come in later + context.sleep(Duration::from_secs(5)).await; + ensure_no_progress(&context, 5).await; + + validators[0].start(&context).await; + debug!( + public_key = %validators[0].public_key(), + "restarted validator", + ); + + debug!( + height = stop_at, + "waiting for reconstituted validators to reach target height to reach test success", + ); + wait_for_height(&context, validators.len() as u32, stop_at, false).await; + }) + } +} + +#[test_traced] +fn validator_catches_up_to_network_during_epoch() { + let _ = tempo_eyre::install(); + + RestartSetup { + epoch_length: 100, + shutdown_height: 5, + restart_height: 10, + final_height: 15, + assert_skips: false, + connect_execution_layer: false, + } + .run(); +} + +#[test_traced] +fn validator_catches_up_with_gap_of_one_epoch() { + let _ = tempo_eyre::install(); + + let epoch_length = 30; + RestartSetup { + epoch_length, + shutdown_height: epoch_length + 1, + restart_height: 2 * epoch_length + 1, + final_height: 3 * epoch_length + 1, + assert_skips: false, + connect_execution_layer: false, + } + .run(); +} + +#[test_traced] +fn validator_catches_up_with_gap_of_three_epochs() { + let _ = tempo_eyre::install(); + + let epoch_length = 30; + RestartSetup { + epoch_length, + shutdown_height: epoch_length + 1, + restart_height: 4 * epoch_length + 1, + final_height: 5 * epoch_length + 1, + assert_skips: true, + connect_execution_layer: true, + } + .run(); +} + +#[test_traced] +fn single_node_recovers_after_finalizing_ceremony() { + AssertNodeRecoversAfterFinalizingBlock { + n_validators: 1, + epoch_length: 6, + shutdown_after_finalizing: ShutdownAfterFinalizing::Ceremony, + } + .run() +} + +#[test_traced] +fn node_recovers_after_finalizing_ceremony_four_validators() { + AssertNodeRecoversAfterFinalizingBlock { + n_validators: 4, + epoch_length: 30, + shutdown_after_finalizing: ShutdownAfterFinalizing::Ceremony, + } + .run() +} + +#[test_traced] +fn 
node_recovers_after_finalizing_middle_of_epoch_four_validators() { + AssertNodeRecoversAfterFinalizingBlock { + n_validators: 4, + epoch_length: 30, + shutdown_after_finalizing: ShutdownAfterFinalizing::MiddleOfEpoch, + } + .run() +} + +#[test_traced] +fn node_recovers_before_finalizing_middle_of_epoch_four_validators() { + AssertNodeRecoversAfterFinalizingBlock { + n_validators: 4, + epoch_length: 30, + shutdown_after_finalizing: ShutdownAfterFinalizing::BeforeMiddleOfEpoch, + } + .run() +} + +#[test_traced] +fn single_node_recovers_after_finalizing_boundary() { + AssertNodeRecoversAfterFinalizingBlock { + n_validators: 1, + epoch_length: 10, + shutdown_after_finalizing: ShutdownAfterFinalizing::Boundary, + } + .run() +} + +#[test_traced] +fn node_recovers_after_finalizing_boundary_four_validators() { + AssertNodeRecoversAfterFinalizingBlock { + n_validators: 4, + epoch_length: 30, + shutdown_after_finalizing: ShutdownAfterFinalizing::Boundary, + } + .run() +} + +/// Test configuration for restart scenarios +#[derive(Clone)] +struct RestartSetup { + // The epoch length to use. + epoch_length: u64, + /// Height at which to shutdown a validator + shutdown_height: u64, + /// Height at which to restart the validator + restart_height: u64, + /// Final height that all validators (including restarted) must reach + final_height: u64, + /// Whether to assert that DKG rounds were skipped + assert_skips: bool, + /// Whether to connect the execution layer. 
+ connect_execution_layer: bool, +} + +impl RestartSetup { + #[track_caller] + fn run(self) { + let Self { + epoch_length, + shutdown_height, + restart_height, + final_height, + assert_skips, + connect_execution_layer, + } = self; + let _ = tempo_eyre::install(); + + let setup = Setup::new() + .epoch_length(epoch_length) + .t2_time(0) + .connect_execution_layer_nodes(connect_execution_layer); + let cfg = deterministic::Config::default().with_seed(setup.seed); + let executor = Runner::from(cfg); + + executor.start(|mut context| async move { + let (mut validators, _execution_runtime) = + setup_validators(&mut context, setup.clone()).await; + + join_all(validators.iter_mut().map(|v| v.start(&context))).await; + + debug!( + height = shutdown_height, + "waiting for network to reach target height before stopping a validator", + ); + wait_for_height( + &context, + setup.how_many_signers, + shutdown_height, + false, + ) + .await; + + // Randomly select a validator to kill + let idx = context.gen_range(0..validators.len()); + validators[idx].stop().await; + + debug!(public_key = %validators[idx].public_key(), "stopped a random validator"); + + debug!( + height = restart_height, + "waiting for remaining validators to reach target height before restarting validator", + ); + wait_for_height( + &context, + setup.how_many_signers - 1, + restart_height, + false, + ) + .await; + + debug!("target height reached, restarting stopped validator"); + validators[idx].start(&context).await; + debug!( + public_key = %validators[idx].public_key(), + "restarted validator", + ); + + debug!( + height = final_height, + "waiting for reconstituted validators to reach target height to reach test success", + ); + wait_for_height( + &context, + setup.how_many_signers, + final_height, + assert_skips, + ) + .await; + }) + } +} + +/// Wait for a specific number of validators to reach a target height +async fn wait_for_height( + context: &Context, + expected_validators: u32, + target_height: u64, + 
assert_skips: bool,
+) {
+    let mut skips_observed = false;
+    loop {
+        let metrics = context.encode();
+        let mut validators_at_height = 0;
+
+        for line in metrics.lines() {
+            if !line.starts_with(CONSENSUS_NODE_PREFIX) {
+                continue;
+            }
+
+            let mut parts = line.split_whitespace();
+            let metric = parts.next().unwrap();
+            let value = parts.next().unwrap();
+
+            // Check if this is a height metric
+            if metric.ends_with("_marshal_processed_height") {
+                let height = value.parse::<u64>().unwrap();
+                if height >= target_height {
+                    validators_at_height += 1;
+                }
+            }
+            if metric.ends_with("_rounds_skipped_total") {
+                let count = value.parse::<u64>().unwrap();
+                skips_observed |= count > 0;
+            }
+        }
+        // `>=` (not `==`): a stopped validator's stale height metric can linger
+        // in the registry, so the count may overshoot the expectation; `==`
+        // would then spin forever.
+        if validators_at_height >= expected_validators {
+            assert!(!assert_skips || skips_observed);
+            break;
+        }
+        context.sleep(Duration::from_secs(1)).await;
+    }
+}
+
+/// Ensures that no more finalizations happen.
+async fn ensure_no_progress(context: &Context, tries: u32) {
+    let baseline = {
+        let metrics = context.encode();
+        let mut height = None;
+        for line in metrics.lines() {
+            if !line.starts_with(CONSENSUS_NODE_PREFIX) {
+                continue;
+            }
+            let mut parts = line.split_whitespace();
+            let metric = parts.next().unwrap();
+            let value = parts.next().unwrap();
+            if metric.ends_with("_marshal_processed_height") {
+                let value = value.parse::<u64>().unwrap();
+                if Some(value) > height {
+                    height.replace(value);
+                }
+            }
+        }
+        height.expect("processed height is a metric")
+    };
+    for _ in 0..=tries {
+        context.sleep(Duration::from_secs(1)).await;
+
+        let metrics = context.encode();
+        let mut height = None;
+        for line in metrics.lines() {
+            if !line.starts_with(CONSENSUS_NODE_PREFIX) {
+                continue;
+            }
+            let mut parts = line.split_whitespace();
+            let metric = parts.next().unwrap();
+            let value = parts.next().unwrap();
+            if metric.ends_with("_marshal_processed_height") {
+                let value = value.parse::<u64>().unwrap();
+                if Some(value) > height {
+                    height.replace(value);
+                }
+            }
+        }
+        let height = 
height.expect("processed height is a metric"); + if height != baseline { + panic!( + "height has changed, progress was made while the network was \ + stopped: baseline = `{baseline}`, progressed_to = `{height}`" + ); + } + } +} +enum ShutdownAfterFinalizing { + Boundary, + Ceremony, + BeforeMiddleOfEpoch, + MiddleOfEpoch, +} + +impl ShutdownAfterFinalizing { + fn is_target_height(&self, epoch_length: u64, block_height: Height) -> bool { + let epoch_strategy = FixedEpocher::new(NZU64!(epoch_length)); + match self { + // NOTE: ceremonies are finalized on the pre-to-last block, so + // block + 1 needs to be the boundary / last block. + Self::Ceremony => { + block_height.next() + == epoch_strategy + .containing(block_height.next()) + .unwrap() + .last() + } + Self::Boundary => { + block_height == epoch_strategy.containing(block_height).unwrap().last() + } + Self::BeforeMiddleOfEpoch => { + block_height.next().get().rem_euclid(epoch_length) == epoch_length / 2 + } + Self::MiddleOfEpoch => block_height.get().rem_euclid(epoch_length) == epoch_length / 2, + } + } +} + +impl std::fmt::Display for ShutdownAfterFinalizing { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let msg = match self { + Self::Boundary => "boundary", + Self::Ceremony => "ceremony", + Self::BeforeMiddleOfEpoch => "before-middle-of-epoch", + Self::MiddleOfEpoch => "middle-of-epoch", + }; + f.write_str(msg) + } +} + +struct AssertNodeRecoversAfterFinalizingBlock { + n_validators: u32, + epoch_length: u64, + shutdown_after_finalizing: ShutdownAfterFinalizing, +} + +impl AssertNodeRecoversAfterFinalizingBlock { + fn run(self) { + let _ = tempo_eyre::install(); + + let Self { + n_validators, + epoch_length, + shutdown_after_finalizing, + } = self; + + let setup = Setup::new() + .how_many_signers(n_validators) + .t2_time(0) + .epoch_length(epoch_length); + + let cfg = deterministic::Config::default().with_seed(setup.seed); + let executor = Runner::from(cfg); + + executor.start(|mut 
context| async move { + let (mut validators, _execution_runtime) = + setup_validators(&mut context, setup.clone()).await; + + join_all(validators.iter_mut().map(|node| node.start(&context))).await; + + // Catch a node right after it processed the pre-to-boundary height. + // Best-effort: we hot-loop in 100ms steps, but if processing is too + // fast we might miss the window and the test will succeed no matter + // what. + let (stopped_val_metric, height) = 'wait_to_boundary: loop { + let metrics = context.encode(); + 'lines: for line in metrics.lines() { + if !line.starts_with(CONSENSUS_NODE_PREFIX) { + continue 'lines; + } + let mut parts = line.split_whitespace(); + let metric = parts.next().unwrap(); + let value = parts.next().unwrap(); + + if metric.ends_with("_marshal_processed_height") { + let value = value.parse::().unwrap(); + if shutdown_after_finalizing + .is_target_height(setup.epoch_length, Height::new(value)) + { + break 'wait_to_boundary (metric.to_string(), value); + } + } + } + context.sleep(Duration::from_millis(100)).await; + }; + + tracing::debug!( + stopped_val_metric, + height, + target = %shutdown_after_finalizing, + "found a node that finalized the target height", + ); + // Now restart the node for which we found the metric. 
+ let idx = validators + .iter() + .position(|node| stopped_val_metric.contains(node.uid())) + .unwrap(); + let uid = validators[idx].uid.clone(); + validators[idx].stop().await; + validators[idx].start(&context).await; + + let mut iteration = 0; + 'look_for_progress: loop { + context.sleep(Duration::from_secs(1)).await; + let metrics = context.encode(); + 'lines: for line in metrics.lines() { + if !line.starts_with(CONSENSUS_NODE_PREFIX) { + continue 'lines; + } + let mut parts = line.split_whitespace(); + let metric = parts.next().unwrap(); + let value = parts.next().unwrap(); + if metric.contains(&uid) + && metric.ends_with("_marshal_processed_height") + && value.parse::().unwrap() > height + 10 + { + break 'look_for_progress; + } + if metric.ends_with("ceremony_bad_dealings") { + assert_eq!(value.parse::().unwrap(), 0); + } + } + iteration += 1; + assert!( + iteration < 10, + "node did not progress for 10 iterations; restart on boundary likely failed" + ); + } + }); + } +} diff --git a/crates/e2e/src/tests/v2_at_genesis/simple.rs b/crates/e2e/src/tests/v2_at_genesis/simple.rs new file mode 100644 index 0000000000..46d55008b8 --- /dev/null +++ b/crates/e2e/src/tests/v2_at_genesis/simple.rs @@ -0,0 +1,96 @@ +//! Simple tests: just start and build a few blocks. 
+use std::time::Duration; + +use crate::{Setup, run, tests::v2_at_genesis::assert_no_v1}; +use commonware_macros::test_traced; +use commonware_p2p::simulated::Link; + +#[test_traced] +fn single_node() { + let _ = tempo_eyre::install(); + + let setup = Setup::new() + .how_many_signers(1) + .epoch_length(100) + .t2_time(0) + .seed(0); + let _first = run(setup, |metric, value| { + assert_no_v1(metric, value); + if metric.ends_with("_marshal_processed_height") { + let value = value.parse::().unwrap(); + value >= 5 + } else { + false + } + }); +} + +#[test_traced] +fn only_good_links() { + let _ = tempo_eyre::install(); + + let setup = Setup::new().epoch_length(100).t2_time(0).seed(42); + let _first = run(setup, |metric, value| { + assert_no_v1(metric, value); + if metric.ends_with("_marshal_processed_height") { + let value = value.parse::().unwrap(); + value >= 5 + } else { + false + } + }); +} + +#[test_traced] +fn many_bad_links() { + let _ = tempo_eyre::install(); + + let link = Link { + latency: Duration::from_millis(200), + jitter: Duration::from_millis(150), + success_rate: 0.75, + }; + + let setup = Setup::new() + .seed(42) + .epoch_length(100) + .t2_time(0) + .linkage(link); + + let _first = run(setup, |metric, value| { + assert_no_v1(metric, value); + if metric.ends_with("_marshal_processed_height") { + let value = value.parse::().unwrap(); + value >= 5 + } else { + false + } + }); +} + +#[test_traced] +fn reach_height_20_with_a_few_bad_links() { + let _ = tempo_eyre::install(); + + let link = Link { + latency: Duration::from_millis(80), + jitter: Duration::from_millis(10), + success_rate: 0.98, + }; + + let setup = Setup::new() + .how_many_signers(10) + .epoch_length(100) + .t2_time(0) + .linkage(link); + + run(setup, |metric, value| { + assert_no_v1(metric, value); + if metric.ends_with("_marshal_processed_height") { + let value = value.parse::().unwrap(); + value >= 20 + } else { + false + } + }); +} diff --git 
a/crates/e2e/src/tests/v2_at_genesis/snapshot.rs b/crates/e2e/src/tests/v2_at_genesis/snapshot.rs new file mode 100644 index 0000000000..451ba9dd2c --- /dev/null +++ b/crates/e2e/src/tests/v2_at_genesis/snapshot.rs @@ -0,0 +1,510 @@ +//! Tests for syncing nodes from scratch. +//! +//! These tests are similar to the tests in [`crate::tests::restart`], but +//! assume that the node has never been run but been given a synced execution +//! layer database./// Runs a validator restart test with the given configuration + +use std::time::Duration; + +use alloy::transports::http::reqwest::Url; +use commonware_consensus::types::{Epocher as _, FixedEpocher, Height}; +use commonware_macros::test_traced; +use commonware_runtime::{ + Clock as _, Metrics as _, Runner as _, + deterministic::{self, Context, Runner}, +}; +use commonware_utils::NZU64; +use futures::future::join_all; +use reth_ethereum::provider::BlockNumReader as _; +use tracing::info; + +use crate::{ + CONSENSUS_NODE_PREFIX, Setup, setup_validators, + tests::v2_at_genesis::dkg::common::wait_for_outcome, +}; + +/// This is a lengthy test. First, a validator needs to be run for a sufficiently +/// long time to populate its database. Then, a new validator is rotated in +/// by taking the replaced validator's database. This simulates starting from +/// a snapshot. +#[test_traced] +fn joins_from_snapshot() { + let _ = tempo_eyre::install(); + + let epoch_length = 20; + // Create a verifier that we will never start. It just the private keys + // we desire. + let setup = Setup::new() + .how_many_signers(4) + .how_many_verifiers(1) + .t2_time(0) + .connect_execution_layer_nodes(true) + .epoch_length(epoch_length); + let cfg = deterministic::Config::default().with_seed(setup.seed); + let executor = Runner::from(cfg); + + executor.start(|mut context| async move { + let (mut validators, execution_runtime) = + setup_validators(&mut context, setup.clone()).await; + + // The replacement validator that will start later. 
+ let mut replacement = { + let idx = validators + .iter() + .position(|node| node.consensus_config().share.is_none()) + .expect("at least one node must be a verifier, i.e. not have a share"); + validators.remove(idx) + }; + join_all(validators.iter_mut().map(|v| v.start(&context))).await; + + // The validator that will donate it its database to the replacement. + let mut donor = validators.pop().unwrap(); + + let http_url = validators[0] + .execution() + .rpc_server_handle() + .http_url() + .unwrap() + .parse::() + .unwrap(); + + // Validator setup generated 2 different addresses for both validators. + // Make them the same so that ValidatorConfigV2.rotateValidator knows + // which one to target. + replacement.chain_address = donor.chain_address; + let receipt = execution_runtime + .rotate_validator(http_url, &replacement) + .await + .unwrap(); + + let rotate_height = Height::new(receipt.block_number.unwrap()); + tracing::debug!( + block.height = %rotate_height, + "validatorConfigV2.rotateValidator executed", + ); + + // Wait for the next DKG outcome - unless rotate_height is on a boundary. + // Then wait one more epoch. 
+ let epoch_strat = FixedEpocher::new(NZU64!(epoch_length)); + let info = epoch_strat.containing(rotate_height).unwrap(); + let target_epoch = if info.last() == rotate_height { + info.epoch().next() + } else { + info.epoch() + }; + + let outcome_start_rotation = + wait_for_outcome(&context, &validators, target_epoch.get(), epoch_length).await; + + assert!( + outcome_start_rotation + .players() + .position(&donor.public_key()) + .is_some() + ); + assert!( + outcome_start_rotation + .next_players() + .position(&donor.public_key()) + .is_none() + ); + assert!( + outcome_start_rotation + .players() + .position(&replacement.public_key()) + .is_none() + ); + assert!( + outcome_start_rotation + .next_players() + .position(&replacement.public_key()) + .is_some() + ); + + let outcome_finish_rotation = wait_for_outcome( + &context, + &validators, + target_epoch.next().get(), + epoch_length, + ) + .await; + + assert!( + outcome_finish_rotation + .players() + .position(&donor.public_key()) + .is_none() + ); + assert!( + outcome_finish_rotation + .next_players() + .position(&donor.public_key()) + .is_none() + ); + assert!( + outcome_finish_rotation + .players() + .position(&replacement.public_key()) + .is_some() + ); + assert!( + outcome_finish_rotation + .next_players() + .position(&replacement.public_key()) + .is_some() + ); + + info!("new validator was added to the committee, but not started"); + + donor.stop().await; + let last_epoch_before_stop = latest_epoch_of_validator(&context, &donor.uid); + info!(%last_epoch_before_stop, "stopped the original validator"); + + // Now the old validator donates its database to the new validator. + // + // This works by assigning the replacement validator's fields to the + // old validator's. This way, the old validator "donates" its database + // to the replacement. This is to simulate a snapshot. 
+ donor.uid = replacement.uid; + donor.private_key = replacement.private_key; + { + let peer_manager = replacement.consensus_config.peer_manager.clone(); + donor.consensus_config = replacement.consensus_config; + donor.consensus_config.peer_manager = peer_manager; + } + donor.network_address = replacement.network_address; + donor.chain_address = replacement.chain_address; + donor.start(&context).await; + + // Rename, so that it's less confusing below. + let replacement = donor; + + info!( + uid = %replacement.uid, + "started the validator with a changed identity", + ); + + loop { + context.sleep(Duration::from_secs(1)).await; + + let metrics = context.encode(); + let mut validators_at_epoch = 0; + + for line in metrics.lines() { + if !line.starts_with(CONSENSUS_NODE_PREFIX) { + continue; + } + + let mut parts = line.split_whitespace(); + let metric = parts.next().unwrap(); + let value = parts.next().unwrap(); + + if metric.ends_with("_epoch_manager_latest_epoch") { + let epoch = value.parse::().unwrap(); + + assert!( + epoch < last_epoch_before_stop + 4, + "network advanced 4 epochs before without the new \ + validator catching up; there is likely a bug", + ); + + if metric.contains(&replacement.uid) { + assert!( + epoch >= last_epoch_before_stop, + "the replacement validator should never enter epochs \ + older than what is in the snapshot" + ); + } + + if epoch > last_epoch_before_stop { + validators_at_epoch += 1; + } + + if metric.contains(&replacement.uid) { + // -1 to account for stopping on boundaries. + assert!( + epoch >= last_epoch_before_stop.saturating_sub(1), + "when starting from snapshot, older epochs must never \ + had consensus engines running" + ); + } + } + } + if validators_at_epoch == 4 { + break; + } + } + }); +} + +/// This test is the same as `joins_from_snapshot`, but with the extra condition +/// that the validator can restart (stop, start), after having booted from a +/// snapshot. 
+#[test_traced] +fn can_restart_after_joining_from_snapshot() { + let _ = tempo_eyre::install(); + + let epoch_length = 20; + // Create a verifier that we will never start. It just the private keys + // we desire. + let setup = Setup::new() + .how_many_signers(4) + .how_many_verifiers(1) + .t2_time(0) + .connect_execution_layer_nodes(true) + .epoch_length(epoch_length); + let cfg = deterministic::Config::default().with_seed(setup.seed); + let executor = Runner::from(cfg); + + executor.start(|mut context| async move { + let (mut validators, execution_runtime) = + setup_validators(&mut context, setup.clone()).await; + + // The replacement validator that will start later. + let mut replacement = { + let idx = validators + .iter() + .position(|node| node.consensus_config().share.is_none()) + .expect("at least one node must be a verifier, i.e. not have a share"); + validators.remove(idx) + }; + join_all(validators.iter_mut().map(|v| v.start(&context))).await; + + // The validator that will donate it its database to the replacement. + let mut donor = validators.pop().unwrap(); + + let http_url = validators[0] + .execution() + .rpc_server_handle() + .http_url() + .unwrap() + .parse::() + .unwrap(); + + // Validator setup generated 2 different addresses for both validators. + // Make them the same so that ValidatorConfigV2.rotateValidator knows + // which one to target. + replacement.chain_address = donor.chain_address; + let receipt = execution_runtime + .rotate_validator(http_url, &replacement) + .await + .unwrap(); + + let rotate_height = Height::new(receipt.block_number.unwrap()); + tracing::debug!( + block.height = %rotate_height, + "validatorConfigV2.rotateValidator executed", + ); + + // Wait for the next DKG outcome - unless rotate_height is on a boundary. + // Then wait one more epoch. 
+ let epoch_strat = FixedEpocher::new(NZU64!(epoch_length)); + let info = epoch_strat.containing(rotate_height).unwrap(); + let target_epoch = if info.last() == rotate_height { + info.epoch().next() + } else { + info.epoch() + }; + + let outcome_start_rotation = + wait_for_outcome(&context, &validators, target_epoch.get(), epoch_length).await; + + assert!( + outcome_start_rotation + .players() + .position(&donor.public_key()) + .is_some() + ); + assert!( + outcome_start_rotation + .next_players() + .position(&donor.public_key()) + .is_none() + ); + assert!( + outcome_start_rotation + .players() + .position(&replacement.public_key()) + .is_none() + ); + assert!( + outcome_start_rotation + .next_players() + .position(&replacement.public_key()) + .is_some() + ); + + let outcome_finish_rotation = wait_for_outcome( + &context, + &validators, + target_epoch.next().get(), + epoch_length, + ) + .await; + + assert!( + outcome_finish_rotation + .players() + .position(&donor.public_key()) + .is_none() + ); + assert!( + outcome_finish_rotation + .next_players() + .position(&donor.public_key()) + .is_none() + ); + assert!( + outcome_finish_rotation + .players() + .position(&replacement.public_key()) + .is_some() + ); + assert!( + outcome_finish_rotation + .next_players() + .position(&replacement.public_key()) + .is_some() + ); + + info!("new validator was added to the committee, but not started"); + + donor.stop().await; + let last_epoch_before_stop = latest_epoch_of_validator(&context, &donor.uid); + info!(%last_epoch_before_stop, "stopped the original validator"); + + // Now the old validator donates its database to the new validator. + // + // This works by assigning the replacement validator's fields to the + // old validator's. This way, the old validator "donates" its database + // to the replacement. This is to simulate a snapshot. 
+ donor.uid = replacement.uid; + donor.private_key = replacement.private_key; + { + let peer_manager = replacement.consensus_config.peer_manager.clone(); + donor.consensus_config = replacement.consensus_config; + donor.consensus_config.peer_manager = peer_manager; + } + donor.network_address = replacement.network_address; + donor.chain_address = replacement.chain_address; + donor.start(&context).await; + + // Rename, so that it's less confusing below. + let mut replacement = donor; + + info!( + uid = %replacement.uid, + "started the validator with a changed identity", + ); + + loop { + context.sleep(Duration::from_secs(1)).await; + + let metrics = context.encode(); + let mut validators_at_epoch = 0; + + for line in metrics.lines() { + if !line.starts_with(CONSENSUS_NODE_PREFIX) { + continue; + } + + let mut parts = line.split_whitespace(); + let metric = parts.next().unwrap(); + let value = parts.next().unwrap(); + + if metric.ends_with("_epoch_manager_latest_epoch") { + let epoch = value.parse::().unwrap(); + + assert!( + epoch < last_epoch_before_stop + 4, + "network advanced 4 epochs before without the new \ + validator catching up; there is likely a bug", + ); + + if metric.contains(&replacement.uid) { + assert!( + epoch >= last_epoch_before_stop, + "the replacement validator should never enter epochs \ + older than what is in the snapshot" + ); + } + + if epoch > last_epoch_before_stop { + validators_at_epoch += 1; + } + + if metric.contains(&replacement.uid) { + // -1 to account for stopping on boundaries. + assert!( + epoch >= last_epoch_before_stop.saturating_sub(1), + "when starting from snapshot, older epochs must never \ + had consensus engines running" + ); + } + } + } + if validators_at_epoch == 4 { + break; + } + } + + // Restart the node. This ensures that it's state is still sound after + // doing a snapshot sync. 
+ replacement.stop().await; + + let network_head = validators[0] + .execution_provider() + .best_block_number() + .unwrap(); + + replacement.start(&context).await; + + info!( + network_head, + "restarting the node and waiting for it to catch up" + ); + + 'progress: loop { + context.sleep(Duration::from_secs(1)).await; + + let metrics = context.encode(); + + for line in metrics.lines() { + if !line.starts_with(CONSENSUS_NODE_PREFIX) { + continue; + } + + let mut parts = line.split_whitespace(); + let metric = parts.next().unwrap(); + let value = parts.next().unwrap(); + + if metric.contains(&replacement.uid) + && metric.ends_with("_marshal_processed_height") + && value.parse::().unwrap() > network_head + { + break 'progress; + } + } + } + }); +} + +fn latest_epoch_of_validator(context: &Context, id: &str) -> u64 { + let metrics = context.encode(); + + for line in metrics.lines() { + if !line.starts_with(CONSENSUS_NODE_PREFIX) { + continue; + } + + let mut parts = line.split_whitespace(); + let metric = parts.next().unwrap(); + let value = parts.next().unwrap(); + + if metric.ends_with("_epoch_manager_latest_epoch") && metric.contains(id) { + return value.parse::().unwrap(); + } + } + + panic!("validator had no entry for latest epoch"); +} diff --git a/crates/e2e/src/tests/v2_at_genesis/subblocks.rs b/crates/e2e/src/tests/v2_at_genesis/subblocks.rs new file mode 100644 index 0000000000..6687e6c610 --- /dev/null +++ b/crates/e2e/src/tests/v2_at_genesis/subblocks.rs @@ -0,0 +1,442 @@ +use std::{collections::HashMap, time::Duration}; + +use alloy::{ + consensus::{Transaction, TxReceipt}, + rlp::Decodable, + signers::local::PrivateKeySigner, +}; +use alloy_network::{TxSignerSync, eip2718::Encodable2718}; +use alloy_primitives::{Address, TxHash, U256, b256}; +use commonware_macros::test_traced; +use commonware_runtime::{ + Runner as _, + deterministic::{Config, Runner}, +}; +use futures::{StreamExt, future::join_all}; +use reth_ethereum::{ + chainspec::{ChainSpecProvider, 
EthChainSpec}, + rpc::eth::EthApiServer, +}; +use reth_node_builder::ConsensusEngineEvent; +use reth_node_core::primitives::transaction::TxHashRef; +use tempo_chainspec::spec::{SYSTEM_TX_COUNT, TEMPO_T1_BASE_FEE}; +use tempo_node::primitives::{ + SubBlockMetadata, TempoTransaction, TempoTxEnvelope, + subblock::{PartialValidatorKey, TEMPO_SUBBLOCK_NONCE_KEY_PREFIX}, + transaction::{Call, calc_gas_balance_spending}, +}; +use tempo_precompiles::{ + DEFAULT_FEE_TOKEN, NONCE_PRECOMPILE_ADDRESS, nonce::NonceManager, tip20::TIP20Token, +}; + +use tempo_node::consensus::TEMPO_SHARED_GAS_DIVISOR; + +use crate::{Setup, TestingNode, setup_validators}; + +#[test_traced] +fn subblocks_are_included() { + let _ = tempo_eyre::install(); + + Runner::from(Config::default().with_seed(0)).start(|mut context| async move { + let how_many_signers = 4; + + let setup = Setup::new() + .how_many_signers(how_many_signers) + .epoch_length(10); + + // Setup and start all nodes. + let (mut nodes, _execution_runtime) = setup_validators(&mut context, setup.clone()).await; + + let mut fee_recipients = Vec::new(); + + for node in &mut nodes { + // Due to how Commonware deterministic runtime behaves in CI, we need to bump this timeout + // to ensure that payload builder has enough time to accumulate subblocks. 
+ node.consensus_config_mut().new_payload_wait_time = Duration::from_millis(500); + + let fee_recipient = Address::random(); + node.consensus_config_mut().fee_recipient = fee_recipient; + fee_recipients.push(fee_recipient); + } + + join_all(nodes.iter_mut().map(|node| node.start(&context))).await; + + let mut stream = nodes[0] + .execution() + .add_ons_handle + .engine_events + .new_listener(); + + let mut expected_transactions: Vec = Vec::new(); + while let Some(update) = stream.next().await { + let block = match update { + ConsensusEngineEvent::BlockReceived(_) + | ConsensusEngineEvent::ForkchoiceUpdated(_, _) + | ConsensusEngineEvent::CanonicalChainCommitted(_, _) => continue, + ConsensusEngineEvent::ForkBlockAdded(_, _) => unreachable!("unexpected reorg"), + ConsensusEngineEvent::InvalidBlock(_) => unreachable!("unexpected invalid block"), + ConsensusEngineEvent::CanonicalBlockAdded(block, _) => block, + }; + + let receipts = &block.execution_outcome().receipts; + + // Assert that block only contains our subblock transactions and the system transactions + assert_eq!( + block.sealed_block().body().transactions.len(), + SYSTEM_TX_COUNT + expected_transactions.len() + ); + + // Assert that all expected transactions are included in the block. + for tx in expected_transactions.drain(..) 
{ + if !block + .sealed_block() + .body() + .transactions + .iter() + .any(|t| t.tx_hash() == *tx) + { + panic!("transaction {tx} was not included"); + } + } + + // Assert that all transactions were successful + for receipt in receipts { + assert!(receipt.status()); + } + + if !expected_transactions.is_empty() { + let fee_token_storage = &block + .execution_outcome() + .state + .state + .get(&DEFAULT_FEE_TOKEN) + .unwrap() + .storage; + + // Assert that all validators were paid for their subblock transactions + for fee_recipient in &fee_recipients { + let balance_slot = TIP20Token::from_address(DEFAULT_FEE_TOKEN) + .unwrap() + .balances[*fee_recipient] + .slot(); + let slot = fee_token_storage.get(&balance_slot).unwrap(); + + assert!(slot.present_value > slot.original_value()); + } + } + + // Exit once we reach height 20. + if block.block_number() == 20 { + break; + } + + // Send subblock transactions to all nodes. + for node in nodes.iter() { + for _ in 0..5 { + expected_transactions.push(submit_subblock_tx(node).await); + } + } + } + }); +} + +#[test_traced] +fn subblocks_are_included_with_failing_txs() { + let _ = tempo_eyre::install(); + + Runner::from(Config::default().with_seed(0)).start(|mut context| async move { + let how_many_signers = 5; + + let setup = Setup::new() + .how_many_signers(how_many_signers) + .epoch_length(10); + + // Setup and start all nodes. + let (mut nodes, _execution_runtime) = setup_validators(&mut context, setup.clone()).await; + + let mut fee_recipients = Vec::new(); + + for node in &mut nodes { + // Due to how Commonware deterministic runtime behaves in CI, we need to bump this timeout + // to ensure that payload builder has enough time to accumulate subblocks. 
+ node.consensus_config_mut().new_payload_wait_time = Duration::from_millis(500); + + let fee_recipient = Address::random(); + node.consensus_config_mut().fee_recipient = fee_recipient; + fee_recipients.push(fee_recipient); + } + + join_all(nodes.iter_mut().map(|node| node.start(&context))).await; + + let mut stream = nodes[0] + .execution() + .add_ons_handle + .engine_events + .new_listener(); + + let mut expected_transactions: Vec = Vec::new(); + let mut failing_transactions: Vec = Vec::new(); + while let Some(update) = stream.next().await { + let block = match update { + ConsensusEngineEvent::BlockReceived(_) + | ConsensusEngineEvent::ForkchoiceUpdated(_, _) + | ConsensusEngineEvent::CanonicalChainCommitted(_, _) => continue, + ConsensusEngineEvent::ForkBlockAdded(_, _) => unreachable!("unexpected reorg"), + ConsensusEngineEvent::InvalidBlock(_) => unreachable!("unexpected invalid block"), + ConsensusEngineEvent::CanonicalBlockAdded(block, _) => block, + }; + let receipts = &block.execution_outcome().receipts; + + // Assert that block only contains our subblock transactions and system transactions + assert_eq!( + block.sealed_block().body().transactions.len(), + SYSTEM_TX_COUNT + expected_transactions.len() + ); + + // Assert that all expected transactions are included in the block. + for tx in expected_transactions.drain(..) 
{ + if !block + .sealed_block() + .body() + .transactions + .iter() + .any(|t| t.tx_hash() == *tx) + { + panic!("transaction {tx} was not included"); + } + } + + let fee_recipients = Vec::::decode( + &mut block + .sealed_block() + .body() + .transactions + .last() + .unwrap() + .input() + .as_ref(), + ) + .unwrap() + .into_iter() + .map(|metadata| { + ( + PartialValidatorKey::from_slice(&metadata.validator[..15]), + metadata.fee_recipient, + ) + }) + .collect::>(); + + let mut expected_fees = HashMap::new(); + let mut cumulative_gas_used = 0; + + for (receipt, tx) in receipts + .iter() + .zip(block.recovered_block().transactions_recovered()) + { + if !expected_transactions.contains(tx.tx_hash()) { + continue; + } + + let fee_recipient = fee_recipients + .get(&tx.subblock_proposer().unwrap()) + .unwrap(); + *expected_fees.entry(fee_recipient).or_insert(U256::ZERO) += + calc_gas_balance_spending( + receipt.cumulative_gas_used - cumulative_gas_used, + TEMPO_T1_BASE_FEE as u128, + ); + cumulative_gas_used = receipt.cumulative_gas_used; + + if !failing_transactions.contains(tx.tx_hash()) { + assert!(receipt.status()); + assert!(receipt.cumulative_gas_used > 0); + continue; + } + + let sender = tx.signer(); + let nonce_key = tx.as_aa().unwrap().tx().nonce_key; + let nonce_slot = NonceManager::new().nonces[sender][nonce_key].slot(); + + let slot = block + .execution_outcome() + .state + .state + .get(&NONCE_PRECOMPILE_ADDRESS) + .unwrap() + .storage + .get(&nonce_slot) + .unwrap(); + + // Assert that all failing transactions have bumped the nonce and resulted in a failing receipt + assert!(slot.present_value == slot.original_value() + U256::ONE); + assert!(!receipt.status()); + assert!(receipt.logs().is_empty()); + assert_eq!(receipt.cumulative_gas_used, 0); + } + + for (fee_recipient, expected_fee) in expected_fees { + let fee_token_storage = &block + .execution_outcome() + .state + .state + .get(&DEFAULT_FEE_TOKEN) + .unwrap() + .storage; + + // Assert that all 
validators were paid for their subblock transactions + let balance_slot = TIP20Token::from_address(DEFAULT_FEE_TOKEN) + .unwrap() + .balances[*fee_recipient] + .slot(); + let slot = fee_token_storage.get(&balance_slot).unwrap(); + + assert_eq!(slot.present_value, slot.original_value() + expected_fee); + } + + // Exit once we reach height 20. + if block.block_number() == 20 { + break; + } + + // Send subblock transactions to all nodes. + // TIP-1000 charges 250k gas for new account creation, so txs from random signers + // need ~300k intrinsic gas. With 600k per-validator budget (5 validators), we fit 2 txs. + for node in nodes.iter() { + for _ in 0..5 { + // Randomly submit some of the transactions from a new signer that doesn't have any funds + if rand_08::random::() { + let tx = + submit_subblock_tx_from(node, &PrivateKeySigner::random(), 1_000_000) + .await; + failing_transactions.push(tx); + expected_transactions.push(tx); + tx + } else { + let tx = submit_subblock_tx(node).await; + expected_transactions.push(tx); + tx + }; + } + } + } + }); +} + +#[test_traced] +fn oversized_subblock_txs_are_removed() { + let _ = tempo_eyre::install(); + + Runner::from(Config::default().with_seed(42)).start(|mut context| async move { + let how_many_signers = 4; + + let setup = Setup::new() + .how_many_signers(how_many_signers) + .epoch_length(10); + + let (mut nodes, _execution_runtime) = setup_validators(&mut context, setup.clone()).await; + + for node in &mut nodes { + node.consensus_config_mut().new_payload_wait_time = Duration::from_millis(500); + } + + join_all(nodes.iter_mut().map(|node| node.start(&context))).await; + + let mut stream = nodes[0] + .execution() + .add_ons_handle + .engine_events + .new_listener(); + + let (mut oversized_tx_hash, mut submitted) = (None, false); + + while let Some(update) = stream.next().await { + let block = match update { + ConsensusEngineEvent::CanonicalBlockAdded(block, _) => block, + _ => continue, + }; + + // After first block, 
submit an oversized transaction + if !submitted && block.block_number() >= 1 { + let block_gas_limit = block.sealed_block().header().inner.gas_limit; + let gas_budget = + block_gas_limit / TEMPO_SHARED_GAS_DIVISOR / how_many_signers as u64; + + oversized_tx_hash = Some( + submit_subblock_tx_from(&nodes[0], &PrivateKeySigner::random(), gas_budget + 1) + .await, + ); + + submitted = true; + } + + // Check results after submission - verify oversized tx is never included + if submitted && block.block_number() >= 3 { + let txs = &block.sealed_block().body().transactions; + + // Oversized tx should NOT be included in any block + if let Some(hash) = oversized_tx_hash { + assert!( + !txs.iter().any(|t| t.tx_hash() == *hash), + "oversized transaction should not be included in block" + ); + } + } + + if block.block_number() >= 10 { + break; + } + } + }); +} + +async fn submit_subblock_tx( + node: &TestingNode, +) -> TxHash { + // First signer of the test mnemonic + let wallet = PrivateKeySigner::from_bytes(&b256!( + "0xac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80" + )) + .unwrap(); + + submit_subblock_tx_from(node, &wallet, 300_000).await +} + +async fn submit_subblock_tx_from( + node: &TestingNode, + wallet: &PrivateKeySigner, + gas_limit: u64, +) -> TxHash { + let mut nonce_bytes = rand_08::random::<[u8; 32]>(); + nonce_bytes[0] = TEMPO_SUBBLOCK_NONCE_KEY_PREFIX; + nonce_bytes[1..16].copy_from_slice(&node.public_key().as_ref()[..15]); + + let provider = node.execution_provider(); + + let gas_price = TEMPO_T1_BASE_FEE as u128; + + let mut tx = TempoTransaction { + chain_id: provider.chain_spec().chain_id(), + calls: vec![Call { + to: Address::ZERO.into(), + input: Default::default(), + value: Default::default(), + }], + gas_limit, + nonce_key: U256::from_be_bytes(nonce_bytes), + max_fee_per_gas: gas_price, + max_priority_fee_per_gas: gas_price, + ..Default::default() + }; + assert!(tx.subblock_proposer().unwrap().matches(node.public_key())); + let 
signature = wallet.sign_transaction_sync(&mut tx).unwrap(); + + let tx = TempoTxEnvelope::AA(tx.into_signed(signature.into())); + let tx_hash = *tx.tx_hash(); + node.execution() + .eth_api() + .send_raw_transaction(tx.encoded_2718().into()) + .await + .unwrap(); + + tx_hash +} diff --git a/crates/precompiles/src/validator_config_v2/mod.rs b/crates/precompiles/src/validator_config_v2/mod.rs index fcfd66c656..3e65fa2061 100644 --- a/crates/precompiles/src/validator_config_v2/mod.rs +++ b/crates/precompiles/src/validator_config_v2/mod.rs @@ -601,7 +601,6 @@ impl ValidatorConfigV2 { if deactivated_at_height == 0 { self.active_ingress_ips[ingress_hash].write(true)?; } - Ok(()) }