Skip to content

Commit

Permalink
Simpler message frame implementation. (#401)
Browse files Browse the repository at this point in the history
  • Loading branch information
isoos authored Dec 21, 2024
1 parent 548cb7a commit a7fba1d
Show file tree
Hide file tree
Showing 3 changed files with 138 additions and 309 deletions.
122 changes: 70 additions & 52 deletions lib/src/message_window.dart
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import 'dart:async';
import 'dart:collection';
import 'dart:typed_data';

import 'package:buffer/buffer.dart';
Expand Down Expand Up @@ -35,76 +34,95 @@ Map<int, _ServerMessageFn> _messageTypeMap = {
$N: NoticeMessage.parse,
};

class MessageFramer {
final CodecContext _codecContext;
late final _reader = PgByteDataReader(codecContext: _codecContext);
final messageQueue = Queue<ServerMessage>();
class _BytesFrame {
final int type;
final int length;
final Uint8List bytes;

MessageFramer(this._codecContext);
_BytesFrame(this.type, this.length, this.bytes);
}

int? _type;
int _expectedLength = 0;
StreamTransformer<Uint8List, ServerMessage> bytesToMessageParser() {
return StreamTransformer<Uint8List, ServerMessage>.fromHandlers(
handleData: (data, sink) {},
);
}

bool get _hasReadHeader => _type != null;
bool get _canReadHeader => _reader.remainingLength >= _headerByteSize;
final _emptyData = Uint8List(0);

bool get _isComplete =>
_expectedLength == 0 || _expectedLength <= _reader.remainingLength;
class _BytesToFrameParser
extends StreamTransformerBase<Uint8List, _BytesFrame> {
final CodecContext _codecContext;

Future<void> addBytes(Uint8List bytes) async {
_reader.add(bytes);
_BytesToFrameParser(this._codecContext);

while (true) {
if (!_hasReadHeader && _canReadHeader) {
_type = _reader.readUint8();
_expectedLength = _reader.readUint32() - 4;
}
@override
Stream<_BytesFrame> bind(Stream<Uint8List> stream) async* {
final reader = PgByteDataReader(codecContext: _codecContext);

// special case
if (_type == SharedMessageId.copyDone) {
// unlike other messages, CopyDoneMessage only takes the length as an
// argument (must be the full length including the length bytes)
final msg = CopyDoneMessage(_expectedLength + 4);
_addMsg(msg);
continue;
}
int? type;
int expectedLength = 0;

if (_hasReadHeader && _isComplete) {
final msgMaker = _messageTypeMap[_type];
if (msgMaker == null) {
_addMsg(UnknownMessage(_type!, _reader.read(_expectedLength)));
continue;
await for (final bytes in stream) {
reader.add(bytes);

while (true) {
if (type == null && reader.remainingLength >= _headerByteSize) {
type = reader.readUint8();
expectedLength = reader.readUint32() - 4;
}

final targetRemainingLength = _reader.remainingLength - _expectedLength;
final msg = await msgMaker(_reader, _expectedLength);
if (_reader.remainingLength > targetRemainingLength) {
throw StateError(
'Message parser consumed more bytes than expected. type=$_type expectedLength=$_expectedLength');
// special case
if (type == SharedMessageId.copyDone) {
// unlike other messages, CopyDoneMessage only takes the length as an
// argument (must be the full length including the length bytes)
yield _BytesFrame(type!, expectedLength, _emptyData);
type = null;
expectedLength = 0;
continue;
}
// consume the rest of the message
if (_reader.remainingLength < targetRemainingLength) {
_reader.read(targetRemainingLength - _reader.remainingLength);

if (type != null && expectedLength <= reader.remainingLength) {
final data = reader.read(expectedLength);
yield _BytesFrame(type, expectedLength, data);
type = null;
expectedLength = 0;
continue;
}

_addMsg(msg);
continue;
break;
}

break;
}
}
}

void _addMsg(ServerMessage msg) {
messageQueue.add(msg);
_type = null;
_expectedLength = 0;
}
class BytesToMessageParser
extends StreamTransformerBase<Uint8List, ServerMessage> {
final CodecContext _codecContext;

BytesToMessageParser(this._codecContext);

bool get hasMessage => messageQueue.isNotEmpty;
@override
Stream<ServerMessage> bind(Stream<Uint8List> stream) {
return stream
.transform(_BytesToFrameParser(_codecContext))
.asyncMap((frame) async {
// special case
if (frame.type == SharedMessageId.copyDone) {
// unlike other messages, CopyDoneMessage only takes the length as an
// argument (must be the full length including the length bytes)
return CopyDoneMessage(frame.length + 4);
}

final msgMaker = _messageTypeMap[frame.type];
if (msgMaker == null) {
return UnknownMessage(frame.type, frame.bytes);
}

ServerMessage popMessage() {
return messageQueue.removeFirst();
return await msgMaker(
PgByteDataReader(codecContext: _codecContext)..add(frame.bytes),
frame.bytes.length);
});
}
}

Expand Down
62 changes: 1 addition & 61 deletions lib/src/v3/protocol.dart
Original file line number Diff line number Diff line change
@@ -1,14 +1,10 @@
import 'dart:async';
import 'dart:typed_data';

import 'package:async/async.dart';
import 'package:postgres/src/types/codec.dart';
import 'package:stream_channel/stream_channel.dart';

import '../buffer.dart';
import '../message_window.dart';
import '../messages/client_messages.dart';
import '../messages/server_messages.dart';
import '../messages/shared_messages.dart';

export '../messages/client_messages.dart';
Expand Down Expand Up @@ -36,7 +32,7 @@ class AggregatedClientMessage extends ClientMessage {
StreamChannelTransformer<Message, List<int>> messageTransformer(
CodecContext codecContext) {
return StreamChannelTransformer(
_readMessages(codecContext),
BytesToMessageParser(codecContext),
StreamSinkTransformer.fromHandlers(
handleData: (message, out) {
if (message is! ClientMessage) {
Expand All @@ -52,59 +48,3 @@ StreamChannelTransformer<Message, List<int>> messageTransformer(
),
);
}

StreamTransformer<Uint8List, ServerMessage> _readMessages(
CodecContext codecContext) {
return StreamTransformer.fromBind((rawStream) {
return Stream.multi((listener) {
final framer = MessageFramer(codecContext);

var paused = false;

void emitFinishedMessages() {
while (framer.hasMessage) {
listener.addSync(framer.popMessage());

if (paused) break;
}
}

Future<void> handleChunk() async {
try {
// await framer.addBytes(bytes);
emitFinishedMessages();
} catch (e, st) {
listener.addErrorSync(e, st);
}
}

// Don't cancel this subscription on error! If the listener wants that,
// they'll unsubscribe in time after we forward it synchronously.
final rawSubscription = rawStream
// TODO: figure out a better way to handle multiple callbacks to framer
.asyncMap(framer.addBytes)
.listen((_) => handleChunk(), cancelOnError: false)
..onError(listener.addErrorSync)
..onDone(listener.closeSync);

listener.onPause = () {
paused = true;
rawSubscription.pause();
};

listener.onResume = () {
paused = false;
emitFinishedMessages();

if (!paused) {
rawSubscription.resume();
}
};

listener.onCancel = () {
paused = true;
rawSubscription.cancel();
};
});
});
}
Loading

0 comments on commit a7fba1d

Please sign in to comment.