Skip to content

Commit

Permalink
Inbox subscription iprovements (#95)
Browse files Browse the repository at this point in the history
* inbox subscription iprovements

Signed-off-by: Caleb Lloyd <caleb@synadia.com>

* Update src/NATS.Client.Core/Internal/InboxSub.cs

Co-authored-by: Ziya Suzen <ziya@synadia.com>

---------

Signed-off-by: Caleb Lloyd <caleb@synadia.com>
Co-authored-by: Ziya Suzen <ziya@synadia.com>
  • Loading branch information
caleblloyd and mtmk authored Jul 20, 2023
1 parent 0aaccd7 commit b0b5b12
Show file tree
Hide file tree
Showing 4 changed files with 108 additions and 26 deletions.
62 changes: 51 additions & 11 deletions src/NATS.Client.Core/Internal/InboxSub.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System.Buffers;
using System.Collections.Concurrent;
using System.Runtime.CompilerServices;
using Microsoft.Extensions.Logging;

namespace NATS.Client.Core.Internal;
Expand Down Expand Up @@ -46,7 +47,7 @@ public ValueTask ReceiveAsync(string subject, string? replyTo, ReadOnlySequence<
internal class InboxSubBuilder : INatsSubBuilder<InboxSub>, ISubscriptionManager
{
private readonly ILogger<InboxSubBuilder> _logger;
private readonly ConcurrentDictionary<string, INatsSub> _writers = new();
private readonly ConcurrentDictionary<string, ConditionalWeakTable<INatsSub, object>> _bySubject = new();

public InboxSubBuilder(ILogger<InboxSubBuilder> logger) => _logger = logger;

Expand All @@ -57,27 +58,66 @@ public InboxSub Build(string subject, NatsSubOpts? opts, NatsConnection connecti

public void Register(INatsSub sub)
{
if (!_writers.TryAdd(sub.Subject, sub))
throw new InvalidOperationException("Subject already registered");
_bySubject.AddOrUpdate(
sub.Subject,
static (_, s) => new ConditionalWeakTable<INatsSub, object> { { s, new object() } },
static (_, subTable, s) =>
{
lock (subTable)
{
if (!subTable.Any())
{
// if current subTable is empty, it may be in process of being removed
// return a new object
return new ConditionalWeakTable<INatsSub, object> { { s, new object() } };
}
// the updateValueFactory delegate can be called multiple times
// use AddOrUpdate to avoid exceptions if this happens
subTable.AddOrUpdate(s, new object());
return subTable;
}
},
sub);

sub.Ready();
}

public void Unregister(string subject) => _writers.TryRemove(subject, out _);

public ValueTask ReceivedAsync(string subject, string? replyTo, in ReadOnlySequence<byte>? headersBuffer, in ReadOnlySequence<byte> payloadBuffer, NatsConnection connection)
public async ValueTask ReceivedAsync(string subject, string? replyTo, ReadOnlySequence<byte>? headersBuffer, ReadOnlySequence<byte> payloadBuffer, NatsConnection connection)
{
if (!_writers.TryGetValue(subject, out var sub))
if (!_bySubject.TryGetValue(subject, out var subTable))
{
_logger.LogWarning("Unregistered message inbox received");
return ValueTask.CompletedTask;
_logger.LogWarning($"Unregistered message inbox received for {subject}");
return;
}

return sub.ReceiveAsync(subject, replyTo, headersBuffer, payloadBuffer);
foreach (var (sub, _) in subTable)
{
await sub.ReceiveAsync(subject, replyTo, headersBuffer, payloadBuffer).ConfigureAwait(false);
}
}

public ValueTask RemoveAsync(INatsSub sub)
{
Unregister(sub.Subject);
if (!_bySubject.TryGetValue(sub.Subject, out var subTable))
{
_logger.LogWarning($"Unregistered message inbox received for {sub.Subject}");
return ValueTask.CompletedTask;
}

lock (subTable)
{
if (!subTable.Remove(sub))
_logger.LogWarning($"Unregistered message inbox received for {sub.Subject}");

if (!subTable.Any())
{
// try to remove this specific instance of the subTable
// if an update is in process and sees an empty subTable, it will set a new instance
_bySubject.TryRemove(KeyValuePair.Create(sub.Subject, subTable));
}
}

return ValueTask.CompletedTask;
}
}
27 changes: 15 additions & 12 deletions src/NATS.Client.Core/Internal/SubscriptionManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ internal interface ISubscriptionManager
public ValueTask RemoveAsync(INatsSub sub);
}

internal record struct SidMetadata(string Subject, WeakReference<INatsSub> WeakReference);

internal sealed record SubscriptionMetadata(int Sid);

internal sealed class SubscriptionManager : ISubscriptionManager, IAsyncDisposable
Expand All @@ -18,7 +20,7 @@ internal sealed class SubscriptionManager : ISubscriptionManager, IAsyncDisposab
private readonly object _gate = new();
private readonly NatsConnection _connection;
private readonly string _inboxPrefix;
private readonly ConcurrentDictionary<int, WeakReference<INatsSub>> _bySid = new();
private readonly ConcurrentDictionary<int, SidMetadata> _bySid = new();
private readonly ConditionalWeakTable<INatsSub, SubscriptionMetadata> _bySub = new();
private readonly CancellationTokenSource _cts;
private readonly Task _timer;
Expand Down Expand Up @@ -47,11 +49,11 @@ public SubscriptionManager(NatsConnection connection, string inboxPrefix)
{
lock (_gate)
{
foreach (var subRef in _bySid)
foreach (var (sid, sidMetadata) in _bySid)
{
if (subRef.Value.TryGetTarget(out var sub))
if (sidMetadata.WeakReference.TryGetTarget(out var sub))
{
yield return (subRef.Key, sub.Subject, sub.QueueGroup, sub.PendingMsgs);
yield return (sid, sub.Subject, sub.QueueGroup, sub.PendingMsgs);
}
}
}
Expand Down Expand Up @@ -98,15 +100,15 @@ public ValueTask PublishToClientHandlersAsync(string subject, string? replyTo, i
int? orphanSid = null;
lock (_gate)
{
if (_bySid.TryGetValue(sid, out var subRef))
if (_bySid.TryGetValue(sid, out var sidMetadata))
{
if (subRef.TryGetTarget(out var sub))
if (sidMetadata.WeakReference.TryGetTarget(out var sub))
{
return sub.ReceiveAsync(subject, replyTo, headersBuffer, payloadBuffer);
}
else
{
_logger.LogWarning($"Dead subscription {subject}/{sid}");
_logger.LogWarning($"Subscription GCd but was never disposed {subject}/{sid}");
orphanSid = sid;
}
}
Expand All @@ -124,7 +126,7 @@ public ValueTask PublishToClientHandlersAsync(string subject, string? replyTo, i
}
catch (Exception e)
{
_logger.LogWarning($"Error unsubscribing ophan SID during publish: {e.GetBaseException().Message}");
_logger.LogWarning($"Error unsubscribing orphan SID during publish: {e.GetBaseException().Message}");
}
}

Expand All @@ -138,7 +140,7 @@ public async ValueTask DisposeAsync()
WeakReference<INatsSub>[] subRefs;
lock (_gate)
{
subRefs = _bySid.Values.ToArray();
subRefs = _bySid.Values.Select(m => m.WeakReference).ToArray();
_bySid.Clear();
}

Expand Down Expand Up @@ -172,7 +174,7 @@ private async ValueTask<T> SubscribeInternalAsync<T>(string subject, NatsSubOpts
var sid = GetNextSid();
lock (_gate)
{
_bySid[sid] = new WeakReference<INatsSub>(sub);
_bySid[sid] = new SidMetadata(Subject: subject, WeakReference: new WeakReference<INatsSub>(sub));
_bySub.AddOrUpdate(sub, new SubscriptionMetadata(Sid: sid));
}

Expand Down Expand Up @@ -203,15 +205,16 @@ private async Task CleanupAsync()

lock (_gate)
{
foreach (var (sid, subRef) in _bySid)
foreach (var (sid, sidMetadata) in _bySid)
{
if (_cts.Token.IsCancellationRequested)
break;

if (subRef.TryGetTarget(out _))
if (sidMetadata.WeakReference.TryGetTarget(out _))
continue;

// NatsSub object GCed
_logger.LogWarning($"Subscription GCd but was never disposed {sidMetadata.Subject}/{sid}");
orphanSids ??= new List<int>();
orphanSids.Add(sid);
}
Expand Down
6 changes: 6 additions & 0 deletions src/NATS.Client.Core/NatsRequestExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,12 @@ namespace NATS.Client.Core;

public static class NatsRequestExtensions
{
/// <summary>
/// Create a new inbox subject with the form {Inbox Prefix}.{Unique Connection ID}.{Unique Inbox ID}
/// </summary>
/// <returns>A <see cref="string"/> containing a unique inbox subject.</returns>
public static string NewInbox(this NatsConnection nats) => $"{nats.InboxPrefix}{Guid.NewGuid():n}";

/// <summary>
/// Request and receive a single reply from a responder.
/// </summary>
Expand Down
39 changes: 36 additions & 3 deletions tests/NATS.Client.Core.Tests/SubscriptionTest.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
using System.Buffers;
using System.Text;

namespace NATS.Client.Core.Tests;

public class SubscriptionTest
Expand Down Expand Up @@ -184,5 +181,41 @@ public async Task Auto_unsubscribe_test()
Assert.Equal(0, count);
Assert.Equal(NatsSubEndReason.None, sub.EndReason);
}

// Auto unsubscribe on max messages with Inbox Subscription
{
var subject = nats.NewInbox();

await using var sub1 = await nats.SubscribeAsync<int>(subject, new NatsSubOpts { MaxMsgs = 1 });
await using var sub2 = await nats.SubscribeAsync<int>(subject, new NatsSubOpts { MaxMsgs = 2 });

for (var i = 0; i < 3; i++)
{
await nats.PublishAsync(subject, i);
}

var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
var cancellationToken = cts.Token;

var count1 = 0;
await foreach (var natsMsg in sub1.Msgs.ReadAllAsync(cancellationToken))
{
Assert.Equal(count1, natsMsg.Data);
count1++;
}

Assert.Equal(1, count1);
Assert.Equal(NatsSubEndReason.MaxMsgs, sub1.EndReason);

var count2 = 0;
await foreach (var natsMsg in sub2.Msgs.ReadAllAsync(cancellationToken))
{
Assert.Equal(count2, natsMsg.Data);
count2++;
}

Assert.Equal(2, count2);
Assert.Equal(NatsSubEndReason.MaxMsgs, sub2.EndReason);
}
}
}

0 comments on commit b0b5b12

Please sign in to comment.