Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prevent duplicate vhost label on queue exchange metrics (backport #12373) #12376

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions deps/rabbit/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -478,6 +478,13 @@ rabbitmq_integration_suite(
],
)

rabbitmq_integration_suite(
name = "msg_size_metrics_SUITE",
runtime_deps = [
"//deps/rabbitmq_amqp_client:erlang_app",
],
)

rabbitmq_integration_suite(
name = "list_consumers_sanity_check_SUITE",
size = "medium",
Expand Down Expand Up @@ -993,6 +1000,11 @@ rabbitmq_integration_suite(
size = "medium",
)

rabbitmq_suite(
name = "unit_msg_size_metrics_SUITE",
size = "small",
)

rabbitmq_suite(
name = "unit_operator_policy_SUITE",
size = "small",
Expand Down
31 changes: 31 additions & 0 deletions deps/rabbit/app.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ def all_beam_files(name = "all_beam_files"):
"src/rabbit_metrics.erl",
"src/rabbit_mirror_queue_misc.erl",
"src/rabbit_mnesia.erl",
"src/rabbit_msg_size_metrics.erl",
"src/rabbit_msg_store.erl",
"src/rabbit_msg_store_gc.erl",
"src/rabbit_networking.erl",
Expand Down Expand Up @@ -425,6 +426,7 @@ def all_test_beam_files(name = "all_test_beam_files"):
"src/rabbit_metrics.erl",
"src/rabbit_mirror_queue_misc.erl",
"src/rabbit_mnesia.erl",
"src/rabbit_msg_size_metrics.erl",
"src/rabbit_msg_store.erl",
"src/rabbit_msg_store_gc.erl",
"src/rabbit_networking.erl",
Expand Down Expand Up @@ -703,6 +705,7 @@ def all_srcs(name = "all_srcs"):
"src/rabbit_metrics.erl",
"src/rabbit_mirror_queue_misc.erl",
"src/rabbit_mnesia.erl",
"src/rabbit_msg_size_metrics.erl",
"src/rabbit_msg_store.erl",
"src/rabbit_msg_store_gc.erl",
"src/rabbit_networking.erl",
Expand Down Expand Up @@ -1714,6 +1717,14 @@ def test_suite_beam_files(name = "test_suite_beam_files"):
erlc_opts = "//:test_erlc_opts",
deps = ["//deps/amqp_client:erlang_app", "//deps/rabbitmq_ct_helpers:erlang_app"],
)
erlang_bytecode(
name = "unit_msg_size_metrics_SUITE_beam_files",
testonly = True,
srcs = ["test/unit_msg_size_metrics_SUITE.erl"],
outs = ["test/unit_msg_size_metrics_SUITE.beam"],
app_name = "rabbit",
erlc_opts = "//:test_erlc_opts",
)
erlang_bytecode(
name = "unit_operator_policy_SUITE_beam_files",
testonly = True,
Expand Down Expand Up @@ -2175,3 +2186,23 @@ def test_suite_beam_files(name = "test_suite_beam_files"):
erlc_opts = "//:test_erlc_opts",
deps = ["//deps/amqp_client:erlang_app"],
)
# Compiles the msg_size_metrics_SUITE integration test suite.
# (Resolved cherry-pick conflict: the HEAD side was empty, so only the target
# introduced by the message size metrics change is kept.)
erlang_bytecode(
    name = "msg_size_metrics_SUITE_beam_files",
    testonly = True,
    srcs = ["test/msg_size_metrics_SUITE.erl"],
    outs = ["test/msg_size_metrics_SUITE.beam"],
    app_name = "rabbit",
    erlc_opts = "//:test_erlc_opts",
    deps = ["//deps/amqp_client:erlang_app"],
)
13 changes: 8 additions & 5 deletions deps/rabbit/src/rabbit_amqp_session.erl
Original file line number Diff line number Diff line change
Expand Up @@ -2336,7 +2336,9 @@ incoming_link_transfer(
{MsgBin0, FirstDeliveryId, FirstSettled}
end,
validate_transfer_rcv_settle_mode(RcvSettleMode, Settled),
validate_message_size(PayloadBin, MaxMessageSize),
PayloadSize = iolist_size(PayloadBin),
validate_message_size(PayloadSize, MaxMessageSize),
rabbit_msg_size_metrics:observe(?PROTOCOL, PayloadSize),

Mc0 = mc:init(mc_amqp, PayloadBin, #{}),
case lookup_target(LinkExchange, LinkRKey, Mc0, Vhost, User, PermCache0) of
Expand Down Expand Up @@ -3066,9 +3068,8 @@ validate_transfer_rcv_settle_mode(_, _) ->

validate_message_size(_, unlimited) ->
ok;
validate_message_size(Message, MaxMsgSize)
when is_integer(MaxMsgSize) ->
MsgSize = iolist_size(Message),
validate_message_size(MsgSize, MaxMsgSize)
when is_integer(MsgSize) ->
case MsgSize =< MaxMsgSize of
true ->
ok;
Expand All @@ -3082,7 +3083,9 @@ validate_message_size(Message, MaxMsgSize)
?V_1_0_LINK_ERROR_MESSAGE_SIZE_EXCEEDED,
"message size (~b bytes) > maximum message size (~b bytes)",
[MsgSize, MaxMsgSize])
end.
end;
validate_message_size(Msg, MaxMsgSize) ->
validate_message_size(iolist_size(Msg), MaxMsgSize).

-spec ensure_terminus(source | target,
term(),
Expand Down
2 changes: 1 addition & 1 deletion deps/rabbit/src/rabbit_channel.erl
Original file line number Diff line number Diff line change
Expand Up @@ -985,7 +985,7 @@ check_msg_size(Content, GCThreshold) ->
Size = rabbit_basic:maybe_gc_large_msg(Content, GCThreshold),
case Size =< MaxMessageSize of
true ->
ok;
rabbit_msg_size_metrics:observe(amqp091, Size);
false ->
Fmt = case MaxMessageSize of
?MAX_MSG_SIZE ->
Expand Down
21 changes: 14 additions & 7 deletions deps/rabbit/src/rabbit_global_counters.erl
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
boot_step/0,
init/1,
init/2,
overview/0,
prometheus_format/0,
increase_protocol_counter/3,
messages_received/2,
Expand All @@ -38,6 +37,10 @@
messages_dead_lettered_confirmed/3
]).

-ifdef(TEST).
-export([overview/0]).
-endif.

%% PROTOCOL COUNTERS:
-define(MESSAGES_RECEIVED, 1).
-define(MESSAGES_RECEIVED_CONFIRM, 2).
Expand Down Expand Up @@ -132,12 +135,14 @@
boot_step() ->
[begin
%% Protocol counters
init([{protocol, Proto}]),
Protocol = {protocol, Proto},
init([Protocol]),
rabbit_msg_size_metrics:init(Proto),

%% Protocol & Queue Type counters
init([{protocol, Proto}, {queue_type, rabbit_classic_queue}]),
init([{protocol, Proto}, {queue_type, rabbit_quorum_queue}]),
init([{protocol, Proto}, {queue_type, rabbit_stream_queue}])
init([Protocol, {queue_type, rabbit_classic_queue}]),
init([Protocol, {queue_type, rabbit_quorum_queue}]),
init([Protocol, {queue_type, rabbit_stream_queue}])
end || Proto <- [amqp091, amqp10]],

%% Dead Letter counters
Expand Down Expand Up @@ -192,8 +197,10 @@ init(Labels = [{queue_type, QueueType}, {dead_letter_strategy, DLS}], DeadLetter
Counters = seshat:new(?MODULE, Labels, DeadLetterCounters),
persistent_term:put({?MODULE, QueueType, DLS}, Counters).

-ifdef(TEST).
overview() ->
seshat:overview(?MODULE).
-endif.

prometheus_format() ->
seshat:format(?MODULE).
Expand Down Expand Up @@ -247,13 +254,13 @@ publisher_created(Protocol) ->
counters:add(fetch(Protocol), ?PUBLISHERS, 1).

publisher_deleted(Protocol) ->
counters:add(fetch(Protocol), ?PUBLISHERS, -1).
counters:sub(fetch(Protocol), ?PUBLISHERS, 1).

consumer_created(Protocol) ->
counters:add(fetch(Protocol), ?CONSUMERS, 1).

consumer_deleted(Protocol) ->
counters:add(fetch(Protocol), ?CONSUMERS, -1).
counters:sub(fetch(Protocol), ?CONSUMERS, 1).

messages_dead_lettered(Reason, QueueType, DeadLetterStrategy, Num) ->
Index = case Reason of
Expand Down
143 changes: 143 additions & 0 deletions deps/rabbit/src/rabbit_msg_size_metrics.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
%%

%% This module tracks the size distribution of received messages as a
%% histogram, one histogram per protocol.
%% (A histogram is represented by a set of counters, one for each bucket,
%% plus one extra counter accumulating the sum of all observed sizes.)
-module(rabbit_msg_size_metrics).

-export([init/1,
         observe/2,
         prometheus_format/0]).

%% Integration tests.
-export([raw_buckets/1,
         diff_raw_buckets/2]).

-ifdef(TEST).
-export([cleanup/1]).
-endif.

%% Inclusive upper bounds (in bytes) of the histogram buckets.
-define(BUCKET_1, 100).
-define(BUCKET_2, 1_000).
-define(BUCKET_3, 10_000).
-define(BUCKET_4, 100_000).
-define(BUCKET_5, 1_000_000).
-define(BUCKET_6, 10_000_000).
%% rabbit.max_message_size up to RabbitMQ 3.13 was 128 MiB.
%% rabbit.max_message_size since RabbitMQ 4.0 is 16 MiB.
%% To help find an appropriate rabbit.max_message_size we also add a bucket for 50 MB.
-define(BUCKET_7, 50_000_000).
-define(BUCKET_8, 100_000_000).
%% 'infinity' means practically 512 MiB, which is the hard limit defined in
%% https://github.com/rabbitmq/rabbitmq-server/blob/v4.0.2/deps/rabbit_common/include/rabbit.hrl#L254-L257
-define(BUCKET_9, 'infinity').

%% Maps each counter position to the upper bound of the bucket it counts.
-define(MSG_SIZE_BUCKETS,
        [{1, ?BUCKET_1},
         {2, ?BUCKET_2},
         {3, ?BUCKET_3},
         {4, ?BUCKET_4},
         {5, ?BUCKET_5},
         {6, ?BUCKET_6},
         {7, ?BUCKET_7},
         {8, ?BUCKET_8},
         {9, ?BUCKET_9}]).

%% Position of the counter holding the sum of all observed message sizes.
%% It is the last position and therefore also the size of the counters array.
-define(POS_MSG_SIZE_SUM, 10).

%% The last bucket's upper bound is the atom 'infinity' (see ?BUCKET_9),
%% so the bound is not always an integer.
-type raw_buckets() :: [{BucketUpperBound :: non_neg_integer() | infinity,
                         NumObservations :: non_neg_integer()}].

%% Creates the counters array for Protocol and stores it in persistent_term,
%% replacing any array previously stored for the same protocol.
-spec init(atom()) -> ok.
init(Protocol) ->
    Size = ?POS_MSG_SIZE_SUM,
    Counters = counters:new(Size, [write_concurrency]),
    put_counters(Protocol, Counters).

%% Records one message of MessageSize bytes for Protocol: increments the
%% matching bucket counter and adds the size to the sum counter.
%% Fails with badarg if init/1 was not called for Protocol beforehand.
-spec observe(atom(), non_neg_integer()) -> ok.
observe(Protocol, MessageSize) ->
    BucketPos = find_bucket_pos(MessageSize),
    Counters = get_counters(Protocol),
    counters:add(Counters, BucketPos, 1),
    counters:add(Counters, ?POS_MSG_SIZE_SUM, MessageSize).

%% Returns the histograms of all initialised protocols as a single metric
%% map; each element of 'values' is {Labels, CumulativeBuckets, Count, Sum}.
-spec prometheus_format() -> #{atom() => map()}.
prometheus_format() ->
    Values = [prometheus_values(Counters) || Counters <- get_labels_counters()],
    #{message_size_bytes => #{type => histogram,
                              help => "Size of messages received from publishers",
                              values => Values}}.

%% Returns the position of the first bucket whose upper bound is >= Size.
find_bucket_pos(Size) when Size =< ?BUCKET_1 -> 1;
find_bucket_pos(Size) when Size =< ?BUCKET_2 -> 2;
find_bucket_pos(Size) when Size =< ?BUCKET_3 -> 3;
find_bucket_pos(Size) when Size =< ?BUCKET_4 -> 4;
find_bucket_pos(Size) when Size =< ?BUCKET_5 -> 5;
find_bucket_pos(Size) when Size =< ?BUCKET_6 -> 6;
find_bucket_pos(Size) when Size =< ?BUCKET_7 -> 7;
find_bucket_pos(Size) when Size =< ?BUCKET_8 -> 8;
find_bucket_pos(_Size) -> 9.

%% Returns the non-cumulative number of observations per bucket, ordered by
%% ascending upper bound. Accepts either a protocol name or a counters
%% reference.
-spec raw_buckets(atom() | counters:counters_ref()) -> raw_buckets().
raw_buckets(Protocol)
  when is_atom(Protocol) ->
    Counters = get_counters(Protocol),
    raw_buckets(Counters);
raw_buckets(Counters) ->
    [{UpperBound, counters:get(Counters, Pos)}
     || {Pos, UpperBound} <- ?MSG_SIZE_BUCKETS].

%% Returns, for every bucket whose count differs between the two snapshots,
%% the difference After - Before. Unchanged buckets are omitted.
%% Both snapshots must list the same upper bounds in the same order;
%% anything else crashes with function_clause.
-spec diff_raw_buckets(raw_buckets(), raw_buckets()) -> raw_buckets().
diff_raw_buckets(After, Before) ->
    Diffs = lists:zipwith(
              %% UpperBound appears in both patterns on purpose: it asserts
              %% that the two snapshots describe the same bucket.
              fun({UpperBound, CounterAfter}, {UpperBound, CounterBefore}) ->
                      {UpperBound, CounterAfter - CounterBefore}
              end, After, Before),
    [Bucket || {_UpperBound, Diff} = Bucket <- Diffs, Diff =/= 0].

%% "If you have looked at a /metrics for a histogram, you probably noticed that the buckets
%% aren't just a count of events that fall into them. The buckets also include a count of
%% events in all the smaller buckets, all the way up to the +Inf bucket, which is the total
%% number of events. This is known as a cumulative histogram, and why the bucket label
%% is called le, standing for less than or equal to.
%% This is in addition to buckets being counters, so Prometheus histograms are cumulative
%% in two different ways."
%% [Prometheus: Up & Running]
prometheus_values({Labels, Counters}) ->
    %% Turn the per-bucket counts into running totals; the final running
    %% total is the overall number of observations.
    {Buckets, Count} = lists:mapfoldl(
                         fun({UpperBound, NumObservations}, Acc0) ->
                                 Acc = Acc0 + NumObservations,
                                 {{UpperBound, Acc}, Acc}
                         end, 0, raw_buckets(Counters)),
    Sum = counters:get(Counters, ?POS_MSG_SIZE_SUM),
    {Labels, Buckets, Count, Sum}.

%% Stores the counters reference of Protocol under the key {?MODULE, Protocol}.
put_counters(Protocol, Counters) ->
    persistent_term:put({?MODULE, Protocol}, Counters).

%% Fetches the counters reference of Protocol; fails with badarg if absent.
get_counters(Protocol) ->
    persistent_term:get({?MODULE, Protocol}).

%% Collects {Labels, Counters} for every protocol initialised via init/1 by
%% scanning all persistent terms for keys of the form {?MODULE, Protocol}.
get_labels_counters() ->
    [{[{protocol, Protocol}], Counters}
     || {{?MODULE, Protocol}, Counters} <- persistent_term:get()].

-ifdef(TEST).
%% "Counters are not tied to the current process and are automatically
%% garbage collected when they are no longer referenced."
%% Hence erasing the persistent term is all the cleanup that is needed.
-spec cleanup(atom()) -> ok.
cleanup(Protocol) ->
    persistent_term:erase({?MODULE, Protocol}),
    ok.
-endif.
Loading
Loading