diff --git a/deps/rabbit/src/rabbit_quorum_queue.erl b/deps/rabbit/src/rabbit_quorum_queue.erl index 18cc10f55ef..6d4eb2cae82 100644 --- a/deps/rabbit/src/rabbit_quorum_queue.erl +++ b/deps/rabbit/src/rabbit_quorum_queue.erl @@ -74,6 +74,7 @@ -export([validate_policy/1, merge_policy_value/3]). -export([force_shrink_member_to_current_member/2, + force_vhost_queues_shrink_member_to_current_member/1, force_all_queues_shrink_member_to_current_member/0]). %% for backwards compatibility @@ -1376,6 +1377,7 @@ delete_member(Q, Node) when ?amqqueue_is_quorum(Q) -> _ = rabbit_amqqueue:update(QName, Fun), case ra:force_delete_server(?RA_SYSTEM, ServerId) of ok -> + rabbit_log:info("Deleted a replica of quorum ~ts on node ~ts", [rabbit_misc:rs(QName), Node]), ok; {error, {badrpc, nodedown}} -> ok; @@ -1951,12 +1953,14 @@ notify_decorators(QName, F, A) -> is_stateful() -> true. force_shrink_member_to_current_member(VHost, Name) -> - rabbit_log:warning("Disaster recovery procedure: shrinking ~p queue at vhost ~p to a single node cluster", [Name, VHost]), Node = node(), QName = rabbit_misc:r(VHost, queue, Name), + QNameFmt = rabbit_misc:rs(QName), + rabbit_log:warning("Shrinking ~ts to a single node: ~ts", [QNameFmt, Node]), case rabbit_amqqueue:lookup(QName) of {ok, Q} when ?is_amqqueue(Q) -> {RaName, _} = amqqueue:get_pid(Q), + OtherNodes = lists:delete(Node, get_nodes(Q)), ok = ra_server_proc:force_shrink_members_to_current_member({RaName, Node}), Fun = fun (Q0) -> TS0 = amqqueue:get_type_state(Q0), @@ -1964,28 +1968,40 @@ force_shrink_member_to_current_member(VHost, Name) -> amqqueue:set_type_state(Q, TS) end, _ = rabbit_amqqueue:update(QName, Fun), - rabbit_log:warning("Disaster recovery procedure: shrinking finished"); + _ = [ra:force_delete_server(?RA_SYSTEM, {RaName, N}) || N <- OtherNodes], + rabbit_log:warning("Shrinking ~ts finished", [QNameFmt]); _ -> - rabbit_log:warning("Disaster recovery procedure: shrinking failed, queue ~p not found at vhost ~p", [Name, VHost]), + rabbit_log:warning("Shrinking failed, ~ts not found", [QNameFmt]), {error, not_found} end. +force_vhost_queues_shrink_member_to_current_member(VHost) when is_binary(VHost) -> + rabbit_log:warning("Shrinking all quorum queues in vhost '~ts' to a single node: ~ts", [VHost, node()]), + ListQQs = fun() -> rabbit_amqqueue:list(VHost) end, + force_all_queues_shrink_member_to_current_member(ListQQs). + force_all_queues_shrink_member_to_current_member() -> - rabbit_log:warning("Disaster recovery procedure: shrinking all quorum queues to a single node cluster"), + rabbit_log:warning("Shrinking all quorum queues to a single node: ~ts", [node()]), + ListQQs = fun() -> rabbit_amqqueue:list() end, + force_all_queues_shrink_member_to_current_member(ListQQs). + +force_all_queues_shrink_member_to_current_member(ListQQFun) when is_function(ListQQFun) -> Node = node(), _ = [begin QName = amqqueue:get_name(Q), {RaName, _} = amqqueue:get_pid(Q), - rabbit_log:warning("Disaster recovery procedure: shrinking queue ~p", [QName]), + OtherNodes = lists:delete(Node, get_nodes(Q)), + rabbit_log:warning("Shrinking queue ~ts to a single node: ~ts", [rabbit_misc:rs(QName), Node]), ok = ra_server_proc:force_shrink_members_to_current_member({RaName, Node}), Fun = fun (QQ) -> TS0 = amqqueue:get_type_state(QQ), TS = TS0#{nodes => [Node]}, amqqueue:set_type_state(QQ, TS) end, - _ = rabbit_amqqueue:update(QName, Fun) - end || Q <- rabbit_amqqueue:list(), amqqueue:get_type(Q) == ?MODULE], - rabbit_log:warning("Disaster recovery procedure: shrinking finished"), + _ = rabbit_amqqueue:update(QName, Fun), + _ = [ra:force_delete_server(?RA_SYSTEM, {RaName, N}) || N <- OtherNodes] + end || Q <- ListQQFun(), amqqueue:get_type(Q) == ?MODULE], + rabbit_log:warning("Shrinking finished"), ok. is_minority(All, Up) -> diff --git a/deps/rabbit/test/quorum_queue_SUITE.erl b/deps/rabbit/test/quorum_queue_SUITE.erl index 06341e37b85..deaf095409d 100644 --- a/deps/rabbit/test/quorum_queue_SUITE.erl +++ b/deps/rabbit/test/quorum_queue_SUITE.erl @@ -92,7 +92,10 @@ groups() -> format, add_member_2, single_active_consumer_priority_take_over, - single_active_consumer_priority + single_active_consumer_priority, + force_shrink_member_to_current_member, + force_all_queues_shrink_member_to_current_member, + force_vhost_queues_shrink_member_to_current_member ] ++ all_tests()}, {cluster_size_5, [], [start_queue, @@ -1152,6 +1155,151 @@ single_active_consumer_priority(Config) -> rpc:call(Server0, ra, local_query, [RaNameQ3, QueryFun])), ok. +force_shrink_member_to_current_member(Config) -> + [Server0, Server1, Server2] = + rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server0), + QQ = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + + RaName = ra_name(QQ), + rabbit_ct_client_helpers:publish(Ch, QQ, 3), + wait_for_messages_ready([Server0], RaName, 3), + + {ok, Q0} = rpc:call(Server0, rabbit_amqqueue, lookup, [QQ, <<"/">>]), + #{nodes := Nodes0} = amqqueue:get_type_state(Q0), + ?assertEqual(3, length(Nodes0)), + + rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue, + force_shrink_member_to_current_member, [<<"/">>, QQ]), + + wait_for_messages_ready([Server0], RaName, 3), + + {ok, Q1} = rpc:call(Server0, rabbit_amqqueue, lookup, [QQ, <<"/">>]), + #{nodes := Nodes1} = amqqueue:get_type_state(Q1), + ?assertEqual(1, length(Nodes1)), + + %% grow queues back to all nodes + [rpc:call(Server0, rabbit_quorum_queue, grow, [S, <<"/">>, <<".*">>, all]) || S <- [Server1, Server2]], + + wait_for_messages_ready([Server0], RaName, 3), + {ok, Q2} = rpc:call(Server0, rabbit_amqqueue, lookup, [QQ, <<"/">>]), + #{nodes := Nodes2} = amqqueue:get_type_state(Q2), + ?assertEqual(3, length(Nodes2)). + +force_all_queues_shrink_member_to_current_member(Config) -> + [Server0, Server1, Server2] = + rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server0), + QQ = ?config(queue_name, Config), + AQ = ?config(alt_queue_name, Config), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + ?assertEqual({'queue.declare_ok', AQ, 0, 0}, + declare(Ch, AQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + + QQs = [QQ, AQ], + + [begin + RaName = ra_name(Q), + rabbit_ct_client_helpers:publish(Ch, Q, 3), + wait_for_messages_ready([Server0], RaName, 3), + {ok, Q0} = rpc:call(Server0, rabbit_amqqueue, lookup, [Q, <<"/">>]), + #{nodes := Nodes0} = amqqueue:get_type_state(Q0), + ?assertEqual(3, length(Nodes0)) + end || Q <- QQs], + + rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue, + force_all_queues_shrink_member_to_current_member, []), + + [begin + RaName = ra_name(Q), + wait_for_messages_ready([Server0], RaName, 3), + {ok, Q0} = rpc:call(Server0, rabbit_amqqueue, lookup, [Q, <<"/">>]), + #{nodes := Nodes0} = amqqueue:get_type_state(Q0), + ?assertEqual(1, length(Nodes0)) + end || Q <- QQs], + + %% grow queues back to all nodes + [rpc:call(Server0, rabbit_quorum_queue, grow, [S, <<"/">>, <<".*">>, all]) || S <- [Server1, Server2]], + + [begin + RaName = ra_name(Q), + wait_for_messages_ready([Server0], RaName, 3), + {ok, Q0} = rpc:call(Server0, rabbit_amqqueue, lookup, [Q, <<"/">>]), + #{nodes := Nodes0} = amqqueue:get_type_state(Q0), + ?assertEqual(3, length(Nodes0)) + end || Q <- QQs]. + +force_vhost_queues_shrink_member_to_current_member(Config) -> + [Server0, Server1, Server2] = + rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch0 = rabbit_ct_client_helpers:open_channel(Config, Server0), + QQ = ?config(queue_name, Config), + AQ = ?config(alt_queue_name, Config), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch0, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + ?assertEqual({'queue.declare_ok', AQ, 0, 0}, + declare(Ch0, AQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + + QQs = [QQ, AQ], + + VHost1 = <<"/">>, + VHost2 = <<"another-vhost">>, + VHosts = [VHost1, VHost2], + + User = ?config(rmq_username, Config), + ok = rabbit_ct_broker_helpers:add_vhost(Config, Server0, VHost2, User), + ok = rabbit_ct_broker_helpers:set_full_permissions(Config, User, VHost2), + Conn1 = rabbit_ct_client_helpers:open_unmanaged_connection(Config, Server0, VHost2), + {ok, Ch1} = amqp_connection:open_channel(Conn1), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch1, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + ?assertEqual({'queue.declare_ok', AQ, 0, 0}, + declare(Ch1, AQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + + [rabbit_ct_client_helpers:publish(Ch, Q, 3) || Q <- QQs, Ch <- [Ch0, Ch1]], + + [begin + QQRes = rabbit_misc:r(VHost, queue, Q), + {ok, RaName} = rpc:call(Server0, rabbit_queue_type_util, qname_to_internal_name, [QQRes]), + wait_for_messages_ready([Server0], RaName, 3), + {ok, Q0} = rpc:call(Server0, rabbit_amqqueue, lookup, [Q, VHost]), + #{nodes := Nodes0} = amqqueue:get_type_state(Q0), + ?assertEqual(3, length(Nodes0)) + end || Q <- QQs, VHost <- VHosts], + + rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue, + force_vhost_queues_shrink_member_to_current_member, [VHost2]), + + [begin + QQRes = rabbit_misc:r(VHost, queue, Q), + {ok, RaName} = rpc:call(Server0, rabbit_queue_type_util, qname_to_internal_name, [QQRes]), + wait_for_messages_ready([Server0], RaName, 3), + {ok, Q0} = rpc:call(Server0, rabbit_amqqueue, lookup, [Q, VHost]), + #{nodes := Nodes0} = amqqueue:get_type_state(Q0), + case VHost of + VHost1 -> ?assertEqual(3, length(Nodes0)); + VHost2 -> ?assertEqual(1, length(Nodes0)) + end + end || Q <- QQs, VHost <- VHosts], + + %% grow queues back to all nodes in VHost2 only + [rpc:call(Server0, rabbit_quorum_queue, grow, [S, VHost2, <<".*">>, all]) || S <- [Server1, Server2]], + + [begin + QQRes = rabbit_misc:r(VHost, queue, Q), + {ok, RaName} = rpc:call(Server0, rabbit_queue_type_util, qname_to_internal_name, [QQRes]), + wait_for_messages_ready([Server0], RaName, 3), + {ok, Q0} = rpc:call(Server0, rabbit_amqqueue, lookup, [Q, VHost]), + #{nodes := Nodes0} = amqqueue:get_type_state(Q0), + ?assertEqual(3, length(Nodes0)) + end || Q <- QQs, VHost <- VHosts]. + priority_queue_fifo(Config) -> %% testing: if hi priority messages are published before lo priority %% messages they are always consumed first (fifo)