Skip to content

Commit

Permalink
Show a warning if a job or action Timeout is not a valid Go duration. Fix the cleaning job so that it does not kill running tasks without reason!
Browse files Browse the repository at this point in the history
  • Loading branch information
cdujeu committed May 18, 2023
1 parent 6f20a06 commit bc8f565
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 34 deletions.
12 changes: 6 additions & 6 deletions scheduler/actions/scheduler/stuck-jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,9 @@ func (c *PruneJobsAction) GetParametersForm() *forms.Form {
&forms.FormField{
Name: "maxRunningTime",
Type: forms.ParamInteger,
Label: "Maximum running time per task",
Description: "Clean tasks that have been running for more than ... (minutes)",
Default: 60 * 10,
Label: "Maximum running time per task (in seconds)",
Description: "Clean tasks that have been running for more than ... (seconds)",
Default: 60 * 60,
},
},
},
Expand All @@ -98,7 +98,7 @@ func (c *PruneJobsAction) Init(job *jobs.Job, action *jobs.Action) error {
if n, o := action.Parameters["maxRunningTime"]; o {
c.maxRunningTime = n
} else {
c.maxRunningTime = "600"
c.maxRunningTime = "3600"
}
return nil
}
Expand All @@ -111,8 +111,8 @@ func (c *PruneJobsAction) Run(ctx context.Context, channels *actions.RunnableCha
return input.WithError(e), e
}
maxRunningTime, e := jobs.EvaluateFieldInt(ctx, input, c.maxRunningTime)
if e != nil {
maxRunningTime = 600
if e != nil || maxRunningTime == 0 {
maxRunningTime = 3600
}

cli := jobs.NewJobServiceClient(grpc.GetClientConnFromCtx(c.GetRuntimeContext(), common.ServiceJobs))
Expand Down
76 changes: 51 additions & 25 deletions scheduler/jobs/grpc/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ package grpc
import (
"context"
"fmt"
servicecontext "github.com/pydio/cells/v4/common/service/context"
"math"
"strings"
"sync"
Expand Down Expand Up @@ -544,6 +545,17 @@ func (j *JobsHandler) cleanStuckByStatus(ctx context.Context, serverStart bool,

tcli := proto.NewTaskServiceClient(grpc.GetClientConnFromCtx(ctx, common.ServiceTasks))
shouldRetry := false
var currentTaskID string
if mm, ok := metadata.FromContextRead(ctx); ok {
currentTaskID = mm[servicecontext.ContextMetaTaskUuid]
}
if !isRetry {
if len(duration) > 0 {
logger.Info("Clean tasks with status " + status.String() + " and check duration " + duration[0].String())
} else {
logger.Info("Clean tasks with status " + status.String())
}
}

var fixedTasks []*proto.Task
res, done, err := j.store.ListTasks("", status)
Expand All @@ -556,33 +568,54 @@ func (j *JobsHandler) cleanStuckByStatus(ctx context.Context, serverStart bool,

case <-done:
for _, t := range fixedTasks {
logger.Info("Setting task " + t.ID + " in error status as it was saved as running")
_ = j.store.PutTask(t)
}
return fixedTasks, shouldRetry, nil

case t := <-res:
// Check if Job should in fact be restarted
var autoRestartJob *proto.Job
if job, e := j.store.GetJob(t.JobID, proto.TaskStatus_Unknown); e == nil && job.AutoRestart {
autoRestartJob = job

// Ignore if current task is in fact this task !
if t.ID == currentTaskID {
logger.Info("Ignore my own task!")
break
}
if serverStart && autoRestartJob != nil {
logger.Warn("Should now restart " + autoRestartJob.Label)
// Mark as complete, not Error
t.Status = proto.TaskStatus_Interrupted
t.StatusMessage = "Task restarted"
t.EndTime = int32(time.Now().Unix())
fixedTasks = append(fixedTasks, t)

// Load corresponding job
job, e := j.store.GetJob(t.JobID, proto.TaskStatus_Unknown)
if e != nil {
break
}
if autoRestartJob != nil && !serverStart {
// do not kill running task !
logger.Info("Ignoring running task for " + autoRestartJob.Label + " as it is not stuck ")

// AutoRestart Jobs Case
if job.AutoRestart {
if serverStart {
logger.Warn("Should now restart " + job.Label)
// Mark as complete, not Error
t.Status = proto.TaskStatus_Interrupted
t.StatusMessage = "Task restarted"
t.EndTime = int32(time.Now().Unix())
fixedTasks = append(fixedTasks, t)
} else {
logger.Info("Ignoring running task for " + job.Label + " as it is not stuck ")
}
break
}

var runningTimeOvertime bool
if status == proto.TaskStatus_Running && !serverStart {
if len(duration) > 0 && t.StartTime > 0 && job.Timeout == "" {
check := duration[0]
startTime := time.Unix(int64(t.StartTime), 0)
runningTimeOvertime = time.Since(startTime) > check
}
if !runningTimeOvertime {
break
}
}

// Send a stop signal to kill the task and flag a retry is required
if !serverStart && !isRetry {
if !serverStart && !isRetry && runningTimeOvertime {
logger.Info("Kill task for job " + job.Label + " as it is running for more than " + duration[0].String() + " (no timeout set)")
_, e := tcli.Control(ctx, &proto.CtrlCommand{
Cmd: proto.Command_Stop,
JobId: t.JobID,
Expand All @@ -598,15 +631,8 @@ func (j *JobsHandler) cleanStuckByStatus(ctx context.Context, serverStart bool,
t.Status = proto.TaskStatus_Error
t.StatusMessage = "Task stuck"
t.EndTime = int32(time.Now().Unix())
if len(duration) > 0 && t.StartTime > 0 {
check := duration[0]
startTime := time.Unix(int64(t.StartTime), 0)
if time.Since(startTime) > check {
fixedTasks = append(fixedTasks, t)
}
} else {
fixedTasks = append(fixedTasks, t)
}
logger.Info("Setting task " + job.Label + "/" + t.ID + " in error status as it was saved as running")
fixedTasks = append(fixedTasks, t)
}
}

Expand Down
6 changes: 4 additions & 2 deletions scheduler/tasks/runnable.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,14 @@ type Runnable struct {
selfCollector *collector
}

func itemTimeout(to string) (time.Duration, bool) {
func itemTimeout(ctx context.Context, to string) (time.Duration, bool) {
if to == "" {
return 0, false
}
if d, e := time.ParseDuration(to); e == nil {
return d, true
} else {
log.TasksLogger(ctx).Warn("Cannot parse timeout " + to + ", must be a golang duration.")
}
return 0, false
}
Expand Down Expand Up @@ -279,7 +281,7 @@ func (r *Runnable) RunAction(queue chan RunnerFunc) {
outputMessage = proto.Clone(r.Message).(*jobs.ActionMessage)
} else {
runCtx := r.Context
if d, o := itemTimeout(r.Action.Timeout); o {
if d, o := itemTimeout(r.Context, r.Action.Timeout); o {
var can context.CancelFunc
runCtx, can = context.WithTimeout(runCtx, d)
defer can()
Expand Down
2 changes: 1 addition & 1 deletion scheduler/tasks/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func NewTaskFromEvent(runtime, ctx context.Context, job *jobs.Job, event interfa
func (t *Task) Queue(queue chan RunnerFunc) {
var ct context.Context
var can context.CancelFunc
if d, o := itemTimeout(t.Job.Timeout); o {
if d, o := itemTimeout(t.context, t.Job.Timeout); o {
ct, can = context.WithTimeout(t.context, d)
} else {
ct, can = context.WithCancel(t.context)
Expand Down

0 comments on commit bc8f565

Please sign in to comment.