Skip to content

Commit

Permalink
Cleanup, let the depcache monitor the writers
Browse files Browse the repository at this point in the history
  • Loading branch information
mmzeeman committed Jan 24, 2025
1 parent 654bc1b commit 240b57a
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 65 deletions.
126 changes: 63 additions & 63 deletions src/depcache.erl
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
%% @author Arjan Scherpenisse
%% @copyright 2009-2020 Marc Worrell, Arjan Scherpenisse
%% @copyright 2009-2025 Marc Worrell, Arjan Scherpenisse
%% @doc Depcache API
%%
%% == depcache API ==
Expand All @@ -14,7 +14,7 @@
%% {@link cleanup/1}, {@link cleanup/5}
%%
%% @end
%% Copyright 2009-2020 Marc Worrell, Arjan Scherpenisse
%% Copyright 2009-2025 Marc Worrell, Arjan Scherpenisse
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -92,7 +92,7 @@
}).

-type tables() :: #tables{meta_table :: ets:tab(), deps_table :: ets:tab(), data_table :: ets:tab()}.
-type state() :: #state{now :: sec(), serial :: non_neg_integer(), tables :: tables(), wait_pids :: map()}.
-type state() :: #state{now :: sec(), serial :: non_neg_integer(), tables :: tables(), wait_pids :: map(), writers :: map()}.
-type depend() :: #depend{key :: key(), serial :: non_neg_integer()}.
-type cleanup_state() :: #cleanup_state{pid :: pid(), tables :: tables(), name :: atom(), memory_max :: non_neg_integer(), callback :: callback() | undefined}.
-type meta() :: #meta{key :: key(), expire :: sec(), serial :: non_neg_integer(), depend :: dependencies()}.
Expand Down Expand Up @@ -303,12 +303,12 @@ memo(Fun, Key, MaxAge, Dep, Server) ->
case ?MODULE:get_wait(Key1, Server) of
{ok, Value} ->
Value;
{throw, premature_exit} ->
?MODULE:memo(Fun, Key, MaxAge, Dep, Server);
{throw, R} ->
throw(R);
undefined ->
memo_key(Fun, Key, MaxAge, Dep, Server)
memo_key(Fun, Key, MaxAge, Dep, Server);
{error, premature_exit} ->
?MODULE:memo(Fun, Key, MaxAge, Dep, Server)
end.

%% @private
Expand All @@ -325,51 +325,29 @@ memo(Fun, Key, MaxAge, Dep, Server) ->
Server :: depcache_server(),
Result :: any().
memo_key(Fun, Key, MaxAge, Dep, Server) ->
%%ExitWatcher = start_exit_watcher(Key, Server),
%%try
try
{Value1, MaxAge1, Dep1} = case apply_fun(Fun) of
#memo{value=V, max_age=MA, deps=D} ->
MA1 = case is_integer(MA) of
true -> MA;
false -> MaxAge
end,
{V, MA1, Dep++D};
Value ->
{Value, MaxAge, Dep}
end,
case MaxAge of
0 -> memo_send_replies(Key, Value1, Server);
_ -> set(Key, Value1, MaxAge1, Dep1, Server)
end,
try
{Value1, MaxAge1, Dep1} = case apply_fun(Fun) of
#memo{value=V, max_age=MA, deps=D} ->
MA1 = case is_integer(MA) of
true -> MA;
false -> MaxAge
end,
{V, MA1, Dep++D};
Value ->
{Value, MaxAge, Dep}
end,
case MaxAge of
0 -> memo_send_replies(Key, Value1, Server);
_ -> set(Key, Value1, MaxAge1, Dep1, Server)
end,

Value1
catch
?WITH_STACKTRACE(Class, R, S)
memo_send_errors(Key, {throw, R}, Server),
erlang:raise(Class, R, S)
end.
%%after
%% stop_exit_watcher(ExitWatcher)
%%end.
Value1
catch
?WITH_STACKTRACE(Class, R, S)
memo_send_errors(Key, {throw, R}, Server),
erlang:raise(Class, R, S)
end.

%% @private
%% @doc Monitors the current process...
%% Sends premature_exit throw to depcache server when it detects one.
%%start_exit_watcher(Key, Server) ->
%% Self = self(),
%% spawn(fun() ->
%% Ref = monitor(process, Self),
%% receive
%% done ->
%% erlang:demonitor(Ref);
%% {'DOWN', Ref, process, Self, _Reason} ->
%% memo_send_errors(Key, {throw, premature_exit}, Server)
%% end
%% end).
%%
%%stop_exit_watcher(Pid) ->
%% Pid ! done.

%% @private
%% @doc Execute the memo function
Expand Down Expand Up @@ -487,7 +465,7 @@ set(Key, Data, MaxAge, Depend, Server) ->
-spec get_wait( Key, Server ) -> Result when
Key :: key(),
Server :: depcache_server(),
Result :: {ok, any()} | undefined | {throw, term()}.
Result :: {ok, any()} | undefined | {throw, term()} | {error, premature_exit}.
get_wait(Key, Server) ->
case get_process_dict(Key, Server) of
NoValue when NoValue =:= undefined orelse NoValue =:= depcache_disabled ->
Expand Down Expand Up @@ -941,10 +919,20 @@ handle_info(tick, State) ->
erase_process_dict(),
{noreply, State#state{now=now_sec()}};

handle_info({'DOWN', Ref, process, _Pid, _Reason}, State) ->
io:fwrite(standard_error, "Down for ~p~n", [Ref]),

{noreply, State};
handle_info({'DOWN', Ref, process, _Pid, _Reason}, #state{ writers = Writers }=State) ->
case maps:take(Ref, Writers) of
error ->
{noreply, State};
{Key, Writers1} ->
WaitPids1 = case maps:take(Key, State#state.wait_pids) of
error ->
State#state.wait_pids;
{{_MaxAge, List, _WriterRef}, WaitPids} ->
_ = [ catch gen_server:reply(From, {error, premature_exit}) || From <- List ],
WaitPids
end,
{noreply, State#state{ writers = Writers1, wait_pids = WaitPids1 }}
end;

handle_info(_Msg, State) ->
{noreply, State}.
Expand Down Expand Up @@ -1008,19 +996,29 @@ handle_call_get_wait(Key, From, #state{tables = Tables} = State) ->
undefined -> State
end,
case State#state.wait_pids of
#{Key := {MaxAge, List}} when State#state.now < MaxAge ->
#{Key := {MaxAge, List, WriterRef}} when State#state.now < MaxAge ->
%% Another process is already calculating the value, let the caller wait.
WaitPids = maps:update(Key, {MaxAge, [From|List]}, State#state.wait_pids),
WaitPids = maps:update(Key, {MaxAge, [From|List], WriterRef}, State#state.wait_pids),
{noreply, State#state{wait_pids=WaitPids}};
_ ->
%% Monitor the sender as a writer for Key
%% de-monitor an old writer, if any.
Writers = case maps:find(Key, State#state.wait_pids) of
error ->
State#state.writers;
{ok, {_, _WaitPids, OldWriter}} ->
erlang:demonitor(OldWriter),
maps:without([OldWriter], State#state.writers)
end,

%% Monitor and register the writer
{Pid, _} = From,
Ref = erlang:monitor(process, Pid),
Writers = maps:put(Ref, Key, State#state.writers),
Writers1 = maps:put(Ref, Key, Writers),

%% Nobody waiting or we hit a timeout, let next requestors wait for this caller.
WaitPids = maps:put(Key, {State#state.now+?MAX_GET_WAIT, []}, State#state.wait_pids),
{reply, undefined, State#state{wait_pids=WaitPids, writers=Writers}}
WaitPids = maps:put(Key, {State#state.now+?MAX_GET_WAIT, [], Ref}, State#state.wait_pids),

{reply, undefined, State#state{wait_pids=WaitPids, writers=Writers1}}
end;
{ok, _Value} = Found ->
{reply, Found, State}
Expand All @@ -1039,7 +1037,7 @@ handle_call_get_wait(Key, From, #state{tables = Tables} = State) ->
Tag :: atom().
handle_call_get_waiting_pids(Key, State) ->
{State1, Pids} = case maps:take(Key, State#state.wait_pids) of
{{_MaxAge, List}, WaitPids} ->
{{_MaxAge, List, _WriterRef}, WaitPids} ->
{State#state{wait_pids=WaitPids}, List};
error ->
{State, []}
Expand Down Expand Up @@ -1122,9 +1120,11 @@ handle_call_set({Key, Data, MaxAge, Depend}, #state{tables = Tables} = State) ->

%% Check if other processes are waiting for this key, send them the data
case maps:take(Key, State1#state.wait_pids) of
{{_MaxAge, List}, WaitPids} ->
{{_MaxAge, List, WriterRef}, WaitPids} ->
_ = [ catch gen_server:reply(From, {ok, Data}) || From <- List ],
{reply, ok, State1#state{wait_pids=WaitPids}};
_ = erlang:demonitor(WriterRef),
Writers = maps:without([WriterRef], State#state.writers),
{reply, ok, State1#state{writers=Writers, wait_pids=WaitPids}};
error ->
{reply, ok, State1}
end.
Expand Down
4 changes: 2 additions & 2 deletions test/depcache_tests.erl
Original file line number Diff line number Diff line change
Expand Up @@ -154,13 +154,13 @@ memo_premature_kill_test() ->
timer:sleep(500),
done
end,
depcache:memo(Fun, test, C)
depcache:memo(Fun, premature_kill_test, C)
end,

Pid = spawn(LongTask),
timer:kill_after(250, Pid),
timer:sleep(50),
?assertEqual({throw, premature_exit}, depcache:get_wait(test, C)),
?assertEqual({error, premature_exit}, depcache:get_wait(premature_kill_test, C)),

% Check if another process takes over processing in case of pre-mature exits
Task = spawn(LongTask),
Expand Down

0 comments on commit 240b57a

Please sign in to comment.