diff --git a/scheduler/actions/scheduler/stuck-jobs.go b/scheduler/actions/scheduler/stuck-jobs.go index 906da5b93a..8a307549ca 100644 --- a/scheduler/actions/scheduler/stuck-jobs.go +++ b/scheduler/actions/scheduler/stuck-jobs.go @@ -74,9 +74,9 @@ func (c *PruneJobsAction) GetParametersForm() *forms.Form { &forms.FormField{ Name: "maxRunningTime", Type: forms.ParamInteger, - Label: "Maximum running time per task", - Description: "Clean tasks that have been running for more than ... (minutes)", - Default: 60 * 10, + Label: "Maximum running time per task (in seconds)", + Description: "Clean tasks that have been running for more than ... (seconds)", + Default: 60 * 60, }, }, }, @@ -98,7 +98,7 @@ func (c *PruneJobsAction) Init(job *jobs.Job, action *jobs.Action) error { if n, o := action.Parameters["maxRunningTime"]; o { c.maxRunningTime = n } else { - c.maxRunningTime = "600" + c.maxRunningTime = "3600" } return nil } @@ -111,8 +111,8 @@ func (c *PruneJobsAction) Run(ctx context.Context, channels *actions.RunnableCha return input.WithError(e), e } maxRunningTime, e := jobs.EvaluateFieldInt(ctx, input, c.maxRunningTime) - if e != nil { - maxRunningTime = 600 + if e != nil || maxRunningTime == 0 { + maxRunningTime = 3600 } cli := jobs.NewJobServiceClient(grpc.GetClientConnFromCtx(c.GetRuntimeContext(), common.ServiceJobs)) diff --git a/scheduler/jobs/grpc/handler.go b/scheduler/jobs/grpc/handler.go index f2ea7d87a3..aa9214fc77 100644 --- a/scheduler/jobs/grpc/handler.go +++ b/scheduler/jobs/grpc/handler.go @@ -23,6 +23,7 @@ package grpc import ( "context" "fmt" + servicecontext "github.com/pydio/cells/v4/common/service/context" "math" "strings" "sync" @@ -544,6 +545,17 @@ func (j *JobsHandler) cleanStuckByStatus(ctx context.Context, serverStart bool, tcli := proto.NewTaskServiceClient(grpc.GetClientConnFromCtx(ctx, common.ServiceTasks)) shouldRetry := false + var currentTaskID string + if mm, ok := metadata.FromContextRead(ctx); ok { + currentTaskID = mm[servicecontext.ContextMetaTaskUuid] + } + if !isRetry { + if len(duration) > 0 { + logger.Info("Clean tasks with status " + status.String() + " and check duration " + duration[0].String()) + } else { + logger.Info("Clean tasks with status " + status.String()) + } + } var fixedTasks []*proto.Task res, done, err := j.store.ListTasks("", status) @@ -556,33 +568,54 @@ func (j *JobsHandler) cleanStuckByStatus(ctx context.Context, serverStart bool, case <-done: for _, t := range fixedTasks { - logger.Info("Setting task " + t.ID + " in error status as it was saved as running") _ = j.store.PutTask(t) } return fixedTasks, shouldRetry, nil case t := <-res: - // Check if Job should in fact be restarted - var autoRestartJob *proto.Job - if job, e := j.store.GetJob(t.JobID, proto.TaskStatus_Unknown); e == nil && job.AutoRestart { - autoRestartJob = job + + // Ignore if current task is in fact this task ! + if t.ID == currentTaskID { + logger.Info("Ignore my own task!") + break } - if serverStart && autoRestartJob != nil { - logger.Warn("Should now restart " + autoRestartJob.Label) - // Mark as complete, not Error - t.Status = proto.TaskStatus_Interrupted - t.StatusMessage = "Task restarted" - t.EndTime = int32(time.Now().Unix()) - fixedTasks = append(fixedTasks, t) + + // Load corresponding job + job, e := j.store.GetJob(t.JobID, proto.TaskStatus_Unknown) + if e != nil { break } - if autoRestartJob != nil && !serverStart { - // do not kill running task ! - logger.Info("Ignoring running task for " + autoRestartJob.Label + " as it is not stuck ") + + // AutoRestart Jobs Case + if job.AutoRestart { + if serverStart { + logger.Warn("Should now restart " + job.Label) + // Mark as complete, not Error + t.Status = proto.TaskStatus_Interrupted + t.StatusMessage = "Task restarted" + t.EndTime = int32(time.Now().Unix()) + fixedTasks = append(fixedTasks, t) + } else { + logger.Info("Ignoring running task for " + job.Label + " as it is not stuck ") + } break } + + var runningTimeOvertime bool + if status == proto.TaskStatus_Running && !serverStart { + if len(duration) > 0 && t.StartTime > 0 && job.Timeout == "" { + check := duration[0] + startTime := time.Unix(int64(t.StartTime), 0) + runningTimeOvertime = time.Since(startTime) > check + } + if !runningTimeOvertime { + break + } + } + // Send a stop signal to kill the task and flag a retry is required - if !serverStart && !isRetry { + if !serverStart && !isRetry && runningTimeOvertime { + logger.Info("Kill task for job " + job.Label + " as it is running for more than " + duration[0].String() + " (no timeout set)") _, e := tcli.Control(ctx, &proto.CtrlCommand{ Cmd: proto.Command_Stop, JobId: t.JobID, @@ -598,15 +631,8 @@ func (j *JobsHandler) cleanStuckByStatus(ctx context.Context, serverStart bool, t.Status = proto.TaskStatus_Error t.StatusMessage = "Task stuck" t.EndTime = int32(time.Now().Unix()) - if len(duration) > 0 && t.StartTime > 0 { - check := duration[0] - startTime := time.Unix(int64(t.StartTime), 0) - if time.Since(startTime) > check { - fixedTasks = append(fixedTasks, t) - } - } else { - fixedTasks = append(fixedTasks, t) - } + logger.Info("Setting task " + job.Label + "/" + t.ID + " in error status as it was saved as running") + fixedTasks = append(fixedTasks, t) } } diff --git a/scheduler/tasks/runnable.go b/scheduler/tasks/runnable.go index e774f2ea25..96e8b403ec 100644 --- a/scheduler/tasks/runnable.go +++ b/scheduler/tasks/runnable.go @@ -51,12 +51,14 @@ type Runnable struct { selfCollector *collector } -func itemTimeout(to string) (time.Duration, bool) { +func itemTimeout(ctx context.Context, to string) (time.Duration, bool) { if to == "" { return 0, false } if d, e := time.ParseDuration(to); e == nil { return d, true + } else { + log.TasksLogger(ctx).Warn("Cannot parse timeout " + to + ", must be a golang duration.") } return 0, false } @@ -279,7 +281,7 @@ func (r *Runnable) RunAction(queue chan RunnerFunc) { outputMessage = proto.Clone(r.Message).(*jobs.ActionMessage) } else { runCtx := r.Context - if d, o := itemTimeout(r.Action.Timeout); o { + if d, o := itemTimeout(r.Context, r.Action.Timeout); o { var can context.CancelFunc runCtx, can = context.WithTimeout(runCtx, d) defer can() diff --git a/scheduler/tasks/task.go b/scheduler/tasks/task.go index 3bd759a923..f2fa61796f 100644 --- a/scheduler/tasks/task.go +++ b/scheduler/tasks/task.go @@ -88,7 +88,7 @@ func NewTaskFromEvent(runtime, ctx context.Context, job *jobs.Job, event interfa func (t *Task) Queue(queue chan RunnerFunc) { var ct context.Context var can context.CancelFunc - if d, o := itemTimeout(t.Job.Timeout); o { + if d, o := itemTimeout(t.context, t.Job.Timeout); o { ct, can = context.WithTimeout(t.context, d) } else { ct, can = context.WithCancel(t.context)