Skip to content

Commit

Permalink
only requeue consumergroups from appropriate statefulset on change (#…
Browse files Browse the repository at this point in the history
…3825)

* only requeue consumergroups from appropriate statefulset on change

Signed-off-by: Calum Murray <cmurray@redhat.com>

* inline resync logic

Signed-off-by: Calum Murray <cmurray@redhat.com>

* fix the consumer controller build error

Signed-off-by: Calum Murray <cmurray@redhat.com>

* fix: re-queue with correct informer

Signed-off-by: Calum Murray <cmurray@redhat.com>

* cleanup: remove unused import

Signed-off-by: Calum Murray <cmurray@redhat.com>

* use strings.EqualFold

Signed-off-by: Calum Murray <cmurray@redhat.com>

---------

Signed-off-by: Calum Murray <cmurray@redhat.com>
  • Loading branch information
Cali0707 committed May 28, 2024
1 parent b31c5d3 commit 2fa69ad
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,3 +75,15 @@ func IsKnownStatefulSet(name string) bool {
name == ChannelStatefulSetName ||
name == BrokerStatefulSetName
}

func GetOwnerKindFromStatefulSetName(name string) (string, bool) {
switch name {
case SourceStatefulSetName:
return "KafkaSource", true
case ChannelStatefulSetName:
return "KafkaChannel", true
case BrokerStatefulSetName:
return "Trigger", true
}
return "", false
}
12 changes: 11 additions & 1 deletion control-plane/pkg/reconciler/consumer/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"knative.dev/pkg/system"

"knative.dev/eventing-kafka-broker/control-plane/pkg/apis/config"
kafkainternals "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1"
"knative.dev/eventing-kafka-broker/control-plane/pkg/client/internals/kafka/injection/informers/eventing/v1alpha1/consumer"
"knative.dev/eventing-kafka-broker/control-plane/pkg/client/internals/kafka/injection/informers/eventing/v1alpha1/consumergroup"
creconciler "knative.dev/eventing-kafka-broker/control-plane/pkg/client/internals/kafka/injection/reconciler/eventing/v1alpha1/consumer"
Expand Down Expand Up @@ -90,7 +91,16 @@ func NewController(ctx context.Context, watcher configmap.Watcher) *controller.I
impl.GlobalResync(consumerInformer.Informer())
}

cgreconciler.ResyncOnStatefulSetChange(ctx, globalResync)
cgreconciler.ResyncOnStatefulSetChange(ctx, impl.FilteredGlobalResync, consumerInformer.Informer(), func(obj interface{}) (*kafkainternals.ConsumerGroup, bool) {
c, ok := obj.(*kafkainternals.Consumer)
if !ok {
return nil, false
}

cgRef := c.GetConsumerGroup()
cg, err := r.ConsumerGroupLister.ConsumerGroups(c.GetNamespace()).Get(cgRef.Name)
return cg, err == nil
})

trustBundleConfigMapInformer.Informer().AddEventHandler(controller.HandleAll(globalResync))

Expand Down
44 changes: 35 additions & 9 deletions control-plane/pkg/reconciler/consumergroup/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,27 +189,53 @@ func NewController(ctx context.Context, watcher configmap.Watcher) *controller.I
})
consumerInformer.Informer().AddEventHandler(controller.HandleAll(enqueueConsumerGroupFromConsumer(impl.EnqueueKey)))

globalResync := func(interface{}) {
impl.GlobalResync(consumerGroupInformer.Informer())
}

ResyncOnStatefulSetChange(ctx, globalResync)
ResyncOnStatefulSetChange(ctx, impl.FilteredGlobalResync, consumerGroupInformer.Informer(), func(obj interface{}) (*kafkainternals.ConsumerGroup, bool) {
cg, ok := obj.(*kafkainternals.ConsumerGroup)
return cg, ok
})

//Todo: ScaledObject informer when KEDA is installed

return impl
}

func ResyncOnStatefulSetChange(ctx context.Context, handle func(interface{})) {
func ResyncOnStatefulSetChange(ctx context.Context, filteredResync func(f func(interface{}) bool, si cache.SharedInformer), informer cache.SharedInformer, getConsumerGroupFromObj func(obj interface{}) (*kafkainternals.ConsumerGroup, bool)) {
systemNamespace := system.Namespace()

handleResync := func(obj interface{}) {

ss, ok := obj.(*appsv1.StatefulSet)
if !ok {
return
}

kind, ok := kafkainternals.GetOwnerKindFromStatefulSetName(ss.GetName())
if !ok {
return
}

filteredResync(func(i interface{}) bool {
cg, ok := getConsumerGroupFromObj(i)
if !ok {
return false
}
for _, owner := range cg.OwnerReferences {
if strings.EqualFold(owner.Kind, kind) {
return true
}
}

return false
}, informer)
}

statefulset.Get(ctx).Informer().AddEventHandler(cache.FilteringResourceEventHandler{
FilterFunc: func(obj interface{}) bool {
ss := obj.(*appsv1.StatefulSet)
return ss.GetNamespace() == systemNamespace && kafkainternals.IsKnownStatefulSet(ss.GetName())
},
Handler: cache.ResourceEventHandlerFuncs{
AddFunc: handle,
AddFunc: handleResync,
UpdateFunc: func(oldObj, newObj interface{}) {
o, ok := oldObj.(*appsv1.StatefulSet)
if !ok {
Expand All @@ -230,9 +256,9 @@ func ResyncOnStatefulSetChange(ctx context.Context, handle func(interface{})) {
return
}

handle(newObj)
handleResync(newObj)
},
DeleteFunc: handle,
DeleteFunc: handleResync,
},
})
}
Expand Down

0 comments on commit 2fa69ad

Please sign in to comment.