From c80136aee03b208b22b17f61619f6f8f761bec11 Mon Sep 17 00:00:00 2001 From: Simon Gellis Date: Thu, 17 Oct 2024 17:40:10 -0400 Subject: [PATCH] Sg/leios ib generation (#44) * Make TransactionId wrapper * Implement IB generation and header propagation * Move praos and leios state to different structs * Fix realistic test data and update README * Don't use hash inside of InputBlockId * Propagate input block bodies * Do not include transactions in multiple IBs * Fix bug preventing more than one IB lottery per round * Log more info after each run * Improve sim accuracy * Fix event queue bugs/slowness --- sim-rs/Cargo.lock | 39 ++- sim-rs/Cargo.toml | 4 +- sim-rs/README.md | 4 +- sim-rs/src/clock.rs | 2 +- sim-rs/src/config.rs | 11 +- sim-rs/src/events.rs | 195 ++++++++--- sim-rs/src/main.rs | 5 +- sim-rs/src/model.rs | 75 +++++ sim-rs/src/network.rs | 6 +- sim-rs/src/sim.rs | 572 +++++++++++++++++++++++--------- sim-rs/src/sim/event_queue.rs | 96 ++++++ sim-rs/test_data/realistic.toml | 7 +- sim-rs/test_data/simple.toml | 15 +- 13 files changed, 808 insertions(+), 223 deletions(-) create mode 100644 sim-rs/src/model.rs create mode 100644 sim-rs/src/sim/event_queue.rs diff --git a/sim-rs/Cargo.lock b/sim-rs/Cargo.lock index fab18a2..a82746a 100644 --- a/sim-rs/Cargo.lock +++ b/sim-rs/Cargo.lock @@ -72,6 +72,28 @@ version = "1.0.89" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "86fdf8605db99b54d3cd748a44c6d04df638eb5dafb219b135d0149bd0db01f6" +[[package]] +name = "async-stream" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b5a71a6f37880a80d1d7f19efd781e4b5de42c88f0722cc13bcb6cc2cfe8476" +dependencies = [ + "async-stream-impl", + "futures-core", + "pin-project-lite", +] + +[[package]] +name = "async-stream-impl" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c7c24de15d275a1ecfd47a380fb4d5ec9bfe0933f309ed5e705b775596a3574d" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "autocfg" version = "1.4.0" @@ -440,7 +462,7 @@ dependencies = [ [[package]] name = "netsim-async" version = "0.1.0" -source = "git+https://github.com/SupernaviX/ce-netsim.git?rev=8569a6f#8569a6f01f2e9b65898e2255c73b5d8604cff6e5" +source = "git+https://github.com/input-output-hk/ce-netsim.git?rev=f4feba6#f4feba657021459690118b2874f9fed809a2dc4a" dependencies = [ "anyhow", "netsim-core", @@ -450,7 +472,7 @@ dependencies = [ [[package]] name = "netsim-core" version = "0.1.0" -source = "git+https://github.com/SupernaviX/ce-netsim.git?rev=8569a6f#8569a6f01f2e9b65898e2255c73b5d8604cff6e5" +source = "git+https://github.com/input-output-hk/ce-netsim.git?rev=f4feba6#f4feba657021459690118b2874f9fed809a2dc4a" dependencies = [ "anyhow", "logos", @@ -553,6 +575,17 @@ dependencies = [ "zerocopy", ] +[[package]] +name = "priority-queue" +version = "2.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "714c75db297bc88a63783ffc6ab9f830698a6705aa0201416931759ef4c8183d" +dependencies = [ + "autocfg", + "equivalent", + "indexmap", +] + [[package]] name = "proc-macro2" version = "1.0.87" @@ -708,10 +741,12 @@ name = "sim-rs" version = "0.1.0" dependencies = [ "anyhow", + "async-stream", "clap", "ctrlc", "futures", "netsim-async", + "priority-queue", "rand", "rand_chacha", "rand_distr", diff --git a/sim-rs/Cargo.toml b/sim-rs/Cargo.toml index 5e2dc4e..6b097b2 100644 --- a/sim-rs/Cargo.toml +++ b/sim-rs/Cargo.toml @@ -5,10 +5,12 @@ edition = "2021" [dependencies] anyhow = "1" +async-stream = "0.3" clap = { version = "4", features = ["derive"] } ctrlc = "3" futures = "0.3" -netsim-async = { git = "https://github.com/SupernaviX/ce-netsim.git", rev = "8569a6f" } +netsim-async = { git = "https://github.com/input-output-hk/ce-netsim.git", rev = "f4feba6" } +priority-queue = "2" rand = "0.8" rand_chacha = "0.3" rand_distr = "0.4" diff --git a/sim-rs/README.md b/sim-rs/README.md index 46d19a3..12ab261 100644 --- a/sim-rs/README.md +++ b/sim-rs/README.md @@ -13,4 +13,6 @@ cargo run --release ./test_data/simple.toml output/simple.json The `input_path` is a TOML file which describes protocol parameters, the network topology, and other necessary configuration. Input files for predefined scenarios are in the `test_data` directory. -While the simulation is running, it will log what's going on to the console. You can stop it at any time with ctrl+c, and when you do it will save the stream of events to `output_path`. \ No newline at end of file +While the simulation is running, it will log what's going on to the console. You can stop it at any time with ctrl+c, and when you do it will save the stream of events to `output_path`. + +The simulation runs in realtime (1 slot every second), but you can speed it up by passing e.g. `-t 16` to run 16 times faster. diff --git a/sim-rs/src/clock.rs b/sim-rs/src/clock.rs index f78d1fd..5f43d3c 100644 --- a/sim-rs/src/clock.rs +++ b/sim-rs/src/clock.rs @@ -6,7 +6,7 @@ use std::{ use serde::Serialize; use tokio::time::{self, Sleep}; -#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)] pub struct Timestamp(Duration); impl Add for Timestamp { type Output = Timestamp; diff --git a/sim-rs/src/config.rs b/sim-rs/src/config.rs index 26f5108..9dee57a 100644 --- a/sim-rs/src/config.rs +++ b/sim-rs/src/config.rs @@ -6,7 +6,7 @@ use serde::{Deserialize, Serialize}; use crate::probability::FloatDistribution; -#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)] +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)] pub struct NodeId(usize); impl Display for NodeId { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { @@ -50,8 +50,11 @@ struct RawConfig { nodes: Vec, links: Vec, block_generation_probability: f64, + ib_generation_probability: f64, max_block_size: u64, max_tx_size: u64, + max_ib_size: u64, + max_ib_requests_per_peer: usize, transaction_frequency_ms: DistributionConfig, transaction_size_bytes: DistributionConfig, } @@ -75,8 +78,11 @@ pub struct SimConfiguration { pub nodes: Vec, pub links: Vec, pub block_generation_probability: f64, + pub ib_generation_probability: f64, pub max_block_size: u64, pub max_tx_size: u64, + pub max_ib_size: u64, + pub max_ib_requests_per_peer: usize, pub transaction_frequency_ms: FloatDistribution, pub transaction_size_bytes: FloatDistribution, } @@ -126,8 +132,11 @@ impl From for SimConfiguration { nodes, links, block_generation_probability: value.block_generation_probability, + ib_generation_probability: value.ib_generation_probability, max_block_size: value.max_block_size, max_tx_size: value.max_tx_size, + max_ib_size: value.max_ib_size, + max_ib_requests_per_peer: value.max_ib_requests_per_peer, transaction_frequency_ms: value.transaction_frequency_ms.into(), transaction_size_bytes: value.transaction_size_bytes.into(), } diff --git a/sim-rs/src/events.rs b/sim-rs/src/events.rs index 9e27ffe..7bef11f 100644 --- a/sim-rs/src/events.rs +++ b/sim-rs/src/events.rs @@ -3,14 +3,19 @@ use std::{collections::BTreeMap, fs, path::PathBuf, sync::Arc}; use anyhow::Result; use serde::Serialize; use tokio::sync::mpsc; -use tracing::{info, warn}; +use tracing::{info, info_span, warn}; use crate::{ clock::{Clock, Timestamp}, config::{NodeId, SimConfiguration}, + model::{Block, InputBlock, Transaction, TransactionId}, }; pub enum Event { + Transaction { + id: TransactionId, + bytes: u64, + }, Slot { number: u64, block: Option, @@ -20,35 +25,25 @@ pub enum Event { sender: NodeId, recipient: NodeId, }, - Transaction { - id: u64, - bytes: u64, + InputBlockGenerated { + block: Arc, + }, + InputBlockReceived { + block: Arc, + sender: NodeId, + recipient: NodeId, }, -} - -#[derive(Clone, PartialEq, Eq)] -pub struct Block { - pub slot: u64, - pub publisher: NodeId, - pub conflicts: Vec, - pub transactions: Vec>, -} - -#[derive(Debug, Clone, PartialEq, Eq)] -pub struct Transaction { - pub id: u64, - pub bytes: u64, } #[derive(Clone, Serialize)] enum OutputEvent { - BlockGenerated { + PraosBlockGenerated { time: Timestamp, slot: u64, - publisher: NodeId, - transactions: Vec, + producer: NodeId, + transactions: Vec, }, - BlockReceived { + PraosBlockReceived { time: Timestamp, slot: u64, sender: NodeId, @@ -56,9 +51,24 @@ enum OutputEvent { }, TransactionCreated { time: Timestamp, - id: u64, + id: TransactionId, bytes: u64, }, + InputBlockGenerated { + time: Timestamp, + slot: u64, + producer: NodeId, + index: u64, + transactions: Vec, + }, + InputBlockReceived { + time: Timestamp, + slot: u64, + producer: NodeId, + index: u64, + sender: NodeId, + recipient: NodeId, + }, } #[derive(Clone)] @@ -91,6 +101,18 @@ impl EventTracker { }); } + pub fn track_ib_generated(&self, block: Arc) { + self.send(Event::InputBlockGenerated { block }); + } + + pub fn track_ib_received(&self, block: Arc, sender: NodeId, recipient: NodeId) { + self.send(Event::InputBlockReceived { + block, + sender, + recipient, + }); + } + fn send(&self, event: Event) { if self.sender.send((event, self.clock.now())).is_err() { warn!("tried sending event after aggregator finished"); @@ -99,6 +121,7 @@ impl EventTracker { } pub struct EventMonitor { + node_ids: Vec, pool_ids: Vec, events_source: mpsc::UnboundedReceiver<(Event, Timestamp)>, output_path: Option, @@ -110,6 +133,7 @@ impl EventMonitor { events_source: mpsc::UnboundedReceiver<(Event, Timestamp)>, output_path: Option, ) -> Self { + let node_ids = config.nodes.iter().map(|p| p.id).collect(); let pool_ids = config .nodes .iter() @@ -117,6 +141,7 @@ impl EventMonitor { .map(|p| p.id) .collect(); Self { + node_ids, pool_ids, events_source, output_path, @@ -128,30 +153,35 @@ impl EventMonitor { pub async fn run(mut self) -> Result<()> { let mut blocks_published: BTreeMap = BTreeMap::new(); let mut blocks_rejected: BTreeMap = BTreeMap::new(); - let mut pending_tx_sizes: BTreeMap = BTreeMap::new(); + let mut pending_tx_sizes: BTreeMap = BTreeMap::new(); + let mut tx_ib_counts: BTreeMap = BTreeMap::new(); + let mut seen_ibs: BTreeMap = BTreeMap::new(); let mut filled_slots = 0u64; let mut empty_slots = 0u64; let mut published_txs = 0u64; let mut published_bytes = 0u64; + let mut generated_ibs = 0u64; + let mut total_txs = 0u64; let mut output = vec![]; while let Some((event, timestamp)) = self.events_source.recv().await { self.compute_output_events(&mut output, &event, timestamp); match event { + Event::Transaction { id, bytes } => { + total_txs += 1; + pending_tx_sizes.insert(id, bytes); + } Event::Slot { number, block } => { if let Some(block) = block { - info!( - "Pool {} published a block in slot {number}.", - block.publisher - ); + info!("Pool {} produced a block in slot {number}.", block.producer); filled_slots += 1; for published_tx in block.transactions { published_txs += 1; published_bytes += published_tx.bytes; pending_tx_sizes.remove(&published_tx.id); } - *blocks_published.entry(block.publisher).or_default() += 1; + *blocks_published.entry(block.producer).or_default() += 1; for conflict in block.conflicts { *blocks_rejected.entry(conflict).or_default() += 1; @@ -162,30 +192,72 @@ impl EventMonitor { } } Event::BlockReceived { .. } => {} - Event::Transaction { id, bytes } => { - pending_tx_sizes.insert(id, bytes); + Event::InputBlockGenerated { block } => { + generated_ibs += 1; + for tx in &block.transactions { + *tx_ib_counts.entry(tx.id).or_default() += 1; + } + *seen_ibs.entry(block.header.producer).or_default() += 1; + info!( + "Pool {} generated an IB with {} transaction(s) in slot {}", + block.header.producer, + block.transactions.len(), + block.header.slot, + ) + } + Event::InputBlockReceived { recipient, .. } => { + *seen_ibs.entry(recipient).or_default() += 1; } } } - info!("{filled_slots} block(s) were published."); - info!("{empty_slots} slot(s) had no blocks."); - info!("{published_txs} transaction(s) ({published_bytes} byte(s)) made it on-chain."); - - info!( - "{} transaction(s) ({} byte(s)) did not reach a block.", - pending_tx_sizes.len(), - pending_tx_sizes.into_values().sum::() - ); + info_span!("praos").in_scope(|| { + info!("{filled_slots} block(s) were published."); + info!("{empty_slots} slot(s) had no blocks."); + info!("{published_txs} transaction(s) ({published_bytes} byte(s)) made it on-chain."); + info!( + "{} transaction(s) ({} byte(s)) did not reach a block.", + pending_tx_sizes.len(), + pending_tx_sizes.into_values().sum::() + ); - for id in self.pool_ids { - if let Some(published) = blocks_published.get(&id) { - info!("Pool {id} published {published} block(s)"); - } - if let Some(rejected) = blocks_rejected.get(&id) { - info!("Pool {id} failed to publish {rejected} block(s) due to conflicts."); + for id in self.pool_ids { + if let Some(published) = blocks_published.get(&id) { + info!("Pool {id} published {published} block(s)"); + } + if let Some(rejected) = blocks_rejected.get(&id) { + info!("Pool {id} failed to publish {rejected} block(s) due to conflicts."); + } } - } + }); + + info_span!("leios").in_scope(|| { + let txs_in_ib: u64 = tx_ib_counts.values().copied().sum(); + let avg_seen = self + .node_ids + .iter() + .map(|id| seen_ibs.get(id).copied().unwrap_or_default() as f64) + .sum::() + / self.node_ids.len() as f64; + info!( + "{generated_ibs} IB(s) were generated, on average {} per slot.", + generated_ibs as f64 / (filled_slots + empty_slots) as f64 + ); + info!( + "{} out of {} transaction(s) reached an IB.", + tx_ib_counts.len(), + total_txs + ); + info!( + "Each transaction was included in an average of {} IBs.", + txs_in_ib as f64 / total_txs as f64 + ); + info!( + "Each IB contained an average of {} transactions.", + txs_in_ib as f64 / generated_ibs as f64 + ); + info!("Each node received an average of {avg_seen} IBs."); + }); if let Some(path) = self.output_path { if let Some(parent) = path.parent() { @@ -200,10 +272,10 @@ impl EventMonitor { match event { Event::Slot { number, block } => { if let Some(block) = block { - output.push(OutputEvent::BlockGenerated { + output.push(OutputEvent::PraosBlockGenerated { time, slot: *number, - publisher: block.publisher, + producer: block.producer, transactions: block.transactions.iter().map(|t| t.id).collect(), }); } @@ -220,13 +292,36 @@ impl EventMonitor { sender, recipient, } => { - output.push(OutputEvent::BlockReceived { + output.push(OutputEvent::PraosBlockReceived { time, slot: *slot, sender: *sender, recipient: *recipient, }); } + Event::InputBlockGenerated { block } => { + output.push(OutputEvent::InputBlockGenerated { + time, + slot: block.header.slot, + producer: block.header.producer, + index: block.header.index, + transactions: block.transactions.iter().map(|t| t.id).collect(), + }); + } + Event::InputBlockReceived { + block, + sender, + recipient, + } => { + output.push(OutputEvent::InputBlockReceived { + time, + slot: block.header.slot, + producer: block.header.producer, + index: block.header.index, + sender: *sender, + recipient: *recipient, + }); + } } } } diff --git a/sim-rs/src/main.rs b/sim-rs/src/main.rs index 60da986..972e3d8 100644 --- a/sim-rs/src/main.rs +++ b/sim-rs/src/main.rs @@ -15,6 +15,7 @@ use tracing::warn; mod clock; mod config; mod events; +mod model; mod network; mod probability; mod sim; @@ -52,10 +53,10 @@ async fn main() -> Result<()> { let clock = Clock::new(Instant::now(), config.timescale); let tracker = EventTracker::new(events_sink, clock.clone()); - let mut simulation = Simulation::new(config, clock)?; + let mut simulation = Simulation::new(config, tracker, clock)?; select! { - result = simulation.run(tracker) => { result? } + result = simulation.run() => { result? } result = &mut monitor => { result? } _ = ctrlc_source => {} }; diff --git a/sim-rs/src/model.rs b/sim-rs/src/model.rs new file mode 100644 index 0000000..ecdf3de --- /dev/null +++ b/sim-rs/src/model.rs @@ -0,0 +1,75 @@ +use std::{fmt::Display, sync::Arc}; + +use crate::{clock::Timestamp, config::NodeId}; +use serde::Serialize; + +macro_rules! id_wrapper { + ($outer:ident, $inner:ty) => { + #[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize)] + pub struct $outer($inner); + impl Display for $outer { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + self.0.fmt(f) + } + } + impl $outer { + #[allow(unused)] + pub fn new(value: $inner) -> Self { + Self(value) + } + } + }; +} + +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct Block { + pub slot: u64, + pub producer: NodeId, + pub conflicts: Vec, + pub transactions: Vec>, +} + +id_wrapper!(TransactionId, u64); + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct Transaction { + pub id: TransactionId, + pub bytes: u64, +} + +#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize)] +pub struct InputBlockId { + slot: u64, + producer: NodeId, + index: u64, +} + +#[derive(Debug, Clone, Hash, PartialEq, Eq)] +pub struct InputBlockHeader { + pub slot: u64, + pub producer: NodeId, + /// Need this field to distinguish IBs from the same slot+producer. + /// The real implementation can use the VRF proof for that. + pub index: u64, + pub timestamp: Timestamp, +} +impl InputBlockHeader { + pub fn id(&self) -> InputBlockId { + InputBlockId { + slot: self.slot, + producer: self.producer, + index: self.index, + } + } +} + +#[derive(Debug)] +pub struct InputBlock { + pub header: InputBlockHeader, + pub transactions: Vec>, +} +impl InputBlock { + pub fn bytes(&self) -> u64 { + self.transactions.iter().map(|tx| tx.bytes).sum() + } +} diff --git a/sim-rs/src/network.rs b/sim-rs/src/network.rs index 39bdf4d..62a1a59 100644 --- a/sim-rs/src/network.rs +++ b/sim-rs/src/network.rs @@ -13,9 +13,11 @@ pub struct Network { } impl Network { - pub fn new() -> Self { + pub fn new(timescale: u32) -> Self { + let mut config = netsim_async::SimConfiguration::default(); + config.idle_duration /= timescale; Self { - context: SimContext::new(), + context: SimContext::with_config(config), id_lookup: IdLookup::default(), } } diff --git a/sim-rs/src/sim.rs b/sim-rs/src/sim.rs index 7d33de4..07dda73 100644 --- a/sim-rs/src/sim.rs +++ b/sim-rs/src/sim.rs @@ -1,53 +1,60 @@ use std::{ - cmp::Reverse, - collections::{BTreeMap, BTreeSet, BinaryHeap, VecDeque}, + collections::{BTreeMap, BTreeSet, HashSet, VecDeque}, sync::Arc, time::Duration, }; use anyhow::{bail, Context, Result}; -use futures::{stream::FuturesUnordered, StreamExt}; +use event_queue::EventQueue; use netsim_async::{EdgePolicy, HasBytesSize, Latency}; +use priority_queue::PriorityQueue; use rand::Rng as _; use rand_chacha::{rand_core::SeedableRng, ChaChaRng}; use rand_distr::Distribution as _; -use tokio::select; use crate::{ clock::{Clock, Timestamp}, config::{NodeConfiguration, NodeId, SimConfiguration}, - events::{Block, EventTracker, Transaction}, - network::{Network, NetworkSink, NetworkSource}, + events::EventTracker, + model::{Block, InputBlock, InputBlockHeader, InputBlockId, Transaction, TransactionId}, + network::{Network, NetworkSink}, }; +mod event_queue; + pub struct Simulation { config: SimConfiguration, - clock: Clock, + tracker: EventTracker, rng: ChaChaRng, network: Network, + event_queue: EventQueue, nodes: BTreeMap, - msg_sources: BTreeMap>, - next_slot: u64, next_tx_id: u64, - event_queue: BinaryHeap, unpublished_txs: VecDeque>, - txs: BTreeMap>, + txs: BTreeMap>, } impl Simulation { - pub fn new(config: SimConfiguration, clock: Clock) -> Result { + pub fn new(config: SimConfiguration, tracker: EventTracker, clock: Clock) -> Result { let total_stake = config.nodes.iter().map(|p| p.stake).sum(); - let mut network = Network::new(); + let mut network = Network::new(config.timescale); let rng = ChaChaRng::seed_from_u64(config.seed); let mut nodes = BTreeMap::new(); - let mut msg_sources = BTreeMap::new(); + let mut msg_sources = vec![]; for node_config in &config.nodes { let id = node_config.id; let (msg_source, msg_sink) = network.open(id).context("could not open socket")?; - let node = Node::new(node_config, &config, total_stake, msg_sink); - msg_sources.insert(node.id, msg_source); + let node = Node::new( + node_config, + &config, + total_stake, + msg_sink, + tracker.clone(), + clock.clone(), + ); + msg_sources.push((node.id, msg_source)); nodes.insert(node.id, node); } for link_config in config.links.iter() { @@ -61,36 +68,34 @@ impl Simulation { )?; } - let mut sim = Self { + let mut event_queue = EventQueue::new(clock, msg_sources); + event_queue.queue_event(SimulationEvent::NewSlot(0), Duration::ZERO); + event_queue.queue_event(SimulationEvent::NewTransaction, Duration::ZERO); + Ok(Self { config, - clock, + tracker, rng, network, + event_queue, nodes, - msg_sources, - next_slot: 0, next_tx_id: 0, - event_queue: BinaryHeap::new(), unpublished_txs: VecDeque::new(), txs: BTreeMap::new(), - }; - sim.queue_event(SimulationEvent::NewSlot, Duration::ZERO); - sim.queue_event(SimulationEvent::NewTransaction, Duration::ZERO); - - Ok(sim) + }) } // Run the simulation indefinitely. - pub async fn run(&mut self, tracker: EventTracker) -> Result<()> { - while let Some(event) = self.next_event().await { + pub async fn run(&mut self) -> Result<()> { + while let Some(event) = self.event_queue.next_event().await { match event { - SimulationEvent::NewSlot => self.run_slot_lottery(&tracker)?, - SimulationEvent::NewTransaction => self.generate_tx(&tracker)?, + SimulationEvent::NewSlot(slot) => self.handle_new_slot(slot)?, + SimulationEvent::NewTransaction => self.generate_tx()?, SimulationEvent::NetworkMessage { from, to, msg } => { let Some(target) = self.nodes.get_mut(&to) else { bail!("unrecognized message target {to}"); }; match msg { + // TX propagation SimulationMessage::AnnounceTx(id) => { target.receive_announce_tx(from, id)?; } @@ -101,6 +106,8 @@ impl Simulation { SimulationMessage::Tx(tx) => { target.receive_tx(from, tx)?; } + + // Block propagation SimulationMessage::RollForward(slot) => { target.receive_roll_forward(from, slot)?; } @@ -108,9 +115,32 @@ impl Simulation { target.receive_request_block(from, slot)?; } SimulationMessage::Block(block) => { - tracker.track_block_received(block.slot, from, to); + self.tracker.track_block_received(block.slot, from, to); target.receive_block(from, block)?; } + + // IB header propagation + SimulationMessage::AnnounceIBHeader(id) => { + target.receive_announce_ib_header(from, id)?; + } + SimulationMessage::RequestIBHeader(id) => { + target.receive_request_ib_header(from, id)?; + } + SimulationMessage::IBHeader(header) => { + target.receive_ib_header(from, header)?; + } + + // IB transmission + SimulationMessage::AnnounceIB(id) => { + target.receive_announce_ib(from, id)?; + } + SimulationMessage::RequestIB(id) => { + target.receive_request_ib(from, id)?; + } + SimulationMessage::IB(ib) => { + self.tracker.track_ib_received(ib.clone(), from, to); + target.receive_ib(from, ib)?; + } } } } @@ -122,59 +152,51 @@ impl Simulation { self.network.shutdown() } - fn queue_event(&mut self, event: SimulationEvent, after: Duration) { + fn handle_new_slot(&mut self, slot: u64) -> Result<()> { + self.handle_input_block_generation(slot)?; + self.try_generate_praos_block(slot)?; + self.event_queue - .push(FutureEvent(self.clock.now() + after, event)); + .queue_event(SimulationEvent::NewSlot(slot + 1), Duration::from_secs(1)); + Ok(()) } - async fn next_event(&mut self) -> Option { - let queued_event = self.event_queue.peek().cloned(); - - let clock = self.clock.clone(); - let next_queued_event = async move { - let FutureEvent(timestamp, event) = queued_event?; - clock.wait_until(timestamp).await; - Some(event) - }; - - let mut next_incoming_message = FuturesUnordered::new(); - for (id, source) in self.msg_sources.iter_mut() { - next_incoming_message.push(async move { - let (from, msg) = source.recv().await?; - Some(SimulationEvent::NetworkMessage { from, to: *id, msg }) - }); + fn handle_input_block_generation(&mut self, slot: u64) -> Result<()> { + // Publish any input blocks from last round + if slot > 0 { + for node in self.nodes.values_mut() { + node.finish_generating_ibs(slot - 1)?; + } } - select! { - biased; // always poll the "next queued event" future first - Some(event) = next_queued_event => { - self.event_queue.pop(); - Some(event) + let mut probability = self.config.ib_generation_probability; + let mut block_counts = BTreeMap::new(); + while probability > 0.0 { + let next_p = f64::min(probability, 1.0); + for (id, _) in self.run_vrf_lottery(next_p) { + *block_counts.entry(id).or_default() += 1; } - Some(Some(event)) = next_incoming_message.next() => Some(event), - else => None + probability -= 1.0; + } + for (id, count) in block_counts { + self.get_node_mut(&id).begin_generating_ibs(slot, count)?; } + Ok(()) } - fn run_slot_lottery(&mut self, tracker: &EventTracker) -> Result<()> { - let vrf_winners: Vec<(NodeId, u64)> = self - .nodes - .values() - .filter_map(|node| { - let result = node.run_vrf(&mut self.rng)?; - Some((node.id, result)) - }) - .collect(); + fn try_generate_praos_block(&mut self, slot: u64) -> Result<()> { + let vrf_winners = self.run_vrf_lottery(self.config.block_generation_probability); let winner = vrf_winners .iter() .max_by_key(|(_, result)| *result) .map(|(id, _)| *id); - if let Some(publisher) = winner { + // L1 block generation + if let Some(producer) = winner { let conflicts = vrf_winners .into_iter() - .filter_map(|(id, _)| if publisher != id { Some(id) } else { None }) + .filter_map(|(id, _)| if producer != id { Some(id) } else { None }) .collect(); // Fill a block with as many pending transactions as can fit @@ -189,51 +211,58 @@ impl Simulation { } let block = Block { - slot: self.next_slot, - publisher, + slot, + producer, conflicts, transactions, }; - self.nodes - .get_mut(&publisher) - .unwrap() + self.get_node_mut(&producer) .publish_block(Arc::new(block.clone()))?; - tracker.track_slot(self.next_slot, Some(block)); + self.tracker.track_slot(slot, Some(block)); } else { - tracker.track_slot(self.next_slot, None); + self.tracker.track_slot(slot, None); } - self.next_slot += 1; - self.queue_event(SimulationEvent::NewSlot, Duration::from_secs(1)); Ok(()) } - fn generate_tx(&mut self, tracker: &EventTracker) -> Result<()> { - let id = self.next_tx_id; + fn run_vrf_lottery(&mut self, success_rate: f64) -> Vec<(NodeId, u64)> { + self.nodes + .values() + .filter_map(|node| { + let result = node.run_vrf(&mut self.rng, success_rate)?; + Some((node.id, result)) + }) + .collect() + } + + fn generate_tx(&mut self) -> Result<()> { + let id = TransactionId::new(self.next_tx_id); let bytes = self .config .max_tx_size .min(self.config.transaction_size_bytes.sample(&mut self.rng) as u64); let tx = Arc::new(Transaction { id, bytes }); - tracker.track_transaction(&tx); + self.tracker.track_transaction(&tx); self.unpublished_txs.push_back(tx.clone()); - self.txs.insert(id, tx); + self.txs.insert(id, tx.clone()); self.next_tx_id += 1; let ms_until_tx = self.config.transaction_frequency_ms.sample(&mut self.rng) as u64; - self.queue_event( + self.event_queue.queue_event( SimulationEvent::NewTransaction, Duration::from_millis(ms_until_tx), ); // any node could be the first to see a transaction let publisher_id = self.choose_random_node(); - let publisher = self - .nodes - .get_mut(&publisher_id) - .expect("chose nonexistent node"); - publisher.propagate_tx(id) + self.get_node_mut(&publisher_id) + .receive_tx(publisher_id, tx) + } + + fn get_node_mut(&mut self, id: &NodeId) -> &mut Node { + self.nodes.get_mut(id).expect("chose nonexistent node") } fn choose_random_node(&mut self) -> NodeId { @@ -245,13 +274,43 @@ impl Simulation { struct Node { id: NodeId, msg_sink: NetworkSink, - target_vrf_stake: u64, + tracker: EventTracker, + clock: Clock, + stake: u64, total_stake: u64, + max_ib_size: u64, + max_ib_requests_per_peer: usize, peers: Vec, + praos: NodePraosState, + leios: NodeLeiosState, +} + +#[derive(Default)] +struct NodePraosState { peer_heads: BTreeMap, blocks_seen: BTreeSet, blocks: BTreeMap>, - txs_seen: BTreeSet, + txs_seen: BTreeSet, +} + +#[derive(Default)] +struct PeerInputBlockRequests { + pending: PriorityQueue, + active: HashSet, +} + +struct PendingInputBlock { + header: InputBlockHeader, + has_been_requested: bool, +} + +#[derive(Default)] +struct NodeLeiosState { + mempool: BTreeMap>, + unsent_ibs: BTreeMap>, + ibs: BTreeMap>, + pending_ibs: BTreeMap, + ib_requests: BTreeMap, } impl Node { @@ -260,49 +319,48 @@ impl Node { sim_config: &SimConfiguration, total_stake: u64, msg_sink: NetworkSink, + tracker: EventTracker, + clock: Clock, ) -> Self { let id = config.id; - let target_vrf_stake = compute_target_vrf_stake( - config.stake, - total_stake, - sim_config.block_generation_probability, - ); + let stake = config.stake; let peers = config.peers.clone(); Self { id, msg_sink, - target_vrf_stake, + tracker, + clock, + stake, total_stake, + max_ib_size: sim_config.max_ib_size, + max_ib_requests_per_peer: sim_config.max_ib_requests_per_peer, peers, - peer_heads: BTreeMap::new(), - blocks_seen: BTreeSet::new(), - blocks: BTreeMap::new(), - txs_seen: BTreeSet::new(), - } - } - - fn propagate_tx(&mut self, id: u64) -> Result<()> { - for peer in &self.peers { - self.msg_sink - .send_to(*peer, SimulationMessage::AnnounceTx(id))?; + praos: NodePraosState::default(), + leios: NodeLeiosState::default(), } - Ok(()) } fn publish_block(&mut self, block: Arc) -> Result<()> { + // Do not remove TXs in these blocks from the leios mempool. + // Wait until we learn more about how praos and leios interact. for peer in &self.peers { - if !self.peer_heads.get(peer).is_some_and(|&s| s >= block.slot) { + if !self + .praos + .peer_heads + .get(peer) + .is_some_and(|&s| s >= block.slot) + { self.msg_sink .send_to(*peer, SimulationMessage::RollForward(block.slot))?; - self.peer_heads.insert(*peer, block.slot); + self.praos.peer_heads.insert(*peer, block.slot); } } - self.blocks.insert(block.slot, block); + self.praos.blocks.insert(block.slot, block); Ok(()) } - fn receive_announce_tx(&mut self, from: NodeId, id: u64) -> Result<()> { - if self.txs_seen.insert(id) { + fn receive_announce_tx(&mut self, from: NodeId, id: TransactionId) -> Result<()> { + if self.praos.txs_seen.insert(id) { self.msg_sink .send_to(from, SimulationMessage::RequestTx(id))?; } @@ -314,18 +372,20 @@ impl Node { } fn receive_tx(&mut self, from: NodeId, tx: Arc) -> Result<()> { + let id = tx.id; + self.leios.mempool.insert(tx.id, tx); for peer in &self.peers { if *peer == from { continue; } self.msg_sink - .send_to(*peer, SimulationMessage::AnnounceTx(tx.id))?; + .send_to(*peer, SimulationMessage::AnnounceTx(id))?; } - Ok(()) + self.try_fill_ibs() } fn receive_roll_forward(&mut self, from: NodeId, slot: u64) -> Result<()> { - if self.blocks_seen.insert(slot) { + if self.praos.blocks_seen.insert(slot) { self.msg_sink .send_to(from, SimulationMessage::RequestBlock(slot))?; } @@ -333,7 +393,7 @@ impl Node { } fn receive_request_block(&mut self, from: NodeId, slot: u64) -> Result<()> { - if let Some(block) = self.blocks.get(&slot) { + if let Some(block) = self.praos.blocks.get(&slot) { self.msg_sink .send_to(from, SimulationMessage::Block(block.clone()))?; } @@ -341,8 +401,15 @@ impl Node { } fn receive_block(&mut self, from: NodeId, block: Arc) -> Result<()> { - if self.blocks.insert(block.slot, block.clone()).is_none() { - let head = self.peer_heads.entry(from).or_default(); + if self + .praos + .blocks + .insert(block.slot, block.clone()) + .is_none() + { + // Do not remove TXs in these blocks from the leios mempool. + // Wait until we learn more about how praos and leios interact. + let head = self.praos.peer_heads.entry(from).or_default(); if *head < block.slot { *head = block.slot } @@ -351,57 +418,227 @@ impl Node { Ok(()) } - // Simulates the output of a VRF using this node's stake (if any). - fn run_vrf(&self, rng: &mut ChaChaRng) -> Option { - let result = rng.gen_range(0..self.total_stake); - if result < self.target_vrf_stake { - Some(result) + fn receive_announce_ib_header(&mut self, from: NodeId, id: InputBlockId) -> Result<()> { + self.msg_sink + .send_to(from, SimulationMessage::RequestIBHeader(id))?; + Ok(()) + } + + fn receive_request_ib_header(&mut self, from: NodeId, id: InputBlockId) -> Result<()> { + if let Some(pending_ib) = self.leios.pending_ibs.get(&id) { + // We don't have this IB, just the header. Send that. + self.msg_sink + .send_to(from, SimulationMessage::IBHeader(pending_ib.header.clone()))?; + } else if let Some(ib) = self.leios.ibs.get(&id) { + // We have the full IB. Send the header, and also advertise that we have the full IB. + self.msg_sink + .send_to(from, SimulationMessage::IBHeader(ib.header.clone()))?; + self.msg_sink + .send_to(from, SimulationMessage::AnnounceIB(id))?; + } + Ok(()) + } + + fn receive_ib_header(&mut self, from: NodeId, header: InputBlockHeader) -> Result<()> { + let id = header.id(); + if self.leios.ibs.contains_key(&id) { + return Ok(()); + } + if self.leios.pending_ibs.contains_key(&id) { + return Ok(()); + } + self.leios.pending_ibs.insert( + id, + PendingInputBlock { + header, + has_been_requested: false, + }, + ); + // We haven't seen this header before, so propagate it to our neighbors + for peer in &self.peers { + if *peer == from { + continue; + } + self.msg_sink + .send_to(*peer, SimulationMessage::AnnounceIBHeader(id))?; + } + Ok(()) + } + + fn receive_announce_ib(&mut self, from: NodeId, id: InputBlockId) -> Result<()> { + let Some(pending_ib) = self.leios.pending_ibs.get_mut(&id) else { + return Ok(()); + }; + // Ignore IBs which have already been requested + if pending_ib.has_been_requested { + return Ok(()); + } + // Do we have capacity to request this block? + let reqs = self.leios.ib_requests.entry(from).or_default(); + if reqs.active.len() < self.max_ib_requests_per_peer { + // If so, make the request + pending_ib.has_been_requested = true; + reqs.active.insert(id); + self.msg_sink + .send_to(from, SimulationMessage::RequestIB(id))?; } else { - None + // If not, just track that this peer has this IB when we're ready + reqs.pending.push(id, pending_ib.header.timestamp); } + Ok(()) } -} -fn compute_target_vrf_stake( - stake: u64, - total_stake: u64, - block_generation_probability: f64, -) -> u64 { - let ratio = stake as f64 / total_stake as f64; - let p_success = 1. - (1. - block_generation_probability).powf(ratio); - (total_stake as f64 * p_success) as u64 -} + fn receive_request_ib(&mut self, from: NodeId, id: InputBlockId) -> Result<()> { + if let Some(ib) = self.leios.ibs.get(&id) { + self.msg_sink + .send_to(from, SimulationMessage::IB(ib.clone()))?; + } + Ok(()) + } + + fn receive_ib(&mut self, from: NodeId, ib: Arc) -> Result<()> { + let id = ib.header.id(); + for transaction in &ib.transactions { + self.leios.mempool.remove(&transaction.id); + } + self.leios.ibs.insert(id, ib); + + for peer in &self.peers { + if *peer == from { + continue; + } + self.msg_sink + .send_to(*peer, SimulationMessage::AnnounceIB(id))?; + } + + // Mark that this IB is no longer pending + self.leios.pending_ibs.remove(&id); + let reqs = self.leios.ib_requests.entry(from).or_default(); + reqs.active.remove(&id); + + // We now have capacity to request one more IB from this peer + while let Some((id, _)) = reqs.pending.pop() { + let Some(pending_ib) = self.leios.pending_ibs.get_mut(&id) else { + // We fetched this IB from some other node already + continue; + }; + if pending_ib.has_been_requested { + // There's already a request for this IB in flight + continue; + } + + // Make the request + pending_ib.has_been_requested = true; + reqs.active.insert(id); + self.msg_sink + .send_to(from, SimulationMessage::RequestIB(id))?; + } -// wrapper struct which holds a SimulationEvent, -// but is ordered by a timestamp (in reverse) -#[derive(Clone)] -struct FutureEvent(Timestamp, SimulationEvent); -impl FutureEvent { - fn key(&self) -> Reverse { - Reverse(self.0) + Ok(()) } -} -impl PartialEq for FutureEvent { - fn eq(&self, other: &Self) -> bool { - self.key() == other.key() + fn begin_generating_ibs(&mut self, slot: u64, count: u64) -> Result<()> { + let mut unsent_ibs = VecDeque::new(); + for index in 0..count { + let header = InputBlockHeader { + slot, + producer: self.id, + index, + timestamp: self.clock.now(), + }; + unsent_ibs.push_back(InputBlock { + header, + transactions: vec![], + }); + } + self.leios.unsent_ibs.insert(slot, unsent_ibs); + self.try_fill_ibs() + } + + fn try_fill_ibs(&mut self) -> Result<()> { + loop { + let Some(mut first_ib_entry) = self.leios.unsent_ibs.first_entry() else { + // we aren't sending any IBs + return Ok(()); + }; + let slot = *first_ib_entry.key(); + let unsent_ibs = first_ib_entry.get_mut(); + let Some(unsent_ib) = unsent_ibs.front_mut() else { + // we aren't sending any more IBs this round + self.leios.unsent_ibs.remove(&slot); + return Ok(()); + }; + let Some((_, tx)) = self.leios.mempool.first_key_value() else { + // our mempool is empty + return Ok(()); + }; + + let remaining_capacity = self.max_ib_size - unsent_ib.bytes(); + let tx_bytes = tx.bytes; + + if remaining_capacity >= tx_bytes { + // This IB has room for another TX, add it in + let (id, tx) = self.leios.mempool.pop_first().unwrap(); + self.leios.mempool.remove(&id); + unsent_ib.transactions.push(tx); + } + + if remaining_capacity <= tx_bytes { + // This IB is full, :shipit: + let ib = unsent_ibs.pop_front().unwrap(); + self.generate_ib(ib)?; + } + } } -} -impl Eq for FutureEvent {} -impl PartialOrd for FutureEvent { - fn partial_cmp(&self, other: &Self) -> Option { - Some(self.cmp(other)) + + fn finish_generating_ibs(&mut self, slot: u64) -> Result<()> { + let Some(unsent_ibs) = self.leios.unsent_ibs.remove(&slot) else { + return Ok(()); + }; + for ib in unsent_ibs { + if ib.transactions.is_empty() { + continue; + } + self.generate_ib(ib)?; + } + Ok(()) } -} -impl Ord for FutureEvent { - fn cmp(&self, other: &Self) -> std::cmp::Ordering { - self.key().cmp(&other.key()) + + fn generate_ib(&mut self, mut ib: InputBlock) -> Result<()> { + ib.header.timestamp = self.clock.now(); + let ib = Arc::new(ib); + + self.tracker.track_ib_generated(ib.clone()); + + let id = ib.header.id(); + self.leios.ibs.insert(id, ib); + for peer in &self.peers { + self.msg_sink + .send_to(*peer, SimulationMessage::AnnounceIBHeader(id))?; + } + Ok(()) + } + + // Simulates the output of a VRF using this node's stake (if any). + fn run_vrf(&self, rng: &mut ChaChaRng, success_rate: f64) -> Option { + let target_vrf_stake = compute_target_vrf_stake(self.stake, self.total_stake, success_rate); + let result = rng.gen_range(0..self.total_stake); + if result < target_vrf_stake { + Some(result) + } else { + None + } } } -#[derive(Clone)] +fn compute_target_vrf_stake(stake: u64, total_stake: u64, success_rate: f64) -> u64 { + let ratio = stake as f64 / total_stake as f64; + (total_stake as f64 * ratio * success_rate) as u64 +} + +#[derive(Clone, Debug)] enum SimulationEvent { - NewSlot, + NewSlot(u64), NewTransaction, NetworkMessage { from: NodeId, @@ -410,14 +647,24 @@ enum SimulationEvent { }, } -#[derive(Clone)] +#[derive(Clone, Debug)] enum SimulationMessage { - AnnounceTx(u64), - RequestTx(u64), + // tx "propagation" + AnnounceTx(TransactionId), + RequestTx(TransactionId), Tx(Arc), + // praos block propagation RollForward(u64), RequestBlock(u64), Block(Arc), + // IB header propagation + AnnounceIBHeader(InputBlockId), + RequestIBHeader(InputBlockId), + IBHeader(InputBlockHeader), + // IB transmission + AnnounceIB(InputBlockId), + RequestIB(InputBlockId), + IB(Arc), } impl HasBytesSize for SimulationMessage { @@ -426,9 +673,18 @@ impl HasBytesSize for SimulationMessage { Self::AnnounceTx(_) => 8, Self::RequestTx(_) => 8, Self::Tx(tx) => tx.bytes, + Self::RollForward(_) => 8, Self::RequestBlock(_) => 8, Self::Block(block) => block.transactions.iter().map(|t| t.bytes).sum(), + + Self::AnnounceIBHeader(_) => 8, + Self::RequestIBHeader(_) => 8, + Self::IBHeader(_) => 32, + + Self::AnnounceIB(_) => 8, + Self::RequestIB(_) => 8, + Self::IB(ib) => ib.bytes(), } } } diff --git a/sim-rs/src/sim/event_queue.rs b/sim-rs/src/sim/event_queue.rs new file mode 100644 index 0000000..7debc98 --- /dev/null +++ b/sim-rs/src/sim/event_queue.rs @@ -0,0 +1,96 @@ +use std::{cmp::Reverse, collections::BinaryHeap, pin::Pin, time::Duration}; + +use async_stream::stream; +use futures::{stream::select_all, Stream, StreamExt}; +use tokio::select; + +use crate::{ + clock::{Clock, Timestamp}, + config::NodeId, + network::NetworkSource, +}; + +use super::{SimulationEvent, SimulationMessage}; + +pub struct EventQueue { + clock: Clock, + scheduled: BinaryHeap, + msg_source: Pin>>, +} + +impl EventQueue { + pub fn new(clock: Clock, msg_sources: Vec<(NodeId, NetworkSource)>) -> Self { + Self { + clock, + scheduled: BinaryHeap::new(), + msg_source: Box::pin(stream_incoming_messages(msg_sources)), + } + } + + pub fn queue_event(&mut self, event: SimulationEvent, after: Duration) { + self.scheduled + .push(FutureEvent(self.clock.now() + after, event)); + } + + pub async fn next_event(&mut self) -> Option { + let scheduled_event = self.scheduled.peek().cloned(); + let clock = self.clock.clone(); + + let next_scheduled_event = async move { + let FutureEvent(timestamp, event) = scheduled_event?; + clock.wait_until(timestamp).await; + Some(event) + }; + let next_network_event = &mut self.msg_source.next(); + + select! { + biased; // always poll the "next scheduled event" future first + Some(event) = next_scheduled_event => { + self.scheduled.pop(); + Some(event) + } + Some(event) = next_network_event => Some(event), + else => None + } + } +} + +fn stream_incoming_messages( + msg_sources: Vec<(NodeId, NetworkSource)>, +) -> impl Stream { + select_all(msg_sources.into_iter().map(|(to, mut source)| { + let stream = stream! { + while let Some((from, msg)) = source.recv().await { + yield SimulationEvent::NetworkMessage { from, to, msg } + } + }; + Box::pin(stream) + })) +} + +// wrapper struct which holds a SimulationEvent, +// but is ordered by a timestamp (in reverse) +#[derive(Clone)] +struct FutureEvent(Timestamp, SimulationEvent); +impl FutureEvent { + fn key(&self) -> Reverse { + Reverse(self.0) + } +} + +impl PartialEq for FutureEvent { + fn eq(&self, other: &Self) -> bool { + self.key() == other.key() + } +} +impl Eq for FutureEvent {} +impl PartialOrd for FutureEvent { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} +impl Ord for FutureEvent { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + self.key().cmp(&other.key()) + } +} diff --git a/sim-rs/test_data/realistic.toml b/sim-rs/test_data/realistic.toml index 1a6e111..71c2664 100644 --- a/sim-rs/test_data/realistic.toml +++ b/sim-rs/test_data/realistic.toml @@ -1,5 +1,8 @@ block_generation_probability = 0.05 +ib_generation_probability = 5.0 max_block_size = 90112 +max_ib_requests_per_peer = 1 +max_ib_size = 327680 max_tx_size = 16384 [[nodes]] @@ -90179,5 +90182,5 @@ scale = 1000.0 [transaction_size_bytes] distribution = "log_normal" -mu = 6.85 -sigma = 1.13 +mu = 6.833 +sigma = 1.127 diff --git a/sim-rs/test_data/simple.toml b/sim-rs/test_data/simple.toml index 14816a1..137dabc 100644 --- a/sim-rs/test_data/simple.toml +++ b/sim-rs/test_data/simple.toml @@ -39,8 +39,17 @@ links = [ { nodes = [10, 11] }, { nodes = [11, 12] }, ] + +# transaction parameters +transaction_frequency_ms = { distribution = "exp", lambda = 0.85, scale = 1000 } +transaction_size_bytes = { distribution = "log_normal", mu = 6.833, sigma = 1.127 } +max_tx_size = 16384 + +# praos block generation parameters block_generation_probability = 0.05 max_block_size = 90112 -max_tx_size = 16384 -transaction_frequency_ms = { distribution = "exp", lambda = 0.85, scale = 1000 } -transaction_size_bytes = { distribution = "log_normal", mu = 6.85, sigma = 1.13 } + +# IB parameters +ib_generation_probability = 0.5 # corresponds to 𝑓I in the model +max_ib_size = 327680 +max_ib_requests_per_peer = 1