Skip to content

Commit

Permalink
Merge branch 'PietroPizzoccheri:main' into barde
Browse files Browse the repository at this point in the history
  • Loading branch information
omgbarde authored Nov 27, 2024
2 parents 2dd75be + 697495b commit 45ad32b
Show file tree
Hide file tree
Showing 21 changed files with 958 additions and 0 deletions.
53 changes: 53 additions & 0 deletions NSDS/akka/akka-nsds/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

<modelVersion>4.0.0</modelVersion>

<groupId>hello-akka-java</groupId>
<artifactId>AkkaNSDS</artifactId>
<version>1.0</version>

<properties>
<akka.version>2.7.0</akka.version>
</properties>

<dependencies>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-actor-typed_2.13</artifactId>
<version>${akka.version}</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.3</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-actor-testkit-typed_2.13</artifactId>
<version>${akka.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.1</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.5.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
46 changes: 46 additions & 0 deletions NSDS/akka/akka-nsds/src/main/java/com/counter/Counter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package com.counter;

import java.io.IOException;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;

public class Counter {

private static final int numThreads = 10;
private static final int numMessages = 100;

public static void main(String[] args) {

final ActorSystem sys = ActorSystem.create("System");
final ActorRef counter = sys.actorOf(CounterActor.props(), "counter");

// Send messages from multiple threads in parallel
final ExecutorService exec = Executors.newFixedThreadPool(numThreads);

Random r = new Random(100);

for (int i = 0; i < numMessages; i++) {
int random = r.nextInt(100) + 1;
if (random < 30) {
exec.submit(() -> counter.tell(new IncrementMessage(), ActorRef.noSender()));
} else {
exec.submit(() -> counter.tell(new DecrementMessage(), ActorRef.noSender()));
}
}

// Wait for all messages to be sent and received
try {
System.in.read();
} catch (IOException e) {
e.printStackTrace();
}
exec.shutdown();
sys.terminate();

}

}
45 changes: 45 additions & 0 deletions NSDS/akka/akka-nsds/src/main/java/com/counter/CounterActor.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package com.counter;

import akka.actor.AbstractActor;
import akka.actor.AbstractActorWithStash;
import akka.actor.Props;

public class CounterActor extends AbstractActorWithStash {

private int counter;

public CounterActor() {
this.counter = 0;
}

@Override
public Receive createReceive() {
return receiveBuilder()
.match(IncrementMessage.class, this::onMessage)
.match(DecrementMessage.class, this::onMessage)
.build();
}

void onMessage(IncrementMessage msg) {
++counter;
System.out.println("Counter increased to " + counter);
if(counter > 0){
unstash();
}
}

void onMessage(DecrementMessage msg){
if(counter <= 0){
stash();
System.out.println("stashed with counter: " + counter);
} else{
counter--;
System.out.println("Counter decreased to " + counter);
}
}

static Props props() {
return Props.create(CounterActor.class);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
package com.counter;

public class DecrementMessage {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package com.counter;

public class IncrementMessage {

}
83 changes: 83 additions & 0 deletions NSDS/akka/akka-nsds/src/main/java/com/email/ClientActor.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package com.email;

import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.Cancellable;
import akka.pattern.Patterns;
import com.email.messages.GetMsg;
import com.email.messages.ReplyMsg;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;

import java.util.concurrent.TimeUnit;

public class ClientActor extends AbstractActor {

private final ActorRef server;
private final String[] names;
private final String[] additionalNames = {
"Liam", "Noah", "Oliver", "Elijah", "William",
"James", "Benjamin", "Lucas", "Henry", "Alexander"
};
private Cancellable cancellable;

public ClientActor(ActorRef server, String[] names) {
this.server = server;
this.names = names;
}

@Override
public void preStart() {
// Schedule periodic GetMsg messages
cancellable = getContext().system().scheduler().schedule(
Duration.Zero(),
Duration.create(1, TimeUnit.SECONDS),
self(),
new GetMsg(""),
getContext().dispatcher(),
self()
);
}

@Override
public void postStop() {
// Cancel the periodic task when the actor stops
cancellable.cancel();
}

@Override
public AbstractActor.Receive createReceive() {
return receiveBuilder()
.match(GetMsg.class, this::onGetMsg)
.build();
}

private void onGetMsg(GetMsg msg) {
// Generate a new random name each time
String randomName = getRandomName();
Future<Object> future = Patterns.ask(server, new GetMsg(randomName), 5000);
future.onComplete(result -> {
if (result.isSuccess()) {
ReplyMsg reply = (ReplyMsg) result.get();
System.out.println("Received email for " + randomName + ": " + reply.getEmail());
} else {
System.out.println("Failed to get email for: " + randomName);
}
return null;
}, getContext().getDispatcher());
}

private String getRandomName() {
if (Math.random() < 0.5) {
return names[(int) (Math.random() * names.length)];
} else {
return additionalNames[(int) (Math.random() * additionalNames.length)];
}
}

static Props props(ActorRef server, String[] names) {
return Props.create(ClientActor.class, () -> new ClientActor(server, names));
}
}

63 changes: 63 additions & 0 deletions NSDS/akka/akka-nsds/src/main/java/com/email/Launch.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package com.email;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import com.email.messages.PutMsg;

import java.io.IOException;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Launch {

private static final int numThreads = 10;

private static final String[] NAMES = {
"Alice", "Bob", "Charlie", "David", "Eve",
"Frank", "Grace", "Hank", "Ivy", "Jack"
};

public static void main(String[] args) {

final ActorSystem sys = ActorSystem.create("System");
final ActorRef supervisor = sys.actorOf(SupervisorActor.props(), "supervisor");
sys.actorOf(ClientActor.props(supervisor, NAMES), "client");

String[] EMAILS = new String[NAMES.length];

int j = 0;
for (String name : NAMES) {
String email = name.toLowerCase() + "@example.com";
EMAILS[j] = email;
j++;
}

// send messages from multiple threads in parallel
final ExecutorService exec = Executors.newFixedThreadPool(numThreads);

Random r = new Random(100);

for (int k = 0; k < 10; k++) {
int finalK = k;
exec.submit(() -> supervisor.tell(new PutMsg(NAMES[finalK], EMAILS[finalK]), ActorRef.noSender()));
}

// forcing failure
try {
Thread.sleep(r.nextInt(20000));
exec.submit(() -> supervisor.tell(new PutMsg("Fail!", ""), ActorRef.noSender()));
} catch (InterruptedException e) {
e.printStackTrace();
}

// wait for all messages to be sent and received
try {
System.in.read();
} catch (IOException e) {
e.printStackTrace();
}
exec.shutdown();
sys.terminate();
}
}
67 changes: 67 additions & 0 deletions NSDS/akka/akka-nsds/src/main/java/com/email/ServerActor.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package com.email;

import akka.actor.AbstractActor;
import akka.actor.Props;
import com.email.messages.GetMsg;
import com.email.messages.PutMsg;
import com.email.messages.ReplyMsg;

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Random;

public class ServerActor extends AbstractActor {

Map<String, String> contactList;
Random r = new Random();

public ServerActor() {
this.contactList = new HashMap<>();
}

@Override
public AbstractActor.Receive createReceive() {
return receiveBuilder()
.match(PutMsg.class, this::onMessage)
.match(GetMsg.class, this::onMessage)
.build();
}

@Override
public void preRestart(Throwable reason, Optional<Object> message) throws Exception {
super.preRestart(reason, message);
System.out.println("ServerActor is about to restart with map: " + (contactList.isEmpty() ? "empty map" : contactList));
}

@Override
public void postRestart(Throwable reason) throws Exception {
super.postRestart(reason);
System.out.println("ServerActor restarted with map: " + (contactList.isEmpty() ? "empty map" : contactList));

}

void onMessage(PutMsg msg) throws Exception {
//System.out.println("ServerActor received PutMsg: " + msg.getName() + " -> " + msg.getEmail());
if ("Fail!".equals(msg.getName())) {
System.out.println("ServerActor forced failure for name: " + msg.getName());
int random = r.nextInt(100);
if(random>50){
throw new RuntimeException("forced failure to resume");
} else {
throw new Exception("forced failure to restart");
}

}
contactList.put(msg.getName(), msg.getEmail());
}

void onMessage(GetMsg msg) {
//System.out.println("ServerActor received GetMsg: " + msg.getName());
sender().tell(new ReplyMsg(contactList.get(msg.getName())), self());
}

static Props props() {
return Props.create(ServerActor.class);
}
}
Loading

0 comments on commit 45ad32b

Please sign in to comment.