Skip to content

Commit

Permalink
CLOUDP-251510: test/helper/retry: add helper for get+mutate+update re…
Browse files Browse the repository at this point in the history
…tries (#1625)

* test/helper/retry: add helper for get+mutate+update retries

* address import name
  • Loading branch information
s-urbaniak authored Jun 1, 2024
1 parent 81988e5 commit 37f229e
Show file tree
Hide file tree
Showing 3 changed files with 156 additions and 28 deletions.
28 changes: 28 additions & 0 deletions test/helper/retry/retry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package retry

import (
"context"

"k8s.io/client-go/util/retry"
"sigs.k8s.io/controller-runtime/pkg/client"
)

// RetryUpdateOnConflict is a wrapper around client-go/util/retry.RetryOnConflict,
// adding the following often repeated actions:
//
// 1. client.Get a resource for the given key
// 2. mutate the retrieved object using the given mutator function.
// 3. client.Update the updated resource and retry on conflict
// using the client-go/util/retry.DefaultRetry strategy.
func RetryUpdateOnConflict[T any](ctx context.Context, k8s client.Client, key client.ObjectKey, mutator func(*T)) (*T, error) {
var obj T
clientObj := any(&obj).(client.Object)
err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
if err := k8s.Get(ctx, key, clientObj); err != nil {
return err
}
mutator(&obj)
return k8s.Update(ctx, clientObj)
})
return &obj, err
}
113 changes: 113 additions & 0 deletions test/helper/retry/retry_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
package retry

import (
"context"
"errors"
"reflect"
"testing"

"github.com/stretchr/testify/assert"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
"sigs.k8s.io/controller-runtime/pkg/client/interceptor"

akov2 "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/api/v1"
)

func TestRetryUpdateOnConflict(t *testing.T) {
for _, tc := range []struct {
name string
key client.ObjectKey
objects []client.Object
interceptorFuncs interceptor.Funcs

want *akov2.AtlasProject
wantErr string
}{
{
name: "fail immediately if not found",
key: types.NamespacedName{
Name: "foo",
Namespace: "bar",
},
want: &akov2.AtlasProject{},
wantErr: "atlasprojects.atlas.mongodb.com \"foo\" not found",
},
{
name: "succeed if found",
key: types.NamespacedName{
Name: "foo",
Namespace: "bar",
},
objects: []client.Object{
&akov2.AtlasProject{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "bar"}},
},
want: &akov2.AtlasProject{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "bar"}},
},
{
name: "exhaust on conflict",
key: types.NamespacedName{
Name: "foo",
Namespace: "bar",
},
objects: []client.Object{
&akov2.AtlasProject{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "bar"}},
},
interceptorFuncs: interceptor.Funcs{
Update: func(context.Context, client.WithWatch, client.Object, ...client.UpdateOption) error {
return &apierrors.StatusError{ErrStatus: metav1.Status{Reason: metav1.StatusReasonConflict, Message: "conflict"}}
},
},
want: &akov2.AtlasProject{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "bar"}},
wantErr: "conflict",
},
{
name: "fail on any other update error",
key: types.NamespacedName{
Name: "foo",
Namespace: "bar",
},
objects: []client.Object{
&akov2.AtlasProject{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "bar"}},
},
interceptorFuncs: interceptor.Funcs{
Update: func(context.Context, client.WithWatch, client.Object, ...client.UpdateOption) error {
return errors.New("boom")
},
},
want: &akov2.AtlasProject{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "bar"}},
wantErr: "boom",
},
} {
t.Run(tc.name, func(t *testing.T) {
testScheme := runtime.NewScheme()
assert.NoError(t, akov2.AddToScheme(testScheme))
k8sClient := fake.NewClientBuilder().
WithScheme(testScheme).
WithObjects(tc.objects...).
WithInterceptorFuncs(tc.interceptorFuncs).
Build()

got, err := RetryUpdateOnConflict(context.Background(), k8sClient, tc.key, func(*akov2.AtlasProject) {})
gotErr := ""
if err != nil {
gotErr = err.Error()
}

if gotErr != tc.wantErr {
t.Errorf("want error %q, got %q", tc.wantErr, gotErr)
}

// ignore unnecessary fields
got.ResourceVersion = ""

if !reflect.DeepEqual(got, tc.want) {
t.Errorf("want AtlasProject %+v, got %+v", tc.want, got)
}
})
}
}
43 changes: 15 additions & 28 deletions test/int/deployment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/util/retry"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/compat"
Expand All @@ -36,6 +35,7 @@ import (
"github.com/mongodb/mongodb-atlas-kubernetes/v2/test/helper/atlas"
"github.com/mongodb/mongodb-atlas-kubernetes/v2/test/helper/conditions"
"github.com/mongodb/mongodb-atlas-kubernetes/v2/test/helper/resources"
akoretry "github.com/mongodb/mongodb-atlas-kubernetes/v2/test/helper/retry"
)

const (
Expand Down Expand Up @@ -293,7 +293,7 @@ var _ = Describe("AtlasDeployment", Label("int", "AtlasDeployment", "deployment-
})

By("Filling token secret with invalid data", func() {
_, err := retryUpdateOnConflict(ctx, k8sClient, client.ObjectKeyFromObject(connectionSecret), func(secret *corev1.Secret) {
_, err := akoretry.RetryUpdateOnConflict(ctx, k8sClient, client.ObjectKeyFromObject(connectionSecret), func(secret *corev1.Secret) {
secret.StringData = map[string]string{
OrgID: "fake", PrivateAPIKey: "fake", PublicAPIKey: "fake",
}
Expand All @@ -313,7 +313,7 @@ var _ = Describe("AtlasDeployment", Label("int", "AtlasDeployment", "deployment-
})

By("Fix the token secret", func() {
_, err := retryUpdateOnConflict(ctx, k8sClient, types.NamespacedName{Namespace: namespace.Name, Name: ConnectionSecretName}, func(secret *corev1.Secret) {
_, err := akoretry.RetryUpdateOnConflict(ctx, k8sClient, types.NamespacedName{Namespace: namespace.Name, Name: ConnectionSecretName}, func(secret *corev1.Secret) {
secret.StringData = secretData()
})
Expect(err).To(BeNil())
Expand Down Expand Up @@ -764,7 +764,7 @@ var _ = Describe("AtlasDeployment", Label("int", "AtlasDeployment", "deployment-
})

By("Updating the Deployment tags with a duplicate key and removing all tags", func() {
_, err := retryUpdateOnConflict(ctx, k8sClient, client.ObjectKeyFromObject(createdDeployment), func(deployment *akov2.AtlasDeployment) {
_, err := akoretry.RetryUpdateOnConflict(ctx, k8sClient, client.ObjectKeyFromObject(createdDeployment), func(deployment *akov2.AtlasDeployment) {
deployment.Spec.DeploymentSpec.Tags = []*akov2.TagSpec{{Key: "test-1", Value: "value-1"}, {Key: "test-1", Value: "value-2"}}
})
Expect(err).To(BeNil())
Expand Down Expand Up @@ -812,7 +812,7 @@ var _ = Describe("AtlasDeployment", Label("int", "AtlasDeployment", "deployment-
})

By("Updating the Deployment configuration while paused (should fail)", func() {
_, err := retryUpdateOnConflict(ctx, k8sClient, client.ObjectKeyFromObject(createdDeployment), func(deployment *akov2.AtlasDeployment) {
_, err := akoretry.RetryUpdateOnConflict(ctx, k8sClient, client.ObjectKeyFromObject(createdDeployment), func(deployment *akov2.AtlasDeployment) {
deployment.Spec.DeploymentSpec.BackupEnabled = pointer.MakePtr(false)
})
Expect(err).To(BeNil())
Expand Down Expand Up @@ -854,7 +854,7 @@ var _ = Describe("AtlasDeployment", Label("int", "AtlasDeployment", "deployment-
oldSizeName string
err error
)
createdDeployment, err = retryUpdateOnConflict(ctx, k8sClient, client.ObjectKeyFromObject(createdDeployment), func(deployment *akov2.AtlasDeployment) {
createdDeployment, err = akoretry.RetryUpdateOnConflict(ctx, k8sClient, client.ObjectKeyFromObject(createdDeployment), func(deployment *akov2.AtlasDeployment) {
oldSizeName = deployment.Spec.DeploymentSpec.ReplicationSpecs[0].RegionConfigs[0].ElectableSpecs.InstanceSize
deployment.Spec.DeploymentSpec.ReplicationSpecs[0].RegionConfigs[0].ElectableSpecs = &akov2.Specs{
InstanceSize: "M42",
Expand Down Expand Up @@ -1007,7 +1007,7 @@ var _ = Describe("AtlasDeployment", Label("int", "AtlasDeployment", "deployment-
performCreate(createdDeployment, 30*time.Minute)

var err error
createdDeployment, err = retryUpdateOnConflict(ctx, k8sClient, client.ObjectKeyFromObject(createdDeployment), func(deployment *akov2.AtlasDeployment) {
createdDeployment, err = akoretry.RetryUpdateOnConflict(ctx, k8sClient, client.ObjectKeyFromObject(createdDeployment), func(deployment *akov2.AtlasDeployment) {
deployment.ObjectMeta.Annotations = map[string]string{customresource.ReconciliationPolicyAnnotation: customresource.ReconciliationPolicySkip}
deployment.Spec.DeploymentSpec.Labels = append(createdDeployment.Spec.DeploymentSpec.Labels, common.LabelSpec{
Key: "some-key",
Expand Down Expand Up @@ -1045,7 +1045,7 @@ var _ = Describe("AtlasDeployment", Label("int", "AtlasDeployment", "deployment-

By(fmt.Sprintf("Updating the InstanceSize of Advanced Deployment %s", kube.ObjectKeyFromObject(createdDeployment)), func() {
var err error
createdDeployment, err = retryUpdateOnConflict(ctx, k8sClient, client.ObjectKeyFromObject(createdDeployment), func(deployment *akov2.AtlasDeployment) {
createdDeployment, err = akoretry.RetryUpdateOnConflict(ctx, k8sClient, client.ObjectKeyFromObject(createdDeployment), func(deployment *akov2.AtlasDeployment) {
deployment.Spec.DeploymentSpec.ReplicationSpecs[0].RegionConfigs[0].ElectableSpecs = &akov2.Specs{
InstanceSize: "M20",
NodeCount: pointer.MakePtr(3),
Expand All @@ -1071,7 +1071,7 @@ var _ = Describe("AtlasDeployment", Label("int", "AtlasDeployment", "deployment-

By(fmt.Sprintf("Enable AutoScaling for the Advanced Deployment %s", kube.ObjectKeyFromObject(createdDeployment)), func() {
var err error
createdDeployment, err = retryUpdateOnConflict(ctx, k8sClient, client.ObjectKeyFromObject(createdDeployment), func(deployment *akov2.AtlasDeployment) {
createdDeployment, err = akoretry.RetryUpdateOnConflict(ctx, k8sClient, client.ObjectKeyFromObject(createdDeployment), func(deployment *akov2.AtlasDeployment) {
regionConfig := deployment.Spec.DeploymentSpec.ReplicationSpecs[0].RegionConfigs[0]
regionConfig.ElectableSpecs.InstanceSize = "M10"
regionConfig.ReadOnlySpecs.InstanceSize = "M10"
Expand Down Expand Up @@ -1100,7 +1100,7 @@ var _ = Describe("AtlasDeployment", Label("int", "AtlasDeployment", "deployment-

By(fmt.Sprintf("Update Instance Size Margins with AutoScaling for Deployment %s", kube.ObjectKeyFromObject(createdDeployment)), func() {
var err error
createdDeployment, err = retryUpdateOnConflict(ctx, k8sClient, client.ObjectKeyFromObject(createdDeployment), func(deployment *akov2.AtlasDeployment) {
createdDeployment, err = akoretry.RetryUpdateOnConflict(ctx, k8sClient, client.ObjectKeyFromObject(createdDeployment), func(deployment *akov2.AtlasDeployment) {
regionConfig := deployment.Spec.DeploymentSpec.ReplicationSpecs[0].RegionConfigs[0]
regionConfig.AutoScaling.Compute.MinInstanceSize = "M20"
regionConfig.ElectableSpecs.InstanceSize = "M20"
Expand Down Expand Up @@ -1179,7 +1179,7 @@ var _ = Describe("AtlasDeployment", Label("int", "AtlasDeployment", "deployment-
err := compat.JSONCopy(&previousDeployment, createdDeployment)
Expect(err).NotTo(HaveOccurred())

createdDeployment, err = retryUpdateOnConflict(ctx, k8sClient, client.ObjectKeyFromObject(createdDeployment), func(deployment *akov2.AtlasDeployment) {
createdDeployment, err = akoretry.RetryUpdateOnConflict(ctx, k8sClient, client.ObjectKeyFromObject(createdDeployment), func(deployment *akov2.AtlasDeployment) {
deployment.Spec.DeploymentSpec.ReplicationSpecs[0].
RegionConfigs[0].
AutoScaling.
Expand All @@ -1206,7 +1206,7 @@ var _ = Describe("AtlasDeployment", Label("int", "AtlasDeployment", "deployment-
err := compat.JSONCopy(&previousDeployment, createdDeployment)
Expect(err).NotTo(HaveOccurred())

createdDeployment, err = retryUpdateOnConflict(ctx, k8sClient, client.ObjectKeyFromObject(createdDeployment), func(deployment *akov2.AtlasDeployment) {
createdDeployment, err = akoretry.RetryUpdateOnConflict(ctx, k8sClient, client.ObjectKeyFromObject(createdDeployment), func(deployment *akov2.AtlasDeployment) {
deployment.Spec.DeploymentSpec.ReplicationSpecs[0].
RegionConfigs[0].
ElectableSpecs.InstanceSize = "M20"
Expand Down Expand Up @@ -1315,7 +1315,7 @@ var _ = Describe("AtlasDeployment", Label("int", "AtlasDeployment", "deployment-
//nolint:dupl
By("Updating the Instance tags with a duplicate key and removing all tags", func() {
var err error
createdDeployment, err = retryUpdateOnConflict(ctx, k8sClient, client.ObjectKeyFromObject(createdDeployment), func(deployment *akov2.AtlasDeployment) {
createdDeployment, err = akoretry.RetryUpdateOnConflict(ctx, k8sClient, client.ObjectKeyFromObject(createdDeployment), func(deployment *akov2.AtlasDeployment) {
deployment.Spec.ServerlessSpec.Tags = []*akov2.TagSpec{{Key: "test-1", Value: "value-1"}, {Key: "test-1", Value: "value-2"}}
})
Expect(err).To(BeNil())
Expand Down Expand Up @@ -1531,7 +1531,7 @@ var _ = Describe("AtlasDeployment", Ordered, Label("int", "AtlasDeployment", "de

Expect(k8sClient.Get(context.Background(), client.ObjectKeyFromObject(createdDeployment), createdDeployment)).Should(Succeed())
var err error
createdDeployment, err = retryUpdateOnConflict(ctx, k8sClient, client.ObjectKeyFromObject(createdDeployment), func(deployment *akov2.AtlasDeployment) {
createdDeployment, err = akoretry.RetryUpdateOnConflict(ctx, k8sClient, client.ObjectKeyFromObject(createdDeployment), func(deployment *akov2.AtlasDeployment) {
deployment.Spec.BackupScheduleRef = common.ResourceRefNamespaced{
Name: bScheduleName,
Namespace: namespace.Name,
Expand Down Expand Up @@ -1865,7 +1865,7 @@ func mergedAdvancedDeployment(
}

func performUpdate[T any](ctx context.Context, timeout time.Duration, key client.ObjectKey, mutator func(*T)) *T {
obj, err := retryUpdateOnConflict(ctx, k8sClient, key, mutator)
obj, err := akoretry.RetryUpdateOnConflict(ctx, k8sClient, key, mutator)
Expect(err).To(BeNil())

clientObj := any(obj).(api.AtlasCustomResource)
Expand All @@ -1875,16 +1875,3 @@ func performUpdate[T any](ctx context.Context, timeout time.Duration, key client

return obj
}

func retryUpdateOnConflict[T any](ctx context.Context, k8s client.Client, key client.ObjectKey, mutator func(*T)) (*T, error) {
var obj T
clientObj := any(&obj).(client.Object)
err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
if err := k8s.Get(ctx, key, clientObj); err != nil {
return err
}
mutator(&obj)
return k8s.Update(ctx, clientObj)
})
return &obj, err
}

0 comments on commit 37f229e

Please sign in to comment.