Skip to content

Commit

Permalink
Refresh subscription token
Browse files Browse the repository at this point in the history
  • Loading branch information
PlugFox committed Jul 28, 2023
1 parent bbf1b5c commit 19d4b86
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 2 deletions.
28 changes: 28 additions & 0 deletions lib/src/subscription/client_subscription_impl.dart
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,10 @@ base mixin CentrifugeClientSubscriptionSubscribeMixin
on
CentrifugeClientSubscriptionBase,
CentrifugeClientSubscriptionErrorsMixin {
/// Refresh timer.
/// {@nodoc}
Timer? _refreshTimer;

/// Start subscribing to a channel
/// {@nodoc}
@override
Expand Down Expand Up @@ -205,6 +209,7 @@ base mixin CentrifugeClientSubscriptionSubscribeMixin
));
if (subscribed.publications.isNotEmpty)
subscribed.publications.forEach(handlePublication);
if (subscribed.expires) _setRefreshTimer(subscribed.ttl);
} on CentrifugeException catch (error, stackTrace) {
unsubscribe(0, 'error while subscribing').ignore();
_emitError(error, stackTrace);
Expand Down Expand Up @@ -295,6 +300,29 @@ base mixin CentrifugeClientSubscriptionSubscribeMixin
}
}

/// Refresh subscription when ttl is expired.
/// {@nodoc}
void _setRefreshTimer(DateTime? ttl) {
_refreshTimer?.cancel();
_refreshTimer = null;
if (ttl == null) return;
final now = DateTime.now();
final duration = ttl.subtract(_config.timeout * 4).difference(now);
if (duration.isNegative) return;
_refreshTimer = Timer(duration, _refreshToken);
}

/// Refresh token for subscription.
/// {@nodoc}
void _refreshToken() => Future<void>(() async {
_refreshTimer?.cancel();
_refreshTimer = null;
final token = await _config.getToken?.call();
if (token == null) return;
final result = await _transport.sendSubRefresh(channel, token);
if (result.expires) _setRefreshTimer(result.ttl);
});

@override
Future<void> close() async {
logger.fine('Closing subscription to $channel');
Expand Down
7 changes: 7 additions & 0 deletions lib/src/transport/transport_interface.dart
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,13 @@ abstract interface class ICentrifugeTransport {
/// {@nodoc}
Future<void> disconnect(int code, String reason);

/// Send subscription channel refresh token command to server.
/// {@nodoc}
Future<({bool expires, DateTime? ttl})> sendSubRefresh(
String channel,
String token,
);

/// Permanent close connection to the server and
/// free all allocated resources.
/// {@nodoc}
Expand Down
30 changes: 28 additions & 2 deletions lib/src/transport/ws_protobuf_transport.dart
Original file line number Diff line number Diff line change
Expand Up @@ -335,13 +335,14 @@ base mixin CentrifugeWSPBConnectionMixin
throw StateError('Connection closed during connection process');
}
final now = DateTime.now();
final expires = result.hasExpires() && result.expires && result.hasTtl();
_setState(CentrifugeState$Connected(
url: url,
timestamp: now,
client: result.hasClient() ? result.client : null,
version: result.hasVersion() ? result.version : null,
expires: result.hasExpires() ? result.expires : null,
ttl: result.hasTtl() ? now.add(Duration(seconds: result.ttl)) : null,
expires: expires,
ttl: expires ? now.add(Duration(seconds: result.ttl)) : null,
node: result.hasNode() ? result.node : null,
pingInterval: result.hasPing() ? Duration(seconds: result.ping) : null,
sendPong: result.hasPong() ? result.pong : null,
Expand Down Expand Up @@ -646,6 +647,11 @@ base mixin CentrifugeWSPBSubscription
String channel,
CentrifugeSubscriptionConfig config,
) async {
if (_webSocket.state.readyState.isDisconnecting ||
_webSocket.state.readyState.isClosed) {
// Disconnected - do nothing.
return;
}
final request = pb.UnsubscribeRequest()..channel = channel;
await _sendMessage(request, pb.UnsubscribeResult()).timeout(config.timeout);
}
Expand Down Expand Up @@ -718,6 +724,26 @@ base mixin CentrifugeWSPBSubscription
users: r.hasNumUsers() ? r.numUsers : 0,
),
);

@override
Future<({bool expires, DateTime? ttl})> sendSubRefresh(
String channel,
String token,
) =>
_sendMessage(
pb.SubRefreshRequest()
..channel = channel
..token = token,
pb.SubRefreshResult())
.then<({bool expires, DateTime? ttl})>(
(r) {
final expires = r.hasExpires() && r.expires && r.hasTtl();
return (
expires: expires,
ttl: expires ? DateTime.now().add(Duration(seconds: r.ttl)) : null
);
},
);
}

/// To maintain connection alive and detect broken connections
Expand Down

0 comments on commit 19d4b86

Please sign in to comment.