Skip to content

Commit

Permalink
Update connection
Browse files Browse the repository at this point in the history
  • Loading branch information
PlugFox committed Jul 21, 2023
1 parent 88c8c20 commit b582e11
Show file tree
Hide file tree
Showing 5 changed files with 148 additions and 36 deletions.
3 changes: 1 addition & 2 deletions examples/console/bin/main.dart
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,7 @@ void main([List<String>? args]) => runZonedGuarded<void>(() async {
},
);
}, (error, stackTrace) {
print('Error: $error');
print('Stacktrace: $stackTrace');
print('Critical error: $error');
io.exit(1);
});

Expand Down
131 changes: 114 additions & 17 deletions lib/src/client/centrifuge.dart
Original file line number Diff line number Diff line change
@@ -1,16 +1,22 @@
// TODO(plugfox): extract transport from Centrifuge client.
import 'dart:async';

import 'package:centrifuge_dart/src/client/centrifuge_interface.dart';
import 'package:centrifuge_dart/src/model/config.dart';
import 'package:centrifuge_dart/src/model/exception.dart';
import 'package:centrifuge_dart/src/model/protobuf/client.pb.dart' as pb;
import 'package:centrifuge_dart/src/model/state.dart';
import 'package:meta/meta.dart';
import 'package:ws/ws.dart';

/// {@template centrifuge}
/// Centrifuge client.
/// {@endtemplate}
final class Centrifuge extends CentrifugeBase with CentrifugeConnectionMixin {
final class Centrifuge extends CentrifugeBase
with
CentrifugePingMixin,
CentrifugeConnectionMixin,
CentrifugeHandlerMixin {
/// {@macro centrifuge}
Centrifuge([CentrifugeConfig? config])
: super(config ?? CentrifugeConfig.defaultConfig());
Expand Down Expand Up @@ -97,33 +103,36 @@ abstract base class CentrifugeBase implements ICentrifuge {
/// Mixin responsible for connection.
/// {@nodoc}
@internal
base mixin CentrifugeConnectionMixin on CentrifugeBase {
base mixin CentrifugeConnectionMixin on CentrifugeBase, CentrifugePingMixin {
StreamSubscription<WebSocketClientState>? _webSocketStateSubscription;

/// {@nodoc}
@protected
@nonVirtual
@pragma('vm:prefer-inline')
@pragma('dart2js:tryInline')
void _setState(CentrifugeState state) {
if (_state.type == state.type) return;
_stateController.add(_state = state);
}

@override
void _initCentrifuge() {
/// Listen to websocket state changes and update current client state.
_webSocketStateSubscription = _webSocket.stateChanges.listen((state) {
switch (state) {
case WebSocketClientState$Connecting state:
_setState(CentrifugeState$Connecting(url: state.url));
case WebSocketClientState$Open _:
_setState(CentrifugeState$Connected(url: state.url));
case WebSocketClientState$Disconnecting _:
case WebSocketClientState$Closed _:
_setState(CentrifugeState$Disconnected());
}
});
// Listen to websocket state changes and update current client state.
_webSocketStateSubscription = _webSocket.stateChanges.listen(
(state) {
switch (state) {
case WebSocketClientState$Connecting state:
_setState(CentrifugeState$Connecting(url: state.url));
case WebSocketClientState$Open _:
_setState(CentrifugeState$Connected(url: state.url));
_setUpPingTimer();
case WebSocketClientState$Disconnecting _:
case WebSocketClientState$Closed _:
_tearDownPingTimer();
_setState(CentrifugeState$Disconnected());
}
},
cancelOnError: false,
);
super._initCentrifuge();
}

Expand All @@ -132,6 +141,27 @@ base mixin CentrifugeConnectionMixin on CentrifugeBase {
_setState(CentrifugeState$Connecting(url: url));
try {
await _webSocket.connect(url);
} on Object catch (error, stackTrace) {
_webSocket.disconnect().ignore();
Error.throwWithStackTrace(
CentrifugoConnectionException(error),
stackTrace,
);
}
pb.ConnectRequest request;
try {
request = pb.ConnectRequest();
final token = await _config.getToken?.call();
assert(token == null || token.length > 5, 'Centrifuge JWT is too short');
if (token != null) request.token = token;
final payload = await _config.getPayload?.call();
if (payload != null) request.data = payload;
request
..name = _config.client.name
..version = _config.client.version;
// TODO(plugfox): add subscriptions.

// TODO(plugfox): Send request.
} on Object catch (error, stackTrace) {
Error.throwWithStackTrace(
CentrifugoConnectionException(error),
Expand All @@ -153,10 +183,77 @@ base mixin CentrifugeConnectionMixin on CentrifugeBase {
}

@override
@mustCallSuper
Future<void> close() async {
await super.close();
await _webSocket.close();
_webSocketStateSubscription?.cancel().ignore();
}
}

/// Mixin responsible for responses/push/results from server.
/// {@nodoc}
@internal
base mixin CentrifugeHandlerMixin on CentrifugeBase {
StreamSubscription<Object>? _webSocketDataSubscription;
@override
void _initCentrifuge() {
// Listen to websocket data and handle it.
_webSocketDataSubscription =
_webSocket.stream.listen(_handleMessage, cancelOnError: false);
super._initCentrifuge();
}

@override
Future<void> close() async {
await super.close();
_webSocketDataSubscription?.cancel().ignore();
}

/// {@nodoc}
@protected
@nonVirtual
@pragma('vm:prefer-inline')
@pragma('dart2js:tryInline')
void _handleMessage(Object response) {
print('Received data: $response');
}
}

/// Mixin responsible for sending ping to server.
/// {@nodoc}
base mixin CentrifugePingMixin on CentrifugeBase {
Timer? _pingTimer;

@nonVirtual
void _setUpPingTimer() {}

@nonVirtual
void _tearDownPingTimer() {
_pingTimer?.cancel();
}

/* @override
Future<void> send(List<int> data) async {
final request = pb.Message()..data = data;
final command = _createCommand(
request,
true,
);
await _webSocket.add(request);
/* try {
final request = protocol.Message()..data = data;
await _webSocket.add(data);
} on Object catch (error, stackTrace) {
Error.throwWithStackTrace(
CentrifugoSendException(error),
stackTrace,
);
} */
} */

@override
Future<void> close() {
_tearDownPingTimer();
return super.close();
}
}
11 changes: 4 additions & 7 deletions lib/src/client/centrifuge_interface.dart
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@ abstract interface class ICentrifuge {
/// [url] is a URL of endpoint.
Future<void> connect(String url);

// Send asynchronous message to a server. This method makes sense
// only when using Centrifuge library for Go on a server side. In Centrifugo
// asynchronous message handler does not exist.
//Future<void> send(List<int> data);
/// Send asynchronous message to a server. This method makes sense
/// only when using Centrifuge library for Go on a server side. In Centrifugo
/// asynchronous message handler does not exist.
/* Future<void> send(List<int> data); */

/// Publish data to the channel.
/* Future<PublishResult> publish(String channel, List<int> data); */
Expand All @@ -30,9 +30,6 @@ abstract interface class ICentrifuge {
/// free all allocated resources.
Future<void> close();

/// Send asynchronous message to the server.
/* Future<void> send( data); */

/// Send arbitrary RPC and wait for response.
/* Future<void> rpc(String method, data); */
}
38 changes: 28 additions & 10 deletions lib/src/model/config.dart
Original file line number Diff line number Diff line change
@@ -1,11 +1,23 @@
import 'dart:async';

import 'package:centrifuge_dart/src/model/pubspec.yaml.g.dart';
import 'package:meta/meta.dart';

/// Token used for authentication
//typedef CentrifugeToken = String;
typedef CentrifugeToken = String;

/// Callback to get/refresh tokens
//typedef CentrifugeTokenCallback = Future<CentrifugeToken> Function();
/// This callback is used for initial connection
/// and for refreshing expired tokens.
///
/// If method returns null then connection will be established without token.
typedef CentrifugeTokenCallback = FutureOr<CentrifugeToken?> Function();

/// Callback to get initial connection payload data.
/// For example to send JWT or other auth data.
///
/// If method returns null then no payload will be sent at connect time.
typedef CentrifugeConnectionPayloadCallback = FutureOr<List<int>?> Function();

/// {@template centrifuge_config}
/// Centrifuge client common options.
Expand All @@ -21,6 +33,8 @@ import 'package:meta/meta.dart';
final class CentrifugeConfig {
/// {@macro centrifuge_config}
CentrifugeConfig({
this.getToken,
this.getPayload,
({Duration min, Duration max})? connectionRetryInterval,
({String name, String version})? client,
this.timeout = const Duration(seconds: 15),
Expand All @@ -41,15 +55,19 @@ final class CentrifugeConfig {
/// {@macro centrifuge_config}
factory CentrifugeConfig.defaultConfig() = CentrifugeConfig;

// TODO(plugfox): Add support for the following options.
/// The data send for the first request
//final List<int>? data;

/// The initial token used for authentication
//CentrifugeToken token;

/// Callback to get/refresh tokens
//final CentrifugeTokenCallback? getToken;
/// This callback is used for initial connection
/// and for refreshing expired tokens.
///
/// If method returns null then connection will be established without token.
final CentrifugeTokenCallback? getToken;

/// The data send for the first request
/// Callback to get initial connection payload data.
/// For example to send JWT or other auth data.
///
/// If method returns null then no payload will be sent at connect time.
final CentrifugeConnectionPayloadCallback? getPayload;

/// The [connectionRetryInterval] argument is specifying the
/// [backoff full jitter strategy](https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/) for reconnecting.
Expand Down
1 change: 1 addition & 0 deletions lib/src/model/messages.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
abstract interface class Message {}

0 comments on commit b582e11

Please sign in to comment.