package stream
import akka.Done
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Sink, Source}
import org.junit.Test
import scala.collection.immutable._
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}
/**
* Created by pabloperezgarcia on 25/01/2017.
*
* Akka Streams provides all the operators and features that other reactive libraries such as ReactiveX or Flow provide.
* With those features we can not only consume messages in a much better way but also, just as importantly,
* have a back-pressure mechanism which prevents OutOfMemory problems in our system.
*
* In Akka Streams we have three elements:
* [Source]~>[Flow]~>[Sink]
*
* Source: the origin of the data; there can be multiple inputs.
* Flow: the part of the stream where we transform the data that we introduce into our pipeline.
* Sink: where we receive all the data once it has been processed, in order to be printed,
* passed to another source and so on with "via".
*/
class AkkaStream {
implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
@Test def sourceToFlowToSink(): Unit = {
val source = Source(List("Akka", "Kafka", "Afakka", "Kakfta"))
val flow = Flow[String]
.map(word => word.toUpperCase)
.filter(word => word.contains("AKKA"))
val sink = Sink.foreach(println)
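//Expected output: AKKA and AFAKKA (the only upper-cased words that contain "AKKA")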
source via flow to sink run()
//Just to wait for the end of the execution
Thread.sleep(1000)
}
/**
* Flow is normally the glue between Source and Sink; in the Flow we define the operators that our items
* will pass through in the pipeline. This way we can define independent flows to be reused and avoid breaking DRY.
* It's even possible to zip multiple flows in the same pipeline.
*/
@Test def flow(): Unit = {
val doubleFlow = Flow[Int]
.map(_ * 2)
.filter(_ < 10)
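//0 to 10 doubled gives 0,2,...,20; only the values below 10 pass the filter: 0, 2, 4, 6, 8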
Await.ready(Source(0 to 10)
.via(doubleFlow)
.runWith(Sink.foreach(value => println(value))), 5 seconds)
}
/**
* Using the mapAsync operator, we pass a function which returns a Future; the number of futures run
* in parallel is determined by the argument passed to the operator.
*/
@Test def mapAsync(): Unit = {
Source(0 to 10)
.mapAsync(2) { value =>
//Reuse the test's ActorSystem dispatcher; creating a new ActorSystem per element would leak resources
implicit val ec: ExecutionContext = system.dispatcher
Future {
Thread.sleep(500)
println(s"Process in Thread:${Thread.currentThread().getName}")
value
}
}
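//mapAsync preserves the upstream order of elements; mapAsyncUnordered would emit each result as soon as its Future completes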
.runWith(Sink.foreach(value => println(s"Item emitted:$value in Thread:${Thread.currentThread().getName}")))
//Just to wait for the end of the execution
Thread.sleep(4000)
}
/**
* FlatMapConcat works like flatMap in Rx but sequentially, since it concatenates one emission to the next.
* It creates a new line of execution (Source) and, once it finishes,
* it flattens all the results to be emitted in the pipeline.
*/
@Test def flatMapConcat(): Unit = {
Await.ready(Source(0 to 10)
.flatMapConcat(value => Source.single(value)
.map(value => value * 10))
.runForeach(item => println(s"Item:$item")), 5 seconds)
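//Since flatMapConcat is sequential, output order is guaranteed: Item:0, Item:10, ..., Item:100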
}
/**
* FlatMapMerge works like flatMap in Rx: all sources run in parallel,
* up to the breadth passed as the first argument.
* It creates a new line of execution (Source) per element and, as they finish,
* it flattens and merges all the results to be emitted in the pipeline.
*/
@Test def flatMapMerge(): Unit = {
Await.result(Source(0 to 10)
.via(requestFlow)
.runForeach(res => println(s"Item emitted:$res")), 5 seconds)
def requestFlow = Flow[Int]
.flatMapMerge(10, resNumber => Source.single(resNumber)
.map(res => res * 100))
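//With breadth 10 the inner sources run concurrently, so the output order is not guaranteed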
}
/**
* The zipN operator allows you to merge multiple Sources together into a single Source of Vectors
*/
@Test def zipWith(): Unit = {
val sources: Seq[Source[Char, _]] = Seq(Source("h"), Source("e"), Source("l"), Source("l"), Source("o"))
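//zipN emits one Vector('h','e','l','l','o'); the scan below folds those characters into the word "hello"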
Await.ready(Source.zipN[Char](sources)
.map(vector => vector.scan(new String)((b, b1) => mergeCharacters(b, b1)).last).async
.runForeach(word => println(word)), 5 seconds)
}
private def mergeCharacters(b: Any, b1: Any) = {
//The scan above mixes the String seed with Chars, so the accumulated element type is Any and we must cast
b.asInstanceOf[String].concat(b1.asInstanceOf[Char].toString)
}
/**
* Adding the async operator after another operator introduces an asynchronous boundary: all the previous
* operators of the pipeline are executed in one thread, while the rest of the stream, including the
* sink (subscriber), consumes the events in another.
*/
@Test def async(): Unit = {
Source(0 to 10)
.map(value => {
println(s"Execution 1 ${Thread.currentThread().getName}")
value * 100
})
.map(value => {
println(s"Execution 2 ${Thread.currentThread().getName}")
value * 100
}).async
.runWith(Sink.foreach(value => println(s"Item emitted:$value in Thread:${Thread.currentThread().getName}")))
Thread.sleep(10000)
}
/**
* The way to get the value of the source once it starts emitting (run) is with the lazily operator,
* to which we pass a function that creates the Source we want.
*/
@Test def defer(): Unit = {
var range = 0 to 10
val graphs = Source.lazily(() => Source(range))
.to(Sink.foreach(println))
range = 10 to 20
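//Because lazily defers creating the Source until the graph is run, this prints 10 to 20, not 0 to 10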
graphs.run()
Thread.sleep(2000)
}
/**
* The tick operator is like interval in Rx: it will repeat the emission of the item with an initial delay
* and an interval between emissions.
*/
@Test def tick(): Unit = {
val runnableGraph = Source.tick(0 seconds, 1 seconds, "Tick")
.map(value => value.toUpperCase)
.to(Sink.foreach(value => println(s"item emitted:$value")))
runnableGraph.run()
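//The stream is infinite; keep the JVM alive (e.g. with Thread.sleep) to observe the ticks before the test ends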
}
/**
* Repeat creates a Source that will continually emit the given element.
*
* The delay operator delays the emission of items in the pipeline by the time specified in the operator.
*/
@Test def repeat(): Unit = {
Source.repeat("Repeat")
.delay(500 millisecond)
.map(value => value.toUpperCase)
.to(Sink.foreach(value => println(s"item emitted:$value")))
.run()
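//Like tick, this stream is infinite; keep the JVM alive to observe the output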
}
/**
* The takeWhile operator will emit items while the predicate function returns true.
* You can achieve the same result with the filter+take operators.
*/
@Test def takeWhile(): Unit = {
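//Emits 0,1,2,3,4 and completes as soon as the predicate first returns false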
Await.result(Source(0 to 10)
.takeWhile(n => n < 5)
.runForeach(value => println(s"Item emitted:$value"))
, 5 seconds)
}
/**
* The dropWhile operator will drop items while the predicate function returns true.
* You can achieve the same result with the filter+drop operators.
*/
@Test def dropWhile(): Unit = {
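//Drops 0..4; once the predicate returns false, the remaining items 5..10 flow through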
Await.result(Source(0 to 10)
.dropWhile(n => n < 5)
.runForeach(value => println(s"Item emitted:$value"))
, 5 seconds)
}
/**
* Group the emission of items into groups of the size defined in the operator. In this case, since we start
* at 0 and group 5 items at a time, we should end up with 3 collections.
*/
@Test def group(): Unit = {
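//Emits Vector(0,1,2,3,4), Vector(5,6,7,8,9) and Vector(10)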
Await.result(Source(0 to 10)
.grouped(5)
.runForeach(value => println(s"Item emitted:$value"))
, 5 seconds)
}
/**
* When an item is received, create a collection with the item emitted and wait to collect the number of
* items specified in the operator before emitting the window.
* In this case, since we specify 5, we should print:
* (0, 1, 2, 3, 4)
* (1, 2, 3, 4, 5)
* (2, 3, 4, 5, 6)
* (3, 4, 5, 6, 7)
* (4, 5, 6, 7, 8)
* (5, 6, 7, 8, 9)
* (6, 7, 8, 9, 10)
*/
@Test def sliding(): Unit = {
Await.result(Source(0 to 10)
.sliding(5)
.runForeach(value => println(s"Item emitted:$value"))
, 5 seconds)
}
/**
* The scan operator takes each item emitted and adds it into a collection, which is passed downstream on every emission.
*/
@Test def scan(): Unit = {
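//Emits List(), List(0), List(1, 0), ...; items are prepended with ::, so each list is in reverse order of arrival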
Await.ready(Source(0 to 10)
.scan(List[Int]())((list, item) => list.::(item))
.runForeach(list => println(s"List:$list")), 5 seconds)
}
/**
* The intersperse operator allows you to attach extra items to the emission of items in the pipeline.
* Here, for instance, we add an item at the beginning of the emission, one between each pair of items emitted,
* and one once we finish.
*/
@Test def intersperse(): Unit = {
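//Each element is printed on its own line: Start first, then the numbers with the separator injected between them, and End last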
Await.ready(Source(0 to 10)
.map(_.toString)
.intersperse("Start", separatorFunction.apply(), "End")
.runForeach(list => println(s"List:$list")), 5 seconds)
}
/**
* Operator that allows us to redirect the stream to another Sink while still continuing with the flow of the stream
*/
@Test def alsoTo(): Unit = {
val sink = Sink.foreach(println)
val flow = Flow[String]
.alsoTo(Sink.foreach(s => println(s"Input:$s")))
.map(s => s.toUpperCase)
.filter(_.contains("AKKA"))
val source = Source(List("Akka", "Kafka", "Afakka", "Kakfta"))
source via flow to sink run()
//Just to wait for the end of the execution
Thread.sleep(1000)
}
/**
* One of the most important things about Akka Streams is how easy it is to create a DSL that users can reuse in their
* applications; we just need to provide some Lego pieces and they only have to put them together.
*/
@Test def generateDSL(): Unit = {
val year = 2017
transactions via filterTransactionBefore(year) to report(s"Transaction before $year") run()
//Just to wait for the end of the execution
Thread.sleep(5000)
}
type Transaction = (Int, String)
val transactions = Source(List((2014, "Coca-cola"),
(2012, "Levis"),
(2016, "Burrito"),
(2008, "M&M"),
(2005, "Playstation 2"),
(2017, "Playstation 4")))
def filterTransactionBefore(year: Int) = Flow[Transaction]
.filter(transaction => transaction._1 < year)
def report(name: String) = Sink.foreach[Transaction](transaction => println(s"Report $name ${transaction._2}"))
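//The same Lego pieces compose into other pipelines, e.g. a hypothetical second report:
//transactions via filterTransactionBefore(2010) to report(s"Transaction before 2010") run()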
private def separatorFunction = {
() => "******************+"
}
/**
* The same mapAsync operator applied to a more realistic scenario, such as reading files: the argument
* passed to the operator determines how many reads run in parallel.
*/
@Test def readAsync(): Unit = {
Source(0 to 10) //-->Your files
.mapAsync(5) { value => //-> It will run in parallel 5 reads
implicit val ec: ExecutionContext = system.dispatcher //reuse the existing system's dispatcher instead of creating a new ActorSystem
Future {
//Here read your file
Thread.sleep(500)
println(s"Process in Thread:${Thread.currentThread().getName}")
value
}
}
.runWith(Sink.foreach(value => println(s"Item emitted:$value in Thread:${Thread.currentThread().getName}")))
//Just to wait for the end of the execution
Thread.sleep(2000)
}
@Test def pipeline(): Unit = {
val source = Source[String](List("Hello", "Akka", "foo", "Stream"))
val flow = Flow[String]
.delay(100 millis)
.filter(word => word != "foo")
.map(s => s.toUpperCase)
.recover[String]({
case _ => "Error processing element in pipeline"
})
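//"foo" is filtered out and the remaining words are upper-cased; recover replaces a stream failure with the fallback element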
val sink: Sink[String, Future[Done]] =
Sink.foreach(word => println(s"Item emitted:$word"))
source
.via(flow)
.runWith(sink)
//Just to wait for the end of the execution
Thread.sleep(1000)
}
}