Skip to content

Commit

Permalink
Track number of transports using snapshot in snapshot catchup
Browse files Browse the repository at this point in the history
Summary:
Track the number of transports that are using the same snapshot (a
snapshot for a particular table and partition at the same log position)
to avoid deleting a snapshot when a previously started transport
completes while another transport is still using the same snapshot.

Differential Revision: D53444494

fbshipit-source-id: 8a2eb4e54e7977e05772e42da19f12ce020a709e
  • Loading branch information
hsun324 authored and facebook-github-bot committed Feb 6, 2024
1 parent 3b9346f commit 8805d79
Showing 1 changed file with 55 additions and 21 deletions.
76 changes: 55 additions & 21 deletions src/wa_raft_snapshot_catchup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,17 @@
-define(SCAN_EVERY_MS, 500).

-type key() :: {node(), wa_raft:table(), wa_raft:partition()}.
-type snapshot_key() :: {wa_raft:table(), wa_raft:partition(), wa_raft_log:log_pos()}.

-record(transport, {
id :: wa_raft_transport:transport_id(),
snapshot :: #raft_log_pos{}
snapshot :: wa_raft_log:log_pos()
}).
-record(state, {
transports = #{} :: #{key() => #transport{}}
% currently active transports
transports = #{} :: #{key() => #transport{}},
% counts of active transports that are using a particular snapshot
snapshots = #{} :: #{snapshot_key() => pos_integer()}
}).

-spec child_spec() -> supervisor:child_spec().
Expand Down Expand Up @@ -75,13 +80,13 @@ init([]) ->

-spec handle_call(Request :: term(), From :: gen_server:from(), State :: #state{}) -> {noreply, #state{}} | {reply, term(), #state{}}.
handle_call(current_snapshot_transports, _From, #state{transports = Transports} = State) ->
{reply, maps:values(Transports), State};
{reply, [ID || #transport{id = ID} <- maps:values(Transports)], State};
handle_call(Request, From, #state{} = State) ->
?LOG_NOTICE("received unrecognized call ~P from ~0p", [Request, 25, From], #{domain => [whatsapp, wa_raft]}),
{noreply, State}.

-spec handle_cast({request_snapshot_transport, node(), wa_raft:table(), wa_raft:partition()}, State :: #state{}) -> {noreply, #state{}}.
handle_cast({request_snapshot_transport, Peer, Table, Partition}, #state{transports = Transports} = State) ->
handle_cast({request_snapshot_transport, Peer, Table, Partition}, #state{transports = Transports, snapshots = Snapshots} = State) ->
case Transports of
#{{Peer, Table, Partition} := _} ->
{noreply, State};
Expand All @@ -95,7 +100,9 @@ handle_cast({request_snapshot_transport, Peer, Table, Partition}, #state{transpo
{ok, ID} = wa_raft_transport:start_snapshot_transfer(Peer, Table, Partition, LogPos, Path, infinity),
?LOG_NOTICE("started sending snapshot for ~0p:~0p at ~0p:~0p over transport ~0p",
[Table, Partition, Index, Term, ID], #{domain => [whatsapp, wa_raft]}),
{noreply, State#state{transports = Transports#{{Peer, Table, Partition} => #transport{id = ID, snapshot = LogPos}}}}
NewTransports = Transports#{{Peer, Table, Partition} => #transport{id = ID, snapshot = LogPos}},
NewSnapshots = maps:update_with({Table, Partition, LogPos}, fun(V) -> V + 1 end, 1, Snapshots),
{noreply, State#state{transports = NewTransports, snapshots = NewSnapshots}}
catch
_T:_E:S ->
?LOG_ERROR("failed to start accepted snapshot transport of ~0p:~0p to ~0p at ~p",
Expand All @@ -112,29 +119,56 @@ handle_cast(Request, #state{} = State) ->

-spec handle_info(term(), #state{}) -> {noreply, #state{}}.
handle_info(scan, #state{transports = Transports} = State) ->
NewTransports =
maps:filter(
fun ({_Peer, Table, Partition}, #transport{id = ID, snapshot = #raft_log_pos{index = Index, term = Term}}) ->
Drop = case wa_raft_transport:transport_info(ID) of
{ok, #{status := Status}} -> Status =/= requested andalso Status =/= running;
_ -> true
end,
Drop andalso wa_raft_storage:delete_snapshot(wa_raft_storage:registered_name(Table, Partition), ?SNAPSHOT_NAME(Index, Term)),
not Drop
end, Transports),
NewState = maps:fold(fun scan_transport/3, State, Transports),
schedule_scan(),
{noreply, State#state{transports = NewTransports}};
{noreply, NewState};
handle_info(Info, #state{} = State) ->
?LOG_NOTICE("received unrecognized info ~P", [Info, 25], #{domain => [whatsapp, wa_raft]}),
{noreply, State}.

-spec terminate(Reason :: term(), #state{}) -> term().
terminate(_Reason, #state{transports = Transport}) ->
terminate(_Reason, #state{transports = Transports, snapshots = Snapshots}) ->
maps:foreach(
fun ({_Peer, _Table, _Partition}, #transport{id = ID}) ->
wa_raft_transport:cancel(ID, terminating)
end, Transports),
maps:foreach(
fun ({_Peer, Table, Partition}, #transport{id = ID, snapshot = #raft_log_pos{index = Index, term = Term}}) ->
wa_raft_transport:cancel(ID, terminating),
wa_raft_storage:delete_snapshot(wa_raft_storage:registered_name(Table, Partition), ?SNAPSHOT_NAME(Index, Term))
end, Transport).
fun ({Table, Partition, LogPos}, _) ->
delete_snapshot(Table, Partition, LogPos)
end, Snapshots).

-spec scan_transport(Key :: key(), Transport :: #transport{}, #state{}) -> #state{}.
scan_transport({_Peer, Table, Partition} = Key, #transport{id = ID, snapshot = LogPos},
#state{transports = Transports, snapshots = Snapshots} = State) ->
Status = case wa_raft_transport:transport_info(ID) of
{ok, #{status := S}} -> S;
_ -> undefined
end,
case Status =:= requested orelse Status =:= running of
true ->
State;
false ->
SnapshotKey = {Table, Partition, LogPos},
NewSnapshots = case Snapshots of
#{SnapshotKey := 1} ->
% try to delete a snapshot if it is the last transport using it
delete_snapshot(Table, Partition, LogPos),
maps:remove(SnapshotKey, Snapshots);
#{SnapshotKey := Count} ->
% otherwise decrement the reference count for the snapshot
Snapshots#{SnapshotKey => Count - 1};
#{} ->
% unexpected that the snapshot is missing, but just ignore
Snapshots
end,
State#state{transports = maps:remove(Key, Transports), snapshots = NewSnapshots}
end.

-spec delete_snapshot(Table :: wa_raft:table(), Partition :: wa_raft:partition(),
Position :: wa_raft_log:log_pos()) -> ok.
delete_snapshot(Table, Partition, #raft_log_pos{index = Index, term = Term}) ->
Storage = wa_raft_storage:registered_name(Table, Partition),
wa_raft_storage:delete_snapshot(Storage, ?SNAPSHOT_NAME(Index, Term)).

-spec schedule_scan() -> reference().
schedule_scan() ->
Expand Down

0 comments on commit 8805d79

Please sign in to comment.