diff --git a/assets/docs/configuration/sources/pubsub-full-example.hcl b/assets/docs/configuration/sources/pubsub-full-example.hcl index d74e828d..1bec85d6 100644 --- a/assets/docs/configuration/sources/pubsub-full-example.hcl +++ b/assets/docs/configuration/sources/pubsub-full-example.hcl @@ -8,7 +8,24 @@ source { # subscription ID for the pubsub subscription subscription_id = "subscription-id" - # Maximum concurrent goroutines (lightweight threads) for message processing (default: 50) - concurrent_writes = 20 + # This option is deprecated for the pubsub source, and will be changed or removed in the next major release. + # Use streaming_pull_goroutines, max_outstanding_messages, and max_outstanding_bytes to configure concurrency instead. + # Where streaming_pull_goroutines is set, this option is ignored. + concurrent_writes = 1 + + # Maximum number of unprocessed messages (default 1000) + max_outstanding_messages = 2000 + + # Maximum size of unprocessed messages (default 1e9) + max_outstanding_bytes = 2e9 + + # Minimum ack extension period when a message is received + min_extension_period_seconds = 10 + + # Number of streaming pull connections to open at once + streaming_pull_goroutines = 1 + + # Configures the GRPC connection pool size of the pubsub client + grpc_connection_pool_size = 4 } -} \ No newline at end of file +} diff --git a/go.mod b/go.mod index 8a0c2cf9..fdd5c8a6 100644 --- a/go.mod +++ b/go.mod @@ -36,7 +36,7 @@ require ( golang.org/x/oauth2 v0.23.0 golang.org/x/sys v0.25.0 // indirect golang.org/x/text v0.18.0 // indirect - google.golang.org/api v0.197.0 // indirect + google.golang.org/api v0.197.0 google.golang.org/genproto v0.0.0-20240903143218-8af14fe29dc1 google.golang.org/grpc v1.66.2 ) diff --git a/pkg/source/pubsub/pubsub_source.go b/pkg/source/pubsub/pubsub_source.go index 9ec8ef99..cbacaf61 100644 --- a/pkg/source/pubsub/pubsub_source.go +++ b/pkg/source/pubsub/pubsub_source.go @@ -20,6 +20,7 @@ import ( "github.com/google/uuid" "github.com/pkg/errors" log "github.com/sirupsen/logrus" + "google.golang.org/api/option" "github.com/snowplow/snowbridge/config" "github.com/snowplow/snowbridge/pkg/models" @@ -28,17 +29,25 @@ import ( // Configuration configures the source for records pulled type Configuration struct { - ProjectID string `hcl:"project_id"` - SubscriptionID string `hcl:"subscription_id"` - ConcurrentWrites int `hcl:"concurrent_writes,optional"` + ProjectID string `hcl:"project_id"` + SubscriptionID string `hcl:"subscription_id"` + ConcurrentWrites int `hcl:"concurrent_writes,optional"` + MaxOutstandingMessages int `hcl:"max_outstanding_messages,optional"` + MaxOutstandingBytes int `hcl:"max_outstanding_bytes,optional"` + MinExtensionPeriodSeconds int `hcl:"min_extension_period_seconds,optional"` + StreamingPullGoRoutines int `hcl:"streaming_pull_goroutines,optional"` + GRPCConnectionPool int `hcl:"grpc_connection_pool_size,optional"` } // pubSubSource holds a new client for reading messages from PubSub type pubSubSource struct { - projectID string - client *pubsub.Client - subscriptionID string - concurrentWrites int + projectID string + client *pubsub.Client + subscriptionID string + maxOutstandingMessages int + maxOutstandingBytes int + minExtensionPeriodSeconds int + streamingPullGoRoutines int log *log.Entry @@ -52,6 +61,11 @@ func configFunction(c *Configuration) (sourceiface.Source, error) { c.ConcurrentWrites, c.ProjectID, c.SubscriptionID, + c.MaxOutstandingMessages, + c.MaxOutstandingBytes, + c.MinExtensionPeriodSeconds, + c.StreamingPullGoRoutines, + c.GRPCConnectionPool, ) } @@ -68,7 +82,12 @@ func (f adapter) Create(i interface{}) (interface{}, error) { func (f adapter) ProvideDefault() (interface{}, error) { // Provide defaults cfg := &Configuration{ - ConcurrentWrites: 50, + // ConcurrentWrites: 50, + // Default is now handled in newPubsubSource, until we make a breaking release. + MaxOutstandingMessages: 1000, + MaxOutstandingBytes: 1e9, + // StreamingPullGoRoutines: 1, + // Similarly handled in newPubsubSource - when we make a breaking release this should be the default. } return cfg, nil @@ -93,23 +112,52 @@ var ConfigPair = config.ConfigurationPair{ } // newPubSubSource creates a new client for reading messages from PubSub -func newPubSubSource(concurrentWrites int, projectID string, subscriptionID string) (*pubSubSource, error) { +func newPubSubSource(concurrentWrites int, projectID string, subscriptionID string, maxOutstandingMessages, maxOutstandingBytes int, minExtensionPeriodSeconds int, streamingPullGoRoutines int, grpcConnectionPool int) (*pubSubSource, error) { ctx := context.Background() // Ensures as even as possible distribution of UUIDs uuid.EnableRandPool() - client, err := pubsub.NewClient(ctx, projectID) + log := log.WithFields(log.Fields{"source": "pubsub", "cloud": "GCP", "project": projectID, "subscription": subscriptionID}) + + // We use a slice to provide the grpcConnectionPool option only if it is set. + // Otherwise we'll overwrite the client's clever under-the-hood default behaviour: + // https://github.com/googleapis/google-cloud-go/blob/380e7d23e69b22ab46cc6e3be58902accee2f26a/pubsub/pubsub.go#L165-L177 + var opt []option.ClientOption + if grpcConnectionPool != 0 { + opt = append(opt, option.WithGRPCConnectionPool(grpcConnectionPool)) + } + + client, err := pubsub.NewClient(ctx, projectID, opt...) if err != nil { return nil, errors.Wrap(err, "Failed to create PubSub client") } + // This temproary logic allows us to fix suboptimal behaviour without a breaking release. + // The order of priority is streaming_pull_goroutines > concurrent_writes > previous default + // We don't change the default becuase this would cause a major behaviour change in a non-major version bump + + // If streamingPullGoRoutines is not set but concurrentWrites is, use concurrentWrites. + if streamingPullGoRoutines == 0 && concurrentWrites != 0 { + streamingPullGoRoutines = concurrentWrites + log.Warn("For the pubsub source, concurrent_writes is deprecated, and will be removed in the next major version. Use streaming_pull_goroutines instead") + } + // If neither are set, set it to the new defult, but warn users of this behaviour change + if streamingPullGoRoutines == 0 && concurrentWrites == 0 { + streamingPullGoRoutines = 50 + log.Warn("Neither streaming_pull_goroutines nor concurrent_writes are set. The previous default is preserved, but strongly advise manual configuration of streaming_pull_goroutines, max_outstanding_messages and max_outstanding_bytes") + } + // Otherwise, streamingPullGoRoutines is set in the config and that value will be used. + return &pubSubSource{ - projectID: projectID, - client: client, - subscriptionID: subscriptionID, - concurrentWrites: concurrentWrites, - log: log.WithFields(log.Fields{"source": "pubsub", "cloud": "GCP", "project": projectID, "subscription": subscriptionID}), + projectID: projectID, + client: client, + subscriptionID: subscriptionID, + maxOutstandingMessages: maxOutstandingMessages, + maxOutstandingBytes: maxOutstandingBytes, + minExtensionPeriodSeconds: minExtensionPeriodSeconds, + streamingPullGoRoutines: streamingPullGoRoutines, + log: log, }, nil } @@ -120,7 +168,10 @@ func (ps *pubSubSource) Read(sf *sourceiface.SourceFunctions) error { ps.log.Info("Reading messages from subscription ...") sub := ps.client.Subscription(ps.subscriptionID) - sub.ReceiveSettings.NumGoroutines = ps.concurrentWrites + sub.ReceiveSettings.NumGoroutines = ps.streamingPullGoRoutines // This sets the number of goroutines that can open a streaming pull at once + sub.ReceiveSettings.MaxOutstandingMessages = ps.maxOutstandingMessages // maxOutstandingMessages limits the number of messages processed at once (each spawns a goroutine) + sub.ReceiveSettings.MaxOutstandingBytes = ps.maxOutstandingBytes + sub.ReceiveSettings.MinExtensionPeriod = time.Duration(ps.minExtensionPeriodSeconds) * time.Second cctx, cancel := context.WithCancel(ctx) diff --git a/pkg/source/pubsub/pubsub_source_test.go b/pkg/source/pubsub/pubsub_source_test.go index 0e218acd..96058bff 100644 --- a/pkg/source/pubsub/pubsub_source_test.go +++ b/pkg/source/pubsub/pubsub_source_test.go @@ -13,6 +13,7 @@ package pubsubsource import ( "context" + "fmt" "os" "path/filepath" "sort" @@ -82,6 +83,9 @@ func TestPubSubSource_ReadAndReturnSuccessIntegration(t *testing.T) { assert.NotNil(pubsubSource) assert.Nil(err) + if err != nil { + fmt.Println(err.Error()) + } assert.Equal("projects/project-test/subscriptions/test-sub", pubsubSource.GetID()) output := testutil.ReadAndReturnMessages(pubsubSource, 5*time.Second, testutil.DefaultTestWriteBuilder, nil) @@ -116,7 +120,7 @@ func TestNewPubSubSource_Success(t *testing.T) { testutil.InitMockPubsubServer(8010, nil, t) - pubsubSource, err := newPubSubSource(10, "project-test", "test-sub") + pubsubSource, err := newPubSubSource(10, "project-test", "test-sub", 1000, 1e9, 0, 1, 0) assert.Nil(err) assert.IsType(&pubSubSource{}, pubsubSource) // This should return an error when we can't connect, rather than proceeding to the Write() function before we hit a problem. @@ -141,7 +145,7 @@ func TestPubSubSource_ReadAndReturnSuccessWithMock(t *testing.T) { } wg.Wait() - pubsubSource, err := newPubSubSource(10, "project-test", "test-sub") + pubsubSource, err := newPubSubSource(10, "project-test", "test-sub", 1000, 1e9, 0, 1, 0) assert.NotNil(pubsubSource) assert.Nil(err)