Skip to content

Commit

Permalink
- Code cleanup.
Browse files Browse the repository at this point in the history
- TkConn enhancements.
  • Loading branch information
jjzazuet committed Nov 7, 2024
1 parent c93b035 commit fe78618
Show file tree
Hide file tree
Showing 8 changed files with 107 additions and 108 deletions.
51 changes: 39 additions & 12 deletions tk-sdr/src/main/java/io/vacco/tokoeka/TkSocket.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,11 @@
import org.slf4j.*;
import java.io.*;
import java.net.Socket;
import java.util.function.*;

import static java.util.Objects.requireNonNull;
import static io.vacco.tokoeka.util.TkSockets.*;

public class TkSocket implements Closeable, Consumer<String> {
public class TkSocket implements Closeable, TkConn {

private static final Logger log = LoggerFactory.getLogger(TkSocket.class);

Expand All @@ -22,8 +21,6 @@ public class TkSocket implements Closeable, Consumer<String> {
private final TkSocketState socketState;

private Socket socket;
private OutputStream outputStream;
private InputStream inputStream;
private TkSocketHdl socketHdl;
private TkConn socketConn;

Expand All @@ -39,12 +36,9 @@ public TkSocket(String host, int port, String endpoint, boolean secure, int time
public TkSocket connect() {
try {
socket = createSocket(host, port, secure, timeout);
outputStream = socket.getOutputStream();
inputStream = socket.getInputStream();
outputStream.write(wsHandShakeOf(host, port, endpoint).getBytes());
outputStream.flush();
socketConn = new TkConnAdapter(socket, socketState, (msg) -> send(msg, outputStream));
socketHdl.onOpen(socketConn, wsClientHandShakeResponseOf(inputStream));
sendRaw(socket, wsHandShakeOf(host, port, endpoint).getBytes());
socketConn = new TkServerConn(socket, socketState, (msg) -> send(socket, msg));
socketHdl.onOpen(socketConn, wsClientHandShakeResponseOf(socket));
return this;
} catch (Exception e) {
socketHdl.onError(socketConn, e);
Expand All @@ -56,7 +50,7 @@ public TkSocket connect() {
public void listen() {
while (!socket.isClosed()) {
try {
var stop = handleMessage(socketHdl, socketConn, inputStream, outputStream);
var stop = handleMessage(socket, socketConn, socketHdl);
if (stop) {
break;
}
Expand All @@ -72,7 +66,7 @@ public void listen() {
}

@Override public void accept(String s) {
send(s, this.outputStream);
send(this.socket, s);
}

@Override public void close() {
Expand All @@ -88,4 +82,37 @@ public TkSocket withHandler(TkSocketHdl hdl) {
return String.format("%s - %s", socket, endpoint);
}

@Override public void setAttachment(Object attachment) {
this.socketState.attachment = requireNonNull(attachment);
}

@SuppressWarnings("unchecked")
@Override public <T> T getAttachment() {
return (T) this.socketState.attachment;
}

@Override public Socket getSocket() {
return this.socket;
}

@Override public TkSocketState getState() {
return this.socketState;
}

@Override public void sendPing() {
TkSockets.sendPing(socket);
}

@Override public void sendPong() {
TkSockets.sendPong(socket);
}

@Override public void close(int code) {
this.socketState.markClosed(code, null, false);
}

@Override public void close(int code, String msg) {
this.socketState.markClosed(code, msg, false);
}

}
10 changes: 4 additions & 6 deletions tk-sdr/src/main/java/io/vacco/tokoeka/TkSocketServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,15 +50,13 @@ private void handleClient(Socket clientSocket) {
log.debug("Client connection: {}", clientSocket);
TkConn conn = null;
try {
var inputStream = clientSocket.getInputStream();
var outputStream = clientSocket.getOutputStream();
var socketState = this.stateFn.get();
var handShake = wsServerHandShakeOf(inputStream);
var handshakeResponse = performHandshake(handShake, outputStream);
conn = new TkConnAdapter(clientSocket, socketState, msg -> send(msg, outputStream));
var handShake = wsServerHandShakeOf(clientSocket);
var handshakeResponse = performHandshake(clientSocket, handShake);
conn = new TkServerConn(clientSocket, socketState, msg -> send(clientSocket, msg));
this.socketHdl.onOpen(conn, handshakeResponse);
while (!clientSocket.isClosed()) {
var stop = handleMessage(this.socketHdl, conn, inputStream, outputStream);
var stop = handleMessage(clientSocket, conn, this.socketHdl);
if (stop) {
break;
}
Expand Down
14 changes: 7 additions & 7 deletions tk-sdr/src/main/java/io/vacco/tokoeka/handler/TkControlHdl.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@ public TkControlHdl(TkConfig config) {
this.config = requireNonNull(config);
}

public void sdrEvent(TkConn conn, String key, String value, Exception e, boolean ping, boolean pong) {
public void sdrEvent(TkConn conn, String key, String value, Exception e) {
if (this.sdrPin != null) {
this.sdrPin.onEvent(conn, key, value, e, ping, pong);
this.sdrPin.onEvent(conn, key, value, e);
}
}

Expand All @@ -68,7 +68,7 @@ private void processKeyValue(TkConn conn, String key, String value) {
case badp:
case too_busy:
case redirect:
case down: this.sdrEvent(conn, key, value, null, false, false); break;
case down: this.sdrEvent(conn, key, value, null); break;
default:
if (log.isDebugEnabled()) {
log.debug("Unknown message key/value: {} -> {}", key, shorten(value));
Expand Down Expand Up @@ -135,19 +135,19 @@ private void processWaterfall(ByteBuffer data) {
@Override public void onMessage(TkConn conn, String message) {}

@Override public void onClose(TkConn conn) {
this.sdrEvent(conn, null, null, null, false, false);
this.sdrEvent(conn, null, null, null);
}

@Override public void onError(TkConn conn, Exception e) {
this.sdrEvent(conn, null, null, e, false, false);
this.sdrEvent(conn, null, null, e);
}

@Override public void onPing(TkConn conn) {
this.sdrEvent(conn, null, null, null, true, false);
this.sdrEvent(conn, null, null, null);
}

@Override public void onPong(TkConn conn) {
this.sdrEvent(conn, null, null, null, false, true);
this.sdrEvent(conn, null, null, null);
}

public TkControlHdl withAudioHandler(TkAudioHdl hdl) {
Expand Down
5 changes: 1 addition & 4 deletions tk-sdr/src/main/java/io/vacco/tokoeka/spi/TkSdrPin.java
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
package io.vacco.tokoeka.spi;

public interface TkSdrPin {
void onEvent(
TkConn conn, String key, String value,
Exception e, boolean ping, boolean pong
);
void onEvent(TkConn conn, String key, String value, Exception e);
}
Original file line number Diff line number Diff line change
@@ -1,18 +1,17 @@
package io.vacco.tokoeka.util;

import io.vacco.tokoeka.spi.TkConn;
import java.io.IOException;
import java.net.Socket;
import java.util.Objects;
import java.util.function.Consumer;

public class TkConnAdapter implements TkConn {
public class TkServerConn implements TkConn {

private final Socket socket;
private final TkSocketState socketState;
private final Consumer<String> tx;

public TkConnAdapter(Socket socket, TkSocketState socketState, Consumer<String> tx) {
public TkServerConn(Socket socket, TkSocketState socketState, Consumer<String> tx) {
this.socket = Objects.requireNonNull(socket);
this.socketState = Objects.requireNonNull(socketState);
this.tx = Objects.requireNonNull(tx);
Expand Down Expand Up @@ -48,19 +47,11 @@ public TkConnAdapter(Socket socket, TkSocketState socketState, Consumer<String>
}

@Override public void sendPing() {
try {
TkSockets.sendPing(socket.getOutputStream());
} catch (IOException e) {
throw new IllegalStateException(e);
}
TkSockets.sendPing(socket);
}

@Override public void sendPong() {
try {
TkSockets.sendPong(socket.getOutputStream());
} catch (IOException e) {
throw new IllegalStateException(e);
}
TkSockets.sendPong(socket);
}

@Override public String toString() {
Expand Down
6 changes: 6 additions & 0 deletions tk-sdr/src/main/java/io/vacco/tokoeka/util/TkSocketState.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,12 @@ public class TkSocketState {

public final ByteArrayOutputStream accumulatedData = new ByteArrayOutputStream();

/*
* TODO: these attributes are here so ping/pong based keep-alive
* mechanisms can be implemented. However, I'm not going to
* provide a default implementation. Maybe in a future release
* if there's interest/demand.
*/
public long lastPingMs = 0;
public long lastPongMs = 0;
public long keepAliveMs;
Expand Down
Loading

0 comments on commit fe78618

Please sign in to comment.