diff --git a/deps/rabbit/src/rabbit_amqqueue.erl b/deps/rabbit/src/rabbit_amqqueue.erl index 3bdf8ad6a177..5dde1629fd64 100644 --- a/deps/rabbit/src/rabbit_amqqueue.erl +++ b/deps/rabbit/src/rabbit_amqqueue.erl @@ -252,22 +252,30 @@ get_queue_type(Args, DefaultQueueType) -> rabbit_queue_type:discover(V) end. --spec internal_declare(amqqueue:amqqueue(), boolean()) -> - {created | existing, amqqueue:amqqueue()} | queue_absent(). +-spec internal_declare(Queue, Recover) -> Ret when + Queue :: amqqueue:amqqueue(), + Recover :: boolean(), + Ret :: {created | existing, amqqueue:amqqueue()} | + queue_absent() | + rabbit_khepri:timeout_error(). internal_declare(Q, Recover) -> do_internal_declare(Q, Recover). do_internal_declare(Q0, true) -> - %% TODO Why do we return the old state instead of the actual one? - %% I'm leaving it like it was before the khepri refactor, because - %% rabbit_amqqueue_process:init_it2 compares the result of this declare to decide - %% if continue or stop. If we return the actual one, it fails and the queue stops - %% silently during init. - %% Maybe we should review this bit of code at some point. Q = amqqueue:set_state(Q0, live), - ok = store_queue(Q), - {created, Q0}; + case store_queue(Q) of + ok -> + %% TODO Why do we return the old state instead of the actual one? + %% I'm leaving it like it was before the khepri refactor, because + %% rabbit_amqqueue_process:init_it2 compares the result of this + %% declare to decide if continue or stop. If we return the actual + %% one, it fails and the queue stops silently during init. + %% Maybe we should review this bit of code at some point. + {created, Q0}; + {error, timeout} = Err -> + Err + end; do_internal_declare(Q0, false) -> Q = rabbit_policy:set(amqqueue:set_state(Q0, live)), Queue = rabbit_queue_decorator:set(Q), @@ -280,12 +288,18 @@ do_internal_declare(Q0, false) -> update(Name, Fun) -> rabbit_db_queue:update(Name, Fun). -%% only really used for quorum queues to ensure the rabbit_queue record +-spec ensure_rabbit_queue_record_is_initialized(Queue) -> Ret when + Queue :: amqqueue:amqqueue(), + Ret :: ok | {error, timeout}. + +%% only really used for stream queues to ensure the rabbit_queue record %% is initialised ensure_rabbit_queue_record_is_initialized(Q) -> store_queue(Q). --spec store_queue(amqqueue:amqqueue()) -> 'ok'. +-spec store_queue(Queue) -> Ret when + Queue :: amqqueue:amqqueue(), + Ret :: ok | {error, timeout}. store_queue(Q0) -> Q = rabbit_queue_decorator:set(Q0), @@ -325,12 +339,10 @@ is_server_named_allowed(Args) -> Type = get_queue_type(Args), rabbit_queue_type:is_server_named_allowed(Type). --spec lookup - (name()) -> - rabbit_types:ok(amqqueue:amqqueue()) | - rabbit_types:error('not_found'); - ([name()]) -> - [amqqueue:amqqueue()]. +-spec lookup(QueueName) -> Ret when + QueueName :: name(), + Ret :: rabbit_types:ok(amqqueue:amqqueue()) + | rabbit_types:error('not_found'). lookup(Name) when is_record(Name, resource) -> rabbit_db_queue:get(Name). diff --git a/deps/rabbit/src/rabbit_amqqueue_process.erl b/deps/rabbit/src/rabbit_amqqueue_process.erl index e2334235c335..ed4fc3ccaf78 100644 --- a/deps/rabbit/src/rabbit_amqqueue_process.erl +++ b/deps/rabbit/src/rabbit_amqqueue_process.erl @@ -226,6 +226,12 @@ init_it2(Recover, From, State = #q{q = Q, false -> {stop, normal, {existing, Q1}, State} end; + {error, timeout} -> + Reason = {protocol_error, internal_error, + "Could not declare ~ts on node '~ts' because the " + "metadata store operation timed out", + [rabbit_misc:rs(amqqueue:get_name(Q)), node()]}, + {stop, normal, Reason, State}; Err -> {stop, normal, Err, State} end. @@ -311,7 +317,7 @@ terminate(normal, State) -> %% delete case terminate(_Reason, State = #q{q = Q}) -> terminate_shutdown(fun (BQS) -> Q2 = amqqueue:set_state(Q, crashed), - rabbit_amqqueue:store_queue(Q2), + _ = rabbit_amqqueue:store_queue(Q2), BQS end, State). diff --git a/deps/rabbit/src/rabbit_db_queue.erl b/deps/rabbit/src/rabbit_db_queue.erl index d1e1829d5873..da5c8da47dff 100644 --- a/deps/rabbit/src/rabbit_db_queue.erl +++ b/deps/rabbit/src/rabbit_db_queue.erl @@ -875,7 +875,10 @@ get_all_by_type_and_node_in_khepri(VHostName, Type, Node) -> -spec create_or_get(Queue) -> Ret when Queue :: amqqueue:amqqueue(), - Ret :: {created, Queue} | {existing, Queue} | {absent, Queue, nodedown}. + Ret :: {created, Queue} | + {existing, Queue} | + {absent, Queue, nodedown} | + rabbit_khepri:timeout_error(). %% @doc Writes a queue record if it doesn't exist already or returns the existing one %% %% @returns the existing record if there is one in the database already, or the newly @@ -924,8 +927,9 @@ create_or_get_in_khepri(Q) -> %% set(). %% ------------------------------------------------------------------- --spec set(Queue) -> ok when - Queue :: amqqueue:amqqueue(). +-spec set(Queue) -> Ret when + Queue :: amqqueue:amqqueue(), + Ret :: ok | rabbit_khepri:timeout_error(). %% @doc Writes a queue record. If the queue is durable, it writes both instances: %% durable and transient. For the durable one, it resets decorators. %% The transient one is left as it is. diff --git a/deps/rabbit/src/rabbit_quorum_queue.erl b/deps/rabbit/src/rabbit_quorum_queue.erl index e9a492a66881..88205a82942b 100644 --- a/deps/rabbit/src/rabbit_quorum_queue.erl +++ b/deps/rabbit/src/rabbit_quorum_queue.erl @@ -295,7 +295,12 @@ start_cluster(Q) -> declare_queue_error(Error, NewQ, LeaderNode, ActingUser) end; {existing, _} = Ex -> - Ex + Ex; + {error, timeout} -> + {protocol_error, internal_error, + "Could not declare quorum ~ts on node '~ts' because the metadata " + "store operation timed out", + [rabbit_misc:rs(QName), node()]} end. declare_queue_error(Error, Queue, Leader, ActingUser) -> diff --git a/deps/rabbit/src/rabbit_stream_coordinator.erl b/deps/rabbit/src/rabbit_stream_coordinator.erl index 954030b98581..0846dd58d1e0 100644 --- a/deps/rabbit/src/rabbit_stream_coordinator.erl +++ b/deps/rabbit/src/rabbit_stream_coordinator.erl @@ -1231,7 +1231,7 @@ phase_update_mnesia(StreamId, Args, #{reference := QName, #{name := S} when S == StreamId -> rabbit_log:debug("~ts: initializing queue record for stream id ~ts", [?MODULE, StreamId]), - _ = rabbit_amqqueue:ensure_rabbit_queue_record_is_initialized(Fun(Q)), + ok = rabbit_amqqueue:ensure_rabbit_queue_record_is_initialized(Fun(Q)), ok; _ -> ok diff --git a/deps/rabbit/src/rabbit_stream_queue.erl b/deps/rabbit/src/rabbit_stream_queue.erl index 37f3b52e2e42..e36ad708eb9a 100644 --- a/deps/rabbit/src/rabbit_stream_queue.erl +++ b/deps/rabbit/src/rabbit_stream_queue.erl @@ -177,25 +177,37 @@ create_stream(Q0) -> case rabbit_stream_coordinator:new_stream(Q, Leader) of {ok, {ok, LeaderPid}, _} -> %% update record with leader pid - set_leader_pid(LeaderPid, amqqueue:get_name(Q)), - rabbit_event:notify(queue_created, - [{name, QName}, - {durable, true}, - {auto_delete, false}, - {arguments, Arguments}, - {type, amqqueue:get_type(Q1)}, - {user_who_performed_action, - ActingUser}]), - {new, Q}; + case set_leader_pid(LeaderPid, amqqueue:get_name(Q)) of + ok -> + rabbit_event:notify(queue_created, + [{name, QName}, + {durable, true}, + {auto_delete, false}, + {arguments, Arguments}, + {type, amqqueue:get_type(Q1)}, + {user_who_performed_action, + ActingUser}]), + {new, Q}; + {error, timeout} -> + {protocol_error, internal_error, + "Could not set leader PID for ~ts on node '~ts' " + "because the metadata store operation timed out", + [rabbit_misc:rs(QName), node()]} + end; Error -> _ = rabbit_amqqueue:internal_delete(Q, ActingUser), - {protocol_error, internal_error, "Cannot declare a queue '~ts' on node '~ts': ~255p", + {protocol_error, internal_error, "Cannot declare ~ts on node '~ts': ~255p", [rabbit_misc:rs(QName), node(), Error]} end; {existing, Q} -> {existing, Q}; {absent, Q, Reason} -> - {absent, Q, Reason} + {absent, Q, Reason}; + {error, timeout} -> + {protocol_error, internal_error, + "Could not declare ~ts on node '~ts' because the metadata store " + "operation timed out", + [rabbit_misc:rs(QName), node()]} end. -spec delete(amqqueue:amqqueue(), boolean(), @@ -1291,6 +1303,11 @@ resend_all(#stream_client{leader = LeaderPid, end || {Seq, Msg} <- Msgs], State. +-spec set_leader_pid(Pid, QName) -> Ret when + Pid :: pid(), + QName :: rabbit_amqqueue:name(), + Ret :: ok | {error, timeout}. + set_leader_pid(Pid, QName) -> %% TODO this should probably be a single khepri transaction for better performance. Fun = fun (Q) -> diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_qos0_queue.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_qos0_queue.erl index 920276966c6c..47cf18e976a2 100644 --- a/deps/rabbitmq_mqtt/src/rabbit_mqtt_qos0_queue.erl +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_qos0_queue.erl @@ -70,8 +70,10 @@ is_stateful() -> -spec declare(amqqueue:amqqueue(), node()) -> {'new' | 'existing' | 'owner_died', amqqueue:amqqueue()} | - {'absent', amqqueue:amqqueue(), rabbit_amqqueue:absent_reason()}. + {'absent', amqqueue:amqqueue(), rabbit_amqqueue:absent_reason()} | + {protocol_error, internal_error, string(), [string()]}. declare(Q0, _Node) -> + QName = amqqueue:get_name(Q0), Q1 = case amqqueue:get_pid(Q0) of none -> %% declaring process becomes the queue @@ -86,7 +88,7 @@ declare(Q0, _Node) -> Opts = amqqueue:get_options(Q), ActingUser = maps:get(user, Opts, ?UNKNOWN_USER), rabbit_event:notify(queue_created, - [{name, amqqueue:get_name(Q)}, + [{name, QName}, {durable, true}, {auto_delete, false}, {exclusive, true}, @@ -94,6 +96,11 @@ declare(Q0, _Node) -> {arguments, amqqueue:get_arguments(Q)}, {user_who_performed_action, ActingUser}]), {new, Q}; + {error, timeout} -> + {protocol_error, internal_error, + "Could not declare ~ts because the metadata store operation " + "timed out", + [rabbit_misc:rs(QName)]}; Other -> Other end.