diff --git a/README.md b/README.md
index d4dfdb2..92e7f50 100644
--- a/README.md
+++ b/README.md
@@ -4,7 +4,7 @@
Copyright (c) 2014 Basho Technologies, Inc. All Rights Reserved.
-__Version:__ Aug 5 2014 16:25:27
+__Version:__ Aug 18 2014 16:34:44
__Authors:__ Ulf Wiger ([`ulf.wiger@feuerlabs.com`](mailto:ulf.wiger@feuerlabs.com)), Magnus Feuer ([`magnus.feuer@feuerlabs.com`](mailto:magnus.feuer@feuerlabs.com)).
diff --git a/doc/README.md b/doc/README.md
index 3d1696a..ecc76c2 100644
--- a/doc/README.md
+++ b/doc/README.md
@@ -4,7 +4,7 @@
Copyright (c) 2014 Basho Technologies, Inc. All Rights Reserved.
-__Version:__ Aug 5 2014 16:25:27
+__Version:__ Aug 18 2014 16:34:44
__Authors:__ Ulf Wiger ([`ulf.wiger@feuerlabs.com`](mailto:ulf.wiger@feuerlabs.com)), Magnus Feuer ([`magnus.feuer@feuerlabs.com`](mailto:magnus.feuer@feuerlabs.com)).
diff --git a/doc/exometer_report.md b/doc/exometer_report.md
index a57ea18..e3fbafd 100644
--- a/doc/exometer_report.md
+++ b/doc/exometer_report.md
@@ -121,12 +121,14 @@ as a list of atoms.
-+ `DataPoint`
Specifies the data point within the subscribed-to metric as an atom, or a list of atoms.
++ `DataPoint`
Specifies the data point within the subscribed-to metric
+as an atom, or a list of atoms.
-+ `Interval`
Specifies the interval, in milliseconds, that the subscribed-to
-value will be reported at.
++ `Interval`
Specifies the interval, in milliseconds, that the
+subscribed-to value will be reported at, or an atom, referring to a named
+interval configured in the reporter.
@@ -240,6 +242,30 @@ datapoint() = atom()
+### delay() ###
+
+
+
+
+delay() = time_ms()
+
+
+
+
+
+
+### error() ###
+
+
+
+
+error() = {error, any()}
+
+
+
+
+
+
### extra() ###
@@ -257,7 +283,7 @@ extra() = any()
-interval() = pos_integer()
+interval() = pos_integer() | atom()
@@ -299,12 +325,24 @@ reporter_name() = atom()
Restart specification
+
+
+
+### time_ms() ###
+
+
+
+
+time_ms() = pos_integer()
+
+
+
## Function Index ##
-add_reporter/2 | Add a reporter. | ||||||||||||||||||||||||||||||||||||||||||||||||||||
call_reporter/2 | Send a custom (synchronous) call to Reporter . | ||||||||||||||||||||||||||||||||||||||||||||||||||||
cast_reporter/2 | Send a custom (asynchronous) cast to Reporter . | ||||||||||||||||||||||||||||||||||||||||||||||||||||
disable_me/2 | Used by a reporter to disable itself. | ||||||||||||||||||||||||||||||||||||||||||||||||||||
disable_reporter/1 | Disable Reporter . | ||||||||||||||||||||||||||||||||||||||||||||||||||||
enable_reporter/1 | Enable Reporter . | ||||||||||||||||||||||||||||||||||||||||||||||||||||
list_metrics/0 | Equivalent to list_metrics([]). | ||||||||||||||||||||||||||||||||||||||||||||||||||||
list_metrics/1 | List all metrics matching Path , together with subscription status. | ||||||||||||||||||||||||||||||||||||||||||||||||||||
list_reporters/0 | List the name and pid of each known reporter. | ||||||||||||||||||||||||||||||||||||||||||||||||||||
list_subscriptions/1 | List all subscriptions for Reporter . | ||||||||||||||||||||||||||||||||||||||||||||||||||||
new_entry/1 | Called by exometer whenever a new entry is created. | ||||||||||||||||||||||||||||||||||||||||||||||||||||
remove_reporter/1 | Remove reporter and all its subscriptions. | ||||||||||||||||||||||||||||||||||||||||||||||||||||
remove_reporter/2 | Remove Reporter (non-blocking call). | ||||||||||||||||||||||||||||||||||||||||||||||||||||
setopts/3 | Called by exometer when options of a metric entry are changed. | ||||||||||||||||||||||||||||||||||||||||||||||||||||
start_link/0 | Starts the server
+
++ +Delete a named interval. + ### disable_me/2 ### @@ -429,6 +491,17 @@ a restart. If the reporter was already enabled, nothing is changed. + + +### get_intervals/1 ### + + +
++ +List the named intervals for `Reporter`. ### list_metrics/0 ### @@ -527,6 +600,45 @@ Remove `Reporter` (non-blocking call). This function can be used to order removal of a reporter with a custom reason. Note that the function is asynchronous, making it suitable e.g. for calling from within the reporter itself. + + +### restart_intervals/1 ### + + +
++ + +Restart all named intervals, respecting specified delays. + + +This function can be used if named intervals are added incrementally, and +it is important that all intervals trigger separated by the given delays. + + +### set_interval/3 ### + + +
++ + +Specify a named interval. + + + +See [`add_reporter/2`](#add_reporter-2) for a description of named intervals. +The named interval is here specified as either `Time` (milliseconds) or +`{Time, Delay}`, where a delay in milliseconds is provided. + + +If the named interval exists, it will be replaced with the new definition. +Otherwise, it will be added. Use [`restart_intervals/1`](#restart_intervals-1) if you want +all intervals to be restarted/resynched with corresponding relative delays. ### setopts/3 ### @@ -594,7 +706,10 @@ is either a single data point (an atom) or a list of data points (a list). -`Interval` is the sampling/reporting interval in milliseconds. +`Interval` is the sampling/reporting interval in milliseconds, or an atom, +referring to a named interval configured in the reporter. The named +interval need not be defined yet in the reporter (the subscription will +not trigger until it _is_ defined.) `Extra` can be anything that the chosen reporter understands (default: `[]`). @@ -634,8 +749,8 @@ Removes a subscription. Note that the subscription is identified by the combination -`{Reporter, Metric, DataPoint, Extra}`. The exact information can be extracted -using [`list_subscriptions/1`](#list_subscriptions-1). +`{Reporter, Metric, DataPoint, Extra}`. The exact information can be +extracted using [`list_subscriptions/1`](#list_subscriptions-1). ### unsubscribe_all/2 ### diff --git a/include/exometer.hrl b/include/exometer.hrl index 4915383..eac8541 100644 --- a/include/exometer.hrl +++ b/include/exometer.hrl @@ -10,6 +10,8 @@ -define(EXOMETER_SHARED, exometer_shared). -define(EXOMETER_ENTRIES, exometer_entries). +-define(EXOMETER_SUBS, exometer_subscriptions). +-define(EXOMETER_REPORTERS, exometer_reporters). -record(exometer_event, { time = exometer_util:timestamp(), diff --git a/src/exometer_admin.erl b/src/exometer_admin.erl index 6975a22..b3f481a 100644 --- a/src/exometer_admin.erl +++ b/src/exometer_admin.erl @@ -333,8 +333,19 @@ terminate(_, _) -> ok. code_change(_, S, _) -> + case ets:info(?EXOMETER_REPORTERS, name) of + undefined -> create_reporter_tabs(); + _ -> ok + end, {ok, S}. +create_reporter_tabs() -> + Heir = {heir, whereis(exometer_sup), []}, + ets:new(?EXOMETER_REPORTERS, [public, named_table, set, + {keypos, 2}, Heir]), + ets:new(?EXOMETER_SUBS, [public, named_table, ordered_set, + {keypos, 2}, Heir]). + create_ets_tabs() -> case ets:info(?EXOMETER_SHARED, name) of @@ -344,7 +355,11 @@ create_ets_tabs() -> ets:new(?EXOMETER_SHARED, [public, named_table, ordered_set, {keypos, 2}]), ets:new(?EXOMETER_ENTRIES, [public, named_table, ordered_set, - {keypos, 2}]); + {keypos, 2}]), + ets:new(?EXOMETER_REPORTERS, [public, named_table, set, + {keypos, 2}]), + ets:new(?EXOMETER_SUBS, [public, named_table, ordered_set, + {keypos, 2}]); _ -> true end. diff --git a/src/exometer_function.erl b/src/exometer_function.erl index f4329ba..980ad12 100644 --- a/src/exometer_function.erl +++ b/src/exometer_function.erl @@ -552,6 +552,8 @@ e(A, _) when is_atom(A) -> A; e({T,I}, _) when T==i; T==integer -> I; e({T,A}, _) when T==a; T==atom -> A; e({cons,Eh,Et}, Bs) -> [e(Eh, Bs)|e(Et, Bs)]; +e({hd,E}, Bs) -> hd(e(E, Bs)); +e({tl,E}, Bs) -> tl(e(E, Bs)); e({l, Es}, Bs) -> [e(E, Bs) || E <- Es]; e({T,S}, _) when T==s; T==string -> S; e({T,Es}, Bs) when T==t; T==tuple -> list_to_tuple([e(E,Bs) || E <- Es]); @@ -651,6 +653,18 @@ call1(length , [L]) -> length(L); call1(size , [T]) -> size(T); call1(byte_size, [B]) -> byte_size(B); call1(bit_size , [B]) -> bit_size(B); +call1(tuple_to_list , [T]) -> tuple_to_list(T); +call1(list_to_tuple , [L]) -> list_to_tuple(L); +call1(atom_to_list , [A]) -> atom_to_list(A); +call1(list_to_atom , [L]) -> list_to_atom(L); +call1(list_to_binary, [L]) -> list_to_binary(L); +call1(binary_to_list, [B]) -> binary_to_list(B); +call1(t2l, [T]) -> tuple_to_list(T); +call1(l2t, [L]) -> list_to_tuple(L); +call1(a2l, [A]) -> atom_to_list(A); +call1(l2a, [L]) -> list_to_atom(L); +call1(l2b, [L]) -> list_to_binary(L); +call1(b2l, [B]) -> binary_to_list(B); call1({M,F}, As) when is_atom(M), is_atom(F) -> apply(M, F, As). diff --git a/src/exometer_report.erl b/src/exometer_report.erl index 34cdf01..c6d6f69 100644 --- a/src/exometer_report.erl +++ b/src/exometer_report.erl @@ -76,10 +76,12 @@ %% + `Metric' Specifies the metric that is now subscribed to by the plugin %% as a list of atoms. %% -%% + `DataPoint' Specifies the data point within the subscribed-to metric as an atom, or a list of atoms. +%% + `DataPoint' Specifies the data point within the subscribed-to metric +%% as an atom, or a list of atoms. %% -%% + `Interval' Specifies the interval, in milliseconds, that the subscribed-to -%% value will be reported at. +%% + `Interval' Specifies the interval, in milliseconds, that the +%% subscribed-to value will be reported at, or an atom, referring to a named +%% interval configured in the reporter. %% %% + `State' Contains the state returned by the last called plugin function. %% @@ -155,6 +157,10 @@ list_reporters/0, list_subscriptions/1, add_reporter/2, + set_interval/3, + delete_interval/2, + restart_intervals/1, + get_intervals/1, remove_reporter/1, remove_reporter/2, terminate_reporter/1, enable_reporter/1, @@ -181,6 +187,7 @@ -define(SERVER, ?MODULE). +-type error() :: {error, any()}. -type metric() :: exometer:name() | {find, exometer:name()} | {select, ets:match_spec()}. @@ -188,7 +195,11 @@ -type options() :: [{atom(), any()}]. -type mod_state() :: any(). -type value() :: any(). --type interval() :: pos_integer(). +-type interval() :: pos_integer() | atom(). +-type time_ms() :: pos_integer(). +-type delay() :: time_ms(). +-type named_interval() :: {atom(), time_ms()} + | {atom(), time_ms(), delay()}. -type callback_result() :: {ok, mod_state()} | any(). -type extra() :: any(). -type reporter_name() :: atom(). @@ -243,7 +254,7 @@ -record(subscriber, { key :: #key{}, interval :: interval(), - t_ref :: reference() + t_ref :: reference() | undefined }). -record(restart, { @@ -252,12 +263,20 @@ save_n = 10 :: pos_integer()} ). +-record(interval, { + name :: atom(), + time = 0 :: non_neg_integer(), + delay = 0 :: non_neg_integer(), + t_ref :: reference() | undefined + }). + -record(reporter, { name :: atom(), pid :: pid(), mref :: reference(), module :: module(), opts = [] :: [{atom(), any()}], + intervals = [] :: [#interval{}], restart = #restart{}, status = enabled :: enabled | disabled }). @@ -293,7 +312,10 @@ subscribe(Reporter, Metric, DataPoint, Interval) -> %% a static configuration. `Metric' is the name of an exometer entry. `DataPoint' %% is either a single data point (an atom) or a list of data points (a list). %% -%% `Interval' is the sampling/reporting interval in milliseconds. +%% `Interval' is the sampling/reporting interval in milliseconds, or an atom, +%% referring to a named interval configured in the reporter. The named +%% interval need not be defined yet in the reporter (the subscription will +%% not trigger until it is defined.) %% %% `Extra' can be anything that the chosen reporter understands (default: `[]'). %% If the reporter uses {@link exometer_util:report_type/3}, `Extra' should be @@ -318,8 +340,8 @@ unsubscribe(Reporter, Metric, DataPoint) -> %% @doc Removes a subscription. %% %% Note that the subscription is identified by the combination -%% `{Reporter, Metric, DataPoint, Extra}'. The exact information can be extracted -%% using {@link list_subscriptions/1}. +%% `{Reporter, Metric, DataPoint, Extra}'. The exact information can be +%% extracted using {@link list_subscriptions/1}. %% @end unsubscribe(Reporter, Metric, DataPoint, Extra) -> call({unsubscribe, #key{reporter = Reporter, @@ -382,6 +404,16 @@ list_subscriptions(Reporter) -> %% reporter process will be terminated and subscription timers canceled, but %% the subscriptions will remain, and it will also be possible to add new %% subscriptions to the reporter. +%% +%% `{intervals, [named_interval()]}' +%% named_interval() :: {Name::atom(), Interval::pos_integer()} +%% | {Name::atom(), Interval::time_ms(), delay()::time_ms()} +%% Define named intervals. The name can be used by subscribers, so that all +%% subsriptions for a given named interval will be reported when the interval +%% triggers. An optional delay (in ms) can be given: this will cause the first +%% interval to start in `Delay' milliseconds. When all intervals are named +%% at the same time, the delay parameter can be used to achieve staggered +%% reporting. %% @end add_reporter(Reporter, Options) -> call({add_reporter, Reporter, Options}). @@ -391,6 +423,51 @@ add_reporter(Reporter, Options) -> remove_reporter(Reporter) -> call({remove_reporter, Reporter}). +-spec set_interval(reporter_name(), atom(), + time_ms() | {time_ms(), delay()}) -> ok |error(). +%% @doc Specify a named interval. +%% +%% See {@link add_reporter/2} for a description of named intervals. +%% The named interval is here specified as either `Time' (milliseconds) or +%% `{Time, Delay}', where a delay in milliseconds is provided. +%% +%% If the named interval exists, it will be replaced with the new definition. +%% Otherwise, it will be added. Use {@link restart_intervals/1} if you want +%% all intervals to be restarted/resynched with corresponding relative delays. +%% @end +set_interval(Reporter, Name, Time) when is_atom(Name), + is_integer(Time), Time >= 0 -> + call({set_interval, Reporter, Name, Time}); +set_interval(Reporter, Name, {Time, Delay}) when is_atom(Name), + is_integer(Time), Time >= 0, + is_integer(Delay), + Delay >= 0 -> + call({set_interval, Reporter, Name, {Time, Delay}}). + +-spec delete_interval(reporter_name(), atom()) -> ok | error(). +%% @doc Delete a named interval. +%% +delete_interval(Reporter, Name) -> + call({delete_interval, Reporter, Name}). + +-spec restart_intervals(reporter_name()) -> ok. +%% @doc Restart all named intervals, respecting specified delays. +%% +%% This function can be used if named intervals are added incrementally, and +%% it is important that all intervals trigger separated by the given delays. +%% @end +restart_intervals(Reporter) -> + call({restart_intervals, Reporter}). + +-spec get_intervals(reporter_name()) -> + [{atom(), [{time, pos_integer()} + | {delay, pos_integer()} + | {timer_ref, reference()}]}]. +%% @doc List the named intervals for `Reporter'. +get_intervals(Reporter) -> + call({get_intervals, Reporter}). + + -spec enable_reporter(reporter_name()) -> ok | {error, any()}. %% @doc Enable `Reporter'. %% @@ -435,10 +512,12 @@ disable_me(Mod, St) -> %% instance. Note that the reporter type must recognize the request. %% @end call_reporter(Reporter, Msg) -> - case lists:keyfind(Reporter, 1, list_reporters()) of - {_, Pid} -> + case ets:lookup(?EXOMETER_REPORTERS, Reporter) of + [#reporter{pid = Pid}] when is_pid(Pid) -> exometer_proc:call(Pid, Msg); - false -> + [#reporter{status = disabled}] -> + {error, disabled}; + [] -> {error, {no_such_reporter, Reporter}} end. @@ -449,10 +528,12 @@ call_reporter(Reporter, Msg) -> %% instance. Note that the reporter type must recognize the message. %% @end cast_reporter(Reporter, Msg) -> - case lists:keyfind(Reporter, 1, list_reporters()) of - {_, Pid} -> + case ets:lookup(?EXOMETER_REPORTERS, Reporter) of + [#reporter{pid = Pid}] when is_pid(Pid) -> exometer_proc:cast(Pid, Msg); - false -> + [#reporter{status = disabled}] -> + {error, disabled}; + [] -> {error, {no_such_reporter, Reporter}} end. @@ -485,7 +566,7 @@ setopts(Metric, Options, Status) -> %% `Mod:exometer_newentry(Entry, St)'. %% @end new_entry(Entry) -> - call({new_entry, Entry}). + cast({new_entry, Entry}). %%%=================================================================== %%% gen_server callbacks @@ -517,53 +598,89 @@ do_start_reporters(S) -> %% { reporters, [ {reporter1, [{opt1, val}, ...]}, {reporter2, [...]}]} %% Traverse list of reporter and launch reporter gen servers as dynamic %% supervisor children. - Reporters0 = case lists:keyfind(reporters, 1, Opts) of - {reporters, ReporterList} -> - ReporterRecs = make_reporter_recs(ReporterList), - assert_no_duplicates(ReporterRecs), - lists:foldr( - fun(#reporter{name = Reporter, - status = Status, - opts = ROpts} = R, Acc) -> - Restart = get_restart(ROpts), - {Pid, MRef} = - if Status =:= enabled -> - spawn_reporter(Reporter, ROpts); - true -> {undefined, undefined} - end, - [ R#reporter{pid = Pid, - mref = MRef, - restart = Restart} | Acc] - end, [], ReporterRecs); - false -> - [] - end, + case lists:keyfind(reporters, 1, Opts) of + {reporters, ReporterList} -> + ReporterRecs = make_reporter_recs(ReporterList), + assert_no_duplicates(ReporterRecs), + lists:foreach( + fun(#reporter{name = Reporter, + status = Status, + opts = ROpts, + intervals = Ints0} = R) -> + Restart = get_restart(ROpts), + {Pid, MRef, Ints} = + if Status =:= enabled -> + {P1,R1} = spawn_reporter(Reporter, ROpts), + I1 = start_interval_timers(R), + {P1,R1,I1}; + true -> {undefined, undefined, Ints0} + end, + ets:insert(?EXOMETER_REPORTERS, + R#reporter{pid = Pid, + mref = MRef, + intervals = Ints, + restart = Restart}) + end, ReporterRecs); + false -> + [] + end, %% Dig out configured 'static' subscribers - SubsList = - case lists:keyfind(subscribers, 1, Opts) of - {subscribers, Subscribers} -> - lists:foldr(fun(Subscr, Acc) -> - init_subscriber(Subscr, Acc, Reporters0) - end, [], Subscribers); - false -> [] - end, - - S#st{ - reporters = Reporters0, - subscribers = SubsList - }. + case lists:keyfind(subscribers, 1, Opts) of + {subscribers, Subscribers} -> + lists:foreach(fun init_subscriber/1, Subscribers); + false -> [] + end, + S#st{}. make_reporter_recs([{R, Opts}|T]) -> [#reporter{name = R, module = get_module(R, Opts), status = proplists:get_value(status, Opts, enabled), - opts = Opts}|make_reporter_recs(T)]; + opts = Opts, + intervals = get_interval_opts(Opts)}|make_reporter_recs(T)]; make_reporter_recs([]) -> []. get_module(R, Opts) -> proplists:get_value(module, Opts, R). +-spec get_interval_opts([named_interval() | any()]) -> [#interval{}]. +get_interval_opts(Opts) -> + case lists:keyfind(intervals, 1, Opts) of + false -> []; + {_, Is} -> + lists:map( + fun({Name, Time}) when is_atom(Name), + is_integer(Time), Time >= 0 -> + #interval{name = Name, time = Time}; + ({Name, Time, Delay}) when is_atom(Name), + is_integer(Time), Time >= 0, + is_integer(Delay), Delay >= 0 -> + #interval{name = Name, time = Time, delay = Delay}; + (Other) -> + error({invalid_interval, Other}) + end, Is) + end. + +start_interval_timers(#reporter{name = R, intervals = Ints}) -> + lists:map(fun(I) -> start_interval_timer(I, R) end, Ints). + +start_interval_timer(#interval{name = Name, delay = Delay, + t_ref = Ref} = I, R) -> + cancel_timer(Ref), + case Delay of + 0 -> + do_start_interval_timer(I, R); + D -> + TRef = erlang:send_after(D, self(), {start_interval, R, Name}), + I#interval{t_ref = TRef} + end. + +do_start_interval_timer(#interval{name = Name, time = Time} = I, R) -> + TRef = erlang:send_after(Time, self(), {report_batch, R, Name}), + I#interval{t_ref = TRef}. + + get_report_env() -> Opts0 = exometer_util:get_env(report, []), {Rs1, Opts1} = split_env(reporters, Opts0), @@ -613,11 +730,11 @@ handle_call({subscribe, datapoint = DataPoint, retry_failed_metrics = RetryFailedMetrics, extra = Extra} , Interval }, - _From, #st{reporters = Rs, subscribers = Subs} = St) -> + _From, #st{} = St) -> %% Verify that the given metric/data point actually exist. - case lists:keyfind(Reporter, #reporter.name, Rs) of - #reporter{status = Status} -> + case ets:lookup(?EXOMETER_REPORTERS, Reporter) of + [#reporter{status = Status}] -> case is_valid_metric(Metric, DataPoint) of true -> if Status =:= enabled -> @@ -625,14 +742,14 @@ handle_call({subscribe, DataPoint, Interval, Extra}; true -> ignore end, - Sub = subscribe_(Reporter, Metric, DataPoint, - Interval, RetryFailedMetrics, - Extra, Status), - {reply, ok, St#st{ subscribers = [Sub | Subs] }}; + subscribe_(Reporter, Metric, DataPoint, + Interval, RetryFailedMetrics, + Extra, Status), + {reply, ok, St}; %% Nope - Not found. false -> {reply, not_found, St } end; - false -> + [] -> {reply, unknown_reporter, St} end; @@ -640,35 +757,26 @@ handle_call({unsubscribe, #key{reporter = Reporter, metric = Metric, datapoint = DataPoint, - extra = Extra}}, _, - #st{subscribers = Subs} = St) -> - - {Res, NSubs} = unsubscribe_(Reporter, Metric, DataPoint, Extra, Subs), - {reply, Res, St#st{ subscribers = NSubs } }; + extra = Extra}}, _, #st{} = St) -> + Res = unsubscribe_(Reporter, Metric, DataPoint, Extra), + {reply, Res, St}; handle_call({unsubscribe_all, Reporter, Metric}, _, - #st{subscribers=Subs0}=St) -> - Subs1 = lists:foldl( - fun - (#subscriber{key=#key{metric=Metric1}=Key, t_ref=TRef}, Acc) - when Metric == Metric1 -> - #key{datapoint=Dp, extra=Extra} = Key, - try Reporter ! {exometer_unsubscribe, Metric, Dp, Extra} - catch error:_ -> ok end, - cancel_timer(TRef), - Acc; - (Sub, Acc) -> - [Sub | Acc] - end, [], Subs0), - {reply, ok, St#st{subscribers=Subs1}}; + #st{}=St) -> + Subs = ets:select(?EXOMETER_SUBS, + [{#subscriber{key = #key{reporter = Reporter, + metric = Metric}, + _ = '_'}, [], ['$_']}]), + lists:foreach(fun unsubscribe_/1, Subs), + {reply, ok, St}; handle_call({list_metrics, Path}, _, St) -> DP = lists:foldr(fun(Metric, Acc) -> - retrieve_metric(Metric, St#st.subscribers, Acc) + retrieve_metric(Metric, Acc) end, [], exometer:find_entries(Path)), {reply, {ok, DP}, St}; -handle_call({list_subscriptions, Reporter}, _, #st{subscribers = Subs0} = St) -> +handle_call({list_subscriptions, Reporter}, _, #st{} = St) -> Subs1 = lists:foldl( fun (#subscriber{key=#key{reporter=Rep}}=Sub, Acc) when Reporter == Rep -> @@ -681,55 +789,122 @@ handle_call({list_subscriptions, Reporter}, _, #st{subscribers = Subs0} = St) -> [{Metric, Dp, Interval, Extra} | Acc]; (_, Acc) -> Acc - end, [], Subs0), + end, [], ets:select(?EXOMETER_SUBS, [{'_',[],['$_']}])), {reply, Subs1, St}; -handle_call(list_reporters, _, #st{reporters = Reporters} = St) -> - Info = [{N, Pid} || #reporter{name = N, pid = Pid} <- Reporters], +handle_call(list_reporters, _, #st{} = St) -> + Info = ets:select(?EXOMETER_REPORTERS, + [{#reporter{name = '$1', pid = '$2', _ = '_'}, + [], [{{'$1', '$2'}}]}]), {reply, Info, St}; -handle_call({add_reporter, Reporter, Opts}, _, #st{reporters = Rs} = St) -> - case lists:keymember(Reporter, #reporter.name, Rs) of +handle_call({add_reporter, Reporter, Opts}, _, #st{} = St) -> + case ets:member(?EXOMETER_REPORTERS, Reporter) of true -> {reply, {error, already_running}, St}; false -> try + [R] = make_reporter_recs([{Reporter, Opts}]), {Pid, MRef} = spawn_reporter(Reporter, Opts), - Rs1 = [#reporter {name = Reporter, - module = get_module(Reporter, Opts), - opts = Opts, - pid = Pid, - mref = MRef} | Rs], - {reply, ok, St#st{reporters = Rs1}} + Ints = start_interval_timers(R), + R1 = R#reporter{intervals = Ints, + pid = Pid, + mref = MRef}, + ets:insert(?EXOMETER_REPORTERS, R1), + {reply, ok, St} catch error:Reason -> {reply, {error, Reason}, St} end end; -handle_call({remove_reporter, Reporter}, _, St0) -> - case do_remove_reporter(Reporter, St0) of - {ok, St1} -> - {reply, ok, St1}; +handle_call({remove_reporter, Reporter}, _, St) -> + case do_remove_reporter(Reporter) of + ok -> + {reply, ok, St}; E -> - {reply, E, St0} + {reply, E, St} end; -handle_call({change_reporter_status, Reporter, Status}, _, St0) -> - case change_reporter_status(Status, Reporter, St0) of - {ok, St1} -> - {reply, ok, St1}; +handle_call({change_reporter_status, Reporter, Status}, _, St) -> + case change_reporter_status(Reporter, Status) of + ok -> + {reply, ok, St}; E -> - {reply, E, St0} + {reply, E, St} + end; +handle_call({set_interval, Reporter, Name, Int}, _, #st{}=St) -> + case ets:lookup(?EXOMETER_REPORTERS, Reporter) of + [#reporter{intervals = Ints}] -> + try + I0 = case lists:keyfind(Name, #interval.name, Ints) of + false -> #interval{name = Name}; + Interval -> Interval + end, + I1 = case Int of + {Time, Delay} when is_integer(Time), Time >= 0, + is_integer(Delay), Delay >= 0 -> + I0#interval{time = Time, delay = Delay}; + Time when is_integer(Time), Time >= 0 -> + I0#interval{time = Time} + end, + ets:update_element(?EXOMETER_REPORTERS, Reporter, + [{#reporter.intervals, + lists:keystore( + Name, #interval.name, Ints, + start_interval_timer(I1, Reporter))}]), + {reply, ok, St} + catch + error:Reason -> + {reply, {error, Reason}, St} + end; + [] -> + {reply, {error, not_found}, St} + end; +handle_call({delete_interval, Reporter, Name}, _, #st{} = St) -> + case ets:lookup(?EXOMETER_REPORTERS, Reporter) of + [#reporter{intervals = Ints}] -> + case lists:keyfind(Name, #interval.name, Ints) of + #interval{t_ref = TRef} -> + cancel_timer(TRef), + ets:update_element(?EXOMETER_REPORTERS, Reporter, + [{#reporter.intervals, + lists:keydelete( + Name, #interval.name, Ints)}]), + {reply, ok, St}; + false -> + {reply, {error, not_found}, St} + end; + [] -> + {reply, {error, not_found}, St} + end; +handle_call({restart_intervals, Reporter}, _, #st{} = St) -> + case ets:lookup(?EXOMETER_REPORTERS, Reporter) of + [#reporter{} = R] -> + Ints = start_interval_timers(R), + ets:update_element(?EXOMETER_REPORTERS, Reporter, + [{#reporter.intervals, Ints}]), + {reply, ok, St}; + [] -> + {reply, {error, not_found}, St} + end; +handle_call({get_intervals, Reporter}, _, #st{} = St) -> + case ets:lookup(?EXOMETER_REPORTERS, Reporter) of + [#reporter{intervals = Ints}] -> + Info = + [{Name, [{time, T}, + {delay, D}, + {timer_ref, TR}]} || #interval{name = Name, + time = T, + delay = D, + t_ref = TR} <- Ints], + {reply, Info, St}; + [] -> + {reply, {error, not_found}, St} end; -handle_call({setopts, Metric, Options, Status}, _, #st{reporters=Rs}=St) -> +handle_call({setopts, Metric, Options, Status}, _, #st{}=St) -> [erlang:send(Pid, {exometer_setopts, Metric, Options, Status}) - || #reporter{pid = Pid} <- Rs], - {reply, ok, St}; - -handle_call({new_entry, Entry}, _, #st{reporters=Rs}=St) -> - [erlang:send(Pid, {exometer_newentry, Entry}) - || #reporter{pid = Pid} <- Rs], + || Pid <- reporter_pids()], {reply, ok, St}; handle_call(_Request, _From, State) -> @@ -744,27 +919,28 @@ handle_call(_Request, _From, State) -> %% {stop, Reason, State} %% @end %%-------------------------------------------------------------------- -handle_cast({remove_reporter, Reporter, Reason}, St0) -> +handle_cast({new_entry, Entry}, #st{} = St) -> + [try erlang:send(Pid, {exometer_newentry, Entry}) catch error:_ -> ok end + || Pid <- reporter_pids()], + maybe_enable_subscriptions(Entry), + {noreply, St}; + +handle_cast({remove_reporter, Reporter, Reason}, St) -> Terminate = case Reason of user -> true; _ -> false end, - case do_remove_reporter(Reporter, St0, Terminate) of - {ok, St1} -> - {noreply, St1}; - _ -> - {noreply, St0} - end; -handle_cast({disable, Pid}, #st{reporters = Rs} = St) -> - case lists:keyfind(Pid, #reporter.pid, Rs) of - #reporter{name = Reporter} -> - {ok, St1} = change_reporter_status(disabled, Reporter, St), - {noreply, St1}; - false -> - {noreply, St} - end; + do_remove_reporter(Reporter, Terminate), + {noreply, St}; +handle_cast({disable, Pid}, #st{} = St) -> + case reporter_by_pid(Pid) of + [#reporter{} = Reporter] -> + do_change_reporter_status(Reporter, disabled); + [] -> ok + end, + {noreply, St}; handle_cast(_Msg, State) -> {noreply, State}. @@ -777,14 +953,50 @@ handle_cast(_Msg, State) -> %% {stop, Reason, State} %% @end %%-------------------------------------------------------------------- +handle_info({start_interval, Reporter, Name}, #st{} = St) -> + case ets:lookup(?EXOMETER_REPORTERS, Reporter) of + [#reporter{intervals = Ints}] -> + case lists:keyfind(Name, #interval.name, Ints) of + #interval{} = I -> + I1 = do_start_interval_timer(I, Reporter), + ets:update_element(?EXOMETER_REPORTERS, Reporter, + [{#reporter.intervals, + lists:keyreplace( + Name, #interval.name, Ints, I1)}]); + false -> + ok + end; + [] -> + ok + end, + {noreply, St}; +handle_info({report_batch, Reporter, Name}, #st{} = St) -> + %% Find all entries where reporter is Reporter and interval is Name, + %% and report them. + case ets:lookup(?EXOMETER_REPORTERS, Reporter) of + [R] -> + Entries = ets:select(?EXOMETER_SUBS, + [{#subscriber{key = #key{reporter = Reporter, + _ = '_'}, + interval = Name, + _ = '_'}, [], ['$_']}]), + lists:foreach( + fun(#subscriber{key = Key}) -> + do_report(Key, Name) + end, Entries), + restart_batch_timer(Name, R); + [] -> + skip + end, + {noreply, St}; handle_info({ report, #key{ reporter = Reporter, metric = Metric, datapoint = DataPoint, retry_failed_metrics = RetryFailedMetrics, - extra = Extra} = Key, Interval}, - #st{subscribers = Subs} = St) -> - case lists:keyfind(Key, #subscriber.key, Subs) of - #subscriber{} = Sub -> + extra = Extra} = Key, Interval}, #st{} = St) -> + case ets:member(?EXOMETER_SUBS, Key) of + true -> + do_report(Key, Interval), case {RetryFailedMetrics, get_values(Metric, DataPoint)} of %% We found a value, or values. {_, [_|_] = Found} -> @@ -793,15 +1005,8 @@ handle_info({ report, #key{ reporter = Reporter, || {DP, Val} <- Values] || {Name, Values} <- Found], %% Re-arm the timer for next round - TRef = erlang:send_after(Interval, self(), - {report, Key, Interval}), - - %% Replace the pid_subscriber info with a record having - %% the new timer ref. - {noreply, St#st{subscribers = - lists:keyreplace( - Key, #subscriber.key, Subs, - Sub#subscriber{ t_ref = TRef })}}; + restart_subscr_timer(Key, Interval), + {noreply, St}; %% We did not find a value, but we should try again. {true, _ } -> @@ -812,15 +1017,8 @@ handle_info({ report, #key{ reporter = Reporter, true -> ok end, %% Re-arm the timer for next round - TRef = erlang:send_after(Interval, self(), - {report, Key, Interval}), - - %% Replace the pid_subscriber info with a record having - %% the new timer ref. - {noreply, St#st{subscribers = - lists:keyreplace( - Key, #subscriber.key, Subs, - Sub#subscriber{ t_ref = TRef })}}; + restart_subscr_timer(Key, Interval), + {noreply, St}; %% We did not find a value, and we should not retry. _ -> %% Entry removed while timer in progress. @@ -834,61 +1032,134 @@ handle_info({ report, #key{ reporter = Reporter, {noreply, St} end; -handle_info({'DOWN', Ref, process, _Pid, Reason}, - #st{reporters = Rs} = S) -> - S1 = case lists:keyfind(Ref, #reporter.mref, Rs) of - #reporter {module = Module, restart = Restart} = R -> - case add_restart(Restart) of - {remove, How} -> - case How of - {M, F} when is_atom(M), is_atom(F) -> - try M:F(Module, Reason) catch _:_ -> ok end; - _ -> - ok - end, - S; - {restart, Restart1} -> - restart_reporter(R#reporter{restart = Restart1}, S) - end; - _ -> S - end, - {noreply, S1}; +handle_info({'DOWN', Ref, process, _Pid, Reason}, #st{} = S) -> + case reporter_by_mref(Ref) of + [#reporter{module = Module, restart = Restart} = R] -> + case add_restart(Restart) of + {remove, How} -> + case How of + {M, F} when is_atom(M), is_atom(F) -> + try M:F(Module, Reason) catch _:_ -> ok end; + _ -> + ok + end, + S; + {restart, Restart1} -> + restart_reporter(R#reporter{restart = Restart1}) + end; + _ -> S + end, + {noreply, S}; handle_info(_Info, State) -> ?warning("exometer_report:info(??): ~p~n", [ _Info ]), {noreply, State}. -restart_reporter(#reporter{name = Name, opts = Opts} = R, - #st{subscribers = Subs, reporters = Reporters} = S) -> +restart_reporter(#reporter{name = Name, opts = Opts, restart = Restart}) -> {Pid, MRef} = spawn_reporter(Name, Opts), - Subs1 = re_subscribe(Subs, Name), - R1 = R#reporter{pid = Pid, mref = MRef, status = enabled}, - S#st{subscribers = Subs1, - reporters = lists:keyreplace(Name, #reporter.name, Reporters, R1)}. - -re_subscribe([#subscriber{key = #key{reporter = RName, - metric = Metric, - datapoint = DataPoint, - extra = Extra} = Key, - t_ref = OldTRef, - interval = Interval} = S | Subs], RName) -> - RName ! {exometer_subscribe, Metric, DataPoint, Interval, Extra}, + [resubscribe(S) || + S <- ets:select(?EXOMETER_SUBS, + [{#subscriber{key = #key{reporter = Name, + _ = '_'}, + _ = '_'}, [], ['$_']}])], + ets:update_element(?EXOMETER_REPORTERS, Name, + [{#reporter.pid, Pid}, + {#reporter.mref, MRef}, + {#reporter.restart, Restart}, + {#reporter.status, enabled}]), + ok. + +%% If there are already subscriptions, enable them. +maybe_enable_subscriptions(Entry) -> + lists:foreach( + fun(#subscriber{key = #key{reporter = RName}} = S) -> + case get_reporter_status(RName) of + enabled -> + resubscribe(S); + _ -> + ok + end + end, ets:select(?EXOMETER_SUBS, + [{#subscriber{key = #key{metric = Entry, + _ = '_'}, + _ = '_'}, [], ['$_']}])). + +resubscribe(#subscriber{key = #key{reporter = RName, + metric = Metric, + datapoint = DataPoint, + extra = Extra} = Key, + t_ref = OldTRef, + interval = Interval}) -> + try_send(RName, {exometer_subscribe, Metric, DataPoint, Interval, Extra}), cancel_timer(OldTRef), TRef = erlang:send_after(Interval, self(), {report, Key, Interval}), - [S#subscriber{t_ref = TRef} | re_subscribe(Subs, RName)]; -re_subscribe([S|Subs], R) -> - [S|re_subscribe(Subs, R)]; -re_subscribe([], _) -> - []. + ets:update_element(?EXOMETER_SUBS, Key, [{#subscriber.t_ref, TRef}]). + -cancel_subscr_timers(Reporter, Subs) -> - lists:map( - fun(#subscriber{key = #key{reporter = R}, - t_ref = TRef} = S) when R =:= Reporter -> +do_report(#key{reporter = Reporter, + metric = Metric, + datapoint = DataPoint, + retry_failed_metrics = RetryFailedMetrics, + extra = Extra} = Key, Interval) -> + case {RetryFailedMetrics, get_values(Metric, DataPoint)} of + %% We found a value, or values. + {_, [_|_] = Found} -> + %% Distribute metric value to the correct process + [[report_value(Reporter, Name, DP, Extra, Val) + || {DP, Val} <- Values] || {Name, Values} <- Found], + %% Re-arm the timer for next round + restart_subscr_timer(Key, Interval); + %% We did not find a value, but we should try again. + {true, _ } -> + if is_list(Metric) -> + ?debug("Metric(~p) Datapoint(~p) not found." + " Will try again in ~p msec~n", + [Metric, DataPoint, Interval]); + true -> ok + end, + %% Re-arm the timer for next round + restart_subscr_timer(Key, Interval); + %% We did not find a value, and we should not retry. + _ -> + %% Entry removed while timer in progress. + ?warning("Metric(~p) Datapoint(~p) not found. Will not try again~n", + [Metric, DataPoint]) + end, + ok. + + +cancel_subscr_timers(Reporter) -> + lists:foreach( + fun(#subscriber{key = Key, t_ref = TRef}) -> cancel_timer(TRef), - S#subscriber{t_ref = undefined}; - (S) -> S - end, Subs). + ets:update_element(?EXOMETER_SUBS, Key, + [{#subscriber.t_ref, undefined}]) + end, ets:select(?EXOMETER_SUBS, + [{#subscriber{key = #key{reporter = Reporter, + _ = '_'}, + _ = '_'}, [], ['$_']}])). + +restart_subscr_timer(Key, Interval) when is_integer(Interval) -> + TRef = erlang:send_after(Interval, self(), + {report, Key, Interval}), + ets:update_element(?EXOMETER_SUBS, Key, + [{#subscriber.t_ref, TRef}]); +restart_subscr_timer(_, _) -> + true. + +restart_batch_timer(Name, #reporter{name = Reporter, + intervals = Ints}) -> + case lists:keyfind(Name, #interval.name, Ints) of + #interval{time = Time} = I -> + TRef = erlang:send_after(Time, self(), + {report_batch, Reporter, Name}), + ets:update_element(?EXOMETER_REPORTERS, Reporter, + [{#reporter.intervals, + lists:keyreplace(Name, #interval.name, Ints, + I#interval{t_ref = TRef})}]); + false -> + false + end. cancel_timer(undefined) -> false; @@ -907,8 +1178,8 @@ cancel_timer(TRef) -> %% @spec terminate(Reason, State) -> void() %% @end %%-------------------------------------------------------------------- -terminate(_Reason, #st{reporters=Rs}) -> - rpc:pmap({?MODULE, terminate_reporter}, [], Rs), +terminate(_Reason, _) -> + [terminate_reporter(R) || R <- ets:tab2list(?EXOMETER_REPORTERS)], ok. %%-------------------------------------------------------------------- @@ -928,7 +1199,7 @@ terminate(_Reason, #st{reporters=Rs}) -> %% opts = [] :: [{atom(), any()}], %% restart = #restart{} %% }). -code_change(_OldVan, #st{reporters = Rs} = S, _Extra) -> +code_change(_OldVan, #st{reporters = Rs, subscribers = Ss} = S, _Extra) -> Rs1 = lists:map( fun({reporter,Pid,MRef,Module,Opts,Restart}) -> #reporter{name = Module, pid = Pid, mref = MRef, @@ -938,9 +1209,15 @@ code_change(_OldVan, #st{reporters = Rs} = S, _Extra) -> #reporter{name = Name, pid = Pid, mref = MRef, module = Module, opts = Opts, restart = Restart}; + ({reporter,Name,Pid,Mref,Module,Opts,Restart,Status}) -> + #reporter{name = Name, pid = Pid, mref = Mref, + module = Module, opts = Opts, + restart = Restart, status = Status}; (#reporter{} = R) -> R end, Rs), - {ok, S#st{reporters = Rs1}}; + [ets:insert(?EXOMETER_REPORTERS, R) || R <- Rs1], + [ets:insert(?EXOMETER_SUBS, Sub) || Sub <- Ss], + {ok, S#st{reporters = [], subscribers = []}}; code_change(_OldVsn, State, _Extra) -> {ok, State}. @@ -948,6 +1225,25 @@ code_change(_OldVsn, State, _Extra) -> %%% Internal functions %%%=================================================================== +reporter_pids() -> + ets:select(?EXOMETER_REPORTERS, + [{#reporter{pid = '$1', _ = '_'}, [], ['$1']}]). + +reporter_by_pid(Pid) -> + ets:select(?EXOMETER_REPORTERS, + [{#reporter{pid = Pid, _='_'}, [], ['$_']}]). + +reporter_by_mref(Ref) -> + ets:select(?EXOMETER_REPORTERS, + [{#reporter{mref = Ref, _='_'}, [], ['$_']}]). + +try_send(To, Msg) -> + try To ! Msg + catch + error:_ -> + Msg + end. + is_valid_metric({find, Name}, _DataPoint) when is_list(Name) -> true; is_valid_metric({select, Name}, _DataPoint) when is_list(Name) -> @@ -1054,36 +1350,40 @@ subscribe_(Reporter, Metric, DataPoint, Interval, RetryFailedMetrics, extra = Extra, retry_failed_metrics = RetryFailedMetrics }, + ets:insert(?EXOMETER_SUBS, + #subscriber{key = Key, + interval = Interval, + t_ref = maybe_send_after(Status, Key, Interval)}). - %% FIXME: Validate Metric and datapoint - %% ?info("Subscribe_(Intv(~p), self(~p))~n", [ Interval, self()]), - #subscriber{key = Key, - interval = Interval, - t_ref = maybe_send_after(Status, Key, Interval)}. - -maybe_send_after(enabled, Key, Interval) -> +maybe_send_after(enabled, Key, Interval) when is_integer(Interval) -> erlang:send_after(Interval, self(), {report, Key, Interval}); maybe_send_after(_, _, _) -> undefined. -unsubscribe_(Reporter, Metric, DataPoint, Extra, Subs) -> - ?info("unsubscribe_(~p, ~p, ~p, ~p, ~p)~n", - [ Reporter, Metric, DataPoint, Extra, Subs]), - case lists:keytake(#key{reporter = Reporter, - metric = Metric, - datapoint = DataPoint, - extra = Extra}, - #subscriber.key, Subs) of - {value, #subscriber{t_ref = TRef}, Rem} -> - %% FIXME: Validate Metric and datapoint - try Reporter ! { exometer_unsubscribe, Metric, DataPoint, Extra } - catch error:_ -> ok end, - cancel_timer(TRef), - {ok, Rem}; - _ -> - {not_found, Subs} +unsubscribe_(Reporter, Metric, DataPoint, Extra) -> + ?info("unsubscribe_(~p, ~p, ~p, ~p)~n", + [ Reporter, Metric, DataPoint, Extra]), + case ets:lookup(?EXOMETER_SUBS, #key{reporter = Reporter, + metric = Metric, + datapoint = DataPoint, + extra = Extra}) of + [#subscriber{} = Sub] -> + unsubscribe_(Sub); + [] -> + not_found end. +unsubscribe_(#subscriber{key = #key{reporter = Reporter, + metric = Metric, + datapoint = DataPoint, + extra = Extra} = Key, t_ref = TRef}) -> + try_send( + Reporter, {exometer_unsubscribe, Metric, DataPoint, Extra}), + cancel_timer(TRef), + ets:delete(?EXOMETER_SUBS, Key), + ok. + + report_value(Reporter, Metric, DataPoint, Extra, Val) -> try Reporter ! {exometer_report, Metric, DataPoint, Extra, Val}, true @@ -1092,17 +1392,21 @@ report_value(Reporter, Metric, DataPoint, Extra, Val) -> exit:_ -> false end. -retrieve_metric({Metric, Type, Enabled}, Subscribers, Acc) -> +retrieve_metric({Metric, Type, Enabled}, Acc) -> + Cands = ets:select( + ?EXOMETER_SUBS, + [{#subscriber{key = #key{metric = Metric, _='_'}, + _ = '_'}, [], ['$_']}]), [ { Metric, exometer:info(Metric, datapoints), - get_subscribers(Metric, Type, Enabled, Subscribers), Enabled } | Acc ]. + get_subscribers(Metric, Type, Enabled, Cands), Enabled } | Acc ]. -find_entries_in_list(find, Path, List) -> - Pat = Path ++ '_', - Spec = ets:match_spec_compile([{ {Pat, '_', '_'}, [], ['$_'] }]), - ets:match_spec_run(List, Spec); -find_entries_in_list(select, Pat, List) -> - Spec = ets:match_spec_compile(Pat), - ets:match_spec_run(List, Spec). +%% find_entries_in_list(find, Path, List) -> +%% Pat = Path ++ '_', +%% Spec = ets:match_spec_compile([{ {Pat, '_', '_'}, [], ['$_'] }]), +%% ets:match_spec_run(List, Spec); +%% find_entries_in_list(select, Pat, List) -> +%% Spec = ets:match_spec_compile(Pat), +%% ets:match_spec_run(List, Spec). get_subscribers(_Metric, _Type, _Status, []) -> []; @@ -1118,20 +1422,20 @@ get_subscribers(Metric, Type, Status, ?debug("get_subscribers(~p, ~p, ~p): match~n", [ Metric, SDataPoint, SReporter]), [ { SReporter, SDataPoint } | get_subscribers(Metric, Type, Status, T) ]; -get_subscribers(Metric, Type, Status, - [ #subscriber { - key = #key { - metric = {How, Path}, - reporter = SReporter, - datapoint = SDataPoint - }} | T ]) -> - case find_entries_in_list(How, Path, [{Metric, Type, Status}]) of - [] -> - get_subscribers(Metric, Type, Status, T); - [_] -> - [ { SReporter, SDataPoint } - | get_subscribers(Metric, Type, Status, T) ] - end; +%% get_subscribers(Metric, Type, Status, +%% [ #subscriber { +%% key = #key { +%% metric = {How, Path}, +%% reporter = SReporter, +%% datapoint = SDataPoint +%% }} | T ]) -> +%% case find_entries_in_list(How, Path, [{Metric, Type, Status}]) of +%% [] -> +%% get_subscribers(Metric, Type, Status, T); +%% [_] -> +%% [ { SReporter, SDataPoint } +%% | get_subscribers(Metric, Type, Status, T) ] +%% end; %% This subscription does not match Metric. get_subscribers(Metric, Type, Status, @@ -1147,19 +1451,19 @@ get_subscribers(Metric, Type, Status, %% Purge all subscriptions associated with a specific reporter %% (that just went down). -purge_subscriptions(R, Subs) -> +purge_subscriptions(R) -> %% Go through all #subscriber elements in Subs and %% cancel the timer of those who match the provided reporter %% %% Return new #subscriber list with all original subscribers %% that do not reference reporter R. - lists:foldr(fun(#subscriber { key = #key {reporter = Rptr}, - t_ref = TRef}, Acc) when Rptr =:= R-> - cancel_timer(TRef), - Acc; - (Subscriber, Acc) -> - [ Subscriber | Acc ] - end, [], Subs). + Subs = ets:select(?EXOMETER_SUBS, + [{#subscriber{key = #key{reporter = R, _='_'}, + _ = '_'}, [], ['$_']}]), + lists:foreach(fun(#subscriber {key = Key, t_ref = TRef}) -> + cancel_timer(TRef), + ets:delete(?EXOMETER_SUBS, Key) + end, Subs). %% Called by the spawn_monitor() call in init %% Loop and run reporters. @@ -1242,57 +1546,49 @@ call(Req) -> cast(Req) -> gen_server:cast(?MODULE, Req). +init_subscriber({Reporter, Metric, DataPoint, Interval, RetryFailedMetrics}) -> + Status = get_reporter_status(Reporter), + subscribe_(Reporter, Metric, DataPoint, Interval, + RetryFailedMetrics, undefined, Status); init_subscriber({Reporter, Metric, DataPoint, Interval, - RetryFailedMetrics}, Acc, Rs) -> - Status = get_reporter_status(Reporter, Rs), - [subscribe_(Reporter, Metric, DataPoint, Interval, - RetryFailedMetrics, undefined, Status) | Acc]; - -init_subscriber({Reporter, Metric, DataPoint, Interval, - RetryFailedMetrics, Extra}, Acc, Rs) -> - Status = get_reporter_status(Reporter, Rs), - [subscribe_(Reporter, Metric, DataPoint, Interval, - RetryFailedMetrics, Extra, Status) | Acc]; - -init_subscriber({Reporter, Metric, DataPoint, Interval}, Acc, Rs) -> - Status = get_reporter_status(Reporter, Rs), - [subscribe_(Reporter, Metric, DataPoint, Interval, - true, undefined, Status) | Acc]; - -init_subscriber({apply, {M, F, A}}, Acc, Rs) -> - lists:foldr(fun(Sub, Acc1) -> - init_subscriber(Sub, Acc1, Rs) - end, Acc, apply(M, F, A)); - -init_subscriber({select, Expr}, Acc, Rs) when tuple_size(Expr)==3; - tuple_size(Expr)==4; - tuple_size(Expr)==5 -> + RetryFailedMetrics, Extra}) -> + Status = get_reporter_status(Reporter), + subscribe_(Reporter, Metric, DataPoint, Interval, + RetryFailedMetrics, Extra, Status); +init_subscriber({Reporter, Metric, DataPoint, Interval}) -> + Status = get_reporter_status(Reporter), + subscribe_(Reporter, Metric, DataPoint, Interval, + true, undefined, Status); +init_subscriber({apply, {M, F, A}}) -> + lists:foreach(fun(Sub) -> + init_subscriber(Sub) + end, apply(M, F, A)); +init_subscriber({select, Expr}) when tuple_size(Expr)==3; + tuple_size(Expr)==4; + tuple_size(Expr)==5 -> {Pattern, Reporter, DataPoint, Interval, Retry, Extra} = case Expr of {P, R, D, I} -> {P, R, D, I, true, undefined}; {P, R, D, I, Rf} -> {P, R, D, I, Rf, undefined}; {P, R, D, I, Rf, X} -> {P, R, D, I, Rf, X} end, - Status = get_reporter_status(Reporter, Rs), + Status = get_reporter_status(Reporter), Entries = exometer:select(Pattern), - lists:foldr( - fun({Entry, _, _}, Acc1) -> - [subscribe_(Reporter, Entry, DataPoint, Interval, - Retry, Extra, Status) - | Acc1] - end, Acc, Entries); + lists:foreach( + fun({Entry, _, _}) -> + subscribe_(Reporter, Entry, DataPoint, Interval, + Retry, Extra, Status) + end, Entries); -init_subscriber(Other, Acc, _) -> +init_subscriber(Other) -> ?warning("Incorrect static subscriber spec ~p. " - "Use { Reporter, Metric, DataPoint, Interval [, Extra ]}~n", [ Other ]), - Acc. + "Use { Reporter, Metric, DataPoint, Interval [, Extra ]}~n", + [ Other ]). -get_reporter_status(R, Rs) -> - case lists:keyfind(R, #reporter.name, Rs) of - #reporter{status = St} -> - St; - false -> - disabled +get_reporter_status(R) -> + try ets:lookup_element(?EXOMETER_REPORTERS, R, #reporter.status) + catch + error:_ -> disabled end. add_restart(#restart{spec = Spec, @@ -1367,43 +1663,40 @@ valid_restart(L) when is_list(L) -> end, L), L. -do_remove_reporter(Reporter, St0) -> - do_remove_reporter(Reporter, St0, true). +do_remove_reporter(Reporter) -> + do_remove_reporter(Reporter, true). -do_remove_reporter(Reporter, #st{subscribers=Subs, reporters=Rs}=St0, Terminate) -> - case lists:keyfind(Reporter, #reporter.name, Rs) of - #reporter{} = R -> +do_remove_reporter(Reporter, Terminate) -> + case ets:lookup(?EXOMETER_REPORTERS, Reporter) of + [#reporter{} = R] -> case Terminate of true -> terminate_reporter(R); false -> ok end, - St1 = St0#st{reporters = lists:keydelete( - Reporter, #reporter.name, Rs), - subscribers = purge_subscriptions( - Reporter, Subs)}, - {ok, St1}; - false -> + ets:delete(?EXOMETER_REPORTERS, Reporter), + purge_subscriptions(Reporter), + ok; + [] -> {error, not_found} end. -change_reporter_status(New, Reporter, #st{subscribers = Subs, - reporters = Rs} = St0) -> - case lists:keyfind(Reporter, #reporter.name, Rs) of - #reporter{status = disabled} = R when New==enabled -> - St1 = restart_reporter(R, St0), - {ok, St1}; - #reporter{status = enabled} = R when New==disabled -> - Subs1 = cancel_subscr_timers(Reporter, Subs), - terminate_reporter(R), - St1 = St0#st{reporters = lists:keyreplace( - Reporter, #reporter.name, Rs, - R#reporter{status = disabled}), - subscribers = Subs1}, - {ok, St1}; - #reporter{status = New} -> - {ok, St0}; - false -> - {error, not_found} +change_reporter_status(Reporter, New) -> + case ets:lookup(?EXOMETER_REPORTERS, Reporter) of + [R] -> do_change_reporter_status(R, New); + [] -> {error, not_found} end. + +do_change_reporter_status(#reporter{name = Reporter, + status = Old} = R, New) -> + case {Old, New} of + {disabled, enabled} -> + restart_reporter(R); + {enabled, disabled} -> + cancel_subscr_timers(Reporter), + terminate_reporter(R), + ets:update_element(?EXOMETER_REPORTERS, + Reporter, [{#reporter.status, disabled}]) + end, + ok. diff --git a/src/exometer_report_snmp.erl b/src/exometer_report_snmp.erl index ad29fd0..80816c3 100644 --- a/src/exometer_report_snmp.erl +++ b/src/exometer_report_snmp.erl @@ -51,7 +51,7 @@ -define(OBJECT_GROUP_NAME, <<"allObjects">>). -define(INFORM_GROUP_NAME, <<"allNotifications">>). --type snmp_option() :: {exometer_entry:datapoint(), exometer_report:interval()} | +-type snmp_option() :: {exometer_entry:datapoint(), exometer_report:interval()} | {exometer_entry:datapoint(), exometer_report:interval(), exometer_report:extra()}. -type snmp() :: disabled | [snmp_option()]. @@ -82,7 +82,7 @@ exometer_init(Opts) -> ets:insert(?MIB_NR_MAP, {?MIB_NR_NEXT, 0}), ets:insert(?MIB_NR_MAP, {?MIB_NR_FREE, []}), - % load MIB template which is used through the operation of + % load MIB template which is used through the operation of % the process to dynamically export metrics MibPath0 = proplists:get_value(mib_template, Opts, ?MIB_TEMPLATE), MibWorkPath = proplists:get_value(mib_dir, Opts, ?MIB_DIR), @@ -101,9 +101,9 @@ exometer_init(Opts) -> {ok, Vsn} = load_mib(0, MibPath1, true), State0 = #st{mib_version=Vsn, - mib_file_path=MibPath1, - mib_file=FileBin, - mib_domain=Id, + mib_file_path=MibPath1, + mib_file=FileBin, + mib_domain=Id, mib_funcs_file_path=FuncsPath}, % ensure the mib is synced with exometer in case of reporter restarts State = sync_mib(State0), @@ -178,14 +178,14 @@ exometer_terminate(_, #st{mib_file_path=MibPath0}) -> % @doc Returns the latest mib and its metadata. get_mib() -> - try + try exometer_proc:call(?MODULE, get_mib) catch error:badarg -> {error, not_running} end. -% @doc +% @doc % Callback function used by the SNMP master agent upon operations performed by a manager. % Currently only get operations are handled. % @end @@ -207,7 +207,7 @@ snmp_operation(Op, Val, Key) -> %%%=================================================================== enable_metric(#exometer_entry{} = E, #st{mib_version=Vsn0, - mib_file_path=MibPath, + mib_file_path=MibPath, mib_file=Mib0, mib_domain=Domain, mib_funcs_file_path=FuncsPath}=S) -> @@ -223,7 +223,7 @@ enable_metric(#exometer_entry{} = E, #st{mib_version=Vsn0, end. disable_metric(#exometer_entry{} = E, #st{mib_version=Vsn0, - mib_file_path=MibPath, + mib_file_path=MibPath, mib_file=Mib0, mib_domain=Domain, mib_funcs_file_path=FuncsPath}=S) -> @@ -239,7 +239,7 @@ disable_metric(#exometer_entry{} = E, #st{mib_version=Vsn0, end. enable_inform(E, Dp, Extra, #st{mib_version=Vsn0, - mib_file_path=MibPath, + mib_file_path=MibPath, mib_file=Mib0, mib_domain=Domain, mib_funcs_file_path=FuncsPath}=S) -> @@ -261,7 +261,7 @@ enable_inform(E, Dp, Extra, #st{mib_version=Vsn0, end. disable_inform(E, Dp, Extra, #st{mib_version=Vsn0, - mib_file_path=MibPath, + mib_file_path=MibPath, mib_file=Mib0, mib_domain=Domain, mib_funcs_file_path=FuncsPath}=S) -> @@ -336,7 +336,7 @@ modify_mib(enable_metric, #exometer_entry{name = Metric} = E, case create_bin(Name, Dp, E) of {ok, Bin} -> L = [ - A, B, + A, B, <<"-- METRIC ", Name/binary, " START\n">>, Bin, <<" ::= { ", Domain/binary, " ">>, Nr1, <<" }\n">>, @@ -640,7 +640,7 @@ update_subscriptions_(M, {[Opt | A], R, Ch, Co}) -> option({Dp, Int}) -> {Dp, Int, undefined}; option({_, _, _}=Opt) -> Opt. --spec compare_subscriptions([snmp_option()], [snmp_option()]) -> +-spec compare_subscriptions([snmp_option()], [snmp_option()]) -> {[snmp_option()], [snmp_option()], [{snmp_option(), snmp_option()}], [snmp_option()]}. compare_subscriptions(Old, New) -> {A, Ch, Co} = lists:foldl( diff --git a/test/exometer_snmp_SUITE.erl b/test/exometer_snmp_SUITE.erl index eea079a..c9077b4 100644 --- a/test/exometer_snmp_SUITE.erl +++ b/test/exometer_snmp_SUITE.erl @@ -455,6 +455,7 @@ deps_code_flags() -> string:join(Deps1, " "). start_manager(Config) -> + io:fwrite(user, "STARTMGR: ~p~n", [Config]), Host = gethostname(), Node = test_manager, Opts = [{boot_timeout, 30}, {monitor_master, true}, |