diff --git a/Cargo.lock b/Cargo.lock index 3091d59df82c7..4aac0f5581bb6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -13296,6 +13296,55 @@ dependencies = [ "tracing-gum", ] +[[package]] +name = "polkadot-node-core-approval-voting-parallel" +version = "7.0.0" +dependencies = [ + "assert_matches", + "async-trait", + "bitvec", + "derive_more", + "env_logger 0.11.3", + "futures", + "futures-timer", + "itertools 0.11.0", + "kvdb", + "kvdb-memorydb", + "log", + "merlin", + "parity-scale-codec", + "parking_lot 0.12.3", + "polkadot-approval-distribution", + "polkadot-node-core-approval-voting", + "polkadot-node-jaeger", + "polkadot-node-metrics", + "polkadot-node-network-protocol", + "polkadot-node-primitives", + "polkadot-node-subsystem", + "polkadot-node-subsystem-test-helpers", + "polkadot-node-subsystem-util", + "polkadot-overseer", + "polkadot-primitives", + "polkadot-primitives-test-helpers", + "polkadot-subsystem-bench", + "rand", + "rand_chacha", + "rand_core", + "sc-keystore", + "schnellru", + "schnorrkel 0.11.4", + "sp-application-crypto", + "sp-consensus", + "sp-consensus-babe", + "sp-consensus-slots", + "sp-core", + "sp-keyring", + "sp-keystore", + "sp-runtime", + "thiserror", + "tracing-gum", +] + [[package]] name = "polkadot-node-core-av-store" version = "7.0.0" @@ -14759,6 +14808,7 @@ dependencies = [ "polkadot-network-bridge", "polkadot-node-collation-generation", "polkadot-node-core-approval-voting", + "polkadot-node-core-approval-voting-parallel", "polkadot-node-core-av-store", "polkadot-node-core-backing", "polkadot-node-core-bitfield-signing", diff --git a/Cargo.toml b/Cargo.toml index db9a2bd722735..9baeda27ea921 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -152,6 +152,7 @@ members = [ "polkadot/erasure-coding/fuzzer", "polkadot/node/collation-generation", "polkadot/node/core/approval-voting", + "polkadot/node/core/approval-voting-parallel", "polkadot/node/core/av-store", "polkadot/node/core/backing", "polkadot/node/core/bitfield-signing", @@ -1009,6 +1010,7 @@ polkadot-gossip-support = { path = "polkadot/node/network/gossip-support", defau polkadot-network-bridge = { path = "polkadot/node/network/bridge", default-features = false } polkadot-node-collation-generation = { path = "polkadot/node/collation-generation", default-features = false } polkadot-node-core-approval-voting = { path = "polkadot/node/core/approval-voting", default-features = false } +polkadot-node-core-approval-voting-parallel = { path = "polkadot/node/core/approval-voting-parallel", default-features = false } polkadot-node-core-av-store = { path = "polkadot/node/core/av-store", default-features = false } polkadot-node-core-backing = { path = "polkadot/node/core/backing", default-features = false } polkadot-node-core-bitfield-signing = { path = "polkadot/node/core/bitfield-signing", default-features = false } diff --git a/polkadot/node/core/approval-voting-parallel/Cargo.toml b/polkadot/node/core/approval-voting-parallel/Cargo.toml new file mode 100644 index 0000000000000..df3589cd320f2 --- /dev/null +++ b/polkadot/node/core/approval-voting-parallel/Cargo.toml @@ -0,0 +1,66 @@ +[package] +name = "polkadot-node-core-approval-voting-parallel" +version = "7.0.0" +authors.workspace = true +edition.workspace = true +license.workspace = true +description = "Approval Voting Subsystem running approval work in parallel" + +[lints] +workspace = true + +[dependencies] +futures = "0.3.30" +futures-timer = "3.0.2" +codec = { package = "parity-scale-codec", version = "3.6.12", default-features = false, features = ["bit-vec", "derive"] } +gum = { package = "tracing-gum", path = "../../gum" } +bitvec = { version = "1.0.0", default-features = false, features = ["alloc"] } +schnellru = "0.2.1" +merlin = "3.0" +schnorrkel = "0.11.4" +kvdb = "0.13.0" +derive_more = "0.99.17" +thiserror = { workspace = true } +itertools = "0.11" + +polkadot-node-core-approval-voting = { workspace = true, default-features = true } +polkadot-approval-distribution = { workspace = true, default-features = true } + + +polkadot-node-subsystem = { workspace = true, default-features = true } +polkadot-node-subsystem-util = { workspace = true, default-features = true } +polkadot-overseer = { workspace = true, default-features = true } +polkadot-primitives = { workspace = true, default-features = true } +polkadot-node-primitives = { workspace = true, default-features = true } +polkadot-node-jaeger = { workspace = true, default-features = true } + +sc-keystore = { workspace = true, default-features = false } +sp-consensus = { workspace = true, default-features = false } +sp-consensus-slots = { workspace = true, default-features = false } +sp-application-crypto = { workspace = true, default-features = false, features = ["full_crypto"] } +sp-runtime = { workspace = true, default-features = false } +polkadot-node-network-protocol = { workspace = true, default-features = true } +polkadot-node-metrics = { workspace = true, default-features = true} + +rand = "0.8.5" + +# rand_core should match schnorrkel +rand_core = "0.6.2" +rand_chacha = { version = "0.3.1" } + +[dev-dependencies] +async-trait = "0.1.79" +parking_lot = "0.12.1" +sp-keyring = { workspace = true, default-features = true } +sp-keystore = {workspace = true, default-features = true} +sp-core = { workspace = true, default-features = true} +sp-consensus-babe = { workspace = true, default-features = true } +polkadot-node-subsystem-test-helpers = { workspace = true, default-features = true} +assert_matches = "1.4.0" +kvdb-memorydb = "0.13.0" +polkadot-primitives-test-helpers = { workspace = true, default-features = true } +log = { workspace = true, default-features = true } +env_logger = "0.11" + +polkadot-subsystem-bench = { workspace = true, default-features = true} + diff --git a/polkadot/node/core/approval-voting-parallel/src/lib.rs b/polkadot/node/core/approval-voting-parallel/src/lib.rs new file mode 100644 index 0000000000000..5d8344223d8ea --- /dev/null +++ b/polkadot/node/core/approval-voting-parallel/src/lib.rs @@ -0,0 +1,488 @@ +// Copyright (C) Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +//! The Approval Voting Parallel Subsystem. +//! +//! This subsystem is responsible for orchestrating the work done by +//! approval-voting and approval-distribution subsystem, so they can +//! do their work in parallel, rather than serially, when they are run +//! as independent subsystems. +use polkadot_node_core_approval_voting::{ + time::{Clock, SystemClock}, + Config, +}; +use polkadot_node_metrics::metered; + +use polkadot_node_subsystem::{ + messages::{ApprovalDistributionMessage, ApprovalVotingMessage, ApprovalVotingParallelMessage}, + overseer, FromOrchestra, SpawnedSubsystem, SubsystemError, SubsystemResult, +}; + +use polkadot_node_subsystem_util::{ + self, + database::Database, + metrics::{self, prometheus}, +}; +use polkadot_overseer::SubsystemSender; +use polkadot_primitives::ValidatorIndex; +use rand::SeedableRng; + +use sc_keystore::LocalKeystore; +use sp_consensus::SyncOracle; + +use futures::{channel::oneshot, prelude::*, StreamExt}; +use polkadot_node_core_approval_voting::approval_db::common::Config as DatabaseConfig; +use std::{collections::HashMap, sync::Arc}; + +pub(crate) const LOG_TARGET: &str = "parachain::approval-voting-parallel"; + +/// The approval voting subsystem. +pub struct ApprovalVotingParallelSubsystem { + /// `LocalKeystore` is needed for assignment keys, but not necessarily approval keys. + /// + /// We do a lot of VRF signing and need the keys to have low latency. + keystore: Arc, + db_config: DatabaseConfig, + slot_duration_millis: u64, + db: Arc, + mode: polkadot_node_core_approval_voting::Mode, + metrics: Metrics, + spawner: Arc, + clock: Arc, +} + +/// Approval Voting metrics. +#[derive(Default, Clone)] +pub struct Metrics( + pub polkadot_approval_distribution::metrics::Metrics, + pub polkadot_node_core_approval_voting::Metrics, +); + +impl metrics::Metrics for Metrics { + fn try_register( + registry: &prometheus::Registry, + ) -> std::result::Result { + Ok(Metrics( + polkadot_approval_distribution::metrics::Metrics::try_register(registry)?, + polkadot_node_core_approval_voting::Metrics::try_register(registry)?, + )) + } +} + +impl ApprovalVotingParallelSubsystem { + /// Create a new approval voting subsystem with the given keystore, config, and database. + pub fn with_config( + config: Config, + db: Arc, + keystore: Arc, + sync_oracle: Box, + metrics: Metrics, + spawner: impl overseer::gen::Spawner + 'static + Clone, + ) -> Self { + ApprovalVotingParallelSubsystem::with_config_and_clock( + config, + db, + keystore, + sync_oracle, + metrics, + Arc::new(SystemClock {}), + spawner, + ) + } + + /// Create a new approval voting subsystem with the given keystore, config, and database. + pub fn with_config_and_clock( + config: Config, + db: Arc, + keystore: Arc, + sync_oracle: Box, + metrics: Metrics, + clock: Arc, + spawner: impl overseer::gen::Spawner + 'static, + ) -> Self { + ApprovalVotingParallelSubsystem { + keystore, + slot_duration_millis: config.slot_duration_millis, + db, + db_config: DatabaseConfig { col_approval_data: config.col_approval_data }, + mode: polkadot_node_core_approval_voting::Mode::Syncing(sync_oracle), + metrics, + spawner: Arc::new(spawner), + clock, + } + } +} + +#[overseer::subsystem(ApprovalVotingRewrite, error = SubsystemError, prefix = self::overseer)] +impl ApprovalVotingParallelSubsystem { + fn start(self, ctx: Context) -> SpawnedSubsystem { + let future = run::(ctx, self) + .map_err(|e| SubsystemError::with_origin("approval-voting-parallel", e)) + .boxed(); + + SpawnedSubsystem { name: "approval-voting-parallel-subsystem", future } + } +} + +/// The number of workers used for running the approval-distribution logic. +pub const APPROVAL_DISTRIBUTION_WORKER_COUNT: usize = 8; + +/// The channel size for the workers. +pub const WORKERS_CHANNEL_SIZE: usize = 64000 / APPROVAL_DISTRIBUTION_WORKER_COUNT; + +#[overseer::contextbounds(ApprovalVotingRewrite, prefix = self::overseer)] +async fn run( + mut ctx: Context, + subsystem: ApprovalVotingParallelSubsystem, +) -> SubsystemResult<()> +where +{ + let mut approval_distribution_channels = Vec::new(); + let (mut tx_approval_voting_work, rx_approval_voting_work) = futures::channel::mpsc::channel::< + FromOrchestra, + >(WORKERS_CHANNEL_SIZE); + + let slot_duration_millis = subsystem.slot_duration_millis; + + gum::info!(target: LOG_TARGET, "Starting approval distribution workers"); + + for i in 0..APPROVAL_DISTRIBUTION_WORKER_COUNT { + let approval_distro_orig = polkadot_approval_distribution::ApprovalDistribution::new( + subsystem.metrics.0.clone(), + subsystem.slot_duration_millis, + ); + + let (tx_approval_distribution_work, mut rx_approval_distribution_work) = + futures::channel::mpsc::channel::>( + WORKERS_CHANNEL_SIZE, + ); + + let task_name = format!("approval-voting-parallel-{}", i); + let mut approval_distribution_to_approval_voting = + ApprovalDistributionToApprovalWorker(tx_approval_voting_work.clone()); + let mut network_sender = ctx.sender().clone(); + let clock = subsystem.clock.clone(); + + subsystem.spawner.spawn_blocking( + task_name.leak(), + Some("approval-voting-parallel-subsystem"), + Box::pin(async move { + let mut state = + polkadot_approval_distribution::State::with_config(slot_duration_millis, clock); + let mut rng = rand::rngs::StdRng::from_entropy(); + + loop { + let message = rx_approval_distribution_work.next().await.unwrap(); + approval_distro_orig + .handle_from_orchestra( + message, + &mut approval_distribution_to_approval_voting, + &mut network_sender, + &mut state, + &mut rng, + ) + .await; + } + }), + ); + approval_distribution_channels.push(tx_approval_distribution_work); + } + gum::info!(target: LOG_TARGET, "Starting approval voting workers"); + + let sender = ctx.sender().clone(); + + let approval_voting_to_subsystem = ApprovalVotingToApprovalDistribution(sender.clone()); + + polkadot_node_core_approval_voting::start_approval_worker( + rx_approval_voting_work, + sender.clone(), + approval_voting_to_subsystem, + polkadot_node_core_approval_voting::Config { + slot_duration_millis: subsystem.slot_duration_millis, + col_approval_data: subsystem.db_config.col_approval_data, + }, + subsystem.db.clone(), + subsystem.keystore.clone(), + subsystem.mode, + subsystem.metrics.1.clone(), + subsystem.spawner.clone(), + subsystem.clock.clone(), + ) + .await + .unwrap(); + + gum::info!(target: LOG_TARGET, "Starting main subsystem loop"); + + // Main loop of the subsystem, it shouldn't include any logic just dispatching of messages to + // the workers. + loop { + futures::select! { + next_msg = ctx.recv().fuse() => { + match next_msg.unwrap() { + FromOrchestra::Signal(msg) => { + for worker in approval_distribution_channels.iter_mut() { + worker + .send(FromOrchestra::Signal(msg.clone())).await?; + } + + tx_approval_voting_work.send(FromOrchestra::Signal(msg)).await?; + }, + FromOrchestra::Communication { msg } => match msg { + // The message the approval voting subsystem would've handled. + ApprovalVotingParallelMessage::CheckAndImportAssignment(_,_, _) | + ApprovalVotingParallelMessage::CheckAndImportApproval(_)| + ApprovalVotingParallelMessage::ApprovedAncestor(_, _,_) | + ApprovalVotingParallelMessage::GetApprovalSignaturesForCandidate(_, _) => { + // Safe to unwrap because we know the message is the right type. + tx_approval_voting_work.send(FromOrchestra::Communication{msg: msg.try_into().unwrap()}).await?; + }, + // Not the message the approval distribution subsystem would've handled. + ApprovalVotingParallelMessage::NewBlocks(msg) => { + for worker in approval_distribution_channels.iter_mut() { + worker + .send(FromOrchestra::Communication { + msg: ApprovalDistributionMessage::NewBlocks(msg.clone()), + }) + .await?; + } + }, + ApprovalVotingParallelMessage::DistributeAssignment(assignment, claimed) => { + let worker_index = assignment.validator.0 as usize % approval_distribution_channels.len(); + let worker = approval_distribution_channels.get_mut(worker_index).unwrap(); + worker + .send(FromOrchestra::Communication { + msg: ApprovalDistributionMessage::DistributeAssignment(assignment, claimed), + }) + .await?; + + }, + ApprovalVotingParallelMessage::DistributeApproval(vote) => { + let worker_index = vote.validator.0 as usize % approval_distribution_channels.len(); + let worker = approval_distribution_channels.get_mut(worker_index).unwrap(); + worker + .send(FromOrchestra::Communication { + msg: ApprovalDistributionMessage::DistributeApproval(vote), + }).await?; + + }, + ApprovalVotingParallelMessage::NetworkBridgeUpdate(msg) => { + if let polkadot_node_subsystem::messages::NetworkBridgeEvent::PeerMessage( + peer_id, + msg, + ) = msg + { + let validator_index = validator_index_for_msg(&msg); + let worker_index = validator_index.0 as usize % approval_distribution_channels.len(); + let worker = approval_distribution_channels.get_mut(worker_index).unwrap(); + + worker + .send(FromOrchestra::Communication { + msg: ApprovalDistributionMessage::NetworkBridgeUpdate( + polkadot_node_subsystem::messages::NetworkBridgeEvent::PeerMessage( + peer_id, msg, + ), + ), + }) + .await?; + } else { + for worker in approval_distribution_channels.iter_mut() { + worker + .send(FromOrchestra::Communication { + msg: ApprovalDistributionMessage::NetworkBridgeUpdate(msg.clone()), + }).await?; + } + } + }, + ApprovalVotingParallelMessage::GetApprovalSignatures(indices, tx) => { + let mut sigs = HashMap::new(); + let mut signatures_channels = Vec::new(); + for worker in approval_distribution_channels.iter_mut() { + let (tx, rx) = oneshot::channel(); + worker + .send(FromOrchestra::Communication { + msg: ApprovalDistributionMessage::GetApprovalSignatures(indices.clone(), tx), + }).await?; + signatures_channels.push(rx); + } + let results = futures::future::join_all(signatures_channels).await; + + for result in results { + let worker_sigs = result.unwrap(); + sigs.extend(worker_sigs); + } + + if let Err(_) = tx.send(sigs) { + gum::debug!( + target: LOG_TARGET, + "Sending back approval signatures failed, oneshot got closed" + ); + } + }, + ApprovalVotingParallelMessage::ApprovalCheckingLagUpdate(lag) => { + for worker in approval_distribution_channels.iter_mut() { + worker + .send(FromOrchestra::Communication { + msg: ApprovalDistributionMessage::ApprovalCheckingLagUpdate(lag), + }).await?; + } + }, + }, + }; + + }, + }; + } +} + +// Returns the validators that initially created this Assignment or Vote. +fn validator_index_for_msg( + msg: &polkadot_node_network_protocol::ApprovalDistributionMessage, +) -> ValidatorIndex { + match msg { + polkadot_node_network_protocol::Versioned::V1(ref msg) => match msg { + polkadot_node_network_protocol::v1::ApprovalDistributionMessage::Assignments(msgs) => + msgs.first().unwrap().0.validator, + polkadot_node_network_protocol::v1::ApprovalDistributionMessage::Approvals(msgs) => + msgs.first().unwrap().validator, + }, + polkadot_node_network_protocol::Versioned::V2(ref msg) => match msg { + polkadot_node_network_protocol::v2::ApprovalDistributionMessage::Assignments(msgs) => + msgs.first().unwrap().0.validator, + polkadot_node_network_protocol::v2::ApprovalDistributionMessage::Approvals(msgs) => + msgs.first().unwrap().validator, + }, + polkadot_node_network_protocol::Versioned::V3(ref msg) => match msg { + polkadot_node_network_protocol::v3::ApprovalDistributionMessage::Assignments(msgs) => + msgs.first().unwrap().0.validator, + polkadot_node_network_protocol::v3::ApprovalDistributionMessage::Approvals(msgs) => + msgs.first().unwrap().validator, + }, + } +} + +/// Just a wrapper for implementing overseer::SubsystemSender, so that +/// we can inject into the approval-distribution subsystem. +#[derive(Clone)] +pub struct ApprovalDistributionToApprovalWorker( + futures::channel::mpsc::Sender>, +); + +impl overseer::SubsystemSender for ApprovalDistributionToApprovalWorker { + fn send_message<'life0, 'async_trait>( + &'life0 mut self, + msg: ApprovalVotingMessage, + ) -> ::core::pin::Pin< + Box + ::core::marker::Send + 'async_trait>, + > + where + 'life0: 'async_trait, + Self: 'async_trait, + { + async { + self.0 + .send(polkadot_overseer::FromOrchestra::Communication { msg }) + .await + .unwrap() + } + .boxed() + } + + fn try_send_message( + &mut self, + _msg: ApprovalVotingMessage, + ) -> Result<(), metered::TrySendError> { + todo!("Unused for now") + } + + fn send_messages<'life0, 'async_trait, I>( + &'life0 mut self, + _msgs: I, + ) -> ::core::pin::Pin< + Box + ::core::marker::Send + 'async_trait>, + > + where + I: IntoIterator + Send, + I::IntoIter: Send, + I: 'async_trait, + 'life0: 'async_trait, + Self: 'async_trait, + { + todo!("Unused for now") + } + + fn send_unbounded_message(&mut self, _msg: ApprovalVotingMessage) { + todo!("Unused for now") + } +} + +/// Just a wrapper for implementing overseer::SubsystemSender, so that +/// we can inject into the approval voting subsystem. +#[derive(Clone)] +pub struct ApprovalVotingToApprovalDistribution>( + S, +); + +impl> + overseer::SubsystemSender for ApprovalVotingToApprovalDistribution +{ + #[allow(clippy::type_complexity, clippy::type_repetition_in_bounds)] + fn send_message<'life0, 'async_trait>( + &'life0 mut self, + msg: ApprovalDistributionMessage, + ) -> ::core::pin::Pin< + Box + ::core::marker::Send + 'async_trait>, + > + where + 'life0: 'async_trait, + Self: 'async_trait, + { + self.0.send_message(msg.into()) + } + + fn try_send_message( + &mut self, + msg: ApprovalDistributionMessage, + ) -> Result<(), metered::TrySendError> { + self.0.try_send_message(msg.into()).map_err(|err| match err { + // Safe to unwrap because it was built from the same type. + metered::TrySendError::Closed(msg) => + metered::TrySendError::Closed(msg.try_into().unwrap()), + metered::TrySendError::Full(msg) => + metered::TrySendError::Full(msg.try_into().unwrap()), + }) + } + + #[allow(clippy::type_complexity, clippy::type_repetition_in_bounds)] + fn send_messages<'life0, 'async_trait, I>( + &'life0 mut self, + msgs: I, + ) -> ::core::pin::Pin< + Box + ::core::marker::Send + 'async_trait>, + > + where + I: IntoIterator + Send, + I::IntoIter: Send, + I: 'async_trait, + 'life0: 'async_trait, + Self: 'async_trait, + { + self.0.send_messages(msgs.into_iter().map(|msg| msg.into())) + } + + fn send_unbounded_message(&mut self, msg: ApprovalDistributionMessage) { + self.0.send_unbounded_message(msg.into()) + } +} diff --git a/polkadot/node/core/approval-voting/src/lib.rs b/polkadot/node/core/approval-voting/src/lib.rs index 509c2a4412cc9..225ecb4fea874 100644 --- a/polkadot/node/core/approval-voting/src/lib.rs +++ b/polkadot/node/core/approval-voting/src/lib.rs @@ -1558,8 +1558,8 @@ pub async fn start_approval_worker< let backend = DbBackend::new(db.clone(), approval_voting.db_config); let spawner = approval_voting.spawner.clone(); spawner.spawn_blocking( - "approval-voting-rewrite-db", - Some("approval-voting-rewrite-subsystem"), + "approval-voting-parallel-db", + Some("approval-voting-parallel-subsystem"), Box::pin(async move { run_approval_on_worker_thread( approval_work, diff --git a/polkadot/node/core/dispute-coordinator/src/initialized.rs b/polkadot/node/core/dispute-coordinator/src/initialized.rs index 5f86da87f21ca..ca459e1477630 100644 --- a/polkadot/node/core/dispute-coordinator/src/initialized.rs +++ b/polkadot/node/core/dispute-coordinator/src/initialized.rs @@ -34,7 +34,8 @@ use polkadot_node_primitives::{ }; use polkadot_node_subsystem::{ messages::{ - ApprovalVotingMessage, BlockDescription, ChainSelectionMessage, DisputeCoordinatorMessage, + approval_voting_parallel_enabled, ApprovalVotingMessage, ApprovalVotingParallelMessage, + BlockDescription, ChainSelectionMessage, DisputeCoordinatorMessage, DisputeDistributionMessage, ImportStatementsResult, }, overseer, ActivatedLeaf, ActiveLeavesUpdate, FromOrchestra, OverseerSignal, RuntimeApiError, @@ -1059,9 +1060,21 @@ impl Initialized { // 4. We are waiting (and blocking the whole subsystem) on a response right after - // therefore even with all else failing we will never have more than // one message in flight at any given time. - ctx.send_unbounded_message( - ApprovalVotingMessage::GetApprovalSignaturesForCandidate(candidate_hash, tx), - ); + if approval_voting_parallel_enabled() { + ctx.send_unbounded_message( + ApprovalVotingParallelMessage::GetApprovalSignaturesForCandidate( + candidate_hash, + tx, + ), + ); + } else { + ctx.send_unbounded_message( + ApprovalVotingMessage::GetApprovalSignaturesForCandidate( + candidate_hash, + tx, + ), + ); + } match rx.await { Err(_) => { gum::warn!( diff --git a/polkadot/node/network/approval-distribution/src/lib.rs b/polkadot/node/network/approval-distribution/src/lib.rs index 4d9f8e31c2368..3a9afda86bc7e 100644 --- a/polkadot/node/network/approval-distribution/src/lib.rs +++ b/polkadot/node/network/approval-distribution/src/lib.rs @@ -74,7 +74,8 @@ use std::{ time::Duration, }; -mod metrics; +/// Approval distribution metrics. +pub mod metrics; #[cfg(test)] mod tests; diff --git a/polkadot/node/network/bridge/src/rx/mod.rs b/polkadot/node/network/bridge/src/rx/mod.rs index 84e935366d0cb..93d52ac1d0419 100644 --- a/polkadot/node/network/bridge/src/rx/mod.rs +++ b/polkadot/node/network/bridge/src/rx/mod.rs @@ -44,9 +44,10 @@ use polkadot_node_network_protocol::{ use polkadot_node_subsystem::{ errors::SubsystemError, messages::{ - network_bridge_event::NewGossipTopology, ApprovalDistributionMessage, - BitfieldDistributionMessage, CollatorProtocolMessage, GossipSupportMessage, - NetworkBridgeEvent, NetworkBridgeRxMessage, StatementDistributionMessage, + approval_voting_parallel_enabled, network_bridge_event::NewGossipTopology, + ApprovalDistributionMessage, ApprovalVotingParallelMessage, BitfieldDistributionMessage, + CollatorProtocolMessage, GossipSupportMessage, NetworkBridgeEvent, NetworkBridgeRxMessage, + StatementDistributionMessage, }, overseer, ActivatedLeaf, ActiveLeavesUpdate, FromOrchestra, OverseerSignal, SpawnedSubsystem, }; @@ -1140,7 +1141,13 @@ async fn dispatch_validation_events_to_all( .send_messages(event.focus().map(StatementDistributionMessage::from)) .await; sender.send_messages(event.focus().map(BitfieldDistributionMessage::from)).await; - sender.send_messages(event.focus().map(ApprovalDistributionMessage::from)).await; + if approval_voting_parallel_enabled() { + sender + .send_messages(event.focus().map(ApprovalVotingParallelMessage::from)) + .await; + } else { + sender.send_messages(event.focus().map(ApprovalDistributionMessage::from)).await; + } sender.send_messages(event.focus().map(GossipSupportMessage::from)).await; } } diff --git a/polkadot/node/overseer/src/dummy.rs b/polkadot/node/overseer/src/dummy.rs index fc5f0070773b7..6f9cd9d004032 100644 --- a/polkadot/node/overseer/src/dummy.rs +++ b/polkadot/node/overseer/src/dummy.rs @@ -88,6 +88,7 @@ pub fn dummy_overseer_builder( DummySubsystem, DummySubsystem, DummySubsystem, + DummySubsystem, >, SubsystemError, > @@ -131,6 +132,7 @@ pub fn one_for_all_overseer_builder( Sub, Sub, Sub, + Sub, >, SubsystemError, > @@ -155,6 +157,7 @@ where + Subsystem, SubsystemError> + Subsystem, SubsystemError> + Subsystem, SubsystemError> + + Subsystem, SubsystemError> + Subsystem, SubsystemError> + Subsystem, SubsystemError> + Subsystem, SubsystemError> @@ -183,6 +186,7 @@ where .statement_distribution(subsystem.clone()) .approval_distribution(subsystem.clone()) .approval_voting(subsystem.clone()) + .approval_voting_parallel(subsystem.clone()) .gossip_support(subsystem.clone()) .dispute_coordinator(subsystem.clone()) .dispute_distribution(subsystem.clone()) diff --git a/polkadot/node/overseer/src/lib.rs b/polkadot/node/overseer/src/lib.rs index 2c113f81c85fc..a2b985042b73e 100644 --- a/polkadot/node/overseer/src/lib.rs +++ b/polkadot/node/overseer/src/lib.rs @@ -76,13 +76,13 @@ use sc_client_api::{BlockImportNotification, BlockchainEvents, FinalityNotificat use self::messages::{BitfieldSigningMessage, PvfCheckerMessage}; use polkadot_node_subsystem_types::messages::{ - ApprovalDistributionMessage, ApprovalVotingMessage, AvailabilityDistributionMessage, - AvailabilityRecoveryMessage, AvailabilityStoreMessage, BitfieldDistributionMessage, - CandidateBackingMessage, CandidateValidationMessage, ChainApiMessage, ChainSelectionMessage, - CollationGenerationMessage, CollatorProtocolMessage, DisputeCoordinatorMessage, - DisputeDistributionMessage, GossipSupportMessage, NetworkBridgeRxMessage, - NetworkBridgeTxMessage, ProspectiveParachainsMessage, ProvisionerMessage, RuntimeApiMessage, - StatementDistributionMessage, + ApprovalDistributionMessage, ApprovalVotingMessage, ApprovalVotingParallelMessage, + AvailabilityDistributionMessage, AvailabilityRecoveryMessage, AvailabilityStoreMessage, + BitfieldDistributionMessage, CandidateBackingMessage, CandidateValidationMessage, + ChainApiMessage, ChainSelectionMessage, CollationGenerationMessage, CollatorProtocolMessage, + DisputeCoordinatorMessage, DisputeDistributionMessage, GossipSupportMessage, + NetworkBridgeRxMessage, NetworkBridgeTxMessage, ProspectiveParachainsMessage, + ProvisionerMessage, RuntimeApiMessage, StatementDistributionMessage, }; pub use polkadot_node_subsystem_types::{ @@ -549,6 +549,7 @@ pub struct Overseer { BitfieldDistributionMessage, StatementDistributionMessage, ApprovalDistributionMessage, + ApprovalVotingParallelMessage, GossipSupportMessage, DisputeDistributionMessage, CollationGenerationMessage, @@ -594,7 +595,19 @@ pub struct Overseer { RuntimeApiMessage, ])] approval_voting: ApprovalVoting, - + #[subsystem(blocking, message_capacity: 64000, ApprovalVotingParallelMessage, sends: [ + AvailabilityRecoveryMessage, + CandidateValidationMessage, + ChainApiMessage, + ChainSelectionMessage, + DisputeCoordinatorMessage, + RuntimeApiMessage, + NetworkBridgeTxMessage, + ApprovalVotingMessage, + ApprovalDistributionMessage, + ApprovalVotingParallelMessage, + ])] + approval_voting_parallel: ApprovalVotingRewrite, #[subsystem(GossipSupportMessage, sends: [ NetworkBridgeTxMessage, NetworkBridgeRxMessage, // TODO @@ -612,6 +625,7 @@ pub struct Overseer { AvailabilityStoreMessage, AvailabilityRecoveryMessage, ChainSelectionMessage, + ApprovalVotingParallelMessage, ])] dispute_coordinator: DisputeCoordinator, diff --git a/polkadot/node/primitives/src/approval.rs b/polkadot/node/primitives/src/approval.rs index ec41647fc3d14..d1874f7603649 100644 --- a/polkadot/node/primitives/src/approval.rs +++ b/polkadot/node/primitives/src/approval.rs @@ -18,7 +18,7 @@ /// A list of primitives introduced in v1. pub mod v1 { - use sp_consensus_babe as babe_primitives; + use sp_consensus_babe::{self as babe_primitives, SlotDuration}; pub use sp_consensus_babe::{ Randomness, Slot, VrfPreOutput, VrfProof, VrfSignature, VrfTranscript, }; @@ -118,7 +118,7 @@ pub mod v1 { } /// Metadata about a block which is now live in the approval protocol. - #[derive(Debug)] + #[derive(Debug, Clone)] pub struct BlockApprovalMeta { /// The hash of the block. pub hash: Hash, diff --git a/polkadot/node/service/Cargo.toml b/polkadot/node/service/Cargo.toml index 23cd51d8a04c3..bceabd697fc11 100644 --- a/polkadot/node/service/Cargo.toml +++ b/polkadot/node/service/Cargo.toml @@ -128,6 +128,7 @@ polkadot-gossip-support = { optional = true, workspace = true, default-features polkadot-network-bridge = { optional = true, workspace = true, default-features = true } polkadot-node-collation-generation = { optional = true, workspace = true, default-features = true } polkadot-node-core-approval-voting = { optional = true, workspace = true, default-features = true } +polkadot-node-core-approval-voting-parallel = { optional = true, workspace = true, default-features = true } polkadot-node-core-av-store = { optional = true, workspace = true, default-features = true } polkadot-node-core-backing = { optional = true, workspace = true, default-features = true } polkadot-node-core-bitfield-signing = { optional = true, workspace = true, default-features = true } @@ -172,6 +173,7 @@ full-node = [ "polkadot-network-bridge", "polkadot-node-collation-generation", "polkadot-node-core-approval-voting", + "polkadot-node-core-approval-voting-parallel", "polkadot-node-core-av-store", "polkadot-node-core-backing", "polkadot-node-core-bitfield-signing", diff --git a/polkadot/node/service/src/overseer.rs b/polkadot/node/service/src/overseer.rs index 1a14b3f85cb19..bd9c4b433cdbf 100644 --- a/polkadot/node/service/src/overseer.rs +++ b/polkadot/node/service/src/overseer.rs @@ -58,6 +58,7 @@ pub use polkadot_network_bridge::{ }; pub use polkadot_node_collation_generation::CollationGenerationSubsystem; pub use polkadot_node_core_approval_voting::ApprovalVotingSubsystem; +pub use polkadot_node_core_approval_voting_parallel::ApprovalVotingParallelSubsystem as ApprovalVotingRewriteSubsystem; pub use polkadot_node_core_av_store::AvailabilityStoreSubsystem; pub use polkadot_node_core_backing::CandidateBackingSubsystem; pub use polkadot_node_core_bitfield_signing::BitfieldSigningSubsystem; @@ -203,6 +204,7 @@ pub fn validator_overseer_builder( CollatorProtocolSubsystem, ApprovalDistributionSubsystem, ApprovalVotingSubsystem, + ApprovalVotingRewriteSubsystem, GossipSupportSubsystem, DisputeCoordinatorSubsystem, DisputeDistributionSubsystem, @@ -313,13 +315,21 @@ where approval_voting_config.slot_duration_millis, )) .approval_voting(ApprovalVotingSubsystem::with_config( - approval_voting_config, + approval_voting_config.clone(), parachains_db.clone(), keystore.clone(), Box::new(sync_service.clone()), Metrics::register(registry)?, Arc::new(spawner.clone()), )) + .approval_voting_parallel(ApprovalVotingRewriteSubsystem::with_config( + approval_voting_config, + parachains_db.clone(), + keystore.clone(), + Box::new(sync_service.clone()), + Metrics::register(registry)?, + spawner.clone(), + )) .gossip_support(GossipSupportSubsystem::new( keystore.clone(), authority_discovery_service.clone(), @@ -405,6 +415,7 @@ pub fn collator_overseer_builder( DummySubsystem, DummySubsystem, DummySubsystem, + DummySubsystem, >, Error, > @@ -479,6 +490,7 @@ where .statement_distribution(DummySubsystem) .approval_distribution(DummySubsystem) .approval_voting(DummySubsystem) + .approval_voting_parallel(DummySubsystem) .gossip_support(DummySubsystem) .dispute_coordinator(DummySubsystem) .dispute_distribution(DummySubsystem) diff --git a/polkadot/node/service/src/relay_chain_selection.rs b/polkadot/node/service/src/relay_chain_selection.rs index c0b1ce8b0ebe1..10cbd67e56187 100644 --- a/polkadot/node/service/src/relay_chain_selection.rs +++ b/polkadot/node/service/src/relay_chain_selection.rs @@ -39,8 +39,9 @@ use super::{HeaderProvider, HeaderProviderProvider}; use futures::channel::oneshot; use polkadot_node_primitives::MAX_FINALITY_LAG as PRIMITIVES_MAX_FINALITY_LAG; use polkadot_node_subsystem::messages::{ - ApprovalDistributionMessage, ApprovalVotingMessage, ChainSelectionMessage, - DisputeCoordinatorMessage, HighestApprovedAncestorBlock, + approval_voting_parallel_enabled, ApprovalDistributionMessage, ApprovalVotingMessage, + ApprovalVotingParallelMessage, ChainSelectionMessage, DisputeCoordinatorMessage, + HighestApprovedAncestorBlock, }; use polkadot_node_subsystem_util::metrics::{self, prometheus}; use polkadot_overseer::{AllMessages, Handle}; @@ -477,12 +478,21 @@ where if let Some(spawn_handle) = &self.spawn_handle { let mut overseer_handle = self.overseer.clone(); let lag_update_task = async move { - overseer_handle - .send_msg( - ApprovalDistributionMessage::ApprovalCheckingLagUpdate(lag), - std::any::type_name::(), - ) - .await; + if approval_voting_parallel_enabled() { + overseer_handle + .send_msg( + ApprovalVotingParallelMessage::ApprovalCheckingLagUpdate(lag), + std::any::type_name::(), + ) + .await; + } else { + overseer_handle + .send_msg( + ApprovalDistributionMessage::ApprovalCheckingLagUpdate(lag), + std::any::type_name::(), + ) + .await; + } }; spawn_handle.spawn( diff --git a/polkadot/node/subsystem-types/src/messages.rs b/polkadot/node/subsystem-types/src/messages.rs index 4233c35c62cdc..b98c8b3aa4c1b 100644 --- a/polkadot/node/subsystem-types/src/messages.rs +++ b/polkadot/node/subsystem-types/src/messages.rs @@ -945,6 +945,123 @@ pub struct BlockDescription { pub candidates: Vec, } +/// Message to the Approval Voting subsystem running both approval-distribution and approval-voting +/// logic in parallel. This is a combination of all the messages ApprovalVoting and +/// ApprovalDistribution subsystems can receive. +#[derive(Debug, derive_more::From)] +pub enum ApprovalVotingParallelMessage { + /// Check if the assignment is valid and can be accepted by our view of the protocol. + /// Should not be sent unless the block hash is known. + CheckAndImportAssignment(IndirectAssignmentCertV2, CandidateBitfield, DelayTranche), + /// Check if the approval vote is valid and can be accepted by our view of the + /// protocol. + /// + /// Should not be sent unless the block hash within the indirect vote is known. + CheckAndImportApproval(IndirectSignedApprovalVoteV2), + /// Returns the highest possible ancestor hash of the provided block hash which is + /// acceptable to vote on finality for. + /// The `BlockNumber` provided is the number of the block's ancestor which is the + /// earliest possible vote. + /// + /// It can also return the same block hash, if that is acceptable to vote upon. + /// Return `None` if the input hash is unrecognized. + ApprovedAncestor(Hash, BlockNumber, oneshot::Sender>), + + /// Retrieve all available approval signatures for a candidate from approval-voting. + /// + /// This message involves a linear search for candidates on each relay chain fork and also + /// requires calling into `approval-distribution`: Calls should be infrequent and bounded. + GetApprovalSignaturesForCandidate( + CandidateHash, + oneshot::Sender, ValidatorSignature)>>, + ), + /// Notify the `ApprovalDistribution` subsystem about new blocks + /// and the candidates contained within them. + NewBlocks(Vec), + /// Distribute an assignment cert from the local validator. The cert is assumed + /// to be valid, relevant, and for the given relay-parent and validator index. + DistributeAssignment(IndirectAssignmentCertV2, CandidateBitfield), + /// Distribute an approval vote for the local validator. The approval vote is assumed to be + /// valid, relevant, and the corresponding approval already issued. + /// If not, the subsystem is free to drop the message. + DistributeApproval(IndirectSignedApprovalVoteV2), + /// An update from the network bridge. + #[from] + NetworkBridgeUpdate(NetworkBridgeEvent), + + /// Get all approval signatures for all chains a candidate appeared in. + GetApprovalSignatures( + HashSet<(Hash, CandidateIndex)>, + oneshot::Sender, ValidatorSignature)>>, + ), + /// Approval checking lag update measured in blocks. + ApprovalCheckingLagUpdate(BlockNumber), +} + +impl TryFrom for ApprovalVotingMessage { + type Error = (); + + fn try_from(msg: ApprovalVotingParallelMessage) -> Result { + match msg { + ApprovalVotingParallelMessage::CheckAndImportAssignment(cert, bitfield, tranche) => + Ok(ApprovalVotingMessage::CheckAndImportAssignment(cert, bitfield, tranche)), + ApprovalVotingParallelMessage::CheckAndImportApproval(vote) => + Ok(ApprovalVotingMessage::CheckAndImportApproval(vote)), + ApprovalVotingParallelMessage::ApprovedAncestor(hash, number, tx) => + Ok(ApprovalVotingMessage::ApprovedAncestor(hash, number, tx)), + ApprovalVotingParallelMessage::GetApprovalSignaturesForCandidate(candidate, tx) => + Ok(ApprovalVotingMessage::GetApprovalSignaturesForCandidate(candidate, tx)), + _ => Err(()), + } + } +} + +/// TODO: Add logic to enable it dynamically at node startup. +pub fn approval_voting_parallel_enabled() -> bool { + false +} + +impl TryFrom for ApprovalDistributionMessage { + type Error = (); + + fn try_from(msg: ApprovalVotingParallelMessage) -> Result { + match msg { + ApprovalVotingParallelMessage::NewBlocks(blocks) => + Ok(ApprovalDistributionMessage::NewBlocks(blocks)), + ApprovalVotingParallelMessage::DistributeAssignment(assignment, claimed_cores) => + Ok(ApprovalDistributionMessage::DistributeAssignment(assignment, claimed_cores)), + ApprovalVotingParallelMessage::DistributeApproval(vote) => + Ok(ApprovalDistributionMessage::DistributeApproval(vote)), + ApprovalVotingParallelMessage::NetworkBridgeUpdate(msg) => + Ok(ApprovalDistributionMessage::NetworkBridgeUpdate(msg)), + ApprovalVotingParallelMessage::GetApprovalSignatures(candidate_indicies, tx) => + Ok(ApprovalDistributionMessage::GetApprovalSignatures(candidate_indicies, tx)), + ApprovalVotingParallelMessage::ApprovalCheckingLagUpdate(lag) => + Ok(ApprovalDistributionMessage::ApprovalCheckingLagUpdate(lag)), + _ => Err(()), + } + } +} + +impl From for ApprovalVotingParallelMessage { + fn from(msg: ApprovalDistributionMessage) -> Self { + match msg { + ApprovalDistributionMessage::NewBlocks(blocks) => + ApprovalVotingParallelMessage::NewBlocks(blocks), + ApprovalDistributionMessage::DistributeAssignment(cert, bitfield) => + ApprovalVotingParallelMessage::DistributeAssignment(cert, bitfield), + ApprovalDistributionMessage::DistributeApproval(vote) => + ApprovalVotingParallelMessage::DistributeApproval(vote), + ApprovalDistributionMessage::NetworkBridgeUpdate(msg) => + ApprovalVotingParallelMessage::NetworkBridgeUpdate(msg), + ApprovalDistributionMessage::GetApprovalSignatures(candidate_indicies, tx) => + ApprovalVotingParallelMessage::GetApprovalSignatures(candidate_indicies, tx), + ApprovalDistributionMessage::ApprovalCheckingLagUpdate(lag) => + ApprovalVotingParallelMessage::ApprovalCheckingLagUpdate(lag), + } + } +} + /// Response type to `ApprovalVotingMessage::ApprovedAncestor`. #[derive(Clone, Debug)] pub struct HighestApprovedAncestorBlock {