diff --git a/Cargo.lock b/Cargo.lock
index a650d93b5d28..cf9529f2c5c5 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -13294,6 +13294,55 @@ dependencies = [
"tracing-gum",
]
+[[package]]
+name = "polkadot-node-core-approval-voting-parallel"
+version = "7.0.0"
+dependencies = [
+ "assert_matches",
+ "async-trait",
+ "bitvec",
+ "derive_more",
+ "env_logger 0.11.3",
+ "futures",
+ "futures-timer",
+ "itertools 0.11.0",
+ "kvdb",
+ "kvdb-memorydb",
+ "log",
+ "merlin",
+ "parity-scale-codec",
+ "parking_lot 0.12.3",
+ "polkadot-approval-distribution",
+ "polkadot-node-core-approval-voting",
+ "polkadot-node-jaeger",
+ "polkadot-node-metrics",
+ "polkadot-node-network-protocol",
+ "polkadot-node-primitives",
+ "polkadot-node-subsystem",
+ "polkadot-node-subsystem-test-helpers",
+ "polkadot-node-subsystem-util",
+ "polkadot-overseer",
+ "polkadot-primitives",
+ "polkadot-primitives-test-helpers",
+ "polkadot-subsystem-bench",
+ "rand",
+ "rand_chacha",
+ "rand_core",
+ "sc-keystore",
+ "schnellru",
+ "schnorrkel 0.11.4",
+ "sp-application-crypto",
+ "sp-consensus",
+ "sp-consensus-babe",
+ "sp-consensus-slots",
+ "sp-core",
+ "sp-keyring",
+ "sp-keystore",
+ "sp-runtime",
+ "thiserror",
+ "tracing-gum",
+]
+
[[package]]
name = "polkadot-node-core-av-store"
version = "7.0.0"
@@ -14757,6 +14806,7 @@ dependencies = [
"polkadot-network-bridge",
"polkadot-node-collation-generation",
"polkadot-node-core-approval-voting",
+ "polkadot-node-core-approval-voting-parallel",
"polkadot-node-core-av-store",
"polkadot-node-core-backing",
"polkadot-node-core-bitfield-signing",
diff --git a/Cargo.toml b/Cargo.toml
index db9a2bd72273..9baeda27ea92 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -152,6 +152,7 @@ members = [
"polkadot/erasure-coding/fuzzer",
"polkadot/node/collation-generation",
"polkadot/node/core/approval-voting",
+ "polkadot/node/core/approval-voting-parallel",
"polkadot/node/core/av-store",
"polkadot/node/core/backing",
"polkadot/node/core/bitfield-signing",
@@ -1009,6 +1010,7 @@ polkadot-gossip-support = { path = "polkadot/node/network/gossip-support", defau
polkadot-network-bridge = { path = "polkadot/node/network/bridge", default-features = false }
polkadot-node-collation-generation = { path = "polkadot/node/collation-generation", default-features = false }
polkadot-node-core-approval-voting = { path = "polkadot/node/core/approval-voting", default-features = false }
+polkadot-node-core-approval-voting-parallel = { path = "polkadot/node/core/approval-voting-parallel", default-features = false }
polkadot-node-core-av-store = { path = "polkadot/node/core/av-store", default-features = false }
polkadot-node-core-backing = { path = "polkadot/node/core/backing", default-features = false }
polkadot-node-core-bitfield-signing = { path = "polkadot/node/core/bitfield-signing", default-features = false }
diff --git a/polkadot/node/core/approval-voting-parallel/Cargo.toml b/polkadot/node/core/approval-voting-parallel/Cargo.toml
new file mode 100644
index 000000000000..df3589cd320f
--- /dev/null
+++ b/polkadot/node/core/approval-voting-parallel/Cargo.toml
@@ -0,0 +1,66 @@
+[package]
+name = "polkadot-node-core-approval-voting-parallel"
+version = "7.0.0"
+authors.workspace = true
+edition.workspace = true
+license.workspace = true
+description = "Approval Voting Subsystem running approval work in parallel"
+
+[lints]
+workspace = true
+
+[dependencies]
+futures = "0.3.30"
+futures-timer = "3.0.2"
+codec = { package = "parity-scale-codec", version = "3.6.12", default-features = false, features = ["bit-vec", "derive"] }
+gum = { package = "tracing-gum", path = "../../gum" }
+bitvec = { version = "1.0.0", default-features = false, features = ["alloc"] }
+schnellru = "0.2.1"
+merlin = "3.0"
+schnorrkel = "0.11.4"
+kvdb = "0.13.0"
+derive_more = "0.99.17"
+thiserror = { workspace = true }
+itertools = "0.11"
+
+polkadot-node-core-approval-voting = { workspace = true, default-features = true }
+polkadot-approval-distribution = { workspace = true, default-features = true }
+
+
+polkadot-node-subsystem = { workspace = true, default-features = true }
+polkadot-node-subsystem-util = { workspace = true, default-features = true }
+polkadot-overseer = { workspace = true, default-features = true }
+polkadot-primitives = { workspace = true, default-features = true }
+polkadot-node-primitives = { workspace = true, default-features = true }
+polkadot-node-jaeger = { workspace = true, default-features = true }
+
+sc-keystore = { workspace = true, default-features = false }
+sp-consensus = { workspace = true, default-features = false }
+sp-consensus-slots = { workspace = true, default-features = false }
+sp-application-crypto = { workspace = true, default-features = false, features = ["full_crypto"] }
+sp-runtime = { workspace = true, default-features = false }
+polkadot-node-network-protocol = { workspace = true, default-features = true }
+polkadot-node-metrics = { workspace = true, default-features = true}
+
+rand = "0.8.5"
+
+# rand_core should match schnorrkel
+rand_core = "0.6.2"
+rand_chacha = { version = "0.3.1" }
+
+[dev-dependencies]
+async-trait = "0.1.79"
+parking_lot = "0.12.1"
+sp-keyring = { workspace = true, default-features = true }
+sp-keystore = {workspace = true, default-features = true}
+sp-core = { workspace = true, default-features = true}
+sp-consensus-babe = { workspace = true, default-features = true }
+polkadot-node-subsystem-test-helpers = { workspace = true, default-features = true}
+assert_matches = "1.4.0"
+kvdb-memorydb = "0.13.0"
+polkadot-primitives-test-helpers = { workspace = true, default-features = true }
+log = { workspace = true, default-features = true }
+env_logger = "0.11"
+
+polkadot-subsystem-bench = { workspace = true, default-features = true}
+
diff --git a/polkadot/node/core/approval-voting-parallel/src/lib.rs b/polkadot/node/core/approval-voting-parallel/src/lib.rs
new file mode 100644
index 000000000000..5d8344223d8e
--- /dev/null
+++ b/polkadot/node/core/approval-voting-parallel/src/lib.rs
@@ -0,0 +1,488 @@
+// Copyright (C) Parity Technologies (UK) Ltd.
+// This file is part of Polkadot.
+
+// Polkadot is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Polkadot is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Polkadot. If not, see .
+
+//! The Approval Voting Parallel Subsystem.
+//!
+//! This subsystem is responsible for orchestrating the work done by
+//! approval-voting and approval-distribution subsystem, so they can
+//! do their work in parallel, rather than serially, when they are run
+//! as independent subsystems.
+use polkadot_node_core_approval_voting::{
+ time::{Clock, SystemClock},
+ Config,
+};
+use polkadot_node_metrics::metered;
+
+use polkadot_node_subsystem::{
+ messages::{ApprovalDistributionMessage, ApprovalVotingMessage, ApprovalVotingParallelMessage},
+ overseer, FromOrchestra, SpawnedSubsystem, SubsystemError, SubsystemResult,
+};
+
+use polkadot_node_subsystem_util::{
+ self,
+ database::Database,
+ metrics::{self, prometheus},
+};
+use polkadot_overseer::SubsystemSender;
+use polkadot_primitives::ValidatorIndex;
+use rand::SeedableRng;
+
+use sc_keystore::LocalKeystore;
+use sp_consensus::SyncOracle;
+
+use futures::{channel::oneshot, prelude::*, StreamExt};
+use polkadot_node_core_approval_voting::approval_db::common::Config as DatabaseConfig;
+use std::{collections::HashMap, sync::Arc};
+
+pub(crate) const LOG_TARGET: &str = "parachain::approval-voting-parallel";
+
+/// The approval voting subsystem.
+pub struct ApprovalVotingParallelSubsystem {
+ /// `LocalKeystore` is needed for assignment keys, but not necessarily approval keys.
+ ///
+ /// We do a lot of VRF signing and need the keys to have low latency.
+ keystore: Arc,
+ db_config: DatabaseConfig,
+ slot_duration_millis: u64,
+ db: Arc,
+ mode: polkadot_node_core_approval_voting::Mode,
+ metrics: Metrics,
+ spawner: Arc,
+ clock: Arc,
+}
+
+/// Approval Voting metrics.
+#[derive(Default, Clone)]
+pub struct Metrics(
+ pub polkadot_approval_distribution::metrics::Metrics,
+ pub polkadot_node_core_approval_voting::Metrics,
+);
+
+impl metrics::Metrics for Metrics {
+ fn try_register(
+ registry: &prometheus::Registry,
+ ) -> std::result::Result {
+ Ok(Metrics(
+ polkadot_approval_distribution::metrics::Metrics::try_register(registry)?,
+ polkadot_node_core_approval_voting::Metrics::try_register(registry)?,
+ ))
+ }
+}
+
+impl ApprovalVotingParallelSubsystem {
+ /// Create a new approval voting subsystem with the given keystore, config, and database.
+ pub fn with_config(
+ config: Config,
+ db: Arc,
+ keystore: Arc,
+ sync_oracle: Box,
+ metrics: Metrics,
+ spawner: impl overseer::gen::Spawner + 'static + Clone,
+ ) -> Self {
+ ApprovalVotingParallelSubsystem::with_config_and_clock(
+ config,
+ db,
+ keystore,
+ sync_oracle,
+ metrics,
+ Arc::new(SystemClock {}),
+ spawner,
+ )
+ }
+
+ /// Create a new approval voting subsystem with the given keystore, config, and database.
+ pub fn with_config_and_clock(
+ config: Config,
+ db: Arc,
+ keystore: Arc,
+ sync_oracle: Box,
+ metrics: Metrics,
+ clock: Arc,
+ spawner: impl overseer::gen::Spawner + 'static,
+ ) -> Self {
+ ApprovalVotingParallelSubsystem {
+ keystore,
+ slot_duration_millis: config.slot_duration_millis,
+ db,
+ db_config: DatabaseConfig { col_approval_data: config.col_approval_data },
+ mode: polkadot_node_core_approval_voting::Mode::Syncing(sync_oracle),
+ metrics,
+ spawner: Arc::new(spawner),
+ clock,
+ }
+ }
+}
+
+#[overseer::subsystem(ApprovalVotingRewrite, error = SubsystemError, prefix = self::overseer)]
+impl ApprovalVotingParallelSubsystem {
+ fn start(self, ctx: Context) -> SpawnedSubsystem {
+ let future = run::(ctx, self)
+ .map_err(|e| SubsystemError::with_origin("approval-voting-parallel", e))
+ .boxed();
+
+ SpawnedSubsystem { name: "approval-voting-parallel-subsystem", future }
+ }
+}
+
+/// The number of workers used for running the approval-distribution logic.
+pub const APPROVAL_DISTRIBUTION_WORKER_COUNT: usize = 8;
+
+/// The channel size for the workers.
+pub const WORKERS_CHANNEL_SIZE: usize = 64000 / APPROVAL_DISTRIBUTION_WORKER_COUNT;
+
+#[overseer::contextbounds(ApprovalVotingRewrite, prefix = self::overseer)]
+async fn run(
+ mut ctx: Context,
+ subsystem: ApprovalVotingParallelSubsystem,
+) -> SubsystemResult<()>
+where
+{
+ let mut approval_distribution_channels = Vec::new();
+ let (mut tx_approval_voting_work, rx_approval_voting_work) = futures::channel::mpsc::channel::<
+ FromOrchestra,
+ >(WORKERS_CHANNEL_SIZE);
+
+ let slot_duration_millis = subsystem.slot_duration_millis;
+
+ gum::info!(target: LOG_TARGET, "Starting approval distribution workers");
+
+ for i in 0..APPROVAL_DISTRIBUTION_WORKER_COUNT {
+ let approval_distro_orig = polkadot_approval_distribution::ApprovalDistribution::new(
+ subsystem.metrics.0.clone(),
+ subsystem.slot_duration_millis,
+ );
+
+ let (tx_approval_distribution_work, mut rx_approval_distribution_work) =
+ futures::channel::mpsc::channel::>(
+ WORKERS_CHANNEL_SIZE,
+ );
+
+ let task_name = format!("approval-voting-parallel-{}", i);
+ let mut approval_distribution_to_approval_voting =
+ ApprovalDistributionToApprovalWorker(tx_approval_voting_work.clone());
+ let mut network_sender = ctx.sender().clone();
+ let clock = subsystem.clock.clone();
+
+ subsystem.spawner.spawn_blocking(
+ task_name.leak(),
+ Some("approval-voting-parallel-subsystem"),
+ Box::pin(async move {
+ let mut state =
+ polkadot_approval_distribution::State::with_config(slot_duration_millis, clock);
+ let mut rng = rand::rngs::StdRng::from_entropy();
+
+ loop {
+ let message = rx_approval_distribution_work.next().await.unwrap();
+ approval_distro_orig
+ .handle_from_orchestra(
+ message,
+ &mut approval_distribution_to_approval_voting,
+ &mut network_sender,
+ &mut state,
+ &mut rng,
+ )
+ .await;
+ }
+ }),
+ );
+ approval_distribution_channels.push(tx_approval_distribution_work);
+ }
+ gum::info!(target: LOG_TARGET, "Starting approval voting workers");
+
+ let sender = ctx.sender().clone();
+
+ let approval_voting_to_subsystem = ApprovalVotingToApprovalDistribution(sender.clone());
+
+ polkadot_node_core_approval_voting::start_approval_worker(
+ rx_approval_voting_work,
+ sender.clone(),
+ approval_voting_to_subsystem,
+ polkadot_node_core_approval_voting::Config {
+ slot_duration_millis: subsystem.slot_duration_millis,
+ col_approval_data: subsystem.db_config.col_approval_data,
+ },
+ subsystem.db.clone(),
+ subsystem.keystore.clone(),
+ subsystem.mode,
+ subsystem.metrics.1.clone(),
+ subsystem.spawner.clone(),
+ subsystem.clock.clone(),
+ )
+ .await
+ .unwrap();
+
+ gum::info!(target: LOG_TARGET, "Starting main subsystem loop");
+
+ // Main loop of the subsystem, it shouldn't include any logic just dispatching of messages to
+ // the workers.
+ loop {
+ futures::select! {
+ next_msg = ctx.recv().fuse() => {
+ match next_msg.unwrap() {
+ FromOrchestra::Signal(msg) => {
+ for worker in approval_distribution_channels.iter_mut() {
+ worker
+ .send(FromOrchestra::Signal(msg.clone())).await?;
+ }
+
+ tx_approval_voting_work.send(FromOrchestra::Signal(msg)).await?;
+ },
+ FromOrchestra::Communication { msg } => match msg {
+ // The message the approval voting subsystem would've handled.
+ ApprovalVotingParallelMessage::CheckAndImportAssignment(_,_, _) |
+ ApprovalVotingParallelMessage::CheckAndImportApproval(_)|
+ ApprovalVotingParallelMessage::ApprovedAncestor(_, _,_) |
+ ApprovalVotingParallelMessage::GetApprovalSignaturesForCandidate(_, _) => {
+ // Safe to unwrap because we know the message is the right type.
+ tx_approval_voting_work.send(FromOrchestra::Communication{msg: msg.try_into().unwrap()}).await?;
+ },
+ // Not the message the approval distribution subsystem would've handled.
+ ApprovalVotingParallelMessage::NewBlocks(msg) => {
+ for worker in approval_distribution_channels.iter_mut() {
+ worker
+ .send(FromOrchestra::Communication {
+ msg: ApprovalDistributionMessage::NewBlocks(msg.clone()),
+ })
+ .await?;
+ }
+ },
+ ApprovalVotingParallelMessage::DistributeAssignment(assignment, claimed) => {
+ let worker_index = assignment.validator.0 as usize % approval_distribution_channels.len();
+ let worker = approval_distribution_channels.get_mut(worker_index).unwrap();
+ worker
+ .send(FromOrchestra::Communication {
+ msg: ApprovalDistributionMessage::DistributeAssignment(assignment, claimed),
+ })
+ .await?;
+
+ },
+ ApprovalVotingParallelMessage::DistributeApproval(vote) => {
+ let worker_index = vote.validator.0 as usize % approval_distribution_channels.len();
+ let worker = approval_distribution_channels.get_mut(worker_index).unwrap();
+ worker
+ .send(FromOrchestra::Communication {
+ msg: ApprovalDistributionMessage::DistributeApproval(vote),
+ }).await?;
+
+ },
+ ApprovalVotingParallelMessage::NetworkBridgeUpdate(msg) => {
+ if let polkadot_node_subsystem::messages::NetworkBridgeEvent::PeerMessage(
+ peer_id,
+ msg,
+ ) = msg
+ {
+ let validator_index = validator_index_for_msg(&msg);
+ let worker_index = validator_index.0 as usize % approval_distribution_channels.len();
+ let worker = approval_distribution_channels.get_mut(worker_index).unwrap();
+
+ worker
+ .send(FromOrchestra::Communication {
+ msg: ApprovalDistributionMessage::NetworkBridgeUpdate(
+ polkadot_node_subsystem::messages::NetworkBridgeEvent::PeerMessage(
+ peer_id, msg,
+ ),
+ ),
+ })
+ .await?;
+ } else {
+ for worker in approval_distribution_channels.iter_mut() {
+ worker
+ .send(FromOrchestra::Communication {
+ msg: ApprovalDistributionMessage::NetworkBridgeUpdate(msg.clone()),
+ }).await?;
+ }
+ }
+ },
+ ApprovalVotingParallelMessage::GetApprovalSignatures(indices, tx) => {
+ let mut sigs = HashMap::new();
+ let mut signatures_channels = Vec::new();
+ for worker in approval_distribution_channels.iter_mut() {
+ let (tx, rx) = oneshot::channel();
+ worker
+ .send(FromOrchestra::Communication {
+ msg: ApprovalDistributionMessage::GetApprovalSignatures(indices.clone(), tx),
+ }).await?;
+ signatures_channels.push(rx);
+ }
+ let results = futures::future::join_all(signatures_channels).await;
+
+ for result in results {
+ let worker_sigs = result.unwrap();
+ sigs.extend(worker_sigs);
+ }
+
+ if let Err(_) = tx.send(sigs) {
+ gum::debug!(
+ target: LOG_TARGET,
+ "Sending back approval signatures failed, oneshot got closed"
+ );
+ }
+ },
+ ApprovalVotingParallelMessage::ApprovalCheckingLagUpdate(lag) => {
+ for worker in approval_distribution_channels.iter_mut() {
+ worker
+ .send(FromOrchestra::Communication {
+ msg: ApprovalDistributionMessage::ApprovalCheckingLagUpdate(lag),
+ }).await?;
+ }
+ },
+ },
+ };
+
+ },
+ };
+ }
+}
+
+// Returns the validators that initially created this Assignment or Vote.
+fn validator_index_for_msg(
+ msg: &polkadot_node_network_protocol::ApprovalDistributionMessage,
+) -> ValidatorIndex {
+ match msg {
+ polkadot_node_network_protocol::Versioned::V1(ref msg) => match msg {
+ polkadot_node_network_protocol::v1::ApprovalDistributionMessage::Assignments(msgs) =>
+ msgs.first().unwrap().0.validator,
+ polkadot_node_network_protocol::v1::ApprovalDistributionMessage::Approvals(msgs) =>
+ msgs.first().unwrap().validator,
+ },
+ polkadot_node_network_protocol::Versioned::V2(ref msg) => match msg {
+ polkadot_node_network_protocol::v2::ApprovalDistributionMessage::Assignments(msgs) =>
+ msgs.first().unwrap().0.validator,
+ polkadot_node_network_protocol::v2::ApprovalDistributionMessage::Approvals(msgs) =>
+ msgs.first().unwrap().validator,
+ },
+ polkadot_node_network_protocol::Versioned::V3(ref msg) => match msg {
+ polkadot_node_network_protocol::v3::ApprovalDistributionMessage::Assignments(msgs) =>
+ msgs.first().unwrap().0.validator,
+ polkadot_node_network_protocol::v3::ApprovalDistributionMessage::Approvals(msgs) =>
+ msgs.first().unwrap().validator,
+ },
+ }
+}
+
+/// Just a wrapper for implementing overseer::SubsystemSender, so that
+/// we can inject into the approval-distribution subsystem.
+#[derive(Clone)]
+pub struct ApprovalDistributionToApprovalWorker(
+ futures::channel::mpsc::Sender>,
+);
+
+impl overseer::SubsystemSender for ApprovalDistributionToApprovalWorker {
+ fn send_message<'life0, 'async_trait>(
+ &'life0 mut self,
+ msg: ApprovalVotingMessage,
+ ) -> ::core::pin::Pin<
+ Box + ::core::marker::Send + 'async_trait>,
+ >
+ where
+ 'life0: 'async_trait,
+ Self: 'async_trait,
+ {
+ async {
+ self.0
+ .send(polkadot_overseer::FromOrchestra::Communication { msg })
+ .await
+ .unwrap()
+ }
+ .boxed()
+ }
+
+ fn try_send_message(
+ &mut self,
+ _msg: ApprovalVotingMessage,
+ ) -> Result<(), metered::TrySendError> {
+ todo!("Unused for now")
+ }
+
+ fn send_messages<'life0, 'async_trait, I>(
+ &'life0 mut self,
+ _msgs: I,
+ ) -> ::core::pin::Pin<
+ Box + ::core::marker::Send + 'async_trait>,
+ >
+ where
+ I: IntoIterator- + Send,
+ I::IntoIter: Send,
+ I: 'async_trait,
+ 'life0: 'async_trait,
+ Self: 'async_trait,
+ {
+ todo!("Unused for now")
+ }
+
+ fn send_unbounded_message(&mut self, _msg: ApprovalVotingMessage) {
+ todo!("Unused for now")
+ }
+}
+
+/// Just a wrapper for implementing overseer::SubsystemSender, so that
+/// we can inject into the approval voting subsystem.
+#[derive(Clone)]
+pub struct ApprovalVotingToApprovalDistribution>(
+ S,
+);
+
+impl>
+ overseer::SubsystemSender for ApprovalVotingToApprovalDistribution
+{
+ #[allow(clippy::type_complexity, clippy::type_repetition_in_bounds)]
+ fn send_message<'life0, 'async_trait>(
+ &'life0 mut self,
+ msg: ApprovalDistributionMessage,
+ ) -> ::core::pin::Pin<
+ Box + ::core::marker::Send + 'async_trait>,
+ >
+ where
+ 'life0: 'async_trait,
+ Self: 'async_trait,
+ {
+ self.0.send_message(msg.into())
+ }
+
+ fn try_send_message(
+ &mut self,
+ msg: ApprovalDistributionMessage,
+ ) -> Result<(), metered::TrySendError> {
+ self.0.try_send_message(msg.into()).map_err(|err| match err {
+ // Safe to unwrap because it was built from the same type.
+ metered::TrySendError::Closed(msg) =>
+ metered::TrySendError::Closed(msg.try_into().unwrap()),
+ metered::TrySendError::Full(msg) =>
+ metered::TrySendError::Full(msg.try_into().unwrap()),
+ })
+ }
+
+ #[allow(clippy::type_complexity, clippy::type_repetition_in_bounds)]
+ fn send_messages<'life0, 'async_trait, I>(
+ &'life0 mut self,
+ msgs: I,
+ ) -> ::core::pin::Pin<
+ Box + ::core::marker::Send + 'async_trait>,
+ >
+ where
+ I: IntoIterator- + Send,
+ I::IntoIter: Send,
+ I: 'async_trait,
+ 'life0: 'async_trait,
+ Self: 'async_trait,
+ {
+ self.0.send_messages(msgs.into_iter().map(|msg| msg.into()))
+ }
+
+ fn send_unbounded_message(&mut self, msg: ApprovalDistributionMessage) {
+ self.0.send_unbounded_message(msg.into())
+ }
+}
diff --git a/polkadot/node/core/approval-voting/src/lib.rs b/polkadot/node/core/approval-voting/src/lib.rs
index d6208cbf6bac..fc0cfe23db08 100644
--- a/polkadot/node/core/approval-voting/src/lib.rs
+++ b/polkadot/node/core/approval-voting/src/lib.rs
@@ -1558,8 +1558,8 @@ pub async fn start_approval_worker<
let backend = DbBackend::new(db.clone(), approval_voting.db_config);
let spawner = approval_voting.spawner.clone();
spawner.spawn_blocking(
- "approval-voting-rewrite-db",
- Some("approval-voting-rewrite-subsystem"),
+ "approval-voting-parallel-db",
+ Some("approval-voting-parallel-subsystem"),
Box::pin(async move {
run_approval_on_worker_thread(
approval_work,
diff --git a/polkadot/node/core/dispute-coordinator/src/initialized.rs b/polkadot/node/core/dispute-coordinator/src/initialized.rs
index 5f86da87f21c..ca459e147763 100644
--- a/polkadot/node/core/dispute-coordinator/src/initialized.rs
+++ b/polkadot/node/core/dispute-coordinator/src/initialized.rs
@@ -34,7 +34,8 @@ use polkadot_node_primitives::{
};
use polkadot_node_subsystem::{
messages::{
- ApprovalVotingMessage, BlockDescription, ChainSelectionMessage, DisputeCoordinatorMessage,
+ approval_voting_parallel_enabled, ApprovalVotingMessage, ApprovalVotingParallelMessage,
+ BlockDescription, ChainSelectionMessage, DisputeCoordinatorMessage,
DisputeDistributionMessage, ImportStatementsResult,
},
overseer, ActivatedLeaf, ActiveLeavesUpdate, FromOrchestra, OverseerSignal, RuntimeApiError,
@@ -1059,9 +1060,21 @@ impl Initialized {
// 4. We are waiting (and blocking the whole subsystem) on a response right after -
// therefore even with all else failing we will never have more than
// one message in flight at any given time.
- ctx.send_unbounded_message(
- ApprovalVotingMessage::GetApprovalSignaturesForCandidate(candidate_hash, tx),
- );
+ if approval_voting_parallel_enabled() {
+ ctx.send_unbounded_message(
+ ApprovalVotingParallelMessage::GetApprovalSignaturesForCandidate(
+ candidate_hash,
+ tx,
+ ),
+ );
+ } else {
+ ctx.send_unbounded_message(
+ ApprovalVotingMessage::GetApprovalSignaturesForCandidate(
+ candidate_hash,
+ tx,
+ ),
+ );
+ }
match rx.await {
Err(_) => {
gum::warn!(
diff --git a/polkadot/node/network/approval-distribution/src/lib.rs b/polkadot/node/network/approval-distribution/src/lib.rs
index 64562f88a821..f4b68b125352 100644
--- a/polkadot/node/network/approval-distribution/src/lib.rs
+++ b/polkadot/node/network/approval-distribution/src/lib.rs
@@ -68,7 +68,8 @@ use std::{
time::Duration,
};
-mod metrics;
+/// Approval distribution metrics.
+pub mod metrics;
#[cfg(test)]
mod tests;
diff --git a/polkadot/node/network/bridge/src/rx/mod.rs b/polkadot/node/network/bridge/src/rx/mod.rs
index 84e935366d0c..93d52ac1d041 100644
--- a/polkadot/node/network/bridge/src/rx/mod.rs
+++ b/polkadot/node/network/bridge/src/rx/mod.rs
@@ -44,9 +44,10 @@ use polkadot_node_network_protocol::{
use polkadot_node_subsystem::{
errors::SubsystemError,
messages::{
- network_bridge_event::NewGossipTopology, ApprovalDistributionMessage,
- BitfieldDistributionMessage, CollatorProtocolMessage, GossipSupportMessage,
- NetworkBridgeEvent, NetworkBridgeRxMessage, StatementDistributionMessage,
+ approval_voting_parallel_enabled, network_bridge_event::NewGossipTopology,
+ ApprovalDistributionMessage, ApprovalVotingParallelMessage, BitfieldDistributionMessage,
+ CollatorProtocolMessage, GossipSupportMessage, NetworkBridgeEvent, NetworkBridgeRxMessage,
+ StatementDistributionMessage,
},
overseer, ActivatedLeaf, ActiveLeavesUpdate, FromOrchestra, OverseerSignal, SpawnedSubsystem,
};
@@ -1140,7 +1141,13 @@ async fn dispatch_validation_events_to_all(
.send_messages(event.focus().map(StatementDistributionMessage::from))
.await;
sender.send_messages(event.focus().map(BitfieldDistributionMessage::from)).await;
- sender.send_messages(event.focus().map(ApprovalDistributionMessage::from)).await;
+ if approval_voting_parallel_enabled() {
+ sender
+ .send_messages(event.focus().map(ApprovalVotingParallelMessage::from))
+ .await;
+ } else {
+ sender.send_messages(event.focus().map(ApprovalDistributionMessage::from)).await;
+ }
sender.send_messages(event.focus().map(GossipSupportMessage::from)).await;
}
}
diff --git a/polkadot/node/overseer/src/dummy.rs b/polkadot/node/overseer/src/dummy.rs
index fc5f0070773b..6f9cd9d00403 100644
--- a/polkadot/node/overseer/src/dummy.rs
+++ b/polkadot/node/overseer/src/dummy.rs
@@ -88,6 +88,7 @@ pub fn dummy_overseer_builder(
DummySubsystem,
DummySubsystem,
DummySubsystem,
+ DummySubsystem,
>,
SubsystemError,
>
@@ -131,6 +132,7 @@ pub fn one_for_all_overseer_builder(
Sub,
Sub,
Sub,
+ Sub,
>,
SubsystemError,
>
@@ -155,6 +157,7 @@ where
+ Subsystem, SubsystemError>
+ Subsystem, SubsystemError>
+ Subsystem, SubsystemError>
+ + Subsystem, SubsystemError>
+ Subsystem, SubsystemError>
+ Subsystem, SubsystemError>
+ Subsystem, SubsystemError>
@@ -183,6 +186,7 @@ where
.statement_distribution(subsystem.clone())
.approval_distribution(subsystem.clone())
.approval_voting(subsystem.clone())
+ .approval_voting_parallel(subsystem.clone())
.gossip_support(subsystem.clone())
.dispute_coordinator(subsystem.clone())
.dispute_distribution(subsystem.clone())
diff --git a/polkadot/node/overseer/src/lib.rs b/polkadot/node/overseer/src/lib.rs
index 24985a99913d..55bfefd4a29c 100644
--- a/polkadot/node/overseer/src/lib.rs
+++ b/polkadot/node/overseer/src/lib.rs
@@ -76,13 +76,13 @@ use sc_client_api::{BlockImportNotification, BlockchainEvents, FinalityNotificat
use self::messages::{BitfieldSigningMessage, PvfCheckerMessage};
use polkadot_node_subsystem_types::messages::{
- ApprovalDistributionMessage, ApprovalVotingMessage, AvailabilityDistributionMessage,
- AvailabilityRecoveryMessage, AvailabilityStoreMessage, BitfieldDistributionMessage,
- CandidateBackingMessage, CandidateValidationMessage, ChainApiMessage, ChainSelectionMessage,
- CollationGenerationMessage, CollatorProtocolMessage, DisputeCoordinatorMessage,
- DisputeDistributionMessage, GossipSupportMessage, NetworkBridgeRxMessage,
- NetworkBridgeTxMessage, ProspectiveParachainsMessage, ProvisionerMessage, RuntimeApiMessage,
- StatementDistributionMessage,
+ ApprovalDistributionMessage, ApprovalVotingMessage, ApprovalVotingParallelMessage,
+ AvailabilityDistributionMessage, AvailabilityRecoveryMessage, AvailabilityStoreMessage,
+ BitfieldDistributionMessage, CandidateBackingMessage, CandidateValidationMessage,
+ ChainApiMessage, ChainSelectionMessage, CollationGenerationMessage, CollatorProtocolMessage,
+ DisputeCoordinatorMessage, DisputeDistributionMessage, GossipSupportMessage,
+ NetworkBridgeRxMessage, NetworkBridgeTxMessage, ProspectiveParachainsMessage,
+ ProvisionerMessage, RuntimeApiMessage, StatementDistributionMessage,
};
pub use polkadot_node_subsystem_types::{
@@ -549,6 +549,7 @@ pub struct Overseer {
BitfieldDistributionMessage,
StatementDistributionMessage,
ApprovalDistributionMessage,
+ ApprovalVotingParallelMessage,
GossipSupportMessage,
DisputeDistributionMessage,
CollationGenerationMessage,
@@ -593,7 +594,19 @@ pub struct Overseer {
RuntimeApiMessage,
])]
approval_voting: ApprovalVoting,
-
+ #[subsystem(blocking, message_capacity: 64000, ApprovalVotingParallelMessage, sends: [
+ AvailabilityRecoveryMessage,
+ CandidateValidationMessage,
+ ChainApiMessage,
+ ChainSelectionMessage,
+ DisputeCoordinatorMessage,
+ RuntimeApiMessage,
+ NetworkBridgeTxMessage,
+ ApprovalVotingMessage,
+ ApprovalDistributionMessage,
+ ApprovalVotingParallelMessage,
+ ])]
+ approval_voting_parallel: ApprovalVotingRewrite,
#[subsystem(GossipSupportMessage, sends: [
NetworkBridgeTxMessage,
NetworkBridgeRxMessage, // TODO
@@ -611,6 +624,7 @@ pub struct Overseer {
AvailabilityStoreMessage,
AvailabilityRecoveryMessage,
ChainSelectionMessage,
+ ApprovalVotingParallelMessage,
])]
dispute_coordinator: DisputeCoordinator,
diff --git a/polkadot/node/primitives/src/approval.rs b/polkadot/node/primitives/src/approval.rs
index 54a1cb4b3ca0..c2e6b469dd4d 100644
--- a/polkadot/node/primitives/src/approval.rs
+++ b/polkadot/node/primitives/src/approval.rs
@@ -18,7 +18,7 @@
/// A list of primitives introduced in v1.
pub mod v1 {
- use sp_consensus_babe as babe_primitives;
+ use sp_consensus_babe::{self as babe_primitives, SlotDuration};
pub use sp_consensus_babe::{
Randomness, Slot, VrfPreOutput, VrfProof, VrfSignature, VrfTranscript,
};
@@ -118,7 +118,7 @@ pub mod v1 {
}
/// Metadata about a block which is now live in the approval protocol.
- #[derive(Debug)]
+ #[derive(Debug, Clone)]
pub struct BlockApprovalMeta {
/// The hash of the block.
pub hash: Hash,
diff --git a/polkadot/node/service/Cargo.toml b/polkadot/node/service/Cargo.toml
index 23cd51d8a04c..bceabd697fc1 100644
--- a/polkadot/node/service/Cargo.toml
+++ b/polkadot/node/service/Cargo.toml
@@ -128,6 +128,7 @@ polkadot-gossip-support = { optional = true, workspace = true, default-features
polkadot-network-bridge = { optional = true, workspace = true, default-features = true }
polkadot-node-collation-generation = { optional = true, workspace = true, default-features = true }
polkadot-node-core-approval-voting = { optional = true, workspace = true, default-features = true }
+polkadot-node-core-approval-voting-parallel = { optional = true, workspace = true, default-features = true }
polkadot-node-core-av-store = { optional = true, workspace = true, default-features = true }
polkadot-node-core-backing = { optional = true, workspace = true, default-features = true }
polkadot-node-core-bitfield-signing = { optional = true, workspace = true, default-features = true }
@@ -172,6 +173,7 @@ full-node = [
"polkadot-network-bridge",
"polkadot-node-collation-generation",
"polkadot-node-core-approval-voting",
+ "polkadot-node-core-approval-voting-parallel",
"polkadot-node-core-av-store",
"polkadot-node-core-backing",
"polkadot-node-core-bitfield-signing",
diff --git a/polkadot/node/service/src/overseer.rs b/polkadot/node/service/src/overseer.rs
index 1a14b3f85cb1..bd9c4b433cdb 100644
--- a/polkadot/node/service/src/overseer.rs
+++ b/polkadot/node/service/src/overseer.rs
@@ -58,6 +58,7 @@ pub use polkadot_network_bridge::{
};
pub use polkadot_node_collation_generation::CollationGenerationSubsystem;
pub use polkadot_node_core_approval_voting::ApprovalVotingSubsystem;
+pub use polkadot_node_core_approval_voting_parallel::ApprovalVotingParallelSubsystem as ApprovalVotingRewriteSubsystem;
pub use polkadot_node_core_av_store::AvailabilityStoreSubsystem;
pub use polkadot_node_core_backing::CandidateBackingSubsystem;
pub use polkadot_node_core_bitfield_signing::BitfieldSigningSubsystem;
@@ -203,6 +204,7 @@ pub fn validator_overseer_builder(
CollatorProtocolSubsystem,
ApprovalDistributionSubsystem,
ApprovalVotingSubsystem,
+ ApprovalVotingRewriteSubsystem,
GossipSupportSubsystem,
DisputeCoordinatorSubsystem,
DisputeDistributionSubsystem,
@@ -313,13 +315,21 @@ where
approval_voting_config.slot_duration_millis,
))
.approval_voting(ApprovalVotingSubsystem::with_config(
- approval_voting_config,
+ approval_voting_config.clone(),
parachains_db.clone(),
keystore.clone(),
Box::new(sync_service.clone()),
Metrics::register(registry)?,
Arc::new(spawner.clone()),
))
+ .approval_voting_parallel(ApprovalVotingRewriteSubsystem::with_config(
+ approval_voting_config,
+ parachains_db.clone(),
+ keystore.clone(),
+ Box::new(sync_service.clone()),
+ Metrics::register(registry)?,
+ spawner.clone(),
+ ))
.gossip_support(GossipSupportSubsystem::new(
keystore.clone(),
authority_discovery_service.clone(),
@@ -405,6 +415,7 @@ pub fn collator_overseer_builder(
DummySubsystem,
DummySubsystem,
DummySubsystem,
+ DummySubsystem,
>,
Error,
>
@@ -479,6 +490,7 @@ where
.statement_distribution(DummySubsystem)
.approval_distribution(DummySubsystem)
.approval_voting(DummySubsystem)
+ .approval_voting_parallel(DummySubsystem)
.gossip_support(DummySubsystem)
.dispute_coordinator(DummySubsystem)
.dispute_distribution(DummySubsystem)
diff --git a/polkadot/node/service/src/relay_chain_selection.rs b/polkadot/node/service/src/relay_chain_selection.rs
index c0b1ce8b0ebe..10cbd67e5618 100644
--- a/polkadot/node/service/src/relay_chain_selection.rs
+++ b/polkadot/node/service/src/relay_chain_selection.rs
@@ -39,8 +39,9 @@ use super::{HeaderProvider, HeaderProviderProvider};
use futures::channel::oneshot;
use polkadot_node_primitives::MAX_FINALITY_LAG as PRIMITIVES_MAX_FINALITY_LAG;
use polkadot_node_subsystem::messages::{
- ApprovalDistributionMessage, ApprovalVotingMessage, ChainSelectionMessage,
- DisputeCoordinatorMessage, HighestApprovedAncestorBlock,
+ approval_voting_parallel_enabled, ApprovalDistributionMessage, ApprovalVotingMessage,
+ ApprovalVotingParallelMessage, ChainSelectionMessage, DisputeCoordinatorMessage,
+ HighestApprovedAncestorBlock,
};
use polkadot_node_subsystem_util::metrics::{self, prometheus};
use polkadot_overseer::{AllMessages, Handle};
@@ -477,12 +478,21 @@ where
if let Some(spawn_handle) = &self.spawn_handle {
let mut overseer_handle = self.overseer.clone();
let lag_update_task = async move {
- overseer_handle
- .send_msg(
- ApprovalDistributionMessage::ApprovalCheckingLagUpdate(lag),
- std::any::type_name::(),
- )
- .await;
+ if approval_voting_parallel_enabled() {
+ overseer_handle
+ .send_msg(
+ ApprovalVotingParallelMessage::ApprovalCheckingLagUpdate(lag),
+ std::any::type_name::(),
+ )
+ .await;
+ } else {
+ overseer_handle
+ .send_msg(
+ ApprovalDistributionMessage::ApprovalCheckingLagUpdate(lag),
+ std::any::type_name::(),
+ )
+ .await;
+ }
};
spawn_handle.spawn(
diff --git a/polkadot/node/subsystem-types/src/messages.rs b/polkadot/node/subsystem-types/src/messages.rs
index 90bbc235495b..71911bec5e1e 100644
--- a/polkadot/node/subsystem-types/src/messages.rs
+++ b/polkadot/node/subsystem-types/src/messages.rs
@@ -945,6 +945,123 @@ pub struct BlockDescription {
pub candidates: Vec,
}
+/// Message to the Approval Voting subsystem running both approval-distribution and approval-voting
+/// logic in parallel. This is a combination of all the messages ApprovalVoting and
+/// ApprovalDistribution subsystems can receive.
+#[derive(Debug, derive_more::From)]
+pub enum ApprovalVotingParallelMessage {
+ /// Check if the assignment is valid and can be accepted by our view of the protocol.
+ /// Should not be sent unless the block hash is known.
+ CheckAndImportAssignment(IndirectAssignmentCertV2, CandidateBitfield, DelayTranche),
+ /// Check if the approval vote is valid and can be accepted by our view of the
+ /// protocol.
+ ///
+ /// Should not be sent unless the block hash within the indirect vote is known.
+ CheckAndImportApproval(IndirectSignedApprovalVoteV2),
+ /// Returns the highest possible ancestor hash of the provided block hash which is
+ /// acceptable to vote on finality for.
+ /// The `BlockNumber` provided is the number of the block's ancestor which is the
+ /// earliest possible vote.
+ ///
+ /// It can also return the same block hash, if that is acceptable to vote upon.
+ /// Return `None` if the input hash is unrecognized.
+ ApprovedAncestor(Hash, BlockNumber, oneshot::Sender