Skip to content

Commit

Permalink
Add check_fully_connected check in cets_join
Browse files Browse the repository at this point in the history
Add join_fails_in_check_fully_connected testcase
  • Loading branch information
arcusfelis committed Aug 1, 2023
1 parent f52bea9 commit 1b3f862
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 4 deletions.
60 changes: 56 additions & 4 deletions src/cets_join.erl
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,11 @@ join2(_Info, LocalPid, RemotePid, JoinOpts) ->
%% We still use LocalPid/RemotePid in names
%% (they are local and remote pids as passed from the cets_join and from the cets_discovery).
#{opts := Opts} = cets:info(LocalPid),
LocalOtherPids = cets:other_pids(LocalPid),
RemoteOtherPids = cets:other_pids(RemotePid),
LocPids = [LocalPid | LocalOtherPids],
RemPids = [RemotePid | RemoteOtherPids],
LocPids = get_pids(LocalPid),
RemPids = get_pids(RemotePid),
run_step(before_check_fully_connected, JoinOpts),
check_fully_connected(LocPids),
check_fully_connected(RemPids),
AllPids = LocPids ++ RemPids,
Paused = [{Pid, cets:pause(Pid)} || Pid <- AllPids],
%% Merges data from two partitions together.
Expand All @@ -82,6 +83,10 @@ join2(_Info, LocalPid, RemotePid, JoinOpts) ->
cets:sync(RemotePid),
{ok, LocalDump} = remote_or_local_dump(LocalPid),
{ok, RemoteDump} = remote_or_local_dump(RemotePid),
%% Check that still fully connected after getting the dumps
%% and before making any changes
check_fully_connected(LocPids),
check_fully_connected(RemPids),
{LocalDump2, RemoteDump2} = maybe_apply_resolver(LocalDump, RemoteDump, Opts),
RemF = fun(Pid) -> send_dump(Pid, LocPids, JoinRef, LocalDump2, JoinOpts) end,
LocF = fun(Pid) -> send_dump(Pid, RemPids, JoinRef, RemoteDump2, JoinOpts) end,
Expand Down Expand Up @@ -146,6 +151,53 @@ apply_resolver_for_sorted(
apply_resolver_for_sorted(LocalDump, RemoteDump, _F, _Pos, LocalAcc, RemoteAcc) ->
{lists:reverse(LocalAcc, LocalDump), lists:reverse(RemoteAcc, RemoteDump)}.

get_pids(Pid) ->
case cets:other_pids(Pid) of
Pids when is_list(Pids) ->
[Pid | Pids];
Other ->
error({get_other_pids_failed, Pid, Other})

Check warning on line 159 in src/cets_join.erl

View check run for this annotation

Codecov / codecov/patch

src/cets_join.erl#L159

Added line #L159 was not covered by tests
end.

%% Checks that other_pids lists match for all nodes
%% If they are not matching - the node removal process could be in progress
check_fully_connected(Pids) ->
Lists = [get_pids(Pid) || Pid <- Pids],
case are_fully_connected_lists([Pids | Lists]) of
true ->
check_same_join_ref(Pids);
false ->
?LOG_ERROR(#{
what => check_fully_connected_failed,
expected_pids => Pids,
server_lists => Lists
}),

Check warning on line 174 in src/cets_join.erl

View check run for this annotation

Codecov / codecov/patch

src/cets_join.erl#L174

Added line #L174 was not covered by tests
error(check_fully_connected_failed)
end.

%% Check that all elements of the list match (if sorted)
are_fully_connected_lists(Lists) ->
length(lists:usort([lists:sort(List) || List <- Lists])) =:= 1.

%% Check if all nodes have the same join_ref
%% If not - we don't want to continue joining
check_same_join_ref(Pids) ->
Refs = [pid_to_join_ref(Pid) || Pid <- Pids],
case lists:usort(Refs) of
[_] ->
ok;
_ ->
?LOG_ERROR(#{

Check warning on line 190 in src/cets_join.erl

View check run for this annotation

Codecov / codecov/patch

src/cets_join.erl#L190

Added line #L190 was not covered by tests
what => check_same_join_ref_failed,
refs => lists:zip(Pids, Refs)
}),
error(check_same_join_ref_failed)

Check warning on line 194 in src/cets_join.erl

View check run for this annotation

Codecov / codecov/patch

src/cets_join.erl#L193-L194

Added lines #L193 - L194 were not covered by tests
end.

pid_to_join_ref(Pid) ->
#{join_ref := JoinRef} = cets:info(Pid),
JoinRef.

run_step(Step, #{step_handler := F}) ->
F(Step);
run_step(_Step, _Opts) ->
Expand Down
24 changes: 24 additions & 0 deletions test/cets_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ all() ->
join_fails_before_send_dump,
join_fails_before_send_dump_and_there_are_pending_remote_ops,
send_dump_fails_during_join_because_receiver_exits,
join_fails_in_check_fully_connected,
test_multinode,
test_multinode_remote_insert,
node_list_is_correct,
Expand Down Expand Up @@ -450,6 +451,29 @@ send_dump_fails_during_join_because_receiver_exits(Config) ->
cets:insert(Pid1, {1}),
{ok, [{1}]} = cets:remote_dump(Pid1).

join_fails_in_check_fully_connected(Config) ->
Me = self(),
{ok, Pid1} = cets:start(make_name(Config, 1), #{}),
{ok, Pid2} = cets:start(make_name(Config, 2), #{}),
{ok, Pid3} = cets:start(make_name(Config, 3), #{}),
%% Pid2 and Pid3 are connected
ok = cets_join:join(lock_name(Config), #{}, Pid2, Pid3, #{}),
[Pid3] = cets:other_pids(Pid2),
F = fun
(before_check_fully_connected) ->
%% Ask Pid2 to remove Pid3 from the list
Pid2 ! {'DOWN', make_ref(), process, Pid3, sim_error},
%% Ensure Pid2 did the cleaning
pong = cets:ping(Pid2),
[] = cets:other_pids(Pid2),
Me ! before_check_fully_connected_called;
(_) ->
ok
end,
{error, {error, check_fully_connected_failed, _}} =
cets_join:join(lock_name(Config), #{}, Pid1, Pid2, #{step_handler => F}),
receive_message(before_check_fully_connected_called).

test_multinode(Config) ->
Node1 = node(),
[Node2, Node3, Node4] = proplists:get_value(nodes, Config),
Expand Down

0 comments on commit 1b3f862

Please sign in to comment.