Skip to content

Commit

Permalink
feat(backup): support recurring and ondemand full backup
Browse files Browse the repository at this point in the history
ref: longhorn/longhorn 7070

Signed-off-by: Jack Lin <jack.lin@suse.com>
  • Loading branch information
ChanYiLin committed Mar 28, 2024
1 parent 340c10a commit 4d99c6c
Show file tree
Hide file tree
Showing 32 changed files with 282 additions and 43 deletions.
19 changes: 17 additions & 2 deletions api/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ type BackupVolume struct {
BackingImageName string `json:"backingImageName"`
BackingImageChecksum string `json:"backingImageChecksum"`
StorageClassName string `json:"storageClassName"`
BackupCount string `json:"backupCount"`
}

type Backup struct {
Expand All @@ -155,12 +156,15 @@ type Backup struct {
Created string `json:"created"`
Size string `json:"size"`
Labels map[string]string `json:"labels"`
Parameters map[string]string `json:"parameters"`
Messages map[string]string `json:"messages"`
VolumeName string `json:"volumeName"`
VolumeSize string `json:"volumeSize"`
VolumeCreated string `json:"volumeCreated"`
VolumeBackingImageName string `json:"volumeBackingImageName"`
CompressionMethod string `json:"compressionMethod"`
NewlyUploadedDataSize string `json:"newlyUploadDataSize"`
ReUploadedDataSize string `json:"reUploadedDataSize"`
}

type BackupBackingImage struct {
Expand Down Expand Up @@ -277,8 +281,9 @@ type DetachInput struct {
}

type SnapshotInput struct {
Name string `json:"name"`
Labels map[string]string `json:"labels"`
Name string `json:"name"`
Labels map[string]string `json:"labels"`
Parameters map[string]string `json:"parameters"`
}

type SnapshotCRInput struct {
Expand Down Expand Up @@ -806,6 +811,11 @@ func recurringJobSchema(job *client.Schema) {
labels.Type = "map[string]"
labels.Nullable = true
job.ResourceFields["labels"] = labels

parameters := job.ResourceFields["parameters"]
parameters.Type = "map[string]"
parameters.Nullable = true
job.ResourceFields["parameters"] = parameters
}

func kubernetesStatusSchema(status *client.Schema) {
Expand Down Expand Up @@ -1742,6 +1752,7 @@ func toBackupVolumeResource(bv *longhorn.BackupVolume, apiContext *api.ApiContex
BackingImageName: bv.Status.BackingImageName,
BackingImageChecksum: bv.Status.BackingImageChecksum,
StorageClassName: bv.Status.StorageClassName,
BackupCount: bv.Status.BackupCount,
}
b.Actions = map[string]string{
"backupList": apiContext.UrlBuilder.ActionLink(b.Resource, "backupList"),
Expand Down Expand Up @@ -1834,12 +1845,15 @@ func toBackupResource(b *longhorn.Backup) *Backup {
Created: b.Status.BackupCreatedAt,
Size: b.Status.Size,
Labels: b.Status.Labels,
Parameters: b.Spec.Parameters,
Messages: b.Status.Messages,
VolumeName: b.Status.VolumeName,
VolumeSize: b.Status.VolumeSize,
VolumeCreated: b.Status.VolumeCreated,
VolumeBackingImageName: b.Status.VolumeBackingImageName,
CompressionMethod: string(b.Status.CompressionMethod),
NewlyUploadedDataSize: b.Status.NewlyUploadedDataSize,
ReUploadedDataSize: b.Status.ReUploadedDataSize,
}
// Set the volume name from backup CR's label if it's empty.
// This field is empty probably because the backup state is not Ready
Expand Down Expand Up @@ -2182,6 +2196,7 @@ func toRecurringJobResource(recurringJob *longhorn.RecurringJob, apiContext *api
Retain: recurringJob.Spec.Retain,
Concurrency: recurringJob.Spec.Concurrency,
Labels: recurringJob.Spec.Labels,
Parameters: recurringJob.Spec.Parameters,
},
}
}
Expand Down
2 changes: 2 additions & 0 deletions api/recurringjob.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ func (s *Server) RecurringJobCreate(rw http.ResponseWriter, req *http.Request) e
Retain: input.Retain,
Concurrency: input.Concurrency,
Labels: input.Labels,
Parameters: input.Parameters,
})
if err != nil {
return errors.Wrapf(err, "failed to create recurring job %v", input.Name)
Expand Down Expand Up @@ -90,6 +91,7 @@ func (s *Server) RecurringJobUpdate(rw http.ResponseWriter, req *http.Request) e
Retain: input.Retain,
Concurrency: input.Concurrency,
Labels: input.Labels,
Parameters: input.Parameters,
})
})
if err != nil {
Expand Down
10 changes: 9 additions & 1 deletion api/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,14 @@ func (s *Server) SnapshotBackup(w http.ResponseWriter, req *http.Request) (err e
return err
}

// Default to an empty map and only adopt the caller's parameters when the
// request actually carries some, so validation never receives a nil map and
// user-supplied parameters are not silently dropped.
backupParameters := map[string]string{}
if input.Parameters != nil {
	backupParameters = input.Parameters
}
if err := util.ValidateBackupParameters(backupParameters); err != nil {
	return err
}

// Cannot directly compare the structs since KubernetesStatus contains a slice which cannot be compared.
if !reflect.DeepEqual(vol.Status.KubernetesStatus, longhorn.KubernetesStatus{}) {
kubeStatus, err := json.Marshal(vol.Status.KubernetesStatus)
Expand All @@ -195,7 +203,7 @@ func (s *Server) SnapshotBackup(w http.ResponseWriter, req *http.Request) (err e
labels[types.KubernetesStatusLabel] = string(kubeStatus)
}

if err := s.m.BackupSnapshot(bsutil.GenerateName("backup"), volName, input.Name, labels); err != nil {
if err := s.m.BackupSnapshot(bsutil.GenerateName("backup"), volName, input.Name, labels, backupParameters); err != nil {
return err
}

Expand Down
53 changes: 48 additions & 5 deletions app/recurring_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/urfave/cli"
"golang.org/x/sync/errgroup"

apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/record"
Expand All @@ -28,6 +29,7 @@ import (
"github.com/longhorn/longhorn-manager/types"
"github.com/longhorn/longhorn-manager/util"

btypes "github.com/longhorn/backupstore/types"
longhornclient "github.com/longhorn/longhorn-manager/client"
longhorn "github.com/longhorn/longhorn-manager/k8s/pkg/apis/longhorn/v1beta2"
lhclientset "github.com/longhorn/longhorn-manager/k8s/pkg/client/clientset/versioned"
Expand Down Expand Up @@ -56,6 +58,7 @@ type Job struct {
retain int
task longhorn.RecurringJobType
labels map[string]string
parameters map[string]string

eventRecorder record.EventRecorder

Expand Down Expand Up @@ -121,6 +124,15 @@ func recurringJob(c *cli.Context) (err error) {
return errors.Wrap(err, "failed to get JSON encoding for labels")
}

jobParameterMap := map[string]string{}
if recurringJob.Spec.Parameters != nil {
jobParameterMap = recurringJob.Spec.Parameters
}
parameterJSON, err := json.Marshal(jobParameterMap)
if err != nil {
return errors.Wrap(err, "failed to get JSON encoding for parameters")
}

allowDetachedSetting := types.SettingNameAllowRecurringJobWhileVolumeDetached
allowDetached, err := getSettingAsBoolean(allowDetachedSetting, namespace, lhClient)
if err != nil {
Expand Down Expand Up @@ -153,7 +165,7 @@ func recurringJob(c *cli.Context) (err error) {
for _, volumeName := range filteredVolumes {
startJobVolumeName := volumeName
ewg.Go(func() error {
return startVolumeJob(startJobVolumeName, logger, concurrentLimiter, managerURL, jobName, jobTask, jobRetain, jobConcurrent, jobGroups, jobLabelMap, labelJSON)
return startVolumeJob(startJobVolumeName, logger, concurrentLimiter, managerURL, jobName, jobTask, jobRetain, jobConcurrent, jobGroups, jobLabelMap, labelJSON, jobParameterMap, parameterJSON)
})
}

Expand All @@ -162,7 +174,7 @@ func recurringJob(c *cli.Context) (err error) {

func startVolumeJob(
volumeName string, logger *logrus.Logger, concurrentLimiter chan struct{}, managerURL string,
jobName string, jobTask longhorn.RecurringJobType, jobRetain int, jobConcurrent int, jobGroups []string, jobLabelMap map[string]string, labelJSON []byte) error {
jobName string, jobTask longhorn.RecurringJobType, jobRetain int, jobConcurrent int, jobGroups []string, jobLabelMap map[string]string, labelJSON []byte, jobParameterMap map[string]string, parameterJSON []byte) error {

concurrentLimiter <- struct{}{}
defer func() {
Expand All @@ -177,6 +189,7 @@ func startVolumeJob(
"concurrent": jobConcurrent,
"groups": strings.Join(jobGroups, ","),
"labels": string(labelJSON),
"parameters": string(parameterJSON),
})
log.Info("Creating job")

Expand All @@ -187,6 +200,7 @@ func startVolumeJob(
volumeName,
snapshotName,
jobLabelMap,
jobParameterMap,
jobRetain,
jobTask)
if err != nil {
Expand All @@ -213,7 +227,7 @@ func sliceStringSafely(s string, begin, end int) string {
return s[begin:end]
}

func newJob(logger logrus.FieldLogger, managerURL, volumeName, snapshotName string, labels map[string]string, retain int, task longhorn.RecurringJobType) (*Job, error) {
func newJob(logger logrus.FieldLogger, managerURL, volumeName, snapshotName string, labels map[string]string, parameters map[string]string, retain int, task longhorn.RecurringJobType) (*Job, error) {
namespace := os.Getenv(types.EnvPodNamespace)
if namespace == "" {
return nil, fmt.Errorf("failed detect pod namespace, environment variable %v is missing", types.EnvPodNamespace)
Expand Down Expand Up @@ -263,6 +277,7 @@ func newJob(logger logrus.FieldLogger, managerURL, volumeName, snapshotName stri
volumeName: volumeName,
snapshotName: snapshotName,
labels: labels,
parameters: parameters,
retain: retain,
task: task,
api: apiClient,
Expand Down Expand Up @@ -611,9 +626,37 @@ func (job *Job) doRecurringBackup() (err error) {
return err
}

// Translate the recurring job's full-backup interval into per-backup options:
// when the volume's current backup count is a multiple of the interval, the
// backup is requested in full mode.
backupParameters := map[string]string{}
if intervalStr, exists := job.parameters[types.RecurringJobBackupParameterFullBackupInterval]; exists {
	interval, err := strconv.Atoi(intervalStr)
	if err != nil {
		return errors.Wrapf(err, "interval %v is not number", intervalStr)
	}

	backupVolume, err := job.api.BackupVolume.ById(job.volumeName)
	if err != nil {
		// A not-found error is tolerated and falls through to the nil check below.
		// NOTE(review): confirm that errors returned by job.api satisfy
		// apierrors.IsNotFound; otherwise a missing backup volume aborts the job here.
		if !apierrors.IsNotFound(err) {
			return errors.Wrapf(err, "failed to get backup volume %v", job.volumeName)
		}
	}

	backupCount := 0
	if backupVolume != nil && backupVolume.BackupCount != "" {
		backupCount, err = strconv.Atoi(backupVolume.BackupCount)
		if err != nil {
			return errors.Wrapf(err, "backup count %v is not number", backupVolume.BackupCount)
		}
	}

	// Parameter validation only requires the interval to parse as an integer,
	// so 0 can reach this point. backupCount%0 would panic with an integer
	// divide by zero; treat an interval of 0 as "never force a full backup".
	if interval != 0 && backupCount%interval == 0 {
		backupParameters[btypes.LonghornBackupOptionBackupMode] = btypes.LonghornBackupModeFull
	}
}

if _, err := job.api.Volume.ActionSnapshotBackup(volume, &longhornclient.SnapshotInput{
Labels: job.labels,
Name: job.snapshotName,
Labels: job.labels,
Name: job.snapshotName,
// Send the backup options computed above (e.g. the full-backup mode), not the
// raw recurring job parameters.
Parameters: backupParameters,
}); err != nil {
return err
}
Expand Down
2 changes: 2 additions & 0 deletions client/generated_backup_volume.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ type BackupVolume struct {
Size string `json:"size,omitempty" yaml:"size,omitempty"`

StorageClassName string `json:"storageClassName,omitempty" yaml:"storage_class_name,omitempty"`

BackupCount string `json:"backupCount,omitempty" yaml:"backup_count,omitempty"`
}

type BackupVolumeCollection struct {
Expand Down
3 changes: 3 additions & 0 deletions controller/backup_volume_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,8 @@ func (bvc *BackupVolumeController) reconcile(backupVolumeName string) (err error
return nil
}

// Diagnostic trace only: keep it at debug level so every reconcile does not
// write the whole backup volume info to the info log.
logrus.Debugf("backupVolumeInfo: %v", backupVolumeInfo)

// Update the Backup CR spec.syncRequestAt to request the
// backup_controller to reconcile the Backup CR if the last backup changed
if backupVolume.Status.LastBackupName != backupVolumeInfo.LastBackupName {
Expand All @@ -408,6 +410,7 @@ func (bvc *BackupVolumeController) reconcile(backupVolumeName string) (err error
backupVolume.Status.BackingImageChecksum = backupVolumeInfo.BackingImageChecksum
backupVolume.Status.StorageClassName = backupVolumeInfo.StorageClassName
backupVolume.Status.LastSyncedAt = syncTime
backupVolume.Status.BackupCount = backupVolumeInfo.BackupCount
return nil
}

Expand Down
35 changes: 35 additions & 0 deletions datastore/longhorn.go
Original file line number Diff line number Diff line change
Expand Up @@ -4139,6 +4139,41 @@ func ValidateRecurringJob(job longhorn.RecurringJobSpec) error {
return err
}
}
if job.Parameters != nil {
	// Validate the job's parameters (not its labels) against the keys
	// supported by the job's task type.
	if err := ValidateRecurringJobParameters(job.Task, job.Parameters); err != nil {
		return err
	}
}
return nil
}

// ValidateRecurringJobParameters checks the parameters of a recurring job
// against the given task type. For backup and backup-force-create tasks every
// key/value pair is checked with validateRecurringJobBackupParameter and the
// first failure is returned wrapped. Parameters for any other task type are
// accepted without inspection because no parameters are supported for them yet.
func ValidateRecurringJobParameters(task longhorn.RecurringJobType, parameters map[string]string) (err error) {
	isBackupTask := task == longhorn.RecurringJobTypeBackup ||
		task == longhorn.RecurringJobTypeBackupForceCreate
	if !isBackupTask {
		// we don't support any parameters for other tasks currently
		return nil
	}

	for name, val := range parameters {
		if checkErr := validateRecurringJobBackupParameter(name, val); checkErr != nil {
			return errors.Wrapf(checkErr, "failed to validate recurring job backup task parameters")
		}
	}

	return nil
}

// validateRecurringJobBackupParameter validates a single backup-task parameter.
// The only recognised key is the full-backup interval, whose value must parse
// as an integer; any other key is rejected as invalid.
func validateRecurringJobBackupParameter(key, value string) error {
	if key != types.RecurringJobBackupParameterFullBackupInterval {
		return fmt.Errorf("%v:%v is not a valid parameter", key, value)
	}

	if _, convErr := strconv.Atoi(value); convErr != nil {
		return errors.Wrapf(convErr, "%v:%v is not number", key, value)
	}

	return nil
}

Expand Down
2 changes: 1 addition & 1 deletion engineapi/backup_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func NewBackupMonitor(logger logrus.FieldLogger, ds *datastore.DataStore, backup
}
_, replicaAddress, err := engineClientProxy.SnapshotBackup(engine, backup.Spec.SnapshotName, backup.Name,
backupTargetClient.URL, volume.Spec.BackingImage, biChecksum, string(compressionMethod), concurrentLimit, storageClassName,
backup.Spec.Labels, backupTargetClient.Credential)
backup.Spec.Labels, backupTargetClient.Credential, backup.Spec.Parameters)
if err != nil {
if !strings.Contains(err.Error(), "DeadlineExceeded") {
m.logger.WithError(err).Warn("Cannot take snapshot backup")
Expand Down
3 changes: 2 additions & 1 deletion engineapi/backups.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,7 @@ func (btc *BackupTargetClient) BackupVolumeDelete(destURL, volumeName string, cr
// parseBackupVolumeConfig parses a backup volume config
func parseBackupVolumeConfig(output string) (*BackupVolume, error) {
backupVolume := new(BackupVolume)
// Diagnostic trace only: keep it at debug level so the raw backup volume
// config is not written to the info log on every parse.
logrus.Debugf("parseBackupVolumeConfig output: %v", output)
if err := json.Unmarshal([]byte(output), backupVolume); err != nil {
return nil, errors.Wrapf(err, "error parsing one backup volume config: \n%s", output)
}
Expand Down Expand Up @@ -318,7 +319,7 @@ func (btc *BackupTargetClient) BackupCleanUpAllMounts() (err error) {
// TODO: Deprecated, replaced by gRPC proxy
func (e *EngineBinary) SnapshotBackup(engine *longhorn.Engine, snapName, backupName, backupTarget,
backingImageName, backingImageChecksum, compressionMethod string, concurrentLimit int, storageClassName string,
labels, credential map[string]string) (string, string, error) {
labels, credential, parameters map[string]string) (string, string, error) {
if snapName == etypes.VolumeHeadName {
return "", "", fmt.Errorf("invalid operation: cannot backup %v", etypes.VolumeHeadName)
}
Expand Down
2 changes: 1 addition & 1 deletion engineapi/enginesim.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ func (e *EngineSimulator) SnapshotPurgeStatus(*longhorn.Engine) (map[string]*lon

func (e *EngineSimulator) SnapshotBackup(engine *longhorn.Engine, snapshotName, backupName, backupTarget,
backingImageName, backingImageChecksum, compressionMethod string, concurrentLimit int, storageClassName string,
labels, credential map[string]string) (string, string, error) {
labels, credential, parameters map[string]string) (string, string, error) {
return "", "", fmt.Errorf(ErrNotImplement)
}

Expand Down
4 changes: 2 additions & 2 deletions engineapi/proxy_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (

func (p *Proxy) SnapshotBackup(e *longhorn.Engine, snapshotName, backupName, backupTarget,
backingImageName, backingImageChecksum, compressionMethod string, concurrentLimit int, storageClassName string,
labels, credential map[string]string) (string, string, error) {
labels, credential, parameters map[string]string) (string, string, error) {
if snapshotName == etypes.VolumeHeadName {
return "", "", fmt.Errorf("invalid operation: cannot backup %v", etypes.VolumeHeadName)
}
Expand All @@ -40,7 +40,7 @@ func (p *Proxy) SnapshotBackup(e *longhorn.Engine, snapshotName, backupName, bac

backupID, replicaAddress, err := p.grpcClient.SnapshotBackup(string(e.Spec.DataEngine), e.Name,
e.Spec.VolumeName, p.DirectToURL(e), backupName, snapshotName, backupTarget, backingImageName,
backingImageChecksum, compressionMethod, concurrentLimit, storageClassName, labels, credentialEnv,
backingImageChecksum, compressionMethod, concurrentLimit, storageClassName, labels, credentialEnv, parameters,
)
if err != nil {
return "", "", err
Expand Down
3 changes: 2 additions & 1 deletion engineapi/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ type EngineClient interface {
SnapshotRevert(engine *longhorn.Engine, name string) error
SnapshotPurge(engine *longhorn.Engine) error
SnapshotPurgeStatus(engine *longhorn.Engine) (map[string]*longhorn.PurgeStatus, error)
SnapshotBackup(engine *longhorn.Engine, snapshotName, backupName, backupTarget, backingImageName, backingImageChecksum, compressionMethod string, concurrentLimit int, storageClassName string, labels, credential map[string]string) (string, string, error)
SnapshotBackup(engine *longhorn.Engine, snapshotName, backupName, backupTarget, backingImageName, backingImageChecksum, compressionMethod string, concurrentLimit int, storageClassName string, labels, credential, parameters map[string]string) (string, string, error)
SnapshotBackupStatus(engine *longhorn.Engine, backupName, replicaAddress, replicaName string) (*longhorn.EngineBackupStatus, error)
SnapshotCloneStatus(engine *longhorn.Engine) (map[string]*longhorn.SnapshotCloneStatus, error)
SnapshotClone(engine *longhorn.Engine, snapshotName, fromEngineAddress, fromVolumeName, fromEngineName string, fileSyncHTTPClientTimeout int64) error
Expand Down Expand Up @@ -162,6 +162,7 @@ type BackupVolume struct {
BackingImageName string `json:"backingImageName"`
BackingImageChecksum string `json:"backingImageChecksum"`
StorageClassName string `json:"storageClassName"`
BackupCount string `json:"backupCount"`
}

type Backup struct {
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/longhorn/longhorn-manager
go 1.21

replace (
github.com/longhorn/backupstore v0.0.0-20240219094812-3a87ee02df77 => github.com/ChanYiLin/backupstore v0.0.0-20240322024320-ab6fa5ee775f
k8s.io/api => k8s.io/api v0.28.5
k8s.io/apiextensions-apiserver => k8s.io/apiextensions-apiserver v0.28.5
k8s.io/apimachinery => k8s.io/apimachinery v0.28.5
Expand Down
Loading

0 comments on commit 4d99c6c

Please sign in to comment.