Skip to content

Commit

Permalink
InputFlowControl update
Browse files Browse the repository at this point in the history
  • Loading branch information
ladeak committed Sep 8, 2024
1 parent 97c47e2 commit 8878dad
Show file tree
Hide file tree
Showing 4 changed files with 525 additions and 93 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -19,94 +19,125 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2.FlowControl;
/// <seealso href="https://datatracker.ietf.org/doc/html/rfc9113#name-flow-control"/>
internal sealed class InputFlowControl
{
private readonly int _initialWindowSize;
private record struct FlowControlState
{
private const long AbortedBitMask = 1L << 32; // uint MaxValue + 1
internal long _state;

public FlowControlState(uint initialWindowSize, bool isAborted)
{
_state = initialWindowSize;
if (isAborted)
{
_state |= AbortedBitMask;
}
}

public uint Available => (uint)_state;

public bool IsAborted => (_state & AbortedBitMask) > 0;
}

private readonly uint _initialWindowSize;
private readonly int _minWindowSizeIncrement;

private FlowControl _flow;
private FlowControlState _flow;
private int _pendingUpdateSize;
private bool _windowUpdatesDisabled;
private readonly object _flowLock = new();

public InputFlowControl(uint initialWindowSize, uint minWindowSizeIncrement)
{
Debug.Assert(initialWindowSize >= minWindowSizeIncrement, "minWindowSizeIncrement is greater than the window size.");

_flow = new FlowControl(initialWindowSize);
_initialWindowSize = (int)initialWindowSize;
_flow = new FlowControlState(initialWindowSize, false);
_initialWindowSize = initialWindowSize;
_minWindowSizeIncrement = (int)minWindowSizeIncrement;
}

public bool IsAvailabilityLow => _flow.Available < _minWindowSizeIncrement;

// Test hook, not participating in mutual exclusion
internal uint Available => _flow.Available;

public void Reset()
{
_flow = new FlowControl((uint)_initialWindowSize);
_flow = new FlowControlState(_initialWindowSize, false);
_pendingUpdateSize = 0;
_windowUpdatesDisabled = false;
}

public bool TryAdvance(int bytes)
{
lock (_flowLock)
FlowControlState currentFlow, computedFlow;
do
{
currentFlow = _flow; // Copy
// Even if the stream is aborted, the client should never send more data than was available in the
// flow-control window at the time of the abort.
if (bytes > _flow.Available)
if (bytes > currentFlow.Available)
{
throw new Http2ConnectionErrorException(CoreStrings.Http2ErrorFlowControlWindowExceeded, Http2ErrorCode.FLOW_CONTROL_ERROR, ConnectionEndReason.FlowControlWindowExceeded);
}

if (_flow.IsAborted)
if (currentFlow.IsAborted)
{
// This data won't be read by the app, so tell the caller to count the data as already consumed.
return false;
}

_flow.Advance(bytes);
return true;
}
computedFlow = new FlowControlState(currentFlow.Available - (uint)bytes, currentFlow.IsAborted);
} while (currentFlow._state != Interlocked.CompareExchange(ref _flow._state, computedFlow._state, currentFlow._state));

return true;
}

public bool TryUpdateWindow(int bytes, out int updateSize)
{
lock (_flowLock)
FlowControlState currentFlow, computedFlow;
do
{
updateSize = 0;

if (_flow.IsAborted)
currentFlow = _flow; // Copy
if (currentFlow.IsAborted)
{
// All data received by stream has already been returned to the connection window.
return false;
}

if (!_flow.TryUpdateWindow(bytes))
var maxUpdate = int.MaxValue - currentFlow.Available;
if (bytes > maxUpdate)
{
// We only try to update the window back to its initial size after the app consumes data.
// It shouldn't be possible for the window size to ever exceed Http2PeerSettings.MaxWindowSize.
Debug.Assert(false, $"{nameof(TryUpdateWindow)} attempted to grow window past max size.");
}
computedFlow = new FlowControlState(currentFlow.Available + (uint)bytes, currentFlow.IsAborted);
} while (currentFlow._state != Interlocked.CompareExchange(ref _flow._state, computedFlow._state, currentFlow._state));

if (_windowUpdatesDisabled)
{
// Continue returning space to the connection window. The end of the stream has already
// been received, so don't send window updates for the stream window.
return true;
}

var potentialUpdateSize = _pendingUpdateSize + bytes;
if (_windowUpdatesDisabled)
{
// Continue returning space to the connection window. The end of the stream has already
// been received, so don't send window updates for the stream window.
return true;
}

int computedPendingUpdateSize, currentPendingSize;
do
{
updateSize = 0;
currentPendingSize = _pendingUpdateSize;
var potentialUpdateSize = currentPendingSize + bytes;
if (potentialUpdateSize > _minWindowSizeIncrement)
{
_pendingUpdateSize = 0;
computedPendingUpdateSize = 0;
updateSize = potentialUpdateSize;
}
else
{
_pendingUpdateSize = potentialUpdateSize;
computedPendingUpdateSize = potentialUpdateSize;
}
} while (currentPendingSize != Interlocked.CompareExchange(ref _pendingUpdateSize, computedPendingUpdateSize, currentPendingSize));

return true;
}
return true;
}

public void StopWindowUpdates()
Expand All @@ -116,18 +147,21 @@ public void StopWindowUpdates()

public int Abort()
{
lock (_flowLock)
FlowControlState currentFlow, computedFlow;
do
{
if (_flow.IsAborted)
currentFlow = _flow; // Copy
if (currentFlow.IsAborted)
{
return 0;
}

_flow.Abort();
computedFlow = new FlowControlState(currentFlow.Available, true);
} while (currentFlow._state != Interlocked.CompareExchange(ref _flow._state, computedFlow._state, currentFlow._state));

// Tell caller to return connection window space consumed by this stream. Even if window updates have
// been disabled at the stream level, connection-level window updates may still be necessary.
return _initialWindowSize - _flow.Available;
}
// Tell caller to return connection window space consumed by this stream. Even if window updates have
// been disabled at the stream level, connection-level window updates may still be necessary.
return (int)(_initialWindowSize - computedFlow.Available);
}
}

Loading

0 comments on commit 8878dad

Please sign in to comment.