Skip to content

Commit

Permalink
Move pubsub opts to api package and actually make things configurable
Browse files Browse the repository at this point in the history
  • Loading branch information
Hamishpk committed Oct 23, 2023
1 parent d8b020d commit 6bd56c1
Show file tree
Hide file tree
Showing 7 changed files with 54 additions and 36 deletions.
22 changes: 17 additions & 5 deletions mettle/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,16 +109,28 @@ func init() {
}
}

// PubSubOpts holds information to configure queue options in the api server
type PubSubOpts struct {
RequestQueue string `short:"q" long:"request_queue" env:"API_REQUEST_QUEUE" required:"true" description:"URL defining the pub/sub queue to connect to for sending requests, e.g. gcppubsub://my-request-queue"`
ResponseQueue string `short:"r" long:"response_queue" env:"API_RESPONSE_QUEUE" required:"true" description:"URL defining the pub/sub queue to connect to for sending responses, e.g. gcppubsub://my-response-queue"`
ResponseQueueSuffix string `long:"response_queue_suffix" env:"API_RESPONSE_QUEUE_SUFFIX" description:"Suffix to apply to the response queue name"`
PreResponseQueue string `long:"pre_response_queue" env:"API_PRE_RESPONSE_QUEUE" required:"true" description:"URL describing the pub/sub queue to connect to for preloading responses to other servers"`
NumPollers int `long:"num_pollers" env:"API_NUM_POLLERS" default:"10"`
NumPublishers int `long:"num_publishers" env:"API_NUM_PUBLISHERS" default:"2"`
SubscriptionBatchSize uint `long:"subscription_batch_size" env:"API_SUBSCRIPTION" default:"100"`
TopicBatchSize int `long:"topic_batch_size" env:"API_TOPIC_BATCH_SIZE" default:"1000"`
}

// ServeForever serves on the given port until terminated.
func ServeForever(opts grpcutil.Opts, name string, queueOpts common.APIPubSubOpts, apiURL string, connTLS bool, allowedPlatform map[string][]string, storageURL string, storageTLS bool) {
func ServeForever(opts grpcutil.Opts, name string, queueOpts PubSubOpts, apiURL string, connTLS bool, allowedPlatform map[string][]string, storageURL string, storageTLS bool) {
s, lis, err := serve(opts, name, queueOpts, apiURL, connTLS, allowedPlatform, storageURL, storageTLS)
if err != nil {
log.Fatalf("%s", err)
}
grpcutil.ServeForever(lis, s)
}

func serve(opts grpcutil.Opts, name string, queueOpts common.APIPubSubOpts, apiURL string, connTLS bool, allowedPlatform map[string][]string, storageURL string, storageTLS bool) (*grpc.Server, net.Listener, error) {
func serve(opts grpcutil.Opts, name string, queueOpts PubSubOpts, apiURL string, connTLS bool, allowedPlatform map[string][]string, storageURL string, storageTLS bool) (*grpc.Server, net.Listener, error) {
if name == "" {
name = "mettle API server"
}
Expand All @@ -132,9 +144,9 @@ func serve(opts grpcutil.Opts, name string, queueOpts common.APIPubSubOpts, apiU
}
srv := &server{
name: name,
requests: common.MustOpenTopic(queueOpts.RequestQueue),
responses: common.MustOpenSubscription(queueOpts.ResponseQueue, queueOpts.SubscriptionBatchSize),
preResponses: common.MustOpenTopic(queueOpts.PreResponseQueue),
requests: common.MustOpenTopic(queueOpts.RequestQueue, queueOpts.TopicBatchSize, queueOpts.NumPublishers),
responses: common.MustOpenSubscription(queueOpts.ResponseQueue+name, queueOpts.SubscriptionBatchSize),
preResponses: common.MustOpenTopic(queueOpts.PreResponseQueue, queueOpts.TopicBatchSize, queueOpts.NumPublishers),
jobs: map[string]*job{},
platform: allowedPlatform,
client: client,
Expand Down
8 changes: 4 additions & 4 deletions mettle/api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,16 +239,16 @@ func setupServers(t *testing.T) (pb.ExecutionClient, *executor, *grpc.Server) {
casaddr := setupCASServer()
requests := fmt.Sprintf("mem://requests%d", queueID)
responses := fmt.Sprintf("mem://responses%d", queueID)
queues := common.APIPubSubOpts{
queues := PubSubOpts{
RequestQueue: requests,
ResponseQueue: responses,
PreResponseQueue: responses,
NumPollers: 1,
SubscriptionBatchSize: 1,
}
queueID++
common.MustOpenTopic(requests) // Ensure these are created before anything tries
common.MustOpenTopic(responses) // to open a subscription to either.
common.MustOpenTopic(requests, 1, 1) // Ensure these are created before anything tries
common.MustOpenTopic(responses, 1, 1) // to open a subscription to either.
s, lis, err := serve(grpcutil.Opts{
Host: "127.0.0.1",
Port: 0,
Expand Down Expand Up @@ -336,7 +336,7 @@ type executor struct {
func newExecutor(requests, responses string) *executor {
return &executor{
requests: common.MustOpenSubscription(requests, 1),
responses: common.MustOpenTopic(responses),
responses: common.MustOpenTopic(responses, 1, 1),
}
}

Expand Down
1 change: 1 addition & 0 deletions mettle/common/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ go_library(
"///third_party/go/github.com_peterebden_go-cli-init_v4//logging",
"///third_party/go/gocloud.dev//pubsub",
"///third_party/go/gocloud.dev//pubsub/gcppubsub",
"///third_party/go/gocloud.dev//pubsub/batcher",
"///third_party/go/gocloud.dev//gcp",
"///third_party/go/google.golang.org_genproto//googleapis/longrunning",
"///third_party/go/google.golang.org_genproto//googleapis/pubsub/v1",
Expand Down
36 changes: 18 additions & 18 deletions mettle/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,20 +25,10 @@ import (
"google.golang.org/grpc/status"

"github.com/thought-machine/please-servers/mettle/mempubsub" // Register our custom mempubsub scheme
"gocloud.dev/pubsub/gcppubsub" // And gocloud's gcppubsub provider
"gocloud.dev/pubsub/batcher"
"gocloud.dev/pubsub/gcppubsub" // And gocloud's gcppubsub provider
)

type APIPubSubOpts struct {
RequestQueue string `short:"q" long:"request_queue" env:"API_REQUEST_QUEUE" required:"true" description:"URL defining the pub/sub queue to connect to for sending requests, e.g. gcppubsub://my-request-queue"`
ResponseQueue string `short:"r" long:"response_queue" env:"API_RESPONSE_QUEUE" required:"true" description:"URL defining the pub/sub queue to connect to for sending responses, e.g. gcppubsub://my-response-queue"`
ResponseQueueSuffix string `long:"response_queue_suffix" env:"API_RESPONSE_QUEUE_SUFFIX" description:"Suffix to apply to the response queue name"`
PreResponseQueue string `long:"pre_response_queue" env:"API_PRE_RESPONSE_QUEUE" required:"true" description:"URL describing the pub/sub queue to connect to for preloading responses to other servers"`
NumPollers int `long:"num_pollers" env:"API_NUM_POLLERS" default:"10"`
NumPublishers int `long:"num_publishers" env:"API_NUM_PUBLISHERS" default:"2"`
SubscriptionBatchSize uint `long:"subscription_batch_size" env:"API_SUBSCRIPTION" default:"100"`
TopicBatchSize uint `long:"topic_batch_size" env:"API_TOPIC_BATCH_SIZE" default:"1000"`
}

var log = logging.MustGetLogger()

// For hacking around the fact that mempubsub doesn't allow reopening the same subscription (each call creates a new one)
Expand Down Expand Up @@ -89,9 +79,10 @@ func limitBatchSize(in, size string) string {
}

// MustOpenTopic opens a topic, which must have been created ahead of time.
func MustOpenTopic(url string) *pubsub.Topic {
// Batch size and number of publishers are configurable for GCP queues only.
func MustOpenTopic(url string, batchSize, numPublishers int) *pubsub.Topic {
if strings.HasPrefix(url, "gcppubsub://") {
return mustOpenGCPTopic(url)
return mustOpenGCPTopic(url, batchSize, numPublishers)
}
t, err := pubsub.OpenTopic(context.Background(), url)
if err != nil {
Expand All @@ -101,7 +92,7 @@ func MustOpenTopic(url string) *pubsub.Topic {
return t
}

func mustOpenGCPTopic(in string) *pubsub.Topic {
func mustOpenGCPTopic(in string, batchSize, numPublishers int) *pubsub.Topic {
u, err := url.Parse(in)
if err != nil {
log.Fatal(err)
Expand All @@ -116,9 +107,18 @@ func mustOpenGCPTopic(in string) *pubsub.Topic {
if err != nil {
log.Fatal(err)
}
// TODO(hpitkeathly) add configerable batcher options here
opener := gcppubsub.URLOpener{}
opener.Conn = conn
options := gcppubsub.TopicOptions{
BatcherOptions: batcher.Options{
MaxHandlers: numPublishers,
MaxBatchSize: batchSize,
},
}

opener := gcppubsub.URLOpener{
Conn: conn,
TopicOptions: options,
}

topic, err := opener.OpenTopicURL(ctx, u)
if err != nil {
log.Fatal(err)
Expand Down
14 changes: 7 additions & 7 deletions mettle/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,10 @@ var opts = struct {
URL string `long:"url" description:"URL for communicating with other API servers"`
TLS bool `long:"tls" description:"Use TLS for communication between api servers"`
} `group:"Options controlling communication with other API servers for bootstrapping zero-downtime deployments." namespace:"api"`
Storage StorageOpts `group:"Options controlling communication with the CAS server" namespace:"storage"`
GRPC grpcutil.Opts `group:"Options controlling the gRPC server"`
Queues common.APIPubSubOpts `group:"Options controlling the pub/sub queues"`
AllowedPlatform map[string][]string `long:"allowed_platform" description:"Allowed values for platform properties"`
Storage StorageOpts `group:"Options controlling communication with the CAS server" namespace:"storage"`
GRPC grpcutil.Opts `group:"Options controlling the gRPC server"`
Queues api.PubSubOpts `group:"Options controlling the pub/sub queues"`
AllowedPlatform map[string][]string `long:"allowed_platform" description:"Allowed values for platform properties"`
} `command:"api" description:"Start as an API server"`
Worker struct {
Dir string `short:"d" long:"dir" default:"." description:"Directory to run actions in"`
Expand Down Expand Up @@ -163,15 +163,15 @@ func main() {
if cmd == "dual" {
const requests = "mem://requests"
const responses = "mem://responses"
queues := common.APIPubSubOpts{
queues := api.PubSubOpts{
RequestQueue: requests,
ResponseQueue: responses,
NumPollers: 1,
}

// Must ensure the topics are created ahead of time.
common.MustOpenTopic(requests)
common.MustOpenTopic(responses)
common.MustOpenTopic(requests, 1, 1)
common.MustOpenTopic(responses, 1, 1)
if opts.Dual.NumWorkers == 0 {
opts.Dual.NumWorkers = runtime.NumCPU()
}
Expand Down
4 changes: 2 additions & 2 deletions mettle/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ func RunForever(instanceName, requestQueue, responseQueue, name, storage, dir, c
// RunOne runs one single request, returning any error received.
func RunOne(instanceName, name, storage, dir, cacheDir, sandbox, altSandbox, tokenFile, redis, readRedis, redisPassword, redisCAFile string, redisTLS bool, cachePrefix, cacheParts []string, clean, secureStorage bool, digest *pb.Digest) error {
// Must create this to submit on first
topic := common.MustOpenTopic("mem://requests")
topic := common.MustOpenTopic("mem://requests", 1, 1)
w, err := initialiseWorker(instanceName, "mem://requests", "mem://responses", name, storage, dir, cacheDir, "", sandbox, altSandbox, "", "", tokenFile, redis, readRedis, redisPassword, redisCAFile, redisTLS, cachePrefix, cacheParts, clean, secureStorage, 0, math.MaxInt64, 100.0, "", nil, 0)
if err != nil {
return err
Expand Down Expand Up @@ -290,7 +290,7 @@ func initialiseWorker(instanceName, requestQueue, responseQueue, name, storage,

w := &worker{
requests: common.MustOpenSubscription(requestQueue, 1),
responses: common.MustOpenTopic(responseQueue),
responses: common.MustOpenTopic(responseQueue, 1, 1),
ackExtension: ackExtension,
client: client,
rclient: rexclient.Uninitialised(),
Expand Down
5 changes: 5 additions & 0 deletions third_party/go/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -1391,6 +1391,11 @@ go_repo(
version = "v1.31.0",
)

go_repo(
module = "gocloud.dev/pubsub/batcher",
version = "v0.34.0",
)

go_repo(
module = "gocloud.dev/gcp",
version = "v0.34.0",
Expand Down

0 comments on commit 6bd56c1

Please sign in to comment.