Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make CETS joining logic more reliable #12

Merged
merged 17 commits into from
Aug 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
144 changes: 100 additions & 44 deletions src/cets.erl
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
delete_objects/2,
dump/1,
remote_dump/1,
send_dump/3,
send_dump/4,
table_name/1,
other_nodes/1,
other_pids/1,
Expand Down Expand Up @@ -93,6 +93,8 @@
| {via, module(), term()}.
-type request_id() :: gen_server:request_id().
-type from() :: gen_server:from().
-type join_ref() :: cets_join:join_ref().
-type ack_pid() :: cets_ack:ack_pid().
-type op() ::
{insert, tuple()}
| {delete, term()}
Expand All @@ -102,13 +104,15 @@
| {delete_objects, [term()]}
| {insert_new, tuple()}
| {leader_op, op()}.
-type remote_op() :: {remote_op, Op :: op(), From :: from(), AckPid :: cets_ack:ack_pid()}.
-type remote_op() ::
{remote_op, Op :: op(), From :: from(), AckPid :: ack_pid(), JoinRef :: join_ref()}.
-type backlog_entry() :: {op(), from()}.
-type table_name() :: atom().
-type pause_monitor() :: reference().
-type state() :: #{
tab := table_name(),
ack_pid := cets_ack:ack_pid(),
ack_pid := ack_pid(),
join_ref := join_ref(),
%% Updated by set_other_servers/2 function only
other_servers := [server_pid()],
leader := server_pid(),
Expand All @@ -129,14 +133,15 @@
| {unpause, reference()}
| get_leader
| {set_leader, boolean()}
| {send_dump, [server_pid()], [tuple()]}.
| {send_dump, [server_pid()], join_ref(), [tuple()]}.

-type info() :: #{
table := table_name(),
nodes := [node()],
size := non_neg_integer(),
memory := non_neg_integer(),
ack_pid := cets_ack:ack_pid(),
ack_pid := ack_pid(),
join_ref := join_ref(),
opts := start_opts()
}.

Expand Down Expand Up @@ -197,10 +202,10 @@ table_name(Tab) when is_atom(Tab) ->
table_name(Server) ->
cets_call:long_call(Server, table_name).

-spec send_dump(server_ref(), [server_pid()], [tuple()]) -> ok.
send_dump(Server, NewPids, OurDump) ->
Info = #{msg => send_dump, count => length(OurDump)},
cets_call:long_call(Server, {send_dump, NewPids, OurDump}, Info).
%% Sends a `send_dump' request to Server through cets_call:long_call/3.
%% The server-side handler (handle_send_dump/4) inserts OurDump into the table,
%% merges NewPids into other_servers and stores JoinRef in the state.
%% The metadata map (message name, join_ref, number of dumped records)
%% is passed to long_call as its third argument.
-spec send_dump(server_ref(), [server_pid()], join_ref(), [tuple()]) -> ok.
send_dump(Server, NewPids, JoinRef, OurDump) ->
    CallInfo = #{
        msg => send_dump,
        join_ref => JoinRef,
        count => length(OurDump)
    },
    Request = {send_dump, NewPids, JoinRef, OurDump},
    cets_call:long_call(Server, Request, CallInfo).

%% Only the node that owns the data could update/remove the data.
%% Ideally, Key should contain inserter node info so cleaning and merging is simplified.
Expand Down Expand Up @@ -332,6 +337,8 @@ init({Tab, Opts}) ->
tab => Tab,
ack_pid => AckPid,
other_servers => [],
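%% Initial join_ref is a fresh unique reference (make_ref/0);
%% handle_send_dump/4 later replaces it with the join_ref it receives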
join_ref => make_ref(),
leader => self(),
is_leader => true,
opts => Opts,
Expand Down Expand Up @@ -367,8 +374,8 @@ handle_call(remote_dump, From, State = #{tab := Tab}) ->
%% Do not block the main process (also reduces GC of the main process)
proc_lib:spawn_link(fun() -> gen_server:reply(From, {ok, dump(Tab)}) end),
{noreply, State};
handle_call({send_dump, NewPids, Dump}, _From, State) ->
handle_send_dump(NewPids, Dump, State);
handle_call({send_dump, NewPids, JoinRef, Dump}, _From, State) ->
handle_send_dump(NewPids, JoinRef, Dump, State);
handle_call(pause, _From = {FromPid, _}, State = #{pause_monitors := Mons}) ->
%% We monitor who pauses our server
Mon = erlang:monitor(process, FromPid),
Expand All @@ -386,11 +393,14 @@ handle_cast(Msg, State) ->
{noreply, State}.

-spec handle_info(term(), state()) -> {noreply, state()}.
handle_info({remote_op, Op, From, AckPid}, State) ->
handle_remote_op(Op, From, AckPid, State),
handle_info({remote_op, Op, From, AckPid, JoinRef}, State) ->
handle_remote_op(Op, From, AckPid, JoinRef, State),
{noreply, State};
handle_info({'DOWN', Mon, process, Pid, _Reason}, State) ->
{noreply, handle_down(Mon, Pid, State)};
handle_info({check_server, FromPid, JoinRef}, State) ->
handle_check_server(FromPid, JoinRef, State),
{noreply, State};
handle_info(Msg, State) ->
?LOG_ERROR(#{what => unexpected_info, msg => Msg}),
{noreply, State}.
Expand All @@ -403,21 +413,22 @@ code_change(_OldVsn, State, _Extra) ->

%% Internal logic

-spec handle_send_dump([server_pid()], [tuple()], state()) -> {reply, ok, state()}.
handle_send_dump(NewPids, Dump, State = #{tab := Tab, other_servers := Servers}) ->
-spec handle_send_dump([server_pid()], join_ref(), [tuple()], state()) -> {reply, ok, state()}.
handle_send_dump(NewPids, JoinRef, Dump, State = #{tab := Tab, other_servers := Servers}) ->
ets:insert(Tab, Dump),
Servers2 = add_servers(NewPids, Servers),
{reply, ok, set_other_servers(Servers2, State)}.
{reply, ok, set_other_servers(Servers2, State#{join_ref := JoinRef})}.

-spec handle_down(reference(), pid(), state()) -> state().
handle_down(Mon, Pid, State = #{pause_monitors := Mons}) ->
case lists:member(Mon, Mons) of
true ->
?LOG_ERROR(#{
Log = #{
what => pause_owner_crashed,
state => State,
paused_by_pid => Pid
}),
},
?LOG_ERROR(Log),
handle_unpause2(Mon, Mons, State);
false ->
handle_down2(Pid, State)
Expand All @@ -433,37 +444,38 @@ handle_down2(RemotePid, State = #{other_servers := Servers, ack_pid := AckPid})
set_other_servers(Servers2, State);
false ->
%% This should not happen
?LOG_ERROR(#{
Log = #{
what => handle_down_failed,
remote_pid => RemotePid,
state => State
}),
},
?LOG_ERROR(Log),
State
end.

%% Merge two lists of pids, create the missing monitors.
-spec add_servers(Servers, Servers) -> Servers when Servers :: [server_pid()].
add_servers(Pids, Servers) ->
lists:sort(add_servers2(self(), Pids, Servers) ++ Servers).

add_servers2(SelfPid, [SelfPid | OtherPids], Servers) ->
?LOG_INFO(#{what => join_to_the_same_pid_ignored}),
add_servers2(SelfPid, OtherPids, Servers);
add_servers2(SelfPid, [RemotePid | OtherPids], Servers) when is_pid(RemotePid) ->
case lists:member(RemotePid, Servers) of
false ->
erlang:monitor(process, RemotePid),
[RemotePid | add_servers2(SelfPid, OtherPids, Servers)];
true ->
?LOG_INFO(#{
%% Ignore ourself in the list
%% Also filter out already added servers
OtherServers = lists:delete(self(), lists:usort(Pids)),
NewServers = ordsets:subtract(OtherServers, Servers),
case ordsets:intersection(OtherServers, Servers) of
[] ->
ok;
Overlap ->
%% Should not happen (cets_join checks for it)
%% Still log it, if that happens
Log = #{
what => already_added,
remote_pid => RemotePid,
remote_node => node(RemotePid)
}),
add_servers2(SelfPid, OtherPids, Servers)
end;
add_servers2(_SelfPid, [], _Servers) ->
[].
already_added_servers => Overlap,
pids => Pids,
servers => Servers
},
?LOG_ERROR(Log)
end,
[erlang:monitor(process, Pid) || Pid <- NewServers],
ordsets:union(NewServers, Servers).

%% Sets other_servers field, chooses the leader
-spec set_other_servers([server_pid()], state()) -> state().
Expand Down Expand Up @@ -495,9 +507,20 @@ ets_delete_objects(Tab, Objects) ->
ok.

%% Handle operation from a remote node
-spec handle_remote_op(op(), from(), cets_ack:ack_pid(), state()) -> ok.
handle_remote_op(Op, From, AckPid, State) ->
%% The operation is applied with do_op/2 only when the join_ref carried by
%% the message is exactly equal to the join_ref in our state.
%% Otherwise the operation is dropped and an error is logged.
%% The ack is sent in both cases.
-spec handle_remote_op(op(), from(), ack_pid(), join_ref(), state()) -> ok.
handle_remote_op(Op, From, AckPid, RemoteJoinRef, State = #{join_ref := LocalJoinRef}) ->
    case RemoteJoinRef =:= LocalJoinRef of
        true ->
            do_op(Op, State);
        false ->
            Log = #{
                what => drop_remote_op,
                from => From,
                remote_join_ref => RemoteJoinRef,
                join_ref => LocalJoinRef,
                op => Op
            },
            ?LOG_ERROR(Log)
    end,
    %% We still need to reply to the remote process so it could stop waiting
    cets_ack:ack(AckPid, From, self()).

%% Apply operation for one local table only
Expand Down Expand Up @@ -549,9 +572,9 @@ handle_leader_op(Op, From, State = #{leader := Leader}) ->
replicate(_Op, From, #{other_servers := []}) ->
%% Skip replication
gen_server:reply(From, ok);
replicate(Op, From, #{ack_pid := AckPid, other_servers := Servers}) ->
replicate(Op, From, #{ack_pid := AckPid, other_servers := Servers, join_ref := JoinRef}) ->
cets_ack:add(AckPid, From),
RemoteOp = {remote_op, Op, From, AckPid},
RemoteOp = {remote_op, Op, From, AckPid, JoinRef},
[send_remote_op(Server, RemoteOp) || Server <- Servers],
%% AckPid would call gen_server:reply(From, ok) once all the remote servers reply
ok.
Expand Down Expand Up @@ -584,17 +607,48 @@ handle_unpause2(Mon, Mons, State) ->
State2 = State#{pause_monitors := Mons2},
case Mons2 of
[] ->
send_check_servers(State2),
apply_backlog(State2);
_ ->
State2
end.

%% Sends a check_server message, carrying our current join_ref,
%% to every pid listed in other_servers.
-spec send_check_servers(state()) -> ok.
send_check_servers(#{join_ref := JoinRef, other_servers := OtherPids}) ->
    lists:foreach(
        fun(RemotePid) -> send_check_server(RemotePid, JoinRef) end,
        OtherPids
    ).

%% The check_server message has to go out before any new remote_op message.
%% This gives the remote node a chance to disconnect from us first
%% (i.e. to remove our pid from its other_servers list and not allow remote ops).
%% The message carries our pid and our join_ref.
-spec send_check_server(pid(), reference()) -> ok.
send_check_server(Pid, JoinRef) ->
    CheckMsg = {check_server, self(), JoinRef},
    erlang:send(Pid, CheckMsg),
    ok.

%% Handles a check_server message received from another server.
%%
%% FromPid       - pid placed into the message by the sender
%% RemoteJoinRef - join_ref the sender had when it sent the message
%% State         - our state; only the join_ref key is read
%%
%% If both join_refs are exactly equal, nothing is done.
%% Otherwise a warning is logged and a synthetic 'DOWN' message is sent to
%% FromPid. It has the shape of a process monitor notification about our pid,
%% so the receiver processes it in its regular 'DOWN' handle_info clause.
%% Our own state is never changed here; the function always returns ok.
%%
%% JoinRef is typed as reference() here, the same as in send_check_server/2.
-spec handle_check_server(pid(), reference(), #{join_ref := reference(), _ => _}) -> ok.
handle_check_server(_FromPid, JoinRef, #{join_ref := JoinRef}) ->
    %% Same join_ref on both sides: nothing to do
    ok;
handle_check_server(FromPid, RemoteJoinRef, #{join_ref := JoinRef}) ->
    Log = #{
        what => cets_check_server_failed,
        text => <<"Disconnect the remote server">>,
        remote_pid => FromPid,
        remote_join_ref => RemoteJoinRef,
        join_ref => JoinRef
    },
    ?LOG_WARNING(Log),
    %% Ask the remote server to disconnect from us.
    %% make_ref/0 returns a fresh reference, so it is not one of the
    %% monitor references the remote server already holds.
    Reason = {check_server_failed, {RemoteJoinRef, JoinRef}},
    FromPid ! {'DOWN', make_ref(), process, self(), Reason},
    ok.

-spec handle_get_info(state()) -> info().
handle_get_info(
#{
tab := Tab,
other_servers := Servers,
ack_pid := AckPid,
join_ref := JoinRef,
opts := Opts
}
) ->
Expand All @@ -604,6 +658,7 @@ handle_get_info(
size => ets:info(Tab, size),
memory => ets:info(Tab, memory),
ack_pid => AckPid,
join_ref => JoinRef,
opts => Opts
}.

Expand All @@ -619,7 +674,8 @@ call_user_handle_down(RemotePid, #{tab := Tab, opts := Opts}) ->
remote_pid => RemotePid,
remote_node => node(RemotePid)
},
cets_long:run_safely(Info, FF);
%% Errors would be logged inside run_tracked
catch cets_long:run_tracked(Info, FF);
_ ->
ok
end.
Expand Down
9 changes: 5 additions & 4 deletions src/cets_call.erl
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@
Pid when is_pid(Pid) ->
Info2 = Info#{server => Server, pid => Pid, node => node(Pid)},
F = fun() -> gen_server:call(Pid, Msg, infinity) end,
cets_long:run_safely(Info2, F);
cets_long:run_tracked(Info2, F);
undefined ->
{error, pid_not_found}
error({pid_not_found, Server})

Check warning on line 34 in src/cets_call.erl

View check run for this annotation

Codecov / codecov/patch

src/cets_call.erl#L34

Added line #L34 was not covered by tests
end.

%% Contacts the local server to broadcast multinode operation.
Expand Down Expand Up @@ -68,13 +68,14 @@
Res = sync_operation(Leader, {leader_op, Op}),
case Res of
{error, {wrong_leader, ExpectedLeader}} ->
?LOG_WARNING(#{
Log = #{
what => wrong_leader,
server => Server,
operation => Op,
called_leader => Leader,
expected_leader => ExpectedLeader
}),
},
?LOG_WARNING(Log),
%% This could happen if a new node joins the cluster.
%% So, a simple retry should help.
case Backoff of
Expand Down
5 changes: 3 additions & 2 deletions src/cets_discovery_file.erl
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,12 @@
get_nodes(State = #{disco_file := Filename}) ->
case file:read_file(Filename) of
{error, Reason} ->
?LOG_ERROR(#{
Log = #{

Check warning on line 26 in src/cets_discovery_file.erl

View check run for this annotation

Codecov / codecov/patch

src/cets_discovery_file.erl#L26

Added line #L26 was not covered by tests
what => discovery_failed,
filename => Filename,
reason => Reason
}),
},
?LOG_ERROR(Log),

Check warning on line 31 in src/cets_discovery_file.erl

View check run for this annotation

Codecov / codecov/patch

src/cets_discovery_file.erl#L31

Added line #L31 was not covered by tests
{{error, Reason}, State};
{ok, Text} ->
Lines = binary:split(Text, [<<"\r">>, <<"\n">>, <<" ">>], [global]),
Expand Down
Loading