Skip to content

Commit

Permalink
Add centrifuge observer
Browse files Browse the repository at this point in the history
  • Loading branch information
PlugFox committed Aug 3, 2023
1 parent 1788237 commit f345d1d
Show file tree
Hide file tree
Showing 7 changed files with 39 additions and 89 deletions.
8 changes: 0 additions & 8 deletions examples/console/bin/main.dart
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,6 @@ void main([List<String>? args]) {
// e.g. `client.states.connected`
client.states.listen((state) => print('State changed to: $state'));

// Handle all centrifuge exceptions.
client.errors.listen(
(error) => print(
'Exception: ${error.exception}, '
'Stack trace: ${error.stackTrace}',
),
);

// TODO(plugfox): Read from stdin and send to channel.

/* // Close client
Expand Down
39 changes: 14 additions & 25 deletions lib/src/client/centrifuge.dart
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import 'dart:async';
import 'package:centrifuge_dart/src/client/centrifuge_interface.dart';
import 'package:centrifuge_dart/src/client/config.dart';
import 'package:centrifuge_dart/src/client/disconnect_code.dart';
import 'package:centrifuge_dart/src/client/observer.dart';
import 'package:centrifuge_dart/src/client/state.dart';
import 'package:centrifuge_dart/src/client/states_stream.dart';
import 'package:centrifuge_dart/src/model/channel_presence.dart';
Expand All @@ -28,7 +29,6 @@ import 'package:centrifuge_dart/src/transport/ws_protobuf_transport.dart';
import 'package:centrifuge_dart/src/util/event_queue.dart';
import 'package:centrifuge_dart/src/util/logger.dart' as logger;
import 'package:meta/meta.dart';
import 'package:stack_trace/stack_trace.dart' as st;

/// {@template centrifuge}
/// Centrifuge client.
Expand All @@ -54,6 +54,9 @@ final class Centrifuge extends CentrifugeBase
/// {@macro centrifuge}
factory Centrifuge.connect(String url, [CentrifugeConfig? config]) =>
Centrifuge(config)..connect(url);

/// The current [CentrifugeObserver] instance.
static CentrifugeObserver? observer;
}

/// {@nodoc}
Expand Down Expand Up @@ -93,7 +96,9 @@ abstract base class CentrifugeBase implements ICentrifuge {
/// {@nodoc}
@protected
@mustCallSuper
void _initCentrifuge() {}
void _initCentrifuge() {
Centrifuge.observer?.onCreate(this);
}

/// Called when connection established.
/// Right before [CentrifugeState$Connected] state.
Expand All @@ -102,6 +107,7 @@ abstract base class CentrifugeBase implements ICentrifuge {
@mustCallSuper
void _onConnected(CentrifugeState$Connected state) {
logger.fine('Connection established');
Centrifuge.observer?.onConnected(this, state);
}

/// Called when connection lost.
Expand All @@ -111,11 +117,15 @@ abstract base class CentrifugeBase implements ICentrifuge {
@mustCallSuper
void _onDisconnected(CentrifugeState$Disconnected state) {
logger.fine('Connection lost');
Centrifuge.observer?.onDisconnected(this, state);
}

@override
@mustCallSuper
Future<void> close() async {}
Future<void> close() async {
await _transport.close();
Centrifuge.observer?.onClose(this);
}
}

/// Mixin responsible for event receiving and distribution by controllers
Expand Down Expand Up @@ -349,27 +359,7 @@ base mixin CentrifugeErrorsMixin on CentrifugeBase {
@protected
@nonVirtual
void _emitError(CentrifugeException exception, StackTrace stackTrace) =>
_errorsController.add(
(
exception: exception,
stackTrace: st.Trace.from(stackTrace).terse,
),
);

late final StreamController<
({CentrifugeException exception, StackTrace stackTrace})>
_errorsController = StreamController<
({CentrifugeException exception, StackTrace stackTrace})>.broadcast();

@override
late final Stream<({CentrifugeException exception, StackTrace stackTrace})>
errors = _errorsController.stream;

@override
Future<void> close() async {
await super.close();
_errorsController.close().ignore();
}
Centrifuge.observer?.onError(exception, stackTrace);
}

/// Mixin responsible for connection.
Expand Down Expand Up @@ -465,7 +455,6 @@ base mixin CentrifugeConnectionMixin
Future<void> close() async {
logger.fine('Interactively closing');
await super.close();
await _transport.close();
}
}

Expand Down
7 changes: 0 additions & 7 deletions lib/src/client/centrifuge_interface.dart
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,6 @@ abstract interface class ICentrifuge
ICentrifugeEventReceiver,
ICentrifugeClientSubscriptionsManager,
ICentrifugePresenceOwner {
/// Stream of errors.
abstract final Stream<
({CentrifugeException exception, StackTrace stackTrace})> errors;

/// Connect to the server.
/// [url] is a URL of endpoint.
Future<void> connect(String url);
Expand All @@ -42,9 +38,6 @@ abstract interface class ICentrifuge
/// Send arbitrary RPC and wait for response.
/* Future<void> rpc(String method, data); */

/// Publish data to the channel.
/* Future<PublishResult> publish(String channel, List<int> data); */

/// Send History command.
/* Future<HistoryResult> history(String channel,
{int limit = 0, StreamPosition? since, bool reverse = false}); */
Expand Down
22 changes: 22 additions & 0 deletions lib/src/client/observer.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import 'package:centrifuge_dart/centrifuge.dart';
import 'package:centrifuge_dart/src/client/centrifuge_interface.dart';

/// An interface for observing the behavior of Centrifuge instances.
abstract class CentrifugeObserver {
/// Called whenever a [ICentrifuge] is instantiated.
void onCreate(ICentrifuge client) {}

/// Called whenever a [ICentrifuge] client changes its state
/// to [CentrifugeState$Connecting].
void onConnected(ICentrifuge client, CentrifugeState$Connected state) {}

/// Called whenever a [ICentrifuge] client changes its state
/// to [CentrifugeState$Disconnected].
void onDisconnected(ICentrifuge client, CentrifugeState$Disconnected state) {}

/// Called whenever an error is thrown in any Centrifuge client.
void onError(CentrifugeException error, StackTrace stackTrace) {}

/// Called whenever a [ICentrifuge] is closed.
void onClose(ICentrifuge client) {}
}
23 changes: 1 addition & 22 deletions lib/src/subscription/client_subscription_impl.dart
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import 'package:centrifuge_dart/src/util/event_queue.dart';
import 'package:centrifuge_dart/src/util/logger.dart' as logger;
import 'package:fixnum/fixnum.dart' as fixnum;
import 'package:meta/meta.dart';
import 'package:stack_trace/stack_trace.dart' as st;

/// Client-side subscription implementation.
/// {@nodoc}
Expand Down Expand Up @@ -253,27 +252,7 @@ base mixin CentrifugeClientSubscriptionErrorsMixin
@protected
@nonVirtual
void _emitError(CentrifugeException exception, StackTrace stackTrace) =>
_errorsController.add(
(
exception: exception,
stackTrace: st.Trace.from(stackTrace).terse,
),
);

late final StreamController<
({CentrifugeException exception, StackTrace stackTrace})>
_errorsController = StreamController<
({CentrifugeException exception, StackTrace stackTrace})>.broadcast();

@override
late final Stream<({CentrifugeException exception, StackTrace stackTrace})>
errors = _errorsController.stream;

@override
Future<void> close([int code = 0, String reason = 'closed']) async {
await super.close(code, reason);
_errorsController.close().ignore();
}
Centrifuge.observer?.onError(exception, stackTrace);
}

/// Mixin responsible for subscribing.
Expand Down
24 changes: 2 additions & 22 deletions lib/src/subscription/server_subscription_impl.dart
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import 'dart:async';

import 'package:centrifuge_dart/src/client/centrifuge.dart';
import 'package:centrifuge_dart/src/model/channel_presence.dart';
import 'package:centrifuge_dart/src/model/channel_push.dart';
import 'package:centrifuge_dart/src/model/connect.dart';
Expand All @@ -24,7 +25,6 @@ import 'package:centrifuge_dart/src/util/event_queue.dart';
import 'package:centrifuge_dart/src/util/logger.dart' as logger;
import 'package:fixnum/fixnum.dart' as fixnum;
import 'package:meta/meta.dart';
import 'package:stack_trace/stack_trace.dart' as st;

/// Server-side subscription implementation.
/// {@nodoc}
Expand Down Expand Up @@ -258,27 +258,7 @@ base mixin CentrifugeServerSubscriptionErrorsMixin
@protected
@nonVirtual
void _emitError(CentrifugeException exception, StackTrace stackTrace) =>
_errorsController.add(
(
exception: exception,
stackTrace: st.Trace.from(stackTrace).terse,
),
);

late final StreamController<
({CentrifugeException exception, StackTrace stackTrace})>
_errorsController = StreamController<
({CentrifugeException exception, StackTrace stackTrace})>.broadcast();

@override
late final Stream<({CentrifugeException exception, StackTrace stackTrace})>
errors = _errorsController.stream;

@override
Future<void> close([int code = 0, String reason = 'closed']) async {
await super.close(code, reason);
_errorsController.close().ignore();
}
Centrifuge.observer?.onError(exception, stackTrace);
}

/// Mixin responsible for ready method.
Expand Down
5 changes: 0 additions & 5 deletions lib/src/subscription/subscription.dart
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import 'dart:async';

import 'package:centrifuge_dart/src/model/exception.dart';
import 'package:centrifuge_dart/src/model/history.dart';
import 'package:centrifuge_dart/src/model/presence.dart';
import 'package:centrifuge_dart/src/model/presence_stats.dart';
Expand Down Expand Up @@ -30,10 +29,6 @@ abstract interface class ICentrifugeSubscription {
/// Stream of received pushes from Centrifugo server for a channel.
abstract final CentrifugePushesStream stream;

/// Errors stream.
abstract final Stream<
({CentrifugeException exception, StackTrace stackTrace})> errors;

/// Await for subscription to be ready.
/// Ready resolves when subscription successfully subscribed.
/// Throws exceptions if called not in subscribing or subscribed state.
Expand Down

0 comments on commit f345d1d

Please sign in to comment.