From 56a3e1cb66efaec4e4320ef9476579bb62ecb4c7 Mon Sep 17 00:00:00 2001 From: iceseer Date: Tue, 9 Apr 2024 22:17:02 +0300 Subject: [PATCH] cores fixup Signed-off-by: iceseer --- core/parachain/backing/store.hpp | 3 +- core/parachain/backing/store_impl.cpp | 29 +- core/parachain/backing/store_impl.hpp | 6 +- .../validator/impl/parachain_processor.cpp | 282 +++++++++++++----- .../validator/parachain_processor.hpp | 19 +- .../runtime_api/parachain_host_types.hpp | 26 ++ 6 files changed, 273 insertions(+), 92 deletions(-) diff --git a/core/parachain/backing/store.hpp b/core/parachain/backing/store.hpp index 476d5c5589..7e8705dc68 100644 --- a/core/parachain/backing/store.hpp +++ b/core/parachain/backing/store.hpp @@ -43,7 +43,8 @@ namespace kagome::parachain { virtual std::optional put( const RelayHash &relay_parent, - const std::unordered_map> + GroupIndex group_id, + const std::unordered_map> &groups, Statement statement, bool allow_multiple_seconded) = 0; diff --git a/core/parachain/backing/store_impl.cpp b/core/parachain/backing/store_impl.cpp index 2d9ad05309..4ec97bb9e5 100644 --- a/core/parachain/backing/store_impl.cpp +++ b/core/parachain/backing/store_impl.cpp @@ -60,24 +60,29 @@ namespace kagome::parachain { ValidatorIndex from, const CandidateHash &digest, const ValidityVote &vote) { + std::cout << fmt::format("1 {}\n", __PRETTY_FUNCTION__); auto it = state.candidate_votes_.find(digest); if (it == state.candidate_votes_.end()) { + std::cout << fmt::format("2 {}\n", __PRETTY_FUNCTION__); return std::nullopt; } BackingStore::StatementInfo &votes = it->second; if (!is_in_group(groups, votes.group_id, from)) { + std::cout << fmt::format("3 {}\n", __PRETTY_FUNCTION__); return Error::UNAUTHORIZED_STATEMENT; } auto i = votes.validity_votes.find(from); if (i != votes.validity_votes.end()) { if (i->second != vote) { + std::cout << fmt::format("4 {}\n", __PRETTY_FUNCTION__); return Error::DOUBLE_VOTE; } return std::nullopt; } + std::cout << fmt::format("5 {}\n", __PRETTY_FUNCTION__); votes.validity_votes[from] = vote; return BackingStore::ImportResult{ .candidate = digest, @@ -89,17 +94,19 @@ namespace kagome::parachain { outcome::result> BackingStoreImpl::import_candidate( PerRelayParent &state, - const std::unordered_map> + GroupIndex group_id, + const std::unordered_map> &groups, ValidatorIndex authority, const network::CommittedCandidateReceipt &candidate, const ValidatorSignature &signature, bool allow_multiple_seconded) { - const auto group = candidate.descriptor.para_id; - if (auto it = groups.find(group); + std::cout << fmt::format("1 {}\n", __PRETTY_FUNCTION__); + if (auto it = groups.find(group_id); it == groups.end() || std::find(it->second.begin(), it->second.end(), authority) == it->second.end()) { + std::cout << fmt::format("2 {}\n", __PRETTY_FUNCTION__); return Error::UNAUTHORIZED_STATEMENT; } @@ -111,8 +118,10 @@ namespace kagome::parachain { if (!allow_multiple_seconded && existing.proposals.size() == 1) { const auto &[old_digest, old_sig] = existing.proposals[0]; if (old_digest != digest) { + std::cout << fmt::format("3 {}\n", __PRETTY_FUNCTION__); return Error::MULTIPLE_CANDIDATES; } + std::cout << fmt::format("4 {}\n", __PRETTY_FUNCTION__); new_proposal = false; } else if (allow_multiple_seconded && std::find_if(existing.proposals.begin(), @@ -122,21 +131,26 @@ namespace kagome::parachain { return h == digest; }) != existing.proposals.end()) { + std::cout << fmt::format("5 {}\n", __PRETTY_FUNCTION__); new_proposal = false; } else { + std::cout << fmt::format("6 {}\n", __PRETTY_FUNCTION__); existing.proposals.emplace_back(digest, signature); new_proposal = true; } } else { + std::cout << fmt::format("7 {}\n", __PRETTY_FUNCTION__); auto &ad = state.authority_data_[authority]; ad.proposals.emplace_back(digest, signature); new_proposal = true; } + std::cout << fmt::format("8 {}\n", __PRETTY_FUNCTION__); if (new_proposal) { + std::cout << fmt::format("9 {}\n", __PRETTY_FUNCTION__); auto &cv = state.candidate_votes_[digest]; cv.candidate = candidate; - cv.group_id = group; + cv.group_id = group_id; } return validity_vote( @@ -145,7 +159,8 @@ namespace kagome::parachain { std::optional BackingStoreImpl::put( const RelayHash &relay_parent, - const std::unordered_map> + GroupIndex group_id, + const std::unordered_map> &groups, Statement stm, bool allow_multiple_seconded) { @@ -154,6 +169,7 @@ namespace kagome::parachain { [&](PerRelayParent &state) { per_rp_state = state; }); if (!per_rp_state) { + std::cout << fmt::format("1 {}\n", __PRETTY_FUNCTION__); return std::nullopt; } @@ -165,6 +181,7 @@ namespace kagome::parachain { statement.candidate_state, [&](const network::CommittedCandidateReceipt &candidate) { return import_candidate(per_rp_state->get(), + group_id, groups, signer, candidate, @@ -180,10 +197,12 @@ namespace kagome::parachain { }, [](const auto &) { UNREACHABLE; + std::cout << fmt::format("UNREACHABLE\n"); return Error::CRITICAL_ERROR; }); if (res.has_error()) { + std::cout << fmt::format("2 {}\n", __PRETTY_FUNCTION__); return std::nullopt; } return res.value(); diff --git a/core/parachain/backing/store_impl.hpp b/core/parachain/backing/store_impl.hpp index c490a0cf97..7949e649b3 100644 --- a/core/parachain/backing/store_impl.hpp +++ b/core/parachain/backing/store_impl.hpp @@ -28,7 +28,8 @@ namespace kagome::parachain { std::optional put( const RelayHash &relay_parent, - const std::unordered_map> + GroupIndex group_id, + const std::unordered_map> &groups, Statement statement, bool allow_multiple_seconded) override; @@ -86,7 +87,8 @@ namespace kagome::parachain { const ValidityVote &vote); outcome::result> import_candidate( PerRelayParent &state, - const std::unordered_map> + GroupIndex group_id, + const std::unordered_map> &groups, ValidatorIndex authority, const network::CommittedCandidateReceipt &candidate, diff --git a/core/parachain/validator/impl/parachain_processor.cpp b/core/parachain/validator/impl/parachain_processor.cpp index 0929fd3406..f1c8b12d96 100644 --- a/core/parachain/validator/impl/parachain_processor.cpp +++ b/core/parachain/validator/impl/parachain_processor.cpp @@ -57,6 +57,8 @@ OUTCOME_CPP_DEFINE_CATEGORY(kagome::parachain, return "Validate and make available skipped"; case E::OUT_OF_VIEW: return "Out of view"; + case E::CORE_INDEX_UNAVAILABLE: + return "Core index unavailable"; case E::DUPLICATE: return "Duplicate"; case E::NO_INSTANCE: @@ -854,7 +856,7 @@ namespace kagome::parachain { parachain_host_->session_index_for_child(relay_parent)); OUTCOME_TRY(session_info, parachain_host_->session_info(relay_parent, session_index)); - auto &[validator_groups, group_rotation_info] = groups; + const auto &[validator_groups, group_rotation_info] = groups; if (!validator) { SL_TRACE(logger_, "Not a validator, or no para keys."); @@ -862,29 +864,62 @@ namespace kagome::parachain { } is_parachain_validator = true; + auto mode = + prospective_parachains_->prospectiveParachainsMode(relay_parent); const auto n_cores = cores.size(); - std::optional assignment; - std::optional required_collator; - - std::unordered_map> out_groups; - for (CoreIndex core_index = 0; - core_index < static_cast(cores.size()); - ++core_index) { - if (const auto *scheduled = - std::get_if(&cores[core_index])) { - const auto group_index = - group_rotation_info.groupForCore(core_index, n_cores); - if (group_index < validator_groups.size()) { - auto &g = validator_groups[group_index]; - if (g.contains(validator->validatorIndex())) { - assignment = scheduled->para_id; - required_collator = scheduled->collator; + + std::unordered_map> out_groups; + std::optional assigned_core; + std::optional assigned_para; + + for (CoreIndex idx = 0; idx < static_cast(cores.size()); ++idx) { + std::optional core_para_id = visit_in_place(cores[idx], + [&](const runtime::OccupiedCore &occupied) -> std::optional { + if (mode) { + if (occupied.next_up_on_available) { + return occupied.next_up_on_available->para_id; + } else { + return std::nullopt; + } + } else { + return std::nullopt; } - out_groups[scheduled->para_id] = g.validators; + }, + [](const runtime::ScheduledCore &scheduled) -> std::optional { + return scheduled.para_id; + }, + [](const runtime::FreeCore &) -> std::optional { + return std::nullopt; + } + ); + + if (!core_para_id) { + continue; + } + + const CoreIndex core_index = idx; + const GroupIndex group_index = group_rotation_info.groupForCore(core_index, n_cores); + + if (group_index < validator_groups.size()) { + const auto &g = validator_groups[group_index]; + if (validator && g.contains(validator->validatorIndex())) { + assigned_para = core_para_id; + assigned_core = core_index; } + out_groups.emplace(core_index, g.validators); } } + std::vector> validator_to_group; + validator_to_group.resize(validators.size()); + for (GroupIndex group_idx = 0; group_idx < validator_groups.size(); ++group_idx) { + const auto &validator_group = validator_groups[group_idx]; + for (const auto v : validator_group.validators) { + SL_TRACE(logger_, "Bind {} -> {}", v, group_idx); + validator_to_group[v] = group_idx; + } + } + uint32_t minimum_backing_votes = 2; /// legacy value if (auto r = parachain_host_->minimum_backing_votes(relay_parent, session_index); @@ -912,8 +947,6 @@ namespace kagome::parachain { } std::optional statement_store; - auto mode = - prospective_parachains_->prospectiveParachainsMode(relay_parent); if (mode) { [[maybe_unused]] const auto _ = our_current_state_.implicit_view->activate_leaf(relay_parent); @@ -923,9 +956,10 @@ namespace kagome::parachain { } SL_VERBOSE(logger_, - "Inited new backing task v2.(assignment={}, our index={}, relay " + "Inited new backing task v2.(assigned_para={}, assigned_core={}, our index={}, relay " "parent={})", - assignment, + assigned_para, + assigned_core, validator->validatorIndex(), relay_parent); @@ -935,11 +969,12 @@ namespace kagome::parachain { } return RelayParentState{ .prospective_parachains_mode = mode, - .assignment = assignment, + .assigned_core = assigned_core, + .assigned_para = assigned_para, + .validator_to_group = std::move(validator_to_group), .seconded = {}, .our_index = validator->validatorIndex(), .our_group = our_group, - .required_collator = required_collator, .collations = {}, .table_context = TableContext{ @@ -1026,21 +1061,12 @@ namespace kagome::parachain { /// fetched_candidates ??? auto ¶chain_state = opt_parachain_state->get(); - auto &assignment = parachain_state.assignment; + const auto &assignment = parachain_state.assigned_para; // auto &seconded = parachain_state.seconded; auto &issued_statements = parachain_state.issued_statements; - if (parachain_state.required_collator - && *parachain_state.required_collator != descriptor.collator_id) { - SL_WARN(logger_, - "Fetched collation from wrong collator: received {} from {}", - descriptor.collator_id, - pending_collation.peer_id); - return; - } - const auto candidate_para_id = descriptor.para_id; - if (candidate_para_id != assignment) { + if (!assignment || candidate_para_id != *assignment) { SL_WARN(logger_, "Try to second for para_id {} out of our assignment {}.", candidate_para_id, @@ -1753,6 +1779,7 @@ namespace kagome::parachain { "candidate_hash={})", stm.relay_parent, candidateHash(getPayload(stm.compact))); + SL_TRACE(logger_, "{}", __PRETTY_FUNCTION__); auto parachain_state = tryGetStateByRelayParent(stm.relay_parent); if (!parachain_state) { SL_TRACE(logger_, @@ -2136,6 +2163,7 @@ namespace kagome::parachain { candidate_hash, group_index); + SL_TRACE(logger_, "{}", __PRETTY_FUNCTION__); if (r.has_error()) { SL_INFO(logger_, "Fetch attested candidate returned an error. (relay parent={}, " @@ -2239,17 +2267,20 @@ namespace kagome::parachain { void ParachainProcessorImpl::new_confirmed_candidate_fragment_tree_updates( const HypotheticalCandidate &candidate) { + SL_TRACE(logger_, "{}", __PRETTY_FUNCTION__); fragment_tree_update_inner(std::nullopt, std::nullopt, {candidate}); } void ParachainProcessorImpl::new_leaf_fragment_tree_updates( const Hash &leaf_hash) { + SL_TRACE(logger_, "{}", __PRETTY_FUNCTION__); fragment_tree_update_inner({leaf_hash}, std::nullopt, std::nullopt); } void ParachainProcessorImpl::prospective_backed_notification_fragment_tree_updates( ParachainId para_id, const Hash ¶_head) { + SL_TRACE(logger_, "{}", __PRETTY_FUNCTION__); std::pair, ParachainId> p{{para_head}, para_id}; fragment_tree_update_inner(std::nullopt, p, std::nullopt); @@ -2261,6 +2292,8 @@ namespace kagome::parachain { required_parent_info, std::optional> known_hypotheticals) { + SL_TRACE(logger_, "{}", __PRETTY_FUNCTION__); + std::vector hypotheticals; if (!known_hypotheticals) { hypotheticals = candidates_.frontier_hypotheticals(required_parent_info); @@ -2389,6 +2422,7 @@ namespace kagome::parachain { return; } + SL_TRACE(logger_, "{}", __PRETTY_FUNCTION__); std::vector> imported; per_relay_parent.statement_store->fresh_statements_for_backing( @@ -2400,33 +2434,40 @@ namespace kagome::parachain { const auto &compact = getPayload(statement); imported.emplace_back(v, compact); - SL_TRACE(logger_, "Handle statement {}", relay_parent); - handleStatement( - relay_parent, - SignedFullStatementWithPVD{ - .payload = - { - .payload = visit_in_place( - compact.inner_value, - [&](const network::vstaging::SecondedCandidateHash - &) -> StatementWithPVD { - return StatementWithPVDSeconded{ - .committed_receipt = confirmed.receipt, - .pvd = confirmed.persisted_validation_data, - }; - }, - [](const network::vstaging::ValidCandidateHash - &val) -> StatementWithPVD { - return StatementWithPVDValid{ - .candidate_hash = val.hash, - }; - }, - [](const auto &) -> StatementWithPVD { - UNREACHABLE; - }), - .ix = statement.payload.ix, - }, - .signature = statement.signature, + SignedFullStatementWithPVD carrying_pvd{ + .payload = + { + .payload = visit_in_place( + compact.inner_value, + [&](const network::vstaging::SecondedCandidateHash &) + -> StatementWithPVD { + return StatementWithPVDSeconded{ + .committed_receipt = confirmed.receipt, + .pvd = confirmed.persisted_validation_data, + }; + }, + [](const network::vstaging::ValidCandidateHash &val) + -> StatementWithPVD { + return StatementWithPVDValid{ + .candidate_hash = val.hash, + }; + }, + [](const auto &) -> StatementWithPVD { + UNREACHABLE; + }), + .ix = statement.payload.ix, + }, + .signature = statement.signature, + }; + + main_pool_handler_->execute( + [wself{weak_from_this()}, + relay_parent{relay_parent}, + carrying_pvd{std::move(carrying_pvd)}]() { + if (auto self = wself.lock()) { + SL_TRACE(self->logger_, "Handle statement {}", relay_parent); + self->handleStatement(relay_parent, carrying_pvd); + } }); }); @@ -2576,14 +2617,6 @@ namespace kagome::parachain { return; } - const auto &collator_id = - collatorIdFromDescriptor(attesting_data.candidate.descriptor); - if (parachain_state.required_collator - && collator_id != *parachain_state.required_collator) { - parachain_state.issued_statements.insert(candidate_hash); - return; - } - auto session_info = retrieveSessionInfo(relay_parent); if (!session_info) { SL_WARN(logger_, "No session info.(relay_parent={})", relay_parent); @@ -2754,6 +2787,7 @@ namespace kagome::parachain { const primitives::BlockHash &relay_parent, const SignedFullStatementWithPVD &statement) { BOOST_ASSERT(main_pool_handler_->isInCurrentThread()); + SL_TRACE(logger_, "{}", __PRETTY_FUNCTION__); auto opt_parachain_state = tryGetStateByRelayParent(relay_parent); if (!opt_parachain_state) { @@ -2761,8 +2795,11 @@ namespace kagome::parachain { return; } + SL_TRACE(logger_, "=====> 111 2"); + auto ¶chain_state = opt_parachain_state->get(); - auto &assignment = parachain_state.assignment; + const auto &assigned_para = parachain_state.assigned_para; + const auto &assigned_core = parachain_state.assigned_core; auto &fallbacks = parachain_state.fallbacks; auto &awaiting_validation = parachain_state.awaiting_validation; @@ -2777,11 +2814,12 @@ namespace kagome::parachain { post_import_statement_actions(relay_parent, parachain_state, res.value()); if (auto result = res.value()) { - if (result->group_id != assignment) { + if (!assigned_core || result->group_id != *assigned_core) { SL_TRACE( logger_, - "Registered statement from not our group(our: {}, registered: {}).", - assignment, + "Registered statement from not our group(assigned_para our={}, assigned_core our={}, registered={}).", + assigned_para, + assigned_core, result->group_id); return; } @@ -2853,12 +2891,15 @@ namespace kagome::parachain { ParachainProcessorImpl::importStatementToTable( const RelayHash &relay_parent, ParachainProcessorImpl::RelayParentState &relayParentState, + GroupIndex group_id, const primitives::BlockHash &candidate_hash, const network::SignedStatement &statement) { SL_TRACE( logger_, "Import statement into table.(candidate={})", candidate_hash); + SL_TRACE(logger_, "{}", __PRETTY_FUNCTION__); return backing_store_->put( relay_parent, + group_id, relayParentState.table_context.groups, statement, relayParentState.prospective_parachains_mode.has_value()); @@ -3028,6 +3069,7 @@ namespace kagome::parachain { void ParachainProcessorImpl::statementDistributionBackedCandidate( const CandidateHash &candidate_hash) { + SL_TRACE(logger_, "{}", __PRETTY_FUNCTION__); auto confirmed_opt = candidates_.get_confirmed(candidate_hash); if (!confirmed_opt) { SL_TRACE(logger_, @@ -3192,6 +3234,7 @@ namespace kagome::parachain { const CandidateHash &digest, const ParachainProcessorImpl::TableContext &context, uint32_t minimum_backing_votes) { + SL_TRACE(logger_, "{}", __PRETTY_FUNCTION__); if (auto opt_validity_votes = backing_store_->getCadidateInfo(relay_parent, digest)) { auto &data = opt_validity_votes->get(); @@ -3211,6 +3254,7 @@ namespace kagome::parachain { std::optional ParachainProcessorImpl::table_attested_to_backed( AttestedCandidate &&attested, TableContext &table_context) { + SL_TRACE(logger_, "{}", __PRETTY_FUNCTION__); const auto para_id = attested.group_id; if (auto it = table_context.groups.find(para_id); it != table_context.groups.end()) { @@ -3270,6 +3314,7 @@ namespace kagome::parachain { const network::RelayHash &relay_parent, const SignedFullStatementWithPVD &statement, ParachainProcessorImpl::RelayParentState &rp_state) { + SL_TRACE(logger_, "{}", __PRETTY_FUNCTION__); const CandidateHash candidate_hash = candidateHashFrom(parachain::getPayload(statement)); @@ -3330,10 +3375,86 @@ namespace kagome::parachain { }, .signature = statement.signature, }; + + auto core = core_index_from_statement( + rp_state.validator_to_group, + rp_state.group_rotation_info, + rp_state.availability_cores, + statement + ); + if (!core) { + return Error::CORE_INDEX_UNAVAILABLE; + }; + return importStatementToTable( - relay_parent, rp_state, candidate_hash, stmnt); + relay_parent, rp_state, *core, candidate_hash, stmnt); + } + +std::optional ParachainProcessorImpl::core_index_from_statement( + const std::vector> &validator_to_group, + const runtime::GroupDescriptor &group_rotation_info, + const std::vector &cores, + const SignedFullStatementWithPVD &statement +) { + const auto &compact_statement = getPayload(statement); + const auto candidate_hash = candidateHashFrom(compact_statement); + + const auto n_cores = cores.size(); + SL_TRACE(logger_, "Extracting core index from statement. (candidate_hash={}, n_cores={})", candidate_hash, n_cores); + + const auto statement_validator_index = statement.payload.ix; + if (statement_validator_index >= validator_to_group.size()) { + SL_TRACE(logger_, "Invalid validator index. (candidate_hash={}, validator_to_group={}, statement_validator_index={}, n_cores={})", candidate_hash, validator_to_group.size(), statement_validator_index, n_cores); + return std::nullopt; } + const auto group_index = validator_to_group[statement_validator_index]; + if (!group_index) { + SL_TRACE(logger_, "Invalid validator index. Empty group. (candidate_hash={}, statement_validator_index={}, n_cores={})", candidate_hash, statement_validator_index, n_cores); + return std::nullopt; + } + + const auto core_index = group_rotation_info.coreForGroup(*group_index, n_cores); + + if (size_t(core_index) > n_cores) { + SL_WARN(logger_, "Invalid CoreIndex. (candidate_hash={}, core_index={}, validator={}, n_cores={})", candidate_hash, core_index, statement_validator_index, n_cores); + return std::nullopt; + } + + if (auto s = if_type(getPayload(statement))) { + const auto candidate_para_id = s->get().committed_receipt.descriptor.para_id; + std::optional assigned_para_id = + visit_in_place(cores[core_index], + [&](const runtime::OccupiedCore &occupied) -> std::optional { + if (occupied.next_up_on_available) { + return occupied.next_up_on_available->para_id; + } else { + return std::nullopt; + } + }, + [&](const runtime::ScheduledCore &scheduled) -> std::optional { + return scheduled.para_id; + }, + [&](const runtime::FreeCore &) -> std::optional { + SL_TRACE(logger_, "Invalid CoreIndex, core is not assigned to any para_id. (candidate_hash={}, core_index={}, validator={}, n_cores={})", candidate_hash, core_index, statement_validator_index, n_cores); + return std::nullopt; + } + ); + + if (!assigned_para_id) { + return std::nullopt; + } + + if (*assigned_para_id != candidate_para_id) { + SL_TRACE(logger_, "Invalid CoreIndex, core is assigned to a different para_id. (candidate_hash={}, core_index={}, validator={}, n_cores={})", candidate_hash, core_index, statement_validator_index, n_cores); + return std::nullopt; + } + return core_index; + } else { + return core_index; + } +} + void ParachainProcessorImpl::unblockAdvertisements( ParachainProcessorImpl::RelayParentState &rp_state, ParachainId para_id, @@ -3402,6 +3523,7 @@ namespace kagome::parachain { ParachainProcessorImpl::sign_import_and_distribute_statement( ParachainProcessorImpl::RelayParentState &rp_state, const ValidateAndSecondResult &validation_result) { + SL_TRACE(logger_, "{}", __PRETTY_FUNCTION__); if (auto statement = createAndSignStatement(validation_result)) { const SignedFullStatementWithPVD stm = visit_in_place( @@ -3438,6 +3560,7 @@ namespace kagome::parachain { [&](const auto &) -> SignedFullStatementWithPVD { return SignedFullStatementWithPVD{}; }); + SL_TRACE(logger_, "=====> 111 1"); OUTCOME_TRY( summary, @@ -3456,12 +3579,13 @@ namespace kagome::parachain { const RelayHash &relay_parent, ParachainProcessorImpl::RelayParentState &rp_state, std::optional &summary) { + SL_TRACE(logger_, "{}", __PRETTY_FUNCTION__); if (!summary) { return; } SL_TRACE(logger_, - "Import result.(candidate={}, group id={}, validity votes={})", + "Import result.(candidate={}, para id={}, validity votes={})", summary->candidate, summary->group_id, summary->validity_votes); @@ -4008,7 +4132,7 @@ namespace kagome::parachain { const Groups &groups = *per_relay_parent.groups; const std::optional &local_assignment = - per_relay_parent.assignment; + per_relay_parent.assigned_para; const network::ValidatorIndex local_index = *per_relay_parent.our_index; const auto local_group_opt = groups.byValidatorIndex(local_index); if (!opt_session_info) { @@ -4553,7 +4677,7 @@ namespace kagome::parachain { const ProspectiveParachainsModeOpt &relay_parent_mode = per_relay_parent.prospective_parachains_mode; const std::optional &assignment = - per_relay_parent.assignment; + per_relay_parent.assigned_para; auto peer_state = pm_->getPeerState(peer_id); if (!peer_state) { @@ -4568,7 +4692,7 @@ namespace kagome::parachain { } const ParachainId collator_para_id = collator_state->para_id; - if (!assignment) { + if (!assignment || collator_para_id != *assignment) { SL_TRACE(logger_, "Invalid assignment. (peerd_id={}, collator={})", peer_id, diff --git a/core/parachain/validator/parachain_processor.hpp b/core/parachain/validator/parachain_processor.hpp index 22907f0b2a..314c92d087 100644 --- a/core/parachain/validator/parachain_processor.hpp +++ b/core/parachain/validator/parachain_processor.hpp @@ -90,7 +90,8 @@ namespace kagome::parachain { NO_SESSION_INFO, OUT_OF_BOUND, REJECTED_BY_PROSPECTIVE_PARACHAINS, - INCORRECT_BITFIELD_SIZE + INCORRECT_BITFIELD_SIZE, + CORE_INDEX_UNAVAILABLE }; static constexpr uint64_t kBackgroundWorkers = 5; @@ -186,7 +187,7 @@ namespace kagome::parachain { struct TableContext { std::optional validator; - std::unordered_map> groups; + std::unordered_map> groups; std::vector validators; size_t minimum_votes(size_t n_validators) const { @@ -250,11 +251,13 @@ namespace kagome::parachain { struct RelayParentState { ProspectiveParachainsModeOpt prospective_parachains_mode; - std::optional assignment; + std::optional assigned_core; + std::optional assigned_para; + std::vector> validator_to_group; + std::optional seconded; std::optional our_index; std::optional our_group; - std::optional required_collator; Collations collations; TableContext table_context; @@ -505,7 +508,12 @@ namespace kagome::parachain { const network::RelayHash &relay_parent, const SignedFullStatementWithPVD &statement, ParachainProcessorImpl::RelayParentState &relayParentState); - + std::optional core_index_from_statement( + const std::vector> &validator_to_group, + const runtime::GroupDescriptor &group_rotation_info, + const std::vector &cores, + const SignedFullStatementWithPVD &statement +); const network::CandidateDescriptor &candidateDescriptorFrom( const network::CollationFetchingResponse &collation) { return visit_in_place( @@ -687,6 +695,7 @@ namespace kagome::parachain { std::optional importStatementToTable( const RelayHash &relay_parent, ParachainProcessorImpl::RelayParentState &relayParentState, + GroupIndex group_id, const primitives::BlockHash &candidate_hash, const network::SignedStatement &statement); diff --git a/core/runtime/runtime_api/parachain_host_types.hpp b/core/runtime/runtime_api/parachain_host_types.hpp index db72e6e856..01c147ec1f 100644 --- a/core/runtime/runtime_api/parachain_host_types.hpp +++ b/core/runtime/runtime_api/parachain_host_types.hpp @@ -109,6 +109,32 @@ namespace kagome::runtime { return GroupIndex((size_t(core_index) + size_t(rotations)) % cores_normalized); } + + /// Returns the index of the group assigned to the given core. This does no checking or + /// whether the group index is in-bounds. + /// + /// `core_index` should be less than `cores`, which is capped at `u32::max()`. + CoreIndex coreForGroup(GroupIndex group_index, size_t cores_) const { + if (group_rotation_frequency == 0) { + return group_index; + } + if (cores_ == 0) { + return 0; + } + + const auto cores = std::min(cores_, size_t(std::numeric_limits::max())); + const auto blocks_since_start = math::sat_sub_unsigned(now_block_num, session_start_block); + const auto r = blocks_since_start / group_rotation_frequency; + const auto rotations = r % uint32_t(cores); + + // g = c + r mod cores + // c = g - r mod cores + // x = x + cores mod cores + // c = (g + cores) - r mod cores + + const auto idx = (size_t(group_index) + cores - size_t(rotations)) % cores; + return CoreIndex(idx); + } }; struct ValidatorGroup {