diff --git a/src/wa_raft_snapshot_catchup.erl b/src/wa_raft_snapshot_catchup.erl index d41afcf..f25448b 100644 --- a/src/wa_raft_snapshot_catchup.erl +++ b/src/wa_raft_snapshot_catchup.erl @@ -34,8 +34,15 @@ terminate/2 ]). +%% Testing API +-export([ + init_tables/0 +]). + -define(SCAN_EVERY_MS, 500). +-define(PENDING_KEY(Peer, Table, Partition), {request_snapshot_transport_pending, Peer, Table, Partition}). + -type key() :: {node(), wa_raft:table(), wa_raft:partition()}. -type snapshot_key() :: {wa_raft:table(), wa_raft:partition(), wa_raft_log:log_pos()}. @@ -75,14 +82,28 @@ current_snapshot_transports() -> -spec request_snapshot_transport(App :: atom(), Peer :: node(), Table :: wa_raft:table(), Partition :: wa_raft:partition()) -> ok. request_snapshot_transport(App, Peer, Table, Partition) -> - gen_server:cast(?MODULE, {request_snapshot_transport, App, Peer, Table, Partition}). + try + % Check ETS to avoid putting duplicate requests into the message queue. + ets:insert_new(?MODULE, {?PENDING_KEY(Peer, Table, Partition)}) andalso + gen_server:cast(?MODULE, {request_snapshot_transport, App, Peer, Table, Partition}), + ok + catch + error:badarg -> + ok + end. -spec init(Args :: term()) -> {ok, #state{}}. init([]) -> process_flag(trap_exit, true), + init_tables(), schedule_scan(), {ok, #state{}}. +-spec init_tables() -> ok. +init_tables() -> + ?MODULE = ets:new(?MODULE, [set, public, named_table]), + ok. + -spec handle_call(Request :: term(), From :: gen_server:from(), State :: #state{}) -> {noreply, #state{}} | {reply, term(), #state{}}. handle_call(current_snapshot_transports, _From, #state{transports = Transports} = State) -> {reply, [ID || #transport{id = ID} <- maps:values(Transports)], State}; @@ -92,6 +113,9 @@ handle_call(Request, From, #state{} = State) -> -spec handle_cast({request_snapshot_transport, atom(), node(), wa_raft:table(), wa_raft:partition()}, State :: #state{}) -> {noreply, #state{}}. handle_cast({request_snapshot_transport, App, Peer, Table, Partition}, #state{transports = Transports, snapshots = Snapshots, overload_backoffs = OverloadBackoffs, retry_backoffs = RetryBackoffs} = State) -> + % Just immediately remove the pending key from the ETS. Doing this here is simpler + % but permits a bounded number of extra requests to remain in the queue. + ets:delete(?MODULE, ?PENDING_KEY(Peer, Table, Partition)), Now = erlang:monotonic_time(millisecond), Key = {Peer, Table, Partition}, Exists = maps:is_key(Key, Transports),