From 7181ea95bdb6e8d615e5d53819939ded070eeef7 Mon Sep 17 00:00:00 2001 From: Michael Jeffrey Date: Fri, 22 Sep 2023 12:57:31 -0700 Subject: [PATCH] drop packet when org cannot be fetched from console propogate errors from the console api all the way up in the case where a specific org balance cannot be fetched. Avoiding the case where we assume the balance is zero, and the next packet that comes through within the cache time will see that value of zero and force the depletion on Console through another event. --- src/apis/router_console_api.erl | 2 +- src/apis/router_console_dc_tracker.erl | 67 +++++++++++++------- src/device/router_device_routing.erl | 4 +- src/device/router_device_worker.erl | 25 +++++--- test/router_console_dc_tracker_SUITE.erl | 79 +++++++++++++++++++++++- test/test_utils.erl | 33 +++++++--- 6 files changed, 165 insertions(+), 45 deletions(-) diff --git a/src/apis/router_console_api.erl b/src/apis/router_console_api.erl index ad3a33e86..1bd8b4672 100644 --- a/src/apis/router_console_api.erl +++ b/src/apis/router_console_api.erl @@ -1044,7 +1044,7 @@ get_org_(OrgID) -> lager:debug("org ~p not found", [OrgID]), End = erlang:system_time(millisecond), ok = router_metrics:console_api_observe(get_org, not_found, End - Start), - {error, not_found}; + {error, org_not_found}; _Other -> End = erlang:system_time(millisecond), ok = router_metrics:console_api_observe(get_org, error, End - Start), diff --git a/src/apis/router_console_dc_tracker.erl b/src/apis/router_console_dc_tracker.erl index 28031324a..81fb30793 100644 --- a/src/apis/router_console_dc_tracker.erl +++ b/src/apis/router_console_dc_tracker.erl @@ -127,23 +127,28 @@ has_enough_dc(OrgID, PayloadSize) when is_binary(OrgID) -> lager:warning("failed to calculate dc amount ~p", [_Reason]), {error, failed_calculate_dc}; DCAmount -> - {Balance0, Nonce} = + MaybeBalance = case lookup(OrgID) of {error, not_found} -> fetch_and_save_org_balance(OrgID); - {ok, 0, _N} -> - fetch_and_save_org_balance(OrgID); + %% {ok, 0, _N} -> + %% fetch_and_save_org_balance(OrgID); {ok, B, N} -> - {B, N} + {ok, {B, N}} end, - Balance1 = Balance0 - DCAmount, - case {Balance1 >= 0, Nonce > 0} of - {false, _} -> - {error, {not_enough_dc, Balance0, DCAmount}}; - {_, false} -> - {error, bad_nonce}; - {true, true} -> - {ok, OrgID, Balance1, Nonce} + case MaybeBalance of + {error, _} = Err -> + Err; + {ok, {Balance0, Nonce}} -> + Balance1 = Balance0 - DCAmount, + case {Balance1 >= 0, Nonce > 0} of + {false, _} -> + {error, {not_enough_dc, Balance0, DCAmount}}; + {_, false} -> + {error, bad_nonce}; + {true, true} -> + {ok, OrgID, Balance1, Nonce} + end end end; has_enough_dc(Device, PayloadSize) -> @@ -209,6 +214,7 @@ init(Args) -> lager:info("~p init with ~p", [?SERVER, Args]), {PubKey, _, _} = router_blockchain:get_key(), PubkeyBin = libp2p_crypto:pubkey_to_bin(PubKey), + _ = erlang:send_after(0, self(), prefetch_orgs), _ = erlang:send_after(500, self(), post_init), {ok, #state{pubkey_bin = PubkeyBin}}. @@ -220,6 +226,25 @@ handle_cast(_Msg, State) -> lager:warning("rcvd unknown cast msg: ~p", [_Msg]), {noreply, State}. +handle_info(prefetch_orgs, #state{} = State) -> + case router_console_api:get_orgs() of + {ok, OrgList} -> + lists:foreach( + fun( + #{ + <<"id">> := OrgID, + <<"dc_balance">> := Balance, + <<"dc_balance_nonce">> := Nonce + } + ) -> + insert(OrgID, Balance, Nonce) + end, + OrgList + ); + {error, _Reason} -> + lager:warning("failed to prefetch orgs: ~p", [_Reason]) + end, + {noreply, State}; handle_info(post_init, #state{} = State) -> case router_blockchain:privileged_maybe_get_blockchain() of undefined -> @@ -293,19 +318,19 @@ txn_filter_fun(PubKeyBin, Txn) -> Payee == PubKeyBin andalso Memo =/= 0 end. --spec fetch_and_save_org_balance(binary()) -> {non_neg_integer(), non_neg_integer()}. +-spec fetch_and_save_org_balance(binary()) -> {ok, {non_neg_integer(), non_neg_integer()}} | {error, any()}. fetch_and_save_org_balance(OrgID) -> case router_console_api:get_org(OrgID) of - {error, _} -> - {0, 0}; + {error, _} = Err -> + Err; {ok, Map} -> Balance = maps:get(<<"dc_balance">>, Map, 0), case maps:get(<<"dc_balance_nonce">>, Map, 0) of 0 -> - {0, 0}; + {ok, {0, 0}}; Nonce -> ok = insert(OrgID, Balance, Nonce), - {Balance, Nonce} + {ok, {Balance, Nonce}} end end. @@ -388,7 +413,7 @@ has_enough_dc_test() -> Balance = 2, PayloadSize = 48, ?assertEqual( - {error, {not_enough_dc, 0, Balance}}, has_enough_dc(OrgID, PayloadSize) + {error, deal_with_it}, has_enough_dc(OrgID, PayloadSize) ), ?assertEqual(ok, refill(OrgID, Nonce, Balance)), ?assertEqual({ok, OrgID, 0, 1}, has_enough_dc(OrgID, PayloadSize)), @@ -405,7 +430,7 @@ charge_test() -> Balance = 2, PayloadSize = 48, ?assertEqual( - {error, {not_enough_dc, 0, Balance}}, has_enough_dc(OrgID, PayloadSize) + {error, deal_with_it}, has_enough_dc(OrgID, PayloadSize) ), ?assertEqual(ok, refill(OrgID, Nonce, Balance)), ?assertEqual({ok, 0, 1}, charge(OrgID, PayloadSize)), @@ -417,11 +442,11 @@ charge_test() -> current_balance_test() -> ok = init_ets(), meck:new(router_console_api, [passthrough]), - meck:expect(router_console_api, get_org, fun(_OrgID) -> {error, 0} end), + meck:expect(router_console_api, get_org, fun(_OrgID) -> {error, deal_with_it} end), OrgID = <<"ORG_ID">>, Nonce = 1, Balance = 100, - ?assertEqual({0, 0}, current_balance(OrgID)), + ?assertEqual({error, deal_with_it}, current_balance(OrgID)), ?assertEqual(ok, refill(OrgID, Nonce, Balance)), ?assertEqual({100, 1}, current_balance(OrgID)), ?assert(meck:validate(router_console_api)), diff --git a/src/device/router_device_routing.erl b/src/device/router_device_routing.erl index 838b8b2a2..106a922b0 100644 --- a/src/device/router_device_routing.erl +++ b/src/device/router_device_routing.erl @@ -781,7 +781,7 @@ check_device_is_active(Device, PubKeyBin) -> ) -> ok | {error, ?DEVICE_NO_DC}. check_device_balance(PayloadSize, Device, PubKeyBin) -> try router_console_dc_tracker:has_enough_dc(Device, PayloadSize) of - {error, _Reason} -> + {error, {not_enough_dc, _, _}} -> ok = router_utils:event_uplink_dropped_not_enough_dc( erlang:system_time(millisecond), router_device:fcnt(Device), @@ -789,6 +789,8 @@ check_device_balance(PayloadSize, Device, PubKeyBin) -> PubKeyBin ), {error, ?DEVICE_NO_DC}; + {error, _} = Err -> + Err; {ok, _OrgID, _Balance, _Nonce} -> ok catch diff --git a/src/device/router_device_worker.erl b/src/device/router_device_worker.erl index 97298e616..ebb37acd3 100644 --- a/src/device/router_device_worker.erl +++ b/src/device/router_device_worker.erl @@ -1669,16 +1669,21 @@ validate_frame_(PacketFCnt, Packet, PubKeyBin, HotspotRegion, Device0, OfferCach PHash = blockchain_helium_packet_v1:packet_hash(Packet), case maybe_charge(Device0, PayloadSize, PubKeyBin, PHash, OfferCache) of {error, Reason} -> - %% REVIEW: Do we want to update region and datarate for an uncharged packet? - DeviceUpdates = [{fcnt, PacketFCnt}, {location, PubKeyBin}], - Device1 = router_device:update(DeviceUpdates, Device0), - case FPort of - 0 when FOptsLen == 0 -> - {error, {not_enough_dc, Reason, Device1}}; - 0 when FOptsLen /= 0 -> - {error, {not_enough_dc, Reason, Device0}}; - _N -> - {error, {not_enough_dc, Reason, Device1}} + case Reason of + {not_enough_dc, _, _} -> + %% REVIEW: Do we want to update region and datarate for an uncharged packet? + DeviceUpdates = [{fcnt, PacketFCnt}, {location, PubKeyBin}], + Device1 = router_device:update(DeviceUpdates, Device0), + case FPort of + 0 when FOptsLen == 0 -> + {error, {not_enough_dc, Reason, Device1}}; + 0 when FOptsLen /= 0 -> + {error, {not_enough_dc, Reason, Device0}}; + _N -> + {error, {not_enough_dc, Reason, Device1}} + end; + _ -> + {error, Reason} end; {ok, Balance, Nonce} -> case FPort of diff --git a/test/router_console_dc_tracker_SUITE.erl b/test/router_console_dc_tracker_SUITE.erl index 6d11e41e8..8582f7655 100644 --- a/test/router_console_dc_tracker_SUITE.erl +++ b/test/router_console_dc_tracker_SUITE.erl @@ -6,7 +6,12 @@ end_per_testcase/2 ]). --export([dc_test/1, burned_test/1]). +-export([ + dc_test/1, + join_cannot_fetch_org_balance_test/1, + data_cannot_fetch_org_balance_test/1, + burned_test/1 +]). -include_lib("helium_proto/include/blockchain_state_channel_v1_pb.hrl"). -include_lib("common_test/include/ct.hrl"). @@ -31,7 +36,12 @@ %% @end %%-------------------------------------------------------------------- all() -> - [dc_test, burned_test]. + [ + dc_test, + join_cannot_fetch_org_balance_test, + data_cannot_fetch_org_balance_test, + burned_test + ]. %%-------------------------------------------------------------------- %% TEST CASE SETUP @@ -49,6 +59,71 @@ end_per_testcase(TestCase, Config) -> %% TEST CASES %%-------------------------------------------------------------------- +join_cannot_fetch_org_balance_test(Config) -> + true = ets:delete(router_console_dc_tracker_ets, ?CONSOLE_ORG_ID), + + % Send two joins to cache the get_org return value. + _ = test_utils:send_device_join(Config), + _ = test_utils:send_device_join(Config), + + receive + {console_event, <<"uplink_dropped">>, <<"uplink_dropped_not_enough_dc">>, _} -> + ct:fail(do_not_drop_failed_dc) + after 1250 -> + ok + end, + + ok. + +data_cannot_fetch_org_balance_test(Config) -> + + #{ + pubkey_bin := PubKeyBin1, + stream := Stream + } = test_utils:join_device(Config), + + + {ok, DB, CF} = router_db:get_devices(), + WorkerID = router_devices_sup:id(?CONSOLE_DEVICE_ID), + {ok, Device0} = router_device:get_by_id(DB, CF, WorkerID), + + %% The device has a session, remove the org so it needs to be fetched and + %% will not be found. + true = ets:delete(router_console_dc_tracker_ets, ?CONSOLE_ORG_ID), + + %% Simulate multiple hotspot sending data + Stream ! + {send, + test_utils:frame_packet( + ?UNCONFIRMED_UP, + PubKeyBin1, + router_device:nwk_s_key(Device0), + router_device:app_s_key(Device0), + 0 + )}, + + #{public := PubKey2} = libp2p_crypto:generate_keys(ecc_compact), + PubKeyBin2 = libp2p_crypto:pubkey_to_bin(PubKey2), + + Stream ! + {send, + test_utils:frame_packet( + ?UNCONFIRMED_UP, + PubKeyBin2, + router_device:nwk_s_key(Device0), + router_device:app_s_key(Device0), + 0 + )}, + + receive + {console_event, <<"uplink_dropped">>, <<"uplink_dropped_not_enough_dc">>, _} -> + ct:fail(do_not_drop_failed_dc) + after 1250 -> + ok + end, + + ok. + dc_test(Config) -> #{ pubkey_bin := PubKeyBin1, diff --git a/test/test_utils.erl b/test/test_utils.erl index 0e87bd451..3a3ec63ae 100644 --- a/test/test_utils.erl +++ b/test/test_utils.erl @@ -31,6 +31,7 @@ wait_state_channel_packet/1, join_payload/2, join_packet/3, join_packet/4, + send_device_join/1, send_device_join/2, join_device/1, join_device/2, frame_payload/6, frame_packet/5, frame_packet/6, @@ -328,10 +329,10 @@ add_oui(Config) -> [{oui, OUI} | Config] end. -join_device(Config) -> - join_device(Config, #{}). +send_device_join(Config) -> + send_device_join(Config, #{}). -join_device(Config, JoinOpts) -> +send_device_join(Config, JoinOpts) -> AppKey = proplists:get_value(app_key, Config), DevNonce = crypto:strong_rand_bytes(2), @@ -377,7 +378,25 @@ join_device(Config, JoinOpts) -> blockchain_state_channel_v1_pb:encode_msg(#blockchain_state_channel_message_v1_pb{ msg = {packet, SCPacket} })}, + #{ + app_key => AppKey, + dev_nonce => DevNonce, + hotspot_name => HotspotName, + stream => Stream, + pubkey_bin => PubKeyBin, + sc_packet => SCPacket + }. +join_device(Config) -> + join_device(Config, #{}). + +join_device(Config, JoinOpts) -> + JoinRes = + #{ + hotspot_name := HotspotName, + sc_packet := SCPacket, + pubkey_bin := PubKeyBin + } = send_device_join(Config, JoinOpts), timer:sleep(router_utils:join_timeout()), %% Waiting for report device status on that join request @@ -441,13 +460,7 @@ join_device(Config, JoinOpts) -> } } }), - #{ - app_key => AppKey, - dev_nonce => DevNonce, - hotspot_name => HotspotName, - stream => Stream, - pubkey_bin => PubKeyBin - }. + JoinRes. get_device_channels_worker(DeviceID) -> {ok, WorkerPid} = router_devices_sup:lookup_device_worker(DeviceID),