Skip to content

Commit

Permalink
Fix: fixed some bugs, added functional and load-tests
Browse files Browse the repository at this point in the history
  • Loading branch information
bersen66 committed May 9, 2024
1 parent 25bf506 commit ff64c6c
Show file tree
Hide file tree
Showing 9 changed files with 313 additions and 17 deletions.
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,6 @@

build
CMakeUserPresets.json
.vscode
.vscode
tests/functional/__pycache__
Testing
22 changes: 11 additions & 11 deletions configs/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,7 @@ load_balancing:


# Example config of least connections
algorithm: least_connections
endpoints:
- ip: "127.0.0.1"
port: 8081
- ip: "127.0.0.2"
port: 8082
- ip: "127.0.0.3"
port: 8083

# Example config of least response time
# algorithm: least_response_time
# algorithm: least_connections
# endpoints:
# - ip: "127.0.0.1"
# port: 8081
Expand All @@ -45,6 +35,16 @@ load_balancing:
# - ip: "127.0.0.3"
# port: 8083

# Example config of least response time
algorithm: least_response_time
endpoints:
- ip: "127.0.0.1"
port: 8081
- ip: "127.0.0.2"
port: 8082
- ip: "127.0.0.3"
port: 8083

# Example config of round robin
# algorithm: round_robin
# endpoints:
Expand Down
2 changes: 1 addition & 1 deletion lbbuild.sh
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ function BuildApp() {
conan install . --output-folder=build --build=missing --settings=build_type=$BUILD_TYPE
cd build
cmake .. -DCMAKE_BUILD_TYPE=$BUILD_TYPE -DCMAKE_TOOLCHAIN_FILE=conan_toolchain.cmake
cmake --build . -j 4
cmake --build . -j 3
}

if ! command -v conan &> /dev/null
Expand Down
5 changes: 3 additions & 2 deletions src/lb/tcp/selectors.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -378,8 +378,9 @@ SelectorType IpHashSelector::Type() const

BackendCHTraits::HashType BackendCHTraits::GetHash(const Backend& backend)
{
std::size_t res = std::hash<std::string>{}(backend.ToString());
DEBUG("Hash: {}", res);

static std::hash<std::string> hash{};
std::size_t res = hash(backend.ToString());
return res;
}

Expand Down
20 changes: 20 additions & 0 deletions tests/functional/locustfile.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# import aiohttp
# import asyncio


# async def main():
# while True:
# async with aiohttp.ClientSession() as session:
# response = await session.get("http://127.0.0.1:8081")
# content = await response.text()
# print(content)


# if __name__ == "__main__":
# asyncio.run(main())
from locust import HttpUser, task

class LoadTest(HttpUser):
@task
def index(self):
self.client.get("/")
74 changes: 74 additions & 0 deletions tests/functional/rps_counter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@

import sys
import asyncio
import aiohttp
from aiohttp import web
import random
import os

choices = {
1: "Rock",
2: "Paper",
3: "Scissors",
4: "Pen",
5: "Fire",
6: "Water",
7: "Lemonade"
}

def get_random_item():
choice = random.randint(1, len(choices))
return choices.get(choice)

async def sleep_random_interval():
processing_time = random.uniform(0, 1)
await asyncio.sleep(processing_time)

class RpsCounterMockServer:
def __init__(self, port = 8080, ip = '0.0.0.0'):
self.rps = 0
self.ip = ip
self.port = port
self.event_loop = asyncio.get_event_loop()
self.web_app = aiohttp.web.Application()

async def print_rps(self):
while True:
await asyncio.sleep(1)
os.system('clear')
print(f"ip={self.ip}:{self.port}\nrps: {self.rps}")
self.rps = 0

async def handle_get(self, request):
self.rps += 1
random_item = get_random_item()
await sleep_random_interval()
return aiohttp.web.Response(text=random_item)

async def start_server(self):
runner = aiohttp.web.AppRunner(self.web_app)
self.web_app.router.add_get('/', self.handle_get)
await runner.setup()
site = aiohttp.web.TCPSite(runner, self.ip, self.port)
await site.start()

def run(self):
self.event_loop.create_task(self.start_server())
self.event_loop.create_task(self.print_rps())
self.event_loop.run_forever()

def main():
if len(sys.argv) < 2:
print("Missed tcp-port number")
sys.exit(1)

port = int(sys.argv[1])
server = RpsCounterMockServer(port=port)
try:
server.run()
except KeyboardInterrupt:
print('Terminated by SIGINT')


if __name__ == "__main__":
main()
169 changes: 169 additions & 0 deletions tests/learn_asio.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -305,4 +305,173 @@ TEST(ConnServer, run)
ConnectionServer::Serve(ioc, acceptor, ep);
ioc.run();
}
#include <boost/beast.hpp>
namespace ReverseProxy {
class HttpSession : public std::enable_shared_from_this<HttpSession> {
public:
using SocketType = boost::asio::ip::tcp::socket;
using EndpointType = boost::asio::ip::tcp::endpoint;
using TcpStream = boost::beast::tcp_stream;
using BufferType = boost::beast::flat_buffer;
using RequestType = boost::beast::http::request<boost::beast::http::string_body>;
using ResponseType = boost::beast::http::response<boost::beast::http::string_body>;
public:
HttpSession(SocketType client, SocketType server)
: client_stream_(std::move(client)),
server_stream_(std::move(server))
{}
void Run() {
ClientRead();
}
private:
void ClientRead() {
namespace http = boost::beast::http;
client_buffer_.clear();
server_buffer_.clear();
client_request_.clear();
server_response_.clear();
http::async_read(
client_stream_,
client_buffer_,
client_request_,
[self=shared_from_this()](boost::system::error_code ec, std::size_t length){
self->HandleClientRead(ec, length);
}
);
}
void HandleClientRead(boost::system::error_code ec, std::size_t length) {
if (ec) {
std::cout << ec.message() << std::endl;
return;
}
std::cout << "CLIENT MESSAGE: " << client_request_.target() << std::endl;
ServerWrite();
}
void ServerWrite() {
namespace http = boost::beast::http;
http::async_write(
server_stream_,
client_request_,
[self=shared_from_this()](boost::system::error_code ec, std::size_t length){
self->HandleServerWrite(ec, length);
}
);
}
void HandleServerWrite(boost::system::error_code ec, std::size_t length) {
if (ec) {
std::cout << ec.message() << std::endl;
return;
}
ServerRead();
}
void ServerRead() {
namespace http = boost::beast::http;
http::async_read(
server_stream_,
server_buffer_,
server_response_,
[self=shared_from_this()](boost::system::error_code ec, std::size_t length){
self->HandleServerRead(ec, length);
}
);
}
void HandleServerRead(boost::system::error_code ec, std::size_t length) {
if (ec) {
std::cout << ec.message() << std::endl;
return;
}
std::cout << "SERVER MESSAGE: " << server_response_ << std::endl;
ClientWrite();
}
void ClientWrite() {
namespace http = boost::beast::http;
http::async_write(
client_stream_,
server_response_,
[self=shared_from_this()](boost::system::error_code ec, std::size_t length){
self->HandleClientWrite(ec, length);
}
);
}
void HandleClientWrite(boost::system::error_code ec, std::size_t length) {
if (ec) {
std::cout << ec.message() << std::endl;
return;
}
ClientRead();
}
private:
TcpStream client_stream_;
TcpStream server_stream_;
BufferType client_buffer_;
BufferType server_buffer_;
RequestType client_request_;
ResponseType server_response_;
};
void HandleAccept(boost::asio::io_context& ioc,
std::shared_ptr<boost::asio::ip::tcp::socket> client_socket,
boost::asio::ip::tcp::endpoint server_endpoint)
{
auto server_socket = std::make_shared<boost::asio::ip::tcp::socket>(ioc);
server_socket->async_connect(server_endpoint, [server_socket, client_socket](const boost::system::error_code& error){
if (error) {
std::cout << __LINE__ << " Error: " << error.message() << std::endl;
return;
}
auto connection = std::make_shared<HttpSession>(std::move(*client_socket), std::move(*server_socket));
connection->Run();
});
}
void Serve(boost::asio::io_context& executor,
boost::asio::ip::tcp::acceptor& acceptor,
const boost::asio::ip::tcp::endpoint& endpoint)
{
acceptor.async_accept(
[&] (boost::system::error_code ec, boost::asio::ip::tcp::socket socket) {
if (ec) {
std::cout << ec.message() << std::endl;
return;
}
std::cout << __LINE__ << " Accepted" << std::endl;
Serve(executor, acceptor, endpoint);
auto client_socket = std::make_shared<boost::asio::ip::tcp::socket>(std::move(socket));
HandleAccept(executor, client_socket, endpoint);
}
);
}
} // namespace ReverseProxy
TEST(HttpProxy, run) {
std::cerr << "START\n";
boost::asio::io_context ioc;
boost::asio::ip::tcp::acceptor acceptor(ioc, boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), 9090));
boost::asio::ip::tcp::endpoint ep(boost::asio::ip::address::from_string("127.0.0.1"), 8081);
ReverseProxy::Serve(ioc, acceptor, ep);
ioc.run();
}
#endif
27 changes: 27 additions & 0 deletions tests/test_selectors.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -317,4 +317,31 @@ R"(algorithm: least_connections

ASSERT_EQ(selector.SelectBackend(notused), b3);
ASSERT_EQ(selector.SelectBackend(notused), b3);
}


TEST(ConsistentHash, basicUsage)
{
YAML::Node selector_config = YAML::Load(
R"(algorithm: consistent_hash
endpoints:
- ip: 127.0.0.1
port: 8081
- ip: 127.0.0.2
port: 8082
- ip: 127.0.0.3
port: 8083
)");

lb::tcp::ConsistentHashSelector selector;
selector.Configure(selector_config);
boost::asio::ip::tcp::endpoint notused(boost::asio::ip::address::from_string("127.0.0.1"), 8080);
auto b1 = lb::tcp::Backend("127.0.0.1", 8081);
auto b2 = lb::tcp::Backend("127.0.0.2", 8082);
auto b3 = lb::tcp::Backend("127.0.0.3", 8083);

selector.ExcludeBackend(b3);
selector.ExcludeBackend(b2);
ASSERT_THROW(selector.ExcludeBackend(b1), std::runtime_error); // No backend left

}
Loading

0 comments on commit ff64c6c

Please sign in to comment.