From 6f9df6ec704ca3c9932464adbdb7192fb7be6dc3 Mon Sep 17 00:00:00 2001 From: Ziya Suzen Date: Thu, 31 Aug 2023 18:29:58 +0100 Subject: [PATCH] JetStream pull consumer redesign (#115) * JetStream pull consumer redesign * Consume and fetch all * Max bytes * Header parser multi span bug fix * Msg size fix * Fixed tests and async sync bug * Fixed tests * Fixed tests * Fixed tests * Fixed tests * Trace test issue * Fixed warnings * Removed flappy no-ACK test This test was checking if the server was resending a message which was not ACKed, after the ack_wait times out. Resending unacknowledged messages is a server function we could test under more controlled test setup and it's this part of the test isn't testing NATS NET codebase. For some reason resend function isn't working most of the time when run under the GitHub Actions environment and creating false positives. Hence I decided to remove this part of the test to avoid unnecessary noise for the time being. * End fetch with internal clock This is a case where server request timeout might not reach the client because of network anomalies. * Default to fast ACK * Options -> Opts refactor * Ignore extra messages on consumer dispose * Propagate JS options for ACK * Test asserts * Flappy test trace * Public API fixes * JSMsg hides the NatsMsg and proxy relevant fields * Moved RawData to example * JSContext NewInbox is now internal --- NATS.Client.sln | 7 + NATS.Client.sln.DotSettings | 1 + sandbox/ConsoleApp/Program.cs | 2 +- .../Example.Core.PublishHeaders/Program.cs | 2 +- sandbox/Example.Core.PublishModel/Program.cs | 2 +- .../Example.Core.SubscribeHeaders/Program.cs | 2 +- .../Example.Core.SubscribeModel/Program.cs | 2 +- .../Program.cs | 2 +- sandbox/Example.Core.SubscribeRaw/Program.cs | 2 +- .../Example.JetStream.PullConsumer.csproj | 14 + .../Example.JetStream.PullConsumer/Program.cs | 145 ++++++++ .../Example.JetStream.PullConsumer/RawData.cs | 12 + .../RawDataSerializer.cs | 28 ++ sandbox/MicroBenchmark/Program.cs | 2 +- sandbox/NatsBenchmark/Program.cs | 12 +- src/NATS.Client.Core/HeaderParser.cs | 302 ++++------------ .../Internal/ClientOptions.cs | 22 +- .../Internal/DebuggingExtensions.cs | 4 +- .../NatsPipeliningWriteProtocolProcessor.cs | 8 +- .../Internal/NatsReadProtocolProcessor.cs | 4 +- .../Internal/SubscriptionManager.cs | 8 +- .../NatsConnection.Publish.cs | 2 +- .../NatsConnection.RequestReply.cs | 2 +- .../NatsConnection.RequestSub.cs | 4 +- .../NatsConnection.Subscribe.cs | 2 +- src/NATS.Client.Core/NatsConnection.Util.cs | 2 +- src/NATS.Client.Core/NatsConnection.cs | 76 ++-- src/NATS.Client.Core/NatsConnectionPool.cs | 18 +- src/NATS.Client.Core/NatsMsg.cs | 8 +- .../{NatsOptions.cs => NatsOpts.cs} | 11 +- src/NATS.Client.Core/NatsSubBase.cs | 23 +- .../NatsHostingExtensions.cs | 6 +- .../INatsJSSubConsume.cs | 5 + .../Internal/NatsJSConstants.cs | 3 + ...r.cs => NatsJSErrorAwareJsonSerializer.cs} | 10 +- .../Internal/NatsJSExtensionsInternal.cs | 2 +- .../Internal/NatsJSOpsDefaults.cs | 4 +- .../Internal/NatsJSSubBase.cs | 339 ------------------ .../Internal/NatsJSSubConsume.cs | 132 ------- src/NATS.Client.JetStream/NatsJSConsumer.cs | 156 ++++++-- src/NATS.Client.JetStream/NatsJSContext.cs | 22 +- src/NATS.Client.JetStream/NatsJSException.cs | 8 - src/NATS.Client.JetStream/NatsJSMsg.cs | 56 ++- .../NatsJSNotification.cs | 14 +- src/NATS.Client.JetStream/NatsJSOpts.cs | 55 ++- src/NATS.Client.JetStream/NatsJSSubConsume.cs | 336 +++++++++++++++++ src/NATS.Client.JetStream/NatsJSSubFetch.cs | 225 ++++++++++++ .../CancellationTest.cs | 4 +- .../NatsConnectionTest.Auth.cs | 20 +- .../NatsConnectionTest.cs | 6 +- .../NATS.Client.Core.Tests/NatsHeaderTest.cs | 20 ++ .../RequestReplyTest.cs | 2 +- .../SubscriptionTest.cs | 4 +- .../ConsumerConsumeTest.cs | 62 ++-- .../ConsumerFetchTest.cs | 10 +- .../ConsumerNextTest.cs | 11 +- .../ConsumerStateTest.cs | 50 +-- .../JetStreamTest.cs | 36 +- .../ManageConsumerTest.cs | 4 +- .../ManageStreamTest.cs | 4 +- tests/NATS.Client.TestUtilities/NatsServer.cs | 22 +- 61 files changed, 1325 insertions(+), 1034 deletions(-) create mode 100644 sandbox/Example.JetStream.PullConsumer/Example.JetStream.PullConsumer.csproj create mode 100644 sandbox/Example.JetStream.PullConsumer/Program.cs create mode 100644 sandbox/Example.JetStream.PullConsumer/RawData.cs create mode 100644 sandbox/Example.JetStream.PullConsumer/RawDataSerializer.cs rename src/NATS.Client.Core/{NatsOptions.cs => NatsOpts.cs} (92%) rename src/NATS.Client.JetStream/Internal/{JSErrorAwareJsonSerializer.cs => NatsJSErrorAwareJsonSerializer.cs} (77%) delete mode 100644 src/NATS.Client.JetStream/Internal/NatsJSSubBase.cs delete mode 100644 src/NATS.Client.JetStream/Internal/NatsJSSubConsume.cs create mode 100644 src/NATS.Client.JetStream/NatsJSSubConsume.cs create mode 100644 src/NATS.Client.JetStream/NatsJSSubFetch.cs diff --git a/NATS.Client.sln b/NATS.Client.sln index d55ba25e5..267b62a3f 100644 --- a/NATS.Client.sln +++ b/NATS.Client.sln @@ -65,6 +65,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Schema.Generation", "tools\ EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "NATS.Client.Perf", "tests\NATS.Client.Perf\NATS.Client.Perf.csproj", "{ADF66CBA-4F3E-4E91-9842-E194E3BC06A1}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Example.JetStream.PullConsumer", "sandbox\Example.JetStream.PullConsumer\Example.JetStream.PullConsumer.csproj", "{3A9FC281-3B81-4D63-A76B-E1127C1D2241}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -159,6 +161,10 @@ Global {ADF66CBA-4F3E-4E91-9842-E194E3BC06A1}.Debug|Any CPU.Build.0 = Debug|Any CPU {ADF66CBA-4F3E-4E91-9842-E194E3BC06A1}.Release|Any CPU.ActiveCfg = Release|Any CPU {ADF66CBA-4F3E-4E91-9842-E194E3BC06A1}.Release|Any CPU.Build.0 = Release|Any CPU + {3A9FC281-3B81-4D63-A76B-E1127C1D2241}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {3A9FC281-3B81-4D63-A76B-E1127C1D2241}.Debug|Any CPU.Build.0 = Debug|Any CPU + {3A9FC281-3B81-4D63-A76B-E1127C1D2241}.Release|Any CPU.ActiveCfg = Release|Any CPU + {3A9FC281-3B81-4D63-A76B-E1127C1D2241}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -187,6 +193,7 @@ Global {90E5BF38-70C1-460A-9177-CE42815BDBF5} = {C526E8AB-739A-48D7-8FC4-048978C9B650} {B7DD4A9C-2D24-4772-951E-86A665C59ADF} = {BD234E2E-F51A-4B18-B8BE-8AF6D546BF87} {ADF66CBA-4F3E-4E91-9842-E194E3BC06A1} = {C526E8AB-739A-48D7-8FC4-048978C9B650} + {3A9FC281-3B81-4D63-A76B-E1127C1D2241} = {95A69671-16CA-4133-981C-CC381B7AAA30} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {8CBB7278-D093-448E-B3DE-B5991209A1AA} diff --git a/NATS.Client.sln.DotSettings b/NATS.Client.sln.DotSettings index f01f35132..dccecad88 100644 --- a/NATS.Client.sln.DotSettings +++ b/NATS.Client.sln.DotSettings @@ -1,6 +1,7 @@  ASCII CR + CRLF JS LF True diff --git a/sandbox/ConsoleApp/Program.cs b/sandbox/ConsoleApp/Program.cs index 6ddf37b60..87532bcc1 100644 --- a/sandbox/ConsoleApp/Program.cs +++ b/sandbox/ConsoleApp/Program.cs @@ -53,7 +53,7 @@ await conn.PublishAsync("foo", new Person(30, "bar")); // Options can configure `with` expression -var options = NatsOptions.Default with +var options = NatsOpts.Default with { Url = "nats://127.0.0.1:9999", LoggerFactory = new MinimumConsoleLoggerFactory(LogLevel.Information), diff --git a/sandbox/Example.Core.PublishHeaders/Program.cs b/sandbox/Example.Core.PublishHeaders/Program.cs index 0bddcb201..2227d45c9 100644 --- a/sandbox/Example.Core.PublishHeaders/Program.cs +++ b/sandbox/Example.Core.PublishHeaders/Program.cs @@ -3,7 +3,7 @@ using NATS.Client.Core; var subject = "bar.xyz"; -var options = NatsOptions.Default with { LoggerFactory = new MinimumConsoleLoggerFactory(LogLevel.Error) }; +var options = NatsOpts.Default with { LoggerFactory = new MinimumConsoleLoggerFactory(LogLevel.Error) }; Print("[CON] Connecting...\n"); diff --git a/sandbox/Example.Core.PublishModel/Program.cs b/sandbox/Example.Core.PublishModel/Program.cs index ffa71474f..d458201d4 100644 --- a/sandbox/Example.Core.PublishModel/Program.cs +++ b/sandbox/Example.Core.PublishModel/Program.cs @@ -3,7 +3,7 @@ using NATS.Client.Core; var subject = "bar.xyz"; -var options = NatsOptions.Default with { LoggerFactory = new MinimumConsoleLoggerFactory(LogLevel.Error) }; +var options = NatsOpts.Default with { LoggerFactory = new MinimumConsoleLoggerFactory(LogLevel.Error) }; Print("[CON] Connecting...\n"); diff --git a/sandbox/Example.Core.SubscribeHeaders/Program.cs b/sandbox/Example.Core.SubscribeHeaders/Program.cs index d772588b2..b0d7bb6ed 100644 --- a/sandbox/Example.Core.SubscribeHeaders/Program.cs +++ b/sandbox/Example.Core.SubscribeHeaders/Program.cs @@ -5,7 +5,7 @@ using NATS.Client.Core; var subject = "bar.*"; -var options = NatsOptions.Default with { LoggerFactory = new MinimumConsoleLoggerFactory(LogLevel.Error) }; +var options = NatsOpts.Default with { LoggerFactory = new MinimumConsoleLoggerFactory(LogLevel.Error) }; Print("[CON] Connecting...\n"); diff --git a/sandbox/Example.Core.SubscribeModel/Program.cs b/sandbox/Example.Core.SubscribeModel/Program.cs index c99259803..66892f7d0 100644 --- a/sandbox/Example.Core.SubscribeModel/Program.cs +++ b/sandbox/Example.Core.SubscribeModel/Program.cs @@ -2,7 +2,7 @@ using NATS.Client.Core; var subject = "bar.*"; -var options = NatsOptions.Default with { LoggerFactory = new MinimumConsoleLoggerFactory(LogLevel.Error) }; +var options = NatsOpts.Default with { LoggerFactory = new MinimumConsoleLoggerFactory(LogLevel.Error) }; Print("[CON] Connecting...\n"); diff --git a/sandbox/Example.Core.SubscribeQueueGroup/Program.cs b/sandbox/Example.Core.SubscribeQueueGroup/Program.cs index b0e9641da..2f91b5a8e 100644 --- a/sandbox/Example.Core.SubscribeQueueGroup/Program.cs +++ b/sandbox/Example.Core.SubscribeQueueGroup/Program.cs @@ -4,7 +4,7 @@ using NATS.Client.Core; var subject = "foo.*"; -var options = NatsOptions.Default with { LoggerFactory = new MinimumConsoleLoggerFactory(LogLevel.Error) }; +var options = NatsOpts.Default with { LoggerFactory = new MinimumConsoleLoggerFactory(LogLevel.Error) }; // --- // Worker 1 diff --git a/sandbox/Example.Core.SubscribeRaw/Program.cs b/sandbox/Example.Core.SubscribeRaw/Program.cs index 7b3b9991c..514c0ea79 100644 --- a/sandbox/Example.Core.SubscribeRaw/Program.cs +++ b/sandbox/Example.Core.SubscribeRaw/Program.cs @@ -3,7 +3,7 @@ using NATS.Client.Core; var subject = "foo.*"; -var options = NatsOptions.Default with { LoggerFactory = new MinimumConsoleLoggerFactory(LogLevel.Error) }; +var options = NatsOpts.Default with { LoggerFactory = new MinimumConsoleLoggerFactory(LogLevel.Error) }; Print("[CON] Connecting...\n"); diff --git a/sandbox/Example.JetStream.PullConsumer/Example.JetStream.PullConsumer.csproj b/sandbox/Example.JetStream.PullConsumer/Example.JetStream.PullConsumer.csproj new file mode 100644 index 000000000..c66fda896 --- /dev/null +++ b/sandbox/Example.JetStream.PullConsumer/Example.JetStream.PullConsumer.csproj @@ -0,0 +1,14 @@ + + + + Exe + net6.0 + enable + enable + + + + + + + diff --git a/sandbox/Example.JetStream.PullConsumer/Program.cs b/sandbox/Example.JetStream.PullConsumer/Program.cs new file mode 100644 index 000000000..89da628b4 --- /dev/null +++ b/sandbox/Example.JetStream.PullConsumer/Program.cs @@ -0,0 +1,145 @@ +using System.Diagnostics; +using Example.JetStream.PullConsumer; +using Microsoft.Extensions.Logging; +using NATS.Client.Core; +using NATS.Client.JetStream; + +var cts = new CancellationTokenSource(); + +Console.CancelKeyPress += (_, e) => +{ + e.Cancel = true; + cts.Cancel(); +}; + +var options = NatsOpts.Default with { LoggerFactory = new MinimumConsoleLoggerFactory(LogLevel.Error) }; + +await using var nats = new NatsConnection(options); + +var js = new NatsJSContext(nats); + +var consumer = await js.CreateConsumerAsync("s1", "c1"); + +var idle = TimeSpan.FromSeconds(15); +var expires = TimeSpan.FromSeconds(30); + +// int? maxMsgs = null; +// int? maxBytes = 128; +int? maxMsgs = 1000; +int? maxBytes = null; + +static void ErrorHandler(NatsJSNotification notification) +{ + Console.WriteLine($"Error: {notification}"); +} + +void Report(int i, Stopwatch sw, string data) +{ + Console.WriteLine(data); + if (i % 10000 == 0) + Console.WriteLine($"Received: {i / sw.Elapsed.TotalSeconds:f2} msgs/s [{i}] {sw.Elapsed}"); +} + +var consumeOpts = new NatsJSConsumeOpts +{ + MaxMsgs = maxMsgs, + MaxBytes = maxBytes, + Expires = expires, + IdleHeartbeat = idle, + Serializer = new RawDataSerializer(), + ErrorHandler = ErrorHandler, +}; + +var fetchOpts = new NatsJSFetchOpts +{ + MaxMsgs = maxMsgs, + MaxBytes = maxBytes, + Expires = expires, + IdleHeartbeat = idle, + Serializer = new RawDataSerializer(), + ErrorHandler = ErrorHandler, +}; + +var nextOpts = new NatsJSNextOpts +{ + Expires = expires, IdleHeartbeat = idle, Serializer = new RawDataSerializer(), ErrorHandler = ErrorHandler, +}; + +var stopwatch = Stopwatch.StartNew(); +var count = 0; + +try +{ + if (args.Length > 0 && args[0] == "fetch") + { + while (!cts.Token.IsCancellationRequested) + { + Console.WriteLine($"___\nFETCH {maxMsgs}"); + await using var sub = await consumer.FetchAsync(fetchOpts, cts.Token); + await foreach (var msg in sub.Msgs.ReadAllAsync(cts.Token)) + { + await msg.AckAsync(cancellationToken: cts.Token); + Report(++count, stopwatch, $"data: {msg.Data}"); + } + } + } + else if (args.Length > 0 && args[0] == "fetch-all") + { + while (!cts.Token.IsCancellationRequested) + { + Console.WriteLine($"___\nFETCH {maxMsgs}"); + await foreach (var msg in consumer.FetchAllAsync(fetchOpts, cts.Token)) + { + await msg.AckAsync(cancellationToken: cts.Token); + Report(++count, stopwatch, $"data: {msg.Data}"); + } + } + } + else if (args.Length > 0 && args[0] == "next") + { + while (!cts.Token.IsCancellationRequested) + { + Console.WriteLine("___\nNEXT"); + var next = await consumer.NextAsync(nextOpts, cts.Token); + if (next is { } msg) + { + await msg.AckAsync(cancellationToken: cts.Token); + Report(++count, stopwatch, $"data: {msg.Data}"); + } + } + } + else if (args.Length > 0 && args[0] == "consume") + { + Console.WriteLine("___\nCONSUME"); + await using var sub = await consumer.ConsumeAsync( + consumeOpts, + cts.Token); + + await foreach (var msg in sub.Msgs.ReadAllAsync(cts.Token)) + { + await msg.AckAsync(cancellationToken: cts.Token); + Report(++count, stopwatch, $"data: {msg.Data}"); + } + + // Console.WriteLine($"took {stopwatch.Elapsed}"); + // await nats.PingAsync(cts.Token); + } + else if (args.Length > 0 && args[0] == "consume-all") + { + Console.WriteLine("___\nCONSUME-ALL"); + await foreach (var msg in consumer.ConsumeAllAsync(consumeOpts, cts.Token)) + { + await msg.AckAsync(cancellationToken: cts.Token); + Report(++count, stopwatch, $"data: {msg.Data}"); + } + } + else + { + Console.WriteLine("Usage: dotnet run -- "); + } +} +catch (OperationCanceledException) +{ +} + +Console.WriteLine("Bye"); diff --git a/sandbox/Example.JetStream.PullConsumer/RawData.cs b/sandbox/Example.JetStream.PullConsumer/RawData.cs new file mode 100644 index 000000000..348147521 --- /dev/null +++ b/sandbox/Example.JetStream.PullConsumer/RawData.cs @@ -0,0 +1,12 @@ +using System.Text; + +namespace Example.JetStream.PullConsumer; + +public class RawData +{ + public RawData(byte[] buffer) => Buffer = buffer; + + public byte[] Buffer { get; } + + public override string ToString() => Encoding.ASCII.GetString(Buffer); +} diff --git a/sandbox/Example.JetStream.PullConsumer/RawDataSerializer.cs b/sandbox/Example.JetStream.PullConsumer/RawDataSerializer.cs new file mode 100644 index 000000000..c738fd0be --- /dev/null +++ b/sandbox/Example.JetStream.PullConsumer/RawDataSerializer.cs @@ -0,0 +1,28 @@ +using System.Buffers; +using NATS.Client.Core; + +namespace Example.JetStream.PullConsumer; + +public class RawDataSerializer : INatsSerializer +{ + public int Serialize(ICountableBufferWriter bufferWriter, T? value) + { + if (value is RawData data) + { + bufferWriter.Write(data.Buffer); + return data.Buffer.Length; + } + + throw new Exception($"Can only work with '{typeof(RawData)}'"); + } + + public T? Deserialize(in ReadOnlySequence buffer) => (T?)Deserialize(buffer, typeof(T)); + + public object? Deserialize(in ReadOnlySequence buffer, Type type) + { + if (type != typeof(RawData)) + throw new Exception($"Can only work with '{typeof(RawData)}'"); + + return new RawData(buffer.ToArray()); + } +} diff --git a/sandbox/MicroBenchmark/Program.cs b/sandbox/MicroBenchmark/Program.cs index 82697fc1d..8cdd34abc 100644 --- a/sandbox/MicroBenchmark/Program.cs +++ b/sandbox/MicroBenchmark/Program.cs @@ -58,7 +58,7 @@ public async Task SetupAsync() var loggerFactory = provider.GetRequiredService(); var logger = loggerFactory.CreateLogger>(); - var options = NatsOptions.Default with + var options = NatsOpts.Default with { LoggerFactory = loggerFactory, Echo = true, diff --git a/sandbox/NatsBenchmark/Program.cs b/sandbox/NatsBenchmark/Program.cs index e78c0bbb3..7c71b73de 100644 --- a/sandbox/NatsBenchmark/Program.cs +++ b/sandbox/NatsBenchmark/Program.cs @@ -57,7 +57,7 @@ private void RunPubSubBenchmark(string testName, long testCount, long testSize, var loggerFactory = provider.GetRequiredService(); var logger = loggerFactory.CreateLogger>(); - var options = NatsOptions.Default with + var options = NatsOpts.Default with { // LoggerFactory = loggerFactory, UseThreadPoolCallback = false, @@ -132,7 +132,7 @@ private void RunPubSubBenchmarkBatch(string testName, long testCount, long testS var loggerFactory = provider.GetRequiredService(); var logger = loggerFactory.CreateLogger>(); - var options = NatsOptions.Default with + var options = NatsOpts.Default with { // LoggerFactory = loggerFactory, UseThreadPoolCallback = false, @@ -216,7 +216,7 @@ private void ProfilingRunPubSubBenchmarkAsync(string testName, long testCount, l var loggerFactory = provider.GetRequiredService(); var logger = loggerFactory.CreateLogger>(); - var options = NatsOptions.Default with + var options = NatsOpts.Default with { // LoggerFactory = loggerFactory, UseThreadPoolCallback = false, @@ -296,7 +296,7 @@ private void RunPubSubBenchmarkBatchRaw(string testName, long testCount, long te var loggerFactory = provider.GetRequiredService(); var logger = loggerFactory.CreateLogger>(); - var options = NatsOptions.Default with + var options = NatsOpts.Default with { // LoggerFactory = loggerFactory, UseThreadPoolCallback = false, @@ -394,7 +394,7 @@ private void RunPubSubBenchmarkPubSub2(string testName, long testCount, long tes var loggerFactory = provider.GetRequiredService(); var logger = loggerFactory.CreateLogger>(); - var options = NatsOptions.Default with + var options = NatsOpts.Default with { // LoggerFactory = loggerFactory, UseThreadPoolCallback = false, @@ -511,7 +511,7 @@ private void RunPubSubBenchmarkVector3(string testName, long testCount, bool dis var loggerFactory = provider.GetRequiredService(); var logger = loggerFactory.CreateLogger>(); - var options = NatsOptions.Default with + var options = NatsOpts.Default with { // LoggerFactory = loggerFactory, UseThreadPoolCallback = false, diff --git a/src/NATS.Client.Core/HeaderParser.cs b/src/NATS.Client.Core/HeaderParser.cs index cb74bfef7..aa843f8f5 100644 --- a/src/NATS.Client.Core/HeaderParser.cs +++ b/src/NATS.Client.Core/HeaderParser.cs @@ -19,282 +19,118 @@ public class HeaderParser private const byte ByteColon = (byte)':'; private const byte ByteSpace = (byte)' '; private const byte ByteTab = (byte)'\t'; + private static readonly byte[] ByteCRLF = { ByteCR, ByteLF }; private readonly Encoding _encoding; public HeaderParser(Encoding encoding) => _encoding = encoding; - public bool ParseHeaders(in SequenceReader reader, NatsHeaders headers) + public bool ParseHeaders(SequenceReader reader, NatsHeaders headers) { var isVersionLineRead = false; while (!reader.End) { - var span = reader.UnreadSpan; - while (span.Length > 0) + if (reader.TryReadTo(out ReadOnlySpan line, ByteCRLF)) { - var ch1 = (byte)0; - var ch2 = (byte)0; - var readAhead = 0; - - // Fast path, we're still looking at the same span - if (span.Length >= 2) - { - ch1 = span[0]; - ch2 = span[1]; - } - - // Possibly split across spans - else if (reader.TryRead(out ch1)) - { - // Note if we read ahead by 1 or 2 bytes - readAhead = reader.TryRead(out ch2) ? 2 : 1; - } - - if (ch1 == ByteCR) + if (!isVersionLineRead) { - // Check for final CRLF. - if (ch2 == ByteLF) + if (!TryParseHeaderLine(line, headers)) { - // If we got 2 bytes from the span directly so skip ahead 2 so that - // the reader's state matches what we expect - if (readAhead == 0) - { - reader.Advance(2); - } - - // Double CRLF found, so end of headers. - return true; + RejectRequestHeader(line); } - else if (readAhead == 1) - { - // Didn't read 2 bytes, reset the reader so we don't consume anything - reader.Rewind(1); - return false; - } - - // Headers don't end in CRLF line. - Debug.Assert(readAhead == 0 || readAhead == 2, "readAhead == 0 || readAhead == 2"); - throw new NatsException($"Protocol error: invalid headers, no ending CRLF+CRLF"); + isVersionLineRead = true; + continue; } - var length = 0; - - // We only need to look for the end if we didn't read ahead; otherwise there isn't enough in - // in the span to contain a header. - if (readAhead == 0) + if (line.Length > 0) { - length = span.IndexOfAny(ByteCR, ByteLF); - - // If not found length with be -1; casting to uint will turn it to uint.MaxValue - // which will be larger than any possible span.Length. This also serves to eliminate - // the bounds check for the next lookup of span[length] - if ((uint)length < (uint)span.Length) + if (!TryTakeSingleHeader(line, headers)) { - // Early memory read to hide latency - var expectedCR = span[length]; - - // Correctly has a CR, move to next - length++; - - if (expectedCR != ByteCR) - { - // Sequence needs to be CRLF not LF first. - RejectRequestHeader(span[..length]); - } - - if ((uint)length < (uint)span.Length) - { - // Early memory read to hide latency - var expectedLF = span[length]; - - // Correctly has a LF, move to next - length++; - - if (expectedLF != ByteLF || - length < 5 || - - // Exclude the CRLF from the headerLine and parse the header name:value pair - !TryTakeSingleHeader(span[..(length - 2)], headers, ref isVersionLineRead)) - { - // Sequence needs to be CRLF and not contain an inner CR not part of terminator. - // Less than min possible headerSpan of 5 bytes a:b\r\n - // Not parsable as a valid name:value header pair. - RejectRequestHeader(span[..length]); - } - - // Read the header successfully, skip the reader forward past the headerSpan. - span = span.Slice(length); - reader.Advance(length); - } - else - { - // No enough data, set length to 0. - length = 0; - } + RejectRequestHeader(line); } } - - // End found in current span - if (length > 0) - { - continue; - } - - // We moved the reader to look ahead 2 bytes so rewind the reader - if (readAhead > 0) - { - reader.Rewind(readAhead); - } - - length = ParseMultiSpanHeader(reader, headers, ref isVersionLineRead); - if (length < 0) - { - // Not there - return false; - } - - reader.Advance(length); - - // As we crossed spans set the current span to default - // so we move to the next span on the next iteration - span = default; } - } - - return false; - } - - private int ParseMultiSpanHeader(in SequenceReader reader, NatsHeaders headers, ref bool isVersionLineRead) - { - var currentSlice = reader.UnreadSequence; - var lineEndPosition = currentSlice.PositionOfAny(ByteCR, ByteLF); - - if (lineEndPosition == null) - { - // Not there. - return -1; - } - - SequencePosition lineEnd; - ReadOnlySpan headerSpan; - if (currentSlice.Slice(reader.Position, lineEndPosition.Value).Length == currentSlice.Length - 1) - { - // No enough data, so CRLF can't currently be there. - // However, we need to check the found char is CR and not LF - - // Advance 1 to include CR/LF in lineEnd - lineEnd = currentSlice.GetPosition(1, lineEndPosition.Value); - headerSpan = currentSlice.Slice(reader.Position, lineEnd).ToSpan(); - if (headerSpan[^1] != ByteCR) + else { - RejectRequestHeader(headerSpan); + RejectRequestHeader(reader.Sequence.ToSpan()); } - - return -1; } - // Advance 2 to include CR{LF?} in lineEnd - lineEnd = currentSlice.GetPosition(2, lineEndPosition.Value); - headerSpan = currentSlice.Slice(reader.Position, lineEnd).ToSpan(); + return isVersionLineRead; + } - if (headerSpan.Length < 5) - { - // Less than min possible headerSpan is 5 bytes a:b\r\n - RejectRequestHeader(headerSpan); - } + private bool TryParseHeaderLine(ReadOnlySpan headerLine, NatsHeaders headers) + { + // We are first looking for a version line + // e.g. NATS/1.0 100 Idle Heartbeat + headerLine.Split(out var versionBytes, out headerLine); - if (headerSpan[^2] != ByteCR) + if (!versionBytes.SequenceEqual(CommandConstants.NatsHeaders10)) { - // Sequence needs to be CRLF not LF first. - RejectRequestHeader(headerSpan[..^1]); + throw new NatsException("Protocol error: header version mismatch"); } - if (headerSpan[^1] != ByteLF || - - // Exclude the CRLF from the headerLine and parse the header name:value pair - !TryTakeSingleHeader(headerSpan[..^2], headers, ref isVersionLineRead)) + if (headerLine.Length != 0) { - // Sequence needs to be CRLF and not contain an inner CR not part of terminator. - // Not parsable as a valid name:value header pair. - RejectRequestHeader(headerSpan); + headerLine.Split(out var codeBytes, out headerLine); + if (!Utf8Parser.TryParse(codeBytes, out int code, out _)) + throw new NatsException("Protocol error: header code is not a number"); + headers.Code = code; } - return headerSpan.Length; - } - - private bool TryTakeSingleHeader(ReadOnlySpan headerLine, NatsHeaders headers, ref bool isVersionLineRead) - { - // We are first looking for a version line - // e.g. NATS/1.0 100 Idle Heartbeat - if (!isVersionLineRead) + if (headerLine.Length != 0) { - headerLine.Split(out var versionBytes, out headerLine); - - if (!versionBytes.SequenceEqual(CommandConstants.NatsHeaders10)) + // We can reduce string allocations by detecting commonly used + // header messages. + if (headerLine.SequenceEqual(NatsHeaders.MessageIdleHeartbeat)) { - throw new NatsException("Protocol error: header version mismatch"); + headers.Message = NatsHeaders.Messages.IdleHeartbeat; + headers.MessageText = NatsHeaders.MessageIdleHeartbeatStr; } - - if (headerLine.Length != 0) + else if (headerLine.SequenceEqual(NatsHeaders.MessageBadRequest)) { - headerLine.Split(out var codeBytes, out headerLine); - if (!Utf8Parser.TryParse(codeBytes, out int code, out _)) - throw new NatsException("Protocol error: header code is not a number"); - headers.Code = code; + headers.Message = NatsHeaders.Messages.BadRequest; + headers.MessageText = NatsHeaders.MessageBadRequestStr; } - - if (headerLine.Length != 0) + else if (headerLine.SequenceEqual(NatsHeaders.MessageConsumerDeleted)) { - // We can reduce string allocations by detecting commonly used - // header messages. - if (headerLine.SequenceEqual(NatsHeaders.MessageIdleHeartbeat)) - { - headers.Message = NatsHeaders.Messages.IdleHeartbeat; - headers.MessageText = NatsHeaders.MessageIdleHeartbeatStr; - } - else if (headerLine.SequenceEqual(NatsHeaders.MessageBadRequest)) - { - headers.Message = NatsHeaders.Messages.BadRequest; - headers.MessageText = NatsHeaders.MessageBadRequestStr; - } - else if (headerLine.SequenceEqual(NatsHeaders.MessageConsumerDeleted)) - { - headers.Message = NatsHeaders.Messages.ConsumerDeleted; - headers.MessageText = NatsHeaders.MessageConsumerDeletedStr; - } - else if (headerLine.SequenceEqual(NatsHeaders.MessageConsumerIsPushBased)) - { - headers.Message = NatsHeaders.Messages.ConsumerIsPushBased; - headers.MessageText = NatsHeaders.MessageConsumerIsPushBasedStr; - } - else if (headerLine.SequenceEqual(NatsHeaders.MessageNoMessages)) - { - headers.Message = NatsHeaders.Messages.NoMessages; - headers.MessageText = NatsHeaders.MessageNoMessagesStr; - } - else if (headerLine.SequenceEqual(NatsHeaders.MessageRequestTimeout)) - { - headers.Message = NatsHeaders.Messages.RequestTimeout; - headers.MessageText = NatsHeaders.MessageRequestTimeoutStr; - } - else if (headerLine.SequenceEqual(NatsHeaders.MessageMessageSizeExceedsMaxBytes)) - { - headers.Message = NatsHeaders.Messages.MessageSizeExceedsMaxBytes; - headers.MessageText = NatsHeaders.MessageMessageSizeExceedsMaxBytesStr; - } - else - { - headers.Message = NatsHeaders.Messages.Text; - headers.MessageText = _encoding.GetString(headerLine); - } + headers.Message = NatsHeaders.Messages.ConsumerDeleted; + headers.MessageText = NatsHeaders.MessageConsumerDeletedStr; + } + else if (headerLine.SequenceEqual(NatsHeaders.MessageConsumerIsPushBased)) + { + headers.Message = NatsHeaders.Messages.ConsumerIsPushBased; + headers.MessageText = NatsHeaders.MessageConsumerIsPushBasedStr; + } + else if (headerLine.SequenceEqual(NatsHeaders.MessageNoMessages)) + { + headers.Message = NatsHeaders.Messages.NoMessages; + headers.MessageText = NatsHeaders.MessageNoMessagesStr; + } + else if (headerLine.SequenceEqual(NatsHeaders.MessageRequestTimeout)) + { + headers.Message = NatsHeaders.Messages.RequestTimeout; + headers.MessageText = NatsHeaders.MessageRequestTimeoutStr; + } + else if (headerLine.SequenceEqual(NatsHeaders.MessageMessageSizeExceedsMaxBytes)) + { + headers.Message = NatsHeaders.Messages.MessageSizeExceedsMaxBytes; + headers.MessageText = NatsHeaders.MessageMessageSizeExceedsMaxBytesStr; + } + else + { + headers.Message = NatsHeaders.Messages.Text; + headers.MessageText = _encoding.GetString(headerLine); } - - isVersionLineRead = true; - return true; } + return true; + } + + private bool TryTakeSingleHeader(ReadOnlySpan headerLine, NatsHeaders headers) + { // We are looking for a colon to terminate the header name. // However, the header name cannot contain a space or tab so look for all three // and see which is found first. diff --git a/src/NATS.Client.Core/Internal/ClientOptions.cs b/src/NATS.Client.Core/Internal/ClientOptions.cs index 4d0be738d..6ecbf74ac 100644 --- a/src/NATS.Client.Core/Internal/ClientOptions.cs +++ b/src/NATS.Client.Core/Internal/ClientOptions.cs @@ -11,16 +11,16 @@ namespace NATS.Client.Core.Internal; // https://github.com/nats-io/nats-server/blob/a23b1b7/server/client.go#L536 internal sealed class ClientOptions { - private ClientOptions(NatsOptions options) + private ClientOptions(NatsOpts opts) { - Name = options.Name; - Echo = options.Echo; - Verbose = options.Verbose; - Headers = options.Headers; - Username = options.AuthOptions.Username; - Password = options.AuthOptions.Password; - AuthToken = options.AuthOptions.Token; - JWT = options.AuthOptions.Jwt; + Name = opts.Name; + Echo = opts.Echo; + Verbose = opts.Verbose; + Headers = opts.Headers; + Username = opts.AuthOptions.Username; + Password = opts.AuthOptions.Password; + AuthToken = opts.AuthOptions.Token; + JWT = opts.AuthOptions.Jwt; } /// Optional boolean. If set to true, the server (version 1.2.0+) will not send originating messages from this connection to its own subscriptions. Clients should set this to true only for server supporting this feature, which is when proto in the INFO protocol is set to at least 1. @@ -90,9 +90,9 @@ private ClientOptions(NatsOptions options) [JsonPropertyName("no_responders")] public bool NoResponders { get; init; } = false; - public static ClientOptions Create(NatsOptions options) + public static ClientOptions Create(NatsOpts opts) { - return new ClientOptions(options); + return new ClientOptions(opts); } private static string GetAssemblyVersion() diff --git a/src/NATS.Client.Core/Internal/DebuggingExtensions.cs b/src/NATS.Client.Core/Internal/DebuggingExtensions.cs index 9d91ca571..ca6750523 100644 --- a/src/NATS.Client.Core/Internal/DebuggingExtensions.cs +++ b/src/NATS.Client.Core/Internal/DebuggingExtensions.cs @@ -27,10 +27,10 @@ public static string Dump(this ReadOnlySpan span) sb.Append(b); break; case '\r': - sb.Append('␍'); + sb.Append("\\r"); break; case '\n': - sb.Append('␊'); + sb.Append("\\n"); break; default: sb.Append('.'); diff --git a/src/NATS.Client.Core/Internal/NatsPipeliningWriteProtocolProcessor.cs b/src/NATS.Client.Core/Internal/NatsPipeliningWriteProtocolProcessor.cs index fba637149..8ea96dcdd 100644 --- a/src/NATS.Client.Core/Internal/NatsPipeliningWriteProtocolProcessor.cs +++ b/src/NATS.Client.Core/Internal/NatsPipeliningWriteProtocolProcessor.cs @@ -13,7 +13,7 @@ internal sealed class NatsPipeliningWriteProtocolProcessor : IAsyncDisposable private readonly ConnectionStatsCounter _counter; private readonly FixedArrayBufferWriter _bufferWriter; private readonly Channel _channel; - private readonly NatsOptions _options; + private readonly NatsOpts _opts; private readonly Task _writeLoop; private readonly Stopwatch _stopwatch = new Stopwatch(); private readonly CancellationTokenSource _cancellationTokenSource; @@ -27,7 +27,7 @@ public NatsPipeliningWriteProtocolProcessor(ISocketConnection socketConnection, _counter = counter; _bufferWriter = state.BufferWriter; _channel = state.CommandBuffer; - _options = state.Options; + _opts = state.Opts; _cancellationTokenSource = new CancellationTokenSource(); _writeLoop = Task.Run(WriteLoopAsync); } @@ -45,8 +45,8 @@ private async Task WriteLoopAsync() { var reader = _channel.Reader; var protocolWriter = new ProtocolWriter(_bufferWriter); - var logger = _options.LoggerFactory.CreateLogger(); - var writerBufferSize = _options.WriterBufferSize; + var logger = _opts.LoggerFactory.CreateLogger(); + var writerBufferSize = _opts.WriterBufferSize; var promiseList = new List(100); var isEnabledTraceLogging = logger.IsEnabled(LogLevel.Trace); diff --git a/src/NATS.Client.Core/Internal/NatsReadProtocolProcessor.cs b/src/NATS.Client.Core/Internal/NatsReadProtocolProcessor.cs index c6ce91e23..55393303b 100644 --- a/src/NATS.Client.Core/Internal/NatsReadProtocolProcessor.cs +++ b/src/NATS.Client.Core/Internal/NatsReadProtocolProcessor.cs @@ -27,13 +27,13 @@ internal sealed class NatsReadProtocolProcessor : IAsyncDisposable public NatsReadProtocolProcessor(ISocketConnection socketConnection, NatsConnection connection, TaskCompletionSource waitForInfoSignal, TaskCompletionSource waitForPongOrErrorSignal, Task infoParsed) { _connection = connection; - _logger = connection.Options.LoggerFactory.CreateLogger(); + _logger = connection.Opts.LoggerFactory.CreateLogger(); _trace = _logger.IsEnabled(LogLevel.Trace); _waitForInfoSignal = waitForInfoSignal; _waitForPongOrErrorSignal = waitForPongOrErrorSignal; _infoParsed = infoParsed; _pingCommands = new ConcurrentQueue(); - _socketReader = new SocketReader(socketConnection, connection.Options.ReaderBufferSize, connection.Counter, connection.Options.LoggerFactory); + _socketReader = new SocketReader(socketConnection, connection.Opts.ReaderBufferSize, connection.Counter, connection.Opts.LoggerFactory); _readLoop = Task.Run(ReadLoopAsync); } diff --git a/src/NATS.Client.Core/Internal/SubscriptionManager.cs b/src/NATS.Client.Core/Internal/SubscriptionManager.cs index c61ad8343..ed09a7544 100644 --- a/src/NATS.Client.Core/Internal/SubscriptionManager.cs +++ b/src/NATS.Client.Core/Internal/SubscriptionManager.cs @@ -6,7 +6,7 @@ namespace NATS.Client.Core.Internal; -internal interface ISubscriptionManager +public interface ISubscriptionManager { public ValueTask RemoveAsync(NatsSubBase sub); } @@ -36,11 +36,11 @@ public SubscriptionManager(NatsConnection connection, string inboxPrefix) { _connection = connection; _inboxPrefix = inboxPrefix; - _logger = _connection.Options.LoggerFactory.CreateLogger(); + _logger = _connection.Opts.LoggerFactory.CreateLogger(); _cts = new CancellationTokenSource(); - _cleanupInterval = _connection.Options.SubscriptionCleanUpInterval; + _cleanupInterval = _connection.Opts.SubscriptionCleanUpInterval; _timer = Task.Run(CleanupAsync); - InboxSubBuilder = new InboxSubBuilder(connection.Options.LoggerFactory.CreateLogger()); + InboxSubBuilder = new InboxSubBuilder(connection.Opts.LoggerFactory.CreateLogger()); _inboxSubSentinel = new InboxSub(InboxSubBuilder, nameof(_inboxSubSentinel), default, connection, this); _inboxSub = _inboxSubSentinel; } diff --git a/src/NATS.Client.Core/NatsConnection.Publish.cs b/src/NATS.Client.Core/NatsConnection.Publish.cs index a2453c652..b12e032c0 100644 --- a/src/NATS.Client.Core/NatsConnection.Publish.cs +++ b/src/NATS.Client.Core/NatsConnection.Publish.cs @@ -26,7 +26,7 @@ public ValueTask PublishAsync(in NatsMsg msg, NatsPubOpts? opts = default, Cance /// public ValueTask PublishAsync(string subject, T? data, NatsPubOpts? opts = default, CancellationToken cancellationToken = default) { - var serializer = opts?.Serializer ?? Options.Serializer; + var serializer = opts?.Serializer ?? Opts.Serializer; if (opts?.WaitUntilSent ?? false) { return PubModelAsync(subject, data, serializer, opts?.ReplyTo, opts?.Headers, cancellationToken); diff --git a/src/NATS.Client.Core/NatsConnection.RequestReply.cs b/src/NATS.Client.Core/NatsConnection.RequestReply.cs index c32905e32..b54c8a113 100644 --- a/src/NATS.Client.Core/NatsConnection.RequestReply.cs +++ b/src/NATS.Client.Core/NatsConnection.RequestReply.cs @@ -112,7 +112,7 @@ private NatsSubOpts SetReplyOptsDefaults(in NatsSubOpts? replyOpts) if ((opts.Timeout ?? default) == default) { - opts = opts with { Timeout = Options.RequestTimeout }; + opts = opts with { Timeout = Opts.RequestTimeout }; } return opts; diff --git a/src/NATS.Client.Core/NatsConnection.RequestSub.cs b/src/NATS.Client.Core/NatsConnection.RequestSub.cs index ec7347ba6..d1ac0860e 100644 --- a/src/NATS.Client.Core/NatsConnection.RequestSub.cs +++ b/src/NATS.Client.Core/NatsConnection.RequestSub.cs @@ -28,14 +28,14 @@ internal async ValueTask> RequestSubAsync( { var replyTo = $"{InboxPrefix}{Guid.NewGuid():n}"; - var replySerializer = replyOpts?.Serializer ?? Options.Serializer; + var replySerializer = replyOpts?.Serializer ?? Opts.Serializer; var sub = new NatsSub(this, SubscriptionManager.InboxSubBuilder, replyTo, replyOpts, replySerializer); await SubAsync(replyTo, replyOpts, sub, cancellationToken).ConfigureAwait(false); await PubModelAsync( subject, data, - requestOpts?.Serializer ?? Options.Serializer, + requestOpts?.Serializer ?? Opts.Serializer, replyTo, requestOpts?.Headers, cancellationToken).ConfigureAwait(false); diff --git a/src/NATS.Client.Core/NatsConnection.Subscribe.cs b/src/NATS.Client.Core/NatsConnection.Subscribe.cs index 8f8cd122d..77deb3475 100644 --- a/src/NATS.Client.Core/NatsConnection.Subscribe.cs +++ b/src/NATS.Client.Core/NatsConnection.Subscribe.cs @@ -16,7 +16,7 @@ public async ValueTask SubscribeAsync(string subject, NatsSubOpts? opt /// public async ValueTask> SubscribeAsync(string subject, NatsSubOpts? opts = default, CancellationToken cancellationToken = default) { - var serializer = opts?.Serializer ?? Options.Serializer; + var serializer = opts?.Serializer ?? Opts.Serializer; var sub = new NatsSub(this, SubscriptionManager.GetManagerFor(subject), subject, opts, serializer); await SubAsync(subject, opts, sub, cancellationToken).ConfigureAwait(false); return sub; diff --git a/src/NATS.Client.Core/NatsConnection.Util.cs b/src/NATS.Client.Core/NatsConnection.Util.cs index d1ba90944..a5635a9e1 100644 --- a/src/NATS.Client.Core/NatsConnection.Util.cs +++ b/src/NATS.Client.Core/NatsConnection.Util.cs @@ -70,7 +70,7 @@ internal void PostDirectWrite(DirectWriteCommand command) internal CancellationTimer GetCancellationTimer(CancellationToken cancellationToken, TimeSpan timeout = default) { if (timeout == default) - timeout = Options.CommandTimeout; + timeout = Opts.CommandTimeout; return _cancellationTimerPool.Start(timeout, cancellationToken); } diff --git a/src/NATS.Client.Core/NatsConnection.cs b/src/NATS.Client.Core/NatsConnection.cs index d557565b7..88a9220d7 100644 --- a/src/NATS.Client.Core/NatsConnection.cs +++ b/src/NATS.Client.Core/NatsConnection.cs @@ -52,27 +52,27 @@ public partial class NatsConnection : IAsyncDisposable, INatsConnection private UserCredentials? _userCredentials; public NatsConnection() - : this(NatsOptions.Default) + : this(NatsOpts.Default) { } - public NatsConnection(NatsOptions options) + public NatsConnection(NatsOpts opts) { - Options = options; + Opts = opts; ConnectionState = NatsConnectionState.Closed; _waitForOpenConnection = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); _disposedCancellationTokenSource = new CancellationTokenSource(); - _pool = new ObjectPool(options.ObjectPoolSize); + _pool = new ObjectPool(opts.ObjectPoolSize); _cancellationTimerPool = new CancellationTimerPool(_pool, _disposedCancellationTokenSource.Token); - _name = options.Name; + _name = opts.Name; Counter = new ConnectionStatsCounter(); - _writerState = new WriterState(options); + _writerState = new WriterState(opts); _commandWriter = _writerState.CommandBuffer.Writer; - InboxPrefix = $"{options.InboxPrefix}.{Guid.NewGuid():n}."; + InboxPrefix = $"{opts.InboxPrefix}.{Guid.NewGuid():n}."; SubscriptionManager = new SubscriptionManager(this, InboxPrefix); - _logger = options.LoggerFactory.CreateLogger(); - _clientOptions = ClientOptions.Create(Options); - HeaderParser = new HeaderParser(options.HeaderEncoding); + _logger = opts.LoggerFactory.CreateLogger(); + _clientOptions = ClientOptions.Create(Opts); + HeaderParser = new HeaderParser(opts.HeaderEncoding); } // events @@ -82,7 +82,7 @@ public NatsConnection(NatsOptions options) public event EventHandler? ReconnectFailed; - public NatsOptions Options { get; } + public NatsOpts Opts { get; } public NatsConnectionState ConnectionState { get; private set; } @@ -221,15 +221,15 @@ private async ValueTask InitialConnectAsync() { Debug.Assert(ConnectionState == NatsConnectionState.Connecting, "Connection state"); - var uris = Options.GetSeedUris(); - if (Options.TlsOptions.Disabled && uris.Any(u => u.IsTls)) + var uris = Opts.GetSeedUris(); + if (Opts.TlsOptions.Disabled && uris.Any(u => u.IsTls)) throw new NatsException($"URI {uris.First(u => u.IsTls)} requires TLS but TlsOptions.Disabled is set to true"); - if (Options.TlsOptions.Required) - _tlsCerts = new TlsCerts(Options.TlsOptions); + if (Opts.TlsOptions.Required) + _tlsCerts = new TlsCerts(Opts.TlsOptions); - if (!Options.AuthOptions.IsAnonymous) + if (!Opts.AuthOptions.IsAnonymous) { - _userCredentials = new UserCredentials(Options.AuthOptions); + _userCredentials = new UserCredentials(Opts.AuthOptions); } foreach (var uri in uris) @@ -247,13 +247,13 @@ private async ValueTask InitialConnectAsync() if (uri.IsWebSocket) { var conn = new WebSocketConnection(); - await conn.ConnectAsync(uri.Uri, Options.ConnectTimeout).ConfigureAwait(false); + await conn.ConnectAsync(uri.Uri, Opts.ConnectTimeout).ConfigureAwait(false); _socket = conn; } else { var conn = new TcpConnection(); - await conn.ConnectAsync(target.Host, target.Port, Options.ConnectTimeout).ConfigureAwait(false); + await conn.ConnectAsync(target.Host, target.Port, Opts.ConnectTimeout).ConfigureAwait(false); _socket = conn; } @@ -331,19 +331,19 @@ private async ValueTask SetupReaderWriterAsync(bool reconnect) // check to see if we should upgrade to TLS if (_socket is TcpConnection tcpConnection) { - if (Options.TlsOptions.Disabled && WritableServerInfo!.TlsRequired) + if (Opts.TlsOptions.Disabled && WritableServerInfo!.TlsRequired) { throw new NatsException( $"Server {_currentConnectUri} requires TLS but TlsOptions.Disabled is set to true"); } - if (Options.TlsOptions.Required && !WritableServerInfo!.TlsRequired && !WritableServerInfo.TlsAvailable) + if (Opts.TlsOptions.Required && !WritableServerInfo!.TlsRequired && !WritableServerInfo.TlsAvailable) { throw new NatsException( $"Server {_currentConnectUri} does not support TLS but TlsOptions.Disabled is set to true"); } - if (Options.TlsOptions.Required || WritableServerInfo!.TlsRequired || WritableServerInfo.TlsAvailable) + if (Opts.TlsOptions.Required || WritableServerInfo!.TlsRequired || WritableServerInfo.TlsAvailable) { // do TLS upgrade // if the current URI is not a seed URI and is not a DNS hostname, check the server cert against the @@ -364,7 +364,7 @@ private async ValueTask SetupReaderWriterAsync(bool reconnect) _socketReader = null; // upgrade TcpConnection to SslConnection - var sslConnection = tcpConnection.UpgradeToSslStreamConnection(Options.TlsOptions, _tlsCerts); + var sslConnection = tcpConnection.UpgradeToSslStreamConnection(Opts.TlsOptions, _tlsCerts); await sslConnection.AuthenticateAsClientAsync(targetHost).ConfigureAwait(false); _socket = sslConnection; @@ -433,12 +433,12 @@ private async void ReconnectLoop() await DisposeSocketAsync(true).ConfigureAwait(false); var defaultScheme = _currentConnectUri!.Uri.Scheme; - var urls = (Options.NoRandomize + var urls = (Opts.NoRandomize ? WritableServerInfo?.ClientConnectUrls?.Select(x => new NatsUri(x, false, defaultScheme)).Distinct().ToArray() : WritableServerInfo?.ClientConnectUrls?.Select(x => new NatsUri(x, false, defaultScheme)).OrderBy(_ => Guid.NewGuid()).Distinct().ToArray()) ?? Array.Empty(); if (urls.Length == 0) - urls = Options.GetSeedUris(); + urls = Opts.GetSeedUris(); // add last. urls = urls.Where(x => x != _currentConnectUri).Append(_currentConnectUri).ToArray(); @@ -463,13 +463,13 @@ private async void ReconnectLoop() if (url.IsWebSocket) { var conn = new WebSocketConnection(); - await conn.ConnectAsync(url.Uri, Options.ConnectTimeout).ConfigureAwait(false); + await conn.ConnectAsync(url.Uri, Opts.ConnectTimeout).ConfigureAwait(false); _socket = conn; } else { var conn = new TcpConnection(); - await conn.ConnectAsync(target.Host, target.Port, Options.ConnectTimeout).ConfigureAwait(false); + await conn.ConnectAsync(target.Host, target.Port, Opts.ConnectTimeout).ConfigureAwait(false); _socket = conn; } @@ -517,8 +517,8 @@ private async void ReconnectLoop() private async Task WaitWithJitterAsync() { - var jitter = Random.Shared.NextDouble() * Options.ReconnectJitter.TotalMilliseconds; - var waitTime = Options.ReconnectWait + TimeSpan.FromMilliseconds(jitter); + var jitter = Random.Shared.NextDouble() * Opts.ReconnectJitter.TotalMilliseconds; + var waitTime = Opts.ReconnectWait + TimeSpan.FromMilliseconds(jitter); if (waitTime != TimeSpan.Zero) { _logger.LogTrace("Wait {0}ms to reconnect.", waitTime.TotalMilliseconds); @@ -528,16 +528,16 @@ private async Task WaitWithJitterAsync() private async void StartPingTimer(CancellationToken cancellationToken) { - if (Options.PingInterval == TimeSpan.Zero) + if (Opts.PingInterval == TimeSpan.Zero) return; - var periodicTimer = new PeriodicTimer(Options.PingInterval); + var periodicTimer = new PeriodicTimer(Opts.PingInterval); ResetPongCount(); try { while (!cancellationToken.IsCancellationRequested) { - if (Interlocked.Increment(ref _pongCount) > Options.MaxPingOut) + if (Interlocked.Increment(ref _pongCount) > Opts.MaxPingOut) { _logger.LogInformation("Detect MaxPingOut, try to connection abort."); if (_socket != null) @@ -559,7 +559,7 @@ private async void StartPingTimer(CancellationToken cancellationToken) [System.Runtime.CompilerServices.MethodImpl(System.Runtime.CompilerServices.MethodImplOptions.AggressiveInlining)] private CancellationTimer GetRequestCommandTimer(CancellationToken cancellationToken) { - return _cancellationTimerPool.Start(Options.RequestTimeout, cancellationToken); + return _cancellationTimerPool.Start(Opts.RequestTimeout, cancellationToken); } [System.Runtime.CompilerServices.MethodImpl(System.Runtime.CompilerServices.MethodImplOptions.AggressiveInlining)] @@ -787,12 +787,12 @@ private async ValueTask WithConnectAsync(T // This writer state is reused when reconnecting. internal sealed class WriterState { - public WriterState(NatsOptions options) + public WriterState(NatsOpts opts) { - Options = options; + Opts = opts; BufferWriter = new FixedArrayBufferWriter(); - if (options.WriterCommandBufferLimit == null) + if (opts.WriterCommandBufferLimit == null) { CommandBuffer = Channel.CreateUnbounded(new UnboundedChannelOptions { @@ -803,7 +803,7 @@ public WriterState(NatsOptions options) } else { - CommandBuffer = Channel.CreateBounded(new BoundedChannelOptions(options.WriterCommandBufferLimit.Value) + CommandBuffer = Channel.CreateBounded(new BoundedChannelOptions(opts.WriterCommandBufferLimit.Value) { FullMode = BoundedChannelFullMode.Wait, AllowSynchronousContinuations = false, // always should be in async loop. @@ -820,7 +820,7 @@ public WriterState(NatsOptions options) public Channel CommandBuffer { get; } - public NatsOptions Options { get; } + public NatsOpts Opts { get; } public List PriorityCommands { get; } diff --git a/src/NATS.Client.Core/NatsConnectionPool.cs b/src/NATS.Client.Core/NatsConnectionPool.cs index ecf6613b7..b77e10336 100644 --- a/src/NATS.Client.Core/NatsConnectionPool.cs +++ b/src/NATS.Client.Core/NatsConnectionPool.cs @@ -6,33 +6,33 @@ public sealed class NatsConnectionPool : INatsConnectionPool private int _index = -1; public NatsConnectionPool() - : this(Environment.ProcessorCount / 2, NatsOptions.Default, _ => { }) + : this(Environment.ProcessorCount / 2, NatsOpts.Default, _ => { }) { } public NatsConnectionPool(int poolSize) - : this(poolSize, NatsOptions.Default, _ => { }) + : this(poolSize, NatsOpts.Default, _ => { }) { } - public NatsConnectionPool(NatsOptions options) - : this(Environment.ProcessorCount / 2, options, _ => { }) + public NatsConnectionPool(NatsOpts opts) + : this(Environment.ProcessorCount / 2, opts, _ => { }) { } - public NatsConnectionPool(int poolSize, NatsOptions options) - : this(poolSize, options, _ => { }) + public NatsConnectionPool(int poolSize, NatsOpts opts) + : this(poolSize, opts, _ => { }) { } - public NatsConnectionPool(int poolSize, NatsOptions options, Action configureConnection) + public NatsConnectionPool(int poolSize, NatsOpts opts, Action configureConnection) { poolSize = Math.Max(1, poolSize); _connections = new NatsConnection[poolSize]; for (var i = 0; i < _connections.Length; i++) { - var name = (options.Name == null) ? $"#{i}" : $"{options.Name}#{i}"; - var conn = new NatsConnection(options with { Name = name }); + var name = (opts.Name == null) ? $"#{i}" : $"{opts.Name}#{i}"; + var conn = new NatsConnection(opts with { Name = name }); configureConnection(conn); _connections[i] = conn; } diff --git a/src/NATS.Client.Core/NatsMsg.cs b/src/NATS.Client.Core/NatsMsg.cs index 9a830c0d3..f90f609ba 100644 --- a/src/NATS.Client.Core/NatsMsg.cs +++ b/src/NATS.Client.Core/NatsMsg.cs @@ -33,8 +33,8 @@ internal static NatsMsg Build( } var size = subject.Length - + replyTo?.Length ?? 0 - + headersBuffer?.Length ?? 0 + + (replyTo?.Length ?? 0) + + (headersBuffer?.Length ?? 0) + payloadBuffer.Length; return new NatsMsg(subject, replyTo, (int)size, headers, payloadBuffer.ToArray(), connection); @@ -105,8 +105,8 @@ internal static NatsMsg Build( } var size = subject.Length - + replyTo?.Length ?? 0 - + headersBuffer?.Length ?? 0 + + (replyTo?.Length ?? 0) + + (headersBuffer?.Length ?? 0) + payloadBuffer.Length; return new NatsMsg(subject, replyTo, (int)size, headers, data, connection); diff --git a/src/NATS.Client.Core/NatsOptions.cs b/src/NATS.Client.Core/NatsOpts.cs similarity index 92% rename from src/NATS.Client.Core/NatsOptions.cs rename to src/NATS.Client.Core/NatsOpts.cs index b47f9c112..d461a8ae3 100644 --- a/src/NATS.Client.Core/NatsOptions.cs +++ b/src/NATS.Client.Core/NatsOpts.cs @@ -33,7 +33,8 @@ namespace NATS.Client.Core; /// /// /// -public sealed record NatsOptions +/// +public sealed record NatsOpts ( string Url, string Name, @@ -59,9 +60,10 @@ public sealed record NatsOptions TimeSpan CommandTimeout, TimeSpan SubscriptionCleanUpInterval, int? WriterCommandBufferLimit, - Encoding HeaderEncoding) + Encoding HeaderEncoding, + bool WaitUntilSent) { - public static readonly NatsOptions Default = new( + public static readonly NatsOpts Default = new( Url: "nats://localhost:4222", Name: "NATS .Net Client", Echo: true, @@ -86,7 +88,8 @@ public sealed record NatsOptions CommandTimeout: TimeSpan.FromMinutes(1), SubscriptionCleanUpInterval: TimeSpan.FromMinutes(5), WriterCommandBufferLimit: 1_000, - HeaderEncoding: Encoding.ASCII); + HeaderEncoding: Encoding.ASCII, + WaitUntilSent: false); internal NatsUri[] GetSeedUris() { diff --git a/src/NATS.Client.Core/NatsSubBase.cs b/src/NATS.Client.Core/NatsSubBase.cs index f8916bd9c..b3459775c 100644 --- a/src/NATS.Client.Core/NatsSubBase.cs +++ b/src/NATS.Client.Core/NatsSubBase.cs @@ -1,14 +1,17 @@ using System.Buffers; using System.Runtime.ExceptionServices; +using System.Threading.Channels; +using Microsoft.Extensions.Logging; using NATS.Client.Core.Commands; using NATS.Client.Core.Internal; namespace NATS.Client.Core; -internal enum NatsSubEndReason +public enum NatsSubEndReason { None, MaxMsgs, + MaxBytes, Timeout, IdleTimeout, StartUpTimeout, @@ -17,6 +20,8 @@ internal enum NatsSubEndReason public abstract class NatsSubBase { + private readonly ILogger _logger; + private readonly bool _debug; private readonly ISubscriptionManager _manager; private readonly Timer? _timeoutTimer; private readonly Timer? _idleTimeoutTimer; @@ -38,6 +43,8 @@ internal NatsSubBase( string subject, NatsSubOpts? opts) { + _logger = connection.Opts.LoggerFactory.CreateLogger(); + _debug = _logger.IsEnabled(LogLevel.Debug); _manager = manager; _pendingMsgs = opts is { MaxMsgs: > 0 } ? opts.Value.MaxMsgs ?? -1 : -1; _countPendingMsgs = _pendingMsgs > 0; @@ -93,7 +100,7 @@ internal NatsSubBase( // since INatsSub is marked as internal. public int? PendingMsgs => _pendingMsgs == -1 ? null : Volatile.Read(ref _pendingMsgs); - internal NatsSubEndReason EndReason => (NatsSubEndReason)Volatile.Read(ref _endReasonRaw); + public NatsSubEndReason EndReason => (NatsSubEndReason)Volatile.Read(ref _endReasonRaw); protected NatsConnection Connection { get; } @@ -171,6 +178,13 @@ public virtual async ValueTask ReceiveAsync(string subject, string? replyTo, Rea // Need to await to handle any exceptions await ReceiveInternalAsync(subject, replyTo, headersBuffer, payloadBuffer).ConfigureAwait(false); } + catch (ChannelClosedException) + { + // When user disposes or unsubscribes there maybe be messages still coming in + // (e.g. JetStream consumer batch might not be finished) even though we're not + // interested in the messages anymore. Hence we ignore any messages being + // fed into the channel and rejected. + } catch (Exception e) { var payload = new Memory(new byte[payloadBuffer.Length]); @@ -248,8 +262,11 @@ protected void DecrementMaxMsgs() /// protected abstract void TryComplete(); - private void EndSubscription(NatsSubEndReason reason) + protected void EndSubscription(NatsSubEndReason reason) { + if (_debug) + _logger.LogDebug("End subscription {Reason}", reason); + lock (this) { if (_endSubscription) diff --git a/src/NATS.Client.Hosting/NatsHostingExtensions.cs b/src/NATS.Client.Hosting/NatsHostingExtensions.cs index de19ae256..2d0205b39 100644 --- a/src/NATS.Client.Hosting/NatsHostingExtensions.cs +++ b/src/NATS.Client.Hosting/NatsHostingExtensions.cs @@ -11,7 +11,7 @@ public static class NatsHostingExtensions /// Add NatsConnection/Pool to ServiceCollection. When poolSize = 1, registered `NatsConnection` and `INatsCommand` as singleton. /// Others, registered `NatsConnectionPool` as singleton, `NatsConnection` and `INatsCommand` as transient(get from pool). /// - public static IServiceCollection AddNats(this IServiceCollection services, int poolSize = 1, Func? configureOptions = null, Action? configureConnection = null) + public static IServiceCollection AddNats(this IServiceCollection services, int poolSize = 1, Func? configureOptions = null, Action? configureConnection = null) { poolSize = Math.Max(poolSize, 1); @@ -19,7 +19,7 @@ public static IServiceCollection AddNats(this IServiceCollection services, int p { services.TryAddSingleton(provider => { - var options = NatsOptions.Default with { LoggerFactory = provider.GetRequiredService() }; + var options = NatsOpts.Default with { LoggerFactory = provider.GetRequiredService() }; if (configureOptions != null) { options = configureOptions(options); @@ -48,7 +48,7 @@ public static IServiceCollection AddNats(this IServiceCollection services, int p { services.TryAddSingleton(provider => { - var options = NatsOptions.Default with { LoggerFactory = provider.GetRequiredService() }; + var options = NatsOpts.Default with { LoggerFactory = provider.GetRequiredService() }; if (configureOptions != null) { options = configureOptions(options); diff --git a/src/NATS.Client.JetStream/INatsJSSubConsume.cs b/src/NATS.Client.JetStream/INatsJSSubConsume.cs index 2a2420bb2..df3496c1f 100644 --- a/src/NATS.Client.JetStream/INatsJSSubConsume.cs +++ b/src/NATS.Client.JetStream/INatsJSSubConsume.cs @@ -6,3 +6,8 @@ public interface INatsJSSubConsume : IAsyncDisposable { ChannelReader> Msgs { get; } } + +public interface INatsJSSubFetch : IAsyncDisposable +{ + ChannelReader> Msgs { get; } +} diff --git a/src/NATS.Client.JetStream/Internal/NatsJSConstants.cs b/src/NATS.Client.JetStream/Internal/NatsJSConstants.cs index 5395e8d3e..1e0a7cffa 100644 --- a/src/NATS.Client.JetStream/Internal/NatsJSConstants.cs +++ b/src/NATS.Client.JetStream/Internal/NatsJSConstants.cs @@ -6,4 +6,7 @@ namespace NATS.Client.JetStream.Internal; internal static class NatsJSConstants { public static readonly ReadOnlySequence Ack = new(Encoding.ASCII.GetBytes("+ACK")); + public static readonly ReadOnlySequence Nack = new(Encoding.ASCII.GetBytes("-NAK")); + public static readonly ReadOnlySequence AckProgress = new(Encoding.ASCII.GetBytes("+WPI")); + public static readonly ReadOnlySequence AckTerminate = new(Encoding.ASCII.GetBytes("+TERM")); } diff --git a/src/NATS.Client.JetStream/Internal/JSErrorAwareJsonSerializer.cs b/src/NATS.Client.JetStream/Internal/NatsJSErrorAwareJsonSerializer.cs similarity index 77% rename from src/NATS.Client.JetStream/Internal/JSErrorAwareJsonSerializer.cs rename to src/NATS.Client.JetStream/Internal/NatsJSErrorAwareJsonSerializer.cs index b574a1cc2..1c1fbb463 100644 --- a/src/NATS.Client.JetStream/Internal/JSErrorAwareJsonSerializer.cs +++ b/src/NATS.Client.JetStream/Internal/NatsJSErrorAwareJsonSerializer.cs @@ -5,9 +5,9 @@ namespace NATS.Client.JetStream.Internal; -internal sealed class JSErrorAwareJsonSerializer : INatsSerializer +internal sealed class NatsJSErrorAwareJsonSerializer : INatsSerializer { - public static readonly JSErrorAwareJsonSerializer Default = new(); + public static readonly NatsJSErrorAwareJsonSerializer Default = new(); public int Serialize(ICountableBufferWriter bufferWriter, T? value) => throw new NotSupportedException(); @@ -22,7 +22,7 @@ public int Serialize(ICountableBufferWriter bufferWriter, T? value) => if (jsonDocument.RootElement.TryGetProperty("error", out var errorElement)) { var error = errorElement.Deserialize() ?? throw new NatsJSException("Can't parse JetStream error JSON payload"); - throw new JSApiErrorException(error); + throw new NatsJSApiErrorException(error); } return jsonDocument.Deserialize(); @@ -32,9 +32,9 @@ public int Serialize(ICountableBufferWriter bufferWriter, T? value) => throw new NotSupportedException(); } -internal class JSApiErrorException : Exception +internal class NatsJSApiErrorException : Exception { - public JSApiErrorException(ApiError error) => Error = error; + public NatsJSApiErrorException(ApiError error) => Error = error; public ApiError Error { get; } } diff --git a/src/NATS.Client.JetStream/Internal/NatsJSExtensionsInternal.cs b/src/NATS.Client.JetStream/Internal/NatsJSExtensionsInternal.cs index 93e82173e..9554afa6c 100644 --- a/src/NATS.Client.JetStream/Internal/NatsJSExtensionsInternal.cs +++ b/src/NATS.Client.JetStream/Internal/NatsJSExtensionsInternal.cs @@ -2,5 +2,5 @@ public static class NatsJSExtensionsInternal { - internal static long ToNanos(this TimeSpan timeSpan) => (long)(timeSpan.TotalMilliseconds * 1_000_000); + public static long ToNanos(this TimeSpan timeSpan) => (long)(timeSpan.TotalMilliseconds * 1_000_000); } diff --git a/src/NATS.Client.JetStream/Internal/NatsJSOpsDefaults.cs b/src/NATS.Client.JetStream/Internal/NatsJSOpsDefaults.cs index 7df2b9dfe..f0f4cd69f 100644 --- a/src/NATS.Client.JetStream/Internal/NatsJSOpsDefaults.cs +++ b/src/NATS.Client.JetStream/Internal/NatsJSOpsDefaults.cs @@ -1,3 +1,5 @@ +using NATS.Client.Core; + namespace NATS.Client.JetStream.Internal; internal static class NatsJSOpsDefaults @@ -14,7 +16,7 @@ internal static (long MaxMsgs, long MaxBytes, long ThresholdMsgs, long Threshold long? thresholdMsgs = default, long? thresholdBytes = default) { - var jsOpts = opts ?? new NatsJSOpts(); + var jsOpts = opts ?? new NatsJSOpts(NatsOpts.Default); long maxMsgsOut; long maxBytesOut; diff --git a/src/NATS.Client.JetStream/Internal/NatsJSSubBase.cs b/src/NATS.Client.JetStream/Internal/NatsJSSubBase.cs deleted file mode 100644 index af49dd236..000000000 --- a/src/NATS.Client.JetStream/Internal/NatsJSSubBase.cs +++ /dev/null @@ -1,339 +0,0 @@ -using System.Buffers; -using System.Runtime.ExceptionServices; -using Microsoft.Extensions.Logging; -using NATS.Client.Core; -using NATS.Client.Core.Commands; -using NATS.Client.Core.Internal; -using NATS.Client.JetStream.Models; - -namespace NATS.Client.JetStream.Internal; - -internal abstract class NatsJSSubBase : NatsSubBase -{ - private readonly bool _trace; - private readonly string _stream; - private readonly string _consumer; - private readonly NatsJSContext _context; - private readonly NatsJSSubState _state; - private readonly INatsSerializer _serializer; - private readonly CancellationToken _cancellationToken; - private readonly Timer _heartbeatTimer; - private readonly TimeSpan _hearthBeatTimeout; - - internal NatsJSSubBase( - string stream, - string consumer, - NatsJSContext context, - ISubscriptionManager manager, - string subject, - NatsSubOpts? opts, - NatsJSSubState state, - INatsSerializer serializer, - CancellationToken cancellationToken = default) - : base(context.Nats, manager, subject, opts) - { - _stream = stream; - _consumer = consumer; - _context = context; - _state = state; - _serializer = serializer; - _cancellationToken = cancellationToken; - Logger = Connection.Options.LoggerFactory.CreateLogger>(); - _trace = Logger.IsEnabled(LogLevel.Trace); - - _hearthBeatTimeout = state.HearthBeatTimeout; - - // TODO: Heartbeat timeouts are signaled through the subscription internal channel - // so that state transitions can be done in the same loop as other messages - // to ensure state consistency. - _heartbeatTimer = new Timer( - callback: _ => HeartbeatTimerCallback(), - state: default, - dueTime: Timeout.Infinite, - period: Timeout.Infinite); - } - - protected ILogger Logger { get; } - - public override async ValueTask ReadyAsync() - { - await base.ReadyAsync().ConfigureAwait(false); - await CallMsgNextAsync(_state.GetRequest(init: true)).ConfigureAwait(false); - ResetHeartbeatTimer(); - } - - internal override IEnumerable GetReconnectCommands(int sid) - { - foreach (var command in base.GetReconnectCommands(sid)) - yield return command; - - yield return PublishCommand.Create( - pool: Connection.ObjectPool, - subject: $"{_context.Opts.ApiPrefix}.CONSUMER.MSG.NEXT.{_stream}.{_consumer}", - replyTo: Subject, - headers: default, - value: _state.GetRequest(init: true), - serializer: JsonNatsSerializer.Default, - cancellationToken: default); - } - - protected override async ValueTask ReceiveInternalAsync( - string subject, - string? replyTo, - ReadOnlySequence? headersBuffer, - ReadOnlySequence payloadBuffer) - { - ResetHeartbeatTimer(); - - // State is handled in a single-threaded fashion to make sure all decisions - // are made in order e.g. control messages may change pending counts which are - // also effected by user messages. - if (subject == Subject) - { - var msg = NatsMsg.Build( - subject, - replyTo, - headersBuffer, - payloadBuffer, - Connection, - Connection.HeaderParser); - - NatsJSNotification? notification = null; - - if (msg.Headers is { } headers) - { - if (_trace) - { - Logger.LogTrace("Control message received {Code} {Message}", headers.Code, headers.Message); - } - - // Read the values of Nats-Pending-Messages and Nats-Pending-Bytes headers. - // Subtract the values from pending messages count and pending bytes count respectively. if (msg.JSMsg.Msg.Headers is { Code: 100, Message: NatsHeaders.Messages.IdleHeartbeat } headers) - if (headers.TryGetValue("Nats-Pending-Messages", out var pendingMsgsStr) - && long.TryParse(pendingMsgsStr.ToString(), out var pendingMsgs)) - { - _state.ReceivedPendingMsgs(pendingMsgs); - _state.MarkForPull(); - } - - if (headers.TryGetValue("Nats-Pending-Bytes", out var pendingBytesStr) - && long.TryParse(pendingBytesStr.ToString(), out var pendingBytes)) - { - _state.ReceivedPendingBytes(pendingBytes); - _state.MarkForPull(); - } - - // React on other headers - if (headers is { Code: 100, Message: NatsHeaders.Messages.IdleHeartbeat }) - { - // Do nothing. Timer is reset for every message already. - } - - // Stop - else if (headers is { Code: 409, Message: NatsHeaders.Messages.ConsumerDeleted } - or { Code: 409, Message: NatsHeaders.Messages.ConsumerIsPushBased }) - { - SetException(new NatsJSConsumerPullTerminated(headers.Code, headers.MessageText)); - } - - // Errors - else if (headers is { Code: 400, Message: NatsHeaders.Messages.BadRequest }) - { - notification = new NatsJSNotification(headers.Code, headers.MessageText); - } - - // Warnings - else if (headers is { Code: 409 } - && (headers.MessageText.StartsWith("Exceeded MaxRequestBatch of") - || headers.MessageText.StartsWith("Exceeded MaxRequestExpires of ") - || headers.MessageText.StartsWith("Exceeded MaxRequestMaxBytes of ") - || headers.MessageText == "Exceeded MaxWaiting")) - { - notification = new NatsJSNotification(headers.Code, headers.MessageText); - } - - // Not Telegraphed - else if (headers is { Code: 404, Message: NatsHeaders.Messages.NoMessages } - or { Code: 408, Message: NatsHeaders.Messages.RequestTimeout } - or { Code: 409, Message: NatsHeaders.Messages.MessageSizeExceedsMaxBytes }) - { - _state.MarkForPull(); - } - else - { - notification = new NatsJSNotification(headers.Code, headers.MessageText); - Logger.LogError("Unhandled control message {Code} {Text}", headers.Code, headers.MessageText); - } - } - else - { - notification = new NatsJSNotification(999, "Unknown"); - } - - if (notification != null) - await ReceivedControlMsg(notification); - - if (_state.CanFetch()) - { - await CallMsgNextAsync(_state.GetRequest()); - } - } - else - { - try - { - var msg = NatsMsg.Build( - subject, - replyTo, - headersBuffer, - payloadBuffer, - Connection, - Connection.HeaderParser, - _serializer); - - _state.MsgReceived(msg.Size); - - if (_state.CanFetch()) - { - await CallMsgNextAsync(_state.GetRequest()); - } - - await ReceivedUserMsg(msg).ConfigureAwait(false); - - DecrementMaxMsgs(); - } - catch (Exception e) - { - var payload = new Memory(new byte[payloadBuffer.Length]); - payloadBuffer.CopyTo(payload.Span); - - Memory headers = default; - if (headersBuffer != null) - { - headers = new Memory(new byte[headersBuffer.Value.Length]); - } - - SetException(new NatsSubException($"Message error: {e.Message}", ExceptionDispatchInfo.Capture(e), payload, headers)); - } - } - } - - protected abstract void HeartbeatTimerCallback(); - - protected abstract ValueTask ReceivedControlMsg(NatsJSNotification notification); - - protected abstract ValueTask ReceivedUserMsg(NatsMsg msg); - - private void ResetHeartbeatTimer() => _heartbeatTimer.Change(_hearthBeatTimeout, Timeout.InfiniteTimeSpan); - - private ValueTask CallMsgNextAsync(ConsumerGetnextRequest request) - { - _state.IncrementTotalRequests(); - return Connection.PubModelAsync( - subject: $"{_context.Opts.ApiPrefix}.CONSUMER.MSG.NEXT.{_stream}.{_consumer}", - data: request, - serializer: JsonNatsSerializer.Default, - replyTo: Subject, - headers: default, - _cancellationToken); - } -} - -internal class NatsJSSubState -{ - private const int LargeMsgsBatchSize = 1_000_000; - - private readonly long _optsMaxBytes; - private readonly long _optsMaxMsgs; - private readonly long _optsThresholdMsgs; - private readonly long _optsThresholdBytes; - private readonly long _optsIdleHeartbeatNanos; - private readonly long _optsExpiresNanos; - - private long _pendingMsgs; - private long _pendingBytes; - private long _totalRequests; - private bool _pull; - - public NatsJSSubState( - NatsJSOpts? opts = default, - long? optsMaxBytes = default, - long? optsMaxMsgs = default, - long? optsThresholdMsgs = default, - long? optsThresholdBytes = default, - TimeSpan? optsIdleHeartbeat = default, - TimeSpan? optsExpires = default) - { - var m = NatsJSOpsDefaults.SetMax(opts, optsMaxMsgs, optsMaxBytes, optsThresholdMsgs, optsThresholdBytes); - var t = NatsJSOpsDefaults.SetTimeouts(optsExpires, optsIdleHeartbeat); - - _optsMaxBytes = m.MaxBytes; - _optsMaxMsgs = m.MaxMsgs; - _optsThresholdMsgs = m.ThresholdMsgs; - _optsThresholdBytes = m.ThresholdBytes; - _optsIdleHeartbeatNanos = t.IdleHeartbeat.ToNanos(); - _optsExpiresNanos = t.Expires.ToNanos(); - HearthBeatTimeout = t.IdleHeartbeat * 2; - } - - public TimeSpan HearthBeatTimeout { get; } - - public void ReceivedPendingMsgs(long pendingMsgs) => _pendingMsgs -= pendingMsgs; - - public void ReceivedPendingBytes(long pendingBytes) => _pendingBytes -= pendingBytes; - - public void MarkForPull() => _pull = true; - - public void IncrementTotalRequests() => Interlocked.Increment(ref _totalRequests); - - public ConsumerGetnextRequest GetRequest(bool init = false) - { - if (init) - { - _pendingBytes = 0; - _pendingMsgs = 0; - _totalRequests = 0; - } - - _pull = false; - _totalRequests++; - - long batch; - long maxBytes; - if (init) - { - batch = _optsMaxBytes > 0 ? LargeMsgsBatchSize : _optsMaxMsgs; - maxBytes = _optsMaxBytes > 0 ? _optsMaxBytes : 0; - } - else - { - batch = _optsMaxBytes > 0 ? LargeMsgsBatchSize : _optsMaxMsgs - _pendingMsgs; - maxBytes = _optsMaxBytes > 0 ? _optsMaxBytes - _pendingBytes : 0; - } - - var request = new ConsumerGetnextRequest - { - Batch = batch, - MaxBytes = maxBytes, - IdleHeartbeat = _optsIdleHeartbeatNanos, - Expires = _optsExpiresNanos, - NoWait = false, - }; - - _pendingMsgs += request.Batch; - _pendingBytes += request.MaxBytes; - - return request; - } - - public void MsgReceived(int size) - { - _pendingMsgs--; - _pendingBytes -= size; - } - - public bool CanFetch() => - _pull - || _optsThresholdMsgs >= _pendingMsgs - || (_optsThresholdBytes > 0 && _optsThresholdBytes >= _pendingBytes); -} diff --git a/src/NATS.Client.JetStream/Internal/NatsJSSubConsume.cs b/src/NATS.Client.JetStream/Internal/NatsJSSubConsume.cs deleted file mode 100644 index c93723b7a..000000000 --- a/src/NATS.Client.JetStream/Internal/NatsJSSubConsume.cs +++ /dev/null @@ -1,132 +0,0 @@ -using System.Threading.Channels; -using Microsoft.Extensions.Logging; -using NATS.Client.Core; -using NATS.Client.Core.Internal; - -namespace NATS.Client.JetStream.Internal; - -/* - * Channel Connections - * ------------------- - * - * - Sub CH: - * NatsJSSub message channel where all the inbox messages are - * delivered to. - * - * - User Messages CH: - * These are all the user messages (i.e. subject != inbox) - * - * - User Notifications CH: - * Anything we want to let user know about the state of the - * consumer, connection status, timeouts etc. - * - * The main idea is to deliver user and control messages from the server - * inbox subscription and internal control messages (e.g. heartbeat - * timeouts) to a single 'controller' where all messages would be - * processed in order and state managed in one place in a non-concurrent - * manner so that races are avoided and it's easier to reason with state - * changes. - * - * User Notifications also have their own channel so they can be - * prioritized and can run in their own Task where User error handler - * will be dispatched. - * - * - * NATS-SERVER - * | User - * | +--> [User Messages CH] -------> message loop - * v / (await foreach) - * [Sub CH] ---> Controller (with state) - * ^ \ User error - * | +--> [User Notifications CH] ---> handler - * | (Action<>) - * | Internal control msgs - * | (e.g. heartbeat timeout) - * | - * Heartbeat Timer - * - */ -internal class NatsJSSubConsume : NatsJSSubBase, INatsJSSubConsume -{ - private readonly Action? _errorHandler; - private readonly CancellationToken _cancellationToken; - private readonly Task _notifier; - private readonly Channel _notificationChannel; - private readonly Channel> _userMessageChannel; - - internal NatsJSSubConsume( - string stream, - string consumer, - NatsJSContext context, - ISubscriptionManager manager, - string subject, - NatsSubOpts? opts, - NatsJSSubState state, - INatsSerializer serializer, - Action? errorHandler = default, - CancellationToken cancellationToken = default) - : base(stream, consumer, context, manager, subject, opts, state, serializer, cancellationToken) - { - _errorHandler = errorHandler; - _cancellationToken = cancellationToken; - - // User messages are buffered here separately to allow smoother flow while control loop - // pulls more data in the background. This also allows control messages to be dealt with - // in the same loop as the control messages to keep state updates consistent. This is as - // opposed to having a control and a message channel at the point of serializing the messages - // in NatsJSSub class. - _userMessageChannel = Channel.CreateBounded>(NatsSub.GetChannelOptions(opts?.ChannelOptions)); - - // We drop the old message if notification handler isn't able to keep up. - // This is to avoid blocking the control loop and making sure we deliver all the messages. - // Assuming newer messages would be more relevant and worth keeping than older ones. - _notificationChannel = Channel.CreateBounded(new BoundedChannelOptions(1_000) - { - FullMode = BoundedChannelFullMode.DropOldest, - AllowSynchronousContinuations = false, - }); - _notifier = Task.Run(NotificationLoop); - } - - public ChannelReader> Msgs => _userMessageChannel.Reader; - - public override async ValueTask DisposeAsync() - { - await base.DisposeAsync(); - await _notifier; - } - - protected override void HeartbeatTimerCallback() => - _notificationChannel.Writer.WriteAsync(new NatsJSNotification(-1, "Heartbeat timeout"), _cancellationToken); - - protected override ValueTask ReceivedControlMsg(NatsJSNotification notification) - { - return _notificationChannel.Writer.WriteAsync(notification, _cancellationToken); - } - - protected override ValueTask ReceivedUserMsg(NatsMsg msg) - { - return _userMessageChannel.Writer.WriteAsync(new NatsJSMsg(msg), _cancellationToken); - } - - protected override void TryComplete() - { - _userMessageChannel.Writer.Complete(); - _notificationChannel.Writer.Complete(); - } - - private async Task NotificationLoop() - { - await foreach (var notification in _notificationChannel.Reader.ReadAllAsync(_cancellationToken)) - { - try - { - _errorHandler?.Invoke(notification); - } - catch (Exception e) - { - Logger.LogError(e, "User notification callback error"); - } - } - } -} diff --git a/src/NATS.Client.JetStream/NatsJSConsumer.cs b/src/NATS.Client.JetStream/NatsJSConsumer.cs index 1392c010f..b030cc3c3 100644 --- a/src/NATS.Client.JetStream/NatsJSConsumer.cs +++ b/src/NATS.Client.JetStream/NatsJSConsumer.cs @@ -1,4 +1,5 @@ -using Microsoft.Extensions.Logging; +using System.Runtime.CompilerServices; +using System.Threading.Channels; using NATS.Client.Core; using NATS.Client.JetStream.Internal; using NATS.Client.JetStream.Models; @@ -28,60 +29,163 @@ public async ValueTask DeleteAsync(CancellationToken cancellationToken = d return _deleted = await _context.DeleteConsumerAsync(_stream, _consumer, cancellationToken); } - public async ValueTask> ConsumeAsync( + public async IAsyncEnumerable> ConsumeAllAsync( NatsJSConsumeOpts opts, - NatsSubOpts requestOpts = default, - CancellationToken cancellationToken = default) + [EnumeratorCancellation] CancellationToken cancellationToken = default) + { + await using var cc = await ConsumeAsync(opts, cancellationToken); + await foreach (var jsMsg in cc.Msgs.ReadAllAsync(cancellationToken)) + { + yield return jsMsg; + } + } + + public async ValueTask> ConsumeAsync(NatsJSConsumeOpts opts, CancellationToken cancellationToken = default) { ThrowIfDeleted(); - var inbox = $"{_context.Opts.InboxPrefix}.{Guid.NewGuid():n}"; + var inbox = _context.NewInbox(); - var state = new NatsJSSubState( - opts: _context.Opts, - optsMaxBytes: opts.MaxBytes, - optsMaxMsgs: opts.MaxMsgs, - optsThresholdMsgs: opts.ThresholdMsgs, - optsThresholdBytes: opts.ThresholdBytes, - optsExpires: opts.Expires, - optsIdleHeartbeat: opts.IdleHeartbeat); + var max = NatsJSOpsDefaults.SetMax(_context.Opts, opts.MaxMsgs, opts.MaxBytes, opts.ThresholdMsgs, opts.ThresholdBytes); + var timeouts = NatsJSOpsDefaults.SetTimeouts(opts.Expires, opts.IdleHeartbeat); + + var requestOpts = new NatsSubOpts + { + Serializer = opts.Serializer, + ChannelOptions = new NatsSubChannelOpts + { + // Keep capacity at 1 to make sure message acknowledgements are sent + // right after the message is processed and messages aren't queued up + // which might cause timeouts for acknowledgments. + Capacity = 1, + FullMode = BoundedChannelFullMode.Wait, + }, + }; var sub = new NatsJSSubConsume( stream: _stream, consumer: _consumer, context: _context, - manager: _context.Nats.SubscriptionManager, subject: inbox, opts: requestOpts, - state: state, - serializer: requestOpts.Serializer ?? _context.Nats.Options.Serializer, - errorHandler: opts.ErrorHandler, - cancellationToken: cancellationToken); + maxMsgs: max.MaxMsgs, + maxBytes: max.MaxBytes, + thresholdMsgs: max.ThresholdMsgs, + thresholdBytes: max.ThresholdBytes, + expires: timeouts.Expires, + idle: timeouts.IdleHeartbeat, + errorHandler: opts.ErrorHandler); - await _context.Nats.SubAsync( + await _context.Connection.SubAsync( subject: inbox, opts: requestOpts, sub: sub, cancellationToken); + await sub.CallMsgNextAsync( + new ConsumerGetnextRequest + { + Batch = max.MaxMsgs, + MaxBytes = max.MaxBytes, + IdleHeartbeat = timeouts.IdleHeartbeat.ToNanos(), + Expires = timeouts.Expires.ToNanos(), + }, + cancellationToken); + + sub.ResetPending(); + sub.ResetHeartbeatTimer(); + return sub; } - public async ValueTask> NextAsync(CancellationToken cancellationToken = default) + public async ValueTask?> NextAsync(NatsJSNextOpts opts, CancellationToken cancellationToken = default) { - await foreach (var natsJSMsg in FetchAsync(new NatsJSFetchOpts { MaxMsgs = 1 }, cancellationToken: cancellationToken)) + await using var f = await FetchAsync( + new NatsJSFetchOpts + { + MaxMsgs = 1, + IdleHeartbeat = opts.IdleHeartbeat, + Expires = opts.Expires, + Serializer = opts.Serializer, + ErrorHandler = opts.ErrorHandler, + }, + cancellationToken: cancellationToken); + + await foreach (var natsJSMsg in f.Msgs.ReadAllAsync(cancellationToken)) { return natsJSMsg; } - throw new NatsJSException("No data"); + return default; } - public IAsyncEnumerable> FetchAsync( + public async IAsyncEnumerable> FetchAllAsync( NatsJSFetchOpts opts, - NatsSubOpts? requestOpts = default, - CancellationToken cancellationToken = default) => - throw new NotImplementedException(); + [EnumeratorCancellation] CancellationToken cancellationToken = default) + { + await using var fc = await FetchAsync(opts, cancellationToken); + await foreach (var jsMsg in fc.Msgs.ReadAllAsync(cancellationToken)) + { + yield return jsMsg; + } + } + + public async ValueTask> FetchAsync( + NatsJSFetchOpts opts, + CancellationToken cancellationToken = default) + { + ThrowIfDeleted(); + + var inbox = _context.NewInbox(); + + var max = NatsJSOpsDefaults.SetMax(_context.Opts, opts.MaxMsgs, opts.MaxBytes); + var timeouts = NatsJSOpsDefaults.SetTimeouts(opts.Expires, opts.IdleHeartbeat); + + var requestOpts = new NatsSubOpts + { + Serializer = opts.Serializer, + ChannelOptions = new NatsSubChannelOpts + { + // Keep capacity at 1 to make sure message acknowledgements are sent + // right after the message is processed and messages aren't queued up + // which might cause timeouts for acknowledgments. + Capacity = 1, + FullMode = BoundedChannelFullMode.Wait, + }, + }; + + var sub = new NatsJSSubFetch( + stream: _stream, + consumer: _consumer, + context: _context, + subject: inbox, + opts: requestOpts, + maxMsgs: max.MaxMsgs, + maxBytes: max.MaxBytes, + expires: timeouts.Expires, + idle: timeouts.IdleHeartbeat, + errorHandler: opts.ErrorHandler); + + await _context.Connection.SubAsync( + subject: inbox, + opts: requestOpts, + sub: sub, + cancellationToken); + + await sub.CallMsgNextAsync( + new ConsumerGetnextRequest + { + Batch = max.MaxMsgs, + MaxBytes = max.MaxBytes, + IdleHeartbeat = timeouts.IdleHeartbeat.ToNanos(), + Expires = timeouts.Expires.ToNanos(), + }, + cancellationToken); + + sub.ResetHeartbeatTimer(); + + return sub; + } private void ThrowIfDeleted() { diff --git a/src/NATS.Client.JetStream/NatsJSContext.cs b/src/NATS.Client.JetStream/NatsJSContext.cs index 0bd9782e3..ce53d3952 100644 --- a/src/NATS.Client.JetStream/NatsJSContext.cs +++ b/src/NATS.Client.JetStream/NatsJSContext.cs @@ -7,23 +7,23 @@ namespace NATS.Client.JetStream; public partial class NatsJSContext { - public NatsJSContext(NatsConnection nats) - : this(nats, new NatsJSOpts()) + public NatsJSContext(NatsConnection connection) + : this(connection, new NatsJSOpts(connection.Opts)) { } - public NatsJSContext(NatsConnection nats, NatsJSOpts opts) + public NatsJSContext(NatsConnection connection, NatsJSOpts opts) { - Nats = nats; - if (opts.InboxPrefix == string.Empty) - opts = opts with { InboxPrefix = nats.Options.InboxPrefix }; + Connection = connection; Opts = opts; } - internal NatsConnection Nats { get; } + internal NatsConnection Connection { get; } internal NatsJSOpts Opts { get; } + internal string NewInbox() => $"{Opts.InboxPrefix}.{Guid.NewGuid():n}"; + public ValueTask GetAccountInfoAsync(CancellationToken cancellationToken = default) => JSRequestResponseAsync( subject: $"{Opts.ApiPrefix}.INFO", @@ -36,7 +36,7 @@ public async ValueTask PublishAsync( NatsPubOpts opts = default, CancellationToken cancellationToken = default) { - await using var sub = await Nats.RequestSubAsync( + await using var sub = await Connection.RequestSubAsync( subject: subject, data: data, requestOpts: opts, @@ -84,11 +84,11 @@ internal async ValueTask> JSRequestAsync( + await using var sub = await Connection.RequestSubAsync( subject: subject, data: request, requestOpts: default, - replyOpts: new NatsSubOpts { Serializer = JSErrorAwareJsonSerializer.Default }, + replyOpts: new NatsSubOpts { Serializer = NatsJSErrorAwareJsonSerializer.Default }, cancellationToken) .ConfigureAwait(false); @@ -107,7 +107,7 @@ internal async ValueTask> JSRequestAsync -/// NATS JetStream message with and control messages. +/// NATS JetStream message with and control messages. /// -public readonly struct NatsJSMsg +/// User message type +public readonly struct NatsJSMsg { - public NatsMsg Msg { get; init; } + private readonly NatsJSContext _context; + private readonly NatsMsg _msg; - public ValueTask Ack(CancellationToken cancellationToken = default) + internal NatsJSMsg(NatsMsg msg, NatsJSContext context) { - if (Msg == default) - throw new NatsJSException("No user message, can't acknowledge"); - return Msg.ReplyAsync(NatsJSConstants.Ack, cancellationToken: cancellationToken); + _msg = msg; + _context = context; } -} -/// -/// NATS JetStream message with and control messages. -/// -/// User message type -public readonly struct NatsJSMsg -{ - public NatsJSMsg(NatsMsg msg) => Msg = msg; + public string Subject => _msg.Subject; + + public int Size => _msg.Size; + + public NatsHeaders? Headers => _msg.Headers; + + public T? Data => _msg.Data; + + public INatsConnection? Connection => _msg.Connection; - public NatsMsg Msg { get; } + public ValueTask AckAsync(AckOpts opts = default, CancellationToken cancellationToken = default) => SendAckAsync(NatsJSConstants.Ack, opts, cancellationToken); - public ValueTask Ack(CancellationToken cancellationToken = default) + public ValueTask NackAsync(AckOpts opts = default, CancellationToken cancellationToken = default) => SendAckAsync(NatsJSConstants.Nack, opts, cancellationToken); + + public ValueTask AckProgressAsync(AckOpts opts = default, CancellationToken cancellationToken = default) => SendAckAsync(NatsJSConstants.AckProgress, opts, cancellationToken); + + public ValueTask AckTerminateAsync(AckOpts opts = default, CancellationToken cancellationToken = default) => SendAckAsync(NatsJSConstants.AckTerminate, opts, cancellationToken); + + private ValueTask SendAckAsync(ReadOnlySequence payload, AckOpts opts = default, CancellationToken cancellationToken = default) { - if (Msg == default) + if (_msg == default) throw new NatsJSException("No user message, can't acknowledge"); - return Msg.ReplyAsync(NatsJSConstants.Ack, cancellationToken: cancellationToken); + + return _msg.ReplyAsync( + payload: payload, + opts: new NatsPubOpts + { + WaitUntilSent = opts.WaitUntilSent ?? _context.Opts.AckOpts.WaitUntilSent, + }, + cancellationToken: cancellationToken); } } + +public readonly record struct AckOpts(bool? WaitUntilSent); diff --git a/src/NATS.Client.JetStream/NatsJSNotification.cs b/src/NATS.Client.JetStream/NatsJSNotification.cs index 1f487017b..34232e218 100644 --- a/src/NATS.Client.JetStream/NatsJSNotification.cs +++ b/src/NATS.Client.JetStream/NatsJSNotification.cs @@ -1,16 +1,6 @@ namespace NATS.Client.JetStream; -public class NatsJSNotification +public record NatsJSNotification(int Code, string Description) { - public static readonly NatsJSNotification Timeout = new(code: 100, message: "Timeout"); - - public NatsJSNotification(int code, string message) - { - Code = code; - Message = message; - } - - public int Code { get; } - - public string Message { get; } + public static readonly NatsJSNotification HeartbeatTimeout = new NatsJSNotification(1001, "Heartbeat Timeout"); } diff --git a/src/NATS.Client.JetStream/NatsJSOpts.cs b/src/NATS.Client.JetStream/NatsJSOpts.cs index e031951f1..07362c28b 100644 --- a/src/NATS.Client.JetStream/NatsJSOpts.cs +++ b/src/NATS.Client.JetStream/NatsJSOpts.cs @@ -1,11 +1,21 @@ +using NATS.Client.Core; + namespace NATS.Client.JetStream; public record NatsJSOpts { + public NatsJSOpts(NatsOpts opts, string? apiPrefix = default, int? maxMsgs = default, AckOpts? ackOpts = default, string? inboxPrefix = default) + { + ApiPrefix = apiPrefix ?? "$JS.API"; + MaxMsgs = maxMsgs ?? 1000; + AckOpts = ackOpts ?? new AckOpts(opts.WaitUntilSent); + InboxPrefix = inboxPrefix ?? opts.InboxPrefix; + } + /// /// Prefix to prepend to JetStream API subjects. (default: $JS.API) /// - public string ApiPrefix { get; init; } = "$JS.API"; + public string ApiPrefix { get; init; } /// /// Prefix to use in inbox subscription subjects to receive messages from JetStream. (default: _INBOX) @@ -13,18 +23,20 @@ public record NatsJSOpts /// Default is taken from NatsOptions (on the parent NatsConnection) which is '_INBOX' if not set. /// /// - public string InboxPrefix { get; init; } = string.Empty; + public string InboxPrefix { get; init; } /// /// Maximum number of messages to receive in a batch. (default: 1000) /// public int MaxMsgs { get; init; } = 1000; + + public AckOpts AckOpts { get; init; } } public record NatsJSConsumeOpts { /// - /// Maximum number of messages stored in the buffer + /// Errors and notifications handler /// public Action? ErrorHandler { get; init; } @@ -57,10 +69,24 @@ public record NatsJSConsumeOpts /// Hint for the number of bytes left in buffer that should trigger a low watermark on the client, and influence it to request more data. /// public int? ThresholdBytes { get; init; } + + /// + /// Serializer to use to deserialize the message if a model is being used. + /// + /// + /// If not set, serializer set in connection options or the default JSON serializer + /// will be used. + /// + public INatsSerializer? Serializer { get; init; } } public record NatsJSNextOpts { + /// + /// Errors and notifications handler + /// + public Action? ErrorHandler { get; init; } + /// /// Amount of time to wait for the request to expire (in nanoseconds) /// @@ -70,10 +96,24 @@ public record NatsJSNextOpts /// Amount idle time the server should wait before sending a heartbeat. For requests with expires > 30s, heartbeats should be enabled by default /// public TimeSpan? IdleHeartbeat { get; init; } + + /// + /// Serializer to use to deserialize the message if a model is being used. + /// + /// + /// If not set, serializer set in connection options or the default JSON serializer + /// will be used. + /// + public INatsSerializer? Serializer { get; init; } } public record NatsJSFetchOpts { + /// + /// Errors and notifications handler + /// + public Action? ErrorHandler { get; init; } + /// /// Maximum number of messages to return /// @@ -93,4 +133,13 @@ public record NatsJSFetchOpts /// Amount idle time the server should wait before sending a heartbeat. For requests with expires > 30s, heartbeats should be enabled by default /// public TimeSpan? IdleHeartbeat { get; init; } + + /// + /// Serializer to use to deserialize the message if a model is being used. + /// + /// + /// If not set, serializer set in connection options or the default JSON serializer + /// will be used. + /// + public INatsSerializer? Serializer { get; init; } } diff --git a/src/NATS.Client.JetStream/NatsJSSubConsume.cs b/src/NATS.Client.JetStream/NatsJSSubConsume.cs new file mode 100644 index 000000000..0a4213d47 --- /dev/null +++ b/src/NATS.Client.JetStream/NatsJSSubConsume.cs @@ -0,0 +1,336 @@ +using System.Buffers; +using System.Text; +using System.Threading.Channels; +using Microsoft.Extensions.Logging; +using NATS.Client.Core; +using NATS.Client.Core.Commands; +using NATS.Client.JetStream.Internal; +using NATS.Client.JetStream.Models; + +namespace NATS.Client.JetStream; + +public class NatsJSSubConsume : NatsSubBase, INatsJSSubConsume +{ + private readonly ILogger _logger; + private readonly bool _debug; + private readonly Channel> _userMsgs; + private readonly Channel _notifications; + private readonly Channel _pullRequests; + private readonly NatsJSContext _context; + private readonly string _stream; + private readonly string _consumer; + private readonly Action? _errorHandler; + private readonly INatsSerializer _serializer; + private readonly Timer _timer; + private readonly Task _pullTask; + private readonly Task _notificationsTask; + + private readonly long _maxMsgs; + private readonly long _expires; + private readonly long _idle; + private readonly long _hbTimeout; + private readonly long _thresholdMsgs; + private readonly long _maxBytes; + private readonly long _thresholdBytes; + + private long _pendingMsgs; + private long _pendingBytes; + + public NatsJSSubConsume( + long maxMsgs, + long thresholdMsgs, + long maxBytes, + long thresholdBytes, + TimeSpan expires, + TimeSpan idle, + NatsJSContext context, + string stream, + string consumer, + string subject, + NatsSubOpts? opts, + Action? errorHandler) + : base(context.Connection, context.Connection.SubscriptionManager, subject, opts) + { + _logger = Connection.Opts.LoggerFactory.CreateLogger>(); + _debug = _logger.IsEnabled(LogLevel.Debug); + _context = context; + _stream = stream; + _consumer = consumer; + _errorHandler = errorHandler; + _serializer = opts?.Serializer ?? context.Connection.Opts.Serializer; + + _maxMsgs = maxMsgs; + _thresholdMsgs = thresholdMsgs; + _maxBytes = maxBytes; + _thresholdBytes = thresholdBytes; + _expires = expires.ToNanos(); + _idle = idle.ToNanos(); + _hbTimeout = (int)(idle * 2).TotalMilliseconds; + + if (_debug) + { + _logger.LogDebug( + "Consumer setup {@Config}", + new + { + maxMsgs, + thresholdMsgs, + maxBytes, + thresholdBytes, + expires, + idle, + _hbTimeout, + }); + } + + _timer = new Timer( + static state => + { + var self = (NatsJSSubConsume)state!; + self.Pull(self._maxMsgs, self._maxBytes); + self.ResetPending(); + self._notifications.Writer.TryWrite(NatsJSNotification.HeartbeatTimeout); + }, + this, + Timeout.Infinite, + Timeout.Infinite); + + _userMsgs = Channel.CreateBounded>(NatsSub.GetChannelOptions(opts?.ChannelOptions)); + Msgs = _userMsgs.Reader; + + _pullRequests = Channel.CreateBounded(NatsSub.GetChannelOptions(opts?.ChannelOptions)); + _pullTask = Task.Run(PullLoop); + + _notifications = Channel.CreateBounded(NatsSub.GetChannelOptions(opts?.ChannelOptions)); + _notificationsTask = Task.Run(NotificationsLoop); + } + + public ChannelReader> Msgs { get; } + + public ValueTask CallMsgNextAsync(ConsumerGetnextRequest request, CancellationToken cancellationToken = default) => + Connection.PubModelAsync( + subject: $"{_context.Opts.ApiPrefix}.CONSUMER.MSG.NEXT.{_stream}.{_consumer}", + data: request, + serializer: JsonNatsSerializer.Default, + replyTo: Subject, + headers: default, + cancellationToken); + + public void ResetPending() + { + _pendingMsgs = _maxMsgs; + _pendingBytes = _maxBytes; + } + + public void ResetHeartbeatTimer() => _timer.Change(_hbTimeout, Timeout.Infinite); + + public override async ValueTask DisposeAsync() + { + await base.DisposeAsync().ConfigureAwait(false); + await _pullTask.ConfigureAwait(false); + await _notificationsTask.ConfigureAwait(false); + await _timer.DisposeAsync().ConfigureAwait(false); + } + + internal override IEnumerable GetReconnectCommands(int sid) + { + foreach (var command in base.GetReconnectCommands(sid)) + yield return command; + + ResetPending(); + + var request = new ConsumerGetnextRequest + { + Batch = _maxMsgs, + MaxBytes = _maxBytes, + IdleHeartbeat = _idle, + Expires = _expires, + }; + + yield return PublishCommand.Create( + pool: Connection.ObjectPool, + subject: $"{_context.Opts.ApiPrefix}.CONSUMER.MSG.NEXT.{_stream}.{_consumer}", + replyTo: Subject, + headers: default, + value: request, + serializer: JsonNatsSerializer.Default, + cancellationToken: default); + } + + protected override async ValueTask ReceiveInternalAsync( + string subject, + string? replyTo, + ReadOnlySequence? headersBuffer, + ReadOnlySequence payloadBuffer) + { + ResetHeartbeatTimer(); + + if (subject == Subject) + { + if (headersBuffer.HasValue) + { + var headers = new NatsHeaders(); + if (Connection.HeaderParser.ParseHeaders(new SequenceReader(headersBuffer.Value), headers)) + { + if (_maxBytes == 0 && headers.TryGetValue("Nats-Pending-Messages", out var natsPendingMsgs)) + { + if (long.TryParse(natsPendingMsgs, out var pendingMsgs)) + { + if (_debug) + { + _logger.LogDebug("Header pending messages current {Pending}", _pendingMsgs); + } + + _pendingMsgs -= pendingMsgs; + if (_pendingMsgs < 0) + _pendingMsgs = 0; + + if (_debug) + { + _logger.LogDebug("Header pending messages {Header} {Pending}", natsPendingMsgs, _pendingMsgs); + } + } + else + { + _logger.LogError("Can't parse Nats-Pending-Messages {Header}", natsPendingMsgs); + } + } + + if (_maxBytes > 0 && headers.TryGetValue("Nats-Pending-Bytes", out var natsPendingBytes)) + { + if (long.TryParse(natsPendingBytes, out var pendingBytes)) + { + if (_debug) + { + _logger.LogDebug("Header pending bytes current {Pending}", _pendingBytes); + } + + _pendingBytes -= pendingBytes; + if (_pendingBytes < 0) + _pendingBytes = 0; + + if (_debug) + { + _logger.LogDebug("Header pending bytes {Header} {Pending}", natsPendingBytes, _pendingBytes); + } + } + else + { + _logger.LogError("Can't parse Nats-Pending-Bytes {Header}", natsPendingBytes); + } + } + + if (headers is { Code: 408, Message: NatsHeaders.Messages.RequestTimeout }) + { + } + else if (headers is { Code: 409, Message: NatsHeaders.Messages.MessageSizeExceedsMaxBytes }) + { + } + else if (headers is { Code: 100, Message: NatsHeaders.Messages.IdleHeartbeat }) + { + } + else + { + _notifications.Writer.TryWrite(new NatsJSNotification(headers.Code, headers.MessageText)); + } + } + else + { + _logger.LogError( + "Can't parse headers: {HeadersBuffer}", + Encoding.ASCII.GetString(headersBuffer.Value.ToArray())); + throw new NatsJSException("Can't parse headers"); + } + } + else + { + throw new NatsJSException("No header found"); + } + } + else + { + var msg = new NatsJSMsg( + NatsMsg.Build( + subject, + replyTo, + headersBuffer, + payloadBuffer, + Connection, + Connection.HeaderParser, + _serializer), + _context); + + _pendingMsgs--; + + if (_maxBytes > 0) + { + if (_debug) + _logger.LogDebug("Message size {Size}", msg.Size); + + _pendingBytes -= msg.Size; + } + + await _userMsgs.Writer.WriteAsync(msg).ConfigureAwait(false); + } + + CheckPending(); + } + + protected override void TryComplete() + { + _pullRequests.Writer.TryComplete(); + _userMsgs.Writer.TryComplete(); + _notifications.Writer.TryComplete(); + } + + private void CheckPending() + { + if (_maxBytes > 0 && _pendingBytes <= _thresholdBytes) + { + if (_debug) + _logger.LogDebug("Check pending bytes {Pending}", _pendingBytes); + + Pull(_maxMsgs, _maxBytes - _pendingBytes); + ResetPending(); + } + else if (_maxBytes == 0 && _pendingMsgs <= _thresholdMsgs) + { + if (_debug) + _logger.LogDebug("Check pending messages {Pending}", _pendingMsgs); + + Pull(_maxMsgs - _pendingMsgs, 0); + ResetPending(); + } + } + + private void Pull(long batch, long maxBytes) => _pullRequests.Writer.TryWrite(new ConsumerGetnextRequest + { + Batch = batch, + MaxBytes = maxBytes, + IdleHeartbeat = _idle, + Expires = _expires, + }); + + private async Task PullLoop() + { + await foreach (var pr in _pullRequests.Reader.ReadAllAsync()) + { + await CallMsgNextAsync(pr).ConfigureAwait(false); + } + } + + private async Task NotificationsLoop() + { + await foreach (var notification in _notifications.Reader.ReadAllAsync()) + { + try + { + _errorHandler?.Invoke(notification); + } + catch (Exception e) + { + _logger.LogError(e, "Notification error handler error"); + } + } + } +} diff --git a/src/NATS.Client.JetStream/NatsJSSubFetch.cs b/src/NATS.Client.JetStream/NatsJSSubFetch.cs new file mode 100644 index 000000000..55d46a890 --- /dev/null +++ b/src/NATS.Client.JetStream/NatsJSSubFetch.cs @@ -0,0 +1,225 @@ +using System.Buffers; +using System.Text; +using System.Threading.Channels; +using Microsoft.Extensions.Logging; +using NATS.Client.Core; +using NATS.Client.Core.Commands; +using NATS.Client.JetStream.Internal; +using NATS.Client.JetStream.Models; + +namespace NATS.Client.JetStream; + +public class NatsJSSubFetch : NatsSubBase, INatsJSSubFetch +{ + private readonly ILogger _logger; + private readonly Channel> _userMsgs; + private readonly Channel _notifications; + private readonly NatsJSContext _context; + private readonly string _stream; + private readonly string _consumer; + private readonly Action? _errorHandler; + private readonly INatsSerializer _serializer; + private readonly Timer _hbTimer; + private readonly Timer _expiresTimer; + private readonly Task _notificationsTask; + + private readonly long _maxMsgs; + private readonly long _maxBytes; + private readonly long _expires; + private readonly long _idle; + private readonly long _hbTimeout; + + private long _pendingMsgs; + private long _pendingBytes; + + public NatsJSSubFetch( + long maxMsgs, + long maxBytes, + TimeSpan expires, + TimeSpan idle, + NatsJSContext context, + string stream, + string consumer, + string subject, + NatsSubOpts? opts, + Action? errorHandler) + : base(context.Connection, context.Connection.SubscriptionManager, subject, opts) + { + _logger = Connection.Opts.LoggerFactory.CreateLogger>(); + _context = context; + _stream = stream; + _consumer = consumer; + _errorHandler = errorHandler; + _serializer = opts?.Serializer ?? context.Connection.Opts.Serializer; + + _maxMsgs = maxMsgs; + _maxBytes = maxBytes; + _expires = expires.ToNanos(); + _idle = idle.ToNanos(); + _hbTimeout = (int)(idle * 2).TotalMilliseconds; + _pendingMsgs = _maxMsgs; + _pendingBytes = _maxBytes; + + _userMsgs = Channel.CreateBounded>(NatsSub.GetChannelOptions(opts?.ChannelOptions)); + Msgs = _userMsgs.Reader; + + _notifications = Channel.CreateBounded(NatsSub.GetChannelOptions(opts?.ChannelOptions)); + _notificationsTask = Task.Run(NotificationsLoop); + + _hbTimer = new Timer( + static state => + { + var self = (NatsJSSubFetch)state!; + self._notifications.Writer.TryWrite(NatsJSNotification.HeartbeatTimeout); + }, + this, + Timeout.Infinite, + Timeout.Infinite); + + _expiresTimer = new Timer( + static state => + { + var self = (NatsJSSubFetch)state!; + self.EndSubscription(NatsSubEndReason.Timeout); + }, + this, + expires + TimeSpan.FromSeconds(5), + Timeout.InfiniteTimeSpan); + } + + public ChannelReader> Msgs { get; } + + public ValueTask CallMsgNextAsync(ConsumerGetnextRequest request, CancellationToken cancellationToken = default) => + Connection.PubModelAsync( + subject: $"{_context.Opts.ApiPrefix}.CONSUMER.MSG.NEXT.{_stream}.{_consumer}", + data: request, + serializer: JsonNatsSerializer.Default, + replyTo: Subject, + headers: default, + cancellationToken); + + public void ResetHeartbeatTimer() => _hbTimer.Change(_hbTimeout, Timeout.Infinite); + + public override async ValueTask DisposeAsync() + { + await base.DisposeAsync().ConfigureAwait(false); + await _notificationsTask.ConfigureAwait(false); + await _hbTimer.DisposeAsync().ConfigureAwait(false); + await _expiresTimer.DisposeAsync().ConfigureAwait(false); + } + + internal override IEnumerable GetReconnectCommands(int sid) + { + foreach (var command in base.GetReconnectCommands(sid)) + yield return command; + + var request = new ConsumerGetnextRequest + { + Batch = _maxMsgs, + IdleHeartbeat = _idle, + Expires = _expires, + }; + + yield return PublishCommand.Create( + pool: Connection.ObjectPool, + subject: $"{_context.Opts.ApiPrefix}.CONSUMER.MSG.NEXT.{_stream}.{_consumer}", + replyTo: Subject, + headers: default, + value: request, + serializer: JsonNatsSerializer.Default, + cancellationToken: default); + } + + protected override async ValueTask ReceiveInternalAsync( + string subject, + string? replyTo, + ReadOnlySequence? headersBuffer, + ReadOnlySequence payloadBuffer) + { + ResetHeartbeatTimer(); + if (subject == Subject) + { + if (headersBuffer.HasValue) + { + var headers = new NatsHeaders(); + if (Connection.HeaderParser.ParseHeaders(new SequenceReader(headersBuffer.Value), headers)) + { + if (headers is { Code: 408, Message: NatsHeaders.Messages.RequestTimeout }) + { + EndSubscription(NatsSubEndReason.Timeout); + } + else if (headers is { Code: 409, Message: NatsHeaders.Messages.MessageSizeExceedsMaxBytes }) + { + EndSubscription(NatsSubEndReason.MaxBytes); + } + else if (headers is { Code: 100, Message: NatsHeaders.Messages.IdleHeartbeat }) + { + } + else + { + _notifications.Writer.TryWrite(new NatsJSNotification(headers.Code, headers.MessageText)); + } + } + else + { + _logger.LogError( + "Can't parse headers: {HeadersBuffer}", + Encoding.ASCII.GetString(headersBuffer.Value.ToArray())); + throw new NatsJSException("Can't parse headers"); + } + } + else + { + throw new NatsJSException("No header found"); + } + } + else + { + var msg = new NatsJSMsg( + NatsMsg.Build( + subject, + replyTo, + headersBuffer, + payloadBuffer, + Connection, + Connection.HeaderParser, + _serializer), + _context); + + _pendingMsgs--; + _pendingBytes -= msg.Size; + + await _userMsgs.Writer.WriteAsync(msg).ConfigureAwait(false); + } + + if (_maxBytes > 0 && _pendingBytes <= 0) + { + EndSubscription(NatsSubEndReason.MaxBytes); + } + else if (_maxBytes == 0 && _pendingMsgs == 0) + { + EndSubscription(NatsSubEndReason.MaxMsgs); + } + } + + protected override void TryComplete() + { + _userMsgs.Writer.TryComplete(); + _notifications.Writer.TryComplete(); + } + + private async Task NotificationsLoop() + { + await foreach (var notification in _notifications.Reader.ReadAllAsync()) + { + try + { + _errorHandler?.Invoke(notification); + } + catch (Exception e) + { + _logger.LogError(e, "Notification error handler error"); + } + } + } +} diff --git a/tests/NATS.Client.Core.Tests/CancellationTest.cs b/tests/NATS.Client.Core.Tests/CancellationTest.cs index 7c2f8bca7..96e8b7149 100644 --- a/tests/NATS.Client.Core.Tests/CancellationTest.cs +++ b/tests/NATS.Client.Core.Tests/CancellationTest.cs @@ -17,8 +17,8 @@ public async Task CommandTimeoutTest() { await using var server = NatsServer.Start(_output, TransportType.Tcp); - await using var subConnection = server.CreateClientConnection(NatsOptions.Default with { CommandTimeout = TimeSpan.FromSeconds(1) }); - await using var pubConnection = server.CreateClientConnection(NatsOptions.Default with { CommandTimeout = TimeSpan.FromSeconds(1) }); + await using var subConnection = server.CreateClientConnection(NatsOpts.Default with { CommandTimeout = TimeSpan.FromSeconds(1) }); + await using var pubConnection = server.CreateClientConnection(NatsOpts.Default with { CommandTimeout = TimeSpan.FromSeconds(1) }); await pubConnection.ConnectAsync(); await subConnection.SubscribeAsync("foo"); diff --git a/tests/NATS.Client.Core.Tests/NatsConnectionTest.Auth.cs b/tests/NATS.Client.Core.Tests/NatsConnectionTest.Auth.cs index 0ef420657..795aa2d00 100644 --- a/tests/NATS.Client.Core.Tests/NatsConnectionTest.Auth.cs +++ b/tests/NATS.Client.Core.Tests/NatsConnectionTest.Auth.cs @@ -9,7 +9,7 @@ public static IEnumerable GetAuthConfigs() new Auth( "TOKEN", "resources/configs/auth/token.conf", - NatsOptions.Default with { AuthOptions = NatsAuthOptions.Default with { Token = "s3cr3t", }, }), + NatsOpts.Default with { AuthOptions = NatsAuthOptions.Default with { Token = "s3cr3t", }, }), }; yield return new object[] @@ -17,7 +17,7 @@ public static IEnumerable GetAuthConfigs() new Auth( "USER-PASSWORD", "resources/configs/auth/password.conf", - NatsOptions.Default with + NatsOpts.Default with { AuthOptions = NatsAuthOptions.Default with { Username = "a", Password = "b", }, }), @@ -28,7 +28,7 @@ NatsOptions.Default with new Auth( "NKEY", "resources/configs/auth/nkey.conf", - NatsOptions.Default with + NatsOpts.Default with { AuthOptions = NatsAuthOptions.Default with { @@ -43,7 +43,7 @@ NatsOptions.Default with new Auth( "NKEY (FROM FILE)", "resources/configs/auth/nkey.conf", - NatsOptions.Default with + NatsOpts.Default with { AuthOptions = NatsAuthOptions.Default with { NKeyFile = "resources/configs/auth/user.nk", }, }), @@ -54,7 +54,7 @@ NatsOptions.Default with new Auth( "USER-CREDS", "resources/configs/auth/operator.conf", - NatsOptions.Default with + NatsOpts.Default with { AuthOptions = NatsAuthOptions.Default with { @@ -70,7 +70,7 @@ NatsOptions.Default with new Auth( "USER-CREDS (FROM FILE)", "resources/configs/auth/operator.conf", - NatsOptions.Default with + NatsOpts.Default with { AuthOptions = NatsAuthOptions.Default with { CredsFile = "resources/configs/auth/user.creds", }, }), @@ -83,7 +83,7 @@ public async Task UserCredentialAuthTest(Auth auth) { var name = auth.Name; var serverConfig = auth.ServerConfig; - var clientOptions = auth.ClientOptions; + var clientOptions = auth.ClientOpts; _output.WriteLine($"AUTH TEST {name}"); @@ -162,18 +162,18 @@ await Retry.Until( public class Auth { - public Auth(string name, string serverConfig, NatsOptions clientOptions) + public Auth(string name, string serverConfig, NatsOpts clientOpts) { Name = name; ServerConfig = serverConfig; - ClientOptions = clientOptions; + ClientOpts = clientOpts; } public string Name { get; } public string ServerConfig { get; } - public NatsOptions ClientOptions { get; } + public NatsOpts ClientOpts { get; } public override string ToString() => Name; } diff --git a/tests/NATS.Client.Core.Tests/NatsConnectionTest.cs b/tests/NATS.Client.Core.Tests/NatsConnectionTest.cs index 9e6a5f35f..8e361b07c 100644 --- a/tests/NATS.Client.Core.Tests/NatsConnectionTest.cs +++ b/tests/NATS.Client.Core.Tests/NatsConnectionTest.cs @@ -57,11 +57,11 @@ public async Task EncodingTest() { await using var server = NatsServer.Start(_output, _transportType); - var serializer1 = NatsOptions.Default.Serializer; + var serializer1 = NatsOpts.Default.Serializer; foreach (var serializer in new INatsSerializer[] { serializer1 }) { - var options = NatsOptions.Default with { Serializer = serializer }; + var options = NatsOpts.Default with { Serializer = serializer }; await using var subConnection = server.CreateClientConnection(options); await using var pubConnection = server.CreateClientConnection(options); @@ -100,7 +100,7 @@ public async Task RequestTest(int minSize) { await using var server = NatsServer.Start(_output, _transportType); - var options = NatsOptions.Default with { RequestTimeout = TimeSpan.FromSeconds(5) }; + var options = NatsOpts.Default with { RequestTimeout = TimeSpan.FromSeconds(5) }; await using var subConnection = server.CreateClientConnection(options); await using var pubConnection = server.CreateClientConnection(options); diff --git a/tests/NATS.Client.Core.Tests/NatsHeaderTest.cs b/tests/NATS.Client.Core.Tests/NatsHeaderTest.cs index bb4f59674..8fd768878 100644 --- a/tests/NATS.Client.Core.Tests/NatsHeaderTest.cs +++ b/tests/NATS.Client.Core.Tests/NatsHeaderTest.cs @@ -136,4 +136,24 @@ public void ParserHeaderVersionOnlyTests(string text, int code, string message, Assert.Equal(message, headers.MessageText); Assert.Equal(headerCount, headers.Count); } + + [Fact] + public void ParserMultiSpanTests() + { + const string text1 = "NATS/1.0 123 Test "; + const string text2 = "Message\r\n\r\n"; + + var parser = new HeaderParser(Encoding.UTF8); + var builder = new SeqeunceBuilder(); + builder.Append(Encoding.UTF8.GetBytes(text1)); + builder.Append(Encoding.UTF8.GetBytes(text2)); + var input = new SequenceReader(builder.ToReadOnlySequence()); + + var headers = new NatsHeaders(); + parser.ParseHeaders(input, headers); + + Assert.Equal(1, headers.Version); + Assert.Equal(123, headers.Code); + Assert.Equal("Test Message", headers.MessageText); + } } diff --git a/tests/NATS.Client.Core.Tests/RequestReplyTest.cs b/tests/NATS.Client.Core.Tests/RequestReplyTest.cs index b0e18c38f..8d0b081ae 100644 --- a/tests/NATS.Client.Core.Tests/RequestReplyTest.cs +++ b/tests/NATS.Client.Core.Tests/RequestReplyTest.cs @@ -38,7 +38,7 @@ public async Task Request_reply_command_timeout_test() // Request timeout as default timeout { - await using var nats = server.CreateClientConnection(NatsOptions.Default with + await using var nats = server.CreateClientConnection(NatsOpts.Default with { RequestTimeout = TimeSpan.FromSeconds(1), }); diff --git a/tests/NATS.Client.Core.Tests/SubscriptionTest.cs b/tests/NATS.Client.Core.Tests/SubscriptionTest.cs index 9d1d7114e..dcc3f3a34 100644 --- a/tests/NATS.Client.Core.Tests/SubscriptionTest.cs +++ b/tests/NATS.Client.Core.Tests/SubscriptionTest.cs @@ -14,7 +14,7 @@ public async Task Subscription_periodic_cleanup_test() .UseTransport(TransportType.Tcp) .Build(); await using var server = NatsServer.Start(_output, serverOptions); - var options = NatsOptions.Default with { SubscriptionCleanUpInterval = TimeSpan.FromSeconds(1) }; + var options = NatsOpts.Default with { SubscriptionCleanUpInterval = TimeSpan.FromSeconds(1) }; var (nats, proxy) = server.CreateProxiedClientConnection(options); async Task Isolator() @@ -53,7 +53,7 @@ public async Task Subscription_cleanup_on_message_receive_test() await using var server = NatsServer.Start(_output, TransportType.Tcp); // Make sure time won't kick-in and unsubscribe - var options = NatsOptions.Default with { SubscriptionCleanUpInterval = TimeSpan.MaxValue }; + var options = NatsOpts.Default with { SubscriptionCleanUpInterval = TimeSpan.MaxValue }; var (nats, proxy) = server.CreateProxiedClientConnection(options); async Task Isolator() diff --git a/tests/NATS.Client.JetStream.Tests/ConsumerConsumeTest.cs b/tests/NATS.Client.JetStream.Tests/ConsumerConsumeTest.cs index a77844715..dd65373cd 100644 --- a/tests/NATS.Client.JetStream.Tests/ConsumerConsumeTest.cs +++ b/tests/NATS.Client.JetStream.Tests/ConsumerConsumeTest.cs @@ -1,4 +1,3 @@ -using System.Text.RegularExpressions; using NATS.Client.Core.Tests; namespace NATS.Client.JetStream.Tests; @@ -13,6 +12,7 @@ public class ConsumerConsumeTest public async Task Consume_msgs_test() { var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30)); + await using var server = NatsServer.Start( outputHelper: _output, options: new NatsServerOptionsBuilder() @@ -20,7 +20,6 @@ public async Task Consume_msgs_test() .Trace() .UseJetStream() .Build()); - var (nats, proxy) = server.CreateProxiedClientConnection(); var js = new NatsJSContext(nats); await js.CreateStreamAsync("s1", new[] { "s1.*" }, cts.Token); @@ -35,36 +34,42 @@ public async Task Consume_msgs_test() var consumerOpts = new NatsJSConsumeOpts { MaxMsgs = 10 }; var consumer = await js.GetConsumerAsync("s1", "c1", cts.Token); var count = 0; - var cc = await consumer.ConsumeAsync(consumerOpts, cancellationToken: cts.Token); + await using var cc = await consumer.ConsumeAsync(consumerOpts, cancellationToken: cts.Token); await foreach (var msg in cc.Msgs.ReadAllAsync(cts.Token)) { - await msg.Ack(cts.Token); - Assert.Equal(count, msg.Msg.Data!.Test); + await msg.AckAsync(new AckOpts(true), cts.Token); + Assert.Equal(count, msg.Data!.Test); count++; - if (count == 25) + if (count == 30) break; } - Assert.Equal(25, count); + int? PullCount() => proxy? + .ClientFrames + .Count(f => f.Message.StartsWith("PUB $JS.API.CONSUMER.MSG.NEXT.s1.c1")); + + await Retry.Until( + reason: "received enough pulls", + condition: () => PullCount() > 5, + action: () => + { + _output.WriteLine($"### PullCount:{PullCount()}"); + return Task.CompletedTask; + }, + retryDelay: TimeSpan.FromSeconds(3), + timeout: TimeSpan.FromSeconds(15)); - // TODO: we seem to be getting inconsistent number of pulls here! - // It's sometimes 5 sometimes 7! - // await Retry.Until( - // "receiving all pulls", - // () => proxy - // .ClientFrames - // .Count(f => f.Message.StartsWith("PUB $JS.API.CONSUMER.MSG.NEXT.s1.c1")) == 7); var msgNextRequests = proxy .ClientFrames .Where(f => f.Message.StartsWith("PUB $JS.API.CONSUMER.MSG.NEXT.s1.c1")) .ToList(); - // Prefetch + // Initial pull Assert.Matches(@"^PUB.*""batch"":10\b", msgNextRequests.First().Message); foreach (var frame in msgNextRequests.Skip(1)) { - // Consequent fetches should top up to the prefetch value + // Consequent pulls should top-up to max value Assert.Matches(@"^PUB.*""batch"":5\b", frame.Message); } } @@ -107,26 +112,27 @@ public async Task Consume_idle_heartbeat_test() var cc = await consumer.ConsumeAsync(consumerOpts, cancellationToken: cts.Token); await foreach (var msg in cc.Msgs.ReadAllAsync(cts.Token)) { - await msg.Ack(cts.Token); - Assert.Equal(count, msg.Msg.Data!.Test); + await msg.AckAsync(new AckOpts(WaitUntilSent: true), cts.Token); + Assert.Equal(count, msg.Data!.Test); await signal; break; } + await Retry.Until( + "all pull requests are received", + () => proxy.ClientFrames.Count(f => f.Message.StartsWith("PUB $JS.API.CONSUMER.MSG.NEXT.s1.c1")) == 2); + var msgNextRequests = proxy .ClientFrames .Where(f => f.Message.StartsWith("PUB $JS.API.CONSUMER.MSG.NEXT.s1.c1")) .ToList(); - Assert.Single(msgNextRequests); + Assert.Equal(2, msgNextRequests.Count); - // Prefetch - Assert.Matches(@"^PUB.*""batch"":10\b", msgNextRequests.First().Message); - - foreach (var frame in msgNextRequests.Skip(1)) + // Pull requests + foreach (var frame in msgNextRequests) { - // Consequent fetches should top up to the prefetch value - Assert.Matches(@"^PUB.*""batch"":5\b", frame.Message); + Assert.Matches(@"^PUB.*""batch"":10\b", frame.Message); } } @@ -149,7 +155,7 @@ public async Task Consume_reconnect_test() MaxMsgs = 10, ErrorHandler = e => { - _output.WriteLine($"Consume error: {e.Code} {e.Message}"); + _output.WriteLine($"Consume error: {e.Code} {e.Description}"); }, }; @@ -165,8 +171,8 @@ public async Task Consume_reconnect_test() var count = 0; await foreach (var msg in cc.Msgs.ReadAllAsync(cts.Token)) { - await msg.Ack(cts.Token); - Assert.Equal(count, msg.Msg.Data!.Test); + await msg.AckAsync(new AckOpts(WaitUntilSent: true), cts.Token); + Assert.Equal(count, msg.Data!.Test); count++; // We only need two test messages; before and after reconnect. diff --git a/tests/NATS.Client.JetStream.Tests/ConsumerFetchTest.cs b/tests/NATS.Client.JetStream.Tests/ConsumerFetchTest.cs index bbf4f71f6..05d8c19cc 100644 --- a/tests/NATS.Client.JetStream.Tests/ConsumerFetchTest.cs +++ b/tests/NATS.Client.JetStream.Tests/ConsumerFetchTest.cs @@ -8,7 +8,7 @@ public class ConsumerFetchTest public ConsumerFetchTest(ITestOutputHelper output) => _output = output; - [Fact(Skip = "TODO")] + [Fact] public async Task Fetch_test() { var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); @@ -26,10 +26,12 @@ public async Task Fetch_test() var consumer = await js.GetConsumerAsync("s1", "c1", cts.Token); var count = 0; - await foreach (var msg in consumer.FetchAsync(new NatsJSFetchOpts { MaxMsgs = 10 }, cancellationToken: cts.Token)) + await using var fc = + await consumer.FetchAsync(new NatsJSFetchOpts { MaxMsgs = 10 }, cancellationToken: cts.Token); + await foreach (var msg in fc.Msgs.ReadAllAsync(cts.Token)) { - await msg.Ack(cts.Token); - Assert.Equal(count, msg.Msg.Data!.Test); + await msg.AckAsync(new AckOpts(WaitUntilSent: true), cts.Token); + Assert.Equal(count, msg.Data!.Test); count++; } diff --git a/tests/NATS.Client.JetStream.Tests/ConsumerNextTest.cs b/tests/NATS.Client.JetStream.Tests/ConsumerNextTest.cs index 27bd44569..c615b2ba9 100644 --- a/tests/NATS.Client.JetStream.Tests/ConsumerNextTest.cs +++ b/tests/NATS.Client.JetStream.Tests/ConsumerNextTest.cs @@ -8,7 +8,7 @@ public class ConsumerNextTest public ConsumerNextTest(ITestOutputHelper output) => _output = output; - [Fact(Skip = "TODO")] + [Fact] public async Task Next_test() { var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); @@ -23,9 +23,12 @@ public async Task Next_test() { var ack = await js.PublishAsync("s1.foo", new TestData { Test = i }, cancellationToken: cts.Token); ack.EnsureSuccess(); - var msg = await consumer.NextAsync(cts.Token); - await msg.Ack(cts.Token); - Assert.Equal(i, msg.Msg.Data!.Test); + var next = await consumer.NextAsync(new NatsJSNextOpts(), cts.Token); + if (next is { } msg) + { + await msg.AckAsync(new AckOpts(WaitUntilSent: true), cts.Token); + Assert.Equal(i, msg.Data!.Test); + } } } diff --git a/tests/NATS.Client.JetStream.Tests/ConsumerStateTest.cs b/tests/NATS.Client.JetStream.Tests/ConsumerStateTest.cs index 5c7dc18f6..f25183b40 100644 --- a/tests/NATS.Client.JetStream.Tests/ConsumerStateTest.cs +++ b/tests/NATS.Client.JetStream.Tests/ConsumerStateTest.cs @@ -19,12 +19,12 @@ public void Default_options() [Fact] public void Allow_only_max_msgs_or_bytes_options() => - Assert.Throws(() => _ = NatsJSOpsDefaults.SetMax(new NatsJSOpts(), 1, 1)); + Assert.Throws(() => _ = NatsJSOpsDefaults.SetMax(new NatsJSOpts(NatsOpts.Default), 1, 1)); [Fact] public void Set_bytes_option() { - var opts = NatsJSOpsDefaults.SetMax(new NatsJSOpts(), maxBytes: 1024); + var opts = NatsJSOpsDefaults.SetMax(new NatsJSOpts(NatsOpts.Default), maxBytes: 1024); Assert.Equal(1_000_000, opts.MaxMsgs); Assert.Equal(1024, opts.MaxBytes); Assert.Equal(500_000, opts.ThresholdMsgs); @@ -91,50 +91,4 @@ public void Set_threshold_option(int max, int? threshold, int expected) Assert.Equal(expected, opts.ThresholdBytes); } } - - [Fact] - public void Calculate_pending_msgs() - { - var state = new NatsJSSubState(optsMaxMsgs: 100, optsThresholdMsgs: 10); - - // initial pull - var init = state.GetRequest(); - Assert.Equal(100, init.Batch); - Assert.Equal(0, init.MaxBytes); - - for (var i = 0; i < 89; i++) - { - state.MsgReceived(128); - Assert.False(state.CanFetch(), $"iter:{i}"); - } - - state.MsgReceived(128); - Assert.True(state.CanFetch()); - var request = state.GetRequest(); - Assert.Equal(90, request.Batch); - Assert.Equal(0, request.MaxBytes); - } - - [Fact] - public void Calculate_pending_bytes() - { - var state = new NatsJSSubState(optsMaxBytes: 1000, optsThresholdBytes: 100); - - // initial pull - var init = state.GetRequest(); - Assert.Equal(1_000_000, init.Batch); - Assert.Equal(1000, init.MaxBytes); - - for (var i = 0; i < 89; i++) - { - state.MsgReceived(10); - Assert.False(state.CanFetch(), $"iter:{i}"); - } - - state.MsgReceived(10); - Assert.True(state.CanFetch()); - var request = state.GetRequest(); - Assert.Equal(1_000_000, request.Batch); - Assert.Equal(900, request.MaxBytes); - } } diff --git a/tests/NATS.Client.JetStream.Tests/JetStreamTest.cs b/tests/NATS.Client.JetStream.Tests/JetStreamTest.cs index 2ca3f46f1..978faff65 100644 --- a/tests/NATS.Client.JetStream.Tests/JetStreamTest.cs +++ b/tests/NATS.Client.JetStream.Tests/JetStreamTest.cs @@ -12,14 +12,20 @@ public class JetStreamTest [Fact] public async Task Create_stream_test() { - await using var server = NatsServer.StartJS(); + await using var server = NatsServer.Start( + outputHelper: _output, + options: new NatsServerOptionsBuilder() + .UseTransport(TransportType.Tcp) + .Trace() + .UseJetStream() + .Build()); var nats = server.CreateClientConnection(); // Happy user { var cts1 = new CancellationTokenSource(TimeSpan.FromSeconds(10)); - var js = new NatsJSContext(nats, new NatsJSOpts()); + var js = new NatsJSContext(nats); // Create stream var stream = await js.CreateStreamAsync( @@ -39,12 +45,6 @@ public async Task Create_stream_test() // Turn on ACK so we can test them below AckPolicy = ConsumerConfigurationAckPolicy.@explicit, - - // Effectively set message expiry for the consumer - // so that unacknowledged messages can be put back into - // the consumer to be delivered again (in a sense). - // This is to make below consumer tests work. - AckWait = 2_000_000_000, // 2 seconds }, }, cts1.Token); @@ -93,7 +93,7 @@ public async Task Create_stream_test() // Only ACK one message so we can consume again if (messages.Count == 1) { - await msg.Ack(cts2.Token); + await msg.AckAsync(new AckOpts(WaitUntilSent: true), cancellationToken: cts2.Token); } if (messages.Count == 2) @@ -103,25 +103,15 @@ public async Task Create_stream_test() } Assert.Equal(2, messages.Count); - Assert.Equal("events.foo", messages[0].Msg.Subject); - Assert.Equal("events.foo", messages[1].Msg.Subject); - var cc2 = await consumer.ConsumeAsync( - new NatsJSConsumeOpts { MaxMsgs = 100 }, - cancellationToken: cts2.Token); - - // Consume the unacknowledged message - await foreach (var msg in cc2.Msgs.ReadAllAsync(cts2.Token)) - { - Assert.Equal("events.foo", msg.Msg.Subject); - break; - } + Assert.Equal("events.foo", messages[0].Subject); + Assert.Equal("events.foo", messages[1].Subject); } // Handle errors { var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); - var js = new NatsJSContext(nats, new NatsJSOpts()); + var js = new NatsJSContext(nats); var exception = await Assert.ThrowsAsync(async () => { await js.CreateStreamAsync( @@ -142,7 +132,7 @@ await js.CreateStreamAsync( { var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); - var js = new NatsJSContext(nats, new NatsJSOpts()); + var js = new NatsJSContext(nats); // Success await js.DeleteStreamAsync("events", cts.Token); diff --git a/tests/NATS.Client.JetStream.Tests/ManageConsumerTest.cs b/tests/NATS.Client.JetStream.Tests/ManageConsumerTest.cs index aa0f1452a..6eff05751 100644 --- a/tests/NATS.Client.JetStream.Tests/ManageConsumerTest.cs +++ b/tests/NATS.Client.JetStream.Tests/ManageConsumerTest.cs @@ -16,7 +16,7 @@ public async Task Create_get_consumer() await using var server = NatsServer.StartJS(); var nats = server.CreateClientConnection(); - var js = new NatsJSContext(nats, new NatsJSOpts()); + var js = new NatsJSContext(nats); await js.CreateStreamAsync("s1", new[] { "s1.*" }, cts.Token); // Create @@ -52,7 +52,7 @@ public async Task List_delete_consumer() await using var server = NatsServer.StartJS(); var nats = server.CreateClientConnection(); - var js = new NatsJSContext(nats, new NatsJSOpts()); + var js = new NatsJSContext(nats); await js.CreateStreamAsync("s1", new[] { "s1.*" }, cts.Token); await js.CreateConsumerAsync("s1", "c1", cancellationToken: cts.Token); await js.CreateConsumerAsync("s1", "c2", cancellationToken: cts.Token); diff --git a/tests/NATS.Client.JetStream.Tests/ManageStreamTest.cs b/tests/NATS.Client.JetStream.Tests/ManageStreamTest.cs index c3666dd7a..174641bef 100644 --- a/tests/NATS.Client.JetStream.Tests/ManageStreamTest.cs +++ b/tests/NATS.Client.JetStream.Tests/ManageStreamTest.cs @@ -14,7 +14,7 @@ public async Task Account_info_create_get_update_stream() { await using var server = NatsServer.StartJS(); var nats = server.CreateClientConnection(); - var js = new NatsJSContext(nats, new NatsJSOpts()); + var js = new NatsJSContext(nats); // Account Info { @@ -62,7 +62,7 @@ public async Task List_delete_stream() await using var server = NatsServer.StartJS(); var nats = server.CreateClientConnection(); - var js = new NatsJSContext(nats, new NatsJSOpts()); + var js = new NatsJSContext(nats); await js.CreateStreamAsync("s1", new[] { "s1.*" }, cts.Token); await js.CreateStreamAsync("s2", new[] { "s2.*" }, cts.Token); diff --git a/tests/NATS.Client.TestUtilities/NatsServer.cs b/tests/NATS.Client.TestUtilities/NatsServer.cs index 4e5ddb27f..310090acf 100644 --- a/tests/NATS.Client.TestUtilities/NatsServer.cs +++ b/tests/NATS.Client.TestUtilities/NatsServer.cs @@ -151,7 +151,7 @@ public static NatsServer StartJS(ITestOutputHelper outputHelper, TransportType t public static NatsServer Start(ITestOutputHelper outputHelper, TransportType transportType) => Start(outputHelper, new NatsServerOptionsBuilder().UseTransport(transportType).Build()); - public static NatsServer Start(ITestOutputHelper outputHelper, NatsServerOptions options, NatsOptions? clientOptions = default) + public static NatsServer Start(ITestOutputHelper outputHelper, NatsServerOptions options, NatsOpts? clientOptions = default) { NatsServer? server = null; NatsConnection? nats = null; @@ -160,7 +160,7 @@ public static NatsServer Start(ITestOutputHelper outputHelper, NatsServerOptions try { server = new NatsServer(outputHelper, options); - nats = server.CreateClientConnection(clientOptions ?? NatsOptions.Default, reTryCount: 3); + nats = server.CreateClientConnection(clientOptions ?? NatsOpts.Default, reTryCount: 3); #pragma warning disable CA2012 return server; } @@ -228,7 +228,7 @@ public async ValueTask DisposeAsync() } } - public (NatsConnection, NatsProxy) CreateProxiedClientConnection(NatsOptions? options = null) + public (NatsConnection, NatsProxy) CreateProxiedClientConnection(NatsOpts? options = null) { if (Options.EnableTls) { @@ -237,7 +237,7 @@ public async ValueTask DisposeAsync() var proxy = new NatsProxy(Options.ServerPort, _outputHelper, Options.Trace); - var client = new NatsConnection((options ?? NatsOptions.Default) with + var client = new NatsConnection((options ?? NatsOpts.Default) with { LoggerFactory = new OutputHelperLoggerFactory(_outputHelper), Url = $"nats://localhost:{proxy.Port}", @@ -247,13 +247,13 @@ public async ValueTask DisposeAsync() return (client, proxy); } - public NatsConnection CreateClientConnection(NatsOptions? options = default, int reTryCount = 10, bool ignoreAuthorizationException = false) + public NatsConnection CreateClientConnection(NatsOpts? options = default, int reTryCount = 10, bool ignoreAuthorizationException = false) { for (var i = 0; i < reTryCount; i++) { try { - var nats = new NatsConnection(ClientOptions(options ?? NatsOptions.Default)); + var nats = new NatsConnection(ClientOptions(options ?? NatsOpts.Default)); try { @@ -281,16 +281,16 @@ public NatsConnection CreateClientConnection(NatsOptions? options = default, int throw new Exception("Can't create a connection to nats-server"); } - public NatsConnectionPool CreatePooledClientConnection() => CreatePooledClientConnection(NatsOptions.Default); + public NatsConnectionPool CreatePooledClientConnection() => CreatePooledClientConnection(NatsOpts.Default); - public NatsConnectionPool CreatePooledClientConnection(NatsOptions options) + public NatsConnectionPool CreatePooledClientConnection(NatsOpts opts) { - return new NatsConnectionPool(4, ClientOptions(options)); + return new NatsConnectionPool(4, ClientOptions(opts)); } - public NatsOptions ClientOptions(NatsOptions options) + public NatsOpts ClientOptions(NatsOpts opts) { - return options with + return opts with { LoggerFactory = new OutputHelperLoggerFactory(_outputHelper),