diff --git a/src/NATS.Client.Core/Commands/CommandWriter.cs b/src/NATS.Client.Core/Commands/CommandWriter.cs index ff77e0e89..1c6794bfb 100644 --- a/src/NATS.Client.Core/Commands/CommandWriter.cs +++ b/src/NATS.Client.Core/Commands/CommandWriter.cs @@ -37,7 +37,6 @@ internal sealed class CommandWriter : IAsyncDisposable private readonly int _arrayPoolInitialSize; private readonly object _lock = new(); private readonly CancellationTokenSource _cts; - private readonly ConnectionStatsCounter _counter; private readonly Memory _consolidateMem = new byte[SendMemSize].AsMemory(); private readonly TimeSpan _defaultCommandTimeout; private readonly Action _enqueuePing; @@ -55,7 +54,7 @@ internal sealed class CommandWriter : IAsyncDisposable private CancellationTokenSource? _ctsReader; private volatile bool _disposed; - public CommandWriter(string name, NatsConnection connection, ObjectPool pool, NatsOpts opts, ConnectionStatsCounter counter, Action enqueuePing, TimeSpan? overrideCommandTimeout = default) + public CommandWriter(string name, NatsConnection connection, ObjectPool pool, NatsOpts opts, Action enqueuePing, TimeSpan? overrideCommandTimeout = default) { _logger = opts.LoggerFactory.CreateLogger(); _trace = _logger.IsEnabled(LogLevel.Trace); @@ -67,7 +66,6 @@ public CommandWriter(string name, NatsConnection connection, ObjectPool pool, Na // avoid defining another option. _arrayPoolInitialSize = opts.WriterBufferSize / 256; - _counter = counter; _defaultCommandTimeout = overrideCommandTimeout ?? opts.CommandTimeout; _enqueuePing = enqueuePing; _protocolWriter = new ProtocolWriter(opts.SubjectEncoding); @@ -693,11 +691,20 @@ private void EnqueueCommand() return; } - Interlocked.Add(ref _counter.PendingMessages, 1); + NatsMetrics.AddPendingMessages(1); _channelSize.Writer.TryWrite(size); var flush = _pipeWriter.FlushAsync(); - _flushTask = flush.IsCompletedSuccessfully ? null : flush.AsTask(); + + if (flush.IsCompletedSuccessfully) + { + _flushTask = null; + NatsMetrics.AddPendingMessages(-1); + } + else + { + _flushTask = flush.AsTask(); + } } private async ValueTask ConnectStateMachineAsync(bool lockHeld, ClientOpts connectOpts, CancellationToken cancellationToken) diff --git a/src/NATS.Client.Core/Commands/PriorityCommandWriter.cs b/src/NATS.Client.Core/Commands/PriorityCommandWriter.cs index 627c9afc9..a1c5393f9 100644 --- a/src/NATS.Client.Core/Commands/PriorityCommandWriter.cs +++ b/src/NATS.Client.Core/Commands/PriorityCommandWriter.cs @@ -6,9 +6,9 @@ internal sealed class PriorityCommandWriter : IAsyncDisposable { private int _disposed; - public PriorityCommandWriter(NatsConnection connection, ObjectPool pool, ISocketConnection socketConnection, NatsOpts opts, ConnectionStatsCounter counter, Action enqueuePing) + public PriorityCommandWriter(NatsConnection connection, ObjectPool pool, ISocketConnection socketConnection, NatsOpts opts, Action enqueuePing) { - CommandWriter = new CommandWriter("init", connection, pool, opts, counter, enqueuePing); + CommandWriter = new CommandWriter("init", connection, pool, opts, enqueuePing); CommandWriter.Reset(socketConnection); } diff --git a/src/NATS.Client.Core/Internal/NatsMetrics.cs b/src/NATS.Client.Core/Internal/NatsMetrics.cs new file mode 100644 index 000000000..a502033b1 --- /dev/null +++ b/src/NATS.Client.Core/Internal/NatsMetrics.cs @@ -0,0 +1,94 @@ +using System.Diagnostics.Metrics; + +namespace NATS.Client.Core.Internal; + +public class NatsMetrics +{ + public const string MeterName = "NATS.Client"; + public const string PendingMessagesInstrumentName = $"{InstrumentPrefix}.pending.messages"; + public const string SentBytesInstrumentName = $"{InstrumentPrefix}.sent.bytes"; + public const string ReceivedBytesInstrumentName = $"{InstrumentPrefix}.received.bytes"; + public const string SentMessagesInstrumentName = $"{InstrumentPrefix}.sent.messages"; + public const string ReceivedMessagesInstrumentName = $"{InstrumentPrefix}.received.messages"; + public const string SubscriptionInstrumentName = $"{InstrumentPrefix}.subscription.count"; + + private const string InstrumentPrefix = "nats.client"; + + private static readonly Meter _meter; + private static readonly Counter _subscriptionCounter; + private static readonly Counter _pendingMessagesCounter; + private static readonly Counter _sentBytesCounter; + private static readonly Counter _receivedBytesCounter; + private static readonly Counter _sentMessagesCounter; + private static readonly Counter _receivedMessagesCounter; + + static NatsMetrics() + { + _meter = new Meter(MeterName); + + _subscriptionCounter = _meter.CreateCounter( + SubscriptionInstrumentName, + unit: "{subscriptions}", + description: "Number of subscriptions"); + + _pendingMessagesCounter = _meter.CreateCounter( + PendingMessagesInstrumentName, + unit: "{messages}", + description: "Number of pending messages"); + + _sentBytesCounter = _meter.CreateCounter( + SentBytesInstrumentName, + unit: "{bytes}", + description: "Number of bytes sent"); + + _receivedBytesCounter = _meter.CreateCounter( + ReceivedBytesInstrumentName, + unit: "{bytes}", + description: "Number of bytes received"); + + _sentMessagesCounter = _meter.CreateCounter( + SentMessagesInstrumentName, + unit: "{messages}", + description: "Number of messages sent"); + + _receivedMessagesCounter = _meter.CreateCounter( + ReceivedMessagesInstrumentName, + unit: "{messages}", + description: "Number of messages received"); + } + + public static void IncrementSubscriptionCount() + { + _subscriptionCounter.Add(1); + } + + public static void DecrementSubscriptionCount() + { + _subscriptionCounter.Add(-1); + } + + public static void AddPendingMessages(long messages) + { + _pendingMessagesCounter.Add(messages); + } + + public static void AddSentBytes(long bytes) + { + _sentBytesCounter.Add(bytes); + } + + public static void AddReceivedBytes(long bytes) + { + _receivedBytesCounter.Add(bytes); + } + + public static void AddSentMessages(long messages) + { + _sentMessagesCounter.Add(messages); + } + + public static void AddReceivedMessages(long messages) + { + _receivedMessagesCounter.Add(messages); + } +} diff --git a/src/NATS.Client.Core/Internal/NatsReadProtocolProcessor.cs b/src/NATS.Client.Core/Internal/NatsReadProtocolProcessor.cs index c0d426d15..f278a2f03 100644 --- a/src/NATS.Client.Core/Internal/NatsReadProtocolProcessor.cs +++ b/src/NATS.Client.Core/Internal/NatsReadProtocolProcessor.cs @@ -33,7 +33,7 @@ public NatsReadProtocolProcessor(ISocketConnection socketConnection, NatsConnect _waitForPongOrErrorSignal = waitForPongOrErrorSignal; _infoParsed = infoParsed; _pingCommands = new ConcurrentQueue(); - _socketReader = new SocketReader(socketConnection, connection.Opts.ReaderBufferSize, connection.Counter, connection.Opts.LoggerFactory); + _socketReader = new SocketReader(socketConnection, connection.Opts.ReaderBufferSize, connection.Opts.LoggerFactory); _readLoop = Task.Run(ReadLoopAsync); } @@ -156,7 +156,7 @@ private async Task ReadLoopAsync() code = GetCode(buffer); } - Interlocked.Increment(ref _connection.Counter.ReceivedMessages); + NatsMetrics.AddReceivedMessages(1); // Optimize for Msg parsing, Inline async code if (code == ServerOpCodes.Msg) @@ -443,7 +443,7 @@ private async ValueTask> DispatchCommandAsync(int code, R { // reaches invalid line, log warn and try to get newline and go to nextloop. _logger.LogWarning(NatsLogEvents.Protocol, "Reached invalid line"); - Interlocked.Decrement(ref _connection.Counter.ReceivedMessages); + NatsMetrics.AddReceivedMessages(-1); var position = buffer.PositionOf((byte)'\n'); if (position == null) diff --git a/src/NATS.Client.Core/Internal/SocketReader.cs b/src/NATS.Client.Core/Internal/SocketReader.cs index 2aa1feb76..bd48da542 100644 --- a/src/NATS.Client.Core/Internal/SocketReader.cs +++ b/src/NATS.Client.Core/Internal/SocketReader.cs @@ -9,7 +9,6 @@ namespace NATS.Client.Core.Internal; internal sealed class SocketReader { private readonly int _minimumBufferSize; - private readonly ConnectionStatsCounter _counter; private readonly SeqeunceBuilder _seqeunceBuilder = new SeqeunceBuilder(); private readonly Stopwatch _stopwatch = new Stopwatch(); private readonly ILogger _logger; @@ -18,11 +17,10 @@ internal sealed class SocketReader private Memory _availableMemory; - public SocketReader(ISocketConnection socketConnection, int minimumBufferSize, ConnectionStatsCounter counter, ILoggerFactory loggerFactory) + public SocketReader(ISocketConnection socketConnection, int minimumBufferSize, ILoggerFactory loggerFactory) { _socketConnection = socketConnection; _minimumBufferSize = minimumBufferSize; - _counter = counter; _logger = loggerFactory.CreateLogger(); _isTraceLogging = _logger.IsEnabled(LogLevel.Trace); } @@ -66,7 +64,7 @@ public async ValueTask> ReadAtLeastAsync(int minimumSize) } totalRead += read; - Interlocked.Add(ref _counter.ReceivedBytes, read); + NatsMetrics.AddReceivedBytes(read); _seqeunceBuilder.Append(_availableMemory.Slice(0, read)); _availableMemory = _availableMemory.Slice(read); } @@ -112,7 +110,7 @@ public async ValueTask> ReadUntilReceiveNewLineAsync() throw ex; } - Interlocked.Add(ref _counter.ReceivedBytes, read); + NatsMetrics.AddReceivedBytes(read); var appendMemory = _availableMemory.Slice(0, read); _seqeunceBuilder.Append(appendMemory); _availableMemory = _availableMemory.Slice(read); diff --git a/src/NATS.Client.Core/NatsConnection.cs b/src/NATS.Client.Core/NatsConnection.cs index f08315b8f..cf54f24e3 100644 --- a/src/NATS.Client.Core/NatsConnection.cs +++ b/src/NATS.Client.Core/NatsConnection.cs @@ -1,10 +1,10 @@ using System.Buffers; using System.Diagnostics; -using System.Runtime.CompilerServices; using System.Threading.Channels; using Microsoft.Extensions.Logging; using NATS.Client.Core.Commands; using NATS.Client.Core.Internal; + #if NETSTANDARD using Random = NATS.Client.Core.Internal.NetStandardExtensions.Random; #endif @@ -35,7 +35,6 @@ public partial class NatsConnection : INatsConnection /// public Func<(string Host, int Port), ValueTask<(string Host, int Port)>>? OnConnectingAsync; - internal readonly ConnectionStatsCounter Counter; // allow to call from external sources internal volatile ServerInfo? WritableServerInfo; #pragma warning restore SA1401 @@ -81,8 +80,7 @@ public NatsConnection(NatsOpts opts) _disposedCancellationTokenSource = new CancellationTokenSource(); _pool = new ObjectPool(opts.ObjectPoolSize); _name = opts.Name; - Counter = new ConnectionStatsCounter(); - CommandWriter = new CommandWriter("main", this, _pool, Opts, Counter, EnqueuePing); + CommandWriter = new CommandWriter("main", this, _pool, Opts, EnqueuePing); InboxPrefix = NewInbox(opts.InboxPrefix); SubscriptionManager = new SubscriptionManager(this, InboxPrefix); _clientOpts = ClientOpts.Create(Opts); @@ -220,8 +218,6 @@ internal string SpanDestinationName(string subject) return tokens.Length < 2 ? subject : $"{tokens[0]}.{tokens[1]}"; } - internal NatsStats GetStats() => Counter.ToStats(); - internal ValueTask PublishToClientHandlersAsync(string subject, string? replyTo, int sid, in ReadOnlySequence? headersBuffer, in ReadOnlySequence payloadBuffer) { return SubscriptionManager.PublishToClientHandlersAsync(subject, replyTo, sid, headersBuffer, payloadBuffer); @@ -455,7 +451,7 @@ private async ValueTask SetupReaderWriterAsync(bool reconnect) // Authentication _userCredentials?.Authenticate(_clientOpts, WritableServerInfo); - await using (var priorityCommandWriter = new PriorityCommandWriter(this, _pool, _socket!, Opts, Counter, EnqueuePing)) + await using (var priorityCommandWriter = new PriorityCommandWriter(this, _pool, _socket!, Opts, EnqueuePing)) { // add CONNECT and PING command to priority lane await priorityCommandWriter.CommandWriter.ConnectAsync(_clientOpts, CancellationToken.None).ConfigureAwait(false); diff --git a/src/NATS.Client.Core/NatsStats.cs b/src/NATS.Client.Core/NatsStats.cs deleted file mode 100644 index 403a75a08..000000000 --- a/src/NATS.Client.Core/NatsStats.cs +++ /dev/null @@ -1,28 +0,0 @@ -namespace NATS.Client.Core; - -public readonly record struct NatsStats -( - long SentBytes, - long ReceivedBytes, - long PendingMessages, - long SentMessages, - long ReceivedMessages, - long SubscriptionCount); - -internal sealed class ConnectionStatsCounter -{ - // for operate Interlocked.Increment/Decrement/Add, expose field as public -#pragma warning disable SA1401 - public long SentBytes; - public long SentMessages; - public long PendingMessages; - public long ReceivedBytes; - public long ReceivedMessages; - public long SubscriptionCount; -#pragma warning restore SA1401 - - public NatsStats ToStats() - { - return new NatsStats(SentBytes, ReceivedBytes, PendingMessages, SentMessages, ReceivedMessages, SubscriptionCount); - } -}