Skip to content

Commit

Permalink
Merge branch 'main' into bump-sarama-1.43.3
Browse files Browse the repository at this point in the history
  • Loading branch information
pierDipi authored Sep 20, 2024
2 parents 3034e24 + 6c4d180 commit 1980050
Show file tree
Hide file tree
Showing 110 changed files with 3,835 additions and 1,641 deletions.
14 changes: 13 additions & 1 deletion control-plane/cmd/kafka-controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@ package main
import (
"context"
"log"
"os"
"strings"

"github.com/IBM/sarama"
filteredFactory "knative.dev/pkg/client/injection/kube/informers/factory/filtered"
"knative.dev/pkg/configmap"
"knative.dev/pkg/controller"
Expand Down Expand Up @@ -70,7 +73,16 @@ func main() {
auth.OIDCLabelSelector,
eventing.DispatcherLabelSelectorStr,
)
ctx = clientpool.WithKafkaClientPool(ctx)

if v := os.Getenv("ENABLE_SARAMA_LOGGER"); strings.EqualFold(v, "true") {
sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags|log.Llongfile)
}
if v := os.Getenv("ENABLE_SARAMA_DEBUG_LOGGER"); strings.EqualFold(v, "true") {
sarama.DebugLogger = log.New(os.Stdout, "[sarama][debug] ", log.LstdFlags|log.Llongfile)
}
if v := os.Getenv("ENABLE_SARAMA_CLIENT_POOL"); v == "" || strings.EqualFold(v, "true") {
ctx = clientpool.WithKafkaClientPool(ctx)
}

sharedmain.MainNamed(ctx, component,

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,12 @@ spec:
valueFrom:
fieldRef:
fieldPath: metadata.name
- name: ENABLE_SARAMA_LOGGER
value: "false"
- name: ENABLE_SARAMA_DEBUG_LOGGER
value: "false"
- name: ENABLE_SARAMA_CLIENT_POOL
value: "true"

ports:
- containerPort: 9090
Expand Down
28 changes: 25 additions & 3 deletions control-plane/pkg/kafka/clientpool/clientpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,11 @@ import (
"go.uber.org/zap"

corev1 "k8s.io/api/core/v1"
"knative.dev/pkg/logging"

"knative.dev/eventing-kafka-broker/control-plane/pkg/kafka"
"knative.dev/eventing-kafka-broker/control-plane/pkg/prober"
"knative.dev/eventing-kafka-broker/control-plane/pkg/security"
"knative.dev/pkg/logging"
)

type KafkaClientKey struct{}
Expand Down Expand Up @@ -63,8 +64,21 @@ type ClientPool struct {
}

type GetKafkaClientFunc func(ctx context.Context, bootstrapServers []string, secret *corev1.Secret) (sarama.Client, error)

type GetKafkaClusterAdminFunc func(ctx context.Context, bootstrapServers []string, secret *corev1.Secret) (sarama.ClusterAdmin, error)

func DisabledGetKafkaClusterAdminFunc(_ context.Context, bootstrapServers []string, secret *corev1.Secret) (sarama.ClusterAdmin, error) {
c, err := makeSaramaClient(bootstrapServers, secret, sarama.NewClient)
if err != nil {
return nil, err
}
return sarama.NewClusterAdminFromClient(c)
}

func DisabledGetClient(_ context.Context, bootstrapServers []string, secret *corev1.Secret) (sarama.Client, error) {
return makeSaramaClient(bootstrapServers, secret, sarama.NewClient)
}

func (cp *ClientPool) GetClient(ctx context.Context, bootstrapServers []string, secret *corev1.Secret) (sarama.Client, error) {
client, err := cp.getClient(ctx, bootstrapServers, secret)
if err != nil {
Expand Down Expand Up @@ -141,7 +155,11 @@ func (cp *ClientPool) GetClusterAdmin(ctx context.Context, bootstrapServers []st
}

func Get(ctx context.Context) *ClientPool {
return ctx.Value(ctxKey).(*ClientPool)
v := ctx.Value(ctxKey)
if v == nil {
return nil
}
return v.(*ClientPool)
}

func makeClusterAdminKey(bootstrapServers []string, secret *corev1.Secret) clientKey {
Expand All @@ -162,6 +180,10 @@ func makeClusterAdminKey(bootstrapServers []string, secret *corev1.Secret) clien
}

func (cp *ClientPool) makeSaramaClient(bootstrapServers []string, secret *corev1.Secret) (sarama.Client, error) {
return makeSaramaClient(bootstrapServers, secret, cp.newSaramaClient)
}

func makeSaramaClient(bootstrapServers []string, secret *corev1.Secret, newSaramaClient kafka.NewClientFunc) (sarama.Client, error) {
secretOpt, err := security.NewSaramaSecurityOptionFromSecret(secret)
if err != nil {
return nil, err
Expand All @@ -172,7 +194,7 @@ func (cp *ClientPool) makeSaramaClient(bootstrapServers []string, secret *corev1
return nil, err
}

saramaClient, err := cp.newSaramaClient(bootstrapServers, config)
saramaClient, err := newSaramaClient(bootstrapServers, config)
if err != nil {
return nil, err
}
Expand Down
20 changes: 12 additions & 8 deletions control-plane/pkg/reconciler/broker/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,6 @@ func NewController(ctx context.Context, watcher configmap.Watcher, env *config.E
eventPolicyInformer := eventpolicyinformer.Get(ctx)
featureFlags := apisconfig.DefaultFeaturesConfig()

clientPool := clientpool.Get(ctx)

reconciler := &Reconciler{
Reconciler: &base.Reconciler{
KubeClient: kubeclient.Get(ctx),
Expand All @@ -83,12 +81,18 @@ func NewController(ctx context.Context, watcher configmap.Watcher, env *config.E
DispatcherLabel: base.BrokerDispatcherLabel,
ReceiverLabel: base.BrokerReceiverLabel,
},
GetKafkaClusterAdmin: clientPool.GetClusterAdmin,
ConfigMapLister: configmapInformer.Lister(),
EventPolicyLister: eventPolicyInformer.Lister(),
Env: env,
Counter: counter.NewExpiringCounter(ctx),
KafkaFeatureFlags: featureFlags,
ConfigMapLister: configmapInformer.Lister(),
EventPolicyLister: eventPolicyInformer.Lister(),
Env: env,
Counter: counter.NewExpiringCounter(ctx),
KafkaFeatureFlags: featureFlags,
}

clientPool := clientpool.Get(ctx)
if clientPool == nil {
reconciler.GetKafkaClusterAdmin = clientpool.DisabledGetKafkaClusterAdminFunc
} else {
reconciler.GetKafkaClusterAdmin = clientPool.GetClusterAdmin
}

logger := logging.FromContext(ctx)
Expand Down
10 changes: 7 additions & 3 deletions control-plane/pkg/reconciler/broker/namespaced_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,6 @@ func NewNamespacedController(ctx context.Context, watcher configmap.Watcher, env
logger.Fatal("unable to create Manifestival client-go client", zap.Error(err))
}

clientPool := clientpool.Get(ctx)

reconciler := &NamespacedReconciler{
Reconciler: &base.Reconciler{
KubeClient: kubeclient.Get(ctx),
Expand All @@ -107,7 +105,6 @@ func NewNamespacedController(ctx context.Context, watcher configmap.Watcher, env
DispatcherLabel: base.BrokerDispatcherLabel,
ReceiverLabel: base.BrokerReceiverLabel,
},
GetKafkaClusterAdmin: clientPool.GetClusterAdmin,
NamespaceLister: namespaceinformer.Get(ctx).Lister(),
ConfigMapLister: configmapInformer.Lister(),
ServiceAccountLister: serviceaccountinformer.Get(ctx).Lister(),
Expand All @@ -124,6 +121,13 @@ func NewNamespacedController(ctx context.Context, watcher configmap.Watcher, env
KafkaFeatureFlags: apisconfig.DefaultFeaturesConfig(),
}

clientPool := clientpool.Get(ctx)
if clientPool == nil {
reconciler.GetKafkaClusterAdmin = clientpool.DisabledGetKafkaClusterAdminFunc
} else {
reconciler.GetKafkaClusterAdmin = clientPool.GetClusterAdmin
}

impl := brokerreconciler.NewImpl(ctx, reconciler, kafka.NamespacedBrokerClass, func(impl *controller.Impl) controller.Options {
return controller.Options{PromoteFilterFunc: kafka.NamespacedBrokerClassFilter()}
})
Expand Down
29 changes: 17 additions & 12 deletions control-plane/pkg/reconciler/channel/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,12 @@ import (

"knative.dev/pkg/controller"

"knative.dev/eventing/pkg/auth"

apisconfig "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/config"
"knative.dev/eventing-kafka-broker/control-plane/pkg/config"
"knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/base"
"knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/consumergroup"
"knative.dev/eventing/pkg/auth"
)

func NewController(ctx context.Context, watcher configmap.Watcher, configs *config.Env) *controller.Impl {
Expand All @@ -71,8 +72,6 @@ func NewController(ctx context.Context, watcher configmap.Watcher, configs *conf

messagingv1beta.RegisterAlternateKafkaChannelConditionSet(conditionSet)

clientPool := clientpool.Get(ctx)

reconciler := &Reconciler{
Reconciler: &base.Reconciler{
KubeClient: kubeclient.Get(ctx),
Expand All @@ -84,15 +83,21 @@ func NewController(ctx context.Context, watcher configmap.Watcher, configs *conf
DataPlaneNamespace: configs.SystemNamespace,
ReceiverLabel: base.ChannelReceiverLabel,
},
GetKafkaClusterAdmin: clientPool.GetClusterAdmin,
Env: configs,
ConfigMapLister: configmapInformer.Lister(),
ServiceLister: serviceinformer.Get(ctx).Lister(),
SubscriptionLister: subscriptioninformer.Get(ctx).Lister(),
ConsumerGroupLister: consumerGroupInformer.Lister(),
EventPolicyLister: eventPolicyInformer.Lister(),
InternalsClient: consumergroupclient.Get(ctx),
KafkaFeatureFlags: apisconfig.DefaultFeaturesConfig(),
Env: configs,
ConfigMapLister: configmapInformer.Lister(),
ServiceLister: serviceinformer.Get(ctx).Lister(),
SubscriptionLister: subscriptioninformer.Get(ctx).Lister(),
ConsumerGroupLister: consumerGroupInformer.Lister(),
EventPolicyLister: eventPolicyInformer.Lister(),
InternalsClient: consumergroupclient.Get(ctx),
KafkaFeatureFlags: apisconfig.DefaultFeaturesConfig(),
}

clientPool := clientpool.Get(ctx)
if clientPool == nil {
reconciler.GetKafkaClusterAdmin = clientpool.DisabledGetKafkaClusterAdminFunc
} else {
reconciler.GetKafkaClusterAdmin = clientPool.GetClusterAdmin
}

logger := logging.FromContext(ctx)
Expand Down
5 changes: 4 additions & 1 deletion control-plane/pkg/reconciler/consumergroup/consumergroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -766,7 +766,10 @@ func (r *Reconciler) reconcileSecret(ctx context.Context, expectedSecret *corev1
}

func (r *Reconciler) ensureContractConfigmapsExist(ctx context.Context, scheduler Scheduler) error {
selector := labels.SelectorFromSet(map[string]string{"app": scheduler.StatefulSetName})
selector := labels.SelectorFromSet(map[string]string{
"app": scheduler.StatefulSetName,
"app.kubernetes.io/kind": "kafka-dispatcher",
})
pods, err := r.PodLister.
Pods(r.SystemNamespace).
List(selector)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,8 +173,8 @@ func TestReconcileKind(t *testing.T) {
Name: "Consumers in multiple pods, with pods pending and unknown phase",
Objects: []runtime.Object{
NewService(),
NewDispatcherPod("p1", PodLabel(kafkainternals.SourceStatefulSetName), PodPending()),
NewDispatcherPod("p2", PodLabel(kafkainternals.SourceStatefulSetName)),
NewDispatcherPod("p1", PodLabel("app", kafkainternals.SourceStatefulSetName), DispatcherLabel(), PodPending()),
NewDispatcherPod("p2", PodLabel("app", kafkainternals.SourceStatefulSetName), DispatcherLabel()),
NewConsumerGroup(
ConsumerGroupConsumerSpec(NewConsumerSpec(
ConsumerTopics("t1", "t2"),
Expand Down Expand Up @@ -1723,6 +1723,10 @@ func TestReconcileKind(t *testing.T) {

}

func DispatcherLabel() PodOption {
return PodLabel("app.kubernetes.io/kind", "kafka-dispatcher")
}

func TestReconcileKindNoAutoscaler(t *testing.T) {

tt := TableTest{
Expand Down
13 changes: 9 additions & 4 deletions control-plane/pkg/reconciler/consumergroup/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,6 @@ func NewController(ctx context.Context, watcher configmap.Watcher) *controller.I
KafkaChannelScheduler: createKafkaScheduler(ctx, c, kafkainternals.ChannelStatefulSetName, dispatcherPodInformer),
}

clientPool := clientpool.Get(ctx)

r := &Reconciler{
SchedulerFunc: func(s string) (Scheduler, bool) { sched, ok := schedulers[strings.ToLower(s)]; return sched, ok },
ConsumerLister: consumer.Get(ctx).Lister(),
Expand All @@ -134,17 +132,24 @@ func NewController(ctx context.Context, watcher configmap.Watcher) *controller.I
PodLister: dispatcherPodInformer.Lister(),
KubeClient: kubeclient.Get(ctx),
NameGenerator: names.SimpleNameGenerator,
GetKafkaClient: clientPool.GetClient,
InitOffsetsFunc: offset.InitOffsets,
SystemNamespace: system.Namespace(),
GetKafkaClusterAdmin: clientPool.GetClusterAdmin,
KafkaFeatureFlags: config.DefaultFeaturesConfig(),
KedaClient: kedaclient.Get(ctx),
AutoscalerConfig: env.AutoscalerConfigMap,
DeleteConsumerGroupMetadataCounter: counter.NewExpiringCounter(ctx),
InitOffsetLatestInitialOffsetCache: prober.NewLocalExpiringCache[string, prober.Status, struct{}](ctx, 20*time.Minute),
}

clientPool := clientpool.Get(ctx)
if clientPool == nil {
r.GetKafkaClusterAdmin = clientpool.DisabledGetKafkaClusterAdminFunc
r.GetKafkaClient = clientpool.DisabledGetClient
} else {
r.GetKafkaClusterAdmin = clientPool.GetClusterAdmin
r.GetKafkaClient = clientPool.GetClient
}

consumerInformer := consumer.Get(ctx)

consumerGroupInformer := consumergroup.Get(ctx)
Expand Down
16 changes: 10 additions & 6 deletions control-plane/pkg/reconciler/sink/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,6 @@ func NewController(ctx context.Context, watcher configmap.Watcher, configs *conf
configmapInformer := configmapinformer.Get(ctx)
eventPolicyInformer := eventpolicyinformer.Get(ctx)

clientPool := clientpool.Get(ctx)

reconciler := &Reconciler{
Reconciler: &base.Reconciler{
KubeClient: kubeclient.Get(ctx),
Expand All @@ -72,10 +70,16 @@ func NewController(ctx context.Context, watcher configmap.Watcher, configs *conf
DataPlaneNamespace: configs.SystemNamespace,
ReceiverLabel: base.SinkReceiverLabel,
},
ConfigMapLister: configmapInformer.Lister(),
EventPolicyLister: eventPolicyInformer.Lister(),
GetKafkaClusterAdmin: clientPool.GetClusterAdmin,
Env: configs,
ConfigMapLister: configmapInformer.Lister(),
EventPolicyLister: eventPolicyInformer.Lister(),
Env: configs,
}

clientPool := clientpool.Get(ctx)
if clientPool == nil {
reconciler.GetKafkaClusterAdmin = clientpool.DisabledGetKafkaClusterAdminFunc
} else {
reconciler.GetKafkaClusterAdmin = clientPool.GetClusterAdmin
}

_, err := reconciler.GetOrCreateDataPlaneConfigMap(ctx)
Expand Down
3 changes: 2 additions & 1 deletion control-plane/pkg/reconciler/testing/objects_broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,8 @@ func BrokerDispatcherPod(namespace string, annotations map[string]string) runtim
Namespace: namespace,
Annotations: annotations,
Labels: map[string]string{
"app": base.BrokerDispatcherLabel,
"app": base.BrokerDispatcherLabel,
"app.kubernetes.io/kind": "kafka-dispatcher",
},
},
Status: corev1.PodStatus{
Expand Down
3 changes: 2 additions & 1 deletion control-plane/pkg/reconciler/testing/objects_channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,8 @@ func ChannelDispatcherPod(namespace string, annotations map[string]string) runti
Namespace: namespace,
Annotations: annotations,
Labels: map[string]string{
"app": base.ChannelDispatcherLabel,
"app": base.ChannelDispatcherLabel,
"app.kubernetes.io/kind": "kafka-dispatcher",
},
},
Status: corev1.PodStatus{
Expand Down
4 changes: 2 additions & 2 deletions control-plane/pkg/reconciler/testing/objects_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -616,12 +616,12 @@ func NewDispatcherPod(name string, options ...PodOption) *corev1.Pod {
return p
}

func PodLabel(value string) PodOption {
func PodLabel(key, value string) PodOption {
return func(pod *corev1.Pod) {
if pod.Labels == nil {
pod.Labels = make(map[string]string, 2)
}
pod.Labels["app"] = value
pod.Labels[key] = value
}
}

Expand Down
8 changes: 5 additions & 3 deletions control-plane/pkg/reconciler/testing/objects_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,15 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/utils/pointer"
"knative.dev/eventing-kafka-broker/control-plane/pkg/apis/bindings/v1beta1"
sources "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/sources/v1beta1"
eventingduck "knative.dev/eventing/pkg/apis/duck/v1"
"knative.dev/eventing/pkg/apis/feature"
"knative.dev/eventing/pkg/eventingtls/eventingtlstesting"
"knative.dev/pkg/apis"
duckv1 "knative.dev/pkg/apis/duck/v1"

"knative.dev/eventing-kafka-broker/control-plane/pkg/apis/bindings/v1beta1"
sources "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/sources/v1beta1"

"knative.dev/eventing-kafka-broker/control-plane/pkg/contract"
"knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/base"
)
Expand Down Expand Up @@ -215,7 +216,8 @@ func SourceDispatcherPod(namespace string, annotations map[string]string) runtim
Namespace: namespace,
Annotations: annotations,
Labels: map[string]string{
"app": base.SourceDispatcherLabel,
"app": base.SourceDispatcherLabel,
"app.kubernetes.io/kind": "kafka-dispatcher",
},
},
Status: corev1.PodStatus{
Expand Down
Loading

0 comments on commit 1980050

Please sign in to comment.