Skip to content

Commit

Permalink
Improve brokerage WS default rate limits (QuantConnect#8486)
Browse files Browse the repository at this point in the history
- Improve brokerage WS default reconnection rate limits
- Minor cleanup in WebSocketClientWrapper
  • Loading branch information
Martin-Molinero authored Dec 27, 2024
1 parent 9175b86 commit 0cda5c7
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 35 deletions.
40 changes: 20 additions & 20 deletions Brokerages/BrokerageMultiWebSocketSubscriptionManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ public BrokerageMultiWebSocketSubscriptionManager(
_subscribeFunc = subscribeFunc;
_unsubscribeFunc = unsubscribeFunc;
_messageHandler = messageHandler;
_connectionRateLimiter = connectionRateLimiter;
// let's use a reasonable default, no API will like to get DOS on reconnections. 50 WS will take 120s
_connectionRateLimiter = connectionRateLimiter ?? new RateGate(5, TimeSpan.FromSeconds(12));

if (_maximumWebSocketConnections > 0)
{
Expand All @@ -99,25 +100,26 @@ public BrokerageMultiWebSocketSubscriptionManager(
};
_reconnectTimer.Elapsed += (_, _) =>
{
Log.Trace("BrokerageMultiWebSocketSubscriptionManager(): Restarting websocket connections");

List<BrokerageMultiWebSocketEntry> webSocketEntries;
lock (_locker)
{
foreach (var entry in _webSocketEntries)
// let's make a copy so we don't hold the lock
webSocketEntries = _webSocketEntries.ToList();
}

Log.Trace($"BrokerageMultiWebSocketSubscriptionManager(): Restarting {webSocketEntries.Count} websocket connections");

Parallel.ForEach(webSocketEntries, new ParallelOptions { MaxDegreeOfParallelism = 4 }, entry =>
{
if (entry.WebSocket.IsOpen)
{
if (entry.WebSocket.IsOpen)
{
Task.Factory.StartNew(() =>
{
Log.Trace($"BrokerageMultiWebSocketSubscriptionManager(): Websocket restart - disconnect: ({entry.WebSocket.GetHashCode()})");
Disconnect(entry.WebSocket);

Log.Trace($"BrokerageMultiWebSocketSubscriptionManager(): Websocket restart - connect: ({entry.WebSocket.GetHashCode()})");
Connect(entry.WebSocket);
});
}
Log.Trace($"BrokerageMultiWebSocketSubscriptionManager(): Websocket restart - disconnect: ({entry.WebSocket.GetHashCode()})");
Disconnect(entry.WebSocket);

Log.Trace($"BrokerageMultiWebSocketSubscriptionManager(): Websocket restart - connect: ({entry.WebSocket.GetHashCode()})");
Connect(entry.WebSocket);
}
}
});
};
_reconnectTimer.Start();

Expand Down Expand Up @@ -285,10 +287,7 @@ private void Connect(IWebSocket webSocket)

webSocket.Open += onOpenAction;

if (_connectionRateLimiter is { IsRateLimited: false })
{
_connectionRateLimiter.WaitToProceed();
}
_connectionRateLimiter.WaitToProceed();

try
{
Expand Down Expand Up @@ -331,6 +330,7 @@ private void OnOpen(object sender, EventArgs e)
_subscribeFunc(webSocket, symbol);
}
});
break;
}
}
}
Expand Down
38 changes: 23 additions & 15 deletions Brokerages/WebSocketClientWrapper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
using System;
using System.IO;
using System.Net.WebSockets;
using System.Runtime.CompilerServices;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
Expand Down Expand Up @@ -120,6 +121,8 @@ public void Close()
{
try
{
_cts?.Cancel();

try
{
_client?.CloseOutputAsync(WebSocketCloseStatus.NormalClosure, "", _cts.Token).SynchronouslyAwaitTask();
Expand All @@ -129,8 +132,6 @@ public void Close()
// ignored
}

_cts?.Cancel();

_taskConnect?.Wait(TimeSpan.FromSeconds(5));

_cts.DisposeSafely();
Expand Down Expand Up @@ -253,28 +254,39 @@ private void HandleConnection()
}
}
catch (OperationCanceledException) { }
catch (ObjectDisposedException) { }
catch (WebSocketException ex)
{
OnError(new WebSocketError(ex.Message, ex));
connectionCts.Token.WaitHandle.WaitOne(waitTimeOnError);
if (!connectionCts.IsCancellationRequested)
{
OnError(new WebSocketError(ex.Message, ex));
connectionCts.Token.WaitHandle.WaitOne(waitTimeOnError);

// increase wait time until a maximum value. This is useful during brokerage down times
waitTimeOnError += Math.Min(maximumWaitTimeOnError, waitTimeOnError);
// increase wait time until a maximum value. This is useful during brokerage down times
waitTimeOnError += Math.Min(maximumWaitTimeOnError, waitTimeOnError);
}
}
catch (Exception ex)
{
OnError(new WebSocketError(ex.Message, ex));
if (!connectionCts.IsCancellationRequested)
{
OnError(new WebSocketError(ex.Message, ex));
}
}

if (!connectionCts.IsCancellationRequested)
{
connectionCts.Cancel();
}
connectionCts.Cancel();
}
}
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private MessageData ReceiveMessage(
WebSocket webSocket,
ClientWebSocket webSocket,
CancellationToken ct,
byte[] receiveBuffer,
long maxSize = long.MaxValue)
byte[] receiveBuffer)
{
var buffer = new ArraySegment<byte>(receiveBuffer);

Expand All @@ -286,10 +298,6 @@ private MessageData ReceiveMessage(
{
result = webSocket.ReceiveAsync(buffer, ct).SynchronouslyAwaitTask();
ms.Write(buffer.Array, buffer.Offset, result.Count);
if (ms.Length > maxSize)
{
throw new InvalidOperationException($"Maximum size of the message was exceeded: {_url}");
}
}
while (!result.EndOfMessage);

Expand Down

0 comments on commit 0cda5c7

Please sign in to comment.