From 61c0f515df4ec85d19e06659c2ede2aac611be8c Mon Sep 17 00:00:00 2001 From: hpitkeathly Date: Mon, 23 Oct 2023 15:17:13 +0100 Subject: [PATCH] add topic opts --- mettle/common/common.go | 32 ++++++++++++++++++++++++++++---- 1 file changed, 28 insertions(+), 4 deletions(-) diff --git a/mettle/common/common.go b/mettle/common/common.go index 8c1b1b58..4a9d436e 100644 --- a/mettle/common/common.go +++ b/mettle/common/common.go @@ -23,8 +23,9 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/status" - "github.com/thought-machine/please-servers/mettle/mempubsub" // Register our custom mempubsub scheme - // And gocloud's gcppubsub provider + // Register our custom pubsub schemes + "github.com/thought-machine/please-servers/mettle/mempubsub" + _ "github.com/thought-machine/please-servers/mettle/mempubsub" ) var log = logging.MustGetLogger() @@ -79,14 +80,37 @@ func limitBatchSize(in, size string) string { // MustOpenTopic opens a topic, which must have been created ahead of time. // Batch size and number of publishers are configurable for GCP queues only. func MustOpenTopic(url string, batchSize, numPublishers int) *pubsub.Topic { - t, err := pubsub.OpenTopic(context.Background(), url) + u := addTopicOpts(url, batchSize, numPublishers) + t, err := pubsub.OpenTopic(context.Background(), u) if err != nil { - log.Fatalf("Failed to open topic %s: %s", url, err) + log.Fatalf("Failed to open topic %s: %s", u, err) } log.Debug("Opened topic %s", url) return t } +func addTopicOpts(in string, batchSize, numPublishers int) string { + u, err := url.Parse(in) + if err != nil { + // It's not clear exactly how we can even get here; url.Parse seems to pretty much never + // return an error. Anyway, panicking at this point shouldn't be an issue. + panic(err) + } + v := u.Query() + if batchSize > 1 { + v.Add("max_send_batch_size", strconv.Itoa(batchSize)) + } + if numPublishers > 1 { + if strings.HasPrefix(in, "configurable") { + v.Add("num_handlers", strconv.Itoa(numPublishers)) + } else { + log.Fatal("Can only set numPublishers on configurablepubsub queues") + } + } + u.RawQuery = v.Encode() + return u.String() +} + func handleSignals(cancel context.CancelFunc, s Shutdownable) { ch := make(chan os.Signal, 1) signal.Notify(ch, syscall.SIGHUP, syscall.SIGINT, syscall.SIGQUIT, syscall.SIGABRT, syscall.SIGTERM)