From 730ad814002b41663dfcc7ac4b7e769be6ccf4ee Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Thu, 17 Oct 2024 17:10:01 -0300 Subject: [PATCH] feat: avoid creating atoms to register Fixes https://github.com/emqx/replayq/pull/18#discussion_r1804958356 --- src/replayq.app.src | 3 +- src/replayq.erl | 17 ++------ src/replayq_app.erl | 18 +++++++++ src/replayq_registry.erl | 86 ++++++++++++++++++++++++++++++++++++++++ src/replayq_sup.erl | 43 ++++++++++++++++++++ test/replayq_tests.erl | 52 +++++++++++++++++++++--- 6 files changed, 199 insertions(+), 20 deletions(-) create mode 100644 src/replayq_app.erl create mode 100644 src/replayq_registry.erl create mode 100644 src/replayq_sup.erl diff --git a/src/replayq.app.src b/src/replayq.app.src index 115a5cf..387c341 100644 --- a/src/replayq.app.src +++ b/src/replayq.app.src @@ -1,12 +1,13 @@ {application, replayq, [{description, "A Disk Queue for Log Replay in Erlang"}, {vsn, "git"}, - {registered, []}, + {registered, [replayq_sup, replayq_registry]}, {applications, [kernel, stdlib ]}, {env,[]}, + {mod, {replayq_app, []}}, {modules, []}, {licenses, ["Apache 2.0"]}, {links, [{"GitHub", "https://github.com/emqx/replayq"}]} diff --git a/src/replayq.erl b/src/replayq.erl index def0ed4..cfee4ec 100644 --- a/src/replayq.erl +++ b/src/replayq.erl @@ -12,10 +12,6 @@ default_marshaller/1, default_stop_before_func/2]). --ifdef(TEST). --export([committer_process_name/1]). --endif. - -export_type([config/0, q/0, ack_ref/0, sizer/0, marshaller/0]). -define(NOTHING_TO_ACK, nothing_to_ack). @@ -519,17 +515,12 @@ ensure_deleted(Filename) -> %% The committer writes consumer's acked segmeng number + item ID %% to a file. The file is only read at start/restart. spawn_committer(ReaderSegno, Dir) -> - %% register a name to avoid having two committers spawned for the same dir - RegName = committer_process_name(Dir), - Pid = erlang:spawn_link(fun() -> committer_loop(ReaderSegno, Dir) end), - true = erlang:register(RegName, Pid), + Pid = erlang:spawn_link(fun() -> + committer_loop(ReaderSegno, Dir) + end), + ok = replayq_registry:register_committer(Dir, Pid), Pid. -committer_process_name(Dir) -> - Name = iolist_to_binary(filename:join([Dir, committer])), - NameBin = binary:encode_hex(erlang:md5(Name)), - binary_to_atom(NameBin, utf8). - committer_loop(ReaderSegno, Dir) -> receive ?COMMIT(Segno0, Id0, false) -> diff --git a/src/replayq_app.erl b/src/replayq_app.erl new file mode 100644 index 0000000..26c4c8e --- /dev/null +++ b/src/replayq_app.erl @@ -0,0 +1,18 @@ +-module(replayq_app). + +-behaviour(application). + +%% `application' API +-export([start/2, stop/1]). + +%%------------------------------------------------------------------------------ +%% `application' API +%%------------------------------------------------------------------------------ + +-spec start(application:start_type(), term()) -> {ok, pid()}. +start(_Type, _Args) -> + replayq_sup:start_link(). + +-spec stop(term()) -> ok. +stop(_State) -> + ok. diff --git a/src/replayq_registry.erl b/src/replayq_registry.erl new file mode 100644 index 0000000..2c6c35f --- /dev/null +++ b/src/replayq_registry.erl @@ -0,0 +1,86 @@ +-module(replayq_registry). + +-behaviour(gen_server). + +%% API +-export([ + start_link/0, + + register_committer/2 +]). + +%% `gen_server' API +-export([ + init/1, + handle_call/3, + handle_cast/2, + handle_info/2 +]). + +%%------------------------------------------------------------------------------ +%% Type declarations +%%------------------------------------------------------------------------------ + +%% call/cast/info events +-record(register_committer, {dir :: filename:filename_all(), pid :: pid()}). + +%%------------------------------------------------------------------------------ +%% API +%%------------------------------------------------------------------------------ + +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + +register_committer(Dir, Pid) -> + gen_server:call(?MODULE, #register_committer{dir = Dir, pid = Pid}, infinity). + +%%------------------------------------------------------------------------------ +%% `gen_server' API +%%------------------------------------------------------------------------------ + +init(_Opts) -> + process_flag(trap_exit, true), + State = #{committers => #{}}, + {ok, State}. + +handle_call(#register_committer{dir = Dir, pid = Pid}, _From, State0) -> + %% Should we expand the directory path to avoid tricks with links and relative paths? + #{committers := Committers0} = State0, + case Committers0 of + #{Dir := Pid} -> + %% Already registered to the same pid. + {reply, ok, State0}; + #{Dir := OtherPid} -> + {reply, {error, {already_started, OtherPid}}, State0}; + _ -> + link(Pid), + Committers = add_committer(Committers0, Pid, Dir), + State = State0#{committers := Committers}, + {reply, ok, State} + end; +handle_call(_Call, _From, State) -> + {reply, {error, unknown_call}, State}. + +handle_cast(_Cast, State) -> + {noreply, State}. + +handle_info({'EXIT', Pid, _Reason}, #{committers := Committers0} = State0) when + is_map_key(Pid, Committers0) +-> + {_, Committers} = pop_committer(Committers0, Pid), + State = State0#{committers := Committers}, + {noreply, State}; +handle_info(_Info, State) -> + {noreply, State}. + +%%------------------------------------------------------------------------------ +%% Internal fns +%%------------------------------------------------------------------------------ + +add_committer(Committers, Pid, Dir) -> + Committers#{Pid => Dir, Dir => Pid}. + +pop_committer(Committers0, Pid) -> + {Dir, Committers1} = maps:take(Pid, Committers0), + {Pid, Committers} = maps:take(Dir, Committers1), + {{Pid, Dir}, Committers}. diff --git a/src/replayq_sup.erl b/src/replayq_sup.erl new file mode 100644 index 0000000..33b5fc8 --- /dev/null +++ b/src/replayq_sup.erl @@ -0,0 +1,43 @@ +-module(replayq_sup). + +-behaviour(supervisor). + +%% API +-export([start_link/0]). + +%% `supervisor' API +-export([init/1]). + +%%------------------------------------------------------------------------------ +%% API +%%------------------------------------------------------------------------------ + +start_link() -> + supervisor:start_link({local, ?MODULE}, ?MODULE, []). + +%%------------------------------------------------------------------------------ +%% `supervisor' API +%%------------------------------------------------------------------------------ + +init([]) -> + Registry = worker_spec(replayq_registry), + SupFlags = #{ + strategy => one_for_one, + intensity => 10, + period => 10 + }, + ChildSpecs = [Registry], + {ok, {SupFlags, ChildSpecs}}. + +%%------------------------------------------------------------------------------ +%% Internal fns +%%------------------------------------------------------------------------------ + +worker_spec(Mod) -> + #{ + id => Mod, + start => {Mod, start_link, []}, + restart => permanent, + shutdown => 5_000, + type => worker + }. diff --git a/test/replayq_tests.erl b/test/replayq_tests.erl index 56bb6e5..014e673 100644 --- a/test/replayq_tests.erl +++ b/test/replayq_tests.erl @@ -7,6 +7,7 @@ %% the very first run init_test() -> + {ok, _} = application:ensure_all_started(replayq), Dir = ?DIR, Config = #{dir => Dir, seg_bytes => 100}, Q1 = replayq:open(Config), @@ -20,6 +21,7 @@ init_test() -> ok = cleanup(Dir). reopen_test() -> + {ok, _} = application:ensure_all_started(replayq), Dir = ?DIR, Config = #{dir => Dir, seg_bytes => 100}, Q0 = replayq:open(Config), @@ -47,6 +49,7 @@ volatile_test() -> %% when popping from in-mem segment, the segment size stats may overflow %% but not consuming as much memory offload_in_mem_seg_overflow_test() -> + {ok, _} = application:ensure_all_started(replayq), Dir = ?DIR, Config = #{dir => Dir, seg_bytes => 11, offload => true}, Q0 = replayq:open(Config), @@ -62,6 +65,7 @@ offload_in_mem_seg_overflow_test() -> ok = cleanup(Dir). offload_file_test() -> + {ok, _} = application:ensure_all_started(replayq), Dir = ?DIR, Config = #{dir => Dir, seg_bytes => 10, offload => true}, Q0 = replayq:open(Config), @@ -89,6 +93,7 @@ offload_file_test() -> ok = cleanup(Dir). offload_reopen_test() -> + {ok, _} = application:ensure_all_started(replayq), Dir = ?DIR, Config = #{dir => Dir, seg_bytes => 100, offload => true}, Q0 = replayq:open(Config), @@ -128,11 +133,13 @@ reopen_v0_test() -> ok = cleanup(Dir). append_pop_disk_default_marshaller_test() -> + {ok, _} = application:ensure_all_started(replayq), Dir = ?DIR, Config = #{dir => Dir, seg_bytes => 1}, test_append_pop_disk(Config). append_pop_disk_my_marshaller_test() -> + {ok, _} = application:ensure_all_started(replayq), Dir = ?DIR, Config = #{dir => Dir, seg_bytes => 1, @@ -144,6 +151,7 @@ append_pop_disk_my_marshaller_test() -> test_append_pop_disk(Config). test_append_pop_disk(#{dir := Dir} = Config) -> + {ok, _} = application:ensure_all_started(replayq), Q0 = replayq:open(Config), Q1 = replayq:append(Q0, [<<"item1">>, <<"item2">>]), Q2 = replayq:append(Q1, [<<"item3">>]), @@ -170,10 +178,12 @@ test_append_pop_disk(#{dir := Dir} = Config) -> ok = cleanup(Dir). append_pop_mem_default_marshaller_test_test() -> + {ok, _} = application:ensure_all_started(replayq), Config = #{mem_only => true}, test_append_pop_mem(Config). append_pop_mem_my_marshaller_test_test() -> + {ok, _} = application:ensure_all_started(replayq), Config = #{mem_only => true, sizer => fun(Item) -> size(Item) end, marshaller => fun(<<"mmp", I/binary>>) -> I; @@ -202,6 +212,7 @@ test_append_pop_mem(Config) -> ok = replayq:close(Q5). append_max_total_bytes_mem_test() -> + {ok, _} = application:ensure_all_started(replayq), Config = #{mem_only => true, sizer => fun(Item) -> size(Item) end, marshaller => fun(<<"mmp", I/binary>>) -> I; @@ -213,6 +224,7 @@ append_max_total_bytes_mem_test() -> ok. append_max_total_bytes_disk_test() -> + {ok, _} = application:ensure_all_started(replayq), Dir = ?DIR, Config = #{dir => Dir, seg_bytes => 1, @@ -226,6 +238,7 @@ append_max_total_bytes_disk_test() -> ok = cleanup(Dir). test_append_max_total_bytes(Config) -> + {ok, _} = application:ensure_all_started(replayq), Q0 = replayq:open(Config), ?assertEqual(-10, replayq:overflow(Q0)), Q1 = replayq:append(Q0, [<<"item1">>, <<"item2">>, <<"item3">>, <<"item4">>]), @@ -235,12 +248,14 @@ test_append_max_total_bytes(Config) -> ok = replayq:close(Q2). pop_limit_disk_test() -> + {ok, _} = application:ensure_all_started(replayq), Dir = ?DIR, Config = #{dir => Dir, seg_bytes => 1}, ok = test_pop_limit(Config), ok = cleanup(Dir). pop_limit_mem_test() -> + {ok, _} = application:ensure_all_started(replayq), Config = #{mem_only => true}, ok = test_pop_limit(Config). @@ -257,6 +272,7 @@ test_pop_limit(Config) -> ok = replayq:close(Q4). commit_in_the_middle_test() -> + {ok, _} = application:ensure_all_started(replayq), Dir = ?DIR, Config = #{dir => Dir, seg_bytes => 1000}, Q0 = replayq:open(Config), @@ -279,6 +295,7 @@ commit_in_the_middle_test() -> ok = cleanup(Dir). first_segment_corrupted_test() -> + {ok, _} = application:ensure_all_started(replayq), Dir = ?DIR, SegBytes = 10, Config = #{dir => Dir, seg_bytes => SegBytes}, @@ -300,6 +317,7 @@ first_segment_corrupted_test() -> ok = cleanup(Dir). second_segment_corrupted_test() -> + {ok, _} = application:ensure_all_started(replayq), Dir = ?DIR, SegBytes = 10, Config = #{dir => Dir, seg_bytes => SegBytes}, @@ -322,6 +340,7 @@ second_segment_corrupted_test() -> ok = cleanup(Dir). last_segment_corrupted_test() -> + {ok, _} = application:ensure_all_started(replayq), Dir = ?DIR, SegBytes = 10, Config = #{dir => Dir, seg_bytes => SegBytes}, @@ -351,6 +370,7 @@ last_segment_corrupted_test() -> ok = cleanup(Dir). corrupted_segment_test_() -> + {ok, _} = application:ensure_all_started(replayq), [{"ramdom", fun() -> test_corrupted_segment(<<"foo">>) end}, {"v0-bad-crc", fun() -> test_corrupted_segment(<<0:8, 0:32, 1:32, 1:8>>) end}, {"v0-zero-crc", fun() -> test_corrupted_segment(<<0:8, 0:32, 0:32, "randomtail">>) end}, @@ -377,12 +397,12 @@ test_corrupted_segment(BadBytes) -> ok = cleanup(Dir). comitter_crash_test() -> + {ok, _} = application:ensure_all_started(replayq), Dir = ?DIR, - ComitterName = replayq:committer_process_name(Dir), Config = #{dir => Dir, seg_bytes => 1000}, - _ = replayq:open(Config), + #{committer := Committer} = replayq:open(Config), erlang:process_flag(trap_exit, true), - ComitterName ! <<"foo">>, + Committer ! <<"foo">>, receive {'EXIT', _Pid, {replayq_committer_unkown_msg, <<"foo">>}} -> ok @@ -391,15 +411,32 @@ comitter_crash_test() -> %% Checks that our spawned committer can register a name for itself when using filepaths %% larger than 255 bytes. huge_filepath_test() -> + {ok, _} = application:ensure_all_started(replayq), Dir0 = ?DIR, Dir = filename:join(Dir0, binary:copy(<<"a">>, 255)), Config = #{dir => Dir, seg_bytes => 1000}, - _ = replayq:open(Config), - CommitterName = replayq:committer_process_name(Dir), - ?assert(is_process_alive(whereis(CommitterName))), + Q = #{committer := Committer} = replayq:open(Config), + ?assert(is_process_alive(Committer)), + replayq:close(Q), + ok. + +%% Checks that we don't allow having the same directory open by multiple replayqs. +same_directory_committer_clash_test() -> + {ok, _} = application:ensure_all_started(replayq), + Dir = ?DIR, + Config = #{dir => Dir, seg_bytes => 1000}, + Q1 = replayq:open(Config), + try replayq:open(Config) of + Q2 -> error({"should not allow opening a second replayq", Q2}) + catch + error:{badmatch, {error, {already_started, _}}} -> + ok + end, + replayq:close(Q1), ok. is_in_mem_test_() -> + {ok, _} = application:ensure_all_started(replayq), [ {"mem queue", fun() -> Q = replayq:open(#{mem_only => true}), true = replayq:is_mem_only(Q), @@ -414,6 +451,7 @@ is_in_mem_test_() -> ]. stop_before_test_() -> + {ok, _} = application:ensure_all_started(replayq), [{"mem queue", fun() -> Config = #{mem_only => true}, @@ -435,6 +473,7 @@ stop_before_test_() -> end}]. stop_before_test(Config) -> + {ok, _} = application:ensure_all_started(replayq), Q0 = replayq:open(Config), Q1 = replayq:append(Q0, [<<"1">>, <<"2">>, <<"3">>, <<"4">>, <<"5">>]), StopBeforeFun = @@ -457,6 +496,7 @@ stop_before_test(Config) -> %% Test that the example in the readme file works stop_before_readme_example_test(Config) -> + {ok, _} = application:ensure_all_started(replayq), Q0 = replayq:open(Config), Q1 = replayq:append(Q0, [