diff --git a/deps/rabbit/src/rabbit_amqqueue.erl b/deps/rabbit/src/rabbit_amqqueue.erl index a1a3c65da44c..ae6c83faec90 100644 --- a/deps/rabbit/src/rabbit_amqqueue.erl +++ b/deps/rabbit/src/rabbit_amqqueue.erl @@ -289,15 +289,19 @@ internal_declare(Q, Recover) -> do_internal_declare(Q, Recover). do_internal_declare(Q0, true) -> - %% TODO Why do we return the old state instead of the actual one? - %% I'm leaving it like it was before the khepri refactor, because - %% rabbit_amqqueue_process:init_it2 compares the result of this declare to decide - %% if continue or stop. If we return the actual one, it fails and the queue stops - %% silently during init. - %% Maybe we should review this bit of code at some point. Q = amqqueue:set_state(Q0, live), - ok = store_queue(Q), - {created, Q0}; + case store_queue(Q) of + ok -> + %% TODO Why do we return the old state instead of the actual one? + %% I'm leaving it like it was before the khepri refactor, because + %% rabbit_amqqueue_process:init_it2 compares the result of this + %% declare to decide if continue or stop. If we return the actual + %% one, it fails and the queue stops silently during init. + %% Maybe we should review this bit of code at some point. + {created, Q0}; + {error, timeout} = Err -> + Err + end; do_internal_declare(Q0, false) -> Q = rabbit_policy:set(amqqueue:set_state(Q0, live)), Queue = rabbit_queue_decorator:set(Q), @@ -310,12 +314,18 @@ do_internal_declare(Q0, false) -> update(Name, Fun) -> rabbit_db_queue:update(Name, Fun). -%% only really used for quorum queues to ensure the rabbit_queue record +-spec ensure_rabbit_queue_record_is_initialized(Queue) -> Ret when + Queue :: amqqueue:amqqueue(), + Ret :: ok | {error, timeout}. + +%% only really used for stream queues to ensure the rabbit_queue record %% is initialised ensure_rabbit_queue_record_is_initialized(Q) -> store_queue(Q). --spec store_queue(amqqueue:amqqueue()) -> 'ok'. +-spec store_queue(Queue) -> Ret when + Queue :: amqqueue:amqqueue(), + Ret :: ok | {error, timeout}. store_queue(Q0) -> Q = rabbit_queue_decorator:set(Q0), diff --git a/deps/rabbit/src/rabbit_amqqueue_process.erl b/deps/rabbit/src/rabbit_amqqueue_process.erl index 0cbde4e8047d..bd85bf562cd9 100644 --- a/deps/rabbit/src/rabbit_amqqueue_process.erl +++ b/deps/rabbit/src/rabbit_amqqueue_process.erl @@ -311,7 +311,7 @@ terminate(normal, State) -> %% delete case terminate(_Reason, State = #q{q = Q}) -> terminate_shutdown(fun (BQS) -> Q2 = amqqueue:set_state(Q, crashed), - rabbit_amqqueue:store_queue(Q2), + _ = rabbit_amqqueue:store_queue(Q2), BQS end, State). diff --git a/deps/rabbit/src/rabbit_db_queue.erl b/deps/rabbit/src/rabbit_db_queue.erl index be0a0d835f6b..d86268b2b780 100644 --- a/deps/rabbit/src/rabbit_db_queue.erl +++ b/deps/rabbit/src/rabbit_db_queue.erl @@ -942,8 +942,9 @@ create_or_get_in_khepri(Q) -> %% set(). %% ------------------------------------------------------------------- --spec set(Queue) -> ok when - Queue :: amqqueue:amqqueue(). +-spec set(Queue) -> Ret when + Queue :: amqqueue:amqqueue(), + Ret :: ok | rabbit_khepri:timeout_error(). %% @doc Writes a queue record. If the queue is durable, it writes both instances: %% durable and transient. For the durable one, it resets mirrors and decorators. %% The transient one is left as it is. diff --git a/deps/rabbit/src/rabbit_stream_coordinator.erl b/deps/rabbit/src/rabbit_stream_coordinator.erl index 3859875df43e..cf49164fe171 100644 --- a/deps/rabbit/src/rabbit_stream_coordinator.erl +++ b/deps/rabbit/src/rabbit_stream_coordinator.erl @@ -1241,7 +1241,7 @@ phase_update_mnesia(StreamId, Args, #{reference := QName, #{name := S} when S == StreamId -> rabbit_log:debug("~ts: initializing queue record for stream id ~ts", [?MODULE, StreamId]), - _ = rabbit_amqqueue:ensure_rabbit_queue_record_is_initialized(Fun(Q)), + ok = rabbit_amqqueue:ensure_rabbit_queue_record_is_initialized(Fun(Q)), ok; _ -> ok diff --git a/deps/rabbit/src/rabbit_stream_queue.erl b/deps/rabbit/src/rabbit_stream_queue.erl index cc2b13cf52a3..d6ab6db327d2 100644 --- a/deps/rabbit/src/rabbit_stream_queue.erl +++ b/deps/rabbit/src/rabbit_stream_queue.erl @@ -169,16 +169,23 @@ create_stream(Q0) -> case rabbit_stream_coordinator:new_stream(Q, Leader) of {ok, {ok, LeaderPid}, _} -> %% update record with leader pid - set_leader_pid(LeaderPid, amqqueue:get_name(Q)), - rabbit_event:notify(queue_created, - [{name, QName}, - {durable, true}, - {auto_delete, false}, - {arguments, Arguments}, - {type, amqqueue:get_type(Q1)}, - {user_who_performed_action, - ActingUser}]), - {new, Q}; + case set_leader_pid(LeaderPid, amqqueue:get_name(Q)) of + ok -> + rabbit_event:notify(queue_created, + [{name, QName}, + {durable, true}, + {auto_delete, false}, + {arguments, Arguments}, + {type, amqqueue:get_type(Q1)}, + {user_who_performed_action, + ActingUser}]), + {new, Q}; + {error, timeout} -> + {protocol_error, internal_error, + "Could not set leader PID for ~ts on node '~ts' " + "because the metadata store operation timed out", + [rabbit_misc:rs(QName), node()]} + end; Error -> _ = rabbit_amqqueue:internal_delete(Q, ActingUser), {protocol_error, internal_error, "Cannot declare ~ts on node '~ts': ~255p", @@ -1183,6 +1190,11 @@ resend_all(#stream_client{leader = LeaderPid, end || {Seq, Msg} <- Msgs], State. +-spec set_leader_pid(Pid, QName) -> Ret when + Pid :: pid(), + QName :: rabbit_amqqueue:name(), + Ret :: ok | {error, timeout}. + set_leader_pid(Pid, QName) -> %% TODO this should probably be a single khepri transaction for better performance. Fun = fun (Q) ->