Skip to content

Commit

Permalink
request response timeout and metric (#2373)
Browse files Browse the repository at this point in the history
Signed-off-by: turuslan <turuslan.devbox@gmail.com>
Co-authored-by: Igor Egorov <igor@qdrvm.io>
  • Loading branch information
turuslan and igor-egorov authored Feb 13, 2025
1 parent 6df722c commit 337f063
Show file tree
Hide file tree
Showing 22 changed files with 248 additions and 147 deletions.
4 changes: 2 additions & 2 deletions cmake/Hunter/hunter-gate-url.cmake
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
HunterGate(
URL https://github.com/qdrvm/hunter/archive/refs/tags/v0.25.3-qdrvm30.tar.gz
SHA1 37fc6bf5938db78086b86cf32d58f433888fb5a8
URL https://github.com/qdrvm/hunter/archive/refs/tags/v0.25.3-qdrvm31.zip
SHA1 f2d79db34e0adfb09acc010afbb7ca7aeefdbc01
LOCAL
)
2 changes: 1 addition & 1 deletion core/authority_discovery/query/query_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -198,8 +198,8 @@ namespace kagome::authority_discovery {
if (has(authorities, it->second)) {
++it;
} else {
it = peer_to_auth_cache_.erase(it);
validation_protocol_.get()->reserve(it->first, false);
it = peer_to_auth_cache_.erase(it);
}
}
std::shuffle(authorities.begin(), authorities.end(), random_);
Expand Down
16 changes: 10 additions & 6 deletions core/network/impl/protocols/beefy_justification_protocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,26 @@
#include "network/peer_manager.hpp"

namespace kagome::network {
constexpr std::chrono::seconds kRequestTimeout{3};

BeefyJustificationProtocol::BeefyJustificationProtocol(
libp2p::Host &host,
const RequestResponseInject &inject,
const blockchain::GenesisBlockHash &genesis,
common::MainThreadPool &main_thread_pool,
std::shared_ptr<PeerManager> peer_manager,
std::shared_ptr<Beefy> beefy)
: RequestResponseProtocolImpl{kName,
host,
inject,
make_protocols(kBeefyJustificationProtocol,
genesis),
log::createLogger(kName),
main_thread_pool},
main_pool_handler_{main_thread_pool.handlerStarted()},
kRequestTimeout},
main_pool_handler_{inject.main_thread_pool->handlerStarted()},
peer_manager_{std::move(peer_manager)},
beefy_{std::move(beefy)} {}
beefy_{std::move(beefy)} {
BOOST_ASSERT(main_pool_handler_ != nullptr);
BOOST_ASSERT(peer_manager_ != nullptr);
BOOST_ASSERT(beefy_ != nullptr);
}

std::optional<outcome::result<BeefyJustificationProtocol::ResponseType>>
BeefyJustificationProtocol::onRxRequest(RequestType block,
Expand Down
3 changes: 1 addition & 2 deletions core/network/impl/protocols/beefy_justification_protocol.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,8 @@ namespace kagome::network {
static constexpr auto kName = "BeefyJustificationProtocol";

public:
BeefyJustificationProtocol(libp2p::Host &host,
BeefyJustificationProtocol(const RequestResponseInject &inject,
const blockchain::GenesisBlockHash &genesis,
common::MainThreadPool &main_thread_pool,
std::shared_ptr<PeerManager> peer_manager,
std::shared_ptr<Beefy> beefy);

Expand Down
11 changes: 6 additions & 5 deletions core/network/impl/protocols/fetch_attested_candidate.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,28 +29,29 @@ namespace kagome::network {
ScaleMessageReadWriter>,
NonCopyable,
NonMovable {
static constexpr std::chrono::milliseconds kRequestTimeout{2500};

public:
FetchAttestedCandidateProtocol(
libp2p::Host &host,
RequestResponseInject inject,
const application::ChainSpec &chain_spec,
const blockchain::GenesisBlockHash &genesis_hash,
std::shared_ptr<
parachain::statement_distribution::StatementDistribution>
statement_distribution,
common::MainThreadPool &main_thread_pool)
statement_distribution)
: RequestResponseProtocolImpl<
vstaging::AttestedCandidateRequest,
vstaging::AttestedCandidateResponse,
ScaleMessageReadWriter>{kFetchAttestedCandidateProtocolName,
host,
std::move(inject),
make_protocols(
kFetchAttestedCandidateProtocol,
genesis_hash,
kProtocolPrefixPolkadot),
log::createLogger(
kFetchAttestedCandidateProtocolName,
"req_attested_candidate_protocol"),
main_thread_pool},
kRequestTimeout},
statement_distribution_(std::move(statement_distribution)) {
BOOST_ASSERT(statement_distribution_);
}
Expand Down
18 changes: 12 additions & 6 deletions core/network/impl/protocols/light.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,25 +14,31 @@
#include "storage/trie/on_read.hpp"

namespace kagome::network {
constexpr std::chrono::seconds kRequestTimeout{15};

LightProtocol::LightProtocol(
libp2p::Host &host,
RequestResponseInject inject,
const application::ChainSpec &chain_spec,
const blockchain::GenesisBlockHash &genesis,
std::shared_ptr<blockchain::BlockHeaderRepository> repository,
std::shared_ptr<storage::trie::TrieStorage> storage,
std::shared_ptr<runtime::ModuleRepository> module_repo,
std::shared_ptr<runtime::Executor> executor,
common::MainThreadPool &main_thread_pool)
std::shared_ptr<runtime::Executor> executor)
: RequestResponseProtocolImpl{kName,
host,
std::move(inject),
make_protocols(
kLightProtocol, genesis, chain_spec),
log::createLogger(kName),
main_thread_pool},
kRequestTimeout},
repository_{std::move(repository)},
storage_{std::move(storage)},
module_repo_{std::move(module_repo)},
executor_{std::move(executor)} {}
executor_{std::move(executor)} {
BOOST_ASSERT(repository_ != nullptr);
BOOST_ASSERT(storage_ != nullptr);
BOOST_ASSERT(module_repo_ != nullptr);
BOOST_ASSERT(executor_ != nullptr);
}

std::optional<outcome::result<LightProtocol::ResponseType>>
LightProtocol::onRxRequest(RequestType req, std::shared_ptr<Stream>) {
Expand Down
5 changes: 2 additions & 3 deletions core/network/impl/protocols/light.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,13 @@ namespace kagome::network {
static constexpr auto kName = "LightProtocol";

public:
LightProtocol(libp2p::Host &host,
LightProtocol(RequestResponseInject inject,
const application::ChainSpec &chain_spec,
const blockchain::GenesisBlockHash &genesis,
std::shared_ptr<blockchain::BlockHeaderRepository> repository,
std::shared_ptr<storage::trie::TrieStorage> storage,
std::shared_ptr<runtime::ModuleRepository> module_repo,
std::shared_ptr<runtime::Executor> executor,
common::MainThreadPool &main_thread_pool);
std::shared_ptr<runtime::Executor> executor);

std::optional<outcome::result<ResponseType>> onRxRequest(
RequestType req, std::shared_ptr<Stream>) override;
Expand Down
20 changes: 10 additions & 10 deletions core/network/impl/protocols/protocol_fetch_available_data.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,25 +28,25 @@ namespace kagome::network {
ScaleMessageReadWriter> {
public:
static constexpr const char *kName = "FetchAvailableDataProtocol";
static constexpr std::chrono::seconds kRequestTimeout{2};

FetchAvailableDataProtocolImpl(
libp2p::Host &host,
RequestResponseInject inject,
const application::ChainSpec &chain_spec,
const blockchain::GenesisBlockHash &genesis_hash,
std::shared_ptr<parachain::AvailabilityStore> av_store,
common::MainThreadPool &main_thread_pool)
std::shared_ptr<parachain::AvailabilityStore> av_store)
: RequestResponseProtocolImpl<
FetchAvailableDataRequest,
FetchAvailableDataResponse,
ScaleMessageReadWriter>{kName,
host,
std::move(inject),
make_protocols(
kFetchAvailableDataProtocol,
genesis_hash,
kProtocolPrefixPolkadot),
log::createLogger(
kName, "req_available_data_protocol"),
main_thread_pool},
kRequestTimeout},
av_store_{std::move(av_store)} {}

private:
Expand All @@ -73,24 +73,24 @@ namespace kagome::network {
ScaleMessageReadWriter> {
public:
static constexpr const char *kName = "FetchStatementProtocol";
static constexpr std::chrono::seconds kRequestTimeout{1};

StatementFetchingProtocol(
libp2p::Host &host,
RequestResponseInject inject,
const application::ChainSpec &chain_spec,
const blockchain::GenesisBlockHash &genesis_hash,
std::shared_ptr<parachain::BackingStore> backing_store,
common::MainThreadPool &main_thread_pool)
std::shared_ptr<parachain::BackingStore> backing_store)
: RequestResponseProtocolImpl<
FetchStatementRequest,
FetchStatementResponse,
ScaleMessageReadWriter>{kName,
host,
std::move(inject),
make_protocols(kFetchStatementProtocol,
genesis_hash,
kProtocolPrefixPolkadot),
log::createLogger(
kName, "req_statement_protocol"),
main_thread_pool},
kRequestTimeout},
backing_store_{std::move(backing_store)} {}

private:
Expand Down
11 changes: 6 additions & 5 deletions core/network/impl/protocols/protocol_fetch_chunk.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,25 +37,26 @@ namespace kagome::network {
ScaleMessageReadWriter>,
NonCopyable,
NonMovable {
static constexpr std::chrono::seconds kRequestTimeout{1};

public:
FetchChunkProtocolImpl(
libp2p::Host &host,
RequestResponseInject inject,
const application::ChainSpec & /*chain_spec*/,
const blockchain::GenesisBlockHash &genesis_hash,
std::shared_ptr<parachain::ParachainProcessorImpl> pp,
std::shared_ptr<PeerManager> pm,
common::MainThreadPool &main_thread_pool)
std::shared_ptr<PeerManager> pm)
: RequestResponseProtocolImpl<
FetchChunkRequest,
FetchChunkResponse,
ScaleMessageReadWriter>{kFetchChunkProtocolName,
host,
std::move(inject),
make_protocols(kFetchChunkProtocol,
genesis_hash,
kProtocolPrefixPolkadot),
log::createLogger(kFetchChunkProtocolName,
"req_chunk_protocol"),
main_thread_pool},
kRequestTimeout},
pp_{std::move(pp)},
pm_{std::move(pm)} {
BOOST_ASSERT(pp_);
Expand Down
9 changes: 6 additions & 3 deletions core/network/impl/protocols/protocol_fetch_chunk_obsolete.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include "blockchain/genesis_block_hash.hpp"
#include "log/logger.hpp"
#include "network/common.hpp"
#include "network/helpers/scale_message_read_writer.hpp"
#include "network/impl/protocols/request_response_protocol.hpp"
#include "parachain/validator/parachain_processor.hpp"
#include "utils/non_copyable.hpp"
Expand All @@ -37,9 +38,11 @@ namespace kagome::network {
ScaleMessageReadWriter>,
NonCopyable,
NonMovable {
static constexpr std::chrono::seconds kRequestTimeout{1};

public:
FetchChunkProtocolObsoleteImpl(
libp2p::Host &host,
const RequestResponseInject &inject,
const application::ChainSpec & /*chain_spec*/,
const blockchain::GenesisBlockHash &genesis_hash,
std::shared_ptr<parachain::ParachainProcessorImpl> pp,
Expand All @@ -48,14 +51,14 @@ namespace kagome::network {
FetchChunkRequest,
FetchChunkResponseObsolete,
ScaleMessageReadWriter>{kFetchChunkProtocolName,
host,
inject,
make_protocols(
kFetchChunkProtocolObsolete,
genesis_hash,
kProtocolPrefixPolkadot),
log::createLogger(kFetchChunkProtocolName,
"req_chunk_protocol"),
main_thread_pool},
kRequestTimeout},
pp_{std::move(pp)} {
BOOST_ASSERT(pp_);
}
Expand Down
28 changes: 11 additions & 17 deletions core/network/impl/protocols/protocol_req_collation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,19 @@ namespace kagome::network {
std::decay_t<ResponseT>,
ScaleMessageReadWriter>;

ReqCollationProtocolImpl(libp2p::Host &host,
static constexpr std::chrono::seconds kRequestTimeout{2};

ReqCollationProtocolImpl(RequestResponseInject inject,
const libp2p::peer::ProtocolName &protoname,
const application::ChainSpec &chain_spec,
const blockchain::GenesisBlockHash &genesis_hash,
std::shared_ptr<ReqCollationObserver> observer,
common::MainThreadPool &main_thread_pool)
std::shared_ptr<ReqCollationObserver> observer)
: Base{kReqCollationProtocolName,
host,
std::move(inject),
make_protocols(protoname, genesis_hash, kProtocolPrefixPolkadot),
log::createLogger(kReqCollationProtocolName,
"req_collation_protocol"),
main_thread_pool},
kRequestTimeout},
observer_{std::move(observer)} {}

protected:
Expand All @@ -59,29 +60,22 @@ namespace kagome::network {
};

ReqCollationProtocol::ReqCollationProtocol(
libp2p::Host &host,
const RequestResponseInject &inject,
const application::ChainSpec &chain_spec,
const blockchain::GenesisBlockHash &genesis_hash,
std::shared_ptr<ReqCollationObserver> observer,
common::MainThreadPool &main_thread_pool)
std::shared_ptr<ReqCollationObserver> observer)
: v1_impl_{std::make_shared<
ReqCollationProtocolImpl<CollationFetchingRequest,
CollationFetchingResponse>>(
host,
kReqCollationProtocol,
chain_spec,
genesis_hash,
observer,
main_thread_pool)},
inject, kReqCollationProtocol, chain_spec, genesis_hash, observer)},
vstaging_impl_{std::make_shared<
ReqCollationProtocolImpl<vstaging::CollationFetchingRequest,
vstaging::CollationFetchingResponse>>(
host,
inject,
kReqCollationVStagingProtocol,
chain_spec,
genesis_hash,
observer,
main_thread_pool)} {
observer)} {
BOOST_ASSERT(v1_impl_);
BOOST_ASSERT(vstaging_impl_);
}
Expand Down
6 changes: 3 additions & 3 deletions core/network/impl/protocols/protocol_req_collation.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include "application/chain_spec.hpp"
#include "common/main_thread_pool.hpp"
#include "log/logger.hpp"
#include "network/impl/protocols/request_response_protocol.hpp"
#include "network/peer_manager.hpp"
#include "network/protocols/req_collation_protocol.hpp"
#include "network/types/collator_messages_vstaging.hpp"
Expand All @@ -39,11 +40,10 @@ namespace kagome::network {
ReqCollationProtocol() = delete;
~ReqCollationProtocol() override = default;

ReqCollationProtocol(libp2p::Host &host,
ReqCollationProtocol(const RequestResponseInject &inject,
const application::ChainSpec &chain_spec,
const blockchain::GenesisBlockHash &genesis_hash,
std::shared_ptr<ReqCollationObserver> observer,
common::MainThreadPool &main_thread_pool);
std::shared_ptr<ReqCollationObserver> observer);

const Protocol &protocolName() const override;

Expand Down
Loading

0 comments on commit 337f063

Please sign in to comment.