Skip to content

Commit

Permalink
Add EventReceiver
Browse files Browse the repository at this point in the history
  • Loading branch information
PlugFox committed Aug 1, 2023
1 parent 764971c commit fde3cd7
Show file tree
Hide file tree
Showing 7 changed files with 90 additions and 174 deletions.
89 changes: 15 additions & 74 deletions lib/src/client/centrifuge.dart
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,11 @@ import 'package:centrifuge_dart/src/client/config.dart';
import 'package:centrifuge_dart/src/client/disconnect_code.dart';
import 'package:centrifuge_dart/src/client/state.dart';
import 'package:centrifuge_dart/src/client/states_stream.dart';
import 'package:centrifuge_dart/src/model/channel_presence.dart';
import 'package:centrifuge_dart/src/model/channel_presence_stream.dart';
import 'package:centrifuge_dart/src/model/event.dart';
import 'package:centrifuge_dart/src/model/event_stream.dart';
import 'package:centrifuge_dart/src/model/exception.dart';
import 'package:centrifuge_dart/src/model/presence.dart';
import 'package:centrifuge_dart/src/model/presence_stats.dart';
import 'package:centrifuge_dart/src/model/publication.dart';
import 'package:centrifuge_dart/src/subscription/client_subscription_manager.dart';
import 'package:centrifuge_dart/src/subscription/subscription.dart';
import 'package:centrifuge_dart/src/subscription/subscription_config.dart';
Expand Down Expand Up @@ -68,6 +66,16 @@ abstract base class CentrifugeBase implements ICentrifuge {
@nonVirtual
final CentrifugeConfig _config;

@protected
@nonVirtual
final StreamController<CentrifugeEvent> _eventsController =
StreamController<CentrifugeEvent>.broadcast();

@override
@nonVirtual
late final CentrifugeEventStream events =
CentrifugeEventStream(_eventsController.stream);

/// Init centrifuge client, override this method to add custom logic.
/// This method is called in constructor.
/// {@nodoc}
Expand Down Expand Up @@ -100,28 +108,20 @@ abstract base class CentrifugeBase implements ICentrifuge {
@protected
@mustCallSuper
void _onEvent(CentrifugeEvent event) {
switch (event) {
/* switch (event) {
case CentrifugePublication publication:
_onPublication(publication);
case CentrifugeChannelPresenceEvent event:
_onPresenceEvent(event);
}
} */
_eventsController.add(event);
}

/// Called when publication received.
/// {@nodoc}
@protected
void _onPublication(CentrifugePublication publication);

/// Called when presence event received.
/// {@nodoc}
@protected
void _onPresenceEvent(CentrifugeChannelPresenceEvent event);

@override
@mustCallSuper
Future<void> close() async {
_transport.events.removeListener(_onEvent);
_eventsController.close().ignore();
}
}

Expand Down Expand Up @@ -439,21 +439,6 @@ base mixin CentrifugePublicationsMixin
CentrifugeErrorsMixin,
CentrifugeClientSubscriptionMixin {
@override
void _initCentrifuge() {
super._initCentrifuge();
}

@protected
@nonVirtual
final StreamController<CentrifugePublication> _publicationsController =
StreamController<CentrifugePublication>.broadcast();

@override
@nonVirtual
late final Stream<CentrifugePublication> publications =
_publicationsController.stream;

@override
Future<void> publish(String channel, List<int> data) async {
try {
await ready();
Expand All @@ -470,20 +455,6 @@ base mixin CentrifugePublicationsMixin
Error.throwWithStackTrace(centrifugeException, stackTrace);
}
}

@override
@nonVirtual
@pragma('vm:prefer-inline')
@pragma('dart2js:tryInline')
void _onPublication(CentrifugePublication publication) {
_clientSubscriptionManager.handlePublication(publication);
_publicationsController.add(publication);
}

@override
Future<void> close() => super.close().whenComplete(() {
_publicationsController.close().ignore();
});
}

/// Mixin responsible for presence.
Expand All @@ -494,22 +465,6 @@ base mixin CentrifugePresenceMixin
CentrifugeErrorsMixin,
CentrifugeClientSubscriptionMixin {
@override
void _initCentrifuge() {
super._initCentrifuge();
}

@protected
@nonVirtual
final StreamController<CentrifugeChannelPresenceEvent>
_presenceEventsController =
StreamController<CentrifugeChannelPresenceEvent>.broadcast();

@override
@nonVirtual
late final CentrifugeChannelPresenceStream presenceEvents =
CentrifugeChannelPresenceStream(_presenceEventsController.stream);

@override
Future<CentrifugePresence> presence(String channel) async {
try {
await ready();
Expand Down Expand Up @@ -544,20 +499,6 @@ base mixin CentrifugePresenceMixin
Error.throwWithStackTrace(centrifugeException, stackTrace);
}
}

@override
@nonVirtual
@pragma('vm:prefer-inline')
@pragma('dart2js:tryInline')
void _onPresenceEvent(CentrifugeChannelPresenceEvent event) {
_clientSubscriptionManager.handlePresenceEvent(event);
_presenceEventsController.add(event);
}

@override
Future<void> close() => super.close().whenComplete(() {
_presenceEventsController.close().ignore();
});
}

/// Mixin responsible for queue.
Expand Down
19 changes: 8 additions & 11 deletions lib/src/client/centrifuge_interface.dart
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import 'dart:async';

import 'package:centrifuge_dart/centrifuge.dart';
import 'package:centrifuge_dart/src/model/channel_presence_stream.dart';
import 'package:centrifuge_dart/src/model/event_stream.dart';
import 'package:centrifuge_dart/src/model/presence.dart';
import 'package:centrifuge_dart/src/model/presence_stats.dart';

Expand All @@ -13,7 +13,7 @@ abstract interface class ICentrifuge
ICentrifugeStateOwner,
ICentrifugeAsyncMessageSender,
ICentrifugePublicationSender,
ICentrifugePublicationReceiver,
ICentrifugeEventReceiver,
ICentrifugeClientSubscriptionsManager,
ICentrifugePresenceOwner {
/// Stream of errors.
Expand Down Expand Up @@ -62,12 +62,6 @@ abstract interface class ICentrifugePublicationSender {
Future<void> publish(String channel, List<int> data);
}

/// Centrifuge receive publication interface.
abstract interface class ICentrifugePublicationReceiver {
/// Stream of publications.
abstract final Stream<CentrifugePublication> publications;
}

/// Centrifuge send asynchronous message interface.
abstract interface class ICentrifugeAsyncMessageSender {
/// Send asynchronous message to a server. This method makes sense
Expand All @@ -76,6 +70,12 @@ abstract interface class ICentrifugeAsyncMessageSender {
Future<void> send(List<int> data);
}

/// Centrifuge event receiver interface.
abstract interface class ICentrifugeEventReceiver {
/// Stream of centrifuge events.
abstract final CentrifugeEventStream events;
}

/// Centrifuge client subscriptions manager interface.
abstract interface class ICentrifugeClientSubscriptionsManager {
/// Create new client-side subscription.
Expand Down Expand Up @@ -108,9 +108,6 @@ abstract interface class ICentrifugeClientSubscriptionsManager {

/// Centrifuge presence owner interface.
abstract interface class ICentrifugePresenceOwner {
/// Stream of presence (join & leave) events.
abstract final CentrifugeChannelPresenceStream presenceEvents;

/// Fetch presence information inside a channel.
Future<CentrifugePresence> presence(String channel);

Expand Down
33 changes: 0 additions & 33 deletions lib/src/model/channel_presence_stream.dart

This file was deleted.

39 changes: 39 additions & 0 deletions lib/src/model/event_stream.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import 'dart:async';

import 'package:centrifuge_dart/src/model/channel_presence.dart';
import 'package:centrifuge_dart/src/model/event.dart';
import 'package:centrifuge_dart/src/model/publication.dart';

/// Stream of received events.
/// {@category Entity}
/// {@subCategory Event}
/// {@subCategory Channel}
final class CentrifugeEventStream extends StreamView<CentrifugeEvent> {
/// Stream of received events.
CentrifugeEventStream(super.stream);

/// Publications stream.
late final Stream<CentrifugePublication> publications =
whereType<CentrifugePublication>();

/// Stream of presence (join & leave) events.
late final Stream<CentrifugeChannelPresenceEvent> presenceEvents =
whereType<CentrifugeChannelPresenceEvent>();

/// Join events
late final Stream<CentrifugeJoinEvent> joinEvents =
whereType<CentrifugeJoinEvent>();

/// Leave events
late final Stream<CentrifugeLeaveEvent> leaveEvents =
whereType<CentrifugeLeaveEvent>();

/// Filtered stream of data of [CentrifugeEvent].
Stream<T> whereType<T extends CentrifugeEvent>() =>
transform<T>(StreamTransformer<CentrifugeEvent, T>.fromHandlers(
handleData: (data, sink) => switch (data) {
T valid => sink.add(valid),
_ => null,
},
)).asBroadcastStream();
}
56 changes: 24 additions & 32 deletions lib/src/subscription/client_subscription_impl.dart
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@ import 'dart:async';

import 'package:centrifuge_dart/centrifuge.dart';
import 'package:centrifuge_dart/src/client/disconnect_code.dart';
import 'package:centrifuge_dart/src/model/channel_presence.dart';
import 'package:centrifuge_dart/src/model/channel_presence_stream.dart';
import 'package:centrifuge_dart/src/model/channel_event.dart';
import 'package:centrifuge_dart/src/model/event.dart';
import 'package:centrifuge_dart/src/model/event_stream.dart';
import 'package:centrifuge_dart/src/model/history.dart';
import 'package:centrifuge_dart/src/model/presence.dart';
import 'package:centrifuge_dart/src/model/presence_stats.dart';
Expand Down Expand Up @@ -70,6 +71,16 @@ abstract base class CentrifugeClientSubscriptionBase
/// {@nodoc}
final CentrifugeSubscriptionConfig _config;

@protected
@nonVirtual
final StreamController<CentrifugeEvent> _eventsController =
StreamController<CentrifugeEvent>.broadcast();

@override
@nonVirtual
late final CentrifugeEventStream events =
CentrifugeEventStream(_eventsController.stream);

/// Init subscription.
/// {@nodoc}
@protected
Expand All @@ -78,6 +89,16 @@ abstract base class CentrifugeClientSubscriptionBase
_state = CentrifugeSubscriptionState.unsubscribed(
since: _config.since, code: 0, reason: 'initial state');
_offset = _config.since?.offset ?? fixnum.Int64.ZERO;
_transport.events.addListener(_onEvent);
}

/// Router for all events.
/// {@nodoc}
@protected
@mustCallSuper
void _onEvent(CentrifugeEvent event) {
if (event is! CentrifugeChannelEvent || event.channel != channel) return;
_eventsController.add(event);
}

/// Subscription has 3 states:
Expand Down Expand Up @@ -107,31 +128,20 @@ abstract base class CentrifugeClientSubscriptionBase
_stateController.add(_state = state);
}

/// Stream of publications.
/// {@nodoc}
@override
Stream<CentrifugePublication> get publications =>
_publicationController.stream;

/// {@nodoc}
final StreamController<CentrifugePublication> _publicationController =
StreamController<CentrifugePublication>.broadcast();

/// Notify about new publication.
/// {@nodoc}
@internal
@nonVirtual
void handlePublication(CentrifugePublication publication) {
final offset = publication.offset;
if (offset != null && offset > _offset) _offset = offset;
_publicationController.add(publication);
}

/// {@nodoc}
@internal
@mustCallSuper
Future<void> close() async {
_publicationController.close().ignore();
await _eventsController.close();
}
}

Expand Down Expand Up @@ -416,24 +426,6 @@ base mixin CentrifugeClientSubscriptionPresenceMixin
on
CentrifugeClientSubscriptionBase,
CentrifugeClientSubscriptionErrorsMixin {
@protected
@nonVirtual
final StreamController<CentrifugeChannelPresenceEvent>
_presenceEventsController =
StreamController<CentrifugeChannelPresenceEvent>.broadcast();

@override
@nonVirtual
late final CentrifugeChannelPresenceStream presenceEvents =
CentrifugeChannelPresenceStream(_presenceEventsController.stream);

/// Notify about new presence event.
/// {@nodoc}
@internal
@nonVirtual
void handlePresenceEvent(CentrifugeChannelPresenceEvent event) =>
_presenceEventsController.add(event);

@override
Future<CentrifugePresence> presence() async {
await ready();
Expand Down
Loading

0 comments on commit fde3cd7

Please sign in to comment.