Skip to content

Commit

Permalink
Cleanup compression metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
StephenButtolph committed May 3, 2024
1 parent fc6f4e7 commit bec3917
Showing 1 changed file with 54 additions and 52 deletions.
106 changes: 54 additions & 52 deletions message/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,23 +9,32 @@ import (
"time"

"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
"google.golang.org/protobuf/proto"

"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/proto/pb/p2p"
"github.com/ava-labs/avalanchego/utils"
"github.com/ava-labs/avalanchego/utils/compression"
"github.com/ava-labs/avalanchego/utils/constants"
"github.com/ava-labs/avalanchego/utils/logging"
"github.com/ava-labs/avalanchego/utils/metric"
"github.com/ava-labs/avalanchego/utils/timer/mockable"
"github.com/ava-labs/avalanchego/utils/wrappers"
)

const (
typeLabel = "type"
opLabel = "op"
directionLabel = "direction"

compressionLabel = "compression"
decompressionLabel = "decompression"
)

var (
_ InboundMessage = (*inboundMessage)(nil)
_ OutboundMessage = (*outboundMessage)(nil)

metricLabels = []string{typeLabel, opLabel, directionLabel}

errUnknownCompressionType = errors.New("message is compressed with an unknown compression type")
)

Expand Down Expand Up @@ -131,9 +140,9 @@ func (m *outboundMessage) BytesSavedCompression() int {
type msgBuilder struct {
log logging.Logger

zstdCompressor compression.Compressor
zstdCompressTimeMetrics map[Op]metric.Averager
zstdDecompressTimeMetrics map[Op]metric.Averager
zstdCompressor compression.Compressor
count *prometheus.CounterVec // type + op + direction
duration *prometheus.CounterVec // type + op + direction

maxMessageTimeout time.Duration
}
Expand All @@ -152,31 +161,30 @@ func newMsgBuilder(
mb := &msgBuilder{
log: log,

zstdCompressor: zstdCompressor,
zstdCompressTimeMetrics: make(map[Op]metric.Averager, len(ExternalOps)),
zstdDecompressTimeMetrics: make(map[Op]metric.Averager, len(ExternalOps)),
zstdCompressor: zstdCompressor,
count: prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Name: "compressed_count",
Help: "number of compressed messages",
},
metricLabels,
),
duration: prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Name: "compressed_duration",
Help: "time spent handling compressed messages",
},
metricLabels,
),

maxMessageTimeout: maxMessageTimeout,
}

errs := wrappers.Errs{}
for _, op := range ExternalOps {
mb.zstdCompressTimeMetrics[op] = metric.NewAveragerWithErrs(
namespace,
fmt.Sprintf("zstd_%s_compress_time", op),
fmt.Sprintf("time (in ns) to compress %s messages with zstd", op),
metrics,
&errs,
)
mb.zstdDecompressTimeMetrics[op] = metric.NewAveragerWithErrs(
namespace,
fmt.Sprintf("zstd_%s_decompress_time", op),
fmt.Sprintf("time (in ns) to decompress %s messages with zstd", op),
metrics,
&errs,
)
}
return mb, errs.Err
return mb, utils.Err(
metrics.Register(mb.count),
metrics.Register(mb.duration),
)
}

func (mb *msgBuilder) marshal(
Expand All @@ -200,9 +208,8 @@ func (mb *msgBuilder) marshal(
// This recursive packing allows us to avoid an extra compression on/off
// field in the message.
var (
startTime = time.Now()
compressedMsg p2p.Message
opToCompressTimeMetrics map[Op]metric.Averager
startTime = time.Now()
compressedMsg p2p.Message
)
switch compressionType {
case compression.TypeNone:
Expand All @@ -217,7 +224,6 @@ func (mb *msgBuilder) marshal(
CompressedZstd: compressedBytes,
},
}
opToCompressTimeMetrics = mb.zstdCompressTimeMetrics
default:
return nil, 0, 0, errUnknownCompressionType
}
Expand All @@ -228,15 +234,13 @@ func (mb *msgBuilder) marshal(
}
compressTook := time.Since(startTime)

if compressTimeMetric, ok := opToCompressTimeMetrics[op]; ok {
compressTimeMetric.Observe(float64(compressTook))
} else {
// Should never happen
mb.log.Warn("no compression metric found for op",
zap.Stringer("op", op),
zap.Stringer("compressionType", compressionType),
)
labels := prometheus.Labels{
typeLabel: compressionType.String(),
opLabel: op.String(),
directionLabel: compressionLabel,
}
mb.count.With(labels).Inc()
mb.duration.With(labels).Add(float64(compressTook))

bytesSaved := len(uncompressedMsgBytes) - len(compressedMsgBytes)
return compressedMsgBytes, bytesSaved, op, nil
Expand All @@ -250,14 +254,12 @@ func (mb *msgBuilder) unmarshal(b []byte) (*p2p.Message, int, Op, error) {

// Figure out what compression type, if any, was used to compress the message.
var (
opToDecompressTimeMetrics map[Op]metric.Averager
compressor compression.Compressor
compressedBytes []byte
zstdCompressed = m.GetCompressedZstd()
compressor compression.Compressor
compressedBytes []byte
zstdCompressed = m.GetCompressedZstd()
)
switch {
case len(zstdCompressed) > 0:
opToDecompressTimeMetrics = mb.zstdDecompressTimeMetrics
compressor = mb.zstdCompressor
compressedBytes = zstdCompressed
default:
Expand All @@ -284,14 +286,14 @@ func (mb *msgBuilder) unmarshal(b []byte) (*p2p.Message, int, Op, error) {
if err != nil {
return nil, 0, 0, err
}
if decompressTimeMetric, ok := opToDecompressTimeMetrics[op]; ok {
decompressTimeMetric.Observe(float64(decompressTook))
} else {
// Should never happen
mb.log.Warn("no decompression metric found for op",
zap.Stringer("op", op),
)

labels := prometheus.Labels{
typeLabel: compression.TypeZstd.String(),
opLabel: op.String(),
directionLabel: decompressionLabel,
}
mb.count.With(labels).Inc()
mb.duration.With(labels).Add(float64(decompressTook))

return m, bytesSavedCompression, op, nil
}
Expand Down

0 comments on commit bec3917

Please sign in to comment.