Skip to content

Commit 2a8d9cf

Browse files
authored
Fix rare broken socket detection issues + tool to delay bytes in tests. (#375)
1 parent 76601a1 commit 2a8d9cf

File tree

2 files changed

+51
-6
lines changed

2 files changed

+51
-6
lines changed

lib/src/v3/connection.dart

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,8 @@ class PgConnectionImplementation extends _PgSessionBase implements Connection {
202202
static Future<PgConnectionImplementation> connect(
203203
Endpoint endpoint, {
204204
ConnectionSettings? connectionSettings,
205+
@visibleForTesting
206+
StreamTransformer<Uint8List, Uint8List>? incomingBytesTransformer,
205207
}) async {
206208
final settings = connectionSettings is ResolvedConnectionSettings
207209
? connectionSettings
@@ -217,6 +219,7 @@ class PgConnectionImplementation extends _PgSessionBase implements Connection {
217219
endpoint,
218220
settings,
219221
codecContext: codecContext,
222+
incomingBytesTransformer: incomingBytesTransformer,
220223
);
221224

222225
if (_debugLog) {
@@ -257,6 +260,7 @@ class PgConnectionImplementation extends _PgSessionBase implements Connection {
257260
Endpoint endpoint,
258261
ResolvedConnectionSettings settings, {
259262
required CodecContext codecContext,
263+
StreamTransformer<Uint8List, Uint8List>? incomingBytesTransformer,
260264
}) async {
261265
final host = endpoint.host;
262266
final port = endpoint.port;
@@ -337,6 +341,10 @@ class PgConnectionImplementation extends _PgSessionBase implements Connection {
337341
adaptedStream = async.SubscriptionStream(subscription);
338342
}
339343

344+
if (incomingBytesTransformer != null) {
345+
adaptedStream = adaptedStream.transform(incomingBytesTransformer);
346+
}
347+
340348
final outgoingSocket = async.StreamSinkExtensions(socket)
341349
.transform<Uint8List>(
342350
async.StreamSinkTransformer.fromHandlers(handleDone: (out) {
@@ -373,6 +381,7 @@ class PgConnectionImplementation extends _PgSessionBase implements Connection {
373381
final bool _channelIsSecure;
374382
late final StreamSubscription<Message> _serverMessages;
375383
bool _isClosing = false;
384+
bool _socketIsBroken = false;
376385

377386
_PendingOperation? _pending;
378387
// Errors happening while a transaction is active will roll back the
@@ -558,19 +567,21 @@ class PgConnectionImplementation extends _PgSessionBase implements Connection {
558567

559568
Future<void> _close(bool interruptRunning, PgException? cause,
560569
{bool socketIsBroken = false}) async {
570+
_socketIsBroken = _socketIsBroken || socketIsBroken;
561571
if (!_isClosing) {
562572
_isClosing = true;
563573

564574
if (interruptRunning) {
565575
_pending?.handleConnectionClosed(cause);
566-
if (!socketIsBroken) {
576+
if (!_socketIsBroken) {
567577
_channel.sink.add(const TerminateMessage());
568578
}
569579
} else {
570580
// Wait for the previous operation to complete by using the lock
571581
await _operationLock.withResource(() {
572-
// Use lock to await earlier operations
573-
_channel.sink.add(const TerminateMessage());
582+
if (!_socketIsBroken) {
583+
_channel.sink.add(const TerminateMessage());
584+
}
574585
});
575586
}
576587

@@ -580,7 +591,11 @@ class PgConnectionImplementation extends _PgSessionBase implements Connection {
580591
}
581592

582593
void _closeAfterError([PgException? cause]) {
583-
_close(true, cause);
594+
_close(
595+
true,
596+
cause,
597+
socketIsBroken: cause?.willAbortConnection ?? false,
598+
);
584599
}
585600
}
586601

test/docker.dart

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import 'dart:async';
22
import 'dart:io';
3+
import 'dart:typed_data';
34

45
import 'package:async/async.dart';
56
import 'package:docker_process/containers/postgres.dart';
@@ -8,9 +9,12 @@ import 'package:meta/meta.dart';
89
import 'package:path/path.dart' as p;
910
import 'package:postgres/messages.dart';
1011
import 'package:postgres/postgres.dart';
12+
import 'package:postgres/src/v3/connection.dart';
1113
import 'package:stream_channel/stream_channel.dart';
1214
import 'package:test/test.dart';
1315

16+
final _splitAndDelayBytes = false;
17+
1418
// We log all packets sent to and received from the postgres server. This can be
1519
// used to debug failing tests. To view logs, something like this can be put
1620
// at the beginning of `main()`:
@@ -64,16 +68,18 @@ class PostgresServer {
6468
SslMode? sslMode,
6569
QueryMode? queryMode,
6670
}) async {
67-
return Connection.open(
71+
return await PgConnectionImplementation.connect(
6872
await endpoint(),
69-
settings: ConnectionSettings(
73+
connectionSettings: ConnectionSettings(
7074
connectTimeout: Duration(seconds: 3),
7175
queryTimeout: Duration(seconds: 3),
7276
replicationMode: replicationMode,
7377
transformer: loggingTransformer('conn'),
7478
sslMode: sslMode,
7579
queryMode: queryMode,
7680
),
81+
incomingBytesTransformer:
82+
_splitAndDelayBytes ? _transformIncomingBytes() : null,
7783
);
7884
}
7985

@@ -82,6 +88,30 @@ class PostgresServer {
8288
}
8389
}
8490

91+
StreamTransformer<Uint8List, Uint8List> _transformIncomingBytes() {
92+
return StreamTransformer.fromBind((s) => s.asyncExpand((u) {
93+
if (u.length <= 2) {
94+
return Stream.value(u);
95+
}
96+
final hash = u.hashCode.abs();
97+
final split = hash % u.length;
98+
if (split == 0 || split >= u.length - 1) {
99+
return Stream.value(u);
100+
}
101+
102+
final p1 = u.sublist(0, split);
103+
final p2 = u.sublist(split);
104+
105+
return Stream.fromFutures([
106+
Future.value(p1),
107+
Future.delayed(
108+
Duration(milliseconds: 50),
109+
() => p2,
110+
)
111+
]);
112+
}));
113+
}
114+
85115
@isTestGroup
86116
void withPostgresServer(
87117
String name,

0 commit comments

Comments
 (0)