diff --git a/lib/src/client/centrifuge.dart b/lib/src/client/centrifuge.dart index 9d473ac..e54d42e 100644 --- a/lib/src/client/centrifuge.dart +++ b/lib/src/client/centrifuge.dart @@ -48,6 +48,7 @@ final class Centrifuge extends CentrifugeBase CentrifugePublicationsMixin, CentrifugePresenceMixin, CentrifugeHistoryMixin, + CentrifugeRPCMixin, CentrifugeQueueMixin { /// {@macro centrifuge} Centrifuge([CentrifugeConfig? config]) @@ -675,18 +676,22 @@ base mixin CentrifugeHistoryMixin on CentrifugeBase, CentrifugeErrorsMixin { Error.throwWithStackTrace(centrifugeException, stackTrace); } } +} +/// Mixin responsible for history. +/// {@nodoc} +base mixin CentrifugeRPCMixin on CentrifugeBase, CentrifugeErrorsMixin { @override - Future presenceStats(String channel) async { + Future> rpc(String method, List data) async { try { await ready(); - return await _transport.presenceStats(channel); + return await _transport.rpc(method, data); } on CentrifugeException catch (error, stackTrace) { _emitError(error, stackTrace); rethrow; } on Object catch (error, stackTrace) { final centrifugeException = CentrifugeFetchException( - message: 'Error while fetching presence for channel $channel', + message: 'Error while remote procedure call for method $method', error: error, ); _emitError(centrifugeException, stackTrace); @@ -728,6 +733,30 @@ base mixin CentrifugeQueueMixin on CentrifugeBase { () => super.presenceStats(channel), ); + @override + Future history( + String channel, { + int? limit, + CentrifugeStreamPosition? since, + bool? reverse, + }) => + _eventQueue.push( + 'history', + () => super.history( + channel, + limit: limit, + since: since, + reverse: reverse, + ), + ); + + @override + Future> rpc(String method, List data) => + _eventQueue.push>( + 'rpc', + () => super.rpc(method, data), + ); + @override Future disconnect([ int code = 0, diff --git a/lib/src/client/centrifuge_interface.dart b/lib/src/client/centrifuge_interface.dart index 0d1be9d..6e8f8e6 100644 --- a/lib/src/client/centrifuge_interface.dart +++ b/lib/src/client/centrifuge_interface.dart @@ -18,7 +18,8 @@ abstract interface class ICentrifuge ICentrifugeEventReceiver, ICentrifugeClientSubscriptionsManager, ICentrifugePresenceOwner, - ICentrifugeHistoryOwner { + ICentrifugeHistoryOwner, + ICentrifugeRemoteProcedureCall { /// Connect to the server. /// [url] is a URL of endpoint. Future connect(String url); @@ -37,9 +38,6 @@ abstract interface class ICentrifuge /// Permanent close connection to the server and /// free all allocated resources. Future close(); - - /// Send arbitrary RPC and wait for response. - /* Future rpc(String method, data); */ } /// Centrifuge client state owner interface. @@ -121,3 +119,9 @@ abstract interface class ICentrifugeHistoryOwner { bool? reverse, }); } + +/// Centrifuge remote procedure call interface. +abstract interface class ICentrifugeRemoteProcedureCall { + /// Send arbitrary RPC and wait for response. + Future> rpc(String method, List data); +} diff --git a/lib/src/transport/transport_interface.dart b/lib/src/transport/transport_interface.dart index 4ad628a..9aed8e6 100644 --- a/lib/src/transport/transport_interface.dart +++ b/lib/src/transport/transport_interface.dart @@ -96,6 +96,9 @@ abstract interface class ICentrifugeTransport { String token, ); + /// Send arbitrary RPC and wait for response. + Future> rpc(String method, List data); + /// Permanent close connection to the server and /// free all allocated resources. /// {@nodoc} diff --git a/lib/src/transport/ws_protobuf_transport.dart b/lib/src/transport/ws_protobuf_transport.dart index f5f689c..8681a54 100644 --- a/lib/src/transport/ws_protobuf_transport.dart +++ b/lib/src/transport/ws_protobuf_transport.dart @@ -382,7 +382,7 @@ base mixin CentrifugeWSPBConnectionMixin channel: channel, positioned: positioned, recoverable: recoverable, - data: sub.hasData() ? sub.data : [], + data: sub.hasData() ? sub.data : const [], streamPosition: (positioned || recoverable) && sub.hasOffset() && sub.hasEpoch() ? (offset: sub.offset, epoch: sub.epoch) @@ -619,7 +619,7 @@ base mixin CentrifugeWSPBHandlerMixin CentrifugeMessage( timestamp: now, channel: channel, - data: push.message.hasData() ? push.message.data : [], + data: push.message.hasData() ? push.message.data : const [], ), ); } else if (push.hasJoin()) { @@ -649,7 +649,7 @@ base mixin CentrifugeWSPBHandlerMixin channel: channel, positioned: positioned, recoverable: recoverable, - data: push.subscribe.hasData() ? push.subscribe.data : [], + data: push.subscribe.hasData() ? push.subscribe.data : const [], streamPosition: (positioned || recoverable) && push.subscribe.hasOffset() && push.subscribe.hasEpoch() @@ -674,7 +674,7 @@ base mixin CentrifugeWSPBHandlerMixin CentrifugeConnect( timestamp: now, channel: channel, - data: push.message.hasData() ? push.message.data : [], + data: push.message.hasData() ? push.message.data : const [], client: connect.hasClient() ? connect.client : '', version: connect.hasVersion() ? connect.version : '', ttl: expires ? now.add(Duration(seconds: connect.ttl)) : null, @@ -905,6 +905,14 @@ base mixin CentrifugeWSPBSubscription ); }, ); + + @override + Future> rpc(String method, List data) => _sendMessage( + pb.RPCRequest() + ..method = method + ..data = data, + pb.RPCResult()) + .then>((r) => r.hasData() ? r.data : const []); } /// To maintain connection alive and detect broken connections