diff --git a/lib/src/client/centrifuge.dart b/lib/src/client/centrifuge.dart index 7efd2cb..adeee68 100644 --- a/lib/src/client/centrifuge.dart +++ b/lib/src/client/centrifuge.dart @@ -100,7 +100,11 @@ abstract base class CentrifugeBase implements ICentrifuge { /// Mixin responsible for centrifuge states /// {@nodoc} @internal -base mixin CentrifugeStateMixin on CentrifugeBase { +base mixin CentrifugeStateMixin on CentrifugeBase, CentrifugeErrorsMixin { + /// Refresh timer. + /// {@nodoc} + Timer? _refreshTimer; + @override @nonVirtual CentrifugeState get state => _state; @@ -127,6 +131,8 @@ base mixin CentrifugeStateMixin on CentrifugeBase { @pragma('dart2js:tryInline') void _onStateChange(CentrifugeState newState) { logger.info('State changed: ${_state.type} -> ${state.type}'); + _refreshTimer?.cancel(); + _refreshTimer = null; switch (newState) { case CentrifugeState$Disconnected state: _onDisconnected(state); @@ -134,6 +140,7 @@ base mixin CentrifugeStateMixin on CentrifugeBase { break; case CentrifugeState$Connected state: _onConnected(state); + if (state.expires == true) _setRefreshTimer(state.ttl); case CentrifugeState$Closed _: break; } @@ -145,6 +152,43 @@ base mixin CentrifugeStateMixin on CentrifugeBase { final StreamController _statesController = StreamController.broadcast(); + /// Refresh connection token when ttl is expired. + /// {@nodoc} + void _setRefreshTimer(DateTime? ttl) { + _refreshTimer?.cancel(); + _refreshTimer = null; + if (ttl == null) return; + final now = DateTime.now(); + final duration = ttl.subtract(_config.timeout * 4).difference(now); + if (duration.isNegative) return; + _refreshTimer = Timer(duration, _refreshToken); + } + + /// Refresh token for subscription. + /// {@nodoc} + void _refreshToken() => Future(() async { + try { + _refreshTimer?.cancel(); + _refreshTimer = null; + final token = await _config.getToken?.call(); + if (token == null || !state.isConnected) return; + await _transport.sendRefresh(token); + } on Object catch (error, stackTrace) { + logger.warning( + error, + stackTrace, + 'Error while refreshing connection token', + ); + _emitError( + CentrifugeRefreshException( + message: 'Error while refreshing connection token', + error: error, + ), + stackTrace, + ); + } + }).ignore(); + @override Future close() => super.close().whenComplete(() { _transport.states.removeListener(_onStateChange); @@ -185,11 +229,14 @@ base mixin CentrifugeErrorsMixin on CentrifugeBase { /// Mixin responsible for connection. /// {@nodoc} @internal -base mixin CentrifugeConnectionMixin on CentrifugeBase, CentrifugeErrorsMixin { +base mixin CentrifugeConnectionMixin + on CentrifugeBase, CentrifugeErrorsMixin, CentrifugeStateMixin { @override Future connect(String url) async { logger.fine('Interactively connecting to $url'); try { + _refreshTimer?.cancel(); + _refreshTimer = null; await _transport.connect(url); } on CentrifugeException catch (error, stackTrace) { _emitError(error, stackTrace); diff --git a/lib/src/model/exception.dart b/lib/src/model/exception.dart index 0ed6dfd..df65f7c 100644 --- a/lib/src/model/exception.dart +++ b/lib/src/model/exception.dart @@ -38,7 +38,7 @@ final class CentrifugeConnectionException extends CentrifugeException { /// {@macro exception} const CentrifugeConnectionException({String? message, Object? error}) : super( - 'centrifuge_disconnection_exception', + 'centrifuge_connection_exception', message ?? 'Connection problem', error, ); @@ -121,3 +121,17 @@ final class CentrifugeFetchException extends CentrifugeException { error, ); } + +/// {@macro exception} +/// {@category Exception} +final class CentrifugeRefreshException extends CentrifugeException { + /// {@macro exception} + const CentrifugeRefreshException({ + String? message, + Object? error, + }) : super( + 'centrifuge_refresh_exception', + message ?? 'Error while refreshing connection token', + error, + ); +} diff --git a/lib/src/model/refresh.dart b/lib/src/model/refresh.dart new file mode 100644 index 0000000..e83d363 --- /dev/null +++ b/lib/src/model/refresh.dart @@ -0,0 +1,43 @@ +import 'package:meta/meta.dart'; + +/// {@nodoc} +@internal +@immutable +final class CentrifugeRefreshResult { + /// {@nodoc} + const CentrifugeRefreshResult({ + required this.expires, + this.client, + this.version, + this.ttl, + }); + + /// Unique client connection ID server issued to this connection + final String? client; + + /// Server version + final String? version; + + /// Whether a server will expire connection at some point + final bool expires; + + /// Time when connection will be expired + final DateTime? ttl; +} + +/// {@nodoc} +@internal +@immutable +final class CentrifugeSubRefreshResult { + /// {@nodoc} + const CentrifugeSubRefreshResult({ + required this.expires, + this.ttl, + }); + + /// Whether a server will expire subscription at some point + final bool expires; + + /// Time when subscription will be expired + final DateTime? ttl; +} diff --git a/lib/src/subscription/client_subscription_impl.dart b/lib/src/subscription/client_subscription_impl.dart index 10ca338..1677863 100644 --- a/lib/src/subscription/client_subscription_impl.dart +++ b/lib/src/subscription/client_subscription_impl.dart @@ -188,9 +188,9 @@ base mixin CentrifugeClientSubscriptionSubscribeMixin } else if (state.isSubscribing) { return await ready(); } - _setState(CentrifugeSubscriptionState$Subscribing(since: state.since)); _refreshTimer?.cancel(); _refreshTimer = null; + _setState(CentrifugeSubscriptionState$Subscribing(since: state.since)); final subscribed = await _transport.subscribe( channel, _config, @@ -319,13 +319,28 @@ base mixin CentrifugeClientSubscriptionSubscribeMixin /// Refresh token for subscription. /// {@nodoc} void _refreshToken() => Future(() async { - _refreshTimer?.cancel(); - _refreshTimer = null; - final token = await _config.getToken?.call(); - if (token == null || !state.isSubscribed) return; - final result = await _transport.sendSubRefresh(channel, token); - if (result.expires) _setRefreshTimer(result.ttl); - }); + try { + _refreshTimer?.cancel(); + _refreshTimer = null; + final token = await _config.getToken?.call(); + if (token == null || !state.isSubscribed) return; + final result = await _transport.sendSubRefresh(channel, token); + if (result.expires) _setRefreshTimer(result.ttl); + } on Object catch (error, stackTrace) { + logger.warning( + error, + stackTrace, + 'Error while refreshing subscription token', + ); + _emitError( + CentrifugeRefreshException( + message: 'Error while refreshing subscription token', + error: error, + ), + stackTrace, + ); + } + }).ignore(); @override Future close() async { diff --git a/lib/src/transport/transport_interface.dart b/lib/src/transport/transport_interface.dart index 215955b..df2d4a0 100644 --- a/lib/src/transport/transport_interface.dart +++ b/lib/src/transport/transport_interface.dart @@ -5,6 +5,7 @@ import 'package:centrifuge_dart/src/model/channel_presence.dart'; import 'package:centrifuge_dart/src/model/history.dart'; import 'package:centrifuge_dart/src/model/presence.dart'; import 'package:centrifuge_dart/src/model/presence_stats.dart'; +import 'package:centrifuge_dart/src/model/refresh.dart'; import 'package:centrifuge_dart/src/model/stream_position.dart'; import 'package:centrifuge_dart/src/subscription/subcibed_on_channel.dart'; import 'package:centrifuge_dart/src/util/notifier.dart'; @@ -85,9 +86,11 @@ abstract interface class ICentrifugeTransport { /// {@nodoc} Future disconnect(int code, String reason); + Future sendRefresh(String token); + /// Send subscription channel refresh token command to server. /// {@nodoc} - Future<({bool expires, DateTime? ttl})> sendSubRefresh( + Future sendSubRefresh( String channel, String token, ); diff --git a/lib/src/transport/ws_protobuf_transport.dart b/lib/src/transport/ws_protobuf_transport.dart index f093840..a9b9a27 100644 --- a/lib/src/transport/ws_protobuf_transport.dart +++ b/lib/src/transport/ws_protobuf_transport.dart @@ -9,6 +9,7 @@ import 'package:centrifuge_dart/src/model/history.dart'; import 'package:centrifuge_dart/src/model/presence.dart'; import 'package:centrifuge_dart/src/model/presence_stats.dart'; import 'package:centrifuge_dart/src/model/protobuf/client.pb.dart' as pb; +import 'package:centrifuge_dart/src/model/refresh.dart'; import 'package:centrifuge_dart/src/model/stream_position.dart'; import 'package:centrifuge_dart/src/subscription/subcibed_on_channel.dart'; import 'package:centrifuge_dart/src/transport/transport_interface.dart'; @@ -355,6 +356,42 @@ base mixin CentrifugeWSPBConnectionMixin rethrow; } } + + @override + Future sendRefresh(String token) { + if (!_state.isConnected) throw StateError('Not connected'); + return _sendMessage(pb.RefreshRequest()..token = token, pb.RefreshResult()) + .then( + (result) { + final state = _state; + if (state is CentrifugeState$Connected) { + final now = DateTime.now(); + final expires = + result.hasExpires() && result.expires && result.hasTtl(); + final ttl = expires ? now.add(Duration(seconds: result.ttl)) : null; + _setState(CentrifugeState$Connected( + url: state.url, + timestamp: now, + client: result.hasClient() ? result.client : null, + version: result.hasVersion() ? result.version : null, + expires: expires, + ttl: ttl, + node: state.node, + pingInterval: state.pingInterval, + sendPong: state.sendPong, + session: state.session, + data: state.data, + )); + return CentrifugeRefreshResult( + expires: expires, + ttl: ttl, + ); + } else { + throw StateError('Not connected'); + } + }, + ); + } } /// Handler for websocket states. @@ -726,7 +763,7 @@ base mixin CentrifugeWSPBSubscription ); @override - Future<({bool expires, DateTime? ttl})> sendSubRefresh( + Future sendSubRefresh( String channel, String token, ) => @@ -735,12 +772,12 @@ base mixin CentrifugeWSPBSubscription ..channel = channel ..token = token, pb.SubRefreshResult()) - .then<({bool expires, DateTime? ttl})>( + .then( (r) { final expires = r.hasExpires() && r.expires && r.hasTtl(); - return ( + return CentrifugeSubRefreshResult( expires: expires, - ttl: expires ? DateTime.now().add(Duration(seconds: r.ttl)) : null + ttl: expires ? DateTime.now().add(Duration(seconds: r.ttl)) : null, ); }, );