Skip to content

Commit

Permalink
Change: remove deprecated RaftNetwork methods without option argument
Browse files Browse the repository at this point in the history
  • Loading branch information
drmingdrmer committed Feb 24, 2024
1 parent b73f906 commit 8928ad9
Show file tree
Hide file tree
Showing 7 changed files with 67 additions and 97 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,10 @@ impl RaftNetworkFactory<TypeConfig> for Router {
}

impl RaftNetwork<TypeConfig> for Connection {
async fn send_append_entries(
async fn append_entries(
&mut self,
req: AppendEntriesRequest<TypeConfig>,
_option: RPCOption,
) -> Result<AppendEntriesResponse<NodeId>, typ::RPCError> {
let resp = self
.router
Expand All @@ -65,7 +66,11 @@ impl RaftNetwork<TypeConfig> for Connection {
Ok(resp)
}

async fn send_vote(&mut self, req: VoteRequest<NodeId>) -> Result<VoteResponse<NodeId>, typ::RPCError> {
async fn vote(
&mut self,
req: VoteRequest<NodeId>,
_option: RPCOption,
) -> Result<VoteResponse<NodeId>, typ::RPCError> {
let resp = self
.router
.send(self.target, "/raft/vote", req)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,10 @@ impl RaftNetworkFactory<TypeConfig> for Router {
}

impl RaftNetwork<TypeConfig> for Connection {
async fn send_append_entries(
async fn append_entries(
&mut self,
req: AppendEntriesRequest<TypeConfig>,
_option: RPCOption,
) -> Result<AppendEntriesResponse<NodeId>, typ::RPCError> {
let resp = self
.router
Expand All @@ -65,7 +66,11 @@ impl RaftNetwork<TypeConfig> for Connection {
Ok(resp)
}

async fn send_vote(&mut self, req: VoteRequest<NodeId>) -> Result<VoteResponse<NodeId>, typ::RPCError> {
async fn vote(
&mut self,
req: VoteRequest<NodeId>,
_option: RPCOption,
) -> Result<VoteResponse<NodeId>, typ::RPCError> {
let resp = self
.router
.send(self.target, "/raft/vote", req)
Expand Down
13 changes: 10 additions & 3 deletions examples/raft-kv-memstore-singlethreaded/src/network.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use openraft::error::InstallSnapshotError;
use openraft::error::RemoteError;
use openraft::network::RPCOption;
use openraft::raft::AppendEntriesRequest;
use openraft::raft::AppendEntriesResponse;
use openraft::raft::InstallSnapshotRequest;
Expand Down Expand Up @@ -32,9 +33,10 @@ impl RaftNetworkFactory<TypeConfig> for Router {
}

impl RaftNetwork<TypeConfig> for Connection {
async fn send_append_entries(
async fn append_entries(
&mut self,
req: AppendEntriesRequest<TypeConfig>,
_option: RPCOption,
) -> Result<AppendEntriesResponse<NodeId>, typ::RPCError> {
let resp = self
.router
Expand All @@ -44,9 +46,10 @@ impl RaftNetwork<TypeConfig> for Connection {
Ok(resp)
}

async fn send_install_snapshot(
async fn install_snapshot(
&mut self,
req: InstallSnapshotRequest<TypeConfig>,
_option: RPCOption,
) -> Result<InstallSnapshotResponse<NodeId>, typ::RPCError<InstallSnapshotError>> {
let resp = self
.router
Expand All @@ -56,7 +59,11 @@ impl RaftNetwork<TypeConfig> for Connection {
Ok(resp)
}

async fn send_vote(&mut self, req: VoteRequest<NodeId>) -> Result<VoteResponse<NodeId>, typ::RPCError> {
async fn vote(
&mut self,
req: VoteRequest<NodeId>,
_option: RPCOption,
) -> Result<VoteResponse<NodeId>, typ::RPCError> {
let resp = self
.router
.send(self.target, "/raft/vote", req)
Expand Down
13 changes: 10 additions & 3 deletions examples/raft-kv-memstore/src/network/raft_network_impl.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use openraft::error::InstallSnapshotError;
use openraft::error::NetworkError;
use openraft::error::RemoteError;
use openraft::network::RPCOption;
use openraft::network::RaftNetwork;
use openraft::network::RaftNetworkFactory;
use openraft::raft::AppendEntriesRequest;
Expand Down Expand Up @@ -77,21 +78,27 @@ pub struct NetworkConnection {
}

impl RaftNetwork<TypeConfig> for NetworkConnection {
async fn send_append_entries(
async fn append_entries(
&mut self,
req: AppendEntriesRequest<TypeConfig>,
_option: RPCOption,
) -> Result<AppendEntriesResponse<NodeId>, typ::RPCError> {
self.owner.send_rpc(self.target, &self.target_node, "raft-append", req).await
}

async fn send_install_snapshot(
async fn install_snapshot(
&mut self,
req: InstallSnapshotRequest<TypeConfig>,
_option: RPCOption,
) -> Result<InstallSnapshotResponse<NodeId>, typ::RPCError<InstallSnapshotError>> {
self.owner.send_rpc(self.target, &self.target_node, "raft-snapshot", req).await
}

async fn send_vote(&mut self, req: VoteRequest<NodeId>) -> Result<VoteResponse<NodeId>, typ::RPCError> {
async fn vote(
&mut self,
req: VoteRequest<NodeId>,
_option: RPCOption,
) -> Result<VoteResponse<NodeId>, typ::RPCError> {
self.owner.send_rpc(self.target, &self.target_node, "raft-vote", req).await
}
}
10 changes: 7 additions & 3 deletions examples/raft-kv-rocksdb/src/network/raft_network_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use openraft::error::NetworkError;
use openraft::error::RPCError;
use openraft::error::RaftError;
use openraft::error::RemoteError;
use openraft::network::RPCOption;
use openraft::network::RaftNetwork;
use openraft::network::RaftNetworkFactory;
use openraft::raft::AppendEntriesRequest;
Expand Down Expand Up @@ -112,9 +113,10 @@ fn to_error<E: std::error::Error + 'static + Clone>(e: toy_rpc::Error, target: N
#[allow(clippy::blocks_in_conditions)]
impl RaftNetwork<TypeConfig> for NetworkConnection {
#[tracing::instrument(level = "debug", skip_all, err(Debug))]
async fn send_append_entries(
async fn append_entries(
&mut self,
req: AppendEntriesRequest<TypeConfig>,
_option: RPCOption,
) -> Result<AppendEntriesResponse<NodeId>, RPCError<NodeId, Node, RaftError<NodeId>>> {
tracing::debug!(req = debug(&req), "send_append_entries");

Expand All @@ -128,18 +130,20 @@ impl RaftNetwork<TypeConfig> for NetworkConnection {
}

#[tracing::instrument(level = "debug", skip_all, err(Debug))]
async fn send_install_snapshot(
async fn install_snapshot(
&mut self,
req: InstallSnapshotRequest<TypeConfig>,
_option: RPCOption,
) -> Result<InstallSnapshotResponse<NodeId>, RPCError<NodeId, Node, RaftError<NodeId, InstallSnapshotError>>> {
tracing::debug!(req = debug(&req), "send_install_snapshot");
self.c().await?.raft().snapshot(req).await.map_err(|e| to_error(e, self.target))
}

#[tracing::instrument(level = "debug", skip_all, err(Debug))]
async fn send_vote(
async fn vote(
&mut self,
req: VoteRequest<NodeId>,
_option: RPCOption,
) -> Result<VoteResponse<NodeId>, RPCError<NodeId, Node, RaftError<NodeId>>> {
tracing::debug!(req = debug(&req), "send_vote");
self.c().await?.raft().vote(req).await.map_err(|e| to_error(e, self.target))
Expand Down
100 changes: 19 additions & 81 deletions openraft/src/network/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,18 +31,6 @@ use crate::Vote;
///
/// A single network instance is used to connect to a single target node. The network instance is
/// constructed by the [`RaftNetworkFactory`](`crate::network::RaftNetworkFactory`).
///
/// ### 2023-05-03: New API with options
///
/// - This trait introduced 3 new API `append_entries`, `install_snapshot` and `vote` which accept
/// an additional argument [`RPCOption`], and deprecated the old API `send_append_entries`,
/// `send_install_snapshot` and `send_vote`.
///
/// - The old API will be **removed** in `0.9`. An application can still implement the old API
/// without any changes. Openraft calls only the new API and the default implementation will
/// delegate to the old API.
///
/// - Implementing the new APIs will disable the old APIs.
#[add_async_trait]
pub trait RaftNetwork<C>: OptionalSend + OptionalSync + 'static
where C: RaftTypeConfig
Expand All @@ -52,37 +40,26 @@ where C: RaftTypeConfig
&mut self,
rpc: AppendEntriesRequest<C>,
option: RPCOption,
) -> Result<AppendEntriesResponse<C::NodeId>, RPCError<C::NodeId, C::Node, RaftError<C::NodeId>>> {
let _ = option;
#[allow(deprecated)]
self.send_append_entries(rpc).await
}
) -> Result<AppendEntriesResponse<C::NodeId>, RPCError<C::NodeId, C::Node, RaftError<C::NodeId>>>;

/// Send an InstallSnapshot RPC to the target.
#[cfg(not(feature = "generic-snapshot-data"))]
#[deprecated(since = "0.9.0", note = "use `snapshot()` instead for sending a complete snapshot")]
async fn install_snapshot(
&mut self,
rpc: InstallSnapshotRequest<C>,
option: RPCOption,
_rpc: InstallSnapshotRequest<C>,
_option: RPCOption,
) -> Result<
InstallSnapshotResponse<C::NodeId>,
RPCError<C::NodeId, C::Node, RaftError<C::NodeId, InstallSnapshotError>>,
> {
let _ = option;
#[allow(deprecated)]
self.send_install_snapshot(rpc).await
}
>;

/// Send a RequestVote RPC to the target.
async fn vote(
&mut self,
rpc: VoteRequest<C::NodeId>,
option: RPCOption,
) -> Result<VoteResponse<C::NodeId>, RPCError<C::NodeId, C::Node, RaftError<C::NodeId>>> {
let _ = option;
#[allow(deprecated)]
self.send_vote(rpc).await
}
) -> Result<VoteResponse<C::NodeId>, RPCError<C::NodeId, C::Node, RaftError<C::NodeId>>>;

/// Send a complete Snapshot to the target.
///
Expand All @@ -99,67 +76,28 @@ where C: RaftTypeConfig
/// with this vote.
///
/// `cancel` get `Ready` when the caller decides to cancel this snapshot transmission.
#[cfg(feature = "generic-snapshot-data")]
async fn snapshot(
&mut self,
vote: Vote<C::NodeId>,
snapshot: Snapshot<C>,
cancel: impl Future<Output = ReplicationClosed> + OptionalSend,
option: RPCOption,
) -> Result<SnapshotResponse<C::NodeId>, StreamingError<C, Fatal<C::NodeId>>> {
#[cfg(not(feature = "generic-snapshot-data"))]
{
use crate::network::stream_snapshot;
use crate::network::stream_snapshot::SnapshotTransport;

let resp = stream_snapshot::Chunked::send_snapshot(self, vote, snapshot, cancel, option).await?;
Ok(resp)
}
#[cfg(feature = "generic-snapshot-data")]
{
let _ = (vote, snapshot, cancel, option);
unimplemented!(
"no default implementation for RaftNetwork::snapshot() if `generic-snapshot-data` feature is enabled"
)
}
}
) -> Result<SnapshotResponse<C::NodeId>, StreamingError<C, Fatal<C::NodeId>>>;

/// Send an AppendEntries RPC to the target Raft node (§5).
#[deprecated(
since = "0.8.4",
note = "use `append_entries` instead. This method will be removed in 0.9"
)]
async fn send_append_entries(
&mut self,
rpc: AppendEntriesRequest<C>,
) -> Result<AppendEntriesResponse<C::NodeId>, RPCError<C::NodeId, C::Node, RaftError<C::NodeId>>> {
let _ = rpc;
unimplemented!("send_append_entries is deprecated")
}

/// Send an InstallSnapshot RPC to the target Raft node (§7).
#[deprecated(
since = "0.8.4",
note = "use `install_snapshot` instead. This method will be removed in 0.9"
)]
async fn send_install_snapshot(
#[cfg(not(feature = "generic-snapshot-data"))]
async fn snapshot(
&mut self,
rpc: InstallSnapshotRequest<C>,
) -> Result<
InstallSnapshotResponse<C::NodeId>,
RPCError<C::NodeId, C::Node, RaftError<C::NodeId, InstallSnapshotError>>,
> {
let _ = rpc;
unimplemented!("send_install_snapshot is deprecated")
}
vote: Vote<C::NodeId>,
snapshot: Snapshot<C>,
cancel: impl Future<Output = ReplicationClosed> + OptionalSend,
option: RPCOption,
) -> Result<SnapshotResponse<C::NodeId>, StreamingError<C, Fatal<C::NodeId>>> {
use crate::network::stream_snapshot;
use crate::network::stream_snapshot::SnapshotTransport;

/// Send a RequestVote RPC to the target Raft node (§5).
#[deprecated(since = "0.8.4", note = "use `vote` instead. This method will be removed in 0.9")]
async fn send_vote(
&mut self,
rpc: VoteRequest<C::NodeId>,
) -> Result<VoteResponse<C::NodeId>, RPCError<C::NodeId, C::Node, RaftError<C::NodeId>>> {
let _ = rpc;
unimplemented!("send_vote is deprecated")
let resp = stream_snapshot::Chunked::send_snapshot(self, vote, snapshot, cancel, option).await?;
Ok(resp)
}

/// Build a backoff instance if the target node is temporarily(or permanently) unreachable.
Expand Down
10 changes: 7 additions & 3 deletions tests/tests/fixtures/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use openraft::error::RaftError;
use openraft::error::RemoteError;
use openraft::error::Unreachable;
use openraft::metrics::Wait;
use openraft::network::RPCOption;
use openraft::network::RaftNetwork;
use openraft::network::RaftNetworkFactory;
use openraft::raft::AppendEntriesRequest;
Expand Down Expand Up @@ -985,9 +986,10 @@ pub struct RaftRouterNetwork {

impl RaftNetwork<MemConfig> for RaftRouterNetwork {
/// Send an AppendEntries RPC to the target Raft node (§5).
async fn send_append_entries(
async fn append_entries(
&mut self,
mut rpc: AppendEntriesRequest<MemConfig>,
_option: RPCOption,
) -> Result<AppendEntriesResponse<MemNodeId>, RPCError<MemNodeId, (), RaftError<MemNodeId>>> {
let from_id = rpc.vote.leader_id().voted_for().unwrap();

Expand Down Expand Up @@ -1048,9 +1050,10 @@ impl RaftNetwork<MemConfig> for RaftRouterNetwork {
}

/// Send an InstallSnapshot RPC to the target Raft node (§7).
async fn send_install_snapshot(
async fn install_snapshot(
&mut self,
rpc: InstallSnapshotRequest<MemConfig>,
_option: RPCOption,
) -> Result<InstallSnapshotResponse<MemNodeId>, RPCError<MemNodeId, (), RaftError<MemNodeId, InstallSnapshotError>>>
{
let from_id = rpc.vote.leader_id().voted_for().unwrap();
Expand All @@ -1069,9 +1072,10 @@ impl RaftNetwork<MemConfig> for RaftRouterNetwork {
}

/// Send a RequestVote RPC to the target Raft node (§5).
async fn send_vote(
async fn vote(
&mut self,
rpc: VoteRequest<MemNodeId>,
_option: RPCOption,
) -> Result<VoteResponse<MemNodeId>, RPCError<MemNodeId, (), RaftError<MemNodeId>>> {
let from_id = rpc.vote.leader_id().voted_for().unwrap();

Expand Down

0 comments on commit 8928ad9

Please sign in to comment.