From 3944fe7cca7c73fcaae077f45522ecad707a2338 Mon Sep 17 00:00:00 2001
From: Nadav Strahilevitz
Date: Thu, 29 Jun 2023 15:05:38 +0300
Subject: [PATCH] fix: enrich post control plane (#3285) (#3289)

Locks in the containers package were held for as short a time as
possible. This caused consistency issues once several actors
interacted with the package concurrently. Change the locking scheme
to be "transactional": hold one large lock for the whole operation
instead of combining separate read locks and write locks within the
same logical transaction.

Additional chores in the scope of the commit:
1. Initialize the enrichEnabled field in the control plane.
2. Control Plane's Run started a goroutine internally, which was redundant.
3. Rename the mutex in the cgroups code to cgroupsMutex.
4. Capitalize BPF in RemoveFromBpfMap.
5. Delete the unused processing code for cgroup events.

commit: f638b4ec (main), cherry-pick
---
 pkg/containers/containers.go        | 77 +++++++++++++++--------------
 pkg/containers/datasource.go        |  4 +-
 pkg/ebpf/controlplane/controller.go | 41 ++++++++-------
 pkg/ebpf/events_processor.go        | 48 ------------------
 4 files changed, 63 insertions(+), 107 deletions(-)

diff --git a/pkg/containers/containers.go b/pkg/containers/containers.go
index 34f2848cf32a..929e87a9d5d4 100644
--- a/pkg/containers/containers.go
+++ b/pkg/containers/containers.go
@@ -24,12 +24,12 @@ import (
 
 // Containers contains information about running containers in the host.
 type Containers struct {
-	cgroups    *cgroup.Cgroups
-	cgroupsMap map[uint32]CgroupInfo
-	deleted    []uint64
-	mtx        sync.RWMutex // protecting both cgroups and deleted fields
-	enricher   runtimeInfoService
-	bpfMapName string
+	cgroups      *cgroup.Cgroups
+	cgroupsMap   map[uint32]CgroupInfo
+	deleted      []uint64
+	cgroupsMutex sync.RWMutex // protecting both cgroups and deleted fields
+	enricher     runtimeInfoService
+	bpfMapName   string
 }
 
 // CgroupInfo represents a cgroup dir (might describe a container cgroup dir).
@@ -53,10 +53,10 @@ func New(
 	error,
 ) {
 	containers := &Containers{
-		cgroups:    cgroups,
-		cgroupsMap: make(map[uint32]CgroupInfo),
-		mtx:        sync.RWMutex{},
-		bpfMapName: mapName,
+		cgroups:      cgroups,
+		cgroupsMap:   make(map[uint32]CgroupInfo),
+		cgroupsMutex: sync.RWMutex{},
+		bpfMapName:   mapName,
 	}
 
 	runtimeService := RuntimeInfoService(sockets)
@@ -101,6 +101,8 @@ func (c *Containers) GetCgroupVersion() cgroup.CgroupVersion {
 
 // Populate populates Containers struct by reading mounted proc and cgroups fs.
 func (c *Containers) Populate() error {
+	c.cgroupsMutex.Lock()
+	defer c.cgroupsMutex.Unlock()
 	return c.populate()
 }
 
@@ -147,7 +149,7 @@
 		inodeNumber := stat.Ino
 		statusChange := time.Unix(stat.Ctim.Sec, stat.Ctim.Nsec)
 
-		_, err = c.CgroupUpdate(inodeNumber, path, statusChange)
+		_, err = c.cgroupUpdate(inodeNumber, path, statusChange)
 		return errfmt.WrapError(err)
 	}
 
@@ -155,10 +157,12 @@
 	return filepath.WalkDir(c.cgroups.GetDefaultCgroup().GetMountPoint(), fn)
 }
 
-// CgroupUpdate checks if given path belongs to a known container runtime,
+// cgroupUpdate checks if given path belongs to a known container runtime,
 // saving container information in Containers CgroupInfo map.
 // NOTE: ALL given cgroup dir paths are stored in CgroupInfo map.
-func (c *Containers) CgroupUpdate(cgroupId uint64, path string, ctime time.Time) (CgroupInfo, error) {
+// NOTE: not thread-safe; the lock must be taken by the calling function,
+// scoped to the length of the transaction.
+func (c *Containers) cgroupUpdate(cgroupId uint64, path string, ctime time.Time) (CgroupInfo, error) { // Cgroup paths should be stored and evaluated relative to the mountpoint, // trim it from the path. path = strings.TrimPrefix(path, c.cgroups.GetDefaultCgroup().GetMountPoint()) @@ -175,9 +179,7 @@ func (c *Containers) CgroupUpdate(cgroupId uint64, path string, ctime time.Time) Ctime: ctime, } - c.mtx.Lock() c.cgroupsMap[uint32(cgroupId)] = info - c.mtx.Unlock() return info, nil } @@ -187,11 +189,11 @@ func (c *Containers) CgroupUpdate(cgroupId uint64, path string, ctime time.Time) // it returns the retrieved metadata and a relevant error // this function shouldn't be called twice for the same cgroupId unless attempting a retry func (c *Containers) EnrichCgroupInfo(cgroupId uint64) (cruntime.ContainerMetadata, error) { - var metadata cruntime.ContainerMetadata + c.cgroupsMutex.Lock() + defer c.cgroupsMutex.Unlock() - c.mtx.RLock() + var metadata cruntime.ContainerMetadata info, ok := c.cgroupsMap[uint32(cgroupId)] - c.mtx.RUnlock() // if there is no cgroup anymore for some reason, return early if !ok { @@ -221,14 +223,12 @@ func (c *Containers) EnrichCgroupInfo(cgroupId uint64) (cruntime.ContainerMetada } info.Container = metadata - c.mtx.Lock() // we read the dictionary again to make sure the cgroup still exists // otherwise we risk reintroducing it despite not existing _, ok = c.cgroupsMap[uint32(cgroupId)] if ok { c.cgroupsMap[uint32(cgroupId)] = info } - c.mtx.Unlock() return metadata, nil } @@ -306,8 +306,8 @@ func (c *Containers) CgroupRemove(cgroupId uint64, hierarchyID uint32) { now := time.Now() var deleted []uint64 - c.mtx.Lock() - defer c.mtx.Unlock() + c.cgroupsMutex.Lock() + defer c.cgroupsMutex.Unlock() // process previously deleted cgroupInfo data (deleted cgroup dirs) for _, id := range c.deleted { @@ -338,27 +338,29 @@ func (c *Containers) CgroupMkdir(cgroupId uint64, subPath string, hierarchyID ui } // Find container cgroup dir path to get directory stats + c.cgroupsMutex.Lock() + defer c.cgroupsMutex.Unlock() curTime := time.Now() path, err := cgroup.GetCgroupPath(c.cgroups.GetDefaultCgroup().GetMountPoint(), cgroupId, subPath) if err == nil { var stat syscall.Stat_t if err := syscall.Stat(path, &stat); err == nil { // Add cgroupInfo to Containers struct w/ found path (and its last modification time) - return c.CgroupUpdate(cgroupId, path, time.Unix(stat.Ctim.Sec, stat.Ctim.Nsec)) + return c.cgroupUpdate(cgroupId, path, time.Unix(stat.Ctim.Sec, stat.Ctim.Nsec)) } } // No entry found: container may have already exited. // Add cgroupInfo to Containers struct with existing data. // In this case, ctime is just an estimation (current time). - return c.CgroupUpdate(cgroupId, subPath, curTime) + return c.cgroupUpdate(cgroupId, subPath, curTime) } // FindContainerCgroupID32LSB returns the 32 LSB of the Cgroup ID for a given container ID func (c *Containers) FindContainerCgroupID32LSB(containerID string) []uint32 { var cgroupIDs []uint32 - c.mtx.RLock() - defer c.mtx.RUnlock() + c.cgroupsMutex.RLock() + defer c.cgroupsMutex.RUnlock() for k, v := range c.cgroupsMap { if strings.HasPrefix(v.Container.ContainerId, containerID) { cgroupIDs = append(cgroupIDs, k) @@ -378,11 +380,14 @@ func (c *Containers) GetCgroupInfo(cgroupId uint64) CgroupInfo { // cgroupInfo in the Containers struct. An empty subPath will make // getCgroupPath() to walk all cgroupfs directories until it finds the // directory of given cgroupId. 
+ c.cgroupsMutex.Lock() + defer c.cgroupsMutex.Unlock() + path, err := cgroup.GetCgroupPath(c.cgroups.GetDefaultCgroup().GetMountPoint(), cgroupId, "") if err == nil { var stat syscall.Stat_t if err = syscall.Stat(path, &stat); err == nil { - info, err := c.CgroupUpdate(cgroupId, path, time.Unix(stat.Ctim.Sec, stat.Ctim.Nsec)) + info, err := c.cgroupUpdate(cgroupId, path, time.Unix(stat.Ctim.Sec, stat.Ctim.Nsec)) if err == nil { return info } @@ -390,9 +395,9 @@ func (c *Containers) GetCgroupInfo(cgroupId uint64) CgroupInfo { } } - c.mtx.RLock() + c.cgroupsMutex.RLock() + defer c.cgroupsMutex.RUnlock() cgroupInfo := c.cgroupsMap[uint32(cgroupId)] - c.mtx.RUnlock() return cgroupInfo } @@ -400,8 +405,8 @@ func (c *Containers) GetCgroupInfo(cgroupId uint64) CgroupInfo { // GetContainers provides a list of all existing containers. func (c *Containers) GetContainers() map[uint32]CgroupInfo { conts := map[uint32]CgroupInfo{} - c.mtx.RLock() - defer c.mtx.RUnlock() + c.cgroupsMutex.RLock() + defer c.cgroupsMutex.RUnlock() for id, v := range c.cgroupsMap { if v.ContainerRoot && v.expiresAt.IsZero() { conts[id] = v @@ -412,9 +417,9 @@ func (c *Containers) GetContainers() map[uint32]CgroupInfo { // CgroupExists checks if there is a cgroupInfo data of a given cgroupId. func (c *Containers) CgroupExists(cgroupId uint64) bool { - c.mtx.RLock() + c.cgroupsMutex.RLock() _, ok := c.cgroupsMap[uint32(cgroupId)] - c.mtx.RUnlock() + c.cgroupsMutex.RUnlock() return ok } @@ -430,19 +435,19 @@ func (c *Containers) PopulateBpfMap(bpfModule *libbpfgo.Module) error { return errfmt.WrapError(err) } - c.mtx.RLock() + c.cgroupsMutex.RLock() for cgroupIdLsb, info := range c.cgroupsMap { if info.ContainerRoot { state := containerExisted err = containersMap.Update(unsafe.Pointer(&cgroupIdLsb), unsafe.Pointer(&state)) } } - c.mtx.RUnlock() + c.cgroupsMutex.RUnlock() return errfmt.WrapError(err) } -func (c *Containers) RemoveFromBpfMap(bpfModule *libbpfgo.Module, cgroupId uint64, hierarchyID uint32) error { +func (c *Containers) RemoveFromBPFMap(bpfModule *libbpfgo.Module, cgroupId uint64, hierarchyID uint32) error { // cgroupv1: no need to check other controllers than the default switch c.cgroups.GetDefaultCgroup().(type) { case *cgroup.CgroupV1: diff --git a/pkg/containers/datasource.go b/pkg/containers/datasource.go index cd3ba458fa69..3a834d21afaa 100644 --- a/pkg/containers/datasource.go +++ b/pkg/containers/datasource.go @@ -21,8 +21,8 @@ func (ctx SignaturesDataSource) Get(key interface{}) (map[string]interface{}, er if !ok { return nil, detect.ErrKeyNotSupported } - ctx.containers.mtx.RLock() - defer ctx.containers.mtx.RUnlock() + ctx.containers.cgroupsMutex.RLock() + defer ctx.containers.cgroupsMutex.RUnlock() for _, cgroup := range ctx.containers.cgroupsMap { if cgroup.Container.ContainerId == containerId { containerData := cgroup.Container diff --git a/pkg/ebpf/controlplane/controller.go b/pkg/ebpf/controlplane/controller.go index 4f12e0d84ec6..a141ff777673 100644 --- a/pkg/ebpf/controlplane/controller.go +++ b/pkg/ebpf/controlplane/controller.go @@ -42,6 +42,7 @@ func NewController(bpfModule *libbpfgo.Module, cgroupManager *containers.Contain lostSignalChan: make(chan uint64), bpfModule: bpfModule, cgroupManager: cgroupManager, + enrichEnabled: enrichEnabled, } p.signalBuffer, err = bpfModule.InitPerfBuf("signals", p.signalChan, p.lostSignalChan, 1024) if err != nil { @@ -83,27 +84,25 @@ func (p *Controller) Start() error { func (p *Controller) Run(ctx context.Context) { p.ctx = ctx - go func() { - for { - 
select { - case signalData := <-p.signalChan: - signal := signal{} - err := signal.Unmarshal(signalData) - if err != nil { - logger.Errorw("error unmarshaling signal ebpf buffer", "error", err) - continue - } - err = p.processSignal(signal) - if err != nil { - logger.Errorw("error processing control plane signal", "error", err) - } - case lost := <-p.lostSignalChan: - logger.Warnw(fmt.Sprintf("Lost %d control plane signals", lost)) - case <-p.ctx.Done(): - return + for { + select { + case signalData := <-p.signalChan: + signal := signal{} + err := signal.Unmarshal(signalData) + if err != nil { + logger.Errorw("error unmarshaling signal ebpf buffer", "error", err) + continue + } + err = p.processSignal(signal) + if err != nil { + logger.Errorw("error processing control plane signal", "error", err) } + case lost := <-p.lostSignalChan: + logger.Warnw(fmt.Sprintf("Lost %d control plane signals", lost)) + case <-p.ctx.Done(): + return } - }() + } } func (p *Controller) Stop() error { @@ -144,7 +143,7 @@ func (p *Controller) processCgroupMkdir(args []trace.Argument) error { // removed from the containers bpf map. err := capabilities.GetInstance().EBPF( func() error { - return p.cgroupManager.RemoveFromBpfMap(p.bpfModule, cgroupId, hId) + return p.cgroupManager.RemoveFromBPFMap(p.bpfModule, cgroupId, hId) }, ) if err != nil { @@ -160,7 +159,7 @@ func (p *Controller) processCgroupMkdir(args []trace.Argument) error { if p.enrichEnabled { // If cgroupId belongs to a container, enrich now (in a goroutine) go func() { - _, err = p.cgroupManager.EnrichCgroupInfo(cgroupId) + _, err := p.cgroupManager.EnrichCgroupInfo(cgroupId) if err != nil { logger.Errorw("error triggering container enrich in control plane", "error", err) } diff --git a/pkg/ebpf/events_processor.go b/pkg/ebpf/events_processor.go index 76f421200895..b143d580e0a3 100644 --- a/pkg/ebpf/events_processor.go +++ b/pkg/ebpf/events_processor.go @@ -309,54 +309,6 @@ func (t *Tracee) processSchedProcessFork(event *trace.Event) error { return t.convertArgMonotonicToEpochTime(event, "start_time") } -func (t *Tracee) processCgroupMkdir(event *trace.Event) error { - cgroupId, err := parse.ArgVal[uint64](event.Args, "cgroup_id") - if err != nil { - return errfmt.Errorf("error parsing cgroup_mkdir args: %v", err) - } - path, err := parse.ArgVal[string](event.Args, "cgroup_path") - if err != nil { - return errfmt.Errorf("error parsing cgroup_mkdir args: %v", err) - } - hId, err := parse.ArgVal[uint32](event.Args, "hierarchy_id") - if err != nil { - return errfmt.Errorf("error parsing cgroup_mkdir args: %v", err) - } - info, err := t.containers.CgroupMkdir(cgroupId, path, hId) - if err == nil && info.Container.ContainerId == "" { - // If cgroupId is from a regular cgroup directory, and not the - // container base directory (from known runtimes), it should be - // removed from the containers bpf map. - err := capabilities.GetInstance().EBPF( - func() error { - return t.containers.RemoveFromBpfMap(t.bpfModule, cgroupId, hId) - }, - ) - if err != nil { - // If the cgroupId was not found in bpf map, this could mean that - // it is not a container cgroup and, as a systemd cgroup, could have been - // created and removed very quickly. - // In this case, we don't want to return an error. 
- logger.Debugw("Failed to remove entry from containers bpf map", "error", err) - } - } - return errfmt.WrapError(err) -} - -func (t *Tracee) processCgroupRmdir(event *trace.Event) error { - cgroupId, err := parse.ArgVal[uint64](event.Args, "cgroup_id") - if err != nil { - return errfmt.Errorf("error parsing cgroup_rmdir args: %v", err) - } - - hId, err := parse.ArgVal[uint32](event.Args, "hierarchy_id") - if err != nil { - return errfmt.Errorf("error parsing cgroup_mkdir args: %v", err) - } - t.containers.CgroupRemove(cgroupId, hId) - return nil -} - // In case FinitModule and InitModule occurs, it means that a kernel module // was loaded and tracee needs to check if it hooked the syscall table and // seq_ops
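
The locking change is easier to see in miniature. Below is a minimal,
self-contained Go sketch (hypothetical names; not tracee code) contrasting
the old scheme, which split a short read lock and a short write lock across
one logical transaction, with the new scheme, which holds a single write
lock for the whole read-modify-write:

package main

import (
	"fmt"
	"sync"
)

type info struct{ enriched bool }

type registry struct {
	mu    sync.RWMutex
	items map[uint32]info
}

// enrichRacy mimics the old scheme: a short read lock, an unlocked gap in
// which another goroutine may delete the entry, then a short write lock
// that writes back stale state.
func (r *registry) enrichRacy(id uint32) {
	r.mu.RLock()
	it, ok := r.items[id]
	r.mu.RUnlock()
	if !ok {
		return
	}
	it.enriched = true // gap: the entry may be deleted concurrently here
	r.mu.Lock()
	r.items[id] = it // may resurrect an entry deleted during the gap
	r.mu.Unlock()
}

// enrichTransactional mimics the new scheme: one write lock held for the
// whole read-modify-write transaction, so there is no gap.
func (r *registry) enrichTransactional(id uint32) {
	r.mu.Lock()
	defer r.mu.Unlock()
	it, ok := r.items[id]
	if !ok {
		return
	}
	it.enriched = true
	r.items[id] = it
}

func main() {
	r := &registry{items: map[uint32]info{7: {}}}
	r.enrichTransactional(7)
	fmt.Println(r.items[7].enriched) // true
}

In the actual patch, EnrichCgroupInfo keeps its existence re-check before
writing back; under the single transactional lock that re-check is
defensive rather than a correctness requirement.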
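
The Run change follows the same "caller owns the concurrency" idiom: Run
now blocks on the select loop, and the call site (not shown in this patch)
decides whether to launch it on a goroutine. A minimal sketch, with a
hypothetical Controller of the same shape as the patched one:

package main

import (
	"context"
	"fmt"
	"time"
)

// Controller stands in for the patched control plane controller: Run
// blocks until the context is cancelled instead of spawning its own
// goroutine.
type Controller struct{ signals chan int }

func (c *Controller) Run(ctx context.Context) {
	for {
		select {
		case s := <-c.signals:
			fmt.Println("got signal:", s)
		case <-ctx.Done():
			return
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	c := &Controller{signals: make(chan int)}
	go c.Run(ctx) // the caller chooses to run the loop concurrently

	c.signals <- 42
	time.Sleep(10 * time.Millisecond) // let the print happen before exit
}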