Skip to content

Commit a3c4ec5

Browse files
committed
Create cancellation token from timeout
Fixes #1759 When passing a timeout of 0, `DisposeAsync` would block forever after closing a connection. This change ensures that the timeout is used in a cancellation token.
1 parent 21fb198 commit a3c4ec5

File tree

4 files changed

+72
-22
lines changed

4 files changed

+72
-22
lines changed

projects/RabbitMQ.Client/IConnectionExtensions.cs

Lines changed: 28 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -59,10 +59,11 @@ public static Task CloseAsync(this IConnection connection, ushort reasonCode, st
5959
/// To wait infinitely for the close operations to complete use <see cref="System.Threading.Timeout.InfiniteTimeSpan"/>.
6060
/// </para>
6161
/// </remarks>
62-
public static Task CloseAsync(this IConnection connection, TimeSpan timeout)
62+
public static async Task CloseAsync(this IConnection connection, TimeSpan timeout)
6363
{
64-
return connection.CloseAsync(Constants.ReplySuccess, "Goodbye", timeout, false,
65-
CancellationToken.None);
64+
using var cts = new CancellationTokenSource(timeout);
65+
await connection.CloseAsync(Constants.ReplySuccess, "Goodbye", timeout, false, cts.Token)
66+
.ConfigureAwait(false);
6667
}
6768

6869
/// <summary>
@@ -82,10 +83,11 @@ public static Task CloseAsync(this IConnection connection, TimeSpan timeout)
8283
/// Operation timeout.
8384
/// </para>
8485
/// </remarks>
85-
public static Task CloseAsync(this IConnection connection, ushort reasonCode, string reasonText, TimeSpan timeout)
86+
public static async Task CloseAsync(this IConnection connection, ushort reasonCode, string reasonText, TimeSpan timeout)
8687
{
87-
return connection.CloseAsync(reasonCode, reasonText, timeout, false,
88-
CancellationToken.None);
88+
using var cts = new CancellationTokenSource(timeout);
89+
await connection.CloseAsync(reasonCode, reasonText, timeout, false, cts.Token)
90+
.ConfigureAwait(false);
8991
}
9092

9193
/// <summary>
@@ -97,10 +99,12 @@ public static Task CloseAsync(this IConnection connection, ushort reasonCode, st
9799
/// <see cref="IOException"/> during closing connection.
98100
///This method waits infinitely for the in-progress close operation to complete.
99101
/// </remarks>
100-
public static Task AbortAsync(this IConnection connection)
102+
public static async Task AbortAsync(this IConnection connection)
101103
{
102-
return connection.CloseAsync(Constants.ReplySuccess, "Connection close forced", InternalConstants.DefaultConnectionAbortTimeout, true,
103-
CancellationToken.None);
104+
using var cts = new CancellationTokenSource(InternalConstants.DefaultConnectionAbortTimeout);
105+
await connection.CloseAsync(Constants.ReplySuccess,
106+
"Connection close forced", InternalConstants.DefaultConnectionAbortTimeout, true, cts.Token)
107+
.ConfigureAwait(false);
104108
}
105109

106110
/// <summary>
@@ -116,10 +120,12 @@ public static Task AbortAsync(this IConnection connection)
116120
/// A message indicating the reason for closing the connection
117121
/// </para>
118122
/// </remarks>
119-
public static Task AbortAsync(this IConnection connection, ushort reasonCode, string reasonText)
123+
public static async Task AbortAsync(this IConnection connection, ushort reasonCode, string reasonText)
120124
{
121-
return connection.CloseAsync(reasonCode, reasonText, InternalConstants.DefaultConnectionAbortTimeout, true,
122-
CancellationToken.None);
125+
using var cts = new CancellationTokenSource(InternalConstants.DefaultConnectionAbortTimeout);
126+
await connection.CloseAsync(reasonCode,
127+
reasonText, InternalConstants.DefaultConnectionAbortTimeout, true, cts.Token)
128+
.ConfigureAwait(false);
123129
}
124130

125131
/// <summary>
@@ -135,10 +141,12 @@ public static Task AbortAsync(this IConnection connection, ushort reasonCode, st
135141
/// To wait infinitely for the close operations to complete use <see cref="Timeout.Infinite"/>.
136142
/// </para>
137143
/// </remarks>
138-
public static Task AbortAsync(this IConnection connection, TimeSpan timeout)
144+
public static async Task AbortAsync(this IConnection connection, TimeSpan timeout)
139145
{
140-
return connection.CloseAsync(Constants.ReplySuccess, "Connection close forced", timeout, true,
141-
CancellationToken.None);
146+
using var cts = new CancellationTokenSource(InternalConstants.DefaultConnectionAbortTimeout);
147+
await connection.CloseAsync(Constants.ReplySuccess,
148+
"Connection close forced", timeout, true, cts.Token)
149+
.ConfigureAwait(false);
142150
}
143151

144152
/// <summary>
@@ -155,10 +163,12 @@ public static Task AbortAsync(this IConnection connection, TimeSpan timeout)
155163
/// A message indicating the reason for closing the connection.
156164
/// </para>
157165
/// </remarks>
158-
public static Task AbortAsync(this IConnection connection, ushort reasonCode, string reasonText, TimeSpan timeout)
166+
public static async Task AbortAsync(this IConnection connection, ushort reasonCode, string reasonText, TimeSpan timeout)
159167
{
160-
return connection.CloseAsync(reasonCode, reasonText, timeout, true,
161-
CancellationToken.None);
168+
using var cts = new CancellationTokenSource(InternalConstants.DefaultConnectionAbortTimeout);
169+
await connection.CloseAsync(reasonCode,
170+
reasonText, timeout, true, cts.Token)
171+
.ConfigureAwait(false);
162172
}
163173
}
164174
}

projects/RabbitMQ.Client/Impl/Connection.cs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -330,8 +330,6 @@ internal async Task CloseAsync(ShutdownEventArgs reason, bool abort, TimeSpan ti
330330
}
331331
else
332332
{
333-
cancellationToken.ThrowIfCancellationRequested();
334-
335333
await OnShutdownAsync(reason)
336334
.ConfigureAwait(false);
337335
await _session0.SetSessionClosingAsync(false, cancellationToken)
@@ -518,7 +516,6 @@ await this.AbortAsync()
518516
}
519517

520518
_session0.Dispose();
521-
_mainLoopCts.Dispose();
522519

523520
await _channel0.DisposeAsync()
524521
.ConfigureAwait(false);
@@ -529,6 +526,7 @@ await _channel0.DisposeAsync()
529526
}
530527
finally
531528
{
529+
_mainLoopCts.Dispose();
532530
_disposed = true;
533531
}
534532
}

projects/RabbitMQ.Client/Impl/MainSession.cs

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ internal sealed class MainSession : Session, IDisposable
4747
private volatile bool _closeIsServerInitiated;
4848
private volatile bool _closing;
4949
private readonly SemaphoreSlim _closingSemaphore = new SemaphoreSlim(1, 1);
50+
private bool _disposed = false;
5051

5152
public MainSession(Connection connection, uint maxBodyLength)
5253
: base(connection, 0, maxBodyLength)
@@ -83,6 +84,13 @@ public override Task HandleFrameAsync(InboundFrame frame, CancellationToken canc
8384

8485
public async Task SetSessionClosingAsync(bool closeIsServerInitiated, CancellationToken cancellationToken)
8586
{
87+
if (_disposed)
88+
{
89+
_closing = true;
90+
_closeIsServerInitiated = closeIsServerInitiated;
91+
return;
92+
}
93+
8694
if (await _closingSemaphore.WaitAsync(InternalConstants.DefaultConnectionAbortTimeout, cancellationToken)
8795
.ConfigureAwait(false))
8896
{
@@ -122,6 +130,24 @@ public override ValueTask TransmitAsync<T>(in T cmd, CancellationToken cancellat
122130
return base.TransmitAsync(in cmd, cancellationToken);
123131
}
124132

125-
public void Dispose() => ((IDisposable)_closingSemaphore).Dispose();
133+
public void Dispose()
134+
{
135+
if (_disposed)
136+
{
137+
return;
138+
}
139+
140+
try
141+
{
142+
_closingSemaphore.Dispose();
143+
}
144+
catch
145+
{
146+
}
147+
finally
148+
{
149+
_disposed = true;
150+
}
151+
}
126152
}
127153
}

projects/Test/Integration/GH/TestGitHubIssues.cs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,5 +131,21 @@ public async Task TestHeartbeatTimeoutValue_GH1756()
131131

132132
Assert.True(_conn.Heartbeat != default);
133133
}
134+
135+
[Fact]
136+
public async Task DisposeWhileCatchingTimeoutDeadlocksRepro_GH1759()
137+
{
138+
_connFactory = new ConnectionFactory();
139+
_conn = await _connFactory.CreateConnectionAsync();
140+
try
141+
{
142+
await _conn.CloseAsync(TimeSpan.Zero);
143+
}
144+
catch (Exception)
145+
{
146+
}
147+
148+
await _conn.DisposeAsync();
149+
}
134150
}
135151
}

0 commit comments

Comments
 (0)