Skip to content

Commit

Permalink
Handle unobserved tasks exceptions (#1871)
Browse files Browse the repository at this point in the history
* Access exception so that it will not treated as unhandled

* Refactor code

* Improve unit tests

* Fix null reference exception

* Update ReleaseNotes.md
  • Loading branch information
chkr1011 authored Nov 4, 2023
1 parent 2599b3d commit ec637f7
Show file tree
Hide file tree
Showing 10 changed files with 289 additions and 206 deletions.
3 changes: 2 additions & 1 deletion .github/workflows/ReleaseNotes.md
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
* [Server] Fixed not working _UpdateRetainedMessageAsync_ public api (#1858, thanks to @kimdiego2098).
* [Client] Added support for custom CA chain validation (#1851, thanks to @rido-min).
* [Client] Added support for custom CA chain validation (#1851, thanks to @rido-min).
* [Client] Fixed handling of unobserved tasks exceptions (#1871).
1 change: 1 addition & 0 deletions MQTTnet.sln.DotSettings
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,7 @@ See the LICENSE file in the project root for more information.</s:String>
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ESettingsUpgrade_002ECSharpPlaceAttributeOnSameLineMigration/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ESettingsUpgrade_002EMigrateBlankLinesAroundFieldToBlankLinesAroundProperty/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ESettingsUpgrade_002EMigrateThisQualifierSettings/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/UserDictionary/Words/=catched/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/UserDictionary/Words/=CONNACK/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/UserDictionary/Words/=PINGREQ/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/UserDictionary/Words/=PINGRESP/@EntryIndexedValue">True</s:Boolean>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,99 +22,6 @@ namespace MQTTnet.Tests.Clients.ManagedMqttClient
[TestClass]
public sealed class ManagedMqttClient_Tests : BaseTestClass
{
[TestMethod]
public async Task Expose_Custom_Connection_Error()
{
using (var testEnvironment = CreateTestEnvironment())
{
var server = await testEnvironment.StartServer();

server.ValidatingConnectionAsync += args =>
{
args.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword;
return CompletedTask.Instance;
};

var managedClient = testEnvironment.Factory.CreateManagedMqttClient();

MqttClientDisconnectedEventArgs disconnectedArgs = null;
managedClient.DisconnectedAsync += args =>
{
disconnectedArgs = args;
return CompletedTask.Instance;
};

var clientOptions = testEnvironment.Factory.CreateManagedMqttClientOptionsBuilder().WithClientOptions(testEnvironment.CreateDefaultClientOptions()).Build();
await managedClient.StartAsync(clientOptions);

await LongTestDelay();

Assert.IsNotNull(disconnectedArgs);
Assert.AreEqual(MqttClientConnectResultCode.BadUserNameOrPassword, disconnectedArgs.ConnectResult.ResultCode);
}
}

[TestMethod]
public async Task Receive_While_Not_Cleanly_Disconnected()
{
using (var testEnvironment = CreateTestEnvironment(MqttProtocolVersion.V500))
{
await testEnvironment.StartServer(o => o.WithPersistentSessions());

var senderClient = await testEnvironment.ConnectClient();

// Prepare managed client.
var managedClient = testEnvironment.Factory.CreateManagedMqttClient();
await managedClient.SubscribeAsync("#");
var receivedMessages = testEnvironment.CreateApplicationMessageHandler(managedClient);

var managedClientOptions = new ManagedMqttClientOptions
{
ClientOptions = testEnvironment.Factory.CreateClientOptionsBuilder()
.WithTcpServer("127.0.0.1", testEnvironment.ServerPort)
.WithClientId(nameof(Receive_While_Not_Cleanly_Disconnected) + "_managed")
.WithCleanSession(false)
.Build()
};

await managedClient.StartAsync(managedClientOptions);
await LongTestDelay();
await LongTestDelay();

// Send test data.
await senderClient.PublishStringAsync("topic1");
await LongTestDelay();
await LongTestDelay();

receivedMessages.AssertReceivedCountEquals(1);

// Stop the managed client but keep session at server (not clean disconnect required).
await managedClient.StopAsync(false);
await LongTestDelay();

// Send new messages in the meantime.
await senderClient.PublishStringAsync("topic2", qualityOfServiceLevel: MqttQualityOfServiceLevel.ExactlyOnce);
await LongTestDelay();

// Start the managed client, it should receive the new message.
await managedClient.StartAsync(managedClientOptions);
await LongTestDelay();

receivedMessages.AssertReceivedCountEquals(2);

// Stop and start again, no new message should be received.
for (var i = 0; i < 3; i++)
{
await managedClient.StopAsync(false);
await LongTestDelay();
await managedClient.StartAsync(managedClientOptions);
await LongTestDelay();
}

receivedMessages.AssertReceivedCountEquals(2);
}
}

[TestMethod]
public async Task Connect_To_Invalid_Server()
{
Expand Down Expand Up @@ -181,6 +88,38 @@ public async Task Drop_New_Messages_On_Full_Queue()
}
}

[TestMethod]
public async Task Expose_Custom_Connection_Error()
{
using (var testEnvironment = CreateTestEnvironment())
{
var server = await testEnvironment.StartServer();

server.ValidatingConnectionAsync += args =>
{
args.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword;
return CompletedTask.Instance;
};

var managedClient = testEnvironment.Factory.CreateManagedMqttClient();

MqttClientDisconnectedEventArgs disconnectedArgs = null;
managedClient.DisconnectedAsync += args =>
{
disconnectedArgs = args;
return CompletedTask.Instance;
};

var clientOptions = testEnvironment.Factory.CreateManagedMqttClientOptionsBuilder().WithClientOptions(testEnvironment.CreateDefaultClientOptions()).Build();
await managedClient.StartAsync(clientOptions);

await LongTestDelay();

Assert.IsNotNull(disconnectedArgs);
Assert.AreEqual(MqttClientConnectResultCode.BadUserNameOrPassword, disconnectedArgs.ConnectResult.ResultCode);
}
}

[TestMethod]
public async Task ManagedClients_Will_Message_Send()
{
Expand Down Expand Up @@ -224,6 +163,67 @@ public async Task ManagedClients_Will_Message_Send()
}
}

[TestMethod]
public async Task Receive_While_Not_Cleanly_Disconnected()
{
using (var testEnvironment = CreateTestEnvironment(MqttProtocolVersion.V500))
{
await testEnvironment.StartServer(o => o.WithPersistentSessions());

var senderClient = await testEnvironment.ConnectClient();

// Prepare managed client.
var managedClient = testEnvironment.Factory.CreateManagedMqttClient();
await managedClient.SubscribeAsync("#");
var receivedMessages = testEnvironment.CreateApplicationMessageHandler(managedClient);

var managedClientOptions = new ManagedMqttClientOptions
{
ClientOptions = testEnvironment.Factory.CreateClientOptionsBuilder()
.WithTcpServer("127.0.0.1", testEnvironment.ServerPort)
.WithClientId(nameof(Receive_While_Not_Cleanly_Disconnected) + "_managed")
.WithCleanSession(false)
.Build()
};

await managedClient.StartAsync(managedClientOptions);
await LongTestDelay();
await LongTestDelay();

// Send test data.
await senderClient.PublishStringAsync("topic1");
await LongTestDelay();
await LongTestDelay();

receivedMessages.AssertReceivedCountEquals(1);

// Stop the managed client but keep session at server (not clean disconnect required).
await managedClient.StopAsync(false);
await LongTestDelay();

// Send new messages in the meantime.
await senderClient.PublishStringAsync("topic2", qualityOfServiceLevel: MqttQualityOfServiceLevel.ExactlyOnce);
await LongTestDelay();

// Start the managed client, it should receive the new message.
await managedClient.StartAsync(managedClientOptions);
await LongTestDelay();

receivedMessages.AssertReceivedCountEquals(2);

// Stop and start again, no new message should be received.
for (var i = 0; i < 3; i++)
{
await managedClient.StopAsync(false);
await LongTestDelay();
await managedClient.StartAsync(managedClientOptions);
await LongTestDelay();
}

receivedMessages.AssertReceivedCountEquals(2);
}
}

[TestMethod]
public async Task Start_Stop()
{
Expand Down Expand Up @@ -375,7 +375,7 @@ public async Task Subscriptions_Are_Cleared_At_Logout()
var clientOptions = new MqttClientOptionsBuilder().WithTcpServer("127.0.0.1", testEnvironment.ServerPort);

var receivedManagedMessages = new List<MqttApplicationMessage>();

var managedClient = testEnvironment.Factory.CreateManagedMqttClient(testEnvironment.CreateClient());
managedClient.ApplicationMessageReceivedAsync += e =>
{
Expand Down Expand Up @@ -403,7 +403,7 @@ public async Task Subscriptions_Are_Cleared_At_Logout()
// Make sure that it gets received after subscribing again.
await managedClient.SubscribeAsync("topic");
await LongTestDelay();

Assert.AreEqual(2, receivedManagedMessages.Count);
}
}
Expand All @@ -421,7 +421,7 @@ public async Task Subscriptions_Are_Published_Immediately()
var receivingClient = await CreateManagedClientAsync(testEnvironment, null, connectionCheckInterval);
var sendingClient = await testEnvironment.ConnectClient();

await sendingClient.PublishAsync(new MqttApplicationMessage { Topic = "topic", PayloadSegment = new ArraySegment<byte>( new byte[] { 1 }), Retain = true });
await sendingClient.PublishAsync(new MqttApplicationMessage { Topic = "topic", PayloadSegment = new ArraySegment<byte>(new byte[] { 1 }), Retain = true });

var subscribeTime = DateTime.UtcNow;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,38 @@ public async Task Disconnect_Clean_With_User_Properties()
}
}

[TestMethod]
public async Task No_Unobserved_Exception()
{
using (var testEnvironment = CreateTestEnvironment())
{
testEnvironment.IgnoreClientLogErrors = true;

var client = testEnvironment.CreateClient();
var options = new MqttClientOptionsBuilder().WithTcpServer("127.0.0.1").WithTimeout(TimeSpan.FromSeconds(2)).Build();

try
{
using (var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(0.5)))
{
await client.ConnectAsync(options, timeout.Token);
}
}
catch (OperationCanceledException)
{
}

client.Dispose();

// These delays and GC calls are required in order to make calling the finalizer reproducible.
GC.Collect();
GC.WaitForPendingFinalizers();
await LongTestDelay();
await LongTestDelay();
await LongTestDelay();
}
}

[TestMethod]
public async Task Return_Non_Success()
{
Expand Down
4 changes: 2 additions & 2 deletions Source/MQTTnet.Tests/Extensions/Rpc_Tests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public async Task Execute_Success_Parameters_Propagated_Correctly()
var paramValue = "123";
var parameters = new Dictionary<string, object>
{
{ TestParametersTopicGenerationStrategy.ExpectedParamName, "123" },
{ TestParametersTopicGenerationStrategy.ExpectedParamName, "123" }
};

using (var testEnvironment = CreateTestEnvironment())
Expand Down Expand Up @@ -164,7 +164,7 @@ public async Task Execute_Timeout_MQTT_V5_Mixed_Clients()

using (var rpcClient = new MqttRpcClient(requestSender, new MqttRpcClientOptionsBuilder().Build()))
{
var response = await rpcClient.ExecuteAsync(TimeSpan.FromSeconds(2), "ping", "", MqttQualityOfServiceLevel.AtMostOnce);
await rpcClient.ExecuteAsync(TimeSpan.FromSeconds(2), "ping", "", MqttQualityOfServiceLevel.AtMostOnce);
}
}
}
Expand Down
Loading

0 comments on commit ec637f7

Please sign in to comment.