Skip to content

Commit

Permalink
Publish
Browse files Browse the repository at this point in the history
  • Loading branch information
PlugFox committed Jul 28, 2023
1 parent 6a7bf8a commit 4203cb3
Show file tree
Hide file tree
Showing 8 changed files with 290 additions and 116 deletions.
101 changes: 91 additions & 10 deletions lib/src/client/centrifuge.dart
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ import 'dart:async';

import 'package:centrifuge_dart/src/client/centrifuge_interface.dart';
import 'package:centrifuge_dart/src/client/config.dart';
import 'package:centrifuge_dart/src/client/disconnect_code.dart';
import 'package:centrifuge_dart/src/client/state.dart';
import 'package:centrifuge_dart/src/client/states_stream.dart';
import 'package:centrifuge_dart/src/model/exception.dart';
import 'package:centrifuge_dart/src/model/publication.dart';
import 'package:centrifuge_dart/src/subscription/client_subscription_manager.dart';
import 'package:centrifuge_dart/src/subscription/subscription.dart';
import 'package:centrifuge_dart/src/subscription/subscription_config.dart';
Expand All @@ -25,6 +27,7 @@ final class Centrifuge extends CentrifugeBase
CentrifugeConnectionMixin,
CentrifugeSendMixin,
CentrifugeClientSubscriptionMixin,
CentrifugePublicationsMixin,
CentrifugeQueueMixin {
/// {@macro centrifuge}
Centrifuge([CentrifugeConfig? config])
Expand Down Expand Up @@ -93,21 +96,22 @@ abstract base class CentrifugeBase implements ICentrifuge {
/// {@nodoc}
@internal
base mixin CentrifugeStateMixin on CentrifugeBase {
@protected
@nonVirtual
late CentrifugeState _state = _transport.states.value;

@override
@nonVirtual
CentrifugeState get state => _state;

@nonVirtual
@protected
late CentrifugeState _state;

@override
@nonVirtual
late final CentrifugeStatesStream states =
CentrifugeStatesStream(_statesController.stream);

@override
void _initCentrifuge() {
_state = _transport.state;
_transport.states.addListener(_onStateChange);
super._initCentrifuge();
}
Expand Down Expand Up @@ -187,7 +191,10 @@ base mixin CentrifugeConnectionMixin on CentrifugeBase, CentrifugeErrorsMixin {
_emitError(error, stackTrace);
rethrow;
} on Object catch (error, stackTrace) {
final centrifugeException = CentrifugeConnectionException(error);
final centrifugeException = CentrifugeConnectionException(
message: 'Error while connecting to $url',
error: error,
);
_emitError(centrifugeException, stackTrace);
Error.throwWithStackTrace(centrifugeException, stackTrace);
}
Expand All @@ -198,23 +205,36 @@ base mixin CentrifugeConnectionMixin on CentrifugeBase, CentrifugeErrorsMixin {
try {
switch (state) {
case CentrifugeState$Disconnected _:
throw const CentrifugeDisconnectionException(
throw const CentrifugeConnectionException(
message: 'Client is not connected',
);
case CentrifugeState$Closed _:
throw const CentrifugeDisconnectionException(
throw const CentrifugeConnectionException(
message: 'Client is permanently closed',
);
case CentrifugeState$Connected _:
return;
case CentrifugeState$Connecting _:
await states.connected.first.timeout(_config.timeout);
}
} on TimeoutException catch (error, stackTrace) {
_transport
.disconnect(
DisconnectCode.timeout.code,
DisconnectCode.timeout.reason,
)
.ignore();
final centrifugeException = CentrifugeConnectionException(
message: 'Timeout exception while waiting for connection',
error: error,
);
_emitError(centrifugeException, stackTrace);
Error.throwWithStackTrace(centrifugeException, stackTrace);
} on CentrifugeException catch (error, stackTrace) {
_emitError(error, stackTrace);
rethrow;
} on Object catch (error, stackTrace) {
final centrifugeException = CentrifugeDisconnectionException(
final centrifugeException = CentrifugeConnectionException(
message: 'Client is not connected',
error: error,
);
Expand All @@ -232,7 +252,10 @@ base mixin CentrifugeConnectionMixin on CentrifugeBase, CentrifugeErrorsMixin {
_emitError(error, stackTrace);
rethrow;
} on Object catch (error, stackTrace) {
final centrifugeException = CentrifugeConnectionException(error);
final centrifugeException = CentrifugeConnectionException(
message: 'Error while disconnecting',
error: error,
);
_emitError(centrifugeException, stackTrace);
Error.throwWithStackTrace(centrifugeException, stackTrace);
}
Expand All @@ -255,7 +278,8 @@ base mixin CentrifugeSendMixin on CentrifugeBase, CentrifugeErrorsMixin {
try {
await ready();
await _transport.sendAsyncMessage(data);
} on CentrifugeException {
} on CentrifugeException catch (error, stackTrace) {
_emitError(error, stackTrace);
rethrow;
} on Object catch (error, stackTrace) {
final centrifugeException = CentrifugeSendException(error: error);
Expand Down Expand Up @@ -327,6 +351,63 @@ base mixin CentrifugeClientSubscriptionMixin
}
}

/// Mixin responsible for client-side subscriptions.
/// {@nodoc}
@internal
base mixin CentrifugePublicationsMixin
on
CentrifugeBase,
CentrifugeErrorsMixin,
CentrifugeClientSubscriptionMixin {
@override
void _initCentrifuge() {
super._initCentrifuge();
_transport.publications.addListener(_onPublication);
// TODO(plugfox): notify subscriptions
}

@protected
@nonVirtual
final StreamController<CentrifugePublication> _publicationsController =
StreamController<CentrifugePublication>.broadcast();

@override
@nonVirtual
late final Stream<CentrifugePublication> publications =
_publicationsController.stream;

@override
Future<void> publish(String channel, List<int> data) async {
try {
await ready();
await _transport.publish(channel, data);
} on CentrifugeException catch (error, stackTrace) {
_emitError(error, stackTrace);
rethrow;
} on Object catch (error, stackTrace) {
final centrifugeException = CentrifugeSendException(
message: 'Error while publishing to channel $channel',
error: error,
);
_emitError(centrifugeException, stackTrace);
Error.throwWithStackTrace(centrifugeException, stackTrace);
}
}

@protected
@nonVirtual
@pragma('vm:prefer-inline')
@pragma('dart2js:tryInline')
void _onPublication(CentrifugePublication publication) =>
_publicationsController.add(publication);

@override
Future<void> close() => super.close().whenComplete(() {
_transport.publications.removeListener(_onPublication);
_publicationsController.close().ignore();
});
}

/// Mixin responsible for queue.
/// SHOULD BE LAST MIXIN.
/// {@nodoc}
Expand Down
14 changes: 14 additions & 0 deletions lib/src/client/centrifuge_interface.dart
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ abstract interface class ICentrifuge
implements
ICentrifugeStateOwner,
ICentrifugeAsyncMessageSender,
ICentrifugePublicationSender,
ICentrifugePublicationReceiver,
ICentrifugeClientSubscriptionsManager {
/// Stream of errors.
abstract final Stream<
Expand Down Expand Up @@ -58,6 +60,18 @@ abstract interface class ICentrifugeStateOwner {
abstract final CentrifugeStatesStream states;
}

/// Centrifuge send publication interface.
abstract interface class ICentrifugePublicationSender {
/// Publish data to specific subscription channel
Future<void> publish(String channel, List<int> data);
}

/// Centrifuge receive publication interface.
abstract interface class ICentrifugePublicationReceiver {
/// Stream of publications.
abstract final Stream<CentrifugePublication> publications;
}

/// Centrifuge send asynchronous message interface.
abstract interface class ICentrifugeAsyncMessageSender {
/// Send asynchronous message to a server. This method makes sense
Expand Down
14 changes: 1 addition & 13 deletions lib/src/model/exception.dart
Original file line number Diff line number Diff line change
Expand Up @@ -36,19 +36,7 @@ sealed class CentrifugeException implements Exception {
/// {@category Exception}
final class CentrifugeConnectionException extends CentrifugeException {
/// {@macro exception}
const CentrifugeConnectionException([Object? error])
: super(
'centrifuge_connection_exception',
'Connection problem',
error,
);
}

/// {@macro exception}
/// {@category Exception}
final class CentrifugeDisconnectionException extends CentrifugeException {
/// {@macro exception}
const CentrifugeDisconnectionException({String? message, Object? error})
const CentrifugeConnectionException({String? message, Object? error})
: super(
'centrifuge_disconnection_exception',
message ?? 'Connection problem',
Expand Down
4 changes: 4 additions & 0 deletions lib/src/model/publication.dart
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,16 @@ import 'package:meta/meta.dart';
final class CentrifugePublication {
/// {@macro publication}
const CentrifugePublication({
required this.channel,
required this.data,
this.offset,
this.info,
this.tags,
});

/// Publication channel
final String channel;

/// Publication payload
final List<int> data;

Expand Down
Loading

0 comments on commit 4203cb3

Please sign in to comment.