diff --git a/application/services/job_manager.go b/application/services/job_manager.go index cf90621..fdaca7f 100644 --- a/application/services/job_manager.go +++ b/application/services/job_manager.go @@ -70,7 +70,10 @@ func (j *JobManager) Start() { } func (j *JobManager) notifySuccess(result JobWorkerResult) error { + Mutex.Lock() jobJson, err := json.Marshal(result.Job) + Mutex.Unlock() + if err != nil { return err } diff --git a/application/services/job_worker.go b/application/services/job_worker.go index 3a5d99c..629ca4b 100644 --- a/application/services/job_worker.go +++ b/application/services/job_worker.go @@ -8,6 +8,7 @@ import ( uuid "github.com/satori/go.uuid" "github.com/streadway/amqp" "os" + "sync" ) type JobWorkerResult struct { @@ -16,6 +17,8 @@ type JobWorkerResult struct { Error error } +var Mutex = &sync.Mutex{} + func JobWorker(messageChannel chan amqp.Delivery, returnChan chan JobWorkerResult, jobService JobService) { for message := range messageChannel { if !utils.IsJson(string(message.Body)) { @@ -23,8 +26,10 @@ func JobWorker(messageChannel chan amqp.Delivery, returnChan chan JobWorkerResul continue } + Mutex.Lock() err := json.Unmarshal(message.Body, &jobService.VideoService.Video) jobService.VideoService.Video.ID = uuid.NewV4().String() + Mutex.Unlock() if err != nil { returnChan <- returnJobResult(domain.Job{}, message, err) @@ -37,7 +42,9 @@ func JobWorker(messageChannel chan amqp.Delivery, returnChan chan JobWorkerResul continue } + Mutex.Lock() err = jobService.VideoService.InsertVideo() + Mutex.Unlock() if err != nil { returnChan <- returnJobResult(domain.Job{}, message, err) @@ -51,7 +58,9 @@ func JobWorker(messageChannel chan amqp.Delivery, returnChan chan JobWorkerResul continue } + Mutex.Lock() _, err = jobService.JobsRepository.Insert(job) + Mutex.Unlock() if err != nil { returnChan <- returnJobResult(domain.Job{}, message, err)