From 42890afcd9ab756ddb4398ccda47578f146d5751 Mon Sep 17 00:00:00 2001
From: Lev Berman
Date: Fri, 24 Jan 2025 14:59:42 +0100
Subject: [PATCH] Improve invalidate bad data record interface

---
 apps/arweave/src/ar_chunk_copy.erl       | 31 +++++++++++-----
 apps/arweave/src/ar_data_sync.erl        | 45 ++++++++++--------
 apps/arweave/src/ar_data_sync_worker.erl |  2 +-
 apps/arweave/src/ar_verify_chunks.erl    |  2 +-
 4 files changed, 42 insertions(+), 38 deletions(-)

diff --git a/apps/arweave/src/ar_chunk_copy.erl b/apps/arweave/src/ar_chunk_copy.erl
index 0931b109c..8bca70be5 100644
--- a/apps/arweave/src/ar_chunk_copy.erl
+++ b/apps/arweave/src/ar_chunk_copy.erl
@@ -239,9 +239,9 @@ test_ready_for_work() ->
 	State = #state{
 		workers = #{
 			"store1" => #worker_tasks{
-				task_queue = queue:from_list(lists:seq(1, 100))},
+				task_queue = queue:from_list(lists:seq(1, ?MAX_QUEUED_TASKS - 1))},
 			"store2" => #worker_tasks{
-				task_queue = queue:from_list(lists:seq(1, 1001))}
+				task_queue = queue:from_list(lists:seq(1, ?MAX_QUEUED_TASKS))}
 		}
 	},
 	?assertEqual(true, do_ready_for_work("store1", State)),
@@ -250,16 +250,29 @@
 test_enqueue_read_range() ->
 	ExpectedWorker = #worker_tasks{
 		task_queue = queue:from_list(
-			[{floor(2.5 * ?DATA_CHUNK_SIZE), floor(12.5 * ?DATA_CHUNK_SIZE),
-					"store1", "store2"},
-				{floor(12.5 * ?DATA_CHUNK_SIZE), floor(22.5 * ?DATA_CHUNK_SIZE),
-					"store1", "store2"},
-				{floor(22.5 * ?DATA_CHUNK_SIZE), floor(30 * ?DATA_CHUNK_SIZE),
-					"store1", "store2"}]
+			[{
+				floor(2.5 * ?DATA_CHUNK_SIZE),
+				floor((2.5 + ?READ_RANGE_CHUNKS) * ?DATA_CHUNK_SIZE),
+				"store1", "store2"
+			},
+			{
+				floor((2.5 + ?READ_RANGE_CHUNKS) * ?DATA_CHUNK_SIZE),
+				floor((2.5 + 2 * ?READ_RANGE_CHUNKS) * ?DATA_CHUNK_SIZE),
+				"store1", "store2"
+			},
+			{
+				floor((2.5 + 2 * ?READ_RANGE_CHUNKS) * ?DATA_CHUNK_SIZE),
+				floor((2.5 + 3 * ?READ_RANGE_CHUNKS) * ?DATA_CHUNK_SIZE),
+				"store1", "store2"
+			}]
 		)
 	},
 	Worker = do_enqueue_read_range(
-		{floor(2.5 * ?DATA_CHUNK_SIZE), 30 * ?DATA_CHUNK_SIZE, "store1", "store2"},
+		{
+			floor(2.5 * ?DATA_CHUNK_SIZE),
+			floor((2.5 + 3 * ?READ_RANGE_CHUNKS) * ?DATA_CHUNK_SIZE),
+			"store1", "store2"
+		},
 		#worker_tasks{task_queue = queue:new()}
 	),
 	?assertEqual(
diff --git a/apps/arweave/src/ar_data_sync.erl b/apps/arweave/src/ar_data_sync.erl
index 92254cd2f..d701cb7e3 100644
--- a/apps/arweave/src/ar_data_sync.erl
+++ b/apps/arweave/src/ar_data_sync.erl
@@ -83,9 +83,9 @@ join(RecentBI) ->
 add_tip_block(BlockTXPairs, RecentBI) ->
 	gen_server:cast(ar_data_sync_default, {add_tip_block, BlockTXPairs, RecentBI}).
 
-invalidate_bad_data_record(Byte, AbsoluteEndOffset, StoreID, Case) ->
+invalidate_bad_data_record(AbsoluteEndOffset, ChunkSize, StoreID, Case) ->
 	gen_server:cast(name(StoreID), {invalidate_bad_data_record,
-			{Byte, AbsoluteEndOffset, StoreID, Case}}).
+			{AbsoluteEndOffset, ChunkSize, StoreID, Case}}).
 
 %% @doc The condition which is true if the chunk is too small compared to the proof.
 %% Small chunks make syncing slower and increase space amplification. A small chunk
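Note on the hunk above: the public interface now identifies a bad chunk by its absolute end offset and size rather than by a (start byte, end offset) pair, and the start of the invalidated range is derived internally as AbsoluteEndOffset - ChunkSize. A minimal sketch of the new call shape; the offsets and the "default" store ID are illustrative, only the argument order comes from this patch:

    %% Sketch only: values are made up; the argument order is the patch's.
    ChunkSize = 262144,                  %% ?DATA_CHUNK_SIZE for a full chunk
    AbsoluteEndOffset = 3 * ChunkSize,   %% absolute end offset of the bad chunk
    ar_data_sync:invalidate_bad_data_record(AbsoluteEndOffset, ChunkSize,
    		"default", get_chunk_invalid_id).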
@@ -1706,8 +1706,8 @@ get_chunk(Offset, SeekOffset, Pack, Packing, StoredPacking, StoreID, RequestOrigin
 						{store_id, StoreID},
 						{expected_chunk_id, ar_util:encode(ChunkID)},
 						{chunk_id, ar_util:encode(ComputedChunkID)}]),
-					invalidate_bad_data_record({AbsoluteOffset - ChunkSize,
-							AbsoluteOffset, StoreID, get_chunk_invalid_id}),
+					invalidate_bad_data_record({AbsoluteOffset, ChunkSize,
+							StoreID, get_chunk_invalid_id}),
 					{error, chunk_not_found}
 			end
 	end
@@ -1792,7 +1792,7 @@ read_chunk_with_metadata(
 					{modules_covering_seek_offset, ModuleIDs},
 					{chunk_data_key, ar_util:encode(ChunkDataKey)},
 					{read_fun, ReadFun}]),
-			invalidate_bad_data_record({SeekOffset - 1, AbsoluteOffset, StoreID,
+			invalidate_bad_data_record({AbsoluteOffset, ChunkSize, StoreID,
 					failed_to_read_chunk_data_path}),
 			{error, chunk_not_found};
 		{error, Error} ->
@@ -1827,33 +1827,25 @@ read_chunk_with_metadata(
 		end
 	end.
 
-invalidate_bad_data_record({Byte, AbsoluteEndOffset, StoreID, Type}) ->
-	case AbsoluteEndOffset - Byte =< ?DATA_CHUNK_SIZE of
+invalidate_bad_data_record({AbsoluteEndOffset, ChunkSize, StoreID, Type}) ->
+	[{_, T}] = ets:lookup(ar_data_sync_state, disk_pool_threshold),
+	case AbsoluteEndOffset > T of
 		true ->
 			[{_, T}] = ets:lookup(ar_data_sync_state, disk_pool_threshold),
 			case AbsoluteEndOffset > T of
 				true ->
 					%% Do not invalidate fresh records - a reorg may be in progress.
 					ok;
 				false ->
-					invalidate_bad_data_record2({Byte, AbsoluteEndOffset, StoreID, Type})
+					invalidate_bad_data_record2({AbsoluteEndOffset, ChunkSize, StoreID, Type})
 			end;
 		false ->
-			?LOG_WARNING([{event, bad_offset_while_invalidating_data_record}, {type, Type},
-					{range_start, Byte}, {range_end, AbsoluteEndOffset}, {store_id, StoreID}]),
-			ok
+			invalidate_bad_data_record2({AbsoluteEndOffset, ChunkSize, StoreID, Type})
 	end.
 
-invalidate_bad_data_record2({Byte, AbsoluteEndOffset, StoreID, Type}) ->
+invalidate_bad_data_record2({AbsoluteEndOffset, ChunkSize, StoreID, Type}) ->
 	PaddedEndOffset = ar_block:get_chunk_padded_offset(AbsoluteEndOffset),
-	MaybePaddedStartOffset = ar_block:get_chunk_padded_offset(Byte),
-	StartOffset =
-		case MaybePaddedStartOffset == PaddedEndOffset of
-			true ->
-				PaddedEndOffset - ?DATA_CHUNK_SIZE;
-			false ->
-				MaybePaddedStartOffset
-		end,
+	StartOffset = AbsoluteEndOffset - ChunkSize,
 	?LOG_WARNING([{event, invalidating_bad_data_record}, {type, Type},
 			{range_start, StartOffset}, {range_end, PaddedEndOffset},
 			{store_id, StoreID}]),
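Note on the hunk above: the guard now keys solely off the disk pool threshold. Offsets above it are treated as fresh (a reorg may still move them) and left alone; everything else is invalidated over the range derived from the chunk's end offset and size. The inner re-check that survives as context is redundant (T re-binds to the same value) but harmless. A condensed, self-contained sketch of the resulting decision; maybe_invalidate/2 is a hypothetical helper, not a function in the patch:

    %% Condensed sketch of the post-patch guard; maybe_invalidate/2 is
    %% hypothetical. The ets table and key are the ones the patch reads.
    maybe_invalidate(AbsoluteEndOffset, ChunkSize) ->
    	[{_, Threshold}] = ets:lookup(ar_data_sync_state, disk_pool_threshold),
    	case AbsoluteEndOffset > Threshold of
    		true ->
    			%% Fresh record - a reorg may be in progress; keep it.
    			keep;
    		false ->
    			%% Invalidate the range derived from end offset and size.
    			{invalidate, {AbsoluteEndOffset - ChunkSize, AbsoluteEndOffset}}
    	end.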
@@ -1876,24 +1868,24 @@ invalidate_bad_data_record2({AbsoluteEndOffset, ChunkSize, StoreID, Type}) ->
 
 remove_invalid_sync_records(PaddedEndOffset, StartOffset, StoreID) ->
 	Remove1 = ar_sync_record:delete(PaddedEndOffset, StartOffset, ar_data_sync, StoreID),
-	IsSmallChunk = PaddedEndOffset - StartOffset < ?DATA_CHUNK_SIZE,
+	IsSmallChunkBeforeThreshold = PaddedEndOffset - StartOffset < ?DATA_CHUNK_SIZE,
 	Remove2 =
-		case {Remove1, IsSmallChunk} of
+		case {Remove1, IsSmallChunkBeforeThreshold} of
 			{ok, false} ->
 				ar_sync_record:delete(PaddedEndOffset, StartOffset,
 						ar_chunk_storage, StoreID);
 			_ ->
 				Remove1
 		end,
 	Remove3 =
-		case {Remove2, IsSmallChunk} of
+		case {Remove2, IsSmallChunkBeforeThreshold} of
 			{ok, false} ->
 				ar_sync_record:delete(PaddedEndOffset, StartOffset,
 						ar_chunk_storage_replica_2_9_1_entropy, StoreID);
 			_ ->
 				Remove2
 		end,
-	case {Remove3, IsSmallChunk} of
+	case {Remove3, IsSmallChunkBeforeThreshold} of
 		{ok, false} ->
 			ar_sync_record:delete(PaddedEndOffset, StartOffset,
 					ar_chunk_storage_replica_2_9_1_unpacked, StoreID);
@@ -1922,8 +1914,7 @@ validate_fetched_chunk(Args) ->
 		false ->
 			log_chunk_error(RequestOrigin, failed_to_validate_chunk_proofs,
 					[{absolute_end_offset, Offset}, {store_id, StoreID}]),
-			StartOffset = Offset - ChunkSize,
-			invalidate_bad_data_record({StartOffset, Offset, StoreID,
+			invalidate_bad_data_record({Offset, ChunkSize, StoreID,
 					failed_to_validate_chunk_proofs}),
 			false
 	end;
@@ -1931,7 +1922,7 @@
 			log_chunk_error(stored_chunk_invalid_tx_root, [{end_offset, Offset},
 					{tx_root, ar_util:encode(TXRoot2)},
 					{stored_tx_root, ar_util:encode(TXRoot)}, {store_id, StoreID}]),
-			invalidate_bad_data_record({Offset - ChunkSize, Offset, StoreID,
+			invalidate_bad_data_record({Offset, ChunkSize, StoreID,
 					stored_chunk_invalid_tx_root}),
 			false
 	end
diff --git a/apps/arweave/src/ar_data_sync_worker.erl b/apps/arweave/src/ar_data_sync_worker.erl
index 2e55e4a12..280fb26ba 100644
--- a/apps/arweave/src/ar_data_sync_worker.erl
+++ b/apps/arweave/src/ar_data_sync_worker.erl
@@ -176,7 +176,7 @@ read_range2(MessagesRemaining, {Start, End, OriginStoreID, TargetStoreID}) ->
 	case ReadChunk of
 		not_found ->
 			ar_data_sync:invalidate_bad_data_record(
-				Start, AbsoluteOffset, OriginStoreID, read_range_chunk_not_found),
+				AbsoluteOffset, ChunkSize, OriginStoreID, read_range_chunk_not_found),
 			read_range2(MessagesRemaining-1,
 					{Start + ChunkSize, End, OriginStoreID, TargetStoreID});
 		{error, Error} ->
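Note on the worker hunk above: a chunk that is not found is reported by its end offset and size, after which the scan simply resumes at Start + ChunkSize. A toy model of that skip-and-continue loop; ReadFun and InvalidateFun are stand-ins for the real ar_data_sync calls, and full, aligned chunks are assumed:

    %% Toy model of read_range2/2's skip-and-continue behaviour; not the
    %% module's real internals.
    -module(read_range_sketch).
    -export([scan/4]).

    -define(CHUNK, 262144). %% stands in for ?DATA_CHUNK_SIZE

    scan(Start, End, _ReadFun, _InvalidateFun) when Start >= End ->
    	done;
    scan(Start, End, ReadFun, InvalidateFun) ->
    	AbsoluteOffset = Start + ?CHUNK, %% end offset of the chunk at Start
    	case ReadFun(Start) of
    		not_found ->
    			%% Report the bad record by (end offset, size), then move on.
    			InvalidateFun(AbsoluteOffset, ?CHUNK),
    			scan(Start + ?CHUNK, End, ReadFun, InvalidateFun);
    		{ok, _Chunk} ->
    			scan(Start + ?CHUNK, End, ReadFun, InvalidateFun)
    	end.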
diff --git a/apps/arweave/src/ar_verify_chunks.erl b/apps/arweave/src/ar_verify_chunks.erl
index 107c8e437..58c3dfd51 100644
--- a/apps/arweave/src/ar_verify_chunks.erl
+++ b/apps/arweave/src/ar_verify_chunks.erl
@@ -165,7 +165,7 @@ invalidate_chunk(Type, Offset, ChunkSize, State) ->
 
 invalidate_chunk(Type, Offset, ChunkSize, Logs, State) ->
 	#state{ store_id = StoreID } = State,
-	ar_data_sync:invalidate_bad_data_record(Offset - ChunkSize, Offset, StoreID, Type),
+	ar_data_sync:invalidate_bad_data_record(Offset, ChunkSize, StoreID, Type),
 	log_error(Type, Offset, ChunkSize, Logs, State).
 
 log_error(Type, Offset, ChunkSize, Logs, State) ->
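Note: with this last call site converted, every caller passes (AbsoluteEndOffset, ChunkSize, StoreID, Type) and the invalidated range is computed in one place. A small sketch of that derivation; padded_offset/1 is a simplified stand-in for ar_block:get_chunk_padded_offset/1, whose real rules (including the strict data split threshold) live in ar_block:

    %% padded_offset/1 is a simplified stand-in: it rounds the end offset up
    %% to the 256 KiB chunk grid, ignoring ar_block's threshold handling.
    padded_offset(Offset) ->
    	ChunkSize = 262144,
    	((Offset + ChunkSize - 1) div ChunkSize) * ChunkSize.

    %% The range removed by invalidate_bad_data_record2/1 after this patch:
    %% the start is now derived directly from the chunk's size.
    invalid_range(AbsoluteEndOffset, ChunkSize) ->
    	{AbsoluteEndOffset - ChunkSize, padded_offset(AbsoluteEndOffset)}.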