Skip to content

Commit

Permalink
refactor: Update WebSocket handling and Protobuf transport in smoke_t…
Browse files Browse the repository at this point in the history
…est.dart

This commit refactors the WebSocket handling and adds support for Protobuf transport in the smoke_test.dart file. It improves the RPC testing by updating the timeouts from [1000, 100, 10, 100, 0] to [200, 50, 10, 25, 0]. Additionally, it introduces reconnecting functionality in the RPC disconnect method in the echo.go file. The client is now disconnected with reconnection if the data parameter contains the string "reconnect". This enhancement improves the flexibility and reliability of the RPC functionality.
  • Loading branch information
PlugFox committed Jul 20, 2024
1 parent 8fd27a9 commit 9a2fa88
Showing 1 changed file with 46 additions and 29 deletions.
75 changes: 46 additions & 29 deletions test/smoke/transport_ws_pb_js_test.dart
Original file line number Diff line number Diff line change
@@ -1,14 +1,10 @@
// ignore_for_file: unused_import, directives_ordering

import 'dart:async';
import 'dart:convert';
import 'dart:js_interop' as js;
import 'dart:typed_data';

import 'package:protobuf/protobuf.dart' as pb;
import 'package:spinify/spinify.dart';
import 'package:spinify/src/protobuf/client.pb.dart' as pb;
import 'package:spinify/src/transport_ws_pb_js.dart';
import 'package:test/test.dart';
import 'package:web/web.dart' as web;

Expand Down Expand Up @@ -86,20 +82,44 @@ void main() => group('Transport_WS_JS', () {
['centrifuge-protobuf'.toJS].toJS,
);
await socket.onOpen.first;
final messages = StreamController<pb.Reply>.broadcast();
socket.onMessage
.map((e) => e.data)
.cast<web.Blob>()
.asyncMap((b) => b.arrayBuffer().toDart)
.map((a) => a.toDart.asUint8List())
.map(pb.CodedBufferReader.new)
.map((reader) {
final reply = pb.Reply();
reader.readMessage(reply, pb.ExtensionRegistry.EMPTY);
return reply;
})
.where((_) => !messages.isClosed)
.listen(messages.add);
final onClose = StreamController<web.CloseEvent>();
final onMessage = StreamController<pb.Reply>.broadcast();
final events = StreamController<web.Event>()
..stream
.asyncMap<void>((event) async {
switch (event.type) {
case 'close':
onClose.add(event as web.CloseEvent);
case 'message':
final blob = (event as web.MessageEvent).data as web.Blob;
final buffer = await blob.arrayBuffer().toDart;
final bytes = buffer.toDart.asUint8List();
final reader = pb.CodedBufferReader(bytes);
final reply = pb.Reply();
reader.readMessage(reply, pb.ExtensionRegistry.EMPTY);
onMessage.add(reply);
default:
throw UnsupportedError(
'Unsupported event type: ${event.type}');
}
})
.drain<void>()
.ignore();
socket
..onMessage
.map((e) => e.data)
.cast<web.Blob>()
.asyncMap((b) => b.arrayBuffer().toDart)
.map((a) => a.toDart.asUint8List())
.map(pb.CodedBufferReader.new)
.map((reader) {
final reply = pb.Reply();
reader.readMessage(reply, pb.ExtensionRegistry.EMPTY);
return reply;
})
.where((_) => !onMessage.isClosed)
.listen(onMessage.add)
..onClose.listen(events.add);

void send(pb.Command command) {
final commandData = command.writeToBuffer();
Expand All @@ -115,7 +135,7 @@ void main() => group('Transport_WS_JS', () {
connect: pb.ConnectRequest(name: 'test'),
));
await expectLater(
messages.stream.first,
onMessage.stream.first,
completion(
isA<pb.Reply>()
.having((r) => r.id, 'id', equals(1))
Expand All @@ -132,17 +152,16 @@ void main() => group('Transport_WS_JS', () {
data: utf8.encode('permanent'),
),
));
/* await expectLater(
messages.stream.first,
await expectLater(
onMessage.stream.first,
completion(
isA<pb.Reply>()
.having((r) => r.id, 'id', equals(2))
.having((r) => r.hasRpc(), 'hasRpc', isTrue),
),
);
expect(socket.readyState, equals(web.WebSocket.OPEN)); */
await expectLater(
socket.onClose.first,
onClose.stream.first,
completion(
isA<web.CloseEvent>()
.having(
Expand All @@ -158,8 +177,11 @@ void main() => group('Transport_WS_JS', () {
),
);
expect(socket.readyState, equals(web.WebSocket.CLOSED));
await Future<void>.delayed(const Duration(milliseconds: 250));
} finally {
messages.close().ignore();
onClose.close().ignore();
onMessage.close().ignore();
events.close().ignore();
if (socket.readyState == web.WebSocket.OPEN) {
socket.close(1000, 'Normal closure');
}
Expand All @@ -169,8 +191,3 @@ void main() => group('Transport_WS_JS', () {
}, onPlatform: {
'dart-vm': const Skip('Only runs on the browser.'),
});

// < {"connect":{"name":"js"},"id":1}
// > {"id":1,"connect":{"client":"39939319-e710-43b4-8dda-58ebc92ae5f8","version":"0.0.0","data":{},"subs":{"#42":{"recoverable":true,"epoch":"LTyI","offset":5,"positioned":true},"notification:index":{"recoverable":true,"epoch":"KTeT","offset":5,"positioned":true}},"ping":2,"pong":true}}
// < {"rpc":{"method":"disconnect","data":"permanent"},"id":2}
// > {"id":2,"rpc":{}}

0 comments on commit 9a2fa88

Please sign in to comment.