From bd0347b1db087bdb00a88bbbe466a0a8dd547e71 Mon Sep 17 00:00:00 2001 From: Pierangelo Di Pilato Date: Tue, 24 Sep 2024 07:45:42 +0200 Subject: [PATCH 1/3] Upgrade knative.dev/eventing to latest 1.14 Signed-off-by: Pierangelo Di Pilato --- go.mod | 2 +- go.sum | 4 +- .../eventing/pkg/apis/feature/features.go | 4 +- .../apis/sources/v1/container_lifecycle.go | 24 +--- .../eventing/pkg/auth/serviceaccount.go | 32 +++-- .../pkg/kncloudevents/event_dispatcher.go | 26 ++-- .../reconciler/testing/v1/containersource.go | 23 +-- .../eventing/pkg/scheduler/scheduler.go | 8 +- .../eventing/pkg/scheduler/state/helpers.go | 9 +- .../eventing/pkg/scheduler/state/state.go | 95 ++++++------- .../pkg/scheduler/statefulset/autoscaler.go | 104 +++++++------- .../pkg/scheduler/statefulset/scheduler.go | 133 +++++++++++------- .../test/rekt/features/trigger/feature.go | 3 +- .../test/rekt/resources/trigger/trigger.go | 12 +- .../test/rekt/resources/trigger/trigger.yaml | 2 +- vendor/modules.txt | 2 +- 16 files changed, 243 insertions(+), 240 deletions(-) diff --git a/go.mod b/go.mod index e42813fa74..c7b10d1d2b 100644 --- a/go.mod +++ b/go.mod @@ -35,7 +35,7 @@ require ( k8s.io/apiserver v0.29.2 k8s.io/client-go v0.29.2 k8s.io/utils v0.0.0-20240102154912-e7106e64919e - knative.dev/eventing v0.41.0 + knative.dev/eventing v0.41.7-0.20240923180940-09cb6334226f knative.dev/hack v0.0.0-20240404013450-1133b37da8d7 knative.dev/pkg v0.0.0-20240416145024-0f34a8815650 knative.dev/reconciler-test v0.0.0-20240417065737-ca905cbb09a9 diff --git a/go.sum b/go.sum index ad939fc3a7..4f9507f381 100644 --- a/go.sum +++ b/go.sum @@ -1270,8 +1270,8 @@ k8s.io/utils v0.0.0-20200912215256-4140de9c8800/go.mod h1:jPW/WVKK9YHAvNhRxK0md/ k8s.io/utils v0.0.0-20210819203725-bdf08cb9a70a/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA= k8s.io/utils v0.0.0-20240102154912-e7106e64919e h1:eQ/4ljkx21sObifjzXwlPKpdGLrCfRziVtos3ofG/sQ= k8s.io/utils v0.0.0-20240102154912-e7106e64919e/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0= -knative.dev/eventing v0.41.0 h1:e38nejJiwEpFQI5JgcaT8JRUGZsQn05h1vBWSwNkroY= -knative.dev/eventing v0.41.0/go.mod h1:/DjKZGRcZtBx8FOSvOy3mhxzm1Wem7H3aofb6kHq/68= +knative.dev/eventing v0.41.7-0.20240923180940-09cb6334226f h1:OEpj00D31CAlJTm8xfimpK1haCrDIzVP+3axlNgY4bE= +knative.dev/eventing v0.41.7-0.20240923180940-09cb6334226f/go.mod h1:JjZn90agmsqMIQOqZ4oToWRrcAIvlqn7hlharxcTv/w= knative.dev/hack v0.0.0-20240404013450-1133b37da8d7 h1:fkWYWvdHm1mVHevKW2vVJnZtxH0NzOlux8imesweKwE= knative.dev/hack v0.0.0-20240404013450-1133b37da8d7/go.mod h1:yk2OjGDsbEnQjfxdm0/HJKS2WqTLEFg/N6nUs6Rqx3Q= knative.dev/pkg v0.0.0-20240416145024-0f34a8815650 h1:m2ahFUO0L2VrgGDYdyOUFdE6xBd3pLXAJozLJwqLRQM= diff --git a/vendor/knative.dev/eventing/pkg/apis/feature/features.go b/vendor/knative.dev/eventing/pkg/apis/feature/features.go index 9fc57664e5..95ca615118 100644 --- a/vendor/knative.dev/eventing/pkg/apis/feature/features.go +++ b/vendor/knative.dev/eventing/pkg/apis/feature/features.go @@ -18,6 +18,7 @@ package feature import ( "fmt" + "log" "strings" corev1 "k8s.io/api/core/v1" @@ -148,7 +149,8 @@ func NewFlagsConfigFromMap(data map[string]string) (Flags, error) { } else if strings.Contains(k, NodeSelectorLabel) { flags[sanitizedKey] = Flag(v) } else { - return flags, fmt.Errorf("cannot parse the feature flag '%s' = '%s'", k, v) + flags[k] = Flag(v) + log.Printf("Warning: unknown feature flag value %q=%q\n", k, v) } } diff --git a/vendor/knative.dev/eventing/pkg/apis/sources/v1/container_lifecycle.go 
b/vendor/knative.dev/eventing/pkg/apis/sources/v1/container_lifecycle.go index 68a4c829c5..0f47a4e974 100644 --- a/vendor/knative.dev/eventing/pkg/apis/sources/v1/container_lifecycle.go +++ b/vendor/knative.dev/eventing/pkg/apis/sources/v1/container_lifecycle.go @@ -31,14 +31,11 @@ const ( // ContainerSourceConditionReceiveAdapterReady has status True when the ContainerSource's ReceiveAdapter is ready. ContainerSourceConditionReceiveAdapterReady apis.ConditionType = "ReceiveAdapterReady" - - ContainerConditionOIDCIdentityCreated apis.ConditionType = "OIDCIdentityCreated" ) var containerCondSet = apis.NewLivingConditionSet( ContainerSourceConditionSinkBindingReady, ContainerSourceConditionReceiveAdapterReady, - ContainerConditionOIDCIdentityCreated, ) // GetConditionSet retrieves the condition set for this resource. Implements the KRShaped interface. @@ -66,23 +63,7 @@ func (s *ContainerSourceStatus) InitializeConditions() { containerCondSet.Manage(s).InitializeConditions() } -func (s *ContainerSourceStatus) MarkOIDCIdentityCreatedSucceeded() { - containerCondSet.Manage(s).MarkTrue(ContainerConditionOIDCIdentityCreated) -} - -func (s *ContainerSourceStatus) MarkOIDCIdentityCreatedSucceededWithReason(reason, messageFormat string, messageA ...interface{}) { - containerCondSet.Manage(s).MarkTrueWithReason(ContainerConditionOIDCIdentityCreated, reason, messageFormat, messageA...) -} - -func (s *ContainerSourceStatus) MarkOIDCIdentityCreatedFailed(reason, messageFormat string, messageA ...interface{}) { - containerCondSet.Manage(s).MarkFalse(ContainerConditionOIDCIdentityCreated, reason, messageFormat, messageA...) -} - -func (s *ContainerSourceStatus) MarkOIDCIdentityCreatedUnknown(reason, messageFormat string, messageA ...interface{}) { - containerCondSet.Manage(s).MarkUnknown(ContainerConditionOIDCIdentityCreated, reason, messageFormat, messageA...) -} - -// PropagateSinkBindingStatus uses the availability of the provided Deployment to determine if +// PropagateSinkBindingStatus uses the SinkBinding to determine if // ContainerSourceConditionSinkBindingReady should be marked as true, false or unknown. 
func (s *ContainerSourceStatus) PropagateSinkBindingStatus(status *SinkBindingStatus) { // Do not copy conditions nor observedGeneration @@ -105,6 +86,9 @@ func (s *ContainerSourceStatus) PropagateSinkBindingStatus(status *SinkBindingSt default: containerCondSet.Manage(s).MarkUnknown(ContainerSourceConditionSinkBindingReady, cond.Reason, cond.Message) } + + // Propagate SinkBindings AuthStatus to containersources AuthStatus + s.Auth = status.Auth } // PropagateReceiveAdapterStatus uses the availability of the provided Deployment to determine if diff --git a/vendor/knative.dev/eventing/pkg/auth/serviceaccount.go b/vendor/knative.dev/eventing/pkg/auth/serviceaccount.go index b67666ef6a..5b98d61c79 100644 --- a/vendor/knative.dev/eventing/pkg/auth/serviceaccount.go +++ b/vendor/knative.dev/eventing/pkg/auth/serviceaccount.go @@ -21,11 +21,13 @@ import ( "fmt" "strings" - "knative.dev/eventing/pkg/apis/feature" + "k8s.io/apimachinery/pkg/api/equality" duckv1 "knative.dev/pkg/apis/duck/v1" "knative.dev/pkg/kmeta" pkgreconciler "knative.dev/pkg/reconciler" + "knative.dev/eventing/pkg/apis/feature" + "go.uber.org/zap" v1 "k8s.io/api/core/v1" apierrs "k8s.io/apimachinery/pkg/api/errors" @@ -38,10 +40,10 @@ import ( ) const ( - //OIDCLabelKey is used to filter out all the informers that related to OIDC work - OIDCLabelKey = "oidc" + // OIDCLabelKey is used to filter out all the informers that related to OIDC work + OIDCLabelKey = "eventing.knative.dev/oidc" - // OIDCTokenRoleLabelSelector is the label selector for the OIDC token creator role and rolebinding informers + // OIDCLabelSelector is the label selector for the OIDC resources OIDCLabelSelector = OIDCLabelKey ) @@ -87,28 +89,38 @@ func EnsureOIDCServiceAccountExistsForResource(ctx context.Context, serviceAccou saName := GetOIDCServiceAccountNameForResource(gvk, objectMeta) sa, err := serviceAccountLister.ServiceAccounts(objectMeta.Namespace).Get(saName) + expected := GetOIDCServiceAccountForResource(gvk, objectMeta) + // If the resource doesn't exist, we'll create it. 
if apierrs.IsNotFound(err) { logging.FromContext(ctx).Debugw("Creating OIDC service account", zap.Error(err)) - expected := GetOIDCServiceAccountForResource(gvk, objectMeta) - _, err = kubeclient.CoreV1().ServiceAccounts(objectMeta.Namespace).Create(ctx, expected, metav1.CreateOptions{}) if err != nil { - return fmt.Errorf("could not create OIDC service account %s/%s for %s: %w", objectMeta.Name, objectMeta.Namespace, gvk.Kind, err) + return fmt.Errorf("could not create OIDC service account %s/%s for %s: %w", objectMeta.Namespace, objectMeta.Name, gvk.Kind, err) } return nil } - if err != nil { - return fmt.Errorf("could not get OIDC service account %s/%s for %s: %w", objectMeta.Name, objectMeta.Namespace, gvk.Kind, err) + return fmt.Errorf("could not get OIDC service account %s/%s for %s: %w", objectMeta.Namespace, objectMeta.Name, gvk.Kind, err) } - if !metav1.IsControlledBy(&sa.ObjectMeta, &objectMeta) { return fmt.Errorf("service account %s not owned by %s %s", sa.Name, gvk.Kind, objectMeta.Name) } + if !equality.Semantic.DeepDerivative(expected, sa) { + expected.ResourceVersion = sa.ResourceVersion + + _, err = kubeclient.CoreV1().ServiceAccounts(objectMeta.Namespace).Update(ctx, expected, metav1.UpdateOptions{}) + if err != nil { + return fmt.Errorf("could not update OIDC service account %s/%s for %s: %w", objectMeta.Namespace, objectMeta.Name, gvk.Kind, err) + } + + return nil + + } + return nil } diff --git a/vendor/knative.dev/eventing/pkg/kncloudevents/event_dispatcher.go b/vendor/knative.dev/eventing/pkg/kncloudevents/event_dispatcher.go index 54b1227670..a62cddd764 100644 --- a/vendor/knative.dev/eventing/pkg/kncloudevents/event_dispatcher.go +++ b/vendor/knative.dev/eventing/pkg/kncloudevents/event_dispatcher.go @@ -26,9 +26,10 @@ import ( "net/http" "time" + "github.com/cloudevents/sdk-go/v2/binding/buffering" + cloudevents "github.com/cloudevents/sdk-go/v2" "github.com/cloudevents/sdk-go/v2/binding" - "github.com/cloudevents/sdk-go/v2/binding/buffering" "github.com/cloudevents/sdk-go/v2/event" cehttp "github.com/cloudevents/sdk-go/v2/protocol/http" "github.com/hashicorp/go-retryablehttp" @@ -248,7 +249,11 @@ func (d *Dispatcher) send(ctx context.Context, message binding.Message, destinat messagesToFinish = append(messagesToFinish, responseMessage) if config.eventTypeAutoHandler != nil { - d.handleAutocreate(ctx, responseMessage, config) + // messages can only be read once, so we need to make a copy of it + responseMessage, err = buffering.CopyMessage(ctx, responseMessage) + if err == nil { + d.handleAutocreate(ctx, responseMessage, config) + } } if config.reply == nil { @@ -320,11 +325,11 @@ func (d *Dispatcher) executeRequest(ctx context.Context, target duckv1.Addressab dispatchInfo.ResponseHeader = response.Header body := new(bytes.Buffer) - _, readErr := body.ReadFrom(response.Body) + _, err = body.ReadFrom(response.Body) if isFailure(response.StatusCode) { // Read response body into dispatchInfo for failures - if readErr != nil && readErr != io.EOF { + if err != nil && err != io.EOF { dispatchInfo.ResponseBody = []byte(fmt.Sprintf("dispatch resulted in status \"%s\". 
Could not read response body: error: %s", response.Status, err.Error())) } else { dispatchInfo.ResponseBody = body.Bytes() @@ -336,7 +341,7 @@ func (d *Dispatcher) executeRequest(ctx context.Context, target duckv1.Addressab } var responseMessageBody []byte - if readErr != nil && readErr != io.EOF { + if err != nil && err != io.EOF { responseMessageBody = []byte(fmt.Sprintf("Failed to read response body: %s", err.Error())) } else { responseMessageBody = body.Bytes() @@ -354,15 +359,8 @@ func (d *Dispatcher) executeRequest(ctx context.Context, target duckv1.Addressab return ctx, responseMessage, &dispatchInfo, nil } -func (d *Dispatcher) handleAutocreate(ctx context.Context, responseMessage binding.Message, config *senderConfig) { - // messages can only be read once, so we need to make a copy of it - messageCopy, err := buffering.CopyMessage(ctx, responseMessage) - if err != nil { - return - } - defer responseMessage.Finish(nil) - - responseEvent, err := binding.ToEvent(ctx, messageCopy) +func (d *Dispatcher) handleAutocreate(ctx context.Context, msg binding.Message, config *senderConfig) { + responseEvent, err := binding.ToEvent(ctx, msg) if err != nil { return } diff --git a/vendor/knative.dev/eventing/pkg/reconciler/testing/v1/containersource.go b/vendor/knative.dev/eventing/pkg/reconciler/testing/v1/containersource.go index d02a791750..99729da170 100644 --- a/vendor/knative.dev/eventing/pkg/reconciler/testing/v1/containersource.go +++ b/vendor/knative.dev/eventing/pkg/reconciler/testing/v1/containersource.go @@ -18,12 +18,11 @@ package testing import ( "context" - "fmt" - "knative.dev/eventing/pkg/apis/feature" - v1 "knative.dev/eventing/pkg/apis/sources/v1" duckv1 "knative.dev/pkg/apis/duck/v1" + v1 "knative.dev/eventing/pkg/apis/sources/v1" + appsv1 "k8s.io/api/apps/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" @@ -96,24 +95,6 @@ func WithContainerUnobservedGeneration() ContainerSourceOption { } } -func WithContainerSourceOIDCIdentityCreatedSucceeded() ContainerSourceOption { - return func(c *v1.ContainerSource) { - c.Status.MarkOIDCIdentityCreatedSucceeded() - } -} - -func WithContainerSourceOIDCIdentityCreatedSucceededBecauseOIDCFeatureDisabled() ContainerSourceOption { - return func(c *v1.ContainerSource) { - c.Status.MarkOIDCIdentityCreatedSucceededWithReason(fmt.Sprintf("%s feature disabled", feature.OIDCAuthentication), "") - } -} - -func WithContainerSourceOIDCIdentityCreatedFailed(reason, message string) ContainerSourceOption { - return func(c *v1.ContainerSource) { - c.Status.MarkOIDCIdentityCreatedFailed(reason, message) - } -} - func WithContainerSourceOIDCServiceAccountName(name string) ContainerSourceOption { return func(c *v1.ContainerSource) { if c.Status.Auth == nil { diff --git a/vendor/knative.dev/eventing/pkg/scheduler/scheduler.go b/vendor/knative.dev/eventing/pkg/scheduler/scheduler.go index bc2f043db1..c67ea2b99a 100644 --- a/vendor/knative.dev/eventing/pkg/scheduler/scheduler.go +++ b/vendor/knative.dev/eventing/pkg/scheduler/scheduler.go @@ -92,18 +92,18 @@ type Evictor func(pod *corev1.Pod, vpod VPod, from *duckv1alpha1.Placement) erro // Scheduler is responsible for placing VPods into real Kubernetes pods type Scheduler interface { // Schedule computes the new set of placements for vpod. - Schedule(vpod VPod) ([]duckv1alpha1.Placement, error) + Schedule(ctx context.Context, vpod VPod) ([]duckv1alpha1.Placement, error) } // SchedulerFunc type is an adapter to allow the use of // ordinary functions as Schedulers. 
If f is a function // with the appropriate signature, SchedulerFunc(f) is a // Scheduler that calls f. -type SchedulerFunc func(vpod VPod) ([]duckv1alpha1.Placement, error) +type SchedulerFunc func(ctx context.Context, vpod VPod) ([]duckv1alpha1.Placement, error) // Schedule implements the Scheduler interface. -func (f SchedulerFunc) Schedule(vpod VPod) ([]duckv1alpha1.Placement, error) { - return f(vpod) +func (f SchedulerFunc) Schedule(ctx context.Context, vpod VPod) ([]duckv1alpha1.Placement, error) { + return f(ctx, vpod) } // VPod represents virtual replicas placed into real Kubernetes pods diff --git a/vendor/knative.dev/eventing/pkg/scheduler/state/helpers.go b/vendor/knative.dev/eventing/pkg/scheduler/state/helpers.go index 3f8670e7e7..475d2974cd 100644 --- a/vendor/knative.dev/eventing/pkg/scheduler/state/helpers.go +++ b/vendor/knative.dev/eventing/pkg/scheduler/state/helpers.go @@ -25,6 +25,7 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/wait" + "knative.dev/eventing/pkg/scheduler" ) @@ -55,10 +56,10 @@ func SatisfyZoneAvailability(feasiblePods []int32, states *State) bool { var zoneName string var err error for _, podID := range feasiblePods { - wait.PollUntilContextTimeout(context.Background(), 50*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (bool, error) { - zoneName, _, err = states.GetPodInfo(PodNameFromOrdinal(states.StatefulSetName, podID)) - return err == nil, nil - }) + zoneName, _, err = states.GetPodInfo(PodNameFromOrdinal(states.StatefulSetName, podID)) + if err != nil { + continue + } zoneMap[zoneName] = struct{}{} } return len(zoneMap) == int(states.NumZones) diff --git a/vendor/knative.dev/eventing/pkg/scheduler/state/state.go b/vendor/knative.dev/eventing/pkg/scheduler/state/state.go index 04794805a3..6cf06e93e1 100644 --- a/vendor/knative.dev/eventing/pkg/scheduler/state/state.go +++ b/vendor/knative.dev/eventing/pkg/scheduler/state/state.go @@ -22,7 +22,6 @@ import ( "errors" "math" "strconv" - "time" "go.uber.org/zap" v1 "k8s.io/api/core/v1" @@ -30,9 +29,7 @@ import ( "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" - "k8s.io/apimachinery/pkg/util/wait" corev1 "k8s.io/client-go/listers/core/v1" - "knative.dev/pkg/logging" "knative.dev/eventing/pkg/scheduler" @@ -42,7 +39,7 @@ type StateAccessor interface { // State returns the current state (snapshot) about placed vpods // Take into account reserved vreplicas and update `reserved` to reflect // the current state. 
- State(reserved map[types.NamespacedName]map[string]int32) (*State, error) + State(ctx context.Context, reserved map[types.NamespacedName]map[string]int32) (*State, error) } // state provides information about the current scheduling of all vpods @@ -152,8 +149,6 @@ func (s *State) IsSchedulablePod(ordinal int32) bool { // stateBuilder reconstruct the state from scratch, by listing vpods type stateBuilder struct { - ctx context.Context - logger *zap.SugaredLogger vpodLister scheduler.VPodLister capacity int32 schedulerPolicy scheduler.SchedulerPolicyType @@ -166,11 +161,9 @@ type stateBuilder struct { } // NewStateBuilder returns a StateAccessor recreating the state from scratch each time it is requested -func NewStateBuilder(ctx context.Context, namespace, sfsname string, lister scheduler.VPodLister, podCapacity int32, schedulerPolicy scheduler.SchedulerPolicyType, schedPolicy *scheduler.SchedulerPolicy, deschedPolicy *scheduler.SchedulerPolicy, podlister corev1.PodNamespaceLister, nodeLister corev1.NodeLister, statefulSetCache *scheduler.ScaleCache) StateAccessor { +func NewStateBuilder(sfsname string, lister scheduler.VPodLister, podCapacity int32, schedulerPolicy scheduler.SchedulerPolicyType, schedPolicy, deschedPolicy *scheduler.SchedulerPolicy, podlister corev1.PodNamespaceLister, nodeLister corev1.NodeLister, statefulSetCache *scheduler.ScaleCache) StateAccessor { return &stateBuilder{ - ctx: ctx, - logger: logging.FromContext(ctx), vpodLister: lister, capacity: podCapacity, schedulerPolicy: schedulerPolicy, @@ -183,15 +176,18 @@ func NewStateBuilder(ctx context.Context, namespace, sfsname string, lister sche } } -func (s *stateBuilder) State(reserved map[types.NamespacedName]map[string]int32) (*State, error) { +func (s *stateBuilder) State(ctx context.Context, reserved map[types.NamespacedName]map[string]int32) (*State, error) { vpods, err := s.vpodLister() if err != nil { return nil, err } - scale, err := s.statefulSetCache.GetScale(s.ctx, s.statefulSetName, metav1.GetOptions{}) + logger := logging.FromContext(ctx).With("subcomponent", "statebuilder") + ctx = logging.WithLogger(ctx, logger) + + scale, err := s.statefulSetCache.GetScale(ctx, s.statefulSetName, metav1.GetOptions{}) if err != nil { - s.logger.Infow("failed to get statefulset", zap.Error(err)) + logger.Infow("failed to get statefulset", zap.Error(err)) return nil, err } @@ -235,36 +231,35 @@ func (s *stateBuilder) State(reserved map[types.NamespacedName]map[string]int32) } for podId := int32(0); podId < scale.Spec.Replicas && s.podLister != nil; podId++ { - var pod *v1.Pod - wait.PollUntilContextTimeout(context.Background(), 50*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (bool, error) { - pod, err = s.podLister.Get(PodNameFromOrdinal(s.statefulSetName, podId)) - return err == nil, nil - }) - - if pod != nil { - if isPodUnschedulable(pod) { - // Pod is marked for eviction - CANNOT SCHEDULE VREPS on this pod. - continue - } - - node, err := s.nodeLister.Get(pod.Spec.NodeName) - if err != nil { - return nil, err - } + pod, err := s.podLister.Get(PodNameFromOrdinal(s.statefulSetName, podId)) + if err != nil { + logger.Warnw("Failed to get pod", zap.Int32("ordinal", podId), zap.Error(err)) + continue + } + if isPodUnschedulable(pod) { + // Pod is marked for eviction - CANNOT SCHEDULE VREPS on this pod. + logger.Debugw("Pod is unschedulable", zap.Any("pod", pod)) + continue + } - if isNodeUnschedulable(node) { - // Node is marked as Unschedulable - CANNOT SCHEDULE VREPS on a pod running on this node. 
- continue - } + node, err := s.nodeLister.Get(pod.Spec.NodeName) + if err != nil { + return nil, err + } - // Pod has no annotation or not annotated as unschedulable and - // not on an unschedulable node, so add to feasible - schedulablePods.Insert(podId) + if isNodeUnschedulable(node) { + // Node is marked as Unschedulable - CANNOT SCHEDULE VREPS on a pod running on this node. + logger.Debugw("Pod is on an unschedulable node", zap.Any("pod", node)) + continue } + + // Pod has no annotation or not annotated as unschedulable and + // not on an unschedulable node, so add to feasible + schedulablePods.Insert(podId) } for _, p := range schedulablePods.List() { - free, last = s.updateFreeCapacity(free, last, PodNameFromOrdinal(s.statefulSetName, p), 0) + free, last = s.updateFreeCapacity(logger, free, last, PodNameFromOrdinal(s.statefulSetName, p), 0) } // Getting current state from existing placements for all vpods @@ -286,15 +281,14 @@ func (s *stateBuilder) State(reserved map[types.NamespacedName]map[string]int32) // Account for reserved vreplicas vreplicas = withReserved(vpod.GetKey(), podName, vreplicas, reserved) - free, last = s.updateFreeCapacity(free, last, podName, vreplicas) + free, last = s.updateFreeCapacity(logger, free, last, podName, vreplicas) withPlacement[vpod.GetKey()][podName] = true - var pod *v1.Pod - wait.PollUntilContextTimeout(context.Background(), 50*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (bool, error) { - pod, err = s.podLister.Get(podName) - return err == nil, nil - }) + pod, err := s.podLister.Get(podName) + if err != nil { + logger.Warnw("Failed to get pod", zap.String("podName", podName), zap.Error(err)) + } if pod != nil && schedulablePods.Has(OrdinalFromPodName(pod.GetName())) { nodeName := pod.Spec.NodeName //node name for this pod @@ -315,11 +309,10 @@ func (s *stateBuilder) State(reserved map[types.NamespacedName]map[string]int32) continue } - var pod *v1.Pod - wait.PollUntilContextTimeout(context.Background(), 50*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (bool, error) { - pod, err = s.podLister.Get(podName) - return err == nil, nil - }) + pod, err := s.podLister.Get(podName) + if err != nil { + logger.Warnw("Failed to get pod", zap.String("podName", podName), zap.Error(err)) + } if pod != nil && schedulablePods.Has(OrdinalFromPodName(pod.GetName())) { nodeName := pod.Spec.NodeName //node name for this pod @@ -330,7 +323,7 @@ func (s *stateBuilder) State(reserved map[types.NamespacedName]map[string]int32) } } - free, last = s.updateFreeCapacity(free, last, podName, rvreplicas) + free, last = s.updateFreeCapacity(logger, free, last, podName, rvreplicas) } } @@ -338,7 +331,7 @@ func (s *stateBuilder) State(reserved map[types.NamespacedName]map[string]int32) SchedulerPolicy: s.schedulerPolicy, SchedPolicy: s.schedPolicy, DeschedPolicy: s.deschedPolicy, NodeToZoneMap: nodeToZoneMap, StatefulSetName: s.statefulSetName, PodLister: s.podLister, PodSpread: podSpread, NodeSpread: nodeSpread, ZoneSpread: zoneSpread, Pending: pending, ExpectedVReplicaByVPod: expectedVReplicasByVPod} - s.logger.Infow("cluster state info", zap.Any("state", state), zap.Any("reserved", toJSONable(reserved))) + logger.Infow("cluster state info", zap.Any("state", state), zap.Any("reserved", toJSONable(reserved))) return state, nil } @@ -350,7 +343,7 @@ func pendingFromVPod(vpod scheduler.VPod) int32 { return int32(math.Max(float64(0), float64(expected-scheduled))) } -func (s *stateBuilder) updateFreeCapacity(free []int32, last int32, podName 
string, vreplicas int32) ([]int32, int32) { +func (s *stateBuilder) updateFreeCapacity(logger *zap.SugaredLogger, free []int32, last int32, podName string, vreplicas int32) ([]int32, int32) { ordinal := OrdinalFromPodName(podName) free = grow(free, ordinal, s.capacity) @@ -359,7 +352,7 @@ func (s *stateBuilder) updateFreeCapacity(free []int32, last int32, podName stri // Assert the pod is not overcommitted if free[ordinal] < 0 { // This should not happen anymore. Log as an error but do not interrupt the current scheduling. - s.logger.Warnw("pod is overcommitted", zap.String("podName", podName), zap.Int32("free", free[ordinal])) + logger.Warnw("pod is overcommitted", zap.String("podName", podName), zap.Int32("free", free[ordinal])) } if ordinal > last { diff --git a/vendor/knative.dev/eventing/pkg/scheduler/statefulset/autoscaler.go b/vendor/knative.dev/eventing/pkg/scheduler/statefulset/autoscaler.go index fe15aff3a5..19997d173a 100644 --- a/vendor/knative.dev/eventing/pkg/scheduler/statefulset/autoscaler.go +++ b/vendor/knative.dev/eventing/pkg/scheduler/statefulset/autoscaler.go @@ -18,6 +18,7 @@ package statefulset import ( "context" + "fmt" "math" "sync" "sync/atomic" @@ -27,10 +28,8 @@ import ( v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/util/wait" - "knative.dev/pkg/reconciler" - "knative.dev/pkg/logging" + "knative.dev/pkg/reconciler" "knative.dev/eventing/pkg/scheduler" st "knative.dev/eventing/pkg/scheduler/state" @@ -58,9 +57,8 @@ type autoscaler struct { statefulSetCache *scheduler.ScaleCache statefulSetName string vpodLister scheduler.VPodLister - logger *zap.SugaredLogger stateAccessor st.StateAccessor - trigger chan struct{} + trigger chan context.Context evictor scheduler.Evictor // capacity is the total number of virtual replicas available per pod. @@ -68,7 +66,9 @@ type autoscaler struct { // refreshPeriod is how often the autoscaler tries to scale down the statefulset refreshPeriod time.Duration - lock sync.Locker + // retryPeriod is how often the autoscaler retry failed autoscale operations + retryPeriod time.Duration + lock sync.Locker // isLeader signals whether a given autoscaler instance is leader or not. // The autoscaler is considered the leader when ephemeralLeaderElectionObject is in a @@ -104,17 +104,17 @@ func (a *autoscaler) Demote(b reconciler.Bucket) { } } -func newAutoscaler(ctx context.Context, cfg *Config, stateAccessor st.StateAccessor, statefulSetCache *scheduler.ScaleCache) *autoscaler { - return &autoscaler{ - logger: logging.FromContext(ctx).With(zap.String("component", "autoscaler")), +func newAutoscaler(cfg *Config, stateAccessor st.StateAccessor, statefulSetCache *scheduler.ScaleCache) *autoscaler { + a := &autoscaler{ statefulSetCache: statefulSetCache, statefulSetName: cfg.StatefulSetName, vpodLister: cfg.VPodLister, stateAccessor: stateAccessor, evictor: cfg.Evictor, - trigger: make(chan struct{}, 1), + trigger: make(chan context.Context, 1), capacity: cfg.PodCapacity, refreshPeriod: cfg.RefreshPeriod, + retryPeriod: cfg.RetryPeriod, lock: new(sync.Mutex), isLeader: atomic.Bool{}, getReserved: cfg.getReserved, @@ -124,25 +124,38 @@ func newAutoscaler(ctx context.Context, cfg *Config, stateAccessor st.StateAcces Add(-cfg.RefreshPeriod). 
Add(-time.Minute), } + + if a.retryPeriod == 0 { + a.retryPeriod = time.Second + } + + return a } func (a *autoscaler) Start(ctx context.Context) { attemptScaleDown := false for { + autoscaleCtx := ctx select { case <-ctx.Done(): return case <-time.After(a.refreshPeriod): - a.logger.Infow("Triggering scale down", zap.Bool("isLeader", a.isLeader.Load())) + logging.FromContext(ctx).Infow("Triggering scale down", zap.Bool("isLeader", a.isLeader.Load())) attemptScaleDown = true - case <-a.trigger: - a.logger.Infow("Triggering scale up", zap.Bool("isLeader", a.isLeader.Load())) + case autoscaleCtx = <-a.trigger: + logging.FromContext(autoscaleCtx).Infow("Triggering scale up", zap.Bool("isLeader", a.isLeader.Load())) attemptScaleDown = false } // Retry a few times, just so that we don't have to wait for the next beat when // a transient error occurs - a.syncAutoscale(ctx, attemptScaleDown) + if err := a.syncAutoscale(autoscaleCtx, attemptScaleDown); err != nil { + logging.FromContext(autoscaleCtx).Errorw("Failed to sync autoscale", zap.Error(err)) + go func() { + time.Sleep(a.retryPeriod) + a.Autoscale(ctx) // Use top-level context for background retries + }() + } } } @@ -150,10 +163,10 @@ func (a *autoscaler) Autoscale(ctx context.Context) { select { // We trigger the autoscaler asynchronously by using the channel so that the scale down refresh // period is reset. - case a.trigger <- struct{}{}: + case a.trigger <- ctx: default: // We don't want to block if the channel's buffer is full, it will be triggered eventually. - + logging.FromContext(ctx).Debugw("Skipping autoscale since autoscale is in progress") } } @@ -161,36 +174,34 @@ func (a *autoscaler) syncAutoscale(ctx context.Context, attemptScaleDown bool) e a.lock.Lock() defer a.lock.Unlock() - var lastErr error - wait.Poll(500*time.Millisecond, 5*time.Second, func() (bool, error) { - err := a.doautoscale(ctx, attemptScaleDown) - if err != nil { - logging.FromContext(ctx).Errorw("Failed to autoscale", zap.Error(err)) - } - lastErr = err - return err == nil, nil - }) - return lastErr + if err := a.doautoscale(ctx, attemptScaleDown); err != nil { + return fmt.Errorf("failed to do autoscale: %w", err) + } + return nil } func (a *autoscaler) doautoscale(ctx context.Context, attemptScaleDown bool) error { if !a.isLeader.Load() { return nil } - state, err := a.stateAccessor.State(a.getReserved()) + + logger := logging.FromContext(ctx).With("component", "autoscaler") + ctx = logging.WithLogger(ctx, logger) + + state, err := a.stateAccessor.State(ctx, a.getReserved()) if err != nil { - a.logger.Info("error while refreshing scheduler state (will retry)", zap.Error(err)) + logger.Info("error while refreshing scheduler state (will retry)", zap.Error(err)) return err } scale, err := a.statefulSetCache.GetScale(ctx, a.statefulSetName, metav1.GetOptions{}) if err != nil { // skip a beat - a.logger.Infow("failed to get scale subresource", zap.Error(err)) + logger.Infow("failed to get scale subresource", zap.Error(err)) return err } - a.logger.Debugw("checking adapter capacity", + logger.Debugw("checking adapter capacity", zap.Int32("replicas", scale.Spec.Replicas), zap.Any("state", state)) @@ -234,43 +245,43 @@ func (a *autoscaler) doautoscale(ctx context.Context, attemptScaleDown bool) err if newreplicas != scale.Spec.Replicas { scale.Spec.Replicas = newreplicas - a.logger.Infow("updating adapter replicas", zap.Int32("replicas", scale.Spec.Replicas)) + logger.Infow("updating adapter replicas", zap.Int32("replicas", scale.Spec.Replicas)) _, err = 
a.statefulSetCache.UpdateScale(ctx, a.statefulSetName, scale, metav1.UpdateOptions{}) if err != nil { - a.logger.Errorw("updating scale subresource failed", zap.Error(err)) + logger.Errorw("updating scale subresource failed", zap.Error(err)) return err } } else if attemptScaleDown { // since the number of replicas hasn't changed and time has approached to scale down, // take the opportunity to compact the vreplicas - a.mayCompact(state, scaleUpFactor) + return a.mayCompact(logger, state, scaleUpFactor) } return nil } -func (a *autoscaler) mayCompact(s *st.State, scaleUpFactor int32) { +func (a *autoscaler) mayCompact(logger *zap.SugaredLogger, s *st.State, scaleUpFactor int32) error { // This avoids a too aggressive scale down by adding a "grace period" based on the refresh // period nextAttempt := a.lastCompactAttempt.Add(a.refreshPeriod) if time.Now().Before(nextAttempt) { - a.logger.Debugw("Compact was retried before refresh period", + logger.Debugw("Compact was retried before refresh period", zap.Time("lastCompactAttempt", a.lastCompactAttempt), zap.Time("nextAttempt", nextAttempt), zap.String("refreshPeriod", a.refreshPeriod.String()), ) - return + return nil } - a.logger.Debugw("Trying to compact and scale down", + logger.Debugw("Trying to compact and scale down", zap.Int32("scaleUpFactor", scaleUpFactor), zap.Any("state", s), ) // when there is only one pod there is nothing to move or number of pods is just enough! if s.LastOrdinal < 1 || len(s.SchedulablePods) <= int(scaleUpFactor) { - return + return nil } if s.SchedulerPolicy == scheduler.MAXFILLUP { @@ -283,7 +294,7 @@ func (a *autoscaler) mayCompact(s *st.State, scaleUpFactor int32) { a.lastCompactAttempt = time.Now() err := a.compact(s, scaleUpFactor) if err != nil { - a.logger.Errorw("vreplicas compaction failed", zap.Error(err)) + return fmt.Errorf("vreplicas compaction failed (scaleUpFactor %d): %w", scaleUpFactor, err) } } @@ -303,10 +314,11 @@ func (a *autoscaler) mayCompact(s *st.State, scaleUpFactor int32) { a.lastCompactAttempt = time.Now() err := a.compact(s, scaleUpFactor) if err != nil { - a.logger.Errorw("vreplicas compaction failed", zap.Error(err)) + return fmt.Errorf("vreplicas compaction failed (scaleUpFactor %d): %w", scaleUpFactor, err) } } } + return nil } func (a *autoscaler) compact(s *st.State, scaleUpFactor int32) error { @@ -323,16 +335,14 @@ func (a *autoscaler) compact(s *st.State, scaleUpFactor int32) error { ordinal := st.OrdinalFromPodName(placements[i].PodName) if ordinal == s.LastOrdinal-j { - wait.PollUntilContextTimeout(context.Background(), 50*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (bool, error) { - if s.PodLister != nil { - pod, err = s.PodLister.Get(placements[i].PodName) - } - return err == nil, nil - }) + pod, err = s.PodLister.Get(placements[i].PodName) + if err != nil { + return fmt.Errorf("failed to get pod %s: %w", placements[i].PodName, err) + } err = a.evictor(pod, vpod, &placements[i]) if err != nil { - return err + return fmt.Errorf("failed to evict pod %s: %w", pod.Name, err) } } } diff --git a/vendor/knative.dev/eventing/pkg/scheduler/statefulset/scheduler.go b/vendor/knative.dev/eventing/pkg/scheduler/statefulset/scheduler.go index 62235e474c..613410ec7a 100644 --- a/vendor/knative.dev/eventing/pkg/scheduler/statefulset/scheduler.go +++ b/vendor/knative.dev/eventing/pkg/scheduler/statefulset/scheduler.go @@ -28,18 +28,16 @@ import ( "go.uber.org/zap" appsv1 "k8s.io/api/apps/v1" "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/informers" clientappsv1 
"k8s.io/client-go/kubernetes/typed/apps/v1" corev1listers "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/tools/cache" "k8s.io/utils/integer" + "knative.dev/pkg/logging" "knative.dev/pkg/reconciler" kubeclient "knative.dev/pkg/client/injection/kube/client" - statefulsetinformer "knative.dev/pkg/client/injection/kube/informers/apps/v1/statefulset" "knative.dev/pkg/controller" - "knative.dev/pkg/logging" - - podinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/pod" duckv1alpha1 "knative.dev/eventing/pkg/apis/duck/v1alpha1" "knative.dev/eventing/pkg/scheduler" @@ -69,6 +67,8 @@ type Config struct { PodCapacity int32 `json:"podCapacity"` // Autoscaler refresh period RefreshPeriod time.Duration `json:"refreshPeriod"` + // Autoscaler retry period + RetryPeriod time.Duration `json:"retryPeriod"` SchedulerPolicy scheduler.SchedulerPolicyType `json:"schedulerPolicy"` SchedPolicy *scheduler.SchedulerPolicy `json:"schedPolicy"` @@ -78,6 +78,8 @@ type Config struct { VPodLister scheduler.VPodLister `json:"-"` NodeLister corev1listers.NodeLister `json:"-"` + // Pod lister for statefulset: StatefulSetNamespace / StatefulSetName + PodLister corev1listers.PodNamespaceLister `json:"-"` // getReserved returns reserved replicas getReserved GetReserved @@ -85,19 +87,20 @@ type Config struct { func New(ctx context.Context, cfg *Config) (scheduler.Scheduler, error) { - podInformer := podinformer.Get(ctx) - podLister := podInformer.Lister().Pods(cfg.StatefulSetNamespace) + if cfg.PodLister == nil { + return nil, fmt.Errorf("Config.PodLister is required") + } scaleCache := scheduler.NewScaleCache(ctx, cfg.StatefulSetNamespace, kubeclient.Get(ctx).AppsV1().StatefulSets(cfg.StatefulSetNamespace), cfg.ScaleCacheConfig) - stateAccessor := st.NewStateBuilder(ctx, cfg.StatefulSetNamespace, cfg.StatefulSetName, cfg.VPodLister, cfg.PodCapacity, cfg.SchedulerPolicy, cfg.SchedPolicy, cfg.DeschedPolicy, podLister, cfg.NodeLister, scaleCache) + stateAccessor := st.NewStateBuilder(cfg.StatefulSetName, cfg.VPodLister, cfg.PodCapacity, cfg.SchedulerPolicy, cfg.SchedPolicy, cfg.DeschedPolicy, cfg.PodLister, cfg.NodeLister, scaleCache) var getReserved GetReserved cfg.getReserved = func() map[types.NamespacedName]map[string]int32 { return getReserved() } - autoscaler := newAutoscaler(ctx, cfg, stateAccessor, scaleCache) + autoscaler := newAutoscaler(cfg, stateAccessor, scaleCache) var wg sync.WaitGroup wg.Add(1) @@ -106,7 +109,7 @@ func New(ctx context.Context, cfg *Config) (scheduler.Scheduler, error) { autoscaler.Start(ctx) }() - s := newStatefulSetScheduler(ctx, cfg, stateAccessor, autoscaler, podLister) + s := newStatefulSetScheduler(ctx, cfg, stateAccessor, autoscaler) getReserved = s.Reserved wg.Done() @@ -125,12 +128,9 @@ func (p Pending) Total() int32 { // StatefulSetScheduler is a scheduler placing VPod into statefulset-managed set of pods type StatefulSetScheduler struct { - ctx context.Context - logger *zap.SugaredLogger statefulSetName string statefulSetNamespace string statefulSetClient clientappsv1.StatefulSetInterface - podLister corev1listers.PodNamespaceLister vpodLister scheduler.VPodLister lock sync.Locker stateAccessor st.StateAccessor @@ -168,16 +168,12 @@ func (s *StatefulSetScheduler) Demote(b reconciler.Bucket) { func newStatefulSetScheduler(ctx context.Context, cfg *Config, stateAccessor st.StateAccessor, - autoscaler Autoscaler, - podlister corev1listers.PodNamespaceLister) *StatefulSetScheduler { + autoscaler Autoscaler) *StatefulSetScheduler { scheduler := &StatefulSetScheduler{ - 
ctx: ctx, - logger: logging.FromContext(ctx), statefulSetNamespace: cfg.StatefulSetNamespace, statefulSetName: cfg.StatefulSetName, statefulSetClient: kubeclient.Get(ctx).AppsV1().StatefulSets(cfg.StatefulSetNamespace), - podLister: podlister, vpodLister: cfg.VPodLister, lock: new(sync.Mutex), stateAccessor: stateAccessor, @@ -186,22 +182,38 @@ func newStatefulSetScheduler(ctx context.Context, } // Monitor our statefulset - statefulsetInformer := statefulsetinformer.Get(ctx) - statefulsetInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{ - FilterFunc: controller.FilterWithNameAndNamespace(cfg.StatefulSetNamespace, cfg.StatefulSetName), - Handler: controller.HandleAll(scheduler.updateStatefulset), - }) + c := kubeclient.Get(ctx) + sif := informers.NewSharedInformerFactoryWithOptions(c, + controller.GetResyncPeriod(ctx), + informers.WithNamespace(cfg.StatefulSetNamespace), + ) + + sif.Apps().V1().StatefulSets().Informer(). + AddEventHandler(cache.FilteringResourceEventHandler{ + FilterFunc: controller.FilterWithNameAndNamespace(cfg.StatefulSetNamespace, cfg.StatefulSetName), + Handler: controller.HandleAll(func(i interface{}) { + scheduler.updateStatefulset(ctx, i) + }), + }) + + sif.Start(ctx.Done()) + _ = sif.WaitForCacheSync(ctx.Done()) + + go func() { + <-ctx.Done() + sif.Shutdown() + }() return scheduler } -func (s *StatefulSetScheduler) Schedule(vpod scheduler.VPod) ([]duckv1alpha1.Placement, error) { +func (s *StatefulSetScheduler) Schedule(ctx context.Context, vpod scheduler.VPod) ([]duckv1alpha1.Placement, error) { s.lock.Lock() defer s.lock.Unlock() s.reservedMu.Lock() defer s.reservedMu.Unlock() - placements, err := s.scheduleVPod(vpod) + placements, err := s.scheduleVPod(ctx, vpod) if placements == nil { return placements, err } @@ -216,11 +228,13 @@ func (s *StatefulSetScheduler) Schedule(vpod scheduler.VPod) ([]duckv1alpha1.Pla return placements, err } -func (s *StatefulSetScheduler) scheduleVPod(vpod scheduler.VPod) ([]duckv1alpha1.Placement, error) { - logger := s.logger.With("key", vpod.GetKey(), zap.String("component", "scheduler")) +func (s *StatefulSetScheduler) scheduleVPod(ctx context.Context, vpod scheduler.VPod) ([]duckv1alpha1.Placement, error) { + logger := logging.FromContext(ctx).With("key", vpod.GetKey(), zap.String("component", "scheduler")) + ctx = logging.WithLogger(ctx, logger) + // Get the current placements state // Quite an expensive operation but safe and simple. - state, err := s.stateAccessor.State(s.reserved) + state, err := s.stateAccessor.State(ctx, s.reserved) if err != nil { logger.Debug("error while refreshing scheduler state (will retry)", zap.Error(err)) return nil, err @@ -258,13 +272,15 @@ func (s *StatefulSetScheduler) scheduleVPod(vpod scheduler.VPod) ([]duckv1alpha1 } // Handle overcommitted pods. 
- if state.FreeCap[ordinal] < 0 { + if state.Free(ordinal) < 0 { // vr > free => vr: 9, overcommit 4 -> free: 0, vr: 5, pending: +4 // vr = free => vr: 4, overcommit 4 -> free: 0, vr: 0, pending: +4 // vr < free => vr: 3, overcommit 4 -> free: -1, vr: 0, pending: +3 overcommit := -state.FreeCap[ordinal] + logger.Debugw("overcommit", zap.Any("overcommit", overcommit), zap.Any("placement", p)) + if p.VReplicas >= overcommit { state.SetFree(ordinal, 0) state.Pending[vpod.GetKey()] += overcommit @@ -301,7 +317,9 @@ func (s *StatefulSetScheduler) scheduleVPod(vpod scheduler.VPod) ([]duckv1alpha1 if state.SchedulerPolicy != "" { // Need less => scale down if tr > vpod.GetVReplicas() { - logger.Debugw("scaling down", zap.Int32("vreplicas", tr), zap.Int32("new vreplicas", vpod.GetVReplicas())) + logger.Debugw("scaling down", zap.Int32("vreplicas", tr), zap.Int32("new vreplicas", vpod.GetVReplicas()), + zap.Any("placements", placements), + zap.Any("existingPlacements", existingPlacements)) placements = s.removeReplicas(tr-vpod.GetVReplicas(), placements) @@ -311,15 +329,19 @@ func (s *StatefulSetScheduler) scheduleVPod(vpod scheduler.VPod) ([]duckv1alpha1 } // Need more => scale up - logger.Debugw("scaling up", zap.Int32("vreplicas", tr), zap.Int32("new vreplicas", vpod.GetVReplicas())) + logger.Debugw("scaling up", zap.Int32("vreplicas", tr), zap.Int32("new vreplicas", vpod.GetVReplicas()), + zap.Any("placements", placements), + zap.Any("existingPlacements", existingPlacements)) placements, left = s.addReplicas(state, vpod.GetVReplicas()-tr, placements) } else { //Predicates and priorities must be used for scheduling // Need less => scale down if tr > vpod.GetVReplicas() && state.DeschedPolicy != nil { - logger.Infow("scaling down", zap.Int32("vreplicas", tr), zap.Int32("new vreplicas", vpod.GetVReplicas())) - placements = s.removeReplicasWithPolicy(vpod, tr-vpod.GetVReplicas(), placements) + logger.Infow("scaling down", zap.Int32("vreplicas", tr), zap.Int32("new vreplicas", vpod.GetVReplicas()), + zap.Any("placements", placements), + zap.Any("existingPlacements", existingPlacements)) + placements = s.removeReplicasWithPolicy(ctx, vpod, tr-vpod.GetVReplicas(), placements) // Do not trigger the autoscaler to avoid unnecessary churn @@ -331,8 +353,10 @@ func (s *StatefulSetScheduler) scheduleVPod(vpod scheduler.VPod) ([]duckv1alpha1 // Need more => scale up // rebalancing needed for all vreps most likely since there are pending vreps from previous reconciliation // can fall here when vreps scaled up or after eviction - logger.Infow("scaling up with a rebalance (if needed)", zap.Int32("vreplicas", tr), zap.Int32("new vreplicas", vpod.GetVReplicas())) - placements, left = s.rebalanceReplicasWithPolicy(vpod, vpod.GetVReplicas(), placements) + logger.Infow("scaling up with a rebalance (if needed)", zap.Int32("vreplicas", tr), zap.Int32("new vreplicas", vpod.GetVReplicas()), + zap.Any("placements", placements), + zap.Any("existingPlacements", existingPlacements)) + placements, left = s.rebalanceReplicasWithPolicy(ctx, vpod, vpod.GetVReplicas(), placements) } } @@ -343,10 +367,10 @@ func (s *StatefulSetScheduler) scheduleVPod(vpod scheduler.VPod) ([]duckv1alpha1 // Trigger the autoscaler if s.autoscaler != nil { logger.Infow("Awaiting autoscaler", zap.Any("placement", placements), zap.Int32("left", left)) - s.autoscaler.Autoscale(s.ctx) + s.autoscaler.Autoscale(ctx) } - if state.SchedPolicy != nil { + if state.SchedulerPolicy == "" && state.SchedPolicy != nil { logger.Info("reverting to previous 
placements") s.reservePlacements(vpod, existingPlacements) // rebalancing doesn't care about new placements since all vreps will be re-placed return existingPlacements, s.notEnoughPodReplicas(left) // requeue to wait for the autoscaler to do its job @@ -368,25 +392,25 @@ func toJSONable(pending map[types.NamespacedName]int32) map[string]int32 { return r } -func (s *StatefulSetScheduler) rebalanceReplicasWithPolicy(vpod scheduler.VPod, diff int32, placements []duckv1alpha1.Placement) ([]duckv1alpha1.Placement, int32) { +func (s *StatefulSetScheduler) rebalanceReplicasWithPolicy(ctx context.Context, vpod scheduler.VPod, diff int32, placements []duckv1alpha1.Placement) ([]duckv1alpha1.Placement, int32) { s.makeZeroPlacements(vpod, placements) - placements, diff = s.addReplicasWithPolicy(vpod, diff, make([]duckv1alpha1.Placement, 0)) //start fresh with a new placements list + placements, diff = s.addReplicasWithPolicy(ctx, vpod, diff, make([]duckv1alpha1.Placement, 0)) //start fresh with a new placements list return placements, diff } -func (s *StatefulSetScheduler) removeReplicasWithPolicy(vpod scheduler.VPod, diff int32, placements []duckv1alpha1.Placement) []duckv1alpha1.Placement { - logger := s.logger.Named("remove replicas with policy") +func (s *StatefulSetScheduler) removeReplicasWithPolicy(ctx context.Context, vpod scheduler.VPod, diff int32, placements []duckv1alpha1.Placement) []duckv1alpha1.Placement { + logger := logging.FromContext(ctx).Named("remove replicas with policy") numVreps := diff for i := int32(0); i < numVreps; i++ { //deschedule one vreplica at a time - state, err := s.stateAccessor.State(s.reserved) + state, err := s.stateAccessor.State(ctx, s.reserved) if err != nil { logger.Info("error while refreshing scheduler state (will retry)", zap.Error(err)) return placements } - feasiblePods := s.findFeasiblePods(s.ctx, state, vpod, state.DeschedPolicy) + feasiblePods := s.findFeasiblePods(ctx, state, vpod, state.DeschedPolicy) feasiblePods = s.removePodsNotInPlacement(vpod, feasiblePods) if len(feasiblePods) == 1 { //nothing to score, remove vrep from that pod placementPodID := feasiblePods[0] @@ -397,7 +421,7 @@ func (s *StatefulSetScheduler) removeReplicasWithPolicy(vpod scheduler.VPod, dif continue } - priorityList, err := s.prioritizePods(s.ctx, state, vpod, feasiblePods, state.DeschedPolicy) + priorityList, err := s.prioritizePods(ctx, state, vpod, feasiblePods, state.DeschedPolicy) if err != nil { logger.Info("error while scoring pods using priorities", zap.Error(err)) s.reservePlacements(vpod, placements) @@ -443,13 +467,13 @@ func (s *StatefulSetScheduler) removeSelectionFromPlacements(placementPodID int3 return newPlacements } -func (s *StatefulSetScheduler) addReplicasWithPolicy(vpod scheduler.VPod, diff int32, placements []duckv1alpha1.Placement) ([]duckv1alpha1.Placement, int32) { - logger := s.logger.Named("add replicas with policy") +func (s *StatefulSetScheduler) addReplicasWithPolicy(ctx context.Context, vpod scheduler.VPod, diff int32, placements []duckv1alpha1.Placement) ([]duckv1alpha1.Placement, int32) { + logger := logging.FromContext(ctx).Named("add replicas with policy") numVreps := diff for i := int32(0); i < numVreps; i++ { //schedule one vreplica at a time (find most suitable pod placement satisying predicates with high score) // Get the current placements state - state, err := s.stateAccessor.State(s.reserved) + state, err := s.stateAccessor.State(ctx, s.reserved) if err != nil { logger.Info("error while refreshing scheduler state (will retry)", 
zap.Error(err)) return placements, diff @@ -462,7 +486,7 @@ func (s *StatefulSetScheduler) addReplicasWithPolicy(vpod scheduler.VPod, diff i break //end the iteration for all vreps since there are not pods } - feasiblePods := s.findFeasiblePods(s.ctx, state, vpod, state.SchedPolicy) + feasiblePods := s.findFeasiblePods(ctx, state, vpod, state.SchedPolicy) if len(feasiblePods) == 0 { //no pods available to schedule this vreplica logger.Info("no feasible pods available to schedule this vreplica") s.reservePlacements(vpod, placements) @@ -480,7 +504,7 @@ func (s *StatefulSetScheduler) addReplicasWithPolicy(vpod scheduler.VPod, diff i continue } */ - priorityList, err := s.prioritizePods(s.ctx, state, vpod, feasiblePods, state.SchedPolicy) + priorityList, err := s.prioritizePods(ctx, state, vpod, feasiblePods, state.SchedPolicy) if err != nil { logger.Info("error while scoring pods using priorities", zap.Error(err)) s.reservePlacements(vpod, placements) @@ -555,7 +579,7 @@ func (s *StatefulSetScheduler) removePodsNotInPlacement(vpod scheduler.VPod, fea // prioritizePods prioritizes the pods by running the score plugins, which return a score for each pod. // The scores from each plugin are added together to make the score for that pod. func (s *StatefulSetScheduler) prioritizePods(ctx context.Context, states *st.State, vpod scheduler.VPod, feasiblePods []int32, policy *scheduler.SchedulerPolicy) (st.PodScoreList, error) { - logger := s.logger.Named("prioritize all feasible pods") + logger := logging.FromContext(ctx).Named("prioritize all feasible pods") // If no priority configs are provided, then all pods will have a score of one result := make(st.PodScoreList, 0, len(feasiblePods)) @@ -618,7 +642,7 @@ func (s *StatefulSetScheduler) selectPod(podScoreList st.PodScoreList) (int32, e // If any of these plugins doesn't return "Success", the pod is not suitable for placing the vrep. // Meanwhile, the failure message and status are set for the given pod. func (s *StatefulSetScheduler) RunFilterPlugins(ctx context.Context, states *st.State, vpod scheduler.VPod, podID int32, policy *scheduler.SchedulerPolicy) st.PluginToStatus { - logger := s.logger.Named("run all filter plugins") + logger := logging.FromContext(ctx).Named("run all filter plugins") statuses := make(st.PluginToStatus) for _, plugin := range policy.Predicates { @@ -651,7 +675,7 @@ func (s *StatefulSetScheduler) runFilterPlugin(ctx context.Context, pl st.Filter // RunScorePlugins runs the set of configured scoring plugins. It returns a list that stores for each scoring plugin name the corresponding PodScoreList(s). // It also returns *Status, which is set to non-success if any of the plugins returns a non-success status. 
func (s *StatefulSetScheduler) RunScorePlugins(ctx context.Context, states *st.State, vpod scheduler.VPod, feasiblePods []int32, policy *scheduler.SchedulerPolicy) (st.PluginToPodScores, *st.Status) { - logger := s.logger.Named("run all score plugins") + logger := logging.FromContext(ctx).Named("run all score plugins") pluginToPodScores := make(st.PluginToPodScores, len(policy.Priorities)) for _, plugin := range policy.Priorities { @@ -764,10 +788,11 @@ func (s *StatefulSetScheduler) addReplicas(states *st.State, diff int32, placeme return newPlacements, diff } -func (s *StatefulSetScheduler) updateStatefulset(obj interface{}) { +func (s *StatefulSetScheduler) updateStatefulset(ctx context.Context, obj interface{}) { statefulset, ok := obj.(*appsv1.StatefulSet) if !ok { - s.logger.Fatalw("expected a Statefulset object", zap.Any("object", obj)) + logging.FromContext(ctx).Warnw("expected a Statefulset object", zap.Any("object", obj)) + return } s.lock.Lock() @@ -777,7 +802,7 @@ func (s *StatefulSetScheduler) updateStatefulset(obj interface{}) { s.replicas = 1 } else if s.replicas != *statefulset.Spec.Replicas { s.replicas = *statefulset.Spec.Replicas - s.logger.Infow("statefulset replicas updated", zap.Int32("replicas", s.replicas)) + logging.FromContext(ctx).Infow("statefulset replicas updated", zap.Int32("replicas", s.replicas)) } } diff --git a/vendor/knative.dev/eventing/test/rekt/features/trigger/feature.go b/vendor/knative.dev/eventing/test/rekt/features/trigger/feature.go index 4896fd1c7d..219e48535c 100644 --- a/vendor/knative.dev/eventing/test/rekt/features/trigger/feature.go +++ b/vendor/knative.dev/eventing/test/rekt/features/trigger/feature.go @@ -69,7 +69,8 @@ func TriggerDependencyAnnotation() *feature.Feature { // Install the trigger f.Setup("install trigger", trigger.Install(triggerName, brokerName, cfg...)) - f.Setup("trigger goes ready", trigger.IsReady(triggerName)) + // trigger won't go ready until after the pingsource exists, because of the dependency annotation + f.Requirement("trigger goes ready", trigger.IsReady(triggerName)) f.Requirement("install pingsource", func(ctx context.Context, t feature.T) { brokeruri, err := broker.Address(ctx, brokerName) diff --git a/vendor/knative.dev/eventing/test/rekt/resources/trigger/trigger.go b/vendor/knative.dev/eventing/test/rekt/resources/trigger/trigger.go index dc59d58ba6..e6ea82a71f 100644 --- a/vendor/knative.dev/eventing/test/rekt/resources/trigger/trigger.go +++ b/vendor/knative.dev/eventing/test/rekt/resources/trigger/trigger.go @@ -124,18 +124,14 @@ func WithSubscriberFromDestination(dest *duckv1.Destination) manifest.CfgFn { // WithAnnotations adds annotations to the trigger func WithAnnotations(annotations map[string]interface{}) manifest.CfgFn { return func(cfg map[string]interface{}) { - if _, set := cfg["ceOverrides"]; !set { - cfg["ceOverrides"] = map[string]interface{}{} + if _, set := cfg["annotations"]; !set { + cfg["annotations"] = map[string]string{} } - ceOverrides := cfg["ceOverrides"].(map[string]interface{}) if annotations != nil { - if _, set := ceOverrides["annotations"]; !set { - ceOverrides["annotations"] = map[string]interface{}{} - } - ceExt := ceOverrides["annotations"].(map[string]interface{}) + annotation := cfg["annotations"].(map[string]string) for k, v := range annotations { - ceExt[k] = v + annotation[k] = v.(string) } } } diff --git a/vendor/knative.dev/eventing/test/rekt/resources/trigger/trigger.yaml b/vendor/knative.dev/eventing/test/rekt/resources/trigger/trigger.yaml index 
4521988db4..77d02ad190 100644 --- a/vendor/knative.dev/eventing/test/rekt/resources/trigger/trigger.yaml +++ b/vendor/knative.dev/eventing/test/rekt/resources/trigger/trigger.yaml @@ -20,7 +20,7 @@ metadata: {{ if .annotations }} annotations: {{ range $key, $value := .annotations }} - {{ $key }}: {{ $value }} + {{ $key }}: '{{ $value }}' {{ end }} {{ end }} spec: diff --git a/vendor/modules.txt b/vendor/modules.txt index 67a11824cf..13179238b3 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1320,7 +1320,7 @@ k8s.io/utils/pointer k8s.io/utils/ptr k8s.io/utils/strings/slices k8s.io/utils/trace -# knative.dev/eventing v0.41.0 +# knative.dev/eventing v0.41.7-0.20240923180940-09cb6334226f ## explicit; go 1.21 knative.dev/eventing/cmd/event_display knative.dev/eventing/cmd/heartbeats From 6885cc1755bb65034d0fe78b92f2a27e5a4d74af Mon Sep 17 00:00:00 2001 From: Pierangelo Di Pilato Date: Mon, 23 Sep 2024 17:45:49 +0200 Subject: [PATCH 2/3] Pass PodLister as expected Signed-off-by: Pierangelo Di Pilato --- .../reconciler/consumergroup/consumergroup.go | 2 +- .../pkg/reconciler/consumergroup/controller.go | 17 ++++++++++------- 2 files changed, 11 insertions(+), 8 deletions(-) diff --git a/control-plane/pkg/reconciler/consumergroup/consumergroup.go b/control-plane/pkg/reconciler/consumergroup/consumergroup.go index becf3220f5..613ff09cb0 100644 --- a/control-plane/pkg/reconciler/consumergroup/consumergroup.go +++ b/control-plane/pkg/reconciler/consumergroup/consumergroup.go @@ -446,7 +446,7 @@ func (r *Reconciler) schedule(ctx context.Context, cg *kafkainternals.ConsumerGr return cg.MarkScheduleConsumerFailed("Schedule", err) } - placements, err := statefulSetScheduler.Schedule(cg) + placements, err := statefulSetScheduler.Schedule(ctx, cg) if err != nil { return cg.MarkScheduleConsumerFailed("Schedule", err) } diff --git a/control-plane/pkg/reconciler/consumergroup/controller.go b/control-plane/pkg/reconciler/consumergroup/controller.go index 910eae7002..e01a12adf6 100644 --- a/control-plane/pkg/reconciler/consumergroup/controller.go +++ b/control-plane/pkg/reconciler/consumergroup/controller.go @@ -32,6 +32,7 @@ import ( "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" "k8s.io/apiserver/pkg/storage/names" + corelisters "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/tools/cache" "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/internals/kafka/eventing" @@ -106,6 +107,9 @@ func NewController(ctx context.Context, watcher configmap.Watcher) *controller.I logger.Panicf("unable to process required environment variables: %v", err) } + dispatcherPodInformer := podinformer.Get(ctx, eventing.DispatcherLabelSelectorStr) + dispatcherPodLister := dispatcherPodInformer.Lister() + c := SchedulerConfig{ RefreshPeriod: time.Duration(env.SchedulerRefreshPeriod) * time.Second, Capacity: env.PodCapacity, @@ -114,13 +118,10 @@ func NewController(ctx context.Context, watcher configmap.Watcher) *controller.I } schedulers := map[string]Scheduler{ - KafkaSourceScheduler: createKafkaScheduler(ctx, c, kafkainternals.SourceStatefulSetName), - KafkaTriggerScheduler: createKafkaScheduler(ctx, c, kafkainternals.BrokerStatefulSetName), - //KafkaChannelScheduler: createKafkaScheduler(ctx, c, kafkainternals.ChannelStatefulSetName), //To be added with channel/v2 reconciler version only + KafkaSourceScheduler: createKafkaScheduler(ctx, c, dispatcherPodLister, kafkainternals.SourceStatefulSetName), + KafkaTriggerScheduler: createKafkaScheduler(ctx, c, dispatcherPodLister, 
Signed-off-by: Pierangelo Di Pilato
---
 .../reconciler/consumergroup/consumergroup.go |  2 +-
 .../pkg/reconciler/consumergroup/controller.go | 17 ++++++++++-------
 2 files changed, 11 insertions(+), 8 deletions(-)

diff --git a/control-plane/pkg/reconciler/consumergroup/consumergroup.go b/control-plane/pkg/reconciler/consumergroup/consumergroup.go
index becf3220f5..613ff09cb0 100644
--- a/control-plane/pkg/reconciler/consumergroup/consumergroup.go
+++ b/control-plane/pkg/reconciler/consumergroup/consumergroup.go
@@ -446,7 +446,7 @@ func (r *Reconciler) schedule(ctx context.Context, cg *kafkainternals.ConsumerGr
 		return cg.MarkScheduleConsumerFailed("Schedule", err)
 	}
 
-	placements, err := statefulSetScheduler.Schedule(cg)
+	placements, err := statefulSetScheduler.Schedule(ctx, cg)
 	if err != nil {
 		return cg.MarkScheduleConsumerFailed("Schedule", err)
 	}
diff --git a/control-plane/pkg/reconciler/consumergroup/controller.go b/control-plane/pkg/reconciler/consumergroup/controller.go
index 910eae7002..e01a12adf6 100644
--- a/control-plane/pkg/reconciler/consumergroup/controller.go
+++ b/control-plane/pkg/reconciler/consumergroup/controller.go
@@ -32,6 +32,7 @@ import (
 	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/apiserver/pkg/storage/names"
+	corelisters "k8s.io/client-go/listers/core/v1"
 	"k8s.io/client-go/tools/cache"
 
 	"knative.dev/eventing-kafka-broker/control-plane/pkg/apis/internals/kafka/eventing"
@@ -106,6 +107,9 @@ func NewController(ctx context.Context, watcher configmap.Watcher) *controller.I
 		logger.Panicf("unable to process required environment variables: %v", err)
 	}
 
+	dispatcherPodInformer := podinformer.Get(ctx, eventing.DispatcherLabelSelectorStr)
+	dispatcherPodLister := dispatcherPodInformer.Lister()
+
 	c := SchedulerConfig{
 		RefreshPeriod: time.Duration(env.SchedulerRefreshPeriod) * time.Second,
 		Capacity:      env.PodCapacity,
@@ -114,13 +118,10 @@ func NewController(ctx context.Context, watcher configmap.Watcher) *controller.I
 	}
 
 	schedulers := map[string]Scheduler{
-		KafkaSourceScheduler:  createKafkaScheduler(ctx, c, kafkainternals.SourceStatefulSetName),
-		KafkaTriggerScheduler: createKafkaScheduler(ctx, c, kafkainternals.BrokerStatefulSetName),
-		//KafkaChannelScheduler: createKafkaScheduler(ctx, c, kafkainternals.ChannelStatefulSetName), //To be added with channel/v2 reconciler version only
+		KafkaSourceScheduler:  createKafkaScheduler(ctx, c, dispatcherPodLister, kafkainternals.SourceStatefulSetName),
+		KafkaTriggerScheduler: createKafkaScheduler(ctx, c, dispatcherPodLister, kafkainternals.BrokerStatefulSetName),
 	}
 
-	dispatcherPodInformer := podinformer.Get(ctx, eventing.DispatcherLabelSelectorStr)
-
 	r := &Reconciler{
 		SchedulerFunc:   func(s string) (Scheduler, bool) { sched, ok := schedulers[strings.ToLower(s)]; return sched, ok },
 		ConsumerLister:  consumer.Get(ctx).Lister(),
@@ -299,10 +300,11 @@ func enqueueConsumerGroupFromConsumer(enqueue func(name types.NamespacedName)) f
 	}
 }
 
-func createKafkaScheduler(ctx context.Context, c SchedulerConfig, ssName string) Scheduler {
+func createKafkaScheduler(ctx context.Context, c SchedulerConfig, podLister corelisters.PodLister, ssName string) Scheduler {
 	lister := consumergroup.Get(ctx).Lister()
 	return createStatefulSetScheduler(
 		ctx,
+		podLister,
 		SchedulerConfig{
 			StatefulSetName: ssName,
 			RefreshPeriod:   c.RefreshPeriod,
@@ -344,7 +346,7 @@ func getSelectorLabel(ssName string) map[string]string {
 	return selectorLabel
 }
 
-func createStatefulSetScheduler(ctx context.Context, c SchedulerConfig, lister scheduler.VPodLister) Scheduler {
+func createStatefulSetScheduler(ctx context.Context, podLister corelisters.PodLister, c SchedulerConfig, lister scheduler.VPodLister) Scheduler {
 	ss, _ := statefulsetscheduler.New(ctx, &statefulsetscheduler.Config{
 		StatefulSetNamespace: system.Namespace(),
 		StatefulSetName:      c.StatefulSetName,
@@ -357,6 +359,7 @@ func createStatefulSetScheduler(ctx context.Context, c SchedulerConfig, lister s
 		Evictor:    newEvictor(ctx, zap.String("kafka.eventing.knative.dev/component", "evictor")).evict,
 		VPodLister: lister,
 		NodeLister: nodeinformer.Get(ctx).Lister(),
+		PodLister:  podLister.Pods(system.Namespace()),
 	})
 
 	return Scheduler{

From 280eeaaaa170d26b25dfb70e1cf44046b5b4f4c2 Mon Sep 17 00:00:00 2001
From: Pierangelo Di Pilato
Date: Mon, 23 Sep 2024 17:55:57 +0200
Subject: [PATCH 3/3] Migrate to library SchedulerFunc to use the new signature
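
knative.dev/eventing now provides a SchedulerFunc adapter whose Schedule
method takes a context, so the test-local SchedulerFunc type is dropped and
the test stubs use the library type directly. Minimal sketch of the new stub
shape (illustrative only; it assumes the upstream scheduler.Scheduler
interface and SchedulerFunc adapter as used in the diff below, with the
package aliases from the test file; ctx and vpod are provided by the caller):

    // scheduler.SchedulerFunc adapts a plain function to the scheduler.Scheduler
    // interface, so each test case can return canned placements inline.
    var s scheduler.Scheduler = scheduler.SchedulerFunc(
        func(ctx context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
            return []eventingduckv1alpha1.Placement{{PodName: "p1", VReplicas: 1}}, nil
        },
    )
    // The reconciler threads its context through, matching the new signature.
    placements, err := s.Schedule(ctx, vpod)
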
Signed-off-by: Pierangelo Di Pilato
---
 .../consumergroup/consumergroup_test.go       | 48 ++++++++-----------
 1 file changed, 21 insertions(+), 27 deletions(-)

diff --git a/control-plane/pkg/reconciler/consumergroup/consumergroup_test.go b/control-plane/pkg/reconciler/consumergroup/consumergroup_test.go
index 536d0574aa..987e4a9b55 100644
--- a/control-plane/pkg/reconciler/consumergroup/consumergroup_test.go
+++ b/control-plane/pkg/reconciler/consumergroup/consumergroup_test.go
@@ -61,12 +61,6 @@ import (
 	kedaclient "knative.dev/eventing-kafka-broker/third_party/pkg/client/injection/client/fake"
 )
-type SchedulerFunc func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error)
-
-func (f SchedulerFunc) Schedule(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
-	return f(vpod)
-}
-
 const (
 	testSchedulerKey = "scheduler"
 	noTestScheduler  = "no-scheduler"
 )
@@ -102,7 +96,7 @@ func TestReconcileKind(t *testing.T) {
 			},
 			Key: ConsumerGroupTestKey,
 			OtherTestData: map[string]interface{}{
-				testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
+				testSchedulerKey: scheduler.SchedulerFunc(func(ctx context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
 					return []eventingduckv1alpha1.Placement{
 						{PodName: "p1", VReplicas: 1},
 						{PodName: "p2", VReplicas: 1},
@@ -189,7 +183,7 @@ func TestReconcileKind(t *testing.T) {
 			},
 			Key: ConsumerGroupTestKey,
 			OtherTestData: map[string]interface{}{
-				testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
+				testSchedulerKey: scheduler.SchedulerFunc(func(ctx context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
 					return []eventingduckv1alpha1.Placement{
 						{PodName: "p1", VReplicas: 1},
 						{PodName: "p2", VReplicas: 1},
@@ -307,7 +301,7 @@ func TestReconcileKind(t *testing.T) {
 			},
 			Key: ConsumerGroupTestKey,
 			OtherTestData: map[string]interface{}{
-				testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
+				testSchedulerKey: scheduler.SchedulerFunc(func(ctx context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
 					return []eventingduckv1alpha1.Placement{
 						{PodName: "p1", VReplicas: 1},
 						{PodName: "p2", VReplicas: 1},
@@ -402,7 +396,7 @@ func TestReconcileKind(t *testing.T) {
 			},
 			Key: ConsumerGroupTestKey,
 			OtherTestData: map[string]interface{}{
-				testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
+				testSchedulerKey: scheduler.SchedulerFunc(func(ctx context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
 					return []eventingduckv1alpha1.Placement{
 						{PodName: "p1", VReplicas: 1},
 						{PodName: "p2", VReplicas: 1},
@@ -528,7 +522,7 @@ func TestReconcileKind(t *testing.T) {
 			},
 			Key: ConsumerGroupTestKey,
 			OtherTestData: map[string]interface{}{
-				testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
+				testSchedulerKey: scheduler.SchedulerFunc(func(ctx context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
 					return []eventingduckv1alpha1.Placement{
 						{PodName: "p1", VReplicas: 1},
 						{PodName: "p2", VReplicas: 1},
@@ -702,7 +696,7 @@ func TestReconcileKind(t *testing.T) {
 			},
 			Key: ConsumerGroupTestKey,
 			OtherTestData: map[string]interface{}{
-				testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
+				testSchedulerKey: scheduler.SchedulerFunc(func(ctx context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
 					return []eventingduckv1alpha1.Placement{
 						{PodName: "p1", VReplicas: 1},
 						{PodName: "p2", VReplicas: 1},
@@ -877,7 +871,7 @@ func TestReconcileKind(t *testing.T) {
 			},
 			Key: ConsumerGroupTestKey,
 			OtherTestData: map[string]interface{}{
-				testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
+				testSchedulerKey: scheduler.SchedulerFunc(func(ctx context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
 					return []eventingduckv1alpha1.Placement{
 						{PodName: "p1", VReplicas: 1},
 						{PodName: "p2", VReplicas: 1},
@@ -1034,7 +1028,7 @@ func TestReconcileKind(t *testing.T) {
 			},
 			Key: ConsumerGroupTestKey,
 			OtherTestData: map[string]interface{}{
-				testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
+				testSchedulerKey: scheduler.SchedulerFunc(func(ctx context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
 					return []eventingduckv1alpha1.Placement{
 						{PodName: "p1", VReplicas: 1},
 					}, nil
@@ -1121,7 +1115,7 @@ func TestReconcileKind(t *testing.T) {
 			},
 			Key: ConsumerGroupTestKey,
 			OtherTestData: map[string]interface{}{
-				testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
+				testSchedulerKey: scheduler.SchedulerFunc(func(ctx context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
 					return []eventingduckv1alpha1.Placement{
 						{PodName: "p1", VReplicas: 1},
 						{PodName: "p2", VReplicas: 1},
@@ -1208,7 +1202,7 @@ func TestReconcileKind(t *testing.T) {
 			},
 			Key: ConsumerGroupTestKey,
 			OtherTestData: map[string]interface{}{
-				testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
+				testSchedulerKey: scheduler.SchedulerFunc(func(ctx context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
 					return []eventingduckv1alpha1.Placement{
 						{PodName: "p1", VReplicas: 1},
 						{PodName: "p2", VReplicas: 1},
@@ -1303,7 +1297,7 @@ func TestReconcileKind(t *testing.T) {
 			},
 			Key: ConsumerGroupTestKey,
 			OtherTestData: map[string]interface{}{
-				testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
+				testSchedulerKey: scheduler.SchedulerFunc(func(ctx context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
 					return []eventingduckv1alpha1.Placement{
 						{PodName: "p1", VReplicas: 1},
 						{PodName: "p2", VReplicas: 2},
@@ -1426,7 +1420,7 @@ func TestReconcileKind(t *testing.T) {
 			},
 			Key: ConsumerGroupTestKey,
 			OtherTestData: map[string]interface{}{
-				testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
+				testSchedulerKey: scheduler.SchedulerFunc(func(ctx context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
 					return []eventingduckv1alpha1.Placement{
 						{PodName: "p1", VReplicas: 1},
 						{PodName: "p2", VReplicas: 2},
@@ -1533,7 +1527,7 @@ func TestReconcileKind(t *testing.T) {
 			},
 			Key: ConsumerGroupTestKey,
 			OtherTestData: map[string]interface{}{
-				testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
+				testSchedulerKey: scheduler.SchedulerFunc(func(ctx context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
 					return []eventingduckv1alpha1.Placement{
 						{PodName: "p1", VReplicas: 1},
 						{PodName: "p2", VReplicas: 1},
@@ -1630,7 +1624,7 @@ func TestReconcileKind(t *testing.T) {
 			},
 			Key: ConsumerGroupTestKey,
 			OtherTestData: map[string]interface{}{
-				testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
+				testSchedulerKey: scheduler.SchedulerFunc(func(ctx context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
 					return nil, io.EOF
 				}),
 			},
@@ -1762,7 +1756,7 @@ func TestReconcileKindNoAutoscaler(t *testing.T) {
 			},
 			Key: ConsumerGroupTestKey,
 			OtherTestData: map[string]interface{}{
-				testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
+				testSchedulerKey: scheduler.SchedulerFunc(func(ctx context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
 					return []eventingduckv1alpha1.Placement{
 						{PodName: "p1", VReplicas: 1},
 						{PodName: "p2", VReplicas: 1},
@@ -1926,7 +1920,7 @@ func TestFinalizeKind(t *testing.T) {
 			},
 			Key: testKey,
 			OtherTestData: map[string]interface{}{
-				testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
+				testSchedulerKey: scheduler.SchedulerFunc(func(ctx context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
 					return nil, nil
 				}),
 			},
@@ -1995,7 +1989,7 @@ func TestFinalizeKind(t *testing.T) {
 			},
 			Key: testKey,
 			OtherTestData: map[string]interface{}{
-				testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
+				testSchedulerKey: scheduler.SchedulerFunc(func(ctx context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
 					return nil, nil
 				}),
 			},
@@ -2121,7 +2115,7 @@ func TestFinalizeKind(t *testing.T) {
 			},
 			Key: testKey,
 			OtherTestData: map[string]interface{}{
-				testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
+				testSchedulerKey: scheduler.SchedulerFunc(func(ctx context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
 					return nil, nil
 				}),
 			},
@@ -2167,7 +2161,7 @@ func TestFinalizeKind(t *testing.T) {
 			},
 			Key: testKey,
 			OtherTestData: map[string]interface{}{
-				testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
+				testSchedulerKey: scheduler.SchedulerFunc(func(ctx context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
 					return nil, nil
 				}),
 				kafkatesting.ErrorOnDeleteConsumerGroupTestKey: sarama.ErrUnknownTopicOrPartition,
@@ -2214,7 +2208,7 @@ func TestFinalizeKind(t *testing.T) {
 			},
 			Key: testKey,
 			OtherTestData: map[string]interface{}{
-				testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
+				testSchedulerKey: scheduler.SchedulerFunc(func(ctx context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
 					return nil, nil
 				}),
 				kafkatesting.ErrorOnDeleteConsumerGroupTestKey: sarama.ErrGroupIDNotFound,
@@ -2262,7 +2256,7 @@ func TestFinalizeKind(t *testing.T) {
 			},
 			WantErr: true,
 			Key: testKey,
 			OtherTestData: map[string]interface{}{
-				testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
+				testSchedulerKey: scheduler.SchedulerFunc(func(ctx context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
 					return nil, nil
 				}),
 				kafkatesting.ErrorOnDeleteConsumerGroupTestKey: sarama.ErrClusterAuthorizationFailed,