diff --git a/sim-rs/Cargo.lock b/sim-rs/Cargo.lock index 8d61605..e99b6b7 100644 --- a/sim-rs/Cargo.lock +++ b/sim-rs/Cargo.lock @@ -793,13 +793,27 @@ dependencies = [ ] [[package]] -name = "sim-rs" +name = "sim-cli" version = "0.1.0" dependencies = [ "anyhow", - "async-stream", "clap", "ctrlc", + "serde", + "serde_json", + "sim-core", + "tokio", + "toml", + "tracing", + "tracing-subscriber", +] + +[[package]] +name = "sim-core" +version = "0.1.0" +dependencies = [ + "anyhow", + "async-stream", "futures", "netsim-async", "priority-queue", @@ -807,11 +821,8 @@ dependencies = [ "rand_chacha", "rand_distr", "serde", - "serde_json", "tokio", - "toml", "tracing", - "tracing-subscriber", ] [[package]] diff --git a/sim-rs/Cargo.toml b/sim-rs/Cargo.toml index 499a2b6..d4f5ffa 100644 --- a/sim-rs/Cargo.toml +++ b/sim-rs/Cargo.toml @@ -1,25 +1,11 @@ -[package] -name = "sim-rs" -version = "0.1.0" -edition = "2021" +[workspace] -[dependencies] -anyhow = "1" -async-stream = "0.3" -clap = { version = "4", features = ["derive"] } -ctrlc = "3" -futures = "0.3" -netsim-async = { git = "https://github.com/SupernaviX/ce-netsim.git", rev = "535ae49" } -priority-queue = "2" -rand = "0.8" -rand_chacha = "0.3" -rand_distr = "0.4" -serde = { version = "1", features = ["derive"] } -serde_json = "1" -tokio = { version = "1", features = ["full"] } -toml = "0.8" -tracing = "0.1" -tracing-subscriber = { version = "0.3", features = ["env-filter"] } +members = [ + "sim-cli", + "sim-core", +] + +resolver = "2" [profile.release] debug = true diff --git a/sim-rs/sim-cli/Cargo.toml b/sim-rs/sim-cli/Cargo.toml new file mode 100644 index 0000000..69b6529 --- /dev/null +++ b/sim-rs/sim-cli/Cargo.toml @@ -0,0 +1,16 @@ +[package] +name = "sim-cli" +version = "0.1.0" +edition = "2021" + +[dependencies] +anyhow = "1" +clap = { version = "4", features = ["derive"] } +ctrlc = "3" +serde = { version = "1", features = ["derive"] } +serde_json = "1" +sim-core = { path = "../sim-core" } +tokio = { version = "1", features = ["full"] } +toml = "0.8" +tracing = "0.1" +tracing-subscriber = { version = "0.3", features = ["env-filter"] } diff --git a/sim-rs/sim-cli/src/config.rs b/sim-rs/sim-cli/src/config.rs new file mode 100644 index 0000000..13f3a69 --- /dev/null +++ b/sim-rs/sim-cli/src/config.rs @@ -0,0 +1,22 @@ +use std::{fs, path::Path}; + +use anyhow::Result; +use sim_core::config::{NodeId, RawConfig, SimConfiguration}; + +pub fn read_config( + filename: &Path, + timescale: Option, + trace_nodes: &[usize], +) -> Result { + let file = fs::read_to_string(filename)?; + let mut raw_config: RawConfig = toml::from_str(&file)?; + if let Some(ts) = timescale { + raw_config.timescale = Some(ts); + } + for id in trace_nodes { + raw_config.trace_nodes.insert(NodeId::new(*id)); + } + let config: SimConfiguration = raw_config.into(); + config.validate()?; + Ok(config) +} diff --git a/sim-rs/src/events.rs b/sim-rs/sim-cli/src/events.rs similarity index 82% rename from sim-rs/src/events.rs rename to sim-rs/sim-cli/src/events.rs index 3bfc804..89d1c81 100644 --- a/sim-rs/src/events.rs +++ b/sim-rs/sim-cli/src/events.rs @@ -1,39 +1,15 @@ -use std::{collections::BTreeMap, path::PathBuf, sync::Arc}; +use std::{collections::BTreeMap, path::PathBuf}; use anyhow::Result; use serde::Serialize; -use tokio::{fs::File, io::AsyncWriteExt, sync::mpsc}; -use tracing::{info, info_span, warn}; - -use crate::{ - clock::{Clock, Timestamp}, +use sim_core::{ + clock::Timestamp, config::{NodeId, SimConfiguration}, - model::{Block, InputBlock, Transaction, TransactionId}, + events::Event, + model::TransactionId, }; - -pub enum Event { - Transaction { - id: TransactionId, - bytes: u64, - }, - Slot { - number: u64, - block: Option, - }, - BlockReceived { - slot: u64, - sender: NodeId, - recipient: NodeId, - }, - InputBlockGenerated { - block: Arc, - }, - InputBlockReceived { - block: Arc, - sender: NodeId, - recipient: NodeId, - }, -} +use tokio::{fs::File, io::AsyncWriteExt as _, sync::mpsc}; +use tracing::{info, info_span}; #[derive(Clone, Serialize)] enum OutputEvent { @@ -71,55 +47,6 @@ enum OutputEvent { }, } -#[derive(Clone)] -pub struct EventTracker { - sender: mpsc::UnboundedSender<(Event, Timestamp)>, - clock: Clock, -} - -impl EventTracker { - pub fn new(sender: mpsc::UnboundedSender<(Event, Timestamp)>, clock: Clock) -> Self { - Self { sender, clock } - } - - pub fn track_slot(&self, number: u64, block: Option) { - self.send(Event::Slot { number, block }); - } - - pub fn track_block_received(&self, slot: u64, sender: NodeId, recipient: NodeId) { - self.send(Event::BlockReceived { - slot, - sender, - recipient, - }); - } - - pub fn track_transaction(&self, transaction: &Transaction) { - self.send(Event::Transaction { - id: transaction.id, - bytes: transaction.bytes, - }); - } - - pub fn track_ib_generated(&self, block: Arc) { - self.send(Event::InputBlockGenerated { block }); - } - - pub fn track_ib_received(&self, block: Arc, sender: NodeId, recipient: NodeId) { - self.send(Event::InputBlockReceived { - block, - sender, - recipient, - }); - } - - fn send(&self, event: Event) { - if self.sender.send((event, self.clock.now())).is_err() { - warn!("tried sending event after aggregator finished"); - } - } -} - pub struct EventMonitor { node_ids: Vec, pool_ids: Vec, diff --git a/sim-rs/src/main.rs b/sim-rs/sim-cli/src/main.rs similarity index 93% rename from sim-rs/src/main.rs rename to sim-rs/sim-cli/src/main.rs index ab2142c..826403c 100644 --- a/sim-rs/src/main.rs +++ b/sim-rs/sim-cli/src/main.rs @@ -2,10 +2,9 @@ use std::{path::PathBuf, process, time::Instant}; use anyhow::Result; use clap::Parser; -use clock::Clock; use config::read_config; -use events::{EventMonitor, EventTracker}; -use sim::Simulation; +use events::EventMonitor; +use sim_core::{clock::Clock, events::EventTracker, sim::Simulation}; use tokio::{ pin, select, sync::{mpsc, oneshot}, @@ -13,13 +12,8 @@ use tokio::{ use tracing::{level_filters::LevelFilter, warn}; use tracing_subscriber::{layer::SubscriberExt as _, util::SubscriberInitExt, EnvFilter}; -mod clock; mod config; mod events; -mod model; -mod network; -mod probability; -mod sim; #[derive(Parser)] struct Args { diff --git a/sim-rs/sim-core/Cargo.toml b/sim-rs/sim-core/Cargo.toml new file mode 100644 index 0000000..4c0e5aa --- /dev/null +++ b/sim-rs/sim-core/Cargo.toml @@ -0,0 +1,17 @@ +[package] +name = "sim-core" +version = "0.1.0" +edition = "2021" + +[dependencies] +anyhow = "1" +async-stream = "0.3" +futures = "0.3" +netsim-async = { git = "https://github.com/SupernaviX/ce-netsim.git", rev = "535ae49" } +priority-queue = "2" +rand = "0.8" +rand_chacha = "0.3" +rand_distr = "0.4" +serde = { version = "1", features = ["derive"] } +tokio = { version = "1", features = ["macros", "time"] } +tracing = "0.1" diff --git a/sim-rs/src/clock.rs b/sim-rs/sim-core/src/clock.rs similarity index 100% rename from sim-rs/src/clock.rs rename to sim-rs/sim-core/src/clock.rs diff --git a/sim-rs/src/config.rs b/sim-rs/sim-core/src/config.rs similarity index 66% rename from sim-rs/src/config.rs rename to sim-rs/sim-core/src/config.rs index 84dcdca..0444c34 100644 --- a/sim-rs/src/config.rs +++ b/sim-rs/sim-core/src/config.rs @@ -1,8 +1,6 @@ use std::{ collections::{HashSet, VecDeque}, fmt::Display, - fs, - path::Path, time::Duration, }; @@ -23,14 +21,14 @@ impl NodeId { pub fn to_inner(self) -> usize { self.0 } - pub fn from_usize(value: usize) -> Self { + pub fn new(value: usize) -> Self { Self(value) } } #[derive(Debug, Deserialize)] #[serde(tag = "distribution", rename_all = "snake_case")] -enum DistributionConfig { +pub enum DistributionConfig { Normal { mean: f64, std_dev: f64 }, Exp { lambda: f64, scale: Option }, LogNormal { mu: f64, sigma: f64 }, @@ -50,64 +48,33 @@ impl From for FloatDistribution { } #[derive(Debug, Deserialize)] -struct RawConfig { - seed: Option, - timescale: Option, +pub struct RawConfig { + pub seed: Option, + pub timescale: Option, #[serde(default)] - trace_nodes: HashSet, - nodes: Vec, - links: Vec, - block_generation_probability: f64, - ib_generation_probability: f64, - max_block_size: u64, - max_tx_size: u64, - max_ib_size: u64, - max_ib_requests_per_peer: usize, - transaction_frequency_ms: DistributionConfig, - transaction_size_bytes: DistributionConfig, -} - -#[derive(Debug, Deserialize)] -struct RawNodeConfig { - location: (f64, f64), - stake: Option, -} - -#[derive(Debug, Deserialize)] -struct RawLinkConfig { - nodes: (usize, usize), - latency_ms: Option, -} - -#[derive(Debug, Clone)] -pub struct SimConfiguration { - pub seed: u64, - pub timescale: u32, pub trace_nodes: HashSet, - pub nodes: Vec, - pub links: Vec, + pub nodes: Vec, + pub links: Vec, pub block_generation_probability: f64, pub ib_generation_probability: f64, pub max_block_size: u64, pub max_tx_size: u64, pub max_ib_size: u64, pub max_ib_requests_per_peer: usize, - pub transaction_frequency_ms: FloatDistribution, - pub transaction_size_bytes: FloatDistribution, + pub transaction_frequency_ms: DistributionConfig, + pub transaction_size_bytes: DistributionConfig, } -#[derive(Debug, Clone)] -pub struct NodeConfiguration { - pub id: NodeId, - pub location: Location, - pub stake: u64, - pub peers: Vec, +#[derive(Debug, Deserialize)] +pub struct RawNodeConfig { + pub location: (f64, f64), + pub stake: Option, } -#[derive(Debug, Clone)] -pub struct LinkConfiguration { - pub nodes: (NodeId, NodeId), - pub latency: Duration, +#[derive(Debug, Deserialize)] +pub struct RawLinkConfig { + nodes: (usize, usize), + latency_ms: Option, } impl From for SimConfiguration { @@ -118,7 +85,7 @@ impl From for SimConfiguration { .into_iter() .enumerate() .map(|(index, raw)| NodeConfiguration { - id: NodeId(index), + id: NodeId::new(index), location: to_netsim_location(raw.location), stake: raw.stake.unwrap_or_default(), peers: vec![], @@ -127,10 +94,10 @@ impl From for SimConfiguration { let mut links = vec![]; for link in value.links { let (id1, id2) = link.nodes; - nodes[id1].peers.push(NodeId(id2)); - nodes[id2].peers.push(NodeId(id1)); + nodes[id1].peers.push(NodeId::new(id2)); + nodes[id2].peers.push(NodeId::new(id1)); links.push(LinkConfiguration { - nodes: (NodeId(id1), NodeId(id2)), + nodes: (NodeId::new(id1), NodeId::new(id2)), latency: compute_latency(nodes[id1].location, nodes[id2].location, link.latency_ms) / timescale, }); @@ -153,6 +120,52 @@ impl From for SimConfiguration { } } +#[derive(Debug, Clone)] +pub struct SimConfiguration { + pub seed: u64, + pub timescale: u32, + pub trace_nodes: HashSet, + pub nodes: Vec, + pub links: Vec, + pub block_generation_probability: f64, + pub ib_generation_probability: f64, + pub max_block_size: u64, + pub max_tx_size: u64, + pub max_ib_size: u64, + pub max_ib_requests_per_peer: usize, + pub transaction_frequency_ms: FloatDistribution, + pub transaction_size_bytes: FloatDistribution, +} + +impl SimConfiguration { + pub fn validate(&self) -> Result<()> { + // The graph must be nonempty and fully connected, + // and every link must be between two nodes which exist + let mut connected_nodes = HashSet::new(); + let mut frontier = VecDeque::new(); + let first_node = self + .nodes + .first() + .ok_or_else(|| anyhow!("Graph must not be empty!"))?; + frontier.push_back(first_node); + while let Some(node) = frontier.pop_front() { + if connected_nodes.insert(node.id) { + for peer_id in &node.peers { + let peer = self + .nodes + .get(peer_id.0) + .ok_or_else(|| anyhow!("Node {peer_id} not found!"))?; + frontier.push_back(peer); + } + } + } + if connected_nodes.len() < self.nodes.len() { + bail!("Graph must be fully connected!"); + } + Ok(()) + } +} + fn to_netsim_location((lat, long): (f64, f64)) -> Location { ((lat * 10000.) as i64, (long * 10000.) as u64) } @@ -165,47 +178,16 @@ fn compute_latency(loc1: Location, loc2: Location, extra_ms: Option) -> Dur geo_latency + extra_latency } -fn validate_graph(config: &SimConfiguration) -> Result<()> { - // The graph must be nonempty and fully connected, - // and every link must be between two nodes which exist - let mut connected_nodes = HashSet::new(); - let mut frontier = VecDeque::new(); - let first_node = config - .nodes - .first() - .ok_or_else(|| anyhow!("Graph must not be empty!"))?; - frontier.push_back(first_node); - while let Some(node) = frontier.pop_front() { - if connected_nodes.insert(node.id) { - for peer_id in &node.peers { - let peer = config - .nodes - .get(peer_id.0) - .ok_or_else(|| anyhow!("Node {peer_id} not found!"))?; - frontier.push_back(peer); - } - } - } - if connected_nodes.len() < config.nodes.len() { - bail!("Graph must be fully connected!"); - } - Ok(()) +#[derive(Debug, Clone)] +pub struct NodeConfiguration { + pub id: NodeId, + pub location: Location, + pub stake: u64, + pub peers: Vec, } -pub fn read_config( - filename: &Path, - timescale: Option, - trace_nodes: &[usize], -) -> Result { - let file = fs::read_to_string(filename)?; - let mut raw_config: RawConfig = toml::from_str(&file)?; - if let Some(ts) = timescale { - raw_config.timescale = Some(ts); - } - for id in trace_nodes { - raw_config.trace_nodes.insert(NodeId(*id)); - } - let config = raw_config.into(); - validate_graph(&config)?; - Ok(config) +#[derive(Debug, Clone)] +pub struct LinkConfiguration { + pub nodes: (NodeId, NodeId), + pub latency: Duration, } diff --git a/sim-rs/sim-core/src/events.rs b/sim-rs/sim-core/src/events.rs new file mode 100644 index 0000000..4ce0e45 --- /dev/null +++ b/sim-rs/sim-core/src/events.rs @@ -0,0 +1,83 @@ +use std::sync::Arc; + +use tokio::sync::mpsc; +use tracing::warn; + +use crate::{ + clock::{Clock, Timestamp}, + config::NodeId, + model::{Block, InputBlock, Transaction, TransactionId}, +}; + +pub enum Event { + Transaction { + id: TransactionId, + bytes: u64, + }, + Slot { + number: u64, + block: Option, + }, + BlockReceived { + slot: u64, + sender: NodeId, + recipient: NodeId, + }, + InputBlockGenerated { + block: Arc, + }, + InputBlockReceived { + block: Arc, + sender: NodeId, + recipient: NodeId, + }, +} + +#[derive(Clone)] +pub struct EventTracker { + sender: mpsc::UnboundedSender<(Event, Timestamp)>, + clock: Clock, +} + +impl EventTracker { + pub fn new(sender: mpsc::UnboundedSender<(Event, Timestamp)>, clock: Clock) -> Self { + Self { sender, clock } + } + + pub fn track_slot(&self, number: u64, block: Option) { + self.send(Event::Slot { number, block }); + } + + pub fn track_block_received(&self, slot: u64, sender: NodeId, recipient: NodeId) { + self.send(Event::BlockReceived { + slot, + sender, + recipient, + }); + } + + pub fn track_transaction(&self, transaction: &Transaction) { + self.send(Event::Transaction { + id: transaction.id, + bytes: transaction.bytes, + }); + } + + pub fn track_ib_generated(&self, block: Arc) { + self.send(Event::InputBlockGenerated { block }); + } + + pub fn track_ib_received(&self, block: Arc, sender: NodeId, recipient: NodeId) { + self.send(Event::InputBlockReceived { + block, + sender, + recipient, + }); + } + + fn send(&self, event: Event) { + if self.sender.send((event, self.clock.now())).is_err() { + warn!("tried sending event after aggregator finished"); + } + } +} diff --git a/sim-rs/sim-core/src/lib.rs b/sim-rs/sim-core/src/lib.rs new file mode 100644 index 0000000..94216aa --- /dev/null +++ b/sim-rs/sim-core/src/lib.rs @@ -0,0 +1,7 @@ +pub mod clock; +pub mod config; +pub mod events; +pub mod model; +mod network; +pub mod probability; +pub mod sim; diff --git a/sim-rs/src/model.rs b/sim-rs/sim-core/src/model.rs similarity index 100% rename from sim-rs/src/model.rs rename to sim-rs/sim-core/src/model.rs diff --git a/sim-rs/src/network.rs b/sim-rs/sim-core/src/network.rs similarity index 98% rename from sim-rs/src/network.rs rename to sim-rs/sim-core/src/network.rs index 2085ed8..71c5178 100644 --- a/sim-rs/src/network.rs +++ b/sim-rs/sim-core/src/network.rs @@ -83,6 +83,6 @@ impl IdLookup { fn find_node_id(&self, sim_id: SimId) -> NodeId { let id_list = self.0.read().expect("id list rwlock poisoned!"); let index = id_list.binary_search(&sim_id).expect("unrecognized sim id"); - NodeId::from_usize(index) + NodeId::new(index) } } diff --git a/sim-rs/src/probability.rs b/sim-rs/sim-core/src/probability.rs similarity index 100% rename from sim-rs/src/probability.rs rename to sim-rs/sim-core/src/probability.rs diff --git a/sim-rs/src/sim.rs b/sim-rs/sim-core/src/sim.rs similarity index 99% rename from sim-rs/src/sim.rs rename to sim-rs/sim-core/src/sim.rs index 006dfc8..94a8ac7 100644 --- a/sim-rs/src/sim.rs +++ b/sim-rs/sim-core/src/sim.rs @@ -275,7 +275,7 @@ impl Simulation { fn choose_random_node(&mut self) -> NodeId { let index = self.rng.gen_range(0..self.nodes.len()); - NodeId::from_usize(index) + NodeId::new(index) } } diff --git a/sim-rs/src/sim/event_queue.rs b/sim-rs/sim-core/src/sim/event_queue.rs similarity index 100% rename from sim-rs/src/sim/event_queue.rs rename to sim-rs/sim-core/src/sim/event_queue.rs