Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Job Scheduler: Performance Improvements #5204

Merged
merged 2 commits into from
Nov 7, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions internal/daemon/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ import (
"github.com/hashicorp/boundary/internal/ratelimit"
"github.com/hashicorp/boundary/internal/recording"
"github.com/hashicorp/boundary/internal/scheduler"
"github.com/hashicorp/boundary/internal/scheduler/cleaner"
"github.com/hashicorp/boundary/internal/scheduler/job"
"github.com/hashicorp/boundary/internal/server"
serversjob "github.com/hashicorp/boundary/internal/server/job"
Expand Down Expand Up @@ -637,9 +636,6 @@ func (c *Controller) registerJobs() error {
if err := kmsjob.RegisterJobs(c.baseContext, c.scheduler, c.kms); err != nil {
return err
}
if err := cleaner.RegisterJob(c.baseContext, c.scheduler, rw); err != nil {
return err
}
if err := snapshot.RegisterJob(c.baseContext, c.scheduler, rw, rw); err != nil {
return err
}
Expand Down
2 changes: 2 additions & 0 deletions internal/db/schema/migrations/oss/postgres/7/03_job.up.sql
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ begin;
create trigger immutable_columns before update on job
for each row execute procedure immutable_columns('plugin_id', 'name');

-- updated in 93/01_job_run_clean.up.sql
create table job_run_status_enm (
name text not null primary key
constraint only_predefined_job_status_allowed
Expand All @@ -28,6 +29,7 @@ begin;
comment on table job_run_status_enm is
'job_run_status_enm is an enumeration table where each row contains a valid job run state.';

-- updated in 93/01_job_run_clean.up.sql
insert into job_run_status_enm (name)
values
('running'),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
-- Copyright (c) HashiCorp, Inc.
-- SPDX-License-Identifier: BUSL-1.1

-- Boundary's design on removing entries from job_run has changed from having a
-- job that periodically cleans the table to a design where the scheduler
-- handles this by itself if the job is successful. It is possible that some
-- entries are left in the table with this change (eg: Boundary is stopped after
-- some jobs run but before the cleaner job runs).
--
-- These entries would forever be stored, so this migration cleans them to
-- ensure no dangling rows are left behind.
--
-- It also updates the valid statues enum to reflect the ones in use.

begin;
hugoghx marked this conversation as resolved.
Show resolved Hide resolved
delete from job_run where status = 'completed';

delete from job_run where job_name = 'job_run_cleaner';
delete from job where name = 'job_run_cleaner';

comment on index job_run_status_ix is
'the job_run_status_ix indexes the commonly-used status field';

comment on table job_run is
'job_run is a table where each row represents an instance of a job run that is either actively running or has failed in some way.';

-- Since we don't set completed anymore, but rather remove the job_run entry,
-- remove 'completed' from the valid statuses.
-- updates 7/03_job.up.sql.
delete from job_run_status_enm where name = 'completed';

alter table job_run_status_enm
drop constraint only_predefined_job_status_allowed,
add constraint only_predefined_job_status_allowed
check(name in ('running', 'failed', 'interrupted'));

commit;
2 changes: 1 addition & 1 deletion internal/proto/controller/storage/job/store/v1/job.proto
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ message JobRun {
// @inject_tag: `gorm:"default:0"`
uint32 retries_count = 12;

// status of the job run (running, completed, failed or interrupted).
// status of the job run (running, failed or interrupted).
// @inject_tag: `gorm:"not_null"`
string status = 10;

Expand Down
58 changes: 39 additions & 19 deletions internal/scheduler/additional_verification_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -489,7 +489,7 @@ func TestSchedulerFinalStatusUpdate(t *testing.T) {
repo, err := job.NewRepository(ctx, rw, rw, kmsCache)
require.NoError(err)

run := waitForRunStatus(t, repo, runId, string(job.Failed))
run := waitForRunStatus(t, repo, runId, job.Failed)
assert.Equal(uint32(10), run.TotalCount)
assert.Equal(uint32(10), run.CompletedCount)

Expand All @@ -502,17 +502,9 @@ func TestSchedulerFinalStatusUpdate(t *testing.T) {
runId = runJob.(*runningJob).runId

// Complete job without error so CompleteRun is called
completeFn := waitForRunComplete(t, sched, repo, runId, tj.name)
jobErr <- nil

// Report status
jobStatus <- JobStatus{Total: 20, Completed: 20}

repo, err = job.NewRepository(ctx, rw, rw, kmsCache)
require.NoError(err)

run = waitForRunStatus(t, repo, runId, string(job.Completed))
assert.Equal(uint32(20), run.TotalCount)
assert.Equal(uint32(20), run.CompletedCount)
completeFn()

baseCnl()
close(testDone)
Expand Down Expand Up @@ -570,12 +562,13 @@ func TestSchedulerRunNow(t *testing.T) {
require.True(ok)
runId := runJob.(*runningJob).runId

// Complete job
jobCh <- struct{}{}

repo, err := job.NewRepository(ctx, rw, rw, kmsCache)
require.NoError(err)
waitForRunStatus(t, repo, runId, string(job.Completed))

// Complete job
completeFn := waitForRunComplete(t, sched, repo, runId, tj.name)
jobCh <- struct{}{}
completeFn()

// Update job to run immediately once scheduling loop is called
err = sched.UpdateJobNextRunInAtLeast(context.Background(), tj.name, 0)
Expand All @@ -600,9 +593,9 @@ func TestSchedulerRunNow(t *testing.T) {
runId = runJob.(*runningJob).runId

// Complete job
completeFn = waitForRunComplete(t, sched, repo, runId, tj.name)
jobCh <- struct{}{}

waitForRunStatus(t, repo, runId, string(job.Completed))
completeFn()

// Update job to run again with RunNow option
err = sched.UpdateJobNextRunInAtLeast(context.Background(), tj.name, 0, WithRunNow(true))
Expand All @@ -620,7 +613,34 @@ func TestSchedulerRunNow(t *testing.T) {
close(jobCh)
}

func waitForRunStatus(t *testing.T, repo *job.Repository, runId, status string) *job.Run {
func waitForRunComplete(t *testing.T, sched *Scheduler, repo *job.Repository, runId, jobName string) func() {
r, err := repo.LookupRun(context.Background(), runId)
require.NoError(t, err)
require.EqualValues(t, job.Running, r.Status)

return func() {
timeout := time.NewTimer(5 * time.Second)
for {
select {
case <-timeout.C:
t.Fatal(fmt.Errorf("timed out waiting for job run %q to be completed", runId))
case <-time.After(100 * time.Millisecond):
}

// A run is complete when we don't find it in the scheduler's
// running jobs list and also not in the job_run table.
_, ok := sched.runningJobs.Load(jobName)
if !ok {
r, err = repo.LookupRun(context.Background(), runId)
require.Nil(t, r)
require.Nil(t, err)
break
}
}
}
}

func waitForRunStatus(t *testing.T, repo *job.Repository, runId string, status job.Status) *job.Run {
t.Helper()
var run *job.Run

Expand All @@ -636,7 +656,7 @@ func waitForRunStatus(t *testing.T, repo *job.Repository, runId, status string)
var err error
run, err = repo.LookupRun(context.Background(), runId)
require.NoError(t, err)
if run.Status == status {
if run.Status == string(status) {
break
}
}
Expand Down
30 changes: 0 additions & 30 deletions internal/scheduler/cleaner/cleaner.go

This file was deleted.

57 changes: 0 additions & 57 deletions internal/scheduler/cleaner/cleaner_job.go

This file was deleted.

64 changes: 0 additions & 64 deletions internal/scheduler/cleaner/cleaner_test.go

This file was deleted.

3 changes: 1 addition & 2 deletions internal/scheduler/job/additional_verification_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,8 @@ func TestJobWorkflow(t *testing.T) {
require.NoError(err)
assert.Nil(newRuns)

run, err = repo.CompleteRun(ctx, run.PrivateId, time.Hour, 0, 0, 0)
err = repo.CompleteRun(ctx, run.PrivateId, time.Hour)
require.NoError(err)
assert.Equal(Completed.string(), run.Status)

job, err = repo.LookupJob(ctx, job.Name)
require.NoError(err)
Expand Down
2 changes: 1 addition & 1 deletion internal/scheduler/job/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,5 +45,5 @@
// nextJobRun = time.Now().Add(time.Hour)
//
// repo, _ = job.NewRepository(db, db, wrapper)
// run, _ = repo.CompleteRun(ctx, run.PrivateId, job.Completed, nextJobRun)
// _ = repo.CompleteRun(ctx, run.PrivateId nextJobRun)
package job
9 changes: 1 addition & 8 deletions internal/scheduler/job/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,14 +74,7 @@ const updateProgressQuery = `
`

const completeRunQuery = `
update
job_run
set
completed_count = ?,
total_count = ?,
retries_count = ?,
status = 'completed',
end_time = current_timestamp
delete from job_run
where
private_id = ?
and status = 'running'
Expand Down
Loading
Loading