Skip to content

Commit

Permalink
fix(recurring-job): failed with backup failed
Browse files Browse the repository at this point in the history
Recurring job pod status should be failed (error) if it creates
and starts a backup or snapshot failed.

Recurring job will start a new pod to start a new backup because
setting  RestartPolicy `onFailure`.

Ref: 4255

Signed-off-by: James Lu <james.lu@suse.com>
(cherry picked from commit 460b23b)
  • Loading branch information
mantissahz authored and David Ko committed Aug 8, 2023
1 parent c69f7bf commit e8cfdc8
Showing 1 changed file with 58 additions and 46 deletions.
104 changes: 58 additions & 46 deletions app/recurring_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,14 @@ import (
"sort"
"strconv"
"strings"
"sync"
"time"

"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/urfave/cli"

"golang.org/x/sync/errgroup"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
Expand Down Expand Up @@ -71,15 +72,14 @@ func RecurringJobCmd() cli.Command {
},
Action: func(c *cli.Context) {
if err := recurringJob(c); err != nil {
logrus.WithError(err).Fatal("Failed to snapshot")
logrus.WithError(err).Fatal("Failed to do a recurring job")
}
},
}
}

func recurringJob(c *cli.Context) error {
func recurringJob(c *cli.Context) (err error) {
logger := logrus.StandardLogger()
var err error

var managerURL string = c.String(FlagManagerURL)
if managerURL == "" {
Expand Down Expand Up @@ -108,6 +108,7 @@ func recurringJob(c *cli.Context) error {
var jobGroups []string = recurringJob.Spec.Groups
var jobRetain int = recurringJob.Spec.Retain
var jobConcurrent int = recurringJob.Spec.Concurrency
jobTask := recurringJob.Spec.Task

jobLabelMap := map[string]string{}
if recurringJob.Spec.Labels != nil {
Expand Down Expand Up @@ -142,51 +143,62 @@ func recurringJob(c *cli.Context) error {
logger.Infof("Found %v volumes with recurring job %v", len(filteredVolumes), jobName)

concurrentLimiter := make(chan struct{}, jobConcurrent)
var wg sync.WaitGroup
defer wg.Wait()
ewg := &errgroup.Group{}
defer func() {
if wgError := ewg.Wait(); wgError != nil {
err = wgError
}
}()
for _, volumeName := range filteredVolumes {
wg.Add(1)
go func(volumeName string) {
concurrentLimiter <- struct{}{}
defer func() {
<-concurrentLimiter
wg.Done()
}()

log := logger.WithFields(logrus.Fields{
"job": jobName,
"volume": volumeName,
"task": recurringJob.Spec.Task,
"retain": jobRetain,
"concurrent": jobConcurrent,
"groups": strings.Join(jobGroups, ","),
"labels": string(labelJSON),
})
log.Info("Creating job")

snapshotName := sliceStringSafely(types.GetCronJobNameForRecurringJob(jobName), 0, 8) + "-" + util.UUID()
job, err := NewJob(
logger,
managerURL,
volumeName,
snapshotName,
jobLabelMap,
jobRetain,
recurringJob.Spec.Task)
if err != nil {
log.WithError(err).Error("Failed to create new job for volume")
return
}
err = job.run()
if err != nil {
log.WithError(err).Errorf("Failed to run job for volume")
return
}
startJobVolumeName := volumeName
ewg.Go(func() error {
return startVolumeJob(startJobVolumeName, logger, concurrentLimiter, managerURL, jobName, jobTask, jobRetain, jobConcurrent, jobGroups, jobLabelMap, labelJSON)
})
}

log.Info("Created job")
}(volumeName)
return err
}

func startVolumeJob(
volumeName string, logger *logrus.Logger, concurrentLimiter chan struct{}, managerURL string,
jobName string, jobTask longhorn.RecurringJobType, jobRetain int, jobConcurrent int, jobGroups []string, jobLabelMap map[string]string, labelJSON []byte) error {

concurrentLimiter <- struct{}{}
defer func() {
<-concurrentLimiter
}()

log := logger.WithFields(logrus.Fields{
"job": jobName,
"volume": volumeName,
"task": jobTask,
"retain": jobRetain,
"concurrent": jobConcurrent,
"groups": strings.Join(jobGroups, ","),
"labels": string(labelJSON),
})
log.Info("Creating job")

snapshotName := sliceStringSafely(types.GetCronJobNameForRecurringJob(jobName), 0, 8) + "-" + util.UUID()
job, err := newJob(
logger,
managerURL,
volumeName,
snapshotName,
jobLabelMap,
jobRetain,
jobTask)
if err != nil {
log.WithError(err).Error("Failed to create new job for volume")
return err
}
err = job.run()
if err != nil {
log.WithError(err).Errorf("Failed to run job for volume")
return err
}

log.Info("Created job")
return nil
}

Expand All @@ -200,7 +212,7 @@ func sliceStringSafely(s string, begin, end int) string {
return s[begin:end]
}

func NewJob(logger logrus.FieldLogger, managerURL, volumeName, snapshotName string, labels map[string]string, retain int, task longhorn.RecurringJobType) (*Job, error) {
func newJob(logger logrus.FieldLogger, managerURL, volumeName, snapshotName string, labels map[string]string, retain int, task longhorn.RecurringJobType) (*Job, error) {
namespace := os.Getenv(types.EnvPodNamespace)
if namespace == "" {
return nil, fmt.Errorf("failed detect pod namespace, environment variable %v is missing", types.EnvPodNamespace)
Expand Down

0 comments on commit e8cfdc8

Please sign in to comment.