Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added AllowPacketFragmentationSelector option. #2124

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions Source/MQTTnet.AspnetCore/MqttConnectionContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,15 @@ public bool IsSecureConnection
}
}

public bool IsWebSocketConnection
{
get
{
var httpFeature = _connection.Features.Get<IHttpContextFeature>();
return httpFeature != null && httpFeature.HttpContext != null && httpFeature.HttpContext.WebSockets.IsWebSocketRequest;
}
}

public MqttPacketFormatterAdapter PacketFormatterAdapter { get; }

public async Task ConnectAsync(CancellationToken cancellationToken)
Expand Down
2 changes: 2 additions & 0 deletions Source/MQTTnet.Benchmarks/SerializerBenchmark.cs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ public BenchmarkMqttChannel(ArraySegment<byte> buffer)

public X509Certificate2 ClientCertificate { get; }

public bool IsWebSocketConnection => false;

public void Reset()
{
_position = _buffer.Offset;
Expand Down
2 changes: 2 additions & 0 deletions Source/MQTTnet.Server/Events/ValidatingConnectionEventArgs.cs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ public ValidatingConnectionEventArgs(MqttConnectPacket connectPacket, IMqttChann

public bool IsSecureConnection => ChannelAdapter.IsSecureConnection;

public bool IsWebSocketConnection => ChannelAdapter.IsWebSocketConnection;

/// <summary>
/// Gets or sets the keep alive period.
/// The connection is normally left open by the client so that is can send and receive data at any time.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,14 @@ await sslStream.AuthenticateAsServerAsync(

using (var clientAdapter = new MqttChannelAdapter(tcpChannel, packetFormatterAdapter, _rootLogger))
{
clientAdapter.AllowPacketFragmentation = _options.AllowPacketFragmentation;
if (_options.AllowPacketFragmentationSelector == null)
{
clientAdapter.AllowPacketFragmentation = _options.AllowPacketFragmentation;
}
else
{
clientAdapter.AllowPacketFragmentation = _options.AllowPacketFragmentationSelector(clientAdapter);
}
await clientHandler(clientAdapter).ConfigureAwait(false);
}
}
Expand Down
8 changes: 8 additions & 0 deletions Source/MQTTnet.Server/Options/MqttServerOptionsBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
using System.Security.Authentication;
using MQTTnet.Certificates;
using System.Security.Cryptography.X509Certificates;
using MQTTnet.Adapter;

// ReSharper disable UnusedMember.Global
namespace MQTTnet.Server
Expand Down Expand Up @@ -136,6 +137,13 @@ public MqttServerOptionsBuilder WithoutPacketFragmentation()
{
_options.DefaultEndpointOptions.AllowPacketFragmentation = false;
_options.TlsEndpointOptions.AllowPacketFragmentation = false;
return WithPacketFragmentationSelector(null);
}

public MqttServerOptionsBuilder WithPacketFragmentationSelector(Func<IMqttChannelAdapter, bool> selector)
{
_options.DefaultEndpointOptions.AllowPacketFragmentationSelector = selector;
_options.TlsEndpointOptions.AllowPacketFragmentationSelector = selector;
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

using MQTTnet.Adapter;
using System.Net;
using System.Net.Sockets;

Expand Down Expand Up @@ -31,6 +32,12 @@ public abstract class MqttServerTcpEndpointBaseOptions
/// </summary>
public bool AllowPacketFragmentation { get; set; } = true;

/// <summary>
/// Select whether to AllowPacketFragmentation for an <see cref="IMqttChannelAdapter"/>.
/// Its priority is higher than the <see cref="AllowPacketFragmentation"/>.
/// </summary>
public Func<IMqttChannelAdapter, bool> AllowPacketFragmentationSelector { get; set; }

/// <summary>
/// Gets or sets the TCP keep alive interval.
/// The value _null_ indicates that the OS and framework defaults should be used.
Expand Down
2 changes: 2 additions & 0 deletions Source/MQTTnet.Tests/Mockups/MemoryMqttChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ public MemoryMqttChannel(byte[] buffer)

public X509Certificate2 ClientCertificate { get; }

public bool IsWebSocketConnection => false;

public Task ConnectAsync(CancellationToken cancellationToken)
{
return CompletedTask.Instance;
Expand Down
12 changes: 12 additions & 0 deletions Source/MQTTnet.Tests/Server/Events_Tests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,14 @@ public async Task Fire_Client_Connected_Event()
return CompletedTask.Instance;
};

ValidatingConnectionEventArgs validatingEventArgs = null;
server.ValidatingConnectionAsync += e =>
{
validatingEventArgs = e;
return CompletedTask.Instance;
};


await testEnvironment.ConnectClient(o => o.WithCredentials("TheUser"));

await LongTestDelay();
Expand All @@ -39,6 +47,10 @@ public async Task Fire_Client_Connected_Event()
Assert.IsTrue(eventArgs.RemoteEndPoint.ToString().Contains("127.0.0.1"));
Assert.AreEqual(MqttProtocolVersion.V311, eventArgs.ProtocolVersion);
Assert.AreEqual("TheUser", eventArgs.UserName);

Assert.AreEqual("TheUser", validatingEventArgs.UserName);
Assert.IsFalse(validatingEventArgs.IsSecureConnection);
Assert.IsFalse(validatingEventArgs.IsWebSocketConnection);
}
}

Expand Down
2 changes: 2 additions & 0 deletions Source/MQTTnet/Adapter/IMqttChannelAdapter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ public interface IMqttChannelAdapter : IDisposable

bool IsSecureConnection { get; }

bool IsWebSocketConnection { get; }

MqttPacketFormatterAdapter PacketFormatterAdapter { get; }

Task ConnectAsync(CancellationToken cancellationToken);
Expand Down
2 changes: 2 additions & 0 deletions Source/MQTTnet/Adapter/MqttChannelAdapter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ public MqttChannelAdapter(IMqttChannel channel, MqttPacketFormatterAdapter packe

public bool IsSecureConnection => _channel.IsSecureConnection;

public bool IsWebSocketConnection => _channel.IsWebSocketConnection;

public MqttPacketFormatterAdapter PacketFormatterAdapter { get; }

public MqttPacketInspector PacketInspector { get; set; }
Expand Down
2 changes: 2 additions & 0 deletions Source/MQTTnet/Channel/IMqttChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ public interface IMqttChannel : IDisposable

bool IsSecureConnection { get; }

bool IsWebSocketConnection { get; }

Task ConnectAsync(CancellationToken cancellationToken);

Task DisconnectAsync(CancellationToken cancellationToken);
Expand Down
2 changes: 2 additions & 0 deletions Source/MQTTnet/Implementations/MqttTcpChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ public MqttTcpChannel(Stream stream, EndPoint remoteEndPoint, X509Certificate2 c

public bool IsSecureConnection { get; }

public bool IsWebSocketConnection => false;

public async Task ConnectAsync(CancellationToken cancellationToken)
{
CrossPlatformSocket socket = null;
Expand Down
2 changes: 2 additions & 0 deletions Source/MQTTnet/Implementations/MqttWebSocketChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ public MqttWebSocketChannel(WebSocket webSocket, EndPoint remoteEndPoint, bool i

public bool IsSecureConnection { get; private set; }

public bool IsWebSocketConnection => true;

public async Task ConnectAsync(CancellationToken cancellationToken)
{
var uri = _options.Uri;
Expand Down
Loading