diff --git a/src/NATS.Client.Core/NatsSubBase.cs b/src/NATS.Client.Core/NatsSubBase.cs
index 570fdaea3..e1b8a610b 100644
--- a/src/NATS.Client.Core/NatsSubBase.cs
+++ b/src/NATS.Client.Core/NatsSubBase.cs
@@ -27,6 +27,7 @@ public abstract class NatsSubBase
{
private static readonly byte[] NoRespondersHeaderSequence = { (byte)' ', (byte)'5', (byte)'0', (byte)'3' };
private readonly ILogger _logger;
+ private readonly object _gate = new();
private readonly bool _debug;
private readonly ISubscriptionManager _manager;
private readonly Timer? _timeoutTimer;
@@ -66,6 +67,13 @@ internal NatsSubBase(
QueueGroup = queueGroup;
Opts = opts;
+ // If cancellation token is already cancelled we don't need to register however there is still
+ // a chance that cancellation token is cancelled after this check but before we register or
+ // the derived class constructor is completed. In that case we might be calling subclass
+ // methods through EndSubscription() on potentially not a fully initialized instance which
+ // might be a problem. This should reduce the impact of that problem.
+ cancellationToken.ThrowIfCancellationRequested();
+
_tokenRegistration = cancellationToken.UnsafeRegister(
state =>
{
@@ -144,7 +152,7 @@ public virtual ValueTask ReadyAsync()
/// A that represents the asynchronous server UNSUB operation.
public ValueTask UnsubscribeAsync()
{
- lock (this)
+ lock (_gate)
{
if (_unsubscribed)
return ValueTask.CompletedTask;
@@ -154,14 +162,27 @@ public ValueTask UnsubscribeAsync()
_timeoutTimer?.Change(Timeout.Infinite, Timeout.Infinite);
_idleTimeoutTimer?.Change(Timeout.Infinite, Timeout.Infinite);
_startUpTimeoutTimer?.Change(Timeout.Infinite, Timeout.Infinite);
- TryComplete();
+
+ try
+ {
+ TryComplete();
+ }
+ catch (Exception e)
+ {
+ // Ignore any exceptions thrown by the derived class since we're already in the process of unsubscribing.
+ // We don't want to throw here and prevent the unsubscribe call from completing.
+ // This is also required to workaround a potential race condition between the derived class and the base
+ // class when cancellation token is cancelled before the instance creation is fully finished in its
+ // constructor hence TryComplete() method might be dealing with uninitialized state.
+ _logger.LogWarning(NatsLogEvents.Subscription, e, "Error while completing subscription");
+ }
return _manager.RemoveAsync(this);
}
public virtual ValueTask DisposeAsync()
{
- lock (this)
+ lock (_gate)
{
if (_disposed)
return ValueTask.CompletedTask;
@@ -294,6 +315,11 @@ protected void DecrementMaxMsgs()
///
/// Invoked to signal end of the subscription.
///
+ ///
+ /// Do not implement complex logic in this method. It should only be used to complete the channel writers.
+ /// The reason is that this method might be invoked while instance is being created in constructors and
+ /// the cancellation token might be cancelled before the members are fully initialized.
+ ///
protected abstract void TryComplete();
protected void EndSubscription(NatsSubEndReason reason)
@@ -301,7 +327,7 @@ protected void EndSubscription(NatsSubEndReason reason)
if (_debug)
_logger.LogDebug(NatsLogEvents.Subscription, "End subscription {Reason}", reason);
- lock (this)
+ lock (_gate)
{
if (_endSubscription)
return;
diff --git a/tests/NATS.Client.Core.Tests/CancellationTest.cs b/tests/NATS.Client.Core.Tests/CancellationTest.cs
index f29431931..1148cd569 100644
--- a/tests/NATS.Client.Core.Tests/CancellationTest.cs
+++ b/tests/NATS.Client.Core.Tests/CancellationTest.cs
@@ -57,12 +57,22 @@ public async Task CommandConnectCancellationTest()
await Assert.ThrowsAsync(() => conn.PingAsync(cancellationToken).AsTask());
await Assert.ThrowsAsync(() => conn.PublishAsync("test", cancellationToken: cancellationToken).AsTask());
- // todo: https://github.com/nats-io/nats.net.v2/issues/323
- // await Assert.ThrowsAsync(async () =>
- // {
- // await foreach (var unused in conn.SubscribeAsync("test", cancellationToken: cancellationToken))
- // {
- // }
- // });
+ // Because of a race condition minimization / workaround, the following test will throw an OperationCanceledException
+ // rather than a TaskCanceledException.
+ await Assert.ThrowsAsync(async () =>
+ {
+ await foreach (var unused in conn.SubscribeAsync("test", cancellationToken: cancellationToken))
+ {
+ }
+ });
+
+ await Assert.ThrowsAsync(async () =>
+ {
+ // Give NatsSubBase class a good chance to complete its constructors
+ var cts2 = new CancellationTokenSource(TimeSpan.FromSeconds(3));
+ await foreach (var unused in conn.SubscribeAsync("test", cancellationToken: cts2.Token))
+ {
+ }
+ });
}
}