diff --git a/cmake/Hunter/hunter-gate-url.cmake b/cmake/Hunter/hunter-gate-url.cmake index 8e014ec2aa..f108408443 100644 --- a/cmake/Hunter/hunter-gate-url.cmake +++ b/cmake/Hunter/hunter-gate-url.cmake @@ -1,5 +1,5 @@ HunterGate( - URL https://github.com/qdrvm/hunter/archive/refs/tags/v0.25.3-qdrvm30.tar.gz - SHA1 37fc6bf5938db78086b86cf32d58f433888fb5a8 + URL https://github.com/qdrvm/hunter/archive/refs/tags/v0.25.3-qdrvm31.zip + SHA1 f2d79db34e0adfb09acc010afbb7ca7aeefdbc01 LOCAL ) diff --git a/core/authority_discovery/query/query_impl.cpp b/core/authority_discovery/query/query_impl.cpp index 6e0bae4f58..ef2c347066 100644 --- a/core/authority_discovery/query/query_impl.cpp +++ b/core/authority_discovery/query/query_impl.cpp @@ -198,8 +198,8 @@ namespace kagome::authority_discovery { if (has(authorities, it->second)) { ++it; } else { - it = peer_to_auth_cache_.erase(it); validation_protocol_.get()->reserve(it->first, false); + it = peer_to_auth_cache_.erase(it); } } std::shuffle(authorities.begin(), authorities.end(), random_); diff --git a/core/network/impl/protocols/beefy_justification_protocol.cpp b/core/network/impl/protocols/beefy_justification_protocol.cpp index ddab3bf822..699fab634a 100644 --- a/core/network/impl/protocols/beefy_justification_protocol.cpp +++ b/core/network/impl/protocols/beefy_justification_protocol.cpp @@ -15,22 +15,26 @@ #include "network/peer_manager.hpp" namespace kagome::network { + constexpr std::chrono::seconds kRequestTimeout{3}; BeefyJustificationProtocol::BeefyJustificationProtocol( - libp2p::Host &host, + const RequestResponseInject &inject, const blockchain::GenesisBlockHash &genesis, - common::MainThreadPool &main_thread_pool, std::shared_ptr peer_manager, std::shared_ptr beefy) : RequestResponseProtocolImpl{kName, - host, + inject, make_protocols(kBeefyJustificationProtocol, genesis), log::createLogger(kName), - main_thread_pool}, - main_pool_handler_{main_thread_pool.handlerStarted()}, + kRequestTimeout}, + main_pool_handler_{inject.main_thread_pool->handlerStarted()}, peer_manager_{std::move(peer_manager)}, - beefy_{std::move(beefy)} {} + beefy_{std::move(beefy)} { + BOOST_ASSERT(main_pool_handler_ != nullptr); + BOOST_ASSERT(peer_manager_ != nullptr); + BOOST_ASSERT(beefy_ != nullptr); + } std::optional> BeefyJustificationProtocol::onRxRequest(RequestType block, diff --git a/core/network/impl/protocols/beefy_justification_protocol.hpp b/core/network/impl/protocols/beefy_justification_protocol.hpp index e75b9d54ba..e4b70c266c 100644 --- a/core/network/impl/protocols/beefy_justification_protocol.hpp +++ b/core/network/impl/protocols/beefy_justification_protocol.hpp @@ -39,9 +39,8 @@ namespace kagome::network { static constexpr auto kName = "BeefyJustificationProtocol"; public: - BeefyJustificationProtocol(libp2p::Host &host, + BeefyJustificationProtocol(const RequestResponseInject &inject, const blockchain::GenesisBlockHash &genesis, - common::MainThreadPool &main_thread_pool, std::shared_ptr peer_manager, std::shared_ptr beefy); diff --git a/core/network/impl/protocols/fetch_attested_candidate.hpp b/core/network/impl/protocols/fetch_attested_candidate.hpp index 2c23e3b537..03416cb749 100644 --- a/core/network/impl/protocols/fetch_attested_candidate.hpp +++ b/core/network/impl/protocols/fetch_attested_candidate.hpp @@ -29,20 +29,21 @@ namespace kagome::network { ScaleMessageReadWriter>, NonCopyable, NonMovable { + static constexpr std::chrono::milliseconds kRequestTimeout{2500}; + public: FetchAttestedCandidateProtocol( - libp2p::Host &host, + RequestResponseInject inject, const application::ChainSpec &chain_spec, const blockchain::GenesisBlockHash &genesis_hash, std::shared_ptr< parachain::statement_distribution::StatementDistribution> - statement_distribution, - common::MainThreadPool &main_thread_pool) + statement_distribution) : RequestResponseProtocolImpl< vstaging::AttestedCandidateRequest, vstaging::AttestedCandidateResponse, ScaleMessageReadWriter>{kFetchAttestedCandidateProtocolName, - host, + std::move(inject), make_protocols( kFetchAttestedCandidateProtocol, genesis_hash, @@ -50,7 +51,7 @@ namespace kagome::network { log::createLogger( kFetchAttestedCandidateProtocolName, "req_attested_candidate_protocol"), - main_thread_pool}, + kRequestTimeout}, statement_distribution_(std::move(statement_distribution)) { BOOST_ASSERT(statement_distribution_); } diff --git a/core/network/impl/protocols/light.cpp b/core/network/impl/protocols/light.cpp index ce55d355ad..60d8644a9c 100644 --- a/core/network/impl/protocols/light.cpp +++ b/core/network/impl/protocols/light.cpp @@ -14,25 +14,31 @@ #include "storage/trie/on_read.hpp" namespace kagome::network { + constexpr std::chrono::seconds kRequestTimeout{15}; + LightProtocol::LightProtocol( - libp2p::Host &host, + RequestResponseInject inject, const application::ChainSpec &chain_spec, const blockchain::GenesisBlockHash &genesis, std::shared_ptr repository, std::shared_ptr storage, std::shared_ptr module_repo, - std::shared_ptr executor, - common::MainThreadPool &main_thread_pool) + std::shared_ptr executor) : RequestResponseProtocolImpl{kName, - host, + std::move(inject), make_protocols( kLightProtocol, genesis, chain_spec), log::createLogger(kName), - main_thread_pool}, + kRequestTimeout}, repository_{std::move(repository)}, storage_{std::move(storage)}, module_repo_{std::move(module_repo)}, - executor_{std::move(executor)} {} + executor_{std::move(executor)} { + BOOST_ASSERT(repository_ != nullptr); + BOOST_ASSERT(storage_ != nullptr); + BOOST_ASSERT(module_repo_ != nullptr); + BOOST_ASSERT(executor_ != nullptr); + } std::optional> LightProtocol::onRxRequest(RequestType req, std::shared_ptr) { diff --git a/core/network/impl/protocols/light.hpp b/core/network/impl/protocols/light.hpp index 956b641a03..5c19f72e77 100644 --- a/core/network/impl/protocols/light.hpp +++ b/core/network/impl/protocols/light.hpp @@ -43,14 +43,13 @@ namespace kagome::network { static constexpr auto kName = "LightProtocol"; public: - LightProtocol(libp2p::Host &host, + LightProtocol(RequestResponseInject inject, const application::ChainSpec &chain_spec, const blockchain::GenesisBlockHash &genesis, std::shared_ptr repository, std::shared_ptr storage, std::shared_ptr module_repo, - std::shared_ptr executor, - common::MainThreadPool &main_thread_pool); + std::shared_ptr executor); std::optional> onRxRequest( RequestType req, std::shared_ptr) override; diff --git a/core/network/impl/protocols/protocol_fetch_available_data.hpp b/core/network/impl/protocols/protocol_fetch_available_data.hpp index 7790a380ed..2320c451fa 100644 --- a/core/network/impl/protocols/protocol_fetch_available_data.hpp +++ b/core/network/impl/protocols/protocol_fetch_available_data.hpp @@ -28,25 +28,25 @@ namespace kagome::network { ScaleMessageReadWriter> { public: static constexpr const char *kName = "FetchAvailableDataProtocol"; + static constexpr std::chrono::seconds kRequestTimeout{2}; FetchAvailableDataProtocolImpl( - libp2p::Host &host, + RequestResponseInject inject, const application::ChainSpec &chain_spec, const blockchain::GenesisBlockHash &genesis_hash, - std::shared_ptr av_store, - common::MainThreadPool &main_thread_pool) + std::shared_ptr av_store) : RequestResponseProtocolImpl< FetchAvailableDataRequest, FetchAvailableDataResponse, ScaleMessageReadWriter>{kName, - host, + std::move(inject), make_protocols( kFetchAvailableDataProtocol, genesis_hash, kProtocolPrefixPolkadot), log::createLogger( kName, "req_available_data_protocol"), - main_thread_pool}, + kRequestTimeout}, av_store_{std::move(av_store)} {} private: @@ -73,24 +73,24 @@ namespace kagome::network { ScaleMessageReadWriter> { public: static constexpr const char *kName = "FetchStatementProtocol"; + static constexpr std::chrono::seconds kRequestTimeout{1}; StatementFetchingProtocol( - libp2p::Host &host, + RequestResponseInject inject, const application::ChainSpec &chain_spec, const blockchain::GenesisBlockHash &genesis_hash, - std::shared_ptr backing_store, - common::MainThreadPool &main_thread_pool) + std::shared_ptr backing_store) : RequestResponseProtocolImpl< FetchStatementRequest, FetchStatementResponse, ScaleMessageReadWriter>{kName, - host, + std::move(inject), make_protocols(kFetchStatementProtocol, genesis_hash, kProtocolPrefixPolkadot), log::createLogger( kName, "req_statement_protocol"), - main_thread_pool}, + kRequestTimeout}, backing_store_{std::move(backing_store)} {} private: diff --git a/core/network/impl/protocols/protocol_fetch_chunk.hpp b/core/network/impl/protocols/protocol_fetch_chunk.hpp index c477a9a343..f16627bdb6 100644 --- a/core/network/impl/protocols/protocol_fetch_chunk.hpp +++ b/core/network/impl/protocols/protocol_fetch_chunk.hpp @@ -37,25 +37,26 @@ namespace kagome::network { ScaleMessageReadWriter>, NonCopyable, NonMovable { + static constexpr std::chrono::seconds kRequestTimeout{1}; + public: FetchChunkProtocolImpl( - libp2p::Host &host, + RequestResponseInject inject, const application::ChainSpec & /*chain_spec*/, const blockchain::GenesisBlockHash &genesis_hash, std::shared_ptr pp, - std::shared_ptr pm, - common::MainThreadPool &main_thread_pool) + std::shared_ptr pm) : RequestResponseProtocolImpl< FetchChunkRequest, FetchChunkResponse, ScaleMessageReadWriter>{kFetchChunkProtocolName, - host, + std::move(inject), make_protocols(kFetchChunkProtocol, genesis_hash, kProtocolPrefixPolkadot), log::createLogger(kFetchChunkProtocolName, "req_chunk_protocol"), - main_thread_pool}, + kRequestTimeout}, pp_{std::move(pp)}, pm_{std::move(pm)} { BOOST_ASSERT(pp_); diff --git a/core/network/impl/protocols/protocol_fetch_chunk_obsolete.hpp b/core/network/impl/protocols/protocol_fetch_chunk_obsolete.hpp index 5ee803e4dd..07db071cc6 100644 --- a/core/network/impl/protocols/protocol_fetch_chunk_obsolete.hpp +++ b/core/network/impl/protocols/protocol_fetch_chunk_obsolete.hpp @@ -16,6 +16,7 @@ #include "blockchain/genesis_block_hash.hpp" #include "log/logger.hpp" #include "network/common.hpp" +#include "network/helpers/scale_message_read_writer.hpp" #include "network/impl/protocols/request_response_protocol.hpp" #include "parachain/validator/parachain_processor.hpp" #include "utils/non_copyable.hpp" @@ -37,9 +38,11 @@ namespace kagome::network { ScaleMessageReadWriter>, NonCopyable, NonMovable { + static constexpr std::chrono::seconds kRequestTimeout{1}; + public: FetchChunkProtocolObsoleteImpl( - libp2p::Host &host, + const RequestResponseInject &inject, const application::ChainSpec & /*chain_spec*/, const blockchain::GenesisBlockHash &genesis_hash, std::shared_ptr pp, @@ -48,14 +51,14 @@ namespace kagome::network { FetchChunkRequest, FetchChunkResponseObsolete, ScaleMessageReadWriter>{kFetchChunkProtocolName, - host, + inject, make_protocols( kFetchChunkProtocolObsolete, genesis_hash, kProtocolPrefixPolkadot), log::createLogger(kFetchChunkProtocolName, "req_chunk_protocol"), - main_thread_pool}, + kRequestTimeout}, pp_{std::move(pp)} { BOOST_ASSERT(pp_); } diff --git a/core/network/impl/protocols/protocol_req_collation.cpp b/core/network/impl/protocols/protocol_req_collation.cpp index ea85bd1657..1baebcacb3 100644 --- a/core/network/impl/protocols/protocol_req_collation.cpp +++ b/core/network/impl/protocols/protocol_req_collation.cpp @@ -25,18 +25,19 @@ namespace kagome::network { std::decay_t, ScaleMessageReadWriter>; - ReqCollationProtocolImpl(libp2p::Host &host, + static constexpr std::chrono::seconds kRequestTimeout{2}; + + ReqCollationProtocolImpl(RequestResponseInject inject, const libp2p::peer::ProtocolName &protoname, const application::ChainSpec &chain_spec, const blockchain::GenesisBlockHash &genesis_hash, - std::shared_ptr observer, - common::MainThreadPool &main_thread_pool) + std::shared_ptr observer) : Base{kReqCollationProtocolName, - host, + std::move(inject), make_protocols(protoname, genesis_hash, kProtocolPrefixPolkadot), log::createLogger(kReqCollationProtocolName, "req_collation_protocol"), - main_thread_pool}, + kRequestTimeout}, observer_{std::move(observer)} {} protected: @@ -59,29 +60,22 @@ namespace kagome::network { }; ReqCollationProtocol::ReqCollationProtocol( - libp2p::Host &host, + const RequestResponseInject &inject, const application::ChainSpec &chain_spec, const blockchain::GenesisBlockHash &genesis_hash, - std::shared_ptr observer, - common::MainThreadPool &main_thread_pool) + std::shared_ptr observer) : v1_impl_{std::make_shared< ReqCollationProtocolImpl>( - host, - kReqCollationProtocol, - chain_spec, - genesis_hash, - observer, - main_thread_pool)}, + inject, kReqCollationProtocol, chain_spec, genesis_hash, observer)}, vstaging_impl_{std::make_shared< ReqCollationProtocolImpl>( - host, + inject, kReqCollationVStagingProtocol, chain_spec, genesis_hash, - observer, - main_thread_pool)} { + observer)} { BOOST_ASSERT(v1_impl_); BOOST_ASSERT(vstaging_impl_); } diff --git a/core/network/impl/protocols/protocol_req_collation.hpp b/core/network/impl/protocols/protocol_req_collation.hpp index e84faf452f..66b8dc3aca 100644 --- a/core/network/impl/protocols/protocol_req_collation.hpp +++ b/core/network/impl/protocols/protocol_req_collation.hpp @@ -17,6 +17,7 @@ #include "application/chain_spec.hpp" #include "common/main_thread_pool.hpp" #include "log/logger.hpp" +#include "network/impl/protocols/request_response_protocol.hpp" #include "network/peer_manager.hpp" #include "network/protocols/req_collation_protocol.hpp" #include "network/types/collator_messages_vstaging.hpp" @@ -39,11 +40,10 @@ namespace kagome::network { ReqCollationProtocol() = delete; ~ReqCollationProtocol() override = default; - ReqCollationProtocol(libp2p::Host &host, + ReqCollationProtocol(const RequestResponseInject &inject, const application::ChainSpec &chain_spec, const blockchain::GenesisBlockHash &genesis_hash, - std::shared_ptr observer, - common::MainThreadPool &main_thread_pool); + std::shared_ptr observer); const Protocol &protocolName() const override; diff --git a/core/network/impl/protocols/protocol_req_pov.cpp b/core/network/impl/protocols/protocol_req_pov.cpp index 44b2b5550e..77a8620093 100644 --- a/core/network/impl/protocols/protocol_req_pov.cpp +++ b/core/network/impl/protocols/protocol_req_pov.cpp @@ -20,22 +20,23 @@ namespace kagome::network { ScaleMessageReadWriter>, NonCopyable, NonMovable { - ReqPovProtocolImpl(libp2p::Host &host, + static constexpr std::chrono::seconds kRequestTimeout{2}; + + ReqPovProtocolImpl(RequestResponseInject &&inject, const application::ChainSpec &chain_spec, const blockchain::GenesisBlockHash &genesis_hash, - std::shared_ptr observer, - common::MainThreadPool &main_thread_pool) + std::shared_ptr observer) : RequestResponseProtocolImpl< RequestPov, ResponsePov, ScaleMessageReadWriter>{kReqPovProtocolName, - host, + std::move(inject), make_protocols(kReqPovProtocol, genesis_hash, kProtocolPrefixPolkadot), log::createLogger(kReqPovProtocolName, "req_pov_protocol"), - main_thread_pool}, + kRequestTimeout}, observer_{std::move(observer)} {} protected: @@ -70,16 +71,13 @@ namespace kagome::network { }; ReqPovProtocol::ReqPovProtocol( - libp2p::Host &host, + RequestResponseInject inject, const application::ChainSpec &chain_spec, const blockchain::GenesisBlockHash &genesis_hash, - std::shared_ptr observer, - common::MainThreadPool &main_thread_pool) - : impl_{std::make_shared(host, - chain_spec, - genesis_hash, - std::move(observer), - main_thread_pool)} {} + std::shared_ptr observer) + : impl_{std::make_shared( + std::move(inject), chain_spec, genesis_hash, std::move(observer))} { + } const Protocol &ReqPovProtocol::protocolName() const { BOOST_ASSERT(impl_ && !!"ReqPovProtocolImpl must be initialized!"); diff --git a/core/network/impl/protocols/protocol_req_pov.hpp b/core/network/impl/protocols/protocol_req_pov.hpp index 636b0e003e..497db4629f 100644 --- a/core/network/impl/protocols/protocol_req_pov.hpp +++ b/core/network/impl/protocols/protocol_req_pov.hpp @@ -17,6 +17,7 @@ #include "application/chain_spec.hpp" #include "common/main_thread_pool.hpp" #include "log/logger.hpp" +#include "network/impl/protocols/request_response_protocol.hpp" #include "network/peer_manager.hpp" #include "network/protocols/req_pov_protocol.hpp" #include "utils/non_copyable.hpp" @@ -34,11 +35,10 @@ namespace kagome::network { ReqPovProtocol() = delete; ~ReqPovProtocol() override = default; - ReqPovProtocol(libp2p::Host &host, + ReqPovProtocol(RequestResponseInject inject, const application::ChainSpec &chain_spec, const blockchain::GenesisBlockHash &genesis_hash, - std::shared_ptr observer, - common::MainThreadPool &main_thread_pool); + std::shared_ptr observer); const Protocol &protocolName() const override; diff --git a/core/network/impl/protocols/request_response_protocol.hpp b/core/network/impl/protocols/request_response_protocol.hpp index 340add7849..a00599e0af 100644 --- a/core/network/impl/protocols/request_response_protocol.hpp +++ b/core/network/impl/protocols/request_response_protocol.hpp @@ -6,11 +6,16 @@ #pragma once -#include "network/impl/protocols/protocol_base_impl.hpp" +#include +#include #include "common/main_thread_pool.hpp" -#include "protocol_error.hpp" +#include "metrics/metrics.hpp" +#include "network/helpers/new_stream.hpp" +#include "network/impl/protocols/protocol_base_impl.hpp" +#include "network/impl/protocols/protocol_error.hpp" #include "utils/box.hpp" +#include "utils/weak_macro.hpp" namespace kagome::network { @@ -25,6 +30,94 @@ namespace kagome::network { &&response_handler) = 0; }; + struct RequestResponseInject { + std::shared_ptr host; + std::shared_ptr scheduler; + std::shared_ptr main_thread_pool; + }; + + struct RequestResponseMetrics { + RequestResponseMetrics(const std::string &name) + : timeout_{metric(name, "timeout")}, + success_{metric(name, "success")}, + failure_{metric(name, "failure")}, + lost_{metric(name, "lost")} {} + + static metrics::Counter *metric(const std::string &protocol, + const std::string &type) { + auto name = "kagome_request_response_protocol_result"; + auto help = + "Number of timeout, success, failure results for request response " + "protocols."; + static auto registry = [&] { + auto registry = metrics::createRegistry(); + registry->registerCounterFamily(name, help); + return registry; + }(); + return registry->registerCounterMetric( + name, {{"protocol", protocol}, {"type", type}}); + } + + class Lost { + public: + Lost(const Lost &) = delete; + Lost(Lost &&other) noexcept + : lost_{std::exchange(other.lost_, nullptr)} {} + Lost(RequestResponseMetrics &metrics) : lost_{metrics.lost_} {} + ~Lost() { + if (lost_ != nullptr) { + lost_->inc(); + } + } + void notLost() { + lost_ = nullptr; + } + + // cppcoreguidelines-special-member-functions + Lost &operator=(const Lost &) = delete; + Lost &operator=(Lost &&) = delete; + + private: + metrics::Counter *lost_; + }; + + metrics::Counter *timeout_; + metrics::Counter *success_; + metrics::Counter *failure_; + metrics::Counter *lost_; + }; + + // TODO(turuslan): #2372, RequestResponseProtocol + struct RequestResponseTimeout { + template + static void wrap(const std::shared_ptr &self, + auto &cb, + std::weak_ptr weak_stream) { + auto timer = self->scheduler_->scheduleWithHandle( + [weak_self{std::weak_ptr{self}}, + weak_stream{std::move(weak_stream)}] { + IF_WEAK_LOCK(stream) { + stream->reset(); + IF_WEAK_LOCK(self) { + self->metrics_.timeout_->inc(); + } + } + }, + self->timeout_); + cb = libp2p::SharedFn{[weak_self{std::weak_ptr{self}}, + cb{std::move(cb)}, + lost{RequestResponseMetrics::Lost{self->metrics_}}, + timer{std::move(timer)}](auto r) mutable { + lost.notLost(); + timer.reset(); + IF_WEAK_LOCK(self) { + (r ? self->metrics_.success_ : self->metrics_.failure_)->inc(); + } + cb(std::move(r)); + }}; + } + }; + template struct RequestResponseProtocolImpl : virtual protected ProtocolBase, @@ -35,16 +128,21 @@ namespace kagome::network { using ResponseType = Response; using ReadWriterType = ReadWriter; - RequestResponseProtocolImpl( - Protocol name, - libp2p::Host &host, - Protocols protocols, - log::Logger logger, - common::MainThreadPool &main_thread_pool, - std::chrono::milliseconds timeout = std::chrono::seconds(1)) - : base_(std::move(name), host, std::move(protocols), std::move(logger)), + friend RequestResponseTimeout; + + RequestResponseProtocolImpl(const Protocol &name, + RequestResponseInject inject, + Protocols protocols, + log::Logger logger, + std::chrono::milliseconds timeout) + : base_(name, *inject.host, std::move(protocols), std::move(logger)), + metrics_{name}, + scheduler_{std::move(inject.scheduler)}, timeout_(std::move(timeout)), - main_pool_handler_{main_thread_pool.handlerStarted()} {} + main_pool_handler_{inject.main_thread_pool->handlerStarted()} { + BOOST_ASSERT(scheduler_ != nullptr); + BOOST_ASSERT(main_pool_handler_ != nullptr); + } bool start() override { return base_.start(this->weak_from_this()); @@ -61,7 +159,7 @@ namespace kagome::network { onTxRequest(request); newOutgoingStream( peer_id, - [wptr{this->weak_from_this()}, + [WEAK_SELF, request{std::move(request)}, response_handler{std::move(response_handler)}](auto &&res) mutable { if (res.has_error()) { @@ -71,13 +169,15 @@ namespace kagome::network { auto &stream = res.value(); BOOST_ASSERT(stream); - auto self = wptr.lock(); + auto self = weak_self.lock(); if (!self) { - self->base_.closeStream(std::move(wptr), std::move(stream)); + stream->reset(); response_handler(outcome::failure(ProtocolError::GONE)); return; } + RequestResponseTimeout::wrap(self, response_handler, stream); + SL_DEBUG(self->base_.logger(), "Established outgoing {} stream with {}", self->protocolName(), @@ -123,23 +223,12 @@ namespace kagome::network { "New outgoing {} stream with {}", protocolName(), peer_id); - - auto addresses_res = - base_.host().getPeerRepository().getAddressRepository().getAddresses( - peer_id); - if (not addresses_res.has_value()) { - main_pool_handler_->execute( - [cb(std::move(cb)), addresses_res(std::move(addresses_res))] { - cb(addresses_res.as_failure()); - }); - return; - } - - base_.host().newStream( - PeerInfo{peer_id, std::move(addresses_res.value())}, + newStream( + base_.host(), + peer_id, base_.protocolIds(), [wptr{this->weak_from_this()}, peer_id, cb{std::move(cb)}]( - auto &&stream_and_proto) mutable { + libp2p::StreamAndProtocolOrError &&stream_and_proto) mutable { if (!stream_and_proto.has_value()) { cb(stream_and_proto.as_failure()); return; @@ -377,8 +466,12 @@ namespace kagome::network { }); } + // NOLINTBEGIN(cppcoreguidelines-non-private-member-variables-in-classes) ProtocolBaseImpl base_; + RequestResponseMetrics metrics_; + std::shared_ptr scheduler_; std::chrono::milliseconds timeout_; + // NOLINTEND(cppcoreguidelines-non-private-member-variables-in-classes) private: std::shared_ptr main_pool_handler_; diff --git a/core/network/impl/protocols/send_dispute_protocol.hpp b/core/network/impl/protocols/send_dispute_protocol.hpp index e809f31f34..6d769a7402 100644 --- a/core/network/impl/protocols/send_dispute_protocol.hpp +++ b/core/network/impl/protocols/send_dispute_protocol.hpp @@ -41,24 +41,25 @@ namespace kagome::network { ScaleMessageReadWriter>, NonCopyable, NonMovable { + static constexpr std::chrono::seconds kRequestTimeout{12}; + public: - SendDisputeProtocolImpl(libp2p::Host &host, + SendDisputeProtocolImpl(RequestResponseInject inject, const blockchain::GenesisBlockHash &genesis_hash, std::shared_ptr - dispute_request_observer, - common::MainThreadPool &main_thread_pool) + dispute_request_observer) : RequestResponseProtocolImpl< DisputeRequest, DisputeResponse, ScaleMessageReadWriter>{kSendDisputeProtocolName, - host, + std::move(inject), make_protocols(kSendDisputeProtocol, genesis_hash, kProtocolPrefixPolkadot), log::createLogger( kSendDisputeProtocolName, "dispute_protocol"), - main_thread_pool}, + kRequestTimeout}, dispute_request_observer_{std::move(dispute_request_observer)} { BOOST_ASSERT(dispute_request_observer_); } diff --git a/core/network/impl/protocols/state_protocol_impl.hpp b/core/network/impl/protocols/state_protocol_impl.hpp index faa38ee86b..c56fcfbb28 100644 --- a/core/network/impl/protocols/state_protocol_impl.hpp +++ b/core/network/impl/protocols/state_protocol_impl.hpp @@ -30,6 +30,7 @@ namespace kagome::network { using PeerId = libp2p::peer::PeerId; using PeerInfo = libp2p::peer::PeerInfo; + // TODO(turuslan): #2372, RequestResponseProtocol class StateProtocolImpl final : public StateProtocol, public std::enable_shared_from_this, diff --git a/core/network/impl/protocols/sync_protocol_impl.cpp b/core/network/impl/protocols/sync_protocol_impl.cpp index 143354a6b6..73fc277719 100644 --- a/core/network/impl/protocols/sync_protocol_impl.cpp +++ b/core/network/impl/protocols/sync_protocol_impl.cpp @@ -11,6 +11,7 @@ #include "network/adapters/protobuf_block_request.hpp" #include "network/adapters/protobuf_block_response.hpp" #include "network/common.hpp" +#include "network/helpers/new_stream.hpp" #include "network/helpers/protobuf_message_read_writer.hpp" #include "network/impl/protocols/protocol_error.hpp" #include "network/rpc.hpp" @@ -18,6 +19,7 @@ #include "network/types/blocks_response.hpp" namespace kagome::network { + constexpr std::chrono::seconds kRequestTimeout{20}; namespace detail { BlocksResponseCache::BlocksResponseCache( @@ -126,6 +128,7 @@ namespace kagome::network { SyncProtocolImpl::SyncProtocolImpl( libp2p::Host &host, + std::shared_ptr scheduler, const application::ChainSpec &chain_spec, const blockchain::GenesisBlockHash &genesis_hash, std::shared_ptr sync_observer, @@ -134,10 +137,14 @@ namespace kagome::network { host, make_protocols(kSyncProtocol, genesis_hash, chain_spec), log::createLogger(kSyncProtocolName, "sync_protocol")), + scheduler_{std::move(scheduler)}, + metrics_{kSyncProtocolName}, + timeout_{kRequestTimeout}, sync_observer_(std::move(sync_observer)), reputation_repository_(std::move(reputation_repository)), response_cache_(kResponsesCacheCapacity, kResponsesCacheExpirationTimeout) { + BOOST_ASSERT(scheduler_ != nullptr); BOOST_ASSERT(sync_observer_ != nullptr); BOOST_ASSERT(reputation_repository_ != nullptr); } @@ -159,17 +166,9 @@ namespace kagome::network { "New outgoing {} stream with {}", protocolName(), peer_id); - - auto addresses_res = - base_.host().getPeerRepository().getAddressRepository().getAddresses( - peer_id); - if (not addresses_res.has_value()) { - cb(addresses_res.as_failure()); - return; - } - - base_.host().newStream( - PeerInfo{peer_id, std::move(addresses_res.value())}, + newStream( + base_.host(), + peer_id, base_.protocolIds(), [wp{weak_from_this()}, peer_id, cb = std::move(cb)]( auto &&stream_res) mutable { @@ -469,7 +468,7 @@ namespace kagome::network { newOutgoingStream( peer_id, - [wp{weak_from_this()}, + [WEAK_SELF, response_handler = std::move(response_handler), block_request = std::move(block_request)](auto &&stream_res) mutable { if (not stream_res.has_value()) { @@ -478,7 +477,7 @@ namespace kagome::network { } auto &stream = stream_res.value(); - auto self = wp.lock(); + auto self = weak_self.lock(); if (not self) { stream->reset(); response_handler(ProtocolError::GONE); @@ -490,13 +489,15 @@ namespace kagome::network { self->protocolName(), stream->remotePeerId().value()); + RequestResponseTimeout::wrap(self, response_handler, stream); + self->writeRequest(stream, block_request, [stream, - wp = std::move(wp), + weak_self, response_handler = std::move(response_handler)]( auto &&write_res) mutable { - auto self = wp.lock(); + auto self = weak_self.lock(); if (not self) { stream->reset(); response_handler(ProtocolError::GONE); diff --git a/core/network/impl/protocols/sync_protocol_impl.hpp b/core/network/impl/protocols/sync_protocol_impl.hpp index 82bb67ec83..e86b6ac448 100644 --- a/core/network/impl/protocols/sync_protocol_impl.hpp +++ b/core/network/impl/protocols/sync_protocol_impl.hpp @@ -21,6 +21,7 @@ #include "blockchain/genesis_block_hash.hpp" #include "log/logger.hpp" #include "network/impl/protocols/protocol_base_impl.hpp" +#include "network/impl/protocols/request_response_protocol.hpp" #include "network/reputation_repository.hpp" #include "network/sync_protocol_observer.hpp" #include "utils/non_copyable.hpp" @@ -103,14 +104,18 @@ namespace kagome::network { } // namespace detail + // TODO(turuslan): #2372, RequestResponseProtocol class SyncProtocolImpl final : public SyncProtocol, public std::enable_shared_from_this, NonCopyable, NonMovable { public: + using Response = BlocksResponse; + SyncProtocolImpl( libp2p::Host &host, + std::shared_ptr scheduler, const application::ChainSpec &chain_spec, const blockchain::GenesisBlockHash &genesis_hash, std::shared_ptr sync_observer, @@ -149,6 +154,10 @@ namespace kagome::network { private: inline static const auto kSyncProtocolName = "SyncProtocol"s; ProtocolBaseImpl base_; + friend RequestResponseTimeout; + std::shared_ptr scheduler_; + RequestResponseMetrics metrics_; + std::chrono::milliseconds timeout_; std::shared_ptr sync_observer_; std::shared_ptr reputation_repository_; detail::BlocksResponseCache response_cache_; diff --git a/core/network/notifications/connect_and_handshake.hpp b/core/network/notifications/connect_and_handshake.hpp index 37e3077894..88e5610e83 100644 --- a/core/network/notifications/connect_and_handshake.hpp +++ b/core/network/notifications/connect_and_handshake.hpp @@ -6,6 +6,7 @@ #pragma once +#include "network/helpers/new_stream.hpp" #include "network/helpers/stream_read_buffer.hpp" #include "network/impl/protocols/protocol_base_impl.hpp" #include "network/notifications/handshake.hpp" @@ -64,16 +65,6 @@ namespace kagome::network::notifications { std::move(stream), std::move(frame_stream), handshake, std::move(cb)); }; - auto addresses_res = - base.host().getPeerRepository().getAddressRepository().getAddresses( - peer_id); - if (not addresses_res.has_value()) { - cb(addresses_res.as_failure()); - return; - } - - base.host().newStream(PeerInfo{peer_id, std::move(addresses_res.value())}, - base.protocolIds(), - std::move(cb)); + newStream(base.host(), peer_id, base.protocolIds(), std::move(cb)); } } // namespace kagome::network::notifications diff --git a/core/network/warp/protocol.hpp b/core/network/warp/protocol.hpp index 95fccc4684..60c0d80463 100644 --- a/core/network/warp/protocol.hpp +++ b/core/network/warp/protocol.hpp @@ -31,19 +31,19 @@ namespace kagome::network { WarpResponse, ScaleMessageReadWriter> { static constexpr auto kName = "WarpProtocol"; + static constexpr std::chrono::seconds kRequestTimeout{10}; public: - WarpProtocolImpl(libp2p::Host &host, + WarpProtocolImpl(RequestResponseInject inject, const application::ChainSpec &chain_spec, const blockchain::GenesisBlockHash &genesis, - std::shared_ptr cache, - common::MainThreadPool &main_thread_pool) + std::shared_ptr cache) : RequestResponseProtocolImpl( kName, - host, + std::move(inject), make_protocols(kWarpProtocol, genesis, chain_spec), log::createLogger(kName, "warp_sync_protocol"), - main_thread_pool), + kRequestTimeout), cache_{std::move(cache)} {} std::optional> onRxRequest( diff --git a/core/utils/weak_macro.hpp b/core/utils/weak_macro.hpp index 150981d7a1..e3e0514e9b 100644 --- a/core/utils/weak_macro.hpp +++ b/core/utils/weak_macro.hpp @@ -6,9 +6,9 @@ #pragma once -#define WEAK_SELF \ - weak_self { \ - weak_from_this() \ +#define WEAK_SELF \ + weak_self { \ + this->weak_from_this() \ } #define WEAK_LOCK(name) \