diff --git a/internal/daemon/controller/controller.go b/internal/daemon/controller/controller.go index 3ac24f483e..65b2bdb4ce 100644 --- a/internal/daemon/controller/controller.go +++ b/internal/daemon/controller/controller.go @@ -45,7 +45,6 @@ import ( "github.com/hashicorp/boundary/internal/ratelimit" "github.com/hashicorp/boundary/internal/recording" "github.com/hashicorp/boundary/internal/scheduler" - "github.com/hashicorp/boundary/internal/scheduler/cleaner" "github.com/hashicorp/boundary/internal/scheduler/job" "github.com/hashicorp/boundary/internal/server" serversjob "github.com/hashicorp/boundary/internal/server/job" @@ -397,8 +396,8 @@ func New(ctx context.Context, conf *Config) (*Controller, error) { jobRepoFn := func() (*job.Repository, error) { return job.NewRepository(ctx, dbase, dbase, c.kms) } - // TODO: Allow setting run jobs limit from config - schedulerOpts := []scheduler.Option{scheduler.WithRunJobsLimit(-1)} + + schedulerOpts := []scheduler.Option{} if c.conf.RawConfig.Controller.Scheduler.JobRunIntervalDuration > 0 { schedulerOpts = append(schedulerOpts, scheduler.WithRunJobsInterval(c.conf.RawConfig.Controller.Scheduler.JobRunIntervalDuration)) } @@ -637,9 +636,6 @@ func (c *Controller) registerJobs() error { if err := kmsjob.RegisterJobs(c.baseContext, c.scheduler, c.kms); err != nil { return err } - if err := cleaner.RegisterJob(c.baseContext, c.scheduler, rw); err != nil { - return err - } if err := snapshot.RegisterJob(c.baseContext, c.scheduler, rw, rw); err != nil { return err } diff --git a/internal/db/schema/migrations/oss/postgres/7/03_job.up.sql b/internal/db/schema/migrations/oss/postgres/7/03_job.up.sql index 472762ca49..cf7cb29de5 100644 --- a/internal/db/schema/migrations/oss/postgres/7/03_job.up.sql +++ b/internal/db/schema/migrations/oss/postgres/7/03_job.up.sql @@ -20,6 +20,7 @@ begin; create trigger immutable_columns before update on job for each row execute procedure immutable_columns('plugin_id', 'name'); + -- updated in 93/01_job_run_clean.up.sql create table job_run_status_enm ( name text not null primary key constraint only_predefined_job_status_allowed @@ -28,6 +29,7 @@ begin; comment on table job_run_status_enm is 'job_run_status_enm is an enumeration table where each row contains a valid job run state.'; + -- updated in 93/01_job_run_clean.up.sql insert into job_run_status_enm (name) values ('running'), @@ -84,6 +86,7 @@ begin; create trigger immutable_columns before update on job_run for each row execute procedure immutable_columns('private_id', 'job_plugin_id', 'job_name', 'create_time'); + -- dropped in 93/02_drop_job_jobs_to_run.up.sql create view job_jobs_to_run as with running_jobs (job_plugin_id, job_name) as ( diff --git a/internal/db/schema/migrations/oss/postgres/93/01_job_run_clean.up.sql b/internal/db/schema/migrations/oss/postgres/93/01_job_run_clean.up.sql new file mode 100644 index 0000000000..d09a197775 --- /dev/null +++ b/internal/db/schema/migrations/oss/postgres/93/01_job_run_clean.up.sql @@ -0,0 +1,37 @@ +-- Copyright (c) HashiCorp, Inc. +-- SPDX-License-Identifier: BUSL-1.1 + +-- Boundary's design on removing entries from job_run has changed from having a +-- job that periodically cleans the table to a design where the scheduler +-- handles this by itself if the job is successful. It is possible that some +-- entries are left in the table with this change (eg: Boundary is stopped after +-- some jobs run but before the cleaner job runs). 
+--
+-- These entries would forever be stored, so this migration cleans them to
+-- ensure no dangling rows are left behind.
+--
+-- It also updates the valid statuses enum to reflect the ones in use.
+
+begin;
+  delete from job_run where status = 'completed';
+
+  delete from job_run where job_name = 'job_run_cleaner';
+  delete from job where name = 'job_run_cleaner';
+
+  comment on index job_run_status_ix is
+    'the job_run_status_ix indexes the commonly-used status field';
+
+  comment on table job_run is
+    'job_run is a table where each row represents an instance of a job run that is either actively running or has failed in some way.';
+
+  -- Since we don't set completed anymore, but rather remove the job_run entry,
+  -- remove 'completed' from the valid statuses.
+  -- updates 7/03_job.up.sql.
+  delete from job_run_status_enm where name = 'completed';
+
+  alter table job_run_status_enm
+    drop constraint only_predefined_job_status_allowed,
+    add constraint only_predefined_job_status_allowed
+    check(name in ('running', 'failed', 'interrupted'));
+
+commit;
diff --git a/internal/db/schema/migrations/oss/postgres/93/02_drop_job_jobs_to_run.up.sql b/internal/db/schema/migrations/oss/postgres/93/02_drop_job_jobs_to_run.up.sql
new file mode 100644
index 0000000000..a1be8736dc
--- /dev/null
+++ b/internal/db/schema/migrations/oss/postgres/93/02_drop_job_jobs_to_run.up.sql
@@ -0,0 +1,10 @@
+-- Copyright (c) HashiCorp, Inc.
+-- SPDX-License-Identifier: BUSL-1.1
+
+-- This migration removes support for the job_jobs_to_run view as it is no longer
+-- used by the job repository.
+
+begin;
+  -- drops view from 7/03_job.up.sql
+  drop view job_jobs_to_run;
+commit;
\ No newline at end of file
diff --git a/internal/proto/controller/storage/job/store/v1/job.proto b/internal/proto/controller/storage/job/store/v1/job.proto
index 949c249270..e362b79ff5
--- a/internal/proto/controller/storage/job/store/v1/job.proto
+++ b/internal/proto/controller/storage/job/store/v1/job.proto
@@ -72,7 +72,7 @@ message JobRun {
   // @inject_tag: `gorm:"default:0"`
   uint32 retries_count = 12;
-  // status of the job run (running, completed, failed or interrupted).
+  // status of the job run (running, failed or interrupted).
// @inject_tag: `gorm:"not_null"` string status = 10; diff --git a/internal/scheduler/additional_verification_test.go b/internal/scheduler/additional_verification_test.go index cacab68ea1..2d1bf498f8 100644 --- a/internal/scheduler/additional_verification_test.go +++ b/internal/scheduler/additional_verification_test.go @@ -35,7 +35,7 @@ func TestSchedulerWorkflow(t *testing.T) { }) err := event.InitSysEventer(testLogger, testLock, "TestSchedulerWorkflow", event.WithEventerConfig(testConfig)) require.NoError(err) - sched := TestScheduler(t, conn, wrapper, WithRunJobsLimit(10), WithRunJobsInterval(time.Second)) + sched := TestScheduler(t, conn, wrapper, WithRunJobsInterval(time.Second)) job1Ch := make(chan error) job1Ready := make(chan struct{}) @@ -118,7 +118,7 @@ func TestSchedulerCancelCtx(t *testing.T) { err := event.InitSysEventer(testLogger, testLock, "TestSchedulerCancelCtx", event.WithEventerConfig(testConfig)) require.NoError(err) - sched := TestScheduler(t, conn, wrapper, WithRunJobsLimit(10), WithRunJobsInterval(time.Second)) + sched := TestScheduler(t, conn, wrapper, WithRunJobsInterval(time.Second)) fn, jobReady, jobDone := testJobFn() tj := testJob{name: "name", description: "desc", fn: fn, nextRunIn: time.Hour} @@ -168,7 +168,7 @@ func TestSchedulerInterruptedCancelCtx(t *testing.T) { err := event.InitSysEventer(testLogger, testLock, "TestSchedulerInterruptedCancelCtx", event.WithEventerConfig(testConfig)) require.NoError(err) - sched := TestScheduler(t, conn, wrapper, WithRunJobsLimit(10), WithRunJobsInterval(time.Second), WithMonitorInterval(time.Second)) + sched := TestScheduler(t, conn, wrapper, WithRunJobsInterval(time.Second), WithMonitorInterval(time.Second)) fn, job1Ready, job1Done := testJobFn() tj1 := testJob{name: "name1", description: "desc", fn: fn, nextRunIn: time.Hour} @@ -270,7 +270,7 @@ func TestSchedulerJobProgress(t *testing.T) { err := event.InitSysEventer(testLogger, testLock, "TestSchedulerJobProgress", event.WithEventerConfig(testConfig)) require.NoError(err) - sched := TestScheduler(t, conn, wrapper, WithRunJobsLimit(10), WithRunJobsInterval(time.Second), WithMonitorInterval(time.Second)) + sched := TestScheduler(t, conn, wrapper, WithRunJobsInterval(time.Second), WithMonitorInterval(time.Second)) jobReady := make(chan struct{}) done := make(chan struct{}) @@ -380,7 +380,7 @@ func TestSchedulerMonitorLoop(t *testing.T) { err := event.InitSysEventer(testLogger, testLock, "TestSchedulerMonitorLoop", event.WithEventerConfig(testConfig)) require.NoError(err) - sched := TestScheduler(t, conn, wrapper, WithRunJobsLimit(10), WithInterruptThreshold(time.Second), WithRunJobsInterval(time.Second), WithMonitorInterval(time.Second)) + sched := TestScheduler(t, conn, wrapper, WithInterruptThreshold(time.Second), WithRunJobsInterval(time.Second), WithMonitorInterval(time.Second)) jobReady := make(chan struct{}) jobDone := make(chan struct{}) @@ -446,7 +446,7 @@ func TestSchedulerFinalStatusUpdate(t *testing.T) { err := event.InitSysEventer(testLogger, testLock, "TestSchedulerFinalStatusUpdate", event.WithEventerConfig(testConfig)) require.NoError(err) - sched := TestScheduler(t, conn, wrapper, WithRunJobsLimit(10), WithRunJobsInterval(time.Second)) + sched := TestScheduler(t, conn, wrapper, WithRunJobsInterval(time.Second)) jobReady := make(chan struct{}) jobErr := make(chan error) @@ -489,7 +489,7 @@ func TestSchedulerFinalStatusUpdate(t *testing.T) { repo, err := job.NewRepository(ctx, rw, rw, kmsCache) require.NoError(err) - run := waitForRunStatus(t, repo, runId, 
string(job.Failed)) + run := waitForRunStatus(t, repo, runId, job.Failed) assert.Equal(uint32(10), run.TotalCount) assert.Equal(uint32(10), run.CompletedCount) @@ -502,17 +502,9 @@ func TestSchedulerFinalStatusUpdate(t *testing.T) { runId = runJob.(*runningJob).runId // Complete job without error so CompleteRun is called + completeFn := waitForRunComplete(t, sched, repo, runId, tj.name) jobErr <- nil - - // Report status - jobStatus <- JobStatus{Total: 20, Completed: 20} - - repo, err = job.NewRepository(ctx, rw, rw, kmsCache) - require.NoError(err) - - run = waitForRunStatus(t, repo, runId, string(job.Completed)) - assert.Equal(uint32(20), run.TotalCount) - assert.Equal(uint32(20), run.CompletedCount) + completeFn() baseCnl() close(testDone) @@ -538,7 +530,7 @@ func TestSchedulerRunNow(t *testing.T) { require.NoError(err) // Create test scheduler that only runs jobs every hour - sched := TestScheduler(t, conn, wrapper, WithRunJobsLimit(10), WithRunJobsInterval(time.Hour)) + sched := TestScheduler(t, conn, wrapper, WithRunJobsInterval(time.Hour)) jobCh := make(chan struct{}) jobReady := make(chan struct{}) @@ -570,12 +562,13 @@ func TestSchedulerRunNow(t *testing.T) { require.True(ok) runId := runJob.(*runningJob).runId - // Complete job - jobCh <- struct{}{} - repo, err := job.NewRepository(ctx, rw, rw, kmsCache) require.NoError(err) - waitForRunStatus(t, repo, runId, string(job.Completed)) + + // Complete job + completeFn := waitForRunComplete(t, sched, repo, runId, tj.name) + jobCh <- struct{}{} + completeFn() // Update job to run immediately once scheduling loop is called err = sched.UpdateJobNextRunInAtLeast(context.Background(), tj.name, 0) @@ -600,9 +593,9 @@ func TestSchedulerRunNow(t *testing.T) { runId = runJob.(*runningJob).runId // Complete job + completeFn = waitForRunComplete(t, sched, repo, runId, tj.name) jobCh <- struct{}{} - - waitForRunStatus(t, repo, runId, string(job.Completed)) + completeFn() // Update job to run again with RunNow option err = sched.UpdateJobNextRunInAtLeast(context.Background(), tj.name, 0, WithRunNow(true)) @@ -620,7 +613,34 @@ func TestSchedulerRunNow(t *testing.T) { close(jobCh) } -func waitForRunStatus(t *testing.T, repo *job.Repository, runId, status string) *job.Run { +func waitForRunComplete(t *testing.T, sched *Scheduler, repo *job.Repository, runId, jobName string) func() { + r, err := repo.LookupRun(context.Background(), runId) + require.NoError(t, err) + require.EqualValues(t, job.Running, r.Status) + + return func() { + timeout := time.NewTimer(5 * time.Second) + for { + select { + case <-timeout.C: + t.Fatal(fmt.Errorf("timed out waiting for job run %q to be completed", runId)) + case <-time.After(100 * time.Millisecond): + } + + // A run is complete when we don't find it in the scheduler's + // running jobs list and also not in the job_run table. 
+ _, ok := sched.runningJobs.Load(jobName) + if !ok { + r, err = repo.LookupRun(context.Background(), runId) + require.Nil(t, r) + require.Nil(t, err) + break + } + } + } +} + +func waitForRunStatus(t *testing.T, repo *job.Repository, runId string, status job.Status) *job.Run { t.Helper() var run *job.Run @@ -636,7 +656,7 @@ func waitForRunStatus(t *testing.T, repo *job.Repository, runId, status string) var err error run, err = repo.LookupRun(context.Background(), runId) require.NoError(t, err) - if run.Status == status { + if run.Status == string(status) { break } } diff --git a/internal/scheduler/cleaner/cleaner.go b/internal/scheduler/cleaner/cleaner.go deleted file mode 100644 index 86358cb586..0000000000 --- a/internal/scheduler/cleaner/cleaner.go +++ /dev/null @@ -1,30 +0,0 @@ -// Copyright (c) HashiCorp, Inc. -// SPDX-License-Identifier: BUSL-1.1 - -package cleaner - -import ( - "context" - - "github.com/hashicorp/boundary/internal/db" - "github.com/hashicorp/boundary/internal/errors" - "github.com/hashicorp/boundary/internal/scheduler" - "github.com/hashicorp/boundary/internal/util" -) - -// RegisterJob registers the cleaner job with the provided scheduler. -func RegisterJob(ctx context.Context, s *scheduler.Scheduler, w db.Writer) error { - const op = "cleaner.RegisterJob" - if s == nil { - return errors.New(ctx, errors.Internal, "nil scheduler", op, errors.WithoutEvent()) - } - if util.IsNil(w) { - return errors.New(ctx, errors.Internal, "nil DB writer", op, errors.WithoutEvent()) - } - - if err := s.RegisterJob(ctx, newCleanerJob(w)); err != nil { - return errors.Wrap(ctx, err, op) - } - - return nil -} diff --git a/internal/scheduler/cleaner/cleaner_job.go b/internal/scheduler/cleaner/cleaner_job.go deleted file mode 100644 index a7fa39abd9..0000000000 --- a/internal/scheduler/cleaner/cleaner_job.go +++ /dev/null @@ -1,57 +0,0 @@ -// Copyright (c) HashiCorp, Inc. -// SPDX-License-Identifier: BUSL-1.1 - -package cleaner - -import ( - "context" - "time" - - "github.com/hashicorp/boundary/internal/db" - "github.com/hashicorp/boundary/internal/errors" - "github.com/hashicorp/boundary/internal/scheduler" -) - -type cleanerJob struct { - w db.Writer -} - -func newCleanerJob(w db.Writer) *cleanerJob { - return &cleanerJob{ - w: w, - } -} - -// Status reports the job’s current status. -func (c *cleanerJob) Status() scheduler.JobStatus { - return scheduler.JobStatus{} -} - -// Run performs the required work depending on the implementation. -// The context is used to notify the job that it should exit early. -func (c *cleanerJob) Run(ctx context.Context, _ time.Duration) error { - const op = "cleaner.(cleanerJob).Run" - - if _, err := c.w.Exec(ctx, "delete from job_run where status='completed'", nil); err != nil { - return errors.Wrap(ctx, err, op) - } - - return nil -} - -// NextRunIn returns the duration until the next job run should be scheduled. -// We report as ready immediately after a successful run. This doesn't mean that -// this job will run immediately, only about as often as the configured scheduler interval. -func (c *cleanerJob) NextRunIn(_ context.Context) (time.Duration, error) { - return 0, nil -} - -// Name is the unique name of the job. -func (c *cleanerJob) Name() string { - return "job_run_cleaner" -} - -// Description is the human readable description of the job. 
-func (c *cleanerJob) Description() string { - return "Cleans completed job runs" -} diff --git a/internal/scheduler/cleaner/cleaner_test.go b/internal/scheduler/cleaner/cleaner_test.go deleted file mode 100644 index 13eb4bbbdf..0000000000 --- a/internal/scheduler/cleaner/cleaner_test.go +++ /dev/null @@ -1,64 +0,0 @@ -// Copyright (c) HashiCorp, Inc. -// SPDX-License-Identifier: BUSL-1.1 - -package cleaner_test - -import ( - "context" - "sync" - "testing" - "time" - - "github.com/hashicorp/boundary/internal/db" - "github.com/hashicorp/boundary/internal/scheduler" - "github.com/hashicorp/boundary/internal/scheduler/cleaner" - "github.com/hashicorp/boundary/internal/scheduler/job" - "github.com/stretchr/testify/require" -) - -func TestCleanerJob(t *testing.T) { - conn, _ := db.TestSetup(t, "postgres") - rw := db.New(conn) - wrapper := db.TestWrapper(t) - s := scheduler.TestScheduler(t, conn, wrapper, scheduler.WithMonitorInterval(10*time.Millisecond)) - err := cleaner.RegisterJob(context.Background(), s, rw) - require.NoError(t, err) - wg := &sync.WaitGroup{} - err = s.Start(context.Background(), wg) - require.NoError(t, err) - - // Trigger some runs, waiting for the cleaner to run - for i := 0; i < 10; i++ { - s.RunNow() - // Wait to allow for the job to finish - time.Sleep(50 * time.Millisecond) - } - - var jobRuns []*job.Run - err = rw.SearchWhere(context.Background(), &jobRuns, "", nil) - require.NoError(t, err) - - // We should have run 10 times, as long as some of them - // have been cleaned we should succeed. - require.True(t, len(jobRuns) < 10, "expected fewer than 10 job_run rows, found %d", len(jobRuns)) -} - -func TestRegisterJob(t *testing.T) { - conn, _ := db.TestSetup(t, "postgres") - rw := db.New(conn) - wrapper := db.TestWrapper(t) - s := scheduler.TestScheduler(t, conn, wrapper) - - t.Run("succeeds", func(t *testing.T) { - err := cleaner.RegisterJob(context.Background(), s, rw) - require.NoError(t, err) - }) - t.Run("fails-on-nil-scheduler", func(t *testing.T) { - err := cleaner.RegisterJob(context.Background(), nil, rw) - require.Error(t, err) - }) - t.Run("fails-on-nil-db-writer", func(t *testing.T) { - err := cleaner.RegisterJob(context.Background(), s, nil) - require.Error(t, err) - }) -} diff --git a/internal/scheduler/job/additional_verification_test.go b/internal/scheduler/job/additional_verification_test.go index 0e9c351fd2..6ec748efe6 100644 --- a/internal/scheduler/job/additional_verification_test.go +++ b/internal/scheduler/job/additional_verification_test.go @@ -54,9 +54,8 @@ func TestJobWorkflow(t *testing.T) { require.NoError(err) assert.Nil(newRuns) - run, err = repo.CompleteRun(ctx, run.PrivateId, time.Hour, 0, 0, 0) + err = repo.CompleteRun(ctx, run.PrivateId, time.Hour) require.NoError(err) - assert.Equal(Completed.string(), run.Status) job, err = repo.LookupJob(ctx, job.Name) require.NoError(err) diff --git a/internal/scheduler/job/doc.go b/internal/scheduler/job/doc.go index dce76363e9..b67e53e999 100644 --- a/internal/scheduler/job/doc.go +++ b/internal/scheduler/job/doc.go @@ -45,5 +45,5 @@ // nextJobRun = time.Now().Add(time.Hour) // // repo, _ = job.NewRepository(db, db, wrapper) -// run, _ = repo.CompleteRun(ctx, run.PrivateId, job.Completed, nextJobRun) +// _ = repo.CompleteRun(ctx, run.PrivateId nextJobRun) package job diff --git a/internal/scheduler/job/options.go b/internal/scheduler/job/options.go index 9dd8521333..4d68ca4b6a 100644 --- a/internal/scheduler/job/options.go +++ b/internal/scheduler/job/options.go @@ -8,8 +8,7 @@ import ( ) const 
( - defaultRunJobsLimit = 1 - defaultPluginId = "pi_system" + defaultPluginId = "pi_system" ) // getOpts - iterate the inbound Options and return a struct @@ -27,16 +26,13 @@ type Option func(*options) // options = how options are represented type options struct { withNextRunIn time.Duration - withRunJobsLimit int withLimit int withName string withControllerId string } func getDefaultOptions() options { - return options{ - withRunJobsLimit: defaultRunJobsLimit, - } + return options{} // No default options. } // WithNextRunIn provides an option to provide the duration until the next run is scheduled. @@ -48,18 +44,6 @@ func WithNextRunIn(d time.Duration) Option { } } -// WithRunJobsLimit provides an option to provide the number of jobs to run. -// If WithRunJobsLimit == 0, then default run jobs limit is used. -// If WithRunJobsLimit < 0, then no limit is used. -func WithRunJobsLimit(l int) Option { - return func(o *options) { - o.withRunJobsLimit = l - if o.withRunJobsLimit == 0 { - o.withRunJobsLimit = defaultRunJobsLimit - } - } -} - // WithLimit provides an option to provide a limit for ListJobs. Intentionally // allowing negative integers. If WithLimit < 0, then unlimited results are // returned. If WithLimit == 0, then default limits are used for results. diff --git a/internal/scheduler/job/options_test.go b/internal/scheduler/job/options_test.go index ca0a53c88a..115381b6e4 100644 --- a/internal/scheduler/job/options_test.go +++ b/internal/scheduler/job/options_test.go @@ -21,21 +21,6 @@ func Test_GetOpts(t *testing.T) { testOpts.withNextRunIn = time.Hour assert.Equal(opts, testOpts) }) - t.Run("WithRunJobsLimit", func(t *testing.T) { - assert := assert.New(t) - opts := getOpts(WithRunJobsLimit(10)) - testOpts := getDefaultOptions() - assert.NotEqual(opts, testOpts) - testOpts.withRunJobsLimit = 10 - assert.Equal(opts, testOpts) - }) - t.Run("WithZeroRunJobsLimit", func(t *testing.T) { - assert := assert.New(t) - opts := getOpts(WithRunJobsLimit(0)) - testOpts := getDefaultOptions() - assert.Equal(opts, testOpts) - assert.Equal(defaultRunJobsLimit, opts.withRunJobsLimit) - }) t.Run("WithLimit", func(t *testing.T) { assert := assert.New(t) opts := getOpts(WithLimit(100)) diff --git a/internal/scheduler/job/query.go b/internal/scheduler/job/query.go index 41bd0c3185..686fb9b9cc 100644 --- a/internal/scheduler/job/query.go +++ b/internal/scheduler/job/query.go @@ -7,13 +7,13 @@ const runJobsQuery = ` insert into job_run ( job_plugin_id, job_name, controller_id ) - select - job_plugin_id, job_name, ? - from job_jobs_to_run + select + j.plugin_id, j."name", ? + from job j + where next_scheduled_run <= current_timestamp order by next_scheduled_run asc - %s - on conflict - (job_plugin_id, job_name) + on conflict + (job_plugin_id, job_name) where status = 'running' do nothing returning *; @@ -74,14 +74,7 @@ const updateProgressQuery = ` ` const completeRunQuery = ` - update - job_run - set - completed_count = ?, - total_count = ?, - retries_count = ?, - status = 'completed', - end_time = current_timestamp + delete from job_run where private_id = ? and status = 'running' diff --git a/internal/scheduler/job/repository_run.go b/internal/scheduler/job/repository_run.go index e31044e995..b5c0eec419 100644 --- a/internal/scheduler/job/repository_run.go +++ b/internal/scheduler/job/repository_run.go @@ -17,29 +17,17 @@ import ( // If there are not jobs to run, an empty slice will be returned with a nil error. // // • serverId is required and is the private_id of the server that will run the jobs. 
-// -// The only valid option is WithRunJobsLimit, if not provided RunJobs will run only 1 job. -func (r *Repository) RunJobs(ctx context.Context, serverId string, opt ...Option) ([]*Run, error) { +// No options are supported. +func (r *Repository) RunJobs(ctx context.Context, serverId string, _ ...Option) ([]*Run, error) { const op = "job.(Repository).RunJobs" if serverId == "" { return nil, errors.New(ctx, errors.InvalidParameter, op, "missing server id") } - opts := getOpts(opt...) - var limit string - switch { - case opts.withRunJobsLimit == 0: - // zero signals the defaults should be used. - limit = fmt.Sprintf("limit %d", defaultRunJobsLimit) - case opts.withRunJobsLimit > 0: - limit = fmt.Sprintf("limit %d", opts.withRunJobsLimit) - } - - query := fmt.Sprintf(runJobsQuery, limit) var runs []*Run _, err := r.writer.DoTx(ctx, db.StdRetryCnt, db.ExpBackoff{}, func(r db.Reader, w db.Writer) error { - rows, err := w.Query(ctx, query, []any{serverId}) + rows, err := w.Query(ctx, runJobsQuery, []any{serverId}) if err != nil { return errors.Wrap(ctx, err, op) } @@ -68,7 +56,7 @@ func (r *Repository) RunJobs(ctx context.Context, serverId string, opt ...Option // UpdateProgress updates the repository entry's completed and total counts for the provided runId. // -// Once a run has been persisted with a final run status (completed, failed or interrupted), +// Once a run has been persisted with a final run status (failed or interrupted), // any future UpdateProgress attempts will return an error with Code errors.InvalidJobRunState. // All options are ignored. func (r *Repository) UpdateProgress(ctx context.Context, runId string, completed, total, retries int, _ ...Option) (*Run, error) { @@ -123,32 +111,26 @@ func (r *Repository) UpdateProgress(ctx context.Context, runId string, completed return run, nil } -// CompleteRun updates the Run repository entry for the provided runId. -// It sets the status to 'completed', updates the run's EndTime to the current database -// time, and sets the completed and total counts. -// CompleteRun also updates the Job repository entry that is associated with this run, -// setting the job's NextScheduledRun to the current database time incremented by the nextRunIn +// CompleteRun is intended to be called when a job completes successfully. It +// deletes the job_run entry for the provided runId. It also updates the Job +// repository entry that is associated with this run, setting the job's +// NextScheduledRun to the current database time incremented by the nextRunIn // parameter. // -// Once a run has been persisted with a final run status (completed, failed -// or interrupted), any future calls to CompleteRun will return an error with Code -// errors.InvalidJobRunState. -// All options are ignored. -func (r *Repository) CompleteRun(ctx context.Context, runId string, nextRunIn time.Duration, completed, total, retries int, _ ...Option) (*Run, error) { +// If a run is persisted with a final run status (failed or interrupted), any +// calls to CompleteRun will return an error with Code +// errors.InvalidJobRunState. All options are ignored. 
+func (r *Repository) CompleteRun(ctx context.Context, runId string, nextRunIn time.Duration, _ ...Option) error { const op = "job.(Repository).CompleteRun" if runId == "" { - return nil, errors.New(ctx, errors.InvalidParameter, op, "missing run id") + return errors.New(ctx, errors.InvalidParameter, op, "missing run id") } run := allocRun() run.PrivateId = runId _, err := r.writer.DoTx(ctx, db.StdRetryCnt, db.ExpBackoff{}, func(r db.Reader, w db.Writer) error { - // TODO (lcr 07/2021) this can potentially overwrite completed and total values - // persisted by the scheduler's monitor jobs loop. - // Add an on update sql trigger to protect the job_run table, once progress - // values are used in the critical path. - rows, err := w.Query(ctx, completeRunQuery, []any{completed, total, retries, runId}) + rows, err := w.Query(ctx, completeRunQuery, []any{runId}) if err != nil { return errors.Wrap(ctx, err, op) } @@ -169,7 +151,8 @@ func (r *Repository) CompleteRun(ctx context.Context, runId string, nextRunIn ti return errors.Wrap(ctx, err, op, errors.WithMsg("unable to get next row for job run")) } if rowCnt == 0 { - // Failed to update run, either it does not exist or was in an invalid state + // No rows returned from the query: Either it's already been + // removed or was in a final state (not 'running'). if err = r.LookupById(ctx, run); err != nil { if errors.IsNotFoundError(err) { return errors.Wrap(ctx, err, op, errors.WithMsg(fmt.Sprintf("job run %q does not exist", runId))) @@ -206,17 +189,17 @@ func (r *Repository) CompleteRun(ctx context.Context, runId string, nextRunIn ti }, ) if err != nil { - return nil, errors.Wrap(ctx, err, op) + return errors.Wrap(ctx, err, op) } - return run, nil + return nil } // FailRun updates the Run repository entry for the provided runId. // It sets the status to 'failed' and updates the run's EndTime to the current database // time, and sets the completed and total counts. // -// Once a run has been persisted with a final run status (completed, failed +// Once a run has been persisted with a final run status (failed // or interrupted), any future calls to FailRun will return an error with Code // errors.InvalidJobRunState. // All options are ignored. @@ -280,7 +263,7 @@ func (r *Repository) FailRun(ctx context.Context, runId string, completed, total // updated for the provided interruptThreshold. It sets the status to 'interrupted' and // updates the run's EndTime to the current database time. // -// Once a run has been persisted with a final run status (completed, failed +// Once a run has been persisted with a final run status (failed // or interrupted), any future calls to InterruptRuns will return an error with Code // errors.InvalidJobRunState. 
// WithControllerId is the only valid option diff --git a/internal/scheduler/job/repository_run_test.go b/internal/scheduler/job/repository_run_test.go index fcc415e251..a5565520a3 100644 --- a/internal/scheduler/job/repository_run_test.go +++ b/internal/scheduler/job/repository_run_test.go @@ -5,7 +5,6 @@ package job import ( "context" - "fmt" "sort" "testing" "time" @@ -120,73 +119,6 @@ func TestRepository_RunJobs(t *testing.T) { } } -func TestRepository_RunJobs_Limits(t *testing.T) { - t.Parallel() - ctx := context.Background() - conn, _ := db.TestSetup(t, "postgres") - rw := db.New(conn) - wrapper := db.TestWrapper(t) - kms := kms.TestKms(t, conn, wrapper) - iam.TestRepo(t, conn, wrapper) - - numJobs := 20 - server := testController(t, conn, wrapper) - - tests := []struct { - name string - opts []Option - wantLen int - }{ - { - name: "with-more-than-available", - opts: []Option{WithRunJobsLimit(numJobs * 2)}, - wantLen: numJobs, - }, - { - name: "with-no-option", - wantLen: defaultRunJobsLimit, - }, - { - name: "with-limit", - opts: []Option{WithRunJobsLimit(3)}, - wantLen: 3, - }, - { - name: "with-zero-limit", - opts: []Option{WithRunJobsLimit(0)}, - wantLen: defaultRunJobsLimit, - }, - { - name: "unlimited", - opts: []Option{WithRunJobsLimit(-1)}, - wantLen: numJobs, - }, - } - - for _, tt := range tests { - tt := tt - t.Run(tt.name, func(t *testing.T) { - assert, require := assert.New(t), require.New(t) - repo, err := NewRepository(ctx, rw, rw, kms) - assert.NoError(err) - require.NotNil(repo) - - for i := 0; i < numJobs; i++ { - testJob(t, conn, fmt.Sprintf("%v-%d", tt.name, i), "description", wrapper) - } - - got, err := repo.RunJobs(ctx, server.PrivateId, tt.opts...) - require.NoError(err) - assert.Len(got, tt.wantLen) - - // Clean up jobs for next run - rows, err := rw.Query(ctx, "delete from job", nil) - require.NoError(err) - _ = rows.Close() - }) - } -} - func TestRepository_RunJobsOrder(t *testing.T) { t.Parallel() ctx := context.Background() @@ -210,41 +142,18 @@ func TestRepository_RunJobsOrder(t *testing.T) { runs, err := repo.RunJobs(ctx, server.PrivateId) require.NoError(err) - require.Len(runs, 1) - run := runs[0] - assert.Equal(run.JobName, firstJob.Name) - assert.Equal(run.JobPluginId, firstJob.PluginId) - - // End first job with time between last and middle - _, err = repo.CompleteRun(ctx, run.PrivateId, -6*time.Hour, 0, 0, 0) - require.NoError(err) - - runs, err = repo.RunJobs(ctx, server.PrivateId) - require.NoError(err) - require.Len(runs, 1) - run = runs[0] - assert.Equal(run.JobName, middleJob.Name) - assert.Equal(run.JobPluginId, middleJob.PluginId) + require.Len(runs, 3) - // firstJob should be up again, as it is scheduled before lastJob - runs, err = repo.RunJobs(ctx, server.PrivateId) - require.NoError(err) - require.Len(runs, 1) - run = runs[0] - assert.Equal(run.JobName, firstJob.Name) - assert.Equal(run.JobPluginId, firstJob.PluginId) + // We should see the job runs ordered by scheduled time. 
+ // firstJob > middleJob > lastJob + assert.Equal(firstJob.Name, runs[0].JobName) + assert.Equal(firstJob.PluginId, runs[0].JobPluginId) - runs, err = repo.RunJobs(ctx, server.PrivateId) - require.NoError(err) - require.Len(runs, 1) - run = runs[0] - assert.Equal(run.JobName, lastJob.Name) - assert.Equal(run.JobPluginId, lastJob.PluginId) + assert.Equal(middleJob.Name, runs[1].JobName) + assert.Equal(middleJob.PluginId, runs[1].JobPluginId) - // All jobs are running no work should be returned - runs, err = repo.RunJobs(ctx, server.PrivateId) - require.NoError(err) - require.Len(runs, 0) + assert.Equal(lastJob.Name, runs[2].JobName) + assert.Equal(lastJob.PluginId, runs[2].JobPluginId) } func TestRepository_UpdateProgress(t *testing.T) { @@ -306,20 +215,6 @@ func TestRepository_UpdateProgress(t *testing.T) { wantErrCode: errors.InvalidJobRunState, wantErrMsg: "job.(Repository).UpdateProgress: db.DoTx: job.(Repository).UpdateProgress: job run was in a final run state: failed: integrity violation: error #115", }, - { - name: "status-already-completed", - orig: &Run{ - JobRun: &store.JobRun{ - JobName: job.Name, - JobPluginId: job.PluginId, - ControllerId: server.PrivateId, - Status: Completed.string(), - }, - }, - wantErr: true, - wantErrCode: errors.InvalidJobRunState, - wantErrMsg: "job.(Repository).UpdateProgress: db.DoTx: job.(Repository).UpdateProgress: job run was in a final run state: completed: integrity violation: error #115", - }, { name: "valid-no-changes", orig: &Run{ @@ -479,14 +374,10 @@ func TestRepository_CompleteRun(t *testing.T) { server := testController(t, conn, wrapper) job := testJob(t, conn, "name", "description", wrapper) - type args struct { - completed, total, retries int - } tests := []struct { name string orig *Run nextRunIn time.Duration - args args wantErr bool wantErrCode errors.Code wantErrMsg string @@ -525,20 +416,6 @@ func TestRepository_CompleteRun(t *testing.T) { wantErrCode: errors.InvalidJobRunState, wantErrMsg: "job.(Repository).CompleteRun: db.DoTx: job.(Repository).CompleteRun: job run was in a final run state: failed: integrity violation: error #115", }, - { - name: "status-already-completed", - orig: &Run{ - JobRun: &store.JobRun{ - JobName: job.Name, - JobPluginId: job.PluginId, - ControllerId: server.PrivateId, - Status: Completed.string(), - }, - }, - wantErr: true, - wantErrCode: errors.InvalidJobRunState, - wantErrMsg: "job.(Repository).CompleteRun: db.DoTx: job.(Repository).CompleteRun: job run was in a final run state: completed: integrity violation: error #115", - }, { name: "valid", orig: &Run{ @@ -561,7 +438,6 @@ func TestRepository_CompleteRun(t *testing.T) { Status: Running.string(), }, }, - args: args{completed: 10, total: 20, retries: 1}, }, } @@ -579,9 +455,13 @@ func TestRepository_CompleteRun(t *testing.T) { require.NoError(err) assert.Empty(tt.orig.EndTime) privateId = tt.orig.PrivateId + + r, err := repo.LookupRun(ctx, privateId) + require.NoError(err) + require.NotNil(r) } - got, err := repo.CompleteRun(ctx, privateId, tt.nextRunIn, tt.args.completed, tt.args.total, tt.args.retries) + err = repo.CompleteRun(ctx, privateId, tt.nextRunIn) if tt.wantErr { require.Error(err) assert.Truef(errors.Match(errors.T(tt.wantErrCode), err), "Unexpected error %s", err) @@ -596,27 +476,21 @@ func TestRepository_CompleteRun(t *testing.T) { return } assert.NoError(err) - require.NotNil(got) - assert.NotEmpty(got.EndTime) - assert.Equal(Completed.string(), got.Status) - assert.Equal(tt.args.completed, int(got.CompletedCount)) - 
assert.Equal(tt.args.total, int(got.TotalCount)) - assert.Equal(tt.args.retries, int(got.RetriesCount)) updatedJob, err := repo.LookupJob(ctx, tt.orig.JobName) assert.NoError(err) require.NotNil(updatedJob) - // The previous run is ended before the next run is scheduled, therefore the previous - // run end time incremented by the nextRunIn duration, should be less than or equal to the - // NextScheduledRun time that is persisted in the repo. - nextRunAt := updatedJob.NextScheduledRun.AsTime() - previousRunEnd := got.EndTime.AsTime() - assert.Equal(nextRunAt.Round(time.Minute), previousRunEnd.Add(tt.nextRunIn).Round(time.Minute)) + // The next run is expected to be ~ now + whatever duration was + // passed into CompleteRun. + expectedNextRunIn := time.Now().Add(tt.nextRunIn).Round(time.Minute).UTC() + actualNextRunIn := updatedJob.NextScheduledRun.AsTime().Round(time.Minute).UTC() + require.EqualValues(expectedNextRunIn, actualNextRunIn) - // Delete job run so it does not clash with future runs - _, err = repo.deleteRun(ctx, privateId) - assert.NoError(err) + // If we can't find the run, it means it was complete. + r, err := repo.LookupRun(ctx, privateId) + require.NoError(err) + require.Nil(r) }) } @@ -626,9 +500,8 @@ func TestRepository_CompleteRun(t *testing.T) { require.NoError(err) require.NotNil(repo) - got, err := repo.CompleteRun(ctx, "fake-run-id", time.Hour, 0, 0, 0) + err = repo.CompleteRun(ctx, "fake-run-id", time.Hour) require.Error(err) - require.Nil(got) assert.Truef(errors.Match(errors.T(errors.RecordNotFound), err), "Unexpected error %s", err) assert.Equal("job.(Repository).CompleteRun: db.DoTx: job.(Repository).CompleteRun: job run \"fake-run-id\" does not exist: db.LookupById: record not found, search issue: error #1100: dbw.LookupById: record not found", err.Error()) }) @@ -691,20 +564,6 @@ func TestRepository_FailRun(t *testing.T) { wantErrCode: errors.InvalidJobRunState, wantErrMsg: "job.(Repository).FailRun: db.DoTx: job.(Repository).FailRun: job run was in a final run state: failed: integrity violation: error #115", }, - { - name: "status-already-completed", - orig: &Run{ - JobRun: &store.JobRun{ - JobName: job.Name, - JobPluginId: job.PluginId, - ControllerId: server.PrivateId, - Status: Completed.string(), - }, - }, - wantErr: true, - wantErrCode: errors.InvalidJobRunState, - wantErrMsg: "job.(Repository).FailRun: db.DoTx: job.(Repository).FailRun: job run was in a final run state: completed: integrity violation: error #115", - }, { name: "valid", orig: &Run{ @@ -912,7 +771,6 @@ func TestRepository_InterruptServerRuns(t *testing.T) { runs: []args{ { ControllerId: server1.PrivateId, - opts: []Option{WithRunJobsLimit(3)}, expectedJobs: []*Job{job1, job2, job3}, }, }, @@ -927,7 +785,6 @@ func TestRepository_InterruptServerRuns(t *testing.T) { runs: []args{ { ControllerId: server2.PrivateId, - opts: []Option{WithRunJobsLimit(3)}, expectedJobs: []*Job{job1, job2, job3}, }, }, @@ -973,124 +830,6 @@ func TestRepository_InterruptServerRuns(t *testing.T) { }, }, }, - { - name: "multiple-servers-interrupt-all", - runs: []args{ - { - ControllerId: server1.PrivateId, - opts: []Option{WithRunJobsLimit(1)}, - expectedJobs: []*Job{job1}, - }, - { - ControllerId: server2.PrivateId, - opts: []Option{WithRunJobsLimit(1)}, - expectedJobs: []*Job{job2}, - }, - { - ControllerId: server3.PrivateId, - opts: []Option{WithRunJobsLimit(1)}, - expectedJobs: []*Job{job3}, - }, - }, - interrupts: []args{ - { - expectedJobs: []*Job{job1, job2, job3}, - }, - }, - }, - { - name: 
"multiple-servers-with-server-id", - runs: []args{ - { - ControllerId: server1.PrivateId, - opts: []Option{WithRunJobsLimit(1)}, - expectedJobs: []*Job{job1}, - }, - { - ControllerId: server2.PrivateId, - opts: []Option{WithRunJobsLimit(1)}, - expectedJobs: []*Job{job2}, - }, - { - ControllerId: server3.PrivateId, - opts: []Option{WithRunJobsLimit(1)}, - expectedJobs: []*Job{job3}, - }, - }, - interrupts: []args{ - { - opts: []Option{WithControllerId(server1.PrivateId)}, - expectedJobs: []*Job{job1}, - }, - { - opts: []Option{WithControllerId(server2.PrivateId)}, - expectedJobs: []*Job{job2}, - }, - { - opts: []Option{WithControllerId(server3.PrivateId)}, - expectedJobs: []*Job{job3}, - }, - }, - }, - { - name: "multiple-servers-distributed-runs", - runs: []args{ - { - ControllerId: server1.PrivateId, - opts: []Option{WithRunJobsLimit(2)}, - expectedJobs: []*Job{job1, job2}, - }, - { - ControllerId: server2.PrivateId, - opts: []Option{WithRunJobsLimit(1)}, - expectedJobs: []*Job{job3}, - }, - { - ControllerId: server3.PrivateId, - opts: []Option{WithRunJobsLimit(1)}, - expectedJobs: []*Job{}, - }, - }, - interrupts: []args{ - { - opts: []Option{WithControllerId(server1.PrivateId)}, - expectedJobs: []*Job{job1, job2}, - }, - { - opts: []Option{WithControllerId(server2.PrivateId)}, - expectedJobs: []*Job{job3}, - }, - { - opts: []Option{WithControllerId(server3.PrivateId)}, - expectedJobs: []*Job{}, - }, - }, - }, - { - name: "multiple-servers-distributed-runs-interrupt-all", - runs: []args{ - { - ControllerId: server1.PrivateId, - opts: []Option{WithRunJobsLimit(2)}, - expectedJobs: []*Job{job1, job2}, - }, - { - ControllerId: server2.PrivateId, - opts: []Option{WithRunJobsLimit(1)}, - expectedJobs: []*Job{job3}, - }, - { - ControllerId: server3.PrivateId, - opts: []Option{WithRunJobsLimit(1)}, - expectedJobs: []*Job{}, - }, - }, - interrupts: []args{ - { - expectedJobs: []*Job{job1, job2, job3}, - }, - }, - }, } for _, tt := range tests { tt := tt @@ -1099,7 +838,7 @@ func TestRepository_InterruptServerRuns(t *testing.T) { require.NoError(err) for _, r := range tt.runs { - runs, err := repo.RunJobs(ctx, r.ControllerId, r.opts...) + runs, err := repo.RunJobs(ctx, r.ControllerId) require.NoError(err) assert.Len(runs, len(r.expectedJobs)) sort.Slice(runs, func(i, j int) bool { return runs[i].JobName < runs[j].JobName }) diff --git a/internal/scheduler/job/run.go b/internal/scheduler/job/run.go index 2128af7b48..0229c18fe0 100644 --- a/internal/scheduler/job/run.go +++ b/internal/scheduler/job/run.go @@ -8,7 +8,7 @@ import ( "google.golang.org/protobuf/proto" ) -// Run represents an instance of a job that is either actively running or has already completed. +// Run represents an instance of a job that is either actively running or has failed in some way. 
type Run struct { *store.JobRun tableName string `gorm:"-"` diff --git a/internal/scheduler/job/status.go b/internal/scheduler/job/status.go index 5db19ff0d0..4fe5df0077 100644 --- a/internal/scheduler/job/status.go +++ b/internal/scheduler/job/status.go @@ -9,9 +9,6 @@ const ( // Running represents that the job run is actively running on a server Running Status = "running" - // Completed represents that the job run has successfully finished - Completed Status = "completed" - // Failed represent that the job run had an error during execution Failed Status = "failed" diff --git a/internal/scheduler/job/store/job.pb.go b/internal/scheduler/job/store/job.pb.go index c6d4020a84..f64fae2c2c 100644 --- a/internal/scheduler/job/store/job.pb.go +++ b/internal/scheduler/job/store/job.pb.go @@ -138,7 +138,7 @@ type JobRun struct { // a job has retried work. // @inject_tag: `gorm:"default:0"` RetriesCount uint32 `protobuf:"varint,12,opt,name=retries_count,json=retriesCount,proto3" json:"retries_count,omitempty" gorm:"default:0"` - // status of the job run (running, completed, failed or interrupted). + // status of the job run (running, failed or interrupted). // @inject_tag: `gorm:"not_null"` Status string `protobuf:"bytes,10,opt,name=status,proto3" json:"status,omitempty" gorm:"not_null"` // The controller_id of the controller running the job and must be set. diff --git a/internal/scheduler/options.go b/internal/scheduler/options.go index c9a3c1d424..056ced1861 100644 --- a/internal/scheduler/options.go +++ b/internal/scheduler/options.go @@ -6,7 +6,6 @@ package scheduler import "time" const ( - defaultRunJobsLimit = 1 defaultRunJobsInterval = time.Minute defaultMonitorInterval = 30 * time.Second defaultInterruptThreshold = 5 * time.Minute @@ -27,7 +26,6 @@ type Option func(*options) // options = how options are represented type options struct { withNextRunIn time.Duration - withRunJobsLimit int withRunJobInterval time.Duration withMonitorInterval time.Duration withInterruptThreshold time.Duration @@ -36,26 +34,12 @@ type options struct { func getDefaultOptions() options { return options{ - withRunJobsLimit: defaultRunJobsLimit, withRunJobInterval: defaultRunJobsInterval, withMonitorInterval: defaultMonitorInterval, withInterruptThreshold: defaultInterruptThreshold, } } -// WithRunJobsLimit provides an option to provide the number of jobs that will be requested -// by the scheduler when querying for jobs to run. -// If WithRunJobsLimit == 0, then default run jobs limit is used. -// If WithRunJobsLimit < 0, then no limit is used. -func WithRunJobsLimit(l int) Option { - return func(o *options) { - o.withRunJobsLimit = l - if o.withRunJobsLimit == 0 { - o.withRunJobsLimit = defaultRunJobsLimit - } - } -} - // WithRunJobsInterval provides an option to provide the interval at which the scheduler // will query the repository for jobs to run. // If WithRunJobsInterval == 0, then default interval is used. 
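
Note: with WithRunJobsLimit removed from both the job and scheduler packages (see the options.go changes above), scheduler construction is driven only by the interval and threshold options that remain in this diff. A minimal sketch of a caller under that assumption follows; the newScheduler wrapper and its arguments are illustrative, not part of this change, and the option values shown simply mirror the package defaults.

package example

import (
	"context"
	"time"

	"github.com/hashicorp/boundary/internal/scheduler"
	"github.com/hashicorp/boundary/internal/scheduler/job"
)

// newScheduler is a hypothetical helper showing the option surface that
// survives the removal of WithRunJobsLimit: each scheduling pass now claims
// every job whose next run time has passed, so only the loop intervals and
// the interrupt threshold remain configurable.
func newScheduler(ctx context.Context, serverId string, jobRepoFn func() (*job.Repository, error)) (*scheduler.Scheduler, error) {
	return scheduler.New(ctx, serverId, jobRepoFn,
		scheduler.WithRunJobsInterval(time.Minute),
		scheduler.WithMonitorInterval(30*time.Second),
		scheduler.WithInterruptThreshold(5*time.Minute),
	)
}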
diff --git a/internal/scheduler/options_test.go b/internal/scheduler/options_test.go index d5e872a67c..5b3fc4733a 100644 --- a/internal/scheduler/options_test.go +++ b/internal/scheduler/options_test.go @@ -13,20 +13,6 @@ import ( // Test_GetOpts provides unit tests for GetOpts and all the options func Test_GetOpts(t *testing.T) { t.Parallel() - t.Run("WithRunJobsLimit", func(t *testing.T) { - assert := assert.New(t) - opts := getOpts(WithRunJobsLimit(10)) - testOpts := getDefaultOptions() - assert.NotEqual(opts, testOpts) - testOpts.withRunJobsLimit = 10 - assert.Equal(opts, testOpts) - }) - t.Run("WithZeroRunJobsLimit", func(t *testing.T) { - assert := assert.New(t) - opts := getOpts(WithRunJobsLimit(0)) - testOpts := getDefaultOptions() - assert.Equal(opts, testOpts) - }) t.Run("WithRunJobsInterval", func(t *testing.T) { assert := assert.New(t) opts := getOpts(WithRunJobsInterval(time.Hour)) diff --git a/internal/scheduler/scheduler.go b/internal/scheduler/scheduler.go index b71f77b93d..66592fe7b8 100644 --- a/internal/scheduler/scheduler.go +++ b/internal/scheduler/scheduler.go @@ -32,7 +32,6 @@ type Scheduler struct { runningJobs *sync.Map started ua.Bool - runJobsLimit int runJobsInterval time.Duration monitorInterval time.Duration interruptThreshold time.Duration @@ -45,7 +44,7 @@ type Scheduler struct { // // • jobRepoFn must be provided and is a function that returns the job repository // -// WithRunJobsLimit, WithRunJobsInterval, WithMonitorInterval and WithInterruptThreshold are +// WithRunJobsInterval, WithMonitorInterval and WithInterruptThreshold are // the only valid options. func New(ctx context.Context, serverId string, jobRepoFn jobRepoFactory, opt ...Option) (*Scheduler, error) { const op = "scheduler.New" @@ -62,7 +61,6 @@ func New(ctx context.Context, serverId string, jobRepoFn jobRepoFactory, opt ... 
jobRepoFn: jobRepoFn, registeredJobs: new(sync.Map), runningJobs: new(sync.Map), - runJobsLimit: opts.withRunJobsLimit, runJobsInterval: opts.withRunJobInterval, monitorInterval: opts.withMonitorInterval, interruptThreshold: opts.withInterruptThreshold, @@ -190,7 +188,7 @@ func (s *Scheduler) start(ctx context.Context) { event.WriteSysEvent(ctx, op, "scheduling loop running", "server id", s.serverId, "run interval", s.runJobsInterval.String(), - "run limit", s.runJobsLimit) + ) timer := time.NewTimer(0) var wg sync.WaitGroup for { @@ -218,7 +216,7 @@ func (s *Scheduler) schedule(ctx context.Context, wg *sync.WaitGroup) { return } - runs, err := repo.RunJobs(ctx, s.serverId, job.WithRunJobsLimit(s.runJobsLimit)) + runs, err := repo.RunJobs(ctx, s.serverId) if err != nil { event.WriteError(ctx, op, err, event.WithInfoMsg("error getting jobs to run from repo")) return @@ -262,8 +260,6 @@ func (s *Scheduler) runJob(ctx context.Context, wg *sync.WaitGroup, r *job.Run) defer wg.Done() runErr := j.Run(jobContext, s.interruptThreshold) - // Get final status report to update run progress with - status := j.Status() var updateErr error switch { case ctx.Err() != nil: @@ -273,9 +269,11 @@ func (s *Scheduler) runJob(ctx context.Context, wg *sync.WaitGroup, r *job.Run) if inner != nil { event.WriteError(ctx, op, inner, event.WithInfoMsg("error getting next run time", "name", j.Name())) } - _, updateErr = repo.CompleteRun(ctx, r.PrivateId, nextRun, status.Completed, status.Total, status.Retries) + updateErr = repo.CompleteRun(ctx, r.PrivateId, nextRun) default: event.WriteError(ctx, op, runErr, event.WithInfoMsg("job run failed", "run id", r.PrivateId, "name", j.Name())) + + status := j.Status() // Get final status report to update run progress with _, updateErr = repo.FailRun(ctx, r.PrivateId, status.Completed, status.Total, status.Retries) } diff --git a/internal/scheduler/scheduler_test.go b/internal/scheduler/scheduler_test.go index c9b22bcbcb..b96a55cdb3 100644 --- a/internal/scheduler/scheduler_test.go +++ b/internal/scheduler/scheduler_test.go @@ -34,7 +34,6 @@ func TestScheduler_New(t *testing.T) { type args struct { serverId string jobRepo jobRepoFactory - runLimit int runInterval time.Duration monitorInterval time.Duration } @@ -70,7 +69,6 @@ func TestScheduler_New(t *testing.T) { }, want: args{ serverId: "test-server", - runLimit: defaultRunJobsLimit, runInterval: defaultRunJobsInterval, monitorInterval: defaultMonitorInterval, }, @@ -86,7 +84,6 @@ func TestScheduler_New(t *testing.T) { }, want: args{ serverId: "test-server", - runLimit: defaultRunJobsLimit, monitorInterval: defaultMonitorInterval, runInterval: time.Hour, }, @@ -97,12 +94,9 @@ func TestScheduler_New(t *testing.T) { serverId: "test-server", jobRepo: jobRepoFn, }, - opts: []Option{ - WithRunJobsLimit(-1), - }, + opts: []Option{}, want: args{ serverId: "test-server", - runLimit: -1, runInterval: defaultRunJobsInterval, monitorInterval: defaultMonitorInterval, }, @@ -113,12 +107,9 @@ func TestScheduler_New(t *testing.T) { serverId: "test-server", jobRepo: jobRepoFn, }, - opts: []Option{ - WithRunJobsLimit(20), - }, + opts: []Option{}, want: args{ serverId: "test-server", - runLimit: 20, runInterval: defaultRunJobsInterval, monitorInterval: defaultMonitorInterval, }, @@ -134,7 +125,6 @@ func TestScheduler_New(t *testing.T) { }, want: args{ serverId: "test-server", - runLimit: defaultRunJobsLimit, runInterval: defaultRunJobsInterval, monitorInterval: time.Hour, }, @@ -147,12 +137,10 @@ func TestScheduler_New(t *testing.T) { }, 
opts: []Option{ WithRunJobsInterval(time.Hour), - WithRunJobsLimit(20), WithMonitorInterval(2 * time.Hour), }, want: args{ serverId: "test-server", - runLimit: 20, runInterval: time.Hour, monitorInterval: 2 * time.Hour, }, @@ -174,7 +162,6 @@ func TestScheduler_New(t *testing.T) { require.NoError(err) assert.Equal(tt.want.serverId, got.serverId) - assert.Equal(tt.want.runLimit, got.runJobsLimit) assert.Equal(tt.want.runInterval, got.runJobsInterval) assert.Equal(tt.want.monitorInterval, got.monitorInterval) assert.NotNil(got.jobRepoFn)
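
Taken together, the repository changes above reduce the run lifecycle to: claim runs, execute, then either delete the run row on success (CompleteRun) or keep it with a terminal 'failed' status (FailRun). A minimal sketch of a caller driving the new signatures follows; the finishRuns helper, the execute callback, and the zeroed progress counts are illustrative and not part of this change.

package example

import (
	"context"
	"time"

	"github.com/hashicorp/boundary/internal/scheduler/job"
)

// finishRuns is a hypothetical helper showing the new end-of-run handling:
// RunJobs claims every eligible run for this controller (there is no limit
// option anymore), CompleteRun deletes the job_run row and schedules the next
// run, and FailRun records a terminal 'failed' status with the final counts.
func finishRuns(ctx context.Context, repo *job.Repository, controllerId string, execute func(*job.Run) error) error {
	runs, err := repo.RunJobs(ctx, controllerId)
	if err != nil {
		return err
	}
	for _, r := range runs {
		if runErr := execute(r); runErr != nil {
			// Progress counts are zeroed here only to keep the sketch short.
			if _, err := repo.FailRun(ctx, r.PrivateId, 0, 0, 0); err != nil {
				return err
			}
			continue
		}
		// Success: remove the run row and ask for the next run in an hour.
		if err := repo.CompleteRun(ctx, r.PrivateId, time.Hour); err != nil {
			return err
		}
	}
	return nil
}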