-
Notifications
You must be signed in to change notification settings - Fork 0
/
CollapsingWithAkka.java
97 lines (83 loc) · 3.69 KB
/
CollapsingWithAkka.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
package challenge4.akka_actor;
import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Cancellable;
import akka.actor.Props;
import akka.japi.pf.ReceiveBuilder;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;
import externalLegacyCodeNotUnderOurControl.PriceService;
import scala.concurrent.duration.Duration;

import java.util.concurrent.TimeUnit;

import static externalLegacyCodeNotUnderOurControl.PrintlnWithThreadname.println;
import static java.lang.Thread.sleep;
/**
* Collapse _equal_ requests when sending to the {@link PriceServiceActor}.
* <pre>
* [current "book" price?] ->                           -> [println "book" price]
*                           \                         /
* [current "book" price?] -> [PriceService1::getPrice] -> [println "book" price]
*
* </pre>
*/
public class CollapsingWithAkka {
/**
 * Starts the actor system, wires a price service actor behind a {@link PriceFacade},
 * attaches one {@link ClientSimulator} each for "Book" and "Toy", lets everything run
 * for ten seconds and then shuts the actor system down.
 *
 * @param args ignored
 * @throws Exception if the main thread is interrupted while sleeping, or if setting up
 *                   the actors fails; the actor system is shut down in either case
 */
public static void main(String[] args) throws Exception {
    ActorSystem system = ActorSystem.create("MySystem");
    try {
        final ActorRef priceServiceActor = system.actorOf(Props.create(PriceServiceActor.class, new PriceService(4)));
        final ActorRef priceFacade = system.actorOf(Props.create(PriceFacade.class, priceServiceActor));
        // The simulators drive themselves from preStart(), so their refs are not needed here.
        system.actorOf(Props.create(ClientSimulator.class, priceFacade, "Book"));
        system.actorOf(Props.create(ClientSimulator.class, priceFacade, "Toy"));
        sleep(10_000);
    } finally {
        // Always stop the actor system: otherwise an interrupted sleep (or a failed
        // actor creation) would leave the system and its periodic requests running.
        system.shutdown();
    }
    println("Main thread done.");
}
/**
* Collapses {@link PriceRequest}s and returns the same result to all requesters for the same product.
*/
private static class PriceFacade extends AbstractActor {
    // Actors waiting for a price, grouped by the request they sent. A key exists only
    // between the first request for it and the arrival of the matching PriceEnvelope.
    private final Multimap<PriceRequest, ActorRef> requesters = ArrayListMultimap.create();

    /**
     * Installs the two message handlers of the facade.
     *
     * @param priceServiceActor actor that receives one forwarded {@link PriceRequest}
     *                          per group of equal pending requests
     */
    public PriceFacade(ActorRef priceServiceActor) {
        receive(ReceiveBuilder
                .match(PriceRequest.class, incoming -> {
                    println("PriceFacade: " + incoming);
                    // Must be evaluated before the put below, which always adds the key.
                    final boolean alreadyPending = requesters.containsKey(incoming);
                    requesters.put(incoming, sender());
                    if (!alreadyPending) {
                        // Only the first of a run of equal requests reaches the price service.
                        priceServiceActor.tell(incoming, self());
                    }
                })
                .match(PriceEnvelope.class, reply -> {
                    println("PriceFacade: response: " + reply);
                    // Hand the price to everyone who asked and forget the pending entry.
                    requesters.removeAll(reply.priceRequest)
                            .forEach(waiting -> waiting.tell(reply.price, self()));
                })
                .build());
    }
}
/**
* There is one simulator per product, which sends requests regularly.
*/
public static class ClientSimulator extends AbstractActor {
    /** Actor that the periodic price requests are sent to. */
    private final ActorRef priceFacade;
    /** Product name placed in every request and in every log line. */
    private final String productName;
    /** Handle of the periodic request timer; {@code null} until {@link #preStart()} has run. */
    private Cancellable requestTimer;

    /**
     * Stores the collaborators and installs a handler that prints every received {@link Price}.
     *
     * @param priceFacade actor that the price requests are sent to
     * @param productName product to ask the price for
     */
    public ClientSimulator(ActorRef priceFacade, String productName) {
        this.priceFacade = priceFacade;
        this.productName = productName;
        receive(ReceiveBuilder.
                match(Price.class, price -> {
                    println("ClientSimulator " + productName + ": " + price);
                }).
                build());
    }

    /**
     * Schedules a {@link PriceRequest} for this product to be sent to the facade
     * immediately and then once per second, with this actor as the sender.
     */
    @Override
    public void preStart() throws Exception {
        PriceRequest request = new PriceRequest(productName);
        // Keep the handle so the timer can be stopped in postStop().
        requestTimer = context().system().scheduler().schedule(Duration.Zero(),
                Duration.create(1, TimeUnit.SECONDS),
                priceFacade, request,
                context().system().dispatcher(), self());
    }

    /**
     * Cancels the periodic request. Without this the system scheduler would keep
     * sending requests on behalf of a stopped actor, and a restart (which runs
     * {@link #preStart()} again under Akka's default lifecycle hooks) would stack a
     * second timer on top of the first.
     */
    @Override
    public void postStop() throws Exception {
        if (requestTimer != null) {
            requestTimer.cancel();
        }
    }
}
}