Skip to content

Commit

Permalink
Represent rabbit_binding:deletions() with a map instead of dict
Browse files Browse the repository at this point in the history
The `dict:dict()` typing of `rabbit_binding` appears to be a historical
artifact. `dict` has been superseded by `maps`. Switching to a map
makes deletions easier to inspect manually and faster. Though if
deletions grow so large that the map representation is important,
manipulation of the deletions is unlikely to be expensive compared to
any other operations that produced them, so performance is probably
irrelevant.

This commit refactors the bottom section of the `rabbit_binding` module
to switch to a map, switch the `deletions()` type to an opaque,
eliminating a TODO created when using Erlang/OTP 17.1, and the deletion
value to a record. We eliminate some historical artifacts and "cruft":

* Deletions taking multiple forms needlessly, specifically the shape
  `{X, deleted | not_deleted, Bindings, none}` no longer being
  handled. `process_deletions/2` was responsible for creating this
  shape. Instead we now use a record to clearly define the fields.
* Clauses to catch `{error, not_found}` are unnecessary after minor
  refactors of the callers. Removing them makes the type specs cleaner.
* `rabbit_binding:process_deletions/1` has no need to update or change
  the deletions. This function uses `maps:foreach/2` instead and returns
  `ok` instead of mapped deletions.
* Remove `undefined` from the typespec of deletions. This value is no
  longer possible with a refactor to `maybe_auto_delete_exchange_in_*`
  functions for Mnesia and Khepri. The value was nonsensical since you
  cannot delete bindings for an exchange that does not exist.
  • Loading branch information
the-mikedavis committed Oct 1, 2024
1 parent fa5b738 commit 4aa68ca
Show file tree
Hide file tree
Showing 7 changed files with 217 additions and 131 deletions.
16 changes: 8 additions & 8 deletions deps/rabbit/src/rabbit_amqqueue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1818,8 +1818,8 @@ internal_delete(Queue, ActingUser, Reason) ->
{error, timeout} = Err ->
Err;
Deletions ->
_ = rabbit_binding:process_deletions(Deletions),
rabbit_binding:notify_deletions(Deletions, ?INTERNAL_USER),
ok = rabbit_binding:process_deletions(Deletions),
ok = rabbit_binding:notify_deletions(Deletions, ?INTERNAL_USER),
rabbit_core_metrics:queue_deleted(QueueName),
ok = rabbit_event:notify(queue_deleted,
[{name, QueueName},
Expand Down Expand Up @@ -1942,14 +1942,14 @@ filter_transient_queues_to_delete(Node) ->
end.

notify_queue_binding_deletions(QueueDeletions) when is_list(QueueDeletions) ->
Deletions = rabbit_binding:process_deletions(
lists:foldl(fun rabbit_binding:combine_deletions/2,
rabbit_binding:new_deletions(),
QueueDeletions)),
Deletions = lists:foldl(
fun rabbit_binding:combine_deletions/2,
rabbit_binding:new_deletions(), QueueDeletions),
ok = rabbit_binding:process_deletions(Deletions),
rabbit_binding:notify_deletions(Deletions, ?INTERNAL_USER);
notify_queue_binding_deletions(QueueDeletions) ->
Deletions = rabbit_binding:process_deletions(QueueDeletions),
rabbit_binding:notify_deletions(Deletions, ?INTERNAL_USER).
ok = rabbit_binding:process_deletions(QueueDeletions),
rabbit_binding:notify_deletions(QueueDeletions, ?INTERNAL_USER).

notify_transient_queues_deleted(QueueDeletions) ->
lists:foreach(
Expand Down
235 changes: 158 additions & 77 deletions deps/rabbit/src/rabbit_binding.erl
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
-export([list/1, list_for_source/1, list_for_destination/1,
list_for_source_and_destination/2, list_for_source_and_destination/3,
list_explicit/0]).
-export([new_deletions/0, combine_deletions/2, add_deletion/3,
-export([new_deletions/0, combine_deletions/2, add_deletion/5,
process_deletions/1, notify_deletions/2, group_bindings_fold/3]).
-export([info_keys/0, info/1, info/2, info_all/1, info_all/2, info_all/4]).

Expand All @@ -22,6 +22,9 @@
-export([reverse_route/1, index_route/1]).
-export([binding_type/2]).

%% For testing only
-export([fetch_deletion/2]).

-define(DEFAULT_EXCHANGE(VHostPath), #resource{virtual_host = VHostPath,
kind = exchange,
name = <<>>}).
Expand Down Expand Up @@ -50,9 +53,12 @@
rabbit_types:ok_or_error(rabbit_types:amqp_error())).
-type bindings() :: [rabbit_types:binding()].

%% TODO this should really be opaque but that seems to confuse 17.1's
%% dialyzer into objecting to everything that uses it.
-type deletions() :: dict:dict().
-record(deletion, {exchange :: rabbit_types:exchange(),
%% Whether the exchange was deleted.
deleted :: boolean(),
bindings :: sets:set(rabbit_types:binding())}).

-opaque deletions() :: #{XName :: rabbit_exchange:name() => #deletion{}}.

%%----------------------------------------------------------------------------

Expand Down Expand Up @@ -159,6 +165,19 @@ binding_type0(false, true) ->
binding_type0(_, _) ->
transient.

binding_checks(Binding, InnerFun) ->
fun(Src, Dst) ->
case rabbit_exchange:validate_binding(Src, Binding) of
ok ->
%% this argument is used to check queue exclusivity;
%% in general, we want to fail on that in preference to
%% anything else
InnerFun(Src, Dst);
Err ->
Err
end
end.

-spec remove(rabbit_types:binding(), rabbit_types:username()) -> bind_res().
remove(Binding, ActingUser) -> remove(Binding, fun (_Src, _Dst) -> ok end, ActingUser).

Expand Down Expand Up @@ -360,93 +379,155 @@ index_route(#route{binding = #binding{source = Source,
%% ----------------------------------------------------------------------------
%% Binding / exchange deletion abstraction API
%% ----------------------------------------------------------------------------

anything_but( NotThis, NotThis, NotThis) -> NotThis;
anything_but( NotThis, NotThis, This) -> This;
anything_but( NotThis, This, NotThis) -> This;
anything_but(_NotThis, This, This) -> This.
%%
%% `deletions()' describe a set of removals of bindings and/or exchanges from
%% the metadata store.
%%
%% This deletion collection is used for two purposes:
%%
%% <ul>
%% <li>"<em>Processing</em>" of deletions. Processing here means that the
%% exchanges and bindings are passed into the {@link rabbit_exchange}
%% callbacks. When an exchange is deleted the `rabbit_exchange:delete/1'
%% callback is invoked and when the exchange is not deleted but some bindings
%% are deleted the `rabbit_exchange:remove_bindings/2' is invoked.</li>
%% <li><em>Notification</em> of metadata deletion. Like other internal
%% notifications, {@link rabbit_binding:notify_deletions()} uses {@link
%% rabbit_event} to notify any interested consumers of a resource deletion.
%% An example consumer of {@link rabbit_event} is the `rabbitmq_event_exchange'
%% plugin which publishes these notifications as messages.</li>
%% </ul>
%%
%% The point of collecting deletions into this opaque type is to be able to
%% collect all bindings deleted for a given exchange into a list. This allows
%% us to invoke the `rabbit_exchange:remove_bindings/2' callback with all
%% deleted bindings at once rather than passing each deleted binding
%% individually.

-spec new_deletions() -> deletions().

new_deletions() -> dict:new().

-spec add_deletion
(rabbit_exchange:name(),
{'undefined' | rabbit_types:exchange(),
'deleted' | 'not_deleted',
bindings()},
deletions()) ->
deletions().

add_deletion(XName, Entry, Deletions) ->
dict:update(XName, fun (Entry1) -> merge_entry(Entry1, Entry) end,
Entry, Deletions).
new_deletions() -> #{}.

-spec add_deletion(XName, X, XDeleted, Bindings, Deletions) -> Deletions1
when
XName :: rabbit_exchange:name(),
X :: rabbit_types:exchange(),
XDeleted :: deleted | not_deleted,
Bindings :: bindings(),
Deletions :: deletions(),
Deletions1 :: deletions().

add_deletion(XName, X, WasDeleted, Bindings, Deletions)
when (WasDeleted =:= deleted orelse WasDeleted =:= not_deleted) andalso
is_list(Bindings) andalso is_map(Deletions) ->
WasDeleted1 = case WasDeleted of
deleted -> true;
not_deleted -> false
end,
Bindings1 = sets:from_list(Bindings, [{version, 2}]),
Deletion = #deletion{exchange = X,
deleted = WasDeleted1,
bindings = Bindings1},
maps:update_with(
XName,
fun(Deletion1) ->
merge_deletion(Deletion1, Deletion)
end, Deletion, Deletions).

-spec combine_deletions(deletions(), deletions()) -> deletions().

combine_deletions(Deletions1, Deletions2) ->
dict:merge(fun (_XName, Entry1, Entry2) -> merge_entry(Entry1, Entry2) end,
Deletions1, Deletions2).

merge_entry({X1, Deleted1, Bindings1}, {X2, Deleted2, Bindings2}) ->
{anything_but(undefined, X1, X2),
anything_but(not_deleted, Deleted1, Deleted2),
Bindings1 ++ Bindings2};
merge_entry({X1, Deleted1, Bindings1, none}, {X2, Deleted2, Bindings2, none}) ->
{anything_but(undefined, X1, X2),
anything_but(not_deleted, Deleted1, Deleted2),
Bindings1 ++ Bindings2, none}.

notify_deletions({error, not_found}, _) ->
ok;
notify_deletions(Deletions, ActingUser) ->
dict:fold(fun (XName, {_X, deleted, Bs, _}, ok) ->
notify_exchange_deletion(XName, ActingUser),
notify_bindings_deletion(Bs, ActingUser);
(_XName, {_X, not_deleted, Bs, _}, ok) ->
notify_bindings_deletion(Bs, ActingUser);
(XName, {_X, deleted, Bs}, ok) ->
combine_deletions(Deletions1, Deletions2)
when is_map(Deletions1) andalso is_map(Deletions2) ->
maps:merge_with(
fun (_XName, Deletion1, Deletion2) ->
merge_deletion(Deletion1, Deletion2)
end, Deletions1, Deletions2).

merge_deletion(
#deletion{deleted = Deleted1, bindings = Bindings1},
#deletion{exchange = X2, deleted = Deleted2, bindings = Bindings2}) ->
%% Assume that X2 is more up to date than X1.
X = X2,
Deleted = Deleted1 orelse Deleted2,
Bindings = sets:union(Bindings1, Bindings2),
#deletion{exchange = X,
deleted = Deleted,
bindings = Bindings}.

-spec notify_deletions(Deletions, ActingUser) -> ok when
Deletions :: rabbit_binding:deletions(),
ActingUser :: rabbit_types:username().

notify_deletions(Deletions, ActingUser) when is_map(Deletions) ->
maps:foreach(
fun (XName, #deletion{deleted = XDeleted, bindings = Bindings}) ->
case XDeleted of
true ->
notify_exchange_deletion(XName, ActingUser),
notify_bindings_deletion(Bs, ActingUser);
(_XName, {_X, not_deleted, Bs}, ok) ->
notify_bindings_deletion(Bs, ActingUser)
end, ok, Deletions).
notify_bindings_deletion(Bindings, ActingUser);
false ->
notify_bindings_deletion(Bindings, ActingUser)
end
end, Deletions).

notify_exchange_deletion(XName, ActingUser) ->
ok = rabbit_event:notify(
exchange_deleted,
[{name, XName},
{user_who_performed_action, ActingUser}]).

notify_bindings_deletion(Bs, ActingUser) ->
[rabbit_event:notify(binding_deleted,
info(B) ++ [{user_who_performed_action, ActingUser}])
|| B <- Bs],
ok.
notify_bindings_deletion(Bindings, ActingUser) ->
sets:fold(
fun(Binding, ok) ->
rabbit_event:notify(
binding_deleted,
info(Binding) ++ [{user_who_performed_action, ActingUser}]),
ok
end, ok, Bindings).

-spec process_deletions(deletions()) -> deletions().
-spec process_deletions(deletions()) -> ok.
process_deletions(Deletions) ->
dict:map(fun (_XName, {X, deleted, Bindings}) ->
Bs = lists:flatten(Bindings),
Serial = rabbit_exchange:serial(X),
rabbit_exchange:callback(X, delete, Serial, [X]),
{X, deleted, Bs, none};
(_XName, {X, not_deleted, Bindings}) ->
Bs = lists:flatten(Bindings),
Serial = rabbit_exchange:serial(X),
rabbit_exchange:callback(X, remove_bindings, Serial, [X, Bs]),
{X, not_deleted, Bs, none}
end, Deletions).

binding_checks(Binding, InnerFun) ->
fun(Src, Dst) ->
case rabbit_exchange:validate_binding(Src, Binding) of
ok ->
%% this argument is used to check queue exclusivity;
%% in general, we want to fail on that in preference to
%% anything else
InnerFun(Src, Dst);
Err ->
Err
end
maps:foreach(
fun (_XName, #deletion{exchange = X,
deleted = XDeleted,
bindings = Bindings}) ->
Serial = rabbit_exchange:serial(X),
case XDeleted of
true ->
rabbit_exchange:callback(X, delete, Serial, [X]);
false ->
Bindings1 = sets:to_list(Bindings),
rabbit_exchange:callback(
X, remove_bindings, Serial, [X, Bindings1])
end
end, Deletions).

-spec fetch_deletion(XName, Deletions) -> Ret when
XName :: rabbit_exchange:name(),
Deletions :: deletions(),
Ret :: {X, WasDeleted, Bindings},
X :: rabbit_types:exchange(),
WasDeleted :: deleted | not_deleted,
Bindings :: bindings().
%% @doc Fetches the deletions for the given exchange name.
%%
%% This function is only intended for use in tests.
%%
%% @private

fetch_deletion(XName, Deletions) ->
case maps:find(XName, Deletions) of
{ok, #deletion{exchange = X,
deleted = Deleted,
bindings = Bindings}} ->
WasDeleted = case Deleted of
true ->
deleted;
false ->
not_deleted
end,
Bindings1 = sets:to_list(Bindings),
{X, WasDeleted, Bindings1};
error ->
error
end.
50 changes: 30 additions & 20 deletions deps/rabbit/src/rabbit_db_binding.erl
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,10 @@ delete_in_mnesia(Src, Dst, B) ->
should_index_table(Src), fun delete/3),
Deletions0 = maybe_auto_delete_exchange_in_mnesia(
B#binding.source, [B], rabbit_binding:new_deletions(), false),
fun() -> {ok, rabbit_binding:process_deletions(Deletions0)} end.
fun() ->
ok = rabbit_binding:process_deletions(Deletions0),
{ok, Deletions0}
end.

absent_errs_only_in_mnesia(Names) ->
Errs = [E || Name <- Names,
Expand Down Expand Up @@ -352,7 +355,8 @@ delete_in_khepri(#binding{source = SrcName,
{error, _} = Err ->
Err;
Deletions ->
{ok, rabbit_binding:process_deletions(Deletions)}
ok = rabbit_binding:process_deletions(Deletions),
{ok, Deletions}
end.

exists_in_khepri(Path, Binding) ->
Expand All @@ -379,15 +383,18 @@ delete_in_khepri(Binding) ->
end.

maybe_auto_delete_exchange_in_khepri(XName, Bindings, Deletions, OnlyDurable) ->
{Entry, Deletions1} =
case rabbit_db_exchange:maybe_auto_delete_in_khepri(XName, OnlyDurable) of
{not_deleted, X} ->
{{X, not_deleted, Bindings}, Deletions};
{deleted, X, Deletions2} ->
{{X, deleted, Bindings},
rabbit_binding:combine_deletions(Deletions, Deletions2)}
end,
rabbit_binding:add_deletion(XName, Entry, Deletions1).
case rabbit_db_exchange:maybe_auto_delete_in_khepri(XName, OnlyDurable) of
{not_deleted, undefined} ->
Deletions;
{not_deleted, X} ->
rabbit_binding:add_deletion(
XName, X, not_deleted, Bindings, Deletions);
{deleted, X, Deletions1} ->
Deletions2 = rabbit_binding:combine_deletions(
Deletions, Deletions1),
rabbit_binding:add_deletion(
XName, X, deleted, Bindings, Deletions2)
end.

%% -------------------------------------------------------------------
%% get_all().
Expand Down Expand Up @@ -1152,15 +1159,18 @@ sync_index_route(_, _, _) ->
OnlyDurable :: boolean(),
Ret :: rabbit_binding:deletions().
maybe_auto_delete_exchange_in_mnesia(XName, Bindings, Deletions, OnlyDurable) ->
{Entry, Deletions1} =
case rabbit_db_exchange:maybe_auto_delete_in_mnesia(XName, OnlyDurable) of
{not_deleted, X} ->
{{X, not_deleted, Bindings}, Deletions};
{deleted, X, Deletions2} ->
{{X, deleted, Bindings},
rabbit_binding:combine_deletions(Deletions, Deletions2)}
end,
rabbit_binding:add_deletion(XName, Entry, Deletions1).
case rabbit_db_exchange:maybe_auto_delete_in_mnesia(XName, OnlyDurable) of
{not_deleted, undefined} ->
Deletions;
{not_deleted, X} ->
rabbit_binding:add_deletion(
XName, X, not_deleted, Bindings, Deletions);
{deleted, X, Deletions1} ->
Deletions2 = rabbit_binding:combine_deletions(
Deletions, Deletions1),
rabbit_binding:add_deletion(
XName, X, deleted, Bindings, Deletions2)
end.

%% Instead of locking entire table on remove operations we can lock the
%% affected resource only.
Expand Down
Loading

0 comments on commit 4aa68ca

Please sign in to comment.