From b4018091900cb48abbb6ec2867e475e3186d23b9 Mon Sep 17 00:00:00 2001 From: Adriano Caloiaro Date: Wed, 27 Mar 2024 12:06:13 -0600 Subject: [PATCH] feat: fix infinite scheduling loop when job gets scheduled after deadline --- backends/postgres/postgres_backend.go | 17 ++++++++-- backends/postgres/postgres_backend_test.go | 39 +++++++++++++++++++--- 2 files changed, 49 insertions(+), 7 deletions(-) diff --git a/backends/postgres/postgres_backend.go b/backends/postgres/postgres_backend.go index 8826329..36774dd 100644 --- a/backends/postgres/postgres_backend.go +++ b/backends/postgres/postgres_backend.go @@ -70,9 +70,11 @@ var ( shutdownJobID = "-1" // job ID announced when triggering a shutdown shutdownAnnouncementAllowance = 100 // ms ErrCnxString = errors.New("invalid connecton string: see documentation for valid connection strings") + ErrConnectionStringEmpty = errors.New("connection string cannot be empty") ErrDuplicateJob = errors.New("duplicate job") ErrNoTransactionInContext = errors.New("context does not have a Tx set") ErrExceededConnectionPoolTimeout = errors.New("exceeded timeout acquiring a connection from the pool") + ErrUnsupportedURIScheme = errors.New("only postgres:// and postgresql:// scheme URIs are supported, invalid connection string") ) // PgBackend is a Postgres-based Neoq backend @@ -855,6 +857,17 @@ func (p *PgBackend) handleJob(ctx context.Context, jobID string) (err error) { err = jobs.ErrJobExceededDeadline p.logger.Debug("job deadline is in the past, skipping", slog.String("queue", job.Queue), slog.Int64("job_id", job.ID)) err = p.updateJob(ctx, err) + if err != nil { + p.logger.Error("unable to update job status", "error", err, "job_id", job.ID) + return + } + + err = tx.Commit(ctx) + if err != nil { + p.logger.Error("unable to update job status", "error", err, "job_id", job.ID) + return + } + return } @@ -1027,7 +1040,7 @@ func GetPQConnectionString(connectionString string) (string, error) { } if dbURI.String() == "" { - return "", fmt.Errorf("connection string cannot be empty") + return "", ErrConnectionStringEmpty } scheme := dbURI.Scheme @@ -1038,7 +1051,7 @@ func GetPQConnectionString(connectionString string) (string, error) { if scheme != "postgres" && scheme != "postgresql" { // This isn't a postgresql URI-style string (postgres://hostname/db) - return "", fmt.Errorf("only postgres and postgresql scheme URIs are supported, invalid connection string: %s", connectionString) + return "", ErrUnsupportedURIScheme } sslMode := "verify-ca" diff --git a/backends/postgres/postgres_backend_test.go b/backends/postgres/postgres_backend_test.go index 63d4540..9a8ee48 100644 --- a/backends/postgres/postgres_backend_test.go +++ b/backends/postgres/postgres_backend_test.go @@ -906,7 +906,7 @@ func TestGetPQConnectionString(t *testing.T) { { name: "multiple hostnames", input: "postgres://username:password@hostname1,hostname2,hostname3:5432/database", - want: "postgres://username:password@hostname1,hostname2,hostname3:5432/database?sslmode=require&x-migrations-table=neoq_schema_migrations", + want: "postgres://username:password@hostname1,hostname2,hostname3:5432/database?sslmode=require&x-migrations-table=neoq_schema_migrations", // nolint: lll wantErr: false, }, @@ -1004,8 +1004,9 @@ func TestGetPQConnectionString(t *testing.T) { // with an error indicating that its deadline was not met // https://github.com/acaloiaro/neoq/issues/123 func TestJobWithPastDeadline(t *testing.T) { - connString, _ := prepareAndCleanupDB(t) + connString, conn := prepareAndCleanupDB(t) const queue = "testing" + timeoutTimer := time.After(5 * time.Second) maxRetries := 5 done := make(chan bool) defer close(done) @@ -1018,7 +1019,6 @@ func TestJobWithPastDeadline(t *testing.T) { defer nq.Shutdown(ctx) h := handler.New(queue, func(_ context.Context) (err error) { - done <- true return }) @@ -1038,10 +1038,39 @@ func TestJobWithPastDeadline(t *testing.T) { MaxRetries: &maxRetries, }) if e != nil || jid == jobs.DuplicateJobID { - t.Error(e) + t.Error(e) // nolint: goerr113 } if e != nil && !errors.Is(e, jobs.ErrJobExceededDeadline) { - t.Error(err) + t.Error(err) // nolint: goerr113 + } + + var status string + go func() { + // ensure job has failed/has the correct status + for { + err = conn. + QueryRow(context.Background(), "SELECT status FROM neoq_jobs WHERE id = $1", jid). + Scan(&status) + if err != nil { + break + } + + if status == internal.JobStatusFailed { + done <- true + break + } + + time.Sleep(50 * time.Millisecond) + } + }() + + select { + case <-timeoutTimer: + err = jobs.ErrJobTimeout + case <-done: + } + if err != nil { + t.Errorf("job should have resulted in a status of 'failed', but its status is %s", status) } }