From 25c44660583a58ed44d75bcebfd08207c65bd4b5 Mon Sep 17 00:00:00 2001 From: Johannes Aubart Date: Mon, 23 Jun 2025 17:40:20 +0200 Subject: [PATCH 1/4] add retrying k8s client implementation --- docs/README.md | 1 + docs/libs/retry.md | 26 +++ pkg/clusters/cluster.go | 7 + pkg/pairs/pairs_test.go | 2 +- pkg/retry/retry.go | 324 ++++++++++++++++++++++++++++++++ pkg/retry/retry_test.go | 407 ++++++++++++++++++++++++++++++++++++++++ 6 files changed, 766 insertions(+), 1 deletion(-) create mode 100644 docs/libs/retry.md create mode 100644 pkg/retry/retry.go create mode 100644 pkg/retry/retry_test.go diff --git a/docs/README.md b/docs/README.md index 750b6d5..424e1e6 100644 --- a/docs/README.md +++ b/docs/README.md @@ -14,6 +14,7 @@ - [Key-Value Pairs](libs/pairs.md) - [Readiness Checks](libs/readiness.md) - [Kubernetes Resource Management](libs/resource.md) +- [Retrying k8s Operations](libs/retry.md) - [Kubernetes Resource Status Updating](libs/status.md) - [Testing](libs/testing.md) - [Thread Management](libs/threads.md) diff --git a/docs/libs/retry.md b/docs/libs/retry.md new file mode 100644 index 0000000..14b26b5 --- /dev/null +++ b/docs/libs/retry.md @@ -0,0 +1,26 @@ +# Retrying k8s Operations + +The `pkg/retry` package contains a `Client` that wraps a `client.Client` while implementing the interface itself and retries any failed (= the returned error is not `nil`) operation. +Methods that don't return an error are simply forwarded to the internal client. + +In addition to the `client.Client` interface's methods, the `retry.Client` also has `CreateOrUpdate` and `CreateOrPatch` methods, which use the corresponding controller-runtime implementations internally. + +The default retry parameters are: +- retry every 100 milliseconds +- don't increase retry interval +- no maximum number of attempts +- timeout after 1 second + +The `retry.Client` struct has builder-style methods to configure the parameters: +```golang +retryingClient := retry.NewRetryingClient(myClient). + WithTimeout(10 * time.Second). // try for at max 10 seconds + WithInterval(500 * time.Millisecond). // try every 500 milliseconds, but ... + WithBackoffMultiplier(2.0) // ... double the interval after each retry +``` + +For convenience, the `clusters.Cluster` type can return a retrying client for its internal client: +```golang +// cluster is of type *clusters.Cluster +err := cluster.Retry().WithMaxAttempts(3).Get(...) +``` diff --git a/pkg/clusters/cluster.go b/pkg/clusters/cluster.go index 91a8d65..a75e976 100644 --- a/pkg/clusters/cluster.go +++ b/pkg/clusters/cluster.go @@ -12,6 +12,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/cluster" "github.com/openmcp-project/controller-utils/pkg/controller" + "github.com/openmcp-project/controller-utils/pkg/retry" ) type Cluster struct { @@ -238,6 +239,12 @@ func (c *Cluster) APIServerEndpoint() string { return c.restCfg.Host } +// Retry returns a retrying client for the cluster. +// Returns nil if the client has not been initialized. +func (c *Cluster) Retry() *retry.Client { + return retry.NewRetryingClient(c.Client()) +} + ///////////////// // Serializing // ///////////////// diff --git a/pkg/pairs/pairs_test.go b/pkg/pairs/pairs_test.go index 0c68421..2e7e6c5 100644 --- a/pkg/pairs/pairs_test.go +++ b/pkg/pairs/pairs_test.go @@ -14,7 +14,7 @@ import ( func TestConditions(t *testing.T) { RegisterFailHandler(Fail) - RunSpecs(t, "ClusterAccess Test Suite") + RunSpecs(t, "Pairs Test Suite") } type comparableIntAlias int diff --git a/pkg/retry/retry.go b/pkg/retry/retry.go new file mode 100644 index 0000000..3edda49 --- /dev/null +++ b/pkg/retry/retry.go @@ -0,0 +1,324 @@ +package retry + +import ( + "context" + "reflect" + "time" + + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" +) + +type Client struct { + internal client.Client + interval time.Duration + backoffMultiplier float64 + maxAttempts int + timeout time.Duration +} + +// NewRetryingClient returns a retry.Client that implements client.Client, but retries each operation that can fail with the specified parameters. +// Returns nil if the provided client is nil. +// The default parameters are: +// - interval: 100 milliseconds +// - backoffMultiplier: 1.0 (no backoff) +// - maxAttempts: 0 (no limit on attempts) +// - timeout: 1 second (timeout for retries) +// Use the builder-style With... methods to adapt the parameters. +func NewRetryingClient(c client.Client) *Client { + if c == nil { + return nil + } + return &Client{ + internal: c, + interval: 100 * time.Millisecond, // default retry interval + backoffMultiplier: 1.0, // default backoff multiplier + maxAttempts: 0, // default max retries + timeout: 1 * time.Second, // default timeout for retries + } +} + +var _ client.Client = &Client{} + +///////////// +// GETTERS // +///////////// + +// Interval returns the configured retry interval. +func (rc *Client) Interval() time.Duration { + return rc.interval +} + +// BackoffMultiplier returns the configured backoff multiplier for retries. +func (rc *Client) BackoffMultiplier() float64 { + return rc.backoffMultiplier +} + +// MaxRetries returns the configured maximum number of retries. +func (rc *Client) MaxRetries() int { + return rc.maxAttempts +} + +// Timeout returns the configured timeout for retries. +func (rc *Client) Timeout() time.Duration { + return rc.timeout +} + +///////////// +// SETTERS // +///////////// + +// WithInterval sets the retry interval for the Client. +// Default is 100 milliseconds. +// Noop if the interval is less than or equal to 0. +// It returns the Client for chaining. +func (rc *Client) WithInterval(interval time.Duration) *Client { + if interval > 0 { + rc.interval = interval + } + return rc +} + +// WithBackoffMultiplier sets the backoff multiplier for the Client. +// After each retry, the configured interval is multiplied by this factor. +// Setting it to a value less than 1 will default it to 1. +// Default is 1.0, meaning no backoff. +// Noop if the multiplier is less than 1. +// It returns the Client for chaining. +func (rc *Client) WithBackoffMultiplier(multiplier float64) *Client { + if multiplier >= 1 { + rc.backoffMultiplier = multiplier + } + return rc +} + +// WithMaxAttempts sets the maximum number of attempts for the Client. +// If set to 0, it will retry indefinitely until the timeout is reached. +// Default is 0, meaning no limit on attempts. +// Noop if the maxAttempts is less than 0. +// It returns the Client for chaining. +func (rc *Client) WithMaxAttempts(maxAttempts int) *Client { + if maxAttempts >= 0 { + rc.maxAttempts = maxAttempts + } + return rc +} + +// WithTimeout sets the timeout for retries in the Client. +// If set to 0, there is no timeout and it will retry until the maximum number of retries is reached. +// Default is 1 second. +// Noop if the timeout is less than 0. +// It returns the Client for chaining. +func (rc *Client) WithTimeout(timeout time.Duration) *Client { + if timeout >= 0 { + rc.timeout = timeout + } + return rc +} + +/////////////////////////// +// CLIENT IMPLEMENTATION // +/////////////////////////// + +type operation struct { + parent *Client + interval time.Duration + attempts int + startTime time.Time + method reflect.Value + args []reflect.Value +} + +func (rc *Client) newOperation(method reflect.Value, args ...any) *operation { + op := &operation{ + parent: rc, + interval: rc.interval, + attempts: 0, + startTime: time.Now(), + method: method, + } + if method.Type().IsVariadic() { + argCountWithoutVariadic := len(args) - 1 + last := args[argCountWithoutVariadic] + lastVal := reflect.ValueOf(last) + argCountVariadic := lastVal.Len() + op.args = make([]reflect.Value, argCountWithoutVariadic+argCountVariadic) + for i, arg := range args[:argCountWithoutVariadic] { + op.args[i] = reflect.ValueOf(arg) + } + for i := range argCountVariadic { + op.args[argCountWithoutVariadic+i] = lastVal.Index(i) + } + } else { + op.args = make([]reflect.Value, len(args)) + for i, arg := range args { + op.args[i] = reflect.ValueOf(arg) + } + } + return op +} + +// try attempts the operation. +// The first return value indicates success (true) or failure (false). +// The second return value is the duration to wait before the next retry. +// +// If it is 0, no retry is needed. +// This can be because the operation succeeded, or because the timeout or retry limit was reached. +// +// The third return value contains the return values of the operation. +func (op *operation) try() (bool, time.Duration, []reflect.Value) { + res := op.method.Call(op.args) + + // check for success by converting the last return value to an error + success := true + if len(res) > 0 { + if err, ok := res[len(res)-1].Interface().(error); ok && err != nil { + success = false + } + } + + // if the operation succeeded, return true and no retry + if success { + return true, 0, res + } + + // if the operation failed, check if we should retry + op.attempts++ + retryAfter := op.interval + op.interval = time.Duration(float64(op.interval) * op.parent.backoffMultiplier) + if (op.parent.maxAttempts > 0 && op.attempts >= op.parent.maxAttempts) || (op.parent.timeout > 0 && time.Now().Add(retryAfter).After(op.startTime.Add(op.parent.timeout))) { + // if we reached the maximum number of retries or the next retry would exceed the timeout, return false and no retry + return false, 0, res + } + + return false, retryAfter, res +} + +// retry executes the given method with the provided arguments, retrying on failure. +func (rc *Client) retry(method reflect.Value, args ...any) []reflect.Value { + op := rc.newOperation(method, args...) + var ctx context.Context + if len(args) > 0 { + if ctxArg, ok := args[0].(context.Context); ok { + ctx = ctxArg + } + } + if ctx == nil { + ctx = context.Background() + } + if rc.Timeout() > 0 { + var cancel context.CancelFunc + ctx, cancel = context.WithDeadline(ctx, op.startTime.Add(rc.timeout)) + defer cancel() + } + interruptedOrTimeouted := ctx.Done() + success, retryAfter, res := op.try() + for !success && retryAfter > 0 { + opCtx, opCancel := context.WithTimeout(ctx, retryAfter) + expired := opCtx.Done() + select { + case <-interruptedOrTimeouted: + retryAfter = 0 // stop retrying if the context was cancelled + case <-expired: + success, retryAfter, res = op.try() + } + opCancel() + } + return res +} + +func errOrNil(val reflect.Value) error { + if val.IsNil() { + return nil + } + return val.Interface().(error) +} + +// CreateOrUpdate wraps the controllerutil.CreateOrUpdate function and retries it on failure. +func (rc *Client) CreateOrUpdate(ctx context.Context, obj client.Object, f controllerutil.MutateFn) (controllerutil.OperationResult, error) { + res := rc.retry(reflect.ValueOf(controllerutil.CreateOrUpdate), ctx, rc.internal, obj, f) + return res[0].Interface().(controllerutil.OperationResult), errOrNil(res[1]) +} + +// CreateOrPatch wraps the controllerutil.CreateOrPatch function and retries it on failure. +func (rc *Client) CreateOrPatch(ctx context.Context, obj client.Object, f controllerutil.MutateFn) (controllerutil.OperationResult, error) { + res := rc.retry(reflect.ValueOf(controllerutil.CreateOrPatch), ctx, rc.internal, obj, f) + return res[0].Interface().(controllerutil.OperationResult), errOrNil(res[1]) +} + +// Create wraps the client's Create method and retries it on failure. +func (rc *Client) Create(ctx context.Context, obj client.Object, opts ...client.CreateOption) error { + res := rc.retry(reflect.ValueOf(rc.internal.Create), ctx, obj, opts) + return errOrNil(res[0]) +} + +// Delete wraps the client's Delete method and retries it on failure. +func (rc *Client) Delete(ctx context.Context, obj client.Object, opts ...client.DeleteOption) error { + res := rc.retry(reflect.ValueOf(rc.internal.Delete), ctx, obj, opts) + return errOrNil(res[0]) +} + +// DeleteAllOf wraps the client's DeleteAllOf method and retries it on failure. +func (rc *Client) DeleteAllOf(ctx context.Context, obj client.Object, opts ...client.DeleteAllOfOption) error { + res := rc.retry(reflect.ValueOf(rc.internal.DeleteAllOf), ctx, obj, opts) + return errOrNil(res[0]) +} + +// Get wraps the client's Get method and retries it on failure. +func (rc *Client) Get(ctx context.Context, key client.ObjectKey, obj client.Object, opts ...client.GetOption) error { + res := rc.retry(reflect.ValueOf(rc.internal.Get), ctx, key, obj, opts) + return errOrNil(res[0]) +} + +// List wraps the client's List method and retries it on failure. +func (rc *Client) List(ctx context.Context, list client.ObjectList, opts ...client.ListOption) error { + res := rc.retry(reflect.ValueOf(rc.internal.List), ctx, list, opts) + return errOrNil(res[0]) +} + +// Patch wraps the client's Patch method and retries it on failure. +func (rc *Client) Patch(ctx context.Context, obj client.Object, patch client.Patch, opts ...client.PatchOption) error { + res := rc.retry(reflect.ValueOf(rc.internal.Patch), ctx, obj, patch, opts) + return errOrNil(res[0]) +} + +// Update wraps the client's Update method and retries it on failure. +func (rc *Client) Update(ctx context.Context, obj client.Object, opts ...client.UpdateOption) error { + res := rc.retry(reflect.ValueOf(rc.internal.Update), ctx, obj, opts) + return errOrNil(res[0]) +} + +// GroupVersionKindFor wraps the client's GroupVersionKindFor method and retries it on failure. +func (rc *Client) GroupVersionKindFor(obj runtime.Object) (schema.GroupVersionKind, error) { + res := rc.retry(reflect.ValueOf(rc.internal.GroupVersionKindFor), obj) + return res[0].Interface().(schema.GroupVersionKind), errOrNil(res[1]) +} + +// IsObjectNamespaced wraps the client's IsObjectNamespaced method and retries it on failure. +func (rc *Client) IsObjectNamespaced(obj runtime.Object) (bool, error) { + res := rc.retry(reflect.ValueOf(rc.internal.IsObjectNamespaced), obj) + return res[0].Interface().(bool), errOrNil(res[1]) +} + +// RESTMapper calls the internal client's RESTMapper method. +func (rc *Client) RESTMapper() meta.RESTMapper { + return rc.internal.RESTMapper() +} + +// Scheme calls the internal client's Scheme method. +func (rc *Client) Scheme() *runtime.Scheme { + return rc.internal.Scheme() +} + +// Status calls the internal client's Status method. +func (rc *Client) Status() client.SubResourceWriter { + return rc.internal.Status() +} + +// SubResource calls the internal client's SubResource method. +func (rc *Client) SubResource(subResource string) client.SubResourceClient { + return rc.internal.SubResource(subResource) +} diff --git a/pkg/retry/retry_test.go b/pkg/retry/retry_test.go new file mode 100644 index 0000000..d163307 --- /dev/null +++ b/pkg/retry/retry_test.go @@ -0,0 +1,407 @@ +package retry_test + +import ( + "context" + "fmt" + "testing" + "time" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + . "github.com/onsi/gomega/gstruct" + + corev1 "k8s.io/api/core/v1" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/interceptor" + + "github.com/openmcp-project/controller-utils/pkg/retry" + testutils "github.com/openmcp-project/controller-utils/pkg/testing" +) + +func TestConditions(t *testing.T) { + RegisterFailHandler(Fail) + + RunSpecs(t, "Retry Test Suite") +} + +// mockControl is a helper struct to control the behavior of the fake client. +// Each attempt will increase the 'attempts' counter. +// Returns a mockError if +// - 'fail' is less than 0 +// - 'fail' is greater than 0 and the number of attempts is less than or equal to 'fail' +// Returns nil otherwise. +type mockControl struct { + fail int + attempts int +} + +func (mc *mockControl) reset(failCount int) { + mc.fail = failCount + mc.attempts = 0 +} + +func (mc *mockControl) try() error { + mc.attempts++ + if mc.fail < 0 || (mc.fail > 0 && mc.attempts <= mc.fail) { + return errMock + } + return nil +} + +var errMock = fmt.Errorf("mock error") + +func defaultTestSetup() (*testutils.Environment, *mockControl) { + mc := &mockControl{} + return testutils.NewEnvironmentBuilder(). + WithFakeClient(nil). + WithFakeClientBuilderCall("WithInterceptorFuncs", interceptor.Funcs{ + Get: func(ctx context.Context, client client.WithWatch, key client.ObjectKey, obj client.Object, opts ...client.GetOption) error { + if err := mc.try(); err != nil { + return err + } + return client.Get(ctx, key, obj, opts...) + }, + List: func(ctx context.Context, client client.WithWatch, list client.ObjectList, opts ...client.ListOption) error { + if err := mc.try(); err != nil { + return err + } + return client.List(ctx, list, opts...) + }, + Create: func(ctx context.Context, client client.WithWatch, obj client.Object, opts ...client.CreateOption) error { + if err := mc.try(); err != nil { + return err + } + return client.Create(ctx, obj, opts...) + }, + Delete: func(ctx context.Context, client client.WithWatch, obj client.Object, opts ...client.DeleteOption) error { + if err := mc.try(); err != nil { + return err + } + return client.Delete(ctx, obj, opts...) + }, + DeleteAllOf: func(ctx context.Context, client client.WithWatch, obj client.Object, opts ...client.DeleteAllOfOption) error { + if err := mc.try(); err != nil { + return err + } + return client.DeleteAllOf(ctx, obj, opts...) + }, + Update: func(ctx context.Context, client client.WithWatch, obj client.Object, opts ...client.UpdateOption) error { + if err := mc.try(); err != nil { + return err + } + return client.Update(ctx, obj, opts...) + }, + Patch: func(ctx context.Context, client client.WithWatch, obj client.Object, patch client.Patch, opts ...client.PatchOption) error { + if err := mc.try(); err != nil { + return err + } + return client.Patch(ctx, obj, patch, opts...) + }, + }). + Build(), mc +} + +var _ = Describe("Client", func() { + + It("should not retry if the operation succeeds immediately", func() { + env, mc := defaultTestSetup() + c := retry.NewRetryingClient(env.Client()) + + // create a Namespace + ns := &corev1.Namespace{} + ns.Name = "test" + mc.reset(0) + Expect(c.Create(env.Ctx, ns)).To(Succeed()) + Expect(mc.attempts).To(Equal(1)) + + // get the Namespace + mc.reset(0) + Expect(c.Get(env.Ctx, client.ObjectKeyFromObject(ns), ns)).To(Succeed()) + Expect(mc.attempts).To(Equal(1)) + + // list Namespaces + mc.reset(0) + nsList := &corev1.NamespaceList{} + Expect(c.List(env.Ctx, nsList)).To(Succeed()) + Expect(mc.attempts).To(Equal(1)) + Expect(nsList.Items).To(ContainElement(MatchFields(IgnoreExtras, Fields{ + "ObjectMeta": MatchFields(IgnoreExtras, Fields{ + "Name": Equal("test"), + }), + }))) + + // update the Namespace + mc.reset(0) + ns.Labels = map[string]string{"test": "label"} + Expect(c.Update(env.Ctx, ns)).To(Succeed()) + Expect(mc.attempts).To(Equal(1)) + + // patch the Namespace + mc.reset(0) + old := ns.DeepCopy() + ns.Labels = nil + Expect(c.Patch(env.Ctx, ns, client.MergeFrom(old))).To(Succeed()) + Expect(mc.attempts).To(Equal(1)) + + // delete the Namespace + mc.reset(0) + Expect(c.Delete(env.Ctx, ns)).To(Succeed()) + Expect(mc.attempts).To(Equal(1)) + + // delete all Namespaces + mc.reset(0) + Expect(c.DeleteAllOf(env.Ctx, &corev1.Namespace{})).To(Succeed()) + Expect(mc.attempts).To(Equal(1)) + }) + + It("should retry if the operation does not succeed immediately", func() { + env, mc := defaultTestSetup() + c := retry.NewRetryingClient(env.Client()).WithMaxAttempts(5).WithTimeout(0) + + // create a Namespace + ns := &corev1.Namespace{} + ns.Name = "test" + mc.reset(2) + Expect(c.Create(env.Ctx, ns)).To(Succeed()) + Expect(mc.attempts).To(Equal(3)) + + // get the Namespace + mc.reset(2) + Expect(c.Get(env.Ctx, client.ObjectKeyFromObject(ns), ns)).To(Succeed()) + Expect(mc.attempts).To(Equal(3)) + + // list Namespaces + mc.reset(2) + nsList := &corev1.NamespaceList{} + Expect(c.List(env.Ctx, nsList)).To(Succeed()) + Expect(mc.attempts).To(Equal(3)) + Expect(nsList.Items).To(ContainElement(MatchFields(IgnoreExtras, Fields{ + "ObjectMeta": MatchFields(IgnoreExtras, Fields{ + "Name": Equal("test"), + }), + }))) + + // update the Namespace + mc.reset(2) + ns.Labels = map[string]string{"test": "label"} + Expect(c.Update(env.Ctx, ns)).To(Succeed()) + Expect(mc.attempts).To(Equal(3)) + + // patch the Namespace + mc.reset(2) + old := ns.DeepCopy() + ns.Labels = nil + Expect(c.Patch(env.Ctx, ns, client.MergeFrom(old))).To(Succeed()) + Expect(mc.attempts).To(Equal(3)) + + // delete the Namespace + mc.reset(2) + Expect(c.Delete(env.Ctx, ns)).To(Succeed()) + Expect(mc.attempts).To(Equal(3)) + + // delete all Namespaces + mc.reset(2) + Expect(c.DeleteAllOf(env.Ctx, &corev1.Namespace{})).To(Succeed()) + Expect(mc.attempts).To(Equal(3)) + }) + + It("should not retry more often than configured", func() { + env, mc := defaultTestSetup() + c := retry.NewRetryingClient(env.Client()).WithMaxAttempts(5).WithTimeout(0) + + // create a Namespace + ns := &corev1.Namespace{} + ns.Name = "test" + mc.reset(-1) + Expect(c.Create(env.Ctx, ns)).ToNot(Succeed()) + Expect(mc.attempts).To(Equal(5)) + + // get the Namespace + mc.reset(-1) + Expect(c.Get(env.Ctx, client.ObjectKeyFromObject(ns), ns)).ToNot(Succeed()) + Expect(mc.attempts).To(Equal(5)) + + // list Namespaces + mc.reset(-1) + nsList := &corev1.NamespaceList{} + Expect(c.List(env.Ctx, nsList)).ToNot(Succeed()) + Expect(mc.attempts).To(Equal(5)) + + // update the Namespace + mc.reset(-1) + ns.Labels = map[string]string{"test": "label"} + Expect(c.Update(env.Ctx, ns)).ToNot(Succeed()) + Expect(mc.attempts).To(Equal(5)) + + // patch the Namespace + mc.reset(-1) + old := ns.DeepCopy() + ns.Labels = nil + Expect(c.Patch(env.Ctx, ns, client.MergeFrom(old))).ToNot(Succeed()) + Expect(mc.attempts).To(Equal(5)) + + // delete the Namespace + mc.reset(-1) + Expect(c.Delete(env.Ctx, ns)).ToNot(Succeed()) + Expect(mc.attempts).To(Equal(5)) + + // delete all Namespaces + mc.reset(-1) + Expect(c.DeleteAllOf(env.Ctx, &corev1.Namespace{})).ToNot(Succeed()) + Expect(mc.attempts).To(Equal(5)) + }) + + It("should not retry longer than configured", func() { + env, mc := defaultTestSetup() + c := retry.NewRetryingClient(env.Client()).WithMaxAttempts(0).WithTimeout(500 * time.Millisecond) + + // for performance reasons, let's test this for Create only + ns := &corev1.Namespace{} + ns.Name = "test" + mc.reset(-1) + now := time.Now() + timeoutCtx, cancel := context.WithTimeout(env.Ctx, 1*time.Second) + defer cancel() + Expect(c.Create(timeoutCtx, ns)).ToNot(Succeed()) + after := time.Now() + Expect(after.Sub(now)).To(BeNumerically(">=", 400*time.Millisecond)) + Expect(after.Sub(now)).To(BeNumerically("<", 1*time.Second)) + Expect(mc.attempts).To(BeNumerically(">=", 4)) + Expect(mc.attempts).To(BeNumerically("<=", 5)) + }) + + It("should apply the backoff multiplier correctly", func() { + env, mc := defaultTestSetup() + c := retry.NewRetryingClient(env.Client()).WithMaxAttempts(0).WithTimeout(500 * time.Millisecond).WithBackoffMultiplier(3.0) + + // for performance reasons, let's test this for Create only + ns := &corev1.Namespace{} + ns.Name = "test" + mc.reset(-1) + now := time.Now() + timeoutCtx, cancel := context.WithTimeout(env.Ctx, 1*time.Second) + defer cancel() + Expect(c.Create(timeoutCtx, ns)).ToNot(Succeed()) + after := time.Now() + Expect(after.Sub(now)).To(BeNumerically(">=", 400*time.Millisecond)) + Expect(after.Sub(now)).To(BeNumerically("<", 1*time.Second)) + Expect(mc.attempts).To(BeNumerically("==", 3)) + }) + + It("should abort if the context is canceled", func() { + env, mc := defaultTestSetup() + c := retry.NewRetryingClient(env.Client()).WithMaxAttempts(0).WithTimeout(500 * time.Millisecond) + + // for performance reasons, let's test this for Create only + ns := &corev1.Namespace{} + ns.Name = "test" + mc.reset(-1) + now := time.Now() + timeoutCtx, cancel := context.WithTimeout(env.Ctx, 200*time.Millisecond) + defer cancel() + Expect(c.Create(timeoutCtx, ns)).ToNot(Succeed()) + after := time.Now() + Expect(after.Sub(now)).To(BeNumerically("<", 300*time.Millisecond)) + Expect(mc.attempts).To(BeNumerically("<=", 3)) + }) + + It("should pass the arguments through correctly", func() { + env, mc := defaultTestSetup() + c := retry.NewRetryingClient(env.Client()) + + // for performance reasons, let's test this for Create only + s1 := &corev1.Secret{} + s1.Name = "test" + s1.Namespace = "foo" + Expect(env.Client().Create(env.Ctx, s1)).To(Succeed()) + s2 := &corev1.Secret{} + s2.Name = "test" + s2.Namespace = "bar" + Expect(env.Client().Create(env.Ctx, s2)).To(Succeed()) + mc.reset(0) + l1 := &corev1.SecretList{} + Expect(c.List(env.Ctx, l1)).To(Succeed()) + Expect(mc.attempts).To(Equal(1)) + Expect(l1.Items).To(ConsistOf( + MatchFields(IgnoreExtras, Fields{ + "ObjectMeta": MatchFields(IgnoreExtras, Fields{ + "Name": Equal("test"), + "Namespace": Equal("foo"), + }), + }), + MatchFields(IgnoreExtras, Fields{ + "ObjectMeta": MatchFields(IgnoreExtras, Fields{ + "Name": Equal("test"), + "Namespace": Equal("bar"), + }), + }), + )) + mc.reset(0) + l2 := &corev1.SecretList{} + Expect(c.List(env.Ctx, l2, client.InNamespace("foo"))).To(Succeed()) + Expect(mc.attempts).To(Equal(1)) + Expect(l2.Items).To(ConsistOf( + MatchFields(IgnoreExtras, Fields{ + "ObjectMeta": MatchFields(IgnoreExtras, Fields{ + "Name": Equal("test"), + "Namespace": Equal("foo"), + }), + }), + )) + }) + + It("should correctly handle CreateOrUpdate and CreateOrPatch", func() { + env, mc := defaultTestSetup() + c := retry.NewRetryingClient(env.Client()).WithMaxAttempts(5).WithTimeout(0) + + // create or update namespace + // we cannot check mc.attempts here, because CreateOrUpdate calls multiple methods on the client internally + ns := &corev1.Namespace{} + ns.Name = "test" + mc.reset(0) + Expect(c.CreateOrUpdate(env.Ctx, ns, func() error { + return nil + })) + Expect(env.Client().Get(env.Ctx, client.ObjectKeyFromObject(ns), ns)).To(Succeed()) + mc.reset(0) + Expect(c.CreateOrUpdate(env.Ctx, ns, func() error { + ns.Labels = map[string]string{"test": "label"} + return nil + })) + Expect(env.Client().Get(env.Ctx, client.ObjectKeyFromObject(ns), ns)).To(Succeed()) + Expect(ns.Labels).To(HaveKeyWithValue("test", "label")) + mc.reset(2) + Expect(c.CreateOrUpdate(env.Ctx, ns, func() error { + ns.Labels = map[string]string{"test2": "label2"} + return nil + })) + Expect(env.Client().Get(env.Ctx, client.ObjectKeyFromObject(ns), ns)).To(Succeed()) + Expect(ns.Labels).To(HaveKeyWithValue("test2", "label2")) + Expect(env.Client().Delete(env.Ctx, ns)).To(Succeed()) + + // create or patch namespace + ns = &corev1.Namespace{} + ns.Name = "test" + mc.reset(0) + Expect(c.CreateOrPatch(env.Ctx, ns, func() error { + return nil + })) + Expect(env.Client().Get(env.Ctx, client.ObjectKeyFromObject(ns), ns)).To(Succeed()) + mc.reset(0) + Expect(c.CreateOrPatch(env.Ctx, ns, func() error { + ns.Labels = map[string]string{"test": "label"} + return nil + })) + Expect(env.Client().Get(env.Ctx, client.ObjectKeyFromObject(ns), ns)).To(Succeed()) + Expect(ns.Labels).To(HaveKeyWithValue("test", "label")) + mc.reset(2) + Expect(c.CreateOrUpdate(env.Ctx, ns, func() error { + ns.Labels = map[string]string{"test2": "label2"} + return nil + })) + Expect(env.Client().Get(env.Ctx, client.ObjectKeyFromObject(ns), ns)).To(Succeed()) + Expect(ns.Labels).To(HaveKeyWithValue("test2", "label2")) + Expect(env.Client().Delete(env.Ctx, ns)).To(Succeed()) + }) + +}) From 40620cffa1895cdf4b72ef2673e61f8a1c30e932 Mon Sep 17 00:00:00 2001 From: Johannes Aubart Date: Tue, 8 Jul 2025 13:09:39 +0200 Subject: [PATCH 2/4] implement review feedback --- pkg/retry/retry.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/retry/retry.go b/pkg/retry/retry.go index 3edda49..5c1649b 100644 --- a/pkg/retry/retry.go +++ b/pkg/retry/retry.go @@ -57,8 +57,8 @@ func (rc *Client) BackoffMultiplier() float64 { return rc.backoffMultiplier } -// MaxRetries returns the configured maximum number of retries. -func (rc *Client) MaxRetries() int { +// MaxAttempts returns the configured maximum number of retries. +func (rc *Client) MaxAttempts() int { return rc.maxAttempts } From 3c4db1e0264caa00856b3383f1a15144307c9e82 Mon Sep 17 00:00:00 2001 From: Valentin Gerlach Date: Fri, 11 Jul 2025 15:52:33 +0200 Subject: [PATCH 3/4] try approach without reflection --- pkg/retry/retry.go | 175 +++++++++++++++++++++------------------------ 1 file changed, 82 insertions(+), 93 deletions(-) diff --git a/pkg/retry/retry.go b/pkg/retry/retry.go index 5c1649b..e3178ed 100644 --- a/pkg/retry/retry.go +++ b/pkg/retry/retry.go @@ -2,7 +2,6 @@ package retry import ( "context" - "reflect" "time" "k8s.io/apimachinery/pkg/api/meta" @@ -123,42 +122,24 @@ func (rc *Client) WithTimeout(timeout time.Duration) *Client { // CLIENT IMPLEMENTATION // /////////////////////////// +type callbackFn func(ctx context.Context) error + type operation struct { parent *Client interval time.Duration attempts int startTime time.Time - method reflect.Value - args []reflect.Value + cfn callbackFn } -func (rc *Client) newOperation(method reflect.Value, args ...any) *operation { - op := &operation{ +func (rc *Client) newOperation(cfn callbackFn) *operation { + return &operation{ parent: rc, interval: rc.interval, attempts: 0, startTime: time.Now(), - method: method, + cfn: cfn, } - if method.Type().IsVariadic() { - argCountWithoutVariadic := len(args) - 1 - last := args[argCountWithoutVariadic] - lastVal := reflect.ValueOf(last) - argCountVariadic := lastVal.Len() - op.args = make([]reflect.Value, argCountWithoutVariadic+argCountVariadic) - for i, arg := range args[:argCountWithoutVariadic] { - op.args[i] = reflect.ValueOf(arg) - } - for i := range argCountVariadic { - op.args[argCountWithoutVariadic+i] = lastVal.Index(i) - } - } else { - op.args = make([]reflect.Value, len(args)) - for i, arg := range args { - op.args[i] = reflect.ValueOf(arg) - } - } - return op } // try attempts the operation. @@ -169,20 +150,12 @@ func (rc *Client) newOperation(method reflect.Value, args ...any) *operation { // This can be because the operation succeeded, or because the timeout or retry limit was reached. // // The third return value contains the return values of the operation. -func (op *operation) try() (bool, time.Duration, []reflect.Value) { - res := op.method.Call(op.args) - - // check for success by converting the last return value to an error - success := true - if len(res) > 0 { - if err, ok := res[len(res)-1].Interface().(error); ok && err != nil { - success = false - } - } +func (op *operation) try(ctx context.Context) (bool, time.Duration) { + err := op.cfn(ctx) // if the operation succeeded, return true and no retry - if success { - return true, 0, res + if err == nil { + return true, 0 } // if the operation failed, check if we should retry @@ -191,31 +164,22 @@ func (op *operation) try() (bool, time.Duration, []reflect.Value) { op.interval = time.Duration(float64(op.interval) * op.parent.backoffMultiplier) if (op.parent.maxAttempts > 0 && op.attempts >= op.parent.maxAttempts) || (op.parent.timeout > 0 && time.Now().Add(retryAfter).After(op.startTime.Add(op.parent.timeout))) { // if we reached the maximum number of retries or the next retry would exceed the timeout, return false and no retry - return false, 0, res + return false, 0 } - return false, retryAfter, res + return false, retryAfter } // retry executes the given method with the provided arguments, retrying on failure. -func (rc *Client) retry(method reflect.Value, args ...any) []reflect.Value { - op := rc.newOperation(method, args...) - var ctx context.Context - if len(args) > 0 { - if ctxArg, ok := args[0].(context.Context); ok { - ctx = ctxArg - } - } - if ctx == nil { - ctx = context.Background() - } +func (rc *Client) retry(ctx context.Context, cfn callbackFn) { + op := rc.newOperation(cfn) if rc.Timeout() > 0 { var cancel context.CancelFunc ctx, cancel = context.WithDeadline(ctx, op.startTime.Add(rc.timeout)) defer cancel() } interruptedOrTimeouted := ctx.Done() - success, retryAfter, res := op.try() + success, retryAfter := op.try(ctx) for !success && retryAfter > 0 { opCtx, opCancel := context.WithTimeout(ctx, retryAfter) expired := opCtx.Done() @@ -223,84 +187,109 @@ func (rc *Client) retry(method reflect.Value, args ...any) []reflect.Value { case <-interruptedOrTimeouted: retryAfter = 0 // stop retrying if the context was cancelled case <-expired: - success, retryAfter, res = op.try() + success, retryAfter = op.try(ctx) } opCancel() } - return res -} - -func errOrNil(val reflect.Value) error { - if val.IsNil() { - return nil - } - return val.Interface().(error) } // CreateOrUpdate wraps the controllerutil.CreateOrUpdate function and retries it on failure. -func (rc *Client) CreateOrUpdate(ctx context.Context, obj client.Object, f controllerutil.MutateFn) (controllerutil.OperationResult, error) { - res := rc.retry(reflect.ValueOf(controllerutil.CreateOrUpdate), ctx, rc.internal, obj, f) - return res[0].Interface().(controllerutil.OperationResult), errOrNil(res[1]) +func (rc *Client) CreateOrUpdate(ctx context.Context, obj client.Object, f controllerutil.MutateFn) (res controllerutil.OperationResult, err error) { + rc.retry(ctx, func(ctx context.Context) error { + res, err = controllerutil.CreateOrUpdate(ctx, rc.internal, obj, f) + return err + }) + return } // CreateOrPatch wraps the controllerutil.CreateOrPatch function and retries it on failure. -func (rc *Client) CreateOrPatch(ctx context.Context, obj client.Object, f controllerutil.MutateFn) (controllerutil.OperationResult, error) { - res := rc.retry(reflect.ValueOf(controllerutil.CreateOrPatch), ctx, rc.internal, obj, f) - return res[0].Interface().(controllerutil.OperationResult), errOrNil(res[1]) +func (rc *Client) CreateOrPatch(ctx context.Context, obj client.Object, f controllerutil.MutateFn) (res controllerutil.OperationResult, err error) { + rc.retry(ctx, func(ctx context.Context) error { + res, err = controllerutil.CreateOrPatch(ctx, rc.internal, obj, f) + return err + }) + return } // Create wraps the client's Create method and retries it on failure. -func (rc *Client) Create(ctx context.Context, obj client.Object, opts ...client.CreateOption) error { - res := rc.retry(reflect.ValueOf(rc.internal.Create), ctx, obj, opts) - return errOrNil(res[0]) +func (rc *Client) Create(ctx context.Context, obj client.Object, opts ...client.CreateOption) (err error) { + rc.retry(ctx, func(ctx context.Context) error { + err = rc.internal.Create(ctx, obj, opts...) + return err + }) + return } // Delete wraps the client's Delete method and retries it on failure. -func (rc *Client) Delete(ctx context.Context, obj client.Object, opts ...client.DeleteOption) error { - res := rc.retry(reflect.ValueOf(rc.internal.Delete), ctx, obj, opts) - return errOrNil(res[0]) +func (rc *Client) Delete(ctx context.Context, obj client.Object, opts ...client.DeleteOption) (err error) { + rc.retry(ctx, func(ctx context.Context) error { + err = rc.internal.Delete(ctx, obj, opts...) + return err + }) + return } // DeleteAllOf wraps the client's DeleteAllOf method and retries it on failure. -func (rc *Client) DeleteAllOf(ctx context.Context, obj client.Object, opts ...client.DeleteAllOfOption) error { - res := rc.retry(reflect.ValueOf(rc.internal.DeleteAllOf), ctx, obj, opts) - return errOrNil(res[0]) +func (rc *Client) DeleteAllOf(ctx context.Context, obj client.Object, opts ...client.DeleteAllOfOption) (err error) { + rc.retry(ctx, func(ctx context.Context) error { + err = rc.internal.DeleteAllOf(ctx, obj, opts...) + return err + }) + return } // Get wraps the client's Get method and retries it on failure. -func (rc *Client) Get(ctx context.Context, key client.ObjectKey, obj client.Object, opts ...client.GetOption) error { - res := rc.retry(reflect.ValueOf(rc.internal.Get), ctx, key, obj, opts) - return errOrNil(res[0]) +func (rc *Client) Get(ctx context.Context, key client.ObjectKey, obj client.Object, opts ...client.GetOption) (err error) { + rc.retry(ctx, func(ctx context.Context) error { + err = rc.internal.Get(ctx, key, obj, opts...) + return err + }) + return } // List wraps the client's List method and retries it on failure. -func (rc *Client) List(ctx context.Context, list client.ObjectList, opts ...client.ListOption) error { - res := rc.retry(reflect.ValueOf(rc.internal.List), ctx, list, opts) - return errOrNil(res[0]) +func (rc *Client) List(ctx context.Context, list client.ObjectList, opts ...client.ListOption) (err error) { + rc.retry(ctx, func(ctx context.Context) error { + err = rc.internal.List(ctx, list, opts...) + return err + }) + return } // Patch wraps the client's Patch method and retries it on failure. -func (rc *Client) Patch(ctx context.Context, obj client.Object, patch client.Patch, opts ...client.PatchOption) error { - res := rc.retry(reflect.ValueOf(rc.internal.Patch), ctx, obj, patch, opts) - return errOrNil(res[0]) +func (rc *Client) Patch(ctx context.Context, obj client.Object, patch client.Patch, opts ...client.PatchOption) (err error) { + rc.retry(ctx, func(ctx context.Context) error { + err = rc.internal.Patch(ctx, obj, patch, opts...) + return err + }) + return } // Update wraps the client's Update method and retries it on failure. -func (rc *Client) Update(ctx context.Context, obj client.Object, opts ...client.UpdateOption) error { - res := rc.retry(reflect.ValueOf(rc.internal.Update), ctx, obj, opts) - return errOrNil(res[0]) +func (rc *Client) Update(ctx context.Context, obj client.Object, opts ...client.UpdateOption) (err error) { + rc.retry(ctx, func(ctx context.Context) error { + err = rc.internal.Update(ctx, obj, opts...) + return err + }) + return } // GroupVersionKindFor wraps the client's GroupVersionKindFor method and retries it on failure. -func (rc *Client) GroupVersionKindFor(obj runtime.Object) (schema.GroupVersionKind, error) { - res := rc.retry(reflect.ValueOf(rc.internal.GroupVersionKindFor), obj) - return res[0].Interface().(schema.GroupVersionKind), errOrNil(res[1]) +func (rc *Client) GroupVersionKindFor(obj runtime.Object) (gvk schema.GroupVersionKind, err error) { + rc.retry(context.Background(), func(ctx context.Context) error { + gvk, err = rc.internal.GroupVersionKindFor(obj) + return err + }) + return } // IsObjectNamespaced wraps the client's IsObjectNamespaced method and retries it on failure. -func (rc *Client) IsObjectNamespaced(obj runtime.Object) (bool, error) { - res := rc.retry(reflect.ValueOf(rc.internal.IsObjectNamespaced), obj) - return res[0].Interface().(bool), errOrNil(res[1]) +func (rc *Client) IsObjectNamespaced(obj runtime.Object) (namespaced bool, err error) { + rc.retry(context.Background(), func(ctx context.Context) error { + namespaced, err = rc.internal.IsObjectNamespaced(obj) + return err + }) + return } // RESTMapper calls the internal client's RESTMapper method. From 8f900f705920af54d461514de96908ca6b0afcd6 Mon Sep 17 00:00:00 2001 From: Johannes Aubart Date: Mon, 14 Jul 2025 15:54:35 +0200 Subject: [PATCH 4/4] add possibility to inject context for cancellation into GroupVersionKindForObject and IsObjectNamespaced --- pkg/retry/retry.go | 26 ++++++++++++++++++++++++-- pkg/retry/retry_test.go | 25 +++++++++++++++++++++++++ 2 files changed, 49 insertions(+), 2 deletions(-) diff --git a/pkg/retry/retry.go b/pkg/retry/retry.go index e3178ed..8741b6d 100644 --- a/pkg/retry/retry.go +++ b/pkg/retry/retry.go @@ -17,6 +17,7 @@ type Client struct { backoffMultiplier float64 maxAttempts int timeout time.Duration + context context.Context } // NewRetryingClient returns a retry.Client that implements client.Client, but retries each operation that can fail with the specified parameters. @@ -37,6 +38,7 @@ func NewRetryingClient(c client.Client) *Client { backoffMultiplier: 1.0, // default backoff multiplier maxAttempts: 0, // default max retries timeout: 1 * time.Second, // default timeout for retries + context: context.Background(), // default context } } @@ -118,6 +120,25 @@ func (rc *Client) WithTimeout(timeout time.Duration) *Client { return rc } +// WithContext sets the context for the next call of either GroupVersionKindFor or IsObjectNamespaced. +// Since the signature of these methods does not allow passing a context, and the retrying can not be cancelled without one, +// this method is required to inject the context to be used for the aforementioned methods. +// Note that any function of this Client that actually retries an operation will reset this context, but only for GroupVersionKindFor and IsObjectNamespaced it will actually be used. +// The intended use of this method is something like this: +// +// c.WithContext(ctx).GroupVersionKindFor(obj) +// c.WithContext(ctx).IsObjectNamespaced(obj) +// +// If no context is injected via this method, both GroupVersionKindFor and IsObjectNamespaced will use the default context.Background(). +// It returns the Client for chaining. +func (rc *Client) WithContext(ctx context.Context) *Client { + if ctx == nil { + ctx = context.Background() // default to background context if nil + } + rc.context = ctx + return rc +} + /////////////////////////// // CLIENT IMPLEMENTATION // /////////////////////////// @@ -172,6 +193,7 @@ func (op *operation) try(ctx context.Context) (bool, time.Duration) { // retry executes the given method with the provided arguments, retrying on failure. func (rc *Client) retry(ctx context.Context, cfn callbackFn) { + rc.WithContext(context.Background()) // reset context op := rc.newOperation(cfn) if rc.Timeout() > 0 { var cancel context.CancelFunc @@ -276,7 +298,7 @@ func (rc *Client) Update(ctx context.Context, obj client.Object, opts ...client. // GroupVersionKindFor wraps the client's GroupVersionKindFor method and retries it on failure. func (rc *Client) GroupVersionKindFor(obj runtime.Object) (gvk schema.GroupVersionKind, err error) { - rc.retry(context.Background(), func(ctx context.Context) error { + rc.retry(rc.context, func(ctx context.Context) error { gvk, err = rc.internal.GroupVersionKindFor(obj) return err }) @@ -285,7 +307,7 @@ func (rc *Client) GroupVersionKindFor(obj runtime.Object) (gvk schema.GroupVersi // IsObjectNamespaced wraps the client's IsObjectNamespaced method and retries it on failure. func (rc *Client) IsObjectNamespaced(obj runtime.Object) (namespaced bool, err error) { - rc.retry(context.Background(), func(ctx context.Context) error { + rc.retry(rc.context, func(ctx context.Context) error { namespaced, err = rc.internal.IsObjectNamespaced(obj) return err }) diff --git a/pkg/retry/retry_test.go b/pkg/retry/retry_test.go index d163307..12445a5 100644 --- a/pkg/retry/retry_test.go +++ b/pkg/retry/retry_test.go @@ -305,6 +305,31 @@ var _ = Describe("Client", func() { Expect(mc.attempts).To(BeNumerically("<=", 3)) }) + It("should handle WithContext correctly", func() { + env, mc := defaultTestSetup() + c := retry.NewRetryingClient(env.Client()).WithMaxAttempts(0).WithTimeout(500 * time.Millisecond) + + type dummy struct { + corev1.Namespace + } + + mc.reset(-1) + now := time.Now() + timeoutCtx, cancel := context.WithTimeout(env.Ctx, 200*time.Millisecond) + defer cancel() + _, err := c.WithContext(timeoutCtx).GroupVersionKindFor(&dummy{}) + Expect(err).To(HaveOccurred()) + after := time.Now() + Expect(after.Sub(now)).To(BeNumerically("<", 300*time.Millisecond)) + + // should not use the same context again, it should have been reset + now = time.Now() + _, err = c.GroupVersionKindFor(&dummy{}) + Expect(err).To(HaveOccurred()) + after = time.Now() + Expect(after.Sub(now)).To(BeNumerically(">", 300*time.Millisecond)) + }) + It("should pass the arguments through correctly", func() { env, mc := defaultTestSetup() c := retry.NewRetryingClient(env.Client())