From f62e51fafa230cae6694c1429054755515875cfe Mon Sep 17 00:00:00 2001 From: Pierangelo Di Pilato Date: Fri, 20 Sep 2024 10:51:44 +0200 Subject: [PATCH] Allow enabling sarama logging and disabling client pool (#4103) (#4108) Add 3 new environment variables: ``` ENABLE_SARAMA_LOGGER (default: false) ENABLE_SARAMA_DEBUG_LOGGER (default: false) ENABLE_SARAMA_CLIENT_POOL (default: true) ``` Signed-off-by: Pierangelo Di Pilato --- control-plane/cmd/kafka-controller/main.go | 14 +++++++++- .../200-controller/500-controller.yaml | 6 ++++ .../pkg/kafka/clientpool/clientpool.go | 28 +++++++++++++++++-- .../pkg/reconciler/broker/controller.go | 18 +++++++----- .../broker/namespaced_controller.go | 10 +++++-- .../pkg/reconciler/channel/controller.go | 24 +++++++++------- .../reconciler/consumergroup/controller.go | 13 ++++++--- .../pkg/reconciler/sink/controller.go | 14 ++++++---- .../pkg/reconciler/trigger/controller.go | 13 ++++++--- .../trigger/namespaced_controller.go | 13 ++++++--- 10 files changed, 112 insertions(+), 41 deletions(-) diff --git a/control-plane/cmd/kafka-controller/main.go b/control-plane/cmd/kafka-controller/main.go index 70a9df64b0..0e2d833465 100644 --- a/control-plane/cmd/kafka-controller/main.go +++ b/control-plane/cmd/kafka-controller/main.go @@ -19,7 +19,10 @@ package main import ( "context" "log" + "os" + "strings" + "github.com/IBM/sarama" filteredFactory "knative.dev/pkg/client/injection/kube/informers/factory/filtered" "knative.dev/pkg/configmap" "knative.dev/pkg/controller" @@ -70,7 +73,16 @@ func main() { auth.OIDCLabelSelector, eventing.DispatcherLabelSelectorStr, ) - ctx = clientpool.WithKafkaClientPool(ctx) + + if v := os.Getenv("ENABLE_SARAMA_LOGGER"); strings.EqualFold(v, "true") { + sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags|log.Llongfile) + } + if v := os.Getenv("ENABLE_SARAMA_DEBUG_LOGGER"); strings.EqualFold(v, "true") { + sarama.DebugLogger = log.New(os.Stdout, "[sarama][debug] ", log.LstdFlags|log.Llongfile) + } + if v := os.Getenv("ENABLE_SARAMA_CLIENT_POOL"); v == "" || strings.EqualFold(v, "true") { + ctx = clientpool.WithKafkaClientPool(ctx) + } sharedmain.MainNamed(ctx, component, diff --git a/control-plane/config/eventing-kafka-broker/200-controller/500-controller.yaml b/control-plane/config/eventing-kafka-broker/200-controller/500-controller.yaml index 13a7df4c1c..58133f2dc2 100644 --- a/control-plane/config/eventing-kafka-broker/200-controller/500-controller.yaml +++ b/control-plane/config/eventing-kafka-broker/200-controller/500-controller.yaml @@ -173,6 +173,12 @@ spec: valueFrom: fieldRef: fieldPath: metadata.name + - name: ENABLE_SARAMA_LOGGER + value: "false" + - name: ENABLE_SARAMA_DEBUG_LOGGER + value: "false" + - name: ENABLE_SARAMA_CLIENT_POOL + value: "true" ports: - containerPort: 9090 diff --git a/control-plane/pkg/kafka/clientpool/clientpool.go b/control-plane/pkg/kafka/clientpool/clientpool.go index f8261f2b2b..3df2d5bbec 100644 --- a/control-plane/pkg/kafka/clientpool/clientpool.go +++ b/control-plane/pkg/kafka/clientpool/clientpool.go @@ -27,10 +27,11 @@ import ( "go.uber.org/zap" corev1 "k8s.io/api/core/v1" + "knative.dev/pkg/logging" + "knative.dev/eventing-kafka-broker/control-plane/pkg/kafka" "knative.dev/eventing-kafka-broker/control-plane/pkg/prober" "knative.dev/eventing-kafka-broker/control-plane/pkg/security" - "knative.dev/pkg/logging" ) type KafkaClientKey struct{} @@ -63,8 +64,21 @@ type ClientPool struct { } type GetKafkaClientFunc func(ctx context.Context, bootstrapServers []string, secret *corev1.Secret) (sarama.Client, error) + type GetKafkaClusterAdminFunc func(ctx context.Context, bootstrapServers []string, secret *corev1.Secret) (sarama.ClusterAdmin, error) +func DisabledGetKafkaClusterAdminFunc(_ context.Context, bootstrapServers []string, secret *corev1.Secret) (sarama.ClusterAdmin, error) { + c, err := makeSaramaClient(bootstrapServers, secret, sarama.NewClient) + if err != nil { + return nil, err + } + return sarama.NewClusterAdminFromClient(c) +} + +func DisabledGetClient(_ context.Context, bootstrapServers []string, secret *corev1.Secret) (sarama.Client, error) { + return makeSaramaClient(bootstrapServers, secret, sarama.NewClient) +} + func (cp *ClientPool) GetClient(ctx context.Context, bootstrapServers []string, secret *corev1.Secret) (sarama.Client, error) { client, err := cp.getClient(ctx, bootstrapServers, secret) if err != nil { @@ -141,7 +155,11 @@ func (cp *ClientPool) GetClusterAdmin(ctx context.Context, bootstrapServers []st } func Get(ctx context.Context) *ClientPool { - return ctx.Value(ctxKey).(*ClientPool) + v := ctx.Value(ctxKey) + if v == nil { + return nil + } + return v.(*ClientPool) } func makeClusterAdminKey(bootstrapServers []string, secret *corev1.Secret) clientKey { @@ -162,6 +180,10 @@ func makeClusterAdminKey(bootstrapServers []string, secret *corev1.Secret) clien } func (cp *ClientPool) makeSaramaClient(bootstrapServers []string, secret *corev1.Secret) (sarama.Client, error) { + return makeSaramaClient(bootstrapServers, secret, cp.newSaramaClient) +} + +func makeSaramaClient(bootstrapServers []string, secret *corev1.Secret, newSaramaClient kafka.NewClientFunc) (sarama.Client, error) { secretOpt, err := security.NewSaramaSecurityOptionFromSecret(secret) if err != nil { return nil, err @@ -172,7 +194,7 @@ func (cp *ClientPool) makeSaramaClient(bootstrapServers []string, secret *corev1 return nil, err } - saramaClient, err := cp.newSaramaClient(bootstrapServers, config) + saramaClient, err := newSaramaClient(bootstrapServers, config) if err != nil { return nil, err } diff --git a/control-plane/pkg/reconciler/broker/controller.go b/control-plane/pkg/reconciler/broker/controller.go index 369894f95c..b19dab433e 100644 --- a/control-plane/pkg/reconciler/broker/controller.go +++ b/control-plane/pkg/reconciler/broker/controller.go @@ -65,8 +65,6 @@ func NewController(ctx context.Context, watcher configmap.Watcher, env *config.E configmapInformer := configmapinformer.Get(ctx) featureFlags := apisconfig.DefaultFeaturesConfig() - clientPool := clientpool.Get(ctx) - reconciler := &Reconciler{ Reconciler: &base.Reconciler{ KubeClient: kubeclient.Get(ctx), @@ -79,11 +77,17 @@ func NewController(ctx context.Context, watcher configmap.Watcher, env *config.E DispatcherLabel: base.BrokerDispatcherLabel, ReceiverLabel: base.BrokerReceiverLabel, }, - GetKafkaClusterAdmin: clientPool.GetClusterAdmin, - ConfigMapLister: configmapInformer.Lister(), - Env: env, - Counter: counter.NewExpiringCounter(ctx), - KafkaFeatureFlags: featureFlags, + ConfigMapLister: configmapInformer.Lister(), + Env: env, + Counter: counter.NewExpiringCounter(ctx), + KafkaFeatureFlags: featureFlags, + } + + clientPool := clientpool.Get(ctx) + if clientPool == nil { + reconciler.GetKafkaClusterAdmin = clientpool.DisabledGetKafkaClusterAdminFunc + } else { + reconciler.GetKafkaClusterAdmin = clientPool.GetClusterAdmin } logger := logging.FromContext(ctx) diff --git a/control-plane/pkg/reconciler/broker/namespaced_controller.go b/control-plane/pkg/reconciler/broker/namespaced_controller.go index 4441ee475a..214a3e2509 100644 --- a/control-plane/pkg/reconciler/broker/namespaced_controller.go +++ b/control-plane/pkg/reconciler/broker/namespaced_controller.go @@ -88,8 +88,6 @@ func NewNamespacedController(ctx context.Context, watcher configmap.Watcher, env logger.Fatal("unable to create Manifestival client-go client", zap.Error(err)) } - clientPool := clientpool.Get(ctx) - reconciler := &NamespacedReconciler{ Reconciler: &base.Reconciler{ KubeClient: kubeclient.Get(ctx), @@ -103,7 +101,6 @@ func NewNamespacedController(ctx context.Context, watcher configmap.Watcher, env DispatcherLabel: base.BrokerDispatcherLabel, ReceiverLabel: base.BrokerReceiverLabel, }, - GetKafkaClusterAdmin: clientPool.GetClusterAdmin, NamespaceLister: namespaceinformer.Get(ctx).Lister(), ConfigMapLister: configmapInformer.Lister(), ServiceAccountLister: serviceaccountinformer.Get(ctx).Lister(), @@ -119,6 +116,13 @@ func NewNamespacedController(ctx context.Context, watcher configmap.Watcher, env KafkaFeatureFlags: apisconfig.DefaultFeaturesConfig(), } + clientPool := clientpool.Get(ctx) + if clientPool == nil { + reconciler.GetKafkaClusterAdmin = clientpool.DisabledGetKafkaClusterAdminFunc + } else { + reconciler.GetKafkaClusterAdmin = clientPool.GetClusterAdmin + } + impl := brokerreconciler.NewImpl(ctx, reconciler, kafka.NamespacedBrokerClass, func(impl *controller.Impl) controller.Options { return controller.Options{PromoteFilterFunc: kafka.NamespacedBrokerClassFilter()} }) diff --git a/control-plane/pkg/reconciler/channel/controller.go b/control-plane/pkg/reconciler/channel/controller.go index f11c29d477..6659d46f0a 100644 --- a/control-plane/pkg/reconciler/channel/controller.go +++ b/control-plane/pkg/reconciler/channel/controller.go @@ -67,8 +67,6 @@ func NewController(ctx context.Context, watcher configmap.Watcher, configs *conf messagingv1beta.RegisterAlternateKafkaChannelConditionSet(conditionSet) - clientPool := clientpool.Get(ctx) - reconciler := &Reconciler{ Reconciler: &base.Reconciler{ KubeClient: kubeclient.Get(ctx), @@ -80,14 +78,20 @@ func NewController(ctx context.Context, watcher configmap.Watcher, configs *conf DataPlaneNamespace: configs.SystemNamespace, ReceiverLabel: base.ChannelReceiverLabel, }, - GetKafkaClusterAdmin: clientPool.GetClusterAdmin, - Env: configs, - ConfigMapLister: configmapInformer.Lister(), - ServiceLister: serviceinformer.Get(ctx).Lister(), - SubscriptionLister: subscriptioninformer.Get(ctx).Lister(), - ConsumerGroupLister: consumerGroupInformer.Lister(), - InternalsClient: consumergroupclient.Get(ctx), - KafkaFeatureFlags: apisconfig.DefaultFeaturesConfig(), + ConsumerGroupLister: consumerGroupInformer.Lister(), + InternalsClient: consumergroupclient.Get(ctx), + Env: configs, + ConfigMapLister: configmapInformer.Lister(), + ServiceLister: serviceinformer.Get(ctx).Lister(), + SubscriptionLister: subscriptioninformer.Get(ctx).Lister(), + KafkaFeatureFlags: apisconfig.DefaultFeaturesConfig(), + } + + clientPool := clientpool.Get(ctx) + if clientPool == nil { + reconciler.GetKafkaClusterAdmin = clientpool.DisabledGetKafkaClusterAdminFunc + } else { + reconciler.GetKafkaClusterAdmin = clientPool.GetClusterAdmin } logger := logging.FromContext(ctx) diff --git a/control-plane/pkg/reconciler/consumergroup/controller.go b/control-plane/pkg/reconciler/consumergroup/controller.go index ecc96df556..83bf44bc3f 100644 --- a/control-plane/pkg/reconciler/consumergroup/controller.go +++ b/control-plane/pkg/reconciler/consumergroup/controller.go @@ -119,8 +119,6 @@ func NewController(ctx context.Context, watcher configmap.Watcher) *controller.I KafkaChannelScheduler: createKafkaScheduler(ctx, c, kafkainternals.ChannelStatefulSetName), } - clientPool := clientpool.Get(ctx) - dispatcherPodInformer := podinformer.Get(ctx, eventing.DispatcherLabelSelectorStr) r := &Reconciler{ @@ -132,10 +130,8 @@ func NewController(ctx context.Context, watcher configmap.Watcher) *controller.I PodLister: dispatcherPodInformer.Lister(), KubeClient: kubeclient.Get(ctx), NameGenerator: names.SimpleNameGenerator, - GetKafkaClient: clientPool.GetClient, InitOffsetsFunc: offset.InitOffsets, SystemNamespace: system.Namespace(), - GetKafkaClusterAdmin: clientPool.GetClusterAdmin, KafkaFeatureFlags: config.DefaultFeaturesConfig(), KedaClient: kedaclient.Get(ctx), AutoscalerConfig: env.AutoscalerConfigMap, @@ -143,6 +139,15 @@ func NewController(ctx context.Context, watcher configmap.Watcher) *controller.I InitOffsetLatestInitialOffsetCache: prober.NewLocalExpiringCache[string, prober.Status, struct{}](ctx, 20*time.Minute), } + clientPool := clientpool.Get(ctx) + if clientPool == nil { + r.GetKafkaClusterAdmin = clientpool.DisabledGetKafkaClusterAdminFunc + r.GetKafkaClient = clientpool.DisabledGetClient + } else { + r.GetKafkaClusterAdmin = clientPool.GetClusterAdmin + r.GetKafkaClient = clientPool.GetClient + } + consumerInformer := consumer.Get(ctx) consumerGroupInformer := consumergroup.Get(ctx) diff --git a/control-plane/pkg/reconciler/sink/controller.go b/control-plane/pkg/reconciler/sink/controller.go index 43f2d32675..28669faf33 100644 --- a/control-plane/pkg/reconciler/sink/controller.go +++ b/control-plane/pkg/reconciler/sink/controller.go @@ -54,8 +54,6 @@ func NewController(ctx context.Context, watcher configmap.Watcher, configs *conf configmapInformer := configmapinformer.Get(ctx) - clientPool := clientpool.Get(ctx) - reconciler := &Reconciler{ Reconciler: &base.Reconciler{ KubeClient: kubeclient.Get(ctx), @@ -67,9 +65,15 @@ func NewController(ctx context.Context, watcher configmap.Watcher, configs *conf DataPlaneNamespace: configs.SystemNamespace, ReceiverLabel: base.SinkReceiverLabel, }, - ConfigMapLister: configmapInformer.Lister(), - GetKafkaClusterAdmin: clientPool.GetClusterAdmin, - Env: configs, + ConfigMapLister: configmapInformer.Lister(), + Env: configs, + } + + clientPool := clientpool.Get(ctx) + if clientPool == nil { + reconciler.GetKafkaClusterAdmin = clientpool.DisabledGetKafkaClusterAdminFunc + } else { + reconciler.GetKafkaClusterAdmin = clientPool.GetClusterAdmin } _, err := reconciler.GetOrCreateDataPlaneConfigMap(ctx) diff --git a/control-plane/pkg/reconciler/trigger/controller.go b/control-plane/pkg/reconciler/trigger/controller.go index 41138ca061..1d576ab94c 100644 --- a/control-plane/pkg/reconciler/trigger/controller.go +++ b/control-plane/pkg/reconciler/trigger/controller.go @@ -72,8 +72,6 @@ func NewController(ctx context.Context, watcher configmap.Watcher, configs *conf triggerLister := triggerInformer.Lister() oidcServiceaccountInformer := serviceaccountinformer.Get(ctx, auth.OIDCLabelSelector) - clientPool := clientpool.Get(ctx) - reconciler := &Reconciler{ Reconciler: &base.Reconciler{ KubeClient: kubeclient.Get(ctx), @@ -97,12 +95,19 @@ func NewController(ctx context.Context, watcher configmap.Watcher, configs *conf BrokerClass: kafka.BrokerClass, DataPlaneConfigMapLabeler: base.NoopConfigmapOption, KafkaFeatureFlags: apisconfig.DefaultFeaturesConfig(), - GetKafkaClient: clientPool.GetClient, - GetKafkaClusterAdmin: clientPool.GetClusterAdmin, InitOffsetsFunc: offset.InitOffsets, ServiceAccountLister: oidcServiceaccountInformer.Lister(), } + clientPool := clientpool.Get(ctx) + if clientPool == nil { + reconciler.GetKafkaClusterAdmin = clientpool.DisabledGetKafkaClusterAdminFunc + reconciler.GetKafkaClient = clientpool.DisabledGetClient + } else { + reconciler.GetKafkaClusterAdmin = clientPool.GetClusterAdmin + reconciler.GetKafkaClient = clientPool.GetClient + } + impl := triggerreconciler.NewImpl(ctx, reconciler, func(impl *controller.Impl) controller.Options { return controller.Options{ FinalizerName: FinalizerName, diff --git a/control-plane/pkg/reconciler/trigger/namespaced_controller.go b/control-plane/pkg/reconciler/trigger/namespaced_controller.go index e021784959..e1ae23e98a 100644 --- a/control-plane/pkg/reconciler/trigger/namespaced_controller.go +++ b/control-plane/pkg/reconciler/trigger/namespaced_controller.go @@ -64,8 +64,6 @@ func NewNamespacedController(ctx context.Context, watcher configmap.Watcher, con triggerLister := triggerInformer.Lister() oidcServiceaccountInformer := serviceaccountinformer.Get(ctx, auth.OIDCLabelSelector) - clientPool := clientpool.Get(ctx) - reconciler := &NamespacedReconciler{ Reconciler: &base.Reconciler{ KubeClient: kubeclient.Get(ctx), @@ -87,12 +85,19 @@ func NewNamespacedController(ctx context.Context, watcher configmap.Watcher, con ServiceAccountLister: oidcServiceaccountInformer.Lister(), EventingClient: eventingclient.Get(ctx), Env: configs, - GetKafkaClient: clientPool.GetClient, - GetKafkaClusterAdmin: clientPool.GetClusterAdmin, InitOffsetsFunc: offset.InitOffsets, KafkaFeatureFlags: apisconfig.DefaultFeaturesConfig(), } + clientPool := clientpool.Get(ctx) + if clientPool == nil { + reconciler.GetKafkaClusterAdmin = clientpool.DisabledGetKafkaClusterAdminFunc + reconciler.GetKafkaClient = clientpool.DisabledGetClient + } else { + reconciler.GetKafkaClusterAdmin = clientPool.GetClusterAdmin + reconciler.GetKafkaClient = clientPool.GetClient + } + impl := triggerreconciler.NewImpl(ctx, reconciler, func(impl *controller.Impl) controller.Options { return controller.Options{ FinalizerName: NamespacedFinalizerName,