Skip to content

Commit

Permalink
send timeout is not needed, as value has alredy been sent
Browse files Browse the repository at this point in the history
  • Loading branch information
grrrisu committed Oct 10, 2023
1 parent d7d98fb commit 8720898
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 38 deletions.
27 changes: 11 additions & 16 deletions apps/sim/lib/sim/realm/access_proxy.ex
Original file line number Diff line number Diff line change
Expand Up @@ -34,21 +34,21 @@ defmodule Sim.AccessProxy do
GenServer.start_link(__MODULE__, Keyword.delete(opts, :name), name: opts[:name] || __MODULE__)
end

def get(func \\ & &1, server \\ __MODULE__) do
def get(server \\ __MODULE__, func \\ & &1) do
GenServer.call(server, {:get, func})
end

def exclusive_get(func \\ & &1, server \\ __MODULE__) do
def exclusive_get(server \\ __MODULE__, func \\ & &1) do
GenServer.call(server, {:exclusive_get, func})
end

def update(data, server \\ __MODULE__)
def update(server \\ __MODULE__, data)

def update(func, server) when is_function(func) do
def update(server, func) when is_function(func) do
GenServer.call(server, {:update, func})
end

def update(data, server) do
def update(server, data) do
GenServer.call(server, {:update, fn _ -> data end})
end

Expand Down Expand Up @@ -95,24 +95,25 @@ defmodule Sim.AccessProxy do
end

def handle_call({:update, _func}, _from, state) do
{:reply, {:error, "request the data first with AccessProxy#exclusive_get"}, state}
{:reply,
{:error,
"request the data first with AccessProxy#exclusive_get or maybe too much time elapsed since exclusive_get was called"},
state}
end

def handle_info(
{:check_timeout, {pid, _ref} = caller, monitor_ref},
{:check_timeout, {pid, _ref}, monitor_ref},
%{caller: {pid, monitor_ref}, requests: []} = state
) do
Process.demonitor(monitor_ref, [:flush])
reply_with_timeout(caller)
{:noreply, %{state | caller: nil}}
end

def handle_info(
{:check_timeout, {pid, _ref} = caller, monitor_ref},
{:check_timeout, {pid, _ref}, monitor_ref},
%{caller: {pid, monitor_ref}} = state
) do
Process.demonitor(monitor_ref, [:flush])
reply_with_timeout(caller)
{next_caller, state} = reply_to_next_caller(state)
{:noreply, %{state | caller: next_caller}}
end
Expand Down Expand Up @@ -164,10 +165,4 @@ defmodule Sim.AccessProxy do
defp start_check_timeout(current_caller, monitor_ref, max_duration) do
Process.send_after(self(), {:check_timeout, current_caller, monitor_ref}, max_duration)
end

defp reply_with_timeout(caller) do
msg = "Timeout: exclusive_get took too long to release it again with an update call"
# we don't care if the caller receives the message or not
:ok = GenServer.reply(caller, {:error, msg})
end
end
71 changes: 49 additions & 22 deletions apps/sim/test/sim/realm/access_process_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -12,43 +12,43 @@ defmodule Sim.AccessProxyTest do
end

describe "exclusive_get" do
test "are executed in sequence" do
test "are executed in sequence", %{proxy: proxy} do
1..3
|> Enum.map(fn _n ->
Task.async(fn ->
value = AccessProxy.exclusive_get()
value = AccessProxy.exclusive_get(proxy)
Process.sleep(100)
:ok = AccessProxy.update(value + 1)
:ok = AccessProxy.update(proxy, value + 1)
end)
end)
|> Task.await_many()

assert 45 == AccessProxy.get()
end

test "allow update only after exclusive_get" do
value = AccessProxy.get()
assert {:error, _} = AccessProxy.update(value + 1)
assert 42 == AccessProxy.get()
test "allow update only after exclusive_get", %{proxy: proxy} do
value = AccessProxy.get(proxy)
assert {:error, _} = AccessProxy.update(proxy, value + 1)
assert 42 == AccessProxy.get(proxy)
end

test "get never blocks" do
test "get never blocks", %{proxy: proxy} do
result =
[
Task.async(fn ->
value = AccessProxy.exclusive_get()
value = AccessProxy.exclusive_get(proxy)
Process.sleep(100)
:ok = AccessProxy.update(value + 1)
:ok = AccessProxy.update(proxy, value + 1)
AccessProxy.get()
end),
Task.async(fn ->
Process.sleep(10)
AccessProxy.get()
end),
Task.async(fn ->
value = AccessProxy.exclusive_get()
value = AccessProxy.exclusive_get(proxy)
Process.sleep(100)
:ok = AccessProxy.update(value + 1)
:ok = AccessProxy.update(proxy, value + 1)
AccessProxy.get()
end)
]
Expand All @@ -57,18 +57,18 @@ defmodule Sim.AccessProxyTest do
assert [43, 42, 44] = result
end

test "remove lock if current client crashes", %{supervisor: supervisor} do
test "remove lock if current client crashes", %{proxy: proxy, supervisor: supervisor} do
result =
[
Task.Supervisor.async_stream_nolink(supervisor, [1], fn _n ->
_value = AccessProxy.exclusive_get()
_value = AccessProxy.exclusive_get(proxy)
Process.sleep(10)
Process.exit(self(), :upps)
end),
Task.Supervisor.async_stream_nolink(supervisor, [2], fn _n ->
value = AccessProxy.exclusive_get()
value = AccessProxy.exclusive_get(proxy)
Process.sleep(100)
:ok = AccessProxy.update(value + 1)
:ok = AccessProxy.update(proxy, value + 1)
AccessProxy.get()
end)
]
Expand All @@ -78,24 +78,24 @@ defmodule Sim.AccessProxyTest do
assert [exit: :upps, ok: 43] = result
end

test "remove from requests if queued client crashes", %{supervisor: supervisor} do
test "remove from requests if queued client crashes", %{proxy: proxy, supervisor: supervisor} do
result =
[
Task.Supervisor.async_stream_nolink(supervisor, [2], fn _n ->
value = AccessProxy.exclusive_get()
value = AccessProxy.exclusive_get(proxy)
Process.sleep(100)
:ok = AccessProxy.update(value + 1)
:ok = AccessProxy.update(proxy, value + 1)
AccessProxy.get()
end),
Task.Supervisor.async_stream_nolink(supervisor, [1], fn _n ->
_value = AccessProxy.exclusive_get()
_value = AccessProxy.exclusive_get(proxy)
Process.sleep(10)
Process.exit(self(), :upps)
end),
Task.Supervisor.async_stream_nolink(supervisor, [2], fn _n ->
value = AccessProxy.exclusive_get()
value = AccessProxy.exclusive_get(proxy)
Process.sleep(100)
:ok = AccessProxy.update(value + 1)
:ok = AccessProxy.update(proxy, value + 1)
AccessProxy.get()
end)
]
Expand All @@ -104,5 +104,32 @@ defmodule Sim.AccessProxyTest do

assert [ok: 43, exit: :upps, ok: 44] = result
end

test "timeouted request should not be able to update", %{agent: agent} do
proxy =
start_supervised!(
{AccessProxy, [name: :fast_access, agent: agent, max_duration: 50]},
id: :fast_access
)

[
Task.async(fn ->
value = AccessProxy.exclusive_get(proxy)
Process.sleep(100)
{:error, _msg} = AccessProxy.update(proxy, value + 10)
end),
Task.async(fn ->
value = AccessProxy.exclusive_get(proxy)
:ok = AccessProxy.update(proxy, value + 1)
end),
Task.async(fn ->
value = AccessProxy.exclusive_get(proxy)
:ok = AccessProxy.update(proxy, value + 1)
end)
]
|> Task.await_many()

assert 44 == AccessProxy.get()
end
end
end

0 comments on commit 8720898

Please sign in to comment.