Skip to content

Commit

Permalink
Update ws client
Browse files Browse the repository at this point in the history
  • Loading branch information
PlugFox committed Jul 16, 2023
1 parent 2355847 commit 55c7e85
Show file tree
Hide file tree
Showing 6 changed files with 115 additions and 13 deletions.
26 changes: 26 additions & 0 deletions examples/console/bin/main.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import 'dart:async';
import 'dart:io' as io show exit;

import 'package:centrifuge_dart/centrifuge.dart';

void main([List<String>? args]) {
final client = Centrifuge(
CentrifugeConfig(
client: (
name: 'Centrifuge Console Example',
version: '0.0.1',
),
),
)..connect('ws://localhost:8000/connection/websocket?format=protobuf');

// TODO(plugfox): Read from stdin and send to channel.

Timer(
const Duration(seconds: 1),
() async {
await client.close();
await Future<void>.delayed(const Duration(seconds: 1));
io.exit(0);
},
);
}
46 changes: 46 additions & 0 deletions examples/console/pubspec.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
name: centrifuge_dart_example_console

description: >
Dart client to communicate with Centrifuge and Centrifugo from Flutter and VM
over WebSockets
version: 0.0.1-pre.1

publish_to: 'none'

homepage: https://centrifugal.dev

repository: https://github.com/PlugFox/centrifuge-client

issue_tracker: https://github.com/PlugFox/centrifuge-client/issues

funding:
- https://www.buymeacoffee.com/plugfox
- https://www.patreon.com/plugfox
- https://boosty.to/plugfox

topics:
- centrifugo
- centrifuge
- websocket
- cross-platform
- client

platforms:
android:
ios:
linux:
macos:
web:
windows:

#screenshots:
# - description: 'Example of using the ws library to connect to a Centrifugo server.'
# path: example.png

environment:
sdk: '>=3.0.0 <4.0.0'

dependencies:
centrifuge_dart:
path: ../../
33 changes: 21 additions & 12 deletions lib/src/client/centrifuge.dart
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,13 @@ abstract base class CentrifugeBase implements ICentrifuge {
js: () => WebSocketOptions.js(
connectionRetryInterval: config.connectionRetryInterval,
protocols: _$protocolsCentrifugeProtobuf,
timeout: config.timeout,
useBlobForBinary: false,
),
vm: () => WebSocketOptions.vm(
connectionRetryInterval: config.connectionRetryInterval,
protocols: _$protocolsCentrifugeProtobuf,
timeout: config.timeout,
headers: config.headers,
),
),
Expand All @@ -54,24 +57,29 @@ abstract base class CentrifugeBase implements ICentrifuge {

/// State controller.
/// {@nodoc}
@nonVirtual
final StreamController<CentrifugeState> _stateController;

/// Websocket client.
/// {@nodoc}
@nonVirtual
final WebSocketClient _webSocket;

@override
@nonVirtual
CentrifugeState get state => _state;

/// Current state of client.
/// {@nodoc}
@nonVirtual
CentrifugeState _state;

@override
late Stream<CentrifugeState> states = _stateController.stream;

/// Centrifuge config.
/// {@nodoc}
@nonVirtual
final CentrifugeConfig _config;

/// Init centrifuge client, override this method to add custom logic.
Expand All @@ -81,14 +89,6 @@ abstract base class CentrifugeBase implements ICentrifuge {
@mustCallSuper
void _initCentrifuge() {}

/// {@nodoc}
@protected
@nonVirtual
void _setState(CentrifugeState state) {
if (_state.type == state.type) return;
_stateController.add(_state = state);
}

@override
@mustCallSuper
Future<void> close() async {}
Expand All @@ -100,8 +100,19 @@ abstract base class CentrifugeBase implements ICentrifuge {
base mixin CentrifugeConnectionMixin on CentrifugeBase {
StreamSubscription<WebSocketClientState>? _webSocketStateSubscription;

/// {@nodoc}
@protected
@nonVirtual
@pragma('vm:prefer-inline')
@pragma('dart2js:tryInline')
void _setState(CentrifugeState state) {
if (_state.type == state.type) return;
_stateController.add(_state = state);
}

@override
void _initCentrifuge() {
/// Listen to websocket state changes and update current client state.
_webSocketStateSubscription = _webSocket.stateChanges.listen((state) {
switch (state) {
case WebSocketClientState$Connecting state:
Expand All @@ -122,7 +133,6 @@ base mixin CentrifugeConnectionMixin on CentrifugeBase {
try {
await _webSocket.connect(url);
} on Object catch (error, stackTrace) {
_setState(CentrifugeState$Disconnected());
Error.throwWithStackTrace(
CentrifugoConnectionException(error),
stackTrace,
Expand All @@ -135,9 +145,8 @@ base mixin CentrifugeConnectionMixin on CentrifugeBase {
try {
await _webSocket.disconnect();
} on Object catch (error, stackTrace) {
_setState(CentrifugeState$Disconnected());
Error.throwWithStackTrace(
CentrifugoConnectionException(error),
CentrifugoDisconnectionException(error),
stackTrace,
);
}
Expand All @@ -148,6 +157,6 @@ base mixin CentrifugeConnectionMixin on CentrifugeBase {
Future<void> close() async {
await super.close();
await _webSocket.close();
await _webSocketStateSubscription?.cancel();
_webSocketStateSubscription?.cancel().ignore();
}
}
10 changes: 10 additions & 0 deletions lib/src/model/config.dart
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,19 @@ import 'package:meta/meta.dart';
/// Centrifuge client common options.
///
/// There are several common options available when creating Client instance.
///
/// - [connectionRetryInterval] - tweaks for reconnect backoff
/// - [client] - the user's client name and version
/// - [headers] - headers that are set when connecting the web socket
/// - [timeout] - maximum time to wait for the connection to be established
/// {@endtemplate}
@immutable
final class CentrifugeConfig {
/// {@macro centrifuge_config}
CentrifugeConfig({
({Duration min, Duration max})? connectionRetryInterval,
({String name, String version})? client,
this.timeout = const Duration(seconds: 15),
this.headers,
}) : connectionRetryInterval = connectionRetryInterval ??
(
Expand Down Expand Up @@ -59,6 +65,10 @@ final class CentrifugeConfig {
/// Note that headers are ignored on the web platform.
final Map<String, Object?>? headers;

/// Maximum time to wait for the connection to be established.
/// If not specified, the timeout will be 15 seconds.
final Duration timeout;

@override
String toString() => 'CentrifugeConfig{}';
}
11 changes: 11 additions & 0 deletions lib/src/model/exception.dart
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,14 @@ final class CentrifugoConnectionException extends CentrifugoException {
error,
);
}

/// {@macro exception}
final class CentrifugoDisconnectionException extends CentrifugoException {
/// {@macro exception}
const CentrifugoDisconnectionException([Object? error])
: super(
'centrifugo_disconnection_exception',
'Connection problem',
error,
);
}
2 changes: 1 addition & 1 deletion pubspec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ dependencies:
meta: ^1.9.1

# WebSockets
ws: ^1.0.0-pre.1
ws: ^1.0.0-pre.3

# Protocol Buffers
protobuf: ^3.0.0
Expand Down

0 comments on commit 55c7e85

Please sign in to comment.