diff --git a/changelog.md b/changelog.md index 5c99f92..8851d4e 100644 --- a/changelog.md +++ b/changelog.md @@ -1,3 +1,6 @@ +* 4.0.3 + - Added the `[wolff, queuing_bytes]` telemetry event which reports the amount of RAM/disk used by producer queues. + * 4.0.2 - Fix dynamic topic producer initialization failure handling (introduced in 3.0.0). - Fix `unexpected_id` crash when replayq overflow (introduced in 4.0.1). diff --git a/src/wolff_metrics.erl b/src/wolff_metrics.erl index 6afdd02..3f3443d 100644 --- a/src/wolff_metrics.erl +++ b/src/wolff_metrics.erl @@ -4,6 +4,7 @@ -export([ inflight_set/2, queuing_set/2, + queuing_bytes_set/2, dropped_inc/1, dropped_inc/2, dropped_queue_full_inc/1, @@ -29,6 +30,12 @@ queuing_set(Config, Val) -> #{gauge_set => Val}, telemetry_meta_data(Config)). +%% @doc Number of bytes (RAM and/or disk) currently queuing. [Gauge] +queuing_bytes_set(Config, Val) -> + telemetry:execute([wolff, queuing_bytes], + #{gauge_set => Val}, + telemetry_meta_data(Config)). + %% @doc Count of messages that were sent asynchronously but ACKs are not %% received. [Gauge] inflight_set(Config, Val) -> diff --git a/src/wolff_producer.erl b/src/wolff_producer.erl index adaaedd..af1ae60 100644 --- a/src/wolff_producer.erl +++ b/src/wolff_producer.erl @@ -275,6 +275,7 @@ do_init(#{client_id := ClientId, Config2 = resolve_max_linger_bytes(Config1, Q), Config = maps:without([replayq_dir, replayq_seg_bytes], Config2), wolff_metrics:queuing_set(Config, replayq:count(Q)), + wolff_metrics:queuing_bytes_set(Config, replayq:bytes(Q)), wolff_metrics:inflight_set(Config, 0), St#{replayq => Q, config := Config, @@ -376,9 +377,11 @@ clear_gauges(#{config := Config}, Q) -> maybe_reset_queuing(Config, Q) -> case {replayq:count(Q), is_replayq_durable(Config, Q)} of {0, _} -> - wolff_metrics:queuing_set(Config, 0); + wolff_metrics:queuing_set(Config, 0), + wolff_metrics:queuing_bytes_set(Config, 0); {_, false} -> - wolff_metrics:queuing_set(Config, 0); + wolff_metrics:queuing_set(Config, 0), + wolff_metrics:queuing_bytes_set(Config, 0); {_, _} -> ok end. @@ -505,6 +508,7 @@ send_to_kafka(#{sent_reqs := SentReqs, IDs = lists:map(fun({ID, _}) -> ID end, get_calls_from_queue_items(Items)), NewPendingAcks = wolff_pendack:move_backlog_to_inflight(PendingAcks, IDs), wolff_metrics:queuing_set(Config, replayq:count(NewQ)), + wolff_metrics:queuing_bytes_set(Config, replayq:bytes(NewQ)), NewSentReqsCount = SentReqsCount + 1, NrOfCalls = count_calls(Items), NewInflightCalls = InflightCalls + NrOfCalls, @@ -923,6 +927,7 @@ enqueue_calls2(Calls, end, {[], PendingAcks0, 0}, Calls), NewQ = replayq:append(Q, lists:reverse(QueueItems)), wolff_metrics:queuing_set(Config0, replayq:count(NewQ)), + wolff_metrics:queuing_bytes_set(Config0, replayq:bytes(NewQ)), lists:foreach(fun maybe_reply_queued/1, Calls), Overflow = case maps:get(drop_if_highmem, Config0, false) andalso replayq:is_mem_only(NewQ) @@ -968,6 +973,7 @@ handle_overflow(#{replayq := Q, wolff_metrics:dropped_queue_full_inc(Config, NrOfCalls), wolff_metrics:dropped_inc(Config, NrOfCalls), wolff_metrics:queuing_set(Config, replayq:count(NewQ)), + wolff_metrics:queuing_bytes_set(Config, replayq:bytes(NewQ)), ok = maybe_log_discard(St, NrOfCalls), {CbList, NewPendingAcks} = wolff_pendack:drop_backlog(PendingAcks, CallIDs), lists:foreach(fun(Cb) -> eval_ack_cb(Cb, ?buffer_overflow_discarded) end, CbList), diff --git a/test/wolff_supervised_tests.erl b/test/wolff_supervised_tests.erl index b4f60cb..a6c7d01 100644 --- a/test/wolff_supervised_tests.erl +++ b/test/wolff_supervised_tests.erl @@ -31,6 +31,7 @@ supervised_client_test() -> ok = application:stop(wolff), ?assertEqual(undefined, whereis(wolff_sup)), assert_last_event_is_zero(queuing, CntrEventsTable), + assert_last_event_is_zero(queuing_bytes, CntrEventsTable), assert_last_event_is_zero(inflight, CntrEventsTable), [1] = get_telemetry_seq(CntrEventsTable, [wolff,success]), ets:delete(CntrEventsTable), @@ -65,6 +66,7 @@ test_supervised_producers(Name) -> ?assertEqual([], supervisor:which_children(wolff_client_sup)), ok = application:stop(wolff), assert_last_event_is_zero(queuing, CntrEventsTable), + assert_last_event_is_zero(queuing_bytes, CntrEventsTable), assert_last_event_is_zero(inflight, CntrEventsTable), [1] = get_telemetry_seq(CntrEventsTable, [wolff,success]), ets:delete(CntrEventsTable), @@ -175,6 +177,7 @@ test_client_restart(ClientId, Topic, Partition) -> ok = application:stop(wolff), [1,1] = get_telemetry_seq(CntrEventsTable, [wolff,success]), assert_last_event_is_zero(queuing, CntrEventsTable), + assert_last_event_is_zero(queuing_bytes, CntrEventsTable), assert_last_event_is_zero(inflight, CntrEventsTable), ets:delete(CntrEventsTable), wolff_tests:deinstall_event_logging(?FUNCTION_NAME), @@ -256,6 +259,7 @@ producer_restart_test() -> ok = application:stop(wolff), [1,2] = get_telemetry_seq(CntrEventsTable, [wolff,success]), assert_last_event_is_zero(queuing, CntrEventsTable), + assert_last_event_is_zero(queuing_bytes, CntrEventsTable), assert_last_event_is_zero(inflight, CntrEventsTable), ets:delete(CntrEventsTable), wolff_tests:deinstall_event_logging(?FUNCTION_NAME), @@ -322,6 +326,7 @@ test_partition_count_refresh() -> ?assertEqual(Partitions0, Partition1), [1,1] = get_telemetry_seq(CntrEventsTable, [wolff,success]), assert_last_event_is_zero(queuing, CntrEventsTable), + assert_last_event_is_zero(queuing_bytes, CntrEventsTable), assert_last_event_is_zero(inflight, CntrEventsTable), ets:delete(CntrEventsTable), wolff_tests:deinstall_event_logging(?FUNCTION_NAME), diff --git a/test/wolff_tests.erl b/test/wolff_tests.erl index e65e538..b91ac3b 100644 --- a/test/wolff_tests.erl +++ b/test/wolff_tests.erl @@ -117,6 +117,10 @@ send_test() -> ?assert_eq_optional_tail( wolff_test_utils:dedup_list(get_telemetry_seq(CntrEventsTable, [wolff, queuing])), [0,1,0]), + ?assertMatch( + [0,N,0] when N > 0, + wolff_test_utils:dedup_list(get_telemetry_seq(CntrEventsTable, [wolff, queuing_bytes])) + ), ?assert_eq_optional_tail( wolff_test_utils:dedup_list(get_telemetry_seq(CntrEventsTable, [wolff, inflight])), [0,1,0]), @@ -155,6 +159,9 @@ send_one_msg_max_batch_test() -> ?assert_eq_optional_tail( wolff_test_utils:dedup_list(get_telemetry_seq(CntrEventsTable, [wolff, queuing])), [0,1,0]), + ?assert_eq_optional_tail( + wolff_test_utils:dedup_list(get_telemetry_seq(CntrEventsTable, [wolff, queuing_bytes])), + [0,EstimatedBytes,0]), ?assert_eq_optional_tail( wolff_test_utils:dedup_list(get_telemetry_seq(CntrEventsTable, [wolff, inflight])), [0,1,0]), @@ -192,6 +199,9 @@ send_smallest_msg_max_batch_test() -> ?assert_eq_optional_tail( wolff_test_utils:dedup_list(get_telemetry_seq(CntrEventsTable, [wolff, queuing])), [0,1,0]), + ?assert_eq_optional_tail( + wolff_test_utils:dedup_list(get_telemetry_seq(CntrEventsTable, [wolff, queuing_bytes])), + [0,batch_bytes(Batch),0]), ?assert_eq_optional_tail( wolff_test_utils:dedup_list(get_telemetry_seq(CntrEventsTable, [wolff, inflight])), [0,1,0]), @@ -432,6 +442,9 @@ replayq_overflow_test() -> ?assert_eq_optional_tail( wolff_test_utils:dedup_list(get_telemetry_seq(CntrEventsTable, [wolff, queuing])), [0,2,1,0]), + ?assert_eq_optional_tail( + wolff_test_utils:dedup_list(get_telemetry_seq(CntrEventsTable, [wolff, queuing_bytes])), + [0,64,32,0]), ?assert_eq_optional_tail( wolff_test_utils:dedup_list(get_telemetry_seq(CntrEventsTable, [wolff, inflight])), [0,1,0]), @@ -503,6 +516,9 @@ replayq_highmem_overflow_test() -> ?assert_eq_optional_tail( wolff_test_utils:dedup_list(get_telemetry_seq(CntrEventsTable, [wolff, queuing])), [0, 1, 0, 1, 0, 1, 0]), + ?assert_eq_optional_tail( + wolff_test_utils:dedup_list(get_telemetry_seq(CntrEventsTable, [wolff, queuing_bytes])), + [0,32,0,32,0,32,0]), ?assert_eq_optional_tail( wolff_test_utils:dedup_list(get_telemetry_seq(CntrEventsTable, [wolff, inflight])), [0,1,0]), @@ -599,6 +615,9 @@ replayq_offload_test() -> ?assert_eq_optional_tail( wolff_test_utils:dedup_list(get_telemetry_seq(CntrEventsTable, [wolff, queuing])), [0, 1, 0]), + ?assert_eq_optional_tail( + wolff_test_utils:dedup_list(get_telemetry_seq(CntrEventsTable, [wolff, queuing_bytes])), + [0, 61, 0]), ?assert_eq_optional_tail( wolff_test_utils:dedup_list(get_telemetry_seq(CntrEventsTable, [wolff, inflight])), [0, 1, 0]), @@ -1001,6 +1020,7 @@ telemetry_events() -> [wolff, dropped_queue_full], [wolff, matched], [wolff, queuing], + [wolff, queuing_bytes], [wolff, retried], [wolff, failed], [wolff, inflight],