Skip to content

Commit

Permalink
feat: add [wolff, queuing_bytes] telemetry event for ram/disk usage…
Browse files Browse the repository at this point in the history
  • Loading branch information
thalesmg committed Oct 23, 2024
1 parent a7e5b04 commit fa78452
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 2 deletions.
3 changes: 3 additions & 0 deletions changelog.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
* 4.0.3
- Added the `[wolff, queuing_bytes]` telemetry event which reports the amount of RAM/disk used by producer queues.

* 4.0.2
- Fix dynamic topic producer initialization failure handling (introduced in 3.0.0).
- Fix `unexpected_id` crash when replayq overflow (introduced in 4.0.1).
Expand Down
7 changes: 7 additions & 0 deletions src/wolff_metrics.erl
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
-export([
inflight_set/2,
queuing_set/2,
queuing_bytes_set/2,
dropped_inc/1,
dropped_inc/2,
dropped_queue_full_inc/1,
Expand All @@ -29,6 +30,12 @@ queuing_set(Config, Val) ->
#{gauge_set => Val},
telemetry_meta_data(Config)).

%% @doc Number of bytes (RAM and/or disk) currently queuing. [Gauge]
queuing_bytes_set(Config, Val) ->
telemetry:execute([wolff, queuing_bytes],
#{gauge_set => Val},
telemetry_meta_data(Config)).

%% @doc Count of messages that were sent asynchronously but ACKs are not
%% received. [Gauge]
inflight_set(Config, Val) ->
Expand Down
10 changes: 8 additions & 2 deletions src/wolff_producer.erl
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,7 @@ do_init(#{client_id := ClientId,
Config2 = resolve_max_linger_bytes(Config1, Q),
Config = maps:without([replayq_dir, replayq_seg_bytes], Config2),
wolff_metrics:queuing_set(Config, replayq:count(Q)),
wolff_metrics:queuing_bytes_set(Config, replayq:bytes(Q)),
wolff_metrics:inflight_set(Config, 0),
St#{replayq => Q,
config := Config,
Expand Down Expand Up @@ -376,9 +377,11 @@ clear_gauges(#{config := Config}, Q) ->
maybe_reset_queuing(Config, Q) ->
case {replayq:count(Q), is_replayq_durable(Config, Q)} of
{0, _} ->
wolff_metrics:queuing_set(Config, 0);
wolff_metrics:queuing_set(Config, 0),
wolff_metrics:queuing_bytes_set(Config, 0);
{_, false} ->
wolff_metrics:queuing_set(Config, 0);
wolff_metrics:queuing_set(Config, 0),
wolff_metrics:queuing_bytes_set(Config, 0);
{_, _} ->
ok
end.
Expand Down Expand Up @@ -505,6 +508,7 @@ send_to_kafka(#{sent_reqs := SentReqs,
IDs = lists:map(fun({ID, _}) -> ID end, get_calls_from_queue_items(Items)),
NewPendingAcks = wolff_pendack:move_backlog_to_inflight(PendingAcks, IDs),
wolff_metrics:queuing_set(Config, replayq:count(NewQ)),
wolff_metrics:queuing_bytes_set(Config, replayq:bytes(NewQ)),
NewSentReqsCount = SentReqsCount + 1,
NrOfCalls = count_calls(Items),
NewInflightCalls = InflightCalls + NrOfCalls,
Expand Down Expand Up @@ -923,6 +927,7 @@ enqueue_calls2(Calls,
end, {[], PendingAcks0, 0}, Calls),
NewQ = replayq:append(Q, lists:reverse(QueueItems)),
wolff_metrics:queuing_set(Config0, replayq:count(NewQ)),
wolff_metrics:queuing_bytes_set(Config0, replayq:bytes(NewQ)),
lists:foreach(fun maybe_reply_queued/1, Calls),
Overflow = case maps:get(drop_if_highmem, Config0, false)
andalso replayq:is_mem_only(NewQ)
Expand Down Expand Up @@ -968,6 +973,7 @@ handle_overflow(#{replayq := Q,
wolff_metrics:dropped_queue_full_inc(Config, NrOfCalls),
wolff_metrics:dropped_inc(Config, NrOfCalls),
wolff_metrics:queuing_set(Config, replayq:count(NewQ)),
wolff_metrics:queuing_bytes_set(Config, replayq:bytes(NewQ)),
ok = maybe_log_discard(St, NrOfCalls),
{CbList, NewPendingAcks} = wolff_pendack:drop_backlog(PendingAcks, CallIDs),
lists:foreach(fun(Cb) -> eval_ack_cb(Cb, ?buffer_overflow_discarded) end, CbList),
Expand Down
5 changes: 5 additions & 0 deletions test/wolff_supervised_tests.erl
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ supervised_client_test() ->
ok = application:stop(wolff),
?assertEqual(undefined, whereis(wolff_sup)),
assert_last_event_is_zero(queuing, CntrEventsTable),
assert_last_event_is_zero(queuing_bytes, CntrEventsTable),
assert_last_event_is_zero(inflight, CntrEventsTable),
[1] = get_telemetry_seq(CntrEventsTable, [wolff,success]),
ets:delete(CntrEventsTable),
Expand Down Expand Up @@ -65,6 +66,7 @@ test_supervised_producers(Name) ->
?assertEqual([], supervisor:which_children(wolff_client_sup)),
ok = application:stop(wolff),
assert_last_event_is_zero(queuing, CntrEventsTable),
assert_last_event_is_zero(queuing_bytes, CntrEventsTable),
assert_last_event_is_zero(inflight, CntrEventsTable),
[1] = get_telemetry_seq(CntrEventsTable, [wolff,success]),
ets:delete(CntrEventsTable),
Expand Down Expand Up @@ -175,6 +177,7 @@ test_client_restart(ClientId, Topic, Partition) ->
ok = application:stop(wolff),
[1,1] = get_telemetry_seq(CntrEventsTable, [wolff,success]),
assert_last_event_is_zero(queuing, CntrEventsTable),
assert_last_event_is_zero(queuing_bytes, CntrEventsTable),
assert_last_event_is_zero(inflight, CntrEventsTable),
ets:delete(CntrEventsTable),
wolff_tests:deinstall_event_logging(?FUNCTION_NAME),
Expand Down Expand Up @@ -256,6 +259,7 @@ producer_restart_test() ->
ok = application:stop(wolff),
[1,2] = get_telemetry_seq(CntrEventsTable, [wolff,success]),
assert_last_event_is_zero(queuing, CntrEventsTable),
assert_last_event_is_zero(queuing_bytes, CntrEventsTable),
assert_last_event_is_zero(inflight, CntrEventsTable),
ets:delete(CntrEventsTable),
wolff_tests:deinstall_event_logging(?FUNCTION_NAME),
Expand Down Expand Up @@ -322,6 +326,7 @@ test_partition_count_refresh() ->
?assertEqual(Partitions0, Partition1),
[1,1] = get_telemetry_seq(CntrEventsTable, [wolff,success]),
assert_last_event_is_zero(queuing, CntrEventsTable),
assert_last_event_is_zero(queuing_bytes, CntrEventsTable),
assert_last_event_is_zero(inflight, CntrEventsTable),
ets:delete(CntrEventsTable),
wolff_tests:deinstall_event_logging(?FUNCTION_NAME),
Expand Down
21 changes: 21 additions & 0 deletions test/wolff_tests.erl
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,10 @@ send_test() ->
?assert_eq_optional_tail(
wolff_test_utils:dedup_list(get_telemetry_seq(CntrEventsTable, [wolff, queuing])),
[0,1,0]),
?assertMatch(
[0,N,0] when N > 0,
wolff_test_utils:dedup_list(get_telemetry_seq(CntrEventsTable, [wolff, queuing_bytes]))
),
?assert_eq_optional_tail(
wolff_test_utils:dedup_list(get_telemetry_seq(CntrEventsTable, [wolff, inflight])),
[0,1,0]),
Expand Down Expand Up @@ -155,6 +159,9 @@ send_one_msg_max_batch_test() ->
?assert_eq_optional_tail(
wolff_test_utils:dedup_list(get_telemetry_seq(CntrEventsTable, [wolff, queuing])),
[0,1,0]),
?assert_eq_optional_tail(
wolff_test_utils:dedup_list(get_telemetry_seq(CntrEventsTable, [wolff, queuing_bytes])),
[0,EstimatedBytes,0]),
?assert_eq_optional_tail(
wolff_test_utils:dedup_list(get_telemetry_seq(CntrEventsTable, [wolff, inflight])),
[0,1,0]),
Expand Down Expand Up @@ -192,6 +199,9 @@ send_smallest_msg_max_batch_test() ->
?assert_eq_optional_tail(
wolff_test_utils:dedup_list(get_telemetry_seq(CntrEventsTable, [wolff, queuing])),
[0,1,0]),
?assert_eq_optional_tail(
wolff_test_utils:dedup_list(get_telemetry_seq(CntrEventsTable, [wolff, queuing_bytes])),
[0,batch_bytes(Batch),0]),
?assert_eq_optional_tail(
wolff_test_utils:dedup_list(get_telemetry_seq(CntrEventsTable, [wolff, inflight])),
[0,1,0]),
Expand Down Expand Up @@ -432,6 +442,9 @@ replayq_overflow_test() ->
?assert_eq_optional_tail(
wolff_test_utils:dedup_list(get_telemetry_seq(CntrEventsTable, [wolff, queuing])),
[0,2,1,0]),
?assert_eq_optional_tail(
wolff_test_utils:dedup_list(get_telemetry_seq(CntrEventsTable, [wolff, queuing_bytes])),
[0,64,32,0]),
?assert_eq_optional_tail(
wolff_test_utils:dedup_list(get_telemetry_seq(CntrEventsTable, [wolff, inflight])),
[0,1,0]),
Expand Down Expand Up @@ -503,6 +516,9 @@ replayq_highmem_overflow_test() ->
?assert_eq_optional_tail(
wolff_test_utils:dedup_list(get_telemetry_seq(CntrEventsTable, [wolff, queuing])),
[0, 1, 0, 1, 0, 1, 0]),
?assert_eq_optional_tail(
wolff_test_utils:dedup_list(get_telemetry_seq(CntrEventsTable, [wolff, queuing_bytes])),
[0,32,0,32,0,32,0]),
?assert_eq_optional_tail(
wolff_test_utils:dedup_list(get_telemetry_seq(CntrEventsTable, [wolff, inflight])),
[0,1,0]),
Expand Down Expand Up @@ -599,6 +615,10 @@ replayq_offload_test() ->
?assert_eq_optional_tail(
wolff_test_utils:dedup_list(get_telemetry_seq(CntrEventsTable, [wolff, queuing])),
[0, 1, 0]),
?assertMatch(
[0,N,0] when N > 0,
wolff_test_utils:dedup_list(get_telemetry_seq(CntrEventsTable, [wolff, queuing_bytes]))
),
?assert_eq_optional_tail(
wolff_test_utils:dedup_list(get_telemetry_seq(CntrEventsTable, [wolff, inflight])),
[0, 1, 0]),
Expand Down Expand Up @@ -1001,6 +1021,7 @@ telemetry_events() ->
[wolff, dropped_queue_full],
[wolff, matched],
[wolff, queuing],
[wolff, queuing_bytes],
[wolff, retried],
[wolff, failed],
[wolff, inflight],
Expand Down

0 comments on commit fa78452

Please sign in to comment.