Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

update dev stream #12

Merged
merged 13 commits into from
Jun 6, 2024
5 changes: 4 additions & 1 deletion build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ allprojects {
apply(plugin = "maven-publish")

group = "dev.httpmarco"
version = "1.1.19-SNAPSHOT"
version = "1.2.5-SNAPSHOT"

repositories {
mavenCentral()
Expand All @@ -22,6 +22,9 @@ allprojects {

implementation(rootProject.libs.annotations)
annotationProcessor(rootProject.libs.annotations)

testAnnotationProcessor(rootProject.libs.lombok)
testImplementation(rootProject.libs.lombok)
}

tasks.withType<JavaCompile> {
Expand Down
2 changes: 1 addition & 1 deletion gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ nexusPublish = "2.0.0"
annotations = "24.1.0"
lombok = "1.18.32"

gson = "2.10.1"
gson = "2.11.0"

#networking
netty5 = "5.0.0.Alpha5"
Expand Down
2 changes: 1 addition & 1 deletion gradle/wrapper/gradle-wrapper.properties
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#Tue Jan 23 09:21:27 CET 2024
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-8.6-bin.zip
distributionUrl=https\://services.gradle.org/distributions/gradle-8.8-bin.zip
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
14 changes: 0 additions & 14 deletions osgan-kubernetes/build.gradle.kts

This file was deleted.

1 change: 1 addition & 0 deletions osgan-netty/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ dependencies {
api(project(":osgan-files"))
api(project(":osgan-reflections"))

testImplementation(libs.gson)
testImplementation(platform("org.junit:junit-bom:5.10.2"))
testImplementation("org.junit.jupiter:junit-jupiter")
testRuntimeOnly("org.junit.platform:junit-platform-launcher")
Expand Down

This file was deleted.

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,46 +1,34 @@
package dev.httpmarco.osgan.networking;

import dev.httpmarco.osgan.files.json.JsonObjectSerializer;
import dev.httpmarco.osgan.networking.listening.ChannelPacketListener;
import dev.httpmarco.osgan.networking.request.PacketResponder;
import dev.httpmarco.osgan.networking.request.RequestHandler;
import dev.httpmarco.osgan.utils.executers.FutureResult;
import io.netty5.channel.Channel;
import io.netty5.channel.EventLoopGroup;
import io.netty5.util.concurrent.FutureListener;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.Setter;
import lombok.experimental.Accessors;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

@Getter
@Accessors(fluent = true)
public abstract class CommunicationComponent<M extends Metadata> {
public abstract class CommunicationComponent extends CommunicationListener {

@Setter
@Getter(AccessLevel.PROTECTED)
@Setter(AccessLevel.PROTECTED)
private FutureResult<Void> connectionFuture = new FutureResult<>();

private final M metadata;
@Getter(AccessLevel.PROTECTED)
private final EventLoopGroup bossGroup;
private final Map<Class<? extends Packet>, List<ChannelPacketListener<? extends Packet>>> packetListeners = new HashMap<>();
private final RequestHandler requestHandler;
@Getter(AccessLevel.PROTECTED)
private final String hostname;
@Getter(AccessLevel.PROTECTED)
private final int port;

public CommunicationComponent(M metadata, int workerThreads) {
this.bossGroup = NetworkUtils.createEventLoopGroup(workerThreads);
this.metadata = metadata;
this.requestHandler = new RequestHandler(this);
public CommunicationComponent(int bossGroupThreads, String hostname, int port) {
this.bossGroup = CommunicationNetworkUtils.createEventLoopGroup(bossGroupThreads);
this.hostname = hostname;
this.port = port;
}

public abstract <P extends Packet> void sendPacket(P packet);

public abstract <P extends Packet> void sendPacket(Channel channel, P packet);

public abstract <P extends Packet> void redirectPacket(String id, P packet);
public abstract void initialize();

public FutureListener<? super Channel> handleConnectionRelease() {
return it -> {
Expand All @@ -65,25 +53,4 @@ public void close() {
bossGroup.shutdownGracefully();
}

public void callPacketReceived(ChannelTransmit transmit, Packet packet) {
if (this.packetListeners.containsKey(packet.getClass())) {
this.packetListeners.get(packet.getClass()).forEach(it -> it.listenWithMapping(transmit, packet));
}
}

public <P extends Packet> void listen(Class<P> packetClass, ChannelPacketListener<P> listener) {
this.packetListeners.computeIfAbsent(packetClass, it -> new ArrayList<>()).add(listener);
}

public <T extends Packet> void request(String id, Class<T> responsePacket, Consumer<T> consumer) {
this.requestHandler.request(id, responsePacket, consumer);
}

public <T extends Packet> void request(String id, JsonObjectSerializer properties, Class<T> responsePacket, Consumer<T> consumer) {
this.requestHandler.request(id, properties, responsePacket, consumer);
}

public <T extends Packet> void registerResponder(String id, PacketResponder<T> responder) {
this.requestHandler.registerResponder(id, responder);
}
}
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package dev.httpmarco.osgan.networking;

import dev.httpmarco.osgan.networking.channel.ChannelTransmit;
import dev.httpmarco.osgan.networking.packet.BadRequestPacket;
import dev.httpmarco.osgan.networking.packet.Packet;
import dev.httpmarco.osgan.networking.packet.RequestPacket;
import dev.httpmarco.osgan.networking.packet.RequestResponsePacket;
import lombok.Getter;
import lombok.experimental.Accessors;
import org.jetbrains.annotations.NotNull;

import java.util.*;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;

@Getter
@Accessors(fluent = true)
public abstract class CommunicationListener {

private final Map<Class<? extends Packet>, List<BiConsumer<ChannelTransmit, Packet>>> listeners = new HashMap<>();
private final Map<String, Function<CommunicationProperty, Packet>> responders = new HashMap<>();
private final Map<UUID, Consumer<Packet>> requests = new HashMap<>();

@SuppressWarnings("unchecked")
public <P extends Packet> void listen(Class<P> listeningClass, BiConsumer<ChannelTransmit, P> packetCallback) {
var packetListeners = listeners.getOrDefault(listeningClass, new ArrayList<>());
packetListeners.add((BiConsumer<ChannelTransmit, Packet>) packetCallback);
listeners.put(listeningClass, packetListeners);
}

public <P extends Packet> void listen(Class<P> listeningClass, Consumer<P> packetCallback) {
this.listen(listeningClass, (channelTransmit, packet) -> packetCallback.accept(packet));
}

@SuppressWarnings("unchecked")
public <P extends Packet> void request(String id, CommunicationProperty property, Class<P> packet, Consumer<P> packetCallback) {
var uuid = UUID.randomUUID();
this.requests.put(uuid, (Consumer<Packet>) packetCallback);
sendPacket(new RequestPacket(id, uuid, property));
}

public <P extends Packet> void request(String id, Class<P> packet, Consumer<P> packetCallback) {
request(id, new CommunicationProperty(), packet, packetCallback);
}


public void callResponder(ChannelTransmit channelTransmit, @NotNull RequestPacket requestPacket) {
if (!responders.containsKey(requestPacket.id())) {
channelTransmit.sendPacket(new BadRequestPacket(requestPacket.uuid()));
System.out.println("Found no responder for: " + requestPacket.id());
return;
}

var response = responders.get(requestPacket.id()).apply(requestPacket.property());
channelTransmit.sendPacket(new RequestResponsePacket(requestPacket.uuid(), response));
}

public void responder(String id, Function<CommunicationProperty, Packet> packetFunction) {
this.responders.put(id, packetFunction);
}

public <P extends Packet> boolean call(@NotNull P packet, ChannelTransmit channelTransmit) {
if (packet instanceof RequestPacket requestPacket) {
this.callResponder(channelTransmit, requestPacket);
return true;
}

if (packet instanceof RequestResponsePacket requestResponsePacket) {
if (!this.requests.containsKey(requestResponsePacket.uuid())) {
return true;
}
this.requests.get(requestResponsePacket.uuid()).accept(requestResponsePacket.response());
this.requests.remove(requestResponsePacket.uuid());
return true;
}

if (packet instanceof BadRequestPacket badRequestPacket) {
this.requests.remove(badRequestPacket.uuid());
System.out.println("Invalid request from: " + badRequestPacket.uuid());
return true;
}

if (!this.listeners.containsKey(packet.getClass())) {
return false;
}

for (var consumer : this.listeners.get(packet.getClass())) {
consumer.accept(channelTransmit, packet);
}
return true;
}

public abstract void sendPacket(Packet packet);
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import org.jetbrains.annotations.NotNull;

@UtilityClass
public class NetworkUtils {
public class CommunicationNetworkUtils {

@Contract("_ -> new")
public static @NotNull MultithreadEventLoopGroup createEventLoopGroup(int threads) {
Expand All @@ -31,4 +31,5 @@ public class NetworkUtils {
public static ServerChannelFactory<? extends ServerChannel> generateChannelFactory() {
return Epoll.isAvailable() ? EpollServerSocketChannel::new : NioServerSocketChannel::new;
}

}
Loading
Loading