Skip to content

Commit

Permalink
JetStream basic usage (#96)
Browse files Browse the repository at this point in the history
Basic use case APIs, publish and consume methods implemented.
Consider this as the 'low-level' JetStream API where we can build
our more familiar NATS Client Simplified JetStream API.
  • Loading branch information
mtmk authored Jul 20, 2023
1 parent b0b5b12 commit eea71e6
Show file tree
Hide file tree
Showing 17 changed files with 432 additions and 78 deletions.
42 changes: 42 additions & 0 deletions src/NATS.Client.JetStream/Internal/JSErrorAwareJsonSerializer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
using System.Buffers;
using System.Text.Json;
using NATS.Client.Core;
using NATS.Client.JetStream.Models;

namespace NATS.Client.JetStream.Internal;

internal sealed class JSErrorAwareJsonSerializer : INatsSerializer
{
public static readonly JSErrorAwareJsonSerializer Default = new();

public int Serialize<T>(ICountableBufferWriter bufferWriter, T? value) =>
throw new NotSupportedException();

public T? Deserialize<T>(in ReadOnlySequence<byte> buffer)
{
// We need to determine what type we're deserializing into
// .NET 6 new APIs to the rescue: we can read the buffer once
// by deserializing into a document, inspect and using the new
// API deserialize to the final type from the document.
var jsonDocument = JsonDocument.Parse(buffer);
if (jsonDocument.RootElement.TryGetProperty("error", out var errorElement))
{
var error = errorElement.Deserialize<ApiError>();
if (error == null)
throw new NatsJetStreamException("Can't parse JetStream error JSON payload");
throw new JSErrorException(error);
}

return jsonDocument.Deserialize<T>();
}

public object? Deserialize(in ReadOnlySequence<byte> buffer, Type type) =>
throw new NotSupportedException();
}

internal class JSErrorException : Exception
{
public JSErrorException(ApiError error) => Error = error;

public ApiError Error { get; }
}
175 changes: 153 additions & 22 deletions src/NATS.Client.JetStream/JSContext.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
using System.ComponentModel.DataAnnotations;
using System.Reflection.Emit;
using System.Runtime.CompilerServices;
using System.Text.Json;
using System.Text.Json.Serialization;
using NATS.Client.Core;
using NATS.Client.Core.Internal;
using NATS.Client.JetStream.Internal;
using NATS.Client.JetStream.Models;

namespace NATS.Client.JetStream;
Expand All @@ -15,23 +21,158 @@ public JSContext(NatsConnection nats, JSOptions options)
_options = options;
}

public async ValueTask<JSStream> CreateStream(Action<StreamCreateRequest> request)
public ValueTask<JSResponse<StreamInfo>> CreateStreamAsync(
StreamConfiguration request,
CancellationToken cancellationToken = default) =>
JSRequestAsync<StreamConfiguration, StreamInfo>(
subject: $"{_options.Prefix}.STREAM.CREATE.{request.Name}",
request,
cancellationToken);

public ValueTask<JSResponse<StreamMsgDeleteResponse>> DeleteStreamAsync(
string stream,
CancellationToken cancellationToken = default) =>
JSRequestAsync<object, StreamMsgDeleteResponse>(
subject: $"{_options.Prefix}.STREAM.DELETE.{stream}",
null,
cancellationToken);

public ValueTask<JSResponse<ConsumerInfo>> CreateConsumerAsync(
ConsumerCreateRequest request,
CancellationToken cancellationToken = default) =>
JSRequestAsync<ConsumerCreateRequest, ConsumerInfo>(
subject: $"{_options.Prefix}.CONSUMER.CREATE.{request.StreamName}.{request.Config.Name}",
request,
cancellationToken);

public async ValueTask<PubAckResponse> PublishAsync<T>(
string subject,
T? data,
NatsPubOpts opts = default,
CancellationToken cancellationToken = default)
{
await using var sub = await _nats.RequestSubAsync<T, PubAckResponse>(
subject: subject,
data: data,
requestOpts: opts,
replyOpts: default,
cancellationToken)
.ConfigureAwait(false);

if (await sub.Msgs.WaitToReadAsync(CancellationToken.None).ConfigureAwait(false))
{
if (sub.Msgs.TryRead(out var msg))
{
if (msg.Data == null)
{
throw new NatsJetStreamException("No response data received");
}

return msg.Data;
}
}

if (sub.EndReason == NatsSubEndReason.Cancelled)
{
throw new OperationCanceledException("Publishing inbox subscription was cancelled");
}

if (sub is { EndReason: NatsSubEndReason.Exception, Exception: not null })
{
throw sub.Exception;
}

throw new NatsJetStreamException("No response received");
}

public async IAsyncEnumerable<NatsMsg> ConsumeAsync(
string stream,
string consumer,
ConsumerGetnextRequest request,
NatsSubOpts requestOpts = default,
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
var inbox = $"_INBOX.{Guid.NewGuid():n}";

await using var sub = await _nats.SubAsync(
subject: inbox,
opts: requestOpts,
builder: NatsSubBuilder.Default,
cancellationToken);

await _nats.PubModelAsync(
subject: $"$JS.API.CONSUMER.MSG.NEXT.{stream}.{consumer}",
data: request,
serializer: JsonNatsSerializer.Default,
replyTo: inbox,
headers: default,
cancellationToken);

await foreach (var msg in sub.Msgs.ReadAllAsync(CancellationToken.None))
{
yield return msg;
}

if (sub.EndReason == NatsSubEndReason.Cancelled)
{
throw new OperationCanceledException("Consumer's inbox subscription was cancelled");
}

if (sub is { EndReason: NatsSubEndReason.Exception, Exception: not null })
{
throw sub.Exception;
}
}

internal async ValueTask<JSResponse<TResponse>> JSRequestAsync<TRequest, TResponse>(
string subject,
TRequest? request,
CancellationToken cancellationToken = default)
where TRequest : class
where TResponse : class
{
var requestObj = new StreamCreateRequest();
request(requestObj);
if (request != null)
{
Validator.ValidateObject(request, new ValidationContext(request));
}

await using var sub = await _nats.RequestSubAsync<TRequest, TResponse>(
subject: subject,
data: request,
requestOpts: default,
replyOpts: new NatsSubOpts { Serializer = JSErrorAwareJsonSerializer.Default },
cancellationToken)
.ConfigureAwait(false);

if (await sub.Msgs.WaitToReadAsync(CancellationToken.None).ConfigureAwait(false))
{
if (sub.Msgs.TryRead(out var msg))
{
if (msg.Data == null)
{
throw new NatsJetStreamException("No response data received");
}

Validator.ValidateObject(requestObj, new ValidationContext(requestObj));
return new JSResponse<TResponse>(msg.Data, default);
}
}

var response =
await _nats.RequestAsync<StreamCreateRequest, StreamCreateResponse>(
$"{_options.Prefix}.STREAM.CREATE.{requestObj.Name}",
requestObj);
if (sub.EndReason == NatsSubEndReason.Cancelled)
{
throw new OperationCanceledException("Request's inbox subscription was cancelled");
}

// TODO: Better error handling
if (response?.Data == null)
throw new NatsJetStreamException("No response received");
if (sub is { EndReason: NatsSubEndReason.Exception, Exception: not null })
{
if (sub.Exception is NatsSubException { InnerException: JSErrorException jsError })
{
return new JSResponse<TResponse>(default, jsError.Error);
}

return new JSStream(response.Value.Data);
throw sub.Exception;
}

throw new NatsJetStreamException("No response received");
}
}

Expand All @@ -52,13 +193,3 @@ public record JSOptions
{
public string Prefix { get; init; } = "$JS.API";
}

public class JSStream
{
public JSStream(StreamCreateResponse response)
{
Response = response;
}

public StreamCreateResponse Response { get; }
}
22 changes: 22 additions & 0 deletions src/NATS.Client.JetStream/JSResponse.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
using NATS.Client.JetStream.Models;

namespace NATS.Client.JetStream;

/// <summary>
/// JetStream response including an optional error property encapsulating both successful and failed calls.
/// </summary>
/// <typeparam name="T">JetStream response type</typeparam>
public readonly struct JSResponse<T>
{
internal JSResponse(T? response, ApiError? error)
{
Response = response;
Error = error;
}

public T? Response { get; }

public ApiError? Error { get; }

public bool Success => Error == null && Response != null;
}
4 changes: 2 additions & 2 deletions src/NATS.Client.JetStream/Models/ConsumerConfiguration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public record ConsumerConfiguration
[System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.Never)]
[System.ComponentModel.DataAnnotations.Required(AllowEmptyStrings = true)]
[System.Text.Json.Serialization.JsonConverter(typeof(System.Text.Json.Serialization.JsonStringEnumConverter))]
public ConsumerConfigurationAckPolicy AckPolicy { get; set; } = NATS.Client.JetStream.Models.ConsumerConfigurationAckPolicy.None;
public ConsumerConfigurationAckPolicy AckPolicy { get; set; } = ConsumerConfigurationAckPolicy.none;

/// <summary>
/// How long (in nanoseconds) to allow messages to remain un-acknowledged before attempting redelivery
Expand Down Expand Up @@ -88,7 +88,7 @@ public record ConsumerConfiguration
[System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.Never)]
[System.ComponentModel.DataAnnotations.Required(AllowEmptyStrings = true)]
[System.Text.Json.Serialization.JsonConverter(typeof(System.Text.Json.Serialization.JsonStringEnumConverter))]
public ConsumerConfigurationReplayPolicy ReplayPolicy { get; set; } = NATS.Client.JetStream.Models.ConsumerConfigurationReplayPolicy.Instant;
public ConsumerConfigurationReplayPolicy ReplayPolicy { get; set; } = NATS.Client.JetStream.Models.ConsumerConfigurationReplayPolicy.instant;

[System.Text.Json.Serialization.JsonPropertyName("sample_freq")]
[System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingDefault)]
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
namespace NATS.Client.JetStream.Models;

// TODO: enum member naming with JSON serialization isn't working for some reason
#pragma warning disable SA1300
public enum ConsumerConfigurationAckPolicy
{
[System.Runtime.Serialization.EnumMember(Value = @"none")]
None = 0,
none = 0,

[System.Runtime.Serialization.EnumMember(Value = @"all")]
All = 1,
all = 1,

[System.Runtime.Serialization.EnumMember(Value = @"explicit")]
Explicit = 2,
@explicit = 2,
}
Original file line number Diff line number Diff line change
@@ -1,22 +1,24 @@
namespace NATS.Client.JetStream.Models;

// TODO: enum member naming with JSON serialization isn't working for some reason
#pragma warning disable SA1300
public enum ConsumerConfigurationDeliverPolicy
{
[System.Runtime.Serialization.EnumMember(Value = @"all")]
All = 0,
all = 0,

[System.Runtime.Serialization.EnumMember(Value = @"last")]
Last = 1,
last = 1,

[System.Runtime.Serialization.EnumMember(Value = @"new")]
New = 2,
@new = 2,

[System.Runtime.Serialization.EnumMember(Value = @"by_start_sequence")]
ByStartSequence = 3,
by_start_sequence = 3,

[System.Runtime.Serialization.EnumMember(Value = @"by_start_time")]
ByStartTime = 4,
by_start_time = 4,

[System.Runtime.Serialization.EnumMember(Value = @"last_per_subject")]
LastPerSubject = 5,
last_per_subject = 5,
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
namespace NATS.Client.JetStream.Models;

// TODO: enum member naming with JSON serialization isn't working for some reason
#pragma warning disable SA1300
public enum ConsumerConfigurationReplayPolicy
{
[System.Runtime.Serialization.EnumMember(Value = @"instant")]
Instant = 0,
instant = 0,

[System.Runtime.Serialization.EnumMember(Value = @"original")]
Original = 1,
original = 1,
}
8 changes: 4 additions & 4 deletions src/NATS.Client.JetStream/Models/StreamConfiguration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public record StreamConfiguration
[System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.Never)]
[System.ComponentModel.DataAnnotations.Required(AllowEmptyStrings = true)]
[System.Text.Json.Serialization.JsonConverter(typeof(System.Text.Json.Serialization.JsonStringEnumConverter))]
public StreamConfigurationRetention Retention { get; set; } = NATS.Client.JetStream.Models.StreamConfigurationRetention.Limits;
public StreamConfigurationRetention Retention { get; set; } = NATS.Client.JetStream.Models.StreamConfigurationRetention.limits;

/// <summary>
/// How many Consumers can be defined for a given Stream. -1 for unlimited.
Expand Down Expand Up @@ -97,15 +97,15 @@ public record StreamConfiguration
[System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.Never)]
[System.ComponentModel.DataAnnotations.Required(AllowEmptyStrings = true)]
[System.Text.Json.Serialization.JsonConverter(typeof(System.Text.Json.Serialization.JsonStringEnumConverter))]
public StreamConfigurationStorage Storage { get; set; } = NATS.Client.JetStream.Models.StreamConfigurationStorage.File;
public StreamConfigurationStorage Storage { get; set; } = StreamConfigurationStorage.file;

/// <summary>
/// Optional compression algorithm used for the Stream.
/// </summary>
[System.Text.Json.Serialization.JsonPropertyName("compression")]
[System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingDefault)]
[System.Text.Json.Serialization.JsonConverter(typeof(System.Text.Json.Serialization.JsonStringEnumConverter))]
public StreamConfigurationCompression Compression { get; set; } = NATS.Client.JetStream.Models.StreamConfigurationCompression.None;
public StreamConfigurationCompression Compression { get; set; } = NATS.Client.JetStream.Models.StreamConfigurationCompression.none;

/// <summary>
/// How many replicas to keep for each message.
Expand Down Expand Up @@ -135,7 +135,7 @@ public record StreamConfiguration
[System.Text.Json.Serialization.JsonPropertyName("discard")]
[System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingDefault)]
[System.Text.Json.Serialization.JsonConverter(typeof(System.Text.Json.Serialization.JsonStringEnumConverter))]
public StreamConfigurationDiscard Discard { get; set; } = NATS.Client.JetStream.Models.StreamConfigurationDiscard.Old;
public StreamConfigurationDiscard Discard { get; set; } = StreamConfigurationDiscard.old;

/// <summary>
/// The time window to track duplicate messages for, expressed in nanoseconds. 0 for default
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
namespace NATS.Client.JetStream.Models;

// TODO: enum member naming with JSON serialization isn't working for some reason
#pragma warning disable SA1300
public enum StreamConfigurationCompression
{
[System.Runtime.Serialization.EnumMember(Value = @"none")]
None = 0,
none = 0,

[System.Runtime.Serialization.EnumMember(Value = @"s2")]
S2 = 1,
s2 = 1,
}
Loading

0 comments on commit eea71e6

Please sign in to comment.