From 51900c00bbbee76780698bf84266bd031a826b28 Mon Sep 17 00:00:00 2001 From: Jesus Zazueta Date: Sun, 13 Oct 2024 13:06:19 -0400 Subject: [PATCH] - WIP --- .../main/java/io/vacco/tokoeka/TkSocket.java | 4 +-- .../java/io/vacco/tokoeka/TkSocketServer.java | 4 +-- .../vacco/tokoeka/handler/TkControlHdl.java | 36 +++++++++---------- .../java/io/vacco/tokoeka/spi/TkConn.java | 6 ++++ .../io/vacco/tokoeka/spi/TkControlPin.java | 5 --- .../java/io/vacco/tokoeka/spi/TkSdrPin.java | 5 +++ .../io/vacco/tokoeka/spi/TkSocketHdl.java | 3 +- .../io/vacco/tokoeka/util/TkConnAdapter.java | 7 ++-- .../io/vacco/tokoeka/util/TkSocketState.java | 19 +++++----- .../java/io/vacco/tokoeka/util/TkSockets.java | 15 ++++---- .../io/vacco/tokoeka/TkControlHdlTest.java | 8 +++-- .../java/io/vacco/tokoeka/TkSocketTest.java | 16 ++++----- 12 files changed, 69 insertions(+), 59 deletions(-) delete mode 100644 tk-sdr/src/main/java/io/vacco/tokoeka/spi/TkControlPin.java create mode 100644 tk-sdr/src/main/java/io/vacco/tokoeka/spi/TkSdrPin.java diff --git a/tk-sdr/src/main/java/io/vacco/tokoeka/TkSocket.java b/tk-sdr/src/main/java/io/vacco/tokoeka/TkSocket.java index 19d5efc..b066491 100644 --- a/tk-sdr/src/main/java/io/vacco/tokoeka/TkSocket.java +++ b/tk-sdr/src/main/java/io/vacco/tokoeka/TkSocket.java @@ -56,9 +56,9 @@ public TkSocket connect() { public void listen() { while (!socket.isClosed()) { try { - var stop = handleMessage(socketHdl, socketState, socketConn, inputStream, outputStream); + var stop = handleMessage(socketHdl, socketConn, inputStream, outputStream); if (stop) { - tearDown(outputStream, socketConn, socketState, socketHdl); + tearDown(outputStream, socketConn, socketHdl); break; } } catch (Exception e) { diff --git a/tk-sdr/src/main/java/io/vacco/tokoeka/TkSocketServer.java b/tk-sdr/src/main/java/io/vacco/tokoeka/TkSocketServer.java index 75d4838..fe3450c 100644 --- a/tk-sdr/src/main/java/io/vacco/tokoeka/TkSocketServer.java +++ b/tk-sdr/src/main/java/io/vacco/tokoeka/TkSocketServer.java @@ -57,9 +57,9 @@ private void handleClient(Socket clientSocket) { var socketConn = new TkConnAdapter(clientSocket, socketState, msg -> send(msg, outputStream)); this.socketHdl.onOpen(socketConn, handshakeResponse); while (!clientSocket.isClosed()) { - var stop = handleMessage(this.socketHdl, socketState, socketConn, inputStream, outputStream); + var stop = handleMessage(this.socketHdl, socketConn, inputStream, outputStream); if (stop) { - tearDown(outputStream, socketConn, socketState, socketHdl); + tearDown(outputStream, socketConn, socketHdl); break; } } diff --git a/tk-sdr/src/main/java/io/vacco/tokoeka/handler/TkControlHdl.java b/tk-sdr/src/main/java/io/vacco/tokoeka/handler/TkControlHdl.java index 042ee10..f96bd31 100644 --- a/tk-sdr/src/main/java/io/vacco/tokoeka/handler/TkControlHdl.java +++ b/tk-sdr/src/main/java/io/vacco/tokoeka/handler/TkControlHdl.java @@ -25,11 +25,11 @@ public class TkControlHdl implements TkSocketHdl { private final TkConfig config; - private TkAudioHdl audioHdl; - private TkWaterfallHdl waterfallHdl; - private TkJsonIn jsonIn; - private TkConfigPin configPin; - protected TkControlPin controlPin; + private TkAudioHdl audioHdl; + private TkWaterfallHdl waterfallHdl; + private TkJsonIn jsonIn; + private TkConfigPin configPin; + protected TkSdrPin sdrPin; private Function controlFn; @@ -40,13 +40,13 @@ public TkControlHdl(TkConfig config) { this.config = requireNonNull(config); } - public void controlEvent(int wsCode, String key, String value, boolean remote, Exception e) { - if (this.controlPin != null) { - this.controlPin.onEvent(wsCode, key, value, remote, e); + public void sdrEvent(TkConn conn, String key, String value, Exception e) { + if (this.sdrPin != null) { + this.sdrPin.onEvent(conn, key, value, e); } } - private void processKeyValue(String key, String value) { + private void processKeyValue(TkConn conn, String key, String value) { switch (key) { case last_community_download: log.info(URLDecoder.decode(value, StandardCharsets.UTF_8)); break; case bandwidth: this.config.frequencyMax = parseDouble(value); break; @@ -68,7 +68,7 @@ private void processKeyValue(String key, String value) { case badp: case too_busy: case redirect: - case down: this.controlEvent(-1, key, value, true, null); break; + case down: this.sdrEvent(conn, key, value, null); break; default: if (log.isDebugEnabled()) { log.debug("Unknown message key/value: {} -> {}", key, shorten(value)); @@ -76,12 +76,12 @@ private void processKeyValue(String key, String value) { } } - private void processMsg(String body) { + private void processMsg(TkConn conn, String body) { var params = parseParameters(body); if (log.isTraceEnabled()) { log.trace(">> {} {} {}", MSG, shorten(body), shorten(params.toString())); } - params.forEach(this::processKeyValue); + params.forEach((key, value) -> processKeyValue(conn, key, value)); } private void processAudio(ByteBuffer data) { @@ -121,7 +121,7 @@ private void processWaterfall(ByteBuffer data) { } switch (tag) { - case MSG: processMsg(asString(skip(data, 1))); break; + case MSG: processMsg(conn, asString(skip(data, 1))); break; case SND: processAudio(data); break; case WF: processWaterfall(skip(data, 1)); break; // case "EXT": processExt(asString(skip(data, 1))); break; TODO what should be implemented? @@ -134,12 +134,12 @@ private void processWaterfall(ByteBuffer data) { @Override public void onMessage(TkConn conn, String message) {} - @Override public void onClose(TkConn conn, TkSocketState state) { - this.controlEvent(state.closeCode, null, state.closeReason, state.closeRemote, null); + @Override public void onClose(TkConn conn) { + this.sdrEvent(conn, null, null, null); } @Override public void onError(TkConn conn, Exception e) { - this.controlEvent(-1, null, null, false, e); + this.sdrEvent(conn, null, null, e); } public TkControlHdl withAudioHandler(TkAudioHdl hdl) { @@ -157,8 +157,8 @@ public TkControlHdl withJsonIn(TkJsonIn jsonIn) { return this; } - public TkControlHdl withControlPin(TkControlPin pin) { - this.controlPin = requireNonNull(pin); + public TkControlHdl withSdrPin(TkSdrPin pin) { + this.sdrPin = requireNonNull(pin); return this; } diff --git a/tk-sdr/src/main/java/io/vacco/tokoeka/spi/TkConn.java b/tk-sdr/src/main/java/io/vacco/tokoeka/spi/TkConn.java index b1a5c7a..63f413f 100644 --- a/tk-sdr/src/main/java/io/vacco/tokoeka/spi/TkConn.java +++ b/tk-sdr/src/main/java/io/vacco/tokoeka/spi/TkConn.java @@ -1,12 +1,18 @@ package io.vacco.tokoeka.spi; +import io.vacco.tokoeka.util.TkSocketState; import java.net.Socket; import java.util.function.Consumer; public interface TkConn extends Consumer { + void setAttachment(Object attachment); T getAttachment(); + Socket getSocket(); + TkSocketState getState(); + void close(int code); void close(int code, String msg); + } diff --git a/tk-sdr/src/main/java/io/vacco/tokoeka/spi/TkControlPin.java b/tk-sdr/src/main/java/io/vacco/tokoeka/spi/TkControlPin.java deleted file mode 100644 index 7bddda5..0000000 --- a/tk-sdr/src/main/java/io/vacco/tokoeka/spi/TkControlPin.java +++ /dev/null @@ -1,5 +0,0 @@ -package io.vacco.tokoeka.spi; - -public interface TkControlPin { - void onEvent(int code, String key, String value, boolean remote, Exception e); -} diff --git a/tk-sdr/src/main/java/io/vacco/tokoeka/spi/TkSdrPin.java b/tk-sdr/src/main/java/io/vacco/tokoeka/spi/TkSdrPin.java new file mode 100644 index 0000000..9ab440b --- /dev/null +++ b/tk-sdr/src/main/java/io/vacco/tokoeka/spi/TkSdrPin.java @@ -0,0 +1,5 @@ +package io.vacco.tokoeka.spi; + +public interface TkSdrPin { + void onEvent(TkConn conn, String key, String value, Exception e); +} diff --git a/tk-sdr/src/main/java/io/vacco/tokoeka/spi/TkSocketHdl.java b/tk-sdr/src/main/java/io/vacco/tokoeka/spi/TkSocketHdl.java index 4b7f250..b63b06c 100644 --- a/tk-sdr/src/main/java/io/vacco/tokoeka/spi/TkSocketHdl.java +++ b/tk-sdr/src/main/java/io/vacco/tokoeka/spi/TkSocketHdl.java @@ -1,6 +1,5 @@ package io.vacco.tokoeka.spi; -import io.vacco.tokoeka.util.TkSocketState; import java.nio.ByteBuffer; public interface TkSocketHdl { @@ -8,7 +7,7 @@ public interface TkSocketHdl { void onOpen(TkConn conn, String handShake); void onMessage(TkConn conn, String msg); void onMessage(TkConn conn, ByteBuffer msg); - void onClose(TkConn conn, TkSocketState state); + void onClose(TkConn conn); void onError(TkConn conn, Exception e); } diff --git a/tk-sdr/src/main/java/io/vacco/tokoeka/util/TkConnAdapter.java b/tk-sdr/src/main/java/io/vacco/tokoeka/util/TkConnAdapter.java index 99f5d40..4754420 100644 --- a/tk-sdr/src/main/java/io/vacco/tokoeka/util/TkConnAdapter.java +++ b/tk-sdr/src/main/java/io/vacco/tokoeka/util/TkConnAdapter.java @@ -3,7 +3,6 @@ import io.vacco.tokoeka.spi.TkConn; import java.net.Socket; import java.util.Objects; -import java.util.function.BiConsumer; import java.util.function.Consumer; public class TkConnAdapter implements TkConn { @@ -35,6 +34,10 @@ public TkConnAdapter(Socket socket, TkSocketState socketState, Consumer return socket; } + @Override public TkSocketState getState() { + return socketState; + } + @Override public void close(int code) { socketState.markClosed(code, null, false); } @@ -44,7 +47,7 @@ public TkConnAdapter(Socket socket, TkSocketState socketState, Consumer } @Override public String toString() { - return socket.toString(); + return String.format("%s, %s", socket, socketState); } } diff --git a/tk-sdr/src/main/java/io/vacco/tokoeka/util/TkSocketState.java b/tk-sdr/src/main/java/io/vacco/tokoeka/util/TkSocketState.java index f69e3fe..cfde0e3 100644 --- a/tk-sdr/src/main/java/io/vacco/tokoeka/util/TkSocketState.java +++ b/tk-sdr/src/main/java/io/vacco/tokoeka/util/TkSocketState.java @@ -1,7 +1,6 @@ package io.vacco.tokoeka.util; import java.io.ByteArrayOutputStream; -import java.util.Objects; public class TkSocketState { @@ -12,20 +11,20 @@ public class TkSocketState { public long keepAliveMs; public int maxFrameBytes; - public Integer closeCode; + public int closeCode = Integer.MIN_VALUE; public String closeReason; - public boolean closeRemote; + public boolean closeByRemote; public Object attachment; - public void markClosed(Integer closeCode, String closeReason, boolean closeRemote) { - this.closeCode = Objects.requireNonNull(closeCode); + public void markClosed(int closeCode, String closeReason, boolean closeByRemote) { + this.closeCode = closeCode; this.closeReason = closeReason; - this.closeRemote = closeRemote; + this.closeByRemote = closeByRemote; } public boolean isClosed() { - return closeCode != null; + return closeCode != Integer.MIN_VALUE; } public static TkSocketState of(long keepAliveMs, int maxFrameBytes) { @@ -37,8 +36,10 @@ public static TkSocketState of(long keepAliveMs, int maxFrameBytes) { @Override public String toString() { return String.format( - "lpi: %d, lpo: %d, clc: %s, clr: %s, clRm: %s", - lastPingMs, lastPongMs, closeCode, closeReason, closeRemote + "piMs: %d, poMs: %d, cl: %d, clr: %s, clRm: %s", + lastPingMs, lastPongMs, + closeCode == Integer.MIN_VALUE ? -1 : closeCode, + closeReason, closeByRemote ); } diff --git a/tk-sdr/src/main/java/io/vacco/tokoeka/util/TkSockets.java b/tk-sdr/src/main/java/io/vacco/tokoeka/util/TkSockets.java index 3e9ee89..48534fa 100644 --- a/tk-sdr/src/main/java/io/vacco/tokoeka/util/TkSockets.java +++ b/tk-sdr/src/main/java/io/vacco/tokoeka/util/TkSockets.java @@ -222,14 +222,14 @@ public static void send(String message, OutputStream outputStream) { } } - public static void tearDown(OutputStream outputStream, TkConn socketConn, - TkSocketState socketState, TkSocketHdl socketHdl) { + public static void tearDown(OutputStream outputStream, TkConn socketConn, TkSocketHdl socketHdl) { try { - if (socketState.closeCode != null) { - if (!socketState.closeRemote) { - sendClose(outputStream, socketState.closeCode, socketState.closeReason); + var state = socketConn.getState(); + if (state.isClosed()) { + if (!state.closeByRemote) { + sendClose(outputStream, state.closeCode, state.closeReason); } - socketHdl.onClose(socketConn, socketState); + socketHdl.onClose(socketConn); } } catch (Exception e) { socketHdl.onError(socketConn, e); @@ -255,8 +255,9 @@ else if (payloadLength == 127) { return payloadLength; } - public static boolean handleMessage(TkSocketHdl socketHdl, TkSocketState socketState, TkConn conn, + public static boolean handleMessage(TkSocketHdl socketHdl, TkConn conn, InputStream inputStream, OutputStream outputStream) throws IOException, InterruptedException { + var socketState = conn.getState(); if (socketState.isClosed()) { return true; } diff --git a/tk-sdr/src/test/java/io/vacco/tokoeka/TkControlHdlTest.java b/tk-sdr/src/test/java/io/vacco/tokoeka/TkControlHdlTest.java index a575332..60f6c39 100644 --- a/tk-sdr/src/test/java/io/vacco/tokoeka/TkControlHdlTest.java +++ b/tk-sdr/src/test/java/io/vacco/tokoeka/TkControlHdlTest.java @@ -4,6 +4,7 @@ import io.vacco.tokoeka.handler.*; import io.vacco.tokoeka.schema.*; import io.vacco.tokoeka.spi.TkConn; +import io.vacco.tokoeka.util.TkSocketState; import j8spec.annotation.DefinedOrder; import j8spec.junit.J8SpecRunner; import org.junit.runner.RunWith; @@ -38,14 +39,14 @@ public class TkControlHdlTest { var cfg = new TkConfig(); cfg.modulation = TkModulation.am; + var state = TkSocketState.of(-1, 65536); var conn = new TkConn() { @Override public void accept(String s) { log.info(s); } @Override public void setAttachment(Object attachment) {} @Override public T getAttachment() { return null; } @Override public Socket getSocket() { return null; } - @Override public void close(int code) { - log.info("close - [{}]", code); - } + @Override public TkSocketState getState() { return state; } + @Override public void close(int code) { log.info("close - [{}]", code); } @Override public void close(int code, String msg) { log.info("close - [{}, {}]", code, msg); } @@ -75,6 +76,7 @@ public class TkControlHdlTest { log.info("Ref command ==> {}", shorten(msg.data)); } } + for (var msg : wfMessages) { if ("receive".equals(msg.type)) { var bytes = ByteBuffer.wrap(Base64.getDecoder().decode(msg.data)); diff --git a/tk-sdr/src/test/java/io/vacco/tokoeka/TkSocketTest.java b/tk-sdr/src/test/java/io/vacco/tokoeka/TkSocketTest.java index 1d798de..68b4e56 100644 --- a/tk-sdr/src/test/java/io/vacco/tokoeka/TkSocketTest.java +++ b/tk-sdr/src/test/java/io/vacco/tokoeka/TkSocketTest.java @@ -47,15 +47,13 @@ public class TkSocketTest { squelch.processAudio(rawPcm); player.play(sampleRate, rawPcm); })) - .withControlPin((code, key, value, remote, e) -> { - log.info("control event: {} [{}] [{}] {}", code, key, value, remote, e); - var isError = code > 1000 - || (value != null && value.equals("Operation timed out")) - || (code == -1 && key == null && value == null && !remote); - if (isError) { - go[0] = false; - } - }) + .withSdrPin(((conn, key, value, e) -> { + log.info("sdr event: {} [{}, {}]", conn, key, value, e); + var state0 = conn.getState(); + var code = state0.closeCode; + var isError = (code > 1000) || (value != null && value.equals("Operation timed out")); + go[0] = !isError; + })) .withControlFn(conn -> go[0] && nowMsDiffLt(nowMs, 120_000)); sock.withHandler(ctlHdl);