Skip to content

Commit

Permalink
Add Volume's snapshot data mover result check.
Browse files Browse the repository at this point in the history
1. Add warning log for snapshot data mover fell backup to Velero native snapshot.
2. Add volume backing up result check for snapshot data mover scenario.

Signed-off-by: Xun Jiang <jxun@vmware.com>
  • Loading branch information
Xun Jiang committed Aug 4, 2023
1 parent a6d79fc commit 60d82a3
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 0 deletions.
1 change: 1 addition & 0 deletions changelogs/unreleased/6602-blackpiglet
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add Volume's snapshot data mover result check.
5 changes: 5 additions & 0 deletions pkg/backup/item_backupper.go
Original file line number Diff line number Diff line change
Expand Up @@ -525,6 +525,11 @@ func (ib *itemBackupper) takePVSnapshot(obj runtime.Unstructured, log logrus.Fie
// After that, this warning can be removed.
if boolptr.IsSetToTrue(ib.backupRequest.Spec.SnapshotMoveData) {
log.Warnf("VolumeSnapshotter plugin doesn't support data movement.")

if features.IsEnabled(velerov1api.CSIFeatureFlag) && pv.Spec.CSI == nil {
log.Warn("Cannot use CSI data mover to handle PV, because PV doesn't contain CSI in spec.",
"Fall back to Velero native snapshot.")
}
}

if ib.backupRequest.ResPolicies != nil {
Expand Down
33 changes: 33 additions & 0 deletions pkg/restore/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -1326,6 +1326,10 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso
}

if pvc.Spec.VolumeName != "" {
if err := ctx.checkVolumeRestoreByDataMover(pvc); err != nil {
errs.Add(namespace, err)
}

// This used to only happen with PVB volumes, but now always remove this binding metadata
obj = resetVolumeBindingInfo(obj)

Expand Down Expand Up @@ -2206,3 +2210,32 @@ func (ctx *restoreContext) processUpdateResourcePolicy(fromCluster, fromClusterW
}
return warnings, errs
}

// isVolumeRestoreByDataMover checks whether the passed PVC is restored by CSI snapshot data
// mover plugin, or it is restored by the fall-backed Velero native snapshot.
// If it's not restored by either of the mentioned method, return a error.
func (ctx *restoreContext) checkVolumeRestoreByDataMover(pvc *v1.PersistentVolumeClaim) error {
if boolptr.IsSetToTrue(ctx.backup.Spec.SnapshotMoveData) {
dataMoverOperationFound := false
for _, operation := range *ctx.itemOperationsList {
if operation.Spec.RestoreUID == string(pvc.UID) && strings.Contains(operation.Spec.OperationID, string(velerov1api.AsyncOperationIDPrefixDataDownload)) {
ctx.log.Debug("PVC has Async operation. It's handled by CSI snapshot data mover.")
return nil
}
}

if !dataMoverOperationFound {
for k, v := range ctx.restoredItems {
if strings.EqualFold(strings.ToLower(k.resource), "v1/persistentvolume") &&
k.name == pvc.Spec.VolumeName && v.action == itemRestoreResultCreated {
ctx.log.Debug("PVC's PV is restored by fall-backed Velero native snapshot.")
return nil
}
}
}

ctx.log.Errorf("PVC %s/%s is not handled by both snapshot data mover and Velero native snapshot", pvc.Namespace, pvc.Name)
return fmt.Errorf("pvc %s/%s is not restored by both snapshot data mover and Velero native snapshot", pvc.Namespace, pvc.Name)
}
return nil
}
66 changes: 66 additions & 0 deletions pkg/restore/restore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3630,3 +3630,69 @@ func TestIsAlreadyExistsError(t *testing.T) {
})
}
}

func TestCheckVolumeRestoreByDataMover(t *testing.T) {
tests := []struct {
name string
backup velerov1api.Backup
operation *itemoperation.RestoreOperation
restoredItems map[itemKey]restoredItemStatus
pvc *corev1api.PersistentVolumeClaim
wantErr bool
}{
{
name: "backup with snapshotMoveData set to false",
backup: *builder.ForBackup("velero", "backup").SnapshotMoveData(false).Result(),
operation: nil,
restoredItems: nil,
pvc: builder.ForPersistentVolumeClaim("velero", "pvc1").Result(),
wantErr: false,
},
{
name: "no operation and restoredItems",
backup: *builder.ForBackup("velero", "backup").SnapshotMoveData(true).Result(),
operation: nil,
restoredItems: nil,
pvc: builder.ForPersistentVolumeClaim("velero", "pvc1").Result(),
wantErr: true,
},
{
name: "has operation",
backup: *builder.ForBackup("velero", "backup").SnapshotMoveData(true).Result(),
operation: &itemoperation.RestoreOperation{Spec: itemoperation.RestoreOperationSpec{OperationID: "dd-abcdef", RestoreUID: "pvc-uid"}},
restoredItems: nil,
pvc: builder.ForPersistentVolumeClaim("velero", "pvc1").ObjectMeta(builder.WithUID("pvc-uid")).Result(),
wantErr: false,
},
{
name: "has restoredItems",
backup: *builder.ForBackup("velero", "backup").SnapshotMoveData(true).Result(),
operation: nil,
restoredItems: map[itemKey]restoredItemStatus{{resource: "v1/PersistentVolume", name: "pv-name"}: {action: itemRestoreResultCreated}},
pvc: builder.ForPersistentVolumeClaim("velero", "pvc1").VolumeName("pv-name").ObjectMeta(builder.WithUID("pvc-uid")).Result(),
wantErr: false,
},
}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
h := newHarness(t)

ctx := &restoreContext{
log: h.log,
backup: &tc.backup,
restoredItems: tc.restoredItems,
}

list := []*itemoperation.RestoreOperation{}
if tc.operation != nil {
list = append(list, tc.operation)
}
ctx.itemOperationsList = &list

if tc.wantErr {
require.Error(t, ctx.checkVolumeRestoreByDataMover(tc.pvc))
}
})
}
}

0 comments on commit 60d82a3

Please sign in to comment.