From 256332757d4a1283016b4d11104024bb1af6db04 Mon Sep 17 00:00:00 2001 From: Marcial Rosales Date: Mon, 10 Jun 2024 11:26:50 +0200 Subject: [PATCH] Configure capabilities on the source/target field in the ATTACH frame --- deps/amqp10_client/BUILD.bazel | 2 + deps/amqp10_client/app.bzl | 8 + deps/amqp10_client/src/amqp10_client.erl | 5 +- .../src/amqp10_client_session.erl | 31 +- deps/amqp10_client/test/ibmmq_ct_helpers.erl | 89 +++++ deps/amqp10_client/test/system_SUITE.erl | 332 +++++++++++++++--- .../test/system_SUITE_data/ibmmq_runner | 99 ++++++ 7 files changed, 519 insertions(+), 47 deletions(-) create mode 100644 deps/amqp10_client/test/ibmmq_ct_helpers.erl create mode 100755 deps/amqp10_client/test/system_SUITE_data/ibmmq_runner diff --git a/deps/amqp10_client/BUILD.bazel b/deps/amqp10_client/BUILD.bazel index df8b879adae1..6e3808490087 100644 --- a/deps/amqp10_client/BUILD.bazel +++ b/deps/amqp10_client/BUILD.bazel @@ -116,6 +116,7 @@ rabbitmq_integration_suite( size = "medium", additional_beam = [ "test/activemq_ct_helpers.beam", + "test/ibmmq_ct_helpers.beam", "test/mock_server.beam", ], data = [ @@ -139,6 +140,7 @@ eunit( name = "eunit", compiled_suites = [ ":test_activemq_ct_helpers_beam", + ":test_ibmmq_ct_helpers_beam", ":test_mock_server_beam", ], target = ":test_erlang_app", diff --git a/deps/amqp10_client/app.bzl b/deps/amqp10_client/app.bzl index 8fcdad73cf9d..de6c84488338 100644 --- a/deps/amqp10_client/app.bzl +++ b/deps/amqp10_client/app.bzl @@ -126,6 +126,14 @@ def test_suite_beam_files(name = "test_suite_beam_files"): app_name = "amqp10_client", erlc_opts = "//:test_erlc_opts", ) + erlang_bytecode( + name = "test_ibmmq_ct_helpers_beam", + testonly = True, + srcs = ["test/ibmmq_ct_helpers.erl"], + outs = ["test/ibmmq_ct_helpers.beam"], + app_name = "amqp10_client", + erlc_opts = "//:test_erlc_opts", + ) erlang_bytecode( name = "test_mock_server_beam", testonly = True, diff --git a/deps/amqp10_client/src/amqp10_client.erl b/deps/amqp10_client/src/amqp10_client.erl index 68cac2622265..64b8dbf0eff8 100644 --- a/deps/amqp10_client/src/amqp10_client.erl +++ b/deps/amqp10_client/src/amqp10_client.erl @@ -166,6 +166,8 @@ end_session(Pid) -> %% for the link before returning. attach_sender_link_sync(Session, Name, Target) -> attach_sender_link_sync(Session, Name, Target, mixed). +-spec attach_sender_link_sync(pid(), binary(), binary()) -> + {ok, link_ref()} | link_timeout. %% @doc Synchronously attach a link on 'Session'. %% This is a convenience function that awaits attached event @@ -273,7 +275,8 @@ attach_receiver_link(Session, Name, Source, SettleMode, Durability, Filter) -> -spec attach_receiver_link(pid(), binary(), binary(), snd_settle_mode(), terminus_durability(), filter(), properties()) -> {ok, link_ref()}. -attach_receiver_link(Session, Name, Source, SettleMode, Durability, Filter, Properties) +attach_receiver_link(Session, Name, Source, SettleMode, Durability, Filter, + Properties) when is_pid(Session) andalso is_binary(Name) andalso is_binary(Source) andalso diff --git a/deps/amqp10_client/src/amqp10_client_session.erl b/deps/amqp10_client/src/amqp10_client_session.erl index b66308a826b2..b5752e627e23 100644 --- a/deps/amqp10_client/src/amqp10_client_session.erl +++ b/deps/amqp10_client/src/amqp10_client_session.erl @@ -73,11 +73,14 @@ -type rcv_settle_mode() :: first | second. -type terminus_durability() :: none | configuration | unsettled_state. +-type terminus_capabilities() :: none | binary() | list(). -type target_def() :: #{address => link_address(), - durable => terminus_durability()}. + durable => terminus_durability(), + capabilities => terminus_capabilities()}. -type source_def() :: #{address => link_address(), - durable => terminus_durability()}. + durable => terminus_durability(), + capabilities => terminus_capabilities()}. -type attach_role() :: {sender, target_def()} | {receiver, source_def(), pid()}. @@ -109,6 +112,7 @@ -export_type([snd_settle_mode/0, rcv_settle_mode/0, terminus_durability/0, + terminus_capabilities/0, attach_args/0, attach_role/0, target_def/0, @@ -713,20 +717,24 @@ make_source(#{role := {sender, _}}) -> make_source(#{role := {receiver, #{address := Address} = Source, _Pid}, filter := Filter}) -> Durable = translate_terminus_durability(maps:get(durable, Source, none)), TranslatedFilter = translate_filters(Filter), + Capabilities = translate_terminus_capabilities(maps:get(capabilities, Source, none)), #'v1_0.source'{address = {utf8, Address}, durable = {uint, Durable}, - filter = TranslatedFilter}. + filter = TranslatedFilter, + capabilities = Capabilities}. make_target(#{role := {receiver, _Source, _Pid}}) -> #'v1_0.target'{}; make_target(#{role := {sender, #{address := Address} = Target}}) -> Durable = translate_terminus_durability(maps:get(durable, Target, none)), + Capabilities = translate_terminus_capabilities(maps:get(capabilities, Target, none)), TargetAddr = case is_binary(Address) of true -> {utf8, Address}; false -> Address end, #'v1_0.target'{address = TargetAddr, - durable = {uint, Durable}}. + durable = {uint, Durable}, + capabilities = Capabilities}. max_message_size(#{max_message_size := Size}) when is_integer(Size) andalso @@ -771,6 +779,19 @@ filter_value_type({T, _} = V) when is_atom(T) -> %% looks like an already tagged type, just pass it through V. +translate_terminus_capabilities(none) -> + undefined; +translate_terminus_capabilities(Capabilities) when is_binary(Capabilities) -> + {symbol, Capabilities}; +translate_terminus_capabilities(CapabilitiesList) when is_list(CapabilitiesList) -> + {array, symbol, [filter_capability(V) || V <- CapabilitiesList]}. + +filter_capability(V) when is_binary(V) -> + {symbol, V}; +filter_capability({T, _} = V) when is_atom(T) -> + %% looks like an already tagged type, just pass it through + V. + % https://people.apache.org/~rgodfrey/amqp-1.0/apache-filters.html translate_legacy_amqp_headers_binding(LegacyHeaders) -> {map, @@ -847,7 +868,7 @@ send_attach(Send, #{name := Name, role := RoleTuple} = Args, {FromPid, _}, target = Target, max_message_size = MaxMessageSize}, ok = Send(Attach, State), - + Ref = make_link_ref(Role, self(), OutHandle), Link = #link{name = Name, ref = Ref, diff --git a/deps/amqp10_client/test/ibmmq_ct_helpers.erl b/deps/amqp10_client/test/ibmmq_ct_helpers.erl new file mode 100644 index 000000000000..38d075f2d3b1 --- /dev/null +++ b/deps/amqp10_client/test/ibmmq_ct_helpers.erl @@ -0,0 +1,89 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. +%% + +-module(ibmmq_ct_helpers). + +-include_lib("common_test/include/ct.hrl"). + +-export([setup_steps/0, + teardown_steps/0, + init_config/1, + start_ibmmq_server/1, + stop_ibmmq_server/1]). + +setup_steps() -> + [fun init_config/1, + fun start_ibmmq_server/1 + ]. + +teardown_steps() -> + [ + fun stop_ibmmq_server/1 + ]. + +init_config(Config) -> + NodeConfig = [{tcp_port_amqp, 5672}], + rabbit_ct_helpers:set_config(Config, [ {rmq_nodes, [NodeConfig]}, + {rmq_hostname, "localhost"}, + {tcp_hostname_amqp, "localhost"}, + {sasl, {plain, <<"app">>, <<"passw0rd">>}} ]). + +start_ibmmq_server(Config) -> + IBMmqCmd = filename:join([?config(data_dir, Config), "ibmmq_runner"]), + Cmd = [IBMmqCmd, "start"], + ct:log("Running command ~p", [Cmd]), + case rabbit_ct_helpers:exec(Cmd, []) of + {ok, _} -> wait_for_ibmmq_nodes(Config); + Error -> ct:pal("Error: ~tp", [Error]), + {skip, "Failed to start IBM MQ"} + end. + +wait_for_ibmmq_nodes(Config) -> + Hostname = ?config(rmq_hostname, Config), + Ports = rabbit_ct_broker_helpers:get_node_configs(Config, tcp_port_amqp), + wait_for_ibmmq_ports(Config, Hostname, Ports). + +wait_for_ibmmq_ports(Config, Hostname, [Port | Rest]) -> + ct:log("Waiting for IBM MQ on port ~b", [Port]), + case wait_for_ibmmq_port(Hostname, Port, 60) of + ok -> + ct:log("IBM MQ ready on port ~b", [Port]), + wait_for_ibmmq_ports(Config, Hostname, Rest); + {error, _} -> + Msg = lists:flatten( + io_lib:format( + "Failed to start IBM MQ on port ~b; see IBM MQ logs", + [Port])), + ct:pal(?LOW_IMPORTANCE, Msg, []), + {skip, Msg} + end; +wait_for_ibmmq_ports(Config, _, []) -> + Config. + +wait_for_ibmmq_port(_, _, 0) -> + {error, econnrefused}; +wait_for_ibmmq_port(Hostname, Port, Retries) -> + case gen_tcp:connect(Hostname, Port, []) of + {ok, Connection} -> + gen_tcp:close(Connection), + ok; + {error, econnrefused} -> + timer:sleep(1000), + wait_for_ibmmq_port(Hostname, Port, Retries - 1); + Error -> + Error + end. + +stop_ibmmq_server(Config) -> + IBMmqCmd = filename:join([?config(data_dir, Config), "ibmmq_runner"]), + Cmd = [IBMmqCmd, "stop"], + ct:log("Running command ~p", [Cmd]), + case rabbit_ct_helpers:exec(Cmd, []) of + {ok, _} -> Config; + Error -> ct:pal("Error: ~tp", [Error]), + {skip, "Failed to stop IBM MQ"} + end. diff --git a/deps/amqp10_client/test/system_SUITE.erl b/deps/amqp10_client/test/system_SUITE.erl index 9b72dd4f6e5d..0a9dc569b064 100644 --- a/deps/amqp10_client/test/system_SUITE.erl +++ b/deps/amqp10_client/test/system_SUITE.erl @@ -21,17 +21,22 @@ suite() -> all() -> [ - {group, rabbitmq}, - {group, rabbitmq_strict}, - {group, activemq}, - {group, activemq_no_anon}, - {group, mock} +% {group, rabbitmq}, +% {group, rabbitmq_strict}, +% {group, activemq}, + {group, ibmmq} +% {group, activemq_no_anon}, +% {group, mock} ]. groups() -> [ {rabbitmq, [], shared()}, {activemq, [], shared()}, + {ibmmq, [], [ + open_close_connection, + basic_roundtrip_ibmmq + ]}, {rabbitmq_strict, [], [ basic_roundtrip_tls, roundtrip_tls_global_config, @@ -53,7 +58,10 @@ groups() -> {mock, [], [ insufficient_credit, incoming_heartbeat, - multi_transfer_without_delivery_id + multi_transfer_without_delivery_id, + set_receiver_capabilities, + set_sender_capabilities, + set_sender_sync_capabilities ]} ]. @@ -103,9 +111,9 @@ stop_amqp10_client_app(Config) -> %% ------------------------------------------------------------------- init_per_group(rabbitmq, Config0) -> - Config = rabbit_ct_helpers:set_config(Config0, - {sasl, {plain, <<"guest">>, <<"guest">>}}), + Config = rabbit_ct_helpers:set_config(Config0,{sasl, {plain, <<"guest">>, <<"guest">>}}), rabbit_ct_helpers:run_steps(Config, rabbit_ct_broker_helpers:setup_steps()); + init_per_group(rabbitmq_strict, Config0) -> Config = rabbit_ct_helpers:set_config(Config0, {sasl, {plain, <<"guest">>, <<"guest">>}}), @@ -117,6 +125,10 @@ init_per_group(activemq, Config0) -> Config = rabbit_ct_helpers:set_config(Config0, {sasl, anon}), rabbit_ct_helpers:run_steps(Config, activemq_ct_helpers:setup_steps("activemq.xml")); + +init_per_group(ibmmq, Config) -> + rabbit_ct_helpers:run_steps(Config, ibmmq_ct_helpers:setup_steps()); + init_per_group(activemq_no_anon, Config0) -> Config = rabbit_ct_helpers:set_config( Config0, {sasl, {plain, <<"user">>, <<"password">>}}), @@ -132,7 +144,9 @@ init_per_group(azure, Config) -> ]); init_per_group(mock, Config) -> rabbit_ct_helpers:set_config(Config, [{mock_port, 25000}, + {tcp_port_amqp, 25000}, {mock_host, "localhost"}, + {tcp_hostname_amqp, "localhost"}, {sasl, none} ]). end_per_group(rabbitmq, Config) -> @@ -141,6 +155,8 @@ end_per_group(rabbitmq_strict, Config) -> rabbit_ct_helpers:run_steps(Config, rabbit_ct_broker_helpers:teardown_steps()); end_per_group(activemq, Config) -> rabbit_ct_helpers:run_steps(Config, activemq_ct_helpers:teardown_steps()); +end_per_group(ibmmq, Config) -> + rabbit_ct_helpers:run_steps(Config, ibmmq_ct_helpers:teardown_steps()); end_per_group(activemq_no_anon, Config) -> rabbit_ct_helpers:run_steps(Config, activemq_ct_helpers:teardown_steps()); end_per_group(_, Config) -> @@ -151,9 +167,11 @@ end_per_group(_, Config) -> %% ------------------------------------------------------------------- init_per_testcase(_Test, Config) -> + ct:log("Setting per test case"), case lists:keyfind(mock_port, 1, Config) of {_, Port} -> M = mock_server:start(Port), + ct:log("Setting mock server"), rabbit_ct_helpers:set_config(Config, {mock_server, M}); _ -> Config end. @@ -170,19 +188,20 @@ open_close_connection(Config) -> Hostname = ?config(rmq_hostname, Config), Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp), %% an address list - OpnConf = #{addresses => [Hostname], + OpenConf = #{addresses => [Hostname], port => Port, notify => self(), container_id => <<"open_close_connection_container">>, sasl => ?config(sasl, Config)}, - {ok, Connection} = amqp10_client:open_connection(Hostname, Port), - {ok, Connection2} = amqp10_client:open_connection(OpnConf), + ct:log("Connecting to ~p", [OpenConf]), + {ok, Connection2} = amqp10_client:open_connection(OpenConf), receive - {amqp10_event, {connection, Connection2, opened}} -> ok + {amqp10_event, {connection, Connection2, opened}} -> ct:log("connection opened"), ok after 5000 -> exit(connection_timeout) end, + ct:log("Closing connection ..."), ok = amqp10_client:close_connection(Connection2), - ok = amqp10_client:close_connection(Connection). + ct:log("Closed connection ."). open_connection_plain_sasl(Config) -> Hostname = ?config(rmq_hostname, Config), @@ -251,7 +270,7 @@ basic_roundtrip(Config) -> application:start(sasl), Hostname = ?config(rmq_hostname, Config), Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp), - OpenConf = #{address => Hostname, port => Port, sasl => anon}, + OpenConf = #{address => Hostname, port => Port, sasl => ?config(sasl, Config)}, roundtrip(OpenConf). basic_roundtrip_tls(Config) -> @@ -318,61 +337,107 @@ roundtrip_large_messages(Config) -> DataMb = rand:bytes(1024 * 1024), Data8Mb = rand:bytes(8 * 1024 * 1024), Data64Mb = rand:bytes(64 * 1024 * 1024), - ok = roundtrip(OpenConf, DataKb), - ok = roundtrip(OpenConf, DataMb), - ok = roundtrip(OpenConf, Data8Mb), - ok = roundtrip(OpenConf, Data64Mb). + ok = roundtrip(OpenConf, [{body, DataKb}]), + ok = roundtrip(OpenConf, [{body, DataMb}]), + ok = roundtrip(OpenConf, [{body, Data8Mb}]), + ok = roundtrip(OpenConf, [{body, Data64Mb}]). + +basic_roundtrip_ibmmq(Config) -> + application:start(sasl), + Hostname = ?config(rmq_hostname, Config), + Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp), + OpenConf = #{address => Hostname, port => Port, sasl => ?config(sasl, Config)}, + roundtrip(OpenConf, [ + {body, <<"banana">>}, + {destination, <<"DEV.QUEUE.3">>}, + {sender_capabilities, <<"queue">>}, + {receiver_capabilities, <<"queue">>}, + {message_annotations, #{}} + ]). roundtrip(OpenConf) -> - roundtrip(OpenConf, <<"banana">>). + roundtrip(OpenConf, []). + +roundtrip(OpenConf, Args) -> + Body = proplists:get_value(body, Args, <<"banana">>), + Destination = proplists:get_value(destination, Args, <<"test1">>), + SenderCapabilities = proplists:get_value(sender_capabilities, Args, <<>>), + ReceiverCapabilities = proplists:get_value(receiver_capabilities, Args, <<>>), + MessageAnnotations = proplists:get_value(message_annotations, Args, + #{<<"x-key">> => <<"x-value">>, + <<"x_key">> => <<"x_value">>}), -roundtrip(OpenConf, Body) -> {ok, Connection} = amqp10_client:open_connection(OpenConf), {ok, Session} = amqp10_client:begin_session(Connection), - {ok, Sender} = amqp10_client:attach_sender_link( - Session, <<"banana-sender">>, <<"test1">>, settled, unsettled_state), - await_link(Sender, credited, link_credit_timeout), + SenderAttachArgs = #{name => <<"banana-sender">>, + role => {sender, #{address => Destination, + durable => unsettled_state, + capabilities => SenderCapabilities}}, + snd_settle_mode => settled, + rcv_settle_mode => first, + filter => #{}, + properties => #{} + }, + {ok, Sender} = amqp10_client:attach_link(Session, SenderAttachArgs), + %%await_link(Sender, credited, link_credit_timeout), + await_link(Sender, attached, attached_timeout), Now = os:system_time(millisecond), - Props = #{creation_time => Now, - message_id => <<"my message ID">>, - correlation_id => <<"my correlation ID">>, + Props = #{content_encoding => <<"my content encoding">>, content_type => <<"my content type">>, - content_encoding => <<"my content encoding">>, - group_id => <<"my group ID">>}, + correlation_id => <<"my correlation ID">>, + creation_time => Now, + group_id => <<"my group ID">>, + message_id => <<"my message ID">>, + to => <<"localhost">> + }, Msg0 = amqp10_msg:new(<<"my-tag">>, Body, true), Msg1 = amqp10_msg:set_application_properties(#{"a_key" => "a_value"}, Msg0), Msg2 = amqp10_msg:set_properties(Props, Msg1), - Msg = amqp10_msg:set_message_annotations(#{<<"x-key">> => "x-value", - <<"x_key">> => "x_value"}, Msg2), + Msg = amqp10_msg:set_message_annotations(MessageAnnotations, Msg2), ok = amqp10_client:send_msg(Sender, Msg), ok = amqp10_client:detach_link(Sender), await_link(Sender, {detached, normal}, link_detach_timeout), {error, link_not_found} = amqp10_client:detach_link(Sender), - {ok, Receiver} = amqp10_client:attach_receiver_link( - Session, <<"banana-receiver">>, <<"test1">>, settled, unsettled_state), + ReceiverAttachArgs = #{ + name => <<"banana-receiver">>, + role => {receiver, #{address => Destination, + durable => unsettled_state, + capabilities => ReceiverCapabilities}, self()}, + snd_settle_mode => settled, + rcv_settle_mode => first, + filter => #{}, + properties => #{} + }, + {ok, Receiver} = amqp10_client:attach_link(Session, ReceiverAttachArgs), {ok, OutMsg} = amqp10_client:get_msg(Receiver, 4 * 60_000), ok = amqp10_client:end_session(Session), ok = amqp10_client:close_connection(Connection), % ct:pal(?LOW_IMPORTANCE, "roundtrip message Out: ~tp~nIn: ~tp~n", [OutMsg, Msg]), - ?assertMatch(Props, amqp10_msg:properties(OutMsg)), + ActualProps = amqp10_msg:properties(OutMsg), + [ ?assertEqual(V, maps:get(K, ActualProps)) || K := V <- Props, K =/= creation_time], + ?assertEqual(#{<<"a_key">> => <<"a_value">>}, amqp10_msg:application_properties(OutMsg)), - ?assertMatch(#{<<"x-key">> := <<"x-value">>, - <<"x_key">> := <<"x_value">>}, amqp10_msg:message_annotations(OutMsg)), + ActualMessageAnnotations = amqp10_msg:message_annotations(OutMsg), + [ ?assertEqual(V, maps:get(K, ActualMessageAnnotations)) || K := V <- MessageAnnotations], + ?assertEqual([Body], amqp10_msg:body(OutMsg)), ok. filtered_roundtrip(OpenConf) -> - filtered_roundtrip(OpenConf, <<"banana">>). + filtered_roundtrip(OpenConf, []). + +filtered_roundtrip(OpenConf, Args) -> + Body = proplists:get_value(body, Args, <<"banana">>), + Destination = proplists:get_value(destination, Args, <<"test1">>), -filtered_roundtrip(OpenConf, Body) -> {ok, Connection} = amqp10_client:open_connection(OpenConf), {ok, Session} = amqp10_client:begin_session(Connection), {ok, Sender} = amqp10_client:attach_sender_link(Session, <<"default-sender">>, - <<"test1">>, + Destination, settled, unsettled_state), await_link(Sender, credited, link_credit_timeout), @@ -384,7 +449,7 @@ filtered_roundtrip(OpenConf, Body) -> {ok, DefaultReceiver} = amqp10_client:attach_receiver_link(Session, <<"default-receiver">>, - <<"test1">>, + Destination, settled, unsettled_state), ok = amqp10_client:send_msg(Sender, Msg1), @@ -400,12 +465,14 @@ filtered_roundtrip(OpenConf, Body) -> ok = amqp10_client:send_msg(Sender, Msg2), + SelectorFilter = #{<<"apache.org:selector-filter:string">> => + <<"amqp.annotation.x-opt-enqueuedtimeutc > ", Now2Binary/binary>>}, {ok, FilteredReceiver} = amqp10_client:attach_receiver_link(Session, <<"filtered-receiver">>, - <<"test1">>, + Destination, settled, unsettled_state, - #{<<"apache.org:selector-filter:string">> => <<"amqp.annotation.x-opt-enqueuedtimeutc > ", Now2Binary/binary>>}), + SelectorFilter), {ok, OutMsg2} = amqp10_client:get_msg(DefaultReceiver, 60_000 * 4), ?assertEqual(<<"msg-2-tag">>, amqp10_msg:delivery_tag(OutMsg2)), @@ -708,6 +775,7 @@ subscribe_with_auto_flow_unsettled(Config) -> ok = amqp10_client:end_session(Session), ok = amqp10_client:close_connection(Connection). + insufficient_credit(Config) -> Hostname = ?config(mock_host, Config), Port = ?config(mock_port, Config), @@ -811,6 +879,188 @@ multi_transfer_without_delivery_id(Config) -> ok = amqp10_client:close_connection(Connection), ok. +set_receiver_capabilities(Config) -> + Hostname = ?config(tcp_hostname_amqp, Config), + Port = ?config(tcp_port_amqp, Config), + +% Hostname = ?config(mock_host, Config), +% Port = ?config(mock_port, Config), + + OpenStep = fun({0 = Ch, #'v1_0.open'{}, _Pay}) -> + {Ch, [#'v1_0.open'{container_id = {utf8, <<"mock">>}}]} + end, + BeginStep = fun({1 = Ch, #'v1_0.begin'{}, _Pay}) -> + {Ch, [#'v1_0.begin'{remote_channel = {ushort, 1}, + next_outgoing_id = {uint, 1}, + incoming_window = {uint, 1000}, + outgoing_window = {uint, 1000}} + ]} + end, + AttachStep = fun({1 = Ch, #'v1_0.attach'{role = true, + name = Name, + source = #'v1_0.source'{ + capabilities = {symbol, <<"capability-1">>}}}, <<>>}) -> + {Ch, [#'v1_0.attach'{name = Name, + handle = {uint, 99}, + initial_delivery_count = {uint, 1}, + role = false} + ]} + end, + + LinkCreditStep = fun({1 = Ch, #'v1_0.flow'{}, <<>>}) -> + {Ch, {multi, [[#'v1_0.transfer'{handle = {uint, 99}, + delivery_id = {uint, 12}, + more = true}, + #'v1_0.data'{content = <<"hello ">>}], + [#'v1_0.transfer'{handle = {uint, 99}, + % delivery_id can be omitted + % for continuation frames + delivery_id = undefined, + settled = undefined, + more = false}, + #'v1_0.data'{content = <<"world">>}] + ]}} + end, + Steps = [fun mock_server:recv_amqp_header_step/1, + fun mock_server:send_amqp_header_step/1, + mock_server:amqp_step(OpenStep), + mock_server:amqp_step(BeginStep), + mock_server:amqp_step(AttachStep), + mock_server:amqp_step(LinkCreditStep) + ], + + ok = mock_server:set_steps(?config(mock_server, Config), Steps), + + Cfg = #{address => Hostname, port => Port, sasl => none, notify => self()}, + {ok, Connection} = amqp10_client:open_connection(Cfg), + {ok, Session} = amqp10_client:begin_session_sync(Connection), + AttachArgs = #{name => <<"mock1-received">>, + role => {receiver, #{address => <<"test">>, + durable => none, + capabilities => <<"capability-1">>}, self()}, + snd_settle_mode => setlled, + rcv_settle_mode => first, + filter => #{}, + properties => #{}}, + {ok, Receiver} = amqp10_client:attach_link(Session, AttachArgs), + amqp10_client:flow_link_credit(Receiver, 100, 50), + receive + {amqp10_msg, Receiver, _InMsg} -> + ok + after 2000 -> + exit(delivery_timeout) + end, + + ok = amqp10_client:end_session(Session), + ok = amqp10_client:close_connection(Connection), + ok. + +set_sender_capabilities(Config) -> + Hostname = ?config(tcp_hostname_amqp, Config), + Port = ?config(tcp_port_amqp, Config), + OpenStep = fun({0 = Ch, #'v1_0.open'{}, _Pay}) -> + {Ch, [#'v1_0.open'{container_id = {utf8, <<"mock">>}}]} + end, + BeginStep = fun({1 = Ch, #'v1_0.begin'{}, _Pay}) -> + {Ch, [#'v1_0.begin'{remote_channel = {ushort, 1}, + next_outgoing_id = {uint, 1}, + incoming_window = {uint, 1000}, + outgoing_window = {uint, 1000}} + ]} + end, + AttachStep = fun({1 = Ch, #'v1_0.attach'{role = false, + name = Name, + source = #'v1_0.source'{ + + }, + target = #'v1_0.target'{ + capabilities = {symbol, <<"capability-1">>}}}, <<>>}) -> + {Ch, [#'v1_0.attach'{name = Name, + handle = {uint, 99}, + role = true}]} + end, + Steps = [fun mock_server:recv_amqp_header_step/1, + fun mock_server:send_amqp_header_step/1, + mock_server:amqp_step(OpenStep), + mock_server:amqp_step(BeginStep), + mock_server:amqp_step(AttachStep)], + + ok = mock_server:set_steps(?config(mock_server, Config), Steps), + + Cfg = #{address => Hostname, port => Port, sasl => none, notify => self()}, + {ok, Connection} = amqp10_client:open_connection(Cfg), + {ok, Session} = amqp10_client:begin_session_sync(Connection), + AttachArgs = #{name => <<"mock1-sender">>, + role => {sender, #{address => <<"test">>, + durable => none, + capabilities => <<"capability-1">>}}, + snd_settle_mode => mixed, + rcv_settle_mode => first}, + {ok, Sender} = amqp10_client:attach_link(Session, AttachArgs), + await_link(Sender, attached, attached_timeout), + Msg = amqp10_msg:new(<<"mock-tag">>, <<"banana">>, true), + {error, insufficient_credit} = amqp10_client:send_msg(Sender, Msg), + + ok = amqp10_client:end_session(Session), + ok = amqp10_client:close_connection(Connection), + ok. + + +set_sender_sync_capabilities(Config) -> + Hostname = ?config(tcp_hostname_amqp, Config), + Port = ?config(tcp_port_amqp, Config), + + OpenStep = fun({0 = Ch, #'v1_0.open'{}, _Pay}) -> + {Ch, [#'v1_0.open'{container_id = {utf8, <<"mock">>}}]} + end, + BeginStep = fun({1 = Ch, #'v1_0.begin'{}, _Pay}) -> + {Ch, [#'v1_0.begin'{remote_channel = {ushort, 1}, + next_outgoing_id = {uint, 1}, + incoming_window = {uint, 1000}, + outgoing_window = {uint, 1000}} + ]} + end, + AttachStep = fun({1 = Ch, #'v1_0.attach'{role = false, + name = Name, + source = #'v1_0.source'{ + + }, + target = #'v1_0.target'{ + capabilities = {array, symbol, [ + {symbol,<<"capability-1">>}, + {symbol,<<"capability-2">>} + ]} + }}, <<>>}) -> + {Ch, [#'v1_0.attach'{name = Name, + handle = {uint, 99}, + role = true}]} + end, + Steps = [fun mock_server:recv_amqp_header_step/1, + fun mock_server:send_amqp_header_step/1, + mock_server:amqp_step(OpenStep), + mock_server:amqp_step(BeginStep), + mock_server:amqp_step(AttachStep)], + + ok = mock_server:set_steps(?config(mock_server, Config), Steps), + + Cfg = #{address => Hostname, port => Port, sasl => none, notify => self()}, + {ok, Connection} = amqp10_client:open_connection(Cfg), + {ok, Session} = amqp10_client:begin_session_sync(Connection), + AttachArgs = #{name => <<"mock1-sender">>, + role => {sender, #{address => <<"test">>, + durable => none, + capabilities => [<<"capability-1">>,<<"capability-2">>]}}, + snd_settle_mode => mixed, + rcv_settle_mode => first}, + {ok, Sender} = amqp10_client:attach_link(Session, AttachArgs), + await_link(Sender, attached, attached_timeout), + Msg = amqp10_msg:new(<<"mock-tag">>, <<"banana">>, true), + {error, insufficient_credit} = amqp10_client:send_msg(Sender, Msg), + + ok = amqp10_client:end_session(Session), + ok = amqp10_client:close_connection(Connection), + ok. + outgoing_heartbeat(Config) -> Hostname = ?config(rmq_hostname, Config), Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp), diff --git a/deps/amqp10_client/test/system_SUITE_data/ibmmq_runner b/deps/amqp10_client/test/system_SUITE_data/ibmmq_runner new file mode 100755 index 000000000000..9515cdbb9cbc --- /dev/null +++ b/deps/amqp10_client/test/system_SUITE_data/ibmmq_runner @@ -0,0 +1,99 @@ +#!/bin/bash + +SCRIPT="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" + +set -u + +# Image built by CI workflow .github/workflows/ibm-mq-make.yaml +# To use a newer vrsion of the IBM MQ server, bump up the version +# in the CI workflow and the IMAGE_TAG variable below +IMAGE=pivotalrabbitmq/ibm-mqadvanced-server-dev +IMAGE_TAG=9.3.5.1-amd64 + +kill_container_if_exist() { + if docker stop $1 &> /dev/null; then + docker rm $1 &> /dev/null + fi +} + +ensure_docker_image_exists() { + TAG=`docker images --filter reference=$IMAGE | grep $IMAGE_TAG` + if [ -z ${TAG+x} ] + then + echo "Docker image ${IMAGE}:${IMAGE_TAG} does not exist" + exit 1 + else + echo "Docker image ${IMAGE}:${IMAGE_TAG} ready" + fi +} + +wait_for_message() { + attemps_left=10 + while ! docker logs $1 2>&1 | grep -q "$2"; + do + sleep 5 + print "Waiting 5sec for $1 to start ($attemps_left attempts left )..." + ((attemps_left--)) + if [[ "$attemps_left" -lt 1 ]]; then + print "Timed out waiting" + save_container_log $1 + exit 1 + fi + done +} +declare -i PADDING_LEVEL=0 + +print() { + tabbing="" + if [[ $PADDING_LEVEL -gt 0 ]]; then + for i in $(seq $PADDING_LEVEL); do + tabbing="$tabbing\t" + done + fi + echo -e "$tabbing$1" +} + +invoke_start(){ + kill_container_if_exist ibmmq + ensure_docker_image_exists + + docker run --name ibmmq \ + --env LICENSE=accept \ + --env MQ_QMGR_NAME=QM1 \ + --env MQ_APP_PASSWORD=passw0rd \ + --env MQ_ADMIN_PASSWORD=passw0rd \ + --publish 1414:1414 \ + --publish 9443:9443 \ + --publish 5672:5672 \ + --detach \ + $IMAGE:$IMAGE_TAG + wait_for_message ibmmq "The listener 'SYSTEM.LISTENER.TCP.1' has started." + wait_for_message ibmmq "Successfully loaded default keystore" + + docker exec ibmmq bash -c 'echo "SET CHLAUTH(SYSTEM.DEF.AMQP) TYPE(ADDRESSMAP) ADDRESS(*) MCAUSER(app)" | /opt/mqm/bin/runmqsc QM1' + docker exec ibmmq bash -c 'echo "STOP SERVICE(SYSTEM.AMQP.SERVICE)" | /opt/mqm/bin/runmqsc QM1' + docker exec ibmmq bash -c 'echo "START SERVICE(SYSTEM.AMQP.SERVICE)" | /opt/mqm/bin/runmqsc QM1' + docker exec ibmmq bash -c 'echo "START CHANNEL(SYSTEM.DEF.AMQP)" | /opt/mqm/bin/runmqsc QM1' + wait_for_message ibmmq "The Server 'SYSTEM.AMQP.SERVICE' has started" + sleep 5 +} + +invoke_stop(){ + kill_container_if_exist ibmmq +} + +case "$1" in + version) + echo "IBM MQ ${IMAGE}:${IMAGE_TAG}" + ;; + build) + build_docker_image + ;; + start) + invoke_start + ;; + stop) + invoke_stop + exit $? + ;; +esac