diff --git a/polkadot/cli/src/command.rs b/polkadot/cli/src/command.rs index b89054b4dc321..276ee87516858 100644 --- a/polkadot/cli/src/command.rs +++ b/polkadot/cli/src/command.rs @@ -373,16 +373,16 @@ pub fn run() -> Result<()> { Ok(runner.async_run(|mut config| { let (client, backend, _, task_manager) = polkadot_service::new_chain_ops(&mut config, None)?; + let task_handle = task_manager.spawn_handle(); let aux_revert = Box::new(|client, backend, blocks| { - polkadot_service::revert_backend(client, backend, blocks, config).map_err( - |err| { + polkadot_service::revert_backend(client, backend, blocks, config, task_handle) + .map_err(|err| { match err { polkadot_service::Error::Blockchain(err) => err.into(), // Generic application-specific error. err => sc_cli::Error::Application(err.into()), } - }, - ) + }) }); Ok(( cmd.run(client, backend, Some(aux_revert)).map_err(Error::SubstrateCli), diff --git a/polkadot/node/core/approval-voting/Cargo.toml b/polkadot/node/core/approval-voting/Cargo.toml index 65985c0a5db93..50708a77ad7fa 100644 --- a/polkadot/node/core/approval-voting/Cargo.toml +++ b/polkadot/node/core/approval-voting/Cargo.toml @@ -22,6 +22,7 @@ kvdb = { workspace = true } derive_more = { workspace = true, default-features = true } thiserror = { workspace = true } itertools = { workspace = true } +async-trait = { workspace = true } polkadot-node-subsystem = { workspace = true, default-features = true } polkadot-node-subsystem-util = { workspace = true, default-features = true } diff --git a/polkadot/node/core/approval-voting/src/import.rs b/polkadot/node/core/approval-voting/src/import.rs index a138af1188d29..9ab16ba1cd6b1 100644 --- a/polkadot/node/core/approval-voting/src/import.rs +++ b/polkadot/node/core/approval-voting/src/import.rs @@ -44,6 +44,7 @@ use polkadot_node_subsystem::{ overseer, RuntimeApiError, SubsystemError, SubsystemResult, }; use polkadot_node_subsystem_util::{determine_new_blocks, runtime::RuntimeInfo}; +use polkadot_overseer::SubsystemSender; use polkadot_primitives::{ node_features, BlockNumber, CandidateEvent, CandidateHash, CandidateReceipt, ConsensusLog, CoreIndex, GroupIndex, Hash, Header, SessionIndex, @@ -110,8 +111,8 @@ enum ImportedBlockInfoError { /// Computes information about the imported block. Returns an error if the info couldn't be /// extracted. #[overseer::contextbounds(ApprovalVoting, prefix = self::overseer)] -async fn imported_block_info( - ctx: &mut Context, +async fn imported_block_info>( + sender: &mut Sender, env: ImportedBlockInfoEnv<'_>, block_hash: Hash, block_header: &Header, @@ -123,11 +124,12 @@ async fn imported_block_info( // fetch candidates let included_candidates: Vec<_> = { let (c_tx, c_rx) = oneshot::channel(); - ctx.send_message(RuntimeApiMessage::Request( - block_hash, - RuntimeApiRequest::CandidateEvents(c_tx), - )) - .await; + sender + .send_message(RuntimeApiMessage::Request( + block_hash, + RuntimeApiRequest::CandidateEvents(c_tx), + )) + .await; let events: Vec = match c_rx.await { Ok(Ok(events)) => events, @@ -150,11 +152,12 @@ async fn imported_block_info( // short, that shouldn't happen. let session_index = { let (s_tx, s_rx) = oneshot::channel(); - ctx.send_message(RuntimeApiMessage::Request( - block_header.parent_hash, - RuntimeApiRequest::SessionIndexForChild(s_tx), - )) - .await; + sender + .send_message(RuntimeApiMessage::Request( + block_header.parent_hash, + RuntimeApiRequest::SessionIndexForChild(s_tx), + )) + .await; let session_index = match s_rx.await { Ok(Ok(s)) => s, @@ -200,11 +203,12 @@ async fn imported_block_info( // by one block. This gives us the opposite invariant for sessions - the parent block's // post-state gives us the canonical information about the session index for any of its // children, regardless of which slot number they might be produced at. - ctx.send_message(RuntimeApiMessage::Request( - block_hash, - RuntimeApiRequest::CurrentBabeEpoch(s_tx), - )) - .await; + sender + .send_message(RuntimeApiMessage::Request( + block_hash, + RuntimeApiRequest::CurrentBabeEpoch(s_tx), + )) + .await; match s_rx.await { Ok(Ok(s)) => s, @@ -215,7 +219,7 @@ async fn imported_block_info( }; let extended_session_info = - get_extended_session_info(env.runtime_info, ctx.sender(), block_hash, session_index).await; + get_extended_session_info(env.runtime_info, sender, block_hash, session_index).await; let enable_v2_assignments = extended_session_info.map_or(false, |extended_session_info| { *extended_session_info .node_features @@ -224,7 +228,7 @@ async fn imported_block_info( .unwrap_or(&false) }); - let session_info = get_session_info(env.runtime_info, ctx.sender(), block_hash, session_index) + let session_info = get_session_info(env.runtime_info, sender, block_hash, session_index) .await .ok_or(ImportedBlockInfoError::SessionInfoUnavailable)?; @@ -328,9 +332,15 @@ pub struct BlockImportedCandidates { /// * and return information about all candidates imported under each block. /// /// It is the responsibility of the caller to schedule wakeups for each block. -#[overseer::contextbounds(ApprovalVoting, prefix = self::overseer)] -pub(crate) async fn handle_new_head( - ctx: &mut Context, +pub(crate) async fn handle_new_head< + Sender: SubsystemSender + + SubsystemSender + + SubsystemSender, + AVSender: SubsystemSender, + B: Backend, +>( + sender: &mut Sender, + approval_voting_sender: &mut AVSender, state: &State, db: &mut OverlayedBackend<'_, B>, session_info_provider: &mut RuntimeInfo, @@ -348,7 +358,7 @@ pub(crate) async fn handle_new_head( let header = { let (h_tx, h_rx) = oneshot::channel(); - ctx.send_message(ChainApiMessage::BlockHeader(head, h_tx)).await; + sender.send_message(ChainApiMessage::BlockHeader(head, h_tx)).await; match h_rx.await? { Err(e) => { gum::debug!( @@ -374,7 +384,7 @@ pub(crate) async fn handle_new_head( let lower_bound_number = finalized_number.unwrap_or(lower_bound_number).max(lower_bound_number); let new_blocks = determine_new_blocks( - ctx.sender(), + sender, |h| db.load_block_entry(h).map(|e| e.is_some()), head, &header, @@ -400,12 +410,15 @@ pub(crate) async fn handle_new_head( keystore: &state.keystore, }; - match imported_block_info(ctx, env, block_hash, &block_header, finalized_number).await { + match imported_block_info(sender, env, block_hash, &block_header, finalized_number) + .await + { Ok(i) => imported_blocks_and_info.push((block_hash, block_header, i)), Err(error) => { // It's possible that we've lost a race with finality. let (tx, rx) = oneshot::channel(); - ctx.send_message(ChainApiMessage::FinalizedBlockHash(block_header.number, tx)) + sender + .send_message(ChainApiMessage::FinalizedBlockHash(block_header.number, tx)) .await; let lost_to_finality = match rx.await { @@ -449,17 +462,11 @@ pub(crate) async fn handle_new_head( force_approve, } = imported_block_info; - let session_info = match get_session_info( - session_info_provider, - ctx.sender(), - head, - session_index, - ) - .await - { - Some(session_info) => session_info, - None => return Ok(Vec::new()), - }; + let session_info = + match get_session_info(session_info_provider, sender, head, session_index).await { + Some(session_info) => session_info, + None => return Ok(Vec::new()), + }; let (block_tick, no_show_duration) = { let block_tick = slot_number_to_tick(state.slot_duration_millis, slot); @@ -509,7 +516,7 @@ pub(crate) async fn handle_new_head( }; // If all bits are already set, then send an approve message. if approved_bitfield.count_ones() == approved_bitfield.len() { - ctx.send_message(ChainSelectionMessage::Approved(block_hash)).await; + sender.send_message(ChainSelectionMessage::Approved(block_hash)).await; } let block_entry = v3::BlockEntry { @@ -566,7 +573,7 @@ pub(crate) async fn handle_new_head( // Notify chain-selection of all approved hashes. for hash in approved_hashes { - ctx.send_message(ChainSelectionMessage::Approved(hash)).await; + sender.send_message(ChainSelectionMessage::Approved(hash)).await; } } @@ -602,7 +609,8 @@ pub(crate) async fn handle_new_head( "Informing distribution of newly imported chain", ); - ctx.send_unbounded_message(ApprovalDistributionMessage::NewBlocks(approval_meta)); + approval_voting_sender + .send_unbounded_message(ApprovalDistributionMessage::NewBlocks(approval_meta)); Ok(imported_candidates) } @@ -660,7 +668,7 @@ pub(crate) mod tests { State { keystore: Arc::new(LocalKeystore::in_memory()), slot_duration_millis: 6_000, - clock: Box::new(MockClock::default()), + clock: Arc::new(MockClock::default()), assignment_criteria: Box::new(MockAssignmentCriteria::default()), spans: HashMap::new(), per_block_assignments_gathering_times: LruMap::new(ByLength::new( @@ -804,8 +812,9 @@ pub(crate) mod tests { keystore: &LocalKeystore::in_memory(), }; - let info = - imported_block_info(&mut ctx, env, hash, &header, &Some(4)).await.unwrap(); + let info = imported_block_info(ctx.sender(), env, hash, &header, &Some(4)) + .await + .unwrap(); assert_eq!(info.included_candidates, included_candidates); assert_eq!(info.session_index, session); @@ -951,7 +960,7 @@ pub(crate) mod tests { keystore: &LocalKeystore::in_memory(), }; - let info = imported_block_info(&mut ctx, env, hash, &header, &Some(4)).await; + let info = imported_block_info(ctx.sender(), env, hash, &header, &Some(4)).await; assert_matches!(info, Err(ImportedBlockInfoError::VrfInfoUnavailable)); }) @@ -1090,7 +1099,7 @@ pub(crate) mod tests { keystore: &LocalKeystore::in_memory(), }; - let info = imported_block_info(&mut ctx, env, hash, &header, &Some(6)).await; + let info = imported_block_info(ctx.sender(), env, hash, &header, &Some(6)).await; assert_matches!(info, Err(ImportedBlockInfoError::BlockAlreadyFinalized)); }) @@ -1126,7 +1135,8 @@ pub(crate) mod tests { #[test] fn imported_block_info_extracts_force_approve() { let pool = TaskExecutor::new(); - let (mut ctx, mut handle) = make_subsystem_context(pool.clone()); + let (mut ctx, mut handle) = + make_subsystem_context::(pool.clone()); let session = 5; let session_info = dummy_session_info(session); @@ -1189,7 +1199,7 @@ pub(crate) mod tests { }; let info = - imported_block_info(&mut ctx, env, hash, &header, &Some(4)).await.unwrap(); + imported_block_info(ctx.sender(), env, hash, &header, &Some(4)).await.unwrap(); assert_eq!(info.included_candidates, included_candidates); assert_eq!(info.session_index, session); @@ -1382,8 +1392,11 @@ pub(crate) mod tests { let test_fut = { Box::pin(async move { let mut overlay_db = OverlayedBackend::new(&db); + + let mut approval_voting_sender = ctx.sender().clone(); let result = handle_new_head( - &mut ctx, + ctx.sender(), + &mut approval_voting_sender, &state, &mut overlay_db, &mut session_info_provider, diff --git a/polkadot/node/core/approval-voting/src/lib.rs b/polkadot/node/core/approval-voting/src/lib.rs index 5d588a53f3d1a..da44aedaf6125 100644 --- a/polkadot/node/core/approval-voting/src/lib.rs +++ b/polkadot/node/core/approval-voting/src/lib.rs @@ -148,7 +148,7 @@ pub struct Config { // When transitioning from `Syncing` to `Active`, the node notifies the `ApprovalDistribution` // subsystem of all unfinalized blocks and the candidates included within them, as well as all // votes that the local node itself has cast on candidates within those blocks. -enum Mode { +pub enum Mode { Active, Syncing(Box), } @@ -164,7 +164,8 @@ pub struct ApprovalVotingSubsystem { db: Arc, mode: Mode, metrics: Metrics, - clock: Box, + clock: Arc, + spawner: Arc, } #[derive(Clone)] @@ -483,6 +484,7 @@ impl ApprovalVotingSubsystem { keystore: Arc, sync_oracle: Box, metrics: Metrics, + spawner: Arc, ) -> Self { ApprovalVotingSubsystem::with_config_and_clock( config, @@ -490,7 +492,8 @@ impl ApprovalVotingSubsystem { keystore, sync_oracle, metrics, - Box::new(SystemClock {}), + Arc::new(SystemClock {}), + spawner, ) } @@ -501,7 +504,8 @@ impl ApprovalVotingSubsystem { keystore: Arc, sync_oracle: Box, metrics: Metrics, - clock: Box, + clock: Arc, + spawner: Arc, ) -> Self { ApprovalVotingSubsystem { keystore, @@ -511,6 +515,7 @@ impl ApprovalVotingSubsystem { mode: Mode::Syncing(sync_oracle), metrics, clock, + spawner, } } @@ -550,12 +555,21 @@ fn db_sanity_check(db: Arc, config: DatabaseConfig) -> SubsystemRe #[overseer::subsystem(ApprovalVoting, error = SubsystemError, prefix = self::overseer)] impl ApprovalVotingSubsystem { - fn start(self, ctx: Context) -> SpawnedSubsystem { + fn start(self, mut ctx: Context) -> SpawnedSubsystem { let backend = DbBackend::new(self.db.clone(), self.db_config); - let future = - run::(ctx, self, Box::new(RealAssignmentCriteria), backend) - .map_err(|e| SubsystemError::with_origin("approval-voting", e)) - .boxed(); + let to_other_subsystems = ctx.sender().clone(); + let to_approval_distr = ctx.sender().clone(); + + let future = run::( + ctx, + to_other_subsystems, + to_approval_distr, + self, + Box::new(RealAssignmentCriteria), + backend, + ) + .map_err(|e| SubsystemError::with_origin("approval-voting", e)) + .boxed(); SpawnedSubsystem { name: "approval-voting-subsystem", future } } @@ -824,7 +838,7 @@ where struct State { keystore: Arc, slot_duration_millis: u64, - clock: Box, + clock: Arc, assignment_criteria: Box, spans: HashMap, // Per block, candidate records about how long we take until we gather enough @@ -960,20 +974,20 @@ impl State { } // Returns the approval voting params from the RuntimeApi. - #[overseer::contextbounds(ApprovalVoting, prefix = self::overseer)] - async fn get_approval_voting_params_or_default( + async fn get_approval_voting_params_or_default>( &self, - ctx: &mut Context, + sender: &mut Sender, session_index: SessionIndex, block_hash: Hash, ) -> Option { let (s_tx, s_rx) = oneshot::channel(); - ctx.send_message(RuntimeApiMessage::Request( - block_hash, - RuntimeApiRequest::ApprovalVotingParams(session_index, s_tx), - )) - .await; + sender + .send_message(RuntimeApiMessage::Request( + block_hash, + RuntimeApiRequest::ApprovalVotingParams(session_index, s_tx), + )) + .await; match s_rx.await { Ok(Ok(params)) => { @@ -1142,9 +1156,36 @@ enum Action { Conclude, } +/// Trait for providing approval voting subsystem with work. +#[async_trait::async_trait] +pub trait ApprovalVotingWorkProvider { + async fn recv(&mut self) -> SubsystemResult>; +} + +#[async_trait::async_trait] #[overseer::contextbounds(ApprovalVoting, prefix = self::overseer)] -async fn run( - mut ctx: Context, +impl ApprovalVotingWorkProvider for Context { + async fn recv(&mut self) -> SubsystemResult> { + self.recv().await + } +} + +#[overseer::contextbounds(ApprovalVoting, prefix = self::overseer)] +async fn run< + B, + WorkProvider: ApprovalVotingWorkProvider, + Sender: SubsystemSender + + SubsystemSender + + SubsystemSender + + SubsystemSender + + SubsystemSender + + SubsystemSender + + Clone, + ADSender: SubsystemSender, +>( + mut work_provider: WorkProvider, + mut to_other_subsystems: Sender, + mut to_approval_distr: ADSender, mut subsystem: ApprovalVotingSubsystem, assignment_criteria: Box, mut backend: B, @@ -1168,19 +1209,11 @@ where no_show_stats: NoShowStats::default(), }; - // `None` on start-up. Gets initialized/updated on leaf update - let mut session_info_provider = RuntimeInfo::new_with_config(RuntimeInfoConfig { - keystore: None, - session_cache_lru_size: DISPUTE_WINDOW.get(), - }); - let mut wakeups = Wakeups::default(); - let mut currently_checking_set = CurrentlyCheckingSet::default(); - let mut delayed_approvals_timers = DelayedApprovalTimer::default(); - let mut approvals_cache = LruMap::new(ByLength::new(APPROVAL_CACHE_SIZE)); - let mut last_finalized_height: Option = { let (tx, rx) = oneshot::channel(); - ctx.send_message(ChainApiMessage::FinalizedBlockNumber(tx)).await; + to_other_subsystems + .send_message(ChainApiMessage::FinalizedBlockNumber(tx)) + .await; match rx.await? { Ok(number) => Some(number), Err(err) => { @@ -1190,13 +1223,24 @@ where } }; + // `None` on start-up. Gets initialized/updated on leaf update + let mut session_info_provider = RuntimeInfo::new_with_config(RuntimeInfoConfig { + keystore: None, + session_cache_lru_size: DISPUTE_WINDOW.get(), + }); + + let mut wakeups = Wakeups::default(); + let mut currently_checking_set = CurrentlyCheckingSet::default(); + let mut delayed_approvals_timers = DelayedApprovalTimer::default(); + let mut approvals_cache = LruMap::new(ByLength::new(APPROVAL_CACHE_SIZE)); + loop { let mut overlayed_db = OverlayedBackend::new(&backend); let actions = futures::select! { (_tick, woken_block, woken_candidate) = wakeups.next(&*state.clock).fuse() => { subsystem.metrics.on_wakeup(); process_wakeup( - &mut ctx, + &mut to_other_subsystems, &mut state, &mut overlayed_db, &mut session_info_provider, @@ -1206,9 +1250,11 @@ where &wakeups, ).await? } - next_msg = ctx.recv().fuse() => { + next_msg = work_provider.recv().fuse() => { let mut actions = handle_from_overseer( - &mut ctx, + &mut to_other_subsystems, + &mut to_approval_distr, + &subsystem.spawner, &mut state, &mut overlayed_db, &mut session_info_provider, @@ -1268,7 +1314,8 @@ where &mut overlayed_db, &mut session_info_provider, &state, - &mut ctx, + &mut to_other_subsystems, + &mut to_approval_distr, block_hash, validator_index, &subsystem.metrics, @@ -1290,7 +1337,9 @@ where }; if handle_actions( - &mut ctx, + &mut to_other_subsystems, + &mut to_approval_distr, + &subsystem.spawner, &mut state, &mut overlayed_db, &mut session_info_provider, @@ -1317,6 +1366,65 @@ where Ok(()) } +// Starts a worker thread that runs the approval voting subsystem. +pub async fn start_approval_worker< + WorkProvider: ApprovalVotingWorkProvider + Send + 'static, + Sender: SubsystemSender + + SubsystemSender + + SubsystemSender + + SubsystemSender + + SubsystemSender + + SubsystemSender + + Clone, + ADSender: SubsystemSender, +>( + work_provider: WorkProvider, + to_other_subsystems: Sender, + to_approval_distr: ADSender, + config: Config, + db: Arc, + keystore: Arc, + mode: Mode, + metrics: Metrics, + spawner: Arc, + clock: Arc, +) -> SubsystemResult<()> { + let sync_oracle = match mode { + Mode::Active => todo!(), + Mode::Syncing(oracle) => oracle, + }; + let approval_voting = ApprovalVotingSubsystem::with_config_and_clock( + config, + db.clone(), + keystore, + sync_oracle, + metrics, + clock, + spawner, + ); + let backend = DbBackend::new(db.clone(), approval_voting.db_config); + let spawner = approval_voting.spawner.clone(); + spawner.spawn_blocking( + "approval-voting-rewrite-db", + Some("approval-voting-rewrite-subsystem"), + Box::pin(async move { + if let Err(err) = run( + work_provider, + to_other_subsystems, + to_approval_distr, + approval_voting, + Box::new(RealAssignmentCriteria), + backend, + ) + .await + { + gum::error!(target: LOG_TARGET, ?err, "Approval voting worker stoped processing messages"); + }; + }), + ); + Ok(()) +} + // Handle actions is a function that accepts a set of instructions // and subsequently updates the underlying approvals_db in accordance // with the linear set of instructions passed in. Therefore, actions @@ -1337,8 +1445,19 @@ where // // returns `true` if any of the actions was a `Conclude` command. #[overseer::contextbounds(ApprovalVoting, prefix = self::overseer)] -async fn handle_actions( - ctx: &mut Context, +async fn handle_actions< + Sender: SubsystemSender + + SubsystemSender + + SubsystemSender + + SubsystemSender + + SubsystemSender + + SubsystemSender + + Clone, + ADSender: SubsystemSender, +>( + sender: &mut Sender, + approval_voting_sender: &mut ADSender, + spawn_handle: &Arc, state: &mut State, overlayed_db: &mut OverlayedBackend<'_, impl Backend>, session_info_provider: &mut RuntimeInfo, @@ -1370,7 +1489,8 @@ async fn handle_actions( // Note that chaining these iterators is O(n) as we must consume // the prior iterator. let next_actions: Vec = issue_approval( - ctx, + sender, + approval_voting_sender, state, overlayed_db, session_info_provider, @@ -1421,10 +1541,12 @@ async fn handle_actions( let validator_index = indirect_cert.validator; if distribute_assignment { - ctx.send_unbounded_message(ApprovalDistributionMessage::DistributeAssignment( - indirect_cert, - claimed_candidate_indices, - )); + approval_voting_sender.send_unbounded_message( + ApprovalDistributionMessage::DistributeAssignment( + indirect_cert, + claimed_candidate_indices, + ), + ); } match approvals_cache.get(&candidate_hash) { @@ -1439,7 +1561,8 @@ async fn handle_actions( actions_iter = new_actions.into_iter(); }, None => { - let ctx = &mut *ctx; + let sender = sender.clone(); + let spawn_handle = spawn_handle.clone(); currently_checking_set .insert_relay_block_hash( @@ -1448,7 +1571,8 @@ async fn handle_actions( relay_block_hash, async move { launch_approval( - ctx, + sender, + spawn_handle, metrics.clone(), session, candidate, @@ -1477,13 +1601,13 @@ async fn handle_actions( }) .with_string_tag("block-hash", format!("{:?}", block_hash)) .with_stage(jaeger::Stage::ApprovalChecking); - ctx.send_message(ChainSelectionMessage::Approved(block_hash)).await; + sender.send_message(ChainSelectionMessage::Approved(block_hash)).await; }, Action::BecomeActive => { *mode = Mode::Active; let (messages, next_actions) = distribution_messages_for_activation( - ctx, + sender, overlayed_db, state, delayed_approvals_timers, @@ -1491,7 +1615,7 @@ async fn handle_actions( ) .await?; - ctx.send_messages(messages.into_iter()).await; + approval_voting_sender.send_messages(messages.into_iter()).await; let next_actions: Vec = next_actions.into_iter().map(|v| v.clone()).chain(actions_iter).collect(); @@ -1565,8 +1689,8 @@ fn get_assignment_core_indices( } #[overseer::contextbounds(ApprovalVoting, prefix = self::overseer)] -async fn distribution_messages_for_activation( - ctx: &mut Context, +async fn distribution_messages_for_activation>( + sender: &mut Sender, db: &OverlayedBackend<'_, impl Backend>, state: &State, delayed_approvals_timers: &mut DelayedApprovalTimer, @@ -1683,7 +1807,7 @@ async fn distribution_messages_for_activation( let ExtendedSessionInfo { ref executor_params, .. } = match get_extended_session_info( session_info_provider, - ctx.sender(), + sender, block_entry.block_hash(), block_entry.session(), ) @@ -1781,9 +1905,16 @@ async fn distribution_messages_for_activation( } // Handle an incoming signal from the overseer. Returns true if execution should conclude. -#[overseer::contextbounds(ApprovalVoting, prefix = self::overseer)] -async fn handle_from_overseer( - ctx: &mut Context, +async fn handle_from_overseer< + Sender: SubsystemSender + + SubsystemSender + + SubsystemSender + + Clone, + ADSender: SubsystemSender, +>( + sender: &mut Sender, + approval_voting_sender: &mut ADSender, + spawn_handle: &Arc, state: &mut State, db: &mut OverlayedBackend<'_, impl Backend>, session_info_provider: &mut RuntimeInfo, @@ -1801,7 +1932,8 @@ async fn handle_from_overseer( jaeger::PerLeafSpan::new(activated.span, "approval-voting"); state.spans.insert(head, approval_voting_span); match import::handle_new_head( - ctx, + sender, + approval_voting_sender, state, db, session_info_provider, @@ -1885,7 +2017,7 @@ async fn handle_from_overseer( FromOrchestra::Communication { msg } => match msg { ApprovalVotingMessage::CheckAndImportAssignment(a, claimed_cores, tranche, tx) => { let (check_outcome, actions) = check_and_import_assignment( - ctx.sender(), + sender, state, db, session_info_provider, @@ -1905,7 +2037,7 @@ async fn handle_from_overseer( }, ApprovalVotingMessage::CheckAndImportApproval(a, tx) => { let result = check_and_import_approval( - ctx.sender(), + sender, state, db, session_info_provider, @@ -1933,7 +2065,7 @@ async fn handle_from_overseer( .with_stage(jaeger::Stage::ApprovalChecking) .with_string_tag("leaf", format!("{:?}", target)); match handle_approved_ancestor( - ctx, + sender, db, target, lower_bound, @@ -1956,7 +2088,14 @@ async fn handle_from_overseer( }, ApprovalVotingMessage::GetApprovalSignaturesForCandidate(candidate_hash, tx) => { metrics.on_candidate_signatures_request(); - get_approval_signatures_for_candidate(ctx, db, candidate_hash, tx).await?; + get_approval_signatures_for_candidate( + approval_voting_sender.clone(), + spawn_handle, + db, + candidate_hash, + tx, + ) + .await?; Vec::new() }, }, @@ -1970,8 +2109,11 @@ async fn handle_from_overseer( /// This involves an unbounded message send to approval-distribution, the caller has to ensure that /// calls to this function are infrequent and bounded. #[overseer::contextbounds(ApprovalVoting, prefix = self::overseer)] -async fn get_approval_signatures_for_candidate( - ctx: &mut Context, +async fn get_approval_signatures_for_candidate< + Sender: SubsystemSender, +>( + mut sender: Sender, + spawn_handle: &Arc, db: &OverlayedBackend<'_, impl Backend>, candidate_hash: CandidateHash, tx: oneshot::Sender, ValidatorSignature)>>, @@ -2030,7 +2172,6 @@ async fn get_approval_signatures_for_candidate( } } - let mut sender = ctx.sender().clone(); let get_approvals = async move { let (tx_distribution, rx_distribution) = oneshot::channel(); sender.send_unbounded_message(ApprovalDistributionMessage::GetApprovalSignatures( @@ -2110,12 +2251,17 @@ async fn get_approval_signatures_for_candidate( ?candidate_hash, "Spawning task for fetching signatures from approval-distribution" ); - ctx.spawn("get-approval-signatures", Box::pin(get_approvals)) + spawn_handle.spawn( + "get-approval-signatures", + Some("approval-voting-subsystem"), + Box::pin(get_approvals), + ); + Ok(()) } #[overseer::contextbounds(ApprovalVoting, prefix = self::overseer)] -async fn handle_approved_ancestor( - ctx: &mut Context, +async fn handle_approved_ancestor>( + sender: &mut Sender, db: &OverlayedBackend<'_, impl Backend>, target: Hash, lower_bound: BlockNumber, @@ -2135,7 +2281,7 @@ async fn handle_approved_ancestor( let target_number = { let (tx, rx) = oneshot::channel(); - ctx.send_message(ChainApiMessage::BlockNumber(target, tx)).await; + sender.send_message(ChainApiMessage::BlockNumber(target, tx)).await; match rx.await { Ok(Ok(Some(n))) => n, @@ -2156,12 +2302,13 @@ async fn handle_approved_ancestor( let ancestry = if target_number > lower_bound + 1 { let (tx, rx) = oneshot::channel(); - ctx.send_message(ChainApiMessage::Ancestors { - hash: target, - k: (target_number - (lower_bound + 1)) as usize, - response_channel: tx, - }) - .await; + sender + .send_message(ChainApiMessage::Ancestors { + hash: target, + k: (target_number - (lower_bound + 1)) as usize, + response_channel: tx, + }) + .await; match rx.await { Ok(Ok(a)) => a, @@ -3106,9 +3253,8 @@ fn should_trigger_assignment( } } -#[overseer::contextbounds(ApprovalVoting, prefix = self::overseer)] -async fn process_wakeup( - ctx: &mut Context, +async fn process_wakeup>( + sender: &mut Sender, state: &mut State, db: &mut OverlayedBackend<'_, impl Backend>, session_info_provider: &mut RuntimeInfo, @@ -3139,7 +3285,7 @@ async fn process_wakeup( let ExtendedSessionInfo { ref session_info, ref executor_params, .. } = match get_extended_session_info( session_info_provider, - ctx.sender(), + sender, block_entry.block_hash(), block_entry.session(), ) @@ -3281,7 +3427,7 @@ async fn process_wakeup( // Note that this function also schedules a wakeup as necessary. actions.extend( advance_approval_state( - ctx.sender(), + sender, state, db, session_info_provider, @@ -3302,8 +3448,14 @@ async fn process_wakeup( // spawned. When the background work is no longer needed, the `AbortHandle` should be dropped // to cancel the background work and any requests it has spawned. #[overseer::contextbounds(ApprovalVoting, prefix = self::overseer)] -async fn launch_approval( - ctx: &mut Context, +async fn launch_approval< + Sender: SubsystemSender + + SubsystemSender + + SubsystemSender + + SubsystemSender, +>( + mut sender: Sender, + spawn_handle: Arc, metrics: Metrics, session_index: SessionIndex, candidate: CandidateReceipt, @@ -3354,14 +3506,15 @@ async fn launch_approval( .with_stage(jaeger::Stage::ApprovalChecking); let timer = metrics.time_recover_and_approve(); - ctx.send_message(AvailabilityRecoveryMessage::RecoverAvailableData( - candidate.clone(), - session_index, - Some(backing_group), - core_index, - a_tx, - )) - .await; + sender + .send_message(AvailabilityRecoveryMessage::RecoverAvailableData( + candidate.clone(), + session_index, + Some(backing_group), + core_index, + a_tx, + )) + .await; let request_validation_result_span = span .child("request-validation-result") @@ -3370,15 +3523,18 @@ async fn launch_approval( .with_string_tag("block-hash", format!("{:?}", block_hash)) .with_stage(jaeger::Stage::ApprovalChecking); - ctx.send_message(RuntimeApiMessage::Request( - block_hash, - RuntimeApiRequest::ValidationCodeByHash(candidate.descriptor.validation_code_hash, code_tx), - )) - .await; + sender + .send_message(RuntimeApiMessage::Request( + block_hash, + RuntimeApiRequest::ValidationCodeByHash( + candidate.descriptor.validation_code_hash, + code_tx, + ), + )) + .await; let candidate = candidate.clone(); let metrics_guard = StaleGuard(Some(metrics)); - let mut sender = ctx.sender().clone(); let background = async move { // Force the move of the timer into the background task. let _timer = timer; @@ -3508,14 +3664,19 @@ async fn launch_approval( } }; let (background, remote_handle) = background.remote_handle(); - ctx.spawn("approval-checks", Box::pin(background)).map(move |()| remote_handle) + spawn_handle.spawn("approval-checks", Some("approval-voting-subsystem"), Box::pin(background)); + Ok(remote_handle) } // Issue and import a local approval vote. Should only be invoked after approval checks // have been done. #[overseer::contextbounds(ApprovalVoting, prefix = self::overseer)] -async fn issue_approval( - ctx: &mut Context, +async fn issue_approval< + Sender: SubsystemSender, + ADSender: SubsystemSender, +>( + sender: &mut Sender, + approval_voting_sender: &mut ADSender, state: &mut State, db: &mut OverlayedBackend<'_, impl Backend>, session_info_provider: &mut RuntimeInfo, @@ -3594,7 +3755,7 @@ async fn issue_approval( let session_info = match get_session_info( session_info_provider, - ctx.sender(), + sender, block_entry.parent_hash(), block_entry.session(), ) @@ -3636,7 +3797,7 @@ async fn issue_approval( ); let actions = advance_approval_state( - ctx.sender(), + sender, state, db, session_info_provider, @@ -3653,7 +3814,8 @@ async fn issue_approval( db, session_info_provider, state, - ctx, + sender, + approval_voting_sender, block_hash, validator_index, metrics, @@ -3672,11 +3834,15 @@ async fn issue_approval( // Create signature for the approved candidates pending signatures #[overseer::contextbounds(ApprovalVoting, prefix = self::overseer)] -async fn maybe_create_signature( +async fn maybe_create_signature< + Sender: SubsystemSender, + ADSender: SubsystemSender, +>( db: &mut OverlayedBackend<'_, impl Backend>, session_info_provider: &mut RuntimeInfo, state: &State, - ctx: &mut Context, + sender: &mut Sender, + approval_voting_sender: &mut ADSender, block_hash: Hash, validator_index: ValidatorIndex, metrics: &Metrics, @@ -3695,7 +3861,7 @@ async fn maybe_create_signature( }; let approval_params = state - .get_approval_voting_params_or_default(ctx, block_entry.session(), block_hash) + .get_approval_voting_params_or_default(sender, block_entry.session(), block_hash) .await .unwrap_or_default(); @@ -3715,7 +3881,7 @@ async fn maybe_create_signature( let session_info = match get_session_info( session_info_provider, - ctx.sender(), + sender, block_entry.parent_hash(), block_entry.session(), ) @@ -3796,7 +3962,7 @@ async fn maybe_create_signature( metrics.on_approval_produced(); - ctx.send_unbounded_message(ApprovalDistributionMessage::DistributeApproval( + approval_voting_sender.send_unbounded_message(ApprovalDistributionMessage::DistributeApproval( IndirectSignedApprovalVoteV2 { block_hash: block_entry.block_hash(), candidate_indices: candidates_indices, @@ -3837,7 +4003,7 @@ fn issue_local_invalid_statement( candidate_hash: CandidateHash, candidate: CandidateReceipt, ) where - Sender: overseer::ApprovalVotingSenderTrait, + Sender: SubsystemSender, { // We need to send an unbounded message here to break a cycle: // DisputeCoordinatorMessage::IssueLocalStatement -> diff --git a/polkadot/node/core/approval-voting/src/tests.rs b/polkadot/node/core/approval-voting/src/tests.rs index 8aab353f9b840..8565e5d14a1ae 100644 --- a/polkadot/node/core/approval-voting/src/tests.rs +++ b/polkadot/node/core/approval-voting/src/tests.rs @@ -39,7 +39,7 @@ use polkadot_node_subsystem::{ }; use polkadot_node_subsystem_test_helpers as test_helpers; use polkadot_node_subsystem_util::TimeoutExt; -use polkadot_overseer::HeadSupportsParachains; +use polkadot_overseer::{HeadSupportsParachains, SpawnGlue}; use polkadot_primitives::{ ApprovalVote, CandidateCommitments, CandidateEvent, CoreIndex, DisputeStatement, GroupIndex, Header, Id as ParaId, IndexedVec, NodeFeatures, ValidDisputeStatementKind, ValidationCode, @@ -537,7 +537,7 @@ impl Default for HarnessConfig { struct TestHarness { virtual_overseer: VirtualOverseer, - clock: Box, + clock: Arc, sync_oracle_handle: TestSyncOracleHandle, } @@ -556,7 +556,7 @@ fn test_harness>( let pool = sp_core::testing::TaskExecutor::new(); let (context, virtual_overseer) = - polkadot_node_subsystem_test_helpers::make_subsystem_context(pool); + polkadot_node_subsystem_test_helpers::make_subsystem_context(pool.clone()); let keystore = LocalKeystore::in_memory(); let _ = keystore.sr25519_generate_new( @@ -564,7 +564,7 @@ fn test_harness>( Some(&Sr25519Keyring::Alice.to_seed()), ); - let clock = Box::new(clock); + let clock = Arc::new(clock); let db = kvdb_memorydb::create(test_constants::NUM_COLUMNS); let db = polkadot_node_subsystem_util::database::kvdb_impl::DbAdapter::new(db, &[]); @@ -580,6 +580,7 @@ fn test_harness>( sync_oracle, Metrics::default(), clock.clone(), + Arc::new(SpawnGlue(pool)), ), assignment_criteria, backend, @@ -4164,7 +4165,7 @@ async fn handle_approval_on_max_coalesce_count( async fn handle_approval_on_max_wait_time( virtual_overseer: &mut VirtualOverseer, candidate_indices: Vec, - clock: Box, + clock: Arc, ) { const TICK_NOW_BEGIN: u64 = 1; const MAX_COALESCE_COUNT: u32 = 3; @@ -4462,7 +4463,7 @@ async fn build_chain_with_two_blocks_with_one_candidate_each( async fn setup_overseer_with_two_blocks_each_with_one_assignment_triggered( virtual_overseer: &mut VirtualOverseer, store: TestStore, - clock: &Box, + clock: &Arc, sync_oracle_handle: TestSyncOracleHandle, ) { assert_matches!( @@ -4976,7 +4977,7 @@ fn test_gathering_assignments_statements() { let mut state = State { keystore: Arc::new(LocalKeystore::in_memory()), slot_duration_millis: 6_000, - clock: Box::new(MockClock::default()), + clock: Arc::new(MockClock::default()), assignment_criteria: Box::new(MockAssignmentCriteria::check_only(|_| Ok(0))), spans: HashMap::new(), per_block_assignments_gathering_times: LruMap::new(ByLength::new( @@ -5071,7 +5072,7 @@ fn test_observe_assignment_gathering_status() { let mut state = State { keystore: Arc::new(LocalKeystore::in_memory()), slot_duration_millis: 6_000, - clock: Box::new(MockClock::default()), + clock: Arc::new(MockClock::default()), assignment_criteria: Box::new(MockAssignmentCriteria::check_only(|_| Ok(0))), spans: HashMap::new(), per_block_assignments_gathering_times: LruMap::new(ByLength::new( diff --git a/polkadot/node/service/src/lib.rs b/polkadot/node/service/src/lib.rs index b4f63bd2aa06b..b76f40dd31029 100644 --- a/polkadot/node/service/src/lib.rs +++ b/polkadot/node/service/src/lib.rs @@ -63,6 +63,7 @@ use { }; use polkadot_node_subsystem_util::database::Database; +use polkadot_overseer::SpawnGlue; #[cfg(feature = "full-node")] pub use { @@ -83,7 +84,7 @@ use std::{collections::HashMap, path::PathBuf, sync::Arc, time::Duration}; use prometheus_endpoint::Registry; #[cfg(feature = "full-node")] use sc_service::KeystoreContainer; -use sc_service::RpcHandlers; +use sc_service::{RpcHandlers, SpawnTaskHandle}; use sc_telemetry::TelemetryWorker; #[cfg(feature = "full-node")] use sc_telemetry::{Telemetry, TelemetryWorkerHandle}; @@ -1481,6 +1482,7 @@ pub fn revert_backend( backend: Arc, blocks: BlockNumber, config: Configuration, + task_handle: SpawnTaskHandle, ) -> Result<(), Error> { let best_number = client.info().best_number; let finalized = client.info().finalized_number; @@ -1501,7 +1503,7 @@ pub fn revert_backend( let parachains_db = open_database(&config.database) .map_err(|err| sp_blockchain::Error::Backend(err.to_string()))?; - revert_approval_voting(parachains_db.clone(), hash)?; + revert_approval_voting(parachains_db.clone(), hash, task_handle)?; revert_chain_selection(parachains_db, hash)?; // Revert Substrate consensus related components sc_consensus_babe::revert(client.clone(), backend, blocks)?; @@ -1524,7 +1526,11 @@ fn revert_chain_selection(db: Arc, hash: Hash) -> sp_blockchain::R .map_err(|err| sp_blockchain::Error::Backend(err.to_string())) } -fn revert_approval_voting(db: Arc, hash: Hash) -> sp_blockchain::Result<()> { +fn revert_approval_voting( + db: Arc, + hash: Hash, + task_handle: SpawnTaskHandle, +) -> sp_blockchain::Result<()> { let config = approval_voting_subsystem::Config { col_approval_data: parachains_db::REAL_COLUMNS.col_approval_data, slot_duration_millis: Default::default(), @@ -1536,6 +1542,7 @@ fn revert_approval_voting(db: Arc, hash: Hash) -> sp_blockchain::R Arc::new(sc_keystore::LocalKeystore::in_memory()), Box::new(sp_consensus::NoNetwork), approval_voting_subsystem::Metrics::default(), + Arc::new(SpawnGlue(task_handle)), ); approval_voting diff --git a/polkadot/node/service/src/overseer.rs b/polkadot/node/service/src/overseer.rs index eedc92572e08c..1a14b3f85cb19 100644 --- a/polkadot/node/service/src/overseer.rs +++ b/polkadot/node/service/src/overseer.rs @@ -318,6 +318,7 @@ where keystore.clone(), Box::new(sync_service.clone()), Metrics::register(registry)?, + Arc::new(spawner.clone()), )) .gossip_support(GossipSupportSubsystem::new( keystore.clone(), diff --git a/polkadot/node/subsystem-bench/src/lib/approval/mod.rs b/polkadot/node/subsystem-bench/src/lib/approval/mod.rs index e2bb31bc159b2..80861d850be7c 100644 --- a/polkadot/node/subsystem-bench/src/lib/approval/mod.rs +++ b/polkadot/node/subsystem-bench/src/lib/approval/mod.rs @@ -801,7 +801,8 @@ fn build_overseer( Arc::new(keystore), Box::new(TestSyncOracle {}), state.approval_voting_metrics.clone(), - Box::new(system_clock.clone()), + Arc::new(system_clock.clone()), + Arc::new(SpawnGlue(spawn_task_handle.clone())), ); let approval_distribution = ApprovalDistribution::new_with_clock(