feat(backup): support recurring and ondemand full backup #2709

Merged: 1 commit, Jun 28, 2024

Changes from all commits
21 changes: 19 additions & 2 deletions api/model.go
@@ -164,12 +164,15 @@
Created string `json:"created"`
Size string `json:"size"`
Labels map[string]string `json:"labels"`
BackupMode longhorn.BackupMode `json:"backupMode"`
Messages map[string]string `json:"messages"`
VolumeName string `json:"volumeName"`
VolumeSize string `json:"volumeSize"`
VolumeCreated string `json:"volumeCreated"`
VolumeBackingImageName string `json:"volumeBackingImageName"`
CompressionMethod string `json:"compressionMethod"`
NewlyUploadedDataSize string `json:"newlyUploadDataSize"`
ReUploadedDataSize string `json:"reUploadedDataSize"`
}

type BackupBackingImage struct {
@@ -287,8 +290,9 @@
}

type SnapshotInput struct {
- Name string `json:"name"`
- Labels map[string]string `json:"labels"`
+ Name string `json:"name"`
+ Labels map[string]string `json:"labels"`
+ BackupMode string `json:"backupMode"`
}

type SnapshotCRInput struct {
@@ -553,6 +557,7 @@
type RecurringJob struct {
client.Resource
longhorn.RecurringJobSpec
longhorn.RecurringJobStatus
}

type Orphan struct {
@@ -834,6 +839,11 @@
labels.Type = "map[string]"
labels.Nullable = true
job.ResourceFields["labels"] = labels

parameters := job.ResourceFields["parameters"]
parameters.Type = "map[string]"
parameters.Nullable = true
job.ResourceFields["parameters"] = parameters
}

func kubernetesStatusSchema(status *client.Schema) {
@@ -1336,338 +1346,338 @@
return &client.GenericCollection{Data: data, Collection: client.Collection{ResourceType: "setting"}}
}

func toVolumeResource(v *longhorn.Volume, ves []*longhorn.Engine, vrs []*longhorn.Replica, backups []*longhorn.Backup, lhVolumeAttachment *longhorn.VolumeAttachment, apiContext *api.ApiContext) *Volume {
var ve *longhorn.Engine
controllers := []Controller{}
backupStatus := []BackupStatus{}
restoreStatus := []RestoreStatus{}
var purgeStatuses []PurgeStatus
rebuildStatuses := []RebuildStatus{}
volumeAttachment := VolumeAttachment{
Attachments: make(map[string]Attachment),
Volume: v.Name,
}
for _, e := range ves {
actualSize := int64(0)
snapshots := e.Status.Snapshots
for _, snapshot := range snapshots {
snapshotSize, err := util.ConvertSize(snapshot.Size)
if err != nil {
logrus.WithError(err).Warnf("api: failed to convert snapshot size %v for volume %v", snapshot.Size, v.Name)
continue
}
actualSize += snapshotSize
}
controllers = append(controllers, Controller{
Instance: Instance{
Name: e.Name,
Running: e.Status.CurrentState == longhorn.InstanceStateRunning,
NodeID: e.Spec.NodeID,
Address: e.Status.IP,
Image: e.Spec.Image,
CurrentImage: e.Status.CurrentImage,
InstanceManagerName: e.Status.InstanceManagerName,
},
Size: strconv.FormatInt(e.Status.CurrentSize, 10),
ActualSize: strconv.FormatInt(actualSize, 10),
Endpoint: e.Status.Endpoint,
LastRestoredBackup: e.Status.LastRestoredBackup,
RequestedBackupRestore: e.Spec.RequestedBackupRestore,
IsExpanding: e.Status.IsExpanding,
LastExpansionError: e.Status.LastExpansionError,
LastExpansionFailedAt: e.Status.LastExpansionFailedAt,
UnmapMarkSnapChainRemovedEnabled: e.Status.UnmapMarkSnapChainRemovedEnabled,
})
if e.Spec.NodeID == v.Status.CurrentNodeID {
ve = e
}
rs := e.Status.RestoreStatus
if rs != nil {
replicas := util.GetSortedKeysFromMap(rs)
for _, replica := range replicas {
restoreStatus = append(restoreStatus, RestoreStatus{
Resource: client.Resource{},
Replica: datastore.ReplicaAddressToReplicaName(replica, vrs),
IsRestoring: rs[replica].IsRestoring,
LastRestored: rs[replica].LastRestored,
Progress: rs[replica].Progress,
Error: rs[replica].Error,
Filename: rs[replica].Filename,
State: rs[replica].State,
BackupURL: rs[replica].BackupURL,
})
}
}
purgeStatus := e.Status.PurgeStatus
if purgeStatus != nil {
replicas := util.GetSortedKeysFromMap(purgeStatus)
for _, replica := range replicas {
purgeStatuses = append(purgeStatuses, PurgeStatus{
Resource: client.Resource{},
Replica: datastore.ReplicaAddressToReplicaName(replica, vrs),
Error: purgeStatus[replica].Error,
IsPurging: purgeStatus[replica].IsPurging,
Progress: purgeStatus[replica].Progress,
State: purgeStatus[replica].State,
})
}
}
rebuildStatus := e.Status.RebuildStatus
if rebuildStatus != nil {
replicas := util.GetSortedKeysFromMap(rebuildStatus)
for _, replica := range replicas {
rebuildStatuses = append(rebuildStatuses, RebuildStatus{
Resource: client.Resource{},
Replica: datastore.ReplicaAddressToReplicaName(replica, vrs),
Error: rebuildStatus[replica].Error,
IsRebuilding: rebuildStatus[replica].IsRebuilding,
Progress: rebuildStatus[replica].Progress,
State: rebuildStatus[replica].State,
FromReplica: datastore.ReplicaAddressToReplicaName(rebuildStatus[replica].FromReplicaAddress, vrs),
})
}
}
}

replicas := []Replica{}
for _, r := range vrs {
mode := ""
if ve != nil && ve.Status.ReplicaModeMap != nil {
mode = string(ve.Status.ReplicaModeMap[r.Name])
}
replicas = append(replicas, Replica{
Instance: Instance{
Name: r.Name,
Running: r.Status.CurrentState == longhorn.InstanceStateRunning,
Address: r.Status.IP,
NodeID: r.Spec.NodeID,
Image: r.Spec.Image,
CurrentImage: r.Status.CurrentImage,
InstanceManagerName: r.Status.InstanceManagerName,
},
DiskID: r.Spec.DiskID,
DiskPath: r.Spec.DiskPath,
DataPath: types.GetReplicaDataPath(r.Spec.DiskPath, r.Spec.DataDirectoryName),
Mode: mode,
FailedAt: r.Spec.FailedAt,
DataEngine: string(r.Spec.DataEngine),
})
}

for _, b := range backups {
backupStatus = append(backupStatus, BackupStatus{
Resource: client.Resource{},
Name: b.Name,
Snapshot: b.Status.SnapshotName,
Progress: b.Status.Progress,
BackupURL: b.Status.URL,
Error: b.Status.Error,
State: string(b.Status.State),
Replica: datastore.ReplicaAddressToReplicaName(b.Status.ReplicaAddress, vrs),
Size: b.Status.Size,
})
}

if lhVolumeAttachment != nil {
for k, v := range lhVolumeAttachment.Spec.AttachmentTickets {
if v != nil {
volumeAttachment.Attachments[k] = Attachment{
AttachmentID: v.ID,
AttachmentType: string(v.Type),
NodeID: v.NodeID,
Parameters: v.Parameters,
}
}
}
for k, v := range lhVolumeAttachment.Status.AttachmentTicketStatuses {
if v == nil {
continue
}
attachment, ok := volumeAttachment.Attachments[k]
if !ok {
continue
}
attachment.Satisfied = longhorn.IsAttachmentTicketSatisfied(attachment.AttachmentID, lhVolumeAttachment)
attachment.Conditions = v.Conditions
volumeAttachment.Attachments[k] = attachment
}
}

// The volume is not ready for workloads if:
// 1. It's auto attached.
// 2. It fails to schedule replicas during the volume creation,
// in which case scheduling failure will happen when the volume is detached.
// In other cases, scheduling failure only happens when the volume is attached.
// 3. It's faulted.
// 4. It's restore pending.
// 5. It's failed to clone
ready := true
scheduledCondition := types.GetCondition(v.Status.Conditions, longhorn.VolumeConditionTypeScheduled)
if (v.Spec.NodeID == "" && v.Status.State != longhorn.VolumeStateDetached) ||
(v.Status.State == longhorn.VolumeStateDetached && scheduledCondition.Status != longhorn.ConditionStatusTrue) ||
v.Status.Robustness == longhorn.VolumeRobustnessFaulted ||
v.Status.RestoreRequired ||
v.Status.CloneStatus.State == longhorn.VolumeCloneStateFailed {
ready = false
}

r := &Volume{
Resource: client.Resource{
Id: v.Name,
Type: "volume",
Actions: map[string]string{},
Links: map[string]string{},
},
Name: v.Name,
Size: strconv.FormatInt(v.Spec.Size, 10),
Frontend: v.Spec.Frontend,
DisableFrontend: v.Spec.DisableFrontend,
LastAttachedBy: v.Spec.LastAttachedBy,
FromBackup: v.Spec.FromBackup,
DataSource: v.Spec.DataSource,
NumberOfReplicas: v.Spec.NumberOfReplicas,
ReplicaAutoBalance: v.Spec.ReplicaAutoBalance,
DataLocality: v.Spec.DataLocality,
SnapshotDataIntegrity: v.Spec.SnapshotDataIntegrity,
SnapshotMaxCount: v.Spec.SnapshotMaxCount,
SnapshotMaxSize: strconv.FormatInt(v.Spec.SnapshotMaxSize, 10),
BackupCompressionMethod: v.Spec.BackupCompressionMethod,
StaleReplicaTimeout: v.Spec.StaleReplicaTimeout,
Created: v.CreationTimestamp.String(),
Image: v.Spec.Image,
BackingImage: v.Spec.BackingImage,
Standby: v.Spec.Standby,
DiskSelector: v.Spec.DiskSelector,
NodeSelector: v.Spec.NodeSelector,
RestoreVolumeRecurringJob: v.Spec.RestoreVolumeRecurringJob,
FreezeFilesystemForSnapshot: v.Spec.FreezeFilesystemForSnapshot,

State: v.Status.State,
Robustness: v.Status.Robustness,
CurrentImage: v.Status.CurrentImage,
LastBackup: v.Status.LastBackup,
LastBackupAt: v.Status.LastBackupAt,
RestoreRequired: v.Status.RestoreRequired,
RestoreInitiated: v.Status.RestoreInitiated,
RevisionCounterDisabled: v.Spec.RevisionCounterDisabled,
UnmapMarkSnapChainRemoved: v.Spec.UnmapMarkSnapChainRemoved,
ReplicaSoftAntiAffinity: v.Spec.ReplicaSoftAntiAffinity,
ReplicaZoneSoftAntiAffinity: v.Spec.ReplicaZoneSoftAntiAffinity,
ReplicaDiskSoftAntiAffinity: v.Spec.ReplicaDiskSoftAntiAffinity,
DataEngine: v.Spec.DataEngine,
OfflineReplicaRebuilding: v.Spec.OfflineReplicaRebuilding,
OfflineReplicaRebuildingRequired: v.Status.OfflineReplicaRebuildingRequired,
Ready: ready,

AccessMode: v.Spec.AccessMode,
ShareEndpoint: v.Status.ShareEndpoint,
ShareState: v.Status.ShareState,

Migratable: v.Spec.Migratable,

Encrypted: v.Spec.Encrypted,

Conditions: sliceToMap(v.Status.Conditions),
KubernetesStatus: v.Status.KubernetesStatus,
CloneStatus: v.Status.CloneStatus,

Controllers: controllers,
Replicas: replicas,
BackupStatus: backupStatus,
RestoreStatus: restoreStatus,
PurgeStatus: purgeStatuses,
RebuildStatus: rebuildStatuses,
VolumeAttachment: volumeAttachment,
}

// api attach & detach calls are always allowed
// the volume manager is responsible for handling them appropriately
actions := map[string]struct{}{
"attach": {},
"detach": {},
}

if v.Status.Robustness == longhorn.VolumeRobustnessFaulted {
actions["salvage"] = struct{}{}
} else {

actions["snapshotCRCreate"] = struct{}{}
actions["snapshotCRGet"] = struct{}{}
actions["snapshotCRList"] = struct{}{}
actions["snapshotCRDelete"] = struct{}{}
actions["snapshotBackup"] = struct{}{}

switch v.Status.State {
case longhorn.VolumeStateDetached:
actions["activate"] = struct{}{}
actions["expand"] = struct{}{}
actions["cancelExpansion"] = struct{}{}
actions["replicaRemove"] = struct{}{}
actions["engineUpgrade"] = struct{}{}
actions["pvCreate"] = struct{}{}
actions["pvcCreate"] = struct{}{}
actions["updateDataLocality"] = struct{}{}
actions["updateAccessMode"] = struct{}{}
actions["updateReplicaAutoBalance"] = struct{}{}
actions["updateUnmapMarkSnapChainRemoved"] = struct{}{}
actions["updateSnapshotDataIntegrity"] = struct{}{}
actions["updateSnapshotMaxCount"] = struct{}{}
actions["updateSnapshotMaxSize"] = struct{}{}
actions["updateOfflineReplicaRebuilding"] = struct{}{}
actions["updateBackupCompressionMethod"] = struct{}{}
actions["updateReplicaSoftAntiAffinity"] = struct{}{}
actions["updateReplicaZoneSoftAntiAffinity"] = struct{}{}
actions["updateReplicaDiskSoftAntiAffinity"] = struct{}{}
actions["updateFreezeFilesystemForSnapshot"] = struct{}{}
actions["recurringJobAdd"] = struct{}{}
actions["recurringJobDelete"] = struct{}{}
actions["recurringJobList"] = struct{}{}
case longhorn.VolumeStateAttaching:
actions["cancelExpansion"] = struct{}{}
actions["recurringJobAdd"] = struct{}{}
actions["recurringJobDelete"] = struct{}{}
actions["recurringJobList"] = struct{}{}
case longhorn.VolumeStateAttached:
actions["activate"] = struct{}{}
actions["expand"] = struct{}{}
actions["snapshotPurge"] = struct{}{}
actions["snapshotCreate"] = struct{}{}
actions["snapshotList"] = struct{}{}
actions["snapshotGet"] = struct{}{}
actions["snapshotDelete"] = struct{}{}
actions["snapshotRevert"] = struct{}{}
actions["replicaRemove"] = struct{}{}
actions["engineUpgrade"] = struct{}{}
actions["updateReplicaCount"] = struct{}{}
actions["updateDataLocality"] = struct{}{}
actions["updateReplicaAutoBalance"] = struct{}{}
actions["updateUnmapMarkSnapChainRemoved"] = struct{}{}
actions["updateSnapshotDataIntegrity"] = struct{}{}
actions["updateSnapshotMaxCount"] = struct{}{}
actions["updateSnapshotMaxSize"] = struct{}{}
actions["updateOfflineReplicaRebuilding"] = struct{}{}
actions["updateBackupCompressionMethod"] = struct{}{}
actions["updateReplicaSoftAntiAffinity"] = struct{}{}
actions["updateReplicaZoneSoftAntiAffinity"] = struct{}{}
actions["updateReplicaDiskSoftAntiAffinity"] = struct{}{}
actions["updateFreezeFilesystemForSnapshot"] = struct{}{}
actions["pvCreate"] = struct{}{}
actions["pvcCreate"] = struct{}{}
actions["cancelExpansion"] = struct{}{}
actions["trimFilesystem"] = struct{}{}
actions["recurringJobAdd"] = struct{}{}
actions["recurringJobDelete"] = struct{}{}
actions["recurringJobList"] = struct{}{}
}
}

for action := range actions {
r.Actions[action] = apiContext.UrlBuilder.ActionLink(r.Resource, action)
}

return r
}

[CodeFactor notice on api/model.go L1349-L1680: Complex Method]
func toSnapshotCRResource(s *longhorn.Snapshot) *SnapshotCR {
if s == nil {
return nil
@@ -1904,12 +1914,15 @@
Created: b.Status.BackupCreatedAt,
Size: b.Status.Size,
Labels: b.Status.Labels,
BackupMode: b.Spec.BackupMode,
Messages: b.Status.Messages,
VolumeName: b.Status.VolumeName,
VolumeSize: b.Status.VolumeSize,
VolumeCreated: b.Status.VolumeCreated,
VolumeBackingImageName: b.Status.VolumeBackingImageName,
CompressionMethod: string(b.Status.CompressionMethod),
NewlyUploadedDataSize: b.Status.NewlyUploadedDataSize,
ReUploadedDataSize: b.Status.ReUploadedDataSize,
}
// Set the volume name from backup CR's label if it's empty.
// This field is empty probably because the backup state is not Ready
@@ -2252,6 +2265,10 @@
Retain: recurringJob.Spec.Retain,
Concurrency: recurringJob.Spec.Concurrency,
Labels: recurringJob.Spec.Labels,
Parameters: recurringJob.Spec.Parameters,
},
RecurringJobStatus: longhorn.RecurringJobStatus{
ExecutionCount: recurringJob.Status.ExecutionCount,
},
}
}
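
To see what these additions look like on the wire, here is a minimal sketch, not part of the diff: backupView is a hypothetical stand-in for the slice of api.Backup touched above, the JSON tags are copied verbatim from the first hunk (including the pre-existing newlyUploadDataSize spelling), and the sample values are invented.

package main

import (
	"encoding/json"
	"fmt"
)

// backupView mirrors only the api.Backup fields touched by this PR.
type backupView struct {
	Size                  string `json:"size"`
	BackupMode            string `json:"backupMode"`
	NewlyUploadedDataSize string `json:"newlyUploadDataSize"`
	ReUploadedDataSize    string `json:"reUploadedDataSize"`
}

func main() {
	b := backupView{
		Size:                  "2147483648",
		BackupMode:            "full", // "incremental" is the default
		NewlyUploadedDataSize: "1073741824",
		ReUploadedDataSize:    "1073741824",
	}
	out, _ := json.MarshalIndent(b, "", "  ")
	fmt.Println(string(out))
}
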
2 changes: 2 additions & 0 deletions api/recurringjob.go
@@ -63,6 +63,7 @@ func (s *Server) RecurringJobCreate(rw http.ResponseWriter, req *http.Request) e
Retain: input.Retain,
Concurrency: input.Concurrency,
Labels: input.Labels,
Parameters: input.Parameters,
})
if err != nil {
return errors.Wrapf(err, "failed to create recurring job %v", input.Name)
@@ -90,6 +91,7 @@ func (s *Server) RecurringJobUpdate(rw http.ResponseWriter, req *http.Request) e
Retain: input.Retain,
Concurrency: input.Concurrency,
Labels: input.Labels,
Parameters: input.Parameters,
})
})
if err != nil {
2 changes: 1 addition & 1 deletion api/snapshot.go
@@ -195,7 +195,7 @@ func (s *Server) SnapshotBackup(w http.ResponseWriter, req *http.Request) (err e
labels[types.KubernetesStatusLabel] = string(kubeStatus)
}

- if err := s.m.BackupSnapshot(bsutil.GenerateName("backup"), volName, input.Name, labels); err != nil {
+ if err := s.m.BackupSnapshot(bsutil.GenerateName("backup"), volName, input.Name, labels, input.BackupMode); err != nil {
return err
}

108 changes: 72 additions & 36 deletions app/recurring_job.go
@@ -48,14 +48,16 @@
)

type Job struct {
- logger logrus.FieldLogger
- lhClient lhclientset.Interface
- namespace string
- volumeName string
- snapshotName string
- retain int
- task longhorn.RecurringJobType
- labels map[string]string
+ logger logrus.FieldLogger
+ lhClient lhclientset.Interface
+ namespace string
+ volumeName string
+ snapshotName string
+ retain int
+ task longhorn.RecurringJobType
+ labels map[string]string
+ parameters map[string]string
+ executionCount int

eventRecorder record.EventRecorder

@@ -79,104 +81,118 @@
}
}

func recurringJob(c *cli.Context) (err error) {
logger := logrus.StandardLogger()

var managerURL string = c.String(FlagManagerURL)
if managerURL == "" {
return fmt.Errorf("require %v", FlagManagerURL)
}

if c.NArg() != 1 {
return errors.New("job name is required")
}
jobName := c.Args()[0]

namespace := os.Getenv(types.EnvPodNamespace)
if namespace == "" {
return fmt.Errorf("failed detect pod namespace, environment variable %v is missing", types.EnvPodNamespace)
}
lhClient, err := getLonghornClientset()
if err != nil {
return errors.Wrap(err, "failed to get clientset")
}
recurringJob, err := lhClient.LonghornV1beta2().RecurringJobs(namespace).Get(context.TODO(), jobName, metav1.GetOptions{})
if err != nil {
logrus.WithError(err).Errorf("Failed to get recurring job %v.", jobName)
return nil
}

var jobGroups []string = recurringJob.Spec.Groups
var jobRetain int = recurringJob.Spec.Retain
var jobConcurrent int = recurringJob.Spec.Concurrency
jobTask := recurringJob.Spec.Task

recurringJob.Status.ExecutionCount += 1
if _, err = lhClient.LonghornV1beta2().RecurringJobs(namespace).UpdateStatus(context.TODO(), recurringJob, metav1.UpdateOptions{}); err != nil {
return errors.Wrap(err, "failed to update job execution count")
}

jobExecutionCount := recurringJob.Status.ExecutionCount

jobLabelMap := map[string]string{}
if recurringJob.Spec.Labels != nil {
jobLabelMap = recurringJob.Spec.Labels
}
jobLabelMap[types.RecurringJobLabel] = recurringJob.Name
labelJSON, err := json.Marshal(jobLabelMap)
if err != nil {
return errors.Wrap(err, "failed to get JSON encoding for labels")
}

jobParameterMap := map[string]string{}
if recurringJob.Spec.Parameters != nil {
jobParameterMap = recurringJob.Spec.Parameters
}

allowDetachedSetting := types.SettingNameAllowRecurringJobWhileVolumeDetached
allowDetached, err := getSettingAsBoolean(allowDetachedSetting, namespace, lhClient)
if err != nil {
return errors.Wrapf(err, "failed to get %v setting", allowDetachedSetting)
}
logger.Infof("Setting %v is %v", allowDetachedSetting, allowDetached)

volumes, err := getVolumesBySelector(types.LonghornLabelRecurringJob, jobName, namespace, lhClient)
if err != nil {
return err
}
filteredVolumes := []string{}
filterVolumesForJob(allowDetached, volumes, &filteredVolumes)
for _, jobGroup := range jobGroups {
volumes, err := getVolumesBySelector(types.LonghornLabelRecurringJobGroup, jobGroup, namespace, lhClient)
if err != nil {
return err
}
filterVolumesForJob(allowDetached, volumes, &filteredVolumes)
}
logger.Infof("Found %v volumes with recurring job %v", len(filteredVolumes), jobName)

concurrentLimiter := make(chan struct{}, jobConcurrent)
ewg := &errgroup.Group{}
defer func() {
if wgError := ewg.Wait(); wgError != nil {
err = wgError
}
}()
for _, volumeName := range filteredVolumes {
startJobVolumeName := volumeName
ewg.Go(func() error {
- return startVolumeJob(startJobVolumeName, logger, concurrentLimiter, managerURL, jobName, jobTask, jobRetain, jobConcurrent, jobGroups, jobLabelMap, labelJSON)
+ return startVolumeJob(startJobVolumeName, logger, concurrentLimiter, managerURL, jobName, jobTask, jobRetain, jobConcurrent, jobGroups, jobLabelMap, labelJSON, jobParameterMap, jobExecutionCount)
})
}

return err
}

[CodeFactor notice on app/recurring_job.go L84-L176: Complex Method]
func startVolumeJob(
volumeName string, logger *logrus.Logger, concurrentLimiter chan struct{}, managerURL string,
- jobName string, jobTask longhorn.RecurringJobType, jobRetain int, jobConcurrent int, jobGroups []string, jobLabelMap map[string]string, labelJSON []byte) error {
+ jobName string, jobTask longhorn.RecurringJobType, jobRetain int, jobConcurrent int, jobGroups []string, jobLabelMap map[string]string, labelJSON []byte, jobParameterMap map[string]string, jobExecutionCount int) error {

concurrentLimiter <- struct{}{}
defer func() {
<-concurrentLimiter
}()

log := logger.WithFields(logrus.Fields{
"job": jobName,
"volume": volumeName,
"task": jobTask,
"retain": jobRetain,
"concurrent": jobConcurrent,
"groups": strings.Join(jobGroups, ","),
"labels": string(labelJSON),
"job": jobName,
"volume": volumeName,
"task": jobTask,
"retain": jobRetain,
"concurrent": jobConcurrent,
"groups": strings.Join(jobGroups, ","),
"labels": string(labelJSON),
"parameters": jobParameterMap,
"executionCount": jobExecutionCount,
})
log.Info("Creating job")

@@ -187,8 +203,10 @@
volumeName,
snapshotName,
jobLabelMap,
+ jobParameterMap,
jobRetain,
- jobTask)
+ jobTask,
+ jobExecutionCount)
if err != nil {
log.WithError(err).Error("Failed to create new job for volume")
return err
@@ -213,7 +231,7 @@
return s[begin:end]
}

- func newJob(logger logrus.FieldLogger, managerURL, volumeName, snapshotName string, labels map[string]string, retain int, task longhorn.RecurringJobType) (*Job, error) {
+ func newJob(logger logrus.FieldLogger, managerURL, volumeName, snapshotName string, labels map[string]string, parameters map[string]string, retain int, task longhorn.RecurringJobType, executionCount int) (*Job, error) {
namespace := os.Getenv(types.EnvPodNamespace)
if namespace == "" {
return nil, fmt.Errorf("failed detect pod namespace, environment variable %v is missing", types.EnvPodNamespace)
@@ -238,12 +256,13 @@
}

logger = logger.WithFields(logrus.Fields{
"namespace": namespace,
"volumeName": volumeName,
"snapshotName": snapshotName,
"labels": labels,
"retain": retain,
"task": task,
"namespace": namespace,
"volumeName": volumeName,
"snapshotName": snapshotName,
"labels": labels,
"retain": retain,
"task": task,
"executionCount": executionCount,
})

scheme := runtime.NewScheme()
@@ -257,15 +276,17 @@
}

return &Job{
- logger: logger,
- lhClient: lhClient,
- namespace: namespace,
- volumeName: volumeName,
- snapshotName: snapshotName,
- labels: labels,
- retain: retain,
- task: task,
- api: apiClient,
+ logger: logger,
+ lhClient: lhClient,
+ namespace: namespace,
+ volumeName: volumeName,
+ snapshotName: snapshotName,
+ labels: labels,
+ parameters: parameters,
+ executionCount: executionCount,
+ retain: retain,
+ task: task,
+ api: apiClient,

eventRecorder: eventBroadcaster.NewRecorder(scheme, corev1.EventSource{Component: "longhorn-recurring-job"}),
}, nil
@@ -600,9 +621,24 @@
return err
}

backupMode := longhorn.BackupModeIncremental
if intervalStr, exists := job.parameters[types.RecurringJobBackupParameterFullBackupInterval]; exists {
interval, err := strconv.Atoi(intervalStr)
if err != nil {
return errors.Wrapf(err, "interval %v is not number", intervalStr)
}

if interval != 0 {
if job.executionCount%interval == 0 {
backupMode = longhorn.BackupModeFull
}
}
}

if _, err := job.api.Volume.ActionSnapshotBackup(volume, &longhornclient.SnapshotInput{
- Labels: job.labels,
- Name: job.snapshotName,
+ Labels: job.labels,
+ Name: job.snapshotName,
+ BackupMode: string(backupMode),
}); err != nil {
return err
}
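
The hunk above is the heart of the feature: executionCount is incremented before each run, and every full-backup-interval-th run is promoted from incremental to full. A runnable sketch of that decision in isolation, assuming the literal behind types.RecurringJobBackupParameterFullBackupInterval is "full-backup-interval":

package main

import (
	"fmt"
	"strconv"
)

// pickBackupMode mirrors the selection logic above: with interval 5,
// the 5th, 10th, 15th, ... executions produce full backups, and an
// interval of 0 (or an absent parameter) keeps every run incremental.
func pickBackupMode(parameters map[string]string, executionCount int) (string, error) {
	intervalStr, ok := parameters["full-backup-interval"] // key literal assumed
	if !ok {
		return "incremental", nil
	}
	interval, err := strconv.Atoi(intervalStr)
	if err != nil {
		return "", fmt.Errorf("interval %v is not a number: %w", intervalStr, err)
	}
	if interval != 0 && executionCount%interval == 0 {
		return "full", nil
	}
	return "incremental", nil
}

func main() {
	params := map[string]string{"full-backup-interval": "5"}
	for count := 1; count <= 10; count++ {
		mode, _ := pickBackupMode(params, count)
		fmt.Printf("execution %2d -> %s\n", count, mode)
	}
}
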
2 changes: 2 additions & 0 deletions client/generated_recurring_job.go
@@ -11,6 +11,8 @@ type RecurringJob struct {

Cron string `json:"cron,omitempty" yaml:"cron,omitempty"`

ExecutionCount int64 `json:"executionCount,omitempty" yaml:"execution_count,omitempty"`

Groups []string `json:"groups,omitempty" yaml:"groups,omitempty"`

Labels map[string]string `json:"labels,omitempty" yaml:"labels,omitempty"`
2 changes: 2 additions & 0 deletions client/generated_snapshot_input.go
@@ -7,6 +7,8 @@ const (
type SnapshotInput struct {
Resource `yaml:"-"`

BackupMode string `json:"backupMode,omitempty" yaml:"backupMode,omitempty"`

Labels map[string]string `json:"labels,omitempty" yaml:"labels,omitempty"`

Name string `json:"name,omitempty" yaml:"name,omitempty"`
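
With BackupMode exposed in the generated client, an on-demand full backup becomes a single action call. A sketch of the client-side usage; the RancherClient wiring and the ById lookup are assumptions based on standard generated-client usage, and only the BackupMode field is new in this PR:

package example

import (
	longhornclient "github.com/longhorn/longhorn-manager/client"
)

// requestFullBackup backs up an existing snapshot of the given volume,
// forcing a full re-upload instead of the incremental default.
func requestFullBackup(cli *longhornclient.RancherClient, volumeName, snapshotName string) error {
	vol, err := cli.Volume.ById(volumeName)
	if err != nil {
		return err
	}
	_, err = cli.Volume.ActionSnapshotBackup(vol, &longhornclient.SnapshotInput{
		Name:       snapshotName, // snapshot to back up
		BackupMode: "full",       // empty or "incremental" keeps the old behavior
	})
	return err
}
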
2 changes: 2 additions & 0 deletions controller/backup_controller.go
@@ -215,279 +215,281 @@
return true, nil
}

func (bc *BackupController) reconcile(backupName string) (err error) {
// Get Backup CR
backup, err := bc.ds.GetBackup(backupName)
if err != nil {
if !apierrors.IsNotFound(err) {
return err
}
return nil
}

// Check the responsible node
defaultEngineImage, err := bc.ds.GetSettingValueExisted(types.SettingNameDefaultEngineImage)
if err != nil {
return err
}
isResponsible, err := bc.isResponsibleFor(backup, defaultEngineImage)
if err != nil {
return nil
}
if !isResponsible {
return nil
}
if backup.Status.OwnerID != bc.controllerID {
backup.Status.OwnerID = bc.controllerID
backup, err = bc.ds.UpdateBackupStatus(backup)
if err != nil {
// we don't mind others coming first
if apierrors.IsConflict(errors.Cause(err)) {
return nil
}
return err
}
}

log := getLoggerForBackup(bc.logger, backup)

// Get default backup target
backupTarget, err := bc.ds.GetBackupTargetRO(types.DefaultBackupTargetName)
if err != nil {
if apierrors.IsNotFound(err) {
return nil
}
return errors.Wrapf(err, "failed to get the backup target %v", types.DefaultBackupTargetName)
}

// Find the backup volume name from label
backupVolumeName, err := bc.getBackupVolumeName(backup)
if err != nil {
if types.ErrorIsNotFound(err) {
return nil // Ignore error to prevent enqueue
}
return errors.Wrap(err, "failed to get backup volume name")
}

// Examine DeletionTimestamp to determine if object is under deletion
if !backup.DeletionTimestamp.IsZero() {
if err := bc.handleAttachmentTicketDeletion(backup, backupVolumeName); err != nil {
return err
}

backupVolume, err := bc.ds.GetBackupVolume(backupVolumeName)
if err != nil && !apierrors.IsNotFound(err) {
return err
}

if backupTarget.Spec.BackupTargetURL != "" &&
backupVolume != nil && backupVolume.DeletionTimestamp == nil {
backupTargetClient, err := newBackupTargetClientFromDefaultEngineImage(bc.ds, backupTarget)
if err != nil {
log.WithError(err).Warn("Failed to init backup target clients")
return nil // Ignore error to prevent enqueue
}

if unused, err := bc.isBackupNotBeingUsedForVolumeRestore(backup.Name, backupVolumeName); !unused {
log.WithError(err).Warn("Failed to delete remote backup")
return nil
}

backupURL := backupstore.EncodeBackupURL(backup.Name, backupVolumeName, backupTargetClient.URL)
if err := backupTargetClient.BackupDelete(backupURL, backupTargetClient.Credential); err != nil {
return errors.Wrap(err, "failed to delete remote backup")
}
}

// Request backup_volume_controller to reconcile BackupVolume immediately if it's the last backup
if backupVolume != nil && backupVolume.Status.LastBackupName == backup.Name {
backupVolume.Spec.SyncRequestedAt = metav1.Time{Time: time.Now().UTC()}
if _, err = bc.ds.UpdateBackupVolume(backupVolume); err != nil && !apierrors.IsConflict(errors.Cause(err)) {
log.WithError(err).Errorf("Failed to update backup volume %s spec", backupVolumeName)
// Do not return err to enqueue since backup_controller is responsible for
// reconciling the Backup CR spec; wait for the backup_volume_controller's
// next reconcile to update its BackupVolume CR status
}
}

// Disable monitor regardless of backup state
bc.disableBackupMonitor(backup.Name)

if backup.Status.State == longhorn.BackupStateError || backup.Status.State == longhorn.BackupStateUnknown {
bc.eventRecorder.Eventf(backup, corev1.EventTypeWarning, string(backup.Status.State), "Failed backup %s has been deleted: %s", backup.Name, backup.Status.Error)
}

autocleanup, err := bc.ds.GetSettingAsBool(types.SettingNameAutoCleanupSnapshotWhenDeleteBackup)
if err != nil {
logrus.WithError(err).WithFields(logrus.Fields{
"name": types.SettingNameAutoCleanupSnapshotWhenDeleteBackup,
}).Warn("Failed to get the setting")
}
if autocleanup {
// do the best effort to delete the snapshot
snapshot, err := bc.ds.GetSnapshotRO(backup.Spec.SnapshotName)
if err != nil {
if !apierrors.IsNotFound(err) {
logrus.WithError(err).WithFields(logrus.Fields{
"backup": backup.Name,
"snapshot": snapshot.Name,
}).Warn("Failed to get snapshot")
}
return nil
}
if err = bc.ds.DeleteSnapshot(snapshot.Name); err != nil {
logrus.WithError(err).WithFields(logrus.Fields{
"backup": backup.Name,
"snapshot": snapshot.Name,
}).Warn("Failed to delete snapshot")
}
}
return bc.ds.RemoveFinalizerForBackup(backup)
}

syncTime := metav1.Time{Time: time.Now().UTC()}
existingBackup := backup.DeepCopy()
existingBackupState := backup.Status.State
defer func() {
if err != nil {
return
}

if bc.backupInFinalState(backup) && (!backup.Status.LastSyncedAt.IsZero() || backup.Spec.SnapshotName == "") {
err = bc.handleAttachmentTicketDeletion(backup, backupVolumeName)
}
if reflect.DeepEqual(existingBackup.Status, backup.Status) {
return
}
if _, err := bc.ds.UpdateBackupStatus(backup); err != nil && apierrors.IsConflict(errors.Cause(err)) {
log.WithError(err).Debugf("Requeue %v due to conflict", backupName)
bc.enqueueBackup(backup)
err = nil // nolint: ineffassign
return
}
if backup.Status.State == longhorn.BackupStateCompleted && existingBackupState != backup.Status.State {
if err := bc.syncBackupVolume(backupVolumeName); err != nil {
log.Warnf("failed to sync Backup Volume: %v", backupVolumeName)
return
}
}
}()

// Perform backup snapshot to the remote backup target
// If the Backup CR is created by the user/API layer (spec.snapshotName != ""), has not been synced (status.lastSyncedAt == "")
// and is not in final state, it means creating a backup from a volume snapshot is required.
// Hence the source of truth is the engine/replica and the controller needs to sync the status with it.
// Otherwise, the Backup CR is created by the backup volume controller, which means the backup already
// exists in the remote backup target before the CR creation.
// What the controller needs to do for this case is retrieve the info from the remote backup target.
if backup.Status.LastSyncedAt.IsZero() && backup.Spec.SnapshotName != "" && !bc.backupInFinalState(backup) {
volume, err := bc.ds.GetVolume(backupVolumeName)
if err != nil {
if !apierrors.IsNotFound(err) {
return err
}
err = fmt.Errorf("Cannot find the corresponding volume: %v", err)
log.WithError(err).Error()
backup.Status.Error = err.Error()
backup.Status.State = longhorn.BackupStateError
backup.Status.LastSyncedAt = syncTime
return nil // Ignore error to prevent enqueue
}

if err := bc.handleAttachmentTicketCreation(backup, backupVolumeName); err != nil {
return err
}

if backup.Status.SnapshotCreatedAt == "" || backup.Status.VolumeSize == "" {
bc.syncBackupStatusWithSnapshotCreationTimeAndVolumeSize(volume, backup)
}

if err := bc.backupBackingImage(volume); err != nil {
return err
}

if backup.Status.Messages == nil {
backup.Status.Messages = map[string]string{}
}

monitor, err := bc.checkMonitor(backup, volume, backupTarget)
if err != nil {
if backup.Status.State == longhorn.BackupStateError {
log.WithError(err).Warnf("Failed to enable the backup monitor for backup %v", backup.Name)
return nil
}
if backup.Status.State == longhorn.BackupStatePending {
log.WithError(err).Warnf("Waiting to enable the backup monitor for backup %v", backup.Name)
return nil
}
return err
}

if err = bc.syncWithMonitor(backup, volume, monitor); err != nil {
return err
}

switch backup.Status.State {
case longhorn.BackupStateNew, longhorn.BackupStatePending, longhorn.BackupStateInProgress:
return nil
case longhorn.BackupStateCompleted:
bc.disableBackupMonitor(backup.Name)
case longhorn.BackupStateError, longhorn.BackupStateUnknown:
backup.Status.LastSyncedAt = syncTime
bc.disableBackupMonitor(backup.Name)
return nil
}
}

// The backup config has been synced
if !backup.Status.LastSyncedAt.IsZero() &&
!backup.Spec.SyncRequestedAt.After(backup.Status.LastSyncedAt.Time) {
return nil
}

// The backup creation is complete, then the source of truth becomes the remote backup target
backupTargetClient, err := newBackupTargetClientFromDefaultEngineImage(bc.ds, backupTarget)
if err != nil {
log.WithError(err).Error("Error init backup target clients")
return nil // Ignore error to prevent enqueue
}

backupURL := backupstore.EncodeBackupURL(backup.Name, backupVolumeName, backupTargetClient.URL)
backupInfo, err := backupTargetClient.BackupGet(backupURL, backupTargetClient.Credential)
if err != nil {
if !strings.Contains(err.Error(), "in progress") {
log.WithError(err).Error("Error inspecting backup config")
}
return nil // Ignore error to prevent enqueue
}
if backupInfo == nil {
log.Warn("Backup info is nil")
return nil
}

// Remove the Backup Volume recurring jobs/groups information.
// Only record the latest recurring jobs/groups information in backup volume CR and volume.cfg on remote backup target.
delete(backupInfo.Labels, types.VolumeRecurringJobInfoLabel)

// Update Backup CR status
backup.Status.State = longhorn.BackupStateCompleted
backup.Status.URL = backupInfo.URL
backup.Status.SnapshotName = backupInfo.SnapshotName
backup.Status.SnapshotCreatedAt = backupInfo.SnapshotCreated
backup.Status.BackupCreatedAt = backupInfo.Created
backup.Status.Size = backupInfo.Size
backup.Status.Labels = backupInfo.Labels
backup.Status.Messages = backupInfo.Messages
backup.Status.VolumeName = backupInfo.VolumeName
backup.Status.VolumeSize = backupInfo.VolumeSize
backup.Status.VolumeCreated = backupInfo.VolumeCreated
backup.Status.VolumeBackingImageName = backupInfo.VolumeBackingImageName
backup.Status.CompressionMethod = longhorn.BackupCompressionMethod(backupInfo.CompressionMethod)
backup.Status.LastSyncedAt = syncTime
backup.Status.NewlyUploadedDataSize = backupInfo.NewlyUploadedDataSize
backup.Status.ReUploadedDataSize = backupInfo.ReUploadedDataSize
return nil
}

// handleAttachmentTicketDeletion check and delete attachment so that the source volume is detached if needed

[CodeFactor warning on controller/backup_controller.go L218-L492: Very Complex Method]
func (bc *BackupController) handleAttachmentTicketDeletion(backup *longhorn.Backup, volumeName string) (err error) {
defer func() {
err = errors.Wrap(err, "handleAttachmentTicketDeletion: failed to clean up attachment")
@@ -682,100 +684,100 @@

// checkMonitor checks if the replica monitor existed.
// If yes, returns the replica monitor. Otherwise, create a new replica monitor.
func (bc *BackupController) checkMonitor(backup *longhorn.Backup, volume *longhorn.Volume, backupTarget *longhorn.BackupTarget) (*engineapi.BackupMonitor, error) {
if backup == nil || volume == nil || backupTarget == nil {
return nil, nil
}

// There is a monitor already
if monitor := bc.hasMonitor(backup.Name); monitor != nil {
return monitor, nil
}

// Backing image checksum validation
biChecksum, err := bc.validateBackingImageChecksum(volume.Name, volume.Spec.BackingImage)
if err != nil {
return nil, err
}

concurrentLimit, err := bc.ds.GetSettingAsInt(types.SettingNameBackupConcurrentLimit)
if err != nil {
return nil, errors.Wrapf(err, "failed to assert %v value", types.SettingNameBackupConcurrentLimit)
}
// check if my ticket is satisfied
ok, err := bc.VerifyAttachment(backup, volume.Name)
if err != nil {
return nil, err
}
if !ok {
return nil, fmt.Errorf("waiting for attachment %v to be attached before enabling backup monitor", longhorn.GetAttachmentTicketID(longhorn.AttacherTypeBackupController, backup.Name))
}

engineClientProxy, backupTargetClient, err := getBackupTarget(bc.controllerID, backupTarget, bc.ds, bc.logger, bc.proxyConnCounter)
if err != nil {
return nil, err
}

// get storage class of the pvc binding with the volume
kubernetesStatus := &volume.Status.KubernetesStatus
storageClassName := ""
if kubernetesStatus.PVCName != "" && kubernetesStatus.LastPVCRefAt == "" {
pvc, _ := bc.ds.GetPersistentVolumeClaim(kubernetesStatus.Namespace, kubernetesStatus.PVCName)
if pvc != nil {
if pvc.Spec.StorageClassName != nil {
storageClassName = *pvc.Spec.StorageClassName
}
if storageClassName == "" {
if v, exist := pvc.Annotations[corev1.BetaStorageClassAnnotation]; exist {
storageClassName = v
}
}
if storageClassName == "" {
bc.logger.Warnf("Failed to find the StorageClassName from the pvc %v", pvc.Name)
}
}
}

engine, err := bc.ds.GetVolumeCurrentEngine(volume.Name)
if err != nil {
return nil, err
}

if engine.Status.CurrentState != longhorn.InstanceStateRunning ||
engine.Spec.DesireState != longhorn.InstanceStateRunning ||
volume.Status.State != longhorn.VolumeStateAttached {
backup.Status.State = longhorn.BackupStatePending
backup.Status.Messages[MessageTypeReconcileInfo] = fmt.Sprintf(WaitForEngineMessage, engine.Name)
return nil, fmt.Errorf("waiting for the engine %v to be running before enabling backup monitor", engine.Name)
}

snapshot, err := bc.ds.GetSnapshotRO(backup.Spec.SnapshotName)
if err != nil {
backup.Status.State = longhorn.BackupStatePending
backup.Status.Messages[MessageTypeReconcileInfo] = fmt.Sprintf(FailedToGetSnapshotMessage, backup.Spec.SnapshotName)
return nil, errors.Wrapf(err, "failed to get the snapshot %v before enabling backup monitor", backup.Spec.SnapshotName)
}
if snapshot != nil {
if !snapshot.Status.ReadyToUse {
backup.Status.State = longhorn.BackupStatePending
backup.Status.Messages[MessageTypeReconcileInfo] = fmt.Sprintf(WaitForSnapshotMessage, backup.Spec.SnapshotName)
return nil, fmt.Errorf("waiting for the snapshot %v to be ready before enabling backup monitor", backup.Spec.SnapshotName)
}
}

// Enable the backup monitor
monitor, err := bc.enableBackupMonitor(backup, volume, backupTargetClient, biChecksum,
volume.Spec.BackupCompressionMethod, int(concurrentLimit), storageClassName, engineClientProxy)
if err != nil {
backup.Status.Error = err.Error()
backup.Status.State = longhorn.BackupStateError
backup.Status.LastSyncedAt = metav1.Time{Time: time.Now().UTC()}
return nil, err
}
return monitor, nil
}

// syncWithMonitor syncs the backup state/progress from the replica monitor

[CodeFactor notice on controller/backup_controller.go L687-L780: Complex Method]
func (bc *BackupController) syncWithMonitor(backup *longhorn.Backup, volume *longhorn.Volume, monitor *engineapi.BackupMonitor) error {
if backup == nil || volume == nil || monitor == nil {
return nil
35 changes: 35 additions & 0 deletions datastore/longhorn.go
@@ -272,152 +272,152 @@
}

// ValidateSetting checks the given setting value types and condition
func (s *DataStore) ValidateSetting(name, value string) (err error) {
defer func() {
err = errors.Wrapf(err, "failed to set the setting %v with invalid value %v", name, value)
}()
sName := types.SettingName(name)

if err := types.ValidateSetting(name, value); err != nil {
return err
}

switch sName {
case types.SettingNameBackupTarget:
vs, err := s.ListDRVolumesRO()
if err != nil {
return errors.Wrapf(err, "failed to list standby volume when modifying BackupTarget")
}
if len(vs) != 0 {
standbyVolumeNames := make([]string, len(vs))
for k := range vs {
standbyVolumeNames = append(standbyVolumeNames, k)
}
return fmt.Errorf("cannot modify BackupTarget since there are existing standby volumes: %v", standbyVolumeNames)
}
case types.SettingNameBackupTargetCredentialSecret:
secret, err := s.GetSecretRO(s.namespace, value)
if err != nil {
if !apierrors.IsNotFound(err) {
return errors.Wrapf(err, "failed to get the secret before modifying backup target credential secret setting")
}
return nil
}
checkKeyList := []string{
types.AWSAccessKey,
types.AWSIAMRoleAnnotation,
types.AWSIAMRoleArn,
types.AWSAccessKey,
types.AWSSecretKey,
types.AWSEndPoint,
types.AWSCert,
types.CIFSUsername,
types.CIFSPassword,
types.AZBlobAccountName,
types.AZBlobAccountKey,
types.AZBlobEndpoint,
types.AZBlobCert,
types.HTTPSProxy,
types.HTTPProxy,
types.NOProxy,
types.VirtualHostedStyle,
}
for _, checkKey := range checkKeyList {
if value, ok := secret.Data[checkKey]; ok {
if strings.TrimSpace(string(value)) != string(value) {
switch {
case strings.TrimLeft(string(value), " ") != string(value):
return fmt.Errorf("invalid leading white space in %s", checkKey)
case strings.TrimRight(string(value), " ") != string(value):
return fmt.Errorf("invalid trailing white space in %s", checkKey)
case strings.TrimLeft(string(value), "\n") != string(value):
return fmt.Errorf("invalid leading new line in %s", checkKey)
case strings.TrimRight(string(value), "\n") != string(value):
return fmt.Errorf("invalid trailing new line in %s", checkKey)
}
return fmt.Errorf("invalid white space or new line in %s", checkKey)
}
}
}
case types.SettingNamePriorityClass:
if value != "" {
if _, err := s.GetPriorityClass(value); err != nil {
return errors.Wrapf(err, "failed to get priority class %v before modifying priority class setting", value)
}
}
case types.SettingNameGuaranteedInstanceManagerCPU, types.SettingNameV2DataEngineGuaranteedInstanceManagerCPU:
guaranteedInstanceManagerCPU, err := s.GetSettingWithAutoFillingRO(sName)
if err != nil {
return err
}
guaranteedInstanceManagerCPU.Value = value
if err := types.ValidateCPUReservationValues(sName, guaranteedInstanceManagerCPU.Value); err != nil {
return err
}
case types.SettingNameV1DataEngine:
old, err := s.GetSettingWithAutoFillingRO(types.SettingNameV1DataEngine)
if err != nil {
return err
}

if old.Value != value {
dataEngineEnabled, err := strconv.ParseBool(value)
if err != nil {
return err
}

_, err = s.ValidateV1DataEngineEnabled(dataEngineEnabled)
if err != nil {
return err
}
}

case types.SettingNameV2DataEngine:
old, err := s.GetSettingWithAutoFillingRO(types.SettingNameV2DataEngine)
if err != nil {
return err
}

if old.Value != value {
dataEngineEnabled, err := strconv.ParseBool(value)
if err != nil {
return err
}

_, err = s.ValidateV2DataEngineEnabled(dataEngineEnabled)
if err != nil {
return err
}
}

case types.SettingNameAutoCleanupSystemGeneratedSnapshot:
disablePurgeValue, err := s.GetSettingAsBool(types.SettingNameDisableSnapshotPurge)
if err != nil {
return err
}
if value == "true" && disablePurgeValue {
return errors.Errorf("cannot set %v setting to true when %v setting is true", name, types.SettingNameDisableSnapshotPurge)
}
case types.SettingNameDisableSnapshotPurge:
autoCleanupValue, err := s.GetSettingAsBool(types.SettingNameAutoCleanupSystemGeneratedSnapshot)
if err != nil {
return err
}
if value == "true" && autoCleanupValue {
return errors.Errorf("cannot set %v setting to true when %v setting is true", name, types.SettingNameAutoCleanupSystemGeneratedSnapshot)
}
case types.SettingNameSnapshotMaxCount:
v, err := strconv.Atoi(value)
if err != nil {
return err
}
if v < 2 || v > 250 {
return fmt.Errorf("%s should be between 2 and 250", name)
}
}
return nil
}

[CodeFactor notice on datastore/longhorn.go L275-L420: Complex Method]
func (s *DataStore) ValidateV1DataEngineEnabled(dataEngineEnabled bool) (ims []*longhorn.InstanceManager, err error) {
if !dataEngineEnabled {
allVolumesDetached, _ims, err := s.AreAllVolumesDetached(longhorn.DataEngineTypeV1)
@@ -4193,6 +4193,41 @@
return err
}
}
if job.Parameters != nil {
if err := ValidateRecurringJobParameters(job.Task, job.Parameters); err != nil {
return err
}
}
return nil
}

func ValidateRecurringJobParameters(task longhorn.RecurringJobType, parameters map[string]string) (err error) {
switch task {
case longhorn.RecurringJobTypeBackup, longhorn.RecurringJobTypeBackupForceCreate:
for key, value := range parameters {
if err := validateRecurringJobBackupParameter(key, value); err != nil {
return errors.Wrapf(err, "failed to validate recurring job backup task parameters")
}
}
// we don't support any parameters for other tasks currently
default:
return nil
}

return nil
}

func validateRecurringJobBackupParameter(key, value string) error {
switch key {
case types.RecurringJobBackupParameterFullBackupInterval:
_, err := strconv.Atoi(value)
if err != nil {
return errors.Wrapf(err, "%v:%v is not number", key, value)
}
default:
return fmt.Errorf("%v:%v is not a valid parameter", key, value)
}

return nil
}

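
For reference, a spec that this validation accepts, as a hedged sketch: the field names come from the hunks above, while the parameter key literal behind types.RecurringJobBackupParameterFullBackupInterval is assumed to be "full-backup-interval".

package example

import (
	longhorn "github.com/longhorn/longhorn-manager/k8s/pkg/apis/longhorn/v1beta2"
)

// exampleSpec describes a daily backup job whose every 5th execution is
// a full backup. A non-numeric interval value or an unknown key would
// be rejected by ValidateRecurringJobParameters above.
func exampleSpec() longhorn.RecurringJobSpec {
	return longhorn.RecurringJobSpec{
		Name:        "daily-backup",
		Task:        longhorn.RecurringJobTypeBackup,
		Cron:        "0 0 * * *",
		Retain:      7,
		Concurrency: 2,
		Parameters: map[string]string{
			"full-backup-interval": "5",
		},
	}
}
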
13 changes: 12 additions & 1 deletion engineapi/backup_monitor.go
@@ -14,6 +14,8 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/utils/clock"

lhbackup "github.com/longhorn/go-common-libs/backup"

"github.com/longhorn/longhorn-manager/datastore"
"github.com/longhorn/longhorn-manager/types"
"github.com/longhorn/longhorn-manager/util"
@@ -73,6 +75,9 @@ func NewBackupMonitor(logger logrus.FieldLogger, ds *datastore.DataStore, backup

// Call engine API snapshot backup
if backup.Status.State == longhorn.BackupStateNew || backup.Status.State == longhorn.BackupStatePending {

backupParameters := getBackupParameters(backup)

// volumeRecurringJobInfo could be "".
volumeRecurringJobInfo, err := m.getVolumeRecurringJobInfos(ds, volume)
if err != nil {
@@ -84,7 +89,7 @@
}
_, replicaAddress, err := engineClientProxy.SnapshotBackup(engine, backup.Spec.SnapshotName, backup.Name,
backupTargetClient.URL, volume.Spec.BackingImage, biChecksum, string(compressionMethod), concurrentLimit, storageClassName,
- backup.Spec.Labels, backupTargetClient.Credential)
+ backup.Spec.Labels, backupTargetClient.Credential, backupParameters)
if err != nil {
if !strings.Contains(err.Error(), "DeadlineExceeded") {
m.logger.WithError(err).Warn("Cannot take snapshot backup")
@@ -320,3 +325,9 @@ func (m *BackupMonitor) Close() {
m.engineClientProxy.Close()
m.quit()
}

func getBackupParameters(backup *longhorn.Backup) map[string]string {
parameters := map[string]string{}
parameters[lhbackup.LonghornBackupParameterBackupMode] = string(backup.Spec.BackupMode)
return parameters
}
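
getBackupParameters is currently the only producer of backup parameters, and its map rides along the SnapshotBackup proxy call above. A small sketch of the resulting payload for an on-demand full backup; the constant's literal value lives in go-common-libs and is assumed here to be "backup-mode":

package example

import (
	"fmt"

	lhbackup "github.com/longhorn/go-common-libs/backup"

	longhorn "github.com/longhorn/longhorn-manager/k8s/pkg/apis/longhorn/v1beta2"
)

// printFullBackupParams rebuilds the map getBackupParameters would
// return for a Backup CR with spec.backupMode set to "full".
func printFullBackupParams() {
	params := map[string]string{
		lhbackup.LonghornBackupParameterBackupMode: string(longhorn.BackupModeFull),
	}
	fmt.Println(params) // expected: map[backup-mode:full]
}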