diff --git a/pkg/internal/transform/kube/db.go b/pkg/internal/transform/kube/db.go index e98c542e8..db188b399 100644 --- a/pkg/internal/transform/kube/db.go +++ b/pkg/internal/transform/kube/db.go @@ -3,6 +3,7 @@ package kube import ( "fmt" "log/slog" + "sync" "k8s.io/client-go/tools/cache" @@ -21,15 +22,20 @@ func dblog() *slog.Logger { type Database struct { informer *kube.Metadata + cntMut sync.Mutex containerIDs map[string]*container.Info + // a single namespace will point to any container inside the pod // but we don't care which one + nsMut sync.RWMutex namespaces map[uint32]*container.Info // key: pid namespace + podsCacheMut sync.RWMutex fetchedPodsCache map[uint32]*kube.PodInfo // ip to pod name matcher + podsMut sync.RWMutex podsByIP map[string]*kube.PodInfo } @@ -68,11 +74,18 @@ func StartDatabase(kubeMetadata *kube.Metadata) (*Database, error) { // OnDeletion implements ContainerEventHandler func (id *Database) OnDeletion(containerID []string) { for _, cid := range containerID { - if info, ok := id.containerIDs[cid]; ok { + id.cntMut.Lock() + info, ok := id.containerIDs[cid] + delete(id.containerIDs, cid) + id.cntMut.Unlock() + if ok { + id.podsCacheMut.Lock() delete(id.fetchedPodsCache, info.PIDNamespace) + id.podsCacheMut.Unlock() + id.nsMut.Lock() delete(id.namespaces, info.PIDNamespace) + id.nsMut.Unlock() } - delete(id.containerIDs, cid) } } @@ -83,15 +96,23 @@ func (id *Database) AddProcess(pid uint32) { dblog().Debug("failing to get container information", "pid", pid, "error", err) return } + id.nsMut.Lock() id.namespaces[ifp.PIDNamespace] = &ifp + id.nsMut.Unlock() + id.cntMut.Lock() id.containerIDs[ifp.ContainerID] = &ifp + id.cntMut.Unlock() } // OwnerPodInfo returns the information of the pod owning the passed namespace func (id *Database) OwnerPodInfo(pidNamespace uint32) (*kube.PodInfo, bool) { + id.podsCacheMut.RLock() pod, ok := id.fetchedPodsCache[pidNamespace] + id.podsCacheMut.RUnlock() if !ok { + id.nsMut.RLock() info, ok := id.namespaces[pidNamespace] + id.nsMut.RUnlock() if !ok { return nil, false } @@ -99,7 +120,9 @@ func (id *Database) OwnerPodInfo(pidNamespace uint32) (*kube.PodInfo, bool) { if !ok { return nil, false } + id.podsCacheMut.Lock() id.fetchedPodsCache[pidNamespace] = pod + id.podsCacheMut.Unlock() } // we check DeploymentName after caching, as the replicasetInfo might be // received late by the replicaset informer @@ -109,6 +132,8 @@ func (id *Database) OwnerPodInfo(pidNamespace uint32) (*kube.PodInfo, bool) { func (id *Database) UpdateNewPodsByIPIndex(pod *kube.PodInfo) { if len(pod.IPs) > 0 { + id.podsMut.Lock() + defer id.podsMut.Unlock() for _, ip := range pod.IPs { id.podsByIP[ip] = pod } @@ -117,6 +142,8 @@ func (id *Database) UpdateNewPodsByIPIndex(pod *kube.PodInfo) { func (id *Database) UpdateDeletedPodsByIPIndex(pod *kube.PodInfo) { if len(pod.IPs) > 0 { + id.podsMut.Lock() + defer id.podsMut.Unlock() for _, ip := range pod.IPs { delete(id.podsByIP, ip) } @@ -124,5 +151,7 @@ func (id *Database) UpdateDeletedPodsByIPIndex(pod *kube.PodInfo) { } func (id *Database) PodInfoForIP(ip string) *kube.PodInfo { + id.podsMut.RLock() + defer id.podsMut.RUnlock() return id.podsByIP[ip] }