Skip to content

Commit

Permalink
Ensure broker-originated channel closure completes
Browse files Browse the repository at this point in the history
Fixes #1749

* Ensure `Dispose` and `DisposeAsync` are idempotent and thread-safe.
* Use TaskCompletionSource when `HandleChannelCloseAsync` runs to allow dispose methods to wait.
  • Loading branch information
lukebakken committed Jan 7, 2025
1 parent 5b1c9cc commit c4200ab
Show file tree
Hide file tree
Showing 6 changed files with 255 additions and 41 deletions.
38 changes: 32 additions & 6 deletions projects/RabbitMQ.Client/Impl/AutorecoveringChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ internal sealed class AutorecoveringChannel : IChannel, IRecoverable
private AutorecoveringConnection _connection;
private RecoveryAwareChannel _innerChannel;
private bool _disposed;
private bool _isDisposing;
private readonly object _locker = new();

private ushort _prefetchCountConsumer;
private ushort _prefetchCountGlobal;
Expand Down Expand Up @@ -252,7 +254,15 @@ await _connection.DeleteRecordedChannelAsync(this,
public override string ToString()
=> InnerChannel.ToString();

public void Dispose() => DisposeAsync().AsTask().GetAwaiter().GetResult();
public void Dispose()
{
if (_disposed)
{
return;
}

DisposeAsync().AsTask().GetAwaiter().GetResult();
}

public async ValueTask DisposeAsync()
{
Expand All @@ -261,14 +271,30 @@ public async ValueTask DisposeAsync()
return;
}

if (IsOpen)
lock (_locker)
{
await this.AbortAsync()
.ConfigureAwait(false);
if (_isDisposing)
{
return;
}
_isDisposing = true;
}

_recordedConsumerTags.Clear();
_disposed = true;
try
{
if (IsOpen)
{
await this.AbortAsync()
.ConfigureAwait(false);
}

_recordedConsumerTags.Clear();
}
finally
{
_disposed = true;
_isDisposing = false;
}
}

public ValueTask<ulong> GetNextPublishSequenceNumberAsync(CancellationToken cancellationToken = default) => InnerChannel.GetNextPublishSequenceNumberAsync(cancellationToken);
Expand Down
31 changes: 26 additions & 5 deletions projects/RabbitMQ.Client/Impl/AutorecoveringConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ internal sealed partial class AutorecoveringConnection : IConnection

private Connection _innerConnection;
private bool _disposed;
private bool _isDisposing;
private readonly object _locker = new();

private Connection InnerConnection
{
Expand Down Expand Up @@ -268,7 +270,15 @@ await RecordChannelAsync(autorecoveringChannel, channelsSemaphoreHeld: false, ca
return autorecoveringChannel;
}

public void Dispose() => DisposeAsync().AsTask().GetAwaiter().GetResult();
public void Dispose()
{
if (_disposed)
{
return;
}

DisposeAsync().AsTask().GetAwaiter().GetResult();
}

public async ValueTask DisposeAsync()
{
Expand All @@ -277,22 +287,33 @@ public async ValueTask DisposeAsync()
return;
}

lock (_locker)
{
if (_isDisposing)
{
return;
}
_isDisposing = true;
}

try
{
await _innerConnection.DisposeAsync()
.ConfigureAwait(false);

_channels.Clear();
_recordedEntitiesSemaphore.Dispose();
_channelsSemaphore.Dispose();
_recoveryCancellationTokenSource.Dispose();
}
catch (OperationInterruptedException)
{
// ignored, see rabbitmq/rabbitmq-dotnet-client#133
}
finally
{
_channels.Clear();
_recordedEntitiesSemaphore.Dispose();
_channelsSemaphore.Dispose();
_recoveryCancellationTokenSource.Dispose();
_disposed = true;
_isDisposing = false;
}
}

Expand Down
140 changes: 111 additions & 29 deletions projects/RabbitMQ.Client/Impl/Channel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,15 @@ internal partial class Channel : IChannel, IRecoverable
private ShutdownEventArgs? _closeReason;
public ShutdownEventArgs? CloseReason => Volatile.Read(ref _closeReason);

private TaskCompletionSource<bool>? _serverOriginatedChannelCloseTcs;

internal readonly IConsumerDispatcher ConsumerDispatcher;

private bool _disposed;
private bool _isDisposing;

private readonly object _locker = new();

public Channel(ISession session, CreateChannelOptions createChannelOptions)
{
ContinuationTimeout = createChannelOptions.ContinuationTimeout;
Expand Down Expand Up @@ -514,22 +521,54 @@ public override string ToString()

void IDisposable.Dispose()
{
if (_disposed)
{
return;
}

Dispose(true);
}

protected virtual void Dispose(bool disposing)
{
if (disposing)
if (_disposed)
{
if (IsOpen)
return;
}

lock (_locker)
{
if (_isDisposing)
{
this.AbortAsync().GetAwaiter().GetResult();
return;
}
_isDisposing = true;
}

ConsumerDispatcher.Dispose();
_rpcSemaphore.Dispose();
_confirmSemaphore.Dispose();
_outstandingPublisherConfirmationsRateLimiter?.Dispose();
if (disposing)
{
try
{
if (IsOpen)
{
this.AbortAsync().GetAwaiter().GetResult();
}

if (_serverOriginatedChannelCloseTcs is not null)
{
_serverOriginatedChannelCloseTcs.Task.Wait(TimeSpan.FromSeconds(5));
}

ConsumerDispatcher.Dispose();
_rpcSemaphore.Dispose();
_confirmSemaphore.Dispose();
_outstandingPublisherConfirmationsRateLimiter?.Dispose();
}
finally
{
_disposed = true;
_isDisposing = false;
}
}
}

Expand All @@ -543,18 +582,46 @@ await DisposeAsyncCore()

protected virtual async ValueTask DisposeAsyncCore()
{
if (IsOpen)
if (_disposed)
{
await this.AbortAsync().ConfigureAwait(false);
return;
}

ConsumerDispatcher.Dispose();
_rpcSemaphore.Dispose();
_confirmSemaphore.Dispose();
if (_outstandingPublisherConfirmationsRateLimiter is not null)
lock (_locker)
{
await _outstandingPublisherConfirmationsRateLimiter.DisposeAsync()
.ConfigureAwait(false);
if (_isDisposing)
{
return;
}
_isDisposing = true;
}

try
{
if (IsOpen)
{
await this.AbortAsync().ConfigureAwait(false);
}

if (_serverOriginatedChannelCloseTcs is not null)
{
await _serverOriginatedChannelCloseTcs.Task.WaitAsync(TimeSpan.FromSeconds(5))
.ConfigureAwait(false);
}

ConsumerDispatcher.Dispose();
_rpcSemaphore.Dispose();
_confirmSemaphore.Dispose();
if (_outstandingPublisherConfirmationsRateLimiter is not null)
{
await _outstandingPublisherConfirmationsRateLimiter.DisposeAsync()
.ConfigureAwait(false);
}
}
finally
{
_disposed = true;
_isDisposing = false;
}
}

Expand Down Expand Up @@ -651,23 +718,38 @@ protected virtual ulong AdjustDeliveryTag(ulong deliveryTag)

protected async Task<bool> HandleChannelCloseAsync(IncomingCommand cmd, CancellationToken cancellationToken)
{
var channelClose = new ChannelClose(cmd.MethodSpan);
SetCloseReason(new ShutdownEventArgs(ShutdownInitiator.Peer,
channelClose._replyCode,
channelClose._replyText,
channelClose._classId,
channelClose._methodId));
lock (_locker)
{
_serverOriginatedChannelCloseTcs ??= new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
}

await Session.CloseAsync(_closeReason, notify: false)
.ConfigureAwait(false);
try
{
var channelClose = new ChannelClose(cmd.MethodSpan);
SetCloseReason(new ShutdownEventArgs(ShutdownInitiator.Peer,
channelClose._replyCode,
channelClose._replyText,
channelClose._classId,
channelClose._methodId));

var method = new ChannelCloseOk();
await ModelSendAsync(in method, cancellationToken)
.ConfigureAwait(false);
await Session.CloseAsync(_closeReason, notify: false)
.ConfigureAwait(false);

await Session.NotifyAsync(cancellationToken)
.ConfigureAwait(false);
return true;
var method = new ChannelCloseOk();
await ModelSendAsync(in method, cancellationToken)
.ConfigureAwait(false);

await Session.NotifyAsync(cancellationToken)
.ConfigureAwait(false);

_serverOriginatedChannelCloseTcs.TrySetResult(true);
return true;
}
catch (Exception ex)
{
_serverOriginatedChannelCloseTcs.TrySetException(ex);
throw;
}
}

protected async Task<bool> HandleChannelCloseOkAsync(IncomingCommand cmd, CancellationToken cancellationToken)
Expand Down
22 changes: 21 additions & 1 deletion projects/RabbitMQ.Client/Impl/Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ namespace RabbitMQ.Client.Framing
internal sealed partial class Connection : IConnection
{
private bool _disposed;
private bool _isDisposing;
private readonly object _locker = new();
private volatile bool _closed;

private readonly ConnectionConfig _config;
Expand Down Expand Up @@ -485,7 +487,15 @@ internal ValueTask WriteAsync(RentedMemory frames, CancellationToken cancellatio
return _frameHandler.WriteAsync(frames, cancellationToken);
}

public void Dispose() => DisposeAsync().AsTask().GetAwaiter().GetResult();
public void Dispose()
{
if (_disposed)
{
return;
}

DisposeAsync().AsTask().GetAwaiter().GetResult();
}

public async ValueTask DisposeAsync()
{
Expand All @@ -494,6 +504,15 @@ public async ValueTask DisposeAsync()
return;
}

lock (_locker)
{
if (_isDisposing)
{
return;
}
_isDisposing = true;
}

try
{
if (IsOpen)
Expand All @@ -515,6 +534,7 @@ await _channel0.DisposeAsync()
finally
{
_disposed = true;
_isDisposing = false;
}
}

Expand Down
Loading

0 comments on commit c4200ab

Please sign in to comment.