Skip to content

Commit

Permalink
Add custom dead letter history test
Browse files Browse the repository at this point in the history
Test the use case described in rabbitmq/rabbitmq-website#2095:

> Rather than relying solely on RabbitMQ's built-in dead lettering tracking via x-opt-deaths,
consumers can customise dead lettering event tracking.
  • Loading branch information
ansd committed Oct 11, 2024
1 parent 855a32a commit 2e90619
Showing 1 changed file with 82 additions and 1 deletion.
83 changes: 82 additions & 1 deletion deps/rabbit/test/amqp_client_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ groups() ->
modified_classic_queue,
modified_quorum_queue,
modified_dead_letter_headers_exchange,
modified_dead_letter_history,
dead_letter_headers_exchange,
dead_letter_reject,
dead_letter_reject_message_order_classic_queue,
Expand Down Expand Up @@ -264,7 +265,8 @@ init_per_testcase(T, Config)
end;
init_per_testcase(T, Config)
when T =:= modified_quorum_queue orelse
T =:= modified_dead_letter_headers_exchange ->
T =:= modified_dead_letter_headers_exchange orelse
T =:= modified_dead_letter_history ->
case rpc(Config, rabbit_feature_flags, is_enabled, ['rabbitmq_4.0.0']) of
true ->
rabbit_ct_helpers:testcase_started(Config, T);
Expand Down Expand Up @@ -727,6 +729,85 @@ modified_dead_letter_headers_exchange(Config) ->
ok = end_session_sync(Session),
ok = amqp10_client:close_connection(Connection).

%% Test that custom dead lettering event tracking works as described in
%% https://rabbitmq.com/blog/2024/10/11/modified-outcome
modified_dead_letter_history(Config) ->
{Connection, Session, LinkPair} = init(Config),
Q1 = <<"qq 1">>,
Q2 = <<"qq 2">>,

{ok, _} = rabbitmq_amqp_client:declare_queue(
LinkPair, Q1,
#{arguments => #{<<"x-queue-type">> => {utf8, <<"quorum">>},
<<"x-dead-letter-strategy">> => {utf8, <<"at-most-once">>},
<<"x-dead-letter-exchange">> => {utf8, <<"amq.fanout">>}}}),
{ok, _} = rabbitmq_amqp_client:declare_queue(
LinkPair, Q2,
#{arguments => #{<<"x-queue-type">> => {utf8, <<"quorum">>},
<<"x-dead-letter-strategy">> => {utf8, <<"at-most-once">>},
<<"x-dead-letter-exchange">> => {utf8, <<>>}}}),
ok = rabbitmq_amqp_client:bind_queue(LinkPair, Q2, <<"amq.fanout">>, <<>>, #{}),

{ok, Sender} = amqp10_client:attach_sender_link(
Session, <<"test-sender">>, rabbitmq_amqp_address:queue(Q1)),
wait_for_credit(Sender),
{ok, Receiver1} = amqp10_client:attach_receiver_link(
Session, <<"receiver 1">>, rabbitmq_amqp_address:queue(Q1), unsettled),
{ok, Receiver2} = amqp10_client:attach_receiver_link(
Session, <<"receiver 2">>, rabbitmq_amqp_address:queue(Q2), unsettled),

ok = amqp10_client:send_msg(Sender, amqp10_msg:new(<<"t">>, <<"m">>)),
ok = wait_for_accepts(1),
ok = detach_link_sync(Sender),

{ok, Msg1} = amqp10_client:get_msg(Receiver1),
?assertMatch(#{delivery_count := 0,
first_acquirer := true},
amqp10_msg:headers(Msg1)),
ok = amqp10_client:settle_msg(
Receiver1, Msg1,
{modified, true, true,
#{<<"x-opt-history-list">> => {list, [{utf8, <<"l1">>}]},
<<"x-opt-history-map">> => {map, [{{symbol, <<"k1">>}, {byte, -1}}]},
<<"x-opt-history-array">> => {array, utf8, [{utf8, <<"a1">>}]}}
}),

{ok, Msg2} = amqp10_client:get_msg(Receiver2),
?assertMatch(#{delivery_count := 1,
first_acquirer := false},
amqp10_msg:headers(Msg2)),
#{<<"x-opt-history-list">> := L1,
<<"x-opt-history-map">> := L2,
<<"x-opt-history-array">> := {array, utf8, L0}
} = amqp10_msg:message_annotations(Msg2),
ok = amqp10_client:settle_msg(
Receiver2, Msg2,
{modified, true, true,
#{<<"x-opt-history-list">> => {list, [{int, -99} | L1]},
<<"x-opt-history-map">> => {map, [{{symbol, <<"k2">>}, {symbol, <<"v2">>}} | L2]},
<<"x-opt-history-array">> => {array, utf8, [{utf8, <<"a2">>} | L0]},
<<"x-other">> => 99}}),

{ok, Msg3} = amqp10_client:get_msg(Receiver1),
?assertEqual([<<"m">>], amqp10_msg:body(Msg3)),
?assertMatch(#{delivery_count := 2,
first_acquirer := false},
amqp10_msg:headers(Msg3)),
?assertMatch(#{<<"x-opt-history-array">> := {array, utf8, [{utf8, <<"a2">>}, {utf8, <<"a1">>}]},
<<"x-opt-history-list">> := [{int, -99}, {utf8, <<"l1">>}],
<<"x-opt-history-map">> := [{{symbol, <<"k2">>}, {symbol, <<"v2">>}},
{{symbol, <<"k1">>}, {byte, -1}}],
<<"x-other">> := 99}, amqp10_msg:message_annotations(Msg3)),
ok = amqp10_client:accept_msg(Receiver1, Msg3),

ok = detach_link_sync(Receiver1),
ok = detach_link_sync(Receiver2),
{ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, Q1),
{ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, Q2),
ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair),
ok = end_session_sync(Session),
ok = amqp10_client:close_connection(Connection).

%% Tests that confirmations are returned correctly
%% when sending many messages async to a quorum queue.
sender_settle_mode_unsettled(Config) ->
Expand Down

0 comments on commit 2e90619

Please sign in to comment.