Skip to content

Commit

Permalink
Use vectors for p2p message metrics (#2983)
Browse files Browse the repository at this point in the history
  • Loading branch information
StephenButtolph committed May 3, 2024
1 parent b158abd commit 067de22
Show file tree
Hide file tree
Showing 5 changed files with 118 additions and 143 deletions.
2 changes: 1 addition & 1 deletion network/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ func NewNetwork(
return nil, fmt.Errorf("initializing outbound message throttler failed with: %w", err)
}

peerMetrics, err := peer.NewMetrics(log, config.Namespace, metricsRegisterer)
peerMetrics, err := peer.NewMetrics(config.Namespace, metricsRegisterer)
if err != nil {
return nil, fmt.Errorf("initializing peer metrics failed with: %w", err)
}
Expand Down
252 changes: 114 additions & 138 deletions network/peer/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,180 +4,156 @@
package peer

import (
"fmt"
"strconv"

"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"

"github.com/ava-labs/avalanchego/message"
"github.com/ava-labs/avalanchego/utils/logging"
"github.com/ava-labs/avalanchego/utils/metric"
"github.com/ava-labs/avalanchego/utils/wrappers"
"github.com/ava-labs/avalanchego/utils"
)

type MessageMetrics struct {
ReceivedBytes, SentBytes, NumSent, NumFailed, NumReceived prometheus.Counter
SavedReceivedBytes, SavedSentBytes metric.Averager
}
const (
ioLabel = "io"
opLabel = "op"
compressedLabel = "compressed"

func NewMessageMetrics(
op message.Op,
namespace string,
metrics prometheus.Registerer,
errs *wrappers.Errs,
) *MessageMetrics {
msg := &MessageMetrics{
NumSent: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Name: fmt.Sprintf("%s_sent", op),
Help: fmt.Sprintf("Number of %s messages sent over the network", op),
}),
NumFailed: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Name: fmt.Sprintf("%s_failed", op),
Help: fmt.Sprintf("Number of %s messages that failed to be sent over the network", op),
}),
NumReceived: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Name: fmt.Sprintf("%s_received", op),
Help: fmt.Sprintf("Number of %s messages received from the network", op),
}),
ReceivedBytes: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Name: fmt.Sprintf("%s_received_bytes", op),
Help: fmt.Sprintf("Number of bytes of %s messages received from the network", op),
}),
SentBytes: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Name: fmt.Sprintf("%s_sent_bytes", op),
Help: fmt.Sprintf("Size of bytes of %s messages received from the network", op),
}),
}
errs.Add(
metrics.Register(msg.NumSent),
metrics.Register(msg.NumFailed),
metrics.Register(msg.NumReceived),
metrics.Register(msg.ReceivedBytes),
metrics.Register(msg.SentBytes),
)
sentLabel = "sent"
receivedLabel = "received"
)

msg.SavedReceivedBytes = metric.NewAveragerWithErrs(
namespace,
fmt.Sprintf("%s_compression_saved_received_bytes", op),
fmt.Sprintf("bytes saved (not received) due to compression of %s messages", op),
metrics,
errs,
)
msg.SavedSentBytes = metric.NewAveragerWithErrs(
namespace,
fmt.Sprintf("%s_compression_saved_sent_bytes", op),
fmt.Sprintf("bytes saved (not sent) due to compression of %s messages", op),
metrics,
errs,
)
return msg
}
var (
opLabels = []string{opLabel}
ioOpLabels = []string{ioLabel, opLabel}
ioOpCompressedLabels = []string{ioLabel, opLabel, compressedLabel}
)

type Metrics struct {
Log logging.Logger
ClockSkew metric.Averager
FailedToParse prometheus.Counter
MessageMetrics map[message.Op]*MessageMetrics
ClockSkewCount prometheus.Counter
ClockSkewSum prometheus.Gauge

NumFailedToParse prometheus.Counter
NumSendFailed *prometheus.CounterVec // op

Messages *prometheus.CounterVec // io + op + compressed
Bytes *prometheus.CounterVec // io + op
BytesSaved *prometheus.GaugeVec // io + op
}

func NewMetrics(
log logging.Logger,
namespace string,
registerer prometheus.Registerer,
) (*Metrics, error) {
m := &Metrics{
Log: log,
FailedToParse: prometheus.NewCounter(prometheus.CounterOpts{
ClockSkewCount: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Name: "clock_skew_count",
Help: "number of handshake timestamps inspected (n)",
}),
ClockSkewSum: prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: namespace,
Name: "clock_skew_sum",
Help: "sum of (peer timestamp - local timestamp) from handshake messages (s)",
}),
NumFailedToParse: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Name: "msgs_failed_to_parse",
Help: "Number of messages that could not be parsed or were invalidly formed",
Help: "number of received messages that could not be parsed",
}),
MessageMetrics: make(map[message.Op]*MessageMetrics, len(message.ExternalOps)),
}

errs := wrappers.Errs{}
errs.Add(
registerer.Register(m.FailedToParse),
)
for _, op := range message.ExternalOps {
m.MessageMetrics[op] = NewMessageMetrics(op, namespace, registerer, &errs)
NumSendFailed: prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Name: "msgs_failed_to_send",
Help: "number of messages that failed to be sent",
},
opLabels,
),
Messages: prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Name: "msgs",
Help: "number of handled messages",
},
ioOpCompressedLabels,
),
Bytes: prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Name: "msgs_bytes",
Help: "number of message bytes",
},
ioOpLabels,
),
BytesSaved: prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: namespace,
Name: "msgs_bytes_saved",
Help: "number of message bytes saved",
},
ioOpLabels,
),
}

m.ClockSkew = metric.NewAveragerWithErrs(
namespace,
"clock_skew",
"clock skew during peer handshake",
registerer,
&errs,
return m, utils.Err(
registerer.Register(m.ClockSkewCount),
registerer.Register(m.ClockSkewSum),
registerer.Register(m.NumFailedToParse),
registerer.Register(m.NumSendFailed),
registerer.Register(m.Messages),
registerer.Register(m.Bytes),
registerer.Register(m.BytesSaved),
)
return m, errs.Err
}

// Sent updates the metrics for having sent [msg].
func (m *Metrics) Sent(msg message.OutboundMessage) {
op := msg.Op()
msgMetrics := m.MessageMetrics[op]
if msgMetrics == nil {
m.Log.Error(
"unknown message being sent",
zap.Stringer("messageOp", op),
)
return
}
msgMetrics.NumSent.Inc()
msgMetrics.SentBytes.Add(float64(len(msg.Bytes())))
// assume that if [saved] == 0, [msg] wasn't compressed
if saved := msg.BytesSavedCompression(); saved != 0 {
msgMetrics.SavedSentBytes.Observe(float64(saved))
op := msg.Op().String()
saved := msg.BytesSavedCompression()
compressed := saved != 0 // assume that if [saved] == 0, [msg] wasn't compressed
compressedStr := strconv.FormatBool(compressed)

m.Messages.With(prometheus.Labels{
ioLabel: sentLabel,
opLabel: op,
compressedLabel: compressedStr,
}).Inc()

bytesLabel := prometheus.Labels{
ioLabel: sentLabel,
opLabel: op,
}
m.Bytes.With(bytesLabel).Add(float64(len(msg.Bytes())))
m.BytesSaved.With(bytesLabel).Add(float64(saved))
}

func (m *Metrics) MultipleSendsFailed(op message.Op, count int) {
msgMetrics := m.MessageMetrics[op]
if msgMetrics == nil {
m.Log.Error(
"unknown messages failed to be sent",
zap.Stringer("messageOp", op),
zap.Int("messageCount", count),
)
return
}
msgMetrics.NumFailed.Add(float64(count))
m.NumSendFailed.With(prometheus.Labels{
opLabel: op.String(),
}).Add(float64(count))
}

// SendFailed updates the metrics for having failed to send [msg].
func (m *Metrics) SendFailed(msg message.OutboundMessage) {
op := msg.Op()
msgMetrics := m.MessageMetrics[op]
if msgMetrics == nil {
m.Log.Error(
"unknown message failed to be sent",
zap.Stringer("messageOp", op),
)
return
}
msgMetrics.NumFailed.Inc()
op := msg.Op().String()
m.NumSendFailed.With(prometheus.Labels{
opLabel: op,
}).Inc()
}

func (m *Metrics) Received(msg message.InboundMessage, msgLen uint32) {
op := msg.Op()
msgMetrics := m.MessageMetrics[op]
if msgMetrics == nil {
m.Log.Error(
"unknown message received",
zap.Stringer("messageOp", op),
)
return
}
msgMetrics.NumReceived.Inc()
msgMetrics.ReceivedBytes.Add(float64(msgLen))
// assume that if [saved] == 0, [msg] wasn't compressed
if saved := msg.BytesSavedCompression(); saved != 0 {
msgMetrics.SavedReceivedBytes.Observe(float64(saved))
op := msg.Op().String()
saved := msg.BytesSavedCompression()
compressed := saved != 0 // assume that if [saved] == 0, [msg] wasn't compressed
compressedStr := strconv.FormatBool(compressed)

m.Messages.With(prometheus.Labels{
ioLabel: receivedLabel,
opLabel: op,
compressedLabel: compressedStr,
}).Inc()

bytesLabel := prometheus.Labels{
ioLabel: receivedLabel,
opLabel: op,
}
m.Bytes.With(bytesLabel).Add(float64(msgLen))
m.BytesSaved.With(bytesLabel).Add(float64(saved))
}
5 changes: 3 additions & 2 deletions network/peer/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -478,7 +478,7 @@ func (p *peer) readMessages() {
zap.Error(err),
)

p.Metrics.FailedToParse.Inc()
p.Metrics.NumFailedToParse.Inc()

// Couldn't parse the message. Read the next one.
onFinishedHandling()
Expand Down Expand Up @@ -906,7 +906,8 @@ func (p *peer) handleHandshake(msg *p2p.Handshake) {
myTimeUnix := uint64(myTime.Unix())
clockDifference := math.Abs(float64(msg.MyTime) - float64(myTimeUnix))

p.Metrics.ClockSkew.Observe(clockDifference)
p.Metrics.ClockSkewCount.Inc()
p.Metrics.ClockSkewSum.Add(clockDifference)

if clockDifference > p.MaxClockDifference.Seconds() {
if _, ok := p.Beacons.GetValidator(constants.PrimaryNetworkID, p.id); ok {
Expand Down
1 change: 0 additions & 1 deletion network/peer/peer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@ func makeRawTestPeers(t *testing.T, trackedSubnets set.Set[ids.ID]) (*rawTestPee
mc := newMessageCreator(t)

metrics, err := NewMetrics(
logging.NoLog{},
"",
prometheus.NewRegistry(),
)
Expand Down
1 change: 0 additions & 1 deletion network/peer/test_peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@ func StartTestPeer(
}

metrics, err := NewMetrics(
logging.NoLog{},
"",
prometheus.NewRegistry(),
)
Expand Down

0 comments on commit 067de22

Please sign in to comment.