diff --git a/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1/consumer_group_lifecycle.go b/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1/consumer_group_lifecycle.go index c5e598b45b..aa8d475160 100644 --- a/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1/consumer_group_lifecycle.go +++ b/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1/consumer_group_lifecycle.go @@ -83,6 +83,12 @@ func (cg *ConsumerGroup) MarkInitializeOffsetFailed(reason string, err error) er return err } +func (cg *ConsumerGroup) MarkDeleteOffsetFailed(reason string, err error) error { + err = fmt.Errorf("failed to delete consumer group offset: %w", err) + cg.GetConditionSet().Manage(cg.GetStatus()).MarkFalse(ConditionConsumerGroupConsumers, reason, err.Error()) + return err +} + func (cg *ConsumerGroup) MarkScheduleSucceeded() { cg.GetConditionSet().Manage(cg.GetStatus()).MarkTrue(ConditionConsumerGroupConsumersScheduled) } diff --git a/control-plane/pkg/reconciler/consumergroup/consumergroup.go b/control-plane/pkg/reconciler/consumergroup/consumergroup.go index f7cef59a57..72295d97f7 100644 --- a/control-plane/pkg/reconciler/consumergroup/consumergroup.go +++ b/control-plane/pkg/reconciler/consumergroup/consumergroup.go @@ -156,10 +156,8 @@ func (r Reconciler) FinalizeKind(ctx context.Context, cg *kafkainternals.Consume err := r.schedule(ctx, cg) //de-schedule placements if err != nil { - cg.Status.Placements = nil - // return an error to 1. update the status. 2. not clear the finalizer - return errors.New("placement list was not empty") + return cg.MarkScheduleConsumerFailed("Deschedule", fmt.Errorf("failed to unschedule consumer group: %w", err)) } // Get consumers associated with the ConsumerGroup. @@ -177,7 +175,7 @@ func (r Reconciler) FinalizeKind(ctx context.Context, cg *kafkainternals.Consume if err := r.deleteConsumerGroupMetadata(ctx, cg); err != nil { // We retry a few times to delete Consumer group metadata from Kafka before giving up. if v := r.DeleteConsumerGroupMetadataCounter.Inc(string(cg.GetUID())); v <= 5 { - return err + return cg.MarkDeleteOffsetFailed("DeleteConsumerGroupOffset", fmt.Errorf("%w (retry num %d)", err, v)) } r.DeleteConsumerGroupMetadataCounter.Del(string(cg.GetUID())) } diff --git a/control-plane/pkg/reconciler/consumergroup/consumergroup_test.go b/control-plane/pkg/reconciler/consumergroup/consumergroup_test.go index b49c9b120f..b2ac6250cd 100644 --- a/control-plane/pkg/reconciler/consumergroup/consumergroup_test.go +++ b/control-plane/pkg/reconciler/consumergroup/consumergroup_test.go @@ -2164,9 +2164,28 @@ func TestFinalizeKind(t *testing.T) { Eventf( corev1.EventTypeWarning, "InternalError", - "unable to delete the consumer group my.group.id: "+sarama.ErrClusterAuthorizationFailed.Error(), + "failed to delete consumer group offset: unable to delete the consumer group my.group.id: "+sarama.ErrClusterAuthorizationFailed.Error() + " (retry num 1)", ), }, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{ + { + Object: func() runtime.Object { + cg := NewDeletedConsumeGroup( + ConsumerGroupOwnerRef(SourceAsOwnerReference()), + ConsumerGroupConsumerSpec(NewConsumerSpec( + ConsumerConfigs( + ConsumerBootstrapServersConfig(ChannelBootstrapServers), + ConsumerGroupIdConfig("my.group.id"), + ), + )), + ) + + _ = cg.MarkDeleteOffsetFailed("DeleteConsumerGroupOffset", fmt.Errorf("unable to delete the consumer group my.group.id: kafka server: The client is not authorized to send this request type (retry num 1)")) + + return cg + }(), + }, + }, SkipNamespaceValidation: true, // WantCreates compare the source namespace with configmap namespace, so skip it }, } diff --git a/control-plane/pkg/reconciler/testing/objects_consumergroup.go b/control-plane/pkg/reconciler/testing/objects_consumergroup.go index e354b2de59..3942bd7915 100644 --- a/control-plane/pkg/reconciler/testing/objects_consumergroup.go +++ b/control-plane/pkg/reconciler/testing/objects_consumergroup.go @@ -23,7 +23,6 @@ import ( corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" "k8s.io/utils/pointer" "knative.dev/pkg/apis" duckv1 "knative.dev/pkg/apis/duck/v1" @@ -219,7 +218,7 @@ func ConsumerGroupOwnerRef(reference metav1.OwnerReference) ConsumerGroupOption } } -func NewDeletedConsumeGroup(opts ...ConsumerGroupOption) runtime.Object { +func NewDeletedConsumeGroup(opts ...ConsumerGroupOption) *kafkainternals.ConsumerGroup { return NewConsumerGroup( append( opts, @@ -229,7 +228,7 @@ func NewDeletedConsumeGroup(opts ...ConsumerGroupOption) runtime.Object { } func WithDeletedTimeStampConsumeGroup(cg *kafkainternals.ConsumerGroup) { - cg.GetObjectMeta().SetDeletionTimestamp(&metav1.Time{Time: time.Now()}) + cg.GetObjectMeta().SetDeletionTimestamp(&metav1.Time{Time: time.Date(2023, time.October, 23, 1, 0, 0, 0, &time.Location{})}) } func WithConfigmapOwnerRef(ownerref *metav1.OwnerReference) reconcilertesting.ConfigMapOption {