Skip to content

Commit

Permalink
Fix #6562 #6563 data mover bug
Browse files Browse the repository at this point in the history
Signed-off-by: Ming Qiu <mqiu@vmware.com>
  • Loading branch information
qiuming-best committed Aug 4, 2023
1 parent 06628cf commit 142c66b
Show file tree
Hide file tree
Showing 5 changed files with 205 additions and 22 deletions.
12 changes: 4 additions & 8 deletions pkg/cmd/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1107,10 +1107,8 @@ func markDataUploadsCancel(ctx context.Context, client ctrlclient.Client, backup
if du.Status.Phase == velerov2alpha1api.DataUploadPhaseAccepted ||
du.Status.Phase == velerov2alpha1api.DataUploadPhasePrepared ||
du.Status.Phase == velerov2alpha1api.DataUploadPhaseInProgress {
updated := du.DeepCopy()
updated.Spec.Cancel = true
updated.Status.Message = fmt.Sprintf("found a dataupload with status %q during the velero server starting, mark it as cancel", du.Status.Phase)
if err := client.Patch(ctx, updated, ctrlclient.MergeFrom(&du)); err != nil {
msg := fmt.Sprintf("found a dataupload with status %q during the velero server starting, mark it as cancel", du.Status.Phase)
if err := controller.MarkDataUploadCancel(ctx, client, &du, msg); err != nil {
log.WithError(errors.WithStack(err)).Errorf("failed to mark dataupload %q cancel", du.GetName())
continue
}
Expand All @@ -1132,10 +1130,8 @@ func markDataDownloadsCancel(ctx context.Context, client ctrlclient.Client, rest
if dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseAccepted ||
dd.Status.Phase == velerov2alpha1api.DataDownloadPhasePrepared ||
dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseInProgress {
updated := dd.DeepCopy()
updated.Spec.Cancel = true
updated.Status.Message = fmt.Sprintf("found a datadownload with status %q during the velero server starting, mark it as cancel", dd.Status.Phase)
if err := client.Patch(ctx, updated, ctrlclient.MergeFrom(&dd)); err != nil {
msg := fmt.Sprintf("found a datadownload with status %q during the velero server starting, mark it as cancel", dd.Status.Phase)
if err := controller.MarkDataDownloadCancel(ctx, client, &dd, msg); err != nil {
log.WithError(errors.WithStack(err)).Errorf("failed to mark dataupload %q cancel", dd.GetName())
continue
}
Expand Down
56 changes: 48 additions & 8 deletions pkg/controller/data_download_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,7 @@ func (r *DataDownloadReconciler) SetupWithManager(mgr ctrl.Manager) error {
return false
}

if newObj.Status.Phase != v1.PodRunning {
if newObj.Status.Phase == "" {
return false
}

Expand Down Expand Up @@ -433,10 +433,18 @@ func (r *DataDownloadReconciler) findSnapshotRestoreForPod(podObj client.Object)
return []reconcile.Request{}
}

requests := make([]reconcile.Request, 1)
if kube.IsPodInAbnormalState(pod) { // let the abnormal restore pod failed early
msg := fmt.Sprintf("restore mark as cancel to failed early for restore pod %s/%s is not in running status", pod.Namespace, pod.Name)
if err := MarkDataDownloadCancel(context.Background(), r.client, dd, msg); err != nil {
r.logger.WithFields(logrus.Fields{
"Datadownload": dd.Name,
"Restore pod": pod.Name,
}).WithError(err).Warn("failed to cancel datadownload, and it will wait for prepare timeout")
return []reconcile.Request{}
}
}

r.logger.WithField("Restore pod", pod.Name).Infof("Preparing data download %s", dd.Name)

// we don't expect anyone else update the CR during the Prepare process
updated, err := r.exclusiveUpdateDataDownload(context.Background(), dd, r.prepareDataDownload)
if err != nil || !updated {
Expand All @@ -448,6 +456,7 @@ func (r *DataDownloadReconciler) findSnapshotRestoreForPod(podObj client.Object)
return []reconcile.Request{}
}

requests := make([]reconcile.Request, 1)
requests[0] = reconcile.Request{
NamespacedName: types.NamespacedName{
Namespace: dd.Namespace,
Expand Down Expand Up @@ -524,13 +533,34 @@ func (r *DataDownloadReconciler) acceptDataDownload(ctx context.Context, dd *vel
return false, err
}

if succeeded {
r.logger.WithField("DataDownload", dd.Name).Infof("This datadownload has been accepted by %s", r.nodeName)
return true, nil
if !succeeded {
r.logger.WithField("DataDownload", dd.Name).Info("This datadownload has been accepted by others")
return false, nil
}

if err = r.AddAcceptedLabel(ctx, dd); err != nil {
return false, err
}

r.logger.WithField("DataDownload", dd.Name).Info("This datadownload has been accepted by others")
return false, nil
r.logger.WithField("DataDownload", dd.Name).Infof("This datadownload has been accepted by %s", r.nodeName)
return true, nil

}

// AddAcceptedLabel records which node accepted the DataDownload by setting
// the velero.io/accepted-by label to this reconciler's node name. The change
// is applied as a merge patch computed against the original object.
func (r *DataDownloadReconciler) AddAcceptedLabel(ctx context.Context, dd *velerov2alpha1api.DataDownload) error {
	patched := dd.DeepCopy()

	// GetLabels may return nil on an unlabeled object; start a fresh map then.
	accepted := patched.GetLabels()
	if accepted == nil {
		accepted = make(map[string]string)
	}
	accepted[acceptNodeLabelKey] = r.nodeName
	patched.SetLabels(accepted)

	err := r.client.Patch(ctx, patched, client.MergeFrom(dd))
	if err != nil {
		return errors.Wrapf(err, "failed to add accepted label for datadownload %s", dd.Name)
	}
	return nil
}

func (r *DataDownloadReconciler) onPrepareTimeout(ctx context.Context, dd *velerov2alpha1api.DataDownload) {
Expand Down Expand Up @@ -614,3 +644,13 @@ func findDataDownloadByPod(client client.Client, pod v1.Pod) (*velerov2alpha1api

return nil, nil
}

// MarkDataDownloadCancel asks the data path to abort the given DataDownload:
// it sets Spec.Cancel and records msg in Status.Message via a merge patch.
// NOTE(review): Spec and Status are patched in a single Patch call; if the
// CRD enables the status subresource, the Status.Message change may be
// silently dropped by the API server — confirm against the CRD definition.
func MarkDataDownloadCancel(ctx context.Context, cli client.Client, dd *velerov2alpha1api.DataDownload, msg string) error {
	canceled := dd.DeepCopy()
	canceled.Spec.Cancel = true
	canceled.Status.Message = msg

	err := cli.Patch(ctx, canceled, client.MergeFrom(dd))
	if err != nil {
		return errors.Wrapf(err, "failed to mark datadownload as canceled %s", dd.Name)
	}
	return nil
}
54 changes: 48 additions & 6 deletions pkg/controller/data_upload_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ import (

const (
dataUploadDownloadRequestor string = "snapshot-data-upload-download"
acceptNodeLabelKey string = "velero.io/accepted-by"
preparingMonitorFrequency time.Duration = time.Minute
)

Expand Down Expand Up @@ -412,7 +413,7 @@ func (r *DataUploadReconciler) SetupWithManager(mgr ctrl.Manager) error {
return false
}

if newObj.Status.Phase != corev1.PodRunning {
if newObj.Status.Phase == "" {
return false
}

Expand Down Expand Up @@ -450,6 +451,17 @@ func (r *DataUploadReconciler) findDataUploadForPod(podObj client.Object) []reco
return []reconcile.Request{}
}

if kube.IsPodInAbnormalState(pod) { // let the abnormal backup pod failed early
msg := fmt.Sprintf("backup mark as cancel to failed early for restore pod %s/%s is not in running status", pod.Namespace, pod.Name)
if err := MarkDataUploadCancel(context.Background(), r.client, du, msg); err != nil {
r.logger.WithFields(logrus.Fields{
"Datadupload": du.Name,
"Backup pod": pod.Name,
}).WithError(err).Warn("failed to cancel dataupload, and it will wait for prepare timeout")
return []reconcile.Request{}
}
}

r.logger.WithField("Backup pod", pod.Name).Infof("Preparing dataupload %s", du.Name)

// we don't expect anyone else update the CR during the Prepare process
Expand Down Expand Up @@ -546,13 +558,33 @@ func (r *DataUploadReconciler) acceptDataUpload(ctx context.Context, du *velerov
return false, err
}

if succeeded {
r.logger.WithField("Dataupload", du.Name).Infof("This datauplod has been accepted by %s", r.nodeName)
return true, nil
if !succeeded {
r.logger.WithField("Dataupload", du.Name).Info("This datauplod has been accepted by others")
return false, nil
}

if err = r.AddAcceptedLabel(ctx, du); err != nil {
return false, err
}

r.logger.WithField("Dataupload", du.Name).Infof("This datauplod has been accepted by %s", r.nodeName)
return true, nil
}

// AddAcceptedLabel records which node accepted the DataUpload by setting
// the velero.io/accepted-by label to this reconciler's node name. The change
// is applied as a merge patch computed against the original object.
func (r *DataUploadReconciler) AddAcceptedLabel(ctx context.Context, du *velerov2alpha1api.DataUpload) error {
	patched := du.DeepCopy()

	// GetLabels may return nil on an unlabeled object; start a fresh map then.
	accepted := patched.GetLabels()
	if accepted == nil {
		accepted = make(map[string]string)
	}
	accepted[acceptNodeLabelKey] = r.nodeName
	patched.SetLabels(accepted)

	err := r.client.Patch(ctx, patched, client.MergeFrom(du))
	if err != nil {
		return errors.Wrapf(err, "failed to add accepted label for dataupload %s", du.Name)
	}
	return nil
}

func (r *DataUploadReconciler) onPrepareTimeout(ctx context.Context, du *velerov2alpha1api.DataUpload) {
Expand Down Expand Up @@ -666,3 +698,13 @@ func findDataUploadByPod(client client.Client, pod corev1.Pod) (*velerov2alpha1a
}
return nil, nil
}

// MarkDataUploadCancel asks the data path to abort the given DataUpload:
// it sets Spec.Cancel and records msg in Status.Message via a merge patch.
// NOTE(review): Spec and Status are patched in a single Patch call; if the
// CRD enables the status subresource, the Status.Message change may be
// silently dropped by the API server — confirm against the CRD definition.
func MarkDataUploadCancel(ctx context.Context, cli client.Client, du *velerov2alpha1api.DataUpload, msg string) error {
	canceled := du.DeepCopy()
	canceled.Spec.Cancel = true
	canceled.Status.Message = msg

	err := cli.Patch(ctx, canceled, client.MergeFrom(du))
	if err != nil {
		return errors.Wrapf(err, "failed to mark dataupload as canceled %s", du.Name)
	}
	return nil
}
12 changes: 12 additions & 0 deletions pkg/util/kube/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,3 +110,15 @@ func EnsureDeletePod(ctx context.Context, podGetter corev1client.CoreV1Interface

return nil
}

// IsPodInAbnormalState reports whether any container of the pod is either
// not Ready or sitting in the Waiting state. Note this also returns true
// for pods whose containers simply have not become ready yet (e.g. still
// starting up) — callers use it as a heuristic for "not running normally".
func IsPodInAbnormalState(pod *corev1api.Pod) bool {
	for i := range pod.Status.ContainerStatuses {
		cs := &pod.Status.ContainerStatuses[i]
		if !cs.Ready || cs.State.Waiting != nil {
			return true
		}
	}
	return false
}
93 changes: 93 additions & 0 deletions pkg/util/kube/pod_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,3 +343,96 @@ func TestDeletePodIfAny(t *testing.T) {
})
}
}

// TestIsPodInAbnormalState verifies the not-ready / waiting-container
// detection over a table of pod container-status combinations.
func TestIsPodInAbnormalState(t *testing.T) {
	// buildPod assembles a pod carrying exactly the given container statuses.
	buildPod := func(statuses ...corev1api.ContainerStatus) *corev1api.Pod {
		return &corev1api.Pod{
			Status: corev1api.PodStatus{ContainerStatuses: statuses},
		}
	}
	waiting := corev1api.ContainerState{Waiting: &corev1api.ContainerStateWaiting{}}
	running := corev1api.ContainerState{Running: &corev1api.ContainerStateRunning{}}

	tests := []struct {
		name string
		pod  *corev1api.Pod
		want bool
	}{
		{
			name: "All containers ready",
			pod: buildPod(
				corev1api.ContainerStatus{Ready: true},
				corev1api.ContainerStatus{Ready: true},
			),
			want: false,
		},
		{
			name: "Some containers not ready",
			pod: buildPod(
				corev1api.ContainerStatus{Ready: true},
				corev1api.ContainerStatus{Ready: false},
			),
			want: true,
		},
		{
			name: "Container waiting",
			pod: buildPod(
				corev1api.ContainerStatus{Ready: true},
				corev1api.ContainerStatus{Ready: false, State: waiting},
			),
			want: true,
		},
		{
			name: "All containers ready but waiting",
			pod: buildPod(
				corev1api.ContainerStatus{Ready: true},
				corev1api.ContainerStatus{Ready: true, State: waiting},
			),
			want: true,
		},
		{
			name: "All containers ready and running",
			pod: buildPod(
				corev1api.ContainerStatus{Ready: true},
				corev1api.ContainerStatus{Ready: true, State: running},
			),
			want: false,
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			if got := IsPodInAbnormalState(tt.pod); got != tt.want {
				t.Errorf("Expected pod to be in abnormal state: %v, but got: %v", tt.want, got)
			}
		})
	}
}

0 comments on commit 142c66b

Please sign in to comment.