Skip to content

Commit

Permalink
Fix: http issues
Browse files Browse the repository at this point in the history
  • Loading branch information
bersen66 committed May 9, 2024
1 parent a746e01 commit 25bf506
Show file tree
Hide file tree
Showing 6 changed files with 203 additions and 485 deletions.
3 changes: 2 additions & 1 deletion src/lb/formatters.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,5 @@ namespace YAML {class Node;}
template <> struct fmt::formatter<boost::stacktrace::stacktrace> : ostream_formatter{};
template <> struct fmt::formatter<YAML::Node> : ostream_formatter{};
template <> struct fmt::formatter<lb::tcp::Backend> : ostream_formatter{};
template <> struct fmt::formatter<boost::beast::http::request<boost::beast::http::string_body>> : ostream_formatter{};
template <> struct fmt::formatter<boost::beast::http::request<boost::beast::http::string_body>> : ostream_formatter{};
template <> struct fmt::formatter<boost::beast::http::response<boost::beast::http::string_body>> : ostream_formatter{};
100 changes: 82 additions & 18 deletions src/lb/tcp/connector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
#include <lb/tcp/connector.hpp>
#include <lb/tcp/session.hpp>

using SocketType = lb::tcp::HttpSession::SocketType;

namespace lb::tcp {

Connector::Connector(boost::asio::io_context& ctx, SelectorPtr selector)
Expand All @@ -10,38 +12,100 @@ Connector::Connector(boost::asio::io_context& ctx, SelectorPtr selector)
, selector(selector)
{}

SessionPtr MakeSession(SelectorPtr& selector, boost::asio::ip::tcp::socket client_socket, boost::asio::ip::tcp::socket server_socket, Backend backend)

class LeastConnectionsCallbacks : public StateNotifier
{
public:
LeastConnectionsCallbacks(Backend backend, SelectorPtr selector)
: StateNotifier()
, backend(std::move(backend))
, selector(std::dynamic_pointer_cast<LeastConnectionsSelector>(selector))
{}

void OnConnect() override
{
selector->IncreaseConnectionCount(backend);
}

void OnDisconnect() override
{
selector->DecreaseConnectionCount(backend);
}

using StateNotifier::StateNotifier;
private:
Backend backend;
std::shared_ptr<LeastConnectionsSelector> selector;
};


class LeastResponseTimeCallbacks : public StateNotifier
{
public:
using TimeType = decltype(std::chrono::high_resolution_clock::now());
public:
LeastResponseTimeCallbacks(Backend backend, SelectorPtr selector)
: StateNotifier()
, backend(std::move(backend))
, selector(std::dynamic_pointer_cast<LeastResponseTimeSelector>(selector))
{}

void OnRequestSent() override
{
response_begin = std::chrono::high_resolution_clock::now();
}

switch(selector->Type()) {
case SelectorType::LEAST_CONNECTIONS: {
std::shared_ptr<LeastConnectionsSelector> lc_selector = std::dynamic_pointer_cast<LeastConnectionsSelector>(selector);
return std::make_shared<LeastConnectionsHttpSession>(std::move(client_socket), std::move(server_socket), lc_selector, backend);
} break;
case SelectorType::LEAST_RESPONSE_TIME: {
std::shared_ptr<LeastResponseTimeSelector> lrt_selector = std::dynamic_pointer_cast<LeastResponseTimeSelector>(selector);
return std::make_shared<LeastResponseTimeHttpSession>(std::move(client_socket), std::move(server_socket), lrt_selector, backend);
} break;
default: {
return std::make_shared<HttpSession>(std::move(client_socket), std::move(server_socket));
}
void OnResponseReceive() override
{
response_end = std::chrono::high_resolution_clock::now();
std::chrono::duration<long, std::nano> duration = response_end - response_begin;
selector->AddResponseTime(backend, duration.count());
}

using StateNotifier::StateNotifier;
private:
Backend backend;
std::shared_ptr<LeastResponseTimeSelector> selector;
TimeType response_begin;
TimeType response_end;
};

SessionPtr MakeSession(SelectorPtr& selector,
SocketType client_socket,
SocketType server_socket,
Backend backend)
{
switch (selector->Type()) {
case SelectorType::LEAST_CONNECTIONS:
{
return std::make_shared<HttpSession>(std::move(client_socket), std::move(server_socket),
std::make_unique<LeastConnectionsCallbacks>(std::move(backend), selector));
} break;

case SelectorType::LEAST_RESPONSE_TIME:
{
return std::make_shared<HttpSession>(std::move(client_socket), std::move(server_socket),
std::make_unique<LeastResponseTimeCallbacks>(std::move(backend), selector));
} break;
default:
return std::make_shared<HttpSession>(std::move(client_socket), std::move(server_socket));
}
}


void Connector::MakeAndRunSession(boost::asio::ip::tcp::socket client_socket)
void Connector::MakeAndRunSession(SocketType client_socket)
{
// TODO: selection of backend
DEBUG("In connector");
Backend backend = selector->SelectBackend(client_socket.remote_endpoint());

if (backend.IsIpEndpoint()) {
DEBUG("Is ip endpoint");
auto server_socket = std::make_shared<boost::asio::ip::tcp::socket>(client_socket.get_executor());
auto server_socket = std::make_shared<SocketType>(client_socket.get_executor());

server_socket->async_connect(
backend.AsEndpoint(),
[this, server_socket, client_socket=std::move(client_socket), backend=std::move(backend)] (const boost::system::error_code& error) mutable
[this, server_socket, client_socket=std::move(client_socket), backend=std::move(backend)]
(const boost::system::error_code& error) mutable
{
if (error) {
ERROR("{}", error.message());
Expand All @@ -56,7 +120,7 @@ void Connector::MakeAndRunSession(boost::asio::ip::tcp::socket client_socket)
});
} else if (backend.IsUrl()) {
DEBUG("Is url");
auto server_socket = std::make_shared<boost::asio::ip::tcp::socket>(client_socket.get_executor());
auto server_socket = std::make_shared<SocketType>(client_socket.get_executor());

const auto& url = backend.AsUrl();
DEBUG("URL: hostname: {}, port: {}", url.Hostname(), url.Port());
Expand Down
3 changes: 1 addition & 2 deletions src/lb/tcp/connector.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@ class Connector {
Connector& operator=(Connector&&) = delete;

void MakeAndRunSession(boost::asio::ip::tcp::socket client);
private:
void HandleAsyncResolve(const boost::system::error_code& ec, ResolverResults results, boost::asio::ip::tcp::socket client_socket);

private:
boost::asio::io_context& ioc;
boost::asio::ip::tcp::resolver resolver;
Expand Down
12 changes: 11 additions & 1 deletion src/lb/tcp/selectors.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ SelectorPtr DetectSelector(const YAML::Node& node)
// ============================ RoundRobinSelector ============================
void RoundRobinSelector::Configure(const YAML::Node &balancing_node)
{
INFO("Configuring RoundRobinSelector");
if (!balancing_node["endpoints"].IsDefined()) {
STACKTRACE("Round-robin endpoints node is missed");
}
Expand Down Expand Up @@ -219,6 +220,7 @@ std::size_t ReadWeight(const YAML::Node& ep)

void WeightedRoundRobinSelector::Configure(const YAML::Node &balancing_node)
{
INFO("Configuring WeightedRoundRobinSelector");
if (!balancing_node["endpoints"].IsDefined()) {
STACKTRACE("Weighted-round-robin endpoints node is missed");
}
Expand Down Expand Up @@ -314,6 +316,7 @@ void WeightedRoundRobinSelector::AdvanceCounter()

void IpHashSelector::Configure(const YAML::Node& balancing_node)
{
INFO("Configuring IpHashSelector");
if (!balancing_node["endpoints"].IsDefined()) {
STACKTRACE("Ip-hash endpoints node is missed");
}
Expand Down Expand Up @@ -375,7 +378,9 @@ SelectorType IpHashSelector::Type() const

BackendCHTraits::HashType BackendCHTraits::GetHash(const Backend& backend)
{
return std::hash<std::string>{}(backend.ToString());
std::size_t res = std::hash<std::string>{}(backend.ToString());
DEBUG("Hash: {}", res);
return res;
}

std::vector<BackendCHTraits::HashType>
Expand All @@ -398,6 +403,7 @@ ConsistentHashSelector::ConsistentHashSelector(std::size_t spawn_replicas)

void ConsistentHashSelector::Configure(const YAML::Node &balancing_node)
{
INFO("Configuring ConsistentHashSelector");
if (!balancing_node["endpoints"].IsDefined()) {
STACKTRACE("Consistent hash endpoints node is missed");
}
Expand Down Expand Up @@ -429,13 +435,15 @@ Backend ConsistentHashSelector::SelectBackend(const boost::asio::ip::tcp::endpoi
{
boost::mutex::scoped_lock lock(mutex_);
Backend result = ring_.SelectNode(client);
DEBUG("Selected: {}", result);
return result;
}

void ConsistentHashSelector::ExcludeBackend(const Backend& backend)
{
boost::mutex::scoped_lock lock(mutex_);
ring_.EraseNode(backend);
DEBUG("Excluded: {}", backend);
if (ring_.Empty()) {
EXCEPTION("All backends are excluded!");
}
Expand All @@ -449,6 +457,7 @@ SelectorType ConsistentHashSelector::Type() const
// ============================ LeastConnectionsSelector ============================

void LeastConnectionsSelector::Configure(const YAML::Node& config) {
INFO("Configuring LeastConnectionsSelector");
if (!config["endpoints"].IsDefined()) {
STACKTRACE("Least connections endpoints node is missed");
}
Expand Down Expand Up @@ -546,6 +555,7 @@ void LeastConnectionsSelector::DecreaseConnectionCount(const Backend& backend)

void LeastResponseTimeSelector::Configure(const YAML::Node& config)
{
INFO("Configuring LeastResponseTimeSelector");
if (!config["endpoints"].IsDefined()) {
STACKTRACE("Least response time endpoints node is missed");
}
Expand Down
Loading

0 comments on commit 25bf506

Please sign in to comment.