diff --git a/deps/rabbit/src/rabbit_db_binding.erl b/deps/rabbit/src/rabbit_db_binding.erl index 0c92f65d4c2..96102645939 100644 --- a/deps/rabbit/src/rabbit_db_binding.erl +++ b/deps/rabbit/src/rabbit_db_binding.erl @@ -7,6 +7,8 @@ -module(rabbit_db_binding). +-feature(maybe_expr, enable). + -include_lib("khepri/include/khepri.hrl"). -include_lib("rabbit_common/include/rabbit.hrl"). @@ -59,6 +61,7 @@ -define(KHEPRI_BINDINGS_PROJECTION, rabbit_khepri_binding). -define(KHEPRI_INDEX_ROUTE_PROJECTION, rabbit_khepri_index_route). -define(KHEPRI_CREATION_SPROC_PATH, [rabbitmq, sprocs, ?MODULE, create]). +-define(KHEPRI_DELETION_SPROC_PATH, [rabbitmq, sprocs, ?MODULE, delete]). %% ------------------------------------------------------------------- %% setup(). @@ -67,7 +70,10 @@ -spec setup() -> ok | rabbit_khepri:timeout_error(). setup() -> - rabbit_khepri:put(?KHEPRI_CREATION_SPROC_PATH, create_in_khepri_tx_fn()). + maybe + ok ?= rabbit_khepri:put(?KHEPRI_CREATION_SPROC_PATH, create_in_khepri_tx_fn()), + ok ?= rabbit_khepri:put(?KHEPRI_DELETION_SPROC_PATH, delete_in_khepri_tx_fn()) + end. %% ------------------------------------------------------------------- %% exists(). @@ -293,7 +299,7 @@ serial_in_khepri(true, X) -> delete(Binding, ChecksFun) -> rabbit_khepri:handle_fallback( #{mnesia => fun() -> delete_in_mnesia(Binding, ChecksFun) end, - khepri => fun() -> delete_in_khepri(Binding, ChecksFun) end + khepri => fun() -> delete_in_khepri(Binding) end }). delete_in_mnesia(Binding, ChecksFun) -> @@ -350,41 +356,44 @@ not_found_or_absent_in_mnesia(#resource{kind = queue} = Name) -> {ok, Q} -> {absent, Q, nodedown} end. -delete_in_khepri(#binding{source = SrcName, - destination = DstName} = Binding, ChecksFun) -> - Path = khepri_route_path(Binding), - case rabbit_khepri:transaction( - fun () -> - case {lookup_resource_in_khepri_tx(SrcName), - lookup_resource_in_khepri_tx(DstName)} of - {[Src], [Dst]} -> - case exists_in_khepri(Path, Binding) of - false -> - ok; - true -> - case ChecksFun(Src, Dst) of - ok -> - ok = delete_in_khepri(Binding), - maybe_auto_delete_exchange_in_khepri(Binding#binding.source, [Binding], rabbit_binding:new_deletions()); - {error, _} = Err -> - Err - end - end; - _Errs -> - %% No absent queues, always present on disk - ok - end - end) of +delete_in_khepri(Binding) -> + case khepri:transaction( + rabbitmq_metadata, + ?KHEPRI_DELETION_SPROC_PATH, + [Binding], rw, #{}) of ok -> ok; {error, _} = Err -> Err; - Deletions -> + {ok, Deletions} -> ok = rabbit_binding:process_deletions(Deletions), {ok, Deletions} end. -exists_in_khepri(Path, Binding) -> +delete_in_khepri_tx_fn() -> + fun(#binding{source = SrcName, destination = DstName} = Binding) -> + Path = khepri_route_path(Binding), + %% TODO: Can we remove this as well? + case {lookup_resource_in_khepri_tx(SrcName), + lookup_resource_in_khepri_tx(DstName)} of + {[_Src], [_Dst]} -> + case exists_in_khepri_tx(Path, Binding) of + false -> + ok; + true -> + ok = delete_in_khepri_tx(Binding), + maybe_auto_delete_exchange_in_khepri( + Binding#binding.source, + [Binding], + rabbit_binding:new_deletions()) + end; + _Errs -> + %% No absent queues, always present on disk + ok + end + end. + +exists_in_khepri_tx(Path, Binding) -> case khepri_tx:get(Path) of {ok, Set} -> sets:is_element(Binding, Set); @@ -392,7 +401,7 @@ exists_in_khepri(Path, Binding) -> false end. -delete_in_khepri(Binding) -> +delete_in_khepri_tx(Binding) -> Path = khepri_route_path(Binding), case khepri_tx:get(Path) of {ok, Set0} ->