From e8cfdc828f6a575ad6b14e6216c984bcd4c2ac2a Mon Sep 17 00:00:00 2001 From: James Lu Date: Tue, 8 Aug 2023 15:26:38 +0800 Subject: [PATCH] fix(recurring-job): failed with backup failed The recurring job pod status should be failed (error) if it fails to create and start a backup or snapshot. The recurring job will then start a new pod to start a new backup because its RestartPolicy is set to `onFailure`. Ref: 4255 Signed-off-by: James Lu (cherry picked from commit 460b23bc72b7914237e76345a9de8a22c16451e7) --- app/recurring_job.go | 104 ++++++++++++++++++++++++------------------- 1 file changed, 58 insertions(+), 46 deletions(-) diff --git a/app/recurring_job.go b/app/recurring_job.go index 78b8b421ca..c3eebbeb06 100644 --- a/app/recurring_job.go +++ b/app/recurring_job.go @@ -8,13 +8,14 @@ import ( "sort" "strconv" "strings" - "sync" "time" "github.com/pkg/errors" "github.com/sirupsen/logrus" "github.com/urfave/cli" + "golang.org/x/sync/errgroup" + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -71,15 +72,14 @@ func RecurringJobCmd() cli.Command { }, Action: func(c *cli.Context) { if err := recurringJob(c); err != nil { - logrus.WithError(err).Fatal("Failed to snapshot") + logrus.WithError(err).Fatal("Failed to do a recurring job") } }, } } -func recurringJob(c *cli.Context) error { +func recurringJob(c *cli.Context) (err error) { logger := logrus.StandardLogger() - var err error var managerURL string = c.String(FlagManagerURL) if managerURL == "" { @@ -108,6 +108,7 @@ func recurringJob(c *cli.Context) error { var jobGroups []string = recurringJob.Spec.Groups var jobRetain int = recurringJob.Spec.Retain var jobConcurrent int = recurringJob.Spec.Concurrency + jobTask := recurringJob.Spec.Task jobLabelMap := map[string]string{} if recurringJob.Spec.Labels != nil { @@ -142,51 +143,62 @@ func recurringJob(c *cli.Context) error { logger.Infof("Found %v volumes with recurring job %v", len(filteredVolumes), jobName)
concurrentLimiter := make(chan struct{}, jobConcurrent) - var wg sync.WaitGroup - defer wg.Wait() + ewg := &errgroup.Group{} + defer func() { + if wgError := ewg.Wait(); wgError != nil { + err = wgError + } + }() for _, volumeName := range filteredVolumes { - wg.Add(1) - go func(volumeName string) { - concurrentLimiter <- struct{}{} - defer func() { - <-concurrentLimiter - wg.Done() - }() - - log := logger.WithFields(logrus.Fields{ - "job": jobName, - "volume": volumeName, - "task": recurringJob.Spec.Task, - "retain": jobRetain, - "concurrent": jobConcurrent, - "groups": strings.Join(jobGroups, ","), - "labels": string(labelJSON), - }) - log.Info("Creating job") - - snapshotName := sliceStringSafely(types.GetCronJobNameForRecurringJob(jobName), 0, 8) + "-" + util.UUID() - job, err := NewJob( - logger, - managerURL, - volumeName, - snapshotName, - jobLabelMap, - jobRetain, - recurringJob.Spec.Task) - if err != nil { - log.WithError(err).Error("Failed to create new job for volume") - return - } - err = job.run() - if err != nil { - log.WithError(err).Errorf("Failed to run job for volume") - return - } + startJobVolumeName := volumeName + ewg.Go(func() error { + return startVolumeJob(startJobVolumeName, logger, concurrentLimiter, managerURL, jobName, jobTask, jobRetain, jobConcurrent, jobGroups, jobLabelMap, labelJSON) + }) + } - log.Info("Created job") - }(volumeName) + return err +} + +func startVolumeJob( + volumeName string, logger *logrus.Logger, concurrentLimiter chan struct{}, managerURL string, + jobName string, jobTask longhorn.RecurringJobType, jobRetain int, jobConcurrent int, jobGroups []string, jobLabelMap map[string]string, labelJSON []byte) error { + + concurrentLimiter <- struct{}{} + defer func() { + <-concurrentLimiter + }() + + log := logger.WithFields(logrus.Fields{ + "job": jobName, + "volume": volumeName, + "task": jobTask, + "retain": jobRetain, + "concurrent": jobConcurrent, + "groups": strings.Join(jobGroups, ","), + "labels": 
string(labelJSON), + }) + log.Info("Creating job") + + snapshotName := sliceStringSafely(types.GetCronJobNameForRecurringJob(jobName), 0, 8) + "-" + util.UUID() + job, err := newJob( + logger, + managerURL, + volumeName, + snapshotName, + jobLabelMap, + jobRetain, + jobTask) + if err != nil { + log.WithError(err).Error("Failed to create new job for volume") + return err + } + err = job.run() + if err != nil { + log.WithError(err).Errorf("Failed to run job for volume") + return err } + log.Info("Created job") return nil } @@ -200,7 +212,7 @@ func sliceStringSafely(s string, begin, end int) string { return s[begin:end] } -func NewJob(logger logrus.FieldLogger, managerURL, volumeName, snapshotName string, labels map[string]string, retain int, task longhorn.RecurringJobType) (*Job, error) { +func newJob(logger logrus.FieldLogger, managerURL, volumeName, snapshotName string, labels map[string]string, retain int, task longhorn.RecurringJobType) (*Job, error) { namespace := os.Getenv(types.EnvPodNamespace) if namespace == "" { return nil, fmt.Errorf("failed detect pod namespace, environment variable %v is missing", types.EnvPodNamespace)