Skip to content

Commit

Permalink
Unify node and client node
Browse files Browse the repository at this point in the history
  • Loading branch information
resetius committed Nov 28, 2023
1 parent 8f473df commit 9315f24
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 32 deletions.
2 changes: 1 addition & 1 deletion server/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ int main(int argc, char** argv) {
} else {
nodes[host.Id] = std::make_shared<TNode>(
loop.Poller(),
host.Id,
std::to_string(host.Id),
NNet::TAddress{host.Address, host.Port},
timeSource);
}
Expand Down
37 changes: 10 additions & 27 deletions src/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,6 @@
#include "server.h"
#include "messages.h"

namespace {
class TClientNode: public INode {
public:
void Send(const TMessageHolder<TMessage>& mes) override {
Messages.push_back(mes);
}

void Drain() override { }

std::vector<TMessageHolder<TMessage>> Messages;
};

};

TPromise<void>::TTask TWriter::Write(TMessageHolder<TMessage> message) {
auto payload = std::move(message.Payload);
char* p = (char*)message.Mes; // TODO: const char
Expand Down Expand Up @@ -105,33 +91,34 @@ NNet::TTestTask TNode::DoDrain() {
}
} catch (const std::exception& ex) {
std::cout << "Error on write: " << ex.what() << "\n";
Connect();
}
Messages.clear();
co_return;
}

void TNode::Connect() {
if (!Connector || Connector.done()) {
if (Address && (!Connector || Connector.done())) {
if (Connector && Connector.done()) {
Connector.destroy();
}

Socket = NNet::TSocket(Address, Poller);
Socket = NNet::TSocket(*Address, Poller);
Connected = false;
Connector = DoConnect();
}
}

NNet::TTestTask TNode::DoConnect() {
std::cout << "Connecting " << Id << "\n";
std::cout << "Connecting " << Name << "\n";
while (!Connected) {
try {
auto deadline = NNet::TClock::now() + std::chrono::milliseconds(100); // TODO: broken timeout in coroio
co_await Socket.Connect(deadline);
std::cout << "Connected " << Id << "\n";
std::cout << "Connected " << Name << "\n";
Connected = true;
} catch (const std::exception& ex) {
std::cout << "Error on connect: " << Id << " " << ex.what() << "\n";
std::cout << "Error on connect: " << Name << " " << ex.what() << "\n";
}
if (!Connected) {
co_await Poller.Sleep(std::chrono::milliseconds(1000));
Expand All @@ -142,18 +129,14 @@ NNet::TTestTask TNode::DoConnect() {

NNet::TSimpleTask TRaftServer::InboundConnection(NNet::TSocket socket) {
try {
auto client = std::make_shared<TClientNode>();
auto client = std::make_shared<TNode>(
Poller, "client", std::move(socket), TimeSource
);
Nodes.insert(client);
while (true) {
auto mes = co_await TReader(socket).Read();
auto mes = co_await TReader(client->Sock()).Read();
std::cout << "Got message " << mes->Type << "\n";
Raft->Process(std::move(mes), client);
if (!client->Messages.empty()) {
auto tosend = std::move(client->Messages); client->Messages.clear();
for (auto&& mes : tosend) {
co_await TWriter(socket).Write(std::move(mes));
}
}
DrainNodes();
}
} catch (const std::exception & ex) {
Expand Down
19 changes: 15 additions & 4 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -155,15 +155,26 @@ struct THost {

class TNode: public INode {
public:
TNode(NNet::TPoll& poller, uint32_t id, NNet::TAddress address, const std::shared_ptr<ITimeSource>& ts)
TNode(NNet::TPoll& poller, const std::string& name, NNet::TAddress address, const std::shared_ptr<ITimeSource>& ts)
: Poller(poller)
, Id(id)
, Name(name)
, Address(address)
, TimeSource(ts)
{ }

TNode(NNet::TPoll& poller, const std::string& name, NNet::TSocket socket, const std::shared_ptr<ITimeSource>& ts)
: Poller(poller)
, Name(name)
, Socket(std::move(socket))
, Connected(true)
, TimeSource(ts)
{ }

void Send(const TMessageHolder<TMessage>& message) override;
void Drain() override;
NNet::TSocket& Sock() {
return Socket;
}

private:
void Connect();
Expand All @@ -172,8 +183,8 @@ class TNode: public INode {
NNet::TTestTask DoConnect();

NNet::TPoll& Poller;
uint32_t Id;
NNet::TAddress Address;
std::string Name;
std::optional<NNet::TAddress> Address;
std::shared_ptr<ITimeSource> TimeSource;
NNet::TPoll::TSocket Socket;
bool Connected = false;
Expand Down

0 comments on commit 9315f24

Please sign in to comment.