From 7e07f638bc3fcb71d85dc16dda886c8b6bfe8180 Mon Sep 17 00:00:00 2001 From: Alexey Ozeritskiy Date: Sun, 26 Nov 2023 12:35:11 +0300 Subject: [PATCH] Add client --- CMakeLists.txt | 2 ++ server/server.cpp | 24 ------------------------ src/messages.h | 7 +++++++ src/server.cpp | 24 +++++++++++++++++++++++- src/server.h | 26 ++++++++++++++++++++++++++ 5 files changed, 58 insertions(+), 25 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index aac5e79..f50dfad 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -20,10 +20,12 @@ add_library(miniraft add_executable(test_raft test/test_raft.cpp) add_executable(test_read_write test/test_read_write.cpp) add_executable(server server/server.cpp) +add_executable(client client/client.cpp) include_directories(${CMAKE_SOURCE_DIR}/src ${CMAKE_SOURCE_DIR}/coroio/src) target_link_libraries(server miniraft) +target_link_libraries(client miniraft) target_include_directories(test_raft PRIVATE ${CMOCKA_INCLUDE_DIRS}) target_link_directories(test_raft PRIVATE ${CMOCKA_LIBRARY_DIRS}) diff --git a/server/server.cpp b/server/server.cpp index d1ac556..cdf014f 100644 --- a/server/server.cpp +++ b/server/server.cpp @@ -1,34 +1,10 @@ #include "socket.hpp" -#include #include #include #include #include #include -struct THost { - std::string Address; - int Port = 0; - uint32_t Id = 0; - - THost() { } - - THost(const std::string& str) { - std::string_view s(str); - auto p = s.find(':'); - Address = s.substr(0, p); - s = s.substr(p + 1); - p = s.find(':'); - std::from_chars(s.begin(), s.begin()+p, Port); - s = s.substr(p + 1); - std::from_chars(s.begin(), s.begin()+p, Id); - - std::cout << "Addr: '" << Address << "'\n"; - std::cout << "Port: " << Port << "\n"; - std::cout << "Id: " << Id << "\n"; - } -}; - int main(int argc, char** argv) { signal(SIGPIPE, SIG_IGN); std::vector hosts; diff --git a/src/messages.h b/src/messages.h index 1e24c49..c638a52 100644 --- a/src/messages.h +++ b/src/messages.h @@ -3,6 +3,7 @@ #include #include +#include #include #include @@ -193,6 +194,12 @@ TMessageHolder NewHoldedMessage() { return NewHoldedMessage(static_cast(T::MessageType), sizeof(T)); } +template +TMessageHolder NewHoldedMessage(uint32_t size) { + assert(size >= sizeof(T)); + return NewHoldedMessage(static_cast(T::MessageType), size); +} + inline TMessageHolder NewTimeout() { return NewHoldedMessage(static_cast(EMessageType::TIMEOUT), sizeof(TTimeout)); } diff --git a/src/server.cpp b/src/server.cpp index b9f1ea8..e39de77 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -3,9 +3,24 @@ #include #include #include +#include #include "server.h" #include "messages.h" +namespace { +class TClientNode: public INode { +public: + void Send(const TMessageHolder& mes) override { + Messages.push_back(mes); + } + + void Drain() override { } + + std::vector> Messages; +}; + +}; + TPromise::TTask TWriter::Write(TMessageHolder message) { auto payload = std::move(message.Payload); char* p = (char*)message.Mes; // TODO: const char @@ -128,10 +143,17 @@ NNet::TTestTask TNode::DoConnect() { NNet::TSimpleTask TRaftServer::InboundConnection(NNet::TSocket socket) { try { + TClientNode client; while (true) { auto mes = co_await TReader(socket).Read(); std::cout << "Got message " << mes->Type << "\n"; - Raft->Process(std::move(mes)); + Raft->Process(std::move(mes), &client); + if (!client.Messages.empty()) { + auto tosend = std::move(client.Messages); client.Messages.clear(); + for (auto&& mes : tosend) { + co_await TWriter(socket).Write(std::move(mes)); + } + } DrainNodes(); } } catch (const std::exception & ex) { diff --git a/src/server.h b/src/server.h index d3cc003..aa1698e 100644 --- a/src/server.h +++ b/src/server.h @@ -3,6 +3,7 @@ #include #include #include +#include #include @@ -126,6 +127,31 @@ class TWriter { NNet::TPoll::TSocket& Socket; }; +struct THost { + std::string Address; + int Port = 0; + uint32_t Id = 0; + + THost() { } + + THost(const std::string& str) { + std::string_view s(str); + auto p = s.find(':'); + Address = s.substr(0, p); + s = s.substr(p + 1); + p = s.find(':'); + std::from_chars(s.begin(), s.begin()+p, Port); + s = s.substr(p + 1); + std::from_chars(s.begin(), s.begin()+p, Id); + } + + void DebugPrint() const { + std::cout << "Addr: '" << Address << "'\n"; + std::cout << "Port: " << Port << "\n"; + std::cout << "Id: " << Id << "\n"; + } +}; + class TNode: public INode { public: TNode(NNet::TPoll& poller, uint32_t id, NNet::TAddress address, const std::shared_ptr& ts)