Skip to content

Commit

Permalink
Add grpc connection pool option
Browse files Browse the repository at this point in the history
  • Loading branch information
colmsnowplow committed Nov 12, 2024
1 parent 403387c commit bf16476
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 4 deletions.
14 changes: 12 additions & 2 deletions pkg/source/pubsub/pubsub_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/google/uuid"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"google.golang.org/api/option"

"github.com/snowplow/snowbridge/config"
"github.com/snowplow/snowbridge/pkg/models"
Expand All @@ -35,6 +36,7 @@ type Configuration struct {
MaxOutstandingBytes int `hcl:"max_outstanding_bytes,optional"`
MinExtensionPeriodSeconds int `hcl:"min_extension_period_seconds,optional"`
StreamingPullGoRoutines int `hcl:"streaming_pull_goroutines,optional"`
GRPCConnectionPool int `hcl:"grpc_connection_pool,optional"`
}

// pubSubSource holds a new client for reading messages from PubSub
Expand Down Expand Up @@ -64,6 +66,7 @@ func configFunction(c *Configuration) (sourceiface.Source, error) {
c.MaxOutstandingBytes,
c.MinExtensionPeriodSeconds,
c.StreamingPullGoRoutines,
c.GRPCConnectionPool,
)
}

Expand Down Expand Up @@ -111,15 +114,22 @@ var ConfigPair = config.ConfigurationPair{
}

// newPubSubSource creates a new client for reading messages from PubSub
func newPubSubSource(concurrentWrites int, projectID string, subscriptionID string, maxOutstandingMessages, maxOutstandingBytes int, minExtensionPeriodSeconds int, streamingPullGoRoutines int) (*pubSubSource, error) {
func newPubSubSource(concurrentWrites int, projectID string, subscriptionID string, maxOutstandingMessages, maxOutstandingBytes int, minExtensionPeriodSeconds int, streamingPullGoRoutines int, grpcConnectionPool int) (*pubSubSource, error) {
ctx := context.Background()

// Ensures as even as possible distribution of UUIDs
uuid.EnableRandPool()

log := log.WithFields(log.Fields{"source": "pubsub", "cloud": "GCP", "project": projectID, "subscription": subscriptionID})

client, err := pubsub.NewClient(ctx, projectID)
// We use a slice to provide the grpcConnectionPool option only if it is set.
// Otherwise we'll overwrite the client's clever under-the-hood default behaviour
var opt []option.ClientOption
if grpcConnectionPool != 0 {
opt = append(opt, option.WithGRPCConnectionPool(grpcConnectionPool))
}

client, err := pubsub.NewClient(ctx, projectID, opt...)
if err != nil {
return nil, errors.Wrap(err, "Failed to create PubSub client")
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/source/pubsub/pubsub_source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func TestNewPubSubSource_Success(t *testing.T) {

testutil.InitMockPubsubServer(8010, nil, t)

pubsubSource, err := newPubSubSource(10, "project-test", "test-sub", 1000, 1e9, 0, 1)
pubsubSource, err := newPubSubSource(10, "project-test", "test-sub", 1000, 1e9, 0, 1, 0)
assert.Nil(err)
assert.IsType(&pubSubSource{}, pubsubSource)
// This should return an error when we can't connect, rather than proceeding to the Write() function before we hit a problem.
Expand All @@ -145,7 +145,7 @@ func TestPubSubSource_ReadAndReturnSuccessWithMock(t *testing.T) {
}
wg.Wait()

pubsubSource, err := newPubSubSource(10, "project-test", "test-sub", 1000, 1e9, 0, 1)
pubsubSource, err := newPubSubSource(10, "project-test", "test-sub", 1000, 1e9, 0, 1, 0)

assert.NotNil(pubsubSource)
assert.Nil(err)
Expand Down

0 comments on commit bf16476

Please sign in to comment.