Skip to content

Commit

Permalink
Implement become
Browse files Browse the repository at this point in the history
  • Loading branch information
resetius committed Nov 23, 2023
1 parent aed0404 commit dec8e59
Show file tree
Hide file tree
Showing 5 changed files with 77 additions and 11 deletions.
17 changes: 13 additions & 4 deletions src/messages.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,25 +33,23 @@ struct TLogEntry: public TMessage {
struct TMessageEx: public TMessage {
uint32_t Src;
uint32_t Dst;
uint64_t Term;
};

struct TRequestVoteRequest: public TMessageEx {
static constexpr EMessageType MessageType = EMessageType::REQUEST_VOTE_REQUEST;
uint64_t Term;
uint64_t LastLogIndex;
uint64_t LastLogTerm;
uint32_t CandidateId;
};

struct TRequestVoteResponse: public TMessageEx {
static constexpr EMessageType MessageType = EMessageType::REQUEST_VOTE_RESPONSE;
uint64_t Term;
uint32_t VoteGranted;
};

struct TAppendEntriesRequest: public TMessageEx {
static constexpr EMessageType MessageType = EMessageType::APPEND_ENTRIES_REQUEST;
uint64_t Term;
uint64_t PrevLogIndex;
uint64_t PrevLogTerm;
uint64_t LeaderCommit;
Expand All @@ -61,7 +59,6 @@ struct TAppendEntriesRequest: public TMessageEx {

struct TAppendEntriesResponse: public TMessageEx {
static constexpr EMessageType MessageType = EMessageType::APPEND_ENTRIES_RESPONSE;
uint64_t Term;
uint64_t MatchIndex;
uint32_t Success;
};
Expand Down Expand Up @@ -109,10 +106,18 @@ struct TMessageHolder {
return Mes;
}

const T* operator->() const {
return Mes;
}

operator bool() {
return !!Mes;
}

bool IsEx() const {
return (2 <= Mes->Type && Mes->Type <= 5);
}

template<typename U>
requires std::derived_from<U, T>
TMessageHolder<U> Cast() {
Expand Down Expand Up @@ -166,3 +171,7 @@ TMessageHolder<T> NewHoldedMessage(uint32_t type, uint32_t len)
T* mes = NewMessage<T>(type, len);
return TMessageHolder<T>(mes, std::shared_ptr<char[]>(reinterpret_cast<char*>(mes)));
}

inline TMessageHolder<TTimeout> NewTimeout() {
return NewHoldedMessage<TTimeout>(static_cast<uint32_t>(EMessageType::TIMEOUT), sizeof(TTimeout));
}
49 changes: 47 additions & 2 deletions src/raft.cpp
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
#include <memory>
#include <stdexcept>

#include "raft.h"
#include "messages.h"

Expand All @@ -9,13 +12,55 @@ TRaft::TRaft(int node, const TNodeDict& nodes, const std::shared_ptr<ITimeSource
, Npeers(nodes.size())
, Nservers(nodes.size()+1)
, StateName(EState::FOLLOWER)
, LastTime(TimeSource->now())
, LastTime(TimeSource->Now())
{ }

std::unique_ptr<TResult> TRaft::Follower(uint64_t now, const TMessageHolder<TMessage>& message) {
std::unique_ptr<TResult> TRaft::Follower(uint64_t now, TMessageHolder<TMessage> message) {
return nullptr;
}

std::unique_ptr<TResult> TRaft::Candidate(uint64_t now, TMessageHolder<TMessage> message) {
return nullptr;
}

std::unique_ptr<TResult> TRaft::Leader(uint64_t now, TMessageHolder<TMessage> message) {
return nullptr;
}

void TRaft::Become(EState newStateName) {
if (StateName != newStateName) {
StateName = newStateName;
Process(NewTimeout());
}
}

void TRaft::Process(TMessageHolder<TMessage> message, INode* replyTo) {
auto now = TimeSource->Now();

if (message.IsEx()) {
auto messageEx = message.Cast<TMessageEx>();
State->CurrentTerm = messageEx->Term;
State->VotedFor = 0;
StateName = EState::FOLLOWER;
}
std::unique_ptr<TResult> result;
switch (StateName) {
case EState::FOLLOWER:
result = Follower(now, std::move(message));
break;
case EState::CANDIDATE:
Candidate(now, std::move(message));
break;
case EState::LEADER:
Leader(now, std::move(message));
break;
default:
throw std::logic_error("Unknown state");
}

ApplyResult(now, std::move(result), replyTo);
}

void TRaft::ApplyResult(uint64_t now, std::unique_ptr<TResult> result, INode* replyTo) {
if (!result) {
return;
Expand Down
8 changes: 6 additions & 2 deletions src/raft.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,15 +87,19 @@ class TRaft {
public:
TRaft(int node, const TNodeDict& nodes, const std::shared_ptr<ITimeSource>& ts);

void Process(const TMessageHolder<TMessage>& message, INode* replyTo = nullptr);
void Process(TMessageHolder<TMessage> message, INode* replyTo = nullptr);
void ApplyResult(uint64_t now, std::unique_ptr<TResult> result, INode* replyTo = nullptr);

EState CurrentStateName() const {
return StateName;
}

void Become(EState newStateName);

private:
std::unique_ptr<TResult> Follower(uint64_t now, const TMessageHolder<TMessage>& message);
std::unique_ptr<TResult> Follower(uint64_t now, TMessageHolder<TMessage> message);
std::unique_ptr<TResult> Candidate(uint64_t now, TMessageHolder<TMessage> message);
std::unique_ptr<TResult> Leader(uint64_t now, TMessageHolder<TMessage> message);

int Id;
TNodeDict Nodes;
Expand Down
4 changes: 2 additions & 2 deletions src/timesource.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@

struct ITimeSource {
virtual ~ITimeSource() = default;
virtual uint64_t now() = 0;
virtual uint64_t Now() = 0;
};

class TTimeSource: public ITimeSource {
public:
uint64_t now();
uint64_t Now();
};
10 changes: 9 additions & 1 deletion test/test_raft.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class TFakeNode: public INode {

class TFakeTimeSource: public ITimeSource {
public:
uint64_t now() override {
uint64_t Now() override {
return 0;
}
};
Expand Down Expand Up @@ -135,13 +135,21 @@ void test_initial(void**) {
assert_true(raft->CurrentStateName() == EState::FOLLOWER);
}

void test_become(void**) {
auto raft = MakeRaft();
assert_true(raft->CurrentStateName() == EState::FOLLOWER);
raft->Become(EState::CANDIDATE);
assert_true(raft->CurrentStateName() == EState::CANDIDATE);
}

int main() {
const struct CMUnitTest tests[] = {
cmocka_unit_test(test_empty),
cmocka_unit_test(test_message_create),
cmocka_unit_test(test_message_cast),
cmocka_unit_test(test_message_send_recv),
cmocka_unit_test(test_initial),
cmocka_unit_test(test_become),
};
return cmocka_run_group_tests(tests, NULL, NULL);
}

0 comments on commit dec8e59

Please sign in to comment.