Skip to content

Commit

Permalink
Squashed commit of the following:
Browse files Browse the repository at this point in the history
    - Preliminary WS server implementation.
    - Minor version bump.
  • Loading branch information
jjzazuet committed Sep 28, 2024
1 parent 7f26b8d commit 0f765dd
Show file tree
Hide file tree
Showing 12 changed files with 512 additions and 164 deletions.
2 changes: 1 addition & 1 deletion build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ subprojects {
apply(plugin = "io.vacco.oss.gitflow")

group = "io.vacco.tokoeka"
version = "0.2.6"
version = "0.4.0"

configure<io.vacco.oss.gitflow.GsPluginProfileExtension> {
sharedLibrary(true, false)
Expand Down
150 changes: 24 additions & 126 deletions tk-sdr/src/main/java/io/vacco/tokoeka/TkSocket.java
Original file line number Diff line number Diff line change
@@ -1,38 +1,39 @@
package io.vacco.tokoeka;

import io.vacco.tokoeka.spi.TkSocketHdl;
import io.vacco.tokoeka.spi.*;
import io.vacco.tokoeka.util.*;
import org.slf4j.*;
import java.io.*;
import java.net.Socket;
import java.util.function.*;

import static java.nio.ByteBuffer.wrap;
import static java.util.Objects.requireNonNull;
import static io.vacco.tokoeka.util.TkSockets.*;

public class TkSocket implements AutoCloseable, Consumer<String> {
public class TkSocket implements Closeable, Consumer<String> {

private static final Logger log = LoggerFactory.getLogger(TkSocket.class);

private final String host;
private final int port;
private final boolean secure;
private final int timeout;
private final String endpoint;
private final String host;
private final int port;
private final boolean secure;
private final int timeout;
private final String endpoint;
private final TkSocketState socketState;

private Socket socket;
private OutputStream outputStream;
private InputStream inputStream;
private TkSocketHdl socketHdl;
private TkConn socketConn;

private final ByteArrayOutputStream accumulatedData = new ByteArrayOutputStream();

public TkSocket(String host, int port, String endpoint, boolean secure, int timeout) {
public TkSocket(String host, int port, String endpoint, boolean secure, int timeout, TkSocketState socketState) {
this.host = requireNonNull(host);
this.port = port;
this.secure = secure;
this.timeout = timeout;
this.endpoint = requireNonNull(endpoint);
this.socketState = requireNonNull(socketState);
}

public TkSocket connect() {
Expand All @@ -42,142 +43,39 @@ public TkSocket connect() {
inputStream = socket.getInputStream();
outputStream.write(wsHandShakeOf(host, port, endpoint).getBytes());
outputStream.flush();
var reader = new BufferedReader(new InputStreamReader(inputStream));
var bld = new StringBuilder();
String line;
while ((line = reader.readLine()) != null) {
if (line.isEmpty()) {
break;
}
bld.append(line).append('\n');
}
var hs = bld.toString();
if (!hs.contains("HTTP/1.1 101")) {
throw new IllegalStateException("ws connection handshake failed: " + hs);
}
this.socketHdl.onOpen(hs);
socketConn = new TkConnAdapter(
socket, socketState,
(msg) -> send(msg, outputStream),
(code, msg) -> sendClose(outputStream, code, msg)
);
socketHdl.onOpen(socketConn, wsClientHandShakeResponseOf(inputStream));
return this;
} catch (Exception e) {
this.socketHdl.onError(e);
socketHdl.onError(socketConn, e);
throw new IllegalStateException("ws connection failed", e);
}
}

private void sendPong() throws IOException {
var pongFrame = new byte[2];
pongFrame[0] = (byte) 0x8A; // 0x8A = FIN + opcode 0xA (PONG)
outputStream.write(pongFrame);
outputStream.flush();
if (log.isTraceEnabled()) {
log.trace("< PONG");
}
}

public void send(String message) {
try {
var payload = message.getBytes();
var payloadLength = payload.length;
var frame = new ByteArrayOutputStream();
frame.write(0x81); // FIN + text frame opcode (0x1)
if (payloadLength <= 125) {
frame.write(payloadLength);
} else if (payloadLength <= 65535) {
frame.write(126);
frame.write((payloadLength >> 8) & 0xFF); // most significant byte
frame.write(payloadLength & 0xFF); // least significant byte
} else {
frame.write(127); // 8-byte payload length
// For large payloads (>65535 bytes), write the 8-byte length
// First four bytes should be zeros (per WebSocket protocol)
frame.write(0); frame.write(0); frame.write(0); frame.write(0);
// Write the last four bytes of the payload length
frame.write((payloadLength >> 24) & 0xFF); // most significant byte
frame.write((payloadLength >> 16) & 0xFF);
frame.write((payloadLength >> 8) & 0xFF);
frame.write(payloadLength & 0xFF); // least significant byte
}
frame.write(payload);
outputStream.write(frame.toByteArray());
outputStream.flush();
if (log.isTraceEnabled()) {
log.trace("< TXT: {} ({} bytes)", message, payload.length);
}
} catch (Exception e) {
var msg = message != null && message.length() > 64
? String.format("%s...", message.substring(0, 64))
: message;
throw new IllegalStateException(String.format("unable to send text: %s", msg), e);
}
}

public void listen(Supplier<Boolean> go) {
while (go.get() && !socket.isClosed()) {
try {
var frameHeader = new byte[2];
read(inputStream, frameHeader);
var isFinalFragment = (frameHeader[0] & 0x80) != 0; // Check if FIN bit is set
var opcode = frameHeader[0] & 0x0F;
var payloadLength = payloadLengthOf(frameHeader, inputStream);
var payload = new byte[payloadLength];
int bytesRead = 0;
while (bytesRead < payloadLength) {
int read = inputStream.read(payload, bytesRead, payloadLength - bytesRead);
if (read == -1) {
throw new IOException("unexpected end of stream");
}
bytesRead += read;
}
accumulatedData.write(payload);
if (isFinalFragment) {
var completeMessage = accumulatedData.toByteArray();
if (opcode == 0x1) {
var msg = new String(completeMessage);
if (log.isTraceEnabled()) {
log.trace("> TXT: {}", msg);
}
this.socketHdl.onMessage(msg);
} else if (opcode == 0x2) {
if (log.isTraceEnabled()) {
log.trace("> BIN ({})", completeMessage.length);
}
this.socketHdl.onMessage(wrap(completeMessage));
} else if (opcode == 0xA) {
if (log.isTraceEnabled()) {
log.trace("> PONG");
}
} else if (opcode == 0x9) {
if (log.isTraceEnabled()) {
log.trace("> PING");
}
sendPong();
} else if (opcode == 0x8) {
if (completeMessage.length >= 2) {
int closeCode = ((completeMessage[0] & 0xFF) << 8) | (completeMessage[1] & 0xFF);
if (log.isTraceEnabled()) {
log.trace("> CLOSE ({})", closeCode);
}
this.socketHdl.onClose(closeCode);
} else {
log.trace("> CLOSE (?)");
throw new IllegalStateException("Received close frame with no close code.");
}
break;
}
accumulatedData.reset();
var stop = handleMessage(socketHdl, socketState, socketConn, inputStream, outputStream);
if (stop) {
break;
}
} catch (Exception e) {
if (log.isDebugEnabled()) {
log.debug("ws message processing error", e);
}
this.socketHdl.onError(e);
socketHdl.onError(socketConn, e);
break;
}
}
this.close();
}

@Override public void accept(String s) {
this.send(s);
send(s, this.outputStream);
}

@Override public void close() {
Expand Down
92 changes: 92 additions & 0 deletions tk-sdr/src/main/java/io/vacco/tokoeka/TkSocketServer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package io.vacco.tokoeka;

import io.vacco.tokoeka.spi.TkSocketHdl;
import io.vacco.tokoeka.util.*;
import org.slf4j.*;
import java.io.*;
import java.net.*;
import java.util.concurrent.*;
import java.util.function.*;

import static java.util.Objects.requireNonNull;
import static io.vacco.tokoeka.util.TkSockets.*;

public class TkSocketServer implements Closeable {

private static final Logger log = LoggerFactory.getLogger(TkSocketServer.class);

private final int port;
private final TkSocketHdl socketHdl;
private final Supplier<TkSocketState> stateFn;
private final ExecutorService clientThreadPool = Executors.newCachedThreadPool();

private ServerSocket serverSocket;

public TkSocketServer(int port, TkSocketHdl socketHdl, Supplier<TkSocketState> stateFn) {
this.port = port;
this.stateFn = requireNonNull(stateFn);
this.socketHdl = requireNonNull(socketHdl);
}

public void start() {
try {
serverSocket = new ServerSocket(port);
log.info("WebSocket server started on port {}", port);
while (!serverSocket.isClosed()) {
var clientSocket = serverSocket.accept();
clientThreadPool.submit(() -> handleClient(clientSocket));
}
} catch (IOException e) {
throw new IllegalStateException("Unable to start websocket server", e);
}
}

private void handleClient(Socket clientSocket) {
log.debug("Incoming connection: {}", clientSocket);
try {
var inputStream = clientSocket.getInputStream();
var outputStream = clientSocket.getOutputStream();
var socketState = this.stateFn.get();
var handShake = wsServerHandShakeOf(inputStream);
var handshakeResponse = performHandshake(handShake, outputStream);
if (handshakeResponse != null) {
var conn = new TkConnAdapter(
clientSocket, socketState,
msg -> send(msg, outputStream),
(code, msg) -> {

}
);
this.socketHdl.onOpen(conn, handshakeResponse);
while (!clientSocket.isClosed()) {
var stop = handleMessage(this.socketHdl, socketState, conn, inputStream, outputStream);
if (stop) {
break;
}
}
} else {
throw new IllegalStateException("Incoming connection - missing handshake response " + clientSocket);
}
} catch (Exception e) {
log.error("Incoming connection handler error - {}", clientSocket.getRemoteSocketAddress(), e);
} finally {
try {
clientSocket.close();
} catch (IOException e) {
log.warn("Incoming connection close error - {}", e.getMessage());
}
}
}

@Override public void close() {
clientThreadPool.shutdown();
if (serverSocket != null) {
try {
serverSocket.close();
} catch (Exception e) {
log.warn("Websocket server close error - {}", e.getMessage());
}
}
}

}
3 changes: 3 additions & 0 deletions tk-sdr/src/main/java/io/vacco/tokoeka/audio/TkSquelch.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ public void processAudio(byte[] pcm) {

private void updateNoiseFloor(double signalAvg) {
int signalBin = (int) signalAvg;
if (signalBin == 0) {
signalBin = 1;
}
noiseHistogram.put(signalBin, noiseHistogram.getOrDefault(signalBin, 0L) + 1);
int mostFrequentBin = signalBin;
long maxCount = 0;
Expand Down
27 changes: 9 additions & 18 deletions tk-sdr/src/main/java/io/vacco/tokoeka/handler/TkControlHdl.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ public class TkControlHdl implements TkSocketHdl {

private final TkConfig config;

private Consumer<String> tx;
private TkAudioHdl audioHdl;
private TkWaterfallHdl waterfallHdl;
private TkJsonIn jsonIn;
Expand Down Expand Up @@ -98,15 +97,12 @@ private void processWaterfall(ByteBuffer data) {
}
}

@Override public void onOpen(String handShake) {
if (tx == null) {
throw new IllegalStateException("no tx sink, check handler configuration.");
}
tx.accept(setAuth(config.username, config.password));
tx.accept(setIdentity(config.identUser));
@Override public void onOpen(TkConn conn, String handShake) {
conn.accept(setAuth(config.username, config.password));
conn.accept(setIdentity(config.identUser));
}

@Override public void onMessage(ByteBuffer data) {
@Override public void onMessage(TkConn conn, ByteBuffer data) {
if (data == null || data.remaining() < 3) {
log.error("No data, or received data is too short to contain a valid tag");
return;
Expand All @@ -129,17 +125,17 @@ private void processWaterfall(ByteBuffer data) {
default: log.warn("Unsupported message tag {} ({})", tag, data.remaining());
}
} catch (Exception e) {
this.onError(e);
this.onError(conn, e);
}
}

@Override public void onMessage(String message) {}
@Override public void onMessage(TkConn conn, String message) {}

@Override public void onClose(int code) {
this.controlEvent(code, null, null, true, null);
@Override public void onClose(TkConn conn, int code, boolean remote) {
this.controlEvent(code, null, null, remote, null);
}

@Override public void onError(Exception e) {
@Override public void onError(TkConn conn, Exception e) {
this.controlEvent(-1, null, null, false, e);
}

Expand Down Expand Up @@ -168,9 +164,4 @@ public TkControlHdl withConfigPin(TkConfigPin pin) {
return this;
}

public TkControlHdl withSink(Consumer<String> tx) {
this.tx = requireNonNull(tx);
return this;
}

}
13 changes: 13 additions & 0 deletions tk-sdr/src/main/java/io/vacco/tokoeka/spi/TkConn.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package io.vacco.tokoeka.spi;

import java.io.Closeable;
import java.net.Socket;
import java.util.function.Consumer;

public interface TkConn extends Consumer<String>, Closeable {
void setAttachment(Object attachment);
<T> T getAttachment();
Socket getSocket();
void close(int code);
void close(int code, String msg);
}
Loading

0 comments on commit 0f765dd

Please sign in to comment.