From 8901a3e6131932b103a068c307edafc009cfe257 Mon Sep 17 00:00:00 2001 From: Plague Fox Date: Thu, 3 Aug 2023 17:00:47 +0400 Subject: [PATCH] Add history to client --- lib/src/client/centrifuge.dart | 59 ++++++++++++++++++++++-- lib/src/client/centrifuge_interface.dart | 21 +++++++-- 2 files changed, 70 insertions(+), 10 deletions(-) diff --git a/lib/src/client/centrifuge.dart b/lib/src/client/centrifuge.dart index f66e0be..9d473ac 100644 --- a/lib/src/client/centrifuge.dart +++ b/lib/src/client/centrifuge.dart @@ -12,12 +12,14 @@ import 'package:centrifuge_dart/src/model/connect.dart'; import 'package:centrifuge_dart/src/model/disconnect.dart'; import 'package:centrifuge_dart/src/model/event.dart'; import 'package:centrifuge_dart/src/model/exception.dart'; +import 'package:centrifuge_dart/src/model/history.dart'; import 'package:centrifuge_dart/src/model/message.dart'; import 'package:centrifuge_dart/src/model/presence.dart'; import 'package:centrifuge_dart/src/model/presence_stats.dart'; import 'package:centrifuge_dart/src/model/publication.dart'; import 'package:centrifuge_dart/src/model/pushes_stream.dart'; import 'package:centrifuge_dart/src/model/refresh.dart'; +import 'package:centrifuge_dart/src/model/stream_position.dart'; import 'package:centrifuge_dart/src/model/subscribe.dart'; import 'package:centrifuge_dart/src/model/unsubscribe.dart'; import 'package:centrifuge_dart/src/subscription/client_subscription_manager.dart'; @@ -45,6 +47,7 @@ final class Centrifuge extends CentrifugeBase CentrifugeServerSubscriptionMixin, CentrifugePublicationsMixin, CentrifugePresenceMixin, + CentrifugeHistoryMixin, CentrifugeQueueMixin { /// {@macro centrifuge} Centrifuge([CentrifugeConfig? config]) @@ -604,11 +607,7 @@ base mixin CentrifugePublicationsMixin /// Mixin responsible for presence. /// {@nodoc} -base mixin CentrifugePresenceMixin - on - CentrifugeBase, - CentrifugeErrorsMixin, - CentrifugeClientSubscriptionMixin { +base mixin CentrifugePresenceMixin on CentrifugeBase, CentrifugeErrorsMixin { @override Future presence(String channel) async { try { @@ -646,6 +645,56 @@ base mixin CentrifugePresenceMixin } } +/// Mixin responsible for history. +/// {@nodoc} +base mixin CentrifugeHistoryMixin on CentrifugeBase, CentrifugeErrorsMixin { + @override + Future history( + String channel, { + int? limit, + CentrifugeStreamPosition? since, + bool? reverse, + }) async { + try { + await ready(); + return await _transport.history( + channel, + limit: limit, + since: since, + reverse: reverse, + ); + } on CentrifugeException catch (error, stackTrace) { + _emitError(error, stackTrace); + rethrow; + } on Object catch (error, stackTrace) { + final centrifugeException = CentrifugeFetchException( + message: 'Error while fetching history for channel $channel', + error: error, + ); + _emitError(centrifugeException, stackTrace); + Error.throwWithStackTrace(centrifugeException, stackTrace); + } + } + + @override + Future presenceStats(String channel) async { + try { + await ready(); + return await _transport.presenceStats(channel); + } on CentrifugeException catch (error, stackTrace) { + _emitError(error, stackTrace); + rethrow; + } on Object catch (error, stackTrace) { + final centrifugeException = CentrifugeFetchException( + message: 'Error while fetching presence for channel $channel', + error: error, + ); + _emitError(centrifugeException, stackTrace); + Error.throwWithStackTrace(centrifugeException, stackTrace); + } + } +} + /// Mixin responsible for queue. /// SHOULD BE LAST MIXIN. /// {@nodoc} diff --git a/lib/src/client/centrifuge_interface.dart b/lib/src/client/centrifuge_interface.dart index 6123359..0d1be9d 100644 --- a/lib/src/client/centrifuge_interface.dart +++ b/lib/src/client/centrifuge_interface.dart @@ -3,9 +3,11 @@ import 'dart:async'; import 'package:centrifuge_dart/centrifuge.dart'; +import 'package:centrifuge_dart/src/model/history.dart'; import 'package:centrifuge_dart/src/model/presence.dart'; import 'package:centrifuge_dart/src/model/presence_stats.dart'; import 'package:centrifuge_dart/src/model/pushes_stream.dart'; +import 'package:centrifuge_dart/src/model/stream_position.dart'; /// Centrifuge client interface. abstract interface class ICentrifuge @@ -15,7 +17,8 @@ abstract interface class ICentrifuge ICentrifugePublicationSender, ICentrifugeEventReceiver, ICentrifugeClientSubscriptionsManager, - ICentrifugePresenceOwner { + ICentrifugePresenceOwner, + ICentrifugeHistoryOwner { /// Connect to the server. /// [url] is a URL of endpoint. Future connect(String url); @@ -37,10 +40,6 @@ abstract interface class ICentrifuge /// Send arbitrary RPC and wait for response. /* Future rpc(String method, data); */ - - /// Send History command. - /* Future history(String channel, - {int limit = 0, StreamPosition? since, bool reverse = false}); */ } /// Centrifuge client state owner interface. @@ -110,3 +109,15 @@ abstract interface class ICentrifugePresenceOwner { /// Fetch presence stats information inside a channel. Future presenceStats(String channel); } + +/// Centrifuge history owner interface. +abstract interface class ICentrifugeHistoryOwner { + /// Fetch publication history inside a channel. + /// Only for channels where history is enabled. + Future history( + String channel, { + int? limit, + CentrifugeStreamPosition? since, + bool? reverse, + }); +}