Skip to content

Commit

Permalink
Feature: RaftNetwork::snapshot() to send a complete snapshot
Browse files Browse the repository at this point in the history
Add `RaftNetwork::snapshot()` to send a complete snapshot, and move
chunk-by-chunk snapshot sending out of `ReplicationCore`.

To enable a fully customizable implementation of snapshot transmission
tailored to the application's needs, this commit relocates the
chunk-by-chunk transmission logic from `ReplicationCore` to a new
sub mod, `crate::network::stream_snapshot`.

The `stream_snapshot` mod provides a default chunk-based snapshot
transmission mechanism, which can be overridden by creating a custom
implementation of the `RaftNetwork::snapshot()` method. As part of this
commit, `RaftNetwork::snapshot()` simply delegates to `stream_snapshot`.
Developers may use `stream_snapshot` as a reference when implementing
their own snapshot transmission strategy.

Snapshot transmission is internally divided into two distinct phases:

1. Upon request for snapshot transmission, `ReplicationCore` initiates a
   new task `RaftNetwork::snapshot()` dedicated to sending a complete
   `Snapshot`. This task should be able to terminate gracefully by
   subscribing to the `cancel` future.

2. Once the snapshot has been fully transmitted by
   `RaftNetwork::snapshot()`, the task signals an event back to
   `ReplicationCore`. Subsequently, `ReplicationCore` informs `RaftCore`
   of the event, allowing it to acknowledge the completion of the
   snapshot transmission.

Other changes:

- `ReplicationCore` has two `RaftNetwork`s, one for log replication and
  heartbeat, the other for snapshot only.

- `ReplicationClosed` becomes a public error for notifying the
  application-implemented sender that snapshot replication has been
  canceled.

- `StreamingError` is introduced as a container for errors that may occur
  in application-defined snapshot transmission, including local IO
  errors, network errors, errors returned by the remote peer, and `ReplicationClosed`.

- The `SnapshotResponse` type is introduced to differentiate it from the
  `InstallSnapshotResponse`, which is used for chunk-based responses.

---

- Part of #606
  • Loading branch information
drmingdrmer committed Feb 19, 2024
1 parent 23553ba commit 4098c38
Show file tree
Hide file tree
Showing 25 changed files with 655 additions and 176 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ jobs:
with:
name: tests-feature-test
path: |
openraft/_log/
tests/_log/
lint:
name: lint
Expand Down
9 changes: 9 additions & 0 deletions openraft/src/config/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,10 @@ pub struct Config {
///
/// It is disabled by default, by setting it to `0`.
/// The timeout for sending every segment is `install_snapshot_timeout`.
#[deprecated(
since = "0.9.0",
note = "Sending snapshot by chunks is deprecated; Use `install_snapshot_timeout` instead"
)]
#[clap(long, default_value = "0")]
pub send_snapshot_timeout: u64,

Expand Down Expand Up @@ -258,7 +262,12 @@ impl Config {
}

/// Get the timeout for sending a non-last snapshot segment.
#[deprecated(
since = "0.9.0",
note = "Sending snapshot by chunks is deprecated; Use `install_snapshot_timeout()` instead"
)]
pub fn send_snapshot_timeout(&self) -> Duration {
#[allow(deprecated)]
if self.send_snapshot_timeout > 0 {
Duration::from_millis(self.send_snapshot_timeout)
} else {
Expand Down
7 changes: 6 additions & 1 deletion openraft/src/config/config_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,11 @@ fn test_build() -> anyhow::Result<()> {
assert_eq!(10, config.election_timeout_min);
assert_eq!(20, config.election_timeout_max);
assert_eq!(5, config.heartbeat_interval);
assert_eq!(199, config.send_snapshot_timeout);

#[allow(deprecated)]
{
assert_eq!(199, config.send_snapshot_timeout);
}
assert_eq!(200, config.install_snapshot_timeout);
assert_eq!(201, config.max_payload_entries);
assert_eq!(SnapshotPolicy::LogsSinceLast(202), config.snapshot_policy);
Expand All @@ -78,6 +82,7 @@ fn test_build() -> anyhow::Result<()> {
assert_eq!(207, config.purge_batch_size);

// Test config methods
#[allow(deprecated)]
{
let mut c = config;
assert_eq!(Duration::from_millis(199), c.send_snapshot_timeout());
Expand Down
2 changes: 2 additions & 0 deletions openraft/src/core/raft_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -779,6 +779,7 @@ where

let membership_log_id = self.engine.state.membership_state.effective().log_id();
let network = self.network.new_client(target, target_node).await;
let snapshot_network = self.network.new_client(target, target_node).await;

let session_id = ReplicationSessionId::new(*self.engine.state.vote_ref(), *membership_log_id);

Expand All @@ -789,6 +790,7 @@ where
self.engine.state.committed().copied(),
progress_entry.matching,
network,
snapshot_network,
self.log_store.get_log_reader().await,
self.tx_notify.clone(),
tracing::span!(parent: &self.span, Level::DEBUG, "replication", id=display(self.id), target=display(target)),
Expand Down
4 changes: 2 additions & 2 deletions openraft/src/core/raft_msg/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::raft::AppendEntriesRequest;
use crate::raft::AppendEntriesResponse;
use crate::raft::BoxCoreFn;
use crate::raft::ClientWriteResponse;
use crate::raft::InstallSnapshotResponse;
use crate::raft::SnapshotResponse;
use crate::raft::VoteRequest;
use crate::raft::VoteResponse;
use crate::type_config::alias::LogIdOf;
Expand Down Expand Up @@ -64,7 +64,7 @@ where C: RaftTypeConfig
InstallCompleteSnapshot {
vote: Vote<C::NodeId>,
snapshot: Snapshot<C>,
tx: ResultSender<InstallSnapshotResponse<C::NodeId>>,
tx: ResultSender<SnapshotResponse<C::NodeId>>,
},

/// Begin receiving a snapshot from the leader.
Expand Down
3 changes: 2 additions & 1 deletion openraft/src/engine/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use crate::progress::entry::ProgressEntry;
use crate::progress::Inflight;
use crate::raft::AppendEntriesResponse;
use crate::raft::InstallSnapshotResponse;
use crate::raft::SnapshotResponse;
use crate::raft::VoteRequest;
use crate::raft::VoteResponse;
use crate::LeaderId;
Expand Down Expand Up @@ -229,7 +230,7 @@ where
AppendEntries(ValueSender<Result<AppendEntriesResponse<NID>, Infallible>>),
ReceiveSnapshotChunk(ValueSender<Result<(), InstallSnapshotError>>),
InstallSnapshot(ValueSender<Result<InstallSnapshotResponse<NID>, InstallSnapshotError>>),
InstallCompleteSnapshot(ValueSender<Result<InstallSnapshotResponse<NID>, Infallible>>),
InstallCompleteSnapshot(ValueSender<Result<SnapshotResponse<NID>, Infallible>>),
Initialize(ValueSender<Result<(), InitializeError<NID, N>>>),
}

Expand Down
10 changes: 4 additions & 6 deletions openraft/src/engine/engine_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use crate::error::RejectVoteRequest;
use crate::internal_server_state::InternalServerState;
use crate::membership::EffectiveMembership;
use crate::raft::AppendEntriesResponse;
use crate::raft::InstallSnapshotResponse;
use crate::raft::SnapshotResponse;
use crate::raft::VoteRequest;
use crate::raft::VoteResponse;
use crate::raft_state::LogStateReader;
Expand Down Expand Up @@ -451,14 +451,12 @@ where C: RaftTypeConfig
&mut self,
vote: Vote<C::NodeId>,
snapshot: Snapshot<C>,
tx: ResultSender<InstallSnapshotResponse<C::NodeId>>,
tx: ResultSender<SnapshotResponse<C::NodeId>>,
) {
tracing::info!(vote = display(vote), snapshot = display(&snapshot), "{}", func_name!());

let vote_res = self.vote_handler().accept_vote(&vote, tx, |state, _rejected| {
Ok(InstallSnapshotResponse {
vote: *state.vote_ref(),
})
Ok(SnapshotResponse::new(*state.vote_ref()))
});

let Some(tx) = vote_res else {
Expand All @@ -467,7 +465,7 @@ where C: RaftTypeConfig

let mut fh = self.following_handler();
fh.install_complete_snapshot(snapshot);
let res = Ok(InstallSnapshotResponse {
let res = Ok(SnapshotResponse {
vote: *self.state.vote_ref(),
});

Expand Down
2 changes: 1 addition & 1 deletion openraft/src/engine/handler/replication_handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ where C: RaftTypeConfig
let granted = *self
.leader
.clock_progress
.update(&node_id, Some(t))
.increase_to(&node_id, Some(t))
.expect("it should always update existing progress");

tracing::debug!(
Expand Down
25 changes: 20 additions & 5 deletions openraft/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
//! Error types exposed by this crate.

mod replication_closed;
mod streaming_error;

use std::collections::BTreeSet;
use std::error::Error;
use std::fmt;
Expand All @@ -8,6 +11,8 @@ use std::time::Duration;

use anyerror::AnyError;

pub use self::replication_closed::ReplicationClosed;
pub use self::streaming_error::StreamingError;
use crate::network::RPCTypes;
use crate::node::Node;
use crate::raft::AppendEntriesResponse;
Expand Down Expand Up @@ -243,11 +248,6 @@ where
RPCError(#[from] RPCError<NID, N, RaftError<NID, Infallible>>),
}

/// Error occurs when replication is closed.
#[derive(Debug, thiserror::Error)]
#[error("Replication is closed by RaftCore")]
pub(crate) struct ReplicationClosed {}

/// Error occurs when invoking a remote raft API.
#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
// NID already has serde bound.
Expand Down Expand Up @@ -326,6 +326,21 @@ impl<NID: NodeId, N: Node, T: Error> RemoteError<NID, N, T> {
}
}

/// Widen a remote [`Fatal`] error into a remote [`RaftError`].
///
/// The `target` and `target_node` of the original error are carried over unchanged; only the
/// `source` is re-wrapped as `RaftError::Fatal`.
impl<NID, N, E> From<RemoteError<NID, N, Fatal<NID>>> for RemoteError<NID, N, RaftError<NID, E>>
where
    NID: NodeId,
    N: Node,
    E: Error,
{
    fn from(e: RemoteError<NID, N, Fatal<NID>>) -> Self {
        // Take the error apart by value so each field is moved exactly once.
        let RemoteError {
            target,
            target_node,
            source: fatal,
        } = e;

        Self {
            target,
            target_node,
            source: RaftError::Fatal(fatal),
        }
    }
}

#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))]
#[error("seen a higher vote: {higher} GT mine: {mine}")]
Expand Down
17 changes: 17 additions & 0 deletions openraft/src/error/replication_closed.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/// Replication is closed intentionally.
///
/// No further replication action should be taken.
///
/// The error message is rendered as `Replication is closed: {reason}`.
#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[error("Replication is closed: {reason}")]
pub struct ReplicationClosed {
/// Explanation of why the replication was closed; interpolated into the error message.
reason: String,
}

impl ReplicationClosed {
    /// Build a `ReplicationClosed` error.
    ///
    /// `reason` is converted with `to_string()` and stored; it is the text shown after
    /// `Replication is closed: ` in the error message.
    pub fn new(reason: impl ToString) -> Self {
        let reason = reason.to_string();
        Self { reason }
    }
}
72 changes: 72 additions & 0 deletions openraft/src/error/streaming_error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
use std::error::Error;

use crate::error::NetworkError;
use crate::error::RPCError;
use crate::error::RaftError;
use crate::error::RemoteError;
use crate::error::ReplicationClosed;
use crate::error::ReplicationError;
use crate::error::Timeout;
use crate::error::Unreachable;
use crate::RaftTypeConfig;
use crate::StorageError;

/// Error that occurs when streaming local data to a remote raft node.
///
/// It gathers every failure source of such a transfer: the replication being closed on
/// purpose, local storage errors, timeouts, network errors, and errors returned by the
/// remote node.
///
/// `E` is the error type reported by the remote node; it is carried inside
/// [`RemoteError`] by the `RemoteError` variant.
///
/// Every variant is `#[error(transparent)]` and `#[from]`: the message comes from the
/// wrapped error, and the wrapped error converts into `StreamingError` via `From`/`?`.
// NOTE(review): the explicit serde bounds below mention only `E`; presumably this keeps the
// derive from requiring `C` itself to be (de)serializable — confirm against serde's `bound` docs.
#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
#[cfg_attr(
feature = "serde",
derive(serde::Serialize, serde::Deserialize),
serde(bound(serialize = "E: serde::Serialize")),
serde(bound(deserialize = "E: for <'d> serde::Deserialize<'d>"))
)]
pub enum StreamingError<C: RaftTypeConfig, E: Error> {
/// The replication stream is closed intentionally.
#[error(transparent)]
Closed(#[from] ReplicationClosed),

/// Storage error occurs when reading local data.
#[error(transparent)]
StorageError(#[from] StorageError<C::NodeId>),

/// Timeout when streaming data to remote node.
#[error(transparent)]
Timeout(#[from] Timeout<C::NodeId>),

/// The node is temporarily unreachable and should backoff before retrying.
#[error(transparent)]
Unreachable(#[from] Unreachable),

/// Failed to send the RPC request and should retry immediately.
#[error(transparent)]
Network(#[from] NetworkError),

/// Remote node returns an error.
#[error(transparent)]
RemoteError(#[from] RemoteError<C::NodeId, C::Node, E>),
}

/// Convert a [`StreamingError`] into the [`ReplicationError`] used by the replication layer.
///
/// `Closed` and `StorageError` map to the variants of the same name. Every other variant is a
/// transport-level failure and is wrapped as `ReplicationError::RPCError`. For `RemoteError`,
/// the remote error `E` is first converted into a `RaftError`.
impl<C: RaftTypeConfig, E> From<StreamingError<C, E>> for ReplicationError<C::NodeId, C::Node>
where
    E: Error,
    RaftError<C::NodeId>: From<E>,
{
    fn from(e: StreamingError<C, E>) -> Self {
        // The two non-RPC variants return directly; the rest share one RPCError wrapper below.
        let rpc_error = match e {
            StreamingError::Closed(closed) => return ReplicationError::Closed(closed),
            StreamingError::StorageError(storage) => return ReplicationError::StorageError(storage),
            StreamingError::Timeout(timeout) => RPCError::Timeout(timeout),
            StreamingError::Unreachable(unreachable_err) => RPCError::Unreachable(unreachable_err),
            StreamingError::Network(network) => RPCError::Network(network),
            StreamingError::RemoteError(RemoteError {
                target,
                target_node,
                source,
            }) => RPCError::RemoteError(RemoteError {
                target,
                target_node,
                source: RaftError::from(source),
            }),
        };

        ReplicationError::RPCError(rpc_error)
    }
}
4 changes: 2 additions & 2 deletions openraft/src/membership/into_nodes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@ where
N: Node,
NID: NodeId,
{
#[deprecated(note = "unused any more")]
#[deprecated(since = "0.8.4", note = "unused any more")]
fn has_nodes(&self) -> bool {
unimplemented!("has_nodes is deprecated")
}

#[deprecated(note = "unused any more")]
#[deprecated(since = "0.8.4", note = "unused any more")]
fn node_ids(&self) -> Vec<NID> {
unimplemented!("node_ids is deprecated")
}
Expand Down
2 changes: 1 addition & 1 deletion openraft/src/membership/membership.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ where
}

/// Check to see if the config is currently in joint consensus.
#[deprecated(note = "use `get_joint_config().len() > 1` instead")]
#[deprecated(since = "0.8.4", note = "use `get_joint_config().len() > 1` instead")]
pub fn is_in_joint_consensus(&self) -> bool {
self.configs.len() > 1
}
Expand Down
9 changes: 6 additions & 3 deletions openraft/src/metrics/wait.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,15 +128,18 @@ where
}

/// Wait until applied exactly `want_log`(inclusive) logs or timeout.
#[deprecated(note = "use `log_index()` and `applied_index()` instead, deprecated since 0.9.0")]
#[deprecated(since = "0.9.0", note = "use `log_index()` and `applied_index()` instead")]
#[tracing::instrument(level = "trace", skip(self), fields(msg=msg.to_string().as_str()))]
pub async fn log(&self, want_log_index: Option<u64>, msg: impl ToString) -> Result<RaftMetrics<NID, N>, WaitError> {
self.eq(Metric::LastLogIndex(want_log_index), msg.to_string()).await?;
self.eq(Metric::AppliedIndex(want_log_index), msg.to_string()).await
}

/// Wait until applied at least `want_log`(inclusive) logs or timeout.
#[deprecated(note = "use `log_index_at_least()` and `applied_index_at_least()` instead, deprecated since 0.9.0")]
#[deprecated(
since = "0.9.0",
note = "use `log_index_at_least()` and `applied_index_at_least()` instead"
)]
#[tracing::instrument(level = "trace", skip(self), fields(msg=msg.to_string().as_str()))]
pub async fn log_at_least(
&self,
Expand Down Expand Up @@ -195,7 +198,7 @@ where
}

/// Wait for `membership` to become the expected node id set or timeout.
#[deprecated(note = "use `voter_ids()` instead, deprecated since 0.9.0")]
#[deprecated(since = "0.9.0", note = "use `voter_ids()` instead")]
#[tracing::instrument(level = "trace", skip(self), fields(msg=msg.to_string().as_str()))]
pub async fn members(
&self,
Expand Down
Loading

0 comments on commit 4098c38

Please sign in to comment.