diff --git a/apps/arweave/include/ar.hrl b/apps/arweave/include/ar.hrl index 929c346d4..4e66a7ff3 100644 --- a/apps/arweave/include/ar.hrl +++ b/apps/arweave/include/ar.hrl @@ -7,6 +7,8 @@ %% (e.g. bin/test or bin/shell) -define(IS_TEST, erlang:get_cookie() == test). +-define(DATA_SIZE(Term), erlang:byte_size(term_to_binary(Term))). + %% The mainnet name. Does not change at the hard forks. -ifndef(NETWORK_NAME). -ifdef(DEBUG). diff --git a/apps/arweave/include/ar_data_discovery.hrl b/apps/arweave/include/ar_data_discovery.hrl index 21373d5fe..e6d1ff8d9 100644 --- a/apps/arweave/include/ar_data_discovery.hrl +++ b/apps/arweave/include/ar_data_discovery.hrl @@ -3,23 +3,23 @@ %% to have some data there are asked for the intervals they have and check which of them %% cross the desired interval. -ifdef(DEBUG). --define(NETWORK_DATA_BUCKET_SIZE, 10000000). +-define(NETWORK_DATA_BUCKET_SIZE, 10_000_000). % 10 MB -else. --define(NETWORK_DATA_BUCKET_SIZE, 10000000000). % 10 GB +-define(NETWORK_DATA_BUCKET_SIZE, 10_000_000_000). % 10 GB -endif. %% The maximum number of synced intervals shared with peers. -ifdef(DEBUG). -define(MAX_SHARED_SYNCED_INTERVALS_COUNT, 20). -else. --define(MAX_SHARED_SYNCED_INTERVALS_COUNT, 10000). +-define(MAX_SHARED_SYNCED_INTERVALS_COUNT, 10_000). -endif. %% The upper limit for the size of a sync record serialized using Erlang Term Format. -define(MAX_ETF_SYNC_RECORD_SIZE, 80 * ?MAX_SHARED_SYNCED_INTERVALS_COUNT). %% The upper limit for the size of the serialized (in Erlang Term Format) sync buckets. --define(MAX_SYNC_BUCKETS_SIZE, 100000). +-define(MAX_SYNC_BUCKETS_SIZE, 100_000). %% How many peers with the biggest synced shares in the given bucket to query per bucket %% per sync job iteration. diff --git a/apps/arweave/include/ar_data_sync.hrl b/apps/arweave/include/ar_data_sync.hrl index 87522277b..9da841eac 100644 --- a/apps/arweave/include/ar_data_sync.hrl +++ b/apps/arweave/include/ar_data_sync.hrl @@ -49,10 +49,6 @@ %% The frequency of storing the server state on disk. -define(STORE_STATE_FREQUENCY_MS, 30000). -%% Do not repeat the missing interval collection procedure while there are more than this -%% number of intervals in the queue. --define(SYNC_INTERVALS_MAX_QUEUE_SIZE, 10). - %% The maximum number of chunks currently being downloaded or processed. -ifdef(DEBUG). -define(SYNC_BUFFER_SIZE, 100). @@ -213,5 +209,7 @@ %% The threshold controlling the brief accumuluation of the chunks in the queue before %% the actual disk dump, to reduce the chance of out-of-order write causing disk %% fragmentation. - store_chunk_queue_threshold = ?STORE_CHUNK_QUEUE_FLUSH_SIZE_THRESHOLD + store_chunk_queue_threshold = ?STORE_CHUNK_QUEUE_FLUSH_SIZE_THRESHOLD, + %% Cache mapping peers to /data_sync_record responses + all_peers_intervals = #{} }). 
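For readers tracing the new `all_peers_intervals` record field: the ar_data_sync.erl hunks below populate it via `maps:put(Peer, {Right, Left, PeerIntervals}, Acc)`, so each cached entry pairs a peer with the right bound of its largest advertised interval, the left bound the /data_sync_record query started from, and the advertised intervals themselves. A hypothetical -type sketch of that shape (the type names are illustrative and not part of this patch):

%% Illustrative only: the patch keeps the cache as a plain map defaulting to #{}.
-type peer() :: term().  %% kept abstract here; formatted with ar_util:format_peer/1 in logs
-type all_peers_intervals() :: #{
	peer() => {Right :: non_neg_integer(),   %% right bound of the peer's largest advertised interval
			Left :: non_neg_integer(),       %% offset the peer was queried from
			Intervals :: term()}             %% the intervals the peer advertised
}.
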
diff --git a/apps/arweave/src/ar_data_discovery.erl b/apps/arweave/src/ar_data_discovery.erl index 4320570a9..91aedda1e 100644 --- a/apps/arweave/src/ar_data_discovery.erl +++ b/apps/arweave/src/ar_data_discovery.erl @@ -57,7 +57,16 @@ get_bucket_peers(Bucket, Cursor, Peers) -> case ets:next(?MODULE, Cursor) of '$end_of_table' -> UniquePeers = sets:to_list(sets:from_list(Peers)), - pick_peers(UniquePeers, ?QUERY_BEST_PEERS_COUNT); + PickedPeers = pick_peers(UniquePeers, ?QUERY_BEST_PEERS_COUNT), + ?LOG_DEBUG([ + {event, get_bucket_peers}, + {pid, self()}, + {bucket, Bucket}, + {peers, length(Peers)}, + {unique_peers, length(UniquePeers)}, + {picked_peers, length(PickedPeers)} + ]), + PickedPeers; {Bucket, _Share, Peer} = Key -> get_bucket_peers(Bucket, Key, [Peer | Peers]); _ -> @@ -65,6 +74,7 @@ get_bucket_peers(Bucket, Cursor, Peers) -> PickedPeers = pick_peers(UniquePeers, ?QUERY_BEST_PEERS_COUNT), ?LOG_DEBUG([ {event, get_bucket_peers}, + {pid, self()}, {bucket, Bucket}, {peers, length(Peers)}, {unique_peers, length(UniquePeers)}, @@ -115,6 +125,7 @@ handle_cast(update_network_data_map, #state{ peers_pending = N } = State) get_sync_buckets(Peer); Error -> ?LOG_DEBUG([{event, failed_to_fetch_sync_buckets}, + {peer, ar_util:format_peer(Peer)}, {reason, io_lib:format("~p", [Error])}]) end end diff --git a/apps/arweave/src/ar_data_sync.erl b/apps/arweave/src/ar_data_sync.erl index 03809020a..d5a543814 100644 --- a/apps/arweave/src/ar_data_sync.erl +++ b/apps/arweave/src/ar_data_sync.erl @@ -24,7 +24,7 @@ -ifdef(DEBUG). -define(COLLECT_SYNC_INTERVALS_FREQUENCY_MS, 5000). -else. --define(COLLECT_SYNC_INTERVALS_FREQUENCY_MS, 300000). +-define(COLLECT_SYNC_INTERVALS_FREQUENCY_MS, 300_000). -endif. %%%=================================================================== @@ -707,13 +707,38 @@ handle_cast({pack_and_store_chunk, Args} = Cast, end; %% Schedule syncing of the unsynced intervals. Choose a peer for each of the intervals. +%% There are two message payloads: +%% 1. collect_peer_intervals +%% Start the collection process over the full storage_module range. +%% 2. {collect_peer_intervals, Start, End} +%% Collect intervals for the specified range. This interface is used to pick up where +%% we left off after a pause. There are 2 main conditions that can trigger a pause: +%% a. Insufficient disk space. Will pause until disk space frees up +%% b. Sync queue is busy. Will pause until previously queued intervals are scheduled to the +%% ar_data_sync_worker_master for syncing. handle_cast(collect_peer_intervals, State) -> - #sync_data_state{ range_start = Start, range_end = End, sync_intervals_queue = Q, - store_id = StoreID, disk_pool_threshold = DiskPoolThreshold } = State, + #sync_data_state{ range_start = Start, range_end = End } = State, + gen_server:cast(self(), {collect_peer_intervals, Start, End}), + {noreply, State}; + +handle_cast({collect_peer_intervals, Start, End}, State) when Start >= End -> + #sync_data_state{ store_id = StoreID } = State, + ?LOG_DEBUG([{event, collect_peer_intervals_end}, {pid, self()}, {store_id, StoreID}, + {start, Start}]), + %% We've finished collecting intervals for the whole storage_module range. Schedule + %% the collection process to restart in ?COLLECT_SYNC_INTERVALS_FREQUENCY_MS and + %% clear the all_peers_intervals cache so we can start fresh and requery peers for + %% their advertised intervals. 
+	ar_util:cast_after(?COLLECT_SYNC_INTERVALS_FREQUENCY_MS, self(), collect_peer_intervals),
+	{noreply, State#sync_data_state{ all_peers_intervals = #{} }};
+handle_cast({collect_peer_intervals, Start, End}, State) ->
+	#sync_data_state{ sync_intervals_queue = Q,
+			store_id = StoreID, disk_pool_threshold = DiskPoolThreshold,
+			all_peers_intervals = AllPeersIntervals } = State,
 	IsJoined =
 		case ar_node:is_joined() of
 			false ->
-				ar_util:cast_after(1000, self(), collect_peer_intervals),
+				ar_util:cast_after(1000, self(), {collect_peer_intervals, Start, End}),
 				false;
 			true ->
 				true
@@ -727,7 +752,7 @@ handle_cast(collect_peer_intervals, State) ->
 				true ->
 					true;
 				_ ->
-					ar_util:cast_after(30000, self(), collect_peer_intervals),
+					ar_util:cast_after(30_000, self(), {collect_peer_intervals, Start, End}),
 					false
 			end
 		end,
@@ -736,51 +761,56 @@ handle_cast(collect_peer_intervals, State) ->
 		false ->
 			true;
 		true ->
-			case gb_sets:size(Q) > ?SYNC_INTERVALS_MAX_QUEUE_SIZE of
+			%% Q is the number of chunks that we've already queued for syncing. We need
+			%% to manage the queue length.
+			%% 1. Periodically sync_intervals will pull from Q and send work to
+			%%    ar_data_sync_worker_master. We need to make sure Q is long enough so
+			%%    that we never starve ar_data_sync_worker_master of work.
+			%% 2. On the flip side we don't want Q to get so long as to trigger an
+			%%    out-of-memory condition. In the extreme case we could collect and
+			%%    enqueue all chunks in a full 3.6TB storage_module. A Q of this length
+			%%    would have a roughly 500MB memory footprint per storage_module. For a
+			%%    node that is syncing multiple storage modules, this can add up fast.
+			%% 3. We also want to make sure we are using the most up to date information
+			%%    we can. Every time we add a task to the Q we're locking in a specific
+			%%    view of Peer data availability. If that peer goes offline before we
+			%%    get to the task it can result in wasted work or syncing stalls. A
+			%%    shorter queue helps ensure we're always syncing from the "best" peers
+			%%    at any point in time.
+			%%
+			%% With all that in mind, we'll pause collection once the Q hits roughly
+			%% a bucket size worth of chunks. This number is slightly arbitrary and we
+			%% should feel free to adjust as necessary.
+			case gb_sets:size(Q) > (?NETWORK_DATA_BUCKET_SIZE / ?DATA_CHUNK_SIZE) of
 				true ->
-					ar_util:cast_after(500, self(), collect_peer_intervals),
+					ar_util:cast_after(500, self(), {collect_peer_intervals, Start, End}),
 					true;
 				false ->
 					false
 			end
 	end,
-	IsBelowDiskPoolThreshold =
+	AllPeersIntervals2 =
 		case IsSyncQueueBusy of
 			true ->
-				false;
+				AllPeersIntervals;
 			false ->
 				case Start >= DiskPoolThreshold of
 					true ->
-						ar_util:cast_after(500, self(), collect_peer_intervals),
-						false;
+						ar_util:cast_after(500, self(), {collect_peer_intervals, Start, End}),
+						AllPeersIntervals;
 					false ->
-						true
+						%% All checks have passed, find and enqueue intervals for one
+						%% sync bucket worth of chunks starting at offset Start.
+						find_peer_intervals(
							Start, min(End, DiskPoolThreshold), StoreID, AllPeersIntervals)
 				end
 		end,
-	case IsBelowDiskPoolThreshold of
-		false ->
-			ok;
-		true ->
-			{ok, Config} = application:get_env(arweave, config),
-			SyncWorkers = Config#config.sync_jobs,
-			%% We do not want to bother peers if we still have a lot of data scheduled for
-			%% local copying and repacking.
- case ar_data_sync_worker_master:get_total_task_count() > SyncWorkers * 2 of - true -> - ar_util:cast_after(1000, self(), collect_peer_intervals); - false -> - Self = self(), - monitor(process, spawn( - fun() -> - find_peer_intervals(Start, min(End, DiskPoolThreshold), StoreID, - Self, #{}), - ar_util:cast_after(?COLLECT_SYNC_INTERVALS_FREQUENCY_MS, Self, - collect_peer_intervals) - end - )) - end - end, - {noreply, State}; + %% While we are working through the storage_module range we'll maintain a cache + %% of the mapping of peers to their advertised intervals. This ensures we don't query + %% each peer's /data_sync_record endpoint too often. Once we've made a full pass through + %% the range, we'll clear the cache so that we can incorporate any peer interval changes + %% the next time through. + {noreply, State#sync_data_state{ all_peers_intervals = AllPeersIntervals2 }}; handle_cast({enqueue_intervals, []}, State) -> {noreply, State}; @@ -802,7 +832,7 @@ handle_cast({enqueue_intervals, Intervals}, State) -> %% This is an approximation. The intent is to enqueue one sync_bucket at a time - but %% due to the selection of each peer's intervals, the total number of bytes may be - %% less than a full sync_bucket. But for the purposes of distrubiting requests among + %% less than a full sync_bucket. But for the purposes of distributing requests among %% many peers - the approximation is fine (and much cheaper to calculate than taking %% the sum of all the peer intervals). TotalChunksToEnqueue = ?DEFAULT_SYNC_BUCKET_SIZE div ?DATA_CHUNK_SIZE, @@ -815,6 +845,10 @@ handle_cast({enqueue_intervals, Intervals}, State) -> {Q2, QIntervals2} = enqueue_intervals( ar_util:shuffle_list(Intervals), ChunksPerPeer, {Q, QIntervals}), + ?LOG_DEBUG([{event, enqueue_intervals}, {pid, self()}, + {queue_before, gb_sets:size(Q)}, {queue_after, gb_sets:size(Q2)}, + {num_peers, NumPeers}, {chunks_per_peer, ChunksPerPeer}]), + {noreply, State#sync_data_state{ sync_intervals_queue = Q2, sync_intervals_queue_intervals = QIntervals2 }}; handle_cast(sync_intervals, State) -> @@ -2154,20 +2188,39 @@ get_unsynced_intervals_from_other_storage_modules(TargetStoreID, StoreID, RangeS end end. -find_peer_intervals(Start, End, _StoreID, _Self, _AllPeersIntervals) when Start >= End -> - ok; -find_peer_intervals(Start, End, StoreID, Self, AllPeersIntervals) -> +find_peer_intervals(Start, End, StoreID, _AllPeersIntervals) when Start >= End -> + %% We've reached the end of the range, next time through we'll start with a clear cache. + ?LOG_DEBUG([{event, find_peer_intervals_end}, {pid, self()}, {store_id, StoreID}, + {start, Start}]), + #{}; +find_peer_intervals(Start, End, StoreID, AllPeersIntervals) -> Start2 = Start - Start rem ?NETWORK_DATA_BUCKET_SIZE, End2 = min(Start2 + ?NETWORK_DATA_BUCKET_SIZE, End), UnsyncedIntervals = get_unsynced_intervals(Start, End2, StoreID), - AllPeersIntervals2 = - case ar_intervals:is_empty(UnsyncedIntervals) of + + Bucket = Start div ?NETWORK_DATA_BUCKET_SIZE, + {ok, Config} = application:get_env(arweave, config), + Peers = + case Config#config.sync_from_local_peers_only of true -> - AllPeersIntervals; + Config#config.local_peers; false -> - find_peer_intervals2(Start, UnsyncedIntervals, Self, AllPeersIntervals) + ar_data_discovery:get_bucket_peers(Bucket) end, - find_peer_intervals(End2, End, StoreID, Self, AllPeersIntervals2). 
+ ?LOG_DEBUG([{event, find_peer_intervals}, {pid, self()}, {store_id, StoreID}, + {start, Start}, {bucket, Bucket}, + {peers, io_lib:format("~p", [[ar_util:format_peer(Peer) || Peer <- Peers]])}]), + + %% Schedule the next sync bucket. The cast handler logic will pause collection if needed. + gen_server:cast(self(), {collect_peer_intervals, End2, End}), + + %% The updated AllPeersIntervals cache is returned so it can be added to the State + case ar_intervals:is_empty(UnsyncedIntervals) of + true -> + AllPeersIntervals; + false -> + find_peer_intervals2(Start, Peers, UnsyncedIntervals, AllPeersIntervals) + end. %% @doc Collect the unsynced intervals between Start and End excluding the blocklisted %% intervals. @@ -2193,24 +2246,7 @@ get_unsynced_intervals(Start, End, Intervals, StoreID) -> end end. -find_peer_intervals2(Start, UnsyncedIntervals, Self, AllPeersIntervals) -> - Bucket = Start div ?NETWORK_DATA_BUCKET_SIZE, - {ok, Config} = application:get_env(arweave, config), - Peers = - case Config#config.sync_from_local_peers_only of - true -> - Config#config.local_peers; - false -> - ar_data_discovery:get_bucket_peers(Bucket) - end, - case ar_intervals:is_empty(UnsyncedIntervals) of - true -> - AllPeersIntervals; - false -> - find_peer_intervals3(Start, UnsyncedIntervals, Self, AllPeersIntervals, Peers) - end. - -find_peer_intervals3(Start, UnsyncedIntervals, Self, AllPeersIntervals, Peers) -> +find_peer_intervals2(Start, Peers, UnsyncedIntervals, AllPeersIntervals) -> Intervals = ar_util:pmap( fun(Peer) -> @@ -2226,22 +2262,6 @@ find_peer_intervals3(Start, UnsyncedIntervals, Self, AllPeersIntervals, Peers) - end, Peers ), - AllPeersIntervals2 = - lists:foldl( - fun ({Peer, _, PeerIntervals, Left}, Acc) -> - case ar_intervals:is_empty(PeerIntervals) of - true -> - Acc; - false -> - Right = element(1, ar_intervals:largest(PeerIntervals)), - maps:put(Peer, {Right, Left, PeerIntervals}, Acc) - end; - (_, Acc) -> - Acc - end, - AllPeersIntervals, - Intervals - ), EnqueueIntervals = lists:foldl( fun ({Peer, SoughtIntervals, _, _}, Acc) -> @@ -2257,8 +2277,24 @@ find_peer_intervals3(Start, UnsyncedIntervals, Self, AllPeersIntervals, Peers) - [], Intervals ), - gen_server:cast(Self, {enqueue_intervals, EnqueueIntervals}), - AllPeersIntervals2. + gen_server:cast(self(), {enqueue_intervals, EnqueueIntervals}), + + %% Update and return AllPeersIntervals + lists:foldl( + fun ({Peer, _, PeerIntervals, Left}, Acc) -> + case ar_intervals:is_empty(PeerIntervals) of + true -> + Acc; + false -> + Right = element(1, ar_intervals:largest(PeerIntervals)), + maps:put(Peer, {Right, Left, PeerIntervals}, Acc) + end; + (_, Acc) -> + Acc + end, + AllPeersIntervals, + Intervals + ). %% @doc %% @return {ok, Intervals, PeerIntervals, Left} | Error @@ -2290,6 +2326,10 @@ enqueue_intervals([{Peer, Intervals} | Rest], ChunksToEnqueue, {Q, QIntervals}) enqueue_intervals(Rest, ChunksToEnqueue, {Q2, QIntervals2}). enqueue_peer_intervals(Peer, Intervals, ChunksToEnqueue, {Q, QIntervals}) -> + ?LOG_DEBUG([{event, enqueue_peer_intervals}, {pid, self()}, + {peer, ar_util:format_peer(Peer)}, {num_intervals, gb_sets:size(Intervals)}, + {chunks_to_enqueue, ChunksToEnqueue}]), + %% Only keep unique intervals. 
We may get some duplicates for two
 	%% reasons:
 	%% 1) find_peer_intervals might choose the same interval several
diff --git a/apps/arweave/src/ar_data_sync_worker_master.erl b/apps/arweave/src/ar_data_sync_worker_master.erl
index 38119c48e..bed3ec61c 100644
--- a/apps/arweave/src/ar_data_sync_worker_master.erl
+++ b/apps/arweave/src/ar_data_sync_worker_master.erl
@@ -187,6 +187,9 @@ cut_peer_queue(Peer, TaskQueue, MaxActive, EMA) ->
 		TasksToCut when TasksToCut > 0 ->
 			%% The peer has a large queue of tasks. Reduce the queue size by removing the
 			%% oldest tasks.
+			?LOG_DEBUG([{event, cut_peer_queue},
+				{peer, Peer}, {max_active, MaxActive}, {ema, EMA},
+				{max_queue, MaxQueue}, {tasks_to_cut, TasksToCut}]),
 			{TaskQueue2, _} = queue:split(MaxQueue, TaskQueue),
 			update_counters(queued, sync_range, ar_util:format_peer(Peer), -TasksToCut),
 			TaskQueue2;
diff --git a/apps/arweave/src/ar_http.erl b/apps/arweave/src/ar_http.erl
index 962d900e6..b08281285 100644
--- a/apps/arweave/src/ar_http.erl
+++ b/apps/arweave/src/ar_http.erl
@@ -218,12 +218,14 @@ handle_info({gun_down, PID, Protocol, Reason, _KilledStreams, _UnprocessedStream
 handle_info({'DOWN', _Ref, process, PID, Reason},
 		#state{ pid_by_peer = PIDByPeer, status_by_pid = StatusByPID } = State) ->
-	?LOG_DEBUG([{event, gun_connection_process_down},
-			{reason, io_lib:format("~p", [Reason])}]),
 	case maps:get(PID, StatusByPID, not_found) of
 		not_found ->
+			?LOG_DEBUG([{event, gun_connection_process_down}, {pid, PID}, {peer, unknown},
+					{reason, io_lib:format("~p", [Reason])}]),
 			{noreply, State};
 		{Status, _MonitorRef, Peer} ->
+			?LOG_DEBUG([{event, gun_connection_process_down}, {pid, PID},
+					{peer, ar_util:format_peer(Peer)}, {reason, io_lib:format("~p", [Reason])}]),
 			PIDByPeer2 = maps:remove(Peer, PIDByPeer),
 			StatusByPID2 = maps:remove(PID, StatusByPID),
 			case Status of
diff --git a/apps/arweave/src/ar_metrics.erl b/apps/arweave/src/ar_metrics.erl
index ad09eba3a..8311ee6bc 100644
--- a/apps/arweave/src/ar_metrics.erl
+++ b/apps/arweave/src/ar_metrics.erl
@@ -419,7 +419,11 @@ register(MetricsDir) ->
 			"is intended for - for 'read_range' tasks this will be 'localhost'."}]),
 	prometheus_counter:new([{name, process_functions},
-		{help, "Sampling active processes"}, {labels, [process]}]).
+		{labels, [process]},
+		{help, "Sampling active functions. The 'process' label is a fully qualified "
+			"function name with the format 'process~module:function/arity'"}]),
+	prometheus_gauge:new([{name, process_memory},
+		{help, "Sampling active process memory usage."}, {labels, [process]}]).
 
 %% @doc Store the given metric in a file.
 store(Name) ->
diff --git a/apps/arweave/src/ar_node_worker.erl b/apps/arweave/src/ar_node_worker.erl
index 5e999de38..7a901321b 100644
--- a/apps/arweave/src/ar_node_worker.erl
+++ b/apps/arweave/src/ar_node_worker.erl
@@ -296,11 +296,11 @@ handle_info({join, BI, Blocks}, State) ->
 			false ->
 				Blocks3
 		end,
-	ar_events:send(node_state, {initializing, Blocks4}),
 	ets:insert(node_state, [
 		{recent_block_index,	lists:sublist(BI, ?BLOCK_INDEX_HEAD_LEN)},
 		{joined_blocks,			Blocks4}
 	]),
+	ar_events:send(node_state, initializing),
 	{noreply, State};
 
 handle_info(wallets_ready, State) ->
diff --git a/apps/arweave/src/ar_nonce_limiter.erl b/apps/arweave/src/ar_nonce_limiter.erl
index 31425b421..43f521e13 100644
--- a/apps/arweave/src/ar_nonce_limiter.erl
+++ b/apps/arweave/src/ar_nonce_limiter.erl
@@ -562,7 +562,8 @@ handle_cast(Cast, State) ->
 	?LOG_WARNING("event: unhandled_cast, cast: ~p", [Cast]),
 	{noreply, State}.
-handle_info({event, node_state, {initializing, Blocks}}, State) -> +handle_info({event, node_state, initializing}, State) -> + [{joined_blocks, Blocks}] = ets:lookup(node_state, joined_blocks), {noreply, handle_initialized(lists:sublist(Blocks, ?STORE_BLOCKS_BEHIND_CURRENT), State)}; handle_info({event, node_state, {validated_pre_fork_2_6_block, B}}, State) -> @@ -1141,7 +1142,8 @@ test_applies_validated_steps() -> NextSeed2 = crypto:strong_rand_bytes(32), InitialOutput = crypto:strong_rand_bytes(32), B1 = test_block(1, InitialOutput, Seed, NextSeed, [], []), - ar_events:send(node_state, {initializing, [B1]}), + ets:insert(node_state, [{joined_blocks, [B1]}]), + ar_events:send(node_state, initializing), true = ar_util:do_until(fun() -> get_current_step_number() == 1 end, 100, 1000), {ok, Output2, _} = compute(2, InitialOutput), B2 = test_block(2, Output2, Seed, NextSeed, [], [Output2]), diff --git a/apps/arweave/src/ar_packing_server.erl b/apps/arweave/src/ar_packing_server.erl index 92dfd4859..ed1b7c344 100644 --- a/apps/arweave/src/ar_packing_server.erl +++ b/apps/arweave/src/ar_packing_server.erl @@ -234,6 +234,10 @@ log_packing_rate(PackingRate, Max) -> "Estimated maximum rate: ~.2f chunks/s.~n", [PackingRate, Max]). +calculate_throttle_delay(0, _PackingRate) -> + 0; +calculate_throttle_delay(_SpawnSchedulers, 0) -> + 0; calculate_throttle_delay(SpawnSchedulers, PackingRate) -> Load = PackingRate / (SpawnSchedulers * (1000 / (?PACKING_LATENCY_MS))), case Load >= 1 of @@ -601,3 +605,18 @@ pack_test() -> Cases )), ?assertEqual(length(PackedList), sets:size(sets:from_list(PackedList))). + +calculate_throttle_delay_test() -> + %% 1000 / ?PACKING_LATENCY_MS = 16.666666 + ?assertEqual(0, calculate_throttle_delay(1, 17), + "PackingRate > SpawnSchedulers capacity -> no throttle"), + ?assertEqual(0, calculate_throttle_delay(8, 1000), + "PackingRate > SpawnSchedulers capacity -> no throttle"), + ?assertEqual(2, calculate_throttle_delay(1, 16), + "PackingRate < SpawnSchedulers capacity -> throttle"), + ?assertEqual(15, calculate_throttle_delay(8, 100), + "PackingRate < SpawnSchedulers capacity -> throttle"), + ?assertEqual(0, calculate_throttle_delay(0, 100), + "0 schedulers -> no throttle"), + ?assertEqual(0, calculate_throttle_delay(8, 0), + "no packing -> no throttle"). \ No newline at end of file diff --git a/apps/arweave/src/ar_process_sampler.erl b/apps/arweave/src/ar_process_sampler.erl index 0d7517ddf..4a0a101df 100644 --- a/apps/arweave/src/ar_process_sampler.erl +++ b/apps/arweave/src/ar_process_sampler.erl @@ -6,7 +6,8 @@ -export([start_link/0]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --define(SAMPLE_INTERVAL, 1000). +%% Sample relatively infrequently - every 5 seconds - to minimize the impact on the system. +-define(SAMPLE_INTERVAL, 5000). 
%% API start_link() -> @@ -25,8 +26,11 @@ handle_cast(_Msg, State) -> handle_info(sample, State) -> Processes = erlang:processes(), - ProcessNames = lists:filtermap(fun(Pid) -> process_function(Pid) end, Processes), - lists:foreach(fun(Name) -> prometheus_counter:inc(process_functions, [Name]) end, ProcessNames), + ProcessData = lists:filtermap(fun(Pid) -> process_function(Pid) end, Processes), + lists:foreach(fun({ProcessName, FunctionName, Bytes}) -> + prometheus_counter:inc(process_functions, [FunctionName]), + prometheus_gauge:set(process_memory, [ProcessName], Bytes) + end, ProcessData), {noreply, State}; handle_info(_Info, State) -> @@ -40,13 +44,24 @@ code_change(_OldVsn, State, _Extra) -> %% Internal functions process_function(Pid) -> - case process_info(Pid, [current_function, status]) of - [{current_function, {?MODULE, process_function, _A}}, _] -> + case process_info(Pid, [current_function, registered_name, status, memory]) of + [{current_function, {?MODULE, process_function, _A}}, _, _, _] -> false; - [{current_function, {erlang, process_info, _A}}, _] -> + [{current_function, {erlang, process_info, _A}}, _, _, _] -> false; - [{current_function, {M, F, A}}, {status, running}] -> - {true, atom_to_list(M) ++ ":" ++ atom_to_list(F) ++ "/" ++ integer_to_list(A)}; + [{current_function, {M, F, A}}, {registered_name, Name}, {status, running}, + {memory, Bytes}] -> + ProcessName = process_name(Name), + FunctionName = function_name(ProcessName, M, F, A), + {true, {ProcessName, FunctionName, Bytes}}; _ -> false - end. \ No newline at end of file + end. + +process_name([]) -> + process_name(unknown); +process_name(ProcessName) -> + atom_to_list(ProcessName). + +function_name(ProcessName, M, F, A) -> + ProcessName ++ "~" ++ atom_to_list(M) ++ ":" ++ atom_to_list(F) ++ "/" ++ integer_to_list(A). \ No newline at end of file
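The new calculate_throttle_delay_test/0 cases above are easier to follow with the implied constants spelled out: the "1000 / ?PACKING_LATENCY_MS = 16.666666" comment puts ?PACKING_LATENCY_MS at 60 ms, and the expected delays of 2 and 15 match a throttle equal to the idle fraction of that per-chunk latency. The Load < 1 branch itself is not shown in this diff, so the following is only a sketch under those assumptions that reproduces the test values, not the actual ar_packing_server.erl code:

%% Assumes ?PACKING_LATENCY_MS = 60 and an idle-fraction throttle; both are
%% inferred from the new test cases rather than copied from ar_packing_server.erl.
-define(PACKING_LATENCY_MS, 60).

throttle_delay_sketch(0, _PackingRate) ->
	0;
throttle_delay_sketch(_SpawnSchedulers, 0) ->
	0;
throttle_delay_sketch(SpawnSchedulers, PackingRate) ->
	%% Chunks per second the spawned schedulers can pack at ?PACKING_LATENCY_MS each.
	Capacity = SpawnSchedulers * (1000 / ?PACKING_LATENCY_MS),
	Load = PackingRate / Capacity,
	case Load >= 1 of
		true ->
			0;  %% demand meets or exceeds capacity: no throttle
		false ->
			trunc((1 - Load) * ?PACKING_LATENCY_MS)  %% sleep away the idle fraction
	end.

%% throttle_delay_sketch(1, 16)   -> 2   (load 0.96; 4% of 60 ms truncates to 2)
%% throttle_delay_sketch(8, 100)  -> 15  (load 0.75; 25% of 60 ms)
%% throttle_delay_sketch(8, 1000) -> 0   (over capacity)
%% throttle_delay_sketch(0, 100)  -> 0   (no schedulers)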