Skip to content

Commit

Permalink
Switch to callbacks for pings
Browse files Browse the repository at this point in the history
  • Loading branch information
mosuem committed Jan 5, 2024
1 parent e54366e commit d90d617
Show file tree
Hide file tree
Showing 9 changed files with 76 additions and 84 deletions.
3 changes: 1 addition & 2 deletions lib/src/client/http2_connection.dart
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,7 @@ class Http2ClientConnection implements connection.ClientConnection {
},
onPingTimeout: () => shutdown(),
);
transport.onFrameReceived
.listen((_) => keepAliveManager?.onFrameReceived());
transport.frameReceived = keepAliveManager?.onFrameReceived;
}
_connectionLifeTimer
..reset()
Expand Down
8 changes: 4 additions & 4 deletions lib/src/server/handler.dart
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ class ServerHandler extends ServiceCall {
final X509Certificate? _clientCertificate;
final InternetAddress? _remoteAddress;

/// Emits a ping everytime data is received
final Sink<void>? onDataReceived;
/// Callback everytime data is received
final void Function()? onDataReceived;

final Completer<void> _isCanceledCompleter = Completer<void>();

Expand Down Expand Up @@ -148,7 +148,7 @@ class ServerHandler extends ServiceCall {
// -- Idle state, incoming data --

void _onDataIdle(GrpcMessage headerMessage) async {
onDataReceived?.add(null);
onDataReceived?.call();
if (headerMessage is! GrpcMetadata) {
_sendError(GrpcError.unimplemented('Expected header frame'));
_sinkIncoming();
Expand Down Expand Up @@ -289,7 +289,7 @@ class ServerHandler extends ServiceCall {
return;
}

onDataReceived?.add(null);
onDataReceived?.call();
final data = message;
Object? request;
try {
Expand Down
24 changes: 12 additions & 12 deletions lib/src/server/server.dart
Original file line number Diff line number Diff line change
Expand Up @@ -117,25 +117,21 @@ class ConnectionServer {
required ServerTransportConnection connection,
X509Certificate? clientCertificate,
InternetAddress? remoteAddress,
required ServerKeepAlive serverKeepAlive,
}) async {
_connections.add(connection);
handlers[connection] = [];
// TODO(jakobr): Set active state handlers, close connection after idle
// timeout.
final onDataReceivedController = StreamController<void>();
ServerKeepAlive(
options: _keepAliveOptions,
tooManyBadPings: () async =>
await connection.terminate(ErrorCode.ENHANCE_YOUR_CALM),
pingNotifier: connection.onPingReceived,
dataNotifier: onDataReceivedController.stream,
).handle();

serverKeepAlive.tooManyBadPings =
() async => await connection.terminate(ErrorCode.ENHANCE_YOUR_CALM);
connection.incomingStreams.listen((stream) {
final handler = serveStream_(
stream: stream,
clientCertificate: clientCertificate,
remoteAddress: remoteAddress,
onDataReceived: onDataReceivedController.sink,
onDataReceived: serverKeepAlive.onDataReceived,
);
handler.onCanceled.then((_) => handlers[connection]?.remove(handler));
handlers[connection]!.add(handler);
Expand All @@ -153,7 +149,6 @@ class ConnectionServer {
}
_connections.remove(connection);
handlers.remove(connection);
await onDataReceivedController.close();
});
}

Expand All @@ -162,7 +157,7 @@ class ConnectionServer {
required ServerTransportStream stream,
X509Certificate? clientCertificate,
InternetAddress? remoteAddress,
Sink<void>? onDataReceived,
void Function()? onDataReceived,
}) {
return ServerHandler(
stream: stream,
Expand Down Expand Up @@ -279,17 +274,22 @@ class Server extends ConnectionServer {
clientCertificate = socket.peerCertificate;
}

final serverKeepAlive = ServerKeepAlive(options: _keepAliveOptions);
final connection = ServerTransportConnection.viaSocket(
socket,
settings: http2ServerSettings,
pingReceived: serverKeepAlive.onPingReceived,
);
connection.pingReceived = serverKeepAlive.onPingReceived;

serveConnection(
connection: connection,
clientCertificate: clientCertificate,
remoteAddress: socket.remoteAddressOrNull,
serverKeepAlive: serverKeepAlive,
);
}, onError: (error, stackTrace) {
print('error');
if (error is Error) {
Zone.current.handleUncaughtError(error, stackTrace);
}
Expand All @@ -302,7 +302,7 @@ class Server extends ConnectionServer {
required ServerTransportStream stream,
X509Certificate? clientCertificate,
InternetAddress? remoteAddress,
Sink<void>? onDataReceived,
void Function()? onDataReceived,
}) {
return ServerHandler(
stream: stream,
Expand Down
24 changes: 4 additions & 20 deletions lib/src/server/server_keepalive.dart
Original file line number Diff line number Diff line change
Expand Up @@ -40,38 +40,21 @@ class ServerKeepAliveOptions {
class ServerKeepAlive {
/// What to do after receiving too many bad pings, probably shut down the
/// connection to not be DDoSed.
final Future<void> Function()? tooManyBadPings;
Future<void> Function()? tooManyBadPings;

final ServerKeepAliveOptions options;

/// A stream of events for every time the server gets pinged.
final Stream<void> pingNotifier;

/// A stream of events for every time the server receives data.
final Stream<void> dataNotifier;

int _badPings = 0;
Stopwatch? _timeOfLastReceivedPing;

ServerKeepAlive({
this.tooManyBadPings,
required this.options,
required this.pingNotifier,
required this.dataNotifier,
});

void handle() {
// If we don't care about bad pings, there is not point in listening to
// events.
if (_enforcesMaxBadPings) {
pingNotifier.listen((_) => _onPingReceived());
dataNotifier.listen((_) => _onDataReceived());
}
}

bool get _enforcesMaxBadPings => (options.maxBadPings ?? 0) > 0;

Future<void> _onPingReceived() async {
Future<void> onPingReceived(int _) async {
if (_enforcesMaxBadPings) {
if (_timeOfLastReceivedPing == null) {
_timeOfLastReceivedPing = clock.stopwatch()
Expand All @@ -82,12 +65,13 @@ class ServerKeepAlive {
_badPings++;
}
if (_badPings > options.maxBadPings!) {
// print('Call too many bad pings');
await tooManyBadPings?.call();
}
}
}

void _onDataReceived() {
void onDataReceived() {
if (_enforcesMaxBadPings) {
_badPings = 0;
_timeOfLastReceivedPing = null;
Expand Down
3 changes: 3 additions & 0 deletions pubspec_overrides.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
dependency_overrides:
http2:
path: ../http2
5 changes: 2 additions & 3 deletions test/client_tests/client_keepalive_manager_test.mocks.dart
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
// Mocks generated by Mockito 5.4.1 from annotations
// Mocks generated by Mockito 5.4.2 from annotations
// in grpc/test/client_tests/client_keepalive_manager_test.dart.
// Do not manually edit this file.

// @dart=2.19

// ignore_for_file: no_leading_underscores_for_library_prefixes
import 'package:mockito/mockito.dart' as _i1;

Expand Down Expand Up @@ -32,6 +30,7 @@ class MockPinger extends _i1.Mock implements _i2.Pinger {
),
returnValueForMissingStub: null,
);

@override
void onPingTimeout() => super.noSuchMethod(
Invocation.method(
Expand Down
2 changes: 1 addition & 1 deletion test/keepalive_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ class FakeEchoService extends EchoServiceBase {
@override
Stream<ServerStreamingEchoResponse> serverStreamingEcho(
ServiceCall call, ServerStreamingEchoRequest request) {
// TODO: implement serverStreamingEcho
// TODO: implement serverStreamingEcho
throw UnimplementedError();
}
}
46 changes: 17 additions & 29 deletions test/server_keepalive_manager_test.dart
Original file line number Diff line number Diff line change
@@ -1,57 +1,45 @@
import 'dart:async';

import 'package:fake_async/fake_async.dart';
import 'package:grpc/src/server/server_keepalive.dart';
import 'package:test/test.dart';

void main() {
late StreamController pingStream;
late StreamController dataStream;
late int maxBadPings;

var goAway = false;

void initServer([ServerKeepAliveOptions? options]) => ServerKeepAlive(
ServerKeepAlive initServer([ServerKeepAliveOptions? options]) =>
ServerKeepAlive(
options: options ??
ServerKeepAliveOptions(
maxBadPings: maxBadPings,
minIntervalBetweenPingsWithoutData: Duration(milliseconds: 5),
),
pingNotifier: pingStream.stream,
dataNotifier: dataStream.stream,
tooManyBadPings: () async => goAway = true,
).handle();
);

setUp(() {
pingStream = StreamController();
dataStream = StreamController();
maxBadPings = 10;
goAway = false;
});

tearDown(() {
pingStream.close();
dataStream.close();
});

final timeAfterPing = Duration(milliseconds: 10);

test('Sending too many pings without data kills connection', () async {
FakeAsync().run((async) {
initServer();
final server = initServer();
// Send good ping
pingStream.sink.add(null);
server.onPingReceived(0);
async.elapse(timeAfterPing);

// Send [maxBadPings] bad pings, that's still ok
for (var i = 0; i < maxBadPings; i++) {
pingStream.sink.add(null);
server.onPingReceived(0);
}
async.elapse(timeAfterPing);
expect(goAway, false);

// Send another bad ping; that's one too many!
pingStream.sink.add(null);
server.onPingReceived(0);
async.elapse(timeAfterPing);
expect(goAway, true);
});
Expand All @@ -60,17 +48,17 @@ void main() {
'Sending too many pings without data doesn`t kill connection if the server doesn`t care',
() async {
FakeAsync().run((async) {
initServer(ServerKeepAliveOptions(
final server = initServer(ServerKeepAliveOptions(
maxBadPings: null,
minIntervalBetweenPingsWithoutData: Duration(milliseconds: 5),
));
// Send good ping
pingStream.sink.add(null);
server.onPingReceived(0);
async.elapse(timeAfterPing);

// Send a lot of bad pings, that's still ok.
for (var i = 0; i < 50; i++) {
pingStream.sink.add(null);
server.onPingReceived(0);
}
async.elapse(timeAfterPing);
expect(goAway, false);
Expand All @@ -79,36 +67,36 @@ void main() {

test('Sending many pings with data doesn`t kill connection', () async {
FakeAsync().run((async) {
initServer();
final server = initServer();

// Send good ping
pingStream.sink.add(null);
server.onPingReceived(0);
async.elapse(timeAfterPing);

// Send [maxBadPings] bad pings, that's still ok
for (var i = 0; i < maxBadPings; i++) {
pingStream.sink.add(null);
server.onPingReceived(0);
}
async.elapse(timeAfterPing);
expect(goAway, false);

// Sending data resets the bad ping count
dataStream.add(null);
server.onDataReceived();
async.elapse(timeAfterPing);

// Send good ping
pingStream.sink.add(null);
server.onPingReceived(0);
async.elapse(timeAfterPing);

// Send [maxBadPings] bad pings, that's still ok
for (var i = 0; i < maxBadPings; i++) {
pingStream.sink.add(null);
server.onPingReceived(0);
}
async.elapse(timeAfterPing);
expect(goAway, false);

// Send another bad ping; that's one too many!
pingStream.sink.add(null);
server.onPingReceived(0);
async.elapse(timeAfterPing);
expect(goAway, true);
});
Expand Down
Loading

0 comments on commit d90d617

Please sign in to comment.