From 067de22e836288415467b56e3466246e149d01aa Mon Sep 17 00:00:00 2001 From: Stephen Buttolph Date: Fri, 3 May 2024 14:24:08 -0400 Subject: [PATCH] Use vectors for p2p message metrics (#2983) --- network/network.go | 2 +- network/peer/metrics.go | 252 +++++++++++++++++--------------------- network/peer/peer.go | 5 +- network/peer/peer_test.go | 1 - network/peer/test_peer.go | 1 - 5 files changed, 118 insertions(+), 143 deletions(-) diff --git a/network/network.go b/network/network.go index 506e5190858..a143e6202e9 100644 --- a/network/network.go +++ b/network/network.go @@ -225,7 +225,7 @@ func NewNetwork( return nil, fmt.Errorf("initializing outbound message throttler failed with: %w", err) } - peerMetrics, err := peer.NewMetrics(log, config.Namespace, metricsRegisterer) + peerMetrics, err := peer.NewMetrics(config.Namespace, metricsRegisterer) if err != nil { return nil, fmt.Errorf("initializing peer metrics failed with: %w", err) } diff --git a/network/peer/metrics.go b/network/peer/metrics.go index 1dcfcdd6389..94d46ac1e5f 100644 --- a/network/peer/metrics.go +++ b/network/peer/metrics.go @@ -4,180 +4,156 @@ package peer import ( - "fmt" + "strconv" "github.com/prometheus/client_golang/prometheus" - "go.uber.org/zap" "github.com/ava-labs/avalanchego/message" - "github.com/ava-labs/avalanchego/utils/logging" - "github.com/ava-labs/avalanchego/utils/metric" - "github.com/ava-labs/avalanchego/utils/wrappers" + "github.com/ava-labs/avalanchego/utils" ) -type MessageMetrics struct { - ReceivedBytes, SentBytes, NumSent, NumFailed, NumReceived prometheus.Counter - SavedReceivedBytes, SavedSentBytes metric.Averager -} +const ( + ioLabel = "io" + opLabel = "op" + compressedLabel = "compressed" -func NewMessageMetrics( - op message.Op, - namespace string, - metrics prometheus.Registerer, - errs *wrappers.Errs, -) *MessageMetrics { - msg := &MessageMetrics{ - NumSent: prometheus.NewCounter(prometheus.CounterOpts{ - Namespace: namespace, - Name: fmt.Sprintf("%s_sent", op), - Help: fmt.Sprintf("Number of %s messages sent over the network", op), - }), - NumFailed: prometheus.NewCounter(prometheus.CounterOpts{ - Namespace: namespace, - Name: fmt.Sprintf("%s_failed", op), - Help: fmt.Sprintf("Number of %s messages that failed to be sent over the network", op), - }), - NumReceived: prometheus.NewCounter(prometheus.CounterOpts{ - Namespace: namespace, - Name: fmt.Sprintf("%s_received", op), - Help: fmt.Sprintf("Number of %s messages received from the network", op), - }), - ReceivedBytes: prometheus.NewCounter(prometheus.CounterOpts{ - Namespace: namespace, - Name: fmt.Sprintf("%s_received_bytes", op), - Help: fmt.Sprintf("Number of bytes of %s messages received from the network", op), - }), - SentBytes: prometheus.NewCounter(prometheus.CounterOpts{ - Namespace: namespace, - Name: fmt.Sprintf("%s_sent_bytes", op), - Help: fmt.Sprintf("Size of bytes of %s messages received from the network", op), - }), - } - errs.Add( - metrics.Register(msg.NumSent), - metrics.Register(msg.NumFailed), - metrics.Register(msg.NumReceived), - metrics.Register(msg.ReceivedBytes), - metrics.Register(msg.SentBytes), - ) + sentLabel = "sent" + receivedLabel = "received" +) - msg.SavedReceivedBytes = metric.NewAveragerWithErrs( - namespace, - fmt.Sprintf("%s_compression_saved_received_bytes", op), - fmt.Sprintf("bytes saved (not received) due to compression of %s messages", op), - metrics, - errs, - ) - msg.SavedSentBytes = metric.NewAveragerWithErrs( - namespace, - fmt.Sprintf("%s_compression_saved_sent_bytes", op), - fmt.Sprintf("bytes saved (not sent) due to compression of %s messages", op), - metrics, - errs, - ) - return msg -} +var ( + opLabels = []string{opLabel} + ioOpLabels = []string{ioLabel, opLabel} + ioOpCompressedLabels = []string{ioLabel, opLabel, compressedLabel} +) type Metrics struct { - Log logging.Logger - ClockSkew metric.Averager - FailedToParse prometheus.Counter - MessageMetrics map[message.Op]*MessageMetrics + ClockSkewCount prometheus.Counter + ClockSkewSum prometheus.Gauge + + NumFailedToParse prometheus.Counter + NumSendFailed *prometheus.CounterVec // op + + Messages *prometheus.CounterVec // io + op + compressed + Bytes *prometheus.CounterVec // io + op + BytesSaved *prometheus.GaugeVec // io + op } func NewMetrics( - log logging.Logger, namespace string, registerer prometheus.Registerer, ) (*Metrics, error) { m := &Metrics{ - Log: log, - FailedToParse: prometheus.NewCounter(prometheus.CounterOpts{ + ClockSkewCount: prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: namespace, + Name: "clock_skew_count", + Help: "number of handshake timestamps inspected (n)", + }), + ClockSkewSum: prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: namespace, + Name: "clock_skew_sum", + Help: "sum of (peer timestamp - local timestamp) from handshake messages (s)", + }), + NumFailedToParse: prometheus.NewCounter(prometheus.CounterOpts{ Namespace: namespace, Name: "msgs_failed_to_parse", - Help: "Number of messages that could not be parsed or were invalidly formed", + Help: "number of received messages that could not be parsed", }), - MessageMetrics: make(map[message.Op]*MessageMetrics, len(message.ExternalOps)), - } - - errs := wrappers.Errs{} - errs.Add( - registerer.Register(m.FailedToParse), - ) - for _, op := range message.ExternalOps { - m.MessageMetrics[op] = NewMessageMetrics(op, namespace, registerer, &errs) + NumSendFailed: prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: namespace, + Name: "msgs_failed_to_send", + Help: "number of messages that failed to be sent", + }, + opLabels, + ), + Messages: prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: namespace, + Name: "msgs", + Help: "number of handled messages", + }, + ioOpCompressedLabels, + ), + Bytes: prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: namespace, + Name: "msgs_bytes", + Help: "number of message bytes", + }, + ioOpLabels, + ), + BytesSaved: prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: namespace, + Name: "msgs_bytes_saved", + Help: "number of message bytes saved", + }, + ioOpLabels, + ), } - - m.ClockSkew = metric.NewAveragerWithErrs( - namespace, - "clock_skew", - "clock skew during peer handshake", - registerer, - &errs, + return m, utils.Err( + registerer.Register(m.ClockSkewCount), + registerer.Register(m.ClockSkewSum), + registerer.Register(m.NumFailedToParse), + registerer.Register(m.NumSendFailed), + registerer.Register(m.Messages), + registerer.Register(m.Bytes), + registerer.Register(m.BytesSaved), ) - return m, errs.Err } // Sent updates the metrics for having sent [msg]. func (m *Metrics) Sent(msg message.OutboundMessage) { - op := msg.Op() - msgMetrics := m.MessageMetrics[op] - if msgMetrics == nil { - m.Log.Error( - "unknown message being sent", - zap.Stringer("messageOp", op), - ) - return - } - msgMetrics.NumSent.Inc() - msgMetrics.SentBytes.Add(float64(len(msg.Bytes()))) - // assume that if [saved] == 0, [msg] wasn't compressed - if saved := msg.BytesSavedCompression(); saved != 0 { - msgMetrics.SavedSentBytes.Observe(float64(saved)) + op := msg.Op().String() + saved := msg.BytesSavedCompression() + compressed := saved != 0 // assume that if [saved] == 0, [msg] wasn't compressed + compressedStr := strconv.FormatBool(compressed) + + m.Messages.With(prometheus.Labels{ + ioLabel: sentLabel, + opLabel: op, + compressedLabel: compressedStr, + }).Inc() + + bytesLabel := prometheus.Labels{ + ioLabel: sentLabel, + opLabel: op, } + m.Bytes.With(bytesLabel).Add(float64(len(msg.Bytes()))) + m.BytesSaved.With(bytesLabel).Add(float64(saved)) } func (m *Metrics) MultipleSendsFailed(op message.Op, count int) { - msgMetrics := m.MessageMetrics[op] - if msgMetrics == nil { - m.Log.Error( - "unknown messages failed to be sent", - zap.Stringer("messageOp", op), - zap.Int("messageCount", count), - ) - return - } - msgMetrics.NumFailed.Add(float64(count)) + m.NumSendFailed.With(prometheus.Labels{ + opLabel: op.String(), + }).Add(float64(count)) } // SendFailed updates the metrics for having failed to send [msg]. func (m *Metrics) SendFailed(msg message.OutboundMessage) { - op := msg.Op() - msgMetrics := m.MessageMetrics[op] - if msgMetrics == nil { - m.Log.Error( - "unknown message failed to be sent", - zap.Stringer("messageOp", op), - ) - return - } - msgMetrics.NumFailed.Inc() + op := msg.Op().String() + m.NumSendFailed.With(prometheus.Labels{ + opLabel: op, + }).Inc() } func (m *Metrics) Received(msg message.InboundMessage, msgLen uint32) { - op := msg.Op() - msgMetrics := m.MessageMetrics[op] - if msgMetrics == nil { - m.Log.Error( - "unknown message received", - zap.Stringer("messageOp", op), - ) - return - } - msgMetrics.NumReceived.Inc() - msgMetrics.ReceivedBytes.Add(float64(msgLen)) - // assume that if [saved] == 0, [msg] wasn't compressed - if saved := msg.BytesSavedCompression(); saved != 0 { - msgMetrics.SavedReceivedBytes.Observe(float64(saved)) + op := msg.Op().String() + saved := msg.BytesSavedCompression() + compressed := saved != 0 // assume that if [saved] == 0, [msg] wasn't compressed + compressedStr := strconv.FormatBool(compressed) + + m.Messages.With(prometheus.Labels{ + ioLabel: receivedLabel, + opLabel: op, + compressedLabel: compressedStr, + }).Inc() + + bytesLabel := prometheus.Labels{ + ioLabel: receivedLabel, + opLabel: op, } + m.Bytes.With(bytesLabel).Add(float64(msgLen)) + m.BytesSaved.With(bytesLabel).Add(float64(saved)) } diff --git a/network/peer/peer.go b/network/peer/peer.go index 1774d3f24f9..08b635600c7 100644 --- a/network/peer/peer.go +++ b/network/peer/peer.go @@ -478,7 +478,7 @@ func (p *peer) readMessages() { zap.Error(err), ) - p.Metrics.FailedToParse.Inc() + p.Metrics.NumFailedToParse.Inc() // Couldn't parse the message. Read the next one. onFinishedHandling() @@ -906,7 +906,8 @@ func (p *peer) handleHandshake(msg *p2p.Handshake) { myTimeUnix := uint64(myTime.Unix()) clockDifference := math.Abs(float64(msg.MyTime) - float64(myTimeUnix)) - p.Metrics.ClockSkew.Observe(clockDifference) + p.Metrics.ClockSkewCount.Inc() + p.Metrics.ClockSkewSum.Add(clockDifference) if clockDifference > p.MaxClockDifference.Seconds() { if _, ok := p.Beacons.GetValidator(constants.PrimaryNetworkID, p.id); ok { diff --git a/network/peer/peer_test.go b/network/peer/peer_test.go index 96d6ddb7b98..30dc817c517 100644 --- a/network/peer/peer_test.go +++ b/network/peer/peer_test.go @@ -82,7 +82,6 @@ func makeRawTestPeers(t *testing.T, trackedSubnets set.Set[ids.ID]) (*rawTestPee mc := newMessageCreator(t) metrics, err := NewMetrics( - logging.NoLog{}, "", prometheus.NewRegistry(), ) diff --git a/network/peer/test_peer.go b/network/peer/test_peer.go index 0c8762bc44c..a8f633ccf65 100644 --- a/network/peer/test_peer.go +++ b/network/peer/test_peer.go @@ -85,7 +85,6 @@ func StartTestPeer( } metrics, err := NewMetrics( - logging.NoLog{}, "", prometheus.NewRegistry(), )