Skip to content
This repository has been archived by the owner on Jan 13, 2025. It is now read-only.

Commit

Permalink
Add onActiveStateChanged callback to Connection. (#13)
Browse files Browse the repository at this point in the history
The callback is invoked when the connection goes from idle (0 active streams) to active (at least 1 active stream), and when the connection goes from active to idle.

This can be used to implement an idle connection timeout.
  • Loading branch information
jakobr-google authored Oct 2, 2017
1 parent 35dde73 commit e787697
Show file tree
Hide file tree
Showing 6 changed files with 120 additions and 20 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# Changelog

## 0.1.4

* Added an `onActiveStateChanged` callback to `Connection`, which is invoked when
the connection changes state from idle to active or from active to idle. This
can be used to implement an idle connection timeout.

## 0.1.3

* Fixed a bug where a closed window would not open correctly due to an increase
Expand Down
15 changes: 13 additions & 2 deletions lib/src/connection.dart
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ abstract class Connection {
/// Whether this connection is a client connection.
final bool isClientConnection;

/// Active state handler for this connection.
void Function(bool isActive) onActiveStateChanged;

/// The HPack context for this connection.
final HPackContext _hpackContext = new HPackContext();

Expand Down Expand Up @@ -188,11 +191,13 @@ abstract class Connection {
if (isClientConnection) {
_streams = new StreamHandler.client(
_frameWriter, _incomingQueue, _outgoingQueue,
_settingsHandler.peerSettings, _settingsHandler.acknowledgedSettings);
_settingsHandler.peerSettings, _settingsHandler.acknowledgedSettings,
_activeStateHandler);
} else {
_streams = new StreamHandler.server(
_frameWriter, _incomingQueue, _outgoingQueue,
_settingsHandler.peerSettings, _settingsHandler.acknowledgedSettings);
_settingsHandler.peerSettings, _settingsHandler.acknowledgedSettings,
_activeStateHandler);
}

// NOTE: We're not waiting until initial settings have been exchanged
Expand Down Expand Up @@ -257,6 +262,12 @@ abstract class Connection {
return _terminate(ErrorCode.NO_ERROR);
}

void _activeStateHandler(bool isActive) {
if (onActiveStateChanged != null) {
onActiveStateChanged(isActive);
}
}

/// Invokes the passed in closure and catches any exceptions.
void _catchProtocolErrors(void fn()) {
try {
Expand Down
43 changes: 29 additions & 14 deletions lib/src/streams/stream_handler.dart
Original file line number Diff line number Diff line change
Expand Up @@ -149,28 +149,35 @@ class StreamHandler extends Object with TerminatableMixin, ClosableMixin {

bool get ranOutOfStreamIds => _ranOutOfStreamIds();

final void Function(bool isActive) _onActiveStateChanged;

StreamHandler._(this._frameWriter, this.incomingQueue, this.outgoingQueue,
this._peerSettings, this._localSettings,
this.nextStreamId, this.lastRemoteStreamId);

factory StreamHandler.client(FrameWriter writer,
ConnectionMessageQueueIn incomingQueue,
ConnectionMessageQueueOut outgoingQueue,
ActiveSettings peerSettings,
ActiveSettings localSettings) {
this._onActiveStateChanged, this.nextStreamId,
this.lastRemoteStreamId);

factory StreamHandler.client(
FrameWriter writer,
ConnectionMessageQueueIn incomingQueue,
ConnectionMessageQueueOut outgoingQueue,
ActiveSettings peerSettings,
ActiveSettings localSettings,
void Function(bool isActive) onActiveStateChanged) {
return new StreamHandler._(
writer, incomingQueue, outgoingQueue, peerSettings, localSettings,
1, 0);
onActiveStateChanged, 1, 0);
}

factory StreamHandler.server(FrameWriter writer,
ConnectionMessageQueueIn incomingQueue,
ConnectionMessageQueueOut outgoingQueue,
ActiveSettings peerSettings,
ActiveSettings localSettings) {
factory StreamHandler.server(
FrameWriter writer,
ConnectionMessageQueueIn incomingQueue,
ConnectionMessageQueueOut outgoingQueue,
ActiveSettings peerSettings,
ActiveSettings localSettings,
void Function(bool isActive) onActiveStateChanged) {
return new StreamHandler._(
writer, incomingQueue, outgoingQueue, peerSettings, localSettings,
2, -1);
onActiveStateChanged, 2, -1);
}

void onTerminated(exception) {
Expand Down Expand Up @@ -298,6 +305,7 @@ class StreamHandler extends Object with TerminatableMixin, ClosableMixin {
var stream = new Http2StreamImpl(
streamQueueIn, streamQueueOut, _outgoingC, streamId, windowOutHandler,
this._canPush, this._push, this._terminateStream);
final wasIdle = _openStreams.isEmpty;
_openStreams[stream.id] = stream;

_setupOutgoingMessageHandling(stream);
Expand All @@ -309,6 +317,10 @@ class StreamHandler extends Object with TerminatableMixin, ClosableMixin {
_cleanupClosedStream(stream);
});

if (wasIdle) {
_onActiveStateChanged(true);
}

return stream;
}

Expand Down Expand Up @@ -707,6 +719,9 @@ class StreamHandler extends Object with TerminatableMixin, ClosableMixin {
if (stream.state != StreamState.Terminated) {
_changeState(stream, StreamState.Terminated);
}
if (_openStreams.isEmpty) {
_onActiveStateChanged(false);
}
onCheckForClose();
}

Expand Down
10 changes: 9 additions & 1 deletion lib/transport.dart
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,14 @@ abstract class TransportConnection {
/// Pings the other end.
Future ping();

/// Sets the active state callback.
///
/// This callback is invoked with [true] when the number of active streams
/// goes from 0 to 1 (the connection goes from idle to active), and with
/// [false] when the number of active streams becomes 0 (the connection goes
/// from active to idle).
set onActiveStateChanged(void Function(bool isActive) callback);

/// Finish this connection.
///
/// No new streams will be accepted or can be created.
Expand Down Expand Up @@ -210,7 +218,7 @@ abstract class TransportStream {
/// A sink for writing data and/or headers to the remote end.
StreamSink<StreamMessage> get outgoingMessages;

/// Set the termination handler on this stream.
/// Sets the termination handler on this stream.
///
/// The handler will be called if the stream receives an RST_STREAM frame.
set onTerminated(void value(int));
Expand Down
2 changes: 1 addition & 1 deletion pubspec.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
name: http2
version: 0.1.3
version: 0.1.4
description: A HTTP/2 implementation in Dart.
author: Dart Team <misc@dartlang.org>
homepage: https://github.com/dart-lang/http2
Expand Down
64 changes: 62 additions & 2 deletions test/transport_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,6 @@ main() {
transportTest('server-terminates-stream',
(ClientTransportConnection client,
ServerTransportConnection server) async {

Future serverFun() async {
await for (ServerTransportStream stream in server.incomingStreams) {
stream.terminate();
Expand Down Expand Up @@ -267,7 +266,6 @@ main() {
transportTest('server-terminates-stream-after-half-close',
(ClientTransportConnection client,
ServerTransportConnection server) async {

var readyForError = new Completer();

Future serverFun() async {
Expand Down Expand Up @@ -299,6 +297,68 @@ main() {
await Future.wait([serverFun(), clientFun()]);
});

transportTest('idle-handler',
(ClientTransportConnection client,
ServerTransportConnection server) async {
Future serverFun() async {
int activeCount = 0;
int idleCount = 0;
server.onActiveStateChanged = expectAsync1((active) {
if (active) {
activeCount++;
} else {
idleCount++;
}
}, count: 6);
await for (final stream in server.incomingStreams) {
stream.sendHeaders([]);
stream.incomingMessages.toList().then(
(_) => stream.outgoingMessages.close());
}
await server.finish();
expect(activeCount, 3);
expect(idleCount, 3);
}

Future clientFun() async {
int activeCount = 0;
int idleCount = 0;
client.onActiveStateChanged = expectAsync1((active) {
if (active) {
activeCount++;
} else {
idleCount++;
}
}, count: 6);
final streams = new List<ClientTransportStream>.generate(
5, (_) => client.makeRequest([]));
await Future.wait(streams.map((s) => s.outgoingMessages.close()));
await Future.wait(streams.map((s) => s.incomingMessages.toList()));
// This extra await is needed to allow the idle handler to run before
// verifying the idleCount, because the stream cleanup runs
// asynchronously after the stream is closed.
await new Future.value();
expect(activeCount, 1);
expect(idleCount, 1);

var stream = client.makeRequest([]);
await stream.outgoingMessages.close();
await stream.incomingMessages.toList();
await new Future.value();

stream = client.makeRequest([]);
await stream.outgoingMessages.close();
await stream.incomingMessages.toList();
await new Future.value();

await client.finish();
expect(activeCount, 3);
expect(idleCount, 3);
}

await Future.wait([clientFun(), serverFun()]);
});

group('flow-control', () {
const int kChunkSize = 1024;
const int kNumberOfMessages = 1000;
Expand Down

0 comments on commit e787697

Please sign in to comment.