Skip to content

Commit

Permalink
Pass CA file location as an arg to the worker (#255)
Browse files Browse the repository at this point in the history
* Pass CA file location as an arg to the redis instance

* Add return after logging warning

* Propagate error & semver change

* Actually print the error

* Log fatal if we can't load TLS config
  • Loading branch information
isobelormiston authored Aug 11, 2023
1 parent 6500494 commit a872923
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 15 deletions.
4 changes: 4 additions & 0 deletions ChangeLog
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
Version 11.2.0
--------------
* Pass CA cert file location to Redis instance

Version 11.1.0
--------------
* Add configurable number of pub/sub pollers, default to 10
Expand Down
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
11.1.0
11.2.0
7 changes: 4 additions & 3 deletions mettle/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type RedisOpts struct {
ReadURL string `long:"read_url" env:"REDIS_READ_URL" description:"host:port of a Redis read replica, if set any read operation will be routed to it"`
Password string `long:"password" description:"AUTH password"`
PasswordFile string `long:"password_file" env:"REDIS_PASSWORD_FILE" description:"File containing AUTH password"`
CAFile string `long:"ca_file" env:"REDIS_CA_FILE" description:"File containing the Redis instance CA cert"`
TLS bool `long:"tls" description:"Use TLS for connecting to Redis"`
}

Expand Down Expand Up @@ -176,11 +177,11 @@ func main() {
}
for i := 0; i < opts.Dual.NumWorkers; i++ {
storage := opts.Dual.Storage.Storage[i%len(opts.Dual.Storage.Storage)]
go worker.RunForever(opts.InstanceName, requests+"?ackdeadline=10m", responses, fmt.Sprintf("%s-%d", opts.InstanceName, i), storage, opts.Dual.Dir, opts.Dual.Cache.Dir, opts.Dual.Browser, opts.Dual.Sandbox, opts.Dual.AltSandbox, opts.Dual.Lucidity, "", opts.Dual.GRPC.TokenFile, opts.Dual.Redis.URL, opts.Dual.Redis.ReadURL, opts.Dual.Redis.ReadPassword(), opts.Dual.Redis.TLS, opts.Dual.Cache.Prefix, opts.Dual.Cache.Part, !opts.Dual.NoClean, opts.Dual.Storage.TLS, int64(opts.Dual.Cache.MaxMem), int64(opts.Dual.MinDiskSpace), opts.Dual.MemoryThreshold, opts.Dual.VersionFile, opts.Dual.Costs, 0, opts.Worker.ImmediateShutdown)
go worker.RunForever(opts.InstanceName, requests+"?ackdeadline=10m", responses, fmt.Sprintf("%s-%d", opts.InstanceName, i), storage, opts.Dual.Dir, opts.Dual.Cache.Dir, opts.Dual.Browser, opts.Dual.Sandbox, opts.Dual.AltSandbox, opts.Dual.Lucidity, "", opts.Dual.GRPC.TokenFile, opts.Dual.Redis.URL, opts.Dual.Redis.ReadURL, opts.Dual.Redis.ReadPassword(), opts.Dual.Redis.CAFile, opts.Dual.Redis.TLS, opts.Dual.Cache.Prefix, opts.Dual.Cache.Part, !opts.Dual.NoClean, opts.Dual.Storage.TLS, int64(opts.Dual.Cache.MaxMem), int64(opts.Dual.MinDiskSpace), opts.Dual.MemoryThreshold, opts.Dual.VersionFile, opts.Dual.Costs, 0, opts.Worker.ImmediateShutdown)
}
api.ServeForever(opts.Dual.GRPC, "", requests, responses, responses, "", false, opts.Dual.AllowedPlatform, opts.Dual.Storage.Storage[0], opts.Dual.Storage.TLS, opts.API.Queues.NumPollers)
} else if cmd == "worker" {
worker.RunForever(opts.InstanceName, opts.Worker.Queues.RequestQueue, opts.Worker.Queues.ResponseQueue, opts.Worker.Name, opts.Worker.Storage.Storage, opts.Worker.Dir, opts.Worker.Cache.Dir, opts.Worker.Browser, opts.Worker.Sandbox, opts.Worker.AltSandbox, opts.Worker.Lucidity, opts.Worker.PromGateway, opts.Worker.Storage.TokenFile, opts.Worker.Redis.URL, opts.Worker.Redis.ReadURL, opts.Worker.Redis.ReadPassword(), opts.Worker.Redis.TLS, opts.Worker.Cache.Prefix, opts.Worker.Cache.Part, !opts.Worker.NoClean, opts.Worker.Storage.TLS, int64(opts.Worker.Cache.MaxMem), int64(opts.Worker.MinDiskSpace), opts.Worker.MemoryThreshold, opts.Worker.VersionFile, opts.Worker.Costs, time.Duration(opts.Worker.Queues.AckExtension), opts.Worker.ImmediateShutdown)
worker.RunForever(opts.InstanceName, opts.Worker.Queues.RequestQueue, opts.Worker.Queues.ResponseQueue, opts.Worker.Name, opts.Worker.Storage.Storage, opts.Worker.Dir, opts.Worker.Cache.Dir, opts.Worker.Browser, opts.Worker.Sandbox, opts.Worker.AltSandbox, opts.Worker.Lucidity, opts.Worker.PromGateway, opts.Worker.Storage.TokenFile, opts.Worker.Redis.URL, opts.Worker.Redis.ReadURL, opts.Worker.Redis.ReadPassword(), opts.Worker.Redis.CAFile, opts.Worker.Redis.TLS, opts.Worker.Cache.Prefix, opts.Worker.Cache.Part, !opts.Worker.NoClean, opts.Worker.Storage.TLS, int64(opts.Worker.Cache.MaxMem), int64(opts.Worker.MinDiskSpace), opts.Worker.MemoryThreshold, opts.Worker.VersionFile, opts.Worker.Costs, time.Duration(opts.Worker.Queues.AckExtension), opts.Worker.ImmediateShutdown)
} else if cmd == "api" {
api.ServeForever(opts.API.GRPC, opts.API.Queues.ResponseQueueSuffix, opts.API.Queues.RequestQueue, opts.API.Queues.ResponseQueue+opts.API.Queues.ResponseQueueSuffix, opts.API.Queues.PreResponseQueue, opts.API.API.URL, opts.API.API.TLS, opts.API.AllowedPlatform, opts.API.Storage.Storage, opts.API.Storage.TLS, opts.API.Queues.NumPollers)
} else if err := one(); err != nil {
Expand Down Expand Up @@ -209,7 +210,7 @@ func one() error {
defer pprof.WriteHeapProfile(f)
}
for _, action := range opts.One.Args.Actions {
if err := worker.RunOne(opts.InstanceName, "mettle-one", opts.One.Storage.Storage, opts.One.Dir, opts.One.Cache.Dir, opts.One.Sandbox, opts.One.AltSandbox, opts.One.Storage.TokenFile, opts.One.Redis.URL, opts.One.Redis.ReadURL, opts.One.Redis.ReadPassword(), opts.One.Redis.TLS, opts.One.Cache.Prefix, opts.One.Cache.Part, false, opts.One.Storage.TLS, action.ToProto()); err != nil {
if err := worker.RunOne(opts.InstanceName, "mettle-one", opts.One.Storage.Storage, opts.One.Dir, opts.One.Cache.Dir, opts.One.Sandbox, opts.One.AltSandbox, opts.One.Storage.TokenFile, opts.One.Redis.URL, opts.One.Redis.ReadURL, opts.One.Redis.ReadPassword(), opts.One.Redis.CAFile, opts.One.Redis.TLS, opts.One.Cache.Prefix, opts.One.Cache.Part, false, opts.One.Storage.TLS, action.ToProto()); err != nil {
return err
}
}
Expand Down
23 changes: 20 additions & 3 deletions mettle/worker/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package worker
import (
"context"
"crypto/tls"
"crypto/x509"
"os"
"time"

Expand All @@ -28,11 +29,23 @@ var redisBytesRead = prometheus.NewCounter(prometheus.CounterOpts{
Name: "redis_bytes_read_total",
})

func getTLSConfig(caFile string) (*tls.Config, error) {
caCert, err := os.ReadFile(caFile)
if err != nil {
return &tls.Config{}, err
}
caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM(caCert)
return &tls.Config{
RootCAs: caCertPool,
}, nil
}

// newRedisClient augments an existing elan.Client with a Redis connection.
// All usage of Redis is best-effort only.
// If readURL is set, all reads will happen on this URL. If not, everything
// will go to url.
func newRedisClient(client elan.Client, url, readURL, password string, useTLS bool) elan.Client {
func newRedisClient(client elan.Client, url, readURL, password, caFile string, useTLS bool) elan.Client {
primaryOpts := &redis.Options{
Addr: url,
Password: password,
Expand All @@ -42,8 +55,12 @@ func newRedisClient(client elan.Client, url, readURL, password string, useTLS bo
Password: password,
}
if useTLS {
primaryOpts.TLSConfig = &tls.Config{}
readOpts.TLSConfig = &tls.Config{}
tlsConfig, err := getTLSConfig(caFile)
if err != nil {
log.Fatalf("Failed to read CA file at %s or load TLS config for Redis: %s", caFile, err)
}
primaryOpts.TLSConfig = tlsConfig
readOpts.TLSConfig = tlsConfig
}
primaryClient := redis.NewClient(primaryOpts)
readClient := primaryClient
Expand Down
16 changes: 8 additions & 8 deletions mettle/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,16 +146,16 @@ func init() {
}

// RunForever runs the worker, receiving jobs until terminated.
func RunForever(instanceName, requestQueue, responseQueue, name, storage, dir, cacheDir, browserURL, sandbox, altSandbox, lucidity, promGatewayURL, tokenFile, redis, readRedis, redisPassword string, redisTLS bool, cachePrefix, cacheParts []string, clean, secureStorage bool, maxCacheSize, minDiskSpace int64, memoryThreshold float64, versionFile string, costs map[string]mettlecli.Currency, ackExtension time.Duration, immediateShutdown bool) {
err := runForever(instanceName, requestQueue, responseQueue, name, storage, dir, cacheDir, browserURL, sandbox, altSandbox, lucidity, promGatewayURL, tokenFile, redis, readRedis, redisPassword, redisTLS, cachePrefix, cacheParts, clean, secureStorage, maxCacheSize, minDiskSpace, memoryThreshold, versionFile, costs, ackExtension, immediateShutdown)
func RunForever(instanceName, requestQueue, responseQueue, name, storage, dir, cacheDir, browserURL, sandbox, altSandbox, lucidity, promGatewayURL, tokenFile, redis, readRedis, redisPassword, redisCAFile string, redisTLS bool, cachePrefix, cacheParts []string, clean, secureStorage bool, maxCacheSize, minDiskSpace int64, memoryThreshold float64, versionFile string, costs map[string]mettlecli.Currency, ackExtension time.Duration, immediateShutdown bool) {
err := runForever(instanceName, requestQueue, responseQueue, name, storage, dir, cacheDir, browserURL, sandbox, altSandbox, lucidity, promGatewayURL, tokenFile, redis, readRedis, redisPassword, redisCAFile, redisTLS, cachePrefix, cacheParts, clean, secureStorage, maxCacheSize, minDiskSpace, memoryThreshold, versionFile, costs, ackExtension, immediateShutdown)
log.Fatalf("Failed to run: %s", err)
}

// RunOne runs one single request, returning any error received.
func RunOne(instanceName, name, storage, dir, cacheDir, sandbox, altSandbox, tokenFile, redis, readRedis, redisPassword string, redisTLS bool, cachePrefix, cacheParts []string, clean, secureStorage bool, digest *pb.Digest) error {
func RunOne(instanceName, name, storage, dir, cacheDir, sandbox, altSandbox, tokenFile, redis, readRedis, redisPassword, redisCAFile string, redisTLS bool, cachePrefix, cacheParts []string, clean, secureStorage bool, digest *pb.Digest) error {
// Must create this to submit on first
topic := common.MustOpenTopic("mem://requests")
w, err := initialiseWorker(instanceName, "mem://requests", "mem://responses", name, storage, dir, cacheDir, "", sandbox, altSandbox, "", "", tokenFile, redis, readRedis, redisPassword, redisTLS, cachePrefix, cacheParts, clean, secureStorage, 0, math.MaxInt64, 100.0, "", nil, 0)
w, err := initialiseWorker(instanceName, "mem://requests", "mem://responses", name, storage, dir, cacheDir, "", sandbox, altSandbox, "", "", tokenFile, redis, readRedis, redisPassword, redisCAFile, redisTLS, cachePrefix, cacheParts, clean, secureStorage, 0, math.MaxInt64, 100.0, "", nil, 0)
if err != nil {
return err
}
Expand All @@ -182,8 +182,8 @@ func RunOne(instanceName, name, storage, dir, cacheDir, sandbox, altSandbox, tok
return nil
}

func runForever(instanceName, requestQueue, responseQueue, name, storage, dir, cacheDir, browserURL, sandbox, altSandbox, lucidity, promGatewayURL, tokenFile, redis, readRedis, redisPassword string, redisTLS bool, cachePrefix, cacheParts []string, clean, secureStorage bool, maxCacheSize, minDiskSpace int64, memoryThreshold float64, versionFile string, costs map[string]mettlecli.Currency, ackExtension time.Duration, immediateShutdown bool) error {
w, err := initialiseWorker(instanceName, requestQueue, responseQueue, name, storage, dir, cacheDir, browserURL, sandbox, altSandbox, lucidity, promGatewayURL, tokenFile, redis, readRedis, redisPassword, redisTLS, cachePrefix, cacheParts, clean, secureStorage, maxCacheSize, minDiskSpace, memoryThreshold, versionFile, costs, ackExtension)
func runForever(instanceName, requestQueue, responseQueue, name, storage, dir, cacheDir, browserURL, sandbox, altSandbox, lucidity, promGatewayURL, tokenFile, redis, readRedis, redisPassword, redisCAFile string, redisTLS bool, cachePrefix, cacheParts []string, clean, secureStorage bool, maxCacheSize, minDiskSpace int64, memoryThreshold float64, versionFile string, costs map[string]mettlecli.Currency, ackExtension time.Duration, immediateShutdown bool) error {
w, err := initialiseWorker(instanceName, requestQueue, responseQueue, name, storage, dir, cacheDir, browserURL, sandbox, altSandbox, lucidity, promGatewayURL, tokenFile, redis, readRedis, redisPassword, redisCAFile, redisTLS, cachePrefix, cacheParts, clean, secureStorage, maxCacheSize, minDiskSpace, memoryThreshold, versionFile, costs, ackExtension)
if err != nil {
return err
}
Expand Down Expand Up @@ -235,7 +235,7 @@ func runForever(instanceName, requestQueue, responseQueue, name, storage, dir, c
}
}

func initialiseWorker(instanceName, requestQueue, responseQueue, name, storage, dir, cacheDir, browserURL, sandbox, altSandbox, lucidity, promGatewayURL, tokenFile, redis, readRedis, redisPassword string, redisTLS bool, cachePrefix, cacheParts []string, clean, secureStorage bool, maxCacheSize, minDiskSpace int64, memoryThreshold float64, versionFile string, costs map[string]mettlecli.Currency, ackExtension time.Duration) (*worker, error) {
func initialiseWorker(instanceName, requestQueue, responseQueue, name, storage, dir, cacheDir, browserURL, sandbox, altSandbox, lucidity, promGatewayURL, tokenFile, redis, readRedis, redisPassword, redisCAFile string, redisTLS bool, cachePrefix, cacheParts []string, clean, secureStorage bool, maxCacheSize, minDiskSpace int64, memoryThreshold float64, versionFile string, costs map[string]mettlecli.Currency, ackExtension time.Duration) (*worker, error) {
// Make sure we have a directory to run in
if err := os.MkdirAll(dir, os.ModeDir|0755); err != nil {
return nil, fmt.Errorf("Failed to create working directory: %s", err)
Expand Down Expand Up @@ -312,7 +312,7 @@ func initialiseWorker(instanceName, requestQueue, responseQueue, name, storage,
metricTicker: time.NewTicker(5 * time.Minute),
}
if redis != "" {
w.client = newRedisClient(client, redis, readRedis, redisPassword, redisTLS)
w.client = newRedisClient(client, redis, readRedis, redisPassword, redisCAFile, redisTLS)
}
if ackExtension > 0 {
if !strings.HasPrefix(requestQueue, "gcppubsub://") {
Expand Down

0 comments on commit a872923

Please sign in to comment.