From 19f1e8262dc6dcd9b35c858e6ad9629f5904746e Mon Sep 17 00:00:00 2001 From: Jesus Zazueta Date: Tue, 5 Nov 2024 23:28:49 -0500 Subject: [PATCH] Squashed commit of the following: commit da67fbd330ebdfd0c19677c788b7fa79b7a62e31 Author: Jesus Zazueta Date: Tue Nov 5 23:01:24 2024 -0500 - Ping/pong handling fixes. commit ae312e7ec0f8892d62dadd448aa01c267275280c Author: Jesus Zazueta Date: Tue Nov 5 21:49:22 2024 -0500 - Ping/pong handling fixes. commit fdb4ca8c541b14041a29877972a4ee8f10aba15c Author: Jesus Zazueta Date: Tue Nov 5 21:18:22 2024 -0500 - Ping/pong handling fixes. commit d49aff4cf3c353c7524e1750f793c4706aaf9fd6 Author: Jesus Zazueta Date: Sun Oct 13 16:22:16 2024 -0400 Squashed commit of the following: - Connection handling fixes. - Minor version bump. commit bad35f68cda8482c0ff3c4e9458120cfe1167f0a Author: Jesus Zazueta Date: Thu Oct 10 10:43:39 2024 -0400 - Dependency update. commit 85bbd1be9befd916ee4c357eaa6d2fe8e5c64313 Author: Jesus Zazueta Date: Thu Oct 10 10:37:39 2024 -0400 - Client connection handler fixes. - Minor version bump. commit 2acdd0511e980471a2d161ee00171473e3a995f7 Author: Jesus Zazueta Date: Wed Oct 9 01:22:48 2024 -0400 - Server connection handler fixes. - Logging tweaks. - Test case updates. - Minor version bump. commit 44e6ec3dcbdbfc0e5d7a7a496c36c5fe1a06c935 Author: Jesus Zazueta Date: Sat Sep 28 16:53:40 2024 -0400 - Logging tweaks. - Patch version bump. commit 0f765dd23a13c4d580978273353f9e885c829a31 Author: Jesus Zazueta Date: Sat Sep 28 16:17:20 2024 -0400 Squashed commit of the following: - Preliminary WS server implementation. - Minor version bump. commit 7f26b8dce0eecb322d7428c53e6a02480133b79b Author: Jesus Zazueta Date: Tue Sep 24 01:15:22 2024 -0400 - Logging tweaks. - Patch version bump. commit cfe5ae63a5ad706658d27107ef93b9a74a0dea86 Author: Jesus Zazueta Date: Mon Sep 16 04:03:05 2024 -0400 - Connection error handling fixes. - Minor version bump. commit f5b94983b562177f6e651f2e5b49aa9a8b90cfc5 Author: Jesus Zazueta Date: Sun Sep 15 17:21:56 2024 -0400 - Host header fix. - Minor version bump. commit 0c448b83d53be504e2d876f449b0b9aeb8e3528b Author: Jesus Zazueta Date: Sat Sep 14 13:45:43 2024 -0400 - Logging tweaks. - Minor version bump. commit 889a4e9e1a929ef3f72623500e80e92fcb05beb4 Author: Jesus Zazueta Date: Sat Sep 14 03:36:03 2024 -0400 - Missing close code handler notification. - Logging tweaks. - Minor version bump. commit cde16dada8ee97882ab3668735cabc12709de0cf Author: Jesus Zazueta Date: Thu Sep 12 19:34:27 2024 -0400 - Socket connection error notifications. --- build.gradle.kts | 4 +- tk-sdr/build.gradle.kts | 4 +- .../main/java/io/vacco/tokoeka/TkSocket.java | 166 ++-------- .../java/io/vacco/tokoeka/TkSocketServer.java | 82 +++++ .../io/vacco/tokoeka/audio/TkAudioPlayer.java | 1 + .../io/vacco/tokoeka/audio/TkSquelch.java | 3 + .../io/vacco/tokoeka/handler/TkAudioHdl.java | 20 +- .../vacco/tokoeka/handler/TkControlHdl.java | 87 ++--- .../vacco/tokoeka/handler/TkWaterfallHdl.java | 8 +- .../java/io/vacco/tokoeka/spi/TkAudioPin.java | 6 +- .../java/io/vacco/tokoeka/spi/TkConn.java | 21 ++ .../io/vacco/tokoeka/spi/TkControlPin.java | 5 - .../java/io/vacco/tokoeka/spi/TkSdrPin.java | 8 + .../io/vacco/tokoeka/spi/TkSocketHdl.java | 12 +- .../io/vacco/tokoeka/util/TkConnAdapter.java | 70 ++++ .../java/io/vacco/tokoeka/util/TkCounter.java | 29 -- .../java/io/vacco/tokoeka/util/TkFormat.java | 20 ++ .../io/vacco/tokoeka/util/TkSocketState.java | 46 +++ .../java/io/vacco/tokoeka/util/TkSockets.java | 306 +++++++++++++++++- .../java/io/vacco/tokoeka/util/TkTimer.java | 31 ++ tk-sdr/src/test/java/TkPairTest.java | 28 -- tk-sdr/src/test/java/TkSocketTest.java | 93 ------ .../vacco/tokoeka}/TkControlHdlTest.java | 47 ++- .../test/java/io/vacco/tokoeka/TkLogging.java | 26 ++ .../vacco/tokoeka}/TkNormalizeTest.java | 4 +- .../java/io/vacco/tokoeka/TkPairTest.java | 43 +++ .../java/io/vacco/tokoeka/TkSocketTest.java | 83 +++++ .../java/{ => io/vacco/tokoeka}/WsMsg.java | 3 + 28 files changed, 882 insertions(+), 374 deletions(-) create mode 100644 tk-sdr/src/main/java/io/vacco/tokoeka/TkSocketServer.java create mode 100644 tk-sdr/src/main/java/io/vacco/tokoeka/spi/TkConn.java delete mode 100644 tk-sdr/src/main/java/io/vacco/tokoeka/spi/TkControlPin.java create mode 100644 tk-sdr/src/main/java/io/vacco/tokoeka/spi/TkSdrPin.java create mode 100644 tk-sdr/src/main/java/io/vacco/tokoeka/util/TkConnAdapter.java delete mode 100644 tk-sdr/src/main/java/io/vacco/tokoeka/util/TkCounter.java create mode 100644 tk-sdr/src/main/java/io/vacco/tokoeka/util/TkSocketState.java create mode 100644 tk-sdr/src/main/java/io/vacco/tokoeka/util/TkTimer.java delete mode 100644 tk-sdr/src/test/java/TkPairTest.java delete mode 100644 tk-sdr/src/test/java/TkSocketTest.java rename tk-sdr/src/test/java/{ => io/vacco/tokoeka}/TkControlHdlTest.java (58%) create mode 100644 tk-sdr/src/test/java/io/vacco/tokoeka/TkLogging.java rename tk-sdr/src/test/java/{ => io/vacco/tokoeka}/TkNormalizeTest.java (97%) create mode 100644 tk-sdr/src/test/java/io/vacco/tokoeka/TkPairTest.java create mode 100644 tk-sdr/src/test/java/io/vacco/tokoeka/TkSocketTest.java rename tk-sdr/src/test/java/{ => io/vacco/tokoeka}/WsMsg.java (89%) diff --git a/build.gradle.kts b/build.gradle.kts index effdc22..fa415c4 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -4,9 +4,9 @@ subprojects { apply(plugin = "io.vacco.oss.gitflow") group = "io.vacco.tokoeka" - version = "0.2.0" + version = "0.6.5" configure { sharedLibrary(true, false) } -} \ No newline at end of file +} diff --git a/tk-sdr/build.gradle.kts b/tk-sdr/build.gradle.kts index b7adac2..f239deb 100644 --- a/tk-sdr/build.gradle.kts +++ b/tk-sdr/build.gradle.kts @@ -6,7 +6,7 @@ val api by configurations dependencies { api(project(":tk-schema")) - api("org.slf4j:slf4j-api:2.0.6") - testImplementation("io.vacco.shax:shax:2.0.6.0.1.0") + api("org.slf4j:slf4j-api:2.0.16") + testImplementation("io.vacco.shax:shax:2.0.16.0.1.2") testImplementation("com.google.code.gson:gson:2.10.1") } diff --git a/tk-sdr/src/main/java/io/vacco/tokoeka/TkSocket.java b/tk-sdr/src/main/java/io/vacco/tokoeka/TkSocket.java index 5fb0191..cc8d238 100644 --- a/tk-sdr/src/main/java/io/vacco/tokoeka/TkSocket.java +++ b/tk-sdr/src/main/java/io/vacco/tokoeka/TkSocket.java @@ -1,178 +1,82 @@ package io.vacco.tokoeka; -import io.vacco.tokoeka.spi.TkSocketHdl; +import io.vacco.tokoeka.spi.*; +import io.vacco.tokoeka.util.*; import org.slf4j.*; import java.io.*; import java.net.Socket; import java.util.function.*; -import static java.nio.ByteBuffer.wrap; import static java.util.Objects.requireNonNull; import static io.vacco.tokoeka.util.TkSockets.*; -public class TkSocket implements AutoCloseable, Consumer { +public class TkSocket implements Closeable, Consumer { private static final Logger log = LoggerFactory.getLogger(TkSocket.class); - private final String endpoint; - private final Socket socket; + private final String host; + private final int port; + private final boolean secure; + private final int timeout; + private final String endpoint; + private final TkSocketState socketState; + private Socket socket; private OutputStream outputStream; private InputStream inputStream; private TkSocketHdl socketHdl; + private TkConn socketConn; - private final ByteArrayOutputStream accumulatedData = new ByteArrayOutputStream(); - - public TkSocket(String host, int port, String endpoint, boolean secure, int timeout) { + public TkSocket(String host, int port, String endpoint, boolean secure, int timeout, TkSocketState socketState) { + this.host = requireNonNull(host); + this.port = port; + this.secure = secure; + this.timeout = timeout; this.endpoint = requireNonNull(endpoint); - this.socket = createSocket(host, port, secure, timeout); + this.socketState = requireNonNull(socketState); } public TkSocket connect() { try { + socket = createSocket(host, port, secure, timeout); outputStream = socket.getOutputStream(); inputStream = socket.getInputStream(); - outputStream.write(wsHandShakeOf(endpoint).getBytes()); + outputStream.write(wsHandShakeOf(host, port, endpoint).getBytes()); outputStream.flush(); - var reader = new BufferedReader(new InputStreamReader(inputStream)); - var bld = new StringBuilder(); - String line; - while (!(line = reader.readLine()).isEmpty()) { - bld.append(line).append('\n'); - } - var hs = bld.toString(); - if (!hs.contains("HTTP/1.1 101")) { - throw new IllegalStateException("ws connection handshake failed: " + hs); - } - this.socketHdl.onOpen(hs); + socketConn = new TkConnAdapter(socket, socketState, (msg) -> send(msg, outputStream)); + socketHdl.onOpen(socketConn, wsClientHandShakeResponseOf(inputStream)); return this; } catch (Exception e) { - throw new IllegalStateException("ws connection open failed", e); - } - } - - private void sendPong() throws IOException { - var pongFrame = new byte[2]; - pongFrame[0] = (byte) 0x8A; // 0x8A = FIN + opcode 0xA (PONG) - outputStream.write(pongFrame); - outputStream.flush(); - if (log.isTraceEnabled()) { - log.trace("< PONG"); - } - } - - public void send(String message) { - try { - var payload = message.getBytes(); - var payloadLength = payload.length; - var frame = new ByteArrayOutputStream(); - frame.write(0x81); // FIN + text frame opcode (0x1) - if (payloadLength <= 125) { - frame.write(payloadLength); - } else if (payloadLength <= 65535) { - frame.write(126); - frame.write((payloadLength >> 8) & 0xFF); // most significant byte - frame.write(payloadLength & 0xFF); // least significant byte - } else { - frame.write(127); // 8-byte payload length - // For large payloads (>65535 bytes), write the 8-byte length - // First four bytes should be zeros (per WebSocket protocol) - frame.write(0); frame.write(0); frame.write(0); frame.write(0); - // Write the last four bytes of the payload length - frame.write((payloadLength >> 24) & 0xFF); // most significant byte - frame.write((payloadLength >> 16) & 0xFF); - frame.write((payloadLength >> 8) & 0xFF); - frame.write(payloadLength & 0xFF); // least significant byte - } - frame.write(payload); - outputStream.write(frame.toByteArray()); - outputStream.flush(); - if (log.isDebugEnabled()) { - log.debug("< TXT: {} ({} bytes)", message, payload.length); - } - } catch (Exception e) { - throw new IllegalStateException(String.format("unable to send text: %s", message), e); + socketHdl.onError(socketConn, e); + doClose(this); + throw new IllegalStateException("ws connection failed", e); } } - public void listen(Supplier go) { - while (go.get() && !socket.isClosed()) { + public void listen() { + while (!socket.isClosed()) { try { - var frameHeader = new byte[2]; - read(inputStream, frameHeader); - var isFinalFragment = (frameHeader[0] & 0x80) != 0; // Check if FIN bit is set - var opcode = frameHeader[0] & 0x0F; - var payloadLength = payloadLengthOf(frameHeader, inputStream); - var payload = new byte[payloadLength]; - int bytesRead = 0; - while (bytesRead < payloadLength) { - int read = inputStream.read(payload, bytesRead, payloadLength - bytesRead); - if (read == -1) { - throw new IOException("unexpected end of stream"); - } - bytesRead += read; - } - accumulatedData.write(payload); - if (isFinalFragment) { - var completeMessage = accumulatedData.toByteArray(); - if (opcode == 0x1) { - var msg = new String(completeMessage); - if (log.isTraceEnabled()) { - log.trace("> TXT: {}", msg); - } - this.socketHdl.onMessage(msg); - } else if (opcode == 0x2) { - if (log.isTraceEnabled()) { - log.trace("> BIN ({})", completeMessage.length); - } - this.socketHdl.onMessage(wrap(completeMessage)); - } else if (opcode == 0xA) { - if (log.isTraceEnabled()) { - log.trace("> PONG"); - } - } else if (opcode == 0x9) { - if (log.isTraceEnabled()) { - log.trace("> PING"); - } - sendPong(); - } else if (opcode == 0x8) { - if (completeMessage.length >= 2) { - int closeCode = ((completeMessage[0] & 0xFF) << 8) | (completeMessage[1] & 0xFF); - if (log.isTraceEnabled()) { - log.trace("> CLOSE ({})", closeCode); - } - this.socketHdl.onClose(closeCode); - } else { - log.warn("Received close frame with no close code."); - } - break; - } - accumulatedData.reset(); + var stop = handleMessage(socketHdl, socketConn, inputStream, outputStream); + if (stop) { + break; } } catch (Exception e) { if (log.isDebugEnabled()) { log.debug("ws message processing error", e); } - this.socketHdl.onError(e); + socketHdl.onError(socketConn, e); break; } } - this.close(); + tearDown(socket, socketConn, socketHdl); } @Override public void accept(String s) { - this.send(s); + send(s, this.outputStream); } @Override public void close() { - try { - socket.close(); - } catch (IOException e) { - log.error("Unable to close ws socket: {} - {}", this, e.getMessage()); - } - if (log.isDebugEnabled()) { - log.debug("ws connection closed"); - } + tearDown(socket, socketConn, socketHdl); } public TkSocket withHandler(TkSocketHdl hdl) { @@ -180,10 +84,6 @@ public TkSocket withHandler(TkSocketHdl hdl) { return this; } - public Socket getSocket() { - return socket; - } - @Override public String toString() { return String.format("%s - %s", socket, endpoint); } diff --git a/tk-sdr/src/main/java/io/vacco/tokoeka/TkSocketServer.java b/tk-sdr/src/main/java/io/vacco/tokoeka/TkSocketServer.java new file mode 100644 index 0000000..ccdbbc2 --- /dev/null +++ b/tk-sdr/src/main/java/io/vacco/tokoeka/TkSocketServer.java @@ -0,0 +1,82 @@ +package io.vacco.tokoeka; + +import io.vacco.tokoeka.spi.*; +import io.vacco.tokoeka.util.*; +import org.slf4j.*; +import java.io.*; +import java.net.*; +import java.util.concurrent.*; +import java.util.function.*; + +import static java.util.Objects.requireNonNull; +import static io.vacco.tokoeka.util.TkSockets.*; + +public class TkSocketServer implements Closeable { + + private static final Logger log = LoggerFactory.getLogger(TkSocketServer.class); + + private final int port; + private final TkSocketHdl socketHdl; + private final Supplier stateFn; + private final ExecutorService clientPool; + + private ServerSocket serverSocket; + + public TkSocketServer(int port, TkSocketHdl socketHdl, Supplier stateFn, ExecutorService clientPool) { + this.port = port; + this.stateFn = requireNonNull(stateFn); + this.socketHdl = requireNonNull(socketHdl); + this.clientPool = requireNonNull(clientPool); + } + + public TkSocketServer(int port, TkSocketHdl socketHdl, Supplier stateFn) { + this(port, socketHdl, stateFn, Executors.newCachedThreadPool()); + } + + public void start() { + try { + serverSocket = new ServerSocket(port); + log.info("WebSocket server started on port {}", port); + while (!serverSocket.isClosed()) { + var clientSocket = serverSocket.accept(); + clientPool.submit(() -> handleClient(clientSocket)); + } + } catch (IOException e) { + throw new IllegalStateException("Unable to start websocket server", e); + } + } + + private void handleClient(Socket clientSocket) { + log.debug("Client connection: {}", clientSocket); + TkConn conn = null; + try { + var inputStream = clientSocket.getInputStream(); + var outputStream = clientSocket.getOutputStream(); + var socketState = this.stateFn.get(); + var handShake = wsServerHandShakeOf(inputStream); + var handshakeResponse = performHandshake(handShake, outputStream); + conn = new TkConnAdapter(clientSocket, socketState, msg -> send(msg, outputStream)); + this.socketHdl.onOpen(conn, handshakeResponse); + while (!clientSocket.isClosed()) { + var stop = handleMessage(this.socketHdl, conn, inputStream, outputStream); + if (stop) { + break; + } + } + } catch (Exception e) { + if (log.isDebugEnabled()) { + log.debug("Client connection error - {}", clientSocket.getRemoteSocketAddress(), e); + } + } finally { + tearDown(clientSocket, conn, socketHdl); + } + } + + @Override public void close() { + clientPool.shutdown(); + if (serverSocket != null) { + doClose(serverSocket); + } + } + +} diff --git a/tk-sdr/src/main/java/io/vacco/tokoeka/audio/TkAudioPlayer.java b/tk-sdr/src/main/java/io/vacco/tokoeka/audio/TkAudioPlayer.java index aafc6c7..c7f5b3b 100644 --- a/tk-sdr/src/main/java/io/vacco/tokoeka/audio/TkAudioPlayer.java +++ b/tk-sdr/src/main/java/io/vacco/tokoeka/audio/TkAudioPlayer.java @@ -70,4 +70,5 @@ public void close() { line.close(); } } + } diff --git a/tk-sdr/src/main/java/io/vacco/tokoeka/audio/TkSquelch.java b/tk-sdr/src/main/java/io/vacco/tokoeka/audio/TkSquelch.java index 9335e88..1a86719 100644 --- a/tk-sdr/src/main/java/io/vacco/tokoeka/audio/TkSquelch.java +++ b/tk-sdr/src/main/java/io/vacco/tokoeka/audio/TkSquelch.java @@ -43,6 +43,9 @@ public void processAudio(byte[] pcm) { private void updateNoiseFloor(double signalAvg) { int signalBin = (int) signalAvg; + if (signalBin == 0) { + signalBin = 1; + } noiseHistogram.put(signalBin, noiseHistogram.getOrDefault(signalBin, 0L) + 1); int mostFrequentBin = signalBin; long maxCount = 0; diff --git a/tk-sdr/src/main/java/io/vacco/tokoeka/handler/TkAudioHdl.java b/tk-sdr/src/main/java/io/vacco/tokoeka/handler/TkAudioHdl.java index 0d6b7b4..ed3af90 100644 --- a/tk-sdr/src/main/java/io/vacco/tokoeka/handler/TkAudioHdl.java +++ b/tk-sdr/src/main/java/io/vacco/tokoeka/handler/TkAudioHdl.java @@ -2,7 +2,7 @@ import io.vacco.tokoeka.audio.TkAdpcm; import io.vacco.tokoeka.schema.TkConfig; -import io.vacco.tokoeka.spi.TkAudioPin; +import io.vacco.tokoeka.spi.*; import io.vacco.tokoeka.util.*; import org.slf4j.*; import java.nio.*; @@ -16,18 +16,18 @@ public class TkAudioHdl { private static final Logger log = LoggerFactory.getLogger(TkAudioHdl.class); - private final TkAdpcm adPcm; - private final TkConfig config; - private final TkAudioPin audioPin; - private final TkCounter counter; - private final Consumer tx; + private final TkAdpcm adPcm; + private final TkConfig config; + private final TkAudioPin audioPin; + private final TkTimer timer; + private final Consumer tx; public TkAudioHdl(TkConfig config, Consumer tx, TkAudioPin audioPin) { this.adPcm = new TkAdpcm(); this.config = Objects.requireNonNull(config); this.audioPin = Objects.requireNonNull(audioPin); this.tx = Objects.requireNonNull(tx); - this.counter = new TkCounter(32, () -> tx.accept(setKeepAlive())); + this.timer = new TkTimer<>(3000, (Nil) -> tx.accept(setKeepAlive())); } public void updateAudioParams() { @@ -65,7 +65,7 @@ public void onSampleRate(double sr) { tx.accept(setKeepAlive()); } - public void processAudio(ByteBuffer buffer) { + public void processAudio(TkConn conn, ByteBuffer buffer) { if (buffer.remaining() < 7) { log.warn("Received audio data is too short"); return; @@ -89,10 +89,10 @@ public void processAudio(ByteBuffer buffer) { } else { rawPcm = audio; } - audioPin.onAudio((int) config.sampleRate, flags, sequenceNumber, sMeter, rssi, imaPcm, rawPcm); + audioPin.onAudio(conn, (int) config.sampleRate, flags, sequenceNumber, sMeter, rssi, imaPcm, rawPcm); } - counter.update(); + timer.update(null); } } diff --git a/tk-sdr/src/main/java/io/vacco/tokoeka/handler/TkControlHdl.java b/tk-sdr/src/main/java/io/vacco/tokoeka/handler/TkControlHdl.java index 68b32bf..16e13ed 100644 --- a/tk-sdr/src/main/java/io/vacco/tokoeka/handler/TkControlHdl.java +++ b/tk-sdr/src/main/java/io/vacco/tokoeka/handler/TkControlHdl.java @@ -4,12 +4,13 @@ import io.vacco.tokoeka.schema.kiwi.TkKiwiConfig; import io.vacco.tokoeka.spi.*; import io.vacco.tokoeka.schema.TkConfig; +import io.vacco.tokoeka.util.*; import org.slf4j.*; import java.net.URLDecoder; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; -import java.util.function.Consumer; +import java.util.function.Function; import static io.vacco.tokoeka.schema.TkConstants.*; import static io.vacco.tokoeka.util.TkCommand.*; @@ -24,12 +25,13 @@ public class TkControlHdl implements TkSocketHdl { private final TkConfig config; - private Consumer tx; - private TkAudioHdl audioHdl; - private TkWaterfallHdl waterfallHdl; - private TkJsonIn jsonIn; - private TkConfigPin configPin; - protected TkControlPin controlPin; + private TkAudioHdl audioHdl; + private TkWaterfallHdl waterfallHdl; + private TkJsonIn jsonIn; + private TkConfigPin configPin; + protected TkSdrPin sdrPin; + + private Function controlFn; public TkKiwiConfig kiwiConfig; public TkDxConfig dxConfig, dxCommConfig; @@ -38,13 +40,13 @@ public TkControlHdl(TkConfig config) { this.config = requireNonNull(config); } - public void controlEvent(int wsCode, String key, String value, boolean remote, Exception e) { - if (this.controlPin != null) { - this.controlPin.onEvent(wsCode, key, value, remote, e); + public void sdrEvent(TkConn conn, String key, String value, Exception e, boolean ping, boolean pong) { + if (this.sdrPin != null) { + this.sdrPin.onEvent(conn, key, value, e, ping, pong); } } - private void processKeyValue(String key, String value) { + private void processKeyValue(TkConn conn, String key, String value) { switch (key) { case last_community_download: log.info(URLDecoder.decode(value, StandardCharsets.UTF_8)); break; case bandwidth: this.config.frequencyMax = parseDouble(value); break; @@ -66,25 +68,25 @@ private void processKeyValue(String key, String value) { case badp: case too_busy: case redirect: - case down: this.controlEvent(-1, key, value, true, null); break; + case down: this.sdrEvent(conn, key, value, null, false, false); break; default: if (log.isDebugEnabled()) { - log.debug("Unknown message key/value: {} -> {}", key, value == null ? "" : value.trim()); + log.debug("Unknown message key/value: {} -> {}", key, shorten(value)); } } } - private void processMsg(String body) { + private void processMsg(TkConn conn, String body) { var params = parseParameters(body); - if (log.isDebugEnabled()) { - log.debug(">> {} {} {}", MSG, body, params); + if (log.isTraceEnabled()) { + log.trace(">> {} {} {}", MSG, shorten(body), shorten(params.toString())); } - params.forEach(this::processKeyValue); + params.forEach((key, value) -> processKeyValue(conn, key, value)); } - private void processAudio(ByteBuffer data) { + private void processAudio(TkConn conn, ByteBuffer data) { if (this.audioHdl != null) { - this.audioHdl.processAudio(data); + this.audioHdl.processAudio(conn, data); } } @@ -94,15 +96,16 @@ private void processWaterfall(ByteBuffer data) { } } - @Override public void onOpen(String handShake) { - if (tx == null) { - throw new IllegalStateException("no tx sink, check handler configuration."); - } - tx.accept(setAuth(config.username, config.password)); - tx.accept(setIdentity(config.identUser)); + @Override public void onOpen(TkConn conn, String handShake) { + conn.accept(setAuth(config.username, config.password)); + conn.accept(setIdentity(config.identUser)); } - @Override public void onMessage(ByteBuffer data) { + @Override public void onMessage(TkConn conn, ByteBuffer data) { + if (this.controlFn != null && !this.controlFn.apply(conn)) { + conn.close(TkSockets.WsCloseGoAway); + return; + } if (data == null || data.remaining() < 3) { log.error("No data, or received data is too short to contain a valid tag"); return; @@ -118,25 +121,33 @@ private void processWaterfall(ByteBuffer data) { } switch (tag) { - case MSG: processMsg(asString(skip(data, 1))); break; - case SND: processAudio(data); break; + case MSG: processMsg(conn, asString(skip(data, 1))); break; + case SND: processAudio(conn, data); break; case WF: processWaterfall(skip(data, 1)); break; // case "EXT": processExt(asString(skip(data, 1))); break; TODO what should be implemented? default: log.warn("Unsupported message tag {} ({})", tag, data.remaining()); } } catch (Exception e) { - this.onError(e); + this.onError(conn, e); } } - @Override public void onMessage(String message) {} + @Override public void onMessage(TkConn conn, String message) {} + + @Override public void onClose(TkConn conn) { + this.sdrEvent(conn, null, null, null, false, false); + } + + @Override public void onError(TkConn conn, Exception e) { + this.sdrEvent(conn, null, null, e, false, false); + } - @Override public void onClose(int code) { - this.controlEvent(code, null, null, true, null); + @Override public void onPing(TkConn conn) { + this.sdrEvent(conn, null, null, null, true, false); } - @Override public void onError(Exception e) { - this.controlEvent(-1, null, null, false, e); + @Override public void onPong(TkConn conn) { + this.sdrEvent(conn, null, null, null, false, true); } public TkControlHdl withAudioHandler(TkAudioHdl hdl) { @@ -154,8 +165,8 @@ public TkControlHdl withJsonIn(TkJsonIn jsonIn) { return this; } - public TkControlHdl withControlPin(TkControlPin pin) { - this.controlPin = requireNonNull(pin); + public TkControlHdl withSdrPin(TkSdrPin pin) { + this.sdrPin = requireNonNull(pin); return this; } @@ -164,8 +175,8 @@ public TkControlHdl withConfigPin(TkConfigPin pin) { return this; } - public TkControlHdl withSink(Consumer tx) { - this.tx = requireNonNull(tx); + public TkControlHdl withControlFn(Function controlFn) { + this.controlFn = requireNonNull(controlFn); return this; } diff --git a/tk-sdr/src/main/java/io/vacco/tokoeka/handler/TkWaterfallHdl.java b/tk-sdr/src/main/java/io/vacco/tokoeka/handler/TkWaterfallHdl.java index 73e16be..7347983 100644 --- a/tk-sdr/src/main/java/io/vacco/tokoeka/handler/TkWaterfallHdl.java +++ b/tk-sdr/src/main/java/io/vacco/tokoeka/handler/TkWaterfallHdl.java @@ -1,7 +1,7 @@ package io.vacco.tokoeka.handler; import io.vacco.tokoeka.spi.TkWfPin; -import io.vacco.tokoeka.util.TkCounter; +import io.vacco.tokoeka.util.TkTimer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.nio.ByteBuffer; @@ -16,13 +16,13 @@ public class TkWaterfallHdl { private static final Logger log = LoggerFactory.getLogger(TkWaterfallHdl.class); private final TkWfPin wfPin; - private final TkCounter counter; + private final TkTimer timer; private final Consumer tx; public TkWaterfallHdl(Consumer tx, TkWfPin wfPin) { this.tx = Objects.requireNonNull(tx); this.wfPin = Objects.requireNonNull(wfPin); - this.counter = new TkCounter(128, () -> tx.accept(setKeepAlive())); + this.timer = new TkTimer<>(3000, (Nil) -> tx.accept(setKeepAlive())); } public void processWaterfall(ByteBuffer data) { @@ -42,7 +42,7 @@ public void processWaterfall(ByteBuffer data) { wfPin.onWaterfallData(xBin, sequenceNumber, flags, wfData); } - counter.update(); + timer.update(null); } } diff --git a/tk-sdr/src/main/java/io/vacco/tokoeka/spi/TkAudioPin.java b/tk-sdr/src/main/java/io/vacco/tokoeka/spi/TkAudioPin.java index f7b2a87..099fd96 100644 --- a/tk-sdr/src/main/java/io/vacco/tokoeka/spi/TkAudioPin.java +++ b/tk-sdr/src/main/java/io/vacco/tokoeka/spi/TkAudioPin.java @@ -1,6 +1,8 @@ package io.vacco.tokoeka.spi; public interface TkAudioPin { - void onAudio(int sampleRate, int flags, int sequenceNumber, int sMeter, - double rssi, byte[] imaPcm, byte[] rawPcm); + void onAudio( + TkConn conn, int sampleRate, int flags, int sequenceNumber, + int sMeter, double rssi, byte[] imaPcm, byte[] rawPcm + ); } \ No newline at end of file diff --git a/tk-sdr/src/main/java/io/vacco/tokoeka/spi/TkConn.java b/tk-sdr/src/main/java/io/vacco/tokoeka/spi/TkConn.java new file mode 100644 index 0000000..3ee7a73 --- /dev/null +++ b/tk-sdr/src/main/java/io/vacco/tokoeka/spi/TkConn.java @@ -0,0 +1,21 @@ +package io.vacco.tokoeka.spi; + +import io.vacco.tokoeka.util.TkSocketState; +import java.net.Socket; +import java.util.function.Consumer; + +public interface TkConn extends Consumer { + + void setAttachment(Object attachment); + T getAttachment(); + + Socket getSocket(); + TkSocketState getState(); + + void sendPing(); + void sendPong(); + + void close(int code); + void close(int code, String msg); + +} diff --git a/tk-sdr/src/main/java/io/vacco/tokoeka/spi/TkControlPin.java b/tk-sdr/src/main/java/io/vacco/tokoeka/spi/TkControlPin.java deleted file mode 100644 index 7bddda5..0000000 --- a/tk-sdr/src/main/java/io/vacco/tokoeka/spi/TkControlPin.java +++ /dev/null @@ -1,5 +0,0 @@ -package io.vacco.tokoeka.spi; - -public interface TkControlPin { - void onEvent(int code, String key, String value, boolean remote, Exception e); -} diff --git a/tk-sdr/src/main/java/io/vacco/tokoeka/spi/TkSdrPin.java b/tk-sdr/src/main/java/io/vacco/tokoeka/spi/TkSdrPin.java new file mode 100644 index 0000000..1231591 --- /dev/null +++ b/tk-sdr/src/main/java/io/vacco/tokoeka/spi/TkSdrPin.java @@ -0,0 +1,8 @@ +package io.vacco.tokoeka.spi; + +public interface TkSdrPin { + void onEvent( + TkConn conn, String key, String value, + Exception e, boolean ping, boolean pong + ); +} diff --git a/tk-sdr/src/main/java/io/vacco/tokoeka/spi/TkSocketHdl.java b/tk-sdr/src/main/java/io/vacco/tokoeka/spi/TkSocketHdl.java index fb9a954..50046ab 100644 --- a/tk-sdr/src/main/java/io/vacco/tokoeka/spi/TkSocketHdl.java +++ b/tk-sdr/src/main/java/io/vacco/tokoeka/spi/TkSocketHdl.java @@ -4,10 +4,12 @@ public interface TkSocketHdl { - void onOpen(String handShake); - void onMessage(String message); - void onMessage(ByteBuffer message); - void onClose(int code); - void onError(Exception e); + void onOpen(TkConn conn, String handShake); + void onClose(TkConn conn); + void onMessage(TkConn conn, String msg); + void onMessage(TkConn conn, ByteBuffer msg); + void onPing(TkConn conn); + void onPong(TkConn conn); + void onError(TkConn conn, Exception e); } diff --git a/tk-sdr/src/main/java/io/vacco/tokoeka/util/TkConnAdapter.java b/tk-sdr/src/main/java/io/vacco/tokoeka/util/TkConnAdapter.java new file mode 100644 index 0000000..2983527 --- /dev/null +++ b/tk-sdr/src/main/java/io/vacco/tokoeka/util/TkConnAdapter.java @@ -0,0 +1,70 @@ +package io.vacco.tokoeka.util; + +import io.vacco.tokoeka.spi.TkConn; +import java.io.IOException; +import java.net.Socket; +import java.util.Objects; +import java.util.function.Consumer; + +public class TkConnAdapter implements TkConn { + + private final Socket socket; + private final TkSocketState socketState; + private final Consumer tx; + + public TkConnAdapter(Socket socket, TkSocketState socketState, Consumer tx) { + this.socket = Objects.requireNonNull(socket); + this.socketState = Objects.requireNonNull(socketState); + this.tx = Objects.requireNonNull(tx); + } + + @Override public void setAttachment(Object attachment) { + socketState.attachment = Objects.requireNonNull(attachment); + } + + @SuppressWarnings("unchecked") + @Override public T getAttachment() { + return (T) socketState.attachment; + } + + @Override public void accept(String s) { + tx.accept(s); + } + + @Override public Socket getSocket() { + return socket; + } + + @Override public TkSocketState getState() { + return socketState; + } + + @Override public void close(int code) { + socketState.markClosed(code, null, false); + } + + @Override public void close(int code, String msg) { + socketState.markClosed(code, msg, false); + } + + @Override public void sendPing() { + try { + TkSockets.sendPing(socket.getOutputStream()); + } catch (IOException e) { + throw new IllegalStateException(e); + } + } + + @Override public void sendPong() { + try { + TkSockets.sendPong(socket.getOutputStream()); + } catch (IOException e) { + throw new IllegalStateException(e); + } + } + + @Override public String toString() { + return String.format("%s, %s", socket, socketState); + } + +} diff --git a/tk-sdr/src/main/java/io/vacco/tokoeka/util/TkCounter.java b/tk-sdr/src/main/java/io/vacco/tokoeka/util/TkCounter.java deleted file mode 100644 index 764282c..0000000 --- a/tk-sdr/src/main/java/io/vacco/tokoeka/util/TkCounter.java +++ /dev/null @@ -1,29 +0,0 @@ -package io.vacco.tokoeka.util; - -import java.util.Objects; - -public class TkCounter { - - private final int max; - private final Runnable r; - private int k = 0; - - public TkCounter(int max, Runnable r) { - this.max = max; - this.r = Objects.requireNonNull(r); - } - - public void update() { - k++; - if (k == max) { - r.run(); - k = 0; - } - } - - public static boolean nowMsDiffLt(long t0, long diffMs) { - var tN = System.currentTimeMillis(); - return (tN - t0) < diffMs; - } - -} diff --git a/tk-sdr/src/main/java/io/vacco/tokoeka/util/TkFormat.java b/tk-sdr/src/main/java/io/vacco/tokoeka/util/TkFormat.java index a1e3596..5c08836 100644 --- a/tk-sdr/src/main/java/io/vacco/tokoeka/util/TkFormat.java +++ b/tk-sdr/src/main/java/io/vacco/tokoeka/util/TkFormat.java @@ -12,6 +12,8 @@ public class TkFormat { + public static int DefaultMessageShorten = 64; + public static String asString(ByteBuffer bytes) { return StandardCharsets.UTF_8.decode(bytes).toString(); } @@ -50,4 +52,22 @@ public static TkDxConfig loadKiwiDxConfig(String data, TkJsonIn jsonIn) { return load(data, TkDxConfig.class, jsonIn); } + public static String shorten(String msg, int maxLength) { + if (msg == null) { + return "null"; + } + msg = msg.trim(); + if (msg.length() <= maxLength || maxLength == 1) { + return msg; + } + var ml2 = maxLength / 2; + var v0 = msg.substring(0, ml2); + var v1 = msg.substring(msg.length() - ml2); + return String.format("%s...%s", v0, v1); + } + + public static String shorten(String msg) { + return shorten(msg, DefaultMessageShorten); + } + } diff --git a/tk-sdr/src/main/java/io/vacco/tokoeka/util/TkSocketState.java b/tk-sdr/src/main/java/io/vacco/tokoeka/util/TkSocketState.java new file mode 100644 index 0000000..cfde0e3 --- /dev/null +++ b/tk-sdr/src/main/java/io/vacco/tokoeka/util/TkSocketState.java @@ -0,0 +1,46 @@ +package io.vacco.tokoeka.util; + +import java.io.ByteArrayOutputStream; + +public class TkSocketState { + + public final ByteArrayOutputStream accumulatedData = new ByteArrayOutputStream(); + + public long lastPingMs = 0; + public long lastPongMs = 0; + public long keepAliveMs; + public int maxFrameBytes; + + public int closeCode = Integer.MIN_VALUE; + public String closeReason; + public boolean closeByRemote; + + public Object attachment; + + public void markClosed(int closeCode, String closeReason, boolean closeByRemote) { + this.closeCode = closeCode; + this.closeReason = closeReason; + this.closeByRemote = closeByRemote; + } + + public boolean isClosed() { + return closeCode != Integer.MIN_VALUE; + } + + public static TkSocketState of(long keepAliveMs, int maxFrameBytes) { + var ss = new TkSocketState(); + ss.maxFrameBytes = maxFrameBytes; + ss.keepAliveMs = keepAliveMs; + return ss; + } + + @Override public String toString() { + return String.format( + "piMs: %d, poMs: %d, cl: %d, clr: %s, clRm: %s", + lastPingMs, lastPongMs, + closeCode == Integer.MIN_VALUE ? -1 : closeCode, + closeReason, closeByRemote + ); + } + +} diff --git a/tk-sdr/src/main/java/io/vacco/tokoeka/util/TkSockets.java b/tk-sdr/src/main/java/io/vacco/tokoeka/util/TkSockets.java index 4421b11..645ecd7 100644 --- a/tk-sdr/src/main/java/io/vacco/tokoeka/util/TkSockets.java +++ b/tk-sdr/src/main/java/io/vacco/tokoeka/util/TkSockets.java @@ -1,16 +1,29 @@ package io.vacco.tokoeka.util; +import io.vacco.tokoeka.spi.*; import org.slf4j.*; import javax.net.ssl.SSLSocketFactory; import java.io.*; import java.net.*; import java.nio.ByteBuffer; -import java.util.Base64; +import java.security.MessageDigest; +import java.util.*; import static java.nio.ByteBuffer.wrap; +import static java.lang.System.currentTimeMillis; +import static io.vacco.tokoeka.util.TkFormat.shorten; public class TkSockets { + public static final int WsCloseTooBig = 1009; + public static final int WsCloseGoAway = 1001; + + public static final String WsCloseTooBigRes = "msg too big"; + public static final String WsCloseGoAwayRes = "going away"; + + public static final int MaxWsHandshakeChars = 2048; + public static final String magicString = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; + private static final Logger log = LoggerFactory.getLogger(TkSockets.class); public static Socket createSocket(String host, int port, boolean secure, int timeout) { @@ -32,18 +45,79 @@ public static Socket createSocket(String host, int port, boolean secure, int tim } } - public static String wsHandShakeOf(String endpoint) { + public static String wsHandShakeOf(String host, int port, String endpoint) { var wsKey = Base64.getEncoder().encodeToString(Double.toHexString(Math.random()).getBytes()); - return "GET " + endpoint + " HTTP/1.1\r\n" - // + "Host: " + host + "\r\n" TODO do I need this? + var req = "GET " + endpoint + " HTTP/1.1\r\n" + + "Host: " + String.format("%s:%d", host, port) + "\r\n" + "Upgrade: websocket\r\n" + "Connection: Upgrade\r\n" + "Sec-WebSocket-Key: " + wsKey + "\r\n" + "Sec-WebSocket-Version: 13\r\n" + "\r\n"; + if (log.isTraceEnabled()) { + log.trace("ws request: {}", req); + } + return req; } - public static void read(InputStream is, byte[] buff) { + public static String wsClientHandShakeResponseOf(InputStream inputStream) throws IOException { + var reader = new BufferedReader(new InputStreamReader(inputStream)); + var bld = new StringBuilder(); + String line; + while ((line = reader.readLine()) != null) { + if (line.isEmpty()) { + break; + } + bld.append(line).append('\n'); + } + var hs = bld.toString(); + if (!hs.contains("HTTP/1.1 101")) { + throw new IllegalStateException("ws connection handshake failed: " + hs); + } + return hs; + } + + public static String wsServerHandShakeOf(InputStream inputStream) throws IOException { + String line; + var reader = new BufferedReader(new InputStreamReader(inputStream)); + var request = new StringBuilder(); + while ((line = reader.readLine()) != null) { + if (line.isEmpty()) { + break; + } + request.append(line).append("\n"); + if (request.length() >= MaxWsHandshakeChars) { + throw new IllegalStateException( + "Incoming connection handshake exceeds max length: " + MaxWsHandshakeChars + ); + } + } + return request.toString(); + } + + public static String performHandshake(String request, OutputStream outputStream) throws IOException { + if (request == null || !request.contains("Upgrade: websocket")) { + return null; + } + var key = request.lines() + .filter(line -> line.startsWith("Sec-WebSocket-Key:")) + .map(line -> line.substring(19).trim()) + .findFirst() + .orElse(null); + if (key == null) { + throw new IllegalStateException("Incoming request - missing handshake response: " + request); + } + var acceptKey = generateAcceptKey(key); + var response = "HTTP/1.1 101 Switching Protocols\r\n" + + "Upgrade: websocket\r\n" + + "Connection: Upgrade\r\n" + + "Sec-WebSocket-Accept: " + acceptKey + "\r\n\r\n"; + outputStream.write(response.getBytes()); + outputStream.flush(); + return response; + } + + public static void readBlocking(InputStream is, byte[] buff) { try { var bytes = is.read(buff); if (log.isTraceEnabled()) { @@ -57,16 +131,127 @@ public static void read(InputStream is, byte[] buff) { } } + public static String generateAcceptKey(String key) { + try { + var sha1 = MessageDigest.getInstance("SHA-1"); + var hash = sha1.digest((key + magicString).getBytes()); + return Base64.getEncoder().encodeToString(hash); + } catch (Exception e) { + throw new IllegalStateException("Error generating accept key", e); + } + } + + public static void sendPing(OutputStream outputStream) throws IOException { + byte[] pingFrame = new byte[2]; + pingFrame[0] = (byte) 0x89; // FIN + opcode 0x9 (PING) + pingFrame[1] = 0x00; // No payload + outputStream.write(pingFrame); + outputStream.flush(); + log.debug("> PING"); + } + + public static void sendPong(OutputStream outputStream) throws IOException { + byte[] pongFrame = new byte[2]; + pongFrame[0] = (byte) 0x8A; // FIN + opcode 0xA (PONG) + pongFrame[1] = 0x00; // No payload + outputStream.write(pongFrame); + outputStream.flush(); + log.debug("> PONG"); + } + + public static void sendClose(OutputStream outputStream, int closeCode, String closeReason) { + try { + var frame = new ByteArrayOutputStream(); // Close frame header: FIN + opcode 0x8 (CLOSE) + frame.write(0x88); + + var reasonBytes = closeReason != null ? closeReason.getBytes() : new byte[0]; + int payloadLength = 2 + reasonBytes.length; + if (payloadLength <= 125) { + frame.write(payloadLength); + } else { + throw new IllegalArgumentException("Close reason too long"); + } + + frame.write((closeCode >> 8) & 0xFF); // most significant byte + frame.write(closeCode & 0xFF); // least significant byte + if (reasonBytes.length > 0) { + frame.write(reasonBytes); + } + + outputStream.write(frame.toByteArray()); + outputStream.flush(); + if (log.isTraceEnabled()) { + log.trace("> CLOSE: code={}, reason='{}'", closeCode, closeReason); + } + } catch (Exception e) { + log.warn("unable to send close frame - [{}, {}]", closeCode, closeReason, e); + } + } + + public static void send(String message, OutputStream outputStream) { + try { + var payload = message.getBytes(); + var payloadLength = payload.length; + var frame = new ByteArrayOutputStream(); + frame.write(0x81); // FIN + text frame opcode (0x1) + if (payloadLength <= 125) { + frame.write(payloadLength); + } else if (payloadLength <= 65535) { + frame.write(126); + frame.write((payloadLength >> 8) & 0xFF); // most significant byte + frame.write(payloadLength & 0xFF); // least significant byte + } else { + frame.write(127); // 8-byte payload length + // For large payloads (>65535 bytes), write the 8-byte length + // First four bytes should be zeros (per WebSocket protocol) + frame.write(0); frame.write(0); frame.write(0); frame.write(0); + // Write the last four bytes of the payload length + frame.write((payloadLength >> 24) & 0xFF); // most significant byte + frame.write((payloadLength >> 16) & 0xFF); + frame.write((payloadLength >> 8) & 0xFF); + frame.write(payloadLength & 0xFF); // least significant byte + } + frame.write(payload); + outputStream.write(frame.toByteArray()); + outputStream.flush(); + if (log.isTraceEnabled()) { + log.trace("> TXT: {} ({} bytes)", message, payload.length); + } + } catch (Exception e) { + throw new IllegalStateException(String.format("unable to send text: %s", shorten(message)), e); + } + } + + public static void tearDown(Socket socket, TkConn socketConn, TkSocketHdl socketHdl) { + try { + if (socketConn != null) { + var state = socketConn.getState(); + if (state.isClosed()) { + if (!state.closeByRemote && !socket.isClosed()) { + sendClose(socket.getOutputStream(), state.closeCode, state.closeReason); + } + } + socketHdl.onClose(socketConn); + } + doClose(socket); + if (log.isDebugEnabled()) { + log.debug("ws connection closed - {}, {}", socket, socketConn != null ? socketConn.getState() : "?"); + } + } catch (Exception e) { + log.debug("ws connection close error", e); + } + } + public static int payloadLengthOf(byte[] frameHeader, InputStream is) { int payloadLength = frameHeader[1] & 0x7F; if (payloadLength == 126) { var extendedPayloadLength = new byte[2]; - read(is, extendedPayloadLength); + readBlocking(is, extendedPayloadLength); return wrap(extendedPayloadLength).getShort() & 0xFFFF; } else if (payloadLength == 127) { var extendedPayloadLength = new byte[8]; - read(is, extendedPayloadLength); + readBlocking(is, extendedPayloadLength); var longPayloadLength = ByteBuffer.wrap(extendedPayloadLength).getLong(); if (longPayloadLength > Integer.MAX_VALUE) { throw new IllegalStateException("payload too large to handle"); @@ -76,4 +261,111 @@ else if (payloadLength == 127) { return payloadLength; } + public static boolean handleMessage(TkSocketHdl socketHdl, TkConn conn, + InputStream inputStream, OutputStream outputStream) throws IOException, InterruptedException { + var socketState = conn.getState(); + if (socketState.isClosed()) { + return true; + } + if (socketState.keepAliveMs > 0) { + var nowMs = currentTimeMillis(); + var noData = inputStream.available() == 0; + var pingDiff = nowMs - socketState.lastPingMs; + var doPing = pingDiff >= (socketState.keepAliveMs * 0.75); + if (doPing || noData) { + sendPing(outputStream); + socketState.lastPingMs = nowMs; + Thread.sleep( + pingDiff == nowMs + ? socketState.keepAliveMs / 8 + : socketState.keepAliveMs / 2 // TODO uggghh... move this to a thread or something. + ); + return false; + } + var pongDiff = nowMs - socketState.lastPongMs; + var pongExp = pongDiff != nowMs && pongDiff >= socketState.keepAliveMs; // missing subsequent client pong + if (pongExp) { + log.warn("Ping/Pong keep-alive expired {}, pingDiff: {}, pongDiff: {}", conn.getSocket().getRemoteSocketAddress(), pingDiff, pongDiff); + socketState.markClosed(WsCloseGoAway, WsCloseGoAwayRes, false); + return true; + } + } + + var frameHeader = new byte[2]; + readBlocking(inputStream, frameHeader); + var isFinalFragment = (frameHeader[0] & 0x80) != 0; // Check if FIN bit is set + var opcode = frameHeader[0] & 0x0F; + var payloadLength = payloadLengthOf(frameHeader, inputStream); + + if (payloadLength > socketState.maxFrameBytes) { + log.warn("Frame exceeds maximum allowed size: [{}], closing", payloadLength); + socketState.markClosed(WsCloseTooBig, WsCloseTooBigRes, false); + return true; + } + + var payload = new byte[payloadLength]; + int bytesRead = 0; + while (bytesRead < payloadLength) { + int read = inputStream.read(payload, bytesRead, payloadLength - bytesRead); + if (read == -1) { + throw new IOException("unexpected end of stream"); + } + bytesRead += read; + } + + socketState.accumulatedData.write(payload); + + if (isFinalFragment) { + var completeMessage = socketState.accumulatedData.toByteArray(); + if (opcode == 0x1) { + var msg = new String(completeMessage); + if (log.isTraceEnabled()) { + log.trace("< TXT: {}", shorten(msg)); + } + socketHdl.onMessage(conn, msg); + } else if (opcode == 0x2) { + if (log.isTraceEnabled()) { + log.trace("< BIN ({})", completeMessage.length); + } + socketHdl.onMessage(conn, wrap(completeMessage)); + } else if (opcode == 0xA) { + log.debug("< PONG"); + socketState.lastPongMs = currentTimeMillis(); + socketHdl.onPong(conn); + } else if (opcode == 0x9) { + log.debug("< PING"); + sendPong(outputStream); + socketHdl.onPing(conn); + } else if (opcode == 0x8) { + if (completeMessage.length >= 2) { + int closeCode = ((completeMessage[0] & 0xFF) << 8) | (completeMessage[1] & 0xFF); + if (log.isTraceEnabled()) { + log.trace("< CLOSE ({})", closeCode); + } + socketState.markClosed(closeCode, "TODO implement me", true); // TODO implement reading close reason. + } else { + log.trace("< CLOSE (?)"); + socketState.markClosed(-1, "remote provided no close code/reason", true); + } + return true; + } + socketState.accumulatedData.reset(); + } + return false; + } + + public static void doClose(Closeable c) { + try { + if (c != null) { + c.close(); + } + } catch (Exception e) { + if (log.isWarnEnabled()) { + log.warn("Error closing {} - {}", c, e.getMessage()); + } else if (log.isDebugEnabled()) { + log.debug("Error closing {}", c); + } + } + } + } diff --git a/tk-sdr/src/main/java/io/vacco/tokoeka/util/TkTimer.java b/tk-sdr/src/main/java/io/vacco/tokoeka/util/TkTimer.java new file mode 100644 index 0000000..8d84c97 --- /dev/null +++ b/tk-sdr/src/main/java/io/vacco/tokoeka/util/TkTimer.java @@ -0,0 +1,31 @@ +package io.vacco.tokoeka.util; + +import java.util.Objects; +import java.util.function.Consumer; + +public class TkTimer { + + private final long intervalMs; + private final Consumer runCons; + private long lastRunMs; + + public TkTimer(long intervalMs, Consumer runCons) { + this.intervalMs = intervalMs; + this.runCons = Objects.requireNonNull(runCons); + this.lastRunMs = System.currentTimeMillis(); + } + + public void update(T arg) { + long currentTime = System.currentTimeMillis(); + if ((currentTime - lastRunMs) >= intervalMs) { + runCons.accept(arg); + lastRunMs = currentTime; + } + } + + public static boolean nowMsDiffLt(long t0, long diffMs) { + var tN = System.currentTimeMillis(); + return (tN - t0) < diffMs; + } + +} diff --git a/tk-sdr/src/test/java/TkPairTest.java b/tk-sdr/src/test/java/TkPairTest.java deleted file mode 100644 index 9f4a2be..0000000 --- a/tk-sdr/src/test/java/TkPairTest.java +++ /dev/null @@ -1,28 +0,0 @@ -import j8spec.annotation.DefinedOrder; -import j8spec.junit.J8SpecRunner; -import org.junit.runner.RunWith; - -import static io.vacco.tokoeka.util.TkPair.*; -import static io.vacco.tokoeka.util.TkCommand.*; -import static j8spec.J8Spec.*; -import static org.junit.Assert.*; - -@DefinedOrder -@RunWith(J8SpecRunner.class) -public class TkPairTest { - static { - describe("Websocket commands", () -> { - it("Generates command pairs", () -> { - var cmd = set( - ks("single", null), - ks("hello", "world"), - kb("boolFlag1", true), - kb("boolFlag0", false), - ki("intArg", 999), - kd3("dblArg", 1.2345) - ); - assertEquals("SET single hello=world boolFlag1=1 boolFlag0=0 intArg=999 dblArg=1.235", cmd); - }); - }); - } -} diff --git a/tk-sdr/src/test/java/TkSocketTest.java b/tk-sdr/src/test/java/TkSocketTest.java deleted file mode 100644 index 46e699f..0000000 --- a/tk-sdr/src/test/java/TkSocketTest.java +++ /dev/null @@ -1,93 +0,0 @@ -import io.vacco.shax.logging.ShOption; -import io.vacco.tokoeka.TkSocket; -import io.vacco.tokoeka.audio.*; -import io.vacco.tokoeka.handler.TkAudioHdl; -import io.vacco.tokoeka.handler.TkControlHdl; -import io.vacco.tokoeka.schema.*; -import j8spec.annotation.DefinedOrder; -import j8spec.junit.J8SpecRunner; -import org.junit.runner.RunWith; -import org.slf4j.*; - -import java.awt.*; - -import static j8spec.J8Spec.*; -import static io.vacco.tokoeka.util.TkCounter.nowMsDiffLt; - -@DefinedOrder -@RunWith(J8SpecRunner.class) -public class TkSocketTest { - - static { - ShOption.setSysProp(ShOption.IO_VACCO_SHAX_DEVMODE, "true"); - ShOption.setSysProp(ShOption.IO_VACCO_SHAX_LOGLEVEL, "debug"); - } - - private static final Logger log = LoggerFactory.getLogger(TkSocketTest.class); - - static { - it("Can communicate with a KiwiSDR server", () -> { - if (!GraphicsEnvironment.isHeadless()) { - // sdr.vebik.cz:8073 - 639kHz - // 85.191.35.22:8073 - // sdr.hfunderground.com:8074 - 880kHz good quality! - // 80m.live:8078 - // hb9bxewebsdr.ddns.net:8073 - // bclinfo.ddns.net:8073 - 13750kHz, 594kHz, 1600kHz - - var sock = new TkSocket("sdr.hfunderground.com", 8074, "/12287283/SND", false, 3000); - var cfg = new TkConfig(); - var sqParams = TkSquelchParams.of(2500, 4.0); - var squelch = new TkSquelch(sqParams) - .withPin((open, pcm, signalAvg, signalThr) -> log.info(">>>> Squelch [open: {}, avg: {}, thr: {}]", open, signalAvg, signalThr)); - var player = new TkAudioPlayer(16, 1); - - var go = new boolean[] { true }; - var nowMs = System.currentTimeMillis(); - - var ctlHdl = new TkControlHdl(cfg) - .withSink(sock) - .withAudioHandler(new TkAudioHdl(cfg, sock, (sampleRate, flags, sequenceNumber, sMeter, rssi, imaPcm, rawPcm) -> { - log.info("flags: {} seqNo: {} sMeter: {} rssi: {} raw: {}", flags, sequenceNumber, sMeter, String.format("%6.2f", rssi), rawPcm.length); - squelch.processAudio(rawPcm); - player.play(sampleRate, rawPcm); - })) - .withControlPin((code, key, value, remote, e) -> { - log.info("control event: {} [{}] [{}] {}", code, key, value, remote, e); - var isError = code > 1000 - || (value != null && value.equals("Operation timed out")) - || (code == -1 && key == null && value == null && !remote); - if (isError) { - go[0] = false; - } - }); - - sock.withHandler(ctlHdl); - - cfg.username = "kiwi"; - cfg.identUser = "tokoeka"; - cfg.modulation = TkModulation.usb; - cfg.frequencyKHz = 13354; - cfg.compression = true; - - cfg.agcOn = false; - cfg.agcHang = false; - cfg.agcThresh = -100; - cfg.agcSlope = 6; - cfg.agcDecay = 1000; - cfg.agcGain = 70; - - cfg.nrAlgoId = 3; - cfg.nrSpecAlpha = 0.95; - cfg.nrSpecGain = 1; - cfg.nrSpecActiveSnr = 1000; - - sock.connect().listen(() -> go[0] && nowMsDiffLt(nowMs, 120_000)); - sock.close(); - player.close(); - } else { - log.info("CI/CD build. Stopping."); - } - }); - } -} diff --git a/tk-sdr/src/test/java/TkControlHdlTest.java b/tk-sdr/src/test/java/io/vacco/tokoeka/TkControlHdlTest.java similarity index 58% rename from tk-sdr/src/test/java/TkControlHdlTest.java rename to tk-sdr/src/test/java/io/vacco/tokoeka/TkControlHdlTest.java index 26e335a..51f3ac6 100644 --- a/tk-sdr/src/test/java/TkControlHdlTest.java +++ b/tk-sdr/src/test/java/io/vacco/tokoeka/TkControlHdlTest.java @@ -1,30 +1,32 @@ +package io.vacco.tokoeka; + import com.google.gson.Gson; -import io.vacco.shax.logging.ShOption; import io.vacco.tokoeka.handler.*; import io.vacco.tokoeka.schema.*; +import io.vacco.tokoeka.spi.TkConn; +import io.vacco.tokoeka.util.TkSocketState; import j8spec.annotation.DefinedOrder; import j8spec.junit.J8SpecRunner; import org.junit.runner.RunWith; import org.slf4j.*; - import java.io.InputStreamReader; +import java.net.Socket; import java.nio.ByteBuffer; import java.util.*; -import java.util.function.Consumer; import static j8spec.J8Spec.*; +import static io.vacco.tokoeka.TkLogging.initLog; +import static io.vacco.tokoeka.util.TkFormat.shorten; @DefinedOrder @RunWith(J8SpecRunner.class) public class TkControlHdlTest { - private static final Gson g = new Gson(); - static { - ShOption.setSysProp(ShOption.IO_VACCO_SHAX_DEVMODE, "true"); - ShOption.setSysProp(ShOption.IO_VACCO_SHAX_LOGLEVEL, "debug"); + initLog(); } + private static final Gson g = new Gson(); private static final Logger log = LoggerFactory.getLogger(TkControlHdlTest.class); static { @@ -37,19 +39,33 @@ public class TkControlHdlTest { var cfg = new TkConfig(); cfg.modulation = TkModulation.am; - var send = (Consumer) log::info; - var ctlHdl = new TkControlHdl(cfg).withSink(send) - .withAudioHandler(new TkAudioHdl(cfg, send, (cfg0, flags, sequenceNumber, sMeter, rssi, imaPcm, rawPcm) -> { + var state = TkSocketState.of(-1, 65536); + var conn = new TkConn() { + @Override public void accept(String s) { log.info(s); } + @Override public void setAttachment(Object attachment) {} + @Override public T getAttachment() { return null; } + @Override public Socket getSocket() { return null; } + @Override public TkSocketState getState() { return state; } + @Override public void sendPing() { log.info("I send the pingz"); } + @Override public void sendPong() { log.info("I send the pongz"); } + @Override public void close(int code) { log.info("close - [{}]", code); } + @Override public void close(int code, String msg) { + log.info("close - [{}, {}]", code, msg); + } + }; + + var ctlHdl = new TkControlHdl(cfg) + .withAudioHandler(new TkAudioHdl(cfg, conn, (conn0, sampleRate, flags, sequenceNumber, sMeter, rssi, imaPcm, rawPcm) -> { log.info("flags: {} seqNo: {} sMeter: {} rssi: {} raw: {}", flags, sequenceNumber, sMeter, rssi, rawPcm.length); })) - .withWaterfallHandler(new TkWaterfallHdl(send, (xBin, sequenceNumber, flags, rawWfData) -> { + .withWaterfallHandler(new TkWaterfallHdl(conn, (xBin, sequenceNumber, flags, rawWfData) -> { log.info("bin: {} seqNo: {} flags: {} wfData: {}", xBin, sequenceNumber, flags, rawWfData.length); })) .withJsonIn(g::fromJson) .withConfigPin(((kiwiConfig, dxConfig, dxCommConfig) -> { for (var o : new Object[]{kiwiConfig, dxConfig, dxCommConfig}) { if (o != null) { - log.info(g.toJson(o)); + log.info(shorten(g.toJson(o))); } } })); @@ -57,15 +73,16 @@ public class TkControlHdlTest { for (var msg : sndMessages) { if ("receive".equals(msg.type)) { var bytes = ByteBuffer.wrap(Base64.getDecoder().decode(msg.data)); - ctlHdl.onMessage(bytes); + ctlHdl.onMessage(conn, bytes); } else { - log.info("Ref command ==> {}", msg.data); + log.info("Ref command ==> {}", shorten(msg.data)); } } + for (var msg : wfMessages) { if ("receive".equals(msg.type)) { var bytes = ByteBuffer.wrap(Base64.getDecoder().decode(msg.data)); - ctlHdl.onMessage(bytes); + ctlHdl.onMessage(conn, bytes); } } diff --git a/tk-sdr/src/test/java/io/vacco/tokoeka/TkLogging.java b/tk-sdr/src/test/java/io/vacco/tokoeka/TkLogging.java new file mode 100644 index 0000000..2314cb8 --- /dev/null +++ b/tk-sdr/src/test/java/io/vacco/tokoeka/TkLogging.java @@ -0,0 +1,26 @@ +package io.vacco.tokoeka; + +import io.vacco.shax.logging.ShOption; +import j8spec.UnsafeBlock; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.awt.*; + +public class TkLogging { + + private static Logger log; + + public static void initLog() { + ShOption.setSysProp(ShOption.IO_VACCO_SHAX_DEVMODE, "true"); + ShOption.setSysProp(ShOption.IO_VACCO_SHAX_LOGLEVEL, "trace"); + log = LoggerFactory.getLogger(TkLogging.class); + } + + public static UnsafeBlock localTest(UnsafeBlock b) { + if (!GraphicsEnvironment.isHeadless()) { + return b; + } + return () -> log.info("CI/CD build. Nothing to do."); + } + +} diff --git a/tk-sdr/src/test/java/TkNormalizeTest.java b/tk-sdr/src/test/java/io/vacco/tokoeka/TkNormalizeTest.java similarity index 97% rename from tk-sdr/src/test/java/TkNormalizeTest.java rename to tk-sdr/src/test/java/io/vacco/tokoeka/TkNormalizeTest.java index fa6ed1d..7b47d3f 100644 --- a/tk-sdr/src/test/java/TkNormalizeTest.java +++ b/tk-sdr/src/test/java/io/vacco/tokoeka/TkNormalizeTest.java @@ -1,9 +1,11 @@ +package io.vacco.tokoeka; + import io.vacco.tokoeka.audio.TkNormalize; import j8spec.annotation.DefinedOrder; import j8spec.junit.J8SpecRunner; import org.junit.runner.RunWith; -import static io.vacco.tokoeka.util.TkCounter.nowMsDiffLt; +import static io.vacco.tokoeka.util.TkTimer.nowMsDiffLt; import static j8spec.J8Spec.*; import static org.junit.Assert.*; diff --git a/tk-sdr/src/test/java/io/vacco/tokoeka/TkPairTest.java b/tk-sdr/src/test/java/io/vacco/tokoeka/TkPairTest.java new file mode 100644 index 0000000..8b0bc76 --- /dev/null +++ b/tk-sdr/src/test/java/io/vacco/tokoeka/TkPairTest.java @@ -0,0 +1,43 @@ +package io.vacco.tokoeka; + +import io.vacco.tokoeka.util.TkFormat; +import j8spec.annotation.DefinedOrder; +import j8spec.junit.J8SpecRunner; +import org.junit.runner.RunWith; + +import static io.vacco.tokoeka.util.TkPair.*; +import static io.vacco.tokoeka.util.TkCommand.*; +import static j8spec.J8Spec.*; +import static org.junit.Assert.*; + +@DefinedOrder +@RunWith(J8SpecRunner.class) +public class TkPairTest { + static { + it("Generates command pairs", () -> { + var cmd = set( + ks("single", null), + ks("hello", "world"), + kb("boolFlag1", true), + kb("boolFlag0", false), + ki("intArg", 999), + kd3("dblArg", 1.2345) + ); + assertEquals("SET single hello=world boolFlag1=1 boolFlag0=0 intArg=999 dblArg=1.235", cmd); + }); + it("Formats redacted messages", () -> { + assertEquals("null", TkFormat.shorten(null, 0)); + assertEquals("1", TkFormat.shorten("1", 2)); + assertEquals("1", TkFormat.shorten("1", 1)); + assertEquals("1", TkFormat.shorten("1", 2)); + assertEquals("1...1", TkFormat.shorten("111", 2)); + assertEquals("1...1", TkFormat.shorten("1111", 2)); + assertEquals("1...1", TkFormat.shorten("11111", 2)); + assertEquals("111", TkFormat.shorten("111", 3)); + assertEquals("1...1", TkFormat.shorten("1111", 3)); + assertEquals("1...1", TkFormat.shorten("11111", 3)); + assertEquals("01...34", TkFormat.shorten("01234", 4)); + assertEquals("0123...CDEF", TkFormat.shorten("0123456789ABCDEF", 8)); + }); + } +} diff --git a/tk-sdr/src/test/java/io/vacco/tokoeka/TkSocketTest.java b/tk-sdr/src/test/java/io/vacco/tokoeka/TkSocketTest.java new file mode 100644 index 0000000..30b478e --- /dev/null +++ b/tk-sdr/src/test/java/io/vacco/tokoeka/TkSocketTest.java @@ -0,0 +1,83 @@ +package io.vacco.tokoeka; + +import io.vacco.tokoeka.audio.*; +import io.vacco.tokoeka.handler.*; +import io.vacco.tokoeka.schema.*; +import io.vacco.tokoeka.util.TkSocketState; +import j8spec.annotation.DefinedOrder; +import j8spec.junit.J8SpecRunner; +import org.junit.runner.RunWith; +import org.slf4j.*; + +import static j8spec.J8Spec.*; +import static io.vacco.tokoeka.util.TkTimer.nowMsDiffLt; +import static io.vacco.tokoeka.TkLogging.*; + +@DefinedOrder +@RunWith(J8SpecRunner.class) +public class TkSocketTest { + + static { initLog(); } + + private static final Logger log = LoggerFactory.getLogger(TkSocketTest.class); + + static { + it("Can communicate with a KiwiSDR server", localTest(() -> { + // sdr.vebik.cz:8073 - 639kHz + // 85.191.35.22:8073 + // sdr.hfunderground.com:8074 - 880kHz good quality! + // 80m.live:8078 + // hb9bxewebsdr.ddns.net:8073 + // bclinfo.ddns.net:8073 - 13750kHz, 594kHz, 1600kHz + + var state = TkSocketState.of(-1, 65536); + var sock = new TkSocket("sdr.hfunderground.com", 8074, "/12287283/SND", false, 3000, state); + var cfg = new TkConfig(); + var sqParams = TkSquelchParams.of(2500, 4.0); + var squelch = new TkSquelch(sqParams) + .withPin((open, pcm, signalAvg, signalThr) -> log.info(">>>> Squelch [open: {}, avg: {}, thr: {}]", open, signalAvg, signalThr)); + var player = new TkAudioPlayer(16, 1); + + var go = new boolean[] { true }; + var nowMs = System.currentTimeMillis(); + + var ctlHdl = new TkControlHdl(cfg) + .withAudioHandler(new TkAudioHdl(cfg, sock, (conn, sampleRate, flags, sequenceNumber, sMeter, rssi, imaPcm, rawPcm) -> { + log.info("flags: {} seqNo: {} sMeter: {} rssi: {} raw: {}", flags, sequenceNumber, sMeter, String.format("%6.2f", rssi), rawPcm.length); + squelch.processAudio(rawPcm); + player.play(sampleRate, rawPcm); + })) + .withSdrPin(((conn, key, value, e, ping, pong) -> { + log.info("sdr event: {} [{}, {}, ping: {}, pong: {}]", conn, key, value, ping, pong, e); + var state0 = conn.getState(); + var code = state0.closeCode; + var isError = (code > 1000) || (value != null && value.equals("Operation timed out")); + go[0] = !isError; + })) + .withControlFn(conn -> go[0] && nowMsDiffLt(nowMs, 120_000)); + + sock.withHandler(ctlHdl); + + cfg.username = "kiwi"; + cfg.identUser = "tokoeka"; + cfg.modulation = TkModulation.usb; + cfg.frequencyKHz = 13354; + cfg.compression = true; + + cfg.agcOn = false; + cfg.agcHang = false; + cfg.agcThresh = -100; + cfg.agcSlope = 6; + cfg.agcDecay = 1000; + cfg.agcGain = 70; + + cfg.nrAlgoId = 3; + cfg.nrSpecAlpha = 0.95; + cfg.nrSpecGain = 1; + cfg.nrSpecActiveSnr = 1000; + + sock.connect().listen(); + player.close(); + })); + } +} diff --git a/tk-sdr/src/test/java/WsMsg.java b/tk-sdr/src/test/java/io/vacco/tokoeka/WsMsg.java similarity index 89% rename from tk-sdr/src/test/java/WsMsg.java rename to tk-sdr/src/test/java/io/vacco/tokoeka/WsMsg.java index 5e5d13a..97659d3 100644 --- a/tk-sdr/src/test/java/WsMsg.java +++ b/tk-sdr/src/test/java/io/vacco/tokoeka/WsMsg.java @@ -1,3 +1,5 @@ +package io.vacco.tokoeka; + public class WsMsg { public String type; @@ -8,4 +10,5 @@ public class WsMsg { @Override public String toString() { return String.format("%.4f %d %s (%d)", time, opcode, type, data.length()); } + }