diff --git a/Makefile b/Makefile index 4d87daf..7412b4c 100644 --- a/Makefile +++ b/Makefile @@ -19,6 +19,9 @@ publish: generate deploy: publish +centrifugo-it: + @docker run -it --rm --ulimit nofile=65536:65536 -p 8000:8000 --name centrifugo -v $(PWD)/config.json:/centrifugo/config.json centrifugo/centrifugo:latest centrifugo --client_insecure --admin --admin_insecure --log_level=debug -c config.json + centrifugo-up: @docker run -d --rm --ulimit nofile=65536:65536 -p 8000:8000 --name centrifugo centrifugo/centrifugo:latest centrifugo --client_insecure --admin --admin_insecure --log_level=debug diff --git a/lib/src/spinify_impl.dart b/lib/src/spinify_impl.dart index 8a31bdb..2d6f937 100644 --- a/lib/src/spinify_impl.dart +++ b/lib/src/spinify_impl.dart @@ -229,6 +229,14 @@ base mixin SpinifyCommandMixin on SpinifyBase { _replies.remove(reply.id)?.complete(reply); await super._onReply(reply); } + + @override + Future _onDisconnect(({int? code, String? reason}) arg) async { + for (final completer in _replies.values) { + completer.completeError(StateError('Client is disconnected')); + } + await super._onDisconnect(arg); + } } /// Base mixin for Spinify client connection management (connect & disconnect). @@ -244,9 +252,7 @@ base mixin SpinifyConnectionMixin await super._onConnect(url); try { // Disconnect previous transport if exists. - _transport - ?.disconnect(SpinifyConnectingCode.connectCalled, 'Reconnecting') - .ignore(); + _transport?.disconnect(1000, 'Reconnecting').ignore(); // Create new transport. _transport = await _createTransport(url, config.headers) @@ -274,15 +280,10 @@ base mixin SpinifyConnectionMixin _readyCompleter?.complete(); _readyCompleter = null; } on Object catch (error, stackTrace) { - _transport - ?.disconnect( - SpinifyConnectingCode.transportClosed, 'Failed to connect') - .ignore(); - _transport = null; - _setState(SpinifyState$Disconnected( - closeCode: SpinifyConnectingCode.connectCalled, - closeReason: 'Failed to connect', - timestamp: DateTime.now())); + await _onDisconnect(( + code: SpinifyConnectingCode.transportClosed, + reason: 'Failed to connect' + )).catchError((_) {}); _readyCompleter?.completeError(error, stackTrace); rethrow; } @@ -317,15 +318,14 @@ base mixin SpinifyConnectionMixin @override Future _onDisconnect(({int? code, String? reason}) arg) async { - await _transport?.disconnect(arg.code, arg.reason); + await _transport?.disconnect(1000, arg.reason); _transport = null; await super._onDisconnect(arg); } @override Future _onClose() async { - await _transport?.disconnect( - SpinifyDisconnectedCode.disconnectCalled, 'Client closing'); + await _transport?.disconnect(1000, 'Client closing'); _transport = null; await super._onClose(); } diff --git a/lib/src/transport_ws_pb_vm.dart b/lib/src/transport_ws_pb_vm.dart index fdfc5b8..3bdc910 100644 --- a/lib/src/transport_ws_pb_vm.dart +++ b/lib/src/transport_ws_pb_vm.dart @@ -15,7 +15,11 @@ Future $create$WS$PB$Transport( Map headers, ) async { // ignore: close_sinks - final socket = await io.WebSocket.connect(url, headers: headers); + final socket = await io.WebSocket.connect( + url, + headers: headers, + protocols: {'centrifuge-protobuf'}, + ); final transport = SpinifyTransport$WS$PB$VM(socket); // 0 CONNECTING Socket has been created. The connection is not yet open. // 1 OPEN The connection is open and ready to communicate. @@ -66,6 +70,6 @@ final class SpinifyTransport$WS$PB$VM implements ISpinifyTransport { Future disconnect([int? code, String? reason]) async { await _subscription.cancel(); await _socket.close(code, reason); - assert(_socket.readyState == io.WebSocket.closed, 'Socket is not closed'); + //assert(_socket.readyState == io.WebSocket.closed, 'Socket is not closed'); } } diff --git a/test/smoke/smoke_test.dart b/test/smoke/smoke_test.dart new file mode 100644 index 0000000..72a9313 --- /dev/null +++ b/test/smoke/smoke_test.dart @@ -0,0 +1,17 @@ +import 'package:spinify/spinify.dart'; +import 'package:test/test.dart'; + +void main() { + group('Smoke test', () { + const url = 'ws://localhost:8000/connection/websocket'; + test('Connection', () async { + final client = Spinify(); + await client.connect(url); + expect(client.state, isA()); + await client.disconnect(); + expect(client.state, isA()); + await client.close(); + expect(client.state, isA()); + }); + }); +} diff --git a/test/unit/spinify_test.dart b/test/unit/spinify_test.dart index c5f1b81..6fa826c 100644 --- a/test/unit/spinify_test.dart +++ b/test/unit/spinify_test.dart @@ -48,14 +48,5 @@ void main() { isA() ])); }); - - /* const url = 'ws://localhost:8000/connection/websocket'; - test('Connection', () async { - final client = Spinify(); - await client.connect(url); - expect(client.state, isA()); - await client.disconnect(); - expect(client.state, isA()); - }); */ }); }