From 16480c38ae4bf997ef664cf9ba5f0c8ff4cbf23d Mon Sep 17 00:00:00 2001 From: AlexNik4 Date: Fri, 11 Apr 2025 16:30:06 -0400 Subject: [PATCH 1/9] Performance: Implemented simple topic subscription management in MqttClientSessionsManager. --- .../Internal/MqttClientSessionsManager.cs | 44 ++++++++++++++++++- Source/MQTTnet.Server/Internal/MqttSession.cs | 11 ++++- Source/MQTTnet/MqttTopicFilterComparer.cs | 14 ++++++ 3 files changed, 66 insertions(+), 3 deletions(-) diff --git a/Source/MQTTnet.Server/Internal/MqttClientSessionsManager.cs b/Source/MQTTnet.Server/Internal/MqttClientSessionsManager.cs index 2356b554b..6eda5d1f2 100644 --- a/Source/MQTTnet.Server/Internal/MqttClientSessionsManager.cs +++ b/Source/MQTTnet.Server/Internal/MqttClientSessionsManager.cs @@ -31,6 +31,7 @@ public sealed class MqttClientSessionsManager : ISubscriptionChangedNotification // See the MqttSubscription object for a detailed explanation. readonly MqttSessionsStorage _sessionsStorage = new(); readonly HashSet _subscriberSessions = []; + readonly Dictionary> _simpleTopicsToSession = []; public MqttClientSessionsManager(MqttServerOptions options, MqttRetainedMessagesManager retainedMessagesManager, MqttServerEventContainer eventContainer, IMqttNetLogger logger) { @@ -78,6 +79,17 @@ public async Task DeleteSessionAsync(string clientId) if (_sessionsStorage.TryRemoveSession(clientId, out session)) { _subscriberSessions.Remove(session); + foreach (var simpleTopic in session.GetSimpleSubscribedTopics) + { + if (_simpleTopicsToSession.TryGetValue(simpleTopic, out var sessionsWithSimpleTopics)) + { + sessionsWithSimpleTopics.Remove(session); + if (sessionsWithSimpleTopics.Count == 0) + { + _simpleTopicsToSession.Remove(simpleTopic); + } + } + } } } finally @@ -165,7 +177,15 @@ public async Task DispatchApplicationMessage( _sessionsManagementLock.EnterReadLock(); try { - subscriberSessions = _subscriberSessions.ToList(); + if (_simpleTopicsToSession.TryGetValue(applicationMessage.Topic, out var sessionsWithSimpleTopics)) + { + subscriberSessions = sessionsWithSimpleTopics.ToList(); + } + else + { + // No simple topic match. Check all subscribers. + subscriberSessions = _subscriberSessions.ToList(); + } } finally { @@ -459,7 +479,19 @@ public void OnSubscriptionsAdded(MqttSession clientSession, List topics) foreach (var topic in topics) { - clientSession.AddSubscribedTopic(topic); + bool isSimpleTopic = !MqttTopicFilterComparer.ContainsWildcards(topic); + if (isSimpleTopic) + { + if (_simpleTopicsToSession.TryGetValue(topic, out var sessionsWithSimpleTopics)) + { + sessionsWithSimpleTopics.Add(clientSession); + } + else + { + _simpleTopicsToSession[topic] = [clientSession]; + } + } + clientSession.AddSubscribedTopic(topic, isSimpleTopic); } } finally @@ -475,6 +507,14 @@ public void OnSubscriptionsRemoved(MqttSession clientSession, List subsc { foreach (var subscriptionTopic in subscriptionTopics) { + if (_simpleTopicsToSession.TryGetValue(subscriptionTopic, out var sessionsWithSimpleTopics)) + { + sessionsWithSimpleTopics.Remove(clientSession); + if (sessionsWithSimpleTopics.Count == 0) + { + _simpleTopicsToSession.Remove(subscriptionTopic); + } + } clientSession.RemoveSubscribedTopic(subscriptionTopic); } diff --git a/Source/MQTTnet.Server/Internal/MqttSession.cs b/Source/MQTTnet.Server/Internal/MqttSession.cs index 7e3ab1279..6ab639815 100644 --- a/Source/MQTTnet.Server/Internal/MqttSession.cs +++ b/Source/MQTTnet.Server/Internal/MqttSession.cs @@ -25,6 +25,7 @@ public sealed class MqttSession : IDisposable // Bookkeeping to know if this is a subscribing client; lazy initialize later. HashSet _subscribedTopics; + HashSet _subscribedSimpleTopics; public MqttSession( MqttConnectPacket connectPacket, @@ -52,6 +53,8 @@ public MqttSession( public bool HasSubscribedTopics => _subscribedTopics != null && _subscribedTopics.Count > 0; + public HashSet GetSimpleSubscribedTopics => _subscribedSimpleTopics ?? []; + public string Id => _connectPacket.ClientId; public string UserName => _connectPacket.Username; @@ -79,14 +82,19 @@ public MqttPublishPacket AcknowledgePublishPacket(ushort packetIdentifier) return publishPacket; } - public void AddSubscribedTopic(string topic) + public void AddSubscribedTopic(string topic, bool isSimpleTopic) { if (_subscribedTopics == null) { _subscribedTopics = new HashSet(); + _subscribedSimpleTopics = new HashSet(); } _subscribedTopics.Add(topic); + if (isSimpleTopic) + { + _subscribedSimpleTopics.Add(topic); + } } public Task DeleteAsync() @@ -209,6 +217,7 @@ public void Recover() public void RemoveSubscribedTopic(string topic) { _subscribedTopics?.Remove(topic); + _subscribedSimpleTopics?.Remove(topic); } public Task Subscribe(MqttSubscribePacket subscribePacket, CancellationToken cancellationToken) diff --git a/Source/MQTTnet/MqttTopicFilterComparer.cs b/Source/MQTTnet/MqttTopicFilterComparer.cs index 72285a82f..815d85970 100644 --- a/Source/MQTTnet/MqttTopicFilterComparer.cs +++ b/Source/MQTTnet/MqttTopicFilterComparer.cs @@ -174,5 +174,19 @@ public static unsafe MqttTopicFilterCompareResult Compare(string topic, string f return MqttTopicFilterCompareResult.NoMatch; } + + public static bool ContainsWildcards(string topicFilter) + { + for (var i = 0; i < topicFilter.Length; i++) + { + var c = topicFilter[i]; + if (c == MultiLevelWildcard || c == SingleLevelWildcard) + { + return true; + } + } + + return false; + } } } \ No newline at end of file From d4240cdd103f9573ae8f03051de0ec4153f02c5e Mon Sep 17 00:00:00 2001 From: AlexNik4 Date: Fri, 11 Apr 2025 22:07:12 -0400 Subject: [PATCH 2/9] Refactor MQTT session management to separate wildcard and simple topic subscriptions. This fixes the scenario where a topic now correctly matches both a simple subscription and a wildcard subscription. This also improves performance where if a simple topic is not matched, then only the sessions with wildcard topics will be checked. --- .../Internal/MqttClientSessionsManager.cs | 78 ++++++++++++------- Source/MQTTnet.Server/Internal/MqttSession.cs | 24 +++--- 2 files changed, 59 insertions(+), 43 deletions(-) diff --git a/Source/MQTTnet.Server/Internal/MqttClientSessionsManager.cs b/Source/MQTTnet.Server/Internal/MqttClientSessionsManager.cs index 6eda5d1f2..987a3af45 100644 --- a/Source/MQTTnet.Server/Internal/MqttClientSessionsManager.cs +++ b/Source/MQTTnet.Server/Internal/MqttClientSessionsManager.cs @@ -30,7 +30,7 @@ public sealed class MqttClientSessionsManager : ISubscriptionChangedNotification // The _sessions dictionary contains all session, the _subscriberSessions hash set contains subscriber sessions only. // See the MqttSubscription object for a detailed explanation. readonly MqttSessionsStorage _sessionsStorage = new(); - readonly HashSet _subscriberSessions = []; + readonly HashSet _subscriberSessionsWithWildcards = []; readonly Dictionary> _simpleTopicsToSession = []; public MqttClientSessionsManager(MqttServerOptions options, MqttRetainedMessagesManager retainedMessagesManager, MqttServerEventContainer eventContainer, IMqttNetLogger logger) @@ -78,18 +78,7 @@ public async Task DeleteSessionAsync(string clientId) { if (_sessionsStorage.TryRemoveSession(clientId, out session)) { - _subscriberSessions.Remove(session); - foreach (var simpleTopic in session.GetSimpleSubscribedTopics) - { - if (_simpleTopicsToSession.TryGetValue(simpleTopic, out var sessionsWithSimpleTopics)) - { - sessionsWithSimpleTopics.Remove(session); - if (sessionsWithSimpleTopics.Count == 0) - { - _simpleTopicsToSession.Remove(simpleTopic); - } - } - } + CleanupClientSessionUnsafe(session); } } finally @@ -173,18 +162,29 @@ public async Task DispatchApplicationMessage( await _retainedMessagesManager.UpdateMessage(senderId, applicationMessage).ConfigureAwait(false); } - List subscriberSessions; + HashSet subscriberSessions; _sessionsManagementLock.EnterReadLock(); try { if (_simpleTopicsToSession.TryGetValue(applicationMessage.Topic, out var sessionsWithSimpleTopics)) { - subscriberSessions = sessionsWithSimpleTopics.ToList(); + // Create the initial subscriberSessions from whichever set is larger to take advantage + // of the internal ConstructFrom other HashSet optimizations + if(sessionsWithSimpleTopics.Count > _subscriberSessionsWithWildcards.Count) + { + subscriberSessions = new HashSet(sessionsWithSimpleTopics); + subscriberSessions.UnionWith(_subscriberSessionsWithWildcards); + } + else + { + subscriberSessions = new HashSet(_subscriberSessionsWithWildcards); + subscriberSessions.UnionWith(sessionsWithSimpleTopics); + } } else { - // No simple topic match. Check all subscribers. - subscriberSessions = _subscriberSessions.ToList(); + // Always include the sessions with wildcards. They need to be properly matched against the topic filter. + subscriberSessions = new HashSet(_subscriberSessionsWithWildcards); } } finally @@ -471,16 +471,17 @@ public void OnSubscriptionsAdded(MqttSession clientSession, List topics) _sessionsManagementLock.EnterWriteLock(); try { - if (!clientSession.HasSubscribedTopics) - { - // first subscribed topic - _subscriberSessions.Add(clientSession); - } - foreach (var topic in topics) { - bool isSimpleTopic = !MqttTopicFilterComparer.ContainsWildcards(topic); - if (isSimpleTopic) + bool hasWildcard = MqttTopicFilterComparer.ContainsWildcards(topic); + if (hasWildcard) + { + if (!clientSession.HasSubscribedWildcardTopics) + { + _subscriberSessionsWithWildcards.Add(clientSession); + } + } + else { if (_simpleTopicsToSession.TryGetValue(topic, out var sessionsWithSimpleTopics)) { @@ -491,7 +492,7 @@ public void OnSubscriptionsAdded(MqttSession clientSession, List topics) _simpleTopicsToSession[topic] = [clientSession]; } } - clientSession.AddSubscribedTopic(topic, isSimpleTopic); + clientSession.AddSubscribedTopic(topic, hasWildcard); } } finally @@ -518,10 +519,10 @@ public void OnSubscriptionsRemoved(MqttSession clientSession, List subsc clientSession.RemoveSubscribedTopic(subscriptionTopic); } - if (!clientSession.HasSubscribedTopics) + if (!clientSession.HasSubscribedWildcardTopics) { - // last subscription removed - _subscriberSessions.Remove(clientSession); + // Last wildcard subscription removed + _subscriberSessionsWithWildcards.Remove(clientSession); } } finally @@ -604,7 +605,7 @@ async Task CreateClientConnection( if (connectPacket.CleanSession) { _logger.Verbose("Deleting existing session of client '{0}' due to clean start", connectPacket.ClientId); - _subscriberSessions.Remove(oldSession); + CleanupClientSessionUnsafe(oldSession); session = CreateSession(connectPacket, validatingConnectionEventArgs); } else @@ -709,6 +710,23 @@ MqttSession GetClientSession(string clientId) } } + //* Must be called with the _sessionsManagementLock held. + void CleanupClientSessionUnsafe(MqttSession session) + { + _subscriberSessionsWithWildcards.Remove(session); + foreach (var simpleTopic in session.GetSimpleSubscribedTopics) + { + if (_simpleTopicsToSession.TryGetValue(simpleTopic, out var sessionsWithSimpleTopics)) + { + sessionsWithSimpleTopics.Remove(session); + if (sessionsWithSimpleTopics.Count == 0) + { + _simpleTopicsToSession.Remove(simpleTopic); + } + } + } + } + async Task ReceiveConnectPacket(IMqttChannelAdapter channelAdapter, CancellationToken cancellationToken) { try diff --git a/Source/MQTTnet.Server/Internal/MqttSession.cs b/Source/MQTTnet.Server/Internal/MqttSession.cs index 6ab639815..a8f7b0b0e 100644 --- a/Source/MQTTnet.Server/Internal/MqttSession.cs +++ b/Source/MQTTnet.Server/Internal/MqttSession.cs @@ -23,9 +23,8 @@ public sealed class MqttSession : IDisposable // Do not use a dictionary in order to keep the ordering of the messages. readonly List _unacknowledgedPublishPackets = new(); - // Bookkeeping to know if this is a subscribing client; lazy initialize later. - HashSet _subscribedTopics; - HashSet _subscribedSimpleTopics; + HashSet _subscribedSimpleTopics = []; + HashSet _subscribedWildcardTopics = []; public MqttSession( MqttConnectPacket connectPacket, @@ -51,7 +50,9 @@ public MqttSession( public uint ExpiryInterval => _connectPacket.SessionExpiryInterval; - public bool HasSubscribedTopics => _subscribedTopics != null && _subscribedTopics.Count > 0; + public bool HasSubscribedWildcardTopics => _subscribedWildcardTopics.Count > 0; + + public bool HasSubscribedTopics => _subscribedSimpleTopics.Count > 0 || _subscribedWildcardTopics.Count > 0; public HashSet GetSimpleSubscribedTopics => _subscribedSimpleTopics ?? []; @@ -82,16 +83,13 @@ public MqttPublishPacket AcknowledgePublishPacket(ushort packetIdentifier) return publishPacket; } - public void AddSubscribedTopic(string topic, bool isSimpleTopic) + public void AddSubscribedTopic(string topic, bool isWildcardTopic) { - if (_subscribedTopics == null) + if (isWildcardTopic) { - _subscribedTopics = new HashSet(); - _subscribedSimpleTopics = new HashSet(); + _subscribedWildcardTopics.Add(topic); } - - _subscribedTopics.Add(topic); - if (isSimpleTopic) + else { _subscribedSimpleTopics.Add(topic); } @@ -216,8 +214,8 @@ public void Recover() public void RemoveSubscribedTopic(string topic) { - _subscribedTopics?.Remove(topic); - _subscribedSimpleTopics?.Remove(topic); + _subscribedSimpleTopics.Remove(topic); + _subscribedWildcardTopics.Remove(topic); } public Task Subscribe(MqttSubscribePacket subscribePacket, CancellationToken cancellationToken) From 99ec1e4bbac306c5cc5653d6651d836c82c31356 Mon Sep 17 00:00:00 2001 From: AlexNik4 Date: Fri, 11 Apr 2025 23:16:47 -0400 Subject: [PATCH 3/9] _subscribedSimpleTopics can't be null --- Source/MQTTnet.Server/Internal/MqttSession.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Source/MQTTnet.Server/Internal/MqttSession.cs b/Source/MQTTnet.Server/Internal/MqttSession.cs index a8f7b0b0e..1051f8af6 100644 --- a/Source/MQTTnet.Server/Internal/MqttSession.cs +++ b/Source/MQTTnet.Server/Internal/MqttSession.cs @@ -54,7 +54,7 @@ public MqttSession( public bool HasSubscribedTopics => _subscribedSimpleTopics.Count > 0 || _subscribedWildcardTopics.Count > 0; - public HashSet GetSimpleSubscribedTopics => _subscribedSimpleTopics ?? []; + public HashSet GetSimpleSubscribedTopics => _subscribedSimpleTopics; public string Id => _connectPacket.ClientId; From 8ca221c95e001939a154638ce87a7dfe6846c65a Mon Sep 17 00:00:00 2001 From: AlexNik4 Date: Fri, 11 Apr 2025 23:26:31 -0400 Subject: [PATCH 4/9] Remove unused HasSubscribedTopics property from MqttSession --- Source/MQTTnet.Server/Internal/MqttSession.cs | 2 -- 1 file changed, 2 deletions(-) diff --git a/Source/MQTTnet.Server/Internal/MqttSession.cs b/Source/MQTTnet.Server/Internal/MqttSession.cs index 1051f8af6..1528f6e06 100644 --- a/Source/MQTTnet.Server/Internal/MqttSession.cs +++ b/Source/MQTTnet.Server/Internal/MqttSession.cs @@ -52,8 +52,6 @@ public MqttSession( public bool HasSubscribedWildcardTopics => _subscribedWildcardTopics.Count > 0; - public bool HasSubscribedTopics => _subscribedSimpleTopics.Count > 0 || _subscribedWildcardTopics.Count > 0; - public HashSet GetSimpleSubscribedTopics => _subscribedSimpleTopics; public string Id => _connectPacket.ClientId; From f07a0afc8253ba07a33ca4e5782f14e86755ef17 Mon Sep 17 00:00:00 2001 From: AlexNik4 Date: Sun, 13 Apr 2025 15:35:32 -0400 Subject: [PATCH 5/9] Make subscribed topic collections readonly in MqttSession --- Source/MQTTnet.Server/Internal/MqttSession.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Source/MQTTnet.Server/Internal/MqttSession.cs b/Source/MQTTnet.Server/Internal/MqttSession.cs index 1528f6e06..0d7ffca98 100644 --- a/Source/MQTTnet.Server/Internal/MqttSession.cs +++ b/Source/MQTTnet.Server/Internal/MqttSession.cs @@ -23,8 +23,8 @@ public sealed class MqttSession : IDisposable // Do not use a dictionary in order to keep the ordering of the messages. readonly List _unacknowledgedPublishPackets = new(); - HashSet _subscribedSimpleTopics = []; - HashSet _subscribedWildcardTopics = []; + readonly HashSet _subscribedSimpleTopics = []; + readonly HashSet _subscribedWildcardTopics = []; public MqttSession( MqttConnectPacket connectPacket, From 527f2f1922794ffe78af77708c8e13a74821b2af Mon Sep 17 00:00:00 2001 From: AlexNik4 Date: Tue, 15 Apr 2025 13:01:10 -0400 Subject: [PATCH 6/9] Improving the ContainsWildcards method --- .../TopicFilterComparer_Tests.cs | 19 ++++++++++++++++++- Source/MQTTnet/MqttTopicFilterComparer.cs | 10 +++------- 2 files changed, 21 insertions(+), 8 deletions(-) diff --git a/Source/MQTTnet.Tests/TopicFilterComparer_Tests.cs b/Source/MQTTnet.Tests/TopicFilterComparer_Tests.cs index 61f8987cd..f08641ac1 100644 --- a/Source/MQTTnet.Tests/TopicFilterComparer_Tests.cs +++ b/Source/MQTTnet.Tests/TopicFilterComparer_Tests.cs @@ -3,7 +3,6 @@ // See the LICENSE file in the project root for more information. using Microsoft.VisualStudio.TestTools.UnitTesting; -using MQTTnet.Server; using MQTTnet.Server.Internal; namespace MQTTnet.Tests @@ -147,6 +146,24 @@ public void SingleLevel_Finance() CompareAndAssert("/finance", "+", MqttTopicFilterCompareResult.NoMatch); } + [TestMethod] + public void CotainsWildcard_No() + { + Assert.IsFalse(MqttTopicFilterComparer.ContainsWildcards("A/B/C/D")); + } + + [TestMethod] + public void CotainsWildcard_MultiLevel() + { + Assert.IsTrue(MqttTopicFilterComparer.ContainsWildcards("A/B/C/#")); + } + + [TestMethod] + public void CotainsWildcard_SingleLevel() + { + Assert.IsTrue(MqttTopicFilterComparer.ContainsWildcards("A/B/+/D")); + } + static void CompareAndAssert(string topic, string filter, MqttTopicFilterCompareResult expectedResult) { Assert.AreEqual(expectedResult, MqttTopicFilterComparer.Compare(topic, filter)); diff --git a/Source/MQTTnet/MqttTopicFilterComparer.cs b/Source/MQTTnet/MqttTopicFilterComparer.cs index 815d85970..55a924486 100644 --- a/Source/MQTTnet/MqttTopicFilterComparer.cs +++ b/Source/MQTTnet/MqttTopicFilterComparer.cs @@ -177,16 +177,12 @@ public static unsafe MqttTopicFilterCompareResult Compare(string topic, string f public static bool ContainsWildcards(string topicFilter) { - for (var i = 0; i < topicFilter.Length; i++) + if (topicFilter[topicFilter.Length - 1] == MultiLevelWildcard) { - var c = topicFilter[i]; - if (c == MultiLevelWildcard || c == SingleLevelWildcard) - { - return true; - } + return true; } - return false; + return topicFilter.Contains(SingleLevelWildcard); } } } \ No newline at end of file From 662056f5d36cdc7798dccab51991365a9fe30c2c Mon Sep 17 00:00:00 2001 From: AlexNik4 Date: Tue, 15 Apr 2025 22:49:59 -0400 Subject: [PATCH 7/9] Addressing review comments --- .../ISubscriptionChangedNotification.cs | 2 +- .../Internal/MqttClientSessionsManager.cs | 15 +++++++------- .../MqttClientSubscriptionsManager.cs | 4 ++-- Source/MQTTnet.Server/Internal/MqttSession.cs | 2 +- .../TopicFilterComparer_Tests.cs | 20 +------------------ Source/MQTTnet/MqttTopicFilterComparer.cs | 10 ---------- 6 files changed, 12 insertions(+), 41 deletions(-) diff --git a/Source/MQTTnet.Server/Internal/ISubscriptionChangedNotification.cs b/Source/MQTTnet.Server/Internal/ISubscriptionChangedNotification.cs index af9538df6..17ac338cf 100644 --- a/Source/MQTTnet.Server/Internal/ISubscriptionChangedNotification.cs +++ b/Source/MQTTnet.Server/Internal/ISubscriptionChangedNotification.cs @@ -2,7 +2,7 @@ namespace MQTTnet.Server.Internal { public interface ISubscriptionChangedNotification { - void OnSubscriptionsAdded(MqttSession clientSession, List subscriptionsTopics); + void OnSubscriptionsAdded(MqttSession clientSession, List subscriptionsTopics); void OnSubscriptionsRemoved(MqttSession clientSession, List subscriptionTopics); } diff --git a/Source/MQTTnet.Server/Internal/MqttClientSessionsManager.cs b/Source/MQTTnet.Server/Internal/MqttClientSessionsManager.cs index 987a3af45..a330569d9 100644 --- a/Source/MQTTnet.Server/Internal/MqttClientSessionsManager.cs +++ b/Source/MQTTnet.Server/Internal/MqttClientSessionsManager.cs @@ -466,15 +466,14 @@ public async Task HandleClientConnectionAsync(IMqttChannelAdapter channelAdapter } } - public void OnSubscriptionsAdded(MqttSession clientSession, List topics) + public void OnSubscriptionsAdded(MqttSession clientSession, List subscriptions) { _sessionsManagementLock.EnterWriteLock(); try { - foreach (var topic in topics) + foreach (var subscription in subscriptions) { - bool hasWildcard = MqttTopicFilterComparer.ContainsWildcards(topic); - if (hasWildcard) + if (subscription.TopicHasWildcard) { if (!clientSession.HasSubscribedWildcardTopics) { @@ -483,16 +482,16 @@ public void OnSubscriptionsAdded(MqttSession clientSession, List topics) } else { - if (_simpleTopicsToSession.TryGetValue(topic, out var sessionsWithSimpleTopics)) + if (_simpleTopicsToSession.TryGetValue(subscription.Topic, out var sessionsWithSimpleTopics)) { sessionsWithSimpleTopics.Add(clientSession); } else { - _simpleTopicsToSession[topic] = [clientSession]; + _simpleTopicsToSession[subscription.Topic] = [clientSession]; } } - clientSession.AddSubscribedTopic(topic, hasWildcard); + clientSession.AddSubscribedTopic(subscription.Topic, subscription.TopicHasWildcard); } } finally @@ -714,7 +713,7 @@ MqttSession GetClientSession(string clientId) void CleanupClientSessionUnsafe(MqttSession session) { _subscriberSessionsWithWildcards.Remove(session); - foreach (var simpleTopic in session.GetSimpleSubscribedTopics) + foreach (var simpleTopic in session.SubscribedSimpleTopics) { if (_simpleTopicsToSession.TryGetValue(simpleTopic, out var sessionsWithSimpleTopics)) { diff --git a/Source/MQTTnet.Server/Internal/MqttClientSubscriptionsManager.cs b/Source/MQTTnet.Server/Internal/MqttClientSubscriptionsManager.cs index 7c1023e52..aa6affe1a 100644 --- a/Source/MQTTnet.Server/Internal/MqttClientSubscriptionsManager.cs +++ b/Source/MQTTnet.Server/Internal/MqttClientSubscriptionsManager.cs @@ -166,7 +166,7 @@ public async Task Subscribe(MqttSubscribePacket subscribePacket var retainedApplicationMessages = await _retainedMessagesManager.GetMessages().ConfigureAwait(false); var result = new SubscribeResult(subscribePacket.TopicFilters.Count); - var addedSubscriptions = new List(); + var addedSubscriptions = new List(); var finalTopicFilters = new List(); // The topic filters are order by its QoS so that the higher QoS will win over a @@ -195,7 +195,7 @@ public async Task Subscribe(MqttSubscribePacket subscribePacket var createSubscriptionResult = CreateSubscription(topicFilter, subscribePacket.SubscriptionIdentifier, interceptorEventArgs.Response.ReasonCode); - addedSubscriptions.Add(topicFilter.Topic); + addedSubscriptions.Add(createSubscriptionResult.Subscription); finalTopicFilters.Add(topicFilter); FilterRetainedApplicationMessages(retainedApplicationMessages, createSubscriptionResult, result); diff --git a/Source/MQTTnet.Server/Internal/MqttSession.cs b/Source/MQTTnet.Server/Internal/MqttSession.cs index 0d7ffca98..78a14ea9f 100644 --- a/Source/MQTTnet.Server/Internal/MqttSession.cs +++ b/Source/MQTTnet.Server/Internal/MqttSession.cs @@ -52,7 +52,7 @@ public MqttSession( public bool HasSubscribedWildcardTopics => _subscribedWildcardTopics.Count > 0; - public HashSet GetSimpleSubscribedTopics => _subscribedSimpleTopics; + public HashSet SubscribedSimpleTopics => _subscribedSimpleTopics; public string Id => _connectPacket.ClientId; diff --git a/Source/MQTTnet.Tests/TopicFilterComparer_Tests.cs b/Source/MQTTnet.Tests/TopicFilterComparer_Tests.cs index f08641ac1..ee22b89c1 100644 --- a/Source/MQTTnet.Tests/TopicFilterComparer_Tests.cs +++ b/Source/MQTTnet.Tests/TopicFilterComparer_Tests.cs @@ -144,25 +144,7 @@ public void SingleLevel_Finance() CompareAndAssert("/finance", "+/+", MqttTopicFilterCompareResult.IsMatch); CompareAndAssert("/finance", "/+", MqttTopicFilterCompareResult.IsMatch); CompareAndAssert("/finance", "+", MqttTopicFilterCompareResult.NoMatch); - } - - [TestMethod] - public void CotainsWildcard_No() - { - Assert.IsFalse(MqttTopicFilterComparer.ContainsWildcards("A/B/C/D")); - } - - [TestMethod] - public void CotainsWildcard_MultiLevel() - { - Assert.IsTrue(MqttTopicFilterComparer.ContainsWildcards("A/B/C/#")); - } - - [TestMethod] - public void CotainsWildcard_SingleLevel() - { - Assert.IsTrue(MqttTopicFilterComparer.ContainsWildcards("A/B/+/D")); - } + } static void CompareAndAssert(string topic, string filter, MqttTopicFilterCompareResult expectedResult) { diff --git a/Source/MQTTnet/MqttTopicFilterComparer.cs b/Source/MQTTnet/MqttTopicFilterComparer.cs index 55a924486..72285a82f 100644 --- a/Source/MQTTnet/MqttTopicFilterComparer.cs +++ b/Source/MQTTnet/MqttTopicFilterComparer.cs @@ -174,15 +174,5 @@ public static unsafe MqttTopicFilterCompareResult Compare(string topic, string f return MqttTopicFilterCompareResult.NoMatch; } - - public static bool ContainsWildcards(string topicFilter) - { - if (topicFilter[topicFilter.Length - 1] == MultiLevelWildcard) - { - return true; - } - - return topicFilter.Contains(SingleLevelWildcard); - } } } \ No newline at end of file From bb90c92bd3d665cb24f389f173097dd4147d1871 Mon Sep 17 00:00:00 2001 From: AlexNik4 Date: Tue, 15 Apr 2025 23:02:58 -0400 Subject: [PATCH 8/9] Whitespace fix --- Source/MQTTnet.Server/Internal/MqttClientSessionsManager.cs | 2 +- Source/MQTTnet.Tests/TopicFilterComparer_Tests.cs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Source/MQTTnet.Server/Internal/MqttClientSessionsManager.cs b/Source/MQTTnet.Server/Internal/MqttClientSessionsManager.cs index a330569d9..9da8b0269 100644 --- a/Source/MQTTnet.Server/Internal/MqttClientSessionsManager.cs +++ b/Source/MQTTnet.Server/Internal/MqttClientSessionsManager.cs @@ -170,7 +170,7 @@ public async Task DispatchApplicationMessage( { // Create the initial subscriberSessions from whichever set is larger to take advantage // of the internal ConstructFrom other HashSet optimizations - if(sessionsWithSimpleTopics.Count > _subscriberSessionsWithWildcards.Count) + if (sessionsWithSimpleTopics.Count > _subscriberSessionsWithWildcards.Count) { subscriberSessions = new HashSet(sessionsWithSimpleTopics); subscriberSessions.UnionWith(_subscriberSessionsWithWildcards); diff --git a/Source/MQTTnet.Tests/TopicFilterComparer_Tests.cs b/Source/MQTTnet.Tests/TopicFilterComparer_Tests.cs index ee22b89c1..7615cfeff 100644 --- a/Source/MQTTnet.Tests/TopicFilterComparer_Tests.cs +++ b/Source/MQTTnet.Tests/TopicFilterComparer_Tests.cs @@ -144,7 +144,7 @@ public void SingleLevel_Finance() CompareAndAssert("/finance", "+/+", MqttTopicFilterCompareResult.IsMatch); CompareAndAssert("/finance", "/+", MqttTopicFilterCompareResult.IsMatch); CompareAndAssert("/finance", "+", MqttTopicFilterCompareResult.NoMatch); - } + } static void CompareAndAssert(string topic, string filter, MqttTopicFilterCompareResult expectedResult) { From 0af81cd3fe4dfce39773dbd74ef18faa27b4c643 Mon Sep 17 00:00:00 2001 From: AlexNik4 Date: Sat, 19 Apr 2025 23:20:30 -0400 Subject: [PATCH 9/9] Improving variable names --- .../Internal/MqttClientSessionsManager.cs | 32 +++++++++---------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/Source/MQTTnet.Server/Internal/MqttClientSessionsManager.cs b/Source/MQTTnet.Server/Internal/MqttClientSessionsManager.cs index 9da8b0269..dfef366b9 100644 --- a/Source/MQTTnet.Server/Internal/MqttClientSessionsManager.cs +++ b/Source/MQTTnet.Server/Internal/MqttClientSessionsManager.cs @@ -31,7 +31,7 @@ public sealed class MqttClientSessionsManager : ISubscriptionChangedNotification // See the MqttSubscription object for a detailed explanation. readonly MqttSessionsStorage _sessionsStorage = new(); readonly HashSet _subscriberSessionsWithWildcards = []; - readonly Dictionary> _simpleTopicsToSession = []; + readonly Dictionary> _simpleTopicToSessions = []; public MqttClientSessionsManager(MqttServerOptions options, MqttRetainedMessagesManager retainedMessagesManager, MqttServerEventContainer eventContainer, IMqttNetLogger logger) { @@ -166,19 +166,19 @@ public async Task DispatchApplicationMessage( _sessionsManagementLock.EnterReadLock(); try { - if (_simpleTopicsToSession.TryGetValue(applicationMessage.Topic, out var sessionsWithSimpleTopics)) + if (_simpleTopicToSessions.TryGetValue(applicationMessage.Topic, out var matchedSimpleTopicSessions)) { // Create the initial subscriberSessions from whichever set is larger to take advantage // of the internal ConstructFrom other HashSet optimizations - if (sessionsWithSimpleTopics.Count > _subscriberSessionsWithWildcards.Count) + if (matchedSimpleTopicSessions.Count > _subscriberSessionsWithWildcards.Count) { - subscriberSessions = new HashSet(sessionsWithSimpleTopics); + subscriberSessions = new HashSet(matchedSimpleTopicSessions); subscriberSessions.UnionWith(_subscriberSessionsWithWildcards); } else { subscriberSessions = new HashSet(_subscriberSessionsWithWildcards); - subscriberSessions.UnionWith(sessionsWithSimpleTopics); + subscriberSessions.UnionWith(matchedSimpleTopicSessions); } } else @@ -482,13 +482,13 @@ public void OnSubscriptionsAdded(MqttSession clientSession, List subsc { foreach (var subscriptionTopic in subscriptionTopics) { - if (_simpleTopicsToSession.TryGetValue(subscriptionTopic, out var sessionsWithSimpleTopics)) + if (_simpleTopicToSessions.TryGetValue(subscriptionTopic, out var simpleTopicSessions)) { - sessionsWithSimpleTopics.Remove(clientSession); - if (sessionsWithSimpleTopics.Count == 0) + simpleTopicSessions.Remove(clientSession); + if (simpleTopicSessions.Count == 0) { - _simpleTopicsToSession.Remove(subscriptionTopic); + _simpleTopicToSessions.Remove(subscriptionTopic); } } clientSession.RemoveSubscribedTopic(subscriptionTopic); @@ -715,12 +715,12 @@ void CleanupClientSessionUnsafe(MqttSession session) _subscriberSessionsWithWildcards.Remove(session); foreach (var simpleTopic in session.SubscribedSimpleTopics) { - if (_simpleTopicsToSession.TryGetValue(simpleTopic, out var sessionsWithSimpleTopics)) + if (_simpleTopicToSessions.TryGetValue(simpleTopic, out var simpleTopicSessions)) { - sessionsWithSimpleTopics.Remove(session); - if (sessionsWithSimpleTopics.Count == 0) + simpleTopicSessions.Remove(session); + if (simpleTopicSessions.Count == 0) { - _simpleTopicsToSession.Remove(simpleTopic); + _simpleTopicToSessions.Remove(simpleTopic); } } }