Skip to content

Commit

Permalink
Comply with §2.2.2 of Anonymous Terminus extension
Browse files Browse the repository at this point in the history
  • Loading branch information
ansd committed Sep 26, 2024
1 parent 212543a commit e26d88e
Show file tree
Hide file tree
Showing 5 changed files with 125 additions and 44 deletions.
3 changes: 2 additions & 1 deletion deps/amqp10_client/src/amqp10_client_session.erl
Original file line number Diff line number Diff line change
Expand Up @@ -961,7 +961,8 @@ rcv_settle_mode(_) -> undefined.
% TODO: work out if we can assume accepted
translate_delivery_state(undefined) -> undefined;
translate_delivery_state(#'v1_0.accepted'{}) -> accepted;
translate_delivery_state(#'v1_0.rejected'{}) -> rejected;
translate_delivery_state(#'v1_0.rejected'{error = undefined}) -> rejected;
translate_delivery_state(#'v1_0.rejected'{error = Error}) -> {rejected, Error};
translate_delivery_state(#'v1_0.modified'{}) -> modified;
translate_delivery_state(#'v1_0.released'{}) -> released;
translate_delivery_state(#'v1_0.received'{}) -> received;
Expand Down
45 changes: 38 additions & 7 deletions deps/rabbit/src/rabbit_amqp_session.erl
Original file line number Diff line number Diff line change
Expand Up @@ -2344,6 +2344,7 @@ incoming_link_transfer(
PayloadSize = iolist_size(PayloadBin),
validate_message_size(PayloadSize, MaxMessageSize),
rabbit_msg_size_metrics:observe(?PROTOCOL, PayloadSize),
messages_received(Settled),

Mc0 = mc:init(mc_amqp, PayloadBin, #{}),
case lookup_target(LinkExchange, LinkRKey, Mc0, Vhost, User, PermCache0) of
Expand All @@ -2352,7 +2353,6 @@ incoming_link_transfer(
check_user_id(Mc2, User),
TopicPermCache = check_write_permitted_on_topic(
X, User, RoutingKey, TopicPermCache0),
messages_received(Settled),
QNames = rabbit_exchange:route(X, Mc2, #{return_binding_keys => true}),
rabbit_trace:tap_in(Mc2, QNames, ConnName, ChannelNum, Username, Trace),
Opts = #{correlation => {HandleInt, DeliveryId}},
Expand Down Expand Up @@ -2388,9 +2388,34 @@ incoming_link_transfer(
[DeliveryTag, DeliveryId, Reason])
end;
{error, #'v1_0.error'{} = Err} ->
Disposition = released(DeliveryId),
Detach = detach(HandleInt, Link0, Err),
{error, [Disposition, Detach]}
Disposition = case Settled of
true -> [];
false -> [released(DeliveryId)]
end,
Detach = [detach(HandleInt, Link0, Err)],
{error, Disposition ++ Detach};
{error, anonymous_terminus, #'v1_0.error'{} = Err} ->
%% https://docs.oasis-open.org/amqp/anonterm/v1.0/cs01/anonterm-v1.0-cs01.html#doc-routingerrors
case Settled of
true ->
Info = {map, [{{symbol, <<"delivery-tag">>}, DeliveryTag}]},
Err1 = Err#'v1_0.error'{info = Info},
Detach = detach(HandleInt, Link0, Err1),
{error, [Detach]};
false ->
Disposition = rejected(DeliveryId, Err),
DeliveryCount = add(DeliveryCount0, 1),
Credit1 = Credit0 - 1,
{Credit, Reply0} = maybe_grant_link_credit(
Credit1, MaxLinkCredit,
DeliveryCount, map_size(U0), Handle),
Reply = [Disposition | Reply0],
Link = Link0#incoming_link{
delivery_count = DeliveryCount,
credit = Credit,
multi_transfer_msg = undefined},
{ok, Reply, Link, State0}
end
end.

lookup_target(#exchange{} = X, LinkRKey, Mc, _, _, PermCache) ->
Expand All @@ -2414,16 +2439,16 @@ lookup_target(to, to, Mc, Vhost, User, PermCache0) ->
check_internal_exchange(X),
lookup_routing_key(X, RKey, Mc, PermCache);
{error, not_found} ->
{error, error_not_found(XName)}
{error, anonymous_terminus, error_not_found(XName)}
end;
{error, bad_address} ->
{error,
{error, anonymous_terminus,
#'v1_0.error'{
condition = ?V_1_0_AMQP_ERROR_PRECONDITION_FAILED,
description = {utf8, <<"bad 'to' address string: ", String/binary>>}}}
end;
undefined ->
{error,
{error, anonymous_terminus,
#'v1_0.error'{
condition = ?V_1_0_AMQP_ERROR_PRECONDITION_FAILED,
description = {utf8, <<"anonymous terminus requires 'to' address to be set">>}}}
Expand Down Expand Up @@ -2467,6 +2492,12 @@ released(DeliveryId) ->
settled = true,
state = #'v1_0.released'{}}.

rejected(DeliveryId, Error) ->
#'v1_0.disposition'{role = ?AMQP_ROLE_RECEIVER,
first = ?UINT(DeliveryId),
settled = true,
state = #'v1_0.rejected'{error = Error}}.

maybe_grant_link_credit(Credit, MaxLinkCredit, DeliveryCount, NumUnconfirmed, Handle) ->
case grant_link_credit(Credit, MaxLinkCredit, NumUnconfirmed) of
true ->
Expand Down
99 changes: 68 additions & 31 deletions deps/rabbit/test/amqp_address_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ common_tests() ->
target_per_message_queue,
target_per_message_unset_to_address,
target_per_message_bad_to_address,
target_per_message_exchange_absent,
target_per_message_exchange_absent_settled,
target_per_message_exchange_absent_unsettled,
target_bad_address,
source_bad_address
].
Expand Down Expand Up @@ -393,16 +394,15 @@ target_per_message_unset_to_address(Config) ->
%% Send message with 'to' unset.
DTag = <<1>>,
ok = amqp10_client:send_msg(Sender, amqp10_msg:new(DTag, <<0>>)),
ok = wait_for_settled(released, DTag),
receive {amqp10_event,
{link, Sender,
{detached,
#'v1_0.error'{
condition = ?V_1_0_AMQP_ERROR_PRECONDITION_FAILED,
description = {utf8, <<"anonymous terminus requires 'to' address to be set">>}}}}} -> ok
after 5000 -> ct:fail("server did not close our outgoing link")
ExpectedError = #'v1_0.error'{
condition = ?V_1_0_AMQP_ERROR_PRECONDITION_FAILED,
description = {utf8, <<"anonymous terminus requires 'to' address to be set">>}},
ok = wait_for_settled({rejected, ExpectedError}, DTag),

ok = amqp10_client:detach_link(Sender),
receive {amqp10_event, {link, Sender, {detached, normal}}} -> ok
after 5000 -> ct:fail({missing_event, ?LINE})
end,

ok = amqp10_client:end_session(Session),
ok = amqp10_client:close_connection(Connection).

Expand Down Expand Up @@ -449,34 +449,32 @@ bad_v2_addresses() ->

%% Test v2 target address 'null' with an invalid 'to' addresses.
target_per_message_bad_to_address(Config) ->
lists:foreach(fun(Addr) ->
ok = target_per_message_bad_to_address0(Addr, Config)
end, bad_v2_addresses()).

target_per_message_bad_to_address0(Address, Config) ->
OpnConf = connection_config(Config),
{ok, Connection} = amqp10_client:open_connection(OpnConf),
{ok, Session} = amqp10_client:begin_session_sync(Connection),
{ok, Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, null),
ok = wait_for_credit(Sender),

DTag = <<255>>,
Msg = amqp10_msg:set_properties(#{to => Address}, amqp10_msg:new(DTag, <<0>>)),
ok = amqp10_client:send_msg(Sender, Msg),
ok = wait_for_settled(released, DTag),
receive {amqp10_event,
{link, Sender,
{detached,
#'v1_0.error'{
condition = ?V_1_0_AMQP_ERROR_PRECONDITION_FAILED,
description = {utf8, <<"bad 'to' address", _Rest/binary>>}}}}} -> ok
after 5000 -> ct:fail("server did not close our outgoing link")
end,
lists:foreach(
fun(Addr) ->
DTag = <<"some delivery tag">>,
Msg = amqp10_msg:set_properties(#{to => Addr}, amqp10_msg:new(DTag, <<0>>, false)),
ok = amqp10_client:send_msg(Sender, Msg),
receive
{amqp10_disposition, {{rejected, Error}, DTag}} ->
?assertMatch(#'v1_0.error'{condition = ?V_1_0_AMQP_ERROR_PRECONDITION_FAILED,
description = {utf8, <<"bad 'to' address", _Rest/binary>>}},
Error)
after 5000 ->
flush(missing_disposition),
ct:fail(missing_disposition)
end
end, bad_v2_addresses()),

ok = amqp10_client:end_session(Session),
ok = amqp10_client:close_connection(Connection).

target_per_message_exchange_absent(Config) ->
target_per_message_exchange_absent_settled(Config) ->
Init = {_, LinkPair = #link_pair{session = Session}} = init(Config),
XName = <<"🎈"/utf8>>,
Address = rabbitmq_amqp_address:exchange(XName),
Expand All @@ -492,20 +490,59 @@ target_per_message_exchange_absent(Config) ->
ok = rabbitmq_amqp_client:delete_exchange(LinkPair, XName),

DTag2 = <<2>>,
Msg2 = amqp10_msg:set_properties(#{to => Address}, amqp10_msg:new(DTag2, <<"m2">>)),
Msg2 = amqp10_msg:set_properties(#{to => Address}, amqp10_msg:new(DTag2, <<"m2">>, true)),
ok = amqp10_client:send_msg(Sender, Msg2),
ok = wait_for_settled(released, DTag2),

%% "the routing node MUST detach the link over which the message was sent with an error.
%% [...] Additionally the info field of error MUST contain an entry with symbolic key delivery-tag
%% and binary value of the delivery-tag of the message which caused the failure."
%% https://docs.oasis-open.org/amqp/anonterm/v1.0/cs01/anonterm-v1.0-cs01.html#doc-routingerrors
receive {amqp10_event, {link, Sender, {detached, Error}}} ->
?assertEqual(
#'v1_0.error'{
condition = ?V_1_0_AMQP_ERROR_NOT_FOUND,
description = {utf8, <<"no exchange '", XName/binary, "' in vhost '/'">>}},
description = {utf8, <<"no exchange '", XName/binary, "' in vhost '/'">>},
info = {map, [{{symbol, <<"delivery-tag">>}, {binary, DTag2}}]}
},
Error)
after 5000 -> ct:fail("server did not close our outgoing link")
end,

ok = cleanup(Init).

target_per_message_exchange_absent_unsettled(Config) ->
Init = {_, LinkPair = #link_pair{session = Session}} = init(Config),
XName = <<"🎈"/utf8>>,
Address = rabbitmq_amqp_address:exchange(XName),
ok = rabbitmq_amqp_client:declare_exchange(LinkPair, XName, #{}),
{ok, Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, null),
ok = wait_for_credit(Sender),

DTag1 = <<"my tag">>,
Msg1 = amqp10_msg:set_properties(#{to => Address}, amqp10_msg:new(DTag1, <<"hey">>)),
ok = amqp10_client:send_msg(Sender, Msg1),
ok = wait_for_settled(released, DTag1),

ok = rabbitmq_amqp_client:delete_exchange(LinkPair, XName),

%% "If the source of the link supports the rejected outcome, and the message has not
%% already been settled by the sender, then the routing node MUST reject the message.
%% In this case the error field of rejected MUST contain the error which would have been communicated
%% in the detach which would have be sent if a link to the same address had been attempted."
%% https://docs.oasis-open.org/amqp/anonterm/v1.0/cs01/anonterm-v1.0-cs01.html#doc-routingerrors
%% We test here multiple rejections implicilty checking that link flow control works correctly.
ExpectedError = #'v1_0.error'{
condition = ?V_1_0_AMQP_ERROR_NOT_FOUND,
description = {utf8, <<"no exchange '", XName/binary, "' in vhost '/'">>}},
[begin
DTag = Body = integer_to_binary(N),
Msg = amqp10_msg:set_properties(#{to => Address}, amqp10_msg:new(DTag, Body, false)),
ok = amqp10_client:send_msg(Sender, Msg),
ok = wait_for_settled({rejected, ExpectedError}, DTag)
end || N <- lists:seq(1, 300)],

ok = cleanup(Init).

target_bad_address(Config) ->
%% bad v1 and bad v2 target address
TargetAddr = <<"/qqq/🎈"/utf8>>,
Expand Down
2 changes: 1 addition & 1 deletion deps/rabbit/test/amqp_auth_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -530,7 +530,7 @@ target_per_message_internal_exchange(Config) ->
ExpectedErr = error_unauthorized(
<<"forbidden to publish to internal exchange '", XName/binary, "' in vhost 'test vhost'">>),
receive {amqp10_event, {session, Session1, {ended, ExpectedErr}}} -> ok
after 5000 -> flush(aaa),
after 5000 -> flush(missing_event),
ct:fail({missing_event, ?LINE})
end,
ok = close_connection_sync(Conn1),
Expand Down
20 changes: 16 additions & 4 deletions deps/rabbit/test/amqp_client_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ groups() ->
server_closes_link_classic_queue,
server_closes_link_quorum_queue,
server_closes_link_stream,
server_closes_link_exchange,
server_closes_link_exchange_settled,
server_closes_link_exchange_unsettled,
link_target_classic_queue_deleted,
link_target_quorum_queue_deleted,
target_queues_deleted_accepted,
Expand Down Expand Up @@ -1513,7 +1514,13 @@ server_closes_link(QType, Config) ->
ok = end_session_sync(Session),
ok = amqp10_client:close_connection(Connection).

server_closes_link_exchange(Config) ->
server_closes_link_exchange_settled(Config) ->
server_closes_link_exchange(true, Config).

server_closes_link_exchange_unsettled(Config) ->
server_closes_link_exchange(false, Config).

server_closes_link_exchange(Settled, Config) ->
XName = atom_to_binary(?FUNCTION_NAME),
QName = <<"my queue">>,
RoutingKey = <<"my routing key">>,
Expand Down Expand Up @@ -1543,8 +1550,13 @@ server_closes_link_exchange(Config) ->
%% When we publish the next message, we expect:
%% 1. that the message is released because the exchange doesn't exist anymore, and
DTag2 = <<255>>,
ok = amqp10_client:send_msg(Sender, amqp10_msg:new(DTag2, <<"m2">>, false)),
ok = wait_for_settlement(DTag2, released),
ok = amqp10_client:send_msg(Sender, amqp10_msg:new(DTag2, <<"m2">>, Settled)),
case Settled of
true ->
ok;
false ->
ok = wait_for_settlement(DTag2, released)
end,
%% 2. that the server closes the link, i.e. sends us a DETACH frame.
receive {amqp10_event,
{link, Sender,
Expand Down

0 comments on commit e26d88e

Please sign in to comment.