Skip to content

Commit

Permalink
Squashed commit of the following:
Browse files Browse the repository at this point in the history
    - WS client connection refactoring.
  • Loading branch information
jjzazuet committed Sep 8, 2024
1 parent 5d70922 commit a306a0c
Show file tree
Hide file tree
Showing 25 changed files with 507 additions and 108 deletions.
2 changes: 1 addition & 1 deletion build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ subprojects {
apply(plugin = "io.vacco.oss.gitflow")

group = "io.vacco.tokoeka"
version = "0.1.1"
version = "0.2.0"

configure<io.vacco.oss.gitflow.GsPluginProfileExtension> {
sharedLibrary(true, false)
Expand Down

This file was deleted.

2 changes: 1 addition & 1 deletion tk-sdr/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ configure<io.vacco.oss.gitflow.GsPluginProfileExtension> {
val api by configurations

dependencies {
api("org.java-websocket:Java-WebSocket:1.5.6")
api(project(":tk-schema"))
api("org.slf4j:slf4j-api:2.0.6")
testImplementation("io.vacco.shax:shax:2.0.6.0.1.0")
testImplementation("com.google.code.gson:gson:2.10.1")
}
191 changes: 167 additions & 24 deletions tk-sdr/src/main/java/io/vacco/tokoeka/TkSocket.java
Original file line number Diff line number Diff line change
@@ -1,48 +1,191 @@
package io.vacco.tokoeka;

import org.java_websocket.client.WebSocketClient;
import org.java_websocket.handshake.ServerHandshake;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.*;
import io.vacco.tokoeka.spi.TkSocketHdl;
import org.slf4j.*;
import java.io.*;
import java.net.Socket;
import java.util.function.*;

public class TkSocket extends WebSocketClient {
import static java.nio.ByteBuffer.wrap;
import static java.util.Objects.requireNonNull;
import static io.vacco.tokoeka.util.TkSockets.*;

private TkControlHdl handler;
public class TkSocket implements AutoCloseable, Consumer<String> {

public TkSocket(URI serverUri) {
super(serverUri);
private static final Logger log = LoggerFactory.getLogger(TkSocket.class);

private final String endpoint;
private final Socket socket;

private OutputStream outputStream;
private InputStream inputStream;
private TkSocketHdl socketHdl;

private final ByteArrayOutputStream accumulatedData = new ByteArrayOutputStream();

public TkSocket(String host, int port, String endpoint, boolean secure, int timeout) {
this.endpoint = requireNonNull(endpoint);
this.socket = createSocket(host, port, secure, timeout);
}

@Override public void onOpen(ServerHandshake hs) {
if (handler != null) {
handler.onAuth();
public TkSocket connect() {
try {
outputStream = socket.getOutputStream();
inputStream = socket.getInputStream();
outputStream.write(wsHandShakeOf(endpoint).getBytes());
outputStream.flush();
var reader = new BufferedReader(new InputStreamReader(inputStream));
var bld = new StringBuilder();
String line;
while (!(line = reader.readLine()).isEmpty()) {
bld.append(line).append('\n');
}
var hs = bld.toString();
if (!hs.contains("HTTP/1.1 101")) {
throw new IllegalStateException("ws connection handshake failed: " + hs);
}
this.socketHdl.onOpen(hs);
return this;
} catch (Exception e) {
throw new IllegalStateException("ws connection open failed", e);
}
}

@Override public void onMessage(ByteBuffer bytes) {
if (handler != null) {
handler.accept(bytes);
private void sendPong() throws IOException {
var pongFrame = new byte[2];
pongFrame[0] = (byte) 0x8A; // 0x8A = FIN + opcode 0xA (PONG)
outputStream.write(pongFrame);
outputStream.flush();
if (log.isTraceEnabled()) {
log.trace("< PONG");
}
}

@Override public void onMessage(String message) { }
public void send(String message) {
try {
var payload = message.getBytes();
var payloadLength = payload.length;
var frame = new ByteArrayOutputStream();
frame.write(0x81); // FIN + text frame opcode (0x1)
if (payloadLength <= 125) {
frame.write(payloadLength);
} else if (payloadLength <= 65535) {
frame.write(126);
frame.write((payloadLength >> 8) & 0xFF); // most significant byte
frame.write(payloadLength & 0xFF); // least significant byte
} else {
frame.write(127); // 8-byte payload length
// For large payloads (>65535 bytes), write the 8-byte length
// First four bytes should be zeros (per WebSocket protocol)
frame.write(0); frame.write(0); frame.write(0); frame.write(0);
// Write the last four bytes of the payload length
frame.write((payloadLength >> 24) & 0xFF); // most significant byte
frame.write((payloadLength >> 16) & 0xFF);
frame.write((payloadLength >> 8) & 0xFF);
frame.write(payloadLength & 0xFF); // least significant byte
}
frame.write(payload);
outputStream.write(frame.toByteArray());
outputStream.flush();
if (log.isDebugEnabled()) {
log.debug("< TXT: {} ({} bytes)", message, payload.length);
}
} catch (Exception e) {
throw new IllegalStateException(String.format("unable to send text: %s", message), e);
}
}

@Override public void onClose(int code, String reason, boolean remote) {
if (handler != null && handler.controlPin != null) {
handler.controlPin.onEvent(code, null, reason, remote, null);
public void listen(Supplier<Boolean> go) {
while (go.get() && !socket.isClosed()) {
try {
var frameHeader = new byte[2];
read(inputStream, frameHeader);
var isFinalFragment = (frameHeader[0] & 0x80) != 0; // Check if FIN bit is set
var opcode = frameHeader[0] & 0x0F;
var payloadLength = payloadLengthOf(frameHeader, inputStream);
var payload = new byte[payloadLength];
int bytesRead = 0;
while (bytesRead < payloadLength) {
int read = inputStream.read(payload, bytesRead, payloadLength - bytesRead);
if (read == -1) {
throw new IOException("unexpected end of stream");
}
bytesRead += read;
}
accumulatedData.write(payload);
if (isFinalFragment) {
var completeMessage = accumulatedData.toByteArray();
if (opcode == 0x1) {
var msg = new String(completeMessage);
if (log.isTraceEnabled()) {
log.trace("> TXT: {}", msg);
}
this.socketHdl.onMessage(msg);
} else if (opcode == 0x2) {
if (log.isTraceEnabled()) {
log.trace("> BIN ({})", completeMessage.length);
}
this.socketHdl.onMessage(wrap(completeMessage));
} else if (opcode == 0xA) {
if (log.isTraceEnabled()) {
log.trace("> PONG");
}
} else if (opcode == 0x9) {
if (log.isTraceEnabled()) {
log.trace("> PING");
}
sendPong();
} else if (opcode == 0x8) {
if (completeMessage.length >= 2) {
int closeCode = ((completeMessage[0] & 0xFF) << 8) | (completeMessage[1] & 0xFF);
if (log.isTraceEnabled()) {
log.trace("> CLOSE ({})", closeCode);
}
this.socketHdl.onClose(closeCode);
} else {
log.warn("Received close frame with no close code.");
}
break;
}
accumulatedData.reset();
}
} catch (Exception e) {
if (log.isDebugEnabled()) {
log.debug("ws message processing error", e);
}
this.socketHdl.onError(e);
break;
}
}
this.close();
}

@Override public void onError(Exception ex) {
if (handler != null && handler.controlPin != null) {
handler.controlPin.onEvent(-1, null, null, false, ex);
@Override public void accept(String s) {
this.send(s);
}

@Override public void close() {
try {
socket.close();
} catch (IOException e) {
log.error("Unable to close ws socket: {} - {}", this, e.getMessage());
}
if (log.isDebugEnabled()) {
log.debug("ws connection closed");
}
}

public TkSocket withHandler(TkControlHdl handler) {
this.handler = Objects.requireNonNull(handler);
public TkSocket withHandler(TkSocketHdl hdl) {
this.socketHdl = requireNonNull(hdl);
return this;
}

public Socket getSocket() {
return socket;
}

@Override public String toString() {
return String.format("%s - %s", socket, endpoint);
}

}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package io.vacco.tokoeka.util;
package io.vacco.tokoeka.audio;

public class TkAdpcm {

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package io.vacco.tokoeka.util;
package io.vacco.tokoeka.audio;

import java.nio.ByteBuffer;
import java.nio.ByteOrder;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package io.vacco.tokoeka.util;
package io.vacco.tokoeka.audio;

import javax.sound.sampled.*;

Expand Down
64 changes: 64 additions & 0 deletions tk-sdr/src/main/java/io/vacco/tokoeka/audio/TkNormalize.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package io.vacco.tokoeka.audio;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class TkNormalize {

private final List<byte[]> audioChunks = new ArrayList<>();

public void update(byte[] pcm) {
audioChunks.add(pcm);
}

public Iterator<byte[]> close() {
var normalizedChunks = new ArrayList<byte[]>();
int runningPeak = extractRunningPeak();
var normalizationFactor = calculateNormalizationFactor(runningPeak);

for (byte[] chunk : audioChunks) {
var samples = byteArrayToShortArray(chunk);
var normalizedSamples = new short[samples.length];
for (int i = 0; i < samples.length; i++) {
int normalizedSample = (int) (samples[i] * normalizationFactor);
normalizedSamples[i] = (short) Math.max(Short.MIN_VALUE, Math.min(Short.MAX_VALUE, normalizedSample));
}
normalizedChunks.add(shortArrayToByteArray(normalizedSamples));
}

return normalizedChunks.iterator();
}

private int extractRunningPeak() {
int runningPeak = 0;
for (var chunk : audioChunks) {
var samples = byteArrayToShortArray(chunk);
for (var sample : samples) {
int absValue = Math.abs(sample);
if (absValue > runningPeak) {
runningPeak = absValue;
}
}
}
return runningPeak;
}

private short[] byteArrayToShortArray(byte[] byteArray) {
var shortArray = new short[byteArray.length / 2];
ByteBuffer.wrap(byteArray).order(java.nio.ByteOrder.LITTLE_ENDIAN).asShortBuffer().get(shortArray);
return shortArray;
}

private byte[] shortArrayToByteArray(short[] shortArray) {
var byteArray = new byte[shortArray.length * 2];
ByteBuffer.wrap(byteArray).order(java.nio.ByteOrder.LITTLE_ENDIAN).asShortBuffer().put(shortArray);
return byteArray;
}

private float calculateNormalizationFactor(int currentPeak) {
return 32767.0f / currentPeak;
}

}
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package io.vacco.tokoeka.util;
package io.vacco.tokoeka.audio;

import io.vacco.tokoeka.schema.TkSquelchParams;
import io.vacco.tokoeka.spi.TkSquelchPin;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import static io.vacco.tokoeka.util.TkAudio.signalAverageOf;
import static io.vacco.tokoeka.audio.TkAudio.signalAverageOf;

public class TkSquelch {

Expand All @@ -30,12 +30,12 @@ public void processAudio(byte[] pcm) {
squelchOpen = true;
} else if ((currentTime - lastSignalTime) > params.tailTimeMs) {
if (squelchOpen && pin != null) {
pin.onUpdate(false, pcm, signalAvg, threshold);
pin.onSquelch(false, pcm, signalAvg, threshold);
}
squelchOpen = false;
}
if (squelchOpen && pin != null) {
pin.onUpdate(true, pcm, signalAvg, threshold);
pin.onSquelch(true, pcm, signalAvg, threshold);
}

updateNoiseFloor(signalAvg);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.vacco.tokoeka;
package io.vacco.tokoeka.handler;

import io.vacco.tokoeka.audio.TkAdpcm;
import io.vacco.tokoeka.schema.TkConfig;
import io.vacco.tokoeka.spi.TkAudioPin;
import io.vacco.tokoeka.util.*;
Expand All @@ -8,7 +9,7 @@
import java.util.Objects;
import java.util.function.Consumer;

import static io.vacco.tokoeka.util.TkAudio.*;
import static io.vacco.tokoeka.audio.TkAudio.*;
import static io.vacco.tokoeka.util.TkCommand.*;

public class TkAudioHdl {
Expand Down
Loading

0 comments on commit a306a0c

Please sign in to comment.