Skip to content

Commit

Permalink
Allow setting redis timeout via flags/environment. (#301)
Browse files Browse the repository at this point in the history
  • Loading branch information
fische authored Apr 22, 2024
1 parent 7429850 commit 339b2d5
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 38 deletions.
4 changes: 4 additions & 0 deletions ChangeLog
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
Version 11.9.8
--------------
* Allow specifying redis timeout through flags/environment.

Version 11.9.7
--------------
* Do not use redis clients when no URL is provided.
Expand Down
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
11.9.7
11.9.8
38 changes: 22 additions & 16 deletions mettle/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,16 @@ type StorageOpts struct {
}

type RedisOpts struct {
URL string `long:"url" env:"REDIS_URL" description:"host:port of Redis server"`
ReadURL string `long:"read_url" env:"REDIS_READ_URL" description:"host:port of a Redis read replica, if set any read operation will be routed to it"`
Password string `long:"password" description:"AUTH password"`
PasswordFile string `long:"password_file" env:"REDIS_PASSWORD_FILE" description:"File containing AUTH password"`
PoolSize int `long:"pool_size" env:"REDIS_POOL_SIZE" default:"10" description:"Size of connection pool on primary redis client"`
ReadPoolSize int `long:"read_pool_size" env:"REDIS_READ_POOL_SIZE" default:"10" description:"Size of connection pool on reading redis client"`
CAFile string `long:"ca_file" env:"REDIS_CA_FILE" description:"File containing the Redis instance CA cert"`
TLS bool `long:"tls" description:"Use TLS for connecting to Redis"`
URL string `long:"url" env:"REDIS_URL" description:"host:port of Redis server"`
ReadURL string `long:"read_url" env:"REDIS_READ_URL" description:"host:port of a Redis read replica, if set any read operation will be routed to it"`
Password string `long:"password" description:"AUTH password"`
PasswordFile string `long:"password_file" env:"REDIS_PASSWORD_FILE" description:"File containing AUTH password"`
PoolSize int `long:"pool_size" env:"REDIS_POOL_SIZE" default:"10" description:"Size of connection pool on primary redis client"`
ReadPoolSize int `long:"read_pool_size" env:"REDIS_READ_POOL_SIZE" default:"10" description:"Size of connection pool on reading redis client"`
ReadTimeout flags.Duration `long:"read_timeout" env:"REDIS_READ_TIMEOUT" default:"1s" description:"Timeout on network read (not read commands)"`
WriteTimeout flags.Duration `long:"write_timeout" env:"REDIS_WRITE_TIMEOUT" default:"1m" description:"Timeout on network write (not write commands)"`
CAFile string `long:"ca_file" env:"REDIS_CA_FILE" description:"File containing the Redis instance CA cert"`
TLS bool `long:"tls" description:"Use TLS for connecting to Redis"`
}

type CacheOpts struct {
Expand Down Expand Up @@ -236,17 +238,21 @@ func (r RedisOpts) Clients() (primary, read *redis.Client) {
tlsConfig := r.ReadTLSConfig()

primary = redis.NewClient(&redis.Options{
Addr: r.URL,
Password: password,
TLSConfig: tlsConfig,
PoolSize: r.PoolSize,
Addr: r.URL,
Password: password,
TLSConfig: tlsConfig,
PoolSize: r.PoolSize,
ReadTimeout: time.Duration(r.ReadTimeout),
WriteTimeout: time.Duration(r.WriteTimeout),
})
if r.ReadURL != "" {
read = redis.NewClient(&redis.Options{
Addr: r.ReadURL,
Password: password,
TLSConfig: tlsConfig,
PoolSize: r.ReadPoolSize,
Addr: r.ReadURL,
Password: password,
TLSConfig: tlsConfig,
PoolSize: r.ReadPoolSize,
ReadTimeout: time.Duration(r.ReadTimeout),
WriteTimeout: time.Duration(r.WriteTimeout),
})
} else {
read = primary
Expand Down
26 changes: 5 additions & 21 deletions mettle/worker/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ func newRedisClient(client elan.Client, primaryRedis, readRedis *redis.Client) e
elan: client,
redis: &monitoredRedisClient{primaryRedis},
readRedis: &monitoredRedisClient{readRedis},
timeout: 1 * time.Second,
maxSize: 200 * 1012, // 200 Kelly-Bootle standard units
limiter: rate.NewLimiter(rate.Every(time.Second*10), 10),
}
Expand All @@ -57,7 +56,6 @@ type elanRedisWrapper struct {
elan elan.Client
redis redisClient
readRedis redisClient
timeout time.Duration
maxSize int64
limiter *rate.Limiter
}
Expand All @@ -67,11 +65,8 @@ func (r *elanRedisWrapper) Healthcheck() error {
}

func (r *elanRedisWrapper) ReadBlob(dg *pb.Digest) ([]byte, error) {
ctx, cancel := context.WithTimeout(context.Background(), r.timeout)
defer cancel()

if r.limiter.Tokens() >= 1.0 {
cmd := r.readRedis.Get(ctx, dg.Hash)
cmd := r.readRedis.Get(context.Background(), dg.Hash)
if err := cmd.Err(); err == nil {
blob, _ := cmd.Bytes()
return blob, nil
Expand All @@ -89,9 +84,7 @@ func (r *elanRedisWrapper) ReadBlob(dg *pb.Digest) ([]byte, error) {
}
if dg.SizeBytes < r.maxSize {
go func(hash string) {
ctx, cancel = context.WithTimeout(context.Background(), r.timeout)
defer cancel()
if cmd := r.redis.Set(ctx, hash, blob, 0); cmd.Val() != "OK" {
if cmd := r.redis.Set(context.Background(), hash, blob, 0); cmd.Val() != "OK" {
log.Warning("Failed to set blob in Redis: %s", cmd.Err())
}
}(dg.Hash)
Expand Down Expand Up @@ -205,9 +198,7 @@ func (r *elanRedisWrapper) readBlobs(keys []string, metrics bool) [][]byte {
// Bail out immediately if Redis has exceeded error limit
return nil
}
ctx, cancel := context.WithTimeout(context.Background(), r.timeout)
defer cancel()
resp, err := r.readRedis.MGet(ctx, keys...).Result()
resp, err := r.readRedis.MGet(context.Background(), keys...).Result()
if err != nil {
log.Warning("Failed to check blobs in Redis: %s", err)
r.limiter.Reserve()
Expand Down Expand Up @@ -247,21 +238,14 @@ func (r *elanRedisWrapper) writeBlobs(uploads []interface{}) {
return
}
log.Debug("Writing %d blobs to Redis...", len(uploads)/2)
// we are not using the client timeout, as this will most likely take more than the default 10s
// plus this is not in the critical path, so it's not a big issue
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
defer cancel()

if cmd := r.redis.MSet(ctx, uploads...); cmd.Val() != "OK" {
if cmd := r.redis.MSet(context.Background(), uploads...); cmd.Val() != "OK" {
log.Warning("Failed to upload %d blobs to Redis: %s", len(uploads), cmd.Err())
}
log.Debug("Wrote %d blobs to Redis...", len(uploads)/2)
}

func (r *elanRedisWrapper) ReadToFile(dg digest.Digest, filename string, compressed bool) error {
ctx, cancel := context.WithTimeout(context.Background(), r.timeout)
defer cancel()
blob, err := r.readRedis.Get(ctx, dg.Hash).Bytes()
blob, err := r.readRedis.Get(context.Background(), dg.Hash).Bytes()
if err != nil {
if err != redis.Nil { // Not found.
log.Warning("Error reading blob from Redis: %s", err)
Expand Down

0 comments on commit 339b2d5

Please sign in to comment.