From 5632804c012bf6b2d9197988ea696a921504bad7 Mon Sep 17 00:00:00 2001 From: isobel Date: Thu, 10 Aug 2023 09:38:16 +0100 Subject: [PATCH 1/5] Pass CA file location as an arg to the redis instance --- ChangeLog | 4 ++++ VERSION | 2 +- mettle/main.go | 7 ++++--- mettle/worker/redis.go | 19 ++++++++++++++++--- mettle/worker/worker.go | 16 ++++++++-------- 5 files changed, 33 insertions(+), 15 deletions(-) diff --git a/ChangeLog b/ChangeLog index a6da13fa..a1783c93 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,7 @@ +Version 11.1.1 +-------------- + * Pass CA cert file location to Redis instance + Version 11.1.0 -------------- * Add configurable number of pub/sub pollers, default to 10 diff --git a/VERSION b/VERSION index d8ffaa07..0763616c 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -11.1.0 \ No newline at end of file +11.1.1 \ No newline at end of file diff --git a/mettle/main.go b/mettle/main.go index 015ccbb4..855b99b4 100644 --- a/mettle/main.go +++ b/mettle/main.go @@ -32,6 +32,7 @@ type RedisOpts struct { ReadURL string `long:"read_url" env:"REDIS_READ_URL" description:"host:port of a Redis read replica, if set any read operation will be routed to it"` Password string `long:"password" description:"AUTH password"` PasswordFile string `long:"password_file" env:"REDIS_PASSWORD_FILE" description:"File containing AUTH password"` + CAFile string `long:"ca_file" env:"REDIS_CA_FILE" description:"File containing the Redis instance CA cert"` TLS bool `long:"tls" description:"Use TLS for connecting to Redis"` } @@ -176,11 +177,11 @@ func main() { } for i := 0; i < opts.Dual.NumWorkers; i++ { storage := opts.Dual.Storage.Storage[i%len(opts.Dual.Storage.Storage)] - go worker.RunForever(opts.InstanceName, requests+"?ackdeadline=10m", responses, fmt.Sprintf("%s-%d", opts.InstanceName, i), storage, opts.Dual.Dir, opts.Dual.Cache.Dir, opts.Dual.Browser, opts.Dual.Sandbox, opts.Dual.AltSandbox, opts.Dual.Lucidity, "", opts.Dual.GRPC.TokenFile, opts.Dual.Redis.URL, opts.Dual.Redis.ReadURL, opts.Dual.Redis.ReadPassword(), opts.Dual.Redis.TLS, opts.Dual.Cache.Prefix, opts.Dual.Cache.Part, !opts.Dual.NoClean, opts.Dual.Storage.TLS, int64(opts.Dual.Cache.MaxMem), int64(opts.Dual.MinDiskSpace), opts.Dual.MemoryThreshold, opts.Dual.VersionFile, opts.Dual.Costs, 0, opts.Worker.ImmediateShutdown) + go worker.RunForever(opts.InstanceName, requests+"?ackdeadline=10m", responses, fmt.Sprintf("%s-%d", opts.InstanceName, i), storage, opts.Dual.Dir, opts.Dual.Cache.Dir, opts.Dual.Browser, opts.Dual.Sandbox, opts.Dual.AltSandbox, opts.Dual.Lucidity, "", opts.Dual.GRPC.TokenFile, opts.Dual.Redis.URL, opts.Dual.Redis.ReadURL, opts.Dual.Redis.ReadPassword(), opts.Dual.Redis.CAFile, opts.Dual.Redis.TLS, opts.Dual.Cache.Prefix, opts.Dual.Cache.Part, !opts.Dual.NoClean, opts.Dual.Storage.TLS, int64(opts.Dual.Cache.MaxMem), int64(opts.Dual.MinDiskSpace), opts.Dual.MemoryThreshold, opts.Dual.VersionFile, opts.Dual.Costs, 0, opts.Worker.ImmediateShutdown) } api.ServeForever(opts.Dual.GRPC, "", requests, responses, responses, "", false, opts.Dual.AllowedPlatform, opts.Dual.Storage.Storage[0], opts.Dual.Storage.TLS, opts.API.Queues.NumPollers) } else if cmd == "worker" { - worker.RunForever(opts.InstanceName, opts.Worker.Queues.RequestQueue, opts.Worker.Queues.ResponseQueue, opts.Worker.Name, opts.Worker.Storage.Storage, opts.Worker.Dir, opts.Worker.Cache.Dir, opts.Worker.Browser, opts.Worker.Sandbox, opts.Worker.AltSandbox, opts.Worker.Lucidity, opts.Worker.PromGateway, opts.Worker.Storage.TokenFile, opts.Worker.Redis.URL, opts.Worker.Redis.ReadURL, opts.Worker.Redis.ReadPassword(), opts.Worker.Redis.TLS, opts.Worker.Cache.Prefix, opts.Worker.Cache.Part, !opts.Worker.NoClean, opts.Worker.Storage.TLS, int64(opts.Worker.Cache.MaxMem), int64(opts.Worker.MinDiskSpace), opts.Worker.MemoryThreshold, opts.Worker.VersionFile, opts.Worker.Costs, time.Duration(opts.Worker.Queues.AckExtension), opts.Worker.ImmediateShutdown) + worker.RunForever(opts.InstanceName, opts.Worker.Queues.RequestQueue, opts.Worker.Queues.ResponseQueue, opts.Worker.Name, opts.Worker.Storage.Storage, opts.Worker.Dir, opts.Worker.Cache.Dir, opts.Worker.Browser, opts.Worker.Sandbox, opts.Worker.AltSandbox, opts.Worker.Lucidity, opts.Worker.PromGateway, opts.Worker.Storage.TokenFile, opts.Worker.Redis.URL, opts.Worker.Redis.ReadURL, opts.Worker.Redis.ReadPassword(), opts.Worker.Redis.CAFile, opts.Worker.Redis.TLS, opts.Worker.Cache.Prefix, opts.Worker.Cache.Part, !opts.Worker.NoClean, opts.Worker.Storage.TLS, int64(opts.Worker.Cache.MaxMem), int64(opts.Worker.MinDiskSpace), opts.Worker.MemoryThreshold, opts.Worker.VersionFile, opts.Worker.Costs, time.Duration(opts.Worker.Queues.AckExtension), opts.Worker.ImmediateShutdown) } else if cmd == "api" { api.ServeForever(opts.API.GRPC, opts.API.Queues.ResponseQueueSuffix, opts.API.Queues.RequestQueue, opts.API.Queues.ResponseQueue+opts.API.Queues.ResponseQueueSuffix, opts.API.Queues.PreResponseQueue, opts.API.API.URL, opts.API.API.TLS, opts.API.AllowedPlatform, opts.API.Storage.Storage, opts.API.Storage.TLS, opts.API.Queues.NumPollers) } else if err := one(); err != nil { @@ -209,7 +210,7 @@ func one() error { defer pprof.WriteHeapProfile(f) } for _, action := range opts.One.Args.Actions { - if err := worker.RunOne(opts.InstanceName, "mettle-one", opts.One.Storage.Storage, opts.One.Dir, opts.One.Cache.Dir, opts.One.Sandbox, opts.One.AltSandbox, opts.One.Storage.TokenFile, opts.One.Redis.URL, opts.One.Redis.ReadURL, opts.One.Redis.ReadPassword(), opts.One.Redis.TLS, opts.One.Cache.Prefix, opts.One.Cache.Part, false, opts.One.Storage.TLS, action.ToProto()); err != nil { + if err := worker.RunOne(opts.InstanceName, "mettle-one", opts.One.Storage.Storage, opts.One.Dir, opts.One.Cache.Dir, opts.One.Sandbox, opts.One.AltSandbox, opts.One.Storage.TokenFile, opts.One.Redis.URL, opts.One.Redis.ReadURL, opts.One.Redis.ReadPassword(), opts.One.Redis.CAFile, opts.One.Redis.TLS, opts.One.Cache.Prefix, opts.One.Cache.Part, false, opts.One.Storage.TLS, action.ToProto()); err != nil { return err } } diff --git a/mettle/worker/redis.go b/mettle/worker/redis.go index f700f85c..9c22314b 100644 --- a/mettle/worker/redis.go +++ b/mettle/worker/redis.go @@ -3,6 +3,7 @@ package worker import ( "context" "crypto/tls" + "crypto/x509" "os" "time" @@ -28,11 +29,23 @@ var redisBytesRead = prometheus.NewCounter(prometheus.CounterOpts{ Name: "redis_bytes_read_total", }) +func getTLSConfig(caFile string) *tls.Config { + caCert, err := os.ReadFile(caFile) + if err != nil { + log.Warning("Failed to collect CA cert from file %s: '%s'. Redis connection will not work", caFile, err) + } + caCertPool := x509.NewCertPool() + caCertPool.AppendCertsFromPEM(caCert) + return &tls.Config{ + RootCAs: caCertPool, + } +} + // newRedisClient augments an existing elan.Client with a Redis connection. // All usage of Redis is best-effort only. // If readURL is set, all reads will happen on this URL. If not, everything // will go to url. -func newRedisClient(client elan.Client, url, readURL, password string, useTLS bool) elan.Client { +func newRedisClient(client elan.Client, url, readURL, password, caFile string, useTLS bool) elan.Client { primaryOpts := &redis.Options{ Addr: url, Password: password, @@ -42,8 +55,8 @@ func newRedisClient(client elan.Client, url, readURL, password string, useTLS bo Password: password, } if useTLS { - primaryOpts.TLSConfig = &tls.Config{} - readOpts.TLSConfig = &tls.Config{} + primaryOpts.TLSConfig = getTLSConfig(caFile) + readOpts.TLSConfig = getTLSConfig(caFile) } primaryClient := redis.NewClient(primaryOpts) readClient := primaryClient diff --git a/mettle/worker/worker.go b/mettle/worker/worker.go index 5a8afe05..6e36f598 100644 --- a/mettle/worker/worker.go +++ b/mettle/worker/worker.go @@ -146,16 +146,16 @@ func init() { } // RunForever runs the worker, receiving jobs until terminated. -func RunForever(instanceName, requestQueue, responseQueue, name, storage, dir, cacheDir, browserURL, sandbox, altSandbox, lucidity, promGatewayURL, tokenFile, redis, readRedis, redisPassword string, redisTLS bool, cachePrefix, cacheParts []string, clean, secureStorage bool, maxCacheSize, minDiskSpace int64, memoryThreshold float64, versionFile string, costs map[string]mettlecli.Currency, ackExtension time.Duration, immediateShutdown bool) { - err := runForever(instanceName, requestQueue, responseQueue, name, storage, dir, cacheDir, browserURL, sandbox, altSandbox, lucidity, promGatewayURL, tokenFile, redis, readRedis, redisPassword, redisTLS, cachePrefix, cacheParts, clean, secureStorage, maxCacheSize, minDiskSpace, memoryThreshold, versionFile, costs, ackExtension, immediateShutdown) +func RunForever(instanceName, requestQueue, responseQueue, name, storage, dir, cacheDir, browserURL, sandbox, altSandbox, lucidity, promGatewayURL, tokenFile, redis, readRedis, redisPassword, redisCAFile string, redisTLS bool, cachePrefix, cacheParts []string, clean, secureStorage bool, maxCacheSize, minDiskSpace int64, memoryThreshold float64, versionFile string, costs map[string]mettlecli.Currency, ackExtension time.Duration, immediateShutdown bool) { + err := runForever(instanceName, requestQueue, responseQueue, name, storage, dir, cacheDir, browserURL, sandbox, altSandbox, lucidity, promGatewayURL, tokenFile, redis, readRedis, redisPassword, redisCAFile, redisTLS, cachePrefix, cacheParts, clean, secureStorage, maxCacheSize, minDiskSpace, memoryThreshold, versionFile, costs, ackExtension, immediateShutdown) log.Fatalf("Failed to run: %s", err) } // RunOne runs one single request, returning any error received. -func RunOne(instanceName, name, storage, dir, cacheDir, sandbox, altSandbox, tokenFile, redis, readRedis, redisPassword string, redisTLS bool, cachePrefix, cacheParts []string, clean, secureStorage bool, digest *pb.Digest) error { +func RunOne(instanceName, name, storage, dir, cacheDir, sandbox, altSandbox, tokenFile, redis, readRedis, redisPassword, redisCAFile string, redisTLS bool, cachePrefix, cacheParts []string, clean, secureStorage bool, digest *pb.Digest) error { // Must create this to submit on first topic := common.MustOpenTopic("mem://requests") - w, err := initialiseWorker(instanceName, "mem://requests", "mem://responses", name, storage, dir, cacheDir, "", sandbox, altSandbox, "", "", tokenFile, redis, readRedis, redisPassword, redisTLS, cachePrefix, cacheParts, clean, secureStorage, 0, math.MaxInt64, 100.0, "", nil, 0) + w, err := initialiseWorker(instanceName, "mem://requests", "mem://responses", name, storage, dir, cacheDir, "", sandbox, altSandbox, "", "", tokenFile, redis, readRedis, redisPassword, redisCAFile, redisTLS, cachePrefix, cacheParts, clean, secureStorage, 0, math.MaxInt64, 100.0, "", nil, 0) if err != nil { return err } @@ -182,8 +182,8 @@ func RunOne(instanceName, name, storage, dir, cacheDir, sandbox, altSandbox, tok return nil } -func runForever(instanceName, requestQueue, responseQueue, name, storage, dir, cacheDir, browserURL, sandbox, altSandbox, lucidity, promGatewayURL, tokenFile, redis, readRedis, redisPassword string, redisTLS bool, cachePrefix, cacheParts []string, clean, secureStorage bool, maxCacheSize, minDiskSpace int64, memoryThreshold float64, versionFile string, costs map[string]mettlecli.Currency, ackExtension time.Duration, immediateShutdown bool) error { - w, err := initialiseWorker(instanceName, requestQueue, responseQueue, name, storage, dir, cacheDir, browserURL, sandbox, altSandbox, lucidity, promGatewayURL, tokenFile, redis, readRedis, redisPassword, redisTLS, cachePrefix, cacheParts, clean, secureStorage, maxCacheSize, minDiskSpace, memoryThreshold, versionFile, costs, ackExtension) +func runForever(instanceName, requestQueue, responseQueue, name, storage, dir, cacheDir, browserURL, sandbox, altSandbox, lucidity, promGatewayURL, tokenFile, redis, readRedis, redisPassword, redisCAFile string, redisTLS bool, cachePrefix, cacheParts []string, clean, secureStorage bool, maxCacheSize, minDiskSpace int64, memoryThreshold float64, versionFile string, costs map[string]mettlecli.Currency, ackExtension time.Duration, immediateShutdown bool) error { + w, err := initialiseWorker(instanceName, requestQueue, responseQueue, name, storage, dir, cacheDir, browserURL, sandbox, altSandbox, lucidity, promGatewayURL, tokenFile, redis, readRedis, redisPassword, redisCAFile, redisTLS, cachePrefix, cacheParts, clean, secureStorage, maxCacheSize, minDiskSpace, memoryThreshold, versionFile, costs, ackExtension) if err != nil { return err } @@ -235,7 +235,7 @@ func runForever(instanceName, requestQueue, responseQueue, name, storage, dir, c } } -func initialiseWorker(instanceName, requestQueue, responseQueue, name, storage, dir, cacheDir, browserURL, sandbox, altSandbox, lucidity, promGatewayURL, tokenFile, redis, readRedis, redisPassword string, redisTLS bool, cachePrefix, cacheParts []string, clean, secureStorage bool, maxCacheSize, minDiskSpace int64, memoryThreshold float64, versionFile string, costs map[string]mettlecli.Currency, ackExtension time.Duration) (*worker, error) { +func initialiseWorker(instanceName, requestQueue, responseQueue, name, storage, dir, cacheDir, browserURL, sandbox, altSandbox, lucidity, promGatewayURL, tokenFile, redis, readRedis, redisPassword, redisCAFile string, redisTLS bool, cachePrefix, cacheParts []string, clean, secureStorage bool, maxCacheSize, minDiskSpace int64, memoryThreshold float64, versionFile string, costs map[string]mettlecli.Currency, ackExtension time.Duration) (*worker, error) { // Make sure we have a directory to run in if err := os.MkdirAll(dir, os.ModeDir|0755); err != nil { return nil, fmt.Errorf("Failed to create working directory: %s", err) @@ -312,7 +312,7 @@ func initialiseWorker(instanceName, requestQueue, responseQueue, name, storage, metricTicker: time.NewTicker(5 * time.Minute), } if redis != "" { - w.client = newRedisClient(client, redis, readRedis, redisPassword, redisTLS) + w.client = newRedisClient(client, redis, readRedis, redisPassword, redisCAFile, redisTLS) } if ackExtension > 0 { if !strings.HasPrefix(requestQueue, "gcppubsub://") { From 7b2c3749bba4321c147d7130eeee872000526665 Mon Sep 17 00:00:00 2001 From: isobel Date: Thu, 10 Aug 2023 09:48:36 +0100 Subject: [PATCH 2/5] Add return after logging warning --- mettle/worker/redis.go | 1 + 1 file changed, 1 insertion(+) diff --git a/mettle/worker/redis.go b/mettle/worker/redis.go index 9c22314b..b0c61f1e 100644 --- a/mettle/worker/redis.go +++ b/mettle/worker/redis.go @@ -33,6 +33,7 @@ func getTLSConfig(caFile string) *tls.Config { caCert, err := os.ReadFile(caFile) if err != nil { log.Warning("Failed to collect CA cert from file %s: '%s'. Redis connection will not work", caFile, err) + return &tls.Config{} } caCertPool := x509.NewCertPool() caCertPool.AppendCertsFromPEM(caCert) From eeaa899a120c1fe0f6a2a2d51d3380230d1daff1 Mon Sep 17 00:00:00 2001 From: isobel Date: Thu, 10 Aug 2023 10:55:24 +0100 Subject: [PATCH 3/5] Propagate error & semver change --- ChangeLog | 2 +- VERSION | 2 +- mettle/worker/redis.go | 19 +++++++++++-------- mettle/worker/worker.go | 5 ++++- 4 files changed, 17 insertions(+), 11 deletions(-) diff --git a/ChangeLog b/ChangeLog index a1783c93..17eb54b6 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,4 +1,4 @@ -Version 11.1.1 +Version 11.2.0 -------------- * Pass CA cert file location to Redis instance diff --git a/VERSION b/VERSION index 0763616c..db500e35 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -11.1.1 \ No newline at end of file +11.2.0 \ No newline at end of file diff --git a/mettle/worker/redis.go b/mettle/worker/redis.go index b0c61f1e..f81f5983 100644 --- a/mettle/worker/redis.go +++ b/mettle/worker/redis.go @@ -29,24 +29,23 @@ var redisBytesRead = prometheus.NewCounter(prometheus.CounterOpts{ Name: "redis_bytes_read_total", }) -func getTLSConfig(caFile string) *tls.Config { +func getTLSConfig(caFile string) (*tls.Config, error) { caCert, err := os.ReadFile(caFile) if err != nil { - log.Warning("Failed to collect CA cert from file %s: '%s'. Redis connection will not work", caFile, err) - return &tls.Config{} + return &tls.Config{}, err } caCertPool := x509.NewCertPool() caCertPool.AppendCertsFromPEM(caCert) return &tls.Config{ RootCAs: caCertPool, - } + }, nil } // newRedisClient augments an existing elan.Client with a Redis connection. // All usage of Redis is best-effort only. // If readURL is set, all reads will happen on this URL. If not, everything // will go to url. -func newRedisClient(client elan.Client, url, readURL, password, caFile string, useTLS bool) elan.Client { +func newRedisClient(client elan.Client, url, readURL, password, caFile string, useTLS bool) (elan.Client, error) { primaryOpts := &redis.Options{ Addr: url, Password: password, @@ -56,8 +55,12 @@ func newRedisClient(client elan.Client, url, readURL, password, caFile string, u Password: password, } if useTLS { - primaryOpts.TLSConfig = getTLSConfig(caFile) - readOpts.TLSConfig = getTLSConfig(caFile) + tlsConfig, err := getTLSConfig(caFile) + if err != nil { + return &redisClient{}, err + } + primaryOpts.TLSConfig = tlsConfig + readOpts.TLSConfig = tlsConfig } primaryClient := redis.NewClient(primaryOpts) readClient := primaryClient @@ -70,7 +73,7 @@ func newRedisClient(client elan.Client, url, readURL, password, caFile string, u readRedis: readClient, timeout: 10 * time.Second, maxSize: 200 * 1012, // 200 Kelly-Bootle standard units - } + }, nil } type redisClient struct { diff --git a/mettle/worker/worker.go b/mettle/worker/worker.go index 6e36f598..3dce7031 100644 --- a/mettle/worker/worker.go +++ b/mettle/worker/worker.go @@ -312,7 +312,10 @@ func initialiseWorker(instanceName, requestQueue, responseQueue, name, storage, metricTicker: time.NewTicker(5 * time.Minute), } if redis != "" { - w.client = newRedisClient(client, redis, readRedis, redisPassword, redisCAFile, redisTLS) + w.client, err = newRedisClient(client, redis, readRedis, redisPassword, redisCAFile, redisTLS) + if err != nil { + log.Warningf("Redis client could not be initialised: %s") + } } if ackExtension > 0 { if !strings.HasPrefix(requestQueue, "gcppubsub://") { From f042d81724ba899dcfeaeef28f8a0c2e004abeac Mon Sep 17 00:00:00 2001 From: isobel Date: Thu, 10 Aug 2023 11:57:41 +0100 Subject: [PATCH 4/5] Actually print the error --- mettle/worker/worker.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mettle/worker/worker.go b/mettle/worker/worker.go index 3dce7031..df06ccfc 100644 --- a/mettle/worker/worker.go +++ b/mettle/worker/worker.go @@ -314,7 +314,7 @@ func initialiseWorker(instanceName, requestQueue, responseQueue, name, storage, if redis != "" { w.client, err = newRedisClient(client, redis, readRedis, redisPassword, redisCAFile, redisTLS) if err != nil { - log.Warningf("Redis client could not be initialised: %s") + log.Warningf("Redis client could not be initialised: %s", err) } } if ackExtension > 0 { From 31ff0f721be7d4809c88b9e32a046893f6559f03 Mon Sep 17 00:00:00 2001 From: isobel Date: Thu, 10 Aug 2023 15:37:04 +0100 Subject: [PATCH 5/5] Log fatal if we can't load TLS config --- mettle/worker/redis.go | 6 +++--- mettle/worker/worker.go | 5 +---- 2 files changed, 4 insertions(+), 7 deletions(-) diff --git a/mettle/worker/redis.go b/mettle/worker/redis.go index f81f5983..e553ba5f 100644 --- a/mettle/worker/redis.go +++ b/mettle/worker/redis.go @@ -45,7 +45,7 @@ func getTLSConfig(caFile string) (*tls.Config, error) { // All usage of Redis is best-effort only. // If readURL is set, all reads will happen on this URL. If not, everything // will go to url. -func newRedisClient(client elan.Client, url, readURL, password, caFile string, useTLS bool) (elan.Client, error) { +func newRedisClient(client elan.Client, url, readURL, password, caFile string, useTLS bool) elan.Client { primaryOpts := &redis.Options{ Addr: url, Password: password, @@ -57,7 +57,7 @@ func newRedisClient(client elan.Client, url, readURL, password, caFile string, u if useTLS { tlsConfig, err := getTLSConfig(caFile) if err != nil { - return &redisClient{}, err + log.Fatalf("Failed to read CA file at %s or load TLS config for Redis: %s", caFile, err) } primaryOpts.TLSConfig = tlsConfig readOpts.TLSConfig = tlsConfig @@ -73,7 +73,7 @@ func newRedisClient(client elan.Client, url, readURL, password, caFile string, u readRedis: readClient, timeout: 10 * time.Second, maxSize: 200 * 1012, // 200 Kelly-Bootle standard units - }, nil + } } type redisClient struct { diff --git a/mettle/worker/worker.go b/mettle/worker/worker.go index df06ccfc..6e36f598 100644 --- a/mettle/worker/worker.go +++ b/mettle/worker/worker.go @@ -312,10 +312,7 @@ func initialiseWorker(instanceName, requestQueue, responseQueue, name, storage, metricTicker: time.NewTicker(5 * time.Minute), } if redis != "" { - w.client, err = newRedisClient(client, redis, readRedis, redisPassword, redisCAFile, redisTLS) - if err != nil { - log.Warningf("Redis client could not be initialised: %s", err) - } + w.client = newRedisClient(client, redis, readRedis, redisPassword, redisCAFile, redisTLS) } if ackExtension > 0 { if !strings.HasPrefix(requestQueue, "gcppubsub://") {