Skip to content

Commit

Permalink
Race Conditions (#11)
Browse files Browse the repository at this point in the history
* fix: send the failed video to the dead letter exchange

* fix: rollback manually send to dead letter exchange

the bug was in the rabbitMQ configuration not in the application

* fix: preventing race conditions

---------

Co-authored-by: Lucas Martins dos Santos <msantos.lms@pf.gov.br>
  • Loading branch information
olukkas and Lucas Martins dos Santos committed Jan 23, 2024
1 parent 7427c11 commit 3b0697a
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 0 deletions.
3 changes: 3 additions & 0 deletions application/services/job_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,10 @@ func (j *JobManager) Start() {
}

func (j *JobManager) notifySuccess(result JobWorkerResult) error {
Mutex.Lock()
jobJson, err := json.Marshal(result.Job)
Mutex.Unlock()

if err != nil {
return err
}
Expand Down
9 changes: 9 additions & 0 deletions application/services/job_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
uuid "github.com/satori/go.uuid"
"github.com/streadway/amqp"
"os"
"sync"
)

type JobWorkerResult struct {
Expand All @@ -16,15 +17,19 @@ type JobWorkerResult struct {
Error error
}

var Mutex = &sync.Mutex{}

func JobWorker(messageChannel chan amqp.Delivery, returnChan chan JobWorkerResult, jobService JobService) {
for message := range messageChannel {
if !utils.IsJson(string(message.Body)) {
returnChan <- returnJobResult(domain.Job{}, message, errors.New("message is not a json"))
continue
}

Mutex.Lock()
err := json.Unmarshal(message.Body, &jobService.VideoService.Video)
jobService.VideoService.Video.ID = uuid.NewV4().String()
Mutex.Unlock()

if err != nil {
returnChan <- returnJobResult(domain.Job{}, message, err)
Expand All @@ -37,7 +42,9 @@ func JobWorker(messageChannel chan amqp.Delivery, returnChan chan JobWorkerResul
continue
}

Mutex.Lock()
err = jobService.VideoService.InsertVideo()
Mutex.Unlock()

if err != nil {
returnChan <- returnJobResult(domain.Job{}, message, err)
Expand All @@ -51,7 +58,9 @@ func JobWorker(messageChannel chan amqp.Delivery, returnChan chan JobWorkerResul
continue
}

Mutex.Lock()
_, err = jobService.JobsRepository.Insert(job)
Mutex.Unlock()

if err != nil {
returnChan <- returnJobResult(domain.Job{}, message, err)
Expand Down

0 comments on commit 3b0697a

Please sign in to comment.