From 0b5d0a11a6339e85ac6a7f469407049961d11b48 Mon Sep 17 00:00:00 2001
From: Thibault Gagnaux
Date: Mon, 22 Sep 2025 20:38:19 +0200
Subject: [PATCH] fix: handle deleted namespaces gracefully during sync

Implement automatic detection and removal of deleted namespaces to
prevent infinite sync failure loops when namespaces are deleted without
removing the managed-by label first.

Signed-off-by: Thibault Gagnaux
---
 pkg/cache/cluster.go      |  54 +++++++++++-
 pkg/cache/cluster_test.go | 174 +++++++++++++++++++++++++++++++++++---
 2 files changed, 215 insertions(+), 13 deletions(-)
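
Notes (reviewer-only, ignored by git-am): below is a minimal, self-contained sketch of the
detect-and-prune flow this patch wires into sync(). namespaceExists mirrors the new helper in
cluster.go; pruneDeletedNamespaces is a hypothetical stand-in for the inline filtering loop at the
end of sync(), and the fake dynamic client setup exists only for the example. As in the patch,
errors other than NotFound are treated as "namespace exists", so a transient API failure cannot
shrink the configured namespace list.

// Illustrative sketch only, not part of this patch.
package main

import (
    "context"
    "fmt"
    "sync"

    corev1 "k8s.io/api/core/v1"
    apierrors "k8s.io/apimachinery/pkg/api/errors"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/runtime"
    "k8s.io/apimachinery/pkg/runtime/schema"
    "k8s.io/client-go/dynamic"
    dynamicfake "k8s.io/client-go/dynamic/fake"
)

// namespaceExists reports whether a namespace still exists. A NotFound error marks it as deleted
// in the shared map; any other error is treated as "exists" so sync is not blocked.
func namespaceExists(ctx context.Context, nsClient dynamic.NamespaceableResourceInterface, ns string, deleted *sync.Map) bool {
    if ns == "" {
        return true // cluster-wide operations skip the check
    }
    if _, err := nsClient.Get(ctx, ns, metav1.GetOptions{}); err != nil && apierrors.IsNotFound(err) {
        deleted.Store(ns, true)
        return false
    }
    return true
}

// pruneDeletedNamespaces keeps only the namespaces that were not marked as deleted.
func pruneDeletedNamespaces(namespaces []string, deleted *sync.Map) []string {
    var valid []string
    for _, ns := range namespaces {
        if _, gone := deleted.Load(ns); gone {
            continue
        }
        valid = append(valid, ns)
    }
    return valid
}

func main() {
    scheme := runtime.NewScheme()
    _ = corev1.AddToScheme(scheme)

    // Fake cluster that only contains the "default" namespace.
    client := dynamicfake.NewSimpleDynamicClient(scheme,
        &corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: "default"}})
    nsClient := client.Resource(schema.GroupVersionResource{Version: "v1", Resource: "namespaces"})

    configured := []string{"default", "deleted-namespace"}
    deleted := &sync.Map{}
    for _, ns := range configured {
        namespaceExists(context.Background(), nsClient, ns, deleted)
    }
    fmt.Println(pruneDeletedNamespaces(configured, deleted)) // prints: [default]
}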
diff --git a/pkg/cache/cluster.go b/pkg/cache/cluster.go
index 38f1e6016..bd2c0d912 100644
--- a/pkg/cache/cluster.go
+++ b/pkg/cache/cluster.go
@@ -247,6 +247,24 @@ type clusterCache struct {
     respectRBAC int
 }
 
+func (c *clusterCache) namespaceExists(ctx context.Context, nsClient dynamic.NamespaceableResourceInterface, namespace string, deletedNamespaces *sync.Map) bool {
+    if namespace == "" {
+        return true // Cluster-wide operations don't need namespace validation
+    }
+
+    _, err := nsClient.Get(ctx, namespace, metav1.GetOptions{})
+    if err != nil {
+        if apierrors.IsNotFound(err) {
+            c.log.Info(fmt.Sprintf("Namespace '%s' no longer exists, marking for removal", namespace))
+            deletedNamespaces.Store(namespace, true)
+            return false
+        }
+        c.log.V(1).Info(fmt.Sprintf("Failed to check whether namespace '%s' exists: %v", namespace, err))
+    }
+
+    return true
+}
+
 type clusterCacheSync struct {
     // When using this struct:
     // 1) 'lock' mutex should be acquired when reading/writing from fields of this struct.
@@ -533,6 +551,10 @@ func (c *clusterCache) startMissingWatches() error {
         return fmt.Errorf("failed to create clientset: %w", err)
     }
     namespacedResources := make(map[schema.GroupKind]bool)
+
+    // For watch startup, we don't update the namespaces list, so use an empty map
+    deletedNamespaces := &sync.Map{}
+
     for i := range apis {
         api := apis[i]
         namespacedResources[api.GroupKind] = api.Meta.Namespaced
@@ -540,7 +562,7 @@
             ctx, cancel := context.WithCancel(context.Background())
             c.apisMeta[api.GroupKind] = &apiMeta{namespaced: api.Meta.Namespaced, watchCancel: cancel}
 
-            err := c.processApi(client, api, func(resClient dynamic.ResourceInterface, ns string) error {
+            err := c.processApi(client, api, deletedNamespaces, func(resClient dynamic.ResourceInterface, ns string) error {
                 resourceVersion, err := c.loadInitialState(ctx, api, resClient, ns, false) // don't lock here, we are already in a lock before startMissingWatches is called inside watchEvents
                 if err != nil && c.isRestrictedResource(err) {
                     keep := false
@@ -786,7 +808,7 @@ func (c *clusterCache) watchEvents(ctx context.Context, api kube.APIResourceInfo
 // processApi processes all the resources for a given API. First we construct an API client for the given API. Then we
 // call the callback. If we're managing the whole cluster, we call the callback with the client and an empty namespace.
 // If we're managing specific namespaces, we call the callback for each namespace.
-func (c *clusterCache) processApi(client dynamic.Interface, api kube.APIResourceInfo, callback func(resClient dynamic.ResourceInterface, ns string) error) error {
+func (c *clusterCache) processApi(client dynamic.Interface, api kube.APIResourceInfo, deletedNamespaces *sync.Map, callback func(resClient dynamic.ResourceInterface, ns string) error) error {
     resClient := client.Resource(api.GroupVersionResource)
     switch {
     // if manage whole cluster or resource is cluster level and cluster resources enabled
@@ -794,7 +816,18 @@ func (c *clusterCache) processApi(client dynamic.Interface, api kube.APIResource
         return callback(resClient, "")
     // if manage some namespaces and resource is namespaced
     case len(c.namespaces) != 0 && api.Meta.Namespaced:
+        nsClient := client.Resource(schema.GroupVersionResource{
+            Group:    "",
+            Version:  "v1",
+            Resource: "namespaces",
+        })
+
         for _, ns := range c.namespaces {
+            if !c.namespaceExists(context.Background(), nsClient, ns, deletedNamespaces) {
+                // Namespace was deleted, skip it (the deletedNamespaces map tracks it for later cleanup)
+                continue
+            }
+
             err := callback(resClient.Namespace(ns), ns)
             if err != nil {
                 return err
@@ -882,6 +915,7 @@ func (c *clusterCache) sync() error {
     c.resources = make(map[kube.ResourceKey]*Resource)
     c.namespacedResources = make(map[schema.GroupKind]bool)
     config := c.config
+
     version, err := c.kubectl.GetServerVersion(config)
     if err != nil {
         return fmt.Errorf("failed to get server version: %w", err)
     }
@@ -921,6 +955,9 @@
         go c.processEvents()
     }
 
+    // Track deleted namespaces found during parallel processing
+    deletedNamespaces := &sync.Map{}
+
     // Each API is processed in parallel, so we need to take out a lock when we update clusterCache fields.
     lock := sync.Mutex{}
     err = kube.RunAllAsync(len(apis), func(i int) error {
@@ -933,7 +970,7 @@
         c.namespacedResources[api.GroupKind] = api.Meta.Namespaced
         lock.Unlock()
 
-        return c.processApi(client, api, func(resClient dynamic.ResourceInterface, ns string) error {
+        return c.processApi(client, api, deletedNamespaces, func(resClient dynamic.ResourceInterface, ns string) error {
             resourceVersion, err := c.listResources(ctx, resClient, func(listPager *pager.ListPager) error {
                 return listPager.EachListItem(context.Background(), metav1.ListOptions{}, func(obj runtime.Object) error {
                     if un, ok := obj.(*unstructured.Unstructured); !ok {
@@ -979,6 +1016,17 @@ func (c *clusterCache) sync() error {
         return fmt.Errorf("failed to sync cluster %s: %w", c.config.Host, err)
     }
 
+    // After parallel processing completes, update the namespace list by removing deleted ones
+    var validNamespaces []string
+    for _, ns := range c.namespaces {
+        if _, deleted := deletedNamespaces.Load(ns); deleted {
+            c.log.Info(fmt.Sprintf("Namespace '%s' no longer exists, removing from cluster cache", ns))
+            continue
+        }
+        validNamespaces = append(validNamespaces, ns)
+    }
+    c.namespaces = validNamespaces
+
     c.log.Info("Cluster successfully synced")
     return nil
 }
diff --git a/pkg/cache/cluster_test.go b/pkg/cache/cluster_test.go
index 7bac78043..28dbf8206 100644
--- a/pkg/cache/cluster_test.go
+++ b/pkg/cache/cluster_test.go
@@ -19,6 +19,7 @@ import (
     appsv1 "k8s.io/api/apps/v1"
     corev1 "k8s.io/api/core/v1"
     extensionsv1beta1 "k8s.io/api/extensions/v1beta1"
+    apierrors "k8s.io/apimachinery/pkg/api/errors"
     "k8s.io/apimachinery/pkg/api/meta"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
@@ -245,6 +246,16 @@ func TestStatefulSetOwnershipInferred(t *testing.T) {
         },
     }
 
+    defaultNs := &corev1.Namespace{
+        TypeMeta: metav1.TypeMeta{
+            APIVersion: "v1",
+            Kind: "Namespace",
+        },
+        ObjectMeta: metav1.ObjectMeta{
+            Name: "default",
+        },
+    }
+
     tests := []struct {
         name    string
         cluster *clusterCache
@@ -254,7 +265,7 @@ func TestStatefulSetOwnershipInferred(t *testing.T) {
     }{
         {
             name: "STSTemplateNameNotMatching",
-            cluster: newCluster(t, sts),
+            cluster: newCluster(t, sts, defaultNs),
             pvc: &corev1.PersistentVolumeClaim{
                 TypeMeta: metav1.TypeMeta{Kind: kube.PersistentVolumeClaimKind},
                 ObjectMeta: metav1.ObjectMeta{Name: "www1-web-0", Namespace: "default"},
@@ -263,7 +274,7 @@ func TestStatefulSetOwnershipInferred(t *testing.T) {
         },
         {
             name: "MatchingSTSExists",
-            cluster: newCluster(t, sts),
+            cluster: newCluster(t, sts, defaultNs),
             pvc: &corev1.PersistentVolumeClaim{
                 TypeMeta: metav1.TypeMeta{Kind: kube.PersistentVolumeClaimKind},
                 ObjectMeta: metav1.ObjectMeta{Name: "www-web-0", Namespace: "default"},
@@ -272,7 +283,7 @@ func TestStatefulSetOwnershipInferred(t *testing.T) {
         },
         {
             name: "STSTemplateNameNotMatchingWithBatchProcessing",
-            cluster: newClusterWithOptions(t, opts, sts),
+            cluster: newClusterWithOptions(t, opts, sts, defaultNs),
             pvc: &corev1.PersistentVolumeClaim{
                 TypeMeta: metav1.TypeMeta{Kind: kube.PersistentVolumeClaimKind},
                 ObjectMeta: metav1.ObjectMeta{Name: "www1-web-0", Namespace: "default"},
@@ -281,7 +292,7 @@ func TestStatefulSetOwnershipInferred(t *testing.T) {
         },
         {
             name: "MatchingSTSExistsWithBatchProcessing",
-            cluster: newClusterWithOptions(t, opts, sts),
+            cluster: newClusterWithOptions(t, opts, sts, defaultNs),
             pvc: &corev1.PersistentVolumeClaim{
                 TypeMeta: metav1.TypeMeta{Kind: kube.PersistentVolumeClaimKind},
                 ObjectMeta: metav1.ObjectMeta{Name: "www-web-0", Namespace: "default"},
@@ -334,7 +345,26 @@ func TestEnsureSyncedSingleNamespace(t *testing.T) {
         },
     }
 
-    cluster := newCluster(t, obj1, obj2)
+    ns1 := &corev1.Namespace{
+        TypeMeta: metav1.TypeMeta{
+            APIVersion: "v1",
+            Kind: "Namespace",
+        },
+        ObjectMeta: metav1.ObjectMeta{
+            Name: "default1",
+        },
+    }
+    ns2 := &corev1.Namespace{
+        TypeMeta: metav1.TypeMeta{
+            APIVersion: "v1",
+            Kind: "Namespace",
+        },
+        ObjectMeta: metav1.ObjectMeta{
+            Name: "default2",
+        },
+    }
+
+    cluster := newCluster(t, obj1, obj2, ns1, ns2)
     cluster.namespaces = []string{"default1"}
     err := cluster.EnsureSynced()
     require.NoError(t, err)
@@ -420,7 +450,26 @@ metadata:
 }
 
 func TestGetManagedLiveObjsNamespacedModeClusterLevelResource(t *testing.T) {
-    cluster := newCluster(t, testPod1(), testRS(), testDeploy())
+    defaultNs := &corev1.Namespace{
+        TypeMeta: metav1.TypeMeta{
+            APIVersion: "v1",
+            Kind: "Namespace",
+        },
+        ObjectMeta: metav1.ObjectMeta{
+            Name: "default",
+        },
+    }
+    productionNs := &corev1.Namespace{
+        TypeMeta: metav1.TypeMeta{
+            APIVersion: "v1",
+            Kind: "Namespace",
+        },
+        ObjectMeta: metav1.ObjectMeta{
+            Name: "production",
+        },
+    }
+
+    cluster := newCluster(t, testPod1(), testRS(), testDeploy(), defaultNs, productionNs)
     cluster.Invalidate(SetPopulateResourceInfoHandler(func(_ *unstructured.Unstructured, _ bool) (info any, cacheManifest bool) {
         return nil, true
     }))
@@ -445,7 +494,26 @@ metadata:
 }
 
 func TestGetManagedLiveObjsNamespacedModeClusterLevelResource_ClusterResourceEnabled(t *testing.T) {
-    cluster := newCluster(t, testPod1(), testRS(), testDeploy())
+    defaultNs := &corev1.Namespace{
+        TypeMeta: metav1.TypeMeta{
+            APIVersion: "v1",
+            Kind: "Namespace",
+        },
+        ObjectMeta: metav1.ObjectMeta{
+            Name: "default",
+        },
+    }
+    productionNs := &corev1.Namespace{
+        TypeMeta: metav1.TypeMeta{
+            APIVersion: "v1",
+            Kind: "Namespace",
+        },
+        ObjectMeta: metav1.ObjectMeta{
+            Name: "production",
+        },
+    }
+
+    cluster := newCluster(t, testPod1(), testRS(), testDeploy(), defaultNs, productionNs)
     cluster.Invalidate(SetPopulateResourceInfoHandler(func(_ *unstructured.Unstructured, _ bool) (info any, cacheManifest bool) {
         return nil, true
     }))
@@ -514,7 +582,26 @@ metadata:
 }
 
 func TestGetManagedLiveObjsValidNamespace(t *testing.T) {
-    cluster := newCluster(t, testPod1(), testRS(), testDeploy())
+    defaultNs := &corev1.Namespace{
+        TypeMeta: metav1.TypeMeta{
+            APIVersion: "v1",
+            Kind: "Namespace",
+        },
+        ObjectMeta: metav1.ObjectMeta{
+            Name: "default",
+        },
+    }
+    productionNs := &corev1.Namespace{
+        TypeMeta: metav1.TypeMeta{
+            APIVersion: "v1",
+            Kind: "Namespace",
+        },
+        ObjectMeta: metav1.ObjectMeta{
+            Name: "production",
+        },
+    }
+
+    cluster := newCluster(t, testPod1(), testRS(), testDeploy(), defaultNs, productionNs)
     cluster.Invalidate(SetPopulateResourceInfoHandler(func(_ *unstructured.Unstructured, _ bool) (info any, cacheManifest bool) {
         return nil, true
     }))
@@ -542,7 +629,26 @@ metadata:
 }
 
 func TestGetManagedLiveObjsInvalidNamespace(t *testing.T) {
-    cluster := newCluster(t, testPod1(), testRS(), testDeploy())
+    defaultNs := &corev1.Namespace{
+        TypeMeta: metav1.TypeMeta{
+            APIVersion: "v1",
+            Kind: "Namespace",
+        },
+        ObjectMeta: metav1.ObjectMeta{
+            Name: "default",
+        },
+    }
+    developNs := &corev1.Namespace{
+        TypeMeta: metav1.TypeMeta{
+            APIVersion: "v1",
+            Kind: "Namespace",
+        },
+        ObjectMeta: metav1.ObjectMeta{
+            Name: "develop",
+        },
+    }
+
+    cluster := newCluster(t, testPod1(), testRS(), testDeploy(), defaultNs, developNs)
     cluster.Invalidate(SetPopulateResourceInfoHandler(func(_ *unstructured.Unstructured, _ bool) (info any, cacheManifest bool) {
         return nil, true
     }))
@@ -595,7 +701,18 @@ func TestGetManagedLiveObjsFailedConversion(t *testing.T) {
         t.Run(testCaseCopy.name, func(t *testing.T) {
             err := apiextensions.AddToScheme(scheme.Scheme)
             require.NoError(t, err)
-            cluster := newCluster(t, testCRD(), testCronTab()).
+
+            defaultNs := &corev1.Namespace{
+                TypeMeta: metav1.TypeMeta{
+                    APIVersion: "v1",
+                    Kind: "Namespace",
+                },
+                ObjectMeta: metav1.ObjectMeta{
+                    Name: "default",
+                },
+            }
+
+            cluster := newCluster(t, testCRD(), testCronTab(), defaultNs).
                 WithAPIResources([]kube.APIResourceInfo{
                     {
                         GroupKind: schema.GroupKind{Group: cronTabGroup, Kind: "CronTab"},
@@ -1292,3 +1409,40 @@ func BenchmarkIterateHierarchyV2(b *testing.B) {
         })
     }
 }
+
+func TestSyncWithDeletedNamespace(t *testing.T) {
+    deletedNamespace := "deleted-namespace"
+    validNamespace := "default"
+    pod := testPod1()
+    pod.SetNamespace(validNamespace)
+    validNs := &corev1.Namespace{
+        TypeMeta: metav1.TypeMeta{
+            APIVersion: "v1",
+            Kind: "Namespace",
+        },
+        ObjectMeta: metav1.ObjectMeta{
+            Name: validNamespace,
+        },
+    }
+    cluster := newCluster(t, pod, validNs)
+    cluster.namespaces = []string{validNamespace, deletedNamespace}
+    client := cluster.kubectl.(*kubetest.MockKubectlCmd).DynamicClient.(*fake.FakeDynamicClient)
+
+    // Return "not found" error when getting the deleted namespace during validation
+    client.PrependReactor("get", "namespaces", func(action testcore.Action) (handled bool, ret runtime.Object, err error) {
+        getAction := action.(testcore.GetAction)
+        if getAction.GetName() == deletedNamespace {
+            // Simulate namespace not found (deleted)
+            return true, nil, apierrors.NewNotFound(
+                schema.GroupResource{Group: "", Resource: "namespaces"},
+                deletedNamespace)
+        }
+        return false, nil, nil
+    })
+
+    err := cluster.sync()
+
+    require.NoError(t, err)
+    assert.NotContains(t, cluster.namespaces, deletedNamespace)
+    assert.Contains(t, cluster.namespaces, validNamespace)
+}