Skip to content

Commit

Permalink
Revert "Asynchronous Codec methods + updated (deprecated) message pro…
Browse files Browse the repository at this point in the history
…cessing.…" (#374)

This reverts commit ccb3891.
isoos authored Sep 12, 2024

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
1 parent 5962c9f commit cc2d3e7
Showing 10 changed files with 81 additions and 309 deletions.
3 changes: 1 addition & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -5,6 +5,7 @@
- `Connection.info` (through `ConnectionInfo` class) exposes read-only connection-level information,
e.g. acessing access server-provided parameter status values.
- Support for binary `pgoutput` replication by [wolframm](https://github.com/Wolframm-Activities-OU).
- Deprecated `TupleDataColumn.data`, use `.value` instead (for binary protocol messages).
- **Allowing custom type codecs**:
- `Codec` interface is used for encoding/decoding value by type OIDs or Dart values.
- `Codec.encode` and `Codec.decode` gets a reference to `CodecContext` which provides
@@ -13,8 +14,6 @@
(for values where type is not specified).
- `RelationTracker` tracks information about relations (currently limited to `RelationMessage` caching).
- **Behaviour / soft-breaking changes**:
- Deprecated `TupleDataColumn.data`, use `.value` instead (for binary protocol messages).
- Deprecated some logical replication message parsing method.
- Removed `@internal`-annotated methods from the public API of `ServerException` and `Severity`.
- `ServerException` may be transformed into `_PgTimeoutException` which is both `PgException` and `TimeoutException` (but no longer `ServerException`).
- The `timeout` parameters and the `SessionSettings.queryTimeout` has only a somewhat
5 changes: 2 additions & 3 deletions lib/messages.dart
Original file line number Diff line number Diff line change
@@ -2,7 +2,6 @@ library messages;

export 'src/buffer.dart' show PgByteDataWriter;
export 'src/messages/client_messages.dart';
export 'src/messages/logical_replication_messages.dart'
hide tryAsyncParseLogicalReplicationMessage;
export 'src/messages/server_messages.dart' hide parseXLogDataMessage;
export 'src/messages/logical_replication_messages.dart';
export 'src/messages/server_messages.dart';
export 'src/messages/shared_messages.dart';
10 changes: 4 additions & 6 deletions lib/src/message_window.dart
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import 'dart:async';
import 'dart:collection';
import 'dart:typed_data';

@@ -12,7 +11,7 @@ import 'messages/shared_messages.dart';

const int _headerByteSize = 5;

typedef _ServerMessageFn = FutureOr<ServerMessage> Function(
typedef _ServerMessageFn = ServerMessage Function(
PgByteDataReader reader, int length);

Map<int, _ServerMessageFn> _messageTypeMap = {
@@ -51,7 +50,7 @@ class MessageFramer {
bool get _isComplete =>
_expectedLength == 0 || _expectedLength <= _reader.remainingLength;

Future<void> addBytes(Uint8List bytes) async {
void addBytes(Uint8List bytes) {
_reader.add(bytes);

while (true) {
@@ -77,7 +76,7 @@ class MessageFramer {
}

final targetRemainingLength = _reader.remainingLength - _expectedLength;
final msg = await msgMaker(_reader, _expectedLength);
final msg = msgMaker(_reader, _expectedLength);
if (_reader.remainingLength > targetRemainingLength) {
throw StateError(
'Message parser consumed more bytes than expected. type=$_type expectedLength=$_expectedLength');
@@ -112,8 +111,7 @@ class MessageFramer {
/// such as replication messages.
/// Returns a [ReplicationMessage] if the message contains such message.
/// Otherwise, it'll just return the provided bytes as [CopyDataMessage].
Future<ServerMessage> _parseCopyDataMessage(
PgByteDataReader reader, int length) async {
ServerMessage _parseCopyDataMessage(PgByteDataReader reader, int length) {
final code = reader.readUint8();
if (code == ReplicationMessageId.primaryKeepAlive) {
return PrimaryKeepAliveMessage.parse(reader);
201 changes: 11 additions & 190 deletions lib/src/messages/logical_replication_messages.dart
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import 'dart:typed_data';

import 'package:buffer/buffer.dart';
import 'package:meta/meta.dart';
import 'package:postgres/src/types/codec.dart';

import '../buffer.dart';
@@ -48,8 +47,6 @@ class XLogDataLogicalMessage implements XLogDataMessage {

/// Tries to check if the [bytesList] is a [LogicalReplicationMessage]. If so,
/// [LogicalReplicationMessage] is returned, otherwise `null` is returned.
@Deprecated('This method will be removed from public API. '
'Please file a new issue on GitHub if you are using it.')
LogicalReplicationMessage? tryParseLogicalReplicationMessage(
PgByteDataReader reader, int length) {
// the first byte is the msg type
@@ -72,66 +69,13 @@ LogicalReplicationMessage? tryParseLogicalReplicationMessage(
return TypeMessage._parse(reader);

case LogicalReplicationMessageTypes.insert:
return InsertMessage._syncParse(reader);
return InsertMessage._parse(reader);

case LogicalReplicationMessageTypes.update:
return UpdateMessage._syncParse(reader);
return UpdateMessage._parse(reader);

case LogicalReplicationMessageTypes.delete:
return DeleteMessage._syncParse(reader);

case LogicalReplicationMessageTypes.truncate:
return TruncateMessage._parse(reader);

case LogicalReplicationMessageTypes.unsupported:
// wal2json messages starts with `{` as the first byte
if (firstByte == '{'.codeUnits.single) {
// note this needs the full set of bytes unlike other cases
final bb = BytesBuffer();
bb.addByte(firstByte);
bb.add(reader.read(length - 1));
try {
return JsonMessage(reader.encoding.decode(bb.toBytes()));
} catch (_) {
// ignore
}
}
return null;
}
}

/// Tries to check if the [bytesList] is a [LogicalReplicationMessage]. If so,
/// [LogicalReplicationMessage] is returned, otherwise `null` is returned.
@internal
Future<LogicalReplicationMessage?> tryAsyncParseLogicalReplicationMessage(
PgByteDataReader reader, int length) async {
// the first byte is the msg type
final firstByte = reader.readUint8();
final msgType = LogicalReplicationMessageTypes.fromByte(firstByte);
switch (msgType) {
case LogicalReplicationMessageTypes.begin:
return BeginMessage._parse(reader);

case LogicalReplicationMessageTypes.commit:
return CommitMessage._parse(reader);

case LogicalReplicationMessageTypes.origin:
return OriginMessage._parse(reader);

case LogicalReplicationMessageTypes.relation:
return RelationMessage._parse(reader);

case LogicalReplicationMessageTypes.type:
return TypeMessage._parse(reader);

case LogicalReplicationMessageTypes.insert:
return await InsertMessage._parse(reader);

case LogicalReplicationMessageTypes.update:
return await UpdateMessage._parse(reader);

case LogicalReplicationMessageTypes.delete:
return await DeleteMessage._parse(reader);
return DeleteMessage._parse(reader);

case LogicalReplicationMessageTypes.truncate:
return TruncateMessage._parse(reader);
@@ -437,9 +381,7 @@ class TupleData {
/// TupleData does not consume the entire bytes
///
/// It'll read until the types are generated.
///
/// NOTE: do not use, will be removed.
factory TupleData._syncParse(PgByteDataReader reader, int relationId) {
factory TupleData._parse(PgByteDataReader reader, int relationId) {
final columnCount = reader.readUint16();
final columns = <TupleDataColumn>[];
for (var i = 0; i < columnCount; i++) {
@@ -495,66 +437,6 @@ class TupleData {
return TupleData(columns: columns);
}

/// TupleData does not consume the entire bytes
///
/// It'll read until the types are generated.
static Future<TupleData> _parse(
PgByteDataReader reader, int relationId) async {
final columnCount = reader.readUint16();
final columns = <TupleDataColumn>[];
for (var i = 0; i < columnCount; i++) {
// reading order matters
final typeId = reader.readUint8();
final tupleDataType = TupleDataType.fromByte(typeId);
late final int length;
late final String data;
final typeOid = reader.codecContext.relationTracker
.getCachedTypeOidForRelationColumn(relationId, i);
Object? value;
switch (tupleDataType) {
case TupleDataType.text:
length = reader.readUint32();
data = reader.encoding.decode(reader.read(length));
value = data;
break;
case TupleDataType.binary:
length = reader.readUint32();
final bytes = reader.read(length);
value = typeOid == null
? UndecodedBytes(
typeOid: 0,
isBinary: true,
bytes: bytes,
encoding: reader.codecContext.encoding,
)
: await reader.codecContext.typeRegistry.decode(
EncodedValue.binary(
bytes,
typeOid: typeOid,
),
reader.codecContext,
);
data = value.toString();
break;
case TupleDataType.null_:
case TupleDataType.toast:
length = 0;
data = '';
break;
}
columns.add(
TupleDataColumn(
typeId: typeId,
length: length,
typeOid: typeOid,
data: data,
value: value,
),
);
}
return TupleData(columns: columns);
}

late final int columnCount = columns.length;

@override
@@ -569,26 +451,13 @@ class InsertMessage implements LogicalReplicationMessage {
late final int relationId;
late final TupleData tuple;

InsertMessage._(this.relationId, this.tuple);

/// NOTE: do not use, will be removed.
InsertMessage._syncParse(PgByteDataReader reader) {
InsertMessage._parse(PgByteDataReader reader) {
relationId = reader.readUint32();
final tupleType = reader.readUint8();
if (tupleType != 'N'.codeUnitAt(0)) {
throw Exception("InsertMessage must have 'N' tuple type");
}
tuple = TupleData._syncParse(reader, relationId);
}

static Future<InsertMessage> _parse(PgByteDataReader reader) async {
final relationId = reader.readUint32();
final tupleType = reader.readUint8();
if (tupleType != 'N'.codeUnitAt(0)) {
throw Exception("InsertMessage must have 'N' tuple type");
}
final tuple = await TupleData._parse(reader, relationId);
return InsertMessage._(relationId, tuple);
tuple = TupleData._parse(reader, relationId);
}

@override
@@ -642,58 +511,28 @@ class UpdateMessage implements LogicalReplicationMessage {
/// Byte1('N'): Identifies the following TupleData message as a new tuple.
late final TupleData? newTuple;

UpdateMessage._(
this.relationId, this.oldTupleType, this.oldTuple, this.newTuple);

/// NOTE: do not use, will be removed.
UpdateMessage._syncParse(PgByteDataReader reader) {
UpdateMessage._parse(PgByteDataReader reader) {
// reading order matters
relationId = reader.readUint32();
var tupleType = UpdateMessageTuple.fromByte(reader.readUint8());

if (tupleType == UpdateMessageTuple.oldType ||
tupleType == UpdateMessageTuple.keyType) {
oldTupleType = tupleType;
oldTuple = TupleData._syncParse(reader, relationId);
oldTuple = TupleData._parse(reader, relationId);
tupleType = UpdateMessageTuple.fromByte(reader.readUint8());
} else {
oldTupleType = null;
oldTuple = null;
}

if (tupleType == UpdateMessageTuple.newType) {
newTuple = TupleData._syncParse(reader, relationId);
newTuple = TupleData._parse(reader, relationId);
} else {
throw Exception('Invalid Tuple Type for UpdateMessage');
}
}

static Future<UpdateMessage> _parse(PgByteDataReader reader) async {
// reading order matters
final relationId = reader.readUint32();
UpdateMessageTuple? oldTupleType;
TupleData? oldTuple;
TupleData? newTuple;
var tupleType = UpdateMessageTuple.fromByte(reader.readUint8());

if (tupleType == UpdateMessageTuple.oldType ||
tupleType == UpdateMessageTuple.keyType) {
oldTupleType = tupleType;
oldTuple = await TupleData._parse(reader, relationId);
tupleType = UpdateMessageTuple.fromByte(reader.readUint8());
} else {
oldTupleType = null;
oldTuple = null;
}

if (tupleType == UpdateMessageTuple.newType) {
newTuple = await TupleData._parse(reader, relationId);
} else {
throw Exception('Invalid Tuple Type for UpdateMessage');
}
return UpdateMessage._(relationId, oldTupleType, oldTuple, newTuple);
}

@override
String toString() {
return 'UpdateMessage(relationId: $relationId, oldTupleType: $oldTupleType, oldTuple: $oldTuple, newTuple: $newTuple)';
@@ -744,36 +583,18 @@ class DeleteMessage implements LogicalReplicationMessage {
/// Byte1('N'): Identifies the following TupleData message as a new tuple.
late final TupleData oldTuple;

DeleteMessage._(this.relationId, this.oldTupleType, this.oldTuple);

/// NOTE: do not use, will be removed.
DeleteMessage._syncParse(PgByteDataReader reader) {
DeleteMessage._parse(PgByteDataReader reader) {
relationId = reader.readUint32();
oldTupleType = DeleteMessageTuple.fromByte(reader.readUint8());

switch (oldTupleType) {
case DeleteMessageTuple.keyType:
case DeleteMessageTuple.oldType:
oldTuple = TupleData._syncParse(reader, relationId);
break;
case DeleteMessageTuple.unknown:
throw Exception('Unknown tuple type for DeleteMessage');
}
}

static Future<DeleteMessage> _parse(PgByteDataReader reader) async {
final relationId = reader.readUint32();
final oldTupleType = DeleteMessageTuple.fromByte(reader.readUint8());
TupleData? oldTuple;
switch (oldTupleType) {
case DeleteMessageTuple.keyType:
case DeleteMessageTuple.oldType:
oldTuple = await TupleData._parse(reader, relationId);
oldTuple = TupleData._parse(reader, relationId);
break;
case DeleteMessageTuple.unknown:
throw Exception('Unknown tuple type for DeleteMessage');
}
return DeleteMessage._(relationId, oldTupleType, oldTuple);
}

@override
44 changes: 3 additions & 41 deletions lib/src/messages/server_messages.dart
Original file line number Diff line number Diff line change
@@ -368,8 +368,9 @@ class XLogDataMessage implements ReplicationMessage, ServerMessage {
/// will return a [XLogDataLogicalMessage] with that message. Otherwise, it'll
/// return [XLogDataMessage] with raw data.
///
@Deprecated('This method will be removed from public API. '
'Please file a new issue on GitHub if you are using it.')
@Deprecated(
'It is likely that this method signature will change or will be removed in '
'an upcoming release. Please file a new issue on GitHub if you are using it.')
static XLogDataMessage parse(
Uint8List bytes,
Encoding encoding, {
@@ -408,45 +409,6 @@ class XLogDataMessage implements ReplicationMessage, ServerMessage {
'XLogDataMessage(walStart: $walStart, walEnd: $walEnd, time: $time, data: $data)';
}

/// Parses the XLogDataMessage
///
/// If [XLogDataMessage.data] is a [LogicalReplicationMessage], then the method
/// will return a [XLogDataLogicalMessage] with that message. Otherwise, it'll
/// return [XLogDataMessage] with raw data.
@internal
Future<XLogDataMessage> parseXLogDataMessage(
Uint8List bytes,
Encoding encoding, {
CodecContext? codecContext,
}) async {
final reader = PgByteDataReader(
codecContext:
codecContext ?? CodecContext.withDefaults(encoding: encoding))
..add(bytes);
final walStart = LSN(reader.readUint64());
final walEnd = LSN(reader.readUint64());
final time = dateTimeFromMicrosecondsSinceY2k(reader.readUint64());

final message = await tryAsyncParseLogicalReplicationMessage(
reader, reader.remainingLength);
if (message != null) {
return XLogDataLogicalMessage(
message: message,
bytes: bytes,
time: time,
walEnd: walEnd,
walStart: walStart,
);
} else {
return XLogDataMessage(
bytes: bytes,
time: time,
walEnd: walEnd,
walStart: walStart,
);
}
}

class UnknownMessage extends ServerMessage {
final int code;
final Uint8List bytes;
7 changes: 3 additions & 4 deletions lib/src/types/codec.dart
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import 'dart:async';
import 'dart:convert';
import 'dart:typed_data';

@@ -61,21 +60,21 @@ enum EncodingFormat {
/// Encodes the [input] value and returns an [EncodedValue] object.
///
/// May return `null` if the encoder is not able to convert the [input] value.
typedef EncoderFn = FutureOr<EncodedValue?> Function(
typedef EncoderFn = EncodedValue? Function(
TypedValue input, CodecContext context);

/// Encoder and decoder for a value stored in Postgresql.
abstract class Codec {
/// Encodes the [input] value and returns an [EncodedValue] object.
///
/// May return `null` if the codec is not able to encode the [input].
FutureOr<EncodedValue?> encode(TypedValue input, CodecContext context);
EncodedValue? encode(TypedValue input, CodecContext context);

/// Decodes the [input] value and returns a Dart value object.
///
/// May return [UndecodedBytes] or the same [input] instance if the codec
/// is not able to decode the [input].
FutureOr<Object?> decode(EncodedValue input, CodecContext context);
Object? decode(EncodedValue input, CodecContext context);
}

/// Provides access to connection and database information, and also to additional codecs.
10 changes: 5 additions & 5 deletions lib/src/types/type_registry.dart
Original file line number Diff line number Diff line change
@@ -267,28 +267,28 @@ class TypeRegistry {
]);
}

Future<EncodedValue?> encode(TypedValue input, CodecContext context) async {
EncodedValue? encode(TypedValue input, CodecContext context) {
// check for codec
final typeOid = input.type.oid;
final codec = typeOid == null ? null : _codecs[typeOid];
if (codec != null) {
final r = await codec.encode(input, context);
final r = codec.encode(input, context);
if (r != null) {
return r;
}
}

// fallback encoders
for (final encoder in _encoders) {
final encoded = await encoder(input, context);
final encoded = encoder(input, context);
if (encoded != null) {
return encoded;
}
}
throw PgException("Could not infer type of value '${input.value}'.");
}

Future<Object?> decode(EncodedValue value, CodecContext context) async {
Object? decode(EncodedValue value, CodecContext context) {
final typeOid = value.typeOid;
if (typeOid == null) {
throw ArgumentError('`EncodedValue.typeOid` was not provided.');
@@ -297,7 +297,7 @@ class TypeRegistry {
// check for codec
final codec = _codecs[typeOid];
if (codec != null) {
final r = await codec.decode(value, context);
final r = codec.decode(value, context);
if (r != value && r is! UndecodedBytes) {
return r;
}
11 changes: 5 additions & 6 deletions lib/src/v3/connection.dart
Original file line number Diff line number Diff line change
@@ -679,17 +679,16 @@ class _PgResultStreamSubscription
_scheduleStatement(() async {
connection._pending = this;

final encodedFutures = <Future<EncodedValue?>>[];
final encodedValues = <EncodedValue?>[];
final context = connection.codecContext;
for (final e in statement.parameters) {
if (e.isSqlNull) {
encodedFutures.add(Future.value(null));
encodedValues.add(null);
continue;
}
final f = context.typeRegistry.encode(e, context);
encodedFutures.add(f);
encodedValues.add(f);
}
final encodedValues = await Future.wait(encodedFutures);

connection._channel.sink.add(AggregatedClientMessage([
BindMessage(
@@ -806,15 +805,15 @@ class _PgResultStreamSubscription
sqlNulls ??= List<bool>.filled(columnCount, false);
sqlNulls[i] = true;
}
final futureValue = context.typeRegistry.decode(
final futureOr = context.typeRegistry.decode(
EncodedValue(
input,
format: EncodingFormat.fromBinaryFlag(field.isBinaryEncoding),
typeOid: field.typeOid,
),
context,
);
futures.add(futureValue);
futures.add(futureOr is Future ? futureOr : Future.value(futureOr));
}
final values = await Future.wait(futures);

4 changes: 2 additions & 2 deletions lib/src/v3/protocol.dart
Original file line number Diff line number Diff line change
@@ -69,8 +69,8 @@ StreamTransformer<Uint8List, ServerMessage> _readMessages(
}
}

Future<void> handleChunk(Uint8List bytes) async {
await framer.addBytes(bytes);
void handleChunk(Uint8List bytes) {
framer.addBytes(bytes);
emitFinishedMessages();
}

95 changes: 45 additions & 50 deletions test/framer_test.dart
Original file line number Diff line number Diff line change
@@ -14,12 +14,12 @@ void main() {
framer = MessageFramer(CodecContext.withDefaults());
});

tearDown(() async {
await flush(framer);
tearDown(() {
flush(framer);
});

test('Perfectly sized message in one buffer', () async {
await framer.addBytes(bufferWithMessages([
test('Perfectly sized message in one buffer', () {
framer.addBytes(bufferWithMessages([
messageWithBytes([1, 2, 3], 1)
]));

@@ -29,8 +29,8 @@ void main() {
]);
});

test('Two perfectly sized messages in one buffer', () async {
await framer.addBytes(bufferWithMessages([
test('Two perfectly sized messages in one buffer', () {
framer.addBytes(bufferWithMessages([
messageWithBytes([1, 2, 3], 1),
messageWithBytes([1, 2, 3, 4], 2)
]));
@@ -42,50 +42,49 @@ void main() {
]);
});

test('Header fragment', () async {
test('Header fragment', () {
final message = messageWithBytes([1, 2, 3], 1);
final fragments = fragmentedMessageBuffer(message, 2);
await framer.addBytes(fragments.first);
framer.addBytes(fragments.first);
expect(framer.messageQueue, isEmpty);

await framer.addBytes(fragments.last);
framer.addBytes(fragments.last);

final messages = framer.messageQueue.toList();
expect(messages, [
UnknownMessage(1, Uint8List.fromList([1, 2, 3]))
]);
});

test('Two header fragments', () async {
test('Two header fragments', () {
final message = messageWithBytes([1, 2, 3], 1);
final fragments = fragmentedMessageBuffer(message, 2);
final moreFragments = fragmentedMessageBuffer(fragments.first, 1);

await framer.addBytes(moreFragments.first);
framer.addBytes(moreFragments.first);
expect(framer.messageQueue, isEmpty);

await framer.addBytes(moreFragments.last);
framer.addBytes(moreFragments.last);
expect(framer.messageQueue, isEmpty);

await framer.addBytes(fragments.last);
framer.addBytes(fragments.last);

final messages = framer.messageQueue.toList();
expect(messages, [
UnknownMessage(1, Uint8List.fromList([1, 2, 3])),
]);
});

test('One message + header fragment', () async {
test('One message + header fragment', () {
final message1 = messageWithBytes([1, 2, 3], 1);
final message2 = messageWithBytes([2, 2, 3], 2);
final message2Fragments = fragmentedMessageBuffer(message2, 3);

await framer
.addBytes(bufferWithMessages([message1, message2Fragments.first]));
framer.addBytes(bufferWithMessages([message1, message2Fragments.first]));

expect(framer.messageQueue.length, 1);

await framer.addBytes(message2Fragments.last);
framer.addBytes(message2Fragments.last);

final messages = framer.messageQueue.toList();
expect(messages, [
@@ -94,17 +93,16 @@ void main() {
]);
});

test('Message + header, missing rest of buffer', () async {
test('Message + header, missing rest of buffer', () {
final message1 = messageWithBytes([1, 2, 3], 1);
final message2 = messageWithBytes([2, 2, 3], 2);
final message2Fragments = fragmentedMessageBuffer(message2, 5);

await framer
.addBytes(bufferWithMessages([message1, message2Fragments.first]));
framer.addBytes(bufferWithMessages([message1, message2Fragments.first]));

expect(framer.messageQueue.length, 1);

await framer.addBytes(message2Fragments.last);
framer.addBytes(message2Fragments.last);

final messages = framer.messageQueue.toList();
expect(messages, [
@@ -113,13 +111,13 @@ void main() {
]);
});

test('Message body spans two packets', () async {
test('Message body spans two packets', () {
final message = messageWithBytes([1, 2, 3, 4, 5, 6, 7], 1);
final fragments = fragmentedMessageBuffer(message, 8);
await framer.addBytes(fragments.first);
framer.addBytes(fragments.first);
expect(framer.messageQueue, isEmpty);

await framer.addBytes(fragments.last);
framer.addBytes(fragments.last);

final messages = framer.messageQueue.toList();
expect(messages, [
@@ -129,15 +127,15 @@ void main() {

test(
'Message spans two packets, started in a packet that contained another message',
() async {
() {
final earlierMessage = messageWithBytes([1, 2], 0);
final message = messageWithBytes([1, 2, 3, 4, 5, 6, 7], 1);

await framer.addBytes(bufferWithMessages(
framer.addBytes(bufferWithMessages(
[earlierMessage, fragmentedMessageBuffer(message, 8).first]));
expect(framer.messageQueue, hasLength(1));

await framer.addBytes(fragmentedMessageBuffer(message, 8).last);
framer.addBytes(fragmentedMessageBuffer(message, 8).last);

final messages = framer.messageQueue.toList();
expect(messages, [
@@ -146,22 +144,21 @@ void main() {
]);
});

test('Message spans three packets, only part of header in the first',
() async {
test('Message spans three packets, only part of header in the first', () {
final earlierMessage = messageWithBytes([1, 2], 0);
final message =
messageWithBytes([1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13], 1);

await framer.addBytes(bufferWithMessages(
framer.addBytes(bufferWithMessages(
[earlierMessage, fragmentedMessageBuffer(message, 3).first]));
expect(framer.messageQueue, hasLength(1));

await framer.addBytes(
framer.addBytes(
fragmentedMessageBuffer(fragmentedMessageBuffer(message, 3).last, 6)
.first);
expect(framer.messageQueue, hasLength(1));

await framer.addBytes(
framer.addBytes(
fragmentedMessageBuffer(fragmentedMessageBuffer(message, 3).last, 6)
.last);

@@ -173,43 +170,41 @@ void main() {
]);
});

test('Frame with no data', () async {
await framer.addBytes(bufferWithMessages([messageWithBytes([], 10)]));
test('Frame with no data', () {
framer.addBytes(bufferWithMessages([messageWithBytes([], 10)]));

final messages = framer.messageQueue.toList();
expect(messages, [UnknownMessage(10, Uint8List(0))]);
});

test('Identify CopyDoneMessage with length equals size length (min)',
() async {
test('Identify CopyDoneMessage with length equals size length (min)', () {
// min length
final length = [0, 0, 0, 4]; // min length (4 bytes) as 32-bit
final bytes = Uint8List.fromList([
SharedMessageId.copyDone,
...length,
]);
await framer.addBytes(bytes);
framer.addBytes(bytes);

final message = framer.messageQueue.toList().first;
expect(message, isA<CopyDoneMessage>());
expect((message as CopyDoneMessage).length, 4);
});

test('Identify CopyDoneMessage when length larger than size length',
() async {
test('Identify CopyDoneMessage when length larger than size length', () {
final length = (ByteData(4)..setUint32(0, 42)).buffer.asUint8List();
final bytes = Uint8List.fromList([
SharedMessageId.copyDone,
...length,
]);
await framer.addBytes(bytes);
framer.addBytes(bytes);

final message = framer.messageQueue.toList().first;
expect(message, isA<CopyDoneMessage>());
expect((message as CopyDoneMessage).length, 42);
});

test('Adds XLogDataMessage to queue', () async {
test('Adds XLogDataMessage to queue', () {
final bits64 = (ByteData(8)..setUint64(0, 42)).buffer.asUint8List();
// random data bytes
final dataBytes = [1, 2, 3, 4, 5, 6, 7, 8];
@@ -232,13 +227,13 @@ void main() {
...xlogDataMessage,
];

await framer.addBytes(Uint8List.fromList(copyDataBytes));
framer.addBytes(Uint8List.fromList(copyDataBytes));
final message = framer.messageQueue.toList().first;
expect(message, isA<XLogDataMessage>());
expect(message, isNot(isA<XLogDataLogicalMessage>()));
});

test('Adds XLogDataLogicalMessage with JsonMessage to queue', () async {
test('Adds XLogDataLogicalMessage with JsonMessage to queue', () {
final bits64 = (ByteData(8)..setUint64(0, 42)).buffer.asUint8List();

/// represent an empty json object so we should get a XLogDataLogicalMessage
@@ -264,13 +259,13 @@ void main() {
...xlogDataMessage,
];

await framer.addBytes(Uint8List.fromList(copyDataMessage));
framer.addBytes(Uint8List.fromList(copyDataMessage));
final message = framer.messageQueue.toList().first;
expect(message, isA<XLogDataLogicalMessage>());
expect((message as XLogDataLogicalMessage).message, isA<JsonMessage>());
});

test('Adds PrimaryKeepAliveMessage to queue', () async {
test('Adds PrimaryKeepAliveMessage to queue', () {
final bits64 = (ByteData(8)..setUint64(0, 42)).buffer.asUint8List();

/// This represent a raw [PrimaryKeepAliveMessage]
@@ -290,12 +285,12 @@ void main() {
...xlogDataMessage,
];

await framer.addBytes(Uint8List.fromList(copyDataMessage));
framer.addBytes(Uint8List.fromList(copyDataMessage));
final message = framer.messageQueue.toList().first;
expect(message, isA<PrimaryKeepAliveMessage>());
});

test('Adds raw CopyDataMessage for unknown stream message', () async {
test('Adds raw CopyDataMessage for unknown stream message', () {
final xlogDataBytes = <int>[
-1, // unknown id
];
@@ -310,7 +305,7 @@ void main() {
...xlogDataBytes,
];

await framer.addBytes(Uint8List.fromList(copyDataMessage));
framer.addBytes(Uint8List.fromList(copyDataMessage));
final message = framer.messageQueue.toList().first;
expect(message, isA<CopyDataMessage>());
});
@@ -336,9 +331,9 @@ Uint8List bufferWithMessages(List<List<int>> messages) {
return Uint8List.fromList(messages.expand((l) => l).toList());
}

Future<void> flush(MessageFramer framer) async {
void flush(MessageFramer framer) {
framer.messageQueue.clear();
await framer.addBytes(bufferWithMessages([
framer.addBytes(bufferWithMessages([
messageWithBytes([1, 2, 3], 1)
]));

0 comments on commit cc2d3e7

Please sign in to comment.