Skip to content

Commit

Permalink
feat(buffer worker): report queued bytes as a metric
Browse files Browse the repository at this point in the history
Fixes https://emqx.atlassian.net/browse/EMQX-13074

N.B.: Kafka/Confluent/Azure Event Hub Producers and Pulsar Producers have internal
buffering, and thus they'll need to implement support for reporting such metrics
separately.
  • Loading branch information
thalesmg committed Oct 23, 2024
1 parent 885ce20 commit 88a8fb4
Show file tree
Hide file tree
Showing 7 changed files with 153 additions and 14 deletions.
92 changes: 91 additions & 1 deletion apps/emqx_bridge/include/emqx_bridge.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,95 @@
%% limitations under the License.
%%--------------------------------------------------------------------

%% All-zero metrics map in the V1 shape: each of the 17 positional
%% arguments of ?METRICS_V1 is 0.  The V1 shape has no 'queuing_bytes' key.
-define(EMPTY_METRICS_V1,
?METRICS_V1(
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0
)
).

%% Builds a metrics map in the V1 shape from 17 positional values
%% (expression form, uses `=>').  The argument order is the contract: it is
%% the same order used by the pattern form ?metrics_v1.  This shape carries
%% no 'queuing_bytes' entry.
-define(METRICS_V1(
    Dropped,
    DroppedOther,
    DroppedExpired,
    DroppedQueueFull,
    DroppedResourceNotFound,
    DroppedResourceStopped,
    Matched,
    Queuing,
    Retried,
    LateReply,
    Failed,
    Inflight,
    Success,
    Rate,
    RateLast5m,
    RateMax,
    Received
),
    #{
        dropped => Dropped,
        'dropped.other' => DroppedOther,
        'dropped.expired' => DroppedExpired,
        'dropped.queue_full' => DroppedQueueFull,
        'dropped.resource_not_found' => DroppedResourceNotFound,
        'dropped.resource_stopped' => DroppedResourceStopped,
        matched => Matched,
        queuing => Queuing,
        retried => Retried,
        late_reply => LateReply,
        failed => Failed,
        inflight => Inflight,
        success => Success,
        rate => Rate,
        rate_last5m => RateLast5m,
        rate_max => RateMax,
        received => Received
    }
).

%% Pattern form of the V1 metrics map (uses `:=', so it is meant for match
%% positions).  Binds or matches the 17 values in the same positional order
%% as ?METRICS_V1.  Being a map pattern, it only requires these 17 keys to
%% be present; any additional keys in the matched map are ignored.
-define(metrics_v1(
    Dropped,
    DroppedOther,
    DroppedExpired,
    DroppedQueueFull,
    DroppedResourceNotFound,
    DroppedResourceStopped,
    Matched,
    Queuing,
    Retried,
    LateReply,
    Failed,
    Inflight,
    Success,
    Rate,
    RateLast5m,
    RateMax,
    Received
),
    #{
        dropped := Dropped,
        'dropped.other' := DroppedOther,
        'dropped.expired' := DroppedExpired,
        'dropped.queue_full' := DroppedQueueFull,
        'dropped.resource_not_found' := DroppedResourceNotFound,
        'dropped.resource_stopped' := DroppedResourceStopped,
        matched := Matched,
        queuing := Queuing,
        retried := Retried,
        late_reply := LateReply,
        failed := Failed,
        inflight := Inflight,
        success := Success,
        rate := Rate,
        rate_last5m := RateLast5m,
        rate_max := RateMax,
        received := Received
    }
).

%% All-zero metrics map in the current shape: one 0 for each of the 18
%% positional arguments of ?METRICS (the 17 values of the V1 shape plus
%% QueuedBytes).  The stale 17-zero row left over from the previous arity
%% is dropped; keeping both rows yields a malformed argument list.
-define(EMPTY_METRICS,
    ?METRICS(
        0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0
    )
).

Expand All @@ -29,6 +115,7 @@
DroppedResourceStopped,
Matched,
Queued,
QueuedBytes,
Retried,
LateReply,
SentFailed,
Expand All @@ -48,6 +135,7 @@
'dropped.resource_stopped' => DroppedResourceStopped,
'matched' => Matched,
'queuing' => Queued,
'queuing_bytes' => QueuedBytes,
'retried' => Retried,
'late_reply' => LateReply,
'failed' => SentFailed,
Expand All @@ -69,6 +157,7 @@
DroppedResourceStopped,
Matched,
Queued,
QueuedBytes,
Retried,
LateReply,
SentFailed,
Expand All @@ -88,6 +177,7 @@
'dropped.resource_stopped' := DroppedResourceStopped,
'matched' := Matched,
'queuing' := Queued,
'queuing_bytes' := QueuedBytes,
'retried' := Retried,
'late_reply' := LateReply,
'failed' := SentFailed,
Expand Down
12 changes: 6 additions & 6 deletions apps/emqx_bridge/src/emqx_bridge_api.erl
Original file line number Diff line number Diff line change
Expand Up @@ -893,20 +893,20 @@ collect_metrics(Bridges) ->
[#{node => Node, metrics => Metrics} || {Node, Metrics} <- Bridges].

%% Folds aggregate_metrics/2 over the per-node entries in AllMetrics,
%% starting from the all-zero V1 metrics map.
%%
%% InitMetrics is bound exactly once, to the V1 map: aggregate_metrics/2 in
%% this module matches with ?metrics_v1 and rebuilds with ?METRICS_V1, so
%% the accumulator must be in the V1 shape.  Binding the same variable to
%% ?EMPTY_METRICS first would make the second `=' a failing match.
aggregate_metrics(AllMetrics) ->
    InitMetrics = ?EMPTY_METRICS_V1,
    lists:foldl(fun aggregate_metrics/2, InitMetrics, AllMetrics).

aggregate_metrics(
#{
metrics := ?metrics(
metrics := ?metrics_v1(
M1, M2, M3, M4, M5, M6, M7, M8, M9, M10, M11, M12, M13, M14, M15, M16, M17
)
},
?metrics(
?metrics_v1(
N1, N2, N3, N4, N5, N6, N7, N8, N9, N10, N11, N12, N13, N14, N15, N16, N17
)
) ->
?METRICS(
?METRICS_V1(
M1 + N1,
M2 + N2,
M3 + N3,
Expand Down Expand Up @@ -980,7 +980,7 @@ format_metrics(#{
}) ->
Queued = maps:get('queuing', Gauges, 0),
SentInflight = maps:get('inflight', Gauges, 0),
?METRICS(
?METRICS_V1(
Dropped,
DroppedOther,
DroppedExpired,
Expand All @@ -1003,7 +1003,7 @@ format_metrics(_Metrics) ->
%% Empty metrics: can happen when a node joins another and a
%% bridge is not yet replicated to it, so the counters map is
%% empty.
?METRICS(
?METRICS_V1(
_Dropped = 0,
_DroppedOther = 0,
_DroppedExpired = 0,
Expand Down
10 changes: 7 additions & 3 deletions apps/emqx_bridge/src/emqx_bridge_v2_api.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1296,6 +1296,7 @@ format_metrics(#{
}
}) ->
Queued = maps:get('queuing', Gauges, 0),
QueuedBytes = maps:get('queuing_bytes', Gauges, 0),
SentInflight = maps:get('inflight', Gauges, 0),
?METRICS(
Dropped,
Expand All @@ -1306,6 +1307,7 @@ format_metrics(#{
DroppedResourceStopped,
Matched,
Queued,
QueuedBytes,
Retried,
LateReply,
SentFailed,
Expand All @@ -1332,6 +1334,7 @@ empty_metrics() ->
_DroppedResourceStopped = 0,
_Matched = 0,
_Queued = 0,
_QueuedBytes = 0,
_Retried = 0,
_LateReply = 0,
_SentFailed = 0,
Expand Down Expand Up @@ -1365,11 +1368,11 @@ aggregate_metrics(AllMetrics) ->
aggregate_metrics(
#{
metrics := ?metrics(
M1, M2, M3, M4, M5, M6, M7, M8, M9, M10, M11, M12, M13, M14, M15, M16, M17
M1, M2, M3, M4, M5, M6, M7, M8, M9, M10, M11, M12, M13, M14, M15, M16, M17, M18
)
},
?metrics(
N1, N2, N3, N4, N5, N6, N7, N8, N9, N10, N11, N12, N13, N14, N15, N16, N17
N1, N2, N3, N4, N5, N6, N7, N8, N9, N10, N11, N12, N13, N14, N15, N16, N17, N18
)
) ->
?METRICS(
Expand All @@ -1389,7 +1392,8 @@ aggregate_metrics(
M14 + N14,
M15 + N15,
M16 + N16,
M17 + N17
M17 + N17,
M18 + N18
).

fill_defaults(ConfRootKey, Type, RawConf) ->
Expand Down
15 changes: 13 additions & 2 deletions apps/emqx_bridge/test/emqx_bridge_v2_api_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1491,8 +1491,19 @@ t_metrics(Config) ->

?assertMatch(
{ok, 200, #{
<<"metrics">> := #{<<"matched">> := 0},
<<"node_metrics">> := [#{<<"metrics">> := #{<<"matched">> := 0}} | _]
<<"metrics">> := #{
<<"matched">> := 0,
<<"queuing_bytes">> := 0
},
<<"node_metrics">> := [
#{
<<"metrics">> := #{
<<"matched">> := 0,
<<"queuing_bytes">> := 0
}
}
| _
]
}},
request_json(get, uri([?ACTIONS_ROOT, ActionID, "metrics"]), Config)
),
Expand Down
6 changes: 6 additions & 0 deletions apps/emqx_resource/src/emqx_resource_buffer_worker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,7 @@ init({Id, Index, Opts}) ->
QueueOpts = replayq_opts(Id, Index, Opts),
Queue = replayq:open(QueueOpts),
emqx_resource_metrics:queuing_set(Id, Index, queue_count(Queue)),
emqx_resource_metrics:queuing_bytes_set(Id, Index, queue_bytes(Queue)),
emqx_resource_metrics:inflight_set(Id, Index, 0),
InflightWinSize = maps:get(inflight_window, Opts, ?DEFAULT_INFLIGHT),
InflightTID = inflight_new(InflightWinSize),
Expand Down Expand Up @@ -375,6 +376,7 @@ terminate(_Reason, #{id := Id, index := Index, queue := Q}) ->
%% since we want volatile queues, this will be 0 after
%% termination.
emqx_resource_metrics:queuing_set(Id, Index, 0),
emqx_resource_metrics:queuing_bytes_set(Id, Index, 0),
gproc_pool:disconnect_worker(Id, {Id, Index}),
ok.

Expand Down Expand Up @@ -1207,6 +1209,7 @@ log_expired_message_count(_Data = #{id := Id, index := Index, counters := Counte
%% Publishes the buffer worker's three gauges for {Id, Index}: queued item
%% count, queued bytes, and number of inflight messages.  Each value is
%% computed immediately before it is reported, in that order.  Always
%% returns `ok'.
-spec set_gauges(data()) -> ok.
set_gauges(#{id := Id, index := Index, queue := Q, inflight_tid := InflightTID}) ->
    QueuedCount = queue_count(Q),
    emqx_resource_metrics:queuing_set(Id, Index, QueuedCount),
    QueuedBytes = queue_bytes(Q),
    emqx_resource_metrics:queuing_bytes_set(Id, Index, QueuedBytes),
    InflightCount = inflight_num_msgs(InflightTID),
    emqx_resource_metrics:inflight_set(Id, Index, InflightCount),
    ok.

Expand Down Expand Up @@ -2117,6 +2120,9 @@ assert_ok_result(R) ->
%% Returns replayq:count/1 for the given buffer queue; this is the value
%% reported through the 'queuing' gauge.
queue_count(Queue) ->
    replayq:count(Queue).

%% Returns replayq:bytes/1 for the given buffer queue; this is the value
%% reported through the 'queuing_bytes' gauge.
queue_bytes(Queue) ->
    replayq:bytes(Queue).

disk_queue_dir(Id, Index) ->
QDir0 = binary_to_list(Id) ++ ":" ++ integer_to_list(Index),
QDir = filename:join([emqx:data_dir(), "bufs", node(), QDir0]),
Expand Down
16 changes: 16 additions & 0 deletions apps/emqx_resource/src/emqx_resource_metrics.erl
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
inflight_get/1,
queuing_set/3,
queuing_get/1,
queuing_bytes_set/3,
queuing_bytes_get/1,
dropped_inc/1,
dropped_inc/2,
dropped_get/1,
Expand Down Expand Up @@ -92,6 +94,7 @@ events() ->
inflight,
matched,
queuing,
queuing_bytes,
received,
retried_failed,
retried_success,
Expand Down Expand Up @@ -218,6 +221,8 @@ handle_gauge_telemetry_event(Event, ID, WorkerID, Val) ->
emqx_metrics_worker:set_gauge(?RES_METRICS, ID, WorkerID, 'inflight', Val);
queuing ->
emqx_metrics_worker:set_gauge(?RES_METRICS, ID, WorkerID, 'queuing', Val);
queuing_bytes ->
emqx_metrics_worker:set_gauge(?RES_METRICS, ID, WorkerID, 'queuing_bytes', Val);
_ ->
ok
end.
Expand All @@ -237,6 +242,17 @@ queuing_set(ID, WorkerID, Val) ->
%% Reads the 'queuing' gauge recorded for resource ResId under ?RES_METRICS.
queuing_get(ResId) ->
    emqx_metrics_worker:get_gauge(?RES_METRICS, ResId, 'queuing').

%% @doc Number of bytes currently queued. [Gauge]
%% Emits a telemetry event named [?TELEMETRY_PREFIX, queuing_bytes] carrying
%% the new value under `gauge_set', tagged with the resource and worker ids.
queuing_bytes_set(ResId, WorkerId, Bytes) ->
    EventName = [?TELEMETRY_PREFIX, queuing_bytes],
    Measurements = #{gauge_set => Bytes},
    Metadata = #{resource_id => ResId, worker_id => WorkerId},
    telemetry:execute(EventName, Measurements, Metadata).

%% Reads the 'queuing_bytes' gauge recorded for resource ResId under
%% ?RES_METRICS.
queuing_bytes_get(ResId) ->
    emqx_metrics_worker:get_gauge(?RES_METRICS, ResId, 'queuing_bytes').

%% @doc Count of batches of messages that were sent asynchronously but
%% ACKs are not yet received. [Gauge]
inflight_set(ID, WorkerID, Val) ->
Expand Down
16 changes: 14 additions & 2 deletions apps/emqx_resource/test/emqx_resource_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1432,8 +1432,10 @@ t_delete_and_re_create_with_same_name(_Config) ->
),
%% pre-condition: we should have just created a new queue
Queuing0 = emqx_resource_metrics:queuing_get(?ID),
QueuingBytes0 = emqx_resource_metrics:queuing_bytes_get(?ID),
Inflight0 = emqx_resource_metrics:inflight_get(?ID),
?assertEqual(0, Queuing0),
?assertEqual(0, QueuingBytes0),
?assertEqual(0, Inflight0),
?check_trace(
begin
Expand All @@ -1445,7 +1447,8 @@ t_delete_and_re_create_with_same_name(_Config) ->
_Timeout = 5_000
),
%% ensure replayq offloads to disk
Payload = binary:copy(<<"a">>, 119),
NumBytes = 119,
Payload = binary:copy(<<"a">>, NumBytes),
lists:foreach(
fun(N) ->
spawn_link(fun() ->
Expand All @@ -1468,6 +1471,13 @@ t_delete_and_re_create_with_same_name(_Config) ->
_Attempts0 = 20,
?assert(emqx_resource_metrics:queuing_get(?ID) > 0)
),
?retry(
_Sleep = 300,
_Attempts0 = 20,
%% `> NumBytes' because replayq reports total usage, not just payload, so
%% headers and metadata are included.
?assert(emqx_resource_metrics:queuing_bytes_get(?ID) > NumBytes)
),
?retry(
_Sleep = 300,
_Attempts0 = 20,
Expand Down Expand Up @@ -1501,8 +1511,10 @@ t_delete_and_re_create_with_same_name(_Config) ->

%% it shouldn't have anything enqueued, as it's a fresh resource
Queuing2 = emqx_resource_metrics:queuing_get(?ID),
Inflight2 = emqx_resource_metrics:queuing_get(?ID),
QueuingBytes2 = emqx_resource_metrics:queuing_bytes_get(?ID),
Inflight2 = emqx_resource_metrics:inflight_get(?ID),
?assertEqual(0, Queuing2),
?assertEqual(0, QueuingBytes2),
?assertEqual(0, Inflight2),

ok
Expand Down

0 comments on commit 88a8fb4

Please sign in to comment.