diff --git a/src/lb/formatters.hpp b/src/lb/formatters.hpp index bf26948..3103b4a 100644 --- a/src/lb/formatters.hpp +++ b/src/lb/formatters.hpp @@ -10,4 +10,5 @@ namespace YAML {class Node;} template <> struct fmt::formatter : ostream_formatter{}; template <> struct fmt::formatter : ostream_formatter{}; template <> struct fmt::formatter : ostream_formatter{}; -template <> struct fmt::formatter> : ostream_formatter{}; \ No newline at end of file +template <> struct fmt::formatter> : ostream_formatter{}; +template <> struct fmt::formatter> : ostream_formatter{}; \ No newline at end of file diff --git a/src/lb/tcp/connector.cpp b/src/lb/tcp/connector.cpp index 982f112..751b6e3 100644 --- a/src/lb/tcp/connector.cpp +++ b/src/lb/tcp/connector.cpp @@ -2,6 +2,8 @@ #include #include +using SocketType = lb::tcp::HttpSession::SocketType; + namespace lb::tcp { Connector::Connector(boost::asio::io_context& ctx, SelectorPtr selector) @@ -10,38 +12,100 @@ Connector::Connector(boost::asio::io_context& ctx, SelectorPtr selector) , selector(selector) {} -SessionPtr MakeSession(SelectorPtr& selector, boost::asio::ip::tcp::socket client_socket, boost::asio::ip::tcp::socket server_socket, Backend backend) + +class LeastConnectionsCallbacks : public StateNotifier +{ +public: + LeastConnectionsCallbacks(Backend backend, SelectorPtr selector) + : StateNotifier() + , backend(std::move(backend)) + , selector(std::dynamic_pointer_cast(selector)) + {} + + void OnConnect() override + { + selector->IncreaseConnectionCount(backend); + } + + void OnDisconnect() override + { + selector->DecreaseConnectionCount(backend); + } + + using StateNotifier::StateNotifier; +private: + Backend backend; + std::shared_ptr selector; +}; + + +class LeastResponseTimeCallbacks : public StateNotifier { +public: + using TimeType = decltype(std::chrono::high_resolution_clock::now()); +public: + LeastResponseTimeCallbacks(Backend backend, SelectorPtr selector) + : StateNotifier() + , backend(std::move(backend)) + , selector(std::dynamic_pointer_cast(selector)) + {} + + void OnRequestSent() override + { + response_begin = std::chrono::high_resolution_clock::now(); + } - switch(selector->Type()) { - case SelectorType::LEAST_CONNECTIONS: { - std::shared_ptr lc_selector = std::dynamic_pointer_cast(selector); - return std::make_shared(std::move(client_socket), std::move(server_socket), lc_selector, backend); - } break; - case SelectorType::LEAST_RESPONSE_TIME: { - std::shared_ptr lrt_selector = std::dynamic_pointer_cast(selector); - return std::make_shared(std::move(client_socket), std::move(server_socket), lrt_selector, backend); - } break; - default: { - return std::make_shared(std::move(client_socket), std::move(server_socket)); - } + void OnResponseReceive() override + { + response_end = std::chrono::high_resolution_clock::now(); + std::chrono::duration duration = response_end - response_begin; + selector->AddResponseTime(backend, duration.count()); + } + + using StateNotifier::StateNotifier; +private: + Backend backend; + std::shared_ptr selector; + TimeType response_begin; + TimeType response_end; +}; + +SessionPtr MakeSession(SelectorPtr& selector, + SocketType client_socket, + SocketType server_socket, + Backend backend) +{ + switch (selector->Type()) { + case SelectorType::LEAST_CONNECTIONS: + { + return std::make_shared(std::move(client_socket), std::move(server_socket), + std::make_unique(std::move(backend), selector)); + } break; + + case SelectorType::LEAST_RESPONSE_TIME: + { + return std::make_shared(std::move(client_socket), std::move(server_socket), + std::make_unique(std::move(backend), selector)); + } break; + default: + return std::make_shared(std::move(client_socket), std::move(server_socket)); } } -void Connector::MakeAndRunSession(boost::asio::ip::tcp::socket client_socket) +void Connector::MakeAndRunSession(SocketType client_socket) { - // TODO: selection of backend DEBUG("In connector"); Backend backend = selector->SelectBackend(client_socket.remote_endpoint()); if (backend.IsIpEndpoint()) { DEBUG("Is ip endpoint"); - auto server_socket = std::make_shared(client_socket.get_executor()); + auto server_socket = std::make_shared(client_socket.get_executor()); server_socket->async_connect( backend.AsEndpoint(), - [this, server_socket, client_socket=std::move(client_socket), backend=std::move(backend)] (const boost::system::error_code& error) mutable + [this, server_socket, client_socket=std::move(client_socket), backend=std::move(backend)] + (const boost::system::error_code& error) mutable { if (error) { ERROR("{}", error.message()); @@ -56,7 +120,7 @@ void Connector::MakeAndRunSession(boost::asio::ip::tcp::socket client_socket) }); } else if (backend.IsUrl()) { DEBUG("Is url"); - auto server_socket = std::make_shared(client_socket.get_executor()); + auto server_socket = std::make_shared(client_socket.get_executor()); const auto& url = backend.AsUrl(); DEBUG("URL: hostname: {}, port: {}", url.Hostname(), url.Port()); diff --git a/src/lb/tcp/connector.hpp b/src/lb/tcp/connector.hpp index e8b3b17..f79990e 100644 --- a/src/lb/tcp/connector.hpp +++ b/src/lb/tcp/connector.hpp @@ -21,8 +21,7 @@ class Connector { Connector& operator=(Connector&&) = delete; void MakeAndRunSession(boost::asio::ip::tcp::socket client); -private: - void HandleAsyncResolve(const boost::system::error_code& ec, ResolverResults results, boost::asio::ip::tcp::socket client_socket); + private: boost::asio::io_context& ioc; boost::asio::ip::tcp::resolver resolver; diff --git a/src/lb/tcp/selectors.cpp b/src/lb/tcp/selectors.cpp index 5ebc410..b61ea5a 100644 --- a/src/lb/tcp/selectors.cpp +++ b/src/lb/tcp/selectors.cpp @@ -152,6 +152,7 @@ SelectorPtr DetectSelector(const YAML::Node& node) // ============================ RoundRobinSelector ============================ void RoundRobinSelector::Configure(const YAML::Node &balancing_node) { + INFO("Configuring RoundRobinSelector"); if (!balancing_node["endpoints"].IsDefined()) { STACKTRACE("Round-robin endpoints node is missed"); } @@ -219,6 +220,7 @@ std::size_t ReadWeight(const YAML::Node& ep) void WeightedRoundRobinSelector::Configure(const YAML::Node &balancing_node) { + INFO("Configuring WeightedRoundRobinSelector"); if (!balancing_node["endpoints"].IsDefined()) { STACKTRACE("Weighted-round-robin endpoints node is missed"); } @@ -314,6 +316,7 @@ void WeightedRoundRobinSelector::AdvanceCounter() void IpHashSelector::Configure(const YAML::Node& balancing_node) { + INFO("Configuring IpHashSelector"); if (!balancing_node["endpoints"].IsDefined()) { STACKTRACE("Ip-hash endpoints node is missed"); } @@ -375,7 +378,9 @@ SelectorType IpHashSelector::Type() const BackendCHTraits::HashType BackendCHTraits::GetHash(const Backend& backend) { - return std::hash{}(backend.ToString()); + std::size_t res = std::hash{}(backend.ToString()); + DEBUG("Hash: {}", res); + return res; } std::vector @@ -398,6 +403,7 @@ ConsistentHashSelector::ConsistentHashSelector(std::size_t spawn_replicas) void ConsistentHashSelector::Configure(const YAML::Node &balancing_node) { + INFO("Configuring ConsistentHashSelector"); if (!balancing_node["endpoints"].IsDefined()) { STACKTRACE("Consistent hash endpoints node is missed"); } @@ -429,6 +435,7 @@ Backend ConsistentHashSelector::SelectBackend(const boost::asio::ip::tcp::endpoi { boost::mutex::scoped_lock lock(mutex_); Backend result = ring_.SelectNode(client); + DEBUG("Selected: {}", result); return result; } @@ -436,6 +443,7 @@ void ConsistentHashSelector::ExcludeBackend(const Backend& backend) { boost::mutex::scoped_lock lock(mutex_); ring_.EraseNode(backend); + DEBUG("Excluded: {}", backend); if (ring_.Empty()) { EXCEPTION("All backends are excluded!"); } @@ -449,6 +457,7 @@ SelectorType ConsistentHashSelector::Type() const // ============================ LeastConnectionsSelector ============================ void LeastConnectionsSelector::Configure(const YAML::Node& config) { + INFO("Configuring LeastConnectionsSelector"); if (!config["endpoints"].IsDefined()) { STACKTRACE("Least connections endpoints node is missed"); } @@ -546,6 +555,7 @@ void LeastConnectionsSelector::DecreaseConnectionCount(const Backend& backend) void LeastResponseTimeSelector::Configure(const YAML::Node& config) { + INFO("Configuring LeastResponseTimeSelector"); if (!config["endpoints"].IsDefined()) { STACKTRACE("Least response time endpoints node is missed"); } diff --git a/src/lb/tcp/session.cpp b/src/lb/tcp/session.cpp index 6f3f637..6d0004b 100644 --- a/src/lb/tcp/session.cpp +++ b/src/lb/tcp/session.cpp @@ -5,51 +5,58 @@ #include + + namespace lb { namespace tcp { -// ================================= HttpSession ================================= -HttpSession::HttpSession(boost::asio::ip::tcp::socket client, - boost::asio::ip::tcp::socket server) +HttpSession::HttpSession(SocketType client_socket, SocketType server_socket, VisitorPtr visitor) : BasicSession() - , client_socket(std::move(client)) - , server_socket(std::move(server)) + , client_stream_(std::move(client_socket)) + , server_stream_(std::move(server_socket)) , id(generateId()) + , visitor_(std::move(visitor)) { - cb.prepare(BUFFER_SIZE); - sb.prepare(BUFFER_SIZE); - DEBUG("HttpSession id:{} constructed", id); + DEBUG("HttpSession id:{} created", id); } void HttpSession::Run() { + if (visitor_) { + visitor_->OnConnect(); + } ClientRead(); - ServerRead(); } -bool NeedErrorLogging(const boost::system::error_code& ec) +bool NeedErrorLogging(const HttpSession::ErrorCode& ec) { return ec != boost::asio::error::eof && ec != boost::beast::http::error::end_of_stream && ec != boost::asio::error::operation_aborted; } -// Client->Server communication callbacks-chain + void HttpSession::ClientRead() { - cr.clear(); - boost::beast::http::async_read( - client_socket, - cb, - cr, - [self=shared_from_this()] (const boost::system::error_code& ec, std::size_t length) { + namespace http = boost::beast::http; + + client_buffer_.clear(); + server_buffer_.clear(); + client_request_.clear(); + server_response_.clear(); + + http::async_read( + client_stream_, + client_buffer_, + client_request_, + [self=shared_from_this()](ErrorCode ec, std::size_t length){ self->HandleClientRead(ec, length); } ); } -void HttpSession::HandleClientRead(boost::system::error_code ec, std::size_t length) +void HttpSession::HandleClientRead(ErrorCode ec, std::size_t length) { if (ec) { if (NeedErrorLogging(ec)) { @@ -58,22 +65,27 @@ void HttpSession::HandleClientRead(boost::system::error_code ec, std::size_t len Cancel(); return; } - //DEBUG("sid:{} client-msg:{}", id, client_buffer); + + if (visitor_) { + visitor_->OnRequestReceive(); + } SendToServer(); } void HttpSession::SendToServer() { - boost::beast::http::async_write( - server_socket, - cr, - [self=shared_from_this()](boost::system::error_code ec, std::size_t length) { - self->HandleSendToServer(ec, length); + namespace http = boost::beast::http; + + http::async_write( + server_stream_, + client_request_, + [self=shared_from_this()](ErrorCode ec, std::size_t length){ + self->HandleSendToServer(ec, length); }); } -void HttpSession::HandleSendToServer(boost::system::error_code ec, std::size_t length) +void HttpSession::HandleSendToServer(ErrorCode ec, std::size_t length) { if (ec) { if (NeedErrorLogging(ec)) { @@ -82,25 +94,28 @@ void HttpSession::HandleSendToServer(boost::system::error_code ec, std::size_t l Cancel(); return; } - - ClientRead(); + DEBUG("sid: {} sent to server", id); + if (visitor_) { + visitor_->OnRequestSent(); + } + ServerRead(); } -// Server->Client communication callbacks-chain + void HttpSession::ServerRead() { - sr.clear(); - boost::beast::http::async_read( - server_socket, - sb, - sr, - [self=shared_from_this()] (const boost::system::error_code& ec, std::size_t length) { + namespace http = boost::beast::http; + http::async_read( + server_stream_, + server_buffer_, + server_response_, + [self=shared_from_this()](ErrorCode ec, std::size_t length){ self->HandleServerRead(ec, length); } ); } -void HttpSession::HandleServerRead(boost::system::error_code ec, std::size_t length) +void HttpSession::HandleServerRead(ErrorCode ec, std::size_t length) { if (ec) { if (NeedErrorLogging(ec)) { @@ -109,30 +124,41 @@ void HttpSession::HandleServerRead(boost::system::error_code ec, std::size_t len Cancel(); return; } + if (visitor_) { + visitor_->OnResponseReceive(); + } SendToClient(); } void HttpSession::SendToClient() { - boost::beast::http::async_write(client_socket, sr, - [self=shared_from_this()](boost::system::error_code ec, std::size_t length){ - self->HandleSendToClient(ec, length); - }); + namespace http = boost::beast::http; + http::async_write( + client_stream_, + server_response_, + [self=shared_from_this()](ErrorCode ec, std::size_t length){ + self->HandleSendToClient(ec, length); + } + ); } -void HttpSession::HandleSendToClient(boost::system::error_code ec, std::size_t length) { +void HttpSession::HandleSendToClient(ErrorCode ec, std::size_t length) { if (ec) { if (NeedErrorLogging(ec)) { SERROR("sid:{} {}", id, ec.message()); } Cancel(); + return; } - ServerRead(); + if (visitor_) { + visitor_->OnResponseSent(); + } + ClientRead(); } -void CloseSocket(boost::asio::ip::tcp::socket& socket) +void CloseSocket(HttpSession::SocketType& socket) { - boost::system::error_code ec; + HttpSession::ErrorCode ec; if (socket.is_open()) { socket.shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec); if (!ec) { @@ -144,11 +170,14 @@ void CloseSocket(boost::asio::ip::tcp::socket& socket) } } - // Cancel all unfinished async operartions on boths sockets + void HttpSession::Cancel() { - CloseSocket(client_socket); - CloseSocket(server_socket); + CloseSocket(client_stream_.socket()); + CloseSocket(server_stream_.socket()); + if (visitor_) { + visitor_->OnDisconnect(); + } } HttpSession::~HttpSession() @@ -168,300 +197,6 @@ const HttpSession::IdType& HttpSession::Id() const return id; } - -// ================================= LeastConnectionsHttpSession ================================= -LeastConnectionsHttpSession::LeastConnectionsHttpSession(boost::asio::ip::tcp::socket client, - boost::asio::ip::tcp::socket server, - SelectorType selector, Backend server_backend) - : BasicSession() - , client_socket(std::move(client)) - , server_socket(std::move(server)) - , id(generateId()) - , lc_selector(std::move(selector)) - , server_backend(std::move(server_backend)) -{ - cb.prepare(BUFFER_SIZE); - sb.prepare(BUFFER_SIZE); - DEBUG("LeastConnectionsHttpSession id:{} constructed", id); - // selector->IncreaseConnectionCount(server_backend); -} - -void LeastConnectionsHttpSession::Run() -{ - ClientRead(); - ServerRead(); -} - - -// Client->Server communication callbacks-chain -void LeastConnectionsHttpSession::ClientRead() -{ - cr.clear(); - boost::beast::http::async_read( - client_socket, - cb, - cr, - [self=shared_from_this()] (const boost::system::error_code& ec, std::size_t length) { - self->HandleClientRead(ec, length); - } - ); -} - -void LeastConnectionsHttpSession::HandleClientRead(boost::system::error_code ec, std::size_t length) -{ - if (ec) { - if (NeedErrorLogging(ec)) { - SERROR("sid:{} {}", id, ec.message()); - } - Cancel(); - return; - } - //DEBUG("sid:{} client-msg:{}", id, client_buffer); - SendToServer(); -} - -void LeastConnectionsHttpSession::SendToServer() -{ - boost::beast::http::async_write( - server_socket, - cr, - [self=shared_from_this()](boost::system::error_code ec, std::size_t length) { - self->HandleSendToServer(ec, length); - }); -} - - -void LeastConnectionsHttpSession::HandleSendToServer(boost::system::error_code ec, std::size_t length) -{ - if (ec) { - if (NeedErrorLogging(ec)) { - SERROR("sid:{} {}", id, ec.message()); - } - Cancel(); - return; - } - - ClientRead(); -} - -// Server->Client communication callbacks-chain -void LeastConnectionsHttpSession::ServerRead() -{ - sr.clear(); - boost::beast::http::async_read( - server_socket, - sb, - sr, - [self=shared_from_this()] (const boost::system::error_code& ec, std::size_t length) { - self->HandleServerRead(ec, length); - } - ); -} - -void LeastConnectionsHttpSession::HandleServerRead(boost::system::error_code ec, std::size_t length) -{ - if (ec) { - if (NeedErrorLogging(ec)) { - SERROR("sid:{} {}", id, ec.message()); - } - Cancel(); - return; - } - SendToClient(); -} - -void LeastConnectionsHttpSession::SendToClient() -{ - boost::beast::http::async_write(client_socket, sr, - [self=shared_from_this()](boost::system::error_code ec, std::size_t length){ - self->HandleSendToClient(ec, length); - }); -} - -void LeastConnectionsHttpSession::HandleSendToClient(boost::system::error_code ec, std::size_t length) { - if (ec) { - if (NeedErrorLogging(ec)) { - SERROR("sid:{} {}", id, ec.message()); - } - Cancel(); - } - ServerRead(); -} - -void LeastConnectionsHttpSession::Cancel() -{ - CloseSocket(client_socket); - CloseSocket(server_socket); - lc_selector->DecreaseConnectionCount(server_backend); -} - - -LeastConnectionsHttpSession::IdType LeastConnectionsHttpSession::generateId() -{ - static std::atomic id = 0; - LeastConnectionsHttpSession::IdType result = id.fetch_add(1, std::memory_order_relaxed); - return result; -} - -const LeastConnectionsHttpSession::IdType& LeastConnectionsHttpSession::Id() const -{ - return id; -} - -LeastConnectionsHttpSession::~LeastConnectionsHttpSession() -{ - Cancel(); - DEBUG("LeastConnectionsHttpSession id:{} destructed", id); -} - - -// ================================= LeastResponseTimeHttpSession ================================= -LeastResponseTimeHttpSession::LeastResponseTimeHttpSession(boost::asio::ip::tcp::socket client, - boost::asio::ip::tcp::socket server, - SelectorType selector, Backend server_backend) - : BasicSession() - , client_socket(std::move(client)) - , server_socket(std::move(server)) - , id(generateId()) - , lrt_selector(std::move(selector)) - , server_backend(std::move(server_backend)) -{ - cb.prepare(BUFFER_SIZE); - sb.prepare(BUFFER_SIZE); - DEBUG("LeastConnectionsHttpSession id:{} constructed", id); - // selector->IncreaseConnectionCount(server_backend); -} - -void LeastResponseTimeHttpSession::Run() -{ - ClientRead(); - ServerRead(); -} - - -// Client->Server communication callbacks-chain -void LeastResponseTimeHttpSession::ClientRead() -{ - cr.clear(); - boost::beast::http::async_read( - client_socket, - cb, - cr, - [self=shared_from_this()] (const boost::system::error_code& ec, std::size_t length) { - self->HandleClientRead(ec, length); - } - ); -} - -void LeastResponseTimeHttpSession::HandleClientRead(boost::system::error_code ec, std::size_t length) -{ - if (ec) { - if (NeedErrorLogging(ec)) { - SERROR("sid:{} {}", id, ec.message()); - } - Cancel(); - return; - } - //DEBUG("sid:{} client-msg:{}", id, client_buffer); - SendToServer(); -} - -void LeastResponseTimeHttpSession::SendToServer() -{ - boost::beast::http::async_write( - server_socket, - cr, - [self=shared_from_this()](boost::system::error_code ec, std::size_t length) { - self->HandleSendToServer(ec, length); - }); -} - - -void LeastResponseTimeHttpSession::HandleSendToServer(boost::system::error_code ec, std::size_t length) -{ - if (ec) { - if (NeedErrorLogging(ec)) { - SERROR("sid:{} {}", id, ec.message()); - } - Cancel(); - return; - } - - ClientRead(); -} - -// Server->Client communication callbacks-chain -void LeastResponseTimeHttpSession::ServerRead() -{ - sr.clear(); - response_begin = boost::posix_time::microsec_clock::local_time(); - boost::beast::http::async_read( - server_socket, - sb, - sr, - [self=shared_from_this()] (const boost::system::error_code& ec, std::size_t length) { - self->HandleServerRead(ec, length); - } - ); -} - -void LeastResponseTimeHttpSession::HandleServerRead(boost::system::error_code ec, std::size_t length) -{ - if (ec) { - if (NeedErrorLogging(ec)) { - SERROR("sid:{} {}", id, ec.message()); - } - Cancel(); - return; - } - response_end = boost::posix_time::microsec_clock::local_time(); - long response_time = (response_end - response_begin).total_microseconds(); - lrt_selector->AddResponseTime(server_backend, response_time); - SendToClient(); -} - -void LeastResponseTimeHttpSession::SendToClient() -{ - boost::beast::http::async_write(client_socket, sr, - [self=shared_from_this()](boost::system::error_code ec, std::size_t length){ - self->HandleSendToClient(ec, length); - }); -} - -void LeastResponseTimeHttpSession::HandleSendToClient(boost::system::error_code ec, std::size_t length) { - if (ec) { - if (NeedErrorLogging(ec)) { - SERROR("sid:{} {}", id, ec.message()); - } - Cancel(); - } - ServerRead(); -} - -void LeastResponseTimeHttpSession::Cancel() -{ - CloseSocket(client_socket); - CloseSocket(server_socket); -} - - -LeastResponseTimeHttpSession::IdType LeastResponseTimeHttpSession::generateId() -{ - static std::atomic id = 0; - LeastResponseTimeHttpSession::IdType result = id.fetch_add(1, std::memory_order_relaxed); - return result; -} - -const LeastResponseTimeHttpSession::IdType& LeastResponseTimeHttpSession::Id() const -{ - return id; -} - -LeastResponseTimeHttpSession::~LeastResponseTimeHttpSession() -{ - Cancel(); - DEBUG("LeastConnectionsHttpSession id:{} destructed", id); -} } // namespace tcp } // namespace lb \ No newline at end of file diff --git a/src/lb/tcp/session.hpp b/src/lb/tcp/session.hpp index 9ae0497..6354159 100644 --- a/src/lb/tcp/session.hpp +++ b/src/lb/tcp/session.hpp @@ -18,14 +18,31 @@ struct BasicSession { using SessionPtr = std::shared_ptr; +struct StateNotifier { + virtual void OnConnect() {}; + virtual void OnDisconnect() {}; + virtual void OnResponseReceive() {}; + virtual void OnResponseSent() {}; + virtual void OnRequestReceive() {}; + virtual void OnRequestSent() {}; + virtual ~StateNotifier() = default; +}; + +using VisitorPtr = std::unique_ptr; + class HttpSession : public BasicSession, public std::enable_shared_from_this { public: - using IdType = std::size_t; - static constexpr const std::size_t BUFFER_SIZE = 4096; + using IdType = std::size_t; + using SocketType = boost::asio::ip::tcp::socket; + using EndpointType = boost::asio::ip::tcp::endpoint; + using TcpStream = boost::beast::tcp_stream; + using BufferType = boost::beast::flat_buffer; + using RequestType = boost::beast::http::request; + using ResponseType = boost::beast::http::response; + using ErrorCode = boost::system::error_code; public: - HttpSession(boost::asio::ip::tcp::socket client, - boost::asio::ip::tcp::socket server); + HttpSession(SocketType client_socket, SocketType server_socket, VisitorPtr visitor=nullptr); HttpSession(const HttpSession&) = delete; HttpSession& operator=(const HttpSession&) = delete; @@ -33,139 +50,31 @@ class HttpSession : public BasicSession, void Run() override; - // Cancel all unfinished async operartions on boths sockets void Cancel() override; const IdType& Id() const; protected: - - // Client->Server communication callbacks-chain void ClientRead(); - void HandleClientRead(boost::system::error_code ec, std::size_t length); + void HandleClientRead(ErrorCode ec, std::size_t length); void SendToServer(); - void HandleSendToServer(boost::system::error_code ec, std::size_t length); - - // Server->Client communication callbacks-chain + void HandleSendToServer(ErrorCode ec, std::size_t length); void ServerRead(); - void HandleServerRead(boost::system::error_code ec, std::size_t length); + void HandleServerRead(ErrorCode ec, std::size_t length); void SendToClient(); - void HandleSendToClient(boost::system::error_code ec, std::size_t length); + void HandleSendToClient(ErrorCode ec, std::size_t length); protected: static IdType generateId(); protected: - boost::asio::ip::tcp::socket client_socket; - boost::asio::ip::tcp::socket server_socket; - boost::asio::streambuf cb; - boost::asio::streambuf sb; + TcpStream client_stream_; + TcpStream server_stream_; + BufferType client_buffer_; + BufferType server_buffer_; + RequestType client_request_; + ResponseType server_response_; IdType id; - boost::beast::http::request cr; - boost::beast::http::response sr; - boost::mutex mutex; + VisitorPtr visitor_; }; - -class LeastConnectionsHttpSession : public BasicSession, - public std::enable_shared_from_this { -public: - using IdType = std::size_t; - static constexpr const std::size_t BUFFER_SIZE = 4096; - using SelectorType = std::shared_ptr; -public: - LeastConnectionsHttpSession(boost::asio::ip::tcp::socket client, - boost::asio::ip::tcp::socket server, - SelectorType selector, Backend server_backend); - - LeastConnectionsHttpSession(const LeastConnectionsHttpSession&) = delete; - LeastConnectionsHttpSession& operator=(const LeastConnectionsHttpSession&) = delete; - ~LeastConnectionsHttpSession() noexcept; - - void Run() override; - - // Cancel all unfinished async operartions on boths sockets - void Cancel() override; - - const IdType& Id() const; -protected: - - // Client->Server communication callbacks-chain - void ClientRead(); - void HandleClientRead(boost::system::error_code ec, std::size_t length); - void SendToServer(); - void HandleSendToServer(boost::system::error_code ec, std::size_t length); - - // Server->Client communication callbacks-chain - void ServerRead(); - void HandleServerRead(boost::system::error_code ec, std::size_t length); - void SendToClient(); - void HandleSendToClient(boost::system::error_code ec, std::size_t length); -protected: - static IdType generateId(); -protected: - boost::asio::ip::tcp::socket client_socket; - boost::asio::ip::tcp::socket server_socket; - boost::asio::streambuf cb; - boost::asio::streambuf sb; - IdType id; - boost::beast::http::request cr; - boost::beast::http::response sr; - boost::mutex mutex; - SelectorType lc_selector; - Backend server_backend; -}; - - - -class LeastResponseTimeHttpSession : public BasicSession, - public std::enable_shared_from_this { -public: - using IdType = std::size_t; - static constexpr const std::size_t BUFFER_SIZE = 4096; - using SelectorType = std::shared_ptr; - using TimeType = boost::posix_time::ptime; -public: - LeastResponseTimeHttpSession(boost::asio::ip::tcp::socket client, - boost::asio::ip::tcp::socket server, - SelectorType selector, Backend server_backend); - - LeastResponseTimeHttpSession(const LeastResponseTimeHttpSession&) = delete; - LeastResponseTimeHttpSession& operator=(const LeastResponseTimeHttpSession&) = delete; - ~LeastResponseTimeHttpSession() noexcept; - - void Run() override; - - // Cancel all unfinished async operartions on boths sockets - void Cancel() override; - - const IdType& Id() const; -protected: - - // Client->Server communication callbacks-chain - void ClientRead(); - void HandleClientRead(boost::system::error_code ec, std::size_t length); - void SendToServer(); - void HandleSendToServer(boost::system::error_code ec, std::size_t length); - - // Server->Client communication callbacks-chain - void ServerRead(); - void HandleServerRead(boost::system::error_code ec, std::size_t length); - void SendToClient(); - void HandleSendToClient(boost::system::error_code ec, std::size_t length); -protected: - static IdType generateId(); -protected: - boost::asio::ip::tcp::socket client_socket; - boost::asio::ip::tcp::socket server_socket; - boost::asio::streambuf cb; - boost::asio::streambuf sb; - IdType id; - boost::beast::http::request cr; - boost::beast::http::response sr; - boost::mutex mutex; - SelectorType lrt_selector; - Backend server_backend; - TimeType response_begin; - TimeType response_end; -}; } // namespace tcp