Skip to content

Commit

Permalink
Fix race conditions in kube database (#774)
Browse files Browse the repository at this point in the history
  • Loading branch information
mariomac authored Apr 23, 2024
1 parent e36cd60 commit 72af6c9
Showing 1 changed file with 31 additions and 2 deletions.
33 changes: 31 additions & 2 deletions pkg/internal/transform/kube/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package kube
import (
"fmt"
"log/slog"
"sync"

"k8s.io/client-go/tools/cache"

Expand All @@ -21,15 +22,20 @@ func dblog() *slog.Logger {
type Database struct {
informer *kube.Metadata

cntMut sync.Mutex
containerIDs map[string]*container.Info

// a single namespace will point to any container inside the pod
// but we don't care which one
nsMut sync.RWMutex
namespaces map[uint32]*container.Info

// key: pid namespace
podsCacheMut sync.RWMutex
fetchedPodsCache map[uint32]*kube.PodInfo

// ip to pod name matcher
podsMut sync.RWMutex
podsByIP map[string]*kube.PodInfo
}

Expand Down Expand Up @@ -68,11 +74,18 @@ func StartDatabase(kubeMetadata *kube.Metadata) (*Database, error) {
// OnDeletion implements ContainerEventHandler
func (id *Database) OnDeletion(containerID []string) {
for _, cid := range containerID {
if info, ok := id.containerIDs[cid]; ok {
id.cntMut.Lock()
info, ok := id.containerIDs[cid]
delete(id.containerIDs, cid)
id.cntMut.Unlock()
if ok {
id.podsCacheMut.Lock()
delete(id.fetchedPodsCache, info.PIDNamespace)
id.podsCacheMut.Unlock()
id.nsMut.Lock()
delete(id.namespaces, info.PIDNamespace)
id.nsMut.Unlock()
}
delete(id.containerIDs, cid)
}
}

Expand All @@ -83,23 +96,33 @@ func (id *Database) AddProcess(pid uint32) {
dblog().Debug("failing to get container information", "pid", pid, "error", err)
return
}
id.nsMut.Lock()
id.namespaces[ifp.PIDNamespace] = &ifp
id.nsMut.Unlock()
id.cntMut.Lock()
id.containerIDs[ifp.ContainerID] = &ifp
id.cntMut.Unlock()
}

// OwnerPodInfo returns the information of the pod owning the passed namespace
func (id *Database) OwnerPodInfo(pidNamespace uint32) (*kube.PodInfo, bool) {
id.podsCacheMut.RLock()
pod, ok := id.fetchedPodsCache[pidNamespace]
id.podsCacheMut.RUnlock()
if !ok {
id.nsMut.RLock()
info, ok := id.namespaces[pidNamespace]
id.nsMut.RUnlock()
if !ok {
return nil, false
}
pod, ok = id.informer.GetContainerPod(info.ContainerID)
if !ok {
return nil, false
}
id.podsCacheMut.Lock()
id.fetchedPodsCache[pidNamespace] = pod
id.podsCacheMut.Unlock()
}
// we check DeploymentName after caching, as the replicasetInfo might be
// received late by the replicaset informer
Expand All @@ -109,6 +132,8 @@ func (id *Database) OwnerPodInfo(pidNamespace uint32) (*kube.PodInfo, bool) {

func (id *Database) UpdateNewPodsByIPIndex(pod *kube.PodInfo) {
if len(pod.IPs) > 0 {
id.podsMut.Lock()
defer id.podsMut.Unlock()
for _, ip := range pod.IPs {
id.podsByIP[ip] = pod
}
Expand All @@ -117,12 +142,16 @@ func (id *Database) UpdateNewPodsByIPIndex(pod *kube.PodInfo) {

func (id *Database) UpdateDeletedPodsByIPIndex(pod *kube.PodInfo) {
if len(pod.IPs) > 0 {
id.podsMut.Lock()
defer id.podsMut.Unlock()
for _, ip := range pod.IPs {
delete(id.podsByIP, ip)
}
}
}

func (id *Database) PodInfoForIP(ip string) *kube.PodInfo {
id.podsMut.RLock()
defer id.podsMut.RUnlock()
return id.podsByIP[ip]
}

0 comments on commit 72af6c9

Please sign in to comment.