Skip to content

Commit

Permalink
Merge pull request #12020 from rabbitmq/mergify/bp/v3.13.x/pr-12019
Browse files Browse the repository at this point in the history
  • Loading branch information
the-mikedavis authored Aug 15, 2024
2 parents 3e26f53 + 5f46547 commit dfae1e3
Show file tree
Hide file tree
Showing 7 changed files with 89 additions and 38 deletions.
48 changes: 30 additions & 18 deletions deps/rabbit/src/rabbit_amqqueue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -278,22 +278,30 @@ get_queue_type(Args, DefaultQueueType) ->
rabbit_queue_type:discover(V)
end.

-spec internal_declare(amqqueue:amqqueue(), boolean()) ->
{created | existing, amqqueue:amqqueue()} | queue_absent().
-spec internal_declare(Queue, Recover) -> Ret when
Queue :: amqqueue:amqqueue(),
Recover :: boolean(),
Ret :: {created | existing, amqqueue:amqqueue()} |
queue_absent() |
rabbit_khepri:timeout_error().

internal_declare(Q, Recover) ->
do_internal_declare(Q, Recover).

do_internal_declare(Q0, true) ->
%% TODO Why do we return the old state instead of the actual one?
%% I'm leaving it like it was before the khepri refactor, because
%% rabbit_amqqueue_process:init_it2 compares the result of this declare to decide
%% if continue or stop. If we return the actual one, it fails and the queue stops
%% silently during init.
%% Maybe we should review this bit of code at some point.
Q = amqqueue:set_state(Q0, live),
ok = store_queue(Q),
{created, Q0};
case store_queue(Q) of
ok ->
%% TODO Why do we return the old state instead of the actual one?
%% I'm leaving it like it was before the khepri refactor, because
%% rabbit_amqqueue_process:init_it2 compares the result of this
%% declare to decide if continue or stop. If we return the actual
%% one, it fails and the queue stops silently during init.
%% Maybe we should review this bit of code at some point.
{created, Q0};
{error, timeout} = Err ->
Err
end;
do_internal_declare(Q0, false) ->
Q = rabbit_policy:set(amqqueue:set_state(Q0, live)),
Queue = rabbit_queue_decorator:set(Q),
Expand All @@ -306,12 +314,18 @@ do_internal_declare(Q0, false) ->
update(Name, Fun) ->
rabbit_db_queue:update(Name, Fun).

%% only really used for quorum queues to ensure the rabbit_queue record
-spec ensure_rabbit_queue_record_is_initialized(Queue) -> Ret when
Queue :: amqqueue:amqqueue(),
Ret :: ok | {error, timeout}.

%% only really used for stream queues to ensure the rabbit_queue record
%% is initialised
ensure_rabbit_queue_record_is_initialized(Q) ->
store_queue(Q).

-spec store_queue(amqqueue:amqqueue()) -> 'ok'.
-spec store_queue(Queue) -> Ret when
Queue :: amqqueue:amqqueue(),
Ret :: ok | {error, timeout}.

store_queue(Q0) ->
Q = rabbit_queue_decorator:set(Q0),
Expand Down Expand Up @@ -352,12 +366,10 @@ is_server_named_allowed(Args) ->
Type = get_queue_type(Args),
rabbit_queue_type:is_server_named_allowed(Type).

-spec lookup
(name()) ->
rabbit_types:ok(amqqueue:amqqueue()) |
rabbit_types:error('not_found');
([name()]) ->
[amqqueue:amqqueue()].
-spec lookup(QueueName) -> Ret when
QueueName :: name(),
Ret :: rabbit_types:ok(amqqueue:amqqueue())
| rabbit_types:error('not_found').

lookup(Name) when is_record(Name, resource) ->
rabbit_db_queue:get(Name).
Expand Down
8 changes: 7 additions & 1 deletion deps/rabbit/src/rabbit_amqqueue_process.erl
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,12 @@ init_it2(Recover, From, State = #q{q = Q,
false ->
{stop, normal, {existing, Q1}, State}
end;
{error, timeout} ->
Reason = {protocol_error, internal_error,
"Could not declare ~ts on node '~ts' because the "
"metadata store operation timed out",
[rabbit_misc:rs(amqqueue:get_name(Q)), node()]},
{stop, normal, Reason, State};
Err ->
{stop, normal, Err, State}
end.
Expand Down Expand Up @@ -305,7 +311,7 @@ terminate(normal, State) -> %% delete case
terminate(_Reason, State = #q{q = Q}) ->
terminate_shutdown(fun (BQS) ->
Q2 = amqqueue:set_state(Q, crashed),
rabbit_amqqueue:store_queue(Q2),
_ = rabbit_amqqueue:store_queue(Q2),
BQS
end, State).

Expand Down
10 changes: 7 additions & 3 deletions deps/rabbit/src/rabbit_db_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -890,7 +890,10 @@ get_all_by_type_and_node_in_khepri(VHostName, Type, Node) ->

-spec create_or_get(Queue) -> Ret when
Queue :: amqqueue:amqqueue(),
Ret :: {created, Queue} | {existing, Queue} | {absent, Queue, nodedown}.
Ret :: {created, Queue} |
{existing, Queue} |
{absent, Queue, nodedown} |
rabbit_khepri:timeout_error().
%% @doc Writes a queue record if it doesn't exist already or returns the existing one
%%
%% @returns the existing record if there is one in the database already, or the newly
Expand Down Expand Up @@ -939,8 +942,9 @@ create_or_get_in_khepri(Q) ->
%% set().
%% -------------------------------------------------------------------

-spec set(Queue) -> ok when
Queue :: amqqueue:amqqueue().
-spec set(Queue) -> Ret when
Queue :: amqqueue:amqqueue(),
Ret :: ok | rabbit_khepri:timeout_error().
%% @doc Writes a queue record. If the queue is durable, it writes both instances:
%% durable and transient. For the durable one, it resets mirrors and decorators.
%% The transient one is left as it is.
Expand Down
7 changes: 6 additions & 1 deletion deps/rabbit/src/rabbit_quorum_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,12 @@ start_cluster(Q) ->
declare_queue_error(Error, NewQ, LeaderNode, ActingUser)
end;
{existing, _} = Ex ->
Ex
Ex;
{error, timeout} ->
{protocol_error, internal_error,
"Could not declare quorum ~ts on node '~ts' because the metadata "
"store operation timed out",
[rabbit_misc:rs(QName), node()]}
end.

declare_queue_error(Error, Queue, Leader, ActingUser) ->
Expand Down
2 changes: 1 addition & 1 deletion deps/rabbit/src/rabbit_stream_coordinator.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1241,7 +1241,7 @@ phase_update_mnesia(StreamId, Args, #{reference := QName,
#{name := S} when S == StreamId ->
rabbit_log:debug("~ts: initializing queue record for stream id ~ts",
[?MODULE, StreamId]),
_ = rabbit_amqqueue:ensure_rabbit_queue_record_is_initialized(Fun(Q)),
ok = rabbit_amqqueue:ensure_rabbit_queue_record_is_initialized(Fun(Q)),
ok;
_ ->
ok
Expand Down
41 changes: 29 additions & 12 deletions deps/rabbit/src/rabbit_stream_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -169,25 +169,37 @@ create_stream(Q0) ->
case rabbit_stream_coordinator:new_stream(Q, Leader) of
{ok, {ok, LeaderPid}, _} ->
%% update record with leader pid
set_leader_pid(LeaderPid, amqqueue:get_name(Q)),
rabbit_event:notify(queue_created,
[{name, QName},
{durable, true},
{auto_delete, false},
{arguments, Arguments},
{type, amqqueue:get_type(Q1)},
{user_who_performed_action,
ActingUser}]),
{new, Q};
case set_leader_pid(LeaderPid, amqqueue:get_name(Q)) of
ok ->
rabbit_event:notify(queue_created,
[{name, QName},
{durable, true},
{auto_delete, false},
{arguments, Arguments},
{type, amqqueue:get_type(Q1)},
{user_who_performed_action,
ActingUser}]),
{new, Q};
{error, timeout} ->
{protocol_error, internal_error,
"Could not set leader PID for ~ts on node '~ts' "
"because the metadata store operation timed out",
[rabbit_misc:rs(QName), node()]}
end;
Error ->
_ = rabbit_amqqueue:internal_delete(Q, ActingUser),
{protocol_error, internal_error, "Cannot declare a queue '~ts' on node '~ts': ~255p",
{protocol_error, internal_error, "Cannot declare ~ts on node '~ts': ~255p",
[rabbit_misc:rs(QName), node(), Error]}
end;
{existing, Q} ->
{existing, Q};
{absent, Q, Reason} ->
{absent, Q, Reason}
{absent, Q, Reason};
{error, timeout} ->
{protocol_error, internal_error,
"Could not declare ~ts on node '~ts' because the metadata store "
"operation timed out",
[rabbit_misc:rs(QName), node()]}
end.

-spec delete(amqqueue:amqqueue(), boolean(),
Expand Down Expand Up @@ -1178,6 +1190,11 @@ resend_all(#stream_client{leader = LeaderPid,
end || {Seq, Msg} <- Msgs],
State.

-spec set_leader_pid(Pid, QName) -> Ret when
Pid :: pid(),
QName :: rabbit_amqqueue:name(),
Ret :: ok | {error, timeout}.

set_leader_pid(Pid, QName) ->
%% TODO this should probably be a single khepri transaction for better performance.
Fun = fun (Q) ->
Expand Down
11 changes: 9 additions & 2 deletions deps/rabbitmq_mqtt/src/rabbit_mqtt_qos0_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,10 @@ is_stateful() ->

-spec declare(amqqueue:amqqueue(), node()) ->
{'new' | 'existing' | 'owner_died', amqqueue:amqqueue()} |
{'absent', amqqueue:amqqueue(), rabbit_amqqueue:absent_reason()}.
{'absent', amqqueue:amqqueue(), rabbit_amqqueue:absent_reason()} |
{protocol_error, internal_error, string(), [string()]}.
declare(Q0, _Node) ->
QName = amqqueue:get_name(Q0),
Q1 = case amqqueue:get_pid(Q0) of
none ->
%% declaring process becomes the queue
Expand All @@ -85,14 +87,19 @@ declare(Q0, _Node) ->
Opts = amqqueue:get_options(Q),
ActingUser = maps:get(user, Opts, ?UNKNOWN_USER),
rabbit_event:notify(queue_created,
[{name, amqqueue:get_name(Q)},
[{name, QName},
{durable, true},
{auto_delete, false},
{exclusive, true},
{type, amqqueue:get_type(Q)},
{arguments, amqqueue:get_arguments(Q)},
{user_who_performed_action, ActingUser}]),
{new, Q};
{error, timeout} ->
{protocol_error, internal_error,
"Could not declare ~ts because the metadata store operation "
"timed out",
[rabbit_misc:rs(QName)]};
Other ->
Other
end.
Expand Down

0 comments on commit dfae1e3

Please sign in to comment.