Skip to content

Commit

Permalink
Properly handle of tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
antonmi committed Apr 24, 2024
1 parent 25e5bde commit 5cdda57
Show file tree
Hide file tree
Showing 5 changed files with 30 additions and 26 deletions.
15 changes: 8 additions & 7 deletions lib/gen_mix.ex
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ defmodule Strom.GenMix do
end
end)
|> Stream.run()

{:task_done, name}
end)
end

Expand Down Expand Up @@ -184,13 +186,7 @@ defmodule Strom.GenMix do
end

@impl true
def handle_info({_task_ref, :ok}, mix) do
# do nothing for now
{:noreply, mix}
end

def handle_info({:DOWN, _task_ref, :process, task_pid, :normal}, mix) do
{name, _task} = Enum.find(mix.tasks, fn {_name, task} -> task.pid == task_pid end)
def handle_info({_task_ref, {:task_done, name}}, mix) do
mix = %{mix | tasks: Map.delete(mix.tasks, name)}

Enum.each(mix.waiting_clients, fn {_name, client_pid} ->
Expand All @@ -200,6 +196,11 @@ defmodule Strom.GenMix do
{:noreply, %{mix | waiting_clients: %{}}}
end

def handle_info({:DOWN, _task_ref, :process, _task_pid, :normal}, mix) do
# do nothing
{:noreply, mix}
end

def handle_info({:DOWN, _task_ref, :process, task_pid, _not_normal}, mix) do
{name, _task} = Enum.find(mix.tasks, fn {_name, task} -> task.pid == task_pid end)
{{^name, function}, stream} = Enum.find(mix.input_streams, fn {{n, _}, _} -> n == name end)
Expand Down
11 changes: 7 additions & 4 deletions lib/sink.ex
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ defmodule Strom.Sink do
{[], sink}
end)
|> Stream.run()

:task_done
end)
end

Expand Down Expand Up @@ -114,13 +116,14 @@ defmodule Strom.Sink do
def handle_call(:__state__, _from, sink), do: {:reply, sink, sink}

@impl true
def handle_info({_task_ref, :ok}, sink) do
def handle_info({_task_ref, :task_done}, sink) do
# do nothing for now
{:noreply, sink}
{:noreply, %{sink | task: nil}}
end

def handle_info({:DOWN, _task_ref, :process, _task_pid, :normal}, source) do
{:noreply, %{source | task: nil}}
def handle_info({:DOWN, _task_ref, :process, _task_pid, :normal}, sink) do
# do nothing for now
{:noreply, sink}
end

def handle_info({:DOWN, _task_ref, :process, _task_pid, _not_normal}, sink) do
Expand Down
14 changes: 7 additions & 7 deletions lib/source.ex
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ defmodule Strom.Source do
defp loop_call(source) do
case call_source(source) do
{:halt, _source} ->
:ok
:task_done

{events, source} ->
GenServer.cast(source.pid, {:new_data, events})
Expand Down Expand Up @@ -223,19 +223,19 @@ defmodule Strom.Source do
end

@impl true
def handle_info({_task_ref, :ok}, source) do
# do nothing for now
{:noreply, source}
end

def handle_info({:DOWN, _task_ref, :process, _task_pid, :normal}, source) do
def handle_info({_task_ref, :task_done}, source) do
if source.waiting_client do
send(source.waiting_client, :continue_client)
end

{:noreply, %{source | task: nil, waiting_client: nil}}
end

def handle_info({:DOWN, _task_ref, :process, _task_pid, :normal}, source) do
# do nothing for now
{:noreply, source}
end

def handle_info({:DOWN, _task_ref, :process, _task_pid, _not_normal}, source) do
task = async_run_input(source)

Expand Down
15 changes: 8 additions & 7 deletions lib/transformer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,8 @@ defmodule Strom.Transformer do
{[], new_acc}
end)
|> Stream.run()

{:task_done, name}
end)
end

Expand Down Expand Up @@ -238,20 +240,19 @@ defmodule Strom.Transformer do
end

@impl true
def handle_info({_task_ref, :ok}, transformer) do
# do nothing for now
{:noreply, transformer}
end

def handle_info({:DOWN, _task_ref, :process, task_pid, :normal}, transformer) do
{name, _task} = Enum.find(transformer.tasks, fn {_name, task} -> task.pid == task_pid end)
def handle_info({_task_ref, {:task_done, name}}, transformer) do
tasks = Map.delete(transformer.tasks, name)

waiting_clients = continue_waiting_client(transformer.waiting_clients, name)

{:noreply, %{transformer | waiting_clients: waiting_clients, tasks: tasks}}
end

def handle_info({:DOWN, _task_ref, :process, _task_pid, :normal}, transformer) do
# do nothing for now
{:noreply, transformer}
end

def handle_info({:DOWN, _task_ref, :process, task_pid, _not_normal}, transformer) do
{name, _task} = Enum.find(transformer.tasks, fn {_name, task} -> task.pid == task_pid end)
stream = Map.fetch!(transformer.input_streams, name)
Expand Down
1 change: 0 additions & 1 deletion test/examples/parcels_test.exs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
defmodule Strom.Examples.ParcelsTest do
use ExUnit.Case
@moduletag timeout: :infinity

alias Strom.Composite

Expand Down

0 comments on commit 5cdda57

Please sign in to comment.