Skip to content

Commit

Permalink
NatsSub simplification and JetStream support (#108)
Browse files Browse the repository at this point in the history
* NatsSubX simplification and JetStream support

* Remove INatsSubBuilder interfaces: This is to reduce unnecessary complexity
  since NatsSubBase can be used as the contract in SubscriptionManager

* Create a separation of public interfaces and internal handling for
  subscriptions by using INatsSub* interfaces for public methods
  and using NatsSubBase as the internal handling interface with internals
  exposed such as Pending messages.

* Push responsibility of reconnections to SubscriptionManager so that
  not only we can renew subscriptions but in the case of JetStream
  consumers recovery, we can issue pull requests for example.

* Fixed req-reply

* Tidy-up

* Tidy-up

* Test flapping

* Fixed warnings
  • Loading branch information
mtmk authored Aug 7, 2023
1 parent 9c60b06 commit 68ea8d9
Show file tree
Hide file tree
Showing 23 changed files with 231 additions and 283 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ public class WeatherForecastService : IHostedService, IAsyncDisposable

private readonly ILogger<WeatherForecastService> _logger;
private readonly INatsConnection _natsConnection;
private NatsSub<object>? _replySubscription;
private INatsSub<object>? _replySubscription;
private Task? _replyTask;

public WeatherForecastService(ILogger<WeatherForecastService> logger, INatsConnection natsConnection)
Expand Down
4 changes: 2 additions & 2 deletions sandbox/NatsBenchmark/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -811,7 +811,7 @@ public struct Vector3

internal static class NatsMsgTestUtils
{
internal static NatsSub<T>? Register<T>(this NatsSub<T>? sub, Action<NatsMsg<T?>> action)
internal static INatsSub<T>? Register<T>(this INatsSub<T>? sub, Action<NatsMsg<T?>> action)
{
if (sub == null)
return null;
Expand All @@ -825,7 +825,7 @@ internal static class NatsMsgTestUtils
return sub;
}

internal static NatsSub? Register(this NatsSub? sub, Action<NatsMsg> action)
internal static INatsSub? Register(this INatsSub? sub, Action<NatsMsg> action)
{
if (sub == null)
return null;
Expand Down
12 changes: 6 additions & 6 deletions src/NATS.Client.Core/INatsConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public interface INatsConnection
/// <param name="opts">A <see cref="NatsPubOpts"/> for publishing options.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the command.</param>
/// <returns>A <see cref="ValueTask"/> that represents the asynchronous send operation.</returns>
ValueTask PublishAsync(string subject, ReadOnlySequence<byte> payload = default, in NatsPubOpts? opts = default, CancellationToken cancellationToken = default);
ValueTask PublishAsync(string subject, ReadOnlySequence<byte> payload = default, NatsPubOpts? opts = default, CancellationToken cancellationToken = default);

/// <summary>
/// Publishes the message payload to the given subject name, optionally supplying a reply subject.
Expand All @@ -28,7 +28,7 @@ public interface INatsConnection
/// <param name="opts">A <see cref="NatsPubOpts"/> for publishing options.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the command.</param>
/// <returns>A <see cref="ValueTask"/> that represents the asynchronous send operation.</returns>
ValueTask PublishAsync(in NatsMsg msg, in NatsPubOpts? opts = default, CancellationToken cancellationToken = default);
ValueTask PublishAsync(in NatsMsg msg, NatsPubOpts? opts = default, CancellationToken cancellationToken = default);

/// <summary>
/// Publishes a serializable message payload to the given subject name, optionally supplying a reply subject.
Expand All @@ -39,7 +39,7 @@ public interface INatsConnection
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the command.</param>
/// <typeparam name="T">Specifies the type of data that may be send to the NATS Server.</typeparam>
/// <returns>A <see cref="ValueTask"/> that represents the asynchronous send operation.</returns>
ValueTask PublishAsync<T>(string subject, T data, in NatsPubOpts? opts = default, CancellationToken cancellationToken = default);
ValueTask PublishAsync<T>(string subject, T data, NatsPubOpts? opts = default, CancellationToken cancellationToken = default);

/// <summary>
/// Publishes a serializable message payload to the given subject name, optionally supplying a reply subject.
Expand All @@ -49,7 +49,7 @@ public interface INatsConnection
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the command.</param>
/// <typeparam name="T">Specifies the type of data that may be send to the NATS Server.</typeparam>
/// <returns>A <see cref="ValueTask"/> that represents the asynchronous send operation.</returns>
ValueTask PublishAsync<T>(in NatsMsg<T> msg, in NatsPubOpts? opts = default, CancellationToken cancellationToken = default);
ValueTask PublishAsync<T>(in NatsMsg<T> msg, NatsPubOpts? opts = default, CancellationToken cancellationToken = default);

/// <summary>
/// Initiates a subscription to a subject, optionally joining a distributed queue group.
Expand All @@ -58,7 +58,7 @@ public interface INatsConnection
/// <param name="opts">A <see cref="NatsSubOpts"/> for subscription options.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the command.</param>
/// <returns>A <see cref="ValueTask{TResult}"/> that represents the asynchronous send operation.</returns>
ValueTask<NatsSub> SubscribeAsync(string subject, in NatsSubOpts? opts = default, CancellationToken cancellationToken = default);
ValueTask<INatsSub> SubscribeAsync(string subject, NatsSubOpts? opts = default, CancellationToken cancellationToken = default);

/// <summary>
/// Initiates a subscription to a subject, optionally joining a distributed queue group.
Expand All @@ -68,5 +68,5 @@ public interface INatsConnection
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the command.</param>
/// <typeparam name="T">Specifies the type of data that may be received from the NATS Server.</typeparam>
/// <returns>A <see cref="ValueTask{TResult}"/> that represents the asynchronous send operation.</returns>
ValueTask<NatsSub<T>> SubscribeAsync<T>(string subject, in NatsSubOpts? opts = default, CancellationToken cancellationToken = default);
ValueTask<INatsSub<T>> SubscribeAsync<T>(string subject, NatsSubOpts? opts = default, CancellationToken cancellationToken = default);
}
57 changes: 57 additions & 0 deletions src/NATS.Client.Core/INatsSub.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
using System.Threading.Channels;

namespace NATS.Client.Core;

public interface INatsSub : IAsyncDisposable
{
/// <summary>
/// Access incoming messages for your subscription.
/// </summary>
ChannelReader<NatsMsg> Msgs { get; }

/// <summary>
/// The subject name to subscribe to.
/// </summary>
string Subject { get; }

/// <summary>
/// If specified, the subscriber will join this queue group. Subscribers with the same queue group name,
/// become a queue group, and only one randomly chosen subscriber of the queue group will
/// consume a message each time a message is received by the queue group.
/// </summary>
string? QueueGroup { get; }

/// <summary>
/// Complete the message channel, stop timers if they were used and send an unsubscribe
/// message to the server.
/// </summary>
/// <returns>A <see cref="ValueTask"/> that represents the asynchronous server UNSUB operation.</returns>
public ValueTask UnsubscribeAsync();
}

public interface INatsSub<T> : IAsyncDisposable
{
/// <summary>
/// Access incoming messages for your subscription.
/// </summary>
ChannelReader<NatsMsg<T?>> Msgs { get; }

/// <summary>
/// The subject name to subscribe to.
/// </summary>
string Subject { get; }

/// <summary>
/// If specified, the subscriber will join this queue group. Subscribers with the same queue group name,
/// become a queue group, and only one randomly chosen subscriber of the queue group will
/// consume a message each time a message is received by the queue group.
/// </summary>
string? QueueGroup { get; }

/// <summary>
/// Complete the message channel, stop timers if they were used and send an unsubscribe
/// message to the server.
/// </summary>
/// <returns>A <see cref="ValueTask"/> that represents the asynchronous server UNSUB operation.</returns>
public ValueTask UnsubscribeAsync();
}
53 changes: 0 additions & 53 deletions src/NATS.Client.Core/Internal/INatsSub.cs

This file was deleted.

40 changes: 15 additions & 25 deletions src/NATS.Client.Core/Internal/InboxSub.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,49 +5,39 @@

namespace NATS.Client.Core.Internal;

internal class InboxSub : INatsSub
internal class InboxSub : NatsSubBase
{
private readonly InboxSubBuilder _inbox;
private readonly NatsConnection _connection;
private readonly ISubscriptionManager _manager;

public InboxSub(
InboxSubBuilder inbox,
string subject,
NatsSubOpts? opts,
NatsConnection connection,
ISubscriptionManager manager)
: base(connection, manager, subject, opts)
{
_inbox = inbox;
_connection = connection;
_manager = manager;
Subject = subject;
QueueGroup = opts?.QueueGroup;
PendingMsgs = opts?.MaxMsgs;
}

public string Subject { get; }

public string? QueueGroup { get; }

public int? PendingMsgs { get; }

public void Ready()
{
}
protected override ValueTask ReceiveInternalAsync(
string subject,
string? replyTo,
ReadOnlySequence<byte>? headersBuffer,
ReadOnlySequence<byte> payloadBuffer) =>
_inbox.ReceivedAsync(subject, replyTo, headersBuffer, payloadBuffer, _connection);

public ValueTask ReceiveAsync(string subject, string? replyTo, ReadOnlySequence<byte>? headersBuffer, ReadOnlySequence<byte> payloadBuffer)
protected override void TryComplete()
{
return _inbox.ReceivedAsync(subject, replyTo, headersBuffer, payloadBuffer, _connection);
}

public ValueTask DisposeAsync() => _manager.RemoveAsync(this);
}

internal class InboxSubBuilder : INatsSubBuilder<InboxSub>, ISubscriptionManager
internal class InboxSubBuilder : ISubscriptionManager
{
private readonly ILogger<InboxSubBuilder> _logger;
private readonly ConcurrentDictionary<string, ConditionalWeakTable<INatsSub, object>> _bySubject = new();
private readonly ConcurrentDictionary<string, ConditionalWeakTable<NatsSubBase, object>> _bySubject = new();

public InboxSubBuilder(ILogger<InboxSubBuilder> logger) => _logger = logger;

Expand All @@ -56,11 +46,11 @@ public InboxSub Build(string subject, NatsSubOpts? opts, NatsConnection connecti
return new InboxSub(this, subject, opts, connection, manager);
}

public void Register(INatsSub sub)
public void Register(NatsSubBase sub)
{
_bySubject.AddOrUpdate(
sub.Subject,
static (_, s) => new ConditionalWeakTable<INatsSub, object> { { s, new object() } },
static (_, s) => new ConditionalWeakTable<NatsSubBase, object> { { s, new object() } },
static (_, subTable, s) =>
{
lock (subTable)
Expand All @@ -69,7 +59,7 @@ public void Register(INatsSub sub)
{
// if current subTable is empty, it may be in process of being removed
// return a new object
return new ConditionalWeakTable<INatsSub, object> { { s, new object() } };
return new ConditionalWeakTable<NatsSubBase, object> { { s, new object() } };
}
// the updateValueFactory delegate can be called multiple times
Expand Down Expand Up @@ -97,7 +87,7 @@ public async ValueTask ReceivedAsync(string subject, string? replyTo, ReadOnlySe
}
}

public ValueTask RemoveAsync(INatsSub sub)
public ValueTask RemoveAsync(NatsSubBase sub)
{
if (!_bySubject.TryGetValue(sub.Subject, out var subTable))
{
Expand Down
Loading

0 comments on commit 68ea8d9

Please sign in to comment.