diff --git a/Cargo.lock b/Cargo.lock
index b4b7840b..9e6dae30 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -5360,6 +5360,7 @@ dependencies = [
  "rand",
  "rand_chacha",
  "regex",
+ "reqwest 0.12.0",
  "serde",
  "serde_json",
  "serde_yaml",
@@ -5372,6 +5373,7 @@ dependencies = [
  "tracing",
  "tracing-appender",
  "tracing-subscriber",
+ "url",
  "wildmatch 2.3.3",
 ]
@@ -6400,6 +6402,7 @@ dependencies = [
  "form_urlencoded",
  "idna 0.5.0",
  "percent-encoding",
+ "serde",
 ]
 
 [[package]]
diff --git a/crates/aot/src/ledger/query.rs b/crates/aot/src/ledger/query.rs
index 81076f5f..24550ff7 100644
--- a/crates/aot/src/ledger/query.rs
+++ b/crates/aot/src/ledger/query.rs
@@ -81,6 +81,8 @@ impl LedgerQuery {
             .route("/mainnet/block/hash/latest", get(Self::latest_hash))
             .route("/mainnet/transaction/broadcast", post(Self::broadcast_tx))
             .route("/block", post(Self::add_block))
+            // TODO: for ahead of time ledger generation, support a /beacon_block endpoint to write a beacon block
+            // TODO: api to get and decrypt records for a private key
             .with_state(Arc::new(state));
 
         let listener = tokio::net::TcpListener::bind(SocketAddr::new(self.bind, self.port)).await?;
diff --git a/crates/snot-agent/Cargo.toml b/crates/snot-agent/Cargo.toml
index dc33bbc2..60ac4d85 100644
--- a/crates/snot-agent/Cargo.toml
+++ b/crates/snot-agent/Cargo.toml
@@ -15,7 +15,7 @@ futures-util.workspace = true
 http.workspace = true
 httpdate = "1.0.3"
 local-ip-address = "0.6.1"
-reqwest = { version = "0.12.0", features = ["stream"] }
+reqwest = { workspace = true, features = ["stream", "json"] }
 snot-common = { path = "../snot-common" }
 tarpc.workspace = true
 tokio.workspace = true
diff --git a/crates/snot-agent/src/rpc.rs b/crates/snot-agent/src/rpc.rs
index a0ce8dcf..6287067c 100644
--- a/crates/snot-agent/src/rpc.rs
+++ b/crates/snot-agent/src/rpc.rs
@@ -3,7 +3,7 @@ use std::{collections::HashSet, net::IpAddr, ops::Deref, process::Stdio, sync::A
 use snot_common::{
     rpc::{
         agent::{
-            AgentMetric, AgentService, AgentServiceRequest, AgentServiceResponse, ReconcileError,
+            AgentError, AgentMetric, AgentService, AgentServiceRequest, AgentServiceResponse, ReconcileError,
         },
         control::{ControlServiceRequest, ControlServiceResponse},
         MuxMessage,
@@ -369,6 +369,27 @@ impl AgentService for AgentRpcServer {
         )
     }
 
+    async fn get_state_root(self, _: context::Context) -> Result<String, AgentError> {
+        if !matches!(
+            self.state.agent_state.read().await.deref(),
+            AgentState::Node(_, _)
+        ) {
+            return Err(AgentError::InvalidState);
+        }
+
+        let url = format!(
+            "http://127.0.0.1:{}/mainnet/latest/stateRoot",
+            self.state.cli.rest
+        );
+        let response = reqwest::get(&url)
+            .await
+            .map_err(|_| AgentError::FailedToMakeRequest)?;
+        response
+            .json()
+            .await
+            .map_err(|_| AgentError::FailedToParseJson)
+    }
+
     async fn get_metric(self, _: context::Context, metric: AgentMetric) -> f64 {
         let metrics = self.state.metrics.read().await;
 
diff --git a/crates/snot-common/src/rpc/agent.rs b/crates/snot-common/src/rpc/agent.rs
index 2128261e..77f3c156 100644
--- a/crates/snot-common/src/rpc/agent.rs
+++ b/crates/snot-common/src/rpc/agent.rs
@@ -20,6 +20,8 @@ pub trait AgentService {
     /// state.
     async fn reconcile(to: AgentState) -> Result<(), ReconcileError>;
 
+    /// Get the state root from the running node
+    async fn get_state_root() -> Result<String, AgentError>;
     async fn get_metric(metric: AgentMetric) -> f64;
 }
@@ -35,6 +37,15 @@ pub enum ReconcileError {
     Unknown,
 }
+#[derive(Debug, Error, Serialize, Deserialize)]
+pub enum AgentError {
+    #[error("invalid agent state")]
+    InvalidState,
+    #[error("failed to parse json")]
+    FailedToParseJson,
+    #[error("failed to make a request")]
+    FailedToMakeRequest,
+}
 #[derive(Debug, Clone, Serialize, Deserialize)]
 pub enum AgentMetric {
     Tps,
 }
diff --git a/crates/snot-common/src/state.rs b/crates/snot-common/src/state.rs
index 754d0150..98c68453 100644
--- a/crates/snot-common/src/state.rs
+++ b/crates/snot-common/src/state.rs
@@ -16,7 +16,8 @@ pub enum AgentState {
     #[default]
     // A node in the inventory can function as a transaction cannon
    Inventory,
-    Node(StorageId, NodeState),
+    /// Test id mapping to node state
+    Node(usize, NodeState),
 }
 
 #[derive(Debug, Clone, Serialize, Deserialize)]
diff --git a/crates/snot/Cargo.toml b/crates/snot/Cargo.toml
index 85026380..6027c503 100644
--- a/crates/snot/Cargo.toml
+++ b/crates/snot/Cargo.toml
@@ -18,6 +18,7 @@ lazy_static.workspace = true
 rand.workspace = true
 rand_chacha.workspace = true
 regex.workspace = true
+reqwest = { workspace = true, features = ["stream"] }
 serde.workspace = true
 serde_json.workspace = true
 serde_yaml.workspace = true
@@ -32,4 +33,5 @@ tower-http.workspace = true
 tracing-appender.workspace = true
 tracing.workspace = true
 tracing-subscriber.workspace = true
+url = { workspace = true, features = ["serde"] }
 wildmatch = "2.3.3"
diff --git a/crates/snot/src/cannon/mod.rs b/crates/snot/src/cannon/mod.rs
new file mode 100644
index 00000000..6773b4d0
--- /dev/null
+++ b/crates/snot/src/cannon/mod.rs
@@ -0,0 +1,199 @@
+mod net;
+pub mod router;
+pub mod sink;
+pub mod source;
+
+use std::{
+    collections::HashSet,
+    sync::{atomic::AtomicU32, Arc, Weak},
+};
+
+use anyhow::{bail, Result};
+
+use tokio::{
+    sync::{mpsc::UnboundedSender, Mutex as AsyncMutex},
+    task::AbortHandle,
+};
+use tracing::warn;
+
+use crate::{cannon::source::LedgerQueryService, state::GlobalState, testing::Environment};
+
+use self::{sink::TxSink, source::TxSource};
+
+/*
+
+STEP ONE
+cannon transaction source: (GEN OR PLAYBACK)
+- AOT: storage file
+- REALTIME: generate executions from available agents?? via rpc
+
+
+STEP 2
+cannon query source:
+/cannon/<id>/mainnet/latest/stateRoot forwards to one of the following:
+- REALTIME-(GEN|PLAYBACK): (test_id, node-key) with a rest port. Client/Validator only
+- AOT-GEN: ledger service locally (file mode)
+- AOT-PLAYBACK: n/a
+
+STEP 3
+cannon broadcast ALWAYS HITS control plane at
+/cannon/<id>/mainnet/transaction/broadcast
+cannon TX OUTPUT pointing at
+- REALTIME: (test_id, node-key)
+- AOT: file
+
+
+cannon rate
+cannon buffer size
+burst mode??
+
+*/
+
+/// Transaction cannon state
+/// using the `TxSource` and `TxSink` for configuration.
+#[derive(Debug)]
+pub struct CannonInstance {
+    // a copy of the global state
+    global_state: Arc<GlobalState>,
+
+    source: TxSource,
+    sink: TxSink,
+
+    /// The test_id/storage associated with this cannon.
+    /// To point at an external node, create a topology with an external node.
+    /// To generate ahead-of-time, upload a test with a timeline referencing a
+    /// cannon pointing at a file.
+    env: Weak<Environment>,
+
+    /// Local query service port. Only present if the TxSource uses a local query source.
+    query_port: Option<u16>,
+
+    // TODO: run the actual cannon in this task
+    task: AsyncMutex<AbortHandle>,
+
+    /// channel to send transactions to the task
+    tx_sender: UnboundedSender<String>,
+    fired_txs: AtomicU32,
+}
+
+impl CannonInstance {
+    /// Create a new active transaction cannon
+    /// with the given source and sink.
+    ///
+    /// Locks the global state's tests and storage for reading.
+    pub async fn new(
+        global_state: Arc<GlobalState>,
+        source: TxSource,
+        sink: TxSink,
+        test_id: usize,
+    ) -> Result<Self> {
+        // mapping with async is ugly and blocking_read is scary
+        let env = {
+            let Some(env) = global_state.envs.read().await.get(&test_id).cloned() else {
+                bail!("test {test_id} not found")
+            };
+
+            env
+        };
+        let env2 = env.clone();
+
+        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
+        let tx_sender = tx.clone();
+
+        let query_port = source.get_query_port()?;
+
+        let fired_txs = AtomicU32::new(0);
+
+        let handle = tokio::spawn(async move {
+            // TODO: write tx to sink at desired rate
+            let _tx = rx.recv().await;
+
+            // TODO: if a sink or a source uses node_keys or storage
+            // env will be used
+            println!("{}", env2.storage.id);
+
+            // compare the tx id to an authorization id
+            let _pending_txs = HashSet::<String>::new();
+
+            // TODO: if a local query service exists, spawn it here
+            // kill on drop
+
+            // TODO: determine the rate that transactions need to be created
+            // based on the sink
+
+            // TODO: if source is realtime, generate authorizations and
+            // send them to any available agent
+
+            std::future::pending::<()>().await
+        });
+
+        Ok(Self {
+            global_state,
+            source,
+            sink,
+            env: Arc::downgrade(&env),
+            tx_sender,
+            query_port,
+            task: AsyncMutex::new(handle.abort_handle()),
+            fired_txs,
+        })
+    }
+
+    /// Called by axum to forward /cannon/<id>/mainnet/latest/stateRoot
+    /// to the ledger query service's /mainnet/latest/stateRoot
+    pub async fn proxy_state_root(&self) -> Result<String> {
+        match &self.source {
+            TxSource::RealTime { query, .. } => match query {
+                LedgerQueryService::Local(qs) => {
+                    if let Some(port) = self.query_port {
+                        qs.get_state_root(port).await
+                    } else {
+                        bail!("cannon is missing a query port")
+                    }
+                }
+                LedgerQueryService::Node(key) => {
+                    let Some(env) = self.env.upgrade() else {
+                        unreachable!("called from a place where env is present")
+                    };
+
+                    // env_id must be Some because LedgerQueryService::Node requires it
+                    let Some(agent_id) = env.get_agent_by_key(key) else {
+                        bail!("cannon target agent not found")
+                    };
+
+                    let Some(client) = self.global_state.get_client(agent_id).await else {
+                        bail!("cannon target agent is offline")
+                    };
+
+                    // call client's rpc method to get the state root
+                    // this will fail if the client is not running a node
+                    client.get_state_root().await
+                }
+            },
+            TxSource::Playback { .. } => {
+                bail!("cannon is configured to playback from file.")
+            }
+        }
+    }
+
+    /// Called by axum to forward /cannon/<id>/mainnet/transaction/broadcast
+    /// to the desired sink
+    pub fn proxy_broadcast(&self, body: String) -> Result<()> {
+        match &self.source {
+            TxSource::RealTime { .. } => {
+                self.tx_sender.send(body)?;
+            }
+            TxSource::Playback { .. } => {
+                warn!("cannon received broadcasted transaction in playback mode. ignoring.")
+            }
+        }
+        Ok(())
+    }
+}
+
+impl Drop for CannonInstance {
+    fn drop(&mut self) {
+        // cancel the task on drop
+        self.task.blocking_lock().abort();
+    }
+}
diff --git a/crates/snot/src/cannon/net.rs b/crates/snot/src/cannon/net.rs
new file mode 100644
index 00000000..6edb20e9
--- /dev/null
+++ b/crates/snot/src/cannon/net.rs
@@ -0,0 +1,7 @@
+use std::net::{Ipv4Addr, SocketAddrV4, TcpListener};
+
+/// Get an available port on the local machine.
+pub fn get_available_port() -> Option<u16> {
+    let addr = SocketAddrV4::new(Ipv4Addr::LOCALHOST, 0);
+    Some(TcpListener::bind(addr).ok()?.local_addr().ok()?.port())
+}
diff --git a/crates/snot/src/cannon/router.rs b/crates/snot/src/cannon/router.rs
new file mode 100644
index 00000000..239b4b2d
--- /dev/null
+++ b/crates/snot/src/cannon/router.rs
@@ -0,0 +1,68 @@
+use axum::{
+    extract::{Path, State},
+    response::{IntoResponse, Response},
+    routing::{get, post},
+    Json, Router,
+};
+use reqwest::StatusCode;
+use serde_json::json;
+
+use crate::state::AppState;
+
+pub(crate) fn redirect_cannon_routes() -> Router<AppState> {
+    Router::new()
+        .route("/:env/:id/mainnet/latest/stateRoot", get(state_root))
+        .route("/:env/:id/mainnet/transaction/broadcast", post(transaction))
+}
+
+async fn state_root(
+    Path((env_id, cannon_id)): Path<(usize, usize)>,
+    state: State<AppState>,
+) -> Response {
+    let Some(env) = ({
+        let env = state.envs.read().await;
+        env.get(&env_id).cloned()
+    }) else {
+        return StatusCode::NOT_FOUND.into_response();
+    };
+
+    let Some(cannon) = env.cannons.get(&cannon_id) else {
+        return StatusCode::NOT_FOUND.into_response();
+    };
+
+    match cannon.proxy_state_root().await {
+        // the nodes expect this state root to be string escaped json
+        Ok(root) => Json(json!(root)).into_response(),
+        Err(e) => (
+            StatusCode::INTERNAL_SERVER_ERROR,
+            Json(json!({ "error": format!("{e}")})),
+        )
+            .into_response(),
+    }
+}
+
+async fn transaction(
+    Path((env_id, cannon_id)): Path<(usize, usize)>,
+    state: State<AppState>,
+    body: String,
+) -> Response {
+    let Some(env) = ({
+        let env = state.envs.read().await;
+        env.get(&env_id).cloned()
+    }) else {
+        return StatusCode::NOT_FOUND.into_response();
+    };
+
+    let Some(cannon) = env.cannons.get(&cannon_id) else {
+        return StatusCode::NOT_FOUND.into_response();
+    };
+
+    match cannon.proxy_broadcast(body) {
+        Ok(_) => StatusCode::OK.into_response(),
+        Err(e) => (
+            StatusCode::INTERNAL_SERVER_ERROR,
+            Json(json!({ "error": format!("{e}")})),
+        )
+            .into_response(),
+    }
+}
diff --git a/crates/snot/src/cannon/sink.rs b/crates/snot/src/cannon/sink.rs
new file mode 100644
index 00000000..688618d4
--- /dev/null
+++ b/crates/snot/src/cannon/sink.rs
@@ -0,0 +1,35 @@
+use serde::Deserialize;
+
+use crate::schema::NodeTargets;
+
+#[derive(Clone, Debug, Deserialize)]
+pub enum TxSink {
+    /// Write transactions to a file
+    Record {
+        /// filename for the recording txs list
+        name: String,
+    },
+    //// Write transactions to a ledger query service
+    // AoTAppend {
+    //     // information for running .. another ledger service
+    //     // solely for appending blocks to a ledger...
+    //     // storage_id: usize,
+    //     // port: u16,
+    //     /// Number of transactions per block
+    //     tx_per_block: u32,
+    // },
+    /// Send transactions to nodes in an env
+    RealTime {
+        /// The nodes to send transactions to
+        ///
+        /// Requires cannon to have an associated env_id
+        target: NodeTargets,
+
+        /// How long between each burst of transactions
+        burst_delay_ms: u32,
+        /// How many transactions to fire off in each burst
+        tx_per_burst: u32,
+        /// How long between each transaction in a burst
+        tx_delay_ms: u32,
+    },
+}
diff --git a/crates/snot/src/cannon/source.rs b/crates/snot/src/cannon/source.rs
new file mode 100644
index 00000000..f55f78e1
--- /dev/null
+++ b/crates/snot/src/cannon/source.rs
@@ -0,0 +1,120 @@
+use std::collections::HashSet;
+
+use anyhow::{anyhow, Result};
+use serde::Deserialize;
+use snot_common::state::NodeKey;
+
+use crate::schema::nodes::KeySource;
+
+use super::net::get_available_port;
+
+/// Represents an instance of a local query service.
+#[derive(Clone, Debug, Deserialize)]
+pub struct LocalQueryService {
+    /// Ledger & genesis block to use
+    // pub storage_id: usize,
+    /// port to host the service on (needs to be unused by other cannons and services)
+    /// this port will be used when forwarding requests to the local query service
+    // pub port: u16,
+
+    // TODO debate this
+    /// An optional node to sync blocks from...
+    /// necessary for private tx mode in realtime mode as this will have to
+    /// sync from a node that has a valid ledger
+    ///
+    /// When present, the cannon will update the ledger service from this node.
+    /// If the node is out of sync, it will corrupt the ledger...
+    ///
+    /// requires cannon to have an associated env_id
+    pub sync_from: Option<NodeKey>,
+}
+
+impl LocalQueryService {
+    // TODO: cache this when sync_from is false
+    /// Fetch the state root from the local query service
+    /// (non-cached)
+    pub async fn get_state_root(&self, port: u16) -> Result<String> {
+        let url = format!("http://127.0.0.1:{}/mainnet/latest/stateRoot", port);
+        let response = reqwest::get(&url).await?;
+        Ok(response.text().await?)
+    }
+}
+
+/// Used to determine the redirection for the following paths:
+/// /cannon/<id>/mainnet/latest/stateRoot
+/// /cannon/<id>/mainnet/transaction/broadcast
+#[derive(Clone, Debug, Deserialize)]
+pub enum LedgerQueryService {
+    /// Use the local ledger query service
+    Local(LocalQueryService),
+    /// Target a specific node (probably over rpc instead of reqwest lol...)
+    ///
+    /// Requires cannon to have an associated env_id
+    Node(NodeKey),
+}
+
+/// Which service is providing the compute power for executing transactions
+#[derive(Default, Clone, Debug, Deserialize)]
+pub enum ComputeTarget {
+    /// Use the agent pool to generate executions
+    #[default]
+    AgentPool,
+    /// Use Demox's API to generate executions
+    Demox,
+}
+
+#[derive(Clone, Debug, Hash, PartialEq, Eq, Deserialize)]
+pub enum CreditsTxMode {
+    BondPublic,
+    UnbondPublic,
+    TransferPublic,
+    TransferPublicToPrivate,
+    // cannot run these in aot mode
+    TransferPrivate,
+    TransferPrivateToPublic,
+}
+
+#[derive(Clone, Debug, Hash, PartialEq, Eq, Deserialize)]
+pub enum TxMode {
+    Credits(CreditsTxMode),
+    // TODO: Program(program, func, input types??)
+}
+
+#[derive(Clone, Debug, Deserialize)]
+pub enum TxSource {
+    /// Read transactions from a file
+    Playback {
+        // filename from the storage for the tx list
+        name: String,
+    },
+    /// Generate transactions in real time
+    RealTime {
+        query: LedgerQueryService,
+        compute: ComputeTarget,
+
+        /// defaults to TransferPublic
+        tx_modes: HashSet<TxMode>,
+
+        /// private keys for making transactions
+        /// defaults to committee keys
+        private_keys: Vec<KeySource>,
+        /// addresses for transaction targets
+        /// defaults to committee addresses
+        addresses: Vec<KeySource>,
+    },
+}
+
+impl TxSource {
+    /// Get an available port for the query service if applicable
+    pub fn get_query_port(&self) -> Result<Option<u16>> {
+        matches!(
+            self,
+            TxSource::RealTime {
+                query: LedgerQueryService::Local(_),
+                ..
+            }
+        )
+        .then(|| get_available_port().ok_or(anyhow!("could not get an available port")))
+        .transpose()
+    }
+}
diff --git a/crates/snot/src/main.rs b/crates/snot/src/main.rs
index 3ea56a2a..b27461e9 100644
--- a/crates/snot/src/main.rs
+++ b/crates/snot/src/main.rs
@@ -5,6 +5,7 @@ use cli::Cli;
 use tracing::level_filters::LevelFilter;
 use tracing_subscriber::prelude::*;
 
+pub mod cannon;
 pub mod cli;
 pub mod schema;
 pub mod server;
diff --git a/crates/snot/src/schema/cannon.rs b/crates/snot/src/schema/cannon.rs
new file mode 100644
index 00000000..3d0ff4d5
--- /dev/null
+++ b/crates/snot/src/schema/cannon.rs
@@ -0,0 +1,13 @@
+use serde::Deserialize;
+
+use crate::cannon::{sink::TxSink, source::TxSource};
+
+/// A document describing a transaction cannon for a test.
+#[derive(Deserialize, Debug, Clone)]
+pub struct Document {
+    pub name: String,
+    pub description: Option<String>,
+
+    pub source: TxSource,
+    pub sink: TxSink,
+}
diff --git a/crates/snot/src/schema/mod.rs b/crates/snot/src/schema/mod.rs
index fef59d3f..9a504717 100644
--- a/crates/snot/src/schema/mod.rs
+++ b/crates/snot/src/schema/mod.rs
@@ -9,6 +9,7 @@ use serde::{
     de::{Error, Visitor},
     Deserialize, Deserializer,
 };
 use snot_common::state::{NodeKey, NodeType};
 use wildmatch::WildMatch;
 
+pub mod cannon;
 pub mod infrastructure;
 pub mod nodes;
 pub mod outcomes;
@@ -38,6 +39,9 @@ pub enum ItemDocument {
 
     #[serde(rename = "outcomes.snarkos.testing.monadic.us/v1")]
     Outcomes(Box),
+
+    #[serde(rename = "cannon.snarkos.testing.monadic.us/v1")]
+    Cannon(Box<cannon::Document>),
 }
 
 /// One or more deserialized node targets. Composed of one or more
diff --git a/crates/snot/src/schema/storage.rs b/crates/snot/src/schema/storage.rs
index 14ade997..4137a219 100644
--- a/crates/snot/src/schema/storage.rs
+++ b/crates/snot/src/schema/storage.rs
@@ -3,7 +3,10 @@ use std::{
     ops::Deref,
     path::PathBuf,
     process::Stdio,
-    sync::atomic::{AtomicUsize, Ordering},
+    sync::{
+        atomic::{AtomicUsize, Ordering},
+        Arc,
+    },
 };
 
 use anyhow::{anyhow, ensure};
@@ -28,6 +31,7 @@ pub struct Document {
     #[serde(default)]
     pub prefer_existing: bool,
     pub generate: Option<StorageGeneration>,
+    pub connect: Option<Url>,
 }
 
 /// Data generation instructions.
@@ -42,10 +46,19 @@ pub struct StorageGeneration {
     #[serde(default)]
     pub ledger: LedgerGeneration,
+    #[serde(default)]
+    pub accounts: Vec<Accounts>,
+
     #[serde(default)]
     pub transactions: Vec<Transaction>,
 }
 
+#[derive(Deserialize, Debug, Clone)]
+pub struct Accounts {
+    pub file: PathBuf,
+    pub total: u64,
+}
+
 // TODO: I don't know what this type should look like
 #[derive(Deserialize, Debug, Clone)]
 pub struct Transaction {
@@ -155,7 +168,7 @@ impl From for String {
 }
 
 impl Document {
-    pub async fn prepare(&self, state: &GlobalState) -> anyhow::Result<usize> {
+    pub async fn prepare(self, state: &GlobalState) -> anyhow::Result<Arc<LoadedStorage>> {
         static STORAGE_ID_INT: AtomicUsize = AtomicUsize::new(0);
 
         let id = String::from(self.id.clone());
@@ -174,7 +187,7 @@ impl Document {
 
         // TODO: respect self.prefer_existing
 
-        match self.generate.clone() {
+        match self.generate {
             // generate the block and ledger if we have generation params
             Some(generation) => 'generate: {
                 // warn if an existing block/ledger already exists
@@ -193,28 +206,38 @@ impl Document {
                         .join("../../target/release/snarkos-aot"),
                 );
                 let output = base.join(&generation.genesis.output);
-                let res = Command::new(bin)
-                    .stdout(Stdio::inherit())
-                    .stderr(Stdio::inherit())
-                    .arg("genesis")
-                    .arg("--output")
-                    .arg(&output)
-                    .arg("--committee-size")
-                    .arg(generation.genesis.committee.to_string())
-                    .arg("--committee-output")
-                    .arg(base.join("committee.json"))
-                    .arg("--additional-accounts")
-                    .arg(generation.genesis.additional_accounts.to_string())
-                    .arg("--additional-accounts-output")
-                    .arg(base.join("accounts.json"))
-                    .arg("--ledger")
-                    .arg(base.join("ledger"))
-                    .spawn()?
-                    .wait()
-                    .await?;
-                if !res.success() {
-                    warn!("failed to run genesis generation command...");
+                match self.connect {
+                    Some(url) => {
+                        let res = reqwest::get(url).await?.error_for_status()?.bytes().await?;
+
+                        tokio::fs::write(&output, res).await?;
+                    }
+                    None => {
+                        let res = Command::new(bin)
+                            .stdout(Stdio::inherit())
+                            .stderr(Stdio::inherit())
+                            .arg("genesis")
+                            .arg("--output")
+                            .arg(&output)
+                            .arg("--committee-size")
+                            .arg(generation.genesis.committee.to_string())
+                            .arg("--committee-output")
+                            .arg(base.join("committee.json"))
+                            .arg("--additional-accounts")
+                            .arg(generation.genesis.additional_accounts.to_string())
+                            .arg("--additional-accounts-output")
+                            .arg(base.join("accounts.json"))
+                            .arg("--ledger")
+                            .arg(base.join("ledger"))
+                            .spawn()?
+                            .wait()
+                            .await?;
+
+                        if !res.success() {
+                            warn!("failed to run genesis generation command...");
+                        }
+                    }
                 }
 
                 if tokio::fs::try_exists(&output).await.is_err() {
@@ -290,18 +313,16 @@ impl Document {
         let int_id = STORAGE_ID_INT.fetch_add(1, Ordering::Relaxed);
         storage_lock.insert(int_id, id.to_owned());
 
+        let storage = Arc::new(LoadedStorage {
+            id: id.to_owned(),
+            path: base.clone(),
+            committee: read_to_addrs(pick_commitee_addr, base.join("committee.json")).await?,
+            accounts,
+        });
         let mut storage_lock = state.storage.write().await;
-        storage_lock.insert(
-            int_id,
-            LoadedStorage {
-                id: id.to_owned(),
-                path: base.clone(),
-                committee: read_to_addrs(pick_commitee_addr, base.join("committee.json")).await?,
-                accounts,
-            },
-        );
+        storage_lock.insert(int_id, storage.clone());
 
-        Ok(int_id)
+        Ok(storage)
     }
 }
diff --git a/crates/snot/src/schema/timeline.rs b/crates/snot/src/schema/timeline.rs
index 76192a37..bc577a58 100644
--- a/crates/snot/src/schema/timeline.rs
+++ b/crates/snot/src/schema/timeline.rs
@@ -1,10 +1,13 @@
-use std::{fmt, path::PathBuf, time::Duration};
+use std::{fmt, time::Duration};
 
 use indexmap::IndexMap;
 use serde::{
     de::{Error, Visitor},
     Deserialize, Deserializer,
 };
+use snot_common::state::NodeKey;
+
+use super::NodeTargets;
 
 /// A document describing a test's event timeline.
 #[derive(Deserialize, Debug, Clone)]
@@ -42,7 +45,7 @@ pub enum Action {
     /// Update the given nodes to an offline state
     Offline(NodeTarget),
     /// Fire transactions from a source file at a target node
-    Cannon(Vec<TxCannon>),
+    Cannon(Vec<SpawnCannon>),
     /// Set the height of some nodes' ledgers
     Height(IndexMap),
 }
@@ -175,9 +178,11 @@ impl<'de> Deserialize<'de> for EventDuration {
 }
 
 #[derive(Deserialize, Debug, Clone)]
-pub struct TxCannon {
-    pub target: String,
-    pub source: PathBuf,
-    pub total: u64,
-    pub tps: u32,
+pub struct SpawnCannon {
+    pub name: String,
+    pub count: u64,
+    /// overwrite the query's source node
+    pub query: Option<NodeKey>,
+    /// overwrite the cannon sink target
+    pub target: Option<NodeTargets>,
 }
diff --git a/crates/snot/src/server/api.rs b/crates/snot/src/server/api.rs
index b15a1b56..a90f372a 100644
--- a/crates/snot/src/server/api.rs
+++ b/crates/snot/src/server/api.rs
@@ -5,12 +5,12 @@ use axum::{
     routing::{delete, get, post},
     Json, Router,
 };
-use serde::{Deserialize, Serialize};
+use serde::Deserialize;
 use serde_json::json;
 use snot_common::rpc::agent::AgentMetric;
 
 use super::AppState;
-use crate::testing::Test;
+use crate::testing::Environment;
 
 pub(super) fn routes() -> Router<AppState> {
     Router::new()
@@ -73,7 +73,7 @@ async fn get_agent_tps(state: State, Path(id): Path) -> Respons
 
 async fn post_test_prepare(state: State<AppState>, body: String) -> Response {
-    let documents = match Test::deserialize(&body) {
+    let documents = match Environment::deserialize(&body) {
         Ok(documents) => documents,
         Err(e) => {
             return (
@@ -89,10 +89,8 @@ async fn post_test_prepare(state: State, body: String) -> Response {
 
     // TODO: clean up existing test
-    // TODO: support concurrent tests + return test id
-
-    match Test::prepare(documents, &state).await {
-        Ok(test_id) => (StatusCode::OK, Json(json!({ "id": test_id }))).into_response(),
+    match Environment::prepare(documents, &state).await {
+        Ok(env_id) => (StatusCode::OK, Json(json!({ "id": env_id }))).into_response(),
         Err(e) => (
             StatusCode::INTERNAL_SERVER_ERROR,
             Json(json!({ "error": format!("{e}") })),
@@ -102,10 +100,10 @@ async fn post_test_prepare(state: State, body: String) -> Response {
 }
 
 async fn delete_test(
-    Path(test_id): Path<usize>,
+    Path(env_id): Path<usize>,
     State(state): State<AppState>,
 ) -> impl IntoResponse {
-    match Test::cleanup(&test_id, &state).await {
+    match Environment::cleanup(&env_id, &state).await {
         Ok(_) => StatusCode::OK.into_response(),
         Err(e) => (
             StatusCode::INTERNAL_SERVER_ERROR,
diff --git a/crates/snot/src/server/mod.rs b/crates/snot/src/server/mod.rs
index dd8ba736..84240b44 100644
--- a/crates/snot/src/server/mod.rs
+++ b/crates/snot/src/server/mod.rs
@@ -1,15 +1,14 @@
-use std::{sync::Arc, time::Duration};
+use std::sync::Arc;
 
 use ::jwt::VerifyWithKey;
 use anyhow::Result;
 use axum::{
-    body::Body,
     extract::{
         ws::{Message, WebSocket},
         State, WebSocketUpgrade,
     },
-    http::{HeaderMap, Request},
-    response::{IntoResponse, Response},
+    http::HeaderMap,
+    response::IntoResponse,
     routing::get,
     Router,
 };
@@ -22,13 +21,14 @@ use surrealdb::Surreal;
 use tarpc::server::Channel;
 use tokio::select;
 use tower_http::trace::{DefaultMakeSpan, TraceLayer};
-use tracing::{info, warn, Span};
+use tracing::{info, warn};
 
 use self::{
     jwt::{Claims, JWT_NONCE, JWT_SECRET},
     rpc::ControlRpcServer,
 };
 use crate::{
+    cannon::router::redirect_cannon_routes,
     cli::Cli,
     server::rpc::{MuxedMessageIncoming, MuxedMessageOutgoing},
     state::{Agent, AppState, GlobalState},
@@ -49,27 +49,27 @@ pub async fn start(cli: Cli) -> Result<()> {
         pool: Default::default(),
         storage_ids: Default::default(),
         storage: Default::default(),
-        tests_counter: Default::default(),
-        tests: Default::default(),
+        envs_counter: Default::default(),
+        envs: Default::default(),
     };
 
-    let app =
-        Router::new()
-            .route("/agent", get(agent_ws_handler))
-            .nest("/api/v1", api::routes())
-            // /env/<id>/ledger/* - ledger query service reverse proxying /mainnet/latest/stateRoot
-            .nest("/content", content::init_routes(&state).await)
-            .with_state(Arc::new(state))
-            .layer(
-                TraceLayer::new_for_http()
-                    .make_span_with(DefaultMakeSpan::new().include_headers(true)), /* .on_request(|request: &Request<Body>, _span: &Span| {
-                                                                                    * tracing::info!("req {} - {}", request.method(), request.uri());
-                                                                                    * })
-                                                                                    * .on_response(|response: &Response, _latency: Duration, span: &Span| {
-                                                                                    * span.record("status_code", &tracing::field::display(response.status()));
-                                                                                    * tracing::info!("res {}", response.status())
-                                                                                    * }), */
-            );
+    let app = Router::new()
+        .route("/agent", get(agent_ws_handler))
+        .nest("/api/v1", api::routes())
+        // /env/<id>/ledger/* - ledger query service reverse proxying /mainnet/latest/stateRoot
+        .nest("/content", content::init_routes(&state).await)
+        .nest("/cannons", redirect_cannon_routes())
+        .with_state(Arc::new(state))
+        .layer(
+            TraceLayer::new_for_http().make_span_with(DefaultMakeSpan::new().include_headers(true)),
+            //.on_request(|request: &Request<Body>, _span: &Span| {
+            //    tracing::info!("req {} - {}", request.method(), request.uri());
+            //})
+            //.on_response(|response: &Response, _latency: Duration, span: &Span| {
+            //    span.record("status_code", &tracing::field::display(response.status()));
+            //    tracing::info!("res {}", response.status())
+            //}),
+        );
 
     let listener = tokio::net::TcpListener::bind("0.0.0.0:1234").await?;
     axum::serve(listener, app).await?;
diff --git a/crates/snot/src/state.rs b/crates/snot/src/state.rs
index 74be5924..031a54d9 100644
--- a/crates/snot/src/state.rs
+++ b/crates/snot/src/state.rs
@@ -23,7 +23,7 @@ use crate::{
     cli::Cli,
     schema::storage::LoadedStorage,
     server::jwt::{Claims, JWT_NONCE, JWT_SECRET},
-    testing::Test,
+    testing::Environment,
 };
 
 pub type AgentId = usize;
@@ -38,10 +38,10 @@ pub struct GlobalState {
     pub pool: RwLock<HashMap<AgentId, Agent>>,
     /// A map from ephemeral integer storage ID to actual storage ID.
     pub storage_ids: RwLock>,
-    pub storage: RwLock<HashMap<usize, LoadedStorage>>,
+    pub storage: RwLock<HashMap<usize, Arc<LoadedStorage>>>,
 
-    pub tests_counter: AtomicUsize,
-    pub tests: RwLock<HashMap<usize, Test>>,
+    pub envs_counter: AtomicUsize,
+    pub envs: RwLock<HashMap<usize, Arc<Environment>>>,
 }
 
 /// This is the representation of a public addr or a list of internal addrs.
@@ -183,6 +183,10 @@ impl AgentClient {
     pub async fn reconcile(&self, to: AgentState) -> Result<Result<(), ReconcileError>, RpcError> {
         self.0.reconcile(context::current(), to).await
     }
+
+    pub async fn get_state_root(&self) -> Result<String, anyhow::Error> {
+        Ok(self.0.get_state_root(context::current()).await??)
+    }
 }
 
 #[derive(Debug, Clone)]
@@ -255,4 +259,14 @@ impl GlobalState {
             })
             .collect()
     }
+
+    /// Look up an rpc client by agent id.
+    /// Locks the pool for reading
+    pub async fn get_client(&self, id: AgentId) -> Option<AgentClient> {
+        self.pool
+            .read()
+            .await
+            .get(&id)
+            .and_then(|a| a.client_owned())
+    }
 }
diff --git a/crates/snot/src/testing.rs b/crates/snot/src/testing.rs
index 94234357..ba949a62 100644
--- a/crates/snot/src/testing.rs
+++ b/crates/snot/src/testing.rs
@@ -1,4 +1,11 @@
-use std::{fmt::Display, sync::atomic::Ordering};
+use std::{
+    collections::HashMap,
+    fmt::Display,
+    sync::{
+        atomic::{AtomicU32, AtomicUsize, Ordering},
+        Arc,
+    },
+};
 
 use anyhow::{anyhow, bail, ensure};
 use bimap::{BiHashMap, BiMap};
@@ -9,24 +16,34 @@ use snot_common::state::{AgentId, AgentPeer, AgentState, NodeKey};
 use tracing::{info, warn};
 
 use crate::{
+    cannon::{sink::TxSink, source::TxSource, CannonInstance},
     schema::{
         nodes::{ExternalNode, Node},
+        storage::LoadedStorage,
         ItemDocument, NodeTargets,
     },
     state::GlobalState,
 };
 
-#[derive(Debug, Clone)]
-pub struct Test {
-    pub storage_id: usize,
-    pub node_map: BiMap<NodeKey, TestPeer>,
-    pub initial_nodes: IndexMap<NodeKey, TestNode>,
-    // TODO: GlobalStorage.storage should maybe be here instead
+#[derive(Debug)]
+pub struct Environment {
+    pub storage: Arc<LoadedStorage>,
+    pub node_map: BiMap<NodeKey, EnvPeer>,
+    pub initial_nodes: IndexMap<NodeKey, EnvNode>,
+
+    /// Map of transaction files to their respective counters
+    pub transaction_counters: HashMap<String, AtomicU32>,
+    /// Map of cannon names to their cannon configurations
+    pub cannon_configs: HashMap<String, (TxSource, TxSink)>,
+    /// Used to generate the id of the next cannon.
+    pub cannons_counter: AtomicUsize,
+    /// Map of cannon ids to their cannon instances
+    pub cannons: HashMap<usize, CannonInstance>,
 }
 
 #[derive(Debug, Clone)]
 /// The effective test state of a node.
-pub enum TestNode {
+pub enum EnvNode {
     Internal(Node),
     External(ExternalNode),
 }
@@ -35,21 +52,21 @@ pub enum TestNode {
 /// A way of looking up a peer in the test state.
 /// Could technically use AgentPeer like this but it would have needless port
 /// information
-pub enum TestPeer {
+pub enum EnvPeer {
     Internal(AgentId),
     External,
 }
 
-impl Display for TestPeer {
+impl Display for EnvPeer {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         match self {
-            TestPeer::Internal(id) => write!(f, "agent {id}"),
-            TestPeer::External => write!(f, "external node"),
+            EnvPeer::Internal(id) => write!(f, "agent {id}"),
+            EnvPeer::External => write!(f, "external node"),
         }
     }
 }
 
-impl Test {
+impl Environment {
     /// Deserialize (YAML) many documents into a `Vec` of documents.
     pub fn deserialize(str: &str) -> Result<Vec<ItemDocument>, anyhow::Error> {
         serde_yaml::Deserializer::from_str(str)
@@ -62,26 +79,31 @@ impl Test {
     /// Prepare a test. This will set the current test on the GlobalState.
     ///
-    /// **This will error if the current test is not unset before calling to
+    /// **This will error if the current env is not unset before calling to
     /// ensure tests are properly cleaned up.**
     pub async fn prepare(
         documents: Vec<ItemDocument>,
         state: &GlobalState,
    ) -> anyhow::Result<usize> {
-        let mut state_lock = state.tests.write().await;
+        let mut state_lock = state.envs.write().await;
 
-        let mut storage_id = None;
+        let mut storage = None;
         let mut node_map = BiHashMap::default();
         let mut initial_nodes = IndexMap::default();
+        let mut cannon_configs = HashMap::new();
 
         for document in documents {
             match document {
-                ItemDocument::Storage(storage) => {
-                    let int_id = storage.prepare(state).await?;
-                    if storage_id.is_none() {
-                        storage_id = Some(int_id);
+                ItemDocument::Storage(doc) => {
+                    if storage.is_none() {
+                        storage = Some(doc.prepare(state).await?);
+                    } else {
+                        bail!("multiple storage documents found in env")
                     }
                 }
+                ItemDocument::Cannon(cannon) => {
+                    cannon_configs.insert(cannon.name.to_owned(), (cannon.source, cannon.sink));
+                }
                 ItemDocument::Nodes(nodes) => {
                     // flatten replicas
                     for (doc_node_key, mut doc_node) in nodes.nodes {
@@ -109,7 +131,7 @@ impl Test {
                                 if let Some(key) = node.key.take() {
                                     node.key = Some(key.with_index(i))
                                 }
-                                ent.insert(TestNode::Internal(node))
+                                ent.insert(EnvNode::Internal(node))
                             }
                         };
                     }
@@ -136,7 +158,7 @@ impl Test {
                         initial_nodes
                             .keys()
                             .cloned()
-                            .zip(available_agent.map(|agent| TestPeer::Internal(agent.id()))),
+                            .zip(available_agent.map(|agent| EnvPeer::Internal(agent.id()))),
                     );
 
                     info!("delegated {} nodes to agents", node_map.len());
@@ -149,7 +171,7 @@ impl Test {
                     for (node_key, node) in &nodes.external {
                         match initial_nodes.entry(node_key.clone()) {
                             Entry::Occupied(ent) => bail!("duplicate node key: {}", ent.key()),
-                            Entry::Vacant(ent) => ent.insert(TestNode::External(node.to_owned())),
+                            Entry::Vacant(ent) => ent.insert(EnvNode::External(node.to_owned())),
                         };
                     }
                     node_map.extend(
@@ -157,7 +179,7 @@ impl Test {
                             .external
                             .keys()
                             .cloned()
-                            .map(|k| (k, TestPeer::External)),
+                            .map(|k| (k, EnvPeer::External)),
                     )
                 }
 
@@ -165,40 +187,44 @@ impl Test {
             }
         }
 
-        let test = Test {
-            storage_id: storage_id.ok_or_else(|| anyhow!("test is missing storage document"))?,
+        let env = Environment {
+            storage: storage.ok_or_else(|| anyhow!("env is missing storage document"))?,
             node_map,
             initial_nodes,
+            transaction_counters: Default::default(),
+            cannon_configs,
+            cannons_counter: Default::default(),
+            cannons: Default::default(),
         };
 
-        let test_id = state.tests_counter.fetch_add(1, Ordering::Relaxed);
-        state_lock.insert(test_id, test);
+        let env_id = state.envs_counter.fetch_add(1, Ordering::Relaxed);
+        state_lock.insert(env_id, Arc::new(env));
         drop(state_lock);
 
         // reconcile the nodes
-        initial_reconcile(&test_id, state).await?;
+        initial_reconcile(&env_id, state).await?;
 
-        Ok(test_id)
+        Ok(env_id)
     }
 
     pub async fn cleanup(id: &usize, state: &GlobalState) -> anyhow::Result<()> {
-        // clear the test state
-        info!("clearing test {id} state...");
-        let Some(test) = ({
-            let mut state_lock = state.tests.write().await;
+        // clear the env state
+        info!("clearing env {id} state...");
+        let Some(env) = ({
+            let mut state_lock = state.envs.write().await;
             state_lock.remove(id)
         }) else {
-            bail!("test {id} not found")
+            bail!("env {id} not found")
         };
 
         // reconcile all online agents
         let (ids, handles): (Vec<_>, Vec<_>) = {
             let agents = state.pool.read().await;
-            test.node_map
+            env.node_map
                 .right_values()
-                // find all agents associated with the test
+                // find all agents associated with the env
                 .filter_map(|peer| match peer {
-                    TestPeer::Internal(id) => agents.get(id),
+                    EnvPeer::Internal(id) => agents.get(id),
                     _ => None,
                 })
                 // map the agents to rpc clients
@@ -243,6 +269,14 @@ impl Test {
 
         Ok(())
     }
+
+    /// Look up an env agent id by node key.
+    pub fn get_agent_by_key(&self, key: &NodeKey) -> Option<AgentId> {
+        self.node_map.get_by_left(key).and_then(|id| match id {
+            EnvPeer::Internal(id) => Some(*id),
+            EnvPeer::External => None,
+        })
+    }
 }
 
 /// Reconcile all associated nodes with their initial state.
@@ -250,27 +284,17 @@ pub async fn initial_reconcile(id: &usize, state: &GlobalState) -> anyhow::Resul
     let mut handles = vec![];
     let mut agent_ids = vec![];
     {
-        let tests_lock = state.tests.read().await;
-        let test = tests_lock
-            .get(id)
-            .ok_or_else(|| anyhow!("test not found"))?;
-
-        // get the numeric storage ID from the string storage ID
-        let storage_id = test.storage_id;
-
-        // obtain the actual storage
-        let Some(storage) = state.storage.read().await.get(&storage_id).cloned() else {
-            bail!("test {id} storage {storage_id} not found...")
-        };
+        let envs_lock = state.envs.read().await;
+        let env = envs_lock.get(id).ok_or_else(|| anyhow!("env not found"))?;
 
         let pool_lock = state.pool.read().await;
 
         // Lookup agent peers given a node key
-        let node_to_agent = |key: &NodeKey, node: &TestPeer, is_validator: bool| {
+        let node_to_agent = |key: &NodeKey, node: &EnvPeer, is_validator: bool| {
             // get the internal agent ID from the node key
             match node {
                 // internal peers are mapped to internal agents
-                TestPeer::Internal(id) => {
+                EnvPeer::Internal(id) => {
                     let Some(agent) = pool_lock.get(id) else {
                         bail!("agent {id} not found in pool")
                     };
@@ -285,8 +309,8 @@ pub async fn initial_reconcile(id: &usize, state: &GlobalState) -> anyhow::Resul
                     ))
                 }
                 // external peers are mapped to external nodes
-                TestPeer::External => {
-                    let Some(TestNode::External(external)) = test.initial_nodes.get(key) else {
+                EnvPeer::External => {
+                    let Some(EnvNode::External(external)) = env.initial_nodes.get(key) else {
                         bail!("external node with key {key} not found")
                     };
 
@@ -318,24 +342,24 @@ pub async fn initial_reconcile(id: &usize, state: &GlobalState) -> anyhow::Resul
             // alternatively, use a more efficient data structure for
             // storing node keys
-            test.node_map
+            env.node_map
                 .iter()
                 .filter(|(k, _)| *k != key && target.matches(k))
                 .map(|(k, v)| node_to_agent(k, v, is_validator))
                 .collect()
         };
 
-        for (key, node) in &test.initial_nodes {
-            let TestNode::Internal(node) = node else {
+        for (key, node) in &env.initial_nodes {
+            let EnvNode::Internal(node) = node else {
                 continue;
            };
 
            // get the internal agent ID from the node key
-            let Some(TestPeer::Internal(id)) = test.node_map.get_by_left(key) else {
+            let Some(id) = env.get_agent_by_key(key) else {
                 bail!("expected internal agent peer for node with key {key}")
            };
 
-            let Some(client) = pool_lock.get(id).and_then(|a| a.client_owned()) else {
+            let Some(client) = pool_lock.get(&id).and_then(|a| a.client_owned()) else {
                 continue;
            };
 
@@ -344,12 +368,12 @@ pub async fn initial_reconcile(id: &usize, state: &GlobalState) -> anyhow::Resul
             node_state.private_key = node
                 .key
                 .as_ref()
-                .and_then(|key| storage.lookup_keysource(key));
+                .and_then(|key| env.storage.lookup_keysource(key));
 
             node_state.peers = matching_nodes(key, &node.peers, false)?;
             node_state.validators = matching_nodes(key, &node.validators, true)?;
 
-            let agent_state = AgentState::Node(storage_id, node_state);
-            agent_ids.push(*id);
+            let agent_state = AgentState::Node(id, node_state);
+            agent_ids.push(id);
             handles.push(tokio::spawn(async move {
                 client
                     .reconcile(agent_state.clone())
diff --git a/specs/test-1-validators.yaml b/specs/test-1-validators.yaml
index 417d6125..71612908 100644
--- a/specs/test-1-validators.yaml
+++ b/specs/test-1-validators.yaml
@@ -12,7 +12,7 @@ generate:
 
 ---
 version: nodes.snarkos.testing.monadic.us/v1
-name: 4-validators
+name: 1-validator
 
 nodes:
   # validator/test:
diff --git a/specs/test-4-clients-canary.yaml b/specs/test-4-clients-canary.yaml
new file mode 100644
index 00000000..efac6c2a
--- /dev/null
+++ b/specs/test-4-clients-canary.yaml
@@ -0,0 +1,37 @@
+---
+version: storage.snarkos.testing.monadic.us/v1
+
+id: canary-clients
+name: canary-clients
+
+fetch: https://pub-d74d58a8616c4d54bc1a948b4d001970.r2.dev/block.genesis
+
+generate:
+  path: ./tests/canary-clients
+
+  accounts:
+    - file: "accounts.json"
+      total: 5
+
+---
+version: nodes.snarkos.testing.monadic.us/v1
+name: 4-clients-canary
+
+external:
+  validator/1@canarynet:
+    node: 11.12.13.14:4130
+
+  validator/2@canarynet:
+    node: 11.12.13.14:4130
+
+  client/1@canarynet:
+    node: 11.12.13.14:4130
+
+nodes:
+  client/test:
+    key: accounts.$
+    replicas: 4
+    height: 0
+    validators: []
+    peers: ["*/*@canarynet"]
+
diff --git a/specs/test-4-validators.yaml b/specs/test-4-validators.yaml
index 2e77ad50..7385dc92 100644
--- a/specs/test-4-validators.yaml
+++ b/specs/test-4-validators.yaml
@@ -10,7 +10,7 @@ name: 4-validators
 
 nodes:
   validator/test:
-    replicas: 4
+    replicas: 2
     key: committee.$
     height: 0
     validators: [validator/*]
diff --git a/test-spec.yaml b/test-spec.yaml
index 8c9ab1ed..0c396482 100644
--- a/test-spec.yaml
+++ b/test-spec.yaml
@@ -6,8 +6,14 @@ name: private tx, 500 rounds, 4 accounts
 description: |
   This ledger was built to test private transactions
 
+fetch: https://example.com/genesis.block
+
 # instructions for generating this test
 generate:
+  accounts:
+    - file: accounts.json
+      total: 5
+
   # genesis generation
   genesis:
     seed: 0
@@ -15,6 +21,7 @@ name: private tx, 500 rounds, 4 accounts
     committee-balances: 10_000_000_000_000
     additional-accounts: 10
     additional-balances: 100_000_000_000
+
   # ledger setup
   ledger:
     blocks: 100
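Reviewer note: none of the spec files in this diff exercise the new `cannon.snarkos.testing.monadic.us/v1` document, so here is a rough sketch of how one could look, built only from the `Document`, `TxSource`, and `TxSink` types added above. The `!Playback`/`!RealTime` tag spelling assumes serde_yaml's default externally tagged enum representation, and the file name and target pattern are made up for illustration.

```yaml
---
version: cannon.snarkos.testing.monadic.us/v1

name: replay-cannon
description: replay a recorded transaction list against the env's nodes

# read transactions from a file in the test storage (TxSource::Playback)
source: !Playback
  name: txs.json

# fire them at matching nodes in the environment (TxSink::RealTime)
sink: !RealTime
  target: validator/*
  burst_delay_ms: 1000
  tx_per_burst: 10
  tx_delay_ms: 50
```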
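The timeline side is still unwired in this PR (the `Action::Cannon(Vec<SpawnCannon>)` arm has no executor yet). A minimal sketch of what that glue could look like, using only the types added here — the function name and location are hypothetical, and it ignores the `count`/`query`/`target` overrides:

```rust
use std::sync::{atomic::Ordering, Arc};

use anyhow::anyhow;

use crate::{
    cannon::CannonInstance,
    schema::timeline::SpawnCannon,
    state::GlobalState,
    testing::Environment,
};

/// Resolve a timeline `SpawnCannon` action into a live `CannonInstance`.
/// Returns the cannon id along with the instance for the caller to store.
pub async fn spawn_cannon(
    state: Arc<GlobalState>,
    env: &Environment,
    env_id: usize,
    spawn: &SpawnCannon,
) -> anyhow::Result<(usize, CannonInstance)> {
    // look up the (source, sink) pair declared by a cannon document
    let (source, sink) = env
        .cannon_configs
        .get(&spawn.name)
        .cloned()
        .ok_or_else(|| anyhow!("no cannon config named {}", spawn.name))?;

    // hand out a fresh cannon id for this environment
    let id = env.cannons_counter.fetch_add(1, Ordering::Relaxed);

    // CannonInstance::new re-reads state.envs to resolve the environment,
    // allocates a query port if the source needs one, and spawns its task
    let instance = CannonInstance::new(state, source, sink, env_id).await?;

    // storing the instance in env.cannons is left to the caller, since
    // `cannons` is a plain HashMap and the Environment lives behind an Arc
    Ok((id, instance))
}
```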