Skip to content

Commit

Permalink
Unsubscribe
Browse files Browse the repository at this point in the history
  • Loading branch information
PlugFox committed Jul 28, 2023
1 parent 6d7b092 commit 79e0f88
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 2 deletions.
5 changes: 4 additions & 1 deletion lib/src/client/disconnect_code.dart
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@ enum DisconnectCode {
messageSizeLimit(3, 'message size limit exceeded'),

/// Timeout
timeout(4, 'timeout exceeded');
timeout(4, 'timeout exceeded'),

/// Unsubscribe error
unsubscribeError(5, 'unsubscribe error');

/// {@nodoc}
const DisconnectCode(this.code, this.reason);
Expand Down
19 changes: 18 additions & 1 deletion lib/src/subscription/client_subscription_impl.dart
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import 'dart:async';

import 'package:centrifuge_dart/centrifuge.dart';
import 'package:centrifuge_dart/src/client/disconnect_code.dart';
import 'package:centrifuge_dart/src/subscription/subscription_states_stream.dart';
import 'package:centrifuge_dart/src/transport/transport_interface.dart';
import 'package:centrifuge_dart/src/util/event_queue.dart';
Expand Down Expand Up @@ -244,7 +245,23 @@ base mixin CentrifugeClientSubscriptionSubscribeMixin
reason: reason,
since: state.since,
));
// TODO(plugfox): implement
try {
await _transport.unsubscribe(channel, _config);
} on Object catch (error, stackTrace) {
final centrifugeException = CentrifugeSubscriptionException(
message: 'Error while unsubscribing',
channel: channel,
error: error,
);
_emitError(centrifugeException, stackTrace);
_transport
.disconnect(
DisconnectCode.unsubscribeError.code,
DisconnectCode.unsubscribeError.reason,
)
.ignore();
Error.throwWithStackTrace(centrifugeException, stackTrace);
}
}

@override
Expand Down
9 changes: 9 additions & 0 deletions lib/src/transport/transport_interface.dart
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,24 @@ abstract interface class ICentrifugeTransport {
/// Send asynchronous message to a server. This method makes sense
/// only when using Centrifuge library for Go on a server side. In Centrifuge
/// asynchronous message handler does not exist.
/// {@nodoc}
Future<void> sendAsyncMessage(List<int> data);

/// Subscribe on channel with optional [since] position.
/// {@nodoc}
Future<SubcibedOnChannel> subscribe(
String channel,
CentrifugeSubscriptionConfig config,
CentrifugeStreamPosition? since,
);

/// Unsubscribe from channel.
/// {@nodoc}
Future<void> unsubscribe(
String channel,
CentrifugeSubscriptionConfig config,
);

/// Disconnect from the server.
/// e.g. code: 0, reason: 'disconnect called'
/// {@nodoc}
Expand Down
9 changes: 9 additions & 0 deletions lib/src/transport/ws_protobuf_transport.dart
Original file line number Diff line number Diff line change
Expand Up @@ -603,6 +603,15 @@ base mixin CentrifugeWSPBSubscription
data: result.hasData() ? result.data : null,
);
}

@override
Future<void> unsubscribe(
String channel,
CentrifugeSubscriptionConfig config,
) async {
final request = pb.UnsubscribeRequest()..channel = channel;
await _sendMessage(request, pb.UnsubscribeResult()).timeout(config.timeout);
}
}

/// To maintain connection alive and detect broken connections
Expand Down

0 comments on commit 79e0f88

Please sign in to comment.