Skip to content

Commit

Permalink
feat(backup): support recurring and ondemand full backup
Browse files Browse the repository at this point in the history
ref: longhorn/longhorn 7070

Signed-off-by: Jack Lin <jack.lin@suse.com>
  • Loading branch information
ChanYiLin committed Apr 2, 2024
1 parent ffe359d commit a480c0d
Show file tree
Hide file tree
Showing 97 changed files with 2,462 additions and 954 deletions.
19 changes: 17 additions & 2 deletions api/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ type BackupVolume struct {
BackingImageName string `json:"backingImageName"`
BackingImageChecksum string `json:"backingImageChecksum"`
StorageClassName string `json:"storageClassName"`
BackupCount string `json:"backupCount"`
}

type Backup struct {
Expand All @@ -155,12 +156,15 @@ type Backup struct {
Created string `json:"created"`
Size string `json:"size"`
Labels map[string]string `json:"labels"`
Parameters map[string]string `json:"parameters"`
Messages map[string]string `json:"messages"`
VolumeName string `json:"volumeName"`
VolumeSize string `json:"volumeSize"`
VolumeCreated string `json:"volumeCreated"`
VolumeBackingImageName string `json:"volumeBackingImageName"`
CompressionMethod string `json:"compressionMethod"`
NewlyUploadedDataSize string `json:"newlyUploadDataSize"`
ReUploadedDataSize string `json:"reUploadedDataSize"`
}

type BackupBackingImage struct {
Expand Down Expand Up @@ -277,8 +281,9 @@ type DetachInput struct {
}

type SnapshotInput struct {
Name string `json:"name"`
Labels map[string]string `json:"labels"`
Name string `json:"name"`
Labels map[string]string `json:"labels"`
Parameters map[string]string `json:"parameters"`
}

type SnapshotCRInput struct {
Expand Down Expand Up @@ -806,6 +811,11 @@ func recurringJobSchema(job *client.Schema) {
labels.Type = "map[string]"
labels.Nullable = true
job.ResourceFields["labels"] = labels

parameters := job.ResourceFields["parameters"]
parameters.Type = "map[string]"
parameters.Nullable = true
job.ResourceFields["parameters"] = parameters
}

func kubernetesStatusSchema(status *client.Schema) {
Expand Down Expand Up @@ -1742,6 +1752,7 @@ func toBackupVolumeResource(bv *longhorn.BackupVolume, apiContext *api.ApiContex
BackingImageName: bv.Status.BackingImageName,
BackingImageChecksum: bv.Status.BackingImageChecksum,
StorageClassName: bv.Status.StorageClassName,
BackupCount: bv.Status.BackupCount,
}
b.Actions = map[string]string{
"backupList": apiContext.UrlBuilder.ActionLink(b.Resource, "backupList"),
Expand Down Expand Up @@ -1834,12 +1845,15 @@ func toBackupResource(b *longhorn.Backup) *Backup {
Created: b.Status.BackupCreatedAt,
Size: b.Status.Size,
Labels: b.Status.Labels,
Parameters: b.Spec.Parameters,
Messages: b.Status.Messages,
VolumeName: b.Status.VolumeName,
VolumeSize: b.Status.VolumeSize,
VolumeCreated: b.Status.VolumeCreated,
VolumeBackingImageName: b.Status.VolumeBackingImageName,
CompressionMethod: string(b.Status.CompressionMethod),
NewlyUploadedDataSize: b.Status.NewlyUploadedDataSize,
ReUploadedDataSize: b.Status.ReUploadedDataSize,
}
// Set the volume name from backup CR's label if it's empty.
// This field is empty probably because the backup state is not Ready
Expand Down Expand Up @@ -2182,6 +2196,7 @@ func toRecurringJobResource(recurringJob *longhorn.RecurringJob, apiContext *api
Retain: recurringJob.Spec.Retain,
Concurrency: recurringJob.Spec.Concurrency,
Labels: recurringJob.Spec.Labels,
Parameters: recurringJob.Spec.Parameters,
},
}
}
Expand Down
2 changes: 2 additions & 0 deletions api/recurringjob.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ func (s *Server) RecurringJobCreate(rw http.ResponseWriter, req *http.Request) e
Retain: input.Retain,
Concurrency: input.Concurrency,
Labels: input.Labels,
Parameters: input.Parameters,
})
if err != nil {
return errors.Wrapf(err, "failed to create recurring job %v", input.Name)
Expand Down Expand Up @@ -90,6 +91,7 @@ func (s *Server) RecurringJobUpdate(rw http.ResponseWriter, req *http.Request) e
Retain: input.Retain,
Concurrency: input.Concurrency,
Labels: input.Labels,
Parameters: input.Parameters,
})
})
if err != nil {
Expand Down
9 changes: 8 additions & 1 deletion api/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,13 @@ func (s *Server) SnapshotBackup(w http.ResponseWriter, req *http.Request) (err e
if err != nil {
return err
}
backupParameters := map[string]string{}
if input.Parameters != nil {
backupParameters = input.Parameters
}
if err := util.ValidateBackupParameters(backupParameters); err != nil {
return err
}

// Cannot directly compare the structs since KubernetesStatus contains a slice which cannot be compared.
if !reflect.DeepEqual(vol.Status.KubernetesStatus, longhorn.KubernetesStatus{}) {
Expand All @@ -195,7 +202,7 @@ func (s *Server) SnapshotBackup(w http.ResponseWriter, req *http.Request) (err e
labels[types.KubernetesStatusLabel] = string(kubeStatus)
}

if err := s.m.BackupSnapshot(bsutil.GenerateName("backup"), volName, input.Name, labels); err != nil {
if err := s.m.BackupSnapshot(bsutil.GenerateName("backup"), volName, input.Name, labels, backupParameters); err != nil {
return err
}

Expand Down
56 changes: 51 additions & 5 deletions app/recurring_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/urfave/cli"
"golang.org/x/sync/errgroup"

apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/record"
Expand All @@ -31,6 +32,8 @@ import (
longhornclient "github.com/longhorn/longhorn-manager/client"
longhorn "github.com/longhorn/longhorn-manager/k8s/pkg/apis/longhorn/v1beta2"
lhclientset "github.com/longhorn/longhorn-manager/k8s/pkg/client/clientset/versioned"

lhbackup "github.com/longhorn/go-common-libs/backup"
)

const (
Expand All @@ -56,6 +59,7 @@ type Job struct {
retain int
task longhorn.RecurringJobType
labels map[string]string
parameters map[string]string

eventRecorder record.EventRecorder

Expand Down Expand Up @@ -121,6 +125,15 @@ func recurringJob(c *cli.Context) (err error) {
return errors.Wrap(err, "failed to get JSON encoding for labels")
}

jobParameterMap := map[string]string{}
if recurringJob.Spec.Parameters != nil {
jobParameterMap = recurringJob.Spec.Parameters
}
parameterJSON, err := json.Marshal(jobParameterMap)
if err != nil {
return errors.Wrap(err, "failed to get JSON encoding for parameters")
}

allowDetachedSetting := types.SettingNameAllowRecurringJobWhileVolumeDetached
allowDetached, err := getSettingAsBoolean(allowDetachedSetting, namespace, lhClient)
if err != nil {
Expand Down Expand Up @@ -153,7 +166,7 @@ func recurringJob(c *cli.Context) (err error) {
for _, volumeName := range filteredVolumes {
startJobVolumeName := volumeName
ewg.Go(func() error {
return startVolumeJob(startJobVolumeName, logger, concurrentLimiter, managerURL, jobName, jobTask, jobRetain, jobConcurrent, jobGroups, jobLabelMap, labelJSON)
return startVolumeJob(startJobVolumeName, logger, concurrentLimiter, managerURL, jobName, jobTask, jobRetain, jobConcurrent, jobGroups, jobLabelMap, labelJSON, jobParameterMap, parameterJSON)
})
}

Expand All @@ -162,7 +175,7 @@ func recurringJob(c *cli.Context) (err error) {

Check notice on line 175 in app/recurring_job.go

View check run for this annotation

codefactor.io / CodeFactor

app/recurring_job.go#L86-L175

Complex Method
func startVolumeJob(
volumeName string, logger *logrus.Logger, concurrentLimiter chan struct{}, managerURL string,
jobName string, jobTask longhorn.RecurringJobType, jobRetain int, jobConcurrent int, jobGroups []string, jobLabelMap map[string]string, labelJSON []byte) error {
jobName string, jobTask longhorn.RecurringJobType, jobRetain int, jobConcurrent int, jobGroups []string, jobLabelMap map[string]string, labelJSON []byte, jobParameterMap map[string]string, parameterJSON []byte) error {

concurrentLimiter <- struct{}{}
defer func() {
Expand All @@ -177,6 +190,7 @@ func startVolumeJob(
"concurrent": jobConcurrent,
"groups": strings.Join(jobGroups, ","),
"labels": string(labelJSON),
"parameters": string(parameterJSON),
})
log.Info("Creating job")

Expand All @@ -187,6 +201,7 @@ func startVolumeJob(
volumeName,
snapshotName,
jobLabelMap,
jobParameterMap,
jobRetain,
jobTask)
if err != nil {
Expand All @@ -213,7 +228,7 @@ func sliceStringSafely(s string, begin, end int) string {
return s[begin:end]
}

func newJob(logger logrus.FieldLogger, managerURL, volumeName, snapshotName string, labels map[string]string, retain int, task longhorn.RecurringJobType) (*Job, error) {
func newJob(logger logrus.FieldLogger, managerURL, volumeName, snapshotName string, labels map[string]string, parameters map[string]string, retain int, task longhorn.RecurringJobType) (*Job, error) {
namespace := os.Getenv(types.EnvPodNamespace)
if namespace == "" {
return nil, fmt.Errorf("failed detect pod namespace, environment variable %v is missing", types.EnvPodNamespace)
Expand Down Expand Up @@ -263,6 +278,7 @@ func newJob(logger logrus.FieldLogger, managerURL, volumeName, snapshotName stri
volumeName: volumeName,
snapshotName: snapshotName,
labels: labels,
parameters: parameters,
retain: retain,
task: task,
api: apiClient,
Expand Down Expand Up @@ -611,9 +627,39 @@ func (job *Job) doRecurringBackup() (err error) {
return err
}

backupParameters := map[string]string{}
if intervalStr, exists := job.parameters[types.RecurringJobBackupParameterFullBackupInterval]; exists {
interval, err := strconv.Atoi(intervalStr)
if err != nil {
return errors.Wrapf(err, "interval %v is not number", intervalStr)
}

if interval != 0 {
backupVolume, err := job.api.BackupVolume.ById(job.volumeName)
if err != nil {
if !apierrors.IsNotFound(err) {
return errors.Wrapf(err, "failed to get backup volume %v", job.volumeName)
}
}

backupCount := 0
if backupVolume != nil && backupVolume.BackupCount != "" {
backupCount, err = strconv.Atoi(backupVolume.BackupCount)
if err != nil {
return errors.Wrapf(err, "backup count %v is not number", backupVolume.BackupCount)
}
}

if backupCount%interval == 0 {
backupParameters[lhbackup.LonghornBackupParameterBackupMode] = lhbackup.LonghornBackupModeFull
}
}
}

if _, err := job.api.Volume.ActionSnapshotBackup(volume, &longhornclient.SnapshotInput{
Labels: job.labels,
Name: job.snapshotName,
Labels: job.labels,
Name: job.snapshotName,
Parameters: backupParameters,
}); err != nil {
return err
}
Expand Down
2 changes: 2 additions & 0 deletions client/generated_backup_volume.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ type BackupVolume struct {

BackingImageName string `json:"backingImageName,omitempty" yaml:"backing_image_name,omitempty"`

BackupCount string `json:"backupCount,omitempty" yaml:"backup_count,omitempty"`

Created string `json:"created,omitempty" yaml:"created,omitempty"`

DataStored string `json:"dataStored,omitempty" yaml:"data_stored,omitempty"`
Expand Down
2 changes: 2 additions & 0 deletions client/generated_snapshot_input.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ type SnapshotInput struct {

Labels map[string]string `json:"labels,omitempty" yaml:"labels,omitempty"`

Parameters map[string]string `json:"parameters,omitempty" yaml:"parameters,omitempty"`

Name string `json:"name,omitempty" yaml:"name,omitempty"`
}

Expand Down
2 changes: 2 additions & 0 deletions controller/backup_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,8 @@ func (bc *BackupController) reconcile(backupName string) (err error) {
backup.Status.VolumeBackingImageName = backupInfo.VolumeBackingImageName
backup.Status.CompressionMethod = longhorn.BackupCompressionMethod(backupInfo.CompressionMethod)
backup.Status.LastSyncedAt = syncTime
backup.Status.NewlyUploadedDataSize = backupInfo.NewlyUploadedDataSize
backup.Status.ReUploadedDataSize = backupInfo.ReUploadedDataSize
return nil
}

Expand Down
1 change: 1 addition & 0 deletions controller/backup_volume_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,7 @@ func (bvc *BackupVolumeController) reconcile(backupVolumeName string) (err error
backupVolume.Status.BackingImageChecksum = backupVolumeInfo.BackingImageChecksum
backupVolume.Status.StorageClassName = backupVolumeInfo.StorageClassName
backupVolume.Status.LastSyncedAt = syncTime
backupVolume.Status.BackupCount = backupVolumeInfo.BackupCount
return nil
}

Expand Down
35 changes: 35 additions & 0 deletions datastore/longhorn.go
Original file line number Diff line number Diff line change
Expand Up @@ -4139,6 +4139,41 @@ func ValidateRecurringJob(job longhorn.RecurringJobSpec) error {
return err
}
}
if job.Parameters != nil {
if err := ValidateRecurringJobParameters(job.Task, job.Labels); err != nil {
return err
}
}
return nil
}

func ValidateRecurringJobParameters(task longhorn.RecurringJobType, parameters map[string]string) (err error) {
switch task {
case longhorn.RecurringJobTypeBackup, longhorn.RecurringJobTypeBackupForceCreate:
for key, value := range parameters {
if err := validateRecurringJobBackupParameter(key, value); err != nil {
return errors.Wrapf(err, "failed to validate recurring job backup task parameters")
}
}
// we don't support any parameters for other tasks currently
default:
return nil
}

return nil
}

func validateRecurringJobBackupParameter(key, value string) error {
switch key {
case types.RecurringJobBackupParameterFullBackupInterval:
_, err := strconv.Atoi(value)
if err != nil {
return errors.Wrapf(err, "%v:%v is not number", key, value)
}
default:
return fmt.Errorf("%v:%v is not a valid parameter", key, value)
}

return nil
}

Expand Down
2 changes: 1 addition & 1 deletion engineapi/backup_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func NewBackupMonitor(logger logrus.FieldLogger, ds *datastore.DataStore, backup
}
_, replicaAddress, err := engineClientProxy.SnapshotBackup(engine, backup.Spec.SnapshotName, backup.Name,
backupTargetClient.URL, volume.Spec.BackingImage, biChecksum, string(compressionMethod), concurrentLimit, storageClassName,
backup.Spec.Labels, backupTargetClient.Credential)
backup.Spec.Labels, backupTargetClient.Credential, backup.Spec.Parameters)
if err != nil {
if !strings.Contains(err.Error(), "DeadlineExceeded") {
m.logger.WithError(err).Warn("Cannot take snapshot backup")
Expand Down
2 changes: 1 addition & 1 deletion engineapi/backups.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ func (btc *BackupTargetClient) BackupCleanUpAllMounts() (err error) {
// TODO: Deprecated, replaced by gRPC proxy
func (e *EngineBinary) SnapshotBackup(engine *longhorn.Engine, snapName, backupName, backupTarget,
backingImageName, backingImageChecksum, compressionMethod string, concurrentLimit int, storageClassName string,
labels, credential map[string]string) (string, string, error) {
labels, credential, parameters map[string]string) (string, string, error) {
if snapName == etypes.VolumeHeadName {
return "", "", fmt.Errorf("invalid operation: cannot backup %v", etypes.VolumeHeadName)
}
Expand Down
2 changes: 1 addition & 1 deletion engineapi/enginesim.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ func (e *EngineSimulator) SnapshotPurgeStatus(*longhorn.Engine) (map[string]*lon

func (e *EngineSimulator) SnapshotBackup(engine *longhorn.Engine, snapshotName, backupName, backupTarget,
backingImageName, backingImageChecksum, compressionMethod string, concurrentLimit int, storageClassName string,
labels, credential map[string]string) (string, string, error) {
labels, credential, parameters map[string]string) (string, string, error) {
return "", "", fmt.Errorf(ErrNotImplement)
}

Expand Down
4 changes: 2 additions & 2 deletions engineapi/proxy_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (

func (p *Proxy) SnapshotBackup(e *longhorn.Engine, snapshotName, backupName, backupTarget,
backingImageName, backingImageChecksum, compressionMethod string, concurrentLimit int, storageClassName string,
labels, credential map[string]string) (string, string, error) {
labels, credential, parameters map[string]string) (string, string, error) {
if snapshotName == etypes.VolumeHeadName {
return "", "", fmt.Errorf("invalid operation: cannot backup %v", etypes.VolumeHeadName)
}
Expand All @@ -40,7 +40,7 @@ func (p *Proxy) SnapshotBackup(e *longhorn.Engine, snapshotName, backupName, bac

backupID, replicaAddress, err := p.grpcClient.SnapshotBackup(string(e.Spec.DataEngine), e.Name,
e.Spec.VolumeName, p.DirectToURL(e), backupName, snapshotName, backupTarget, backingImageName,
backingImageChecksum, compressionMethod, concurrentLimit, storageClassName, labels, credentialEnv,
backingImageChecksum, compressionMethod, concurrentLimit, storageClassName, labels, credentialEnv, parameters,
)
if err != nil {
return "", "", err
Expand Down
Loading

0 comments on commit a480c0d

Please sign in to comment.