Skip to content

Commit

Permalink
Use vector for router latency metrics (#2989)
Browse files Browse the repository at this point in the history
  • Loading branch information
StephenButtolph committed May 3, 2024
1 parent 1b2f3d9 commit fc6f4e7
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 77 deletions.
2 changes: 1 addition & 1 deletion snow/networking/timeout/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func (m *manager) RegisterResponse(
op message.Op,
latency time.Duration,
) {
m.metrics.Observe(nodeID, chainID, op, latency)
m.metrics.Observe(chainID, op, latency)
m.benchlistMgr.RegisterResponse(chainID, nodeID)
m.tm.Remove(requestID)
}
Expand Down
110 changes: 34 additions & 76 deletions snow/networking/timeout/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,20 @@ import (
"time"

"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"

"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/message"
"github.com/ava-labs/avalanchego/snow"
"github.com/ava-labs/avalanchego/utils/metric"
"github.com/ava-labs/avalanchego/utils/wrappers"
"github.com/ava-labs/avalanchego/utils"
)

const (
defaultRequestHelpMsg = "time (in ns) spent waiting for a response to this message"
validatorIDLabel = "validatorID"
responseNamespace = "response"
opLabel = "op"
)

var opLabels = []string{opLabel}

type metrics struct {
lock sync.Mutex
chainToMetrics map[ids.ID]*chainMetrics
Expand All @@ -38,7 +38,7 @@ func (m *metrics) RegisterChain(ctx *snow.ConsensusContext) error {
if _, exists := m.chainToMetrics[ctx.ChainID]; exists {
return fmt.Errorf("chain %s has already been registered", ctx.ChainID)
}
cm, err := newChainMetrics(ctx, false)
cm, err := newChainMetrics(ctx.Registerer)
if err != nil {
return fmt.Errorf("couldn't create metrics for chain %s: %w", ctx.ChainID, err)
}
Expand All @@ -47,7 +47,7 @@ func (m *metrics) RegisterChain(ctx *snow.ConsensusContext) error {
}

// Record that a response of type [op] took [latency]
func (m *metrics) Observe(nodeID ids.NodeID, chainID ids.ID, op message.Op, latency time.Duration) {
func (m *metrics) Observe(chainID ids.ID, op message.Op, latency time.Duration) {
m.lock.Lock()
defer m.lock.Unlock()

Expand All @@ -56,86 +56,44 @@ func (m *metrics) Observe(nodeID ids.NodeID, chainID ids.ID, op message.Op, late
// TODO should this log an error?
return
}
cm.observe(nodeID, op, latency)
cm.observe(op, latency)
}

// chainMetrics contains message response time metrics for a chain
type chainMetrics struct {
ctx *snow.ConsensusContext

messageLatencies map[message.Op]metric.Averager

summaryEnabled bool
messageSummaries map[message.Op]*prometheus.SummaryVec
messages *prometheus.CounterVec // op
messageLatencies *prometheus.CounterVec // op
}

func newChainMetrics(ctx *snow.ConsensusContext, summaryEnabled bool) (*chainMetrics, error) {
func newChainMetrics(reg prometheus.Registerer) (*chainMetrics, error) {
cm := &chainMetrics{
ctx: ctx,

messageLatencies: make(map[message.Op]metric.Averager, len(message.ConsensusResponseOps)),

summaryEnabled: summaryEnabled,
messageSummaries: make(map[message.Op]*prometheus.SummaryVec, len(message.ConsensusResponseOps)),
}

errs := wrappers.Errs{}
for _, op := range message.ConsensusResponseOps {
cm.messageLatencies[op] = metric.NewAveragerWithErrs(
"lat",
op.String(),
defaultRequestHelpMsg,
ctx.Registerer,
&errs,
)

if !summaryEnabled {
continue
}

summaryName := fmt.Sprintf("%s_peer", op)
summary := prometheus.NewSummaryVec(
prometheus.SummaryOpts{
Namespace: "lat",
Name: summaryName,
Help: defaultRequestHelpMsg,
messages: prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: responseNamespace,
Name: "messages",
Help: "number of responses",
},
[]string{validatorIDLabel},
)
cm.messageSummaries[op] = summary

if err := ctx.Registerer.Register(summary); err != nil {
errs.Add(fmt.Errorf("failed to register %s statistics: %w", summaryName, err))
}
opLabels,
),
messageLatencies: prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: responseNamespace,
Name: "message_latencies",
Help: "message latencies (ns)",
},
opLabels,
),
}
return cm, errs.Err
return cm, utils.Err(
reg.Register(cm.messages),
reg.Register(cm.messageLatencies),
)
}

func (cm *chainMetrics) observe(nodeID ids.NodeID, op message.Op, latency time.Duration) {
lat := float64(latency)
if msg, exists := cm.messageLatencies[op]; exists {
msg.Observe(lat)
}

if !cm.summaryEnabled {
return
}

func (cm *chainMetrics) observe(op message.Op, latency time.Duration) {
labels := prometheus.Labels{
validatorIDLabel: nodeID.String(),
}

msg, exists := cm.messageSummaries[op]
if !exists {
return
}

observer, err := msg.GetMetricWith(labels)
if err != nil {
cm.ctx.Log.Warn("failed to get observer with validatorID",
zap.Error(err),
)
return
opLabel: op.String(),
}
observer.Observe(lat)
cm.messages.With(labels).Inc()
cm.messageLatencies.With(labels).Add(float64(latency))
}

0 comments on commit fc6f4e7

Please sign in to comment.