From efe0132ef9f118553812d0eb2e6c0a2577c43e2f Mon Sep 17 00:00:00 2001 From: colmsnowplow Date: Wed, 10 Apr 2024 17:59:41 +0100 Subject: [PATCH] Make kinesis source scaling checks configurable (close #313) --- .../sources/kinesis-full-example.hcl | 8 +++- pkg/source/kinesis/kinesis_source.go | 47 +++++++++++++------ pkg/source/kinesis/kinesis_source_test.go | 40 +++++++++------- 3 files changed, 61 insertions(+), 34 deletions(-) diff --git a/assets/docs/configuration/sources/kinesis-full-example.hcl b/assets/docs/configuration/sources/kinesis-full-example.hcl index d9a7a966..ced07691 100644 --- a/assets/docs/configuration/sources/kinesis-full-example.hcl +++ b/assets/docs/configuration/sources/kinesis-full-example.hcl @@ -28,7 +28,13 @@ source { # Default is 250, cannot be set to lower than 200. read_throttle_delay_ms = 500 + # Optional configures how often each kinesis consumer checks for whether it needs to change which shards it owns + shard_check_freq_seconds = 15 + + # Optional configures how often the kinesis client checks the stream for shard count changes, which triggers consumer ownership changes + leader_action_freq_seconds = 305 + # Maximum concurrent goroutines (lightweight threads) for message processing (default: 50) concurrent_writes = 15 } -} \ No newline at end of file +} diff --git a/pkg/source/kinesis/kinesis_source.go b/pkg/source/kinesis/kinesis_source.go index 5fb15784..b190f30d 100644 --- a/pkg/source/kinesis/kinesis_source.go +++ b/pkg/source/kinesis/kinesis_source.go @@ -30,14 +30,16 @@ import ( // Configuration configures the source for records pulled type Configuration struct { - StreamName string `hcl:"stream_name" env:"SOURCE_KINESIS_STREAM_NAME"` - Region string `hcl:"region" env:"SOURCE_KINESIS_REGION"` - AppName string `hcl:"app_name" env:"SOURCE_KINESIS_APP_NAME"` - RoleARN string `hcl:"role_arn,optional" env:"SOURCE_KINESIS_ROLE_ARN"` - StartTimestamp string `hcl:"start_timestamp,optional" env:"SOURCE_KINESIS_START_TIMESTAMP"` // Timestamp for the kinesis shard iterator to begin processing. Format YYYY-MM-DD HH:MM:SS.MS (miliseconds optional) - ReadThrottleDelayMs int `hcl:"read_throttle_delay_ms,optional" env:"SOURCE_KINESIS_READ_THROTTLE_DELAY_MS"` - CustomAWSEndpoint string `hcl:"custom_aws_endpoint,optional" env:"SOURCE_CUSTOM_AWS_ENDPOINT"` - ConcurrentWrites int `hcl:"concurrent_writes,optional" env:"SOURCE_CONCURRENT_WRITES"` + StreamName string `hcl:"stream_name" env:"SOURCE_KINESIS_STREAM_NAME"` + Region string `hcl:"region" env:"SOURCE_KINESIS_REGION"` + AppName string `hcl:"app_name" env:"SOURCE_KINESIS_APP_NAME"` + RoleARN string `hcl:"role_arn,optional" env:"SOURCE_KINESIS_ROLE_ARN"` + StartTimestamp string `hcl:"start_timestamp,optional" env:"SOURCE_KINESIS_START_TIMESTAMP"` // Timestamp for the kinesis shard iterator to begin processing. Format YYYY-MM-DD HH:MM:SS.MS (miliseconds optional) + ReadThrottleDelayMs int `hcl:"read_throttle_delay_ms,optional" env:"SOURCE_KINESIS_READ_THROTTLE_DELAY_MS"` + CustomAWSEndpoint string `hcl:"custom_aws_endpoint,optional" env:"SOURCE_CUSTOM_AWS_ENDPOINT"` + ShardCheckFreqSeconds int `hcl:"shard_check_freq_seconds,optional" env:"SHARD_CHECK_FREQ_SECONDS"` + LeaderActionFreqSeconds int `hcl:"leader_action_freq_seconds,optional" env:"LEADER_ACTION_FREQ_SECONDS"` + ConcurrentWrites int `hcl:"concurrent_writes,optional" env:"SOURCE_CONCURRENT_WRITES"` } // --- Kinesis source @@ -79,7 +81,9 @@ func configFunctionGeneratorWithInterfaces(kinesisClient kinesisiface.KinesisAPI c.StreamName, c.AppName, &iteratorTstamp, - c.ReadThrottleDelayMs) + c.ReadThrottleDelayMs, + c.ShardCheckFreqSeconds, + c.LeaderActionFreqSeconds) } } @@ -113,8 +117,10 @@ func (f adapter) Create(i interface{}) (interface{}, error) { func (f adapter) ProvideDefault() (interface{}, error) { // Provide defaults cfg := &Configuration{ - ReadThrottleDelayMs: 250, // Kinsumer default is 250ms - ConcurrentWrites: 50, + ReadThrottleDelayMs: 250, // Kinsumer default is 250ms + ConcurrentWrites: 50, + ShardCheckFreqSeconds: 10, + LeaderActionFreqSeconds: 300, } return cfg, nil @@ -150,11 +156,22 @@ func (kl *KinsumerLogrus) Log(format string, v ...interface{}) { // newKinesisSourceWithInterfaces allows you to provide a Kinesis + DynamoDB client directly to allow // for mocking and localstack usage -func newKinesisSourceWithInterfaces(kinesisClient kinesisiface.KinesisAPI, dynamodbClient dynamodbiface.DynamoDBAPI, awsAccountID string, concurrentWrites int, region string, streamName string, appName string, startTimestamp *time.Time, readThrottleDelay int) (*kinesisSource, error) { - // TODO: Add statistics monitoring to be able to report on consumer latency +func newKinesisSourceWithInterfaces( + kinesisClient kinesisiface.KinesisAPI, + dynamodbClient dynamodbiface.DynamoDBAPI, + awsAccountID string, + concurrentWrites int, + region string, + streamName string, + appName string, + startTimestamp *time.Time, + readThrottleDelay int, + shardCheckFreq int, + leaderActionFreq int) (*kinesisSource, error) { + config := kinsumer.NewConfig(). - WithShardCheckFrequency(10 * time.Second). - WithLeaderActionFrequency(10 * time.Second). + WithShardCheckFrequency(time.Duration(shardCheckFreq) * time.Second). + WithLeaderActionFrequency(time.Duration(leaderActionFreq) * time.Second). WithManualCheckpointing(true). WithLogger(&KinsumerLogrus{}). WithIteratorStartTimestamp(startTimestamp). diff --git a/pkg/source/kinesis/kinesis_source_test.go b/pkg/source/kinesis/kinesis_source_test.go index dcd625fd..dc38dac9 100644 --- a/pkg/source/kinesis/kinesis_source_test.go +++ b/pkg/source/kinesis/kinesis_source_test.go @@ -57,7 +57,7 @@ func TestNewKinesisSourceWithInterfaces_Success(t *testing.T) { defer testutil.DeleteAWSLocalstackDynamoDBTables(dynamodbClient, appName) - source, err := newKinesisSourceWithInterfaces(kinesisClient, dynamodbClient, "00000000000", 15, testutil.AWSLocalstackRegion, streamName, appName, nil, 250) + source, err := newKinesisSourceWithInterfaces(kinesisClient, dynamodbClient, "00000000000", 15, testutil.AWSLocalstackRegion, streamName, appName, nil, 250, 10, 10) assert.IsType(&kinesisSource{}, source) assert.Nil(err) @@ -92,7 +92,7 @@ func TestKinesisSource_ReadFailure_NoResources(t *testing.T) { kinesisClient := testutil.GetAWSLocalstackKinesisClient() dynamodbClient := testutil.GetAWSLocalstackDynamoDBClient() - source, err := newKinesisSourceWithInterfaces(kinesisClient, dynamodbClient, "00000000000", 1, testutil.AWSLocalstackRegion, "not-exists", "fake-name", nil, 250) + source, err := newKinesisSourceWithInterfaces(kinesisClient, dynamodbClient, "00000000000", 1, testutil.AWSLocalstackRegion, "not-exists", "fake-name", nil, 250, 10, 10) assert.Nil(err) assert.NotNil(source) assert.Equal("arn:aws:kinesis:us-east-1:00000000000:stream/not-exists", source.GetID()) @@ -138,7 +138,7 @@ func TestKinesisSource_ReadMessages(t *testing.T) { time.Sleep(1 * time.Second) // Create the source and assert that it's there - source, err := newKinesisSourceWithInterfaces(kinesisClient, dynamodbClient, "00000000000", 15, testutil.AWSLocalstackRegion, streamName, appName, nil, 250) + source, err := newKinesisSourceWithInterfaces(kinesisClient, dynamodbClient, "00000000000", 15, testutil.AWSLocalstackRegion, streamName, appName, nil, 250, 10, 10) assert.Nil(err) assert.NotNil(source) assert.Equal("arn:aws:kinesis:us-east-1:00000000000:stream/kinesis-source-integration-2", source.GetID()) @@ -191,7 +191,7 @@ func TestKinesisSource_StartTimestamp(t *testing.T) { } // Create the source (with start timestamp) and assert that it's there - source, err := newKinesisSourceWithInterfaces(kinesisClient, dynamodbClient, "00000000000", 15, testutil.AWSLocalstackRegion, streamName, appName, &timeToStart, 250) + source, err := newKinesisSourceWithInterfaces(kinesisClient, dynamodbClient, "00000000000", 15, testutil.AWSLocalstackRegion, streamName, appName, &timeToStart, 250, 10, 10) assert.Nil(err) assert.NotNil(source) assert.Equal("arn:aws:kinesis:us-east-1:00000000000:stream/kinesis-source-integration-3", source.GetID()) @@ -270,26 +270,30 @@ func TestKinesisSourceHCL(t *testing.T) { File: "source-kinesis-simple.hcl", Plug: testKinesisSourceAdapter(testKinesisSourceFunc), Expected: &Configuration{ - StreamName: "testStream", - Region: "us-test-1", - AppName: "testApp", - RoleARN: "", - StartTimestamp: "", - ConcurrentWrites: 50, - ReadThrottleDelayMs: 250, + StreamName: "testStream", + Region: "us-test-1", + AppName: "testApp", + RoleARN: "", + StartTimestamp: "", + ConcurrentWrites: 50, + ReadThrottleDelayMs: 250, + ShardCheckFreqSeconds: 10, + LeaderActionFreqSeconds: 300, }, }, { File: "source-kinesis-extended.hcl", Plug: testKinesisSourceAdapter(testKinesisSourceFunc), Expected: &Configuration{ - StreamName: "testStream", - Region: "us-test-1", - AppName: "testApp", - RoleARN: "xxx-test-role-arn", - StartTimestamp: "2022-03-15 07:52:53", - ConcurrentWrites: 51, - ReadThrottleDelayMs: 250, + StreamName: "testStream", + Region: "us-test-1", + AppName: "testApp", + RoleARN: "xxx-test-role-arn", + StartTimestamp: "2022-03-15 07:52:53", + ConcurrentWrites: 51, + ReadThrottleDelayMs: 250, + ShardCheckFreqSeconds: 10, + LeaderActionFreqSeconds: 300, }, }, }