Skip to content

Commit

Permalink
add publications
Browse files Browse the repository at this point in the history
  • Loading branch information
PlugFox committed Jul 23, 2023
1 parent 03cec36 commit ed33441
Show file tree
Hide file tree
Showing 6 changed files with 140 additions and 1 deletion.
2 changes: 2 additions & 0 deletions lib/centrifuge.dart
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
library centrifuge;

export 'package:centrifuge_dart/src/client/centrifuge.dart' show Centrifuge;
export 'package:centrifuge_dart/src/model/client_info.dart';
export 'package:centrifuge_dart/src/model/config.dart';
export 'package:centrifuge_dart/src/model/exception.dart';
export 'package:centrifuge_dart/src/model/jwt.dart';
export 'package:centrifuge_dart/src/model/publication.dart';
export 'package:centrifuge_dart/src/model/state.dart';
export 'package:centrifuge_dart/src/model/states_stream.dart';
export 'package:centrifuge_dart/src/model/subscription.dart'
Expand Down
2 changes: 2 additions & 0 deletions lib/interface.dart
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
library interface;

export 'package:centrifuge_dart/src/client/centrifuge_interface.dart';
export 'package:centrifuge_dart/src/model/client_info.dart';
export 'package:centrifuge_dart/src/model/config.dart';
export 'package:centrifuge_dart/src/model/exception.dart';
export 'package:centrifuge_dart/src/model/jwt.dart';
export 'package:centrifuge_dart/src/model/publication.dart';
export 'package:centrifuge_dart/src/model/state.dart';
export 'package:centrifuge_dart/src/model/states_stream.dart';
export 'package:centrifuge_dart/src/model/subscription.dart'
Expand Down
52 changes: 52 additions & 0 deletions lib/src/model/client_info.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import 'package:meta/meta.dart';

/// {@template client_info}
/// Client information.
/// {@endtemplate}
@immutable
final class CentrifugeClientInfo {
/// {@macro client_info}
const CentrifugeClientInfo({
required this.user,
required this.client,
required this.connectionInfo,
required this.channelInfo,
});

/// User
final String user;

/// Client
final String client;

/// Connection information
final List<int>? connectionInfo;

/// Channel information
final List<int>? channelInfo;

@override
int get hashCode => Object.hashAll([
user,
client,
connectionInfo,
channelInfo,
]);

@override
bool operator ==(Object other) =>
identical(this, other) ||
other is CentrifugeClientInfo &&
user == other.client &&
client == other.client &&
connectionInfo == other.connectionInfo &&
channelInfo == other.channelInfo;

@override
String toString() => 'CentrifugeClientInfo{'
'user: $user, '
'client: $client, '
'connectionInfo: ${connectionInfo == null ? 'null' : 'bytes'}, '
'channelInfo: ${channelInfo == null ? 'null' : 'bytes'}'
'}';
}
30 changes: 30 additions & 0 deletions lib/src/model/publication.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import 'package:centrifuge_dart/src/model/client_info.dart';
import 'package:fixnum/fixnum.dart' as fixnum;
import 'package:meta/meta.dart';

/// {@template publication}
/// Publication context
/// {@endtemplate}
@immutable
final class CentrifugePublication {
/// {@macro publication}
const CentrifugePublication({
required this.data,
this.offset,
this.info,
this.tags,
});

/// Publication payload
final List<int> data;

/// Optional offset inside history stream, this is an incremental number
final fixnum.Int64? offset;

/// Optional information about client connection who published this
/// (only exists if publication comes from client-side publish() API).
final CentrifugeClientInfo? info;

/// Optional tags, this is a map with string keys and string values
final Map<String, String>? tags;
}
19 changes: 19 additions & 0 deletions lib/src/model/subscription.dart
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import 'package:centrifuge_dart/interface.dart';
import 'package:centrifuge_dart/src/subscription/client_subscription_controller.dart';
import 'package:fixnum/fixnum.dart' as fixnum;
import 'package:meta/meta.dart';
Expand Down Expand Up @@ -70,6 +71,15 @@ sealed class CentrifugeClientSubscription implements ICentrifugeSubscription {
@override
final String channel;

/// Stream of publications.
abstract final Stream<CentrifugePublication> publications;

/// Start subscribing to a channel
Future<void> subscribe();

/// Unsubscribe from a channel
Future<void> unsubscribe();

@override
String toString() => 'CentrifugeClientSubscription{channel: $channel}';
}
Expand Down Expand Up @@ -131,4 +141,13 @@ final class CentrifugeClientSubscriptionImpl
/// Sort of Active Record pattern for subscriptions.
/// {@nodoc}
final ClientSubscriptionController _controller;

@override
Stream<CentrifugePublication> get publications => _controller.publications;

@override
Future<void> subscribe() => _controller.subscribe();

@override
Future<void> unsubscribe() => _controller.unsubscribe();
}
36 changes: 35 additions & 1 deletion lib/src/subscription/client_subscription_controller.dart
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
import 'dart:async';

import 'package:centrifuge_dart/src/client/centrifuge_interface.dart';
import 'package:centrifuge_dart/src/model/publication.dart';
import 'package:centrifuge_dart/src/model/subscription_config.dart';
import 'package:meta/meta.dart';

Expand All @@ -18,5 +21,36 @@ final class ClientSubscriptionController {
/// {@nodoc}
final WeakReference<ICentrifuge> client;

// TODO(plugfox): implement
/// Stream of publications.
/// {@nodoc}
Stream<CentrifugePublication> get publications =>
_publicationController.stream;

/// {@nodoc}
final StreamController<CentrifugePublication> _publicationController =
StreamController<CentrifugePublication>.broadcast();

/// Start subscribing to a channel
/// {@nodoc}
Future<void> subscribe() async {
// TODO(plugfox): implement
}

/// Unsubscribe from a channel
/// {@nodoc}
Future<void> unsubscribe() async {
// TODO(plugfox): implement
}

/* publish(data) - publish data to Subscription channel
history(options) - request Subscription channel history
presence() - request Subscription channel online presence information
presenceStats() - request Subscription channel online presence stats information (number of client connections and unique users in a channel).
*/

/// {@nodoc}
@internal
void close() {
_publicationController.close().ignore();
}
}

0 comments on commit ed33441

Please sign in to comment.