From caaa465b6652a2e88fedc8eb9c45a5c318695c67 Mon Sep 17 00:00:00 2001 From: David Ansari Date: Fri, 11 Oct 2024 09:35:46 +0200 Subject: [PATCH] Track requeue history Support tracking the requeue history as described in https://github.com/rabbitmq/rabbitmq-website/pull/2095 This commit: 1. adds a test case tracing the requeue history via AMQP 1.0 using the modified outcome and 2. fixes bugs in the broker which crashed if a modified message annotation value is an AMQP 1.0 list, map, or array. Modified annotations are stored as tagged values from now on. These modified annotations can be consumed via AMQP 1.0, but not via AMQP 0.9.1, which is okay. --- .../src/amqp10_client_session.erl | 8 +- deps/rabbit/src/mc.erl | 2 +- deps/rabbit/src/mc_util.erl | 5 +- deps/rabbit/src/rabbit_amqp_session.erl | 4 +- deps/rabbit/test/amqp_client_SUITE.erl | 83 +++++++++++++------ 5 files changed, 68 insertions(+), 34 deletions(-) diff --git a/deps/amqp10_client/src/amqp10_client_session.erl b/deps/amqp10_client/src/amqp10_client_session.erl index 981e291a3853..5be222c8b499 100644 --- a/deps/amqp10_client/src/amqp10_client_session.erl +++ b/deps/amqp10_client/src/amqp10_client_session.erl @@ -1178,14 +1178,16 @@ wrap_map_value(true) -> {boolean, true}; wrap_map_value(false) -> {boolean, false}; -wrap_map_value(V) when is_integer(V) -> - {uint, V}; +wrap_map_value(V) when is_integer(V) andalso V >= 0 -> + uint(V); wrap_map_value(V) when is_binary(V) -> utf8(V); wrap_map_value(V) when is_list(V) -> utf8(list_to_binary(V)); wrap_map_value(V) when is_atom(V) -> - utf8(atom_to_list(V)). + utf8(atom_to_list(V)); +wrap_map_value(TaggedValue) when is_atom(element(1, TaggedValue)) -> + TaggedValue. utf8(V) -> amqp10_client_types:utf8(V). diff --git a/deps/rabbit/src/mc.erl b/deps/rabbit/src/mc.erl index 9c23ac13daf8..62a81d3a14f0 100644 --- a/deps/rabbit/src/mc.erl +++ b/deps/rabbit/src/mc.erl @@ -44,7 +44,7 @@ -type str() :: atom() | string() | binary(). -type internal_ann_key() :: atom(). -type x_ann_key() :: binary(). %% should begin with x- or ideally x-opt- --type x_ann_value() :: str() | integer() | float() | [x_ann_value()]. +-type x_ann_value() :: str() | integer() | float() | tuple() | [x_ann_value()]. -type protocol() :: module(). -type annotations() :: #{internal_ann_key() => term(), x_ann_key() => x_ann_value()}. diff --git a/deps/rabbit/src/mc_util.erl b/deps/rabbit/src/mc_util.erl index 669dace41f45..1f20d15699db 100644 --- a/deps/rabbit/src/mc_util.erl +++ b/deps/rabbit/src/mc_util.erl @@ -52,9 +52,8 @@ infer_type(V) when is_integer(V) -> {long, V}; infer_type(V) when is_boolean(V) -> {boolean, V}; -infer_type({T, _} = V) when is_atom(T) -> - %% looks like a pre-tagged type - V. +infer_type(TaggedValue) when is_atom(element(1, TaggedValue)) -> + TaggedValue. utf8_string_is_ascii(UTF8String) -> utf8_scan(UTF8String, fun(Char) -> Char >= 0 andalso Char < 128 end). diff --git a/deps/rabbit/src/rabbit_amqp_session.erl b/deps/rabbit/src/rabbit_amqp_session.erl index 3be9ea2b00fc..289a119512b7 100644 --- a/deps/rabbit/src/rabbit_amqp_session.erl +++ b/deps/rabbit/src/rabbit_amqp_session.erl @@ -1937,8 +1937,8 @@ settle_op_from_outcome(#'v1_0.modified'{delivery_failed = DelFailed, {map, KVList} -> Anns1 = lists:map( %% "all symbolic keys except those beginning with "x-" are reserved." [3.2.10] - fun({{symbol, <<"x-", _/binary>> = K}, V}) -> - {K, unwrap(V)} + fun({{symbol, <<"x-", _/binary>> = K}, TaggedVal}) -> + {K, TaggedVal} end, KVList), maps:from_list(Anns1) end, diff --git a/deps/rabbit/test/amqp_client_SUITE.erl b/deps/rabbit/test/amqp_client_SUITE.erl index e8c64690a012..604c223c51c6 100644 --- a/deps/rabbit/test/amqp_client_SUITE.erl +++ b/deps/rabbit/test/amqp_client_SUITE.erl @@ -501,61 +501,94 @@ modified_quorum_queue(Config) -> ok = amqp10_client:send_msg(Sender, Msg2), ok = amqp10_client:detach_link(Sender), - {ok, Receiver} = amqp10_client:attach_receiver_link(Session, <<"receiver">>, Address, unsettled), + Receiver1Name = <<"receiver 1">>, + Receiver2Name = <<"receiver 2">>, + {ok, Receiver1} = amqp10_client:attach_receiver_link(Session, Receiver1Name, Address, unsettled), + {ok, Receiver2} = amqp10_client:attach_receiver_link(Session, Receiver2Name, Address, unsettled), - {ok, M1} = amqp10_client:get_msg(Receiver), + {ok, M1} = amqp10_client:get_msg(Receiver1), ?assertEqual([<<"m1">>], amqp10_msg:body(M1)), ?assertMatch(#{delivery_count := 0, first_acquirer := true}, amqp10_msg:headers(M1)), - ok = amqp10_client:settle_msg(Receiver, M1, {modified, false, true, #{}}), + ok = amqp10_client:settle_msg(Receiver1, M1, {modified, false, true, #{}}), - {ok, M2a} = amqp10_client:get_msg(Receiver), + {ok, M2a} = amqp10_client:get_msg(Receiver1), ?assertEqual([<<"m2">>], amqp10_msg:body(M2a)), ?assertMatch(#{delivery_count := 0, first_acquirer := true}, amqp10_msg:headers(M2a)), - ok = amqp10_client:settle_msg(Receiver, M2a, {modified, false, false, #{}}), + ok = amqp10_client:settle_msg(Receiver1, M2a, {modified, false, false, #{}}), - {ok, M2b} = amqp10_client:get_msg(Receiver), + {ok, M2b} = amqp10_client:get_msg(Receiver1), ?assertEqual([<<"m2">>], amqp10_msg:body(M2b)), ?assertMatch(#{delivery_count := 0, first_acquirer := false}, amqp10_msg:headers(M2b)), - ok = amqp10_client:settle_msg(Receiver, M2b, {modified, true, false, #{}}), + ok = amqp10_client:settle_msg(Receiver1, M2b, {modified, true, false, #{}}), - {ok, M2c} = amqp10_client:get_msg(Receiver), + {ok, M2c} = amqp10_client:get_msg(Receiver1), ?assertEqual([<<"m2">>], amqp10_msg:body(M2c)), ?assertMatch(#{delivery_count := 1, first_acquirer := false}, amqp10_msg:headers(M2c)), - ok = amqp10_client:settle_msg(Receiver, M2c, - {modified, true, false, - #{<<"x-opt-key">> => <<"val 1">>}}), - - {ok, M2d} = amqp10_client:get_msg(Receiver), + ok = amqp10_client:settle_msg( + Receiver1, M2c, + {modified, true, false, + %% Test that a history of requeue events can be tracked as described in + %% https://rabbitmq.com/blog/2024/10/11/modified-outcome + #{<<"x-opt-requeued-by">> => {array, utf8, [{utf8, Receiver1Name}]}, + <<"x-opt-requeue-reason">> => {list, [{utf8, <<"reason 1">>}]}, + <<"x-opt-my-map">> => {map, [ + {{utf8, <<"k1">>}, {byte, -1}}, + {{utf8, <<"k2">>}, {ulong, 2}} + ]}}}), + + {ok, M2d} = amqp10_client:get_msg(Receiver2), ?assertEqual([<<"m2">>], amqp10_msg:body(M2d)), ?assertMatch(#{delivery_count := 2, first_acquirer := false}, amqp10_msg:headers(M2d)), - ?assertMatch(#{<<"x-opt-key">> := <<"val 1">>}, amqp10_msg:message_annotations(M2d)), - ok = amqp10_client:settle_msg(Receiver, M2d, - {modified, false, false, - #{<<"x-opt-key">> => <<"val 2">>, - <<"x-other">> => 99}}), - - {ok, M2e} = amqp10_client:get_msg(Receiver), + #{<<"x-opt-requeued-by">> := {array, utf8, L0}, + <<"x-opt-requeue-reason">> := L1, + <<"x-opt-my-map">> := L2} = amqp10_msg:message_annotations(M2d), + ok = amqp10_client:settle_msg( + Receiver1, M2d, + {modified, false, false, + #{<<"x-opt-requeued-by">> => {array, utf8, [{utf8, Receiver2Name} | L0]}, + <<"x-opt-requeue-reason">> => {list, [{symbol, <<"reason 2">>} | L1]}, + <<"x-opt-my-map">> => {map, L2 ++ [{{symbol, <<"k3">>}, {symbol, <<"val 3">>}}]}, + <<"x-other">> => 99}}), + + {ok, M2e} = amqp10_client:get_msg(Receiver1), ?assertEqual([<<"m2">>], amqp10_msg:body(M2e)), ?assertMatch(#{delivery_count := 2, first_acquirer := false}, amqp10_msg:headers(M2e)), - ?assertMatch(#{<<"x-opt-key">> := <<"val 2">>, + ?assertMatch(#{<<"x-opt-requeued-by">> := {array, utf8, [{utf8, Receiver2Name}, {utf8, Receiver1Name}]}, + <<"x-opt-requeue-reason">> := [{symbol, <<"reason 2">>}, {utf8, <<"reason 1">>}], + <<"x-opt-my-map">> := [ + {{utf8, <<"k1">>}, {byte, -1}}, + {{utf8, <<"k2">>}, {ulong, 2}}, + {{symbol, <<"k3">>}, {symbol, <<"val 3">>}} + ], <<"x-other">> := 99}, amqp10_msg:message_annotations(M2e)), - ok = amqp10_client:settle_msg(Receiver, M2e, modified), + ok = amqp10_client:settle_msg(Receiver1, M2e, modified), - ok = amqp10_client:detach_link(Receiver), - ?assertMatch({ok, #{message_count := 1}}, - rabbitmq_amqp_client:delete_queue(LinkPair, QName)), + %% Test that we can consume via AMQP 0.9.1 + Ch = rabbit_ct_client_helpers:open_channel(Config), + {#'basic.get_ok'{}, + #amqp_msg{payload = <<"m2">>, + props = #'P_basic'{headers = Headers}} + } = amqp_channel:call(Ch, #'basic.get'{queue = QName, no_ack = true}), + %% We don't necessarily expect to receive the modified AMQP 1.0 message annotations via AMQP 0.9.1. + ct:pal("AMQP 0.9.1 headers: ~p", [Headers]), + ?assertEqual({value, {<<"x-delivery-count">>, long, 5}}, + lists:keysearch(<<"x-delivery-count">>, 1, Headers)), + ok = rabbit_ct_client_helpers:close_channel(Ch), + + ok = amqp10_client:detach_link(Receiver1), + {ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, QName), ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair), ok = end_session_sync(Session), ok = amqp10_client:close_connection(Connection).