From a306a0c8a3f123e27e23d347dd93e62a187d6aca Mon Sep 17 00:00:00 2001 From: Jesus Zazueta Date: Sun, 8 Sep 2024 13:06:59 -0400 Subject: [PATCH] Squashed commit of the following: - WS client connection refactoring. --- build.gradle.kts | 2 +- .../io/vacco/tokoeka/spi/TkSquelchPin.java | 5 - tk-sdr/build.gradle.kts | 2 +- .../main/java/io/vacco/tokoeka/TkSocket.java | 191 +++++++++++++++--- .../tokoeka/{util => audio}/TkAdpcm.java | 2 +- .../tokoeka/{util => audio}/TkAudio.java | 2 +- .../{util => audio}/TkAudioPlayer.java | 2 +- .../io/vacco/tokoeka/audio/TkNormalize.java | 64 ++++++ .../tokoeka/{util => audio}/TkSquelch.java | 8 +- .../tokoeka/{ => handler}/TkAudioHdl.java | 5 +- .../tokoeka/{ => handler}/TkControlHdl.java | 93 +++++---- .../tokoeka/{ => handler}/TkWaterfallHdl.java | 2 +- .../java/io/vacco/tokoeka/spi/TkAudioPin.java | 0 .../io/vacco/tokoeka/spi/TkConfigPin.java | 0 .../io/vacco/tokoeka/spi/TkControlPin.java | 0 .../java/io/vacco/tokoeka/spi/TkExtPin.java | 0 .../java/io/vacco/tokoeka/spi/TkJsonIn.java | 0 .../io/vacco/tokoeka/spi/TkSocketHdl.java | 13 ++ .../io/vacco/tokoeka/spi/TkSquelchPin.java | 5 + .../java/io/vacco/tokoeka/spi/TkWfPin.java | 0 .../java/io/vacco/tokoeka/util/TkCounter.java | 6 + .../java/io/vacco/tokoeka/util/TkSockets.java | 79 ++++++++ tk-sdr/src/test/java/TkControlHdlTest.java | 13 +- tk-sdr/src/test/java/TkNormalizeTest.java | 81 ++++++++ tk-sdr/src/test/java/TkSocketTest.java | 40 ++-- 25 files changed, 507 insertions(+), 108 deletions(-) delete mode 100644 tk-schema/src/main/java/io/vacco/tokoeka/spi/TkSquelchPin.java rename tk-sdr/src/main/java/io/vacco/tokoeka/{util => audio}/TkAdpcm.java (98%) rename tk-sdr/src/main/java/io/vacco/tokoeka/{util => audio}/TkAudio.java (96%) rename tk-sdr/src/main/java/io/vacco/tokoeka/{util => audio}/TkAudioPlayer.java (98%) create mode 100644 tk-sdr/src/main/java/io/vacco/tokoeka/audio/TkNormalize.java rename tk-sdr/src/main/java/io/vacco/tokoeka/{util => audio}/TkSquelch.java (88%) rename tk-sdr/src/main/java/io/vacco/tokoeka/{ => handler}/TkAudioHdl.java (95%) rename tk-sdr/src/main/java/io/vacco/tokoeka/{ => handler}/TkControlHdl.java (73%) rename tk-sdr/src/main/java/io/vacco/tokoeka/{ => handler}/TkWaterfallHdl.java (97%) rename {tk-schema => tk-sdr}/src/main/java/io/vacco/tokoeka/spi/TkAudioPin.java (100%) rename {tk-schema => tk-sdr}/src/main/java/io/vacco/tokoeka/spi/TkConfigPin.java (100%) rename {tk-schema => tk-sdr}/src/main/java/io/vacco/tokoeka/spi/TkControlPin.java (100%) rename {tk-schema => tk-sdr}/src/main/java/io/vacco/tokoeka/spi/TkExtPin.java (100%) rename {tk-schema => tk-sdr}/src/main/java/io/vacco/tokoeka/spi/TkJsonIn.java (100%) create mode 100644 tk-sdr/src/main/java/io/vacco/tokoeka/spi/TkSocketHdl.java create mode 100644 tk-sdr/src/main/java/io/vacco/tokoeka/spi/TkSquelchPin.java rename {tk-schema => tk-sdr}/src/main/java/io/vacco/tokoeka/spi/TkWfPin.java (100%) create mode 100644 tk-sdr/src/main/java/io/vacco/tokoeka/util/TkSockets.java create mode 100644 tk-sdr/src/test/java/TkNormalizeTest.java diff --git a/build.gradle.kts b/build.gradle.kts index b522ec9..effdc22 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -4,7 +4,7 @@ subprojects { apply(plugin = "io.vacco.oss.gitflow") group = "io.vacco.tokoeka" - version = "0.1.1" + version = "0.2.0" configure { sharedLibrary(true, false) diff --git a/tk-schema/src/main/java/io/vacco/tokoeka/spi/TkSquelchPin.java b/tk-schema/src/main/java/io/vacco/tokoeka/spi/TkSquelchPin.java deleted file mode 100644 index 8de981e..0000000 --- a/tk-schema/src/main/java/io/vacco/tokoeka/spi/TkSquelchPin.java +++ /dev/null @@ -1,5 +0,0 @@ -package io.vacco.tokoeka.spi; - -public interface TkSquelchPin { - void onUpdate(boolean open, byte[] pcm, double signalAvg, double signalThr); -} diff --git a/tk-sdr/build.gradle.kts b/tk-sdr/build.gradle.kts index d5068a2..b7adac2 100644 --- a/tk-sdr/build.gradle.kts +++ b/tk-sdr/build.gradle.kts @@ -5,8 +5,8 @@ configure { val api by configurations dependencies { - api("org.java-websocket:Java-WebSocket:1.5.6") api(project(":tk-schema")) + api("org.slf4j:slf4j-api:2.0.6") testImplementation("io.vacco.shax:shax:2.0.6.0.1.0") testImplementation("com.google.code.gson:gson:2.10.1") } diff --git a/tk-sdr/src/main/java/io/vacco/tokoeka/TkSocket.java b/tk-sdr/src/main/java/io/vacco/tokoeka/TkSocket.java index 608ad3d..5fb0191 100644 --- a/tk-sdr/src/main/java/io/vacco/tokoeka/TkSocket.java +++ b/tk-sdr/src/main/java/io/vacco/tokoeka/TkSocket.java @@ -1,48 +1,191 @@ package io.vacco.tokoeka; -import org.java_websocket.client.WebSocketClient; -import org.java_websocket.handshake.ServerHandshake; -import java.net.URI; -import java.nio.ByteBuffer; -import java.util.*; +import io.vacco.tokoeka.spi.TkSocketHdl; +import org.slf4j.*; +import java.io.*; +import java.net.Socket; +import java.util.function.*; -public class TkSocket extends WebSocketClient { +import static java.nio.ByteBuffer.wrap; +import static java.util.Objects.requireNonNull; +import static io.vacco.tokoeka.util.TkSockets.*; - private TkControlHdl handler; +public class TkSocket implements AutoCloseable, Consumer { - public TkSocket(URI serverUri) { - super(serverUri); + private static final Logger log = LoggerFactory.getLogger(TkSocket.class); + + private final String endpoint; + private final Socket socket; + + private OutputStream outputStream; + private InputStream inputStream; + private TkSocketHdl socketHdl; + + private final ByteArrayOutputStream accumulatedData = new ByteArrayOutputStream(); + + public TkSocket(String host, int port, String endpoint, boolean secure, int timeout) { + this.endpoint = requireNonNull(endpoint); + this.socket = createSocket(host, port, secure, timeout); } - @Override public void onOpen(ServerHandshake hs) { - if (handler != null) { - handler.onAuth(); + public TkSocket connect() { + try { + outputStream = socket.getOutputStream(); + inputStream = socket.getInputStream(); + outputStream.write(wsHandShakeOf(endpoint).getBytes()); + outputStream.flush(); + var reader = new BufferedReader(new InputStreamReader(inputStream)); + var bld = new StringBuilder(); + String line; + while (!(line = reader.readLine()).isEmpty()) { + bld.append(line).append('\n'); + } + var hs = bld.toString(); + if (!hs.contains("HTTP/1.1 101")) { + throw new IllegalStateException("ws connection handshake failed: " + hs); + } + this.socketHdl.onOpen(hs); + return this; + } catch (Exception e) { + throw new IllegalStateException("ws connection open failed", e); } } - @Override public void onMessage(ByteBuffer bytes) { - if (handler != null) { - handler.accept(bytes); + private void sendPong() throws IOException { + var pongFrame = new byte[2]; + pongFrame[0] = (byte) 0x8A; // 0x8A = FIN + opcode 0xA (PONG) + outputStream.write(pongFrame); + outputStream.flush(); + if (log.isTraceEnabled()) { + log.trace("< PONG"); } } - @Override public void onMessage(String message) { } + public void send(String message) { + try { + var payload = message.getBytes(); + var payloadLength = payload.length; + var frame = new ByteArrayOutputStream(); + frame.write(0x81); // FIN + text frame opcode (0x1) + if (payloadLength <= 125) { + frame.write(payloadLength); + } else if (payloadLength <= 65535) { + frame.write(126); + frame.write((payloadLength >> 8) & 0xFF); // most significant byte + frame.write(payloadLength & 0xFF); // least significant byte + } else { + frame.write(127); // 8-byte payload length + // For large payloads (>65535 bytes), write the 8-byte length + // First four bytes should be zeros (per WebSocket protocol) + frame.write(0); frame.write(0); frame.write(0); frame.write(0); + // Write the last four bytes of the payload length + frame.write((payloadLength >> 24) & 0xFF); // most significant byte + frame.write((payloadLength >> 16) & 0xFF); + frame.write((payloadLength >> 8) & 0xFF); + frame.write(payloadLength & 0xFF); // least significant byte + } + frame.write(payload); + outputStream.write(frame.toByteArray()); + outputStream.flush(); + if (log.isDebugEnabled()) { + log.debug("< TXT: {} ({} bytes)", message, payload.length); + } + } catch (Exception e) { + throw new IllegalStateException(String.format("unable to send text: %s", message), e); + } + } - @Override public void onClose(int code, String reason, boolean remote) { - if (handler != null && handler.controlPin != null) { - handler.controlPin.onEvent(code, null, reason, remote, null); + public void listen(Supplier go) { + while (go.get() && !socket.isClosed()) { + try { + var frameHeader = new byte[2]; + read(inputStream, frameHeader); + var isFinalFragment = (frameHeader[0] & 0x80) != 0; // Check if FIN bit is set + var opcode = frameHeader[0] & 0x0F; + var payloadLength = payloadLengthOf(frameHeader, inputStream); + var payload = new byte[payloadLength]; + int bytesRead = 0; + while (bytesRead < payloadLength) { + int read = inputStream.read(payload, bytesRead, payloadLength - bytesRead); + if (read == -1) { + throw new IOException("unexpected end of stream"); + } + bytesRead += read; + } + accumulatedData.write(payload); + if (isFinalFragment) { + var completeMessage = accumulatedData.toByteArray(); + if (opcode == 0x1) { + var msg = new String(completeMessage); + if (log.isTraceEnabled()) { + log.trace("> TXT: {}", msg); + } + this.socketHdl.onMessage(msg); + } else if (opcode == 0x2) { + if (log.isTraceEnabled()) { + log.trace("> BIN ({})", completeMessage.length); + } + this.socketHdl.onMessage(wrap(completeMessage)); + } else if (opcode == 0xA) { + if (log.isTraceEnabled()) { + log.trace("> PONG"); + } + } else if (opcode == 0x9) { + if (log.isTraceEnabled()) { + log.trace("> PING"); + } + sendPong(); + } else if (opcode == 0x8) { + if (completeMessage.length >= 2) { + int closeCode = ((completeMessage[0] & 0xFF) << 8) | (completeMessage[1] & 0xFF); + if (log.isTraceEnabled()) { + log.trace("> CLOSE ({})", closeCode); + } + this.socketHdl.onClose(closeCode); + } else { + log.warn("Received close frame with no close code."); + } + break; + } + accumulatedData.reset(); + } + } catch (Exception e) { + if (log.isDebugEnabled()) { + log.debug("ws message processing error", e); + } + this.socketHdl.onError(e); + break; + } } + this.close(); } - @Override public void onError(Exception ex) { - if (handler != null && handler.controlPin != null) { - handler.controlPin.onEvent(-1, null, null, false, ex); + @Override public void accept(String s) { + this.send(s); + } + + @Override public void close() { + try { + socket.close(); + } catch (IOException e) { + log.error("Unable to close ws socket: {} - {}", this, e.getMessage()); + } + if (log.isDebugEnabled()) { + log.debug("ws connection closed"); } } - public TkSocket withHandler(TkControlHdl handler) { - this.handler = Objects.requireNonNull(handler); + public TkSocket withHandler(TkSocketHdl hdl) { + this.socketHdl = requireNonNull(hdl); return this; } + public Socket getSocket() { + return socket; + } + + @Override public String toString() { + return String.format("%s - %s", socket, endpoint); + } + } diff --git a/tk-sdr/src/main/java/io/vacco/tokoeka/util/TkAdpcm.java b/tk-sdr/src/main/java/io/vacco/tokoeka/audio/TkAdpcm.java similarity index 98% rename from tk-sdr/src/main/java/io/vacco/tokoeka/util/TkAdpcm.java rename to tk-sdr/src/main/java/io/vacco/tokoeka/audio/TkAdpcm.java index 5771ede..463a271 100644 --- a/tk-sdr/src/main/java/io/vacco/tokoeka/util/TkAdpcm.java +++ b/tk-sdr/src/main/java/io/vacco/tokoeka/audio/TkAdpcm.java @@ -1,4 +1,4 @@ -package io.vacco.tokoeka.util; +package io.vacco.tokoeka.audio; public class TkAdpcm { diff --git a/tk-sdr/src/main/java/io/vacco/tokoeka/util/TkAudio.java b/tk-sdr/src/main/java/io/vacco/tokoeka/audio/TkAudio.java similarity index 96% rename from tk-sdr/src/main/java/io/vacco/tokoeka/util/TkAudio.java rename to tk-sdr/src/main/java/io/vacco/tokoeka/audio/TkAudio.java index d85da94..da2fd03 100644 --- a/tk-sdr/src/main/java/io/vacco/tokoeka/util/TkAudio.java +++ b/tk-sdr/src/main/java/io/vacco/tokoeka/audio/TkAudio.java @@ -1,4 +1,4 @@ -package io.vacco.tokoeka.util; +package io.vacco.tokoeka.audio; import java.nio.ByteBuffer; import java.nio.ByteOrder; diff --git a/tk-sdr/src/main/java/io/vacco/tokoeka/util/TkAudioPlayer.java b/tk-sdr/src/main/java/io/vacco/tokoeka/audio/TkAudioPlayer.java similarity index 98% rename from tk-sdr/src/main/java/io/vacco/tokoeka/util/TkAudioPlayer.java rename to tk-sdr/src/main/java/io/vacco/tokoeka/audio/TkAudioPlayer.java index 14837b8..aafc6c7 100644 --- a/tk-sdr/src/main/java/io/vacco/tokoeka/util/TkAudioPlayer.java +++ b/tk-sdr/src/main/java/io/vacco/tokoeka/audio/TkAudioPlayer.java @@ -1,4 +1,4 @@ -package io.vacco.tokoeka.util; +package io.vacco.tokoeka.audio; import javax.sound.sampled.*; diff --git a/tk-sdr/src/main/java/io/vacco/tokoeka/audio/TkNormalize.java b/tk-sdr/src/main/java/io/vacco/tokoeka/audio/TkNormalize.java new file mode 100644 index 0000000..bf8ec63 --- /dev/null +++ b/tk-sdr/src/main/java/io/vacco/tokoeka/audio/TkNormalize.java @@ -0,0 +1,64 @@ +package io.vacco.tokoeka.audio; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +public class TkNormalize { + + private final List audioChunks = new ArrayList<>(); + + public void update(byte[] pcm) { + audioChunks.add(pcm); + } + + public Iterator close() { + var normalizedChunks = new ArrayList(); + int runningPeak = extractRunningPeak(); + var normalizationFactor = calculateNormalizationFactor(runningPeak); + + for (byte[] chunk : audioChunks) { + var samples = byteArrayToShortArray(chunk); + var normalizedSamples = new short[samples.length]; + for (int i = 0; i < samples.length; i++) { + int normalizedSample = (int) (samples[i] * normalizationFactor); + normalizedSamples[i] = (short) Math.max(Short.MIN_VALUE, Math.min(Short.MAX_VALUE, normalizedSample)); + } + normalizedChunks.add(shortArrayToByteArray(normalizedSamples)); + } + + return normalizedChunks.iterator(); + } + + private int extractRunningPeak() { + int runningPeak = 0; + for (var chunk : audioChunks) { + var samples = byteArrayToShortArray(chunk); + for (var sample : samples) { + int absValue = Math.abs(sample); + if (absValue > runningPeak) { + runningPeak = absValue; + } + } + } + return runningPeak; + } + + private short[] byteArrayToShortArray(byte[] byteArray) { + var shortArray = new short[byteArray.length / 2]; + ByteBuffer.wrap(byteArray).order(java.nio.ByteOrder.LITTLE_ENDIAN).asShortBuffer().get(shortArray); + return shortArray; + } + + private byte[] shortArrayToByteArray(short[] shortArray) { + var byteArray = new byte[shortArray.length * 2]; + ByteBuffer.wrap(byteArray).order(java.nio.ByteOrder.LITTLE_ENDIAN).asShortBuffer().put(shortArray); + return byteArray; + } + + private float calculateNormalizationFactor(int currentPeak) { + return 32767.0f / currentPeak; + } + +} diff --git a/tk-sdr/src/main/java/io/vacco/tokoeka/util/TkSquelch.java b/tk-sdr/src/main/java/io/vacco/tokoeka/audio/TkSquelch.java similarity index 88% rename from tk-sdr/src/main/java/io/vacco/tokoeka/util/TkSquelch.java rename to tk-sdr/src/main/java/io/vacco/tokoeka/audio/TkSquelch.java index fbfead4..9335e88 100644 --- a/tk-sdr/src/main/java/io/vacco/tokoeka/util/TkSquelch.java +++ b/tk-sdr/src/main/java/io/vacco/tokoeka/audio/TkSquelch.java @@ -1,11 +1,11 @@ -package io.vacco.tokoeka.util; +package io.vacco.tokoeka.audio; import io.vacco.tokoeka.schema.TkSquelchParams; import io.vacco.tokoeka.spi.TkSquelchPin; import java.util.HashMap; import java.util.Map; import java.util.Objects; -import static io.vacco.tokoeka.util.TkAudio.signalAverageOf; +import static io.vacco.tokoeka.audio.TkAudio.signalAverageOf; public class TkSquelch { @@ -30,12 +30,12 @@ public void processAudio(byte[] pcm) { squelchOpen = true; } else if ((currentTime - lastSignalTime) > params.tailTimeMs) { if (squelchOpen && pin != null) { - pin.onUpdate(false, pcm, signalAvg, threshold); + pin.onSquelch(false, pcm, signalAvg, threshold); } squelchOpen = false; } if (squelchOpen && pin != null) { - pin.onUpdate(true, pcm, signalAvg, threshold); + pin.onSquelch(true, pcm, signalAvg, threshold); } updateNoiseFloor(signalAvg); diff --git a/tk-sdr/src/main/java/io/vacco/tokoeka/TkAudioHdl.java b/tk-sdr/src/main/java/io/vacco/tokoeka/handler/TkAudioHdl.java similarity index 95% rename from tk-sdr/src/main/java/io/vacco/tokoeka/TkAudioHdl.java rename to tk-sdr/src/main/java/io/vacco/tokoeka/handler/TkAudioHdl.java index 188fce0..0d6b7b4 100644 --- a/tk-sdr/src/main/java/io/vacco/tokoeka/TkAudioHdl.java +++ b/tk-sdr/src/main/java/io/vacco/tokoeka/handler/TkAudioHdl.java @@ -1,5 +1,6 @@ -package io.vacco.tokoeka; +package io.vacco.tokoeka.handler; +import io.vacco.tokoeka.audio.TkAdpcm; import io.vacco.tokoeka.schema.TkConfig; import io.vacco.tokoeka.spi.TkAudioPin; import io.vacco.tokoeka.util.*; @@ -8,7 +9,7 @@ import java.util.Objects; import java.util.function.Consumer; -import static io.vacco.tokoeka.util.TkAudio.*; +import static io.vacco.tokoeka.audio.TkAudio.*; import static io.vacco.tokoeka.util.TkCommand.*; public class TkAudioHdl { diff --git a/tk-sdr/src/main/java/io/vacco/tokoeka/TkControlHdl.java b/tk-sdr/src/main/java/io/vacco/tokoeka/handler/TkControlHdl.java similarity index 73% rename from tk-sdr/src/main/java/io/vacco/tokoeka/TkControlHdl.java rename to tk-sdr/src/main/java/io/vacco/tokoeka/handler/TkControlHdl.java index fc6b601..68b32bf 100644 --- a/tk-sdr/src/main/java/io/vacco/tokoeka/TkControlHdl.java +++ b/tk-sdr/src/main/java/io/vacco/tokoeka/handler/TkControlHdl.java @@ -1,4 +1,4 @@ -package io.vacco.tokoeka; +package io.vacco.tokoeka.handler; import io.vacco.tokoeka.schema.dx.TkDxConfig; import io.vacco.tokoeka.schema.kiwi.TkKiwiConfig; @@ -9,7 +9,6 @@ import java.net.URLDecoder; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; -import java.util.Objects; import java.util.function.Consumer; import static io.vacco.tokoeka.schema.TkConstants.*; @@ -17,43 +16,31 @@ import static io.vacco.tokoeka.util.TkFormat.*; import static java.lang.Double.parseDouble; import static java.lang.Integer.parseInt; +import static java.util.Objects.requireNonNull; -public class TkControlHdl implements Consumer { +public class TkControlHdl implements TkSocketHdl { private static final Logger log = LoggerFactory.getLogger(TkControlHdl.class); private final TkConfig config; - private final Consumer tx; - private TkAudioHdl audioHdl; - private TkWaterfallHdl waterfallHdl; - private TkJsonIn jsonIn; - private TkConfigPin configPin; - protected TkControlPin controlPin; + private Consumer tx; + private TkAudioHdl audioHdl; + private TkWaterfallHdl waterfallHdl; + private TkJsonIn jsonIn; + private TkConfigPin configPin; + protected TkControlPin controlPin; public TkKiwiConfig kiwiConfig; - public TkDxConfig dxConfig, dxCommConfig; + public TkDxConfig dxConfig, dxCommConfig; - public TkControlHdl(TkConfig config, Consumer tx) { - this.config = Objects.requireNonNull(config); - this.tx = Objects.requireNonNull(tx); + public TkControlHdl(TkConfig config) { + this.config = requireNonNull(config); } - private void processAudio(ByteBuffer data) { - if (this.audioHdl != null) { - this.audioHdl.processAudio(data); - } - } - - private void processWaterfall(ByteBuffer data) { - if (this.waterfallHdl != null) { - this.waterfallHdl.processWaterfall(data); - } - } - - private void controlEvent(String key, String value, boolean remote, Exception e) { + public void controlEvent(int wsCode, String key, String value, boolean remote, Exception e) { if (this.controlPin != null) { - this.controlPin.onEvent(-1, key, value, remote, e); + this.controlPin.onEvent(wsCode, key, value, remote, e); } } @@ -79,7 +66,7 @@ private void processKeyValue(String key, String value) { case badp: case too_busy: case redirect: - case down: controlEvent(key, value, true, null); break; + case down: this.controlEvent(-1, key, value, true, null); break; default: if (log.isDebugEnabled()) { log.debug("Unknown message key/value: {} -> {}", key, value == null ? "" : value.trim()); @@ -95,7 +82,27 @@ private void processMsg(String body) { params.forEach(this::processKeyValue); } - @Override public void accept(ByteBuffer data) { + private void processAudio(ByteBuffer data) { + if (this.audioHdl != null) { + this.audioHdl.processAudio(data); + } + } + + private void processWaterfall(ByteBuffer data) { + if (this.waterfallHdl != null) { + this.waterfallHdl.processWaterfall(data); + } + } + + @Override public void onOpen(String handShake) { + if (tx == null) { + throw new IllegalStateException("no tx sink, check handler configuration."); + } + tx.accept(setAuth(config.username, config.password)); + tx.accept(setIdentity(config.identUser)); + } + + @Override public void onMessage(ByteBuffer data) { if (data == null || data.remaining() < 3) { log.error("No data, or received data is too short to contain a valid tag"); return; @@ -118,37 +125,47 @@ private void processMsg(String body) { default: log.warn("Unsupported message tag {} ({})", tag, data.remaining()); } } catch (Exception e) { - controlEvent(null, null, false, e); + this.onError(e); } } - public void onAuth() { - tx.accept(setAuth(config.username, config.password)); - tx.accept(setIdentity(config.identUser)); + @Override public void onMessage(String message) {} + + @Override public void onClose(int code) { + this.controlEvent(code, null, null, true, null); + } + + @Override public void onError(Exception e) { + this.controlEvent(-1, null, null, false, e); } public TkControlHdl withAudioHandler(TkAudioHdl hdl) { - this.audioHdl = Objects.requireNonNull(hdl); + this.audioHdl = requireNonNull(hdl); return this; } public TkControlHdl withWaterfallHandler(TkWaterfallHdl hdl) { - this.waterfallHdl = Objects.requireNonNull(hdl); + this.waterfallHdl = requireNonNull(hdl); return this; } public TkControlHdl withJsonIn(TkJsonIn jsonIn) { - this.jsonIn = Objects.requireNonNull(jsonIn); + this.jsonIn = requireNonNull(jsonIn); return this; } public TkControlHdl withControlPin(TkControlPin pin) { - this.controlPin = Objects.requireNonNull(pin); + this.controlPin = requireNonNull(pin); return this; } public TkControlHdl withConfigPin(TkConfigPin pin) { - this.configPin = Objects.requireNonNull(pin); + this.configPin = requireNonNull(pin); + return this; + } + + public TkControlHdl withSink(Consumer tx) { + this.tx = requireNonNull(tx); return this; } diff --git a/tk-sdr/src/main/java/io/vacco/tokoeka/TkWaterfallHdl.java b/tk-sdr/src/main/java/io/vacco/tokoeka/handler/TkWaterfallHdl.java similarity index 97% rename from tk-sdr/src/main/java/io/vacco/tokoeka/TkWaterfallHdl.java rename to tk-sdr/src/main/java/io/vacco/tokoeka/handler/TkWaterfallHdl.java index 8e60d91..73e16be 100644 --- a/tk-sdr/src/main/java/io/vacco/tokoeka/TkWaterfallHdl.java +++ b/tk-sdr/src/main/java/io/vacco/tokoeka/handler/TkWaterfallHdl.java @@ -1,4 +1,4 @@ -package io.vacco.tokoeka; +package io.vacco.tokoeka.handler; import io.vacco.tokoeka.spi.TkWfPin; import io.vacco.tokoeka.util.TkCounter; diff --git a/tk-schema/src/main/java/io/vacco/tokoeka/spi/TkAudioPin.java b/tk-sdr/src/main/java/io/vacco/tokoeka/spi/TkAudioPin.java similarity index 100% rename from tk-schema/src/main/java/io/vacco/tokoeka/spi/TkAudioPin.java rename to tk-sdr/src/main/java/io/vacco/tokoeka/spi/TkAudioPin.java diff --git a/tk-schema/src/main/java/io/vacco/tokoeka/spi/TkConfigPin.java b/tk-sdr/src/main/java/io/vacco/tokoeka/spi/TkConfigPin.java similarity index 100% rename from tk-schema/src/main/java/io/vacco/tokoeka/spi/TkConfigPin.java rename to tk-sdr/src/main/java/io/vacco/tokoeka/spi/TkConfigPin.java diff --git a/tk-schema/src/main/java/io/vacco/tokoeka/spi/TkControlPin.java b/tk-sdr/src/main/java/io/vacco/tokoeka/spi/TkControlPin.java similarity index 100% rename from tk-schema/src/main/java/io/vacco/tokoeka/spi/TkControlPin.java rename to tk-sdr/src/main/java/io/vacco/tokoeka/spi/TkControlPin.java diff --git a/tk-schema/src/main/java/io/vacco/tokoeka/spi/TkExtPin.java b/tk-sdr/src/main/java/io/vacco/tokoeka/spi/TkExtPin.java similarity index 100% rename from tk-schema/src/main/java/io/vacco/tokoeka/spi/TkExtPin.java rename to tk-sdr/src/main/java/io/vacco/tokoeka/spi/TkExtPin.java diff --git a/tk-schema/src/main/java/io/vacco/tokoeka/spi/TkJsonIn.java b/tk-sdr/src/main/java/io/vacco/tokoeka/spi/TkJsonIn.java similarity index 100% rename from tk-schema/src/main/java/io/vacco/tokoeka/spi/TkJsonIn.java rename to tk-sdr/src/main/java/io/vacco/tokoeka/spi/TkJsonIn.java diff --git a/tk-sdr/src/main/java/io/vacco/tokoeka/spi/TkSocketHdl.java b/tk-sdr/src/main/java/io/vacco/tokoeka/spi/TkSocketHdl.java new file mode 100644 index 0000000..fb9a954 --- /dev/null +++ b/tk-sdr/src/main/java/io/vacco/tokoeka/spi/TkSocketHdl.java @@ -0,0 +1,13 @@ +package io.vacco.tokoeka.spi; + +import java.nio.ByteBuffer; + +public interface TkSocketHdl { + + void onOpen(String handShake); + void onMessage(String message); + void onMessage(ByteBuffer message); + void onClose(int code); + void onError(Exception e); + +} diff --git a/tk-sdr/src/main/java/io/vacco/tokoeka/spi/TkSquelchPin.java b/tk-sdr/src/main/java/io/vacco/tokoeka/spi/TkSquelchPin.java new file mode 100644 index 0000000..5efa6c3 --- /dev/null +++ b/tk-sdr/src/main/java/io/vacco/tokoeka/spi/TkSquelchPin.java @@ -0,0 +1,5 @@ +package io.vacco.tokoeka.spi; + +public interface TkSquelchPin { + void onSquelch(boolean open, byte[] pcm, double signalAvg, double signalThr); +} diff --git a/tk-schema/src/main/java/io/vacco/tokoeka/spi/TkWfPin.java b/tk-sdr/src/main/java/io/vacco/tokoeka/spi/TkWfPin.java similarity index 100% rename from tk-schema/src/main/java/io/vacco/tokoeka/spi/TkWfPin.java rename to tk-sdr/src/main/java/io/vacco/tokoeka/spi/TkWfPin.java diff --git a/tk-sdr/src/main/java/io/vacco/tokoeka/util/TkCounter.java b/tk-sdr/src/main/java/io/vacco/tokoeka/util/TkCounter.java index 351ad71..764282c 100644 --- a/tk-sdr/src/main/java/io/vacco/tokoeka/util/TkCounter.java +++ b/tk-sdr/src/main/java/io/vacco/tokoeka/util/TkCounter.java @@ -20,4 +20,10 @@ public void update() { k = 0; } } + + public static boolean nowMsDiffLt(long t0, long diffMs) { + var tN = System.currentTimeMillis(); + return (tN - t0) < diffMs; + } + } diff --git a/tk-sdr/src/main/java/io/vacco/tokoeka/util/TkSockets.java b/tk-sdr/src/main/java/io/vacco/tokoeka/util/TkSockets.java new file mode 100644 index 0000000..4421b11 --- /dev/null +++ b/tk-sdr/src/main/java/io/vacco/tokoeka/util/TkSockets.java @@ -0,0 +1,79 @@ +package io.vacco.tokoeka.util; + +import org.slf4j.*; +import javax.net.ssl.SSLSocketFactory; +import java.io.*; +import java.net.*; +import java.nio.ByteBuffer; +import java.util.Base64; + +import static java.nio.ByteBuffer.wrap; + +public class TkSockets { + + private static final Logger log = LoggerFactory.getLogger(TkSockets.class); + + public static Socket createSocket(String host, int port, boolean secure, int timeout) { + try { + if (secure) { + var sslSocketFactory = (SSLSocketFactory) SSLSocketFactory.getDefault(); + var sslSocket = sslSocketFactory.createSocket(); + sslSocket.connect(new InetSocketAddress(host, port), timeout); + return sslSocket; + } else { + var socket = new Socket(); + socket.connect(new InetSocketAddress(host, port), timeout); + return socket; + } + } catch (Exception e) { + throw new IllegalStateException(String.format( + "unable to create ws socket: [%s, %d, wss: %s]", host, port, secure + ), e); + } + } + + public static String wsHandShakeOf(String endpoint) { + var wsKey = Base64.getEncoder().encodeToString(Double.toHexString(Math.random()).getBytes()); + return "GET " + endpoint + " HTTP/1.1\r\n" + // + "Host: " + host + "\r\n" TODO do I need this? + + "Upgrade: websocket\r\n" + + "Connection: Upgrade\r\n" + + "Sec-WebSocket-Key: " + wsKey + "\r\n" + + "Sec-WebSocket-Version: 13\r\n" + + "\r\n"; + } + + public static void read(InputStream is, byte[] buff) { + try { + var bytes = is.read(buff); + if (log.isTraceEnabled()) { + log.trace("read {} bytes", bytes); + } + if (bytes == -1) { + throw new IllegalStateException("eof"); + } + } catch (IOException e) { + throw new IllegalStateException(e); + } + } + + public static int payloadLengthOf(byte[] frameHeader, InputStream is) { + int payloadLength = frameHeader[1] & 0x7F; + if (payloadLength == 126) { + var extendedPayloadLength = new byte[2]; + read(is, extendedPayloadLength); + return wrap(extendedPayloadLength).getShort() & 0xFFFF; + } + else if (payloadLength == 127) { + var extendedPayloadLength = new byte[8]; + read(is, extendedPayloadLength); + var longPayloadLength = ByteBuffer.wrap(extendedPayloadLength).getLong(); + if (longPayloadLength > Integer.MAX_VALUE) { + throw new IllegalStateException("payload too large to handle"); + } + return (int) longPayloadLength; + } + return payloadLength; + } + +} diff --git a/tk-sdr/src/test/java/TkControlHdlTest.java b/tk-sdr/src/test/java/TkControlHdlTest.java index 0fb521d..26e335a 100644 --- a/tk-sdr/src/test/java/TkControlHdlTest.java +++ b/tk-sdr/src/test/java/TkControlHdlTest.java @@ -1,8 +1,7 @@ import com.google.gson.Gson; import io.vacco.shax.logging.ShOption; -import io.vacco.tokoeka.*; -import io.vacco.tokoeka.schema.TkModulation; -import io.vacco.tokoeka.schema.TkConfig; +import io.vacco.tokoeka.handler.*; +import io.vacco.tokoeka.schema.*; import j8spec.annotation.DefinedOrder; import j8spec.junit.J8SpecRunner; import org.junit.runner.RunWith; @@ -26,7 +25,7 @@ public class TkControlHdlTest { ShOption.setSysProp(ShOption.IO_VACCO_SHAX_LOGLEVEL, "debug"); } - private static final Logger log = LoggerFactory.getLogger(TkSocketTest.class); + private static final Logger log = LoggerFactory.getLogger(TkControlHdlTest.class); static { it("Replays Websocket audio messages", () -> { @@ -39,7 +38,7 @@ public class TkControlHdlTest { cfg.modulation = TkModulation.am; var send = (Consumer) log::info; - var ctlHdl = new TkControlHdl(cfg, send) + var ctlHdl = new TkControlHdl(cfg).withSink(send) .withAudioHandler(new TkAudioHdl(cfg, send, (cfg0, flags, sequenceNumber, sMeter, rssi, imaPcm, rawPcm) -> { log.info("flags: {} seqNo: {} sMeter: {} rssi: {} raw: {}", flags, sequenceNumber, sMeter, rssi, rawPcm.length); })) @@ -58,7 +57,7 @@ public class TkControlHdlTest { for (var msg : sndMessages) { if ("receive".equals(msg.type)) { var bytes = ByteBuffer.wrap(Base64.getDecoder().decode(msg.data)); - ctlHdl.accept(bytes); + ctlHdl.onMessage(bytes); } else { log.info("Ref command ==> {}", msg.data); } @@ -66,7 +65,7 @@ public class TkControlHdlTest { for (var msg : wfMessages) { if ("receive".equals(msg.type)) { var bytes = ByteBuffer.wrap(Base64.getDecoder().decode(msg.data)); - ctlHdl.accept(bytes); + ctlHdl.onMessage(bytes); } } diff --git a/tk-sdr/src/test/java/TkNormalizeTest.java b/tk-sdr/src/test/java/TkNormalizeTest.java new file mode 100644 index 0000000..fa6ed1d --- /dev/null +++ b/tk-sdr/src/test/java/TkNormalizeTest.java @@ -0,0 +1,81 @@ +import io.vacco.tokoeka.audio.TkNormalize; +import j8spec.annotation.DefinedOrder; +import j8spec.junit.J8SpecRunner; +import org.junit.runner.RunWith; + +import static io.vacco.tokoeka.util.TkCounter.nowMsDiffLt; +import static j8spec.J8Spec.*; +import static org.junit.Assert.*; + +@DefinedOrder +@RunWith(J8SpecRunner.class) +public class TkNormalizeTest { + static { + it("Measures time differences", () -> { + var nowMs = System.currentTimeMillis(); + while (nowMsDiffLt(nowMs, 2500)) { + System.out.println("tick..."); + Thread.sleep(250); + } + }); + it("Generates no output from empty input", () -> { + var tkn = new TkNormalize(); + var normalizedChunks = tkn.close(); + assertFalse(normalizedChunks.hasNext()); + }); + it("Normalizes a single chunk", () -> { + var tkn = new TkNormalize(); + var pcmChunk = new byte[] { + 0x00, 0x40, 0x00, (byte) 0xC0, // Short values: 16384, -16384 + 0x00, 0x20, 0x00, (byte) 0xE0 // Short values: 8192, -8192 + }; + tkn.update(pcmChunk); + var normalizedChunks = tkn.close(); + assertTrue(normalizedChunks.hasNext()); + var normalized = normalizedChunks.next(); + var expectedNormalized = new byte[] { + (byte) 0xFF, 0x7F, 0x01, (byte) 0x80, // Normalized short values: 32767, -32768 + (byte) 0xFF, 0x3F, 0x01, (byte) 0xC0 // Normalized short values: 16382, -16384 + }; + assertArrayEquals(expectedNormalized, normalized); + }); + it("Normalizes multiple chunks", () -> { + var tkn = new TkNormalize(); + var pcmChunk1 = new byte[] { + 0x00, 0x40, 0x00, (byte) 0xC0 // Short values: 16384, -16384 + }; + var pcmChunk2 = new byte[] { + 0x00, 0x20, 0x00, (byte) 0xE0 // Short values: 8192, -8192 + }; + + tkn.update(pcmChunk1); + tkn.update(pcmChunk2); + + var normalizedChunks = tkn.close(); + + assertTrue(normalizedChunks.hasNext()); + var normalized1 = normalizedChunks.next(); + var expectedNormalized1 = new byte[] { + (byte) 0xFF, 0x7F, 0x01, (byte) 0x80 // Normalized short values: 32767, -32768 + }; + assertArrayEquals(expectedNormalized1, normalized1); + assertTrue(normalizedChunks.hasNext()); + var normalized2 = normalizedChunks.next(); + var expectedNormalized2 = new byte[] { + (byte) 0xFF, 0x3F, 0x01, (byte) 0xC0 // Normalized short values: 16382, -16384 + }; + assertArrayEquals(expectedNormalized2, normalized2); + }); + it("Normalizes samples with peak values", () -> { + var tkn = new TkNormalize(); + var pcmChunk = new byte[] { + (byte) 0xFF, 0x7F, (byte) 0x01, (byte) 0x80 // Short values: 32767, -32767 + }; + tkn.update(pcmChunk); + var normalizedChunks = tkn.close(); + assertTrue(normalizedChunks.hasNext()); + var normalized = normalizedChunks.next(); + assertArrayEquals(pcmChunk, normalized); + }); + } +} diff --git a/tk-sdr/src/test/java/TkSocketTest.java b/tk-sdr/src/test/java/TkSocketTest.java index 0c2fc2f..46e699f 100644 --- a/tk-sdr/src/test/java/TkSocketTest.java +++ b/tk-sdr/src/test/java/TkSocketTest.java @@ -1,18 +1,18 @@ import io.vacco.shax.logging.ShOption; -import io.vacco.tokoeka.*; +import io.vacco.tokoeka.TkSocket; +import io.vacco.tokoeka.audio.*; +import io.vacco.tokoeka.handler.TkAudioHdl; +import io.vacco.tokoeka.handler.TkControlHdl; import io.vacco.tokoeka.schema.*; -import io.vacco.tokoeka.util.*; import j8spec.annotation.DefinedOrder; import j8spec.junit.J8SpecRunner; import org.junit.runner.RunWith; import org.slf4j.*; import java.awt.*; -import java.net.URI; -import java.util.concurrent.*; -import java.util.function.Consumer; import static j8spec.J8Spec.*; +import static io.vacco.tokoeka.util.TkCounter.nowMsDiffLt; @DefinedOrder @RunWith(J8SpecRunner.class) @@ -34,23 +34,20 @@ public class TkSocketTest { // 80m.live:8078 // hb9bxewebsdr.ddns.net:8073 // bclinfo.ddns.net:8073 - 13750kHz, 594kHz, 1600kHz - var uri = new URI("ws://21331.proxy.kiwisdr.com:8073/12287283/SND"); // This looks like a session ID - var cfg = new TkConfig(); - var sock = new TkSocket(uri); - var send = (Consumer) (s) -> { - if (sock.isOpen()) { - log.info(s); - sock.send(s); - } - }; - var sqParams = TkSquelchParams.of(2500, 2.0); + var sock = new TkSocket("sdr.hfunderground.com", 8074, "/12287283/SND", false, 3000); + var cfg = new TkConfig(); + var sqParams = TkSquelchParams.of(2500, 4.0); var squelch = new TkSquelch(sqParams) .withPin((open, pcm, signalAvg, signalThr) -> log.info(">>>> Squelch [open: {}, avg: {}, thr: {}]", open, signalAvg, signalThr)); - var latch = new CountDownLatch(1); var player = new TkAudioPlayer(16, 1); - var ctlHdl = new TkControlHdl(cfg, send) - .withAudioHandler(new TkAudioHdl(cfg, send, (sampleRate, flags, sequenceNumber, sMeter, rssi, imaPcm, rawPcm) -> { + + var go = new boolean[] { true }; + var nowMs = System.currentTimeMillis(); + + var ctlHdl = new TkControlHdl(cfg) + .withSink(sock) + .withAudioHandler(new TkAudioHdl(cfg, sock, (sampleRate, flags, sequenceNumber, sMeter, rssi, imaPcm, rawPcm) -> { log.info("flags: {} seqNo: {} sMeter: {} rssi: {} raw: {}", flags, sequenceNumber, sMeter, String.format("%6.2f", rssi), rawPcm.length); squelch.processAudio(rawPcm); player.play(sampleRate, rawPcm); @@ -61,7 +58,7 @@ public class TkSocketTest { || (value != null && value.equals("Operation timed out")) || (code == -1 && key == null && value == null && !remote); if (isError) { - latch.countDown(); + go[0] = false; } }); @@ -85,9 +82,8 @@ public class TkSocketTest { cfg.nrSpecGain = 1; cfg.nrSpecActiveSnr = 1000; - sock.connectBlocking(); - log.info("{}", latch.await(5, TimeUnit.MINUTES)); - sock.closeBlocking(); + sock.connect().listen(() -> go[0] && nowMsDiffLt(nowMs, 120_000)); + sock.close(); player.close(); } else { log.info("CI/CD build. Stopping.");