Skip to content

Commit

Permalink
Adapt to new bridge plugin (#27)
Browse files Browse the repository at this point in the history
Adapt to new bridge plugin
  • Loading branch information
gilbertwong96 authored and terry-xiaoyu committed Jun 29, 2019
1 parent 77ffe70 commit 0c3a9b9
Show file tree
Hide file tree
Showing 7 changed files with 23 additions and 39 deletions.
1 change: 1 addition & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ before_install:
- git clone https://github.com/erlang/rebar3.git; cd rebar3; ./bootstrap; sudo mv rebar3 /usr/local/bin/; cd ..

script:
- make compile
- make xref
- make eunit
- make ct
Expand Down
6 changes: 5 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ export REBAR_GIT_CLONE_OPTIONS
REBAR = rebar3
all: compile

compile:
compile: unlock
$(REBAR) compile

ct: compile
Expand All @@ -15,6 +15,10 @@ ct: compile
eunit: compile
$(REBAR) as test eunit

unlock: unlock
@rm -rf _build/*/lib/*/rebar.lock
$(REBAR) unlock

xref:
$(REBAR) xref

Expand Down
9 changes: 4 additions & 5 deletions rebar.config
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
{deps, []}.

{profiles,
[{test, [
{deps, [{emqx_ct_helpers, {git, "https://github.com/emqx/emqx-ct-helpers", {tag, "v1.1.1"}}}
]}
]}
]}.
[{test,
[{deps,
[{emqx_ct_helpers, "1.1.4"}]}]}]
}.

{edoc_opts, [{preprocess, true}]}.
{erl_opts, [warn_unused_vars,
Expand Down
4 changes: 3 additions & 1 deletion rebar.config.script
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,11 @@ UrlPrefix = "https://github.com/emqx/",

EMQX_DEP = {emqx, {git, UrlPrefix ++ "emqx", {branch, BRANCH}}},

EMQX_BRIDGE_MQTT_DEP = {emqx_bridge_mqtt, {git, UrlPrefix ++ "emqx-bridge-mqtt", {branch, BRANCH}}},

EMQX_MGMT_DEP = {emqx_management, {git, UrlPrefix ++ "emqx-management", {branch, BRANCH}}},

NewDeps = [EMQX_DEP, EMQX_MGMT_DEP | DEPS],
NewDeps = [EMQX_DEP, EMQX_MGMT_DEP, EMQX_BRIDGE_MQTT_DEP | DEPS],

CONFIG2 = lists:keystore(deps, 1, CONFIG1, {deps, NewDeps}),

Expand Down
28 changes: 3 additions & 25 deletions src/emqx_cube_app.erl
Original file line number Diff line number Diff line change
Expand Up @@ -24,33 +24,11 @@
, stop/1
]).

%%%===================================================================
%%% Application callbacks
%%%===================================================================

%%--------------------------------------------------------------------
%% @private
%% @doc
%% start the cube client app
%% @end
%%--------------------------------------------------------------------
-spec start(StartType :: normal |
{takeover, Node :: node()} |
{failover, Node :: node()},
StartArgs :: term()) ->
{ok, Pid :: pid()} |
{ok, Pid :: pid(), State :: map()} |
{error, Reason :: term()}.
start(_StartType, _StartArgs) ->
ok = application:ensure_started(emqx_bridge_mqtt),
emqx_cube_sup:start_link().
%%--------------------------------------------------------------------
%% @private
%% @doc
%% This function is called whenever an application has stopped. It
%% is intended to be the opposite of Module:start/2 and should do
%% any necessary cleaning up. The return value is ignored.
%% @end
%%--------------------------------------------------------------------


-spec stop(State :: term()) -> any().
stop(_State) ->
ok.
12 changes: 6 additions & 6 deletions src/emqx_cube_datasync.erl
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ list(_Bindings) ->

-spec(update(BridgeSpec :: map()) -> {ok, list()}).
update(BridgeSpec = #{id := Id, name := Name}) ->
emqx_bridge_sup:drop_bridge(maybe_b2a(Name)),
emqx_bridge_mqtt_sup:drop_bridge(maybe_b2a(Name)),
update_bridge(Id, Name, BridgeSpec),
post_start_bridge(BridgeSpec).

Expand Down Expand Up @@ -168,7 +168,7 @@ insert_bridge(Bridge = #?TAB{id = Id}) ->

remove_bridge(Id) ->
handle_lookup(Id, fun(Name, _Options) ->
emqx_bridge_sup:drop_bridge(maybe_b2a(Name))
emqx_bridge_mqtt_sup:drop_bridge(maybe_b2a(Name))
end),
mnesia:transaction(fun mnesia:delete/1, [{?TAB, Id}]).

Expand All @@ -187,8 +187,8 @@ start_bridge(#{ id := Id, rsp_topic := RspTopic, cube_pid := CubePid}) ->
StartBridge = fun(Name, Options) ->
Name1 = maybe_b2a(Name),
Options1 = trans_opts(maps:to_list(Options), Name),
emqx_bridge_sup:create_bridge(Name1, Options1#{bridge_handler => BridgeHandler}),
try emqx_bridge:ensure_started(Name1) of
emqx_bridge_mqtt_sup:create_bridge(Name1, Options1#{bridge_handler => BridgeHandler}),
try emqx_bridge_worker:ensure_started(Name1) of
ok -> [{code, ?SUCCESS},
{data, <<"Start bridge successfully">>}];
connected -> [{code, ?SUCCESS},
Expand All @@ -207,7 +207,7 @@ bridges_status() ->
|| {_TabName, _Id, Name, _Bridge} <- Bridges]}]}.

get_bridge_status(Name) ->
try emqx_bridge:status(Name) of
try emqx_bridge_worker:status(Name) of
standing_by -> disconnected;
Status -> Status
catch
Expand All @@ -217,7 +217,7 @@ get_bridge_status(Name) ->
stop_bridge(Id) ->
DropBridge = fun(Name, _Options) ->
Name1 = maybe_b2a(Name),
case emqx_bridge_sup:drop_bridge(Name1) of
case emqx_bridge_mqtt_sup:drop_bridge(Name1) of
ok -> [{code, ?SUCCESS}, {data, <<"stop bridge successfully">>}];
{error, _Error} -> [{code, ?ERROR4}, {data, <<"stop bridge failed">>}]
end
Expand Down
2 changes: 1 addition & 1 deletion test/emqx_cube_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ bridge_params(BridgeName) ->
{<<"queue">>,
[{<<"batch_count_limit">>,32},
{<<"batch_bytes_limit">>,<<"1000MB">>},
{<<"replayq_dir">>,<<"data/emqx_bridge/">>},
{<<"replayq_dir">>,<<"data/emqx_bridge_mqtt/">>},
{<<"replayq_seg_bytes">>,<<"10MB">>}]},
{<<"reconnect_interval">>,<<"30s">>},
{<<"retry_interval">>,<<"20s">>},
Expand Down

0 comments on commit 0c3a9b9

Please sign in to comment.