2
2
#include < lb/tcp/connector.hpp>
3
3
#include < lb/tcp/session.hpp>
4
4
5
+ using SocketType = lb::tcp::HttpSession::SocketType;
6
+
5
7
namespace lb ::tcp {
6
8
7
9
Connector::Connector (boost::asio::io_context& ctx, SelectorPtr selector)
@@ -10,38 +12,100 @@ Connector::Connector(boost::asio::io_context& ctx, SelectorPtr selector)
10
12
, selector(selector)
11
13
{}
12
14
13
- SessionPtr MakeSession (SelectorPtr& selector, boost::asio::ip::tcp::socket client_socket, boost::asio::ip::tcp::socket server_socket, Backend backend)
15
+
16
+ class LeastConnectionsCallbacks : public StateNotifier
17
+ {
18
+ public:
19
+ LeastConnectionsCallbacks (Backend backend, SelectorPtr selector)
20
+ : StateNotifier()
21
+ , backend(std::move(backend))
22
+ , selector(std::dynamic_pointer_cast<LeastConnectionsSelector>(selector))
23
+ {}
24
+
25
+ void OnConnect () override
26
+ {
27
+ selector->IncreaseConnectionCount (backend);
28
+ }
29
+
30
+ void OnDisconnect () override
31
+ {
32
+ selector->DecreaseConnectionCount (backend);
33
+ }
34
+
35
+ using StateNotifier::StateNotifier;
36
+ private:
37
+ Backend backend;
38
+ std::shared_ptr<LeastConnectionsSelector> selector;
39
+ };
40
+
41
+
42
+ class LeastResponseTimeCallbacks : public StateNotifier
14
43
{
44
+ public:
45
+ using TimeType = decltype (std::chrono::high_resolution_clock::now());
46
+ public:
47
+ LeastResponseTimeCallbacks (Backend backend, SelectorPtr selector)
48
+ : StateNotifier()
49
+ , backend(std::move(backend))
50
+ , selector(std::dynamic_pointer_cast<LeastResponseTimeSelector>(selector))
51
+ {}
52
+
53
+ void OnRequestSent () override
54
+ {
55
+ response_begin = std::chrono::high_resolution_clock::now ();
56
+ }
15
57
16
- switch (selector->Type ()) {
17
- case SelectorType::LEAST_CONNECTIONS: {
18
- std::shared_ptr<LeastConnectionsSelector> lc_selector = std::dynamic_pointer_cast<LeastConnectionsSelector>(selector);
19
- return std::make_shared<LeastConnectionsHttpSession>(std::move (client_socket), std::move (server_socket), lc_selector, backend);
20
- } break ;
21
- case SelectorType::LEAST_RESPONSE_TIME: {
22
- std::shared_ptr<LeastResponseTimeSelector> lrt_selector = std::dynamic_pointer_cast<LeastResponseTimeSelector>(selector);
23
- return std::make_shared<LeastResponseTimeHttpSession>(std::move (client_socket), std::move (server_socket), lrt_selector, backend);
24
- } break ;
25
- default : {
26
- return std::make_shared<HttpSession>(std::move (client_socket), std::move (server_socket));
27
- }
58
+ void OnResponseReceive () override
59
+ {
60
+ response_end = std::chrono::high_resolution_clock::now ();
61
+ std::chrono::duration<long , std::nano> duration = response_end - response_begin;
62
+ selector->AddResponseTime (backend, duration.count ());
63
+ }
64
+
65
+ using StateNotifier::StateNotifier;
66
+ private:
67
+ Backend backend;
68
+ std::shared_ptr<LeastResponseTimeSelector> selector;
69
+ TimeType response_begin;
70
+ TimeType response_end;
71
+ };
72
+
73
+ SessionPtr MakeSession (SelectorPtr& selector,
74
+ SocketType client_socket,
75
+ SocketType server_socket,
76
+ Backend backend)
77
+ {
78
+ switch (selector->Type ()) {
79
+ case SelectorType::LEAST_CONNECTIONS:
80
+ {
81
+ return std::make_shared<HttpSession>(std::move (client_socket), std::move (server_socket),
82
+ std::make_unique<LeastConnectionsCallbacks>(std::move (backend), selector));
83
+ } break ;
84
+
85
+ case SelectorType::LEAST_RESPONSE_TIME:
86
+ {
87
+ return std::make_shared<HttpSession>(std::move (client_socket), std::move (server_socket),
88
+ std::make_unique<LeastResponseTimeCallbacks>(std::move (backend), selector));
89
+ } break ;
90
+ default :
91
+ return std::make_shared<HttpSession>(std::move (client_socket), std::move (server_socket));
28
92
}
29
93
}
30
94
31
95
32
- void Connector::MakeAndRunSession (boost::asio::ip::tcp::socket client_socket)
96
+ void Connector::MakeAndRunSession (SocketType client_socket)
33
97
{
34
- // TODO: selection of backend
35
98
DEBUG (" In connector" );
36
99
Backend backend = selector->SelectBackend (client_socket.remote_endpoint ());
37
100
38
101
if (backend.IsIpEndpoint ()) {
39
102
DEBUG (" Is ip endpoint" );
40
- auto server_socket = std::make_shared<boost::asio::ip::tcp::socket >(client_socket.get_executor ());
103
+ auto server_socket = std::make_shared<SocketType >(client_socket.get_executor ());
41
104
42
105
server_socket->async_connect (
43
106
backend.AsEndpoint (),
44
- [this , server_socket, client_socket=std::move (client_socket), backend=std::move (backend)] (const boost::system::error_code& error) mutable
107
+ [this , server_socket, client_socket=std::move (client_socket), backend=std::move (backend)]
108
+ (const boost::system::error_code& error) mutable
45
109
{
46
110
if (error) {
47
111
ERROR (" {}" , error.message ());
@@ -56,7 +120,7 @@ void Connector::MakeAndRunSession(boost::asio::ip::tcp::socket client_socket)
56
120
});
57
121
} else if (backend.IsUrl ()) {
58
122
DEBUG (" Is url" );
59
- auto server_socket = std::make_shared<boost::asio::ip::tcp::socket >(client_socket.get_executor ());
123
+ auto server_socket = std::make_shared<SocketType >(client_socket.get_executor ());
60
124
61
125
const auto & url = backend.AsUrl ();
62
126
DEBUG (" URL: hostname: {}, port: {}" , url.Hostname (), url.Port ());
0 commit comments