Skip to content

Commit

Permalink
Followerでのhandle_timeoutでDelete中のタイムアウトは禁止する。禁止しない場合に問題と なる検証コードも付記
Browse files Browse the repository at this point in the history
  • Loading branch information
yuezato committed Aug 17, 2021
1 parent e60ba99 commit da95d11
Show file tree
Hide file tree
Showing 7 changed files with 206 additions and 13 deletions.
1 change: 0 additions & 1 deletion src/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,6 @@ pub trait Io {
/// ただし、`start`とは異なる位置から、エントリの取得を開始することは許可されない.
fn load_log(&mut self, start: LogIndex, end: Option<LogIndex>) -> Self::LoadLog;


/// ローカルログのうち末尾部分について、
/// 指定された位置 `from` を含むそれ以降 [from..) を全て削除する.
fn delete_suffix_from(&mut self, from: LogIndex) -> Self::DeleteLog;
Expand Down
132 changes: 131 additions & 1 deletion src/node_state/follower/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::{Io, Result};
pub struct FollowerDelete<IO: Io> {
future: IO::DeleteLog,
from: LogPosition,
message: AppendEntriesCall,
message: AppendEntriesCall,
}
impl<IO: Io> FollowerDelete<IO> {
pub fn new(common: &mut Common<IO>, from: LogPosition, message: AppendEntriesCall) -> Self {
Expand Down Expand Up @@ -45,3 +45,133 @@ impl<IO: Io> FollowerDelete<IO> {
}
}
}

#[cfg(test)]
mod test {
use super::*;
use crate::test_dsl::dsl::*;
use crate::test_dsl::impl_io::*;
use crate::test_dsl::*;

#[test]
#[rustfmt::skip]
fn delete_test_scenario1() {
use Command::*;
use LogEntry::*;

let a = NodeName(0);
let b = NodeName(1);
let c = NodeName(2);
let (mut service, _cluster) = build_complete_graph(&[a, b, c]);

interpret(
&vec![
RunAllUntilStabilize,
Timeout(a),
RunAllUntilStabilize,
// ここまでで a がリーダーになっている

// 実際にリーダーになっていることを確認する
Check(a, Pred::IsLeader),
Check(b, Pred::IsFollower),
Check(c, Pred::IsFollower),

RecvBan(a, b), RecvBan(a, c), // aはbからもcからも受け取らない
RecvBan(b, a), // bはaからは受け取らない
RecvBan(c, a), // cはaからは受け取らない

// aが孤立している状況で
// データをproposeすることで
// aにデータをためる
Propose(a), Propose(a), Propose(a),

// bとcは新しいTermへ移る準備(aのfollowを外す)
Timeout(b), Timeout(c), RunAllUntilStabilize,

// bを新しいリーダーにする
Timeout(b),

// bとcだけ適当な回数計算を進める
Step(b), Step(c),
Step(b), Step(c),
Step(b), Step(c),
Step(b), Step(c),
Step(b), Step(c),
Step(b), Step(c),

// cを独立させる
RecvBan(c, b),
RecvBan(c, a),
RunAllUntilStabilize,

// 想定している状況になっていることを確認する
Check(a, Pred::IsLeader),
Check(b, Pred::IsLeader),
Check(c, Pred::IsFollower),
Check(a, Pred::RawLogIs(0, 0, vec![Noop(2), Com(2), Com(2), Com(2)])),
Check(b, Pred::RawLogIs(0, 0, vec![Noop(2), Noop(4)])),
Check(c, Pred::RawLogIs(0, 0, vec![Noop(2)])), // Noop(4)がないことに注意

// a<->b は通信可能にする
RecvAllow(b, a),
RecvAllow(a, b),

// bからハートビートを送る
Heartbeat(b),
// メモリ上のlogとdisk上のlogにgapが生じるところまで進める
Step(b), Step(a),
Step(b), Step(a),
Step(b), Step(a),
Step(b), Step(a),

// これは a のメモリ上のlogの終端位置が
// 4にあって、その直前のtermが2であることを意味している
Check(a, Pred::HistoryTail(2, 4)),

// 一方で、既にDiskからは削除済みである
Check(a, Pred::RawLogIs(0, 0, vec![Noop(2)])),

// この状態のまま、aがタイムアウトしてしまうと
// 上の食い違った状態でleaderになろうとする。
//
// follower/mod.rs において handle_timeout で
// Delete中のタイムアウトは禁止している場合は
// このような場合を防ぐためであり、
// 以下のコードで問題は起きない。
//
// 一方で、タイムアウトを許す場合は
// 以下のコード(最後のRunAllUntilStabilize)で問題が起こる。

// まず a<->c 間だけ通信を可能にする
RecvAllow(a, c),
RecvAllow(c, a),
RecvBan(b, a),
RecvBan(b, c),
RecvBan(c, b),
RecvBan(a, b),

Timeout(a), Timeout(c), RunAllUntilStabilize,
Timeout(a),

// Delete中のタイムアウトを許していない場合 => aはfollowerのまま, cはcandidate
// Delete中のタイムアウトを許している場合 => a も c も candidate になる
//
// Delete中の「タイムアウトを許可している場合」は、
// aがleaderとなった後で、
// メモリ上のlogと実体との差に起因するエラーが生じる。
//
// 発生するエラーについて:
// 今回は `impl_io::over_write` で
// Disk上で「連続しないlog」を作成しようとしてエラーとなる。
//
// RaftlogではDisk上のlogは
// 論理上連続していることが仮定されているが
// (IO traitのload_log methodの引数を参照せよ)
// 仮にエラーとなる部分のassertionを外したとしても、
// 存在しない領域へのloadが発生し、どのみちエラーになる。
RunAllUntilStabilize,
],
&mut service
);
}
}
2 changes: 1 addition & 1 deletion src/node_state/follower/idle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::marker::PhantomData;
use trackable::error::ErrorKindExt;

use super::super::{Common, NextState, RoleState};
use super::{Follower, FollowerAppend, FollowerSnapshot, FollowerDelete};
use super::{Follower, FollowerAppend, FollowerDelete, FollowerSnapshot};
use crate::log::{LogPosition, LogSuffix};
use crate::message::{AppendEntriesCall, Message};
use crate::{ErrorKind, Io, Result};
Expand Down
16 changes: 13 additions & 3 deletions src/node_state/follower/mod.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
use self::append::FollowerAppend;
use self::delete::FollowerDelete;
use self::idle::FollowerIdle;
use self::init::FollowerInit;
use self::snapshot::FollowerSnapshot;
use self::delete::FollowerDelete;
use super::{Common, NextState};
use crate::election::Role;
use crate::message::{Message, MessageHeader};
use crate::{Io, Result};

mod append;
mod delete;
mod idle;
mod init;
mod snapshot;
mod delete;

/// 別の人(ノード)に投票しているフォロワー.
///
Expand Down Expand Up @@ -43,7 +43,17 @@ impl<IO: Io> Follower<IO> {
Follower::Init(follower)
}
pub fn handle_timeout(&mut self, common: &mut Common<IO>) -> Result<NextState<IO>> {
Ok(Some(common.transit_to_candidate()))
match self {
Follower::Delete(_) => {
// Delete中はタイムアウトしても削除処理を続行する。
// もしタイムアウトによってキャンセルした場合は
// follower/delete.rs にある
// delete_test_scenario1 でプログラムが異常終了する。
// 詳しくは当該テストを参考のこと。
Ok(None)
}
_ => Ok(Some(common.transit_to_candidate())),
}
}
pub fn handle_message(
&mut self,
Expand Down
24 changes: 17 additions & 7 deletions src/test_dsl/dsl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
use std::collections::{BTreeMap, BTreeSet};

use crate::cluster::ClusterMembers;
use crate::election::Role;
use crate::log::{self, LogPosition};
use crate::election::{Role, Term};
use crate::log::{self, LogIndex, LogPosition};
use crate::node::NodeId;
use crate::test_dsl::impl_io::*;
use crate::ReplicatedLog;
Expand All @@ -16,7 +16,7 @@ use std::fmt;
/// DSL中でノード名を表すために用いる構造体
/// 現時点ではu8のnewtypeに過ぎない
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy)]
pub struct NodeName(u8);
pub struct NodeName(pub u8);

impl fmt::Display for NodeName {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
Expand Down Expand Up @@ -67,6 +67,9 @@ pub enum Pred {
/// 1. snapshotとrawlogが正しく接合している
/// 2. rawlogのtermが昇順になっている
LogTermConsistency,

/// Historyのtailを調べる
HistoryTail(u64 /* term */, u64 /* index */),
}

/// 引数`rlog`で表される特定のノードが、述語`pred`を満たすかどうかを調べる
Expand Down Expand Up @@ -152,6 +155,13 @@ fn check(rlog: &ReplicatedLog<TestIo>, pred: Pred) -> bool {
false
}
}
HistoryTail(term, pos) => {
rlog.local_history().tail()
== LogPosition {
prev_term: Term::new(term),
index: LogIndex::new(pos),
}
}
}
}

Expand Down Expand Up @@ -219,7 +229,7 @@ pub fn interpret(cs: &[Command], service: &mut Service) {

/// DSLの一つのコマンドを実行する関数
fn interpret_command(c: Command, service: &mut Service) {
use crate::futures::Stream;
use futures::Stream;
use Command::*;

println!("\n Now executing {:?} ...", &c);
Expand All @@ -232,7 +242,7 @@ fn interpret_command(c: Command, service: &mut Service) {
}
Check(node, pred) => {
let rlog = service.get_mut(&node).unwrap();
assert!(check(&rlog, pred));
assert!(check(rlog, pred));
}
Heartbeat(node) => {
let rlog = service.get_mut(&node).unwrap();
Expand Down Expand Up @@ -313,10 +323,10 @@ pub fn build_complete_graph(names: &[NodeName]) -> (Service, ClusterMembers) {
}

for src in &nodes {
let mut io_src = ios.remove(&src).unwrap();
let mut io_src = ios.remove(src).unwrap();
for dst in &nodes {
if src != dst {
let io_dst = ios.get(&dst).unwrap();
let io_dst = ios.get(dst).unwrap();
io_src.set_channel(dst.clone(), io_dst.copy_sender());
}
}
Expand Down
39 changes: 39 additions & 0 deletions src/test_dsl/impl_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,8 @@ pub enum IOEvent {
SaveSuffix,
/// ログを読み込んだ
LoadLog,
/// ログのSuffixを削除した
DeleteSuffix,
}

/// `Io`-traitを実装する構造体
Expand Down Expand Up @@ -362,6 +364,29 @@ impl Io for TestIo {
LogSaver(SaveMode::RawLogSave, self.node_id.clone())
}

type DeleteLog = LogDeleter;
fn delete_suffix_from(&mut self, from: LogIndex) -> Self::DeleteLog {
self.io_events.push(IOEvent::DeleteSuffix);

if self.debug {
println!("DELETE RAWLOG from : {:?}", &from);
}

if let Some(rawlog) = &mut self.rawlog {
assert!(rawlog.head.index <= from);
rawlog
.truncate(from)
.expect("should not make an error in test senarios");
} else {
unreachable!("don't come here in the test code");
}

// 削除が完了するまでに(見かけ上)必要なターン数
// 5にしているが深い意味はない
const REQUIRED_TURNS: usize = 5;
LogDeleter(self.node_id.clone(), REQUIRED_TURNS)
}

type LoadLog = LogLoader;
fn load_log(&mut self, start: LogIndex, end: Option<LogIndex>) -> Self::LoadLog {
self.io_events.push(IOEvent::LoadLog);
Expand Down Expand Up @@ -527,6 +552,20 @@ impl Future for LogSaver {
}
}

/// ログ削除を表現するためのfuture(の実体)
pub struct LogDeleter(pub NodeId, usize /* required turns until finish */);
impl Future for LogDeleter {
type Item = ();
type Error = Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
if self.1 == 0 {
Ok(Async::Ready(()))
} else {
Ok(Async::NotReady)
}
}
}

/// 票の保存を表現するためのfuture(の実体)
pub struct BallotSaver(pub NodeId, pub Ballot);

Expand Down
5 changes: 5 additions & 0 deletions src/test_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ pub mod tests {
type SaveLog = NoopSaveLog;
type LoadLog = LoadLogImpl;
type Timeout = FibersTimeout;
type DeleteLog = NoopSaveLog;

fn try_recv_message(&mut self) -> Result<Option<Message>> {
Ok(None)
Expand All @@ -133,6 +134,10 @@ pub mod tests {
NoopSaveLog
}

fn delete_suffix_from(&mut self, _: LogIndex) -> Self::SaveLog {
panic!("do not use this in test codes OR please write the adequate code here");
}

fn load_log(&mut self, start: LogIndex, end: Option<LogIndex>) -> Self::LoadLog {
let mut logs = self.logs.lock().expect("Never fails");
if let Some(log) = logs.remove(&(start, end)) {
Expand Down

0 comments on commit da95d11

Please sign in to comment.