diff --git a/backends/postgres/postgres_backend.go b/backends/postgres/postgres_backend.go
index 048e1a5..e476ff8 100644
--- a/backends/postgres/postgres_backend.go
+++ b/backends/postgres/postgres_backend.go
@@ -31,21 +31,20 @@ import (
 var migrationsFS embed.FS
 
 const (
-	PendingJobIDQuery = `SELECT id
+	JobQuery = `SELECT id,fingerprint,queue,status,deadline,payload,retries,max_retries,run_after,ran_at,created_at,error
 		FROM neoq_jobs
-		WHERE queue = $1
+		WHERE id = $1
 		AND status NOT IN ('processed')
-		AND run_after <= NOW()
 		FOR UPDATE SKIP LOCKED
 		LIMIT 1`
-	PendingJobQuery = `SELECT id,fingerprint,queue,status,deadline,payload,retries,max_retries,run_after,ran_at,created_at,error
+	PendingJobIDQuery = `SELECT id
 		FROM neoq_jobs
-		WHERE id = $1
+		WHERE queue = $1
 		AND status NOT IN ('processed')
 		AND run_after <= NOW()
 		FOR UPDATE SKIP LOCKED
 		LIMIT 1`
-	FutureJobQuery = `SELECT id,run_after
+	FutureJobQuery = `SELECT id,fingerprint,queue,status,deadline,payload,retries,max_retries,run_after,ran_at,created_at,error
 		FROM neoq_jobs
 		WHERE queue = $1
 		AND status NOT IN ('processed')
@@ -75,7 +74,7 @@ type PgBackend struct {
 	cron        *cron.Cron
 	mu          *sync.RWMutex              // mutex to protect mutating state on a pgWorker
 	pool        *pgxpool.Pool
-	futureJobs  map[string]time.Time       // map of future job IDs to their due time
+	futureJobs  map[string]*jobs.Job       // map of future job IDs to the corresponding job record
 	handlers    map[string]handler.Handler // a map of queue names to queue handlers
 	cancelFuncs []context.CancelFunc       // A collection of cancel functions to be called upon Shutdown()
 }
@@ -112,7 +111,7 @@ func Backend(ctx context.Context, opts ...neoq.ConfigOption) (pb neoq.Neoq, err
 		mu:          &sync.RWMutex{},
 		config:      cfg,
 		handlers:    make(map[string]handler.Handler),
-		futureJobs:  make(map[string]time.Time),
+		futureJobs:  make(map[string]*jobs.Job),
 		cron:        cron.New(),
 		cancelFuncs: []context.CancelFunc{},
 	}
@@ -322,7 +321,7 @@ func (p *PgBackend) Enqueue(ctx context.Context, job *jobs.Job) (jobID string, e
 	// add future jobs to the future job list
 	if job.RunAfter.After(time.Now().UTC()) {
 		p.mu.Lock()
-		p.futureJobs[jobID] = job.RunAfter
+		p.futureJobs[jobID] = job
 		p.mu.Unlock()
 		p.logger.Debug("added job to future jobs list", "queue", job.Queue, "job_id", jobID, "run_after", job.RunAfter)
 	}
@@ -511,7 +510,7 @@ func (p *PgBackend) updateJob(ctx context.Context, jobErr error) (err error) {
 
 	if time.Until(runAfter) > 0 {
 		p.mu.Lock()
-		p.futureJobs[fmt.Sprint(job.ID)] = runAfter
+		p.futureJobs[fmt.Sprint(job.ID)] = job
 		p.mu.Unlock()
 	}
 
@@ -579,14 +578,16 @@ func (p *PgBackend) initFutureJobs(ctx context.Context, queue string) (err error
 		return
 	}
 
-	var id string
-	var runAfter time.Time
-	_, err = pgx.ForEachRow(rows, []any{&id, &runAfter}, func() error {
+	futureJobs, err := pgx.CollectRows(rows, pgx.RowToAddrOfStructByName[jobs.Job])
+	if err != nil {
+		return
+	}
+
+	for _, job := range futureJobs {
 		p.mu.Lock()
-		p.futureJobs[id] = runAfter
+		p.futureJobs[fmt.Sprintf("%d", job.ID)] = job
 		p.mu.Unlock()
-		return nil
-	})
+	}
 
 	return
 }
@@ -604,15 +605,15 @@ func (p *PgBackend) scheduleFutureJobs(ctx context.Context, queue string) {
 	for {
 		// loop over list of future jobs, scheduling goroutines to wait for jobs that are due within the next 30 seconds
 		p.mu.Lock()
-		for jobID, runAfter := range p.futureJobs {
-			timeUntillRunAfter := time.Until(runAfter)
+		for jobID, job := range p.futureJobs {
+			timeUntillRunAfter := time.Until(job.RunAfter)
 			if timeUntillRunAfter <= p.config.FutureJobWindow {
 				delete(p.futureJobs, jobID)
-				go func(jid string) {
+				go func(jid string, j *jobs.Job) {
 					scheduleCh := time.After(timeUntillRunAfter)
 					<-scheduleCh
-					p.announceJob(ctx, queue, jid)
-				}(jobID)
+					p.announceJob(ctx, j.Queue, jid)
+				}(jobID, job)
 			}
 		}
 		p.mu.Unlock()
@@ -708,7 +709,7 @@
 	}
 	defer func(ctx context.Context) { _ = tx.Rollback(ctx) }(ctx) // rollback has no effect if the transaction has been committed
 
-	job, err = p.getPendingJob(ctx, tx, jobID)
+	job, err = p.getJob(ctx, tx, jobID)
 	if err != nil {
 		return
 	}
@@ -815,8 +816,8 @@ func (p *PgBackend) release(ctx context.Context, conn *pgxpool.Conn, queue strin
 	conn.Release()
 }
 
-func (p *PgBackend) getPendingJob(ctx context.Context, tx pgx.Tx, jobID string) (job *jobs.Job, err error) {
-	row, err := tx.Query(ctx, PendingJobQuery, jobID)
+func (p *PgBackend) getJob(ctx context.Context, tx pgx.Tx, jobID string) (job *jobs.Job, err error) {
+	row, err := tx.Query(ctx, JobQuery, jobID)
 	if err != nil {
 		return
 	}
diff --git a/backends/postgres/postgres_backend_test.go b/backends/postgres/postgres_backend_test.go
index df5c8a2..57bc0e3 100644
--- a/backends/postgres/postgres_backend_test.go
+++ b/backends/postgres/postgres_backend_test.go
@@ -192,8 +192,8 @@ func TestMultipleProcessors(t *testing.T) {
 
 	// From one of the neoq clients, enqueue several jobs. At least one per processor registered above.
 	nq := neos[0]
+	wg.Add(ConcurrentWorkers)
 	for i := 0; i < ConcurrentWorkers; i++ {
-		wg.Add(1)
 		ctx := context.Background()
 		deadline := time.Now().UTC().Add(10 * time.Second)
 		jid, e := nq.Enqueue(ctx, &jobs.Job{
@@ -498,6 +498,7 @@ results_loop:
 }
 
 // Test_MoveJobsToDeadQueue tests that when a job's MaxRetries is reached, the job is moved to the dead queue successfully
+// https://github.com/acaloiaro/neoq/issues/98
 func Test_MoveJobsToDeadQueue(t *testing.T) {
 	connString, conn := prepareAndCleanupDB(t)
 	const queue = "testing"
@@ -639,3 +640,101 @@ func TestJobEnqueuedSeparately(t *testing.T) {
 		t.Error(err)
 	}
 }
+
+// TestBasicJobMultipleQueueWithError tests that the postgres backend is able to process jobs on multiple queues
+// and that retries occur
+// https://github.com/acaloiaro/neoq/issues/98
+// nolint: gocyclo
+func TestBasicJobMultipleQueueWithError(t *testing.T) {
+	connString, conn := prepareAndCleanupDB(t)
+	const queue = "testing"
+	const queue2 = "testing2"
+
+	ctx := context.TODO()
+	nq, err := neoq.New(ctx,
+		neoq.WithBackend(postgres.Backend),
+		neoq.WithLogLevel(logging.LogLevelDebug),
+		postgres.WithConnectionString(connString))
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer nq.Shutdown(ctx)
+
+	h := handler.New(queue, func(_ context.Context) (err error) {
+		return
+	})
+
+	h2 := handler.New(queue2, func(_ context.Context) (err error) {
+		panic("no good")
+	})
+
+	err = nq.Start(ctx, h)
+	if err != nil {
+		t.Error(err)
+	}
+
+	err = nq.Start(ctx, h2)
+	if err != nil {
+		t.Error(err)
+	}
+
+	job2Chan := make(chan string, 100)
+	go func() {
+		jid, err := nq.Enqueue(ctx, &jobs.Job{
+			Queue: queue,
+			Payload: map[string]interface{}{
+				"message": fmt.Sprintf("should not fail: %d", internal.RandInt(10000000000)),
+			},
+		})
+		if err != nil || jid == jobs.DuplicateJobID {
+			t.Error(err)
+		}
+
+		maxRetries := 1
+		jid2, err := nq.Enqueue(ctx, &jobs.Job{
+			Queue: queue2,
+			Payload: map[string]interface{}{
+				"message": fmt.Sprintf("should fail: %d", internal.RandInt(10000000000)),
+			},
+			MaxRetries: &maxRetries,
+		})
+		if err != nil || jid2 == jobs.DuplicateJobID {
+			t.Error(err)
+		}
+
+		job2Chan <- jid2
+	}()
+
+	// wait for the second job to be enqueued before polling for status updates
+	jid2 := <-job2Chan
+
+	// poll the dead jobs table until the failed job arrives or the wait times out
+	maxWait := time.Now().Add(30 * time.Second)
+	var status string
+	for {
+		if time.Now().After(maxWait) {
+			break
+		}
+
+		err = conn.
+			QueryRow(context.Background(), "SELECT status FROM neoq_dead_jobs WHERE id = $1", jid2).
+			Scan(&status)
+
+		if err == nil {
+			break
+		}
+
+		// jid2 is empty if enqueueing failed; ErrNoRows means the job has not reached the dead queue yet
+		if jid2 == "" || errors.Is(err, pgx.ErrNoRows) {
+			time.Sleep(100 * time.Millisecond)
+			continue
+		} else if err != nil {
+			t.Error(err)
+			return
+		}
+	}
+
+	if status != internal.JobStatusFailed {
+		t.Error("job should have been moved to the dead queue")
+	}
+}
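
Reviewer note: the core behavioral change above is that futureJobs now stores the full *jobs.Job rather than only its run_after timestamp, so scheduleFutureJobs can announce a due job on the job's own queue (j.Queue) instead of the queue whose scheduler loop happened to pick it up; presumably this is what keeps multi-queue future jobs from being announced on the wrong queue. Below is a minimal, self-contained Go sketch of that pattern. It is illustrative only: the Job struct, scheduler type, window field, and announce callback here are simplified stand-ins for neoq's jobs.Job, PgBackend, config.FutureJobWindow, and announceJob, not the real API.

package main

import (
	"fmt"
	"sync"
	"time"
)

// Job is a simplified stand-in for jobs.Job; only the fields this
// sketch needs are included.
type Job struct {
	ID       int64
	Queue    string
	RunAfter time.Time
}

// scheduler mirrors the futureJobs bookkeeping in this diff: the map
// holds full job records keyed by job ID, not just due times.
type scheduler struct {
	mu         sync.Mutex
	futureJobs map[string]*Job
	window     time.Duration // stand-in for config.FutureJobWindow
}

// schedule walks the future-job list and, for every job due within the
// window, removes it from the map and spawns a goroutine that sleeps
// until the job's RunAfter and then announces it on the job's own queue.
func (s *scheduler) schedule(announce func(queue, jobID string), done *sync.WaitGroup) {
	s.mu.Lock()
	defer s.mu.Unlock()
	for jobID, job := range s.futureJobs {
		untilRunAfter := time.Until(job.RunAfter)
		if untilRunAfter <= s.window {
			delete(s.futureJobs, jobID)
			go func(jid string, j *Job) {
				defer done.Done()
				<-time.After(untilRunAfter)
				// The point of carrying *Job: announce on j.Queue, not on
				// whatever queue the scheduling loop was started for.
				announce(j.Queue, jid)
			}(jobID, job)
		}
	}
}

func main() {
	s := &scheduler{
		futureJobs: map[string]*Job{
			"1": {ID: 1, Queue: "testing", RunAfter: time.Now().Add(10 * time.Millisecond)},
			"2": {ID: 2, Queue: "testing2", RunAfter: time.Now().Add(20 * time.Millisecond)},
		},
		window: 30 * time.Second,
	}

	var wg sync.WaitGroup
	wg.Add(len(s.futureJobs))
	s.schedule(func(queue, jobID string) {
		fmt.Printf("announcing job %s on queue %q\n", jobID, queue)
	}, &wg)
	wg.Wait()
}

Running the sketch announces each job on its own queue ("testing" and "testing2"), which matches what the new TestBasicJobMultipleQueueWithError expects when its two handlers each receive only their own queue's jobs.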