Skip to content

Commit

Permalink
address review comments
Browse files Browse the repository at this point in the history
- parametrise backing votes runtime API with session index
- remove RuntimeInfo usage in backing subsystem, as runtime API
caches the min backing votes by session index anyway.
- move the logic for adjusting the configured needed backing votes with the size of the backing group
to a primitives helper.
- move the legacy min backing votes value to a primitives helper.
- mark JoinMultiple error as fatal, since the Canceled (non-multiple) counterpart is also fatal.
- make backing subsystem handle fatal errors for new leaves update.
- add HostConfiguration consistency check for zeroed backing votes threshold
- add cumulus accompanying change
  • Loading branch information
alindima committed Aug 28, 2023
1 parent be44e33 commit e7e82bc
Show file tree
Hide file tree
Showing 21 changed files with 171 additions and 178 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,14 @@ impl RuntimeApiSubsystemClient for BlockChainRpcClient {
.await?)
}

async fn minimum_backing_votes(
&self,
at: Hash,
session_index: polkadot_primitives::SessionIndex,
) -> Result<u32, ApiError> {
Ok(self.rpc_client.parachain_host_minimum_backing_votes(at, session_index).await?)
}

async fn staging_async_backing_params(&self, at: Hash) -> Result<AsyncBackingParams, ApiError> {
Ok(self.rpc_client.parachain_host_staging_async_backing_params(at).await?)
}
Expand Down
10 changes: 10 additions & 0 deletions cumulus/client/relay-chain-rpc-interface/src/rpc_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -588,6 +588,16 @@ impl RelayChainRpcClient {
.await
}

/// Get the minimum number of backing votes for a candidate.
pub async fn parachain_host_minimum_backing_votes(
&self,
at: RelayHash,
_session_index: SessionIndex,
) -> Result<u32, RelayChainError> {
self.call_remote_runtime_function("ParachainHost_minimum_backing_votes", at, None::<()>)
.await
}

#[allow(missing_docs)]
pub async fn parachain_host_staging_async_backing_params(
&self,
Expand Down
1 change: 1 addition & 0 deletions polkadot/node/core/backing/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ pub enum Error {
RuntimeApiUnavailable(#[source] oneshot::Canceled),

#[error("a channel was closed before receipt in try_join!")]
#[fatal]
JoinMultiple(#[source] oneshot::Canceled),

#[error("Obtaining erasure chunks failed")]
Expand Down
51 changes: 17 additions & 34 deletions polkadot/node/core/backing/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,11 @@ use polkadot_node_subsystem::{
use polkadot_node_subsystem_util::{
self as util,
backing_implicit_view::{FetchError as ImplicitViewFetchError, View as ImplicitView},
request_from_runtime, request_validator_groups, request_validators,
runtime::{prospective_parachains_mode, ProspectiveParachainsMode},
request_from_runtime, request_session_index_for_child, request_validator_groups,
request_validators,
runtime::{
self, prospective_parachains_mode, request_min_backing_votes, ProspectiveParachainsMode,
},
Validator,
};
use polkadot_primitives::{
Expand All @@ -115,7 +118,6 @@ use statement_table::{
},
Config as TableConfig, Context as TableContextTrait, Table,
};
use util::runtime::{request_min_backing_votes, RuntimeInfo};

mod error;

Expand Down Expand Up @@ -277,8 +279,6 @@ struct State {
background_validation_tx: mpsc::Sender<(Hash, ValidatedCandidateCommand)>,
/// The handle to the keystore used for signing.
keystore: KeystorePtr,
/// Runtime info cached per-session.
runtime_info: RuntimeInfo,
}

impl State {
Expand All @@ -293,7 +293,6 @@ impl State {
per_candidate: HashMap::new(),
background_validation_tx,
keystore,
runtime_info: RuntimeInfo::new(None),
}
}
}
Expand Down Expand Up @@ -948,14 +947,7 @@ async fn handle_active_leaves_update<Context>(

// construct a `PerRelayParent` from the runtime API
// and insert it.
let per = construct_per_relay_parent_state(
ctx,
maybe_new,
&state.keystore,
&mut state.runtime_info,
mode,
)
.await?;
let per = construct_per_relay_parent_state(ctx, maybe_new, &state.keystore, mode).await?;

if let Some(per) = per {
state.per_relay_parent.insert(maybe_new, per);
Expand All @@ -971,41 +963,29 @@ async fn construct_per_relay_parent_state<Context>(
ctx: &mut Context,
relay_parent: Hash,
keystore: &KeystorePtr,
runtime_info: &mut RuntimeInfo,
mode: ProspectiveParachainsMode,
) -> Result<Option<PerRelayParentState>, Error> {
macro_rules! try_runtime_api {
($x: expr) => {
match $x {
Ok(x) => x,
Err(e) => {
gum::warn!(
target: LOG_TARGET,
err = ?e,
"Failed to fetch runtime API data for job",
);
Err(err) => {
// Only bubble up fatal errors.
error::log_error(Err(Into::<runtime::Error>::into(err).into()))?;

// We can't do candidate validation work if we don't have the
// requisite runtime API data. But these errors should not take
// down the node.
return Ok(None);
}
return Ok(None)
},
}
}
};
}

let parent = relay_parent;

let session_index =
try_runtime_api!(runtime_info.get_session_index_for_child(ctx.sender(), parent).await);

let minimum_backing_votes =
request_min_backing_votes(parent, ctx.sender(), |parent, sender| {
runtime_info.get_min_backing_votes(sender, session_index, parent)
})
.await?;

let (validators, groups, cores) = futures::try_join!(
let (session_index, validators, groups, cores) = futures::try_join!(
request_session_index_for_child(parent, ctx.sender()).await,
request_validators(parent, ctx.sender()).await,
request_validator_groups(parent, ctx.sender()).await,
request_from_runtime(parent, ctx.sender(), |tx| {
Expand All @@ -1015,9 +995,12 @@ async fn construct_per_relay_parent_state<Context>(
)
.map_err(Error::JoinMultiple)?;

let session_index = try_runtime_api!(session_index);
let validators: Vec<_> = try_runtime_api!(validators);
let (validator_groups, group_rotation_info) = try_runtime_api!(groups);
let cores = try_runtime_api!(cores);
let minimum_backing_votes =
try_runtime_api!(request_min_backing_votes(parent, session_index, ctx.sender()).await);

let signing_context = SigningContext { parent_hash: parent, session_index };
let validator =
Expand Down
10 changes: 0 additions & 10 deletions polkadot/node/core/backing/src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,16 +262,6 @@ async fn test_startup(virtual_overseer: &mut VirtualOverseer, test_state: &TestS
}
);

// Check that subsystem job issues a request for the runtime API version.
// assert_matches!(
// virtual_overseer.recv().await,
// AllMessages::RuntimeApi(
// RuntimeApiMessage::Request(parent, RuntimeApiRequest::Version(tx))
// ) if parent == test_state.relay_parent => {
// tx.send(Ok(RuntimeApiRequest::MINIMUM_BACKING_VOTES_RUNTIME_REQUIREMENT)).unwrap();
// }
// );

// Check if subsystem job issues a request for the minimum backing votes.
// This may or may not happen, depending if the minimum backing votes is already cached in the
// RuntimeInfo.
Expand Down
30 changes: 15 additions & 15 deletions polkadot/node/core/runtime-api/src/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ const DEFAULT_CACHE_CAP: NonZeroUsize = match NonZeroUsize::new(128) {
pub(crate) struct RequestResultCache {
authorities: LruCache<Hash, Vec<AuthorityDiscoveryId>>,
validators: LruCache<Hash, Vec<ValidatorId>>,
minimum_backing_votes: LruCache<Hash, u32>,
validator_groups: LruCache<Hash, (Vec<Vec<ValidatorIndex>>, GroupRotationInfo)>,
availability_cores: LruCache<Hash, Vec<CoreState>>,
persisted_validation_data:
Expand Down Expand Up @@ -69,6 +68,7 @@ pub(crate) struct RequestResultCache {
LruCache<Hash, Vec<(SessionIndex, CandidateHash, vstaging::slashing::PendingSlashes)>>,
key_ownership_proof:
LruCache<(Hash, ValidatorId), Option<vstaging::slashing::OpaqueKeyOwnershipProof>>,
minimum_backing_votes: LruCache<SessionIndex, u32>,

staging_para_backing_state: LruCache<(Hash, ParaId), Option<vstaging::BackingState>>,
staging_async_backing_params: LruCache<Hash, vstaging::AsyncBackingParams>,
Expand All @@ -79,7 +79,6 @@ impl Default for RequestResultCache {
Self {
authorities: LruCache::new(DEFAULT_CACHE_CAP),
validators: LruCache::new(DEFAULT_CACHE_CAP),
minimum_backing_votes: LruCache::new(DEFAULT_CACHE_CAP),
validator_groups: LruCache::new(DEFAULT_CACHE_CAP),
availability_cores: LruCache::new(DEFAULT_CACHE_CAP),
persisted_validation_data: LruCache::new(DEFAULT_CACHE_CAP),
Expand All @@ -102,6 +101,7 @@ impl Default for RequestResultCache {
disputes: LruCache::new(DEFAULT_CACHE_CAP),
unapplied_slashes: LruCache::new(DEFAULT_CACHE_CAP),
key_ownership_proof: LruCache::new(DEFAULT_CACHE_CAP),
minimum_backing_votes: LruCache::new(DEFAULT_CACHE_CAP),

staging_para_backing_state: LruCache::new(DEFAULT_CACHE_CAP),
staging_async_backing_params: LruCache::new(DEFAULT_CACHE_CAP),
Expand Down Expand Up @@ -133,18 +133,6 @@ impl RequestResultCache {
self.validators.put(relay_parent, validators);
}

pub(crate) fn minimum_backing_votes(&mut self, relay_parent: &Hash) -> Option<u32> {
self.minimum_backing_votes.get(relay_parent).copied()
}

pub(crate) fn cache_minimum_backing_votes(
&mut self,
relay_parent: Hash,
minimum_backing_votes: u32,
) {
self.minimum_backing_votes.put(relay_parent, minimum_backing_votes);
}

pub(crate) fn validator_groups(
&mut self,
relay_parent: &Hash,
Expand Down Expand Up @@ -451,6 +439,18 @@ impl RequestResultCache {
None
}

pub(crate) fn minimum_backing_votes(&mut self, session_index: SessionIndex) -> Option<u32> {
self.minimum_backing_votes.get(&session_index).copied()
}

pub(crate) fn cache_minimum_backing_votes(
&mut self,
session_index: SessionIndex,
minimum_backing_votes: u32,
) {
self.minimum_backing_votes.put(session_index, minimum_backing_votes);
}

pub(crate) fn staging_para_backing_state(
&mut self,
key: (Hash, ParaId),
Expand Down Expand Up @@ -486,7 +486,7 @@ pub(crate) enum RequestResult {
// The structure of each variant is (relay_parent, [params,]*, result)
Authorities(Hash, Vec<AuthorityDiscoveryId>),
Validators(Hash, Vec<ValidatorId>),
MinimumBackingVotes(Hash, u32),
MinimumBackingVotes(Hash, SessionIndex, u32),
ValidatorGroups(Hash, (Vec<Vec<ValidatorIndex>>, GroupRotationInfo)),
AvailabilityCores(Hash, Vec<CoreState>),
PersistedValidationData(Hash, ParaId, OccupiedCoreAssumption, Option<PersistedValidationData>),
Expand Down
19 changes: 13 additions & 6 deletions polkadot/node/core/runtime-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,9 @@ where
self.requests_cache.cache_authorities(relay_parent, authorities),
Validators(relay_parent, validators) =>
self.requests_cache.cache_validators(relay_parent, validators),
MinimumBackingVotes(relay_parent, minimum_backing_votes) => self
MinimumBackingVotes(_, session_index, minimum_backing_votes) => self
.requests_cache
.cache_minimum_backing_votes(relay_parent, minimum_backing_votes),
.cache_minimum_backing_votes(session_index, minimum_backing_votes),
ValidatorGroups(relay_parent, groups) =>
self.requests_cache.cache_validator_groups(relay_parent, groups),
AvailabilityCores(relay_parent, cores) =>
Expand Down Expand Up @@ -304,8 +304,15 @@ where
Request::StagingAsyncBackingParams(sender) =>
query!(staging_async_backing_params(), sender)
.map(|sender| Request::StagingAsyncBackingParams(sender)),
Request::MinimumBackingVotes(sender) => query!(minimum_backing_votes(), sender)
.map(|sender| Request::MinimumBackingVotes(sender)),
Request::MinimumBackingVotes(index, sender) => {
if let Some(value) = self.requests_cache.minimum_backing_votes(index) {
self.metrics.on_cached_request();
let _ = sender.send(Ok(value));
None
} else {
Some(Request::MinimumBackingVotes(index, sender))
}
},
}
}

Expand Down Expand Up @@ -556,9 +563,9 @@ where
ver = Request::SUBMIT_REPORT_DISPUTE_LOST_RUNTIME_REQUIREMENT,
sender
),
Request::MinimumBackingVotes(sender) => query!(
Request::MinimumBackingVotes(index, sender) => query!(
MinimumBackingVotes,
minimum_backing_votes(),
minimum_backing_votes(index),
ver = Request::MINIMUM_BACKING_VOTES_RUNTIME_REQUIREMENT,
sender
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@

//! A utility for tracking groups and their members within a session.

use polkadot_primitives::vstaging::{GroupIndex, IndexedVec, ValidatorIndex};
use polkadot_primitives::{
effective_minimum_backing_votes,
vstaging::{GroupIndex, IndexedVec, ValidatorIndex},
};

use std::collections::HashMap;

Expand Down Expand Up @@ -64,7 +67,7 @@ impl Groups {
group_index: GroupIndex,
) -> Option<(usize, usize)> {
self.get(group_index)
.map(|g| (g.len(), std::cmp::min(g.len(), self.backing_threshold as usize)))
.map(|g| (g.len(), effective_minimum_backing_votes(g.len(), self.backing_threshold)))
}

/// Get the group index for a validator by index.
Expand Down
20 changes: 3 additions & 17 deletions polkadot/node/network/statement-distribution/src/vstaging/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,10 @@ use polkadot_node_subsystem::{
},
overseer, ActivatedLeaf,
};
use polkadot_node_subsystem_types::messages::RuntimeApiRequest;
use polkadot_node_subsystem_util::{
backing_implicit_view::View as ImplicitView,
reputation::ReputationAggregator,
request_from_runtime,
runtime::{recv_runtime, request_min_backing_votes, ProspectiveParachainsMode},
runtime::{request_min_backing_votes, ProspectiveParachainsMode},
};
use polkadot_primitives::vstaging::{
AuthorityDiscoveryId, CandidateHash, CompactStatement, CoreIndex, CoreState, GroupIndex,
Expand Down Expand Up @@ -507,20 +505,8 @@ pub(crate) async fn handle_active_leaves_update<Context>(
Some(s) => s,
};

let minimum_backing_votes = request_min_backing_votes(
new_relay_parent,
ctx.sender(),
|parent, sender| async move {
recv_runtime(
request_from_runtime(parent, sender, |tx| {
RuntimeApiRequest::MinimumBackingVotes(tx)
})
.await,
)
.await
},
)
.await?;
let minimum_backing_votes =
request_min_backing_votes(new_relay_parent, session_index, ctx.sender()).await?;

state.per_session.insert(
session_index,
Expand Down
4 changes: 2 additions & 2 deletions polkadot/node/subsystem-types/src/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -606,8 +606,6 @@ pub enum RuntimeApiRequest {
Authorities(RuntimeApiSender<Vec<AuthorityDiscoveryId>>),
/// Get the current validator set.
Validators(RuntimeApiSender<Vec<ValidatorId>>),
/// Get the minimum required backing votes.
MinimumBackingVotes(RuntimeApiSender<u32>),
/// Get the validator groups and group rotation info.
ValidatorGroups(RuntimeApiSender<(Vec<Vec<ValidatorIndex>>, GroupRotationInfo)>),
/// Get information on all availability cores.
Expand Down Expand Up @@ -693,6 +691,8 @@ pub enum RuntimeApiRequest {
slashing::OpaqueKeyOwnershipProof,
RuntimeApiSender<Option<()>>,
),
/// Get the minimum required backing votes.
MinimumBackingVotes(SessionIndex, RuntimeApiSender<u32>),

/// Get the backing state of the given para.
/// This is a staging API that will not be available on production runtimes.
Expand Down
12 changes: 10 additions & 2 deletions polkadot/node/subsystem-types/src/runtime_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,11 @@ pub trait RuntimeApiSubsystemClient {

// === STAGING v6 ===
/// Get the minimum number of backing votes.
async fn minimum_backing_votes(&self, at: Hash) -> Result<u32, ApiError>;
async fn minimum_backing_votes(
&self,
at: Hash,
session_index: SessionIndex,
) -> Result<u32, ApiError>;

// === Asynchronous backing API ===

Expand Down Expand Up @@ -477,7 +481,11 @@ where
runtime_api.submit_report_dispute_lost(at, dispute_proof, key_ownership_proof)
}

async fn minimum_backing_votes(&self, at: Hash) -> Result<u32, ApiError> {
async fn minimum_backing_votes(
&self,
at: Hash,
_session_index: SessionIndex,
) -> Result<u32, ApiError> {
self.client.runtime_api().minimum_backing_votes(at)
}

Expand Down
Loading

0 comments on commit e7e82bc

Please sign in to comment.