Skip to content

Commit

Permalink
Refactor: Extract type alias from examples
Browse files Browse the repository at this point in the history
Moved the type alias declaration (e.g., `pub type Raft =
openraft::Raft<TypeConfig>`) to a separate file, `declare_types.rs`, for
better reusability across all examples.
  • Loading branch information
drmingdrmer committed Dec 25, 2024
1 parent 429a9fd commit dac4286
Show file tree
Hide file tree
Showing 35 changed files with 308 additions and 542 deletions.
2 changes: 1 addition & 1 deletion examples/raft-kv-memstore-grpc/src/bin/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ use raft_kv_memstore_grpc::network::Network;
use raft_kv_memstore_grpc::protobuf::api_service_server::ApiServiceServer;
use raft_kv_memstore_grpc::protobuf::internal_service_server::InternalServiceServer;
use raft_kv_memstore_grpc::protobuf::management_service_server::ManagementServiceServer;
use raft_kv_memstore_grpc::typ::Raft;
use raft_kv_memstore_grpc::LogStore;
use raft_kv_memstore_grpc::Raft;
use raft_kv_memstore_grpc::StateMachineStore;
use tonic::transport::Server;
use tracing::info;
Expand Down
7 changes: 3 additions & 4 deletions examples/raft-kv-memstore-grpc/src/grpc/api_service.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use std::sync::Arc;

use openraft::Raft;
use tonic::Request;
use tonic::Response;
use tonic::Status;
Expand All @@ -11,7 +10,7 @@ use crate::protobuf::GetRequest;
use crate::protobuf::Response as PbResponse;
use crate::protobuf::SetRequest;
use crate::store::StateMachineStore;
use crate::TypeConfig;
use crate::typ::*;

/// External API service implementation providing key-value store operations.
/// This service handles client requests for getting and setting values in the distributed store.
Expand All @@ -26,7 +25,7 @@ use crate::TypeConfig;
/// before processing them through the Raft consensus protocol.
pub struct ApiServiceImpl {
/// The Raft node instance for consensus operations
raft_node: Raft<TypeConfig>,
raft_node: Raft,
/// The state machine store for direct reads
state_machine_store: Arc<StateMachineStore>,
}
Expand All @@ -37,7 +36,7 @@ impl ApiServiceImpl {
/// # Arguments
/// * `raft_node` - The Raft node instance this service will use
/// * `state_machine_store` - The state machine store for reading data
pub fn new(raft_node: Raft<TypeConfig>, state_machine_store: Arc<StateMachineStore>) -> Self {
pub fn new(raft_node: Raft, state_machine_store: Arc<StateMachineStore>) -> Self {
ApiServiceImpl {
raft_node,
state_machine_store,
Expand Down
7 changes: 3 additions & 4 deletions examples/raft-kv-memstore-grpc/src/grpc/internal_service.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use bincode::deserialize;
use bincode::serialize;
use futures::StreamExt;
use openraft::Raft;
use openraft::Snapshot;
use tonic::Request;
use tonic::Response;
Expand All @@ -16,7 +15,7 @@ use crate::protobuf::SnapshotRequest;
use crate::protobuf::VoteRequest;
use crate::protobuf::VoteResponse;
use crate::store::StateMachineData;
use crate::TypeConfig;
use crate::typ::*;

/// Internal gRPC service implementation for Raft protocol communications.
/// This service handles the core Raft consensus protocol operations between cluster nodes.
Expand All @@ -31,15 +30,15 @@ use crate::TypeConfig;
/// exposed to other trusted Raft cluster nodes, never to external clients.
pub struct InternalServiceImpl {
/// The local Raft node instance that this service operates on
raft_node: Raft<TypeConfig>,
raft_node: Raft,
}

impl InternalServiceImpl {
/// Creates a new instance of the internal service
///
/// # Arguments
/// * `raft_node` - The Raft node instance this service will operate on
pub fn new(raft_node: Raft<TypeConfig>) -> Self {
pub fn new(raft_node: Raft) -> Self {
InternalServiceImpl { raft_node }
}

Expand Down
7 changes: 3 additions & 4 deletions examples/raft-kv-memstore-grpc/src/grpc/management_service.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use std::collections::BTreeMap;

use openraft::Raft;
use tonic::Request;
use tonic::Response;
use tonic::Status;
Expand All @@ -12,8 +11,8 @@ use crate::protobuf::ChangeMembershipRequest;
use crate::protobuf::InitRequest;
use crate::protobuf::RaftReplyString;
use crate::protobuf::RaftRequestString;
use crate::typ::*;
use crate::Node;
use crate::TypeConfig;

/// Management service implementation for Raft cluster administration.
/// Handles cluster initialization, membership changes, and metrics collection.
Expand All @@ -24,15 +23,15 @@ use crate::TypeConfig;
/// - Changing cluster membership
/// - Collecting metrics
pub struct ManagementServiceImpl {
raft_node: Raft<TypeConfig>,
raft_node: Raft,
}

impl ManagementServiceImpl {
/// Creates a new instance of the management service
///
/// # Arguments
/// * `raft_node` - The Raft node instance this service will manage
pub fn new(raft_node: Raft<TypeConfig>) -> Self {
pub fn new(raft_node: Raft) -> Self {
ManagementServiceImpl { raft_node }
}

Expand Down
57 changes: 17 additions & 40 deletions examples/raft-kv-memstore-grpc/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,10 @@
#![allow(clippy::uninlined_format_args)]

use openraft::raft::VoteRequest;
use openraft::raft::VoteResponse;
use openraft::LeaderId;
use openraft::LogId;

use crate::protobuf::Node;
use crate::protobuf::Response;
use crate::protobuf::SetRequest;
use crate::store::StateMachineData;
use crate::typ::*;

pub mod grpc;
pub mod network;
Expand All @@ -29,42 +25,23 @@ openraft::declare_raft_types!(

pub type LogStore = store::LogStore;
pub type StateMachineStore = store::StateMachineStore;
pub type Raft = openraft::Raft<TypeConfig>;

pub mod protobuf {
tonic::include_proto!("openraftpb");
}

pub mod typ {

use crate::TypeConfig;

pub type Vote = openraft::Vote<TypeConfig>;
pub type SnapshotMeta = openraft::SnapshotMeta<TypeConfig>;
pub type SnapshotData = <TypeConfig as openraft::RaftTypeConfig>::SnapshotData;
pub type Snapshot = openraft::Snapshot<TypeConfig>;

pub type RaftError<E = openraft::error::Infallible> = openraft::error::RaftError<TypeConfig, E>;
pub type RPCError = openraft::error::RPCError<TypeConfig>;
pub type StreamingError = openraft::error::StreamingError<TypeConfig>;

pub type ClientWriteError = openraft::error::ClientWriteError<TypeConfig>;
pub type CheckIsLeaderError = openraft::error::CheckIsLeaderError<TypeConfig>;
pub type ForwardToLeader = openraft::error::ForwardToLeader<TypeConfig>;
pub type InitializeError = openraft::error::InitializeError<TypeConfig>;

pub type ClientWriteResponse = openraft::raft::ClientWriteResponse<TypeConfig>;
}
#[path = "../../utils/declare_types.rs"]
pub mod typ;

impl From<protobuf::LeaderId> for LeaderId<TypeConfig> {
impl From<protobuf::LeaderId> for LeaderId {
fn from(proto_leader_id: protobuf::LeaderId) -> Self {
LeaderId::new(proto_leader_id.term, proto_leader_id.node_id)
}
}

impl From<protobuf::Vote> for typ::Vote {
fn from(proto_vote: protobuf::Vote) -> Self {
let leader_id: LeaderId<TypeConfig> = proto_vote.leader_id.unwrap().into();
let leader_id: LeaderId = proto_vote.leader_id.unwrap().into();
if proto_vote.committed {
typ::Vote::new_committed(leader_id.term, leader_id.node_id)
} else {
Expand All @@ -73,31 +50,31 @@ impl From<protobuf::Vote> for typ::Vote {
}
}

impl From<protobuf::LogId> for LogId<TypeConfig> {
impl From<protobuf::LogId> for LogId {
fn from(proto_log_id: protobuf::LogId) -> Self {
let leader_id: LeaderId<TypeConfig> = proto_log_id.leader_id.unwrap().into();
let leader_id: LeaderId = proto_log_id.leader_id.unwrap().into();
LogId::new(leader_id, proto_log_id.index)
}
}

impl From<protobuf::VoteRequest> for VoteRequest<TypeConfig> {
impl From<protobuf::VoteRequest> for VoteRequest {
fn from(proto_vote_req: protobuf::VoteRequest) -> Self {
let vote: typ::Vote = proto_vote_req.vote.unwrap().into();
let last_log_id = proto_vote_req.last_log_id.map(|log_id| log_id.into());
VoteRequest::new(vote, last_log_id)
}
}

impl From<protobuf::VoteResponse> for VoteResponse<TypeConfig> {
impl From<protobuf::VoteResponse> for VoteResponse {
fn from(proto_vote_resp: protobuf::VoteResponse) -> Self {
let vote: typ::Vote = proto_vote_resp.vote.unwrap().into();
let last_log_id = proto_vote_resp.last_log_id.map(|log_id| log_id.into());
VoteResponse::new(vote, last_log_id, proto_vote_resp.vote_granted)
}
}

impl From<LeaderId<TypeConfig>> for protobuf::LeaderId {
fn from(leader_id: LeaderId<TypeConfig>) -> Self {
impl From<LeaderId> for protobuf::LeaderId {
fn from(leader_id: LeaderId) -> Self {
protobuf::LeaderId {
term: leader_id.term,
node_id: leader_id.node_id,
Expand All @@ -116,26 +93,26 @@ impl From<typ::Vote> for protobuf::Vote {
}
}
}
impl From<LogId<TypeConfig>> for protobuf::LogId {
fn from(log_id: LogId<TypeConfig>) -> Self {
impl From<LogId> for protobuf::LogId {
fn from(log_id: LogId) -> Self {
protobuf::LogId {
index: log_id.index,
leader_id: Some(log_id.leader_id.into()),
}
}
}

impl From<VoteRequest<TypeConfig>> for protobuf::VoteRequest {
fn from(vote_req: VoteRequest<TypeConfig>) -> Self {
impl From<VoteRequest> for protobuf::VoteRequest {
fn from(vote_req: VoteRequest) -> Self {
protobuf::VoteRequest {
vote: Some(vote_req.vote.into()),
last_log_id: vote_req.last_log_id.map(|log_id| log_id.into()),
}
}
}

impl From<VoteResponse<TypeConfig>> for protobuf::VoteResponse {
fn from(vote_resp: VoteResponse<TypeConfig>) -> Self {
impl From<VoteResponse> for protobuf::VoteResponse {
fn from(vote_resp: VoteResponse) -> Self {
protobuf::VoteResponse {
vote: Some(vote_resp.vote.into()),
vote_granted: vote_resp.vote_granted,
Expand Down
21 changes: 7 additions & 14 deletions examples/raft-kv-memstore-grpc/src/network/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,6 @@ use openraft::error::NetworkError;
use openraft::error::Unreachable;
use openraft::network::v2::RaftNetworkV2;
use openraft::network::RPCOption;
use openraft::raft::AppendEntriesRequest;
use openraft::raft::AppendEntriesResponse;
use openraft::raft::VoteRequest;
use openraft::raft::VoteResponse;
use openraft::RaftNetworkFactory;
use tonic::transport::Channel;

Expand All @@ -17,6 +13,7 @@ use crate::protobuf::SnapshotRequest;
use crate::protobuf::VoteRequest as PbVoteRequest;
use crate::protobuf::VoteResponse as PbVoteResponse;
use crate::typ::RPCError;
use crate::typ::*;
use crate::Node;
use crate::NodeId;
use crate::TypeConfig;
Expand Down Expand Up @@ -56,9 +53,9 @@ impl NetworkConnection {
impl RaftNetworkV2<TypeConfig> for NetworkConnection {
async fn append_entries(
&mut self,
req: AppendEntriesRequest<TypeConfig>,
req: AppendEntriesRequest,
_option: RPCOption,
) -> Result<AppendEntriesResponse<TypeConfig>, RPCError> {
) -> Result<AppendEntriesResponse, RPCError> {
let server_addr = self.target_node.rpc_addr.clone();
let channel = match Channel::builder(format!("http://{}", server_addr).parse().unwrap()).connect().await {
Ok(channel) => channel,
Expand All @@ -78,11 +75,11 @@ impl RaftNetworkV2<TypeConfig> for NetworkConnection {

async fn full_snapshot(
&mut self,
vote: openraft::Vote<TypeConfig>,
snapshot: openraft::Snapshot<TypeConfig>,
vote: Vote,
snapshot: Snapshot,
_cancel: impl std::future::Future<Output = openraft::error::ReplicationClosed> + openraft::OptionalSend + 'static,
_option: RPCOption,
) -> Result<openraft::raft::SnapshotResponse<TypeConfig>, crate::typ::StreamingError> {
) -> Result<SnapshotResponse, crate::typ::StreamingError> {
let server_addr = self.target_node.rpc_addr.clone();
let channel = match Channel::builder(format!("http://{}", server_addr).parse().unwrap()).connect().await {
Ok(channel) => channel,
Expand Down Expand Up @@ -128,11 +125,7 @@ impl RaftNetworkV2<TypeConfig> for NetworkConnection {
Ok(result)
}

async fn vote(
&mut self,
req: VoteRequest<TypeConfig>,
_option: RPCOption,
) -> Result<VoteResponse<TypeConfig>, RPCError> {
async fn vote(&mut self, req: VoteRequest, _option: RPCOption) -> Result<VoteResponse, RPCError> {
let server_addr = self.target_node.rpc_addr.clone();
let channel = match Channel::builder(format!("http://{}", server_addr).parse().unwrap()).connect().await {
Ok(channel) => channel,
Expand Down
Loading

0 comments on commit dac4286

Please sign in to comment.