Skip to content

Commit

Permalink
fix: fix some iteration errors in repack_in_place
Browse files Browse the repository at this point in the history
  • Loading branch information
JamesPiechota committed Jan 26, 2025
1 parent e88c467 commit 6b9cc4a
Show file tree
Hide file tree
Showing 10 changed files with 219 additions and 133 deletions.
47 changes: 45 additions & 2 deletions apps/arweave/e2e/ar_e2e.erl
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@

-export([delayed_print/2, packing_type_to_packing/2,
start_source_node/3, source_node_storage_modules/3, max_chunk_offset/1,
assert_block/2, assert_syncs_range/3, assert_does_not_sync_range/3,
assert_chunks/3, assert_no_chunks/2, assert_partition_size/3, assert_empty_partition/3]).
assert_block/2, assert_syncs_range/3, assert_does_not_sync_range/3, assert_has_entropy/4,
assert_chunks/3, assert_no_chunks/2, assert_partition_size/3, assert_empty_partition/3,
assert_mine_and_validate/3]).

-include_lib("arweave/include/ar.hrl").
-include_lib("arweave/include/ar_config.hrl").
Expand Down Expand Up @@ -220,6 +221,31 @@ assert_block({replica_2_9, Address}, MinedBlock) ->
?assertEqual(Address, MinedBlock#block.reward_addr),
?assertEqual(?REPLICA_2_9_PACKING_DIFFICULTY, MinedBlock#block.packing_difficulty).

assert_has_entropy(Node, StartOffset, EndOffset, StoreID) ->
RangeSize = EndOffset - StartOffset,
HasEntropy = ar_util:do_until(
fun() ->
Intersection = ar_test_node:remote_call(
Node, ar_sync_record, get_intersection_size,
[EndOffset, StartOffset, ar_chunk_storage_replica_2_9_1_entropy, StoreID]),
?LOG_INFO("Intersection: ~p, RangeSize: ~p, StoreID: ~p", [Intersection, RangeSize, StoreID]),
Intersection >= RangeSize
end,
100,
60_000
),
case HasEntropy of
true ->
ok;
false ->
Intersection = ar_test_node:remote_call(
Node, ar_sync_record, get_intersection_size,
[EndOffset, StartOffset, ar_chunk_storage_replica_2_9_1_entropy, StoreID]),
?assert(false,
iolist_to_binary(io_lib:format(
"~s failed to prepare entropy range ~p - ~p. Intersection: ~p",
[Node, StartOffset, EndOffset, Intersection])))
end.

assert_syncs_range(Node, StartOffset, EndOffset) ->
HasRange = ar_util:do_until(
Expand Down Expand Up @@ -285,6 +311,23 @@ assert_empty_partition(Node, PartitionNumber, Packing) ->
"~s partition ~p,~p os not empty", [Node, PartitionNumber,
ar_serialize:encode_packing(Packing, true)]))).

assert_mine_and_validate(MinerNode, ValidatorNode, MinerPacking) ->
CurrentHeight = max(
ar_test_node:remote_call(ValidatorNode, ar_node, get_height, []),
ar_test_node:remote_call(MinerNode, ar_node, get_height, [])
),
ar_test_node:wait_until_height(ValidatorNode, CurrentHeight),
ar_test_node:wait_until_height(MinerNode, CurrentHeight),
ar_test_node:mine(MinerNode),

MinerBI = ar_test_node:wait_until_height(MinerNode, CurrentHeight + 1),
{ok, MinerBlock} = ar_test_node:http_get_block(element(1, hd(MinerBI)), MinerNode),
ar_e2e:assert_block(MinerPacking, MinerBlock),

ValidatorBI = ar_test_node:wait_until_height(ValidatorNode, MinerBlock#block.height),
{ok, ValidatorBlock} = ar_test_node:http_get_block(element(1, hd(ValidatorBI)), ValidatorNode),
?assertEqual(MinerBlock, ValidatorBlock).

has_range(Node, StartOffset, EndOffset) ->
NodeIP = ar_test_node:peer_ip(Node),
case ar_http_iface_client:get_sync_record(NodeIP) of
Expand Down
22 changes: 4 additions & 18 deletions apps/arweave/e2e/ar_repack_in_place_mine_tests.erl
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
repack_in_place_mine_test_() ->
Timeout = ?REPACK_IN_PLACE_MINE_TEST_TIMEOUT,
[
% {timeout, Timeout, {with, {unpacked, replica_2_9}, [fun test_repack_in_place_mine/1]}},
{timeout, Timeout, {with, {spora_2_6, replica_2_9}, [fun test_repack_in_place_mine/1]}}
% {timeout, Timeout, {with, {composite_1, replica_2_9}, [fun test_repack_in_place_mine/1]}}
{timeout, Timeout, {with, {unpacked, replica_2_9}, [fun test_repack_in_place_mine/1]}},
{timeout, Timeout, {with, {spora_2_6, replica_2_9}, [fun test_repack_in_place_mine/1]}},
{timeout, Timeout, {with, {composite_1, replica_2_9}, [fun test_repack_in_place_mine/1]}}
].

%% --------------------------------------------------------------------------------------------
Expand Down Expand Up @@ -83,21 +83,7 @@ test_repack_in_place_mine({FromPackingType, ToPackingType}) ->
unpacked ->
ok;
_ ->
CurrentHeight = max(
ar_test_node:remote_call(ValidatorNode, ar_node, get_height, []),
ar_test_node:remote_call(RepackerNode, ar_node, get_height, [])
),
ar_test_node:wait_until_height(ValidatorNode, CurrentHeight),
ar_test_node:wait_until_height(RepackerNode, CurrentHeight),
ar_test_node:mine(RepackerNode),

RepackerBI = ar_test_node:wait_until_height(RepackerNode, CurrentHeight + 1),
{ok, RepackerBlock} = ar_test_node:http_get_block(element(1, hd(RepackerBI)), RepackerNode),
ar_e2e:assert_block(ToPacking, RepackerBlock),

ValidatorBI = ar_test_node:wait_until_height(ValidatorNode, RepackerBlock#block.height),
{ok, ValidatorBlock} = ar_test_node:http_get_block(element(1, hd(ValidatorBI)), ValidatorNode),
?assertEqual(RepackerBlock, ValidatorBlock)
ar_e2e:assert_mine_and_validate(RepackerNode, ValidatorNode, ToPacking)
end.


Expand Down
16 changes: 1 addition & 15 deletions apps/arweave/e2e/ar_repack_mine_tests.erl
Original file line number Diff line number Diff line change
Expand Up @@ -75,21 +75,7 @@ test_repack_mine({FromPackingType, ToPackingType}) ->
unpacked ->
ok;
_ ->
CurrentHeight = max(
ar_test_node:remote_call(ValidatorNode, ar_node, get_height, []),
ar_test_node:remote_call(RepackerNode, ar_node, get_height, [])
),
ar_test_node:wait_until_height(ValidatorNode, CurrentHeight),
ar_test_node:wait_until_height(RepackerNode, CurrentHeight),
ar_test_node:mine(RepackerNode),

RepackerBI = ar_test_node:wait_until_height(RepackerNode, CurrentHeight + 1),
{ok, RepackerBlock} = ar_test_node:http_get_block(element(1, hd(RepackerBI)), RepackerNode),
ar_e2e:assert_block(ToPacking, RepackerBlock),

ValidatorBI = ar_test_node:wait_until_height(ValidatorNode, RepackerBlock#block.height),
{ok, ValidatorBlock} = ar_test_node:http_get_block(element(1, hd(ValidatorBI)), ValidatorNode),
?assertEqual(RepackerBlock, ValidatorBlock)
ar_e2e:assert_mine_and_validate(RepackerNode, ValidatorNode, ToPacking)
end.

test_repacking_blocked({FromPackingType, ToPackingType}) ->
Expand Down
153 changes: 127 additions & 26 deletions apps/arweave/e2e/ar_sync_pack_mine_tests.erl
Original file line number Diff line number Diff line change
Expand Up @@ -67,15 +67,34 @@ unpacked_sync_pack_mine_test_() ->
]
end}.

unpacked_and_packed_sync_pack_mine_test_() ->
unpacked_edge_case_test_() ->
{setup, fun () -> setup_source_node(unpacked) end,
fun (GenesisData) ->
[
instantiator(GenesisData, replica_2_9,
fun test_unpacked_and_packed_sync_pack_mine/1)
fun test_unpacked_and_packed_sync_pack_mine/1),
instantiator(GenesisData, replica_2_9,
fun test_entropy_first_sync_pack_mine/1),
instantiator(GenesisData, replica_2_9,
fun test_entropy_last_sync_pack_mine/1)
]
end}.

spora_2_6_edge_case_test_() ->
{setup, fun () -> setup_source_node(spora_2_6) end,
fun (GenesisData) ->
[
instantiator(GenesisData, replica_2_9,
fun test_unpacked_and_packed_sync_pack_mine/1),
instantiator(GenesisData, replica_2_9,
fun test_entropy_first_sync_pack_mine/1),
instantiator(GenesisData, replica_2_9,
fun test_entropy_last_sync_pack_mine/1)

]
end}.


%% --------------------------------------------------------------------------------------------
%% test_sync_pack_mine
%% --------------------------------------------------------------------------------------------
Expand All @@ -90,27 +109,14 @@ test_sync_pack_mine({{Blocks, Chunks, SourcePackingType}, SinkPackingType}) ->
SinkNode,
?PARTITION_SIZE,
2*?PARTITION_SIZE + ar_storage_module:get_overlap(SinkPacking)),
ar_e2e:assert_partition_size(SinkNode, 1, SinkPacking),
ar_e2e:assert_chunks(SinkNode, SinkPacking, Chunks),

case SinkPackingType of
unpacked ->
ok;
_ ->
CurrentHeight = max(
ar_test_node:remote_call(SourceNode, ar_node, get_height, []),
ar_test_node:remote_call(SinkNode, ar_node, get_height, [])
),
ar_test_node:wait_until_height(SourceNode, CurrentHeight),
ar_test_node:wait_until_height(SinkNode, CurrentHeight),
ar_test_node:mine(SinkNode),

SinkBI = ar_test_node:wait_until_height(SinkNode, CurrentHeight + 1),
{ok, SinkBlock} = ar_test_node:http_get_block(element(1, hd(SinkBI)), SinkNode),
ar_e2e:assert_block(SinkPacking, SinkBlock),

SourceBI = ar_test_node:wait_until_height(SourceNode, SinkBlock#block.height),
{ok, SourceBlock} = ar_test_node:http_get_block(element(1, hd(SourceBI)), SourceNode),
?assertEqual(SinkBlock, SourceBlock),
ar_e2e:assert_mine_and_validate(SinkNode, SourceNode, SinkPacking),
ok
end.

Expand All @@ -135,21 +141,116 @@ test_unpacked_and_packed_sync_pack_mine({{Blocks, Chunks, SourcePackingType}, Pa
SinkNode,
?PARTITION_SIZE,
2*?PARTITION_SIZE + ar_storage_module:get_overlap(SinkPacking)),
ar_e2e:assert_chunks(SinkNode, SinkPacking, Chunks),
ar_e2e:assert_partition_size(SinkNode, 1, SinkPacking),
ar_e2e:assert_partition_size(SinkNode, 1, unpacked),

ar_e2e:assert_mine_and_validate(SinkNode, SourceNode, SinkPacking),
ok.


test_entropy_first_sync_pack_mine({{Blocks, Chunks, SourcePackingType}, SinkPackingType}) ->
ar_e2e:delayed_print(<<" ~p -> ~p ">>, [SourcePackingType, SinkPackingType]),
[B0 | _] = Blocks,
SourceNode = peer1,
SinkNode = peer2,

Wallet = ar_test_node:remote_call(SinkNode, ar_e2e, load_wallet_fixture, [wallet_b]),
SinkAddr = ar_wallet:to_address(Wallet),
SinkPacking = ar_e2e:packing_type_to_packing(SinkPackingType, SinkAddr),
{ok, Config} = ar_test_node:get_config(SinkNode),

Module = {?PARTITION_SIZE, 1, SinkPacking},
StoreID = ar_storage_module:id(Module),
StorageModules = [ Module ],

CurrentHeight = ar_test_node:remote_call(SinkNode, ar_node, get_height, []),
ar_test_node:mine(SinkNode),

SinkBI = ar_test_node:wait_until_height(SinkNode, CurrentHeight + 1),
{ok, SinkBlock} = ar_test_node:http_get_block(element(1, hd(SinkBI)), SinkNode),
ar_e2e:assert_block(SinkPacking, SinkBlock),
%% 1. Run node with no sync jobs so that it only prepares entropy
Config2 = Config#config{
peers = [ar_test_node:peer_ip(SourceNode)],
start_from_latest_state = true,
storage_modules = StorageModules,
auto_join = true,
mining_addr = SinkAddr,
sync_jobs = 0
},
?assertEqual(ar_test_node:peer_name(SinkNode),
ar_test_node:start_other_node(SinkNode, B0, Config2, true)
),
ar_e2e:assert_has_entropy(SinkNode, ?PARTITION_SIZE, 2*?PARTITION_SIZE, StoreID),
ar_e2e:assert_empty_partition(SinkNode, 1, unpacked),
ar_e2e:assert_empty_partition(SinkNode, 1, unpacked_padded),

SourceBI = ar_test_node:wait_until_height(SourceNode, SinkBlock#block.height),
{ok, SourceBlock} = ar_test_node:http_get_block(element(1, hd(SourceBI)), SourceNode),
?assertEqual(SinkBlock, SourceBlock),
%% 2. Run node with sync jobs so that it syncs and packs data
ar_test_node:update_config(SinkNode, Config2#config{
sync_jobs = 100
}),
ar_test_node:restart(SinkNode),

ar_e2e:assert_syncs_range(SinkNode,
?PARTITION_SIZE,
2*?PARTITION_SIZE + ar_storage_module:get_overlap(SinkPacking)),
ar_e2e:assert_partition_size(SinkNode, 1, SinkPacking),
ar_e2e:assert_empty_partition(SinkNode, 1, unpacked),
ar_e2e:assert_empty_partition(SinkNode, 1, unpacked_padded),
ar_e2e:assert_chunks(SinkNode, SinkPacking, Chunks),

%% 3. Make sure the data is minable
ar_e2e:assert_mine_and_validate(SinkNode, SourceNode, SinkPacking),
ok.

test_entropy_last_sync_pack_mine({{Blocks, Chunks, SourcePackingType}, SinkPackingType}) ->
ar_e2e:delayed_print(<<" ~p -> ~p ">>, [SourcePackingType, SinkPackingType]),
[B0 | _] = Blocks,
SourceNode = peer1,
SinkNode = peer2,

Wallet = ar_test_node:remote_call(SinkNode, ar_e2e, load_wallet_fixture, [wallet_b]),
SinkAddr = ar_wallet:to_address(Wallet),
SinkPacking = ar_e2e:packing_type_to_packing(SinkPackingType, SinkAddr),
{ok, Config} = ar_test_node:get_config(SinkNode),

Module = {?PARTITION_SIZE, 1, SinkPacking},
StoreID = ar_storage_module:id(Module),
StorageModules = [ Module ],

%% 1. Run node with no replica_2_9 workers so that it only syncs chunks
Config2 = Config#config{
peers = [ar_test_node:peer_ip(SourceNode)],
start_from_latest_state = true,
storage_modules = StorageModules,
auto_join = true,
mining_addr = SinkAddr,
replica_2_9_workers = 0
},
?assertEqual(ar_test_node:peer_name(SinkNode),
ar_test_node:start_other_node(SinkNode, B0, Config2, true)
),

ar_e2e:assert_syncs_range(SinkNode,
?PARTITION_SIZE,
2*?PARTITION_SIZE + ar_storage_module:get_overlap(SinkPacking)),
ar_e2e:assert_partition_size(SinkNode, 1, unpacked_padded),
ar_e2e:assert_empty_partition(SinkNode, 1, unpacked),

%% 2. Run node with sync jobs so that it syncs and packs data
ar_test_node:update_config(SinkNode, Config2#config{
replica_2_9_workers = 8
}),
ar_test_node:restart(SinkNode),

ar_e2e:assert_has_entropy(SinkNode, ?PARTITION_SIZE, 2*?PARTITION_SIZE, StoreID),
ar_e2e:assert_syncs_range(SinkNode,
?PARTITION_SIZE,
2*?PARTITION_SIZE + ar_storage_module:get_overlap(SinkPacking)),
ar_e2e:assert_partition_size(SinkNode, 1, SinkPacking),
ar_e2e:assert_empty_partition(SinkNode, 1, unpacked_padded),
ar_e2e:assert_empty_partition(SinkNode, 1, unpacked),
ar_e2e:assert_chunks(SinkNode, SinkPacking, Chunks),

%% 3. Make sure the data is minable
ar_e2e:assert_mine_and_validate(SinkNode, SourceNode, SinkPacking),
ok.

start_sink_node(Node, SourceNode, B0, PackingType) ->
Wallet = ar_test_node:remote_call(Node, ar_e2e, load_wallet_fixture, [wallet_b]),
Expand Down Expand Up @@ -193,4 +294,4 @@ start_sink_node(Node, SourceNode, B0, PackingType1, PackingType2) ->
mining_addr = SinkAddr
}, true)
),
{SinkPacking1, SinkPacking2}.
{SinkPacking1, SinkPacking2}.
7 changes: 0 additions & 7 deletions apps/arweave/src/ar_chunk_copy.erl
Original file line number Diff line number Diff line change
Expand Up @@ -103,16 +103,13 @@ handle_call(Request, _From, State) ->
{reply, ok, State}.

handle_cast({read_range, Args}, State) ->
?LOG_DEBUG([{event, read_range}, {module, ?MODULE}, {args, Args}]),
{noreply, enqueue_read_range(Args, State)};

handle_cast(process_queues, State) ->
?LOG_DEBUG([{event, process_queues}, {module, ?MODULE}]),
ar_util:cast_after(1000, self(), process_queues),
{noreply, process_queues(State)};

handle_cast({task_completed, {read_range, {Worker, _, Args}}}, State) ->
?LOG_DEBUG([{event, task_completed}, {module, ?MODULE}, {worker, Worker}, {args, Args}]),
{noreply, task_completed(Args, State)};

handle_cast(Cast, State) ->
Expand Down Expand Up @@ -161,7 +158,6 @@ do_enqueue_read_range(Args, Worker) ->
{Start, End, OriginStoreID, TargetStoreID} = Args,
End2 = min(Start + (?READ_RANGE_CHUNKS * ?DATA_CHUNK_SIZE), End),
Args2 = {Start, End2, OriginStoreID, TargetStoreID},
?LOG_DEBUG([{event, enqueue_read_range}, {module, ?MODULE}, {args, Args2}]),
TaskQueue = queue:in(Args2, Worker#worker_tasks.task_queue),
Worker2 = Worker#worker_tasks{task_queue = TaskQueue},
case End2 == End of
Expand Down Expand Up @@ -211,9 +207,6 @@ task_completed(Args, State) ->
{store_id, OriginStoreID}]),
State;
_ ->
?LOG_DEBUG([{event, task_completed}, {module, ?MODULE},
{worker, Worker#worker_tasks.worker},
{active_count, Worker#worker_tasks.active_count}, {args, Args}]),
ActiveCount = Worker#worker_tasks.active_count - 1,
Worker2 = Worker#worker_tasks{active_count = ActiveCount},
Worker3 = process_queue(Worker2),
Expand Down
12 changes: 7 additions & 5 deletions apps/arweave/src/ar_chunk_storage.erl
Original file line number Diff line number Diff line change
Expand Up @@ -289,14 +289,14 @@ get_chunk_storage_path(DataDir, StoreID) ->

%% @doc Return the start and end offset of the bucket containing the given offset.
%% A chunk bucket a 0-based, 256-KiB wide, 256-KiB aligned range that fully contains a chunk.
-spec get_chunk_bucket_start(PaddedEndOffset :: non_neg_integer()) -> non_neg_integer().
get_chunk_bucket_start(PaddedEndOffset) ->
-spec get_chunk_bucket_start(Offset :: non_neg_integer()) -> non_neg_integer().
get_chunk_bucket_start(Offset) ->
PaddedEndOffset = ar_block:get_chunk_padded_offset(Offset),
ar_util:floor_int(max(0, PaddedEndOffset - ?DATA_CHUNK_SIZE), ?DATA_CHUNK_SIZE).

-spec get_chunk_bucket_end(Offset :: non_neg_integer()) -> non_neg_integer().
get_chunk_bucket_end(EndOffset) ->
PaddedEndOffset = ar_block:get_chunk_padded_offset(EndOffset),
get_chunk_bucket_start(PaddedEndOffset) + ?DATA_CHUNK_SIZE.
get_chunk_bucket_end(Offset) ->
get_chunk_bucket_start(Offset) + ?DATA_CHUNK_SIZE.

set_entropy_complete(StoreID) ->
gen_server:cast(name(StoreID), entropy_complete).
Expand Down Expand Up @@ -369,6 +369,8 @@ init({StoreID, RepackInPlacePacking}) ->
gen_server:cast(self(), {repack, Packing}),
?LOG_INFO([{event, starting_repack_in_place},
{tags, [repack_in_place]},
{range_start, RangeStart},
{range_end, RangeEnd},
{cursor, RepackCursor},
{store_id, StoreID},
{target_packing, ar_serialize:encode_packing(Packing, true)}]),
Expand Down
Loading

0 comments on commit 6b9cc4a

Please sign in to comment.