Skip to content

Commit

Permalink
Merge pull request #9896 from rabbitmq/opt-mgmt-queue-listings
Browse files Browse the repository at this point in the history
Fix streams minority calculation
  • Loading branch information
michaelklishin authored Nov 8, 2023
2 parents 4a4285a + 8d2c0a6 commit 5deacfc
Show file tree
Hide file tree
Showing 4 changed files with 127 additions and 15 deletions.
2 changes: 1 addition & 1 deletion deps/rabbit/src/rabbit_quorum_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1676,7 +1676,7 @@ online(Q) when ?is_amqqueue(Q) ->

format(Q, Ctx) when ?is_amqqueue(Q) ->
%% TODO: this should really just be voters
Nodes = get_nodes(Q),
Nodes = lists:sort(get_nodes(Q)),
Running = case Ctx of
#{running_nodes := Running0} ->
Running0;
Expand Down
4 changes: 2 additions & 2 deletions deps/rabbit/src/rabbit_stream_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ format(Q, Ctx) ->
case amqqueue:get_pid(Q) of
Pid when is_pid(Pid) ->
LeaderNode = node(Pid),
Nodes = get_nodes(Q),
Nodes = lists:sort(get_nodes(Q)),
Running = case Ctx of
#{running_nodes := Running0} ->
Running0;
Expand Down Expand Up @@ -1203,4 +1203,4 @@ get_nodes(Q) when ?is_amqqueue(Q) ->

is_minority(All, Up) ->
MinQuorum = length(All) div 2 + 1,
length(Up) =< MinQuorum.
length(Up) < MinQuorum.
74 changes: 65 additions & 9 deletions deps/rabbit/test/quorum_queue_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@
-include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl").

-import(queue_utils, [wait_for_messages_ready/3,
wait_for_messages_pending_ack/3,
wait_for_messages_total/3,
wait_for_messages/2,
dirty_query/3,
ra_name/1]).
wait_for_messages_pending_ack/3,
wait_for_messages_total/3,
wait_for_messages/2,
dirty_query/3,
ra_name/1]).

-import(clustering_utils, [
assert_cluster_status/2,
Expand All @@ -39,9 +39,10 @@ all() ->

groups() ->
[
{single_node, [], all_tests()
++ memory_tests()
++ [node_removal_is_quorum_critical]},
{single_node, [], all_tests() ++
memory_tests() ++
[node_removal_is_quorum_critical,
format]},
{unclustered, [], [
{uncluster_size_2, [], [add_member]}
]},
Expand Down Expand Up @@ -85,7 +86,8 @@ groups() ->
leader_locator_balanced_maintenance,
leader_locator_balanced_random_maintenance,
leader_locator_policy,
status
status,
format
]
++ all_tests()},
{cluster_size_5, [], [start_queue,
Expand Down Expand Up @@ -2758,6 +2760,60 @@ status(Config) ->
wait_for_messages(Config, [[QQ, <<"2">>, <<"2">>, <<"0">>]]),
ok.

format(Config) ->
%% tests rabbit_quorum_queue:format/2
Nodes = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),

Server = hd(Nodes),

Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
Q = ?config(queue_name, Config),
?assertEqual({'queue.declare_ok', Q, 0, 0},
declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),

Vhost = ?config(rmq_vhost, Config),
QName = #resource{virtual_host = Vhost,
kind = queue,
name = Q},
{ok, QRecord} = rabbit_ct_broker_helpers:rpc(Config, Server,
rabbit_amqqueue,
lookup, [QName]),
%% restart the quorum
Fmt = rabbit_ct_broker_helpers:rpc(Config, Server, rabbit_quorum_queue,
?FUNCTION_NAME, [QRecord, #{}]),

%% test all up case
?assertEqual(quorum, proplists:get_value(type, Fmt)),
?assertEqual(running, proplists:get_value(state, Fmt)),
?assertEqual(Server, proplists:get_value(leader, Fmt)),
?assertEqual(Server, proplists:get_value(node, Fmt)),
?assertEqual(Nodes, proplists:get_value(online, Fmt)),
?assertEqual(Nodes, proplists:get_value(members, Fmt)),

case length(Nodes) of
3 ->
[_, Server2, Server3] = Nodes,
ok = rabbit_control_helper:command(stop_app, Server2),
ok = rabbit_control_helper:command(stop_app, Server3),

Fmt2 = rabbit_ct_broker_helpers:rpc(Config, Server, rabbit_quorum_queue,
?FUNCTION_NAME, [QRecord, #{}]),
ok = rabbit_control_helper:command(start_app, Server2),
ok = rabbit_control_helper:command(start_app, Server3),
?assertEqual(quorum, proplists:get_value(type, Fmt2)),
?assertEqual(minority, proplists:get_value(state, Fmt2)),
?assertEqual(Server, proplists:get_value(leader, Fmt2)),
?assertEqual(Server, proplists:get_value(node, Fmt2)),
?assertEqual([Server], proplists:get_value(online, Fmt2)),
?assertEqual(Nodes, proplists:get_value(members, Fmt2)),
ok;
1 ->
ok
end,
?assertMatch(#'queue.delete_ok'{},
amqp_channel:call(Ch, #'queue.delete'{queue = Q})),
ok.

peek_with_wrong_queue_type(Config) ->
[Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),

Expand Down
62 changes: 59 additions & 3 deletions deps/rabbit/test/rabbit_stream_queue_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,10 @@ all() ->

groups() ->
[
{single_node, [], [restart_single_node, recover]},
{single_node, [],
[restart_single_node,
recover,
format]},
{single_node_parallel_1, [parallel], all_tests_1()},
{single_node_parallel_2, [parallel], all_tests_2()},
{single_node_parallel_3, [parallel], all_tests_3()},
Expand All @@ -74,6 +77,7 @@ groups() ->
select_nodes_with_least_replicas,
recover_after_leader_and_coordinator_kill,
restart_stream,
format,
rebalance
]},
{cluster_size_3_1, [], [shrink_coordinator_cluster]},
Expand Down Expand Up @@ -1423,6 +1427,60 @@ restart_stream(Config) ->
ok
end.

format(Config) ->
%% tests rabbit_stream_queue:format/2
Nodes = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),

Server = hd(Nodes),

Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
Q = ?config(queue_name, Config),
?assertEqual({'queue.declare_ok', Q, 0, 0},
declare(Config, Server, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),

publish_confirm(Ch, Q, [<<"msg">>]),
Vhost = ?config(rmq_vhost, Config),
QName = #resource{virtual_host = Vhost,
kind = queue,
name = Q},
{ok, QRecord} = rabbit_ct_broker_helpers:rpc(Config, Server,
rabbit_amqqueue,
lookup, [QName]),
%% restart the stream
Fmt = rabbit_ct_broker_helpers:rpc(Config, Server, rabbit_stream_queue,
?FUNCTION_NAME, [QRecord, #{}]),

%% test all up case
?assertEqual(stream, proplists:get_value(type, Fmt)),
?assertEqual(running, proplists:get_value(state, Fmt)),
?assertEqual(Server, proplists:get_value(leader, Fmt)),
?assertEqual(Server, proplists:get_value(node, Fmt)),
?assertEqual(Nodes, proplists:get_value(online, Fmt)),
?assertEqual(Nodes, proplists:get_value(members, Fmt)),

case length(Nodes) of
3 ->
[_, Server2, Server3] = Nodes,
ok = rabbit_control_helper:command(stop_app, Server2),
ok = rabbit_control_helper:command(stop_app, Server3),

Fmt2 = rabbit_ct_broker_helpers:rpc(Config, Server, rabbit_stream_queue,
?FUNCTION_NAME, [QRecord, #{}]),
ok = rabbit_control_helper:command(start_app, Server2),
ok = rabbit_control_helper:command(start_app, Server3),
?assertEqual(stream, proplists:get_value(type, Fmt2)),
?assertEqual(minority, proplists:get_value(state, Fmt2)),
?assertEqual(Server, proplists:get_value(leader, Fmt2)),
?assertEqual(Server, proplists:get_value(node, Fmt2)),
?assertEqual([Server], proplists:get_value(online, Fmt2)),
?assertEqual(Nodes, proplists:get_value(members, Fmt2)),
ok;
1 ->
ok
end,
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]),
ok.


consume_from_last(Config) ->
[Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
Expand Down Expand Up @@ -1924,11 +1982,9 @@ leader_failover_dedupe(Config) ->
ok = rabbit_ct_broker_helpers:stop_node(Config, DownNode),
%% this should cause a new leader to be elected and the channel on node 2
%% to have to resend any pending messages to ensure none is lost
ct:pal("preinfo", []),
rabbit_ct_helpers:await_condition(
fun() ->
Info = find_queue_info(Config, PubNode, [leader, members]),
ct:pal("info ~tp", [Info]),
NewLeader = proplists:get_value(leader, Info),
NewLeader =/= DownNode
end),
Expand Down

0 comments on commit 5deacfc

Please sign in to comment.