Skip to content

Commit

Permalink
lab akka
Browse files Browse the repository at this point in the history
  • Loading branch information
PietroPizzoccheri committed Nov 27, 2024
1 parent 2a63af9 commit 697495b
Show file tree
Hide file tree
Showing 15 changed files with 460 additions and 28 deletions.
48 changes: 28 additions & 20 deletions NSDS/akka/akka-nsds/src/main/java/com/counter/Counter.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.counter;

import java.io.IOException;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

Expand All @@ -9,30 +10,37 @@

public class Counter {

private static final int numThreads = 10;
private static final int numMessages = 100;
private static final int numThreads = 10;
private static final int numMessages = 100;

public static void main(String[] args) {
public static void main(String[] args) {

final ActorSystem sys = ActorSystem.create("System");
final ActorRef counter = sys.actorOf(CounterActor.props(), "counter");
final ActorSystem sys = ActorSystem.create("System");
final ActorRef counter = sys.actorOf(CounterActor.props(), "counter");

// Send messages from multiple threads in parallel
final ExecutorService exec = Executors.newFixedThreadPool(numThreads);
// Send messages from multiple threads in parallel
final ExecutorService exec = Executors.newFixedThreadPool(numThreads);

for (int i = 0; i < numMessages; i++) {
exec.submit(() -> counter.tell(new SimpleMessage(), ActorRef.noSender()));
}

// Wait for all messages to be sent and received
try {
System.in.read();
} catch (IOException e) {
e.printStackTrace();
}
exec.shutdown();
sys.terminate();
Random r = new Random(100);

}
for (int i = 0; i < numMessages; i++) {
int random = r.nextInt(100) + 1;
if (random < 30) {
exec.submit(() -> counter.tell(new IncrementMessage(), ActorRef.noSender()));
} else {
exec.submit(() -> counter.tell(new DecrementMessage(), ActorRef.noSender()));
}
}

// Wait for all messages to be sent and received
try {
System.in.read();
} catch (IOException e) {
e.printStackTrace();
}
exec.shutdown();
sys.terminate();

}

}
23 changes: 20 additions & 3 deletions NSDS/akka/akka-nsds/src/main/java/com/counter/CounterActor.java
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
package com.counter;

import akka.actor.AbstractActor;
import akka.actor.AbstractActorWithStash;
import akka.actor.Props;

public class CounterActor extends AbstractActor {
public class CounterActor extends AbstractActorWithStash {

private int counter;

Expand All @@ -13,12 +14,28 @@ public CounterActor() {

@Override
public Receive createReceive() {
return receiveBuilder().match(SimpleMessage.class, this::onMessage).build();
return receiveBuilder()
.match(IncrementMessage.class, this::onMessage)
.match(DecrementMessage.class, this::onMessage)
.build();
}

void onMessage(SimpleMessage msg) {
void onMessage(IncrementMessage msg) {
++counter;
System.out.println("Counter increased to " + counter);
if(counter > 0){
unstash();
}
}

void onMessage(DecrementMessage msg){
if(counter <= 0){
stash();
System.out.println("stashed with counter: " + counter);
} else{
counter--;
System.out.println("Counter decreased to " + counter);
}
}

static Props props() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
package com.counter;

public class DecrementMessage {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package com.counter;

public class IncrementMessage {

}

This file was deleted.

83 changes: 83 additions & 0 deletions NSDS/akka/akka-nsds/src/main/java/com/email/ClientActor.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package com.email;

import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.Cancellable;
import akka.pattern.Patterns;
import com.email.messages.GetMsg;
import com.email.messages.ReplyMsg;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;

import java.util.concurrent.TimeUnit;

public class ClientActor extends AbstractActor {

private final ActorRef server;
private final String[] names;
private final String[] additionalNames = {
"Liam", "Noah", "Oliver", "Elijah", "William",
"James", "Benjamin", "Lucas", "Henry", "Alexander"
};
private Cancellable cancellable;

public ClientActor(ActorRef server, String[] names) {
this.server = server;
this.names = names;
}

@Override
public void preStart() {
// Schedule periodic GetMsg messages
cancellable = getContext().system().scheduler().schedule(
Duration.Zero(),
Duration.create(1, TimeUnit.SECONDS),
self(),
new GetMsg(""),
getContext().dispatcher(),
self()
);
}

@Override
public void postStop() {
// Cancel the periodic task when the actor stops
cancellable.cancel();
}

@Override
public AbstractActor.Receive createReceive() {
return receiveBuilder()
.match(GetMsg.class, this::onGetMsg)
.build();
}

private void onGetMsg(GetMsg msg) {
// Generate a new random name each time
String randomName = getRandomName();
Future<Object> future = Patterns.ask(server, new GetMsg(randomName), 5000);
future.onComplete(result -> {
if (result.isSuccess()) {
ReplyMsg reply = (ReplyMsg) result.get();
System.out.println("Received email for " + randomName + ": " + reply.getEmail());
} else {
System.out.println("Failed to get email for: " + randomName);
}
return null;
}, getContext().getDispatcher());
}

private String getRandomName() {
if (Math.random() < 0.5) {
return names[(int) (Math.random() * names.length)];
} else {
return additionalNames[(int) (Math.random() * additionalNames.length)];
}
}

static Props props(ActorRef server, String[] names) {
return Props.create(ClientActor.class, () -> new ClientActor(server, names));
}
}

63 changes: 63 additions & 0 deletions NSDS/akka/akka-nsds/src/main/java/com/email/Launch.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package com.email;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import com.email.messages.PutMsg;

import java.io.IOException;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Launch {

private static final int numThreads = 10;

private static final String[] NAMES = {
"Alice", "Bob", "Charlie", "David", "Eve",
"Frank", "Grace", "Hank", "Ivy", "Jack"
};

public static void main(String[] args) {

final ActorSystem sys = ActorSystem.create("System");
final ActorRef supervisor = sys.actorOf(SupervisorActor.props(), "supervisor");
sys.actorOf(ClientActor.props(supervisor, NAMES), "client");

String[] EMAILS = new String[NAMES.length];

int j = 0;
for (String name : NAMES) {
String email = name.toLowerCase() + "@example.com";
EMAILS[j] = email;
j++;
}

// send messages from multiple threads in parallel
final ExecutorService exec = Executors.newFixedThreadPool(numThreads);

Random r = new Random(100);

for (int k = 0; k < 10; k++) {
int finalK = k;
exec.submit(() -> supervisor.tell(new PutMsg(NAMES[finalK], EMAILS[finalK]), ActorRef.noSender()));
}

// forcing failure
try {
Thread.sleep(r.nextInt(20000));
exec.submit(() -> supervisor.tell(new PutMsg("Fail!", ""), ActorRef.noSender()));
} catch (InterruptedException e) {
e.printStackTrace();
}

// wait for all messages to be sent and received
try {
System.in.read();
} catch (IOException e) {
e.printStackTrace();
}
exec.shutdown();
sys.terminate();
}
}
67 changes: 67 additions & 0 deletions NSDS/akka/akka-nsds/src/main/java/com/email/ServerActor.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package com.email;

import akka.actor.AbstractActor;
import akka.actor.Props;
import com.email.messages.GetMsg;
import com.email.messages.PutMsg;
import com.email.messages.ReplyMsg;

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Random;

public class ServerActor extends AbstractActor {

Map<String, String> contactList;
Random r = new Random();

public ServerActor() {
this.contactList = new HashMap<>();
}

@Override
public AbstractActor.Receive createReceive() {
return receiveBuilder()
.match(PutMsg.class, this::onMessage)
.match(GetMsg.class, this::onMessage)
.build();
}

@Override
public void preRestart(Throwable reason, Optional<Object> message) throws Exception {
super.preRestart(reason, message);
System.out.println("ServerActor is about to restart with map: " + (contactList.isEmpty() ? "empty map" : contactList));
}

@Override
public void postRestart(Throwable reason) throws Exception {
super.postRestart(reason);
System.out.println("ServerActor restarted with map: " + (contactList.isEmpty() ? "empty map" : contactList));

}

void onMessage(PutMsg msg) throws Exception {
//System.out.println("ServerActor received PutMsg: " + msg.getName() + " -> " + msg.getEmail());
if ("Fail!".equals(msg.getName())) {
System.out.println("ServerActor forced failure for name: " + msg.getName());
int random = r.nextInt(100);
if(random>50){
throw new RuntimeException("forced failure to resume");
} else {
throw new Exception("forced failure to restart");
}

}
contactList.put(msg.getName(), msg.getEmail());
}

void onMessage(GetMsg msg) {
//System.out.println("ServerActor received GetMsg: " + msg.getName());
sender().tell(new ReplyMsg(contactList.get(msg.getName())), self());
}

static Props props() {
return Props.create(ServerActor.class);
}
}
50 changes: 50 additions & 0 deletions NSDS/akka/akka-nsds/src/main/java/com/email/SupervisorActor.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package com.email;

import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.SupervisorStrategy;
import akka.actor.OneForOneStrategy;
import akka.japi.pf.DeciderBuilder;

import static akka.actor.SupervisorStrategy.restart;
import static akka.actor.SupervisorStrategy.resume;

public class SupervisorActor extends AbstractActor {

private final ActorRef serverActor = getContext().actorOf(ServerActor.props(), "serverActor");

private static SupervisorStrategy strategy =
new OneForOneStrategy(
10,
java.time.Duration.ofMinutes(1),
DeciderBuilder
.match(RuntimeException.class, e -> {
System.out.println("SupervisorActor: Resuming on RuntimeException");
return resume();
})
.matchAny(o -> {
System.out.println("SupervisorActor: Restarting on unknown exception");
return restart();
})
.build());

@Override
public SupervisorStrategy supervisorStrategy() {
return strategy;
}

@Override
public Receive createReceive() {
return receiveBuilder()
.matchAny(msg -> {
//System.out.println("SupervisorActor forwarding message: " + msg);
serverActor.forward(msg, getContext());
})
.build();
}

public static Props props() {
return Props.create(SupervisorActor.class);
}
}
Loading

0 comments on commit 697495b

Please sign in to comment.