Skip to content

Commit 5144e5d

Browse files
authored
Refactor sink and source (#10)
* Simplify the Sink interface * Simplify the Source interface * Fix test
1 parent 5d917ee commit 5144e5d

File tree

11 files changed

+76
-55
lines changed

11 files changed

+76
-55
lines changed

lib/composite.ex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,7 @@ defmodule Strom.Composite do
152152
end
153153

154154
defp timestamp_postfix do
155-
:erlang.system_time()
155+
System.os_time()
156156
|> rem(round(1.0e9))
157157
|> to_string()
158158
end

lib/loop.ex

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ defmodule Strom.Loop do
2828
end
2929
end
3030

31+
# call as a source
3132
def call(%__MODULE__{name: name} = loop) do
3233
Agent.get_and_update(name, fn data ->
3334
case data do
@@ -40,24 +41,25 @@ defmodule Strom.Loop do
4041
case loop.last_empty_call_at do
4142
nil ->
4243
Process.sleep(loop.sleep)
43-
{:ok, {[], %{loop | last_empty_call_at: System.os_time(:millisecond)}}}
44+
{[], %{loop | last_empty_call_at: System.os_time(:millisecond)}}
4445

4546
last_empty_call_at ->
4647
if System.os_time(:millisecond) - last_empty_call_at > loop.timeout do
47-
{:error, {:halt, loop}}
48+
{:halt, loop}
4849
else
49-
{:ok, {[], loop}}
50+
{[], loop}
5051
end
5152
end
5253

5354
datum ->
54-
{:ok, {[datum], %{loop | last_empty_call_at: nil}}}
55+
{[datum], %{loop | last_empty_call_at: nil}}
5556
end
5657
end
5758

59+
# call as a sink
5860
def call(%__MODULE__{name: name} = loop, data) do
5961
:ok = Agent.update(name, fn prev_data -> prev_data ++ [data] end)
60-
{:ok, {[], loop}}
62+
loop
6163
end
6264

6365
def stop(%__MODULE__{name: name} = loop) do

lib/sink.ex

Lines changed: 8 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ defmodule Strom.Sink do
1515
See `Strom.Sink.Writeline`, `Strom.Sink.IOPuts`, `Strom.Sink.Null`
1616
"""
1717
@callback start(map) :: map
18-
@callback call(map, term) :: {:ok, {term, map}} | {:error, {term, map}}
18+
@callback call(map, term) :: map | no_return()
1919
@callback stop(map) :: map
2020

2121
use GenServer
@@ -67,12 +67,12 @@ defmodule Strom.Sink do
6767
Map.delete(flow, name)
6868
end
6969

70-
defp async_run_sink(sink, stream) do
70+
defp async_run_sink(%__MODULE__{origin: origin} = sink, stream) do
7171
Task.Supervisor.async_nolink(
7272
{:via, PartitionSupervisor, {Strom.TaskSupervisor, self()}},
7373
fn ->
74-
Stream.transform(stream, sink, fn el, sink ->
75-
call_sink(sink, el)
74+
Stream.transform(stream, sink, fn el, _sink ->
75+
sink = apply(origin.__struct__, :call, [origin, el])
7676
{[], sink}
7777
end)
7878
|> Stream.run()
@@ -82,16 +82,6 @@ defmodule Strom.Sink do
8282
)
8383
end
8484

85-
defp call_sink(%__MODULE__{origin: origin} = sink, data) do
86-
case apply(origin.__struct__, :call, [origin, data]) do
87-
{:ok, {[], origin}} ->
88-
{[], %{sink | origin: origin}}
89-
90-
{:error, {:halt, origin}} ->
91-
{:halt, %{sink | origin: origin}}
92-
end
93-
end
94-
9585
@spec stop(__MODULE__.t()) :: :ok
9686
def stop(%__MODULE__{pid: pid}), do: GenServer.call(pid, :stop)
9787

@@ -106,8 +96,10 @@ defmodule Strom.Sink do
10696
{:reply, :ok, %{sink | task: task, stream: stream}}
10797
end
10898

109-
def handle_call({:call, data}, _from, %__MODULE__{} = sink) do
110-
{:reply, call_sink(sink, data), sink}
99+
def handle_call({:call, data}, _from, %__MODULE__{origin: origin} = sink) do
100+
origin = apply(origin.__struct__, :call, [origin, data])
101+
sink = %{sink | origin: origin}
102+
{:reply, {[], sink}, sink}
111103
end
112104

113105
def handle_call(:stop, _from, %__MODULE__{origin: origin, task: task} = sink) do

lib/sink/io_puts.ex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ defmodule Strom.Sink.IOPuts do
1212
def call(%__MODULE__{} = io_puts, data) do
1313
IO.puts(io_puts.prefix <> "#{data}" <> io_puts.line_sep)
1414

15-
{:ok, {[], io_puts}}
15+
io_puts
1616
end
1717

1818
@impl true

lib/sink/null.ex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ defmodule Strom.Sink.Null do
99
def start(%__MODULE__{} = null), do: null
1010

1111
@impl true
12-
def call(%__MODULE__{} = null, _data), do: {:ok, {[], null}}
12+
def call(%__MODULE__{} = null, _data), do: null
1313

1414
@impl true
1515
def stop(%__MODULE__{} = null), do: null

lib/sink/write_lines.ex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ defmodule Strom.Sink.WriteLines do
1919
def call(%__MODULE__{} = write_lines, data) do
2020
:ok = IO.write(write_lines.file, data <> write_lines.line_sep)
2121

22-
{:ok, {[], write_lines}}
22+
write_lines
2323
end
2424

2525
@impl true

lib/source.ex

Lines changed: 10 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ defmodule Strom.Source do
2929
"""
3030

3131
@callback start(map) :: map
32-
@callback call(map) :: {:ok, {[term], map}} | {:error, {:halt, map}}
32+
@callback call(map) :: {[term], map} | {:halt, map} | no_return()
3333
@callback stop(map) :: map
3434
@callback infinite?(map) :: true | false
3535

@@ -159,36 +159,20 @@ defmodule Strom.Source do
159159
)
160160
end
161161

162-
defp loop_call(source) do
163-
case call_source(source) do
164-
{:halt, _source} ->
162+
defp loop_call(%__MODULE__{origin: origin} = source) do
163+
case apply(origin.__struct__, :call, [origin]) do
164+
{:halt, _origin} ->
165165
:task_done
166166

167-
{events, source} ->
167+
{events, origin} ->
168168
GenServer.cast(source.pid, {:new_data, events})
169169

170170
receive do
171171
:continue_task ->
172172
flush(:continue_task)
173173
end
174174

175-
loop_call(source)
176-
end
177-
end
178-
179-
defp call_source(%__MODULE__{origin: origin} = source) do
180-
case apply(origin.__struct__, :call, [origin]) do
181-
{:ok, {events, origin}} ->
182-
source = %{source | origin: origin}
183-
{events, source}
184-
185-
{:error, {:halt, origin}} ->
186-
source = %{source | origin: origin}
187-
188-
case apply(origin.__struct__, :infinite?, [origin]) do
189-
true -> {[], source}
190-
false -> {:halt, source}
191-
end
175+
loop_call(%{source | origin: origin})
192176
end
193177
end
194178

@@ -200,8 +184,10 @@ defmodule Strom.Source do
200184
end
201185

202186
@impl true
203-
def handle_call(:call, _from, %__MODULE__{} = source) do
204-
{:reply, call_source(source), source}
187+
def handle_call(:call, _from, %__MODULE__{origin: origin} = source) do
188+
{events, origin} = apply(origin.__struct__, :call, [origin])
189+
source = %{source | origin: origin}
190+
{:reply, {events, source}, source}
205191
end
206192

207193
def handle_call(:stop, _from, %__MODULE__{origin: origin, task: task} = source)

lib/source/io_gets.ex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ defmodule Strom.Source.IOGets do
1111
@impl true
1212
def call(%__MODULE__{} = io_gets) do
1313
data = IO.gets("IOGets> ")
14-
{:ok, {[String.trim(data)], io_gets}}
14+
{[String.trim(data)], io_gets}
1515
end
1616

1717
@impl true

lib/source/read_lines.ex

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,10 @@ defmodule Strom.Source.ReadLines do
1212
def call(%__MODULE__{} = read_lines) do
1313
case read_line(read_lines.file) do
1414
{:ok, data} ->
15-
{:ok, {[String.trim(data)], read_lines}}
15+
{[String.trim(data)], read_lines}
1616

1717
{:error, :eof} ->
18-
{:error, {:halt, read_lines}}
18+
{:halt, read_lines}
1919
end
2020
end
2121

test/crash_test.exs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -141,11 +141,11 @@ defmodule Strom.CrashTest do
141141
if String.trim(data) == "4" do
142142
raise "error"
143143
else
144-
{:ok, {[String.trim(data)], read_lines}}
144+
{[String.trim(data)], read_lines}
145145
end
146146

147147
{:error, :eof} ->
148-
{:error, {:halt, read_lines}}
148+
{:halt, read_lines}
149149
end
150150
end
151151

@@ -215,7 +215,7 @@ defmodule Strom.CrashTest do
215215
:ok = IO.write(write_lines.file, data <> write_lines.line_sep)
216216
end
217217

218-
{:ok, {[], write_lines}}
218+
{[], write_lines}
219219
end
220220

221221
@impl true

test/experiments_test.exs

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
defmodule Strom.ExperimentsTest do
2+
use ExUnit.Case, async: true
3+
4+
# alias Strom.{Composite, Transformer}
5+
6+
# @tag timeout: :infinity
7+
# test "event speed" do
8+
# :observer.start()
9+
# print_time =
10+
# Transformer.new(:stream, fn event ->
11+
# IO.inspect(:erlang.system_time(:millisecond))
12+
# event
13+
# end)
14+
#
15+
# transformer = Transformer.new(:stream, &(&1 + 1))
16+
# transformers = List.duplicate(transformer, 200_000)
17+
#
18+
# composite =
19+
# [print_time, transformers, print_time]
20+
# |> Composite.new()
21+
# |> Composite.start()
22+
#
23+
# start = :erlang.system_time(:millisecond)
24+
#
25+
# %{stream: [1]}
26+
# |> Composite.call(composite)
27+
# |> Map.get(:stream)
28+
# |> Enum.to_list()
29+
# |> IO.inspect
30+
#
31+
# IO.inspect(:erlang.system_time(:millisecond) - start, label: :total)
32+
# end
33+
#
34+
# test "tasks memory consumption" do
35+
# :observer.start()
36+
# Enum.map(1..1_000_000, fn _i ->
37+
# Task.async(fn -> Process.sleep(50000) end)
38+
# end)
39+
# |> Enum.map(&Task.await(&1, :infinity))
40+
# end
41+
end

0 commit comments

Comments
 (0)