Skip to content

Commit

Permalink
- WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
jjzazuet committed Oct 13, 2024
1 parent d3f3a69 commit 51900c0
Show file tree
Hide file tree
Showing 12 changed files with 69 additions and 59 deletions.
4 changes: 2 additions & 2 deletions tk-sdr/src/main/java/io/vacco/tokoeka/TkSocket.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,9 @@ public TkSocket connect() {
public void listen() {
while (!socket.isClosed()) {
try {
var stop = handleMessage(socketHdl, socketState, socketConn, inputStream, outputStream);
var stop = handleMessage(socketHdl, socketConn, inputStream, outputStream);
if (stop) {
tearDown(outputStream, socketConn, socketState, socketHdl);
tearDown(outputStream, socketConn, socketHdl);
break;
}
} catch (Exception e) {
Expand Down
4 changes: 2 additions & 2 deletions tk-sdr/src/main/java/io/vacco/tokoeka/TkSocketServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,9 @@ private void handleClient(Socket clientSocket) {
var socketConn = new TkConnAdapter(clientSocket, socketState, msg -> send(msg, outputStream));
this.socketHdl.onOpen(socketConn, handshakeResponse);
while (!clientSocket.isClosed()) {
var stop = handleMessage(this.socketHdl, socketState, socketConn, inputStream, outputStream);
var stop = handleMessage(this.socketHdl, socketConn, inputStream, outputStream);
if (stop) {
tearDown(outputStream, socketConn, socketState, socketHdl);
tearDown(outputStream, socketConn, socketHdl);
break;
}
}
Expand Down
36 changes: 18 additions & 18 deletions tk-sdr/src/main/java/io/vacco/tokoeka/handler/TkControlHdl.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@ public class TkControlHdl implements TkSocketHdl {

private final TkConfig config;

private TkAudioHdl audioHdl;
private TkWaterfallHdl waterfallHdl;
private TkJsonIn jsonIn;
private TkConfigPin configPin;
protected TkControlPin controlPin;
private TkAudioHdl audioHdl;
private TkWaterfallHdl waterfallHdl;
private TkJsonIn jsonIn;
private TkConfigPin configPin;
protected TkSdrPin sdrPin;

private Function<TkConn, Boolean> controlFn;

Expand All @@ -40,13 +40,13 @@ public TkControlHdl(TkConfig config) {
this.config = requireNonNull(config);
}

public void controlEvent(int wsCode, String key, String value, boolean remote, Exception e) {
if (this.controlPin != null) {
this.controlPin.onEvent(wsCode, key, value, remote, e);
public void sdrEvent(TkConn conn, String key, String value, Exception e) {
if (this.sdrPin != null) {
this.sdrPin.onEvent(conn, key, value, e);
}
}

private void processKeyValue(String key, String value) {
private void processKeyValue(TkConn conn, String key, String value) {
switch (key) {
case last_community_download: log.info(URLDecoder.decode(value, StandardCharsets.UTF_8)); break;
case bandwidth: this.config.frequencyMax = parseDouble(value); break;
Expand All @@ -68,20 +68,20 @@ private void processKeyValue(String key, String value) {
case badp:
case too_busy:
case redirect:
case down: this.controlEvent(-1, key, value, true, null); break;
case down: this.sdrEvent(conn, key, value, null); break;
default:
if (log.isDebugEnabled()) {
log.debug("Unknown message key/value: {} -> {}", key, shorten(value));
}
}
}

private void processMsg(String body) {
private void processMsg(TkConn conn, String body) {
var params = parseParameters(body);
if (log.isTraceEnabled()) {
log.trace(">> {} {} {}", MSG, shorten(body), shorten(params.toString()));
}
params.forEach(this::processKeyValue);
params.forEach((key, value) -> processKeyValue(conn, key, value));
}

private void processAudio(ByteBuffer data) {
Expand Down Expand Up @@ -121,7 +121,7 @@ private void processWaterfall(ByteBuffer data) {
}

switch (tag) {
case MSG: processMsg(asString(skip(data, 1))); break;
case MSG: processMsg(conn, asString(skip(data, 1))); break;
case SND: processAudio(data); break;
case WF: processWaterfall(skip(data, 1)); break;
// case "EXT": processExt(asString(skip(data, 1))); break; TODO what should be implemented?
Expand All @@ -134,12 +134,12 @@ private void processWaterfall(ByteBuffer data) {

@Override public void onMessage(TkConn conn, String message) {}

@Override public void onClose(TkConn conn, TkSocketState state) {
this.controlEvent(state.closeCode, null, state.closeReason, state.closeRemote, null);
@Override public void onClose(TkConn conn) {
this.sdrEvent(conn, null, null, null);
}

@Override public void onError(TkConn conn, Exception e) {
this.controlEvent(-1, null, null, false, e);
this.sdrEvent(conn, null, null, e);
}

public TkControlHdl withAudioHandler(TkAudioHdl hdl) {
Expand All @@ -157,8 +157,8 @@ public TkControlHdl withJsonIn(TkJsonIn jsonIn) {
return this;
}

public TkControlHdl withControlPin(TkControlPin pin) {
this.controlPin = requireNonNull(pin);
public TkControlHdl withSdrPin(TkSdrPin pin) {
this.sdrPin = requireNonNull(pin);
return this;
}

Expand Down
6 changes: 6 additions & 0 deletions tk-sdr/src/main/java/io/vacco/tokoeka/spi/TkConn.java
Original file line number Diff line number Diff line change
@@ -1,12 +1,18 @@
package io.vacco.tokoeka.spi;

import io.vacco.tokoeka.util.TkSocketState;
import java.net.Socket;
import java.util.function.Consumer;

public interface TkConn extends Consumer<String> {

void setAttachment(Object attachment);
<T> T getAttachment();

Socket getSocket();
TkSocketState getState();

void close(int code);
void close(int code, String msg);

}
5 changes: 0 additions & 5 deletions tk-sdr/src/main/java/io/vacco/tokoeka/spi/TkControlPin.java

This file was deleted.

5 changes: 5 additions & 0 deletions tk-sdr/src/main/java/io/vacco/tokoeka/spi/TkSdrPin.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package io.vacco.tokoeka.spi;

public interface TkSdrPin {
void onEvent(TkConn conn, String key, String value, Exception e);
}
3 changes: 1 addition & 2 deletions tk-sdr/src/main/java/io/vacco/tokoeka/spi/TkSocketHdl.java
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
package io.vacco.tokoeka.spi;

import io.vacco.tokoeka.util.TkSocketState;
import java.nio.ByteBuffer;

public interface TkSocketHdl {

void onOpen(TkConn conn, String handShake);
void onMessage(TkConn conn, String msg);
void onMessage(TkConn conn, ByteBuffer msg);
void onClose(TkConn conn, TkSocketState state);
void onClose(TkConn conn);
void onError(TkConn conn, Exception e);

}
7 changes: 5 additions & 2 deletions tk-sdr/src/main/java/io/vacco/tokoeka/util/TkConnAdapter.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import io.vacco.tokoeka.spi.TkConn;
import java.net.Socket;
import java.util.Objects;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

public class TkConnAdapter implements TkConn {
Expand Down Expand Up @@ -35,6 +34,10 @@ public TkConnAdapter(Socket socket, TkSocketState socketState, Consumer<String>
return socket;
}

@Override public TkSocketState getState() {
return socketState;
}

@Override public void close(int code) {
socketState.markClosed(code, null, false);
}
Expand All @@ -44,7 +47,7 @@ public TkConnAdapter(Socket socket, TkSocketState socketState, Consumer<String>
}

@Override public String toString() {
return socket.toString();
return String.format("%s, %s", socket, socketState);
}

}
19 changes: 10 additions & 9 deletions tk-sdr/src/main/java/io/vacco/tokoeka/util/TkSocketState.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package io.vacco.tokoeka.util;

import java.io.ByteArrayOutputStream;
import java.util.Objects;

public class TkSocketState {

Expand All @@ -12,20 +11,20 @@ public class TkSocketState {
public long keepAliveMs;
public int maxFrameBytes;

public Integer closeCode;
public int closeCode = Integer.MIN_VALUE;
public String closeReason;
public boolean closeRemote;
public boolean closeByRemote;

public Object attachment;

public void markClosed(Integer closeCode, String closeReason, boolean closeRemote) {
this.closeCode = Objects.requireNonNull(closeCode);
public void markClosed(int closeCode, String closeReason, boolean closeByRemote) {
this.closeCode = closeCode;
this.closeReason = closeReason;
this.closeRemote = closeRemote;
this.closeByRemote = closeByRemote;
}

public boolean isClosed() {
return closeCode != null;
return closeCode != Integer.MIN_VALUE;
}

public static TkSocketState of(long keepAliveMs, int maxFrameBytes) {
Expand All @@ -37,8 +36,10 @@ public static TkSocketState of(long keepAliveMs, int maxFrameBytes) {

@Override public String toString() {
return String.format(
"lpi: %d, lpo: %d, clc: %s, clr: %s, clRm: %s",
lastPingMs, lastPongMs, closeCode, closeReason, closeRemote
"piMs: %d, poMs: %d, cl: %d, clr: %s, clRm: %s",
lastPingMs, lastPongMs,
closeCode == Integer.MIN_VALUE ? -1 : closeCode,
closeReason, closeByRemote
);
}

Expand Down
15 changes: 8 additions & 7 deletions tk-sdr/src/main/java/io/vacco/tokoeka/util/TkSockets.java
Original file line number Diff line number Diff line change
Expand Up @@ -222,14 +222,14 @@ public static void send(String message, OutputStream outputStream) {
}
}

public static void tearDown(OutputStream outputStream, TkConn socketConn,
TkSocketState socketState, TkSocketHdl socketHdl) {
public static void tearDown(OutputStream outputStream, TkConn socketConn, TkSocketHdl socketHdl) {
try {
if (socketState.closeCode != null) {
if (!socketState.closeRemote) {
sendClose(outputStream, socketState.closeCode, socketState.closeReason);
var state = socketConn.getState();
if (state.isClosed()) {
if (!state.closeByRemote) {
sendClose(outputStream, state.closeCode, state.closeReason);
}
socketHdl.onClose(socketConn, socketState);
socketHdl.onClose(socketConn);
}
} catch (Exception e) {
socketHdl.onError(socketConn, e);
Expand All @@ -255,8 +255,9 @@ else if (payloadLength == 127) {
return payloadLength;
}

public static boolean handleMessage(TkSocketHdl socketHdl, TkSocketState socketState, TkConn conn,
public static boolean handleMessage(TkSocketHdl socketHdl, TkConn conn,
InputStream inputStream, OutputStream outputStream) throws IOException, InterruptedException {
var socketState = conn.getState();
if (socketState.isClosed()) {
return true;
}
Expand Down
8 changes: 5 additions & 3 deletions tk-sdr/src/test/java/io/vacco/tokoeka/TkControlHdlTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import io.vacco.tokoeka.handler.*;
import io.vacco.tokoeka.schema.*;
import io.vacco.tokoeka.spi.TkConn;
import io.vacco.tokoeka.util.TkSocketState;
import j8spec.annotation.DefinedOrder;
import j8spec.junit.J8SpecRunner;
import org.junit.runner.RunWith;
Expand Down Expand Up @@ -38,14 +39,14 @@ public class TkControlHdlTest {
var cfg = new TkConfig();
cfg.modulation = TkModulation.am;

var state = TkSocketState.of(-1, 65536);
var conn = new TkConn() {
@Override public void accept(String s) { log.info(s); }
@Override public void setAttachment(Object attachment) {}
@Override public <T> T getAttachment() { return null; }
@Override public Socket getSocket() { return null; }
@Override public void close(int code) {
log.info("close - [{}]", code);
}
@Override public TkSocketState getState() { return state; }
@Override public void close(int code) { log.info("close - [{}]", code); }
@Override public void close(int code, String msg) {
log.info("close - [{}, {}]", code, msg);
}
Expand Down Expand Up @@ -75,6 +76,7 @@ public class TkControlHdlTest {
log.info("Ref command ==> {}", shorten(msg.data));
}
}

for (var msg : wfMessages) {
if ("receive".equals(msg.type)) {
var bytes = ByteBuffer.wrap(Base64.getDecoder().decode(msg.data));
Expand Down
16 changes: 7 additions & 9 deletions tk-sdr/src/test/java/io/vacco/tokoeka/TkSocketTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,13 @@ public class TkSocketTest {
squelch.processAudio(rawPcm);
player.play(sampleRate, rawPcm);
}))
.withControlPin((code, key, value, remote, e) -> {
log.info("control event: {} [{}] [{}] {}", code, key, value, remote, e);
var isError = code > 1000
|| (value != null && value.equals("Operation timed out"))
|| (code == -1 && key == null && value == null && !remote);
if (isError) {
go[0] = false;
}
})
.withSdrPin(((conn, key, value, e) -> {
log.info("sdr event: {} [{}, {}]", conn, key, value, e);
var state0 = conn.getState();
var code = state0.closeCode;
var isError = (code > 1000) || (value != null && value.equals("Operation timed out"));
go[0] = !isError;
}))
.withControlFn(conn -> go[0] && nowMsDiffLt(nowMs, 120_000));

sock.withHandler(ctlHdl);
Expand Down

0 comments on commit 51900c0

Please sign in to comment.