Skip to content
This repository has been archived by the owner on May 15, 2024. It is now read-only.

Commit

Permalink
- fix subscription callback fail to work for sync fetch
Browse files Browse the repository at this point in the history
- sync data when `total` mismatch
- subscription stop when Sub is not running
  • Loading branch information
ogios committed Oct 29, 2023
1 parent 02df387 commit 87905b3
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 7 deletions.
12 changes: 7 additions & 5 deletions lib/api/fetch.dart
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ class AsyncFetcher {
if (running) return;
running = true;
syncData(null);
_timer = Timer.periodic(const Duration(seconds: 5), syncData);
sub.setCallback(syncData);
_timer = Timer.periodic(const Duration(seconds: 20), syncData);
sub.setCallback((){syncData(null);});
sub.startSub();
}

Expand Down Expand Up @@ -110,7 +110,6 @@ class AsyncFetcher {
SocketOut so = SocketOut();
so.addBytes(Uint8List.fromList("fetch".codeUnits));
so.addBytes(SocketOut.getLength(0));
log("total: $total");
so.addBytes(SocketOut.getLength(total));
return so;
}
Expand All @@ -129,7 +128,6 @@ class AsyncFetcher {
await so.writeTo(socket);
SocketIn sin = SocketIn(conn: socket);
Uint8List status = await sin.getSec();
log("Status: $status");
if (status[0] == 200) {
Uint8List data = await sin.getSec();
return utf8.decode(data);
Expand All @@ -153,7 +151,11 @@ class AsyncFetcher {
if (!response.containsKey("total") || !response.containsKey("data")) {
throw Exception("No total/id/data messages provided: $response");
}
this.total = (response["total"]) as int;
int rec_total = (response["total"]) as int;
if (rec_total > total) {
this.syncData(null);
}
this.total = rec_total;
List<dynamic> metas = response["data"] as List<dynamic>;
List<Message> processed = [];
for (Map<String, dynamic> meta in metas) {
Expand Down
7 changes: 5 additions & 2 deletions lib/api/subscribe.dart
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import 'dart:async';
import 'dart:developer';
import 'dart:io';
import 'dart:typed_data';

Expand All @@ -20,8 +21,10 @@ class MsgSubcribe {
}

void onMsgCallback(Uint8List data) {
log("receiev from udps: $data");
if (data.length == 1) {
if (data[0] == 2) {
log("received notify");
callback();
} else if (data[0] == 1) {
this.socket!.send(
Expand All @@ -35,7 +38,7 @@ class MsgSubcribe {
void startSub() {
if (this.running) return;
() async {
var socket = await RawDatagramSocket.bind(InternetAddress("0.0.0.0"), GlobalConfig.u_port);
var socket = await RawDatagramSocket.bind(InternetAddress("0.0.0.0"), 15012);
this.running = true;
this.socket = socket;
socket.listen((e) async {
Expand All @@ -58,7 +61,7 @@ class MsgSubcribe {

Future<void> sub(Timer? t) async {
this.subed = false;
while (this.subed) {
while (!this.subed && this.running) {
this.socket!.send("sub".codeUnits, InternetAddress(GlobalConfig.u_host),
GlobalConfig.u_port);
await Future.delayed(Duration(seconds: 3));
Expand Down

0 comments on commit 87905b3

Please sign in to comment.