From 9870562c1e6f51c029ef7328b1af2b66e1d3a937 Mon Sep 17 00:00:00 2001 From: Henry Sun Date: Thu, 12 Dec 2024 12:14:57 -0800 Subject: [PATCH] Report only confirmed log entries as matching in AppendEntriesResponse Summary: For successful appends, only report up to those log entries that were actually confirmed to match the leader's log entries in the current append rather than all log entries. This avoid establish false quorum when a prefix of the log matches. Reviewed By: jaher Differential Revision: D67120345 fbshipit-source-id: b6c55104547157a9c0a235671c35a0f9f0e49364 --- src/wa_raft_log.erl | 12 +++--- src/wa_raft_server.erl | 92 +++++++++--------------------------------- 2 files changed, 27 insertions(+), 77 deletions(-) diff --git a/src/wa_raft_log.erl b/src/wa_raft_log.erl index bc4b11b..712fadc 100644 --- a/src/wa_raft_log.erl +++ b/src/wa_raft_log.erl @@ -325,7 +325,7 @@ append(#log_view{last = Last} = View, Entries) -> %% all log entries after the mismatching log will be replaced with the new log %% entries provided. -spec append(View :: view(), Start :: log_index(), Entries :: [log_entry()]) -> - {ok, LastIndex :: log_index(), NewView :: view()} | wa_raft:error(). + {ok, MatchIndex :: log_index(), NewView :: view()} | wa_raft:error(). append(View, Start, _Entries) when Start =< 0 -> ?LOG_ERROR("[~p] rejecting append starting at invalid start index ~p", [log_name(View), Start], #{domain => [whatsapp, wa_raft]}), {error, invalid_start_index}; @@ -336,8 +336,9 @@ append(#log_view{log = Log, last = Last} = View0, Start, Entries) -> ok -> ?RAFT_COUNT('raft.log.append.ok'), View1 = update_config_cache(View0, Start, Entries), - NewLast = max(Last, Start + length(Entries) - 1), - {ok, NewLast, View1#log_view{last = NewLast}}; + NewMatch = Start + length(Entries) - 1, + NewLast = max(Last, NewMatch), + {ok, NewMatch, View1#log_view{last = NewLast}}; {mismatch, Index} -> ?RAFT_COUNT('raft.log.append.mismatch'), case truncate(View0, Index) of @@ -347,8 +348,9 @@ append(#log_view{log = Log, last = Last} = View0, Start, Entries) -> ok -> ?RAFT_COUNT('raft.log.append.ok'), View2 = update_config_cache(View1, Start, NewEntries), - NewLast = max(Index - 1, Start + length(Entries) - 1), - {ok, NewLast, View2#log_view{last = NewLast}}; + NewMatch = Start + length(Entries) - 1, + NewLast = max(Index - 1, NewMatch), + {ok, NewMatch, View2#log_view{last = NewLast}}; {error, Reason} -> ?RAFT_COUNT('raft.log.append.error'), {error, Reason} diff --git a/src/wa_raft_server.erl b/src/wa_raft_server.erl index c348b24..a4ff6dd 100644 --- a/src/wa_raft_server.erl +++ b/src/wa_raft_server.erl @@ -914,83 +914,33 @@ leader(cast, ?REMOTE(?IDENTITY_REQUIRES_MIGRATION(_, FollowerId) = Sender, ?APPE _ -> cancel_bulk_logs_for_follower(Sender, State1) end, - % Here, the RAFT protocol expects that the MatchIndex for the follower be set to - % the log index of the last log entry replicated by the AppendEntries RPC that - % triggered this AppendEntriesResponse RPC. However, we do not have enough state - % here to figure out that log index so instead assume that the follower's log - % matches completely after a successful AppendEntries RPC. - % - % This is perfectly valid during normal operations after the leadership has been - % stable for a while since all replication at that point occurs at the end of - % the log, and so FollowerEndIndex =:= PrevLogIndex + length(Entries). However, - % this may not be true at the start of a term. - % - % In our implementation of the RAFT protocol, the leader of a new term always - % appends a new log entry created by itself (with the new term) to the end of - % the log before starting replication (hereinafter the "initial log entry"). - % We store the index of the initial log entry in FirstCurrentTermLogIndex. - % For all followers, NextIndex is initialized to FirstCurrentTermLogIndex so - % replication for the new term always starts from the initial log entry. In - % addition, the leader will refuse to commit any log entries until it finds - % a quorum that contains at least the initial log entry has been established. - % - % Note that since the initial log entry is created by the RAFT leader at the - % start of a new term, it is impossible for followers with a log at least as - % long as the leader's to match. After the first round of AppendEntries, all - % followers will either match the leader or will have a log whose last log - % index is lesser than FirstCurrentTermLogIndex. - % - % * For any followers whose log matches, the condition is trivial. - % * For any followers whose log does not match and whose log ends with a log - % entry with an index lesser than (FirstCurrentTermLogIndex - 1), the first - % AppendEntries will fail due to the the previous log entry being missing. - % * For any followers whose log does not match and whose log ends with a log - % entry with an index at least (FirstCurrentTermLogIndex - 1), the first - % AppendEntries RPC will contain the initial log entry, which is guaranteed - % to not match, resulting in the log being truncated to end with the log - % entry at (FirstCurrentTermLogIndex - 1). Subsequent mismatches of this - % type will be detected by mismatching PrevLogIndex and PrevLogTerm. - % - % By the liveness of the RAFT protocol, since the AppendEntries step always - % results in a FollowerEndIndex that is less than FirstCurrentTermLogIndex - % if the follower's log does not match the leader's, we can conclude that - % once FollowerEndIndex reaches FirstCurrentTermLogIndex, the follower must - % have a matching log. Thus, once the quorum MatchIndex reaches a log index - % at least FirstCurrentTermLogIndex (a.k.a. the initial log entry), we can - % be sure that a quorum has been formed even when setting MatchIndex to - % FollowerEndIndex for all AppendEntries. - MatchIndex1 = maps:put(FollowerId, FollowerEndIndex, MatchIndex0), OldNextIndex = maps:get(FollowerId, NextIndex0, TermStartIndex), NextIndex1 = maps:put(FollowerId, erlang:max(OldNextIndex, FollowerEndIndex + 1), NextIndex0), State2 = State1#raft_state{match_index = MatchIndex1, next_index = NextIndex1}, - State3 = maybe_apply(FollowerEndIndex, State2), + State3 = maybe_apply(State2), ?RAFT_GATHER('raft.leader.apply.func', timer:now_diff(os:timestamp(), StartT)), {keep_state, maybe_heartbeat(State3), ?HEARTBEAT_TIMEOUT(State3)}; %% and failures. -leader(cast, ?REMOTE(?IDENTITY_REQUIRES_MIGRATION(_, FollowerId) = Sender, ?APPEND_ENTRIES_RESPONSE(PrevLogIndex, false, FollowerEndIndex)), - #raft_state{name = Name, current_term = CurrentTerm, next_index = NextIndex0, match_index = MatchIndex0} = State0) -> +leader(cast, ?REMOTE(?IDENTITY_REQUIRES_MIGRATION(_, FollowerId) = Sender, ?APPEND_ENTRIES_RESPONSE(_PrevLogIndex, false, FollowerEndIndex)), + #raft_state{name = Name, current_term = CurrentTerm, next_index = NextIndex0} = State0) -> ?RAFT_COUNT('raft.leader.append.failure'), - ?LOG_DEBUG("Server[~0p, term ~0p, leader] append failure for follower ~p. Follower reports local log ends at ~0p.", + ?LOG_DEBUG("Server[~0p, term ~0p, leader] append failure for follower ~p. Follower reports log matches up to ~0p.", [Name, CurrentTerm, Sender, FollowerEndIndex], #{domain => [whatsapp, wa_raft]}), select_follower_replication_mode(FollowerEndIndex, State0) =:= snapshot andalso request_snapshot_for_follower(FollowerId, State0), cancel_bulk_logs_for_follower(Sender, State0), - % See comment in successful branch of AppendEntriesResponse RPC handling for - % reasoning as to why it is safe to set MatchIndex to FollowerEndIndex for this - % RAFT implementation. - MatchIndex1 = maps:put(FollowerId, FollowerEndIndex, MatchIndex0), % We must trust the follower's last log index here because the follower may have % applied a snapshot since the last successful heartbeat. In such case, we need % to fast-forward the follower's next index so that we resume replication at the % point after the snapshot. NextIndex1 = maps:put(FollowerId, FollowerEndIndex + 1, NextIndex0), - State1 = State0#raft_state{next_index = NextIndex1, match_index = MatchIndex1}, - State2 = maybe_apply(min(PrevLogIndex, FollowerEndIndex), State1), + State1 = State0#raft_state{next_index = NextIndex1}, + State2 = maybe_apply(State1), {keep_state, maybe_heartbeat(State2), ?HEARTBEAT_TIMEOUT(State2)}; %% [RequestVote RPC] We are already leader for the current term, so always decline votes (5.1, 5.2) @@ -2131,15 +2081,15 @@ apply_single_node_cluster(#raft_state{name = Name, log_view = View0} = State0) - {ok, L} -> L; _ -> View0 end, - maybe_apply(infinity, State0#raft_state{log_view = View1}); + maybe_apply(State0#raft_state{log_view = View1}); _ -> State0 end. %% Leader - check quorum and apply logs if necessary --spec maybe_apply(EndIndex :: infinity | wa_raft_log:log_index(), State0 :: #raft_state{}) -> State1 :: #raft_state{}. -maybe_apply(EndIndex, #raft_state{name = Name, log_view = View, current_term = CurrentTerm, - match_index = MatchIndex, commit_index = LastCommitIndex, last_applied = LastAppliedIndex} = State0) when EndIndex > LastCommitIndex -> +-spec maybe_apply(Data :: #raft_state{}) -> #raft_state{}. +maybe_apply(#raft_state{name = Name, log_view = View, current_term = CurrentTerm, + match_index = MatchIndex, commit_index = LastCommitIndex, last_applied = LastAppliedIndex} = State0) -> % Raft paper section 5.4.3 - Only log entries from the leader’s current term are committed % by counting replicas; once an entry from the current term has been committed in this way, % then all prior entries are committed indirectly because of the View Matching Property @@ -2156,17 +2106,15 @@ maybe_apply(EndIndex, #raft_state{name = Name, log_view = View, current_term = C {ok, Term} when Term < CurrentTerm -> % Raft paper section 5.4.3 - as a leader, don't commit entries from previous term if no log entry of current term has applied yet ?RAFT_COUNT('raft.apply.delay.old'), - ?LOG_WARNING("Server[~0p, term ~0p, leader] delays commit of log entry ~0p with old term ~0p.", - [Name, CurrentTerm, EndIndex, Term], #{domain => [whatsapp, wa_raft]}), + ?LOG_WARNING("Server[~0p, term ~0p, leader] delays commit of log up to ~0p due to old term ~0p.", + [Name, CurrentTerm, CommitIndex, Term], #{domain => [whatsapp, wa_raft]}), State0; _ -> State0 end; _ -> State0 %% no quorum yet - end; -maybe_apply(_EndIndex, State) -> - State. + end. % Return the max index to potentially apply on the leader. This is the latest log index that % has achieved replication on at least a quorum of nodes in the current RAFT cluster. @@ -2503,9 +2451,9 @@ handle_heartbeat(State, Event, Leader, PrevLogIndex, PrevLogTerm, Entries, Commi [Name, CurrentTerm, State, EntryCount, PrevLogIndex + 1, PrevLogIndex + EntryCount, wa_raft_log:last_index(View)], #{domain => [whatsapp, wa_raft]}), case append_entries(State, PrevLogIndex, PrevLogTerm, Entries, EntryCount, Data0) of - {ok, Accepted, NewLastIndex, Data1} -> - send_rpc(Leader, ?APPEND_ENTRIES_RESPONSE(PrevLogIndex, Accepted, NewLastIndex), Data1), - reply(Event, ?LEGACY_APPEND_ENTRIES_RESPONSE_RPC(CurrentTerm, node(), PrevLogIndex, Accepted, NewLastIndex)), + {ok, Accepted, NewMatchIndex, Data1} -> + send_rpc(Leader, ?APPEND_ENTRIES_RESPONSE(PrevLogIndex, Accepted, NewMatchIndex), Data1), + reply(Event, ?LEGACY_APPEND_ENTRIES_RESPONSE_RPC(CurrentTerm, node(), PrevLogIndex, Accepted, NewMatchIndex)), LocalTrimIndex = case ?RAFT_LOG_ROTATION_BY_TRIM_INDEX(App) of true -> TrimIndex; @@ -2513,7 +2461,7 @@ handle_heartbeat(State, Event, Leader, PrevLogIndex, PrevLogTerm, Entries, Commi end, Data2 = Data1#raft_state{leader_heartbeat_ts = erlang:monotonic_time(millisecond)}, Data3 = case Accepted of - true -> apply_log(Data2, min(CommitIndex, NewLastIndex), LocalTrimIndex, undefined); + true -> apply_log(Data2, min(CommitIndex, NewMatchIndex), LocalTrimIndex, undefined); _ -> Data2 end, check_follower_lagging(CommitIndex, Data3), @@ -2529,7 +2477,7 @@ handle_heartbeat(State, Event, Leader, PrevLogIndex, PrevLogTerm, Entries, Commi %% is encountered, returns a diagnostic that can be used as a reason to %% disable the current replica. -spec append_entries(State :: state(), PrevLogIndex :: wa_raft_log:log_index(), PrevLogTerm :: wa_raft_log:log_term(), Entries :: [wa_raft_log:log_entry()], EntryCount :: non_neg_integer(), Data :: #raft_state{}) -> - {ok, Accepted :: boolean(), NewLastIndex :: wa_raft_log:log_index(), NewData :: #raft_state{}} | {fatal, Reason :: term()}. + {ok, Accepted :: boolean(), NewMatchIndex :: wa_raft_log:log_index(), NewData :: #raft_state{}} | {fatal, Reason :: term()}. append_entries(State, PrevLogIndex, PrevLogTerm, Entries, EntryCount, #raft_state{name = Name, log_view = View, last_applied = LastApplied, current_term = CurrentTerm, leader_id = LeaderId} = Data) -> % Inspect the locally stored term associated with the previous log entry to discern if % appending the provided range of log entries is allowed. @@ -2538,8 +2486,8 @@ append_entries(State, PrevLogIndex, PrevLogTerm, Entries, EntryCount, #raft_stat % If the term of the log entry previous the entries to be applied matches the term stored % with the previous log entry in the local RAFT log, then this follower can proceed with % appending to the log. - {ok, NewLastIndex, NewView} = wa_raft_log:append(View, PrevLogIndex + 1, Entries), - {ok, true, NewLastIndex, Data#raft_state{log_view = NewView}}; + {ok, NewMatchIndex, NewView} = wa_raft_log:append(View, PrevLogIndex + 1, Entries), + {ok, true, NewMatchIndex, Data#raft_state{log_view = NewView}}; {ok, LocalPrevLogTerm} -> % If the term of the log entry proceeding the entries to be applied does not match the log % entry stored with the previous log entry in the local RAFT log, then we need to truncate