diff --git a/projects/RabbitMQ.Client/Impl/AutorecoveringChannel.cs b/projects/RabbitMQ.Client/Impl/AutorecoveringChannel.cs index 26d77f8ea..241dbbdc4 100644 --- a/projects/RabbitMQ.Client/Impl/AutorecoveringChannel.cs +++ b/projects/RabbitMQ.Client/Impl/AutorecoveringChannel.cs @@ -48,6 +48,8 @@ internal sealed class AutorecoveringChannel : IChannel, IRecoverable private AutorecoveringConnection _connection; private RecoveryAwareChannel _innerChannel; private bool _disposed; + private bool _isDisposing; + private readonly object _locker = new(); private ushort _prefetchCountConsumer; private ushort _prefetchCountGlobal; @@ -252,7 +254,15 @@ await _connection.DeleteRecordedChannelAsync(this, public override string ToString() => InnerChannel.ToString(); - public void Dispose() => DisposeAsync().AsTask().GetAwaiter().GetResult(); + public void Dispose() + { + if (_disposed) + { + return; + } + + DisposeAsync().AsTask().GetAwaiter().GetResult(); + } public async ValueTask DisposeAsync() { @@ -261,14 +271,30 @@ public async ValueTask DisposeAsync() return; } - if (IsOpen) + lock (_locker) { - await this.AbortAsync() - .ConfigureAwait(false); + if (_isDisposing) + { + return; + } + _isDisposing = true; } - _recordedConsumerTags.Clear(); - _disposed = true; + try + { + if (IsOpen) + { + await this.AbortAsync() + .ConfigureAwait(false); + } + + _recordedConsumerTags.Clear(); + } + finally + { + _disposed = true; + _isDisposing = false; + } } public ValueTask GetNextPublishSequenceNumberAsync(CancellationToken cancellationToken = default) => InnerChannel.GetNextPublishSequenceNumberAsync(cancellationToken); diff --git a/projects/RabbitMQ.Client/Impl/AutorecoveringConnection.cs b/projects/RabbitMQ.Client/Impl/AutorecoveringConnection.cs index 61df604f7..f266f60d0 100644 --- a/projects/RabbitMQ.Client/Impl/AutorecoveringConnection.cs +++ b/projects/RabbitMQ.Client/Impl/AutorecoveringConnection.cs @@ -50,6 +50,8 @@ internal sealed partial class AutorecoveringConnection : IConnection private Connection _innerConnection; private bool _disposed; + private bool _isDisposing; + private readonly object _locker = new(); private Connection InnerConnection { @@ -268,7 +270,15 @@ await RecordChannelAsync(autorecoveringChannel, channelsSemaphoreHeld: false, ca return autorecoveringChannel; } - public void Dispose() => DisposeAsync().AsTask().GetAwaiter().GetResult(); + public void Dispose() + { + if (_disposed) + { + return; + } + + DisposeAsync().AsTask().GetAwaiter().GetResult(); + } public async ValueTask DisposeAsync() { @@ -277,10 +287,24 @@ public async ValueTask DisposeAsync() return; } + lock (_locker) + { + if (_isDisposing) + { + return; + } + _isDisposing = true; + } + try { await _innerConnection.DisposeAsync() .ConfigureAwait(false); + + _channels.Clear(); + _recordedEntitiesSemaphore.Dispose(); + _channelsSemaphore.Dispose(); + _recoveryCancellationTokenSource.Dispose(); } catch (OperationInterruptedException) { @@ -288,11 +312,8 @@ await _innerConnection.DisposeAsync() } finally { - _channels.Clear(); - _recordedEntitiesSemaphore.Dispose(); - _channelsSemaphore.Dispose(); - _recoveryCancellationTokenSource.Dispose(); _disposed = true; + _isDisposing = false; } } diff --git a/projects/RabbitMQ.Client/Impl/Channel.cs b/projects/RabbitMQ.Client/Impl/Channel.cs index 4acc3da62..7718b0fbb 100644 --- a/projects/RabbitMQ.Client/Impl/Channel.cs +++ b/projects/RabbitMQ.Client/Impl/Channel.cs @@ -59,8 +59,15 @@ internal partial class Channel : IChannel, IRecoverable private ShutdownEventArgs? _closeReason; public ShutdownEventArgs? CloseReason => Volatile.Read(ref _closeReason); + private TaskCompletionSource? _serverOriginatedChannelCloseTcs; + internal readonly IConsumerDispatcher ConsumerDispatcher; + private bool _disposed; + private bool _isDisposing; + + private readonly object _locker = new(); + public Channel(ISession session, CreateChannelOptions createChannelOptions) { ContinuationTimeout = createChannelOptions.ContinuationTimeout; @@ -514,22 +521,54 @@ public override string ToString() void IDisposable.Dispose() { + if (_disposed) + { + return; + } + Dispose(true); } protected virtual void Dispose(bool disposing) { - if (disposing) + if (_disposed) { - if (IsOpen) + return; + } + + lock (_locker) + { + if (_isDisposing) { - this.AbortAsync().GetAwaiter().GetResult(); + return; } + _isDisposing = true; + } - ConsumerDispatcher.Dispose(); - _rpcSemaphore.Dispose(); - _confirmSemaphore.Dispose(); - _outstandingPublisherConfirmationsRateLimiter?.Dispose(); + if (disposing) + { + try + { + if (IsOpen) + { + this.AbortAsync().GetAwaiter().GetResult(); + } + + if (_serverOriginatedChannelCloseTcs is not null) + { + _serverOriginatedChannelCloseTcs.Task.Wait(TimeSpan.FromSeconds(5)); + } + + ConsumerDispatcher.Dispose(); + _rpcSemaphore.Dispose(); + _confirmSemaphore.Dispose(); + _outstandingPublisherConfirmationsRateLimiter?.Dispose(); + } + finally + { + _disposed = true; + _isDisposing = false; + } } } @@ -543,18 +582,46 @@ await DisposeAsyncCore() protected virtual async ValueTask DisposeAsyncCore() { - if (IsOpen) + if (_disposed) { - await this.AbortAsync().ConfigureAwait(false); + return; } - ConsumerDispatcher.Dispose(); - _rpcSemaphore.Dispose(); - _confirmSemaphore.Dispose(); - if (_outstandingPublisherConfirmationsRateLimiter is not null) + lock (_locker) { - await _outstandingPublisherConfirmationsRateLimiter.DisposeAsync() - .ConfigureAwait(false); + if (_isDisposing) + { + return; + } + _isDisposing = true; + } + + try + { + if (IsOpen) + { + await this.AbortAsync().ConfigureAwait(false); + } + + if (_serverOriginatedChannelCloseTcs is not null) + { + await _serverOriginatedChannelCloseTcs.Task.WaitAsync(TimeSpan.FromSeconds(5)) + .ConfigureAwait(false); + } + + ConsumerDispatcher.Dispose(); + _rpcSemaphore.Dispose(); + _confirmSemaphore.Dispose(); + if (_outstandingPublisherConfirmationsRateLimiter is not null) + { + await _outstandingPublisherConfirmationsRateLimiter.DisposeAsync() + .ConfigureAwait(false); + } + } + finally + { + _disposed = true; + _isDisposing = false; } } @@ -651,23 +718,38 @@ protected virtual ulong AdjustDeliveryTag(ulong deliveryTag) protected async Task HandleChannelCloseAsync(IncomingCommand cmd, CancellationToken cancellationToken) { - var channelClose = new ChannelClose(cmd.MethodSpan); - SetCloseReason(new ShutdownEventArgs(ShutdownInitiator.Peer, - channelClose._replyCode, - channelClose._replyText, - channelClose._classId, - channelClose._methodId)); + lock (_locker) + { + _serverOriginatedChannelCloseTcs ??= new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + } - await Session.CloseAsync(_closeReason, notify: false) - .ConfigureAwait(false); + try + { + var channelClose = new ChannelClose(cmd.MethodSpan); + SetCloseReason(new ShutdownEventArgs(ShutdownInitiator.Peer, + channelClose._replyCode, + channelClose._replyText, + channelClose._classId, + channelClose._methodId)); - var method = new ChannelCloseOk(); - await ModelSendAsync(in method, cancellationToken) - .ConfigureAwait(false); + await Session.CloseAsync(_closeReason, notify: false) + .ConfigureAwait(false); - await Session.NotifyAsync(cancellationToken) - .ConfigureAwait(false); - return true; + var method = new ChannelCloseOk(); + await ModelSendAsync(in method, cancellationToken) + .ConfigureAwait(false); + + await Session.NotifyAsync(cancellationToken) + .ConfigureAwait(false); + + _serverOriginatedChannelCloseTcs.TrySetResult(true); + return true; + } + catch (Exception ex) + { + _serverOriginatedChannelCloseTcs.TrySetException(ex); + throw; + } } protected async Task HandleChannelCloseOkAsync(IncomingCommand cmd, CancellationToken cancellationToken) diff --git a/projects/RabbitMQ.Client/Impl/Connection.cs b/projects/RabbitMQ.Client/Impl/Connection.cs index 68cadcfd7..1f8cafc5a 100644 --- a/projects/RabbitMQ.Client/Impl/Connection.cs +++ b/projects/RabbitMQ.Client/Impl/Connection.cs @@ -46,6 +46,8 @@ namespace RabbitMQ.Client.Framing internal sealed partial class Connection : IConnection { private bool _disposed; + private bool _isDisposing; + private readonly object _locker = new(); private volatile bool _closed; private readonly ConnectionConfig _config; @@ -485,7 +487,15 @@ internal ValueTask WriteAsync(RentedMemory frames, CancellationToken cancellatio return _frameHandler.WriteAsync(frames, cancellationToken); } - public void Dispose() => DisposeAsync().AsTask().GetAwaiter().GetResult(); + public void Dispose() + { + if (_disposed) + { + return; + } + + DisposeAsync().AsTask().GetAwaiter().GetResult(); + } public async ValueTask DisposeAsync() { @@ -494,6 +504,15 @@ public async ValueTask DisposeAsync() return; } + lock (_locker) + { + if (_isDisposing) + { + return; + } + _isDisposing = true; + } + try { if (IsOpen) @@ -515,6 +534,7 @@ await _channel0.DisposeAsync() finally { _disposed = true; + _isDisposing = false; } } diff --git a/projects/Test/Integration/TestChannelShutdown.cs b/projects/Test/Integration/TestChannelShutdown.cs index 1774555b3..3b2e6f2d3 100644 --- a/projects/Test/Integration/TestChannelShutdown.cs +++ b/projects/Test/Integration/TestChannelShutdown.cs @@ -30,6 +30,8 @@ //--------------------------------------------------------------------------- using System; +using System.Collections.Generic; +using System.Threading; using System.Threading.Tasks; using RabbitMQ.Client; using RabbitMQ.Client.Impl; @@ -61,5 +63,52 @@ public async Task TestConsumerDispatcherShutdown() await WaitAsync(tcs, TimeSpan.FromSeconds(5), "channel shutdown"); Assert.True(autorecoveringChannel.ConsumerDispatcher.IsShutdown, "dispatcher should be shut down after CloseAsync"); } + + [Fact] + public async Task TestConcurrentDisposeAsync_GH1749() + { + bool sawCallbackException = false; + int channelShutdownCount = 0; + + _channel.CallbackExceptionAsync += (channel, ea) => + { + sawCallbackException = true; + return Task.CompletedTask; + }; + + _channel.ChannelShutdownAsync += (channel, args) => + { + Interlocked.Increment(ref channelShutdownCount); + return Task.CompletedTask; + }; + + var disposeTasks = new List + { + _channel.DisposeAsync(), + _channel.DisposeAsync(), + _channel.DisposeAsync() + }; + + foreach (ValueTask vt in disposeTasks) + { + await vt; + } + + Assert.Equal(1, channelShutdownCount); + Assert.False(sawCallbackException); + + disposeTasks.Clear(); + disposeTasks.Add(_conn.DisposeAsync()); + disposeTasks.Add(_conn.DisposeAsync()); + disposeTasks.Add(_conn.DisposeAsync()); + + foreach (ValueTask vt in disposeTasks) + { + await vt; + } + + _channel = null; + _conn = null; + } } } diff --git a/projects/Test/Integration/TestQueueDeclare.cs b/projects/Test/Integration/TestQueueDeclare.cs index 94e758a64..91510029f 100644 --- a/projects/Test/Integration/TestQueueDeclare.cs +++ b/projects/Test/Integration/TestQueueDeclare.cs @@ -57,6 +57,22 @@ public async Task TestQueueDeclareAsync() Assert.Equal(q, passiveDeclareResult.QueueName); } + [Fact] + public async Task TestPassiveQueueDeclareException_GH1749() + { + string q = GenerateQueueName(); + try + { + await _channel.QueueDeclarePassiveAsync(q); + } + catch (Exception ex) + { + _output.WriteLine("{0} ex: {1}", _testDisplayName, ex); + await _channel.DisposeAsync(); + _channel = null; + } + } + [Fact] public async Task TestConcurrentQueueDeclareAndBindAsync() {