Skip to content

Commit cda9b09

Browse files
committed
fix: send message to DLQ after correct number of attempts
Closes: #263
1 parent f58f9b0 commit cda9b09

File tree

3 files changed

+99
-8
lines changed

3 files changed

+99
-8
lines changed

app/gosqs/change_message_visibility.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,10 +56,9 @@ func ChangeMessageVisibilityV1(req *http.Request) (int, interfaces.AbstractRespo
5656
msgs[i].Retry++
5757
if queue.MaxReceiveCount > 0 &&
5858
queue.DeadLetterQueue != nil &&
59-
msgs[i].Retry > queue.MaxReceiveCount {
59+
msgs[i].Retry >= queue.MaxReceiveCount {
6060
queue.DeadLetterQueue.Messages = append(queue.DeadLetterQueue.Messages, msgs[i])
6161
queue.Messages = append(queue.Messages[:i], queue.Messages[i+1:]...)
62-
i++
6362
}
6463
} else {
6564
msgs[i].VisibilityTimeout = time.Now().Add(time.Duration(visibilityTimeout) * time.Second)

app/gosqs/gosqs.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,10 +43,10 @@ func PeriodicTasks(d time.Duration, quit <-chan struct{}) {
4343
msg.Retry++
4444
if queue.MaxReceiveCount > 0 &&
4545
queue.DeadLetterQueue != nil &&
46-
msg.Retry > queue.MaxReceiveCount {
46+
msg.Retry >= queue.MaxReceiveCount {
4747
queue.DeadLetterQueue.Messages = append(queue.DeadLetterQueue.Messages, *msg)
4848
queue.Messages = append(queue.Messages[:i], queue.Messages[i+1:]...)
49-
i++
49+
i--
5050
}
5151
}
5252
}

app/gosqs/gosqs_test.go

Lines changed: 96 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -242,7 +242,7 @@ func TestDeadLetterQueue(t *testing.T) {
242242
form.Add("Attribute.1.Name", "VisibilityTimeout")
243243
form.Add("Attribute.1.Value", "1")
244244
form.Add("Attribute.2.Name", "RedrivePolicy")
245-
form.Add("Attribute.2.Value", `{"maxReceiveCount": 1, "deadLetterTargetArn":"arn:aws:sqs::000000000000:failed-messages"}`)
245+
form.Add("Attribute.2.Value", `{"maxReceiveCount": 2, "deadLetterTargetArn":"arn:aws:sqs::000000000000:failed-messages"}`)
246246
form.Add("Version", "2012-11-05")
247247
req.PostForm = form
248248

@@ -267,6 +267,9 @@ func TestDeadLetterQueue(t *testing.T) {
267267
t.Errorf("handler returned wrong status code: got \n%v want %v",
268268
status, http.StatusOK)
269269
}
270+
if len(models.SyncQueues.Queues["testing-deadletter"].Messages) != 1 {
271+
t.Fatal("expected a message in testing-deadletter")
272+
}
270273

271274
// receive message
272275
req, err = http.NewRequest("POST", "/", nil)
@@ -285,6 +288,13 @@ func TestDeadLetterQueue(t *testing.T) {
285288

286289
time.Sleep(2 * time.Second)
287290

291+
if len(models.SyncQueues.Queues["testing-deadletter"].Messages) != 1 {
292+
t.Fatal("expected message in testing-deadletter after 1 receive attempt")
293+
}
294+
if len(deadLetterQueue.Messages) > 0 {
295+
t.Fatal("expected no message in DLQ")
296+
}
297+
288298
// receive the message one more time
289299
req, err = http.NewRequest("POST", "/", nil)
290300
if err != nil {
@@ -295,20 +305,102 @@ func TestDeadLetterQueue(t *testing.T) {
295305

296306
status, _ = ReceiveMessageV1(req)
297307
assert.Equal(t, status, http.StatusOK)
308+
309+
// wait for messages to be moved to DLQ and stop the periodic tasks to prevent data races.
298310
time.Sleep(2 * time.Second)
311+
done <- struct{}{}
312+
313+
if len(models.SyncQueues.Queues["testing-deadletter"].Messages) != 0 {
314+
t.Fatal("expected no message in testing-deadletter")
315+
}
316+
if len(deadLetterQueue.Messages) == 0 {
317+
t.Fatal("expected a message in DLQ")
318+
}
319+
}
320+
321+
func TestDeadLetterQueueMultiple(t *testing.T) {
322+
done := make(chan struct{}, 0)
323+
go PeriodicTasks(1*time.Second, done)
324+
325+
// create a queue
326+
req, err := http.NewRequest("POST", "/", nil)
327+
if err != nil {
328+
t.Fatal(err)
329+
}
330+
deadLetterQueue := &models.Queue{
331+
Name: "failed-messages-multiple",
332+
Messages: []models.SqsMessage{},
333+
}
334+
models.SyncQueues.Lock()
335+
models.SyncQueues.Queues["failed-messages-multiple"] = deadLetterQueue
336+
models.SyncQueues.Unlock()
337+
form := url.Values{}
338+
form.Add("Action", "CreateQueue")
339+
form.Add("QueueName", "testing-deadletter-multiple")
340+
form.Add("Attribute.1.Name", "VisibilityTimeout")
341+
form.Add("Attribute.1.Value", "1")
342+
form.Add("Attribute.2.Name", "RedrivePolicy")
343+
form.Add("Attribute.2.Value", `{"maxReceiveCount": 1, "deadLetterTargetArn":"arn:aws:sqs::000000000000:failed-messages-multiple"}`)
344+
form.Add("Version", "2012-11-05")
345+
req.PostForm = form
299346

300-
// another receive attempt
347+
status, _ := CreateQueueV1(req)
348+
assert.Equal(t, status, http.StatusOK)
349+
350+
// send 2 messages
301351
req, err = http.NewRequest("POST", "/", nil)
302352
if err != nil {
303353
t.Fatal(err)
304354
}
305355

356+
form = url.Values{}
357+
form.Add("Action", "SendMessage")
358+
form.Add("QueueUrl", "http://localhost:4100/queue/testing-deadletter-multiple")
359+
form.Add("MessageBody", "1")
360+
form.Add("Version", "2012-11-05")
361+
req.PostForm = form
362+
363+
status, _ = SendMessageV1(req)
364+
if status != http.StatusOK {
365+
t.Errorf("handler returned wrong status code: got \n%v want %v",
366+
status, http.StatusOK)
367+
}
368+
status, _ = SendMessageV1(req)
369+
if status != http.StatusOK {
370+
t.Errorf("handler returned wrong status code: got \n%v want %v",
371+
status, http.StatusOK)
372+
}
373+
374+
if len(models.SyncQueues.Queues["testing-deadletter-multiple"].Messages) != 2 {
375+
t.Fatal("expected 2 messages in testing-deadletter-multiple")
376+
}
377+
378+
// receive messages
379+
req, err = http.NewRequest("POST", "/", nil)
380+
if err != nil {
381+
t.Fatal(err)
382+
}
383+
384+
form = url.Values{}
385+
form.Add("Action", "ReceiveMessage")
386+
form.Add("QueueUrl", "http://localhost:4100/queue/testing-deadletter-multiple")
387+
form.Add("MaxNumberOfMessages", "2")
388+
form.Add("Version", "2012-11-05")
306389
req.PostForm = form
307390

308391
status, _ = ReceiveMessageV1(req)
309392
assert.Equal(t, status, http.StatusOK)
310-
if len(deadLetterQueue.Messages) == 0 {
311-
t.Fatal("expected a message")
393+
394+
// wait for messages to be moved to DLQ and stop the periodic tasks to prevent data races.
395+
time.Sleep(3 * time.Second)
396+
done <- struct{}{}
397+
398+
numMessages := len(models.SyncQueues.Queues["testing-deadletter-multiple"].Messages)
399+
if numMessages != 0 {
400+
t.Fatalf("expected no messages in testing-deadletter-multiple, found: %d", numMessages)
401+
}
402+
if len(deadLetterQueue.Messages) != 2 {
403+
t.Fatal("expected 2 messages in DLQ")
312404
}
313405
}
314406

0 commit comments

Comments
 (0)