Skip to content

Commit

Permalink
Update transport connect interface
Browse files Browse the repository at this point in the history
  • Loading branch information
PlugFox committed Jul 27, 2023
1 parent dd86a6b commit afbffb9
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 43 deletions.
6 changes: 4 additions & 2 deletions lib/src/client/centrifuge.dart
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ final class Centrifuge extends CentrifugeBase
abstract base class CentrifugeBase implements ICentrifuge {
/// {@nodoc}
CentrifugeBase(CentrifugeConfig config) : _config = config {
_transport = CentrifugeWebSocketProtobufTransport(
_transport = CentrifugeWSPBTransport(
config: config,
disconnectCallback: _onDisconnect,
);
Expand Down Expand Up @@ -126,7 +126,9 @@ base mixin CentrifugeConnectionMixin on CentrifugeBase, CentrifugeErrorsMixin {
Future<void> connect(String url) async {
logger.fine('Interactively connecting to $url');
try {
await _transport.connect(url);
final token = await _config.getToken?.call();
final payload = await _config.getPayload?.call();
await _transport.connect(url: url, token: token, payload: payload);
} on CentrifugeException catch (error, stackTrace) {
_emitError(error, stackTrace);
rethrow;
Expand Down
6 changes: 5 additions & 1 deletion lib/src/transport/transport_interface.dart
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,11 @@ abstract interface class ICentrifugeTransport {
/// Connect to the server.
/// [url] is a URL of endpoint.
/// {@nodoc}
Future<void> connect(String url);
Future<void> connect({
required String url,
required String? token,
required List<int>? payload,
});

/// Send asynchronous message to a server. This method makes sense
/// only when using Centrifuge library for Go on a server side. In Centrifuge
Expand Down
113 changes: 73 additions & 40 deletions lib/src/transport/ws_protobuf_transport.dart
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@ import 'package:ws/ws.dart';

/// {@nodoc}
@internal
abstract base class CentrifugeWebSocketProtobufTransportBase
abstract base class CentrifugeWSPBTransportBase
implements ICentrifugeTransport {
/// {@nodoc}
CentrifugeWebSocketProtobufTransportBase({
CentrifugeWSPBTransportBase({
required CentrifugeConfig config,
required void Function() disconnectCallback,
}) : _config = config,
Expand Down Expand Up @@ -69,7 +69,11 @@ abstract base class CentrifugeWebSocketProtobufTransportBase

@override
@mustCallSuper
Future<void> connect(String url) async {}
Future<void> connect({
required String url,
required String? token,
required List<int>? payload,
}) async {}

@override
@mustCallSuper
Expand All @@ -95,14 +99,15 @@ abstract base class CentrifugeWebSocketProtobufTransportBase
/// {@nodoc}
@internal
// ignore: lines_longer_than_80_chars
final class CentrifugeWebSocketProtobufTransport = CentrifugeWebSocketProtobufTransportBase
final class CentrifugeWSPBTransport = CentrifugeWSPBTransportBase
with
CentrifugeWebSocketProtobufReplyMixin,
CentrifugeWebSocketStateHandlerMixin,
CentrifugeWebSocketProtobufSenderMixin,
CentrifugeWebSocketConnectionMixin,
CentrifugeWebSocketProtobufPingPongMixin,
CentrifugeWebSocketProtobufHandlerMixin;
CentrifugeWSPBReplyMixin,
CentrifugeWSPBStateHandlerMixin,
CentrifugeWSPBSenderMixin,
CentrifugeWSPBConnectionMixin,
CentrifugeWSPBPingPongMixin,
CentrifugeWSPBSubscription,
CentrifugeWSPBHandlerMixin;

/// Stored completer for responses.
/// {@nodoc}
Expand All @@ -114,8 +119,7 @@ typedef _ReplyCompleter = ({
/// Mixin responsible for holding reply completers.
/// {@nodoc}
@internal
base mixin CentrifugeWebSocketProtobufReplyMixin
on CentrifugeWebSocketProtobufTransportBase {
base mixin CentrifugeWSPBReplyMixin on CentrifugeWSPBTransportBase {
/// Completers for messages by id.
/// Contains timer for timeout and completer for response.
/// {@nodoc}
Expand Down Expand Up @@ -176,10 +180,8 @@ base mixin CentrifugeWebSocketProtobufReplyMixin
/// Mixin responsible for sending data through websocket with protobuf.
/// {@nodoc}
@internal
base mixin CentrifugeWebSocketProtobufSenderMixin
on
CentrifugeWebSocketProtobufTransportBase,
CentrifugeWebSocketProtobufReplyMixin {
base mixin CentrifugeWSPBSenderMixin
on CentrifugeWSPBTransportBase, CentrifugeWSPBReplyMixin {
/// Encoder protobuf commands to bytes.
/// {@nodoc}
static const Converter<pb.Command, List<int>> _commandEncoder =
Expand Down Expand Up @@ -301,21 +303,27 @@ base mixin CentrifugeWebSocketProtobufSenderMixin
/// Mixin responsible for connection.
/// {@nodoc}
@internal
base mixin CentrifugeWebSocketConnectionMixin
base mixin CentrifugeWSPBConnectionMixin
on
CentrifugeWebSocketProtobufTransportBase,
CentrifugeWebSocketProtobufSenderMixin,
CentrifugeWebSocketStateHandlerMixin {
CentrifugeWSPBTransportBase,
CentrifugeWSPBSenderMixin,
CentrifugeWSPBStateHandlerMixin {
@override
Future<void> connect(String url) async {
Future<void> connect({
required String url,
required String? token,
required List<int>? payload,
}) async {
try {
await super.connect(url);
await super.connect(
url: url,
token: token,
payload: payload,
);
await _webSocket.connect(url);
final request = pb.ConnectRequest();
final token = await _config.getToken?.call();
assert(token == null || token.length > 5, 'Centrifuge JWT is too short');
if (token != null) request.token = token;
final payload = await _config.getPayload?.call();
if (payload != null) request.data = payload;
request
..name = _config.client.name
Expand Down Expand Up @@ -359,10 +367,8 @@ base mixin CentrifugeWebSocketConnectionMixin
/// Handler for websocket states.
/// {@nodoc}
@internal
base mixin CentrifugeWebSocketStateHandlerMixin
on
CentrifugeWebSocketProtobufTransportBase,
CentrifugeWebSocketProtobufReplyMixin {
base mixin CentrifugeWSPBStateHandlerMixin
on CentrifugeWSPBTransportBase, CentrifugeWSPBReplyMixin {
// Subscribe to websocket state after first connection.
/// Subscription to websocket state.
/// {@nodoc}
Expand Down Expand Up @@ -437,15 +443,23 @@ base mixin CentrifugeWebSocketStateHandlerMixin
}

@override
Future<void> connect(String url) {
Future<void> connect({
required String url,
required String? token,
required List<int>? payload,
}) {
// Change state to connecting before connection.
_setState(CentrifugeState$Connecting(url: url));
// Subscribe to websocket state after initialization.
_webSocketClosedStateSubscription ??= _webSocket.stateChanges.closed.listen(
_handleWebSocketClosedStates,
cancelOnError: false,
);
return super.connect(url);
return super.connect(
url: url,
token: token,
payload: payload,
);
}

@override
Expand All @@ -459,11 +473,11 @@ base mixin CentrifugeWebSocketStateHandlerMixin
/// Handler for websocket messages and decode protobuf.
/// {@nodoc}
@internal
base mixin CentrifugeWebSocketProtobufHandlerMixin
base mixin CentrifugeWSPBHandlerMixin
on
CentrifugeWebSocketProtobufTransportBase,
CentrifugeWebSocketProtobufSenderMixin,
CentrifugeWebSocketProtobufPingPongMixin {
CentrifugeWSPBTransportBase,
CentrifugeWSPBSenderMixin,
CentrifugeWSPBPingPongMixin {
/// Encoder protobuf commands to bytes.
/// {@nodoc}
static const Converter<List<int>, Iterable<pb.Reply>> _replyDecoder =
Expand All @@ -474,13 +488,21 @@ base mixin CentrifugeWebSocketProtobufHandlerMixin
StreamSubscription<List<int>>? _webSocketMessageSubscription;

@override
Future<void> connect(String url) {
Future<void> connect({
required String url,
required String? token,
required List<int>? payload,
}) {
// Subscribe to websocket messages after first connection.
_webSocketMessageSubscription ??= _webSocket.stream.bytes.listen(
_handleWebSocketMessage,
cancelOnError: false,
);
return super.connect(url);
return super.connect(
url: url,
token: token,
payload: payload,
);
}

/// {@nodoc}
Expand Down Expand Up @@ -546,6 +568,13 @@ base mixin CentrifugeWebSocketProtobufHandlerMixin
}
}

/// Mixin responsible for centrifuge subscriptions.
/// {@nodoc}
@internal
base mixin CentrifugeWSPBSubscription on CentrifugeWSPBTransportBase {
Future<void> resubscribe() async {}
}

/// To maintain connection alive and detect broken connections
/// server periodically sends empty commands to clients
/// and expects empty replies from them.
Expand All @@ -554,16 +583,20 @@ base mixin CentrifugeWebSocketProtobufHandlerMixin
/// time it can consider connection broken and try to reconnect.
/// Usually a server sends pings every 25 seconds.
/// {@nodoc}
base mixin CentrifugeWebSocketProtobufPingPongMixin
on CentrifugeWebSocketProtobufTransportBase {
@internal
base mixin CentrifugeWSPBPingPongMixin on CentrifugeWSPBTransportBase {
@protected
@nonVirtual
Timer? _pingTimer;

@override
Future<void> connect(String url) async {
Future<void> connect({
required String url,
required String? token,
required List<int>? payload,
}) async {
_tearDownPingTimer();
await super.connect(url);
await super.connect(url: url, token: token, payload: payload);
_restartPingTimer();
}

Expand Down

0 comments on commit afbffb9

Please sign in to comment.