Skip to content

Commit

Permalink
Add RPC
Browse files Browse the repository at this point in the history
  • Loading branch information
PlugFox committed Aug 3, 2023
1 parent 8901a3e commit 92b6938
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 11 deletions.
35 changes: 32 additions & 3 deletions lib/src/client/centrifuge.dart
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ final class Centrifuge extends CentrifugeBase
CentrifugePublicationsMixin,
CentrifugePresenceMixin,
CentrifugeHistoryMixin,
CentrifugeRPCMixin,
CentrifugeQueueMixin {
/// {@macro centrifuge}
Centrifuge([CentrifugeConfig? config])
Expand Down Expand Up @@ -675,18 +676,22 @@ base mixin CentrifugeHistoryMixin on CentrifugeBase, CentrifugeErrorsMixin {
Error.throwWithStackTrace(centrifugeException, stackTrace);
}
}
}

/// Mixin responsible for history.
/// {@nodoc}
base mixin CentrifugeRPCMixin on CentrifugeBase, CentrifugeErrorsMixin {
@override
Future<CentrifugePresenceStats> presenceStats(String channel) async {
Future<List<int>> rpc(String method, List<int> data) async {
try {
await ready();
return await _transport.presenceStats(channel);
return await _transport.rpc(method, data);
} on CentrifugeException catch (error, stackTrace) {
_emitError(error, stackTrace);
rethrow;
} on Object catch (error, stackTrace) {
final centrifugeException = CentrifugeFetchException(
message: 'Error while fetching presence for channel $channel',
message: 'Error while remote procedure call for method $method',
error: error,
);
_emitError(centrifugeException, stackTrace);
Expand Down Expand Up @@ -728,6 +733,30 @@ base mixin CentrifugeQueueMixin on CentrifugeBase {
() => super.presenceStats(channel),
);

@override
Future<CentrifugeHistory> history(
String channel, {
int? limit,
CentrifugeStreamPosition? since,
bool? reverse,
}) =>
_eventQueue.push<CentrifugeHistory>(
'history',
() => super.history(
channel,
limit: limit,
since: since,
reverse: reverse,
),
);

@override
Future<List<int>> rpc(String method, List<int> data) =>
_eventQueue.push<List<int>>(
'rpc',
() => super.rpc(method, data),
);

@override
Future<void> disconnect([
int code = 0,
Expand Down
12 changes: 8 additions & 4 deletions lib/src/client/centrifuge_interface.dart
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ abstract interface class ICentrifuge
ICentrifugeEventReceiver,
ICentrifugeClientSubscriptionsManager,
ICentrifugePresenceOwner,
ICentrifugeHistoryOwner {
ICentrifugeHistoryOwner,
ICentrifugeRemoteProcedureCall {
/// Connect to the server.
/// [url] is a URL of endpoint.
Future<void> connect(String url);
Expand All @@ -37,9 +38,6 @@ abstract interface class ICentrifuge
/// Permanent close connection to the server and
/// free all allocated resources.
Future<void> close();

/// Send arbitrary RPC and wait for response.
/* Future<void> rpc(String method, data); */
}

/// Centrifuge client state owner interface.
Expand Down Expand Up @@ -121,3 +119,9 @@ abstract interface class ICentrifugeHistoryOwner {
bool? reverse,
});
}

/// Centrifuge remote procedure call interface.
abstract interface class ICentrifugeRemoteProcedureCall {
/// Send arbitrary RPC and wait for response.
Future<List<int>> rpc(String method, List<int> data);
}
3 changes: 3 additions & 0 deletions lib/src/transport/transport_interface.dart
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,9 @@ abstract interface class ICentrifugeTransport {
String token,
);

/// Send arbitrary RPC and wait for response.
Future<List<int>> rpc(String method, List<int> data);

/// Permanent close connection to the server and
/// free all allocated resources.
/// {@nodoc}
Expand Down
16 changes: 12 additions & 4 deletions lib/src/transport/ws_protobuf_transport.dart
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,7 @@ base mixin CentrifugeWSPBConnectionMixin
channel: channel,
positioned: positioned,
recoverable: recoverable,
data: sub.hasData() ? sub.data : <int>[],
data: sub.hasData() ? sub.data : const <int>[],
streamPosition:
(positioned || recoverable) && sub.hasOffset() && sub.hasEpoch()
? (offset: sub.offset, epoch: sub.epoch)
Expand Down Expand Up @@ -619,7 +619,7 @@ base mixin CentrifugeWSPBHandlerMixin
CentrifugeMessage(
timestamp: now,
channel: channel,
data: push.message.hasData() ? push.message.data : <int>[],
data: push.message.hasData() ? push.message.data : const <int>[],
),
);
} else if (push.hasJoin()) {
Expand Down Expand Up @@ -649,7 +649,7 @@ base mixin CentrifugeWSPBHandlerMixin
channel: channel,
positioned: positioned,
recoverable: recoverable,
data: push.subscribe.hasData() ? push.subscribe.data : <int>[],
data: push.subscribe.hasData() ? push.subscribe.data : const <int>[],
streamPosition: (positioned || recoverable) &&
push.subscribe.hasOffset() &&
push.subscribe.hasEpoch()
Expand All @@ -674,7 +674,7 @@ base mixin CentrifugeWSPBHandlerMixin
CentrifugeConnect(
timestamp: now,
channel: channel,
data: push.message.hasData() ? push.message.data : <int>[],
data: push.message.hasData() ? push.message.data : const <int>[],
client: connect.hasClient() ? connect.client : '',
version: connect.hasVersion() ? connect.version : '',
ttl: expires ? now.add(Duration(seconds: connect.ttl)) : null,
Expand Down Expand Up @@ -905,6 +905,14 @@ base mixin CentrifugeWSPBSubscription
);
},
);

@override
Future<List<int>> rpc(String method, List<int> data) => _sendMessage(
pb.RPCRequest()
..method = method
..data = data,
pb.RPCResult())
.then<List<int>>((r) => r.hasData() ? r.data : const <int>[]);
}

/// To maintain connection alive and detect broken connections
Expand Down

0 comments on commit 92b6938

Please sign in to comment.