Skip to content

Commit

Permalink
Refresh token
Browse files Browse the repository at this point in the history
  • Loading branch information
PlugFox committed Jul 28, 2023
1 parent 2b6eaae commit d963d35
Show file tree
Hide file tree
Showing 6 changed files with 175 additions and 16 deletions.
51 changes: 49 additions & 2 deletions lib/src/client/centrifuge.dart
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,11 @@ abstract base class CentrifugeBase implements ICentrifuge {
/// Mixin responsible for centrifuge states
/// {@nodoc}
@internal
base mixin CentrifugeStateMixin on CentrifugeBase {
base mixin CentrifugeStateMixin on CentrifugeBase, CentrifugeErrorsMixin {
/// Refresh timer.
/// {@nodoc}
Timer? _refreshTimer;

@override
@nonVirtual
CentrifugeState get state => _state;
Expand All @@ -127,13 +131,16 @@ base mixin CentrifugeStateMixin on CentrifugeBase {
@pragma('dart2js:tryInline')
void _onStateChange(CentrifugeState newState) {
logger.info('State changed: ${_state.type} -> ${state.type}');
_refreshTimer?.cancel();
_refreshTimer = null;
switch (newState) {
case CentrifugeState$Disconnected state:
_onDisconnected(state);
case CentrifugeState$Connecting _:
break;
case CentrifugeState$Connected state:
_onConnected(state);
if (state.expires == true) _setRefreshTimer(state.ttl);
case CentrifugeState$Closed _:
break;
}
Expand All @@ -145,6 +152,43 @@ base mixin CentrifugeStateMixin on CentrifugeBase {
final StreamController<CentrifugeState> _statesController =
StreamController<CentrifugeState>.broadcast();

/// Refresh connection token when ttl is expired.
/// {@nodoc}
void _setRefreshTimer(DateTime? ttl) {
_refreshTimer?.cancel();
_refreshTimer = null;
if (ttl == null) return;
final now = DateTime.now();
final duration = ttl.subtract(_config.timeout * 4).difference(now);
if (duration.isNegative) return;
_refreshTimer = Timer(duration, _refreshToken);
}

/// Refresh token for subscription.
/// {@nodoc}
void _refreshToken() => Future<void>(() async {
try {
_refreshTimer?.cancel();
_refreshTimer = null;
final token = await _config.getToken?.call();
if (token == null || !state.isConnected) return;
await _transport.sendRefresh(token);
} on Object catch (error, stackTrace) {
logger.warning(
error,
stackTrace,
'Error while refreshing connection token',
);
_emitError(
CentrifugeRefreshException(
message: 'Error while refreshing connection token',
error: error,
),
stackTrace,
);
}
}).ignore();

@override
Future<void> close() => super.close().whenComplete(() {
_transport.states.removeListener(_onStateChange);
Expand Down Expand Up @@ -185,11 +229,14 @@ base mixin CentrifugeErrorsMixin on CentrifugeBase {
/// Mixin responsible for connection.
/// {@nodoc}
@internal
base mixin CentrifugeConnectionMixin on CentrifugeBase, CentrifugeErrorsMixin {
base mixin CentrifugeConnectionMixin
on CentrifugeBase, CentrifugeErrorsMixin, CentrifugeStateMixin {
@override
Future<void> connect(String url) async {
logger.fine('Interactively connecting to $url');
try {
_refreshTimer?.cancel();
_refreshTimer = null;
await _transport.connect(url);
} on CentrifugeException catch (error, stackTrace) {
_emitError(error, stackTrace);
Expand Down
16 changes: 15 additions & 1 deletion lib/src/model/exception.dart
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ final class CentrifugeConnectionException extends CentrifugeException {
/// {@macro exception}
const CentrifugeConnectionException({String? message, Object? error})
: super(
'centrifuge_disconnection_exception',
'centrifuge_connection_exception',
message ?? 'Connection problem',
error,
);
Expand Down Expand Up @@ -121,3 +121,17 @@ final class CentrifugeFetchException extends CentrifugeException {
error,
);
}

/// {@macro exception}
/// {@category Exception}
final class CentrifugeRefreshException extends CentrifugeException {
/// {@macro exception}
const CentrifugeRefreshException({
String? message,
Object? error,
}) : super(
'centrifuge_refresh_exception',
message ?? 'Error while refreshing connection token',
error,
);
}
43 changes: 43 additions & 0 deletions lib/src/model/refresh.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import 'package:meta/meta.dart';

/// {@nodoc}
@internal
@immutable
final class CentrifugeRefreshResult {
/// {@nodoc}
const CentrifugeRefreshResult({
required this.expires,
this.client,
this.version,
this.ttl,
});

/// Unique client connection ID server issued to this connection
final String? client;

/// Server version
final String? version;

/// Whether a server will expire connection at some point
final bool expires;

/// Time when connection will be expired
final DateTime? ttl;
}

/// {@nodoc}
@internal
@immutable
final class CentrifugeSubRefreshResult {
/// {@nodoc}
const CentrifugeSubRefreshResult({
required this.expires,
this.ttl,
});

/// Whether a server will expire subscription at some point
final bool expires;

/// Time when subscription will be expired
final DateTime? ttl;
}
31 changes: 23 additions & 8 deletions lib/src/subscription/client_subscription_impl.dart
Original file line number Diff line number Diff line change
Expand Up @@ -188,9 +188,9 @@ base mixin CentrifugeClientSubscriptionSubscribeMixin
} else if (state.isSubscribing) {
return await ready();
}
_setState(CentrifugeSubscriptionState$Subscribing(since: state.since));
_refreshTimer?.cancel();
_refreshTimer = null;
_setState(CentrifugeSubscriptionState$Subscribing(since: state.since));
final subscribed = await _transport.subscribe(
channel,
_config,
Expand Down Expand Up @@ -319,13 +319,28 @@ base mixin CentrifugeClientSubscriptionSubscribeMixin
/// Refresh token for subscription.
/// {@nodoc}
void _refreshToken() => Future<void>(() async {
_refreshTimer?.cancel();
_refreshTimer = null;
final token = await _config.getToken?.call();
if (token == null || !state.isSubscribed) return;
final result = await _transport.sendSubRefresh(channel, token);
if (result.expires) _setRefreshTimer(result.ttl);
});
try {
_refreshTimer?.cancel();
_refreshTimer = null;
final token = await _config.getToken?.call();
if (token == null || !state.isSubscribed) return;
final result = await _transport.sendSubRefresh(channel, token);
if (result.expires) _setRefreshTimer(result.ttl);
} on Object catch (error, stackTrace) {
logger.warning(
error,
stackTrace,
'Error while refreshing subscription token',
);
_emitError(
CentrifugeRefreshException(
message: 'Error while refreshing subscription token',
error: error,
),
stackTrace,
);
}
}).ignore();

@override
Future<void> close() async {
Expand Down
5 changes: 4 additions & 1 deletion lib/src/transport/transport_interface.dart
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import 'package:centrifuge_dart/src/model/channel_presence.dart';
import 'package:centrifuge_dart/src/model/history.dart';
import 'package:centrifuge_dart/src/model/presence.dart';
import 'package:centrifuge_dart/src/model/presence_stats.dart';
import 'package:centrifuge_dart/src/model/refresh.dart';
import 'package:centrifuge_dart/src/model/stream_position.dart';
import 'package:centrifuge_dart/src/subscription/subcibed_on_channel.dart';
import 'package:centrifuge_dart/src/util/notifier.dart';
Expand Down Expand Up @@ -85,9 +86,11 @@ abstract interface class ICentrifugeTransport {
/// {@nodoc}
Future<void> disconnect(int code, String reason);

Future<CentrifugeRefreshResult> sendRefresh(String token);

/// Send subscription channel refresh token command to server.
/// {@nodoc}
Future<({bool expires, DateTime? ttl})> sendSubRefresh(
Future<CentrifugeSubRefreshResult> sendSubRefresh(
String channel,
String token,
);
Expand Down
45 changes: 41 additions & 4 deletions lib/src/transport/ws_protobuf_transport.dart
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import 'package:centrifuge_dart/src/model/history.dart';
import 'package:centrifuge_dart/src/model/presence.dart';
import 'package:centrifuge_dart/src/model/presence_stats.dart';
import 'package:centrifuge_dart/src/model/protobuf/client.pb.dart' as pb;
import 'package:centrifuge_dart/src/model/refresh.dart';
import 'package:centrifuge_dart/src/model/stream_position.dart';
import 'package:centrifuge_dart/src/subscription/subcibed_on_channel.dart';
import 'package:centrifuge_dart/src/transport/transport_interface.dart';
Expand Down Expand Up @@ -355,6 +356,42 @@ base mixin CentrifugeWSPBConnectionMixin
rethrow;
}
}

@override
Future<CentrifugeRefreshResult> sendRefresh(String token) {
if (!_state.isConnected) throw StateError('Not connected');
return _sendMessage(pb.RefreshRequest()..token = token, pb.RefreshResult())
.then<CentrifugeRefreshResult>(
(result) {
final state = _state;
if (state is CentrifugeState$Connected) {
final now = DateTime.now();
final expires =
result.hasExpires() && result.expires && result.hasTtl();
final ttl = expires ? now.add(Duration(seconds: result.ttl)) : null;
_setState(CentrifugeState$Connected(
url: state.url,
timestamp: now,
client: result.hasClient() ? result.client : null,
version: result.hasVersion() ? result.version : null,
expires: expires,
ttl: ttl,
node: state.node,
pingInterval: state.pingInterval,
sendPong: state.sendPong,
session: state.session,
data: state.data,
));
return CentrifugeRefreshResult(
expires: expires,
ttl: ttl,
);
} else {
throw StateError('Not connected');
}
},
);
}
}

/// Handler for websocket states.
Expand Down Expand Up @@ -726,7 +763,7 @@ base mixin CentrifugeWSPBSubscription
);

@override
Future<({bool expires, DateTime? ttl})> sendSubRefresh(
Future<CentrifugeSubRefreshResult> sendSubRefresh(
String channel,
String token,
) =>
Expand All @@ -735,12 +772,12 @@ base mixin CentrifugeWSPBSubscription
..channel = channel
..token = token,
pb.SubRefreshResult())
.then<({bool expires, DateTime? ttl})>(
.then<CentrifugeSubRefreshResult>(
(r) {
final expires = r.hasExpires() && r.expires && r.hasTtl();
return (
return CentrifugeSubRefreshResult(
expires: expires,
ttl: expires ? DateTime.now().add(Duration(seconds: r.ttl)) : null
ttl: expires ? DateTime.now().add(Duration(seconds: r.ttl)) : null,
);
},
);
Expand Down

0 comments on commit d963d35

Please sign in to comment.