diff --git a/server/server.cpp b/server/server.cpp index cdf014f..3a485ad 100644 --- a/server/server.cpp +++ b/server/server.cpp @@ -31,7 +31,7 @@ int main(int argc, char** argv) { } else { nodes[host.Id] = std::make_shared( loop.Poller(), - host.Id, + std::to_string(host.Id), NNet::TAddress{host.Address, host.Port}, timeSource); } diff --git a/src/server.cpp b/src/server.cpp index 9130e35..bb8638d 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -7,20 +7,6 @@ #include "server.h" #include "messages.h" -namespace { -class TClientNode: public INode { -public: - void Send(const TMessageHolder& mes) override { - Messages.push_back(mes); - } - - void Drain() override { } - - std::vector> Messages; -}; - -}; - TPromise::TTask TWriter::Write(TMessageHolder message) { auto payload = std::move(message.Payload); char* p = (char*)message.Mes; // TODO: const char @@ -105,33 +91,34 @@ NNet::TTestTask TNode::DoDrain() { } } catch (const std::exception& ex) { std::cout << "Error on write: " << ex.what() << "\n"; + Connect(); } Messages.clear(); co_return; } void TNode::Connect() { - if (!Connector || Connector.done()) { + if (Address && (!Connector || Connector.done())) { if (Connector && Connector.done()) { Connector.destroy(); } - Socket = NNet::TSocket(Address, Poller); + Socket = NNet::TSocket(*Address, Poller); Connected = false; Connector = DoConnect(); } } NNet::TTestTask TNode::DoConnect() { - std::cout << "Connecting " << Id << "\n"; + std::cout << "Connecting " << Name << "\n"; while (!Connected) { try { auto deadline = NNet::TClock::now() + std::chrono::milliseconds(100); // TODO: broken timeout in coroio co_await Socket.Connect(deadline); - std::cout << "Connected " << Id << "\n"; + std::cout << "Connected " << Name << "\n"; Connected = true; } catch (const std::exception& ex) { - std::cout << "Error on connect: " << Id << " " << ex.what() << "\n"; + std::cout << "Error on connect: " << Name << " " << ex.what() << "\n"; } if (!Connected) { co_await Poller.Sleep(std::chrono::milliseconds(1000)); @@ -142,18 +129,14 @@ NNet::TTestTask TNode::DoConnect() { NNet::TSimpleTask TRaftServer::InboundConnection(NNet::TSocket socket) { try { - auto client = std::make_shared(); + auto client = std::make_shared( + Poller, "client", std::move(socket), TimeSource + ); Nodes.insert(client); while (true) { - auto mes = co_await TReader(socket).Read(); + auto mes = co_await TReader(client->Sock()).Read(); std::cout << "Got message " << mes->Type << "\n"; Raft->Process(std::move(mes), client); - if (!client->Messages.empty()) { - auto tosend = std::move(client->Messages); client->Messages.clear(); - for (auto&& mes : tosend) { - co_await TWriter(socket).Write(std::move(mes)); - } - } DrainNodes(); } } catch (const std::exception & ex) { diff --git a/src/server.h b/src/server.h index 1794b8e..401f79f 100644 --- a/src/server.h +++ b/src/server.h @@ -155,15 +155,26 @@ struct THost { class TNode: public INode { public: - TNode(NNet::TPoll& poller, uint32_t id, NNet::TAddress address, const std::shared_ptr& ts) + TNode(NNet::TPoll& poller, const std::string& name, NNet::TAddress address, const std::shared_ptr& ts) : Poller(poller) - , Id(id) + , Name(name) , Address(address) , TimeSource(ts) { } + TNode(NNet::TPoll& poller, const std::string& name, NNet::TSocket socket, const std::shared_ptr& ts) + : Poller(poller) + , Name(name) + , Socket(std::move(socket)) + , Connected(true) + , TimeSource(ts) + { } + void Send(const TMessageHolder& message) override; void Drain() override; + NNet::TSocket& Sock() { + return Socket; + } private: void Connect(); @@ -172,8 +183,8 @@ class TNode: public INode { NNet::TTestTask DoConnect(); NNet::TPoll& Poller; - uint32_t Id; - NNet::TAddress Address; + std::string Name; + std::optional Address; std::shared_ptr TimeSource; NNet::TPoll::TSocket Socket; bool Connected = false;