pkg/cache/cluster.go: 51 additions & 3 deletions
@@ -247,6 +247,24 @@ type clusterCache struct {
respectRBAC int
}

func (c *clusterCache) namespaceExists(ctx context.Context, nsClient dynamic.NamespaceableResourceInterface, namespace string, deletedNamespaces *sync.Map) bool {
if namespace == "" {
return true // Cluster-wide operations don't need namespace validation
}

_, err := nsClient.Get(ctx, namespace, metav1.GetOptions{})
if err != nil {
if apierrors.IsNotFound(err) {
c.log.Info(fmt.Sprintf("Namespace '%s' no longer exists, marking for removal", namespace))
deletedNamespaces.Store(namespace, true)
return false
}
// Any other error (e.g. a transient API failure) is logged but treated as "namespace exists",
// so a flaky API server cannot cause namespaces to be dropped from the cache.
c.log.V(1).Info(fmt.Sprintf("Failed to check whether namespace '%s' exists: %v", namespace, err))
}

return true
}

type clusterCacheSync struct {
// When using this struct:
// 1) 'lock' mutex should be acquired when reading/writing from fields of this struct.
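Note the asymmetry in the helper above: only a NotFound result marks a namespace as deleted; any other error falls through to return true. A minimal standalone sketch of that check against client-go's fake dynamic client (package layout and fixture names are ours, not part of this PR):

package main

import (
	"context"
	"fmt"
	"sync"

	corev1 "k8s.io/api/core/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime/schema"
	dynamicfake "k8s.io/client-go/dynamic/fake"
	"k8s.io/client-go/kubernetes/scheme"
)

func main() {
	// A fake cluster that knows only the "default" namespace.
	client := dynamicfake.NewSimpleDynamicClient(scheme.Scheme,
		&corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: "default"}})
	nsClient := client.Resource(schema.GroupVersionResource{Version: "v1", Resource: "namespaces"})
	deleted := &sync.Map{}

	for _, ns := range []string{"default", "gone"} {
		_, err := nsClient.Get(context.Background(), ns, metav1.GetOptions{})
		if apierrors.IsNotFound(err) {
			deleted.Store(ns, true) // only NotFound marks a namespace for removal
		}
		_, marked := deleted.Load(ns)
		fmt.Printf("%s marked for removal: %v\n", ns, marked) // default: false, gone: true
	}
}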
@@ -533,14 +551,18 @@ func (c *clusterCache) startMissingWatches() error {
return fmt.Errorf("failed to create clientset: %w", err)
}
namespacedResources := make(map[schema.GroupKind]bool)

// For watch startup, we don't update the namespaces list, so use an empty map
deletedNamespaces := &sync.Map{}

for i := range apis {
api := apis[i]
namespacedResources[api.GroupKind] = api.Meta.Namespaced
if _, ok := c.apisMeta[api.GroupKind]; !ok {
ctx, cancel := context.WithCancel(context.Background())
c.apisMeta[api.GroupKind] = &apiMeta{namespaced: api.Meta.Namespaced, watchCancel: cancel}

err := c.processApi(client, api, func(resClient dynamic.ResourceInterface, ns string) error {
err := c.processApi(client, api, deletedNamespaces, func(resClient dynamic.ResourceInterface, ns string) error {
resourceVersion, err := c.loadInitialState(ctx, api, resClient, ns, false) // don't lock here, we are already in a lock before startMissingWatches is called inside watchEvents
if err != nil && c.isRestrictedResource(err) {
keep := false
@@ -786,15 +808,26 @@ func (c *clusterCache) watchEvents(ctx context.Context, api kube.APIResourceInfo
// processApi processes all the resources for a given API. First we construct an API client for the given API. Then we
// call the callback. If we're managing the whole cluster, we call the callback with the client and an empty namespace.
// If we're managing specific namespaces, we call the callback for each namespace that still exists; namespaces that
// no longer exist are skipped and recorded in deletedNamespaces.
func (c *clusterCache) processApi(client dynamic.Interface, api kube.APIResourceInfo, callback func(resClient dynamic.ResourceInterface, ns string) error) error {
func (c *clusterCache) processApi(client dynamic.Interface, api kube.APIResourceInfo, deletedNamespaces *sync.Map, callback func(resClient dynamic.ResourceInterface, ns string) error) error {
resClient := client.Resource(api.GroupVersionResource)
switch {
// if manage whole cluster or resource is cluster level and cluster resources enabled
case len(c.namespaces) == 0 || (!api.Meta.Namespaced && c.clusterResources):
return callback(resClient, "")
// if manage some namespaces and resource is namespaced
case len(c.namespaces) != 0 && api.Meta.Namespaced:
nsClient := client.Resource(schema.GroupVersionResource{
Group: "",
Version: "v1",
Resource: "namespaces",
})

for _, ns := range c.namespaces {
if !c.namespaceExists(context.Background(), nsClient, ns, deletedNamespaces) {
// Namespace was deleted, skip it (deletedNamespaces map tracks it for later cleanup)
continue
}

err := callback(resClient.Namespace(ns), ns)
if err != nil {
return err
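For orientation, the dispatch rules that the switch in processApi encodes can be condensed into a standalone sketch (hypothetical names, not project code):

package main

import "fmt"

// dispatch mirrors processApi: no configured namespaces means a single
// cluster-wide call; otherwise namespaced resources get one call per
// namespace that still exists, and everything else is skipped.
func dispatch(namespaces []string, namespaced, clusterResources bool) string {
	switch {
	case len(namespaces) == 0 || (!namespaced && clusterResources):
		return `single callback with ns=""`
	case len(namespaces) != 0 && namespaced:
		return fmt.Sprintf("one callback per existing namespace (%d candidates)", len(namespaces))
	default:
		return "skipped"
	}
}

func main() {
	fmt.Println(dispatch(nil, true, false))                         // single callback with ns=""
	fmt.Println(dispatch([]string{"default", "prod"}, true, false)) // one callback per existing namespace (2 candidates)
	fmt.Println(dispatch([]string{"default"}, false, false))        // skipped
}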
@@ -882,6 +915,7 @@ func (c *clusterCache) sync() error {
c.resources = make(map[kube.ResourceKey]*Resource)
c.namespacedResources = make(map[schema.GroupKind]bool)
config := c.config

version, err := c.kubectl.GetServerVersion(config)
if err != nil {
return fmt.Errorf("failed to get server version: %w", err)
@@ -921,6 +955,9 @@ func (c *clusterCache) sync() error {
go c.processEvents()
}

// Track deleted namespaces found during parallel processing
deletedNamespaces := &sync.Map{}

// Each API is processed in parallel, so we need to take out a lock when we update clusterCache fields.
lock := sync.Mutex{}
err = kube.RunAllAsync(len(apis), func(i int) error {
@@ -933,7 +970,7 @@ func (c *clusterCache) sync() error {
c.namespacedResources[api.GroupKind] = api.Meta.Namespaced
lock.Unlock()

return c.processApi(client, api, func(resClient dynamic.ResourceInterface, ns string) error {
return c.processApi(client, api, deletedNamespaces, func(resClient dynamic.ResourceInterface, ns string) error {
resourceVersion, err := c.listResources(ctx, resClient, func(listPager *pager.ListPager) error {
return listPager.EachListItem(context.Background(), metav1.ListOptions{}, func(obj runtime.Object) error {
if un, ok := obj.(*unstructured.Unstructured); !ok {
@@ -979,6 +1016,17 @@ func (c *clusterCache) sync() error {
return fmt.Errorf("failed to sync cluster %s: %w", c.config.Host, err)
}

// After parallel processing completes, update the namespace list by removing the deleted ones
var validNamespaces []string
for _, ns := range c.namespaces {
if _, deleted := deletedNamespaces.Load(ns); deleted {
c.log.Info(fmt.Sprintf("Namespace '%s' no longer exists, removing from cluster cache", ns))
continue
}
validNamespaces = append(validNamespaces, ns)
}
c.namespaces = validNamespaces

c.log.Info("Cluster successfully synced")
return nil
}
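The overall shape is a collect-then-prune pattern: the parallel workers only write deletions into a sync.Map, which is safe without extra locking, and c.namespaces is rebuilt exactly once after kube.RunAllAsync returns. A minimal standalone sketch of the pattern (names are ours, not project code):

package main

import (
	"fmt"
	"sync"
)

func main() {
	namespaces := []string{"default", "gone", "prod"}
	deleted := &sync.Map{}

	// Phase 1: workers run concurrently and only write to the sync.Map.
	var wg sync.WaitGroup
	for _, ns := range namespaces {
		wg.Add(1)
		go func(ns string) {
			defer wg.Done()
			if ns == "gone" { // stand-in for a NotFound response from the API server
				deleted.Store(ns, true)
			}
		}(ns)
	}
	wg.Wait()

	// Phase 2: a single goroutine rebuilds the slice, so no lock is needed.
	valid := make([]string, 0, len(namespaces))
	for _, ns := range namespaces {
		if _, ok := deleted.Load(ns); ok {
			continue
		}
		valid = append(valid, ns)
	}
	fmt.Println(valid) // [default prod]
}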
pkg/cache/cluster_test.go: 164 additions & 10 deletions
@@ -19,6 +19,7 @@ import (
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
extensionsv1beta1 "k8s.io/api/extensions/v1beta1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
@@ -245,6 +246,16 @@ func TestStatefulSetOwnershipInferred(t *testing.T) {
},
}

defaultNs := &corev1.Namespace{
TypeMeta: metav1.TypeMeta{
APIVersion: "v1",
Kind: "Namespace",
},
ObjectMeta: metav1.ObjectMeta{
Name: "default",
},
}
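Because sync() now validates namespaces against the fake cluster, every test below has to seed the Namespace objects it references, and this nine-line literal repeats across several tests. A small helper (hypothetical, not part of this PR) would cut the repetition:

func newNamespace(name string) *corev1.Namespace {
	return &corev1.Namespace{
		TypeMeta:   metav1.TypeMeta{APIVersion: "v1", Kind: "Namespace"},
		ObjectMeta: metav1.ObjectMeta{Name: name},
	}
}

Call sites would then read newCluster(t, sts, newNamespace("default")).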

tests := []struct {
name string
cluster *clusterCache
@@ -254,7 +265,7 @@
}{
{
name: "STSTemplateNameNotMatching",
cluster: newCluster(t, sts),
cluster: newCluster(t, sts, defaultNs),
pvc: &corev1.PersistentVolumeClaim{
TypeMeta: metav1.TypeMeta{Kind: kube.PersistentVolumeClaimKind},
ObjectMeta: metav1.ObjectMeta{Name: "www1-web-0", Namespace: "default"},
@@ -263,7 +274,7 @@
},
{
name: "MatchingSTSExists",
cluster: newCluster(t, sts),
cluster: newCluster(t, sts, defaultNs),
pvc: &corev1.PersistentVolumeClaim{
TypeMeta: metav1.TypeMeta{Kind: kube.PersistentVolumeClaimKind},
ObjectMeta: metav1.ObjectMeta{Name: "www-web-0", Namespace: "default"},
@@ -272,7 +283,7 @@
},
{
name: "STSTemplateNameNotMatchingWithBatchProcessing",
cluster: newClusterWithOptions(t, opts, sts),
cluster: newClusterWithOptions(t, opts, sts, defaultNs),
pvc: &corev1.PersistentVolumeClaim{
TypeMeta: metav1.TypeMeta{Kind: kube.PersistentVolumeClaimKind},
ObjectMeta: metav1.ObjectMeta{Name: "www1-web-0", Namespace: "default"},
@@ -281,7 +292,7 @@
},
{
name: "MatchingSTSExistsWithBatchProcessing",
cluster: newClusterWithOptions(t, opts, sts),
cluster: newClusterWithOptions(t, opts, sts, defaultNs),
pvc: &corev1.PersistentVolumeClaim{
TypeMeta: metav1.TypeMeta{Kind: kube.PersistentVolumeClaimKind},
ObjectMeta: metav1.ObjectMeta{Name: "www-web-0", Namespace: "default"},
@@ -334,7 +345,26 @@ func TestEnsureSyncedSingleNamespace(t *testing.T) {
},
}

cluster := newCluster(t, obj1, obj2)
ns1 := &corev1.Namespace{
TypeMeta: metav1.TypeMeta{
APIVersion: "v1",
Kind: "Namespace",
},
ObjectMeta: metav1.ObjectMeta{
Name: "default1",
},
}
ns2 := &corev1.Namespace{
TypeMeta: metav1.TypeMeta{
APIVersion: "v1",
Kind: "Namespace",
},
ObjectMeta: metav1.ObjectMeta{
Name: "default2",
},
}

cluster := newCluster(t, obj1, obj2, ns1, ns2)
cluster.namespaces = []string{"default1"}
err := cluster.EnsureSynced()
require.NoError(t, err)
@@ -420,7 +450,26 @@ metadata:
}

func TestGetManagedLiveObjsNamespacedModeClusterLevelResource(t *testing.T) {
cluster := newCluster(t, testPod1(), testRS(), testDeploy())
defaultNs := &corev1.Namespace{
TypeMeta: metav1.TypeMeta{
APIVersion: "v1",
Kind: "Namespace",
},
ObjectMeta: metav1.ObjectMeta{
Name: "default",
},
}
productionNs := &corev1.Namespace{
TypeMeta: metav1.TypeMeta{
APIVersion: "v1",
Kind: "Namespace",
},
ObjectMeta: metav1.ObjectMeta{
Name: "production",
},
}

cluster := newCluster(t, testPod1(), testRS(), testDeploy(), defaultNs, productionNs)
cluster.Invalidate(SetPopulateResourceInfoHandler(func(_ *unstructured.Unstructured, _ bool) (info any, cacheManifest bool) {
return nil, true
}))
@@ -445,7 +494,26 @@ metadata:
}

func TestGetManagedLiveObjsNamespacedModeClusterLevelResource_ClusterResourceEnabled(t *testing.T) {
cluster := newCluster(t, testPod1(), testRS(), testDeploy())
defaultNs := &corev1.Namespace{
TypeMeta: metav1.TypeMeta{
APIVersion: "v1",
Kind: "Namespace",
},
ObjectMeta: metav1.ObjectMeta{
Name: "default",
},
}
productionNs := &corev1.Namespace{
TypeMeta: metav1.TypeMeta{
APIVersion: "v1",
Kind: "Namespace",
},
ObjectMeta: metav1.ObjectMeta{
Name: "production",
},
}

cluster := newCluster(t, testPod1(), testRS(), testDeploy(), defaultNs, productionNs)
cluster.Invalidate(SetPopulateResourceInfoHandler(func(_ *unstructured.Unstructured, _ bool) (info any, cacheManifest bool) {
return nil, true
}))
@@ -514,7 +582,26 @@ metadata:
}

func TestGetManagedLiveObjsValidNamespace(t *testing.T) {
cluster := newCluster(t, testPod1(), testRS(), testDeploy())
defaultNs := &corev1.Namespace{
TypeMeta: metav1.TypeMeta{
APIVersion: "v1",
Kind: "Namespace",
},
ObjectMeta: metav1.ObjectMeta{
Name: "default",
},
}
productionNs := &corev1.Namespace{
TypeMeta: metav1.TypeMeta{
APIVersion: "v1",
Kind: "Namespace",
},
ObjectMeta: metav1.ObjectMeta{
Name: "production",
},
}

cluster := newCluster(t, testPod1(), testRS(), testDeploy(), defaultNs, productionNs)
cluster.Invalidate(SetPopulateResourceInfoHandler(func(_ *unstructured.Unstructured, _ bool) (info any, cacheManifest bool) {
return nil, true
}))
@@ -542,7 +629,26 @@ metadata:
}

func TestGetManagedLiveObjsInvalidNamespace(t *testing.T) {
cluster := newCluster(t, testPod1(), testRS(), testDeploy())
defaultNs := &corev1.Namespace{
TypeMeta: metav1.TypeMeta{
APIVersion: "v1",
Kind: "Namespace",
},
ObjectMeta: metav1.ObjectMeta{
Name: "default",
},
}
developNs := &corev1.Namespace{
TypeMeta: metav1.TypeMeta{
APIVersion: "v1",
Kind: "Namespace",
},
ObjectMeta: metav1.ObjectMeta{
Name: "develop",
},
}

cluster := newCluster(t, testPod1(), testRS(), testDeploy(), defaultNs, developNs)
cluster.Invalidate(SetPopulateResourceInfoHandler(func(_ *unstructured.Unstructured, _ bool) (info any, cacheManifest bool) {
return nil, true
}))
@@ -595,7 +701,18 @@ func TestGetManagedLiveObjsFailedConversion(t *testing.T) {
t.Run(testCaseCopy.name, func(t *testing.T) {
err := apiextensions.AddToScheme(scheme.Scheme)
require.NoError(t, err)
cluster := newCluster(t, testCRD(), testCronTab()).

defaultNs := &corev1.Namespace{
TypeMeta: metav1.TypeMeta{
APIVersion: "v1",
Kind: "Namespace",
},
ObjectMeta: metav1.ObjectMeta{
Name: "default",
},
}

cluster := newCluster(t, testCRD(), testCronTab(), defaultNs).
WithAPIResources([]kube.APIResourceInfo{
{
GroupKind: schema.GroupKind{Group: cronTabGroup, Kind: "CronTab"},
@@ -1292,3 +1409,40 @@ func BenchmarkIterateHierarchyV2(b *testing.B) {
})
}
}

func TestSyncWithDeletedNamespace(t *testing.T) {
deletedNamespace := "deleted-namespace"
validNamespace := "default"
pod := testPod1()
pod.SetNamespace(validNamespace)
validNs := &corev1.Namespace{
TypeMeta: metav1.TypeMeta{
APIVersion: "v1",
Kind: "Namespace",
},
ObjectMeta: metav1.ObjectMeta{
Name: validNamespace,
},
}
cluster := newCluster(t, pod, validNs)
cluster.namespaces = []string{validNamespace, deletedNamespace}
client := cluster.kubectl.(*kubetest.MockKubectlCmd).DynamicClient.(*fake.FakeDynamicClient)

// Return "not found" error when getting the deleted namespace during validation
client.PrependReactor("get", "namespaces", func(action testcore.Action) (handled bool, ret runtime.Object, err error) {
getAction := action.(testcore.GetAction)
if getAction.GetName() == deletedNamespace {
// Simulate namespace not found (deleted)
return true, nil, apierrors.NewNotFound(
schema.GroupResource{Group: "", Resource: "namespaces"},
deletedNamespace)
}
return false, nil, nil
})

err := cluster.sync()

require.NoError(t, err)
assert.NotContains(t, cluster.namespaces, deletedNamespace)
assert.Contains(t, cluster.namespaces, validNamespace)
}
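One adjacent case the new test leaves uncovered: a transient error on the namespace Get must not evict the namespace, because namespaceExists only records NotFound. A hypothetical follow-up test (a sketch reusing this file's fixtures, not part of the PR) could pin that down:

func TestSyncKeepsNamespaceOnTransientError(t *testing.T) {
	pod := testPod1()
	pod.SetNamespace("default")
	defaultNs := &corev1.Namespace{
		TypeMeta:   metav1.TypeMeta{APIVersion: "v1", Kind: "Namespace"},
		ObjectMeta: metav1.ObjectMeta{Name: "default"},
	}
	cluster := newCluster(t, pod, defaultNs)
	cluster.namespaces = []string{"default", "flaky"}
	client := cluster.kubectl.(*kubetest.MockKubectlCmd).DynamicClient.(*fake.FakeDynamicClient)

	client.PrependReactor("get", "namespaces", func(action testcore.Action) (bool, runtime.Object, error) {
		if action.(testcore.GetAction).GetName() == "flaky" {
			// A timeout, not NotFound: namespaceExists must treat this as "still exists".
			return true, nil, apierrors.NewServerTimeout(schema.GroupResource{Resource: "namespaces"}, "get", 1)
		}
		return false, nil, nil
	})

	require.NoError(t, cluster.sync())
	assert.Contains(t, cluster.namespaces, "flaky")
}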