Skip to content

Commit

Permalink
Fix missing metrics and verify metrics are not lost
Browse files Browse the repository at this point in the history
Signed-off-by: Davanum Srinivas <davanum@gmail.com>
  • Loading branch information
dims committed Jan 21, 2025
1 parent 3a2afb3 commit da7dd38
Show file tree
Hide file tree
Showing 5 changed files with 66 additions and 14 deletions.
3 changes: 3 additions & 0 deletions server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,9 @@ type ServerConfig struct {

// ServerFeatureGate is a server level feature gate
ServerFeatureGate featuregate.FeatureGate

// Metrics types of metrics - should be either 'basic' or 'extensive'
Metrics string
}

// VerifyBootstrap sanity-checks the initial config for bootstrap case
Expand Down
14 changes: 1 addition & 13 deletions server/embed/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (
"sync"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/soheilhy/cmux"
"go.uber.org/zap"
"google.golang.org/grpc"
Expand Down Expand Up @@ -227,6 +226,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
V2Deprecation: cfg.V2DeprecationEffective(),
ExperimentalLocalAddress: cfg.InferLocalAddr(),
ServerFeatureGate: cfg.ServerFeatureGate,
Metrics: cfg.Metrics,
}

if srvcfg.ExperimentalEnableDistributedTracing {
Expand Down Expand Up @@ -844,18 +844,6 @@ func (e *Etcd) createMetricsListener(murl url.URL) (net.Listener, error) {
}

func (e *Etcd) serveMetrics() (err error) {
if e.cfg.Metrics == "extensive" {
var opts prometheus.HistogramOpts
serverHandledHistogram := prometheus.NewHistogramVec(
opts,
[]string{"grpc_type", "grpc_service", "grpc_method"},
)
err := prometheus.Register(serverHandledHistogram)
if err != nil {
e.GetLogger().Error("setting up prometheus metrics failed.", zap.Error(err))
}
}

if len(e.cfg.ListenMetricsUrls) > 0 {
metricsMux := http.NewServeMux()
etcdhttp.HandleMetrics(metricsMux)
Expand Down
7 changes: 7 additions & 0 deletions server/etcdserver/api/v3rpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,13 @@ func Server(s *etcdserver.EtcdServer, tls *tls.Config, interceptor grpc.UnarySer
serverMetrics.StreamServerInterceptor(),
}

// If extensive metrics are enabled, register a histogram to track the reponse latency of gRPC requests
if s.Cfg.Metrics == "extensive" {
unaryInterceptor, streamInterceptor := constructExtensiveMetricsInterceptors()
chainUnaryInterceptors = append(chainUnaryInterceptors, unaryInterceptor)
chainStreamInterceptors = append(chainStreamInterceptors, streamInterceptor)

Check warning on line 65 in server/etcdserver/api/v3rpc/grpc.go

View check run for this annotation

Codecov / codecov/patch

server/etcdserver/api/v3rpc/grpc.go#L63-L65

Added lines #L63 - L65 were not covered by tests
}

if s.Cfg.ExperimentalEnableDistributedTracing {
chainUnaryInterceptors = append(chainUnaryInterceptors, otelgrpc.UnaryServerInterceptor(s.Cfg.ExperimentalTracerOptions...))
chainStreamInterceptors = append(chainStreamInterceptors, otelgrpc.StreamServerInterceptor(s.Cfg.ExperimentalTracerOptions...))
Expand Down
52 changes: 51 additions & 1 deletion server/etcdserver/api/v3rpc/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,14 @@

package v3rpc

import "github.com/prometheus/client_golang/prometheus"
import (
"context"
"strings"
"time"

"github.com/prometheus/client_golang/prometheus"
"google.golang.org/grpc"
)

var (
sentBytes = prometheus.NewCounter(prometheus.CounterOpts{
Expand Down Expand Up @@ -56,3 +63,46 @@ func init() {
prometheus.MustRegister(streamFailures)
prometheus.MustRegister(clientRequests)
}

func splitMethodName(fullMethodName string) (string, string) {
fullMethodName = strings.TrimPrefix(fullMethodName, "/") // remove leading slash
if i := strings.Index(fullMethodName, "/"); i >= 0 {
return fullMethodName[:i], fullMethodName[i+1:]

Check warning on line 70 in server/etcdserver/api/v3rpc/metrics.go

View check run for this annotation

Codecov / codecov/patch

server/etcdserver/api/v3rpc/metrics.go#L67-L70

Added lines #L67 - L70 were not covered by tests
}
return "unknown", "unknown"

Check warning on line 72 in server/etcdserver/api/v3rpc/metrics.go

View check run for this annotation

Codecov / codecov/patch

server/etcdserver/api/v3rpc/metrics.go#L72

Added line #L72 was not covered by tests
}

// constructExtensiveMetricsInterceptors constructs unary and stream interceptors to record histogram metrics for gRPC requests
func constructExtensiveMetricsInterceptors() (grpc.UnaryServerInterceptor, grpc.StreamServerInterceptor) {

Check warning on line 76 in server/etcdserver/api/v3rpc/metrics.go

View check run for this annotation

Codecov / codecov/patch

server/etcdserver/api/v3rpc/metrics.go#L76

Added line #L76 was not covered by tests
// Define a new histogram metric using default buckets
serverHandledHistogram := prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: "grpc_server_handling_seconds",
Help: "Histogram of response latency (seconds) of gRPC that had been application-level handled by the server.",
Buckets: prometheus.DefBuckets,
},
[]string{"grpc_type", "grpc_service", "grpc_method"},
)
prometheus.Register(serverHandledHistogram)

Check warning on line 86 in server/etcdserver/api/v3rpc/metrics.go

View check run for this annotation

Codecov / codecov/patch

server/etcdserver/api/v3rpc/metrics.go#L78-L86

Added lines #L78 - L86 were not covered by tests

// method to record histogram metrics for both unary and stream requests
recordHistogramMetrics := func(serverHandledHistogram *prometheus.HistogramVec, grpcType, fullMethodName string, startTime time.Time) {
grpcService, grpcMethod := splitMethodName(fullMethodName)
serverHandledHistogram.WithLabelValues(grpcType, grpcService, grpcMethod).Observe(time.Since(startTime).Seconds())

Check warning on line 91 in server/etcdserver/api/v3rpc/metrics.go

View check run for this annotation

Codecov / codecov/patch

server/etcdserver/api/v3rpc/metrics.go#L89-L91

Added lines #L89 - L91 were not covered by tests
}

// Add a new interceptor to spit out histogram metrics for unary requests
unaryInterceptor := func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp any, err error) {
startTime := time.Now()
resp, err = handler(ctx, req)
recordHistogramMetrics(serverHandledHistogram, "unary", info.FullMethod, startTime)
return resp, err

Check warning on line 99 in server/etcdserver/api/v3rpc/metrics.go

View check run for this annotation

Codecov / codecov/patch

server/etcdserver/api/v3rpc/metrics.go#L95-L99

Added lines #L95 - L99 were not covered by tests
}
streamInterceptor := func(srv any, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
startTime := time.Now()
err := handler(srv, ss)
recordHistogramMetrics(serverHandledHistogram, "stream", info.FullMethod, startTime)
return err

Check warning on line 105 in server/etcdserver/api/v3rpc/metrics.go

View check run for this annotation

Codecov / codecov/patch

server/etcdserver/api/v3rpc/metrics.go#L101-L105

Added lines #L101 - L105 were not covered by tests
}
return unaryInterceptor, streamInterceptor

Check warning on line 107 in server/etcdserver/api/v3rpc/metrics.go

View check run for this annotation

Codecov / codecov/patch

server/etcdserver/api/v3rpc/metrics.go#L107

Added line #L107 was not covered by tests
}
4 changes: 4 additions & 0 deletions tests/framework/integration/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ type ClusterConfig struct {
ExperimentalMaxLearners int
DisableStrictReconfigCheck bool
CorruptCheckTime time.Duration
Metrics string
}

type Cluster struct {
Expand Down Expand Up @@ -292,6 +293,7 @@ func (c *Cluster) MustNewMember(t testutil.TB) *Member {
ExperimentalMaxLearners: c.Cfg.ExperimentalMaxLearners,
DisableStrictReconfigCheck: c.Cfg.DisableStrictReconfigCheck,
CorruptCheckTime: c.Cfg.CorruptCheckTime,
Metrics: c.Cfg.Metrics,
})
m.DiscoveryURL = c.Cfg.DiscoveryURL
return m
Expand Down Expand Up @@ -617,6 +619,7 @@ type MemberConfig struct {
ExperimentalMaxLearners int
DisableStrictReconfigCheck bool
CorruptCheckTime time.Duration
Metrics string
}

// MustNewMember return an inited member with the given name. If peerTLS is
Expand Down Expand Up @@ -731,6 +734,7 @@ func MustNewMember(t testutil.TB, mcfg MemberConfig) *Member {
if mcfg.ExperimentalMaxLearners != 0 {
m.ExperimentalMaxLearners = mcfg.ExperimentalMaxLearners
}
m.Metrics = mcfg.Metrics
m.V2Deprecation = config.V2_DEPR_DEFAULT
m.GRPCServerRecorder = &grpctesting.GRPCRecorder{}

Expand Down

0 comments on commit da7dd38

Please sign in to comment.