@@ -35,6 +35,52 @@ defmodule Strom.SinkTest do
35
35
|> Source . call ( source ( ) )
36
36
|> Sink . call ( sink )
37
37
38
+ Sink . stop ( sink )
38
39
assert File . read! ( "test/data/orders.csv" ) <> "\n " == File . read! ( "test/data/output.csv" )
39
40
end
41
+
42
+ describe "custom sink" do
43
+ alias Strom . { Mixer , Transformer , Composite }
44
+
45
+ defmodule SlowSink do
46
+ @ behaviour Strom.Sink
47
+
48
+ defstruct [ ]
49
+
50
+ @ impl true
51
+ def start ( % __MODULE__ { } ) , do: % __MODULE__ { }
52
+
53
+ @ impl true
54
+ def stop ( % __MODULE__ { } ) , do: % __MODULE__ { }
55
+
56
+ @ impl true
57
+ def call ( % __MODULE__ { } , _data ) do
58
+ Process . sleep ( 10_000 )
59
+ % __MODULE__ { }
60
+ end
61
+ end
62
+
63
+ def build_composite do
64
+ mixer = Mixer . new ( [ :num1 , :num2 ] , :numbers )
65
+ plus_one = Transformer . new ( :numbers , & ( & 1 + 1 ) )
66
+ sink = Sink . new ( :numbers , % SlowSink { } )
67
+
68
+ [ mixer , plus_one , sink ]
69
+ |> Composite . new ( )
70
+ |> Composite . start ( )
71
+ end
72
+
73
+ test "slow sink" do
74
+ composite = build_composite ( )
75
+ flow = % { num1: 1 .. 10_000 , num2: 20_000 .. 30_000 }
76
+ Composite . call ( flow , composite )
77
+
78
+ Process . sleep ( 100 )
79
+
80
+ mixer = Enum . find ( composite . components , & is_struct ( & 1 , Mixer ) )
81
+ assert length ( :sys . get_state ( mixer . pid ) . data [ :numbers ] ) == 1001
82
+ transformer = Enum . find ( composite . components , & is_struct ( & 1 , Transformer ) )
83
+ assert length ( :sys . get_state ( transformer . pid ) . data [ :numbers ] ) == 1000
84
+ end
85
+ end
40
86
end
0 commit comments