From 8563bf6044206d0d1e66e3f190431a94f39e6b42 Mon Sep 17 00:00:00 2001 From: Madan Jampani Date: Mon, 18 Mar 2024 10:37:58 -0700 Subject: [PATCH] Support for snapshot catchup receiver to throttle incoming transports Summary: Allows a peer to communicate it is at the limit for incoming shard transfers. Sender will not bother such a peer until a configurable backoff period (snapshot_catchup_overloaded_backoff_ms, default 1000 ms) expires. Differential Revision: D54828872 fbshipit-source-id: 9ef30657086ce92fcacbf9e1cef3f4c741c73280 --- include/wa_raft.hrl | 3 + src/wa_raft_server.erl | 4 +- src/wa_raft_snapshot_catchup.erl | 45 +++++++++------ src/wa_raft_transport.erl | 95 +++++++++++++++++++++----------- 4 files changed, 98 insertions(+), 49 deletions(-) diff --git a/include/wa_raft.hrl b/include/wa_raft.hrl index 4e6b5fa..d1fffb6 100644 --- a/include/wa_raft.hrl +++ b/include/wa_raft.hrl @@ -237,6 +237,9 @@ %% Maximum bytes per heartbeat for catchup by bulk log transfer -define(RAFT_CATCHUP_MAX_BYTES_PER_BATCH, raft_catchup_log_batch_bytes). -define(RAFT_CATCHUP_MAX_BYTES_PER_BATCH(App), ?RAFT_APP_CONFIG(App, ?RAFT_CATCHUP_MAX_BYTES_PER_BATCH, 4 * 1024 * 1024)). +%% Time to wait before retrying snapshot transport to an overloaded peer. +-define(RAFT_SNAPSHOT_CATCHUP_OVERLOADED_BACKOFF_MS, snapshot_catchup_overloaded_backoff_ms). +-define(RAFT_SNAPSHOT_CATCHUP_OVERLOADED_BACKOFF_MS(App), ?RAFT_APP_CONFIG(App, ?RAFT_SNAPSHOT_CATCHUP_OVERLOADED_BACKOFF_MS, 1000)). %% Time in seconds to retain transport destination directories after use -define(RAFT_TRANSPORT_RETAIN_INTERVAL, transport_retain_min_secs). diff --git a/src/wa_raft_server.erl b/src/wa_raft_server.erl index 4dbcb8b..565c9b9 100644 --- a/src/wa_raft_server.erl +++ b/src/wa_raft_server.erl @@ -2437,7 +2437,7 @@ select_follower_replication_mode(FollowerLastIndex, #raft_state{application = Ap %% transports have been started then no transport is created. This function %% always performs this request asynchronously. 
-spec request_snapshot_for_follower(node(), #raft_state{}) -> term(). -request_snapshot_for_follower(FollowerId, #raft_state{name = Name, table = Table, partition = Partition, data_dir = DataDir, log_view = View} = State) -> +request_snapshot_for_follower(FollowerId, #raft_state{application = App, name = Name, table = Table, partition = Partition, data_dir = DataDir, log_view = View} = State) -> case lists:member({Name, FollowerId}, config_witnesses(config(State))) of true -> % If node is a witness, we can bypass the transport process since we don't have to @@ -2447,7 +2447,7 @@ request_snapshot_for_follower(FollowerId, #raft_state{name = Name, table = Table LastLogPos = #raft_log_pos{index = LastLogIndex, term = LastLogTerm}, wa_raft_server:snapshot_available({Name, FollowerId}, DataDir, LastLogPos); false -> - wa_raft_snapshot_catchup:request_snapshot_transport(FollowerId, Table, Partition) + wa_raft_snapshot_catchup:request_snapshot_transport(App, FollowerId, Table, Partition) end. -spec request_bulk_logs_for_follower(#raft_identity{}, wa_raft_log:log_index(), #raft_state{}) -> ok. diff --git a/src/wa_raft_snapshot_catchup.erl b/src/wa_raft_snapshot_catchup.erl index f56e7ed..b9c3ff1 100644 --- a/src/wa_raft_snapshot_catchup.erl +++ b/src/wa_raft_snapshot_catchup.erl @@ -22,7 +22,7 @@ %% Internal API -export([ current_snapshot_transports/0, - request_snapshot_transport/3 + request_snapshot_transport/4 ]). %% Snapshot catchup server implementation @@ -47,7 +47,9 @@ % currently active transports transports = #{} :: #{key() => #transport{}}, % counts of active transports that are using a particular snapshot - snapshots = #{} :: #{snapshot_key() => pos_integer()} + snapshots = #{} :: #{snapshot_key() => pos_integer()}, + % backoff windows for nodes that previously reported being overloaded + backoff_windows = #{} :: #{node() => pos_integer()} }). -spec child_spec() -> supervisor:child_spec(). 
@@ -68,9 +70,9 @@ start_link() -> current_snapshot_transports() -> gen_server:call(?MODULE, current_snapshot_transports). --spec request_snapshot_transport(Peer :: node(), Table :: wa_raft:table(), Partition :: wa_raft:partition()) -> ok. -request_snapshot_transport(Peer, Table, Partition) -> - gen_server:cast(?MODULE, {request_snapshot_transport, Peer, Table, Partition}). +-spec request_snapshot_transport(App :: atom(), Peer :: node(), Table :: wa_raft:table(), Partition :: wa_raft:partition()) -> ok. +request_snapshot_transport(App, Peer, Table, Partition) -> + gen_server:cast(?MODULE, {request_snapshot_transport, App, Peer, Table, Partition}). -spec init(Args :: term()) -> {ok, #state{}}. init([]) -> @@ -85,24 +87,35 @@ handle_call(Request, From, #state{} = State) -> ?LOG_NOTICE("received unrecognized call ~P from ~0p", [Request, 25, From], #{domain => [whatsapp, wa_raft]}), {noreply, State}. --spec handle_cast({request_snapshot_transport, node(), wa_raft:table(), wa_raft:partition()}, State :: #state{}) -> {noreply, #state{}}. -handle_cast({request_snapshot_transport, Peer, Table, Partition}, #state{transports = Transports, snapshots = Snapshots} = State) -> - case Transports of - #{{Peer, Table, Partition} := _} -> +-spec handle_cast({request_snapshot_transport, atom(), node(), wa_raft:table(), wa_raft:partition()}, State :: #state{}) -> {noreply, #state{}}. 
+handle_cast({request_snapshot_transport, App, Peer, Table, Partition}, #state{transports = Transports, snapshots = Snapshots, backoff_windows = BackoffWindows} = State) -> + NowMillis = erlang:monotonic_time(millisecond), + case {Transports, BackoffWindows} of + {#{{Peer, Table, Partition} := _}, _} -> {noreply, State}; - _ -> + {_, #{Peer := RetryAfterTs}} when RetryAfterTs > NowMillis -> + {noreply, State}; + {_, _} -> case maps:size(Transports) < ?RAFT_MAX_CONCURRENT_SNAPSHOT_CATCHUP() of true -> try StorageRef = wa_raft_storage:registered_name(Table, Partition), {ok, #raft_log_pos{index = Index, term = Term} = LogPos} = wa_raft_storage:create_snapshot(StorageRef), Path = ?RAFT_SNAPSHOT_PATH(Table, Partition, Index, Term), - {ok, ID} = wa_raft_transport:start_snapshot_transfer(Peer, Table, Partition, LogPos, Path, infinity), - ?LOG_NOTICE("started sending snapshot for ~0p:~0p at ~0p:~0p over transport ~0p", - [Table, Partition, Index, Term, ID], #{domain => [whatsapp, wa_raft]}), - NewTransports = Transports#{{Peer, Table, Partition} => #transport{id = ID, snapshot = LogPos}}, - NewSnapshots = maps:update_with({Table, Partition, LogPos}, fun(V) -> V + 1 end, 1, Snapshots), - {noreply, State#state{transports = NewTransports, snapshots = NewSnapshots}} + case wa_raft_transport:start_snapshot_transfer(Peer, Table, Partition, LogPos, Path, infinity) of + {error, receiver_overloaded} -> + ?LOG_NOTICE("Peer ~0p reported being overloaded. Not sending snapshot for ~0p:~0p. 
Will try again later", + [Peer, Table, Partition], #{domain => [whatsapp, wa_raft]}), + NewRetryAfterTs = NowMillis + ?RAFT_SNAPSHOT_CATCHUP_OVERLOADED_BACKOFF_MS(App), + {noreply, State#state{backoff_windows = BackoffWindows#{Peer => NewRetryAfterTs}}}; + {ok, ID} -> + ?LOG_NOTICE("started sending snapshot for ~0p:~0p at ~0p:~0p over transport ~0p", + [Table, Partition, Index, Term, ID], #{domain => [whatsapp, wa_raft]}), + NewTransports = Transports#{{Peer, Table, Partition} => #transport{id = ID, snapshot = LogPos}}, + NewSnapshots = maps:update_with({Table, Partition, LogPos}, fun(V) -> V + 1 end, 1, Snapshots), + NewBackoffWindows = maps:remove(Peer, BackoffWindows), + {noreply, State#state{transports = NewTransports, snapshots = NewSnapshots, backoff_windows = NewBackoffWindows}} + end catch _T:_E:S -> ?LOG_ERROR("failed to start accepted snapshot transport of ~0p:~0p to ~0p at ~p", diff --git a/src/wa_raft_transport.erl b/src/wa_raft_transport.erl index a89fa1d..761ce15 100644 --- a/src/wa_raft_transport.erl +++ b/src/wa_raft_transport.erl @@ -74,6 +74,14 @@ -define(RAFT_TRANSPORT_SCAN_INTERVAL_SECS, 30). +-define(RAFT_TRANSPORT_MAX_CONCURRENT_ACTIVE_RECEIVES, 10). + +%% Number of counters +-define(RAFT_TRANSPORT_COUNTERS, 1). + +%% Counter - inflight receives +-define(RAFT_TRANSPORT_COUNTER_ACTIVE_RECEIVES, 1). + -define(INFO_KEY(ID), {ID, info}). -define(FILE_KEY(ID, FileID), {ID, {file, FileID}}). @@ -132,7 +140,9 @@ error => Reason :: term() }. --record(state, {}). +-record(state, { + counters :: counters:counters_ref() +}). %%% ------------------------------------------------------------------------ %%% Behaviour callbacks @@ -260,15 +270,16 @@ transport_info(ID, Item) -> % This function should only be called from the "factory" process since it does not % provide any atomicity guarantees. --spec set_transport_info(ID :: transport_id(), Info :: transport_info()) -> term(). 
-set_transport_info(ID, #{atomics := TransportAtomics} = Info) -> +-spec set_transport_info(ID :: transport_id(), Info :: transport_info(), Counters :: counters:counters_ref()) -> term(). +set_transport_info(ID, #{atomics := TransportAtomics} = Info, Counters) -> true = ets:insert(?MODULE, {?INFO_KEY(ID), Info}), + maybe_update_active_inbound_transport_counts(undefined, Info, Counters), ok = atomics:put(TransportAtomics, ?RAFT_TRANSPORT_ATOMICS_UPDATED_TS, erlang:system_time(millisecond)). % This function should only be called from the "factory" process since it does not % provide any atomicity guarantees. --spec update_transport_info(ID :: transport_id(), Fun :: fun((Info :: transport_info()) -> NewInfo :: transport_info())) -> ok | not_found. -update_transport_info(ID, Fun) -> +-spec update_transport_info(ID :: transport_id(), Fun :: fun((Info :: transport_info()) -> NewInfo :: transport_info()), Counters :: counters:counters_ref()) -> ok | not_found. +update_transport_info(ID, Fun, Counters) -> case transport_info(ID) of {ok, #{atomics := TransportAtomics} = Info} -> case Fun(Info) of @@ -277,7 +288,7 @@ update_transport_info(ID, Fun) -> NewInfo -> true = ets:insert(?MODULE, {?INFO_KEY(ID), NewInfo}), ok = atomics:put(TransportAtomics, ?RAFT_TRANSPORT_ATOMICS_UPDATED_TS, erlang:system_time(millisecond)), - ok + maybe_update_active_inbound_transport_counts(Info, NewInfo, Counters) end; not_found -> not_found @@ -290,6 +301,16 @@ file_info(ID, FileID) -> [] -> not_found end. +-spec maybe_update_active_inbound_transport_counts(OldInfo :: transport_info() | undefined, NewInfo :: transport_info(), Counters :: counters:counters_ref()) -> ok. 
+maybe_update_active_inbound_transport_counts(undefined, #{type := receiver, status := running}, Counters) -> + counters:add(Counters, ?RAFT_TRANSPORT_COUNTER_ACTIVE_RECEIVES, 1); +maybe_update_active_inbound_transport_counts(#{type := receiver, status := OldStatus}, #{status := running}, Counters) when OldStatus =/= running -> + counters:add(Counters, ?RAFT_TRANSPORT_COUNTER_ACTIVE_RECEIVES, 1); +maybe_update_active_inbound_transport_counts(#{type := receiver, status := running}, #{status := NewStatus}, Counters) when NewStatus =/= running -> + counters:sub(Counters, ?RAFT_TRANSPORT_COUNTER_ACTIVE_RECEIVES, 1); +maybe_update_active_inbound_transport_counts(_, _, _) -> + ok. + % This function should only be called from the "worker" process responsible for the % transport of the specified file since it does not provide any atomicity guarantees. -spec set_file_info(ID :: transport_id(), FileID :: file_id(), Info :: file_info()) -> term(). @@ -354,8 +375,9 @@ registered_module(Table, Partition) -> -spec init(Args :: term()) -> {ok, State :: #state{}}. init(_) -> process_flag(trap_exit, true), + Counters = counters:new(?RAFT_TRANSPORT_COUNTERS, [atomics]), schedule_scan(), - {ok, #state{}}. + {ok, #state{counters = Counters}}. -spec handle_call(Request, From :: gen_server:from(), State :: #state{}) -> {reply, Reply :: term(), NewState :: #state{}} | {noreply, NewState :: #state{}} when @@ -364,21 +386,23 @@ init(_) -> {start_wait, Peer :: node(), Meta :: meta(), Root :: string()} | {transport, ID :: transport_id(), Peer :: node(), Module :: module(), Meta :: meta(), Files :: [{file_id(), RelPath :: string(), Size :: integer()}]} | {cancel, ID :: transport_id(), Reason :: term()}. 
-handle_call({start, Peer, Meta, Root}, _From, #state{} = State) -> - {reply, handle_transport_start(undefined, Peer, Meta, Root), State}; -handle_call({start_wait, Peer, Meta, Root}, From, #state{} = State) -> - case handle_transport_start(From, Peer, Meta, Root) of +handle_call({start, Peer, Meta, Root}, _From, #state{counters = Counters} = State) -> + {reply, handle_transport_start(undefined, Peer, Meta, Root, Counters), State}; +handle_call({start_wait, Peer, Meta, Root}, From, #state{counters = Counters} = State) -> + case handle_transport_start(From, Peer, Meta, Root, Counters) of {ok, _ID} -> {noreply, State}; {error, Reason} -> {reply, {error, Reason}, State} end; -handle_call({transport, ID, Peer, Module, Meta, Files}, From, #state{} = State) -> +handle_call({transport, ID, Peer, Module, Meta, Files}, From, #state{counters = Counters} = State) -> try - case transport_info(ID) of - {ok, _Info} -> + case {transport_info(ID), counters:get(Counters, ?RAFT_TRANSPORT_COUNTER_ACTIVE_RECEIVES)} of + {{ok, _Info}, _} -> ?LOG_WARNING("wa_raft_transport got duplicate transport receive start for ~p from ~p", [ID, From], #{domain => [whatsapp, wa_raft]}), {reply, duplicate, State}; - not_found -> + {not_found, NumActiveReceives} when NumActiveReceives >= ?RAFT_TRANSPORT_MAX_CONCURRENT_ACTIVE_RECEIVES -> + {reply, {error, receiver_overloaded}, State}; + {not_found, _} -> ?RAFT_COUNT('raft.transport.receive'), ?LOG_NOTICE("wa_raft_transport starting transport receive for ~p", [ID], #{domain => [whatsapp, wa_raft]}), @@ -403,7 +427,7 @@ handle_call({transport, ID, Peer, Module, Meta, Files}, From, #state{} = State) start_ts => NowMillis, total_files => TotalFiles, completed_files => 0 - }), + }, Counters), [ begin FileAtomics = atomics:new(?RAFT_TRANSPORT_FILE_ATOMICS_COUNT, []), @@ -426,7 +450,7 @@ handle_call({transport, ID, Peer, Module, Meta, Files}, From, #state{} = State) {error, Reason} -> Info1#{status => failed, error => {notify_failed, Reason}} end, 
maybe_notify(ID, Info2) - end), + end, Counters), {reply, ok, State} end @@ -435,10 +459,10 @@ handle_call({transport, ID, Peer, Module, Meta, Files}, From, #state{} = State) ?RAFT_COUNT('raft.transport.receive.error'), ?LOG_WARNING("wa_raft_transport failed to accept transport ~p due to ~p ~p: ~n~p", [ID, T, E, S], #{domain => [whatsapp, wa_raft]}), - update_transport_info(ID, fun (Info) -> Info#{status => failed, end_ts => erlang:system_time(millisecond), error => {receive_failed, {T, E, S}}} end), + update_transport_info(ID, fun (Info) -> Info#{status => failed, end_ts => erlang:system_time(millisecond), error => {receive_failed, {T, E, S}}} end, Counters), {reply, {error, failed}, State} end; -handle_call({cancel, ID, Reason}, _From, #state{} = State) -> +handle_call({cancel, ID, Reason}, _From, #state{counters = Counters} = State) -> ?LOG_NOTICE("wa_raft_transport got cancellation request for ~p for reason ~p", [ID, Reason], #{domain => [whatsapp, wa_raft]}), Result = @@ -449,7 +473,8 @@ handle_call({cancel, ID, Reason}, _From, #state{} = State) -> Info#{status => cancelled, end_ts => NowMillis, error => {cancelled, Reason}}; (Info) -> Info - end), + end, + Counters), Reply = case Result of ok -> ok; not_found -> {error, not_found} @@ -462,7 +487,7 @@ handle_call(Request, _From, #state{} = State) -> -spec handle_cast(Request, State :: #state{}) -> {noreply, NewState :: #state{}} when Request :: {complete, ID :: transport_id(), FileID :: file_id(), Status :: term(), Pid :: pid()}. 
-handle_cast({complete, ID, FileID, Status, Pid}, #state{} = State) -> +handle_cast({complete, ID, FileID, Status, Pid}, #state{counters = Counters} = State) -> ?RAFT_COUNT('raft.transport.file.complete'), NowMillis = erlang:system_time(millisecond), Result0 = update_file_info(ID, FileID, @@ -498,7 +523,8 @@ handle_cast({complete, ID, FileID, Status, Pid}, #state{} = State) -> maybe_notify(ID, Info5); (Info) -> Info - end), + end, + Counters), Result1 =:= not_found andalso ?LOG_WARNING("wa_raft_transport got complete report for unknown transfer ~p", [ID], #{domain => [whatsapp, wa_raft]}), @@ -508,10 +534,10 @@ handle_cast(Request, State) -> {noreply, State}. -spec handle_info(Info :: term(), State :: #state{}) -> {noreply, NewState :: #state{}}. -handle_info(scan, State) -> +handle_info(scan, #state{counters = Counters} = State) -> lists:foreach( fun (ID) -> - update_transport_info(ID, fun (Info) -> scan_transport(ID, Info) end) + update_transport_info(ID, fun (Info) -> scan_transport(ID, Info) end, Counters) end, transports()), schedule_scan(), {noreply, State}; @@ -532,8 +558,8 @@ make_id() -> not_found -> ID end. --spec handle_transport_start(From :: gen_server:from() | undefined, Peer :: node(), Meta :: meta(), Root :: string()) -> {ok, ID :: transport_id()} | wa_raft:error(). -handle_transport_start(From, Peer, Meta, Root) -> +-spec handle_transport_start(From :: gen_server:from() | undefined, Peer :: node(), Meta :: meta(), Root :: string(), Counters :: counters:counters_ref()) -> {ok, ID :: transport_id()} | wa_raft:error(). 
+handle_transport_start(From, Peer, Meta, Root, Counters) -> ID = make_id(), ?RAFT_COUNT('raft.transport.start'), @@ -559,7 +585,7 @@ handle_transport_start(From, Peer, Meta, Root) -> start_ts => NowMillis, total_files => TotalFiles, completed_files => 0 - }), + }, Counters), [ begin FileAtomics = atomics:new(?RAFT_TRANSPORT_FILE_ATOMICS_COUNT, []), @@ -595,21 +621,28 @@ handle_transport_start(From, Peer, Meta, Root) -> Workers = [Pid || {_Id, Pid, _Type, _Modules} <- supervisor:which_children(Sup), is_pid(Pid)], lists:foldl(fun (Pid, InfoN) -> maybe_submit_one(ID, InfoN, Pid) end, Info2, Workers) end - end), + end, + Counters), {ok, ID}; + {error, receiver_overloaded} -> + ?RAFT_COUNT('raft.transport.rejected.receiver_overloaded'), + ?LOG_WARNING("wa_raft_transport peer ~p rejected transport ~p because of overload", + [Peer, ID], #{domain => [whatsapp, wa_raft]}), + update_transport_info(ID, fun (Info) -> Info#{status => failed, end_ts => NowMillis, error => {rejected, receiver_overloaded}} end, Counters), + {error, receiver_overloaded}; Error -> ?RAFT_COUNT('raft.transport.rejected'), ?LOG_WARNING("wa_raft_transport peer ~p rejected transport ~p with error ~p", [Peer, ID, Error], #{domain => [whatsapp, wa_raft]}), - update_transport_info(ID, fun (Info) -> Info#{status => failed, end_ts => NowMillis, error => {rejected, Error}} end), - {error, rejected} + update_transport_info(ID, fun (Info) -> Info#{status => failed, end_ts => NowMillis, error => {rejected, Error}} end, Counters), + {error, Error} end catch T:E:S -> ?RAFT_COUNT('raft.transport.start.error'), ?LOG_WARNING("wa_raft_transport failed to start transport ~p due to ~p ~p: ~n~p", [ID, T, E, S], #{domain => [whatsapp, wa_raft]}), - update_transport_info(ID, fun (Info) -> Info#{status => failed, end_ts => erlang:system_time(millisecond), error => {start, {T, E, S}}} end), + update_transport_info(ID, fun (Info) -> Info#{status => failed, end_ts => erlang:system_time(millisecond), error => {start, {T, E, 
S}}} end, Counters), {error, failed} end.