Skip to content

Commit

Permalink
Update
Browse files Browse the repository at this point in the history
  • Loading branch information
PlugFox committed Jul 23, 2023
1 parent 3a74832 commit f38b98f
Show file tree
Hide file tree
Showing 11 changed files with 499 additions and 44 deletions.
95 changes: 80 additions & 15 deletions .vscode/tasks.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,60 @@
{
"label": "Start Centrifugo Server",
"type": "shell",
"command": "docker",
"args": [
"run",
"-d",
"--rm",
"--ulimit=nofile=65536:65536",
"-p=8000:8000",
"--name=centrifugo",
"centrifugo/centrifugo:latest",
"centrifugo",
"--client_insecure",
"--admin",
"--admin_insecure",
"--log_level=debug"
],
"windows": {
"command": "docker",
"args": [
"run",
"-d",
"--rm",
"--ulimit=nofile=65536:65536",
"-p=8000:8000/tcp",
"--volume=${PWD}/config.json:/centrifugo/config.json:ro",
"--name=centrifugo",
"centrifugo/centrifugo:latest",
"centrifugo",
//"--client_insecure",
"--admin",
"--admin_insecure",
"--log_level=debug"
]
},
"linux": {
"command": "docker",
"args": [
"run",
"-d",
"--rm",
"--ulimit=nofile=65536:65536",
"-p=8000:8000/tcp",
"--volume=${PWD}/config.json:/centrifugo/config.json:ro",
"--name=centrifugo",
"centrifugo/centrifugo:latest",
"centrifugo",
//"--client_insecure",
"--admin",
"--admin_insecure",
"--log_level=debug"
]
},
"osx": {
"command": "docker",
"args": [
"run",
"-d",
"--rm",
"--ulimit=nofile=65536:65536",
"-p=8000:8000/tcp",
"--volume=${PWD}/config.json:/centrifugo/config.json:ro",
"--name=centrifugo",
"centrifugo/centrifugo:latest",
"centrifugo",
"--client_insecure",
"--admin",
"--admin_insecure",
"--log_level=debug"
]
},
"group": {
"kind": "none",
"isDefault": true
Expand All @@ -53,6 +92,32 @@
"reveal": "always",
"panel": "dedicated"
}
},
{
"label": "Generate new user token",
"type": "shell",
"command": "docker",
"args": [
"run",
"-it",
"--rm",
"--volume=${PWD}/config.json:/centrifugo/config.json:ro",
"--name=centrifugo-cli",
"centrifugo/centrifugo:latest",
"centrifugo",
"gentoken",
"--user=dart",
"--ttl=604800000" // 604800
],
"group": {
"kind": "none",
"isDefault": true
},
"problemMatcher": [],
"presentation": {
"reveal": "always",
"panel": "dedicated"
}
}
]
}
4 changes: 2 additions & 2 deletions examples/console/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,15 +89,15 @@ Bash:
```bash
docker run -it --rm --volume ${PWD}/config.json:/centrifugo/config.json:ro \
--name centrifugo-cli centrifugo/centrifugo:latest \
centrifugo gensubtoken --user dart --channel chat:index
centrifugo gentoken --user dart
```

PowerShell:

```powershell
docker run -it --rm --volume ${PWD}/config.json:/centrifugo/config.json:ro `
--name centrifugo-cli centrifugo/centrifugo:latest `
centrifugo gensubtoken --user dart --channel chat:index
centrifugo gentoken --user dart
```

### Run Centrifugo
Expand Down
28 changes: 25 additions & 3 deletions examples/console/bin/main.dart
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,44 @@ import 'dart:io' as io show exit, Platform;
import 'package:args/args.dart' show ArgParser;
import 'package:centrifuge_dart/centrifuge.dart';

const url = 'ws://localhost:8000/connection/websocket?format=protobuf';

void main([List<String>? args]) {
final options = _extractOptions(args ?? const <String>[]);
runZonedGuarded<void>(
() async {
// Create centrifuge client.
final client = Centrifuge(
CentrifugeConfig(
client: (
name: 'Centrifuge Console Example',
version: '0.0.1',
),
getToken: () =>
'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJkYXJ0IiwiZXhwIj'
'oyMjk0OTE1MTMyLCJpYXQiOjE2OTAxMTUxMzJ9.hIGDXKn-eMdsdj57wn6-4y5p'
'k0tZcKoJCu0qxuuWSoQ',
),
);
await client
.connect('ws://localhost:8000/connection/websocket?format=protobuf');

await Future<void>.delayed(const Duration(seconds: 3));
// Connect to centrifuge server using provided URL.
await client.connect(url);

// Output current client state.
print('Current state after connect: ${client.state}');

// State changes.
// Or you can observe specific state changes.
// e.g. `client.states.connected`
client.states.listen((state) => print('State changed to: $state'));

// Handle all centrifuge errors.
client.errors.listen(
(error) => print(
'Exception: ${error.exception}, '
'Stack trace: ${error.stackTrace}',
),
);

// TODO(plugfox): Read from stdin and send to channel.

Expand Down
11 changes: 6 additions & 5 deletions lib/centrifuge.dart
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
library centrifuge;

export 'src/client/centrifuge.dart' show Centrifuge;
export 'src/model/config.dart';
export 'src/model/exception.dart';
export 'src/model/jwt.dart';
export 'src/model/state.dart';
export 'package:centrifuge_dart/src/client/centrifuge.dart' show Centrifuge;
export 'package:centrifuge_dart/src/model/config.dart';
export 'package:centrifuge_dart/src/model/exception.dart';
export 'package:centrifuge_dart/src/model/jwt.dart';
export 'package:centrifuge_dart/src/model/state.dart';
export 'package:centrifuge_dart/src/model/states_stream.dart';
54 changes: 41 additions & 13 deletions lib/src/client/centrifuge.dart
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import 'package:centrifuge_dart/src/client/centrifuge_interface.dart';
import 'package:centrifuge_dart/src/model/config.dart';
import 'package:centrifuge_dart/src/model/exception.dart';
import 'package:centrifuge_dart/src/model/state.dart';
import 'package:centrifuge_dart/src/model/states_stream.dart';
import 'package:centrifuge_dart/src/transport/transport_interface.dart';
import 'package:centrifuge_dart/src/transport/ws_protobuf_transport.dart';
import 'package:centrifuge_dart/src/util/logger.dart' as logger;
Expand All @@ -12,7 +13,8 @@ import 'package:meta/meta.dart';
/// {@template centrifuge}
/// Centrifuge client.
/// {@endtemplate}
final class Centrifuge extends CentrifugeBase with CentrifugeConnectionMixin {
final class Centrifuge extends CentrifugeBase
with CentrifugeErrorsMixin, CentrifugeConnectionMixin {
/// {@macro centrifuge}
Centrifuge([CentrifugeConfig? config])
: super(config ?? CentrifugeConfig.defaultConfig());
Expand Down Expand Up @@ -45,7 +47,8 @@ abstract base class CentrifugeBase implements ICentrifuge {
CentrifugeState get state => _transport.state;

@override
Stream<CentrifugeState> get states => _transport.states;
late final CentrifugeStatesStream states =
CentrifugeStatesStream(_transport.states);

/// Centrifuge config.
/// {@nodoc}
Expand All @@ -67,19 +70,44 @@ abstract base class CentrifugeBase implements ICentrifuge {
/// Mixin responsible for connection.
/// {@nodoc}
@internal
base mixin CentrifugeConnectionMixin on CentrifugeBase {
base mixin CentrifugeErrorsMixin on CentrifugeBase {
@protected
@nonVirtual
void _emitError(CentrifugeException exception, StackTrace stackTrace) =>
_errorsController.add((exception: exception, stackTrace: stackTrace));

late final StreamController<
({CentrifugeException exception, StackTrace stackTrace})>
_errorsController = StreamController<
({CentrifugeException exception, StackTrace stackTrace})>.broadcast();

@override
late final Stream<({CentrifugeException exception, StackTrace stackTrace})>
errors = _errorsController.stream;

@override
Future<void> close() async {
await super.close();
_errorsController.close().ignore();
}
}

/// Mixin responsible for connection.
/// {@nodoc}
@internal
base mixin CentrifugeConnectionMixin on CentrifugeBase, CentrifugeErrorsMixin {
@override
Future<void> connect(String url) async {
logger.fine('Interactively connecting to $url');
try {
await _transport.connect(url);
} on CentrifugeException {
} on CentrifugeException catch (error, stackTrace) {
_emitError(error, stackTrace);
rethrow;
} on Object catch (error, stackTrace) {
Error.throwWithStackTrace(
CentrifugeConnectionException(error),
stackTrace,
);
final centrifugeException = CentrifugeConnectionException(error);
_emitError(centrifugeException, stackTrace);
Error.throwWithStackTrace(centrifugeException, stackTrace);
}
}

Expand All @@ -88,13 +116,13 @@ base mixin CentrifugeConnectionMixin on CentrifugeBase {
logger.fine('Interactively disconnecting');
try {
await _transport.disconnect(0, 'Disconnect called');
} on CentrifugeException {
} on CentrifugeException catch (error, stackTrace) {
_emitError(error, stackTrace);
rethrow;
} on Object catch (error, stackTrace) {
Error.throwWithStackTrace(
CentrifugeDisconnectionException(error),
stackTrace,
);
final centrifugeException = CentrifugeConnectionException(error);
_emitError(centrifugeException, stackTrace);
Error.throwWithStackTrace(centrifugeException, stackTrace);
}
}

Expand Down
8 changes: 6 additions & 2 deletions lib/src/client/centrifuge_interface.dart
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
import 'package:centrifuge_dart/src/model/state.dart';
import 'package:centrifuge_dart/centrifuge.dart';

/// Centrifuge client interface.
abstract interface class ICentrifuge {
/// State of client.
CentrifugeState get state;

/// Stream of client states.
abstract final Stream<CentrifugeState> states;
abstract final CentrifugeStatesStream states;

/// Stream of errors.
abstract final Stream<
({CentrifugeException exception, StackTrace stackTrace})> errors;

/* abstract final Stream<Object> publications; */

Expand Down
13 changes: 13 additions & 0 deletions lib/src/model/state.dart
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import 'dart:convert';

import 'package:meta/meta.dart';

/// {@template state}
Expand Down Expand Up @@ -29,6 +31,7 @@ import 'package:meta/meta.dart';
/// Also, this can happen due to server advice from a server,
/// or due to a terminal problem that happened on the client-side.
/// {@endtemplate}
@immutable
sealed class CentrifugeState extends _$CentrifugeStateBase {
/// {@macro state}
const CentrifugeState(super.timestamp);
Expand Down Expand Up @@ -59,6 +62,7 @@ sealed class CentrifugeState extends _$CentrifugeStateBase {
bool? sendPong,
String? session,
String? node,
List<int>? data,
}) = CentrifugeState$Connected;

/// Permanently closed
Expand Down Expand Up @@ -111,6 +115,10 @@ sealed class CentrifugeState extends _$CentrifugeStateBase {
},
session: json['session']?.toString(),
node: json['node']?.toString(),
data: switch (json['data']) {
String data when data.isNotEmpty => base64Decode(data),
_ => null,
},
),
('closed', int timestamp, _) => CentrifugeState.closed(
timestamp: DateTime.fromMicrosecondsSinceEpoch(timestamp),
Expand Down Expand Up @@ -261,6 +269,7 @@ final class CentrifugeState$Connected extends CentrifugeState
this.sendPong,
this.session,
this.node,
this.data,
}) : super(timestamp ?? DateTime.now());

@override
Expand Down Expand Up @@ -296,6 +305,9 @@ final class CentrifugeState$Connected extends CentrifugeState
/// Server node ID.
final String? node;

/// Additional data returned from server on connect.
final List<int>? data;

@override
bool get isDisconnected => false;

Expand Down Expand Up @@ -328,6 +340,7 @@ final class CentrifugeState$Connected extends CentrifugeState
if (sendPong != null) 'sendPong': sendPong,
if (session != null) 'session': session,
if (node != null) 'node': node,
if (data != null) 'data': base64Encode(data!),
};

@override
Expand Down
Loading

0 comments on commit f38b98f

Please sign in to comment.