Skip to content

Commit

Permalink
feat: add metrics middleware (#5)
Browse files Browse the repository at this point in the history
* feat: add metrics middleware

* fmt: imports

* dash: update metrics dashboard

* rename to metrics
  • Loading branch information
shrimalmadhur authored Oct 23, 2024
1 parent 7e6e83b commit 7320f9e
Show file tree
Hide file tree
Showing 6 changed files with 69 additions and 82 deletions.
37 changes: 9 additions & 28 deletions internal/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,28 +3,22 @@ package metrics
import "github.com/prometheus/client_golang/prometheus"

const (
SuccessLabel = "success"
FailureLabel = "failure"

SubsystemRPCServer = "rpc_server"

MetricRequestTotal = "request_total"
MetricRequestDurationSeconds = "request_duration_seconds"
MetricResponseTotal = "response_total"

MethodLabelName = "method"
StatusLabelName = "status"
CodeLabelName = "code"
)

type Recorder interface {
RecordRPCServerRequest(method string) func()
RecordRPCServerResponse(method string, status string)
RecordRPCServerRequest(method string) func(code string)
}

type RPCServerMetrics struct {
RPCServerRequestTotal *prometheus.CounterVec
RPCServerRequestDurationSeconds *prometheus.SummaryVec
RPCServerResponseTotal *prometheus.CounterVec
}

func NewRPCServerMetrics(ns string, registry *prometheus.Registry) *RPCServerMetrics {
Expand All @@ -33,50 +27,37 @@ func NewRPCServerMetrics(ns string, registry *prometheus.Registry) *RPCServerMet
Namespace: ns,
Subsystem: SubsystemRPCServer,
Name: MetricRequestTotal,
Help: "Total number of RPC server requests.",
}, []string{MethodLabelName}),
Help: "Total number of RPC server requests with status codes",
}, []string{MethodLabelName, CodeLabelName}),
RPCServerRequestDurationSeconds: prometheus.NewSummaryVec(prometheus.SummaryOpts{
Namespace: ns,
Subsystem: SubsystemRPCServer,
Name: MetricRequestDurationSeconds,
Help: "Duration of RPC server requests in seconds.",
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.01, 0.99: 0.001},
}, []string{MethodLabelName}),
RPCServerResponseTotal: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: ns,
Subsystem: SubsystemRPCServer,
Name: MetricResponseTotal,
Help: "Total number of RPC server responses.",
}, []string{MethodLabelName, StatusLabelName}),
}
registry.MustRegister(m.RPCServerRequestTotal)
registry.MustRegister(m.RPCServerRequestDurationSeconds)
registry.MustRegister(m.RPCServerResponseTotal)
return m
}

func (m *RPCServerMetrics) RecordRPCServerRequest(method string) func() {
m.RPCServerRequestTotal.WithLabelValues(method).Inc()
func (m *RPCServerMetrics) RecordRPCServerRequest(method string) func(code string) {
timer := prometheus.NewTimer(m.RPCServerRequestDurationSeconds.WithLabelValues(method))
return func() {
return func(code string) {
m.RPCServerRequestTotal.WithLabelValues(method, code).Inc()
timer.ObserveDuration()
}
}

func (m *RPCServerMetrics) RecordRPCServerResponse(method string, status string) {
m.RPCServerResponseTotal.WithLabelValues(method, status).Inc()
}

type NoopRPCMetrics struct{}

func NewNoopRPCMetrics() *NoopRPCMetrics {
return &NoopRPCMetrics{}
}

func (NoopRPCMetrics) RecordRPCServerRequest(method string) func() {
return func() {}
func (NoopRPCMetrics) RecordRPCServerRequest(method string) func(code string) {
return func(code string) {}
}

func (NoopRPCMetrics) RecordRPCServerResponse(method string, status string) {}

var _ Recorder = (*NoopRPCMetrics)(nil)
43 changes: 43 additions & 0 deletions internal/middleware/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package middleware

import (
"context"

"github.com/Layr-Labs/cerberus/internal/metrics"
"github.com/prometheus/client_golang/prometheus"
"google.golang.org/grpc"
"google.golang.org/grpc/status"
)

type MetricsMiddleware struct {
registry *prometheus.Registry
recorder metrics.Recorder
}

func NewMetricsMiddleware(
registry *prometheus.Registry,
recorder metrics.Recorder,
) *MetricsMiddleware {
return &MetricsMiddleware{
registry: registry,
recorder: recorder,
}
}

// UnaryServerInterceptor returns a new unary server interceptor for metrics
func (m *MetricsMiddleware) UnaryServerInterceptor() grpc.UnaryServerInterceptor {
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
recordDuration := m.recorder.RecordRPCServerRequest(info.FullMethod)

// Handle request
resp, err := handler(ctx, req)

// Get status code
code := status.Code(err)

// Record response
recordDuration(code.String())

return resp, err
}
}
8 changes: 7 additions & 1 deletion internal/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (

"github.com/Layr-Labs/cerberus/internal/configuration"
"github.com/Layr-Labs/cerberus/internal/metrics"
"github.com/Layr-Labs/cerberus/internal/middleware"
"github.com/Layr-Labs/cerberus/internal/services/kms"
"github.com/Layr-Labs/cerberus/internal/services/signing"
"github.com/Layr-Labs/cerberus/internal/store/filesystem"
Expand All @@ -35,7 +36,8 @@ func Start(config *configuration.Configuration, logger *slog.Logger) {
registry := prometheus.NewRegistry()
registry.MustRegister(collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}))
registry.MustRegister(collectors.NewGoCollector())
rpcMetrics := metrics.NewRPCServerMetrics("remote_bls", registry)
rpcMetrics := metrics.NewRPCServerMetrics("cerberus", registry)

go startMetricsServer(registry, config.MetricsPort, logger)

keystore := filesystem.NewStore(config.KeystoreDir, logger)
Expand All @@ -51,6 +53,10 @@ func Start(config *configuration.Configuration, logger *slog.Logger) {
opts = append(opts, grpc.Creds(creds))
}

// Register metrics middleware
metricsMiddleware := middleware.NewMetricsMiddleware(registry, rpcMetrics)
opts = append(opts, grpc.UnaryInterceptor(metricsMiddleware.UnaryServerInterceptor()))

s := grpc.NewServer(opts...)
kmsService := kms.NewService(config, keystore, logger, rpcMetrics)
signingService := signing.NewService(config, keystore, logger, rpcMetrics)
Expand Down
15 changes: 0 additions & 15 deletions internal/services/kms/kms.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,22 +48,18 @@ func (k *Service) GenerateKeyPair(
ctx context.Context,
req *v1.GenerateKeyPairRequest,
) (*v1.GenerateKeyPairResponse, error) {
observe := k.metrics.RecordRPCServerRequest("kms/GenerateKeyPair")
defer observe()
password := req.GetPassword()

// Generate a new BLS key pair
keyPair, err := keystore.NewKeyPair(password, mnemonic.English)
if err != nil {
k.logger.Error(fmt.Sprintf("Failed to generate BLS key pair: %v", err))
k.metrics.RecordRPCServerResponse("kms/GenerateKeyPair", metrics.FailureLabel)
return nil, status.Error(codes.Internal, err.Error())
}

pubKeyHex, err := k.store.StoreKey(ctx, keyPair)
if err != nil {
k.logger.Error(fmt.Sprintf("Failed to save BLS key pair to file: %v", err))
k.metrics.RecordRPCServerResponse("kms/GenerateKeyPair", metrics.FailureLabel)
return nil, status.Error(codes.Internal, err.Error())
}

Expand All @@ -72,7 +68,6 @@ func (k *Service) GenerateKeyPair(
copy(pkBytesSlice, keyPair.PrivateKey[:])
privKeyHex := common.Trim0x(hex.EncodeToString(pkBytesSlice))

k.metrics.RecordRPCServerResponse("kms/GenerateKeyPair", metrics.SuccessLabel)
return &v1.GenerateKeyPairResponse{
PublicKey: pubKeyHex,
PrivateKey: privKeyHex,
Expand All @@ -84,8 +79,6 @@ func (k *Service) ImportKey(
ctx context.Context,
req *v1.ImportKeyRequest,
) (*v1.ImportKeyResponse, error) {
observe := k.metrics.RecordRPCServerRequest("kms/ImportKey")
defer observe()
pkString := req.GetPrivateKey()
password := req.GetPassword()
pkMnemonic := req.GetMnemonic()
Expand All @@ -96,7 +89,6 @@ func (k *Service) ImportKey(
ks, err := keystore.NewKeyPairFromMnemonic(pkMnemonic, password)
if err != nil {
k.logger.Error(fmt.Sprintf("Failed to import key pair from mnemonic: %v", err))
k.metrics.RecordRPCServerResponse("kms/ImportKey", metrics.FailureLabel)
return nil, status.Error(codes.InvalidArgument, err.Error())
}
pkBytes = ks.PrivateKey
Expand All @@ -111,7 +103,6 @@ func (k *Service) ImportKey(
pkBytes, err = hex.DecodeString(pkHex)
if err != nil {
k.logger.Error(fmt.Sprintf("Failed to import key pair from string: %v", err))
k.metrics.RecordRPCServerResponse("kms/ImportKey", metrics.FailureLabel)
return nil, status.Error(codes.InvalidArgument, err.Error())
}
}
Expand All @@ -123,27 +114,21 @@ func (k *Service) ImportKey(
)
if err != nil {
k.logger.Error(fmt.Sprintf("Failed to save BLS key pair to file: %v", err))
k.metrics.RecordRPCServerResponse("kms/ImportKey", metrics.FailureLabel)
return nil, status.Error(codes.Internal, err.Error())
}

k.metrics.RecordRPCServerResponse("kms/ImportKey", metrics.SuccessLabel)
return &v1.ImportKeyResponse{PublicKey: pubKeyHex}, nil
}

func (k *Service) ListKeys(
ctx context.Context,
req *v1.ListKeysRequest,
) (*v1.ListKeysResponse, error) {
observe := k.metrics.RecordRPCServerRequest("kms/ListKeys")
defer observe()
pubKeys, err := k.store.ListKeys(ctx)
if err != nil {
k.logger.Error(fmt.Sprintf("Failed to list keys: %v", err))
k.metrics.RecordRPCServerResponse("kms/ListKeys", metrics.FailureLabel)
return nil, status.Error(codes.Internal, err.Error())
}

k.metrics.RecordRPCServerResponse("kms/ListKeys", metrics.SuccessLabel)
return &v1.ListKeysResponse{PublicKeys: pubKeys}, nil
}
5 changes: 0 additions & 5 deletions internal/services/signing/signing.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,6 @@ func (s *Service) SignGeneric(
ctx context.Context,
req *v1.SignGenericRequest,
) (*v1.SignGenericResponse, error) {
observe := s.metrics.RecordRPCServerRequest("signing/SignGeneric")
defer observe()
// Take the public key and data from the request
pubKeyHex := common.Trim0x(req.GetPublicKey())
password := req.GetPassword()
Expand All @@ -56,7 +54,6 @@ func (s *Service) SignGeneric(
blsKey, err := s.store.RetrieveKey(ctx, pubKeyHex, password)
if err != nil {
s.logger.Error(fmt.Sprintf("Failed to retrieve key: %v", err))
s.metrics.RecordRPCServerResponse("signing/SignGeneric", metrics.FailureLabel)
return nil, status.Error(codes.Internal, err.Error())
}
s.keyCache[pubKeyHex] = blsKey
Expand All @@ -66,7 +63,6 @@ func (s *Service) SignGeneric(
data := req.GetData()
if len(data) > 32 {
s.logger.Error("Data is too long, must be 32 bytes")
s.metrics.RecordRPCServerResponse("signing/SignGeneric", metrics.FailureLabel)
return nil, status.Error(codes.InvalidArgument, "data is too long, must be 32 bytes")
}

Expand All @@ -75,6 +71,5 @@ func (s *Service) SignGeneric(
// Sign the data with the private key
sig := blsKey.SignMessage(byteArray)
s.logger.Info(fmt.Sprintf("Signed a message successfully using %s", req.PublicKey))
s.metrics.RecordRPCServerResponse("signing/SignGeneric", metrics.SuccessLabel)
return &v1.SignGenericResponse{Signature: sig.Serialize()}, nil
}
43 changes: 10 additions & 33 deletions monitoring/signer.json
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@
},
"disableTextWrap": false,
"editorMode": "builder",
"expr": "remote_bls_rpc_server_request_duration_seconds{method=\"signing/SignGeneric\", quantile=\"0.95\"}",
"expr": "cerberus_rpc_server_request_duration_seconds{quantile=\"0.95\", method=\"/signer.v1.Signer/SignGeneric\"}",
"fullMetaSearch": false,
"includeNullMetadata": false,
"instant": false,
Expand Down Expand Up @@ -212,7 +212,7 @@
},
"disableTextWrap": false,
"editorMode": "builder",
"expr": "rate(remote_bls_rpc_server_request_total{method=\"signing/SignGeneric\"}[$__range])",
"expr": "rate(cerberus_rpc_server_request_total{method=\"/signer.v1.Signer/SignGeneric\"}[$__range])",
"fullMetaSearch": false,
"includeNullMetadata": true,
"instant": false,
Expand Down Expand Up @@ -240,6 +240,7 @@
"axisBorderShow": false,
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
"barWidthFactor": 0.6,
Expand Down Expand Up @@ -285,34 +286,10 @@
}
},
"overrides": [
{
"__systemRef": "hideSeriesFrom",
"matcher": {
"id": "byNames",
"options": {
"mode": "exclude",
"names": [
"C {__name__=\"remote_bls_rpc_server_response_total\", instance=\"localhost:9091\", job=\"prometheus\", method=\"signing/SignGeneric\", status=\"success\"}"
],
"prefix": "All except:",
"readOnly": true
}
},
"properties": [
{
"id": "custom.hideFrom",
"value": {
"legend": false,
"tooltip": false,
"viz": true
}
}
]
},
{
"matcher": {
"id": "byName",
"options": "C {__name__=\"remote_bls_rpc_server_response_total\", instance=\"localhost:9091\", job=\"prometheus\", method=\"signing/SignGeneric\", status=\"success\"}"
"options": "C {__name__=\"cerberus_rpc_server_request_total\", code=\"OK\", instance=\"localhost:9091\", job=\"prometheus\", method=\"/signer.v1.Signer/SignGeneric\"}"
},
"properties": [
{
Expand Down Expand Up @@ -351,10 +328,10 @@
"disableTextWrap": false,
"editorMode": "builder",
"exemplar": false,
"expr": "remote_bls_rpc_server_response_total{status=\"success\", method=\"signing/SignGeneric\"}",
"expr": "cerberus_rpc_server_request_total{method=\"/signer.v1.Signer/SignGeneric\", code=\"OK\"}",
"fullMetaSearch": false,
"hide": true,
"includeNullMetadata": true,
"includeNullMetadata": false,
"instant": false,
"legendFormat": "{{method}}",
"range": true,
Expand All @@ -368,7 +345,7 @@
},
"disableTextWrap": false,
"editorMode": "builder",
"expr": "remote_bls_rpc_server_response_total{method=\"signing/SignGeneric\"}",
"expr": "cerberus_rpc_server_request_total{method=\"/signer.v1.Signer/SignGeneric\"}",
"fullMetaSearch": false,
"hide": true,
"includeNullMetadata": true,
Expand Down Expand Up @@ -401,13 +378,13 @@
"list": []
},
"time": {
"from": "now-15m",
"from": "now-30m",
"to": "now"
},
"timepicker": {},
"timezone": "browser",
"title": "BLS Signer",
"title": "Cerberus",
"uid": "bdzhsw9k29am8d",
"version": 17,
"version": 23,
"weekStart": ""
}

0 comments on commit 7320f9e

Please sign in to comment.