Skip to content

Commit

Permalink
use only needed informer scopes
Browse files Browse the repository at this point in the history
  • Loading branch information
mariomac committed Oct 1, 2024
1 parent 91f8aeb commit a0c4063
Show file tree
Hide file tree
Showing 6 changed files with 83 additions and 40 deletions.
1 change: 1 addition & 0 deletions pkg/components/beyla.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ func buildCommonContextInfo(
config.Attributes.Kubernetes.Enable,
config.Attributes.Kubernetes.DisableInformers,
config.Attributes.Kubernetes.KubeconfigPath,
config.Enabled(beyla.FeatureNetO11y),
config.Attributes.Kubernetes.InformersSyncTimeout,
),
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/internal/discover/watcher_kube_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func TestWatcherKubeEnricher(t *testing.T) {
// Setup a fake K8s API connected to the watcherKubeEnricher
k8sClient := fakek8sclientset.NewSimpleClientset()
informer := kube.Metadata{}
require.NoError(t, informer.InitFromClient(context.TODO(), k8sClient, 30*time.Minute))
require.NoError(t, informer.InitFromClient(context.TODO(), k8sClient, "", 30*time.Minute))
wkeNodeFunc, err := WatcherKubeEnricherProvider(context.TODO(), &informerProvider{informer: &informer}, fakeInternalMetrics{})()
require.NoError(t, err)
inputCh, outputCh := make(chan []Event[processAttrs], 10), make(chan []Event[processAttrs], 10)
Expand Down Expand Up @@ -119,7 +119,7 @@ func TestWatcherKubeEnricherWithMatcher(t *testing.T) {
// Setup a fake K8s API connected to the watcherKubeEnricher
k8sClient := fakek8sclientset.NewSimpleClientset()
informer := kube.Metadata{}
require.NoError(t, informer.InitFromClient(context.TODO(), k8sClient, 30*time.Minute))
require.NoError(t, informer.InitFromClient(context.TODO(), k8sClient, "", 30*time.Minute))
wkeNodeFunc, err := WatcherKubeEnricherProvider(context.TODO(), &informerProvider{informer: &informer}, fakeInternalMetrics{})()
require.NoError(t, err)
pipeConfig := beyla.Config{}
Expand Down
30 changes: 23 additions & 7 deletions pkg/internal/kube/informer.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
Expand Down Expand Up @@ -229,10 +230,10 @@ func rmContainerIDSchema(containerID string) string {
return containerID
}

func (k *Metadata) InitFromClient(ctx context.Context, client kubernetes.Interface, timeout time.Duration) error {
func (k *Metadata) InitFromClient(ctx context.Context, client kubernetes.Interface, restrictNode string, timeout time.Duration) error {
// Initialization variables
k.log = klog()
return k.initInformers(ctx, client, timeout)
return k.initInformers(ctx, client, restrictNode, timeout)
}

func LoadConfig(kubeConfigPath string) (*rest.Config, error) {
Expand Down Expand Up @@ -262,11 +263,27 @@ func LoadConfig(kubeConfigPath string) (*rest.Config, error) {
return config, nil
}

func (k *Metadata) initInformers(ctx context.Context, client kubernetes.Interface, syncTimeout time.Duration) error {
func (k *Metadata) initInformers(ctx context.Context, client kubernetes.Interface, restrictNode string, syncTimeout time.Duration) error {
if syncTimeout <= 0 {
syncTimeout = defaultSyncTimeout
}
informerFactory := informers.NewSharedInformerFactory(client, resyncTime)
var informerFactory informers.SharedInformerFactory
if restrictNode == "" {
k.log.Info("no node selector provided. Listening to global resources")
informerFactory = informers.NewSharedInformerFactory(client, resyncTime)
} else {
fieldSelector := fields.OneTermEqualSelector("spec.nodeName", restrictNode).String()
k.log.Info("using field selector", "selector", fieldSelector)
opts := informers.WithTweakListOptions(func(options *metav1.ListOptions) {
options.FieldSelector = fieldSelector
})
informerFactory = informers.NewSharedInformerFactoryWithOptions(client, resyncTime, opts)
// In the App O11y use case, we restrict to local nodes as we don't need to listen to global resources.
// In App O11y, we don't need neither Node nor Service informers, so we disable them.
k.disabledInformers |= InformerNode
k.disabledInformers |= InformerService
}

if err := k.initPodInformer(informerFactory); err != nil {
return err
}
Expand All @@ -277,8 +294,7 @@ func (k *Metadata) initInformers(ctx context.Context, client kubernetes.Interfac
return err
}

log := klog()
log.Debug("starting kubernetes informers, waiting for syncronization")
k.log.Debug("starting kubernetes informers, waiting for syncronization")
informerFactory.Start(ctx.Done())
finishedCacheSync := make(chan struct{})
go func() {
Expand All @@ -287,7 +303,7 @@ func (k *Metadata) initInformers(ctx context.Context, client kubernetes.Interfac
}()
select {
case <-finishedCacheSync:
log.Debug("kubernetes informers started")
k.log.Debug("kubernetes informers started")
return nil
case <-time.After(syncTimeout):
return fmt.Errorf("kubernetes cache has not been synced after %s timeout", syncTimeout)
Expand Down
48 changes: 47 additions & 1 deletion pkg/internal/kube/informer_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@ package kube
import (
"context"
"fmt"
"os"
"strings"
"sync"
"sync/atomic"
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"

"github.com/grafana/beyla/pkg/internal/helpers/maps"
Expand All @@ -23,15 +25,18 @@ type MetadataProvider struct {

enable atomic.Value
disabledInformers maps.Bits
enableNetworkMeta bool
}

func NewMetadataProvider(
enable kubeflags.EnableFlag,
disabledInformers []string,
kubeConfigPath string,
enableNetworkMetadata bool,
syncTimeout time.Duration,
) *MetadataProvider {
mp := &MetadataProvider{
enableNetworkMeta: enableNetworkMetadata,
kubeConfigPath: kubeConfigPath,
syncTimeout: syncTimeout,
disabledInformers: informerTypes(disabledInformers),
Expand Down Expand Up @@ -89,9 +94,50 @@ func (mp *MetadataProvider) Get(ctx context.Context) (*Metadata, error) {
if err != nil {
return nil, fmt.Errorf("kubernetes client can't be initialized: %w", err)
}

// restricting the node name of the informers for App O11y, as we will only decorate
// instances running on the same node that Beyla
// however, for network o11y, we need to get all the nodes so the node name restriction
// would remain unset
restrictNodeName := ""
if !mp.enableNetworkMeta {
restrictNodeName, err = mp.CurrentNodeName(ctx)
if err != nil {
return nil, fmt.Errorf("can't get current node name: %w", err)
}
}
mp.metadata = &Metadata{disabledInformers: mp.disabledInformers}
if err := mp.metadata.InitFromClient(ctx, kubeClient, mp.syncTimeout); err != nil {
if err := mp.metadata.InitFromClient(ctx, kubeClient, restrictNodeName, mp.syncTimeout); err != nil {
return nil, fmt.Errorf("can't initialize kubernetes metadata: %w", err)
}
return mp.metadata, nil
}

func (mp *MetadataProvider) CurrentNodeName(ctx context.Context) (string, error) {
log := klog().With("func", "NodeName")
kubeClient, err := mp.KubeClient()
if err != nil {
return "", fmt.Errorf("can't get kubernetes client: %w", err)
}
// fist: get the current pod name and namespace
currentPod, err := os.Hostname()
if err != nil {
return "", fmt.Errorf("can't get hostname of current pod: %w", err)
}
var currentNamespace string
if nsBytes, err := os.ReadFile("/var/run/secrets/kubernetes.io/serviceaccount/namespace"); err != nil {
log.Warn("can't read service account namespace. Two Beyla pods with the same"+
" name could result in inaccuracies in the host.id attribute", "error", err)
} else {
currentNamespace = string(nsBytes)
}
// second: get the node for the current Pod
// using List instead of Get because to not require extra serviceaccount permissions
pods, err := kubeClient.CoreV1().Pods(currentNamespace).List(ctx, metav1.ListOptions{
FieldSelector: "metadata.name=" + currentPod,
})
if err != nil || len(pods.Items) == 0 {
return "", fmt.Errorf("can't get pod %s/%s: %w", currentNamespace, currentPod, err)
}
return pods.Items[0].Spec.NodeName, nil
}
38 changes: 9 additions & 29 deletions pkg/internal/pipe/global/host_id.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
"go.opentelemetry.io/contrib/detectors/gcp"
"go.opentelemetry.io/otel/sdk/resource"
semconv "go.opentelemetry.io/otel/semconv/v1.19.0"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

type hostIDFetcher func(context.Context, time.Duration) (string, error)
Expand Down Expand Up @@ -108,42 +108,22 @@ func detectHostID(ctx context.Context, timeout time.Duration, detector resource.
}

func (ci *ContextInfo) kubeNodeFetcher(ctx context.Context, _ time.Duration) (string, error) {
if !ci.K8sInformer.IsKubeEnabled() {
if ci.K8sInformer == nil || !ci.K8sInformer.IsKubeEnabled() {
return "", errors.New("kubernetes is not enabled")
}
log := cilog().With("func", "kubeNodeFetcher")
kubeClient, err := ci.K8sInformer.KubeClient()
nodeName, err := ci.K8sInformer.CurrentNodeName(ctx)
if err != nil {
return "", fmt.Errorf("can't get kubernetes client: %w", err)
return "", fmt.Errorf("can't get node name: %w", err)
}
// fist: get the current pod name and namespace
currentPod, err := os.Hostname()
kubeClient, err := ci.K8sInformer.KubeClient()
if err != nil {
return "", fmt.Errorf("can't get hostname of current pod: %w", err)
}
var currentNamespace string
if nsBytes, err := os.ReadFile("/var/run/secrets/kubernetes.io/serviceaccount/namespace"); err != nil {
log.Warn("can't read service account namespace. Two Beyla pods with the same"+
" name could result in inaccuracies in the host.id attribute", "error", err)
} else {
currentNamespace = string(nsBytes)
}
// second: get the node for the current Pod
// using List instead of Get because to not require extra serviceaccount permissions
pods, err := kubeClient.CoreV1().Pods(currentNamespace).List(ctx, v1.ListOptions{
FieldSelector: "metadata.name=" + currentPod,
})
if err != nil || len(pods.Items) == 0 {
return "", fmt.Errorf("can't get pod %s/%s: %w", currentNamespace, currentPod, err)
return "", fmt.Errorf("can't get kubernetes client: %w", err)
}
pod := pods.Items[0]
// third: get the node MachineID from NodeInfo
// using List instead of Get because to not require extra serviceaccount permissions
nodes, err := kubeClient.CoreV1().Nodes().List(ctx, v1.ListOptions{
FieldSelector: "metadata.name=" + pod.Spec.NodeName,
nodes, err := kubeClient.CoreV1().Nodes().List(ctx, metav1.ListOptions{
FieldSelector: "metadata.name=" + nodeName,
})
if err != nil || len(nodes.Items) == 0 {
return "", fmt.Errorf("can't get node %s: %w", pod.Spec.NodeName, err)
return "", fmt.Errorf("can't get node %s: %w", nodeName, err)
}
return nodes.Items[0].Status.NodeInfo.MachineID, nil
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/internal/pipe/instrumenter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func gctx(groups attributes.AttrGroups) *global.ContextInfo {
return &global.ContextInfo{
Metrics: imetrics.NoopReporter{},
MetricAttributeGroups: groups,
K8sInformer: kube.NewMetadataProvider(kubeflags.EnabledFalse, nil, "", 0),
K8sInformer: kube.NewMetadataProvider(kubeflags.EnabledFalse, nil, "", false, 0),
HostID: "host-id",
}
}
Expand Down

0 comments on commit a0c4063

Please sign in to comment.