Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

suffixの明示的な削除コードを追加(issue18対策) #42

Merged
merged 6 commits into from
Oct 12, 2021
Merged
7 changes: 7 additions & 0 deletions src/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ pub trait Io {
/// ローカルログを取得するための`Future`.
type LoadLog: Future<Item = Log, Error = Error>;

/// ローカルログの末尾部分を削除するための`Future`.
type DeleteLog: Future<Item = (), Error = Error>;

/// タイムアウトを表現するための`Future`.
type Timeout: Future<Item = (), Error = Error>;

Expand Down Expand Up @@ -83,6 +86,10 @@ pub trait Io {
/// ただし、`start`とは異なる位置から、エントリの取得を開始することは許可されない.
fn load_log(&mut self, start: LogIndex, end: Option<LogIndex>) -> Self::LoadLog;

/// ローカルログのうち末尾部分について、
/// 指定された位置 `from` を含むそれ以降 [from..) を全て削除する.
fn delete_suffix_from(&mut self, from: LogIndex) -> Self::DeleteLog;

/// 選挙における役割に応じた時間のタイムアウトオブジェクトを生成する.
fn create_timeout(&mut self, role: Role) -> Self::Timeout;

Expand Down
50 changes: 50 additions & 0 deletions src/node_state/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,18 @@ pub struct Common<IO: Io> {
load_committed: Option<IO::LoadLog>,
install_snapshot: Option<InstallSnapshot<IO>>,
metrics: NodeStateMetrics,

// ストレージ中のlogに対する削除処理が
// 進行中であるかどうかを表すフラグ。
//
// このフラグが true である場合は
// ストレージ中のlogと
// メモリ中のcacheに相当する`history`とでズレが生じている。
// false である場合は、2つは一致している。
//
// 削除処理を行う箇所:
// * FollowerDelete(Followerのsubstate)
log_is_being_deleted: bool,
}
impl<IO> Common<IO>
where
Expand All @@ -53,6 +65,7 @@ where
load_committed: None,
install_snapshot: None,
metrics,
log_is_being_deleted: false,
}
}

Expand Down Expand Up @@ -244,6 +257,11 @@ where
self.io.load_log(start, end)
}

/// `from`以降のsuffixエントリ [from..) を削除する
pub fn delete_suffix_from(&mut self, from: LogIndex) -> IO::DeleteLog {
self.io.delete_suffix_from(from)
}

/// ローカルログの末尾部分に`suffix`を追記する.
pub fn save_log_suffix(&mut self, suffix: &LogSuffix) -> IO::SaveLog {
self.io.save_log_suffix(suffix)
Expand Down Expand Up @@ -321,6 +339,13 @@ where
return HandleMessageResult::Handled(None);
}

// このif文以降の計算では、historyに基づき状態を遷移させる。
// 一方で、ストレージ上のlogと食い違ったhistoryで遷移を行うと問題が生じるため、
// それを阻止するべく、logに対する削除中の場合には何もしない。
if self.log_is_being_deleted {
return HandleMessageResult::Handled(None);
}

self.local_node.ballot.term = message.header().term;
let next_state = if let Message::RequestVoteCall(m) = message {
if m.log_tail.is_newer_or_equal_than(self.history.tail()) {
Expand Down Expand Up @@ -358,6 +383,26 @@ where
HandleMessageResult::Handled(None)
}
Message::AppendEntriesCall { .. } if !self.is_following_sender(&message) => {
/*
* この節に入るときには削除処理中ではない。なぜなら……
*
* 1. 自分と同じTerm Tからメッセージが届き
* かつ、それがAppendEntriesCall (AE-call) であるということは
* そのメッセージの送り主 N が T のリーダーである。
*
* 2. 非リーダーである自分については、
* いま T にいる以上、Term S (S < T) から遷移してTになっている。
* logに対する処理中でhistoryとズレている場合は遷移を遅延させるので、
* T になった"時点"では、logに対する変更は行われていない。
*
* 3. S から T になって以降は AE-call は受け取っていない。
* (受け取っているなら N をfollowしている筈なので矛盾)
* Term T での主な計算(logへの変更も含む)は
* 最初の AE-call を受け取ってから開始するので、
* T になった"以降"も、一度もlogに対する変更は行われていない。
*/
debug_assert!(!self.log_is_being_deleted);
brly marked this conversation as resolved.
Show resolved Hide resolved

// リーダが確定したので、フォロー先を変更する
let leader = message.header().sender.clone();
self.unread_message = Some(message);
Expand Down Expand Up @@ -417,6 +462,11 @@ where
RpcCallee::new(self, caller)
}

/// ストレージにあるlogに対する削除処理の開始・終了を管理する。
pub fn set_if_log_is_being_deleted(&mut self, deleting: bool) {
self.log_is_being_deleted = deleting;
}

fn handle_committed(&mut self, suffix: LogSuffix) -> Result<()> {
let new_tail = suffix.tail();
for (index, entry) in (suffix.head.index.as_u64()..)
Expand Down
215 changes: 215 additions & 0 deletions src/node_state/follower/delete.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,215 @@
use futures::{Async, Future};

use super::super::{Common, NextState, RoleState};
use super::{Follower, FollowerIdle};
use crate::log::LogPosition;
use crate::message::{AppendEntriesCall, Message};
use crate::{Io, Result};

/// ローカルログの削除を行うサブ状態
pub struct FollowerDelete<IO: Io> {
future: IO::DeleteLog,
from: LogPosition,
message: AppendEntriesCall,

// 削除処理中にtimeoutしたかどうかを記録するフラグ。
// trueの場合は、削除処理後にcandidateに遷移する。
// falseの場合は、FollowerIdleに遷移する。
timeouted: bool,
}

impl<IO: Io> FollowerDelete<IO> {
pub fn new(common: &mut Common<IO>, from: LogPosition, message: AppendEntriesCall) -> Self {
let future = common.delete_suffix_from(from.index);
FollowerDelete {
future,
from,
message,
timeouted: false,
}
}
pub fn handle_message(
&mut self,
common: &mut Common<IO>,
message: Message,
) -> Result<NextState<IO>> {
if let Message::AppendEntriesCall(m) = message {
common.rpc_callee(&m.header).reply_busy();
}
Ok(None)
}
pub fn run_once(&mut self, common: &mut Common<IO>) -> Result<NextState<IO>> {
// logに対する削除が進行中であることを
// commonに通知(フラグをセット)する。
common.set_if_log_is_being_deleted(true);

if let Async::Ready(_) = track!(self.future.poll())? {
track!(common.handle_log_rollbacked(self.from))?;

// logに対する削除が完了し
// common.historyとlogが一致したので
// commonに通知する。
common.set_if_log_is_being_deleted(false);

common
.rpc_callee(&self.message.header)
.reply_append_entries(self.from);

if self.timeouted {
Ok(Some(common.transit_to_candidate()))
} else {
let next = Follower::Idle(FollowerIdle::new());
Ok(Some(RoleState::Follower(next)))
}
} else {
Ok(None)
}
}
/// 削除処理中にtimeoutが発生した場合にそれを記録するためのメソッド
pub fn set_timeout(&mut self) {
self.timeouted = true;
}
}

#[cfg(test)]
mod test {
use crate::test_dsl::dsl::*;

#[test]
#[rustfmt::skip]
fn delete_test_scenario1() {
use Command::*;
use LogEntry::*;

let a = NodeName(0);
let b = NodeName(1);
let c = NodeName(2);
let (mut service, _cluster) = build_complete_graph(&[a, b, c]);

interpret(
&vec![
RunAllUntilStabilize,
Timeout(a),
RunAllUntilStabilize,
// ここまでで a がリーダーになっている

// 実際にリーダーになっていることを確認する
Check(a, Pred::IsLeader),
Check(b, Pred::IsFollower),
Check(c, Pred::IsFollower),

RecvBan(a, b), RecvBan(a, c), // aはbからもcからも受け取らない
RecvBan(b, a), // bはaからは受け取らない
RecvBan(c, a), // cはaからは受け取らない

// aが孤立している状況で
// データをproposeすることで
// aにデータをためる
Propose(a), Propose(a), Propose(a),

// bとcは新しいTermへ移る準備(aのfollowを外す)
Timeout(b), Timeout(c), RunAllUntilStabilize,

// bを新しいリーダーにするための準備
Timeout(b),

// bにvoteさせてリーダーとするために、
// bとcだけ適当な回数計算を進める
Step(b), Step(c),
Step(b), Step(c),
Step(b), Step(c),
Step(b), Step(c),
Step(b), Step(c),
Step(b), Step(c),

// cには、bへのvoteはさせるが
// 一方で(後でaとリーダーを競わせる都合上)
// bからリーダーになった通知であるNoopを受け取りたくないので
// 適当なタイミングで通信を閉じる。
RecvBan(c, b),
RunAllUntilStabilize,

// 想定している状況になっていることを確認する
Check(a, Pred::IsLeader),
Check(b, Pred::IsLeader),
Check(c, Pred::IsFollower),
Check(a, Pred::RawLogIs(0, 0, vec![Noop(2), Com(2), Com(2), Com(2)])),
Check(b, Pred::RawLogIs(0, 0, vec![Noop(2), Noop(4)])),
Check(c, Pred::RawLogIs(0, 0, vec![Noop(2)])),

// ここからは a で log と history が(削除処理中に)ズレる状況を作る
// まず a<->b は通信可能にする
RecvAllow(b, a),
RecvAllow(a, b),

// bからハートビートを送る
Heartbeat(b),
// aとbに対して計算を行い、
// a側で削除処理が走り
// historyとlogにズレが起こるところまで進める
Step(b), Step(a),
Step(b), Step(a),
Step(b), Step(a),
Step(b), Step(a),

// aにおいてズレが起こっていることを確認する。
//
// まず、Disk上のlogについては削除完了している。
Check(a, Pred::RawLogIs(0, 0, vec![Noop(2)])),
//
// 一方で、 historyから見ると
// 終端位置が4で、その直前のtermが2のままである。
// すなわちまだhistoryが更新されておらず、
// aでlogとhistoryがズレている。
Check(a, Pred::HistoryTail(2, 4)),

// この状態のまま、aがcandidateになれてしまうと、
// 上の食い違った状態でleaderになれる。
//
// Delete中のタイムアウトで即座にCandidateへ遷移させない場合は
// 勿論問題にならない。
//
// 一方で、
// follower/mod.rs の handle_timeout を編集し、
// タイムアウトを許すようにコードを変更した場合には、
// 以下のコード(最後のStepAll)で問題が起こる。

// まず a<->c 間だけ通信を可能にする
RecvAllow(a, c),
RecvAllow(c, a),
RecvBan(b, a),
RecvBan(b, c),
RecvBan(c, b),
RecvBan(a, b),

// aとcをタイムアウトさせて十分に実行させることで
// 両者をcandidateにする
Timeout(a), Timeout(c), StepAll(100),

// そのあと、aがLeaderになれるようにTermを増やす手助け
Timeout(a),

// ここまででaをLeaderにする準備が整った。
//
// Delete中のタイムアウトによる遷移を
// 許さない場合 => aはfollowerのまま, cはcandidateなので問題がない
// 許す場合 => aがcandidateになった後、ズレたままleaderになる。
//
// 後者の場合は、aがleaderとなった後で、
// historyとlogのズレに起因するエラーが生じる。
//
// 発生するエラーについて:
// 今回は `impl_io::over_write` で
// Disk上で「連続しないlog」を作成しようとしてエラーとなる。
//
// RaftlogではDisk上のlogは
// 論理上連続していることが仮定されているが
// (IO traitのload_log methodの引数を参照せよ)
// 仮にエラーとなる部分のassertionを外したとしても、
// 存在しない領域へのloadが発生し、どのみちエラーになる。
StepAll(100),
],
&mut service
);
}
}
10 changes: 3 additions & 7 deletions src/node_state/follower/idle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::marker::PhantomData;
use trackable::error::ErrorKindExt;

use super::super::{Common, NextState, RoleState};
use super::{Follower, FollowerAppend, FollowerSnapshot};
use super::{Follower, FollowerAppend, FollowerDelete, FollowerSnapshot};
use crate::log::{LogPosition, LogSuffix};
use crate::message::{AppendEntriesCall, Message};
use crate::{ErrorKind, Io, Result};
Expand Down Expand Up @@ -93,12 +93,8 @@ impl<IO: Io> FollowerIdle<IO> {
if !matched {
// 両者が分岐している
// => ローカルログ(の未コミット領域)をロールバックして、同期位置まで戻る
let new_log_tail = lcp;
track!(common.handle_log_rollbacked(new_log_tail))?;
common
.rpc_callee(&message.header)
.reply_append_entries(new_log_tail);
Ok(None)
let next = FollowerDelete::new(common, lcp, message);
Ok(Some(RoleState::Follower(Follower::Delete(next))))
} else {
// 両者は包含関係にあるので、追記が可能
track!(message.suffix.skip_to(lcp.index))?;
Expand Down
Loading