Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[release-1.15] Allow enabling sarama logging and disabling client pool (#4103) #4108

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion control-plane/cmd/kafka-controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@
import (
"context"
"log"
"os"
"strings"

"github.com/IBM/sarama"
filteredFactory "knative.dev/pkg/client/injection/kube/informers/factory/filtered"
"knative.dev/pkg/configmap"
"knative.dev/pkg/controller"
Expand Down Expand Up @@ -70,7 +73,16 @@
auth.OIDCLabelSelector,
eventing.DispatcherLabelSelectorStr,
)
ctx = clientpool.WithKafkaClientPool(ctx)

if v := os.Getenv("ENABLE_SARAMA_LOGGER"); strings.EqualFold(v, "true") {
sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags|log.Llongfile)

Check warning on line 78 in control-plane/cmd/kafka-controller/main.go

View check run for this annotation

Codecov / codecov/patch

control-plane/cmd/kafka-controller/main.go#L77-L78

Added lines #L77 - L78 were not covered by tests
}
if v := os.Getenv("ENABLE_SARAMA_DEBUG_LOGGER"); strings.EqualFold(v, "true") {
sarama.DebugLogger = log.New(os.Stdout, "[sarama][debug] ", log.LstdFlags|log.Llongfile)

Check warning on line 81 in control-plane/cmd/kafka-controller/main.go

View check run for this annotation

Codecov / codecov/patch

control-plane/cmd/kafka-controller/main.go#L80-L81

Added lines #L80 - L81 were not covered by tests
}
if v := os.Getenv("ENABLE_SARAMA_CLIENT_POOL"); v == "" || strings.EqualFold(v, "true") {
ctx = clientpool.WithKafkaClientPool(ctx)

Check warning on line 84 in control-plane/cmd/kafka-controller/main.go

View check run for this annotation

Codecov / codecov/patch

control-plane/cmd/kafka-controller/main.go#L83-L84

Added lines #L83 - L84 were not covered by tests
}

sharedmain.MainNamed(ctx, component,

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,12 @@ spec:
valueFrom:
fieldRef:
fieldPath: metadata.name
- name: ENABLE_SARAMA_LOGGER
value: "false"
- name: ENABLE_SARAMA_DEBUG_LOGGER
value: "false"
- name: ENABLE_SARAMA_CLIENT_POOL
value: "true"

ports:
- containerPort: 9090
Expand Down
28 changes: 25 additions & 3 deletions control-plane/pkg/kafka/clientpool/clientpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,11 @@
"go.uber.org/zap"

corev1 "k8s.io/api/core/v1"
"knative.dev/pkg/logging"

"knative.dev/eventing-kafka-broker/control-plane/pkg/kafka"
"knative.dev/eventing-kafka-broker/control-plane/pkg/prober"
"knative.dev/eventing-kafka-broker/control-plane/pkg/security"
"knative.dev/pkg/logging"
)

type KafkaClientKey struct{}
Expand Down Expand Up @@ -63,8 +64,21 @@
}

type GetKafkaClientFunc func(ctx context.Context, bootstrapServers []string, secret *corev1.Secret) (sarama.Client, error)

type GetKafkaClusterAdminFunc func(ctx context.Context, bootstrapServers []string, secret *corev1.Secret) (sarama.ClusterAdmin, error)

func DisabledGetKafkaClusterAdminFunc(_ context.Context, bootstrapServers []string, secret *corev1.Secret) (sarama.ClusterAdmin, error) {
c, err := makeSaramaClient(bootstrapServers, secret, sarama.NewClient)
if err != nil {
return nil, err

Check warning on line 73 in control-plane/pkg/kafka/clientpool/clientpool.go

View check run for this annotation

Codecov / codecov/patch

control-plane/pkg/kafka/clientpool/clientpool.go#L70-L73

Added lines #L70 - L73 were not covered by tests
}
return sarama.NewClusterAdminFromClient(c)

Check warning on line 75 in control-plane/pkg/kafka/clientpool/clientpool.go

View check run for this annotation

Codecov / codecov/patch

control-plane/pkg/kafka/clientpool/clientpool.go#L75

Added line #L75 was not covered by tests
}

func DisabledGetClient(_ context.Context, bootstrapServers []string, secret *corev1.Secret) (sarama.Client, error) {
return makeSaramaClient(bootstrapServers, secret, sarama.NewClient)

Check warning on line 79 in control-plane/pkg/kafka/clientpool/clientpool.go

View check run for this annotation

Codecov / codecov/patch

control-plane/pkg/kafka/clientpool/clientpool.go#L78-L79

Added lines #L78 - L79 were not covered by tests
}

func (cp *ClientPool) GetClient(ctx context.Context, bootstrapServers []string, secret *corev1.Secret) (sarama.Client, error) {
client, err := cp.getClient(ctx, bootstrapServers, secret)
if err != nil {
Expand Down Expand Up @@ -141,7 +155,11 @@
}

func Get(ctx context.Context) *ClientPool {
return ctx.Value(ctxKey).(*ClientPool)
v := ctx.Value(ctxKey)
if v == nil {
return nil

Check warning on line 160 in control-plane/pkg/kafka/clientpool/clientpool.go

View check run for this annotation

Codecov / codecov/patch

control-plane/pkg/kafka/clientpool/clientpool.go#L158-L160

Added lines #L158 - L160 were not covered by tests
}
return v.(*ClientPool)

Check warning on line 162 in control-plane/pkg/kafka/clientpool/clientpool.go

View check run for this annotation

Codecov / codecov/patch

control-plane/pkg/kafka/clientpool/clientpool.go#L162

Added line #L162 was not covered by tests
}

func makeClusterAdminKey(bootstrapServers []string, secret *corev1.Secret) clientKey {
Expand All @@ -162,6 +180,10 @@
}

func (cp *ClientPool) makeSaramaClient(bootstrapServers []string, secret *corev1.Secret) (sarama.Client, error) {
return makeSaramaClient(bootstrapServers, secret, cp.newSaramaClient)
}

func makeSaramaClient(bootstrapServers []string, secret *corev1.Secret, newSaramaClient kafka.NewClientFunc) (sarama.Client, error) {
secretOpt, err := security.NewSaramaSecurityOptionFromSecret(secret)
if err != nil {
return nil, err
Expand All @@ -172,7 +194,7 @@
return nil, err
}

saramaClient, err := cp.newSaramaClient(bootstrapServers, config)
saramaClient, err := newSaramaClient(bootstrapServers, config)
if err != nil {
return nil, err
}
Expand Down
18 changes: 11 additions & 7 deletions control-plane/pkg/reconciler/broker/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,6 @@
configmapInformer := configmapinformer.Get(ctx)
featureFlags := apisconfig.DefaultFeaturesConfig()

clientPool := clientpool.Get(ctx)

reconciler := &Reconciler{
Reconciler: &base.Reconciler{
KubeClient: kubeclient.Get(ctx),
Expand All @@ -79,11 +77,17 @@
DispatcherLabel: base.BrokerDispatcherLabel,
ReceiverLabel: base.BrokerReceiverLabel,
},
GetKafkaClusterAdmin: clientPool.GetClusterAdmin,
ConfigMapLister: configmapInformer.Lister(),
Env: env,
Counter: counter.NewExpiringCounter(ctx),
KafkaFeatureFlags: featureFlags,
ConfigMapLister: configmapInformer.Lister(),
Env: env,
Counter: counter.NewExpiringCounter(ctx),
KafkaFeatureFlags: featureFlags,
}

clientPool := clientpool.Get(ctx)
if clientPool == nil {
reconciler.GetKafkaClusterAdmin = clientpool.DisabledGetKafkaClusterAdminFunc

Check warning on line 88 in control-plane/pkg/reconciler/broker/controller.go

View check run for this annotation

Codecov / codecov/patch

control-plane/pkg/reconciler/broker/controller.go#L88

Added line #L88 was not covered by tests
} else {
reconciler.GetKafkaClusterAdmin = clientPool.GetClusterAdmin
}

logger := logging.FromContext(ctx)
Expand Down
10 changes: 7 additions & 3 deletions control-plane/pkg/reconciler/broker/namespaced_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,6 @@
logger.Fatal("unable to create Manifestival client-go client", zap.Error(err))
}

clientPool := clientpool.Get(ctx)

reconciler := &NamespacedReconciler{
Reconciler: &base.Reconciler{
KubeClient: kubeclient.Get(ctx),
Expand All @@ -103,7 +101,6 @@
DispatcherLabel: base.BrokerDispatcherLabel,
ReceiverLabel: base.BrokerReceiverLabel,
},
GetKafkaClusterAdmin: clientPool.GetClusterAdmin,
NamespaceLister: namespaceinformer.Get(ctx).Lister(),
ConfigMapLister: configmapInformer.Lister(),
ServiceAccountLister: serviceaccountinformer.Get(ctx).Lister(),
Expand All @@ -119,6 +116,13 @@
KafkaFeatureFlags: apisconfig.DefaultFeaturesConfig(),
}

clientPool := clientpool.Get(ctx)
if clientPool == nil {
reconciler.GetKafkaClusterAdmin = clientpool.DisabledGetKafkaClusterAdminFunc
} else {
reconciler.GetKafkaClusterAdmin = clientPool.GetClusterAdmin

Check warning on line 123 in control-plane/pkg/reconciler/broker/namespaced_controller.go

View check run for this annotation

Codecov / codecov/patch

control-plane/pkg/reconciler/broker/namespaced_controller.go#L119-L123

Added lines #L119 - L123 were not covered by tests
}

impl := brokerreconciler.NewImpl(ctx, reconciler, kafka.NamespacedBrokerClass, func(impl *controller.Impl) controller.Options {
return controller.Options{PromoteFilterFunc: kafka.NamespacedBrokerClassFilter()}
})
Expand Down
27 changes: 16 additions & 11 deletions control-plane/pkg/reconciler/channel/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,11 @@
podinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/pod"
secretinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/secret"

"knative.dev/eventing/pkg/apis/feature"

consumergroupclient "knative.dev/eventing-kafka-broker/control-plane/pkg/client/internals/kafka/injection/client"
consumergroupinformer "knative.dev/eventing-kafka-broker/control-plane/pkg/client/internals/kafka/injection/informers/eventing/v1alpha1/consumergroup"
"knative.dev/eventing-kafka-broker/control-plane/pkg/prober"
"knative.dev/eventing/pkg/apis/feature"

"knative.dev/pkg/controller"

Expand All @@ -66,8 +67,6 @@

messagingv1beta.RegisterAlternateKafkaChannelConditionSet(conditionSet)

clientPool := clientpool.Get(ctx)

reconciler := &Reconciler{
Reconciler: &base.Reconciler{
KubeClient: kubeclient.Get(ctx),
Expand All @@ -79,14 +78,20 @@
DataPlaneNamespace: configs.SystemNamespace,
ReceiverLabel: base.ChannelReceiverLabel,
},
GetKafkaClusterAdmin: clientPool.GetClusterAdmin,
Env: configs,
ConfigMapLister: configmapInformer.Lister(),
ServiceLister: serviceinformer.Get(ctx).Lister(),
SubscriptionLister: subscriptioninformer.Get(ctx).Lister(),
ConsumerGroupLister: consumerGroupInformer.Lister(),
InternalsClient: consumergroupclient.Get(ctx),
KafkaFeatureFlags: apisconfig.DefaultFeaturesConfig(),
ConsumerGroupLister: consumerGroupInformer.Lister(),
InternalsClient: consumergroupclient.Get(ctx),
Env: configs,
ConfigMapLister: configmapInformer.Lister(),
ServiceLister: serviceinformer.Get(ctx).Lister(),
SubscriptionLister: subscriptioninformer.Get(ctx).Lister(),
KafkaFeatureFlags: apisconfig.DefaultFeaturesConfig(),
}

clientPool := clientpool.Get(ctx)
if clientPool == nil {
reconciler.GetKafkaClusterAdmin = clientpool.DisabledGetKafkaClusterAdminFunc

Check warning on line 92 in control-plane/pkg/reconciler/channel/controller.go

View check run for this annotation

Codecov / codecov/patch

control-plane/pkg/reconciler/channel/controller.go#L92

Added line #L92 was not covered by tests
} else {
reconciler.GetKafkaClusterAdmin = clientPool.GetClusterAdmin
}

logger := logging.FromContext(ctx)
Expand Down
13 changes: 9 additions & 4 deletions control-plane/pkg/reconciler/consumergroup/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,6 @@
KafkaChannelScheduler: createKafkaScheduler(ctx, c, kafkainternals.ChannelStatefulSetName),
}

clientPool := clientpool.Get(ctx)

dispatcherPodInformer := podinformer.Get(ctx, eventing.DispatcherLabelSelectorStr)

r := &Reconciler{
Expand All @@ -132,17 +130,24 @@
PodLister: dispatcherPodInformer.Lister(),
KubeClient: kubeclient.Get(ctx),
NameGenerator: names.SimpleNameGenerator,
GetKafkaClient: clientPool.GetClient,
InitOffsetsFunc: offset.InitOffsets,
SystemNamespace: system.Namespace(),
GetKafkaClusterAdmin: clientPool.GetClusterAdmin,
KafkaFeatureFlags: config.DefaultFeaturesConfig(),
KedaClient: kedaclient.Get(ctx),
AutoscalerConfig: env.AutoscalerConfigMap,
DeleteConsumerGroupMetadataCounter: counter.NewExpiringCounter(ctx),
InitOffsetLatestInitialOffsetCache: prober.NewLocalExpiringCache[string, prober.Status, struct{}](ctx, 20*time.Minute),
}

clientPool := clientpool.Get(ctx)
if clientPool == nil {
r.GetKafkaClusterAdmin = clientpool.DisabledGetKafkaClusterAdminFunc
r.GetKafkaClient = clientpool.DisabledGetClient

Check warning on line 145 in control-plane/pkg/reconciler/consumergroup/controller.go

View check run for this annotation

Codecov / codecov/patch

control-plane/pkg/reconciler/consumergroup/controller.go#L144-L145

Added lines #L144 - L145 were not covered by tests
} else {
r.GetKafkaClusterAdmin = clientPool.GetClusterAdmin
r.GetKafkaClient = clientPool.GetClient
}

consumerInformer := consumer.Get(ctx)

consumerGroupInformer := consumergroup.Get(ctx)
Expand Down
14 changes: 9 additions & 5 deletions control-plane/pkg/reconciler/sink/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,6 @@

configmapInformer := configmapinformer.Get(ctx)

clientPool := clientpool.Get(ctx)

reconciler := &Reconciler{
Reconciler: &base.Reconciler{
KubeClient: kubeclient.Get(ctx),
Expand All @@ -67,9 +65,15 @@
DataPlaneNamespace: configs.SystemNamespace,
ReceiverLabel: base.SinkReceiverLabel,
},
ConfigMapLister: configmapInformer.Lister(),
GetKafkaClusterAdmin: clientPool.GetClusterAdmin,
Env: configs,
ConfigMapLister: configmapInformer.Lister(),
Env: configs,
}

clientPool := clientpool.Get(ctx)
if clientPool == nil {
reconciler.GetKafkaClusterAdmin = clientpool.DisabledGetKafkaClusterAdminFunc

Check warning on line 74 in control-plane/pkg/reconciler/sink/controller.go

View check run for this annotation

Codecov / codecov/patch

control-plane/pkg/reconciler/sink/controller.go#L74

Added line #L74 was not covered by tests
} else {
reconciler.GetKafkaClusterAdmin = clientPool.GetClusterAdmin
}

_, err := reconciler.GetOrCreateDataPlaneConfigMap(ctx)
Expand Down
13 changes: 9 additions & 4 deletions control-plane/pkg/reconciler/trigger/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,6 @@
triggerLister := triggerInformer.Lister()
oidcServiceaccountInformer := serviceaccountinformer.Get(ctx, auth.OIDCLabelSelector)

clientPool := clientpool.Get(ctx)

reconciler := &Reconciler{
Reconciler: &base.Reconciler{
KubeClient: kubeclient.Get(ctx),
Expand All @@ -97,12 +95,19 @@
BrokerClass: kafka.BrokerClass,
DataPlaneConfigMapLabeler: base.NoopConfigmapOption,
KafkaFeatureFlags: apisconfig.DefaultFeaturesConfig(),
GetKafkaClient: clientPool.GetClient,
GetKafkaClusterAdmin: clientPool.GetClusterAdmin,
InitOffsetsFunc: offset.InitOffsets,
ServiceAccountLister: oidcServiceaccountInformer.Lister(),
}

clientPool := clientpool.Get(ctx)
if clientPool == nil {
reconciler.GetKafkaClusterAdmin = clientpool.DisabledGetKafkaClusterAdminFunc
reconciler.GetKafkaClient = clientpool.DisabledGetClient

Check warning on line 105 in control-plane/pkg/reconciler/trigger/controller.go

View check run for this annotation

Codecov / codecov/patch

control-plane/pkg/reconciler/trigger/controller.go#L104-L105

Added lines #L104 - L105 were not covered by tests
} else {
reconciler.GetKafkaClusterAdmin = clientPool.GetClusterAdmin
reconciler.GetKafkaClient = clientPool.GetClient
}

impl := triggerreconciler.NewImpl(ctx, reconciler, func(impl *controller.Impl) controller.Options {
return controller.Options{
FinalizerName: FinalizerName,
Expand Down
13 changes: 9 additions & 4 deletions control-plane/pkg/reconciler/trigger/namespaced_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,6 @@
triggerLister := triggerInformer.Lister()
oidcServiceaccountInformer := serviceaccountinformer.Get(ctx, auth.OIDCLabelSelector)

clientPool := clientpool.Get(ctx)

reconciler := &NamespacedReconciler{
Reconciler: &base.Reconciler{
KubeClient: kubeclient.Get(ctx),
Expand All @@ -87,12 +85,19 @@
ServiceAccountLister: oidcServiceaccountInformer.Lister(),
EventingClient: eventingclient.Get(ctx),
Env: configs,
GetKafkaClient: clientPool.GetClient,
GetKafkaClusterAdmin: clientPool.GetClusterAdmin,
InitOffsetsFunc: offset.InitOffsets,
KafkaFeatureFlags: apisconfig.DefaultFeaturesConfig(),
}

clientPool := clientpool.Get(ctx)
if clientPool == nil {
reconciler.GetKafkaClusterAdmin = clientpool.DisabledGetKafkaClusterAdminFunc
reconciler.GetKafkaClient = clientpool.DisabledGetClient

Check warning on line 95 in control-plane/pkg/reconciler/trigger/namespaced_controller.go

View check run for this annotation

Codecov / codecov/patch

control-plane/pkg/reconciler/trigger/namespaced_controller.go#L94-L95

Added lines #L94 - L95 were not covered by tests
} else {
reconciler.GetKafkaClusterAdmin = clientPool.GetClusterAdmin
reconciler.GetKafkaClient = clientPool.GetClient
}

impl := triggerreconciler.NewImpl(ctx, reconciler, func(impl *controller.Impl) controller.Options {
return controller.Options{
FinalizerName: NamespacedFinalizerName,
Expand Down
Loading