Skip to content

Commit

Permalink
temp commit
Browse files Browse the repository at this point in the history
Need to be splitted into previous commits.

Signed-off-by: James Lu <james.lu@suse.com>
  • Loading branch information
mantissahz committed Oct 24, 2023
1 parent c04564f commit ec2bf08
Show file tree
Hide file tree
Showing 12 changed files with 115 additions and 18 deletions.
23 changes: 20 additions & 3 deletions api/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ import (
"github.com/rancher/go-rancher/client"
)

const (
BackupTargetDefaultPollInterval = 300
)

func (s *Server) BackupTargetList(w http.ResponseWriter, req *http.Request) error {
apiContext := api.GetApiContext(req)

Expand Down Expand Up @@ -104,7 +108,7 @@ func (s *Server) BackupTargetUpdate(rw http.ResponseWriter, req *http.Request) e
func newBackupTarget(input BackupTarget) (*longhorn.BackupTargetSpec, error) {
pollInterval, err := strconv.ParseInt(input.PollInterval, 10, 64)
if err != nil {
return nil, err
pollInterval = BackupTargetDefaultPollInterval
}

return &longhorn.BackupTargetSpec{
Expand Down Expand Up @@ -164,12 +168,25 @@ func (s *Server) BackupVolumeDelete(w http.ResponseWriter, req *http.Request) er
return nil
}

func (s *Server) BackupList(w http.ResponseWriter, req *http.Request) error {
func (s *Server) BackupListByVolumeName(w http.ResponseWriter, req *http.Request) error {
apiContext := api.GetApiContext(req)

volName := mux.Vars(req)["volName"]

bs, err := s.m.ListBackupsForVolumeSorted(volName, false)
if err != nil {
return errors.Wrapf(err, "failed to list backups for volume '%s'", volName)
}
apiContext.Write(toBackupCollection(bs))
return nil
}

func (s *Server) BackupListByBVName(w http.ResponseWriter, req *http.Request) error {
apiContext := api.GetApiContext(req)

volName := mux.Vars(req)["volName"]

bs, err := s.m.ListBackupsForVolumeSorted(volName)
bs, err := s.m.ListBackupsForVolumeSorted(volName, true)
if err != nil {
return errors.Wrapf(err, "failed to list backups for volume '%s'", volName)
}
Expand Down
4 changes: 2 additions & 2 deletions api/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func NewRouter(s *Server) *mux.Router {
r.Methods("DELETE").Path("/v1/secrets/{name}").Handler(f(schemas, s.SecretDelete))
r.Methods("PUT").Path("/v1/secrets/{name}").Handler(f(schemas, s.SecretUpdate))

r.Methods("GET").Path("/v1/backuptargets").Handler(f(schemas, s.BackupTargetList))
r.Methods("GET").Path("/v1/backuptargets").Handler(f(schemas, s.fwd.Handler(s.fwd.HandleProxyRequestByNodeID, s.fwd.GetHTTPAddressByNodeID(NodeHasDefaultEngineImage(s.m)), s.BackupTargetList)))
r.Methods("GET").Path("/v1/backuptargets/{name}").Handler(f(schemas, s.BackupTargetGet))
r.Methods("POST").Path("/v1/backuptargets").Handler(f(schemas, s.BackupTargetCreate))
r.Methods("DELETE").Path("/v1/backuptargets/{name}").Handler(f(schemas, s.BackupTargetDelete))
Expand All @@ -133,7 +133,7 @@ func NewRouter(s *Server) *mux.Router {
r.Methods("GET").Path("/v1/backupvolumes/{volName}").Handler(f(schemas, s.fwd.Handler(s.fwd.HandleProxyRequestByNodeID, s.fwd.GetHTTPAddressByNodeID(NodeHasDefaultEngineImage(s.m)), s.BackupVolumeGet)))
r.Methods("DELETE").Path("/v1/backupvolumes/{volName}").Handler(f(schemas, s.fwd.Handler(s.fwd.HandleProxyRequestByNodeID, s.fwd.GetHTTPAddressByNodeID(NodeHasDefaultEngineImage(s.m)), s.BackupVolumeDelete)))
backupActions := map[string]func(http.ResponseWriter, *http.Request) error{
"backupList": s.fwd.Handler(s.fwd.HandleProxyRequestByNodeID, s.fwd.GetHTTPAddressByNodeID(NodeHasDefaultEngineImage(s.m)), s.BackupList),
"backupList": s.fwd.Handler(s.fwd.HandleProxyRequestByNodeID, s.fwd.GetHTTPAddressByNodeID(NodeHasDefaultEngineImage(s.m)), s.BackupListByBVName),
"backupGet": s.fwd.Handler(s.fwd.HandleProxyRequestByNodeID, s.fwd.GetHTTPAddressByNodeID(NodeHasDefaultEngineImage(s.m)), s.BackupGet),
"backupDelete": s.fwd.Handler(s.fwd.HandleProxyRequestByNodeID, s.fwd.GetHTTPAddressByNodeID(NodeHasDefaultEngineImage(s.m)), s.BackupDelete),
}
Expand Down
4 changes: 2 additions & 2 deletions api/volume.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func (s *Server) volumeList(apiContext *api.ApiContext) (*client.GenericCollecti
if err != nil {
return nil, err
}
backups, err := s.m.ListBackupsForVolumeSorted(v.Name)
backups, err := s.m.ListBackupsForVolumeSorted(v.Name, false)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -104,7 +104,7 @@ func (s *Server) responseWithVolume(rw http.ResponseWriter, req *http.Request, i
if err != nil {
return err
}
backups, err := s.m.ListBackupsForVolumeSorted(id)
backups, err := s.m.ListBackupsForVolumeSorted(id, false)
if err != nil {
return err
}
Expand Down
5 changes: 5 additions & 0 deletions controller/backup_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,7 @@ func (bc *BackupController) reconcile(backupName string) (err error) {
return
}
if backup.Status.State == longhorn.BackupStateCompleted && existingBackupState != backup.Status.State {
bc.logger.Warnf("[James_DBG] backup.Spec.BackupTargetName: %v", backup.Spec.BackupTargetName)
if err := bc.syncBackupVolume(volumeName, backup.Spec.BackupTargetName); err != nil {
log.Warnf("failed to sync Backup Volume: %v", volumeName)
return
Expand Down Expand Up @@ -630,10 +631,12 @@ func (bc *BackupController) checkMonitor(backup *longhorn.Backup, volume *longho
return nil, fmt.Errorf("waiting for attachment %v to be attached before enabling backup monitor", longhorn.GetAttachmentTicketID(longhorn.AttacherTypeBackupController, backup.Name))
}

bc.logger.Warnf("[James_DBG] bc backupTarget: %+v", backupTarget)
engineClientProxy, backupTargetClient, err := getBackupTarget(bc.controllerID, backupTarget, bc.ds, bc.logger, bc.proxyConnCounter)
if err != nil {
return nil, err
}
bc.logger.Warnf("[James_DBG] bc backupTargetClient: %+v", backupTargetClient)

// get storage class of the pvc binding with the volume
kubernetesStatus := &volume.Status.KubernetesStatus
Expand Down Expand Up @@ -713,7 +716,9 @@ func (bc *BackupController) syncWithMonitor(backup *longhorn.Backup, volume *lon
// to run reconcile immediately
func (bc *BackupController) syncBackupVolume(volumeName, backupTargetName string) error {
syncTime := metav1.Time{Time: time.Now().UTC()}
bc.logger.Warnf("[James_DBG] syncBackupVolume BV name : %v", volumeName+"-"+backupTargetName)
backupVolume, err := bc.ds.GetBackupVolume(volumeName + "-" + backupTargetName)
bc.logger.Warnf("[James_DBG] syncBackupVolume backupVolume: %v and err: %v", backupVolume, err)
if err == nil {
// Request backup_volume_controller to reconcile BackupVolume immediately.
backupVolume.Spec.SyncRequestedAt = syncTime
Expand Down
3 changes: 3 additions & 0 deletions controller/backup_target_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -483,6 +483,7 @@ func (btc *BackupTargetController) cleanUpAllMounts(backupTarget *longhorn.Backu

func (btc *BackupTargetController) syncBackupVolume(backupTarget *longhorn.BackupTarget, backupTargetClient *engineapi.BackupTargetClient, syncTime metav1.Time, log logrus.FieldLogger) error {
// Get a list of all the backup volumes that are stored in the backup target
btc.logger.Warnf("[James_DBG] btc controller syncBackupVolume backupTargetName: %v", backupTarget.Name)
res, err := backupTargetClient.BackupVolumeNameList(backupTargetClient.URL, backupTargetClient.Credential)
if err != nil {
backupTarget.Status.Available = false
Expand All @@ -494,6 +495,7 @@ func (btc *BackupTargetController) syncBackupVolume(backupTarget *longhorn.Backu
}
backupStoreBackupVolumes := sets.NewString(res...)

btc.logger.Warnf("[James_DBG] btc controller syncBackupVolume bv from engine: %+v", backupStoreBackupVolumes)
// Get a list of all the backup volumes that exist as custom resources in the cluster
clusterBackupVolumes, err := btc.ds.ListBackupVolumesWithBackupTargetName(backupTarget.Name)
if err != nil {
Expand Down Expand Up @@ -552,6 +554,7 @@ func (btc *BackupTargetController) syncBackupVolume(backupTarget *longhorn.Backu

// Update the BackupVolume CR spec.syncRequestAt to request the
// backup_volume_controller to reconcile the BackupVolume CR
btc.logger.Warnf("[James_DBG] btc controller syncBackupVolume bv from cluster will be updated: %+v", clusterBackupVolumes)
for backupVolumeName, backupVolume := range clusterBackupVolumes {
backupVolume.Spec.SyncRequestedAt = syncTime
if _, err = btc.ds.UpdateBackupVolume(backupVolume); err != nil && !apierrors.IsConflict(errors.Cause(err)) {
Expand Down
2 changes: 1 addition & 1 deletion controller/backup_volume_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ func (bvc *BackupVolumeController) reconcile(backupVolumeName string) (err error
// Examine DeletionTimestamp to determine if object is under deletion
if !backupVolume.DeletionTimestamp.IsZero() {

backupsOfVolume, err := bvc.ds.ListBackupsWithBackupVolumeName(backupVolume.Spec.VolumeName)
backupsOfVolume, err := bvc.ds.ListBackupsWithVolumeName(backupVolume.Spec.VolumeName)
if err != nil {
return errors.Wrap(err, "failed to get backups by volume name")
}
Expand Down
2 changes: 1 addition & 1 deletion controller/volume_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -4431,7 +4431,7 @@ func (c *VolumeController) ReconcileBackupVolumeState(volume *longhorn.Volume) e
backupVolumeName = volume.Name
}

bv, err := c.ds.GetBackupVolume(backupVolumeName)
bv, err := c.ds.GetLastUpdatedBackupVolumeWithVolumeName(backupVolumeName)
if err != nil && !apierrors.IsNotFound(err) {
return errors.Wrapf(err, "failed to get backup volume %s for volume %v", backupVolumeName, volume.Name)
}
Expand Down
64 changes: 61 additions & 3 deletions datastore/longhorn.go
Original file line number Diff line number Diff line change
Expand Up @@ -3359,8 +3359,28 @@ func (s *DataStore) ListBackupVolumes() (map[string]*longhorn.BackupVolume, erro

// ListBackupVolumesWithBackupTargetName returns an object contains all backup volumes in the cluster BackupVolumes CR
// of the given backup target name
func (s *DataStore) ListBackupVolumesWithBackupTargetName(backupTagetName string) (map[string]*longhorn.BackupVolume, error) {
selector, err := getBackupTargetSelector(backupTagetName)
func (s *DataStore) ListBackupVolumesWithBackupTargetName(backupTargetName string) (map[string]*longhorn.BackupVolume, error) {
selector, err := getBackupTargetSelector(backupTargetName)
if err != nil {
return nil, err
}

list, err := s.backupVolumeLister.BackupVolumes(s.namespace).List(selector)
if err != nil {
return nil, err
}

itemMap := map[string]*longhorn.BackupVolume{}
for _, itemRO := range list {
itemMap[itemRO.Name] = itemRO.DeepCopy()
}
return itemMap, nil
}

// ListBackupVolumesWithVolumeName returns an object contains all backup volumes in the cluster BackupVolumes CR
// of the given volume name
func (s *DataStore) ListBackupVolumesWithVolumeName(volumeName string) (map[string]*longhorn.BackupVolume, error) {
selector, err := getBackupVolumeSelector(volumeName)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -3404,6 +3424,26 @@ func (s *DataStore) GetBackupVolume(name string) (*longhorn.BackupVolume, error)
return resultRO.DeepCopy(), nil
}

// GetLastUpdatedBackupVolumeWithVolumeName returns a copy of last updated BackupVolume with the given volume name in the cluster
func (s *DataStore) GetLastUpdatedBackupVolumeWithVolumeName(volumeName string) (*longhorn.BackupVolume, error) {
backupVolumeMap, err := s.ListBackupVolumesWithVolumeName(volumeName)
if err != nil {
return nil, err
}
var lastUpdatedBackupVolume *longhorn.BackupVolume
for _, backupVolume := range backupVolumeMap {
if lastUpdatedBackupVolume == nil {
lastUpdatedBackupVolume = backupVolume
continue
}
if lastUpdatedBackupVolume.Status.LastSyncedAt.Before(&backupVolume.Status.LastSyncedAt) {
lastUpdatedBackupVolume = backupVolume
}
}
// Cannot use cached object from lister
return lastUpdatedBackupVolume, nil
}

// UpdateBackupVolume updates the given Longhorn backup volume in the cluster BackupVolume CR and verifies update
func (s *DataStore) UpdateBackupVolume(backupVolume *longhorn.BackupVolume) (*longhorn.BackupVolume, error) {
obj, err := s.lhClient.LonghornV1beta2().BackupVolumes(s.namespace).Update(context.TODO(), backupVolume, metav1.UpdateOptions{})
Expand Down Expand Up @@ -3482,7 +3522,25 @@ func (s *DataStore) CreateBackup(backup *longhorn.Backup, backupVolumeName strin
// ListBackupsWithBackupVolumeName returns an object contains all backups in the cluster Backups CR
// of the given backup volume name
func (s *DataStore) ListBackupsWithBackupVolumeName(backupVolumeName string) (map[string]*longhorn.Backup, error) {
selector, err := getBackupVolumeSelector(backupVolumeName)
backupMap, err := s.ListBackups()
if err != nil {
return nil, err
}

itemMap := map[string]*longhorn.Backup{}
for backupName, backup := range backupMap {
bvName := backup.Status.VolumeName + "-" + backup.Spec.BackupTargetName
if bvName == backupVolumeName {
itemMap[backupName] = backup.DeepCopy()
}
}
return itemMap, nil
}

// ListBackupsWithVolumeName returns an object contains all backups in the cluster Backups CR
// of the given volume name
func (s *DataStore) ListBackupsWithVolumeName(volumeName string) (map[string]*longhorn.Backup, error) {
selector, err := getBackupVolumeSelector(volumeName)
if err != nil {
return nil, err
}
Expand Down
2 changes: 2 additions & 0 deletions engineapi/backup_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ func NewBackupMonitor(logger logrus.FieldLogger, ds *datastore.DataStore, backup
ctx: ctx,
quit: quit,
}
logger.Warnf("[James_DBG] NewBackupMonitor backupTargetClient: %+v", backupTargetClient)

// Call engine API snapshot backup
if backup.Status.State == longhorn.BackupStateNew {
Expand All @@ -84,6 +85,7 @@ func NewBackupMonitor(logger logrus.FieldLogger, ds *datastore.DataStore, backup
if volumeRecurringJobInfo != "" {
backup.Spec.Labels[types.VolumeRecurringJobInfoLabel] = volumeRecurringJobInfo
}
logger.Warnf("[James_DBG] NewBackupMonitor backup.Status.State == longhorn.BackupStateNew backupTargetClient: %+v", backupTargetClient)
_, replicaAddress, err := engineClientProxy.SnapshotBackup(engine, backup.Spec.SnapshotName, backup.Name,
backupTargetClient.URL, volume.Spec.BackingImage, biChecksum, string(compressionMethod), concurrentLimit, storageClassName,
backup.Spec.Labels, backupTargetClient.Credential)
Expand Down
1 change: 1 addition & 0 deletions engineapi/proxy_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ func (p *Proxy) SnapshotBackup(e *longhorn.Engine, snapshotName, backupName, bac
return "", "", err
}

p.logger.Warnf("[James_DBG] proxy SnapshotBackup backupTarget: %+v", backupTarget)
backupID, replicaAddress, err := p.grpcClient.SnapshotBackup(string(e.Spec.BackendStoreDriver), e.Name,
e.Spec.VolumeName, p.DirectToURL(e), backupName, snapshotName, backupTarget, backingImageName,
backingImageChecksum, compressionMethod, concurrentLimit, storageClassName, labels, credentialEnv,
Expand Down
11 changes: 7 additions & 4 deletions manager/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -439,12 +439,15 @@ func (m *VolumeManager) ListAllBackupsSorted() ([]*longhorn.Backup, error) {
return backups, nil
}

func (m *VolumeManager) ListBackupsForVolume(volumeName string) (map[string]*longhorn.Backup, error) {
return m.ds.ListBackupsWithBackupVolumeName(volumeName)
func (m *VolumeManager) ListBackupsForVolume(volumeName string, isBVName bool) (map[string]*longhorn.Backup, error) {
if isBVName {
return m.ds.ListBackupsWithBackupVolumeName(volumeName)
}
return m.ds.ListBackupsWithVolumeName(volumeName)
}

func (m *VolumeManager) ListBackupsForVolumeSorted(volumeName string) ([]*longhorn.Backup, error) {
backupMap, err := m.ListBackupsForVolume(volumeName)
func (m *VolumeManager) ListBackupsForVolumeSorted(volumeName string, isBVName bool) ([]*longhorn.Backup, error) {
backupMap, err := m.ListBackupsForVolume(volumeName, isBVName)
if err != nil {
return []*longhorn.Backup{}, err
}
Expand Down
12 changes: 10 additions & 2 deletions manager/volume.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/longhorn/backupstore"
"github.com/longhorn/longhorn-manager/datastore"
"github.com/longhorn/longhorn-manager/engineapi"
"github.com/longhorn/longhorn-manager/scheduler"
Expand Down Expand Up @@ -445,8 +446,15 @@ func (m *VolumeManager) triggerBackupVolumeToSync(volume *longhorn.Volume) error
if !isExist || backupVolumeName == "" {
return errors.Errorf("cannot find the backup volume label for volume: %v", volume.Name)
}

backupVolume, err := m.ds.GetBackupVolume(backupVolumeName)
backupName, _, _, err := backupstore.DecodeBackupURL(volume.Spec.FromBackup)
if err != nil {
return errors.Wrapf(err, "failed to decode backup URL: %v", volume.Spec.FromBackup)
}
backup, err := m.ds.GetBackupRO(backupName)
if err != nil {
return errors.Wrapf(err, "failed to get backup: %v", backupName)
}
backupVolume, err := m.ds.GetBackupVolume(backupVolumeName + "-" + backup.Spec.BackupTargetName)
if err != nil {
return errors.Wrapf(err, "failed to get backup volume: %v", backupVolumeName)
}
Expand Down

0 comments on commit ec2bf08

Please sign in to comment.