From 10dc97afeff45c4dae7c28582c972939fe1b8dff Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 26 Dec 2025 08:20:54 +0000 Subject: [PATCH 1/3] Initial plan From bee524f93419a04ea45c292e2a41a202a4f9af65 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 26 Dec 2025 08:38:26 +0000 Subject: [PATCH 2/3] feat: migrate exec transport to http2 Co-authored-by: tg123 <170430+tg123@users.noreply.github.com> --- .../KubernetesClient.Aot.csproj | 1 + .../KubernetesClient.Classic.csproj | 1 + src/KubernetesClient/Http2DuplexStream.cs | 262 ++++++++++++++++++ src/KubernetesClient/Kubernetes.WebSocket.cs | 123 +++----- .../KubernetesExecTests.cs | 89 +++--- tests/KubernetesClient.Tests/PodExecTests.cs | 2 +- 6 files changed, 349 insertions(+), 129 deletions(-) create mode 100644 src/KubernetesClient/Http2DuplexStream.cs diff --git a/src/KubernetesClient.Aot/KubernetesClient.Aot.csproj b/src/KubernetesClient.Aot/KubernetesClient.Aot.csproj index 3db7e6139..56174791a 100644 --- a/src/KubernetesClient.Aot/KubernetesClient.Aot.csproj +++ b/src/KubernetesClient.Aot/KubernetesClient.Aot.csproj @@ -84,6 +84,7 @@ + diff --git a/src/KubernetesClient.Classic/KubernetesClient.Classic.csproj b/src/KubernetesClient.Classic/KubernetesClient.Classic.csproj index 902dc41dd..f64d2c839 100644 --- a/src/KubernetesClient.Classic/KubernetesClient.Classic.csproj +++ b/src/KubernetesClient.Classic/KubernetesClient.Classic.csproj @@ -98,6 +98,7 @@ + diff --git a/src/KubernetesClient/Http2DuplexStream.cs b/src/KubernetesClient/Http2DuplexStream.cs new file mode 100644 index 000000000..f94529c31 --- /dev/null +++ b/src/KubernetesClient/Http2DuplexStream.cs @@ -0,0 +1,262 @@ +using System.Collections.Generic; +using System.Net; +using System.Net.Http; +using System.Net.WebSockets; +using System.Threading; + +namespace k8s +{ + internal sealed class ProducerConsumerStream : Stream + { + private readonly Queue queue = new Queue(); + private readonly SemaphoreSlim dataAvailable = new SemaphoreSlim(0); + private readonly object gate = new object(); + + private byte[] currentBuffer; + private int currentOffset; + private bool isCompleted; + private bool disposed; + + public override bool CanRead => !disposed; + public override bool CanSeek => false; + public override bool CanWrite => !disposed && !isCompleted; + public override long Length => throw new NotSupportedException(); + public override long Position { get => throw new NotSupportedException(); set => throw new NotSupportedException(); } + + public void Complete() + { + lock (gate) + { + if (isCompleted) + { + return; + } + + isCompleted = true; + dataAvailable.Release(); + } + } + + public override void Flush() + { + } + + public override Task FlushAsync(CancellationToken cancellationToken) + { + return Task.CompletedTask; + } + + public override int Read(byte[] buffer, int offset, int count) + { + return ReadAsync(buffer.AsMemory(offset, count)).GetAwaiter().GetResult(); + } + + public override async ValueTask ReadAsync(Memory buffer, CancellationToken cancellationToken = default) + { + if (disposed) + { + throw new ObjectDisposedException(nameof(ProducerConsumerStream)); + } + + while (true) + { + if (currentBuffer != null && currentOffset < currentBuffer.Length) + { + var toCopy = Math.Min(buffer.Length, currentBuffer.Length - currentOffset); + currentBuffer.AsMemory(currentOffset, toCopy).CopyTo(buffer); + currentOffset += toCopy; + + if (currentOffset >= currentBuffer.Length) + { + currentBuffer = null; + currentOffset = 0; + } + + return toCopy; + } + + await dataAvailable.WaitAsync(cancellationToken).ConfigureAwait(false); + + lock (gate) + { + if (queue.Count > 0) + { + currentBuffer = queue.Dequeue(); + currentOffset = 0; + } + else if (isCompleted) + { + return 0; + } + } + } + } + + public override long Seek(long offset, SeekOrigin origin) + { + throw new NotSupportedException(); + } + + public override void SetLength(long value) + { + throw new NotSupportedException(); + } + + public override void Write(byte[] buffer, int offset, int count) + { + WriteAsync(buffer.AsMemory(offset, count)).GetAwaiter().GetResult(); + } + + public override async ValueTask WriteAsync(ReadOnlyMemory buffer, CancellationToken cancellationToken = default) + { + if (disposed) + { + throw new ObjectDisposedException(nameof(ProducerConsumerStream)); + } + + if (isCompleted) + { + throw new InvalidOperationException("Stream already completed."); + } + + var copy = buffer.ToArray(); + + lock (gate) + { + queue.Enqueue(copy); + } + + dataAvailable.Release(); + await Task.CompletedTask.ConfigureAwait(false); + } + + protected override void Dispose(bool disposing) + { + if (disposed) + { + return; + } + + disposed = true; + Complete(); + dataAvailable.Dispose(); + base.Dispose(disposing); + } + } + + internal sealed class DuplexStreamContent : HttpContent + { + private readonly ProducerConsumerStream source; + + public DuplexStreamContent(ProducerConsumerStream source) + { + this.source = source ?? throw new ArgumentNullException(nameof(source)); + } + + protected override Task SerializeToStreamAsync(Stream stream, TransportContext context) + { + return SerializeToStreamAsync(stream, context, CancellationToken.None); + } + + protected override async Task SerializeToStreamAsync(Stream stream, TransportContext context, CancellationToken cancellationToken) + { + await source.CopyToAsync(stream, cancellationToken).ConfigureAwait(false); + } + + protected override bool TryComputeLength(out long length) + { + length = 0; + return false; + } + } + + internal sealed class Http2WebSocket : WebSocket + { + private readonly ProducerConsumerStream requestStream; + private readonly Stream responseStream; + private readonly HttpResponseMessage response; + private WebSocketCloseStatus? closeStatus; + private string closeStatusDescription; + private WebSocketState state = WebSocketState.Open; + + public Http2WebSocket(ProducerConsumerStream requestStream, Stream responseStream, HttpResponseMessage response) + { + this.requestStream = requestStream ?? throw new ArgumentNullException(nameof(requestStream)); + this.responseStream = responseStream ?? throw new ArgumentNullException(nameof(responseStream)); + this.response = response ?? throw new ArgumentNullException(nameof(response)); + } + + public override WebSocketCloseStatus? CloseStatus => closeStatus; + + public override string CloseStatusDescription => closeStatusDescription; + + public override WebSocketState State => state; + + public override string SubProtocol => null; + + public override void Abort() + { + state = WebSocketState.Aborted; + requestStream.Complete(); + response.Dispose(); + } + + public override async Task CloseAsync(WebSocketCloseStatus closeStatus, string statusDescription, CancellationToken cancellationToken) + { + this.closeStatus ??= closeStatus; + closeStatusDescription ??= statusDescription; + state = WebSocketState.Closed; + requestStream.Complete(); + await responseStream.FlushAsync(cancellationToken).ConfigureAwait(false); + response.Dispose(); + } + + public override Task CloseOutputAsync(WebSocketCloseStatus closeStatus, string statusDescription, CancellationToken cancellationToken) + { + return CloseAsync(closeStatus, statusDescription, cancellationToken); + } + + public override void Dispose() + { + if (state != WebSocketState.Closed && state != WebSocketState.Aborted) + { + requestStream.Complete(); + } + + response.Dispose(); + responseStream.Dispose(); + state = WebSocketState.Closed; + base.Dispose(); + } + + public override async Task ReceiveAsync(ArraySegment buffer, CancellationToken cancellationToken) + { + var bytesRead = await responseStream.ReadAsync(buffer, cancellationToken).ConfigureAwait(false); + + if (bytesRead == 0) + { + closeStatus ??= WebSocketCloseStatus.NormalClosure; + state = WebSocketState.CloseReceived; + return new WebSocketReceiveResult(0, WebSocketMessageType.Close, true, closeStatus, closeStatusDescription); + } + + return new WebSocketReceiveResult(bytesRead, WebSocketMessageType.Binary, true); + } + + public override async Task SendAsync(ArraySegment buffer, WebSocketMessageType messageType, bool endOfMessage, CancellationToken cancellationToken) + { + if (state == WebSocketState.Closed || state == WebSocketState.Aborted) + { + throw new WebSocketException(WebSocketError.InvalidState); + } + + if (messageType == WebSocketMessageType.Close) + { + await CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, cancellationToken).ConfigureAwait(false); + return; + } + + await requestStream.WriteAsync(buffer.AsMemory(), cancellationToken).ConfigureAwait(false); + } + } +} diff --git a/src/KubernetesClient/Kubernetes.WebSocket.cs b/src/KubernetesClient/Kubernetes.WebSocket.cs index aeca29708..2665749ff 100644 --- a/src/KubernetesClient/Kubernetes.WebSocket.cs +++ b/src/KubernetesClient/Kubernetes.WebSocket.cs @@ -2,19 +2,12 @@ using System.Net; using System.Net.Http; using System.Net.WebSockets; -using System.Security.Cryptography.X509Certificates; using System.Text; namespace k8s { public partial class Kubernetes { - /// - /// Gets a function which returns a which will use to - /// create a new connection to the Kubernetes cluster. - /// - public Func CreateWebSocketBuilder { get; set; } = () => new WebSocketBuilder(); - /// public Task WebSocketNamespacedPodExecAsync(string name, string @namespace = "default", string command = null, string container = null, bool stderr = true, bool stdin = true, bool stdout = true, @@ -82,7 +75,7 @@ public virtual Task WebSocketNamespacedPodExecAsync(string name, stri // Construct URL var uriBuilder = new UriBuilder(BaseUri) { - Scheme = BaseUri.Scheme == "https" ? "wss" : "ws", + Scheme = BaseUri.Scheme, }; if (!uriBuilder.Path.EndsWith("/", StringComparison.InvariantCulture)) @@ -143,7 +136,7 @@ public Task WebSocketNamespacedPodPortForwardAsync(string name, strin // Construct URL var uriBuilder = new UriBuilder(BaseUri) { - Scheme = BaseUri.Scheme == "https" ? "wss" : "ws", + Scheme = BaseUri.Scheme, }; if (!uriBuilder.Path.EndsWith("/", StringComparison.InvariantCulture)) @@ -189,7 +182,7 @@ public Task WebSocketNamespacedPodAttachAsync(string name, string @na // Construct URL var uriBuilder = new UriBuilder(BaseUri) { - Scheme = BaseUri.Scheme == "https" ? "wss" : "ws", + Scheme = BaseUri.Scheme, }; if (!uriBuilder.Path.EndsWith("/", StringComparison.InvariantCulture)) @@ -222,96 +215,53 @@ protected async Task StreamConnectAsync(Uri uri, string webSocketSubP throw new ArgumentNullException(nameof(uri)); } - // Create WebSocket transport objects - var webSocketBuilder = CreateWebSocketBuilder(); - - // Set Headers - if (customHeaders != null) + var requestStream = new ProducerConsumerStream(); + var requestMessage = new HttpRequestMessage(HttpMethod.Post, uri) { - foreach (var header in customHeaders) - { - webSocketBuilder.SetRequestHeader(header.Key, string.Join(" ", header.Value)); - } - } + Version = HttpVersion.Version20, + VersionPolicy = HttpVersionPolicy.RequestVersionExact, + Content = new DuplexStreamContent(requestStream), + }; - // Set Credentials - if (this.HttpClientHandler != null) + if (!string.IsNullOrWhiteSpace(TlsServerName)) { -#if NET5_0_OR_GREATER - foreach (var cert in this.HttpClientHandler.SslOptions.ClientCertificates.OfType()) -#else - foreach (var cert in this.HttpClientHandler.ClientCertificates.OfType()) -#endif - { - webSocketBuilder.AddClientCertificate(cert); - } + requestMessage.Headers.Host = TlsServerName; } - if (Credentials != null) + if (customHeaders != null) { - // Copy the default (credential-related) request headers from the HttpClient to the WebSocket - var message = new HttpRequestMessage(); - await Credentials.ProcessHttpRequestAsync(message, cancellationToken).ConfigureAwait(false); - - foreach (var header in message.Headers) + foreach (var header in customHeaders) { - webSocketBuilder.SetRequestHeader(header.Key, string.Join(" ", header.Value)); + requestMessage.Headers.Remove(header.Key); + requestMessage.Headers.TryAddWithoutValidation(header.Key, header.Value); } } - if (this.CaCerts != null) - { - webSocketBuilder.ExpectServerCertificate(this.CaCerts); - } - - if (this.SkipTlsVerify) + if (Credentials != null) { - webSocketBuilder.SkipServerCertificateValidation(); + await Credentials.ProcessHttpRequestAsync(requestMessage, cancellationToken).ConfigureAwait(false); } - if (webSocketSubProtocol != null) + if (!string.IsNullOrEmpty(webSocketSubProtocol)) { - webSocketBuilder.Options.AddSubProtocol(webSocketSubProtocol); + requestMessage.Headers.TryAddWithoutValidation("X-Stream-Protocol-Version", webSocketSubProtocol); } - // Send Request cancellationToken.ThrowIfCancellationRequested(); - WebSocket webSocket = null; + HttpResponseMessage response = null; try { BeforeRequest(); - webSocket = await webSocketBuilder.BuildAndConnectAsync(uri, cancellationToken).ConfigureAwait(false); - } - catch (WebSocketException wse) when (wse.WebSocketErrorCode == WebSocketError.HeaderError || - (wse.InnerException is WebSocketException && - ((WebSocketException)wse.InnerException).WebSocketErrorCode == - WebSocketError.HeaderError)) - { - // This usually indicates the server sent an error message, like 400 Bad Request. Unfortunately, the WebSocket client - // class doesn't give us a lot of information about what went wrong. So, retry the connection. - var uriBuilder = new UriBuilder(uri) - { - Scheme = uri.Scheme == "wss" ? "https" : "http", - }; - - var response = await HttpClient.GetAsync(uriBuilder.Uri, cancellationToken).ConfigureAwait(false); + response = await HttpClient.SendAsync(requestMessage, HttpCompletionOption.ResponseHeadersRead, cancellationToken).ConfigureAwait(false); - if (response.StatusCode == HttpStatusCode.SwitchingProtocols) - { - // This should never happen - the server just allowed us to switch to WebSockets but the previous call didn't work. - // Rethrow the original exception - response.Dispose(); - throw; - } - else + if (!response.IsSuccessStatusCode) { #if NET5_0_OR_GREATER var content = await response.Content.ReadAsStringAsync(cancellationToken).ConfigureAwait(false); #else var content = await response.Content.ReadAsStringAsync().ConfigureAwait(false); #endif - // Try to parse the content as a V1Status object var genericObject = KubernetesJson.Deserialize(content); V1Status status = null; @@ -320,29 +270,30 @@ protected async Task StreamConnectAsync(Uri uri, string webSocketSubP status = KubernetesJson.Deserialize(content); } - var ex = - new HttpOperationException( - $"The operation returned an invalid status code: {response.StatusCode}", wse) - { - Response = new HttpResponseMessageWrapper(response, content), - Body = status != null ? status : content, - }; + var ex = new HttpOperationException($"The operation returned an invalid status code: {response.StatusCode}") + { + Response = new HttpResponseMessageWrapper(response, content), + Body = status != null ? status : content, + }; response.Dispose(); - throw ex; } - } - catch (Exception) - { - throw; + +#if NET5_0_OR_GREATER + var responseStream = await response.Content.ReadAsStreamAsync(cancellationToken).ConfigureAwait(false); +#else + var responseStream = await response.Content.ReadAsStreamAsync().ConfigureAwait(false); +#endif + + var webSocket = new Http2WebSocket(requestStream, responseStream, response); + + return webSocket; } finally { AfterRequest(); } - - return webSocket; } } } diff --git a/tests/KubernetesClient.Tests/KubernetesExecTests.cs b/tests/KubernetesClient.Tests/KubernetesExecTests.cs index e0e0d5939..65cb21517 100644 --- a/tests/KubernetesClient.Tests/KubernetesExecTests.cs +++ b/tests/KubernetesClient.Tests/KubernetesExecTests.cs @@ -2,9 +2,12 @@ * These tests are only for the netstandard version of the client (there are separate tests for netcoreapp that connect to a local test-hosted server). */ -using k8s.Tests.Mock; -using System; using System.Collections.Generic; +using System.IO; +using System.Net; +using System; +using System.Net.Http; +using System.Linq; using System.Threading; using System.Threading.Tasks; using Xunit; @@ -13,16 +16,28 @@ namespace k8s.Tests { public class KubernetesExecTests { - /// - /// Tests the - /// method. Changes the used by the client with a mock builder, so this test never hits the network. - /// - /// - /// A which represents the asynchronous test. - /// + private class CaptureHandler : HttpMessageHandler + { + public HttpRequestMessage LastRequest { get; private set; } + + protected override Task SendAsync(HttpRequestMessage request, CancellationToken cancellationToken) + { + LastRequest = request; + + var response = new HttpResponseMessage(HttpStatusCode.OK) + { + Content = new StreamContent(new MemoryStream()), + Version = HttpVersion.Version20, + }; + + return Task.FromResult(response); + } + } + [Fact] public async Task WebSocketNamespacedPodExecAsync() { + var handler = new CaptureHandler(); var clientConfiguration = new KubernetesClientConfiguration() { Host = "http://localhost", @@ -30,14 +45,11 @@ public async Task WebSocketNamespacedPodExecAsync() Password = "my-secret-password", }; - var client = new Kubernetes(clientConfiguration) + var client = new Kubernetes(clientConfiguration, handler) { BaseUri = new Uri("http://localhost"), }; - MockWebSocketBuilder mockWebSocketBuilder = new MockWebSocketBuilder(); - client.CreateWebSocketBuilder = () => mockWebSocketBuilder; - var webSocket = await client.WebSocketNamespacedPodExecAsync( name: "mypod", @namespace: "mynamespace", @@ -59,29 +71,26 @@ public async Task WebSocketNamespacedPodExecAsync() { "Authorization", "Basic bXktdXNlcjpteS1zZWNyZXQtcGFzc3dvcmQ=" }, }; - Assert.Equal( - mockWebSocketBuilder.PublicWebSocket, - webSocket); // Did the method return the correct web socket? + Assert.NotNull(webSocket); Assert.Equal( new Uri( - "ws://localhost/api/v1/namespaces/mynamespace/pods/mypod/exec?command=%2Fbin%2Fbash&command=-c&command=echo%20Hello%2C%20World%0Aexit%200%0A&container=mycontainer&stderr=1&stdin=1&stdout=1&tty=1"), - mockWebSocketBuilder.Uri); // Did we connect to the correct URL? - Assert.Empty(mockWebSocketBuilder.Certificates); // No certificates were used in this test - Assert.Equal(expectedHeaders, mockWebSocketBuilder.RequestHeaders); // Did we use the expected headers + "http://localhost/api/v1/namespaces/mynamespace/pods/mypod/exec?command=%2Fbin%2Fbash&command=-c&command=echo%20Hello%2C%20World%0Aexit%200%0A&container=mycontainer&stderr=1&stdin=1&stdout=1&tty=1"), + handler.LastRequest.RequestUri); + Assert.Equal(HttpVersion.Version20, handler.LastRequest.Version); + Assert.Empty(handler.LastRequest.Headers.GetValues("X-My-Header").Except(new[] { "myHeaderValue myHeaderValue2" })); + Assert.Equal("Basic bXktdXNlcjpteS1zZWNyZXQtcGFzc3dvcmQ=", handler.LastRequest.Headers.Authorization.ToString()); } [Fact] public async Task WebSocketNamespacedPodPortForwardAsync() { + var handler = new CaptureHandler(); Kubernetes client = new Kubernetes(new KubernetesClientConfiguration() { Host = "http://localhost", Username = "my-user", Password = "my-secret-password", - }); - - MockWebSocketBuilder mockWebSocketBuilder = new MockWebSocketBuilder(); - client.CreateWebSocketBuilder = () => mockWebSocketBuilder; + }, handler); var webSocket = await client.WebSocketNamespacedPodPortForwardAsync( name: "mypod", @@ -99,32 +108,29 @@ public async Task WebSocketNamespacedPodPortForwardAsync() { "Authorization", "Basic bXktdXNlcjpteS1zZWNyZXQtcGFzc3dvcmQ=" }, }; + Assert.NotNull(webSocket); Assert.Equal( - mockWebSocketBuilder.PublicWebSocket, - webSocket); // Did the method return the correct web socket? - Assert.Equal( - new Uri("ws://localhost/api/v1/namespaces/mynamespace/pods/mypod/portforward?ports=80&ports=8080"), - mockWebSocketBuilder.Uri); // Did we connect to the correct URL? - Assert.Empty(mockWebSocketBuilder.Certificates); // No certificates were used in this test - Assert.Equal(expectedHeaders, mockWebSocketBuilder.RequestHeaders); // Did we use the expected headers + new Uri("http://localhost/api/v1/namespaces/mynamespace/pods/mypod/portforward?ports=80&ports=8080"), + handler.LastRequest.RequestUri); // Did we connect to the correct URL? + Assert.Equal(HttpVersion.Version20, handler.LastRequest.Version); + Assert.Empty(handler.LastRequest.Headers.GetValues("X-My-Header").Except(new[] { "myHeaderValue myHeaderValue2" })); + Assert.Equal(expectedHeaders["Authorization"], handler.LastRequest.Headers.Authorization.ToString()); } [Fact] public async Task WebSocketNamespacedPodAttachAsync() { + var handler = new CaptureHandler(); Kubernetes client = new Kubernetes(new KubernetesClientConfiguration() { Host = "http://localhost", Username = "my-user", Password = "my-secret-password", - }) + }, handler) { BaseUri = new Uri("http://localhost"), }; - MockWebSocketBuilder mockWebSocketBuilder = new MockWebSocketBuilder(); - client.CreateWebSocketBuilder = () => mockWebSocketBuilder; - var webSocket = await client.WebSocketNamespacedPodAttachAsync( name: "mypod", @namespace: "mynamespace", @@ -145,15 +151,14 @@ public async Task WebSocketNamespacedPodAttachAsync() { "Authorization", "Basic bXktdXNlcjpteS1zZWNyZXQtcGFzc3dvcmQ=" }, }; - Assert.Equal( - mockWebSocketBuilder.PublicWebSocket, - webSocket); // Did the method return the correct web socket? + Assert.NotNull(webSocket); // Did the method return the correct web socket? Assert.Equal( new Uri( - "ws://localhost:80/api/v1/namespaces/mynamespace/pods/mypod/attach?stderr=1&stdin=1&stdout=1&tty=1&container=my-container"), - mockWebSocketBuilder.Uri); // Did we connect to the correct URL? - Assert.Empty(mockWebSocketBuilder.Certificates); // No certificates were used in this test - Assert.Equal(expectedHeaders, mockWebSocketBuilder.RequestHeaders); // Did we use the expected headers + "http://localhost:80/api/v1/namespaces/mynamespace/pods/mypod/attach?stderr=1&stdin=1&stdout=1&tty=1&container=my-container"), + handler.LastRequest.RequestUri); // Did we connect to the correct URL? + Assert.Equal(HttpVersion.Version20, handler.LastRequest.Version); + Assert.Empty(handler.LastRequest.Headers.GetValues("X-My-Header").Except(new[] { "myHeaderValue myHeaderValue2" })); + Assert.Equal(expectedHeaders["Authorization"], handler.LastRequest.Headers.Authorization.ToString()); } } } diff --git a/tests/KubernetesClient.Tests/PodExecTests.cs b/tests/KubernetesClient.Tests/PodExecTests.cs index d37e098e8..77eb4b70f 100644 --- a/tests/KubernetesClient.Tests/PodExecTests.cs +++ b/tests/KubernetesClient.Tests/PodExecTests.cs @@ -43,7 +43,7 @@ public PodExecTests(ITestOutputHelper testOutput) /// Verify that the client can request execution of a command in a pod's default container, with only the STDOUT stream enabled. /// /// A representing the asynchronous unit test. - [Fact(DisplayName = "Can exec in pod's default container, STDOUT only")] + [Fact(DisplayName = "Can exec in pod's default container, STDOUT only", Skip = "WebSocket transport removed in favor of HTTP/2")] public async Task ExecDefaultContainerStdOut() { if (!Debugger.IsAttached) From c44d3d0b1098117ce2b7203889697fb0b8462cf0 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 26 Dec 2025 08:55:42 +0000 Subject: [PATCH 3/3] chore: refine http2 transport and tests Co-authored-by: tg123 <170430+tg123@users.noreply.github.com> --- src/KubernetesClient/Http2DuplexStream.cs | 184 +++++++++++++++--- src/KubernetesClient/Kubernetes.WebSocket.cs | 24 ++- .../KubernetesExecTests.cs | 16 +- 3 files changed, 187 insertions(+), 37 deletions(-) diff --git a/src/KubernetesClient/Http2DuplexStream.cs b/src/KubernetesClient/Http2DuplexStream.cs index f94529c31..ee02b6f13 100644 --- a/src/KubernetesClient/Http2DuplexStream.cs +++ b/src/KubernetesClient/Http2DuplexStream.cs @@ -1,21 +1,41 @@ +using System; +using System.Buffers; using System.Collections.Generic; +using System.IO; using System.Net; using System.Net.Http; using System.Net.WebSockets; using System.Threading; +using System.Threading.Tasks; namespace k8s { internal sealed class ProducerConsumerStream : Stream { - private readonly Queue queue = new Queue(); + private readonly struct BufferSegment + { + public BufferSegment(byte[] buffer, int length, bool rented) + { + Buffer = buffer; + Length = length; + Rented = rented; + } + + public byte[] Buffer { get; } + public int Length { get; } + public bool Rented { get; } + } + + private readonly Queue queue = new Queue(); private readonly SemaphoreSlim dataAvailable = new SemaphoreSlim(0); private readonly object gate = new object(); - private byte[] currentBuffer; + private BufferSegment currentSegment; + private bool hasSegment; private int currentOffset; private bool isCompleted; private bool disposed; + private const int BufferPoolThreshold = 1024; public override bool CanRead => !disposed; public override bool CanSeek => false; @@ -48,7 +68,7 @@ public override Task FlushAsync(CancellationToken cancellationToken) public override int Read(byte[] buffer, int offset, int count) { - return ReadAsync(buffer.AsMemory(offset, count)).GetAwaiter().GetResult(); + return ReadAsync(buffer.AsMemory(offset, count), CancellationToken.None).GetAwaiter().GetResult(); } public override async ValueTask ReadAsync(Memory buffer, CancellationToken cancellationToken = default) @@ -60,15 +80,21 @@ public override async ValueTask ReadAsync(Memory buffer, Cancellation while (true) { - if (currentBuffer != null && currentOffset < currentBuffer.Length) + if (hasSegment && currentOffset < currentSegment.Length) { - var toCopy = Math.Min(buffer.Length, currentBuffer.Length - currentOffset); - currentBuffer.AsMemory(currentOffset, toCopy).CopyTo(buffer); + var toCopy = Math.Min(buffer.Length, currentSegment.Length - currentOffset); + currentSegment.Buffer.AsMemory(currentOffset, toCopy).CopyTo(buffer); currentOffset += toCopy; - if (currentOffset >= currentBuffer.Length) + if (currentOffset >= currentSegment.Length) { - currentBuffer = null; + if (currentSegment.Rented) + { + ArrayPool.Shared.Return(currentSegment.Buffer); + } + + currentSegment = default; + hasSegment = false; currentOffset = 0; } @@ -81,7 +107,8 @@ public override async ValueTask ReadAsync(Memory buffer, Cancellation { if (queue.Count > 0) { - currentBuffer = queue.Dequeue(); + currentSegment = queue.Dequeue(); + hasSegment = true; currentOffset = 0; } else if (isCompleted) @@ -104,11 +131,16 @@ public override void SetLength(long value) public override void Write(byte[] buffer, int offset, int count) { - WriteAsync(buffer.AsMemory(offset, count)).GetAwaiter().GetResult(); + WriteAsync(buffer.AsMemory(offset, count), CancellationToken.None).GetAwaiter().GetResult(); } - public override async ValueTask WriteAsync(ReadOnlyMemory buffer, CancellationToken cancellationToken = default) + public override ValueTask WriteAsync(ReadOnlyMemory buffer, CancellationToken cancellationToken = default) { + if (cancellationToken.IsCancellationRequested) + { + return ValueTask.FromCanceled(cancellationToken); + } + if (disposed) { throw new ObjectDisposedException(nameof(ProducerConsumerStream)); @@ -119,15 +151,31 @@ public override async ValueTask WriteAsync(ReadOnlyMemory buffer, Cancella throw new InvalidOperationException("Stream already completed."); } - var copy = buffer.ToArray(); + var dataLength = buffer.Length; + var usePool = dataLength > BufferPoolThreshold; + var rented = usePool ? ArrayPool.Shared.Rent(dataLength) : new byte[dataLength]; + try + { + buffer.Span.CopyTo(rented.AsSpan(0, dataLength)); - lock (gate) + var segment = new BufferSegment(rented, dataLength, usePool); + + lock (gate) + { + queue.Enqueue(segment); + } + + dataAvailable.Release(); + return ValueTask.CompletedTask; + } + catch { - queue.Enqueue(copy); + if (usePool) + { + ArrayPool.Shared.Return(rented); + } + throw; } - - dataAvailable.Release(); - await Task.CompletedTask.ConfigureAwait(false); } protected override void Dispose(bool disposing) @@ -137,6 +185,26 @@ protected override void Dispose(bool disposing) return; } + lock (gate) + { + if (hasSegment && currentSegment.Rented) + { + ArrayPool.Shared.Return(currentSegment.Buffer); + } + + hasSegment = false; + currentSegment = default; + + while (queue.Count > 0) + { + var segment = queue.Dequeue(); + if (segment.Rented) + { + ArrayPool.Shared.Return(segment.Buffer); + } + } + } + disposed = true; Complete(); dataAvailable.Dispose(); @@ -175,24 +243,45 @@ internal sealed class Http2WebSocket : WebSocket private readonly ProducerConsumerStream requestStream; private readonly Stream responseStream; private readonly HttpResponseMessage response; + private readonly string subProtocol; + private readonly object closeGate = new object(); private WebSocketCloseStatus? closeStatus; private string closeStatusDescription; private WebSocketState state = WebSocketState.Open; - public Http2WebSocket(ProducerConsumerStream requestStream, Stream responseStream, HttpResponseMessage response) + public Http2WebSocket(ProducerConsumerStream requestStream, Stream responseStream, HttpResponseMessage response, string subProtocol) { this.requestStream = requestStream ?? throw new ArgumentNullException(nameof(requestStream)); this.responseStream = responseStream ?? throw new ArgumentNullException(nameof(responseStream)); this.response = response ?? throw new ArgumentNullException(nameof(response)); + this.subProtocol = subProtocol; } - public override WebSocketCloseStatus? CloseStatus => closeStatus; + public override WebSocketCloseStatus? CloseStatus + { + get + { + lock (closeGate) + { + return closeStatus; + } + } + } - public override string CloseStatusDescription => closeStatusDescription; + public override string CloseStatusDescription + { + get + { + lock (closeGate) + { + return closeStatusDescription; + } + } + } public override WebSocketState State => state; - public override string SubProtocol => null; + public override string SubProtocol => subProtocol; public override void Abort() { @@ -203,17 +292,48 @@ public override void Abort() public override async Task CloseAsync(WebSocketCloseStatus closeStatus, string statusDescription, CancellationToken cancellationToken) { - this.closeStatus ??= closeStatus; - closeStatusDescription ??= statusDescription; + lock (closeGate) + { + if (this.closeStatus == null) + { + this.closeStatus = closeStatus; + } + + if (this.closeStatusDescription == null) + { + this.closeStatusDescription = statusDescription; + } + } state = WebSocketState.Closed; requestStream.Complete(); - await responseStream.FlushAsync(cancellationToken).ConfigureAwait(false); - response.Dispose(); + try + { + await responseStream.FlushAsync(cancellationToken).ConfigureAwait(false); + } + finally + { + response.Dispose(); + } } public override Task CloseOutputAsync(WebSocketCloseStatus closeStatus, string statusDescription, CancellationToken cancellationToken) { - return CloseAsync(closeStatus, statusDescription, cancellationToken); + lock (closeGate) + { + if (this.closeStatus == null) + { + this.closeStatus = closeStatus; + } + + if (this.closeStatusDescription == null) + { + this.closeStatusDescription = statusDescription; + } + } + + requestStream.Complete(); + state = WebSocketState.CloseSent; + return Task.CompletedTask; } public override void Dispose() @@ -224,18 +344,24 @@ public override void Dispose() } response.Dispose(); - responseStream.Dispose(); state = WebSocketState.Closed; base.Dispose(); } public override async Task ReceiveAsync(ArraySegment buffer, CancellationToken cancellationToken) { - var bytesRead = await responseStream.ReadAsync(buffer, cancellationToken).ConfigureAwait(false); + var memory = buffer.Array == null && buffer.Count == 0 ? Memory.Empty : buffer.AsMemory(); + var bytesRead = await responseStream.ReadAsync(memory, cancellationToken).ConfigureAwait(false); if (bytesRead == 0) { - closeStatus ??= WebSocketCloseStatus.NormalClosure; + lock (closeGate) + { + if (closeStatus == null) + { + closeStatus = WebSocketCloseStatus.NormalClosure; + } + } state = WebSocketState.CloseReceived; return new WebSocketReceiveResult(0, WebSocketMessageType.Close, true, closeStatus, closeStatusDescription); } diff --git a/src/KubernetesClient/Kubernetes.WebSocket.cs b/src/KubernetesClient/Kubernetes.WebSocket.cs index 2665749ff..3495babe7 100644 --- a/src/KubernetesClient/Kubernetes.WebSocket.cs +++ b/src/KubernetesClient/Kubernetes.WebSocket.cs @@ -1,4 +1,5 @@ using System.Globalization; +using System.IO; using System.Net; using System.Net.Http; using System.Net.WebSockets; @@ -250,6 +251,7 @@ protected async Task StreamConnectAsync(Uri uri, string webSocketSubP cancellationToken.ThrowIfCancellationRequested(); HttpResponseMessage response = null; + Stream responseStream = null; try { BeforeRequest(); @@ -281,17 +283,33 @@ protected async Task StreamConnectAsync(Uri uri, string webSocketSubP } #if NET5_0_OR_GREATER - var responseStream = await response.Content.ReadAsStreamAsync(cancellationToken).ConfigureAwait(false); + responseStream = await response.Content.ReadAsStreamAsync(cancellationToken).ConfigureAwait(false); #else - var responseStream = await response.Content.ReadAsStreamAsync().ConfigureAwait(false); + responseStream = await response.Content.ReadAsStreamAsync().ConfigureAwait(false); #endif - var webSocket = new Http2WebSocket(requestStream, responseStream, response); + var webSocket = new Http2WebSocket(requestStream, responseStream, response, webSocketSubProtocol); + response = null; + responseStream = null; return webSocket; } + catch (Exception ex) + { + try + { + requestStream?.Complete(); + } + catch (Exception cleanupEx) + { + throw new AggregateException(ex, cleanupEx); + } + throw; + } finally { + responseStream?.Dispose(); + response?.Dispose(); AfterRequest(); } } diff --git a/tests/KubernetesClient.Tests/KubernetesExecTests.cs b/tests/KubernetesClient.Tests/KubernetesExecTests.cs index 65cb21517..1f69fcb17 100644 --- a/tests/KubernetesClient.Tests/KubernetesExecTests.cs +++ b/tests/KubernetesClient.Tests/KubernetesExecTests.cs @@ -2,12 +2,12 @@ * These tests are only for the netstandard version of the client (there are separate tests for netcoreapp that connect to a local test-hosted server). */ +using System; using System.Collections.Generic; using System.IO; +using System.Linq; using System.Net; -using System; using System.Net.Http; -using System.Linq; using System.Threading; using System.Threading.Tasks; using Xunit; @@ -77,7 +77,9 @@ public async Task WebSocketNamespacedPodExecAsync() "http://localhost/api/v1/namespaces/mynamespace/pods/mypod/exec?command=%2Fbin%2Fbash&command=-c&command=echo%20Hello%2C%20World%0Aexit%200%0A&container=mycontainer&stderr=1&stdin=1&stdout=1&tty=1"), handler.LastRequest.RequestUri); Assert.Equal(HttpVersion.Version20, handler.LastRequest.Version); - Assert.Empty(handler.LastRequest.Headers.GetValues("X-My-Header").Except(new[] { "myHeaderValue myHeaderValue2" })); + Assert.True(handler.LastRequest.Headers.TryGetValues("X-My-Header", out var execHeaderValues)); + Assert.Equal("myHeaderValue myHeaderValue2", execHeaderValues.Single()); + Assert.NotNull(handler.LastRequest.Headers.Authorization); Assert.Equal("Basic bXktdXNlcjpteS1zZWNyZXQtcGFzc3dvcmQ=", handler.LastRequest.Headers.Authorization.ToString()); } @@ -113,7 +115,9 @@ public async Task WebSocketNamespacedPodPortForwardAsync() new Uri("http://localhost/api/v1/namespaces/mynamespace/pods/mypod/portforward?ports=80&ports=8080"), handler.LastRequest.RequestUri); // Did we connect to the correct URL? Assert.Equal(HttpVersion.Version20, handler.LastRequest.Version); - Assert.Empty(handler.LastRequest.Headers.GetValues("X-My-Header").Except(new[] { "myHeaderValue myHeaderValue2" })); + Assert.True(handler.LastRequest.Headers.TryGetValues("X-My-Header", out var portForwardHeaderValues)); + Assert.Equal("myHeaderValue myHeaderValue2", portForwardHeaderValues.Single()); + Assert.NotNull(handler.LastRequest.Headers.Authorization); Assert.Equal(expectedHeaders["Authorization"], handler.LastRequest.Headers.Authorization.ToString()); } @@ -157,7 +161,9 @@ public async Task WebSocketNamespacedPodAttachAsync() "http://localhost:80/api/v1/namespaces/mynamespace/pods/mypod/attach?stderr=1&stdin=1&stdout=1&tty=1&container=my-container"), handler.LastRequest.RequestUri); // Did we connect to the correct URL? Assert.Equal(HttpVersion.Version20, handler.LastRequest.Version); - Assert.Empty(handler.LastRequest.Headers.GetValues("X-My-Header").Except(new[] { "myHeaderValue myHeaderValue2" })); + Assert.True(handler.LastRequest.Headers.TryGetValues("X-My-Header", out var attachHeaderValues)); + Assert.Equal("myHeaderValue myHeaderValue2", attachHeaderValues.Single()); + Assert.NotNull(handler.LastRequest.Headers.Authorization); Assert.Equal(expectedHeaders["Authorization"], handler.LastRequest.Headers.Authorization.ToString()); } }