Skip to content

Commit

Permalink
Add more usage tracking and core functionality
Browse files Browse the repository at this point in the history
  • Loading branch information
chewbranca committed Oct 30, 2023
1 parent e5e4820 commit ecda8f3
Show file tree
Hide file tree
Showing 6 changed files with 158 additions and 19 deletions.
4 changes: 4 additions & 0 deletions src/chttpd/src/chttpd.erl
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,8 @@ handle_request_int(MochiReq) ->
% Save client socket so that it can be monitored for disconnects
chttpd_util:mochiweb_client_req_set(MochiReq),

%% This is probably better in before_request, but having Path is nice
couch_stats_resource_tracker:create_coordinator_context(HttpReq0, Path),
{HttpReq2, Response} =
case before_request(HttpReq0) of
{ok, HttpReq1} ->
Expand Down Expand Up @@ -353,6 +355,8 @@ handle_request_int(MochiReq) ->

before_request(HttpReq) ->
try
%% TODO: re-enable this here once we have Path
%% couch_stats_resource_tracker:create_coordinator_context(HttpReq),
chttpd_stats:init(),
chttpd_plugin:before_request(HttpReq)
catch
Expand Down
1 change: 1 addition & 0 deletions src/chttpd/src/chttpd_db.erl
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@

% Database request handlers
handle_request(#httpd{path_parts = [DbName | RestParts], method = Method} = Req) ->
couch_stats_resource_tracker:set_context_dbname(DbName),
case {Method, RestParts} of
{'PUT', []} ->
create_db_req(Req, DbName);
Expand Down
150 changes: 135 additions & 15 deletions src/couch_stats/src/couch_stats_resource_tracker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,17 @@

-export([
create_context/0, create_context/1, create_context/3,
create_coordinator_context/1, create_coordinator_context/2,
set_context_dbname/1,
set_context_username/1,
track/1,
should_track/1
]).

-export([
active/0
active/0,
active_coordinators/0,
active_workers/0
]).

-export([
Expand Down Expand Up @@ -93,6 +98,7 @@
%% TODO: overlap between this and couch btree fold invocations
%% TODO: need some way to distinguish folds on views vs find vs all_docs
-define(FRPC_CHANGES_ROW, changes_processed).
%%-define(FRPC_CHANGES_ROW, ?ROWS_READ).

%% Module pdict markers
-define(DELTA_TA, csrt_delta_ta).
Expand All @@ -109,13 +115,19 @@
%% TODO: switch to:
%% -record(?RCTX, {
-record(rctx, {
%% Metadata
updated_at = os:timestamp(),
exited_at,
pid_ref,
mfa,
nonce,
from,
type = unknown, %% unknown/background/system/rpc/coordinator/fabric_rpc/etc_rpc/etc
state = alive,
dbname,
username,

%% Stats counters
db_open = 0,
docs_read = 0,
rows_read = 0,
Expand All @@ -132,8 +144,7 @@
%% TODO: switch record definitions to be macro based, eg:
%% ?COUCH_BT_GET_KP_NODE = 0,
get_kv_node = 0,
get_kp_node = 0,
state = alive
get_kp_node = 0
}).

%% Convenience counter helper: bumps the db_opened stat for the calling
%% process by delegating to inc/1 (defined elsewhere in this module).
db_opened() -> inc(db_opened).
Expand Down Expand Up @@ -208,7 +219,7 @@ inc(?MANGO_EVAL_MATCH, N) ->
inc(?DB_OPEN_DOC, N) ->
update_counter(#rctx.?DB_OPEN_DOC, N);
inc(?FRPC_CHANGES_ROW, N) ->
update_counter(#rctx.?FRPC_CHANGES_ROW, N);
update_counter(#rctx.?ROWS_READ, N); %% TODO: rework double use of rows_read
inc(?COUCH_BT_GET_KP_NODE, N) ->
update_counter(#rctx.?COUCH_BT_GET_KP_NODE, N);
inc(?COUCH_BT_GET_KV_NODE, N) ->
Expand Down Expand Up @@ -238,8 +249,8 @@ maybe_inc([couchdb, query_server, js_filter], Val) ->
inc(?COUCH_JS_FILTER, Val);
maybe_inc([couchdb, query_server, js_filtered_docs], Val) ->
inc(?COUCH_JS_FILTERED_DOCS, Val);
maybe_inc(Metric, Val) ->
io:format("SKIPPING MAYBE_INC METRIC[~p]: ~p~n", [Val, Metric]),
maybe_inc(_Metric, _Val) ->
%%io:format("SKIPPING MAYBE_INC METRIC[~p]: ~p~n", [Val, Metric]),
0.


Expand All @@ -248,6 +259,8 @@ should_track([fabric_rpc, all_docs, spawned]) ->
true;
should_track([fabric_rpc, changes, spawned]) ->
true;
should_track([fabric_rpc, changes, processed]) ->
true;
should_track([fabric_rpc, map_view, spawned]) ->
true;
should_track([fabric_rpc, reduce_view, spawned]) ->
Expand Down Expand Up @@ -283,7 +296,26 @@ update_counter({_Pid,_Ref}=Key, Field, Count) ->
ets:update_counter(?MODULE, Key, {Field, Count}, #rctx{pid_ref=Key}).


%% Public API: snapshot the currently tracked resource contexts.
%% NOTE(review): active_coordinators/0 and active_workers/0 return raw
%% #rctx{} records (via ets:select), while active/0 returns JSON-mapped
%% terms -- confirm callers expect this asymmetry.
active() -> active_int(all).
active_coordinators() -> active_int(coordinators).
active_workers() -> active_int(workers).


%% Internal dispatch; all three shapes now route through select_by_type/1
%% instead of active_int(all) re-implementing the 'all' case inline.
active_int(coordinators) ->
    select_by_type(coordinators);
active_int(workers) ->
    select_by_type(workers);
active_int(all) ->
    select_by_type(all).


%% Select tracked rctx entries by context type using a match spec on the
%% type field. Use ?MODULE for the table name, consistent with every other
%% ets call in this module, instead of spelling the module name out.
%% NOTE(review): ets:fun2ms (ms_transform) would be more readable than a
%% hand-written match spec, but requires including ms_transform.hrl.
select_by_type(coordinators) ->
    ets:select(?MODULE,
        [{#rctx{type = {coordinator, '_', '_'}, _ = '_'}, [], ['$_']}]);
select_by_type(workers) ->
    ets:select(?MODULE,
        [{#rctx{type = {worker, '_', '_'}, _ = '_'}, [], ['$_']}]);
select_by_type(all) ->
    lists:map(fun to_json/1, ets:tab2list(?MODULE)).


Expand All @@ -294,11 +326,17 @@ to_json(#rctx{}=Rctx) ->
mfa = MFA0,
nonce = Nonce0,
from = From0,
dbname = DbName,
username = UserName,
db_open = DbOpens,
docs_read = DocsRead,
rows_read = RowsRead,
state = State0,
type = Type,
btree_folds = BtFolds,
get_kp_node = KpNodes,
get_kv_node = KvNodes,
ioq_calls = IoqCalls,
changes_processed = ChangesProcessed
} = Rctx,
%%io:format("TO_JSON_MFA: ~p~n", [MFA0]),
Expand Down Expand Up @@ -338,27 +376,43 @@ to_json(#rctx{}=Rctx) ->
nonce => term_to_json(Nonce),
%%from => From,
from => term_to_json(From),
dbname => DbName,
username => UserName,
db_open => DbOpens,
docs_read => DocsRead,
rows_read => RowsRead,
state => State,
type => term_to_json(Type),
type => term_to_json({type, Type}),
btree_folds => BtFolds,
kp_nodes => KpNodes,
kv_nodes => KvNodes,
ioq_calls => IoqCalls,
changes_processed => ChangesProcessed
}.

%% Best-effort conversion of arbitrary terms to JSON-encodable values.
term_to_json({Pid, Ref}) when is_pid(Pid), is_reference(Ref) ->
    [?l2b(pid_to_list(Pid)), ?l2b(ref_to_list(Ref))];
term_to_json({type, {coordinator, _, _} = Type}) ->
    ?l2b(io_lib:format("~p", [Type]));
term_to_json({type, Type}) ->
    %% Unwrap non-coordinator types (e.g. {worker, M, F} or the default
    %% atom 'unknown') so they don't fall through to the final catch-all
    %% as a raw {type, _} tuple, which is not JSON-encodable.
    term_to_json(Type);
term_to_json({A, B, C}) ->
    [A, B, C];
term_to_json(undefined) ->
    null;
term_to_json(null) ->
    null;
term_to_json(T) ->
    T.

%% Flat (string-oriented) variant of term_to_json/1.
term_to_flat_json({type, {coordinator, _, _} = Type}) ->
    ?l2b(io_lib:format("~p", [Type]));
term_to_flat_json({type, Type}) ->
    %% Unwrap non-coordinator types (e.g. {worker, M, F} or the default
    %% atom 'unknown') so the output shows the type itself instead of the
    %% {type, ...} wrapper that the is_tuple clause would otherwise format.
    term_to_flat_json(Type);
term_to_flat_json(Tuple) when is_tuple(Tuple) ->
    ?l2b(io_lib:format("~w", [Tuple]));
term_to_flat_json(undefined) ->
    null;
term_to_flat_json(null) ->
    null;
term_to_flat_json(T) ->
    T.

Expand All @@ -369,11 +423,17 @@ to_flat_json(#rctx{}=Rctx) ->
mfa = MFA0,
nonce = Nonce0,
from = From0,
dbname = DbName,
username = UserName,
db_open = DbOpens,
docs_read = DocsRead,
rows_read = RowsRead,
state = State0,
type = Type,
btree_folds = ChangesProcessed
get_kp_node = KpNodes,
get_kv_node = KvNodes,
btree_folds = ChangesProcessed,
ioq_calls = IoqCalls
} = Rctx,
io:format("TO_JSON_MFA: ~p~n", [MFA0]),
MFA = case MFA0 of
Expand Down Expand Up @@ -402,6 +462,7 @@ to_flat_json(#rctx{}=Rctx) ->
Nonce0 ->
list_to_binary(Nonce0)
end,
io:format("NONCE IS: ~p||~p~n", [Nonce0, Nonce]),
#{
%%updated_at => ?l2b(io_lib:format("~w", [TP])),
updated_at => term_to_flat_json(TP),
Expand All @@ -410,11 +471,17 @@ to_flat_json(#rctx{}=Rctx) ->
mfa => MFA,
nonce => Nonce,
from => From,
dbname => DbName,
username => UserName,
db_open => DbOpens,
docs_read => DocsRead,
rows_read => RowsRead,
state => State,
type => term_to_flat_json(Type),
btree_folds => ChangesProcessed
type => term_to_flat_json({type, Type}),
kp_nodes => KpNodes,
kv_nodes => KvNodes,
btree_folds => ChangesProcessed,
ioq_calls => IoqCalls
}.

get_pid_ref() ->
Expand All @@ -440,22 +507,71 @@ create_context(Pid) ->

%% add type to distinguish coordinator vs rpc_worker
%% Create a worker-side resource context: builds an #rctx{} tagged as
%% {worker, M, F}, registers it for tracking, caches it in the process
%% dictionary for delta accounting, and inserts it into the ets table.
%% The source shown here contained diff residue -- both the old lines
%% (Ref = make_ref(), pid_ref = {self(), Ref}, an earlier io:format) and
%% their replacements; this is the reconstructed current version.
create_context(From, {M,F,_A} = MFA, Nonce) ->
    io:format("[~p] CREAT_CONTEXT MFA[~p]: {~p}: ~p~n", [self(), From, MFA, Nonce]),
    PidRef = get_pid_ref(), %% this will instantiate a new PidRef
    %% TODO: extract user_ctx and db/shard from
    Rctx = #rctx{
        pid_ref = PidRef,
        from = From,
        mfa = MFA,
        type = {worker, M, F},
        nonce = Nonce
    },
    track(Rctx),
    erlang:put(?DELTA_TZ, Rctx),
    true = ets:insert(?MODULE, Rctx),
    Rctx.

%% Convenience wrapper: derive a printable path from the request's
%% path_parts and delegate to create_coordinator_context/2.
create_coordinator_context(#httpd{path_parts = Parts} = Req) ->
    Path = io_lib:format("~p", [Parts]),
    create_coordinator_context(Req, Path).

%% Create a coordinator-side resource context for an HTTP request: builds
%% an #rctx{} tagged {coordinator, Verb, "/Path"}, registers it for
%% tracking, caches it in the process dictionary for delta accounting, and
%% inserts it into the ets table.
%% TODO: extract user_ctx and db/shard from Req
create_coordinator_context(#httpd{method = Verb, nonce = Nonce} = _Req, Path) ->
    io:format("CREATING COORDINATOR CONTEXT ON {~p}~n", [Path]),
    %% get_pid_ref/0 instantiates a fresh PidRef for this process
    PidRef = get_pid_ref(),
    Rctx = #rctx{
        pid_ref = PidRef,
        type = {coordinator, Verb, [$/ | Path]},
        nonce = Nonce
    },
    track(Rctx),
    erlang:put(?DELTA_TZ, Rctx),
    true = ets:insert(?MODULE, Rctx),
    Rctx.

%% Record the database name on this process's tracked rctx entry.
%% Returns true on success; false (after logging) when no entry exists
%% for this process, e.g. when no context was created before this call.
set_context_dbname(DbName) ->
    case ets:update_element(?MODULE, get_pid_ref(), [{#rctx.dbname, DbName}]) of
        false ->
            %% A missing tracked context is a tracking gap, not a fatal
            %% condition. The previous debug scaffolding slept for 1s and
            %% then called erlang:halt(kaboomz) -- halt/1 with an arbitrary
            %% atom is itself a badarg, and halting the whole VM over a
            %% stats-tracking miss is never appropriate. Log and move on.
            io:format("UPDATING DBNAME[~p] FAILURE WITH CONTEXT: ~p~n",
                [DbName, get_resource()]),
            false;
        true ->
            true
    end.

%% Record the authenticated username on this process's tracked rctx entry.
%% A null username (unauthenticated request) is a no-op. Returns true on
%% success; false (after logging) when no tracked entry exists.
set_context_username(null) ->
    ok;
set_context_username(UserName) ->
    io:format("CSRT SETTING USERNAME CONTEXT: ~p~n", [UserName]),
    case ets:update_element(?MODULE, get_pid_ref(), [{#rctx.username, UserName}]) of
        false ->
            %% Fixed copy-paste in the log message (it said "DBNAME"), and
            %% removed the debug scaffolding that slept for 1s and called
            %% erlang:halt(kaboomz) -- halt/1 with an arbitrary atom is a
            %% badarg, and halting the VM over a tracking miss is never
            %% appropriate. Log and move on.
            io:format("UPDATING USERNAME[~p] FAILURE WITH CONTEXT: ~p~n",
                [UserName, get_resource()]),
            false;
        true ->
            true
    end.

track(#rctx{}=Rctx) ->
%% TODO: should this block or not? If no, what cleans up zombies?
%% gen_server:call(?MODULE, {track, PR}).
Expand Down Expand Up @@ -522,6 +638,10 @@ make_delta(#rctx{}=TA, #rctx{}=TB) ->
docs_read => TB#rctx.docs_read - TA#rctx.docs_read,
rows_read => TB#rctx.rows_read - TA#rctx.rows_read,
btree_folds => TB#rctx.btree_folds - TA#rctx.btree_folds,
get_kp_node => TB#rctx.get_kp_node - TA#rctx.get_kp_node,
get_kv_node => TB#rctx.get_kv_node - TA#rctx.get_kv_node,
db_open => TB#rctx.db_open - TA#rctx.db_open,
ioq_calls => TB#rctx.ioq_calls - TA#rctx.ioq_calls,
dt => timer:now_diff(TB#rctx.updated_at, TA#rctx.updated_at)
},
%% TODO: reevaluate this decision
Expand Down
9 changes: 9 additions & 0 deletions src/fabric/src/fabric_rpc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,7 @@ set_purge_infos_limit(DbName, Limit, Options) ->
with_db(DbName, Options, {couch_db, set_purge_infos_limit, [Limit]}).

%% RPC endpoint: open a single document in the shard DbName.
%% Removed a leftover io:format debug print -- this is a per-document hot
%% path, and unconditional console output on every open_doc call is debug
%% scaffolding (use couch_log at debug level if tracing is needed).
open_doc(DbName, DocId, Options) ->
    with_db(DbName, Options, {couch_db, open_doc, [DocId, Options]}).

open_revs(DbName, IdRevsOpts, Options) ->
Expand Down Expand Up @@ -352,6 +353,14 @@ get_uuid(DbName) ->

with_db(DbName, Options, {M, F, A}) ->
set_io_priority(DbName, Options),
couch_stats_resource_tracker:set_context_dbname(DbName),
%% TODO: better approach here than using proplists?

This comment has been minimized.

Copy link
@iilyak

iilyak Oct 31, 2023

Contributor

We set it in few places like this

Options = [{user_ctx, Req#httpd.user_ctx} | Options0],

It is very unfortunate that we don't have a mandatory request_context argument for all fabric.erl functions. Life would be so much easier in that case. Instead we are mixing request context data into database options and also injecting ADMIN_CTX all over the place.

The above runt has nothing to do with you code. The best we can do is to create a helper fun in fabric_util.

case proplists:get_value(user_ctx, Options) of
undefined ->
ok;
#user_ctx{name = UserName} ->
couch_stats_resource_tracker:set_context_username(UserName)
end,
case get_or_create_db(DbName, Options) of
{ok, Db} ->
rexi:reply(
Expand Down
9 changes: 5 additions & 4 deletions src/rexi/src/rexi.erl
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,7 @@ async_server_call(Server, Caller, Request) ->
%% Reply to the rexi caller stored in the process dictionary, attaching
%% the {delta, ...} resource-usage payload via get_delta/0.
%% The source shown here contained diff residue -- both the old inline
%% make_delta() lines and the new get_delta() call; this is the
%% reconstructed current version.
-spec reply(any()) -> any().
reply(Reply) ->
    {Caller, Ref} = get(rexi_from),
    erlang:send(Caller, {Ref, Reply, get_delta()}).

%% @equiv sync_reply(Reply, 300000)
sync_reply(Reply) ->
Expand Down Expand Up @@ -215,8 +214,7 @@ stream(Msg, Limit, Timeout) ->
{ok, Count} ->
put(rexi_unacked, Count + 1),
{Caller, Ref} = get(rexi_from),
Delta = couch_stats_resource_tracker:make_delta(),
erlang:send(Caller, {Ref, self(), Msg, {delta, Delta}}),
erlang:send(Caller, {Ref, self(), Msg, get_delta()}),
ok
catch
throw:timeout ->
Expand Down Expand Up @@ -330,3 +328,6 @@ drain_acks(Count) ->
after 0 ->
{ok, Count}
end.

%% Build the tagged resource-usage delta attached to every rexi message.
get_delta() ->
    Delta = couch_stats_resource_tracker:make_delta(),
    {delta, Delta}.
Loading

0 comments on commit ecda8f3

Please sign in to comment.