Skip to content

Commit

Permalink
Simplify gen_mix and transformer
Browse files Browse the repository at this point in the history
  • Loading branch information
antonmi committed Apr 23, 2024
1 parent 789ccd2 commit 49f078e
Show file tree
Hide file tree
Showing 7 changed files with 180 additions and 198 deletions.
2 changes: 1 addition & 1 deletion lib/composite.ex
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ defmodule Strom.Composite do
end

@impl true
def handle_info(:continue, composite) do
def handle_info(:continue_client, composite) do
{:noreply, composite}
end

Expand Down
148 changes: 100 additions & 48 deletions lib/gen_mix.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,23 @@ defmodule Strom.GenMix do
use GenServer

@chunk 1
@buffer 1000

defstruct pid: nil,
inputs: [],
outputs: [],
opts: [],
running: false,
chunk: @chunk,
producers: %{},
consumers: %{}

alias Strom.GenMix.Consumer
buffer: @buffer,
tasks: %{},
data: %{},
waiting_clients: %{}

def start(%__MODULE__{opts: opts} = gen_mix) when is_list(opts) do
gen_mix = %{
gen_mix
| chunk: Keyword.get(opts, :chunk, @chunk)
| chunk: Keyword.get(opts, :chunk, @chunk),
buffer: Keyword.get(opts, :buffer, @buffer)
}

start_link(gen_mix)
Expand All @@ -44,36 +45,43 @@ defmodule Strom.GenMix do
GenServer.call(pid, :stop)
end

defp run_inputs(streams, pid, chunk) do
defp run_inputs(streams, mix) do
Enum.reduce(streams, %{}, fn {{name, fun}, stream}, acc ->
task = async_run_stream({name, fun}, stream, chunk, pid)
Map.put(acc, {name, fun}, task)
task = async_run_stream({name, fun}, stream, mix)
Map.put(acc, name, task)
end)
end

defp async_run_stream({name, fun}, stream, chunk, pid) do
defp async_run_stream({name, fun}, stream, mix) do
Task.async(fn ->
stream
|> Stream.chunk_every(chunk)
|> Stream.chunk_every(mix.chunk)
|> Stream.each(fn chunk ->
{chunk, _} = Enum.split_with(chunk, fun)
GenServer.cast(pid, {:new_data, {name, fun}, chunk})

new_data =
Enum.reduce(mix.outputs, %{}, fn {name, fun}, acc ->
{data, _} = Enum.split_with(chunk, fun)
Map.put(acc, name, data)
end)

GenServer.cast(mix.pid, {:new_data, name, new_data})

receive do
:continue ->
flush()
:continue_task ->
flush(:continue_task)
end
end)
|> Stream.run()

GenServer.cast(pid, {:done, {name, fun}})
GenServer.cast(mix.pid, {:done, name})
end)
end

defp flush do
defp flush(message) do
receive do
:continue ->
flush()
^message ->
flush(message)
after
0 -> :ok
end
Expand All @@ -86,60 +94,104 @@ defmodule Strom.GenMix do
Map.put(acc, {name, fun}, Map.fetch!(flow, name))
end)

{sub_flow, mix} =
mix.outputs
|> Enum.reduce({%{}, mix}, fn {name, fun}, {flow, mix} ->
consumer = Consumer.start({name, fun}, mix.pid)

mix = %{mix | consumers: Map.put(mix.consumers, {name, fun}, consumer)}
tasks = run_inputs(input_streams, mix)

stream = Consumer.call(consumer)
{Map.put(flow, name, stream), mix}
sub_flow =
mix.outputs
|> Enum.reduce(%{}, fn {name, _fun}, flow ->
stream =
Stream.resource(
fn ->
nil
end,
fn nil ->
case GenServer.call(mix.pid, {:get_data, name}, :infinity) do
{:data, data} ->
{data, nil}

:done ->
{:halt, nil}

:pause ->
receive do
:continue_client ->
flush(:continue_client)
{[], nil}
end
end
end,
fn nil -> nil end
)

Map.put(flow, name, stream)
end)

producers = run_inputs(input_streams, mix.pid, mix.chunk)

flow =
flow
|> Map.drop(Map.keys(mix.inputs))
|> Map.merge(sub_flow)

{:reply, flow, %{mix | running: true, producers: producers}}
{:reply, flow, %{mix | tasks: tasks}}
end

def handle_call(:stop, _from, %__MODULE__{} = mix) do
{:stop, :normal, :ok, %{mix | running: false}}
{:stop, :normal, :ok, mix}
end

@impl true
def handle_cast({:new_data, {_name, _fun}, chunk}, %__MODULE__{} = mix) do
Enum.each(mix.consumers, fn {_, cons} ->
GenServer.cast(cons.pid, {:put_data, chunk})
GenServer.cast(cons.pid, :continue)
end)
def handle_call({:get_data, name}, {pid, _ref}, mix) do
data = Map.get(mix.data, name, [])
mix = %{mix | data: Map.put(mix.data, name, [])}

{:noreply, mix}
end
total_count = Enum.reduce(mix.data, 0, fn {_name, data}, count -> count + length(data) end)

if total_count <= mix.buffer do
Enum.each(mix.tasks, fn {_, task} -> send(task.pid, :continue_task) end)
end

cond do
length(data) == 0 and map_size(mix.tasks) == 0 ->
{:reply, :done, mix}

length(data) == 0 ->
waiting_clients = Map.put(mix.waiting_clients, name, pid)
{:reply, :pause, %{mix | waiting_clients: waiting_clients}}

def handle_cast({:done, {name, fun}}, %__MODULE__{} = mix) do
mix = %{mix | producers: Map.delete(mix.producers, {name, fun})}
true ->
{:reply, {:data, data}, mix}
end
end

if map_size(mix.producers) == 0 do
Enum.each(mix.consumers, fn {_, cons} ->
GenServer.cast(cons.pid, :continue)
GenServer.cast(cons.pid, :stop)
@impl true
def handle_cast({:new_data, name, new_data}, %__MODULE__{} = mix) do
{all_mix_data, total_count} =
Enum.reduce(new_data, {mix.data, 0}, fn {name, data}, {all_mix_data, count} ->
prev_data = Map.get(all_mix_data, name, [])
all_data = prev_data ++ data
{Map.put(all_mix_data, name, all_data), count + length(all_data)}
end)

Enum.each(mix.waiting_clients, fn {_name, client_pid} ->
send(client_pid, :continue_client)
end)

if total_count <= mix.buffer do
task = Map.fetch!(mix.tasks, name)
send(task.pid, :continue_task)
end

mix = %{mix | data: all_mix_data, waiting_clients: %{}}

{:noreply, mix}
end

def handle_cast({:consumer_got_data, {_name, _fun}}, %__MODULE__{} = mix) do
Enum.each(mix.producers, fn {_, task} ->
send(task.pid, :continue)
def handle_cast({:done, name}, %__MODULE__{} = mix) do
mix = %{mix | tasks: Map.delete(mix.tasks, name)}

Enum.each(mix.waiting_clients, fn {_name, client_pid} ->
send(client_pid, :continue_client)
end)

{:noreply, mix}
{:noreply, %{mix | waiting_clients: %{}}}
end

@impl true
Expand Down
109 changes: 0 additions & 109 deletions lib/gen_mix/consumer.ex

This file was deleted.

1 change: 0 additions & 1 deletion lib/topology.ex
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ defmodule Strom.Topology do
...> end
iex> OddEvenTopology.start()
iex> %{even: even} = OddEvenTopology.call(%{})
iex> OddEvenTopology.stop()
iex> Enum.sort(Enum.to_list(even))
[2, 4, 6]
"""
Expand Down
Loading

0 comments on commit 49f078e

Please sign in to comment.