Skip to content

Commit

Permalink
Merge pull request #8 from francescopepe/RT/deduplicate-entries-befor…
Browse files Browse the repository at this point in the history
…e-deletion

De-duplicate message batch prior to deletion
  • Loading branch information
francescopepe authored Feb 9, 2024
2 parents cef5550 + 0b4ed39 commit 1df7df7
Showing 1 changed file with 21 additions and 7 deletions.
28 changes: 21 additions & 7 deletions clients/sqs/sqs.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,20 +48,34 @@ func (c sqsClient) ReceiveMessages() ([]messages.Message, error) {
}

func (c sqsClient) DeleteMessages(messages []messages.Message) error {
_, err := c.svc.DeleteMessageBatch(context.Background(), &awsSqs.DeleteMessageBatchInput{
Entries: c.prepareMessagesForDeletion(messages),
QueueUrl: c.receiveMessageInput.QueueUrl,
})

return err
}

// prepareMessagesForDeletion takes the processed message batch to transform into a de-duplicated struct for SQS to handle
func (c sqsClient) prepareMessagesForDeletion(messages []messages.Message) []types.DeleteMessageBatchRequestEntry {
deleteEntries := make([]types.DeleteMessageBatchRequestEntry, 0, len(messages))
processed := map[string]bool{}

for _, message := range messages {
msgId := message.Msg.(types.Message).MessageId
if _, exists := processed[*msgId]; exists {
continue
}

deleteEntries = append(deleteEntries, types.DeleteMessageBatchRequestEntry{
Id: message.Msg.(types.Message).MessageId,
Id: msgId,
ReceiptHandle: message.Msg.(types.Message).ReceiptHandle,
})
}

_, err := c.svc.DeleteMessageBatch(context.Background(), &awsSqs.DeleteMessageBatchInput{
Entries: deleteEntries,
QueueUrl: c.receiveMessageInput.QueueUrl,
})
processed[*msgId] = true
}

return err
return deleteEntries
}

func (c sqsClient) createMessage(sqsMessage types.Message) messages.Message {
Expand Down

0 comments on commit 1df7df7

Please sign in to comment.