Skip to content

Commit e43f861

Browse files
committed
fix: handle read_range tasks in one worker per storage_module
This reduces the disk thrash that can occur when doing a cross-module repack.
1 parent 111cfaa commit e43f861

13 files changed

+458
-224
lines changed

.github/workflows/test.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,7 @@ jobs:
221221
ar_block,
222222
ar_block_cache,
223223
ar_chain_stats,
224+
ar_chunk_copy,
224225
ar_chunk_storage,
225226
ar_data_sync_worker_master,
226227
ar_deep_hash,

apps/arweave/e2e/ar_e2e.erl

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,8 @@ start_source_node(Node, PackingType, WalletFixture) ->
117117
}, true)
118118
),
119119

120+
?LOG_INFO("Source node ~p started.", [Node]),
121+
120122
%% Note: small chunks will be padded to 256 KiB. So B1 actually contains 3 chunks of data
121123
%% and B2 starts at a chunk boundary and contains 1 chunk of data.
122124
B1 = mine_block(Node, Wallet, floor(2.5 * ?DATA_CHUNK_SIZE)),
@@ -137,7 +139,7 @@ start_source_node(Node, PackingType, WalletFixture) ->
137139
{B3, ?PARTITION_SIZE + (8*?DATA_CHUNK_SIZE), ?DATA_CHUNK_SIZE}
138140
],
139141

140-
?LOG_INFO("Source node ~p started.", [Node]),
142+
?LOG_INFO("Source node ~p blocks mined.", [Node]),
141143

142144
SourcePacking = ar_e2e:packing_type_to_packing(PackingType, RewardAddr),
143145

apps/arweave/src/ar_chunk_copy.erl

Lines changed: 304 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,304 @@
1+
%%% @doc Maintains one read_range task queue per storage module, so that each
%%% storage module is read by a single dedicated worker process.
3+
-module(ar_chunk_copy).
4+
5+
-behaviour(gen_server).
6+
7+
-export([start_link/1, register_workers/0, ready_for_work/1, read_range/4]).
8+
9+
-export([init/1, handle_cast/2, handle_call/3, handle_info/2, terminate/2]).
10+
11+
-include_lib("arweave/include/ar.hrl").
12+
-include_lib("arweave/include/ar_sup.hrl").
13+
-include_lib("arweave/include/ar_config.hrl").
14+
-include_lib("eunit/include/eunit.hrl").
15+
16+
-define(READ_RANGE_CHUNKS, 400).
17+
-define(MAX_ACTIVE_TASKS, 10).
18+
-define(MAX_QUEUED_TASKS, 50).
19+
20+
-record(worker_tasks, {
21+
worker,
22+
task_queue = queue:new(),
23+
active_count = 0
24+
}).
25+
26+
-record(state, {
27+
workers = #{}
28+
}).
29+
30+
%%%===================================================================
31+
%%% Public interface.
32+
%%%===================================================================
33+
34+
%% @doc Start the chunk-copy server, registered locally as ?MODULE.
%% StoreIDToWorker maps each storage module's StoreID to the registered
%% name of the ar_data_sync_worker dedicated to that module.
start_link(StoreIDToWorker) ->
    gen_server:start_link({local, ?MODULE}, ?MODULE, StoreIDToWorker, []).
37+
38+
%% @doc Build the supervisor child specs: one dedicated read worker per
%% configured storage module, followed by the ar_chunk_copy server itself
%% (which is handed the StoreID -> worker-name map).
register_workers() ->
    {ReadWorkers, StoreIDToWorker} = register_read_workers(),
    Copier = ?CHILD_WITH_ARGS(ar_chunk_copy, worker, ar_chunk_copy, [StoreIDToWorker]),
    lists:append(ReadWorkers, [Copier]).
42+
43+
%% @doc Create one ar_data_sync_worker child spec per configured storage
%% module. Returns {ChildSpecs, StoreIDToName}, where StoreIDToName maps
%% each StoreID to its worker's registered name.
register_read_workers() ->
    {ok, Config} = application:get_env(arweave, config),
    BuildSpec =
        fun(StorageModule, {SpecsAcc, MapAcc}) ->
            StoreID = ar_storage_module:id(StorageModule),
            %% NOTE(review): list_to_atom/1 on a dynamic string; assumed safe
            %% because the set of storage modules is fixed by node
            %% configuration — confirm StoreIDs are bounded.
            Name = list_to_atom("ar_data_sync_worker_" ++ StoreID),
            Spec = ?CHILD_WITH_ARGS(ar_data_sync_worker, worker, Name, [Name]),
            {[Spec | SpecsAcc], MapAcc#{StoreID => Name}}
        end,
    lists:foldl(BuildSpec, {[], #{}}, Config#config.storage_modules).
59+
60+
%% @doc Return true when the worker serving StoreID can accept more
%% read_range tasks (its queue is below ?MAX_QUEUED_TASKS). A call timeout
%% is treated as "not ready" so callers back off instead of crashing.
%% NOTE(review): an earlier comment claimed this is always false when
%% sync_jobs = 0; that behavior is not visible in this module — confirm
%% it is enforced upstream.
ready_for_work(StoreID) ->
    try gen_server:call(?MODULE, {ready_for_work, StoreID}, 1000) of
        Ready ->
            Ready
    catch
        exit:{timeout, _} ->
            false
    end.
69+
70+
%% @doc Ask the server to copy the chunk range [Start, End) from
%% OriginStoreID to TargetStoreID. Returns true when the task was accepted,
%% false when the origin worker's queue is full (caller should retry later).
read_range(Start, End, OriginStoreID, TargetStoreID) ->
    Accepted = ar_chunk_copy:ready_for_work(OriginStoreID),
    case Accepted of
        true ->
            gen_server:cast(?MODULE,
                {read_range, {Start, End, OriginStoreID, TargetStoreID}});
        false ->
            ok
    end,
    Accepted.
79+
80+
%%%===================================================================
81+
%%% Generic server callbacks.
82+
%%%===================================================================
83+
84+
%% @doc gen_server init. Wraps each registered worker name in a fresh
%% #worker_tasks{} record keyed by StoreID and schedules the first
%% queue-processing tick.
init(WorkerMap) ->
    ?LOG_DEBUG([{event, init}, {module, ?MODULE}, {worker_map, WorkerMap}]),
    Workers = maps:map(
        fun(_StoreID, Name) -> #worker_tasks{worker = Name} end,
        WorkerMap
    ),
    ar_util:cast_after(1000, self(), process_queues),
    {ok, #state{workers = Workers}}.
97+
98+
%% @doc Synchronous calls. {ready_for_work, StoreID} reports whether the
%% store's queue has capacity; any other call is logged and acknowledged.
handle_call({ready_for_work, StoreID}, _From, State) ->
    Ready = do_ready_for_work(StoreID, State),
    {reply, Ready, State};
handle_call(Request, _From, State) ->
    ?LOG_WARNING([{event, unhandled_call}, {module, ?MODULE}, {request, Request}]),
    {reply, ok, State}.
104+
105+
%% @doc Asynchronous messages:
%%   {read_range, Args}   — queue a (possibly split) read-range task.
%%   process_queues       — periodic tick; dispatch queued tasks, re-arm timer.
%%   {task_completed, _}  — a worker finished a task; free a slot and refill.
handle_cast({read_range, Args}, State) ->
    ?LOG_DEBUG([{event, read_range}, {module, ?MODULE}, {args, Args}]),
    State2 = enqueue_read_range(Args, State),
    {noreply, State2};
handle_cast(process_queues, State) ->
    ?LOG_DEBUG([{event, process_queues}, {module, ?MODULE}]),
    %% Re-arm the periodic tick before dispatching queued work.
    ar_util:cast_after(1000, self(), process_queues),
    {noreply, process_queues(State)};
handle_cast({task_completed, {read_range, {Worker, _, Args}}}, State) ->
    ?LOG_DEBUG([{event, task_completed}, {module, ?MODULE}, {worker, Worker}, {args, Args}]),
    {noreply, task_completed(Args, State)};
handle_cast(Cast, State) ->
    ?LOG_WARNING([{event, unhandled_cast}, {module, ?MODULE}, {cast, Cast}]),
    {noreply, State}.
121+
122+
%% @doc No raw messages are expected; log and drain so the mailbox stays empty.
handle_info(Message, State) ->
    ?LOG_WARNING([{event, unhandled_info}, {module, ?MODULE}, {message, Message}]),
    {noreply, State}.
125+
126+
%% @doc Nothing to clean up; just record why we stopped.
terminate(Reason, _State) ->
    ?LOG_DEBUG([{event, terminate}, {module, ?MODULE},
        {reason, io_lib:format("~p", [Reason])}]),
    ok.
129+
130+
%%%===================================================================
131+
%%% Private functions.
132+
%%%===================================================================
133+
134+
%% @doc True when StoreID's worker exists and its task queue is below the
%% ?MAX_QUEUED_TASKS cap; false (with an error log) for an unknown StoreID.
do_ready_for_work(StoreID, State) ->
    case maps:find(StoreID, State#state.workers) of
        error ->
            ?LOG_ERROR([{event, worker_not_found}, {module, ?MODULE}, {call, ready_for_work},
                {store_id, StoreID}]),
            false;
        {ok, Worker} ->
            queue:len(Worker#worker_tasks.task_queue) < ?MAX_QUEUED_TASKS
    end.
144+
145+
%% @doc Route a read-range task onto the queue of the worker that owns the
%% origin storage module; unknown origins are logged and dropped.
enqueue_read_range({_Start, _End, OriginStoreID, _TargetStoreID} = Args, State) ->
    case maps:find(OriginStoreID, State#state.workers) of
        error ->
            ?LOG_ERROR([{event, worker_not_found}, {module, ?MODULE},
                {call, enqueue_read_range}, {store_id, OriginStoreID}]),
            State;
        {ok, Worker} ->
            Worker2 = do_enqueue_read_range(Args, Worker),
            Workers2 = maps:put(OriginStoreID, Worker2, State#state.workers),
            State#state{workers = Workers2}
    end.
159+
160+
%% @doc Split the range [Start, End) into sub-ranges of at most
%% ?READ_RANGE_CHUNKS chunks and enqueue one task per sub-range on the
%% worker's queue, preserving order.
%% Fix: a degenerate range (Start >= End) now enqueues nothing; previously a
%% zero-length (or inverted) task was still placed on the queue and
%% dispatched to the worker.
do_enqueue_read_range({Start, End, _OriginStoreID, _TargetStoreID}, Worker)
        when Start >= End ->
    Worker;
do_enqueue_read_range(Args, Worker) ->
    {Start, End, OriginStoreID, TargetStoreID} = Args,
    End2 = min(Start + (?READ_RANGE_CHUNKS * ?DATA_CHUNK_SIZE), End),
    Args2 = {Start, End2, OriginStoreID, TargetStoreID},
    ?LOG_DEBUG([{event, enqueue_read_range}, {module, ?MODULE}, {args, Args2}]),
    TaskQueue = queue:in(Args2, Worker#worker_tasks.task_queue),
    Worker2 = Worker#worker_tasks{task_queue = TaskQueue},
    case End2 == End of
        true ->
            Worker2;
        false ->
            %% More range remains; recurse on the tail [End2, End).
            do_enqueue_read_range({End2, End, OriginStoreID, TargetStoreID}, Worker2)
    end.
174+
175+
%% @doc Dispatch as many queued tasks as each worker's active-task budget
%% allows, across all storage modules.
process_queues(State) ->
    Dispatched = maps:map(
        fun(_StoreID, Worker) -> process_queue(Worker) end,
        State#state.workers
    ),
    State#state{workers = Dispatched}.
184+
185+
%% @doc Pop tasks off the worker's queue and cast them to its worker process
%% until the queue is empty or ?MAX_ACTIVE_TASKS tasks are in flight.
process_queue(#worker_tasks{active_count = Active} = Worker)
        when Active >= ?MAX_ACTIVE_TASKS ->
    %% Budget exhausted; leave the remaining tasks queued.
    Worker;
process_queue(Worker) ->
    case queue:out(Worker#worker_tasks.task_queue) of
        {empty, _} ->
            Worker;
        {{value, Args}, Remaining} ->
            ?LOG_DEBUG([{event, process_queue}, {module, ?MODULE},
                {active_count, Worker#worker_tasks.active_count}, {args, Args}]),
            gen_server:cast(Worker#worker_tasks.worker, {read_range, Args}),
            process_queue(Worker#worker_tasks{
                task_queue = Remaining,
                active_count = Worker#worker_tasks.active_count + 1
            })
    end.
204+
205+
%% @doc A worker finished a read-range task: decrement the origin worker's
%% in-flight count and immediately try to dispatch more of its queued tasks.
task_completed({_Start, _End, OriginStoreID, _TargetStoreID} = Args, State) ->
    case maps:find(OriginStoreID, State#state.workers) of
        error ->
            ?LOG_ERROR([{event, worker_not_found}, {module, ?MODULE}, {call, task_completed},
                {store_id, OriginStoreID}]),
            State;
        {ok, Worker} ->
            ?LOG_DEBUG([{event, task_completed}, {module, ?MODULE},
                {worker, Worker#worker_tasks.worker},
                {active_count, Worker#worker_tasks.active_count}, {args, Args}]),
            Worker2 = Worker#worker_tasks{
                active_count = Worker#worker_tasks.active_count - 1
            },
            %% A slot just freed up — refill it from the queue right away.
            Worker3 = process_queue(Worker2),
            State#state{
                workers = maps:put(OriginStoreID, Worker3, State#state.workers)
            }
    end.
225+
226+
%%%===================================================================
227+
%%% Tests. Included in the module so they can reference private
228+
%%% functions.
229+
%%%===================================================================
230+
231+
%% @doc EUnit generator for the pure helper functions above.
helpers_test_() ->
    [{timeout, 30, Test} || Test <- [
        fun test_ready_for_work/0,
        fun test_enqueue_read_range/0,
        fun test_process_queue/0
    ]].
237+
238+
%% @doc Capacity check must key off ?MAX_QUEUED_TASKS: a queue just under the
%% cap is ready for work, a queue at/over the cap is not.
%% Fix: the original used hard-coded queue lengths (100 and 1001) that
%% disagree with ?MAX_QUEUED_TASKS (50), so the "store1" assertion could
%% never pass; derive the lengths from the macro instead.
test_ready_for_work() ->
    State = #state{
        workers = #{
            "store1" => #worker_tasks{
                task_queue = queue:from_list(lists:seq(1, ?MAX_QUEUED_TASKS - 1))},
            "store2" => #worker_tasks{
                task_queue = queue:from_list(lists:seq(1, ?MAX_QUEUED_TASKS + 1))}
        }
    },
    ?assertEqual(true, do_ready_for_work("store1", State)),
    ?assertEqual(false, do_ready_for_work("store2", State)).
249+
250+
%% @doc do_enqueue_read_range/2 must split a range into tasks of at most
%% ?READ_RANGE_CHUNKS chunks each, in order.
%% Fix: the original expectation hard-coded 10-chunk segments, which
%% disagrees with ?READ_RANGE_CHUNKS (400) — the whole range would fit in a
%% single task and the test would fail. Derive the segments from the macro
%% so the test holds for any configured task size.
test_enqueue_read_range() ->
    TaskSize = ?READ_RANGE_CHUNKS * ?DATA_CHUNK_SIZE,
    Start = 0,
    %% 2.5 task-sizes => expect two full tasks plus one half-size remainder.
    End = 2 * TaskSize + TaskSize div 2,
    Expected = [
        {0, TaskSize, "store1", "store2"},
        {TaskSize, 2 * TaskSize, "store1", "store2"},
        {2 * TaskSize, End, "store1", "store2"}
    ],
    Worker = do_enqueue_read_range(
        {Start, End, "store1", "store2"},
        #worker_tasks{task_queue = queue:new()}
    ),
    ?assertEqual(Expected, queue:to_list(Worker#worker_tasks.task_queue)).
268+
269+
%% @doc process_queue/1 must be a no-op at (or above) ?MAX_ACTIVE_TASKS, and
%% must otherwise dispatch tasks until the budget is exhausted, leaving the
%% remainder queued.
test_process_queue() ->
    %% Exactly at the cap: nothing is dispatched.
    Full = #worker_tasks{active_count = ?MAX_ACTIVE_TASKS},
    ?assertEqual(Full, process_queue(Full)),

    %% Above the cap (shouldn't happen in practice, but must still no-op).
    OverFull = #worker_tasks{active_count = ?MAX_ACTIVE_TASKS + 1},
    ?assertEqual(OverFull, process_queue(OverFull)),

    %% Two free slots and three queued tasks: the first two are dispatched,
    %% the third stays queued and the active count reaches the cap.
    TaskA = {floor(2.5 * ?DATA_CHUNK_SIZE), floor(12.5 * ?DATA_CHUNK_SIZE),
        "store1", "store2"},
    TaskB = {floor(12.5 * ?DATA_CHUNK_SIZE), floor(22.5 * ?DATA_CHUNK_SIZE),
        "store1", "store2"},
    TaskC = {floor(22.5 * ?DATA_CHUNK_SIZE), floor(30 * ?DATA_CHUNK_SIZE),
        "store1", "store2"},
    Worker = process_queue(#worker_tasks{
        active_count = ?MAX_ACTIVE_TASKS - 2,
        task_queue = queue:from_list([TaskA, TaskB, TaskC])
    }),
    ?assertEqual(?MAX_ACTIVE_TASKS, Worker#worker_tasks.active_count),
    ?assertEqual([TaskC], queue:to_list(Worker#worker_tasks.task_queue)).
304+

0 commit comments

Comments
 (0)