Skip to content

Commit

Permalink
Add configuration settings for PubSub source
Browse files Browse the repository at this point in the history
  • Loading branch information
adatzer authored and colmsnowplow committed Nov 15, 2024
1 parent 9302e89 commit 865fde5
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 22 deletions.
23 changes: 20 additions & 3 deletions assets/docs/configuration/sources/pubsub-full-example.hcl
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,24 @@ source {
# subscription ID for the pubsub subscription
subscription_id = "subscription-id"

# Maximum concurrent goroutines (lightweight threads) for message processing (default: 50)
concurrent_writes = 20
# This option is deprecated for the pubsub source, and will be changed or removed in the next major release.
# Use streaming_pull_goroutines, max_outstanding_messages, and max_outstanding_bytes to configure concurrency instead.
# Where streaming_pull_goroutines is set, this option is ignored.
concurrent_writes = 1

# Maximum number of unprocessed messages (default 1000)
max_outstanding_messages = 2000

# Maximum total size, in bytes, of unprocessed messages (default 1e9)
max_outstanding_bytes = 2e9

# Minimum ack extension period, in seconds, applied when a message is received
min_extension_period_seconds = 10

# Number of streaming pull connections to open at once
streaming_pull_goroutines = 1

# Configures the GRPC connection pool size of the pubsub client
grpc_connection_pool_size = 4
}
}
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ require (
golang.org/x/oauth2 v0.23.0
golang.org/x/sys v0.25.0 // indirect
golang.org/x/text v0.18.0 // indirect
google.golang.org/api v0.197.0 // indirect
google.golang.org/api v0.197.0
google.golang.org/genproto v0.0.0-20240903143218-8af14fe29dc1
google.golang.org/grpc v1.66.2
)
Expand Down
83 changes: 67 additions & 16 deletions pkg/source/pubsub/pubsub_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/google/uuid"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"google.golang.org/api/option"

"github.com/snowplow/snowbridge/config"
"github.com/snowplow/snowbridge/pkg/models"
Expand All @@ -28,17 +29,25 @@ import (

// Configuration configures the source for records pulled
type Configuration struct {
ProjectID string `hcl:"project_id"`
SubscriptionID string `hcl:"subscription_id"`
ConcurrentWrites int `hcl:"concurrent_writes,optional"`
ProjectID string `hcl:"project_id"`
SubscriptionID string `hcl:"subscription_id"`
ConcurrentWrites int `hcl:"concurrent_writes,optional"`
MaxOutstandingMessages int `hcl:"max_outstanding_messages,optional"`
MaxOutstandingBytes int `hcl:"max_outstanding_bytes,optional"`
MinExtensionPeriodSeconds int `hcl:"min_extension_period_seconds,optional"`
StreamingPullGoRoutines int `hcl:"streaming_pull_goroutines,optional"`
GRPCConnectionPool int `hcl:"grpc_connection_pool_size,optional"`
}

// pubSubSource holds a new client for reading messages from PubSub
type pubSubSource struct {
projectID string
client *pubsub.Client
subscriptionID string
concurrentWrites int
projectID string
client *pubsub.Client
subscriptionID string
maxOutstandingMessages int
maxOutstandingBytes int
minExtensionPeriodSeconds int
streamingPullGoRoutines int

log *log.Entry

Expand All @@ -52,6 +61,11 @@ func configFunction(c *Configuration) (sourceiface.Source, error) {
c.ConcurrentWrites,
c.ProjectID,
c.SubscriptionID,
c.MaxOutstandingMessages,
c.MaxOutstandingBytes,
c.MinExtensionPeriodSeconds,
c.StreamingPullGoRoutines,
c.GRPCConnectionPool,
)
}

Expand All @@ -68,7 +82,12 @@ func (f adapter) Create(i interface{}) (interface{}, error) {
func (f adapter) ProvideDefault() (interface{}, error) {
// Provide defaults
cfg := &Configuration{
ConcurrentWrites: 50,
// ConcurrentWrites: 50,
// Default is now handled in newPubSubSource, until we make a breaking release.
MaxOutstandingMessages: 1000,
MaxOutstandingBytes: 1e9,
// StreamingPullGoRoutines: 1,
// Similarly handled in newPubSubSource - when we make a breaking release this should be the default.
}

return cfg, nil
Expand All @@ -93,23 +112,52 @@ var ConfigPair = config.ConfigurationPair{
}

// newPubSubSource creates a new client for reading messages from PubSub
func newPubSubSource(concurrentWrites int, projectID string, subscriptionID string) (*pubSubSource, error) {
func newPubSubSource(concurrentWrites int, projectID string, subscriptionID string, maxOutstandingMessages, maxOutstandingBytes int, minExtensionPeriodSeconds int, streamingPullGoRoutines int, grpcConnectionPool int) (*pubSubSource, error) {
ctx := context.Background()

// Ensures as even as possible distribution of UUIDs
uuid.EnableRandPool()

client, err := pubsub.NewClient(ctx, projectID)
log := log.WithFields(log.Fields{"source": "pubsub", "cloud": "GCP", "project": projectID, "subscription": subscriptionID})

// We use a slice to provide the grpcConnectionPool option only if it is set.
// Otherwise we'll overwrite the client's clever under-the-hood default behaviour:
// https://github.com/googleapis/google-cloud-go/blob/380e7d23e69b22ab46cc6e3be58902accee2f26a/pubsub/pubsub.go#L165-L177
var opt []option.ClientOption
if grpcConnectionPool != 0 {
opt = append(opt, option.WithGRPCConnectionPool(grpcConnectionPool))
}

client, err := pubsub.NewClient(ctx, projectID, opt...)
if err != nil {
return nil, errors.Wrap(err, "Failed to create PubSub client")
}

// This temporary logic allows us to fix suboptimal behaviour without a breaking release.
// The order of priority is streaming_pull_goroutines > concurrent_writes > previous default
// We don't change the default because this would cause a major behaviour change in a non-major version bump

// If streamingPullGoRoutines is not set but concurrentWrites is, use concurrentWrites.
if streamingPullGoRoutines == 0 && concurrentWrites != 0 {
streamingPullGoRoutines = concurrentWrites
log.Warn("For the pubsub source, concurrent_writes is deprecated, and will be removed in the next major version. Use streaming_pull_goroutines instead")
}
// If neither is set, fall back to the previous default of 50, and warn users to configure this explicitly
if streamingPullGoRoutines == 0 && concurrentWrites == 0 {
streamingPullGoRoutines = 50
log.Warn("Neither streaming_pull_goroutines nor concurrent_writes are set. The previous default is preserved, but strongly advise manual configuration of streaming_pull_goroutines, max_outstanding_messages and max_outstanding_bytes")
}
// Otherwise, streamingPullGoRoutines is set in the config and that value will be used.

return &pubSubSource{
projectID: projectID,
client: client,
subscriptionID: subscriptionID,
concurrentWrites: concurrentWrites,
log: log.WithFields(log.Fields{"source": "pubsub", "cloud": "GCP", "project": projectID, "subscription": subscriptionID}),
projectID: projectID,
client: client,
subscriptionID: subscriptionID,
maxOutstandingMessages: maxOutstandingMessages,
maxOutstandingBytes: maxOutstandingBytes,
minExtensionPeriodSeconds: minExtensionPeriodSeconds,
streamingPullGoRoutines: streamingPullGoRoutines,
log: log,
}, nil
}

Expand All @@ -120,7 +168,10 @@ func (ps *pubSubSource) Read(sf *sourceiface.SourceFunctions) error {
ps.log.Info("Reading messages from subscription ...")

sub := ps.client.Subscription(ps.subscriptionID)
sub.ReceiveSettings.NumGoroutines = ps.concurrentWrites
sub.ReceiveSettings.NumGoroutines = ps.streamingPullGoRoutines // This sets the number of goroutines that can open a streaming pull at once
sub.ReceiveSettings.MaxOutstandingMessages = ps.maxOutstandingMessages // maxOutstandingMessages limits the number of messages processed at once (each spawns a goroutine)
sub.ReceiveSettings.MaxOutstandingBytes = ps.maxOutstandingBytes
sub.ReceiveSettings.MinExtensionPeriod = time.Duration(ps.minExtensionPeriodSeconds) * time.Second

cctx, cancel := context.WithCancel(ctx)

Expand Down
8 changes: 6 additions & 2 deletions pkg/source/pubsub/pubsub_source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ package pubsubsource

import (
"context"
"fmt"
"os"
"path/filepath"
"sort"
Expand Down Expand Up @@ -82,6 +83,9 @@ func TestPubSubSource_ReadAndReturnSuccessIntegration(t *testing.T) {

assert.NotNil(pubsubSource)
assert.Nil(err)
if err != nil {
fmt.Println(err.Error())
}
assert.Equal("projects/project-test/subscriptions/test-sub", pubsubSource.GetID())

output := testutil.ReadAndReturnMessages(pubsubSource, 5*time.Second, testutil.DefaultTestWriteBuilder, nil)
Expand Down Expand Up @@ -116,7 +120,7 @@ func TestNewPubSubSource_Success(t *testing.T) {

testutil.InitMockPubsubServer(8010, nil, t)

pubsubSource, err := newPubSubSource(10, "project-test", "test-sub")
pubsubSource, err := newPubSubSource(10, "project-test", "test-sub", 1000, 1e9, 0, 1, 0)
assert.Nil(err)
assert.IsType(&pubSubSource{}, pubsubSource)
// This should return an error when we can't connect, rather than proceeding to the Write() function before we hit a problem.
Expand All @@ -141,7 +145,7 @@ func TestPubSubSource_ReadAndReturnSuccessWithMock(t *testing.T) {
}
wg.Wait()

pubsubSource, err := newPubSubSource(10, "project-test", "test-sub")
pubsubSource, err := newPubSubSource(10, "project-test", "test-sub", 1000, 1e9, 0, 1, 0)

assert.NotNil(pubsubSource)
assert.Nil(err)
Expand Down

0 comments on commit 865fde5

Please sign in to comment.