Skip to content

Commit

Permalink
Merge pull request #12303 from rabbitmq/issue-1049
Browse files Browse the repository at this point in the history
forget_cluster_node: delete all local classic queues when using Khepri store
  • Loading branch information
michaelklishin authored Sep 13, 2024
2 parents 731fb2f + a1893fb commit 4ec0f5e
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 1 deletion.
1 change: 1 addition & 0 deletions deps/rabbit/src/rabbit_amqqueue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1819,6 +1819,7 @@ internal_delete(Queue, ActingUser, Reason) ->
%% TODO this is used by `rabbit_mnesia:remove_node_if_mnesia_running`
%% Does it make any sense once mnesia is not used/removed?
forget_all_durable(Node) ->
rabbit_log:info("Will remove all classic queues from node ~ts. The node is likely being removed from the cluster.", [Node]),
UpdateFun = fun(Q) ->
forget_node_for_queue(Q)
end,
Expand Down
2 changes: 2 additions & 0 deletions deps/rabbit/src/rabbit_khepri.erl
Original file line number Diff line number Diff line change
Expand Up @@ -538,6 +538,7 @@ remove_reachable_member(NodeToRemove) ->
NodeToRemove, khepri_cluster, reset, [?RA_CLUSTER_NAME]),
case Ret of
ok ->
rabbit_amqqueue:forget_all_durable(NodeToRemove),
?LOG_DEBUG(
"Node ~s removed from Khepri cluster \"~s\"",
[NodeToRemove, ?RA_CLUSTER_NAME],
Expand All @@ -559,6 +560,7 @@ remove_down_member(NodeToRemove) ->
Ret = ra:remove_member(ServerRef, ServerId, Timeout),
case Ret of
{ok, _, _} ->
rabbit_amqqueue:forget_all_durable(NodeToRemove),
?LOG_DEBUG(
"Node ~s removed from Khepri cluster \"~s\"",
[NodeToRemove, ?RA_CLUSTER_NAME],
Expand Down
27 changes: 26 additions & 1 deletion deps/rabbit/test/cli_forget_cluster_node_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ groups() ->
forget_cluster_node_with_all_last_streams,
forget_cluster_node_with_quorum_queues_and_streams,
forget_cluster_node_with_one_last_quorum_member_and_streams,
forget_cluster_node_with_one_last_stream_and_quorum_queues
forget_cluster_node_with_one_last_stream_and_quorum_queues,
forget_cluster_node_with_one_classic_queue
]}
].

Expand Down Expand Up @@ -354,6 +355,30 @@ forget_cluster_node_with_one_last_stream_and_quorum_queues(Config) ->
?awaitMatch(Members when length(Members) == 2, get_quorum_members(Rabbit, QQ1), 30000),
?awaitMatch(Members when length(Members) == 2, get_quorum_members(Rabbit, QQ2), 30000).

forget_cluster_node_with_one_classic_queue(Config) ->
[Rabbit, Hare, Bunny] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),

assert_clustered([Rabbit, Hare, Bunny]),

Ch = rabbit_ct_client_helpers:open_channel(Config, Bunny),
CQ1 = <<"classic-queue-1">>,
declare(Ch, CQ1, [{<<"x-queue-type">>, longstr, <<"classic">>}]),

?awaitMatch([_], rabbit_ct_broker_helpers:rabbitmqctl_list(
Config, Rabbit,
["list_queues", "name", "--no-table-headers"]),
30000),

?assertEqual(ok, rabbit_control_helper:command(stop_app, Bunny)),
?assertEqual(ok, forget_cluster_node(Rabbit, Bunny)),

assert_cluster_status({[Rabbit, Hare], [Rabbit, Hare], [Rabbit, Hare]},
[Rabbit, Hare]),
?awaitMatch([], rabbit_ct_broker_helpers:rabbitmqctl_list(
Config, Rabbit,
["list_queues", "name", "--no-table-headers"]),
30000).

forget_cluster_node(Node, Removee) ->
rabbit_control_helper:command(forget_cluster_node, Node, [atom_to_list(Removee)],
[]).
Expand Down

0 comments on commit 4ec0f5e

Please sign in to comment.