From 5485616abfe6499f9646b3950fde9ad2b523fb59 Mon Sep 17 00:00:00 2001 From: Ming Qiu Date: Fri, 4 Aug 2023 18:03:01 +0800 Subject: [PATCH] Fix data mover bugs #6550 #6563 #6600 Signed-off-by: Ming Qiu --- pkg/builder/data_upload_builder.go | 6 + pkg/cmd/cli/nodeagent/server.go | 24 +- pkg/cmd/server/server.go | 23 +- pkg/controller/data_download_controller.go | 157 ++++++++++- .../data_download_controller_test.go | 247 ++++++++++++++-- pkg/controller/data_upload_controller.go | 235 +++++++++++++--- pkg/controller/data_upload_controller_test.go | 265 ++++++++++++++++-- 7 files changed, 842 insertions(+), 115 deletions(-) diff --git a/pkg/builder/data_upload_builder.go b/pkg/builder/data_upload_builder.go index cb5d0b2de3..a35924fe9d 100644 --- a/pkg/builder/data_upload_builder.go +++ b/pkg/builder/data_upload_builder.go @@ -119,3 +119,9 @@ func (d *DataUploadBuilder) StartTimestamp(startTime *metav1.Time) *DataUploadBu d.object.Status.StartTimestamp = startTime return d } + +// Labels sets the DataUpload's Labels. 
+func (d *DataUploadBuilder) Labels(labels map[string]string) *DataUploadBuilder { + d.object.Labels = labels + return d +} diff --git a/pkg/cmd/cli/nodeagent/server.go b/pkg/cmd/cli/nodeagent/server.go index a7b3e053d0..2fa52f8917 100644 --- a/pkg/cmd/cli/nodeagent/server.go +++ b/pkg/cmd/cli/nodeagent/server.go @@ -368,11 +368,14 @@ func (s *nodeAgentServer) markDataUploadsCancel(r *controller.DataUploadReconcil if du.Status.Phase == velerov2alpha1api.DataUploadPhaseAccepted || du.Status.Phase == velerov2alpha1api.DataUploadPhasePrepared || du.Status.Phase == velerov2alpha1api.DataUploadPhaseInProgress { - updated := du.DeepCopy() - updated.Spec.Cancel = true - updated.Status.Message = fmt.Sprintf("found a dataupload with status %q during the node-agent starting, mark it as cancel", du.Status.Phase) - if err := client.Patch(s.ctx, updated, ctrlclient.MergeFrom(&du)); err != nil { - s.logger.WithError(errors.WithStack(err)).Errorf("failed to mark datadownload %q cancel", du.GetName()) + err = controller.UpdateDataUploadWithRetry(s.ctx, client, types.NamespacedName{Namespace: du.Namespace, Name: du.Name}, s.logger.WithField("dataupload", du.Name), + func(dataUpload *velerov2alpha1api.DataUpload) { + dataUpload.Spec.Cancel = true + dataUpload.Status.Message = fmt.Sprintf("found a dataupload with status %q during the node-agent starting, mark it as cancel", du.Status.Phase) + }) + + if err != nil { + s.logger.WithError(errors.WithStack(err)).Errorf("failed to mark dataupload %q cancel", du.GetName()) continue } s.logger.WithField("dataupload", du.GetName()).Warn(du.Status.Message) @@ -396,10 +399,13 @@ func (s *nodeAgentServer) markDataDownloadsCancel(r *controller.DataDownloadReco if dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseAccepted || dd.Status.Phase == velerov2alpha1api.DataDownloadPhasePrepared || dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseInProgress { - updated := dd.DeepCopy() - updated.Spec.Cancel = true - updated.Status.Message = 
fmt.Sprintf("found a datadownload with status %q during the node-agent starting, mark it as cancel", dd.Status.Phase) - if err := client.Patch(s.ctx, updated, ctrlclient.MergeFrom(dd)); err != nil { + err = controller.UpdateDataDownloadWithRetry(s.ctx, client, types.NamespacedName{Namespace: dd.Namespace, Name: dd.Name}, s.logger.WithField("datadownload", dd.Name), + func(dataDownload *velerov2alpha1api.DataDownload) { + dataDownload.Spec.Cancel = true + dataDownload.Status.Message = fmt.Sprintf("found a datadownload with status %q during the node-agent starting, mark it as cancel", dd.Status.Phase) + }) + + if err != nil { s.logger.WithError(errors.WithStack(err)).Errorf("failed to mark datadownload %q cancel", dd.GetName()) continue } diff --git a/pkg/cmd/server/server.go b/pkg/cmd/server/server.go index fcd3c3d501..20e14e3686 100644 --- a/pkg/cmd/server/server.go +++ b/pkg/cmd/server/server.go @@ -40,6 +40,7 @@ import ( apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" kubeerrs "k8s.io/apimachinery/pkg/util/errors" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" @@ -1107,10 +1108,13 @@ func markDataUploadsCancel(ctx context.Context, client ctrlclient.Client, backup if du.Status.Phase == velerov2alpha1api.DataUploadPhaseAccepted || du.Status.Phase == velerov2alpha1api.DataUploadPhasePrepared || du.Status.Phase == velerov2alpha1api.DataUploadPhaseInProgress { - updated := du.DeepCopy() - updated.Spec.Cancel = true - updated.Status.Message = fmt.Sprintf("found a dataupload with status %q during the velero server starting, mark it as cancel", du.Status.Phase) - if err := client.Patch(ctx, updated, ctrlclient.MergeFrom(&du)); err != nil { + err := controller.UpdateDataUploadWithRetry(ctx, client, types.NamespacedName{Namespace: du.Namespace, Name: du.Name}, log.WithField("dataupload", du.Name), + func(dataUpload 
*velerov2alpha1api.DataUpload) { + dataUpload.Spec.Cancel = true + dataUpload.Status.Message = fmt.Sprintf("found a dataupload with status %q during the velero server starting, mark it as cancel", du.Status.Phase) + }) + + if err != nil { log.WithError(errors.WithStack(err)).Errorf("failed to mark dataupload %q cancel", du.GetName()) continue } @@ -1132,10 +1136,13 @@ func markDataDownloadsCancel(ctx context.Context, client ctrlclient.Client, rest if dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseAccepted || dd.Status.Phase == velerov2alpha1api.DataDownloadPhasePrepared || dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseInProgress { - updated := dd.DeepCopy() - updated.Spec.Cancel = true - updated.Status.Message = fmt.Sprintf("found a datadownload with status %q during the velero server starting, mark it as cancel", dd.Status.Phase) - if err := client.Patch(ctx, updated, ctrlclient.MergeFrom(&dd)); err != nil { + err := controller.UpdateDataDownloadWithRetry(ctx, client, types.NamespacedName{Namespace: dd.Namespace, Name: dd.Name}, log.WithField("datadownload", dd.Name), + func(dataDownload *velerov2alpha1api.DataDownload) { + dataDownload.Spec.Cancel = true + dataDownload.Status.Message = fmt.Sprintf("found a datadownload with status %q during the velero server starting, mark it as cancel", dd.Status.Phase) + }) + + if err != nil { log.WithError(errors.WithStack(err)).Errorf("failed to mark dataupload %q cancel", dd.GetName()) continue } diff --git a/pkg/controller/data_download_controller.go b/pkg/controller/data_download_controller.go index a5947444e9..cffeb3e053 100644 --- a/pkg/controller/data_download_controller.go +++ b/pkg/controller/data_download_controller.go @@ -27,11 +27,13 @@ import ( apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/kubernetes" "k8s.io/utils/clock" ctrl "sigs.k8s.io/controller-runtime" 
"sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/predicate" "sigs.k8s.io/controller-runtime/pkg/reconcile" @@ -118,6 +120,33 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request return r.errorOut(ctx, dd, errors.New("uninitialized generic exposer"), "uninitialized exposer", log) } + // Add finalizer + // Logic for clear resources when datadownload been deleted + if dd.DeletionTimestamp.IsZero() { // add finalizer for all cr at beginning + if !isDataDownloadInFinalState(dd) && !controllerutil.ContainsFinalizer(dd, dataUploadDownloadFinalizer) { + succeeded, err := r.exclusiveUpdateDataDownload(ctx, dd, func(dd *velerov2alpha1api.DataDownload) { + controllerutil.AddFinalizer(dd, dataUploadDownloadFinalizer) + }) + if err != nil { + log.Errorf("failed to add finalizer with error %s for %s/%s", err.Error(), dd.Namespace, dd.Name) + return ctrl.Result{}, err + } else if !succeeded { + log.Warnf("failed to add finilizer for %s/%s and will requeue later", dd.Namespace, dd.Name) + return ctrl.Result{Requeue: true}, nil + } + } + } else if controllerutil.ContainsFinalizer(dd, dataUploadDownloadFinalizer) && !dd.Spec.Cancel && !isDataDownloadInFinalState(dd) { + // when delete cr we need to clear up internal resources created by Velero, here we use the cancel mechanism + // to help clear up resources instead of clear them directly in case of some conflict with Expose action + if err := UpdateDataDownloadWithRetry(ctx, r.client, req.NamespacedName, log, func(dataDownload *velerov2alpha1api.DataDownload) { + dataDownload.Spec.Cancel = true + dataDownload.Status.Message = fmt.Sprintf("found a dataupload %s/%s is being deleted, mark it as cancel", dd.Namespace, dd.Name) + }); err != nil { + log.Errorf("failed to set cancel flag with error %s for %s/%s", err.Error(), 
dd.Namespace, dd.Name) + return ctrl.Result{}, err + } + } + if dd.Status.Phase == "" || dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseNew { log.Info("Data download starting") @@ -150,15 +179,44 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request // And then only the controller who is in the same node could do the rest work. err = r.restoreExposer.Expose(ctx, getDataDownloadOwnerObject(dd), dd.Spec.TargetVolume.PVC, dd.Spec.TargetVolume.Namespace, hostingPodLabels, dd.Spec.OperationTimeout.Duration) if err != nil { - return r.errorOut(ctx, dd, err, "error to start restore expose", log) + if err := r.client.Get(ctx, req.NamespacedName, dd); err != nil { + if !apierrors.IsNotFound(err) { + return ctrl.Result{}, errors.Wrap(err, "getting DataUpload") + } + } + if isDataDownloadInFinalState(dd) { + log.Warnf("expose snapshot with err %v but it may caused by clean up resources in cancel action", err) + r.restoreExposer.CleanUp(ctx, getDataDownloadOwnerObject(dd)) + return ctrl.Result{}, nil + } else { + return r.errorOut(ctx, dd, err, "error to expose snapshot", log) + } } + log.Info("Restore is exposed") + // we need to get CR again for it may canceled by datadownload controller on other + // nodes when doing expose action, if detectd cancel action we need to clear up the internal + // resources created by velero during backup. 
+ if err := r.client.Get(ctx, req.NamespacedName, dd); err != nil { + if apierrors.IsNotFound(err) { + log.Debug("Unable to find datadownload") + return ctrl.Result{}, nil + } + return ctrl.Result{}, errors.Wrap(err, "getting datadownload") + } + + // we need to clean up resources as resources created in Expose it may later than cancel action or prepare time + // and need to clean up resources again + if isDataDownloadInFinalState(dd) { + r.restoreExposer.CleanUp(ctx, getDataDownloadOwnerObject(dd)) + } + return ctrl.Result{}, nil } else if dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseAccepted { if dd.Spec.Cancel { log.Debugf("Data download is been canceled %s in Phase %s", dd.GetName(), dd.Status.Phase) - r.OnDataDownloadCancelled(ctx, dd.GetNamespace(), dd.GetName()) + r.TryCancelDataDownload(ctx, dd) } else if dd.Status.StartTimestamp != nil { if time.Since(dd.Status.StartTimestamp.Time) >= r.preparingTimeout { r.onPrepareTimeout(ctx, dd) @@ -249,7 +307,15 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request return ctrl.Result{}, nil } else { - log.Debugf("Data download now is in %s phase and do nothing by current %s controller", dd.Status.Phase, r.nodeName) + // put the finilizer remove action here for all cr will goes to the final status, we could check finalizer and do remove action in final status + // instead of intermediate state + if isDataDownloadInFinalState(dd) && !dd.DeletionTimestamp.IsZero() && controllerutil.ContainsFinalizer(dd, dataUploadDownloadFinalizer) { + original := dd.DeepCopy() + controllerutil.RemoveFinalizer(dd, dataUploadDownloadFinalizer) + if err := r.client.Patch(ctx, dd, client.MergeFrom(original)); err != nil { + log.WithError(err).Error("error to remove finalizer") + } + } return ctrl.Result{}, nil } } @@ -353,6 +419,32 @@ func (r *DataDownloadReconciler) OnDataDownloadCancelled(ctx context.Context, na } } +func (r *DataDownloadReconciler) TryCancelDataDownload(ctx context.Context, dd 
*velerov2alpha1api.DataDownload) { + log := r.logger.WithField("datadownload", dd.Name) + log.Warn("Async fs backup data path canceled") + + succeeded, err := r.exclusiveUpdateDataDownload(ctx, dd, func(dataDownload *velerov2alpha1api.DataDownload) { + dataDownload.Status.Phase = velerov2alpha1api.DataDownloadPhaseCanceled + if dataDownload.Status.StartTimestamp.IsZero() { + dataDownload.Status.StartTimestamp = &metav1.Time{Time: r.Clock.Now()} + } + dataDownload.Status.CompletionTimestamp = &metav1.Time{Time: r.Clock.Now()} + }) + + if err != nil { + log.WithError(err).Error("error updating datadownload status") + return + } else if !succeeded { + log.Warn("conflict in updating datadownload status and will try it again later") + return + } + + // success update + r.metrics.RegisterDataDownloadCancel(r.nodeName) + r.restoreExposer.CleanUp(ctx, getDataDownloadOwnerObject(dd)) + r.closeDataPath(ctx, dd.Name) +} + func (r *DataDownloadReconciler) OnDataDownloadProgress(ctx context.Context, namespace string, ddName string, progress *uploader.Progress) { log := r.logger.WithField("datadownload", ddName) @@ -515,16 +607,28 @@ func (r *DataDownloadReconciler) acceptDataDownload(ctx context.Context, dd *vel // For all data download controller in each node-agent will try to update download CR, and only one controller will success, // and the success one could handle later logic - succeeded, err := r.exclusiveUpdateDataDownload(ctx, dd, func(dd *velerov2alpha1api.DataDownload) { - dd.Status.Phase = velerov2alpha1api.DataDownloadPhaseAccepted - dd.Status.StartTimestamp = &metav1.Time{Time: r.Clock.Now()} - }) + + updated := dd.DeepCopy() + + updateFunc := func(datadownload *velerov2alpha1api.DataDownload) { + datadownload.Status.Phase = velerov2alpha1api.DataDownloadPhaseAccepted + datadownload.Status.StartTimestamp = &metav1.Time{Time: r.Clock.Now()} + labels := datadownload.GetLabels() + if labels == nil { + labels = make(map[string]string) + } + labels[acceptNodeLabelKey] 
= r.nodeName + datadownload.SetLabels(labels) + } + + succeeded, err := r.exclusiveUpdateDataDownload(ctx, updated, updateFunc) if err != nil { return false, err } if succeeded { + updateFunc(dd) // If update success, it's need to update du values in memory r.logger.WithField("DataDownload", dd.Name).Infof("This datadownload has been accepted by %s", r.nodeName) return true, nil } @@ -537,7 +641,6 @@ func (r *DataDownloadReconciler) onPrepareTimeout(ctx context.Context, dd *veler log := r.logger.WithField("DataDownload", dd.Name) log.Info("Timeout happened for preparing datadownload") - succeeded, err := r.exclusiveUpdateDataDownload(ctx, dd, func(dd *velerov2alpha1api.DataDownload) { dd.Status.Phase = velerov2alpha1api.DataDownloadPhaseFailed dd.Status.Message = "timeout on preparing data download" @@ -562,13 +665,15 @@ func (r *DataDownloadReconciler) onPrepareTimeout(ctx context.Context, dd *veler func (r *DataDownloadReconciler) exclusiveUpdateDataDownload(ctx context.Context, dd *velerov2alpha1api.DataDownload, updateFunc func(*velerov2alpha1api.DataDownload)) (bool, error) { - updated := dd.DeepCopy() - updateFunc(updated) + updateFunc(dd) + + err := r.client.Update(ctx, dd) - err := r.client.Update(ctx, updated) if err == nil { return true, nil - } else if apierrors.IsConflict(err) { + } + // it won't rollback dd in memory when error + if apierrors.IsConflict(err) { return false, nil } else { return false, err @@ -614,3 +719,31 @@ func findDataDownloadByPod(client client.Client, pod v1.Pod) (*velerov2alpha1api return nil, nil } + +func isDataDownloadInFinalState(dd *velerov2alpha1api.DataDownload) bool { + return dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseFailed || + dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseCanceled || + dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseCompleted +} + +func UpdateDataDownloadWithRetry(ctx context.Context, client client.Client, namespacedName types.NamespacedName, log *logrus.Entry, updateFunc 
func(dataDownload *velerov2alpha1api.DataDownload)) error { + return wait.PollUntilWithContext(ctx, time.Second, func(ctx context.Context) (done bool, err error) { + dd := &velerov2alpha1api.DataDownload{} + if err := client.Get(ctx, namespacedName, dd); err != nil { + return false, errors.Wrap(err, "getting DataDownload") + } + + updateFunc(dd) + updateErr := client.Update(ctx, dd) + if updateErr != nil { + if apierrors.IsConflict(updateErr) { + log.Warnf("failed to update datadownload for %s/%s and will retry it", dd.Namespace, dd.Name) + return false, nil + } + log.Errorf("failed to update datadownload with error %s for %s/%s", updateErr.Error(), dd.Namespace, dd.Name) + return false, err + } + + return true, nil + }) +} diff --git a/pkg/controller/data_download_controller_test.go b/pkg/controller/data_download_controller_test.go index 726d9b6a03..1ef1b9e2d5 100644 --- a/pkg/controller/data_download_controller_test.go +++ b/pkg/controller/data_download_controller_test.go @@ -36,6 +36,7 @@ import ( clientgofake "k8s.io/client-go/kubernetes/fake" ctrl "sigs.k8s.io/controller-runtime" kbclient "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/reconcile" "sigs.k8s.io/controller-runtime/pkg/client/fake" @@ -68,25 +69,20 @@ func dataDownloadBuilder() *builder.DataDownloadBuilder { } func initDataDownloadReconciler(objects []runtime.Object, needError ...bool) (*DataDownloadReconciler, error) { - var errs []error = make([]error, 4) - if len(needError) == 4 { - if needError[0] { + var errs []error = make([]error, 5) + for k, isError := range needError { + if k == 0 && isError { errs[0] = fmt.Errorf("Get error") - } - - if needError[1] { + } else if k == 1 && isError { errs[1] = fmt.Errorf("Create error") - } - - if needError[2] { + } else if k == 2 && isError { errs[2] = fmt.Errorf("Update error") - } - - if needError[3] { + } else if k == 3 && isError { errs[3] = 
fmt.Errorf("Patch error") + } else if k == 4 && isError { + errs[4] = apierrors.NewConflict(velerov2alpha1api.Resource("datadownload"), dataDownloadName, errors.New("conflict")) } } - return initDataDownloadReconcilerWithError(objects, errs...) } @@ -109,12 +105,20 @@ func initDataDownloadReconcilerWithError(objects []runtime.Object, needError ... Client: fake.NewClientBuilder().WithScheme(scheme).Build(), } - if len(needError) == 4 { - fakeClient.getError = needError[0] - fakeClient.createError = needError[1] - fakeClient.updateError = needError[2] - fakeClient.patchError = needError[3] + for k := range needError { + if k == 0 { + fakeClient.getError = needError[0] + } else if k == 1 { + fakeClient.createError = needError[1] + } else if k == 2 { + fakeClient.updateError = needError[2] + } else if k == 3 { + fakeClient.patchError = needError[3] + } else if k == 4 { + fakeClient.updateConflict = needError[4] + } } + var fakeKubeClient *clientgofake.Clientset if len(objects) != 0 { fakeKubeClient = clientgofake.NewSimpleClientset(objects...) 
@@ -172,6 +176,7 @@ func TestDataDownloadReconcile(t *testing.T) { mockClose bool expected *velerov2alpha1api.DataDownload expectedStatusMsg string + checkFunc func(du velerov2alpha1api.DataDownload) bool expectedResult *ctrl.Result }{ { @@ -290,6 +295,31 @@ func TestDataDownloadReconcile(t *testing.T) { dd: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseAccepted).StartTimestamp(&metav1.Time{Time: time.Now().Add(-time.Minute * 5)}).Result(), expected: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseFailed).Result(), }, + { + name: "dataDownload with enabled cancel", + dd: func() *velerov2alpha1api.DataDownload { + dd := dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseAccepted).Result() + controllerutil.AddFinalizer(dd, dataUploadDownloadFinalizer) + dd.DeletionTimestamp = &metav1.Time{Time: time.Now()} + return dd + }(), + checkFunc: func(du velerov2alpha1api.DataDownload) bool { + return du.Spec.Cancel + }, + expected: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseAccepted).Result(), + }, + { + name: "dataDownload with remove finalizer and should not be retrieved", + dd: func() *velerov2alpha1api.DataDownload { + dd := dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseFailed).Cancel(true).Result() + controllerutil.AddFinalizer(dd, dataUploadDownloadFinalizer) + dd.DeletionTimestamp = &metav1.Time{Time: time.Now()} + return dd + }(), + checkFunc: func(dd velerov2alpha1api.DataDownload) bool { + return !controllerutil.ContainsFinalizer(&dd, dataUploadDownloadFinalizer) + }, + }, } for _, test := range tests { @@ -398,7 +428,11 @@ func TestDataDownloadReconcile(t *testing.T) { assert.Contains(t, dd.Status.Message, test.expectedStatusMsg) } if test.dd.Namespace == velerov1api.DefaultNamespace { - require.Nil(t, err) + if controllerutil.ContainsFinalizer(test.dd, dataUploadDownloadFinalizer) { + assert.True(t, true, apierrors.IsNotFound(err)) + } else { + require.Nil(t, err) + } } else { 
assert.True(t, true, apierrors.IsNotFound(err)) } @@ -727,3 +761,178 @@ func TestOnDdPrepareTimeout(t *testing.T) { assert.Equal(t, test.expected.Status.Phase, dd.Status.Phase) } } + +func TestTryCancelDataDownload(t *testing.T) { + tests := []struct { + name string + dd *velerov2alpha1api.DataDownload + needErrs []error + succeeded bool + expectedErr string + }{ + { + name: "update fail", + dd: dataDownloadBuilder().Result(), + needErrs: []error{nil, nil, fmt.Errorf("fake-update-error"), nil}, + }, + { + name: "cancel by others", + dd: dataDownloadBuilder().Result(), + needErrs: []error{nil, nil, &fakeAPIStatus{metav1.StatusReasonConflict}, nil}, + }, + { + name: "succeed", + dd: dataDownloadBuilder().Result(), + needErrs: []error{nil, nil, nil, nil}, + succeeded: true, + }, + } + for _, test := range tests { + ctx := context.Background() + r, err := initDataDownloadReconcilerWithError(nil, test.needErrs...) + require.NoError(t, err) + + err = r.client.Create(ctx, test.dd) + require.NoError(t, err) + + r.TryCancelDataDownload(ctx, test.dd) + + if test.expectedErr == "" { + assert.NoError(t, err) + } else { + assert.EqualError(t, err, test.expectedErr) + } + } +} + +func TestUpdateDataDownloadWithRetry(t *testing.T) { + + namespacedName := types.NamespacedName{ + Name: dataDownloadName, + Namespace: "velero", + } + + // Define test cases + testCases := []struct { + Name string + needErrs []bool + ExpectErr bool + }{ + { + Name: "SuccessOnFirstAttempt", + needErrs: []bool{false, false, false, false}, + ExpectErr: false, + }, + { + Name: "Error get", + needErrs: []bool{true, false, false, false, false}, + ExpectErr: true, + }, + { + Name: "Error update", + needErrs: []bool{false, false, true, false, false}, + ExpectErr: true, + }, + { + Name: "Conflict with error timeout", + needErrs: []bool{false, false, false, false, true}, + ExpectErr: true, + }, + } + + for _, tc := range testCases { + t.Run(tc.Name, func(t *testing.T) { + ctx, cancelFunc := 
context.WithTimeout(context.TODO(), time.Second*5) + defer cancelFunc() + r, err := initDataDownloadReconciler(nil, tc.needErrs...) + require.NoError(t, err) + err = r.client.Create(ctx, dataDownloadBuilder().Result()) + require.NoError(t, err) + updateFunc := func(dataDownload *velerov2alpha1api.DataDownload) { + dataDownload.Spec.Cancel = true + } + err = UpdateDataDownloadWithRetry(ctx, r.client, namespacedName, velerotest.NewLogger().WithField("name", tc.Name), updateFunc) + if tc.ExpectErr { + assert.Error(t, err) + } else { + assert.NoError(t, err) + } + }) + } +} + +func TestFindDataDownloads(t *testing.T) { + tests := []struct { + name string + pod corev1.Pod + du *velerov2alpha1api.DataDownload + expectedUploads []velerov2alpha1api.DataDownload + expectedError bool + }{ + // Test case 1: Pod with matching nodeName and DataDownload label + { + name: "MatchingPod", + pod: corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "velero", + Name: "pod-1", + Labels: map[string]string{ + velerov1api.DataDownloadLabel: dataDownloadName, + }, + }, + Spec: corev1.PodSpec{ + NodeName: "node-1", + }, + }, + du: dataDownloadBuilder().Result(), + expectedUploads: []velerov2alpha1api.DataDownload{ + { + ObjectMeta: metav1.ObjectMeta{ + Namespace: "velero", + Name: dataDownloadName, + }, + }, + }, + expectedError: false, + }, + // Test case 2: Pod with non-matching nodeName + { + name: "NonMatchingNodePod", + pod: corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "velero", + Name: "pod-2", + Labels: map[string]string{ + velerov1api.DataDownloadLabel: dataDownloadName, + }, + }, + Spec: corev1.PodSpec{ + NodeName: "node-2", + }, + }, + du: dataDownloadBuilder().Result(), + expectedUploads: []velerov2alpha1api.DataDownload{}, + expectedError: false, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + r, err := initDataDownloadReconcilerWithError(nil) + require.NoError(t, err) + r.nodeName = "node-1" + err = r.client.Create(ctx, 
test.du) + require.NoError(t, err) + err = r.client.Create(ctx, &test.pod) + require.NoError(t, err) + uploads, err := r.FindDataDownloads(context.Background(), r.client, "velero") + + if test.expectedError { + assert.Error(t, err) + } else { + assert.NoError(t, err) + assert.Equal(t, len(test.expectedUploads), len(uploads)) + } + }) + } +} diff --git a/pkg/controller/data_upload_controller.go b/pkg/controller/data_upload_controller.go index f52de9f9bb..b9297b4ea0 100644 --- a/pkg/controller/data_upload_controller.go +++ b/pkg/controller/data_upload_controller.go @@ -27,11 +27,13 @@ import ( apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/kubernetes" clocks "k8s.io/utils/clock" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/predicate" "sigs.k8s.io/controller-runtime/pkg/reconcile" @@ -54,8 +56,10 @@ import ( ) const ( - dataUploadDownloadRequestor string = "snapshot-data-upload-download" - preparingMonitorFrequency time.Duration = time.Minute + dataUploadDownloadRequestor = "snapshot-data-upload-download" + acceptNodeLabelKey = "velero.io/accepted-by" + dataUploadDownloadFinalizer = "velero.io/data-upload-download-finalizer" + preparingMonitorFrequency = time.Minute ) // DataUploadReconciler reconciles a DataUpload object @@ -107,8 +111,8 @@ func (r *DataUploadReconciler) Reconcile(ctx context.Context, req ctrl.Request) "dataupload": req.NamespacedName, }) log.Infof("Reconcile %s", req.Name) - var du velerov2alpha1api.DataUpload - if err := r.client.Get(ctx, req.NamespacedName, &du); err != nil { + du := &velerov2alpha1api.DataUpload{} + if err := r.client.Get(ctx, req.NamespacedName, du); err != nil { if 
apierrors.IsNotFound(err) { log.Debug("Unable to find DataUpload") return ctrl.Result{}, nil @@ -123,15 +127,42 @@ func (r *DataUploadReconciler) Reconcile(ctx context.Context, req ctrl.Request) ep, ok := r.snapshotExposerList[du.Spec.SnapshotType] if !ok { - return r.errorOut(ctx, &du, errors.Errorf("%s type of snapshot exposer is not exist", du.Spec.SnapshotType), "not exist type of exposer", log) + return r.errorOut(ctx, du, errors.Errorf("%s type of snapshot exposer is not exist", du.Spec.SnapshotType), "not exist type of exposer", log) + } + + // Logic for clear resources when dataupload been deleted + if du.DeletionTimestamp.IsZero() { // add finalizer for all cr at beginning + if !isDataUploadInFinalState(du) && !controllerutil.ContainsFinalizer(du, dataUploadDownloadFinalizer) { + succeeded, err := r.exclusiveUpdateDataUpload(ctx, du, func(du *velerov2alpha1api.DataUpload) { + controllerutil.AddFinalizer(du, dataUploadDownloadFinalizer) + }) + + if err != nil { + log.Errorf("failed to add finalizer with error %s for %s/%s", err.Error(), du.Namespace, du.Name) + return ctrl.Result{}, err + } else if !succeeded { + log.Warnf("failed to add finilizer for %s/%s and will requeue later", du.Namespace, du.Name) + return ctrl.Result{Requeue: true}, nil + } + } + } else if controllerutil.ContainsFinalizer(du, dataUploadDownloadFinalizer) && !du.Spec.Cancel && !isDataUploadInFinalState(du) { + // when delete cr we need to clear up internal resources created by Velero, here we use the cancel mechanism + // to help clear up resources instead of clear them directly in case of some conflict with Expose action + if err := UpdateDataUploadWithRetry(ctx, r.client, req.NamespacedName, log, func(dataUpload *velerov2alpha1api.DataUpload) { + dataUpload.Spec.Cancel = true + dataUpload.Status.Message = fmt.Sprintf("found a dataupload %s/%s is being deleted, mark it as cancel", du.Namespace, du.Name) + }); err != nil { + log.Errorf("failed to set cancel flag with error %s for 
%s/%s", err.Error(), du.Namespace, du.Name) + return ctrl.Result{}, err + } } if du.Status.Phase == "" || du.Status.Phase == velerov2alpha1api.DataUploadPhaseNew { log.Info("Data upload starting") - accepted, err := r.acceptDataUpload(ctx, &du) + accepted, err := r.acceptDataUpload(ctx, du) if err != nil { - return r.errorOut(ctx, &du, err, "error to accept the data upload", log) + return r.errorOut(ctx, du, err, "error to accept the data upload", log) } if !accepted { @@ -146,22 +177,55 @@ func (r *DataUploadReconciler) Reconcile(ctx context.Context, req ctrl.Request) return ctrl.Result{}, nil } - exposeParam := r.setupExposeParam(&du) + exposeParam := r.setupExposeParam(du) - if err := ep.Expose(ctx, getOwnerObject(&du), exposeParam); err != nil { - return r.errorOut(ctx, &du, err, "error to expose snapshot", log) - } - log.Info("Snapshot is exposed") // Expose() will trigger to create one pod whose volume is restored by a given volume snapshot, // but the pod maybe is not in the same node of the current controller, so we need to return it here. // And then only the controller who is in the same node could do the rest work. + if err := ep.Expose(ctx, getOwnerObject(du), exposeParam); err != nil { + if err := r.client.Get(ctx, req.NamespacedName, du); err != nil { + if !apierrors.IsNotFound(err) { + return ctrl.Result{}, errors.Wrap(err, "getting DataUpload") + } + } + if isDataUploadInFinalState(du) { + log.Warnf("expose snapshot with err %v but it may caused by clean up resources in cancel action", err) + r.cleanUp(ctx, du, log) + return ctrl.Result{}, nil + } else { + return r.errorOut(ctx, du, err, "error to expose snapshot", log) + } + } + + log.Info("Snapshot is exposed") + + // we need to get CR again for it may canceled by dataupload controller on other + // nodes when doing expose action, if detectd cancel action we need to clear up the internal + // resources created by velero during backup. 
+ if err := r.client.Get(ctx, req.NamespacedName, du); err != nil { + if apierrors.IsNotFound(err) { + log.Debug("Unable to find DataUpload") + return ctrl.Result{}, nil + } + return ctrl.Result{}, errors.Wrap(err, "getting DataUpload") + } + + // we need to clean up resources as resources created in Expose it may later than cancel action or prepare time + // and need to clean up resources again + if isDataUploadInFinalState(du) { + r.cleanUp(ctx, du, log) + } + return ctrl.Result{}, nil } else if du.Status.Phase == velerov2alpha1api.DataUploadPhaseAccepted { if du.Spec.Cancel { - r.OnDataUploadCancelled(ctx, du.GetNamespace(), du.GetName()) + // we don't want to update CR into cancel status forcely as it may conflict with CR update in Expose action + // we could retry when the CR requeue in periodcally + log.Debugf("Data upload is been canceled %s in Phase %s", du.GetName(), du.Status.Phase) + r.TryCancelDataUpload(ctx, du) } else if du.Status.StartTimestamp != nil { if time.Since(du.Status.StartTimestamp.Time) >= r.preparingTimeout { - r.onPrepareTimeout(ctx, &du) + r.onPrepareTimeout(ctx, du) } } @@ -179,10 +243,10 @@ func (r *DataUploadReconciler) Reconcile(ctx context.Context, req ctrl.Request) log.Info("Cancellable data path is already started") return ctrl.Result{}, nil } - waitExposePara := r.setupWaitExposePara(&du) - res, err := ep.GetExposed(ctx, getOwnerObject(&du), du.Spec.OperationTimeout.Duration, waitExposePara) + waitExposePara := r.setupWaitExposePara(du) + res, err := ep.GetExposed(ctx, getOwnerObject(du), du.Spec.OperationTimeout.Duration, waitExposePara) if err != nil { - return r.errorOut(ctx, &du, err, "exposed snapshot is not ready", log) + return r.errorOut(ctx, du, err, "exposed snapshot is not ready", log) } else if res == nil { log.Debug("Get empty exposer") return ctrl.Result{}, nil @@ -204,19 +268,19 @@ func (r *DataUploadReconciler) Reconcile(ctx context.Context, req ctrl.Request) log.Info("Data path instance is concurrent limited 
requeue later") return ctrl.Result{Requeue: true, RequeueAfter: time.Minute}, nil } else { - return r.errorOut(ctx, &du, err, "error to create data path", log) + return r.errorOut(ctx, du, err, "error to create data path", log) } } // Update status to InProgress original := du.DeepCopy() du.Status.Phase = velerov2alpha1api.DataUploadPhaseInProgress - if err := r.client.Patch(ctx, &du, client.MergeFrom(original)); err != nil { - return r.errorOut(ctx, &du, err, "error updating dataupload status", log) + if err := r.client.Patch(ctx, du, client.MergeFrom(original)); err != nil { + return r.errorOut(ctx, du, err, "error updating dataupload status", log) } log.Info("Data upload is marked as in progress") - result, err := r.runCancelableDataUpload(ctx, fsBackup, &du, res, log) + result, err := r.runCancelableDataUpload(ctx, fsBackup, du, res, log) if err != nil { log.Errorf("Failed to run cancelable data path for %s with err %v", du.Name, err) r.closeDataPath(ctx, du.Name) @@ -234,7 +298,7 @@ func (r *DataUploadReconciler) Reconcile(ctx context.Context, req ctrl.Request) // Update status to Canceling. 
original := du.DeepCopy() du.Status.Phase = velerov2alpha1api.DataUploadPhaseCanceling - if err := r.client.Patch(ctx, &du, client.MergeFrom(original)); err != nil { + if err := r.client.Patch(ctx, du, client.MergeFrom(original)); err != nil { log.WithError(err).Error("error updating data upload into canceling status") return ctrl.Result{}, err } @@ -243,7 +307,15 @@ func (r *DataUploadReconciler) Reconcile(ctx context.Context, req ctrl.Request) } return ctrl.Result{}, nil } else { - log.Debugf("Data upload now is in %s phase and do nothing by current %s controller", du.Status.Phase, r.nodeName) + // put the finalizer remove action here for all CRs will go to the final status, we could check finalizer and do remove action in final status + // instead of intermediate state + if isDataUploadInFinalState(du) && !du.DeletionTimestamp.IsZero() && controllerutil.ContainsFinalizer(du, dataUploadDownloadFinalizer) { + original := du.DeepCopy() + controllerutil.RemoveFinalizer(du, dataUploadDownloadFinalizer) + if err := r.client.Patch(ctx, du, client.MergeFrom(original)); err != nil { + log.WithError(err).Error("error to remove finalizer") + } + } return ctrl.Result{}, nil } } @@ -341,29 +413,19 @@ func (r *DataUploadReconciler) OnDataUploadCancelled(ctx context.Context, namesp log.Warn("Async fs backup data path canceled") - var du velerov2alpha1api.DataUpload - if getErr := r.client.Get(ctx, types.NamespacedName{Name: duName, Namespace: namespace}, &du); getErr != nil { + du := &velerov2alpha1api.DataUpload{} + if getErr := r.client.Get(ctx, types.NamespacedName{Name: duName, Namespace: namespace}, du); getErr != nil { log.WithError(getErr).Warn("Failed to get dataupload on cancel") } else { // cleans up any objects generated during the snapshot expose - ep, ok := r.snapshotExposerList[du.Spec.SnapshotType] - if !ok { - log.WithError(fmt.Errorf("%v type of snapshot exposer is not exist", du.Spec.SnapshotType)). 
- Warn("Failed to clean up resources on canceled") - } else { - var volumeSnapshotName string - if du.Spec.SnapshotType == velerov2alpha1api.SnapshotTypeCSI { // Other exposer should have another condition - volumeSnapshotName = du.Spec.CSISnapshot.VolumeSnapshot - } - ep.CleanUp(ctx, getOwnerObject(&du), volumeSnapshotName, du.Spec.SourceNamespace) - } + r.cleanUp(ctx, du, log) original := du.DeepCopy() du.Status.Phase = velerov2alpha1api.DataUploadPhaseCanceled if du.Status.StartTimestamp.IsZero() { du.Status.StartTimestamp = &metav1.Time{Time: r.Clock.Now()} } du.Status.CompletionTimestamp = &metav1.Time{Time: r.Clock.Now()} - if err := r.client.Patch(ctx, &du, client.MergeFrom(original)); err != nil { + if err := r.client.Patch(ctx, du, client.MergeFrom(original)); err != nil { log.WithError(err).Error("error updating DataUpload status") } else { r.metrics.RegisterDataUploadCancel(r.nodeName) @@ -371,6 +433,47 @@ func (r *DataUploadReconciler) OnDataUploadCancelled(ctx context.Context, namesp } } +// TryCancelDataUpload clear up resources only when update success +func (r *DataUploadReconciler) TryCancelDataUpload(ctx context.Context, du *velerov2alpha1api.DataUpload) { + log := r.logger.WithField("dataupload", du.Name) + log.Warn("Async fs backup data path canceled") + succeeded, err := r.exclusiveUpdateDataUpload(ctx, du, func(dataUpload *velerov2alpha1api.DataUpload) { + dataUpload.Status.Phase = velerov2alpha1api.DataUploadPhaseCanceled + if dataUpload.Status.StartTimestamp.IsZero() { + dataUpload.Status.StartTimestamp = &metav1.Time{Time: r.Clock.Now()} + } + dataUpload.Status.CompletionTimestamp = &metav1.Time{Time: r.Clock.Now()} + }) + + if err != nil { + log.WithError(err).Error("error updating dataupload status") + return + } else if !succeeded { + log.Warn("conflict in updating dataupload status and will try it again later") + return + } + + // success update + r.metrics.RegisterDataUploadCancel(r.nodeName) + // cleans up any objects generated during 
the snapshot expose + r.cleanUp(ctx, du, log) + r.closeDataPath(ctx, du.Name) +} + +func (r *DataUploadReconciler) cleanUp(ctx context.Context, du *velerov2alpha1api.DataUpload, log *logrus.Entry) { + ep, ok := r.snapshotExposerList[du.Spec.SnapshotType] + if !ok { + log.WithError(fmt.Errorf("%v type of snapshot exposer is not exist", du.Spec.SnapshotType)). + Warn("Failed to clean up resources on canceled") + } else { + var volumeSnapshotName string + if du.Spec.SnapshotType == velerov2alpha1api.SnapshotTypeCSI { // Other exposer should have another condition + volumeSnapshotName = du.Spec.CSISnapshot.VolumeSnapshot + } + ep.CleanUp(ctx, getOwnerObject(du), volumeSnapshotName, du.Spec.SourceNamespace) + } +} + func (r *DataUploadReconciler) OnDataUploadProgress(ctx context.Context, namespace string, duName string, progress *uploader.Progress) { log := r.logger.WithField("dataupload", duName) @@ -451,7 +554,6 @@ func (r *DataUploadReconciler) findDataUploadForPod(podObj client.Object) []reco } r.logger.WithField("Backup pod", pod.Name).Infof("Preparing dataupload %s", du.Name) - // we don't expect anyone else update the CR during the Prepare process updated, err := r.exclusiveUpdateDataUpload(context.Background(), du, r.prepareDataUpload) if err != nil || !updated { @@ -511,6 +613,7 @@ func (r *DataUploadReconciler) errorOut(ctx context.Context, du *velerov2alpha1a } else { err = errors.Wrapf(err, "failed to clean up exposed snapshot with could not find %s snapshot exposer", du.Spec.SnapshotType) } + return ctrl.Result{}, r.updateStatusToFailed(ctx, du, err, msg, log) } @@ -537,16 +640,27 @@ func (r *DataUploadReconciler) acceptDataUpload(ctx context.Context, du *velerov // For all data upload controller in each node-agent will try to update dataupload CR, and only one controller will success, // and the success one could handle later logic - succeeded, err := r.exclusiveUpdateDataUpload(ctx, du, func(du *velerov2alpha1api.DataUpload) { - du.Status.Phase = 
velerov2alpha1api.DataUploadPhaseAccepted - du.Status.StartTimestamp = &metav1.Time{Time: r.Clock.Now()} - }) + updated := du.DeepCopy() + + updateFunc := func(dataUpload *velerov2alpha1api.DataUpload) { + dataUpload.Status.Phase = velerov2alpha1api.DataUploadPhaseAccepted + dataUpload.Status.StartTimestamp = &metav1.Time{Time: r.Clock.Now()} + labels := dataUpload.GetLabels() + if labels == nil { + labels = make(map[string]string) + } + labels[acceptNodeLabelKey] = r.nodeName + dataUpload.SetLabels(labels) + } + + succeeded, err := r.exclusiveUpdateDataUpload(ctx, updated, updateFunc) if err != nil { return false, err } if succeeded { + updateFunc(du) // If the update succeeded, we need to update the du values in memory r.logger.WithField("Dataupload", du.Name).Infof("This datauplod has been accepted by %s", r.nodeName) return true, nil } @@ -595,13 +709,15 @@ func (r *DataUploadReconciler) onPrepareTimeout(ctx context.Context, du *velerov func (r *DataUploadReconciler) exclusiveUpdateDataUpload(ctx context.Context, du *velerov2alpha1api.DataUpload, updateFunc func(*velerov2alpha1api.DataUpload)) (bool, error) { - updated := du.DeepCopy() - updateFunc(updated) + updateFunc(du) - err := r.client.Update(ctx, updated) + err := r.client.Update(ctx, du) if err == nil { return true, nil - } else if apierrors.IsConflict(err) { + } + + // warning: we won't roll back du values in memory on error + if apierrors.IsConflict(err) { return false, nil } else { return false, err } @@ -666,3 +782,30 @@ func findDataUploadByPod(client client.Client, pod corev1.Pod) (*velerov2alpha1a } return nil, nil } + +func isDataUploadInFinalState(du *velerov2alpha1api.DataUpload) bool { + return du.Status.Phase == velerov2alpha1api.DataUploadPhaseFailed || + du.Status.Phase == velerov2alpha1api.DataUploadPhaseCanceled || + du.Status.Phase == velerov2alpha1api.DataUploadPhaseCompleted +} + +func UpdateDataUploadWithRetry(ctx context.Context, client client.Client, namespacedName types.NamespacedName, log 
*logrus.Entry, updateFunc func(dataUpload *velerov2alpha1api.DataUpload)) error { + return wait.PollUntilWithContext(ctx, time.Second, func(ctx context.Context) (done bool, err error) { + du := &velerov2alpha1api.DataUpload{} + if err := client.Get(ctx, namespacedName, du); err != nil { + return false, errors.Wrap(err, "getting DataUpload") + } + + updateFunc(du) + updateErr := client.Update(ctx, du) + if updateErr != nil { + if apierrors.IsConflict(updateErr) { + log.Warnf("failed to update dataupload for %s/%s and will retry it", du.Namespace, du.Name) + return false, nil + } + log.Errorf("failed to update dataupload with error %s for %s/%s", updateErr.Error(), du.Namespace, du.Name) + return false, updateErr + } + return true, nil + }) +} diff --git a/pkg/controller/data_upload_controller_test.go b/pkg/controller/data_upload_controller_test.go index e17aad5271..f825e9d9d6 100644 --- a/pkg/controller/data_upload_controller_test.go +++ b/pkg/controller/data_upload_controller_test.go @@ -24,11 +24,13 @@ import ( snapshotv1api "github.com/kubernetes-csi/external-snapshotter/client/v4/apis/volumesnapshot/v1" snapshotFake "github.com/kubernetes-csi/external-snapshotter/client/v4/clientset/versioned/fake" + "github.com/pkg/errors" "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -39,6 +41,7 @@ import ( ctrl "sigs.k8s.io/controller-runtime" kbclient "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/fake" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/reconcile" "github.com/vmware-tanzu/velero/internal/credentials" @@ -60,10 +63,11 @@ const fakeSnapshotType velerov2alpha1api.SnapshotType = "fake-snapshot" type 
FakeClient struct { kbclient.Client - getError error - createError error - updateError error - patchError error + getError error + createError error + updateError error + patchError error + updateConflict error } func (c *FakeClient) Get(ctx context.Context, key kbclient.ObjectKey, obj kbclient.Object) error { @@ -87,6 +91,10 @@ func (c *FakeClient) Update(ctx context.Context, obj kbclient.Object, opts ...kb return c.updateError } + if c.updateConflict != nil { + return c.updateConflict + } + return c.Client.Update(ctx, obj, opts...) } @@ -99,22 +107,18 @@ func (c *FakeClient) Patch(ctx context.Context, obj kbclient.Object, patch kbcli } func initDataUploaderReconciler(needError ...bool) (*DataUploadReconciler, error) { - var errs []error = make([]error, 4) - if len(needError) == 4 { - if needError[0] { + var errs []error = make([]error, 5) + for k, isError := range needError { + if k == 0 && isError { errs[0] = fmt.Errorf("Get error") - } - - if needError[1] { + } else if k == 1 && isError { errs[1] = fmt.Errorf("Create error") - } - - if needError[2] { + } else if k == 2 && isError { errs[2] = fmt.Errorf("Update error") - } - - if needError[3] { + } else if k == 3 && isError { errs[3] = fmt.Errorf("Patch error") + } else if k == 4 && isError { + errs[4] = apierrors.NewConflict(velerov2alpha1api.Resource("datadownload"), dataDownloadName, errors.New("conflict")) } } @@ -181,11 +185,18 @@ func initDataUploaderReconcilerWithError(needError ...error) (*DataUploadReconci Client: fake.NewClientBuilder().WithScheme(scheme).Build(), } - if len(needError) == 4 { - fakeClient.getError = needError[0] - fakeClient.createError = needError[1] - fakeClient.updateError = needError[2] - fakeClient.patchError = needError[3] + for k := range needError { + if k == 0 { + fakeClient.getError = needError[0] + } else if k == 1 { + fakeClient.createError = needError[1] + } else if k == 2 { + fakeClient.updateError = needError[2] + } else if k == 3 { + fakeClient.patchError = needError[3] 
+ } else if k == 4 { + fakeClient.updateConflict = needError[4] + } } fakeSnapshotClient := snapshotFake.NewSimpleClientset(vsObject, vscObj) @@ -217,6 +228,7 @@ func dataUploadBuilder() *builder.DataUploadBuilder { VolumeSnapshot: "fake-volume-snapshot", } return builder.ForDataUpload(velerov1api.DefaultNamespace, dataUploadName). + Labels(map[string]string{velerov1api.DataUploadLabel: dataUploadName}). BackupStorageLocation("bsl-loc"). DataMover("velero"). SnapshotType("CSI").SourceNamespace("fake-ns").SourcePVC("test-pvc").CSISnapshot(csi) @@ -298,6 +310,7 @@ func TestReconcile(t *testing.T) { dataMgr *datapath.Manager expectedProcessed bool expected *velerov2alpha1api.DataUpload + checkFunc func(velerov2alpha1api.DataUpload) bool expectedRequeue ctrl.Result expectedErrMsg string needErrs []bool @@ -381,6 +394,37 @@ func TestReconcile(t *testing.T) { du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseAccepted).SnapshotType(fakeSnapshotType).StartTimestamp(&metav1.Time{Time: time.Now().Add(-time.Minute * 5)}).Result(), expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseFailed).Result(), }, + { + name: "Dataupload with enabled cancel", + pod: builder.ForPod(velerov1api.DefaultNamespace, dataUploadName).Volumes(&corev1.Volume{Name: "dataupload-1"}).Result(), + du: func() *velerov2alpha1api.DataUpload { + du := dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseAccepted).SnapshotType(fakeSnapshotType).Result() + controllerutil.AddFinalizer(du, dataUploadDownloadFinalizer) + du.DeletionTimestamp = &metav1.Time{Time: time.Now()} + return du + }(), + expectedProcessed: false, + checkFunc: func(du velerov2alpha1api.DataUpload) bool { + return du.Spec.Cancel + }, + expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseAccepted).Result(), + expectedRequeue: ctrl.Result{}, + }, + { + name: "Dataupload with remove finalizer and should not be retrieved", + pod: builder.ForPod(velerov1api.DefaultNamespace, 
dataUploadName).Volumes(&corev1.Volume{Name: "dataupload-1"}).Result(), + du: func() *velerov2alpha1api.DataUpload { + du := dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseFailed).SnapshotType(fakeSnapshotType).Cancel(true).Result() + controllerutil.AddFinalizer(du, dataUploadDownloadFinalizer) + du.DeletionTimestamp = &metav1.Time{Time: time.Now()} + return du + }(), + expectedProcessed: false, + checkFunc: func(du velerov2alpha1api.DataUpload) bool { + return !controllerutil.ContainsFinalizer(&du, dataUploadDownloadFinalizer) + }, + expectedRequeue: ctrl.Result{}, + }, } for _, test := range tests { @@ -466,6 +510,10 @@ func TestReconcile(t *testing.T) { if !test.expectedProcessed { assert.Equal(t, du.Status.CompletionTimestamp.IsZero(), true) } + + if test.checkFunc != nil { + assert.True(t, test.checkFunc(du)) + } }) } } @@ -753,3 +801,178 @@ func TestOnDuPrepareTimeout(t *testing.T) { assert.Equal(t, test.expected.Status.Phase, du.Status.Phase) } } + +func TestTryCancelDataUpload(t *testing.T) { + tests := []struct { + name string + dd *velerov2alpha1api.DataUpload + needErrs []error + succeeded bool + expectedErr string + }{ + { + name: "update fail", + dd: dataUploadBuilder().Result(), + needErrs: []error{nil, nil, fmt.Errorf("fake-update-error"), nil}, + }, + { + name: "cancel by others", + dd: dataUploadBuilder().Result(), + needErrs: []error{nil, nil, &fakeAPIStatus{metav1.StatusReasonConflict}, nil}, + }, + { + name: "succeed", + dd: dataUploadBuilder().Result(), + needErrs: []error{nil, nil, nil, nil}, + succeeded: true, + }, + } + for _, test := range tests { + ctx := context.Background() + r, err := initDataUploaderReconcilerWithError(test.needErrs...) 
+ require.NoError(t, err) + + err = r.client.Create(ctx, test.dd) + require.NoError(t, err) + + r.TryCancelDataUpload(ctx, test.dd) + + if test.expectedErr == "" { + assert.NoError(t, err) + } else { + assert.EqualError(t, err, test.expectedErr) + } + } +} + +func TestUpdateDataUploadWithRetry(t *testing.T) { + + namespacedName := types.NamespacedName{ + Name: dataUploadName, + Namespace: "velero", + } + + // Define test cases + testCases := []struct { + Name string + needErrs []bool + ExpectErr bool + }{ + { + Name: "SuccessOnFirstAttempt", + needErrs: []bool{false, false, false, false}, + ExpectErr: false, + }, + { + Name: "Error get", + needErrs: []bool{true, false, false, false, false}, + ExpectErr: true, + }, + { + Name: "Error update", + needErrs: []bool{false, false, true, false, false}, + ExpectErr: true, + }, + { + Name: "Conflict with error timeout", + needErrs: []bool{false, false, false, false, true}, + ExpectErr: true, + }, + } + + for _, tc := range testCases { + t.Run(tc.Name, func(t *testing.T) { + ctx, cancelFunc := context.WithTimeout(context.TODO(), time.Second*5) + defer cancelFunc() + r, err := initDataUploaderReconciler(tc.needErrs...) 
+ require.NoError(t, err) + err = r.client.Create(ctx, dataUploadBuilder().Result()) + require.NoError(t, err) + updateFunc := func(dataDownload *velerov2alpha1api.DataUpload) { + dataDownload.Spec.Cancel = true + } + err = UpdateDataUploadWithRetry(ctx, r.client, namespacedName, velerotest.NewLogger().WithField("name", tc.Name), updateFunc) + if tc.ExpectErr { + assert.Error(t, err) + } else { + assert.NoError(t, err) + } + }) + } +} + +func TestFindDataUploads(t *testing.T) { + tests := []struct { + name string + pod corev1.Pod + du *velerov2alpha1api.DataUpload + expectedUploads []velerov2alpha1api.DataUpload + expectedError bool + }{ + // Test case 1: Pod with matching nodeName and DataUpload label + { + name: "MatchingPod", + pod: corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "velero", + Name: "pod-1", + Labels: map[string]string{ + velerov1api.DataUploadLabel: dataUploadName, + }, + }, + Spec: corev1.PodSpec{ + NodeName: "node-1", + }, + }, + du: dataUploadBuilder().Result(), + expectedUploads: []velerov2alpha1api.DataUpload{ + { + ObjectMeta: metav1.ObjectMeta{ + Namespace: "velero", + Name: dataUploadName, + }, + }, + }, + expectedError: false, + }, + // Test case 2: Pod with non-matching nodeName + { + name: "NonMatchingNodePod", + pod: corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "velero", + Name: "pod-2", + Labels: map[string]string{ + velerov1api.DataUploadLabel: dataUploadName, + }, + }, + Spec: corev1.PodSpec{ + NodeName: "node-2", + }, + }, + du: dataUploadBuilder().Result(), + expectedUploads: []velerov2alpha1api.DataUpload{}, + expectedError: false, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + r, err := initDataUploaderReconcilerWithError() + require.NoError(t, err) + r.nodeName = "node-1" + err = r.client.Create(ctx, test.du) + require.NoError(t, err) + err = r.client.Create(ctx, &test.pod) + require.NoError(t, err) + uploads, err := r.FindDataUploads(context.Background(), r.client, 
"velero") + + if test.expectedError { + assert.Error(t, err) + } else { + assert.NoError(t, err) + assert.Equal(t, len(test.expectedUploads), len(uploads)) + } + }) + } +}