Skip to content

Commit

Permalink
Update
Browse files Browse the repository at this point in the history
  • Loading branch information
PlugFox committed Jul 28, 2023
1 parent 32d13fa commit 6d7b092
Show file tree
Hide file tree
Showing 5 changed files with 26 additions and 21 deletions.
2 changes: 2 additions & 0 deletions lib/src/client/centrifuge.dart
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,8 @@ base mixin CentrifugeStateMixin on CentrifugeBase {

@protected
@nonVirtual
@pragma('vm:prefer-inline')
@pragma('dart2js:tryInline')
void _onStateChange(CentrifugeState newState) {
logger.info('State changed: ${_state.type} -> ${state.type}');
switch (newState) {
Expand Down
5 changes: 4 additions & 1 deletion lib/src/client/disconnect_code.dart
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,10 @@ enum DisconnectCode {
badProtocol(2, 'bad protocol'),

/// Client message write error
messageSizeLimit(3, 'message size limit exceeded');
messageSizeLimit(3, 'message size limit exceeded'),

/// Timeout
timeout(4, 'timeout exceeded');

/// {@nodoc}
const DisconnectCode(this.code, this.reason);
Expand Down
2 changes: 2 additions & 0 deletions lib/src/subscription/client_subscription_impl.dart
Original file line number Diff line number Diff line change
Expand Up @@ -188,9 +188,11 @@ base mixin CentrifugeClientSubscriptionSubscribeMixin
ttl: subscribed.ttl,
));
} on CentrifugeException catch (error, stackTrace) {
unsubscribe(0, 'error while subscribing').ignore();
_emitError(error, stackTrace);
rethrow;
} on Object catch (error, stackTrace) {
unsubscribe(0, 'error while subscribing').ignore();
final centrifugeException = CentrifugeSubscriptionException(
message: 'Error while subscribing',
channel: channel,
Expand Down
28 changes: 11 additions & 17 deletions lib/src/subscription/client_subscription_manager.dart
Original file line number Diff line number Diff line change
Expand Up @@ -94,36 +94,30 @@ final class ClientSubscriptionManager {
/// {@nodoc}
void subscribeAll() {
for (final entry in _channelSubscriptions.values) {
try {
// TODO(plugfox): moveToSubscribing if subscribed now
} on Object {
/* ignore */
}
entry.subscribe().ignore();
}
}

/// Disconnect all subscriptions for the specific client
/// from internal registry.
/// {@nodoc}
void unsubscribeAll() {
void unsubscribeAll([
int code = 0,
String reason = 'connection closed',
]) {
for (final entry in _channelSubscriptions.values) {
try {
// TODO(plugfox): moveToSubscribing if subscribed now
} on Object {
/* ignore */
}
entry.unsubscribe(code, reason).ignore();
}
}

/// Remove all subscriptions for the specific client from internal registry.
/// {@nodoc}
void removeAll() {
void removeAll([
int code = 0,
String reason = 'client closed',
]) {
for (final entry in _channelSubscriptions.values) {
try {
// TODO(plugfox): moveToSubscribing if subscribed now
} on Object {
/* ignore */
}
entry.unsubscribe(code, reason).ignore();
}
_channelSubscriptions.clear();
}
Expand Down
10 changes: 7 additions & 3 deletions lib/src/transport/ws_protobuf_transport.dart
Original file line number Diff line number Diff line change
Expand Up @@ -378,9 +378,7 @@ base mixin CentrifugeWSPBStateHandlerMixin
/// {@nodoc}
@protected
@nonVirtual
void _setState(CentrifugeState state) {
states.notify(state);
}
void _setState(CentrifugeState state) => states.notify(state);

@protected
@nonVirtual
Expand Down Expand Up @@ -556,6 +554,12 @@ base mixin CentrifugeWSPBSubscription
try {
result = await _sendMessage(request, pb.SubscribeResult())
.timeout(config.timeout);
} on TimeoutException {
disconnect(
DisconnectCode.timeout.code,
'Timeout while subscribing to channel $channel',
).ignore();
rethrow;
} on Object catch (error, stackTrace) {
Error.throwWithStackTrace(
CentrifugeSubscriptionException(
Expand Down

0 comments on commit 6d7b092

Please sign in to comment.