Skip to content

Commit

Permalink
Handle database timeouts in rabbit_amqqueue:store_queue/1
Browse files Browse the repository at this point in the history
(cherry picked from commit 8eef209)
(cherry picked from commit 1cd66e8)
  • Loading branch information
the-mikedavis authored and mergify[bot] committed Aug 15, 2024
1 parent 1ea7f70 commit 5f46547
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 24 deletions.
30 changes: 20 additions & 10 deletions deps/rabbit/src/rabbit_amqqueue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -289,15 +289,19 @@ internal_declare(Q, Recover) ->
do_internal_declare(Q, Recover).

do_internal_declare(Q0, true) ->
%% TODO Why do we return the old state instead of the actual one?
%% I'm leaving it like it was before the khepri refactor, because
%% rabbit_amqqueue_process:init_it2 compares the result of this declare to decide
%% if continue or stop. If we return the actual one, it fails and the queue stops
%% silently during init.
%% Maybe we should review this bit of code at some point.
Q = amqqueue:set_state(Q0, live),
ok = store_queue(Q),
{created, Q0};
case store_queue(Q) of
ok ->
%% TODO Why do we return the old state instead of the actual one?
%% I'm leaving it like it was before the khepri refactor, because
%% rabbit_amqqueue_process:init_it2 compares the result of this
%% declare to decide if continue or stop. If we return the actual
%% one, it fails and the queue stops silently during init.
%% Maybe we should review this bit of code at some point.
{created, Q0};
{error, timeout} = Err ->
Err
end;
do_internal_declare(Q0, false) ->
Q = rabbit_policy:set(amqqueue:set_state(Q0, live)),
Queue = rabbit_queue_decorator:set(Q),
Expand All @@ -310,12 +314,18 @@ do_internal_declare(Q0, false) ->
update(Name, Fun) ->
rabbit_db_queue:update(Name, Fun).

%% only really used for quorum queues to ensure the rabbit_queue record
-spec ensure_rabbit_queue_record_is_initialized(Queue) -> Ret when
Queue :: amqqueue:amqqueue(),
Ret :: ok | {error, timeout}.

%% only really used for stream queues to ensure the rabbit_queue record
%% is initialised
ensure_rabbit_queue_record_is_initialized(Q) ->
store_queue(Q).

-spec store_queue(amqqueue:amqqueue()) -> 'ok'.
-spec store_queue(Queue) -> Ret when
Queue :: amqqueue:amqqueue(),
Ret :: ok | {error, timeout}.

store_queue(Q0) ->
Q = rabbit_queue_decorator:set(Q0),
Expand Down
2 changes: 1 addition & 1 deletion deps/rabbit/src/rabbit_amqqueue_process.erl
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ terminate(normal, State) -> %% delete case
terminate(_Reason, State = #q{q = Q}) ->
terminate_shutdown(fun (BQS) ->
Q2 = amqqueue:set_state(Q, crashed),
rabbit_amqqueue:store_queue(Q2),
_ = rabbit_amqqueue:store_queue(Q2),
BQS
end, State).

Expand Down
5 changes: 3 additions & 2 deletions deps/rabbit/src/rabbit_db_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -942,8 +942,9 @@ create_or_get_in_khepri(Q) ->
%% set().
%% -------------------------------------------------------------------

-spec set(Queue) -> ok when
Queue :: amqqueue:amqqueue().
-spec set(Queue) -> Ret when
Queue :: amqqueue:amqqueue(),
Ret :: ok | rabbit_khepri:timeout_error().
%% @doc Writes a queue record. If the queue is durable, it writes both instances:
%% durable and transient. For the durable one, it resets mirrors and decorators.
%% The transient one is left as it is.
Expand Down
2 changes: 1 addition & 1 deletion deps/rabbit/src/rabbit_stream_coordinator.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1241,7 +1241,7 @@ phase_update_mnesia(StreamId, Args, #{reference := QName,
#{name := S} when S == StreamId ->
rabbit_log:debug("~ts: initializing queue record for stream id ~ts",
[?MODULE, StreamId]),
_ = rabbit_amqqueue:ensure_rabbit_queue_record_is_initialized(Fun(Q)),
ok = rabbit_amqqueue:ensure_rabbit_queue_record_is_initialized(Fun(Q)),
ok;
_ ->
ok
Expand Down
32 changes: 22 additions & 10 deletions deps/rabbit/src/rabbit_stream_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -169,16 +169,23 @@ create_stream(Q0) ->
case rabbit_stream_coordinator:new_stream(Q, Leader) of
{ok, {ok, LeaderPid}, _} ->
%% update record with leader pid
set_leader_pid(LeaderPid, amqqueue:get_name(Q)),
rabbit_event:notify(queue_created,
[{name, QName},
{durable, true},
{auto_delete, false},
{arguments, Arguments},
{type, amqqueue:get_type(Q1)},
{user_who_performed_action,
ActingUser}]),
{new, Q};
case set_leader_pid(LeaderPid, amqqueue:get_name(Q)) of
ok ->
rabbit_event:notify(queue_created,
[{name, QName},
{durable, true},
{auto_delete, false},
{arguments, Arguments},
{type, amqqueue:get_type(Q1)},
{user_who_performed_action,
ActingUser}]),
{new, Q};
{error, timeout} ->
{protocol_error, internal_error,
"Could not set leader PID for ~ts on node '~ts' "
"because the metadata store operation timed out",
[rabbit_misc:rs(QName), node()]}
end;
Error ->
_ = rabbit_amqqueue:internal_delete(Q, ActingUser),
{protocol_error, internal_error, "Cannot declare ~ts on node '~ts': ~255p",
Expand Down Expand Up @@ -1183,6 +1190,11 @@ resend_all(#stream_client{leader = LeaderPid,
end || {Seq, Msg} <- Msgs],
State.

-spec set_leader_pid(Pid, QName) -> Ret when
Pid :: pid(),
QName :: rabbit_amqqueue:name(),
Ret :: ok | {error, timeout}.

set_leader_pid(Pid, QName) ->
%% TODO this should probably be a single khepri transaction for better performance.
Fun = fun (Q) ->
Expand Down

0 comments on commit 5f46547

Please sign in to comment.