Skip to content

Commit

Permalink
- Code cleanup.
Browse files Browse the repository at this point in the history
- TkConn enhancements.
  • Loading branch information
jjzazuet committed Nov 7, 2024
1 parent fe78618 commit c19478e
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 48 deletions.
45 changes: 20 additions & 25 deletions tk-sdr/src/main/java/io/vacco/tokoeka/TkSocket.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,12 @@
import io.vacco.tokoeka.spi.*;
import io.vacco.tokoeka.util.*;
import org.slf4j.*;
import java.io.*;
import java.net.Socket;

import static java.util.Objects.requireNonNull;
import static io.vacco.tokoeka.util.TkSockets.*;

public class TkSocket implements Closeable, TkConn {
public class TkSocket implements TkConn {

private static final Logger log = LoggerFactory.getLogger(TkSocket.class);

Expand Down Expand Up @@ -37,51 +36,43 @@ public TkSocket connect() {
try {
socket = createSocket(host, port, secure, timeout);
sendRaw(socket, wsHandShakeOf(host, port, endpoint).getBytes());
socketConn = new TkServerConn(socket, socketState, (msg) -> send(socket, msg));
socketConn = new TkSocketConn(socket, socketState, socketHdl);
socketHdl.onOpen(socketConn, wsClientHandShakeResponseOf(socket));
return this;
} catch (Exception e) {
socketHdl.onError(socketConn, e);
doClose(this);
tearDown(socket, socketConn, socketHdl);
throw new IllegalStateException("ws connection failed", e);
}
}

public void listen() {
while (!socket.isClosed()) {
try {
try {
while (!socket.isClosed()) {
var stop = handleMessage(socket, socketConn, socketHdl);
if (stop) {
break;
}
} catch (Exception e) {
if (log.isDebugEnabled()) {
log.debug("ws message processing error", e);
}
socketHdl.onError(socketConn, e);
break;
}
} catch (Exception e) {
if (log.isDebugEnabled()) {
log.debug("ws message processing error", e);
}
socketHdl.onError(socketConn, e);
} finally {
tearDown(socket, socketConn, socketHdl);
}
tearDown(socket, socketConn, socketHdl);
}

@Override public void accept(String s) {
send(this.socket, s);
}

@Override public void close() {
tearDown(socket, socketConn, socketHdl);
}

public TkSocket withHandler(TkSocketHdl hdl) {
this.socketHdl = requireNonNull(hdl);
return this;
}

@Override public String toString() {
return String.format("%s - %s", socket, endpoint);
}

@Override public void setAttachment(Object attachment) {
this.socketState.attachment = requireNonNull(attachment);
}
Expand All @@ -100,19 +91,23 @@ public TkSocket withHandler(TkSocketHdl hdl) {
}

@Override public void sendPing() {
TkSockets.sendPing(socket);
this.socketConn.sendPing();
}

@Override public void sendPong() {
TkSockets.sendPong(socket);
this.socketConn.sendPong();
}

@Override public void close(int code) {
this.socketState.markClosed(code, null, false);
this.socketConn.close(code);
}

@Override public void close(int code, String msg) {
this.socketState.markClosed(code, msg, false);
this.socketConn.close(code, msg);
}

@Override public String toString() {
return String.format("%s - %s", socket, endpoint);
}

}
8 changes: 2 additions & 6 deletions tk-sdr/src/main/java/io/vacco/tokoeka/TkSocketServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,6 @@ public TkSocketServer(int port, TkSocketHdl socketHdl, Supplier<TkSocketState> s
this.clientPool = requireNonNull(clientPool);
}

public TkSocketServer(int port, TkSocketHdl socketHdl, Supplier<TkSocketState> stateFn) {
this(port, socketHdl, stateFn, Executors.newCachedThreadPool());
}

public void start() {
try {
serverSocket = new ServerSocket(port);
Expand All @@ -53,7 +49,7 @@ private void handleClient(Socket clientSocket) {
var socketState = this.stateFn.get();
var handShake = wsServerHandShakeOf(clientSocket);
var handshakeResponse = performHandshake(clientSocket, handShake);
conn = new TkServerConn(clientSocket, socketState, msg -> send(clientSocket, msg));
conn = new TkSocketConn(clientSocket, socketState, this.socketHdl);
this.socketHdl.onOpen(conn, handshakeResponse);
while (!clientSocket.isClosed()) {
var stop = handleMessage(clientSocket, conn, this.socketHdl);
Expand All @@ -73,7 +69,7 @@ private void handleClient(Socket clientSocket) {
@Override public void close() {
clientPool.shutdown();
if (serverSocket != null) {
doClose(serverSocket);
tryClose(serverSocket);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,24 +1,25 @@
package io.vacco.tokoeka.util;

import io.vacco.tokoeka.spi.TkConn;
import io.vacco.tokoeka.spi.*;
import java.net.Socket;
import java.util.Objects;
import java.util.function.Consumer;

public class TkServerConn implements TkConn {
import static java.util.Objects.requireNonNull;
import static io.vacco.tokoeka.util.TkSockets.*;

private final Socket socket;
public class TkSocketConn implements TkConn {

private final Socket socket;
private final TkSocketState socketState;
private final Consumer<String> tx;
private final TkSocketHdl socketHdl;

public TkServerConn(Socket socket, TkSocketState socketState, Consumer<String> tx) {
this.socket = Objects.requireNonNull(socket);
this.socketState = Objects.requireNonNull(socketState);
this.tx = Objects.requireNonNull(tx);
public TkSocketConn(Socket socket, TkSocketState socketState, TkSocketHdl socketHdl) {
this.socket = requireNonNull(socket);
this.socketState = requireNonNull(socketState);
this.socketHdl = requireNonNull(socketHdl);
}

@Override public void setAttachment(Object attachment) {
socketState.attachment = Objects.requireNonNull(attachment);
socketState.attachment = requireNonNull(attachment);
}

@SuppressWarnings("unchecked")
Expand All @@ -27,7 +28,7 @@ public TkServerConn(Socket socket, TkSocketState socketState, Consumer<String> t
}

@Override public void accept(String s) {
tx.accept(s);
send(socket, s);
}

@Override public Socket getSocket() {
Expand All @@ -40,10 +41,12 @@ public TkServerConn(Socket socket, TkSocketState socketState, Consumer<String> t

@Override public void close(int code) {
socketState.markClosed(code, null, false);
tearDown(socket, this, socketHdl);
}

@Override public void close(int code, String msg) {
socketState.markClosed(code, msg, false);
tearDown(socket, this, socketHdl);
}

@Override public void sendPing() {
Expand Down
4 changes: 2 additions & 2 deletions tk-sdr/src/main/java/io/vacco/tokoeka/util/TkSockets.java
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ public static void tearDown(Socket socket, TkConn socketConn, TkSocketHdl socket
}
socketHdl.onClose(socketConn);
}
doClose(socket);
tryClose(socket);
if (log.isDebugEnabled()) {
log.debug("ws connection closed - {}, {}", socket, socketConn != null ? socketConn.getState() : "?");
}
Expand Down Expand Up @@ -334,7 +334,7 @@ public static boolean handleMessage(Socket sck, TkConn conn, TkSocketHdl socketH
return false;
}

public static void doClose(Closeable c) {
public static void tryClose(Closeable c) {
try {
if (c != null) {
c.close();
Expand Down
7 changes: 4 additions & 3 deletions tk-sdr/src/main/java/io/vacco/tokoeka/util/TkTimer.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,18 @@ public class TkTimer<T> {

private final long intervalMs;
private final Consumer<T> runCons;
private long lastRunMs;
private long lastRunMs = -1;

public TkTimer(long intervalMs, Consumer<T> runCons) {
this.intervalMs = intervalMs;
this.runCons = Objects.requireNonNull(runCons);
this.lastRunMs = System.currentTimeMillis();
}

public void update(T arg) {
long currentTime = System.currentTimeMillis();
if ((currentTime - lastRunMs) >= intervalMs) {
var isFirstRun = lastRunMs == -1;
var isNextRun = (currentTime - lastRunMs) >= intervalMs;
if (isFirstRun || isNextRun) {
runCons.accept(arg);
lastRunMs = currentTime;
}
Expand Down

0 comments on commit c19478e

Please sign in to comment.