Skip to content

Commit

Permalink
Merge pull request #945 from drmingdrmer/28-move-repl
Browse files Browse the repository at this point in the history
Refactor: move internal replication request data to separate file
  • Loading branch information
drmingdrmer authored Nov 22, 2023
2 parents f8aeab9 + 2be576b commit 29995d8
Show file tree
Hide file tree
Showing 5 changed files with 170 additions and 148 deletions.
4 changes: 2 additions & 2 deletions openraft/src/core/raft_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,10 @@ use crate::raft::InstallSnapshotRequest;
use crate::raft::VoteRequest;
use crate::raft_state::LogStateReader;
use crate::replication;
use crate::replication::Replicate;
use crate::replication::request::Replicate;
use crate::replication::response::ReplicationResult;
use crate::replication::ReplicationCore;
use crate::replication::ReplicationHandle;
use crate::replication::ReplicationResult;
use crate::replication::ReplicationSessionId;
use crate::runtime::RaftRuntime;
use crate::storage::LogFlushed;
Expand Down
2 changes: 1 addition & 1 deletion openraft/src/engine/handler/replication_handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use crate::progress::entry::ProgressEntry;
use crate::progress::Inflight;
use crate::progress::Progress;
use crate::raft_state::LogStateReader;
use crate::replication::ReplicationResult;
use crate::replication::response::ReplicationResult;
use crate::utime::UTime;
use crate::AsyncRuntime;
use crate::EffectiveMembership;
Expand Down
152 changes: 8 additions & 144 deletions openraft/src/replication/mod.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,20 @@
//! Replication stream.

mod replication_session_id;
mod response;
pub(crate) mod request;
pub(crate) mod response;

use std::fmt;
use std::io::SeekFrom;
use std::sync::Arc;
use std::time::Duration;

use anyerror::AnyError;
use futures::future::FutureExt;
pub(crate) use replication_session_id::ReplicationSessionId;
use request::Data;
use request::DataWithId;
use request::Replicate;
use response::ReplicationResult;
pub(crate) use response::Response;
use tokio::io::AsyncReadExt;
use tokio::io::AsyncSeekExt;
Expand Down Expand Up @@ -50,7 +54,6 @@ use crate::ErrorVerb;
use crate::Instant;
use crate::LogId;
use crate::MessageSummary;
use crate::NodeId;
use crate::RaftLogId;
use crate::RaftTypeConfig;
use crate::StorageError;
Expand Down Expand Up @@ -281,7 +284,7 @@ where
log_ids: DataWithId<LogIdRange<C::NodeId>>,
) -> Result<Option<Data<C>>, ReplicationError<C::NodeId, C::Node>> {
let request_id = log_ids.request_id();
let log_id_range = &log_ids.data;
let log_id_range = log_ids.data();

tracing::debug!(
request_id = display(request_id.display()),
Expand Down Expand Up @@ -601,7 +604,7 @@ where
snapshot_rx: DataWithId<oneshot::Receiver<Option<Snapshot<C>>>>,
) -> Result<Option<Data<C>>, ReplicationError<C::NodeId, C::Node>> {
let request_id = snapshot_rx.request_id();
let rx = snapshot_rx.data;
let rx = snapshot_rx.into_data();

tracing::info!(request_id = display(request_id.display()), "{}", func_name!());

Expand Down Expand Up @@ -762,142 +765,3 @@ where
);
}
}

pub struct DataWithId<T> {
/// The id of this replication request.
///
/// A replication request without an id does not need to send a reply to the caller.
request_id: Option<u64>,
data: T,
}

impl<T> DataWithId<T> {
pub fn new(request_id: Option<u64>, data: T) -> Self {
Self { request_id, data }
}

pub fn request_id(&self) -> Option<u64> {
self.request_id
}
}

/// Request to replicate a chunk of data, logs or snapshot.
///
/// It defines what data to send to a follower/learner and an id to identify who is sending this
/// data.
/// Thd data is either a series of logs or a snapshot.
pub(crate) enum Data<C>
where C: RaftTypeConfig
{
Logs(DataWithId<LogIdRange<C::NodeId>>),
Snapshot(DataWithId<oneshot::Receiver<Option<Snapshot<C>>>>),
}

impl<C> fmt::Debug for Data<C>
where C: RaftTypeConfig
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Logs(l) => f
.debug_struct("Data::Logs")
.field("request_id", &l.request_id())
.field("log_id_range", &l.data)
.finish(),
Self::Snapshot(s) => f.debug_struct("Data::Snapshot").field("request_id", &s.request_id()).finish(),
}
}
}

impl<C: RaftTypeConfig> fmt::Display for Data<C> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Logs(l) => {
write!(
f,
"Logs{{request_id: {}, log_id_range: {}}}",
l.request_id.display(),
l.data
)
}
Self::Snapshot(s) => {
write!(f, "Snapshot{{request_id: {}}}", s.request_id.display())
}
}
}
}

impl<C> MessageSummary<Data<C>> for Data<C>
where C: RaftTypeConfig
{
fn summary(&self) -> String {
self.to_string()
}
}

impl<C> Data<C>
where C: RaftTypeConfig
{
fn new_logs(request_id: Option<u64>, log_id_range: LogIdRange<C::NodeId>) -> Self {
Self::Logs(DataWithId::new(request_id, log_id_range))
}

fn new_snapshot(request_id: Option<u64>, snapshot_rx: oneshot::Receiver<Option<Snapshot<C>>>) -> Self {
Self::Snapshot(DataWithId::new(request_id, snapshot_rx))
}

fn request_id(&self) -> Option<u64> {
match self {
Self::Logs(l) => l.request_id(),
Self::Snapshot(s) => s.request_id(),
}
}
}

/// Result of an replication action.
#[derive(Clone, Debug)]
pub(crate) enum ReplicationResult<NID: NodeId> {
Matching(Option<LogId<NID>>),
Conflict(LogId<NID>),
}

/// A replication request sent by RaftCore leader state to replication stream.
pub(crate) enum Replicate<C>
where C: RaftTypeConfig
{
/// Inform replication stream to forward the committed log id to followers/learners.
Committed(Option<LogId<C::NodeId>>),

/// Send an empty AppendEntries RPC as heartbeat.
Heartbeat,

/// Send a chunk of data, e.g., logs or snapshot.
Data(Data<C>),
}

impl<C> Replicate<C>
where C: RaftTypeConfig
{
pub(crate) fn logs(id: Option<u64>, log_id_range: LogIdRange<C::NodeId>) -> Self {
Self::Data(Data::new_logs(id, log_id_range))
}

pub(crate) fn snapshot(id: Option<u64>, snapshot_rx: oneshot::Receiver<Option<Snapshot<C>>>) -> Self {
Self::Data(Data::new_snapshot(id, snapshot_rx))
}
}

impl<C> MessageSummary<Replicate<C>> for Replicate<C>
where C: RaftTypeConfig
{
fn summary(&self) -> String {
match self {
Replicate::Committed(c) => {
format!("Replicate::Committed: {:?}", c)
}
Replicate::Heartbeat => "Replicate::Heartbeat".to_string(),
Replicate::Data(d) => {
format!("Replicate::Data({})", d.summary())
}
}
}
}
150 changes: 150 additions & 0 deletions openraft/src/replication/request.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
use std::fmt;

/// A replication request sent by RaftCore leader state to replication stream.
pub(crate) enum Replicate<C>
where C: RaftTypeConfig
{
/// Inform replication stream to forward the committed log id to followers/learners.
Committed(Option<LogId<C::NodeId>>),

/// Send an empty AppendEntries RPC as heartbeat.
Heartbeat,

/// Send a chunk of data, e.g., logs or snapshot.
Data(Data<C>),
}

impl<C> Replicate<C>
where C: RaftTypeConfig
{
pub(crate) fn logs(id: Option<u64>, log_id_range: LogIdRange<C::NodeId>) -> Self {
Self::Data(Data::new_logs(id, log_id_range))
}

pub(crate) fn snapshot(id: Option<u64>, snapshot_rx: oneshot::Receiver<Option<Snapshot<C>>>) -> Self {
Self::Data(Data::new_snapshot(id, snapshot_rx))
}
}

impl<C> MessageSummary<Replicate<C>> for Replicate<C>
where C: RaftTypeConfig
{
fn summary(&self) -> String {
match self {
Replicate::Committed(c) => {
format!("Replicate::Committed: {:?}", c)
}
Replicate::Heartbeat => "Replicate::Heartbeat".to_string(),
Replicate::Data(d) => {
format!("Replicate::Data({})", d.summary())
}
}
}
}

use tokio::sync::oneshot;

use crate::display_ext::DisplayOptionExt;
use crate::log_id_range::LogIdRange;
use crate::LogId;
use crate::MessageSummary;
use crate::RaftTypeConfig;
use crate::Snapshot;

/// Request to replicate a chunk of data, logs or snapshot.
///
/// It defines what data to send to a follower/learner and an id to identify who is sending this
/// data.
/// Thd data is either a series of logs or a snapshot.
pub(crate) enum Data<C>
where C: RaftTypeConfig
{
Logs(DataWithId<LogIdRange<C::NodeId>>),
Snapshot(DataWithId<oneshot::Receiver<Option<Snapshot<C>>>>),
}

impl<C> fmt::Debug for Data<C>
where C: RaftTypeConfig
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Logs(l) => f
.debug_struct("Data::Logs")
.field("request_id", &l.request_id())
.field("log_id_range", &l.data)
.finish(),
Self::Snapshot(s) => f.debug_struct("Data::Snapshot").field("request_id", &s.request_id()).finish(),
}
}
}

impl<C: RaftTypeConfig> fmt::Display for Data<C> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Logs(l) => {
write!(
f,
"Logs{{request_id: {}, log_id_range: {}}}",
l.request_id.display(),
l.data
)
}
Self::Snapshot(s) => {
write!(f, "Snapshot{{request_id: {}}}", s.request_id.display())
}
}
}
}

impl<C> MessageSummary<Data<C>> for Data<C>
where C: RaftTypeConfig
{
fn summary(&self) -> String {
self.to_string()
}
}

impl<C> Data<C>
where C: RaftTypeConfig
{
pub(crate) fn new_logs(request_id: Option<u64>, log_id_range: LogIdRange<C::NodeId>) -> Self {
Self::Logs(DataWithId::new(request_id, log_id_range))
}

pub(crate) fn new_snapshot(request_id: Option<u64>, snapshot_rx: oneshot::Receiver<Option<Snapshot<C>>>) -> Self {
Self::Snapshot(DataWithId::new(request_id, snapshot_rx))
}

pub(crate) fn request_id(&self) -> Option<u64> {
match self {
Self::Logs(l) => l.request_id(),
Self::Snapshot(s) => s.request_id(),
}
}
}

pub(crate) struct DataWithId<T> {
/// The id of this replication request.
///
/// A replication request without an id does not need to send a reply to the caller.
request_id: Option<u64>,
data: T,
}

impl<T> DataWithId<T> {
pub(crate) fn new(request_id: Option<u64>, data: T) -> Self {
Self { request_id, data }
}

pub(crate) fn request_id(&self) -> Option<u64> {
self.request_id
}

pub(crate) fn data(&self) -> &T {
&self.data
}

pub(crate) fn into_data(self) -> T {
self.data
}
}
10 changes: 9 additions & 1 deletion openraft/src/replication/response.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use crate::replication::ReplicationResult;
use crate::replication::ReplicationSessionId;
use crate::utime::UTime;
use crate::AsyncRuntime;
use crate::LogId;
use crate::MessageSummary;
use crate::NodeId;
use crate::RaftTypeConfig;
use crate::StorageError;
use crate::Vote;
Expand Down Expand Up @@ -99,3 +100,10 @@ where C: RaftTypeConfig
}
}
}

/// Result of an replication action.
#[derive(Clone, Debug)]
pub(crate) enum ReplicationResult<NID: NodeId> {
Matching(Option<LogId<NID>>),
Conflict(LogId<NID>),
}

0 comments on commit 29995d8

Please sign in to comment.