diff --git a/app/gosqs/change_message_visibility.go b/app/gosqs/change_message_visibility.go index f590e53e..39c920d3 100644 --- a/app/gosqs/change_message_visibility.go +++ b/app/gosqs/change_message_visibility.go @@ -56,10 +56,9 @@ func ChangeMessageVisibilityV1(req *http.Request) (int, interfaces.AbstractRespo msgs[i].Retry++ if queue.MaxReceiveCount > 0 && queue.DeadLetterQueue != nil && - msgs[i].Retry > queue.MaxReceiveCount { + msgs[i].Retry >= queue.MaxReceiveCount { queue.DeadLetterQueue.Messages = append(queue.DeadLetterQueue.Messages, msgs[i]) queue.Messages = append(queue.Messages[:i], queue.Messages[i+1:]...) - i++ } } else { msgs[i].VisibilityTimeout = time.Now().Add(time.Duration(visibilityTimeout) * time.Second) diff --git a/app/gosqs/gosqs.go b/app/gosqs/gosqs.go index baf1667f..bb80a8ab 100644 --- a/app/gosqs/gosqs.go +++ b/app/gosqs/gosqs.go @@ -43,10 +43,10 @@ func PeriodicTasks(d time.Duration, quit <-chan struct{}) { msg.Retry++ if queue.MaxReceiveCount > 0 && queue.DeadLetterQueue != nil && - msg.Retry > queue.MaxReceiveCount { + msg.Retry >= queue.MaxReceiveCount { queue.DeadLetterQueue.Messages = append(queue.DeadLetterQueue.Messages, *msg) queue.Messages = append(queue.Messages[:i], queue.Messages[i+1:]...) - i++ + i-- } } } diff --git a/app/gosqs/gosqs_test.go b/app/gosqs/gosqs_test.go index 35a15bf2..5b8582d4 100644 --- a/app/gosqs/gosqs_test.go +++ b/app/gosqs/gosqs_test.go @@ -242,7 +242,7 @@ func TestDeadLetterQueue(t *testing.T) { form.Add("Attribute.1.Name", "VisibilityTimeout") form.Add("Attribute.1.Value", "1") form.Add("Attribute.2.Name", "RedrivePolicy") - form.Add("Attribute.2.Value", `{"maxReceiveCount": 1, "deadLetterTargetArn":"arn:aws:sqs::000000000000:failed-messages"}`) + form.Add("Attribute.2.Value", `{"maxReceiveCount": 2, "deadLetterTargetArn":"arn:aws:sqs::000000000000:failed-messages"}`) form.Add("Version", "2012-11-05") req.PostForm = form @@ -267,6 +267,9 @@ func TestDeadLetterQueue(t *testing.T) { t.Errorf("handler returned wrong status code: got \n%v want %v", status, http.StatusOK) } + if len(models.SyncQueues.Queues["testing-deadletter"].Messages) != 1 { + t.Fatal("expected a message in testing-deadletter") + } // receive message req, err = http.NewRequest("POST", "/", nil) @@ -285,6 +288,13 @@ func TestDeadLetterQueue(t *testing.T) { time.Sleep(2 * time.Second) + if len(models.SyncQueues.Queues["testing-deadletter"].Messages) != 1 { + t.Fatal("expected message in testing-deadletter after 1 receive attempt") + } + if len(deadLetterQueue.Messages) > 0 { + t.Fatal("expected no message in DLQ") + } + // receive the message one more time req, err = http.NewRequest("POST", "/", nil) if err != nil { @@ -295,20 +305,102 @@ func TestDeadLetterQueue(t *testing.T) { status, _ = ReceiveMessageV1(req) assert.Equal(t, status, http.StatusOK) + + // wait for messages to be moved to DLQ and stop the periodic tasks to prevent data races. time.Sleep(2 * time.Second) + done <- struct{}{} + + if len(models.SyncQueues.Queues["testing-deadletter"].Messages) != 0 { + t.Fatal("expected no message in testing-deadletter") + } + if len(deadLetterQueue.Messages) == 0 { + t.Fatal("expected a message in DLQ") + } +} + +func TestDeadLetterQueueMultiple(t *testing.T) { + done := make(chan struct{}, 0) + go PeriodicTasks(1*time.Second, done) + + // create a queue + req, err := http.NewRequest("POST", "/", nil) + if err != nil { + t.Fatal(err) + } + deadLetterQueue := &models.Queue{ + Name: "failed-messages-multiple", + Messages: []models.SqsMessage{}, + } + models.SyncQueues.Lock() + models.SyncQueues.Queues["failed-messages-multiple"] = deadLetterQueue + models.SyncQueues.Unlock() + form := url.Values{} + form.Add("Action", "CreateQueue") + form.Add("QueueName", "testing-deadletter-multiple") + form.Add("Attribute.1.Name", "VisibilityTimeout") + form.Add("Attribute.1.Value", "1") + form.Add("Attribute.2.Name", "RedrivePolicy") + form.Add("Attribute.2.Value", `{"maxReceiveCount": 1, "deadLetterTargetArn":"arn:aws:sqs::000000000000:failed-messages-multiple"}`) + form.Add("Version", "2012-11-05") + req.PostForm = form - // another receive attempt + status, _ := CreateQueueV1(req) + assert.Equal(t, status, http.StatusOK) + + // send 2 messages req, err = http.NewRequest("POST", "/", nil) if err != nil { t.Fatal(err) } + form = url.Values{} + form.Add("Action", "SendMessage") + form.Add("QueueUrl", "http://localhost:4100/queue/testing-deadletter-multiple") + form.Add("MessageBody", "1") + form.Add("Version", "2012-11-05") + req.PostForm = form + + status, _ = SendMessageV1(req) + if status != http.StatusOK { + t.Errorf("handler returned wrong status code: got \n%v want %v", + status, http.StatusOK) + } + status, _ = SendMessageV1(req) + if status != http.StatusOK { + t.Errorf("handler returned wrong status code: got \n%v want %v", + status, http.StatusOK) + } + + if len(models.SyncQueues.Queues["testing-deadletter-multiple"].Messages) != 2 { + t.Fatal("expected 2 messages in testing-deadletter-multiple") + } + + // receive messages + req, err = http.NewRequest("POST", "/", nil) + if err != nil { + t.Fatal(err) + } + + form = url.Values{} + form.Add("Action", "ReceiveMessage") + form.Add("QueueUrl", "http://localhost:4100/queue/testing-deadletter-multiple") + form.Add("MaxNumberOfMessages", "2") + form.Add("Version", "2012-11-05") req.PostForm = form status, _ = ReceiveMessageV1(req) assert.Equal(t, status, http.StatusOK) - if len(deadLetterQueue.Messages) == 0 { - t.Fatal("expected a message") + + // wait for messages to be moved to DLQ and stop the periodic tasks to prevent data races. + time.Sleep(3 * time.Second) + done <- struct{}{} + + numMessages := len(models.SyncQueues.Queues["testing-deadletter-multiple"].Messages) + if numMessages != 0 { + t.Fatalf("expected no messages in testing-deadletter-multiple, found: %d", numMessages) + } + if len(deadLetterQueue.Messages) != 2 { + t.Fatal("expected 2 messages in DLQ") } }