Commit cec9db1

Lev Berman authored and JamesPiechota committed
Improve invalidate bad data record interface
1 parent b46e305 · commit cec9db1

4 files changed: +42 -38 lines changed

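In short, callers of ar_data_sync:invalidate_bad_data_record/4 now identify a bad chunk by its absolute end offset and its chunk size instead of a start byte plus end offset, and the start of the invalidated range is derived inside ar_data_sync. A minimal before/after sketch of a typical call site (illustrative only; Offset, ChunkSize, StoreID and Type stand in for whatever the caller has at hand, and the real call sites are in the diffs below):

    %% Before: start byte plus absolute end offset.
    ar_data_sync:invalidate_bad_data_record(Offset - ChunkSize, Offset, StoreID, Type),

    %% After: absolute end offset plus chunk size; the start offset is computed
    %% internally as AbsoluteEndOffset - ChunkSize.
    ar_data_sync:invalidate_bad_data_record(Offset, ChunkSize, StoreID, Type),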

apps/arweave/src/ar_chunk_copy.erl

Lines changed: 22 additions & 9 deletions
@@ -239,9 +239,9 @@ test_ready_for_work() ->
 	State = #state{
 		workers = #{
 			"store1" => #worker_tasks{
-				task_queue = queue:from_list(lists:seq(1, 100))},
+				task_queue = queue:from_list(lists:seq(1, ?MAX_QUEUED_TASKS - 1))},
 			"store2" => #worker_tasks{
-				task_queue = queue:from_list(lists:seq(1, 1001))}
+				task_queue = queue:from_list(lists:seq(1, ?MAX_QUEUED_TASKS))}
 		}
 	},
 	?assertEqual(true, do_ready_for_work("store1", State)),

@@ -250,16 +250,29 @@ test_enqueue_read_range() ->
 test_enqueue_read_range() ->
 	ExpectedWorker = #worker_tasks{
 		task_queue = queue:from_list(
-			[{floor(2.5 * ?DATA_CHUNK_SIZE), floor(12.5 * ?DATA_CHUNK_SIZE),
-				"store1", "store2"},
-			{floor(12.5 * ?DATA_CHUNK_SIZE), floor(22.5 * ?DATA_CHUNK_SIZE),
-				"store1", "store2"},
-			{floor(22.5 * ?DATA_CHUNK_SIZE), floor(30 * ?DATA_CHUNK_SIZE),
-				"store1", "store2"}]
+			[{
+				floor(2.5 * ?DATA_CHUNK_SIZE),
+				floor((2.5 + ?READ_RANGE_CHUNKS) * ?DATA_CHUNK_SIZE),
+				"store1", "store2"
+			},
+			{
+				floor((2.5 + ?READ_RANGE_CHUNKS) * ?DATA_CHUNK_SIZE),
+				floor((2.5 + 2 * ?READ_RANGE_CHUNKS) * ?DATA_CHUNK_SIZE),
+				"store1", "store2"
+			},
+			{
+				floor((2.5 + 2 * ?READ_RANGE_CHUNKS) * ?DATA_CHUNK_SIZE),
+				floor((2.5 + 3 * ?READ_RANGE_CHUNKS) * ?DATA_CHUNK_SIZE),
+				"store1", "store2"
+			}]
 		)
 	},
 	Worker = do_enqueue_read_range(
-		{floor(2.5 * ?DATA_CHUNK_SIZE), 30 * ?DATA_CHUNK_SIZE, "store1", "store2"},
+		{
+			floor(2.5 * ?DATA_CHUNK_SIZE),
+			floor((2.5 + 3 * ?READ_RANGE_CHUNKS) * ?DATA_CHUNK_SIZE),
+			"store1", "store2"
+		},
 		#worker_tasks{task_queue = queue:new()}
 	),
 	?assertEqual(
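Judging from the rewritten expectations, do_enqueue_read_range cuts an incoming range into consecutive tasks of ?READ_RANGE_CHUNKS chunks each, so the test now derives the boundaries from the macros instead of hard-coding offsets. A sketch of the arithmetic behind the expected queue (assuming the ?READ_RANGE_CHUNKS and ?DATA_CHUNK_SIZE macros used by ar_chunk_copy; the variable names are illustrative):

    %% Boundaries of the three expected tasks for a range that starts at 2.5 chunks
    %% and spans 3 * ?READ_RANGE_CHUNKS chunks.
    Start = floor(2.5 * ?DATA_CHUNK_SIZE),
    End1  = floor((2.5 + ?READ_RANGE_CHUNKS) * ?DATA_CHUNK_SIZE),
    End2  = floor((2.5 + 2 * ?READ_RANGE_CHUNKS) * ?DATA_CHUNK_SIZE),
    End3  = floor((2.5 + 3 * ?READ_RANGE_CHUNKS) * ?DATA_CHUNK_SIZE),
    %% Expected tasks: {Start, End1, ...}, {End1, End2, ...}, {End2, End3, ...}.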

apps/arweave/src/ar_data_sync.erl

Lines changed: 18 additions & 27 deletions
@@ -83,9 +83,9 @@ join(RecentBI) ->
 add_tip_block(BlockTXPairs, RecentBI) ->
 	gen_server:cast(ar_data_sync_default, {add_tip_block, BlockTXPairs, RecentBI}).
 
-invalidate_bad_data_record(Byte, AbsoluteEndOffset, StoreID, Case) ->
+invalidate_bad_data_record(AbsoluteEndOffset, ChunkSize, StoreID, Case) ->
 	gen_server:cast(name(StoreID), {invalidate_bad_data_record,
-			{Byte, AbsoluteEndOffset, StoreID, Case}}).
+			{AbsoluteEndOffset, ChunkSize, StoreID, Case}}).
 
 %% @doc The condition which is true if the chunk is too small compared to the proof.
 %% Small chunks make syncing slower and increase space amplification. A small chunk

@@ -1706,8 +1706,8 @@ get_chunk(Offset, SeekOffset, Pack, Packing, StoredPacking, StoreID, RequestOrig
 				{store_id, StoreID},
 				{expected_chunk_id, ar_util:encode(ChunkID)},
 				{chunk_id, ar_util:encode(ComputedChunkID)}]),
-			invalidate_bad_data_record({AbsoluteOffset - ChunkSize,
-				AbsoluteOffset, StoreID, get_chunk_invalid_id}),
+			invalidate_bad_data_record({AbsoluteOffset, ChunkSize,
+				StoreID, get_chunk_invalid_id}),
 			{error, chunk_not_found}
 		end
 	end

@@ -1792,7 +1792,7 @@ read_chunk_with_metadata(
 				{modules_covering_seek_offset, ModuleIDs},
 				{chunk_data_key, ar_util:encode(ChunkDataKey)},
 				{read_fun, ReadFun}]),
-			invalidate_bad_data_record({SeekOffset - 1, AbsoluteOffset, StoreID,
+			invalidate_bad_data_record({AbsoluteOffset, ChunkSize, StoreID,
 				failed_to_read_chunk_data_path}),
 			{error, chunk_not_found};
 		{error, Error} ->

@@ -1827,33 +1827,25 @@ read_chunk_with_metadata(
 	end
 end.
 
-invalidate_bad_data_record({Byte, AbsoluteEndOffset, StoreID, Type}) ->
-	case AbsoluteEndOffset - Byte =< ?DATA_CHUNK_SIZE of
+invalidate_bad_data_record({AbsoluteEndOffset, ChunkSize, StoreID, Type}) ->
+	[{_, T}] = ets:lookup(ar_data_sync_state, disk_pool_threshold),
+	case AbsoluteEndOffset > T of
 		true ->
 			[{_, T}] = ets:lookup(ar_data_sync_state, disk_pool_threshold),
 			case AbsoluteEndOffset > T of
 				true ->
 					%% Do not invalidate fresh records - a reorg may be in progress.
 					ok;
 				false ->
-					invalidate_bad_data_record2({Byte, AbsoluteEndOffset, StoreID, Type})
+					invalidate_bad_data_record2({AbsoluteEndOffset, ChunkSize, StoreID, Type})
 			end;
 		false ->
-			?LOG_WARNING([{event, bad_offset_while_invalidating_data_record}, {type, Type},
-				{range_start, Byte}, {range_end, AbsoluteEndOffset}, {store_id, StoreID}]),
-			ok
+			invalidate_bad_data_record2({AbsoluteEndOffset, ChunkSize, StoreID, Type})
 	end.
 
-invalidate_bad_data_record2({Byte, AbsoluteEndOffset, StoreID, Type}) ->
+invalidate_bad_data_record2({AbsoluteEndOffset, ChunkSize, StoreID, Type}) ->
 	PaddedEndOffset = ar_block:get_chunk_padded_offset(AbsoluteEndOffset),
-	MaybePaddedStartOffset = ar_block:get_chunk_padded_offset(Byte),
-	StartOffset =
-		case MaybePaddedStartOffset == PaddedEndOffset of
-			true ->
-				PaddedEndOffset - ?DATA_CHUNK_SIZE;
-			false ->
-				MaybePaddedStartOffset
-		end,
+	StartOffset = AbsoluteEndOffset - ChunkSize,
 	?LOG_WARNING([{event, invalidating_bad_data_record}, {type, Type},
 		{range_start, StartOffset}, {range_end, PaddedEndOffset},
 		{store_id, StoreID}]),

@@ -1876,24 +1868,24 @@ invalidate_bad_data_record2({Byte, AbsoluteEndOffset, StoreID, Type}) ->
 
 remove_invalid_sync_records(PaddedEndOffset, StartOffset, StoreID) ->
 	Remove1 = ar_sync_record:delete(PaddedEndOffset, StartOffset, ar_data_sync, StoreID),
-	IsSmallChunk = PaddedEndOffset - StartOffset < ?DATA_CHUNK_SIZE,
+	IsSmallChunkBeforeThreshold = PaddedEndOffset - StartOffset < ?DATA_CHUNK_SIZE,
 	Remove2 =
-		case {Remove1, IsSmallChunk} of
+		case {Remove1, IsSmallChunkBeforeThreshold} of
 			{ok, false} ->
 				ar_sync_record:delete(PaddedEndOffset, StartOffset,
 					ar_chunk_storage, StoreID);
 			_ ->
 				Remove1
 		end,
 	Remove3 =
-		case {Remove2, IsSmallChunk} of
+		case {Remove2, IsSmallChunkBeforeThreshold} of
 			{ok, false} ->
 				ar_sync_record:delete(PaddedEndOffset, StartOffset,
 					ar_chunk_storage_replica_2_9_1_entropy, StoreID);
 			_ ->
 				Remove2
 		end,
-	case {Remove3, IsSmallChunk} of
+	case {Remove3, IsSmallChunkBeforeThreshold} of
 		{ok, false} ->
 			ar_sync_record:delete(PaddedEndOffset, StartOffset,
 				ar_chunk_storage_replica_2_9_1_unpacked, StoreID);

@@ -1922,16 +1914,15 @@ validate_fetched_chunk(Args) ->
 		false ->
 			log_chunk_error(RequestOrigin, failed_to_validate_chunk_proofs,
 				[{absolute_end_offset, Offset}, {store_id, StoreID}]),
-			StartOffset = Offset - ChunkSize,
-			invalidate_bad_data_record({StartOffset, Offset, StoreID,
+			invalidate_bad_data_record({Offset, ChunkSize, StoreID,
 				failed_to_validate_chunk_proofs}),
 			false
 	end;
 {_BlockStart, _BlockEnd, TXRoot2} ->
 	log_chunk_error(stored_chunk_invalid_tx_root,
 		[{end_offset, Offset}, {tx_root, ar_util:encode(TXRoot2)},
 		{stored_tx_root, ar_util:encode(TXRoot)}, {store_id, StoreID}]),
-	invalidate_bad_data_record({Offset - ChunkSize, Offset, StoreID,
+	invalidate_bad_data_record({Offset, ChunkSize, StoreID,
 		stored_chunk_invalid_tx_root}),
 	false
 end
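With the chunk size passed in, invalidate_bad_data_record2 no longer has to reconstruct the range start from a caller-supplied byte, and the bad_offset_while_invalidating_data_record warning branch disappears. A condensed before/after of the start-offset computation, taken from the hunk above and simplified for illustration:

    %% Before: pad the caller-supplied Byte and clamp it to one chunk below the end.
    MaybePaddedStartOffset = ar_block:get_chunk_padded_offset(Byte),
    StartOffset =
        case MaybePaddedStartOffset == PaddedEndOffset of
            true -> PaddedEndOffset - ?DATA_CHUNK_SIZE;
            false -> MaybePaddedStartOffset
        end,

    %% After: the reported chunk size determines the range start directly.
    StartOffset = AbsoluteEndOffset - ChunkSize,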

apps/arweave/src/ar_data_sync_worker.erl

Lines changed: 1 addition & 1 deletion
@@ -176,7 +176,7 @@ read_range2(MessagesRemaining, {Start, End, OriginStoreID, TargetStoreID}) ->
 	case ReadChunk of
 		not_found ->
 			ar_data_sync:invalidate_bad_data_record(
-				Start, AbsoluteOffset, OriginStoreID, read_range_chunk_not_found),
+				AbsoluteOffset, ChunkSize, OriginStoreID, read_range_chunk_not_found),
 			read_range2(MessagesRemaining-1,
 				{Start + ChunkSize, End, OriginStoreID, TargetStoreID});
 		{error, Error} ->

apps/arweave/src/ar_verify_chunks.erl

Lines changed: 1 addition & 1 deletion
@@ -165,7 +165,7 @@ invalidate_chunk(Type, Offset, ChunkSize, State) ->
 
 invalidate_chunk(Type, Offset, ChunkSize, Logs, State) ->
 	#state{ store_id = StoreID } = State,
-	ar_data_sync:invalidate_bad_data_record(Offset - ChunkSize, Offset, StoreID, Type),
+	ar_data_sync:invalidate_bad_data_record(Offset, ChunkSize, StoreID, Type),
 	log_error(Type, Offset, ChunkSize, Logs, State).
 
 log_error(Type, Offset, ChunkSize, Logs, State) ->
