-
Notifications
You must be signed in to change notification settings - Fork 8
/
Master.scala
116 lines (100 loc) · 3.55 KB
/
Master.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
package actor_system.actor
import akka.NotUsed
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import akka.routing.RoundRobinPool
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import com.typesafe.config.ConfigFactory
import actor_system.message._
import scala.concurrent.duration.Duration
import scala.concurrent.{ExecutionContext, Future}
/**
* Created by pabloperezgarcia on 18/12/2016.
*
* As a mster Actor the responsibility of this actor, it´s just to distribute the job in the number of workers
* that we have configure in the pool
* The mailbox of workers are configure to use ACK, which means that if we dont receive an ack from the worker
* we will save the message in the dead letter box and we will print a warning log
*/
class Master(nrOfWorkers: Int, numberOfMessages: Int, numberOfElements: Int, listener: ActorRef) extends Actor {
var workerResult: String = ""
var numberOfResults: Int = _
val start: Long = System.currentTimeMillis
implicit val materializer = ActorMaterializer()
/**
* Factory router which will create a worker once it´s invoked to process a message
* This worker actor will use the ActorSystem mailbox configuration
*/
val workerRouter: ActorRef = getAckActorSystem.actorOf(
Props[Worker]
.withDispatcher("peek-dispatcher")
.withRouter(RoundRobinPool(nrOfWorkers)), name = "workerRouter")
/**
* We create an ActorSystem with an mailbox configuration to use retries system and ack
* @return
*/
private def getAckActorSystem = {
ActorSystem("AckSystem", ConfigFactory.parseString(
"""
peek-dispatcher {
mailbox-type = "akka.contrib.mailbox.PeekMailboxType"
max-retries = 2
}
"""))
}
/**
* Receive Partial function it´s the only function that we need to implement once we extend Actor class
* This partial function will receive the mailbox messge and it will use chain of responsability to deliver
* in the proper place using patter matching.
* @return
*/
def receive: PartialFunction[Any, Unit] = {
case RunWorkersMsg =>
runWorkers()
case ResultMsg(value) =>
processWorkerResult(value)
}
/**
* Using Akka stream we will iterate over the number of message and we will send one per worker until we
* will reach the max worker pool, the mechanism to distribute message through the nodes it will be using
* round robin mechanism.
* @return
*/
private def runWorkers() = {
Source(0 to numberOfMessages)
.runForeach(number => workerRouter ! WorkMsg(number, numberOfElements))
}
private def processWorkerResult(future: Future[String]) = {
implicit val ec: ExecutionContext = ActorSystem().dispatcher
future.onComplete(value => {
numberOfResults += 1
workerResult = workerResult.concat("\n").concat(value.get)
if (numberOfResults == numberOfMessages) {
processAllResult()
}
})
}
private def processAllResult() = {
getFlow.run()
}
private def getFlow = {
val source = Source.single(workerResult)
val sink: Sink[Any, NotUsed] = getSink
source to sink
}
private def getSink = {
Sink.onComplete(result => {
Source.single(result)
.filter(r => r.isSuccess)
.map(r => r.get.toString.toUpperCase)
.runForeach(done => {
println(done)
callListener
context.stop(self)
})
})
}
private def callListener = {
listener ! AllResultMsg(workerResult, duration = Duration(System.currentTimeMillis - start, "millis"))
}
}