diff --git a/src/apis/router_console_api.erl b/src/apis/router_console_api.erl index ad3a33e86..1bd8b4672 100644 --- a/src/apis/router_console_api.erl +++ b/src/apis/router_console_api.erl @@ -1044,7 +1044,7 @@ get_org_(OrgID) -> lager:debug("org ~p not found", [OrgID]), End = erlang:system_time(millisecond), ok = router_metrics:console_api_observe(get_org, not_found, End - Start), - {error, not_found}; + {error, org_not_found}; _Other -> End = erlang:system_time(millisecond), ok = router_metrics:console_api_observe(get_org, error, End - Start), diff --git a/src/apis/router_console_dc_tracker.erl b/src/apis/router_console_dc_tracker.erl index 28031324a..81fb30793 100644 --- a/src/apis/router_console_dc_tracker.erl +++ b/src/apis/router_console_dc_tracker.erl @@ -127,23 +127,28 @@ has_enough_dc(OrgID, PayloadSize) when is_binary(OrgID) -> lager:warning("failed to calculate dc amount ~p", [_Reason]), {error, failed_calculate_dc}; DCAmount -> - {Balance0, Nonce} = + MaybeBalance = case lookup(OrgID) of {error, not_found} -> fetch_and_save_org_balance(OrgID); - {ok, 0, _N} -> - fetch_and_save_org_balance(OrgID); + %% {ok, 0, _N} -> + %% fetch_and_save_org_balance(OrgID); {ok, B, N} -> - {B, N} + {ok, {B, N}} end, - Balance1 = Balance0 - DCAmount, - case {Balance1 >= 0, Nonce > 0} of - {false, _} -> - {error, {not_enough_dc, Balance0, DCAmount}}; - {_, false} -> - {error, bad_nonce}; - {true, true} -> - {ok, OrgID, Balance1, Nonce} + case MaybeBalance of + {error, _} = Err -> + Err; + {ok, {Balance0, Nonce}} -> + Balance1 = Balance0 - DCAmount, + case {Balance1 >= 0, Nonce > 0} of + {false, _} -> + {error, {not_enough_dc, Balance0, DCAmount}}; + {_, false} -> + {error, bad_nonce}; + {true, true} -> + {ok, OrgID, Balance1, Nonce} + end end end; has_enough_dc(Device, PayloadSize) -> @@ -209,6 +214,7 @@ init(Args) -> lager:info("~p init with ~p", [?SERVER, Args]), {PubKey, _, _} = router_blockchain:get_key(), PubkeyBin = libp2p_crypto:pubkey_to_bin(PubKey), + _ = erlang:send_after(0, self(), prefetch_orgs), _ = erlang:send_after(500, self(), post_init), {ok, #state{pubkey_bin = PubkeyBin}}. @@ -220,6 +226,25 @@ handle_cast(_Msg, State) -> lager:warning("rcvd unknown cast msg: ~p", [_Msg]), {noreply, State}. +handle_info(prefetch_orgs, #state{} = State) -> + case router_console_api:get_orgs() of + {ok, OrgList} -> + lists:foreach( + fun( + #{ + <<"id">> := OrgID, + <<"dc_balance">> := Balance, + <<"dc_balance_nonce">> := Nonce + } + ) -> + insert(OrgID, Balance, Nonce) + end, + OrgList + ); + {error, _Reason} -> + lager:warning("failed to prefetch orgs: ~p", [_Reason]) + end, + {noreply, State}; handle_info(post_init, #state{} = State) -> case router_blockchain:privileged_maybe_get_blockchain() of undefined -> @@ -293,19 +318,19 @@ txn_filter_fun(PubKeyBin, Txn) -> Payee == PubKeyBin andalso Memo =/= 0 end. --spec fetch_and_save_org_balance(binary()) -> {non_neg_integer(), non_neg_integer()}. +-spec fetch_and_save_org_balance(binary()) -> {ok, {non_neg_integer(), non_neg_integer()}} | {error, any()}. fetch_and_save_org_balance(OrgID) -> case router_console_api:get_org(OrgID) of - {error, _} -> - {0, 0}; + {error, _} = Err -> + Err; {ok, Map} -> Balance = maps:get(<<"dc_balance">>, Map, 0), case maps:get(<<"dc_balance_nonce">>, Map, 0) of 0 -> - {0, 0}; + {ok, {0, 0}}; Nonce -> ok = insert(OrgID, Balance, Nonce), - {Balance, Nonce} + {ok, {Balance, Nonce}} end end. @@ -388,7 +413,7 @@ has_enough_dc_test() -> Balance = 2, PayloadSize = 48, ?assertEqual( - {error, {not_enough_dc, 0, Balance}}, has_enough_dc(OrgID, PayloadSize) + {error, deal_with_it}, has_enough_dc(OrgID, PayloadSize) ), ?assertEqual(ok, refill(OrgID, Nonce, Balance)), ?assertEqual({ok, OrgID, 0, 1}, has_enough_dc(OrgID, PayloadSize)), @@ -405,7 +430,7 @@ charge_test() -> Balance = 2, PayloadSize = 48, ?assertEqual( - {error, {not_enough_dc, 0, Balance}}, has_enough_dc(OrgID, PayloadSize) + {error, deal_with_it}, has_enough_dc(OrgID, PayloadSize) ), ?assertEqual(ok, refill(OrgID, Nonce, Balance)), ?assertEqual({ok, 0, 1}, charge(OrgID, PayloadSize)), @@ -417,11 +442,11 @@ charge_test() -> current_balance_test() -> ok = init_ets(), meck:new(router_console_api, [passthrough]), - meck:expect(router_console_api, get_org, fun(_OrgID) -> {error, 0} end), + meck:expect(router_console_api, get_org, fun(_OrgID) -> {error, deal_with_it} end), OrgID = <<"ORG_ID">>, Nonce = 1, Balance = 100, - ?assertEqual({0, 0}, current_balance(OrgID)), + ?assertEqual({error, deal_with_it}, current_balance(OrgID)), ?assertEqual(ok, refill(OrgID, Nonce, Balance)), ?assertEqual({100, 1}, current_balance(OrgID)), ?assert(meck:validate(router_console_api)), diff --git a/src/device/router_device_routing.erl b/src/device/router_device_routing.erl index 838b8b2a2..106a922b0 100644 --- a/src/device/router_device_routing.erl +++ b/src/device/router_device_routing.erl @@ -781,7 +781,7 @@ check_device_is_active(Device, PubKeyBin) -> ) -> ok | {error, ?DEVICE_NO_DC}. check_device_balance(PayloadSize, Device, PubKeyBin) -> try router_console_dc_tracker:has_enough_dc(Device, PayloadSize) of - {error, _Reason} -> + {error, {not_enough_dc, _, _}} -> ok = router_utils:event_uplink_dropped_not_enough_dc( erlang:system_time(millisecond), router_device:fcnt(Device), @@ -789,6 +789,8 @@ check_device_balance(PayloadSize, Device, PubKeyBin) -> PubKeyBin ), {error, ?DEVICE_NO_DC}; + {error, _} = Err -> + Err; {ok, _OrgID, _Balance, _Nonce} -> ok catch diff --git a/src/device/router_device_worker.erl b/src/device/router_device_worker.erl index 97298e616..ebb37acd3 100644 --- a/src/device/router_device_worker.erl +++ b/src/device/router_device_worker.erl @@ -1669,16 +1669,21 @@ validate_frame_(PacketFCnt, Packet, PubKeyBin, HotspotRegion, Device0, OfferCach PHash = blockchain_helium_packet_v1:packet_hash(Packet), case maybe_charge(Device0, PayloadSize, PubKeyBin, PHash, OfferCache) of {error, Reason} -> - %% REVIEW: Do we want to update region and datarate for an uncharged packet? - DeviceUpdates = [{fcnt, PacketFCnt}, {location, PubKeyBin}], - Device1 = router_device:update(DeviceUpdates, Device0), - case FPort of - 0 when FOptsLen == 0 -> - {error, {not_enough_dc, Reason, Device1}}; - 0 when FOptsLen /= 0 -> - {error, {not_enough_dc, Reason, Device0}}; - _N -> - {error, {not_enough_dc, Reason, Device1}} + case Reason of + {not_enough_dc, _, _} -> + %% REVIEW: Do we want to update region and datarate for an uncharged packet? + DeviceUpdates = [{fcnt, PacketFCnt}, {location, PubKeyBin}], + Device1 = router_device:update(DeviceUpdates, Device0), + case FPort of + 0 when FOptsLen == 0 -> + {error, {not_enough_dc, Reason, Device1}}; + 0 when FOptsLen /= 0 -> + {error, {not_enough_dc, Reason, Device0}}; + _N -> + {error, {not_enough_dc, Reason, Device1}} + end; + _ -> + {error, Reason} end; {ok, Balance, Nonce} -> case FPort of diff --git a/test/router_console_dc_tracker_SUITE.erl b/test/router_console_dc_tracker_SUITE.erl index 6d11e41e8..8582f7655 100644 --- a/test/router_console_dc_tracker_SUITE.erl +++ b/test/router_console_dc_tracker_SUITE.erl @@ -6,7 +6,12 @@ end_per_testcase/2 ]). --export([dc_test/1, burned_test/1]). +-export([ + dc_test/1, + join_cannot_fetch_org_balance_test/1, + data_cannot_fetch_org_balance_test/1, + burned_test/1 +]). -include_lib("helium_proto/include/blockchain_state_channel_v1_pb.hrl"). -include_lib("common_test/include/ct.hrl"). @@ -31,7 +36,12 @@ %% @end %%-------------------------------------------------------------------- all() -> - [dc_test, burned_test]. + [ + dc_test, + join_cannot_fetch_org_balance_test, + data_cannot_fetch_org_balance_test, + burned_test + ]. %%-------------------------------------------------------------------- %% TEST CASE SETUP @@ -49,6 +59,71 @@ end_per_testcase(TestCase, Config) -> %% TEST CASES %%-------------------------------------------------------------------- +join_cannot_fetch_org_balance_test(Config) -> + true = ets:delete(router_console_dc_tracker_ets, ?CONSOLE_ORG_ID), + + % Send two joins to cache the get_org return value. + _ = test_utils:send_device_join(Config), + _ = test_utils:send_device_join(Config), + + receive + {console_event, <<"uplink_dropped">>, <<"uplink_dropped_not_enough_dc">>, _} -> + ct:fail(do_not_drop_failed_dc) + after 1250 -> + ok + end, + + ok. + +data_cannot_fetch_org_balance_test(Config) -> + + #{ + pubkey_bin := PubKeyBin1, + stream := Stream + } = test_utils:join_device(Config), + + + {ok, DB, CF} = router_db:get_devices(), + WorkerID = router_devices_sup:id(?CONSOLE_DEVICE_ID), + {ok, Device0} = router_device:get_by_id(DB, CF, WorkerID), + + %% The device has a session, remove the org so it needs to be fetched and + %% will not be found. + true = ets:delete(router_console_dc_tracker_ets, ?CONSOLE_ORG_ID), + + %% Simulate multiple hotspot sending data + Stream ! + {send, + test_utils:frame_packet( + ?UNCONFIRMED_UP, + PubKeyBin1, + router_device:nwk_s_key(Device0), + router_device:app_s_key(Device0), + 0 + )}, + + #{public := PubKey2} = libp2p_crypto:generate_keys(ecc_compact), + PubKeyBin2 = libp2p_crypto:pubkey_to_bin(PubKey2), + + Stream ! + {send, + test_utils:frame_packet( + ?UNCONFIRMED_UP, + PubKeyBin2, + router_device:nwk_s_key(Device0), + router_device:app_s_key(Device0), + 0 + )}, + + receive + {console_event, <<"uplink_dropped">>, <<"uplink_dropped_not_enough_dc">>, _} -> + ct:fail(do_not_drop_failed_dc) + after 1250 -> + ok + end, + + ok. + dc_test(Config) -> #{ pubkey_bin := PubKeyBin1, diff --git a/test/test_utils.erl b/test/test_utils.erl index 0e87bd451..3a3ec63ae 100644 --- a/test/test_utils.erl +++ b/test/test_utils.erl @@ -31,6 +31,7 @@ wait_state_channel_packet/1, join_payload/2, join_packet/3, join_packet/4, + send_device_join/1, send_device_join/2, join_device/1, join_device/2, frame_payload/6, frame_packet/5, frame_packet/6, @@ -328,10 +329,10 @@ add_oui(Config) -> [{oui, OUI} | Config] end. -join_device(Config) -> - join_device(Config, #{}). +send_device_join(Config) -> + send_device_join(Config, #{}). -join_device(Config, JoinOpts) -> +send_device_join(Config, JoinOpts) -> AppKey = proplists:get_value(app_key, Config), DevNonce = crypto:strong_rand_bytes(2), @@ -377,7 +378,25 @@ join_device(Config, JoinOpts) -> blockchain_state_channel_v1_pb:encode_msg(#blockchain_state_channel_message_v1_pb{ msg = {packet, SCPacket} })}, + #{ + app_key => AppKey, + dev_nonce => DevNonce, + hotspot_name => HotspotName, + stream => Stream, + pubkey_bin => PubKeyBin, + sc_packet => SCPacket + }. +join_device(Config) -> + join_device(Config, #{}). + +join_device(Config, JoinOpts) -> + JoinRes = + #{ + hotspot_name := HotspotName, + sc_packet := SCPacket, + pubkey_bin := PubKeyBin + } = send_device_join(Config, JoinOpts), timer:sleep(router_utils:join_timeout()), %% Waiting for report device status on that join request @@ -441,13 +460,7 @@ join_device(Config, JoinOpts) -> } } }), - #{ - app_key => AppKey, - dev_nonce => DevNonce, - hotspot_name => HotspotName, - stream => Stream, - pubkey_bin => PubKeyBin - }. + JoinRes. get_device_channels_worker(DeviceID) -> {ok, WorkerPid} = router_devices_sup:lookup_device_worker(DeviceID),