From 26e1a25ec40ba2a27ba6831f091d3f453056abbc Mon Sep 17 00:00:00 2001 From: Lucas Martins dos Santos Date: Tue, 23 Jan 2024 11:17:42 -0300 Subject: [PATCH 1/4] fix: send the failed video to the dead letter exchange --- application/services/job_manager.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/application/services/job_manager.go b/application/services/job_manager.go index cf90621..5c0463c 100644 --- a/application/services/job_manager.go +++ b/application/services/job_manager.go @@ -75,7 +75,7 @@ func (j *JobManager) notifySuccess(result JobWorkerResult) error { return err } - err = j.notify(jobJson) + err = j.notify(jobJson, os.Getenv("RABBITMQ_NOTIFICATION_EX")) if err != nil { return err } @@ -106,7 +106,7 @@ func (j *JobManager) checkParseErrors(result JobWorkerResult) error { return err } - err = j.notify(jobJson) + err = j.notify(jobJson, os.Getenv("RABBITMQ_DLX")) if err != nil { return err } @@ -118,11 +118,11 @@ func (j *JobManager) checkParseErrors(result JobWorkerResult) error { return nil } -func (j *JobManager) notify(jobJson []byte) error { +func (j *JobManager) notify(jobJson []byte, exchange string) error { return j.RabbitMQ.Notify( string(jobJson), "application/json", - os.Getenv("RABBITMQ_NOTIFICATION_EX"), + exchange, os.Getenv("RABBITMQ_NOTIFICATION_ROUTING_KEY"), ) } From b77afb548bf49d092f095fa728eb597f257eb345 Mon Sep 17 00:00:00 2001 From: Lucas Martins dos Santos Date: Tue, 23 Jan 2024 11:33:24 -0300 Subject: [PATCH 2/4] fix: rollback manually send to dead letter exchange the bug was in the rabbitMQ configuration not in the application --- application/services/job_manager.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/application/services/job_manager.go b/application/services/job_manager.go index 5c0463c..cf90621 100644 --- a/application/services/job_manager.go +++ b/application/services/job_manager.go @@ -75,7 +75,7 @@ func (j *JobManager) notifySuccess(result JobWorkerResult) error { return err } - err = j.notify(jobJson, os.Getenv("RABBITMQ_NOTIFICATION_EX")) + err = j.notify(jobJson) if err != nil { return err } @@ -106,7 +106,7 @@ func (j *JobManager) checkParseErrors(result JobWorkerResult) error { return err } - err = j.notify(jobJson, os.Getenv("RABBITMQ_DLX")) + err = j.notify(jobJson) if err != nil { return err } @@ -118,11 +118,11 @@ func (j *JobManager) checkParseErrors(result JobWorkerResult) error { return nil } -func (j *JobManager) notify(jobJson []byte, exchange string) error { +func (j *JobManager) notify(jobJson []byte) error { return j.RabbitMQ.Notify( string(jobJson), "application/json", - exchange, + os.Getenv("RABBITMQ_NOTIFICATION_EX"), os.Getenv("RABBITMQ_NOTIFICATION_ROUTING_KEY"), ) } From 9fcc4cbe7e450890d7ec9e12e5edbff0e50be101 Mon Sep 17 00:00:00 2001 From: Lucas Martins dos Santos Date: Tue, 23 Jan 2024 11:46:02 -0300 Subject: [PATCH 3/4] fix: preventing race conditions --- application/services/job_manager.go | 3 +++ application/services/job_worker.go | 9 +++++++++ 2 files changed, 12 insertions(+) diff --git a/application/services/job_manager.go b/application/services/job_manager.go index cf90621..fdaca7f 100644 --- a/application/services/job_manager.go +++ b/application/services/job_manager.go @@ -70,7 +70,10 @@ func (j *JobManager) Start() { } func (j *JobManager) notifySuccess(result JobWorkerResult) error { + Mutex.Lock() jobJson, err := json.Marshal(result.Job) + Mutex.Unlock() + if err != nil { return err } diff --git a/application/services/job_worker.go b/application/services/job_worker.go index 3a5d99c..629ca4b 100644 --- a/application/services/job_worker.go +++ b/application/services/job_worker.go @@ -8,6 +8,7 @@ import ( uuid "github.com/satori/go.uuid" "github.com/streadway/amqp" "os" + "sync" ) type JobWorkerResult struct { @@ -16,6 +17,8 @@ type JobWorkerResult struct { Error error } +var Mutex = &sync.Mutex{} + func JobWorker(messageChannel chan amqp.Delivery, returnChan chan JobWorkerResult, jobService JobService) { for message := range messageChannel { if !utils.IsJson(string(message.Body)) { @@ -23,8 +26,10 @@ func JobWorker(messageChannel chan amqp.Delivery, returnChan chan JobWorkerResul continue } + Mutex.Lock() err := json.Unmarshal(message.Body, &jobService.VideoService.Video) jobService.VideoService.Video.ID = uuid.NewV4().String() + Mutex.Unlock() if err != nil { returnChan <- returnJobResult(domain.Job{}, message, err) @@ -37,7 +42,9 @@ func JobWorker(messageChannel chan amqp.Delivery, returnChan chan JobWorkerResul continue } + Mutex.Lock() err = jobService.VideoService.InsertVideo() + Mutex.Unlock() if err != nil { returnChan <- returnJobResult(domain.Job{}, message, err) @@ -51,7 +58,9 @@ func JobWorker(messageChannel chan amqp.Delivery, returnChan chan JobWorkerResul continue } + Mutex.Lock() _, err = jobService.JobsRepository.Insert(job) + Mutex.Unlock() if err != nil { returnChan <- returnJobResult(domain.Job{}, message, err) From a6e6b1c10c841bc0a5f1936cb781b24977613962 Mon Sep 17 00:00:00 2001 From: Lucas Martins dos Santos Date: Tue, 23 Jan 2024 22:37:48 -0300 Subject: [PATCH 4/4] refactor: remove gorm ORM and adopt raw sql instead --- application/repositories/job_repository.go | 64 ++++++++++++++--- .../repositories/job_repository_test.go | 20 ++++-- application/repositories/video_repository.go | 59 +++++++++++++--- .../repositories/video_repository_test.go | 46 +++++++++++++ application/services/job_manager.go | 6 +- application/services/video_service_test.go | 69 ++++++++++++++++--- domain/job.go | 4 +- domain/video.go | 4 +- framework/cmd/server/server.go | 13 ---- framework/database/db.go | 41 +++-------- 10 files changed, 244 insertions(+), 82 deletions(-) diff --git a/application/repositories/job_repository.go b/application/repositories/job_repository.go index 3511102..e29f3bd 100644 --- a/application/repositories/job_repository.go +++ b/application/repositories/job_repository.go @@ -1,8 +1,7 @@ package repositories import ( - "fmt" - "github.com/jinzhu/gorm" + "database/sql" "github.com/olukkas/go-encoder/domain" ) @@ -13,37 +12,84 @@ type JobRepository interface { } type JobRepositoryDb struct { - DB *gorm.DB + DB *sql.DB } -func NewJobRepositoryDb(DB *gorm.DB) *JobRepositoryDb { +func NewJobRepositoryDb(DB *sql.DB) *JobRepositoryDb { return &JobRepositoryDb{DB: DB} } +//goland:noinspection SqlResolve,SqlNoDataSourceInspection func (j *JobRepositoryDb) Insert(job *domain.Job) (*domain.Job, error) { - if err := j.DB.Create(job).Error; err != nil { + query := ` + insert into jobs + (id, output_bucket_path, status, video_id, error, created_at, updated_at) + values + ($1, $2, $3, $4, $5, $6, $7) + ` + stmt, err := j.DB.Prepare(query) + if err != nil { + return nil, err + } + defer stmt.Close() + + _, err = stmt.Exec(job.ID, job.OutputBucketPath, job.Status, job.VideoId, job.Error, job.CreatedAt, job.UpdatedAt) + if err != nil { return nil, err } return job, nil } +//goland:noinspection SqlResolve,SqlNoDataSourceInspection func (j *JobRepositoryDb) Find(id string) (*domain.Job, error) { job := new(domain.Job) - j.DB.Preload("Video").First(job, "id = ?", id) - if job.ID == "" { - return nil, fmt.Errorf("job with id %s does not exists \n", id) + query := `select id, output_bucket_path, status, video_id, error, created_at, updated_at + from jobs + where id = ? + ` + err := j.DB.QueryRow(query, id). + Scan(&job.ID, &job.OutputBucketPath, &job.Status, &job.VideoId, &job.Error, &job.CreatedAt, &job.UpdatedAt) + if err != nil { + return nil, err + } + + job.Video, err = j.getJobVideo(job.VideoId) + if err != nil { + return nil, err } return job, nil } +//goland:noinspection SqlResolve,SqlNoDataSourceInspection func (j *JobRepositoryDb) Update(job *domain.Job) (*domain.Job, error) { - err := j.DB.Save(&job).Error + query := `update jobs set status = $1, updated_at = $2, error = $3 where id = $4 ` + + stmt, err := j.DB.Prepare(query) + if err != nil { + return nil, err + } + defer stmt.Close() + + _, err = stmt.Exec(job.Status, job.UpdatedAt, job.Error, job.ID) if err != nil { return nil, err } return job, nil } + +//goland:noinspection SqlResolve,SqlNoDataSourceInspection +func (j *JobRepositoryDb) getJobVideo(videoId string) (*domain.Video, error) { + var video domain.Video + + videoQuery := `select id, resource_id, file_path, created_at from videos where id = ?` + err := j.DB.QueryRow(videoQuery, videoId).Scan(&video.ID, &video.ResourceID, &video.FilePath, &video.CreatedAt) + if err != nil { + return nil, err + } + + return &video, nil +} diff --git a/application/repositories/job_repository_test.go b/application/repositories/job_repository_test.go index d9b2c30..a31f47d 100644 --- a/application/repositories/job_repository_test.go +++ b/application/repositories/job_repository_test.go @@ -1,7 +1,7 @@ package repositories_test import ( - "github.com/jinzhu/gorm" + "database/sql" "github.com/olukkas/go-encoder/application/repositories" "github.com/olukkas/go-encoder/domain" "github.com/olukkas/go-encoder/framework/database" @@ -13,11 +13,17 @@ func TestJobRepositoryDb_Insert(t *testing.T) { db := database.NewDataBaseTest() defer db.Close() + err := createVideosTable(db) + require.Nil(t, err) + + err = createJobsTable(db) + require.Nil(t, err) + jobRepo := repositories.NewJobRepositoryDb(db) job := prepareJobHelper(t, db) - _, err := jobRepo.Insert(job) + _, err = jobRepo.Insert(job) require.Nil(t, err) j, err := jobRepo.Find(job.ID) @@ -31,10 +37,16 @@ func TestJobRepositoryDb_Update(t *testing.T) { db := database.NewDataBaseTest() defer db.Close() + err := createVideosTable(db) + require.Nil(t, err) + + err = createJobsTable(db) + require.Nil(t, err) + jobRepo := repositories.NewJobRepositoryDb(db) job := prepareJobHelper(t, db) - _, err := jobRepo.Insert(job) + _, err = jobRepo.Insert(job) require.Nil(t, err) job.Status = domain.JobDownloading @@ -43,7 +55,7 @@ func TestJobRepositoryDb_Update(t *testing.T) { require.Equal(t, domain.JobDownloading, updated.Status) } -func prepareJobHelper(t *testing.T, db *gorm.DB) *domain.Job { +func prepareJobHelper(t *testing.T, db *sql.DB) *domain.Job { video, err := domain.NewVideo("resource", "path") require.Nil(t, err) require.NotNil(t, video) diff --git a/application/repositories/video_repository.go b/application/repositories/video_repository.go index fa1606a..338276f 100644 --- a/application/repositories/video_repository.go +++ b/application/repositories/video_repository.go @@ -1,8 +1,7 @@ package repositories import ( - "fmt" - "github.com/jinzhu/gorm" + "database/sql" "github.com/olukkas/go-encoder/domain" uuid "github.com/satori/go.uuid" ) @@ -13,19 +12,27 @@ type VideoRepository interface { } type VideoRepositoryDb struct { - DB *gorm.DB + DB *sql.DB } -func NewVideoRepositoryDb(DB *gorm.DB) *VideoRepositoryDb { +func NewVideoRepositoryDb(DB *sql.DB) *VideoRepositoryDb { return &VideoRepositoryDb{DB: DB} } +//goland:noinspection SqlNoDataSourceInspection,SqlResolve func (v *VideoRepositoryDb) Insert(video *domain.Video) (*domain.Video, error) { if video.ID == "" { video.ID = uuid.NewV4().String() } - err := v.DB.Create(video).Error + query := "insert into videos (id, resource_id, file_path, created_at) values ($1, $2, $3, $4);" + stmt, err := v.DB.Prepare(query) + if err != nil { + return nil, err + } + defer stmt.Close() + + _, err = stmt.Exec(video.ID, video.ResourceID, video.FilePath, video.CreatedAt) if err != nil { return nil, err } @@ -33,13 +40,49 @@ func (v *VideoRepositoryDb) Insert(video *domain.Video) (*domain.Video, error) { return video, nil } +//goland:noinspection SqlResolve,SqlNoDataSourceInspection func (v *VideoRepositoryDb) Find(id string) (*domain.Video, error) { var video domain.Video - v.DB.Preloads("Jobs").First(&video, "id = ?", id) - if video.ID == "" { - return nil, fmt.Errorf("video with id %s does not exists \n", id) + videoQuery := `select id, resource_id, file_path, created_at from videos where id = ?` + err := v.DB.QueryRow(videoQuery, id).Scan(&video.ID, &video.ResourceID, &video.FilePath, &video.CreatedAt) + if err != nil { + return nil, err + } + + video.Jobs, err = v.getVideoJobs(&video) + if err != nil { + return nil, err } return &video, nil } + +//goland:noinspection SqlNoDataSourceInspection,GoConvertStringLiterals,SqlResolve +func (v *VideoRepositoryDb) getVideoJobs(video *domain.Video) ([]*domain.Job, error) { + var jobs []*domain.Job + + jobsQuery := ` + select id, output_bucket_path, status, video_id, error, created_at, updated_at + from jobs where video_id = ? + ` + + rows, err := v.DB.Query(jobsQuery, video.ID) + if err != nil { + return nil, err + } + defer rows.Close() + + for rows.Next() { + var job domain.Job + err = rows.Scan(&job.ID, &job.OutputBucketPath, &job.Status, &job.Error, &job.CreatedAt, &job.UpdatedAt) + if err != nil { + return nil, err + } + + job.Video = video + jobs = append(jobs, &job) + } + + return jobs, err +} diff --git a/application/repositories/video_repository_test.go b/application/repositories/video_repository_test.go index c4f54fd..8fbb2a2 100644 --- a/application/repositories/video_repository_test.go +++ b/application/repositories/video_repository_test.go @@ -1,6 +1,7 @@ package repositories_test import ( + "database/sql" "github.com/olukkas/go-encoder/application/repositories" "github.com/olukkas/go-encoder/domain" "github.com/olukkas/go-encoder/framework/database" @@ -12,6 +13,12 @@ func TestVideoRepositoryDB_Insert(t *testing.T) { db := database.NewDataBaseTest() defer db.Close() + err := createVideosTable(db) + require.Nil(t, err) + + err = createJobsTable(db) + require.Nil(t, err) + video, err := domain.NewVideo("resource", "path") require.Nil(t, err) require.NotNil(t, video) @@ -25,3 +32,42 @@ func TestVideoRepositoryDB_Insert(t *testing.T) { require.NotEmpty(t, v.ID) require.Equal(t, v.ID, video.ID) } + +//goland:noinspection SqlNoDataSourceInspection +func createVideosTable(db *sql.DB) error { + ddl := ` + create table videos ( + id text, + resource_id text, + file_path text, + created_at date + ) + ` + _, err := db.Exec(ddl) + if err != nil { + return err + } + + return nil +} + +//goland:noinspection SqlNoDataSourceInspection +func createJobsTable(db *sql.DB) error { + ddl := ` + create table jobs ( + id text, + output_bucket_path text, + status text, + video_id text, + error text, + created_at date, + updated_at date + ) + ` + _, err := db.Exec(ddl) + if err != nil { + return err + } + + return nil +} diff --git a/application/services/job_manager.go b/application/services/job_manager.go index fdaca7f..bf18b3a 100644 --- a/application/services/job_manager.go +++ b/application/services/job_manager.go @@ -1,8 +1,8 @@ package services import ( + "database/sql" "encoding/json" - "github.com/jinzhu/gorm" "github.com/olukkas/go-encoder/application/repositories" "github.com/olukkas/go-encoder/framework/queue" "github.com/streadway/amqp" @@ -12,7 +12,7 @@ import ( ) type JobManager struct { - DB *gorm.DB + DB *sql.DB MessageChannel chan amqp.Delivery ReturnChannel chan JobWorkerResult RabbitMQ *queue.RabbitMQ @@ -24,7 +24,7 @@ type JobNotificationError struct { } func NewJobManager( - DB *gorm.DB, + DB *sql.DB, rabbitMq *queue.RabbitMQ, returnChannel chan JobWorkerResult, messageChannel chan amqp.Delivery, diff --git a/application/services/video_service_test.go b/application/services/video_service_test.go index 50aa303..7b5f396 100644 --- a/application/services/video_service_test.go +++ b/application/services/video_service_test.go @@ -1,11 +1,13 @@ package services_test import ( + "database/sql" "github.com/joho/godotenv" "github.com/olukkas/go-encoder/application/repositories" "github.com/olukkas/go-encoder/application/services" "github.com/olukkas/go-encoder/domain" "github.com/olukkas/go-encoder/framework/database" + "github.com/olukkas/go-encoder/framework/utils" uuid "github.com/satori/go.uuid" "github.com/stretchr/testify/require" "log" @@ -19,17 +21,6 @@ func init() { } } -func prepare() (*domain.Video, repositories.VideoRepository) { - db := database.NewDataBaseTest() - defer db.Close() - - return &domain.Video{ - ID: uuid.NewV4().String(), - FilePath: "file.mp4", // make sure the file exists - CreatedAt: time.Now(), - }, repositories.NewVideoRepositoryDb(db) -} - func TestVideoService(t *testing.T) { video, repo := prepare() @@ -49,3 +40,59 @@ func TestVideoService(t *testing.T) { err = videoService.Finish() require.Nil(t, err) } + +func prepare() (*domain.Video, repositories.VideoRepository) { + db := database.NewDataBaseTest() + defer db.Close() + + err := createVideosTable(db) + utils.FailOnError(err, "error on create videos table") + + err = createJobsTable(db) + utils.FailOnError(err, "error on create jobs table") + + return &domain.Video{ + ID: uuid.NewV4().String(), + FilePath: "file.mp4", // make sure the file exists + CreatedAt: time.Now(), + }, repositories.NewVideoRepositoryDb(db) +} + +//goland:noinspection SqlNoDataSourceInspection +func createVideosTable(db *sql.DB) error { + ddl := ` + create table videos ( + id text, + resource_id text, + file_path text, + created_at date + ) + ` + _, err := db.Exec(ddl) + if err != nil { + return err + } + + return nil +} + +//goland:noinspection SqlNoDataSourceInspection +func createJobsTable(db *sql.DB) error { + ddl := ` + create table jobs ( + id text, + output_bucket_path text, + status text, + video_id text, + error text, + created_at date, + updated_at date + ) + ` + _, err := db.Exec(ddl) + if err != nil { + return err + } + + return nil +} diff --git a/domain/job.go b/domain/job.go index 08bb57f..847ac60 100644 --- a/domain/job.go +++ b/domain/job.go @@ -25,11 +25,11 @@ const ( ) type Job struct { - ID string `json:"job_id" valid:"uuid" gorm:"type:uuid;primary_key"` + ID string `json:"job_id" valid:"uuid"` OutputBucketPath string `json:"output_bucket_path" valid:"notnull"` Status JobStatus `json:"status" valid:"notnull"` Video *Video `json:"video" valid:"-"` - VideoId string `json:"-" valid:"-" gorm:"column:video_id;type:uuid;notnull"` + VideoId string `json:"-" valid:"-"` Error string `json:"-" valid:"-"` CreatedAt time.Time `json:"created_at" valid:"-"` UpdatedAt time.Time `json:"updated_at" valid:"-"` diff --git a/domain/video.go b/domain/video.go index fa73b47..c7a64fe 100644 --- a/domain/video.go +++ b/domain/video.go @@ -11,11 +11,11 @@ func init() { } type Video struct { - ID string `json:"encoded_vide_folder" valid:"uuid" gorm:"type:uuid;primary_key"` + ID string `json:"encoded_vide_folder" valid:"uuid"` ResourceID string `json:"resource_id" valid:"notnull"` FilePath string `json:"file_path" valid:"notnull"` CreatedAt time.Time `json:"-" valid:"-"` - Jobs []*Job `json:"-" valid:"-" gorm:"ForeignKey:VideoId"` + Jobs []*Job `json:"-" valid:"-"` } func NewVideo(resourceID, filePath string) (*Video, error) { diff --git a/framework/cmd/server/server.go b/framework/cmd/server/server.go index 2205368..74edb6e 100644 --- a/framework/cmd/server/server.go +++ b/framework/cmd/server/server.go @@ -8,7 +8,6 @@ import ( "github.com/olukkas/go-encoder/framework/utils" "github.com/streadway/amqp" "os" - "strconv" ) var db database.DataBase @@ -22,18 +21,6 @@ func init() { } func configDb() error { - autoMigrateDb, err := strconv.ParseBool(os.Getenv("AUTO_MIGRATE_DB")) - if err != nil { - return err - } - - debug, err := strconv.ParseBool(os.Getenv("DEBUG")) - if err != nil { - return err - } - - db.AutoMigrate = autoMigrateDb - db.Debug = debug db.Dsn = os.Getenv("DSN") db.DbType = os.Getenv("DB_TYPE") db.Env = os.Getenv("ENV") diff --git a/framework/database/db.go b/framework/database/db.go index 19bfa1c..2ea0983 100644 --- a/framework/database/db.go +++ b/framework/database/db.go @@ -1,33 +1,24 @@ package database import ( - "github.com/jinzhu/gorm" + "database/sql" _ "github.com/jinzhu/gorm/dialects/sqlite" _ "github.com/lib/pq" - "github.com/olukkas/go-encoder/domain" "log" ) type DataBase struct { - Db *gorm.DB - Env string - Dsn string - Debug bool - DbType string - AutoMigrate bool + Db *sql.DB + Env string + Dsn string + DbType string } -func NewDataBase() *DataBase { - return &DataBase{} -} - -func NewDataBaseTest() *gorm.DB { +func NewDataBaseTest() *sql.DB { db := DataBase{ - Env: "test", - DbType: "sqlite3", - Dsn: ":memory:", - AutoMigrate: true, - Debug: true, + Env: "test", + DbType: "sqlite3", + Dsn: ":memory:", } conn, err := db.Connect() @@ -38,23 +29,13 @@ func NewDataBaseTest() *gorm.DB { return conn } -func (d *DataBase) Connect() (*gorm.DB, error) { +func (d *DataBase) Connect() (*sql.DB, error) { var err error - d.Db, err = gorm.Open(d.DbType, d.Dsn) + d.Db, err = sql.Open(d.DbType, d.Dsn) if err != nil { return nil, err } - if d.Debug { - d.Db.LogMode(true) - } - - if d.AutoMigrate { - d.Db.AutoMigrate(&domain.Video{}, &domain.Job{}) - d.Db.Model(domain.Job{}). - AddForeignKey("video_id", "videos (id)", "CASCADE", "CASCADE") - } - return d.Db, nil }