Skip to content

Commit

Permalink
Add listener for publications
Browse files Browse the repository at this point in the history
  • Loading branch information
PlugFox committed Jul 28, 2023
1 parent 4203cb3 commit be7e6e8
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 13 deletions.
22 changes: 12 additions & 10 deletions lib/src/client/centrifuge.dart
Original file line number Diff line number Diff line change
Expand Up @@ -141,11 +141,10 @@ base mixin CentrifugeStateMixin on CentrifugeBase {
StreamController<CentrifugeState>.broadcast();

@override
Future<void> close() async {
await super.close();
_transport.states.removeListener(_onStateChange);
await _statesController.close();
}
Future<void> close() => super.close().whenComplete(() {
_transport.states.removeListener(_onStateChange);
_statesController.close().ignore();
});
}

/// Mixin responsible for errors stream.
Expand Down Expand Up @@ -363,7 +362,6 @@ base mixin CentrifugePublicationsMixin
void _initCentrifuge() {
super._initCentrifuge();
_transport.publications.addListener(_onPublication);
// TODO(plugfox): notify subscriptions
}

@protected
Expand Down Expand Up @@ -398,8 +396,10 @@ base mixin CentrifugePublicationsMixin
@nonVirtual
@pragma('vm:prefer-inline')
@pragma('dart2js:tryInline')
void _onPublication(CentrifugePublication publication) =>
_publicationsController.add(publication);
void _onPublication(CentrifugePublication publication) {
_clientSubscriptionManager.handlePublication(publication);
_publicationsController.add(publication);
}

@override
Future<void> close() => super.close().whenComplete(() {
Expand All @@ -416,12 +416,14 @@ base mixin CentrifugeQueueMixin on CentrifugeBase {
/// {@nodoc}
final CentrifugeEventQueue _eventQueue = CentrifugeEventQueue();

// TODO(plugfox): add all methods

@override
Future<void> connect(String url) =>
_eventQueue.push<void>('connect', () => super.connect(url));

@override
Future<void> publish(String channel, List<int> data) =>
_eventQueue.push<void>('publish', () => super.publish(channel, data));

@override
FutureOr<void> ready() => _eventQueue.push<void>('ready', super.ready);

Expand Down
8 changes: 5 additions & 3 deletions lib/src/subscription/client_subscription_impl.dart
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,11 @@ abstract base class CentrifugeClientSubscriptionBase
final StreamController<CentrifugePublication> _publicationController =
StreamController<CentrifugePublication>.broadcast();

@protected
/// Notify about new publication.
/// {@nodoc}
@internal
@nonVirtual
void _handlePublication(CentrifugePublication publication) {
void handlePublication(CentrifugePublication publication) {
final offset = publication.offset;
if (offset != null && offset > _offset) _offset = offset;
_publicationController.add(publication);
Expand Down Expand Up @@ -200,7 +202,7 @@ base mixin CentrifugeClientSubscriptionSubscribeMixin
ttl: subscribed.ttl,
));
if (subscribed.publications.isNotEmpty)
subscribed.publications.forEach(_handlePublication);
subscribed.publications.forEach(handlePublication);
} on CentrifugeException catch (error, stackTrace) {
unsubscribe(0, 'error while subscribing').ignore();
_emitError(error, stackTrace);
Expand Down
9 changes: 9 additions & 0 deletions lib/src/subscription/client_subscription_manager.dart
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import 'dart:collection';

import 'package:centrifuge_dart/src/model/exception.dart';
import 'package:centrifuge_dart/src/model/publication.dart';
import 'package:centrifuge_dart/src/subscription/client_subscription_impl.dart';
import 'package:centrifuge_dart/src/subscription/subscription.dart';
import 'package:centrifuge_dart/src/subscription/subscription_config.dart';
Expand Down Expand Up @@ -48,6 +49,14 @@ final class ClientSubscriptionManager {
);
}

/// Notify subscription about new publication.
/// {@nodoc}
@internal
@nonVirtual
void handlePublication(CentrifugePublication publication) =>
_channelSubscriptions[publication.channel]
?.handlePublication(publication);

/// Get map wirth all registered client-side subscriptions.
/// Returns all registered subscriptions,
/// so you can iterate over all and do some action if required
Expand Down

0 comments on commit be7e6e8

Please sign in to comment.