Skip to content

Commit

Permalink
Startup fixes (#1008)
Browse files Browse the repository at this point in the history
* * Start grpcbox server last and clients first
* Fix console api to cache device req for 10s
* Fix console api to not do any request unless a token is found
* Avoiding multiple device updates on device worker startup
* Fix dead code in dc_tracker

* Avoid issues with device cache in tests

* Update src/apis/router_console_api.erl

Co-authored-by: Michael Jeffrey <michaeldjeffrey@gmail.com>

* Update src/apis/router_console_api.erl

Co-authored-by: Michael Jeffrey <michaeldjeffrey@gmail.com>

* We do not need xor filter tests anymore

* Fix channel_worker_SUITE

* More tests fixes

* Fix dialyzer

* Fix devaddr in routing tests

* Fix last tests

* Fix ddos_joins_test

---------

Co-authored-by: Michael Jeffrey <michaeldjeffrey@gmail.com>
  • Loading branch information
macpie and michaeldjeffrey authored Oct 10, 2023
1 parent a380d96 commit 3835896
Show file tree
Hide file tree
Showing 21 changed files with 739 additions and 580 deletions.
871 changes: 485 additions & 386 deletions src/apis/router_console_api.erl

Large diffs are not rendered by default.

9 changes: 6 additions & 3 deletions src/apis/router_console_dc_tracker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,10 @@ init(Args) ->
{PubKey, _, _} = router_blockchain:get_key(),
PubkeyBin = libp2p_crypto:pubkey_to_bin(PubKey),
_ = erlang:send_after(0, self(), prefetch_orgs),
_ = erlang:send_after(500, self(), post_init),
case router_blockchain:is_chain_dead() of
true -> ok;
false -> _ = erlang:send_after(500, self(), chain_init)
end,
{ok, #state{pubkey_bin = PubkeyBin}}.

handle_call(_Msg, _From, State) ->
Expand Down Expand Up @@ -245,10 +248,10 @@ handle_info(prefetch_orgs, #state{} = State) ->
lager:warning("failed to prefetch orgs: ~p", [_Reason])
end,
{noreply, State};
handle_info(post_init, #state{} = State) ->
handle_info(chain_init, #state{} = State) ->
case router_blockchain:privileged_maybe_get_blockchain() of
undefined ->
erlang:send_after(500, self(), post_init),
erlang:send_after(500, self(), chain_init),
{noreply, State};
_Chain ->
ok = blockchain_event:add_handler(self()),
Expand Down
15 changes: 11 additions & 4 deletions src/apis/router_console_ws_worker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@
ws :: pid(),
ws_endpoint :: binary(),
db :: rocksdb:db_handle(),
cf :: rocksdb:cf_handle()
cf :: rocksdb:cf_handle(),
check_devices_pid = undefined :: undefined | pid()
}).

%% ------------------------------------------------------------------
Expand Down Expand Up @@ -70,13 +71,18 @@ handle_cast(_Msg, State) ->

handle_info(
{'EXIT', WSPid0, _Reason},
#state{ws = WSPid0, ws_endpoint = WSEndpoint, db = DB, cf = CF} = State
#state{ws = WSPid0, ws_endpoint = WSEndpoint, db = DB, cf = CF, check_devices_pid = OldPid} =
State
) ->
lager:error("websocket connetion went down: ~p, restarting", [_Reason]),
Token = router_console_api:get_token(),
WSPid1 = start_ws(WSEndpoint, Token),
check_devices(DB, CF),
{noreply, State#state{ws = WSPid1}};
case erlang:is_pid(OldPid) andalso erlang:is_process_alive(OldPid) of
true -> erlang:exit(OldPid, kill);
false -> ok
end,
Pid = check_devices(DB, CF),
{noreply, State#state{ws = WSPid1, check_devices_pid = Pid}};
handle_info(ws_joined, #state{ws = WSPid} = State) ->
lager:info("joined, sending router address to console", []),
Payload = get_router_address_msg(),
Expand Down Expand Up @@ -374,6 +380,7 @@ update_devices(DB, CF, DeviceIDs) ->
lager:info("got update for ~p devices: ~p from WS", [Total, DeviceIDs]),
lists:foreach(
fun({Index, DeviceID}) ->
_ = e2qc:evict(router_console_api_get_device, DeviceID),
case router_devices_sup:lookup_device_worker(DeviceID) of
{error, not_found} ->
lager:info(
Expand Down
1 change: 1 addition & 0 deletions src/device/router_device_cache.erl
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ make_devaddr_table(DevAddr) ->
init_from_db() ->
{ok, DB, [_DefaultCF, DevicesCF]} = router_db:get(),
Devices = router_device:get(DB, DevicesCF),
%% TODO: improve this maybe?
lists:foreach(fun(Device) -> ?MODULE:save(Device) end, Devices).

%% ------------------------------------------------------------------
Expand Down
4 changes: 2 additions & 2 deletions src/device/router_device_devaddr.erl
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ handle_cast(?RECONCILE_START, #state{conn_backoff = Backoff0} = State) ->
{error, _Reason} ->
{Delay, Backoff1} = backoff:fail(Backoff0),
_ = timer:apply_after(Delay, ?MODULE, reconcile, []),
lager:warning("fail to get_devaddrs ~p, retrying in ~wms", [
lager:error("fail to get_devaddrs ~p, retrying in ~wms", [
_Reason, Delay
]),
{noreply, State#state{conn_backoff = Backoff1}};
Expand All @@ -200,7 +200,7 @@ handle_cast(?RECONCILE_START, #state{conn_backoff = Backoff0} = State) ->
handle_cast({?RECONCILE_END, {error, Reason}}, #state{conn_backoff = Backoff0} = State) ->
{Delay, Backoff1} = backoff:fail(Backoff0),
_ = timer:apply_after(Delay, ?MODULE, reconcile, []),
lager:warning("fail to get_devaddrs ~p, retrying in ~wms", [
lager:error("fail to get_devaddrs ~p, retrying in ~wms", [
Reason, Delay
]),
{noreply, State#state{conn_backoff = Backoff1}};
Expand Down
7 changes: 4 additions & 3 deletions src/device/router_device_worker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,6 @@ handle_continue({?INIT_ASYNC, ID}, #state{db = DB, cf = CF} = State) ->
Device = get_device(DB, CF, ID),
IsActive = router_device:is_active(Device),
ok = router_utils:lager_md(Device),
ok = ?MODULE:device_update(self()),
{ok, Pid} = router_device_channels_worker:start_link(#{
device_worker => self(), device => Device
}),
Expand Down Expand Up @@ -1438,7 +1437,7 @@ handle_join_skf([{NwkSKey, _} | _] = NewKeys, [NewDevAddr | _] = NewDevAddrs, Ma
Updates = lists:usort([{add, DevAddrInt, NwkSKey, MaxCopies}] ++ KeyRemoves ++ AddrRemoves),
ok = router_ics_skf_worker:update(Updates),

lager:debug("sending update skf for join ~p ~p", [Updates]),
lager:debug("sending update skf for join ~p", [Updates]),
ok.

%% Dual-Plan Code
Expand Down Expand Up @@ -2254,7 +2253,9 @@ get_device(DB, CF, DeviceID) ->
)},
{is_active, IsActive}
],
router_device:update(DeviceUpdates, Device0)
Device2 = router_device:update(DeviceUpdates, Device0),
ok = save_device(DB, CF, Device2),
Device2
end,
MultiBuyValue = maps:get(multi_buy, router_device:metadata(Device1), 1),
ok = router_device_multibuy:max(router_device:id(Device1), MultiBuyValue),
Expand Down
3 changes: 3 additions & 0 deletions src/device/router_devices_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,11 @@ id(DeviceId) ->
init([]) ->
ets:new(?ETS, [public, named_table, set]),
ok = router_device_multibuy:init(),
lager:info("router_device_multibuy:init"),
ok = router_device_routing:init(),
lager:info("router_device_routing:init"),
ok = router_device_cache:init(),
lager:info("router_device_cache:init"),
{ok, {?FLAGS, [?WORKER(router_device_worker)]}}.

%% ------------------------------------------------------------------
Expand Down
73 changes: 73 additions & 0 deletions src/grpc/router_grpc_client_worker.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
%%%-------------------------------------------------------------------
%% @doc
%% @end
%%%-------------------------------------------------------------------
-module(router_grpc_client_worker).

-behavior(gen_server).

%% ------------------------------------------------------------------
%% API Function Exports
%% ------------------------------------------------------------------
-export([
start_link/0
]).

%% ------------------------------------------------------------------
%% gen_server Function Exports
%% ------------------------------------------------------------------
-export([
init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2,
code_change/3
]).

-define(SERVER, ?MODULE).

-record(state, {}).

%% ------------------------------------------------------------------
%% API Function Definitions
%% ------------------------------------------------------------------
start_link() ->
gen_server:start_link({local, ?SERVER}, ?SERVER, #{}, []).

%% ------------------------------------------------------------------
%% gen_server Function Definitions
%% ------------------------------------------------------------------
init(_Args) ->
lager:info("~p init with ~p", [?SERVER, _Args]),
case application:get_env(grpcbox, client) of
{ok, #{channels := Channels}} ->
lists:foreach(
fun({Name, Endpoints, Options}) ->
R = grpcbox_channel_sup:start_child(Name, Endpoints, Options),
lager:info("started ~p ~p", [{Name, Endpoints, Options}, R])
end,
Channels
);
_ ->
ok
end,
{ok, #state{}}.

handle_call(_Msg, _From, State) ->
lager:warning("rcvd unknown call msg: ~p from: ~p", [_Msg, _From]),
{reply, ok, State}.

handle_cast(_Msg, State) ->
lager:warning("rcvd unknown cast msg: ~p", [_Msg]),
{noreply, State}.

handle_info(_Msg, State) ->
lager:warning("rcvd unknown info msg: ~p", [_Msg]),
{noreply, State}.

code_change(_OldVsn, State, _Extra) ->
{ok, State}.

terminate(_Reason, #state{}) ->
ok.
72 changes: 72 additions & 0 deletions src/grpc/router_grpc_server_worker.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
%%%-------------------------------------------------------------------
%% @doc
%% @end
%%%-------------------------------------------------------------------
-module(router_grpc_server_worker).

-behavior(gen_server).

%% ------------------------------------------------------------------
%% API Function Exports
%% ------------------------------------------------------------------
-export([
start_link/0
]).

%% ------------------------------------------------------------------
%% gen_server Function Exports
%% ------------------------------------------------------------------
-export([
init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2,
code_change/3
]).

-define(SERVER, ?MODULE).

-record(state, {}).

%% ------------------------------------------------------------------
%% API Function Definitions
%% ------------------------------------------------------------------
start_link() ->
gen_server:start_link({local, ?SERVER}, ?SERVER, #{}, []).

%% ------------------------------------------------------------------
%% gen_server Function Definitions
%% ------------------------------------------------------------------
init(_Args) ->
lager:info("~p init with ~p", [?SERVER, _Args]),
lists:foreach(
fun(ServerOpts) ->
R = grpcbox_services_simple_sup:start_child(ServerOpts),
lager:info("started ~p ~p", [ServerOpts, R])
end,
application:get_env(grpcbox, servers, [])
),
{ok, #state{}}.

handle_call(_Msg, _From, State) ->
lager:warning("rcvd unknown call msg: ~p from: ~p", [_Msg, _From]),
{reply, ok, State}.

handle_cast(_Msg, State) ->
lager:warning("rcvd unknown cast msg: ~p", [_Msg]),
{noreply, State}.

handle_info(_Msg, State) ->
lager:warning("rcvd unknown info msg: ~p", [_Msg]),
{noreply, State}.

code_change(_OldVsn, State, _Extra) ->
{ok, State}.

terminate(_Reason, #state{}) ->
ok.

%% ------------------------------------------------------------------
%% Internal Function Definitions
%% ------------------------------------------------------------------
67 changes: 0 additions & 67 deletions src/grpc/router_ics_gateway_location_worker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -5,34 +5,14 @@
%%%-------------------------------------------------------------------
-module(router_ics_gateway_location_worker).

-behavior(gen_server).

-include("./autogen/iot_config_pb.hrl").

%% ------------------------------------------------------------------
%% API Function Exports
%% ------------------------------------------------------------------
-export([
start_link/1,
init_ets/0,
get/1
]).

%% ------------------------------------------------------------------
%% gen_server Function Exports
%% ------------------------------------------------------------------
-export([
init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2,
code_change/3
]).

-define(SERVER, ?MODULE).
-define(ETS, router_ics_gateway_location_worker_ets).
-define(INIT, init).
-ifdef(TEST).
-define(BACKOFF_MIN, 100).
-else.
Expand All @@ -41,26 +21,12 @@
-define(BACKOFF_MAX, timer:minutes(5)).
-define(CACHED_NOT_FOUND, cached_not_found).

-record(state, {
pubkey_bin :: libp2p_crypto:pubkey_bin(),
sig_fun :: function()
}).

-record(location, {
gateway :: libp2p_crypto:pubkey_bin(),
timestamp :: non_neg_integer(),
h3_index :: h3:index() | undefined
}).

%% -type state() :: #state{}.

%% ------------------------------------------------------------------
%% API Function Definitions
%% ------------------------------------------------------------------

start_link(Args) ->
gen_server:start_link({local, ?SERVER}, ?SERVER, Args, []).

-spec init_ets() -> ok.
init_ets() ->
?ETS = ets:new(?ETS, [
Expand Down Expand Up @@ -101,39 +67,6 @@ get(PubKeyBin) ->
end
end.

%% ------------------------------------------------------------------
%% gen_server Function Definitions
%% ------------------------------------------------------------------
init(
#{
pubkey_bin := PubKeyBin,
sig_fun := SigFun
} = Args
) ->
lager:info("~p init with ~p", [?SERVER, Args]),
{ok, #state{
pubkey_bin = PubKeyBin,
sig_fun = SigFun
}}.

handle_call(_Msg, _From, State) ->
lager:warning("rcvd unknown call msg: ~p from: ~p", [_Msg, _From]),
{reply, ok, State}.

handle_cast(_Msg, State) ->
lager:warning("rcvd unknown cast msg: ~p", [_Msg]),
{noreply, State}.

handle_info(_Msg, State) ->
lager:warning("rcvd unknown info msg: ~p", [_Msg]),
{noreply, State}.

code_change(_OldVsn, State, _Extra) ->
{ok, State}.

terminate(_Reason, _State) ->
ok.

%% ------------------------------------------------------------------
%% Internal Function Definitions
%% ------------------------------------------------------------------
Expand Down
3 changes: 1 addition & 2 deletions src/router.app.src
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,12 @@
iso8601,
clique,
inet_cidr,
grpcbox,
erlang_lorawan,
router_utils,
throttle,
e2qc
]},
{included_applications, [blockchain]},
{included_applications, [blockchain, grpcbox]},
{env, []},
{modules, []},
{maintainers, []},
Expand Down
Loading

0 comments on commit 3835896

Please sign in to comment.