diff --git a/lib/src/connection.dart b/lib/src/connection.dart index 29830ea..229b7d1 100644 --- a/lib/src/connection.dart +++ b/lib/src/connection.dart @@ -99,9 +99,13 @@ abstract class Connection { final Completer _onInitialPeerSettingsReceived = Completer(); - final StreamController _pingReceived = StreamController(); + /// Stream which emits an event with the ping id every time a ping is received + /// on this connection. + Function(int)? pingReceived; - final StreamController _receivedFrame = StreamController(); + /// Stream which emits an event every time a frame is received on this + /// connection. + Function()? frameReceived; /// Future which completes when the first SETTINGS frame is received from /// the peer. @@ -148,9 +152,14 @@ abstract class Connection { /// The state of this connection. late ConnectionState _state; - Connection(Stream> incoming, StreamSink> outgoing, - Settings settings, - {this.isClientConnection = true}) { + Connection( + Stream> incoming, + StreamSink> outgoing, + Settings settings, { + this.isClientConnection = true, + this.pingReceived, + this.frameReceived, + }) { _setupConnection(incoming, outgoing, settings); } @@ -164,26 +173,28 @@ abstract class Connection { _frameReaderSubscription = incomingFrames.listen((Frame frame) { _catchProtocolErrors(() => _handleFrameImpl(frame)); }, onError: (error, stack) { + // print(1); _terminate(ErrorCode.CONNECT_ERROR, causedByTransportError: true); }, onDone: () { // Ensure existing messages from lower levels are sent to the upper // levels before we terminate everything. _incomingQueue.forceDispatchIncomingMessages(); _streams.forceDispatchIncomingMessages(); - + // print(2); _terminate(ErrorCode.CONNECT_ERROR, causedByTransportError: true); }); // Setup frame writing. _frameWriter = FrameWriter(_hpackContext.encoder, outgoing, peerSettings); _frameWriter.doneFuture.whenComplete(() { + // print(3); _terminate(ErrorCode.CONNECT_ERROR, causedByTransportError: true); }); // Setup handlers. _settingsHandler = SettingsHandler(_hpackContext.encoder, _frameWriter, acknowledgedSettings, peerSettings); - _pingHandler = PingHandler(_frameWriter, _pingReceived.sink); + _pingHandler = PingHandler(_frameWriter, pingReceived); var settings = _decodeSettings(settingsObject); @@ -344,7 +355,7 @@ abstract class Connection { frame.decodedHeaders = _hpackContext.decoder.decode(frame.headerBlockFragment); } - _receivedFrame.add(null); + frameReceived?.call(); // Handle the frame as either a connection or a stream frame. if (frame.header.streamId == 0) { @@ -446,14 +457,29 @@ abstract class Connection { } class ClientConnection extends Connection implements ClientTransportConnection { - ClientConnection._(Stream> incoming, StreamSink> outgoing, - Settings settings) - : super(incoming, outgoing, settings, isClientConnection: true); - - factory ClientConnection(Stream> incoming, - StreamSink> outgoing, ClientSettings clientSettings) { + ClientConnection._( + Stream> incoming, + StreamSink> outgoing, + Settings settings, { + super.pingReceived, + super.frameReceived, + }) : super(incoming, outgoing, settings, isClientConnection: true); + + factory ClientConnection( + Stream> incoming, + StreamSink> outgoing, + ClientSettings clientSettings, { + Function(int data)? pingReceived, + Function()? frameReceived, + }) { outgoing.add(CONNECTION_PREFACE); - return ClientConnection._(incoming, outgoing, clientSettings); + return ClientConnection._( + incoming, + outgoing, + clientSettings, + pingReceived: pingReceived, + frameReceived: frameReceived, + ); } @override @@ -478,32 +504,35 @@ class ClientConnection extends Connection implements ClientTransportConnection { } return hStream; } - - @override - Stream get onPingReceived => _pingReceived.stream; - - @override - Stream get onFrameReceived => _receivedFrame.stream; } class ServerConnection extends Connection implements ServerTransportConnection { - ServerConnection._(Stream> incoming, StreamSink> outgoing, - Settings settings) - : super(incoming, outgoing, settings, isClientConnection: false); - - factory ServerConnection(Stream> incoming, - StreamSink> outgoing, ServerSettings serverSettings) { + ServerConnection._( + Stream> incoming, + StreamSink> outgoing, + Settings settings, { + super.pingReceived, + super.frameReceived, + }) : super(incoming, outgoing, settings, isClientConnection: false); + + factory ServerConnection( + Stream> incoming, + StreamSink> outgoing, + ServerSettings serverSettings, { + Function(int data)? pingReceived, + Function()? frameReceived, + }) { var frameBytes = readConnectionPreface(incoming); - return ServerConnection._(frameBytes, outgoing, serverSettings); + return ServerConnection._( + frameBytes, + outgoing, + serverSettings, + pingReceived: pingReceived, + frameReceived: frameReceived, + ); } @override Stream get incomingStreams => _streams.incomingStreams.cast(); - - @override - Stream get onPingReceived => _pingReceived.stream; - - @override - Stream get onFrameReceived => _receivedFrame.stream; } diff --git a/lib/src/ping/ping_handler.dart b/lib/src/ping/ping_handler.dart index 5ab3ffb..f8afaea 100644 --- a/lib/src/ping/ping_handler.dart +++ b/lib/src/ping/ping_handler.dart @@ -16,7 +16,7 @@ import '../sync_errors.dart'; class PingHandler extends Object with TerminatableMixin { final FrameWriter _frameWriter; final Map _remainingPings = {}; - final Sink? pingReceived; + final Function(int)? pingReceived; int _nextId = 1; PingHandler(this._frameWriter, [this.pingReceived]); @@ -37,7 +37,9 @@ class PingHandler extends Object with TerminatableMixin { } if (!frame.hasAckFlag) { - pingReceived?.add(frame.opaqueData); + // print( + // 'Call ping received on $pingReceived with data ${frame.opaqueData}'); + pingReceived?.call(frame.opaqueData); _frameWriter.writePingFrame(frame.opaqueData, ack: true); } else { var c = _remainingPings.remove(frame.opaqueData); diff --git a/lib/transport.dart b/lib/transport.dart index af37cf6..2261698 100644 --- a/lib/transport.dart +++ b/lib/transport.dart @@ -67,11 +67,11 @@ abstract class TransportConnection { /// Stream which emits an event with the ping id every time a ping is received /// on this connection. - Stream get onPingReceived; + Function(int)? pingReceived; - /// Stream which emits an event every time a ping is received on this + /// Stream which emits an event every time a frame is received on this /// connection. - Stream get onFrameReceived; + Function()? frameReceived; /// Finish this connection. /// @@ -104,18 +104,37 @@ abstract class ClientTransportConnection extends TransportConnection { } abstract class ServerTransportConnection extends TransportConnection { - factory ServerTransportConnection.viaSocket(Socket socket, - {ServerSettings? settings}) { - return ServerTransportConnection.viaStreams(socket, socket, - settings: settings); + factory ServerTransportConnection.viaSocket( + Socket socket, { + ServerSettings? settings, + Function(int data)? pingReceived, + Function()? frameReceived, + }) { + return ServerTransportConnection.viaStreams( + socket, + socket, + settings: settings, + pingReceived: pingReceived, + frameReceived: frameReceived, + ); } factory ServerTransportConnection.viaStreams( - Stream> incoming, StreamSink> outgoing, - {ServerSettings? settings = - const ServerSettings(concurrentStreamLimit: 1000)}) { + Stream> incoming, + StreamSink> outgoing, { + ServerSettings? settings = + const ServerSettings(concurrentStreamLimit: 1000), + Function(int data)? pingReceived, + Function()? frameReceived, + }) { settings ??= const ServerSettings(); - return ServerConnection(incoming, outgoing, settings); + return ServerConnection( + incoming, + outgoing, + settings, + pingReceived: pingReceived, + frameReceived: frameReceived, + ); } /// Incoming HTTP/2 streams. diff --git a/test/src/ping/ping_handler_test.dart b/test/src/ping/ping_handler_test.dart index 563f9ce..73a33d2 100644 --- a/test/src/ping/ping_handler_test.dart +++ b/test/src/ping/ping_handler_test.dart @@ -95,15 +95,16 @@ void main() { }); test('receiving-ping-calls-stream', () async { + List pingData = []; var writer = FrameWriterMock(); - var streamController = StreamController(); - var pingHandler = PingHandler(writer, streamController.sink); + pingCallback(int i) => pingData.add(i); + var pingHandler = PingHandler(writer, pingCallback); var header = FrameHeader(8, FrameType.PING, 0, 0); pingHandler.processPingFrame(PingFrame(header, 1)); var header2 = FrameHeader(8, FrameType.PING, 0, 0); pingHandler.processPingFrame(PingFrame(header2, 2)); - await expectLater(streamController.stream, emitsInOrder([1, 2])); + await expectLater(pingData, containsAllInOrder([1, 2])); }); }); }