Skip to content

Commit

Permalink
scheduler: revise job starting code
Browse files Browse the repository at this point in the history
Signed-off-by: Stephen Gutekanst <stephen@hexops.com>
  • Loading branch information
slimsag committed Jun 2, 2024
1 parent 871aeed commit 2c45dfd
Showing 1 changed file with 22 additions and 36 deletions.
58 changes: 22 additions & 36 deletions internal/wrench/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ func (b *Bot) schedulerWork(ctx context.Context) error {
}

for _, schedule := range b.schedule {
if _, err := b.scheduleJob(ctx, schedule, runners, false); err != nil {
if _, err := b.ensureJobScheduled(ctx, schedule, runners, false); err != nil {
b.idLogf(schedulerLogID, "%v", err)
continue
}
Expand All @@ -203,10 +203,10 @@ func (b *Bot) scheduleJobNow(ctx context.Context, scheduledJobID api.JobID, runn
if found == nil {
return "", errors.New("scheduled job not found")
}
return b.scheduleJob(ctx, *found, runners, true)
return b.ensureJobScheduled(ctx, *found, runners, true)
}

func (b *Bot) scheduleJob(ctx context.Context, schedule ScheduledJob, runners []api.Runner, force bool) (api.JobID, error) {
func (b *Bot) ensureJobScheduled(ctx context.Context, schedule ScheduledJob, runners []api.Runner, force bool) (api.JobID, error) {
if schedule.Job.TargetRunnerID == "*" {
for _, runner := range runners {
schedule.Job.TargetRunnerID = runner.ID
Expand All @@ -223,44 +223,30 @@ func (b *Bot) scheduleJob(ctx context.Context, schedule ScheduledJob, runners []
return "", errors.Wrap(err, "failed to query last job")
}

startable := lastJob == nil || (lastJob.State != api.JobStateReady &&
lastJob.State != api.JobStateStarting &&
lastJob.State != api.JobStateRunning &&
lastJob.State != api.JobStateError)
// We can start the existing job if it exists and it is ready to start
lastJobDoesNotExist := lastJob == nil
lastJobErrored := lastJob != nil && lastJob.State == api.JobStateError
shouldStart := startable || lastJobErrored
if startable && schedule.Every == 0 {
shouldStart = false // Job can be started, but is not scheduled to start automatically.
}
lastJobSucceeded := lastJob != nil && lastJob.State == api.JobStateSuccess
lastJobDone := lastJob != nil && (lastJobErrored || lastJobSucceeded)

if force {
if startable {
lastJob.ScheduledStart = time.Time{}
if err := b.store.UpsertRunnerJob(ctx, *lastJob); err != nil {
return "", errors.Wrap(err, "failed to update job")
}
return lastJob.ID, nil
} else {
schedule.Job.ScheduledStart = time.Time{}
jobID, err := b.store.NewRunnerJob(ctx, schedule.Job)
if err != nil {
return "", errors.Wrap(err, "failed to create job")
}
b.idLogf(schedulerLogID, "job created: %v", schedule.Job.Title)
return jobID, nil
}
}
if !shouldStart {
jobSchedulesAutomatically := schedule.Every != 0 // If zero, job can be started manually only
shouldStartNow := force || (jobSchedulesAutomatically && (lastJobDoesNotExist || lastJobDone))

if !shouldStartNow {
// There is already a job for this running
return "", nil
}

// Job is not running/scheduled, and is set to Always run OR is a ScheduledStart.
if !schedule.Always {
if lastJob == nil || lastJobErrored {
schedule.Job.ScheduledStart = time.Now().Add(30 * time.Second)
} else {
schedule.Job.ScheduledStart = time.Now().Add(schedule.Every)
}
// Create a new job
if schedule.Always || force {
// Job should be running right now, so ScheduledStart should be zero.
schedule.Job.ScheduledStart = time.Time{}
} else if lastJob == nil || lastJobErrored {
// If there is no last job, or the last job errored, schedule it to run again soon.
schedule.Job.ScheduledStart = time.Now().Add(30 * time.Second)
} else {
// If the job ran successfully, schedule it to run again at the desired interval.
schedule.Job.ScheduledStart = time.Now().Add(schedule.Every)
}

jobID, err := b.store.NewRunnerJob(ctx, schedule.Job)
Expand Down

0 comments on commit 2c45dfd

Please sign in to comment.