Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[release-1.14] Upgrade knative.dev/eventing to latest 1.14 #4113

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -446,7 +446,7 @@ func (r *Reconciler) schedule(ctx context.Context, cg *kafkainternals.ConsumerGr
return cg.MarkScheduleConsumerFailed("Schedule", err)
}

placements, err := statefulSetScheduler.Schedule(cg)
placements, err := statefulSetScheduler.Schedule(ctx, cg)
if err != nil {
return cg.MarkScheduleConsumerFailed("Schedule", err)
}
Expand Down
48 changes: 21 additions & 27 deletions control-plane/pkg/reconciler/consumergroup/consumergroup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,6 @@ import (
kedaclient "knative.dev/eventing-kafka-broker/third_party/pkg/client/injection/client/fake"
)

type SchedulerFunc func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error)

func (f SchedulerFunc) Schedule(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
return f(vpod)
}

const (
testSchedulerKey = "scheduler"
noTestScheduler = "no-scheduler"
Expand Down Expand Up @@ -102,7 +96,7 @@ func TestReconcileKind(t *testing.T) {
},
Key: ConsumerGroupTestKey,
OtherTestData: map[string]interface{}{
testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
testSchedulerKey: scheduler.SchedulerFunc(func(ctx context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
return []eventingduckv1alpha1.Placement{
{PodName: "p1", VReplicas: 1},
{PodName: "p2", VReplicas: 1},
Expand Down Expand Up @@ -189,7 +183,7 @@ func TestReconcileKind(t *testing.T) {
},
Key: ConsumerGroupTestKey,
OtherTestData: map[string]interface{}{
testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
testSchedulerKey: scheduler.SchedulerFunc(func(ctx context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
return []eventingduckv1alpha1.Placement{
{PodName: "p1", VReplicas: 1},
{PodName: "p2", VReplicas: 1},
Expand Down Expand Up @@ -307,7 +301,7 @@ func TestReconcileKind(t *testing.T) {
},
Key: ConsumerGroupTestKey,
OtherTestData: map[string]interface{}{
testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
testSchedulerKey: scheduler.SchedulerFunc(func(ctx context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
return []eventingduckv1alpha1.Placement{
{PodName: "p1", VReplicas: 1},
{PodName: "p2", VReplicas: 1},
Expand Down Expand Up @@ -402,7 +396,7 @@ func TestReconcileKind(t *testing.T) {
},
Key: ConsumerGroupTestKey,
OtherTestData: map[string]interface{}{
testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
testSchedulerKey: scheduler.SchedulerFunc(func(ctx context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
return []eventingduckv1alpha1.Placement{
{PodName: "p1", VReplicas: 1},
{PodName: "p2", VReplicas: 1},
Expand Down Expand Up @@ -528,7 +522,7 @@ func TestReconcileKind(t *testing.T) {
},
Key: ConsumerGroupTestKey,
OtherTestData: map[string]interface{}{
testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
testSchedulerKey: scheduler.SchedulerFunc(func(ctx context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
return []eventingduckv1alpha1.Placement{
{PodName: "p1", VReplicas: 1},
{PodName: "p2", VReplicas: 1},
Expand Down Expand Up @@ -702,7 +696,7 @@ func TestReconcileKind(t *testing.T) {
},
Key: ConsumerGroupTestKey,
OtherTestData: map[string]interface{}{
testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
testSchedulerKey: scheduler.SchedulerFunc(func(ctx context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
return []eventingduckv1alpha1.Placement{
{PodName: "p1", VReplicas: 1},
{PodName: "p2", VReplicas: 1},
Expand Down Expand Up @@ -877,7 +871,7 @@ func TestReconcileKind(t *testing.T) {
},
Key: ConsumerGroupTestKey,
OtherTestData: map[string]interface{}{
testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
testSchedulerKey: scheduler.SchedulerFunc(func(ctx context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
return []eventingduckv1alpha1.Placement{
{PodName: "p1", VReplicas: 1},
{PodName: "p2", VReplicas: 1},
Expand Down Expand Up @@ -1034,7 +1028,7 @@ func TestReconcileKind(t *testing.T) {
},
Key: ConsumerGroupTestKey,
OtherTestData: map[string]interface{}{
testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
testSchedulerKey: scheduler.SchedulerFunc(func(ctx context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
return []eventingduckv1alpha1.Placement{
{PodName: "p1", VReplicas: 1},
}, nil
Expand Down Expand Up @@ -1121,7 +1115,7 @@ func TestReconcileKind(t *testing.T) {
},
Key: ConsumerGroupTestKey,
OtherTestData: map[string]interface{}{
testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
testSchedulerKey: scheduler.SchedulerFunc(func(ctx context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
return []eventingduckv1alpha1.Placement{
{PodName: "p1", VReplicas: 1},
{PodName: "p2", VReplicas: 1},
Expand Down Expand Up @@ -1208,7 +1202,7 @@ func TestReconcileKind(t *testing.T) {
},
Key: ConsumerGroupTestKey,
OtherTestData: map[string]interface{}{
testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
testSchedulerKey: scheduler.SchedulerFunc(func(ctx context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
return []eventingduckv1alpha1.Placement{
{PodName: "p1", VReplicas: 1},
{PodName: "p2", VReplicas: 1},
Expand Down Expand Up @@ -1303,7 +1297,7 @@ func TestReconcileKind(t *testing.T) {
},
Key: ConsumerGroupTestKey,
OtherTestData: map[string]interface{}{
testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
testSchedulerKey: scheduler.SchedulerFunc(func(ctx context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
return []eventingduckv1alpha1.Placement{
{PodName: "p1", VReplicas: 1},
{PodName: "p2", VReplicas: 2},
Expand Down Expand Up @@ -1426,7 +1420,7 @@ func TestReconcileKind(t *testing.T) {
},
Key: ConsumerGroupTestKey,
OtherTestData: map[string]interface{}{
testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
testSchedulerKey: scheduler.SchedulerFunc(func(ctx context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
return []eventingduckv1alpha1.Placement{
{PodName: "p1", VReplicas: 1},
{PodName: "p2", VReplicas: 2},
Expand Down Expand Up @@ -1533,7 +1527,7 @@ func TestReconcileKind(t *testing.T) {
},
Key: ConsumerGroupTestKey,
OtherTestData: map[string]interface{}{
testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
testSchedulerKey: scheduler.SchedulerFunc(func(ctx context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
return []eventingduckv1alpha1.Placement{
{PodName: "p1", VReplicas: 1},
{PodName: "p2", VReplicas: 1},
Expand Down Expand Up @@ -1630,7 +1624,7 @@ func TestReconcileKind(t *testing.T) {
},
Key: ConsumerGroupTestKey,
OtherTestData: map[string]interface{}{
testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
testSchedulerKey: scheduler.SchedulerFunc(func(ctx context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
return nil, io.EOF
}),
},
Expand Down Expand Up @@ -1762,7 +1756,7 @@ func TestReconcileKindNoAutoscaler(t *testing.T) {
},
Key: ConsumerGroupTestKey,
OtherTestData: map[string]interface{}{
testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
testSchedulerKey: scheduler.SchedulerFunc(func(ctx context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
return []eventingduckv1alpha1.Placement{
{PodName: "p1", VReplicas: 1},
{PodName: "p2", VReplicas: 1},
Expand Down Expand Up @@ -1926,7 +1920,7 @@ func TestFinalizeKind(t *testing.T) {
},
Key: testKey,
OtherTestData: map[string]interface{}{
testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
testSchedulerKey: scheduler.SchedulerFunc(func(ctx context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
return nil, nil
}),
},
Expand Down Expand Up @@ -1995,7 +1989,7 @@ func TestFinalizeKind(t *testing.T) {
},
Key: testKey,
OtherTestData: map[string]interface{}{
testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
testSchedulerKey: scheduler.SchedulerFunc(func(ctx context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
return nil, nil
}),
},
Expand Down Expand Up @@ -2121,7 +2115,7 @@ func TestFinalizeKind(t *testing.T) {
},
Key: testKey,
OtherTestData: map[string]interface{}{
testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
testSchedulerKey: scheduler.SchedulerFunc(func(ctx context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
return nil, nil
}),
},
Expand Down Expand Up @@ -2167,7 +2161,7 @@ func TestFinalizeKind(t *testing.T) {
},
Key: testKey,
OtherTestData: map[string]interface{}{
testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
testSchedulerKey: scheduler.SchedulerFunc(func(ctx context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
return nil, nil
}),
kafkatesting.ErrorOnDeleteConsumerGroupTestKey: sarama.ErrUnknownTopicOrPartition,
Expand Down Expand Up @@ -2214,7 +2208,7 @@ func TestFinalizeKind(t *testing.T) {
},
Key: testKey,
OtherTestData: map[string]interface{}{
testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
testSchedulerKey: scheduler.SchedulerFunc(func(ctx context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
return nil, nil
}),
kafkatesting.ErrorOnDeleteConsumerGroupTestKey: sarama.ErrGroupIDNotFound,
Expand Down Expand Up @@ -2262,7 +2256,7 @@ func TestFinalizeKind(t *testing.T) {
WantErr: true,
Key: testKey,
OtherTestData: map[string]interface{}{
testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
testSchedulerKey: scheduler.SchedulerFunc(func(ctx context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
return nil, nil
}),
kafkatesting.ErrorOnDeleteConsumerGroupTestKey: sarama.ErrClusterAuthorizationFailed,
Expand Down
17 changes: 10 additions & 7 deletions control-plane/pkg/reconciler/consumergroup/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apiserver/pkg/storage/names"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"

"knative.dev/eventing-kafka-broker/control-plane/pkg/apis/internals/kafka/eventing"
Expand Down Expand Up @@ -106,6 +107,9 @@ func NewController(ctx context.Context, watcher configmap.Watcher) *controller.I
logger.Panicf("unable to process required environment variables: %v", err)
}

dispatcherPodInformer := podinformer.Get(ctx, eventing.DispatcherLabelSelectorStr)
dispatcherPodLister := dispatcherPodInformer.Lister()

c := SchedulerConfig{
RefreshPeriod: time.Duration(env.SchedulerRefreshPeriod) * time.Second,
Capacity: env.PodCapacity,
Expand All @@ -114,13 +118,10 @@ func NewController(ctx context.Context, watcher configmap.Watcher) *controller.I
}

schedulers := map[string]Scheduler{
KafkaSourceScheduler: createKafkaScheduler(ctx, c, kafkainternals.SourceStatefulSetName),
KafkaTriggerScheduler: createKafkaScheduler(ctx, c, kafkainternals.BrokerStatefulSetName),
//KafkaChannelScheduler: createKafkaScheduler(ctx, c, kafkainternals.ChannelStatefulSetName), //To be added with channel/v2 reconciler version only
KafkaSourceScheduler: createKafkaScheduler(ctx, c, dispatcherPodLister, kafkainternals.SourceStatefulSetName),
KafkaTriggerScheduler: createKafkaScheduler(ctx, c, dispatcherPodLister, kafkainternals.BrokerStatefulSetName),
}

dispatcherPodInformer := podinformer.Get(ctx, eventing.DispatcherLabelSelectorStr)

r := &Reconciler{
SchedulerFunc: func(s string) (Scheduler, bool) { sched, ok := schedulers[strings.ToLower(s)]; return sched, ok },
ConsumerLister: consumer.Get(ctx).Lister(),
Expand Down Expand Up @@ -299,10 +300,11 @@ func enqueueConsumerGroupFromConsumer(enqueue func(name types.NamespacedName)) f
}
}

func createKafkaScheduler(ctx context.Context, c SchedulerConfig, ssName string) Scheduler {
func createKafkaScheduler(ctx context.Context, c SchedulerConfig, podLister corelisters.PodLister, ssName string) Scheduler {
lister := consumergroup.Get(ctx).Lister()
return createStatefulSetScheduler(
ctx,
podLister,
SchedulerConfig{
StatefulSetName: ssName,
RefreshPeriod: c.RefreshPeriod,
Expand Down Expand Up @@ -344,7 +346,7 @@ func getSelectorLabel(ssName string) map[string]string {
return selectorLabel
}

func createStatefulSetScheduler(ctx context.Context, c SchedulerConfig, lister scheduler.VPodLister) Scheduler {
func createStatefulSetScheduler(ctx context.Context, podLister corelisters.PodLister, c SchedulerConfig, lister scheduler.VPodLister) Scheduler {
ss, _ := statefulsetscheduler.New(ctx, &statefulsetscheduler.Config{
StatefulSetNamespace: system.Namespace(),
StatefulSetName: c.StatefulSetName,
Expand All @@ -357,6 +359,7 @@ func createStatefulSetScheduler(ctx context.Context, c SchedulerConfig, lister s
Evictor: newEvictor(ctx, zap.String("kafka.eventing.knative.dev/component", "evictor")).evict,
VPodLister: lister,
NodeLister: nodeinformer.Get(ctx).Lister(),
PodLister: podLister.Pods(system.Namespace()),
})

return Scheduler{
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ require (
k8s.io/apiserver v0.29.2
k8s.io/client-go v0.29.2
k8s.io/utils v0.0.0-20240102154912-e7106e64919e
knative.dev/eventing v0.41.0
knative.dev/eventing v0.41.7-0.20240923180940-09cb6334226f
knative.dev/hack v0.0.0-20240404013450-1133b37da8d7
knative.dev/pkg v0.0.0-20240416145024-0f34a8815650
knative.dev/reconciler-test v0.0.0-20240417065737-ca905cbb09a9
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1270,8 +1270,8 @@ k8s.io/utils v0.0.0-20200912215256-4140de9c8800/go.mod h1:jPW/WVKK9YHAvNhRxK0md/
k8s.io/utils v0.0.0-20210819203725-bdf08cb9a70a/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA=
k8s.io/utils v0.0.0-20240102154912-e7106e64919e h1:eQ/4ljkx21sObifjzXwlPKpdGLrCfRziVtos3ofG/sQ=
k8s.io/utils v0.0.0-20240102154912-e7106e64919e/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0=
knative.dev/eventing v0.41.0 h1:e38nejJiwEpFQI5JgcaT8JRUGZsQn05h1vBWSwNkroY=
knative.dev/eventing v0.41.0/go.mod h1:/DjKZGRcZtBx8FOSvOy3mhxzm1Wem7H3aofb6kHq/68=
knative.dev/eventing v0.41.7-0.20240923180940-09cb6334226f h1:OEpj00D31CAlJTm8xfimpK1haCrDIzVP+3axlNgY4bE=
knative.dev/eventing v0.41.7-0.20240923180940-09cb6334226f/go.mod h1:JjZn90agmsqMIQOqZ4oToWRrcAIvlqn7hlharxcTv/w=
knative.dev/hack v0.0.0-20240404013450-1133b37da8d7 h1:fkWYWvdHm1mVHevKW2vVJnZtxH0NzOlux8imesweKwE=
knative.dev/hack v0.0.0-20240404013450-1133b37da8d7/go.mod h1:yk2OjGDsbEnQjfxdm0/HJKS2WqTLEFg/N6nUs6Rqx3Q=
knative.dev/pkg v0.0.0-20240416145024-0f34a8815650 h1:m2ahFUO0L2VrgGDYdyOUFdE6xBd3pLXAJozLJwqLRQM=
Expand Down
4 changes: 3 additions & 1 deletion vendor/knative.dev/eventing/pkg/apis/feature/features.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package feature

import (
"fmt"
"log"
"strings"

corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -148,7 +149,8 @@ func NewFlagsConfigFromMap(data map[string]string) (Flags, error) {
} else if strings.Contains(k, NodeSelectorLabel) {
flags[sanitizedKey] = Flag(v)
} else {
return flags, fmt.Errorf("cannot parse the feature flag '%s' = '%s'", k, v)
flags[k] = Flag(v)
log.Printf("Warning: unknown feature flag value %q=%q\n", k, v)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,11 @@ const (

// ContainerSourceConditionReceiveAdapterReady has status True when the ContainerSource's ReceiveAdapter is ready.
ContainerSourceConditionReceiveAdapterReady apis.ConditionType = "ReceiveAdapterReady"

ContainerConditionOIDCIdentityCreated apis.ConditionType = "OIDCIdentityCreated"
)

var containerCondSet = apis.NewLivingConditionSet(
ContainerSourceConditionSinkBindingReady,
ContainerSourceConditionReceiveAdapterReady,
ContainerConditionOIDCIdentityCreated,
)

// GetConditionSet retrieves the condition set for this resource. Implements the KRShaped interface.
Expand Down Expand Up @@ -66,23 +63,7 @@ func (s *ContainerSourceStatus) InitializeConditions() {
containerCondSet.Manage(s).InitializeConditions()
}

func (s *ContainerSourceStatus) MarkOIDCIdentityCreatedSucceeded() {
containerCondSet.Manage(s).MarkTrue(ContainerConditionOIDCIdentityCreated)
}

func (s *ContainerSourceStatus) MarkOIDCIdentityCreatedSucceededWithReason(reason, messageFormat string, messageA ...interface{}) {
containerCondSet.Manage(s).MarkTrueWithReason(ContainerConditionOIDCIdentityCreated, reason, messageFormat, messageA...)
}

func (s *ContainerSourceStatus) MarkOIDCIdentityCreatedFailed(reason, messageFormat string, messageA ...interface{}) {
containerCondSet.Manage(s).MarkFalse(ContainerConditionOIDCIdentityCreated, reason, messageFormat, messageA...)
}

func (s *ContainerSourceStatus) MarkOIDCIdentityCreatedUnknown(reason, messageFormat string, messageA ...interface{}) {
containerCondSet.Manage(s).MarkUnknown(ContainerConditionOIDCIdentityCreated, reason, messageFormat, messageA...)
}

// PropagateSinkBindingStatus uses the availability of the provided Deployment to determine if
// PropagateSinkBindingStatus uses the SinkBinding to determine if
// ContainerSourceConditionSinkBindingReady should be marked as true, false or unknown.
func (s *ContainerSourceStatus) PropagateSinkBindingStatus(status *SinkBindingStatus) {
// Do not copy conditions nor observedGeneration
Expand All @@ -105,6 +86,9 @@ func (s *ContainerSourceStatus) PropagateSinkBindingStatus(status *SinkBindingSt
default:
containerCondSet.Manage(s).MarkUnknown(ContainerSourceConditionSinkBindingReady, cond.Reason, cond.Message)
}

// Propagate SinkBindings AuthStatus to containersources AuthStatus
s.Auth = status.Auth
}

// PropagateReceiveAdapterStatus uses the availability of the provided Deployment to determine if
Expand Down
Loading
Loading