From afe13c01744e969beeb7af6e3a28a20bf57037e3 Mon Sep 17 00:00:00 2001 From: James Lu Date: Wed, 8 May 2024 12:06:54 +0800 Subject: [PATCH] fix(backup): manually synchronizing backup volumes. When users make a requst to synchronize the backup target by a new Backup Volume APIs `syncBackupTarget`, it will also trigger the synchronization for thee backup volumes of the backup target. Ref: 7982 Signed-off-by: James Lu --- api/backup.go | 111 ++++++++++++++++++++++++++++++++++++++++++++- api/model.go | 71 +++++++++++++++++++++++++---- api/router.go | 15 ++++-- engineapi/types.go | 1 + manager/engine.go | 25 ++++++++++ 5 files changed, 211 insertions(+), 12 deletions(-) diff --git a/api/backup.go b/api/backup.go index 003994053f..510434a75d 100644 --- a/api/backup.go +++ b/api/backup.go @@ -1,6 +1,7 @@ package api import ( + "fmt" "net/http" "github.com/gorilla/mux" @@ -18,7 +19,61 @@ func (s *Server) BackupTargetList(w http.ResponseWriter, req *http.Request) erro if err != nil { return errors.Wrap(err, "failed to list backup targets") } - apiContext.Write(toBackupTargetCollection(backupTargets)) + apiContext.Write(toBackupTargetCollection(backupTargets, apiContext)) + return nil +} + +func (s *Server) BackupTargetSyncAll(w http.ResponseWriter, req *http.Request) error { + var input SyncBackupResource + + apiContext := api.GetApiContext(req) + if err := apiContext.Read(&input); err != nil { + return err + } + + bts, err := s.m.ListBackupTargetsSorted() + if err != nil { + return errors.Wrap(err, "failed to list backup targets") + } + + if input.SyncAllBackupTargets { + for _, bt := range bts { + if _, err := s.m.SyncBackupTarget(bt); err != nil { + logrus.WithError(err).Warnf("Failed to synchronize backup target %v", bt.Name) + } + } + } + + apiContext.Write(toBackupTargetCollection(bts, apiContext)) + return nil +} + +func (s *Server) BackupTargetSync(w http.ResponseWriter, req *http.Request) error { + var input SyncBackupResource + + apiContext := api.GetApiContext(req) + if err := apiContext.Read(&input); err != nil { + return err + } + + backupTargetName := mux.Vars(req)["backupTargetName"] + if backupTargetName == "" { + return fmt.Errorf("backup target name is required") + } + + bt, err := s.m.GetBackupTarget(backupTargetName) + if err != nil { + return errors.Wrapf(err, "failed to get backup target %v", backupTargetName) + } + + if input.SyncBackupTarget { + bt, err = s.m.SyncBackupTarget(bt) + if err != nil { + return errors.Wrapf(err, "failed to synchronize backup target %v", backupTargetName) + } + } + + apiContext.Write(toBackupTargetResource(bt, apiContext)) return nil } @@ -54,6 +109,60 @@ func (s *Server) BackupVolumeGet(w http.ResponseWriter, req *http.Request) error return nil } +func (s *Server) BackupVolumeSyncAll(w http.ResponseWriter, req *http.Request) error { + var input SyncBackupResource + + apiContext := api.GetApiContext(req) + if err := apiContext.Read(&input); err != nil { + return err + } + + bvs, err := s.m.ListBackupVolumesSorted() + if err != nil { + return errors.Wrap(err, "failed to list backup volumes") + } + + if input.SyncAllBackupVolumes { + for _, bv := range bvs { + if _, err := s.m.SyncBackupVolume(bv); err != nil { + logrus.WithError(err).Warnf("Failed to synchronize backup volume %v", bv.Name) + } + } + } + + apiContext.Write(toBackupVolumeCollection(bvs, apiContext)) + return nil +} + +func (s *Server) SyncBackupVolume(w http.ResponseWriter, req *http.Request) error { + var input SyncBackupResource + + apiContext := api.GetApiContext(req) + if err := apiContext.Read(&input); err != nil { + return err + } + + volName := mux.Vars(req)["volName"] + if volName == "" { + return fmt.Errorf("backup volume name is required") + } + + bv, err := s.m.GetBackupVolume(volName) + if err != nil { + return errors.Wrapf(err, "failed to get backup volume '%s'", volName) + } + + if input.SyncBackupVolume { + bv, err = s.m.SyncBackupVolume(bv) + if err != nil { + return errors.Wrapf(err, "failed to synchronize backup volume %v", volName) + } + } + + apiContext.Write(toBackupVolumeResource(bv, apiContext)) + return nil +} + func (s *Server) BackupVolumeDelete(w http.ResponseWriter, req *http.Request) error { volName := mux.Vars(req)["volName"] if err := s.m.DeleteBackupVolume(volName); err != nil { diff --git a/api/model.go b/api/model.go index 7bef5cf591..71a1ce82a3 100644 --- a/api/model.go +++ b/api/model.go @@ -142,6 +142,14 @@ type BackupVolume struct { StorageClassName string `json:"storageClassName"` } +// SyncBackupResource is used for the Backup*Sync* actions +type SyncBackupResource struct { + SyncAllBackupTargets bool `json:"syncAllBackupTargets"` + SyncBackupTarget bool `json:"syncBackupTarget"` + SyncAllBackupVolumes bool `json:"syncAllBackupVolumes"` + SyncBackupVolume bool `json:"syncBackupVolume"` +} + type Backup struct { client.Resource @@ -556,6 +564,16 @@ type VolumeRecurringJobInput struct { longhorn.VolumeRecurringJob } +type BackupTargetListOutput struct { + Data []BackupTarget `json:"data"` + Type string `json:"type"` +} + +type BackupVolumeListOutput struct { + Data []BackupVolume `json:"data"` + Type string `json:"type"` +} + type BackupListOutput struct { Data []Backup `json:"data"` Type string `json:"type"` @@ -581,10 +599,10 @@ func NewSchema() *client.Schemas { schemas.AddType("detachInput", DetachInput{}) schemas.AddType("snapshotInput", SnapshotInput{}) schemas.AddType("snapshotCRInput", SnapshotCRInput{}) - schemas.AddType("backupTarget", BackupTarget{}) schemas.AddType("backup", Backup{}) schemas.AddType("backupInput", BackupInput{}) schemas.AddType("backupStatus", BackupStatus{}) + schemas.AddType("syncBackupResource", SyncBackupResource{}) schemas.AddType("orphan", Orphan{}) schemas.AddType("restoreStatus", RestoreStatus{}) schemas.AddType("purgeStatus", PurgeStatus{}) @@ -644,6 +662,7 @@ func NewSchema() *client.Schemas { volumeSchema(schemas.AddType("volume", Volume{})) snapshotSchema(schemas.AddType("snapshot", Snapshot{})) snapshotCRSchema(schemas.AddType("snapshotCR", SnapshotCR{})) + backupTargetSchema(schemas.AddType("backupTarget", BackupTarget{})) backupVolumeSchema(schemas.AddType("backupVolume", BackupVolume{})) backupBackingImageSchema(schemas.AddType("backupBackingImage", BackupBackingImage{})) settingSchema(schemas.AddType("setting", Setting{})) @@ -654,6 +673,8 @@ func NewSchema() *client.Schemas { diskSchema(schemas.AddType("diskUpdateInput", DiskUpdateInput{})) diskInfoSchema(schemas.AddType("diskInfo", DiskInfo{})) kubernetesStatusSchema(schemas.AddType("kubernetesStatus", longhorn.KubernetesStatus{})) + backupTargetListOutputSchema(schemas.AddType("backupTargetListOutput", BackupTargetListOutput{})) + backupVolumeListOutputSchema(schemas.AddType("backupVolumeListOutput", BackupVolumeListOutput{})) backupListOutputSchema(schemas.AddType("backupListOutput", BackupListOutput{})) snapshotListOutputSchema(schemas.AddType("snapshotListOutput", SnapshotListOutput{})) systemBackupSchema(schemas.AddType("systemBackup", SystemBackup{})) @@ -814,9 +835,21 @@ func kubernetesStatusSchema(status *client.Schema) { status.ResourceFields["workloadsStatus"] = workloadsStatus } +func backupTargetSchema(backupTarget *client.Schema) { + backupTarget.CollectionMethods = []string{"GET"} + backupTarget.ResourceMethods = []string{"GET", "PUT"} + + backupTarget.ResourceActions = map[string]client.Action{ + "backupTargetSync": { + Input: "syncBackupResource", + Output: "backupTargetListOutput", + }, + } +} + func backupVolumeSchema(backupVolume *client.Schema) { backupVolume.CollectionMethods = []string{"GET"} - backupVolume.ResourceMethods = []string{"GET", "DELETE"} + backupVolume.ResourceMethods = []string{"GET", "PUT", "DELETE"} backupVolume.ResourceActions = map[string]client.Action{ "backupList": { Output: "backupListOutput", @@ -829,6 +862,10 @@ func backupVolumeSchema(backupVolume *client.Schema) { Input: "backupInput", Output: "backupVolume", }, + "backupVolumeSync": { + Input: "syncBackupResource", + Output: "backupVolumeListOutput", + }, } } @@ -1186,6 +1223,18 @@ func snapshotCRSchema(snapshotCR *client.Schema) { snapshotCR.ResourceFields["children"] = children } +func backupTargetListOutputSchema(backupTargetList *client.Schema) { + data := backupTargetList.ResourceFields["data"] + data.Type = "array[backupTarget]" + backupTargetList.ResourceFields["data"] = data +} + +func backupVolumeListOutputSchema(backupVolumeList *client.Schema) { + data := backupVolumeList.ResourceFields["data"] + data.Type = "array[backupVolume]" + backupVolumeList.ResourceFields["data"] = data +} + func backupListOutputSchema(backupList *client.Schema) { data := backupList.ResourceFields["data"] data.Type = "array[backup]" @@ -1699,7 +1748,7 @@ func toVolumeRecurringJobCollection(recurringJobs map[string]*longhorn.VolumeRec return &client.GenericCollection{Data: data, Collection: client.Collection{ResourceType: "volumeRecurringJob"}} } -func toBackupTargetResource(bt *longhorn.BackupTarget) *BackupTarget { +func toBackupTargetResource(bt *longhorn.BackupTarget, apiContext *api.ApiContext) *BackupTarget { if bt == nil { return nil } @@ -1711,6 +1760,7 @@ func toBackupTargetResource(bt *longhorn.BackupTarget) *BackupTarget { Links: map[string]string{}, }, BackupTarget: engineapi.BackupTarget{ + Name: bt.Name, BackupTargetURL: bt.Spec.BackupTargetURL, CredentialSecret: bt.Spec.CredentialSecret, PollInterval: bt.Spec.PollInterval.Duration.String(), @@ -1718,6 +1768,10 @@ func toBackupTargetResource(bt *longhorn.BackupTarget) *BackupTarget { Message: types.GetCondition(bt.Status.Conditions, longhorn.BackupTargetConditionTypeUnavailable).Message, }, } + res.Actions = map[string]string{ + "backupTargetSync": apiContext.UrlBuilder.ActionLink(res.Resource, "backupTargetSync"), + } + return res } @@ -1744,17 +1798,18 @@ func toBackupVolumeResource(bv *longhorn.BackupVolume, apiContext *api.ApiContex StorageClassName: bv.Status.StorageClassName, } b.Actions = map[string]string{ - "backupList": apiContext.UrlBuilder.ActionLink(b.Resource, "backupList"), - "backupGet": apiContext.UrlBuilder.ActionLink(b.Resource, "backupGet"), - "backupDelete": apiContext.UrlBuilder.ActionLink(b.Resource, "backupDelete"), + "backupList": apiContext.UrlBuilder.ActionLink(b.Resource, "backupList"), + "backupGet": apiContext.UrlBuilder.ActionLink(b.Resource, "backupGet"), + "backupDelete": apiContext.UrlBuilder.ActionLink(b.Resource, "backupDelete"), + "backupVolumeSync": apiContext.UrlBuilder.ActionLink(b.Resource, "backupVolumeSync"), } return b } -func toBackupTargetCollection(bts []*longhorn.BackupTarget) *client.GenericCollection { +func toBackupTargetCollection(bts []*longhorn.BackupTarget, apiContext *api.ApiContext) *client.GenericCollection { data := []interface{}{} for _, bt := range bts { - data = append(data, toBackupTargetResource(bt)) + data = append(data, toBackupTargetResource(bt, apiContext)) } return &client.GenericCollection{Data: data, Collection: client.Collection{ResourceType: "backupTarget"}} } diff --git a/api/router.go b/api/router.go index 25f01e2745..5e5a237b2b 100644 --- a/api/router.go +++ b/api/router.go @@ -120,13 +120,22 @@ func NewRouter(s *Server) *mux.Router { } r.Methods("GET").Path("/v1/backuptargets").Handler(f(schemas, s.BackupTargetList)) + r.Methods("PUT").Path("/v1/backuptargets").Handler(f(schemas, s.BackupTargetSyncAll)) + backupTargetActions := map[string]func(http.ResponseWriter, *http.Request) error{ + "backupTargetSync": s.fwd.Handler(s.fwd.HandleProxyRequestByNodeID, s.fwd.GetHTTPAddressByNodeID(NodeHasDefaultEngineImage(s.m)), s.BackupTargetSync), + } + for name, action := range backupTargetActions { + r.Methods("POST").Path("/v1/backuptargets/{backupTargetName}").Queries("action", name).Handler(f(schemas, action)) + } r.Methods("GET").Path("/v1/backupvolumes").Handler(f(schemas, s.fwd.Handler(s.fwd.HandleProxyRequestByNodeID, s.fwd.GetHTTPAddressByNodeID(NodeHasDefaultEngineImage(s.m)), s.BackupVolumeList))) + r.Methods("PUT").Path("/v1/backupvolumes").Handler(f(schemas, s.fwd.Handler(s.fwd.HandleProxyRequestByNodeID, s.fwd.GetHTTPAddressByNodeID(NodeHasDefaultEngineImage(s.m)), s.BackupVolumeSyncAll))) r.Methods("GET").Path("/v1/backupvolumes/{volName}").Handler(f(schemas, s.fwd.Handler(s.fwd.HandleProxyRequestByNodeID, s.fwd.GetHTTPAddressByNodeID(NodeHasDefaultEngineImage(s.m)), s.BackupVolumeGet))) r.Methods("DELETE").Path("/v1/backupvolumes/{volName}").Handler(f(schemas, s.fwd.Handler(s.fwd.HandleProxyRequestByNodeID, s.fwd.GetHTTPAddressByNodeID(NodeHasDefaultEngineImage(s.m)), s.BackupVolumeDelete))) backupActions := map[string]func(http.ResponseWriter, *http.Request) error{ - "backupList": s.fwd.Handler(s.fwd.HandleProxyRequestByNodeID, s.fwd.GetHTTPAddressByNodeID(NodeHasDefaultEngineImage(s.m)), s.BackupList), - "backupGet": s.fwd.Handler(s.fwd.HandleProxyRequestByNodeID, s.fwd.GetHTTPAddressByNodeID(NodeHasDefaultEngineImage(s.m)), s.BackupGet), - "backupDelete": s.fwd.Handler(s.fwd.HandleProxyRequestByNodeID, s.fwd.GetHTTPAddressByNodeID(NodeHasDefaultEngineImage(s.m)), s.BackupDelete), + "backupList": s.fwd.Handler(s.fwd.HandleProxyRequestByNodeID, s.fwd.GetHTTPAddressByNodeID(NodeHasDefaultEngineImage(s.m)), s.BackupList), + "backupGet": s.fwd.Handler(s.fwd.HandleProxyRequestByNodeID, s.fwd.GetHTTPAddressByNodeID(NodeHasDefaultEngineImage(s.m)), s.BackupGet), + "backupDelete": s.fwd.Handler(s.fwd.HandleProxyRequestByNodeID, s.fwd.GetHTTPAddressByNodeID(NodeHasDefaultEngineImage(s.m)), s.BackupDelete), + "backupVolumeSync": s.fwd.Handler(s.fwd.HandleProxyRequestByNodeID, s.fwd.GetHTTPAddressByNodeID(NodeHasDefaultEngineImage(s.m)), s.SyncBackupVolume), } for name, action := range backupActions { r.Methods("POST").Path("/v1/backupvolumes/{volName}").Queries("action", name).Handler(f(schemas, action)) diff --git a/engineapi/types.go b/engineapi/types.go index 972087aacb..87822cb400 100644 --- a/engineapi/types.go +++ b/engineapi/types.go @@ -141,6 +141,7 @@ type Volume struct { } type BackupTarget struct { + Name string `json:"name"` BackupTargetURL string `json:"backupTargetURL"` CredentialSecret string `json:"credentialSecret"` PollInterval string `json:"pollInterval"` diff --git a/manager/engine.go b/manager/engine.go index 7f6e023a1a..bec839a6db 100644 --- a/manager/engine.go +++ b/manager/engine.go @@ -18,6 +18,9 @@ import ( const ( BackupStatusQueryInterval = 2 * time.Second + + // minSyncBackupTargetIntervalSec interval in seconds to synchronize backup target to avoid frequent sync + minSyncBackupTargetIntervalSec = 10 ) func (m *VolumeManager) ListSnapshotInfos(volumeName string) (map[string]*longhorn.SnapshotInfo, error) { @@ -328,6 +331,19 @@ func (m *VolumeManager) ListBackupTargetsSorted() ([]*longhorn.BackupTarget, err return backupTargets, nil } +func (m *VolumeManager) GetBackupTarget(backupTargetName string) (*longhorn.BackupTarget, error) { + return m.ds.GetBackupTarget(backupTargetName) +} + +func (m *VolumeManager) SyncBackupTarget(backupTarget *longhorn.BackupTarget) (*longhorn.BackupTarget, error) { + now := metav1.Time{Time: time.Now().UTC()} + if now.Sub(backupTarget.Spec.SyncRequestedAt.Time).Seconds() < minSyncBackupTargetIntervalSec { + return nil, errors.Errorf("cannot synchronize backup target '%v' in %v seconds", backupTarget.Name, minSyncBackupTargetIntervalSec) + } + backupTarget.Spec.SyncRequestedAt = metav1.Time{Time: time.Now().UTC()} + return m.ds.UpdateBackupTarget(backupTarget) +} + func (m *VolumeManager) ListBackupVolumes() (map[string]*longhorn.BackupVolume, error) { return m.ds.ListBackupVolumes() } @@ -363,6 +379,15 @@ func (m *VolumeManager) GetBackupVolume(volumeName string) (*longhorn.BackupVolu return backupVolume, err } +func (m *VolumeManager) SyncBackupVolume(backupVolume *longhorn.BackupVolume) (*longhorn.BackupVolume, error) { + now := metav1.Time{Time: time.Now().UTC()} + if now.Sub(backupVolume.Spec.SyncRequestedAt.Time).Seconds() < minSyncBackupTargetIntervalSec { + return nil, errors.Errorf("failed to synchronize backup volume '%v' in %v seconds", backupVolume.Name, minSyncBackupTargetIntervalSec) + } + backupVolume.Spec.SyncRequestedAt = metav1.Time{Time: time.Now().UTC()} + return m.ds.UpdateBackupVolume(backupVolume) +} + func (m *VolumeManager) DeleteBackupVolume(volumeName string) error { return m.ds.DeleteBackupVolume(volumeName) }