From eea71e65282591ed5dfee65649be10e1ad767baf Mon Sep 17 00:00:00 2001 From: Ziya Suzen Date: Fri, 21 Jul 2023 00:23:10 +0100 Subject: [PATCH] JetStream basic usage (#96) Basic use case APIs, publish and consume methods implemented. Consider this as the 'low-level' JetStream API where we can build our more familiar NATS Client Simplified JetStream API. --- .../Internal/JSErrorAwareJsonSerializer.cs | 42 +++++ src/NATS.Client.JetStream/JSContext.cs | 175 +++++++++++++++--- src/NATS.Client.JetStream/JSResponse.cs | 22 +++ .../Models/ConsumerConfiguration.cs | 4 +- .../Models/ConsumerConfigurationAckPolicy.cs | 8 +- .../ConsumerConfigurationDeliverPolicy.cs | 14 +- .../ConsumerConfigurationReplayPolicy.cs | 6 +- .../Models/StreamConfiguration.cs | 8 +- .../Models/StreamConfigurationCompression.cs | 6 +- .../Models/StreamConfigurationDiscard.cs | 6 +- .../Models/StreamConfigurationRetention.cs | 8 +- .../Models/StreamConfigurationStorage.cs | 6 +- .../Models/StreamCreateResponse.cs | 5 +- .../JetStreamTest.cs | 165 ++++++++++++++--- .../NATS.Client.TestUtilities.csproj | 8 +- tests/NATS.Client.TestUtilities/NatsServer.cs | 21 +++ .../NatsServerOptions.cs | 6 +- 17 files changed, 432 insertions(+), 78 deletions(-) create mode 100644 src/NATS.Client.JetStream/Internal/JSErrorAwareJsonSerializer.cs create mode 100644 src/NATS.Client.JetStream/JSResponse.cs diff --git a/src/NATS.Client.JetStream/Internal/JSErrorAwareJsonSerializer.cs b/src/NATS.Client.JetStream/Internal/JSErrorAwareJsonSerializer.cs new file mode 100644 index 000000000..8fc5e42da --- /dev/null +++ b/src/NATS.Client.JetStream/Internal/JSErrorAwareJsonSerializer.cs @@ -0,0 +1,42 @@ +using System.Buffers; +using System.Text.Json; +using NATS.Client.Core; +using NATS.Client.JetStream.Models; + +namespace NATS.Client.JetStream.Internal; + +internal sealed class JSErrorAwareJsonSerializer : INatsSerializer +{ + public static readonly JSErrorAwareJsonSerializer Default = new(); + + public int Serialize(ICountableBufferWriter bufferWriter, T? value) => + throw new NotSupportedException(); + + public T? Deserialize(in ReadOnlySequence buffer) + { + // We need to determine what type we're deserializing into + // .NET 6 new APIs to the rescue: we can read the buffer once + // by deserializing into a document, inspect and using the new + // API deserialize to the final type from the document. + var jsonDocument = JsonDocument.Parse(buffer); + if (jsonDocument.RootElement.TryGetProperty("error", out var errorElement)) + { + var error = errorElement.Deserialize(); + if (error == null) + throw new NatsJetStreamException("Can't parse JetStream error JSON payload"); + throw new JSErrorException(error); + } + + return jsonDocument.Deserialize(); + } + + public object? Deserialize(in ReadOnlySequence buffer, Type type) => + throw new NotSupportedException(); +} + +internal class JSErrorException : Exception +{ + public JSErrorException(ApiError error) => Error = error; + + public ApiError Error { get; } +} diff --git a/src/NATS.Client.JetStream/JSContext.cs b/src/NATS.Client.JetStream/JSContext.cs index e35e9ed33..7bf2b05d4 100644 --- a/src/NATS.Client.JetStream/JSContext.cs +++ b/src/NATS.Client.JetStream/JSContext.cs @@ -1,5 +1,11 @@ using System.ComponentModel.DataAnnotations; +using System.Reflection.Emit; +using System.Runtime.CompilerServices; +using System.Text.Json; +using System.Text.Json.Serialization; using NATS.Client.Core; +using NATS.Client.Core.Internal; +using NATS.Client.JetStream.Internal; using NATS.Client.JetStream.Models; namespace NATS.Client.JetStream; @@ -15,23 +21,158 @@ public JSContext(NatsConnection nats, JSOptions options) _options = options; } - public async ValueTask CreateStream(Action request) + public ValueTask> CreateStreamAsync( + StreamConfiguration request, + CancellationToken cancellationToken = default) => + JSRequestAsync( + subject: $"{_options.Prefix}.STREAM.CREATE.{request.Name}", + request, + cancellationToken); + + public ValueTask> DeleteStreamAsync( + string stream, + CancellationToken cancellationToken = default) => + JSRequestAsync( + subject: $"{_options.Prefix}.STREAM.DELETE.{stream}", + null, + cancellationToken); + + public ValueTask> CreateConsumerAsync( + ConsumerCreateRequest request, + CancellationToken cancellationToken = default) => + JSRequestAsync( + subject: $"{_options.Prefix}.CONSUMER.CREATE.{request.StreamName}.{request.Config.Name}", + request, + cancellationToken); + + public async ValueTask PublishAsync( + string subject, + T? data, + NatsPubOpts opts = default, + CancellationToken cancellationToken = default) + { + await using var sub = await _nats.RequestSubAsync( + subject: subject, + data: data, + requestOpts: opts, + replyOpts: default, + cancellationToken) + .ConfigureAwait(false); + + if (await sub.Msgs.WaitToReadAsync(CancellationToken.None).ConfigureAwait(false)) + { + if (sub.Msgs.TryRead(out var msg)) + { + if (msg.Data == null) + { + throw new NatsJetStreamException("No response data received"); + } + + return msg.Data; + } + } + + if (sub.EndReason == NatsSubEndReason.Cancelled) + { + throw new OperationCanceledException("Publishing inbox subscription was cancelled"); + } + + if (sub is { EndReason: NatsSubEndReason.Exception, Exception: not null }) + { + throw sub.Exception; + } + + throw new NatsJetStreamException("No response received"); + } + + public async IAsyncEnumerable ConsumeAsync( + string stream, + string consumer, + ConsumerGetnextRequest request, + NatsSubOpts requestOpts = default, + [EnumeratorCancellation] CancellationToken cancellationToken = default) + { + var inbox = $"_INBOX.{Guid.NewGuid():n}"; + + await using var sub = await _nats.SubAsync( + subject: inbox, + opts: requestOpts, + builder: NatsSubBuilder.Default, + cancellationToken); + + await _nats.PubModelAsync( + subject: $"$JS.API.CONSUMER.MSG.NEXT.{stream}.{consumer}", + data: request, + serializer: JsonNatsSerializer.Default, + replyTo: inbox, + headers: default, + cancellationToken); + + await foreach (var msg in sub.Msgs.ReadAllAsync(CancellationToken.None)) + { + yield return msg; + } + + if (sub.EndReason == NatsSubEndReason.Cancelled) + { + throw new OperationCanceledException("Consumer's inbox subscription was cancelled"); + } + + if (sub is { EndReason: NatsSubEndReason.Exception, Exception: not null }) + { + throw sub.Exception; + } + } + + internal async ValueTask> JSRequestAsync( + string subject, + TRequest? request, + CancellationToken cancellationToken = default) + where TRequest : class + where TResponse : class { - var requestObj = new StreamCreateRequest(); - request(requestObj); + if (request != null) + { + Validator.ValidateObject(request, new ValidationContext(request)); + } + + await using var sub = await _nats.RequestSubAsync( + subject: subject, + data: request, + requestOpts: default, + replyOpts: new NatsSubOpts { Serializer = JSErrorAwareJsonSerializer.Default }, + cancellationToken) + .ConfigureAwait(false); + + if (await sub.Msgs.WaitToReadAsync(CancellationToken.None).ConfigureAwait(false)) + { + if (sub.Msgs.TryRead(out var msg)) + { + if (msg.Data == null) + { + throw new NatsJetStreamException("No response data received"); + } - Validator.ValidateObject(requestObj, new ValidationContext(requestObj)); + return new JSResponse(msg.Data, default); + } + } - var response = - await _nats.RequestAsync( - $"{_options.Prefix}.STREAM.CREATE.{requestObj.Name}", - requestObj); + if (sub.EndReason == NatsSubEndReason.Cancelled) + { + throw new OperationCanceledException("Request's inbox subscription was cancelled"); + } - // TODO: Better error handling - if (response?.Data == null) - throw new NatsJetStreamException("No response received"); + if (sub is { EndReason: NatsSubEndReason.Exception, Exception: not null }) + { + if (sub.Exception is NatsSubException { InnerException: JSErrorException jsError }) + { + return new JSResponse(default, jsError.Error); + } - return new JSStream(response.Value.Data); + throw sub.Exception; + } + + throw new NatsJetStreamException("No response received"); } } @@ -52,13 +193,3 @@ public record JSOptions { public string Prefix { get; init; } = "$JS.API"; } - -public class JSStream -{ - public JSStream(StreamCreateResponse response) - { - Response = response; - } - - public StreamCreateResponse Response { get; } -} diff --git a/src/NATS.Client.JetStream/JSResponse.cs b/src/NATS.Client.JetStream/JSResponse.cs new file mode 100644 index 000000000..fbd6b143d --- /dev/null +++ b/src/NATS.Client.JetStream/JSResponse.cs @@ -0,0 +1,22 @@ +using NATS.Client.JetStream.Models; + +namespace NATS.Client.JetStream; + +/// +/// JetStream response including an optional error property encapsulating both successful and failed calls. +/// +/// JetStream response type +public readonly struct JSResponse +{ + internal JSResponse(T? response, ApiError? error) + { + Response = response; + Error = error; + } + + public T? Response { get; } + + public ApiError? Error { get; } + + public bool Success => Error == null && Response != null; +} diff --git a/src/NATS.Client.JetStream/Models/ConsumerConfiguration.cs b/src/NATS.Client.JetStream/Models/ConsumerConfiguration.cs index b7507b1f2..609d71044 100644 --- a/src/NATS.Client.JetStream/Models/ConsumerConfiguration.cs +++ b/src/NATS.Client.JetStream/Models/ConsumerConfiguration.cs @@ -52,7 +52,7 @@ public record ConsumerConfiguration [System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.Never)] [System.ComponentModel.DataAnnotations.Required(AllowEmptyStrings = true)] [System.Text.Json.Serialization.JsonConverter(typeof(System.Text.Json.Serialization.JsonStringEnumConverter))] - public ConsumerConfigurationAckPolicy AckPolicy { get; set; } = NATS.Client.JetStream.Models.ConsumerConfigurationAckPolicy.None; + public ConsumerConfigurationAckPolicy AckPolicy { get; set; } = ConsumerConfigurationAckPolicy.none; /// /// How long (in nanoseconds) to allow messages to remain un-acknowledged before attempting redelivery @@ -88,7 +88,7 @@ public record ConsumerConfiguration [System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.Never)] [System.ComponentModel.DataAnnotations.Required(AllowEmptyStrings = true)] [System.Text.Json.Serialization.JsonConverter(typeof(System.Text.Json.Serialization.JsonStringEnumConverter))] - public ConsumerConfigurationReplayPolicy ReplayPolicy { get; set; } = NATS.Client.JetStream.Models.ConsumerConfigurationReplayPolicy.Instant; + public ConsumerConfigurationReplayPolicy ReplayPolicy { get; set; } = NATS.Client.JetStream.Models.ConsumerConfigurationReplayPolicy.instant; [System.Text.Json.Serialization.JsonPropertyName("sample_freq")] [System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingDefault)] diff --git a/src/NATS.Client.JetStream/Models/ConsumerConfigurationAckPolicy.cs b/src/NATS.Client.JetStream/Models/ConsumerConfigurationAckPolicy.cs index db26693cf..72c1a0a7d 100644 --- a/src/NATS.Client.JetStream/Models/ConsumerConfigurationAckPolicy.cs +++ b/src/NATS.Client.JetStream/Models/ConsumerConfigurationAckPolicy.cs @@ -1,13 +1,15 @@ namespace NATS.Client.JetStream.Models; +// TODO: enum member naming with JSON serialization isn't working for some reason +#pragma warning disable SA1300 public enum ConsumerConfigurationAckPolicy { [System.Runtime.Serialization.EnumMember(Value = @"none")] - None = 0, + none = 0, [System.Runtime.Serialization.EnumMember(Value = @"all")] - All = 1, + all = 1, [System.Runtime.Serialization.EnumMember(Value = @"explicit")] - Explicit = 2, + @explicit = 2, } diff --git a/src/NATS.Client.JetStream/Models/ConsumerConfigurationDeliverPolicy.cs b/src/NATS.Client.JetStream/Models/ConsumerConfigurationDeliverPolicy.cs index d1f807fab..ac2206682 100644 --- a/src/NATS.Client.JetStream/Models/ConsumerConfigurationDeliverPolicy.cs +++ b/src/NATS.Client.JetStream/Models/ConsumerConfigurationDeliverPolicy.cs @@ -1,22 +1,24 @@ namespace NATS.Client.JetStream.Models; +// TODO: enum member naming with JSON serialization isn't working for some reason +#pragma warning disable SA1300 public enum ConsumerConfigurationDeliverPolicy { [System.Runtime.Serialization.EnumMember(Value = @"all")] - All = 0, + all = 0, [System.Runtime.Serialization.EnumMember(Value = @"last")] - Last = 1, + last = 1, [System.Runtime.Serialization.EnumMember(Value = @"new")] - New = 2, + @new = 2, [System.Runtime.Serialization.EnumMember(Value = @"by_start_sequence")] - ByStartSequence = 3, + by_start_sequence = 3, [System.Runtime.Serialization.EnumMember(Value = @"by_start_time")] - ByStartTime = 4, + by_start_time = 4, [System.Runtime.Serialization.EnumMember(Value = @"last_per_subject")] - LastPerSubject = 5, + last_per_subject = 5, } diff --git a/src/NATS.Client.JetStream/Models/ConsumerConfigurationReplayPolicy.cs b/src/NATS.Client.JetStream/Models/ConsumerConfigurationReplayPolicy.cs index ff6653953..174473424 100644 --- a/src/NATS.Client.JetStream/Models/ConsumerConfigurationReplayPolicy.cs +++ b/src/NATS.Client.JetStream/Models/ConsumerConfigurationReplayPolicy.cs @@ -1,10 +1,12 @@ namespace NATS.Client.JetStream.Models; +// TODO: enum member naming with JSON serialization isn't working for some reason +#pragma warning disable SA1300 public enum ConsumerConfigurationReplayPolicy { [System.Runtime.Serialization.EnumMember(Value = @"instant")] - Instant = 0, + instant = 0, [System.Runtime.Serialization.EnumMember(Value = @"original")] - Original = 1, + original = 1, } diff --git a/src/NATS.Client.JetStream/Models/StreamConfiguration.cs b/src/NATS.Client.JetStream/Models/StreamConfiguration.cs index ea4b50640..b42186e71 100644 --- a/src/NATS.Client.JetStream/Models/StreamConfiguration.cs +++ b/src/NATS.Client.JetStream/Models/StreamConfiguration.cs @@ -40,7 +40,7 @@ public record StreamConfiguration [System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.Never)] [System.ComponentModel.DataAnnotations.Required(AllowEmptyStrings = true)] [System.Text.Json.Serialization.JsonConverter(typeof(System.Text.Json.Serialization.JsonStringEnumConverter))] - public StreamConfigurationRetention Retention { get; set; } = NATS.Client.JetStream.Models.StreamConfigurationRetention.Limits; + public StreamConfigurationRetention Retention { get; set; } = NATS.Client.JetStream.Models.StreamConfigurationRetention.limits; /// /// How many Consumers can be defined for a given Stream. -1 for unlimited. @@ -97,7 +97,7 @@ public record StreamConfiguration [System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.Never)] [System.ComponentModel.DataAnnotations.Required(AllowEmptyStrings = true)] [System.Text.Json.Serialization.JsonConverter(typeof(System.Text.Json.Serialization.JsonStringEnumConverter))] - public StreamConfigurationStorage Storage { get; set; } = NATS.Client.JetStream.Models.StreamConfigurationStorage.File; + public StreamConfigurationStorage Storage { get; set; } = StreamConfigurationStorage.file; /// /// Optional compression algorithm used for the Stream. @@ -105,7 +105,7 @@ public record StreamConfiguration [System.Text.Json.Serialization.JsonPropertyName("compression")] [System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingDefault)] [System.Text.Json.Serialization.JsonConverter(typeof(System.Text.Json.Serialization.JsonStringEnumConverter))] - public StreamConfigurationCompression Compression { get; set; } = NATS.Client.JetStream.Models.StreamConfigurationCompression.None; + public StreamConfigurationCompression Compression { get; set; } = NATS.Client.JetStream.Models.StreamConfigurationCompression.none; /// /// How many replicas to keep for each message. @@ -135,7 +135,7 @@ public record StreamConfiguration [System.Text.Json.Serialization.JsonPropertyName("discard")] [System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingDefault)] [System.Text.Json.Serialization.JsonConverter(typeof(System.Text.Json.Serialization.JsonStringEnumConverter))] - public StreamConfigurationDiscard Discard { get; set; } = NATS.Client.JetStream.Models.StreamConfigurationDiscard.Old; + public StreamConfigurationDiscard Discard { get; set; } = StreamConfigurationDiscard.old; /// /// The time window to track duplicate messages for, expressed in nanoseconds. 0 for default diff --git a/src/NATS.Client.JetStream/Models/StreamConfigurationCompression.cs b/src/NATS.Client.JetStream/Models/StreamConfigurationCompression.cs index 3abbbbc7c..cf90eef51 100644 --- a/src/NATS.Client.JetStream/Models/StreamConfigurationCompression.cs +++ b/src/NATS.Client.JetStream/Models/StreamConfigurationCompression.cs @@ -1,10 +1,12 @@ namespace NATS.Client.JetStream.Models; +// TODO: enum member naming with JSON serialization isn't working for some reason +#pragma warning disable SA1300 public enum StreamConfigurationCompression { [System.Runtime.Serialization.EnumMember(Value = @"none")] - None = 0, + none = 0, [System.Runtime.Serialization.EnumMember(Value = @"s2")] - S2 = 1, + s2 = 1, } diff --git a/src/NATS.Client.JetStream/Models/StreamConfigurationDiscard.cs b/src/NATS.Client.JetStream/Models/StreamConfigurationDiscard.cs index a9f7159cb..5e826b664 100644 --- a/src/NATS.Client.JetStream/Models/StreamConfigurationDiscard.cs +++ b/src/NATS.Client.JetStream/Models/StreamConfigurationDiscard.cs @@ -1,10 +1,12 @@ namespace NATS.Client.JetStream.Models; +// TODO: enum member naming with JSON serialization isn't working for some reason +#pragma warning disable SA1300 public enum StreamConfigurationDiscard { [System.Runtime.Serialization.EnumMember(Value = @"old")] - Old = 0, + old = 0, [System.Runtime.Serialization.EnumMember(Value = @"new")] - New = 1, + @new = 1, } diff --git a/src/NATS.Client.JetStream/Models/StreamConfigurationRetention.cs b/src/NATS.Client.JetStream/Models/StreamConfigurationRetention.cs index 141992c40..957f7628c 100644 --- a/src/NATS.Client.JetStream/Models/StreamConfigurationRetention.cs +++ b/src/NATS.Client.JetStream/Models/StreamConfigurationRetention.cs @@ -1,13 +1,15 @@ namespace NATS.Client.JetStream.Models; +// TODO: enum member naming with JSON serialization isn't working for some reason +#pragma warning disable SA1300 public enum StreamConfigurationRetention { [System.Runtime.Serialization.EnumMember(Value = @"limits")] - Limits = 0, + limits = 0, [System.Runtime.Serialization.EnumMember(Value = @"interest")] - Interest = 1, + interest = 1, [System.Runtime.Serialization.EnumMember(Value = @"workqueue")] - Workqueue = 2, + workqueue = 2, } diff --git a/src/NATS.Client.JetStream/Models/StreamConfigurationStorage.cs b/src/NATS.Client.JetStream/Models/StreamConfigurationStorage.cs index b326b5bec..fbe57b6f4 100644 --- a/src/NATS.Client.JetStream/Models/StreamConfigurationStorage.cs +++ b/src/NATS.Client.JetStream/Models/StreamConfigurationStorage.cs @@ -1,10 +1,12 @@ namespace NATS.Client.JetStream.Models; +// TODO: enum member naming with JSON serialization isn't working for some reason +#pragma warning disable SA1300 public enum StreamConfigurationStorage { [System.Runtime.Serialization.EnumMember(Value = @"file")] - File = 0, + file = 0, [System.Runtime.Serialization.EnumMember(Value = @"memory")] - Memory = 1, + memory = 1, } diff --git a/src/NATS.Client.JetStream/Models/StreamCreateResponse.cs b/src/NATS.Client.JetStream/Models/StreamCreateResponse.cs index 9ab475b19..940b1d396 100644 --- a/src/NATS.Client.JetStream/Models/StreamCreateResponse.cs +++ b/src/NATS.Client.JetStream/Models/StreamCreateResponse.cs @@ -1,9 +1,12 @@ +using System.Text.Json; +using NATS.Client.Core; +using NATS.Client.Core.Internal; + namespace NATS.Client.JetStream.Models; /// /// A response from the JetStream $JS.API.STREAM.CREATE API /// - public record StreamCreateResponse : StreamInfo { } diff --git a/tests/NATS.Client.JetStream.Tests/JetStreamTest.cs b/tests/NATS.Client.JetStream.Tests/JetStreamTest.cs index 1ee3a07dd..4da24895a 100644 --- a/tests/NATS.Client.JetStream.Tests/JetStreamTest.cs +++ b/tests/NATS.Client.JetStream.Tests/JetStreamTest.cs @@ -1,5 +1,8 @@ -using Microsoft.Extensions.Logging; +using System.Buffers; +using System.Text; +using Microsoft.Extensions.Logging; using NATS.Client.JetStream; +using NATS.Client.JetStream.Models; namespace NATS.Client.Core.Tests; @@ -13,39 +16,155 @@ public class JetStreamTest public async Task Create_stream_test() { await using var server = new NatsServer(new NullOutputHelper(), new NatsServerOptionsBuilder().UseTransport(TransportType.Tcp).UseJetStream().Build()); - await using var nats = server.CreateClientConnection(); + var nats = server.CreateClientConnection(); - // Create stream + // Happy user { var context = new JSContext(nats, new JSOptions()); - var stream = await context.CreateStream(request: stream => + + // Create stream + var response = await context.CreateStreamAsync(request: new StreamConfiguration { - stream.Name = "events"; - stream.Subjects = new[] { "events" }; + Name = "events", Subjects = new[] { "events.*" }, }); - _output.WriteLine($"RCV: {stream.Response}"); - } + Assert.True(response.Success); + Assert.Null(response.Error); + Assert.NotNull(response.Response); + Assert.Equal("events", response.Response.Config.Name); - // Handle exceptions - { - var context = new JSContext(nats, new JSOptions()); - try + // Create consumer + var consumerInfo = await context.CreateConsumerAsync(new ConsumerCreateRequest + { + StreamName = "events", + Config = new ConsumerConfiguration + { + Name = "consumer1", + DurableName = "consumer1", + + // Turn on ACK so we can test them below + AckPolicy = ConsumerConfigurationAckPolicy.@explicit, + + // Effectively set message expiry for the consumer + // so that unacknowledged messages can be put back into + // the consumer to be delivered again (in a sense). + // This is to make below consumer tests work. + AckWait = 2_000_000_000, // 2 seconds + }, + }); + Assert.True(consumerInfo.Success); + Assert.Null(consumerInfo.Error); + Assert.NotNull(consumerInfo.Response); + Assert.Equal("events", consumerInfo.Response.StreamName); + Assert.Equal("consumer1", consumerInfo.Response.Config.Name); + + // Publish + PubAckResponse ack; + ack = await context.PublishAsync("events.foo", new TestData { Test = 1 }); + Assert.Null(ack.Error); + Assert.Equal("events", ack.Stream); + Assert.Equal(1, ack.Seq); + Assert.False(ack.Duplicate); + + // Message ID + ack = await context.PublishAsync("events.foo", new TestData { Test = 2 }, new NatsPubOpts + { + Headers = new NatsHeaders { { "Nats-Msg-Id", "test2" } }, + }); + Assert.Null(ack.Error); + Assert.Equal("events", ack.Stream); + Assert.Equal(2, ack.Seq); + Assert.False(ack.Duplicate); + + // Duplicate + ack = await context.PublishAsync("events.foo", new TestData { Test = 2 }, new NatsPubOpts + { + Headers = new NatsHeaders { { "Nats-Msg-Id", "test2" } }, + }); + Assert.Null(ack.Error); + Assert.Equal("events", ack.Stream); + Assert.Equal(2, ack.Seq); + Assert.True(ack.Duplicate); + + // Consume + var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + var messages = new List(); + await foreach (var msg in context.ConsumeAsync( + stream: "events", + consumer: "consumer1", + request: new ConsumerGetnextRequest { Batch = 100 }, + requestOpts: new NatsSubOpts { CanBeCancelled = true }, + cancellationToken: cts.Token)) { - var stream = await context.CreateStream(request: stream => + messages.Add(msg); + + // Only ACK one message so we can consume again + if (messages.Count == 1) + { + await msg.ReplyAsync(new ReadOnlySequence("+ACK"u8.ToArray()), cancellationToken: cts.Token); + } + + if (messages.Count == 2) { - stream.Name = "events"; - stream.Subjects = new[] { "events" }; - }); - _output.WriteLine($"RCV: {stream.Response}"); + break; + } } - catch (NatsSubException e) + + Assert.Equal(2, messages.Count); + Assert.Equal("events.foo", messages[0].Subject); + Assert.Equal("events.foo", messages[1].Subject); + + // Consume the unacknowledged message + await foreach (var msg in context.ConsumeAsync( + stream: "events", + consumer: "consumer1", + request: new ConsumerGetnextRequest { Batch = 100 }, + requestOpts: new NatsSubOpts { CanBeCancelled = true }, + cancellationToken: cts.Token)) { - var payload = e.Payload.Dump(); - var headers = e.Headers.Dump(); - _output.WriteLine($"{e}"); - _output.WriteLine($"headers: {headers}"); - _output.WriteLine($"payload: {payload}"); + Assert.Equal("events.foo", msg.Subject); + break; } } + + // Handle errors + { + var context = new JSContext(nats, new JSOptions()); + + var response = await context.CreateStreamAsync(request: new StreamConfiguration + { + Name = "events2", Subjects = new[] { "events.*" }, + }); + Assert.False(response.Success); + Assert.Null(response.Response); + Assert.NotNull(response.Error); + Assert.Equal(400, response.Error.Code); + + // subjects overlap with an existing stream + Assert.Equal(10065, response.Error.ErrCode); + } + + // Delete stream + { + var context = new JSContext(nats, new JSOptions()); + JSResponse response; + + // Success + response = await context.DeleteStreamAsync("events"); + Assert.True(response.Success); + Assert.True(response.Response!.Success); + + // Error + response = await context.DeleteStreamAsync("events2"); + Assert.False(response.Success); + Assert.Equal(404, response.Error!.Code); + + // stream not found + Assert.Equal(10059, response.Error!.ErrCode); + } } } + +public class TestData +{ + public int Test { get; set; } +} diff --git a/tests/NATS.Client.TestUtilities/NATS.Client.TestUtilities.csproj b/tests/NATS.Client.TestUtilities/NATS.Client.TestUtilities.csproj index 5d8f93b32..b31f2182f 100644 --- a/tests/NATS.Client.TestUtilities/NATS.Client.TestUtilities.csproj +++ b/tests/NATS.Client.TestUtilities/NATS.Client.TestUtilities.csproj @@ -4,8 +4,10 @@ net6.0 enable enable + false + $(NoWarn);CS8002 - + @@ -21,9 +23,9 @@ - + - + diff --git a/tests/NATS.Client.TestUtilities/NatsServer.cs b/tests/NATS.Client.TestUtilities/NatsServer.cs index e405f22e4..d5ef5dd02 100644 --- a/tests/NATS.Client.TestUtilities/NatsServer.cs +++ b/tests/NATS.Client.TestUtilities/NatsServer.cs @@ -28,6 +28,7 @@ public class NatsServer : IAsyncDisposable private readonly CancellationTokenSource _cancellationTokenSource = new(); private readonly string? _configFileName; + private readonly string? _jetStreamStoreDir; private readonly ITestOutputHelper _outputHelper; private readonly Task _processOut; private readonly Task _processErr; @@ -68,6 +69,14 @@ public NatsServer(ITestOutputHelper outputHelper, NatsServerOptions options) _outputHelper = outputHelper; _transportType = options.TransportType; Options = options; + + if (options.EnableJetStream) + { + _jetStreamStoreDir = Path.Combine(Path.GetTempPath(), Guid.NewGuid().ToString("n")); + Directory.CreateDirectory(_jetStreamStoreDir); + options.JetStreamStoreDir = _jetStreamStoreDir; + } + _configFileName = Path.GetTempFileName(); var config = options.ConfigFileContents; File.WriteAllText(_configFileName, config); @@ -169,6 +178,18 @@ public async ValueTask DisposeAsync() File.Delete(_configFileName); } + if (_jetStreamStoreDir != null) + { + try + { + Directory.Delete(_jetStreamStoreDir, true); + } + catch + { + /* best effort */ + } + } + if (Options.ServerDisposeReturnsPorts) { Options.Dispose(); diff --git a/tests/NATS.Client.TestUtilities/NatsServerOptions.cs b/tests/NATS.Client.TestUtilities/NatsServerOptions.cs index 7c8367801..32a6d094e 100644 --- a/tests/NATS.Client.TestUtilities/NatsServerOptions.cs +++ b/tests/NATS.Client.TestUtilities/NatsServerOptions.cs @@ -116,6 +116,8 @@ public NatsServerOptions() public bool EnableJetStream { get; init; } + public string? JetStreamStoreDir { get; set; } + public bool ServerDisposeReturnsPorts { get; init; } = true; public string? TlsClientCertFile { get; init; } @@ -190,9 +192,7 @@ public string ConfigFileContents if (EnableJetStream) { sb.AppendLine("jetstream {"); - var storeDir = Path.Combine(Path.GetTempPath(), Guid.NewGuid().ToString("n")); - Directory.CreateDirectory(storeDir); - sb.AppendLine($" store_dir: '{storeDir}'"); + sb.AppendLine($" store_dir: '{JetStreamStoreDir}'"); sb.AppendLine("}"); }