Improve invalidate bad data record interface
Lev Berman authored and JamesPiechota committed Jan 24, 2025
1 parent e43f861 commit 42890af
Showing 4 changed files with 42 additions and 38 deletions.
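In short, this commit changes the invalidate_bad_data_record interface so callers identify a bad chunk by its absolute end offset and chunk size instead of a start byte and end offset. A rough before/after sketch of a call site, reconstructed from the ar_verify_chunks.erl hunk further below (variable names as they appear there; not verbatim source):

    %% Before: the caller computed the range start itself.
    ar_data_sync:invalidate_bad_data_record(Offset - ChunkSize, Offset, StoreID, Type),

    %% After: the caller passes the end offset and the chunk size;
    %% ar_data_sync derives the range start as Offset - ChunkSize internally.
    ar_data_sync:invalidate_bad_data_record(Offset, ChunkSize, StoreID, Type),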
31 changes: 22 additions & 9 deletions apps/arweave/src/ar_chunk_copy.erl
@@ -239,9 +239,9 @@ test_ready_for_work() ->
State = #state{
workers = #{
"store1" => #worker_tasks{
task_queue = queue:from_list(lists:seq(1, 100))},
task_queue = queue:from_list(lists:seq(1, ?MAX_QUEUED_TASKS - 1))},
"store2" => #worker_tasks{
task_queue = queue:from_list(lists:seq(1, 1001))}
task_queue = queue:from_list(lists:seq(1, ?MAX_QUEUED_TASKS))}
}
},
?assertEqual(true, do_ready_for_work("store1", State)),
@@ -250,16 +250,29 @@ test_ready_for_work() ->
test_enqueue_read_range() ->
ExpectedWorker = #worker_tasks{
task_queue = queue:from_list(
[{floor(2.5 * ?DATA_CHUNK_SIZE), floor(12.5 * ?DATA_CHUNK_SIZE),
"store1", "store2"},
{floor(12.5 * ?DATA_CHUNK_SIZE), floor(22.5 * ?DATA_CHUNK_SIZE),
"store1", "store2"},
{floor(22.5 * ?DATA_CHUNK_SIZE), floor(30 * ?DATA_CHUNK_SIZE),
"store1", "store2"}]
[{
floor(2.5 * ?DATA_CHUNK_SIZE),
floor((2.5 + ?READ_RANGE_CHUNKS) * ?DATA_CHUNK_SIZE),
"store1", "store2"
},
{
floor((2.5 + ?READ_RANGE_CHUNKS) * ?DATA_CHUNK_SIZE),
floor((2.5 + 2 * ?READ_RANGE_CHUNKS) * ?DATA_CHUNK_SIZE),
"store1", "store2"
},
{
floor((2.5 + 2 * ?READ_RANGE_CHUNKS) * ?DATA_CHUNK_SIZE),
floor((2.5 + 3 * ?READ_RANGE_CHUNKS) * ?DATA_CHUNK_SIZE),
"store1", "store2"
}]
)
},
Worker = do_enqueue_read_range(
{floor(2.5 * ?DATA_CHUNK_SIZE), 30 * ?DATA_CHUNK_SIZE, "store1", "store2"},
{
floor(2.5 * ?DATA_CHUNK_SIZE),
floor((2.5 + 3 * ?READ_RANGE_CHUNKS) * ?DATA_CHUNK_SIZE),
"store1", "store2"
},
#worker_tasks{task_queue = queue:new()}
),
?assertEqual(
45 changes: 18 additions & 27 deletions apps/arweave/src/ar_data_sync.erl
@@ -83,9 +83,9 @@ join(RecentBI) ->
add_tip_block(BlockTXPairs, RecentBI) ->
gen_server:cast(ar_data_sync_default, {add_tip_block, BlockTXPairs, RecentBI}).

invalidate_bad_data_record(Byte, AbsoluteEndOffset, StoreID, Case) ->
invalidate_bad_data_record(AbsoluteEndOffset, ChunkSize, StoreID, Case) ->
gen_server:cast(name(StoreID), {invalidate_bad_data_record,
{Byte, AbsoluteEndOffset, StoreID, Case}}).
{AbsoluteEndOffset, ChunkSize, StoreID, Case}}).

%% @doc The condition which is true if the chunk is too small compared to the proof.
%% Small chunks make syncing slower and increase space amplification. A small chunk
@@ -1706,8 +1706,8 @@ get_chunk(Offset, SeekOffset, Pack, Packing, StoredPacking, StoreID, RequestOrigin
{store_id, StoreID},
{expected_chunk_id, ar_util:encode(ChunkID)},
{chunk_id, ar_util:encode(ComputedChunkID)}]),
invalidate_bad_data_record({AbsoluteOffset - ChunkSize,
AbsoluteOffset, StoreID, get_chunk_invalid_id}),
invalidate_bad_data_record({AbsoluteOffset, ChunkSize,
StoreID, get_chunk_invalid_id}),
{error, chunk_not_found}
end
end
@@ -1792,7 +1792,7 @@ read_chunk_with_metadata(
{modules_covering_seek_offset, ModuleIDs},
{chunk_data_key, ar_util:encode(ChunkDataKey)},
{read_fun, ReadFun}]),
invalidate_bad_data_record({SeekOffset - 1, AbsoluteOffset, StoreID,
invalidate_bad_data_record({AbsoluteOffset, ChunkSize, StoreID,
failed_to_read_chunk_data_path}),
{error, chunk_not_found};
{error, Error} ->
@@ -1827,33 +1827,25 @@ read_chunk_with_metadata(
end
end.

invalidate_bad_data_record({Byte, AbsoluteEndOffset, StoreID, Type}) ->
case AbsoluteEndOffset - Byte =< ?DATA_CHUNK_SIZE of
invalidate_bad_data_record({AbsoluteEndOffset, ChunkSize, StoreID, Type}) ->
[{_, T}] = ets:lookup(ar_data_sync_state, disk_pool_threshold),
case AbsoluteEndOffset > T of
true ->
[{_, T}] = ets:lookup(ar_data_sync_state, disk_pool_threshold),
case AbsoluteEndOffset > T of
true ->
%% Do not invalidate fresh records - a reorg may be in progress.
ok;
false ->
invalidate_bad_data_record2({Byte, AbsoluteEndOffset, StoreID, Type})
invalidate_bad_data_record2({AbsoluteEndOffset, ChunkSize, StoreID, Type})
end;
false ->
?LOG_WARNING([{event, bad_offset_while_invalidating_data_record}, {type, Type},
{range_start, Byte}, {range_end, AbsoluteEndOffset}, {store_id, StoreID}]),
ok
invalidate_bad_data_record2({AbsoluteEndOffset, ChunkSize, StoreID, Type})
end.

invalidate_bad_data_record2({Byte, AbsoluteEndOffset, StoreID, Type}) ->
invalidate_bad_data_record2({AbsoluteEndOffset, ChunkSize, StoreID, Type}) ->
PaddedEndOffset = ar_block:get_chunk_padded_offset(AbsoluteEndOffset),
MaybePaddedStartOffset = ar_block:get_chunk_padded_offset(Byte),
StartOffset =
case MaybePaddedStartOffset == PaddedEndOffset of
true ->
PaddedEndOffset - ?DATA_CHUNK_SIZE;
false ->
MaybePaddedStartOffset
end,
StartOffset = AbsoluteEndOffset - ChunkSize,
?LOG_WARNING([{event, invalidating_bad_data_record}, {type, Type},
{range_start, StartOffset}, {range_end, PaddedEndOffset},
{store_id, StoreID}]),
@@ -1876,24 +1868,24 @@ invalidate_bad_data_record2({Byte, AbsoluteEndOffset, StoreID, Type}) ->

remove_invalid_sync_records(PaddedEndOffset, StartOffset, StoreID) ->
Remove1 = ar_sync_record:delete(PaddedEndOffset, StartOffset, ar_data_sync, StoreID),
IsSmallChunk = PaddedEndOffset - StartOffset < ?DATA_CHUNK_SIZE,
IsSmallChunkBeforeThreshold = PaddedEndOffset - StartOffset < ?DATA_CHUNK_SIZE,
Remove2 =
case {Remove1, IsSmallChunk} of
case {Remove1, IsSmallChunkBeforeThreshold} of
{ok, false} ->
ar_sync_record:delete(PaddedEndOffset, StartOffset,
ar_chunk_storage, StoreID);
_ ->
Remove1
end,
Remove3 =
case {Remove2, IsSmallChunk} of
case {Remove2, IsSmallChunkBeforeThreshold} of
{ok, false} ->
ar_sync_record:delete(PaddedEndOffset, StartOffset,
ar_chunk_storage_replica_2_9_1_entropy, StoreID);
_ ->
Remove2
end,
case {Remove3, IsSmallChunk} of
case {Remove3, IsSmallChunkBeforeThreshold} of
{ok, false} ->
ar_sync_record:delete(PaddedEndOffset, StartOffset,
ar_chunk_storage_replica_2_9_1_unpacked, StoreID);
@@ -1922,16 +1914,15 @@ validate_fetched_chunk(Args) ->
false ->
log_chunk_error(RequestOrigin, failed_to_validate_chunk_proofs,
[{absolute_end_offset, Offset}, {store_id, StoreID}]),
StartOffset = Offset - ChunkSize,
invalidate_bad_data_record({StartOffset, Offset, StoreID,
invalidate_bad_data_record({Offset, ChunkSize, StoreID,
failed_to_validate_chunk_proofs}),
false
end;
{_BlockStart, _BlockEnd, TXRoot2} ->
log_chunk_error(stored_chunk_invalid_tx_root,
[{end_offset, Offset}, {tx_root, ar_util:encode(TXRoot2)},
{stored_tx_root, ar_util:encode(TXRoot)}, {store_id, StoreID}]),
invalidate_bad_data_record({Offset - ChunkSize, Offset, StoreID,
invalidate_bad_data_record({Offset, ChunkSize, StoreID,
stored_chunk_invalid_tx_root}),
false
end
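Read together, the ar_data_sync.erl hunks above suggest the new shape of the invalidation path: the handler checks the disk pool threshold up front and skips fresh records, and the start of the invalidated range is derived from the chunk size rather than from a padded caller-supplied start byte. A rough, partial reconstruction (indentation and the elided tail of invalidate_bad_data_record2 are approximations, not verbatim source):

    invalidate_bad_data_record({AbsoluteEndOffset, ChunkSize, StoreID, Type}) ->
        [{_, T}] = ets:lookup(ar_data_sync_state, disk_pool_threshold),
        case AbsoluteEndOffset > T of
            true ->
                %% Do not invalidate fresh records - a reorg may be in progress.
                ok;
            false ->
                invalidate_bad_data_record2({AbsoluteEndOffset, ChunkSize, StoreID, Type})
        end.

    invalidate_bad_data_record2({AbsoluteEndOffset, ChunkSize, StoreID, Type}) ->
        PaddedEndOffset = ar_block:get_chunk_padded_offset(AbsoluteEndOffset),
        %% The range start no longer depends on a caller-supplied Byte offset.
        StartOffset = AbsoluteEndOffset - ChunkSize,
        %% In the real code the logging is followed by the sync-record removal
        %% shown in the later hunks; that tail is elided in this sketch.
        ?LOG_WARNING([{event, invalidating_bad_data_record}, {type, Type},
            {range_start, StartOffset}, {range_end, PaddedEndOffset},
            {store_id, StoreID}]).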
2 changes: 1 addition & 1 deletion apps/arweave/src/ar_data_sync_worker.erl
@@ -176,7 +176,7 @@ read_range2(MessagesRemaining, {Start, End, OriginStoreID, TargetStoreID}) ->
case ReadChunk of
not_found ->
ar_data_sync:invalidate_bad_data_record(
Start, AbsoluteOffset, OriginStoreID, read_range_chunk_not_found),
AbsoluteOffset, ChunkSize, OriginStoreID, read_range_chunk_not_found),
read_range2(MessagesRemaining-1,
{Start + ChunkSize, End, OriginStoreID, TargetStoreID});
{error, Error} ->
2 changes: 1 addition & 1 deletion apps/arweave/src/ar_verify_chunks.erl
@@ -165,7 +165,7 @@ invalidate_chunk(Type, Offset, ChunkSize, State) ->

invalidate_chunk(Type, Offset, ChunkSize, Logs, State) ->
#state{ store_id = StoreID } = State,
ar_data_sync:invalidate_bad_data_record(Offset - ChunkSize, Offset, StoreID, Type),
ar_data_sync:invalidate_bad_data_record(Offset, ChunkSize, StoreID, Type),
log_error(Type, Offset, ChunkSize, Logs, State).

log_error(Type, Offset, ChunkSize, Logs, State) ->
