Skip to content

Commit

Permalink
fix tests
Browse files Browse the repository at this point in the history
  • Loading branch information
antonmi committed Dec 6, 2023
1 parent bfc2132 commit dfbad2c
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 120 deletions.
116 changes: 61 additions & 55 deletions lib/manager.ex
Original file line number Diff line number Diff line change
Expand Up @@ -249,61 +249,6 @@ defmodule ALF.Manager do
end
end

def handle_call({:done?, stream_ref}, _from, state) do
tasks_set = Map.fetch!(state.tasks, stream_ref)
{:reply, {MapSet.size(tasks_set) == 0, state.ips[stream_ref]}, state}
end

def handle_call({:process_event, event, pipeline_module, producer_name, opts}, _from, state) do
task =
Task.async(fn ->
stream_ref = opts[:stream_ref]
timeout = opts[:timeout]

ip = build_ip(event, pipeline_module, stream_ref, opts[:debug])
# TODO move to build_ip -> build_ip(event, pipeline_module, opts)
Producer.load_ip(producer_name, ip)

ips =
case wait_result(stream_ref, [], {timeout, ip}) do
[] ->
[]

ips ->
Enum.reverse(Enum.map(ips, &format_ip/1))
end

{ips, stream_ref}
end)

tasks_set = Map.get(state.tasks, opts[:stream_ref], MapSet.new())
tasks_set = MapSet.put(tasks_set, task.ref)
tasks = Map.put(state.tasks, opts[:stream_ref], tasks_set)

ips = Map.get(state.ips, opts[:stream_ref], [])
state = %{state | tasks: tasks, ips: Map.put(state.ips, opts[:stream_ref], [])}
{:reply, ips, state}
end

@impl true
def handle_info({task_ref, {ips, stream_ref}}, state) do
tasks_set = Map.fetch!(state.tasks, stream_ref)
tasks_set = MapSet.delete(tasks_set, task_ref)
tasks = Map.put(state.tasks, stream_ref, tasks_set)

old_ips = Map.get(state.ips, stream_ref, [])
ips = Map.put(state.ips, stream_ref, old_ips ++ ips)

state = %{state | tasks: tasks, ips: ips}

{:noreply, state}
end

def handle_info({:DOWN, _task_ref, :process, _task_pid, :normal}, state) do
# TODO track this somehow, what if task crashes in the middle
{:noreply, state}
end

defp do_sync_stream(pipeline_module, pipeline, stream, opts) do
stream
|> Stream.transform(
Expand Down Expand Up @@ -419,6 +364,45 @@ defmodule ALF.Manager do
end

@impl true
def handle_call({:process_event, event, pipeline_module, producer_name, opts}, _from, state) do
stream_ref = opts[:stream_ref]
timeout = opts[:timeout]

task =
Task.async(fn ->
ip = build_ip(event, pipeline_module, stream_ref, opts[:debug])
Producer.load_ip(producer_name, ip)

ips =
case wait_result(stream_ref, [], {timeout, ip}) do
[] -> []
ips -> Enum.reverse(Enum.map(ips, &format_ip/1))
end

{ips, stream_ref}
end)

tasks_set =
state.tasks
|> Map.get(stream_ref, MapSet.new())
|> MapSet.put(task.ref)

ips = Map.get(state.ips, stream_ref, [])

state = %{
state
| tasks: Map.put(state.tasks, stream_ref, tasks_set),
ips: Map.put(state.ips, stream_ref, [])
}

{:reply, ips, state}
end

def handle_call({:done?, stream_ref}, _from, state) do
tasks_set = Map.fetch!(state.tasks, stream_ref)
{:reply, {MapSet.size(tasks_set) == 0, state.ips[stream_ref]}, state}
end

def handle_call(:__state__, _from, state), do: {:reply, state, state}

def handle_call({:__set_state__, new_state}, _from, _state) do
Expand Down Expand Up @@ -482,6 +466,28 @@ defmodule ALF.Manager do
end

@impl true
def handle_info({task_ref, {ips, stream_ref}}, state) do
tasks_set =
state.tasks
|> Map.fetch!(stream_ref)
|> MapSet.delete(task_ref)

old_ips = Map.get(state.ips, stream_ref, [])

state = %{
state
| tasks: Map.put(state.tasks, stream_ref, tasks_set),
ips: Map.put(state.ips, stream_ref, old_ips ++ ips)
}

{:noreply, state}
end

def handle_info({:DOWN, _task_ref, :process, _task_pid, :normal}, state) do
# TODO track this somehow, what if task crashes in the middle
{:noreply, state}
end

def handle_info({:DOWN, _ref, :process, pid, reason}, %__MODULE__{} = state) do
Logger.error(
"Component #{inspect(pid)} is :DOWN with reason: #{reason} in pipeline: #{state.pipeline_module}"
Expand Down
36 changes: 16 additions & 20 deletions test/integration/component_throw_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,14 @@ defmodule ALF.ComponentThrowTest do
|> ThrowInStagePipeline.stream(debug: true)
|> Enum.to_list()

assert [
%ErrorIP{
component: %ALF.Components.Stage{function: :add_one},
ip: %IP{} = ip,
error: :throw,
stacktrace: "throw in :add_one"
},
%IP{event: 6},
%IP{event: 8}
] = results
assert length(Enum.filter(results, &is_struct(&1, IP))) == 2

%ErrorIP{
component: %ALF.Components.Stage{function: :add_one},
ip: %IP{} = ip,
error: :throw,
stacktrace: "throw in :add_one"
} = Enum.find(results, &is_struct(&1, ErrorIP))

assert [{{:add_one, 0}, _event}] = ip.history
end
Expand Down Expand Up @@ -80,16 +78,14 @@ defmodule ALF.ComponentThrowTest do
|> ExitInStagePipeline.stream(debug: true)
|> Enum.to_list()

assert [
%ErrorIP{
component: %ALF.Components.Stage{function: :add_one},
ip: %IP{} = ip,
error: :exit,
stacktrace: "exit in :add_one"
},
%IP{event: 6},
%IP{event: 8}
] = results
assert length(Enum.filter(results, &is_struct(&1, IP))) == 2

%ErrorIP{
component: %ALF.Components.Stage{function: :add_one},
ip: %IP{} = ip,
error: :exit,
stacktrace: "exit in :add_one"
} = Enum.find(results, &is_struct(&1, ErrorIP))

assert [{{:add_one, 0}, _event}] = ip.history
end
Expand Down
46 changes: 1 addition & 45 deletions test/manager_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ defmodule ALF.ManagerTest do
|> Manager.stream(SimplePipelineToStream)
|> Enum.to_list()

assert results == [4, 6, 8]
assert Enum.sort(results) == [4, 6, 8]
end

test "run several streams at once" do
Expand Down Expand Up @@ -543,48 +543,4 @@ defmodule ALF.ManagerTest do
assert length(components) == 4
end
end

# describe "parallelism" do
# defmodule Pp do
# use ALF.DSL
#
# @components [
# stage(:aaa, count: 3),
# stage(:bbb, count: 3),
# stage(:ccc, count: 3)
# ]
#
# def aaa(event, _) do
# IO.inspect("aaa #{event}")
# Process.sleep(900)
# event
# end
#
# def bbb(event, _) do
# IO.inspect("bbb #{event}")
# Process.sleep(900)
# event
# end
#
# def ccc(event, _) do
# IO.inspect("ccc #{event}")
# Process.sleep(900)
# event
# end
# end
#
# setup do
# Pp.start()
# on_exit(&Pp.stop/0)
# end
#
# test "test" do
# stream =
# [1,2,3]
# |> Pp.stream
## |> Stream.run
# Enum.to_list(stream)
# |> IO.inspect
# end
# end
end

0 comments on commit dfbad2c

Please sign in to comment.