diff --git a/lib/src/client/centrifuge.dart b/lib/src/client/centrifuge.dart index 6866e3b..cfc7015 100644 --- a/lib/src/client/centrifuge.dart +++ b/lib/src/client/centrifuge.dart @@ -5,13 +5,11 @@ import 'package:centrifuge_dart/src/client/config.dart'; import 'package:centrifuge_dart/src/client/disconnect_code.dart'; import 'package:centrifuge_dart/src/client/state.dart'; import 'package:centrifuge_dart/src/client/states_stream.dart'; -import 'package:centrifuge_dart/src/model/channel_presence.dart'; -import 'package:centrifuge_dart/src/model/channel_presence_stream.dart'; import 'package:centrifuge_dart/src/model/event.dart'; +import 'package:centrifuge_dart/src/model/event_stream.dart'; import 'package:centrifuge_dart/src/model/exception.dart'; import 'package:centrifuge_dart/src/model/presence.dart'; import 'package:centrifuge_dart/src/model/presence_stats.dart'; -import 'package:centrifuge_dart/src/model/publication.dart'; import 'package:centrifuge_dart/src/subscription/client_subscription_manager.dart'; import 'package:centrifuge_dart/src/subscription/subscription.dart'; import 'package:centrifuge_dart/src/subscription/subscription_config.dart'; @@ -68,6 +66,16 @@ abstract base class CentrifugeBase implements ICentrifuge { @nonVirtual final CentrifugeConfig _config; + @protected + @nonVirtual + final StreamController _eventsController = + StreamController.broadcast(); + + @override + @nonVirtual + late final CentrifugeEventStream events = + CentrifugeEventStream(_eventsController.stream); + /// Init centrifuge client, override this method to add custom logic. /// This method is called in constructor. /// {@nodoc} @@ -100,28 +108,20 @@ abstract base class CentrifugeBase implements ICentrifuge { @protected @mustCallSuper void _onEvent(CentrifugeEvent event) { - switch (event) { + /* switch (event) { case CentrifugePublication publication: _onPublication(publication); case CentrifugeChannelPresenceEvent event: _onPresenceEvent(event); - } + } */ + _eventsController.add(event); } - /// Called when publication received. - /// {@nodoc} - @protected - void _onPublication(CentrifugePublication publication); - - /// Called when presence event received. - /// {@nodoc} - @protected - void _onPresenceEvent(CentrifugeChannelPresenceEvent event); - @override @mustCallSuper Future close() async { _transport.events.removeListener(_onEvent); + _eventsController.close().ignore(); } } @@ -439,21 +439,6 @@ base mixin CentrifugePublicationsMixin CentrifugeErrorsMixin, CentrifugeClientSubscriptionMixin { @override - void _initCentrifuge() { - super._initCentrifuge(); - } - - @protected - @nonVirtual - final StreamController _publicationsController = - StreamController.broadcast(); - - @override - @nonVirtual - late final Stream publications = - _publicationsController.stream; - - @override Future publish(String channel, List data) async { try { await ready(); @@ -470,20 +455,6 @@ base mixin CentrifugePublicationsMixin Error.throwWithStackTrace(centrifugeException, stackTrace); } } - - @override - @nonVirtual - @pragma('vm:prefer-inline') - @pragma('dart2js:tryInline') - void _onPublication(CentrifugePublication publication) { - _clientSubscriptionManager.handlePublication(publication); - _publicationsController.add(publication); - } - - @override - Future close() => super.close().whenComplete(() { - _publicationsController.close().ignore(); - }); } /// Mixin responsible for presence. @@ -494,22 +465,6 @@ base mixin CentrifugePresenceMixin CentrifugeErrorsMixin, CentrifugeClientSubscriptionMixin { @override - void _initCentrifuge() { - super._initCentrifuge(); - } - - @protected - @nonVirtual - final StreamController - _presenceEventsController = - StreamController.broadcast(); - - @override - @nonVirtual - late final CentrifugeChannelPresenceStream presenceEvents = - CentrifugeChannelPresenceStream(_presenceEventsController.stream); - - @override Future presence(String channel) async { try { await ready(); @@ -544,20 +499,6 @@ base mixin CentrifugePresenceMixin Error.throwWithStackTrace(centrifugeException, stackTrace); } } - - @override - @nonVirtual - @pragma('vm:prefer-inline') - @pragma('dart2js:tryInline') - void _onPresenceEvent(CentrifugeChannelPresenceEvent event) { - _clientSubscriptionManager.handlePresenceEvent(event); - _presenceEventsController.add(event); - } - - @override - Future close() => super.close().whenComplete(() { - _presenceEventsController.close().ignore(); - }); } /// Mixin responsible for queue. diff --git a/lib/src/client/centrifuge_interface.dart b/lib/src/client/centrifuge_interface.dart index 8045b83..fb8353f 100644 --- a/lib/src/client/centrifuge_interface.dart +++ b/lib/src/client/centrifuge_interface.dart @@ -3,7 +3,7 @@ import 'dart:async'; import 'package:centrifuge_dart/centrifuge.dart'; -import 'package:centrifuge_dart/src/model/channel_presence_stream.dart'; +import 'package:centrifuge_dart/src/model/event_stream.dart'; import 'package:centrifuge_dart/src/model/presence.dart'; import 'package:centrifuge_dart/src/model/presence_stats.dart'; @@ -13,7 +13,7 @@ abstract interface class ICentrifuge ICentrifugeStateOwner, ICentrifugeAsyncMessageSender, ICentrifugePublicationSender, - ICentrifugePublicationReceiver, + ICentrifugeEventReceiver, ICentrifugeClientSubscriptionsManager, ICentrifugePresenceOwner { /// Stream of errors. @@ -62,12 +62,6 @@ abstract interface class ICentrifugePublicationSender { Future publish(String channel, List data); } -/// Centrifuge receive publication interface. -abstract interface class ICentrifugePublicationReceiver { - /// Stream of publications. - abstract final Stream publications; -} - /// Centrifuge send asynchronous message interface. abstract interface class ICentrifugeAsyncMessageSender { /// Send asynchronous message to a server. This method makes sense @@ -76,6 +70,12 @@ abstract interface class ICentrifugeAsyncMessageSender { Future send(List data); } +/// Centrifuge event receiver interface. +abstract interface class ICentrifugeEventReceiver { + /// Stream of centrifuge events. + abstract final CentrifugeEventStream events; +} + /// Centrifuge client subscriptions manager interface. abstract interface class ICentrifugeClientSubscriptionsManager { /// Create new client-side subscription. @@ -108,9 +108,6 @@ abstract interface class ICentrifugeClientSubscriptionsManager { /// Centrifuge presence owner interface. abstract interface class ICentrifugePresenceOwner { - /// Stream of presence (join & leave) events. - abstract final CentrifugeChannelPresenceStream presenceEvents; - /// Fetch presence information inside a channel. Future presence(String channel); diff --git a/lib/src/model/channel_presence_stream.dart b/lib/src/model/channel_presence_stream.dart deleted file mode 100644 index 542e3d1..0000000 --- a/lib/src/model/channel_presence_stream.dart +++ /dev/null @@ -1,33 +0,0 @@ -import 'dart:async'; - -import 'package:centrifuge_dart/src/model/channel_presence.dart'; - -/// Stream of Centrifuge's presence event changes. -/// Join & Leave events. -/// {@category Entity} -/// {@subCategory Channel} -/// {@subCategory Presence} -final class CentrifugeChannelPresenceStream - extends StreamView { - /// Stream of Centrifuge's presence event changes. - /// Join & Leave events. - CentrifugeChannelPresenceStream(super.stream); - - /// Join events - late final Stream joinEvents = - whereType(); - - /// Leave events - late final Stream leaveEvents = - whereType(); - - /// Filtered stream of data of [CentrifugeChannelPresenceEvent]. - Stream whereType() => - transform( - StreamTransformer.fromHandlers( - handleData: (data, sink) => switch (data) { - T valid => sink.add(valid), - _ => null, - }, - )).asBroadcastStream(); -} diff --git a/lib/src/model/event_stream.dart b/lib/src/model/event_stream.dart new file mode 100644 index 0000000..9941aad --- /dev/null +++ b/lib/src/model/event_stream.dart @@ -0,0 +1,39 @@ +import 'dart:async'; + +import 'package:centrifuge_dart/src/model/channel_presence.dart'; +import 'package:centrifuge_dart/src/model/event.dart'; +import 'package:centrifuge_dart/src/model/publication.dart'; + +/// Stream of received events. +/// {@category Entity} +/// {@subCategory Event} +/// {@subCategory Channel} +final class CentrifugeEventStream extends StreamView { + /// Stream of received events. + CentrifugeEventStream(super.stream); + + /// Publications stream. + late final Stream publications = + whereType(); + + /// Stream of presence (join & leave) events. + late final Stream presenceEvents = + whereType(); + + /// Join events + late final Stream joinEvents = + whereType(); + + /// Leave events + late final Stream leaveEvents = + whereType(); + + /// Filtered stream of data of [CentrifugeEvent]. + Stream whereType() => + transform(StreamTransformer.fromHandlers( + handleData: (data, sink) => switch (data) { + T valid => sink.add(valid), + _ => null, + }, + )).asBroadcastStream(); +} diff --git a/lib/src/subscription/client_subscription_impl.dart b/lib/src/subscription/client_subscription_impl.dart index 9049f8f..93b4429 100644 --- a/lib/src/subscription/client_subscription_impl.dart +++ b/lib/src/subscription/client_subscription_impl.dart @@ -2,8 +2,9 @@ import 'dart:async'; import 'package:centrifuge_dart/centrifuge.dart'; import 'package:centrifuge_dart/src/client/disconnect_code.dart'; -import 'package:centrifuge_dart/src/model/channel_presence.dart'; -import 'package:centrifuge_dart/src/model/channel_presence_stream.dart'; +import 'package:centrifuge_dart/src/model/channel_event.dart'; +import 'package:centrifuge_dart/src/model/event.dart'; +import 'package:centrifuge_dart/src/model/event_stream.dart'; import 'package:centrifuge_dart/src/model/history.dart'; import 'package:centrifuge_dart/src/model/presence.dart'; import 'package:centrifuge_dart/src/model/presence_stats.dart'; @@ -70,6 +71,16 @@ abstract base class CentrifugeClientSubscriptionBase /// {@nodoc} final CentrifugeSubscriptionConfig _config; + @protected + @nonVirtual + final StreamController _eventsController = + StreamController.broadcast(); + + @override + @nonVirtual + late final CentrifugeEventStream events = + CentrifugeEventStream(_eventsController.stream); + /// Init subscription. /// {@nodoc} @protected @@ -78,6 +89,16 @@ abstract base class CentrifugeClientSubscriptionBase _state = CentrifugeSubscriptionState.unsubscribed( since: _config.since, code: 0, reason: 'initial state'); _offset = _config.since?.offset ?? fixnum.Int64.ZERO; + _transport.events.addListener(_onEvent); + } + + /// Router for all events. + /// {@nodoc} + @protected + @mustCallSuper + void _onEvent(CentrifugeEvent event) { + if (event is! CentrifugeChannelEvent || event.channel != channel) return; + _eventsController.add(event); } /// Subscription has 3 states: @@ -107,16 +128,6 @@ abstract base class CentrifugeClientSubscriptionBase _stateController.add(_state = state); } - /// Stream of publications. - /// {@nodoc} - @override - Stream get publications => - _publicationController.stream; - - /// {@nodoc} - final StreamController _publicationController = - StreamController.broadcast(); - /// Notify about new publication. /// {@nodoc} @internal @@ -124,14 +135,13 @@ abstract base class CentrifugeClientSubscriptionBase void handlePublication(CentrifugePublication publication) { final offset = publication.offset; if (offset != null && offset > _offset) _offset = offset; - _publicationController.add(publication); } /// {@nodoc} @internal @mustCallSuper Future close() async { - _publicationController.close().ignore(); + await _eventsController.close(); } } @@ -416,24 +426,6 @@ base mixin CentrifugeClientSubscriptionPresenceMixin on CentrifugeClientSubscriptionBase, CentrifugeClientSubscriptionErrorsMixin { - @protected - @nonVirtual - final StreamController - _presenceEventsController = - StreamController.broadcast(); - - @override - @nonVirtual - late final CentrifugeChannelPresenceStream presenceEvents = - CentrifugeChannelPresenceStream(_presenceEventsController.stream); - - /// Notify about new presence event. - /// {@nodoc} - @internal - @nonVirtual - void handlePresenceEvent(CentrifugeChannelPresenceEvent event) => - _presenceEventsController.add(event); - @override Future presence() async { await ready(); diff --git a/lib/src/subscription/client_subscription_manager.dart b/lib/src/subscription/client_subscription_manager.dart index 609f7f3..ce29f65 100644 --- a/lib/src/subscription/client_subscription_manager.dart +++ b/lib/src/subscription/client_subscription_manager.dart @@ -1,8 +1,6 @@ import 'dart:collection'; -import 'package:centrifuge_dart/src/model/channel_presence.dart'; import 'package:centrifuge_dart/src/model/exception.dart'; -import 'package:centrifuge_dart/src/model/publication.dart'; import 'package:centrifuge_dart/src/subscription/client_subscription_impl.dart'; import 'package:centrifuge_dart/src/subscription/subscription.dart'; import 'package:centrifuge_dart/src/subscription/subscription_config.dart'; @@ -50,21 +48,6 @@ final class ClientSubscriptionManager { ); } - /// Notify subscription about new publication. - /// {@nodoc} - @internal - @nonVirtual - void handlePublication(CentrifugePublication publication) => - _channelSubscriptions[publication.channel] - ?.handlePublication(publication); - - /// Notify subscription about new presence event. - /// {@nodoc} - @internal - @nonVirtual - void handlePresenceEvent(CentrifugeChannelPresenceEvent event) => - _channelSubscriptions[event.channel]?.handlePresenceEvent(event); - /// Get map wirth all registered client-side subscriptions. /// Returns all registered subscriptions, /// so you can iterate over all and do some action if required diff --git a/lib/src/subscription/subscription.dart b/lib/src/subscription/subscription.dart index d3406c7..7f6587f 100644 --- a/lib/src/subscription/subscription.dart +++ b/lib/src/subscription/subscription.dart @@ -1,11 +1,10 @@ import 'dart:async'; -import 'package:centrifuge_dart/src/model/channel_presence_stream.dart'; +import 'package:centrifuge_dart/src/model/event_stream.dart'; import 'package:centrifuge_dart/src/model/exception.dart'; import 'package:centrifuge_dart/src/model/history.dart'; import 'package:centrifuge_dart/src/model/presence.dart'; import 'package:centrifuge_dart/src/model/presence_stats.dart'; -import 'package:centrifuge_dart/src/model/publication.dart'; import 'package:centrifuge_dart/src/model/stream_position.dart'; import 'package:centrifuge_dart/src/subscription/subscription_state.dart'; import 'package:centrifuge_dart/src/subscription/subscription_states_stream.dart'; @@ -86,11 +85,9 @@ abstract interface class CentrifugeClientSubscription /// Stream of subscription states. abstract final CentrifugeSubscriptionStateStream states; - /// Stream of publications. - abstract final Stream publications; - - /// Stream of presence (join & leave) events. - abstract final CentrifugeChannelPresenceStream presenceEvents; + /// Stream of events (publications, join/leave messages and etc) at + /// this subscription. + abstract final CentrifugeEventStream events; /// Errors stream. abstract final Stream<