diff --git a/assets/docs/configuration/sources/kinesis-full-example.hcl b/assets/docs/configuration/sources/kinesis-full-example.hcl index ced07691..9b0da0cd 100644 --- a/assets/docs/configuration/sources/kinesis-full-example.hcl +++ b/assets/docs/configuration/sources/kinesis-full-example.hcl @@ -36,5 +36,8 @@ source { # Maximum concurrent goroutines (lightweight threads) for message processing (default: 50) concurrent_writes = 15 + + # The name of the Kinesis client that is used to allocate shards. It must be unique per instance of Snowbridge. + client_name = env.HOSTNAME } } diff --git a/assets/test/source/configs/source-kinesis-extended.hcl b/assets/test/source/configs/source-kinesis-extended.hcl index cbb85ada..36e7a496 100644 --- a/assets/test/source/configs/source-kinesis-extended.hcl +++ b/assets/test/source/configs/source-kinesis-extended.hcl @@ -8,5 +8,6 @@ source { app_name = "testApp" start_timestamp = "2022-03-15 07:52:53" concurrent_writes = 51 + client_name = "test_client_name" } } diff --git a/docs/configuration_source_docs_test.go b/docs/configuration_source_docs_test.go index 65e5c432..9aa54562 100644 --- a/docs/configuration_source_docs_test.go +++ b/docs/configuration_source_docs_test.go @@ -31,6 +31,7 @@ func TestSourceDocumentation(t *testing.T) { // Set env vars referenced in the config examples t.Setenv("MY_AUTH_PASSWORD", "test") t.Setenv("SASL_PASSWORD", "test") + t.Setenv("HOSTNAME", "hostname") sourcesToTest := []string{"kafka", "kinesis", "pubsub", "sqs", "stdin"} diff --git a/docs/configuration_transformations_docs_test.go b/docs/configuration_transformations_docs_test.go index b98f9a71..c3ba57dc 100644 --- a/docs/configuration_transformations_docs_test.go +++ b/docs/configuration_transformations_docs_test.go @@ -44,7 +44,7 @@ func TestBuiltinTransformationDocumentation(t *testing.T) { } func TestBuiltinSnowplowTransformationDocumentation(t *testing.T) { - transformationsToTest := []string{"spEnrichedFilter", "spEnrichedFilterContext", "spEnrichedFilterUnstructEvent", "spEnrichedSetPk", "spEnrichedToJson", "spGtmssPreview"} + transformationsToTest := []string{"spEnrichedFilter", "spEnrichedFilterContext", "spEnrichedFilterUnstructEvent", "spEnrichedSetPk", "spEnrichedToJson", "spGtmssPreview"} for _, tfm := range transformationsToTest { diff --git a/pkg/source/kinesis/kinesis_source.go b/pkg/source/kinesis/kinesis_source.go index e0d48089..4220028f 100644 --- a/pkg/source/kinesis/kinesis_source.go +++ b/pkg/source/kinesis/kinesis_source.go @@ -44,6 +44,7 @@ type Configuration struct { ShardCheckFreqSeconds int `hcl:"shard_check_freq_seconds,optional"` LeaderActionFreqSeconds int `hcl:"leader_action_freq_seconds,optional"` ConcurrentWrites int `hcl:"concurrent_writes,optional"` + ClientName string `hcl:"client_name,optional"` } // --- Kinesis source @@ -87,7 +88,8 @@ func configFunctionGeneratorWithInterfaces(kinesisClient kinesisiface.KinesisAPI &iteratorTstamp, c.ReadThrottleDelayMs, c.ShardCheckFreqSeconds, - c.LeaderActionFreqSeconds) + c.LeaderActionFreqSeconds, + c.ClientName) } } @@ -119,12 +121,16 @@ func (f adapter) Create(i interface{}) (interface{}, error) { // ProvideDefault implements the ComponentConfigurable interface. func (f adapter) ProvideDefault() (interface{}, error) { + // Ensures as even as possible distribution of UUIDs + uuid.EnableRandPool() + // Provide defaults cfg := &Configuration{ ReadThrottleDelayMs: 250, // Kinsumer default is 250ms ConcurrentWrites: 50, ShardCheckFreqSeconds: 10, LeaderActionFreqSeconds: 60, + ClientName: uuid.New().String(), } return cfg, nil @@ -171,7 +177,8 @@ func newKinesisSourceWithInterfaces( startTimestamp *time.Time, readThrottleDelay int, shardCheckFreq int, - leaderActionFreq int) (*kinesisSource, error) { + leaderActionFreq int, + clientName string) (*kinesisSource, error) { config := kinsumer.NewConfig(). WithShardCheckFrequency(time.Duration(shardCheckFreq) * time.Second). @@ -181,13 +188,7 @@ func newKinesisSourceWithInterfaces( WithIteratorStartTimestamp(startTimestamp). WithThrottleDelay(time.Duration(readThrottleDelay) * time.Millisecond) - // Ensures as even as possible distribution of UUIDs - uuid.EnableRandPool() - - // TODO: See if the client name can be reused to survive same node reboots - name := uuid.New().String() - - k, err := kinsumer.NewWithInterfaces(kinesisClient, dynamodbClient, streamName, appName, name, config) + k, err := kinsumer.NewWithInterfaces(kinesisClient, dynamodbClient, streamName, appName, clientName, config) if err != nil { return nil, errors.Wrap(err, "Failed to create Kinsumer client") } diff --git a/pkg/source/kinesis/kinesis_source_test.go b/pkg/source/kinesis/kinesis_source_test.go index 3035990e..e0fd71e6 100644 --- a/pkg/source/kinesis/kinesis_source_test.go +++ b/pkg/source/kinesis/kinesis_source_test.go @@ -16,11 +16,10 @@ import ( "fmt" "os" "path/filepath" - "reflect" "testing" "time" - "github.com/davecgh/go-spew/spew" + "github.com/google/uuid" "github.com/hashicorp/hcl/v2/hclparse" "github.com/stretchr/testify/assert" @@ -63,7 +62,7 @@ func TestNewKinesisSourceWithInterfaces_Success(t *testing.T) { defer testutil.DeleteAWSLocalstackDynamoDBTables(dynamodbClient, appName) - source, err := newKinesisSourceWithInterfaces(kinesisClient, dynamodbClient, "00000000000", 15, testutil.AWSLocalstackRegion, streamName, appName, nil, 250, 10, 10) + source, err := newKinesisSourceWithInterfaces(kinesisClient, dynamodbClient, "00000000000", 15, testutil.AWSLocalstackRegion, streamName, appName, nil, 250, 10, 10, "test_client_name") assert.IsType(&kinesisSource{}, source) assert.Nil(err) @@ -98,7 +97,7 @@ func TestKinesisSource_ReadFailure_NoResources(t *testing.T) { kinesisClient := testutil.GetAWSLocalstackKinesisClient() dynamodbClient := testutil.GetAWSLocalstackDynamoDBClient() - source, err := newKinesisSourceWithInterfaces(kinesisClient, dynamodbClient, "00000000000", 1, testutil.AWSLocalstackRegion, "not-exists", "fake-name", nil, 250, 10, 10) + source, err := newKinesisSourceWithInterfaces(kinesisClient, dynamodbClient, "00000000000", 1, testutil.AWSLocalstackRegion, "not-exists", "fake-name", nil, 250, 10, 10, "test_client_name") assert.Nil(err) assert.NotNil(source) assert.Equal("arn:aws:kinesis:us-east-1:00000000000:stream/not-exists", source.GetID()) @@ -144,7 +143,7 @@ func TestKinesisSource_ReadMessages(t *testing.T) { time.Sleep(1 * time.Second) // Create the source and assert that it's there - source, err := newKinesisSourceWithInterfaces(kinesisClient, dynamodbClient, "00000000000", 15, testutil.AWSLocalstackRegion, streamName, appName, nil, 250, 10, 10) + source, err := newKinesisSourceWithInterfaces(kinesisClient, dynamodbClient, "00000000000", 15, testutil.AWSLocalstackRegion, streamName, appName, nil, 250, 10, 10, "test_client_name") assert.Nil(err) assert.NotNil(source) assert.Equal("arn:aws:kinesis:us-east-1:00000000000:stream/kinesis-source-integration-2", source.GetID()) @@ -197,7 +196,7 @@ func TestKinesisSource_StartTimestamp(t *testing.T) { } // Create the source (with start timestamp) and assert that it's there - source, err := newKinesisSourceWithInterfaces(kinesisClient, dynamodbClient, "00000000000", 15, testutil.AWSLocalstackRegion, streamName, appName, &timeToStart, 250, 10, 10) + source, err := newKinesisSourceWithInterfaces(kinesisClient, dynamodbClient, "00000000000", 15, testutil.AWSLocalstackRegion, streamName, appName, &timeToStart, 250, 10, 10, "test_client_name") assert.Nil(err) assert.NotNil(source) assert.Equal("arn:aws:kinesis:us-east-1:00000000000:stream/kinesis-source-integration-3", source.GetID()) @@ -281,9 +280,10 @@ func TestGetSource_WithKinesisSource(t *testing.T) { func TestKinesisSourceHCL(t *testing.T) { testFixPath := filepath.Join(assets.AssetsRootDir, "test", "source", "configs") testCases := []struct { - File string - Plug config.Pluggable - Expected interface{} + File string + Plug config.Pluggable + Expected *Configuration + ClientNameUUID bool }{ { File: "source-kinesis-simple.hcl", @@ -299,6 +299,7 @@ func TestKinesisSourceHCL(t *testing.T) { ShardCheckFreqSeconds: 10, LeaderActionFreqSeconds: 60, }, + ClientNameUUID: true, }, { File: "source-kinesis-extended.hcl", @@ -313,7 +314,9 @@ func TestKinesisSourceHCL(t *testing.T) { ReadThrottleDelayMs: 250, ShardCheckFreqSeconds: 10, LeaderActionFreqSeconds: 60, + ClientName: "test_client_name", }, + ClientNameUUID: false, }, } @@ -339,11 +342,28 @@ func TestKinesisSourceHCL(t *testing.T) { assert.NotNil(result) assert.Nil(err) - if !reflect.DeepEqual(result, tt.Expected) { - t.Errorf("GOT:\n%s\nEXPECTED:\n%s", - spew.Sdump(result), - spew.Sdump(tt.Expected)) + resultConf, ok := result.(*Configuration) + if !ok { + t.Fatal("result is not of type pointer to Configuration") } + + assert.Equal(resultConf.StreamName, tt.Expected.StreamName) + assert.Equal(resultConf.Region, tt.Expected.Region) + assert.Equal(resultConf.AppName, tt.Expected.AppName) + assert.Equal(resultConf.RoleARN, tt.Expected.RoleARN) + assert.Equal(resultConf.StartTimestamp, tt.Expected.StartTimestamp) + assert.Equal(resultConf.ConcurrentWrites, tt.Expected.ConcurrentWrites) + assert.Equal(resultConf.ReadThrottleDelayMs, tt.Expected.ReadThrottleDelayMs) + assert.Equal(resultConf.ShardCheckFreqSeconds, tt.Expected.ShardCheckFreqSeconds) + assert.Equal(resultConf.LeaderActionFreqSeconds, tt.Expected.LeaderActionFreqSeconds) + + if !tt.ClientNameUUID { + assert.Equal(resultConf.ClientName, tt.Expected.ClientName) + } else { + _, err := uuid.Parse(resultConf.ClientName) + assert.Nil(err) + } + }) } }