Skip to content

Commit

Permalink
Sub cancel race fix (#325)
Browse files Browse the repository at this point in the history
  • Loading branch information
mtmk authored Jan 11, 2024
1 parent 421463c commit 8ab4c6f
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 11 deletions.
34 changes: 30 additions & 4 deletions src/NATS.Client.Core/NatsSubBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ public abstract class NatsSubBase
{
private static readonly byte[] NoRespondersHeaderSequence = { (byte)' ', (byte)'5', (byte)'0', (byte)'3' };
private readonly ILogger _logger;
private readonly object _gate = new();
private readonly bool _debug;
private readonly ISubscriptionManager _manager;
private readonly Timer? _timeoutTimer;
Expand Down Expand Up @@ -66,6 +67,13 @@ internal NatsSubBase(
QueueGroup = queueGroup;
Opts = opts;

// If the cancellation token is already cancelled there is no need to register, so we throw here instead.
// There is still a chance that the token is cancelled after this check but before we register, or
// before the derived class constructor has completed. In that case we might be calling subclass
// methods through EndSubscription() on an instance that is not yet fully initialized, which
// might be a problem. This check reduces the impact of that problem; it does not eliminate it.
cancellationToken.ThrowIfCancellationRequested();

_tokenRegistration = cancellationToken.UnsafeRegister(
state =>
{
Expand Down Expand Up @@ -144,7 +152,7 @@ public virtual ValueTask ReadyAsync()
/// <returns>A <see cref="ValueTask"/> that represents the asynchronous server UNSUB operation.</returns>
public ValueTask UnsubscribeAsync()
{
lock (this)
lock (_gate)
{
if (_unsubscribed)
return ValueTask.CompletedTask;
Expand All @@ -154,14 +162,27 @@ public ValueTask UnsubscribeAsync()
_timeoutTimer?.Change(Timeout.Infinite, Timeout.Infinite);
_idleTimeoutTimer?.Change(Timeout.Infinite, Timeout.Infinite);
_startUpTimeoutTimer?.Change(Timeout.Infinite, Timeout.Infinite);
TryComplete();

try
{
TryComplete();
}
catch (Exception e)
{
// Log and otherwise ignore any exception thrown by the derived class since we're already in the process
// of unsubscribing. We don't want to throw here and prevent the unsubscribe call from completing.
// This is also required to work around a potential race condition between the derived class and the base
// class: when the cancellation token is cancelled before instance creation has fully finished in the
// constructor, the TryComplete() method might be dealing with uninitialized state.
_logger.LogWarning(NatsLogEvents.Subscription, e, "Error while completing subscription");
}

return _manager.RemoveAsync(this);
}

public virtual ValueTask DisposeAsync()
{
lock (this)
lock (_gate)
{
if (_disposed)
return ValueTask.CompletedTask;
Expand Down Expand Up @@ -294,14 +315,19 @@ protected void DecrementMaxMsgs()
/// <summary>
/// Invoked to signal the end of the subscription.
/// </summary>
/// <remarks>
/// Do not implement complex logic in this method. It should only be used to complete the channel writers.
/// The reason is that this method might be invoked while the instance is still being created in its
/// constructors: the cancellation token might be cancelled before the members are fully initialized,
/// so implementations may observe uninitialized state.
/// When invoked from <see cref="UnsubscribeAsync"/>, any exception thrown by this method is caught
/// and logged as a warning rather than propagated to the caller.
/// </remarks>
protected abstract void TryComplete();

protected void EndSubscription(NatsSubEndReason reason)
{
if (_debug)
_logger.LogDebug(NatsLogEvents.Subscription, "End subscription {Reason}", reason);

lock (this)
lock (_gate)
{
if (_endSubscription)
return;
Expand Down
24 changes: 17 additions & 7 deletions tests/NATS.Client.Core.Tests/CancellationTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,22 @@ public async Task CommandConnectCancellationTest()
await Assert.ThrowsAsync<TaskCanceledException>(() => conn.PingAsync(cancellationToken).AsTask());
await Assert.ThrowsAsync<TaskCanceledException>(() => conn.PublishAsync("test", cancellationToken: cancellationToken).AsTask());

// todo: https://github.com/nats-io/nats.net.v2/issues/323
// await Assert.ThrowsAsync<TaskCanceledException>(async () =>
// {
// await foreach (var unused in conn.SubscribeAsync<string>("test", cancellationToken: cancellationToken))
// {
// }
// });
// Because of the workaround that minimizes the subscription cancellation race condition, the following
// subscription throws an OperationCanceledException rather than a TaskCanceledException.
await Assert.ThrowsAsync<OperationCanceledException>(async () =>
{
await foreach (var unused in conn.SubscribeAsync<string>("test", cancellationToken: cancellationToken))
{
}
});

await Assert.ThrowsAsync<TaskCanceledException>(async () =>
{
// Give the NatsSubBase class a good chance to complete its constructors before the token is cancelled (after 3 seconds)
var cts2 = new CancellationTokenSource(TimeSpan.FromSeconds(3));
await foreach (var unused in conn.SubscribeAsync<string>("test", cancellationToken: cts2.Token))
{
}
});
}
}

0 comments on commit 8ab4c6f

Please sign in to comment.