From b8065122c24e041613c286fb48e90d2285998fde Mon Sep 17 00:00:00 2001 From: yuuya uezato Date: Fri, 4 Jun 2021 20:29:05 +0900 Subject: [PATCH 1/5] =?UTF-8?q?suffix=E3=81=AE=E6=98=8E=E7=A4=BA=E7=9A=84?= =?UTF-8?q?=E3=81=AA=E5=89=8A=E9=99=A4=E3=82=B3=E3=83=BC=E3=83=89=E3=82=92?= =?UTF-8?q?=E8=BF=BD=E5=8A=A0(issue18=E5=AF=BE=E7=AD=96)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/io.rs | 8 ++++++ src/node_state/common/mod.rs | 5 ++++ src/node_state/follower/delete.rs | 47 +++++++++++++++++++++++++++++++ src/node_state/follower/idle.rs | 10 ++----- src/node_state/follower/mod.rs | 7 +++++ 5 files changed, 70 insertions(+), 7 deletions(-) create mode 100644 src/node_state/follower/delete.rs diff --git a/src/io.rs b/src/io.rs index a84e834..ebeac4c 100644 --- a/src/io.rs +++ b/src/io.rs @@ -34,6 +34,9 @@ pub trait Io { /// ローカルログを取得するための`Future`. type LoadLog: Future; + /// ローカルログの末尾部分を削除するための`Future`. + type DeleteLog: Future; + /// タイムアウトを表現するための`Future`. type Timeout: Future; @@ -83,6 +86,11 @@ pub trait Io { /// ただし、`start`とは異なる位置から、エントリの取得を開始することは許可されない. fn load_log(&mut self, start: LogIndex, end: Option) -> Self::LoadLog; + + /// ローカルログのうち末尾部分について、 + /// 指定された位置 `from` を含むそれ以降 [from..) を全て削除する. + fn delete_suffix_from(&mut self, from: LogIndex) -> Self::DeleteLog; + /// 選挙における役割に応じた時間のタイムアウトオブジェクトを生成する. fn create_timeout(&mut self, role: Role) -> Self::Timeout; diff --git a/src/node_state/common/mod.rs b/src/node_state/common/mod.rs index 3c4497d..24b9e10 100644 --- a/src/node_state/common/mod.rs +++ b/src/node_state/common/mod.rs @@ -238,6 +238,11 @@ where self.io.load_log(start, end) } + /// `from`以降のsuffixエントリ [from..) を削除する + pub fn delete_suffix_from(&mut self, from: LogIndex) -> IO::DeleteLog { + self.io.delete_suffix_from(from) + } + /// ローカルログの末尾部分に`suffix`を追記する. pub fn save_log_suffix(&mut self, suffix: &LogSuffix) -> IO::SaveLog { self.io.save_log_suffix(suffix) diff --git a/src/node_state/follower/delete.rs b/src/node_state/follower/delete.rs new file mode 100644 index 0000000..3f2ef5a --- /dev/null +++ b/src/node_state/follower/delete.rs @@ -0,0 +1,47 @@ +use futures::{Async, Future}; + +use super::super::{Common, NextState, RoleState}; +use super::{Follower, FollowerIdle}; +use crate::log::LogPosition; +use crate::message::{AppendEntriesCall, Message}; +use crate::{Io, Result}; + +/// ローカルログの削除を行うサブ状態 +pub struct FollowerDelete { + future: IO::DeleteLog, + from: LogPosition, + message: AppendEntriesCall, +} +impl FollowerDelete { + pub fn new(common: &mut Common, from: LogPosition, message: AppendEntriesCall) -> Self { + let future = common.delete_suffix_from(from.index); + FollowerDelete { + future, + from, + message, + } + } + pub fn handle_message( + &mut self, + common: &mut Common, + message: Message, + ) -> Result> { + if let Message::AppendEntriesCall(m) = message { + common.rpc_callee(&m.header).reply_busy(); + } + Ok(None) + } + pub fn run_once(&mut self, common: &mut Common) -> Result> { + if let Async::Ready(_) = track!(self.future.poll())? { + track!(common.handle_log_rollbacked(self.from))?; + common + .rpc_callee(&self.message.header) + .reply_append_entries(self.from); + + let next = Follower::Idle(FollowerIdle::new()); + Ok(Some(RoleState::Follower(next))) + } else { + Ok(None) + } + } +} diff --git a/src/node_state/follower/idle.rs b/src/node_state/follower/idle.rs index 2b479f6..c01edf1 100644 --- a/src/node_state/follower/idle.rs +++ b/src/node_state/follower/idle.rs @@ -2,7 +2,7 @@ use std::marker::PhantomData; use trackable::error::ErrorKindExt; use super::super::{Common, NextState, RoleState}; -use super::{Follower, FollowerAppend, FollowerSnapshot}; +use super::{Follower, FollowerAppend, FollowerSnapshot, FollowerDelete}; use crate::log::{LogPosition, LogSuffix}; use crate::message::{AppendEntriesCall, Message}; use crate::{ErrorKind, Io, Result}; @@ -93,12 +93,8 @@ impl FollowerIdle { if !matched { // 両者が分岐している // => ローカルログ(の未コミット領域)をロールバックして、同期位置まで戻る - let new_log_tail = lcp; - track!(common.handle_log_rollbacked(new_log_tail))?; - common - .rpc_callee(&message.header) - .reply_append_entries(new_log_tail); - Ok(None) + let next = FollowerDelete::new(common, lcp, message); + Ok(Some(RoleState::Follower(Follower::Delete(next)))) } else { // 両者は包含関係にあるので、追記が可能 track!(message.suffix.skip_to(lcp.index))?; diff --git a/src/node_state/follower/mod.rs b/src/node_state/follower/mod.rs index c4c25f2..7b9ba62 100644 --- a/src/node_state/follower/mod.rs +++ b/src/node_state/follower/mod.rs @@ -2,6 +2,7 @@ use self::append::FollowerAppend; use self::idle::FollowerIdle; use self::init::FollowerInit; use self::snapshot::FollowerSnapshot; +use self::delete::FollowerDelete; use super::{Common, NextState}; use crate::election::Role; use crate::message::{Message, MessageHeader}; @@ -11,6 +12,7 @@ mod append; mod idle; mod init; mod snapshot; +mod delete; /// 別の人(ノード)に投票しているフォロワー. /// @@ -30,6 +32,9 @@ pub enum Follower { /// ローカルログへのスナップショット保存中. Snapshot(FollowerSnapshot), + + /// ローカルログの末尾部分を削除中 + Delete(FollowerDelete), } impl Follower { pub fn new(common: &mut Common, pending_vote: Option) -> Self { @@ -58,6 +63,7 @@ impl Follower { Follower::Idle(ref mut t) => track!(t.handle_message(common, message)), Follower::Append(ref mut t) => track!(t.handle_message(common, message)), Follower::Snapshot(ref mut t) => track!(t.handle_message(common, message)), + Follower::Delete(ref mut t) => track!(t.handle_message(common, message)), } } pub fn run_once(&mut self, common: &mut Common) -> Result> { @@ -66,6 +72,7 @@ impl Follower { Follower::Idle(_) => Ok(None), Follower::Append(ref mut t) => track!(t.run_once(common)), Follower::Snapshot(ref mut t) => track!(t.run_once(common)), + Follower::Delete(ref mut t) => track!(t.run_once(common)), } } } From a6e89c202af7e7c877ad9e9a3d26e2036b550e9b Mon Sep 17 00:00:00 2001 From: yuuya uezato Date: Fri, 13 Aug 2021 03:33:23 +0900 Subject: [PATCH 2/5] =?UTF-8?q?Follower=E3=81=A7=E3=81=AEhandle=5Ftimeout?= =?UTF-8?q?=E3=81=A7Delete=E4=B8=AD=E3=81=AE=E3=82=BF=E3=82=A4=E3=83=A0?= =?UTF-8?q?=E3=82=A2=E3=82=A6=E3=83=88=E3=81=AF=E7=A6=81=E6=AD=A2=E3=81=99?= =?UTF-8?q?=E3=82=8B=E3=80=82=E7=A6=81=E6=AD=A2=E3=81=97=E3=81=AA=E3=81=84?= =?UTF-8?q?=E5=A0=B4=E5=90=88=E3=81=AB=E5=95=8F=E9=A1=8C=E3=81=A8=20?= =?UTF-8?q?=E3=81=AA=E3=82=8B=E6=A4=9C=E8=A8=BC=E3=82=B3=E3=83=BC=E3=83=89?= =?UTF-8?q?=E3=82=82=E4=BB=98=E8=A8=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/io.rs | 1 - src/node_state/follower/delete.rs | 133 +++++++++++++++++++++++++++++- src/node_state/follower/idle.rs | 2 +- src/node_state/follower/mod.rs | 16 +++- src/test_dsl/dsl.rs | 106 +++++++++++++++--------- src/test_dsl/impl_io.rs | 60 +++++++++++++- src/test_util.rs | 5 ++ 7 files changed, 275 insertions(+), 48 deletions(-) diff --git a/src/io.rs b/src/io.rs index ebeac4c..6cf8dd9 100644 --- a/src/io.rs +++ b/src/io.rs @@ -86,7 +86,6 @@ pub trait Io { /// ただし、`start`とは異なる位置から、エントリの取得を開始することは許可されない. fn load_log(&mut self, start: LogIndex, end: Option) -> Self::LoadLog; - /// ローカルログのうち末尾部分について、 /// 指定された位置 `from` を含むそれ以降 [from..) を全て削除する. fn delete_suffix_from(&mut self, from: LogIndex) -> Self::DeleteLog; diff --git a/src/node_state/follower/delete.rs b/src/node_state/follower/delete.rs index 3f2ef5a..e2a75c0 100644 --- a/src/node_state/follower/delete.rs +++ b/src/node_state/follower/delete.rs @@ -10,7 +10,7 @@ use crate::{Io, Result}; pub struct FollowerDelete { future: IO::DeleteLog, from: LogPosition, - message: AppendEntriesCall, + message: AppendEntriesCall, } impl FollowerDelete { pub fn new(common: &mut Common, from: LogPosition, message: AppendEntriesCall) -> Self { @@ -45,3 +45,134 @@ impl FollowerDelete { } } } + +#[cfg(test)] +mod test { + use crate::test_dsl::dsl::*; + + #[test] + #[rustfmt::skip] + fn delete_test_scenario1() { + use Command::*; + use LogEntry::*; + + let a = NodeName(0); + let b = NodeName(1); + let c = NodeName(2); + let (mut service, _cluster) = build_complete_graph(&[a, b, c]); + + interpret( + &vec![ + RunAllUntilStabilize, + Timeout(a), + RunAllUntilStabilize, + // ここまでで a がリーダーになっている + + // 実際にリーダーになっていることを確認する + Check(a, Pred::IsLeader), + Check(b, Pred::IsFollower), + Check(c, Pred::IsFollower), + + RecvBan(a, b), RecvBan(a, c), // aはbからもcからも受け取らない + RecvBan(b, a), // bはaからは受け取らない + RecvBan(c, a), // cはaからは受け取らない + + // aが孤立している状況で + // データをproposeすることで + // aにデータをためる + Propose(a), Propose(a), Propose(a), + + // bとcは新しいTermへ移る準備(aのfollowを外す) + Timeout(b), Timeout(c), RunAllUntilStabilize, + + // bを新しいリーダーにする + Timeout(b), + + // bとcだけ適当な回数計算を進める + Step(b), Step(c), + Step(b), Step(c), + Step(b), Step(c), + Step(b), Step(c), + Step(b), Step(c), + Step(b), Step(c), + + // cを独立させる + RecvBan(c, b), + RecvBan(c, a), + RunAllUntilStabilize, + + // 想定している状況になっていることを確認する + Check(a, Pred::IsLeader), + Check(b, Pred::IsLeader), + Check(c, Pred::IsFollower), + Check(a, Pred::RawLogIs(0, 0, vec![Noop(2), Com(2), Com(2), Com(2)])), + Check(b, Pred::RawLogIs(0, 0, vec![Noop(2), Noop(4)])), + Check(c, Pred::RawLogIs(0, 0, vec![Noop(2)])), // Noop(4)がないことに注意 + + // a<->b は通信可能にする + RecvAllow(b, a), + RecvAllow(a, b), + + // bからハートビートを送る + Heartbeat(b), + // メモリ上のlogとdisk上のlogにgapが生じるところまで進める + Step(b), Step(a), + Step(b), Step(a), + Step(b), Step(a), + Step(b), Step(a), + + // これは a のメモリ上のlogの終端位置が + // 4にあって、その直前のtermが2であることを意味している + Check(a, Pred::HistoryTail(2, 4)), + + // 一方で、既にDiskからは削除済みである + Check(a, Pred::RawLogIs(0, 0, vec![Noop(2)])), + + // この状態のまま、aがタイムアウトしてしまうと + // 上の食い違った状態でleaderになろうとする。 + // + // follower/mod.rs において handle_timeout で + // Delete中のタイムアウトは禁止している場合は + // このような場合を防ぐためであり、 + // 以下のコードで問題は起きない。 + // + // 一方で、タイムアウトを許す場合は + // 以下のコード(最後のStepAll)で問題が起こる。 + + // まず a<->c 間だけ通信を可能にする + RecvAllow(a, c), + RecvAllow(c, a), + RecvBan(b, a), + RecvBan(b, c), + RecvBan(c, b), + RecvBan(a, b), + + // aとcをタイムアウトさせて十分に実行させることで + // 両者をcandidateにする + Timeout(a), Timeout(c), StepAll(100), + + // そのあと、aがLeaderになれるようにTermを増やす手助け + Timeout(a), + + // Delete中のタイムアウトを許していない場合 => aはfollowerのまま, cはcandidate + // Delete中のタイムアウトを許している場合 => a も c も candidate になる + // + // Delete中の「タイムアウトを許可している場合」は、 + // aがleaderとなった後で、 + // メモリ上のlogと実体との差に起因するエラーが生じる。 + // + // 発生するエラーについて: + // 今回は `impl_io::over_write` で + // Disk上で「連続しないlog」を作成しようとしてエラーとなる。 + // + // RaftlogではDisk上のlogは + // 論理上連続していることが仮定されているが + // (IO traitのload_log methodの引数を参照せよ) + // 仮にエラーとなる部分のassertionを外したとしても、 + // 存在しない領域へのloadが発生し、どのみちエラーになる。 + StepAll(100), + ], + &mut service + ); + } +} diff --git a/src/node_state/follower/idle.rs b/src/node_state/follower/idle.rs index c01edf1..5ed1b0c 100644 --- a/src/node_state/follower/idle.rs +++ b/src/node_state/follower/idle.rs @@ -2,7 +2,7 @@ use std::marker::PhantomData; use trackable::error::ErrorKindExt; use super::super::{Common, NextState, RoleState}; -use super::{Follower, FollowerAppend, FollowerSnapshot, FollowerDelete}; +use super::{Follower, FollowerAppend, FollowerDelete, FollowerSnapshot}; use crate::log::{LogPosition, LogSuffix}; use crate::message::{AppendEntriesCall, Message}; use crate::{ErrorKind, Io, Result}; diff --git a/src/node_state/follower/mod.rs b/src/node_state/follower/mod.rs index 7b9ba62..e53cee9 100644 --- a/src/node_state/follower/mod.rs +++ b/src/node_state/follower/mod.rs @@ -1,18 +1,18 @@ use self::append::FollowerAppend; +use self::delete::FollowerDelete; use self::idle::FollowerIdle; use self::init::FollowerInit; use self::snapshot::FollowerSnapshot; -use self::delete::FollowerDelete; use super::{Common, NextState}; use crate::election::Role; use crate::message::{Message, MessageHeader}; use crate::{Io, Result}; mod append; +mod delete; mod idle; mod init; mod snapshot; -mod delete; /// 別の人(ノード)に投票しているフォロワー. /// @@ -43,7 +43,17 @@ impl Follower { Follower::Init(follower) } pub fn handle_timeout(&mut self, common: &mut Common) -> Result> { - Ok(Some(common.transit_to_candidate())) + match self { + Follower::Delete(_) => { + // Delete中はタイムアウトしても削除処理を続行する。 + // もしタイムアウトによってキャンセルした場合は + // follower/delete.rs にある + // delete_test_scenario1 でプログラムが異常終了する。 + // 詳しくは当該テストを参考のこと。 + Ok(None) + } + _ => Ok(Some(common.transit_to_candidate())), + } } pub fn handle_message( &mut self, diff --git a/src/test_dsl/dsl.rs b/src/test_dsl/dsl.rs index a78771c..7b02eb0 100644 --- a/src/test_dsl/dsl.rs +++ b/src/test_dsl/dsl.rs @@ -3,8 +3,8 @@ use std::collections::{BTreeMap, BTreeSet}; use crate::cluster::ClusterMembers; -use crate::election::Role; -use crate::log::{self, LogPosition}; +use crate::election::{Role, Term}; +use crate::log::{self, LogIndex, LogPosition}; use crate::node::NodeId; use crate::test_dsl::impl_io::*; use crate::ReplicatedLog; @@ -16,7 +16,7 @@ use std::fmt; /// DSL中でノード名を表すために用いる構造体 /// 現時点ではu8のnewtypeに過ぎない #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy)] -pub struct NodeName(u8); +pub struct NodeName(pub u8); impl fmt::Display for NodeName { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { @@ -67,6 +67,9 @@ pub enum Pred { /// 1. snapshotとrawlogが正しく接合している /// 2. rawlogのtermが昇順になっている LogTermConsistency, + + /// Historyのtailを調べる + HistoryTail(u64 /* term */, u64 /* index */), } /// 引数`rlog`で表される特定のノードが、述語`pred`を満たすかどうかを調べる @@ -115,8 +118,6 @@ fn check(rlog: &ReplicatedLog, pred: Pred) -> bool { } RawLogIs(term, index, entries) => { if let Some(rawlog) = &rlog.io().rawlog() { - dbg!(&rawlog); - let head: LogPosition = rawlog.head; let head_check: bool = (head.prev_term.as_u64() == term as u64) @@ -152,6 +153,13 @@ fn check(rlog: &ReplicatedLog, pred: Pred) -> bool { false } } + HistoryTail(term, pos) => { + rlog.local_history().tail() + == LogPosition { + prev_term: Term::new(term), + index: LogIndex::new(pos), + } + } } } @@ -184,6 +192,12 @@ pub enum Command { /// 指定されたノードを、raftlogの実装としての意味で1-stepだけ実行する Step(NodeName), + /// すべてのノードを、iter-step実行する。 + /// この時ノードの実行はfairである。 + /// すなわち各ノードをすべて1-step実行してから + /// 2-step目の実行を各ノードについて行い、その後で3-step目として実行していく。 + StepAll(usize), + /// 指定されたノードが、述語を満たすかどうかを検査する Check(NodeName, Pred), @@ -219,7 +233,7 @@ pub fn interpret(cs: &[Command], service: &mut Service) { /// DSLの一つのコマンドを実行する関数 fn interpret_command(c: Command, service: &mut Service) { - use crate::futures::Stream; + use futures::Stream; use Command::*; println!("\n Now executing {:?} ...", &c); @@ -232,7 +246,7 @@ fn interpret_command(c: Command, service: &mut Service) { } Check(node, pred) => { let rlog = service.get_mut(&node).unwrap(); - assert!(check(&rlog, pred)); + assert!(check(rlog, pred)); } Heartbeat(node) => { let rlog = service.get_mut(&node).unwrap(); @@ -248,6 +262,13 @@ fn interpret_command(c: Command, service: &mut Service) { Step(node) => { service.get_mut(&node).unwrap().poll().unwrap(); } + StepAll(iter) => { + for _ in 0..iter { + for (_n, io) in service.iter_mut() { + io.poll().expect("never fails in test senarios"); + } + } + } RunAllUntilStabilize => loop { let mut check = true; for (_n, io) in service.iter_mut() { @@ -313,10 +334,10 @@ pub fn build_complete_graph(names: &[NodeName]) -> (Service, ClusterMembers) { } for src in &nodes { - let mut io_src = ios.remove(&src).unwrap(); + let mut io_src = ios.remove(src).unwrap(); for dst in &nodes { if src != dst { - let io_dst = ios.get(&dst).unwrap(); + let io_dst = ios.get(dst).unwrap(); io_src.set_channel(dst.clone(), io_dst.copy_sender()); } } @@ -338,10 +359,12 @@ pub fn build_complete_graph(names: &[NodeName]) -> (Service, ClusterMembers) { mod test { use super::*; + /// Logの削除処理を実装していない場合には、 /// あるノードで、 /// snapshot(prefix)の表す最終termと /// rawlog(suffix)の先頭エントリが期待するtermで /// ズレが生じる現象を再現させる。 + /// (削除処理を実装している場合は問題ない) /// /// このテストではノードaでズレが生じるようにする。 #[test] @@ -380,7 +403,7 @@ mod test { Timeout(b), Timeout(c), RunAllUntilStabilize, // bを新しいリーダーにする - Timeout(b), RunAllUntilStabilize, + Timeout(b), StepAll(100), // 想定している状況になっていることを確認する Check(a, Pred::IsLeader), @@ -399,34 +422,36 @@ mod test { RecvAllow(c, a), // bからaとcへheartbeatを送る - Heartbeat(b), RunAllUntilStabilize, - - // aがstaleしていることがこの時点で判明して - // bのsnapshotがaに移る - // snapshotは、rawlog[2].term = 4 であることを要求しているが - Check(a, Pred::SnapShotIs(4 /* term */ , 2 /* index */)), - - // 一方でbのrawlogは、snapshotの保存にあたって先頭部分が削除され次のようになる + Heartbeat(b), StepAll(100), + + // **FollowerDeleteを実装している場合** + // 以下のように適切にログが整理される。 + Check(a, Pred::SnapShotIs(4, 2)), + Check(a, Pred::RawLogIs(0, 0, vec![])), + + // **FollowerDeleteを実装していない場合(以前のraftlog)** + // aがstaleしていることがこの時点で判明してbのsnapshotがaに移る + // snapshotは、rawlog[2].term = 4 であることを要求する + // Check(a, Pred::SnapShotIs(4 /* term */ , 2 /* index */)), + // + // 一方でaのrawlogは、snapshotの保存にあたって先頭部分が削除され次のようになる // snapshotの要求ではterm4から始まることになっているが、これに反する。 - Check(a, Pred::RawLogIs(2, 2, vec![Com(2), Com(2), Com(2), Com(2)])), - + // Check(a, Pred::RawLogIs(2, 2, vec![Com(2), Com(2), Com(2), Com(2)])), + // // 最終的な確認として、Termの並びが"In"consistentであることを調べる。 - Check(a, Pred::Not(Box::new(Pred::LogTermConsistency))), - - // 不整合が実際に生じるのは、reboot時のエラーチェックである。 - // panicを確認するには、次のコマンド2つを実行すれば良い。 - // (既に直前のCheckで不整合が明らかになっているので実行しない) - // Reboot(a, _cluster), Step(a) + // Check(a, Pred::Not(Box::new(Pred::LogTermConsistency))), ], &mut service, ); } + /// Logの削除処理を実装していない場合には、 /// あるノードで, /// rawlogの一部分だけを書き換えた結果として、 /// termの昇順整列制約が敗れることを確認する。 /// 上のscenario1ではsnapshot, rawlog間でズレが生じていたが /// rawlog単体でもズレが生じることを示す。 + /// (削除処理を実装している場合は問題ない) /// /// このテストではノードaで不整合が起こることを確認する。 #[test] @@ -463,9 +488,9 @@ mod test { // bとcの間のやりとりで、bを新しいリーダーにする Timeout(b), Timeout(c), - RunAllUntilStabilize, + StepAll(100), Timeout(b), // b を leaderにする - RunAllUntilStabilize, + StepAll(100), Check(a, Pred::IsLeader), Check(b, Pred::IsLeader), @@ -478,20 +503,21 @@ mod test { RecvAllow(c, a), // bからハートビートを送る - Heartbeat(b), RunAllUntilStabilize, + Heartbeat(b), StepAll(100), // aがstaleしているため - // bの長さ2のrawlogにより上書きされるが - // rawlog[1].term = 4 と rawlog[2].term = 2 で - // 不整合が生じている - Check(a, Pred::RawLogIs(0, 0, vec![Noop(2), Noop(4), Com(2)])), - - // 最終的な確認として、Termの並びが"In"consistentであることを調べる。 - Check(a, Pred::Not(Box::new(Pred::LogTermConsistency))), - - // この不整合は、実際にreboot処理を行うと - // raftlog内部でエラーになる - // Reboot(a, _cluster), Step(a), + // bの長さ2のrawlogにより上書きされる。 + + // **FollowerDeleteを実装している場合** + // 以下のように適切にログが整理される。 + Check(a, Pred::RawLogIs(0, 0, vec![Noop(2), Noop(4)])), + + // **FollowerDeleteを実装していない場合(以前のraftlog)** + // rawlog[1].term = 4 と rawlog[2].term = 2 で不整合が生じる + // Check(a, Pred::RawLogIs(0, 0, vec![Noop(2), Noop(4), Com(2)])), + // + // この時、Termの並びが"In"consistentになる。 + // Check(a, Pred::Not(Box::new(Pred::LogTermConsistency))), ], &mut service, ); diff --git a/src/test_dsl/impl_io.rs b/src/test_dsl/impl_io.rs index cdefc93..2cacfb5 100644 --- a/src/test_dsl/impl_io.rs +++ b/src/test_dsl/impl_io.rs @@ -135,6 +135,8 @@ pub enum IOEvent { SaveSuffix, /// ログを読み込んだ LoadLog, + /// ログのSuffixを削除した + DeleteSuffix, } /// `Io`-traitを実装する構造体 @@ -154,7 +156,10 @@ pub struct TestIo { /// これまでに保存したballot全体 ballots: Vec, + /// 永続化されているスナップショット(prefix) snapshot: Option, + + /// 永続化されている非スナップショットなログ(suffix) rawlog: Option, /// このIoを保持するnodeをタイムアウトさせたい時に @@ -330,8 +335,16 @@ impl Io for TestIo { if let Some(rawlog) = &mut self.rawlog { // prefixは手持ちのrawlogの先頭部分を被覆している assert!(rawlog.head.index <= prefix.tail.index); - // 被覆されている部分をskipする - assert!(rawlog.skip_to(prefix.tail.index).is_ok()); + + if prefix.tail.index <= rawlog.tail().index { + // 新しいprefixによって削除するべき部分がある場合は + // 被覆されている部分をskipする + assert!(rawlog.skip_to(prefix.tail.index).is_ok()); + } else { + // 新しいprefixがrawlogより長い場合には + // skip_to が使えないので直接clearする + *rawlog = LogSuffix::default(); + } } else { unreachable!("the argument `prefix` contains invalid entries"); } @@ -362,6 +375,29 @@ impl Io for TestIo { LogSaver(SaveMode::RawLogSave, self.node_id.clone()) } + type DeleteLog = LogDeleter; + fn delete_suffix_from(&mut self, from: LogIndex) -> Self::DeleteLog { + self.io_events.push(IOEvent::DeleteSuffix); + + if self.debug { + println!("DELETE RAWLOG from : {:?}", &from); + } + + if let Some(rawlog) = &mut self.rawlog { + assert!(rawlog.head.index <= from); + rawlog + .truncate(from) + .expect("should not make an error in test senarios"); + } else { + unreachable!("don't come here in test scenarios"); + } + + // 削除が完了するまでに(見かけ上)必要なターン数 + // 5にしているが深い意味はない + const REQUIRED_TURNS: usize = 5; + LogDeleter(self.node_id.clone(), REQUIRED_TURNS) + } + type LoadLog = LogLoader; fn load_log(&mut self, start: LogIndex, end: Option) -> Self::LoadLog { self.io_events.push(IOEvent::LoadLog); @@ -527,6 +563,26 @@ impl Future for LogSaver { } } +/// ログ削除を表現するためのfuture(の実体) +pub struct LogDeleter( + pub NodeId, + usize, /* #turns required to finish deleting */ +); +impl Future for LogDeleter { + type Item = (); + type Error = Error; + fn poll(&mut self) -> Poll { + if self.1 == 0 { + // 削除終了 + Ok(Async::Ready(())) + } else { + // 完了までに必要なターンを減らす + self.1 -= 1; + Ok(Async::NotReady) + } + } +} + /// 票の保存を表現するためのfuture(の実体) pub struct BallotSaver(pub NodeId, pub Ballot); diff --git a/src/test_util.rs b/src/test_util.rs index cec10ff..5296b49 100644 --- a/src/test_util.rs +++ b/src/test_util.rs @@ -109,6 +109,7 @@ pub mod tests { type SaveLog = NoopSaveLog; type LoadLog = LoadLogImpl; type Timeout = FibersTimeout; + type DeleteLog = NoopSaveLog; fn try_recv_message(&mut self) -> Result> { Ok(None) @@ -133,6 +134,10 @@ pub mod tests { NoopSaveLog } + fn delete_suffix_from(&mut self, _: LogIndex) -> Self::SaveLog { + panic!("do not use this in test codes OR please write the adequate code here"); + } + fn load_log(&mut self, start: LogIndex, end: Option) -> Self::LoadLog { let mut logs = self.logs.lock().expect("Never fails"); if let Some(log) = logs.remove(&(start, end)) { From 9aea0b16c92eaab71edd1df3a879b17dbba81565 Mon Sep 17 00:00:00 2001 From: yuuya uezato Date: Fri, 10 Sep 2021 09:41:27 +0900 Subject: [PATCH 3/5] =?UTF-8?q?history=E3=81=A8log=E3=81=AB=E3=82=BA?= =?UTF-8?q?=E3=83=AC=E3=81=8C=E3=81=82=E3=82=8B=E7=8A=B6=E6=85=8B=E3=81=A7?= =?UTF-8?q?=E3=81=AE=E7=8A=B6=E6=85=8B=E9=81=B7=E7=A7=BB=E3=82=92=E9=98=B2?= =?UTF-8?q?=E3=81=90?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/node_state/common/mod.rs | 33 +++++++++++++++++++++++++++++++ src/node_state/follower/delete.rs | 29 +++++++++++++++++++++++++-- src/node_state/follower/mod.rs | 13 +++++++++++- src/node_state/mod.rs | 9 ++++++--- src/test_dsl/dsl.rs | 2 +- src/test_dsl/impl_io.rs | 2 +- 6 files changed, 80 insertions(+), 8 deletions(-) diff --git a/src/node_state/common/mod.rs b/src/node_state/common/mod.rs index 69456cc..ca5d768 100644 --- a/src/node_state/common/mod.rs +++ b/src/node_state/common/mod.rs @@ -28,6 +28,18 @@ pub struct Common { load_committed: Option, install_snapshot: Option>, metrics: NodeStateMetrics, + + // ストレージ中のlogに対する削除処理が + // 進行中であるかどうかを表すフラグ。 + // + // このフラグが true である場合は + // ストレージ中のlogと + // メモリ中のcacheに相当する`history`とでズレが生じている。 + // false である場合は、2つは一致している。 + // + // 削除処理を行う箇所: + // * FollowerDelete(Followerのsubstate) + log_is_being_deleted: bool, } impl Common where @@ -53,6 +65,7 @@ where load_committed: None, install_snapshot: None, metrics, + log_is_being_deleted: false, } } @@ -326,6 +339,13 @@ where return HandleMessageResult::Handled(None); } + // このif文以降の計算では、historyに基づき状態を遷移させる。 + // 一方で、ストレージ上のlogと食い違ったhistoryで遷移を行うと問題が生じるため、 + // それを阻止するべく、logに対する削除中の場合には何もしない。 + if self.log_is_being_deleted { + return HandleMessageResult::Handled(None); + } + self.local_node.ballot.term = message.header().term; let next_state = if let Message::RequestVoteCall(m) = message { if m.log_tail.is_newer_or_equal_than(self.history.tail()) { @@ -363,6 +383,14 @@ where HandleMessageResult::Handled(None) } Message::AppendEntriesCall { .. } if !self.is_following_sender(&message) => { + // このclauseに入る + // <=> + // "自分は何かのnodeに投票して待機している状態で、 + // 自分が投票したnode以外がLeaderとなり計算を開始していることに気づく" + // ということなので、絶対に削除処理は進行中ではない。 + // よって安全に状態遷移を行うことができる。 + debug_assert!(!self.log_is_being_deleted); + // リーダが確定したので、フォロー先を変更する let leader = message.header().sender.clone(); self.unread_message = Some(message); @@ -422,6 +450,11 @@ where RpcCallee::new(self, caller) } + /// ストレージにあるlogに対する削除処理の開始・終了を管理する。 + pub fn set_if_log_is_being_deleted(&mut self, deleting: bool) { + self.log_is_being_deleted = deleting; + } + fn handle_committed(&mut self, suffix: LogSuffix) -> Result<()> { let new_tail = suffix.tail(); for (index, entry) in (suffix.head.index.as_u64()..) diff --git a/src/node_state/follower/delete.rs b/src/node_state/follower/delete.rs index e2a75c0..f56db60 100644 --- a/src/node_state/follower/delete.rs +++ b/src/node_state/follower/delete.rs @@ -11,7 +11,13 @@ pub struct FollowerDelete { future: IO::DeleteLog, from: LogPosition, message: AppendEntriesCall, + + // 削除処理中にtimeoutしたかどうかを記録するフラグ。 + // trueの場合は、削除処理後にcandidateに遷移する。 + // falseの場合は、FollowerIdleに遷移する。 + timeouted: bool, } + impl FollowerDelete { pub fn new(common: &mut Common, from: LogPosition, message: AppendEntriesCall) -> Self { let future = common.delete_suffix_from(from.index); @@ -19,6 +25,7 @@ impl FollowerDelete { future, from, message, + timeouted: false, } } pub fn handle_message( @@ -32,18 +39,36 @@ impl FollowerDelete { Ok(None) } pub fn run_once(&mut self, common: &mut Common) -> Result> { + // logに対する削除が進行中であることを + // commonに通知(フラグをセット)する。 + common.set_if_log_is_being_deleted(true); + if let Async::Ready(_) = track!(self.future.poll())? { track!(common.handle_log_rollbacked(self.from))?; + + // logに対する削除が完了し + // common.historyとlogが一致したので + // commonに通知する。 + common.set_if_log_is_being_deleted(false); + common .rpc_callee(&self.message.header) .reply_append_entries(self.from); - let next = Follower::Idle(FollowerIdle::new()); - Ok(Some(RoleState::Follower(next))) + if self.timeouted { + Ok(Some(common.transit_to_candidate())) + } else { + let next = Follower::Idle(FollowerIdle::new()); + Ok(Some(RoleState::Follower(next))) + } } else { Ok(None) } } + /// 削除処理中にtimeoutが発生した場合にそれを記録するためのメソッド + pub fn set_timeout(&mut self) { + self.timeouted = true; + } } #[cfg(test)] diff --git a/src/node_state/follower/mod.rs b/src/node_state/follower/mod.rs index e53cee9..2e3b4ef 100644 --- a/src/node_state/follower/mod.rs +++ b/src/node_state/follower/mod.rs @@ -44,7 +44,18 @@ impl Follower { } pub fn handle_timeout(&mut self, common: &mut Common) -> Result> { match self { - Follower::Delete(_) => { + Follower::Delete(delete) => { + // Delete中にタイムアウトしたことを記録する。 + // これによって削除完了後にはcandidateに遷移するようになる。 + // + // * IMPORTANT REMARK * + // 削除後にcandidateに遷移する振る舞いにしているのは + // `Io`トレイではタイマーに周期性を要求していないからである。 + // もし非周期的なタイマー(一度だけ発火するタイマー)が使われている場合に、 + // かつ、このような遷移処理を行わない場合では、 + // 極端な状況で全員がFollowerになりクラスタが硬直する。 + delete.set_timeout(); + // Delete中はタイムアウトしても削除処理を続行する。 // もしタイムアウトによってキャンセルした場合は // follower/delete.rs にある diff --git a/src/node_state/mod.rs b/src/node_state/mod.rs index d535e53..4ee8cb8 100644 --- a/src/node_state/mod.rs +++ b/src/node_state/mod.rs @@ -48,9 +48,12 @@ impl NodeState { self.role.is_loader() } pub fn start_election(&mut self) { - if let RoleState::Follower(_) = self.role { - let next = self.common.transit_to_candidate(); - self.handle_role_change(next); + if let RoleState::Follower(follower) = &mut self.role { + let next = follower.handle_timeout(&mut self.common); + let next = track_try_unwrap!(next); + if let Some(next) = next { + self.handle_role_change(next); + } } } fn handle_timeout(&mut self) -> Result>> { diff --git a/src/test_dsl/dsl.rs b/src/test_dsl/dsl.rs index 7b02eb0..3b10d5e 100644 --- a/src/test_dsl/dsl.rs +++ b/src/test_dsl/dsl.rs @@ -265,7 +265,7 @@ fn interpret_command(c: Command, service: &mut Service) { StepAll(iter) => { for _ in 0..iter { for (_n, io) in service.iter_mut() { - io.poll().expect("never fails in test senarios"); + io.poll().expect("never fails in test scenarios"); } } } diff --git a/src/test_dsl/impl_io.rs b/src/test_dsl/impl_io.rs index 2cacfb5..32dda3c 100644 --- a/src/test_dsl/impl_io.rs +++ b/src/test_dsl/impl_io.rs @@ -387,7 +387,7 @@ impl Io for TestIo { assert!(rawlog.head.index <= from); rawlog .truncate(from) - .expect("should not make an error in test senarios"); + .expect("should not make an error in test scenarios"); } else { unreachable!("don't come here in test scenarios"); } From ba7659adf1027be8b7cc8fc2305af335760c9de1 Mon Sep 17 00:00:00 2001 From: yuuya uezato Date: Fri, 10 Sep 2021 14:55:18 +0900 Subject: [PATCH 4/5] =?UTF-8?q?=E3=83=86=E3=82=B9=E3=83=88=E3=82=B1?= =?UTF-8?q?=E3=83=BC=E3=82=B9=E3=81=AE=E8=AA=AC=E6=98=8E=E6=96=87=E3=82=92?= =?UTF-8?q?=E4=BF=AE=E6=AD=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/node_state/follower/delete.rs | 60 ++++++++++++++++++------------- 1 file changed, 36 insertions(+), 24 deletions(-) diff --git a/src/node_state/follower/delete.rs b/src/node_state/follower/delete.rs index f56db60..fe15bf0 100644 --- a/src/node_state/follower/delete.rs +++ b/src/node_state/follower/delete.rs @@ -110,9 +110,10 @@ mod test { // bとcは新しいTermへ移る準備(aのfollowを外す) Timeout(b), Timeout(c), RunAllUntilStabilize, - // bを新しいリーダーにする + // bを新しいリーダーにするための準備 Timeout(b), + // bにvoteさせてリーダーとするために、 // bとcだけ適当な回数計算を進める Step(b), Step(c), Step(b), Step(c), @@ -121,9 +122,11 @@ mod test { Step(b), Step(c), Step(b), Step(c), - // cを独立させる + // cには、bへのvoteはさせるが + // 一方で(後でaとリーダーを競わせる都合上) + // bからリーダーになった通知であるNoopを受け取りたくないので + // 適当なタイミングで通信を閉じる。 RecvBan(c, b), - RecvBan(c, a), RunAllUntilStabilize, // 想定している状況になっていることを確認する @@ -132,36 +135,43 @@ mod test { Check(c, Pred::IsFollower), Check(a, Pred::RawLogIs(0, 0, vec![Noop(2), Com(2), Com(2), Com(2)])), Check(b, Pred::RawLogIs(0, 0, vec![Noop(2), Noop(4)])), - Check(c, Pred::RawLogIs(0, 0, vec![Noop(2)])), // Noop(4)がないことに注意 + Check(c, Pred::RawLogIs(0, 0, vec![Noop(2)])), - // a<->b は通信可能にする + // ここからは a で log と history が(削除処理中に)ズレる状況を作る + // まず a<->b は通信可能にする RecvAllow(b, a), RecvAllow(a, b), // bからハートビートを送る Heartbeat(b), - // メモリ上のlogとdisk上のlogにgapが生じるところまで進める + // aとbに対して計算を行い、 + // a側で削除処理が走り + // historyとlogにズレが起こるところまで進める Step(b), Step(a), Step(b), Step(a), Step(b), Step(a), Step(b), Step(a), - - // これは a のメモリ上のlogの終端位置が - // 4にあって、その直前のtermが2であることを意味している - Check(a, Pred::HistoryTail(2, 4)), - - // 一方で、既にDiskからは削除済みである + + // aにおいてズレが起こっていることを確認する。 + // + // まず、Disk上のlogについては削除完了している。 Check(a, Pred::RawLogIs(0, 0, vec![Noop(2)])), + // + // 一方で、 historyから見ると + // 終端位置が4で、その直前のtermが2のままである。 + // すなわちまだhistoryが更新されておらず、 + // aでlogとhistoryがズレている。 + Check(a, Pred::HistoryTail(2, 4)), - // この状態のまま、aがタイムアウトしてしまうと - // 上の食い違った状態でleaderになろうとする。 + // この状態のまま、aがcandidateになれてしまうと、 + // 上の食い違った状態でleaderになれる。 // - // follower/mod.rs において handle_timeout で - // Delete中のタイムアウトは禁止している場合は - // このような場合を防ぐためであり、 - // 以下のコードで問題は起きない。 + // Delete中のタイムアウトで即座にCandidateへ遷移させない場合は + // 勿論問題にならない。 // - // 一方で、タイムアウトを許す場合は + // 一方で、 + // follower/mod.rs の handle_timeout を編集し、 + // タイムアウトを許すようにコードを変更した場合には、 // 以下のコード(最後のStepAll)で問題が起こる。 // まず a<->c 間だけ通信を可能にする @@ -179,12 +189,14 @@ mod test { // そのあと、aがLeaderになれるようにTermを増やす手助け Timeout(a), - // Delete中のタイムアウトを許していない場合 => aはfollowerのまま, cはcandidate - // Delete中のタイムアウトを許している場合 => a も c も candidate になる + // ここまででaをLeaderにする準備が整った。 + // + // Delete中のタイムアウトによる遷移を + // 許さない場合 => aはfollowerのまま, cはcandidateなので問題がない + // 許す場合 => aがcandidateになった後、ズレたままleaderになる。 // - // Delete中の「タイムアウトを許可している場合」は、 - // aがleaderとなった後で、 - // メモリ上のlogと実体との差に起因するエラーが生じる。 + // 後者の場合は、aがleaderとなった後で、 + // historyとlogのズレに起因するエラーが生じる。 // // 発生するエラーについて: // 今回は `impl_io::over_write` で From 159c6a6ea2ecb2bb171812d71d11b7bcd13fbf5e Mon Sep 17 00:00:00 2001 From: yuuya uezato Date: Tue, 21 Sep 2021 14:53:28 +0900 Subject: [PATCH 5/5] =?UTF-8?q?assertion=E3=81=AE=E6=88=90=E7=AB=8B?= =?UTF-8?q?=E3=81=AB=E9=96=A2=E3=81=99=E3=82=8B=E3=82=B3=E3=83=A1=E3=83=B3?= =?UTF-8?q?=E3=83=88=E3=81=AE=E6=9B=B4=E6=96=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/node_state/common/mod.rs | 24 ++++++++++++++++++------ 1 file changed, 18 insertions(+), 6 deletions(-) diff --git a/src/node_state/common/mod.rs b/src/node_state/common/mod.rs index ca5d768..6ec5274 100644 --- a/src/node_state/common/mod.rs +++ b/src/node_state/common/mod.rs @@ -383,12 +383,24 @@ where HandleMessageResult::Handled(None) } Message::AppendEntriesCall { .. } if !self.is_following_sender(&message) => { - // このclauseに入る - // <=> - // "自分は何かのnodeに投票して待機している状態で、 - // 自分が投票したnode以外がLeaderとなり計算を開始していることに気づく" - // ということなので、絶対に削除処理は進行中ではない。 - // よって安全に状態遷移を行うことができる。 + /* + * この節に入るときには削除処理中ではない。なぜなら…… + * + * 1. 自分と同じTerm Tからメッセージが届き + * かつ、それがAppendEntriesCall (AE-call) であるということは + * そのメッセージの送り主 N が T のリーダーである。 + * + * 2. 非リーダーである自分については、 + * いま T にいる以上、Term S (S < T) から遷移してTになっている。 + * logに対する処理中でhistoryとズレている場合は遷移を遅延させるので、 + * T になった"時点"では、logに対する変更は行われていない。 + * + * 3. S から T になって以降は AE-call は受け取っていない。 + * (受け取っているなら N をfollowしている筈なので矛盾) + * Term T での主な計算(logへの変更も含む)は + * 最初の AE-call を受け取ってから開始するので、 + * T になった"以降"も、一度もlogに対する変更は行われていない。 + */ debug_assert!(!self.log_is_being_deleted); // リーダが確定したので、フォロー先を変更する