From cb710490ae2d17028a9bf79614006d5dbc6a4df9 Mon Sep 17 00:00:00 2001 From: Maxime Fischer Date: Mon, 15 Apr 2024 15:28:09 +0100 Subject: [PATCH] Add metrics to measure latency with redis in mettle worker. (#296) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit We are planning on making elan read from redis as at the moment it is costing us >100£ on normal days to read from the bucket (can reach 300£/day on toolchain upgrades). However we're seeing a lot of context deadline exceeded in the logs regarding redis, so we need to fix those before doing the change to elan. This PR is for us to start monitoring the latency of redis commands to help us understand the impact of the fixes we'll add. --- ChangeLog | 4 ++ VERSION | 2 +- mettle/worker/redis.go | 90 ++++++++++++++++++++++++++++++++++-------- 3 files changed, 78 insertions(+), 18 deletions(-) diff --git a/ChangeLog b/ChangeLog index e5c3541f..14fcc780 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,7 @@ +Version 11.9.4 +-------------- + * Add metrics to measure latency with redis in mettle workers. + Version 11.9.3 -------------- * Avoid some cases of potential reordering of streamed events diff --git a/VERSION b/VERSION index 31ae9dea..ead8ba09 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -11.9.3 +11.9.4 diff --git a/mettle/worker/redis.go b/mettle/worker/redis.go index c4af1e9d..71af828d 100644 --- a/mettle/worker/redis.go +++ b/mettle/worker/redis.go @@ -4,10 +4,11 @@ import ( "context" "crypto/tls" "crypto/x509" - "golang.org/x/time/rate" "os" "time" + "golang.org/x/time/rate" + "github.com/bazelbuild/remote-apis-sdks/go/pkg/digest" "github.com/bazelbuild/remote-apis-sdks/go/pkg/uploadinfo" pb "github.com/bazelbuild/remote-apis/build/bazel/remote/execution/v2" @@ -29,6 +30,11 @@ var redisBytesRead = prometheus.NewCounter(prometheus.CounterOpts{ Namespace: "mettle", Name: "redis_bytes_read_total", }) +var redisLatency = prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Namespace: "mettle", + Name: "redis_latency_ms", + Buckets: []float64{20, 50, 100, 200, 500, 1000, 2000, 5000, 10000}, +}, []string{"command"}) func getTLSConfig(caFile string) (*tls.Config, error) { caCert, err := os.ReadFile(caFile) @@ -68,30 +74,30 @@ func newRedisClient(client elan.Client, url, readURL, password, caFile string, u if readURL != "" { readClient = redis.NewClient(readOpts) } - return &redisClient{ + return &elanRedisWrapper{ elan: client, - redis: primaryClient, - readRedis: readClient, + redis: &monitoredRedisClient{primaryClient}, + readRedis: &monitoredRedisClient{readClient}, timeout: 1 * time.Second, maxSize: 200 * 1012, // 200 Kelly-Bootle standard units limiter: rate.NewLimiter(rate.Every(time.Second*10), 10), } } -type redisClient struct { +type elanRedisWrapper struct { elan elan.Client - redis *redis.Client - readRedis *redis.Client + redis redisClient + readRedis redisClient timeout time.Duration maxSize int64 limiter *rate.Limiter } -func (r *redisClient) Healthcheck() error { +func (r *elanRedisWrapper) Healthcheck() error { return r.elan.Healthcheck() } -func (r *redisClient) ReadBlob(dg *pb.Digest) ([]byte, error) { +func (r *elanRedisWrapper) ReadBlob(dg *pb.Digest) ([]byte, error) { ctx, cancel := context.WithTimeout(context.Background(), r.timeout) defer cancel() @@ -124,17 +130,17 @@ func (r *redisClient) ReadBlob(dg *pb.Digest) ([]byte, error) { return blob, nil } -func (r *redisClient) WriteBlob(blob []byte) (*pb.Digest, error) { +func (r *elanRedisWrapper) WriteBlob(blob []byte) (*pb.Digest, error) { return r.elan.WriteBlob(blob) } -func (r *redisClient) UpdateActionResult(ar *pb.UpdateActionResultRequest) (*pb.ActionResult, error) { +func (r *elanRedisWrapper) UpdateActionResult(ar *pb.UpdateActionResultRequest) (*pb.ActionResult, error) { // We don't currently do anything with this, because we never try to read action results from // Redis. return r.elan.UpdateActionResult(ar) } -func (r *redisClient) UploadIfMissing(entries []*uploadinfo.Entry, compressors []pb.Compressor_Value) error { +func (r *elanRedisWrapper) UploadIfMissing(entries []*uploadinfo.Entry, compressors []pb.Compressor_Value) error { // All we use Redis for is to filter down the missing set. // This is approximate only and assumes that Redis always has a strict subset of the total keys. missing := make([]*uploadinfo.Entry, 0, len(entries)) @@ -184,7 +190,7 @@ func (r *redisClient) UploadIfMissing(entries []*uploadinfo.Entry, compressors [ return nil // We never propagate Redis errors regardless. } -func (r *redisClient) BatchDownload(dgs []digest.Digest) (map[digest.Digest][]byte, error) { +func (r *elanRedisWrapper) BatchDownload(dgs []digest.Digest) (map[digest.Digest][]byte, error) { log.Debug("Checking Redis for batch of %d files...", len(dgs)) keys := make([]string, len(dgs)) for i, dg := range dgs { @@ -222,7 +228,7 @@ func (r *redisClient) BatchDownload(dgs []digest.Digest) (map[digest.Digest][]by // readBlobs reads a set of blobs from Redis. It returns nil on any failure. // On success some blobs may not have been available in Redis, in which case they'll be nil. -func (r *redisClient) readBlobs(keys []string, metrics bool) [][]byte { +func (r *elanRedisWrapper) readBlobs(keys []string, metrics bool) [][]byte { if len(keys) == 0 { return nil } @@ -267,7 +273,7 @@ func (r *redisClient) readBlobs(keys []string, metrics bool) [][]byte { } // writeBlobs writes a series of blobs to Redis. They should be passed in interleaved key/value order. -func (r *redisClient) writeBlobs(uploads []interface{}) { +func (r *elanRedisWrapper) writeBlobs(uploads []interface{}) { if len(uploads) == 0 { return } @@ -276,13 +282,14 @@ func (r *redisClient) writeBlobs(uploads []interface{}) { // plus this is not in the critical path, so it's not a big issue ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute) defer cancel() + if cmd := r.redis.MSet(ctx, uploads...); cmd.Val() != "OK" { log.Warning("Failed to upload %d blobs to Redis: %s", len(uploads), cmd.Err()) } log.Debug("Wrote %d blobs to Redis...", len(uploads)/2) } -func (r *redisClient) ReadToFile(dg digest.Digest, filename string, compressed bool) error { +func (r *elanRedisWrapper) ReadToFile(dg digest.Digest, filename string, compressed bool) error { ctx, cancel := context.WithTimeout(context.Background(), r.timeout) defer cancel() blob, err := r.readRedis.Get(ctx, dg.Hash).Bytes() @@ -297,7 +304,56 @@ func (r *redisClient) ReadToFile(dg digest.Digest, filename string, compressed b return os.WriteFile(filename, blob, 0644) } -func (r *redisClient) GetDirectoryTree(dg *pb.Digest, usePacks bool) ([]*pb.Directory, error) { +func (r *elanRedisWrapper) GetDirectoryTree(dg *pb.Digest, usePacks bool) ([]*pb.Directory, error) { // TODO(peterebden): Figure out how we are going to stitch Redis into this. There are a lot of // little internal requests that we ideally want to be able to cache into Redis. return r.elan.GetDirectoryTree(dg, usePacks) } + +type redisClient interface { + MGet(ctx context.Context, keys ...string) *redis.SliceCmd + Get(ctx context.Context, key string) *redis.StringCmd + MSet(ctx context.Context, pairs ...interface{}) *redis.StatusCmd + Set(ctx context.Context, key string, value interface{}, expiration time.Duration) *redis.StatusCmd +} + +// monitoredRedisClient wraps redis client with an histogram to measure +// latency on different commands. +type monitoredRedisClient struct { + redisClient +} + +func (c *monitoredRedisClient) MGet(ctx context.Context, keys ...string) *redis.SliceCmd { + start := time.Now() + defer func() { + redisLatency.WithLabelValues("MGET"). + Observe(float64(time.Since(start).Milliseconds())) + }() + return c.redisClient.MGet(ctx, keys...) +} + +func (c *monitoredRedisClient) Get(ctx context.Context, key string) *redis.StringCmd { + start := time.Now() + defer func() { + redisLatency.WithLabelValues("GET"). + Observe(float64(time.Since(start).Milliseconds())) + }() + return c.redisClient.Get(ctx, key) +} + +func (c *monitoredRedisClient) MSet(ctx context.Context, pairs ...interface{}) *redis.StatusCmd { + start := time.Now() + defer func() { + redisLatency.WithLabelValues("MSET"). + Observe(float64(time.Since(start).Milliseconds())) + }() + return c.redisClient.MSet(ctx, pairs...) +} + +func (c *monitoredRedisClient) Set(ctx context.Context, key string, value interface{}, expiration time.Duration) *redis.StatusCmd { + start := time.Now() + defer func() { + redisLatency.WithLabelValues("SET"). + Observe(float64(time.Since(start).Milliseconds())) + }() + return c.redisClient.Set(ctx, key, value, expiration) +}