From 841334a7d3835bee26a06f99c5b02558a478ffc3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=99=88=E5=9B=BD=E4=BC=9F?= <366193849@qq.com> Date: Sat, 7 Dec 2024 21:38:01 +0800 Subject: [PATCH] Added AllowPacketFragmentationSelector option. --- Source/MQTTnet.AspnetCore/MqttConnectionContext.cs | 9 +++++++++ Source/MQTTnet.Benchmarks/SerializerBenchmark.cs | 2 ++ .../Events/ValidatingConnectionEventArgs.cs | 2 ++ .../Internal/Adapter/MqttTcpServerListener.cs | 9 ++++++++- .../Options/MqttServerOptionsBuilder.cs | 8 ++++++++ .../Options/MqttServerTcpEndpointBaseOptions.cs | 7 +++++++ Source/MQTTnet.Tests/Mockups/MemoryMqttChannel.cs | 2 ++ Source/MQTTnet.Tests/Server/Events_Tests.cs | 12 ++++++++++++ Source/MQTTnet/Adapter/IMqttChannelAdapter.cs | 2 ++ Source/MQTTnet/Adapter/MqttChannelAdapter.cs | 2 ++ Source/MQTTnet/Channel/IMqttChannel.cs | 2 ++ Source/MQTTnet/Implementations/MqttTcpChannel.cs | 2 ++ .../MQTTnet/Implementations/MqttWebSocketChannel.cs | 2 ++ 13 files changed, 60 insertions(+), 1 deletion(-) diff --git a/Source/MQTTnet.AspnetCore/MqttConnectionContext.cs b/Source/MQTTnet.AspnetCore/MqttConnectionContext.cs index 61184c4d5..24e60a31a 100644 --- a/Source/MQTTnet.AspnetCore/MqttConnectionContext.cs +++ b/Source/MQTTnet.AspnetCore/MqttConnectionContext.cs @@ -104,6 +104,15 @@ public bool IsSecureConnection } } + public bool IsWebSocketConnection + { + get + { + var httpFeature = _connection.Features.Get(); + return httpFeature != null && httpFeature.HttpContext != null && httpFeature.HttpContext.WebSockets.IsWebSocketRequest; + } + } + public MqttPacketFormatterAdapter PacketFormatterAdapter { get; } public async Task ConnectAsync(CancellationToken cancellationToken) diff --git a/Source/MQTTnet.Benchmarks/SerializerBenchmark.cs b/Source/MQTTnet.Benchmarks/SerializerBenchmark.cs index 48117232c..a93725c08 100644 --- a/Source/MQTTnet.Benchmarks/SerializerBenchmark.cs +++ b/Source/MQTTnet.Benchmarks/SerializerBenchmark.cs @@ -83,6 +83,8 @@ public BenchmarkMqttChannel(ArraySegment buffer) public X509Certificate2 ClientCertificate { get; } + public bool IsWebSocketConnection => false; + public void Reset() { _position = _buffer.Offset; diff --git a/Source/MQTTnet.Server/Events/ValidatingConnectionEventArgs.cs b/Source/MQTTnet.Server/Events/ValidatingConnectionEventArgs.cs index 325973c7e..57c7e3221 100644 --- a/Source/MQTTnet.Server/Events/ValidatingConnectionEventArgs.cs +++ b/Source/MQTTnet.Server/Events/ValidatingConnectionEventArgs.cs @@ -78,6 +78,8 @@ public ValidatingConnectionEventArgs(MqttConnectPacket connectPacket, IMqttChann public bool IsSecureConnection => ChannelAdapter.IsSecureConnection; + public bool IsWebSocketConnection => ChannelAdapter.IsWebSocketConnection; + /// /// Gets or sets the keep alive period. /// The connection is normally left open by the client so that is can send and receive data at any time. diff --git a/Source/MQTTnet.Server/Internal/Adapter/MqttTcpServerListener.cs b/Source/MQTTnet.Server/Internal/Adapter/MqttTcpServerListener.cs index 7a0453ce8..c1d84ec0e 100644 --- a/Source/MQTTnet.Server/Internal/Adapter/MqttTcpServerListener.cs +++ b/Source/MQTTnet.Server/Internal/Adapter/MqttTcpServerListener.cs @@ -218,7 +218,14 @@ await sslStream.AuthenticateAsServerAsync( using (var clientAdapter = new MqttChannelAdapter(tcpChannel, packetFormatterAdapter, _rootLogger)) { - clientAdapter.AllowPacketFragmentation = _options.AllowPacketFragmentation; + if (_options.AllowPacketFragmentationSelector == null) + { + clientAdapter.AllowPacketFragmentation = _options.AllowPacketFragmentation; + } + else + { + clientAdapter.AllowPacketFragmentation = _options.AllowPacketFragmentationSelector(clientAdapter); + } await clientHandler(clientAdapter).ConfigureAwait(false); } } diff --git a/Source/MQTTnet.Server/Options/MqttServerOptionsBuilder.cs b/Source/MQTTnet.Server/Options/MqttServerOptionsBuilder.cs index 2e86e21eb..6c68b41d7 100644 --- a/Source/MQTTnet.Server/Options/MqttServerOptionsBuilder.cs +++ b/Source/MQTTnet.Server/Options/MqttServerOptionsBuilder.cs @@ -8,6 +8,7 @@ using System.Security.Authentication; using MQTTnet.Certificates; using System.Security.Cryptography.X509Certificates; +using MQTTnet.Adapter; // ReSharper disable UnusedMember.Global namespace MQTTnet.Server @@ -136,6 +137,13 @@ public MqttServerOptionsBuilder WithoutPacketFragmentation() { _options.DefaultEndpointOptions.AllowPacketFragmentation = false; _options.TlsEndpointOptions.AllowPacketFragmentation = false; + return WithPacketFragmentationSelector(null); + } + + public MqttServerOptionsBuilder WithPacketFragmentationSelector(Func selector) + { + _options.DefaultEndpointOptions.AllowPacketFragmentationSelector = selector; + _options.TlsEndpointOptions.AllowPacketFragmentationSelector = selector; return this; } diff --git a/Source/MQTTnet.Server/Options/MqttServerTcpEndpointBaseOptions.cs b/Source/MQTTnet.Server/Options/MqttServerTcpEndpointBaseOptions.cs index 3567050b7..f68f462bf 100644 --- a/Source/MQTTnet.Server/Options/MqttServerTcpEndpointBaseOptions.cs +++ b/Source/MQTTnet.Server/Options/MqttServerTcpEndpointBaseOptions.cs @@ -2,6 +2,7 @@ // The .NET Foundation licenses this file to you under the MIT license. // See the LICENSE file in the project root for more information. +using MQTTnet.Adapter; using System.Net; using System.Net.Sockets; @@ -31,6 +32,12 @@ public abstract class MqttServerTcpEndpointBaseOptions /// public bool AllowPacketFragmentation { get; set; } = true; + /// + /// Select whether to AllowPacketFragmentation for an . + /// Its priority is higher than the . + /// + public Func AllowPacketFragmentationSelector { get; set; } + /// /// Gets or sets the TCP keep alive interval. /// The value _null_ indicates that the OS and framework defaults should be used. diff --git a/Source/MQTTnet.Tests/Mockups/MemoryMqttChannel.cs b/Source/MQTTnet.Tests/Mockups/MemoryMqttChannel.cs index 20c76602b..23951e4f2 100644 --- a/Source/MQTTnet.Tests/Mockups/MemoryMqttChannel.cs +++ b/Source/MQTTnet.Tests/Mockups/MemoryMqttChannel.cs @@ -34,6 +34,8 @@ public MemoryMqttChannel(byte[] buffer) public X509Certificate2 ClientCertificate { get; } + public bool IsWebSocketConnection => false; + public Task ConnectAsync(CancellationToken cancellationToken) { return CompletedTask.Instance; diff --git a/Source/MQTTnet.Tests/Server/Events_Tests.cs b/Source/MQTTnet.Tests/Server/Events_Tests.cs index 415a2a2d2..86b322a44 100644 --- a/Source/MQTTnet.Tests/Server/Events_Tests.cs +++ b/Source/MQTTnet.Tests/Server/Events_Tests.cs @@ -29,6 +29,14 @@ public async Task Fire_Client_Connected_Event() return CompletedTask.Instance; }; + ValidatingConnectionEventArgs validatingEventArgs = null; + server.ValidatingConnectionAsync += e => + { + validatingEventArgs = e; + return CompletedTask.Instance; + }; + + await testEnvironment.ConnectClient(o => o.WithCredentials("TheUser")); await LongTestDelay(); @@ -39,6 +47,10 @@ public async Task Fire_Client_Connected_Event() Assert.IsTrue(eventArgs.RemoteEndPoint.ToString().Contains("127.0.0.1")); Assert.AreEqual(MqttProtocolVersion.V311, eventArgs.ProtocolVersion); Assert.AreEqual("TheUser", eventArgs.UserName); + + Assert.AreEqual("TheUser", validatingEventArgs.UserName); + Assert.IsFalse(validatingEventArgs.IsSecureConnection); + Assert.IsFalse(validatingEventArgs.IsWebSocketConnection); } } diff --git a/Source/MQTTnet/Adapter/IMqttChannelAdapter.cs b/Source/MQTTnet/Adapter/IMqttChannelAdapter.cs index 9337354a0..c40d10edc 100644 --- a/Source/MQTTnet/Adapter/IMqttChannelAdapter.cs +++ b/Source/MQTTnet/Adapter/IMqttChannelAdapter.cs @@ -24,6 +24,8 @@ public interface IMqttChannelAdapter : IDisposable bool IsSecureConnection { get; } + bool IsWebSocketConnection { get; } + MqttPacketFormatterAdapter PacketFormatterAdapter { get; } Task ConnectAsync(CancellationToken cancellationToken); diff --git a/Source/MQTTnet/Adapter/MqttChannelAdapter.cs b/Source/MQTTnet/Adapter/MqttChannelAdapter.cs index 28d72bcf2..771eff5f5 100644 --- a/Source/MQTTnet/Adapter/MqttChannelAdapter.cs +++ b/Source/MQTTnet/Adapter/MqttChannelAdapter.cs @@ -56,6 +56,8 @@ public MqttChannelAdapter(IMqttChannel channel, MqttPacketFormatterAdapter packe public bool IsSecureConnection => _channel.IsSecureConnection; + public bool IsWebSocketConnection => _channel.IsWebSocketConnection; + public MqttPacketFormatterAdapter PacketFormatterAdapter { get; } public MqttPacketInspector PacketInspector { get; set; } diff --git a/Source/MQTTnet/Channel/IMqttChannel.cs b/Source/MQTTnet/Channel/IMqttChannel.cs index 02af43e46..ec80b2f75 100644 --- a/Source/MQTTnet/Channel/IMqttChannel.cs +++ b/Source/MQTTnet/Channel/IMqttChannel.cs @@ -18,6 +18,8 @@ public interface IMqttChannel : IDisposable bool IsSecureConnection { get; } + bool IsWebSocketConnection { get; } + Task ConnectAsync(CancellationToken cancellationToken); Task DisconnectAsync(CancellationToken cancellationToken); diff --git a/Source/MQTTnet/Implementations/MqttTcpChannel.cs b/Source/MQTTnet/Implementations/MqttTcpChannel.cs index 5ef20c1a6..5e2b341b2 100644 --- a/Source/MQTTnet/Implementations/MqttTcpChannel.cs +++ b/Source/MQTTnet/Implementations/MqttTcpChannel.cs @@ -53,6 +53,8 @@ public MqttTcpChannel(Stream stream, EndPoint remoteEndPoint, X509Certificate2 c public bool IsSecureConnection { get; } + public bool IsWebSocketConnection => false; + public async Task ConnectAsync(CancellationToken cancellationToken) { CrossPlatformSocket socket = null; diff --git a/Source/MQTTnet/Implementations/MqttWebSocketChannel.cs b/Source/MQTTnet/Implementations/MqttWebSocketChannel.cs index 8c7e59114..c4d03596e 100644 --- a/Source/MQTTnet/Implementations/MqttWebSocketChannel.cs +++ b/Source/MQTTnet/Implementations/MqttWebSocketChannel.cs @@ -41,6 +41,8 @@ public MqttWebSocketChannel(WebSocket webSocket, EndPoint remoteEndPoint, bool i public bool IsSecureConnection { get; private set; } + public bool IsWebSocketConnection => true; + public async Task ConnectAsync(CancellationToken cancellationToken) { var uri = _options.Uri;