Skip to content

Commit

Permalink
Hunting test flappers (#98)
Browse files Browse the repository at this point in the history
  • Loading branch information
mtmk authored Jul 24, 2023
1 parent c5bc03b commit 7b52837
Show file tree
Hide file tree
Showing 2 changed files with 172 additions and 96 deletions.
235 changes: 152 additions & 83 deletions tests/NATS.Client.Core.Tests/RequestReplyTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -62,121 +62,190 @@ await Assert.ThrowsAsync<OperationCanceledException>(async () =>
[Fact]
public async Task Request_reply_many_test()
{
const int msgs = 10;

await using var server = NatsServer.Start();
await using var nats = server.CreateClientConnection();

var sub = await nats.SubscribeAsync<int>("foo");
var reg = sub.Register(async msg =>
{
// Start-up timeout
if (msg.Data == 2)
for (var i = 0; i < msgs; i++)
{
await Task.Delay(2_000);
await msg.ReplyAsync(msg.Data * 2);
await msg.ReplyAsync(msg.Data * i);
}
// Idle timeout
else if (msg.Data == 3)
{
await msg.ReplyAsync(msg.Data * 2);
await Task.Delay(100);
await msg.ReplyAsync(msg.Data * 3);
await Task.Delay(5_000);
await msg.ReplyAsync(msg.Data * 4);
}
await msg.ReplyAsync<int?>(null); // stop iteration with a sentinel
});

// Overall timeout
else if (msg.Data == 4)
{
await msg.ReplyAsync(msg.Data * 2);
await msg.ReplyAsync(msg.Data * 3);
}
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
const int data = 2;
var results = Enumerable.Range(0, msgs).Select(x => x * data).ToArray();
var count = 0;
await foreach (var msg in nats.RequestManyAsync<int, int?>("foo", data, cancellationToken: cts.Token))
{
Assert.Equal(results[count++], msg.Data);
}

// Sentinel
else
{
await msg.ReplyAsync(msg.Data * 2);
await msg.ReplyAsync(msg.Data * 3);
await msg.ReplyAsync(msg.Data * 4);
await msg.ReplyAsync<int?>(null); // sentinel
}
});
Assert.Equal(results.Length, count);

await sub.DisposeAsync();
await reg;
}

[Fact]
public async Task Request_reply_many_test_overall_timeout()
{
await using var server = NatsServer.Start();
await using var nats = server.CreateClientConnection();

// Sentinel
var sub = await nats.SubscribeAsync<int>("foo");
var reg = sub.Register(async msg =>
{
var results = new[] { 2, 3, 4 };
var count = 0;
await foreach (var msg in nats.RequestManyAsync<int, int?>("foo", 1))
{
Assert.Equal(results[count++], msg.Data);
}
await msg.ReplyAsync(msg.Data * 2);
await msg.ReplyAsync(msg.Data * 3);
});

Assert.Equal(3, count);
var results = new[] { 8, 12 };
var count = 0;
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
var opts = new NatsSubOpts { Timeout = TimeSpan.FromSeconds(4) };
await using var rep =
await nats.RequestSubAsync<int, int>("foo", 4, replyOpts: opts, cancellationToken: cts.Token);
await foreach (var msg in rep.Msgs.ReadAllAsync(cts.Token))
{
Assert.Equal(results[count++], msg.Data);
}

// Max Count
Assert.Equal(2, count);
Assert.Equal(NatsSubEndReason.Timeout, rep.EndReason);

await sub.DisposeAsync();
await reg;
}

[Fact]
public async Task Request_reply_many_test_idle_timeout()
{
await using var server = NatsServer.Start();
await using var nats = server.CreateClientConnection();

var sub = await nats.SubscribeAsync<int>("foo");
var reg = sub.Register(async msg =>
{
var results = new[] { 2, 3, 4 };
var count = 0;
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
var opts = new NatsSubOpts { MaxMsgs = 2 };
await using var rep = await nats.RequestSubAsync<int, int>("foo", 1, replyOpts: opts);
await foreach (var msg in rep.Msgs.ReadAllAsync(cts.Token))
{
Assert.Equal(results[count++], msg.Data);
}
await msg.ReplyAsync(msg.Data * 2);
await Task.Delay(100);
await msg.ReplyAsync(msg.Data * 3);
await Task.Delay(5_000);
await msg.ReplyAsync(msg.Data * 4);
});

Assert.Equal(2, count);
Assert.Equal(NatsSubEndReason.MaxMsgs, rep.EndReason);
var results = new[] { 6, 9 };
var count = 0;
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
var opts = new NatsSubOpts { IdleTimeout = TimeSpan.FromSeconds(4) };
await using var rep =
await nats.RequestSubAsync<int, int>("foo", 3, replyOpts: opts, cancellationToken: cts.Token);
await foreach (var msg in rep.Msgs.ReadAllAsync(cts.Token))
{
Assert.Equal(results[count++], msg.Data);
}

// Start-up Timeout
Assert.Equal(2, count);
Assert.Equal(NatsSubEndReason.IdleTimeout, rep.EndReason);

await sub.DisposeAsync();
await reg;
}

[Fact]
public async Task Request_reply_many_test_start_up_timeout()
{
await using var server = NatsServer.Start();
await using var nats = server.CreateClientConnection();

var sub = await nats.SubscribeAsync<int>("foo");
var reg = sub.Register(async msg =>
{
var count = 0;
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
var opts = new NatsSubOpts { StartUpTimeout = TimeSpan.FromSeconds(1) };
await using var rep = await nats.RequestSubAsync<int, int>("foo", 2, replyOpts: opts);
await foreach (var msg in rep.Msgs.ReadAllAsync(cts.Token))
{
count++;
}
await Task.Delay(2_000);
await msg.ReplyAsync(msg.Data * 2);
});

Assert.Equal(0, count);
Assert.Equal(NatsSubEndReason.StartUpTimeout, rep.EndReason);
var count = 0;
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
var opts = new NatsSubOpts { StartUpTimeout = TimeSpan.FromSeconds(1) };
await using var rep =
await nats.RequestSubAsync<int, int>("foo", 2, replyOpts: opts, cancellationToken: cts.Token);
await foreach (var msg in rep.Msgs.ReadAllAsync(cts.Token))
{
count++;
}

// Idle Timeout
Assert.Equal(0, count);
Assert.Equal(NatsSubEndReason.StartUpTimeout, rep.EndReason);

await sub.DisposeAsync();
await reg;
}

[Fact]
public async Task Request_reply_many_test_max_count()
{
await using var server = NatsServer.Start();
await using var nats = server.CreateClientConnection();

var sub = await nats.SubscribeAsync<int>("foo");
var reg = sub.Register(async msg =>
{
var results = new[] { 6, 9 };
var count = 0;
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
var opts = new NatsSubOpts { IdleTimeout = TimeSpan.FromSeconds(4) };
await using var rep = await nats.RequestSubAsync<int, int>("foo", 3, replyOpts: opts);
await foreach (var msg in rep.Msgs.ReadAllAsync(cts.Token))
{
Assert.Equal(results[count++], msg.Data);
}
await msg.ReplyAsync(msg.Data * 2);
await msg.ReplyAsync(msg.Data * 3);
await msg.ReplyAsync(msg.Data * 4);
await msg.ReplyAsync<int?>(null); // sentinel
});

Assert.Equal(2, count);
Assert.Equal(NatsSubEndReason.IdleTimeout, rep.EndReason);
var results = new[] { 2, 3, 4 };
var count = 0;
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
var opts = new NatsSubOpts { MaxMsgs = 2 };
await using var rep =
await nats.RequestSubAsync<int, int>("foo", 1, replyOpts: opts, cancellationToken: cts.Token);
await foreach (var msg in rep.Msgs.ReadAllAsync(cts.Token))
{
Assert.Equal(results[count++], msg.Data);
}

// Overall Timeout
Assert.Equal(2, count);
Assert.Equal(NatsSubEndReason.MaxMsgs, rep.EndReason);

await sub.DisposeAsync();
await reg;
}

[Fact]
public async Task Request_reply_many_test_sentinel()
{
await using var server = NatsServer.Start();
await using var nats = server.CreateClientConnection();

var sub = await nats.SubscribeAsync<int>("foo");
var reg = sub.Register(async msg =>
{
var results = new[] { 8, 12 };
var count = 0;
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
var opts = new NatsSubOpts { Timeout = TimeSpan.FromSeconds(4) };
await using var rep = await nats.RequestSubAsync<int, int>("foo", 4, replyOpts: opts);
await foreach (var msg in rep.Msgs.ReadAllAsync(cts.Token))
{
Assert.Equal(results[count++], msg.Data);
}
await msg.ReplyAsync(msg.Data * 2);
await msg.ReplyAsync(msg.Data * 3);
await msg.ReplyAsync(msg.Data * 4);
await msg.ReplyAsync<int?>(null); // sentinel
});

Assert.Equal(2, count);
Assert.Equal(NatsSubEndReason.Timeout, rep.EndReason);
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
var results = new[] { 2, 3, 4 };
var count = 0;
await foreach (var msg in nats.RequestManyAsync<int, int?>("foo", 1, cancellationToken: cts.Token))
{
Assert.Equal(results[count++], msg.Data);
}

Assert.Equal(3, count);

await sub.DisposeAsync();
await reg;
}
Expand Down
33 changes: 20 additions & 13 deletions tests/NATS.Client.Core.Tests/SubscriptionTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,11 @@ public class SubscriptionTest
[Fact]
public async Task Subscription_periodic_cleanup_test()
{
await using var server = NatsServer.Start(_output);
var serverOptions = new NatsServerOptionsBuilder()
.Trace()
.UseTransport(TransportType.Tcp)
.Build();
await using var server = NatsServer.Start(_output, serverOptions);
var options = NatsOptions.Default with { SubscriptionCleanUpInterval = TimeSpan.FromSeconds(1) };
var (nats, proxy) = server.CreateProxiedClientConnection(options);

Expand All @@ -18,8 +22,10 @@ async Task Isolator()
var sub = await nats.SubscribeAsync<int>("foo");

await Retry.Until(
"unsubscribed",
() => proxy.ClientFrames.Count(f => f.Message.StartsWith("SUB")) == 1);
reason: "unsubscribed",
condition: () => proxy.ClientFrames.Count(f => f.Message.StartsWith("SUB")) == 1,
retryDelay: TimeSpan.FromSeconds(.5),
timeout: TimeSpan.FromSeconds(20));

// subscription object will be eligible for GC after next statement
Assert.Equal("foo", sub.Subject);
Expand All @@ -30,14 +36,15 @@ await Retry.Until(
GC.Collect();

await Retry.Until(
"unsubscribe message received",
() => proxy.ClientFrames.Count(f => f.Message.StartsWith("UNSUB")) == 1,
() =>
reason: "unsubscribe message received",
condition: () => proxy.ClientFrames.Count(f => f.Message.StartsWith("UNSUB")) >= 1,
action: () =>
{
GC.Collect();
return Task.CompletedTask;
},
retryDelay: TimeSpan.FromSeconds(.5));
retryDelay: TimeSpan.FromSeconds(.5),
timeout: TimeSpan.FromSeconds(20));
}

[Fact]
Expand Down Expand Up @@ -65,14 +72,14 @@ async Task Isolator()

// Publish should trigger UNSUB since NatsSub object should be collected by now.
await Retry.Until(
"unsubscribe message received",
() => proxy.ClientFrames.Count(f => f.Message.StartsWith("UNSUB")) == 1,
async () =>
reason: "unsubscribe message received",
condition: () => proxy.ClientFrames.Count(f => f.Message.StartsWith("UNSUB")) >= 1,
action: async () =>
{
GC.Collect();
await nats.PublishAsync("foo", 1);
},
timeout: TimeSpan.FromSeconds(30),
timeout: TimeSpan.FromSeconds(20),
retryDelay: TimeSpan.FromSeconds(.5));
}

Expand Down Expand Up @@ -132,7 +139,7 @@ public async Task Auto_unsubscribe_test()
// Auto unsubscribe on idle timeout
{
const string subject = "foo3";
var opts = new NatsSubOpts { IdleTimeout = TimeSpan.FromSeconds(2) };
var opts = new NatsSubOpts { IdleTimeout = TimeSpan.FromSeconds(3) };

await using var sub = await nats.SubscribeAsync<int>(subject, opts);

Expand All @@ -141,7 +148,7 @@ public async Task Auto_unsubscribe_test()
await nats.PublishAsync(subject, 2);
await Task.Delay(TimeSpan.FromSeconds(.1));
await nats.PublishAsync(subject, 3);
await Task.Delay(TimeSpan.FromSeconds(2.1));
await Task.Delay(TimeSpan.FromSeconds(5));
await nats.PublishAsync(subject, 100);

var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
Expand Down

0 comments on commit 7b52837

Please sign in to comment.