diff --git a/src/server.cpp b/src/server.cpp index 4f00ffa..caf54cc 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -55,6 +55,31 @@ TPromise>::TTask TReader::Read() { co_return mes; } +void TNode::Send(const TMessageHolder& message) { + Messages.emplace_back(message); +} + +void TNode::Drain() { + if (!Drainer || Drainer.done()) { + if (Drainer && Drainer.done()) { + Drainer.destroy(); + } + Drainer = DoDrain(); + } +} + +NNet::TTestTask TNode::DoDrain() { + if (!Connected) { + co_return; + } + auto tosend = std::move(Messages); + for (auto&& m : tosend) { + co_await TWriter(Socket).Write(std::move(m)); + } + Messages.clear(); + co_return; +} + NNet::TSimpleTask TRaftServer::InboundConnection(NNet::TSocket socket) { try { while (true) { diff --git a/src/server.h b/src/server.h index da32bb0..7c0ba66 100644 --- a/src/server.h +++ b/src/server.h @@ -5,6 +5,7 @@ #include +#include "poll.hpp" #include "timesource.h" #include "messages.h" #include "raft.h" @@ -118,9 +119,15 @@ class TNode: public INode { void Drain() override; private: + NNet::TTestTask DoDrain(); + NNet::TPoll& Poller; uint32_t Id; NNet::TAddress Address; + NNet::TPoll::TSocket Socket; + bool Connected = false; + + std::coroutine_handle<> Drainer; std::vector> Messages; };