Skip to content

Commit

Permalink
Update ConsumerGroup status on finalization for better error reporing (
Browse files Browse the repository at this point in the history
…#3415) (#3417)

This will make sure that we report finalization issues up to the
KafkaSource resource.

Signed-off-by: Pierangelo Di Pilato <pierdipi@redhat.com>
  • Loading branch information
pierDipi committed Oct 23, 2023
1 parent edf9426 commit d186c99
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,12 @@ func (cg *ConsumerGroup) MarkInitializeOffsetFailed(reason string, err error) er
return err
}

func (cg *ConsumerGroup) MarkDeleteOffsetFailed(reason string, err error) error {
err = fmt.Errorf("failed to delete consumer group offset: %w", err)
cg.GetConditionSet().Manage(cg.GetStatus()).MarkFalse(ConditionConsumerGroupConsumers, reason, err.Error())
return err
}

func (cg *ConsumerGroup) MarkScheduleSucceeded() {
cg.GetConditionSet().Manage(cg.GetStatus()).MarkTrue(ConditionConsumerGroupConsumersScheduled)
}
Expand Down
6 changes: 2 additions & 4 deletions control-plane/pkg/reconciler/consumergroup/consumergroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,10 +156,8 @@ func (r Reconciler) FinalizeKind(ctx context.Context, cg *kafkainternals.Consume
err := r.schedule(ctx, cg) //de-schedule placements

if err != nil {
cg.Status.Placements = nil

// return an error to 1. update the status. 2. not clear the finalizer
return errors.New("placement list was not empty")
return cg.MarkScheduleConsumerFailed("Deschedule", fmt.Errorf("failed to unschedule consumer group: %w", err))
}

// Get consumers associated with the ConsumerGroup.
Expand All @@ -177,7 +175,7 @@ func (r Reconciler) FinalizeKind(ctx context.Context, cg *kafkainternals.Consume
if err := r.deleteConsumerGroupMetadata(ctx, cg); err != nil {
// We retry a few times to delete Consumer group metadata from Kafka before giving up.
if v := r.DeleteConsumerGroupMetadataCounter.Inc(string(cg.GetUID())); v <= 5 {
return err
return cg.MarkDeleteOffsetFailed("DeleteConsumerGroupOffset", fmt.Errorf("%w (retry num %d)", err, v))
}
r.DeleteConsumerGroupMetadataCounter.Del(string(cg.GetUID()))
}
Expand Down
21 changes: 20 additions & 1 deletion control-plane/pkg/reconciler/consumergroup/consumergroup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2164,9 +2164,28 @@ func TestFinalizeKind(t *testing.T) {
Eventf(
corev1.EventTypeWarning,
"InternalError",
"unable to delete the consumer group my.group.id: "+sarama.ErrClusterAuthorizationFailed.Error(),
"failed to delete consumer group offset: unable to delete the consumer group my.group.id: "+sarama.ErrClusterAuthorizationFailed.Error() + " (retry num 1)",
),
},
WantStatusUpdates: []clientgotesting.UpdateActionImpl{
{
Object: func() runtime.Object {
cg := NewDeletedConsumeGroup(
ConsumerGroupOwnerRef(SourceAsOwnerReference()),
ConsumerGroupConsumerSpec(NewConsumerSpec(
ConsumerConfigs(
ConsumerBootstrapServersConfig(ChannelBootstrapServers),
ConsumerGroupIdConfig("my.group.id"),
),
)),
)

_ = cg.MarkDeleteOffsetFailed("DeleteConsumerGroupOffset", fmt.Errorf("unable to delete the consumer group my.group.id: kafka server: The client is not authorized to send this request type (retry num 1)"))

return cg
}(),
},
},
SkipNamespaceValidation: true, // WantCreates compare the source namespace with configmap namespace, so skip it
},
}
Expand Down
5 changes: 2 additions & 3 deletions control-plane/pkg/reconciler/testing/objects_consumergroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/utils/pointer"
"knative.dev/pkg/apis"
duckv1 "knative.dev/pkg/apis/duck/v1"
Expand Down Expand Up @@ -219,7 +218,7 @@ func ConsumerGroupOwnerRef(reference metav1.OwnerReference) ConsumerGroupOption
}
}

func NewDeletedConsumeGroup(opts ...ConsumerGroupOption) runtime.Object {
func NewDeletedConsumeGroup(opts ...ConsumerGroupOption) *kafkainternals.ConsumerGroup {
return NewConsumerGroup(
append(
opts,
Expand All @@ -229,7 +228,7 @@ func NewDeletedConsumeGroup(opts ...ConsumerGroupOption) runtime.Object {
}

func WithDeletedTimeStampConsumeGroup(cg *kafkainternals.ConsumerGroup) {
cg.GetObjectMeta().SetDeletionTimestamp(&metav1.Time{Time: time.Now()})
cg.GetObjectMeta().SetDeletionTimestamp(&metav1.Time{Time: time.Date(2023, time.October, 23, 1, 0, 0, 0, &time.Location{})})
}

func WithConfigmapOwnerRef(ownerref *metav1.OwnerReference) reconcilertesting.ConfigMapOption {
Expand Down

0 comments on commit d186c99

Please sign in to comment.