diff --git a/deps/rabbitmq_federation/test/queue_SUITE.erl b/deps/rabbitmq_federation/test/queue_SUITE.erl index 90a89fabf90..b020380cdbf 100644 --- a/deps/rabbitmq_federation/test/queue_SUITE.erl +++ b/deps/rabbitmq_federation/test/queue_SUITE.erl @@ -14,7 +14,7 @@ -compile(export_all). -import(rabbit_federation_test_util, - [wait_for_federation/2, expect/3, expect/4, + [wait_for_federation/2, expect/3, expect/4, expect/2, set_upstream/4, set_upstream/5, clear_upstream/3, set_upstream_set/4, clear_upstream_set/3, set_policy/5, clear_policy/3, set_policy_pattern/5, set_policy_upstream/5, q/2, with_ch/3, @@ -22,7 +22,7 @@ federation_links_in_vhost/3]). -define(INITIAL_WAIT, 6000). --define(EXPECT_FEDERATION_TIMEOUT, 30000). +-define(EXPECT_FEDERATION_TIMEOUT, 60000). all() -> [ @@ -368,7 +368,20 @@ publish_expect(Config, Ch, X, Key, Q, Payload, Timeout) -> Status = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_federation_status, status, []), ct:pal("Federation status ~p", [Status]), - expect(Ch, Q, [Payload], Timeout). + expect0(Config, Ch, Q, [Payload], Timeout). + +expect0(Config, Ch, Q, Fun) when is_function(Fun) -> + amqp_channel:subscribe(Ch, #'basic.consume'{queue = Q, + no_ack = true}, self()), + CTag = receive + #'basic.consume_ok'{consumer_tag = CT} -> CT + end, + ct:pal("After subscribe, messages and consumers in broker ~p ", [rabbit_ct_broker_helpers:rabbitmqctl_list(Config, 0, ["list_queues", "name", "messages", "messages_ready", "consumers"])]), + Fun(), + amqp_channel:call(Ch, #'basic.cancel'{consumer_tag = CTag}). + +expect0(Config, Ch, Q, Payloads, Timeout) -> + expect0(Config, Ch, Q, fun() -> expect(Payloads, Timeout) end). %% Doubled due to our strange basic.get behaviour. expect_empty(Ch, Q) ->