Skip to content

suffixの明示的な削除コードを追加(issue18対策) #42

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Oct 12, 2021
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions src/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ pub trait Io {
/// ローカルログを取得するための`Future`.
type LoadLog: Future<Item = Log, Error = Error>;

/// ローカルログの末尾部分を削除するための`Future`.
type DeleteLog: Future<Item = (), Error = Error>;

/// タイムアウトを表現するための`Future`.
type Timeout: Future<Item = (), Error = Error>;

Expand Down Expand Up @@ -83,6 +86,10 @@ pub trait Io {
/// ただし、`start`とは異なる位置から、エントリの取得を開始することは許可されない.
fn load_log(&mut self, start: LogIndex, end: Option<LogIndex>) -> Self::LoadLog;

/// ローカルログのうち末尾部分について、
/// 指定された位置 `from` を含むそれ以降 [from..) を全て削除する.
fn delete_suffix_from(&mut self, from: LogIndex) -> Self::DeleteLog;

/// 選挙における役割に応じた時間のタイムアウトオブジェクトを生成する.
fn create_timeout(&mut self, role: Role) -> Self::Timeout;

Expand Down
38 changes: 38 additions & 0 deletions src/node_state/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,18 @@ pub struct Common<IO: Io> {
load_committed: Option<IO::LoadLog>,
install_snapshot: Option<InstallSnapshot<IO>>,
metrics: NodeStateMetrics,

// ストレージ中のlogに対する削除処理が
// 進行中であるかどうかを表すフラグ。
//
// このフラグが true である場合は
// ストレージ中のlogと
// メモリ中のcacheに相当する`history`とでズレが生じている。
// false である場合は、2つは一致している。
//
// 削除処理を行う箇所:
// * FollowerDelete(Followerのsubstate)
log_is_being_deleted: bool,
}
impl<IO> Common<IO>
where
Expand All @@ -53,6 +65,7 @@ where
load_committed: None,
install_snapshot: None,
metrics,
log_is_being_deleted: false,
}
}

Expand Down Expand Up @@ -244,6 +257,11 @@ where
self.io.load_log(start, end)
}

/// `from`以降のsuffixエントリ [from..) を削除する
pub fn delete_suffix_from(&mut self, from: LogIndex) -> IO::DeleteLog {
self.io.delete_suffix_from(from)
}

/// ローカルログの末尾部分に`suffix`を追記する.
pub fn save_log_suffix(&mut self, suffix: &LogSuffix) -> IO::SaveLog {
self.io.save_log_suffix(suffix)
Expand Down Expand Up @@ -321,6 +339,13 @@ where
return HandleMessageResult::Handled(None);
}

// このif文以降の計算では、historyに基づき状態を遷移させる。
// 一方で、ストレージ上のlogと食い違ったhistoryで遷移を行うと問題が生じるため、
// それを阻止するべく、logに対する削除中の場合には何もしない。
if self.log_is_being_deleted {
return HandleMessageResult::Handled(None);
}

self.local_node.ballot.term = message.header().term;
let next_state = if let Message::RequestVoteCall(m) = message {
if m.log_tail.is_newer_or_equal_than(self.history.tail()) {
Expand Down Expand Up @@ -358,6 +383,14 @@ where
HandleMessageResult::Handled(None)
}
Message::AppendEntriesCall { .. } if !self.is_following_sender(&message) => {
// このclauseに入る
// <=>
// "自分は何かのnodeに投票して待機している状態で、
// 自分が投票したnode以外がLeaderとなり計算を開始していることに気づく"
// ということなので、絶対に削除処理は進行中ではない。
// よって安全に状態遷移を行うことができる。
debug_assert!(!self.log_is_being_deleted);

// リーダが確定したので、フォロー先を変更する
let leader = message.header().sender.clone();
self.unread_message = Some(message);
Expand Down Expand Up @@ -417,6 +450,11 @@ where
RpcCallee::new(self, caller)
}

/// ストレージにあるlogに対する削除処理の開始・終了を管理する。
pub fn set_if_log_is_being_deleted(&mut self, deleting: bool) {
self.log_is_being_deleted = deleting;
}

fn handle_committed(&mut self, suffix: LogSuffix) -> Result<()> {
let new_tail = suffix.tail();
for (index, entry) in (suffix.head.index.as_u64()..)
Expand Down
203 changes: 203 additions & 0 deletions src/node_state/follower/delete.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
use futures::{Async, Future};

use super::super::{Common, NextState, RoleState};
use super::{Follower, FollowerIdle};
use crate::log::LogPosition;
use crate::message::{AppendEntriesCall, Message};
use crate::{Io, Result};

/// ローカルログの削除を行うサブ状態
pub struct FollowerDelete<IO: Io> {
future: IO::DeleteLog,
from: LogPosition,
message: AppendEntriesCall,

// 削除処理中にtimeoutしたかどうかを記録するフラグ。
// trueの場合は、削除処理後にcandidateに遷移する。
// falseの場合は、FollowerIdleに遷移する。
timeouted: bool,
}

impl<IO: Io> FollowerDelete<IO> {
pub fn new(common: &mut Common<IO>, from: LogPosition, message: AppendEntriesCall) -> Self {
let future = common.delete_suffix_from(from.index);
FollowerDelete {
future,
from,
message,
timeouted: false,
}
}
pub fn handle_message(
&mut self,
common: &mut Common<IO>,
message: Message,
) -> Result<NextState<IO>> {
if let Message::AppendEntriesCall(m) = message {
common.rpc_callee(&m.header).reply_busy();
}
Ok(None)
}
pub fn run_once(&mut self, common: &mut Common<IO>) -> Result<NextState<IO>> {
// logに対する削除が進行中であることを
// commonに通知(フラグをセット)する。
common.set_if_log_is_being_deleted(true);

if let Async::Ready(_) = track!(self.future.poll())? {
track!(common.handle_log_rollbacked(self.from))?;

// logに対する削除が完了し
// common.historyとlogが一致したので
// commonに通知する。
common.set_if_log_is_being_deleted(false);

common
.rpc_callee(&self.message.header)
.reply_append_entries(self.from);

if self.timeouted {
Ok(Some(common.transit_to_candidate()))
} else {
let next = Follower::Idle(FollowerIdle::new());
Ok(Some(RoleState::Follower(next)))
}
} else {
Ok(None)
}
}
/// 削除処理中にtimeoutが発生した場合にそれを記録するためのメソッド
pub fn set_timeout(&mut self) {
self.timeouted = true;
}
}

#[cfg(test)]
mod test {
use crate::test_dsl::dsl::*;

#[test]
#[rustfmt::skip]
fn delete_test_scenario1() {
use Command::*;
use LogEntry::*;

let a = NodeName(0);
let b = NodeName(1);
let c = NodeName(2);
let (mut service, _cluster) = build_complete_graph(&[a, b, c]);

interpret(
&vec![
RunAllUntilStabilize,
Timeout(a),
RunAllUntilStabilize,
// ここまでで a がリーダーになっている

// 実際にリーダーになっていることを確認する
Check(a, Pred::IsLeader),
Check(b, Pred::IsFollower),
Check(c, Pred::IsFollower),

RecvBan(a, b), RecvBan(a, c), // aはbからもcからも受け取らない
RecvBan(b, a), // bはaからは受け取らない
RecvBan(c, a), // cはaからは受け取らない

// aが孤立している状況で
// データをproposeすることで
// aにデータをためる
Propose(a), Propose(a), Propose(a),

// bとcは新しいTermへ移る準備(aのfollowを外す)
Timeout(b), Timeout(c), RunAllUntilStabilize,

// bを新しいリーダーにする
Timeout(b),

// bとcだけ適当な回数計算を進める
Step(b), Step(c),
Step(b), Step(c),
Step(b), Step(c),
Step(b), Step(c),
Step(b), Step(c),
Step(b), Step(c),

// cを独立させる
RecvBan(c, b),
RecvBan(c, a),
RunAllUntilStabilize,

// 想定している状況になっていることを確認する
Check(a, Pred::IsLeader),
Check(b, Pred::IsLeader),
Check(c, Pred::IsFollower),
Check(a, Pred::RawLogIs(0, 0, vec![Noop(2), Com(2), Com(2), Com(2)])),
Check(b, Pred::RawLogIs(0, 0, vec![Noop(2), Noop(4)])),
Check(c, Pred::RawLogIs(0, 0, vec![Noop(2)])), // Noop(4)がないことに注意

// a<->b は通信可能にする
RecvAllow(b, a),
RecvAllow(a, b),

// bからハートビートを送る
Heartbeat(b),
// メモリ上のlogとdisk上のlogにgapが生じるところまで進める
Step(b), Step(a),
Step(b), Step(a),
Step(b), Step(a),
Step(b), Step(a),

// これは a のメモリ上のlogの終端位置が
// 4にあって、その直前のtermが2であることを意味している
Check(a, Pred::HistoryTail(2, 4)),

// 一方で、既にDiskからは削除済みである
Check(a, Pred::RawLogIs(0, 0, vec![Noop(2)])),

// この状態のまま、aがタイムアウトしてしまうと
// 上の食い違った状態でleaderになろうとする。
//
// follower/mod.rs において handle_timeout で
// Delete中のタイムアウトは禁止している場合は
// このような場合を防ぐためであり、
// 以下のコードで問題は起きない。
//
// 一方で、タイムアウトを許す場合は
// 以下のコード(最後のStepAll)で問題が起こる。

// まず a<->c 間だけ通信を可能にする
RecvAllow(a, c),
RecvAllow(c, a),
RecvBan(b, a),
RecvBan(b, c),
RecvBan(c, b),
RecvBan(a, b),

// aとcをタイムアウトさせて十分に実行させることで
// 両者をcandidateにする
Timeout(a), Timeout(c), StepAll(100),

// そのあと、aがLeaderになれるようにTermを増やす手助け
Timeout(a),

// Delete中のタイムアウトを許していない場合 => aはfollowerのまま, cはcandidate
// Delete中のタイムアウトを許している場合 => a も c も candidate になる
//
// Delete中の「タイムアウトを許可している場合」は、
// aがleaderとなった後で、
// メモリ上のlogと実体との差に起因するエラーが生じる。
//
// 発生するエラーについて:
// 今回は `impl_io::over_write` で
// Disk上で「連続しないlog」を作成しようとしてエラーとなる。
//
// RaftlogではDisk上のlogは
// 論理上連続していることが仮定されているが
// (IO traitのload_log methodの引数を参照せよ)
// 仮にエラーとなる部分のassertionを外したとしても、
// 存在しない領域へのloadが発生し、どのみちエラーになる。
StepAll(100),
],
&mut service
);
}
}
10 changes: 3 additions & 7 deletions src/node_state/follower/idle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::marker::PhantomData;
use trackable::error::ErrorKindExt;

use super::super::{Common, NextState, RoleState};
use super::{Follower, FollowerAppend, FollowerSnapshot};
use super::{Follower, FollowerAppend, FollowerDelete, FollowerSnapshot};
use crate::log::{LogPosition, LogSuffix};
use crate::message::{AppendEntriesCall, Message};
use crate::{ErrorKind, Io, Result};
Expand Down Expand Up @@ -93,12 +93,8 @@ impl<IO: Io> FollowerIdle<IO> {
if !matched {
// 両者が分岐している
// => ローカルログ(の未コミット領域)をロールバックして、同期位置まで戻る
let new_log_tail = lcp;
track!(common.handle_log_rollbacked(new_log_tail))?;
common
.rpc_callee(&message.header)
.reply_append_entries(new_log_tail);
Ok(None)
let next = FollowerDelete::new(common, lcp, message);
Ok(Some(RoleState::Follower(Follower::Delete(next))))
} else {
// 両者は包含関係にあるので、追記が可能
track!(message.suffix.skip_to(lcp.index))?;
Expand Down
30 changes: 29 additions & 1 deletion src/node_state/follower/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use self::append::FollowerAppend;
use self::delete::FollowerDelete;
use self::idle::FollowerIdle;
use self::init::FollowerInit;
use self::snapshot::FollowerSnapshot;
Expand All @@ -8,6 +9,7 @@ use crate::message::{Message, MessageHeader};
use crate::{Io, Result};

mod append;
mod delete;
mod idle;
mod init;
mod snapshot;
Expand All @@ -30,6 +32,9 @@ pub enum Follower<IO: Io> {

/// ローカルログへのスナップショット保存中.
Snapshot(FollowerSnapshot<IO>),

/// ローカルログの末尾部分を削除中
Delete(FollowerDelete<IO>),
}
impl<IO: Io> Follower<IO> {
pub fn new(common: &mut Common<IO>, pending_vote: Option<MessageHeader>) -> Self {
Expand All @@ -38,7 +43,28 @@ impl<IO: Io> Follower<IO> {
Follower::Init(follower)
}
pub fn handle_timeout(&mut self, common: &mut Common<IO>) -> Result<NextState<IO>> {
Ok(Some(common.transit_to_candidate()))
match self {
Follower::Delete(delete) => {
// Delete中にタイムアウトしたことを記録する。
// これによって削除完了後にはcandidateに遷移するようになる。
//
// * IMPORTANT REMARK *
// 削除後にcandidateに遷移する振る舞いにしているのは
// `Io`トレイではタイマーに周期性を要求していないからである。
// もし非周期的なタイマー(一度だけ発火するタイマー)が使われている場合に、
// かつ、このような遷移処理を行わない場合では、
// 極端な状況で全員がFollowerになりクラスタが硬直する。
delete.set_timeout();

// Delete中はタイムアウトしても削除処理を続行する。
// もしタイムアウトによってキャンセルした場合は
// follower/delete.rs にある
// delete_test_scenario1 でプログラムが異常終了する。
// 詳しくは当該テストを参考のこと。
Ok(None)
}
_ => Ok(Some(common.transit_to_candidate())),
}
}
pub fn handle_message(
&mut self,
Expand All @@ -58,6 +84,7 @@ impl<IO: Io> Follower<IO> {
Follower::Idle(ref mut t) => track!(t.handle_message(common, message)),
Follower::Append(ref mut t) => track!(t.handle_message(common, message)),
Follower::Snapshot(ref mut t) => track!(t.handle_message(common, message)),
Follower::Delete(ref mut t) => track!(t.handle_message(common, message)),
}
}
pub fn run_once(&mut self, common: &mut Common<IO>) -> Result<NextState<IO>> {
Expand All @@ -66,6 +93,7 @@ impl<IO: Io> Follower<IO> {
Follower::Idle(_) => Ok(None),
Follower::Append(ref mut t) => track!(t.run_once(common)),
Follower::Snapshot(ref mut t) => track!(t.run_once(common)),
Follower::Delete(ref mut t) => track!(t.run_once(common)),
}
}
}
Loading