From 013031628a5f44b887d3a967b3cfe180f436e248 Mon Sep 17 00:00:00 2001
From: Mario Macias
Date: Fri, 27 Sep 2024 15:20:29 +0200
Subject: [PATCH 1/2] Try to fix unmounting of BPF FS during integration tests
 (#1205)

* Try to fix unmounting of BPF FS during integration tests

* make Beyla graph exit immediately after context cancellation
---
 pkg/internal/pipe/instrumenter.go | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/pkg/internal/pipe/instrumenter.go b/pkg/internal/pipe/instrumenter.go
index 5a2a75226..9679e70f3 100644
--- a/pkg/internal/pipe/instrumenter.go
+++ b/pkg/internal/pipe/instrumenter.go
@@ -152,7 +152,11 @@ type Instrumenter struct {
 func (i *Instrumenter) Run(ctx context.Context) {
 	go i.internalMetrics.Start(ctx)
 	i.graph.Start()
-	<-i.graph.Done()
+	// run until either the graph is finished or the context is cancelled
+	select {
+	case <-i.graph.Done():
+	case <-ctx.Done():
+	}
 }
 
 // spanPtrPromGetters adapts the invocation of SpanPromGetters to work with a request.Span value

From 5006cbd909eb6167dc22f7e63f58852c58e6559b Mon Sep 17 00:00:00 2001
From: Mario Macias
Date: Mon, 30 Sep 2024 11:28:14 +0200
Subject: [PATCH 2/2] Remove ReplicaSet informer (#1204)

* Remove ReplicaSet informer

* Make vale happy

* Fix network data

* Fix documentation
---
 docs/sources/configure/options.md       |  25 +++-
 pkg/internal/discover/watcher_kube.go   | 102 +--------------
 pkg/internal/helpers/maps/bits_test.go  |   8 ++
 pkg/internal/kube/informer.go           | 116 +-----------------
 pkg/internal/kube/informer_ip.go        |  22 +---
 pkg/internal/kube/informer_type.go      |  11 +-
 pkg/internal/kube/informer_type_test.go |   7 +-
 pkg/internal/kube/owner.go              |  36 +++---
 pkg/internal/kube/owner_test.go         |  34 ++---
 .../netolly/transform/k8s/kubernetes.go |   5 +-
 pkg/internal/transform/kube/db.go       |   3 -
 pkg/transform/k8s.go                    |   8 +-
 12 files changed, 91 insertions(+), 286 deletions(-)
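For reference, the shutdown pattern from the first patch in isolation — a minimal, runnable sketch. Everything here is illustrative: `runUntilDone` and its `done` parameter stand in for `Instrumenter.Run` and `i.graph.Done()`; they are not names from the patch.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// runUntilDone mirrors the Run() change above: block until the pipeline
// reports completion OR the context is cancelled, instead of waiting only
// on the Done() channel.
func runUntilDone(ctx context.Context, done <-chan struct{}) {
	select {
	case <-done:
		fmt.Println("graph finished on its own")
	case <-ctx.Done():
		fmt.Println("context cancelled: exiting without waiting for the graph")
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()
	// a channel that never closes: before the patch, this call would block forever
	runUntilDone(ctx, make(chan struct{}))
}
```

This is what lets the integration tests unmount the BPF filesystem promptly: cancelling the context now unblocks the caller even if the graph is still draining.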
diff --git a/docs/sources/configure/options.md b/docs/sources/configure/options.md
index 373882b6a..482c84742 100644
--- a/docs/sources/configure/options.md
+++ b/docs/sources/configure/options.md
@@ -293,7 +293,7 @@ Each `services` entry is a map where the properties can be grouped according to
 | `name` | -- | string | (see description) |
 
 Defines a name for the matching instrumented service. It will be used to populate the `service.name`
-OTEL property and/or the `service_name` prometheus property in the exported metrics/traces.
+OTEL property and the `service_name` Prometheus property in the exported metrics/traces.
 
 If the property is not set, it will default to any of the following properties, in order of
 precedence:
 
 - If the instrumented process runs inside Kubernetes:
   1. The name of the Deployment that runs the instrumented process, if any.
   2. The name of the ReplicaSet/DaemonSet/StatefulSet that runs the instrumented process, if any.
   3. The name of the Pod that runs the instrumented process.
-- If kubernetes is not enabled:
+- If Kubernetes is not enabled:
   1. The name of the process executable file.
 
 If multiple processes match the service selection criteria described below,
@@ -511,7 +511,7 @@ To disable the automatic HTTP request timeout feature, set this option to zero,
 | `high_request_volume` | `BEYLA_BPF_HIGH_REQUEST_VOLUME` | boolean | (false) |
 
 Configures the HTTP tracer heuristic to send telemetry events as soon as a response is detected.
-Setting this option reduces the acuracy of timings for requests with large responses, however,
+Setting this option reduces the accuracy of timings for requests with large responses, however,
 in high request volume scenarios this option will reduce the number of dropped trace events.
 
 ## Configuration of metrics and traces attributes
@@ -662,6 +662,25 @@ establish communication with the Kubernetes Cluster.
 
 Usually you won't need to change this value.
 
+| YAML                | Environment variable           | Type   | Default |
+|---------------------|--------------------------------|--------|---------|
+| `disable_informers` | `BEYLA_KUBE_DISABLE_INFORMERS` | string | (empty) |
+
+The accepted value is a list that might contain `node` and `service`.
+
+This option allows you to selectively disable some Kubernetes informers, which are continuously
+listening to the Kubernetes API to obtain the metadata that is required for decorating
+network metrics or application metrics and traces.
+
+When Beyla is deployed as a DaemonSet in very large clusters, all the Beyla instances
+creating multiple informers might end up overloading the Kubernetes API.
+
+Disabling some informers would cause reported metadata to be incomplete, but
+reduces the load of the Kubernetes API.
+
+The Pods informer can't be disabled. For that purpose, you should disable the whole
+Kubernetes metadata decoration.
+
 ## Routes decorator
 
 YAML section `routes`.
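A sketch of how the `disable_informers` values are interpreted, modeled on the `informerTypes()` function changed later in this patch (see `pkg/internal/kube/informer_type.go` below). The constant and variable names here are simplified stand-ins, not the repository's identifiers.

```go
package main

import (
	"fmt"
	"strings"
)

// Names map to bit flags; unknown names (such as the removed "replicaset")
// simply map to no bit and are silently ignored.
const (
	informerService uint = 1 << iota
	informerNode
)

var nameToBit = map[string]uint{
	"service": informerService, "services": informerService,
	"node": informerNode, "nodes": informerNode,
}

func disabledInformers(names []string) uint {
	var bits uint
	for _, n := range names {
		bits |= nameToBit[strings.ToLower(n)] // missing key yields 0: ignored
	}
	return bits
}

func main() {
	d := disabledInformers([]string{"Node", "replicaset"})
	fmt.Println(d&informerNode != 0)    // true
	fmt.Println(d&informerService != 0) // false
}
```

Silently ignoring unknown names is what keeps existing configurations that still list `replicaset` from breaking after this change.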
diff --git a/pkg/internal/discover/watcher_kube.go b/pkg/internal/discover/watcher_kube.go
index 4dd0af7a0..425c18e29 100644
--- a/pkg/internal/discover/watcher_kube.go
+++ b/pkg/internal/discover/watcher_kube.go
@@ -24,10 +24,8 @@ var (
 
 // kubeMetadata is implemented by kube.Metadata
 type kubeMetadata interface {
-	FetchPodOwnerInfo(pod *kube.PodInfo)
 	GetContainerPod(containerID string) (*kube.PodInfo, bool)
 	AddPodEventHandler(handler cache.ResourceEventHandler) error
-	AddReplicaSetEventHandler(handler cache.ResourceEventHandler) error
 }
 
 // watcherKubeEnricher keeps an update relational snapshot of the in-host process-pods-deployments,
@@ -47,7 +45,6 @@ type watcherKubeEnricher struct {
 	podsByOwner maps.Map2[nsName, string, *kube.PodInfo]
 
 	podsInfoCh chan Event[*kube.PodInfo]
-	rsInfoCh   chan Event[*kube.ReplicaSetInfo]
 }
 
 type nsName struct {
@@ -112,27 +109,6 @@ func (wk *watcherKubeEnricher) init() error {
 		return fmt.Errorf("can't register watcherKubeEnricher as Pod event handler in the K8s informer: %w", err)
 	}
 
-	// the rsInfoCh channel will receive any update about replicasets being created or deleted
-	wk.rsInfoCh = make(chan Event[*kube.ReplicaSetInfo], 10)
-	if err := wk.informer.AddReplicaSetEventHandler(cache.ResourceEventHandlerFuncs{
-		AddFunc: func(obj interface{}) {
-			rs := obj.(*kube.ReplicaSetInfo)
-			d := time.Since(rs.CreationTimestamp.Time)
-			wk.rsInfoCh <- Event[*kube.ReplicaSetInfo]{Type: EventCreated, Obj: obj.(*kube.ReplicaSetInfo)}
-			wk.m.InformerAddDuration("replicaset", d)
-		},
-		UpdateFunc: func(_, newObj interface{}) {
-			rs := newObj.(*kube.ReplicaSetInfo)
-			d := time.Since(rs.CreationTimestamp.Time)
-			wk.rsInfoCh <- Event[*kube.ReplicaSetInfo]{Type: EventCreated, Obj: newObj.(*kube.ReplicaSetInfo)}
-			wk.m.InformerUpdateDuration("replicaset", d)
-		},
-		DeleteFunc: func(obj interface{}) {
-			wk.rsInfoCh <- Event[*kube.ReplicaSetInfo]{Type: EventDeleted, Obj: obj.(*kube.ReplicaSetInfo)}
-		},
-	}); err != nil {
-		return fmt.Errorf("can't register watcherKubeEnricher as ReplicaSet event handler in the K8s informer: %w", err)
-	}
 
 	return nil
 }
@@ -147,8 +123,6 @@ func (wk *watcherKubeEnricher) enrich(in <-chan []Event[processAttrs], out chan<
 		select {
 		case podEvent := <-wk.podsInfoCh:
 			wk.enrichPodEvent(podEvent, out)
-		case rsEvent := <-wk.rsInfoCh:
-			wk.enrichReplicaSetEvent(rsEvent, out)
 		case processEvents, ok := <-in:
 			if !ok {
 				wk.log.Debug("input channel closed. Stopping")
@@ -175,20 +149,6 @@ func (wk *watcherKubeEnricher) enrichPodEvent(podEvent Event[*kube.PodInfo], out
 	}
 }
 
-func (wk *watcherKubeEnricher) enrichReplicaSetEvent(rsEvent Event[*kube.ReplicaSetInfo], out chan<- []Event[processAttrs]) {
-	switch rsEvent.Type {
-	case EventCreated:
-		wk.log.Debug("ReplicaSet added", "namespace",
-			rsEvent.Obj.Namespace, "name", rsEvent.Obj.Name, "owner", rsEvent.Obj.Owner)
-		out <- wk.onNewReplicaSet(rsEvent.Obj)
-	case EventDeleted:
-		wk.log.Debug("ReplicaSet deleted", "namespace", rsEvent.Obj.Namespace, "name", rsEvent.Obj.Name)
-		wk.onDeletedReplicaSet(rsEvent.Obj)
-		// we don't forward replicaset deletion, as it will be eventually done
-		// when the process is removed
-	}
-}
-
 // enrichProcessEvent creates a copy of the process information in the input slice, but decorated with
 // K8s attributes, if any. It also handles deletion of processes
 func (wk *watcherKubeEnricher) enrichProcessEvent(processEvents []Event[processAttrs], out chan<- []Event[processAttrs]) {
@@ -225,7 +185,7 @@ func (wk *watcherKubeEnricher) onNewProcess(procInfo processAttrs) (processAttrs
 
 	wk.processByContainer[containerInfo.ContainerID] = procInfo
 
-	if pod, ok := wk.getPodInfo(containerInfo.ContainerID); ok {
+	if pod, ok := wk.informer.GetContainerPod(containerInfo.ContainerID); ok {
 		procInfo = withMetadata(procInfo, pod)
 	}
 	return procInfo, true
@@ -234,12 +194,6 @@ func (wk *watcherKubeEnricher) onNewProcess(procInfo processAttrs) (processAttrs
 }
 
 func (wk *watcherKubeEnricher) onNewPod(pod *kube.PodInfo) []Event[processAttrs] {
 	wk.updateNewPodsByOwnerIndex(pod)
 
-	// update PodInfo with its owner's info, if any
-	// for each container in the Pod
-	// - get matching process, if available
-	// - forward enriched processAttrs data
-	wk.informer.FetchPodOwnerInfo(pod)
-
 	var events []Event[processAttrs]
 	for _, containerID := range pod.ContainerIDs {
 		if procInfo, ok := wk.processByContainer[containerID]; ok {
@@ -259,35 +213,6 @@ func (wk *watcherKubeEnricher) onDeletedPod(pod *kube.PodInfo) {
 	}
 }
 
-func (wk *watcherKubeEnricher) onNewReplicaSet(rsInfo *kube.ReplicaSetInfo) []Event[processAttrs] {
-	// for each Pod in the ReplicaSet
-	// for each container in the Pod
-	// - get matching process, if any
-	// - enrich and forward it
-	podInfos := wk.getReplicaSetPods(rsInfo.Namespace, rsInfo.Name)
-	var allProcesses []Event[processAttrs]
-	for _, pod := range podInfos {
-		for _, containerID := range pod.ContainerIDs {
-			if procInfo, ok := wk.processByContainer[containerID]; ok {
-				pod.Owner = &kube.Owner{
-					LabelName: kube.OwnerReplicaSet,
-					Name:      rsInfo.Name,
-					Owner:     rsInfo.Owner,
-				}
-				allProcesses = append(allProcesses, Event[processAttrs]{
-					Type: EventCreated,
-					Obj:  withMetadata(procInfo, pod),
-				})
-			}
-		}
-	}
-	return allProcesses
-}
-
-func (wk *watcherKubeEnricher) onDeletedReplicaSet(rsInfo *kube.ReplicaSetInfo) {
-	wk.podsByOwner.DeleteAll(nsName{namespace: rsInfo.Namespace, name: rsInfo.Name})
-}
-
 func (wk *watcherKubeEnricher) getContainerInfo(pid PID) (container.Info, error) {
 	if cntInfo, ok := wk.containerByPID[pid]; ok {
 		return cntInfo, nil
@@ -300,25 +225,6 @@ func (wk *watcherKubeEnricher) getContainerInfo(pid PID) (container.Info, error)
 	return cntInfo, nil
 }
 
-func (wk *watcherKubeEnricher) getPodInfo(containerID string) (*kube.PodInfo, bool) {
-	if pod, ok := wk.informer.GetContainerPod(containerID); ok {
-		wk.informer.FetchPodOwnerInfo(pod)
-		return pod, true
-	}
-	return nil, false
-}
-
-func (wk *watcherKubeEnricher) getReplicaSetPods(namespace, name string) []*kube.PodInfo {
-	var podInfos []*kube.PodInfo
-	if pods, ok := wk.podsByOwner[nsName{namespace: namespace, name: name}]; ok {
-		podInfos = make([]*kube.PodInfo, 0, len(pods))
-		for _, pod := range pods {
-			podInfos = append(podInfos, pod)
-		}
-	}
-	return podInfos
-}
-
 func (wk *watcherKubeEnricher) updateNewPodsByOwnerIndex(pod *kube.PodInfo) {
 	if pod.Owner != nil {
 		wk.podsByOwner.Put(nsName{namespace: pod.Namespace, name: pod.Owner.Name}, pod.Name, pod)
@@ -342,9 +248,9 @@ func withMetadata(pp processAttrs, info *kube.PodInfo) processAttrs {
 
 	if info.Owner != nil {
 		ret.metadata[attr.Name(info.Owner.LabelName).Prom()] = info.Owner.Name
-		topName, topLabel := info.Owner.TopOwnerNameLabel()
-		ret.metadata[attr.Name(topLabel).Prom()] = topName
-		ret.metadata[services.AttrOwnerName] = topName
+		topOwner := info.Owner.TopOwner()
+		ret.metadata[attr.Name(topOwner.LabelName).Prom()] = topOwner.Name
+		ret.metadata[services.AttrOwnerName] = topOwner.Name
 	}
 	return ret
 }
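The `podsByOwner` field above is a two-level index (`maps.Map2`), and the diff shows only its `Put` and `DeleteAll` call sites. A toy re-implementation of those two operations, under the assumption that the real helper in `pkg/internal/helpers/maps` behaves like a nested map:

```go
package main

import "fmt"

// Map2 mimics the owner(namespace+name) -> pod name -> pod index that
// watcherKubeEnricher keeps in podsByOwner. Only the semantics the diff
// relies on are sketched here.
type Map2[K1, K2 comparable, V any] map[K1]map[K2]V

func (m Map2[K1, K2, V]) Put(k1 K1, k2 K2, v V) {
	inner, ok := m[k1]
	if !ok {
		inner = map[K2]V{}
		m[k1] = inner
	}
	inner[k2] = v
}

func (m Map2[K1, K2, V]) DeleteAll(k1 K1) { delete(m, k1) }

type nsName struct{ namespace, name string }

func main() {
	pods := Map2[nsName, string, string]{}
	owner := nsName{namespace: "default", name: "dep-34fb1fa3a"}
	pods.Put(owner, "dep-34fb1fa3a-abcde", "pod-1")
	pods.Put(owner, "dep-34fb1fa3a-fghij", "pod-2")
	fmt.Println(len(pods[owner])) // 2
	pods.DeleteAll(owner)         // what the removed onDeletedReplicaSet did
	fmt.Println(len(pods[owner])) // 0
}
```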
diff --git a/pkg/internal/helpers/maps/bits_test.go b/pkg/internal/helpers/maps/bits_test.go
index 05eab6830..584c075f2 100644
--- a/pkg/internal/helpers/maps/bits_test.go
+++ b/pkg/internal/helpers/maps/bits_test.go
@@ -26,6 +26,14 @@ func TestBits_Empty(t *testing.T) {
 	assert.False(t, bits.Has(0b1000))
 }
 
+func TestBits_IgnoreUnknownEnums(t *testing.T) {
+	bits := MappedBits([]key{1, 2, 3, 40}, mapper)
+	assert.True(t, bits.Has(0b0001))
+	assert.True(t, bits.Has(0b0010))
+	assert.True(t, bits.Has(0b0100))
+	assert.False(t, bits.Has(0b1000))
+}
+
 func TestBits_Transform(t *testing.T) {
 	bits := MappedBits([]key{10, 30, 8910}, mapper,
 		WithTransform(func(k key) key { return k / 10 }))
diff --git a/pkg/internal/kube/informer.go b/pkg/internal/kube/informer.go
index bc60da3a5..ec3b3651f 100644
--- a/pkg/internal/kube/informer.go
+++ b/pkg/internal/kube/informer.go
@@ -9,7 +9,6 @@ import (
 	"strings"
 	"time"
 
-	appsv1 "k8s.io/api/apps/v1"
 	v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/client-go/informers"
@@ -47,10 +46,9 @@ type ContainerEventHandler interface {
 type Metadata struct {
 	log *slog.Logger
 	// pods and replicaSets cache the different K8s types to custom, smaller object types
-	pods        cache.SharedIndexInformer
-	replicaSets cache.SharedIndexInformer
-	nodesIP     cache.SharedIndexInformer
-	servicesIP  cache.SharedIndexInformer
+	pods       cache.SharedIndexInformer
+	nodesIP    cache.SharedIndexInformer
+	servicesIP cache.SharedIndexInformer
 
 	containerEventHandlers []ContainerEventHandler
@@ -89,10 +87,6 @@ type NodeInfo struct {
 	IPInfo IPInfo
 }
 
-func qName(namespace, name string) string {
-	return namespace + "/" + name
-}
-
 var podIndexers = cache.Indexers{
 	IndexPodByContainerIDs: func(obj interface{}) ([]string, error) {
 		pi := obj.(*PodInfo)
@@ -118,18 +112,6 @@ var nodeIndexers = cache.Indexers{
 	},
 }
 
-// usually all the data required by the discovery and enrichement is inside
-// te v1.Pod object. However, when the Pod object has a ReplicaSet as owner,
-// if the ReplicaSet is owned by a Deployment, the reported Pod Owner should
-// be the Deployment, as the Replicaset is just an intermediate entity
-// used by the Deployment that it's actually defined by the user
-var replicaSetIndexers = cache.Indexers{
-	IndexReplicaSetNames: func(obj interface{}) ([]string, error) {
-		rs := obj.(*ReplicaSetInfo)
-		return []string{qName(rs.Namespace, rs.Name)}, nil
-	},
-}
-
 // GetContainerPod fetches metadata from a Pod given the name of one of its containers
 func (k *Metadata) GetContainerPod(containerID string) (*PodInfo, bool) {
 	objs, err := k.pods.GetIndexer().ByIndex(IndexPodByContainerIDs, containerID)
@@ -247,65 +229,6 @@ func rmContainerIDSchema(containerID string) string {
 	return containerID
 }
 
-// GetReplicaSetInfo fetches metadata from a ReplicaSet given its name
-func (k *Metadata) GetReplicaSetInfo(namespace, name string) (*ReplicaSetInfo, bool) {
-	if k.disabledInformers.Has(InformerReplicaSet) {
-		return nil, false
-	}
-	objs, err := k.replicaSets.GetIndexer().ByIndex(IndexReplicaSetNames, qName(namespace, name))
-	if err != nil {
-		klog().Debug("error accessing ReplicaSet index by name. Ignoring",
-			"error", err, "name", name)
-		return nil, false
-	}
-	if len(objs) == 0 {
-		return nil, false
-	}
-	return objs[0].(*ReplicaSetInfo), true
-}
-
-func (k *Metadata) initReplicaSetInformer(informerFactory informers.SharedInformerFactory) error {
-	if k.disabledInformers.Has(InformerReplicaSet) {
-		return nil
-	}
-	log := klog().With("informer", "ReplicaSet")
-	rss := informerFactory.Apps().V1().ReplicaSets().Informer()
-	// Transform any *appsv1.Replicaset instance into a *ReplicaSetInfo instance to save space
-	// in the informer's cache
-	if err := rss.SetTransform(func(i interface{}) (interface{}, error) {
-		rs, ok := i.(*appsv1.ReplicaSet)
-		if !ok {
-			// it's Ok. The K8s library just informed from an entity
-			// that has been previously transformed/stored
-			if pi, ok := i.(*ReplicaSetInfo); ok {
-				return pi, nil
-			}
-			return nil, fmt.Errorf("was expecting a ReplicaSet. Got: %T", i)
-		}
-		owner := OwnerFrom(rs.OwnerReferences)
-		if log.Enabled(context.TODO(), slog.LevelDebug) {
-			log.Debug("inserting ReplicaSet", "name", rs.Name, "namespace", rs.Namespace,
-				"owner", owner)
-		}
-		return &ReplicaSetInfo{
-			ObjectMeta: metav1.ObjectMeta{
-				Name:            rs.Name,
-				Namespace:       rs.Namespace,
-				OwnerReferences: rs.OwnerReferences,
-			},
-			Owner: owner,
-		}, nil
-	}); err != nil {
-		return fmt.Errorf("can't set pods transform: %w", err)
-	}
-	if err := rss.AddIndexers(replicaSetIndexers); err != nil {
-		return fmt.Errorf("can't add %s indexer to ReplicaSets informer: %w", IndexReplicaSetNames, err)
-	}
-
-	k.replicaSets = rss
-	return nil
-}
-
 func (k *Metadata) InitFromClient(ctx context.Context, client kubernetes.Interface, timeout time.Duration) error {
 	// Initialization variables
 	k.log = klog()
@@ -353,9 +276,6 @@ func (k *Metadata) initInformers(ctx context.Context, client kubernetes.Interfac
 	if err := k.initServiceIPInformer(informerFactory); err != nil {
 		return err
 	}
-	if err := k.initReplicaSetInformer(informerFactory); err != nil {
-		return err
-	}
 
 	log := klog()
 	log.Debug("starting kubernetes informers, waiting for syncronization")
@@ -374,18 +294,6 @@ func (k *Metadata) initInformers(ctx context.Context, client kubernetes.Interfac
 	}
 }
 
-// FetchPodOwnerInfo updates the pod owner with the Deployment information, if it exists.
-// Pod Info might include a ReplicaSet as owner, and ReplicaSet info
-// usually has a Deployment as owner reference, which is the one that we'd really like
-// to report as owner.
-func (k *Metadata) FetchPodOwnerInfo(pod *PodInfo) {
-	if pod.Owner != nil && pod.Owner.LabelName == OwnerReplicaSet {
-		if rsi, ok := k.GetReplicaSetInfo(pod.Namespace, pod.Owner.Name); ok {
-			pod.Owner.Owner = rsi.Owner
-		}
-	}
-}
-
 func (k *Metadata) AddContainerEventHandler(eh ContainerEventHandler) {
 	k.containerEventHandlers = append(k.containerEventHandlers, eh)
 }
@@ -401,20 +309,6 @@ func (k *Metadata) AddPodEventHandler(h cache.ResourceEventHandler) error {
 	return err
 }
 
-func (k *Metadata) AddReplicaSetEventHandler(h cache.ResourceEventHandler) error {
-	if k.disabledInformers.Has(InformerReplicaSet) {
-		return nil
-	}
-	_, err := k.replicaSets.AddEventHandler(h)
-	// passing a snapshot of the currently stored entities
-	go func() {
-		for _, pod := range k.replicaSets.GetStore().List() {
-			h.OnAdd(pod, true)
-		}
-	}()
-	return err
-}
-
 func (k *Metadata) AddNodeEventHandler(h cache.ResourceEventHandler) error {
 	if k.disabledInformers.Has(InformerNode) {
 		return nil
@@ -430,8 +324,8 @@ func (k *Metadata) AddNodeEventHandler(h cache.ResourceEventHandler) error {
 }
 
 func (i *PodInfo) ServiceName() string {
-	if on, _ := i.Owner.TopOwnerNameLabel(); on != "" {
-		return on
+	if to := i.Owner.TopOwner(); to != nil {
+		return to.Name
 	}
 	return i.Name
 }
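The removed `initReplicaSetInformer` shows the transform-and-index informer pattern that still applies to the Pods informer. A condensed, self-contained sketch of that surviving pattern, under the assumption of standard client-go APIs; `PodInfo`, `newPodInformer`, and the `"container_ids"` index name are illustrative stand-ins, not the patch's identifiers:

```go
package kubecache

import (
	v1 "k8s.io/api/core/v1"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
)

// PodInfo is a stand-in for the slimmed-down type kept in the cache.
type PodInfo struct {
	Name, Namespace string
	ContainerIDs    []string
}

// newPodInformer transforms each *v1.Pod into a smaller struct before it is
// cached, then indexes it for fast lookup by container ID — the same shape as
// the pods informer and IndexPodByContainerIDs in informer.go.
func newPodInformer(client kubernetes.Interface) (cache.SharedIndexInformer, error) {
	pods := informers.NewSharedInformerFactory(client, 0).Core().V1().Pods().Informer()
	if err := pods.SetTransform(func(i interface{}) (interface{}, error) {
		pod, ok := i.(*v1.Pod)
		if !ok { // already transformed on a previous pass
			return i, nil
		}
		info := &PodInfo{Name: pod.Name, Namespace: pod.Namespace}
		for _, cs := range pod.Status.ContainerStatuses {
			info.ContainerIDs = append(info.ContainerIDs, cs.ContainerID)
		}
		return info, nil
	}); err != nil {
		return nil, err
	}
	err := pods.AddIndexers(cache.Indexers{
		"container_ids": func(obj interface{}) ([]string, error) {
			return obj.(*PodInfo).ContainerIDs, nil
		},
	})
	return pods, err
}
```

Caching the slimmed struct instead of the full API object is what keeps memory bounded in large clusters; dropping the whole ReplicaSet informer removes that cache, and its API watch, entirely.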
diff --git a/pkg/internal/kube/informer_ip.go b/pkg/internal/kube/informer_ip.go
index 18556d3b4..f545fab11 100644
--- a/pkg/internal/kube/informer_ip.go
+++ b/pkg/internal/kube/informer_ip.go
@@ -167,27 +167,7 @@ func (k *Metadata) infoForIP(idx cache.Indexer, ip string) (any, bool) {
 
 func (k *Metadata) getOwner(meta *metav1.ObjectMeta, info *IPInfo) Owner {
 	if len(meta.OwnerReferences) > 0 {
-		ownerReference := meta.OwnerReferences[0]
-		if ownerReference.Kind != "ReplicaSet" {
-			return *OwnerFrom(meta.OwnerReferences)
-		}
-
-		if !k.disabledInformers.Has(InformerReplicaSet) {
-			item, ok, err := k.replicaSets.GetIndexer().GetByKey(meta.Namespace + "/" + ownerReference.Name)
-			switch {
-			case err != nil:
-				k.log.Debug("can't get ReplicaSet info from informer. Ignoring",
-					"key", meta.Namespace+"/"+ownerReference.Name, "error", err)
-			case !ok:
-				k.log.Debug("ReplicaSet info still not in the informer. Ignoring",
-					"key", meta.Namespace+"/"+ownerReference.Name)
-			default:
-				rsInfo := item.(*ReplicaSetInfo).ObjectMeta
-				if len(rsInfo.OwnerReferences) > 0 {
-					return *OwnerFrom(rsInfo.OwnerReferences)
-				}
-			}
-		}
+		return *OwnerFrom(meta.OwnerReferences)
 	}
 	// If no owner references found, return itself as owner
 	return Owner{
diff --git a/pkg/internal/kube/informer_type.go b/pkg/internal/kube/informer_type.go
index a95a62740..b54da9d1a 100644
--- a/pkg/internal/kube/informer_type.go
+++ b/pkg/internal/kube/informer_type.go
@@ -8,7 +8,6 @@ import (
 )
 
 const (
 	InformerService = maps.Bits(1 << iota)
-	InformerReplicaSet
 	InformerNode
 )
@@ -16,12 +15,10 @@ func informerTypes(str []string) maps.Bits {
 	return maps.MappedBits(
 		str,
 		map[string]maps.Bits{
-			"service":     InformerService,
-			"services":    InformerService,
-			"replicaset":  InformerReplicaSet,
-			"replicasets": InformerReplicaSet,
-			"node":        InformerNode,
-			"nodes":       InformerNode,
+			"service":  InformerService,
+			"services": InformerService,
+			"node":     InformerNode,
+			"nodes":    InformerNode,
 		},
 		maps.WithTransform(strings.ToLower),
 	)
diff --git a/pkg/internal/kube/informer_type_test.go b/pkg/internal/kube/informer_type_test.go
index 41493fc21..96c06f792 100644
--- a/pkg/internal/kube/informer_type_test.go
+++ b/pkg/internal/kube/informer_type_test.go
@@ -7,18 +7,15 @@ import (
 )
 
 func TestInformerTypeHas(t *testing.T) {
-	it := informerTypes([]string{"ReplicaSet", "Node"})
-	require.False(t, it.Has(InformerService))
-	require.True(t, it.Has(InformerReplicaSet))
+	it := informerTypes([]string{"Service", "Node"})
+	require.True(t, it.Has(InformerService))
 	require.True(t, it.Has(InformerNode))
 
 	it = informerTypes([]string{"Service"})
 	require.True(t, it.Has(InformerService))
-	require.False(t, it.Has(InformerReplicaSet))
 	require.False(t, it.Has(InformerNode))
 
 	it = informerTypes(nil)
 	require.False(t, it.Has(InformerService))
-	require.False(t, it.Has(InformerReplicaSet))
 	require.False(t, it.Has(InformerNode))
 }
diff --git a/pkg/internal/kube/owner.go b/pkg/internal/kube/owner.go
index d8e99b909..13bfbb5ea 100644
--- a/pkg/internal/kube/owner.go
+++ b/pkg/internal/kube/owner.go
@@ -62,30 +62,30 @@ func unrecognizedOwner(or *metav1.OwnerReference) *Owner {
 	}
 }
 
-// TopOwnerNameLabel returns the top-level name and metadata label in the owner chain.
+// TopOwner returns the top Owner in the owner chain.
 // For example, if the owner is a ReplicaSet, it will return the Deployment name.
-func (o *Owner) TopOwnerNameLabel() (string, OwnerLabel) {
-	if o == nil {
-		return "", ""
-	}
-	if o.LabelName == OwnerReplicaSet {
-		// we have two levels of ownership at most
-		if o.Owner != nil {
-			return o.Owner.Name, o.Owner.LabelName
-		}
-		// if the replicaset informer is disabled, we can't get the owner deployment,
-		// so we will heuristically extract it from the ReplicaSet Name (and cache it)
-		topOwnerName := o.Name
-		if idx := strings.LastIndexByte(topOwnerName, '-'); idx > 0 {
-			topOwnerName = topOwnerName[:idx]
+func (o *Owner) TopOwner() *Owner {
+	// we have two levels of ownership at most
+	if o != nil && o.LabelName == OwnerReplicaSet && o.Owner == nil {
+		// we heuristically extract the Deployment name from the replicaset name
+		if idx := strings.LastIndexByte(o.Name, '-'); idx > 0 {
 			o.Owner = &Owner{
-				Name:      topOwnerName,
+				Name:      o.Name[:idx],
 				LabelName: OwnerDeployment,
+				Kind:      "Deployment",
 			}
-			return topOwnerName, OwnerDeployment
+		} else {
+			// just caching the own replicaset as owner, in order to cache the result
+			o.Owner = o
 		}
+		return o.Owner
+	}
+
+	// just return the highest existing owner (two levels of ownership maximum)
+	if o == nil || o.Owner == nil {
+		return o
 	}
-	return o.Name, o.LabelName
+	return o.Owner
 }
 
 func (o *Owner) String() string {
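The name-trimming fallback in `TopOwner()` above, isolated for clarity — a minimal sketch, with `deploymentNameFor` as an illustrative helper name only. Since the ReplicaSet informer is gone, there is no owner chain to query; the heuristic relies on Deployment-managed ReplicaSets being named `<deployment>-<pod-template-hash>`:

```go
package main

import (
	"fmt"
	"strings"
)

// deploymentNameFor trims the last dash-separated segment, recovering the
// Deployment name from a ReplicaSet name heuristically.
func deploymentNameFor(replicaSetName string) string {
	if idx := strings.LastIndexByte(replicaSetName, '-'); idx > 0 {
		return replicaSetName[:idx]
	}
	// no dash: keep the ReplicaSet name itself, as TopOwner() does
	return replicaSetName
}

func main() {
	fmt.Println(deploymentNameFor("dep-34fb1fa3a"))                 // dep
	fmt.Println(deploymentNameFor("rs-without-dep-meta-34fb1fa3a")) // rs-without-dep-meta
	fmt.Println(deploymentNameFor("standalone"))                    // standalone
}
```

Caching the derived owner on `o.Owner` makes repeated calls cheap and consistent, which is exactly what the test below checks.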
diff --git a/pkg/internal/kube/owner_test.go b/pkg/internal/kube/owner_test.go
index db42359ac..9e2cba65c 100644
--- a/pkg/internal/kube/owner_test.go
+++ b/pkg/internal/kube/owner_test.go
@@ -61,27 +61,33 @@ func TestTopOwnerLabel(t *testing.T) {
 	type testCase struct {
 		expectedLabel OwnerLabel
 		expectedName  string
+		expectedKind  string
 		owner         *Owner
 	}
 	for _, tc := range []testCase{
-		{owner: nil},
-		{expectedLabel: OwnerDaemonSet, expectedName: "ds",
-			owner: &Owner{LabelName: OwnerDaemonSet, Name: "ds"}},
-		{expectedLabel: OwnerDeployment, expectedName: "rs-without-dep-meta",
-			owner: &Owner{LabelName: OwnerReplicaSet, Name: "rs-without-dep-meta-34fb1fa3a"}},
-		{expectedLabel: OwnerDeployment, expectedName: "dep",
-			owner: &Owner{LabelName: OwnerReplicaSet, Name: "dep-34fb1fa3a",
-				Owner: &Owner{LabelName: OwnerDeployment, Name: "dep"}}},
+		{expectedLabel: OwnerDaemonSet, expectedName: "ds", expectedKind: "DaemonSet",
+			owner: &Owner{LabelName: OwnerDaemonSet, Name: "ds", Kind: "DaemonSet"}},
+		{expectedLabel: OwnerDeployment, expectedName: "rs-without-dep-meta", expectedKind: "Deployment",
+			owner: &Owner{LabelName: OwnerReplicaSet, Name: "rs-without-dep-meta-34fb1fa3a", Kind: "ReplicaSet"}},
+		{expectedLabel: OwnerDeployment, expectedName: "dep", expectedKind: "Deployment",
+			owner: &Owner{LabelName: OwnerReplicaSet, Name: "dep-34fb1fa3a", Kind: "ReplicaSet",
+				Owner: &Owner{LabelName: OwnerDeployment, Name: "dep", Kind: "Deployment"}}},
 	} {
 		t.Run(tc.expectedName, func(t *testing.T) {
-			name, label := tc.owner.TopOwnerNameLabel()
-			assert.Equal(t, tc.expectedName, name)
-			assert.Equal(t, tc.expectedLabel, label)
+			topOwner := tc.owner.TopOwner()
+			assert.Equal(t, tc.expectedName, topOwner.Name)
+			assert.Equal(t, tc.expectedLabel, topOwner.LabelName)
+			assert.Equal(t, tc.expectedKind, topOwner.Kind)
 			// check that the output is consistent (e.g.
 			// after ReplicaSet owner data is cached)
-			name, label = tc.owner.TopOwnerNameLabel()
-			assert.Equal(t, tc.expectedName, name)
-			assert.Equal(t, tc.expectedLabel, label)
+			topOwner = tc.owner.TopOwner()
+			assert.Equal(t, tc.expectedName, topOwner.Name)
+			assert.Equal(t, tc.expectedLabel, topOwner.LabelName)
+			assert.Equal(t, tc.expectedKind, topOwner.Kind)
 		})
 	}
 }
+
+func TestTopOwner_Nil(t *testing.T) {
+	assert.Nil(t, (*Owner)(nil).TopOwner())
+}
diff --git a/pkg/internal/netolly/transform/k8s/kubernetes.go b/pkg/internal/netolly/transform/k8s/kubernetes.go
index d5567c921..cd7a69a54 100644
--- a/pkg/internal/netolly/transform/k8s/kubernetes.go
+++ b/pkg/internal/netolly/transform/k8s/kubernetes.go
@@ -130,11 +130,12 @@ func (n *decorator) decorate(flow *ebpf.Record, prefix, ip string) bool {
 		}
 		return false
 	}
+	topOwner := ipinfo.Owner.TopOwner()
 	flow.Attrs.Metadata[attr.Name(prefix+attrSuffixNs)] = meta.Namespace
 	flow.Attrs.Metadata[attr.Name(prefix+attrSuffixName)] = meta.Name
 	flow.Attrs.Metadata[attr.Name(prefix+attrSuffixType)] = ipinfo.Kind
-	flow.Attrs.Metadata[attr.Name(prefix+attrSuffixOwnerName)] = ipinfo.Owner.Name
-	flow.Attrs.Metadata[attr.Name(prefix+attrSuffixOwnerType)] = ipinfo.Owner.Kind
+	flow.Attrs.Metadata[attr.Name(prefix+attrSuffixOwnerName)] = topOwner.Name
+	flow.Attrs.Metadata[attr.Name(prefix+attrSuffixOwnerType)] = topOwner.Kind
 	if ipinfo.HostIP != "" {
 		flow.Attrs.Metadata[attr.Name(prefix+attrSuffixHostIP)] = ipinfo.HostIP
 		if ipinfo.HostName != "" {
diff --git a/pkg/internal/transform/kube/db.go b/pkg/internal/transform/kube/db.go
index 1f084e3b2..9cd48d839 100644
--- a/pkg/internal/transform/kube/db.go
+++ b/pkg/internal/transform/kube/db.go
@@ -174,9 +174,6 @@ func (id *Database) OwnerPodInfo(pidNamespace uint32) (*kube.PodInfo, bool) {
 		}
 		id.fetchedPodsCache[pidNamespace] = pod
 	}
-	// we check DeploymentName after caching, as the replicasetInfo might be
-	// received late by the replicaset informer
-	id.informer.FetchPodOwnerInfo(pod)
 	return pod, true
 }
 
diff --git a/pkg/transform/k8s.go b/pkg/transform/k8s.go
index 40601a2f7..92011d205 100644
--- a/pkg/transform/k8s.go
+++ b/pkg/transform/k8s.go
@@ -35,8 +35,8 @@ type KubernetesDecorator struct {
 	// IPs are not matched to any kubernetes entity, assuming they are cluster-external
 	DropExternal bool `yaml:"drop_external" env:"BEYLA_NETWORK_DROP_EXTERNAL"`
 
-	// DisableInformers allow selectively disabling some informers. Accepted value is a list
-	// that mitght contain replicaset, node, service. Disabling any of them
+	// DisableInformers allows selectively disabling some informers. Accepted value is a list
+	// that might contain node or service. Disabling any of them
 	// will cause metadata to be incomplete but will reduce the load of the Kube API.
 	// Pods informer can't be disabled. For that purpose, you should disable the whole
 	// kubernetes metadata decoration.
@@ -130,8 +130,8 @@ func (md *metadataDecorator) appendMetadata(span *request.Span, info *kube.PodIn
 	}
 	if info.Owner != nil {
 		span.ServiceID.Metadata[attr.Name(info.Owner.LabelName)] = info.Owner.Name
-		topName, topLabel := info.Owner.TopOwnerNameLabel()
-		span.ServiceID.Metadata[attr.Name(topLabel)] = topName
+		topOwner := info.Owner.TopOwner()
+		span.ServiceID.Metadata[attr.Name(topOwner.LabelName)] = topOwner.Name
 	}
 	// override hostname by the Pod name
 	span.ServiceID.HostName = info.Name
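A reduced model of what `appendMetadata` and `decorate` now do with the owner chain: every exported record carries both the direct owner attribute and the top-owner attribute derived from it. `Owner` here mimics `kube.Owner`, and the two label strings are illustrative attribute names, not necessarily the exact keys Beyla exports.

```go
package main

import "fmt"

type Owner struct {
	Name, LabelName, Kind string
	Owner                 *Owner
}

// TopOwner follows the simplified post-patch contract: at most two levels.
func (o *Owner) TopOwner() *Owner {
	if o == nil || o.Owner == nil {
		return o
	}
	return o.Owner
}

func main() {
	rs := &Owner{Name: "dep-34fb1fa3a", LabelName: "k8s.replicaset.name", Kind: "ReplicaSet",
		Owner: &Owner{Name: "dep", LabelName: "k8s.deployment.name", Kind: "Deployment"}}
	meta := map[string]string{rs.LabelName: rs.Name}
	top := rs.TopOwner()
	meta[top.LabelName] = top.Name
	fmt.Println(meta) // both the ReplicaSet and the derived Deployment attribute
}
```

Because `TopOwner()` always returns a full `*Owner` (never just a name), the network decorator can now report the top owner's `Kind` as well, which is what the "Fix network data" commit bullet refers to.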