diff --git a/ChangeLog b/ChangeLog index fa7bec9a..d7f7b3ff 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,7 @@ +Version 11.9.8 +-------------- + * Allow specifying redis timeout through flags/environment. + Version 11.9.7 -------------- * Do not use redis clients when no URL is provided. diff --git a/VERSION b/VERSION index 2f9c5843..76d3a6cf 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -11.9.7 +11.9.8 diff --git a/mettle/main.go b/mettle/main.go index 5cf7d06d..41b89ca4 100644 --- a/mettle/main.go +++ b/mettle/main.go @@ -31,14 +31,16 @@ type StorageOpts struct { } type RedisOpts struct { - URL string `long:"url" env:"REDIS_URL" description:"host:port of Redis server"` - ReadURL string `long:"read_url" env:"REDIS_READ_URL" description:"host:port of a Redis read replica, if set any read operation will be routed to it"` - Password string `long:"password" description:"AUTH password"` - PasswordFile string `long:"password_file" env:"REDIS_PASSWORD_FILE" description:"File containing AUTH password"` - PoolSize int `long:"pool_size" env:"REDIS_POOL_SIZE" default:"10" description:"Size of connection pool on primary redis client"` - ReadPoolSize int `long:"read_pool_size" env:"REDIS_READ_POOL_SIZE" default:"10" description:"Size of connection pool on reading redis client"` - CAFile string `long:"ca_file" env:"REDIS_CA_FILE" description:"File containing the Redis instance CA cert"` - TLS bool `long:"tls" description:"Use TLS for connecting to Redis"` + URL string `long:"url" env:"REDIS_URL" description:"host:port of Redis server"` + ReadURL string `long:"read_url" env:"REDIS_READ_URL" description:"host:port of a Redis read replica, if set any read operation will be routed to it"` + Password string `long:"password" description:"AUTH password"` + PasswordFile string `long:"password_file" env:"REDIS_PASSWORD_FILE" description:"File containing AUTH password"` + PoolSize int `long:"pool_size" env:"REDIS_POOL_SIZE" default:"10" description:"Size of connection pool on primary redis client"` + ReadPoolSize int `long:"read_pool_size" env:"REDIS_READ_POOL_SIZE" default:"10" description:"Size of connection pool on reading redis client"` + ReadTimeout flags.Duration `long:"read_timeout" env:"REDIS_READ_TIMEOUT" default:"1s" description:"Timeout on network read (not read commands)"` + WriteTimeout flags.Duration `long:"write_timeout" env:"REDIS_WRITE_TIMEOUT" default:"1m" description:"Timeout on network write (not write commands)"` + CAFile string `long:"ca_file" env:"REDIS_CA_FILE" description:"File containing the Redis instance CA cert"` + TLS bool `long:"tls" description:"Use TLS for connecting to Redis"` } type CacheOpts struct { @@ -236,17 +238,21 @@ func (r RedisOpts) Clients() (primary, read *redis.Client) { tlsConfig := r.ReadTLSConfig() primary = redis.NewClient(&redis.Options{ - Addr: r.URL, - Password: password, - TLSConfig: tlsConfig, - PoolSize: r.PoolSize, + Addr: r.URL, + Password: password, + TLSConfig: tlsConfig, + PoolSize: r.PoolSize, + ReadTimeout: time.Duration(r.ReadTimeout), + WriteTimeout: time.Duration(r.WriteTimeout), }) if r.ReadURL != "" { read = redis.NewClient(&redis.Options{ - Addr: r.ReadURL, - Password: password, - TLSConfig: tlsConfig, - PoolSize: r.ReadPoolSize, + Addr: r.ReadURL, + Password: password, + TLSConfig: tlsConfig, + PoolSize: r.ReadPoolSize, + ReadTimeout: time.Duration(r.ReadTimeout), + WriteTimeout: time.Duration(r.WriteTimeout), }) } else { read = primary diff --git a/mettle/worker/redis.go b/mettle/worker/redis.go index adcd1209..7c2eab33 100644 --- a/mettle/worker/redis.go +++ b/mettle/worker/redis.go @@ -47,7 +47,6 @@ func newRedisClient(client elan.Client, primaryRedis, readRedis *redis.Client) e elan: client, redis: &monitoredRedisClient{primaryRedis}, readRedis: &monitoredRedisClient{readRedis}, - timeout: 1 * time.Second, maxSize: 200 * 1012, // 200 Kelly-Bootle standard units limiter: rate.NewLimiter(rate.Every(time.Second*10), 10), } @@ -57,7 +56,6 @@ type elanRedisWrapper struct { elan elan.Client redis redisClient readRedis redisClient - timeout time.Duration maxSize int64 limiter *rate.Limiter } @@ -67,11 +65,8 @@ func (r *elanRedisWrapper) Healthcheck() error { } func (r *elanRedisWrapper) ReadBlob(dg *pb.Digest) ([]byte, error) { - ctx, cancel := context.WithTimeout(context.Background(), r.timeout) - defer cancel() - if r.limiter.Tokens() >= 1.0 { - cmd := r.readRedis.Get(ctx, dg.Hash) + cmd := r.readRedis.Get(context.Background(), dg.Hash) if err := cmd.Err(); err == nil { blob, _ := cmd.Bytes() return blob, nil @@ -89,9 +84,7 @@ func (r *elanRedisWrapper) ReadBlob(dg *pb.Digest) ([]byte, error) { } if dg.SizeBytes < r.maxSize { go func(hash string) { - ctx, cancel = context.WithTimeout(context.Background(), r.timeout) - defer cancel() - if cmd := r.redis.Set(ctx, hash, blob, 0); cmd.Val() != "OK" { + if cmd := r.redis.Set(context.Background(), hash, blob, 0); cmd.Val() != "OK" { log.Warning("Failed to set blob in Redis: %s", cmd.Err()) } }(dg.Hash) @@ -205,9 +198,7 @@ func (r *elanRedisWrapper) readBlobs(keys []string, metrics bool) [][]byte { // Bail out immediately if Redis has exceeded error limit return nil } - ctx, cancel := context.WithTimeout(context.Background(), r.timeout) - defer cancel() - resp, err := r.readRedis.MGet(ctx, keys...).Result() + resp, err := r.readRedis.MGet(context.Background(), keys...).Result() if err != nil { log.Warning("Failed to check blobs in Redis: %s", err) r.limiter.Reserve() @@ -247,21 +238,14 @@ func (r *elanRedisWrapper) writeBlobs(uploads []interface{}) { return } log.Debug("Writing %d blobs to Redis...", len(uploads)/2) - // we are not using the client timeout, as this will most likely take more than the default 10s - // plus this is not in the critical path, so it's not a big issue - ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute) - defer cancel() - - if cmd := r.redis.MSet(ctx, uploads...); cmd.Val() != "OK" { + if cmd := r.redis.MSet(context.Background(), uploads...); cmd.Val() != "OK" { log.Warning("Failed to upload %d blobs to Redis: %s", len(uploads), cmd.Err()) } log.Debug("Wrote %d blobs to Redis...", len(uploads)/2) } func (r *elanRedisWrapper) ReadToFile(dg digest.Digest, filename string, compressed bool) error { - ctx, cancel := context.WithTimeout(context.Background(), r.timeout) - defer cancel() - blob, err := r.readRedis.Get(ctx, dg.Hash).Bytes() + blob, err := r.readRedis.Get(context.Background(), dg.Hash).Bytes() if err != nil { if err != redis.Nil { // Not found. log.Warning("Error reading blob from Redis: %s", err)