Skip to content

Commit

Permalink
Add metrics to measure latency with redis in mettle worker. (#296)
Browse files Browse the repository at this point in the history
We are planning on making elan read from redis as at the moment it is costing us >100£ on normal days to read from the bucket (can reach 300£/day on toolchain upgrades). However we're seeing a lot of context deadline exceeded in the logs regarding redis, so we need to fix those before doing the change to elan. This PR is for us to start monitoring the latency of redis commands to help us understand the impact of the fixes we'll add.
  • Loading branch information
fische authored Apr 15, 2024
1 parent 6c38863 commit cb71049
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 18 deletions.
4 changes: 4 additions & 0 deletions ChangeLog
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
Version 11.9.4
--------------
* Add metrics to measure latency with redis in mettle workers.

Version 11.9.3
--------------
* Avoid some cases of potential reordering of streamed events
Expand Down
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
11.9.3
11.9.4
90 changes: 73 additions & 17 deletions mettle/worker/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@ import (
"context"
"crypto/tls"
"crypto/x509"
"golang.org/x/time/rate"
"os"
"time"

"golang.org/x/time/rate"

"github.com/bazelbuild/remote-apis-sdks/go/pkg/digest"
"github.com/bazelbuild/remote-apis-sdks/go/pkg/uploadinfo"
pb "github.com/bazelbuild/remote-apis/build/bazel/remote/execution/v2"
Expand All @@ -29,6 +30,11 @@ var redisBytesRead = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: "mettle",
Name: "redis_bytes_read_total",
})
var redisLatency = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "mettle",
Name: "redis_latency_ms",
Buckets: []float64{20, 50, 100, 200, 500, 1000, 2000, 5000, 10000},
}, []string{"command"})

func getTLSConfig(caFile string) (*tls.Config, error) {
caCert, err := os.ReadFile(caFile)
Expand Down Expand Up @@ -68,30 +74,30 @@ func newRedisClient(client elan.Client, url, readURL, password, caFile string, u
if readURL != "" {
readClient = redis.NewClient(readOpts)
}
return &redisClient{
return &elanRedisWrapper{
elan: client,
redis: primaryClient,
readRedis: readClient,
redis: &monitoredRedisClient{primaryClient},
readRedis: &monitoredRedisClient{readClient},
timeout: 1 * time.Second,
maxSize: 200 * 1012, // 200 Kelly-Bootle standard units
limiter: rate.NewLimiter(rate.Every(time.Second*10), 10),
}
}

type redisClient struct {
type elanRedisWrapper struct {
elan elan.Client
redis *redis.Client
readRedis *redis.Client
redis redisClient
readRedis redisClient
timeout time.Duration
maxSize int64
limiter *rate.Limiter
}

func (r *redisClient) Healthcheck() error {
func (r *elanRedisWrapper) Healthcheck() error {
return r.elan.Healthcheck()
}

func (r *redisClient) ReadBlob(dg *pb.Digest) ([]byte, error) {
func (r *elanRedisWrapper) ReadBlob(dg *pb.Digest) ([]byte, error) {
ctx, cancel := context.WithTimeout(context.Background(), r.timeout)
defer cancel()

Expand Down Expand Up @@ -124,17 +130,17 @@ func (r *redisClient) ReadBlob(dg *pb.Digest) ([]byte, error) {
return blob, nil
}

func (r *redisClient) WriteBlob(blob []byte) (*pb.Digest, error) {
func (r *elanRedisWrapper) WriteBlob(blob []byte) (*pb.Digest, error) {
return r.elan.WriteBlob(blob)
}

func (r *redisClient) UpdateActionResult(ar *pb.UpdateActionResultRequest) (*pb.ActionResult, error) {
func (r *elanRedisWrapper) UpdateActionResult(ar *pb.UpdateActionResultRequest) (*pb.ActionResult, error) {
// We don't currently do anything with this, because we never try to read action results from
// Redis.
return r.elan.UpdateActionResult(ar)
}

func (r *redisClient) UploadIfMissing(entries []*uploadinfo.Entry, compressors []pb.Compressor_Value) error {
func (r *elanRedisWrapper) UploadIfMissing(entries []*uploadinfo.Entry, compressors []pb.Compressor_Value) error {
// All we use Redis for is to filter down the missing set.
// This is approximate only and assumes that Redis always has a strict subset of the total keys.
missing := make([]*uploadinfo.Entry, 0, len(entries))
Expand Down Expand Up @@ -184,7 +190,7 @@ func (r *redisClient) UploadIfMissing(entries []*uploadinfo.Entry, compressors [
return nil // We never propagate Redis errors regardless.
}

func (r *redisClient) BatchDownload(dgs []digest.Digest) (map[digest.Digest][]byte, error) {
func (r *elanRedisWrapper) BatchDownload(dgs []digest.Digest) (map[digest.Digest][]byte, error) {
log.Debug("Checking Redis for batch of %d files...", len(dgs))
keys := make([]string, len(dgs))
for i, dg := range dgs {
Expand Down Expand Up @@ -222,7 +228,7 @@ func (r *redisClient) BatchDownload(dgs []digest.Digest) (map[digest.Digest][]by

// readBlobs reads a set of blobs from Redis. It returns nil on any failure.
// On success some blobs may not have been available in Redis, in which case they'll be nil.
func (r *redisClient) readBlobs(keys []string, metrics bool) [][]byte {
func (r *elanRedisWrapper) readBlobs(keys []string, metrics bool) [][]byte {
if len(keys) == 0 {
return nil
}
Expand Down Expand Up @@ -267,7 +273,7 @@ func (r *redisClient) readBlobs(keys []string, metrics bool) [][]byte {
}

// writeBlobs writes a series of blobs to Redis. They should be passed in interleaved key/value order.
func (r *redisClient) writeBlobs(uploads []interface{}) {
func (r *elanRedisWrapper) writeBlobs(uploads []interface{}) {
if len(uploads) == 0 {
return
}
Expand All @@ -276,13 +282,14 @@ func (r *redisClient) writeBlobs(uploads []interface{}) {
// plus this is not in the critical path, so it's not a big issue
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
defer cancel()

if cmd := r.redis.MSet(ctx, uploads...); cmd.Val() != "OK" {
log.Warning("Failed to upload %d blobs to Redis: %s", len(uploads), cmd.Err())
}
log.Debug("Wrote %d blobs to Redis...", len(uploads)/2)
}

func (r *redisClient) ReadToFile(dg digest.Digest, filename string, compressed bool) error {
func (r *elanRedisWrapper) ReadToFile(dg digest.Digest, filename string, compressed bool) error {
ctx, cancel := context.WithTimeout(context.Background(), r.timeout)
defer cancel()
blob, err := r.readRedis.Get(ctx, dg.Hash).Bytes()
Expand All @@ -297,7 +304,56 @@ func (r *redisClient) ReadToFile(dg digest.Digest, filename string, compressed b
return os.WriteFile(filename, blob, 0644)
}

func (r *redisClient) GetDirectoryTree(dg *pb.Digest, usePacks bool) ([]*pb.Directory, error) {
func (r *elanRedisWrapper) GetDirectoryTree(dg *pb.Digest, usePacks bool) ([]*pb.Directory, error) {
// TODO(peterebden): Figure out how we are going to stitch Redis into this. There are a lot of // little internal requests that we ideally want to be able to cache into Redis.
return r.elan.GetDirectoryTree(dg, usePacks)
}

type redisClient interface {
MGet(ctx context.Context, keys ...string) *redis.SliceCmd
Get(ctx context.Context, key string) *redis.StringCmd
MSet(ctx context.Context, pairs ...interface{}) *redis.StatusCmd
Set(ctx context.Context, key string, value interface{}, expiration time.Duration) *redis.StatusCmd
}

// monitoredRedisClient wraps redis client with an histogram to measure
// latency on different commands.
type monitoredRedisClient struct {
redisClient
}

func (c *monitoredRedisClient) MGet(ctx context.Context, keys ...string) *redis.SliceCmd {
start := time.Now()
defer func() {
redisLatency.WithLabelValues("MGET").
Observe(float64(time.Since(start).Milliseconds()))
}()
return c.redisClient.MGet(ctx, keys...)
}

func (c *monitoredRedisClient) Get(ctx context.Context, key string) *redis.StringCmd {
start := time.Now()
defer func() {
redisLatency.WithLabelValues("GET").
Observe(float64(time.Since(start).Milliseconds()))
}()
return c.redisClient.Get(ctx, key)
}

func (c *monitoredRedisClient) MSet(ctx context.Context, pairs ...interface{}) *redis.StatusCmd {
start := time.Now()
defer func() {
redisLatency.WithLabelValues("MSET").
Observe(float64(time.Since(start).Milliseconds()))
}()
return c.redisClient.MSet(ctx, pairs...)
}

func (c *monitoredRedisClient) Set(ctx context.Context, key string, value interface{}, expiration time.Duration) *redis.StatusCmd {
start := time.Now()
defer func() {
redisLatency.WithLabelValues("SET").
Observe(float64(time.Since(start).Milliseconds()))
}()
return c.redisClient.Set(ctx, key, value, expiration)
}

0 comments on commit cb71049

Please sign in to comment.