Skip to content

Commit

Permalink
Added slow consumer events (#276)
Browse files Browse the repository at this point in the history
* Added slow consumer events

Slow consumers are subscriptions which are unable to process their messages
fast enough. When the internal subscription channel is full oldest messages
should be dropped and new messages should continued to be processed. This is
aligned with NATS at most once delivery.

See also https://docs.nats.io/running-a-nats-service/nats_admin/slow_consumers

* Avoid capturing too many references

Now only captures 'this' reference and not 'connection' in addition.

* Set channel default capacity 1024

Also opts SubPending* rename.

* Don't drop messages in perf test
  • Loading branch information
mtmk authored Dec 12, 2023
1 parent 9ed8a14 commit aececb9
Show file tree
Hide file tree
Showing 7 changed files with 191 additions and 37 deletions.
34 changes: 34 additions & 0 deletions src/NATS.Client.Core/INatsError.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
namespace NATS.Client.Core;

public interface INatsError
{
string Message { get; }
}

public sealed class MessageDroppedError : INatsError
{
public MessageDroppedError(NatsSubBase subscription, int pending, string subject, string? replyTo, NatsHeaders? headers, object? data)
{
Subscription = subscription;
Pending = pending;
Subject = subject;
ReplyTo = replyTo;
Headers = headers;
Data = data;
Message = $"Dropped message from {subject} with {pending} pending messages";
}

public NatsSubBase Subscription { get; }

public int Pending { get; }

public string Subject { get; }

public string? ReplyTo { get; }

public NatsHeaders? Headers { get; }

public object? Data { get; }

public string Message { get; }
}
38 changes: 38 additions & 0 deletions src/NATS.Client.Core/NatsConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public partial class NatsConnection : IAsyncDisposable, INatsConnection
private readonly CancellationTokenSource _disposedCancellationTokenSource;
private readonly string _name;
private readonly TimeSpan _socketComponentDisposeTimeout = TimeSpan.FromSeconds(5);
private readonly BoundedChannelOptions _defaultSubscriptionChannelOpts;

private int _pongCount;
private bool _isDisposed;
Expand Down Expand Up @@ -78,6 +79,13 @@ public NatsConnection(NatsOpts opts)
_logger = opts.LoggerFactory.CreateLogger<NatsConnection>();
_clientOpts = ClientOpts.Create(Opts);
HeaderParser = new NatsHeaderParser(opts.HeaderEncoding);
_defaultSubscriptionChannelOpts = new BoundedChannelOptions(opts.SubPendingChannelCapacity)
{
FullMode = opts.SubPendingChannelFullMode,
SingleWriter = true,
SingleReader = false,
AllowSynchronousContinuations = false,
};
}

// events
Expand All @@ -87,6 +95,8 @@ public NatsConnection(NatsOpts opts)

public event EventHandler<string>? ReconnectFailed;

public event EventHandler<INatsError>? OnError;

public NatsOpts Opts { get; }

public NatsConnectionState ConnectionState
Expand Down Expand Up @@ -238,6 +248,34 @@ internal ValueTask UnsubscribeAsync(int sid)
return ValueTask.CompletedTask;
}

internal void MessageDropped<T>(NatsSub<T> natsSub, int pending, NatsMsg<T> msg)
{
var subject = msg.Subject;
_logger.LogWarning("Dropped message from {Subject} with {Pending} pending messages", subject, pending);
OnError?.Invoke(this, new MessageDroppedError(natsSub, pending, subject, msg.ReplyTo, msg.Headers, msg.Data));
}

internal BoundedChannelOptions GetChannelOpts(NatsOpts connectionOpts, NatsSubChannelOpts? subChannelOpts)
{
if (subChannelOpts is { } overrideOpts)
{
return new BoundedChannelOptions(overrideOpts.Capacity ??
_defaultSubscriptionChannelOpts.Capacity)
{
AllowSynchronousContinuations =
_defaultSubscriptionChannelOpts.AllowSynchronousContinuations,
FullMode =
overrideOpts.FullMode ?? _defaultSubscriptionChannelOpts.FullMode,
SingleWriter = _defaultSubscriptionChannelOpts.SingleWriter,
SingleReader = _defaultSubscriptionChannelOpts.SingleReader,
};
}
else
{
return _defaultSubscriptionChannelOpts;
}
}

private async ValueTask InitialConnectAsync()
{
Debug.Assert(ConnectionState == NatsConnectionState.Connecting, "Connection state");
Expand Down
20 changes: 20 additions & 0 deletions src/NATS.Client.Core/NatsOpts.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Text;
using System.Threading.Channels;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using NATS.Client.Core.Internal;
Expand Down Expand Up @@ -94,6 +95,25 @@ public sealed record NatsOpts
/// </summary>
public bool IgnoreAuthErrorAbort { get; init; } = false;

/// <summary>
/// This value will be used for subscriptions internal bounded message channel capacity.
/// The default subscriber pending message limit is 1024.
/// </summary>
public int SubPendingChannelCapacity { get; init; } = 1024;

/// <summary>
/// This value will be used for subscriptions internal bounded message channel <c>FullMode</c>.
/// The default is to drop newest message when full (<c>BoundedChannelFullMode.DropNewest</c>).
/// </summary>
/// <remarks>
/// If the client reaches this internal limit (bounded channel capacity), by default it will drop messages
/// and continue to process new messages. This is aligned with NATS at most once delivery. It is up to
/// the application to detect the missing messages (<seealso cref="NatsConnection.OnError"/>) and recover
/// from this condition or set a different default such as <c>BoundedChannelFullMode.Wait</c> in which
/// case it might risk server disconnecting the client as a slow consumer.
/// </remarks>
public BoundedChannelFullMode SubPendingChannelFullMode { get; init; } = BoundedChannelFullMode.DropNewest;

internal NatsUri[] GetSeedUris()
{
var urls = Url.Split(',');
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ internal NatsSub(
: base(connection, manager, subject, queueGroup, opts, cancellationToken)
{
_msgs = Channel.CreateBounded<NatsMsg<T>>(
NatsSubUtils.GetChannelOpts(opts?.ChannelOpts));
connection.GetChannelOpts(connection.Opts, opts?.ChannelOpts),
msg => Connection.MessageDropped(this, _msgs?.Reader.Count ?? 0, msg));

Serializer = serializer;
}
Expand Down Expand Up @@ -64,37 +65,3 @@ public NatsSubException(string message, ExceptionDispatchInfo exception, Memory<

public Memory<byte> Headers { get; }
}

internal sealed class NatsSubUtils
{
private static readonly BoundedChannelOptions DefaultChannelOpts =
new BoundedChannelOptions(1_000)
{
FullMode = BoundedChannelFullMode.Wait,
SingleWriter = true,
SingleReader = false,
AllowSynchronousContinuations = false,
};

internal static BoundedChannelOptions GetChannelOpts(
NatsSubChannelOpts? subChannelOpts)
{
if (subChannelOpts is { } overrideOpts)
{
return new BoundedChannelOptions(overrideOpts.Capacity ??
DefaultChannelOpts.Capacity)
{
AllowSynchronousContinuations =
DefaultChannelOpts.AllowSynchronousContinuations,
FullMode =
overrideOpts.FullMode ?? DefaultChannelOpts.FullMode,
SingleWriter = DefaultChannelOpts.SingleWriter,
SingleReader = DefaultChannelOpts.SingleReader,
};
}
else
{
return DefaultChannelOpts;
}
}
}
7 changes: 6 additions & 1 deletion tests/NATS.Client.Core.MemoryTests/NatsSubTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@ async Task Isolator()
// Subscription is not being disposed here
var natsSub = await nats.SubscribeCoreAsync<string>("foo");
Assert.That(natsSub.Subject, Is.EqualTo("foo"));
dotMemory.Check(memory =>
{
var count = memory.GetObjects(where => where.Type.Is<NatsSub<string>>()).ObjectsCount;
Assert.That(count, Is.EqualTo(1));
});
}

Isolator().GetAwaiter().GetResult();
Expand All @@ -26,7 +31,7 @@ async Task Isolator()

dotMemory.Check(memory =>
{
var count = memory.GetObjects(where => where.Type.Is<NatsSubUtils>()).ObjectsCount;
var count = memory.GetObjects(where => where.Type.Is<NatsSub<string>>()).ObjectsCount;
Assert.That(count, Is.EqualTo(0));
});
}
Expand Down
85 changes: 85 additions & 0 deletions tests/NATS.Client.Core.Tests/SlowConsumerTest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
namespace NATS.Client.Core.Tests;

public class SlowConsumerTest
{
private readonly ITestOutputHelper _output;

public SlowConsumerTest(ITestOutputHelper output) => _output = output;

[Fact]
public async Task Slow_consumer()
{
await using var server = NatsServer.Start();
var nats = server.CreateClientConnection(new NatsOpts { SubPendingChannelCapacity = 3 });

var lost = 0;
nats.OnError += (_, e) =>
{
if (e is MessageDroppedError dropped)
{
Interlocked.Increment(ref lost);
_output.WriteLine($"LOST {dropped.Data}");
}
};

var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
var cancellationToken = cts.Token;

var sync = 0;
var end = 0;
var count = 0;
var signal = new WaitSignal();
var subscription = Task.Run(
async () =>
{
await foreach (var msg in nats.SubscribeAsync<string>("foo.>", cancellationToken: cancellationToken))
{
if (msg.Subject == "foo.sync")
{
Interlocked.Increment(ref sync);
continue;
}
if (msg.Subject == "foo.end")
{
Interlocked.Increment(ref end);
break;
}
await signal;
Interlocked.Increment(ref count);
_output.WriteLine($"GOOD {msg.Data}");
}
},
cancellationToken);

await Retry.Until(
"subscription is ready",
() => Volatile.Read(ref sync) > 0,
async () => await nats.PublishAsync("foo.sync", cancellationToken: cancellationToken));

for (var i = 0; i < 10; i++)
{
await nats.PublishAsync("foo.data", $"A{i}", cancellationToken: cancellationToken);
}

signal.Pulse();

for (var i = 0; i < 10; i++)
{
await nats.PublishAsync("foo.data", $"B{i}", cancellationToken: cancellationToken);
}

await Retry.Until(
"subscription is ended",
() => Volatile.Read(ref end) > 0,
async () => await nats.PublishAsync("foo.end", cancellationToken: cancellationToken));

await subscription;

// we should've lost most of the messages because of the low channel capacity
Volatile.Read(ref count).Should().BeLessThan(20);
Volatile.Read(ref lost).Should().BeGreaterThan(0);
}
}
7 changes: 6 additions & 1 deletion tests/NATS.Client.Perf/Program.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System.Buffers;
using System.Diagnostics;
using System.Text.RegularExpressions;
using System.Threading.Channels;
using NATS.Client.Core;
using NATS.Client.Core.Tests;

Expand All @@ -23,7 +24,11 @@
Console.WriteLine("\nRunning nats bench");
var natsBenchTotalMsgs = RunNatsBench(server.ClientUrl, t);

await using var nats1 = server.CreateClientConnection();
await using var nats1 = server.CreateClientConnection(new NatsOpts
{
// don't drop messages
SubPendingChannelFullMode = BoundedChannelFullMode.Wait,
});
await using var nats2 = server.CreateClientConnection();

var cts = new CancellationTokenSource(TimeSpan.FromSeconds(60));
Expand Down

0 comments on commit aececb9

Please sign in to comment.