Skip to content

Commit

Permalink
Track requeue history
Browse files Browse the repository at this point in the history
Support tracking the requeue history as described in
rabbitmq/rabbitmq-website#2095

This commit:
1. adds a test case tracing the requeue history via AMQP 1.0
   using the modified outcome and
2. fixes bugs in the broker which crashed if a modified message
   annotation value is an AMQP 1.0 list, map, or array.

Modified annotations are stored as tagged values from now on.
These modified annotations can be consumed via AMQP 1.0, but not via
AMQP 0.9.1, which is okay.
  • Loading branch information
ansd committed Oct 11, 2024
1 parent d9ff6a0 commit caaa465
Show file tree
Hide file tree
Showing 5 changed files with 68 additions and 34 deletions.
8 changes: 5 additions & 3 deletions deps/amqp10_client/src/amqp10_client_session.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1178,14 +1178,16 @@ wrap_map_value(true) ->
{boolean, true};
wrap_map_value(false) ->
{boolean, false};
wrap_map_value(V) when is_integer(V) ->
{uint, V};
wrap_map_value(V) when is_integer(V) andalso V >= 0 ->
uint(V);
wrap_map_value(V) when is_binary(V) ->
utf8(V);
wrap_map_value(V) when is_list(V) ->
utf8(list_to_binary(V));
wrap_map_value(V) when is_atom(V) ->
utf8(atom_to_list(V)).
utf8(atom_to_list(V));
wrap_map_value(TaggedValue) when is_atom(element(1, TaggedValue)) ->
TaggedValue.

utf8(V) -> amqp10_client_types:utf8(V).

Expand Down
2 changes: 1 addition & 1 deletion deps/rabbit/src/mc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
-type str() :: atom() | string() | binary().
-type internal_ann_key() :: atom().
-type x_ann_key() :: binary(). %% should begin with x- or ideally x-opt-
-type x_ann_value() :: str() | integer() | float() | [x_ann_value()].
-type x_ann_value() :: str() | integer() | float() | tuple() | [x_ann_value()].
-type protocol() :: module().
-type annotations() :: #{internal_ann_key() => term(),
x_ann_key() => x_ann_value()}.
Expand Down
5 changes: 2 additions & 3 deletions deps/rabbit/src/mc_util.erl
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,8 @@ infer_type(V) when is_integer(V) ->
{long, V};
infer_type(V) when is_boolean(V) ->
{boolean, V};
infer_type({T, _} = V) when is_atom(T) ->
%% looks like a pre-tagged type
V.
infer_type(TaggedValue) when is_atom(element(1, TaggedValue)) ->
TaggedValue.

utf8_string_is_ascii(UTF8String) ->
utf8_scan(UTF8String, fun(Char) -> Char >= 0 andalso Char < 128 end).
Expand Down
4 changes: 2 additions & 2 deletions deps/rabbit/src/rabbit_amqp_session.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1937,8 +1937,8 @@ settle_op_from_outcome(#'v1_0.modified'{delivery_failed = DelFailed,
{map, KVList} ->
Anns1 = lists:map(
%% "all symbolic keys except those beginning with "x-" are reserved." [3.2.10]
fun({{symbol, <<"x-", _/binary>> = K}, V}) ->
{K, unwrap(V)}
fun({{symbol, <<"x-", _/binary>> = K}, TaggedVal}) ->
{K, TaggedVal}
end, KVList),
maps:from_list(Anns1)
end,
Expand Down
83 changes: 58 additions & 25 deletions deps/rabbit/test/amqp_client_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -501,61 +501,94 @@ modified_quorum_queue(Config) ->
ok = amqp10_client:send_msg(Sender, Msg2),
ok = amqp10_client:detach_link(Sender),

{ok, Receiver} = amqp10_client:attach_receiver_link(Session, <<"receiver">>, Address, unsettled),
Receiver1Name = <<"receiver 1">>,
Receiver2Name = <<"receiver 2">>,
{ok, Receiver1} = amqp10_client:attach_receiver_link(Session, Receiver1Name, Address, unsettled),
{ok, Receiver2} = amqp10_client:attach_receiver_link(Session, Receiver2Name, Address, unsettled),

{ok, M1} = amqp10_client:get_msg(Receiver),
{ok, M1} = amqp10_client:get_msg(Receiver1),
?assertEqual([<<"m1">>], amqp10_msg:body(M1)),
?assertMatch(#{delivery_count := 0,
first_acquirer := true},
amqp10_msg:headers(M1)),
ok = amqp10_client:settle_msg(Receiver, M1, {modified, false, true, #{}}),
ok = amqp10_client:settle_msg(Receiver1, M1, {modified, false, true, #{}}),

{ok, M2a} = amqp10_client:get_msg(Receiver),
{ok, M2a} = amqp10_client:get_msg(Receiver1),
?assertEqual([<<"m2">>], amqp10_msg:body(M2a)),
?assertMatch(#{delivery_count := 0,
first_acquirer := true},
amqp10_msg:headers(M2a)),
ok = amqp10_client:settle_msg(Receiver, M2a, {modified, false, false, #{}}),
ok = amqp10_client:settle_msg(Receiver1, M2a, {modified, false, false, #{}}),

{ok, M2b} = amqp10_client:get_msg(Receiver),
{ok, M2b} = amqp10_client:get_msg(Receiver1),
?assertEqual([<<"m2">>], amqp10_msg:body(M2b)),
?assertMatch(#{delivery_count := 0,
first_acquirer := false},
amqp10_msg:headers(M2b)),
ok = amqp10_client:settle_msg(Receiver, M2b, {modified, true, false, #{}}),
ok = amqp10_client:settle_msg(Receiver1, M2b, {modified, true, false, #{}}),

{ok, M2c} = amqp10_client:get_msg(Receiver),
{ok, M2c} = amqp10_client:get_msg(Receiver1),
?assertEqual([<<"m2">>], amqp10_msg:body(M2c)),
?assertMatch(#{delivery_count := 1,
first_acquirer := false},
amqp10_msg:headers(M2c)),
ok = amqp10_client:settle_msg(Receiver, M2c,
{modified, true, false,
#{<<"x-opt-key">> => <<"val 1">>}}),

{ok, M2d} = amqp10_client:get_msg(Receiver),
ok = amqp10_client:settle_msg(
Receiver1, M2c,
{modified, true, false,
%% Test that a history of requeue events can be tracked as described in
%% https://rabbitmq.com/blog/2024/10/11/modified-outcome
#{<<"x-opt-requeued-by">> => {array, utf8, [{utf8, Receiver1Name}]},
<<"x-opt-requeue-reason">> => {list, [{utf8, <<"reason 1">>}]},
<<"x-opt-my-map">> => {map, [
{{utf8, <<"k1">>}, {byte, -1}},
{{utf8, <<"k2">>}, {ulong, 2}}
]}}}),

{ok, M2d} = amqp10_client:get_msg(Receiver2),
?assertEqual([<<"m2">>], amqp10_msg:body(M2d)),
?assertMatch(#{delivery_count := 2,
first_acquirer := false},
amqp10_msg:headers(M2d)),
?assertMatch(#{<<"x-opt-key">> := <<"val 1">>}, amqp10_msg:message_annotations(M2d)),
ok = amqp10_client:settle_msg(Receiver, M2d,
{modified, false, false,
#{<<"x-opt-key">> => <<"val 2">>,
<<"x-other">> => 99}}),

{ok, M2e} = amqp10_client:get_msg(Receiver),
#{<<"x-opt-requeued-by">> := {array, utf8, L0},
<<"x-opt-requeue-reason">> := L1,
<<"x-opt-my-map">> := L2} = amqp10_msg:message_annotations(M2d),
ok = amqp10_client:settle_msg(
Receiver1, M2d,
{modified, false, false,
#{<<"x-opt-requeued-by">> => {array, utf8, [{utf8, Receiver2Name} | L0]},
<<"x-opt-requeue-reason">> => {list, [{symbol, <<"reason 2">>} | L1]},
<<"x-opt-my-map">> => {map, L2 ++ [{{symbol, <<"k3">>}, {symbol, <<"val 3">>}}]},
<<"x-other">> => 99}}),

{ok, M2e} = amqp10_client:get_msg(Receiver1),
?assertEqual([<<"m2">>], amqp10_msg:body(M2e)),
?assertMatch(#{delivery_count := 2,
first_acquirer := false},
amqp10_msg:headers(M2e)),
?assertMatch(#{<<"x-opt-key">> := <<"val 2">>,
?assertMatch(#{<<"x-opt-requeued-by">> := {array, utf8, [{utf8, Receiver2Name}, {utf8, Receiver1Name}]},
<<"x-opt-requeue-reason">> := [{symbol, <<"reason 2">>}, {utf8, <<"reason 1">>}],
<<"x-opt-my-map">> := [
{{utf8, <<"k1">>}, {byte, -1}},
{{utf8, <<"k2">>}, {ulong, 2}},
{{symbol, <<"k3">>}, {symbol, <<"val 3">>}}
],
<<"x-other">> := 99}, amqp10_msg:message_annotations(M2e)),
ok = amqp10_client:settle_msg(Receiver, M2e, modified),
ok = amqp10_client:settle_msg(Receiver1, M2e, modified),

ok = amqp10_client:detach_link(Receiver),
?assertMatch({ok, #{message_count := 1}},
rabbitmq_amqp_client:delete_queue(LinkPair, QName)),
%% Test that we can consume via AMQP 0.9.1
Ch = rabbit_ct_client_helpers:open_channel(Config),
{#'basic.get_ok'{},
#amqp_msg{payload = <<"m2">>,
props = #'P_basic'{headers = Headers}}
} = amqp_channel:call(Ch, #'basic.get'{queue = QName, no_ack = true}),
%% We don't necessarily expect to receive the modified AMQP 1.0 message annotations via AMQP 0.9.1.
ct:pal("AMQP 0.9.1 headers: ~p", [Headers]),
?assertEqual({value, {<<"x-delivery-count">>, long, 5}},
lists:keysearch(<<"x-delivery-count">>, 1, Headers)),
ok = rabbit_ct_client_helpers:close_channel(Ch),

ok = amqp10_client:detach_link(Receiver1),
{ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, QName),
ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair),
ok = end_session_sync(Session),
ok = amqp10_client:close_connection(Connection).
Expand Down

0 comments on commit caaa465

Please sign in to comment.