Skip to content
This repository has been archived by the owner on Oct 22, 2024. It is now read-only.

Commit

Permalink
Switch to callbacks from streams for ping handling
Browse files Browse the repository at this point in the history
  • Loading branch information
mosuem committed Jan 5, 2024
1 parent e7531ef commit c4f4730
Show file tree
Hide file tree
Showing 4 changed files with 101 additions and 50 deletions.
97 changes: 63 additions & 34 deletions lib/src/connection.dart
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,13 @@ abstract class Connection {

final Completer<void> _onInitialPeerSettingsReceived = Completer<void>();

final StreamController<int> _pingReceived = StreamController<int>();
/// Stream which emits an event with the ping id every time a ping is received
/// on this connection.
Function(int)? pingReceived;

final StreamController<void> _receivedFrame = StreamController<void>();
/// Stream which emits an event every time a frame is received on this
/// connection.
Function()? frameReceived;

/// Future which completes when the first SETTINGS frame is received from
/// the peer.
Expand Down Expand Up @@ -148,9 +152,14 @@ abstract class Connection {
/// The state of this connection.
late ConnectionState _state;

Connection(Stream<List<int>> incoming, StreamSink<List<int>> outgoing,
Settings settings,
{this.isClientConnection = true}) {
Connection(
Stream<List<int>> incoming,
StreamSink<List<int>> outgoing,
Settings settings, {
this.isClientConnection = true,
this.pingReceived,
this.frameReceived,
}) {
_setupConnection(incoming, outgoing, settings);
}

Expand All @@ -164,26 +173,28 @@ abstract class Connection {
_frameReaderSubscription = incomingFrames.listen((Frame frame) {
_catchProtocolErrors(() => _handleFrameImpl(frame));
}, onError: (error, stack) {
// print(1);
_terminate(ErrorCode.CONNECT_ERROR, causedByTransportError: true);
}, onDone: () {
// Ensure existing messages from lower levels are sent to the upper
// levels before we terminate everything.
_incomingQueue.forceDispatchIncomingMessages();
_streams.forceDispatchIncomingMessages();

// print(2);
_terminate(ErrorCode.CONNECT_ERROR, causedByTransportError: true);
});

// Setup frame writing.
_frameWriter = FrameWriter(_hpackContext.encoder, outgoing, peerSettings);
_frameWriter.doneFuture.whenComplete(() {
// print(3);
_terminate(ErrorCode.CONNECT_ERROR, causedByTransportError: true);
});

// Setup handlers.
_settingsHandler = SettingsHandler(_hpackContext.encoder, _frameWriter,
acknowledgedSettings, peerSettings);
_pingHandler = PingHandler(_frameWriter, _pingReceived.sink);
_pingHandler = PingHandler(_frameWriter, pingReceived);

var settings = _decodeSettings(settingsObject);

Expand Down Expand Up @@ -344,7 +355,7 @@ abstract class Connection {
frame.decodedHeaders =
_hpackContext.decoder.decode(frame.headerBlockFragment);
}
_receivedFrame.add(null);
frameReceived?.call();

// Handle the frame as either a connection or a stream frame.
if (frame.header.streamId == 0) {
Expand Down Expand Up @@ -446,14 +457,29 @@ abstract class Connection {
}

class ClientConnection extends Connection implements ClientTransportConnection {
ClientConnection._(Stream<List<int>> incoming, StreamSink<List<int>> outgoing,
Settings settings)
: super(incoming, outgoing, settings, isClientConnection: true);

factory ClientConnection(Stream<List<int>> incoming,
StreamSink<List<int>> outgoing, ClientSettings clientSettings) {
ClientConnection._(
Stream<List<int>> incoming,
StreamSink<List<int>> outgoing,
Settings settings, {
super.pingReceived,
super.frameReceived,
}) : super(incoming, outgoing, settings, isClientConnection: true);

factory ClientConnection(
Stream<List<int>> incoming,
StreamSink<List<int>> outgoing,
ClientSettings clientSettings, {
Function(int data)? pingReceived,
Function()? frameReceived,
}) {
outgoing.add(CONNECTION_PREFACE);
return ClientConnection._(incoming, outgoing, clientSettings);
return ClientConnection._(
incoming,
outgoing,
clientSettings,
pingReceived: pingReceived,
frameReceived: frameReceived,
);
}

@override
Expand All @@ -478,32 +504,35 @@ class ClientConnection extends Connection implements ClientTransportConnection {
}
return hStream;
}

@override
Stream<int> get onPingReceived => _pingReceived.stream;

@override
Stream<void> get onFrameReceived => _receivedFrame.stream;
}

class ServerConnection extends Connection implements ServerTransportConnection {
ServerConnection._(Stream<List<int>> incoming, StreamSink<List<int>> outgoing,
Settings settings)
: super(incoming, outgoing, settings, isClientConnection: false);

factory ServerConnection(Stream<List<int>> incoming,
StreamSink<List<int>> outgoing, ServerSettings serverSettings) {
ServerConnection._(
Stream<List<int>> incoming,
StreamSink<List<int>> outgoing,
Settings settings, {
super.pingReceived,
super.frameReceived,
}) : super(incoming, outgoing, settings, isClientConnection: false);

factory ServerConnection(
Stream<List<int>> incoming,
StreamSink<List<int>> outgoing,
ServerSettings serverSettings, {
Function(int data)? pingReceived,
Function()? frameReceived,
}) {
var frameBytes = readConnectionPreface(incoming);
return ServerConnection._(frameBytes, outgoing, serverSettings);
return ServerConnection._(
frameBytes,
outgoing,
serverSettings,
pingReceived: pingReceived,
frameReceived: frameReceived,
);
}

@override
Stream<ServerTransportStream> get incomingStreams =>
_streams.incomingStreams.cast<ServerTransportStream>();

@override
Stream<int> get onPingReceived => _pingReceived.stream;

@override
Stream<void> get onFrameReceived => _receivedFrame.stream;
}
6 changes: 4 additions & 2 deletions lib/src/ping/ping_handler.dart
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import '../sync_errors.dart';
class PingHandler extends Object with TerminatableMixin {
final FrameWriter _frameWriter;
final Map<int, Completer> _remainingPings = {};
final Sink<int>? pingReceived;
final Function(int)? pingReceived;
int _nextId = 1;

PingHandler(this._frameWriter, [this.pingReceived]);
Expand All @@ -37,7 +37,9 @@ class PingHandler extends Object with TerminatableMixin {
}

if (!frame.hasAckFlag) {
pingReceived?.add(frame.opaqueData);
// print(
// 'Call ping received on $pingReceived with data ${frame.opaqueData}');
pingReceived?.call(frame.opaqueData);
_frameWriter.writePingFrame(frame.opaqueData, ack: true);
} else {
var c = _remainingPings.remove(frame.opaqueData);
Expand Down
41 changes: 30 additions & 11 deletions lib/transport.dart
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,11 @@ abstract class TransportConnection {

/// Stream which emits an event with the ping id every time a ping is received
/// on this connection.
Stream<int> get onPingReceived;
Function(int)? pingReceived;

/// Stream which emits an event every time a ping is received on this
/// Stream which emits an event every time a frame is received on this
/// connection.
Stream<void> get onFrameReceived;
Function()? frameReceived;

/// Finish this connection.
///
Expand Down Expand Up @@ -104,18 +104,37 @@ abstract class ClientTransportConnection extends TransportConnection {
}

abstract class ServerTransportConnection extends TransportConnection {
factory ServerTransportConnection.viaSocket(Socket socket,
{ServerSettings? settings}) {
return ServerTransportConnection.viaStreams(socket, socket,
settings: settings);
factory ServerTransportConnection.viaSocket(
Socket socket, {
ServerSettings? settings,
Function(int data)? pingReceived,
Function()? frameReceived,
}) {
return ServerTransportConnection.viaStreams(
socket,
socket,
settings: settings,
pingReceived: pingReceived,
frameReceived: frameReceived,
);
}

factory ServerTransportConnection.viaStreams(
Stream<List<int>> incoming, StreamSink<List<int>> outgoing,
{ServerSettings? settings =
const ServerSettings(concurrentStreamLimit: 1000)}) {
Stream<List<int>> incoming,
StreamSink<List<int>> outgoing, {
ServerSettings? settings =
const ServerSettings(concurrentStreamLimit: 1000),
Function(int data)? pingReceived,
Function()? frameReceived,
}) {
settings ??= const ServerSettings();
return ServerConnection(incoming, outgoing, settings);
return ServerConnection(
incoming,
outgoing,
settings,
pingReceived: pingReceived,
frameReceived: frameReceived,
);
}

/// Incoming HTTP/2 streams.
Expand Down
7 changes: 4 additions & 3 deletions test/src/ping/ping_handler_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -95,15 +95,16 @@ void main() {
});

test('receiving-ping-calls-stream', () async {
List<int> pingData = [];
var writer = FrameWriterMock();
var streamController = StreamController<int>();
var pingHandler = PingHandler(writer, streamController.sink);
pingCallback(int i) => pingData.add(i);
var pingHandler = PingHandler(writer, pingCallback);

var header = FrameHeader(8, FrameType.PING, 0, 0);
pingHandler.processPingFrame(PingFrame(header, 1));
var header2 = FrameHeader(8, FrameType.PING, 0, 0);
pingHandler.processPingFrame(PingFrame(header2, 2));
await expectLater(streamController.stream, emitsInOrder([1, 2]));
await expectLater(pingData, containsAllInOrder([1, 2]));
});
});
}
Expand Down

0 comments on commit c4f4730

Please sign in to comment.