Use vectors for message handler metrics (#2987)
StephenButtolph committed May 3, 2024
1 parent 067de22 commit 7467a40
Showing 2 changed files with 99 additions and 104 deletions.
106 changes: 58 additions & 48 deletions snow/networking/handler/handler.go
@@ -367,7 +367,7 @@ func (h *handler) dispatchSync(ctx context.Context) {
     for {
         // Get the next message we should process. If the handler is shutting
         // down, we may fail to pop a message.
-        ctx, msg, ok := h.popUnexpiredMsg(h.syncMessageQueue, h.metrics.expired)
+        ctx, msg, ok := h.popUnexpiredMsg(h.syncMessageQueue)
         if !ok {
             return
         }
@@ -397,7 +397,7 @@ func (h *handler) dispatchAsync(ctx context.Context) {
     for {
         // Get the next message we should process. If the handler is shutting
         // down, we may fail to pop a message.
-        ctx, msg, ok := h.popUnexpiredMsg(h.asyncMessageQueue, h.metrics.asyncExpired)
+        ctx, msg, ok := h.popUnexpiredMsg(h.asyncMessageQueue)
         if !ok {
             return
         }
@@ -445,7 +445,7 @@ func (h *handler) dispatchChans(ctx context.Context) {
 func (h *handler) handleSyncMsg(ctx context.Context, msg Message) error {
     var (
         nodeID    = msg.NodeID()
-        op        = msg.Op()
+        op        = msg.Op().String()
         body      = msg.Message()
         startTime = h.clock.Time()
         // Check if the chain is in normal operation at the start of message
@@ -455,13 +455,13 @@ func (h *handler) handleSyncMsg(ctx context.Context, msg Message) error {
     if h.ctx.Log.Enabled(logging.Verbo) {
         h.ctx.Log.Verbo("forwarding sync message to consensus",
             zap.Stringer("nodeID", nodeID),
-            zap.Stringer("messageOp", op),
+            zap.String("messageOp", op),
             zap.Stringer("message", body),
         )
     } else {
         h.ctx.Log.Debug("forwarding sync message to consensus",
             zap.Stringer("nodeID", nodeID),
-            zap.Stringer("messageOp", op),
+            zap.String("messageOp", op),
         )
     }
     h.resourceTracker.StartProcessing(nodeID, startTime)
@@ -471,24 +471,28 @@ func (h *handler) handleSyncMsg(ctx context.Context, msg Message) error {
     h.ctx.Lock.Unlock()

     var (
-        endTime           = h.clock.Time()
-        messageHistograms = h.metrics.messages[op]
-        processingTime    = endTime.Sub(startTime)
-        msgHandlingTime   = endTime.Sub(lockAcquiredTime)
+        endTime      = h.clock.Time()
+        lockingTime  = lockAcquiredTime.Sub(startTime)
+        handlingTime = endTime.Sub(lockAcquiredTime)
     )
     h.resourceTracker.StopProcessing(nodeID, endTime)
-    messageHistograms.processingTime.Observe(float64(processingTime))
-    messageHistograms.msgHandlingTime.Observe(float64(msgHandlingTime))
+    h.metrics.lockingTime.Add(float64(lockingTime))
+    labels := prometheus.Labels{
+        opLabel: op,
+    }
+    h.metrics.messages.With(labels).Inc()
+    h.metrics.messageHandlingTime.With(labels).Add(float64(handlingTime))
+
     msg.OnFinishedHandling()
     h.ctx.Log.Debug("finished handling sync message",
-        zap.Stringer("messageOp", op),
+        zap.String("messageOp", op),
     )
-    if processingTime > syncProcessingTimeWarnLimit && isNormalOp {
+    if lockingTime+handlingTime > syncProcessingTimeWarnLimit && isNormalOp {
         h.ctx.Log.Warn("handling sync message took longer than expected",
-            zap.Duration("processingTime", processingTime),
-            zap.Duration("msgHandlingTime", msgHandlingTime),
+            zap.Duration("lockingTime", lockingTime),
+            zap.Duration("handlingTime", handlingTime),
             zap.Stringer("nodeID", nodeID),
-            zap.Stringer("messageOp", op),
+            zap.String("messageOp", op),
             zap.Stringer("message", body),
         )
     }
@@ -504,7 +508,7 @@ func (h *handler) handleSyncMsg(ctx context.Context, msg Message) error {
         // drop the message.
         h.ctx.Log.Debug("dropping sync message",
             zap.String("reason", "uninitialized engine type"),
-            zap.Stringer("messageOp", op),
+            zap.String("messageOp", op),
             zap.Stringer("currentEngineType", currentState.Type),
             zap.Stringer("requestedEngineType", msg.EngineType),
         )
@@ -534,7 +538,7 @@ func (h *handler) handleSyncMsg(ctx context.Context, msg Message) error {
         // requested an Avalanche engine handle the message.
         h.ctx.Log.Debug("dropping sync message",
             zap.String("reason", "uninitialized engine state"),
-            zap.Stringer("messageOp", op),
+            zap.String("messageOp", op),
             zap.Stringer("currentEngineType", currentState.Type),
             zap.Stringer("requestedEngineType", msg.EngineType),
             zap.Stringer("engineState", currentState.State),
@@ -787,36 +791,38 @@ func (h *handler) handleAsyncMsg(ctx context.Context, msg Message) {
 func (h *handler) executeAsyncMsg(ctx context.Context, msg Message) error {
     var (
         nodeID    = msg.NodeID()
-        op        = msg.Op()
+        op        = msg.Op().String()
         body      = msg.Message()
         startTime = h.clock.Time()
     )
     if h.ctx.Log.Enabled(logging.Verbo) {
         h.ctx.Log.Verbo("forwarding async message to consensus",
             zap.Stringer("nodeID", nodeID),
-            zap.Stringer("messageOp", op),
+            zap.String("messageOp", op),
             zap.Stringer("message", body),
         )
     } else {
         h.ctx.Log.Debug("forwarding async message to consensus",
             zap.Stringer("nodeID", nodeID),
-            zap.Stringer("messageOp", op),
+            zap.String("messageOp", op),
         )
     }
     h.resourceTracker.StartProcessing(nodeID, startTime)
     defer func() {
         var (
-            endTime           = h.clock.Time()
-            messageHistograms = h.metrics.messages[op]
-            processingTime    = endTime.Sub(startTime)
+            endTime      = h.clock.Time()
+            handlingTime = endTime.Sub(startTime)
         )
         h.resourceTracker.StopProcessing(nodeID, endTime)
-        // There is no lock grabbed here, so both metrics are identical
-        messageHistograms.processingTime.Observe(float64(processingTime))
-        messageHistograms.msgHandlingTime.Observe(float64(processingTime))
+        labels := prometheus.Labels{
+            opLabel: op,
+        }
+        h.metrics.messages.With(labels).Inc()
+        h.metrics.messageHandlingTime.With(labels).Add(float64(handlingTime))
+
         msg.OnFinishedHandling()
         h.ctx.Log.Debug("finished handling async message",
-            zap.Stringer("messageOp", op),
+            zap.String("messageOp", op),
         )
     }()

@@ -901,7 +907,7 @@ func (h *handler) executeAsyncMsg(ctx context.Context, msg Message) error {
 // Any returned error is treated as fatal
 func (h *handler) handleChanMsg(msg message.InboundMessage) error {
     var (
-        op        = msg.Op()
+        op        = msg.Op().String()
         body      = msg.Message()
         startTime = h.clock.Time()
         // Check if the chain is in normal operation at the start of message
@@ -910,12 +916,12 @@ func (h *handler) handleChanMsg(msg message.InboundMessage) error {
     )
     if h.ctx.Log.Enabled(logging.Verbo) {
         h.ctx.Log.Verbo("forwarding chan message to consensus",
-            zap.Stringer("messageOp", op),
+            zap.String("messageOp", op),
             zap.Stringer("message", body),
         )
     } else {
         h.ctx.Log.Debug("forwarding chan message to consensus",
-            zap.Stringer("messageOp", op),
+            zap.String("messageOp", op),
         )
     }
     h.ctx.Lock.Lock()
@@ -924,22 +930,26 @@ func (h *handler) handleChanMsg(msg message.InboundMessage) error {
     h.ctx.Lock.Unlock()

     var (
-        endTime           = h.clock.Time()
-        messageHistograms = h.metrics.messages[op]
-        processingTime    = endTime.Sub(startTime)
-        msgHandlingTime   = endTime.Sub(lockAcquiredTime)
+        endTime      = h.clock.Time()
+        lockingTime  = lockAcquiredTime.Sub(startTime)
+        handlingTime = endTime.Sub(lockAcquiredTime)
     )
-    messageHistograms.processingTime.Observe(float64(processingTime))
-    messageHistograms.msgHandlingTime.Observe(float64(msgHandlingTime))
+    h.metrics.lockingTime.Add(float64(lockingTime))
+    labels := prometheus.Labels{
+        opLabel: op,
+    }
+    h.metrics.messages.With(labels).Inc()
+    h.metrics.messageHandlingTime.With(labels).Add(float64(handlingTime))
+
    msg.OnFinishedHandling()
     h.ctx.Log.Debug("finished handling chan message",
-        zap.Stringer("messageOp", op),
+        zap.String("messageOp", op),
     )
-    if processingTime > syncProcessingTimeWarnLimit && isNormalOp {
+    if lockingTime+handlingTime > syncProcessingTimeWarnLimit && isNormalOp {
         h.ctx.Log.Warn("handling chan message took longer than expected",
-            zap.Duration("processingTime", processingTime),
-            zap.Duration("msgHandlingTime", msgHandlingTime),
-            zap.Stringer("messageOp", op),
+            zap.Duration("lockingTime", lockingTime),
+            zap.Duration("handlingTime", handlingTime),
+            zap.String("messageOp", op),
             zap.Stringer("message", body),
         )
     }
@@ -974,10 +984,7 @@ func (h *handler) handleChanMsg(msg message.InboundMessage) error {
     }
 }

-func (h *handler) popUnexpiredMsg(
-    queue MessageQueue,
-    expired prometheus.Counter,
-) (context.Context, Message, bool) {
+func (h *handler) popUnexpiredMsg(queue MessageQueue) (context.Context, Message, bool) {
     for {
         // Get the next message we should process. If the handler is shutting
         // down, we may fail to pop a message.
@@ -988,16 +995,19 @@

         // If this message's deadline has passed, don't process it.
         if expiration := msg.Expiration(); h.clock.Time().After(expiration) {
+            op := msg.Op().String()
             h.ctx.Log.Debug("dropping message",
                 zap.String("reason", "timeout"),
                 zap.Stringer("nodeID", msg.NodeID()),
-                zap.Stringer("messageOp", msg.Op()),
+                zap.String("messageOp", op),
             )
             span := trace.SpanFromContext(ctx)
             span.AddEvent("dropping message", trace.WithAttributes(
                 attribute.String("reason", "timeout"),
             ))
-            expired.Inc()
+            h.metrics.expired.With(prometheus.Labels{
+                opLabel: op,
+            }).Inc()
             msg.OnFinishedHandling()
             continue
         }
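The handler.go changes above all follow one pattern: the per-op metric objects are gone, and each code path instead increments a counter vector partitioned by the op label. The following is a minimal, self-contained Go sketch of that pattern, for illustration only — it is not code from the repository. The opLabel/opLabels identifiers mirror package-level definitions the diff references but does not show, and the namespace and op values are made up for the example.

package main

import (
	"fmt"
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

// opLabel/opLabels stand in for the package-level definitions the diff relies on.
const opLabel = "op"

var opLabels = []string{opLabel}

func main() {
	reg := prometheus.NewRegistry()

	// One counter vector partitioned by message op, instead of one metric
	// object per op.
	messages := prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Namespace: "handler", // assumed namespace, for illustration only
			Name:      "messages",
			Help:      "messages handled",
		},
		opLabels,
	)
	messageHandlingTime := prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Namespace: "handler",
			Name:      "message_handling_time",
			Help:      "time spent handling messages",
		},
		opLabels,
	)
	reg.MustRegister(messages, messageHandlingTime)

	// Per handled message: build the label set once, then bump both counters,
	// mirroring the new code in handleSyncMsg/executeAsyncMsg/handleChanMsg.
	labels := prometheus.Labels{opLabel: "chits"} // example op value
	messages.With(labels).Inc()
	messageHandlingTime.With(labels).Add(float64(125 * time.Microsecond))

	// The old per-op averagers are recoverable at query time, e.g. in PromQL:
	//   rate(handler_message_handling_time[5m]) / rate(handler_messages[5m])
	fmt.Println("recorded one message for op=chits")
}

Because the vectors are plain counters of total time and total count rather than averagers, the former per-op average is derived at query time as the ratio of the two rates, as noted in the comment above.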
97 changes: 41 additions & 56 deletions snow/networking/handler/metrics.go
@@ -4,69 +4,54 @@
 package handler

 import (
-    "fmt"
-
     "github.com/prometheus/client_golang/prometheus"

-    "github.com/ava-labs/avalanchego/message"
-    "github.com/ava-labs/avalanchego/utils/metric"
-    "github.com/ava-labs/avalanchego/utils/wrappers"
+    "github.com/ava-labs/avalanchego/utils"
 )

 type metrics struct {
-    expired      prometheus.Counter
-    asyncExpired prometheus.Counter
-    messages     map[message.Op]*messageProcessing
-}
-
-type messageProcessing struct {
-    processingTime  metric.Averager
-    msgHandlingTime metric.Averager
+    expired             *prometheus.CounterVec // op
+    messages            *prometheus.CounterVec // op
+    lockingTime         prometheus.Counter
+    messageHandlingTime *prometheus.CounterVec // op
 }

 func newMetrics(namespace string, reg prometheus.Registerer) (*metrics, error) {
-    errs := wrappers.Errs{}
-
-    expired := prometheus.NewCounter(prometheus.CounterOpts{
-        Namespace: namespace,
-        Name:      "expired",
-        Help:      "Incoming sync messages dropped because the message deadline expired",
-    })
-    asyncExpired := prometheus.NewCounter(prometheus.CounterOpts{
-        Namespace: namespace,
-        Name:      "async_expired",
-        Help:      "Incoming async messages dropped because the message deadline expired",
-    })
-    errs.Add(
-        reg.Register(expired),
-        reg.Register(asyncExpired),
-    )
-
-    messages := make(map[message.Op]*messageProcessing, len(message.ConsensusOps))
-    for _, op := range message.ConsensusOps {
-        opStr := op.String()
-        messageProcessing := &messageProcessing{
-            processingTime: metric.NewAveragerWithErrs(
-                namespace,
-                opStr,
-                "time (in ns) spent handling a "+opStr,
-                reg,
-                &errs,
-            ),
-            msgHandlingTime: metric.NewAveragerWithErrs(
-                namespace,
-                opStr+"_msg_handling",
-                fmt.Sprintf("time (in ns) spent handling a %s after grabbing the lock", opStr),
-                reg,
-                &errs,
-            ),
-        }
-        messages[op] = messageProcessing
+    m := &metrics{
+        expired: prometheus.NewCounterVec(
+            prometheus.CounterOpts{
+                Namespace: namespace,
+                Name:      "expired",
+                Help:      "messages dropped because the deadline expired",
+            },
+            opLabels,
+        ),
+        messages: prometheus.NewCounterVec(
+            prometheus.CounterOpts{
+                Namespace: namespace,
+                Name:      "messages",
+                Help:      "messages handled",
+            },
+            opLabels,
+        ),
+        messageHandlingTime: prometheus.NewCounterVec(
+            prometheus.CounterOpts{
+                Namespace: namespace,
+                Name:      "message_handling_time",
+                Help:      "time spent handling messages",
+            },
+            opLabels,
+        ),
+        lockingTime: prometheus.NewCounter(prometheus.CounterOpts{
+            Namespace: namespace,
+            Name:      "locking_time",
+            Help:      "time spent acquiring the context lock",
+        }),
     }
-
-    return &metrics{
-        expired:      expired,
-        asyncExpired: asyncExpired,
-        messages:     messages,
-    }, errs.Err
+    return m, utils.Err(
+        reg.Register(m.expired),
+        reg.Register(m.messages),
+        reg.Register(m.messageHandlingTime),
+        reg.Register(m.lockingTime),
+    )
 }

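To make the effect of the metrics.go rewrite concrete, here is a rough sketch of what a scraper now sees: each CounterVec exports one time series per op label value under a single metric name, instead of one uniquely named metric per op. The wiring below is hypothetical (a standalone registry, an assumed "handler" namespace, and example op values); only the label-per-op shape is taken from the diff.

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

const opLabel = "op"

func main() {
	reg := prometheus.NewRegistry()

	// Same shape as the new "expired" metric: one vector, labeled by op.
	expired := prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Namespace: "handler", // assumed namespace for the example
			Name:      "expired",
			Help:      "messages dropped because the deadline expired",
		},
		[]string{opLabel},
	)
	reg.MustRegister(expired)

	// Dropping two different op types yields two series of the same metric,
	// where the old code registered separately named counters (expired,
	// async_expired) with no per-op breakdown.
	expired.With(prometheus.Labels{opLabel: "push_query"}).Inc()
	expired.With(prometheus.Labels{opLabel: "pull_query"}).Inc()

	families, err := reg.Gather()
	if err != nil {
		panic(err)
	}
	for _, mf := range families {
		for _, m := range mf.GetMetric() {
			fmt.Printf("%s{%s=%q} %v\n",
				mf.GetName(),
				m.GetLabel()[0].GetName(),
				m.GetLabel()[0].GetValue(),
				m.GetCounter().GetValue(),
			)
		}
	}
}

This is the usual trade-off when moving from per-op metric names to a labeled vector: fewer registered collectors and far less registration code, with the per-op breakdown pushed into labels.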