From 3f8cb7ba57f5c49e256d6800276ecccb6b59ed5f Mon Sep 17 00:00:00 2001 From: Caleb Lloyd Date: Sat, 3 Feb 2024 13:58:00 -0500 Subject: [PATCH 1/2] publish - avoid async state machine when possible Signed-off-by: Caleb Lloyd --- .../Commands/CommandWriter.cs | 662 +++++++++++++----- src/NATS.Client.Core/NatsOpts.cs | 4 +- .../CancellationTest.cs | 4 +- 3 files changed, 487 insertions(+), 183 deletions(-) diff --git a/src/NATS.Client.Core/Commands/CommandWriter.cs b/src/NATS.Client.Core/Commands/CommandWriter.cs index 158eb874b..5c726b528 100644 --- a/src/NATS.Client.Core/Commands/CommandWriter.cs +++ b/src/NATS.Client.Core/Commands/CommandWriter.cs @@ -18,8 +18,13 @@ namespace NATS.Client.Core.Commands; /// internal sealed class CommandWriter : IAsyncDisposable { + // memory segment used to consolidate multiple small memory chunks + // 8520 should fit into 6 packets on 1500 MTU TLS connection or 1 packet on 9000 MTU TLS connection + // assuming 40 bytes TCP overhead + 40 bytes TLS overhead per packet + private const int SendMemSize = 8520; + // set to a reasonable socket write mem size - private const int MaxSendSize = 16384; + private const int MinSegmentSize = 65536; private readonly ILogger _logger; private readonly NatsConnection _connection; @@ -28,6 +33,7 @@ internal sealed class CommandWriter : IAsyncDisposable private readonly object _lock = new(); private readonly CancellationTokenSource _cts; private readonly ConnectionStatsCounter _counter; + private readonly Memory _consolidateMem = new byte[SendMemSize].AsMemory(); private readonly TimeSpan _defaultCommandTimeout; private readonly Action _enqueuePing; private readonly ProtocolWriter _protocolWriter; @@ -37,6 +43,7 @@ internal sealed class CommandWriter : IAsyncDisposable private readonly CancellationTimerPool _ctPool; private readonly PipeReader _pipeReader; private readonly PipeWriter _pipeWriter; + private readonly SemaphoreSlim _semLock = new(1); private ISocketConnection? _socketConnection; private Task? _flushTask; private Task? _readerLoopTask; @@ -65,6 +72,7 @@ public CommandWriter(NatsConnection connection, ObjectPool pool, NatsOpts opts, var pipe = new Pipe(new PipeOptions( pauseWriterThreshold: opts.WriterBufferSize, // flush will block after hitting resumeWriterThreshold: opts.WriterBufferSize / 2, + minimumSegmentSize: MinSegmentSize, useSynchronizationContext: false)); _pipeReader = pipe.Reader; _pipeWriter = pipe.Writer; @@ -86,7 +94,14 @@ public void Reset(ISocketConnection socketConnection) _readerLoopTask = Task.Run(async () => { - await ReaderLoopAsync(_logger, _socketConnection, _pipeReader, _channelSize, _ctsReader.Token).ConfigureAwait(false); + await ReaderLoopAsync( + _logger, + _socketConnection, + _pipeReader, + _channelSize, + _consolidateMem, + _ctsReader.Token) + .ConfigureAwait(false); }); } } @@ -143,10 +158,22 @@ public async ValueTask DisposeAsync() await readerTask.ConfigureAwait(false); } - public async ValueTask ConnectAsync(ClientOpts connectOpts, CancellationToken cancellationToken) + public ValueTask ConnectAsync(ClientOpts connectOpts, CancellationToken cancellationToken) { - var cancellationTimer = _ctPool.Start(_defaultCommandTimeout, cancellationToken); - await LockAsync(cancellationTimer.Token).ConfigureAwait(false); +#pragma warning disable CA2016 +#pragma warning disable VSTHRD103 + if (!_semLock.Wait(0)) +#pragma warning restore VSTHRD103 +#pragma warning restore CA2016 + { + return ConnectStateMachineAsync(false, connectOpts, cancellationToken); + } + + if (_flushTask is { IsCompletedSuccessfully: false }) + { + return ConnectStateMachineAsync(true, connectOpts, cancellationToken); + } + try { if (_disposed) @@ -154,28 +181,33 @@ public async ValueTask ConnectAsync(ClientOpts connectOpts, CancellationToken ca throw new ObjectDisposedException(nameof(CommandWriter)); } - if (_flushTask is { IsCompletedSuccessfully: false }) - { - await _flushTask.WaitAsync(cancellationTimer.Token).ConfigureAwait(false); - } - _protocolWriter.WriteConnect(_pipeWriter, connectOpts); - - _channelSize.Writer.TryWrite((int)_pipeWriter.UnflushedBytes); - var flush = _pipeWriter.FlushAsync(CancellationToken.None); - _flushTask = flush.IsCompletedSuccessfully ? null : flush.AsTask(); + EnqueueCommand(); } finally { - await UnLockAsync().ConfigureAwait(false); - cancellationTimer.TryReturn(); + _semLock.Release(); } + + return ValueTask.CompletedTask; } - public async ValueTask PingAsync(PingCommand pingCommand, CancellationToken cancellationToken) + public ValueTask PingAsync(PingCommand pingCommand, CancellationToken cancellationToken) { - var cancellationTimer = _ctPool.Start(_defaultCommandTimeout, cancellationToken); - await LockAsync(cancellationTimer.Token).ConfigureAwait(false); +#pragma warning disable CA2016 +#pragma warning disable VSTHRD103 + if (!_semLock.Wait(0)) +#pragma warning restore VSTHRD103 +#pragma warning restore CA2016 + { + return PingStateMachineAsync(false, pingCommand, cancellationToken); + } + + if (_flushTask is { IsCompletedSuccessfully: false }) + { + return PingStateMachineAsync(true, pingCommand, cancellationToken); + } + try { if (_disposed) @@ -183,29 +215,34 @@ public async ValueTask PingAsync(PingCommand pingCommand, CancellationToken canc throw new ObjectDisposedException(nameof(CommandWriter)); } - if (_flushTask is { IsCompletedSuccessfully: false }) - { - await _flushTask.WaitAsync(cancellationTimer.Token).ConfigureAwait(false); - } - - _enqueuePing(pingCommand); _protocolWriter.WritePing(_pipeWriter); - - _channelSize.Writer.TryWrite((int)_pipeWriter.UnflushedBytes); - var flush = _pipeWriter.FlushAsync(CancellationToken.None); - _flushTask = flush.IsCompletedSuccessfully ? null : flush.AsTask(); + _enqueuePing(pingCommand); + EnqueueCommand(); } finally { - await UnLockAsync().ConfigureAwait(false); - cancellationTimer.TryReturn(); + _semLock.Release(); } + + return ValueTask.CompletedTask; } - public async ValueTask PongAsync(CancellationToken cancellationToken = default) + public ValueTask PongAsync(CancellationToken cancellationToken = default) { - var cancellationTimer = _ctPool.Start(_defaultCommandTimeout, cancellationToken); - await LockAsync(cancellationTimer.Token).ConfigureAwait(false); +#pragma warning disable CA2016 +#pragma warning disable VSTHRD103 + if (!_semLock.Wait(0)) +#pragma warning restore VSTHRD103 +#pragma warning restore CA2016 + { + return PongStateMachineAsync(false, cancellationToken); + } + + if (_flushTask is { IsCompletedSuccessfully: false }) + { + return PongStateMachineAsync(true, cancellationToken); + } + try { if (_disposed) @@ -213,22 +250,15 @@ public async ValueTask PongAsync(CancellationToken cancellationToken = default) throw new ObjectDisposedException(nameof(CommandWriter)); } - if (_flushTask is { IsCompletedSuccessfully: false }) - { - await _flushTask.WaitAsync(cancellationTimer.Token).ConfigureAwait(false); - } - _protocolWriter.WritePong(_pipeWriter); - - _channelSize.Writer.TryWrite((int)_pipeWriter.UnflushedBytes); - var flush = _pipeWriter.FlushAsync(CancellationToken.None); - _flushTask = flush.IsCompletedSuccessfully ? null : flush.AsTask(); + EnqueueCommand(); } finally { - await UnLockAsync().ConfigureAwait(false); - cancellationTimer.TryReturn(); + _semLock.Release(); } + + return ValueTask.CompletedTask; } public ValueTask PublishAsync(string subject, T? value, NatsHeaders? headers, string? replyTo, INatsSerialize serializer, CancellationToken cancellationToken) @@ -238,28 +268,54 @@ public ValueTask PublishAsync(string subject, T? value, NatsHeaders? headers, { if (!_pool.TryRent(out headersBuffer)) headersBuffer = new NatsPooledBufferWriter(_arrayPoolInitialSize); - _headerWriter.Write(headersBuffer, headers); } NatsPooledBufferWriter payloadBuffer; if (!_pool.TryRent(out payloadBuffer!)) payloadBuffer = new NatsPooledBufferWriter(_arrayPoolInitialSize); - if (value != null) - serializer.Serialize(payloadBuffer, value); - var size = payloadBuffer.WrittenMemory.Length + (headersBuffer?.WrittenMemory.Length ?? 0); - if (_connection.ServerInfo is { } info && size > info.MaxPayload) + try { - ThrowOnMaxPayload(size, info.MaxPayload); + if (headers != null) + _headerWriter.Write(headersBuffer!, headers); + + if (value != null) + serializer.Serialize(payloadBuffer, value); + + var size = payloadBuffer.WrittenMemory.Length + (headersBuffer?.WrittenMemory.Length ?? 0); + if (_connection.ServerInfo is { } info && size > info.MaxPayload) + { + throw new NatsException($"Payload size {size} exceeds server's maximum payload size {info.MaxPayload}"); + } } + catch + { + payloadBuffer.Reset(); + _pool.Return(payloadBuffer); - return PublishLockedAsync(subject, replyTo, payloadBuffer, headersBuffer, cancellationToken); - } + if (headersBuffer != null) + { + headersBuffer.Reset(); + _pool.Return(headersBuffer); + } + + throw; + } + +#pragma warning disable CA2016 +#pragma warning disable VSTHRD103 + if (!_semLock.Wait(0)) +#pragma warning restore VSTHRD103 +#pragma warning restore CA2016 + { + return PublishStateMachineAsync(false, subject, replyTo, headersBuffer, payloadBuffer, cancellationToken); + } + + if (_flushTask is { IsCompletedSuccessfully: false }) + { + return PublishStateMachineAsync(true, subject, replyTo, headersBuffer, payloadBuffer, cancellationToken); + } - public async ValueTask SubscribeAsync(int sid, string subject, string? queueGroup, int? maxMsgs, CancellationToken cancellationToken) - { - var cancellationTimer = _ctPool.Start(_defaultCommandTimeout, cancellationToken); - await LockAsync(cancellationTimer.Token).ConfigureAwait(false); try { if (_disposed) @@ -267,28 +323,42 @@ public async ValueTask SubscribeAsync(int sid, string subject, string? queueGrou throw new ObjectDisposedException(nameof(CommandWriter)); } - if (_flushTask is { IsCompletedSuccessfully: false }) + _protocolWriter.WritePublish(_pipeWriter, subject, replyTo, headersBuffer?.WrittenMemory, payloadBuffer.WrittenMemory); + EnqueueCommand(); + } + finally + { + payloadBuffer.Reset(); + _pool.Return(payloadBuffer); + + if (headersBuffer != null) { - await _flushTask.WaitAsync(cancellationTimer.Token).ConfigureAwait(false); + headersBuffer.Reset(); + _pool.Return(headersBuffer); } - _protocolWriter.WriteSubscribe(_pipeWriter, sid, subject, queueGroup, maxMsgs); + _semLock.Release(); + } + + return ValueTask.CompletedTask; + } - _channelSize.Writer.TryWrite((int)_pipeWriter.UnflushedBytes); - var flush = _pipeWriter.FlushAsync(CancellationToken.None); - _flushTask = flush.IsCompletedSuccessfully ? null : flush.AsTask(); + public ValueTask SubscribeAsync(int sid, string subject, string? queueGroup, int? maxMsgs, CancellationToken cancellationToken) + { +#pragma warning disable CA2016 +#pragma warning disable VSTHRD103 + if (!_semLock.Wait(0)) +#pragma warning restore VSTHRD103 +#pragma warning restore CA2016 + { + return SubscribeStateMachineAsync(false, sid, subject, queueGroup, maxMsgs, cancellationToken); } - finally + + if (_flushTask is { IsCompletedSuccessfully: false }) { - await UnLockAsync().ConfigureAwait(false); - cancellationTimer.TryReturn(); + return SubscribeStateMachineAsync(true, sid, subject, queueGroup, maxMsgs, cancellationToken); } - } - public async ValueTask UnsubscribeAsync(int sid, int? maxMsgs, CancellationToken cancellationToken) - { - var cancellationTimer = _ctPool.Start(_defaultCommandTimeout, cancellationToken); - await LockAsync(cancellationTimer.Token).ConfigureAwait(false); try { if (_disposed) @@ -296,35 +366,83 @@ public async ValueTask UnsubscribeAsync(int sid, int? maxMsgs, CancellationToken throw new ObjectDisposedException(nameof(CommandWriter)); } - if (_flushTask is { IsCompletedSuccessfully: false }) + _protocolWriter.WriteSubscribe(_pipeWriter, sid, subject, queueGroup, maxMsgs); + EnqueueCommand(); + } + finally + { + _semLock.Release(); + } + + return ValueTask.CompletedTask; + } + + public ValueTask UnsubscribeAsync(int sid, int? maxMsgs, CancellationToken cancellationToken) + { +#pragma warning disable CA2016 +#pragma warning disable VSTHRD103 + if (!_semLock.Wait(0)) +#pragma warning restore VSTHRD103 +#pragma warning restore CA2016 + { + return UnsubscribeStateMachineAsync(false, sid, maxMsgs, cancellationToken); + } + + if (_flushTask is { IsCompletedSuccessfully: false }) + { + return UnsubscribeStateMachineAsync(true, sid, maxMsgs, cancellationToken); + } + + try + { + if (_disposed) { - await _flushTask.WaitAsync(cancellationTimer.Token).ConfigureAwait(false); + throw new ObjectDisposedException(nameof(CommandWriter)); } _protocolWriter.WriteUnsubscribe(_pipeWriter, sid, maxMsgs); - - _channelSize.Writer.TryWrite((int)_pipeWriter.UnflushedBytes); - var flush = _pipeWriter.FlushAsync(CancellationToken.None); - _flushTask = flush.IsCompletedSuccessfully ? null : flush.AsTask(); + EnqueueCommand(); } finally { - await UnLockAsync().ConfigureAwait(false); - cancellationTimer.TryReturn(); + _semLock.Release(); } + + return ValueTask.CompletedTask; } // only used for internal testing - internal bool TestStallFlush() => _channelLock.Writer.TryWrite(1); + internal async Task TestStallFlushAsync(TimeSpan timeSpan) + { + await _semLock.WaitAsync().ConfigureAwait(false); - [MethodImpl(MethodImplOptions.NoInlining)] - private static void ThrowOnMaxPayload(int size, int max) => throw new NatsException($"Payload size {size} exceeds server's maximum payload size {max}"); + try + { + if (_flushTask is { IsCompletedSuccessfully: false }) + { + await _flushTask.ConfigureAwait(false); + } - private static async Task ReaderLoopAsync(ILogger logger, ISocketConnection connection, PipeReader pipeReader, Channel channelSize, CancellationToken cancellationToken) + _flushTask = Task.Delay(timeSpan); + } + finally + { + _semLock.Release(); + } + } + + private static async Task ReaderLoopAsync( + ILogger logger, + ISocketConnection connection, + PipeReader pipeReader, + Channel channelSize, + Memory consolidateMem, + CancellationToken cancellationToken) { try { var examinedOffset = 0; + var pending = 0; while (true) { var result = await pipeReader.ReadAsync(cancellationToken).ConfigureAwait(false); @@ -337,90 +455,85 @@ private static async Task ReaderLoopAsync(ILogger logger, ISocket var buffer = result.Buffer; var consumed = buffer.Start; var examined = buffer.GetPosition(examinedOffset); - var readBuffer = buffer.Slice(examinedOffset); + buffer = result.Buffer.Slice(examinedOffset); try { - if (!buffer.IsEmpty && !readBuffer.IsEmpty) + while (!buffer.IsEmpty) { - var bufferLength = (int)readBuffer.Length; - - var bytes = ArrayPool.Shared.Rent(bufferLength); - readBuffer.CopyTo(bytes); - var memory = bytes.AsMemory(0, bufferLength); + var sendMem = buffer.First; + if (sendMem.Length > SendMemSize) + { + sendMem = sendMem[..SendMemSize]; + } + else if (sendMem.Length < SendMemSize && buffer.Length > sendMem.Length) + { + var consolidateLen = Math.Min(SendMemSize, (int)buffer.Length); + buffer.Slice(0, consolidateLen).CopyTo(consolidateMem.Span); + sendMem = consolidateMem[..consolidateLen]; + } + int sent; + Exception? sendEx = null; try { - var totalSent = 0; - var totalSize = 0; - while (totalSent < bufferLength) - { - var sendMemory = memory; - if (sendMemory.Length > MaxSendSize) - { - // cap the send size, the OS can only handle so much in a send buffer at a time - // also if the send fails, we have to throw this many bytes away - sendMemory = memory[..MaxSendSize]; - } + sent = await connection.SendAsync(sendMem).ConfigureAwait(false); + } + catch (Exception ex) + { + // we have no idea how many bytes were actually sent, so we have to assume they all were + // this could result in message loss, but is consistent with at-most once delivery + sendEx = ex; + sent = sendMem.Length; + } - int sent; - Exception? sendEx = null; - try - { - sent = await connection.SendAsync(sendMemory).ConfigureAwait(false); - } - catch (Exception ex) + var totalSize = 0; + while (totalSize < sent) + { + if (pending == 0) + { + while (!channelSize.Reader.TryPeek(out pending)) { - // we have no idea how many bytes were actually sent, so we have to assume they all were - // this could result in message loss, but is consistent with at-most once delivery - sendEx = ex; - sent = sendMemory.Length; + // should never happen; channel sizes are written before flush is called + await channelSize.Reader.WaitToReadAsync(cancellationToken).ConfigureAwait(false); } + } - totalSent += sent; - memory = memory[sent..]; + // don't mark the message as complete if we have more data to send + if (totalSize + pending > sent) + { + pending += totalSize - sent; + break; + } - while (totalSize < totalSent) - { - int peek; - while (!channelSize.Reader.TryPeek(out peek)) - { - // should never happen; channel sizes are written before flush is called - await channelSize.Reader.WaitToReadAsync(cancellationToken).ConfigureAwait(false); - } - - // Don't just mark the message as complete if we have more data to send - if (totalSize + peek > totalSent) - { - break; - } - - int size; - while (!channelSize.Reader.TryRead(out size)) - { - // should never happen; channel sizes are written before flush is called (plus we just peeked) - await channelSize.Reader.WaitToReadAsync(cancellationToken).ConfigureAwait(false); - } - - totalSize += size; - examinedOffset = 0; - } + while (!channelSize.Reader.TryRead(out _)) + { + // should never happen; channel sizes are written before flush is called + await channelSize.Reader.WaitToReadAsync(cancellationToken).ConfigureAwait(false); + } - // make sure to mark the buffer only at message boundaries. - consumed = buffer.GetPosition(totalSize); - examined = buffer.GetPosition(totalSent); - examinedOffset += totalSent - totalSize; + totalSize += pending; + examinedOffset = 0; + pending = 0; + } - // throw if there was a send failure - if (sendEx != null) - { - throw sendEx; - } - } + // only mark bytes as consumed if a full command was sent + if (totalSize > 0) + { + consumed = buffer.GetPosition(totalSize); } - finally + + // mark sent bytes as examined + examined = buffer.GetPosition(sent); + examinedOffset += sent - totalSize; + + // slice the buffer for next iteration + buffer = buffer.Slice(sent); + + // throw if there was a send failure + if (sendEx != null) { - ArrayPool.Shared.Return(bytes); + throw sendEx; } } } @@ -450,23 +563,179 @@ private static async Task ReaderLoopAsync(ILogger logger, ISocket } } - [AsyncMethodBuilder(typeof(PoolingAsyncValueTaskMethodBuilder))] - private async ValueTask PublishLockedAsync(string subject, string? replyTo, NatsPooledBufferWriter payloadBuffer, NatsPooledBufferWriter? headersBuffer, CancellationToken cancellationToken) + /// + /// Enqueues a command, and kicks off a flush + /// + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private void EnqueueCommand() { - var cancellationTimer = _ctPool.Start(_defaultCommandTimeout, cancellationToken); - await LockAsync(cancellationTimer.Token).ConfigureAwait(false); + var size = (int)_pipeWriter.UnflushedBytes; + if (size == 0) + { + // no unflushed bytes means no command was produced + _flushTask = null; + return; + } + + Interlocked.Add(ref _counter.PendingMessages, 1); + + _channelSize.Writer.TryWrite(size); + var flush = _pipeWriter.FlushAsync(); + _flushTask = flush.IsCompletedSuccessfully ? null : flush.AsTask(); + } + + private async ValueTask ConnectStateMachineAsync(bool lockHeld, ClientOpts connectOpts, CancellationToken cancellationToken) + { + if (!lockHeld) + { + if (!await _semLock.WaitAsync(_defaultCommandTimeout, cancellationToken).ConfigureAwait(false)) + { + throw new OperationCanceledException(); + } + } + try { - var payload = payloadBuffer.WrittenMemory; - var headers = headersBuffer?.WrittenMemory; + if (_disposed) + { + throw new ObjectDisposedException(nameof(CommandWriter)); + } + + if (_flushTask is { IsCompletedSuccessfully: false }) + { + await _flushTask.WaitAsync(_defaultCommandTimeout, cancellationToken).ConfigureAwait(false); + } + + _protocolWriter.WriteConnect(_pipeWriter, connectOpts); + EnqueueCommand(); + } + catch (TimeoutException) + { + // WaitAsync throws a TimeoutException when the TimeSpan is exceeded + // standardize to an OperationCanceledException as if a cancellationToken was used + throw new OperationCanceledException(); + } + finally + { + _semLock.Release(); + } + } + private async ValueTask PingStateMachineAsync(bool lockHeld, PingCommand pingCommand, CancellationToken cancellationToken) + { + if (!lockHeld) + { + if (!await _semLock.WaitAsync(_defaultCommandTimeout, cancellationToken).ConfigureAwait(false)) + { + throw new OperationCanceledException(); + } + } + + try + { if (_disposed) { throw new ObjectDisposedException(nameof(CommandWriter)); } - _protocolWriter.WritePublish(_pipeWriter, subject, replyTo, headers, payload); + if (_flushTask is { IsCompletedSuccessfully: false }) + { + await _flushTask.WaitAsync(_defaultCommandTimeout, cancellationToken).ConfigureAwait(false); + } + _protocolWriter.WritePing(_pipeWriter); + _enqueuePing(pingCommand); + EnqueueCommand(); + } + catch (TimeoutException) + { + // WaitAsync throws a TimeoutException when the TimeSpan is exceeded + // standardize to an OperationCanceledException as if a cancellationToken was used + throw new OperationCanceledException(); + } + finally + { + _semLock.Release(); + } + } + + private async ValueTask PongStateMachineAsync(bool lockHeld, CancellationToken cancellationToken) + { + if (!lockHeld) + { + if (!await _semLock.WaitAsync(_defaultCommandTimeout, cancellationToken).ConfigureAwait(false)) + { + throw new OperationCanceledException(); + } + } + + try + { + if (_disposed) + { + throw new ObjectDisposedException(nameof(CommandWriter)); + } + + if (_flushTask is { IsCompletedSuccessfully: false }) + { + await _flushTask.WaitAsync(_defaultCommandTimeout, cancellationToken).ConfigureAwait(false); + } + + _protocolWriter.WritePong(_pipeWriter); + EnqueueCommand(); + } + catch (TimeoutException) + { + // WaitAsync throws a TimeoutException when the TimeSpan is exceeded + // standardize to an OperationCanceledException as if a cancellationToken was used + throw new OperationCanceledException(); + } + finally + { + _semLock.Release(); + } + } + + private async ValueTask PublishStateMachineAsync(bool lockHeld, string subject, string? replyTo, NatsPooledBufferWriter? headersBuffer, NatsPooledBufferWriter payloadBuffer, CancellationToken cancellationToken) + { + try + { + if (!lockHeld) + { + if (!await _semLock.WaitAsync(_defaultCommandTimeout, cancellationToken).ConfigureAwait(false)) + { + throw new OperationCanceledException(); + } + } + + try + { + if (_disposed) + { + throw new ObjectDisposedException(nameof(CommandWriter)); + } + + if (_flushTask is { IsCompletedSuccessfully: false }) + { + await _flushTask.WaitAsync(_defaultCommandTimeout, cancellationToken).ConfigureAwait(false); + } + + _protocolWriter.WritePublish(_pipeWriter, subject, replyTo, headersBuffer?.WrittenMemory, payloadBuffer.WrittenMemory); + EnqueueCommand(); + } + catch (TimeoutException) + { + // WaitAsync throws a TimeoutException when the TimeSpan is exceeded + // standardize to an OperationCanceledException as if a cancellationToken was used + throw new OperationCanceledException(); + } + finally + { + _semLock.Release(); + } + } + finally + { payloadBuffer.Reset(); _pool.Return(payloadBuffer); @@ -475,45 +744,80 @@ private async ValueTask PublishLockedAsync(string subject, string? replyTo, Nats headersBuffer.Reset(); _pool.Return(headersBuffer); } + } + } - var size = (int)_pipeWriter.UnflushedBytes; - _channelSize.Writer.TryWrite(size); - - var result = await _pipeWriter.FlushAsync(cancellationTimer.Token).ConfigureAwait(false); - if (result.IsCanceled) + private async ValueTask SubscribeStateMachineAsync(bool lockHeld, int sid, string subject, string? queueGroup, int? maxMsgs, CancellationToken cancellationToken) + { + if (!lockHeld) + { + if (!await _semLock.WaitAsync(_defaultCommandTimeout, cancellationToken).ConfigureAwait(false)) { throw new OperationCanceledException(); } } + + try + { + if (_disposed) + { + throw new ObjectDisposedException(nameof(CommandWriter)); + } + + if (_flushTask is { IsCompletedSuccessfully: false }) + { + await _flushTask.WaitAsync(_defaultCommandTimeout, cancellationToken).ConfigureAwait(false); + } + + _protocolWriter.WriteSubscribe(_pipeWriter, sid, subject, queueGroup, maxMsgs); + EnqueueCommand(); + } + catch (TimeoutException) + { + // WaitAsync throws a TimeoutException when the TimeSpan is exceeded + // standardize to an OperationCanceledException as if a cancellationToken was used + throw new OperationCanceledException(); + } finally { - await UnLockAsync().ConfigureAwait(false); - cancellationTimer.TryReturn(); + _semLock.Release(); } } - [MethodImpl(MethodImplOptions.AggressiveInlining)] - private async ValueTask LockAsync(CancellationToken cancellationToken) + private async ValueTask UnsubscribeStateMachineAsync(bool lockHeld, int sid, int? maxMsgs, CancellationToken cancellationToken) { - Interlocked.Increment(ref _counter.PendingMessages); + if (!lockHeld) + { + if (!await _semLock.WaitAsync(_defaultCommandTimeout, cancellationToken).ConfigureAwait(false)) + { + throw new OperationCanceledException(); + } + } + try { - await _channelLock.Writer.WriteAsync(1, cancellationToken).ConfigureAwait(false); + if (_disposed) + { + throw new ObjectDisposedException(nameof(CommandWriter)); + } + + if (_flushTask is { IsCompletedSuccessfully: false }) + { + await _flushTask.WaitAsync(_defaultCommandTimeout, cancellationToken).ConfigureAwait(false); + } + + _protocolWriter.WriteUnsubscribe(_pipeWriter, sid, maxMsgs); + EnqueueCommand(); } - catch (TaskCanceledException) + catch (TimeoutException) { + // WaitAsync throws a TimeoutException when the TimeSpan is exceeded + // standardize to an OperationCanceledException as if a cancellationToken was used throw new OperationCanceledException(); } - catch (ChannelClosedException) + finally { - throw new OperationCanceledException(); + _semLock.Release(); } } - - [MethodImpl(MethodImplOptions.AggressiveInlining)] - private ValueTask UnLockAsync() - { - Interlocked.Decrement(ref _counter.PendingMessages); - return _channelLock.Reader.ReadAsync(_cts.Token); - } } diff --git a/src/NATS.Client.Core/NatsOpts.cs b/src/NATS.Client.Core/NatsOpts.cs index dc7222403..4056427e8 100644 --- a/src/NATS.Client.Core/NatsOpts.cs +++ b/src/NATS.Client.Core/NatsOpts.cs @@ -31,9 +31,7 @@ public sealed record NatsOpts public ILoggerFactory LoggerFactory { get; init; } = NullLoggerFactory.Instance; - // Same as default pipelines pause writer size - // Performing better compared to nats bench on localhost - public int WriterBufferSize { get; init; } = 65536; + public int WriterBufferSize { get; init; } = 1048576; public int ReaderBufferSize { get; init; } = 1048576; diff --git a/tests/NATS.Client.Core.Tests/CancellationTest.cs b/tests/NATS.Client.Core.Tests/CancellationTest.cs index 732c07f84..cba0bad21 100644 --- a/tests/NATS.Client.Core.Tests/CancellationTest.cs +++ b/tests/NATS.Client.Core.Tests/CancellationTest.cs @@ -19,7 +19,7 @@ public async Task CommandTimeoutTest() var cancellationToken = cts.Token; // stall the flush task - Assert.True(conn.CommandWriter.TestStallFlush()); + var stallTask = conn.CommandWriter.TestStallFlushAsync(TimeSpan.FromSeconds(1)); // commands that call ConnectAsync throw OperationCanceledException await Assert.ThrowsAsync(() => conn.PingAsync(cancellationToken).AsTask()); @@ -30,6 +30,8 @@ await Assert.ThrowsAsync(async () => { } }); + + await stallTask; } // check that cancellation works on commands that call ConnectAsync From 3e24a07febf24aa9f02de8e85d811a2792453c64 Mon Sep 17 00:00:00 2001 From: Caleb Lloyd Date: Thu, 8 Feb 2024 00:17:48 -0500 Subject: [PATCH 2/2] release lock before returning buffers Signed-off-by: Caleb Lloyd --- src/NATS.Client.Core/Commands/CommandWriter.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/NATS.Client.Core/Commands/CommandWriter.cs b/src/NATS.Client.Core/Commands/CommandWriter.cs index 5c726b528..501a761ae 100644 --- a/src/NATS.Client.Core/Commands/CommandWriter.cs +++ b/src/NATS.Client.Core/Commands/CommandWriter.cs @@ -328,6 +328,8 @@ public ValueTask PublishAsync(string subject, T? value, NatsHeaders? headers, } finally { + _semLock.Release(); + payloadBuffer.Reset(); _pool.Return(payloadBuffer); @@ -336,8 +338,6 @@ public ValueTask PublishAsync(string subject, T? value, NatsHeaders? headers, headersBuffer.Reset(); _pool.Return(headersBuffer); } - - _semLock.Release(); } return ValueTask.CompletedTask;