From c9d97e61de062ca05b060b7ab327837e54e226d2 Mon Sep 17 00:00:00 2001 From: Ayanda Dube Date: Wed, 2 Oct 2024 12:28:18 +0100 Subject: [PATCH] Add test for QQ force_vhost_queues_shrink_member_to_current_member/1 (cherry picked from commit de0c0dbd89b7c278a1145833cbeda6b7d3de34eb) --- deps/rabbit/test/quorum_queue_SUITE.erl | 69 ++++++++++++++++++++++++- 1 file changed, 68 insertions(+), 1 deletion(-) diff --git a/deps/rabbit/test/quorum_queue_SUITE.erl b/deps/rabbit/test/quorum_queue_SUITE.erl index 775555eac2c..deaf095409d 100644 --- a/deps/rabbit/test/quorum_queue_SUITE.erl +++ b/deps/rabbit/test/quorum_queue_SUITE.erl @@ -94,7 +94,8 @@ groups() -> single_active_consumer_priority_take_over, single_active_consumer_priority, force_shrink_member_to_current_member, - force_all_queues_shrink_member_to_current_member + force_all_queues_shrink_member_to_current_member, + force_vhost_queues_shrink_member_to_current_member ] ++ all_tests()}, {cluster_size_5, [], [start_queue, @@ -1233,6 +1234,72 @@ force_all_queues_shrink_member_to_current_member(Config) -> ?assertEqual(3, length(Nodes0)) end || Q <- QQs]. +force_vhost_queues_shrink_member_to_current_member(Config) -> + [Server0, Server1, Server2] = + rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch0 = rabbit_ct_client_helpers:open_channel(Config, Server0), + QQ = ?config(queue_name, Config), + AQ = ?config(alt_queue_name, Config), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch0, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + ?assertEqual({'queue.declare_ok', AQ, 0, 0}, + declare(Ch0, AQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + + QQs = [QQ, AQ], + + VHost1 = <<"/">>, + VHost2 = <<"another-vhost">>, + VHosts = [VHost1, VHost2], + + User = ?config(rmq_username, Config), + ok = rabbit_ct_broker_helpers:add_vhost(Config, Server0, VHost2, User), + ok = rabbit_ct_broker_helpers:set_full_permissions(Config, User, VHost2), + Conn1 = rabbit_ct_client_helpers:open_unmanaged_connection(Config, Server0, VHost2), + {ok, Ch1} = amqp_connection:open_channel(Conn1), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch1, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + ?assertEqual({'queue.declare_ok', AQ, 0, 0}, + declare(Ch1, AQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + + [rabbit_ct_client_helpers:publish(Ch, Q, 3) || Q <- QQs, Ch <- [Ch0, Ch1]], + + [begin + QQRes = rabbit_misc:r(VHost, queue, Q), + {ok, RaName} = rpc:call(Server0, rabbit_queue_type_util, qname_to_internal_name, [QQRes]), + wait_for_messages_ready([Server0], RaName, 3), + {ok, Q0} = rpc:call(Server0, rabbit_amqqueue, lookup, [Q, VHost]), + #{nodes := Nodes0} = amqqueue:get_type_state(Q0), + ?assertEqual(3, length(Nodes0)) + end || Q <- QQs, VHost <- VHosts], + + rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue, + force_vhost_queues_shrink_member_to_current_member, [VHost2]), + + [begin + QQRes = rabbit_misc:r(VHost, queue, Q), + {ok, RaName} = rpc:call(Server0, rabbit_queue_type_util, qname_to_internal_name, [QQRes]), + wait_for_messages_ready([Server0], RaName, 3), + {ok, Q0} = rpc:call(Server0, rabbit_amqqueue, lookup, [Q, VHost]), + #{nodes := Nodes0} = amqqueue:get_type_state(Q0), + case VHost of + VHost1 -> ?assertEqual(3, length(Nodes0)); + VHost2 -> ?assertEqual(1, length(Nodes0)) + end + end || Q <- QQs, VHost <- VHosts], + + %% grow queues back to all nodes in VHost2 only + [rpc:call(Server0, rabbit_quorum_queue, grow, [S, VHost2, <<".*">>, all]) || S <- [Server1, Server2]], + + [begin + QQRes = rabbit_misc:r(VHost, queue, Q), + {ok, RaName} = rpc:call(Server0, rabbit_queue_type_util, qname_to_internal_name, [QQRes]), + wait_for_messages_ready([Server0], RaName, 3), + {ok, Q0} = rpc:call(Server0, rabbit_amqqueue, lookup, [Q, VHost]), + #{nodes := Nodes0} = amqqueue:get_type_state(Q0), + ?assertEqual(3, length(Nodes0)) + end || Q <- QQs, VHost <- VHosts]. + priority_queue_fifo(Config) -> %% testing: if hi priority messages are published before lo priority %% messages they are always consumed first (fifo)