Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle unobserved tasks exceptions #1871

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .github/workflows/ReleaseNotes.md
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
* [Server] Fixed not working _UpdateRetainedMessageAsync_ public api (#1858, thanks to @kimdiego2098).
* [Client] Added support for custom CA chain validation (#1851, thanks to @rido-min).
* [Client] Added support for custom CA chain validation (#1851, thanks to @rido-min).
* [Client] Fixed handling of unobserved tasks exceptions (#1871).
1 change: 1 addition & 0 deletions MQTTnet.sln.DotSettings
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,7 @@ See the LICENSE file in the project root for more information.</s:String>
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ESettingsUpgrade_002ECSharpPlaceAttributeOnSameLineMigration/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ESettingsUpgrade_002EMigrateBlankLinesAroundFieldToBlankLinesAroundProperty/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ESettingsUpgrade_002EMigrateThisQualifierSettings/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/UserDictionary/Words/=catched/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/UserDictionary/Words/=CONNACK/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/UserDictionary/Words/=PINGREQ/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/UserDictionary/Words/=PINGRESP/@EntryIndexedValue">True</s:Boolean>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,99 +22,6 @@ namespace MQTTnet.Tests.Clients.ManagedMqttClient
[TestClass]
public sealed class ManagedMqttClient_Tests : BaseTestClass
{
[TestMethod]
public async Task Expose_Custom_Connection_Error()
{
using (var testEnvironment = CreateTestEnvironment())
{
var server = await testEnvironment.StartServer();

server.ValidatingConnectionAsync += args =>
{
args.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword;
return CompletedTask.Instance;
};

var managedClient = testEnvironment.Factory.CreateManagedMqttClient();

MqttClientDisconnectedEventArgs disconnectedArgs = null;
managedClient.DisconnectedAsync += args =>
{
disconnectedArgs = args;
return CompletedTask.Instance;
};

var clientOptions = testEnvironment.Factory.CreateManagedMqttClientOptionsBuilder().WithClientOptions(testEnvironment.CreateDefaultClientOptions()).Build();
await managedClient.StartAsync(clientOptions);

await LongTestDelay();

Assert.IsNotNull(disconnectedArgs);
Assert.AreEqual(MqttClientConnectResultCode.BadUserNameOrPassword, disconnectedArgs.ConnectResult.ResultCode);
}
}

[TestMethod]
public async Task Receive_While_Not_Cleanly_Disconnected()
{
using (var testEnvironment = CreateTestEnvironment(MqttProtocolVersion.V500))
{
await testEnvironment.StartServer(o => o.WithPersistentSessions());

var senderClient = await testEnvironment.ConnectClient();

// Prepare managed client.
var managedClient = testEnvironment.Factory.CreateManagedMqttClient();
await managedClient.SubscribeAsync("#");
var receivedMessages = testEnvironment.CreateApplicationMessageHandler(managedClient);

var managedClientOptions = new ManagedMqttClientOptions
{
ClientOptions = testEnvironment.Factory.CreateClientOptionsBuilder()
.WithTcpServer("127.0.0.1", testEnvironment.ServerPort)
.WithClientId(nameof(Receive_While_Not_Cleanly_Disconnected) + "_managed")
.WithCleanSession(false)
.Build()
};

await managedClient.StartAsync(managedClientOptions);
await LongTestDelay();
await LongTestDelay();

// Send test data.
await senderClient.PublishStringAsync("topic1");
await LongTestDelay();
await LongTestDelay();

receivedMessages.AssertReceivedCountEquals(1);

// Stop the managed client but keep session at server (not clean disconnect required).
await managedClient.StopAsync(false);
await LongTestDelay();

// Send new messages in the meantime.
await senderClient.PublishStringAsync("topic2", qualityOfServiceLevel: MqttQualityOfServiceLevel.ExactlyOnce);
await LongTestDelay();

// Start the managed client, it should receive the new message.
await managedClient.StartAsync(managedClientOptions);
await LongTestDelay();

receivedMessages.AssertReceivedCountEquals(2);

// Stop and start again, no new message should be received.
for (var i = 0; i < 3; i++)
{
await managedClient.StopAsync(false);
await LongTestDelay();
await managedClient.StartAsync(managedClientOptions);
await LongTestDelay();
}

receivedMessages.AssertReceivedCountEquals(2);
}
}

[TestMethod]
public async Task Connect_To_Invalid_Server()
{
Expand Down Expand Up @@ -181,6 +88,38 @@ public async Task Drop_New_Messages_On_Full_Queue()
}
}

[TestMethod]
public async Task Expose_Custom_Connection_Error()
{
using (var testEnvironment = CreateTestEnvironment())
{
var server = await testEnvironment.StartServer();

server.ValidatingConnectionAsync += args =>
{
args.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword;
return CompletedTask.Instance;
};

var managedClient = testEnvironment.Factory.CreateManagedMqttClient();

MqttClientDisconnectedEventArgs disconnectedArgs = null;
managedClient.DisconnectedAsync += args =>
{
disconnectedArgs = args;
return CompletedTask.Instance;
};

var clientOptions = testEnvironment.Factory.CreateManagedMqttClientOptionsBuilder().WithClientOptions(testEnvironment.CreateDefaultClientOptions()).Build();
await managedClient.StartAsync(clientOptions);

await LongTestDelay();

Assert.IsNotNull(disconnectedArgs);
Assert.AreEqual(MqttClientConnectResultCode.BadUserNameOrPassword, disconnectedArgs.ConnectResult.ResultCode);
}
}

[TestMethod]
public async Task ManagedClients_Will_Message_Send()
{
Expand Down Expand Up @@ -224,6 +163,67 @@ public async Task ManagedClients_Will_Message_Send()
}
}

[TestMethod]
public async Task Receive_While_Not_Cleanly_Disconnected()
{
using (var testEnvironment = CreateTestEnvironment(MqttProtocolVersion.V500))
{
await testEnvironment.StartServer(o => o.WithPersistentSessions());

var senderClient = await testEnvironment.ConnectClient();

// Prepare managed client.
var managedClient = testEnvironment.Factory.CreateManagedMqttClient();
await managedClient.SubscribeAsync("#");
var receivedMessages = testEnvironment.CreateApplicationMessageHandler(managedClient);

var managedClientOptions = new ManagedMqttClientOptions
{
ClientOptions = testEnvironment.Factory.CreateClientOptionsBuilder()
.WithTcpServer("127.0.0.1", testEnvironment.ServerPort)
.WithClientId(nameof(Receive_While_Not_Cleanly_Disconnected) + "_managed")
.WithCleanSession(false)
.Build()
};

await managedClient.StartAsync(managedClientOptions);
await LongTestDelay();
await LongTestDelay();

// Send test data.
await senderClient.PublishStringAsync("topic1");
await LongTestDelay();
await LongTestDelay();

receivedMessages.AssertReceivedCountEquals(1);

// Stop the managed client but keep session at server (not clean disconnect required).
await managedClient.StopAsync(false);
await LongTestDelay();

// Send new messages in the meantime.
await senderClient.PublishStringAsync("topic2", qualityOfServiceLevel: MqttQualityOfServiceLevel.ExactlyOnce);
await LongTestDelay();

// Start the managed client, it should receive the new message.
await managedClient.StartAsync(managedClientOptions);
await LongTestDelay();

receivedMessages.AssertReceivedCountEquals(2);

// Stop and start again, no new message should be received.
for (var i = 0; i < 3; i++)
{
await managedClient.StopAsync(false);
await LongTestDelay();
await managedClient.StartAsync(managedClientOptions);
await LongTestDelay();
}

receivedMessages.AssertReceivedCountEquals(2);
}
}

[TestMethod]
public async Task Start_Stop()
{
Expand Down Expand Up @@ -375,7 +375,7 @@ public async Task Subscriptions_Are_Cleared_At_Logout()
var clientOptions = new MqttClientOptionsBuilder().WithTcpServer("127.0.0.1", testEnvironment.ServerPort);

var receivedManagedMessages = new List<MqttApplicationMessage>();

var managedClient = testEnvironment.Factory.CreateManagedMqttClient(testEnvironment.CreateClient());
managedClient.ApplicationMessageReceivedAsync += e =>
{
Expand Down Expand Up @@ -403,7 +403,7 @@ public async Task Subscriptions_Are_Cleared_At_Logout()
// Make sure that it gets received after subscribing again.
await managedClient.SubscribeAsync("topic");
await LongTestDelay();

Assert.AreEqual(2, receivedManagedMessages.Count);
}
}
Expand All @@ -421,7 +421,7 @@ public async Task Subscriptions_Are_Published_Immediately()
var receivingClient = await CreateManagedClientAsync(testEnvironment, null, connectionCheckInterval);
var sendingClient = await testEnvironment.ConnectClient();

await sendingClient.PublishAsync(new MqttApplicationMessage { Topic = "topic", PayloadSegment = new ArraySegment<byte>( new byte[] { 1 }), Retain = true });
await sendingClient.PublishAsync(new MqttApplicationMessage { Topic = "topic", PayloadSegment = new ArraySegment<byte>(new byte[] { 1 }), Retain = true });

var subscribeTime = DateTime.UtcNow;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,38 @@ public async Task Disconnect_Clean_With_User_Properties()
}
}

[TestMethod]
public async Task No_Unobserved_Exception()
{
using (var testEnvironment = CreateTestEnvironment())
{
testEnvironment.IgnoreClientLogErrors = true;

var client = testEnvironment.CreateClient();
var options = new MqttClientOptionsBuilder().WithTcpServer("127.0.0.1").WithTimeout(TimeSpan.FromSeconds(2)).Build();

try
{
using (var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(0.5)))
{
await client.ConnectAsync(options, timeout.Token);
}
}
catch (OperationCanceledException)
{
}

client.Dispose();

// These delays and GC calls are required in order to make calling the finalizer reproducible.
GC.Collect();
GC.WaitForPendingFinalizers();
await LongTestDelay();
await LongTestDelay();
await LongTestDelay();
}
}

[TestMethod]
public async Task Return_Non_Success()
{
Expand Down
4 changes: 2 additions & 2 deletions Source/MQTTnet.Tests/Extensions/Rpc_Tests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public async Task Execute_Success_Parameters_Propagated_Correctly()
var paramValue = "123";
var parameters = new Dictionary<string, object>
{
{ TestParametersTopicGenerationStrategy.ExpectedParamName, "123" },
{ TestParametersTopicGenerationStrategy.ExpectedParamName, "123" }
};

using (var testEnvironment = CreateTestEnvironment())
Expand Down Expand Up @@ -164,7 +164,7 @@ public async Task Execute_Timeout_MQTT_V5_Mixed_Clients()

using (var rpcClient = new MqttRpcClient(requestSender, new MqttRpcClientOptionsBuilder().Build()))
{
var response = await rpcClient.ExecuteAsync(TimeSpan.FromSeconds(2), "ping", "", MqttQualityOfServiceLevel.AtMostOnce);
await rpcClient.ExecuteAsync(TimeSpan.FromSeconds(2), "ping", "", MqttQualityOfServiceLevel.AtMostOnce);
}
}
}
Expand Down
Loading
Loading