
Commit 9906730

WIP

committed, 1 parent d0548b1

File tree: 9 files changed, +187 -486 lines changed

apps/arweave/include/ar_config.hrl

Lines changed: 0 additions & 5 deletions
@@ -119,10 +119,6 @@
 %% The default rocksdb WAL sync interval, 1 minute.
 -define(DEFAULT_ROCKSDB_WAL_SYNC_INTERVAL_S, 60).
 
-%% The maximum allowed total size (in bytes) of the entropies generated
-%% for the 2.9 replication.
--define(DEFAULT_MAX_REPLICA_2_9_ENTROPY_CACHE_SIZE, 33_554_432). % 8_388_608 * 4.
-
 %% The number of 2.9 storage modules allowed to prepare the storage at a time.
 -ifdef(AR_TEST).
 -define(DEFAULT_REPLICA_2_9_WORKERS, 2).
@@ -227,7 +223,6 @@
 	pool_api_key = not_set,
 	pool_worker_name = not_set,
 	replica_2_9_workers = ?DEFAULT_REPLICA_2_9_WORKERS,
-	replica_2_9_entropy_cache_size = ?DEFAULT_MAX_REPLICA_2_9_ENTROPY_CACHE_SIZE,
 	%% Undocumented/unsupported options
 	chunk_storage_file_size = ?CHUNK_GROUP_SIZE,
 	rocksdb_flush_interval_s = ?DEFAULT_ROCKSDB_FLUSH_INTERVAL_S,

apps/arweave/src/ar_chunk_storage.erl

Lines changed: 31 additions & 21 deletions
@@ -580,27 +580,37 @@ do_prepare_replica_2_9(State) ->
 			is_recorded;
 		false ->
 			%% Get all the entropies needed to encipher the chunk at BucketEndOffset.
-			Entropies = ar_entropy_storage:generate_entropies(
-				RewardAddr, BucketEndOffset, SubChunkStart),
-			EntropyKeys = ar_entropy_storage:generate_entropy_keys(
-				RewardAddr, BucketEndOffset, SubChunkStart),
-			SliceIndex = ar_replica_2_9:get_slice_index(BucketEndOffset),
-			%% If we are not at the beginning of the entropy, shift the offset to
-			%% the left. store_entropy will traverse the entire 2.9 partition shifting
-			%% the offset by sector size. It may happen some sub-chunks will be written
-			%% to the neighbouring storage module(s) on the left or on the right
-			%% since the storage module may be configured to be smaller than the
-			%% partition.
-			BucketEndOffset2 = ar_entropy_storage:shift_entropy_offset(
-				BucketEndOffset, -SliceIndex),
-			%% The end of a recall partition (3.6TB) may fall in the middle of a chunk, so
-			%% we'll use the padded offset to end the store_entropy iteration.
-			PartitionEnd = (Partition + 1) * ?PARTITION_SIZE,
-			PaddedPartitionEnd =
-				get_chunk_bucket_end(ar_block:get_chunk_padded_offset(PartitionEnd)),
-			ar_entropy_storage:store_entropy(
-				Entropies, BucketEndOffset2, SubChunkStart, PaddedPartitionEnd,
-				EntropyKeys, RewardAddr, 0, 0)
+			Entropies = prometheus_histogram:observe_duration(
+				replica_2_9_entropy_duration_milliseconds, ["32"],
+				fun() ->
+					ar_entropy_storage:generate_entropies(
+						RewardAddr, BucketEndOffset, SubChunkStart)
+				end),
+
+			case Entropies of
+				{error, Reason} ->
+					{error, Reason};
+				_ ->
+					EntropyKeys = ar_entropy_storage:generate_entropy_keys(
+						RewardAddr, BucketEndOffset, SubChunkStart),
+					SliceIndex = ar_replica_2_9:get_slice_index(BucketEndOffset),
+					%% If we are not at the beginning of the entropy, shift the offset to
+					%% the left. store_entropy will traverse the entire 2.9 partition shifting
+					%% the offset by sector size. It may happen some sub-chunks will be written
+					%% to the neighbouring storage module(s) on the left or on the right
+					%% since the storage module may be configured to be smaller than the
+					%% partition.
+					BucketEndOffset2 = ar_entropy_storage:shift_entropy_offset(
+						BucketEndOffset, -SliceIndex),
+					%% The end of a recall partition (3.6TB) may fall in the middle of a chunk, so
+					%% we'll use the padded offset to end the store_entropy iteration.
+					PartitionEnd = (Partition + 1) * ?PARTITION_SIZE,
+					PaddedPartitionEnd =
+						get_chunk_bucket_end(ar_block:get_chunk_padded_offset(PartitionEnd)),
+					ar_entropy_storage:store_entropy(
+						Entropies, BucketEndOffset2, SubChunkStart, PaddedPartitionEnd,
+						EntropyKeys, RewardAddr, 0, 0)
+			end
 	end,
 	?LOG_DEBUG([{event, do_prepare_replica_2_9}, {store_id, StoreID},
 		{start, Start}, {padded_end_offset, BucketEndOffset},
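
Note on the wrapper introduced above: prometheus_histogram:observe_duration/3 runs the fun, records the elapsed time in the named histogram, and passes the fun's return value through unchanged, which is why Entropies can still be matched against {error, Reason}. A minimal, self-contained illustration; the metric name, label, buckets, and do_work/0 below are hypothetical, not taken from this commit:

-module(observe_duration_sketch).
-export([run/0]).

run() ->
	%% Declare the histogram once, e.g. at application start.
	prometheus_histogram:new([
		{name, example_duration_milliseconds},
		{help, "Example duration histogram"},
		{labels, [size]},
		{buckets, [10, 100, 1000]}]),
	%% observe_duration/3 times the fun and returns its result to the caller.
	prometheus_histogram:observe_duration(
		example_duration_milliseconds, ["32"], fun() -> do_work() end).

do_work() ->
	%% Stand-in for the timed operation.
	timer:sleep(5),
	{ok, done}.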

apps/arweave/src/ar_data_sync.erl

Lines changed: 8 additions & 2 deletions
@@ -3006,9 +3006,15 @@ process_store_chunk_queue(State, StartLen) ->
 			orelse Now - Timestamp > ?STORE_CHUNK_QUEUE_FLUSH_TIME_THRESHOLD of
 		true ->
 			{{_Offset, _Timestamp, _Ref, ChunkArgs, Args}, Q2} = gb_sets:take_smallest(Q),
+			{_Packing, _Chunk, _AbsoluteOffset, _TXRoot, ChunkSize} = ChunkArgs,
+
+			StartTime = erlang:monotonic_time(),
+
+			store_chunk2(ChunkArgs, Args, State),
+
+			ar_metrics:record_rate_metric(
+				StartTime, ChunkSize, chunk_store_rate, [StoreID]),
 
-			prometheus_histogram:observe_duration(chunk_store_duration_milliseconds, [],
-				fun() -> store_chunk2(ChunkArgs, Args, State) end),
 			decrement_chunk_cache_size(),
 			State2 = State#sync_data_state{ store_chunk_queue = Q2,
 				store_chunk_queue_len = Len - 1,

apps/arweave/src/ar_device_lock.erl

Lines changed: 13 additions & 6 deletions
@@ -168,7 +168,7 @@ do_acquire_lock(Mode, StoreID, State) ->
 	Device = maps:get(StoreID, State#state.store_id_to_device),
 	DeviceLock = maps:get(Device, State#state.device_locks, sync),
 	PrepareLocks = count_prepare_locks(State),
-	MaxPrepareLocks = 128,
+	MaxPrepareLocks = 4,
 	{Acquired, NewDeviceLock} = case Mode of
 		sync ->
 			%% Can only aquire a sync lock if the device is in sync mode
@@ -254,19 +254,26 @@ count_prepare_locks(State) ->
 log_device_locks(State) ->
 	StoreIDToDevice = State#state.store_id_to_device,
 	DeviceLocks = State#state.device_locks,
-	maps:fold(
-		fun(StoreID, Device, _) ->
+	SortedStoreIDList = lists:sort(
+		fun({StoreID1, Device1}, {StoreID2, Device2}) ->
+			case Device1 =:= Device2 of
+				true -> StoreID1 =< StoreID2;
+				false -> Device1 < Device2
+			end
+		end,
+		maps:to_list(StoreIDToDevice)),
+	lists:foreach(
+		fun({StoreID, Device}) ->
 			DeviceLock = maps:get(Device, DeviceLocks, sync),
 			Status = case DeviceLock of
 				sync -> sync;
 				{prepare, StoreID} -> prepare;
 				{repack, StoreID} -> repack;
 				_ -> paused
 			end,
-			?LOG_INFO([{event, device_lock_status}, {store_id, StoreID}, {status, Status}])
+			?LOG_INFO([{event, device_lock_status}, {device, Device}, {store_id, StoreID}, {status, Status}])
 		end,
-		ok,
-		StoreIDToDevice
+		SortedStoreIDList
 	).
 
 %%%===================================================================
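
For illustration, the comparator introduced above orders the {StoreID, Device} pairs by device first and by store ID within a device, so the device_lock_status log lines come out grouped per device. A shell example with made-up store IDs and device names:

1> Compare = fun({StoreID1, Device1}, {StoreID2, Device2}) ->
       case Device1 =:= Device2 of
           true -> StoreID1 =< StoreID2;
           false -> Device1 < Device2
       end
   end.
2> lists:sort(Compare, [{"storage_module_2_addr", "/dev/sdb"},
                        {"storage_module_1_addr", "/dev/sda"},
                        {"storage_module_0_addr", "/dev/sda"}]).
[{"storage_module_0_addr","/dev/sda"},
 {"storage_module_1_addr","/dev/sda"},
 {"storage_module_2_addr","/dev/sdb"}]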

apps/arweave/src/ar_entropy_storage.erl

Lines changed: 72 additions & 45 deletions
@@ -90,19 +90,60 @@ delete_record(PaddedEndOffset, StoreID) ->
 
 generate_missing_entropy(PaddedEndOffset, RewardAddr) ->
 	Entropies = generate_entropies(RewardAddr, PaddedEndOffset, 0),
-	EntropyIndex = ar_replica_2_9:get_slice_index(PaddedEndOffset),
-	take_combined_entropy_by_index(Entropies, EntropyIndex).
+	case Entropies of
+		{error, Reason} ->
+			{error, Reason};
+		_ ->
+			EntropyIndex = ar_replica_2_9:get_slice_index(PaddedEndOffset),
+			take_combined_entropy_by_index(Entropies, EntropyIndex)
+	end.
 
 %% @doc Returns all the entropies needed to encipher the chunk at PaddedEndOffset.
-%% ar_packing_server:get_replica_2_9_entropy/3 will query a cached entropy, or generate it
-%% if it is not cached.
 generate_entropies(_RewardAddr, _PaddedEndOffset, SubChunkStart)
 		when SubChunkStart == ?DATA_CHUNK_SIZE ->
 	[];
 generate_entropies(RewardAddr, PaddedEndOffset, SubChunkStart) ->
 	SubChunkSize = ?COMPOSITE_PACKING_SUB_CHUNK_SIZE,
-	[ar_packing_server:get_replica_2_9_entropy(RewardAddr, PaddedEndOffset, SubChunkStart)
-		| generate_entropies(RewardAddr, PaddedEndOffset, SubChunkStart + SubChunkSize)].
+	EntropyTasks = lists:map(
+		fun(Offset) ->
+			Ref = make_ref(),
+			ar_packing_server:request_entropy_generation(
+				Ref, self(), {RewardAddr, PaddedEndOffset, Offset}),
+			Ref
+		end,
+		lists:seq(SubChunkStart, ?DATA_CHUNK_SIZE - SubChunkSize, SubChunkSize)
+	),
+	Entropies = collect_entropies(EntropyTasks, []),
+	case Entropies of
+		{error, _Reason} ->
+			flush_entropy_messages();
+		_ ->
+			ok
+	end,
+	Entropies.
+
+collect_entropies([], Acc) ->
+	lists:reverse(Acc);
+collect_entropies([Ref | Rest], Acc) ->
+	receive
+		{entropy_generated, Ref, {error, Reason}} ->
+			?LOG_ERROR([{event, failed_to_generate_replica_2_9_entropy}, {error, Reason}]),
+			{error, Reason};
+		{entropy_generated, Ref, Entropy} ->
+			collect_entropies(Rest, [Entropy | Acc])
+	after 60000 ->
+		?LOG_ERROR([{event, entropy_generation_timeout}, {ref, Ref}]),
+		{error, timeout}
+	end.
+
+flush_entropy_messages() ->
+	?LOG_INFO([{event, flush_entropy_messages}]),
+	receive
+		{entropy_generated, _, _} ->
+			flush_entropy_messages()
+	after 0 ->
+		ok
+	end.
 
 generate_entropy_keys(_RewardAddr, _Offset, SubChunkStart)
 		when SubChunkStart == ?DATA_CHUNK_SIZE ->
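
The collect_entropies/2 loop added above waits for one {entropy_generated, Ref, Entropy} (or {entropy_generated, Ref, {error, Reason}}) message per request issued via ar_packing_server:request_entropy_generation/3. A minimal sketch of a worker loop that satisfies that reply contract; the request message shape and the GenerateFun argument are assumptions for illustration, since the real handler lives in ar_packing_server and is not part of this commit:

-module(entropy_worker_sketch).
-export([loop/1]).

%% Hypothetical worker: turns each request into exactly one reply message of
%% the shape collect_entropies/2 expects, even when generation fails.
loop(GenerateFun) ->
	receive
		{generate_entropy, Ref, ReplyTo, Args} ->
			Reply =
				try
					GenerateFun(Args)
				catch _Class:Reason ->
					{error, Reason}
				end,
			ReplyTo ! {entropy_generated, Ref, Reply},
			loop(GenerateFun)
	end.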
@@ -179,20 +220,10 @@ store_entropy(Entropies,
 					StoreID2,
 					RewardAddr),
 
-				EndTime = erlang:monotonic_time(),
-				ElapsedTime =
-					erlang:convert_time_unit(EndTime - StartTime,
-						native,
-						microsecond),
-				%% bytes per second
-				WriteRate =
-					case ElapsedTime > 0 of
-						true -> 1000000 * byte_size(ChunkEntropy) div ElapsedTime;
-						false -> 0
-					end,
-				prometheus_gauge:set(replica_2_9_entropy_store_rate,
-					[StoreID2],
-					WriteRate),
+				ar_metrics:record_rate_metric(StartTime,
+					byte_size(ChunkEntropy),
+					replica_2_9_entropy_write_rate,
+					[StoreID2]),
 				From ! {store_entropy_sub_chunk_written, WaitNAcc + 1}
 			end),
 		WaitNAcc + 1
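
The deleted block above is essentially what the new ar_metrics:record_rate_metric/4 call factors out: derive a bytes-per-second figure from the monotonic start time and the byte count, then record it under the given metric name and labels. A sketch of that assumed behaviour, reconstructed from the removed inline code; the real helper lives in ar_metrics.erl, which is not part of this diff, and may record the value differently:

%% Assumed semantics of ar_metrics:record_rate_metric/4 (sketch only).
record_rate_metric(StartTime, SizeBytes, Metric, Labels) ->
	EndTime = erlang:monotonic_time(),
	ElapsedMicroseconds = erlang:convert_time_unit(
		EndTime - StartTime, native, microsecond),
	%% Bytes per second; guard against a zero elapsed time.
	Rate =
		case ElapsedMicroseconds > 0 of
			true -> 1000000 * SizeBytes div ElapsedMicroseconds;
			false -> 0
		end,
	prometheus_gauge:set(Metric, Labels, Rate).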
@@ -240,34 +271,33 @@ record_chunk(PaddedEndOffset, Chunk, RewardAddr, StoreID, FileIndex, IsPrepared)
 		true ->
 			ar_chunk_storage:get(StartOffset, StartOffset, StoreID)
 	end,
-	case ReadEntropy of
+	RecordChunk = case ReadEntropy of
 		{error, _} = Error2 ->
-			release_semaphore(Filepath),
 			Error2;
 		not_found ->
-			release_semaphore(Filepath),
 			{error, not_prepared_yet2};
 		missing_entropy ->
 			Packing = {replica_2_9, RewardAddr},
 			Entropy = generate_missing_entropy(PaddedEndOffset, RewardAddr),
-			PackedChunk = ar_packing_server:encipher_replica_2_9_chunk(Chunk, Entropy),
-			Result = ar_chunk_storage:record_chunk(
-				PaddedEndOffset, PackedChunk, Packing, StoreID, FileIndex),
-			release_semaphore(Filepath),
-			Result;
+			case Entropy of
+				{error, Reason} ->
+					{error, Reason};
+				_ ->
+					PackedChunk = ar_packing_server:encipher_replica_2_9_chunk(Chunk, Entropy),
+					ar_chunk_storage:record_chunk(
+						PaddedEndOffset, PackedChunk, Packing, StoreID, FileIndex)
+			end;
 		no_entropy_yet ->
-			Result = ar_chunk_storage:record_chunk(
-				PaddedEndOffset, Chunk, unpacked_padded, StoreID, FileIndex),
-			release_semaphore(Filepath),
-			Result;
+			ar_chunk_storage:record_chunk(
+				PaddedEndOffset, Chunk, unpacked_padded, StoreID, FileIndex);
 		{_EndOffset, Entropy} ->
 			Packing = {replica_2_9, RewardAddr},
 			PackedChunk = ar_packing_server:encipher_replica_2_9_chunk(Chunk, Entropy),
-			Result = ar_chunk_storage:record_chunk(
-				PaddedEndOffset, PackedChunk, Packing, StoreID, FileIndex),
-			release_semaphore(Filepath),
-			Result
-	end.
+			ar_chunk_storage:record_chunk(
+				PaddedEndOffset, PackedChunk, Packing, StoreID, FileIndex)
+	end,
+	release_semaphore(Filepath),
+	RecordChunk.
 
 %% @doc Return the byte (>= ChunkStartOffset, < ChunkEndOffset)
 %% that necessarily belongs to the chunk stored
@@ -316,6 +346,7 @@ record_entropy(ChunkEntropy, BucketEndOffset, StoreID, RewardAddr) ->
 				{error, _} = Error ->
 					Error;
 				{_, UnpackedChunk} ->
+					ar_sync_record:delete(EndOffset, EndOffset - ?DATA_CHUNK_SIZE, ar_data_sync, StoreID),
 					ar_packing_server:encipher_replica_2_9_chunk(UnpackedChunk, ChunkEntropy)
 			end;
 		false ->
@@ -328,12 +359,6 @@ record_entropy(ChunkEntropy, BucketEndOffset, StoreID, RewardAddr) ->
 		{error, _} = Error2 ->
 			Error2;
 		_ ->
-			case IsUnpackedChunkRecorded of
-				true ->
-					ar_sync_record:delete(EndOffset, EndOffset - ?DATA_CHUNK_SIZE, ar_data_sync, StoreID);
-				false ->
-					ok
-			end,
 			case ar_chunk_storage:write_chunk(EndOffset, Chunk, #{}, StoreID) of
 				{ok, Filepath} ->
 					ets:insert(chunk_storage_file_index,
@@ -347,8 +372,9 @@ record_entropy(ChunkEntropy, BucketEndOffset, StoreID, RewardAddr) ->
 
 	case Result of
 		{error, Reason} ->
-			?LOG_ERROR([{event, failed_to_store_replica_2_9_sub_chunk_entropy},
+			?LOG_ERROR([{event, failed_to_store_replica_2_9_chunk_entropy},
 				{filepath, Filepath},
+				{byte, Byte},
 				{padded_end_offset, EndOffset},
 				{bucket_end_offset, BucketEndOffset},
 				{store_id, StoreID},
@@ -423,7 +449,8 @@ shift_entropy_offset(Offset, SectorCount) ->
 acquire_semaphore(Filepath) ->
 	case ets:insert_new(ar_entropy_storage, {{semaphore, Filepath}}) of
 		false ->
-			?LOG_DEBUG([{event, details_store_chunk}, {section, waiting_on_semaphore}, {filepath, Filepath}]),
+			?LOG_DEBUG([
+				{event, details_store_chunk}, {section, waiting_on_semaphore}, {filepath, Filepath}]),
 			timer:sleep(20),
 			acquire_semaphore(Filepath);
 		true ->
