From ddf98f9fe13512ae05c15370bed143843fe3df14 Mon Sep 17 00:00:00 2001 From: colmsnowplow Date: Fri, 30 Aug 2024 16:49:29 +0100 Subject: [PATCH] Log a warning when we encounter throttling --- pkg/target/kinesis.go | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/pkg/target/kinesis.go b/pkg/target/kinesis.go index b2968f00..231f3df1 100644 --- a/pkg/target/kinesis.go +++ b/pkg/target/kinesis.go @@ -174,7 +174,6 @@ func (kt *KinesisTarget) process(messages []*models.Message) (*models.TargetWrit // We loop through until we have no throttle errors entries := make([]*kinesis.PutRecordsRequestEntry, len(messagesToTry)) - // TODO: We need to reconstruct this each time for i := 0; i < len(entries); i++ { msg := messagesToTry[i] entries[i] = &kinesis.PutRecordsRequestEntry{ @@ -196,7 +195,6 @@ func (kt *KinesisTarget) process(messages []*models.Message) (*models.TargetWrit msg.TimeRequestStarted = requestStarted msg.TimeRequestFinished = requestFinished } - // TODO: We can do this in the loop below instead // If the entire request errors, log timings and set all messages as failed if err != nil { @@ -208,6 +206,7 @@ func (kt *KinesisTarget) process(messages []*models.Message) (*models.TargetWrit } throttled := make([]*models.Message, 0) + throttleMsgs := make([]string, 0) for i, resultRecord := range res.Records { // If we have an error code, check if it's a throttle error if resultRecord.ErrorCode != nil { @@ -215,12 +214,11 @@ func (kt *KinesisTarget) process(messages []*models.Message) (*models.TargetWrit case "ProvisionedThroughputExceededException": // If we got throttled, add the corresponding record to the list for next retry throttled = append(throttled, messagesToTry[i]) + throttleMsgs = append(throttleMsgs, *resultRecord.ErrorMessage) default: // If it's another error, treat this as a failure errorsEncountered = append(errorsEncountered, errors.New(*resultRecord.ErrorMessage)) nonTrottleFailures = append(nonTrottleFailures, messagesToTry[i]) - - // TODO: MAYBE THIS CAN BE BETTER??? - REVIEW } } else { // If there is no error, the record was a success! @@ -234,7 +232,13 @@ func (kt *KinesisTarget) process(messages []*models.Message) (*models.TargetWrit // Assign throttles to be tried next loop messagesToTry = throttled - // TODO - log a warning or info here + throttleWarn := errors.New(fmt.Sprintf("hit kinesis throttling, backing off and retrying %v messages", len(throttleMsgs))) + + // Log a warning message about it + for _, msg := range throttleMsgs { + throttleWarn = errors.Wrap(throttleWarn, msg) + } + kt.log.Warn(throttleWarn.Error()) // Wait for the delay plus jitter before the next loop jitter := time.Duration(1+rand.IntN(30000-1)) * time.Microsecond // any value between 1 microsecond and 30 milliseconds