Skip to content

Commit

Permalink
add topic opts
Browse files Browse the repository at this point in the history
  • Loading branch information
Hamishpk committed Oct 23, 2023
1 parent 4397456 commit 61c0f51
Showing 1 changed file with 28 additions and 4 deletions.
32 changes: 28 additions & 4 deletions mettle/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/thought-machine/please-servers/mettle/mempubsub" // Register our custom mempubsub scheme
// And gocloud's gcppubsub provider
// Register our custom pubsub schemes
"github.com/thought-machine/please-servers/mettle/mempubsub"
_ "github.com/thought-machine/please-servers/mettle/mempubsub"

Check warning on line 28 in mettle/common/common.go

View workflow job for this annotation

GitHub Actions / lint

blank-imports: a blank import should be only in a main or test package, or have a comment justifying it (revive)
)

var log = logging.MustGetLogger()
Expand Down Expand Up @@ -79,14 +80,37 @@ func limitBatchSize(in, size string) string {
// MustOpenTopic opens a topic, which must have been created ahead of time.
// Batch size and number of publishers are configurable for GCP queues only.
func MustOpenTopic(url string, batchSize, numPublishers int) *pubsub.Topic {
t, err := pubsub.OpenTopic(context.Background(), url)
u := addTopicOpts(url, batchSize, numPublishers)
t, err := pubsub.OpenTopic(context.Background(), u)
if err != nil {
log.Fatalf("Failed to open topic %s: %s", url, err)
log.Fatalf("Failed to open topic %s: %s", u, err)
}
log.Debug("Opened topic %s", url)
return t
}

func addTopicOpts(in string, batchSize, numPublishers int) string {
u, err := url.Parse(in)
if err != nil {
// It's not clear exactly how we can even get here; url.Parse seems to pretty much never
// return an error. Anyway, panicking at this point shouldn't be an issue.
panic(err)
}
v := u.Query()
if batchSize > 1 {
v.Add("max_send_batch_size", strconv.Itoa(batchSize))
}
if numPublishers > 1 {
if strings.HasPrefix(in, "configurable") {
v.Add("num_handlers", strconv.Itoa(numPublishers))
} else {
log.Fatal("Can only set numPublishers on configurablepubsub queues")
}
}
u.RawQuery = v.Encode()
return u.String()
}

func handleSignals(cancel context.CancelFunc, s Shutdownable) {
ch := make(chan os.Signal, 1)
signal.Notify(ch, syscall.SIGHUP, syscall.SIGINT, syscall.SIGQUIT, syscall.SIGABRT, syscall.SIGTERM)
Expand Down

0 comments on commit 61c0f51

Please sign in to comment.