Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactors how Slic keeps connections alive #3670

Merged
merged 8 commits into from
Oct 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,31 +1,39 @@
// Copyright (c) ZeroC, Inc.

using IceRpc.Transports;
using System.Buffers;
using System.Diagnostics;

namespace IceRpc.Transports.Internal;
namespace IceRpc.Internal;

/// <summary>Decorates <see cref="ReadAsync" /> to fail if no byte is received for over readIdleTimeout. Also decorates
/// <see cref="WriteAsync" /> to schedule a keep alive action (writeIdleTimeout / 2) after a successful write. Both
/// sides of the connection are expected to use the same idle timeouts.</summary>
internal class IdleTimeoutDuplexConnectionDecorator : IDuplexConnection
internal class IceDuplexConnectionDecorator : IDuplexConnection
{
private readonly IDuplexConnection _decoratee;
private Timer? _keepAliveTimer;
private readonly Timer _writerTimer;
private readonly CancellationTokenSource _readCts = new();
private TimeSpan _readIdleTimeout = Timeout.InfiniteTimeSpan;
private TimeSpan _writeIdleTimeout = Timeout.InfiniteTimeSpan;
private readonly TimeSpan _readIdleTimeout;
private readonly TimeSpan _writeIdleTimeout;

public Task<TransportConnectionInformation> ConnectAsync(CancellationToken cancellationToken) =>
_decoratee.ConnectAsync(cancellationToken);
public async Task<TransportConnectionInformation> ConnectAsync(CancellationToken cancellationToken)
{
TransportConnectionInformation connectionInformation = await _decoratee.ConnectAsync(cancellationToken)
.ConfigureAwait(false);

// Schedule or reschedule a keep alive after a successful connection establishment.
ResetWriteTimer();
return connectionInformation;
}

public void Dispose()
{
_decoratee.Dispose();
_readCts.Dispose();

// Using Dispose is fine, there's no need to wait for the keep alive action to terminate if it's running.
_keepAliveTimer?.Dispose();
_writerTimer.Dispose();
}

public ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken)
Expand Down Expand Up @@ -72,50 +80,30 @@ async ValueTask PerformWriteAsync()
{
await _decoratee.WriteAsync(buffer, cancellationToken).ConfigureAwait(false);

// After each successful write, we schedule one ping (keep alive or heartbeat) at _writeIdleTimeout / 2 in
// the future. Since each ping is itself a write, if there is no application activity at all, we'll send
// successive pings at _writeIdleTimeout / 2 intervals.
ScheduleKeepAlive();
// After each successful write, we (re)schedule one ping (heartbeat) at _writeIdleTimeout / 2 in the future.
// Since each ping is itself a write, if there is no application activity at all, we'll send successive
// pings at _writeIdleTimeout / 2 intervals.
ResetWriteTimer();
}
}

/// <summary>Constructs a decorator that does nothing until it is enabled by a call to <see cref="Enable" />.
/// </summary>
internal IdleTimeoutDuplexConnectionDecorator(IDuplexConnection decoratee) => _decoratee = decoratee;

/// <summary>Constructs a decorator that ensures a call to <see cref="ReadAsync" /> will fail after readIdleTimeout.
/// This decorator also schedules a keepAliveAction after each write (see <see cref="ScheduleKeepAlive" />).
/// </summary>
/// <remarks>Do not call <see cref="Enable" /> on a decorator constructed with this constructor.</remarks>
internal IdleTimeoutDuplexConnectionDecorator(
/// This decorator also schedules a keepAliveAction after each write (see <see cref="ResetWriteTimer" />).</summary>
internal IceDuplexConnectionDecorator(
IDuplexConnection decoratee,
TimeSpan readIdleTimeout,
TimeSpan writeIdleTimeout,
Action keepAliveAction)
: this(decoratee)
{
Debug.Assert(writeIdleTimeout != Timeout.InfiniteTimeSpan);
_decoratee = decoratee;
_readIdleTimeout = readIdleTimeout; // can be infinite i.e. disabled
_writeIdleTimeout = writeIdleTimeout;
_keepAliveTimer = new Timer(_ => keepAliveAction());
}
_writerTimer = new Timer(_ => keepAliveAction());

/// <summary>Enables the read and write idle timeouts; also schedules one keep-alive.</summary>.
internal void Enable(TimeSpan idleTimeout, Action? keepAliveAction)
{
Debug.Assert(idleTimeout != Timeout.InfiniteTimeSpan);
Debug.Assert(_keepAliveTimer is null);

_readIdleTimeout = idleTimeout;
_writeIdleTimeout = idleTimeout;

if (keepAliveAction is not null)
{
_keepAliveTimer = new Timer(_ => keepAliveAction());
ScheduleKeepAlive();
}
// We can't schedule a keep alive right away because the connection is not connected yet.
}

/// <summary>Schedules one keep alive in writeIdleTimeout / 2.</summary>
internal void ScheduleKeepAlive() => _keepAliveTimer?.Change(_writeIdleTimeout / 2, Timeout.InfiniteTimeSpan);
/// <summary>Resets the write timer. We send a keep alive when this timer expires.</summary>
private void ResetWriteTimer() => _writerTimer.Change(_writeIdleTimeout / 2, Timeout.InfiniteTimeSpan);
}
10 changes: 1 addition & 9 deletions src/IceRpc/Internal/IceProtocolConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,6 @@ internal sealed class IceProtocolConnection : IProtocolConnection
// A connection refuses invocations when it's disposed, shut down, shutting down or merely "shutdown requested".
private bool _refuseInvocations;

// When not null, schedules one keep-alive action in options.IdleTimeout / 2.
private readonly Action? _scheduleKeepAlive;

// Does ShutdownAsync send a close connection frame?
private bool _sendCloseConnectionFrame = true;

Expand Down Expand Up @@ -143,9 +140,6 @@ internal sealed class IceProtocolConnection : IProtocolConnection
throw new InvalidDataException(
$"Expected '{nameof(IceFrameType.ValidateConnection)}' frame but received frame type '{validateConnectionFrame.FrameType}'.");
}

// Schedules a keep-alive to keep the connection alive now that it's established.
_scheduleKeepAlive?.Invoke();
}
}
catch (OperationCanceledException)
Expand Down Expand Up @@ -600,13 +594,11 @@ internal IceProtocolConnection(

if (options.IceIdleTimeout != Timeout.InfiniteTimeSpan)
{
var duplexConnectionDecorator = new IdleTimeoutDuplexConnectionDecorator(
duplexConnection = new IceDuplexConnectionDecorator(
duplexConnection,
readIdleTimeout: options.EnableIceIdleCheck ? options.IceIdleTimeout : Timeout.InfiniteTimeSpan,
writeIdleTimeout: options.IceIdleTimeout,
KeepAlive);
duplexConnection = duplexConnectionDecorator;
_scheduleKeepAlive = duplexConnectionDecorator.ScheduleKeepAlive;
}

_duplexConnection = duplexConnection;
Expand Down
79 changes: 44 additions & 35 deletions src/IceRpc/Transports/Slic/Internal/SlicConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,9 @@ internal class SlicConnection : IMultiplexedConnection
private Task<TransportConnectionInformation>? _connectTask;
private readonly CancellationTokenSource _disposedCts = new();
private Task? _disposeTask;
private readonly IDuplexConnection _duplexConnection;
private readonly SlicDuplexConnectionDecorator _duplexConnection;
private readonly DuplexConnectionReader _duplexConnectionReader;
private readonly SlicDuplexConnectionWriter _duplexConnectionWriter;
private readonly Action<TimeSpan, Action?> _enableIdleTimeoutAndKeepAlive;
private bool _isClosed;
private ulong? _lastRemoteBidirectionalStreamId;
private ulong? _lastRemoteUnidirectionalStreamId;
Expand Down Expand Up @@ -265,9 +264,7 @@ async Task<TransportConnectionInformation> PerformConnectAsync()

if (idleTimeout != Timeout.InfiniteTimeSpan)
{
// Only client connections send ping frames when idle to keep the server connection alive. The server
// sends back a Pong frame in turn to keep alive the client connection.
_enableIdleTimeoutAndKeepAlive(idleTimeout, IsServer ? null : KeepAlive);
_duplexConnection.Enable(idleTimeout);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would be better to keep IDuplexConnection and then do

if (_duplexConnection is SlicDuplexConnectionDecorator duplexConnectionDecoator)
{
    duplexConnectionDecoator.Enable();
} 

Then we can make SlicDuplexConnectionDecorator timers non-nullable, and keep a single constructor.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see how this would work.

The difficulty here is the idle timeout is negotiated during connection establishment. At construction time, we don't know if the negotiated idle timeout is good to be infinite or some other value.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can pass the idleTimeout I forget to add it there, and that is not the point. The point is to avoid creating this decorator for the Server connections.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need the decorator for server connections too. It implements the idle timer.

}

_readFramesTask = ReadFramesAsync(_disposedCts.Token);
Expand Down Expand Up @@ -317,31 +314,6 @@ async Task<TransportConnectionInformation> PerformConnectAsync()
_ => throw new InvalidDataException($"Received unexpected Slic frame: '{frameType}'."),
};

void KeepAlive()
{
// _pendingPongCount can be < 0 if an unexpected pong is received. If it's the case, the connection is being
// torn down and there's no point in sending a ping frame.
if (Interlocked.Increment(ref _pendingPongCount) > 0)
{
try
{
// For now, the Ping frame payload is just a long which is always set to 0. In the future, it could
// be a ping frame type value if the ping frame is used for different purpose (e.g: a KeepAlive or
// RTT ping frame type).
WriteConnectionFrame(FrameType.Ping, new PingBody(0L).Encode);
}
catch (IceRpcException)
{
// Expected if the connection is closed.
}
catch (Exception exception)
{
Debug.Fail($"The Slic keep alive timer failed with an unexpected exception: {exception}");
throw;
}
}
}

async ValueTask<T> ReadFrameAsync<T>(
Func<FrameType?, ReadOnlySequence<byte>, T> decodeFunc,
CancellationToken cancellationToken)
Expand Down Expand Up @@ -577,10 +549,11 @@ internal SlicConnection(

_closedCancellationToken = _closedCts.Token;

var duplexConnectionDecorator = new IdleTimeoutDuplexConnectionDecorator(duplexConnection);
_enableIdleTimeoutAndKeepAlive = duplexConnectionDecorator.Enable;
// Only the client-side sends pings to keep the connection alive when idle timeout (set later) is not infinite.
_duplexConnection = IsServer ?
new SlicDuplexConnectionDecorator(duplexConnection) :
new SlicDuplexConnectionDecorator(duplexConnection, SendReadPing, SendWritePing);

_duplexConnection = duplexConnectionDecorator;
_duplexConnectionReader = new DuplexConnectionReader(_duplexConnection, options.Pool, options.MinSegmentSize);
_duplexConnectionWriter = new SlicDuplexConnectionWriter(
_duplexConnection,
Expand All @@ -598,6 +571,42 @@ internal SlicConnection(
_nextBidirectionalId = 0;
_nextUnidirectionalId = 2;
}

void SendPing(long payload)
{
try
{
WriteConnectionFrame(FrameType.Ping, new PingBody(payload).Encode);
}
catch (IceRpcException)
{
// Expected if the connection is closed.
}
catch (Exception exception)
{
Debug.Fail($"The sending of a Ping frame failed with an unexpected exception: {exception}");
throw;
}
}

void SendReadPing()
{
// This local function is no-op if there is already a pending Pong.
if (Interlocked.CompareExchange(ref _pendingPongCount, 1, 0) == 0)
{
SendPing(1L);
}
}

void SendWritePing()
{
// _pendingPongCount can be <= 0 if an unexpected pong is received. If it's the case, the connection is
// being torn down and there's no point in sending a ping frame.
if (Interlocked.Increment(ref _pendingPongCount) > 0)
{
SendPing(0L);
}
}
}

/// <summary>Fills the given writer with stream data received on the connection.</summary>
Expand Down Expand Up @@ -1190,8 +1199,8 @@ async Task ReadPongFrameAsync(int size, CancellationToken cancellationToken)
(ref SliceDecoder decoder) => new PongBody(ref decoder),
cancellationToken).ConfigureAwait(false);

// For now, we only send a 0 payload value.
if (pongBody.Payload != 0L)
// For now, we only send a 0 or 1 payload value (0 for "write ping" and 1 for "read ping").
if (pongBody.Payload != 0L && pongBody.Payload != 1L)
{
throw new InvalidDataException($"Received {nameof(FrameType.Pong)} with unexpected payload.");
}
Expand Down
127 changes: 127 additions & 0 deletions src/IceRpc/Transports/Slic/Internal/SlicDuplexConnectionDecorator.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
// Copyright (c) ZeroC, Inc.

using System.Buffers;
using System.Diagnostics;

namespace IceRpc.Transports.Slic.Internal;

/// <summary>Decorates <see cref="ReadAsync" /> to fail if no byte is received for over idle timeout. Also optionally
/// decorates both <see cref="ReadAsync"/> and <see cref="WriteAsync" /> to schedule pings that prevent both the local
/// and remote idle timers from expiring.</summary>
internal class SlicDuplexConnectionDecorator : IDuplexConnection
{
private readonly IDuplexConnection _decoratee;
private TimeSpan _idleTimeout = Timeout.InfiniteTimeSpan;
private readonly CancellationTokenSource _readCts = new();

private readonly Timer? _readTimer;
private readonly Timer? _writeTimer;

public Task<TransportConnectionInformation> ConnectAsync(CancellationToken cancellationToken) =>
_decoratee.ConnectAsync(cancellationToken);

public void Dispose()
{
_decoratee.Dispose();
_readCts.Dispose();

// Using Dispose is fine, there's no need to wait for the keep alive action to terminate if it's running.
_readTimer?.Dispose();
_writeTimer?.Dispose();
}

public ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken)
{
return _idleTimeout == Timeout.InfiniteTimeSpan ?
_decoratee.ReadAsync(buffer, cancellationToken) :
PerformReadAsync();

async ValueTask<int> PerformReadAsync()
{
try
{
using CancellationTokenRegistration _ = cancellationToken.UnsafeRegister(
cts => ((CancellationTokenSource)cts!).Cancel(),
_readCts);
_readCts.CancelAfter(_idleTimeout); // enable idle timeout before reading

int bytesRead = await _decoratee.ReadAsync(buffer, _readCts.Token).ConfigureAwait(false);

// After each successful read, we schedule one ping some time in the future.
if (bytesRead > 0)
{
ResetReadTimer();
}
// When 0, the other side called ShutdownWriteAsync, so there is no point to send a ping since we can't
// get back a pong.

return bytesRead;
}
catch (OperationCanceledException)
{
cancellationToken.ThrowIfCancellationRequested();

throw new IceRpcException(
IceRpcError.ConnectionIdle,
$"The connection did not receive any bytes for over {_idleTimeout.TotalSeconds} s.");
}
finally
{
_readCts.CancelAfter(Timeout.InfiniteTimeSpan); // disable idle timeout if not canceled
}
}
}

public Task ShutdownWriteAsync(CancellationToken cancellationToken) =>
_decoratee.ShutdownWriteAsync(cancellationToken);

public ValueTask WriteAsync(ReadOnlySequence<byte> buffer, CancellationToken cancellationToken)
{
return _idleTimeout == Timeout.InfiniteTimeSpan ?
_decoratee.WriteAsync(buffer, cancellationToken) :
PerformWriteAsync();

async ValueTask PerformWriteAsync()
{
await _decoratee.WriteAsync(buffer, cancellationToken).ConfigureAwait(false);

// After each successful write, we schedule one ping some time in the future. Since each ping is itself a
// write, if there is no application activity at all, we'll send successive pings at regular intervals.
ResetWriteTimer();
}
}

/// <summary>Constructs a decorator that does nothing until it is enabled by a call to <see cref="Enable"/>.
/// </summary>
internal SlicDuplexConnectionDecorator(IDuplexConnection decoratee) => _decoratee = decoratee;

/// <summary>Constructs a decorator that does nothing until it is enabled by a call to <see cref="Enable"/>.
/// </summary>
internal SlicDuplexConnectionDecorator(IDuplexConnection decoratee, Action sendReadPing, Action sendWritePing)
: this(decoratee)
{
_readTimer = new Timer(_ => sendReadPing());
_writeTimer = new Timer(_ => sendWritePing());
}

/// <summary>Sets the idle timeout and schedules pings once the connection is established.</summary>.
internal void Enable(TimeSpan idleTimeout)
{
Debug.Assert(idleTimeout != Timeout.InfiniteTimeSpan);
_idleTimeout = idleTimeout;

ResetReadTimer();
ResetWriteTimer();
}

/// <summary>Resets the read timer. We send a "read" ping when this timer expires.</summary>
/// <remarks>This method is no-op unless this decorator is constructed with send ping actions.</remarks>
private void ResetReadTimer() => _readTimer?.Change(_idleTimeout * 0.5, Timeout.InfiniteTimeSpan);

/// <summary>Resets the write timer. We send a "write" ping when this timer expires.</summary>
/// <remarks>This method is no-op unless this decorator is constructed with send ping actions.</remarks>
// The write timer factor (0.6) was chosen to be greater than the read timer factor (0.5). This way, when the
// connection is completely idle, the read timer expires before the write timer and has time to send a ping that
// resets the write timer. This reduces the likelihood of duplicate "keep alive" pings.
private void ResetWriteTimer() => _writeTimer?.Change(_idleTimeout * 0.6, Timeout.InfiniteTimeSpan);
}
Loading
Loading