From f9f886886b62667b5eaf0bd7e7ebbcf7f5362bf1 Mon Sep 17 00:00:00 2001 From: Alexandru Gheorghe <49718502+alexggh@users.noreply.github.com> Date: Mon, 5 Feb 2024 08:46:22 +0200 Subject: [PATCH] Introduce approval-voting/distribution benchmark (#2621) ## Summary Built on top of the tooling and ideas introduced in https://github.com/paritytech/polkadot-sdk/pull/2528, this PR introduces a synthetic benchmark for measuring and assessing the performance characteristics of the approval-voting and approval-distribution subsystems. Currently this allows us to simulate the behaviours of these systems based on the following dimensions: ``` TestConfiguration: # Test 1 - objective: !ApprovalsTest last_considered_tranche: 89 min_coalesce: 1 max_coalesce: 6 enable_assignments_v2: true send_till_tranche: 60 stop_when_approved: false coalesce_tranche_diff: 12 workdir_prefix: "/tmp" num_no_shows_per_candidate: 0 approval_distribution_expected_tof: 6.0 approval_distribution_cpu_ms: 3.0 approval_voting_cpu_ms: 4.30 n_validators: 500 n_cores: 100 n_included_candidates: 100 min_pov_size: 1120 max_pov_size: 5120 peer_bandwidth: 524288000000 bandwidth: 524288000000 latency: min_latency: secs: 0 nanos: 1000000 max_latency: secs: 0 nanos: 100000000 error: 0 num_blocks: 10 ``` ## The approach 1. We build a real overseer with the real implementations for approval-voting and approval-distribution subsystems. 2. For a given network size, for each validator we pre-compute all potential assignments and approvals it would send; because this is a computation-heavy operation, the result is cached in a file on disk and re-used if the generation parameters don't change. 3. The messages will be sent according to the configured parameters and those are split into 3 main benchmarking scenarios. 
## Benchmarking scenarios ### Best case scenario *approvals_throughput_best_case.yaml* It sends to approval-distribution only the minimum required tranches to gather the needed_approvals, so that a candidate is approved. ### Behaviour in the presence of no-shows *approvals_no_shows.yaml* It sends the tranches needed to approve a candidate when we have a maximum of *num_no_shows_per_candidate* tranches with no-shows for each candidate. ### Maximum throughput *approvals_throughput.yaml* It sends all the tranches for each block and measures the CPU used and the network bandwidth needed by the approval-voting and approval-distribution subsystems. ## How to run it ``` cargo run -p polkadot-subsystem-bench --release -- test-sequence --path polkadot/node/subsystem-bench/examples/approvals_throughput.yaml ``` ## Evaluating performance ### Use the real subsystems metrics If you follow the steps in https://github.com/paritytech/polkadot-sdk/tree/master/polkadot/node/subsystem-bench#install-grafana for installing prometheus and grafana locally, all real metrics for the `approval-distribution`, `approval-voting` and overseer are available. E.g: Screenshot 2023-12-05 at 11 07 46 Screenshot 2023-12-05 at 11 09 42 Screenshot 2023-12-05 at 11 10 15 Screenshot 2023-12-05 at 11 10 52 ### Profile with pyroscope 1. Set up pyroscope following the steps in https://github.com/paritytech/polkadot-sdk/tree/master/polkadot/node/subsystem-bench#install-pyroscope, then run any of the benchmark scenarios with the `--profile` argument. 2. Open the pyroscope dashboard in grafana, e.g: Screenshot 2024-01-09 at 17 09 58 ### Useful logs 1. Network bandwidth requirements: ``` Payload bytes received from peers: 503993 KiB total, 50399 KiB/block Payload bytes sent to peers: 629971 KiB total, 62997 KiB/block ``` 2. CPU usage by the approval-distribution/approval-voting subsystems. 
``` approval-distribution CPU usage 84.061s approval-distribution CPU usage per block 8.406s approval-voting CPU usage 96.532s approval-voting CPU usage per block 9.653s ``` 3. Time passed until a given block is approved ``` Chain selection approved after 3500 ms hash=0x0101010101010101010101010101010101010101010101010101010101010101 Chain selection approved after 4500 ms hash=0x0202020202020202020202020202020202020202020202020202020202020202 ``` ### Using benchmark to quantify improvements from https://github.com/paritytech/polkadot-sdk/pull/1178 + https://github.com/paritytech/polkadot-sdk/pull/1191 Using a versi-node we compare the scenario where all new optimisations are disabled with a scenario where tranche0 assignments are sent in a single message and a conservative simulation where the coalescing of approvals gives us just a 50% reduction in the number of messages we send. Overall, what we see is a speedup of around 30-40% in the time it takes to process the necessary messages and a 30-40% reduction in the necessary bandwidth. #### Best case scenario comparison (minimum required tranches sent). Unoptimised ``` Number of blocks: 10 Payload bytes received from peers: 53289 KiB total, 5328 KiB/block Payload bytes sent to peers: 52489 KiB total, 5248 KiB/block approval-distribution CPU usage 6.732s approval-distribution CPU usage per block 0.673s approval-voting CPU usage 9.523s approval-voting CPU usage per block 0.952s ``` vs Optimisation enabled ``` Number of blocks: 10 Payload bytes received from peers: 32141 KiB total, 3214 KiB/block Payload bytes sent to peers: 37314 KiB total, 3731 KiB/block approval-distribution CPU usage 4.658s approval-distribution CPU usage per block 0.466s approval-voting CPU usage 6.236s approval-voting CPU usage per block 0.624s ``` #### Worst case: all tranches sent (very unlikely, happens when sharding breaks). 
Unoptimised ``` Number of blocks: 10 Payload bytes received from peers: 746393 KiB total, 74639 KiB/block Payload bytes sent to peers: 729151 KiB total, 72915 KiB/block approval-distribution CPU usage 118.681s approval-distribution CPU usage per block 11.868s approval-voting CPU usage 124.118s approval-voting CPU usage per block 12.412s ``` vs optimised ``` Number of blocks: 10 Payload bytes received from peers: 503993 KiB total, 50399 KiB/block Payload bytes sent to peers: 629971 KiB total, 62997 KiB/block approval-distribution CPU usage 84.061s approval-distribution CPU usage per block 8.406s approval-voting CPU usage 96.532s approval-voting CPU usage per block 9.653s ``` ## TODOs [x] Polish implementation. [x] Use what we have so far to evaluate https://github.com/paritytech/polkadot-sdk/pull/1191 before merging. [x] List of features and additional dimensions we want to use for benchmarking. [x] Run benchmark on hardware similar to versi and kusama nodes. [ ] Add benchmark to be run in CI for catching regressions in performance. [ ] Rebase on latest changes for network emulation. 
--------- Signed-off-by: Andrei Sandu Signed-off-by: Alexandru Gheorghe Co-authored-by: Andrei Sandu Co-authored-by: Andrei Sandu <54316454+sandreim@users.noreply.github.com> --- Cargo.lock | 15 +- .../node/core/approval-voting/src/criteria.rs | 8 +- polkadot/node/core/approval-voting/src/lib.rs | 41 +- .../node/core/approval-voting/src/tests.rs | 4 +- .../node/core/approval-voting/src/time.rs | 24 +- polkadot/node/subsystem-bench/Cargo.toml | 13 + .../examples/approvals_no_shows.yaml | 18 + .../examples/approvals_throughput.yaml | 19 + .../approvals_throughput_best_case.yaml | 18 + ...s_throughput_no_optimisations_enabled.yaml | 18 + .../subsystem-bench/src/approval/helpers.rs | 207 ++++ .../src/approval/message_generator.rs | 686 +++++++++++ .../src/approval/mock_chain_selection.rs | 66 + .../node/subsystem-bench/src/approval/mod.rs | 1059 +++++++++++++++++ .../src/approval/test_message.rs | 304 +++++ .../src/availability/av_store_helpers.rs | 16 +- .../subsystem-bench/src/availability/mod.rs | 26 +- polkadot/node/subsystem-bench/src/cli.rs | 4 + .../subsystem-bench/src/core/configuration.rs | 92 +- .../node/subsystem-bench/src/core/display.rs | 9 +- .../subsystem-bench/src/core/environment.rs | 33 +- .../node/subsystem-bench/src/core/keyring.rs | 8 +- .../src/core/mock/chain_api.rs | 48 +- .../node/subsystem-bench/src/core/mock/mod.rs | 18 +- .../src/core/mock/network_bridge.rs | 41 +- .../src/core/mock/runtime_api.rs | 114 +- polkadot/node/subsystem-bench/src/core/mod.rs | 2 + .../node/subsystem-bench/src/core/network.rs | 57 +- .../subsystem-bench/src/subsystem-bench.rs | 16 +- 29 files changed, 2857 insertions(+), 127 deletions(-) create mode 100644 polkadot/node/subsystem-bench/examples/approvals_no_shows.yaml create mode 100644 polkadot/node/subsystem-bench/examples/approvals_throughput.yaml create mode 100644 polkadot/node/subsystem-bench/examples/approvals_throughput_best_case.yaml create mode 100644 
polkadot/node/subsystem-bench/examples/approvals_throughput_no_optimisations_enabled.yaml create mode 100644 polkadot/node/subsystem-bench/src/approval/helpers.rs create mode 100644 polkadot/node/subsystem-bench/src/approval/message_generator.rs create mode 100644 polkadot/node/subsystem-bench/src/approval/mock_chain_selection.rs create mode 100644 polkadot/node/subsystem-bench/src/approval/mod.rs create mode 100644 polkadot/node/subsystem-bench/src/approval/test_message.rs diff --git a/Cargo.lock b/Cargo.lock index 663d8ccdd393..540bcc327a3f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -13467,6 +13467,7 @@ version = "1.0.0" dependencies = [ "assert_matches", "async-trait", + "bincode", "bitvec", "clap 4.4.18", "clap-num", @@ -13475,16 +13476,19 @@ dependencies = [ "env_logger 0.9.3", "futures", "futures-timer", + "hex", "itertools 0.11.0", "kvdb-memorydb", "log", "orchestra", "parity-scale-codec", "paste", + "polkadot-approval-distribution", "polkadot-availability-bitfield-distribution", "polkadot-availability-distribution", "polkadot-availability-recovery", "polkadot-erasure-coding", + "polkadot-node-core-approval-voting", "polkadot-node-core-av-store", "polkadot-node-core-chain-api", "polkadot-node-metrics", @@ -13501,17 +13505,24 @@ dependencies = [ "pyroscope", "pyroscope_pprofrs", "rand", + "rand_chacha 0.3.1", + "rand_core 0.6.4", "rand_distr", "sc-keystore", "sc-network", "sc-service", + "schnorrkel 0.9.1", "serde", "serde_yaml", + "sha1", "sp-application-crypto", "sp-consensus", + "sp-consensus-babe", "sp-core", "sp-keyring", "sp-keystore", + "sp-runtime", + "sp-timestamp", "substrate-prometheus-endpoint", "tokio", "tracing-gum", @@ -17208,9 +17219,9 @@ dependencies = [ [[package]] name = "sha1" -version = "0.10.5" +version = "0.10.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f04293dc80c3993519f2d7f6f511707ee7094fe0c6d3406feb330cdb3540eba3" +checksum = "e3bf829a2d51ab4a5ddf1352d8470c140cadc8301b2ae1789db023f01cedd6ba" 
dependencies = [ "cfg-if", "cpufeatures", diff --git a/polkadot/node/core/approval-voting/src/criteria.rs b/polkadot/node/core/approval-voting/src/criteria.rs index 1af61e72d7af..1ebea2641b62 100644 --- a/polkadot/node/core/approval-voting/src/criteria.rs +++ b/polkadot/node/core/approval-voting/src/criteria.rs @@ -55,11 +55,11 @@ pub struct OurAssignment { } impl OurAssignment { - pub(crate) fn cert(&self) -> &AssignmentCertV2 { + pub fn cert(&self) -> &AssignmentCertV2 { &self.cert } - pub(crate) fn tranche(&self) -> DelayTranche { + pub fn tranche(&self) -> DelayTranche { self.tranche } @@ -225,7 +225,7 @@ fn assigned_core_transcript(core_index: CoreIndex) -> Transcript { /// Information about the world assignments are being produced in. #[derive(Clone, Debug)] -pub(crate) struct Config { +pub struct Config { /// The assignment public keys for validators. assignment_keys: Vec, /// The groups of validators assigned to each core. @@ -321,7 +321,7 @@ impl AssignmentCriteria for RealAssignmentCriteria { /// different times. The idea is that most assignments are never triggered and fall by the wayside. /// /// This will not assign to anything the local validator was part of the backing group for. 
-pub(crate) fn compute_assignments( +pub fn compute_assignments( keystore: &LocalKeystore, relay_vrf_story: RelayVRFStory, config: &Config, diff --git a/polkadot/node/core/approval-voting/src/lib.rs b/polkadot/node/core/approval-voting/src/lib.rs index 3161d6186a1e..456ae319787b 100644 --- a/polkadot/node/core/approval-voting/src/lib.rs +++ b/polkadot/node/core/approval-voting/src/lib.rs @@ -92,11 +92,11 @@ use time::{slot_number_to_tick, Clock, ClockExt, DelayedApprovalTimer, SystemClo mod approval_checking; pub mod approval_db; mod backend; -mod criteria; +pub mod criteria; mod import; mod ops; mod persisted_entries; -mod time; +pub mod time; use crate::{ approval_checking::{Check, TranchesToApproveResult}, @@ -159,6 +159,7 @@ pub struct ApprovalVotingSubsystem { db: Arc, mode: Mode, metrics: Metrics, + clock: Box, } #[derive(Clone)] @@ -444,6 +445,25 @@ impl ApprovalVotingSubsystem { keystore: Arc, sync_oracle: Box, metrics: Metrics, + ) -> Self { + ApprovalVotingSubsystem::with_config_and_clock( + config, + db, + keystore, + sync_oracle, + metrics, + Box::new(SystemClock {}), + ) + } + + /// Create a new approval voting subsystem with the given keystore, config, and database. 
+ pub fn with_config_and_clock( + config: Config, + db: Arc, + keystore: Arc, + sync_oracle: Box, + metrics: Metrics, + clock: Box, ) -> Self { ApprovalVotingSubsystem { keystore, @@ -452,6 +472,7 @@ impl ApprovalVotingSubsystem { db_config: DatabaseConfig { col_approval_data: config.col_approval_data }, mode: Mode::Syncing(sync_oracle), metrics, + clock, } } @@ -493,15 +514,10 @@ fn db_sanity_check(db: Arc, config: DatabaseConfig) -> SubsystemRe impl ApprovalVotingSubsystem { fn start(self, ctx: Context) -> SpawnedSubsystem { let backend = DbBackend::new(self.db.clone(), self.db_config); - let future = run::( - ctx, - self, - Box::new(SystemClock), - Box::new(RealAssignmentCriteria), - backend, - ) - .map_err(|e| SubsystemError::with_origin("approval-voting", e)) - .boxed(); + let future = + run::(ctx, self, Box::new(RealAssignmentCriteria), backend) + .map_err(|e| SubsystemError::with_origin("approval-voting", e)) + .boxed(); SpawnedSubsystem { name: "approval-voting-subsystem", future } } @@ -909,7 +925,6 @@ enum Action { async fn run( mut ctx: Context, mut subsystem: ApprovalVotingSubsystem, - clock: Box, assignment_criteria: Box, mut backend: B, ) -> SubsystemResult<()> @@ -923,7 +938,7 @@ where let mut state = State { keystore: subsystem.keystore, slot_duration_millis: subsystem.slot_duration_millis, - clock, + clock: subsystem.clock, assignment_criteria, spans: HashMap::new(), }; diff --git a/polkadot/node/core/approval-voting/src/tests.rs b/polkadot/node/core/approval-voting/src/tests.rs index 7a0bde6a55e2..9220e84a2554 100644 --- a/polkadot/node/core/approval-voting/src/tests.rs +++ b/polkadot/node/core/approval-voting/src/tests.rs @@ -549,7 +549,7 @@ fn test_harness>( let subsystem = run( context, - ApprovalVotingSubsystem::with_config( + ApprovalVotingSubsystem::with_config_and_clock( Config { col_approval_data: test_constants::TEST_CONFIG.col_approval_data, slot_duration_millis: SLOT_DURATION_MILLIS, @@ -558,8 +558,8 @@ fn test_harness>( 
Arc::new(keystore), sync_oracle, Metrics::default(), + clock.clone(), ), - clock.clone(), assignment_criteria, backend, ); diff --git a/polkadot/node/core/approval-voting/src/time.rs b/polkadot/node/core/approval-voting/src/time.rs index 61091f3c34cd..99dfbe07678f 100644 --- a/polkadot/node/core/approval-voting/src/time.rs +++ b/polkadot/node/core/approval-voting/src/time.rs @@ -33,14 +33,14 @@ use std::{ }; use polkadot_primitives::{Hash, ValidatorIndex}; -const TICK_DURATION_MILLIS: u64 = 500; +pub const TICK_DURATION_MILLIS: u64 = 500; /// A base unit of time, starting from the Unix epoch, split into half-second intervals. -pub(crate) type Tick = u64; +pub type Tick = u64; /// A clock which allows querying of the current tick as well as /// waiting for a tick to be reached. -pub(crate) trait Clock { +pub trait Clock { /// Yields the current tick. fn tick_now(&self) -> Tick; @@ -49,7 +49,7 @@ pub(crate) trait Clock { } /// Extension methods for clocks. -pub(crate) trait ClockExt { +pub trait ClockExt { fn tranche_now(&self, slot_duration_millis: u64, base_slot: Slot) -> DelayTranche; } @@ -61,7 +61,8 @@ impl ClockExt for C { } /// A clock which uses the actual underlying system clock. -pub(crate) struct SystemClock; +#[derive(Clone)] +pub struct SystemClock; impl Clock for SystemClock { /// Yields the current tick. @@ -93,11 +94,22 @@ fn tick_to_time(tick: Tick) -> SystemTime { } /// assumes `slot_duration_millis` evenly divided by tick duration. -pub(crate) fn slot_number_to_tick(slot_duration_millis: u64, slot: Slot) -> Tick { +pub fn slot_number_to_tick(slot_duration_millis: u64, slot: Slot) -> Tick { let ticks_per_slot = slot_duration_millis / TICK_DURATION_MILLIS; u64::from(slot) * ticks_per_slot } +/// Converts a tick to the slot number. 
+pub fn tick_to_slot_number(slot_duration_millis: u64, tick: Tick) -> Slot { + let ticks_per_slot = slot_duration_millis / TICK_DURATION_MILLIS; + (tick / ticks_per_slot).into() +} + +/// Converts a tranche from a slot to the tick number. +pub fn tranche_to_tick(slot_duration_millis: u64, slot: Slot, tranche: u32) -> Tick { + slot_number_to_tick(slot_duration_millis, slot) + tranche as u64 +} + /// A list of delayed futures that gets triggered when the waiting time has expired and it is /// time to sign the candidate. /// We have a timer per relay-chain block. diff --git a/polkadot/node/subsystem-bench/Cargo.toml b/polkadot/node/subsystem-bench/Cargo.toml index f7866f993631..136eccbf6854 100644 --- a/polkadot/node/subsystem-bench/Cargo.toml +++ b/polkadot/node/subsystem-bench/Cargo.toml @@ -38,6 +38,9 @@ sp-core = { path = "../../../substrate/primitives/core" } clap = { version = "4.4.18", features = ["derive"] } futures = "0.3.21" futures-timer = "3.0.2" +bincode = "1.3.3" +sha1 = "0.10.6" +hex = "0.4.3" gum = { package = "tracing-gum", path = "../gum" } polkadot-erasure-coding = { package = "polkadot-erasure-coding", path = "../../erasure-coding" } log = "0.4.17" @@ -64,6 +67,16 @@ prometheus_endpoint = { package = "substrate-prometheus-endpoint", path = "../.. 
prometheus = { version = "0.13.0", default-features = false } serde = "1.0.195" serde_yaml = "0.9" + +polkadot-node-core-approval-voting = { path = "../core/approval-voting" } +polkadot-approval-distribution = { path = "../network/approval-distribution" } +sp-consensus-babe = { path = "../../../substrate/primitives/consensus/babe" } +sp-runtime = { path = "../../../substrate/primitives/runtime", default-features = false } +sp-timestamp = { path = "../../../substrate/primitives/timestamp" } + +schnorrkel = { version = "0.9.1", default-features = false } +rand_core = "0.6.2" # should match schnorrkel +rand_chacha = { version = "0.3.1" } paste = "1.0.14" orchestra = { version = "0.3.5", default-features = false, features = ["futures_channel"] } pyroscope = "0.5.7" diff --git a/polkadot/node/subsystem-bench/examples/approvals_no_shows.yaml b/polkadot/node/subsystem-bench/examples/approvals_no_shows.yaml new file mode 100644 index 000000000000..758c7fbbf112 --- /dev/null +++ b/polkadot/node/subsystem-bench/examples/approvals_no_shows.yaml @@ -0,0 +1,18 @@ +TestConfiguration: +# Test 1 +- objective: !ApprovalVoting + last_considered_tranche: 89 + coalesce_mean: 3.0 + coalesce_std_dev: 1.0 + stop_when_approved: true + coalesce_tranche_diff: 12 + workdir_prefix: "/tmp/" + enable_assignments_v2: true + num_no_shows_per_candidate: 10 + n_validators: 500 + n_cores: 100 + min_pov_size: 1120 + max_pov_size: 5120 + peer_bandwidth: 524288000000 + bandwidth: 524288000000 + num_blocks: 10 diff --git a/polkadot/node/subsystem-bench/examples/approvals_throughput.yaml b/polkadot/node/subsystem-bench/examples/approvals_throughput.yaml new file mode 100644 index 000000000000..9eeeefc53a42 --- /dev/null +++ b/polkadot/node/subsystem-bench/examples/approvals_throughput.yaml @@ -0,0 +1,19 @@ +TestConfiguration: +# Test 1 +- objective: !ApprovalVoting + coalesce_mean: 3.0 + coalesce_std_dev: 1.0 + enable_assignments_v2: true + last_considered_tranche: 89 + stop_when_approved: false + 
coalesce_tranche_diff: 12 + workdir_prefix: "/tmp" + num_no_shows_per_candidate: 0 + n_validators: 500 + n_cores: 100 + n_included_candidates: 100 + min_pov_size: 1120 + max_pov_size: 5120 + peer_bandwidth: 524288000000 + bandwidth: 524288000000 + num_blocks: 10 diff --git a/polkadot/node/subsystem-bench/examples/approvals_throughput_best_case.yaml b/polkadot/node/subsystem-bench/examples/approvals_throughput_best_case.yaml new file mode 100644 index 000000000000..370bb31a5c4c --- /dev/null +++ b/polkadot/node/subsystem-bench/examples/approvals_throughput_best_case.yaml @@ -0,0 +1,18 @@ +TestConfiguration: +# Test 1 +- objective: !ApprovalVoting + coalesce_mean: 3.0 + coalesce_std_dev: 1.0 + enable_assignments_v2: true + last_considered_tranche: 89 + stop_when_approved: true + coalesce_tranche_diff: 12 + workdir_prefix: "/tmp/" + num_no_shows_per_candidate: 0 + n_validators: 500 + n_cores: 100 + min_pov_size: 1120 + max_pov_size: 5120 + peer_bandwidth: 524288000000 + bandwidth: 524288000000 + num_blocks: 10 diff --git a/polkadot/node/subsystem-bench/examples/approvals_throughput_no_optimisations_enabled.yaml b/polkadot/node/subsystem-bench/examples/approvals_throughput_no_optimisations_enabled.yaml new file mode 100644 index 000000000000..30b9ac8dc50f --- /dev/null +++ b/polkadot/node/subsystem-bench/examples/approvals_throughput_no_optimisations_enabled.yaml @@ -0,0 +1,18 @@ +TestConfiguration: +# Test 1 +- objective: !ApprovalVoting + coalesce_mean: 1.0 + coalesce_std_dev: 0.0 + enable_assignments_v2: false + last_considered_tranche: 89 + stop_when_approved: false + coalesce_tranche_diff: 12 + workdir_prefix: "/tmp/" + num_no_shows_per_candidate: 0 + n_validators: 500 + n_cores: 100 + min_pov_size: 1120 + max_pov_size: 5120 + peer_bandwidth: 524288000000 + bandwidth: 524288000000 + num_blocks: 10 diff --git a/polkadot/node/subsystem-bench/src/approval/helpers.rs b/polkadot/node/subsystem-bench/src/approval/helpers.rs new file mode 100644 index 
000000000000..7cbe3ba75949 --- /dev/null +++ b/polkadot/node/subsystem-bench/src/approval/helpers.rs @@ -0,0 +1,207 @@ +// Copyright (C) Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +use crate::core::configuration::TestAuthorities; +use itertools::Itertools; +use polkadot_node_core_approval_voting::time::{Clock, SystemClock, Tick}; +use polkadot_node_network_protocol::{ + grid_topology::{SessionGridTopology, TopologyPeerInfo}, + View, +}; +use polkadot_node_subsystem_types::messages::{ + network_bridge_event::NewGossipTopology, ApprovalDistributionMessage, NetworkBridgeEvent, +}; +use polkadot_overseer::AllMessages; +use polkadot_primitives::{ + BlockNumber, CandidateEvent, CandidateReceipt, CoreIndex, GroupIndex, Hash, Header, + Id as ParaId, Slot, ValidatorIndex, +}; +use polkadot_primitives_test_helpers::dummy_candidate_receipt_bad_sig; +use rand::{seq::SliceRandom, SeedableRng}; +use rand_chacha::ChaCha20Rng; +use sc_network::PeerId; +use sp_consensus_babe::{ + digests::{CompatibleDigestItem, PreDigest, SecondaryVRFPreDigest}, + AllowedSlots, BabeEpochConfiguration, Epoch as BabeEpoch, VrfSignature, VrfTranscript, +}; +use sp_core::crypto::VrfSecret; +use sp_keyring::sr25519::Keyring as Sr25519Keyring; +use sp_runtime::{Digest, DigestItem}; +use std::sync::{atomic::AtomicU64, Arc}; + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . 
+ +/// A fake system clock used for driving the approval voting and make +/// it process blocks, assignments and approvals from the past. +#[derive(Clone)] +pub struct PastSystemClock { + /// The real system clock + real_system_clock: SystemClock, + /// The difference in ticks between the real system clock and the current clock. + delta_ticks: Arc, +} + +impl PastSystemClock { + /// Creates a new fake system clock with `delta_ticks` between the real time and the fake one. + pub fn new(real_system_clock: SystemClock, delta_ticks: Arc) -> Self { + PastSystemClock { real_system_clock, delta_ticks } + } +} + +impl Clock for PastSystemClock { + fn tick_now(&self) -> Tick { + self.real_system_clock.tick_now() - + self.delta_ticks.load(std::sync::atomic::Ordering::SeqCst) + } + + fn wait( + &self, + tick: Tick, + ) -> std::pin::Pin + Send + 'static>> { + self.real_system_clock + .wait(tick + self.delta_ticks.load(std::sync::atomic::Ordering::SeqCst)) + } +} + +/// Helper function to generate a babe epoch for this benchmark. +/// It does not change for the duration of the test. +pub fn generate_babe_epoch(current_slot: Slot, authorities: TestAuthorities) -> BabeEpoch { + let authorities = authorities + .validator_babe_id + .into_iter() + .enumerate() + .map(|(index, public)| (public, index as u64)) + .collect_vec(); + BabeEpoch { + epoch_index: 1, + start_slot: current_slot.saturating_sub(1u64), + duration: 200, + authorities, + randomness: [0xde; 32], + config: BabeEpochConfiguration { c: (1, 4), allowed_slots: AllowedSlots::PrimarySlots }, + } +} + +/// Generates a topology to be used for this benchmark. 
+pub fn generate_topology(test_authorities: &TestAuthorities) -> SessionGridTopology { + let keyrings = test_authorities + .validator_authority_id + .clone() + .into_iter() + .zip(test_authorities.peer_ids.clone()) + .collect_vec(); + + let topology = keyrings + .clone() + .into_iter() + .enumerate() + .map(|(index, (discovery_id, peer_id))| TopologyPeerInfo { + peer_ids: vec![peer_id], + validator_index: ValidatorIndex(index as u32), + discovery_id, + }) + .collect_vec(); + let shuffled = (0..keyrings.len()).collect_vec(); + + SessionGridTopology::new(shuffled, topology) +} + +/// Generates new session topology message. +pub fn generate_new_session_topology( + test_authorities: &TestAuthorities, + test_node: ValidatorIndex, +) -> Vec { + let topology = generate_topology(test_authorities); + + let event = NetworkBridgeEvent::NewGossipTopology(NewGossipTopology { + session: 1, + topology, + local_index: Some(test_node), + }); + vec![AllMessages::ApprovalDistribution(ApprovalDistributionMessage::NetworkBridgeUpdate(event))] +} + +/// Generates a peer view change for the passed `block_hash` +pub fn generate_peer_view_change_for(block_hash: Hash, peer_id: PeerId) -> AllMessages { + let network = NetworkBridgeEvent::PeerViewChange(peer_id, View::new([block_hash], 0)); + + AllMessages::ApprovalDistribution(ApprovalDistributionMessage::NetworkBridgeUpdate(network)) +} + +/// Helper function to create a a signature for the block header. +fn garbage_vrf_signature() -> VrfSignature { + let transcript = VrfTranscript::new(b"test-garbage", &[]); + Sr25519Keyring::Alice.pair().vrf_sign(&transcript.into()) +} + +/// Helper function to create a block header. 
+pub fn make_header(parent_hash: Hash, slot: Slot, number: u32) -> Header { + let digest = + { + let mut digest = Digest::default(); + let vrf_signature = garbage_vrf_signature(); + digest.push(DigestItem::babe_pre_digest(PreDigest::SecondaryVRF( + SecondaryVRFPreDigest { authority_index: 0, slot, vrf_signature }, + ))); + digest + }; + + Header { + digest, + extrinsics_root: Default::default(), + number, + state_root: Default::default(), + parent_hash, + } +} + +/// Helper function to create a candidate receipt. +fn make_candidate(para_id: ParaId, hash: &Hash) -> CandidateReceipt { + let mut r = dummy_candidate_receipt_bad_sig(*hash, Some(Default::default())); + r.descriptor.para_id = para_id; + r +} + +/// Helper function to create a list of candidates that are included in the block +pub fn make_candidates( + block_hash: Hash, + block_number: BlockNumber, + num_cores: u32, + num_candidates: u32, +) -> Vec { + let seed = [block_number as u8; 32]; + let mut rand_chacha = ChaCha20Rng::from_seed(seed); + let mut candidates = (0..num_cores) + .map(|core| { + CandidateEvent::CandidateIncluded( + make_candidate(ParaId::from(core), &block_hash), + Vec::new().into(), + CoreIndex(core), + GroupIndex(core), + ) + }) + .collect_vec(); + let (candidates, _) = candidates.partial_shuffle(&mut rand_chacha, num_candidates as usize); + candidates + .iter_mut() + .map(|val| val.clone()) + .sorted_by(|a, b| match (a, b) { + ( + CandidateEvent::CandidateIncluded(_, _, core_a, _), + CandidateEvent::CandidateIncluded(_, _, core_b, _), + ) => core_a.0.cmp(&core_b.0), + (_, _) => todo!("Should not happen"), + }) + .collect_vec() +} diff --git a/polkadot/node/subsystem-bench/src/approval/message_generator.rs b/polkadot/node/subsystem-bench/src/approval/message_generator.rs new file mode 100644 index 000000000000..4318dcdf8902 --- /dev/null +++ b/polkadot/node/subsystem-bench/src/approval/message_generator.rs @@ -0,0 +1,686 @@ +// Copyright (C) Parity Technologies (UK) Ltd. 
+// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +use std::{ + cmp::max, + collections::{BTreeMap, HashSet}, + fs, + io::Write, + path::{Path, PathBuf}, + time::Duration, +}; + +use futures::SinkExt; +use itertools::Itertools; +use parity_scale_codec::Encode; +use polkadot_node_core_approval_voting::{ + criteria::{compute_assignments, Config}, + time::tranche_to_tick, +}; +use polkadot_node_network_protocol::grid_topology::{ + GridNeighbors, RandomRouting, RequiredRouting, SessionGridTopology, +}; +use polkadot_node_primitives::approval::{ + self, + v2::{CoreBitfield, IndirectAssignmentCertV2, IndirectSignedApprovalVoteV2}, +}; +use polkadot_primitives::{ + vstaging::ApprovalVoteMultipleCandidates, CandidateEvent, CandidateHash, CandidateIndex, + CoreIndex, SessionInfo, Slot, ValidatorId, ValidatorIndex, ASSIGNMENT_KEY_TYPE_ID, +}; +use rand::{seq::SliceRandom, RngCore, SeedableRng}; +use rand_chacha::ChaCha20Rng; +use rand_distr::{Distribution, Normal}; +use sc_keystore::LocalKeystore; +use sc_network::PeerId; +use sha1::Digest; +use sp_application_crypto::AppCrypto; +use sp_consensus_babe::SlotDuration; +use sp_keystore::Keystore; +use sp_timestamp::Timestamp; + +use super::{ + test_message::{MessagesBundle, TestMessageInfo}, + ApprovalTestState, ApprovalsOptions, BlockTestData, +}; +use crate::{ + approval::{ + helpers::{generate_babe_epoch, generate_topology}, + 
GeneratedState, BUFFER_FOR_GENERATION_MILLIS, LOG_TARGET, SLOT_DURATION_MILLIS, + }, + core::{ + configuration::{TestAuthorities, TestConfiguration, TestObjective}, + mock::session_info_for_peers, + NODE_UNDER_TEST, + }, +}; +use polkadot_node_network_protocol::v3 as protocol_v3; +use polkadot_primitives::Hash; +use sc_service::SpawnTaskHandle; +/// A generator of messages coming from a given Peer/Validator +pub struct PeerMessagesGenerator { + /// The grid neighbors of the node under test. + pub topology_node_under_test: GridNeighbors, + /// The topology of the network for the epoch under test. + pub topology: SessionGridTopology, + /// The validator index for this object generates the messages. + pub validator_index: ValidatorIndex, + /// An array of pre-generated random samplings, that is used to determine, which nodes would + /// send a given assignment, to the node under test because of the random samplings. + /// As an optimization we generate this sampling at the begining of the test and just pick + /// one randomly, because always taking the samples would be too expensive for benchamrk. + pub random_samplings: Vec>, + /// Channel for sending the generated messages to the aggregator + pub tx_messages: futures::channel::mpsc::UnboundedSender<(Hash, Vec)>, + /// The list of test authorities + pub test_authorities: TestAuthorities, + //// The session info used for the test. + pub session_info: SessionInfo, + /// The blocks used for testing + pub blocks: Vec, + /// Approval options params. + pub options: ApprovalsOptions, +} + +impl PeerMessagesGenerator { + /// Generates messages by spawning a blocking task in the background which begins creating + /// the assignments/approvals and peer view changes at the begining of each block. 
+ pub fn generate_messages(mut self, spawn_task_handle: &SpawnTaskHandle) { + spawn_task_handle.spawn("generate-messages", "generate-messages", async move { + for block_info in &self.blocks { + let assignments = self.generate_assignments(block_info); + + let bytes = self.validator_index.0.to_be_bytes(); + let seed = [ + bytes[0], bytes[1], bytes[2], bytes[3], 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + ]; + + let mut rand_chacha = ChaCha20Rng::from_seed(seed); + let approvals = issue_approvals( + assignments, + block_info.hash, + &self.test_authorities.validator_public, + block_info.candidates.clone(), + &self.options, + &mut rand_chacha, + self.test_authorities.keyring.keystore_ref(), + ); + + self.tx_messages + .send((block_info.hash, approvals)) + .await + .expect("Should not fail"); + } + }) + } + + // Builds the messages finger print corresponding to this configuration. + // When the finger print exists already on disk the messages are not re-generated. + fn messages_fingerprint( + configuration: &TestConfiguration, + options: &ApprovalsOptions, + ) -> String { + let mut fingerprint = options.fingerprint(); + let mut exclude_objective = configuration.clone(); + // The objective contains the full content of `ApprovalOptions`, we don't want to put all of + // that in fingerprint, so execlute it because we add it manually see above. + exclude_objective.objective = TestObjective::Unimplemented; + let configuration_bytes = bincode::serialize(&exclude_objective).unwrap(); + fingerprint.extend(configuration_bytes); + let mut sha1 = sha1::Sha1::new(); + sha1.update(fingerprint); + let result = sha1.finalize(); + hex::encode(result) + } + + /// Generate all messages(Assignments & Approvals) needed for approving `blocks``. 
+ pub fn generate_messages_if_needed( + configuration: &TestConfiguration, + test_authorities: &TestAuthorities, + options: &ApprovalsOptions, + spawn_task_handle: &SpawnTaskHandle, + ) -> PathBuf { + let path_name = format!( + "{}/{}", + options.workdir_prefix, + Self::messages_fingerprint(configuration, options) + ); + + let path = Path::new(&path_name); + if path.exists() { + return path.to_path_buf(); + } + + gum::info!("Generate message because file does not exist"); + let delta_to_first_slot_under_test = Timestamp::new(BUFFER_FOR_GENERATION_MILLIS); + let initial_slot = Slot::from_timestamp( + (*Timestamp::current() - *delta_to_first_slot_under_test).into(), + SlotDuration::from_millis(SLOT_DURATION_MILLIS), + ); + + let babe_epoch = generate_babe_epoch(initial_slot, test_authorities.clone()); + let session_info = session_info_for_peers(configuration, test_authorities); + let blocks = ApprovalTestState::generate_blocks_information( + configuration, + &babe_epoch, + initial_slot, + ); + + gum::info!(target: LOG_TARGET, "Generate messages"); + let topology = generate_topology(test_authorities); + + let random_samplings = random_samplings_to_node( + ValidatorIndex(NODE_UNDER_TEST), + test_authorities.validator_public.len(), + test_authorities.validator_public.len() * 2, + ); + + let topology_node_under_test = + topology.compute_grid_neighbors_for(ValidatorIndex(NODE_UNDER_TEST)).unwrap(); + + let (tx, mut rx) = futures::channel::mpsc::unbounded(); + + // Spawn a thread to generate the messages for each validator, so that we speed up the + // generation. 
+ for current_validator_index in 1..test_authorities.validator_public.len() { + let peer_message_source = PeerMessagesGenerator { + topology_node_under_test: topology_node_under_test.clone(), + topology: topology.clone(), + validator_index: ValidatorIndex(current_validator_index as u32), + test_authorities: test_authorities.clone(), + session_info: session_info.clone(), + blocks: blocks.clone(), + tx_messages: tx.clone(), + random_samplings: random_samplings.clone(), + options: options.clone(), + }; + + peer_message_source.generate_messages(spawn_task_handle); + } + + std::mem::drop(tx); + + let seed = [0x32; 32]; + let mut rand_chacha = ChaCha20Rng::from_seed(seed); + + let mut all_messages: BTreeMap> = BTreeMap::new(); + // Receive all messages and sort them by Tick they have to be sent. + loop { + match rx.try_next() { + Ok(Some((block_hash, messages))) => + for message in messages { + let block_info = blocks + .iter() + .find(|val| val.hash == block_hash) + .expect("Should find blocks"); + let tick_to_send = tranche_to_tick( + SLOT_DURATION_MILLIS, + block_info.slot, + message.tranche_to_send(), + ); + let to_add = all_messages.entry(tick_to_send).or_default(); + to_add.push(message); + }, + Ok(None) => break, + Err(_) => { + std::thread::sleep(Duration::from_millis(50)); + }, + } + } + let all_messages = all_messages + .into_iter() + .flat_map(|(_, mut messages)| { + // Shuffle the messages inside the same tick, so that we don't priorites messages + // for older nodes. we try to simulate the same behaviour as in real world. 
+ messages.shuffle(&mut rand_chacha); + messages + }) + .collect_vec(); + + gum::info!("Generated a number of {:} unique messages", all_messages.len()); + + let generated_state = GeneratedState { all_messages: Some(all_messages), initial_slot }; + + let mut messages_file = fs::OpenOptions::new() + .write(true) + .create(true) + .truncate(true) + .open(path) + .unwrap(); + + messages_file + .write_all(&generated_state.encode()) + .expect("Could not update message file"); + path.to_path_buf() + } + + /// Generates assignments for the given `current_validator_index` + /// Returns a list of assignments to be sent sorted by tranche. + fn generate_assignments(&self, block_info: &BlockTestData) -> Vec { + let config = Config::from(&self.session_info); + + let leaving_cores = block_info + .candidates + .clone() + .into_iter() + .map(|candidate_event| { + if let CandidateEvent::CandidateIncluded(candidate, _, core_index, group_index) = + candidate_event + { + (candidate.hash(), core_index, group_index) + } else { + todo!("Variant is never created in this benchmark") + } + }) + .collect_vec(); + + let mut assignments_by_tranche = BTreeMap::new(); + + let bytes = self.validator_index.0.to_be_bytes(); + let seed = [ + bytes[0], bytes[1], bytes[2], bytes[3], 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + ]; + let mut rand_chacha = ChaCha20Rng::from_seed(seed); + + let to_be_sent_by = neighbours_that_would_sent_message( + &self.test_authorities.peer_ids, + self.validator_index.0, + &self.topology_node_under_test, + &self.topology, + ); + + let leaving_cores = leaving_cores + .clone() + .into_iter() + .filter(|(_, core_index, _group_index)| core_index.0 != self.validator_index.0) + .collect_vec(); + + let store = LocalKeystore::in_memory(); + let _public = store + .sr25519_generate_new( + ASSIGNMENT_KEY_TYPE_ID, + Some(self.test_authorities.key_seeds[self.validator_index.0 as usize].as_str()), + ) + .expect("should not fail"); + let 
assignments = compute_assignments( + &store, + block_info.relay_vrf_story.clone(), + &config, + leaving_cores.clone(), + self.options.enable_assignments_v2, + ); + + let random_sending_nodes = self + .random_samplings + .get(rand_chacha.next_u32() as usize % self.random_samplings.len()) + .unwrap(); + let random_sending_peer_ids = random_sending_nodes + .iter() + .map(|validator| (*validator, self.test_authorities.peer_ids[validator.0 as usize])) + .collect_vec(); + + let mut unique_assignments = HashSet::new(); + for (core_index, assignment) in assignments { + let assigned_cores = match &assignment.cert().kind { + approval::v2::AssignmentCertKindV2::RelayVRFModuloCompact { core_bitfield } => + core_bitfield.iter_ones().map(|val| CoreIndex::from(val as u32)).collect_vec(), + approval::v2::AssignmentCertKindV2::RelayVRFDelay { core_index } => + vec![*core_index], + approval::v2::AssignmentCertKindV2::RelayVRFModulo { sample: _ } => + vec![core_index], + }; + + let bitfiled: CoreBitfield = assigned_cores.clone().try_into().unwrap(); + + // For the cases where tranch0 assignments are in a single certificate we need to make + // sure we create a single message. 
+ if unique_assignments.insert(bitfiled) { + let this_tranche_assignments = + assignments_by_tranche.entry(assignment.tranche()).or_insert_with(Vec::new); + + this_tranche_assignments.push(( + IndirectAssignmentCertV2 { + block_hash: block_info.hash, + validator: self.validator_index, + cert: assignment.cert().clone(), + }, + block_info + .candidates + .iter() + .enumerate() + .filter(|(_index, candidate)| { + if let CandidateEvent::CandidateIncluded(_, _, core, _) = candidate { + assigned_cores.contains(core) + } else { + panic!("Should not happen"); + } + }) + .map(|(index, _)| index as u32) + .collect_vec() + .try_into() + .unwrap(), + to_be_sent_by + .iter() + .chain(random_sending_peer_ids.iter()) + .copied() + .collect::>(), + assignment.tranche(), + )); + } + } + + assignments_by_tranche + .into_values() + .flat_map(|assignments| assignments.into_iter()) + .map(|assignment| { + let msg = protocol_v3::ApprovalDistributionMessage::Assignments(vec![( + assignment.0, + assignment.1, + )]); + TestMessageInfo { + msg, + sent_by: assignment + .2 + .into_iter() + .map(|(validator_index, _)| validator_index) + .collect_vec(), + tranche: assignment.3, + block_hash: block_info.hash, + } + }) + .collect_vec() + } +} + +/// A list of random samplings that we use to determine which nodes should send a given message to +/// the node under test. +/// We can not sample every time for all the messages because that would be too expensive to +/// perform, so pre-generate a list of samples for a given network size. +/// - result[i] give us as a list of random nodes that would send a given message to the node under +/// test. 
+fn random_samplings_to_node( + node_under_test: ValidatorIndex, + num_validators: usize, + num_samplings: usize, +) -> Vec> { + let seed = [7u8; 32]; + let mut rand_chacha = ChaCha20Rng::from_seed(seed); + + (0..num_samplings) + .map(|_| { + (0..num_validators) + .filter(|sending_validator_index| { + *sending_validator_index != NODE_UNDER_TEST as usize + }) + .flat_map(|sending_validator_index| { + let mut validators = (0..num_validators).collect_vec(); + validators.shuffle(&mut rand_chacha); + + let mut random_routing = RandomRouting::default(); + validators + .into_iter() + .flat_map(|validator_to_send| { + if random_routing.sample(num_validators, &mut rand_chacha) { + random_routing.inc_sent(); + if validator_to_send == node_under_test.0 as usize { + Some(ValidatorIndex(sending_validator_index as u32)) + } else { + None + } + } else { + None + } + }) + .collect_vec() + }) + .collect_vec() + }) + .collect_vec() +} + +/// Helper function to randomly determine how many approvals we coalesce together in a single +/// message. +fn coalesce_approvals_len( + coalesce_mean: f32, + coalesce_std_dev: f32, + rand_chacha: &mut ChaCha20Rng, +) -> usize { + max( + 1, + Normal::new(coalesce_mean, coalesce_std_dev) + .expect("normal distribution parameters are good") + .sample(rand_chacha) + .round() as i32, + ) as usize +} + +/// Helper function to create approvals signatures for all assignments passed as arguments. +/// Returns a list of Approvals messages that need to be sent. 
+fn issue_approvals( + assignments: Vec, + block_hash: Hash, + validator_ids: &[ValidatorId], + candidates: Vec, + options: &ApprovalsOptions, + rand_chacha: &mut ChaCha20Rng, + store: &LocalKeystore, +) -> Vec { + let mut queued_to_sign: Vec = Vec::new(); + let mut num_coalesce = + coalesce_approvals_len(options.coalesce_mean, options.coalesce_std_dev, rand_chacha); + let result = assignments + .iter() + .enumerate() + .map(|(_index, message)| match &message.msg { + protocol_v3::ApprovalDistributionMessage::Assignments(assignments) => { + let mut approvals_to_create = Vec::new(); + + let current_validator_index = queued_to_sign + .first() + .map(|msg| msg.validator_index) + .unwrap_or(ValidatorIndex(99999)); + + // Invariant for this benchmark. + assert_eq!(assignments.len(), 1); + + let assignment = assignments.first().unwrap(); + + let earliest_tranche = queued_to_sign + .first() + .map(|val| val.assignment.tranche) + .unwrap_or(message.tranche); + + if queued_to_sign.len() >= num_coalesce || + (!queued_to_sign.is_empty() && + current_validator_index != assignment.0.validator) || + message.tranche - earliest_tranche >= options.coalesce_tranche_diff + { + approvals_to_create.push(TestSignInfo::sign_candidates( + &mut queued_to_sign, + validator_ids, + block_hash, + num_coalesce, + store, + )); + num_coalesce = coalesce_approvals_len( + options.coalesce_mean, + options.coalesce_std_dev, + rand_chacha, + ); + } + + // If more that one candidate was in the assignment queue all of them for issuing + // approvals + for candidate_index in assignment.1.iter_ones() { + let candidate = candidates.get(candidate_index).unwrap(); + if let CandidateEvent::CandidateIncluded(candidate, _, _, _) = candidate { + queued_to_sign.push(TestSignInfo { + candidate_hash: candidate.hash(), + candidate_index: candidate_index as CandidateIndex, + validator_index: assignment.0.validator, + assignment: message.clone(), + }); + } else { + todo!("Other enum variants are not used in this 
benchmark"); + } + } + approvals_to_create + }, + _ => { + todo!("Other enum variants are not used in this benchmark"); + }, + }) + .collect_vec(); + + let mut messages = result.into_iter().flatten().collect_vec(); + + if !queued_to_sign.is_empty() { + messages.push(TestSignInfo::sign_candidates( + &mut queued_to_sign, + validator_ids, + block_hash, + num_coalesce, + store, + )); + } + messages +} + +/// Helper struct to gather information about more than one candidate an sign it in a single +/// approval message. +struct TestSignInfo { + /// The candidate hash + candidate_hash: CandidateHash, + /// The candidate index + candidate_index: CandidateIndex, + /// The validator sending the assignments + validator_index: ValidatorIndex, + /// The assignments convering this candidate + assignment: TestMessageInfo, +} + +impl TestSignInfo { + /// Helper function to create a signture for all candidates in `to_sign` parameter. + /// Returns a TestMessage + fn sign_candidates( + to_sign: &mut Vec, + validator_ids: &[ValidatorId], + block_hash: Hash, + num_coalesce: usize, + store: &LocalKeystore, + ) -> MessagesBundle { + let current_validator_index = to_sign.first().map(|val| val.validator_index).unwrap(); + let tranche_approval_can_be_sent = + to_sign.iter().map(|val| val.assignment.tranche).max().unwrap(); + let validator_id = validator_ids.get(current_validator_index.0 as usize).unwrap().clone(); + + let unique_assignments: HashSet = + to_sign.iter().map(|info| info.assignment.clone()).collect(); + + let mut to_sign = to_sign + .drain(..) 
+ .sorted_by(|val1, val2| val1.candidate_index.cmp(&val2.candidate_index)) + .peekable(); + + let mut bundle = MessagesBundle { + assignments: unique_assignments.into_iter().collect_vec(), + approvals: Vec::new(), + }; + + while to_sign.peek().is_some() { + let to_sign = to_sign.by_ref().take(num_coalesce).collect_vec(); + + let hashes = to_sign.iter().map(|val| val.candidate_hash).collect_vec(); + let candidate_indices = to_sign.iter().map(|val| val.candidate_index).collect_vec(); + + let sent_by = to_sign + .iter() + .flat_map(|val| val.assignment.sent_by.iter()) + .copied() + .collect::>(); + + let payload = ApprovalVoteMultipleCandidates(&hashes).signing_payload(1); + + let signature = store + .sr25519_sign(ValidatorId::ID, &validator_id.clone().into(), &payload[..]) + .unwrap() + .unwrap() + .into(); + let indirect = IndirectSignedApprovalVoteV2 { + block_hash, + candidate_indices: candidate_indices.try_into().unwrap(), + validator: current_validator_index, + signature, + }; + let msg = protocol_v3::ApprovalDistributionMessage::Approvals(vec![indirect]); + + bundle.approvals.push(TestMessageInfo { + msg, + sent_by: sent_by.into_iter().collect_vec(), + tranche: tranche_approval_can_be_sent, + block_hash, + }); + } + bundle + } +} + +/// Determine what neighbours would send a given message to the node under test. 
+fn neighbours_that_would_sent_message( + peer_ids: &[PeerId], + current_validator_index: u32, + topology_node_under_test: &GridNeighbors, + topology: &SessionGridTopology, +) -> Vec<(ValidatorIndex, PeerId)> { + let topology_originator = topology + .compute_grid_neighbors_for(ValidatorIndex(current_validator_index)) + .unwrap(); + + let originator_y = topology_originator.validator_indices_y.iter().find(|validator| { + topology_node_under_test.required_routing_by_index(**validator, false) == + RequiredRouting::GridY + }); + + assert!(originator_y != Some(&ValidatorIndex(NODE_UNDER_TEST))); + + let originator_x = topology_originator.validator_indices_x.iter().find(|validator| { + topology_node_under_test.required_routing_by_index(**validator, false) == + RequiredRouting::GridX + }); + + assert!(originator_x != Some(&ValidatorIndex(NODE_UNDER_TEST))); + + let is_neighbour = topology_originator + .validator_indices_x + .contains(&ValidatorIndex(NODE_UNDER_TEST)) || + topology_originator + .validator_indices_y + .contains(&ValidatorIndex(NODE_UNDER_TEST)); + + let mut to_be_sent_by = originator_y + .into_iter() + .chain(originator_x) + .map(|val| (*val, peer_ids[val.0 as usize])) + .collect_vec(); + + if is_neighbour { + to_be_sent_by.push((ValidatorIndex(current_validator_index), peer_ids[0])); + } + + to_be_sent_by +} diff --git a/polkadot/node/subsystem-bench/src/approval/mock_chain_selection.rs b/polkadot/node/subsystem-bench/src/approval/mock_chain_selection.rs new file mode 100644 index 000000000000..ab23a8ced484 --- /dev/null +++ b/polkadot/node/subsystem-bench/src/approval/mock_chain_selection.rs @@ -0,0 +1,66 @@ +// Copyright (C) Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. 
+ +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +use crate::approval::{LOG_TARGET, SLOT_DURATION_MILLIS}; + +use super::{ApprovalTestState, PastSystemClock}; +use futures::FutureExt; +use polkadot_node_core_approval_voting::time::{slot_number_to_tick, Clock, TICK_DURATION_MILLIS}; +use polkadot_node_subsystem::{overseer, SpawnedSubsystem, SubsystemError}; +use polkadot_node_subsystem_types::messages::ChainSelectionMessage; + +/// Mock ChainSelection subsystem used to answer request made by the approval-voting subsystem, +/// during benchmark. All the necessary information to answer the requests is stored in the `state` +pub struct MockChainSelection { + pub state: ApprovalTestState, + pub clock: PastSystemClock, +} + +#[overseer::subsystem(ChainSelection, error=SubsystemError, prefix=self::overseer)] +impl MockChainSelection { + fn start(self, ctx: Context) -> SpawnedSubsystem { + let future = self.run(ctx).map(|_| Ok(())).boxed(); + + SpawnedSubsystem { name: "mock-chain-subsystem", future } + } +} + +#[overseer::contextbounds(ChainSelection, prefix = self::overseer)] +impl MockChainSelection { + async fn run(self, mut ctx: Context) { + loop { + let msg = ctx.recv().await.expect("Should not fail"); + match msg { + orchestra::FromOrchestra::Signal(_) => {}, + orchestra::FromOrchestra::Communication { msg } => + if let ChainSelectionMessage::Approved(hash) = msg { + let block_info = self.state.get_info_by_hash(hash); + let approved_number = block_info.block_number; + + block_info.approved.store(true, std::sync::atomic::Ordering::SeqCst); + self.state + .last_approved_block + .store(approved_number, std::sync::atomic::Ordering::SeqCst); + + let approved_in_tick 
= self.clock.tick_now() - + slot_number_to_tick(SLOT_DURATION_MILLIS, block_info.slot); + + gum::info!(target: LOG_TARGET, ?hash, "Chain selection approved after {:} ms", approved_in_tick * TICK_DURATION_MILLIS); + }, + } + } + } +} diff --git a/polkadot/node/subsystem-bench/src/approval/mod.rs b/polkadot/node/subsystem-bench/src/approval/mod.rs new file mode 100644 index 000000000000..3544ce74711e --- /dev/null +++ b/polkadot/node/subsystem-bench/src/approval/mod.rs @@ -0,0 +1,1059 @@ +// Copyright (C) Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . 
+ +use self::{ + helpers::{make_candidates, make_header}, + test_message::{MessagesBundle, TestMessageInfo}, +}; +use crate::{ + approval::{ + helpers::{ + generate_babe_epoch, generate_new_session_topology, generate_peer_view_change_for, + PastSystemClock, + }, + message_generator::PeerMessagesGenerator, + mock_chain_selection::MockChainSelection, + }, + core::{ + configuration::{TestAuthorities, TestConfiguration}, + environment::{TestEnvironment, TestEnvironmentDependencies, MAX_TIME_OF_FLIGHT}, + mock::{ + dummy_builder, + network_bridge::{MockNetworkBridgeRx, MockNetworkBridgeTx}, + AlwaysSupportsParachains, ChainApiState, MockChainApi, MockRuntimeApi, TestSyncOracle, + }, + network::{ + new_network, HandleNetworkMessage, NetworkEmulatorHandle, NetworkInterface, + NetworkInterfaceReceiver, + }, + NODE_UNDER_TEST, + }, +}; +use colored::Colorize; +use futures::channel::oneshot; +use itertools::Itertools; +use orchestra::TimeoutExt; +use overseer::{metrics::Metrics as OverseerMetrics, MetricsTrait}; +use parity_scale_codec::{Decode, Encode}; +use polkadot_approval_distribution::ApprovalDistribution; +use polkadot_node_core_approval_voting::{ + time::{slot_number_to_tick, tick_to_slot_number, Clock, ClockExt, SystemClock}, + ApprovalVotingSubsystem, Config as ApprovalVotingConfig, Metrics as ApprovalVotingMetrics, +}; +use polkadot_node_network_protocol::v3 as protocol_v3; +use polkadot_node_primitives::approval::{self, v1::RelayVRFStory}; +use polkadot_node_subsystem::{overseer, AllMessages, Overseer, OverseerConnector, SpawnGlue}; +use polkadot_node_subsystem_test_helpers::mock::new_block_import_info; +use polkadot_node_subsystem_types::messages::{ApprovalDistributionMessage, ApprovalVotingMessage}; +use polkadot_node_subsystem_util::metrics::Metrics; +use polkadot_overseer::Handle as OverseerHandleReal; +use polkadot_primitives::{ + BlockNumber, CandidateEvent, CandidateIndex, CandidateReceipt, Hash, Header, Slot, + ValidatorIndex, +}; +use 
prometheus::Registry; +use sc_keystore::LocalKeystore; +use sc_service::SpawnTaskHandle; +use serde::{Deserialize, Serialize}; +use sp_consensus_babe::Epoch as BabeEpoch; +use sp_core::H256; +use std::{ + cmp::max, + collections::{HashMap, HashSet}, + fs, + io::Read, + ops::Sub, + sync::{ + atomic::{AtomicBool, AtomicU32, AtomicU64}, + Arc, + }, + time::{Duration, Instant}, +}; +use tokio::time::sleep; + +mod helpers; +mod message_generator; +mod mock_chain_selection; +mod test_message; + +pub const LOG_TARGET: &str = "subsystem-bench::approval"; +const DATA_COL: u32 = 0; +pub(crate) const NUM_COLUMNS: u32 = 1; +pub(crate) const SLOT_DURATION_MILLIS: u64 = 6000; +pub(crate) const TEST_CONFIG: ApprovalVotingConfig = ApprovalVotingConfig { + col_approval_data: DATA_COL, + slot_duration_millis: SLOT_DURATION_MILLIS, +}; + +/// Start generating messages for a slot into the future, so that the +/// generation nevers falls behind the current slot. +const BUFFER_FOR_GENERATION_MILLIS: u64 = 30_000; + +/// Parameters specific to the approvals benchmark +#[derive(Debug, Clone, Serialize, Deserialize, clap::Parser)] +#[clap(rename_all = "kebab-case")] +#[allow(missing_docs)] +pub struct ApprovalsOptions { + #[clap(short, long, default_value_t = 89)] + /// The last considered tranche for which we send the message. + pub last_considered_tranche: u32, + #[clap(short, long, default_value_t = 1.0)] + /// Min candidates to be signed in a single approval. + pub coalesce_mean: f32, + #[clap(short, long, default_value_t = 1.0)] + /// Max candidate to be signed in a single approval. + pub coalesce_std_dev: f32, + /// The maximum tranche diff between approvals coalesced toghther. + pub coalesce_tranche_diff: u32, + #[clap(short, long, default_value_t = false)] + /// Enable assignments v2. + pub enable_assignments_v2: bool, + #[clap(short, long, default_value_t = true)] + /// Sends messages only till block is approved. 
+ pub stop_when_approved: bool, + #[clap(short, long)] + /// Work directory. + #[clap(short, long, default_value_t = format!("/tmp"))] + pub workdir_prefix: String, + /// The number of no shows per candidate + #[clap(short, long, default_value_t = 0)] + pub num_no_shows_per_candidate: u32, +} + +impl ApprovalsOptions { + // Generates a fingerprint use to determine if messages need to be re-generated. + fn fingerprint(&self) -> Vec { + let mut bytes = Vec::new(); + bytes.extend(self.coalesce_mean.to_be_bytes()); + bytes.extend(self.coalesce_std_dev.to_be_bytes()); + bytes.extend(self.coalesce_tranche_diff.to_be_bytes()); + bytes.extend((self.enable_assignments_v2 as i32).to_be_bytes()); + bytes + } +} + +/// Information about a block. It is part of test state and it is used by the mock +/// subsystems to be able to answer the calls approval-voting and approval-distribution +/// do into the outside world. +#[derive(Clone, Debug)] +struct BlockTestData { + /// The slot this block occupies, see implementer's guide to understand what a slot + /// is in the context of polkadot. + slot: Slot, + /// The hash of the block. + hash: Hash, + /// The block number. + block_number: BlockNumber, + /// The list of candidates included in this block. + candidates: Vec, + /// The block header. + header: Header, + /// The vrf story for the given block. + relay_vrf_story: RelayVRFStory, + /// If the block has been approved by the approval-voting subsystem. + /// This set on `true` when ChainSelectionMessage::Approved is received inside the chain + /// selection mock subsystem. + approved: Arc, + /// The total number of candidates before this block. + total_candidates_before: u64, + /// The votes we sent. + /// votes[validator_index][candidate_index] tells if validator sent vote for candidate. + /// We use this to mark the test as succesfull if GetApprovalSignatures returns all the votes + /// from here. 
+ votes: Arc>>, +} + +/// Candidate information used during the test to decide if more messages are needed. +#[derive(Debug)] +struct CandidateTestData { + /// The configured maximum number of no-shows for this candidate. + max_no_shows: u32, + /// The last tranche where we had a no-show. + last_tranche_with_no_show: u32, + /// The number of sent assignments. + sent_assignment: u32, + /// The number of no-shows. + num_no_shows: u32, + /// The maximum tranche were we covered the needed approvals + max_tranche: u32, + /// Minimum needed votes to approve candidate. + needed_approvals: u32, +} + +impl CandidateTestData { + /// If message in this tranche needs to be sent. + fn should_send_tranche(&self, tranche: u32) -> bool { + self.sent_assignment <= self.needed_approvals || + tranche <= self.max_tranche + self.num_no_shows + } + + /// Sets max tranche + fn set_max_tranche(&mut self, tranche: u32) { + self.max_tranche = max(tranche, self.max_tranche); + } + + /// Records no-show for candidate. + fn record_no_show(&mut self, tranche: u32) { + self.num_no_shows += 1; + self.last_tranche_with_no_show = max(tranche, self.last_tranche_with_no_show); + } + + /// Marks an assignment sent. + fn mark_sent_assignment(&mut self, tranche: u32) { + if self.sent_assignment < self.needed_approvals { + self.set_max_tranche(tranche); + } + + self.sent_assignment += 1; + } + + /// Tells if a message in this tranche should be a no-show. + fn should_no_show(&self, tranche: u32) -> bool { + (self.num_no_shows < self.max_no_shows && self.last_tranche_with_no_show < tranche) || + (tranche == 0 && self.num_no_shows == 0 && self.max_no_shows > 0) + } +} + +/// Test state that is pre-generated and loaded from a file that matches the fingerprint +/// of the TestConfiguration. +#[derive(Debug, Clone, Encode, Decode, PartialEq, Eq)] +struct GeneratedState { + /// All assignments and approvals + all_messages: Option>, + /// The first slot in the test. 
+ initial_slot: Slot, +} + +/// Approval test state used by all mock subsystems to be able to answer messages emitted +/// by the approval-voting and approval-distribution-subystems. +/// +/// This gets cloned across all mock subsystems, so if there is any information that gets +/// updated between subsystems, they would have to be wrapped in Arc's. +#[derive(Clone)] +pub struct ApprovalTestState { + /// The main test configuration + configuration: TestConfiguration, + /// The specific test configurations passed when starting the benchmark. + options: ApprovalsOptions, + /// The list of blocks used for testing. + blocks: Vec, + /// The babe epoch used during testing. + babe_epoch: BabeEpoch, + /// The pre-generated state. + generated_state: GeneratedState, + /// The test authorities + test_authorities: TestAuthorities, + /// Last approved block number. + last_approved_block: Arc, + /// Total sent messages from peers to node + total_sent_messages_to_node: Arc, + /// Total sent messages from test node to other peers + total_sent_messages_from_node: Arc, + /// Total unique sent messages. + total_unique_messages: Arc, + /// Approval voting metrics. + approval_voting_metrics: ApprovalVotingMetrics, + /// The delta ticks from the tick the messages were generated to the the time we start this + /// message. + delta_tick_from_generated: Arc, +} + +impl ApprovalTestState { + /// Build a new `ApprovalTestState` object out of the configurations passed when the benchmark + /// was tested. 
+ fn new( + configuration: &TestConfiguration, + options: ApprovalsOptions, + dependencies: &TestEnvironmentDependencies, + ) -> Self { + let test_authorities = configuration.generate_authorities(); + let start = Instant::now(); + + let messages_path = PeerMessagesGenerator::generate_messages_if_needed( + configuration, + &test_authorities, + &options, + &dependencies.task_manager.spawn_handle(), + ); + + let mut messages_file = + fs::OpenOptions::new().read(true).open(messages_path.as_path()).unwrap(); + let mut messages_bytes = Vec::::with_capacity(2000000); + + messages_file + .read_to_end(&mut messages_bytes) + .expect("Could not initialize list of messages"); + let generated_state: GeneratedState = + Decode::decode(&mut messages_bytes.as_slice()).expect("Could not decode messages"); + + gum::info!( + "It took {:?} ms to load {:?} unique messages", + start.elapsed().as_millis(), + generated_state.all_messages.as_ref().map(|val| val.len()).unwrap_or_default() + ); + + let babe_epoch = + generate_babe_epoch(generated_state.initial_slot, test_authorities.clone()); + let blocks = Self::generate_blocks_information( + configuration, + &babe_epoch, + generated_state.initial_slot, + ); + + let state = ApprovalTestState { + blocks, + babe_epoch: babe_epoch.clone(), + generated_state, + test_authorities, + last_approved_block: Arc::new(AtomicU32::new(0)), + total_sent_messages_to_node: Arc::new(AtomicU64::new(0)), + total_sent_messages_from_node: Arc::new(AtomicU64::new(0)), + total_unique_messages: Arc::new(AtomicU64::new(0)), + options, + approval_voting_metrics: ApprovalVotingMetrics::try_register(&dependencies.registry) + .unwrap(), + delta_tick_from_generated: Arc::new(AtomicU64::new(630720000)), + configuration: configuration.clone(), + }; + + gum::info!("Built testing state"); + + state + } + + /// Generates the blocks and the information about the blocks that will be used + /// to drive this test. 
+ fn generate_blocks_information( + configuration: &TestConfiguration, + babe_epoch: &BabeEpoch, + initial_slot: Slot, + ) -> Vec { + let mut per_block_heads: Vec = Vec::new(); + let mut prev_candidates = 0; + for block_number in 1..=configuration.num_blocks { + let block_hash = Hash::repeat_byte(block_number as u8); + let parent_hash = + per_block_heads.last().map(|val| val.hash).unwrap_or(Hash::repeat_byte(0xde)); + let slot_for_block = initial_slot + (block_number as u64 - 1); + + let header = make_header(parent_hash, slot_for_block, block_number as u32); + + let unsafe_vrf = approval::v1::babe_unsafe_vrf_info(&header) + .expect("Can not continue without vrf generator"); + let relay_vrf_story = unsafe_vrf + .compute_randomness( + &babe_epoch.authorities, + &babe_epoch.randomness, + babe_epoch.epoch_index, + ) + .expect("Can not continue without vrf story"); + let block_info = BlockTestData { + slot: slot_for_block, + block_number: block_number as BlockNumber, + hash: block_hash, + header, + candidates: make_candidates( + block_hash, + block_number as BlockNumber, + configuration.n_cores as u32, + configuration.n_cores as u32, + ), + relay_vrf_story, + approved: Arc::new(AtomicBool::new(false)), + total_candidates_before: prev_candidates, + votes: Arc::new( + (0..configuration.n_validators) + .map(|_| { + (0..configuration.n_cores).map(|_| AtomicBool::new(false)).collect_vec() + }) + .collect_vec(), + ), + }; + prev_candidates += block_info.candidates.len() as u64; + per_block_heads.push(block_info) + } + per_block_heads + } + + /// Starts the generation of messages(Assignments & Approvals) needed for approving blocks. 
+ async fn start_message_production( + &mut self, + network_emulator: &NetworkEmulatorHandle, + overseer_handle: OverseerHandleReal, + env: &TestEnvironment, + registry: Registry, + ) -> oneshot::Receiver<()> { + gum::info!(target: LOG_TARGET, "Start assignments/approvals production"); + + let (producer_tx, producer_rx) = oneshot::channel(); + let peer_message_source = PeerMessageProducer { + network: network_emulator.clone(), + overseer_handle: overseer_handle.clone(), + state: self.clone(), + options: self.options.clone(), + notify_done: producer_tx, + registry, + }; + + peer_message_source + .produce_messages(env, self.generated_state.all_messages.take().unwrap()); + producer_rx + } + + // Generates a ChainApiState used for driving MockChainApi + fn build_chain_api_state(&self) -> ChainApiState { + ChainApiState { + block_headers: self + .blocks + .iter() + .map(|block| (block.hash, block.header.clone())) + .collect(), + } + } + + // Builds a map with the list of candidate events per-block. + fn candidate_events_by_block(&self) -> HashMap> { + self.blocks.iter().map(|block| (block.hash, block.candidates.clone())).collect() + } + + // Builds a map with the list of candidate hashes per-block. 
+ fn candidate_hashes_by_block(&self) -> HashMap> { + self.blocks + .iter() + .map(|block| { + ( + block.hash, + block + .candidates + .iter() + .map(|candidate_event| match candidate_event { + CandidateEvent::CandidateBacked(_, _, _, _) => todo!(), + CandidateEvent::CandidateIncluded(receipt, _, _, _) => receipt.clone(), + CandidateEvent::CandidateTimedOut(_, _, _) => todo!(), + }) + .collect_vec(), + ) + }) + .collect() + } +} + +impl ApprovalTestState { + /// Returns test data for the given hash + fn get_info_by_hash(&self, requested_hash: Hash) -> &BlockTestData { + self.blocks + .iter() + .find(|block| block.hash == requested_hash) + .expect("Mocks should not use unknown hashes") + } + + /// Returns test data for the given slot + fn get_info_by_slot(&self, slot: Slot) -> Option<&BlockTestData> { + self.blocks.iter().find(|block| block.slot == slot) + } +} + +impl HandleNetworkMessage for ApprovalTestState { + fn handle( + &self, + _message: crate::core::network::NetworkMessage, + _node_sender: &mut futures::channel::mpsc::UnboundedSender< + crate::core::network::NetworkMessage, + >, + ) -> Option { + self.total_sent_messages_from_node + .as_ref() + .fetch_add(1, std::sync::atomic::Ordering::SeqCst); + None + } +} + +/// A generator of messages coming from a given Peer/Validator +struct PeerMessageProducer { + /// The state state used to know what messages to generate. + state: ApprovalTestState, + /// Configuration options, passed at the beginning of the test. + options: ApprovalsOptions, + /// A reference to the network emulator + network: NetworkEmulatorHandle, + /// A handle to the overseer, used for sending messages to the node + /// under test. + overseer_handle: OverseerHandleReal, + /// Channel for producer to notify main loop it finished sending + /// all messages and they have been processed. + notify_done: oneshot::Sender<()>, + /// The metrics registry. 
+ registry: Registry, +} + +impl PeerMessageProducer { + /// Generates messages by spawning a blocking task in the background which begins creating + /// the assignments/approvals and peer view changes at the begining of each block. + fn produce_messages(mut self, env: &TestEnvironment, all_messages: Vec) { + env.spawn_blocking("produce-messages", async move { + let mut initialized_blocks = HashSet::new(); + let mut per_candidate_data: HashMap<(Hash, CandidateIndex), CandidateTestData> = + self.initialize_candidates_test_data(); + let mut skipped_messages: Vec = Vec::new(); + let mut re_process_skipped = false; + + let system_clock = + PastSystemClock::new(SystemClock {}, self.state.delta_tick_from_generated.clone()); + let mut all_messages = all_messages.into_iter().peekable(); + + while all_messages.peek().is_some() { + let current_slot = + tick_to_slot_number(SLOT_DURATION_MILLIS, system_clock.tick_now()); + let block_to_initialize = self + .state + .blocks + .iter() + .filter(|block_info| { + block_info.slot <= current_slot && + !initialized_blocks.contains(&block_info.hash) + }) + .cloned() + .collect_vec(); + for block_info in block_to_initialize { + if !TestEnvironment::metric_lower_than( + &self.registry, + "polkadot_parachain_imported_candidates_total", + (block_info.total_candidates_before + block_info.candidates.len() as u64 - + 1) as f64, + ) { + initialized_blocks.insert(block_info.hash); + self.initialize_block(&block_info).await; + } + } + + let mut maybe_need_skip = if re_process_skipped { + skipped_messages.clone().into_iter().peekable() + } else { + vec![].into_iter().peekable() + }; + + let progressing_iterator = if !re_process_skipped { + &mut all_messages + } else { + re_process_skipped = false; + skipped_messages.clear(); + &mut maybe_need_skip + }; + + while progressing_iterator + .peek() + .map(|bundle| { + self.time_to_process_message( + bundle, + current_slot, + &initialized_blocks, + &system_clock, + &per_candidate_data, + ) + }) + 
.unwrap_or_default() + { + let bundle = progressing_iterator.next().unwrap(); + re_process_skipped = self.process_message( + bundle, + &mut per_candidate_data, + &mut skipped_messages, + ) || re_process_skipped; + } + // Sleep, so that we don't busy wait in this loop when we don't have anything to send. + sleep(Duration::from_millis(50)).await; + } + + gum::info!( + "All messages sent max_tranche {:?} last_tranche_with_no_show {:?}", + per_candidate_data.values().map(|data| data.max_tranche).max(), + per_candidate_data.values().map(|data| data.last_tranche_with_no_show).max() + ); + sleep(Duration::from_secs(6)).await; + // Send an empty GetApprovalSignatures as the last message + // so when the approval-distribution answered to it, we know it doesn't have anything + // else to process. + let (tx, rx) = oneshot::channel(); + let msg = ApprovalDistributionMessage::GetApprovalSignatures(HashSet::new(), tx); + self.send_overseer_message( + AllMessages::ApprovalDistribution(msg), + ValidatorIndex(0), + None, + ) + .await; + rx.await.expect("Failed to get signatures"); + self.notify_done.send(()).expect("Failed to notify main loop"); + gum::info!("All messages processed "); + }); + } + + // Processes a single message bundle and queues the messages to be sent by the peers that would + // send the message in our simulation.
+ pub fn process_message( + &mut self, + bundle: MessagesBundle, + per_candidate_data: &mut HashMap<(Hash, CandidateIndex), CandidateTestData>, + skipped_messages: &mut Vec, + ) -> bool { + let mut reprocess_skipped = false; + let block_info = self + .state + .get_info_by_hash(bundle.assignments.first().unwrap().block_hash) + .clone(); + + if bundle.should_send(per_candidate_data, &self.options) { + bundle.record_sent_assignment(per_candidate_data); + + let assignments = bundle.assignments.clone(); + + for message in bundle.assignments.into_iter().chain(bundle.approvals.into_iter()) { + if message.no_show_if_required(&assignments, per_candidate_data) { + reprocess_skipped = true; + continue; + } else { + message.record_vote(&block_info); + } + self.state + .total_unique_messages + .as_ref() + .fetch_add(1, std::sync::atomic::Ordering::SeqCst); + for (peer, messages) in + message.clone().split_by_peer_id(&self.state.test_authorities) + { + for message in messages { + self.state + .total_sent_messages_to_node + .as_ref() + .fetch_add(1, std::sync::atomic::Ordering::SeqCst); + self.queue_message_from_peer(message, peer.0) + } + } + } + } else if !block_info.approved.load(std::sync::atomic::Ordering::SeqCst) && + self.options.num_no_shows_per_candidate > 0 + { + skipped_messages.push(bundle); + } + reprocess_skipped + } + + // Tells if it is the time to process a message. 
+ pub fn time_to_process_message( + &self, + bundle: &MessagesBundle, + current_slot: Slot, + initialized_blocks: &HashSet, + system_clock: &PastSystemClock, + per_candidate_data: &HashMap<(Hash, CandidateIndex), CandidateTestData>, + ) -> bool { + let block_info = + self.state.get_info_by_hash(bundle.assignments.first().unwrap().block_hash); + let tranche_now = system_clock.tranche_now(SLOT_DURATION_MILLIS, block_info.slot); + + Self::is_past_tranche( + bundle, + tranche_now, + current_slot, + block_info, + initialized_blocks.contains(&block_info.hash), + ) || !bundle.should_send(per_candidate_data, &self.options) + } + + // Tells if the tranche where the bundle should be sent has passed. + pub fn is_past_tranche( + bundle: &MessagesBundle, + tranche_now: u32, + current_slot: Slot, + block_info: &BlockTestData, + block_initialized: bool, + ) -> bool { + bundle.tranche_to_send() <= tranche_now && + current_slot >= block_info.slot && + block_initialized + } + + // Queue message to be sent by validator `sent_by` + fn queue_message_from_peer(&mut self, message: TestMessageInfo, sent_by: ValidatorIndex) { + let peer_authority_id = self + .state + .test_authorities + .validator_authority_id + .get(sent_by.0 as usize) + .expect("We can't handle unknown peers") + .clone(); + + self.network + .send_message_from_peer( + &peer_authority_id, + protocol_v3::ValidationProtocol::ApprovalDistribution(message.msg).into(), + ) + .unwrap_or_else(|_| panic!("Network should be up and running {:?}", sent_by)); + } + + // Queues a message to be sent by the peer identified by the `sent_by` value. 
+ async fn send_overseer_message( + &mut self, + message: AllMessages, + _sent_by: ValidatorIndex, + _latency: Option, + ) { + self.overseer_handle + .send_msg(message, LOG_TARGET) + .timeout(MAX_TIME_OF_FLIGHT) + .await + .unwrap_or_else(|| { + panic!("{} ms maximum time of flight breached", MAX_TIME_OF_FLIGHT.as_millis()) + }); + } + + // Sends the messages needed by approval-distribution and approval-voting for processing a + // message. E.g: PeerViewChange. + async fn initialize_block(&mut self, block_info: &BlockTestData) { + gum::info!("Initialize block {:?}", block_info.hash); + let (tx, rx) = oneshot::channel(); + self.overseer_handle.wait_for_activation(block_info.hash, tx).await; + + rx.await + .expect("We should not fail waiting for block to be activated") + .expect("We should not fail waiting for block to be activated"); + + for validator in 1..self.state.test_authorities.validator_authority_id.len() as u32 { + let peer_id = self.state.test_authorities.peer_ids.get(validator as usize).unwrap(); + let validator = ValidatorIndex(validator); + let view_update = generate_peer_view_change_for(block_info.hash, *peer_id); + + self.send_overseer_message(view_update, validator, None).await; + } + } + + // Initializes the candidates test data. This is used for bookkeeping if more assignments and + // approvals would be needed.
+ fn initialize_candidates_test_data( + &self, + ) -> HashMap<(Hash, CandidateIndex), CandidateTestData> { + let mut per_candidate_data: HashMap<(Hash, CandidateIndex), CandidateTestData> = + HashMap::new(); + for block_info in self.state.blocks.iter() { + for (candidate_index, _) in block_info.candidates.iter().enumerate() { + per_candidate_data.insert( + (block_info.hash, candidate_index as CandidateIndex), + CandidateTestData { + max_no_shows: self.options.num_no_shows_per_candidate, + last_tranche_with_no_show: 0, + sent_assignment: 0, + num_no_shows: 0, + max_tranche: 0, + needed_approvals: self.state.configuration.needed_approvals as u32, + }, + ); + } + } + per_candidate_data + } +} + +/// Helper function to build an overseer with the real implementation for `ApprovalDistribution` and +/// `ApprovalVoting` subystems and mock subsytems for all others. +fn build_overseer( + state: &ApprovalTestState, + network: &NetworkEmulatorHandle, + config: &TestConfiguration, + dependencies: &TestEnvironmentDependencies, + network_interface: &NetworkInterface, + network_receiver: NetworkInterfaceReceiver, +) -> (Overseer, AlwaysSupportsParachains>, OverseerHandleReal) { + let overseer_connector = OverseerConnector::with_event_capacity(6400000); + + let spawn_task_handle = dependencies.task_manager.spawn_handle(); + + let db = kvdb_memorydb::create(NUM_COLUMNS); + let db: polkadot_node_subsystem_util::database::kvdb_impl::DbAdapter = + polkadot_node_subsystem_util::database::kvdb_impl::DbAdapter::new(db, &[]); + let keystore = LocalKeystore::in_memory(); + + let system_clock = + PastSystemClock::new(SystemClock {}, state.delta_tick_from_generated.clone()); + let approval_voting = ApprovalVotingSubsystem::with_config_and_clock( + TEST_CONFIG, + Arc::new(db), + Arc::new(keystore), + Box::new(TestSyncOracle {}), + state.approval_voting_metrics.clone(), + Box::new(system_clock.clone()), + ); + + let approval_distribution = + 
ApprovalDistribution::new(Metrics::register(Some(&dependencies.registry)).unwrap()); + let mock_chain_api = MockChainApi::new(state.build_chain_api_state()); + let mock_chain_selection = MockChainSelection { state: state.clone(), clock: system_clock }; + let mock_runtime_api = MockRuntimeApi::new( + config.clone(), + state.test_authorities.clone(), + state.candidate_hashes_by_block(), + state.candidate_events_by_block(), + Some(state.babe_epoch.clone()), + 1, + ); + let mock_tx_bridge = MockNetworkBridgeTx::new( + network.clone(), + network_interface.subsystem_sender(), + state.test_authorities.clone(), + ); + let mock_rx_bridge = MockNetworkBridgeRx::new(network_receiver, None); + let overseer_metrics = OverseerMetrics::try_register(&dependencies.registry).unwrap(); + let dummy = dummy_builder!(spawn_task_handle, overseer_metrics) + .replace_approval_distribution(|_| approval_distribution) + .replace_approval_voting(|_| approval_voting) + .replace_chain_api(|_| mock_chain_api) + .replace_chain_selection(|_| mock_chain_selection) + .replace_runtime_api(|_| mock_runtime_api) + .replace_network_bridge_tx(|_| mock_tx_bridge) + .replace_network_bridge_rx(|_| mock_rx_bridge); + + let (overseer, raw_handle) = + dummy.build_with_connector(overseer_connector).expect("Should not fail"); + + let overseer_handle = OverseerHandleReal::new(raw_handle); + (overseer, overseer_handle) +} + +/// Takes a test configuration and uses it to creates the `TestEnvironment`. +pub fn prepare_test( + config: TestConfiguration, + options: ApprovalsOptions, +) -> (TestEnvironment, ApprovalTestState) { + prepare_test_inner(config, TestEnvironmentDependencies::default(), options) +} + +/// Build the test environment for an Approval benchmark. 
+fn prepare_test_inner( + config: TestConfiguration, + dependencies: TestEnvironmentDependencies, + options: ApprovalsOptions, +) -> (TestEnvironment, ApprovalTestState) { + gum::info!("Prepare test state"); + let state = ApprovalTestState::new(&config, options, &dependencies); + + gum::info!("Build network emulator"); + + let (network, network_interface, network_receiver) = + new_network(&config, &dependencies, &state.test_authorities, vec![Arc::new(state.clone())]); + + gum::info!("Build overseer"); + + let (overseer, overseer_handle) = build_overseer( + &state, + &network, + &config, + &dependencies, + &network_interface, + network_receiver, + ); + + ( + TestEnvironment::new( + dependencies, + config, + network, + overseer, + overseer_handle, + state.test_authorities.clone(), + ), + state, + ) +} + +pub async fn bench_approvals(env: &mut TestEnvironment, mut state: ApprovalTestState) { + let producer_rx = state + .start_message_production( + env.network(), + env.overseer_handle().clone(), + env, + env.registry().clone(), + ) + .await; + bench_approvals_run(env, state, producer_rx).await +} + +/// Runs the approval benchmark. +pub async fn bench_approvals_run( + env: &mut TestEnvironment, + state: ApprovalTestState, + producer_rx: oneshot::Receiver<()>, +) { + let config = env.config().clone(); + + env.metrics().set_n_validators(config.n_validators); + env.metrics().set_n_cores(config.n_cores); + + // First create the initialization messages that make sure that the node under + // test receives notifications about the topology used and the connected peers.
+ let mut initialization_messages = env.network().generate_peer_connected(); + initialization_messages.extend(generate_new_session_topology( + &state.test_authorities, + ValidatorIndex(NODE_UNDER_TEST), + )); + for message in initialization_messages { + env.send_message(message).await; + } + + let start_marker = Instant::now(); + let real_clock = SystemClock {}; + state.delta_tick_from_generated.store( + real_clock.tick_now() - + slot_number_to_tick(SLOT_DURATION_MILLIS, state.generated_state.initial_slot), + std::sync::atomic::Ordering::SeqCst, + ); + let system_clock = PastSystemClock::new(real_clock, state.delta_tick_from_generated.clone()); + + for block_num in 0..env.config().num_blocks { + let mut current_slot = tick_to_slot_number(SLOT_DURATION_MILLIS, system_clock.tick_now()); + + // Wait until the time arrives at the first slot under test. + while current_slot < state.generated_state.initial_slot { + sleep(Duration::from_millis(5)).await; + current_slot = tick_to_slot_number(SLOT_DURATION_MILLIS, system_clock.tick_now()); + } + + gum::info!(target: LOG_TARGET, "Current block {}/{}", block_num + 1, env.config().num_blocks); + env.metrics().set_current_block(block_num); + let block_start_ts = Instant::now(); + + if let Some(block_info) = state.get_info_by_slot(current_slot) { + env.import_block(new_block_import_info(block_info.hash, block_info.block_number)) + .await; + } + + let block_time = Instant::now().sub(block_start_ts).as_millis() as u64; + env.metrics().set_block_time(block_time); + gum::info!("Block time {}", format!("{:?}ms", block_time).cyan()); + + system_clock + .wait(slot_number_to_tick(SLOT_DURATION_MILLIS, current_slot + 1)) + .await; + } + + // Wait for all blocks to be approved before exiting. + // This is an invariant of the benchmark, if this does not happen something went terribly wrong.
+ while state.last_approved_block.load(std::sync::atomic::Ordering::SeqCst) < + env.config().num_blocks as u32 + { + gum::info!( + "Waiting for all blocks to be approved current approved {:} num_sent {:} num_unique {:}", + state.last_approved_block.load(std::sync::atomic::Ordering::SeqCst), + state.total_sent_messages_to_node.load(std::sync::atomic::Ordering::SeqCst), + state.total_unique_messages.load(std::sync::atomic::Ordering::SeqCst) + ); + tokio::time::sleep(Duration::from_secs(6)).await; + } + + gum::info!("Awaiting producer to signal done"); + + producer_rx.await.expect("Failed to receive done from message producer"); + + gum::info!("Awaiting polkadot_parachain_subsystem_bounded_received to tells us the messages have been processed"); + let at_least_messages = + state.total_sent_messages_to_node.load(std::sync::atomic::Ordering::SeqCst) as usize; + env.wait_until_metric( + "polkadot_parachain_subsystem_bounded_received", + Some(("subsystem_name", "approval-distribution-subsystem")), + |value| { + gum::info!(target: LOG_TARGET, ?value, ?at_least_messages, "Waiting metric"); + value >= at_least_messages as f64 + }, + ) + .await; + gum::info!("Requesting approval votes ms"); + + for info in &state.blocks { + for (index, candidates) in info.candidates.iter().enumerate() { + match candidates { + CandidateEvent::CandidateBacked(_, _, _, _) => todo!(), + CandidateEvent::CandidateIncluded(receipt_fetch, _head, _, _) => { + let (tx, rx) = oneshot::channel(); + + let msg = ApprovalVotingMessage::GetApprovalSignaturesForCandidate( + receipt_fetch.hash(), + tx, + ); + env.send_message(AllMessages::ApprovalVoting(msg)).await; + + let result = rx.await.unwrap(); + + for (validator, _) in result.iter() { + info.votes + .get(validator.0 as usize) + .unwrap() + .get(index) + .unwrap() + .store(false, std::sync::atomic::Ordering::SeqCst); + } + }, + + CandidateEvent::CandidateTimedOut(_, _, _) => todo!(), + }; + } + } + + gum::info!("Awaiting 
polkadot_parachain_subsystem_bounded_received to tells us the messages have been processed"); + let at_least_messages = + state.total_sent_messages_to_node.load(std::sync::atomic::Ordering::SeqCst) as usize; + env.wait_until_metric( + "polkadot_parachain_subsystem_bounded_received", + Some(("subsystem_name", "approval-distribution-subsystem")), + |value| { + gum::info!(target: LOG_TARGET, ?value, ?at_least_messages, "Waiting metric"); + value >= at_least_messages as f64 + }, + ) + .await; + + for state in &state.blocks { + for (validator, votes) in state + .votes + .as_ref() + .iter() + .enumerate() + .filter(|(validator, _)| *validator != NODE_UNDER_TEST as usize) + { + for (index, candidate) in votes.iter().enumerate() { + assert_eq!( + ( + validator, + index, + candidate.load(std::sync::atomic::Ordering::SeqCst), + state.hash + ), + (validator, index, false, state.hash) + ); + } + } + } + + env.stop().await; + + let duration: u128 = start_marker.elapsed().as_millis(); + gum::info!( + "All blocks processed in {} total_sent_messages_to_node {} total_sent_messages_from_node {} num_unique_messages {}", + format!("{:?}ms", duration).cyan(), + state.total_sent_messages_to_node.load(std::sync::atomic::Ordering::SeqCst), + state.total_sent_messages_from_node.load(std::sync::atomic::Ordering::SeqCst), + state.total_unique_messages.load(std::sync::atomic::Ordering::SeqCst) + ); + + env.display_network_usage(); + env.display_cpu_usage(&["approval-distribution", "approval-voting"]); +} diff --git a/polkadot/node/subsystem-bench/src/approval/test_message.rs b/polkadot/node/subsystem-bench/src/approval/test_message.rs new file mode 100644 index 000000000000..ea578776f261 --- /dev/null +++ b/polkadot/node/subsystem-bench/src/approval/test_message.rs @@ -0,0 +1,304 @@ +// Copyright (C) Parity Technologies (UK) Ltd. +// This file is part of Polkadot. 
+ +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see <http://www.gnu.org/licenses/>. + +use super::{ApprovalsOptions, BlockTestData, CandidateTestData}; +use crate::core::configuration::TestAuthorities; +use itertools::Itertools; +use parity_scale_codec::{Decode, Encode}; +use polkadot_node_network_protocol::v3 as protocol_v3; + +use polkadot_primitives::{CandidateIndex, Hash, ValidatorIndex}; +use sc_network::PeerId; +use std::collections::{HashMap, HashSet}; + +#[derive(Debug, Clone, Encode, Decode, PartialEq, Eq)] +pub struct TestMessageInfo { + /// The actual message + pub msg: protocol_v3::ApprovalDistributionMessage, + /// The list of peers that would send this message in a real topology. + /// It includes both the peers that would send the message because of the topology + /// or because of randomly choosing so. + pub sent_by: Vec, + /// The tranche at which this message should be sent. + pub tranche: u32, + /// The block hash this message refers to.
+ pub block_hash: Hash, +} + +impl std::hash::Hash for TestMessageInfo { + fn hash(&self, state: &mut H) { + match &self.msg { + protocol_v3::ApprovalDistributionMessage::Assignments(assignments) => { + for (assignment, candidates) in assignments { + (assignment.block_hash, assignment.validator).hash(state); + candidates.hash(state); + } + }, + protocol_v3::ApprovalDistributionMessage::Approvals(approvals) => { + for approval in approvals { + (approval.block_hash, approval.validator).hash(state); + approval.candidate_indices.hash(state); + } + }, + }; + } +} + +#[derive(Debug, Clone, Encode, Decode, PartialEq, Eq)] +/// A list of messages that depend on each other, approvals cover one of the assignments and +/// vice-versa. +pub struct MessagesBundle { + pub assignments: Vec, + pub approvals: Vec, +} + +impl MessagesBundle { + /// The tranche when this bundle can be sent correctly, so no assignments or approvals will be + /// from the future. + pub fn tranche_to_send(&self) -> u32 { + self.assignments + .iter() + .chain(self.approvals.iter()) + .max_by(|a, b| a.tranche.cmp(&b.tranche)) + .unwrap() + .tranche + } + + /// The min tranche in the bundle. + pub fn min_tranche(&self) -> u32 { + self.assignments + .iter() + .chain(self.approvals.iter()) + .min_by(|a, b| a.tranche.cmp(&b.tranche)) + .unwrap() + .tranche + } + + /// Tells if the bundle is needed for sending. + /// We either send it because we need more assignments and approvals to approve the candidates + /// or because we configured the test to send messages until a given tranche. + pub fn should_send( + &self, + candidates_test_data: &HashMap<(Hash, CandidateIndex), CandidateTestData>, + options: &ApprovalsOptions, + ) -> bool { + self.needed_for_approval(candidates_test_data) || + (!options.stop_when_approved && + self.min_tranche() <= options.last_considered_tranche) + } + + /// Tells if the bundle is needed because we need more messages to approve the candidates.
+ pub fn needed_for_approval( + &self, + candidates_test_data: &HashMap<(Hash, CandidateIndex), CandidateTestData>, + ) -> bool { + self.assignments + .iter() + .any(|message| message.needed_for_approval(candidates_test_data)) + } + + /// Mark the assignments in the bundle as sent. + pub fn record_sent_assignment( + &self, + candidates_test_data: &mut HashMap<(Hash, CandidateIndex), CandidateTestData>, + ) { + self.assignments + .iter() + .for_each(|assignment| assignment.record_sent_assignment(candidates_test_data)); + } +} + +impl TestMessageInfo { + /// Tells if the message is an approval. + fn is_approval(&self) -> bool { + match self.msg { + protocol_v3::ApprovalDistributionMessage::Assignments(_) => false, + protocol_v3::ApprovalDistributionMessage::Approvals(_) => true, + } + } + + /// Records an approval. + /// We use this to check after all messages have been processed that we didn't lose any + /// message. + pub fn record_vote(&self, state: &BlockTestData) { + if self.is_approval() { + match &self.msg { + protocol_v3::ApprovalDistributionMessage::Assignments(_) => todo!(), + protocol_v3::ApprovalDistributionMessage::Approvals(approvals) => + for approval in approvals { + for candidate_index in approval.candidate_indices.iter_ones() { + state + .votes + .get(approval.validator.0 as usize) + .unwrap() + .get(candidate_index) + .unwrap() + .store(true, std::sync::atomic::Ordering::SeqCst); + } + }, + } + } + } + + /// Mark the assignments in the message as sent.
+ pub fn record_sent_assignment( + &self, + candidates_test_data: &mut HashMap<(Hash, CandidateIndex), CandidateTestData>, + ) { + match &self.msg { + protocol_v3::ApprovalDistributionMessage::Assignments(assignments) => { + for (assignment, candidate_indices) in assignments { + for candidate_index in candidate_indices.iter_ones() { + let candidate_test_data = candidates_test_data + .get_mut(&(assignment.block_hash, candidate_index as CandidateIndex)) + .unwrap(); + candidate_test_data.mark_sent_assignment(self.tranche) + } + } + }, + protocol_v3::ApprovalDistributionMessage::Approvals(_approvals) => todo!(), + } + } + + /// Returns a list of candidates indicies in this message + pub fn candidate_indices(&self) -> HashSet { + let mut unique_candidate_indicies = HashSet::new(); + match &self.msg { + protocol_v3::ApprovalDistributionMessage::Assignments(assignments) => + for (_assignment, candidate_indices) in assignments { + for candidate_index in candidate_indices.iter_ones() { + unique_candidate_indicies.insert(candidate_index); + } + }, + protocol_v3::ApprovalDistributionMessage::Approvals(approvals) => + for approval in approvals { + for candidate_index in approval.candidate_indices.iter_ones() { + unique_candidate_indicies.insert(candidate_index); + } + }, + } + unique_candidate_indicies + } + + /// Marks this message as no-shows if the number of configured no-shows is above the registered + /// no-shows. + /// Returns true if the message is a no-show. 
+ pub fn no_show_if_required( + &self, + assignments: &[TestMessageInfo], + candidates_test_data: &mut HashMap<(Hash, CandidateIndex), CandidateTestData>, + ) -> bool { + let mut should_no_show = false; + if self.is_approval() { + let covered_candidates = assignments + .iter() + .map(|assignment| (assignment, assignment.candidate_indices())) + .collect_vec(); + + match &self.msg { + protocol_v3::ApprovalDistributionMessage::Assignments(_) => todo!(), + protocol_v3::ApprovalDistributionMessage::Approvals(approvals) => { + assert_eq!(approvals.len(), 1); + + for approval in approvals { + should_no_show = should_no_show || + approval.candidate_indices.iter_ones().all(|candidate_index| { + let candidate_test_data = candidates_test_data + .get_mut(&( + approval.block_hash, + candidate_index as CandidateIndex, + )) + .unwrap(); + let assignment = covered_candidates + .iter() + .find(|(_assignment, candidates)| { + candidates.contains(&candidate_index) + }) + .unwrap(); + candidate_test_data.should_no_show(assignment.0.tranche) + }); + + if should_no_show { + for candidate_index in approval.candidate_indices.iter_ones() { + let candidate_test_data = candidates_test_data + .get_mut(&( + approval.block_hash, + candidate_index as CandidateIndex, + )) + .unwrap(); + let assignment = covered_candidates + .iter() + .find(|(_assignment, candidates)| { + candidates.contains(&candidate_index) + }) + .unwrap(); + candidate_test_data.record_no_show(assignment.0.tranche) + } + } + } + }, + } + } + should_no_show + } + + /// Tells if a message is needed for approval + pub fn needed_for_approval( + &self, + candidates_test_data: &HashMap<(Hash, CandidateIndex), CandidateTestData>, + ) -> bool { + match &self.msg { + protocol_v3::ApprovalDistributionMessage::Assignments(assignments) => + assignments.iter().any(|(assignment, candidate_indices)| { + candidate_indices.iter_ones().any(|candidate_index| { + candidates_test_data + .get(&(assignment.block_hash, candidate_index as 
CandidateIndex)) + .map(|data| data.should_send_tranche(self.tranche)) + .unwrap_or_default() + }) + }), + protocol_v3::ApprovalDistributionMessage::Approvals(approvals) => + approvals.iter().any(|approval| { + approval.candidate_indices.iter_ones().any(|candidate_index| { + candidates_test_data + .get(&(approval.block_hash, candidate_index as CandidateIndex)) + .map(|data| data.should_send_tranche(self.tranche)) + .unwrap_or_default() + }) + }), + } + } + + /// Splits a message into multiple messages based on what peers should send this message. + /// It build a HashMap of messages that should be sent by each peer. + pub fn split_by_peer_id( + self, + authorities: &TestAuthorities, + ) -> HashMap<(ValidatorIndex, PeerId), Vec> { + let mut result: HashMap<(ValidatorIndex, PeerId), Vec> = HashMap::new(); + + for validator_index in &self.sent_by { + let peer = authorities.peer_ids.get(validator_index.0 as usize).unwrap(); + result.entry((*validator_index, *peer)).or_default().push(TestMessageInfo { + msg: self.msg.clone(), + sent_by: Default::default(), + tranche: self.tranche, + block_hash: self.block_hash, + }); + } + result + } +} diff --git a/polkadot/node/subsystem-bench/src/availability/av_store_helpers.rs b/polkadot/node/subsystem-bench/src/availability/av_store_helpers.rs index 18ea2f72891f..e6827f1d8aea 100644 --- a/polkadot/node/subsystem-bench/src/availability/av_store_helpers.rs +++ b/polkadot/node/subsystem-bench/src/availability/av_store_helpers.rs @@ -14,6 +14,8 @@ // You should have received a copy of the GNU General Public License // along with Polkadot. If not, see . 
+use crate::core::mock::TestSyncOracle; + use super::*; use polkadot_node_metrics::metrics::Metrics; @@ -31,22 +33,10 @@ mod columns { const TEST_CONFIG: Config = Config { col_data: columns::DATA, col_meta: columns::META }; -struct DumbOracle; - -impl sp_consensus::SyncOracle for DumbOracle { - fn is_major_syncing(&self) -> bool { - false - } - - fn is_offline(&self) -> bool { - unimplemented!("oh no!") - } -} - pub fn new_av_store(dependencies: &TestEnvironmentDependencies) -> AvailabilityStoreSubsystem { let metrics = Metrics::try_register(&dependencies.registry).unwrap(); - AvailabilityStoreSubsystem::new(test_store(), TEST_CONFIG, Box::new(DumbOracle), metrics) + AvailabilityStoreSubsystem::new(test_store(), TEST_CONFIG, Box::new(TestSyncOracle {}), metrics) } fn test_store() -> Arc { diff --git a/polkadot/node/subsystem-bench/src/availability/mod.rs b/polkadot/node/subsystem-bench/src/availability/mod.rs index f9892efb3c68..f7f1184448b3 100644 --- a/polkadot/node/subsystem-bench/src/availability/mod.rs +++ b/polkadot/node/subsystem-bench/src/availability/mod.rs @@ -25,7 +25,7 @@ use polkadot_node_subsystem_types::{ messages::{AvailabilityStoreMessage, NetworkBridgeEvent}, Span, }; -use polkadot_overseer::Handle as OverseerHandle; +use polkadot_overseer::{metrics::Metrics as OverseerMetrics, Handle as OverseerHandle}; use sc_network::{request_responses::ProtocolConfig, PeerId}; use sp_core::H256; use std::{collections::HashMap, iter::Cycle, ops::Sub, sync::Arc, time::Instant}; @@ -85,9 +85,12 @@ fn build_overseer_for_availability_read( av_store: MockAvailabilityStore, network_bridge: (MockNetworkBridgeTx, MockNetworkBridgeRx), availability_recovery: AvailabilityRecoverySubsystem, + dependencies: &TestEnvironmentDependencies, ) -> (Overseer, AlwaysSupportsParachains>, OverseerHandle) { let overseer_connector = OverseerConnector::with_event_capacity(64000); - let dummy = dummy_builder!(spawn_task_handle); + let overseer_metrics = 
OverseerMetrics::try_register(&dependencies.registry).unwrap(); + + let dummy = dummy_builder!(spawn_task_handle, overseer_metrics); let builder = dummy .replace_runtime_api(|_| runtime_api) .replace_availability_store(|_| av_store) @@ -101,6 +104,7 @@ fn build_overseer_for_availability_read( (overseer, OverseerHandle::new(raw_handle)) } +#[allow(clippy::too_many_arguments)] fn build_overseer_for_availability_write( spawn_task_handle: SpawnTaskHandle, runtime_api: MockRuntimeApi, @@ -109,9 +113,12 @@ fn build_overseer_for_availability_write( chain_api: MockChainApi, availability_store: AvailabilityStoreSubsystem, bitfield_distribution: BitfieldDistribution, + dependencies: &TestEnvironmentDependencies, ) -> (Overseer, AlwaysSupportsParachains>, OverseerHandle) { let overseer_connector = OverseerConnector::with_event_capacity(64000); - let dummy = dummy_builder!(spawn_task_handle); + let overseer_metrics = OverseerMetrics::try_register(&dependencies.registry).unwrap(); + + let dummy = dummy_builder!(spawn_task_handle, overseer_metrics); let builder = dummy .replace_runtime_api(|_| runtime_api) .replace_availability_store(|_| availability_store) @@ -171,6 +178,9 @@ fn prepare_test_inner( config.clone(), test_authorities.clone(), candidate_hashes, + Default::default(), + Default::default(), + 0, ); let availability_state = NetworkAvailabilityState { @@ -198,6 +208,7 @@ fn prepare_test_inner( let network_bridge_tx = network_bridge::MockNetworkBridgeTx::new( network.clone(), network_interface.subsystem_sender(), + test_authorities.clone(), ); let network_bridge_rx = @@ -231,6 +242,7 @@ fn prepare_test_inner( av_store, (network_bridge_tx, network_bridge_rx), subsystem, + &dependencies, ) }, TestObjective::DataAvailabilityWrite => { @@ -240,7 +252,7 @@ fn prepare_test_inner( Metrics::try_register(&dependencies.registry).unwrap(), ); - let block_headers = (0..=config.num_blocks) + let block_headers = (1..=config.num_blocks) .map(|block_number| { ( 
Hash::repeat_byte(block_number as u8), @@ -267,6 +279,7 @@ fn prepare_test_inner( chain_api, new_av_store(&dependencies), bitfield_distribution, + &dependencies, ) }, _ => { @@ -614,9 +627,10 @@ pub async fn benchmark_availability_write(env: &mut TestEnvironment, mut state: ); // Wait for all bitfields to be processed. - env.wait_until_metric_eq( + env.wait_until_metric( "polkadot_parachain_received_availabilty_bitfields_total", - config.connected_count() * block_num, + None, + |value| value == (config.connected_count() * block_num) as f64, ) .await; diff --git a/polkadot/node/subsystem-bench/src/cli.rs b/polkadot/node/subsystem-bench/src/cli.rs index 7213713eb6ba..bfce8cc183a9 100644 --- a/polkadot/node/subsystem-bench/src/cli.rs +++ b/polkadot/node/subsystem-bench/src/cli.rs @@ -14,6 +14,7 @@ // You should have received a copy of the GNU General Public License // along with Polkadot. If not, see . use super::availability::DataAvailabilityReadOptions; +use crate::approval::ApprovalsOptions; use serde::{Deserialize, Serialize}; #[derive(Debug, Clone, Serialize, Deserialize, clap::Parser)] @@ -34,6 +35,9 @@ pub enum TestObjective { DataAvailabilityWrite, /// Run a test sequence specified in a file TestSequence(TestSequenceOptions), + /// Benchmark the approval-voting and approval-distribution subsystems. + ApprovalVoting(ApprovalsOptions), + Unimplemented, } #[derive(Debug, clap::Parser)] diff --git a/polkadot/node/subsystem-bench/src/core/configuration.rs b/polkadot/node/subsystem-bench/src/core/configuration.rs index 66da8a1db45d..0c8a78c504c8 100644 --- a/polkadot/node/subsystem-bench/src/core/configuration.rs +++ b/polkadot/node/subsystem-bench/src/core/configuration.rs @@ -16,11 +16,14 @@ // //! Test configuration definition and helpers. 
use super::*; +use itertools::Itertools; use keyring::Keyring; -use std::path::Path; +use sc_network::PeerId; +use sp_consensus_babe::AuthorityId; +use std::{collections::HashMap, path::Path}; pub use crate::cli::TestObjective; -use polkadot_primitives::{AuthorityDiscoveryId, ValidatorId}; +use polkadot_primitives::{AssignmentId, AuthorityDiscoveryId, ValidatorId}; use rand::thread_rng; use rand_distr::{Distribution, Normal, Uniform}; @@ -65,6 +68,25 @@ fn default_backing_group_size() -> usize { 5 } +// Default needed approvals +fn default_needed_approvals() -> usize { + 30 +} + +fn default_zeroth_delay_tranche_width() -> usize { + 0 +} +fn default_relay_vrf_modulo_samples() -> usize { + 6 +} + +fn default_n_delay_tranches() -> usize { + 89 +} +fn default_no_show_slots() -> usize { + 3 +} + /// The test input parameters #[derive(Clone, Debug, Serialize, Deserialize)] pub struct TestConfiguration { @@ -74,6 +96,17 @@ pub struct TestConfiguration { pub n_validators: usize, /// Number of cores pub n_cores: usize, + /// The number of needed votes to approve a candidate. 
+ #[serde(default = "default_needed_approvals")] + pub needed_approvals: usize, + #[serde(default = "default_zeroth_delay_tranche_width")] + pub zeroth_delay_tranche_width: usize, + #[serde(default = "default_relay_vrf_modulo_samples")] + pub relay_vrf_modulo_samples: usize, + #[serde(default = "default_n_delay_tranches")] + pub n_delay_tranches: usize, + #[serde(default = "default_no_show_slots")] + pub no_show_slots: usize, /// Maximum backing group size #[serde(default = "default_backing_group_size")] pub max_validators_per_core: usize, @@ -139,6 +172,11 @@ pub struct TestAuthorities { pub keyring: Keyring, pub validator_public: Vec, pub validator_authority_id: Vec, + pub validator_babe_id: Vec, + pub validator_assignment_id: Vec, + pub key_seeds: Vec, + pub peer_ids: Vec, + pub peer_id_to_authority: HashMap, } impl TestConfiguration { @@ -162,18 +200,45 @@ impl TestConfiguration { pub fn generate_authorities(&self) -> TestAuthorities { let keyring = Keyring::default(); - let keys = (0..self.n_validators) - .map(|peer_index| keyring.sr25519_new(format!("Node{}", peer_index))) + let key_seeds = (0..self.n_validators) + .map(|peer_index| format!("//Node{}", peer_index)) + .collect_vec(); + + let keys = key_seeds + .iter() + .map(|seed| keyring.sr25519_new(seed.as_str())) .collect::>(); - // Generate `AuthorityDiscoveryId`` for each peer + // Generate keys and peers ids in each of the format needed by the tests. 
let validator_public: Vec = keys.iter().map(|key| (*key).into()).collect::>(); let validator_authority_id: Vec = keys.iter().map(|key| (*key).into()).collect::>(); - TestAuthorities { keyring, validator_public, validator_authority_id } + let validator_babe_id: Vec = + keys.iter().map(|key| (*key).into()).collect::>(); + + let validator_assignment_id: Vec = + keys.iter().map(|key| (*key).into()).collect::>(); + let peer_ids: Vec = keys.iter().map(|_| PeerId::random()).collect::>(); + + let peer_id_to_authority = peer_ids + .iter() + .zip(validator_authority_id.iter()) + .map(|(peer_id, authorithy_id)| (*peer_id, authorithy_id.clone())) + .collect(); + + TestAuthorities { + keyring, + validator_public, + validator_authority_id, + peer_ids, + validator_babe_id, + validator_assignment_id, + key_seeds, + peer_id_to_authority, + } } /// An unconstrained standard configuration matching Polkadot/Kusama @@ -199,6 +264,11 @@ impl TestConfiguration { min_pov_size, max_pov_size, connectivity: 100, + needed_approvals: default_needed_approvals(), + n_delay_tranches: default_n_delay_tranches(), + no_show_slots: default_no_show_slots(), + relay_vrf_modulo_samples: default_relay_vrf_modulo_samples(), + zeroth_delay_tranche_width: default_zeroth_delay_tranche_width(), } } @@ -223,6 +293,11 @@ impl TestConfiguration { min_pov_size, max_pov_size, connectivity: 95, + needed_approvals: default_needed_approvals(), + n_delay_tranches: default_n_delay_tranches(), + no_show_slots: default_no_show_slots(), + relay_vrf_modulo_samples: default_relay_vrf_modulo_samples(), + zeroth_delay_tranche_width: default_zeroth_delay_tranche_width(), } } @@ -247,6 +322,11 @@ impl TestConfiguration { min_pov_size, max_pov_size, connectivity: 67, + needed_approvals: default_needed_approvals(), + n_delay_tranches: default_n_delay_tranches(), + no_show_slots: default_no_show_slots(), + relay_vrf_modulo_samples: default_relay_vrf_modulo_samples(), + zeroth_delay_tranche_width: 
default_zeroth_delay_tranche_width(), } } } diff --git a/polkadot/node/subsystem-bench/src/core/display.rs b/polkadot/node/subsystem-bench/src/core/display.rs index bca82d7b90ae..b130afdcfad5 100644 --- a/polkadot/node/subsystem-bench/src/core/display.rs +++ b/polkadot/node/subsystem-bench/src/core/display.rs @@ -26,7 +26,7 @@ use prometheus::{ }; use std::fmt::Display; -#[derive(Default)] +#[derive(Default, Debug)] pub struct MetricCollection(Vec); impl From> for MetricCollection { @@ -49,6 +49,11 @@ impl MetricCollection { .sum() } + /// Tells whether the sum of all `metric_name` entries is lower than `value` + pub fn metric_lower_than(&self, metric_name: &str, value: f64) -> bool { + self.sum_by(metric_name) < value + } + pub fn subset_with_label_value(&self, label_name: &str, label_value: &str) -> MetricCollection { self.0 .iter() @@ -163,7 +168,7 @@ pub fn parse_metrics(registry: &Registry) -> MetricCollection { name: h_name, label_names, label_values, - value: h.get_sample_sum(), + value: h.get_sample_count() as f64, }); }, MetricType::SUMMARY => { diff --git a/polkadot/node/subsystem-bench/src/core/environment.rs b/polkadot/node/subsystem-bench/src/core/environment.rs index b6846316430b..59bfed7f1120 100644 --- a/polkadot/node/subsystem-bench/src/core/environment.rs +++ b/polkadot/node/subsystem-bench/src/core/environment.rs @@ -243,6 +243,11 @@ impl TestEnvironment { &self.network } + /// Returns a reference to the overseer handle. + pub fn overseer_handle(&self) -> &OverseerHandle { + &self.overseer_handle + } + /// Returns the Prometheus registry.
pub fn registry(&self) -> &Registry { &self.dependencies.registry @@ -311,18 +316,32 @@ impl TestEnvironment { self.overseer_handle.stop().await; } - /// Blocks until `metric_name` == `value` - pub async fn wait_until_metric_eq(&self, metric_name: &str, value: usize) { - let value = value as f64; + /// Tells whether the `metric_name` metric parsed from `registry` is lower than `value` + pub fn metric_lower_than(registry: &Registry, metric_name: &str, value: f64) -> bool { + let test_metrics = super::display::parse_metrics(registry); + test_metrics.metric_lower_than(metric_name, value) + } + + /// Blocks until `condition` holds for the summed `metric_name` value, optionally filtered by `label` + pub async fn wait_until_metric( + &self, + metric_name: &str, + label: Option<(&str, &str)>, + condition: impl Fn(f64) -> bool, + ) { loop { - let test_metrics = super::display::parse_metrics(self.registry()); + let test_metrics = if let Some((label_name, label_value)) = label { + super::display::parse_metrics(self.registry()) + .subset_with_label_value(label_name, label_value) + } else { + super::display::parse_metrics(self.registry()) + }; let current_value = test_metrics.sum_by(metric_name); - gum::debug!(target: LOG_TARGET, metric_name, current_value, value, "Waiting for metric"); - if current_value == value { + gum::debug!(target: LOG_TARGET, metric_name, current_value, "Waiting for metric"); + if condition(current_value) { break } - // Check value every 50ms.
tokio::time::sleep(std::time::Duration::from_millis(50)).await; } diff --git a/polkadot/node/subsystem-bench/src/core/keyring.rs b/polkadot/node/subsystem-bench/src/core/keyring.rs index 66c7229847c3..c290d30b46fb 100644 --- a/polkadot/node/subsystem-bench/src/core/keyring.rs +++ b/polkadot/node/subsystem-bench/src/core/keyring.rs @@ -34,13 +34,17 @@ impl Default for Keyring { } impl Keyring { - pub fn sr25519_new(&self, name: String) -> Public { + pub fn sr25519_new(&self, seed: &str) -> Public { self.keystore - .sr25519_generate_new(ValidatorId::ID, Some(&format!("//{}", name))) + .sr25519_generate_new(ValidatorId::ID, Some(seed)) .expect("Insert key into keystore") } pub fn keystore(&self) -> Arc { self.keystore.clone() } + + pub fn keystore_ref(&self) -> &LocalKeystore { + self.keystore.as_ref() + } } diff --git a/polkadot/node/subsystem-bench/src/core/mock/chain_api.rs b/polkadot/node/subsystem-bench/src/core/mock/chain_api.rs index 008d8eef106a..7a5ee80de800 100644 --- a/polkadot/node/subsystem-bench/src/core/mock/chain_api.rs +++ b/polkadot/node/subsystem-bench/src/core/mock/chain_api.rs @@ -16,6 +16,7 @@ //! //! A generic runtime api subsystem mockup suitable to be used in benchmarks. +use itertools::Itertools; use polkadot_primitives::Header; use polkadot_node_subsystem::{ @@ -38,6 +39,12 @@ pub struct MockChainApi { state: ChainApiState, } +impl ChainApiState { + fn get_header_by_number(&self, requested_number: u32) -> Option<&Header> { + self.block_headers.values().find(|header| header.number == requested_number) + } +} + impl MockChainApi { pub fn new(state: ChainApiState) -> MockChainApi { Self { state } @@ -77,9 +84,44 @@ impl MockChainApi { .expect("Relay chain block hashes are known"), ))); }, - ChainApiMessage::Ancestors { hash: _hash, k: _k, response_channel } => { - // For our purposes, no ancestors is fine. 
- let _ = response_channel.send(Ok(Vec::new())); + ChainApiMessage::FinalizedBlockNumber(val) => { + val.send(Ok(0)).unwrap(); + }, + ChainApiMessage::FinalizedBlockHash(requested_number, sender) => { + let hash = self + .state + .get_header_by_number(requested_number) + .expect("Unknow block number") + .hash(); + sender.send(Ok(Some(hash))).unwrap(); + }, + ChainApiMessage::BlockNumber(requested_hash, sender) => { + sender + .send(Ok(Some( + self.state + .block_headers + .get(&requested_hash) + .expect("Unknown block hash") + .number, + ))) + .unwrap(); + }, + ChainApiMessage::Ancestors { hash, k: _, response_channel } => { + let block_number = self + .state + .block_headers + .get(&hash) + .expect("Unknown block hash") + .number; + let ancestors = self + .state + .block_headers + .iter() + .filter(|(_, header)| header.number < block_number) + .sorted_by(|a, b| a.1.number.cmp(&b.1.number)) + .map(|(hash, _)| *hash) + .collect_vec(); + response_channel.send(Ok(ancestors)).unwrap(); }, _ => { unimplemented!("Unexpected chain-api message") diff --git a/polkadot/node/subsystem-bench/src/core/mock/mod.rs b/polkadot/node/subsystem-bench/src/core/mock/mod.rs index b67c6611e8cd..e766b07e8b16 100644 --- a/polkadot/node/subsystem-bench/src/core/mock/mod.rs +++ b/polkadot/node/subsystem-bench/src/core/mock/mod.rs @@ -37,7 +37,7 @@ impl HeadSupportsParachains for AlwaysSupportsParachains { // An orchestra with dummy subsystems macro_rules! dummy_builder { - ($spawn_task_handle: ident) => {{ + ($spawn_task_handle: ident, $metrics: ident) => {{ use super::core::mock::dummy::*; // Initialize a mock overseer. @@ -69,10 +69,24 @@ macro_rules! 
dummy_builder { .activation_external_listeners(Default::default()) .span_per_active_leaf(Default::default()) .active_leaves(Default::default()) - .metrics(Default::default()) + .metrics($metrics) .supports_parachains(AlwaysSupportsParachains {}) .spawner(SpawnGlue($spawn_task_handle)) }}; } pub(crate) use dummy_builder; +use sp_consensus::SyncOracle; + +#[derive(Clone)] +pub struct TestSyncOracle {} + +impl SyncOracle for TestSyncOracle { + fn is_major_syncing(&self) -> bool { + false + } + + fn is_offline(&self) -> bool { + unimplemented!("not used by subsystem benchmarks") + } +} diff --git a/polkadot/node/subsystem-bench/src/core/mock/network_bridge.rs b/polkadot/node/subsystem-bench/src/core/mock/network_bridge.rs index a2be853ef8d5..a171deb2e715 100644 --- a/polkadot/node/subsystem-bench/src/core/mock/network_bridge.rs +++ b/polkadot/node/subsystem-bench/src/core/mock/network_bridge.rs @@ -18,11 +18,11 @@ //! the emulated network. use futures::{channel::mpsc::UnboundedSender, FutureExt, StreamExt}; use polkadot_node_subsystem_types::{ - messages::{BitfieldDistributionMessage, NetworkBridgeEvent}, + messages::{ApprovalDistributionMessage, BitfieldDistributionMessage, NetworkBridgeEvent}, OverseerSignal, }; -use sc_network::{request_responses::ProtocolConfig, PeerId, RequestFailure}; +use sc_network::{request_responses::ProtocolConfig, RequestFailure}; use polkadot_node_subsystem::{ messages::NetworkBridgeTxMessage, overseer, SpawnedSubsystem, SubsystemError, @@ -30,8 +30,9 @@ use polkadot_node_subsystem::{ use polkadot_node_network_protocol::Versioned; -use crate::core::network::{ - NetworkEmulatorHandle, NetworkInterfaceReceiver, NetworkMessage, RequestExt, +use crate::core::{ + configuration::TestAuthorities, + network::{NetworkEmulatorHandle, NetworkInterfaceReceiver, NetworkMessage, RequestExt}, }; const LOG_TARGET: &str = "subsystem-bench::network-bridge"; @@ -44,6 +45,8 @@ pub struct MockNetworkBridgeTx { network: NetworkEmulatorHandle, /// A channel to 
the network interface, to_network_interface: UnboundedSender, + /// Test authorities + test_authorithies: TestAuthorities, } /// A mock of the network bridge tx subsystem. @@ -58,8 +61,9 @@ impl MockNetworkBridgeTx { pub fn new( network: NetworkEmulatorHandle, to_network_interface: UnboundedSender, + test_authorithies: TestAuthorities, ) -> MockNetworkBridgeTx { - Self { network, to_network_interface } + Self { network, to_network_interface, test_authorithies } } } @@ -126,9 +130,21 @@ impl MockNetworkBridgeTx { NetworkBridgeTxMessage::ReportPeer(_) => { // ingore rep changes }, - _ => { - unimplemented!("Unexpected network bridge message") + NetworkBridgeTxMessage::SendValidationMessage(peers, message) => { + for peer in peers { + self.to_network_interface + .unbounded_send(NetworkMessage::MessageFromNode( + self.test_authorithies + .peer_id_to_authority + .get(&peer) + .unwrap() + .clone(), + message.clone(), + )) + .expect("Should not fail"); + } }, + _ => unimplemented!("Unexpected network bridge message"), }, } } @@ -145,16 +161,23 @@ impl MockNetworkBridgeRx { maybe_peer_message = from_network_interface.next() => { if let Some(message) = maybe_peer_message { match message { - NetworkMessage::MessageFromPeer(message) => match message { + NetworkMessage::MessageFromPeer(peer_id, message) => match message { Versioned::V2( polkadot_node_network_protocol::v2::ValidationProtocol::BitfieldDistribution( bitfield, ), ) => { ctx.send_message( - BitfieldDistributionMessage::NetworkBridgeUpdate(NetworkBridgeEvent::PeerMessage(PeerId::random(), polkadot_node_network_protocol::Versioned::V2(bitfield))) + BitfieldDistributionMessage::NetworkBridgeUpdate(NetworkBridgeEvent::PeerMessage(peer_id, polkadot_node_network_protocol::Versioned::V2(bitfield))) ).await; }, + Versioned::V3( + polkadot_node_network_protocol::v3::ValidationProtocol::ApprovalDistribution(msg) + ) => { + ctx.send_message( +
ApprovalDistributionMessage::NetworkBridgeUpdate(NetworkBridgeEvent::PeerMessage(peer_id, polkadot_node_network_protocol::Versioned::V3(msg))) + ).await; + } _ => { unimplemented!("We only talk v2 network protocol") }, diff --git a/polkadot/node/subsystem-bench/src/core/mock/runtime_api.rs b/polkadot/node/subsystem-bench/src/core/mock/runtime_api.rs index caefe068efff..ca6896dbb29d 100644 --- a/polkadot/node/subsystem-bench/src/core/mock/runtime_api.rs +++ b/polkadot/node/subsystem-bench/src/core/mock/runtime_api.rs @@ -16,8 +16,10 @@ //! //! A generic runtime api subsystem mockup suitable to be used in benchmarks. +use itertools::Itertools; use polkadot_primitives::{ - CandidateReceipt, CoreState, GroupIndex, IndexedVec, OccupiedCore, SessionInfo, ValidatorIndex, + vstaging::NodeFeatures, CandidateEvent, CandidateReceipt, CoreState, GroupIndex, IndexedVec, + OccupiedCore, SessionIndex, SessionInfo, ValidatorIndex, }; use bitvec::prelude::BitVec; @@ -26,6 +28,7 @@ use polkadot_node_subsystem::{ overseer, SpawnedSubsystem, SubsystemError, }; use polkadot_node_subsystem_types::OverseerSignal; +use sp_consensus_babe::Epoch as BabeEpoch; use sp_core::H256; use std::collections::HashMap; @@ -38,8 +41,13 @@ const LOG_TARGET: &str = "subsystem-bench::runtime-api-mock"; pub struct RuntimeApiState { // All authorities in the test, authorities: TestAuthorities, - // Candidate + // Candidate hashes per block candidate_hashes: HashMap>, + // Included candidates per bock + included_candidates: HashMap>, + babe_epoch: Option, + // The session child index, + session_index: SessionIndex, } /// A mocked `runtime-api` subsystem. 
@@ -53,34 +61,57 @@ impl MockRuntimeApi { config: TestConfiguration, authorities: TestAuthorities, candidate_hashes: HashMap>, + included_candidates: HashMap>, + babe_epoch: Option, + session_index: SessionIndex, ) -> MockRuntimeApi { - Self { state: RuntimeApiState { authorities, candidate_hashes }, config } + Self { + state: RuntimeApiState { + authorities, + candidate_hashes, + included_candidates, + babe_epoch, + session_index, + }, + config, + } } fn session_info(&self) -> SessionInfo { - let all_validators = (0..self.config.n_validators) - .map(|i| ValidatorIndex(i as _)) - .collect::>(); - - let validator_groups = all_validators - .chunks(self.config.max_validators_per_core) - .map(Vec::from) - .collect::>(); - SessionInfo { - validators: self.state.authorities.validator_public.clone().into(), - discovery_keys: self.state.authorities.validator_authority_id.clone(), - validator_groups: IndexedVec::>::from(validator_groups), - assignment_keys: vec![], - n_cores: self.config.n_cores as u32, - zeroth_delay_tranche_width: 0, - relay_vrf_modulo_samples: 0, - n_delay_tranches: 0, - no_show_slots: 0, - needed_approvals: 0, - active_validator_indices: vec![], - dispute_period: 6, - random_seed: [0u8; 32], - } + session_info_for_peers(&self.config, &self.state.authorities) + } +} + +/// Generates a test session info with all passed authorities as consensus validators. 
+pub fn session_info_for_peers( + configuration: &TestConfiguration, + authorities: &TestAuthorities, +) -> SessionInfo { + let all_validators = (0..configuration.n_validators) + .map(|i| ValidatorIndex(i as _)) + .collect::>(); + + let validator_groups = all_validators + .chunks(configuration.max_validators_per_core) + .map(Vec::from) + .collect::>(); + + SessionInfo { + validators: authorities.validator_public.iter().cloned().collect(), + discovery_keys: authorities.validator_authority_id.to_vec(), + assignment_keys: authorities.validator_assignment_id.to_vec(), + validator_groups: IndexedVec::>::from(validator_groups), + n_cores: configuration.n_cores as u32, + needed_approvals: configuration.needed_approvals as u32, + zeroth_delay_tranche_width: configuration.zeroth_delay_tranche_width as u32, + relay_vrf_modulo_samples: configuration.relay_vrf_modulo_samples as u32, + n_delay_tranches: configuration.n_delay_tranches as u32, + no_show_slots: configuration.no_show_slots as u32, + active_validator_indices: (0..authorities.validator_authority_id.len()) + .map(|index| ValidatorIndex(index as u32)) + .collect_vec(), + dispute_period: 6, + random_seed: [0u8; 32], } } @@ -110,6 +141,13 @@ impl MockRuntimeApi { gum::debug!(target: LOG_TARGET, msg=?msg, "recv message"); match msg { + RuntimeApiMessage::Request( + request, + RuntimeApiRequest::CandidateEvents(sender), + ) => { + let candidate_events = self.state.included_candidates.get(&request); + let _ = sender.send(Ok(candidate_events.cloned().unwrap_or_default())); + }, RuntimeApiMessage::Request( _block_hash, RuntimeApiRequest::SessionInfo(_session_index, sender), @@ -123,24 +161,24 @@ impl MockRuntimeApi { let _ = sender.send(Ok(Some(Default::default()))); }, RuntimeApiMessage::Request( - _block_hash, - RuntimeApiRequest::Validators(sender), + _request, + RuntimeApiRequest::NodeFeatures(_session_index, sender), ) => { - let _ = - sender.send(Ok(self.state.authorities.validator_public.clone())); + let _ = 
sender.send(Ok(NodeFeatures::EMPTY)); }, RuntimeApiMessage::Request( _block_hash, - RuntimeApiRequest::CandidateEvents(sender), + RuntimeApiRequest::Validators(sender), ) => { - let _ = sender.send(Ok(Default::default())); + let _ = + sender.send(Ok(self.state.authorities.validator_public.clone())); }, RuntimeApiMessage::Request( _block_hash, RuntimeApiRequest::SessionIndexForChild(sender), ) => { // Session is always the same. - let _ = sender.send(Ok(0)); + let _ = sender.send(Ok(self.state.session_index)); }, RuntimeApiMessage::Request( block_hash, @@ -176,10 +214,14 @@ impl MockRuntimeApi { let _ = sender.send(Ok(cores)); }, RuntimeApiMessage::Request( - _block_hash, - RuntimeApiRequest::NodeFeatures(_session_index, sender), + _request, + RuntimeApiRequest::CurrentBabeEpoch(sender), ) => { - let _ = sender.send(Ok(Default::default())); + let _ = sender.send(Ok(self + .state + .babe_epoch + .clone() + .expect("Babe epoch unpopulated"))); }, // Long term TODO: implement more as needed. message => { diff --git a/polkadot/node/subsystem-bench/src/core/mod.rs b/polkadot/node/subsystem-bench/src/core/mod.rs index 282788d143b4..507dd1aa83f6 100644 --- a/polkadot/node/subsystem-bench/src/core/mod.rs +++ b/polkadot/node/subsystem-bench/src/core/mod.rs @@ -15,6 +15,8 @@ // along with Polkadot. If not, see . const LOG_TARGET: &str = "subsystem-bench::core"; +// The validator index that represent the node that is under test. +pub const NODE_UNDER_TEST: u32 = 0; pub mod configuration; pub mod display; diff --git a/polkadot/node/subsystem-bench/src/core/network.rs b/polkadot/node/subsystem-bench/src/core/network.rs index e2932bf0f51b..8e7c28140635 100644 --- a/polkadot/node/subsystem-bench/src/core/network.rs +++ b/polkadot/node/subsystem-bench/src/core/network.rs @@ -10,7 +10,6 @@ // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. 
- // You should have received a copy of the GNU General Public License // along with Polkadot. If not, see . //! @@ -48,17 +47,21 @@ use futures::{ stream::FuturesUnordered, }; +use itertools::Itertools; use net_protocol::{ + peer_set::{ProtocolVersion, ValidationVersion}, request_response::{Recipient, Requests, ResponseSender}, - VersionedValidationProtocol, + ObservedRole, VersionedValidationProtocol, }; use parity_scale_codec::Encode; +use polkadot_node_subsystem_types::messages::{ApprovalDistributionMessage, NetworkBridgeEvent}; +use polkadot_overseer::AllMessages; use polkadot_primitives::AuthorityDiscoveryId; use prometheus_endpoint::U64; use rand::{seq::SliceRandom, thread_rng}; use sc_network::{ request_responses::{IncomingRequest, OutgoingResponse}, - RequestFailure, + PeerId, RequestFailure, }; use sc_service::SpawnTaskHandle; use std::{ @@ -142,7 +145,7 @@ impl RateLimit { /// peer(`AuthorityDiscoveryId``). pub enum NetworkMessage { /// A gossip message from peer to node. - MessageFromPeer(VersionedValidationProtocol), + MessageFromPeer(PeerId, VersionedValidationProtocol), /// A gossip message from node to a peer. 
MessageFromNode(AuthorityDiscoveryId, VersionedValidationProtocol), /// A request originating from our node @@ -155,9 +158,9 @@ impl NetworkMessage { /// Returns the size of the encoded message or request pub fn size(&self) -> usize { match &self { - NetworkMessage::MessageFromPeer(Versioned::V2(message)) => message.encoded_size(), - NetworkMessage::MessageFromPeer(Versioned::V1(message)) => message.encoded_size(), - NetworkMessage::MessageFromPeer(Versioned::V3(message)) => message.encoded_size(), + NetworkMessage::MessageFromPeer(_, Versioned::V2(message)) => message.encoded_size(), + NetworkMessage::MessageFromPeer(_, Versioned::V1(message)) => message.encoded_size(), + NetworkMessage::MessageFromPeer(_, Versioned::V3(message)) => message.encoded_size(), NetworkMessage::MessageFromNode(_peer_id, Versioned::V2(message)) => message.encoded_size(), NetworkMessage::MessageFromNode(_peer_id, Versioned::V1(message)) => @@ -430,6 +433,7 @@ pub struct EmulatedPeerHandle { messages_tx: UnboundedSender, /// Send actions to be performed by the peer. actions_tx: UnboundedSender, + peer_id: PeerId, } impl EmulatedPeerHandle { @@ -441,7 +445,7 @@ impl EmulatedPeerHandle { /// Send a message to the node. pub fn send_message(&self, message: VersionedValidationProtocol) { self.actions_tx - .unbounded_send(NetworkMessage::MessageFromPeer(message)) + .unbounded_send(NetworkMessage::MessageFromPeer(self.peer_id, message)) .expect("Peer action channel hangup"); } @@ -613,6 +617,7 @@ pub fn new_peer( stats: Arc, to_network_interface: UnboundedSender, latency_ms: usize, + peer_id: PeerId, ) -> EmulatedPeerHandle { let (messages_tx, messages_rx) = mpsc::unbounded::(); let (actions_tx, actions_rx) = mpsc::unbounded::(); @@ -641,7 +646,7 @@ pub fn new_peer( .boxed(), ); - EmulatedPeerHandle { messages_tx, actions_tx } + EmulatedPeerHandle { messages_tx, actions_tx, peer_id } } /// Book keeping of sent and received bytes. 
@@ -719,6 +724,28 @@ pub struct NetworkEmulatorHandle { validator_authority_ids: HashMap, } +impl NetworkEmulatorHandle { + /// Generates peer_connected messages for all connected peers in `test_authorities` + pub fn generate_peer_connected(&self) -> Vec { + self.peers + .iter() + .filter(|peer| peer.is_connected()) + .map(|peer| { + let network = NetworkBridgeEvent::PeerConnected( + peer.handle().peer_id, + ObservedRole::Full, + ProtocolVersion::from(ValidationVersion::V3), + None, + ); + + AllMessages::ApprovalDistribution(ApprovalDistributionMessage::NetworkBridgeUpdate( + network, + )) + }) + .collect_vec() + } +} + /// Create a new emulated network based on `config`. /// Each emulated peer will run the specified `handlers` to process incoming messages. pub fn new_network( @@ -753,6 +780,7 @@ pub fn new_network( stats, to_network_interface.clone(), random_latency(config.latency.as_ref()), + *authorities.peer_ids.get(peer_index).unwrap(), )), ) }) @@ -760,10 +788,14 @@ pub fn new_network( let connected_count = config.connected_count(); - let (_connected, to_disconnect) = peers.partial_shuffle(&mut thread_rng(), connected_count); + let mut peers_indicies = (0..n_peers).collect_vec(); + let (_connected, to_disconnect) = + peers_indicies.partial_shuffle(&mut thread_rng(), connected_count); - for peer in to_disconnect { - peer.disconnect(); + // Node under test is always marked as disconnected. + peers[NODE_UNDER_TEST as usize].disconnect(); + for peer in to_disconnect.iter().skip(1) { + peers[*peer].disconnect(); } gum::info!(target: LOG_TARGET, "{}",format!("Network created, connected validator count {}", connected_count).bright_black()); @@ -786,6 +818,7 @@ pub fn new_network( } /// Errors that can happen when sending data to emulated peers. 
+#[derive(Clone, Debug)] pub enum EmulatedPeerError { NotConnected, } diff --git a/polkadot/node/subsystem-bench/src/subsystem-bench.rs b/polkadot/node/subsystem-bench/src/subsystem-bench.rs index 8633ebb703aa..6f45214bc735 100644 --- a/polkadot/node/subsystem-bench/src/subsystem-bench.rs +++ b/polkadot/node/subsystem-bench/src/subsystem-bench.rs @@ -26,6 +26,7 @@ use pyroscope_pprofrs::{pprof_backend, PprofConfig}; use std::path::Path; +pub(crate) mod approval; pub(crate) mod availability; pub(crate) mod cli; pub(crate) mod core; @@ -43,7 +44,7 @@ use core::{ use clap_num::number_range; -use crate::core::display::display_configuration; +use crate::{approval::bench_approvals, core::display::display_configuration}; fn le_100(s: &str) -> Result { number_range(s, 0, 100) @@ -174,6 +175,12 @@ impl BenchCli { &mut env, state, )); }, + TestObjective::ApprovalVoting(ref options) => { + let (mut env, state) = + approval::prepare_test(test_config.clone(), options.clone()); + + env.runtime().block_on(bench_approvals(&mut env, state)); + }, TestObjective::DataAvailabilityWrite => { let mut state = TestState::new(&test_config); let (mut env, _protocol_config) = prepare_test(test_config, &mut state); @@ -181,13 +188,16 @@ impl BenchCli { &mut env, state, )); }, - _ => gum::error!("Invalid test objective in sequence"), + TestObjective::TestSequence(_) => todo!(), + TestObjective::Unimplemented => todo!(), } } return Ok(()) }, TestObjective::DataAvailabilityRead(ref _options) => self.create_test_configuration(), TestObjective::DataAvailabilityWrite => self.create_test_configuration(), + TestObjective::ApprovalVoting(_) => todo!(), + TestObjective::Unimplemented => todo!(), }; let mut latency_config = test_config.latency.clone().unwrap_or_default(); @@ -232,6 +242,8 @@ impl BenchCli { .block_on(availability::benchmark_availability_write(&mut env, state)); }, TestObjective::TestSequence(_options) => {}, + TestObjective::ApprovalVoting(_) => todo!(), + TestObjective::Unimplemented 
=> todo!(), } if let Some(agent_running) = agent_running {