Skip to content

Commit

Permalink
Support rsm read (#10)
Browse files Browse the repository at this point in the history
  • Loading branch information
resetius authored Mar 3, 2024
1 parent 094269c commit d3f1646
Show file tree
Hide file tree
Showing 6 changed files with 26 additions and 13 deletions.
1 change: 1 addition & 0 deletions client/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ TVoidTask Client(TPoller& poller, TSocket socket) {

TLine line;
TCommandRequest header;
header.Flags = TCommandRequest::EWrite;
header.Type = static_cast<uint32_t>(TCommandRequest::MessageType);
auto lineReader = TLineReader(input, 2*1024);
auto byteWriter = TByteWriter(socket);
Expand Down
2 changes: 1 addition & 1 deletion examples/kv.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ struct TResultValue: public TCommandResponse {
char Data[0];
};

TMessageHolder<TMessage> TKv::Read(TMessageHolder<TCommandRequest> message) {
TMessageHolder<TMessage> TKv::Read(TMessageHolder<TCommandRequest> message, uint64_t index) {
auto readKv = message.Cast<TReadKv>();
std::string_view k(readKv->Data, readKv->KeySize);
auto it = H.find(std::string(k));
Expand Down
2 changes: 1 addition & 1 deletion examples/kv.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

class TKv: public IRsm {
public:
TMessageHolder<TMessage> Read(TMessageHolder<TCommandRequest> message) override;
TMessageHolder<TMessage> Read(TMessageHolder<TCommandRequest> message, uint64_t index) override;
void Write(TMessageHolder<TLogEntry> message) override;
TMessageHolder<TLogEntry> Prepare(TMessageHolder<TCommandRequest> message, uint64_t term) override;

Expand Down
7 changes: 6 additions & 1 deletion src/messages.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,15 @@ static_assert(sizeof(TAppendEntriesResponse) == sizeof(TMessageEx) + 16);

struct TCommandRequest: public TMessage {
static constexpr EMessageType MessageType = EMessageType::COMMAND_REQUEST;
enum EFlags {
ENone = 0,
EWrite = 1,
};
uint32_t Flags = ENone;
char Data[0];
};

static_assert(sizeof(TCommandRequest) == sizeof(TMessage));
static_assert(sizeof(TCommandRequest) == sizeof(TMessage) + 4);

struct TCommandResponse: public TMessage {
static constexpr EMessageType MessageType = EMessageType::COMMAND_RESPONSE;
Expand Down
21 changes: 14 additions & 7 deletions src/raft.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ static uint32_t rand_(uint32_t* seed) {

} // namespace

TMessageHolder<TMessage> TDummyRsm::Read(TMessageHolder<TCommandRequest> message)
TMessageHolder<TMessage> TDummyRsm::Read(TMessageHolder<TCommandRequest> message, uint64_t index)
{
return {};
return NewHoldedMessage<TCommandResponse>(TCommandResponse {.Index = index});
}

void TDummyRsm::Write(TMessageHolder<TLogEntry> message)
Expand Down Expand Up @@ -253,12 +253,13 @@ void TRaft::OnAppendEntries(TMessageHolder<TAppendEntriesResponse> message) {

void TRaft::OnCommandRequest(TMessageHolder<TCommandRequest> command, const std::shared_ptr<INode>& replyTo) {
auto& log = State->Log;
auto entry = Rsm->Prepare(std::move(command), State->CurrentTerm);
log.emplace_back(std::move(entry));
if (command->Flags & TCommandRequest::EWrite) {
auto entry = Rsm->Prepare(std::move(command), State->CurrentTerm);
log.emplace_back(std::move(entry));
}
auto index = log.size();
if (replyTo) {
auto mes = NewHoldedMessage(TCommandResponse {.Index = index});
waiting.emplace(TWaiting{mes->Index, mes, replyTo});
waiting.emplace(TWaiting{index, std::move(command), replyTo});
}
}

Expand Down Expand Up @@ -378,7 +379,13 @@ void TRaft::ProcessWaiting() {
auto lastApplied = VolatileState->LastApplied;
while (!waiting.empty() && waiting.top().Index <= lastApplied) {
auto w = waiting.top(); waiting.pop();
w.ReplyTo->Send(std::move(w.Message));
TMessageHolder<TMessage> reply;
if (w.Command->Flags & TCommandRequest::EWrite) {
reply = NewHoldedMessage(TCommandResponse {.Index = w.Index});
} else {
reply = Rsm->Read(std::move(w.Command), w.Index);
}
w.ReplyTo->Send(std::move(reply));
}
}

Expand Down
6 changes: 3 additions & 3 deletions src/raft.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@ struct INode {
// CommandRequest -> Read? -> CurrentIndex (fixate) >= CommittedIndex -> CommandResponse
struct IRsm {
virtual ~IRsm() = default;
virtual TMessageHolder<TMessage> Read(TMessageHolder<TCommandRequest> message) = 0;
virtual TMessageHolder<TMessage> Read(TMessageHolder<TCommandRequest> message, uint64_t index) = 0;
virtual void Write(TMessageHolder<TLogEntry> message) = 0;
virtual TMessageHolder<TLogEntry> Prepare(TMessageHolder<TCommandRequest> message, uint64_t term) = 0;
};

struct TDummyRsm: public IRsm {
TMessageHolder<TMessage> Read(TMessageHolder<TCommandRequest> message) override;
TMessageHolder<TMessage> Read(TMessageHolder<TCommandRequest> message, uint64_t index) override;
void Write(TMessageHolder<TLogEntry> message) override;
TMessageHolder<TLogEntry> Prepare(TMessageHolder<TCommandRequest> message, uint64_t term) override;
};
Expand Down Expand Up @@ -158,7 +158,7 @@ class TRaft {

struct TWaiting {
uint64_t Index;
TMessageHolder<TMessage> Message;
TMessageHolder<TCommandRequest> Command;
std::shared_ptr<INode> ReplyTo;
bool operator< (const TWaiting& other) const {
return Index > other.Index;
Expand Down

0 comments on commit d3f1646

Please sign in to comment.