diff --git a/deps/rabbit/test/amqp_client_SUITE.erl b/deps/rabbit/test/amqp_client_SUITE.erl index 64226a6e4b3..cd3a484e04e 100644 --- a/deps/rabbit/test/amqp_client_SUITE.erl +++ b/deps/rabbit/test/amqp_client_SUITE.erl @@ -129,6 +129,7 @@ groups() -> modified_classic_queue, modified_quorum_queue, modified_dead_letter_headers_exchange, + modified_dead_letter_history, dead_letter_headers_exchange, dead_letter_reject, dead_letter_reject_message_order_classic_queue, @@ -264,7 +265,8 @@ init_per_testcase(T, Config) end; init_per_testcase(T, Config) when T =:= modified_quorum_queue orelse - T =:= modified_dead_letter_headers_exchange -> + T =:= modified_dead_letter_headers_exchange orelse + T =:= modified_dead_letter_history -> case rpc(Config, rabbit_feature_flags, is_enabled, ['rabbitmq_4.0.0']) of true -> rabbit_ct_helpers:testcase_started(Config, T); @@ -727,6 +729,85 @@ modified_dead_letter_headers_exchange(Config) -> ok = end_session_sync(Session), ok = amqp10_client:close_connection(Connection). +%% Test that custom dead lettering event tracking works as described in +%% https://rabbitmq.com/blog/2024/10/11/modified-outcome +modified_dead_letter_history(Config) -> + {Connection, Session, LinkPair} = init(Config), + Q1 = <<"qq 1">>, + Q2 = <<"qq 2">>, + + {ok, _} = rabbitmq_amqp_client:declare_queue( + LinkPair, Q1, + #{arguments => #{<<"x-queue-type">> => {utf8, <<"quorum">>}, + <<"x-dead-letter-strategy">> => {utf8, <<"at-most-once">>}, + <<"x-dead-letter-exchange">> => {utf8, <<"amq.fanout">>}}}), + {ok, _} = rabbitmq_amqp_client:declare_queue( + LinkPair, Q2, + #{arguments => #{<<"x-queue-type">> => {utf8, <<"quorum">>}, + <<"x-dead-letter-strategy">> => {utf8, <<"at-most-once">>}, + <<"x-dead-letter-exchange">> => {utf8, <<>>}}}), + ok = rabbitmq_amqp_client:bind_queue(LinkPair, Q2, <<"amq.fanout">>, <<>>, #{}), + + {ok, Sender} = amqp10_client:attach_sender_link( + Session, <<"test-sender">>, rabbitmq_amqp_address:queue(Q1)), + wait_for_credit(Sender), + {ok, Receiver1} = amqp10_client:attach_receiver_link( + Session, <<"receiver 1">>, rabbitmq_amqp_address:queue(Q1), unsettled), + {ok, Receiver2} = amqp10_client:attach_receiver_link( + Session, <<"receiver 2">>, rabbitmq_amqp_address:queue(Q2), unsettled), + + ok = amqp10_client:send_msg(Sender, amqp10_msg:new(<<"t">>, <<"m">>)), + ok = wait_for_accepts(1), + ok = detach_link_sync(Sender), + + {ok, Msg1} = amqp10_client:get_msg(Receiver1), + ?assertMatch(#{delivery_count := 0, + first_acquirer := true}, + amqp10_msg:headers(Msg1)), + ok = amqp10_client:settle_msg( + Receiver1, Msg1, + {modified, true, true, + #{<<"x-opt-history-list">> => {list, [{utf8, <<"l1">>}]}, + <<"x-opt-history-map">> => {map, [{{symbol, <<"k1">>}, {byte, -1}}]}, + <<"x-opt-history-array">> => {array, utf8, [{utf8, <<"a1">>}]}} + }), + + {ok, Msg2} = amqp10_client:get_msg(Receiver2), + ?assertMatch(#{delivery_count := 1, + first_acquirer := false}, + amqp10_msg:headers(Msg2)), + #{<<"x-opt-history-list">> := L1, + <<"x-opt-history-map">> := L2, + <<"x-opt-history-array">> := {array, utf8, L0} + } = amqp10_msg:message_annotations(Msg2), + ok = amqp10_client:settle_msg( + Receiver2, Msg2, + {modified, true, true, + #{<<"x-opt-history-list">> => {list, [{int, -99} | L1]}, + <<"x-opt-history-map">> => {map, [{{symbol, <<"k2">>}, {symbol, <<"v2">>}} | L2]}, + <<"x-opt-history-array">> => {array, utf8, [{utf8, <<"a2">>} | L0]}, + <<"x-other">> => 99}}), + + {ok, Msg3} = amqp10_client:get_msg(Receiver1), + ?assertEqual([<<"m">>], amqp10_msg:body(Msg3)), + ?assertMatch(#{delivery_count := 2, + first_acquirer := false}, + amqp10_msg:headers(Msg3)), + ?assertMatch(#{<<"x-opt-history-array">> := {array, utf8, [{utf8, <<"a2">>}, {utf8, <<"a1">>}]}, + <<"x-opt-history-list">> := [{int, -99}, {utf8, <<"l1">>}], + <<"x-opt-history-map">> := [{{symbol, <<"k2">>}, {symbol, <<"v2">>}}, + {{symbol, <<"k1">>}, {byte, -1}}], + <<"x-other">> := 99}, amqp10_msg:message_annotations(Msg3)), + ok = amqp10_client:accept_msg(Receiver1, Msg3), + + ok = detach_link_sync(Receiver1), + ok = detach_link_sync(Receiver2), + {ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, Q1), + {ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, Q2), + ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair), + ok = end_session_sync(Session), + ok = amqp10_client:close_connection(Connection). + %% Tests that confirmations are returned correctly %% when sending many messages async to a quorum queue. sender_settle_mode_unsettled(Config) ->