Skip to content

Commit

Permalink
Pass PodLister as expected
Browse files Browse the repository at this point in the history
Signed-off-by: Pierangelo Di Pilato <pierdipi@redhat.com>
  • Loading branch information
pierDipi committed Sep 23, 2024
1 parent 8d28bc5 commit 7634ed8
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -449,7 +449,7 @@ func (r *Reconciler) schedule(ctx context.Context, cg *kafkainternals.ConsumerGr
return cg.MarkScheduleConsumerFailed("Schedule", err)
}

placements, err := statefulSetScheduler.Schedule(cg)
placements, err := statefulSetScheduler.Schedule(ctx, cg)
if err != nil {
return cg.MarkScheduleConsumerFailed("Schedule", err)
}
Expand Down
18 changes: 11 additions & 7 deletions control-plane/pkg/reconciler/consumergroup/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apiserver/pkg/storage/names"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"

"knative.dev/eventing-kafka-broker/control-plane/pkg/apis/internals/kafka/eventing"
Expand Down Expand Up @@ -106,6 +107,9 @@ func NewController(ctx context.Context, watcher configmap.Watcher) *controller.I
logger.Panicf("unable to process required environment variables: %v", err)
}

dispatcherPodInformer := podinformer.Get(ctx, eventing.DispatcherLabelSelectorStr)
dispatcherPodLister := dispatcherPodInformer.Lister()

c := SchedulerConfig{
RefreshPeriod: time.Duration(env.SchedulerRefreshPeriod) * time.Second,
Capacity: env.PodCapacity,
Expand All @@ -114,13 +118,11 @@ func NewController(ctx context.Context, watcher configmap.Watcher) *controller.I
}

schedulers := map[string]Scheduler{
KafkaSourceScheduler: createKafkaScheduler(ctx, c, kafkainternals.SourceStatefulSetName),
KafkaTriggerScheduler: createKafkaScheduler(ctx, c, kafkainternals.BrokerStatefulSetName),
KafkaChannelScheduler: createKafkaScheduler(ctx, c, kafkainternals.ChannelStatefulSetName),
KafkaSourceScheduler: createKafkaScheduler(ctx, c, dispatcherPodLister, kafkainternals.SourceStatefulSetName),
KafkaTriggerScheduler: createKafkaScheduler(ctx, c, dispatcherPodLister, kafkainternals.BrokerStatefulSetName),
KafkaChannelScheduler: createKafkaScheduler(ctx, c, dispatcherPodLister, kafkainternals.ChannelStatefulSetName),
}

dispatcherPodInformer := podinformer.Get(ctx, eventing.DispatcherLabelSelectorStr)

r := &Reconciler{
SchedulerFunc: func(s string) (Scheduler, bool) { sched, ok := schedulers[strings.ToLower(s)]; return sched, ok },
ConsumerLister: consumer.Get(ctx).Lister(),
Expand Down Expand Up @@ -325,10 +327,11 @@ func enqueueConsumerGroupFromConsumer(enqueue func(name types.NamespacedName)) f
}
}

func createKafkaScheduler(ctx context.Context, c SchedulerConfig, ssName string) Scheduler {
func createKafkaScheduler(ctx context.Context, c SchedulerConfig, podLister corelisters.PodLister, ssName string) Scheduler {
lister := consumergroup.Get(ctx).Lister()
return createStatefulSetScheduler(
ctx,
podLister,
SchedulerConfig{
StatefulSetName: ssName,
RefreshPeriod: c.RefreshPeriod,
Expand Down Expand Up @@ -370,7 +373,7 @@ func getSelectorLabel(ssName string) map[string]string {
return selectorLabel
}

func createStatefulSetScheduler(ctx context.Context, c SchedulerConfig, lister scheduler.VPodLister) Scheduler {
func createStatefulSetScheduler(ctx context.Context, podLister corelisters.PodLister, c SchedulerConfig, lister scheduler.VPodLister) Scheduler {
ss, _ := statefulsetscheduler.New(ctx, &statefulsetscheduler.Config{
StatefulSetNamespace: system.Namespace(),
StatefulSetName: c.StatefulSetName,
Expand All @@ -383,6 +386,7 @@ func createStatefulSetScheduler(ctx context.Context, c SchedulerConfig, lister s
Evictor: newEvictor(ctx, zap.String("kafka.eventing.knative.dev/component", "evictor")).evict,
VPodLister: lister,
NodeLister: nodeinformer.Get(ctx).Lister(),
PodLister: podLister.Pods(system.Namespace()),
})

return Scheduler{
Expand Down

0 comments on commit 7634ed8

Please sign in to comment.