Skip to content

Commit

Permalink
Add gRPC API to Lucidity to list workers (#317)
Browse files Browse the repository at this point in the history
* Add hostname reporting & gRPC API to return it

* Add a test

* Rename proto

* Add name filter too

* Version

* Update generated code
  • Loading branch information
peterebden authored Jul 12, 2024
1 parent 06c46ae commit a353393
Show file tree
Hide file tree
Showing 10 changed files with 431 additions and 208 deletions.
5 changes: 5 additions & 0 deletions ChangeLog
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
Version 11.11.0
---------------
* Add a gRPC API to Lucidity which can be used to programmatically retrieve workers
* Mettle workers now report the hostname they're running on to Lucidity

Version 11.10.6
---------------
* Fix Elan's List endpoint to return true size for compressed blobs.
Expand Down
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
11.10.6
11.11.0
1 change: 1 addition & 0 deletions lucidity/rpc/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ go_test(
deps = [
":rpc",
"//proto/lucidity",
"///third_party/go/github.com_golang_protobuf//proto",
"///third_party/go/github.com_stretchr_testify//assert",
],
)
50 changes: 35 additions & 15 deletions lucidity/rpc/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"mime"
"net/http"
"path"
"slices"
"sync"
"time"

Expand Down Expand Up @@ -80,19 +81,44 @@ func newServer(minProportion float64) *server {
}
}

func (s *server) Update(ctx context.Context, req *pb.UpdateRequest) (*pb.UpdateResponse, error) {
func (s *server) Update(ctx context.Context, req *pb.Worker) (*pb.UpdateResponse, error) {
req.UpdateTime = time.Now().Unix()
v, ok := s.workers.Load(req.Name)
req.Disabled = ok && v.(*pb.UpdateRequest).Disabled
req.Disabled = ok && v.(*pb.Worker).Disabled
s.workers.Store(req.Name, req)
if !ok || v.(*pb.UpdateRequest).Version != req.Version {
if !ok || v.(*pb.Worker).Version != req.Version {
s.recalculateValidVersions()
}
s.checkWorkerHealth(req)
return &pb.UpdateResponse{ShouldDisable: req.Disabled || !req.Healthy}, nil
}

func (s *server) checkWorkerHealth(req *pb.UpdateRequest) {
// ListWorkers is the RPC entry point for listing workers. It delegates all
// filtering and ordering to listWorkers and always returns a nil error.
func (s *server) ListWorkers(ctx context.Context, req *pb.ListWorkersRequest) (*pb.ListWorkersResponse, error) {
	resp := s.listWorkers(req)
	return resp, nil
}

// listWorkers returns the workers held in s.workers that match req, ordered by name.
//
// An empty Hostname or Name in the request matches every worker; a non-empty value
// must equal the worker's corresponding field exactly. Every matching worker has its
// health re-evaluated via checkWorkerHealth before it is added to the response.
func (s *server) listWorkers(req *pb.ListWorkersRequest) *pb.ListWorkersResponse {
	resp := &pb.ListWorkersResponse{}
	s.workers.Range(func(_, value interface{}) bool {
		worker := value.(*pb.Worker)
		// Skip workers rejected by either filter; returning true keeps iterating.
		if req.Hostname != "" && worker.Hostname != req.Hostname {
			return true
		}
		if req.Name != "" && worker.Name != req.Name {
			return true
		}
		s.checkWorkerHealth(worker)
		resp.Workers = append(resp.Workers, worker)
		return true
	})
	// Sort by name so the output does not depend on the order Range visits entries.
	slices.SortFunc(resp.Workers, func(x, y *pb.Worker) int {
		switch {
		case x.Name < y.Name:
			return -1
		case x.Name > y.Name:
			return 1
		default:
			return 0
		}
	})
	return resp
}

func (s *server) checkWorkerHealth(req *pb.Worker) {
if req.UpdateTime < time.Now().Add(-10*time.Minute).Unix() {
req.Healthy = false
req.Status = "Too long since last update"
Expand All @@ -103,13 +129,7 @@ func (s *server) checkWorkerHealth(req *pb.UpdateRequest) {
}

func (s *server) ServeWorkers(w http.ResponseWriter, r *http.Request) {
workers := &pb.Workers{}
s.workers.Range(func(k, v interface{}) bool {
r := v.(*pb.UpdateRequest)
s.checkWorkerHealth(r)
workers.Workers = append(workers.Workers, r)
return true
})
workers := s.listWorkers(&pb.ListWorkersRequest{})
m := jsonpb.Marshaler{
OrigName: true,
Indent: " ",
Expand Down Expand Up @@ -146,7 +166,7 @@ func (s *server) ServeDisable(w http.ResponseWriter, r *http.Request) {
log.Warning("Request to disable unknown worker %s", req.Name)
w.WriteHeader(http.StatusNotFound)
} else {
v.(*pb.UpdateRequest).Disabled = req.Disable
v.(*pb.Worker).Disabled = req.Disable
}
}

Expand All @@ -158,7 +178,7 @@ func (s *server) Clean(maxAge time.Duration) {
for range time.NewTicker(maxAge / 10).C {
min := time.Now().Add(-maxAge).Unix()
s.workers.Range(func(k, v interface{}) bool {
if v.(*pb.UpdateRequest).UpdateTime < min {
if v.(*pb.Worker).UpdateTime < min {
go s.removeWorker(k.(string))
}
return true
Expand All @@ -176,7 +196,7 @@ func (s *server) recalculateValidVersions() {
counts := map[string]int{}
n := 0
s.workers.Range(func(k, v interface{}) bool {
counts[v.(*pb.UpdateRequest).Version]++
counts[v.(*pb.Worker).Version]++
n++
return true
})
Expand Down Expand Up @@ -222,7 +242,7 @@ func (s *server) Describe(out chan<- *prometheus.Desc) {
func (s *server) Collect(out chan<- prometheus.Metric) {
var total, unhealthy, dead, busy float64
s.workers.Range(func(_, v interface{}) bool {
r := v.(*pb.UpdateRequest)
r := v.(*pb.Worker)
s.checkWorkerHealth(r)
if !r.Healthy {
unhealthy++
Expand Down
34 changes: 33 additions & 1 deletion lucidity/rpc/rpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"testing"
"time"

"github.com/golang/protobuf/proto"
"github.com/stretchr/testify/assert"

pb "github.com/thought-machine/please-servers/proto/lucidity"
Expand All @@ -17,7 +18,7 @@ func TestValidVersion(t *testing.T) {

// Helper function. Returns true if this update would trigger disabling the worker.
update := func(name, version string) bool {
r, err := s.Update(context.Background(), &pb.UpdateRequest{
r, err := s.Update(context.Background(), &pb.Worker{
Name: name,
Version: version,
Healthy: true,
Expand All @@ -35,3 +36,34 @@ func TestValidVersion(t *testing.T) {
assert.False(t, update("worker-1", "2.0")) // worker-1 has now updated and is alive again.
assert.True(t, update("worker-3", "3.0")) // worker-3 has now updated beyond the others and it disables.
}

// TestListWorkers registers two workers and checks that ListWorkers returns them
// sorted by name, and that the hostname and name filters each narrow the result
// to the single matching worker.
func TestListWorkers(t *testing.T) {
	now := time.Now().Unix()
	const version = "1.0"
	ctx := context.Background()
	s := newServer(1.0)
	worker1 := &pb.Worker{
		Name:       "worker-1",
		Hostname:   "hydrogen",
		Version:    version,
		Healthy:    true,
		UpdateTime: now,
	}
	worker2 := &pb.Worker{
		Name:       "worker-2",
		Hostname:   "helium",
		Version:    version,
		Healthy:    true,
		UpdateTime: now,
	}
	// Registration failures would invalidate every assertion below, so surface them.
	for _, w := range []*pb.Worker{worker1, worker2} {
		_, err := s.Update(ctx, w)
		assert.NoError(t, err)
	}

	// No filter: both workers, ordered by name. The element checks are guarded by
	// the length assertion so a wrong count fails the test instead of panicking.
	resp, err := s.ListWorkers(ctx, &pb.ListWorkersRequest{})
	if assert.NoError(t, err) && assert.Len(t, resp.Workers, 2) {
		assert.True(t, proto.Equal(worker1, resp.Workers[0]))
		assert.True(t, proto.Equal(worker2, resp.Workers[1]))
	}

	// Hostname filter selects only the worker running on that host.
	resp, err = s.ListWorkers(ctx, &pb.ListWorkersRequest{Hostname: "helium"})
	if assert.NoError(t, err) && assert.Len(t, resp.Workers, 1) {
		assert.True(t, proto.Equal(worker2, resp.Workers[0]))
	}

	// Name filter selects only the worker with that name.
	resp, err = s.ListWorkers(ctx, &pb.ListWorkersRequest{Name: "worker-1"})
	if assert.NoError(t, err) && assert.Len(t, resp.Workers, 1) {
		assert.True(t, proto.Equal(worker1, resp.Workers[0]))
	}
}
12 changes: 9 additions & 3 deletions mettle/worker/reporting.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package worker
import (
"context"
"fmt"
"os"
"syscall"
"time"

Expand All @@ -16,7 +17,7 @@ import (
// If a Lucidity server hasn't been configured, calling this has no effect.
func (w *worker) Report(healthy, busy, alive bool, status string, args ...interface{}) {
if w.lucidChan != nil {
w.lucidChan <- &lpb.UpdateRequest{
w.lucidChan <- &lpb.Worker{
Name: w.name,
Version: w.version,
StartTime: w.startTime.Unix(),
Expand All @@ -41,11 +42,16 @@ func (w *worker) currentTaskID() string {

// sendReports sends reports to Lucidity indefinitely.
func (w *worker) sendReports() {
hostname, err := os.Hostname()
if err != nil {
log.Error("Failed to retrieve hostname: %s", err)
}
t := time.NewTicker(5 * time.Minute)
var last *lpb.UpdateRequest
var last *lpb.Worker
for {
select {
case report := <-w.lucidChan:
report.Hostname = hostname
w.sendReport(report)
last = report
case <-t.C:
Expand All @@ -56,7 +62,7 @@ func (w *worker) sendReports() {
}
}

func (w *worker) sendReport(report *lpb.UpdateRequest) {
func (w *worker) sendReport(report *lpb.Worker) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if resp, err := w.lucidity.Update(ctx, report); err != nil {
Expand Down
4 changes: 2 additions & 2 deletions mettle/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ func initialiseWorker(instanceName, requestQueue, responseQueue, name, storage,
}
}
if lucidity != "" {
w.lucidChan = make(chan *lpb.UpdateRequest, 100)
w.lucidChan = make(chan *lpb.Worker, 100)
log.Notice("Dialling Lucidity...")
conn, err := grpcutil.Dial(lucidity, true, "", tokenFile) // CA is currently not configurable.
if err != nil {
Expand Down Expand Up @@ -375,7 +375,7 @@ type worker struct {
client elan.Client
rclient *client.Client
lucidity lpb.LucidityClient
lucidChan chan *lpb.UpdateRequest
lucidChan chan *lpb.Worker
cache *ristretto.Cache
instanceName string
dir, rootDir string
Expand Down
Loading

0 comments on commit a353393

Please sign in to comment.