From fc6f4e73a5dd246ed6ffc8691c38eb6cd5fa6a0d Mon Sep 17 00:00:00 2001 From: Stephen Buttolph Date: Fri, 3 May 2024 16:21:35 -0400 Subject: [PATCH] Use vector for router latency metrics (#2989) --- snow/networking/timeout/manager.go | 2 +- snow/networking/timeout/metrics.go | 110 +++++++++-------------------- 2 files changed, 35 insertions(+), 77 deletions(-) diff --git a/snow/networking/timeout/manager.go b/snow/networking/timeout/manager.go index 95a3be25e16..89a7cc56d86 100644 --- a/snow/networking/timeout/manager.go +++ b/snow/networking/timeout/manager.go @@ -149,7 +149,7 @@ func (m *manager) RegisterResponse( op message.Op, latency time.Duration, ) { - m.metrics.Observe(nodeID, chainID, op, latency) + m.metrics.Observe(chainID, op, latency) m.benchlistMgr.RegisterResponse(chainID, nodeID) m.tm.Remove(requestID) } diff --git a/snow/networking/timeout/metrics.go b/snow/networking/timeout/metrics.go index 0892e5d8794..49c08936937 100644 --- a/snow/networking/timeout/metrics.go +++ b/snow/networking/timeout/metrics.go @@ -9,20 +9,20 @@ import ( "time" "github.com/prometheus/client_golang/prometheus" - "go.uber.org/zap" "github.com/ava-labs/avalanchego/ids" "github.com/ava-labs/avalanchego/message" "github.com/ava-labs/avalanchego/snow" - "github.com/ava-labs/avalanchego/utils/metric" - "github.com/ava-labs/avalanchego/utils/wrappers" + "github.com/ava-labs/avalanchego/utils" ) const ( - defaultRequestHelpMsg = "time (in ns) spent waiting for a response to this message" - validatorIDLabel = "validatorID" + responseNamespace = "response" + opLabel = "op" ) +var opLabels = []string{opLabel} + type metrics struct { lock sync.Mutex chainToMetrics map[ids.ID]*chainMetrics @@ -38,7 +38,7 @@ func (m *metrics) RegisterChain(ctx *snow.ConsensusContext) error { if _, exists := m.chainToMetrics[ctx.ChainID]; exists { return fmt.Errorf("chain %s has already been registered", ctx.ChainID) } - cm, err := newChainMetrics(ctx, false) + cm, err := newChainMetrics(ctx.Registerer) if err != nil { return fmt.Errorf("couldn't create metrics for chain %s: %w", ctx.ChainID, err) } @@ -47,7 +47,7 @@ func (m *metrics) RegisterChain(ctx *snow.ConsensusContext) error { } // Record that a response of type [op] took [latency] -func (m *metrics) Observe(nodeID ids.NodeID, chainID ids.ID, op message.Op, latency time.Duration) { +func (m *metrics) Observe(chainID ids.ID, op message.Op, latency time.Duration) { m.lock.Lock() defer m.lock.Unlock() @@ -56,86 +56,44 @@ func (m *metrics) Observe(nodeID ids.NodeID, chainID ids.ID, op message.Op, late // TODO should this log an error? return } - cm.observe(nodeID, op, latency) + cm.observe(op, latency) } // chainMetrics contains message response time metrics for a chain type chainMetrics struct { - ctx *snow.ConsensusContext - - messageLatencies map[message.Op]metric.Averager - - summaryEnabled bool - messageSummaries map[message.Op]*prometheus.SummaryVec + messages *prometheus.CounterVec // op + messageLatencies *prometheus.CounterVec // op } -func newChainMetrics(ctx *snow.ConsensusContext, summaryEnabled bool) (*chainMetrics, error) { +func newChainMetrics(reg prometheus.Registerer) (*chainMetrics, error) { cm := &chainMetrics{ - ctx: ctx, - - messageLatencies: make(map[message.Op]metric.Averager, len(message.ConsensusResponseOps)), - - summaryEnabled: summaryEnabled, - messageSummaries: make(map[message.Op]*prometheus.SummaryVec, len(message.ConsensusResponseOps)), - } - - errs := wrappers.Errs{} - for _, op := range message.ConsensusResponseOps { - cm.messageLatencies[op] = metric.NewAveragerWithErrs( - "lat", - op.String(), - defaultRequestHelpMsg, - ctx.Registerer, - &errs, - ) - - if !summaryEnabled { - continue - } - - summaryName := fmt.Sprintf("%s_peer", op) - summary := prometheus.NewSummaryVec( - prometheus.SummaryOpts{ - Namespace: "lat", - Name: summaryName, - Help: defaultRequestHelpMsg, + messages: prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: responseNamespace, + Name: "messages", + Help: "number of responses", }, - []string{validatorIDLabel}, - ) - cm.messageSummaries[op] = summary - - if err := ctx.Registerer.Register(summary); err != nil { - errs.Add(fmt.Errorf("failed to register %s statistics: %w", summaryName, err)) - } + opLabels, + ), + messageLatencies: prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: responseNamespace, + Name: "message_latencies", + Help: "message latencies (ns)", + }, + opLabels, + ), } - return cm, errs.Err + return cm, utils.Err( + reg.Register(cm.messages), + reg.Register(cm.messageLatencies), + ) } -func (cm *chainMetrics) observe(nodeID ids.NodeID, op message.Op, latency time.Duration) { - lat := float64(latency) - if msg, exists := cm.messageLatencies[op]; exists { - msg.Observe(lat) - } - - if !cm.summaryEnabled { - return - } - +func (cm *chainMetrics) observe(op message.Op, latency time.Duration) { labels := prometheus.Labels{ - validatorIDLabel: nodeID.String(), - } - - msg, exists := cm.messageSummaries[op] - if !exists { - return - } - - observer, err := msg.GetMetricWith(labels) - if err != nil { - cm.ctx.Log.Warn("failed to get observer with validatorID", - zap.Error(err), - ) - return + opLabel: op.String(), } - observer.Observe(lat) + cm.messages.With(labels).Inc() + cm.messageLatencies.With(labels).Add(float64(latency)) }