Skip to content

Commit

Permalink
Represent rabbit_binding:deletions() with a map instead of dict
Browse files Browse the repository at this point in the history
The `dict:dict()` typing of `rabbit_binding` appears to be a historical
artifact. `dict` has been superseded by `maps`. Switching to a map
makes deletions easier to inspect manually and faster. Though if
deletions grow so large that the map representation is important,
manipulation of the deletions is unlikely to be expensive compared to
any other operations that produced them, so performance is probably
irrelevant.

This commit refactors the bottom section of the `rabbit_binding` module
to switch to a map and also to switch the `deletions()` type to an
opaque, eliminating a TODO created when using Erlang/OTP 17.1. We also
eliminate some historical artifacts and "cruft":

* Deletions taking multiple forms needlessly, specifically the shape
  `{X, deleted | not_deleted, Bindings, none}` no longer being
  handled. `process_deletions/2` was responsible for creating this
  shape.
* Clauses to catch `{error, not_found}` are unnecessary after minor
  refactors of the callers. Removing them makes the type specs cleaner.
* `rabbit_binding:process_deletions/1` has no reason to update or change
  the deletions. This function uses `maps:foreach/2` instead and returns
  `ok` instead of mapped deletions.
* Remove `undefined` from the typespec of deletions. This value is no
  longer possible with a refactor to `maybe_auto_delete_exchange_in_*`
  functions for Mnesia and Khepri. The value was nonsensical since you
  do not delete bindings for an exchange that does not exist.
  • Loading branch information
the-mikedavis committed Oct 1, 2024
1 parent d10f889 commit 0688544
Show file tree
Hide file tree
Showing 5 changed files with 130 additions and 106 deletions.
16 changes: 8 additions & 8 deletions deps/rabbit/src/rabbit_amqqueue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1818,8 +1818,8 @@ internal_delete(Queue, ActingUser, Reason) ->
{error, timeout} = Err ->
Err;
Deletions ->
_ = rabbit_binding:process_deletions(Deletions),
rabbit_binding:notify_deletions(Deletions, ?INTERNAL_USER),
ok = rabbit_binding:process_deletions(Deletions),
ok = rabbit_binding:notify_deletions(Deletions, ?INTERNAL_USER),
rabbit_core_metrics:queue_deleted(QueueName),
ok = rabbit_event:notify(queue_deleted,
[{name, QueueName},
Expand Down Expand Up @@ -1942,14 +1942,14 @@ filter_transient_queues_to_delete(Node) ->
end.

notify_queue_binding_deletions(QueueDeletions) when is_list(QueueDeletions) ->
Deletions = rabbit_binding:process_deletions(
lists:foldl(fun rabbit_binding:combine_deletions/2,
rabbit_binding:new_deletions(),
QueueDeletions)),
Deletions = lists:foldl(
fun rabbit_binding:combine_deletions/2,
rabbit_binding:new_deletions(), QueueDeletions),
ok = rabbit_binding:process_deletions(Deletions),
rabbit_binding:notify_deletions(Deletions, ?INTERNAL_USER);
notify_queue_binding_deletions(QueueDeletions) ->
Deletions = rabbit_binding:process_deletions(QueueDeletions),
rabbit_binding:notify_deletions(Deletions, ?INTERNAL_USER).
ok = rabbit_binding:process_deletions(QueueDeletions),
rabbit_binding:notify_deletions(QueueDeletions, ?INTERNAL_USER).

notify_transient_queues_deleted(QueueDeletions) ->
lists:foreach(
Expand Down
160 changes: 91 additions & 69 deletions deps/rabbit/src/rabbit_binding.erl
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,11 @@
rabbit_types:ok_or_error(rabbit_types:amqp_error())).
-type bindings() :: [rabbit_types:binding()].

%% TODO this should really be opaque but that seems to confuse 17.1's
%% dialyzer into objecting to everything that uses it.
-type deletions() :: dict:dict().
-type deletion() :: {Exchange :: rabbit_types:exchange(),
WasDeleted :: 'deleted' | 'not_deleted',
Bindings :: bindings()}.

-opaque deletions() :: #{XName :: rabbit_exchange:name() => deletion()}.

%%----------------------------------------------------------------------------

Expand Down Expand Up @@ -160,6 +162,19 @@ binding_type0(false, true) ->
binding_type0(_, _) ->
transient.

binding_checks(Binding, InnerFun) ->
fun(Src, Dst) ->
case rabbit_exchange:validate_binding(Src, Binding) of
ok ->
%% this argument is used to check queue exclusivity;
%% in general, we want to fail on that in preference to
%% anything else
InnerFun(Src, Dst);
Err ->
Err
end
end.

-spec remove(rabbit_types:binding(), rabbit_types:username()) -> bind_res().
remove(Binding, ActingUser) -> remove(Binding, fun (_Src, _Dst) -> ok end, ActingUser).

Expand Down Expand Up @@ -371,57 +386,80 @@ index_route(#route{binding = #binding{source = Source,
%% ----------------------------------------------------------------------------
%% Binding / exchange deletion abstraction API
%% ----------------------------------------------------------------------------

anything_but( NotThis, NotThis, NotThis) -> NotThis;
anything_but( NotThis, NotThis, This) -> This;
anything_but( NotThis, This, NotThis) -> This;
anything_but(_NotThis, This, This) -> This.
%%
%% `deletions()' describe the deletion from the metadata store of bindings
%% and/or exchanges.
%%
%% These deletion records are used for two purposes:
%%
%% <ul>
%% <li>"<em>Processing</em>" of deletions. Processing here means that the
%% exchanges and bindings are passed into the {@link rabbit_exchange}
%% callbacks. When an exchange is deleted the `rabbit_exchange:delete/1'
%% callback is invoked and when the exchange is not deleted but some bindings
%% are deleted the `rabbit_exchange:remove_bindings/2' is invoked.</li>
%% <li><em>Notification</em> of metadata deletion. Like other internal
%% notifications, {@link rabbit_binding:notify_deletions()} uses {@link
%% rabbit_event} to notify any interested consumers of a resource deletion.
%% An example consumer of {@link rabbit_event} is the `rabbitmq_event_exchange'
%% plugin which routes these notifications as messages.</li>
%% </ul>
%%
%% The point of a specialized opaque type for this term is to be able to
%% collect all bindings deleted in one action into a list. This allows us to
%% invoke the `rabbit_exchange:remove_bindings/2' callback with all bindings
%% at once rather than passing each binding individually.

-spec new_deletions() -> deletions().

new_deletions() -> dict:new().
new_deletions() -> #{}.

-spec add_deletion
(rabbit_exchange:name(),
{'undefined' | rabbit_types:exchange(),
'deleted' | 'not_deleted',
bindings()},
deletions()) ->
deletions().
-spec add_deletion(XName, Deletion, Deletions) -> Deletions1 when
XName :: rabbit_exchange:name(),
Deletion :: deletion(),
Deletions :: deletions(),
Deletions1 :: deletions().

add_deletion(XName, Entry, Deletions) ->
dict:update(XName, fun (Entry1) -> merge_entry(Entry1, Entry) end,
Entry, Deletions).
add_deletion(XName, Deletion, Deletions) ->
maps:update_with(
XName,
fun(Deletion1) ->
merge_entry(Deletion1, Deletion)
end, Deletion, Deletions).

-spec combine_deletions(deletions(), deletions()) -> deletions().

combine_deletions(Deletions1, Deletions2) ->
dict:merge(fun (_XName, Entry1, Entry2) -> merge_entry(Entry1, Entry2) end,
Deletions1, Deletions2).

merge_entry({X1, Deleted1, Bindings1}, {X2, Deleted2, Bindings2}) ->
{anything_but(undefined, X1, X2),
anything_but(not_deleted, Deleted1, Deleted2),
Bindings1 ++ Bindings2};
merge_entry({X1, Deleted1, Bindings1, none}, {X2, Deleted2, Bindings2, none}) ->
{anything_but(undefined, X1, X2),
anything_but(not_deleted, Deleted1, Deleted2),
Bindings1 ++ Bindings2, none}.

notify_deletions({error, not_found}, _) ->
ok;
maps:merge_with(
fun (_XName, Entry1, Entry2) ->
merge_entry(Entry1, Entry2)
end, Deletions1, Deletions2).

merge_entry({_X1, Deleted1, Bindings1}, {X2, Deleted2, Bindings2}) ->
%% Assume that X2 is more up to date than X1.
X = X2,
Deleted = case {Deleted1, Deleted2} of
{deleted, _} ->
deleted;
{_, deleted} ->
deleted;
{not_deleted, not_deleted} ->
not_deleted
end,
{X, Deleted, Bindings1 ++ Bindings2}.

-spec notify_deletions(Deletions, ActingUser) -> ok when
Deletions :: rabbit_binding:deletions(),
ActingUser :: rabbit_types:username().

notify_deletions(Deletions, ActingUser) ->
dict:fold(fun (XName, {_X, deleted, Bs, _}, ok) ->
notify_exchange_deletion(XName, ActingUser),
notify_bindings_deletion(Bs, ActingUser);
(_XName, {_X, not_deleted, Bs, _}, ok) ->
notify_bindings_deletion(Bs, ActingUser);
(XName, {_X, deleted, Bs}, ok) ->
notify_exchange_deletion(XName, ActingUser),
notify_bindings_deletion(Bs, ActingUser);
(_XName, {_X, not_deleted, Bs}, ok) ->
notify_bindings_deletion(Bs, ActingUser)
end, ok, Deletions).
maps:foreach(
fun (XName, {_X, deleted, Bs}) ->
notify_exchange_deletion(XName, ActingUser),
notify_bindings_deletion(Bs, ActingUser);
(_XName, {_X, not_deleted, Bs}) ->
notify_bindings_deletion(Bs, ActingUser)
end, Deletions).

notify_exchange_deletion(XName, ActingUser) ->
ok = rabbit_event:notify(
Expand All @@ -435,29 +473,13 @@ notify_bindings_deletion(Bs, ActingUser) ->
|| B <- Bs],
ok.

-spec process_deletions(deletions()) -> deletions().
-spec process_deletions(deletions()) -> ok.
process_deletions(Deletions) ->
dict:map(fun (_XName, {X, deleted, Bindings}) ->
Bs = lists:flatten(Bindings),
Serial = rabbit_exchange:serial(X),
rabbit_exchange:callback(X, delete, Serial, [X]),
{X, deleted, Bs, none};
(_XName, {X, not_deleted, Bindings}) ->
Bs = lists:flatten(Bindings),
Serial = rabbit_exchange:serial(X),
rabbit_exchange:callback(X, remove_bindings, Serial, [X, Bs]),
{X, not_deleted, Bs, none}
end, Deletions).

binding_checks(Binding, InnerFun) ->
fun(Src, Dst) ->
case rabbit_exchange:validate_binding(Src, Binding) of
ok ->
%% this argument is used to check queue exclusivity;
%% in general, we want to fail on that in preference to
%% anything else
InnerFun(Src, Dst);
Err ->
Err
end
end.
maps:foreach(
fun (_XName, {X, deleted, _Bs}) ->
Serial = rabbit_exchange:serial(X),
rabbit_exchange:callback(X, delete, Serial, [X]);
(_XName, {X, not_deleted, Bs}) ->
Serial = rabbit_exchange:serial(X),
rabbit_exchange:callback(X, remove_bindings, Serial, [X, Bs])
end, Deletions).
29 changes: 18 additions & 11 deletions deps/rabbit/src/rabbit_db_binding.erl
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,10 @@ delete_in_mnesia(Src, Dst, B) ->
should_index_table(Src), fun delete/3),
Deletions0 = maybe_auto_delete_exchange_in_mnesia(
B#binding.source, [B], rabbit_binding:new_deletions(), false),
fun() -> {ok, rabbit_binding:process_deletions(Deletions0)} end.
fun() ->
ok = rabbit_binding:process_deletions(Deletions0),
{ok, Deletions0}
end.

absent_errs_only_in_mnesia(Names) ->
Errs = [E || Name <- Names,
Expand Down Expand Up @@ -361,7 +364,8 @@ delete_in_khepri(#binding{source = SrcName,
{error, _} = Err ->
Err;
Deletions ->
{ok, rabbit_binding:process_deletions(Deletions)}
ok = rabbit_binding:process_deletions(Deletions),
{ok, Deletions}
end.

exists_in_khepri(Path, Binding) ->
Expand All @@ -388,15 +392,18 @@ delete_in_khepri(Binding) ->
end.

maybe_auto_delete_exchange_in_khepri(XName, Bindings, Deletions) ->
{Entry, Deletions1} =
case rabbit_db_exchange:maybe_auto_delete_in_khepri(XName) of
{not_deleted, X} ->
{{X, not_deleted, Bindings}, Deletions};
{deleted, X, Deletions2} ->
{{X, deleted, Bindings},
rabbit_binding:combine_deletions(Deletions, Deletions2)}
end,
rabbit_binding:add_deletion(XName, Entry, Deletions1).
case rabbit_db_exchange:maybe_auto_delete_in_khepri(XName) of
{not_deleted, undefined} ->
Deletions;
{not_deleted, X} ->
rabbit_binding:add_deletion(
XName, {X, not_deleted, Bindings}, Deletions);
{deleted, X, Deletions1} ->
Deletions2 = rabbit_binding:combine_deletions(
Deletions, Deletions1),
rabbit_binding:add_deletion(
XName, {X, deleted, Bindings}, Deletions2)
end.

%% -------------------------------------------------------------------
%% get_all().
Expand Down
4 changes: 2 additions & 2 deletions deps/rabbit/src/rabbit_db_exchange.erl
Original file line number Diff line number Diff line change
Expand Up @@ -573,7 +573,7 @@ next_serial_in_khepri_tx(#exchange{name = XName}) ->
IfUnused :: boolean(),
Exchange :: rabbit_types:exchange(),
Binding :: rabbit_types:binding(),
Deletions :: dict:dict(),
Deletions :: rabbit_binding:deletions(),
Ret :: {deleted, Exchange, [Binding], Deletions} |
{error, not_found} |
{error, in_use} |
Expand Down Expand Up @@ -624,7 +624,7 @@ unconditional_delete_in_mnesia(X, OnlyDurable) ->
RemoveBindingsForSource :: boolean(),
Exchange :: rabbit_types:exchange(),
Binding :: rabbit_types:binding(),
Deletions :: dict:dict(),
Deletions :: rabbit_binding:deletions(),
Ret :: {error, not_found} | {error, in_use} | {deleted, Exchange, [Binding], Deletions}.
delete_in_mnesia(X = #exchange{name = XName}, OnlyDurable, RemoveBindingsForSource) ->
ok = mnesia:delete({?MNESIA_TABLE, XName}),
Expand Down
27 changes: 11 additions & 16 deletions deps/rabbit/src/rabbit_exchange.erl
Original file line number Diff line number Diff line change
Expand Up @@ -470,13 +470,15 @@ delete(XName, IfUnused, Username) ->
_ = rabbit_runtime_parameters:set(XName#resource.virtual_host,
?EXCHANGE_DELETE_IN_PROGRESS_COMPONENT,
XName#resource.name, true, Username),
Deletions = process_deletions(rabbit_db_exchange:delete(XName, IfUnused)),
case Deletions of
{error, _} ->
Deletions;
_ ->
rabbit_binding:notify_deletions(Deletions, Username),
ok
case rabbit_db_exchange:delete(XName, IfUnused) of
{deleted, #exchange{name = XName} = X, Bs, Deletions} ->
Deletions1 = rabbit_binding:add_deletion(
XName, {X, deleted, Bs}, Deletions),
ok = rabbit_binding:process_deletions(Deletions1),
ok = rabbit_binding:notify_deletions(Deletions1, Username),
ok;
{error, _} = Err ->
Err
end
after
rabbit_runtime_parameters:clear(XName#resource.virtual_host,
Expand All @@ -491,17 +493,10 @@ delete(XName, IfUnused, Username) ->

delete_all(VHostName, ActingUser) ->
{ok, Deletions} = rabbit_db_exchange:delete_all(VHostName),
Deletions1 = rabbit_binding:process_deletions(Deletions),
rabbit_binding:notify_deletions(Deletions1, ActingUser),
ok = rabbit_binding:process_deletions(Deletions),
ok = rabbit_binding:notify_deletions(Deletions, ActingUser),
ok.

process_deletions({error, _} = E) ->
E;
process_deletions({deleted, #exchange{name = XName} = X, Bs, Deletions}) ->
rabbit_binding:process_deletions(
rabbit_binding:add_deletion(
XName, {X, deleted, Bs}, Deletions)).

-spec ensure_deleted(ExchangeName, IfUnused, Username) -> Ret when
ExchangeName :: name(),
IfUnused :: boolean(),
Expand Down

0 comments on commit 0688544

Please sign in to comment.