Skip to content

Commit

Permalink
rabbit_db_binding: Use a sproc for binding deletion
Browse files Browse the repository at this point in the history
Note that the ChecksFun arg was always passed `fun(_, _,) -> ok end` so
we eliminate this part of the deletion transaction.
  • Loading branch information
the-mikedavis committed Oct 2, 2024
1 parent 9f757ab commit c429d26
Showing 1 changed file with 39 additions and 30 deletions.
69 changes: 39 additions & 30 deletions deps/rabbit/src/rabbit_db_binding.erl
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

-module(rabbit_db_binding).

-feature(maybe_expr, enable).

-include_lib("khepri/include/khepri.hrl").
-include_lib("rabbit_common/include/rabbit.hrl").

Expand Down Expand Up @@ -59,6 +61,7 @@
-define(KHEPRI_BINDINGS_PROJECTION, rabbit_khepri_binding).
-define(KHEPRI_INDEX_ROUTE_PROJECTION, rabbit_khepri_index_route).
-define(KHEPRI_CREATION_SPROC_PATH, [rabbitmq, sprocs, ?MODULE, create]).
-define(KHEPRI_DELETION_SPROC_PATH, [rabbitmq, sprocs, ?MODULE, delete]).

%% -------------------------------------------------------------------
%% setup().
Expand All @@ -67,7 +70,10 @@
-spec setup() -> ok | rabbit_khepri:timeout_error().

setup() ->
rabbit_khepri:put(?KHEPRI_CREATION_SPROC_PATH, create_in_khepri_tx_fn()).
maybe
ok ?= rabbit_khepri:put(?KHEPRI_CREATION_SPROC_PATH, create_in_khepri_tx_fn()),
ok ?= rabbit_khepri:put(?KHEPRI_DELETION_SPROC_PATH, delete_in_khepri_tx_fn())
end.

%% -------------------------------------------------------------------
%% exists().
Expand Down Expand Up @@ -293,7 +299,7 @@ serial_in_khepri(true, X) ->
delete(Binding, ChecksFun) ->
rabbit_khepri:handle_fallback(
#{mnesia => fun() -> delete_in_mnesia(Binding, ChecksFun) end,
khepri => fun() -> delete_in_khepri(Binding, ChecksFun) end
khepri => fun() -> delete_in_khepri(Binding) end
}).

delete_in_mnesia(Binding, ChecksFun) ->
Expand Down Expand Up @@ -350,49 +356,52 @@ not_found_or_absent_in_mnesia(#resource{kind = queue} = Name) ->
{ok, Q} -> {absent, Q, nodedown}
end.

delete_in_khepri(#binding{source = SrcName,
destination = DstName} = Binding, ChecksFun) ->
Path = khepri_route_path(Binding),
case rabbit_khepri:transaction(
fun () ->
case {lookup_resource_in_khepri_tx(SrcName),
lookup_resource_in_khepri_tx(DstName)} of
{[Src], [Dst]} ->
case exists_in_khepri(Path, Binding) of
false ->
ok;
true ->
case ChecksFun(Src, Dst) of
ok ->
ok = delete_in_khepri(Binding),
maybe_auto_delete_exchange_in_khepri(Binding#binding.source, [Binding], rabbit_binding:new_deletions());
{error, _} = Err ->
Err
end
end;
_Errs ->
%% No absent queues, always present on disk
ok
end
end) of
delete_in_khepri(Binding) ->
case khepri:transaction(
rabbitmq_metadata,
?KHEPRI_DELETION_SPROC_PATH,
[Binding], rw, #{}) of
ok ->
ok;
{error, _} = Err ->
Err;
Deletions ->
{ok, Deletions} ->
ok = rabbit_binding:process_deletions(Deletions),
{ok, Deletions}
end.

exists_in_khepri(Path, Binding) ->
delete_in_khepri_tx_fn() ->
fun(#binding{source = SrcName, destination = DstName} = Binding) ->
Path = khepri_route_path(Binding),
%% TODO: Can we remove this as well?
case {lookup_resource_in_khepri_tx(SrcName),
lookup_resource_in_khepri_tx(DstName)} of
{[_Src], [_Dst]} ->
case exists_in_khepri_tx(Path, Binding) of
false ->
ok;
true ->
ok = delete_in_khepri_tx(Binding),
maybe_auto_delete_exchange_in_khepri(
Binding#binding.source,
[Binding],
rabbit_binding:new_deletions())
end;
_Errs ->
%% No absent queues, always present on disk
ok
end
end.

exists_in_khepri_tx(Path, Binding) ->
case khepri_tx:get(Path) of
{ok, Set} ->
sets:is_element(Binding, Set);
_ ->
false
end.

delete_in_khepri(Binding) ->
delete_in_khepri_tx(Binding) ->
Path = khepri_route_path(Binding),
case khepri_tx:get(Path) of
{ok, Set0} ->
Expand Down

0 comments on commit c429d26

Please sign in to comment.