diff --git a/control-plane/cmd/kafka-controller/main.go b/control-plane/cmd/kafka-controller/main.go index d39dfce1b9..e10735efe1 100644 --- a/control-plane/cmd/kafka-controller/main.go +++ b/control-plane/cmd/kafka-controller/main.go @@ -19,7 +19,10 @@ package main import ( "context" "log" + "os" + "strings" + "github.com/IBM/sarama" filteredFactory "knative.dev/pkg/client/injection/kube/informers/factory/filtered" "knative.dev/pkg/configmap" "knative.dev/pkg/controller" @@ -68,7 +71,16 @@ func main() { eventingtls.TrustBundleLabelSelector, auth.OIDCLabelSelector, ) - ctx = clientpool.WithKafkaClientPool(ctx) + + if v := os.Getenv("ENABLE_SARAMA_LOGGER"); strings.EqualFold(v, "true") { + sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags|log.Llongfile) + } + if v := os.Getenv("ENABLE_SARAMA_DEBUG_LOGGER"); strings.EqualFold(v, "true") { + sarama.DebugLogger = log.New(os.Stdout, "[sarama][debug] ", log.LstdFlags|log.Llongfile) + } + if v := os.Getenv("ENABLE_SARAMA_CLIENT_POOL"); v == "" || strings.EqualFold(v, "true") { + ctx = clientpool.WithKafkaClientPool(ctx) + } sharedmain.MainNamed(ctx, component, diff --git a/control-plane/config/eventing-kafka-broker/200-controller/500-controller.yaml b/control-plane/config/eventing-kafka-broker/200-controller/500-controller.yaml index 419278511b..57ac761294 100644 --- a/control-plane/config/eventing-kafka-broker/200-controller/500-controller.yaml +++ b/control-plane/config/eventing-kafka-broker/200-controller/500-controller.yaml @@ -173,6 +173,12 @@ spec: valueFrom: fieldRef: fieldPath: metadata.name + - name: ENABLE_SARAMA_LOGGER + value: "false" + - name: ENABLE_SARAMA_DEBUG_LOGGER + value: "false" + - name: ENABLE_SARAMA_CLIENT_POOL + value: "true" ports: - containerPort: 9090 diff --git a/control-plane/pkg/kafka/clientpool/clientpool.go b/control-plane/pkg/kafka/clientpool/clientpool.go index f8261f2b2b..3df2d5bbec 100644 --- a/control-plane/pkg/kafka/clientpool/clientpool.go +++ b/control-plane/pkg/kafka/clientpool/clientpool.go @@ -27,10 +27,11 @@ import ( "go.uber.org/zap" corev1 "k8s.io/api/core/v1" + "knative.dev/pkg/logging" + "knative.dev/eventing-kafka-broker/control-plane/pkg/kafka" "knative.dev/eventing-kafka-broker/control-plane/pkg/prober" "knative.dev/eventing-kafka-broker/control-plane/pkg/security" - "knative.dev/pkg/logging" ) type KafkaClientKey struct{} @@ -63,8 +64,21 @@ type ClientPool struct { } type GetKafkaClientFunc func(ctx context.Context, bootstrapServers []string, secret *corev1.Secret) (sarama.Client, error) + type GetKafkaClusterAdminFunc func(ctx context.Context, bootstrapServers []string, secret *corev1.Secret) (sarama.ClusterAdmin, error) +func DisabledGetKafkaClusterAdminFunc(_ context.Context, bootstrapServers []string, secret *corev1.Secret) (sarama.ClusterAdmin, error) { + c, err := makeSaramaClient(bootstrapServers, secret, sarama.NewClient) + if err != nil { + return nil, err + } + return sarama.NewClusterAdminFromClient(c) +} + +func DisabledGetClient(_ context.Context, bootstrapServers []string, secret *corev1.Secret) (sarama.Client, error) { + return makeSaramaClient(bootstrapServers, secret, sarama.NewClient) +} + func (cp *ClientPool) GetClient(ctx context.Context, bootstrapServers []string, secret *corev1.Secret) (sarama.Client, error) { client, err := cp.getClient(ctx, bootstrapServers, secret) if err != nil { @@ -141,7 +155,11 @@ func (cp *ClientPool) GetClusterAdmin(ctx context.Context, bootstrapServers []st } func Get(ctx context.Context) *ClientPool { - return ctx.Value(ctxKey).(*ClientPool) + v := ctx.Value(ctxKey) + if v == nil { + return nil + } + return v.(*ClientPool) } func makeClusterAdminKey(bootstrapServers []string, secret *corev1.Secret) clientKey { @@ -162,6 +180,10 @@ func makeClusterAdminKey(bootstrapServers []string, secret *corev1.Secret) clien } func (cp *ClientPool) makeSaramaClient(bootstrapServers []string, secret *corev1.Secret) (sarama.Client, error) { + return makeSaramaClient(bootstrapServers, secret, cp.newSaramaClient) +} + +func makeSaramaClient(bootstrapServers []string, secret *corev1.Secret, newSaramaClient kafka.NewClientFunc) (sarama.Client, error) { secretOpt, err := security.NewSaramaSecurityOptionFromSecret(secret) if err != nil { return nil, err @@ -172,7 +194,7 @@ func (cp *ClientPool) makeSaramaClient(bootstrapServers []string, secret *corev1 return nil, err } - saramaClient, err := cp.newSaramaClient(bootstrapServers, config) + saramaClient, err := newSaramaClient(bootstrapServers, config) if err != nil { return nil, err } diff --git a/control-plane/pkg/reconciler/broker/controller.go b/control-plane/pkg/reconciler/broker/controller.go index 9501d1fb1e..57b53c1814 100644 --- a/control-plane/pkg/reconciler/broker/controller.go +++ b/control-plane/pkg/reconciler/broker/controller.go @@ -65,8 +65,6 @@ func NewController(ctx context.Context, watcher configmap.Watcher, env *config.E configmapInformer := configmapinformer.Get(ctx) featureFlags := apisconfig.DefaultFeaturesConfig() - clientPool := clientpool.Get(ctx) - reconciler := &Reconciler{ Reconciler: &base.Reconciler{ KubeClient: kubeclient.Get(ctx), @@ -79,11 +77,17 @@ func NewController(ctx context.Context, watcher configmap.Watcher, env *config.E DispatcherLabel: base.BrokerDispatcherLabel, ReceiverLabel: base.BrokerReceiverLabel, }, - GetKafkaClusterAdmin: clientPool.GetClusterAdmin, - ConfigMapLister: configmapInformer.Lister(), - Env: env, - Counter: counter.NewExpiringCounter(ctx), - KafkaFeatureFlags: featureFlags, + ConfigMapLister: configmapInformer.Lister(), + Env: env, + Counter: counter.NewExpiringCounter(ctx), + KafkaFeatureFlags: featureFlags, + } + + clientPool := clientpool.Get(ctx) + if clientPool == nil { + reconciler.GetKafkaClusterAdmin = clientpool.DisabledGetKafkaClusterAdminFunc + } else { + reconciler.GetKafkaClusterAdmin = clientPool.GetClusterAdmin } logger := logging.FromContext(ctx) diff --git a/control-plane/pkg/reconciler/broker/namespaced_controller.go b/control-plane/pkg/reconciler/broker/namespaced_controller.go index 4441ee475a..214a3e2509 100644 --- a/control-plane/pkg/reconciler/broker/namespaced_controller.go +++ b/control-plane/pkg/reconciler/broker/namespaced_controller.go @@ -88,8 +88,6 @@ func NewNamespacedController(ctx context.Context, watcher configmap.Watcher, env logger.Fatal("unable to create Manifestival client-go client", zap.Error(err)) } - clientPool := clientpool.Get(ctx) - reconciler := &NamespacedReconciler{ Reconciler: &base.Reconciler{ KubeClient: kubeclient.Get(ctx), @@ -103,7 +101,6 @@ func NewNamespacedController(ctx context.Context, watcher configmap.Watcher, env DispatcherLabel: base.BrokerDispatcherLabel, ReceiverLabel: base.BrokerReceiverLabel, }, - GetKafkaClusterAdmin: clientPool.GetClusterAdmin, NamespaceLister: namespaceinformer.Get(ctx).Lister(), ConfigMapLister: configmapInformer.Lister(), ServiceAccountLister: serviceaccountinformer.Get(ctx).Lister(), @@ -119,6 +116,13 @@ func NewNamespacedController(ctx context.Context, watcher configmap.Watcher, env KafkaFeatureFlags: apisconfig.DefaultFeaturesConfig(), } + clientPool := clientpool.Get(ctx) + if clientPool == nil { + reconciler.GetKafkaClusterAdmin = clientpool.DisabledGetKafkaClusterAdminFunc + } else { + reconciler.GetKafkaClusterAdmin = clientPool.GetClusterAdmin + } + impl := brokerreconciler.NewImpl(ctx, reconciler, kafka.NamespacedBrokerClass, func(impl *controller.Impl) controller.Options { return controller.Options{PromoteFilterFunc: kafka.NamespacedBrokerClassFilter()} }) diff --git a/control-plane/pkg/reconciler/channel/controller.go b/control-plane/pkg/reconciler/channel/controller.go index 2cc2636e3c..213861d8c0 100644 --- a/control-plane/pkg/reconciler/channel/controller.go +++ b/control-plane/pkg/reconciler/channel/controller.go @@ -31,13 +31,6 @@ import ( "knative.dev/eventing/pkg/apis/feature" subscriptioninformer "knative.dev/eventing/pkg/client/injection/informers/messaging/v1/subscription" - messagingv1beta "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/messaging/v1beta1" - kafkachannelinformer "knative.dev/eventing-kafka-broker/control-plane/pkg/client/injection/informers/messaging/v1beta1/kafkachannel" - kafkachannelreconciler "knative.dev/eventing-kafka-broker/control-plane/pkg/client/injection/reconciler/messaging/v1beta1/kafkachannel" - "knative.dev/eventing-kafka-broker/control-plane/pkg/kafka" - "knative.dev/eventing-kafka-broker/control-plane/pkg/kafka/clientpool" - "knative.dev/eventing-kafka-broker/control-plane/pkg/kafka/offset" - kubeclient "knative.dev/pkg/client/injection/kube/client" configmapinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/configmap" podinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/pod" @@ -45,6 +38,13 @@ import ( serviceinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/service" "knative.dev/pkg/configmap" + messagingv1beta "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/messaging/v1beta1" + kafkachannelinformer "knative.dev/eventing-kafka-broker/control-plane/pkg/client/injection/informers/messaging/v1beta1/kafkachannel" + kafkachannelreconciler "knative.dev/eventing-kafka-broker/control-plane/pkg/client/injection/reconciler/messaging/v1beta1/kafkachannel" + "knative.dev/eventing-kafka-broker/control-plane/pkg/kafka" + "knative.dev/eventing-kafka-broker/control-plane/pkg/kafka/clientpool" + "knative.dev/eventing-kafka-broker/control-plane/pkg/kafka/offset" + "knative.dev/pkg/controller" "knative.dev/pkg/logging" "knative.dev/pkg/network" @@ -63,8 +63,6 @@ func NewController(ctx context.Context, watcher configmap.Watcher, configs *conf configmapInformer := configmapinformer.Get(ctx) serviceInformer := serviceinformer.Get(ctx) - clientPool := clientpool.Get(ctx) - reconciler := &Reconciler{ Reconciler: &base.Reconciler{ KubeClient: kubeclient.Get(ctx), @@ -77,14 +75,21 @@ func NewController(ctx context.Context, watcher configmap.Watcher, configs *conf DispatcherLabel: base.ChannelDispatcherLabel, ReceiverLabel: base.ChannelReceiverLabel, }, - SubscriptionLister: subscriptioninformer.Get(ctx).Lister(), - GetKafkaClient: clientPool.GetClient, - GetKafkaClusterAdmin: clientPool.GetClusterAdmin, - InitOffsetsFunc: offset.InitOffsets, - Env: configs, - ConfigMapLister: configmapInformer.Lister(), - ServiceLister: serviceInformer.Lister(), - KafkaFeatureFlags: apisconfig.DefaultFeaturesConfig(), + Env: configs, + InitOffsetsFunc: offset.InitOffsets, + ConfigMapLister: configmapInformer.Lister(), + ServiceLister: serviceinformer.Get(ctx).Lister(), + SubscriptionLister: subscriptioninformer.Get(ctx).Lister(), + KafkaFeatureFlags: apisconfig.DefaultFeaturesConfig(), + } + + clientPool := clientpool.Get(ctx) + if clientPool == nil { + reconciler.GetKafkaClient = clientpool.DisabledGetClient + reconciler.GetKafkaClusterAdmin = clientpool.DisabledGetKafkaClusterAdminFunc + } else { + reconciler.GetKafkaClient = clientPool.GetClient + reconciler.GetKafkaClusterAdmin = clientPool.GetClusterAdmin } logger := logging.FromContext(ctx) diff --git a/control-plane/pkg/reconciler/consumergroup/controller.go b/control-plane/pkg/reconciler/consumergroup/controller.go index d5acd210ad..ae011cb58f 100644 --- a/control-plane/pkg/reconciler/consumergroup/controller.go +++ b/control-plane/pkg/reconciler/consumergroup/controller.go @@ -117,8 +117,6 @@ func NewController(ctx context.Context, watcher configmap.Watcher) *controller.I //KafkaChannelScheduler: createKafkaScheduler(ctx, c, kafkainternals.ChannelStatefulSetName), //To be added with channel/v2 reconciler version only } - clientPool := clientpool.Get(ctx) - r := &Reconciler{ SchedulerFunc: func(s string) (Scheduler, bool) { sched, ok := schedulers[strings.ToLower(s)]; return sched, ok }, ConsumerLister: consumer.Get(ctx).Lister(), @@ -128,10 +126,8 @@ func NewController(ctx context.Context, watcher configmap.Watcher) *controller.I PodLister: podinformer.Get(ctx).Lister(), KubeClient: kubeclient.Get(ctx), NameGenerator: names.SimpleNameGenerator, - GetKafkaClient: clientPool.GetClient, InitOffsetsFunc: offset.InitOffsets, SystemNamespace: system.Namespace(), - GetKafkaClusterAdmin: clientPool.GetClusterAdmin, KafkaFeatureFlags: config.DefaultFeaturesConfig(), KedaClient: kedaclient.Get(ctx), AutoscalerConfig: env.AutoscalerConfigMap, @@ -139,6 +135,15 @@ func NewController(ctx context.Context, watcher configmap.Watcher) *controller.I InitOffsetLatestInitialOffsetCache: prober.NewLocalExpiringCache[string, prober.Status, struct{}](ctx, 20*time.Minute), } + clientPool := clientpool.Get(ctx) + if clientPool == nil { + r.GetKafkaClusterAdmin = clientpool.DisabledGetKafkaClusterAdminFunc + r.GetKafkaClient = clientpool.DisabledGetClient + } else { + r.GetKafkaClusterAdmin = clientPool.GetClusterAdmin + r.GetKafkaClient = clientPool.GetClient + } + consumerInformer := consumer.Get(ctx) consumerGroupInformer := consumergroup.Get(ctx) diff --git a/control-plane/pkg/reconciler/sink/controller.go b/control-plane/pkg/reconciler/sink/controller.go index ba70b34ae5..ea5241ad02 100644 --- a/control-plane/pkg/reconciler/sink/controller.go +++ b/control-plane/pkg/reconciler/sink/controller.go @@ -54,8 +54,6 @@ func NewController(ctx context.Context, watcher configmap.Watcher, configs *conf configmapInformer := configmapinformer.Get(ctx) - clientPool := clientpool.Get(ctx) - reconciler := &Reconciler{ Reconciler: &base.Reconciler{ KubeClient: kubeclient.Get(ctx), @@ -67,9 +65,15 @@ func NewController(ctx context.Context, watcher configmap.Watcher, configs *conf DataPlaneNamespace: configs.SystemNamespace, ReceiverLabel: base.SinkReceiverLabel, }, - ConfigMapLister: configmapInformer.Lister(), - GetKafkaClusterAdmin: clientPool.GetClusterAdmin, - Env: configs, + ConfigMapLister: configmapInformer.Lister(), + Env: configs, + } + + clientPool := clientpool.Get(ctx) + if clientPool == nil { + reconciler.GetKafkaClusterAdmin = clientpool.DisabledGetKafkaClusterAdminFunc + } else { + reconciler.GetKafkaClusterAdmin = clientPool.GetClusterAdmin } _, err := reconciler.GetOrCreateDataPlaneConfigMap(ctx) diff --git a/control-plane/pkg/reconciler/trigger/controller.go b/control-plane/pkg/reconciler/trigger/controller.go index 4f59f70d05..0b65d37a53 100644 --- a/control-plane/pkg/reconciler/trigger/controller.go +++ b/control-plane/pkg/reconciler/trigger/controller.go @@ -67,8 +67,6 @@ func NewController(ctx context.Context, watcher configmap.Watcher, configs *conf triggerLister := triggerInformer.Lister() serviceaccountInformer := serviceaccountinformer.Get(ctx) - clientPool := clientpool.Get(ctx) - reconciler := &Reconciler{ Reconciler: &base.Reconciler{ KubeClient: kubeclient.Get(ctx), @@ -92,12 +90,19 @@ func NewController(ctx context.Context, watcher configmap.Watcher, configs *conf BrokerClass: kafka.BrokerClass, DataPlaneConfigMapLabeler: base.NoopConfigmapOption, KafkaFeatureFlags: apisconfig.DefaultFeaturesConfig(), - GetKafkaClient: clientPool.GetClient, - GetKafkaClusterAdmin: clientPool.GetClusterAdmin, InitOffsetsFunc: offset.InitOffsets, ServiceAccountLister: serviceaccountInformer.Lister(), } + clientPool := clientpool.Get(ctx) + if clientPool == nil { + reconciler.GetKafkaClusterAdmin = clientpool.DisabledGetKafkaClusterAdminFunc + reconciler.GetKafkaClient = clientpool.DisabledGetClient + } else { + reconciler.GetKafkaClusterAdmin = clientPool.GetClusterAdmin + reconciler.GetKafkaClient = clientPool.GetClient + } + impl := triggerreconciler.NewImpl(ctx, reconciler, func(impl *controller.Impl) controller.Options { return controller.Options{ FinalizerName: FinalizerName, diff --git a/control-plane/pkg/reconciler/trigger/namespaced_controller.go b/control-plane/pkg/reconciler/trigger/namespaced_controller.go index c8f622c8ab..22e11ab4ec 100644 --- a/control-plane/pkg/reconciler/trigger/namespaced_controller.go +++ b/control-plane/pkg/reconciler/trigger/namespaced_controller.go @@ -62,8 +62,6 @@ func NewNamespacedController(ctx context.Context, watcher configmap.Watcher, con triggerLister := triggerInformer.Lister() serviceaccountInformer := serviceaccountinformer.Get(ctx) - clientPool := clientpool.Get(ctx) - reconciler := &NamespacedReconciler{ Reconciler: &base.Reconciler{ KubeClient: kubeclient.Get(ctx), @@ -85,12 +83,19 @@ func NewNamespacedController(ctx context.Context, watcher configmap.Watcher, con ServiceAccountLister: serviceaccountInformer.Lister(), EventingClient: eventingclient.Get(ctx), Env: configs, - GetKafkaClient: clientPool.GetClient, - GetKafkaClusterAdmin: clientPool.GetClusterAdmin, InitOffsetsFunc: offset.InitOffsets, KafkaFeatureFlags: apisconfig.DefaultFeaturesConfig(), } + clientPool := clientpool.Get(ctx) + if clientPool == nil { + reconciler.GetKafkaClusterAdmin = clientpool.DisabledGetKafkaClusterAdminFunc + reconciler.GetKafkaClient = clientpool.DisabledGetClient + } else { + reconciler.GetKafkaClusterAdmin = clientPool.GetClusterAdmin + reconciler.GetKafkaClient = clientPool.GetClient + } + impl := triggerreconciler.NewImpl(ctx, reconciler, func(impl *controller.Impl) controller.Options { return controller.Options{ FinalizerName: NamespacedFinalizerName,