Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix/kusama grid size fixes #2180

Open
wants to merge 29 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
4db536c
fix recovery (validators <= discovery_keys)
turuslan Aug 6, 2024
cc3c7b5
Merge remote-tracking branch 'origin/fix/recovery'
iceseer Aug 7, 2024
8e599f2
backing init fixup
iceseer Aug 8, 2024
d85691f
kusama fixup
iceseer Aug 8, 2024
4b66108
update our view in topology
iceseer Aug 9, 2024
d738478
remove logs
iceseer Aug 9, 2024
d3075ec
remove logs
iceseer Aug 9, 2024
9ddbd74
kusama fixup
iceseer Aug 10, 2024
294e62a
kusama fixup
iceseer Aug 10, 2024
d24595b
not to duplicate sends
iceseer Aug 11, 2024
6ba5f3b
test fixup!
iceseer Aug 13, 2024
4409145
grid view from authorities
iceseer Aug 13, 2024
e636577
audi
iceseer Aug 13, 2024
312db03
audi
iceseer Aug 13, 2024
7063722
audi errors
iceseer Aug 14, 2024
59b80b6
force authority discovery for the given peer set
iceseer Aug 15, 2024
4e466f1
logs
iceseer Aug 15, 2024
8a81d70
fixup!
iceseer Aug 15, 2024
82b3ac6
remove logs
iceseer Sep 17, 2024
0361c16
Merge remote-tracking branch 'origin/1691-consider-pvf-executor-param…
iceseer Sep 17, 2024
69ee666
revert
iceseer Sep 17, 2024
3c2a372
revert
iceseer Sep 17, 2024
abf8c4f
Merge branch 'master' into fix/kusama_backed_candidates
iceseer Sep 17, 2024
5f990b7
fixup
iceseer Sep 17, 2024
1868eef
fixup fork wakeups
iceseer Sep 17, 2024
2a39991
Merge branch 'master' into fix/kusama_backed_candidates
iceseer Sep 17, 2024
399eca8
Merge branch 'master' into fix/kusama_backed_candidates
iceseer Sep 23, 2024
0c479bd
unify_with_peer
iceseer Sep 23, 2024
afd6364
PATCH FOR APPROVAL
iceseer Sep 20, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion core/authority_discovery/query/query_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,9 @@ namespace kagome::authority_discovery {
}
auto r = add(*id, value);
if (not r) {
SL_DEBUG(log_, "Can't add: {}", r.error());
SL_WARN(log_, "Can't add: {}", r.error());
} else {
SL_INFO(log_, "Added or updated success: {}", *id);
}
return r;
}
Expand Down
3 changes: 3 additions & 0 deletions core/network/impl/protocols/protocol_fetch_available_data.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ namespace kagome::network {
private:
std::optional<outcome::result<ResponseType>> onRxRequest(
RequestType candidate_hash, std::shared_ptr<Stream>) override {
base().logger()->trace("Fetch available data .(candidate hash={})",
candidate_hash);

if (auto r = av_store_->getPovAndData(candidate_hash)) {
return std::move(*r);
}
Expand Down
3 changes: 3 additions & 0 deletions core/parachain/approval/approval.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,9 @@ namespace kagome::parachain::approval {
/// Returns true when the state transition was produced by this node's own
/// (local) approval vote, i.e. the variant currently holds a LocalApproval.
inline bool is_local_approval(const ApprovalStateTransition &val) {
return boost::get<LocalApproval>(&val) != nullptr;
}
/// Returns true when the state transition was caused by an approval received
/// from a peer, i.e. the variant currently holds a RemoteApproval.
inline bool is_remote_approval(const ApprovalStateTransition &val) {
return boost::get<RemoteApproval>(&val) != nullptr;
}

/// Metadata about a block which is now live in the approval protocol.
struct BlockApprovalMeta {
Expand Down
152 changes: 124 additions & 28 deletions core/parachain/approval/approval_distribution.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -578,7 +578,7 @@ namespace kagome::parachain {
common::MainThreadPool &main_thread_pool,
LazySPtr<dispute::DisputeCoordinator> dispute_coordinator)
: approval_thread_handler_{poolHandlerReadyMake(
this, app_state_manager, approval_thread_pool, logger_)},
this, app_state_manager, approval_thread_pool, logger_)},
worker_pool_handler_{worker_thread_pool.handler(*app_state_manager)},
parachain_host_(std::move(parachain_host)),
slots_util_(slots_util),
Expand Down Expand Up @@ -1469,25 +1469,55 @@ namespace kagome::parachain {
blocks_by_number_[meta.number].insert(meta.hash);
}

std::unordered_set<libp2p::peer::PeerId> active_peers;
pm_->enumeratePeerState(
[&](const libp2p::peer::PeerId &peer, network::PeerState &_) {
active_peers.insert(peer);
return true;
});

network::View our_current_view{
.heads_ = block_tree_->getLeaves(),
.finalized_number_ = block_tree_->getLastFinalized().number,
};

approval_thread_handler_->execute([wself{weak_from_this()},
our_current_view{
std::move(our_current_view)},
active_peers{std::move(active_peers)},
new_hash,
finalized_block_number,
meta{std::move(meta)}]() {
if (auto self = wself.lock()) {
SL_TRACE(self->logger_, "Got new block.(hash={})", new_hash);
// std::unordered_map<libp2p::peer::PeerId, network::View>
// peer_views(std::move(self->peer_views_));
for (auto it = self->peer_views_.begin();
it != self->peer_views_.end();) {
if (active_peers.contains(it->first)) {
++it;
} else {
it = self->peer_views_.erase(it);
}
}

for (const auto &p : active_peers) {
std::ignore = self->peer_views_[p];
}

for (const auto &[peed_id, view] : self->peer_views_) {
for (const auto &h : view.heads_) {
if (h == meta.hash) {
self->unify_with_peer(
self->storedDistribBlockEntries(),
peed_id,
network::View{
.heads_ = {h},
.finalized_number_ = finalized_block_number,
},
false);
}
network::View view_intersection{
.heads_ = {},
.finalized_number_ = view.finalized_number_,
};

if (new_hash && view.contains(*new_hash)) {
view_intersection.heads_.emplace_back(*new_hash);
}

self->unify_with_peer(self->storedDistribBlockEntries(),
peed_id,
view_intersection,
false);
}

for (auto it = self->pending_known_.begin();
Expand Down Expand Up @@ -2134,8 +2164,10 @@ namespace kagome::parachain {
ApprovalRouting{
.required_routing =
grid::RequiredRouting{
.value = grid::RequiredRouting::
GridXY}, /// TODO(iceseer): calculate based on grid
.value =
grid::RequiredRouting::All}, /// TODO(iceseer):
/// calculate
/// based on grid
.local = local,
.random_routing = {},
.peers_randomly_routed = {},
Expand Down Expand Up @@ -2547,17 +2579,36 @@ namespace kagome::parachain {
auto se = pm_->getStreamEngine();
BOOST_ASSERT(se);

se->broadcast(
router_->getValidationProtocolVStaging(),
std::make_shared<
auto msg = std::make_shared<
network::WireMessage<network::vstaging::ValidatorProtocolMessage>>(
network::vstaging::ApprovalDistributionMessage{
network::vstaging::Assignments{
.assignments = {network::vstaging::Assignment{
.indirect_assignment_cert = indirect_cert,
.candidate_bitfield = candidate_indices,
}}}}),
[&](const libp2p::peer::PeerId &p) { return peers.contains(p); });
}}}});

for (const auto &peer : peers) {
parachain_processor_->tryOpenOutgoingValidationStream(
peer,
network::CollationVersion::VStaging,
[WEAK_SELF, peer{peer}, se, msg](auto &&stream) {
WEAK_LOCK(self);
se->send(peer, self->router_->getValidationProtocolVStaging(), msg);
});
}

// se->broadcast(
// router_->getValidationProtocolVStaging(),
// std::make_shared<
// network::WireMessage<network::vstaging::ValidatorProtocolMessage>>(
// network::vstaging::ApprovalDistributionMessage{
// network::vstaging::Assignments{
// .assignments = {network::vstaging::Assignment{
// .indirect_assignment_cert = indirect_cert,
// .candidate_bitfield = candidate_indices,
// }}}}),
// [&](const libp2p::peer::PeerId &p) { return peers.count(p) != 0ull; });
}

void ApprovalDistribution::send_assignments_batched(
Expand Down Expand Up @@ -2668,15 +2719,22 @@ namespace kagome::parachain {
auto se = pm_->getStreamEngine();
BOOST_ASSERT(se);

se->broadcast(
router_->getValidationProtocolVStaging(),
std::make_shared<
auto msg = std::make_shared<
network::WireMessage<network::vstaging::ValidatorProtocolMessage>>(
network::vstaging::ApprovalDistributionMessage{
network::vstaging::Approvals{
.approvals = {vote},
}}),
[&](const libp2p::peer::PeerId &p) { return peers.contains(p); });
}});

for (const auto &peer : peers) {
parachain_processor_->tryOpenOutgoingValidationStream(
peer,
network::CollationVersion::VStaging,
[WEAK_SELF, peer{peer}, se, msg](auto &&stream) {
WEAK_LOCK(self);
se->send(peer, self->router_->getValidationProtocolVStaging(), msg);
});
}
}

void ApprovalDistribution::issue_approval(const CandidateHash &candidate_hash,
Expand Down Expand Up @@ -2887,9 +2945,10 @@ namespace kagome::parachain {
};
return approval::min_or_some(
e.next_no_show,
(e.last_assignment_tick ? filter(
*e.last_assignment_tick + kApprovalDelay, tick_now)
: std::optional<Tick>{}));
(e.last_assignment_tick
? filter(*e.last_assignment_tick + kApprovalDelay,
tick_now)
: std::optional<Tick>{}));
},
[&](const approval::PendingRequiredTranche &e) {
std::optional<DelayTranche> next_announced{};
Expand Down Expand Up @@ -3036,6 +3095,37 @@ namespace kagome::parachain {
status.block_tick,
tick_now,
status.required_tranches);

if (is_approved && is_remote_approval(transition)) {
for (const auto &[fork_block_hash, fork_approval_entry] :
candidate_entry.block_assignments) {
if (fork_block_hash == block_hash) {
continue;
}

bool assigned_on_fork_block = false;
if (validator_index) {
assigned_on_fork_block =
fork_approval_entry.is_assigned(*validator_index);
}

if (!wakeup_for(fork_block_hash, candidate_hash)
&& !fork_approval_entry.approved && assigned_on_fork_block) {
auto opt_fork_block_entry = storedBlockEntries().get(fork_block_hash);
if (!opt_fork_block_entry) {
SL_TRACE(logger_,
"Failed to load block entry. (fork_block_hash={})",
fork_block_hash);
} else {
runScheduleWakeup(fork_block_hash,
opt_fork_block_entry->get().block_number,
candidate_hash,
tick_now + 1);
}
}
}
}

if (approval::is_local_approval(transition) || newly_approved
|| (already_approved_by && !*already_approved_by)) {
BOOST_ASSERT(storedCandidateEntries().get(candidate_hash)->get()
Expand Down Expand Up @@ -3108,6 +3198,12 @@ namespace kagome::parachain {
target_block[candidate_hash].emplace_back(tick, std::move(handle));
}

/// Check whether a tranche wakeup is already registered for the given
/// (block, candidate) pair in the active tranches map.
bool ApprovalDistribution::wakeup_for(const primitives::BlockHash &block_hash,
                                      const CandidateHash &candidate_hash) {
  if (const auto entry = active_tranches_.find(block_hash);
      entry != active_tranches_.end()) {
    return entry->second.contains(candidate_hash);
  }
  return false;
}

void ApprovalDistribution::handleTranche(
const primitives::BlockHash &block_hash,
primitives::BlockNumber block_number,
Expand Down
10 changes: 7 additions & 3 deletions core/parachain/approval/approval_distribution.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ namespace kagome::parachain {
}

/// Whether a validator is already assigned.
bool is_assigned(ValidatorIndex validator_index) {
bool is_assigned(ValidatorIndex validator_index) const {
if (validator_index < assignments.bits.size()) {
return assignments.bits[validator_index];
}
Expand Down Expand Up @@ -233,8 +233,9 @@ namespace kagome::parachain {
CandidateEntry(const network::CandidateReceipt &receipt,
SessionIndex session_index,
size_t approvals_size)
: CandidateEntry(
HashedCandidateReceipt{receipt}, session_index, approvals_size) {}
: CandidateEntry(HashedCandidateReceipt{receipt},
session_index,
approvals_size) {}

std::optional<std::reference_wrapper<ApprovalEntry>> approval_entry(
const network::RelayHash &relay_hash) {
Expand Down Expand Up @@ -792,6 +793,9 @@ namespace kagome::parachain {
void scheduleTranche(const primitives::BlockHash &head,
BlockImportedCandidates &&candidate);

bool wakeup_for(const primitives::BlockHash &block_hash,
const CandidateHash &candidate_hash);

void runDistributeAssignment(
const approval::IndirectAssignmentCertV2 &indirect_cert,
const scale::BitVec &candidate_indices,
Expand Down
14 changes: 0 additions & 14 deletions core/parachain/availability/bitfield/signer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,20 +56,6 @@ namespace kagome::parachain {
for (auto &candidate : candidates) {
bitfield.bits.push_back(
candidate && store_->hasChunk(*candidate, signer.validatorIndex()));
if (candidate) {
SL_TRACE(logger_,
"Signing bitfields.(relay_parent={}, validator index={}, has "
"chunk={})",
relay_parent,
signer.validatorIndex(),
bitfield.bits.back() ? 1 : 0);
} else {
SL_TRACE(logger_,
"Signing bitfields.(relay_parent={}, validator index={}, NOT "
"OCCUPIED)",
relay_parent,
signer.validatorIndex());
}
}

OUTCOME_TRY(signed_bitfield, signer.sign(bitfield));
Expand Down
5 changes: 5 additions & 0 deletions core/parachain/availability/recovery/recovery_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -791,6 +791,11 @@ namespace kagome::parachain {
OUTCOME_TRY(chunks, toChunks(active.chunks_total, data));
auto root = makeTrieProof(chunks);
if (root != active.erasure_encoding_root) {
SL_TRACE(logger_,
"Trie root mismatch. (root={}, ref root={}, n_validators={})",
root,
active.erasure_encoding_root,
active.validators.size());
return ErasureCodingRootError::MISMATCH;
}
return outcome::success();
Expand Down
12 changes: 6 additions & 6 deletions core/parachain/backing/grid.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,11 @@ namespace kagome::parachain::grid {

/// View for one group
struct View {
std::unordered_set<ValidatorIndex> receiving, sending;
std::unordered_set<ValidatorIndex> sending, receiving;

bool operator==(const View &r) const {
return sending == r.sending && receiving == r.receiving;
}

bool canReceive(bool full, ValidatorIndex from) const {
return (full ? receiving : sending).contains(from);
Expand Down Expand Up @@ -237,12 +241,8 @@ namespace kagome::parachain::grid {
}

inline std::vector<ValidatorIndex> shuffle(
const std::vector<std::vector<ValidatorIndex>> &groups,
size_t n,
std::span<const uint8_t, 32> babe_randomness) {
size_t n = 0;
for (auto &group : groups) {
n += group.size();
}
std::vector<ValidatorIndex> validators;
validators.resize(n);
std::iota(validators.begin(), validators.end(), 0);
Expand Down
Loading
Loading