Skip to content

Commit

Permalink
Refactor: update gRPC example to adapt to LeaderId changes
Browse files Browse the repository at this point in the history
- Update the `raft-kv-memstore-grpc` example to use the protobuf-defined `LeaderId`.
- Automatically implement `CommittedLeaderId` for all types.
- Add `openraft::vote::LeaderIdCompare` to provide comparison functions for both single-leader-per-term and multi-leader-per-term implementations.
  • Loading branch information
drmingdrmer committed Dec 31, 2024
1 parent 6908a3c commit 7b51cf9
Show file tree
Hide file tree
Showing 11 changed files with 236 additions and 75 deletions.
7 changes: 7 additions & 0 deletions examples/raft-kv-memstore-grpc/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
"proto/management_service.proto",
"proto/api_service.proto",
];

// TODO: remove serde

tonic_build::configure()
.type_attribute("openraftpb.Node", "#[derive(Eq, serde::Serialize, serde::Deserialize)]")
.type_attribute(
Expand All @@ -17,6 +20,10 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
"openraftpb.Response",
"#[derive(Eq, serde::Serialize, serde::Deserialize)]",
)
.type_attribute(
"openraftpb.LeaderId",
"#[derive(Eq, serde::Serialize, serde::Deserialize)]",
)
.compile_protos_with_config(config, &proto_files, &["proto"])?;
Ok(())
}
4 changes: 2 additions & 2 deletions examples/raft-kv-memstore-grpc/proto/internal_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ message Vote {

// LogId represents the log identifier in Raft
message LogId {
uint64 index = 1;
LeaderId leader_id = 2;
uint64 term = 1;
uint64 index = 2;
}

// VoteRequest represents a request for votes during leader election
Expand Down
77 changes: 31 additions & 46 deletions examples/raft-kv-memstore-grpc/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
#![allow(clippy::uninlined_format_args)]

use crate::protobuf::Node;
use crate::protobuf::Response;
use crate::protobuf::SetRequest;
use crate::protobuf as pb;
use crate::store::StateMachineData;
use crate::typ::*;

Expand All @@ -12,14 +10,17 @@ pub mod store;
#[cfg(test)]
mod test;

mod pb_impl;

pub type NodeId = u64;

openraft::declare_raft_types!(
/// Declare the type configuration for example K/V store.
pub TypeConfig:
D = SetRequest,
R = Response,
Node = Node,
D = pb::SetRequest,
R = pb::Response,
LeaderId = pb::LeaderId,
Node = pb::Node,
SnapshotData = StateMachineData,
);

Expand All @@ -33,87 +34,71 @@ pub mod protobuf {
#[path = "../../utils/declare_types.rs"]
pub mod typ;

impl From<protobuf::LeaderId> for LeaderId {
fn from(proto_leader_id: protobuf::LeaderId) -> Self {
LeaderId::new(proto_leader_id.term, proto_leader_id.node_id)
}
}

impl From<protobuf::Vote> for typ::Vote {
fn from(proto_vote: protobuf::Vote) -> Self {
impl From<pb::Vote> for Vote {
fn from(proto_vote: pb::Vote) -> Self {
let leader_id: LeaderId = proto_vote.leader_id.unwrap().into();
if proto_vote.committed {
typ::Vote::new_committed(leader_id.term, leader_id.node_id)
Vote::new_committed(leader_id.term, leader_id.node_id)
} else {
typ::Vote::new(leader_id.term, leader_id.node_id)
Vote::new(leader_id.term, leader_id.node_id)
}
}
}

impl From<protobuf::LogId> for LogId {
fn from(proto_log_id: protobuf::LogId) -> Self {
let leader_id: LeaderId = proto_log_id.leader_id.unwrap().into();
LogId::new(leader_id, proto_log_id.index)
impl From<pb::LogId> for LogId {
fn from(proto_log_id: pb::LogId) -> Self {
LogId::new(proto_log_id.term, proto_log_id.index)
}
}

impl From<protobuf::VoteRequest> for VoteRequest {
fn from(proto_vote_req: protobuf::VoteRequest) -> Self {
let vote: typ::Vote = proto_vote_req.vote.unwrap().into();
impl From<pb::VoteRequest> for VoteRequest {
fn from(proto_vote_req: pb::VoteRequest) -> Self {
let vote: Vote = proto_vote_req.vote.unwrap().into();
let last_log_id = proto_vote_req.last_log_id.map(|log_id| log_id.into());
VoteRequest::new(vote, last_log_id)
}
}

impl From<protobuf::VoteResponse> for VoteResponse {
fn from(proto_vote_resp: protobuf::VoteResponse) -> Self {
let vote: typ::Vote = proto_vote_resp.vote.unwrap().into();
impl From<pb::VoteResponse> for VoteResponse {
fn from(proto_vote_resp: pb::VoteResponse) -> Self {
let vote: Vote = proto_vote_resp.vote.unwrap().into();
let last_log_id = proto_vote_resp.last_log_id.map(|log_id| log_id.into());
VoteResponse::new(vote, last_log_id, proto_vote_resp.vote_granted)
}
}

impl From<LeaderId> for protobuf::LeaderId {
fn from(leader_id: LeaderId) -> Self {
protobuf::LeaderId {
term: leader_id.term,
node_id: leader_id.node_id,
}
}
}

impl From<typ::Vote> for protobuf::Vote {
fn from(vote: typ::Vote) -> Self {
protobuf::Vote {
leader_id: Some(protobuf::LeaderId {
impl From<Vote> for pb::Vote {
fn from(vote: Vote) -> Self {
pb::Vote {
leader_id: Some(pb::LeaderId {
term: vote.leader_id().term,
node_id: vote.leader_id().node_id,
}),
committed: vote.is_committed(),
}
}
}
impl From<LogId> for protobuf::LogId {
impl From<LogId> for pb::LogId {
fn from(log_id: LogId) -> Self {
protobuf::LogId {
pb::LogId {
term: log_id.leader_id,
index: log_id.index,
leader_id: Some(log_id.leader_id.into()),
}
}
}

impl From<VoteRequest> for protobuf::VoteRequest {
impl From<VoteRequest> for pb::VoteRequest {
fn from(vote_req: VoteRequest) -> Self {
protobuf::VoteRequest {
pb::VoteRequest {
vote: Some(vote_req.vote.into()),
last_log_id: vote_req.last_log_id.map(|log_id| log_id.into()),
}
}
}

impl From<VoteResponse> for protobuf::VoteResponse {
impl From<VoteResponse> for pb::VoteResponse {
fn from(vote_resp: VoteResponse) -> Self {
protobuf::VoteResponse {
pb::VoteResponse {
vote: Some(vote_resp.vote.into()),
vote_granted: vote_resp.vote_granted,
last_log_id: vote_resp.last_log_id.map(|log_id| log_id.into()),
Expand Down
51 changes: 51 additions & 0 deletions examples/raft-kv-memstore-grpc/src/pb_impl/impl_leader_id.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
//! Implement [`RaftLeaderId`] for protobuf defined LeaderId, so that it can be used in OpenRaft
use std::cmp::Ordering;
use std::fmt;

use openraft::vote::LeaderIdCompare;
use openraft::vote::RaftLeaderId;

use crate::protobuf as pb;
use crate::TypeConfig;

/// Implements PartialOrd for LeaderId to enforce the standard Raft behavior of at most one leader
/// per term.
///
/// In standard Raft, each term can have at most one leader. This is enforced by making leader IDs
/// with the same term incomparable (returning None), unless they refer to the same node.
///
/// This differs from the [`PartialOrd`] default implementation which would allow multiple leaders
/// in the same term by comparing node IDs.
impl PartialOrd for pb::LeaderId {
#[inline]
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
LeaderIdCompare::std(self, other)
}
}

impl fmt::Display for pb::LeaderId {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "T{}-N{}", self.term, self.node_id)
}
}

impl RaftLeaderId<TypeConfig> for pb::LeaderId {
type Committed = u64;

fn new(term: u64, node_id: u64) -> Self {
Self { term, node_id }
}

fn term(&self) -> u64 {
self.term
}

fn node_id_ref(&self) -> Option<&u64> {
Some(&self.node_id)
}

fn to_committed(&self) -> Self::Committed {
self.term
}
}
3 changes: 3 additions & 0 deletions examples/raft-kv-memstore-grpc/src/pb_impl/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
//! Implements traits for protobuf types
mod impl_leader_id;
9 changes: 4 additions & 5 deletions openraft/src/vote/leader_id/leader_id_adv.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! [`RaftLeaderId`] implementation that allows multiple leaders per term.
use std::fmt;

use crate::vote::RaftCommittedLeaderId;
use crate::vote::RaftLeaderId;
use crate::RaftTypeConfig;

Expand Down Expand Up @@ -42,7 +43,7 @@ where C: RaftTypeConfig
pub type CommittedLeaderId<C> = LeaderId<C>;

impl<C> RaftLeaderId<C> for LeaderId<C>
where C: RaftTypeConfig
where C: RaftTypeConfig<LeaderId = Self>
{
type Committed = Self;

Expand All @@ -63,8 +64,6 @@ where C: RaftTypeConfig
}
}

impl<C> RaftCommittedLeaderId<C> for LeaderId<C> where C: RaftTypeConfig {}

#[cfg(test)]
mod tests {
use super::LeaderId;
Expand All @@ -89,7 +88,7 @@ mod tests {
}

#[test]
fn test_leader_id_partial_order() -> anyhow::Result<()> {
fn test_adv_leader_id_partial_order() -> anyhow::Result<()> {
#[allow(clippy::redundant_closure)]
let lid = |term, node_id| LeaderId::<UTConfig>::new(term, node_id);

Expand Down
123 changes: 123 additions & 0 deletions openraft/src/vote/leader_id/leader_id_cmp.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
use std::cmp::Ordering;
use std::marker::PhantomData;

use crate::vote::RaftLeaderId;
use crate::RaftTypeConfig;

/// Provide comparison functions for [`RaftLeaderId`] implementations.
pub struct LeaderIdCompare<C>(PhantomData<C>);

impl<C> LeaderIdCompare<C>
where C: RaftTypeConfig
{
/// Implements [`PartialOrd`] for LeaderId to enforce the standard Raft behavior of at most one
/// leader per term.
///
/// In standard Raft, each term can have at most one leader. This is enforced by making leader
/// IDs with the same term incomparable (returning None), unless they refer to the same
/// node.
pub fn std<LID>(a: &LID, b: &LID) -> Option<Ordering>
where LID: RaftLeaderId<C> {
match a.term().cmp(&b.term()) {
Ordering::Equal => match (a.node_id_ref(), b.node_id_ref()) {
(None, None) => Some(Ordering::Equal),
(Some(_), None) => Some(Ordering::Greater),
(None, Some(_)) => Some(Ordering::Less),
(Some(a), Some(b)) => {
if a == b {
Some(Ordering::Equal)
} else {
None
}
}
},
cmp => Some(cmp),
}
}

/// Implements [`PartialOrd`] for LeaderId to allow multiple leaders per term.
pub fn adv<LID>(a: &LID, b: &LID) -> Option<Ordering>
where LID: RaftLeaderId<C> {
let res = (a.term(), a.node_id_ref()).cmp(&(b.term(), b.node_id_ref()));
Some(res)
}
}

#[cfg(test)]
mod tests {
use std::cmp::Ordering;

use crate::engine::testing::UTConfig;
use crate::vote::RaftLeaderId;

#[derive(Debug, PartialEq, Eq, Default, Clone, PartialOrd, derive_more::Display)]
#[display("T{}-N{:?}", _0, _1)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))]
struct LeaderId(u64, Option<u64>);

impl RaftLeaderId<UTConfig> for LeaderId {
type Committed = u64;

fn new(term: u64, node_id: u64) -> Self {
Self(term, Some(node_id))
}

fn term(&self) -> u64 {
self.0
}

fn node_id_ref(&self) -> Option<&u64> {
self.1.as_ref()
}

fn to_committed(&self) -> Self::Committed {
self.0
}
}

#[test]
fn test_std_cmp() {
use Ordering::*;

use super::LeaderIdCompare as Cmp;

let lid = |term, node_id| LeaderId(term, Some(node_id));
let lid_none = |term| LeaderId(term, None);

// Compare term first
assert_eq!(Cmp::std(&lid(2, 2), &lid(1, 2)), Some(Greater));
assert_eq!(Cmp::std(&lid(1, 2), &lid(2, 2)), Some(Less));

// Equal term, Some > None
assert_eq!(Cmp::std(&lid(2, 2), &lid_none(2)), Some(Greater));
assert_eq!(Cmp::std(&lid_none(2), &lid(2, 2)), Some(Less));

// Equal
assert_eq!(Cmp::std(&lid(2, 2), &lid(2, 2)), Some(Equal));

// Incomparable
assert_eq!(Cmp::std(&lid(2, 2), &lid(2, 1)), None);
assert_eq!(Cmp::std(&lid(2, 1), &lid(2, 2)), None);
assert_eq!(Cmp::std(&lid(2, 2), &lid(2, 3)), None);
}

#[test]
fn test_adv_cmp() {
use Ordering::*;

use super::LeaderIdCompare as Cmp;

let lid = |term, node_id| LeaderId(term, Some(node_id));

// Compare term first
assert_eq!(Cmp::adv(&lid(2, 2), &lid(1, 2)), Some(Greater));
assert_eq!(Cmp::adv(&lid(1, 2), &lid(2, 2)), Some(Less));

// Equal term
assert_eq!(Cmp::adv(&lid(2, 2), &lid(2, 1)), Some(Greater));
assert_eq!(Cmp::adv(&lid(2, 1), &lid(2, 2)), Some(Less));

// Equal term, node_id
assert_eq!(Cmp::adv(&lid(2, 2), &lid(2, 2)), Some(Equal));
}
}
Loading

0 comments on commit 7b51cf9

Please sign in to comment.