Skip to content

Commit

Permalink
Handle throttles in kinesis target
Browse files Browse the repository at this point in the history
  • Loading branch information
colmsnowplow committed Sep 4, 2024
1 parent b53bee6 commit d92c5dd
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 52 deletions.
2 changes: 2 additions & 0 deletions integration/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ services:
max-file: "10"
environment:
- SERVICES=sqs,kinesis,dynamodb,sts
# Kinesis target handles throttling, but it breaks source tests. Configuration added here so we can manually configure testing with throttling for the target.
- KINESIS_ERROR_PROBABILITY=0.0

pubsub:
image: bigtruedata/gcloud-pubsub-emulator
Expand Down
143 changes: 91 additions & 52 deletions pkg/target/kinesis.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import (
"fmt"
"time"

rand "math/rand/v2"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/kinesis"
"github.com/aws/aws-sdk-go/service/kinesis/kinesisiface"
Expand Down Expand Up @@ -123,7 +125,6 @@ func AdaptKinesisTargetFunc(f func(c *KinesisTargetConfig) (*KinesisTarget, erro
}

// Write pushes all messages to the required target
// TODO: Should each put be in its own goroutine?
func (kt *KinesisTarget) Write(messages []*models.Message) (*models.TargetWriteResult, error) {
kt.log.Debugf("Writing %d messages to stream ...", len(messages))

Expand Down Expand Up @@ -161,74 +162,112 @@ func (kt *KinesisTarget) process(messages []*models.Message) (*models.TargetWrit
messageCount := int64(len(messages))
kt.log.Debugf("Writing chunk of %d messages to stream ...", messageCount)

entries := make([]*kinesis.PutRecordsRequestEntry, messageCount)
for i := 0; i < len(entries); i++ {
msg := messages[i]
entries[i] = &kinesis.PutRecordsRequestEntry{
Data: msg.Data,
PartitionKey: aws.String(msg.PartitionKey),
}
}
messagesToTry := messages
success := make([]*models.Message, 0)
nonThrottleFailures := make([]*models.Message, 0)
errorsEncountered := make([]error, 0)

requestStarted := time.Now()
res, err := kt.client.PutRecords(&kinesis.PutRecordsInput{
Records: entries,
StreamName: aws.String(kt.streamName),
})
requestFinished := time.Now()
retryDelay := 100 * time.Millisecond

for _, msg := range messages {
msg.TimeRequestStarted = requestStarted
msg.TimeRequestFinished = requestFinished
}
for {
// We loop through until we have no throttle errors
entries := make([]*kinesis.PutRecordsRequestEntry, len(messagesToTry))

if err != nil {
failed := messages

return models.NewTargetWriteResult(
nil,
failed,
nil,
nil,
), errors.Wrap(err, "Failed to send message batch to Kinesis stream")
}
for i := 0; i < len(entries); i++ {
msg := messagesToTry[i]
entries[i] = &kinesis.PutRecordsRequestEntry{
Data: msg.Data,
PartitionKey: aws.String(msg.PartitionKey),
}
}

// TODO: Can we ack successful messages when some fail in the batch? This will cause duplicate processing on failure.
if res.FailedRecordCount != nil && *res.FailedRecordCount > int64(0) {
failed := messages
requestStarted := time.Now()
res, err := kt.client.PutRecords(&kinesis.PutRecordsInput{
Records: entries,
StreamName: aws.String(kt.streamName),
})
requestFinished := time.Now()

// Assign timings
// These will only get recorded in metrics once the messages are successful
for _, msg := range messagesToTry {
msg.TimeRequestStarted = requestStarted
msg.TimeRequestFinished = requestFinished
}

// Wrap produces nil if the initial error is nil, so create an empty error instead
kinesisErrs := errors.New("")
if err != nil {
// Where the attempt to make a Put request throws an error, treat the whole thing as failed.
nonThrottleFailures = messagesToTry

for _, record := range res.Records {
if record.ErrorMessage != nil {
kinesisErrs = errors.Wrap(kinesisErrs, *record.ErrorMessage)
errorsEncountered = append(errorsEncountered, errors.Wrap(err, "Failed to send message batch to Kinesis stream"))
}

throttled := make([]*models.Message, 0)
throttleMsgs := make([]string, 0)
for i, resultRecord := range res.Records {
// If we have an error code, check if it's a throttle error
if resultRecord.ErrorCode != nil {
switch *resultRecord.ErrorCode {
case "ProvisionedThroughputExceededException":
// If we got throttled, add the corresponding record to the list for next retry
throttled = append(throttled, messagesToTry[i])
throttleMsgs = append(throttleMsgs, *resultRecord.ErrorMessage)
default:
// If it's a different error, treat it as a failure - retries for this will be handled by the main flow of the app
errorsEncountered = append(errorsEncountered, errors.New(*resultRecord.ErrorMessage))
nonThrottleFailures = append(nonThrottleFailures, messagesToTry[i])
}
} else {
// If there is no error, ack and treat as success
if messagesToTry[i].AckFunc != nil {
messagesToTry[i].AckFunc()
}
success = append(success, messagesToTry[i])
}
}
if len(throttled) > 0 {
// Assign throttles to be tried next loop
messagesToTry = throttled

return models.NewTargetWriteResult(
nil,
failed,
nil,
nil,
), errors.Wrap(kinesisErrs, "Failed to write all messages in batch to Kinesis stream")
}
throttleWarn := errors.New(fmt.Sprintf("hit kinesis throttling, backing off and retrying %v messages", len(throttleMsgs)))

// Log a warning message about it
for _, msg := range throttleMsgs {
throttleWarn = errors.Wrap(throttleWarn, msg)
}
kt.log.Warn(throttleWarn.Error())

// Wait for the delay plus jitter before the next loop
jitter := time.Duration(1+rand.IntN(30000-1)) * time.Microsecond // any value between 1 microsecond and 30 milliseconds
time.Sleep(retryDelay + jitter)

for _, msg := range messages {
if msg.AckFunc != nil {
msg.AckFunc()
// Extend delay for next loop, to a maximum of 1s
if retryDelay < 1*time.Second {
retryDelay = retryDelay + 100*time.Millisecond
}
} else {
// Break the loop and handle results if we have no throttles to retry
break
}
}

sent := messages
// If we got non-throttle errors, aggregate them so we can surface to the main app flow
var aggregateErr error

kt.log.Debugf("Successfully wrote %d messages", len(entries))
if len(errorsEncountered) > 0 {
aggregateErr = errors.New("")
for _, errToAdd := range errorsEncountered {
aggregateErr = errors.Wrap(aggregateErr, errToAdd.Error())
}
}

kt.log.Debugf("Successfully wrote %d messages, with %d failures", len(success), len(nonThrottleFailures))
return models.NewTargetWriteResult(
sent,
nil,
success,
nonThrottleFailures,
nil,
nil,
), nil
), aggregateErr
}

// Open does not do anything for this target
Expand Down

0 comments on commit d92c5dd

Please sign in to comment.