Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle queue declaration failures in Khepri minority (backport #11980) #12019

Merged
merged 4 commits into from
Aug 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 30 additions & 18 deletions deps/rabbit/src/rabbit_amqqueue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -252,22 +252,30 @@ get_queue_type(Args, DefaultQueueType) ->
rabbit_queue_type:discover(V)
end.

-spec internal_declare(amqqueue:amqqueue(), boolean()) ->
{created | existing, amqqueue:amqqueue()} | queue_absent().
-spec internal_declare(Queue, Recover) -> Ret when
Queue :: amqqueue:amqqueue(),
Recover :: boolean(),
Ret :: {created | existing, amqqueue:amqqueue()} |
queue_absent() |
rabbit_khepri:timeout_error().

internal_declare(Q, Recover) ->
do_internal_declare(Q, Recover).

do_internal_declare(Q0, true) ->
%% TODO Why do we return the old state instead of the actual one?
%% I'm leaving it like it was before the khepri refactor, because
%% rabbit_amqqueue_process:init_it2 compares the result of this declare to decide
%% if continue or stop. If we return the actual one, it fails and the queue stops
%% silently during init.
%% Maybe we should review this bit of code at some point.
Q = amqqueue:set_state(Q0, live),
ok = store_queue(Q),
{created, Q0};
case store_queue(Q) of
ok ->
%% TODO Why do we return the old state instead of the actual one?
%% I'm leaving it like it was before the khepri refactor, because
%% rabbit_amqqueue_process:init_it2 compares the result of this
%% declare to decide if continue or stop. If we return the actual
%% one, it fails and the queue stops silently during init.
%% Maybe we should review this bit of code at some point.
{created, Q0};
{error, timeout} = Err ->
Err
end;
do_internal_declare(Q0, false) ->
Q = rabbit_policy:set(amqqueue:set_state(Q0, live)),
Queue = rabbit_queue_decorator:set(Q),
Expand All @@ -280,12 +288,18 @@ do_internal_declare(Q0, false) ->
update(Name, Fun) ->
rabbit_db_queue:update(Name, Fun).

%% only really used for quorum queues to ensure the rabbit_queue record
-spec ensure_rabbit_queue_record_is_initialized(Queue) -> Ret when
Queue :: amqqueue:amqqueue(),
Ret :: ok | {error, timeout}.

%% only really used for stream queues to ensure the rabbit_queue record
%% is initialised
ensure_rabbit_queue_record_is_initialized(Q) ->
store_queue(Q).

-spec store_queue(amqqueue:amqqueue()) -> 'ok'.
-spec store_queue(Queue) -> Ret when
Queue :: amqqueue:amqqueue(),
Ret :: ok | {error, timeout}.

store_queue(Q0) ->
Q = rabbit_queue_decorator:set(Q0),
Expand Down Expand Up @@ -325,12 +339,10 @@ is_server_named_allowed(Args) ->
Type = get_queue_type(Args),
rabbit_queue_type:is_server_named_allowed(Type).

-spec lookup
(name()) ->
rabbit_types:ok(amqqueue:amqqueue()) |
rabbit_types:error('not_found');
([name()]) ->
[amqqueue:amqqueue()].
-spec lookup(QueueName) -> Ret when
QueueName :: name(),
Ret :: rabbit_types:ok(amqqueue:amqqueue())
| rabbit_types:error('not_found').

lookup(Name) when is_record(Name, resource) ->
rabbit_db_queue:get(Name).
Expand Down
8 changes: 7 additions & 1 deletion deps/rabbit/src/rabbit_amqqueue_process.erl
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,12 @@ init_it2(Recover, From, State = #q{q = Q,
false ->
{stop, normal, {existing, Q1}, State}
end;
{error, timeout} ->
Reason = {protocol_error, internal_error,
"Could not declare ~ts on node '~ts' because the "
"metadata store operation timed out",
[rabbit_misc:rs(amqqueue:get_name(Q)), node()]},
{stop, normal, Reason, State};
Err ->
{stop, normal, Err, State}
end.
Expand Down Expand Up @@ -311,7 +317,7 @@ terminate(normal, State) -> %% delete case
terminate(_Reason, State = #q{q = Q}) ->
terminate_shutdown(fun (BQS) ->
Q2 = amqqueue:set_state(Q, crashed),
rabbit_amqqueue:store_queue(Q2),
_ = rabbit_amqqueue:store_queue(Q2),
BQS
end, State).

Expand Down
10 changes: 7 additions & 3 deletions deps/rabbit/src/rabbit_db_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -875,7 +875,10 @@ get_all_by_type_and_node_in_khepri(VHostName, Type, Node) ->

-spec create_or_get(Queue) -> Ret when
Queue :: amqqueue:amqqueue(),
Ret :: {created, Queue} | {existing, Queue} | {absent, Queue, nodedown}.
Ret :: {created, Queue} |
{existing, Queue} |
{absent, Queue, nodedown} |
rabbit_khepri:timeout_error().
%% @doc Writes a queue record if it doesn't exist already or returns the existing one
%%
%% @returns the existing record if there is one in the database already, or the newly
Expand Down Expand Up @@ -924,8 +927,9 @@ create_or_get_in_khepri(Q) ->
%% set().
%% -------------------------------------------------------------------

-spec set(Queue) -> ok when
Queue :: amqqueue:amqqueue().
-spec set(Queue) -> Ret when
Queue :: amqqueue:amqqueue(),
Ret :: ok | rabbit_khepri:timeout_error().
%% @doc Writes a queue record. If the queue is durable, it writes both instances:
%% durable and transient. For the durable one, it resets decorators.
%% The transient one is left as it is.
Expand Down
7 changes: 6 additions & 1 deletion deps/rabbit/src/rabbit_quorum_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,12 @@ start_cluster(Q) ->
declare_queue_error(Error, NewQ, LeaderNode, ActingUser)
end;
{existing, _} = Ex ->
Ex
Ex;
{error, timeout} ->
{protocol_error, internal_error,
"Could not declare quorum ~ts on node '~ts' because the metadata "
"store operation timed out",
[rabbit_misc:rs(QName), node()]}
end.

declare_queue_error(Error, Queue, Leader, ActingUser) ->
Expand Down
2 changes: 1 addition & 1 deletion deps/rabbit/src/rabbit_stream_coordinator.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1231,7 +1231,7 @@ phase_update_mnesia(StreamId, Args, #{reference := QName,
#{name := S} when S == StreamId ->
rabbit_log:debug("~ts: initializing queue record for stream id ~ts",
[?MODULE, StreamId]),
_ = rabbit_amqqueue:ensure_rabbit_queue_record_is_initialized(Fun(Q)),
ok = rabbit_amqqueue:ensure_rabbit_queue_record_is_initialized(Fun(Q)),
ok;
_ ->
ok
Expand Down
41 changes: 29 additions & 12 deletions deps/rabbit/src/rabbit_stream_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -177,25 +177,37 @@ create_stream(Q0) ->
case rabbit_stream_coordinator:new_stream(Q, Leader) of
{ok, {ok, LeaderPid}, _} ->
%% update record with leader pid
set_leader_pid(LeaderPid, amqqueue:get_name(Q)),
rabbit_event:notify(queue_created,
[{name, QName},
{durable, true},
{auto_delete, false},
{arguments, Arguments},
{type, amqqueue:get_type(Q1)},
{user_who_performed_action,
ActingUser}]),
{new, Q};
case set_leader_pid(LeaderPid, amqqueue:get_name(Q)) of
ok ->
rabbit_event:notify(queue_created,
[{name, QName},
{durable, true},
{auto_delete, false},
{arguments, Arguments},
{type, amqqueue:get_type(Q1)},
{user_who_performed_action,
ActingUser}]),
{new, Q};
{error, timeout} ->
{protocol_error, internal_error,
"Could not set leader PID for ~ts on node '~ts' "
"because the metadata store operation timed out",
[rabbit_misc:rs(QName), node()]}
end;
Error ->
_ = rabbit_amqqueue:internal_delete(Q, ActingUser),
{protocol_error, internal_error, "Cannot declare a queue '~ts' on node '~ts': ~255p",
{protocol_error, internal_error, "Cannot declare ~ts on node '~ts': ~255p",
[rabbit_misc:rs(QName), node(), Error]}
end;
{existing, Q} ->
{existing, Q};
{absent, Q, Reason} ->
{absent, Q, Reason}
{absent, Q, Reason};
{error, timeout} ->
{protocol_error, internal_error,
"Could not declare ~ts on node '~ts' because the metadata store "
"operation timed out",
[rabbit_misc:rs(QName), node()]}
end.

-spec delete(amqqueue:amqqueue(), boolean(),
Expand Down Expand Up @@ -1291,6 +1303,11 @@ resend_all(#stream_client{leader = LeaderPid,
end || {Seq, Msg} <- Msgs],
State.

-spec set_leader_pid(Pid, QName) -> Ret when
Pid :: pid(),
QName :: rabbit_amqqueue:name(),
Ret :: ok | {error, timeout}.

set_leader_pid(Pid, QName) ->
%% TODO this should probably be a single khepri transaction for better performance.
Fun = fun (Q) ->
Expand Down
11 changes: 9 additions & 2 deletions deps/rabbitmq_mqtt/src/rabbit_mqtt_qos0_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,10 @@ is_stateful() ->

-spec declare(amqqueue:amqqueue(), node()) ->
{'new' | 'existing' | 'owner_died', amqqueue:amqqueue()} |
{'absent', amqqueue:amqqueue(), rabbit_amqqueue:absent_reason()}.
{'absent', amqqueue:amqqueue(), rabbit_amqqueue:absent_reason()} |
{protocol_error, internal_error, string(), [string()]}.
declare(Q0, _Node) ->
QName = amqqueue:get_name(Q0),
Q1 = case amqqueue:get_pid(Q0) of
none ->
%% declaring process becomes the queue
Expand All @@ -86,14 +88,19 @@ declare(Q0, _Node) ->
Opts = amqqueue:get_options(Q),
ActingUser = maps:get(user, Opts, ?UNKNOWN_USER),
rabbit_event:notify(queue_created,
[{name, amqqueue:get_name(Q)},
[{name, QName},
{durable, true},
{auto_delete, false},
{exclusive, true},
{type, amqqueue:get_type(Q)},
{arguments, amqqueue:get_arguments(Q)},
{user_who_performed_action, ActingUser}]),
{new, Q};
{error, timeout} ->
{protocol_error, internal_error,
"Could not declare ~ts because the metadata store operation "
"timed out",
[rabbit_misc:rs(QName)]};
Other ->
Other
end.
Expand Down
Loading