Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make kinesis source scaling checks configurable (close #313) #314

Merged
merged 1 commit into from
Apr 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion assets/docs/configuration/sources/kinesis-full-example.hcl
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,13 @@ source {
# Default is 250, cannot be set to lower than 200.
read_throttle_delay_ms = 500

# Optional configures how often each kinesis consumer checks for whether it needs to change which shards it owns
shard_check_freq_seconds = 15

# Optional configures how often the kinesis client checks the stream for shard count changes, which triggers consumer ownership changes
leader_action_freq_seconds = 305

# Maximum concurrent goroutines (lightweight threads) for message processing (default: 50)
concurrent_writes = 15
}
}
}
47 changes: 32 additions & 15 deletions pkg/source/kinesis/kinesis_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,16 @@ import (

// Configuration configures the source for records pulled
type Configuration struct {
StreamName string `hcl:"stream_name" env:"SOURCE_KINESIS_STREAM_NAME"`
Region string `hcl:"region" env:"SOURCE_KINESIS_REGION"`
AppName string `hcl:"app_name" env:"SOURCE_KINESIS_APP_NAME"`
RoleARN string `hcl:"role_arn,optional" env:"SOURCE_KINESIS_ROLE_ARN"`
StartTimestamp string `hcl:"start_timestamp,optional" env:"SOURCE_KINESIS_START_TIMESTAMP"` // Timestamp for the kinesis shard iterator to begin processing. Format YYYY-MM-DD HH:MM:SS.MS (miliseconds optional)
ReadThrottleDelayMs int `hcl:"read_throttle_delay_ms,optional" env:"SOURCE_KINESIS_READ_THROTTLE_DELAY_MS"`
CustomAWSEndpoint string `hcl:"custom_aws_endpoint,optional" env:"SOURCE_CUSTOM_AWS_ENDPOINT"`
ConcurrentWrites int `hcl:"concurrent_writes,optional" env:"SOURCE_CONCURRENT_WRITES"`
StreamName string `hcl:"stream_name" env:"SOURCE_KINESIS_STREAM_NAME"`
Region string `hcl:"region" env:"SOURCE_KINESIS_REGION"`
AppName string `hcl:"app_name" env:"SOURCE_KINESIS_APP_NAME"`
RoleARN string `hcl:"role_arn,optional" env:"SOURCE_KINESIS_ROLE_ARN"`
StartTimestamp string `hcl:"start_timestamp,optional" env:"SOURCE_KINESIS_START_TIMESTAMP"` // Timestamp for the kinesis shard iterator to begin processing. Format YYYY-MM-DD HH:MM:SS.MS (miliseconds optional)
ReadThrottleDelayMs int `hcl:"read_throttle_delay_ms,optional" env:"SOURCE_KINESIS_READ_THROTTLE_DELAY_MS"`
CustomAWSEndpoint string `hcl:"custom_aws_endpoint,optional" env:"SOURCE_CUSTOM_AWS_ENDPOINT"`
ShardCheckFreqSeconds int `hcl:"shard_check_freq_seconds,optional" env:"SHARD_CHECK_FREQ_SECONDS"`
LeaderActionFreqSeconds int `hcl:"leader_action_freq_seconds,optional" env:"LEADER_ACTION_FREQ_SECONDS"`
ConcurrentWrites int `hcl:"concurrent_writes,optional" env:"SOURCE_CONCURRENT_WRITES"`
}

// --- Kinesis source
Expand Down Expand Up @@ -79,7 +81,9 @@ func configFunctionGeneratorWithInterfaces(kinesisClient kinesisiface.KinesisAPI
c.StreamName,
c.AppName,
&iteratorTstamp,
c.ReadThrottleDelayMs)
c.ReadThrottleDelayMs,
c.ShardCheckFreqSeconds,
c.LeaderActionFreqSeconds)
}
}

Expand Down Expand Up @@ -113,8 +117,10 @@ func (f adapter) Create(i interface{}) (interface{}, error) {
func (f adapter) ProvideDefault() (interface{}, error) {
// Provide defaults
cfg := &Configuration{
ReadThrottleDelayMs: 250, // Kinsumer default is 250ms
ConcurrentWrites: 50,
ReadThrottleDelayMs: 250, // Kinsumer default is 250ms
ConcurrentWrites: 50,
ShardCheckFreqSeconds: 10,
LeaderActionFreqSeconds: 300,
}

return cfg, nil
Expand Down Expand Up @@ -150,11 +156,22 @@ func (kl *KinsumerLogrus) Log(format string, v ...interface{}) {

// newKinesisSourceWithInterfaces allows you to provide a Kinesis + DynamoDB client directly to allow
// for mocking and localstack usage
func newKinesisSourceWithInterfaces(kinesisClient kinesisiface.KinesisAPI, dynamodbClient dynamodbiface.DynamoDBAPI, awsAccountID string, concurrentWrites int, region string, streamName string, appName string, startTimestamp *time.Time, readThrottleDelay int) (*kinesisSource, error) {
// TODO: Add statistics monitoring to be able to report on consumer latency
func newKinesisSourceWithInterfaces(
kinesisClient kinesisiface.KinesisAPI,
dynamodbClient dynamodbiface.DynamoDBAPI,
awsAccountID string,
concurrentWrites int,
region string,
streamName string,
appName string,
startTimestamp *time.Time,
readThrottleDelay int,
shardCheckFreq int,
leaderActionFreq int) (*kinesisSource, error) {

config := kinsumer.NewConfig().
WithShardCheckFrequency(10 * time.Second).
WithLeaderActionFrequency(10 * time.Second).
WithShardCheckFrequency(time.Duration(shardCheckFreq) * time.Second).
WithLeaderActionFrequency(time.Duration(leaderActionFreq) * time.Second).
WithManualCheckpointing(true).
WithLogger(&KinsumerLogrus{}).
WithIteratorStartTimestamp(startTimestamp).
Expand Down
40 changes: 22 additions & 18 deletions pkg/source/kinesis/kinesis_source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func TestNewKinesisSourceWithInterfaces_Success(t *testing.T) {

defer testutil.DeleteAWSLocalstackDynamoDBTables(dynamodbClient, appName)

source, err := newKinesisSourceWithInterfaces(kinesisClient, dynamodbClient, "00000000000", 15, testutil.AWSLocalstackRegion, streamName, appName, nil, 250)
source, err := newKinesisSourceWithInterfaces(kinesisClient, dynamodbClient, "00000000000", 15, testutil.AWSLocalstackRegion, streamName, appName, nil, 250, 10, 10)

assert.IsType(&kinesisSource{}, source)
assert.Nil(err)
Expand Down Expand Up @@ -92,7 +92,7 @@ func TestKinesisSource_ReadFailure_NoResources(t *testing.T) {
kinesisClient := testutil.GetAWSLocalstackKinesisClient()
dynamodbClient := testutil.GetAWSLocalstackDynamoDBClient()

source, err := newKinesisSourceWithInterfaces(kinesisClient, dynamodbClient, "00000000000", 1, testutil.AWSLocalstackRegion, "not-exists", "fake-name", nil, 250)
source, err := newKinesisSourceWithInterfaces(kinesisClient, dynamodbClient, "00000000000", 1, testutil.AWSLocalstackRegion, "not-exists", "fake-name", nil, 250, 10, 10)
assert.Nil(err)
assert.NotNil(source)
assert.Equal("arn:aws:kinesis:us-east-1:00000000000:stream/not-exists", source.GetID())
Expand Down Expand Up @@ -138,7 +138,7 @@ func TestKinesisSource_ReadMessages(t *testing.T) {
time.Sleep(1 * time.Second)

// Create the source and assert that it's there
source, err := newKinesisSourceWithInterfaces(kinesisClient, dynamodbClient, "00000000000", 15, testutil.AWSLocalstackRegion, streamName, appName, nil, 250)
source, err := newKinesisSourceWithInterfaces(kinesisClient, dynamodbClient, "00000000000", 15, testutil.AWSLocalstackRegion, streamName, appName, nil, 250, 10, 10)
assert.Nil(err)
assert.NotNil(source)
assert.Equal("arn:aws:kinesis:us-east-1:00000000000:stream/kinesis-source-integration-2", source.GetID())
Expand Down Expand Up @@ -191,7 +191,7 @@ func TestKinesisSource_StartTimestamp(t *testing.T) {
}

// Create the source (with start timestamp) and assert that it's there
source, err := newKinesisSourceWithInterfaces(kinesisClient, dynamodbClient, "00000000000", 15, testutil.AWSLocalstackRegion, streamName, appName, &timeToStart, 250)
source, err := newKinesisSourceWithInterfaces(kinesisClient, dynamodbClient, "00000000000", 15, testutil.AWSLocalstackRegion, streamName, appName, &timeToStart, 250, 10, 10)
assert.Nil(err)
assert.NotNil(source)
assert.Equal("arn:aws:kinesis:us-east-1:00000000000:stream/kinesis-source-integration-3", source.GetID())
Expand Down Expand Up @@ -270,26 +270,30 @@ func TestKinesisSourceHCL(t *testing.T) {
File: "source-kinesis-simple.hcl",
Plug: testKinesisSourceAdapter(testKinesisSourceFunc),
Expected: &Configuration{
StreamName: "testStream",
Region: "us-test-1",
AppName: "testApp",
RoleARN: "",
StartTimestamp: "",
ConcurrentWrites: 50,
ReadThrottleDelayMs: 250,
StreamName: "testStream",
Region: "us-test-1",
AppName: "testApp",
RoleARN: "",
StartTimestamp: "",
ConcurrentWrites: 50,
ReadThrottleDelayMs: 250,
ShardCheckFreqSeconds: 10,
LeaderActionFreqSeconds: 300,
},
},
{
File: "source-kinesis-extended.hcl",
Plug: testKinesisSourceAdapter(testKinesisSourceFunc),
Expected: &Configuration{
StreamName: "testStream",
Region: "us-test-1",
AppName: "testApp",
RoleARN: "xxx-test-role-arn",
StartTimestamp: "2022-03-15 07:52:53",
ConcurrentWrites: 51,
ReadThrottleDelayMs: 250,
StreamName: "testStream",
Region: "us-test-1",
AppName: "testApp",
RoleARN: "xxx-test-role-arn",
StartTimestamp: "2022-03-15 07:52:53",
ConcurrentWrites: 51,
ReadThrottleDelayMs: 250,
ShardCheckFreqSeconds: 10,
LeaderActionFreqSeconds: 300,
},
},
}
Expand Down
Loading