diff --git a/src/cets_join.erl b/src/cets_join.erl
index 04aa62d4..843a98b6 100644
--- a/src/cets_join.erl
+++ b/src/cets_join.erl
@@ -68,10 +68,11 @@ join2(_Info, LocalPid, RemotePid, JoinOpts) ->
     %% We still use LocalPid/RemotePid in names
     %% (they are local and remote pids as passed from the cets_join and from the cets_discovery).
     #{opts := Opts} = cets:info(LocalPid),
-    LocalOtherPids = cets:other_pids(LocalPid),
-    RemoteOtherPids = cets:other_pids(RemotePid),
-    LocPids = [LocalPid | LocalOtherPids],
-    RemPids = [RemotePid | RemoteOtherPids],
+    LocPids = get_pids(LocalPid),
+    RemPids = get_pids(RemotePid),
+    run_step(before_check_fully_connected, JoinOpts),
+    check_fully_connected(LocPids),
+    check_fully_connected(RemPids),
     AllPids = LocPids ++ RemPids,
     Paused = [{Pid, cets:pause(Pid)} || Pid <- AllPids],
     %% Merges data from two partitions together.
@@ -82,6 +83,10 @@ join2(_Info, LocalPid, RemotePid, JoinOpts) ->
     cets:sync(RemotePid),
     {ok, LocalDump} = remote_or_local_dump(LocalPid),
     {ok, RemoteDump} = remote_or_local_dump(RemotePid),
+    %% Check that still fully connected after getting the dumps
+    %% and before making any changes
+    check_fully_connected(LocPids),
+    check_fully_connected(RemPids),
     {LocalDump2, RemoteDump2} = maybe_apply_resolver(LocalDump, RemoteDump, Opts),
     RemF = fun(Pid) -> send_dump(Pid, LocPids, JoinRef, LocalDump2, JoinOpts) end,
     LocF = fun(Pid) -> send_dump(Pid, RemPids, JoinRef, RemoteDump2, JoinOpts) end,
@@ -146,6 +151,55 @@ apply_resolver_for_sorted(
 apply_resolver_for_sorted(LocalDump, RemoteDump, _F, _Pos, LocalAcc, RemoteAcc) ->
     {lists:reverse(LocalAcc, LocalDump), lists:reverse(RemoteAcc, RemoteDump)}.
 
+%% Returns Pid followed by the list that cets:other_pids(Pid) reports.
+%% Raises {get_other_pids_failed, Pid, Reply} when the reply is not a list.
+get_pids(Pid) ->
+    case cets:other_pids(Pid) of
+        OtherPids when is_list(OtherPids) ->
+            [Pid | OtherPids];
+        Unexpected ->
+            error({get_other_pids_failed, Pid, Unexpected})
+    end.
+
+%% Verifies that every server in Pids reports the same set of pids as Pids itself.
+%% If the lists do not match, a node removal could be in progress, so we raise an error.
+check_fully_connected(Pids) ->
+    ServerLists = [get_pids(Server) || Server <- Pids],
+    case are_fully_connected_lists([Pids | ServerLists]) of
+        false ->
+            ?LOG_ERROR(#{
+                what => check_fully_connected_failed,
+                expected_pids => Pids,
+                server_lists => ServerLists
+            }),
+            error(check_fully_connected_failed);
+        true ->
+            check_same_join_ref(Pids)
+    end.
+
+%% True when all the given lists are equal to each other once each one is sorted
+are_fully_connected_lists(Lists) ->
+    1 =:= length(lists:usort([lists:sort(PidList) || PidList <- Lists])).
+
+%% Ensures that all servers in Pids report one and the same join_ref.
+%% If the refs differ, we do not want to continue joining, so we raise an error.
+check_same_join_ref(Pids) ->
+    JoinRefs = [pid_to_join_ref(Server) || Server <- Pids],
+    case lists:usort(JoinRefs) of
+        [_SingleRef] ->
+            ok;
+        _Mismatch ->
+            ?LOG_ERROR(#{
+                what => check_same_join_ref_failed,
+                refs => lists:zip(Pids, JoinRefs)
+            }),
+            error(check_same_join_ref_failed)
+    end.
+
+pid_to_join_ref(Pid) ->
+    #{join_ref := Ref} = cets:info(Pid),
+    Ref.
+
 run_step(Step, #{step_handler := F}) ->
     F(Step);
 run_step(_Step, _Opts) ->
diff --git a/test/cets_SUITE.erl b/test/cets_SUITE.erl
index 143b37b8..293268e5 100644
--- a/test/cets_SUITE.erl
+++ b/test/cets_SUITE.erl
@@ -31,6 +31,7 @@ all() ->
         join_fails_before_send_dump,
         join_fails_before_send_dump_and_there_are_pending_remote_ops,
         send_dump_fails_during_join_because_receiver_exits,
+        join_fails_in_check_fully_connected,
         test_multinode,
         test_multinode_remote_insert,
         node_list_is_correct,
@@ -450,6 +451,29 @@ send_dump_fails_during_join_because_receiver_exits(Config) ->
     cets:insert(Pid1, {1}),
     {ok, [{1}]} = cets:remote_dump(Pid1).
 
+join_fails_in_check_fully_connected(Config) ->
+    TestPid = self(),
+    {ok, Server1} = cets:start(make_name(Config, 1), #{}),
+    {ok, Server2} = cets:start(make_name(Config, 2), #{}),
+    {ok, Server3} = cets:start(make_name(Config, 3), #{}),
+    %% Join Server2 and Server3 first, so that they are connected
+    ok = cets_join:join(lock_name(Config), #{}, Server2, Server3, #{}),
+    [Server3] = cets:other_pids(Server2),
+    StepHandler = fun
+        (before_check_fully_connected) ->
+            %% Send a fake 'DOWN' to ask Server2 to remove Server3 from its list
+            Server2 ! {'DOWN', make_ref(), process, Server3, sim_error},
+            %% Make sure Server2 has done the cleanup before the join continues
+            pong = cets:ping(Server2),
+            [] = cets:other_pids(Server2),
+            TestPid ! before_check_fully_connected_called;
+        (_) ->
+            ok
+    end,
+    {error, {error, check_fully_connected_failed, _}} =
+        cets_join:join(lock_name(Config), #{}, Server1, Server2, #{step_handler => StepHandler}),
+    receive_message(before_check_fully_connected_called).
+
 test_multinode(Config) ->
     Node1 = node(),
     [Node2, Node3, Node4] = proplists:get_value(nodes, Config),