Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix data mover controller bugs #6616

Merged
merged 1 commit into from
Aug 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions pkg/builder/data_upload_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,3 +119,9 @@ func (d *DataUploadBuilder) StartTimestamp(startTime *metav1.Time) *DataUploadBu
d.object.Status.StartTimestamp = startTime
return d
}

// Labels sets the DataUpload's Labels.
// The supplied map is assigned to the object under construction as-is (it is
// not copied here), and the builder itself is returned.
func (d *DataUploadBuilder) Labels(labels map[string]string) *DataUploadBuilder {
d.object.Labels = labels
return d
}
24 changes: 15 additions & 9 deletions pkg/cmd/cli/nodeagent/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,11 +368,14 @@
if du.Status.Phase == velerov2alpha1api.DataUploadPhaseAccepted ||
du.Status.Phase == velerov2alpha1api.DataUploadPhasePrepared ||
du.Status.Phase == velerov2alpha1api.DataUploadPhaseInProgress {
updated := du.DeepCopy()
updated.Spec.Cancel = true
updated.Status.Message = fmt.Sprintf("found a dataupload with status %q during the node-agent starting, mark it as cancel", du.Status.Phase)
if err := client.Patch(s.ctx, updated, ctrlclient.MergeFrom(&du)); err != nil {
s.logger.WithError(errors.WithStack(err)).Errorf("failed to mark datadownload %q cancel", du.GetName())
err = controller.UpdateDataUploadWithRetry(s.ctx, client, types.NamespacedName{Namespace: du.Namespace, Name: du.Name}, s.logger.WithField("dataupload", du.Name),
func(dataUpload *velerov2alpha1api.DataUpload) {
dataUpload.Spec.Cancel = true
dataUpload.Status.Message = fmt.Sprintf("found a dataupload with status %q during the node-agent starting, mark it as cancel", du.Status.Phase)
})

Check warning on line 375 in pkg/cmd/cli/nodeagent/server.go

View check run for this annotation

Codecov / codecov/patch

pkg/cmd/cli/nodeagent/server.go#L371-L375

Added lines #L371 - L375 were not covered by tests

if err != nil {
s.logger.WithError(errors.WithStack(err)).Errorf("failed to mark dataupload %q cancel", du.GetName())

Check warning on line 378 in pkg/cmd/cli/nodeagent/server.go

View check run for this annotation

Codecov / codecov/patch

pkg/cmd/cli/nodeagent/server.go#L377-L378

Added lines #L377 - L378 were not covered by tests
continue
}
s.logger.WithField("dataupload", du.GetName()).Warn(du.Status.Message)
Expand All @@ -396,10 +399,13 @@
if dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseAccepted ||
dd.Status.Phase == velerov2alpha1api.DataDownloadPhasePrepared ||
dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseInProgress {
updated := dd.DeepCopy()
updated.Spec.Cancel = true
updated.Status.Message = fmt.Sprintf("found a datadownload with status %q during the node-agent starting, mark it as cancel", dd.Status.Phase)
if err := client.Patch(s.ctx, updated, ctrlclient.MergeFrom(dd)); err != nil {
err = controller.UpdateDataDownloadWithRetry(s.ctx, client, types.NamespacedName{Namespace: dd.Namespace, Name: dd.Name}, s.logger.WithField("datadownload", dd.Name),
func(dataDownload *velerov2alpha1api.DataDownload) {
dataDownload.Spec.Cancel = true
dataDownload.Status.Message = fmt.Sprintf("found a datadownload with status %q during the node-agent starting, mark it as cancel", dd.Status.Phase)
})

Check warning on line 406 in pkg/cmd/cli/nodeagent/server.go

View check run for this annotation

Codecov / codecov/patch

pkg/cmd/cli/nodeagent/server.go#L402-L406

Added lines #L402 - L406 were not covered by tests

if err != nil {

Check warning on line 408 in pkg/cmd/cli/nodeagent/server.go

View check run for this annotation

Codecov / codecov/patch

pkg/cmd/cli/nodeagent/server.go#L408

Added line #L408 was not covered by tests
s.logger.WithError(errors.WithStack(err)).Errorf("failed to mark datadownload %q cancel", dd.GetName())
continue
}
Expand Down
23 changes: 15 additions & 8 deletions pkg/cmd/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
kubeerrs "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
Expand Down Expand Up @@ -1107,10 +1108,13 @@
if du.Status.Phase == velerov2alpha1api.DataUploadPhaseAccepted ||
du.Status.Phase == velerov2alpha1api.DataUploadPhasePrepared ||
du.Status.Phase == velerov2alpha1api.DataUploadPhaseInProgress {
updated := du.DeepCopy()
updated.Spec.Cancel = true
updated.Status.Message = fmt.Sprintf("found a dataupload with status %q during the velero server starting, mark it as cancel", du.Status.Phase)
if err := client.Patch(ctx, updated, ctrlclient.MergeFrom(&du)); err != nil {
err := controller.UpdateDataUploadWithRetry(ctx, client, types.NamespacedName{Namespace: du.Namespace, Name: du.Name}, log.WithField("dataupload", du.Name),
func(dataUpload *velerov2alpha1api.DataUpload) {
dataUpload.Spec.Cancel = true
dataUpload.Status.Message = fmt.Sprintf("found a dataupload with status %q during the velero server starting, mark it as cancel", du.Status.Phase)
})

Check warning on line 1115 in pkg/cmd/server/server.go

View check run for this annotation

Codecov / codecov/patch

pkg/cmd/server/server.go#L1111-L1115

Added lines #L1111 - L1115 were not covered by tests

if err != nil {

Check warning on line 1117 in pkg/cmd/server/server.go

View check run for this annotation

Codecov / codecov/patch

pkg/cmd/server/server.go#L1117

Added line #L1117 was not covered by tests
log.WithError(errors.WithStack(err)).Errorf("failed to mark dataupload %q cancel", du.GetName())
continue
}
Expand All @@ -1132,10 +1136,13 @@
if dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseAccepted ||
dd.Status.Phase == velerov2alpha1api.DataDownloadPhasePrepared ||
dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseInProgress {
updated := dd.DeepCopy()
updated.Spec.Cancel = true
updated.Status.Message = fmt.Sprintf("found a datadownload with status %q during the velero server starting, mark it as cancel", dd.Status.Phase)
if err := client.Patch(ctx, updated, ctrlclient.MergeFrom(&dd)); err != nil {
err := controller.UpdateDataDownloadWithRetry(ctx, client, types.NamespacedName{Namespace: dd.Namespace, Name: dd.Name}, log.WithField("datadownload", dd.Name),
func(dataDownload *velerov2alpha1api.DataDownload) {
dataDownload.Spec.Cancel = true
dataDownload.Status.Message = fmt.Sprintf("found a datadownload with status %q during the velero server starting, mark it as cancel", dd.Status.Phase)
})

Check warning on line 1143 in pkg/cmd/server/server.go

View check run for this annotation

Codecov / codecov/patch

pkg/cmd/server/server.go#L1139-L1143

Added lines #L1139 - L1143 were not covered by tests

if err != nil {

Check warning on line 1145 in pkg/cmd/server/server.go

View check run for this annotation

Codecov / codecov/patch

pkg/cmd/server/server.go#L1145

Added line #L1145 was not covered by tests
log.WithError(errors.WithStack(err)).Errorf("failed to mark dataupload %q cancel", dd.GetName())
continue
}
Expand Down
157 changes: 145 additions & 12 deletions pkg/controller/data_download_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,13 @@
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"k8s.io/utils/clock"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
Expand Down Expand Up @@ -118,6 +120,33 @@
return r.errorOut(ctx, dd, errors.New("uninitialized generic exposer"), "uninitialized exposer", log)
}

// Add finalizer
// Logic for clear resources when datadownload been deleted
if dd.DeletionTimestamp.IsZero() { // add finalizer for all cr at beginning
if !isDataDownloadInFinalState(dd) && !controllerutil.ContainsFinalizer(dd, dataUploadDownloadFinalizer) {
succeeded, err := r.exclusiveUpdateDataDownload(ctx, dd, func(dd *velerov2alpha1api.DataDownload) {
controllerutil.AddFinalizer(dd, dataUploadDownloadFinalizer)
})
if err != nil {
log.Errorf("failed to add finalizer with error %s for %s/%s", err.Error(), dd.Namespace, dd.Name)
return ctrl.Result{}, err
} else if !succeeded {
log.Warnf("failed to add finilizer for %s/%s and will requeue later", dd.Namespace, dd.Name)
return ctrl.Result{Requeue: true}, nil
}

Check warning on line 136 in pkg/controller/data_download_controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/data_download_controller.go#L134-L136

Added lines #L134 - L136 were not covered by tests
}
} else if controllerutil.ContainsFinalizer(dd, dataUploadDownloadFinalizer) && !dd.Spec.Cancel && !isDataDownloadInFinalState(dd) {
// when delete cr we need to clear up internal resources created by Velero, here we use the cancel mechanism
// to help clear up resources instead of clear them directly in case of some conflict with Expose action
if err := UpdateDataDownloadWithRetry(ctx, r.client, req.NamespacedName, log, func(dataDownload *velerov2alpha1api.DataDownload) {
dataDownload.Spec.Cancel = true
dataDownload.Status.Message = fmt.Sprintf("found a dataupload %s/%s is being deleted, mark it as cancel", dd.Namespace, dd.Name)
}); err != nil {
log.Errorf("failed to set cancel flag with error %s for %s/%s", err.Error(), dd.Namespace, dd.Name)
return ctrl.Result{}, err
}

Check warning on line 147 in pkg/controller/data_download_controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/data_download_controller.go#L145-L147

Added lines #L145 - L147 were not covered by tests
}

if dd.Status.Phase == "" || dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseNew {
log.Info("Data download starting")

Expand Down Expand Up @@ -150,15 +179,44 @@
// And then only the controller who is in the same node could do the rest work.
err = r.restoreExposer.Expose(ctx, getDataDownloadOwnerObject(dd), dd.Spec.TargetVolume.PVC, dd.Spec.TargetVolume.Namespace, hostingPodLabels, dd.Spec.OperationTimeout.Duration)
if err != nil {
return r.errorOut(ctx, dd, err, "error to start restore expose", log)
if err := r.client.Get(ctx, req.NamespacedName, dd); err != nil {
if !apierrors.IsNotFound(err) {
return ctrl.Result{}, errors.Wrap(err, "getting DataUpload")
}

Check warning on line 185 in pkg/controller/data_download_controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/data_download_controller.go#L183-L185

Added lines #L183 - L185 were not covered by tests
}
if isDataDownloadInFinalState(dd) {
log.Warnf("expose snapshot with err %v but it may caused by clean up resources in cancel action", err)
r.restoreExposer.CleanUp(ctx, getDataDownloadOwnerObject(dd))
return ctrl.Result{}, nil

Check warning on line 190 in pkg/controller/data_download_controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/data_download_controller.go#L188-L190

Added lines #L188 - L190 were not covered by tests
} else {
return r.errorOut(ctx, dd, err, "error to expose snapshot", log)
}
}

log.Info("Restore is exposed")

// we need to get the CR again because it may have been canceled by a datadownload controller on another
// node while the expose action was running; if a cancel action is detected we need to clean up the internal
// resources created by velero during backup.
if err := r.client.Get(ctx, req.NamespacedName, dd); err != nil {
if apierrors.IsNotFound(err) {
log.Debug("Unable to find datadownload")
return ctrl.Result{}, nil
}
return ctrl.Result{}, errors.Wrap(err, "getting datadownload")

Check warning on line 206 in pkg/controller/data_download_controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/data_download_controller.go#L202-L206

Added lines #L202 - L206 were not covered by tests
}

// the resources created in Expose may be created later than the cancel action or prepare time,
// so we need to clean up the resources again
if isDataDownloadInFinalState(dd) {
r.restoreExposer.CleanUp(ctx, getDataDownloadOwnerObject(dd))
}

Check warning on line 213 in pkg/controller/data_download_controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/data_download_controller.go#L212-L213

Added lines #L212 - L213 were not covered by tests

return ctrl.Result{}, nil
} else if dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseAccepted {
if dd.Spec.Cancel {
log.Debugf("Data download is been canceled %s in Phase %s", dd.GetName(), dd.Status.Phase)
r.OnDataDownloadCancelled(ctx, dd.GetNamespace(), dd.GetName())
r.TryCancelDataDownload(ctx, dd)

Check warning on line 219 in pkg/controller/data_download_controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/data_download_controller.go#L219

Added line #L219 was not covered by tests
} else if dd.Status.StartTimestamp != nil {
if time.Since(dd.Status.StartTimestamp.Time) >= r.preparingTimeout {
r.onPrepareTimeout(ctx, dd)
Expand Down Expand Up @@ -249,7 +307,15 @@

return ctrl.Result{}, nil
} else {
log.Debugf("Data download now is in %s phase and do nothing by current %s controller", dd.Status.Phase, r.nodeName)
// put the finalizer removal here because every CR eventually reaches a final status, so we can check the finalizer and remove it in the final status
// instead of in an intermediate state
if isDataDownloadInFinalState(dd) && !dd.DeletionTimestamp.IsZero() && controllerutil.ContainsFinalizer(dd, dataUploadDownloadFinalizer) {
original := dd.DeepCopy()
controllerutil.RemoveFinalizer(dd, dataUploadDownloadFinalizer)
if err := r.client.Patch(ctx, dd, client.MergeFrom(original)); err != nil {
log.WithError(err).Error("error to remove finalizer")
}

Check warning on line 317 in pkg/controller/data_download_controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/data_download_controller.go#L316-L317

Added lines #L316 - L317 were not covered by tests
}
return ctrl.Result{}, nil
}
}
Expand Down Expand Up @@ -353,6 +419,32 @@
}
}

// TryCancelDataDownload attempts to move dd to the Canceled phase through
// exclusiveUpdateDataDownload. The start timestamp is filled in when it is
// still zero, and the completion timestamp is always set from r.Clock.
//
// If the update returns an error, or reports that it did not succeed, the
// outcome is only logged and the function returns without further action.
// Only after a successful update does it register the cancel metric for this
// node, clean up the restore exposer resources for dd's owner object, and
// close the data path named after dd.
func (r *DataDownloadReconciler) TryCancelDataDownload(ctx context.Context, dd *velerov2alpha1api.DataDownload) {
	log := r.logger.WithField("datadownload", dd.Name)
	log.Warn("Async fs backup data path canceled")

	// Mutation applied to the DataDownload before it is written back.
	markCanceled := func(target *velerov2alpha1api.DataDownload) {
		target.Status.Phase = velerov2alpha1api.DataDownloadPhaseCanceled
		if target.Status.StartTimestamp.IsZero() {
			target.Status.StartTimestamp = &metav1.Time{Time: r.Clock.Now()}
		}
		target.Status.CompletionTimestamp = &metav1.Time{Time: r.Clock.Now()}
	}

	updated, err := r.exclusiveUpdateDataDownload(ctx, dd, markCanceled)
	switch {
	case err != nil:
		log.WithError(err).Error("error updating datadownload status")
		return
	case !updated:
		log.Warn("conflict in updating datadownload status and will try it again later")
		return
	}

	// The update went through: record the cancel and release resources.
	r.metrics.RegisterDataDownloadCancel(r.nodeName)
	r.restoreExposer.CleanUp(ctx, getDataDownloadOwnerObject(dd))
	r.closeDataPath(ctx, dd.Name)
}

func (r *DataDownloadReconciler) OnDataDownloadProgress(ctx context.Context, namespace string, ddName string, progress *uploader.Progress) {
log := r.logger.WithField("datadownload", ddName)

Expand Down Expand Up @@ -515,16 +607,28 @@

// For all data download controller in each node-agent will try to update download CR, and only one controller will success,
// and the success one could handle later logic
succeeded, err := r.exclusiveUpdateDataDownload(ctx, dd, func(dd *velerov2alpha1api.DataDownload) {
dd.Status.Phase = velerov2alpha1api.DataDownloadPhaseAccepted
dd.Status.StartTimestamp = &metav1.Time{Time: r.Clock.Now()}
})

updated := dd.DeepCopy()

updateFunc := func(datadownload *velerov2alpha1api.DataDownload) {
datadownload.Status.Phase = velerov2alpha1api.DataDownloadPhaseAccepted
datadownload.Status.StartTimestamp = &metav1.Time{Time: r.Clock.Now()}
labels := datadownload.GetLabels()
if labels == nil {
labels = make(map[string]string)
}
labels[acceptNodeLabelKey] = r.nodeName
datadownload.SetLabels(labels)
}

succeeded, err := r.exclusiveUpdateDataDownload(ctx, updated, updateFunc)

if err != nil {
return false, err
}

if succeeded {
updateFunc(dd) // If update success, it's need to update du values in memory
r.logger.WithField("DataDownload", dd.Name).Infof("This datadownload has been accepted by %s", r.nodeName)
return true, nil
}
Expand All @@ -537,7 +641,6 @@
log := r.logger.WithField("DataDownload", dd.Name)

log.Info("Timeout happened for preparing datadownload")

succeeded, err := r.exclusiveUpdateDataDownload(ctx, dd, func(dd *velerov2alpha1api.DataDownload) {
dd.Status.Phase = velerov2alpha1api.DataDownloadPhaseFailed
dd.Status.Message = "timeout on preparing data download"
Expand All @@ -562,13 +665,15 @@

func (r *DataDownloadReconciler) exclusiveUpdateDataDownload(ctx context.Context, dd *velerov2alpha1api.DataDownload,
updateFunc func(*velerov2alpha1api.DataDownload)) (bool, error) {
updated := dd.DeepCopy()
updateFunc(updated)
updateFunc(dd)

err := r.client.Update(ctx, dd)

err := r.client.Update(ctx, updated)
if err == nil {
return true, nil
} else if apierrors.IsConflict(err) {
}
// it won't rollback dd in memory when error
if apierrors.IsConflict(err) {
return false, nil
} else {
return false, err
Expand Down Expand Up @@ -614,3 +719,31 @@

return nil, nil
}

// isDataDownloadInFinalState reports whether dd's status phase is one of
// Failed, Canceled or Completed.
func isDataDownloadInFinalState(dd *velerov2alpha1api.DataDownload) bool {
	switch dd.Status.Phase {
	case velerov2alpha1api.DataDownloadPhaseFailed,
		velerov2alpha1api.DataDownloadPhaseCanceled,
		velerov2alpha1api.DataDownloadPhaseCompleted:
		return true
	default:
		return false
	}
}

// UpdateDataDownloadWithRetry fetches the DataDownload identified by
// namespacedName, applies updateFunc to the fetched object and writes it back
// with client.Update, polling at a one-second interval until it succeeds.
//
// A conflict on Update is logged as a warning and retried on the next poll
// against a freshly fetched object. Any error from Get, and any non-conflict
// error from Update, stops the polling and is returned to the caller.
func UpdateDataDownloadWithRetry(ctx context.Context, client client.Client, namespacedName types.NamespacedName, log *logrus.Entry, updateFunc func(dataDownload *velerov2alpha1api.DataDownload)) error {
	return wait.PollUntilWithContext(ctx, time.Second, func(ctx context.Context) (bool, error) {
		dd := &velerov2alpha1api.DataDownload{}
		if err := client.Get(ctx, namespacedName, dd); err != nil {
			return false, errors.Wrap(err, "getting DataDownload")
		}

		updateFunc(dd)
		updateErr := client.Update(ctx, dd)
		if updateErr == nil {
			return true, nil
		}

		if apierrors.IsConflict(updateErr) {
			log.Warnf("failed to update datadownload for %s/%s and will retry it", dd.Namespace, dd.Name)
			return false, nil
		}

		log.Errorf("failed to update datadownload with error %s for %s/%s", updateErr.Error(), dd.Namespace, dd.Name)
		// Return updateErr itself. The previous code returned the named result
		// `err`, which was always nil at this point, so the failure was swallowed
		// and the poll kept retrying instead of surfacing the error.
		return false, updateErr
	})
}
Loading
Loading