diff --git a/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1/register.go b/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1/register.go index 04650da6f8..952058f633 100644 --- a/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1/register.go +++ b/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1/register.go @@ -75,3 +75,15 @@ func IsKnownStatefulSet(name string) bool { name == ChannelStatefulSetName || name == BrokerStatefulSetName } + +func GetOwnerKindFromStatefulSetName(name string) (string, bool) { + switch name { + case SourceStatefulSetName: + return "KafkaSource", true + case ChannelStatefulSetName: + return "KafkaChannel", true + case BrokerStatefulSetName: + return "Trigger", true + } + return "", false +} diff --git a/control-plane/pkg/reconciler/consumer/controller.go b/control-plane/pkg/reconciler/consumer/controller.go index 587d5cecd7..539a09503a 100644 --- a/control-plane/pkg/reconciler/consumer/controller.go +++ b/control-plane/pkg/reconciler/consumer/controller.go @@ -35,6 +35,7 @@ import ( "knative.dev/pkg/system" "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/config" + kafkainternals "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1" "knative.dev/eventing-kafka-broker/control-plane/pkg/client/internals/kafka/injection/informers/eventing/v1alpha1/consumer" "knative.dev/eventing-kafka-broker/control-plane/pkg/client/internals/kafka/injection/informers/eventing/v1alpha1/consumergroup" creconciler "knative.dev/eventing-kafka-broker/control-plane/pkg/client/internals/kafka/injection/reconciler/eventing/v1alpha1/consumer" @@ -90,7 +91,16 @@ func NewController(ctx context.Context, watcher configmap.Watcher) *controller.I impl.GlobalResync(consumerInformer.Informer()) } - cgreconciler.ResyncOnStatefulSetChange(ctx, globalResync) + cgreconciler.ResyncOnStatefulSetChange(ctx, impl.FilteredGlobalResync, consumerInformer.Informer(), func(obj interface{}) (*kafkainternals.ConsumerGroup, bool) { + c, ok := obj.(*kafkainternals.Consumer) + if !ok { + return nil, false + } + + cgRef := c.GetConsumerGroup() + cg, err := r.ConsumerGroupLister.ConsumerGroups(c.GetNamespace()).Get(cgRef.Name) + return cg, err == nil + }) trustBundleConfigMapInformer.Informer().AddEventHandler(controller.HandleAll(globalResync)) diff --git a/control-plane/pkg/reconciler/consumergroup/controller.go b/control-plane/pkg/reconciler/consumergroup/controller.go index d5acd210ad..74b73cf04b 100644 --- a/control-plane/pkg/reconciler/consumergroup/controller.go +++ b/control-plane/pkg/reconciler/consumergroup/controller.go @@ -189,27 +189,53 @@ func NewController(ctx context.Context, watcher configmap.Watcher) *controller.I }) consumerInformer.Informer().AddEventHandler(controller.HandleAll(enqueueConsumerGroupFromConsumer(impl.EnqueueKey))) - globalResync := func(interface{}) { - impl.GlobalResync(consumerGroupInformer.Informer()) - } - - ResyncOnStatefulSetChange(ctx, globalResync) + ResyncOnStatefulSetChange(ctx, impl.FilteredGlobalResync, consumerGroupInformer.Informer(), func(obj interface{}) (*kafkainternals.ConsumerGroup, bool) { + cg, ok := obj.(*kafkainternals.ConsumerGroup) + return cg, ok + }) //Todo: ScaledObject informer when KEDA is installed return impl } -func ResyncOnStatefulSetChange(ctx context.Context, handle func(interface{})) { +func ResyncOnStatefulSetChange(ctx context.Context, filteredResync func(f func(interface{}) bool, si cache.SharedInformer), informer cache.SharedInformer, getConsumerGroupFromObj func(obj interface{}) (*kafkainternals.ConsumerGroup, bool)) { systemNamespace := system.Namespace() + handleResync := func(obj interface{}) { + + ss, ok := obj.(*appsv1.StatefulSet) + if !ok { + return + } + + kind, ok := kafkainternals.GetOwnerKindFromStatefulSetName(ss.GetName()) + if !ok { + return + } + + filteredResync(func(i interface{}) bool { + cg, ok := getConsumerGroupFromObj(i) + if !ok { + return false + } + for _, owner := range cg.OwnerReferences { + if strings.EqualFold(owner.Kind, kind) { + return true + } + } + + return false + }, informer) + } + statefulset.Get(ctx).Informer().AddEventHandler(cache.FilteringResourceEventHandler{ FilterFunc: func(obj interface{}) bool { ss := obj.(*appsv1.StatefulSet) return ss.GetNamespace() == systemNamespace && kafkainternals.IsKnownStatefulSet(ss.GetName()) }, Handler: cache.ResourceEventHandlerFuncs{ - AddFunc: handle, + AddFunc: handleResync, UpdateFunc: func(oldObj, newObj interface{}) { o, ok := oldObj.(*appsv1.StatefulSet) if !ok { @@ -230,9 +256,9 @@ func ResyncOnStatefulSetChange(ctx context.Context, handle func(interface{})) { return } - handle(newObj) + handleResync(newObj) }, - DeleteFunc: handle, + DeleteFunc: handleResync, }, }) }