Skip to content

Commit

Permalink
Refactor: update gRPC example to adapt to LeaderId changes
Browse files Browse the repository at this point in the history
- Update the `raft-kv-memstore-grpc` example to use the protobuf-defined `LeaderId`.
- Automatically implement `CommittedLeaderId` for all types.
- Add `openraft::vote::LeaderIdCompare` to provide comparison functions for both single-leader-per-term and multi-leader-per-term implementations.
  • Loading branch information
drmingdrmer committed Dec 31, 2024
1 parent 6908a3c commit afe2134
Show file tree
Hide file tree
Showing 14 changed files with 243 additions and 80 deletions.
7 changes: 7 additions & 0 deletions examples/raft-kv-memstore-grpc/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
"proto/management_service.proto",
"proto/api_service.proto",
];

// TODO: remove serde

tonic_build::configure()
.type_attribute("openraftpb.Node", "#[derive(Eq, serde::Serialize, serde::Deserialize)]")
.type_attribute(
Expand All @@ -17,6 +20,10 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
"openraftpb.Response",
"#[derive(Eq, serde::Serialize, serde::Deserialize)]",
)
.type_attribute(
"openraftpb.LeaderId",
"#[derive(Eq, serde::Serialize, serde::Deserialize)]",
)
.compile_protos_with_config(config, &proto_files, &["proto"])?;
Ok(())
}
4 changes: 2 additions & 2 deletions examples/raft-kv-memstore-grpc/proto/internal_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ message Vote {

// LogId represents the log identifier in Raft
message LogId {
uint64 index = 1;
LeaderId leader_id = 2;
uint64 term = 1;
uint64 index = 2;
}

// VoteRequest represents a request for votes during leader election
Expand Down
6 changes: 3 additions & 3 deletions examples/raft-kv-memstore-grpc/src/grpc/management_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@ use tonic::Response;
use tonic::Status;
use tracing::debug;

use crate::pb;
use crate::protobuf::management_service_server::ManagementService;
use crate::protobuf::AddLearnerRequest;
use crate::protobuf::ChangeMembershipRequest;
use crate::protobuf::InitRequest;
use crate::protobuf::RaftReplyString;
use crate::protobuf::RaftRequestString;
use crate::typ::*;
use crate::Node;

/// Management service implementation for Raft cluster administration.
/// Handles cluster initialization, membership changes, and metrics collection.
Expand Down Expand Up @@ -62,11 +62,11 @@ impl ManagementService for ManagementServiceImpl {
let req = request.into_inner();

// Convert nodes into required format
let nodes_map: BTreeMap<u64, Node> = req
let nodes_map: BTreeMap<u64, pb::Node> = req
.nodes
.into_iter()
.map(|node| {
(node.node_id, Node {
(node.node_id, pb::Node {
rpc_addr: node.rpc_addr,
node_id: node.node_id,
})
Expand Down
77 changes: 31 additions & 46 deletions examples/raft-kv-memstore-grpc/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
#![allow(clippy::uninlined_format_args)]

use crate::protobuf::Node;
use crate::protobuf::Response;
use crate::protobuf::SetRequest;
use crate::protobuf as pb;
use crate::store::StateMachineData;
use crate::typ::*;

Expand All @@ -12,14 +10,17 @@ pub mod store;
#[cfg(test)]
mod test;

mod pb_impl;

pub type NodeId = u64;

openraft::declare_raft_types!(
/// Declare the type configuration for example K/V store.
pub TypeConfig:
D = SetRequest,
R = Response,
Node = Node,
D = pb::SetRequest,
R = pb::Response,
LeaderId = pb::LeaderId,
Node = pb::Node,
SnapshotData = StateMachineData,
);

Expand All @@ -33,87 +34,71 @@ pub mod protobuf {
#[path = "../../utils/declare_types.rs"]
pub mod typ;

impl From<protobuf::LeaderId> for LeaderId {
fn from(proto_leader_id: protobuf::LeaderId) -> Self {
LeaderId::new(proto_leader_id.term, proto_leader_id.node_id)
}
}

impl From<protobuf::Vote> for typ::Vote {
fn from(proto_vote: protobuf::Vote) -> Self {
impl From<pb::Vote> for Vote {
fn from(proto_vote: pb::Vote) -> Self {
let leader_id: LeaderId = proto_vote.leader_id.unwrap().into();
if proto_vote.committed {
typ::Vote::new_committed(leader_id.term, leader_id.node_id)
Vote::new_committed(leader_id.term, leader_id.node_id)
} else {
typ::Vote::new(leader_id.term, leader_id.node_id)
Vote::new(leader_id.term, leader_id.node_id)
}
}
}

impl From<protobuf::LogId> for LogId {
fn from(proto_log_id: protobuf::LogId) -> Self {
let leader_id: LeaderId = proto_log_id.leader_id.unwrap().into();
LogId::new(leader_id, proto_log_id.index)
impl From<pb::LogId> for LogId {
fn from(proto_log_id: pb::LogId) -> Self {
LogId::new(proto_log_id.term, proto_log_id.index)
}
}

impl From<protobuf::VoteRequest> for VoteRequest {
fn from(proto_vote_req: protobuf::VoteRequest) -> Self {
let vote: typ::Vote = proto_vote_req.vote.unwrap().into();
impl From<pb::VoteRequest> for VoteRequest {
fn from(proto_vote_req: pb::VoteRequest) -> Self {
let vote: Vote = proto_vote_req.vote.unwrap().into();
let last_log_id = proto_vote_req.last_log_id.map(|log_id| log_id.into());
VoteRequest::new(vote, last_log_id)
}
}

impl From<protobuf::VoteResponse> for VoteResponse {
fn from(proto_vote_resp: protobuf::VoteResponse) -> Self {
let vote: typ::Vote = proto_vote_resp.vote.unwrap().into();
impl From<pb::VoteResponse> for VoteResponse {
fn from(proto_vote_resp: pb::VoteResponse) -> Self {
let vote: Vote = proto_vote_resp.vote.unwrap().into();
let last_log_id = proto_vote_resp.last_log_id.map(|log_id| log_id.into());
VoteResponse::new(vote, last_log_id, proto_vote_resp.vote_granted)
}
}

impl From<LeaderId> for protobuf::LeaderId {
fn from(leader_id: LeaderId) -> Self {
protobuf::LeaderId {
term: leader_id.term,
node_id: leader_id.node_id,
}
}
}

impl From<typ::Vote> for protobuf::Vote {
fn from(vote: typ::Vote) -> Self {
protobuf::Vote {
leader_id: Some(protobuf::LeaderId {
impl From<Vote> for pb::Vote {
fn from(vote: Vote) -> Self {
pb::Vote {
leader_id: Some(pb::LeaderId {
term: vote.leader_id().term,
node_id: vote.leader_id().node_id,
}),
committed: vote.is_committed(),
}
}
}
impl From<LogId> for protobuf::LogId {
impl From<LogId> for pb::LogId {
fn from(log_id: LogId) -> Self {
protobuf::LogId {
pb::LogId {
term: log_id.leader_id,
index: log_id.index,
leader_id: Some(log_id.leader_id.into()),
}
}
}

impl From<VoteRequest> for protobuf::VoteRequest {
impl From<VoteRequest> for pb::VoteRequest {
fn from(vote_req: VoteRequest) -> Self {
protobuf::VoteRequest {
pb::VoteRequest {
vote: Some(vote_req.vote.into()),
last_log_id: vote_req.last_log_id.map(|log_id| log_id.into()),
}
}
}

impl From<VoteResponse> for protobuf::VoteResponse {
impl From<VoteResponse> for pb::VoteResponse {
fn from(vote_resp: VoteResponse) -> Self {
protobuf::VoteResponse {
pb::VoteResponse {
vote: Some(vote_resp.vote.into()),
vote_granted: vote_resp.vote_granted,
last_log_id: vote_resp.last_log_id.map(|log_id| log_id.into()),
Expand Down
4 changes: 2 additions & 2 deletions examples/raft-kv-memstore-grpc/src/network/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@ use openraft::network::RPCOption;
use openraft::RaftNetworkFactory;
use tonic::transport::Channel;

use crate::protobuf as pb;
use crate::protobuf::internal_service_client::InternalServiceClient;
use crate::protobuf::RaftRequestBytes;
use crate::protobuf::SnapshotRequest;
use crate::protobuf::VoteRequest as PbVoteRequest;
use crate::protobuf::VoteResponse as PbVoteResponse;
use crate::typ::RPCError;
use crate::typ::*;
use crate::Node;
use crate::NodeId;
use crate::TypeConfig;

Expand All @@ -38,7 +38,7 @@ impl RaftNetworkFactory<TypeConfig> for Network {
/// Represents an active network connection to a remote Raft node.
/// Handles serialization and deserialization of Raft messages over gRPC.
pub struct NetworkConnection {
target_node: Node,
target_node: pb::Node,
}

impl NetworkConnection {
Expand Down
51 changes: 51 additions & 0 deletions examples/raft-kv-memstore-grpc/src/pb_impl/impl_leader_id.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
//! Implement [`RaftLeaderId`] for protobuf defined LeaderId, so that it can be used in OpenRaft
use std::cmp::Ordering;
use std::fmt;

use openraft::vote::LeaderIdCompare;
use openraft::vote::RaftLeaderId;

use crate::protobuf as pb;
use crate::TypeConfig;

/// Implements PartialOrd for LeaderId to enforce the standard Raft behavior of at most one leader
/// per term.
///
/// In standard Raft, each term can have at most one leader. This is enforced by making leader IDs
/// with the same term incomparable (returning None), unless they refer to the same node.
///
/// This differs from the [`PartialOrd`] default implementation which would allow multiple leaders
/// in the same term by comparing node IDs.
impl PartialOrd for pb::LeaderId {
#[inline]
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
LeaderIdCompare::std(self, other)
}
}

impl fmt::Display for pb::LeaderId {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "T{}-N{}", self.term, self.node_id)
}
}

impl RaftLeaderId<TypeConfig> for pb::LeaderId {
type Committed = u64;

fn new(term: u64, node_id: u64) -> Self {
Self { term, node_id }
}

fn term(&self) -> u64 {
self.term
}

fn node_id_ref(&self) -> Option<&u64> {
Some(&self.node_id)
}

fn to_committed(&self) -> Self::Committed {
self.term
}
}
3 changes: 3 additions & 0 deletions examples/raft-kv-memstore-grpc/src/pb_impl/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
//! Implements traits for protobuf types
mod impl_leader_id;
2 changes: 2 additions & 0 deletions examples/utils/declare_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ pub type Entry = openraft::Entry<TypeConfig>;
pub type EntryPayload = openraft::EntryPayload<TypeConfig>;
pub type StoredMembership = openraft::StoredMembership<TypeConfig>;

pub type Node = <TypeConfig as openraft::RaftTypeConfig>::Node;

pub type LogState = openraft::storage::LogState<TypeConfig>;

pub type SnapshotMeta = openraft::SnapshotMeta<TypeConfig>;
Expand Down
9 changes: 4 additions & 5 deletions openraft/src/vote/leader_id/leader_id_adv.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! [`RaftLeaderId`] implementation that allows multiple leaders per term.
use std::fmt;

use crate::vote::RaftCommittedLeaderId;
use crate::vote::RaftLeaderId;
use crate::RaftTypeConfig;

Expand Down Expand Up @@ -42,7 +43,7 @@ where C: RaftTypeConfig
pub type CommittedLeaderId<C> = LeaderId<C>;

impl<C> RaftLeaderId<C> for LeaderId<C>
where C: RaftTypeConfig
where C: RaftTypeConfig<LeaderId = Self>
{
type Committed = Self;

Expand All @@ -63,8 +64,6 @@ where C: RaftTypeConfig
}
}

impl<C> RaftCommittedLeaderId<C> for LeaderId<C> where C: RaftTypeConfig {}

#[cfg(test)]
mod tests {
use super::LeaderId;
Expand All @@ -89,7 +88,7 @@ mod tests {
}

#[test]
fn test_leader_id_partial_order() -> anyhow::Result<()> {
fn test_adv_leader_id_partial_order() -> anyhow::Result<()> {
#[allow(clippy::redundant_closure)]
let lid = |term, node_id| LeaderId::<UTConfig>::new(term, node_id);

Expand Down
Loading

0 comments on commit afe2134

Please sign in to comment.