Skip to content

Commit

Permalink
Use a sproc for binding creation
Browse files Browse the repository at this point in the history
  • Loading branch information
the-mikedavis committed Oct 2, 2024
1 parent 49fb8cd commit 9f757ab
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 27 deletions.
68 changes: 42 additions & 26 deletions deps/rabbit/src/rabbit_db_binding.erl
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@
-include_lib("khepri/include/khepri.hrl").
-include_lib("rabbit_common/include/rabbit.hrl").

-export([exists/1,
-export([setup/0,

exists/1,
create/2,
delete/2,
get_all/0,
Expand Down Expand Up @@ -56,6 +58,16 @@
-define(MNESIA_INDEX_TABLE, rabbit_index_route).
-define(KHEPRI_BINDINGS_PROJECTION, rabbit_khepri_binding).
-define(KHEPRI_INDEX_ROUTE_PROJECTION, rabbit_khepri_index_route).
-define(KHEPRI_CREATION_SPROC_PATH, [rabbitmq, sprocs, ?MODULE, create]).

%% -------------------------------------------------------------------
%% setup().
%% -------------------------------------------------------------------

-spec setup() -> ok | rabbit_khepri:timeout_error().

setup() ->
rabbit_khepri:put(?KHEPRI_CREATION_SPROC_PATH, create_in_khepri_tx_fn()).

%% -------------------------------------------------------------------
%% exists().
Expand Down Expand Up @@ -198,32 +210,11 @@ create_in_khepri(#binding{source = SrcName,
{[Src], [Dst]} ->
case ChecksFun(Src, Dst) of
ok ->
RoutePath = khepri_route_path(Binding),
QueuePath = rabbit_db_queue:khepri_queue_path(DstName),
MaybeSerial = rabbit_exchange:serialise_events(Src),
Serial = rabbit_khepri:transaction(
fun() ->
%% Tie the lifetime of the binding to
%% the lifetime of the queue.
KeepWhile = #{QueuePath => #if_node_exists{exists = true}},
Options = #{keep_while => KeepWhile},
case khepri_tx:get(RoutePath) of
{ok, Set} ->
case sets:is_element(Binding, Set) of
true ->
already_exists;
false ->
Set1 = sets:add_element(Binding, Set),
ok = khepri_tx:put(RoutePath, Set1, Options),
serial_in_khepri(MaybeSerial, Src)
end;
_ ->
Set = sets:new([{version, 2}]),
Set1 = sets:add_element(Binding, Set),
ok = khepri_tx:put(RoutePath, Set1, Options),
serial_in_khepri(MaybeSerial, Src)
end
end, rw),
Serial = khepri:transaction(
rabbitmq_metadata,
?KHEPRI_CREATION_SPROC_PATH,
[Binding, Src, DstName, MaybeSerial], rw, #{}),
case Serial of
already_exists ->
ok;
Expand All @@ -239,6 +230,31 @@ create_in_khepri(#binding{source = SrcName,
not_found_errs_in_khepri(not_found(Errs, SrcName, DstName))
end.

create_in_khepri_tx_fn() ->
fun(Binding, Src, DstName, MaybeSerial) ->
RoutePath = khepri_route_path(Binding),
QueuePath = rabbit_db_queue:khepri_queue_path(DstName),
%% Tie the lifetime of the binding to the lifetime of the queue.
KeepWhile = #{QueuePath => #if_node_exists{exists = true}},
Options = #{keep_while => KeepWhile},
case khepri_tx:get(RoutePath) of
{ok, Set} ->
case sets:is_element(Binding, Set) of
true ->
already_exists;
false ->
Set1 = sets:add_element(Binding, Set),
ok = khepri_tx:put(RoutePath, Set1, Options),
serial_in_khepri(MaybeSerial, Src)
end;
_ ->
Set = sets:new([{version, 2}]),
Set1 = sets:add_element(Binding, Set),
ok = khepri_tx:put(RoutePath, Set1, Options),
serial_in_khepri(MaybeSerial, Src)
end
end.

lookup_resource(#resource{kind = queue} = Name) ->
case rabbit_db_queue:get(Name) of
{error, _} -> [];
Expand Down
8 changes: 7 additions & 1 deletion deps/rabbit/src/rabbit_khepri.erl
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,8 @@ init(IsVirgin) ->
%% while registering projections above though so this deletion
%% is likely to succeed.
ok ?= rabbit_amqqueue:delete_transient_queues_on_node(node()),
ok ?= rabbit_db_queue:setup()
ok ?= rabbit_db_queue:setup(),
ok ?= rabbit_db_binding:setup()
end
end.

Expand Down Expand Up @@ -1592,6 +1593,11 @@ khepri_db_migration_enable(#{feature_name := FeatureName}) ->
[FeatureName],
#{domain => ?RMQLOG_DOMAIN_DB}),
ok ?= rabbit_db_queue:setup(),
?LOG_INFO(
"Feature flag `~s`: setting up rabbit_db_binding",
[FeatureName],
#{domain => ?RMQLOG_DOMAIN_DB}),
ok ?= rabbit_db_binding:setup(),
migrate_mnesia_tables(FeatureName)
end.

Expand Down

0 comments on commit 9f757ab

Please sign in to comment.