Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pass CA file location as an arg to the worker #255

Merged
merged 5 commits into from
Aug 11, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions ChangeLog
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
Version 11.1.1
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we're strictly doing semver this is 11.2.0 because it's a feature :) Not that we're that strict in this repo.

--------------
* Pass CA cert file location to Redis instance

Version 11.1.0
--------------
* Add configurable number of pub/sub pollers, default to 10
Expand Down
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
11.1.0
11.1.1
7 changes: 4 additions & 3 deletions mettle/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type RedisOpts struct {
ReadURL string `long:"read_url" env:"REDIS_READ_URL" description:"host:port of a Redis read replica, if set any read operation will be routed to it"`
Password string `long:"password" description:"AUTH password"`
PasswordFile string `long:"password_file" env:"REDIS_PASSWORD_FILE" description:"File containing AUTH password"`
CAFile string `long:"ca_file" env:"REDIS_CA_FILE" description:"File containing the Redis instance CA cert"`
TLS bool `long:"tls" description:"Use TLS for connecting to Redis"`
}

Expand Down Expand Up @@ -176,11 +177,11 @@ func main() {
}
for i := 0; i < opts.Dual.NumWorkers; i++ {
storage := opts.Dual.Storage.Storage[i%len(opts.Dual.Storage.Storage)]
go worker.RunForever(opts.InstanceName, requests+"?ackdeadline=10m", responses, fmt.Sprintf("%s-%d", opts.InstanceName, i), storage, opts.Dual.Dir, opts.Dual.Cache.Dir, opts.Dual.Browser, opts.Dual.Sandbox, opts.Dual.AltSandbox, opts.Dual.Lucidity, "", opts.Dual.GRPC.TokenFile, opts.Dual.Redis.URL, opts.Dual.Redis.ReadURL, opts.Dual.Redis.ReadPassword(), opts.Dual.Redis.TLS, opts.Dual.Cache.Prefix, opts.Dual.Cache.Part, !opts.Dual.NoClean, opts.Dual.Storage.TLS, int64(opts.Dual.Cache.MaxMem), int64(opts.Dual.MinDiskSpace), opts.Dual.MemoryThreshold, opts.Dual.VersionFile, opts.Dual.Costs, 0, opts.Worker.ImmediateShutdown)
go worker.RunForever(opts.InstanceName, requests+"?ackdeadline=10m", responses, fmt.Sprintf("%s-%d", opts.InstanceName, i), storage, opts.Dual.Dir, opts.Dual.Cache.Dir, opts.Dual.Browser, opts.Dual.Sandbox, opts.Dual.AltSandbox, opts.Dual.Lucidity, "", opts.Dual.GRPC.TokenFile, opts.Dual.Redis.URL, opts.Dual.Redis.ReadURL, opts.Dual.Redis.ReadPassword(), opts.Dual.Redis.CAFile, opts.Dual.Redis.TLS, opts.Dual.Cache.Prefix, opts.Dual.Cache.Part, !opts.Dual.NoClean, opts.Dual.Storage.TLS, int64(opts.Dual.Cache.MaxMem), int64(opts.Dual.MinDiskSpace), opts.Dual.MemoryThreshold, opts.Dual.VersionFile, opts.Dual.Costs, 0, opts.Worker.ImmediateShutdown)
}
api.ServeForever(opts.Dual.GRPC, "", requests, responses, responses, "", false, opts.Dual.AllowedPlatform, opts.Dual.Storage.Storage[0], opts.Dual.Storage.TLS, opts.API.Queues.NumPollers)
} else if cmd == "worker" {
worker.RunForever(opts.InstanceName, opts.Worker.Queues.RequestQueue, opts.Worker.Queues.ResponseQueue, opts.Worker.Name, opts.Worker.Storage.Storage, opts.Worker.Dir, opts.Worker.Cache.Dir, opts.Worker.Browser, opts.Worker.Sandbox, opts.Worker.AltSandbox, opts.Worker.Lucidity, opts.Worker.PromGateway, opts.Worker.Storage.TokenFile, opts.Worker.Redis.URL, opts.Worker.Redis.ReadURL, opts.Worker.Redis.ReadPassword(), opts.Worker.Redis.TLS, opts.Worker.Cache.Prefix, opts.Worker.Cache.Part, !opts.Worker.NoClean, opts.Worker.Storage.TLS, int64(opts.Worker.Cache.MaxMem), int64(opts.Worker.MinDiskSpace), opts.Worker.MemoryThreshold, opts.Worker.VersionFile, opts.Worker.Costs, time.Duration(opts.Worker.Queues.AckExtension), opts.Worker.ImmediateShutdown)
worker.RunForever(opts.InstanceName, opts.Worker.Queues.RequestQueue, opts.Worker.Queues.ResponseQueue, opts.Worker.Name, opts.Worker.Storage.Storage, opts.Worker.Dir, opts.Worker.Cache.Dir, opts.Worker.Browser, opts.Worker.Sandbox, opts.Worker.AltSandbox, opts.Worker.Lucidity, opts.Worker.PromGateway, opts.Worker.Storage.TokenFile, opts.Worker.Redis.URL, opts.Worker.Redis.ReadURL, opts.Worker.Redis.ReadPassword(), opts.Worker.Redis.CAFile, opts.Worker.Redis.TLS, opts.Worker.Cache.Prefix, opts.Worker.Cache.Part, !opts.Worker.NoClean, opts.Worker.Storage.TLS, int64(opts.Worker.Cache.MaxMem), int64(opts.Worker.MinDiskSpace), opts.Worker.MemoryThreshold, opts.Worker.VersionFile, opts.Worker.Costs, time.Duration(opts.Worker.Queues.AckExtension), opts.Worker.ImmediateShutdown)
} else if cmd == "api" {
api.ServeForever(opts.API.GRPC, opts.API.Queues.ResponseQueueSuffix, opts.API.Queues.RequestQueue, opts.API.Queues.ResponseQueue+opts.API.Queues.ResponseQueueSuffix, opts.API.Queues.PreResponseQueue, opts.API.API.URL, opts.API.API.TLS, opts.API.AllowedPlatform, opts.API.Storage.Storage, opts.API.Storage.TLS, opts.API.Queues.NumPollers)
} else if err := one(); err != nil {
Expand Down Expand Up @@ -209,7 +210,7 @@ func one() error {
defer pprof.WriteHeapProfile(f)
}
for _, action := range opts.One.Args.Actions {
if err := worker.RunOne(opts.InstanceName, "mettle-one", opts.One.Storage.Storage, opts.One.Dir, opts.One.Cache.Dir, opts.One.Sandbox, opts.One.AltSandbox, opts.One.Storage.TokenFile, opts.One.Redis.URL, opts.One.Redis.ReadURL, opts.One.Redis.ReadPassword(), opts.One.Redis.TLS, opts.One.Cache.Prefix, opts.One.Cache.Part, false, opts.One.Storage.TLS, action.ToProto()); err != nil {
if err := worker.RunOne(opts.InstanceName, "mettle-one", opts.One.Storage.Storage, opts.One.Dir, opts.One.Cache.Dir, opts.One.Sandbox, opts.One.AltSandbox, opts.One.Storage.TokenFile, opts.One.Redis.URL, opts.One.Redis.ReadURL, opts.One.Redis.ReadPassword(), opts.One.Redis.CAFile, opts.One.Redis.TLS, opts.One.Cache.Prefix, opts.One.Cache.Part, false, opts.One.Storage.TLS, action.ToProto()); err != nil {
return err
}
}
Expand Down
20 changes: 17 additions & 3 deletions mettle/worker/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package worker
import (
"context"
"crypto/tls"
"crypto/x509"
"os"
"time"

Expand All @@ -28,11 +29,24 @@ var redisBytesRead = prometheus.NewCounter(prometheus.CounterOpts{
Name: "redis_bytes_read_total",
})

func getTLSConfig(caFile string) *tls.Config {
caCert, err := os.ReadFile(caFile)
if err != nil {
log.Warning("Failed to collect CA cert from file %s: '%s'. Redis connection will not work", caFile, err)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you should probably return an error here, and die if it fails. If someone passes a CA I'd expect it not to carry on if it can't load it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you mean let the whole worker die?
Atm the redis approach is best-effort, hence just a warning (although it should probably be propagated up to where the worker initialises the redis client). Would a warning along with an alert when we see it make sense?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well writing and connecting to Redis is best-effort, but I wouldn't expect loading the CA cert to fail?

return &tls.Config{}
}
caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM(caCert)
return &tls.Config{
RootCAs: caCertPool,
}
}

// newRedisClient augments an existing elan.Client with a Redis connection.
// All usage of Redis is best-effort only.
// If readURL is set, all reads will happen on this URL. If not, everything
// will go to url.
func newRedisClient(client elan.Client, url, readURL, password string, useTLS bool) elan.Client {
func newRedisClient(client elan.Client, url, readURL, password, caFile string, useTLS bool) elan.Client {
primaryOpts := &redis.Options{
Addr: url,
Password: password,
Expand All @@ -42,8 +56,8 @@ func newRedisClient(client elan.Client, url, readURL, password string, useTLS bo
Password: password,
}
if useTLS {
primaryOpts.TLSConfig = &tls.Config{}
readOpts.TLSConfig = &tls.Config{}
primaryOpts.TLSConfig = getTLSConfig(caFile)
readOpts.TLSConfig = getTLSConfig(caFile)
}
primaryClient := redis.NewClient(primaryOpts)
readClient := primaryClient
Expand Down
16 changes: 8 additions & 8 deletions mettle/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,16 +146,16 @@ func init() {
}

// RunForever runs the worker, receiving jobs until terminated.
func RunForever(instanceName, requestQueue, responseQueue, name, storage, dir, cacheDir, browserURL, sandbox, altSandbox, lucidity, promGatewayURL, tokenFile, redis, readRedis, redisPassword string, redisTLS bool, cachePrefix, cacheParts []string, clean, secureStorage bool, maxCacheSize, minDiskSpace int64, memoryThreshold float64, versionFile string, costs map[string]mettlecli.Currency, ackExtension time.Duration, immediateShutdown bool) {
err := runForever(instanceName, requestQueue, responseQueue, name, storage, dir, cacheDir, browserURL, sandbox, altSandbox, lucidity, promGatewayURL, tokenFile, redis, readRedis, redisPassword, redisTLS, cachePrefix, cacheParts, clean, secureStorage, maxCacheSize, minDiskSpace, memoryThreshold, versionFile, costs, ackExtension, immediateShutdown)
func RunForever(instanceName, requestQueue, responseQueue, name, storage, dir, cacheDir, browserURL, sandbox, altSandbox, lucidity, promGatewayURL, tokenFile, redis, readRedis, redisPassword, redisCAFile string, redisTLS bool, cachePrefix, cacheParts []string, clean, secureStorage bool, maxCacheSize, minDiskSpace int64, memoryThreshold float64, versionFile string, costs map[string]mettlecli.Currency, ackExtension time.Duration, immediateShutdown bool) {
err := runForever(instanceName, requestQueue, responseQueue, name, storage, dir, cacheDir, browserURL, sandbox, altSandbox, lucidity, promGatewayURL, tokenFile, redis, readRedis, redisPassword, redisCAFile, redisTLS, cachePrefix, cacheParts, clean, secureStorage, maxCacheSize, minDiskSpace, memoryThreshold, versionFile, costs, ackExtension, immediateShutdown)
log.Fatalf("Failed to run: %s", err)
}

// RunOne runs one single request, returning any error received.
func RunOne(instanceName, name, storage, dir, cacheDir, sandbox, altSandbox, tokenFile, redis, readRedis, redisPassword string, redisTLS bool, cachePrefix, cacheParts []string, clean, secureStorage bool, digest *pb.Digest) error {
func RunOne(instanceName, name, storage, dir, cacheDir, sandbox, altSandbox, tokenFile, redis, readRedis, redisPassword, redisCAFile string, redisTLS bool, cachePrefix, cacheParts []string, clean, secureStorage bool, digest *pb.Digest) error {
// Must create this to submit on first
topic := common.MustOpenTopic("mem://requests")
w, err := initialiseWorker(instanceName, "mem://requests", "mem://responses", name, storage, dir, cacheDir, "", sandbox, altSandbox, "", "", tokenFile, redis, readRedis, redisPassword, redisTLS, cachePrefix, cacheParts, clean, secureStorage, 0, math.MaxInt64, 100.0, "", nil, 0)
w, err := initialiseWorker(instanceName, "mem://requests", "mem://responses", name, storage, dir, cacheDir, "", sandbox, altSandbox, "", "", tokenFile, redis, readRedis, redisPassword, redisCAFile, redisTLS, cachePrefix, cacheParts, clean, secureStorage, 0, math.MaxInt64, 100.0, "", nil, 0)
if err != nil {
return err
}
Expand All @@ -182,8 +182,8 @@ func RunOne(instanceName, name, storage, dir, cacheDir, sandbox, altSandbox, tok
return nil
}

func runForever(instanceName, requestQueue, responseQueue, name, storage, dir, cacheDir, browserURL, sandbox, altSandbox, lucidity, promGatewayURL, tokenFile, redis, readRedis, redisPassword string, redisTLS bool, cachePrefix, cacheParts []string, clean, secureStorage bool, maxCacheSize, minDiskSpace int64, memoryThreshold float64, versionFile string, costs map[string]mettlecli.Currency, ackExtension time.Duration, immediateShutdown bool) error {
w, err := initialiseWorker(instanceName, requestQueue, responseQueue, name, storage, dir, cacheDir, browserURL, sandbox, altSandbox, lucidity, promGatewayURL, tokenFile, redis, readRedis, redisPassword, redisTLS, cachePrefix, cacheParts, clean, secureStorage, maxCacheSize, minDiskSpace, memoryThreshold, versionFile, costs, ackExtension)
func runForever(instanceName, requestQueue, responseQueue, name, storage, dir, cacheDir, browserURL, sandbox, altSandbox, lucidity, promGatewayURL, tokenFile, redis, readRedis, redisPassword, redisCAFile string, redisTLS bool, cachePrefix, cacheParts []string, clean, secureStorage bool, maxCacheSize, minDiskSpace int64, memoryThreshold float64, versionFile string, costs map[string]mettlecli.Currency, ackExtension time.Duration, immediateShutdown bool) error {
w, err := initialiseWorker(instanceName, requestQueue, responseQueue, name, storage, dir, cacheDir, browserURL, sandbox, altSandbox, lucidity, promGatewayURL, tokenFile, redis, readRedis, redisPassword, redisCAFile, redisTLS, cachePrefix, cacheParts, clean, secureStorage, maxCacheSize, minDiskSpace, memoryThreshold, versionFile, costs, ackExtension)
if err != nil {
return err
}
Expand Down Expand Up @@ -235,7 +235,7 @@ func runForever(instanceName, requestQueue, responseQueue, name, storage, dir, c
}
}

func initialiseWorker(instanceName, requestQueue, responseQueue, name, storage, dir, cacheDir, browserURL, sandbox, altSandbox, lucidity, promGatewayURL, tokenFile, redis, readRedis, redisPassword string, redisTLS bool, cachePrefix, cacheParts []string, clean, secureStorage bool, maxCacheSize, minDiskSpace int64, memoryThreshold float64, versionFile string, costs map[string]mettlecli.Currency, ackExtension time.Duration) (*worker, error) {
func initialiseWorker(instanceName, requestQueue, responseQueue, name, storage, dir, cacheDir, browserURL, sandbox, altSandbox, lucidity, promGatewayURL, tokenFile, redis, readRedis, redisPassword, redisCAFile string, redisTLS bool, cachePrefix, cacheParts []string, clean, secureStorage bool, maxCacheSize, minDiskSpace int64, memoryThreshold float64, versionFile string, costs map[string]mettlecli.Currency, ackExtension time.Duration) (*worker, error) {
// Make sure we have a directory to run in
if err := os.MkdirAll(dir, os.ModeDir|0755); err != nil {
return nil, fmt.Errorf("Failed to create working directory: %s", err)
Expand Down Expand Up @@ -312,7 +312,7 @@ func initialiseWorker(instanceName, requestQueue, responseQueue, name, storage,
metricTicker: time.NewTicker(5 * time.Minute),
}
if redis != "" {
w.client = newRedisClient(client, redis, readRedis, redisPassword, redisTLS)
w.client = newRedisClient(client, redis, readRedis, redisPassword, redisCAFile, redisTLS)
}
if ackExtension > 0 {
if !strings.HasPrefix(requestQueue, "gcppubsub://") {
Expand Down
Loading