diff --git a/.github/workflows/ibm-mq-make.yaml b/.github/workflows/ibm-mq-make.yaml index f23d8a437a2e..fe85416a0227 100644 --- a/.github/workflows/ibm-mq-make.yaml +++ b/.github/workflows/ibm-mq-make.yaml @@ -9,7 +9,7 @@ on: pull_request: paths: - '.github/workflows/ibm-mq-make.yaml' - + env: REGISTRY_IMAGE: pivotalrabbitmq/ibm-mqadvanced-server-dev IBM_MQ_REPOSITORY: ibm-messaging/mq-container @@ -19,13 +19,13 @@ jobs: docker: runs-on: ubuntu-latest steps: - + - name: Docker meta id: meta uses: docker/metadata-action@v5 with: images: ${{ env.REGISTRY_IMAGE }} - + - name: Set up QEMU uses: docker/setup-qemu-action@v3 @@ -39,9 +39,9 @@ jobs: repository: ${{ env.IBM_MQ_REPOSITORY }} ref: ${{ env.IBM_MQ_BRANCH_NAME }} - - name: Prepare image + - name: Prepare image run: | - ls + ls echo "Enabling AMQP capability" sed -i -e 's/genmqpkg_incamqp=0/genmqpkg_incamqp=1/g' Dockerfile-server echo "AMQP Bootstrap instructions" @@ -51,11 +51,12 @@ jobs: SET AUTHREC PROFILE('SYSTEM.BASE.TOPIC') PRINCIPAL('app') OBJTYPE(TOPIC) AUTHADD(PUB,SUB) SET AUTHREC PROFILE('SYSTEM.DEFAULT.MODEL.QUEUE') PRINCIPAL('app') OBJTYPE(QUEUE) AUTHADD(PUT,DSP) ALTER CHANNEL(SYSTEM.DEF.AMQP) CHLTYPE(AMQP) MCAUSER('app') + STOP SERVICE(SYSTEM.AMQP.SERVICE) START SERVICE(SYSTEM.AMQP.SERVICE) START CHANNEL(SYSTEM.DEF.AMQP) EOF - make build-devserver - docker tag ibm-mqadvanced-server-dev:${{ env.IMAGE_TAG }} ${{ env.REGISTRY_IMAGE }}:${{ env.IMAGE_TAG }} + make build-devserver + docker tag ibm-mqadvanced-server-dev:${{ env.IMAGE_TAG }} ${{ env.REGISTRY_IMAGE }}:${{ env.IMAGE_TAG }} - name: Login to Docker Hub uses: docker/login-action@v3 @@ -64,4 +65,4 @@ jobs: password: ${{ secrets.DOCKERHUB_PASSWORD }} - name: Push run: | - docker push ${{ env.REGISTRY_IMAGE }}:${{ env.IMAGE_TAG }} \ No newline at end of file + docker push ${{ env.REGISTRY_IMAGE }}:${{ env.IMAGE_TAG }} diff --git a/deps/amqp10_client/.gitignore b/deps/amqp10_client/.gitignore index ac3616494721..685d2fea4246 100644 --- a/deps/amqp10_client/.gitignore +++ b/deps/amqp10_client/.gitignore @@ -4,3 +4,5 @@ # Downloaded ActiveMQ. /test/system_SUITE_data/apache-activemq-* +/test/system_SUITE_data/ibmmq/mq-container +/test/system_SUITE_data/ibmmq/*.tar.gz diff --git a/deps/amqp10_client/BUILD.bazel b/deps/amqp10_client/BUILD.bazel index 6606efaf289c..bba484f0e35a 100644 --- a/deps/amqp10_client/BUILD.bazel +++ b/deps/amqp10_client/BUILD.bazel @@ -117,13 +117,14 @@ rabbitmq_integration_suite( size = "medium", additional_beam = [ "test/activemq_ct_helpers.beam", + "test/ibmmq_ct_helpers.beam", "test/mock_server.beam", ], data = [ - "@activemq//:exec_dir", + "@activemq//:exec_dir", ], test_env = { - "ACTIVEMQ": "$TEST_SRCDIR/$TEST_WORKSPACE/external/activemq/bin/activemq", + "ACTIVEMQ": "$TEST_SRCDIR/$TEST_WORKSPACE/external/activemq/bin/activemq" }, deps = TEST_DEPS, ) @@ -140,6 +141,7 @@ eunit( name = "eunit", compiled_suites = [ ":test_activemq_ct_helpers_beam", + ":test_ibmmq_ct_helpers_beam", ":test_mock_server_beam", ], target = ":test_erlang_app", diff --git a/deps/amqp10_client/app.bzl b/deps/amqp10_client/app.bzl index 8fcdad73cf9d..de6c84488338 100644 --- a/deps/amqp10_client/app.bzl +++ b/deps/amqp10_client/app.bzl @@ -126,6 +126,14 @@ def test_suite_beam_files(name = "test_suite_beam_files"): app_name = "amqp10_client", erlc_opts = "//:test_erlc_opts", ) + erlang_bytecode( + name = "test_ibmmq_ct_helpers_beam", + testonly = True, + srcs = ["test/ibmmq_ct_helpers.erl"], + outs = ["test/ibmmq_ct_helpers.beam"], + app_name = "amqp10_client", + erlc_opts = "//:test_erlc_opts", + ) erlang_bytecode( name = "test_mock_server_beam", testonly = True, diff --git a/deps/amqp10_client/src/amqp10_client.erl b/deps/amqp10_client/src/amqp10_client.erl index e296d3ff8533..0eafc16ab936 100644 --- a/deps/amqp10_client/src/amqp10_client.erl +++ b/deps/amqp10_client/src/amqp10_client.erl @@ -164,6 +164,8 @@ end_session(Pid) -> %% @doc Synchronously attach a link on 'Session'. %% This is a convenience function that awaits attached event %% for the link before returning. +-spec attach_sender_link_sync(pid(), binary(), binary()) -> + {ok, link_ref()} | link_timeout. attach_sender_link_sync(Session, Name, Target) -> attach_sender_link_sync(Session, Name, Target, mixed). @@ -275,7 +277,8 @@ attach_receiver_link(Session, Name, Source, SettleMode, Durability, Filter) -> -spec attach_receiver_link(pid(), binary(), binary(), snd_settle_mode(), terminus_durability(), filter(), properties()) -> {ok, link_ref()}. -attach_receiver_link(Session, Name, Source, SettleMode, Durability, Filter, Properties) +attach_receiver_link(Session, Name, Source, SettleMode, Durability, Filter, + Properties) when is_pid(Session) andalso is_binary(Name) andalso is_binary(Source) andalso diff --git a/deps/amqp10_client/src/amqp10_client_connection.erl b/deps/amqp10_client/src/amqp10_client_connection.erl index 8fbcb22f3d1b..32eb996da686 100644 --- a/deps/amqp10_client/src/amqp10_client_connection.erl +++ b/deps/amqp10_client/src/amqp10_client_connection.erl @@ -228,20 +228,27 @@ sasl_hdr_rcvds({call, From}, begin_session, sasl_init_sent(_EvtType, #'v1_0.sasl_outcome'{code = {ubyte, 0}}, #state{socket = Socket} = State) -> + logger:warning("sasl_init_sent received v1_0.sasl_outcome"), ok = socket_send(Socket, ?AMQP_PROTOCOL_HEADER), + logger:warning("sasl_init_sent socket_send AMQP_PROTOCOL_HEADER ok"), {next_state, hdr_sent, State}; sasl_init_sent(_EvtType, #'v1_0.sasl_outcome'{code = {ubyte, C}}, #state{} = State) when C==1;C==2;C==3;C==4 -> + logger:warning("sasl_init_sent received sasl_auth_failure"), {stop, sasl_auth_failure, State}; sasl_init_sent({call, From}, begin_session, #state{pending_session_reqs = PendingSessionReqs} = State) -> + logger:warning("sasl_init_sent call to begin_session"), State1 = State#state{pending_session_reqs = [From | PendingSessionReqs]}, {keep_state, State1}. hdr_sent(_EvtType, {protocol_header_received, 0, 1, 0, 0}, State) -> + logger:warning("hdr_sent received {protocol_header_received"), case send_open(State) of ok -> {next_state, open_sent, State}; - Error -> {stop, Error, State} + Error -> + logger:warning("client_connection hdr_sent ~p", [Error]), + {stop, Error, State} end; hdr_sent(_EvtType, {protocol_header_received, Protocol, Maj, Min, Rev}, State) -> @@ -250,6 +257,7 @@ hdr_sent(_EvtType, {protocol_header_received, Protocol, Maj, Min, {stop, normal, State}; hdr_sent({call, From}, begin_session, #state{pending_session_reqs = PendingSessionReqs} = State) -> + logger:warning("hdr_sent received call begin_session"), State1 = State#state{pending_session_reqs = [From | PendingSessionReqs]}, {keep_state, State1}. @@ -257,6 +265,7 @@ open_sent(_EvtType, #'v1_0.open'{max_frame_size = MaybeMaxFrameSize, idle_time_out = Timeout} = Open, #state{pending_session_reqs = PendingSessionReqs, config = Config} = State0) -> + logger:warning("open_sent received 'v1_0.open' with pending_session_reqs: ~p", [PendingSessionReqs]), State = case Timeout of undefined -> State0; {uint, T} when T > 0 -> @@ -283,13 +292,16 @@ open_sent(_EvtType, #'v1_0.open'{max_frame_size = MaybeMaxFrameSize, {next_state, opened, State2#state{pending_session_reqs = []}}; open_sent({call, From}, begin_session, #state{pending_session_reqs = PendingSessionReqs} = State) -> + logger:warning("open_sent received call begin_session with pending_session_reqs: ~p", [PendingSessionReqs]), State1 = State#state{pending_session_reqs = [From | PendingSessionReqs]}, {keep_state, State1}; open_sent(info, {'DOWN', MRef, _, _, _}, #state{reader_m_ref = MRef}) -> + logger:warning("open_sent received 'DOWN"), {stop, {shutdown, reader_down}}. opened(_EvtType, heartbeat, State = #state{idle_time_out = T}) -> + logger:warning("opened received heartbeat"), ok = send_heartbeat(State), {ok, Tmr} = start_heartbeat_timer(T), {keep_state, State#state{heartbeat_timer = Tmr}}; @@ -308,11 +320,14 @@ opened(_EvtType, #'v1_0.close'{} = Close, State = #state{config = Config}) -> _ = send_close(State, none), {stop, normal, State}; opened({call, From}, begin_session, State) -> + logger:warning("opened received call being_session"), {Ret, State1} = handle_begin_session(From, State), + logger:warning("handle_begin_session ret: ~p", [Ret]), {keep_state, State1, [{reply, From, Ret}]}; opened(info, {'DOWN', MRef, _, _, _Info}, State = #state{reader_m_ref = MRef, config = Config}) -> %% reader has gone down and we are not already shutting down + logger:warning("opened info received 'DOWN'"), ok = notify_closed(Config, shutdown), {stop, normal, State}; opened(_EvtType, Frame, State) -> @@ -328,11 +343,13 @@ close_sent(_EvtType, {'EXIT', _Pid, shutdown}, State) -> close_sent(_EvtType, {'DOWN', _Ref, process, ReaderPid, _}, #state{reader = ReaderPid} = State) -> %% if the reader exits we probably wont receive a close frame + logger:warning("client_connection close_sent( DOWN"), {stop, normal, State}; close_sent(_EvtType, #'v1_0.close'{} = Close, State = #state{config = Config}) -> ok = notify_closed(Config, Close), %% TODO: we should probably set up a timer before this to ensure %% we close down event if no reply is received + logger:warning("client_connection close_sent( v1_0.close"), {stop, normal, State}. set_other_procs0(OtherProcs, State) -> @@ -450,6 +467,7 @@ send_close(#state{socket = Socket}, _Reason) -> ok; _ -> ok end, + logger:warning("client_connetion send_close Ret: ~p", [Ret]), Ret. send_sasl_init(State, anon) -> @@ -468,7 +486,10 @@ send_sasl_init(State, {plain, User, Pass}) -> Response = <<0:8, User/binary, 0:8, Pass/binary>>, Frame = #'v1_0.sasl_init'{mechanism = {symbol, <<"PLAIN">>}, initial_response = {binary, Response}}, - send(Frame, 1, State). + logger:warning("send_sasl_init send v1_0.sasl_init ~p ~p", [User, Pass]), + Ret = send(Frame, 1, State), + logger:warning("send_sasl_init ~p ~p Ret : ~p", [User, Pass, Ret]), + Ret. send(Record, FrameType, #state{socket = Socket}) -> Encoded = amqp10_framing:encode_bin(Record), diff --git a/deps/amqp10_client/src/amqp10_client_frame_reader.erl b/deps/amqp10_client/src/amqp10_client_frame_reader.erl index 364748b16c85..fe314092e3ca 100644 --- a/deps/amqp10_client/src/amqp10_client_frame_reader.erl +++ b/deps/amqp10_client/src/amqp10_client_frame_reader.erl @@ -152,6 +152,7 @@ handle_event(cast, {unregister_session, _Session, OutgoingChannel, IncomingChann incoming_channels = IncomingChannels1}, {keep_state, State1}; handle_event(cast, close, _StateName, State = #state{socket = Socket}) -> + logger:warning("frame_reader handle_event cast close"), _ = close_socket(Socket), {stop, normal, State#state{socket = undefined}}; @@ -167,11 +168,13 @@ handle_event(info, {Tcp, _, Packet}, StateName, #state{buffer = Buffer} = State) set_active_once(NewState), {next_state, NextState, NewState#state{buffer = Remaining}}; {error, Reason, NewState} -> + logger:warning("frame_reader info handle_input error ~p", [Reason]), {stop, Reason, NewState} end; handle_event(info, {TcpError, _, Reason}, StateName, State) when TcpError == tcp_error orelse TcpError == ssl_error -> + logger:warning("frame_reader handle_event info TcpError: ~p", [TcpError]), logger:warning("AMQP 1.0 connection socket errored, connection state: '~ts', reason: '~tp'", [StateName, Reason]), State1 = State#state{socket = undefined, @@ -180,6 +183,7 @@ handle_event(info, {TcpError, _, Reason}, StateName, State) {stop, {error, Reason}, State1}; handle_event(info, {TcpClosed, _}, StateName, State) when TcpClosed == tcp_closed orelse TcpClosed == ssl_closed -> + logger:warning("frame_reader handle_event info TcpClosed: ~p", [TcpClosed]), logger:warning("AMQP 1.0 connection socket was closed, connection state: '~ts'", [StateName]), State1 = State#state{socket = undefined, diff --git a/deps/amqp10_client/src/amqp10_client_session.erl b/deps/amqp10_client/src/amqp10_client_session.erl index 7e2c82560398..12f90af6d4bd 100644 --- a/deps/amqp10_client/src/amqp10_client_session.erl +++ b/deps/amqp10_client/src/amqp10_client_session.erl @@ -70,11 +70,14 @@ -type input_handle() :: link_handle(). -type terminus_durability() :: none | configuration | unsettled_state. +-type terminus_capabilities() :: binary() | [binary(),...]. -type target_def() :: #{address => link_address(), - durable => terminus_durability()}. + durable => terminus_durability(), + capabilities => terminus_capabilities()}. -type source_def() :: #{address => link_address(), - durable => terminus_durability()}. + durable => terminus_durability(), + capabilities => terminus_capabilities()}. -type attach_role() :: {sender, target_def()} | {receiver, source_def(), pid()}. @@ -188,7 +191,10 @@ begin_sync(Connection, Timeout) -> -spec attach(pid(), attach_args()) -> {ok, link_ref()}. attach(Session, Args) -> - gen_statem:call(Session, {attach, Args}, ?TIMEOUT). + logger:warning("amqp10_session: call_session attach ~p", [Args]), + Ret = gen_statem:call(Session, {attach, Args}, ?TIMEOUT), + logger:warning("amqp10_session: call_session attach returned ~p", [Ret]), + Ret. -spec detach(pid(), output_handle()) -> ok | {error, link_not_found | half_attached}. detach(Session, Handle) -> @@ -243,11 +249,13 @@ init([FromPid, Channel, Reader, ConnConfig]) -> {ok, unmapped, State}. unmapped(cast, {socket_ready, Socket}, State) -> + logger:warning("unmapped socket_ready calling send_begin"), State1 = State#state{socket = Socket}, ok = send_begin(State1), {next_state, begin_sent, State1}; unmapped({call, From}, {attach, Attach}, #state{early_attach_requests = EARs} = State) -> + logger:warning("on unmapped state, received call attach. Storing it in early_attach_requests"), {keep_state, State#state{early_attach_requests = [{From, Attach} | EARs]}}. @@ -256,10 +264,13 @@ begin_sent(cast, #'v1_0.begin'{remote_channel = {ushort, RemoteChannel}, incoming_window = {uint, InWindow}, outgoing_window = {uint, OutWindow}} = Begin, #state{early_attach_requests = EARs} = State) -> - + logger:warning("on state begin_sent, received v1_0.beging with remote channlel : ~p", [RemoteChannel]), State1 = State#state{remote_channel = RemoteChannel}, + logger:warning("sending early attach requests ~p", [EARs]), State2 = lists:foldr(fun({From, Attach}, S) -> + logger:warning("send early attach request ~tp", [Attach]), {S2, H} = send_attach(fun send/2, Attach, From, S), + logger:warning("sent attach request ~p with result ~p", [Attach, H]), gen_statem:reply(From, {ok, H}), S2 end, State1, EARs), @@ -304,7 +315,7 @@ mapped(cast, #'v1_0.attach'{name = {utf8, Name}, max_message_size = MaybeMaxMessageSize} = Attach, #state{links = Links, link_index = LinkIndex, link_handle_index = LHI} = State0) -> - + logger:warning("on mapped state, received v1_0.attach frame"), OurRoleBool = not PeerRoleBool, OurRole = boolean_to_role(OurRoleBool), LinkIndexKey = {OurRole, Name}, @@ -537,6 +548,7 @@ mapped({call, From}, {keep_state, State, {reply, From, Res}}; mapped({call, From}, {attach, Attach}, State) -> + logger:warning("on mapped state, received call to attach ~p", [Attach]), {State1, LinkRef} = send_attach(fun send/2, Attach, From, State), {keep_state, State1, {reply, From, {ok, LinkRef}}}; @@ -579,8 +591,12 @@ send_begin(#state{socket = Socket, Begin = #'v1_0.begin'{next_outgoing_id = uint(NextOutId), incoming_window = uint(InWin), outgoing_window = ?UINT_OUTGOING_WINDOW}, + logger:warning("send_begin encode_frame ..."), Frame = encode_frame(Begin, State), - socket_send(Socket, Frame). + logger:warning("send_begin encoded frame ~p", [Frame]), + Ret = socket_send(Socket, Frame), + logger:warning("socket_send ~p", [Ret]), + Ret. send_end(State) -> send_end(State, undefined). @@ -706,20 +722,38 @@ make_source(#{role := {sender, _}}) -> make_source(#{role := {receiver, #{address := Address} = Source, _Pid}, filter := Filter}) -> Durable = translate_terminus_durability(maps:get(durable, Source, none)), TranslatedFilter = translate_filters(Filter), - #'v1_0.source'{address = {utf8, Address}, + try translate_terminus_capabilities(maps:get(capabilities, Source, [])) of + Capabilities -> + logger:warning("make_source capabilities : ~p", [Capabilities]), + #'v1_0.source'{address = {utf8, Address}, durable = {uint, Durable}, - filter = TranslatedFilter}. + filter = TranslatedFilter, + capabilities = Capabilities} + catch + throw:Err -> + logger:warning("make_source failed due to ~p", [Err]), + {error, Err} + end. make_target(#{role := {receiver, _Source, _Pid}}) -> #'v1_0.target'{}; make_target(#{role := {sender, #{address := Address} = Target}}) -> Durable = translate_terminus_durability(maps:get(durable, Target, none)), - TargetAddr = case is_binary(Address) of - true -> {utf8, Address}; - false -> Address - end, - #'v1_0.target'{address = TargetAddr, - durable = {uint, Durable}}. + try translate_terminus_capabilities(maps:get(capabilities, Target, [])) of + Capabilities -> + logger:warning("make_target capabilities : ~p", [Capabilities]), + TargetAddr = case is_binary(Address) of + true -> {utf8, Address}; + false -> Address + end, + #'v1_0.target'{address = TargetAddr, + durable = {uint, Durable}, + capabilities = Capabilities} + catch + throw:Err -> + logger:warning("make_target failed due to ~p", [Err]), + {error, Err} + end. max_message_size(#{max_message_size := Size}) when is_integer(Size) andalso @@ -762,6 +796,13 @@ filter_value_type({T, _} = V) when is_atom(T) -> %% looks like an already tagged type, just pass it through V. +translate_terminus_capabilities(Capabilities) when is_binary(Capabilities) -> + {symbol, Capabilities}; +translate_terminus_capabilities(CapabilitiesList) when is_list(CapabilitiesList) -> + {array, symbol, [{symbol, V} || V <- CapabilitiesList, is_binary(V)]}; +translate_terminus_capabilities(_) -> + []. + % https://people.apache.org/~rgodfrey/amqp-1.0/apache-filters.html translate_legacy_amqp_headers_binding(LegacyHeaders) -> {map, @@ -805,8 +846,11 @@ send_attach(Send, #{name := Name, role := RoleTuple} = Args, {FromPid, _}, #state{next_link_handle = OutHandle0, links = Links, link_index = LinkIndex} = State) -> + logger:warning("make_source ..."), Source = make_source(Args), + logger:warning("make_target ..."), Target = make_target(Args), + logger:warning("make properties ..."), Properties = amqp10_client_types:make_properties(Args), {LinkTarget, InitialDeliveryCount, MaxMessageSize} = @@ -837,7 +881,10 @@ send_attach(Send, #{name := Name, role := RoleTuple} = Args, {FromPid, _}, rcv_settle_mode = rcv_settle_mode(Args), target = Target, max_message_size = MaxMessageSize}, + + logger:warning("sending ..."), ok = Send(Attach, State), + logger:warning("sent ..."), Ref = make_link_ref(Role, self(), OutHandle), Link = #link{name = Name, @@ -1170,7 +1217,8 @@ amqp10_session_event(Evt) -> socket_send(Sock, Data) -> case socket_send0(Sock, Data) of ok -> ok; - {error, _Reason} -> + {error, Reason} -> + logger:warning("socket_send ~p", [Reason]), throw({stop, normal}) end. diff --git a/deps/amqp10_client/test/activemq_ct_helpers.erl b/deps/amqp10_client/test/activemq_ct_helpers.erl index ba1b7fe5721e..2c0ebb5ae845 100644 --- a/deps/amqp10_client/test/activemq_ct_helpers.erl +++ b/deps/amqp10_client/test/activemq_ct_helpers.erl @@ -63,6 +63,7 @@ start_activemq_nodes(Config) -> ActivemqCmd = ?config(activemq_cmd, Config1), TCPPort = rabbit_ct_broker_helpers:get_node_config(Config1, 0, tcp_port_amqp), ConfigFile = ?config(activemq_config_filename, Config1), + ct:log("Running ~p", [ActivemqCmd]), Cmd = [ActivemqCmd, "start", {"-Dtestsuite.tcp_port_amqp=~b", [TCPPort]}, diff --git a/deps/amqp10_client/test/ibmmq_ct_helpers.erl b/deps/amqp10_client/test/ibmmq_ct_helpers.erl new file mode 100644 index 000000000000..aa64bcdb6dd3 --- /dev/null +++ b/deps/amqp10_client/test/ibmmq_ct_helpers.erl @@ -0,0 +1,100 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. +%% + +-module(ibmmq_ct_helpers). + +-include_lib("common_test/include/ct.hrl"). + +-export([setup_steps/0, + teardown_steps/0, + init_config/1, + capture_logs/1, + start_ibmmq_server/1, + stop_ibmmq_server/1]). + +setup_steps() -> + [fun init_config/1, + fun start_ibmmq_server/1 + ]. + +teardown_steps() -> + [ + fun stop_ibmmq_server/1 + ]. + +init_config(Config) -> + NodeConfig = [{tcp_port_amqp, 5672}], + rabbit_ct_helpers:set_config(Config, [ {rmq_nodes, [NodeConfig]}, + {rmq_hostname, "localhost"}, + {tcp_hostname_amqp, "localhost"}, + {sasl, {plain, <<"app">>, <<"passw0rd">>}} ]). + +start_ibmmq_server(Config) -> + IBMmqCmd = filename:join([?config(data_dir, Config), "ibmmq_runner"]), + Cmd = [IBMmqCmd, "start"], + ct:log("Running command ~p", [Cmd]), + case rabbit_ct_helpers:exec(Cmd, []) of + {ok, _} -> wait_for_ibmmq_nodes(Config); + Error -> ct:pal("Error: ~tp", [Error]), + {skip, "Failed to start IBM MQ"} + end. + +wait_for_ibmmq_nodes(Config) -> + Hostname = ?config(rmq_hostname, Config), + Ports = rabbit_ct_broker_helpers:get_node_configs(Config, tcp_port_amqp), + wait_for_ibmmq_ports(Config, Hostname, Ports). + +wait_for_ibmmq_ports(Config, Hostname, [Port | Rest]) -> + ct:log("Waiting for IBM MQ on port ~b", [Port]), + case wait_for_ibmmq_port(Hostname, Port, 60) of + ok -> + ct:log("IBM MQ ready on port ~b", [Port]), + wait_for_ibmmq_ports(Config, Hostname, Rest); + {error, _} -> + Msg = lists:flatten( + io_lib:format( + "Failed to start IBM MQ on port ~b; see IBM MQ logs", + [Port])), + ct:pal(?LOW_IMPORTANCE, Msg, []), + {skip, Msg} + end; +wait_for_ibmmq_ports(Config, _, []) -> + Config. + +wait_for_ibmmq_port(_, _, 0) -> + {error, econnrefused}; +wait_for_ibmmq_port(Hostname, Port, Retries) -> + case gen_tcp:connect(Hostname, Port, []) of + {ok, Connection} -> + gen_tcp:close(Connection), + ok; + {error, econnrefused} -> + timer:sleep(1000), + wait_for_ibmmq_port(Hostname, Port, Retries - 1); + Error -> + Error + end. + +capture_logs(Config) -> + IBMmqCmd = filename:join([?config(data_dir, Config), "ibmmq_runner"]), + Cmd = [IBMmqCmd, "logs", "/tmp/ibmmq.log"], + ct:log("Running command ~p", [Cmd]), + case rabbit_ct_helpers:exec(Cmd, []) of + {ok, _} -> Config; + Error -> ct:pal("Error: ~tp", [Error]), + {skip, "Failed to stop IBM MQ"} + end. + +stop_ibmmq_server(Config) -> + IBMmqCmd = filename:join([?config(data_dir, Config), "ibmmq_runner"]), + Cmd = [IBMmqCmd, "stop"], + ct:log("Running command ~p", [Cmd]), + case rabbit_ct_helpers:exec(Cmd, []) of + {ok, _} -> Config; + Error -> ct:pal("Error: ~tp", [Error]), + {skip, "Failed to stop IBM MQ"} + end. diff --git a/deps/amqp10_client/test/system_SUITE.erl b/deps/amqp10_client/test/system_SUITE.erl index 27a59d5efef8..d94ff74f4fb9 100644 --- a/deps/amqp10_client/test/system_SUITE.erl +++ b/deps/amqp10_client/test/system_SUITE.erl @@ -24,6 +24,7 @@ all() -> {group, rabbitmq}, {group, rabbitmq_strict}, {group, activemq}, + % {group, ibmmq}, {group, activemq_no_anon}, {group, mock} ]. @@ -32,6 +33,11 @@ groups() -> [ {rabbitmq, [], shared() ++ [notify_with_performative]}, {activemq, [], shared()}, + {ibmmq, [], [ + open_close_connection, + basic_roundtrip_with_sender_and_receiver_capabilities, + basic_roundtrip_with_non_binary_capability + ]}, {rabbitmq_strict, [], [ basic_roundtrip_tls, roundtrip_tls_global_config, @@ -53,7 +59,10 @@ groups() -> {mock, [], [ insufficient_credit, incoming_heartbeat, - multi_transfer_without_delivery_id + multi_transfer_without_delivery_id, + set_receiver_capabilities, + set_sender_capabilities, + set_sender_sync_capabilities ]} ]. @@ -61,6 +70,8 @@ shared() -> [ open_close_connection, basic_roundtrip, + basic_roundtrip_with_sender_and_receiver_capabilities, + basic_roundtrip_with_non_binary_capability, early_transfer, split_transfer, transfer_unsettled, @@ -122,6 +133,14 @@ init_per_group(activemq, Config0) -> Config = rabbit_ct_helpers:set_config(Config0, {sasl, anon}), rabbit_ct_helpers:run_steps(Config, activemq_ct_helpers:setup_steps("activemq.xml")); + +init_per_group(ibmmq, Config) -> + ct:log("Found arch: ~p", [erlang:system_info(system_architecture)]), + case string:find(erlang:system_info(system_architecture), "x86_64") of + nomatch -> {skip, no_arm64_docker_image_for_ibmmq}; + _ -> rabbit_ct_helpers:run_steps(Config, ibmmq_ct_helpers:setup_steps()) + end; + init_per_group(activemq_no_anon, Config0) -> Config = rabbit_ct_helpers:set_config( Config0, {sasl, {plain, <<"user">>, <<"password">>}}), @@ -137,7 +156,9 @@ init_per_group(azure, Config) -> ]); init_per_group(mock, Config) -> rabbit_ct_helpers:set_config(Config, [{mock_port, 25000}, + {tcp_port_amqp, 25000}, {mock_host, "localhost"}, + {tcp_hostname_amqp, "localhost"}, {sasl, none} ]). end_per_group(rabbitmq, Config) -> @@ -146,6 +167,8 @@ end_per_group(rabbitmq_strict, Config) -> rabbit_ct_helpers:run_steps(Config, rabbit_ct_broker_helpers:teardown_steps()); end_per_group(activemq, Config) -> rabbit_ct_helpers:run_steps(Config, activemq_ct_helpers:teardown_steps()); +end_per_group(ibmmq, Config) -> + rabbit_ct_helpers:run_steps(Config, ibmmq_ct_helpers:teardown_steps()); end_per_group(activemq_no_anon, Config) -> rabbit_ct_helpers:run_steps(Config, activemq_ct_helpers:teardown_steps()); end_per_group(_, Config) -> @@ -156,9 +179,11 @@ end_per_group(_, Config) -> %% ------------------------------------------------------------------- init_per_testcase(_Test, Config) -> + ct:log("Setting per test case"), case lists:keyfind(mock_port, 1, Config) of {_, Port} -> M = mock_server:start(Port), + ct:log("Setting mock server"), rabbit_ct_helpers:set_config(Config, {mock_server, M}); _ -> Config end. @@ -175,19 +200,18 @@ open_close_connection(Config) -> Hostname = ?config(rmq_hostname, Config), Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp), %% an address list - OpnConf = #{addresses => [Hostname], + OpenConf = #{addresses => [Hostname], port => Port, notify => self(), container_id => <<"open_close_connection_container">>, sasl => ?config(sasl, Config)}, - {ok, Connection} = amqp10_client:open_connection(Hostname, Port), - {ok, Connection2} = amqp10_client:open_connection(OpnConf), + ct:log("Connecting to ~p", [OpenConf]), + {ok, Connection2} = amqp10_client:open_connection(OpenConf), receive - {amqp10_event, {connection, Connection2, opened}} -> ok + {amqp10_event, {connection, Connection2, opened}} -> ct:log("connection opened"), ok after 5000 -> exit(connection_timeout) end, - ok = amqp10_client:close_connection(Connection2), - ok = amqp10_client:close_connection(Connection). + ok = amqp10_client:close_connection(Connection2). open_connection_plain_sasl(Config) -> Hostname = ?config(rmq_hostname, Config), @@ -256,7 +280,7 @@ basic_roundtrip(Config) -> application:start(sasl), Hostname = ?config(rmq_hostname, Config), Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp), - OpenConf = #{address => Hostname, port => Port, sasl => anon}, + OpenConf = #{address => Hostname, port => Port, sasl => ?config(sasl, Config)}, roundtrip(OpenConf). basic_roundtrip_tls(Config) -> @@ -323,54 +347,122 @@ roundtrip_large_messages(Config) -> DataMb = rand:bytes(1024 * 1024), Data8Mb = rand:bytes(8 * 1024 * 1024), Data64Mb = rand:bytes(64 * 1024 * 1024), - ok = roundtrip(OpenConf, DataKb), - ok = roundtrip(OpenConf, DataMb), - ok = roundtrip(OpenConf, Data8Mb), - ok = roundtrip(OpenConf, Data64Mb). + ok = roundtrip(OpenConf, [{body, DataKb}]), + ok = roundtrip(OpenConf, [{body, DataMb}]), + ok = roundtrip(OpenConf, [{body, Data8Mb}]), + ok = roundtrip(OpenConf, [{body, Data64Mb}]). + +basic_roundtrip_with_sender_and_receiver_capabilities(Config) -> + application:start(sasl), + Hostname = ?config(rmq_hostname, Config), + Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp), + OpenConf = #{address => Hostname, port => Port, sasl => ?config(sasl, Config)}, + roundtrip(OpenConf, [ + {body, <<"banana">>}, + {destination, <<"DEV.QUEUE.3">>}, + {sender_capabilities, <<"queue">>}, + {receiver_capabilities, <<"queue">>}, + {message_annotations, #{}} + ], [creation_time]), + timer:sleep(20000). + +basic_roundtrip_with_non_binary_capability(Config) -> + application:start(sasl), + Hostname = ?config(rmq_hostname, Config), + Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp), + OpenConf = #{address => Hostname, port => Port, sasl => ?config(sasl, Config)}, + roundtrip(OpenConf, [ + {body, <<"banana">>}, + {destination, <<"DEV.QUEUE.3">>}, + %{sender_capabilities, [<<"queue">>, dummy]}, + {sender_capabilities, [<<"queue">>]}, + {receiver_capabilities, <<"queue">>}, + {message_annotations, #{}} + ], [creation_time]). roundtrip(OpenConf) -> - roundtrip(OpenConf, <<"banana">>). + roundtrip(OpenConf, [], []). + +roundtrip(OpenConf, Args) -> + roundtrip(OpenConf, Args, []). + +roundtrip(OpenConf, Args, DoNotAssertMessageProperties) -> + Body = proplists:get_value(body, Args, <<"banana">>), + Destination = proplists:get_value(destination, Args, <<"test1">>), + SenderCapabilities = proplists:get_value(sender_capabilities, Args, <<>>), + ReceiverCapabilities = proplists:get_value(receiver_capabilities, Args, <<>>), + MessageAnnotations = proplists:get_value(message_annotations, Args, + #{<<"x-key">> => <<"x-value">>, + <<"x_key">> => <<"x_value">>}), -roundtrip(OpenConf, Body) -> {ok, Connection} = amqp10_client:open_connection(OpenConf), + receive + {amqp10_event, {connection, Connection, opened}} -> ok + after 5000 -> exit(connection_timeout) + end, {ok, Session} = amqp10_client:begin_session(Connection), - {ok, Sender} = amqp10_client:attach_sender_link( - Session, <<"banana-sender">>, <<"test1">>, settled, unsettled_state), - await_link(Sender, credited, link_credit_timeout), + receive + {amqp10_event, {session, Session, begun}} -> ok + after 5000 -> exit(connection_timeout) + end, + SenderAttachArgs = #{name => <<"banana-sender">>, + role => {sender, #{address => Destination, + durable => unsettled_state, + capabilities => SenderCapabilities}}, + snd_settle_mode => settled, + rcv_settle_mode => first, + filter => #{}, + properties => #{} + }, + {ok, Sender} = amqp10_client:attach_link(Session, SenderAttachArgs), + await_link(Sender, attached, attached_timeout), Now = os:system_time(millisecond), - Props = #{creation_time => Now, - message_id => <<"my message ID">>, - correlation_id => <<"my correlation ID">>, + Props = #{content_encoding => <<"my content encoding">>, content_type => <<"my content type">>, - content_encoding => <<"my content encoding">>, - group_id => <<"my group ID">>}, + correlation_id => <<"my correlation ID">>, + creation_time => Now, + group_id => <<"my group ID">>, + message_id => <<"my message ID">>, + to => <<"localhost">> + }, Msg0 = amqp10_msg:new(<<"my-tag">>, Body, true), Msg1 = amqp10_msg:set_application_properties(#{"a_key" => "a_value"}, Msg0), Msg2 = amqp10_msg:set_properties(Props, Msg1), - Msg = amqp10_msg:set_message_annotations(#{<<"x-key 1">> => "value 1", - <<"x-key 2">> => "value 2"}, Msg2), + Msg = amqp10_msg:set_message_annotations(MessageAnnotations, Msg2), ok = amqp10_client:send_msg(Sender, Msg), ok = amqp10_client:detach_link(Sender), await_link(Sender, {detached, normal}, link_detach_timeout), {error, link_not_found} = amqp10_client:detach_link(Sender), - {ok, Receiver} = amqp10_client:attach_receiver_link( - Session, <<"banana-receiver">>, <<"test1">>, settled, unsettled_state), + ReceiverAttachArgs = #{ + name => <<"banana-receiver">>, + role => {receiver, #{address => Destination, + durable => unsettled_state, + capabilities => ReceiverCapabilities}, self()}, + snd_settle_mode => settled, + rcv_settle_mode => first, + filter => #{}, + properties => #{} + }, + {ok, Receiver} = amqp10_client:attach_link(Session, ReceiverAttachArgs), {ok, OutMsg} = amqp10_client:get_msg(Receiver, 4 * 60_000), ok = amqp10_client:end_session(Session), ok = amqp10_client:close_connection(Connection), - % ct:pal(?LOW_IMPORTANCE, "roundtrip message Out: ~tp~nIn: ~tp~n", [OutMsg, Msg]), - ?assertMatch(Props, amqp10_msg:properties(OutMsg)), + ActualProps = amqp10_msg:properties(OutMsg), + [ ?assertEqual(V, maps:get(K, ActualProps)) || K := V <- Props, + not lists:member(K, DoNotAssertMessageProperties)], + ?assertEqual(#{<<"a_key">> => <<"a_value">>}, amqp10_msg:application_properties(OutMsg)), - ?assertMatch(#{<<"x-key 1">> := <<"value 1">>, - <<"x-key 2">> := <<"value 2">>}, amqp10_msg:message_annotations(OutMsg)), + ActualMessageAnnotations = amqp10_msg:message_annotations(OutMsg), + [ ?assertEqual(V, maps:get(K, ActualMessageAnnotations)) || K := V <- MessageAnnotations], + ?assertEqual([Body], amqp10_msg:body(OutMsg)), ok. filtered_roundtrip(OpenConf) -> - filtered_roundtrip(OpenConf, <<"banana">>). + filtered_roundtrip(OpenConf, <<"test1">>). filtered_roundtrip(OpenConf, Body) -> {ok, Connection} = amqp10_client:open_connection(OpenConf), @@ -389,7 +481,7 @@ filtered_roundtrip(OpenConf, Body) -> {ok, DefaultReceiver} = amqp10_client:attach_receiver_link(Session, <<"default-receiver">>, - <<"test1">>, + <<"test1">>, settled, unsettled_state), ok = amqp10_client:send_msg(Sender, Msg1), @@ -405,12 +497,14 @@ filtered_roundtrip(OpenConf, Body) -> ok = amqp10_client:send_msg(Sender, Msg2), + SelectorFilter = #{<<"apache.org:selector-filter:string">> => + <<"amqp.annotation.x-opt-enqueuedtimeutc > ", Now2Binary/binary>>}, {ok, FilteredReceiver} = amqp10_client:attach_receiver_link(Session, <<"filtered-receiver">>, - <<"test1">>, + <<"test1">>, settled, unsettled_state, - #{<<"apache.org:selector-filter:string">> => <<"amqp.annotation.x-opt-enqueuedtimeutc > ", Now2Binary/binary>>}), + SelectorFilter), {ok, OutMsg2} = amqp10_client:get_msg(DefaultReceiver, 60_000 * 4), ?assertEqual(<<"msg-2-tag">>, amqp10_msg:delivery_tag(OutMsg2)), @@ -759,6 +853,7 @@ subscribe_with_auto_flow_unsettled(Config) -> ok = amqp10_client:end_session(Session), ok = amqp10_client:close_connection(Connection). + insufficient_credit(Config) -> Hostname = ?config(mock_host, Config), Port = ?config(mock_port, Config), @@ -862,6 +957,189 @@ multi_transfer_without_delivery_id(Config) -> ok = amqp10_client:close_connection(Connection), ok. +set_receiver_capabilities(Config) -> + Hostname = ?config(tcp_hostname_amqp, Config), + Port = ?config(tcp_port_amqp, Config), + +% Hostname = ?config(mock_host, Config), +% Port = ?config(mock_port, Config), + + OpenStep = fun({0 = Ch, #'v1_0.open'{}, _Pay}) -> + {Ch, [#'v1_0.open'{container_id = {utf8, <<"mock">>}}]} + end, + BeginStep = fun({0 = Ch, #'v1_0.begin'{}, _Pay}) -> + {Ch, [#'v1_0.begin'{remote_channel = {ushort, Ch}, + next_outgoing_id = {uint, 1}, + incoming_window = {uint, 1000}, + outgoing_window = {uint, 1000}} + ]} + end, + AttachStep = fun({0 = Ch, #'v1_0.attach'{role = true, + name = Name, + source = #'v1_0.source'{ + capabilities = {symbol, <<"capability-1">>}} + }, <<>>}) -> + {Ch, [#'v1_0.attach'{name = Name, + handle = {uint, 99}, + initial_delivery_count = {uint, 1}, + role = false} + ]} + end, + + LinkCreditStep = fun({0 = Ch, #'v1_0.flow'{}, <<>>}) -> + {Ch, {multi, [[#'v1_0.transfer'{handle = {uint, 99}, + delivery_id = {uint, 12}, + more = true}, + #'v1_0.data'{content = <<"hello ">>}], + [#'v1_0.transfer'{handle = {uint, 99}, + % delivery_id can be omitted + % for continuation frames + delivery_id = undefined, + settled = undefined, + more = false}, + #'v1_0.data'{content = <<"world">>}] + ]}} + end, + Steps = [fun mock_server:recv_amqp_header_step/1, + fun mock_server:send_amqp_header_step/1, + mock_server:amqp_step(OpenStep), + mock_server:amqp_step(BeginStep), + mock_server:amqp_step(AttachStep), + mock_server:amqp_step(LinkCreditStep) + ], + + ok = mock_server:set_steps(?config(mock_server, Config), Steps), + + Cfg = #{address => Hostname, port => Port, sasl => none, notify => self()}, + {ok, Connection} = amqp10_client:open_connection(Cfg), + {ok, Session} = amqp10_client:begin_session_sync(Connection), + AttachArgs = #{name => <<"mock1-received">>, + role => {receiver, #{address => <<"test">>, + durable => none, + capabilities => <<"capability-1">>}, self()}, + snd_settle_mode => setlled, + rcv_settle_mode => first, + filter => #{}, + properties => #{}}, + {ok, Receiver} = amqp10_client:attach_link(Session, AttachArgs), + amqp10_client:flow_link_credit(Receiver, 100, 50), + receive + {amqp10_msg, Receiver, _InMsg} -> + ok + after 2000 -> + exit(delivery_timeout) + end, + + ok = amqp10_client:end_session(Session), + ok = amqp10_client:close_connection(Connection), + ok. + +set_sender_capabilities(Config) -> + Hostname = ?config(tcp_hostname_amqp, Config), + Port = ?config(tcp_port_amqp, Config), + OpenStep = fun({0 = Ch, #'v1_0.open'{}, _Pay}) -> + {Ch, [#'v1_0.open'{container_id = {utf8, <<"mock">>}}]} + end, + BeginStep = fun({0 = Ch, #'v1_0.begin'{}, _Pay}) -> + {Ch, [#'v1_0.begin'{remote_channel = {ushort, Ch}, + next_outgoing_id = {uint, 1}, + incoming_window = {uint, 1000}, + outgoing_window = {uint, 1000}} + ]} + end, + AttachStep = fun({0 = Ch, #'v1_0.attach'{role = false, + name = Name, + source = #'v1_0.source'{ + + }, + target = #'v1_0.target'{ + capabilities = {symbol, <<"capability-1">>}}}, <<>>}) -> + {Ch, [#'v1_0.attach'{name = Name, + handle = {uint, 99}, + role = true}]} + end, + Steps = [fun mock_server:recv_amqp_header_step/1, + fun mock_server:send_amqp_header_step/1, + mock_server:amqp_step(OpenStep), + mock_server:amqp_step(BeginStep), + mock_server:amqp_step(AttachStep)], + + ok = mock_server:set_steps(?config(mock_server, Config), Steps), + + Cfg = #{address => Hostname, port => Port, sasl => none, notify => self()}, + {ok, Connection} = amqp10_client:open_connection(Cfg), + {ok, Session} = amqp10_client:begin_session_sync(Connection), + AttachArgs = #{name => <<"mock1-sender">>, + role => {sender, #{address => <<"test">>, + durable => none, + capabilities => <<"capability-1">>}}, + snd_settle_mode => mixed, + rcv_settle_mode => first}, + {ok, Sender} = amqp10_client:attach_link(Session, AttachArgs), + await_link(Sender, attached, attached_timeout), + Msg = amqp10_msg:new(<<"mock-tag">>, <<"banana">>, true), + {error, insufficient_credit} = amqp10_client:send_msg(Sender, Msg), + + ok = amqp10_client:end_session(Session), + ok = amqp10_client:close_connection(Connection), + ok. + + +set_sender_sync_capabilities(Config) -> + Hostname = ?config(tcp_hostname_amqp, Config), + Port = ?config(tcp_port_amqp, Config), + + OpenStep = fun({0 = Ch, #'v1_0.open'{}, _Pay}) -> + {Ch, [#'v1_0.open'{container_id = {utf8, <<"mock">>}}]} + end, + BeginStep = fun({0 = Ch, #'v1_0.begin'{}, _Pay}) -> + {Ch, [#'v1_0.begin'{remote_channel = {ushort, Ch}, + next_outgoing_id = {uint, 1}, + incoming_window = {uint, 1000}, + outgoing_window = {uint, 1000}} + ]} + end, + AttachStep = fun({0 = Ch, #'v1_0.attach'{role = false, + name = Name, + source = #'v1_0.source'{ + + }, + target = #'v1_0.target'{ + capabilities = {array, symbol, [ + {symbol,<<"capability-1">>}, + {symbol,<<"capability-2">>} + ]} + }}, <<>>}) -> + {Ch, [#'v1_0.attach'{name = Name, + handle = {uint, 99}, + role = true}]} + end, + Steps = [fun mock_server:recv_amqp_header_step/1, + fun mock_server:send_amqp_header_step/1, + mock_server:amqp_step(OpenStep), + mock_server:amqp_step(BeginStep), + mock_server:amqp_step(AttachStep)], + + ok = mock_server:set_steps(?config(mock_server, Config), Steps), + + Cfg = #{address => Hostname, port => Port, sasl => none, notify => self()}, + {ok, Connection} = amqp10_client:open_connection(Cfg), + {ok, Session} = amqp10_client:begin_session_sync(Connection), + AttachArgs = #{name => <<"mock1-sender">>, + role => {sender, #{address => <<"test">>, + durable => none, + capabilities => [<<"capability-1">>,<<"capability-2">>]}}, + snd_settle_mode => mixed, + rcv_settle_mode => first}, + {ok, Sender} = amqp10_client:attach_link(Session, AttachArgs), + await_link(Sender, attached, attached_timeout), + Msg = amqp10_msg:new(<<"mock-tag">>, <<"banana">>, true), + {error, insufficient_credit} = amqp10_client:send_msg(Sender, Msg), + + ok = amqp10_client:end_session(Session), + ok = amqp10_client:close_connection(Connection), + ok. + outgoing_heartbeat(Config) -> Hostname = ?config(rmq_hostname, Config), Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp), @@ -918,6 +1196,7 @@ incoming_heartbeat(Config) -> %%% await_link(Who, What, Err) -> + ct:log("await_link ..."), receive {amqp10_event, {link, Who0, What0}} when Who0 =:= Who andalso @@ -925,8 +1204,10 @@ await_link(Who, What, Err) -> ok; {amqp10_event, {link, Who0, {detached, Why}}} when Who0 =:= Who -> + ct:log("await_link fail ..."), ct:fail(Why) after 5000 -> + ct:log("await_link fail ~p", [Err]), flush(), ct:fail(Err) end. diff --git a/deps/amqp10_client/test/system_SUITE_data/ibmmq_runner b/deps/amqp10_client/test/system_SUITE_data/ibmmq_runner new file mode 100755 index 000000000000..3740a70f29a3 --- /dev/null +++ b/deps/amqp10_client/test/system_SUITE_data/ibmmq_runner @@ -0,0 +1,104 @@ +#!/bin/bash + +SCRIPT="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" + +set -u + +IMAGE=pivotalrabbitmq/ibm-mqadvanced-server-dev +IMAGE_TAG=9.4.0.5-amd64 + +kill_container_if_exist() { + if docker stop $1 &> /dev/null; then + docker rm $1 &> /dev/null + fi +} + +ensure_docker_image_exists() { + TAG=`docker images --filter reference=$IMAGE | grep $IMAGE_TAG` + if [ -z ${TAG+x} ] + then + echo "Docker image ${IMAGE}:${IMAGE_TAG} does not exist" + exit 1 + else + echo "Docker image ${IMAGE}:${IMAGE_TAG} ready" + fi +} + +wait_for_message() { + attemps_left=10 + while ! docker logs $1 2>&1 | grep -q "$2"; + do + sleep 5 + print "Waiting 5sec for $1 to start ($attemps_left attempts left )..." + ((attemps_left--)) + if [[ "$attemps_left" -lt 1 ]]; then + print "Timed out waiting" + save_container_log $1 + exit 1 + fi + done +} +declare -i PADDING_LEVEL=0 + +print() { + tabbing="" + if [[ $PADDING_LEVEL -gt 0 ]]; then + for i in $(seq $PADDING_LEVEL); do + tabbing="$tabbing\t" + done + fi + echo -e "$tabbing$1" +} + +invoke_start(){ + kill_container_if_exist ibmmq + ensure_docker_image_exists + + docker run --name ibmmq \ + --env LICENSE=accept \ + --env MQ_QMGR_NAME=QM1 \ + --env MQ_APP_PASSWORD=passw0rd \ + --env MQ_ADMIN_PASSWORD=passw0rd \ + --env LICENSE=accept \ + --publish 1414:1414 \ + --publish 9443:9443 \ + --publish 5672:5672 \ + --detach \ + $IMAGE:$IMAGE_TAG + wait_for_message ibmmq "The listener 'SYSTEM.LISTENER.TCP.1' has started." + wait_for_message ibmmq "Successfully loaded default keystore" + + docker exec ibmmq bash -c 'echo "SET CHLAUTH(SYSTEM.DEF.AMQP) TYPE(ADDRESSMAP) ADDRESS(*) MCAUSER(app)" | /opt/mqm/bin/runmqsc QM1' + docker exec ibmmq bash -c 'echo "STOP SERVICE(SYSTEM.AMQP.SERVICE)" | /opt/mqm/bin/runmqsc QM1' + docker exec ibmmq bash -c 'echo "START SERVICE(SYSTEM.AMQP.SERVICE)" | /opt/mqm/bin/runmqsc QM1' + docker exec ibmmq bash -c 'echo "START CHANNEL(SYSTEM.DEF.AMQP)" | /opt/mqm/bin/runmqsc QM1' + wait_for_message ibmmq "The Server 'SYSTEM.AMQP.SERVICE' has started" + sleep 10 + print "Waited 10 seconds for container to start" +} +capture_logs() { + print "Capturing ibmmq logs to $1" + docker logs ibmmq > $1 +} +invoke_stop(){ + kill_container_if_exist ibmmq +} + +case "$1" in + version) + echo "IBM MQ ${IMAGE}:${IMAGE_TAG}" + ;; + build) + build_docker_image + ;; + logs) + capture_logs "$2" + ;; + start) + invoke_start + ;; + stop) + invoke_stop + exit $? + ;; +esac