Skip to content

Commit

Permalink
onPush
Browse files Browse the repository at this point in the history
  • Loading branch information
PlugFox committed Aug 1, 2023
1 parent fde3cd7 commit 63549cc
Show file tree
Hide file tree
Showing 14 changed files with 287 additions and 120 deletions.
111 changes: 86 additions & 25 deletions lib/src/client/centrifuge.dart
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,14 @@ import 'package:centrifuge_dart/src/client/config.dart';
import 'package:centrifuge_dart/src/client/disconnect_code.dart';
import 'package:centrifuge_dart/src/client/state.dart';
import 'package:centrifuge_dart/src/client/states_stream.dart';
import 'package:centrifuge_dart/src/model/channel_presence.dart';
import 'package:centrifuge_dart/src/model/channel_push.dart';
import 'package:centrifuge_dart/src/model/event.dart';
import 'package:centrifuge_dart/src/model/event_stream.dart';
import 'package:centrifuge_dart/src/model/exception.dart';
import 'package:centrifuge_dart/src/model/presence.dart';
import 'package:centrifuge_dart/src/model/presence_stats.dart';
import 'package:centrifuge_dart/src/model/publication.dart';
import 'package:centrifuge_dart/src/model/pushes_stream.dart';
import 'package:centrifuge_dart/src/subscription/client_subscription_manager.dart';
import 'package:centrifuge_dart/src/subscription/subscription.dart';
import 'package:centrifuge_dart/src/subscription/subscription_config.dart';
Expand All @@ -25,6 +28,7 @@ import 'package:stack_trace/stack_trace.dart' as st;
/// {@endtemplate}
final class Centrifuge extends CentrifugeBase
with
CentrifugeEventReceiverMixin,
CentrifugeErrorsMixin,
CentrifugeStateMixin,
CentrifugeConnectionMixin,
Expand Down Expand Up @@ -66,24 +70,17 @@ abstract base class CentrifugeBase implements ICentrifuge {
@nonVirtual
final CentrifugeConfig _config;

@protected
@nonVirtual
final StreamController<CentrifugeEvent> _eventsController =
StreamController<CentrifugeEvent>.broadcast();

@override
@nonVirtual
late final CentrifugeEventStream events =
CentrifugeEventStream(_eventsController.stream);
/// Manager responsible for client-side subscriptions.
/// {@nodoc}
late final ClientSubscriptionManager _clientSubscriptionManager =
ClientSubscriptionManager(_transport);

/// Init centrifuge client, override this method to add custom logic.
/// This method is called in constructor.
/// {@nodoc}
@protected
@mustCallSuper
void _initCentrifuge() {
_transport.events.addListener(_onEvent);
}
void _initCentrifuge() {}

/// Called when connection established.
/// Right before [CentrifugeState$Connected] state.
Expand All @@ -103,25 +100,92 @@ abstract base class CentrifugeBase implements ICentrifuge {
logger.fine('Connection lost');
}

@override
@mustCallSuper
Future<void> close() async {}
}

/// Mixin responsible for event receiving and distribution by controllers
/// and streams to subscribers.
/// {@nodoc}
base mixin CentrifugeEventReceiverMixin on CentrifugeBase {
@protected
@nonVirtual
final StreamController<CentrifugeChannelPush> _pushController =
StreamController<CentrifugeChannelPush>.broadcast();

@protected
@nonVirtual
final StreamController<CentrifugePublication> _publicationsController =
StreamController<CentrifugePublication>.broadcast();

@protected
@nonVirtual
final StreamController<CentrifugeJoin> _joinController =
StreamController<CentrifugeJoin>.broadcast();

@protected
@nonVirtual
final StreamController<CentrifugeLeave> _leaveController =
StreamController<CentrifugeLeave>.broadcast();

@protected
@nonVirtual
final StreamController<CentrifugeChannelPresence> _presenceController =
StreamController<CentrifugeChannelPresence>.broadcast();

@override
@nonVirtual
late final CentrifugePushesStream stream = CentrifugePushesStream(
pushes: _pushController.stream,
publications: _publicationsController.stream,
presenceEvents: _presenceController.stream,
joinEvents: _joinController.stream,
leaveEvents: _leaveController.stream,
);

@override
void _initCentrifuge() {
_transport.events.addListener(_onEvent);
super._initCentrifuge();
}

/// Router for all events.
/// {@nodoc}
@protected
@mustCallSuper
@nonVirtual
@pragma('vm:prefer-inline')
@pragma('dart2js:tryInline')
void _onEvent(CentrifugeEvent event) {
/* switch (event) {
if (event is! CentrifugeChannelPush) return;
// This is a push to a channel.
_clientSubscriptionManager.onPush(event);
_pushController.add(event);
switch (event) {
case CentrifugePublication publication:
_onPublication(publication);
case CentrifugeChannelPresenceEvent event:
_onPresenceEvent(event);
} */
_eventsController.add(event);
_publicationsController.add(publication);
case CentrifugeJoin join:
_presenceController.add(join);
_joinController.add(join);
case CentrifugeLeave leave:
_presenceController.add(leave);
_leaveController.add(leave);
}
}

@override
@mustCallSuper
Future<void> close() async {
await super.close();
_transport.events.removeListener(_onEvent);
_eventsController.close().ignore();
for (final controller in <StreamSink<CentrifugeEvent>>[
_pushController,
_publicationsController,
_joinController,
_leaveController,
_presenceController,
]) {
controller.close().ignore();
}
}
}

Expand Down Expand Up @@ -373,9 +437,6 @@ base mixin CentrifugeSendMixin on CentrifugeBase, CentrifugeErrorsMixin {
@internal
base mixin CentrifugeClientSubscriptionMixin
on CentrifugeBase, CentrifugeErrorsMixin {
late final ClientSubscriptionManager _clientSubscriptionManager =
ClientSubscriptionManager(_transport);

@override
CentrifugeClientSubscription newSubscription(
String channel, [
Expand Down
6 changes: 3 additions & 3 deletions lib/src/client/centrifuge_interface.dart
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
import 'dart:async';

import 'package:centrifuge_dart/centrifuge.dart';
import 'package:centrifuge_dart/src/model/event_stream.dart';
import 'package:centrifuge_dart/src/model/presence.dart';
import 'package:centrifuge_dart/src/model/presence_stats.dart';
import 'package:centrifuge_dart/src/model/pushes_stream.dart';

/// Centrifuge client interface.
abstract interface class ICentrifuge
Expand Down Expand Up @@ -72,8 +72,8 @@ abstract interface class ICentrifugeAsyncMessageSender {

/// Centrifuge event receiver interface.
abstract interface class ICentrifugeEventReceiver {
/// Stream of centrifuge events.
abstract final CentrifugeEventStream events;
/// Stream of received pushes from Centrifugo server for a channel.
abstract final CentrifugePushesStream stream;
}

/// Centrifuge client subscriptions manager interface.
Expand Down
32 changes: 19 additions & 13 deletions lib/src/model/channel_presence.dart
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
import 'package:centrifuge_dart/src/model/channel_event.dart';
import 'package:centrifuge_dart/src/model/channel_push.dart';
import 'package:centrifuge_dart/src/model/client_info.dart';
import 'package:meta/meta.dart';

/// {@template channel_presence_event}
/// {@template channel_presence}
/// Channel presence.
/// Join / Leave events.
/// {@endtemplate}
/// {@category Entity}
/// {@subCategory Channel}
/// {@subCategory Presence}
@immutable
sealed class CentrifugeChannelPresenceEvent extends CentrifugeChannelEvent {
/// {@macro channel_presence_event}
const CentrifugeChannelPresenceEvent({
sealed class CentrifugeChannelPresence extends CentrifugeChannelPush {
/// {@macro channel_presence}
const CentrifugeChannelPresence({
required super.channel,
required this.info,
});
Expand All @@ -27,29 +27,35 @@ sealed class CentrifugeChannelPresenceEvent extends CentrifugeChannelEvent {
abstract final bool isLeave;
}

/// {@macro channel_presence_event}
final class CentrifugeJoinEvent extends CentrifugeChannelPresenceEvent {
/// {@macro channel_presence_event}
const CentrifugeJoinEvent({
/// {@macro channel_presence}
final class CentrifugeJoin extends CentrifugeChannelPresence {
/// {@macro channel_presence}
const CentrifugeJoin({
required super.channel,
required super.info,
});

@override
String get type => 'join';

@override
bool get isJoin => true;

@override
bool get isLeave => false;
}

/// {@macro channel_presence_event}
final class CentrifugeLeaveEvent extends CentrifugeChannelPresenceEvent {
/// {@macro channel_presence_event}
const CentrifugeLeaveEvent({
/// {@macro channel_presence}
final class CentrifugeLeave extends CentrifugeChannelPresence {
/// {@macro channel_presence}
const CentrifugeLeave({
required super.channel,
required super.info,
});

@override
String get type => 'leave';

@override
bool get isJoin => false;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ import 'package:centrifuge_dart/src/model/event.dart';
/// {@template centrifuge_channel_event}
/// Base class for all channel events.
/// {@endtemplate}
abstract base class CentrifugeChannelEvent extends CentrifugeEvent {
abstract base class CentrifugeChannelPush extends CentrifugeEvent {
/// {@template centrifuge_channel_event}
const CentrifugeChannelEvent({
const CentrifugeChannelPush({
required this.channel,
});

Expand Down
3 changes: 3 additions & 0 deletions lib/src/model/event.dart
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,7 @@ import 'package:meta/meta.dart';
abstract base class CentrifugeEvent {
/// {@template centrifuge_event}
const CentrifugeEvent();

/// Event type.
abstract final String type;
}
39 changes: 0 additions & 39 deletions lib/src/model/event_stream.dart

This file was deleted.

7 changes: 5 additions & 2 deletions lib/src/model/publication.dart
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import 'package:centrifuge_dart/src/model/channel_event.dart';
import 'package:centrifuge_dart/src/model/channel_push.dart';
import 'package:centrifuge_dart/src/model/client_info.dart';
import 'package:fixnum/fixnum.dart' as fixnum;
import 'package:meta/meta.dart';
Expand All @@ -8,7 +8,7 @@ import 'package:meta/meta.dart';
/// {@endtemplate}
/// {@category Entity}
@immutable
final class CentrifugePublication extends CentrifugeChannelEvent {
final class CentrifugePublication extends CentrifugeChannelPush {
/// {@macro publication}
const CentrifugePublication({
required super.channel,
Expand All @@ -18,6 +18,9 @@ final class CentrifugePublication extends CentrifugeChannelEvent {
this.tags,
});

@override
String get type => 'publication';

/// Publication payload
final List<int> data;

Expand Down
42 changes: 42 additions & 0 deletions lib/src/model/pushes_stream.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import 'dart:async';

import 'package:centrifuge_dart/src/model/channel_presence.dart';
import 'package:centrifuge_dart/src/model/channel_push.dart';
import 'package:centrifuge_dart/src/model/event.dart';
import 'package:centrifuge_dart/src/model/publication.dart';

/// Stream of received pushes from Centrifugo server for a channel.
/// {@category Entity}
/// {@subCategory Event}
/// {@subCategory Channel}
final class CentrifugePushesStream extends StreamView<CentrifugeChannelPush> {
/// Stream of received events.
CentrifugePushesStream({
required Stream<CentrifugeChannelPush> pushes,
required this.publications,
required this.presenceEvents,
required this.joinEvents,
required this.leaveEvents,
}) : super(pushes);

/// Publications stream.
final Stream<CentrifugePublication> publications;

/// Stream of presence (join & leave) events.
final Stream<CentrifugeChannelPresence> presenceEvents;

/// Join events
final Stream<CentrifugeJoin> joinEvents;

/// Leave events
final Stream<CentrifugeLeave> leaveEvents;

/// Filtered stream of data of [CentrifugeEvent].
Stream<T> whereType<T extends CentrifugeChannelPush>() =>
transform<T>(StreamTransformer<CentrifugeChannelPush, T>.fromHandlers(
handleData: (data, sink) => switch (data) {
T valid => sink.add(valid),
_ => null,
},
)).asBroadcastStream();
}
12 changes: 12 additions & 0 deletions lib/src/model/subscribe_event.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import 'package:centrifuge_dart/src/model/channel_push.dart';

/// {@macro subscribe_event}
final class CentrifugeSubscribeEvent extends CentrifugeChannelPush {
/// {@macro subscribe_event}
const CentrifugeSubscribeEvent({
required super.channel,
});

@override
String get type => 'subscribe';
}
Loading

0 comments on commit 63549cc

Please sign in to comment.