Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[4 / 5] Make approval-voting runnable on a worker thread #4846

Merged
merged 28 commits into from
Sep 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
8dbb088
Make approval-distribution logic runnable on a separate thread
alexggh Jun 17, 2024
14727c5
Move crypto checks in the approval-distribution
alexggh Jun 19, 2024
1942139
Make approval-voting runable on a worker thread
alexggh Jun 18, 2024
7175957
Address review feedback
alexggh Jul 26, 2024
469866f
Merge remote-tracking branch 'origin/alexaggh/approval-voting-paralle…
alexggh Jul 26, 2024
d9fdd51
Address review feedback
alexggh Jul 26, 2024
135fe79
Merge remote-tracking branch 'origin/alexaggh/approval-voting-paralle…
alexggh Jul 26, 2024
2752d6e
Fail early on oversized claims
alexggh Jul 29, 2024
00d22ee
Add more unittests
alexggh Jul 29, 2024
4f99f65
Merge remote-tracking branch 'origin/master' into alexaggh/approval-v…
alexggh Jul 29, 2024
f289995
Merge remote-tracking branch 'origin/alexaggh/approval-voting-paralle…
alexggh Jul 29, 2024
ead49d2
Review feedback
alexggh Aug 8, 2024
c67d99d
Add checked indirect assignment
alexggh Aug 8, 2024
23d5a28
Merge remote-tracking branch 'origin/alexaggh/approval-voting-paralle…
alexggh Aug 8, 2024
247161f
Merge remote-tracking branch 'origin/master' into alexaggh/approval-v…
alexggh Aug 12, 2024
edd7898
Merge remote-tracking branch 'origin/alexaggh/approval-voting-paralle…
alexggh Aug 12, 2024
0b140cc
Change OurViewChange order
alexggh Aug 13, 2024
0581468
Merge branch 'fix_signal_order' into alexaggh/approval-voting-paralle…
alexggh Aug 13, 2024
08889e1
Merge remote-tracking branch 'origin/master' into alexaggh/approval-v…
alexggh Aug 14, 2024
d04d4cc
Merge branch 'alexaggh/approval-voting-parallel-3-5' into alexaggh/ap…
alexggh Aug 14, 2024
b0fb6d4
Merge remote-tracking branch 'origin/master' into alexaggh/approval-v…
alexggh Aug 27, 2024
7539184
Remove crate dependency between approval-distribution and approval-vo…
alexggh Aug 27, 2024
8da0be8
Merge branch 'alexaggh/approval-voting-parallel-3-5' into alexaggh/ap…
alexggh Aug 27, 2024
377f4e1
Address review feedback
alexggh Aug 27, 2024
f5a2447
Merge remote-tracking branch 'origin/master' into alexaggh/approval-v…
alexggh Sep 9, 2024
2410bd2
Update polkadot/node/core/approval-voting/src/lib.rs
alexggh Sep 9, 2024
72150d4
Add prdoc
alexggh Sep 11, 2024
e58ec71
Make cargo fmt happy
alexggh Sep 12, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions polkadot/cli/src/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -373,16 +373,16 @@ pub fn run() -> Result<()> {
Ok(runner.async_run(|mut config| {
let (client, backend, _, task_manager) =
polkadot_service::new_chain_ops(&mut config, None)?;
let task_handle = task_manager.spawn_handle();
let aux_revert = Box::new(|client, backend, blocks| {
polkadot_service::revert_backend(client, backend, blocks, config).map_err(
|err| {
polkadot_service::revert_backend(client, backend, blocks, config, task_handle)
.map_err(|err| {
match err {
polkadot_service::Error::Blockchain(err) => err.into(),
// Generic application-specific error.
err => sc_cli::Error::Application(err.into()),
}
},
)
})
});
Ok((
cmd.run(client, backend, Some(aux_revert)).map_err(Error::SubstrateCli),
Expand Down
1 change: 1 addition & 0 deletions polkadot/node/core/approval-voting/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ kvdb = { workspace = true }
derive_more = { workspace = true, default-features = true }
thiserror = { workspace = true }
itertools = { workspace = true }
async-trait = { workspace = true }

polkadot-node-subsystem = { workspace = true, default-features = true }
polkadot-node-subsystem-util = { workspace = true, default-features = true }
Expand Down
114 changes: 65 additions & 49 deletions polkadot/node/core/approval-voting/src/import.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ use polkadot_node_subsystem::{
overseer, RuntimeApiError, SubsystemError, SubsystemResult,
};
use polkadot_node_subsystem_util::{determine_new_blocks, runtime::RuntimeInfo};
use polkadot_overseer::SubsystemSender;
use polkadot_primitives::{
node_features, BlockNumber, CandidateEvent, CandidateHash, CandidateReceipt, ConsensusLog,
CoreIndex, GroupIndex, Hash, Header, SessionIndex,
Expand Down Expand Up @@ -111,8 +112,8 @@ enum ImportedBlockInfoError {
/// Computes information about the imported block. Returns an error if the info couldn't be
/// extracted.
#[overseer::contextbounds(ApprovalVoting, prefix = self::overseer)]
async fn imported_block_info<Context>(
ctx: &mut Context,
async fn imported_block_info<Sender: SubsystemSender<RuntimeApiMessage>>(
sender: &mut Sender,
env: ImportedBlockInfoEnv<'_>,
block_hash: Hash,
block_header: &Header,
Expand All @@ -124,11 +125,12 @@ async fn imported_block_info<Context>(
// fetch candidates
let included_candidates: Vec<_> = {
let (c_tx, c_rx) = oneshot::channel();
ctx.send_message(RuntimeApiMessage::Request(
block_hash,
RuntimeApiRequest::CandidateEvents(c_tx),
))
.await;
sender
.send_message(RuntimeApiMessage::Request(
block_hash,
RuntimeApiRequest::CandidateEvents(c_tx),
))
.await;

let events: Vec<CandidateEvent> = match c_rx.await {
Ok(Ok(events)) => events,
Expand All @@ -151,11 +153,12 @@ async fn imported_block_info<Context>(
// short, that shouldn't happen.
let session_index = {
let (s_tx, s_rx) = oneshot::channel();
ctx.send_message(RuntimeApiMessage::Request(
block_header.parent_hash,
RuntimeApiRequest::SessionIndexForChild(s_tx),
))
.await;
sender
.send_message(RuntimeApiMessage::Request(
block_header.parent_hash,
RuntimeApiRequest::SessionIndexForChild(s_tx),
))
.await;

let session_index = match s_rx.await {
Ok(Ok(s)) => s,
Expand Down Expand Up @@ -201,11 +204,12 @@ async fn imported_block_info<Context>(
// by one block. This gives us the opposite invariant for sessions - the parent block's
// post-state gives us the canonical information about the session index for any of its
// children, regardless of which slot number they might be produced at.
ctx.send_message(RuntimeApiMessage::Request(
block_hash,
RuntimeApiRequest::CurrentBabeEpoch(s_tx),
))
.await;
sender
.send_message(RuntimeApiMessage::Request(
block_hash,
RuntimeApiRequest::CurrentBabeEpoch(s_tx),
))
.await;

match s_rx.await {
Ok(Ok(s)) => s,
Expand All @@ -216,7 +220,7 @@ async fn imported_block_info<Context>(
};

let extended_session_info =
get_extended_session_info(env.runtime_info, ctx.sender(), block_hash, session_index).await;
get_extended_session_info(env.runtime_info, sender, block_hash, session_index).await;
let enable_v2_assignments = extended_session_info.map_or(false, |extended_session_info| {
*extended_session_info
.node_features
Expand All @@ -225,7 +229,7 @@ async fn imported_block_info<Context>(
.unwrap_or(&false)
});

let session_info = get_session_info(env.runtime_info, ctx.sender(), block_hash, session_index)
let session_info = get_session_info(env.runtime_info, sender, block_hash, session_index)
.await
.ok_or(ImportedBlockInfoError::SessionInfoUnavailable)?;

Expand Down Expand Up @@ -329,9 +333,15 @@ pub struct BlockImportedCandidates {
/// * and return information about all candidates imported under each block.
///
/// It is the responsibility of the caller to schedule wakeups for each block.
#[overseer::contextbounds(ApprovalVoting, prefix = self::overseer)]
pub(crate) async fn handle_new_head<Context, B: Backend>(
ctx: &mut Context,
pub(crate) async fn handle_new_head<
Sender: SubsystemSender<ChainApiMessage>
+ SubsystemSender<RuntimeApiMessage>
+ SubsystemSender<ChainSelectionMessage>,
AVSender: SubsystemSender<ApprovalDistributionMessage>,
B: Backend,
>(
sender: &mut Sender,
approval_voting_sender: &mut AVSender,
state: &State,
db: &mut OverlayedBackend<'_, B>,
session_info_provider: &mut RuntimeInfo,
Expand All @@ -349,7 +359,7 @@ pub(crate) async fn handle_new_head<Context, B: Backend>(

let header = {
let (h_tx, h_rx) = oneshot::channel();
ctx.send_message(ChainApiMessage::BlockHeader(head, h_tx)).await;
sender.send_message(ChainApiMessage::BlockHeader(head, h_tx)).await;
match h_rx.await? {
Err(e) => {
gum::debug!(
Expand All @@ -375,7 +385,7 @@ pub(crate) async fn handle_new_head<Context, B: Backend>(
let lower_bound_number = finalized_number.unwrap_or(lower_bound_number).max(lower_bound_number);

let new_blocks = determine_new_blocks(
ctx.sender(),
sender,
|h| db.load_block_entry(h).map(|e| e.is_some()),
head,
&header,
Expand All @@ -401,12 +411,15 @@ pub(crate) async fn handle_new_head<Context, B: Backend>(
keystore: &state.keystore,
};

match imported_block_info(ctx, env, block_hash, &block_header, finalized_number).await {
match imported_block_info(sender, env, block_hash, &block_header, finalized_number)
.await
{
Ok(i) => imported_blocks_and_info.push((block_hash, block_header, i)),
Err(error) => {
// It's possible that we've lost a race with finality.
let (tx, rx) = oneshot::channel();
ctx.send_message(ChainApiMessage::FinalizedBlockHash(block_header.number, tx))
sender
.send_message(ChainApiMessage::FinalizedBlockHash(block_header.number, tx))
.await;

let lost_to_finality = match rx.await {
Expand Down Expand Up @@ -450,17 +463,11 @@ pub(crate) async fn handle_new_head<Context, B: Backend>(
force_approve,
} = imported_block_info;

let session_info = match get_session_info(
session_info_provider,
ctx.sender(),
head,
session_index,
)
.await
{
Some(session_info) => session_info,
None => return Ok(Vec::new()),
};
let session_info =
match get_session_info(session_info_provider, sender, head, session_index).await {
Some(session_info) => session_info,
None => return Ok(Vec::new()),
};

let (block_tick, no_show_duration) = {
let block_tick = slot_number_to_tick(state.slot_duration_millis, slot);
Expand Down Expand Up @@ -510,7 +517,7 @@ pub(crate) async fn handle_new_head<Context, B: Backend>(
};
// If all bits are already set, then send an approve message.
if approved_bitfield.count_ones() == approved_bitfield.len() {
ctx.send_message(ChainSelectionMessage::Approved(block_hash)).await;
sender.send_message(ChainSelectionMessage::Approved(block_hash)).await;
}

let block_entry = v3::BlockEntry {
Expand Down Expand Up @@ -567,7 +574,7 @@ pub(crate) async fn handle_new_head<Context, B: Backend>(

// Notify chain-selection of all approved hashes.
for hash in approved_hashes {
ctx.send_message(ChainSelectionMessage::Approved(hash)).await;
sender.send_message(ChainSelectionMessage::Approved(hash)).await;
}
}

Expand Down Expand Up @@ -603,7 +610,8 @@ pub(crate) async fn handle_new_head<Context, B: Backend>(
"Informing distribution of newly imported chain",
);

ctx.send_unbounded_message(ApprovalDistributionMessage::NewBlocks(approval_meta));
approval_voting_sender
.send_unbounded_message(ApprovalDistributionMessage::NewBlocks(approval_meta));
Ok(imported_candidates)
}

Expand All @@ -620,7 +628,10 @@ pub(crate) mod tests {
approval::v1::{VrfSignature, VrfTranscript},
DISPUTE_WINDOW,
};
use polkadot_node_subsystem::messages::{AllMessages, ApprovalVotingMessage};
use polkadot_node_subsystem::{
messages::{AllMessages, ApprovalVotingMessage},
SubsystemContext,
};
use polkadot_node_subsystem_test_helpers::make_subsystem_context;
use polkadot_node_subsystem_util::database::Database;
use polkadot_primitives::{
Expand Down Expand Up @@ -662,7 +673,7 @@ pub(crate) mod tests {
State {
keystore: Arc::new(LocalKeystore::in_memory()),
slot_duration_millis: 6_000,
clock: Box::new(MockClock::default()),
clock: Arc::new(MockClock::default()),
assignment_criteria: Box::new(MockAssignmentCriteria::default()),
spans: HashMap::new(),
per_block_assignments_gathering_times: LruMap::new(ByLength::new(
Expand Down Expand Up @@ -806,8 +817,9 @@ pub(crate) mod tests {
keystore: &LocalKeystore::in_memory(),
};

let info =
imported_block_info(&mut ctx, env, hash, &header, &Some(4)).await.unwrap();
let info = imported_block_info(ctx.sender(), env, hash, &header, &Some(4))
.await
.unwrap();

assert_eq!(info.included_candidates, included_candidates);
assert_eq!(info.session_index, session);
Expand Down Expand Up @@ -953,7 +965,7 @@ pub(crate) mod tests {
keystore: &LocalKeystore::in_memory(),
};

let info = imported_block_info(&mut ctx, env, hash, &header, &Some(4)).await;
let info = imported_block_info(ctx.sender(), env, hash, &header, &Some(4)).await;

assert_matches!(info, Err(ImportedBlockInfoError::VrfInfoUnavailable));
})
Expand Down Expand Up @@ -1092,7 +1104,7 @@ pub(crate) mod tests {
keystore: &LocalKeystore::in_memory(),
};

let info = imported_block_info(&mut ctx, env, hash, &header, &Some(6)).await;
let info = imported_block_info(ctx.sender(), env, hash, &header, &Some(6)).await;

assert_matches!(info, Err(ImportedBlockInfoError::BlockAlreadyFinalized));
})
Expand Down Expand Up @@ -1128,7 +1140,8 @@ pub(crate) mod tests {
#[test]
fn imported_block_info_extracts_force_approve() {
let pool = TaskExecutor::new();
let (mut ctx, mut handle) = make_subsystem_context(pool.clone());
let (mut ctx, mut handle) =
make_subsystem_context::<ApprovalVotingMessage, _>(pool.clone());

let session = 5;
let session_info = dummy_session_info(session);
Expand Down Expand Up @@ -1191,7 +1204,7 @@ pub(crate) mod tests {
};

let info =
imported_block_info(&mut ctx, env, hash, &header, &Some(4)).await.unwrap();
imported_block_info(ctx.sender(), env, hash, &header, &Some(4)).await.unwrap();

assert_eq!(info.included_candidates, included_candidates);
assert_eq!(info.session_index, session);
Expand Down Expand Up @@ -1384,8 +1397,11 @@ pub(crate) mod tests {
let test_fut = {
Box::pin(async move {
let mut overlay_db = OverlayedBackend::new(&db);

let mut approval_voting_sender = ctx.sender().clone();
let result = handle_new_head(
&mut ctx,
ctx.sender(),
&mut approval_voting_sender,
&state,
&mut overlay_db,
&mut session_info_provider,
Expand Down
Loading
Loading