Skip to content

Commit

Permalink
Squashed commit of the following:
Browse files Browse the repository at this point in the history
commit da67fbd
Author: Jesus Zazueta <jjzazuet@gmail.com>
Date:   Tue Nov 5 23:01:24 2024 -0500

    - Ping/pong handling fixes.

commit ae312e7
Author: Jesus Zazueta <jjzazuet@gmail.com>
Date:   Tue Nov 5 21:49:22 2024 -0500

    - Ping/pong handling fixes.

commit fdb4ca8
Author: Jesus Zazueta <jjzazuet@gmail.com>
Date:   Tue Nov 5 21:18:22 2024 -0500

    - Ping/pong handling fixes.

commit d49aff4
Author: Jesus Zazueta <jjzazuet@gmail.com>
Date:   Sun Oct 13 16:22:16 2024 -0400

    Squashed commit of the following:

    - Connection handling fixes.
    - Minor version bump.

commit bad35f6
Author: Jesus Zazueta <jjzazuet@gmail.com>
Date:   Thu Oct 10 10:43:39 2024 -0400

    - Dependency update.

commit 85bbd1b
Author: Jesus Zazueta <jjzazuet@gmail.com>
Date:   Thu Oct 10 10:37:39 2024 -0400

    - Client connection handler fixes.
    - Minor version bump.

commit 2acdd05
Author: Jesus Zazueta <jjzazuet@gmail.com>
Date:   Wed Oct 9 01:22:48 2024 -0400

    - Server connection handler fixes.
    - Logging tweaks.
    - Test case updates.
    - Minor version bump.

commit 44e6ec3
Author: Jesus Zazueta <jjzazuet@gmail.com>
Date:   Sat Sep 28 16:53:40 2024 -0400

    - Logging tweaks.
    - Patch version bump.

commit 0f765dd
Author: Jesus Zazueta <jjzazuet@gmail.com>
Date:   Sat Sep 28 16:17:20 2024 -0400

    Squashed commit of the following:

        - Preliminary WS server implementation.
        - Minor version bump.

commit 7f26b8d
Author: Jesus Zazueta <jjzazuet@gmail.com>
Date:   Tue Sep 24 01:15:22 2024 -0400

    - Logging tweaks.
    - Patch version bump.

commit cfe5ae6
Author: Jesus Zazueta <jjzazuet@gmail.com>
Date:   Mon Sep 16 04:03:05 2024 -0400

    - Connection error handling fixes.
    - Minor version bump.

commit f5b9498
Author: Jesus Zazueta <jjzazuet@gmail.com>
Date:   Sun Sep 15 17:21:56 2024 -0400

    - Host header fix.
    - Minor version bump.

commit 0c448b8
Author: Jesus Zazueta <jjzazuet@gmail.com>
Date:   Sat Sep 14 13:45:43 2024 -0400

    - Logging tweaks.
    - Minor version bump.

commit 889a4e9
Author: Jesus Zazueta <jjzazuet@gmail.com>
Date:   Sat Sep 14 03:36:03 2024 -0400

    - Missing close code handler notification.
    - Logging tweaks.
    - Minor version bump.

commit cde16da
Author: Jesus Zazueta <jjzazuet@gmail.com>
Date:   Thu Sep 12 19:34:27 2024 -0400

    - Socket connection error notifications.
  • Loading branch information
jjzazuet committed Nov 6, 2024
1 parent a306a0c commit 19f1e82
Show file tree
Hide file tree
Showing 28 changed files with 882 additions and 374 deletions.
4 changes: 2 additions & 2 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ subprojects {
apply(plugin = "io.vacco.oss.gitflow")

group = "io.vacco.tokoeka"
version = "0.2.0"
version = "0.6.5"

configure<io.vacco.oss.gitflow.GsPluginProfileExtension> {
sharedLibrary(true, false)
}
}
}
4 changes: 2 additions & 2 deletions tk-sdr/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ val api by configurations

dependencies {
api(project(":tk-schema"))
api("org.slf4j:slf4j-api:2.0.6")
testImplementation("io.vacco.shax:shax:2.0.6.0.1.0")
api("org.slf4j:slf4j-api:2.0.16")
testImplementation("io.vacco.shax:shax:2.0.16.0.1.2")
testImplementation("com.google.code.gson:gson:2.10.1")
}
166 changes: 33 additions & 133 deletions tk-sdr/src/main/java/io/vacco/tokoeka/TkSocket.java
Original file line number Diff line number Diff line change
@@ -1,189 +1,89 @@
package io.vacco.tokoeka;

import io.vacco.tokoeka.spi.TkSocketHdl;
import io.vacco.tokoeka.spi.*;
import io.vacco.tokoeka.util.*;
import org.slf4j.*;
import java.io.*;
import java.net.Socket;
import java.util.function.*;

import static java.nio.ByteBuffer.wrap;
import static java.util.Objects.requireNonNull;
import static io.vacco.tokoeka.util.TkSockets.*;

public class TkSocket implements AutoCloseable, Consumer<String> {
public class TkSocket implements Closeable, Consumer<String> {

private static final Logger log = LoggerFactory.getLogger(TkSocket.class);

private final String endpoint;
private final Socket socket;
private final String host;
private final int port;
private final boolean secure;
private final int timeout;
private final String endpoint;
private final TkSocketState socketState;

private Socket socket;
private OutputStream outputStream;
private InputStream inputStream;
private TkSocketHdl socketHdl;
private TkConn socketConn;

private final ByteArrayOutputStream accumulatedData = new ByteArrayOutputStream();

public TkSocket(String host, int port, String endpoint, boolean secure, int timeout) {
public TkSocket(String host, int port, String endpoint, boolean secure, int timeout, TkSocketState socketState) {
this.host = requireNonNull(host);
this.port = port;
this.secure = secure;
this.timeout = timeout;
this.endpoint = requireNonNull(endpoint);
this.socket = createSocket(host, port, secure, timeout);
this.socketState = requireNonNull(socketState);
}

public TkSocket connect() {
try {
socket = createSocket(host, port, secure, timeout);
outputStream = socket.getOutputStream();
inputStream = socket.getInputStream();
outputStream.write(wsHandShakeOf(endpoint).getBytes());
outputStream.write(wsHandShakeOf(host, port, endpoint).getBytes());
outputStream.flush();
var reader = new BufferedReader(new InputStreamReader(inputStream));
var bld = new StringBuilder();
String line;
while (!(line = reader.readLine()).isEmpty()) {
bld.append(line).append('\n');
}
var hs = bld.toString();
if (!hs.contains("HTTP/1.1 101")) {
throw new IllegalStateException("ws connection handshake failed: " + hs);
}
this.socketHdl.onOpen(hs);
socketConn = new TkConnAdapter(socket, socketState, (msg) -> send(msg, outputStream));
socketHdl.onOpen(socketConn, wsClientHandShakeResponseOf(inputStream));
return this;
} catch (Exception e) {
throw new IllegalStateException("ws connection open failed", e);
}
}

private void sendPong() throws IOException {
var pongFrame = new byte[2];
pongFrame[0] = (byte) 0x8A; // 0x8A = FIN + opcode 0xA (PONG)
outputStream.write(pongFrame);
outputStream.flush();
if (log.isTraceEnabled()) {
log.trace("< PONG");
}
}

public void send(String message) {
try {
var payload = message.getBytes();
var payloadLength = payload.length;
var frame = new ByteArrayOutputStream();
frame.write(0x81); // FIN + text frame opcode (0x1)
if (payloadLength <= 125) {
frame.write(payloadLength);
} else if (payloadLength <= 65535) {
frame.write(126);
frame.write((payloadLength >> 8) & 0xFF); // most significant byte
frame.write(payloadLength & 0xFF); // least significant byte
} else {
frame.write(127); // 8-byte payload length
// For large payloads (>65535 bytes), write the 8-byte length
// First four bytes should be zeros (per WebSocket protocol)
frame.write(0); frame.write(0); frame.write(0); frame.write(0);
// Write the last four bytes of the payload length
frame.write((payloadLength >> 24) & 0xFF); // most significant byte
frame.write((payloadLength >> 16) & 0xFF);
frame.write((payloadLength >> 8) & 0xFF);
frame.write(payloadLength & 0xFF); // least significant byte
}
frame.write(payload);
outputStream.write(frame.toByteArray());
outputStream.flush();
if (log.isDebugEnabled()) {
log.debug("< TXT: {} ({} bytes)", message, payload.length);
}
} catch (Exception e) {
throw new IllegalStateException(String.format("unable to send text: %s", message), e);
socketHdl.onError(socketConn, e);
doClose(this);
throw new IllegalStateException("ws connection failed", e);
}
}

public void listen(Supplier<Boolean> go) {
while (go.get() && !socket.isClosed()) {
public void listen() {
while (!socket.isClosed()) {
try {
var frameHeader = new byte[2];
read(inputStream, frameHeader);
var isFinalFragment = (frameHeader[0] & 0x80) != 0; // Check if FIN bit is set
var opcode = frameHeader[0] & 0x0F;
var payloadLength = payloadLengthOf(frameHeader, inputStream);
var payload = new byte[payloadLength];
int bytesRead = 0;
while (bytesRead < payloadLength) {
int read = inputStream.read(payload, bytesRead, payloadLength - bytesRead);
if (read == -1) {
throw new IOException("unexpected end of stream");
}
bytesRead += read;
}
accumulatedData.write(payload);
if (isFinalFragment) {
var completeMessage = accumulatedData.toByteArray();
if (opcode == 0x1) {
var msg = new String(completeMessage);
if (log.isTraceEnabled()) {
log.trace("> TXT: {}", msg);
}
this.socketHdl.onMessage(msg);
} else if (opcode == 0x2) {
if (log.isTraceEnabled()) {
log.trace("> BIN ({})", completeMessage.length);
}
this.socketHdl.onMessage(wrap(completeMessage));
} else if (opcode == 0xA) {
if (log.isTraceEnabled()) {
log.trace("> PONG");
}
} else if (opcode == 0x9) {
if (log.isTraceEnabled()) {
log.trace("> PING");
}
sendPong();
} else if (opcode == 0x8) {
if (completeMessage.length >= 2) {
int closeCode = ((completeMessage[0] & 0xFF) << 8) | (completeMessage[1] & 0xFF);
if (log.isTraceEnabled()) {
log.trace("> CLOSE ({})", closeCode);
}
this.socketHdl.onClose(closeCode);
} else {
log.warn("Received close frame with no close code.");
}
break;
}
accumulatedData.reset();
var stop = handleMessage(socketHdl, socketConn, inputStream, outputStream);
if (stop) {
break;
}
} catch (Exception e) {
if (log.isDebugEnabled()) {
log.debug("ws message processing error", e);
}
this.socketHdl.onError(e);
socketHdl.onError(socketConn, e);
break;
}
}
this.close();
tearDown(socket, socketConn, socketHdl);
}

@Override public void accept(String s) {
this.send(s);
send(s, this.outputStream);
}

@Override public void close() {
try {
socket.close();
} catch (IOException e) {
log.error("Unable to close ws socket: {} - {}", this, e.getMessage());
}
if (log.isDebugEnabled()) {
log.debug("ws connection closed");
}
tearDown(socket, socketConn, socketHdl);
}

public TkSocket withHandler(TkSocketHdl hdl) {
this.socketHdl = requireNonNull(hdl);
return this;
}

public Socket getSocket() {
return socket;
}

@Override public String toString() {
return String.format("%s - %s", socket, endpoint);
}
Expand Down
82 changes: 82 additions & 0 deletions tk-sdr/src/main/java/io/vacco/tokoeka/TkSocketServer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package io.vacco.tokoeka;

import io.vacco.tokoeka.spi.*;
import io.vacco.tokoeka.util.*;
import org.slf4j.*;
import java.io.*;
import java.net.*;
import java.util.concurrent.*;
import java.util.function.*;

import static java.util.Objects.requireNonNull;
import static io.vacco.tokoeka.util.TkSockets.*;

public class TkSocketServer implements Closeable {

private static final Logger log = LoggerFactory.getLogger(TkSocketServer.class);

private final int port;
private final TkSocketHdl socketHdl;
private final Supplier<TkSocketState> stateFn;
private final ExecutorService clientPool;

private ServerSocket serverSocket;

public TkSocketServer(int port, TkSocketHdl socketHdl, Supplier<TkSocketState> stateFn, ExecutorService clientPool) {
this.port = port;
this.stateFn = requireNonNull(stateFn);
this.socketHdl = requireNonNull(socketHdl);
this.clientPool = requireNonNull(clientPool);
}

public TkSocketServer(int port, TkSocketHdl socketHdl, Supplier<TkSocketState> stateFn) {
this(port, socketHdl, stateFn, Executors.newCachedThreadPool());
}

public void start() {
try {
serverSocket = new ServerSocket(port);
log.info("WebSocket server started on port {}", port);
while (!serverSocket.isClosed()) {
var clientSocket = serverSocket.accept();
clientPool.submit(() -> handleClient(clientSocket));
}
} catch (IOException e) {
throw new IllegalStateException("Unable to start websocket server", e);
}
}

private void handleClient(Socket clientSocket) {
log.debug("Client connection: {}", clientSocket);
TkConn conn = null;
try {
var inputStream = clientSocket.getInputStream();
var outputStream = clientSocket.getOutputStream();
var socketState = this.stateFn.get();
var handShake = wsServerHandShakeOf(inputStream);
var handshakeResponse = performHandshake(handShake, outputStream);
conn = new TkConnAdapter(clientSocket, socketState, msg -> send(msg, outputStream));
this.socketHdl.onOpen(conn, handshakeResponse);
while (!clientSocket.isClosed()) {
var stop = handleMessage(this.socketHdl, conn, inputStream, outputStream);
if (stop) {
break;
}
}
} catch (Exception e) {
if (log.isDebugEnabled()) {
log.debug("Client connection error - {}", clientSocket.getRemoteSocketAddress(), e);
}
} finally {
tearDown(clientSocket, conn, socketHdl);
}
}

@Override public void close() {
clientPool.shutdown();
if (serverSocket != null) {
doClose(serverSocket);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -70,4 +70,5 @@ public void close() {
line.close();
}
}

}
3 changes: 3 additions & 0 deletions tk-sdr/src/main/java/io/vacco/tokoeka/audio/TkSquelch.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ public void processAudio(byte[] pcm) {

private void updateNoiseFloor(double signalAvg) {
int signalBin = (int) signalAvg;
if (signalBin == 0) {
signalBin = 1;
}
noiseHistogram.put(signalBin, noiseHistogram.getOrDefault(signalBin, 0L) + 1);
int mostFrequentBin = signalBin;
long maxCount = 0;
Expand Down
Loading

0 comments on commit 19f1e82

Please sign in to comment.