From a7f38a20f2b40d5ea39e62e30c1ddd072fddded5 Mon Sep 17 00:00:00 2001 From: James Lu Date: Tue, 24 Oct 2023 11:58:05 +0800 Subject: [PATCH] temp commit Need to be splitted into previous commits. Signed-off-by: James Lu --- api/backup.go | 23 +++++++-- api/router.go | 4 +- api/volume.go | 4 +- controller/backup_controller.go | 5 ++ controller/backup_target_controller.go | 3 ++ controller/backup_volume_controller.go | 2 +- controller/volume_controller.go | 2 +- datastore/longhorn.go | 64 ++++++++++++++++++++++++-- engineapi/backup_monitor.go | 2 + engineapi/proxy_backup.go | 1 + manager/engine.go | 11 +++-- manager/volume.go | 12 ++++- 12 files changed, 115 insertions(+), 18 deletions(-) diff --git a/api/backup.go b/api/backup.go index 982cc851dd..109e8ab5bb 100644 --- a/api/backup.go +++ b/api/backup.go @@ -18,6 +18,10 @@ import ( "github.com/rancher/go-rancher/client" ) +const ( + BackupTargetDefaultPollInterval = 300 +) + func (s *Server) BackupTargetList(w http.ResponseWriter, req *http.Request) error { apiContext := api.GetApiContext(req) @@ -104,7 +108,7 @@ func (s *Server) BackupTargetUpdate(rw http.ResponseWriter, req *http.Request) e func newBackupTarget(input BackupTarget) (*longhorn.BackupTargetSpec, error) { pollInterval, err := strconv.ParseInt(input.PollInterval, 10, 64) if err != nil { - return nil, err + pollInterval = BackupTargetDefaultPollInterval } return &longhorn.BackupTargetSpec{ @@ -164,12 +168,25 @@ func (s *Server) BackupVolumeDelete(w http.ResponseWriter, req *http.Request) er return nil } -func (s *Server) BackupList(w http.ResponseWriter, req *http.Request) error { +func (s *Server) BackupListByVolumeName(w http.ResponseWriter, req *http.Request) error { + apiContext := api.GetApiContext(req) + + volName := mux.Vars(req)["volName"] + + bs, err := s.m.ListBackupsForVolumeSorted(volName, false) + if err != nil { + return errors.Wrapf(err, "failed to list backups for volume '%s'", volName) + } + apiContext.Write(toBackupCollection(bs)) + return nil +} + +func (s *Server) BackupListByBVName(w http.ResponseWriter, req *http.Request) error { apiContext := api.GetApiContext(req) volName := mux.Vars(req)["volName"] - bs, err := s.m.ListBackupsForVolumeSorted(volName) + bs, err := s.m.ListBackupsForVolumeSorted(volName, true) if err != nil { return errors.Wrapf(err, "failed to list backups for volume '%s'", volName) } diff --git a/api/router.go b/api/router.go index 209b949f3f..2bf1d5daf8 100644 --- a/api/router.go +++ b/api/router.go @@ -123,7 +123,7 @@ func NewRouter(s *Server) *mux.Router { r.Methods("DELETE").Path("/v1/secrets/{name}").Handler(f(schemas, s.SecretDelete)) r.Methods("PUT").Path("/v1/secrets/{name}").Handler(f(schemas, s.SecretUpdate)) - r.Methods("GET").Path("/v1/backuptargets").Handler(f(schemas, s.BackupTargetList)) + r.Methods("GET").Path("/v1/backuptargets").Handler(f(schemas, s.fwd.Handler(s.fwd.HandleProxyRequestByNodeID, s.fwd.GetHTTPAddressByNodeID(NodeHasDefaultEngineImage(s.m)), s.BackupTargetList))) r.Methods("GET").Path("/v1/backuptargets/{name}").Handler(f(schemas, s.BackupTargetGet)) r.Methods("POST").Path("/v1/backuptargets").Handler(f(schemas, s.BackupTargetCreate)) r.Methods("DELETE").Path("/v1/backuptargets/{name}").Handler(f(schemas, s.BackupTargetDelete)) @@ -133,7 +133,7 @@ func NewRouter(s *Server) *mux.Router { r.Methods("GET").Path("/v1/backupvolumes/{volName}").Handler(f(schemas, s.fwd.Handler(s.fwd.HandleProxyRequestByNodeID, s.fwd.GetHTTPAddressByNodeID(NodeHasDefaultEngineImage(s.m)), s.BackupVolumeGet))) r.Methods("DELETE").Path("/v1/backupvolumes/{volName}").Handler(f(schemas, s.fwd.Handler(s.fwd.HandleProxyRequestByNodeID, s.fwd.GetHTTPAddressByNodeID(NodeHasDefaultEngineImage(s.m)), s.BackupVolumeDelete))) backupActions := map[string]func(http.ResponseWriter, *http.Request) error{ - "backupList": s.fwd.Handler(s.fwd.HandleProxyRequestByNodeID, s.fwd.GetHTTPAddressByNodeID(NodeHasDefaultEngineImage(s.m)), s.BackupList), + "backupList": s.fwd.Handler(s.fwd.HandleProxyRequestByNodeID, s.fwd.GetHTTPAddressByNodeID(NodeHasDefaultEngineImage(s.m)), s.BackupListByBVName), "backupGet": s.fwd.Handler(s.fwd.HandleProxyRequestByNodeID, s.fwd.GetHTTPAddressByNodeID(NodeHasDefaultEngineImage(s.m)), s.BackupGet), "backupDelete": s.fwd.Handler(s.fwd.HandleProxyRequestByNodeID, s.fwd.GetHTTPAddressByNodeID(NodeHasDefaultEngineImage(s.m)), s.BackupDelete), } diff --git a/api/volume.go b/api/volume.go index d1da17e994..a4698a4fc2 100644 --- a/api/volume.go +++ b/api/volume.go @@ -53,7 +53,7 @@ func (s *Server) volumeList(apiContext *api.ApiContext) (*client.GenericCollecti if err != nil { return nil, err } - backups, err := s.m.ListBackupsForVolumeSorted(v.Name) + backups, err := s.m.ListBackupsForVolumeSorted(v.Name, false) if err != nil { return nil, err } @@ -104,7 +104,7 @@ func (s *Server) responseWithVolume(rw http.ResponseWriter, req *http.Request, i if err != nil { return err } - backups, err := s.m.ListBackupsForVolumeSorted(id) + backups, err := s.m.ListBackupsForVolumeSorted(id, false) if err != nil { return err } diff --git a/controller/backup_controller.go b/controller/backup_controller.go index c820226a3b..8fc628262f 100644 --- a/controller/backup_controller.go +++ b/controller/backup_controller.go @@ -334,6 +334,7 @@ func (bc *BackupController) reconcile(backupName string) (err error) { return } if backup.Status.State == longhorn.BackupStateCompleted && existingBackupState != backup.Status.State { + bc.logger.Warnf("[James_DBG] backup.Spec.BackupTargetName: %v", backup.Spec.BackupTargetName) if err := bc.syncBackupVolume(volumeName, backup.Spec.BackupTargetName); err != nil { log.Warnf("failed to sync Backup Volume: %v", volumeName) return @@ -630,10 +631,12 @@ func (bc *BackupController) checkMonitor(backup *longhorn.Backup, volume *longho return nil, fmt.Errorf("waiting for attachment %v to be attached before enabling backup monitor", longhorn.GetAttachmentTicketID(longhorn.AttacherTypeBackupController, backup.Name)) } + bc.logger.Warnf("[James_DBG] bc backupTarget: %+v", backupTarget) engineClientProxy, backupTargetClient, err := getBackupTarget(bc.controllerID, backupTarget, bc.ds, bc.logger, bc.proxyConnCounter) if err != nil { return nil, err } + bc.logger.Warnf("[James_DBG] bc backupTargetClient: %+v", backupTargetClient) // get storage class of the pvc binding with the volume kubernetesStatus := &volume.Status.KubernetesStatus @@ -713,7 +716,9 @@ func (bc *BackupController) syncWithMonitor(backup *longhorn.Backup, volume *lon // to run reconcile immediately func (bc *BackupController) syncBackupVolume(volumeName, backupTargetName string) error { syncTime := metav1.Time{Time: time.Now().UTC()} + bc.logger.Warnf("[James_DBG] syncBackupVolume BV name : %v", volumeName+"-"+backupTargetName) backupVolume, err := bc.ds.GetBackupVolume(volumeName + "-" + backupTargetName) + bc.logger.Warnf("[James_DBG] syncBackupVolume backupVolume: %v and err: %v", backupVolume, err) if err == nil { // Request backup_volume_controller to reconcile BackupVolume immediately. backupVolume.Spec.SyncRequestedAt = syncTime diff --git a/controller/backup_target_controller.go b/controller/backup_target_controller.go index e35a883e9c..8e19717ecf 100644 --- a/controller/backup_target_controller.go +++ b/controller/backup_target_controller.go @@ -483,6 +483,7 @@ func (btc *BackupTargetController) cleanUpAllMounts(backupTarget *longhorn.Backu func (btc *BackupTargetController) syncBackupVolume(backupTarget *longhorn.BackupTarget, backupTargetClient *engineapi.BackupTargetClient, syncTime metav1.Time, log logrus.FieldLogger) error { // Get a list of all the backup volumes that are stored in the backup target + btc.logger.Warnf("[James_DBG] btc controller syncBackupVolume backupTargetName: %v", backupTarget.Name) res, err := backupTargetClient.BackupVolumeNameList(backupTargetClient.URL, backupTargetClient.Credential) if err != nil { backupTarget.Status.Available = false @@ -494,6 +495,7 @@ func (btc *BackupTargetController) syncBackupVolume(backupTarget *longhorn.Backu } backupStoreBackupVolumes := sets.NewString(res...) + btc.logger.Warnf("[James_DBG] btc controller syncBackupVolume bv from engine: %+v", backupStoreBackupVolumes) // Get a list of all the backup volumes that exist as custom resources in the cluster clusterBackupVolumes, err := btc.ds.ListBackupVolumesWithBackupTargetName(backupTarget.Name) if err != nil { @@ -552,6 +554,7 @@ func (btc *BackupTargetController) syncBackupVolume(backupTarget *longhorn.Backu // Update the BackupVolume CR spec.syncRequestAt to request the // backup_volume_controller to reconcile the BackupVolume CR + btc.logger.Warnf("[James_DBG] btc controller syncBackupVolume bv from cluster will be updated: %+v", clusterBackupVolumes) for backupVolumeName, backupVolume := range clusterBackupVolumes { backupVolume.Spec.SyncRequestedAt = syncTime if _, err = btc.ds.UpdateBackupVolume(backupVolume); err != nil && !apierrors.IsConflict(errors.Cause(err)) { diff --git a/controller/backup_volume_controller.go b/controller/backup_volume_controller.go index e9ce035e21..dd16cc200a 100644 --- a/controller/backup_volume_controller.go +++ b/controller/backup_volume_controller.go @@ -221,7 +221,7 @@ func (bvc *BackupVolumeController) reconcile(backupVolumeName string) (err error // Examine DeletionTimestamp to determine if object is under deletion if !backupVolume.DeletionTimestamp.IsZero() { - backupsOfVolume, err := bvc.ds.ListBackupsWithBackupVolumeName(backupVolume.Spec.VolumeName) + backupsOfVolume, err := bvc.ds.ListBackupsWithVolumeName(backupVolume.Spec.VolumeName) if err != nil { return errors.Wrap(err, "failed to get backups by volume name") } diff --git a/controller/volume_controller.go b/controller/volume_controller.go index 5e9b923971..e8047c75c5 100644 --- a/controller/volume_controller.go +++ b/controller/volume_controller.go @@ -4431,7 +4431,7 @@ func (c *VolumeController) ReconcileBackupVolumeState(volume *longhorn.Volume) e backupVolumeName = volume.Name } - bv, err := c.ds.GetBackupVolume(backupVolumeName) + bv, err := c.ds.GetLastUpdatedBackupVolumeWithVolumeName(backupVolumeName) if err != nil && !apierrors.IsNotFound(err) { return errors.Wrapf(err, "failed to get backup volume %s for volume %v", backupVolumeName, volume.Name) } diff --git a/datastore/longhorn.go b/datastore/longhorn.go index c3e6c88486..9a90df1d02 100644 --- a/datastore/longhorn.go +++ b/datastore/longhorn.go @@ -3366,8 +3366,28 @@ func (s *DataStore) ListBackupVolumes() (map[string]*longhorn.BackupVolume, erro // ListBackupVolumesWithBackupTargetName returns an object contains all backup volumes in the cluster BackupVolumes CR // of the given backup target name -func (s *DataStore) ListBackupVolumesWithBackupTargetName(backupTagetName string) (map[string]*longhorn.BackupVolume, error) { - selector, err := getBackupTargetSelector(backupTagetName) +func (s *DataStore) ListBackupVolumesWithBackupTargetName(backupTargetName string) (map[string]*longhorn.BackupVolume, error) { + selector, err := getBackupTargetSelector(backupTargetName) + if err != nil { + return nil, err + } + + list, err := s.backupVolumeLister.BackupVolumes(s.namespace).List(selector) + if err != nil { + return nil, err + } + + itemMap := map[string]*longhorn.BackupVolume{} + for _, itemRO := range list { + itemMap[itemRO.Name] = itemRO.DeepCopy() + } + return itemMap, nil +} + +// ListBackupVolumesWithVolumeName returns an object contains all backup volumes in the cluster BackupVolumes CR +// of the given volume name +func (s *DataStore) ListBackupVolumesWithVolumeName(volumeName string) (map[string]*longhorn.BackupVolume, error) { + selector, err := getBackupVolumeSelector(volumeName) if err != nil { return nil, err } @@ -3411,6 +3431,26 @@ func (s *DataStore) GetBackupVolume(name string) (*longhorn.BackupVolume, error) return resultRO.DeepCopy(), nil } +// GetLastUpdatedBackupVolumeWithVolumeName returns a copy of last updated BackupVolume with the given volume name in the cluster +func (s *DataStore) GetLastUpdatedBackupVolumeWithVolumeName(volumeName string) (*longhorn.BackupVolume, error) { + backupVolumeMap, err := s.ListBackupVolumesWithVolumeName(volumeName) + if err != nil { + return nil, err + } + var lastUpdatedBackupVolume *longhorn.BackupVolume + for _, backupVolume := range backupVolumeMap { + if lastUpdatedBackupVolume == nil { + lastUpdatedBackupVolume = backupVolume + continue + } + if lastUpdatedBackupVolume.Status.LastSyncedAt.Before(&backupVolume.Status.LastSyncedAt) { + lastUpdatedBackupVolume = backupVolume + } + } + // Cannot use cached object from lister + return lastUpdatedBackupVolume, nil +} + // UpdateBackupVolume updates the given Longhorn backup volume in the cluster BackupVolume CR and verifies update func (s *DataStore) UpdateBackupVolume(backupVolume *longhorn.BackupVolume) (*longhorn.BackupVolume, error) { obj, err := s.lhClient.LonghornV1beta2().BackupVolumes(s.namespace).Update(context.TODO(), backupVolume, metav1.UpdateOptions{}) @@ -3489,7 +3529,25 @@ func (s *DataStore) CreateBackup(backup *longhorn.Backup, backupVolumeName strin // ListBackupsWithBackupVolumeName returns an object contains all backups in the cluster Backups CR // of the given backup volume name func (s *DataStore) ListBackupsWithBackupVolumeName(backupVolumeName string) (map[string]*longhorn.Backup, error) { - selector, err := getBackupVolumeSelector(backupVolumeName) + backupMap, err := s.ListBackups() + if err != nil { + return nil, err + } + + itemMap := map[string]*longhorn.Backup{} + for backupName, backup := range backupMap { + bvName := backup.Status.VolumeName + "-" + backup.Spec.BackupTargetName + if bvName == backupVolumeName { + itemMap[backupName] = backup.DeepCopy() + } + } + return itemMap, nil +} + +// ListBackupsWithVolumeName returns an object contains all backups in the cluster Backups CR +// of the given volume name +func (s *DataStore) ListBackupsWithVolumeName(volumeName string) (map[string]*longhorn.Backup, error) { + selector, err := getBackupVolumeSelector(volumeName) if err != nil { return nil, err } diff --git a/engineapi/backup_monitor.go b/engineapi/backup_monitor.go index 74b97b9ce1..fee25cd09e 100644 --- a/engineapi/backup_monitor.go +++ b/engineapi/backup_monitor.go @@ -72,6 +72,7 @@ func NewBackupMonitor(logger logrus.FieldLogger, ds *datastore.DataStore, backup ctx: ctx, quit: quit, } + logger.Warnf("[James_DBG] NewBackupMonitor backupTargetClient: %+v", backupTargetClient) // Call engine API snapshot backup if backup.Status.State == longhorn.BackupStateNew { @@ -84,6 +85,7 @@ func NewBackupMonitor(logger logrus.FieldLogger, ds *datastore.DataStore, backup if volumeRecurringJobInfo != "" { backup.Spec.Labels[types.VolumeRecurringJobInfoLabel] = volumeRecurringJobInfo } + logger.Warnf("[James_DBG] NewBackupMonitor backup.Status.State == longhorn.BackupStateNew backupTargetClient: %+v", backupTargetClient) _, replicaAddress, err := engineClientProxy.SnapshotBackup(engine, backup.Spec.SnapshotName, backup.Name, backupTargetClient.URL, volume.Spec.BackingImage, biChecksum, string(compressionMethod), concurrentLimit, storageClassName, backup.Spec.Labels, backupTargetClient.Credential) diff --git a/engineapi/proxy_backup.go b/engineapi/proxy_backup.go index 0066e3bc20..485ef7e8b6 100644 --- a/engineapi/proxy_backup.go +++ b/engineapi/proxy_backup.go @@ -38,6 +38,7 @@ func (p *Proxy) SnapshotBackup(e *longhorn.Engine, snapshotName, backupName, bac return "", "", err } + p.logger.Warnf("[James_DBG] proxy SnapshotBackup backupTarget: %+v", backupTarget) backupID, replicaAddress, err := p.grpcClient.SnapshotBackup(string(e.Spec.BackendStoreDriver), e.Name, e.Spec.VolumeName, p.DirectToURL(e), backupName, snapshotName, backupTarget, backingImageName, backingImageChecksum, compressionMethod, concurrentLimit, storageClassName, labels, credentialEnv, diff --git a/manager/engine.go b/manager/engine.go index 3f790b5af0..4814ccedeb 100644 --- a/manager/engine.go +++ b/manager/engine.go @@ -439,12 +439,15 @@ func (m *VolumeManager) ListAllBackupsSorted() ([]*longhorn.Backup, error) { return backups, nil } -func (m *VolumeManager) ListBackupsForVolume(volumeName string) (map[string]*longhorn.Backup, error) { - return m.ds.ListBackupsWithBackupVolumeName(volumeName) +func (m *VolumeManager) ListBackupsForVolume(volumeName string, isBVName bool) (map[string]*longhorn.Backup, error) { + if isBVName { + return m.ds.ListBackupsWithBackupVolumeName(volumeName) + } + return m.ds.ListBackupsWithVolumeName(volumeName) } -func (m *VolumeManager) ListBackupsForVolumeSorted(volumeName string) ([]*longhorn.Backup, error) { - backupMap, err := m.ListBackupsForVolume(volumeName) +func (m *VolumeManager) ListBackupsForVolumeSorted(volumeName string, isBVName bool) ([]*longhorn.Backup, error) { + backupMap, err := m.ListBackupsForVolume(volumeName, isBVName) if err != nil { return []*longhorn.Backup{}, err } diff --git a/manager/volume.go b/manager/volume.go index 8e471fe47c..7222047a21 100644 --- a/manager/volume.go +++ b/manager/volume.go @@ -13,6 +13,7 @@ import ( corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "github.com/longhorn/backupstore" "github.com/longhorn/longhorn-manager/datastore" "github.com/longhorn/longhorn-manager/engineapi" "github.com/longhorn/longhorn-manager/scheduler" @@ -445,8 +446,15 @@ func (m *VolumeManager) triggerBackupVolumeToSync(volume *longhorn.Volume) error if !isExist || backupVolumeName == "" { return errors.Errorf("cannot find the backup volume label for volume: %v", volume.Name) } - - backupVolume, err := m.ds.GetBackupVolume(backupVolumeName) + backupName, _, _, err := backupstore.DecodeBackupURL(volume.Spec.FromBackup) + if err != nil { + return errors.Wrapf(err, "failed to decode backup URL: %v", volume.Spec.FromBackup) + } + backup, err := m.ds.GetBackupRO(backupName) + if err != nil { + return errors.Wrapf(err, "failed to get backup: %v", backupName) + } + backupVolume, err := m.ds.GetBackupVolume(backupVolumeName + "-" + backup.Spec.BackupTargetName) if err != nil { return errors.Wrapf(err, "failed to get backup volume: %v", backupVolumeName) }