diff --git a/integration/docker-compose.yml b/integration/docker-compose.yml
index 5d834451..d6b1b9c2 100644
--- a/integration/docker-compose.yml
+++ b/integration/docker-compose.yml
@@ -15,6 +15,7 @@ services:
         max-file: "10"
     environment:
       - SERVICES=sqs,kinesis,dynamodb,sts
+      - KINESIS_ERROR_PROBABILITY=0.5
 
   pubsub:
     image: bigtruedata/gcloud-pubsub-emulator
diff --git a/pkg/target/kinesis.go b/pkg/target/kinesis.go
index cd7fdfdc..77b3bdbf 100644
--- a/pkg/target/kinesis.go
+++ b/pkg/target/kinesis.go
@@ -15,6 +15,8 @@ import (
 	"fmt"
 	"time"
 
+	rand "math/rand/v2"
+
 	"github.com/aws/aws-sdk-go/aws"
 	"github.com/aws/aws-sdk-go/service/kinesis"
 	"github.com/aws/aws-sdk-go/service/kinesis/kinesisiface"
@@ -123,7 +125,6 @@ func AdaptKinesisTargetFunc(f func(c *KinesisTargetConfig) (*KinesisTarget, erro
 }
 
 // Write pushes all messages to the required target
-// TODO: Should each put be in its own goroutine?
 func (kt *KinesisTarget) Write(messages []*models.Message) (*models.TargetWriteResult, error) {
 	kt.log.Debugf("Writing %d messages to stream ...", len(messages))
 
@@ -161,74 +162,112 @@ func (kt *KinesisTarget) process(messages []*models.Message) (*models.TargetWrit
 	messageCount := int64(len(messages))
 	kt.log.Debugf("Writing chunk of %d messages to stream ...", messageCount)
 
-	entries := make([]*kinesis.PutRecordsRequestEntry, messageCount)
-	for i := 0; i < len(entries); i++ {
-		msg := messages[i]
-		entries[i] = &kinesis.PutRecordsRequestEntry{
-			Data:         msg.Data,
-			PartitionKey: aws.String(msg.PartitionKey),
-		}
-	}
+	messagesToTry := messages
+	success := make([]*models.Message, 0)
+	nonThrottleFailures := make([]*models.Message, 0)
+	errorsEncountered := make([]error, 0)
 
-	requestStarted := time.Now()
-	res, err := kt.client.PutRecords(&kinesis.PutRecordsInput{
-		Records:    entries,
-		StreamName: aws.String(kt.streamName),
-	})
-	requestFinished := time.Now()
+	retryDelay := 100 * time.Millisecond
 
-	for _, msg := range messages {
-		msg.TimeRequestStarted = requestStarted
-		msg.TimeRequestFinished = requestFinished
-	}
+	for {
+		// We loop through until we have no throttle errors
+		entries := make([]*kinesis.PutRecordsRequestEntry, len(messagesToTry))
 
-	if err != nil {
-		failed := messages
-
-		return models.NewTargetWriteResult(
-			nil,
-			failed,
-			nil,
-			nil,
-		), errors.Wrap(err, "Failed to send message batch to Kinesis stream")
-	}
+		for i := 0; i < len(entries); i++ {
+			msg := messagesToTry[i]
+			entries[i] = &kinesis.PutRecordsRequestEntry{
+				Data:         msg.Data,
+				PartitionKey: aws.String(msg.PartitionKey),
+			}
+		}
 
-	// TODO: Can we ack successful messages when some fail in the batch? This will cause duplicate processing on failure.
-	if res.FailedRecordCount != nil && *res.FailedRecordCount > int64(0) {
-		failed := messages
+		requestStarted := time.Now()
+		res, err := kt.client.PutRecords(&kinesis.PutRecordsInput{
+			Records:    entries,
+			StreamName: aws.String(kt.streamName),
+		})
+		requestFinished := time.Now()
+
+		// Assign timings
+		// These will only get recorded in metrics once the messages are successful
+		for _, msg := range messagesToTry {
+			msg.TimeRequestStarted = requestStarted
+			msg.TimeRequestFinished = requestFinished
+		}
 
-		// Wrap produces nil if the initial error is nil, so create an empty error instead
-		kinesisErrs := errors.New("")
+		if err != nil {
+			// Where the attempt to make a Put request throws an error, treat the whole thing as failed.
+			// res is nil here, so we cannot inspect per-record results; stop retrying and surface the failure.
+			nonThrottleFailures = messagesToTry
+			errorsEncountered = append(errorsEncountered, errors.Wrap(err, "Failed to send message batch to Kinesis stream"))
+			break
+		}
+
+		throttled := make([]*models.Message, 0)
+		throttleMsgs := make([]string, 0)
+		for i, resultRecord := range res.Records {
+			// If we have an error code, check if it's a throttle error
+			if resultRecord.ErrorCode != nil {
+				switch *resultRecord.ErrorCode {
+				case "ProvisionedThroughputExceededException":
+					// If we got throttled, add the corresponding record to the list for next retry
+					throttled = append(throttled, messagesToTry[i])
+					throttleMsgs = append(throttleMsgs, *resultRecord.ErrorMessage)
+				default:
+					// If it's a different error, treat it as a failure - retries for this will be handled by the main flow of the app
+					errorsEncountered = append(errorsEncountered, errors.New(*resultRecord.ErrorMessage))
+					nonThrottleFailures = append(nonThrottleFailures, messagesToTry[i])
+				}
+			} else {
+				// If there is no error, ack and treat as success
+				if messagesToTry[i].AckFunc != nil {
+					messagesToTry[i].AckFunc()
+				}
+				success = append(success, messagesToTry[i])
 			}
 		}
+		if len(throttled) > 0 {
+			// Assign throttles to be tried next loop
+			messagesToTry = throttled
 
-		return models.NewTargetWriteResult(
-			nil,
-			failed,
-			nil,
-			nil,
-		), errors.Wrap(kinesisErrs, "Failed to write all messages in batch to Kinesis stream")
-	}
+			throttleWarn := fmt.Errorf("hit kinesis throttling, backing off and retrying %v messages", len(throttleMsgs))
+
+			// Log a warning message about it
+			for _, msg := range throttleMsgs {
+				throttleWarn = errors.Wrap(throttleWarn, msg)
+			}
+			kt.log.Warn(throttleWarn.Error())
+
+			// Wait for the delay plus jitter before the next loop
+			jitter := time.Duration(1+rand.IntN(30000-1)) * time.Microsecond // any value between 1 microsecond and 30 milliseconds
+			time.Sleep(retryDelay + jitter)
 
-	for _, msg := range messages {
-		if msg.AckFunc != nil {
-			msg.AckFunc()
+			// Extend delay for next loop, to a maximum of 1s
+			if retryDelay < 1*time.Second {
+				retryDelay = retryDelay + 100*time.Millisecond
+			}
+		} else {
+			// Break the loop and handle results if we have no throttles to retry
+			break
 		}
 	}
 
-	sent := messages
+	// If we got non-throttle errors, aggregate them so we can surface to the main app flow
+	var aggregateErr error
 
-	kt.log.Debugf("Successfully wrote %d messages", len(entries))
+	if len(errorsEncountered) > 0 {
+		aggregateErr = errors.New("")
+		for _, errToAdd := range errorsEncountered {
+			aggregateErr = errors.Wrap(aggregateErr, errToAdd.Error())
+		}
+	}
+
+	kt.log.Debugf("Successfully wrote %d messages, with %d failures", len(success), len(nonThrottleFailures))
 
 	return models.NewTargetWriteResult(
-		sent,
-		nil,
+		success,
+		nonThrottleFailures,
 		nil,
 		nil,
-	), nil
+	), aggregateErr
 }
 
 // Open does not do anything for this target