Skip to content

Commit

Permalink
[4 / 5] Make approval-voting runnable on a worker thread (#4846)
Browse files Browse the repository at this point in the history
This is part of the work to further optimize the approval subsystems, if
you want to understand the full context start with reading
#4849 (comment),

# Description
This PR contain changes to make possible the run of single
approval-voting instance on a worker thread, so that it can be
instantiated by the approval-voting-parallel subsystem.

This does not contain any functional changes it just decouples the
subsystem from the subsystem Context and introduces more specific trait
dependencies for each function instead of all of them requiring a
context.

This change can be merged  independent of the followup PRs.

---------

Signed-off-by: Alexandru Gheorghe <alexandru.gheorghe@parity.io>
Co-authored-by: Andrei Sandu <54316454+sandreim@users.noreply.github.com>
  • Loading branch information
alexggh and sandreim authored Sep 12, 2024
1 parent ec9a734 commit a34cc8d
Show file tree
Hide file tree
Showing 9 changed files with 382 additions and 188 deletions.
8 changes: 4 additions & 4 deletions polkadot/cli/src/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -373,16 +373,16 @@ pub fn run() -> Result<()> {
Ok(runner.async_run(|mut config| {
let (client, backend, _, task_manager) =
polkadot_service::new_chain_ops(&mut config, None)?;
let task_handle = task_manager.spawn_handle();
let aux_revert = Box::new(|client, backend, blocks| {
polkadot_service::revert_backend(client, backend, blocks, config).map_err(
|err| {
polkadot_service::revert_backend(client, backend, blocks, config, task_handle)
.map_err(|err| {
match err {
polkadot_service::Error::Blockchain(err) => err.into(),
// Generic application-specific error.
err => sc_cli::Error::Application(err.into()),
}
},
)
})
});
Ok((
cmd.run(client, backend, Some(aux_revert)).map_err(Error::SubstrateCli),
Expand Down
1 change: 1 addition & 0 deletions polkadot/node/core/approval-voting/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ kvdb = { workspace = true }
derive_more = { workspace = true, default-features = true }
thiserror = { workspace = true }
itertools = { workspace = true }
async-trait = { workspace = true }

polkadot-node-subsystem = { workspace = true, default-features = true }
polkadot-node-subsystem-util = { workspace = true, default-features = true }
Expand Down
114 changes: 65 additions & 49 deletions polkadot/node/core/approval-voting/src/import.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ use polkadot_node_subsystem::{
overseer, RuntimeApiError, SubsystemError, SubsystemResult,
};
use polkadot_node_subsystem_util::{determine_new_blocks, runtime::RuntimeInfo};
use polkadot_overseer::SubsystemSender;
use polkadot_primitives::{
node_features, BlockNumber, CandidateEvent, CandidateHash, CandidateReceipt, ConsensusLog,
CoreIndex, GroupIndex, Hash, Header, SessionIndex,
Expand Down Expand Up @@ -111,8 +112,8 @@ enum ImportedBlockInfoError {
/// Computes information about the imported block. Returns an error if the info couldn't be
/// extracted.
#[overseer::contextbounds(ApprovalVoting, prefix = self::overseer)]
async fn imported_block_info<Context>(
ctx: &mut Context,
async fn imported_block_info<Sender: SubsystemSender<RuntimeApiMessage>>(
sender: &mut Sender,
env: ImportedBlockInfoEnv<'_>,
block_hash: Hash,
block_header: &Header,
Expand All @@ -124,11 +125,12 @@ async fn imported_block_info<Context>(
// fetch candidates
let included_candidates: Vec<_> = {
let (c_tx, c_rx) = oneshot::channel();
ctx.send_message(RuntimeApiMessage::Request(
block_hash,
RuntimeApiRequest::CandidateEvents(c_tx),
))
.await;
sender
.send_message(RuntimeApiMessage::Request(
block_hash,
RuntimeApiRequest::CandidateEvents(c_tx),
))
.await;

let events: Vec<CandidateEvent> = match c_rx.await {
Ok(Ok(events)) => events,
Expand All @@ -151,11 +153,12 @@ async fn imported_block_info<Context>(
// short, that shouldn't happen.
let session_index = {
let (s_tx, s_rx) = oneshot::channel();
ctx.send_message(RuntimeApiMessage::Request(
block_header.parent_hash,
RuntimeApiRequest::SessionIndexForChild(s_tx),
))
.await;
sender
.send_message(RuntimeApiMessage::Request(
block_header.parent_hash,
RuntimeApiRequest::SessionIndexForChild(s_tx),
))
.await;

let session_index = match s_rx.await {
Ok(Ok(s)) => s,
Expand Down Expand Up @@ -201,11 +204,12 @@ async fn imported_block_info<Context>(
// by one block. This gives us the opposite invariant for sessions - the parent block's
// post-state gives us the canonical information about the session index for any of its
// children, regardless of which slot number they might be produced at.
ctx.send_message(RuntimeApiMessage::Request(
block_hash,
RuntimeApiRequest::CurrentBabeEpoch(s_tx),
))
.await;
sender
.send_message(RuntimeApiMessage::Request(
block_hash,
RuntimeApiRequest::CurrentBabeEpoch(s_tx),
))
.await;

match s_rx.await {
Ok(Ok(s)) => s,
Expand All @@ -216,7 +220,7 @@ async fn imported_block_info<Context>(
};

let extended_session_info =
get_extended_session_info(env.runtime_info, ctx.sender(), block_hash, session_index).await;
get_extended_session_info(env.runtime_info, sender, block_hash, session_index).await;
let enable_v2_assignments = extended_session_info.map_or(false, |extended_session_info| {
*extended_session_info
.node_features
Expand All @@ -225,7 +229,7 @@ async fn imported_block_info<Context>(
.unwrap_or(&false)
});

let session_info = get_session_info(env.runtime_info, ctx.sender(), block_hash, session_index)
let session_info = get_session_info(env.runtime_info, sender, block_hash, session_index)
.await
.ok_or(ImportedBlockInfoError::SessionInfoUnavailable)?;

Expand Down Expand Up @@ -329,9 +333,15 @@ pub struct BlockImportedCandidates {
/// * and return information about all candidates imported under each block.
///
/// It is the responsibility of the caller to schedule wakeups for each block.
#[overseer::contextbounds(ApprovalVoting, prefix = self::overseer)]
pub(crate) async fn handle_new_head<Context, B: Backend>(
ctx: &mut Context,
pub(crate) async fn handle_new_head<
Sender: SubsystemSender<ChainApiMessage>
+ SubsystemSender<RuntimeApiMessage>
+ SubsystemSender<ChainSelectionMessage>,
AVSender: SubsystemSender<ApprovalDistributionMessage>,
B: Backend,
>(
sender: &mut Sender,
approval_voting_sender: &mut AVSender,
state: &State,
db: &mut OverlayedBackend<'_, B>,
session_info_provider: &mut RuntimeInfo,
Expand All @@ -349,7 +359,7 @@ pub(crate) async fn handle_new_head<Context, B: Backend>(

let header = {
let (h_tx, h_rx) = oneshot::channel();
ctx.send_message(ChainApiMessage::BlockHeader(head, h_tx)).await;
sender.send_message(ChainApiMessage::BlockHeader(head, h_tx)).await;
match h_rx.await? {
Err(e) => {
gum::debug!(
Expand All @@ -375,7 +385,7 @@ pub(crate) async fn handle_new_head<Context, B: Backend>(
let lower_bound_number = finalized_number.unwrap_or(lower_bound_number).max(lower_bound_number);

let new_blocks = determine_new_blocks(
ctx.sender(),
sender,
|h| db.load_block_entry(h).map(|e| e.is_some()),
head,
&header,
Expand All @@ -401,12 +411,15 @@ pub(crate) async fn handle_new_head<Context, B: Backend>(
keystore: &state.keystore,
};

match imported_block_info(ctx, env, block_hash, &block_header, finalized_number).await {
match imported_block_info(sender, env, block_hash, &block_header, finalized_number)
.await
{
Ok(i) => imported_blocks_and_info.push((block_hash, block_header, i)),
Err(error) => {
// It's possible that we've lost a race with finality.
let (tx, rx) = oneshot::channel();
ctx.send_message(ChainApiMessage::FinalizedBlockHash(block_header.number, tx))
sender
.send_message(ChainApiMessage::FinalizedBlockHash(block_header.number, tx))
.await;

let lost_to_finality = match rx.await {
Expand Down Expand Up @@ -450,17 +463,11 @@ pub(crate) async fn handle_new_head<Context, B: Backend>(
force_approve,
} = imported_block_info;

let session_info = match get_session_info(
session_info_provider,
ctx.sender(),
head,
session_index,
)
.await
{
Some(session_info) => session_info,
None => return Ok(Vec::new()),
};
let session_info =
match get_session_info(session_info_provider, sender, head, session_index).await {
Some(session_info) => session_info,
None => return Ok(Vec::new()),
};

let (block_tick, no_show_duration) = {
let block_tick = slot_number_to_tick(state.slot_duration_millis, slot);
Expand Down Expand Up @@ -510,7 +517,7 @@ pub(crate) async fn handle_new_head<Context, B: Backend>(
};
// If all bits are already set, then send an approve message.
if approved_bitfield.count_ones() == approved_bitfield.len() {
ctx.send_message(ChainSelectionMessage::Approved(block_hash)).await;
sender.send_message(ChainSelectionMessage::Approved(block_hash)).await;
}

let block_entry = v3::BlockEntry {
Expand Down Expand Up @@ -567,7 +574,7 @@ pub(crate) async fn handle_new_head<Context, B: Backend>(

// Notify chain-selection of all approved hashes.
for hash in approved_hashes {
ctx.send_message(ChainSelectionMessage::Approved(hash)).await;
sender.send_message(ChainSelectionMessage::Approved(hash)).await;
}
}

Expand Down Expand Up @@ -603,7 +610,8 @@ pub(crate) async fn handle_new_head<Context, B: Backend>(
"Informing distribution of newly imported chain",
);

ctx.send_unbounded_message(ApprovalDistributionMessage::NewBlocks(approval_meta));
approval_voting_sender
.send_unbounded_message(ApprovalDistributionMessage::NewBlocks(approval_meta));
Ok(imported_candidates)
}

Expand All @@ -620,7 +628,10 @@ pub(crate) mod tests {
approval::v1::{VrfSignature, VrfTranscript},
DISPUTE_WINDOW,
};
use polkadot_node_subsystem::messages::{AllMessages, ApprovalVotingMessage};
use polkadot_node_subsystem::{
messages::{AllMessages, ApprovalVotingMessage},
SubsystemContext,
};
use polkadot_node_subsystem_test_helpers::make_subsystem_context;
use polkadot_node_subsystem_util::database::Database;
use polkadot_primitives::{
Expand Down Expand Up @@ -662,7 +673,7 @@ pub(crate) mod tests {
State {
keystore: Arc::new(LocalKeystore::in_memory()),
slot_duration_millis: 6_000,
clock: Box::new(MockClock::default()),
clock: Arc::new(MockClock::default()),
assignment_criteria: Box::new(MockAssignmentCriteria::default()),
spans: HashMap::new(),
per_block_assignments_gathering_times: LruMap::new(ByLength::new(
Expand Down Expand Up @@ -806,8 +817,9 @@ pub(crate) mod tests {
keystore: &LocalKeystore::in_memory(),
};

let info =
imported_block_info(&mut ctx, env, hash, &header, &Some(4)).await.unwrap();
let info = imported_block_info(ctx.sender(), env, hash, &header, &Some(4))
.await
.unwrap();

assert_eq!(info.included_candidates, included_candidates);
assert_eq!(info.session_index, session);
Expand Down Expand Up @@ -953,7 +965,7 @@ pub(crate) mod tests {
keystore: &LocalKeystore::in_memory(),
};

let info = imported_block_info(&mut ctx, env, hash, &header, &Some(4)).await;
let info = imported_block_info(ctx.sender(), env, hash, &header, &Some(4)).await;

assert_matches!(info, Err(ImportedBlockInfoError::VrfInfoUnavailable));
})
Expand Down Expand Up @@ -1092,7 +1104,7 @@ pub(crate) mod tests {
keystore: &LocalKeystore::in_memory(),
};

let info = imported_block_info(&mut ctx, env, hash, &header, &Some(6)).await;
let info = imported_block_info(ctx.sender(), env, hash, &header, &Some(6)).await;

assert_matches!(info, Err(ImportedBlockInfoError::BlockAlreadyFinalized));
})
Expand Down Expand Up @@ -1128,7 +1140,8 @@ pub(crate) mod tests {
#[test]
fn imported_block_info_extracts_force_approve() {
let pool = TaskExecutor::new();
let (mut ctx, mut handle) = make_subsystem_context(pool.clone());
let (mut ctx, mut handle) =
make_subsystem_context::<ApprovalVotingMessage, _>(pool.clone());

let session = 5;
let session_info = dummy_session_info(session);
Expand Down Expand Up @@ -1191,7 +1204,7 @@ pub(crate) mod tests {
};

let info =
imported_block_info(&mut ctx, env, hash, &header, &Some(4)).await.unwrap();
imported_block_info(ctx.sender(), env, hash, &header, &Some(4)).await.unwrap();

assert_eq!(info.included_candidates, included_candidates);
assert_eq!(info.session_index, session);
Expand Down Expand Up @@ -1384,8 +1397,11 @@ pub(crate) mod tests {
let test_fut = {
Box::pin(async move {
let mut overlay_db = OverlayedBackend::new(&db);

let mut approval_voting_sender = ctx.sender().clone();
let result = handle_new_head(
&mut ctx,
ctx.sender(),
&mut approval_voting_sender,
&state,
&mut overlay_db,
&mut session_info_provider,
Expand Down
Loading

0 comments on commit a34cc8d

Please sign in to comment.