Skip to content

Commit

Permalink
feat(backup): support recurring and ondemand full backup
Browse files Browse the repository at this point in the history
ref: longhorn/longhorn#7070

Signed-off-by: Jack Lin <jack.lin@suse.com>
  • Loading branch information
ChanYiLin committed May 31, 2024
1 parent 63cf8e5 commit 0307a44
Show file tree
Hide file tree
Showing 67 changed files with 1,964 additions and 1,131 deletions.
21 changes: 19 additions & 2 deletions api/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,12 +163,15 @@ type Backup struct {
Created string `json:"created"`
Size string `json:"size"`
Labels map[string]string `json:"labels"`
Parameters map[string]string `json:"parameters"`
Messages map[string]string `json:"messages"`
VolumeName string `json:"volumeName"`
VolumeSize string `json:"volumeSize"`
VolumeCreated string `json:"volumeCreated"`
VolumeBackingImageName string `json:"volumeBackingImageName"`
CompressionMethod string `json:"compressionMethod"`
NewlyUploadedDataSize string `json:"newlyUploadDataSize"`
ReUploadedDataSize string `json:"reUploadedDataSize"`
}

type BackupBackingImage struct {
Expand Down Expand Up @@ -286,8 +289,9 @@ type DetachInput struct {
}

// SnapshotInput is the request payload carrying a snapshot name together
// with optional labels and parameters.
//
// Each field is declared exactly once; declaring a field name twice in a
// struct is a compile error in Go.
type SnapshotInput struct {
	// Name is the name of the snapshot the request refers to.
	Name string `json:"name"`
	// Labels are free-form key/value pairs supplied with the request.
	Labels map[string]string `json:"labels"`
	// Parameters are optional key/value options supplied with the request.
	// NOTE(review): presumably these are the backup parameters validated by
	// the snapshot backup handler — confirm against the caller.
	Parameters map[string]string `json:"parameters"`
}

type SnapshotCRInput struct {
Expand Down Expand Up @@ -548,6 +552,7 @@ type InstanceManager struct {
// RecurringJob is the API representation of a recurring job. It embeds the
// generic client.Resource together with the job's spec and status, so the
// exported fields of all three embedded types are promoted onto this type.
type RecurringJob struct {
client.Resource
longhorn.RecurringJobSpec
// Status is embedded alongside the spec so that status fields are exposed
// on the same resource as the spec fields.
longhorn.RecurringJobStatus
}

type Orphan struct {
Expand Down Expand Up @@ -828,6 +833,11 @@ func recurringJobSchema(job *client.Schema) {
labels.Type = "map[string]"
labels.Nullable = true
job.ResourceFields["labels"] = labels

parameters := job.ResourceFields["parameters"]
parameters.Type = "map[string]"
parameters.Nullable = true
job.ResourceFields["parameters"] = parameters
}

func kubernetesStatusSchema(status *client.Schema) {
Expand Down Expand Up @@ -1891,12 +1901,15 @@ func toBackupResource(b *longhorn.Backup) *Backup {
Created: b.Status.BackupCreatedAt,
Size: b.Status.Size,
Labels: b.Status.Labels,
Parameters: b.Spec.Parameters,
Messages: b.Status.Messages,
VolumeName: b.Status.VolumeName,
VolumeSize: b.Status.VolumeSize,
VolumeCreated: b.Status.VolumeCreated,
VolumeBackingImageName: b.Status.VolumeBackingImageName,
CompressionMethod: string(b.Status.CompressionMethod),
NewlyUploadedDataSize: b.Status.NewlyUploadedDataSize,
ReUploadedDataSize: b.Status.ReUploadedDataSize,
}
// Set the volume name from backup CR's label if it's empty.
// This field is empty probably because the backup state is not Ready
Expand Down Expand Up @@ -2239,6 +2252,10 @@ func toRecurringJobResource(recurringJob *longhorn.RecurringJob, apiContext *api
Retain: recurringJob.Spec.Retain,
Concurrency: recurringJob.Spec.Concurrency,
Labels: recurringJob.Spec.Labels,
Parameters: recurringJob.Spec.Parameters,
},
RecurringJobStatus: longhorn.RecurringJobStatus{
ExecutionCount: recurringJob.Status.ExecutionCount,
},
}
}
Expand Down
2 changes: 2 additions & 0 deletions api/recurringjob.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ func (s *Server) RecurringJobCreate(rw http.ResponseWriter, req *http.Request) e
Retain: input.Retain,
Concurrency: input.Concurrency,
Labels: input.Labels,
Parameters: input.Parameters,
})
if err != nil {
return errors.Wrapf(err, "failed to create recurring job %v", input.Name)
Expand Down Expand Up @@ -90,6 +91,7 @@ func (s *Server) RecurringJobUpdate(rw http.ResponseWriter, req *http.Request) e
Retain: input.Retain,
Concurrency: input.Concurrency,
Labels: input.Labels,
Parameters: input.Parameters,
})
})
if err != nil {
Expand Down
9 changes: 8 additions & 1 deletion api/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,13 @@ func (s *Server) SnapshotBackup(w http.ResponseWriter, req *http.Request) (err e
if err != nil {
return err
}
backupParameters := map[string]string{}
if input.Parameters != nil {
backupParameters = input.Parameters
}
if err := util.ValidateBackupParameters(backupParameters); err != nil {
return err
}

// Cannot directly compare the structs since KubernetesStatus contains a slice which cannot be compared.
if !reflect.DeepEqual(vol.Status.KubernetesStatus, longhorn.KubernetesStatus{}) {
Expand All @@ -195,7 +202,7 @@ func (s *Server) SnapshotBackup(w http.ResponseWriter, req *http.Request) (err e
labels[types.KubernetesStatusLabel] = string(kubeStatus)
}

if err := s.m.BackupSnapshot(bsutil.GenerateName("backup"), volName, input.Name, labels); err != nil {
if err := s.m.BackupSnapshot(bsutil.GenerateName("backup"), volName, input.Name, labels, backupParameters); err != nil {
return err
}

Expand Down
114 changes: 78 additions & 36 deletions app/recurring_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ import (
longhornclient "github.com/longhorn/longhorn-manager/client"
longhorn "github.com/longhorn/longhorn-manager/k8s/pkg/apis/longhorn/v1beta2"
lhclientset "github.com/longhorn/longhorn-manager/k8s/pkg/client/clientset/versioned"

lhbackup "github.com/longhorn/go-common-libs/backup"
)

const (
Expand All @@ -48,14 +50,16 @@ const (
)

type Job struct {
logger logrus.FieldLogger
lhClient lhclientset.Interface
namespace string
volumeName string
snapshotName string
retain int
task longhorn.RecurringJobType
labels map[string]string
logger logrus.FieldLogger
lhClient lhclientset.Interface
namespace string
volumeName string
snapshotName string
retain int
task longhorn.RecurringJobType
labels map[string]string
parameters map[string]string
executionCount int

eventRecorder record.EventRecorder

Expand Down Expand Up @@ -111,6 +115,13 @@ func recurringJob(c *cli.Context) (err error) {
var jobConcurrent int = recurringJob.Spec.Concurrency
jobTask := recurringJob.Spec.Task

recurringJob.Status.ExecutionCount += 1
if _, err = lhClient.LonghornV1beta2().RecurringJobs(namespace).UpdateStatus(context.TODO(), recurringJob, metav1.UpdateOptions{}); err != nil {
return errors.Wrap(err, "failed to update job execution count")
}

jobExecutionCount := recurringJob.Status.ExecutionCount

jobLabelMap := map[string]string{}
if recurringJob.Spec.Labels != nil {
jobLabelMap = recurringJob.Spec.Labels
Expand All @@ -121,6 +132,15 @@ func recurringJob(c *cli.Context) (err error) {
return errors.Wrap(err, "failed to get JSON encoding for labels")
}

jobParameterMap := map[string]string{}
if recurringJob.Spec.Parameters != nil {
jobParameterMap = recurringJob.Spec.Parameters
}
parameterJSON, err := json.Marshal(jobParameterMap)
if err != nil {
return errors.Wrap(err, "failed to get JSON encoding for parameters")
}

allowDetachedSetting := types.SettingNameAllowRecurringJobWhileVolumeDetached
allowDetached, err := getSettingAsBoolean(allowDetachedSetting, namespace, lhClient)
if err != nil {
Expand Down Expand Up @@ -153,7 +173,7 @@ func recurringJob(c *cli.Context) (err error) {
for _, volumeName := range filteredVolumes {
startJobVolumeName := volumeName
ewg.Go(func() error {
return startVolumeJob(startJobVolumeName, logger, concurrentLimiter, managerURL, jobName, jobTask, jobRetain, jobConcurrent, jobGroups, jobLabelMap, labelJSON)
return startVolumeJob(startJobVolumeName, logger, concurrentLimiter, managerURL, jobName, jobTask, jobRetain, jobConcurrent, jobGroups, jobLabelMap, labelJSON, jobParameterMap, parameterJSON, jobExecutionCount)
})
}

Expand All @@ -162,21 +182,23 @@ func recurringJob(c *cli.Context) (err error) {

Check notice on line 182 in app/recurring_job.go

View check run for this annotation

codefactor.io / CodeFactor

app/recurring_job.go#L86-L182

Complex Method
func startVolumeJob(
volumeName string, logger *logrus.Logger, concurrentLimiter chan struct{}, managerURL string,
jobName string, jobTask longhorn.RecurringJobType, jobRetain int, jobConcurrent int, jobGroups []string, jobLabelMap map[string]string, labelJSON []byte) error {
jobName string, jobTask longhorn.RecurringJobType, jobRetain int, jobConcurrent int, jobGroups []string, jobLabelMap map[string]string, labelJSON []byte, jobParameterMap map[string]string, parameterJSON []byte, jobExecutionCount int) error {

concurrentLimiter <- struct{}{}
defer func() {
<-concurrentLimiter
}()

log := logger.WithFields(logrus.Fields{
"job": jobName,
"volume": volumeName,
"task": jobTask,
"retain": jobRetain,
"concurrent": jobConcurrent,
"groups": strings.Join(jobGroups, ","),
"labels": string(labelJSON),
"job": jobName,
"volume": volumeName,
"task": jobTask,
"retain": jobRetain,
"concurrent": jobConcurrent,
"groups": strings.Join(jobGroups, ","),
"labels": string(labelJSON),
"parameters": string(parameterJSON),
"executionCount": jobExecutionCount,
})
log.Info("Creating job")

Expand All @@ -187,8 +209,10 @@ func startVolumeJob(
volumeName,
snapshotName,
jobLabelMap,
jobParameterMap,
jobRetain,
jobTask)
jobTask,
jobExecutionCount)
if err != nil {
log.WithError(err).Error("Failed to create new job for volume")
return err
Expand All @@ -213,7 +237,7 @@ func sliceStringSafely(s string, begin, end int) string {
return s[begin:end]
}

func newJob(logger logrus.FieldLogger, managerURL, volumeName, snapshotName string, labels map[string]string, retain int, task longhorn.RecurringJobType) (*Job, error) {
func newJob(logger logrus.FieldLogger, managerURL, volumeName, snapshotName string, labels map[string]string, parameters map[string]string, retain int, task longhorn.RecurringJobType, executionCount int) (*Job, error) {
namespace := os.Getenv(types.EnvPodNamespace)
if namespace == "" {
return nil, fmt.Errorf("failed detect pod namespace, environment variable %v is missing", types.EnvPodNamespace)
Expand All @@ -238,12 +262,13 @@ func newJob(logger logrus.FieldLogger, managerURL, volumeName, snapshotName stri
}

logger = logger.WithFields(logrus.Fields{
"namespace": namespace,
"volumeName": volumeName,
"snapshotName": snapshotName,
"labels": labels,
"retain": retain,
"task": task,
"namespace": namespace,
"volumeName": volumeName,
"snapshotName": snapshotName,
"labels": labels,
"retain": retain,
"task": task,
"executionCount": executionCount,
})

scheme := runtime.NewScheme()
Expand All @@ -257,15 +282,17 @@ func newJob(logger logrus.FieldLogger, managerURL, volumeName, snapshotName stri
}

return &Job{
logger: logger,
lhClient: lhClient,
namespace: namespace,
volumeName: volumeName,
snapshotName: snapshotName,
labels: labels,
retain: retain,
task: task,
api: apiClient,
logger: logger,
lhClient: lhClient,
namespace: namespace,
volumeName: volumeName,
snapshotName: snapshotName,
labels: labels,
parameters: parameters,
executionCount: executionCount,
retain: retain,
task: task,
api: apiClient,

eventRecorder: eventBroadcaster.NewRecorder(scheme, corev1.EventSource{Component: "longhorn-recurring-job"}),
}, nil
Expand Down Expand Up @@ -600,9 +627,24 @@ func (job *Job) doRecurringBackup() (err error) {
return err
}

backupParameters := map[string]string{}
if intervalStr, exists := job.parameters[types.RecurringJobBackupParameterFullBackupInterval]; exists {
interval, err := strconv.Atoi(intervalStr)
if err != nil {
return errors.Wrapf(err, "interval %v is not number", intervalStr)
}

if interval != 0 {
if job.executionCount%interval == 0 {
backupParameters[lhbackup.LonghornBackupParameterBackupMode] = string(lhbackup.LonghornBackupModeFull)
}
}
}

if _, err := job.api.Volume.ActionSnapshotBackup(volume, &longhornclient.SnapshotInput{
Labels: job.labels,
Name: job.snapshotName,
Labels: job.labels,
Name: job.snapshotName,
Parameters: backupParameters,
}); err != nil {
return err
}
Expand Down
2 changes: 2 additions & 0 deletions client/generated_recurring_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ type RecurringJob struct {

Cron string `json:"cron,omitempty" yaml:"cron,omitempty"`

ExecutionCount int64 `json:"executionCount,omitempty" yaml:"execution_count,omitempty"`

Groups []string `json:"groups,omitempty" yaml:"groups,omitempty"`

Labels map[string]string `json:"labels,omitempty" yaml:"labels,omitempty"`
Expand Down
2 changes: 2 additions & 0 deletions client/generated_snapshot_input.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ type SnapshotInput struct {

Labels map[string]string `json:"labels,omitempty" yaml:"labels,omitempty"`

Parameters map[string]string `json:"parameters,omitempty" yaml:"parameters,omitempty"`

Name string `json:"name,omitempty" yaml:"name,omitempty"`
}

Expand Down
2 changes: 2 additions & 0 deletions controller/backup_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,8 @@ func (bc *BackupController) reconcile(backupName string) (err error) {
backup.Status.VolumeBackingImageName = backupInfo.VolumeBackingImageName
backup.Status.CompressionMethod = longhorn.BackupCompressionMethod(backupInfo.CompressionMethod)
backup.Status.LastSyncedAt = syncTime
backup.Status.NewlyUploadedDataSize = backupInfo.NewlyUploadedDataSize
backup.Status.ReUploadedDataSize = backupInfo.ReUploadedDataSize
return nil
}

Expand Down
35 changes: 35 additions & 0 deletions datastore/longhorn.go
Original file line number Diff line number Diff line change
Expand Up @@ -4193,6 +4193,41 @@ func ValidateRecurringJob(job longhorn.RecurringJobSpec) error {
return err
}
}
if job.Parameters != nil {
if err := ValidateRecurringJobParameters(job.Task, job.Labels); err != nil {
return err
}
}
return nil
}

// ValidateRecurringJobParameters checks the parameter map of a recurring job
// against the set of parameters accepted for the given task type.
//
// For the backup and backup-force-create tasks every key/value pair is
// validated individually and the first failure is returned, wrapped with
// context. For every other task type no validation is performed and nil is
// returned, whatever the map contains.
//
// parameters is expected to be the job's parameter map (not its labels).
func ValidateRecurringJobParameters(task longhorn.RecurringJobType, parameters map[string]string) (err error) {
	isBackupTask := task == longhorn.RecurringJobTypeBackup ||
		task == longhorn.RecurringJobTypeBackupForceCreate
	if !isBackupTask {
		// we don't support any parameters for other tasks currently
		return nil
	}

	for name, val := range parameters {
		checkErr := validateRecurringJobBackupParameter(name, val)
		if checkErr != nil {
			return errors.Wrapf(checkErr, "failed to validate recurring job backup task parameters")
		}
	}

	return nil
}

// validateRecurringJobBackupParameter validates a single key/value parameter
// of a backup recurring job.
//
// The only recognised key is the full-backup-interval parameter, whose value
// must parse as an integer via strconv.Atoi. Any other key is rejected as an
// invalid parameter.
func validateRecurringJobBackupParameter(key, value string) error {
	// Reject unknown keys up front so the happy path stays flat.
	if key != types.RecurringJobBackupParameterFullBackupInterval {
		return fmt.Errorf("%v:%v is not a valid parameter", key, value)
	}

	if _, convErr := strconv.Atoi(value); convErr != nil {
		return errors.Wrapf(convErr, "%v:%v is not number", key, value)
	}

	return nil
}

Expand Down
2 changes: 1 addition & 1 deletion engineapi/backup_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func NewBackupMonitor(logger logrus.FieldLogger, ds *datastore.DataStore, backup
}
_, replicaAddress, err := engineClientProxy.SnapshotBackup(engine, backup.Spec.SnapshotName, backup.Name,
backupTargetClient.URL, volume.Spec.BackingImage, biChecksum, string(compressionMethod), concurrentLimit, storageClassName,
backup.Spec.Labels, backupTargetClient.Credential)
backup.Spec.Labels, backupTargetClient.Credential, backup.Spec.Parameters)
if err != nil {
if !strings.Contains(err.Error(), "DeadlineExceeded") {
m.logger.WithError(err).Warn("Cannot take snapshot backup")
Expand Down
Loading

0 comments on commit 0307a44

Please sign in to comment.