diff --git a/include/fox.hrl b/include/fox.hrl index 0d2f729..5a9f76f 100644 --- a/include/fox.hrl +++ b/include/fox.hrl @@ -1,8 +1,8 @@ -include_lib("amqp_client/include/amqp_client.hrl"). --type(pool_name() :: binary() | string() | atom()). --type(queue_name() :: binary()). --type(subscribe_queue() :: queue_name() | #'basic.consume'{}). +-type pool_name() :: binary() | string() | atom(). +-type queue_name() :: binary(). +-type subscribe_queue() :: queue_name() | #'basic.consume'{}. -record(conn_worker_state, { connection :: pid() | undefined, diff --git a/src/fox.erl b/src/fox.erl index 5b01ab5..8afb437 100644 --- a/src/fox.erl +++ b/src/fox.erl @@ -5,7 +5,8 @@ create_connection_pool/3, close_connection_pool/1, get_channel/1, - subscribe/3, subscribe/4, unsubscribe/2, + subscribe/3, subscribe/4, subscribe/5, + unsubscribe/2, declare_exchange/2, declare_exchange/3, delete_exchange/2, delete_exchange/3, bind_exchange/4, bind_exchange/5, @@ -77,15 +78,20 @@ get_channel(PoolName0) -> -spec subscribe(pool_name(), subscribe_queue(), module()) -> {ok, SubscriptionReference :: reference()} | {error, Reason :: term()}. subscribe(PoolName, Queue, SubsModule) -> - subscribe(PoolName, Queue, SubsModule, []). + subscribe(PoolName, Queue, SubsModule, [], []). -spec subscribe(pool_name(), subscribe_queue(), module(), list()) -> {ok, SubscriptionReference :: reference()}. subscribe(PoolName0, BasicConsumeOrQueueName, SubsModule, SubsArgs) -> + subscribe(PoolName0, BasicConsumeOrQueueName, SubsModule, SubsArgs, []). + +-spec subscribe(pool_name(), subscribe_queue(), module(), list(), [gen_server:start_opt()]) -> + {ok, SubscriptionReference :: reference()}. +subscribe(PoolName0, BasicConsumeOrQueueName, SubsModule, SubsArgs, GenServerStartOptions) -> PoolName = fox_utils:name_to_atom(PoolName0), CPid = fox_conn_pool:get_conn_worker(PoolName), - BasicConsume = + BasicConsume = case BasicConsumeOrQueueName of #'basic.consume'{} = Consume -> Consume; Name when is_binary(Name) -> #'basic.consume'{queue = Name} @@ -99,7 +105,7 @@ subscribe(PoolName0, BasicConsumeOrQueueName, SubsModule, SubsArgs) -> subs_module = SubsModule, subs_args = SubsArgs }, - {ok, _} = fox_subs_sup:start_subscriber(PoolName, Subs), + {ok, _} = fox_subs_sup:start_subscriber(PoolName, Subs, GenServerStartOptions), {ok, SubsRef}. diff --git a/src/subscription/fox_subs_sup.erl b/src/subscription/fox_subs_sup.erl index e81abac..282025c 100644 --- a/src/subscription/fox_subs_sup.erl +++ b/src/subscription/fox_subs_sup.erl @@ -1,7 +1,7 @@ -module(fox_subs_sup). -behaviour(supervisor). --export([start_link/1, start_subscriber/2, init/1]). +-export([start_link/1, start_subscriber/3, init/1]). -include("otp_types.hrl"). -include("fox.hrl"). @@ -13,10 +13,10 @@ start_link(PoolName) -> supervisor:start_link({local, RegName}, ?MODULE, no_args). --spec start_subscriber(atom(), #subscription{}) -> startchild_ret(). -start_subscriber(PoolName, Sub) -> +-spec start_subscriber(atom(), #subscription{}, [gen_server:start_opt()]) -> startchild_ret(). +start_subscriber(PoolName, Subscription, GenServerStartOptions) -> RegName = fox_utils:make_reg_name(?MODULE, PoolName), - supervisor:start_child(RegName, [Sub]). + supervisor:start_child(RegName, [Subscription, GenServerStartOptions]). -spec(init(gs_args()) -> sup_init_reply()). diff --git a/src/subscription/fox_subs_worker.erl b/src/subscription/fox_subs_worker.erl index a1230f2..a10ebe7 100644 --- a/src/subscription/fox_subs_worker.erl +++ b/src/subscription/fox_subs_worker.erl @@ -1,7 +1,7 @@ -module(fox_subs_worker). -behavior(gen_server). --export([start_link/1, connection_established/2, stop/1]). +-export([start_link/2, connection_established/2, stop/1]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). -include("otp_types.hrl"). @@ -14,9 +14,9 @@ %%% module API --spec start_link(#subscription{}) -> gs_start_link_reply(). -start_link(State) -> - gen_server:start_link(?MODULE, State, []). +-spec start_link(#subscription{}, [gen_server:start_opt()]) -> gs_start_link_reply(). +start_link(State, StartOptions) -> + gen_server:start_link(?MODULE, State, StartOptions). -spec connection_established(pid(), pid()) -> ok. @@ -74,7 +74,7 @@ handle_cast({connection_established, Conn}, = State) -> State2 = unsubscribe(State), case amqp_connection:open_channel(Conn) of - {ok, Channel} -> + {ok, Channel} -> logger:info("~s subscribe to queue", [worker_name(State)]), Ref = erlang:monitor(process, Channel), @@ -85,11 +85,11 @@ handle_cast({connection_established, Conn}, {noreply, State2#subscription{ connection = Conn, - channel = Channel, - channel_ref = Ref, - subs_state = SubsState, + channel = Channel, + channel_ref = Ref, + subs_state = SubsState, subs_tag = Tag}}; - Other -> + Other -> logger:info("~s can't subscribe to queue, reason: ~w", [worker_name(State), Other]), {noreply, State2} end;