From 3c78f088d81f85285e762951b968964a17387efc Mon Sep 17 00:00:00 2001 From: Daniel Lipovetsky Date: Thu, 27 Jun 2024 17:53:17 -0700 Subject: [PATCH 1/2] feat: Add waiter for object Implements a wait for a check to pass against a typed object. --- pkg/wait/wait.go | 74 +++++++++++++++++++++++ pkg/wait/wait_test.go | 137 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 211 insertions(+) create mode 100644 pkg/wait/wait.go create mode 100644 pkg/wait/wait_test.go diff --git a/pkg/wait/wait.go b/pkg/wait/wait.go new file mode 100644 index 000000000..5f4b35d90 --- /dev/null +++ b/pkg/wait/wait.go @@ -0,0 +1,74 @@ +// Copyright 2024 Nutanix. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 +package wait + +import ( + "context" + "fmt" + "time" + + "k8s.io/apimachinery/pkg/util/wait" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +// CheckFailedError is used to determine whether the wait failed because wraps an error returned by a failed check. +type CheckFailedError struct { + cause error +} + +func (e *CheckFailedError) Error() string { + return fmt.Sprintf("check failed: %s", e.cause) +} + +func (e *CheckFailedError) Is(target error) bool { + _, ok := target.(*CheckFailedError) + return ok +} + +func (e *CheckFailedError) Unwrap() error { + return e.cause +} + +type ForObjectInput[T client.Object] struct { + Reader client.Reader + Target T + Check func(ctx context.Context, obj T) (bool, error) + Interval time.Duration + Timeout time.Duration +} + +func ForObject[T client.Object]( + ctx context.Context, + input ForObjectInput[T], +) error { + key := client.ObjectKeyFromObject(input.Target) + + var getErr error + waitErr := wait.PollUntilContextTimeout( + ctx, + input.Interval, + input.Timeout, + true, + func(checkCtx context.Context) (bool, error) { + if getErr = input.Reader.Get(checkCtx, key, input.Target); getErr != nil { + // Retry if get fails. + return false, nil + } + + if ok, err := input.Check(checkCtx, input.Target); err != nil { + return false, &CheckFailedError{cause: err} + } else { + // Retry if check fails. + return ok, nil + } + }) + + if wait.Interrupted(waitErr) { + if getErr != nil { + return fmt.Errorf("%w; last get error: %w", waitErr, getErr) + } + return fmt.Errorf("%w: check never passed", waitErr) + } + // waitErr is a CheckFailedError + return waitErr +} diff --git a/pkg/wait/wait_test.go b/pkg/wait/wait_test.go new file mode 100644 index 000000000..a03ac29d3 --- /dev/null +++ b/pkg/wait/wait_test.go @@ -0,0 +1,137 @@ +// Copyright 2024 Nutanix. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 +package wait + +import ( + "context" + "errors" + "fmt" + "testing" + "time" + + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/wait" + "sigs.k8s.io/controller-runtime/pkg/client/fake" +) + +func TestWait(t *testing.T) { + // We use the corev1.Namespace concrete type for the test, because we want to + // verify behavior for a concrete type, and because the Wait function is + // generic, and will behave identically for all concrete types. + type args struct { + input ForObjectInput[*corev1.Namespace] + } + tests := []struct { + name string + args args + errCheck func(error) bool + }{ + { + name: "time out while get fails; report get error", + args: args{ + input: ForObjectInput[*corev1.Namespace]{ + Reader: fake.NewFakeClient(), + Check: func(_ context.Context, _ *corev1.Namespace) (bool, error) { + return true, nil + }, + Interval: time.Nanosecond, + Timeout: time.Millisecond, + Target: &corev1.Namespace{ + TypeMeta: v1.TypeMeta{ + Kind: "Namespace", + APIVersion: "v1", + }, + ObjectMeta: v1.ObjectMeta{ + Name: "example", + }, + }, + }, + }, + errCheck: func(err error) bool { + return wait.Interrupted(err) && + apierrors.IsNotFound(err) + }, + }, + { + name: "time out while check returns false; no check error to report", + args: args{ + input: ForObjectInput[*corev1.Namespace]{ + Reader: fake.NewFakeClient( + &corev1.Namespace{ + TypeMeta: v1.TypeMeta{ + Kind: "Namespace", + APIVersion: "v1", + }, + ObjectMeta: v1.ObjectMeta{ + Name: "example", + }, + }, + ), + Check: func(_ context.Context, _ *corev1.Namespace) (bool, error) { + return false, nil + }, + Interval: time.Nanosecond, + Timeout: time.Millisecond, + Target: &corev1.Namespace{ + TypeMeta: v1.TypeMeta{ + Kind: "Namespace", + APIVersion: "v1", + }, + ObjectMeta: v1.ObjectMeta{ + Name: "example", + }, + }, + }, + }, + errCheck: wait.Interrupted, + }, + { + name: "return immediately when check returns an error; report the error", + args: args{ + input: ForObjectInput[*corev1.Namespace]{ + Reader: fake.NewFakeClient( + &corev1.Namespace{ + TypeMeta: v1.TypeMeta{ + Kind: "Namespace", + APIVersion: "v1", + }, + ObjectMeta: v1.ObjectMeta{ + Name: "example", + }, + }, + ), + Check: func(_ context.Context, _ *corev1.Namespace) (bool, error) { + return false, fmt.Errorf("condition failed") + }, + Interval: time.Nanosecond, + Timeout: time.Millisecond, Target: &corev1.Namespace{ + TypeMeta: v1.TypeMeta{ + Kind: "Namespace", + APIVersion: "v1", + }, + ObjectMeta: v1.ObjectMeta{ + Name: "example", + }, + }, + }, + }, + errCheck: func(err error) bool { + return errors.Is(err, &CheckFailedError{}) && + !wait.Interrupted(err) + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + err := ForObject( + context.Background(), + tt.args.input, + ) + if !tt.errCheck(err) { + t.Errorf("error did not pass check: %s", err) + } + }) + } +} From e57042be3512a2a4abf1afcd93a6737c0a0f78cb Mon Sep 17 00:00:00 2001 From: Daniel Lipovetsky Date: Tue, 2 Jul 2024 10:08:53 -0700 Subject: [PATCH 2/2] fixup! feat: Add waiter for object - Retry get only if object is not found, fail immediately otherwise. --- pkg/wait/wait.go | 7 +- pkg/wait/wait_test.go | 164 ++++++++++++++++++++++++++---------------- 2 files changed, 107 insertions(+), 64 deletions(-) diff --git a/pkg/wait/wait.go b/pkg/wait/wait.go index 5f4b35d90..674862ab7 100644 --- a/pkg/wait/wait.go +++ b/pkg/wait/wait.go @@ -7,6 +7,7 @@ import ( "fmt" "time" + apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/util/wait" "sigs.k8s.io/controller-runtime/pkg/client" ) @@ -51,8 +52,10 @@ func ForObject[T client.Object]( true, func(checkCtx context.Context) (bool, error) { if getErr = input.Reader.Get(checkCtx, key, input.Target); getErr != nil { - // Retry if get fails. - return false, nil + if apierrors.IsNotFound(getErr) { + return false, nil + } + return false, getErr } if ok, err := input.Check(checkCtx, input.Target); err != nil { diff --git a/pkg/wait/wait_test.go b/pkg/wait/wait_test.go index a03ac29d3..6debc479d 100644 --- a/pkg/wait/wait_test.go +++ b/pkg/wait/wait_test.go @@ -13,39 +13,58 @@ import ( apierrors "k8s.io/apimachinery/pkg/api/errors" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/wait" + "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/fake" ) +var errBrokenReader = errors.New("broken") + +type brokenReader struct{} + +func (r *brokenReader) Get( + ctx context.Context, + key client.ObjectKey, + obj client.Object, + opts ...client.GetOption, +) error { + return errBrokenReader +} + +func (r *brokenReader) List( + ctx context.Context, + list client.ObjectList, + opts ...client.ListOption, +) error { + return errBrokenReader +} + +var _ client.Reader = &brokenReader{} + func TestWait(t *testing.T) { - // We use the corev1.Namespace concrete type for the test, because we want to - // verify behavior for a concrete type, and because the Wait function is - // generic, and will behave identically for all concrete types. - type args struct { - input ForObjectInput[*corev1.Namespace] - } tests := []struct { - name string - args args + name string + // We use the corev1.Namespace concrete type for the test, because we want to + // verify behavior for a concrete type, and because the Wait function is + // generic, and will behave identically for all concrete types. + input ForObjectInput[*corev1.Namespace] errCheck func(error) bool }{ { - name: "time out while get fails; report get error", - args: args{ - input: ForObjectInput[*corev1.Namespace]{ - Reader: fake.NewFakeClient(), - Check: func(_ context.Context, _ *corev1.Namespace) (bool, error) { - return true, nil + name: "time out while get does not find object; report get error", + input: ForObjectInput[*corev1.Namespace]{ + Reader: fake.NewFakeClient(), + Check: func(_ context.Context, _ *corev1.Namespace) (bool, error) { + return true, nil + }, + Interval: time.Nanosecond, + Timeout: time.Millisecond, + Target: &corev1.Namespace{ + TypeMeta: v1.TypeMeta{ + Kind: "Namespace", + APIVersion: "v1", }, - Interval: time.Nanosecond, - Timeout: time.Millisecond, - Target: &corev1.Namespace{ - TypeMeta: v1.TypeMeta{ - Kind: "Namespace", - APIVersion: "v1", - }, - ObjectMeta: v1.ObjectMeta{ - Name: "example", - }, + ObjectMeta: v1.ObjectMeta{ + Name: "example", }, }, }, @@ -55,26 +74,35 @@ func TestWait(t *testing.T) { }, }, { - name: "time out while check returns false; no check error to report", - args: args{ - input: ForObjectInput[*corev1.Namespace]{ - Reader: fake.NewFakeClient( - &corev1.Namespace{ - TypeMeta: v1.TypeMeta{ - Kind: "Namespace", - APIVersion: "v1", - }, - ObjectMeta: v1.ObjectMeta{ - Name: "example", - }, - }, - ), - Check: func(_ context.Context, _ *corev1.Namespace) (bool, error) { - return false, nil + name: "return immediately when get fails; report get error", + input: ForObjectInput[*corev1.Namespace]{ + Reader: &brokenReader{}, + Check: func(_ context.Context, _ *corev1.Namespace) (bool, error) { + return true, nil + }, + Interval: time.Nanosecond, + Timeout: time.Millisecond, + Target: &corev1.Namespace{ + TypeMeta: v1.TypeMeta{ + Kind: "Namespace", + APIVersion: "v1", + }, + ObjectMeta: v1.ObjectMeta{ + Name: "example", }, - Interval: time.Nanosecond, - Timeout: time.Millisecond, - Target: &corev1.Namespace{ + }, + }, + errCheck: func(err error) bool { + return !wait.Interrupted(err) && + !apierrors.IsNotFound(err) && + errors.Is(err, errBrokenReader) + }, + }, + { + name: "time out while check returns false; no check error to report", + input: ForObjectInput[*corev1.Namespace]{ + Reader: fake.NewFakeClient( + &corev1.Namespace{ TypeMeta: v1.TypeMeta{ Kind: "Namespace", APIVersion: "v1", @@ -83,30 +111,29 @@ func TestWait(t *testing.T) { Name: "example", }, }, + ), + Check: func(_ context.Context, _ *corev1.Namespace) (bool, error) { + return false, nil + }, + Interval: time.Nanosecond, + Timeout: time.Millisecond, + Target: &corev1.Namespace{ + TypeMeta: v1.TypeMeta{ + Kind: "Namespace", + APIVersion: "v1", + }, + ObjectMeta: v1.ObjectMeta{ + Name: "example", + }, }, }, errCheck: wait.Interrupted, }, { name: "return immediately when check returns an error; report the error", - args: args{ - input: ForObjectInput[*corev1.Namespace]{ - Reader: fake.NewFakeClient( - &corev1.Namespace{ - TypeMeta: v1.TypeMeta{ - Kind: "Namespace", - APIVersion: "v1", - }, - ObjectMeta: v1.ObjectMeta{ - Name: "example", - }, - }, - ), - Check: func(_ context.Context, _ *corev1.Namespace) (bool, error) { - return false, fmt.Errorf("condition failed") - }, - Interval: time.Nanosecond, - Timeout: time.Millisecond, Target: &corev1.Namespace{ + input: ForObjectInput[*corev1.Namespace]{ + Reader: fake.NewFakeClient( + &corev1.Namespace{ TypeMeta: v1.TypeMeta{ Kind: "Namespace", APIVersion: "v1", @@ -115,6 +142,19 @@ func TestWait(t *testing.T) { Name: "example", }, }, + ), + Check: func(_ context.Context, _ *corev1.Namespace) (bool, error) { + return false, fmt.Errorf("condition failed") + }, + Interval: time.Nanosecond, + Timeout: time.Millisecond, Target: &corev1.Namespace{ + TypeMeta: v1.TypeMeta{ + Kind: "Namespace", + APIVersion: "v1", + }, + ObjectMeta: v1.ObjectMeta{ + Name: "example", + }, }, }, errCheck: func(err error) bool { @@ -127,7 +167,7 @@ func TestWait(t *testing.T) { t.Run(tt.name, func(t *testing.T) { err := ForObject( context.Background(), - tt.args.input, + tt.input, ) if !tt.errCheck(err) { t.Errorf("error did not pass check: %s", err)