Skip to content

Commit

Permalink
fix: do not panic when looking up BackupVolume of non-existing Volume
Browse files Browse the repository at this point in the history
1. One Volume can have multiple BackupVolumes, need to check all BackupVolumes when finding backups
2. It is totally valid to look up BackupVolumes of non-existing Volume. A deleted Volume can still
   have Backups in the cluster. Do not panic

Signed-off-by: Phan Le <phan.le@suse.com>
  • Loading branch information
PhanLe1010 committed Feb 3, 2025
1 parent 2820078 commit c1375e4
Showing 1 changed file with 43 additions and 36 deletions.
79 changes: 43 additions & 36 deletions csi/controller_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,16 +127,24 @@ func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
return nil, status.Errorf(codes.NotFound, "volume source snapshot %v is not found", snapshot.SnapshotId)
}
backupVolume, backupName := sourceVolumeName, id
bv, err := cs.getBackupVolume(backupVolume)
bvs, err := cs.getBackupVolumes(backupVolume)
if err != nil {
return nil, status.Errorf(codes.Internal, "failed to retrieve backup volume %s: %v", backupVolume, err)
}
if bv == nil {
if len(bvs) == 0 {
return nil, status.Errorf(codes.NotFound, "failed to restore CSI snapshot %s backup volume %s unavailable", snapshot.SnapshotId, backupVolume)
}

backup, err := cs.apiClient.BackupVolume.ActionBackupGet(bv, &longhornclient.BackupInput{Name: backupName})
if err != nil {
var backup *longhornclient.Backup
for _, bv := range bvs {
backup, err = cs.apiClient.BackupVolume.ActionBackupGet(bv, &longhornclient.BackupInput{Name: backupName})
if err != nil {
return nil, status.Errorf(codes.NotFound, "failed to restore CSI snapshot %v backup %s unavailable", snapshot.SnapshotId, backupName)
}
if backup != nil {
break
}
}
if backup == nil {
return nil, status.Errorf(codes.NotFound, "failed to restore CSI snapshot %v backup %s unavailable", snapshot.SnapshotId, backupName)
}

Expand Down Expand Up @@ -268,7 +276,8 @@ func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
}, nil
}

func (cs *ControllerServer) getBackupVolume(volumeName string) (*longhornclient.BackupVolume, error) {
func (cs *ControllerServer) getBackupVolumes(volumeName string) ([]*longhornclient.BackupVolume, error) {
bvs := []*longhornclient.BackupVolume{}
log := cs.log.WithFields(logrus.Fields{"function": "getBackupVolume"})
list, err := cs.apiClient.BackupVolume.List(&longhornclient.ListOpts{})
if err != nil {
Expand All @@ -277,20 +286,17 @@ func (cs *ControllerServer) getBackupVolume(volumeName string) (*longhornclient.
if len(list.Data) == 0 {
return nil, nil
}

vol, err := cs.apiClient.Volume.ById(volumeName)
if err != nil {
return nil, errors.Wrapf(err, "getBackupVolume: fail to get source volume %v", volumeName)
}

for _, bv := range list.Data {
if bv.BackupTargetName == vol.BackupTargetName && bv.VolumeName == volumeName {
return &bv, nil
if bv.VolumeName == volumeName {
bvs = append(bvs, &bv)
}
}
log.Debugf("cannot find backup volume with backup target %s and volume %s", vol.BackupTargetName, volumeName)
// return no error if no backup volume found as backup is not completed and backup volume is not created yet.
return nil, nil
if len(bvs) == 0 {
log.Debugf("cannot find backup volume for volume %s", volumeName)
// return no error if no backup volume found as backup is not completed and backup volume is not created yet.
return nil, nil
}
return bvs, nil
}

func (cs *ControllerServer) checkAndPrepareBackingImage(volumeName, backingImageName string, volumeParameters map[string]string, dataEngine string) error {
Expand Down Expand Up @@ -1071,17 +1077,20 @@ func (cs *ControllerServer) cleanupSnapshot(sourceVolumeName, id string) error {

func (cs *ControllerServer) cleanupBackupVolume(sourceVolumeName, id string) error {
backupVolumeName, backupName := sourceVolumeName, id
backupVolume, err := cs.getBackupVolume(backupVolumeName)
backupVolumes, err := cs.getBackupVolumes(backupVolumeName)
if err != nil {
return err
}
if backupVolume != nil && backupVolume.Name != "" {
if _, err = cs.apiClient.BackupVolume.ActionBackupDelete(backupVolume, &longhornclient.BackupInput{
Name: backupName,
}); err != nil {
return err
for _, bv := range backupVolumes {
if bv.Name != "" {
if _, err = cs.apiClient.BackupVolume.ActionBackupDelete(bv, &longhornclient.BackupInput{
Name: backupName,
}); err != nil {
return err
}
}
}

return nil
}

Expand Down Expand Up @@ -1287,24 +1296,22 @@ func (cs *ControllerServer) waitForBackupControllerSync(volumeName, snapshotName
// (and in particular, its name) as quickly as possible and in any state.
func (cs *ControllerServer) getBackup(volumeName, snapshotName string) (*longhornclient.Backup, error) {
// Successfully returns an empty BackupVolume with volumeName even if one doesn't exist.
backupVolume, err := cs.getBackupVolume(volumeName)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
if backupVolume == nil {
return nil, nil
}

backupListOutput, err := cs.apiClient.BackupVolume.ActionBackupList(backupVolume)
backupVolumes, err := cs.getBackupVolumes(volumeName)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}

var backup *longhornclient.Backup
for _, b := range backupListOutput.Data {
if b.SnapshotName == snapshotName {
backup = &b
break
for _, bv := range backupVolumes {
backupListOutput, err := cs.apiClient.BackupVolume.ActionBackupList(bv)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
for _, b := range backupListOutput.Data {
if b.SnapshotName == snapshotName {
backup = &b
return backup, nil
}
}
}

Expand Down

0 comments on commit c1375e4

Please sign in to comment.