Skip to content

Commit

Permalink
Increase consume and fetch channel size (#246)
Browse files Browse the repository at this point in the history
Increase consume and fetch channel size to avoid blocking the socket reads
for other operations. I think we have done this change before but somehow
reverted back during merges perhaps.
  • Loading branch information
mtmk authored Nov 27, 2023
1 parent 7f78a1c commit 8ae6b9d
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 8 deletions.
6 changes: 2 additions & 4 deletions src/NATS.Client.JetStream/Internal/NatsJSConsume.cs
Original file line number Diff line number Diff line change
Expand Up @@ -130,13 +130,11 @@ public NatsJSConsume(
Timeout.Infinite,
Timeout.Infinite);

// Keep user channel small to avoid blocking the user code
// when disposed otherwise channel reader will continue delivering messages
// if there are messages queued up already. This channel is used to pass messages
// This channel is used to pass messages
// to the user from the subscription channel (which should be set to a
// sufficiently large value to avoid blocking socket reads in the
// NATS connection).
_userMsgs = Channel.CreateBounded<NatsJSMsg<TMsg>>(1);
_userMsgs = Channel.CreateBounded<NatsJSMsg<TMsg>>(1000);
Msgs = _userMsgs.Reader;

// Capacity as 1 is enough here since it's used for signaling only.
Expand Down
9 changes: 5 additions & 4 deletions src/NATS.Client.JetStream/Internal/NatsJSFetch.cs
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,11 @@ public NatsJSFetch(
_pendingMsgs = _maxMsgs;
_pendingBytes = _maxBytes;

// Keep user channel small to avoid blocking the user code when disposed,
// otherwise channel reader will continue delivering messages even after
// this 'fetch' object being disposed.
_userMsgs = Channel.CreateBounded<NatsJSMsg<TMsg>>(1);
// This channel is used to pass messages
// to the user from the subscription channel (which should be set to a
// sufficiently large value to avoid blocking socket reads in the
// NATS connection).
_userMsgs = Channel.CreateBounded<NatsJSMsg<TMsg>>(1000);
Msgs = _userMsgs.Reader;

if (_debug)
Expand Down
67 changes: 67 additions & 0 deletions tests/NATS.Client.JetStream.Tests/DoubleAckTest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
using NATS.Client.Core.Tests;

namespace NATS.Client.JetStream.Tests;

public class DoubleAckTest
{
[Fact]
public async Task Fetch_should_not_block_socket()
{
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));
await using var server = NatsServer.StartJS();

await using var nats = server.CreateClientConnection();

var js = new NatsJSContext(nats);
await js.CreateStreamAsync("s1", new[] { "s1.*" }, cts.Token);

for (var i = 0; i < 100; i++)
{
var ack = await js.PublishAsync("s1.foo", i, cancellationToken: cts.Token);
ack.EnsureSuccess();
}

// fetch loop
{
var consumer = (NatsJSConsumer) await js.CreateConsumerAsync("s1", "c1", cancellationToken: cts.Token);

Check warning on line 26 in tests/NATS.Client.JetStream.Tests/DoubleAckTest.cs

View workflow job for this annotation

GitHub Actions / dotnet (main)

Operator '(NatsJSConsumer)' should not be followed by whitespace. (https://github.com/DotNetAnalyzers/StyleCopAnalyzers/blob/master/documentation/SA1003.md)

Check warning on line 26 in tests/NATS.Client.JetStream.Tests/DoubleAckTest.cs

View workflow job for this annotation

GitHub Actions / dotnet (main)

Check warning on line 26 in tests/NATS.Client.JetStream.Tests/DoubleAckTest.cs

View workflow job for this annotation

GitHub Actions / dotnet (main)

Operator '(NatsJSConsumer)' should not be followed by whitespace. (https://github.com/DotNetAnalyzers/StyleCopAnalyzers/blob/master/documentation/SA1003.md)

Check warning on line 26 in tests/NATS.Client.JetStream.Tests/DoubleAckTest.cs

View workflow job for this annotation

GitHub Actions / dotnet (main)

Check warning on line 26 in tests/NATS.Client.JetStream.Tests/DoubleAckTest.cs

View workflow job for this annotation

GitHub Actions / dotnet (release/v2.9.23)

Check warning on line 26 in tests/NATS.Client.JetStream.Tests/DoubleAckTest.cs

View workflow job for this annotation

GitHub Actions / dotnet (release/v2.9.23)

Operator '(NatsJSConsumer)' should not be followed by whitespace. (https://github.com/DotNetAnalyzers/StyleCopAnalyzers/blob/master/documentation/SA1003.md)

Check warning on line 26 in tests/NATS.Client.JetStream.Tests/DoubleAckTest.cs

View workflow job for this annotation

GitHub Actions / dotnet (release/v2.9.23)

Operator '(NatsJSConsumer)' should not be followed by whitespace. (https://github.com/DotNetAnalyzers/StyleCopAnalyzers/blob/master/documentation/SA1003.md)

Check warning on line 26 in tests/NATS.Client.JetStream.Tests/DoubleAckTest.cs

View workflow job for this annotation

GitHub Actions / dotnet (release/v2.9.23)

Check warning on line 26 in tests/NATS.Client.JetStream.Tests/DoubleAckTest.cs

View workflow job for this annotation

GitHub Actions / dotnet (latest)

Check warning on line 26 in tests/NATS.Client.JetStream.Tests/DoubleAckTest.cs

View workflow job for this annotation

GitHub Actions / dotnet (latest)

Operator '(NatsJSConsumer)' should not be followed by whitespace. (https://github.com/DotNetAnalyzers/StyleCopAnalyzers/blob/master/documentation/SA1003.md)

Check warning on line 26 in tests/NATS.Client.JetStream.Tests/DoubleAckTest.cs

View workflow job for this annotation

GitHub Actions / dotnet (latest)

Check warning on line 26 in tests/NATS.Client.JetStream.Tests/DoubleAckTest.cs

View workflow job for this annotation

GitHub Actions / dotnet (latest)

Operator '(NatsJSConsumer)' should not be followed by whitespace. (https://github.com/DotNetAnalyzers/StyleCopAnalyzers/blob/master/documentation/SA1003.md)

Check warning on line 26 in tests/NATS.Client.JetStream.Tests/DoubleAckTest.cs

View workflow job for this annotation

GitHub Actions / memory test (latest)

Check warning on line 26 in tests/NATS.Client.JetStream.Tests/DoubleAckTest.cs

View workflow job for this annotation

GitHub Actions / memory test (latest)

Operator '(NatsJSConsumer)' should not be followed by whitespace. (https://github.com/DotNetAnalyzers/StyleCopAnalyzers/blob/master/documentation/SA1003.md)

Check warning on line 26 in tests/NATS.Client.JetStream.Tests/DoubleAckTest.cs

View workflow job for this annotation

GitHub Actions / memory test (latest)

Check warning on line 26 in tests/NATS.Client.JetStream.Tests/DoubleAckTest.cs

View workflow job for this annotation

GitHub Actions / memory test (latest)

Operator '(NatsJSConsumer)' should not be followed by whitespace. (https://github.com/DotNetAnalyzers/StyleCopAnalyzers/blob/master/documentation/SA1003.md)

Check warning on line 26 in tests/NATS.Client.JetStream.Tests/DoubleAckTest.cs

View workflow job for this annotation

GitHub Actions / memory test (release/v2.9.23)

Check warning on line 26 in tests/NATS.Client.JetStream.Tests/DoubleAckTest.cs

View workflow job for this annotation

GitHub Actions / memory test (release/v2.9.23)

Operator '(NatsJSConsumer)' should not be followed by whitespace. (https://github.com/DotNetAnalyzers/StyleCopAnalyzers/blob/master/documentation/SA1003.md)

Check warning on line 26 in tests/NATS.Client.JetStream.Tests/DoubleAckTest.cs

View workflow job for this annotation

GitHub Actions / memory test (release/v2.9.23)

Check warning on line 26 in tests/NATS.Client.JetStream.Tests/DoubleAckTest.cs

View workflow job for this annotation

GitHub Actions / memory test (release/v2.9.23)

Operator '(NatsJSConsumer)' should not be followed by whitespace. (https://github.com/DotNetAnalyzers/StyleCopAnalyzers/blob/master/documentation/SA1003.md)

var fetchOpts = new NatsJSFetchOpts
{
MaxMsgs = 100, Expires = TimeSpan.FromSeconds(5),
};

var count = 0;
await foreach (var msg in consumer.FetchAsync<int>(opts: fetchOpts, cancellationToken: cts.Token))
{
// double ack will use the same TCP stream to wait for the ACK from the server
// fetch must not block the socket so that the ACK can be received
await msg.AckAsync(new AckOpts(DoubleAck: true), cts.Token);
count++;
}

Assert.Equal(100, count);
}

// consume loop
{
var consumer = (NatsJSConsumer) await js.CreateConsumerAsync("s1", "c2", cancellationToken: cts.Token);

Check warning on line 47 in tests/NATS.Client.JetStream.Tests/DoubleAckTest.cs

View workflow job for this annotation

GitHub Actions / dotnet (main)

Operator '(NatsJSConsumer)' should not be followed by whitespace. (https://github.com/DotNetAnalyzers/StyleCopAnalyzers/blob/master/documentation/SA1003.md)

Check warning on line 47 in tests/NATS.Client.JetStream.Tests/DoubleAckTest.cs

View workflow job for this annotation

GitHub Actions / dotnet (main)

Check warning on line 47 in tests/NATS.Client.JetStream.Tests/DoubleAckTest.cs

View workflow job for this annotation

GitHub Actions / dotnet (main)

Operator '(NatsJSConsumer)' should not be followed by whitespace. (https://github.com/DotNetAnalyzers/StyleCopAnalyzers/blob/master/documentation/SA1003.md)

Check warning on line 47 in tests/NATS.Client.JetStream.Tests/DoubleAckTest.cs

View workflow job for this annotation

GitHub Actions / dotnet (main)

Check warning on line 47 in tests/NATS.Client.JetStream.Tests/DoubleAckTest.cs

View workflow job for this annotation

GitHub Actions / dotnet (release/v2.9.23)

Check warning on line 47 in tests/NATS.Client.JetStream.Tests/DoubleAckTest.cs

View workflow job for this annotation

GitHub Actions / dotnet (release/v2.9.23)

Operator '(NatsJSConsumer)' should not be followed by whitespace. (https://github.com/DotNetAnalyzers/StyleCopAnalyzers/blob/master/documentation/SA1003.md)

Check warning on line 47 in tests/NATS.Client.JetStream.Tests/DoubleAckTest.cs

View workflow job for this annotation

GitHub Actions / dotnet (release/v2.9.23)

Operator '(NatsJSConsumer)' should not be followed by whitespace. (https://github.com/DotNetAnalyzers/StyleCopAnalyzers/blob/master/documentation/SA1003.md)

Check warning on line 47 in tests/NATS.Client.JetStream.Tests/DoubleAckTest.cs

View workflow job for this annotation

GitHub Actions / dotnet (release/v2.9.23)

Check warning on line 47 in tests/NATS.Client.JetStream.Tests/DoubleAckTest.cs

View workflow job for this annotation

GitHub Actions / dotnet (latest)

Check warning on line 47 in tests/NATS.Client.JetStream.Tests/DoubleAckTest.cs

View workflow job for this annotation

GitHub Actions / dotnet (latest)

Operator '(NatsJSConsumer)' should not be followed by whitespace. (https://github.com/DotNetAnalyzers/StyleCopAnalyzers/blob/master/documentation/SA1003.md)

Check warning on line 47 in tests/NATS.Client.JetStream.Tests/DoubleAckTest.cs

View workflow job for this annotation

GitHub Actions / dotnet (latest)

Check warning on line 47 in tests/NATS.Client.JetStream.Tests/DoubleAckTest.cs

View workflow job for this annotation

GitHub Actions / dotnet (latest)

Operator '(NatsJSConsumer)' should not be followed by whitespace. (https://github.com/DotNetAnalyzers/StyleCopAnalyzers/blob/master/documentation/SA1003.md)

Check warning on line 47 in tests/NATS.Client.JetStream.Tests/DoubleAckTest.cs

View workflow job for this annotation

GitHub Actions / memory test (latest)

Check warning on line 47 in tests/NATS.Client.JetStream.Tests/DoubleAckTest.cs

View workflow job for this annotation

GitHub Actions / memory test (latest)

Operator '(NatsJSConsumer)' should not be followed by whitespace. (https://github.com/DotNetAnalyzers/StyleCopAnalyzers/blob/master/documentation/SA1003.md)

Check warning on line 47 in tests/NATS.Client.JetStream.Tests/DoubleAckTest.cs

View workflow job for this annotation

GitHub Actions / memory test (latest)

Check warning on line 47 in tests/NATS.Client.JetStream.Tests/DoubleAckTest.cs

View workflow job for this annotation

GitHub Actions / memory test (latest)

Operator '(NatsJSConsumer)' should not be followed by whitespace. (https://github.com/DotNetAnalyzers/StyleCopAnalyzers/blob/master/documentation/SA1003.md)

Check warning on line 47 in tests/NATS.Client.JetStream.Tests/DoubleAckTest.cs

View workflow job for this annotation

GitHub Actions / memory test (release/v2.9.23)

Check warning on line 47 in tests/NATS.Client.JetStream.Tests/DoubleAckTest.cs

View workflow job for this annotation

GitHub Actions / memory test (release/v2.9.23)

Operator '(NatsJSConsumer)' should not be followed by whitespace. (https://github.com/DotNetAnalyzers/StyleCopAnalyzers/blob/master/documentation/SA1003.md)

Check warning on line 47 in tests/NATS.Client.JetStream.Tests/DoubleAckTest.cs

View workflow job for this annotation

GitHub Actions / memory test (release/v2.9.23)

Check warning on line 47 in tests/NATS.Client.JetStream.Tests/DoubleAckTest.cs

View workflow job for this annotation

GitHub Actions / memory test (release/v2.9.23)

Operator '(NatsJSConsumer)' should not be followed by whitespace. (https://github.com/DotNetAnalyzers/StyleCopAnalyzers/blob/master/documentation/SA1003.md)

var opts = new NatsJSConsumeOpts
{
MaxMsgs = 100, Expires = TimeSpan.FromSeconds(5),
};

var count = 0;
await foreach (var msg in consumer.ConsumeAsync<int>(opts: opts, cancellationToken: cts.Token))
{
// double ack will use the same TCP stream to wait for the ACK from the server
// fetch must not block the socket so that the ACK can be received
await msg.AckAsync(new AckOpts(DoubleAck: true), cts.Token);
count++;
}

Assert.Equal(100, count);
}

}

Check warning on line 66 in tests/NATS.Client.JetStream.Tests/DoubleAckTest.cs

View workflow job for this annotation

GitHub Actions / dotnet (main)

Check warning on line 66 in tests/NATS.Client.JetStream.Tests/DoubleAckTest.cs

View workflow job for this annotation

GitHub Actions / dotnet (main)

Check warning on line 66 in tests/NATS.Client.JetStream.Tests/DoubleAckTest.cs

View workflow job for this annotation

GitHub Actions / dotnet (release/v2.9.23)

Check warning on line 66 in tests/NATS.Client.JetStream.Tests/DoubleAckTest.cs

View workflow job for this annotation

GitHub Actions / dotnet (release/v2.9.23)

Check warning on line 66 in tests/NATS.Client.JetStream.Tests/DoubleAckTest.cs

View workflow job for this annotation

GitHub Actions / dotnet (latest)

Check warning on line 66 in tests/NATS.Client.JetStream.Tests/DoubleAckTest.cs

View workflow job for this annotation

GitHub Actions / dotnet (latest)

Check warning on line 66 in tests/NATS.Client.JetStream.Tests/DoubleAckTest.cs

View workflow job for this annotation

GitHub Actions / memory test (latest)

Check warning on line 66 in tests/NATS.Client.JetStream.Tests/DoubleAckTest.cs

View workflow job for this annotation

GitHub Actions / memory test (latest)

Check warning on line 66 in tests/NATS.Client.JetStream.Tests/DoubleAckTest.cs

View workflow job for this annotation

GitHub Actions / memory test (release/v2.9.23)

Check warning on line 66 in tests/NATS.Client.JetStream.Tests/DoubleAckTest.cs

View workflow job for this annotation

GitHub Actions / memory test (release/v2.9.23)

}

0 comments on commit 8ae6b9d

Please sign in to comment.