From 7b5283758ae9e5936aada1e437ef0a78a0335dbd Mon Sep 17 00:00:00 2001 From: Ziya Suzen Date: Mon, 24 Jul 2023 16:31:07 +0100 Subject: [PATCH] Hunting test flappers (#98) --- .../RequestReplyTest.cs | 235 +++++++++++------- .../SubscriptionTest.cs | 33 ++- 2 files changed, 172 insertions(+), 96 deletions(-) diff --git a/tests/NATS.Client.Core.Tests/RequestReplyTest.cs b/tests/NATS.Client.Core.Tests/RequestReplyTest.cs index c63fee2f3..8cba50fcb 100644 --- a/tests/NATS.Client.Core.Tests/RequestReplyTest.cs +++ b/tests/NATS.Client.Core.Tests/RequestReplyTest.cs @@ -62,121 +62,190 @@ await Assert.ThrowsAsync(async () => [Fact] public async Task Request_reply_many_test() { + const int msgs = 10; + await using var server = NatsServer.Start(); await using var nats = server.CreateClientConnection(); var sub = await nats.SubscribeAsync("foo"); var reg = sub.Register(async msg => { - // Start-up timeout - if (msg.Data == 2) + for (var i = 0; i < msgs; i++) { - await Task.Delay(2_000); - await msg.ReplyAsync(msg.Data * 2); + await msg.ReplyAsync(msg.Data * i); } - // Idle timeout - else if (msg.Data == 3) - { - await msg.ReplyAsync(msg.Data * 2); - await Task.Delay(100); - await msg.ReplyAsync(msg.Data * 3); - await Task.Delay(5_000); - await msg.ReplyAsync(msg.Data * 4); - } + await msg.ReplyAsync(null); // stop iteration with a sentinel + }); - // Overall timeout - else if (msg.Data == 4) - { - await msg.ReplyAsync(msg.Data * 2); - await msg.ReplyAsync(msg.Data * 3); - } + var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + const int data = 2; + var results = Enumerable.Range(0, msgs).Select(x => x * data).ToArray(); + var count = 0; + await foreach (var msg in nats.RequestManyAsync("foo", data, cancellationToken: cts.Token)) + { + Assert.Equal(results[count++], msg.Data); + } - // Sentinel - else - { - await msg.ReplyAsync(msg.Data * 2); - await msg.ReplyAsync(msg.Data * 3); - await msg.ReplyAsync(msg.Data * 4); - await msg.ReplyAsync(null); // sentinel - } - }); + Assert.Equal(results.Length, count); + + await sub.DisposeAsync(); + await reg; + } + + [Fact] + public async Task Request_reply_many_test_overall_timeout() + { + await using var server = NatsServer.Start(); + await using var nats = server.CreateClientConnection(); - // Sentinel + var sub = await nats.SubscribeAsync("foo"); + var reg = sub.Register(async msg => { - var results = new[] { 2, 3, 4 }; - var count = 0; - await foreach (var msg in nats.RequestManyAsync("foo", 1)) - { - Assert.Equal(results[count++], msg.Data); - } + await msg.ReplyAsync(msg.Data * 2); + await msg.ReplyAsync(msg.Data * 3); + }); - Assert.Equal(3, count); + var results = new[] { 8, 12 }; + var count = 0; + var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + var opts = new NatsSubOpts { Timeout = TimeSpan.FromSeconds(4) }; + await using var rep = + await nats.RequestSubAsync("foo", 4, replyOpts: opts, cancellationToken: cts.Token); + await foreach (var msg in rep.Msgs.ReadAllAsync(cts.Token)) + { + Assert.Equal(results[count++], msg.Data); } - // Max Count + Assert.Equal(2, count); + Assert.Equal(NatsSubEndReason.Timeout, rep.EndReason); + + await sub.DisposeAsync(); + await reg; + } + + [Fact] + public async Task Request_reply_many_test_idle_timeout() + { + await using var server = NatsServer.Start(); + await using var nats = server.CreateClientConnection(); + + var sub = await nats.SubscribeAsync("foo"); + var reg = sub.Register(async msg => { - var results = new[] { 2, 3, 4 }; - var count = 0; - var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); - var opts = new NatsSubOpts { MaxMsgs = 2 }; - await using var rep = await nats.RequestSubAsync("foo", 1, replyOpts: opts); - await foreach (var msg in rep.Msgs.ReadAllAsync(cts.Token)) - { - Assert.Equal(results[count++], msg.Data); - } + await msg.ReplyAsync(msg.Data * 2); + await Task.Delay(100); + await msg.ReplyAsync(msg.Data * 3); + await Task.Delay(5_000); + await msg.ReplyAsync(msg.Data * 4); + }); - Assert.Equal(2, count); - Assert.Equal(NatsSubEndReason.MaxMsgs, rep.EndReason); + var results = new[] { 6, 9 }; + var count = 0; + var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + var opts = new NatsSubOpts { IdleTimeout = TimeSpan.FromSeconds(4) }; + await using var rep = + await nats.RequestSubAsync("foo", 3, replyOpts: opts, cancellationToken: cts.Token); + await foreach (var msg in rep.Msgs.ReadAllAsync(cts.Token)) + { + Assert.Equal(results[count++], msg.Data); } - // Start-up Timeout + Assert.Equal(2, count); + Assert.Equal(NatsSubEndReason.IdleTimeout, rep.EndReason); + + await sub.DisposeAsync(); + await reg; + } + + [Fact] + public async Task Request_reply_many_test_start_up_timeout() + { + await using var server = NatsServer.Start(); + await using var nats = server.CreateClientConnection(); + + var sub = await nats.SubscribeAsync("foo"); + var reg = sub.Register(async msg => { - var count = 0; - var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); - var opts = new NatsSubOpts { StartUpTimeout = TimeSpan.FromSeconds(1) }; - await using var rep = await nats.RequestSubAsync("foo", 2, replyOpts: opts); - await foreach (var msg in rep.Msgs.ReadAllAsync(cts.Token)) - { - count++; - } + await Task.Delay(2_000); + await msg.ReplyAsync(msg.Data * 2); + }); - Assert.Equal(0, count); - Assert.Equal(NatsSubEndReason.StartUpTimeout, rep.EndReason); + var count = 0; + var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + var opts = new NatsSubOpts { StartUpTimeout = TimeSpan.FromSeconds(1) }; + await using var rep = + await nats.RequestSubAsync("foo", 2, replyOpts: opts, cancellationToken: cts.Token); + await foreach (var msg in rep.Msgs.ReadAllAsync(cts.Token)) + { + count++; } - // Idle Timeout + Assert.Equal(0, count); + Assert.Equal(NatsSubEndReason.StartUpTimeout, rep.EndReason); + + await sub.DisposeAsync(); + await reg; + } + + [Fact] + public async Task Request_reply_many_test_max_count() + { + await using var server = NatsServer.Start(); + await using var nats = server.CreateClientConnection(); + + var sub = await nats.SubscribeAsync("foo"); + var reg = sub.Register(async msg => { - var results = new[] { 6, 9 }; - var count = 0; - var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); - var opts = new NatsSubOpts { IdleTimeout = TimeSpan.FromSeconds(4) }; - await using var rep = await nats.RequestSubAsync("foo", 3, replyOpts: opts); - await foreach (var msg in rep.Msgs.ReadAllAsync(cts.Token)) - { - Assert.Equal(results[count++], msg.Data); - } + await msg.ReplyAsync(msg.Data * 2); + await msg.ReplyAsync(msg.Data * 3); + await msg.ReplyAsync(msg.Data * 4); + await msg.ReplyAsync(null); // sentinel + }); - Assert.Equal(2, count); - Assert.Equal(NatsSubEndReason.IdleTimeout, rep.EndReason); + var results = new[] { 2, 3, 4 }; + var count = 0; + var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + var opts = new NatsSubOpts { MaxMsgs = 2 }; + await using var rep = + await nats.RequestSubAsync("foo", 1, replyOpts: opts, cancellationToken: cts.Token); + await foreach (var msg in rep.Msgs.ReadAllAsync(cts.Token)) + { + Assert.Equal(results[count++], msg.Data); } - // Overall Timeout + Assert.Equal(2, count); + Assert.Equal(NatsSubEndReason.MaxMsgs, rep.EndReason); + + await sub.DisposeAsync(); + await reg; + } + + [Fact] + public async Task Request_reply_many_test_sentinel() + { + await using var server = NatsServer.Start(); + await using var nats = server.CreateClientConnection(); + + var sub = await nats.SubscribeAsync("foo"); + var reg = sub.Register(async msg => { - var results = new[] { 8, 12 }; - var count = 0; - var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); - var opts = new NatsSubOpts { Timeout = TimeSpan.FromSeconds(4) }; - await using var rep = await nats.RequestSubAsync("foo", 4, replyOpts: opts); - await foreach (var msg in rep.Msgs.ReadAllAsync(cts.Token)) - { - Assert.Equal(results[count++], msg.Data); - } + await msg.ReplyAsync(msg.Data * 2); + await msg.ReplyAsync(msg.Data * 3); + await msg.ReplyAsync(msg.Data * 4); + await msg.ReplyAsync(null); // sentinel + }); - Assert.Equal(2, count); - Assert.Equal(NatsSubEndReason.Timeout, rep.EndReason); + var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + var results = new[] { 2, 3, 4 }; + var count = 0; + await foreach (var msg in nats.RequestManyAsync("foo", 1, cancellationToken: cts.Token)) + { + Assert.Equal(results[count++], msg.Data); } + Assert.Equal(3, count); + await sub.DisposeAsync(); await reg; } diff --git a/tests/NATS.Client.Core.Tests/SubscriptionTest.cs b/tests/NATS.Client.Core.Tests/SubscriptionTest.cs index f1a451828..bbababd89 100644 --- a/tests/NATS.Client.Core.Tests/SubscriptionTest.cs +++ b/tests/NATS.Client.Core.Tests/SubscriptionTest.cs @@ -9,7 +9,11 @@ public class SubscriptionTest [Fact] public async Task Subscription_periodic_cleanup_test() { - await using var server = NatsServer.Start(_output); + var serverOptions = new NatsServerOptionsBuilder() + .Trace() + .UseTransport(TransportType.Tcp) + .Build(); + await using var server = NatsServer.Start(_output, serverOptions); var options = NatsOptions.Default with { SubscriptionCleanUpInterval = TimeSpan.FromSeconds(1) }; var (nats, proxy) = server.CreateProxiedClientConnection(options); @@ -18,8 +22,10 @@ async Task Isolator() var sub = await nats.SubscribeAsync("foo"); await Retry.Until( - "unsubscribed", - () => proxy.ClientFrames.Count(f => f.Message.StartsWith("SUB")) == 1); + reason: "unsubscribed", + condition: () => proxy.ClientFrames.Count(f => f.Message.StartsWith("SUB")) == 1, + retryDelay: TimeSpan.FromSeconds(.5), + timeout: TimeSpan.FromSeconds(20)); // subscription object will be eligible for GC after next statement Assert.Equal("foo", sub.Subject); @@ -30,14 +36,15 @@ await Retry.Until( GC.Collect(); await Retry.Until( - "unsubscribe message received", - () => proxy.ClientFrames.Count(f => f.Message.StartsWith("UNSUB")) == 1, - () => + reason: "unsubscribe message received", + condition: () => proxy.ClientFrames.Count(f => f.Message.StartsWith("UNSUB")) >= 1, + action: () => { GC.Collect(); return Task.CompletedTask; }, - retryDelay: TimeSpan.FromSeconds(.5)); + retryDelay: TimeSpan.FromSeconds(.5), + timeout: TimeSpan.FromSeconds(20)); } [Fact] @@ -65,14 +72,14 @@ async Task Isolator() // Publish should trigger UNSUB since NatsSub object should be collected by now. await Retry.Until( - "unsubscribe message received", - () => proxy.ClientFrames.Count(f => f.Message.StartsWith("UNSUB")) == 1, - async () => + reason: "unsubscribe message received", + condition: () => proxy.ClientFrames.Count(f => f.Message.StartsWith("UNSUB")) >= 1, + action: async () => { GC.Collect(); await nats.PublishAsync("foo", 1); }, - timeout: TimeSpan.FromSeconds(30), + timeout: TimeSpan.FromSeconds(20), retryDelay: TimeSpan.FromSeconds(.5)); } @@ -132,7 +139,7 @@ public async Task Auto_unsubscribe_test() // Auto unsubscribe on idle timeout { const string subject = "foo3"; - var opts = new NatsSubOpts { IdleTimeout = TimeSpan.FromSeconds(2) }; + var opts = new NatsSubOpts { IdleTimeout = TimeSpan.FromSeconds(3) }; await using var sub = await nats.SubscribeAsync(subject, opts); @@ -141,7 +148,7 @@ public async Task Auto_unsubscribe_test() await nats.PublishAsync(subject, 2); await Task.Delay(TimeSpan.FromSeconds(.1)); await nats.PublishAsync(subject, 3); - await Task.Delay(TimeSpan.FromSeconds(2.1)); + await Task.Delay(TimeSpan.FromSeconds(5)); await nats.PublishAsync(subject, 100); var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));