diff --git a/deps/rabbit/app.bzl b/deps/rabbit/app.bzl index 0f880190e007..b22fbf7b4fb8 100644 --- a/deps/rabbit/app.bzl +++ b/deps/rabbit/app.bzl @@ -1144,7 +1144,7 @@ def test_suite_beam_files(name = "test_suite_beam_files"): outs = ["test/policy_SUITE.beam"], app_name = "rabbit", erlc_opts = "//:test_erlc_opts", - deps = ["//deps/amqp_client:erlang_app"], + deps = ["//deps/amqp_client:erlang_app", "//deps/rabbitmq_ct_helpers:erlang_app"], ) erlang_bytecode( name = "priority_queue_SUITE_beam_files", diff --git a/deps/rabbit/priv/schema/rabbit.schema b/deps/rabbit/priv/schema/rabbit.schema index 6839fbc24c27..4fc42f8f3db8 100644 --- a/deps/rabbit/priv/schema/rabbit.schema +++ b/deps/rabbit/priv/schema/rabbit.schema @@ -795,6 +795,8 @@ end}. _ -> V end, {["default_policies","operator",ID|T], NewV}; + ({["default_policies","operator",ID, "queue_pattern"], V}) -> + {["default_policies","operator",ID,"queue_pattern"], list_to_binary(V)}; (E) -> E end), case Props of diff --git a/deps/rabbit/src/rabbit_classic_queue.erl b/deps/rabbit/src/rabbit_classic_queue.erl index 41ab36106abd..a4506982eba1 100644 --- a/deps/rabbit/src/rabbit_classic_queue.erl +++ b/deps/rabbit/src/rabbit_classic_queue.erl @@ -484,7 +484,7 @@ capabilities() -> <<"max-age">>, <<"stream-max-segment-size-bytes">>, <<"queue-leader-locator">>, <<"initial-cluster-size">>, %% Quorum policies - <<"delivery-limit">>, <<"dead-letter-strategy">>], + <<"delivery-limit">>, <<"dead-letter-strategy">>, <<"max-in-memory-length">>, <<"max-in-memory-bytes">>, <<"target-group-size">>], queue_arguments => [<<"x-expires">>, <<"x-message-ttl">>, <<"x-dead-letter-exchange">>, <<"x-dead-letter-routing-key">>, <<"x-max-length">>, <<"x-max-length-bytes">>, <<"x-max-priority">>, diff --git a/deps/rabbit/src/rabbit_stream_queue.erl b/deps/rabbit/src/rabbit_stream_queue.erl index 0204a1428a69..aca9b545482b 100644 --- a/deps/rabbit/src/rabbit_stream_queue.erl +++ b/deps/rabbit/src/rabbit_stream_queue.erl @@ -1043,7 +1043,7 @@ capabilities() -> <<"ha-promote-on-shutdown">>, <<"ha-promote-on-failure">>, <<"queue-master-locator">>, %% Quorum policies - <<"dead-letter-strategy">>], + <<"dead-letter-strategy">>, <<"target-group-size">>], queue_arguments => [<<"x-max-length-bytes">>, <<"x-queue-type">>, <<"x-max-age">>, <<"x-stream-max-segment-size-bytes">>, <<"x-initial-cluster-size">>, <<"x-queue-leader-locator">>], diff --git a/deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets b/deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets index 22599502d305..45778ba119ea 100644 --- a/deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets +++ b/deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets @@ -158,7 +158,7 @@ ssl_options.fail_if_no_peer_cert = true", {<<"ha_mode">>, <<"exactly">>}, {<<"ha_params">>, 2}, {<<"ha_sync_mode">>, <<"automatic">>}, - {<<"queue_pattern">>, "apple"}, + {<<"queue_pattern">>, <<"apple">>}, {<<"vhost_pattern">>, "banana"}]}]}]}]}], []}, diff --git a/deps/rabbit/test/policy_SUITE.erl b/deps/rabbit/test/policy_SUITE.erl index 822b925c58cd..905e9a477ddd 100644 --- a/deps/rabbit/test/policy_SUITE.erl +++ b/deps/rabbit/test/policy_SUITE.erl @@ -10,6 +10,7 @@ -include_lib("common_test/include/ct.hrl"). -include_lib("amqp_client/include/amqp_client.hrl"). -include_lib("stdlib/include/assert.hrl"). +-include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl"). -compile(export_all). @@ -26,7 +27,16 @@ groups() -> operator_policy_ttl, operator_retroactive_policy_ttl, operator_retroactive_policy_publish_ttl, - queue_type_specific_policies + queue_type_specific_policies, + is_supported_operator_policy_expires, + is_supported_operator_policy_message_ttl, + is_supported_operator_policy_max_length, + is_supported_operator_policy_max_length, + is_supported_operator_policy_max_in_memory_length, + is_supported_operator_policy_max_in_memory_bytes, + is_supported_operator_policy_delivery_limit, + is_supported_operator_policy_target_group_size, + is_supported_operator_policy_ha ]} ]. @@ -246,18 +256,124 @@ queue_type_specific_policies(Config) -> rabbit_ct_client_helpers:close_connection(Conn), passed. +%% See supported policies in https://www.rabbitmq.com/parameters.html#operator-policies +%% This test applies all supported operator policies to all queue types, +%% and later verifies the effective policy definitions. +%% Just those supported by each queue type should be present. + +is_supported_operator_policy_expires(Config) -> + Value = 6000000, + effective_operator_policy_per_queue_type( + Config, <<"expires">>, Value, Value, Value, undefined). + +is_supported_operator_policy_message_ttl(Config) -> + Value = 1000, + effective_operator_policy_per_queue_type( + Config, <<"message-ttl">>, Value, Value, Value, undefined). + +is_supported_operator_policy_max_length(Config) -> + Value = 500, + effective_operator_policy_per_queue_type( + Config, <<"max-length">>, Value, Value, Value, undefined). + +is_supported_operator_policy_max_length_bytes(Config) -> + Value = 1500, + effective_operator_policy_per_queue_type( + Config, <<"max-length-bytes">>, Value, Value, Value, Value). + +is_supported_operator_policy_max_in_memory_length(Config) -> + Value = 30, + effective_operator_policy_per_queue_type( + Config, <<"max-in-memory-length">>, Value, undefined, Value, undefined). + +is_supported_operator_policy_max_in_memory_bytes(Config) -> + Value = 50000, + effective_operator_policy_per_queue_type( + Config, <<"max-in-memory-bytes">>, Value, undefined, Value, undefined). + +is_supported_operator_policy_delivery_limit(Config) -> + Value = 3, + effective_operator_policy_per_queue_type( + Config, <<"delivery-limit">>, Value, undefined, Value, undefined). + +is_supported_operator_policy_target_group_size(Config) -> + Value = 5, + effective_operator_policy_per_queue_type( + Config, <<"target-group-size">>, Value, undefined, Value, undefined). + +is_supported_operator_policy_ha(Config) -> + [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + ClassicQ = <<"classic_queue">>, + QuorumQ = <<"quorum_queue">>, + StreamQ = <<"stream_queue">>, + + declare(Ch, ClassicQ, [{<<"x-queue-type">>, longstr, <<"classic">>}]), + declare(Ch, QuorumQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}]), + declare(Ch, StreamQ, [{<<"x-queue-type">>, longstr, <<"stream">>}]), + + rabbit_ct_broker_helpers:set_operator_policy( + Config, 0, <<"operator-policy">>, <<".*">>, <<"all">>, + [{<<"ha-mode">>, <<"exactly">>}, + {<<"ha-params">>, 2}, + {<<"ha-sync-mode">>, <<"automatic">>}]), + + ?awaitMatch(<<"exactly">>, check_policy_value(Server, ClassicQ, <<"ha-mode">>), 30_000), + ?awaitMatch(2, check_policy_value(Server, ClassicQ, <<"ha-params">>), 30_000), + ?awaitMatch(<<"automatic">>, check_policy_value(Server, ClassicQ, <<"ha-sync-mode">>), 30_000), + ?awaitMatch(undefined, check_policy_value(Server, QuorumQ, <<"ha-mode">>), 30_000), + ?awaitMatch(undefined, check_policy_value(Server, StreamQ, <<"ha-mode">>), 30_000), + + rabbit_ct_broker_helpers:clear_operator_policy(Config, 0, <<"operator-policy">>), + + delete(Ch, ClassicQ), + delete(Ch, QuorumQ), + delete(Ch, StreamQ), + + rabbit_ct_client_helpers:close_channel(Ch), + rabbit_ct_client_helpers:close_connection(Conn), + passed. + +effective_operator_policy_per_queue_type(Config, Name, Value, ClassicValue, QuorumValue, StreamValue) -> + [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + ClassicQ = <<"classic_queue">>, + QuorumQ = <<"quorum_queue">>, + StreamQ = <<"stream_queue">>, + + declare(Ch, ClassicQ, [{<<"x-queue-type">>, longstr, <<"classic">>}]), + declare(Ch, QuorumQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}]), + declare(Ch, StreamQ, [{<<"x-queue-type">>, longstr, <<"stream">>}]), + + rabbit_ct_broker_helpers:set_operator_policy( + Config, 0, <<"operator-policy">>, <<".*">>, <<"all">>, + [{Name, Value}]), + + ?awaitMatch(ClassicValue, check_policy_value(Server, ClassicQ, Name), 30_000), + ?awaitMatch(QuorumValue, check_policy_value(Server, QuorumQ, Name), 30_000), + ?awaitMatch(StreamValue, check_policy_value(Server, StreamQ, Name), 30_000), + + rabbit_ct_broker_helpers:clear_operator_policy(Config, 0, <<"operator-policy">>), + + delete(Ch, ClassicQ), + delete(Ch, QuorumQ), + delete(Ch, StreamQ), + + rabbit_ct_client_helpers:close_channel(Ch), + rabbit_ct_client_helpers:close_connection(Conn), + passed. %%---------------------------------------------------------------------------- declare(Ch, Q) -> amqp_channel:call(Ch, #'queue.declare'{queue = Q, - durable = true}). + durable = true}). declare(Ch, Q, Args) -> amqp_channel:call(Ch, #'queue.declare'{queue = Q, - durable = true, - arguments = Args}). + durable = true, + arguments = Args}). delete(Ch, Q) -> amqp_channel:call(Ch, #'queue.delete'{queue = Q}). @@ -305,8 +421,13 @@ get_messages(Number, Ch, Q) -> end. check_policy_value(Server, QName, Value) -> + ct:pal("QUEUES ~p", + [rpc:call(Server, rabbit_amqqueue, list, [])]), {ok, Q} = rpc:call(Server, rabbit_amqqueue, lookup, [rabbit_misc:r(<<"/">>, queue, QName)]), - proplists:get_value(Value, rpc:call(Server, rabbit_policy, effective_definition, [Q])). + case rpc:call(Server, rabbit_policy, effective_definition, [Q]) of + List when is_list(List) -> proplists:get_value(Value, List); + Any -> Any + end. verify_policies(Policy, OperPolicy, VerifyFuns, #{config := Config, server := Server, diff --git a/deps/rabbitmq_management/priv/www/js/tmpl/policies.ejs b/deps/rabbitmq_management/priv/www/js/tmpl/policies.ejs index 0593e43427d4..7b0e7728659b 100644 --- a/deps/rabbitmq_management/priv/www/js/tmpl/policies.ejs +++ b/deps/rabbitmq_management/priv/www/js/tmpl/policies.ejs @@ -278,30 +278,37 @@
- + - + - +