Skip to content

Commit 2b07403

Browse files
committed
check if model is closed before sending, and try up to 3 times to send before giving up
1 parent 5bb2142 commit 2b07403

File tree

4 files changed

+47
-19
lines changed

4 files changed

+47
-19
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,8 @@
171171
* Interpret delivery count header value as ASCII text with a number, when the type encountered is `byte[]`
172172
* Ensure that RabbitMQ's built-in quorum queue header `x-delivery-count` is cleared when a message is dead-lettered
173173

174+
## 9.4.0
175+
* Screen the `IModel` for fitness before trying to send with it, and perform send operations with up to 3 attempts to hopefully better overcome transient errors
174176

175177
---
176178

Rebus.RabbitMq.Tests/Rebus.RabbitMq.Tests.csproj

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,17 +8,17 @@
88
<ProjectReference Include="..\Rebus.RabbitMq\Rebus.RabbitMq.csproj" />
99
</ItemGroup>
1010
<ItemGroup>
11-
<PackageReference Include="microsoft.net.test.sdk" Version="17.9.0" />
11+
<PackageReference Include="microsoft.net.test.sdk" Version="17.10.0" />
1212
<PackageReference Include="nunit" Version="3.14.0" />
13-
<PackageReference Include="NUnit3TestAdapter" Version="4.5.0">
13+
<PackageReference Include="NUnit3TestAdapter" Version="4.6.0">
1414
<PrivateAssets>all</PrivateAssets>
1515
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
1616
</PackageReference>
1717
<PackageReference Include="rabbitmq.client" Version="6.8.1" />
1818
<PackageReference Include="rebus.fleetmanager" Version="6.2.0" />
1919
<PackageReference Include="rebus.serviceprovider" Version="10.0.0" />
2020
<PackageReference Include="rebus.tests.contracts" Version="8.2.6" />
21-
<PackageReference Include="Testcontainers.RabbitMq" Version="3.8.0" />
21+
<PackageReference Include="Testcontainers.RabbitMq" Version="3.9.0" />
2222
</ItemGroup>
2323
<ItemGroup>
2424
<Folder Include="Internals\" />

Rebus.RabbitMq/Internals/ModelExtensions.cs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ static class ModelExtensions
77
/// <summary>
88
/// Disposes the specific
99
/// </summary>
10-
/// <param name="model"></param>
1110
internal static void SafeDrop(this IModel model)
1211
{
1312
if (model == null)

Rebus.RabbitMq/RabbitMq/RabbitMqTransport.cs

Lines changed: 42 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,11 @@ public class RabbitMqTransport : AbstractRebusTransport, IDisposable, IInitializ
4141
/// </summary>
4242
const int QueueDoesNotExist = 404;
4343

44+
/// <summary>
45+
/// Defines how many attempts to make at sending outgoung messages before letting exceptions bubble out
46+
/// </summary>
47+
const int WriterAttempts = 3;
48+
4449
static readonly Encoding HeaderValueEncoding = Encoding.UTF8;
4550

4651
readonly SemaphoreSlim _consumerLock = new(1, 1);
@@ -54,15 +59,15 @@ public class RabbitMqTransport : AbstractRebusTransport, IDisposable, IInitializ
5459
readonly SemaphoreSlim _subscriptionSemaphore = new(1, 1);
5560
readonly ConnectionManager _connectionManager;
5661
readonly ILog _log;
57-
62+
5863
ushort _maxMessagesToPrefetch;
5964

6065
bool _declareExchanges = true;
6166
bool _declareInputQueue = true;
6267
bool _bindInputQueue = true;
6368
bool _publisherConfirmsEnabled = true;
6469
int _batchSize = 1;
65-
70+
6671
TimeSpan _publisherConfirmsTimeout;
6772

6873
string _consumerTag = null;
@@ -428,24 +433,46 @@ protected override async Task SendOutgoingMessages(IEnumerable<OutgoingTransport
428433

429434
if (!messages.Any()) return;
430435

431-
var model = _writerPool.Get();
436+
var expressMessages = messages.Where(m => m.IsExpress).Select(m => m.Message).ToList();
437+
var ordinaryMessages = messages.Where(m => !m.IsExpress).Select(m => m.Message).ToList();
432438

433-
try
439+
var attempt = 0;
440+
441+
while (true)
434442
{
435-
var expressMessages = messages.Where(m => m.IsExpress).Select(m => m.Message).ToList();
436-
var ordinaryMessages = messages.Where(m => !m.IsExpress).Select(m => m.Message).ToList();
443+
var model = _writerPool.Get();
444+
445+
try
446+
{
447+
// if the model is not fit, discard it and try a new one
448+
if (model.IsClosed)
449+
{
450+
model.SafeDrop();
451+
continue;
452+
}
437453

438-
DoSend(expressMessages, model, isExpress: true);
454+
// otherwise, count this as an attempt and try to do it
455+
attempt++;
439456

440-
DoSend(ordinaryMessages, model, isExpress: false);
457+
DoSend(expressMessages, model, isExpress: true);
458+
DoSend(ordinaryMessages, model, isExpress: false);
459+
460+
// remember to return the model
461+
_writerPool.Return(model);
462+
463+
return; //< success - we're done!
464+
}
465+
catch (Exception)
466+
{
467+
// if anything goes wrong when using this IModel, asummed it's faulted and drop it
468+
model.SafeDrop();
441469

442-
_writerPool.Return(model);
443-
}
444-
catch (Exception)
445-
{
446-
// if anything goes wrong when using this IModel, just drop it
447-
model.SafeDrop();
448-
throw;
470+
// if the built-in number of retries has been exceeded, let the error bubble out
471+
if (attempt > WriterAttempts)
472+
{
473+
throw;
474+
}
475+
}
449476
}
450477
}
451478

0 commit comments

Comments
 (0)