Skip to content

Commit

Permalink
Reconnect backoff (#211)
Browse files Browse the repository at this point in the history
* Reconnect backoff

To help minimize the impact on the servers during failures.

* Exponential backoff

* Backoff negative check
  • Loading branch information
mtmk authored Nov 16, 2023
1 parent 22c314a commit c6ffcce
Show file tree
Hide file tree
Showing 3 changed files with 179 additions and 82 deletions.
45 changes: 43 additions & 2 deletions src/NATS.Client.Core/NatsConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ public partial class NatsConnection : IAsyncDisposable, INatsConnection
private TlsCerts? _tlsCerts;
private ClientOpts _clientOpts;
private UserCredentials? _userCredentials;
private int _connectRetry;
private TimeSpan _backoff = TimeSpan.Zero;

public NatsConnection()
: this(NatsOpts.Default)
Expand Down Expand Up @@ -432,7 +434,7 @@ private async void ReconnectLoop()
{
ConnectionState = NatsConnectionState.Reconnecting;
_waitForOpenConnection.TrySetCanceled();
_waitForOpenConnection = new TaskCompletionSource();
_waitForOpenConnection = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
_pingTimerCancellationTokenSource?.Cancel();
}

Expand Down Expand Up @@ -522,6 +524,8 @@ private async void ReconnectLoop()

lock (_gate)
{
_connectRetry = 0;
_backoff = TimeSpan.Zero;
_logger.LogInformation("Connect succeed {0}, NATS {1}", _name, url);
ConnectionState = NatsConnectionState.Open;
_pingTimerCancellationTokenSource = new CancellationTokenSource();
Expand All @@ -535,6 +539,7 @@ private async void ReconnectLoop()
{
if (ex is OperationCanceledException)
return;
_waitForOpenConnection.TrySetException(ex);
_logger.LogError(ex, "Unknown error, loop stopped and connection is invalid state.");
}
}
Expand All @@ -561,8 +566,44 @@ private NatsUri FixTlsHost(NatsUri uri)

private async Task WaitWithJitterAsync()
{
int retry;
TimeSpan backoff;
lock (_gate)
{
retry = _connectRetry++;

if (Opts.ReconnectWaitMin >= Opts.ReconnectWaitMax)
{
_backoff = Opts.ReconnectWaitMin;
}
else if (_backoff == TimeSpan.Zero)
{
_backoff = Opts.ReconnectWaitMin;
}
else if (_backoff == Opts.ReconnectWaitMax)
{
}
else
{
_backoff *= 2;
if (_backoff > Opts.ReconnectWaitMax)
{
_backoff = Opts.ReconnectWaitMax;
}
else if (_backoff <= TimeSpan.Zero)
{
_backoff = TimeSpan.FromSeconds(1);
}
}

backoff = _backoff;
}

if (Opts.MaxReconnectRetry > 0 && retry > Opts.MaxReconnectRetry)
throw new NatsException("Max connect retry exceeded.");

var jitter = Random.Shared.NextDouble() * Opts.ReconnectJitter.TotalMilliseconds;
var waitTime = Opts.ReconnectWait + TimeSpan.FromMilliseconds(jitter);
var waitTime = TimeSpan.FromMilliseconds(jitter) + backoff;
if (waitTime != TimeSpan.Zero)
{
_logger.LogTrace("Wait {0}ms to reconnect.", waitTime.TotalMilliseconds);
Expand Down
157 changes: 77 additions & 80 deletions src/NATS.Client.Core/NatsOpts.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,88 +8,85 @@ namespace NATS.Client.Core;
/// <summary>
/// Immutable options for NatsConnection, you can configure via `with` operator.
/// </summary>
/// <param name="Url"></param>
/// <param name="Name"></param>
/// <param name="Echo"></param>
/// <param name="Verbose"></param>
/// <param name="Headers"></param>
/// <param name="AuthOpts"></param>
/// <param name="TlsOpts"></param>
/// <param name="SerializerRegistry"></param>
/// <param name="LoggerFactory"></param>
/// <param name="WriterBufferSize"></param>
/// <param name="ReaderBufferSize"></param>
/// <param name="UseThreadPoolCallback"></param>
/// <param name="InboxPrefix"></param>
/// <param name="NoRandomize"></param>
/// <param name="PingInterval"></param>
/// <param name="MaxPingOut"></param>
/// <param name="ReconnectWait"></param>
/// <param name="ReconnectJitter"></param>
/// <param name="ConnectTimeout"></param>
/// <param name="ObjectPoolSize"></param>
/// <param name="RequestTimeout"></param>
/// <param name="CommandTimeout"></param>
/// <param name="SubscriptionCleanUpInterval"></param>
/// <param name="WriterCommandBufferLimit"></param>
/// <param name="HeaderEncoding"></param>
/// <param name="WaitUntilSent"></param>
public sealed record NatsOpts
(
string Url,
string Name,
bool Echo,
bool Verbose,
bool Headers,
NatsAuthOpts AuthOpts,
NatsTlsOpts TlsOpts,
INatsSerializerRegistry SerializerRegistry,
ILoggerFactory LoggerFactory,
int WriterBufferSize,
int ReaderBufferSize,
bool UseThreadPoolCallback,
string InboxPrefix,
bool NoRandomize,
TimeSpan PingInterval,
int MaxPingOut,
TimeSpan ReconnectWait,
TimeSpan ReconnectJitter,
TimeSpan ConnectTimeout,
int ObjectPoolSize,
TimeSpan RequestTimeout,
TimeSpan CommandTimeout,
TimeSpan SubscriptionCleanUpInterval,
int? WriterCommandBufferLimit,
Encoding HeaderEncoding,
bool WaitUntilSent)
{
public static readonly NatsOpts Default = new(
Url: "nats://localhost:4222",
Name: "NATS .Net Client",
Echo: true,
Verbose: false,
Headers: true,
AuthOpts: NatsAuthOpts.Default,
TlsOpts: NatsTlsOpts.Default,
SerializerRegistry: NatsDefaultSerializerRegistry.Default,
LoggerFactory: NullLoggerFactory.Instance,
WriterBufferSize: 65534, // 32767
ReaderBufferSize: 1048576,
UseThreadPoolCallback: false,
InboxPrefix: "_INBOX",
NoRandomize: false,
PingInterval: TimeSpan.FromMinutes(2),
MaxPingOut: 2,
ReconnectWait: TimeSpan.FromSeconds(2),
ReconnectJitter: TimeSpan.FromMilliseconds(100),
ConnectTimeout: TimeSpan.FromSeconds(2),
ObjectPoolSize: 256,
RequestTimeout: TimeSpan.FromSeconds(5),
CommandTimeout: TimeSpan.FromMinutes(1),
SubscriptionCleanUpInterval: TimeSpan.FromMinutes(5),
WriterCommandBufferLimit: 1_000,
HeaderEncoding: Encoding.ASCII,
WaitUntilSent: false);
public static readonly NatsOpts Default = new();

public string Url { get; init; } = "nats://localhost:4222";

public string Name { get; init; } = "NATS .Net Client";

public bool Echo { get; init; } = true;

public bool Verbose { get; init; } = false;

public bool Headers { get; init; } = true;

public NatsAuthOpts AuthOpts { get; init; } = NatsAuthOpts.Default;

public NatsTlsOpts TlsOpts { get; init; } = NatsTlsOpts.Default;

public INatsSerializerRegistry SerializerRegistry { get; init; } = NatsDefaultSerializerRegistry.Default;

public ILoggerFactory LoggerFactory { get; init; } = NullLoggerFactory.Instance;

public int WriterBufferSize { get; init; } = 65534;

public int ReaderBufferSize { get; init; } = 1048576;

public bool UseThreadPoolCallback { get; init; } = false;

public string InboxPrefix { get; init; } = "_INBOX";

public bool NoRandomize { get; init; } = false;

public TimeSpan PingInterval { get; init; } = TimeSpan.FromMinutes(2);

public int MaxPingOut { get; init; } = 2;

/// <summary>
/// Minimum amount of time to wait between reconnect attempts. (default: 2s)
/// </summary>
public TimeSpan ReconnectWaitMin { get; init; } = TimeSpan.FromSeconds(2);

/// <summary>
/// Random amount of time to wait between reconnect attempts. (default: 100ms)
/// </summary>
public TimeSpan ReconnectJitter { get; init; } = TimeSpan.FromMilliseconds(100);

public TimeSpan ConnectTimeout { get; init; } = TimeSpan.FromSeconds(2);

public int ObjectPoolSize { get; init; } = 256;

public TimeSpan RequestTimeout { get; init; } = TimeSpan.FromSeconds(5);

public TimeSpan CommandTimeout { get; init; } = TimeSpan.FromMinutes(1);

public TimeSpan SubscriptionCleanUpInterval { get; init; } = TimeSpan.FromMinutes(5);

public int? WriterCommandBufferLimit { get; init; } = 1_000;

public Encoding HeaderEncoding { get; init; } = Encoding.ASCII;

public bool WaitUntilSent { get; init; } = false;

/// <summary>
/// Maximum number of reconnect attempts. (default: -1, unlimited)
/// </summary>
/// <remarks>
/// Set to -1 for unlimited retries.
/// </remarks>
public int MaxReconnectRetry { get; init; } = -1;

/// <summary>
/// Backoff delay limit for reconnect attempts. (default: 5 seconds)
/// </summary>
/// <remarks>
/// When the connection is lost, the client will wait for <see cref="ReconnectWaitMin"/> before attempting to reconnect.
/// Every failed attempt will increase the wait time by 2x, up to <see cref="ReconnectWaitMax"/>.
/// If <see cref="ReconnectWaitMax"/> is equal to or less than <see cref="ReconnectWaitMin"/>, the delay will be constant.
/// </remarks>
public TimeSpan ReconnectWaitMax { get; init; } = TimeSpan.FromSeconds(5);

internal NatsUri[] GetSeedUris()
{
Expand Down
59 changes: 59 additions & 0 deletions tests/NATS.Client.Core.Tests/ConnectionRetryTest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
namespace NATS.Client.Core.Tests;

/// <summary>
/// Tests for the reconnect retry limit and reconnect wait options of the NATS connection.
/// </summary>
public class ConnectionRetryTest
{
    private readonly ITestOutputHelper _output;

    public ConnectionRetryTest(ITestOutputHelper output) => _output = output;

    /// <summary>
    /// Stops the server with <c>MaxReconnectRetry = 2</c> and verifies that a later
    /// ping fails with the "Max connect retry exceeded." error.
    /// </summary>
    [Fact]
    public async Task Max_retry_reached_after_disconnect()
    {
        await using var server = NatsServer.Start();
        await using var nats = server.CreateClientConnection(new NatsOpts
        {
            MaxReconnectRetry = 2,

            // ReconnectWaitMax <= ReconnectWaitMin selects a constant delay between attempts.
            ReconnectWaitMax = TimeSpan.Zero,
            ReconnectWaitMin = TimeSpan.FromSeconds(.1),
        });

        var signal = new WaitSignal();
        nats.ReconnectFailed += (_, _) => signal.Pulse();

        // Disposed at the end of the test so the timeout timer is released.
        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));

        await server.StopAsync();

        // Wait until a reconnect attempt has been reported as failed.
        await signal;

        var exception = await Assert.ThrowsAsync<NatsException>(async () => await nats.PingAsync(cts.Token));
        Assert.Equal("Max connect retry exceeded.", exception.Message);
    }

    /// <summary>
    /// Stops the server, waits while reconnect attempts fail, restarts the server and
    /// verifies that a ping succeeds afterwards.
    /// </summary>
    [Fact]
    public async Task Retry_and_connect_after_disconnected()
    {
        await using var server = NatsServer.Start();
        await using var nats = server.CreateClientConnection(new NatsOpts
        {
            MaxReconnectRetry = 10,

            // ReconnectWaitMax <= ReconnectWaitMin selects a constant delay between attempts.
            ReconnectWaitMax = TimeSpan.Zero,
            ReconnectWaitMin = TimeSpan.FromSeconds(2),
        });

        var signal = new WaitSignal();
        nats.ReconnectFailed += (_, _) => signal.Pulse();

        // Disposed at the end of the test so the timeout timer is released.
        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));

        await server.StopAsync();

        // Wait until a reconnect attempt has been reported as failed.
        await signal;

        // Keep the server down for a while before bringing it back.
        await Task.Delay(TimeSpan.FromSeconds(5), cts.Token);

        server.StartServerProcess();

        var rtt = await nats.PingAsync(cts.Token);
        Assert.True(rtt > TimeSpan.Zero);
    }
}

0 comments on commit c6ffcce

Please sign in to comment.