diff --git a/deps/rabbit/BUILD.bazel b/deps/rabbit/BUILD.bazel index 0df1d4356b17..79be15ea3e42 100644 --- a/deps/rabbit/BUILD.bazel +++ b/deps/rabbit/BUILD.bazel @@ -478,6 +478,13 @@ rabbitmq_integration_suite( ], ) +rabbitmq_integration_suite( + name = "msg_size_metrics_SUITE", + runtime_deps = [ + "//deps/rabbitmq_amqp_client:erlang_app", + ], +) + rabbitmq_integration_suite( name = "list_consumers_sanity_check_SUITE", size = "medium", @@ -993,6 +1000,11 @@ rabbitmq_integration_suite( size = "medium", ) +rabbitmq_suite( + name = "unit_msg_size_metrics_SUITE", + size = "small", +) + rabbitmq_suite( name = "unit_operator_policy_SUITE", size = "small", diff --git a/deps/rabbit/app.bzl b/deps/rabbit/app.bzl index 2f282740260d..ed504806794a 100644 --- a/deps/rabbit/app.bzl +++ b/deps/rabbit/app.bzl @@ -169,6 +169,7 @@ def all_beam_files(name = "all_beam_files"): "src/rabbit_metrics.erl", "src/rabbit_mirror_queue_misc.erl", "src/rabbit_mnesia.erl", + "src/rabbit_msg_size_metrics.erl", "src/rabbit_msg_store.erl", "src/rabbit_msg_store_gc.erl", "src/rabbit_networking.erl", @@ -425,6 +426,7 @@ def all_test_beam_files(name = "all_test_beam_files"): "src/rabbit_metrics.erl", "src/rabbit_mirror_queue_misc.erl", "src/rabbit_mnesia.erl", + "src/rabbit_msg_size_metrics.erl", "src/rabbit_msg_store.erl", "src/rabbit_msg_store_gc.erl", "src/rabbit_networking.erl", @@ -703,6 +705,7 @@ def all_srcs(name = "all_srcs"): "src/rabbit_metrics.erl", "src/rabbit_mirror_queue_misc.erl", "src/rabbit_mnesia.erl", + "src/rabbit_msg_size_metrics.erl", "src/rabbit_msg_store.erl", "src/rabbit_msg_store_gc.erl", "src/rabbit_networking.erl", @@ -1714,6 +1717,14 @@ def test_suite_beam_files(name = "test_suite_beam_files"): erlc_opts = "//:test_erlc_opts", deps = ["//deps/amqp_client:erlang_app", "//deps/rabbitmq_ct_helpers:erlang_app"], ) + erlang_bytecode( + name = "unit_msg_size_metrics_SUITE_beam_files", + testonly = True, + srcs = ["test/unit_msg_size_metrics_SUITE.erl"], + outs = ["test/unit_msg_size_metrics_SUITE.beam"], + app_name = "rabbit", + erlc_opts = "//:test_erlc_opts", + ) erlang_bytecode( name = "unit_operator_policy_SUITE_beam_files", testonly = True, @@ -2175,3 +2186,23 @@ def test_suite_beam_files(name = "test_suite_beam_files"): erlc_opts = "//:test_erlc_opts", deps = ["//deps/amqp_client:erlang_app"], ) +<<<<<<< HEAD +======= + erlang_bytecode( + name = "test_rabbit_ct_hook_beam", + testonly = True, + srcs = ["test/rabbit_ct_hook.erl"], + outs = ["test/rabbit_ct_hook.beam"], + app_name = "rabbit", + erlc_opts = "//:test_erlc_opts", + ) + erlang_bytecode( + name = "msg_size_metrics_SUITE_beam_files", + testonly = True, + srcs = ["test/msg_size_metrics_SUITE.erl"], + outs = ["test/msg_size_metrics_SUITE.beam"], + app_name = "rabbit", + erlc_opts = "//:test_erlc_opts", + deps = ["//deps/amqp_client:erlang_app"], + ) +>>>>>>> 1e3f4e5db9 (Emit histogram metric for received message sizes per protocol (#12342)) diff --git a/deps/rabbit/src/rabbit_amqp_session.erl b/deps/rabbit/src/rabbit_amqp_session.erl index a631927340f9..2885dd2b79fc 100644 --- a/deps/rabbit/src/rabbit_amqp_session.erl +++ b/deps/rabbit/src/rabbit_amqp_session.erl @@ -2336,7 +2336,9 @@ incoming_link_transfer( {MsgBin0, FirstDeliveryId, FirstSettled} end, validate_transfer_rcv_settle_mode(RcvSettleMode, Settled), - validate_message_size(PayloadBin, MaxMessageSize), + PayloadSize = iolist_size(PayloadBin), + validate_message_size(PayloadSize, MaxMessageSize), + rabbit_msg_size_metrics:observe(?PROTOCOL, PayloadSize), Mc0 = mc:init(mc_amqp, PayloadBin, #{}), case lookup_target(LinkExchange, LinkRKey, Mc0, Vhost, User, PermCache0) of @@ -3066,9 +3068,8 @@ validate_transfer_rcv_settle_mode(_, _) -> validate_message_size(_, unlimited) -> ok; -validate_message_size(Message, MaxMsgSize) - when is_integer(MaxMsgSize) -> - MsgSize = iolist_size(Message), +validate_message_size(MsgSize, MaxMsgSize) + when is_integer(MsgSize) -> case MsgSize =< MaxMsgSize of true -> ok; @@ -3082,7 +3083,9 @@ validate_message_size(Message, MaxMsgSize) ?V_1_0_LINK_ERROR_MESSAGE_SIZE_EXCEEDED, "message size (~b bytes) > maximum message size (~b bytes)", [MsgSize, MaxMsgSize]) - end. + end; +validate_message_size(Msg, MaxMsgSize) -> + validate_message_size(iolist_size(Msg), MaxMsgSize). -spec ensure_terminus(source | target, term(), diff --git a/deps/rabbit/src/rabbit_channel.erl b/deps/rabbit/src/rabbit_channel.erl index 908892781574..4be86370c390 100644 --- a/deps/rabbit/src/rabbit_channel.erl +++ b/deps/rabbit/src/rabbit_channel.erl @@ -985,7 +985,7 @@ check_msg_size(Content, GCThreshold) -> Size = rabbit_basic:maybe_gc_large_msg(Content, GCThreshold), case Size =< MaxMessageSize of true -> - ok; + rabbit_msg_size_metrics:observe(amqp091, Size); false -> Fmt = case MaxMessageSize of ?MAX_MSG_SIZE -> diff --git a/deps/rabbit/src/rabbit_global_counters.erl b/deps/rabbit/src/rabbit_global_counters.erl index b5cdc5b627e1..7b480c91d6cf 100644 --- a/deps/rabbit/src/rabbit_global_counters.erl +++ b/deps/rabbit/src/rabbit_global_counters.erl @@ -13,7 +13,6 @@ boot_step/0, init/1, init/2, - overview/0, prometheus_format/0, increase_protocol_counter/3, messages_received/2, @@ -38,6 +37,10 @@ messages_dead_lettered_confirmed/3 ]). +-ifdef(TEST). +-export([overview/0]). +-endif. + %% PROTOCOL COUNTERS: -define(MESSAGES_RECEIVED, 1). -define(MESSAGES_RECEIVED_CONFIRM, 2). @@ -132,12 +135,14 @@ boot_step() -> [begin %% Protocol counters - init([{protocol, Proto}]), + Protocol = {protocol, Proto}, + init([Protocol]), + rabbit_msg_size_metrics:init(Proto), %% Protocol & Queue Type counters - init([{protocol, Proto}, {queue_type, rabbit_classic_queue}]), - init([{protocol, Proto}, {queue_type, rabbit_quorum_queue}]), - init([{protocol, Proto}, {queue_type, rabbit_stream_queue}]) + init([Protocol, {queue_type, rabbit_classic_queue}]), + init([Protocol, {queue_type, rabbit_quorum_queue}]), + init([Protocol, {queue_type, rabbit_stream_queue}]) end || Proto <- [amqp091, amqp10]], %% Dead Letter counters @@ -192,8 +197,10 @@ init(Labels = [{queue_type, QueueType}, {dead_letter_strategy, DLS}], DeadLetter Counters = seshat:new(?MODULE, Labels, DeadLetterCounters), persistent_term:put({?MODULE, QueueType, DLS}, Counters). +-ifdef(TEST). overview() -> seshat:overview(?MODULE). +-endif. prometheus_format() -> seshat:format(?MODULE). @@ -247,13 +254,13 @@ publisher_created(Protocol) -> counters:add(fetch(Protocol), ?PUBLISHERS, 1). publisher_deleted(Protocol) -> - counters:add(fetch(Protocol), ?PUBLISHERS, -1). + counters:sub(fetch(Protocol), ?PUBLISHERS, 1). consumer_created(Protocol) -> counters:add(fetch(Protocol), ?CONSUMERS, 1). consumer_deleted(Protocol) -> - counters:add(fetch(Protocol), ?CONSUMERS, -1). + counters:sub(fetch(Protocol), ?CONSUMERS, 1). messages_dead_lettered(Reason, QueueType, DeadLetterStrategy, Num) -> Index = case Reason of diff --git a/deps/rabbit/src/rabbit_msg_size_metrics.erl b/deps/rabbit/src/rabbit_msg_size_metrics.erl new file mode 100644 index 000000000000..1faaa311a515 --- /dev/null +++ b/deps/rabbit/src/rabbit_msg_size_metrics.erl @@ -0,0 +1,143 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. +%% + +%% This module tracks received message size distribution as histogram. +%% (A histogram is represented by a set of counters, one for each bucket.) +-module(rabbit_msg_size_metrics). + +-export([init/1, + observe/2, + prometheus_format/0]). + +%% Integration tests. +-export([raw_buckets/1, + diff_raw_buckets/2]). + +-ifdef(TEST). +-export([cleanup/1]). +-endif. + +-define(BUCKET_1, 100). +-define(BUCKET_2, 1_000). +-define(BUCKET_3, 10_000). +-define(BUCKET_4, 100_000). +-define(BUCKET_5, 1_000_000). +-define(BUCKET_6, 10_000_000). +%% rabbit.max_message_size up to RabbitMQ 3.13 was 128 MiB. +%% rabbit.max_message_size since RabbitMQ 4.0 is 16 MiB. +%% To help finding an appropriate rabbit.max_message_size we also add a bucket for 50 MB. +-define(BUCKET_7, 50_000_000). +-define(BUCKET_8, 100_000_000). +%% 'infinity' means practically 512 MiB as hard limited in +%% https://github.com/rabbitmq/rabbitmq-server/blob/v4.0.2/deps/rabbit_common/include/rabbit.hrl#L254-L257 +-define(BUCKET_9, 'infinity'). + +-define(MSG_SIZE_BUCKETS, + [{1, ?BUCKET_1}, + {2, ?BUCKET_2}, + {3, ?BUCKET_3}, + {4, ?BUCKET_4}, + {5, ?BUCKET_5}, + {6, ?BUCKET_6}, + {7, ?BUCKET_7}, + {8, ?BUCKET_8}, + {9, ?BUCKET_9}]). + +-define(POS_MSG_SIZE_SUM, 10). + +-type raw_buckets() :: [{BucketUpperBound :: non_neg_integer(), + NumObservations :: non_neg_integer()}]. + +-spec init(atom()) -> ok. +init(Protocol) -> + Size = ?POS_MSG_SIZE_SUM, + Counters = counters:new(Size, [write_concurrency]), + put_counters(Protocol, Counters). + +-spec observe(atom(), non_neg_integer()) -> ok. +observe(Protocol, MessageSize) -> + BucketPos = find_bucket_pos(MessageSize), + Counters = get_counters(Protocol), + counters:add(Counters, BucketPos, 1), + counters:add(Counters, ?POS_MSG_SIZE_SUM, MessageSize). + +-spec prometheus_format() -> #{atom() => map()}. +prometheus_format() -> + Values = [prometheus_values(Counters) || Counters <- get_labels_counters()], + #{message_size_bytes => #{type => histogram, + help => "Size of messages received from publishers", + values => Values}}. + +find_bucket_pos(Size) when Size =< ?BUCKET_1 -> 1; +find_bucket_pos(Size) when Size =< ?BUCKET_2 -> 2; +find_bucket_pos(Size) when Size =< ?BUCKET_3 -> 3; +find_bucket_pos(Size) when Size =< ?BUCKET_4 -> 4; +find_bucket_pos(Size) when Size =< ?BUCKET_5 -> 5; +find_bucket_pos(Size) when Size =< ?BUCKET_6 -> 6; +find_bucket_pos(Size) when Size =< ?BUCKET_7 -> 7; +find_bucket_pos(Size) when Size =< ?BUCKET_8 -> 8; +find_bucket_pos(_Size) -> 9. + +raw_buckets(Protocol) + when is_atom(Protocol) -> + Counters = get_counters(Protocol), + raw_buckets(Counters); +raw_buckets(Counters) -> + [{UpperBound, counters:get(Counters, Pos)} + || {Pos, UpperBound} <- ?MSG_SIZE_BUCKETS]. + +-spec diff_raw_buckets(raw_buckets(), raw_buckets()) -> raw_buckets(). +diff_raw_buckets(After, Before) -> + diff_raw_buckets(After, Before, []). + +diff_raw_buckets([], [], Acc) -> + lists:reverse(Acc); +diff_raw_buckets([{UpperBound, CounterAfter} | After], + [{UpperBound, CounterBefore} | Before], + Acc) -> + case CounterAfter - CounterBefore of + 0 -> + diff_raw_buckets(After, Before, Acc); + Diff -> + diff_raw_buckets(After, Before, [{UpperBound, Diff} | Acc]) + end. + +%% "If you have looked at a /metrics for a histogram, you probably noticed that the buckets +%% aren’t just a count of events that fall into them. The buckets also include a count of +%% events in all the smaller buckets, all the way up to the +Inf, bucket which is the total +%% number of events. This is known as a cumulative histogram, and why the bucket label +%% is called le, standing for less than or equal to. +%% This is in addition to buckets being counters, so Prometheus histograms are cumula‐ +%% tive in two different ways." +%% [Prometheus: Up & Running] +prometheus_values({Labels, Counters}) -> + {Buckets, Count} = lists:mapfoldl( + fun({UpperBound, NumObservations}, Acc0) -> + Acc = Acc0 + NumObservations, + {{UpperBound, Acc}, Acc} + end, 0, raw_buckets(Counters)), + Sum = counters:get(Counters, ?POS_MSG_SIZE_SUM), + {Labels, Buckets, Count, Sum}. + +put_counters(Protocol, Counters) -> + persistent_term:put({?MODULE, Protocol}, Counters). + +get_counters(Protocol) -> + persistent_term:get({?MODULE, Protocol}). + +get_labels_counters() -> + [{[{protocol, Protocol}], Counters} + || {{?MODULE, Protocol}, Counters} <- persistent_term:get()]. + +-ifdef(TEST). +%% "Counters are not tied to the current process and are automatically +%% garbage collected when they are no longer referenced." +-spec cleanup(atom()) -> ok. +cleanup(Protocol) -> + persistent_term:erase({?MODULE, Protocol}), + ok. +-endif. diff --git a/deps/rabbit/test/msg_size_metrics_SUITE.erl b/deps/rabbit/test/msg_size_metrics_SUITE.erl new file mode 100644 index 000000000000..0b33ecf1a36b --- /dev/null +++ b/deps/rabbit/test/msg_size_metrics_SUITE.erl @@ -0,0 +1,154 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. +%% + +-module(msg_size_metrics_SUITE). + +-compile([export_all, nowarn_export_all]). +-include_lib("common_test/include/ct.hrl"). +-include_lib("eunit/include/eunit.hrl"). +-include_lib("amqp_client/include/amqp_client.hrl"). + +-import(rabbit_ct_broker_helpers, + [rpc/4]). + +all() -> + [ + {group, tests} + ]. + +groups() -> + [ + {tests, [shuffle], + [message_size, + over_max_message_size]} + ]. + +%% ------------------------------------------------------------------- +%% Testsuite setup/teardown. +%% ------------------------------------------------------------------- + +init_per_suite(Config) -> + {ok, _} = application:ensure_all_started(amqp10_client), + rabbit_ct_helpers:log_environment(), + rabbit_ct_helpers:run_setup_steps(Config). + +end_per_suite(Config) -> + rabbit_ct_helpers:run_teardown_steps(Config). + +init_per_group(_Group, Config) -> + rabbit_ct_helpers:run_steps( + Config, + rabbit_ct_broker_helpers:setup_steps() ++ + rabbit_ct_client_helpers:setup_steps()). + +end_per_group(_Group, Config) -> + rabbit_ct_helpers:run_steps( + Config, + rabbit_ct_client_helpers:teardown_steps() ++ + rabbit_ct_broker_helpers:teardown_steps()). + +init_per_testcase(Testcase, Config) -> + rabbit_ct_helpers:testcase_started(Config, Testcase). + +end_per_testcase(Testcase, Config) -> + rabbit_ct_helpers:testcase_finished(Config, Testcase). + +%% ------------------------------------------------------------------- +%% Test cases +%% ------------------------------------------------------------------- + +message_size(Config) -> + AmqplBefore = get_msg_size_metrics(amqp091, Config), + AmqpBefore = get_msg_size_metrics(amqp10, Config), + + Binary2B = <<"12">>, + Binary200K = binary:copy(<<"x">>, 200_000), + Payloads = [Binary2B, Binary200K, Binary2B], + + {AmqplConn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config), + [amqp_channel:call(Ch, + #'basic.publish'{routing_key = <<"nowhere">>}, + #amqp_msg{payload = Payload}) + || Payload <- Payloads], + + OpnConf = connection_config(Config), + {ok, Connection} = amqp10_client:open_connection(OpnConf), + {ok, Session} = amqp10_client:begin_session_sync(Connection), + Address = rabbitmq_amqp_address:exchange(<<"amq.fanout">>), + {ok, Sender} = amqp10_client:attach_sender_link_sync(Session, <<"sender">>, Address), + receive {amqp10_event, {link, Sender, credited}} -> ok + after 5000 -> ct:fail(credited_timeout) + end, + + ok = amqp10_client:send_msg(Sender, amqp10_msg:new(<<"tag1">>, Binary2B)), + ok = amqp10_client:send_msg(Sender, amqp10_msg:new(<<"tag2">>, Binary200K)), + ok = amqp10_client:send_msg(Sender, amqp10_msg:new(<<"tag3">>, Binary2B)), + + ok = wait_for_settlement(released, <<"tag1">>), + ok = wait_for_settlement(released, <<"tag2">>), + ok = wait_for_settlement(released, <<"tag3">>), + + AmqplAfter = get_msg_size_metrics(amqp091, Config), + AmqpAfter = get_msg_size_metrics(amqp10, Config), + + ExpectedDiff = [{100, 2}, + {1_000_000, 1}], + ?assertEqual(ExpectedDiff, + rabbit_msg_size_metrics:diff_raw_buckets(AmqplAfter, AmqplBefore)), + ?assertEqual(ExpectedDiff, + rabbit_msg_size_metrics:diff_raw_buckets(AmqpAfter, AmqpBefore)), + + ok = amqp10_client:close_connection(Connection), + ok = rabbit_ct_client_helpers:close_connection_and_channel(AmqplConn, Ch). + +over_max_message_size(Config) -> + DefaultMaxMessageSize = rpc(Config, persistent_term, get, [max_message_size]), + %% Limit the server to only accept messages up to 2KB. + MaxMessageSize = 2_000, + ok = rpc(Config, persistent_term, put, [max_message_size, MaxMessageSize]), + + Before = get_msg_size_metrics(amqp091, Config), + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + MonitorRef = erlang:monitor(process, Ch), + MessageTooLarge = binary:copy(<<"x">>, MaxMessageSize + 1), + amqp_channel:call(Ch, + #'basic.publish'{routing_key = <<"none">>}, + #amqp_msg{payload = MessageTooLarge}), + receive {'DOWN', MonitorRef, process, Ch, Info} -> + ?assertEqual({shutdown, + {server_initiated_close, + 406, + <<"PRECONDITION_FAILED - message size 2001 is larger than configured max size 2000">>}}, + Info) + after 2000 -> ct:fail(expected_channel_closed) + end, + + After = get_msg_size_metrics(amqp091, Config), + %% No metrics should be increased if client sent message that is too large. + ?assertEqual(Before, After), + + ok = rabbit_ct_client_helpers:close_connection(Conn), + ok = rpc(Config, persistent_term, put, [max_message_size, DefaultMaxMessageSize]). + +get_msg_size_metrics(Protocol, Config) -> + rpc(Config, rabbit_msg_size_metrics, raw_buckets, [Protocol]). + +connection_config(Config) -> + Host = ?config(rmq_hostname, Config), + Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp), + #{address => Host, + port => Port, + container_id => <<"my container">>, + sasl => anon}. + +wait_for_settlement(State, Tag) -> + receive + {amqp10_disposition, {State, Tag}} -> + ok + after 5000 -> + ct:fail({disposition_timeout, Tag}) + end. diff --git a/deps/rabbit/test/unit_msg_size_metrics_SUITE.erl b/deps/rabbit/test/unit_msg_size_metrics_SUITE.erl new file mode 100644 index 000000000000..cd496932cd92 --- /dev/null +++ b/deps/rabbit/test/unit_msg_size_metrics_SUITE.erl @@ -0,0 +1,64 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. +%% + +-module(unit_msg_size_metrics_SUITE). + +-include_lib("stdlib/include/assert.hrl"). + +-compile([nowarn_export_all, export_all]). + +all() -> + [ + {group, tests} + ]. + +groups() -> + [ + {tests, [], + [ + prometheus_format + ]} + ]. + +%% ------------------------------------------------------------------- +%% Testsuite setup/teardown. +%% ------------------------------------------------------------------- + +init_per_suite(Config) -> + ok = rabbit_msg_size_metrics:init(fake_protocol), + Config. + +end_per_suite(Config) -> + ok = rabbit_msg_size_metrics:cleanup(fake_protocol), + Config. + +%% ------------------------------------------------------------------- +%% Testcases. +%% ------------------------------------------------------------------- + +prometheus_format(_Config) -> + MsgSizes = [1, 100, 1_000_000_000, 99_000_000, 15_000, 15_000], + [ok = rabbit_msg_size_metrics:observe(fake_protocol, MsgSize) || MsgSize <- MsgSizes], + + ?assertEqual( + #{message_size_bytes => + #{type => histogram, + help => "Size of messages received from publishers", + values => [{ + [{protocol, fake_protocol}], + [{100, 2}, + {1_000, 2}, + {10_000, 2}, + {100_000, 4}, + {1_000_000, 4}, + {10_000_000, 4}, + {50_000_000, 4}, + {100_000_000, 5}, + {infinity, 6}], + length(MsgSizes), + lists:sum(MsgSizes)}]}}, + rabbit_msg_size_metrics:prometheus_format()). diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt.erl index c5ea59abedea..54eb7ed603df 100644 --- a/deps/rabbitmq_mqtt/src/rabbit_mqtt.erl +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt.erl @@ -87,7 +87,8 @@ init_global_counters(ProtoVer) -> rabbit_global_counters:init([Proto]), rabbit_global_counters:init([Proto, {queue_type, rabbit_classic_queue}]), rabbit_global_counters:init([Proto, {queue_type, rabbit_quorum_queue}]), - rabbit_global_counters:init([Proto, {queue_type, ?QUEUE_TYPE_QOS_0}]). + rabbit_global_counters:init([Proto, {queue_type, ?QUEUE_TYPE_QOS_0}]), + rabbit_msg_size_metrics:init(ProtoVer). persist_static_configuration() -> rabbit_mqtt_util:init_sparkplug(), diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl index 15a65ff5f986..939d82b0d9e8 100644 --- a/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl @@ -391,6 +391,7 @@ process_request(?PUBLISH, {ok, Topic, Props, State1} -> EffectiveQos = maybe_downgrade_qos(Qos), rabbit_global_counters:messages_received(ProtoVer, 1), + rabbit_msg_size_metrics:observe(ProtoVer, iolist_size(Payload)), State = maybe_increment_publisher(State1), Msg = #mqtt_msg{retain = Retain, qos = EffectiveQos, diff --git a/deps/rabbitmq_mqtt/test/shared_SUITE.erl b/deps/rabbitmq_mqtt/test/shared_SUITE.erl index e265243d9c99..16afac557d82 100644 --- a/deps/rabbitmq_mqtt/test/shared_SUITE.erl +++ b/deps/rabbitmq_mqtt/test/shared_SUITE.erl @@ -88,6 +88,7 @@ cluster_size_1_tests_v3() -> cluster_size_1_tests() -> [ global_counters %% must be the 1st test case + ,message_size_metrics ,block_only_publisher ,many_qos1_messages ,session_expiry @@ -691,6 +692,34 @@ global_counters(Config) -> messages_unroutable_returned_total => 1}, get_global_counters(Config, ProtoVer))). +message_size_metrics(Config) -> + Protocol = case ?config(mqtt_version, Config) of + v4 -> mqtt311; + v5 -> mqtt50 + end, + BucketsBefore = rpc(Config, rabbit_msg_size_metrics, raw_buckets, [Protocol]), + + Topic = ClientId = atom_to_binary(?FUNCTION_NAME), + C = connect(ClientId, Config), + {ok, _, [0]} = emqtt:subscribe(C, Topic, qos0), + Payload1B = <<255>>, + Payload500B = binary:copy(Payload1B, 500), + Payload5KB = binary:copy(Payload1B, 5_000), + Payload2MB = binary:copy(Payload1B, 2_000_000), + Payloads = [Payload2MB, Payload5KB, Payload500B, Payload1B, Payload500B], + [ok = emqtt:publish(C, Topic, P, qos0) || P <- Payloads], + ok = expect_publishes(C, Topic, Payloads), + + BucketsAfter = rpc(Config, rabbit_msg_size_metrics, raw_buckets, [Protocol]), + ?assertEqual( + [{100, 1}, + {1000, 2}, + {10_000, 1}, + {10_000_000, 1}], + rabbit_msg_size_metrics:diff_raw_buckets(BucketsAfter, BucketsBefore)), + + ok = emqtt:disconnect(C). + pubsub(Config) -> Topic0 = <<"t/0">>, Topic1 = <<"t/1">>, diff --git a/deps/rabbitmq_prometheus/app.bzl b/deps/rabbitmq_prometheus/app.bzl index a77dcbb9bb09..3084d1ced302 100644 --- a/deps/rabbitmq_prometheus/app.bzl +++ b/deps/rabbitmq_prometheus/app.bzl @@ -14,6 +14,7 @@ def all_beam_files(name = "all_beam_files"): "src/collectors/prometheus_rabbitmq_core_metrics_collector.erl", "src/collectors/prometheus_rabbitmq_dynamic_collector.erl", "src/collectors/prometheus_rabbitmq_global_metrics_collector.erl", + "src/collectors/prometheus_rabbitmq_message_size_metrics_collector.erl", "src/rabbit_prometheus_app.erl", "src/rabbit_prometheus_dispatcher.erl", "src/rabbit_prometheus_handler.erl", @@ -44,6 +45,7 @@ def all_test_beam_files(name = "all_test_beam_files"): "src/collectors/prometheus_rabbitmq_core_metrics_collector.erl", "src/collectors/prometheus_rabbitmq_dynamic_collector.erl", "src/collectors/prometheus_rabbitmq_global_metrics_collector.erl", + "src/collectors/prometheus_rabbitmq_message_size_metrics_collector.erl", "src/rabbit_prometheus_app.erl", "src/rabbit_prometheus_dispatcher.erl", "src/rabbit_prometheus_handler.erl", @@ -85,6 +87,7 @@ def all_srcs(name = "all_srcs"): "src/collectors/prometheus_rabbitmq_core_metrics_collector.erl", "src/collectors/prometheus_rabbitmq_dynamic_collector.erl", "src/collectors/prometheus_rabbitmq_global_metrics_collector.erl", + "src/collectors/prometheus_rabbitmq_message_size_metrics_collector.erl", "src/rabbit_prometheus_app.erl", "src/rabbit_prometheus_dispatcher.erl", "src/rabbit_prometheus_handler.erl", diff --git a/deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_core_metrics_collector.erl b/deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_core_metrics_collector.erl index 0e4ed2c1294c..ac2a64383989 100644 --- a/deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_core_metrics_collector.erl +++ b/deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_core_metrics_collector.erl @@ -489,6 +489,14 @@ label({RemoteAddress, Username, Protocol}) when is_binary(RemoteAddress), is_bin V =/= <<>> end, [{remote_address, RemoteAddress}, {username, Username}, {protocol, atom_to_binary(Protocol, utf8)}]); +label({ + #resource{kind=queue, virtual_host=VHost, name=QName}, + #resource{kind=exchange, name=ExName} + }) -> + %% queue_exchange_metrics {queue_id, exchange_id} + <<"vhost=\"", (escape_label_value(VHost))/binary, "\",", + "exchange=\"", (escape_label_value(ExName))/binary, "\",", + "queue=\"", (escape_label_value(QName))/binary, "\"">>; label({I1, I2}) -> case {label(I1), label(I2)} of {<<>>, L} -> L; diff --git a/deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_global_metrics_collector.erl b/deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_global_metrics_collector.erl index af2073737724..0e7b027b8503 100644 --- a/deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_global_metrics_collector.erl +++ b/deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_global_metrics_collector.erl @@ -29,22 +29,16 @@ register() -> ok = prometheus_registry:register_collector(?MODULE). -deregister_cleanup(_) -> ok. +deregister_cleanup(_) -> + ok. collect_mf(_Registry, Callback) -> - _ = maps:fold( - fun (Name, #{type := Type, help := Help, values := Values}, Acc) -> - Callback( - create_mf(?METRIC_NAME(Name), - Help, - Type, - maps:to_list(Values))), - Acc - end, - ok, - rabbit_global_counters:prometheus_format() - ). - -%% =================================================================== -%% Private functions -%% =================================================================== + maps:foreach( + fun(Name, #{type := Type, help := Help, values := Values}) -> + Callback( + create_mf(?METRIC_NAME(Name), + Help, + Type, + maps:to_list(Values))) + end, + rabbit_global_counters:prometheus_format()). diff --git a/deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_message_size_metrics_collector.erl b/deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_message_size_metrics_collector.erl new file mode 100644 index 000000000000..54a349547744 --- /dev/null +++ b/deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_message_size_metrics_collector.erl @@ -0,0 +1,33 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. +%% +-module(prometheus_rabbitmq_message_size_metrics_collector). + +-behaviour(prometheus_collector). +-include_lib("prometheus/include/prometheus.hrl"). + +-export([register/0, + deregister_cleanup/1, + collect_mf/2]). + +-define(METRIC_NAME_PREFIX, "rabbitmq_"). + +register() -> + ok = prometheus_registry:register_collector(?MODULE). + +deregister_cleanup(_) -> + ok. + +collect_mf(_Registry, Callback) -> + maps:foreach( + fun(Name, #{type := Type, + help := Help, + values := Values}) -> + MetricsFamily = prometheus_model_helpers:create_mf( + ?METRIC_NAME(Name), Help, Type, Values), + Callback(MetricsFamily) + end, + rabbit_msg_size_metrics:prometheus_format()). diff --git a/deps/rabbitmq_prometheus/src/rabbit_prometheus_dispatcher.erl b/deps/rabbitmq_prometheus/src/rabbit_prometheus_dispatcher.erl index 850494e00666..2b07be760098 100644 --- a/deps/rabbitmq_prometheus/src/rabbit_prometheus_dispatcher.erl +++ b/deps/rabbitmq_prometheus/src/rabbit_prometheus_dispatcher.erl @@ -16,6 +16,7 @@ build_dispatcher() -> prometheus_registry:register_collectors([ prometheus_rabbitmq_core_metrics_collector, prometheus_rabbitmq_global_metrics_collector, + prometheus_rabbitmq_message_size_metrics_collector, prometheus_rabbitmq_alarm_metrics_collector, prometheus_rabbitmq_dynamic_collector, prometheus_process_collector]), @@ -27,7 +28,8 @@ build_dispatcher() -> prometheus_vm_statistics_collector, prometheus_vm_msacc_collector, prometheus_rabbitmq_core_metrics_collector, - prometheus_rabbitmq_global_metrics_collector + prometheus_rabbitmq_global_metrics_collector, + prometheus_rabbitmq_message_size_metrics_collector ]), prometheus_registry:register_collectors('detailed', [ prometheus_rabbitmq_core_metrics_collector diff --git a/deps/rabbitmq_prometheus/test/rabbit_prometheus_http_SUITE.erl b/deps/rabbitmq_prometheus/test/rabbit_prometheus_http_SUITE.erl index 1a9c514391be..a0c64ebc6c5d 100644 --- a/deps/rabbitmq_prometheus/test/rabbit_prometheus_http_SUITE.erl +++ b/deps/rabbitmq_prometheus/test/rabbit_prometheus_http_SUITE.erl @@ -38,13 +38,15 @@ groups() -> aggregated_metrics_test, specific_erlang_metrics_present_test, global_metrics_present_test, - global_metrics_single_metric_family_test + global_metrics_single_metric_family_test, + message_size_metrics_present ]}, {per_object_metrics, [], [ globally_configure_per_object_metrics_test, specific_erlang_metrics_present_test, global_metrics_present_test, - global_metrics_single_metric_family_test + global_metrics_single_metric_family_test, + message_size_metrics_present ]}, {per_object_endpoint_metrics, [], [ endpoint_per_object_metrics, @@ -490,6 +492,35 @@ global_metrics_present_test(Config) -> ?assertEqual(match, re:run(Body, "^rabbitmq_global_publishers{", [{capture, none}, multiline])), ?assertEqual(match, re:run(Body, "^rabbitmq_global_consumers{", [{capture, none}, multiline])). +message_size_metrics_present(Config) -> + {_Headers, Body} = http_get_with_pal(Config, [], 200), + + ?assertEqual(match, re:run(Body, "^rabbitmq_message_size_bytes_bucket{protocol=\"amqp091\",le=\"100\"}", [{capture, none}, multiline])), + ?assertEqual(match, re:run(Body, "^rabbitmq_message_size_bytes_bucket{protocol=\"amqp091\",le=\"1000\"}", [{capture, none}, multiline])), + ?assertEqual(match, re:run(Body, "^rabbitmq_message_size_bytes_bucket{protocol=\"amqp091\",le=\"10000\"}", [{capture, none}, multiline])), + ?assertEqual(match, re:run(Body, "^rabbitmq_message_size_bytes_bucket{protocol=\"amqp091\",le=\"100000\"}", [{capture, none}, multiline])), + ?assertEqual(match, re:run(Body, "^rabbitmq_message_size_bytes_bucket{protocol=\"amqp091\",le=\"1000000\"}", [{capture, none}, multiline])), + ?assertEqual(match, re:run(Body, "^rabbitmq_message_size_bytes_bucket{protocol=\"amqp091\",le=\"10000000\"}", [{capture, none}, multiline])), + ?assertEqual(match, re:run(Body, "^rabbitmq_message_size_bytes_bucket{protocol=\"amqp091\",le=\"10000000\"}", [{capture, none}, multiline])), + ?assertEqual(match, re:run(Body, "^rabbitmq_message_size_bytes_bucket{protocol=\"amqp091\",le=\"50000000\"}", [{capture, none}, multiline])), + ?assertEqual(match, re:run(Body, "^rabbitmq_message_size_bytes_bucket{protocol=\"amqp091\",le=\"100000000\"}", [{capture, none}, multiline])), + ?assertEqual(match, re:run(Body, "^rabbitmq_message_size_bytes_bucket{protocol=\"amqp091\",le=\"\\+Inf\"}", [{capture, none}, multiline])), + ?assertEqual(match, re:run(Body, "^rabbitmq_message_size_bytes_count{protocol=\"amqp091\"}", [{capture, none}, multiline])), + ?assertEqual(match, re:run(Body, "^rabbitmq_message_size_bytes_sum{protocol=\"amqp091\"}", [{capture, none}, multiline])), + + ?assertEqual(match, re:run(Body, "^rabbitmq_message_size_bytes_bucket{protocol=\"amqp10\",le=\"100\"}", [{capture, none}, multiline])), + ?assertEqual(match, re:run(Body, "^rabbitmq_message_size_bytes_bucket{protocol=\"amqp10\",le=\"1000\"}", [{capture, none}, multiline])), + ?assertEqual(match, re:run(Body, "^rabbitmq_message_size_bytes_bucket{protocol=\"amqp10\",le=\"10000\"}", [{capture, none}, multiline])), + ?assertEqual(match, re:run(Body, "^rabbitmq_message_size_bytes_bucket{protocol=\"amqp10\",le=\"100000\"}", [{capture, none}, multiline])), + ?assertEqual(match, re:run(Body, "^rabbitmq_message_size_bytes_bucket{protocol=\"amqp10\",le=\"1000000\"}", [{capture, none}, multiline])), + ?assertEqual(match, re:run(Body, "^rabbitmq_message_size_bytes_bucket{protocol=\"amqp10\",le=\"10000000\"}", [{capture, none}, multiline])), + ?assertEqual(match, re:run(Body, "^rabbitmq_message_size_bytes_bucket{protocol=\"amqp10\",le=\"10000000\"}", [{capture, none}, multiline])), + ?assertEqual(match, re:run(Body, "^rabbitmq_message_size_bytes_bucket{protocol=\"amqp10\",le=\"50000000\"}", [{capture, none}, multiline])), + ?assertEqual(match, re:run(Body, "^rabbitmq_message_size_bytes_bucket{protocol=\"amqp10\",le=\"100000000\"}", [{capture, none}, multiline])), + ?assertEqual(match, re:run(Body, "^rabbitmq_message_size_bytes_bucket{protocol=\"amqp10\",le=\"\\+Inf\"}", [{capture, none}, multiline])), + ?assertEqual(match, re:run(Body, "^rabbitmq_message_size_bytes_count{protocol=\"amqp10\"}", [{capture, none}, multiline])), + ?assertEqual(match, re:run(Body, "^rabbitmq_message_size_bytes_sum{protocol=\"amqp10\"}", [{capture, none}, multiline])). + global_metrics_single_metric_family_test(Config) -> {_Headers, Body} = http_get_with_pal(Config, [], 200), {match, MetricFamilyMatches} = re:run(Body, "TYPE rabbitmq_global_messages_acknowledged_total", [global]), diff --git a/moduleindex.yaml b/moduleindex.yaml index 02f800fcd252..ebadcd41d644 100755 --- a/moduleindex.yaml +++ b/moduleindex.yaml @@ -670,6 +670,7 @@ rabbit: - rabbit_metrics - rabbit_mirror_queue_misc - rabbit_mnesia +- rabbit_msg_size_metrics - rabbit_msg_store - rabbit_msg_store_gc - rabbit_networking @@ -1097,6 +1098,7 @@ rabbitmq_prometheus: - prometheus_rabbitmq_core_metrics_collector - prometheus_rabbitmq_dynamic_collector - prometheus_rabbitmq_global_metrics_collector +- prometheus_rabbitmq_message_size_metrics_collector - rabbit_prometheus_app - rabbit_prometheus_dispatcher - rabbit_prometheus_handler