Skip to content

Commit

Permalink
Add a new flag for connectivity checks to an external server (#322)
Browse files Browse the repository at this point in the history
* Add a new flag for preflight checks to an external server

* gofmt
  • Loading branch information
peterebden authored Oct 25, 2024
1 parent 6c63323 commit fcda4c0
Show file tree
Hide file tree
Showing 6 changed files with 50 additions and 7 deletions.
4 changes: 4 additions & 0 deletions ChangeLog
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
Version 11.12.0
---------------
* Add a new flag for connectivity checks to an external server

Version 11.11.1
---------------
* Add keepalive config to gRPC dial parameters
Expand Down
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
11.11.1
11.12.0
2 changes: 1 addition & 1 deletion mettle/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ go_binary(

sh_cmd(
name = "run_local",
cmd = f"mkdir -p plz-out/mettle && exec $(out_location :mettle) dual --host {CONFIG.LOCAL_HOST} --port 7778 -s 127.0.0.1:7777 -s file://\\\\$PWD/plz-out/elan -s 127.0.0.1:7772 -d plz-out/mettle --log_file plz-out/log/mettle.log --browser {CONFIG.BROWSER_URL} --sandbox $(out_location //sandbox:sandbox) --alt_sandbox $(out_location //sandbox:alt_sandbox) --admin_host 127.0.0.1 --token_file grpcutil/token.txt --redis_url 127.0.0.1:6379 --redis_password_file redis/auth --lucidity grpc://127.0.0.1:7774 --cache_dir plz-out/mettle-cache --cache_prefix third_party --memory_threshold 95.0 --version_file VERSION --cost cpu:£0.02 --cost mem:£0.01 --allowed_platform OSFamily:linux --allowed_platform OSFamily:macos --allowed_platform ISA:x86-64",
cmd = f"mkdir -p plz-out/mettle && exec $(out_location :mettle) dual --host {CONFIG.LOCAL_HOST} --port 7778 -s 127.0.0.1:7777 -s file://\\\\$PWD/plz-out/elan -s 127.0.0.1:7772 -d plz-out/mettle --log_file plz-out/log/mettle.log --browser {CONFIG.BROWSER_URL} --sandbox $(out_location //sandbox:sandbox) --alt_sandbox $(out_location //sandbox:alt_sandbox) --admin_host 127.0.0.1 --token_file grpcutil/token.txt --redis_url 127.0.0.1:6379 --redis_password_file redis/auth --lucidity grpc://127.0.0.1:7774 --cache_dir plz-out/mettle-cache --cache_prefix third_party --memory_threshold 95.0 --version_file VERSION --cost cpu:£0.02 --cost mem:£0.01 --allowed_platform OSFamily:linux --allowed_platform OSFamily:macos --allowed_platform ISA:x86-64 --connectivity_check gstatic",
data = [
":mettle",
"//sandbox",
Expand Down
8 changes: 6 additions & 2 deletions mettle/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ var opts = struct {
Timeout flags.Duration `long:"timeout" hidden:"true" description:"Deprecated, has no effect."`
MinDiskSpace flags.ByteSize `long:"min_disk_space" default:"1G" description:"Don't accept builds unless at least this much disk space is available"`
MemoryThreshold float64 `long:"memory_threshold" default:"100.0" description:"Don't accept builds unless available memory is under this percentage"`
ConnCheck string `long:"connectivity_check" choice:"gstatic" choice:"firefox" description:"Run an HTTP connectivity check periodically to verify if HTTP access is working"`
ConnCheckPeriod flags.Duration `long:"connectivity_check_period" default:"1h" description:"Periodicity to re-check connectivity at"`
VersionFile string `long:"version_file" description:"File containing version tag"`
Costs map[string]cli.Currency `long:"cost" description:"Per-second costs to associate with each build action."`
ImmediateShutdown bool `long:"immediate_shutdown" description:"True if the worker should shut down immediately on a sigterm."`
Expand All @@ -84,6 +86,8 @@ var opts = struct {
Timeout flags.Duration `long:"timeout" hidden:"true" description:"Deprecated, has no effect."`
MinDiskSpace flags.ByteSize `long:"min_disk_space" default:"1G" description:"Don't accept builds unless at least this much disk space is available"`
MemoryThreshold float64 `long:"memory_threshold" default:"100.0" description:"Don't accept builds unless available memory is under this percentage"`
ConnCheck string `long:"connectivity_check" choice:"gstatic" choice:"firefox" description:"Run an HTTP connectivity check periodically to verify if HTTP access is working"`
ConnCheckPeriod flags.Duration `long:"connectivity_check_period" default:"1h" description:"Periodicity to re-check connectivity at"`
VersionFile string `long:"version_file" description:"File containing version tag"`
Costs map[string]cli.Currency `long:"cost" description:"Per-second costs to associate with each build action."`
Cache CacheOpts `group:"Options controlling caching" namespace:"cache"`
Expand Down Expand Up @@ -171,12 +175,12 @@ func main() {
}
for i := 0; i < opts.Dual.NumWorkers; i++ {
storage := opts.Dual.Storage.Storage[i%len(opts.Dual.Storage.Storage)]
go worker.RunForever(opts.InstanceName, requests+"?ackdeadline=10m", responses, fmt.Sprintf("%s-%d", opts.InstanceName, i), storage, opts.Dual.Dir, opts.Dual.Cache.Dir, opts.Dual.Browser, opts.Dual.Sandbox, opts.Dual.AltSandbox, opts.Dual.Lucidity, "", opts.Dual.GRPC.TokenFile, primaryRedis, readRedis, opts.Dual.Redis.MaxSize, opts.Dual.Cache.Prefix, opts.Dual.Cache.Part, !opts.Dual.NoClean, opts.Dual.Storage.TLS, int64(opts.Dual.Cache.MaxMem), int64(opts.Dual.MinDiskSpace), opts.Dual.MemoryThreshold, opts.Dual.VersionFile, opts.Dual.Costs, 0, opts.Worker.ImmediateShutdown)
go worker.RunForever(opts.InstanceName, requests+"?ackdeadline=10m", responses, fmt.Sprintf("%s-%d", opts.InstanceName, i), storage, opts.Dual.Dir, opts.Dual.Cache.Dir, opts.Dual.Browser, opts.Dual.Sandbox, opts.Dual.AltSandbox, opts.Dual.Lucidity, "", opts.Dual.GRPC.TokenFile, primaryRedis, readRedis, opts.Dual.Redis.MaxSize, opts.Dual.Cache.Prefix, opts.Dual.Cache.Part, !opts.Dual.NoClean, opts.Dual.Storage.TLS, int64(opts.Dual.Cache.MaxMem), int64(opts.Dual.MinDiskSpace), opts.Dual.MemoryThreshold, opts.Dual.ConnCheck, time.Duration(opts.Dual.ConnCheckPeriod), opts.Dual.VersionFile, opts.Dual.Costs, 0, opts.Worker.ImmediateShutdown)
}
api.ServeForever(opts.Dual.GRPC, "", queues, "", false, opts.Dual.AllowedPlatform, opts.Dual.Storage.Storage[0], opts.Dual.Storage.TLS)
} else if cmd == "worker" {
primaryRedis, readRedis := opts.Worker.Redis.Clients()
worker.RunForever(opts.InstanceName, opts.Worker.Queues.RequestQueue, opts.Worker.Queues.ResponseQueue, opts.Worker.Name, opts.Worker.Storage.Storage, opts.Worker.Dir, opts.Worker.Cache.Dir, opts.Worker.Browser, opts.Worker.Sandbox, opts.Worker.AltSandbox, opts.Worker.Lucidity, opts.Worker.PromGateway, opts.Worker.Storage.TokenFile, primaryRedis, readRedis, opts.Worker.Redis.MaxSize, opts.Worker.Cache.Prefix, opts.Worker.Cache.Part, !opts.Worker.NoClean, opts.Worker.Storage.TLS, int64(opts.Worker.Cache.MaxMem), int64(opts.Worker.MinDiskSpace), opts.Worker.MemoryThreshold, opts.Worker.VersionFile, opts.Worker.Costs, time.Duration(opts.Worker.Queues.AckExtension), opts.Worker.ImmediateShutdown)
worker.RunForever(opts.InstanceName, opts.Worker.Queues.RequestQueue, opts.Worker.Queues.ResponseQueue, opts.Worker.Name, opts.Worker.Storage.Storage, opts.Worker.Dir, opts.Worker.Cache.Dir, opts.Worker.Browser, opts.Worker.Sandbox, opts.Worker.AltSandbox, opts.Worker.Lucidity, opts.Worker.PromGateway, opts.Worker.Storage.TokenFile, primaryRedis, readRedis, opts.Worker.Redis.MaxSize, opts.Worker.Cache.Prefix, opts.Worker.Cache.Part, !opts.Worker.NoClean, opts.Worker.Storage.TLS, int64(opts.Worker.Cache.MaxMem), int64(opts.Worker.MinDiskSpace), opts.Worker.MemoryThreshold, opts.Worker.ConnCheck, time.Duration(opts.Worker.ConnCheckPeriod), opts.Worker.VersionFile, opts.Worker.Costs, time.Duration(opts.Worker.Queues.AckExtension), opts.Worker.ImmediateShutdown)
} else if cmd == "api" {
api.ServeForever(opts.API.GRPC, opts.API.Queues.ResponseQueueSuffix, opts.API.Queues, opts.API.API.URL, opts.API.API.TLS, opts.API.AllowedPlatform, opts.API.Storage.Storage, opts.API.Storage.TLS)
} else if err := one(); err != nil {
Expand Down
24 changes: 24 additions & 0 deletions mettle/worker/reporting.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package worker
import (
"context"
"fmt"
"net/http"
"os"
"syscall"
"time"
Expand Down Expand Up @@ -167,3 +168,26 @@ func (w *worker) checkLiveConnection() bool {
}
return true
}

// checkConnectivity tries to check connectivity to an external checker. It dies if it doesn't work.
func (w *worker) checkConnectivity(check string) {
switch check {
case "gstatic":
if resp, err := http.Get("https://connectivitycheck.gstatic.com/generate_204"); err != nil {
log.Fatalf("Failed to complete connectivity check: %s", err)
} else if resp.StatusCode != http.StatusNoContent {
log.Fatalf("Connectivity check returned unexpected status: %s", resp.Status)
}
case "firefox":
if resp, err := http.Get("https://detectportal.firefox.com/canonical.html"); err != nil {
log.Fatalf("Failed to complete connectivity check: %s", err)
} else if resp.StatusCode != http.StatusOK {
log.Fatalf("Connectivity check returned unexpected status: %s", resp.Status)
}
case "":
return // no check
default:
log.Fatalf("unknown connectivity check type: %s", check)
}
log.Notice("%s connectivity check succeeded", check)
}
17 changes: 14 additions & 3 deletions mettle/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,8 +148,8 @@ func init() {
}

// RunForever runs the worker, receiving jobs until terminated.
func RunForever(instanceName, requestQueue, responseQueue, name, storage, dir, cacheDir, browserURL, sandbox, altSandbox, lucidity, promGatewayURL, tokenFile string, primaryRedis, readRedis *redis.Client, redisMaxSize int64, cachePrefix, cacheParts []string, clean, secureStorage bool, maxCacheSize, minDiskSpace int64, memoryThreshold float64, versionFile string, costs map[string]mettlecli.Currency, ackExtension time.Duration, immediateShutdown bool) {
err := runForever(instanceName, requestQueue, responseQueue, name, storage, dir, cacheDir, browserURL, sandbox, altSandbox, lucidity, promGatewayURL, tokenFile, primaryRedis, readRedis, redisMaxSize, cachePrefix, cacheParts, clean, secureStorage, maxCacheSize, minDiskSpace, memoryThreshold, versionFile, costs, ackExtension, immediateShutdown)
func RunForever(instanceName, requestQueue, responseQueue, name, storage, dir, cacheDir, browserURL, sandbox, altSandbox, lucidity, promGatewayURL, tokenFile string, primaryRedis, readRedis *redis.Client, redisMaxSize int64, cachePrefix, cacheParts []string, clean, secureStorage bool, maxCacheSize, minDiskSpace int64, memoryThreshold float64, connCheck string, connCheckPeriod time.Duration, versionFile string, costs map[string]mettlecli.Currency, ackExtension time.Duration, immediateShutdown bool) {
err := runForever(instanceName, requestQueue, responseQueue, name, storage, dir, cacheDir, browserURL, sandbox, altSandbox, lucidity, promGatewayURL, tokenFile, primaryRedis, readRedis, redisMaxSize, cachePrefix, cacheParts, clean, secureStorage, maxCacheSize, minDiskSpace, memoryThreshold, connCheck, connCheckPeriod, versionFile, costs, ackExtension, immediateShutdown)
log.Fatalf("Failed to run: %s", err)
}

Expand Down Expand Up @@ -184,7 +184,7 @@ func RunOne(instanceName, name, storage, dir, cacheDir, sandbox, altSandbox, tok
return nil
}

func runForever(instanceName, requestQueue, responseQueue, name, storage, dir, cacheDir, browserURL, sandbox, altSandbox, lucidity, promGatewayURL, tokenFile string, primaryRedis, readRedis *redis.Client, redisMaxSize int64, cachePrefix, cacheParts []string, clean, secureStorage bool, maxCacheSize, minDiskSpace int64, memoryThreshold float64, versionFile string, costs map[string]mettlecli.Currency, ackExtension time.Duration, immediateShutdown bool) error {
func runForever(instanceName, requestQueue, responseQueue, name, storage, dir, cacheDir, browserURL, sandbox, altSandbox, lucidity, promGatewayURL, tokenFile string, primaryRedis, readRedis *redis.Client, redisMaxSize int64, cachePrefix, cacheParts []string, clean, secureStorage bool, maxCacheSize, minDiskSpace int64, memoryThreshold float64, connCheck string, connCheckPeriod time.Duration, versionFile string, costs map[string]mettlecli.Currency, ackExtension time.Duration, immediateShutdown bool) error {
w, err := initialiseWorker(instanceName, requestQueue, responseQueue, name, storage, dir, cacheDir, browserURL, sandbox, altSandbox, lucidity, promGatewayURL, tokenFile, primaryRedis, readRedis, redisMaxSize, cachePrefix, cacheParts, clean, secureStorage, maxCacheSize, minDiskSpace, memoryThreshold, versionFile, costs, ackExtension)
if err != nil {
return err
Expand All @@ -210,12 +210,23 @@ func runForever(instanceName, requestQueue, responseQueue, name, storage, dir, c
}
}
}()
w.checkConnectivity(connCheck)
go w.periodicallyPushMetrics()
defer w.metricTicker.Stop()
t := time.NewTicker(connCheckPeriod)
defer t.Stop()
for {
w.waitForFreeResources()
w.waitForLiveConnection()
w.waitIfDisabled()

// Run the connectivity check if the period has expired
select {
case <-t.C:
w.checkConnectivity(connCheck)
default:
}

// Run an explicit GC to clear up after the last task; ideally we leave as much free as
// possible for the subprocesses.
runtime.GC()
Expand Down

0 comments on commit fcda4c0

Please sign in to comment.