Skip to content

Commit

Permalink
Add INatsConnectionPool interface (#109)
Browse files Browse the repository at this point in the history
* Extract INatsConnectionPool interface

* Make request/reply extension methods instance methods

* Move NewInbox to INatsConnection

* Fix formatting

* Simplify INatsConnection registration

* Remove explicit interface implementation

* Fix NatsConnection factory delegate
  • Loading branch information
jasper-d authored Aug 17, 2023
1 parent 5d80742 commit f05ab10
Show file tree
Hide file tree
Showing 7 changed files with 241 additions and 207 deletions.
95 changes: 95 additions & 0 deletions src/NATS.Client.Core/INatsConnection.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Buffers;
using System.Runtime.CompilerServices;

namespace NATS.Client.Core;

Expand Down Expand Up @@ -69,4 +70,98 @@ public interface INatsConnection
/// <typeparam name="T">Specifies the type of data that may be received from the NATS Server.</typeparam>
/// <returns>A <see cref="ValueTask{TResult}"/> that represents the asynchronous send operation.</returns>
ValueTask<INatsSub<T>> SubscribeAsync<T>(string subject, NatsSubOpts? opts = default, CancellationToken cancellationToken = default);

/// <summary>
/// Create a new inbox subject with the form {Inbox Prefix}.{Unique Connection ID}.{Unique Inbox ID}
/// </summary>
/// <returns>A <see cref="string"/> containing a unique inbox subject.</returns>
string NewInbox();

/// <summary>
/// Request and receive a single reply from a responder.
/// </summary>
/// <param name="subject">Subject of the responder</param>
/// <param name="data">Data to send to responder</param>
/// <param name="requestOpts">Request publish options</param>
/// <param name="replyOpts">Reply handler subscription options</param>
/// <param name="cancellationToken">Cancel this request</param>
/// <typeparam name="TRequest">Request type</typeparam>
/// <typeparam name="TReply">Reply type</typeparam>
/// <returns>Returns the <see cref="NatsMsg{T}"/> received from the responder as reply.</returns>
/// <exception cref="OperationCanceledException">Raised when cancellation token is used</exception>
/// <remarks>
/// Response can be (null) or one <see cref="NatsMsg"/>.
/// Reply option's max messages will be set to 1.
/// if reply option's timeout is not defined then it will be set to NatsOptions.RequestTimeout.
/// </remarks>
ValueTask<NatsMsg<TReply?>?> RequestAsync<TRequest, TReply>(
string subject,
TRequest? data,
NatsPubOpts? requestOpts = default,
NatsSubOpts? replyOpts = default,
CancellationToken cancellationToken = default);

/// <summary>
/// Request and receive a single reply from a responder.
/// </summary>
/// <param name="subject">Subject of the responder</param>
/// <param name="payload">Payload to send to responder</param>
/// <param name="requestOpts">Request publish options</param>
/// <param name="replyOpts">Reply handler subscription options</param>
/// <param name="cancellationToken">Cancel this request</param>
/// <returns>Returns the <see cref="NatsMsg"/> received from the responder as reply.</returns>
/// <exception cref="OperationCanceledException">Raised when cancellation token is used</exception>
/// <remarks>
/// Response can be (null) or one <see cref="NatsMsg"/>.
/// Reply option's max messages will be set to 1 (one).
/// if reply option's timeout is not defined then it will be set to NatsOptions.RequestTimeout.
/// </remarks>
ValueTask<NatsMsg?> RequestAsync(
string subject,
ReadOnlySequence<byte> payload = default,
NatsPubOpts? requestOpts = default,
NatsSubOpts? replyOpts = default,
CancellationToken cancellationToken = default);

/// <summary>
/// Request and receive zero or more replies from a responder.
/// </summary>
/// <param name="subject">Subject of the responder</param>
/// <param name="data">Data to send to responder</param>
/// <param name="requestOpts">Request publish options</param>
/// <param name="replyOpts">Reply handler subscription options</param>
/// <param name="cancellationToken">Cancel this request</param>
/// <typeparam name="TRequest">Request type</typeparam>
/// <typeparam name="TReply">Reply type</typeparam>
/// <returns>An asynchronous enumerable of <see cref="NatsMsg"/> objects</returns>
/// <exception cref="OperationCanceledException">Raised when cancellation token is used</exception>
/// <remarks>
/// if reply option's timeout is not defined then it will be set to NatsOptions.RequestTimeout.
/// </remarks>
IAsyncEnumerable<NatsMsg<TReply?>> RequestManyAsync<TRequest, TReply>(
string subject,
TRequest? data,
NatsPubOpts? requestOpts = default,
NatsSubOpts? replyOpts = default,
CancellationToken cancellationToken = default);

/// <summary>
/// Request and receive zero or more replies from a responder.
/// </summary>
/// <param name="subject">Subject of the responder</param>
/// <param name="payload">Payload to send to responder</param>
/// <param name="requestOpts">Request publish options</param>
/// <param name="replyOpts">Reply handler subscription options</param>
/// <param name="cancellationToken">Cancel this request</param>
/// <returns>An asynchronous enumerable of <see cref="NatsMsg"/> objects</returns>
/// <exception cref="OperationCanceledException">Raised when cancellation token is used</exception>
/// <remarks>
/// if reply option's timeout is not defined then it will be set to NatsOptions.RequestTimeout.
/// </remarks>
IAsyncEnumerable<NatsMsg> RequestManyAsync(
string subject,
ReadOnlySequence<byte> payload = default,
NatsPubOpts? requestOpts = default,
NatsSubOpts? replyOpts = default,
CancellationToken cancellationToken = default);
}
8 changes: 8 additions & 0 deletions src/NATS.Client.Core/INatsConnectionPool.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
namespace NATS.Client.Core;

public interface INatsConnectionPool : IAsyncDisposable
{
INatsConnection GetConnection();

IEnumerable<INatsConnection> GetConnections();
}
120 changes: 120 additions & 0 deletions src/NATS.Client.Core/NatsConnection.RequestReply.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
using System.Buffers;
using System.Runtime.CompilerServices;

namespace NATS.Client.Core;

public partial class NatsConnection
{
/// <inheritdoc />
public string NewInbox() => $"{InboxPrefix}{Guid.NewGuid():n}";

/// <inheritdoc />
public async ValueTask<NatsMsg<TReply?>?> RequestAsync<TRequest, TReply>(
string subject,
TRequest? data,
NatsPubOpts? requestOpts = default,
NatsSubOpts? replyOpts = default,
CancellationToken cancellationToken = default)
{
var opts = SetReplyOptsDefaults(replyOpts);

await using var sub = await RequestSubAsync<TRequest, TReply>(subject, data, requestOpts, opts, cancellationToken)
.ConfigureAwait(false);

if (await sub.Msgs.WaitToReadAsync(cancellationToken).ConfigureAwait(false))
{
if (sub.Msgs.TryRead(out var msg))
{
return msg;
}
}

return null;
}

/// <inheritdoc />
public async ValueTask<NatsMsg?> RequestAsync(
string subject,
ReadOnlySequence<byte> payload = default,
NatsPubOpts? requestOpts = default,
NatsSubOpts? replyOpts = default,
CancellationToken cancellationToken = default)
{
var opts = SetReplyOptsDefaults(replyOpts);

await using var sub = await RequestSubAsync(subject, payload, requestOpts, opts, cancellationToken).ConfigureAwait(false);

if (await sub.Msgs.WaitToReadAsync(cancellationToken).ConfigureAwait(false))
{
if (sub.Msgs.TryRead(out var msg))
{
return msg;
}
}

return null;
}

/// <inheritdoc />
public async IAsyncEnumerable<NatsMsg<TReply?>> RequestManyAsync<TRequest, TReply>(
string subject,
TRequest? data,
NatsPubOpts? requestOpts = default,
NatsSubOpts? replyOpts = default,
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
await using var sub = await RequestSubAsync<TRequest, TReply>(subject, data, requestOpts, replyOpts, cancellationToken)
.ConfigureAwait(false);

while (await sub.Msgs.WaitToReadAsync(cancellationToken).ConfigureAwait(false))
{
while (sub.Msgs.TryRead(out var msg))
{
// Received end of stream sentinel
if (msg.Data is null)
{
yield break;
}

yield return msg;
}
}
}

/// <inheritdoc />
public async IAsyncEnumerable<NatsMsg> RequestManyAsync(
string subject,
ReadOnlySequence<byte> payload = default,
NatsPubOpts? requestOpts = default,
NatsSubOpts? replyOpts = default,
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
await using var sub = await RequestSubAsync(subject, payload, requestOpts, replyOpts, cancellationToken).ConfigureAwait(false);

while (await sub.Msgs.WaitToReadAsync(cancellationToken).ConfigureAwait(false))
{
while (sub.Msgs.TryRead(out var msg))
{
// Received end of stream sentinel
if (msg.Data.Length == 0)
{
yield break;
}

yield return msg;
}
}
}

private NatsSubOpts SetReplyOptsDefaults(in NatsSubOpts? replyOpts)
{
var opts = (replyOpts ?? default) with { MaxMsgs = 1, };

if ((opts.Timeout ?? default) == default)
{
opts = opts with { Timeout = Options.RequestTimeout };
}

return opts;
}
}
11 changes: 3 additions & 8 deletions src/NATS.Client.Core/NatsConnectionPool.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
namespace NATS.Client.Core;

public sealed class NatsConnectionPool : IAsyncDisposable
public sealed class NatsConnectionPool : INatsConnectionPool
{
private readonly NatsConnection[] _connections;
private int _index = -1;
Expand Down Expand Up @@ -38,25 +38,20 @@ public NatsConnectionPool(int poolSize, NatsOptions options, Action<NatsConnecti
}
}

public NatsConnection GetConnection()
public INatsConnection GetConnection()
{
var i = Interlocked.Increment(ref _index);
return _connections[i % _connections.Length];
}

public IEnumerable<NatsConnection> GetConnections()
public IEnumerable<INatsConnection> GetConnections()
{
foreach (var item in _connections)
{
yield return item;
}
}

public INatsConnection GetCommand()
{
return GetConnection();
}

public async ValueTask DisposeAsync()
{
foreach (var item in _connections)
Expand Down
Loading

0 comments on commit f05ab10

Please sign in to comment.