Skip to content

Commit

Permalink
Add option to set kinsumer client name
Browse files Browse the repository at this point in the history
  • Loading branch information
adatzer committed Dec 5, 2024
1 parent b49d0d2 commit 212b3df
Show file tree
Hide file tree
Showing 6 changed files with 49 additions and 23 deletions.
3 changes: 3 additions & 0 deletions assets/docs/configuration/sources/kinesis-full-example.hcl
Original file line number Diff line number Diff line change
Expand Up @@ -36,5 +36,8 @@ source {

# Maximum concurrent goroutines (lightweight threads) for message processing (default: 50)
concurrent_writes = 15

# The name of the Kinesis client that is used to allocate shards. It must be unique per instance of Snowbridge.
client_name = env.HOSTNAME
}
}
1 change: 1 addition & 0 deletions assets/test/source/configs/source-kinesis-extended.hcl
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,6 @@ source {
app_name = "testApp"
start_timestamp = "2022-03-15 07:52:53"
concurrent_writes = 51
client_name = "test_client_name"
}
}
1 change: 1 addition & 0 deletions docs/configuration_source_docs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ func TestSourceDocumentation(t *testing.T) {
// Set env vars referenced in the config examples
t.Setenv("MY_AUTH_PASSWORD", "test")
t.Setenv("SASL_PASSWORD", "test")
t.Setenv("HOSTNAME", "hostname")

sourcesToTest := []string{"kafka", "kinesis", "pubsub", "sqs", "stdin"}

Expand Down
2 changes: 1 addition & 1 deletion docs/configuration_transformations_docs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func TestBuiltinTransformationDocumentation(t *testing.T) {
}

func TestBuiltinSnowplowTransformationDocumentation(t *testing.T) {
transformationsToTest := []string{"spEnrichedFilter", "spEnrichedFilterContext", "spEnrichedFilterUnstructEvent", "spEnrichedSetPk", "spEnrichedToJson", "spGtmssPreview"}
transformationsToTest := []string{"spEnrichedFilter", "spEnrichedFilterContext", "spEnrichedFilterUnstructEvent", "spEnrichedSetPk", "spEnrichedToJson", "spGtmssPreview"}

for _, tfm := range transformationsToTest {

Expand Down
19 changes: 10 additions & 9 deletions pkg/source/kinesis/kinesis_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ type Configuration struct {
ShardCheckFreqSeconds int `hcl:"shard_check_freq_seconds,optional"`
LeaderActionFreqSeconds int `hcl:"leader_action_freq_seconds,optional"`
ConcurrentWrites int `hcl:"concurrent_writes,optional"`
ClientName string `hcl:"client_name,optional"`
}

// --- Kinesis source
Expand Down Expand Up @@ -87,7 +88,8 @@ func configFunctionGeneratorWithInterfaces(kinesisClient kinesisiface.KinesisAPI
&iteratorTstamp,
c.ReadThrottleDelayMs,
c.ShardCheckFreqSeconds,
c.LeaderActionFreqSeconds)
c.LeaderActionFreqSeconds,
c.ClientName)
}
}

Expand Down Expand Up @@ -119,12 +121,16 @@ func (f adapter) Create(i interface{}) (interface{}, error) {

// ProvideDefault implements the ComponentConfigurable interface.
func (f adapter) ProvideDefault() (interface{}, error) {
// Ensures as even as possible distribution of UUIDs
uuid.EnableRandPool()

// Provide defaults
cfg := &Configuration{
ReadThrottleDelayMs: 250, // Kinsumer default is 250ms
ConcurrentWrites: 50,
ShardCheckFreqSeconds: 10,
LeaderActionFreqSeconds: 60,
ClientName: uuid.New().String(),
}

return cfg, nil
Expand Down Expand Up @@ -171,7 +177,8 @@ func newKinesisSourceWithInterfaces(
startTimestamp *time.Time,
readThrottleDelay int,
shardCheckFreq int,
leaderActionFreq int) (*kinesisSource, error) {
leaderActionFreq int,
clientName string) (*kinesisSource, error) {

config := kinsumer.NewConfig().
WithShardCheckFrequency(time.Duration(shardCheckFreq) * time.Second).
Expand All @@ -181,13 +188,7 @@ func newKinesisSourceWithInterfaces(
WithIteratorStartTimestamp(startTimestamp).
WithThrottleDelay(time.Duration(readThrottleDelay) * time.Millisecond)

// Ensures as even as possible distribution of UUIDs
uuid.EnableRandPool()

// TODO: See if the client name can be reused to survive same node reboots
name := uuid.New().String()

k, err := kinsumer.NewWithInterfaces(kinesisClient, dynamodbClient, streamName, appName, name, config)
k, err := kinsumer.NewWithInterfaces(kinesisClient, dynamodbClient, streamName, appName, clientName, config)
if err != nil {
return nil, errors.Wrap(err, "Failed to create Kinsumer client")
}
Expand Down
46 changes: 33 additions & 13 deletions pkg/source/kinesis/kinesis_source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,10 @@ import (
"fmt"
"os"
"path/filepath"
"reflect"
"testing"
"time"

"github.com/davecgh/go-spew/spew"
"github.com/google/uuid"
"github.com/hashicorp/hcl/v2/hclparse"
"github.com/stretchr/testify/assert"

Expand Down Expand Up @@ -63,7 +62,7 @@ func TestNewKinesisSourceWithInterfaces_Success(t *testing.T) {

defer testutil.DeleteAWSLocalstackDynamoDBTables(dynamodbClient, appName)

source, err := newKinesisSourceWithInterfaces(kinesisClient, dynamodbClient, "00000000000", 15, testutil.AWSLocalstackRegion, streamName, appName, nil, 250, 10, 10)
source, err := newKinesisSourceWithInterfaces(kinesisClient, dynamodbClient, "00000000000", 15, testutil.AWSLocalstackRegion, streamName, appName, nil, 250, 10, 10, "test_client_name")

assert.IsType(&kinesisSource{}, source)
assert.Nil(err)
Expand Down Expand Up @@ -98,7 +97,7 @@ func TestKinesisSource_ReadFailure_NoResources(t *testing.T) {
kinesisClient := testutil.GetAWSLocalstackKinesisClient()
dynamodbClient := testutil.GetAWSLocalstackDynamoDBClient()

source, err := newKinesisSourceWithInterfaces(kinesisClient, dynamodbClient, "00000000000", 1, testutil.AWSLocalstackRegion, "not-exists", "fake-name", nil, 250, 10, 10)
source, err := newKinesisSourceWithInterfaces(kinesisClient, dynamodbClient, "00000000000", 1, testutil.AWSLocalstackRegion, "not-exists", "fake-name", nil, 250, 10, 10, "test_client_name")
assert.Nil(err)
assert.NotNil(source)
assert.Equal("arn:aws:kinesis:us-east-1:00000000000:stream/not-exists", source.GetID())
Expand Down Expand Up @@ -144,7 +143,7 @@ func TestKinesisSource_ReadMessages(t *testing.T) {
time.Sleep(1 * time.Second)

// Create the source and assert that it's there
source, err := newKinesisSourceWithInterfaces(kinesisClient, dynamodbClient, "00000000000", 15, testutil.AWSLocalstackRegion, streamName, appName, nil, 250, 10, 10)
source, err := newKinesisSourceWithInterfaces(kinesisClient, dynamodbClient, "00000000000", 15, testutil.AWSLocalstackRegion, streamName, appName, nil, 250, 10, 10, "test_client_name")
assert.Nil(err)
assert.NotNil(source)
assert.Equal("arn:aws:kinesis:us-east-1:00000000000:stream/kinesis-source-integration-2", source.GetID())
Expand Down Expand Up @@ -197,7 +196,7 @@ func TestKinesisSource_StartTimestamp(t *testing.T) {
}

// Create the source (with start timestamp) and assert that it's there
source, err := newKinesisSourceWithInterfaces(kinesisClient, dynamodbClient, "00000000000", 15, testutil.AWSLocalstackRegion, streamName, appName, &timeToStart, 250, 10, 10)
source, err := newKinesisSourceWithInterfaces(kinesisClient, dynamodbClient, "00000000000", 15, testutil.AWSLocalstackRegion, streamName, appName, &timeToStart, 250, 10, 10, "test_client_name")
assert.Nil(err)
assert.NotNil(source)
assert.Equal("arn:aws:kinesis:us-east-1:00000000000:stream/kinesis-source-integration-3", source.GetID())
Expand Down Expand Up @@ -281,9 +280,10 @@ func TestGetSource_WithKinesisSource(t *testing.T) {
func TestKinesisSourceHCL(t *testing.T) {
testFixPath := filepath.Join(assets.AssetsRootDir, "test", "source", "configs")
testCases := []struct {
File string
Plug config.Pluggable
Expected interface{}
File string
Plug config.Pluggable
Expected *Configuration
ClientNameUUID bool
}{
{
File: "source-kinesis-simple.hcl",
Expand All @@ -299,6 +299,7 @@ func TestKinesisSourceHCL(t *testing.T) {
ShardCheckFreqSeconds: 10,
LeaderActionFreqSeconds: 60,
},
ClientNameUUID: true,
},
{
File: "source-kinesis-extended.hcl",
Expand All @@ -313,7 +314,9 @@ func TestKinesisSourceHCL(t *testing.T) {
ReadThrottleDelayMs: 250,
ShardCheckFreqSeconds: 10,
LeaderActionFreqSeconds: 60,
ClientName: "test_client_name",
},
ClientNameUUID: false,
},
}

Expand All @@ -339,11 +342,28 @@ func TestKinesisSourceHCL(t *testing.T) {
assert.NotNil(result)
assert.Nil(err)

if !reflect.DeepEqual(result, tt.Expected) {
t.Errorf("GOT:\n%s\nEXPECTED:\n%s",
spew.Sdump(result),
spew.Sdump(tt.Expected))
resultConf, ok := result.(*Configuration)
if !ok {
t.Fatal("result is not of type pointer to Configuration")
}

assert.Equal(resultConf.StreamName, tt.Expected.StreamName)
assert.Equal(resultConf.Region, tt.Expected.Region)
assert.Equal(resultConf.AppName, tt.Expected.AppName)
assert.Equal(resultConf.RoleARN, tt.Expected.RoleARN)
assert.Equal(resultConf.StartTimestamp, tt.Expected.StartTimestamp)
assert.Equal(resultConf.ConcurrentWrites, tt.Expected.ConcurrentWrites)
assert.Equal(resultConf.ReadThrottleDelayMs, tt.Expected.ReadThrottleDelayMs)
assert.Equal(resultConf.ShardCheckFreqSeconds, tt.Expected.ShardCheckFreqSeconds)
assert.Equal(resultConf.LeaderActionFreqSeconds, tt.Expected.LeaderActionFreqSeconds)

if !tt.ClientNameUUID {
assert.Equal(resultConf.ClientName, tt.Expected.ClientName)
} else {
_, err := uuid.Parse(resultConf.ClientName)
assert.Nil(err)
}

})
}
}
Expand Down

0 comments on commit 212b3df

Please sign in to comment.