Skip to content

Commit 3989e91

Browse files
committed
fix(server): at-least-once messages received during subscription must be delivered
1 parent 4f9e54b commit 3989e91

File tree

2 files changed

+60
-0
lines changed

2 files changed

+60
-0
lines changed

Source/MQTTnet.Tests/Server/Retained_Messages_Tests.cs

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
// See the LICENSE file in the project root for more information.
44

55
using System.Linq;
6+
using System.Threading;
67
using System.Threading.Tasks;
78
using Microsoft.VisualStudio.TestTools.UnitTesting;
89
using MQTTnet.Client;
@@ -169,6 +170,39 @@ public async Task Receive_Retained_Message_After_Subscribe()
169170
}
170171
}
171172

173+
[TestMethod]
174+
public async Task Receive_AtLeastOnce_Retained_Message_Published_During_Subscribe()
175+
{
176+
using (var testEnvironment = CreateTestEnvironment())
177+
{
178+
var messagePublished = new SemaphoreSlim(0,1);
179+
var subscribeReceived = new SemaphoreSlim(0,1);
180+
await testEnvironment.StartServer();
181+
testEnvironment.Server.InterceptingSubscriptionAsync += _ =>
182+
{
183+
subscribeReceived.Release();
184+
return messagePublished.WaitAsync();
185+
};
186+
187+
var c1 = await testEnvironment.ConnectClient();
188+
189+
var c2 = await testEnvironment.ConnectClient();
190+
var messageHandler = testEnvironment.CreateApplicationMessageHandler(c2);
191+
192+
Task subscribeComplete = c2.SubscribeAsync(new MqttTopicFilterBuilder().WithTopic("retained").WithAtLeastOnceQoS().Build());
193+
await subscribeReceived.WaitAsync(1000);
194+
await c1.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("retained").WithPayload(new byte[3]).WithRetainFlag().WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtLeastOnce).Build());
195+
await c1.DisconnectAsync();
196+
197+
messagePublished.Release();
198+
await subscribeComplete;
199+
await Task.Delay(500);
200+
201+
messageHandler.AssertReceivedCountEquals(1);
202+
Assert.IsTrue(messageHandler.ReceivedEventArgs.First().ApplicationMessage.Retain);
203+
}
204+
}
205+
172206
[TestMethod]
173207
public async Task Receive_Retained_Messages_From_Higher_Qos_Level()
174208
{

Source/MQTTnet/Server/Internal/MqttClientSubscriptionsManager.cs

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
using MQTTnet.Internal;
1111
using MQTTnet.Packets;
1212
using MQTTnet.Protocol;
13+
using static MQTTnet.Server.MqttClientSubscriptionsManager;
1314

1415
namespace MQTTnet.Server
1516
{
@@ -177,6 +178,13 @@ public async Task<SubscribeResult> Subscribe(MqttSubscribePacket subscribePacket
177178

178179
var addedSubscriptions = new List<string>();
179180
var finalTopicFilters = new List<MqttTopicFilter>();
181+
var atLeastOnceSubscriptionResults = new List<CreateSubscriptionResult>();
182+
183+
IList<MqttApplicationMessage> retainedApplicationMessages = null;
184+
if (subscribePacket.TopicFilters.Any(f => f.QualityOfServiceLevel != MqttQualityOfServiceLevel.AtLeastOnce))
185+
{
186+
retainedApplicationMessages = await _retainedMessagesManager.GetMessages().ConfigureAwait(false);
187+
}
180188

181189
// The topic filters are order by its QoS so that the higher QoS will win over a
182190
// lower one.
@@ -208,6 +216,24 @@ public async Task<SubscribeResult> Subscribe(MqttSubscribePacket subscribePacket
208216
finalTopicFilters.Add(topicFilter);
209217

210218
FilterRetainedApplicationMessages(retainedApplicationMessages, createSubscriptionResult, result);
219+
if (createSubscriptionResult.Subscription.GrantedQualityOfServiceLevel != MqttQualityOfServiceLevel.AtLeastOnce)
220+
{
221+
FilterRetainedApplicationMessages(retainedApplicationMessages, createSubscriptionResult, result);
222+
}
223+
else
224+
{
225+
atLeastOnceSubscriptionResults.Add(createSubscriptionResult);
226+
}
227+
}
228+
229+
if (atLeastOnceSubscriptionResults.Count != 0)
230+
{
231+
// In order to satisfy at least once, we must query for retained messages after creating the subscription.
232+
retainedApplicationMessages = await _retainedMessagesManager.GetMessages().ConfigureAwait(false);
233+
foreach (var createSubscriptionResult in atLeastOnceSubscriptionResults)
234+
{
235+
FilterRetainedApplicationMessages(retainedApplicationMessages, createSubscriptionResult, result);
236+
}
211237
}
212238

213239
// This call will add the new subscription to the internal storage.

0 commit comments

Comments
 (0)