Skip to content

Commit

Permalink
feat: avoid creating atoms to register
Browse files Browse the repository at this point in the history
  • Loading branch information
thalesmg committed Oct 17, 2024
1 parent 1708143 commit 730ad81
Show file tree
Hide file tree
Showing 6 changed files with 199 additions and 20 deletions.
3 changes: 2 additions & 1 deletion src/replayq.app.src
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
{application, replayq,
[{description, "A Disk Queue for Log Replay in Erlang"},
{vsn, "git"},
{registered, []},
{registered, [replayq_sup, replayq_registry]},
{applications,
[kernel,
stdlib
]},
{env,[]},
{mod, {replayq_app, []}},
{modules, []},
{licenses, ["Apache 2.0"]},
{links, [{"GitHub", "https://github.com/emqx/replayq"}]}
Expand Down
17 changes: 4 additions & 13 deletions src/replayq.erl
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,6 @@
default_marshaller/1,
default_stop_before_func/2]).

-ifdef(TEST).
-export([committer_process_name/1]).
-endif.

-export_type([config/0, q/0, ack_ref/0, sizer/0, marshaller/0]).

-define(NOTHING_TO_ACK, nothing_to_ack).
Expand Down Expand Up @@ -519,17 +515,12 @@ ensure_deleted(Filename) ->
%% The committer writes consumer's acked segmeng number + item ID
%% to a file. The file is only read at start/restart.
spawn_committer(ReaderSegno, Dir) ->
%% register a name to avoid having two committers spawned for the same dir
RegName = committer_process_name(Dir),
Pid = erlang:spawn_link(fun() -> committer_loop(ReaderSegno, Dir) end),
true = erlang:register(RegName, Pid),
Pid = erlang:spawn_link(fun() ->
committer_loop(ReaderSegno, Dir)
end),
ok = replayq_registry:register_committer(Dir, Pid),
Pid.

committer_process_name(Dir) ->
Name = iolist_to_binary(filename:join([Dir, committer])),
NameBin = binary:encode_hex(erlang:md5(Name)),
binary_to_atom(NameBin, utf8).

committer_loop(ReaderSegno, Dir) ->
receive
?COMMIT(Segno0, Id0, false) ->
Expand Down
18 changes: 18 additions & 0 deletions src/replayq_app.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
-module(replayq_app).

-behaviour(application).

%% `application' API
-export([start/2, stop/1]).

%%------------------------------------------------------------------------------
%% `application' API
%%------------------------------------------------------------------------------

-spec start(application:start_type(), term()) -> {ok, pid()}.
start(_Type, _Args) ->
replayq_sup:start_link().

-spec stop(term()) -> ok.
stop(_State) ->
ok.
86 changes: 86 additions & 0 deletions src/replayq_registry.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
-module(replayq_registry).

-behaviour(gen_server).

%% API
-export([
start_link/0,

register_committer/2
]).

%% `gen_server' API
-export([
init/1,
handle_call/3,
handle_cast/2,
handle_info/2
]).

%%------------------------------------------------------------------------------
%% Type declarations
%%------------------------------------------------------------------------------

%% call/cast/info events
-record(register_committer, {dir :: filename:filename_all(), pid :: pid()}).

%%------------------------------------------------------------------------------
%% API
%%------------------------------------------------------------------------------

start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).

register_committer(Dir, Pid) ->
gen_server:call(?MODULE, #register_committer{dir = Dir, pid = Pid}, infinity).

%%------------------------------------------------------------------------------
%% `gen_server' API
%%------------------------------------------------------------------------------

init(_Opts) ->
process_flag(trap_exit, true),
State = #{committers => #{}},
{ok, State}.

handle_call(#register_committer{dir = Dir, pid = Pid}, _From, State0) ->
%% Should we expand the directory path to avoid tricks with links and relative paths?
#{committers := Committers0} = State0,
case Committers0 of
#{Dir := Pid} ->
%% Already registered to the same pid.
{reply, ok, State0};
#{Dir := OtherPid} ->
{reply, {error, {already_started, OtherPid}}, State0};
_ ->
link(Pid),
Committers = add_committer(Committers0, Pid, Dir),
State = State0#{committers := Committers},
{reply, ok, State}
end;
handle_call(_Call, _From, State) ->
{reply, {error, unknown_call}, State}.

handle_cast(_Cast, State) ->
{noreply, State}.

handle_info({'EXIT', Pid, _Reason}, #{committers := Committers0} = State0) when
is_map_key(Pid, Committers0)
->
{_, Committers} = pop_committer(Committers0, Pid),
State = State0#{committers := Committers},
{noreply, State};
handle_info(_Info, State) ->
{noreply, State}.

%%------------------------------------------------------------------------------
%% Internal fns
%%------------------------------------------------------------------------------

add_committer(Committers, Pid, Dir) ->
Committers#{Pid => Dir, Dir => Pid}.

pop_committer(Committers0, Pid) ->
{Dir, Committers1} = maps:take(Pid, Committers0),
{Pid, Committers} = maps:take(Dir, Committers1),
{{Pid, Dir}, Committers}.
43 changes: 43 additions & 0 deletions src/replayq_sup.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
-module(replayq_sup).

-behaviour(supervisor).

%% API
-export([start_link/0]).

%% `supervisor' API
-export([init/1]).

%%------------------------------------------------------------------------------
%% API
%%------------------------------------------------------------------------------

start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).

%%------------------------------------------------------------------------------
%% `supervisor' API
%%------------------------------------------------------------------------------

init([]) ->
Registry = worker_spec(replayq_registry),
SupFlags = #{
strategy => one_for_one,
intensity => 10,
period => 10
},
ChildSpecs = [Registry],
{ok, {SupFlags, ChildSpecs}}.

%%------------------------------------------------------------------------------
%% Internal fns
%%------------------------------------------------------------------------------

worker_spec(Mod) ->
#{
id => Mod,
start => {Mod, start_link, []},
restart => permanent,
shutdown => 5_000,
type => worker
}.
Loading

0 comments on commit 730ad81

Please sign in to comment.