Skip to content

Commit

Permalink
JetStream pull consumer redesign (#115)
Browse files Browse the repository at this point in the history
* JetStream pull consumer redesign

* Consume and fetch all

* Max bytes

* Header parser multi span bug fix

* Msg size fix

* Fixed tests and async sync bug

* Fixed tests

* Fixed tests

* Fixed tests

* Fixed tests

* Trace test issue

* Fixed warnings

* Removed flappy no-ACK test

This test was checking if the server was resending a message
which was not ACKed, after the ack_wait times out. Resending
unacknowledged messages is a server function we could test
under more controlled test setup and it's this part of the
test isn't testing NATS NET codebase. For some reason resend
function isn't working most of the time when run under the
GitHub Actions environment and creating false positives.
Hence I decided to remove this part of the test to avoid
unnecessary noise for the time being.

* End fetch with internal clock

This is a case where server request timeout might not
reach the client because of network anomalies.

* Default to fast ACK

* Options -> Opts refactor
* Ignore extra messages on consumer dispose
* Propagate JS options for ACK

* Test asserts

* Flappy test trace

* Public API fixes

* JSMsg hides the NatsMsg and proxy relevant fields
* Moved RawData to example
* JSContext NewInbox is now internal
  • Loading branch information
mtmk authored Aug 31, 2023
1 parent ebcf2f1 commit 6f9df6e
Show file tree
Hide file tree
Showing 61 changed files with 1,325 additions and 1,034 deletions.
7 changes: 7 additions & 0 deletions NATS.Client.sln
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Schema.Generation", "tools\
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "NATS.Client.Perf", "tests\NATS.Client.Perf\NATS.Client.Perf.csproj", "{ADF66CBA-4F3E-4E91-9842-E194E3BC06A1}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Example.JetStream.PullConsumer", "sandbox\Example.JetStream.PullConsumer\Example.JetStream.PullConsumer.csproj", "{3A9FC281-3B81-4D63-A76B-E1127C1D2241}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -159,6 +161,10 @@ Global
{ADF66CBA-4F3E-4E91-9842-E194E3BC06A1}.Debug|Any CPU.Build.0 = Debug|Any CPU
{ADF66CBA-4F3E-4E91-9842-E194E3BC06A1}.Release|Any CPU.ActiveCfg = Release|Any CPU
{ADF66CBA-4F3E-4E91-9842-E194E3BC06A1}.Release|Any CPU.Build.0 = Release|Any CPU
{3A9FC281-3B81-4D63-A76B-E1127C1D2241}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{3A9FC281-3B81-4D63-A76B-E1127C1D2241}.Debug|Any CPU.Build.0 = Debug|Any CPU
{3A9FC281-3B81-4D63-A76B-E1127C1D2241}.Release|Any CPU.ActiveCfg = Release|Any CPU
{3A9FC281-3B81-4D63-A76B-E1127C1D2241}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down Expand Up @@ -187,6 +193,7 @@ Global
{90E5BF38-70C1-460A-9177-CE42815BDBF5} = {C526E8AB-739A-48D7-8FC4-048978C9B650}
{B7DD4A9C-2D24-4772-951E-86A665C59ADF} = {BD234E2E-F51A-4B18-B8BE-8AF6D546BF87}
{ADF66CBA-4F3E-4E91-9842-E194E3BC06A1} = {C526E8AB-739A-48D7-8FC4-048978C9B650}
{3A9FC281-3B81-4D63-A76B-E1127C1D2241} = {95A69671-16CA-4133-981C-CC381B7AAA30}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {8CBB7278-D093-448E-B3DE-B5991209A1AA}
Expand Down
1 change: 1 addition & 0 deletions NATS.Client.sln.DotSettings
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
<wpf:ResourceDictionary xml:space="preserve" xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml" xmlns:s="clr-namespace:System;assembly=mscorlib" xmlns:ss="urn:shemas-jetbrains-com:settings-storage-xaml" xmlns:wpf="http://schemas.microsoft.com/winfx/2006/xaml/presentation">
<s:String x:Key="/Default/CodeStyle/Naming/CSharpNaming/Abbreviations/=ASCII/@EntryIndexedValue">ASCII</s:String>
<s:String x:Key="/Default/CodeStyle/Naming/CSharpNaming/Abbreviations/=CR/@EntryIndexedValue">CR</s:String>
<s:String x:Key="/Default/CodeStyle/Naming/CSharpNaming/Abbreviations/=CRLF/@EntryIndexedValue">CRLF</s:String>
<s:String x:Key="/Default/CodeStyle/Naming/CSharpNaming/Abbreviations/=JS/@EntryIndexedValue">JS</s:String>
<s:String x:Key="/Default/CodeStyle/Naming/CSharpNaming/Abbreviations/=LF/@EntryIndexedValue">LF</s:String>
<s:Boolean x:Key="/Default/UserDictionary/Words/=HMSG/@EntryIndexedValue">True</s:Boolean>
Expand Down
2 changes: 1 addition & 1 deletion sandbox/ConsoleApp/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
await conn.PublishAsync("foo", new Person(30, "bar"));

// Options can configure `with` expression
var options = NatsOptions.Default with
var options = NatsOpts.Default with
{
Url = "nats://127.0.0.1:9999",
LoggerFactory = new MinimumConsoleLoggerFactory(LogLevel.Information),
Expand Down
2 changes: 1 addition & 1 deletion sandbox/Example.Core.PublishHeaders/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
using NATS.Client.Core;

var subject = "bar.xyz";
var options = NatsOptions.Default with { LoggerFactory = new MinimumConsoleLoggerFactory(LogLevel.Error) };
var options = NatsOpts.Default with { LoggerFactory = new MinimumConsoleLoggerFactory(LogLevel.Error) };

Print("[CON] Connecting...\n");

Expand Down
2 changes: 1 addition & 1 deletion sandbox/Example.Core.PublishModel/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
using NATS.Client.Core;

var subject = "bar.xyz";
var options = NatsOptions.Default with { LoggerFactory = new MinimumConsoleLoggerFactory(LogLevel.Error) };
var options = NatsOpts.Default with { LoggerFactory = new MinimumConsoleLoggerFactory(LogLevel.Error) };

Print("[CON] Connecting...\n");

Expand Down
2 changes: 1 addition & 1 deletion sandbox/Example.Core.SubscribeHeaders/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
using NATS.Client.Core;

var subject = "bar.*";
var options = NatsOptions.Default with { LoggerFactory = new MinimumConsoleLoggerFactory(LogLevel.Error) };
var options = NatsOpts.Default with { LoggerFactory = new MinimumConsoleLoggerFactory(LogLevel.Error) };

Print("[CON] Connecting...\n");

Expand Down
2 changes: 1 addition & 1 deletion sandbox/Example.Core.SubscribeModel/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
using NATS.Client.Core;

var subject = "bar.*";
var options = NatsOptions.Default with { LoggerFactory = new MinimumConsoleLoggerFactory(LogLevel.Error) };
var options = NatsOpts.Default with { LoggerFactory = new MinimumConsoleLoggerFactory(LogLevel.Error) };

Print("[CON] Connecting...\n");

Expand Down
2 changes: 1 addition & 1 deletion sandbox/Example.Core.SubscribeQueueGroup/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
using NATS.Client.Core;

var subject = "foo.*";
var options = NatsOptions.Default with { LoggerFactory = new MinimumConsoleLoggerFactory(LogLevel.Error) };
var options = NatsOpts.Default with { LoggerFactory = new MinimumConsoleLoggerFactory(LogLevel.Error) };

// ---
// Worker 1
Expand Down
2 changes: 1 addition & 1 deletion sandbox/Example.Core.SubscribeRaw/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
using NATS.Client.Core;

var subject = "foo.*";
var options = NatsOptions.Default with { LoggerFactory = new MinimumConsoleLoggerFactory(LogLevel.Error) };
var options = NatsOpts.Default with { LoggerFactory = new MinimumConsoleLoggerFactory(LogLevel.Error) };

Print("[CON] Connecting...\n");

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net6.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\..\src\NATS.Client.JetStream\NATS.Client.JetStream.csproj" />
</ItemGroup>

</Project>
145 changes: 145 additions & 0 deletions sandbox/Example.JetStream.PullConsumer/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
using System.Diagnostics;
using Example.JetStream.PullConsumer;
using Microsoft.Extensions.Logging;
using NATS.Client.Core;
using NATS.Client.JetStream;

var cts = new CancellationTokenSource();

Console.CancelKeyPress += (_, e) =>
{
e.Cancel = true;
cts.Cancel();
};

var options = NatsOpts.Default with { LoggerFactory = new MinimumConsoleLoggerFactory(LogLevel.Error) };

await using var nats = new NatsConnection(options);

var js = new NatsJSContext(nats);

var consumer = await js.CreateConsumerAsync("s1", "c1");

var idle = TimeSpan.FromSeconds(15);
var expires = TimeSpan.FromSeconds(30);

// int? maxMsgs = null;
// int? maxBytes = 128;
int? maxMsgs = 1000;
int? maxBytes = null;

static void ErrorHandler(NatsJSNotification notification)
{
Console.WriteLine($"Error: {notification}");
}

void Report(int i, Stopwatch sw, string data)
{
Console.WriteLine(data);
if (i % 10000 == 0)
Console.WriteLine($"Received: {i / sw.Elapsed.TotalSeconds:f2} msgs/s [{i}] {sw.Elapsed}");
}

var consumeOpts = new NatsJSConsumeOpts
{
MaxMsgs = maxMsgs,
MaxBytes = maxBytes,
Expires = expires,
IdleHeartbeat = idle,
Serializer = new RawDataSerializer(),
ErrorHandler = ErrorHandler,
};

var fetchOpts = new NatsJSFetchOpts
{
MaxMsgs = maxMsgs,
MaxBytes = maxBytes,
Expires = expires,
IdleHeartbeat = idle,
Serializer = new RawDataSerializer(),
ErrorHandler = ErrorHandler,
};

var nextOpts = new NatsJSNextOpts
{
Expires = expires, IdleHeartbeat = idle, Serializer = new RawDataSerializer(), ErrorHandler = ErrorHandler,
};

var stopwatch = Stopwatch.StartNew();
var count = 0;

try
{
if (args.Length > 0 && args[0] == "fetch")
{
while (!cts.Token.IsCancellationRequested)
{
Console.WriteLine($"___\nFETCH {maxMsgs}");
await using var sub = await consumer.FetchAsync<RawData>(fetchOpts, cts.Token);
await foreach (var msg in sub.Msgs.ReadAllAsync(cts.Token))
{
await msg.AckAsync(cancellationToken: cts.Token);
Report(++count, stopwatch, $"data: {msg.Data}");
}
}
}
else if (args.Length > 0 && args[0] == "fetch-all")
{
while (!cts.Token.IsCancellationRequested)
{
Console.WriteLine($"___\nFETCH {maxMsgs}");
await foreach (var msg in consumer.FetchAllAsync<RawData>(fetchOpts, cts.Token))
{
await msg.AckAsync(cancellationToken: cts.Token);
Report(++count, stopwatch, $"data: {msg.Data}");
}
}
}
else if (args.Length > 0 && args[0] == "next")
{
while (!cts.Token.IsCancellationRequested)
{
Console.WriteLine("___\nNEXT");
var next = await consumer.NextAsync<RawData>(nextOpts, cts.Token);
if (next is { } msg)
{
await msg.AckAsync(cancellationToken: cts.Token);
Report(++count, stopwatch, $"data: {msg.Data}");
}
}
}
else if (args.Length > 0 && args[0] == "consume")
{
Console.WriteLine("___\nCONSUME");
await using var sub = await consumer.ConsumeAsync<RawData>(
consumeOpts,
cts.Token);

await foreach (var msg in sub.Msgs.ReadAllAsync(cts.Token))
{
await msg.AckAsync(cancellationToken: cts.Token);
Report(++count, stopwatch, $"data: {msg.Data}");
}

// Console.WriteLine($"took {stopwatch.Elapsed}");
// await nats.PingAsync(cts.Token);
}
else if (args.Length > 0 && args[0] == "consume-all")
{
Console.WriteLine("___\nCONSUME-ALL");
await foreach (var msg in consumer.ConsumeAllAsync<RawData>(consumeOpts, cts.Token))
{
await msg.AckAsync(cancellationToken: cts.Token);
Report(++count, stopwatch, $"data: {msg.Data}");
}
}
else
{
Console.WriteLine("Usage: dotnet run -- <consume|consume-all|fetch|fetch-all|next>");
}
}
catch (OperationCanceledException)
{
}

Console.WriteLine("Bye");
12 changes: 12 additions & 0 deletions sandbox/Example.JetStream.PullConsumer/RawData.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
using System.Text;

namespace Example.JetStream.PullConsumer;

public class RawData
{
public RawData(byte[] buffer) => Buffer = buffer;

public byte[] Buffer { get; }

public override string ToString() => Encoding.ASCII.GetString(Buffer);
}
28 changes: 28 additions & 0 deletions sandbox/Example.JetStream.PullConsumer/RawDataSerializer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
using System.Buffers;
using NATS.Client.Core;

namespace Example.JetStream.PullConsumer;

public class RawDataSerializer : INatsSerializer
{
public int Serialize<T>(ICountableBufferWriter bufferWriter, T? value)
{
if (value is RawData data)
{
bufferWriter.Write(data.Buffer);
return data.Buffer.Length;
}

throw new Exception($"Can only work with '{typeof(RawData)}'");
}

public T? Deserialize<T>(in ReadOnlySequence<byte> buffer) => (T?)Deserialize(buffer, typeof(T));

public object? Deserialize(in ReadOnlySequence<byte> buffer, Type type)
{
if (type != typeof(RawData))
throw new Exception($"Can only work with '{typeof(RawData)}'");

return new RawData(buffer.ToArray());
}
}
2 changes: 1 addition & 1 deletion sandbox/MicroBenchmark/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public async Task SetupAsync()

var loggerFactory = provider.GetRequiredService<ILoggerFactory>();
var logger = loggerFactory.CreateLogger<ILogger<DefaultRun>>();
var options = NatsOptions.Default with
var options = NatsOpts.Default with
{
LoggerFactory = loggerFactory,
Echo = true,
Expand Down
12 changes: 6 additions & 6 deletions sandbox/NatsBenchmark/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ private void RunPubSubBenchmark(string testName, long testCount, long testSize,

var loggerFactory = provider.GetRequiredService<ILoggerFactory>();
var logger = loggerFactory.CreateLogger<ILogger<Benchmark>>();
var options = NatsOptions.Default with
var options = NatsOpts.Default with
{
// LoggerFactory = loggerFactory,
UseThreadPoolCallback = false,
Expand Down Expand Up @@ -132,7 +132,7 @@ private void RunPubSubBenchmarkBatch(string testName, long testCount, long testS

var loggerFactory = provider.GetRequiredService<ILoggerFactory>();
var logger = loggerFactory.CreateLogger<ILogger<Benchmark>>();
var options = NatsOptions.Default with
var options = NatsOpts.Default with
{
// LoggerFactory = loggerFactory,
UseThreadPoolCallback = false,
Expand Down Expand Up @@ -216,7 +216,7 @@ private void ProfilingRunPubSubBenchmarkAsync(string testName, long testCount, l

var loggerFactory = provider.GetRequiredService<ILoggerFactory>();
var logger = loggerFactory.CreateLogger<ILogger<Benchmark>>();
var options = NatsOptions.Default with
var options = NatsOpts.Default with
{
// LoggerFactory = loggerFactory,
UseThreadPoolCallback = false,
Expand Down Expand Up @@ -296,7 +296,7 @@ private void RunPubSubBenchmarkBatchRaw(string testName, long testCount, long te

var loggerFactory = provider.GetRequiredService<ILoggerFactory>();
var logger = loggerFactory.CreateLogger<ILogger<Benchmark>>();
var options = NatsOptions.Default with
var options = NatsOpts.Default with
{
// LoggerFactory = loggerFactory,
UseThreadPoolCallback = false,
Expand Down Expand Up @@ -394,7 +394,7 @@ private void RunPubSubBenchmarkPubSub2(string testName, long testCount, long tes

var loggerFactory = provider.GetRequiredService<ILoggerFactory>();
var logger = loggerFactory.CreateLogger<ILogger<Benchmark>>();
var options = NatsOptions.Default with
var options = NatsOpts.Default with
{
// LoggerFactory = loggerFactory,
UseThreadPoolCallback = false,
Expand Down Expand Up @@ -511,7 +511,7 @@ private void RunPubSubBenchmarkVector3(string testName, long testCount, bool dis

var loggerFactory = provider.GetRequiredService<ILoggerFactory>();
var logger = loggerFactory.CreateLogger<ILogger<Benchmark>>();
var options = NatsOptions.Default with
var options = NatsOpts.Default with
{
// LoggerFactory = loggerFactory,
UseThreadPoolCallback = false,
Expand Down
Loading

0 comments on commit 6f9df6e

Please sign in to comment.