diff --git a/test/helper/retry/retry.go b/test/helper/retry/retry.go new file mode 100644 index 0000000000..ea23cad7df --- /dev/null +++ b/test/helper/retry/retry.go @@ -0,0 +1,28 @@ +package retry + +import ( + "context" + + "k8s.io/client-go/util/retry" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +// RetryUpdateOnConflict is a wrapper around client-go/util/retry.RetryOnConflict, +// adding the following often repeated actions: +// +// 1. client.Get a resource for the given key +// 2. mutate the retrieved object using the given mutator function. +// 3. client.Update the updated resource and retry on conflict +// using the client-go/util/retry.DefaultRetry strategy. +func RetryUpdateOnConflict[T any](ctx context.Context, k8s client.Client, key client.ObjectKey, mutator func(*T)) (*T, error) { + var obj T + clientObj := any(&obj).(client.Object) + err := retry.RetryOnConflict(retry.DefaultRetry, func() error { + if err := k8s.Get(ctx, key, clientObj); err != nil { + return err + } + mutator(&obj) + return k8s.Update(ctx, clientObj) + }) + return &obj, err +} diff --git a/test/helper/retry/retry_test.go b/test/helper/retry/retry_test.go new file mode 100644 index 0000000000..4b826cf14a --- /dev/null +++ b/test/helper/retry/retry_test.go @@ -0,0 +1,113 @@ +package retry + +import ( + "context" + "errors" + "reflect" + "testing" + + "github.com/stretchr/testify/assert" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/fake" + "sigs.k8s.io/controller-runtime/pkg/client/interceptor" + + akov2 "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/api/v1" +) + +func TestRetryUpdateOnConflict(t *testing.T) { + for _, tc := range []struct { + name string + key client.ObjectKey + objects []client.Object + interceptorFuncs interceptor.Funcs + + want *akov2.AtlasProject + wantErr string + }{ + { + name: "fail immediately if not found", + key: types.NamespacedName{ + Name: "foo", + Namespace: "bar", + }, + want: &akov2.AtlasProject{}, + wantErr: "atlasprojects.atlas.mongodb.com \"foo\" not found", + }, + { + name: "succeed if found", + key: types.NamespacedName{ + Name: "foo", + Namespace: "bar", + }, + objects: []client.Object{ + &akov2.AtlasProject{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "bar"}}, + }, + want: &akov2.AtlasProject{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "bar"}}, + }, + { + name: "exhaust on conflict", + key: types.NamespacedName{ + Name: "foo", + Namespace: "bar", + }, + objects: []client.Object{ + &akov2.AtlasProject{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "bar"}}, + }, + interceptorFuncs: interceptor.Funcs{ + Update: func(context.Context, client.WithWatch, client.Object, ...client.UpdateOption) error { + return &apierrors.StatusError{ErrStatus: metav1.Status{Reason: metav1.StatusReasonConflict, Message: "conflict"}} + }, + }, + want: &akov2.AtlasProject{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "bar"}}, + wantErr: "conflict", + }, + { + name: "fail on any other update error", + key: types.NamespacedName{ + Name: "foo", + Namespace: "bar", + }, + objects: []client.Object{ + &akov2.AtlasProject{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "bar"}}, + }, + interceptorFuncs: interceptor.Funcs{ + Update: func(context.Context, client.WithWatch, client.Object, ...client.UpdateOption) error { + return errors.New("boom") + }, + }, + want: &akov2.AtlasProject{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "bar"}}, + wantErr: "boom", + }, + } { + t.Run(tc.name, func(t *testing.T) { + testScheme := runtime.NewScheme() + assert.NoError(t, akov2.AddToScheme(testScheme)) + k8sClient := fake.NewClientBuilder(). + WithScheme(testScheme). + WithObjects(tc.objects...). + WithInterceptorFuncs(tc.interceptorFuncs). + Build() + + got, err := RetryUpdateOnConflict(context.Background(), k8sClient, tc.key, func(*akov2.AtlasProject) {}) + gotErr := "" + if err != nil { + gotErr = err.Error() + } + + if gotErr != tc.wantErr { + t.Errorf("want error %q, got %q", tc.wantErr, gotErr) + } + + // ignore unnecessary fields + got.ResourceVersion = "" + + if !reflect.DeepEqual(got, tc.want) { + t.Errorf("want AtlasProject %+v, got %+v", tc.want, got) + } + }) + } +} diff --git a/test/int/deployment_test.go b/test/int/deployment_test.go index 42d09390c9..ceeba55f6d 100644 --- a/test/int/deployment_test.go +++ b/test/int/deployment_test.go @@ -18,7 +18,6 @@ import ( k8serrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" - "k8s.io/client-go/util/retry" "sigs.k8s.io/controller-runtime/pkg/client" "github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/compat" @@ -36,6 +35,7 @@ import ( "github.com/mongodb/mongodb-atlas-kubernetes/v2/test/helper/atlas" "github.com/mongodb/mongodb-atlas-kubernetes/v2/test/helper/conditions" "github.com/mongodb/mongodb-atlas-kubernetes/v2/test/helper/resources" + akoretry "github.com/mongodb/mongodb-atlas-kubernetes/v2/test/helper/retry" ) const ( @@ -293,7 +293,7 @@ var _ = Describe("AtlasDeployment", Label("int", "AtlasDeployment", "deployment- }) By("Filling token secret with invalid data", func() { - _, err := retryUpdateOnConflict(ctx, k8sClient, client.ObjectKeyFromObject(connectionSecret), func(secret *corev1.Secret) { + _, err := akoretry.RetryUpdateOnConflict(ctx, k8sClient, client.ObjectKeyFromObject(connectionSecret), func(secret *corev1.Secret) { secret.StringData = map[string]string{ OrgID: "fake", PrivateAPIKey: "fake", PublicAPIKey: "fake", } @@ -313,7 +313,7 @@ var _ = Describe("AtlasDeployment", Label("int", "AtlasDeployment", "deployment- }) By("Fix the token secret", func() { - _, err := retryUpdateOnConflict(ctx, k8sClient, types.NamespacedName{Namespace: namespace.Name, Name: ConnectionSecretName}, func(secret *corev1.Secret) { + _, err := akoretry.RetryUpdateOnConflict(ctx, k8sClient, types.NamespacedName{Namespace: namespace.Name, Name: ConnectionSecretName}, func(secret *corev1.Secret) { secret.StringData = secretData() }) Expect(err).To(BeNil()) @@ -764,7 +764,7 @@ var _ = Describe("AtlasDeployment", Label("int", "AtlasDeployment", "deployment- }) By("Updating the Deployment tags with a duplicate key and removing all tags", func() { - _, err := retryUpdateOnConflict(ctx, k8sClient, client.ObjectKeyFromObject(createdDeployment), func(deployment *akov2.AtlasDeployment) { + _, err := akoretry.RetryUpdateOnConflict(ctx, k8sClient, client.ObjectKeyFromObject(createdDeployment), func(deployment *akov2.AtlasDeployment) { deployment.Spec.DeploymentSpec.Tags = []*akov2.TagSpec{{Key: "test-1", Value: "value-1"}, {Key: "test-1", Value: "value-2"}} }) Expect(err).To(BeNil()) @@ -812,7 +812,7 @@ var _ = Describe("AtlasDeployment", Label("int", "AtlasDeployment", "deployment- }) By("Updating the Deployment configuration while paused (should fail)", func() { - _, err := retryUpdateOnConflict(ctx, k8sClient, client.ObjectKeyFromObject(createdDeployment), func(deployment *akov2.AtlasDeployment) { + _, err := akoretry.RetryUpdateOnConflict(ctx, k8sClient, client.ObjectKeyFromObject(createdDeployment), func(deployment *akov2.AtlasDeployment) { deployment.Spec.DeploymentSpec.BackupEnabled = pointer.MakePtr(false) }) Expect(err).To(BeNil()) @@ -854,7 +854,7 @@ var _ = Describe("AtlasDeployment", Label("int", "AtlasDeployment", "deployment- oldSizeName string err error ) - createdDeployment, err = retryUpdateOnConflict(ctx, k8sClient, client.ObjectKeyFromObject(createdDeployment), func(deployment *akov2.AtlasDeployment) { + createdDeployment, err = akoretry.RetryUpdateOnConflict(ctx, k8sClient, client.ObjectKeyFromObject(createdDeployment), func(deployment *akov2.AtlasDeployment) { oldSizeName = deployment.Spec.DeploymentSpec.ReplicationSpecs[0].RegionConfigs[0].ElectableSpecs.InstanceSize deployment.Spec.DeploymentSpec.ReplicationSpecs[0].RegionConfigs[0].ElectableSpecs = &akov2.Specs{ InstanceSize: "M42", @@ -1007,7 +1007,7 @@ var _ = Describe("AtlasDeployment", Label("int", "AtlasDeployment", "deployment- performCreate(createdDeployment, 30*time.Minute) var err error - createdDeployment, err = retryUpdateOnConflict(ctx, k8sClient, client.ObjectKeyFromObject(createdDeployment), func(deployment *akov2.AtlasDeployment) { + createdDeployment, err = akoretry.RetryUpdateOnConflict(ctx, k8sClient, client.ObjectKeyFromObject(createdDeployment), func(deployment *akov2.AtlasDeployment) { deployment.ObjectMeta.Annotations = map[string]string{customresource.ReconciliationPolicyAnnotation: customresource.ReconciliationPolicySkip} deployment.Spec.DeploymentSpec.Labels = append(createdDeployment.Spec.DeploymentSpec.Labels, common.LabelSpec{ Key: "some-key", @@ -1045,7 +1045,7 @@ var _ = Describe("AtlasDeployment", Label("int", "AtlasDeployment", "deployment- By(fmt.Sprintf("Updating the InstanceSize of Advanced Deployment %s", kube.ObjectKeyFromObject(createdDeployment)), func() { var err error - createdDeployment, err = retryUpdateOnConflict(ctx, k8sClient, client.ObjectKeyFromObject(createdDeployment), func(deployment *akov2.AtlasDeployment) { + createdDeployment, err = akoretry.RetryUpdateOnConflict(ctx, k8sClient, client.ObjectKeyFromObject(createdDeployment), func(deployment *akov2.AtlasDeployment) { deployment.Spec.DeploymentSpec.ReplicationSpecs[0].RegionConfigs[0].ElectableSpecs = &akov2.Specs{ InstanceSize: "M20", NodeCount: pointer.MakePtr(3), @@ -1071,7 +1071,7 @@ var _ = Describe("AtlasDeployment", Label("int", "AtlasDeployment", "deployment- By(fmt.Sprintf("Enable AutoScaling for the Advanced Deployment %s", kube.ObjectKeyFromObject(createdDeployment)), func() { var err error - createdDeployment, err = retryUpdateOnConflict(ctx, k8sClient, client.ObjectKeyFromObject(createdDeployment), func(deployment *akov2.AtlasDeployment) { + createdDeployment, err = akoretry.RetryUpdateOnConflict(ctx, k8sClient, client.ObjectKeyFromObject(createdDeployment), func(deployment *akov2.AtlasDeployment) { regionConfig := deployment.Spec.DeploymentSpec.ReplicationSpecs[0].RegionConfigs[0] regionConfig.ElectableSpecs.InstanceSize = "M10" regionConfig.ReadOnlySpecs.InstanceSize = "M10" @@ -1100,7 +1100,7 @@ var _ = Describe("AtlasDeployment", Label("int", "AtlasDeployment", "deployment- By(fmt.Sprintf("Update Instance Size Margins with AutoScaling for Deployment %s", kube.ObjectKeyFromObject(createdDeployment)), func() { var err error - createdDeployment, err = retryUpdateOnConflict(ctx, k8sClient, client.ObjectKeyFromObject(createdDeployment), func(deployment *akov2.AtlasDeployment) { + createdDeployment, err = akoretry.RetryUpdateOnConflict(ctx, k8sClient, client.ObjectKeyFromObject(createdDeployment), func(deployment *akov2.AtlasDeployment) { regionConfig := deployment.Spec.DeploymentSpec.ReplicationSpecs[0].RegionConfigs[0] regionConfig.AutoScaling.Compute.MinInstanceSize = "M20" regionConfig.ElectableSpecs.InstanceSize = "M20" @@ -1179,7 +1179,7 @@ var _ = Describe("AtlasDeployment", Label("int", "AtlasDeployment", "deployment- err := compat.JSONCopy(&previousDeployment, createdDeployment) Expect(err).NotTo(HaveOccurred()) - createdDeployment, err = retryUpdateOnConflict(ctx, k8sClient, client.ObjectKeyFromObject(createdDeployment), func(deployment *akov2.AtlasDeployment) { + createdDeployment, err = akoretry.RetryUpdateOnConflict(ctx, k8sClient, client.ObjectKeyFromObject(createdDeployment), func(deployment *akov2.AtlasDeployment) { deployment.Spec.DeploymentSpec.ReplicationSpecs[0]. RegionConfigs[0]. AutoScaling. @@ -1206,7 +1206,7 @@ var _ = Describe("AtlasDeployment", Label("int", "AtlasDeployment", "deployment- err := compat.JSONCopy(&previousDeployment, createdDeployment) Expect(err).NotTo(HaveOccurred()) - createdDeployment, err = retryUpdateOnConflict(ctx, k8sClient, client.ObjectKeyFromObject(createdDeployment), func(deployment *akov2.AtlasDeployment) { + createdDeployment, err = akoretry.RetryUpdateOnConflict(ctx, k8sClient, client.ObjectKeyFromObject(createdDeployment), func(deployment *akov2.AtlasDeployment) { deployment.Spec.DeploymentSpec.ReplicationSpecs[0]. RegionConfigs[0]. ElectableSpecs.InstanceSize = "M20" @@ -1315,7 +1315,7 @@ var _ = Describe("AtlasDeployment", Label("int", "AtlasDeployment", "deployment- //nolint:dupl By("Updating the Instance tags with a duplicate key and removing all tags", func() { var err error - createdDeployment, err = retryUpdateOnConflict(ctx, k8sClient, client.ObjectKeyFromObject(createdDeployment), func(deployment *akov2.AtlasDeployment) { + createdDeployment, err = akoretry.RetryUpdateOnConflict(ctx, k8sClient, client.ObjectKeyFromObject(createdDeployment), func(deployment *akov2.AtlasDeployment) { deployment.Spec.ServerlessSpec.Tags = []*akov2.TagSpec{{Key: "test-1", Value: "value-1"}, {Key: "test-1", Value: "value-2"}} }) Expect(err).To(BeNil()) @@ -1531,7 +1531,7 @@ var _ = Describe("AtlasDeployment", Ordered, Label("int", "AtlasDeployment", "de Expect(k8sClient.Get(context.Background(), client.ObjectKeyFromObject(createdDeployment), createdDeployment)).Should(Succeed()) var err error - createdDeployment, err = retryUpdateOnConflict(ctx, k8sClient, client.ObjectKeyFromObject(createdDeployment), func(deployment *akov2.AtlasDeployment) { + createdDeployment, err = akoretry.RetryUpdateOnConflict(ctx, k8sClient, client.ObjectKeyFromObject(createdDeployment), func(deployment *akov2.AtlasDeployment) { deployment.Spec.BackupScheduleRef = common.ResourceRefNamespaced{ Name: bScheduleName, Namespace: namespace.Name, @@ -1865,7 +1865,7 @@ func mergedAdvancedDeployment( } func performUpdate[T any](ctx context.Context, timeout time.Duration, key client.ObjectKey, mutator func(*T)) *T { - obj, err := retryUpdateOnConflict(ctx, k8sClient, key, mutator) + obj, err := akoretry.RetryUpdateOnConflict(ctx, k8sClient, key, mutator) Expect(err).To(BeNil()) clientObj := any(obj).(api.AtlasCustomResource) @@ -1875,16 +1875,3 @@ func performUpdate[T any](ctx context.Context, timeout time.Duration, key client return obj } - -func retryUpdateOnConflict[T any](ctx context.Context, k8s client.Client, key client.ObjectKey, mutator func(*T)) (*T, error) { - var obj T - clientObj := any(&obj).(client.Object) - err := retry.RetryOnConflict(retry.DefaultRetry, func() error { - if err := k8s.Get(ctx, key, clientObj); err != nil { - return err - } - mutator(&obj) - return k8s.Update(ctx, clientObj) - }) - return &obj, err -}