diff --git a/src/Transports.AspNetCore/WebSockets/AsyncMessagePump.cs b/src/Transports.AspNetCore/WebSockets/AsyncMessagePump.cs index c9da3dcf..6c3da2d6 100644 --- a/src/Transports.AspNetCore/WebSockets/AsyncMessagePump.cs +++ b/src/Transports.AspNetCore/WebSockets/AsyncMessagePump.cs @@ -22,6 +22,21 @@ internal class AsyncMessagePump private readonly Func _callback; private readonly Queue> _queue = new(); + /// + /// Returns the number of messages in the queue. + /// This count includes any message currently being processed. + /// + public int Count + { + get + { + lock (_queue) + { + return _queue.Count; + } + } + } + /// /// Initializes a new instance with the specified asynchronous callback delegate. /// diff --git a/src/Transports.AspNetCore/WebSockets/WebSocketConnection.cs b/src/Transports.AspNetCore/WebSockets/WebSocketConnection.cs index e75f2349..40c92ddb 100644 --- a/src/Transports.AspNetCore/WebSockets/WebSocketConnection.cs +++ b/src/Transports.AspNetCore/WebSockets/WebSocketConnection.cs @@ -43,6 +43,13 @@ public class WebSocketConnection : IWebSocketConnection /// public HttpContext HttpContext { get; } + /// + /// Returns the number of packets waiting in the send queue, including + /// messages, keep-alive packets, and the close message. + /// This count includes any packet currently being processed. + /// + protected int SendQueueCount => _pump.Count; + /// /// Initializes an instance with the specified parameters. /// @@ -218,7 +225,7 @@ public Task CloseAsync(int eventId, string? description) /// /// The message is posted to a queue and execution returns immediately. /// - public Task SendMessageAsync(OperationMessage message) + public virtual Task SendMessageAsync(OperationMessage message) { _pump.Post(new Message { OperationMessage = message }); return Task.CompletedTask; diff --git a/tests/ApiApprovalTests/net50+net60+net80/GraphQL.Server.Transports.AspNetCore.approved.txt b/tests/ApiApprovalTests/net50+net60+net80/GraphQL.Server.Transports.AspNetCore.approved.txt index 066e8a44..94573a13 100644 --- a/tests/ApiApprovalTests/net50+net60+net80/GraphQL.Server.Transports.AspNetCore.approved.txt +++ b/tests/ApiApprovalTests/net50+net60+net80/GraphQL.Server.Transports.AspNetCore.approved.txt @@ -340,6 +340,7 @@ namespace GraphQL.Server.Transports.AspNetCore.WebSockets public Microsoft.AspNetCore.Http.HttpContext HttpContext { get; } public System.DateTime LastMessageSentAt { get; } public System.Threading.CancellationToken RequestAborted { get; } + protected int SendQueueCount { get; } public System.Threading.Tasks.Task CloseAsync() { } public System.Threading.Tasks.Task CloseAsync(int eventId, string? description) { } public virtual void Dispose() { } @@ -348,7 +349,7 @@ namespace GraphQL.Server.Transports.AspNetCore.WebSockets protected virtual System.Threading.Tasks.Task OnDispatchMessageAsync(GraphQL.Server.Transports.AspNetCore.WebSockets.IOperationMessageProcessor operationMessageProcessor, GraphQL.Transport.OperationMessage message) { } protected virtual System.Threading.Tasks.Task OnNonGracefulShutdownAsync(bool receivedCloseMessage, bool sentCloseMessage) { } protected virtual System.Threading.Tasks.Task OnSendMessageAsync(GraphQL.Transport.OperationMessage message) { } - public System.Threading.Tasks.Task SendMessageAsync(GraphQL.Transport.OperationMessage message) { } + public virtual System.Threading.Tasks.Task SendMessageAsync(GraphQL.Transport.OperationMessage message) { } } } namespace GraphQL.Server.Transports.AspNetCore.WebSockets.GraphQLWs diff --git a/tests/ApiApprovalTests/netcoreapp21+netstandard20/GraphQL.Server.Transports.AspNetCore.approved.txt b/tests/ApiApprovalTests/netcoreapp21+netstandard20/GraphQL.Server.Transports.AspNetCore.approved.txt index 1216fed0..6e037d49 100644 --- a/tests/ApiApprovalTests/netcoreapp21+netstandard20/GraphQL.Server.Transports.AspNetCore.approved.txt +++ b/tests/ApiApprovalTests/netcoreapp21+netstandard20/GraphQL.Server.Transports.AspNetCore.approved.txt @@ -358,6 +358,7 @@ namespace GraphQL.Server.Transports.AspNetCore.WebSockets public Microsoft.AspNetCore.Http.HttpContext HttpContext { get; } public System.DateTime LastMessageSentAt { get; } public System.Threading.CancellationToken RequestAborted { get; } + protected int SendQueueCount { get; } public System.Threading.Tasks.Task CloseAsync() { } public System.Threading.Tasks.Task CloseAsync(int eventId, string? description) { } public virtual void Dispose() { } @@ -366,7 +367,7 @@ namespace GraphQL.Server.Transports.AspNetCore.WebSockets protected virtual System.Threading.Tasks.Task OnDispatchMessageAsync(GraphQL.Server.Transports.AspNetCore.WebSockets.IOperationMessageProcessor operationMessageProcessor, GraphQL.Transport.OperationMessage message) { } protected virtual System.Threading.Tasks.Task OnNonGracefulShutdownAsync(bool receivedCloseMessage, bool sentCloseMessage) { } protected virtual System.Threading.Tasks.Task OnSendMessageAsync(GraphQL.Transport.OperationMessage message) { } - public System.Threading.Tasks.Task SendMessageAsync(GraphQL.Transport.OperationMessage message) { } + public virtual System.Threading.Tasks.Task SendMessageAsync(GraphQL.Transport.OperationMessage message) { } } } namespace GraphQL.Server.Transports.AspNetCore.WebSockets.GraphQLWs diff --git a/tests/ApiApprovalTests/netcoreapp31/GraphQL.Server.Transports.AspNetCore.approved.txt b/tests/ApiApprovalTests/netcoreapp31/GraphQL.Server.Transports.AspNetCore.approved.txt index 685396c3..38f847f6 100644 --- a/tests/ApiApprovalTests/netcoreapp31/GraphQL.Server.Transports.AspNetCore.approved.txt +++ b/tests/ApiApprovalTests/netcoreapp31/GraphQL.Server.Transports.AspNetCore.approved.txt @@ -340,6 +340,7 @@ namespace GraphQL.Server.Transports.AspNetCore.WebSockets public Microsoft.AspNetCore.Http.HttpContext HttpContext { get; } public System.DateTime LastMessageSentAt { get; } public System.Threading.CancellationToken RequestAborted { get; } + protected int SendQueueCount { get; } public System.Threading.Tasks.Task CloseAsync() { } public System.Threading.Tasks.Task CloseAsync(int eventId, string? description) { } public virtual void Dispose() { } @@ -348,7 +349,7 @@ namespace GraphQL.Server.Transports.AspNetCore.WebSockets protected virtual System.Threading.Tasks.Task OnDispatchMessageAsync(GraphQL.Server.Transports.AspNetCore.WebSockets.IOperationMessageProcessor operationMessageProcessor, GraphQL.Transport.OperationMessage message) { } protected virtual System.Threading.Tasks.Task OnNonGracefulShutdownAsync(bool receivedCloseMessage, bool sentCloseMessage) { } protected virtual System.Threading.Tasks.Task OnSendMessageAsync(GraphQL.Transport.OperationMessage message) { } - public System.Threading.Tasks.Task SendMessageAsync(GraphQL.Transport.OperationMessage message) { } + public virtual System.Threading.Tasks.Task SendMessageAsync(GraphQL.Transport.OperationMessage message) { } } } namespace GraphQL.Server.Transports.AspNetCore.WebSockets.GraphQLWs diff --git a/tests/Transports.AspNetCore.Tests/WebSockets/TestWebSocketConnection.cs b/tests/Transports.AspNetCore.Tests/WebSockets/TestWebSocketConnection.cs index 8c3b8860..14ac2b17 100644 --- a/tests/Transports.AspNetCore.Tests/WebSockets/TestWebSocketConnection.cs +++ b/tests/Transports.AspNetCore.Tests/WebSockets/TestWebSocketConnection.cs @@ -25,4 +25,6 @@ public Task Do_OnCloseOutputAsync(WebSocketCloseStatus closeStatus, string? clos public TimeSpan Get_DefaultDisconnectionTimeout => DefaultDisconnectionTimeout; + + public int Get_SendQueueCount => base.SendQueueCount; } diff --git a/tests/Transports.AspNetCore.Tests/WebSockets/WebSocketConnectionTests.cs b/tests/Transports.AspNetCore.Tests/WebSockets/WebSocketConnectionTests.cs index b67f134b..6f140822 100644 --- a/tests/Transports.AspNetCore.Tests/WebSockets/WebSocketConnectionTests.cs +++ b/tests/Transports.AspNetCore.Tests/WebSockets/WebSocketConnectionTests.cs @@ -597,18 +597,44 @@ public async Task CloseConnectionAsync_Specific() public async Task SendMessageAsync() { var message = new OperationMessage(); + _mockConnection.Setup(x => x.SendMessageAsync(It.IsAny())).CallBase().Verifiable(); _mockConnection.Protected().Setup("OnSendMessageAsync", message) .Returns(Task.CompletedTask).Verifiable(); await _connection.SendMessageAsync(message); _mockConnection.Verify(); } + [Fact] + public async Task MessageCountAsync() + { + var tc = new TaskCompletionSource(); + var message = new OperationMessage(); + _mockConnection.Setup(x => x.SendMessageAsync(It.IsAny())).CallBase().Verifiable(); + _mockConnection.Protected().Setup("OnSendMessageAsync", message) + .Returns(tc.Task).Verifiable(); + await _connection.SendMessageAsync(message); + _connection.Get_SendQueueCount.ShouldBe(1); + await _connection.SendMessageAsync(message); + _connection.Get_SendQueueCount.ShouldBe(2); + tc.SetResult(true); + for (int i = 0; i < 100; i++) + { + if (_connection.Get_SendQueueCount != 0) + await Task.Delay(100); + else + break; + } + _connection.Get_SendQueueCount.ShouldBe(0); + _mockConnection.Verify(); + } + [Fact] public async Task LastMessageSentAt() { var oldTime = _connection.LastMessageSentAt; await Task.Delay(100); var message = new OperationMessage(); + _mockConnection.Setup(x => x.SendMessageAsync(It.IsAny())).CallBase().Verifiable(); _mockConnection.Protected().Setup("OnSendMessageAsync", message) .Returns(Task.CompletedTask).Verifiable(); await _connection.SendMessageAsync(message); @@ -623,6 +649,7 @@ public async Task DoNotSendMessagesAfterOutputIsClosed() { // send a message var message = new OperationMessage(); + _mockConnection.Setup(x => x.SendMessageAsync(It.IsAny())).CallBase().Verifiable(); _mockConnection.Protected().SetupGet("DefaultDisconnectionTimeout").CallBase().Verifiable(); _mockConnection.Protected().Setup("OnSendMessageAsync", message) .Returns(Task.CompletedTask).Verifiable();