Skip to content

Commit

Permalink
Fix delete namespace with a namespaced broker stuck on finalizer (#2896
Browse files Browse the repository at this point in the history
…) (#2897)

When the system namespace is deleted while we're running there is no point in
trying to delete the resource from the ConfigMap since the entire ConfigMap
is gone.

Signed-off-by: Pierangelo Di Pilato <pierdipi@redhat.com>

Signed-off-by: Pierangelo Di Pilato <pierdipi@redhat.com>

Signed-off-by: Pierangelo Di Pilato <pierdipi@redhat.com>
  • Loading branch information
pierDipi authored Jan 16, 2023
1 parent b691ec9 commit bca0b90
Show file tree
Hide file tree
Showing 6 changed files with 141 additions and 35 deletions.
3 changes: 3 additions & 0 deletions control-plane/pkg/prober/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ const (
StatusReady Status = iota
// StatusUnknown signals that a given object is not ready and its state is unknown.
StatusUnknown
// StatusUnknownErr signals that a given object is not ready and its state is unknown due to
// networking problems between control plane and data plane.
StatusUnknownErr
// StatusNotReady signals that a given object is not ready.
StatusNotReady
)
Expand Down
2 changes: 1 addition & 1 deletion control-plane/pkg/prober/prober.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func probe(ctx context.Context, client httpClient, logger *zap.Logger, address s
response, err := client.Do(r)
if err != nil {
logger.Error("Failed probe", zap.Error(err))
return StatusUnknown
return StatusUnknownErr
}

if response.StatusCode != http.StatusOK {
Expand Down
80 changes: 48 additions & 32 deletions control-plane/pkg/reconciler/broker/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,37 +317,7 @@ func (r *Reconciler) FinalizeKind(ctx context.Context, broker *eventing.Broker)
func (r *Reconciler) finalizeKind(ctx context.Context, broker *eventing.Broker) reconciler.Event {
logger := kafkalogging.CreateFinalizeMethodLogger(ctx, broker)

// Get contract config map.
contractConfigMap, err := r.GetOrCreateDataPlaneConfigMap(ctx)
if err != nil {
return fmt.Errorf("failed to get contract config map %s: %w", r.DataPlaneConfigMapAsString(), err)
}

logger.Debug("Got contract config map")

// Get contract data.
ct, err := r.GetDataPlaneConfigMapData(logger, contractConfigMap)
if err != nil {
return fmt.Errorf("failed to get contract: %w", err)
}

logger.Debug("Got contract data from config map", zap.Any(base.ContractLogKey, ct))

if err := r.DeleteResource(ctx, logger, broker.GetUID(), ct, contractConfigMap); err != nil {
return err
}

// We update receiver and dispatcher pods annotation regardless of our contract changed or not due to the fact
// that in a previous reconciliation we might have failed to update one of our data plane pod annotation, so we want
// to update anyway remaining annotations with the contract generation that was saved in the CM.
// Note: if there aren't changes to be done at the pod annotation level, we just skip the update.

// Update volume generation annotation of receiver pods
if err := r.UpdateReceiverPodsAnnotation(ctx, logger, ct.Generation); err != nil {
return err
}
// Update volume generation annotation of dispatcher pods
if err := r.UpdateDispatcherPodsAnnotation(ctx, logger, ct.Generation); err != nil {
if err := r.deleteResourceFromContractConfigMap(ctx, logger, broker); err != nil {
return err
}

Expand All @@ -368,7 +338,8 @@ func (r *Reconciler) finalizeKind(ctx context.Context, broker *eventing.Broker)
Name: broker.GetName(),
},
}
if status := r.Prober.Probe(ctx, proberAddressable, prober.StatusNotReady); status != prober.StatusNotReady {
status := r.Prober.Probe(ctx, proberAddressable, prober.StatusNotReady)
if status != prober.StatusNotReady && status != prober.StatusUnknownErr {
// Return a requeueKeyError that doesn't generate an event and it re-queues the object
// for a new reconciliation.
return controller.NewRequeueAfter(5 * time.Second)
Expand Down Expand Up @@ -444,6 +415,51 @@ func (r *Reconciler) finalizeKind(ctx context.Context, broker *eventing.Broker)
return nil
}

func (r *Reconciler) deleteResourceFromContractConfigMap(ctx context.Context, logger *zap.Logger, broker *eventing.Broker) error {
// Get contract config map.
contractConfigMap, err := r.GetOrCreateDataPlaneConfigMap(ctx)
// Handles https://github.com/knative-sandbox/eventing-kafka-broker/issues/2893
// When the system namespace is deleted while we're running there is no point in
// trying to delete the resource from the ConfigMap since the entire ConfigMap
// is gone.
if apierrors.IsForbidden(err) {
return nil
}
if err != nil {
return fmt.Errorf("failed to get contract config map %s: %w", r.DataPlaneConfigMapAsString(), err)
}

logger.Debug("Got contract config map")

// Get contract data.
ct, err := r.GetDataPlaneConfigMapData(logger, contractConfigMap)
if err != nil {
return fmt.Errorf("failed to get contract: %w", err)
}

logger.Debug("Got contract data from config map", zap.Any(base.ContractLogKey, ct))

if err := r.DeleteResource(ctx, logger, broker.GetUID(), ct, contractConfigMap); err != nil {
return err
}

// We update receiver and dispatcher pods annotation regardless of our contract changed or not due to the fact
// that in a previous reconciliation we might have failed to update one of our data plane pod annotation, so we want
// to update anyway remaining annotations with the contract generation that was saved in the CM.
// Note: if there aren't changes to be done at the pod annotation level, we just skip the update.

// Update volume generation annotation of receiver pods
if err := r.UpdateReceiverPodsAnnotation(ctx, logger, ct.Generation); err != nil {
return err
}
// Update volume generation annotation of dispatcher pods
if err := r.UpdateDispatcherPodsAnnotation(ctx, logger, ct.Generation); err != nil {
return err
}

return nil
}

func (r *Reconciler) finalizeNonExternalBrokerTopic(broker *eventing.Broker, securityOption kafka.ConfigOption, topicConfig *kafka.TopicConfig, logger *zap.Logger) reconciler.Event {
saramaConfig, err := kafka.GetSaramaConfig(securityOption)
if err != nil {
Expand Down
21 changes: 21 additions & 0 deletions test/e2e_new/broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"knative.dev/pkg/system"
"knative.dev/reconciler-test/pkg/environment"
"knative.dev/reconciler-test/pkg/feature"
"knative.dev/reconciler-test/pkg/k8s"
"knative.dev/reconciler-test/pkg/knative"

Expand Down Expand Up @@ -115,3 +116,23 @@ func TestBrokerCannotReachKafkaCluster(t *testing.T) {

env.Test(ctx, t, features.BrokerCannotReachKafkaCluster())
}

func TestNamespacedBrokerNamespaceDeletion(t *testing.T) {

name := "broker"
namespace := feature.MakeRandomK8sName("test-namespaced-broker")

ctx, env := global.Environment(
knative.WithKnativeNamespace(system.Namespace()),
knative.WithLoggingConfig,
knative.WithTracingConfig,
k8s.WithEventListener,
environment.WithPollTimings(PollInterval, PollTimeout),
environment.WithTestLogger(t),
environment.InNamespace(namespace),
)

env.Test(ctx, t, features.SetupNamespace(namespace))
env.Test(ctx, t, features.SetupNamespacedBroker(name))
env.Test(ctx, t, features.CleanupNamespace(namespace))
}
22 changes: 20 additions & 2 deletions test/e2e_new/features/namespace.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@ package features
import (
"context"
"fmt"
"time"

corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
kubeclient "knative.dev/pkg/client/injection/kube/client"
"knative.dev/reconciler-test/pkg/feature"
)
Expand Down Expand Up @@ -50,13 +52,29 @@ func SetupNamespace(name string) *feature.Feature {
func CleanupNamespace(name string) *feature.Feature {
f := feature.NewFeatureNamed("delete namespace")

f.Setup(fmt.Sprintf("install namespace %s", name), func(ctx context.Context, t feature.T) {
err := kubeclient.Get(ctx).CoreV1().Namespaces().Delete(ctx, name, metav1.DeleteOptions{})
f.Setup(fmt.Sprintf("delete namespace %s", name), func(ctx context.Context, t feature.T) {
pp := metav1.DeletePropagationForeground
err := kubeclient.Get(ctx).CoreV1().Namespaces().Delete(ctx, name, metav1.DeleteOptions{
PropagationPolicy: &pp,
})
if err != nil && !apierrors.IsNotFound(err) {
t.Fatal(err)
}

})

f.Assert(fmt.Sprintf("wait for namespace %s to be deleted", name), func(ctx context.Context, t feature.T) {
err := wait.PollImmediate(100*time.Millisecond, time.Minute, func() (done bool, err error) {
_, err = kubeclient.Get(ctx).CoreV1().Namespaces().Get(ctx, name, metav1.GetOptions{})
if err != nil && apierrors.IsNotFound(err) {
return true, nil
}
return false, err
})
if err != nil {
t.Errorf("failed while waiting for namespace %s to be deleted %v", name, err)
}
})

return f
}
48 changes: 48 additions & 0 deletions test/e2e_new/features/namespaced_broker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright 2021 The Knative Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package features

import (
"fmt"

"knative.dev/eventing/test/rekt/resources/broker"
"knative.dev/reconciler-test/pkg/feature"

"knative.dev/eventing-kafka-broker/control-plane/pkg/kafka"
brokerconfigmap "knative.dev/eventing-kafka-broker/test/e2e_new/resources/configmap/broker"
testpkg "knative.dev/eventing-kafka-broker/test/pkg"
)

func SetupNamespacedBroker(name string) *feature.Feature {
f := feature.NewFeatureNamed("setup namespaced broker")

f.Setup("Create broker config", brokerconfigmap.Install(
"kafka-broker-config",
brokerconfigmap.WithBootstrapServer(testpkg.BootstrapServersPlaintext),
brokerconfigmap.WithNumPartitions(1),
brokerconfigmap.WithReplicationFactor(1),
))
f.Setup(fmt.Sprintf("install broker %q", name), broker.Install(
name,
broker.WithBrokerClass(kafka.NamespacedBrokerClass),
broker.WithConfig("kafka-broker-config"),
))
f.Setup("Broker is ready", broker.IsReady(name))
f.Setup("Broker is addressable", broker.IsAddressable(name))

return f
}

0 comments on commit bca0b90

Please sign in to comment.