Skip to content

Andromeda.Framing

NamelessK1NG edited this page Mar 1, 2021 · 1 revision

The Andromeda.Framing library provide read and write mechanism around pipelines to handle any kind of length prefixed protocol.

Both mechanism works around the Frame and Frame<TMetadata> readonly structs. Here is a less verbose version of the frames (without docs) :

public readonly struct Frame
{
    public static readonly Frame Empty = new(ReadOnlySequence<byte>.Empty, default!);

    public Frame(ReadOnlyMemory<byte> payload, IFrameMetadata metadata) =>
        (Payload, Metadata) = (new ReadOnlySequence<byte>(payload), metadata);

    public Frame(ReadOnlySequence<byte> payload, IFrameMetadata metadata) =>
        (Payload, Metadata) = (payload, metadata);

    public ReadOnlySequence<byte> Payload { get; }
    public IFrameMetadata Metadata { get; }

    public bool IsPayloadEmpty() =>  Metadata.Length == 0 && Payload.IsEmpty;
    public bool IsEmptyFrame() => Metadata == default! && Payload.IsEmpty;
}

public readonly struct Frame<TMetadata> where TMetadata : class, IFrameMetadata
{
    public static readonly Frame<TMetadata> Empty = new(ReadOnlySequence<byte>.Empty, default!);

    public Frame(ReadOnlyMemory<byte> payload, TMetadata metadata) =>
        (Payload, Metadata) = (new ReadOnlySequence<byte>(payload), metadata);

    public Frame(ReadOnlySequence<byte> payload, TMetadata metadata) =>
        (Payload, Metadata) = (payload, metadata);

    public ReadOnlySequence<byte> Payload { get; }
    public TMetadata Metadata { get; }

    public bool IsPayloadEmpty() => Metadata.Length == 0 && Payload.IsEmpty;
    public bool IsEmptyFrame() => Metadata == default! && Payload.IsEmpty;
}

How it works

The library provides abstractions that must be implemented for a any kind of protocol such as IFrameMetadata, IMetadataEncoder, IMetadataDecoder, and an IMetadataParser which inherit from both two previous interfaces.

Here is the MetadataParser<TMetadata> base abstraction to implement :

public abstract class MetadataParser<TMetadata> : IMetadataParser where TMetadata : class, IFrameMetadata
{
    public bool TryParse(ref SequenceReader<byte> input, out IFrameMetadata? metadata)
    {
        if (!TryParse(ref input, out TMetadata? meta))
        {
             metadata = default;
            return false;
        }

        metadata = meta;
        return true;
    }


    public void Write(ref Span<byte> span, IFrameMetadata metadata) => Write(ref span, (TMetadata)metadata);
    public int GetLength(IFrameMetadata metadata) => GetLength((TMetadata)metadata);
    public int GetMetadataLength(IFrameMetadata metadata) => GetLength(metadata);
        
    protected abstract bool TryParse(ref SequenceReader<byte> input, out TMetadata? metadata);
    protected abstract void Write(ref Span<byte> span, TMetadata metadata);
    protected abstract int GetLength(TMetadata metadata);
}

Once you've a protocol-specific implementation of an IMetadataParser you can use the main mechanism provided by the IFrameEncoder and the IFrameDecoder interfaces.

The first mechanism is implemented by the PipeFrameEncoder class which can be thread synchronizeded (or not) to write frames in a PipeWriter or Stream. A typed implementation also exists to handle typed Frame<TMetadata>.

The second one is implemented by the PipeFrameDecoder class which provides methods to read single frames or consume them via an IAsyncEnumerable<Frame>. No thread synchronization is provided since read are mostly done with loops. A typed implementation also exists to handle typed Frame<TMetadata>.

Here is a pseudo-code sample use using these APIs with untyped decoder/encoder :

public class SomeProtocolHandler : ConnectionHandler
{
    public SomeProtocolHandler(IMetadataParser parser) => _someProtocolParser = parser;
    private readonly IMetadataParser _someProtocolParser;

    public async Task OnConnectedAsync(ConnectionContext connection)
    {
        await using var encoder = connection.Transport.Output.AsFrameEncoder(_someProtocolParser);
        await using var decoder = connection.Transport.Input.AsFrameDecoder(_someProtocolParser);
        
        try
        {
            await foreach(var frame in decoder.ReadFramesAsync(connection.ConnectionClosed))
            {
                var metadata = frame.Metadata as MyProtocolHeader ?? throw new InvalidOperationException("Invalid frame metadata !");
                var response = metadata.MessageId switch {
                    1 => encoder.WriteAsync(in someResponseFrame),
                    2 => encoder.WriteAsync(in anotherResponseFrame),
                    _ => throw new InvalidOperationException($"Message with Id={metadata.MessageId} is not handled !");
                }

                if(response.IsCompletedSuccessfully) continue;
                await response.ConfigureAwait(false);
            }
        }
        catch (ObjectDisposedException) { /* if the decoder throw this it means the connection closed, don't let this out */ }
    }
}
Clone this wiki locally