From 8805d798bbc37e7bbac3cab15d37947372e315e1 Mon Sep 17 00:00:00 2001 From: Henry Sun Date: Mon, 5 Feb 2024 22:05:25 -0800 Subject: [PATCH] Track number of transports using snapshot in snapshot catchup Summary: Track the number of transports that are using the same snapshot (a snapshot for a particular table and partition at the same log position) to avoid deleting a snapshot when a previously started transport completes while another transport is still using the same snapshot. Differential Revision: D53444494 fbshipit-source-id: 8a2eb4e54e7977e05772e42da19f12ce020a709e --- src/wa_raft_snapshot_catchup.erl | 76 +++++++++++++++++++++++--------- 1 file changed, 55 insertions(+), 21 deletions(-) diff --git a/src/wa_raft_snapshot_catchup.erl b/src/wa_raft_snapshot_catchup.erl index 724e512..f56e7ed 100644 --- a/src/wa_raft_snapshot_catchup.erl +++ b/src/wa_raft_snapshot_catchup.erl @@ -37,12 +37,17 @@ -define(SCAN_EVERY_MS, 500). -type key() :: {node(), wa_raft:table(), wa_raft:partition()}. +-type snapshot_key() :: {wa_raft:table(), wa_raft:partition(), wa_raft_log:log_pos()}. + -record(transport, { id :: wa_raft_transport:transport_id(), - snapshot :: #raft_log_pos{} + snapshot :: wa_raft_log:log_pos() }). -record(state, { - transports = #{} :: #{key() => #transport{}} + % currently active transports + transports = #{} :: #{key() => #transport{}}, + % counts of active transports that are using a particular snapshot + snapshots = #{} :: #{snapshot_key() => pos_integer()} }). -spec child_spec() -> supervisor:child_spec(). @@ -75,13 +80,13 @@ init([]) -> -spec handle_call(Request :: term(), From :: gen_server:from(), State :: #state{}) -> {noreply, #state{}} | {reply, term(), #state{}}. handle_call(current_snapshot_transports, _From, #state{transports = Transports} = State) -> - {reply, maps:values(Transports), State}; + {reply, [ID || #transport{id = ID} <- maps:values(Transports)], State}; handle_call(Request, From, #state{} = State) -> ?LOG_NOTICE("received unrecognized call ~P from ~0p", [Request, 25, From], #{domain => [whatsapp, wa_raft]}), {noreply, State}. -spec handle_cast({request_snapshot_transport, node(), wa_raft:table(), wa_raft:partition()}, State :: #state{}) -> {noreply, #state{}}. -handle_cast({request_snapshot_transport, Peer, Table, Partition}, #state{transports = Transports} = State) -> +handle_cast({request_snapshot_transport, Peer, Table, Partition}, #state{transports = Transports, snapshots = Snapshots} = State) -> case Transports of #{{Peer, Table, Partition} := _} -> {noreply, State}; @@ -95,7 +100,9 @@ handle_cast({request_snapshot_transport, Peer, Table, Partition}, #state{transpo {ok, ID} = wa_raft_transport:start_snapshot_transfer(Peer, Table, Partition, LogPos, Path, infinity), ?LOG_NOTICE("started sending snapshot for ~0p:~0p at ~0p:~0p over transport ~0p", [Table, Partition, Index, Term, ID], #{domain => [whatsapp, wa_raft]}), - {noreply, State#state{transports = Transports#{{Peer, Table, Partition} => #transport{id = ID, snapshot = LogPos}}}} + NewTransports = Transports#{{Peer, Table, Partition} => #transport{id = ID, snapshot = LogPos}}, + NewSnapshots = maps:update_with({Table, Partition, LogPos}, fun(V) -> V + 1 end, 1, Snapshots), + {noreply, State#state{transports = NewTransports, snapshots = NewSnapshots}} catch _T:_E:S -> ?LOG_ERROR("failed to start accepted snapshot transport of ~0p:~0p to ~0p at ~p", @@ -112,29 +119,56 @@ handle_cast(Request, #state{} = State) -> -spec handle_info(term(), #state{}) -> {noreply, #state{}}. handle_info(scan, #state{transports = Transports} = State) -> - NewTransports = - maps:filter( - fun ({_Peer, Table, Partition}, #transport{id = ID, snapshot = #raft_log_pos{index = Index, term = Term}}) -> - Drop = case wa_raft_transport:transport_info(ID) of - {ok, #{status := Status}} -> Status =/= requested andalso Status =/= running; - _ -> true - end, - Drop andalso wa_raft_storage:delete_snapshot(wa_raft_storage:registered_name(Table, Partition), ?SNAPSHOT_NAME(Index, Term)), - not Drop - end, Transports), + NewState = maps:fold(fun scan_transport/3, State, Transports), schedule_scan(), - {noreply, State#state{transports = NewTransports}}; + {noreply, NewState}; handle_info(Info, #state{} = State) -> ?LOG_NOTICE("received unrecognized info ~P", [Info, 25], #{domain => [whatsapp, wa_raft]}), {noreply, State}. -spec terminate(Reason :: term(), #state{}) -> term(). -terminate(_Reason, #state{transports = Transport}) -> +terminate(_Reason, #state{transports = Transports, snapshots = Snapshots}) -> + maps:foreach( + fun ({_Peer, _Table, _Partition}, #transport{id = ID}) -> + wa_raft_transport:cancel(ID, terminating) + end, Transports), maps:foreach( - fun ({_Peer, Table, Partition}, #transport{id = ID, snapshot = #raft_log_pos{index = Index, term = Term}}) -> - wa_raft_transport:cancel(ID, terminating), - wa_raft_storage:delete_snapshot(wa_raft_storage:registered_name(Table, Partition), ?SNAPSHOT_NAME(Index, Term)) - end, Transport). + fun ({Table, Partition, LogPos}, _) -> + delete_snapshot(Table, Partition, LogPos) + end, Snapshots). + +-spec scan_transport(Key :: key(), Transport :: #transport{}, #state{}) -> #state{}. +scan_transport({_Peer, Table, Partition} = Key, #transport{id = ID, snapshot = LogPos}, + #state{transports = Transports, snapshots = Snapshots} = State) -> + Status = case wa_raft_transport:transport_info(ID) of + {ok, #{status := S}} -> S; + _ -> undefined + end, + case Status =:= requested orelse Status =:= running of + true -> + State; + false -> + SnapshotKey = {Table, Partition, LogPos}, + NewSnapshots = case Snapshots of + #{SnapshotKey := 1} -> + % try to delete a snapshot if it is the last transport using it + delete_snapshot(Table, Partition, LogPos), + maps:remove(SnapshotKey, Snapshots); + #{SnapshotKey := Count} -> + % otherwise decrement the reference count for the snapshot + Snapshots#{SnapshotKey => Count - 1}; + #{} -> + % unexpected that the snapshot is missing, but just ignore + Snapshots + end, + State#state{transports = maps:remove(Key, Transports), snapshots = NewSnapshots} + end. + +-spec delete_snapshot(Table :: wa_raft:table(), Partition :: wa_raft:partition(), + Position :: wa_raft_log:log_pos()) -> ok. +delete_snapshot(Table, Partition, #raft_log_pos{index = Index, term = Term}) -> + Storage = wa_raft_storage:registered_name(Table, Partition), + wa_raft_storage:delete_snapshot(Storage, ?SNAPSHOT_NAME(Index, Term)). -spec schedule_scan() -> reference(). schedule_scan() ->