Skip to content

Commit

Permalink
Merge pull request #23 from snowplow-devops/develop
Browse files Browse the repository at this point in the history
Release 1.4.0
  • Loading branch information
adatzer authored Nov 19, 2024
2 parents 4e19415 + fdafe31 commit a7a42d9
Show file tree
Hide file tree
Showing 5 changed files with 40 additions and 5 deletions.
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@
integration-reset: integration-down integration-up

integration-up:
(cd integration && docker-compose -f ./docker-compose.yml up -d)
(cd integration && docker compose -f ./docker-compose.yml up -d)
sleep 5

integration-down:
(cd integration && docker-compose -f ./docker-compose.yml down)
(cd integration && docker compose -f ./docker-compose.yml down)
rm -rf integration/.localstack

integration: integration-up
Expand Down
9 changes: 9 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ type Config struct {
dynamoWriteCapacity int64
// Time to wait between attempts to verify tables were created/deleted completely
dynamoWaiterDelay time.Duration

// use ListShards to avoid LimitExceedException from DescribeStream
useListShardsForKinesisStreamReady bool
}

// NewConfig returns a default Config struct
Expand Down Expand Up @@ -148,6 +151,12 @@ func (c Config) WithLogger(logger Logger) Config {
return c
}

// WithUseListShardsForKinesisStreamReady returns a config with a modified useListShardsForKinesisStreamReady toggle
func (c Config) WithUseListShardsForKinesisStreamReady(shouldUse bool) Config {
c.useListShardsForKinesisStreamReady = shouldUse
return c
}

// Verify that a config struct has sane and valid values
func validateConfig(c *Config) error {
if c.throttleDelay < 200*time.Millisecond {
Expand Down
4 changes: 3 additions & 1 deletion config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ func TestConfigWithMethods(t *testing.T) {
WithLeaderActionFrequency(1 * time.Second).
WithThrottleDelay(1 * time.Second).
WithStats(stats).
WithIteratorStartTimestamp(&tstamp)
WithIteratorStartTimestamp(&tstamp).
WithUseListShardsForKinesisStreamReady(true)

err := validateConfig(&config)
require.NoError(t, err)
Expand All @@ -76,4 +77,5 @@ func TestConfigWithMethods(t *testing.T) {
require.Equal(t, 1*time.Second, config.leaderActionFrequency)
require.Equal(t, stats, config.stats)
require.Equal(t, &tstamp, config.iteratorStartTimestamp)
require.Equal(t, true, config.useListShardsForKinesisStreamReady)
}
10 changes: 9 additions & 1 deletion kinsumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,15 @@ func (k *Kinsumer) dynamoDeleteTableIfExists(name string) error {

// kinesisStreamReady returns an error if the given stream is not ACTIVE
func (k *Kinsumer) kinesisStreamReady() error {
if k.config.useListShardsForKinesisStreamReady {
_, err := k.kinesis.ListShards(&kinesis.ListShardsInput{
StreamName: aws.String(k.streamName),
})
if err != nil {
return fmt.Errorf("error listing shards for stream %s: %v", k.streamName, err)
}
return nil
}
out, err := k.kinesis.DescribeStream(&kinesis.DescribeStreamInput{
StreamName: aws.String(k.streamName),
})
Expand All @@ -336,7 +345,6 @@ func (k *Kinsumer) kinesisStreamReady() error {
if status != "ACTIVE" && status != "UPDATING" {
return fmt.Errorf("stream %s exists but state '%s' is not 'ACTIVE' or 'UPDATING'", k.streamName, status)
}

return nil
}

Expand Down
18 changes: 17 additions & 1 deletion shard_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,9 +202,25 @@ mainloop:
if awsErr.OrigErr() != nil {
origErrStr = fmt.Sprintf("(%s) ", awsErr.OrigErr())
}
k.config.logger.Log("Got error: %s %s %sretry count is %d / %d", awsErr.Code(), awsErr.Message(), origErrStr, retryCount, maxErrorRetries)

switch awsErr.Code() {
case kinesis.ErrCodeExpiredIteratorException:
k.config.logger.Log("Got error: %s %s %s", awsErr.Code(), awsErr.Message(), origErrStr)
newIterator, ierr := getShardIterator(k.kinesis, k.streamName, shardID, lastSeqToCheckp, nil)
if ierr != nil {
k.shardErrors <- shardConsumerError{shardID: shardID, action: "getShardIterator", err: err}
return
}
iterator = newIterator

// retry infinitely after expired iterator is renewed successfully
continue mainloop
}

// Only retry for errors that should be retried; notably, don't retry serialization errors because something bad is happening
shouldRetry := request.IsErrorRetryable(err) || request.IsErrorThrottle(err)
k.config.logger.Log("Got error: %s %s %sretry count is %d / %d", awsErr.Code(), awsErr.Message(), origErrStr, retryCount, maxErrorRetries)

if shouldRetry && retryCount < maxErrorRetries {
retryCount++

Expand Down

0 comments on commit a7a42d9

Please sign in to comment.