Skip to content

Commit

Permalink
Log a warning when we encounter throttling
Browse files Browse the repository at this point in the history
  • Loading branch information
colmsnowplow committed Aug 30, 2024
1 parent 1e2ac20 commit ddf98f9
Showing 1 changed file with 9 additions and 5 deletions.
14 changes: 9 additions & 5 deletions pkg/target/kinesis.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,6 @@ func (kt *KinesisTarget) process(messages []*models.Message) (*models.TargetWrit
// We loop through until we have no throttle errors
entries := make([]*kinesis.PutRecordsRequestEntry, len(messagesToTry))

// TODO: We need to reconstruct this each time
for i := 0; i < len(entries); i++ {
msg := messagesToTry[i]
entries[i] = &kinesis.PutRecordsRequestEntry{
Expand All @@ -196,7 +195,6 @@ func (kt *KinesisTarget) process(messages []*models.Message) (*models.TargetWrit
msg.TimeRequestStarted = requestStarted
msg.TimeRequestFinished = requestFinished
}
// TODO: We can do this in the loop below instead

// If the entire request errors, log timings and set all messages as failed
if err != nil {
Expand All @@ -208,19 +206,19 @@ func (kt *KinesisTarget) process(messages []*models.Message) (*models.TargetWrit
}

throttled := make([]*models.Message, 0)
throttleMsgs := make([]string, 0)
for i, resultRecord := range res.Records {
// If we have an error code, check if it's a throttle error
if resultRecord.ErrorCode != nil {
switch *resultRecord.ErrorCode {
case "ProvisionedThroughputExceededException":
// If we got throttled, add the corresponding record to the list for next retry
throttled = append(throttled, messagesToTry[i])
throttleMsgs = append(throttleMsgs, *resultRecord.ErrorMessage)
default:
// If it's another error, treat this as a failure
errorsEncountered = append(errorsEncountered, errors.New(*resultRecord.ErrorMessage))
nonTrottleFailures = append(nonTrottleFailures, messagesToTry[i])

// TODO: MAYBE THIS CAN BE BETTER??? - REVIEW
}
} else {
// If there is no error, the record was a success!
Expand All @@ -234,7 +232,13 @@ func (kt *KinesisTarget) process(messages []*models.Message) (*models.TargetWrit
// Assign throttles to be tried next loop
messagesToTry = throttled

// TODO - log a warning or info here
throttleWarn := errors.New(fmt.Sprintf("hit kinesis throttling, backing off and retrying %v messages", len(throttleMsgs)))

// Log a warning message about it
for _, msg := range throttleMsgs {
throttleWarn = errors.Wrap(throttleWarn, msg)
}
kt.log.Warn(throttleWarn.Error())

// Wait for the delay plus jitter before the next loop
jitter := time.Duration(1+rand.IntN(30000-1)) * time.Microsecond // any value between 1 microsecond and 30 milliseconds
Expand Down

0 comments on commit ddf98f9

Please sign in to comment.