From 97659b59ea8f56538760e9ca812e503fdcce0ed4 Mon Sep 17 00:00:00 2001 From: Howard Cheung Date: Sun, 17 Nov 2024 22:18:01 +0800 Subject: [PATCH] feat (processor/k8sattributes): wait for synced when starting k8sattributes processor. (#32622) **Description:** When starting `k8sattributes` processor, block until an initial synchronization has been completed. This solves #32556 **Link to tracking Issue:** fix #32556 **Testing:** Tested in a cluster with constant high span traffic, no more spans with unassociated k8s metadata after adding this pr. **Documentation:** --------- Co-authored-by: Christos Markou --- .chloggen/k8sattributes-block.yaml | 27 ++++++ processor/k8sattributesprocessor/README.md | 15 +++ .../k8sattributesprocessor/client_test.go | 9 +- processor/k8sattributesprocessor/config.go | 7 ++ .../k8sattributesprocessor/config_test.go | 5 + processor/k8sattributesprocessor/factory.go | 7 ++ .../generated_component_test.go | 38 -------- .../internal/kube/client.go | 97 +++++++++++++------ .../internal/kube/client_test.go | 77 +++++++++++---- .../internal/kube/fake_informer.go | 4 +- .../internal/kube/kube.go | 4 +- .../k8sattributesprocessor/metadata.yaml | 1 + processor/k8sattributesprocessor/options.go | 17 ++++ processor/k8sattributesprocessor/processor.go | 37 ++++--- .../k8sattributesprocessor/processor_test.go | 2 +- 15 files changed, 238 insertions(+), 109 deletions(-) create mode 100644 .chloggen/k8sattributes-block.yaml diff --git a/.chloggen/k8sattributes-block.yaml b/.chloggen/k8sattributes-block.yaml new file mode 100644 index 000000000000..2b05153af746 --- /dev/null +++ b/.chloggen/k8sattributes-block.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: bug_fix + +# The name of the component, or a single word describing the area of concern, (e.g. 
filelogreceiver) +component: processor/k8sattributes + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Block when starting until the metadata have been synced, to fix that some data couldn't be associated with metadata when the agent was just started. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [32556] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/processor/k8sattributesprocessor/README.md b/processor/k8sattributesprocessor/README.md index cdb82bc759aa..12a3e4d32f8a 100644 --- a/processor/k8sattributesprocessor/README.md +++ b/processor/k8sattributesprocessor/README.md @@ -198,6 +198,21 @@ the processor associates the received trace to the pod, based on the connection } ``` +By default, the processor will be ready as soon as it starts, even if no metadata has been fetched yet. +If data is sent to this processor before the metadata is synced, there will be no metadata to enrich the data with. + +To wait for the metadata to be synced before the processor is ready, set the `wait_for_metadata` option to `true`. +Then the processor will not be ready until the metadata is fully synced. As a result, the start-up of the Collector will be blocked. 
If the metadata cannot be synced, the Collector will ultimately fail to start. +If a timeout is reached, the processor will fail to start and return an error, which will cause the collector to exit. +The timeout defaults to 10s and can be configured with the `wait_for_metadata_timeout` option. + +Example for setting the processor to wait for metadata to be synced before it is ready: + +```yaml +wait_for_metadata: true +wait_for_metadata_timeout: 10s +``` + ## Extracting attributes from pod labels and annotations The k8sattributesprocessor can also set resource attributes from k8s labels and annotations of pods, namespaces and nodes. diff --git a/processor/k8sattributesprocessor/client_test.go b/processor/k8sattributesprocessor/client_test.go index 2a893e52790c..c69e0638301a 100644 --- a/processor/k8sattributesprocessor/client_test.go +++ b/processor/k8sattributesprocessor/client_test.go @@ -4,6 +4,8 @@ package k8sattributesprocessor import ( + "time" + "go.opentelemetry.io/collector/component" "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/labels" @@ -35,7 +37,7 @@ func selectors() (labels.Selector, fields.Selector) { } // newFakeClient instantiates a new FakeClient object and satisfies the ClientProvider type -func newFakeClient(_ component.TelemetrySettings, _ k8sconfig.APIConfig, rules kube.ExtractionRules, filters kube.Filters, associations []kube.Association, _ kube.Excludes, _ kube.APIClientsetProvider, _ kube.InformerProvider, _ kube.InformerProviderNamespace, _ kube.InformerProviderReplicaSet) (kube.Client, error) { +func newFakeClient(_ component.TelemetrySettings, _ k8sconfig.APIConfig, rules kube.ExtractionRules, filters kube.Filters, associations []kube.Association, _ kube.Excludes, _ kube.APIClientsetProvider, _ kube.InformerProvider, _ kube.InformerProviderNamespace, _ kube.InformerProviderReplicaSet, _ bool, _ time.Duration) (kube.Client, error) { cs := fake.NewSimpleClientset() ls, fs := selectors() @@ -70,10 +72,11 @@ func (f *fakeClient)
GetNode(nodeName string) (*kube.Node, bool) { } // Start is a noop for FakeClient. -func (f *fakeClient) Start() { +func (f *fakeClient) Start() error { if f.Informer != nil { - f.Informer.Run(f.StopCh) + go f.Informer.Run(f.StopCh) } + return nil } // Stop is a noop for FakeClient. diff --git a/processor/k8sattributesprocessor/config.go b/processor/k8sattributesprocessor/config.go index 27b49cef5d63..e5651a087bf4 100644 --- a/processor/k8sattributesprocessor/config.go +++ b/processor/k8sattributesprocessor/config.go @@ -6,6 +6,7 @@ package k8sattributesprocessor // import "github.com/open-telemetry/opentelemetr import ( "fmt" "regexp" + "time" "go.opentelemetry.io/collector/featuregate" conventions "go.opentelemetry.io/collector/semconv/v1.6.1" @@ -46,6 +47,12 @@ type Config struct { // Exclude section allows to define names of pod that should be // ignored while tagging. Exclude ExcludeConfig `mapstructure:"exclude"` + + // WaitForMetadata is a flag that determines if the processor should wait for k8s metadata to be synced when starting. + WaitForMetadata bool `mapstructure:"wait_for_metadata"` + + // WaitForMetadataTimeout is the maximum time the processor will wait for the k8s metadata to be synced.
+ WaitForMetadataTimeout time.Duration `mapstructure:"wait_for_metadata_timeout"` } func (cfg *Config) Validate() error { diff --git a/processor/k8sattributesprocessor/config_test.go b/processor/k8sattributesprocessor/config_test.go index 78826108016b..9510df99a3b7 100644 --- a/processor/k8sattributesprocessor/config_test.go +++ b/processor/k8sattributesprocessor/config_test.go @@ -6,6 +6,7 @@ package k8sattributesprocessor import ( "path/filepath" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -34,6 +35,7 @@ func TestLoadConfig(t *testing.T) { Extract: ExtractConfig{ Metadata: enabledAttributes(), }, + WaitForMetadataTimeout: 10 * time.Second, }, }, { @@ -105,6 +107,7 @@ func TestLoadConfig(t *testing.T) { {Name: "jaeger-collector"}, }, }, + WaitForMetadataTimeout: 10 * time.Second, }, }, { @@ -127,6 +130,7 @@ func TestLoadConfig(t *testing.T) { {Name: "jaeger-collector"}, }, }, + WaitForMetadataTimeout: 10 * time.Second, }, }, { @@ -149,6 +153,7 @@ func TestLoadConfig(t *testing.T) { {Name: "jaeger-collector"}, }, }, + WaitForMetadataTimeout: 10 * time.Second, }, }, { diff --git a/processor/k8sattributesprocessor/factory.go b/processor/k8sattributesprocessor/factory.go index 2c3eb39b2914..56d27a2b5102 100644 --- a/processor/k8sattributesprocessor/factory.go +++ b/processor/k8sattributesprocessor/factory.go @@ -5,6 +5,7 @@ package k8sattributesprocessor // import "github.com/open-telemetry/opentelemetr import ( "context" + "time" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" @@ -44,6 +45,7 @@ func createDefaultConfig() component.Config { Extract: ExtractConfig{ Metadata: enabledAttributes(), }, + WaitForMetadataTimeout: 10 * time.Second, } } @@ -202,5 +204,10 @@ func createProcessorOpts(cfg component.Config) []option { opts = append(opts, withExcludes(oCfg.Exclude)) + opts = append(opts, withWaitForMetadataTimeout(oCfg.WaitForMetadataTimeout)) + if oCfg.WaitForMetadata { + opts 
= append(opts, withWaitForMetadata(true)) + } + return opts } diff --git a/processor/k8sattributesprocessor/generated_component_test.go b/processor/k8sattributesprocessor/generated_component_test.go index 13918bbe7795..38c00260ae43 100644 --- a/processor/k8sattributesprocessor/generated_component_test.go +++ b/processor/k8sattributesprocessor/generated_component_test.go @@ -72,44 +72,6 @@ func TestComponentLifecycle(t *testing.T) { err = c.Shutdown(context.Background()) require.NoError(t, err) }) - t.Run(tt.name+"-lifecycle", func(t *testing.T) { - c, err := tt.createFn(context.Background(), processortest.NewNopSettings(), cfg) - require.NoError(t, err) - host := componenttest.NewNopHost() - err = c.Start(context.Background(), host) - require.NoError(t, err) - require.NotPanics(t, func() { - switch tt.name { - case "logs": - e, ok := c.(processor.Logs) - require.True(t, ok) - logs := generateLifecycleTestLogs() - if !e.Capabilities().MutatesData { - logs.MarkReadOnly() - } - err = e.ConsumeLogs(context.Background(), logs) - case "metrics": - e, ok := c.(processor.Metrics) - require.True(t, ok) - metrics := generateLifecycleTestMetrics() - if !e.Capabilities().MutatesData { - metrics.MarkReadOnly() - } - err = e.ConsumeMetrics(context.Background(), metrics) - case "traces": - e, ok := c.(processor.Traces) - require.True(t, ok) - traces := generateLifecycleTestTraces() - if !e.Capabilities().MutatesData { - traces.MarkReadOnly() - } - err = e.ConsumeTraces(context.Background(), traces) - } - }) - require.NoError(t, err) - err = c.Shutdown(context.Background()) - require.NoError(t, err) - }) } } diff --git a/processor/k8sattributesprocessor/internal/kube/client.go b/processor/k8sattributesprocessor/internal/kube/client.go index 9624fb250b22..bf8746b3e8ef 100644 --- a/processor/k8sattributesprocessor/internal/kube/client.go +++ b/processor/k8sattributesprocessor/internal/kube/client.go @@ -5,6 +5,7 @@ package kube // import 
"github.com/open-telemetry/opentelemetry-collector-contri import ( "context" + "errors" "fmt" "regexp" "strings" @@ -39,18 +40,20 @@ var enableRFC3339Timestamp = featuregate.GlobalRegistry().MustRegister( // WatchClient is the main interface provided by this package to a kubernetes cluster. type WatchClient struct { - m sync.RWMutex - deleteMut sync.Mutex - logger *zap.Logger - kc kubernetes.Interface - informer cache.SharedInformer - namespaceInformer cache.SharedInformer - nodeInformer cache.SharedInformer - replicasetInformer cache.SharedInformer - replicasetRegex *regexp.Regexp - cronJobRegex *regexp.Regexp - deleteQueue []deleteRequest - stopCh chan struct{} + m sync.RWMutex + deleteMut sync.Mutex + logger *zap.Logger + kc kubernetes.Interface + informer cache.SharedInformer + namespaceInformer cache.SharedInformer + nodeInformer cache.SharedInformer + replicasetInformer cache.SharedInformer + replicasetRegex *regexp.Regexp + cronJobRegex *regexp.Regexp + deleteQueue []deleteRequest + stopCh chan struct{} + waitForMetadata bool + waitForMetadataTimeout time.Duration // A map containing Pod related data, used to associate them with resources. // Key can be either an IP address or Pod UID @@ -84,21 +87,36 @@ var rRegex = regexp.MustCompile(`^(.*)-[0-9a-zA-Z]+$`) var cronJobRegex = regexp.MustCompile(`^(.*)-[0-9]+$`) // New initializes a new k8s Client. 
-func New(set component.TelemetrySettings, apiCfg k8sconfig.APIConfig, rules ExtractionRules, filters Filters, associations []Association, exclude Excludes, newClientSet APIClientsetProvider, newInformer InformerProvider, newNamespaceInformer InformerProviderNamespace, newReplicaSetInformer InformerProviderReplicaSet) (Client, error) { +func New( + set component.TelemetrySettings, + apiCfg k8sconfig.APIConfig, + rules ExtractionRules, + filters Filters, + associations []Association, + exclude Excludes, + newClientSet APIClientsetProvider, + newInformer InformerProvider, + newNamespaceInformer InformerProviderNamespace, + newReplicaSetInformer InformerProviderReplicaSet, + waitForMetadata bool, + waitForMetadataTimeout time.Duration, +) (Client, error) { telemetryBuilder, err := metadata.NewTelemetryBuilder(set) if err != nil { return nil, err } c := &WatchClient{ - logger: set.Logger, - Rules: rules, - Filters: filters, - Associations: associations, - Exclude: exclude, - replicasetRegex: rRegex, - cronJobRegex: cronJobRegex, - stopCh: make(chan struct{}), - telemetryBuilder: telemetryBuilder, + logger: set.Logger, + Rules: rules, + Filters: filters, + Associations: associations, + Exclude: exclude, + replicasetRegex: rRegex, + cronJobRegex: cronJobRegex, + stopCh: make(chan struct{}), + telemetryBuilder: telemetryBuilder, + waitForMetadata: waitForMetadata, + waitForMetadataTimeout: waitForMetadataTimeout, } go c.deleteLoop(time.Second*30, defaultPodDeleteGracePeriod) @@ -189,50 +207,67 @@ func New(set component.TelemetrySettings, apiCfg k8sconfig.APIConfig, rules Extr } // Start registers pod event handlers and starts watching the kubernetes cluster for pod changes. 
-func (c *WatchClient) Start() { - _, err := c.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ +func (c *WatchClient) Start() error { + synced := make([]cache.InformerSynced, 0) + reg, err := c.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: c.handlePodAdd, UpdateFunc: c.handlePodUpdate, DeleteFunc: c.handlePodDelete, }) if err != nil { - c.logger.Error("error adding event handler to pod informer", zap.Error(err)) + return err } + synced = append(synced, reg.HasSynced) go c.informer.Run(c.stopCh) - _, err = c.namespaceInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + reg, err = c.namespaceInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: c.handleNamespaceAdd, UpdateFunc: c.handleNamespaceUpdate, DeleteFunc: c.handleNamespaceDelete, }) if err != nil { - c.logger.Error("error adding event handler to namespace informer", zap.Error(err)) + return err } + synced = append(synced, reg.HasSynced) go c.namespaceInformer.Run(c.stopCh) if c.Rules.DeploymentName || c.Rules.DeploymentUID { - _, err = c.replicasetInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + reg, err = c.replicasetInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: c.handleReplicaSetAdd, UpdateFunc: c.handleReplicaSetUpdate, DeleteFunc: c.handleReplicaSetDelete, }) if err != nil { - c.logger.Error("error adding event handler to replicaset informer", zap.Error(err)) + return err } + synced = append(synced, reg.HasSynced) go c.replicasetInformer.Run(c.stopCh) } if c.nodeInformer != nil { - _, err = c.nodeInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + reg, err = c.nodeInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: c.handleNodeAdd, UpdateFunc: c.handleNodeUpdate, DeleteFunc: c.handleNodeDelete, }) if err != nil { - c.logger.Error("error adding event handler to node informer", zap.Error(err)) + return err } + synced = append(synced, reg.HasSynced) go c.nodeInformer.Run(c.stopCh) } + + if 
c.waitForMetadata { + timeoutCh := make(chan struct{}) + t := time.AfterFunc(c.waitForMetadataTimeout, func() { + close(timeoutCh) + }) + defer t.Stop() + if !cache.WaitForCacheSync(timeoutCh, synced...) { + return errors.New("failed to wait for caches to sync") + } + } + return nil } // Stop signals the the k8s watcher/informer to stop watching for new events. diff --git a/processor/k8sattributesprocessor/internal/kube/client_test.go b/processor/k8sattributesprocessor/internal/kube/client_test.go index 7f6b85d195c7..f5e9cd03bfdc 100644 --- a/processor/k8sattributesprocessor/internal/kube/client_test.go +++ b/processor/k8sattributesprocessor/internal/kube/client_test.go @@ -18,6 +18,8 @@ import ( apps_v1 "k8s.io/api/apps/v1" api_v1 "k8s.io/api/core/v1" meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/selection" "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/fake" @@ -143,29 +145,18 @@ func nodeAddAndUpdateTest(t *testing.T, c *WatchClient, handler func(obj any)) { } func TestDefaultClientset(t *testing.T) { - c, err := New(componenttest.NewNopTelemetrySettings(), k8sconfig.APIConfig{}, ExtractionRules{}, Filters{}, []Association{}, Excludes{}, nil, nil, nil, nil) + c, err := New(componenttest.NewNopTelemetrySettings(), k8sconfig.APIConfig{}, ExtractionRules{}, Filters{}, []Association{}, Excludes{}, nil, nil, nil, nil, false, 10*time.Second) assert.Error(t, err) assert.Equal(t, "invalid authType for kubernetes: ", err.Error()) assert.Nil(t, c) - c, err = New(componenttest.NewNopTelemetrySettings(), k8sconfig.APIConfig{}, ExtractionRules{}, Filters{}, []Association{}, Excludes{}, newFakeAPIClientset, nil, nil, nil) + c, err = New(componenttest.NewNopTelemetrySettings(), k8sconfig.APIConfig{}, ExtractionRules{}, Filters{}, []Association{}, Excludes{}, newFakeAPIClientset, nil, nil, nil, false, 10*time.Second) assert.NoError(t, err) assert.NotNil(t, c) } func 
TestBadFilters(t *testing.T) { - c, err := New( - componenttest.NewNopTelemetrySettings(), - k8sconfig.APIConfig{}, - ExtractionRules{}, - Filters{Fields: []FieldFilter{{Op: selection.Exists}}}, - []Association{}, - Excludes{}, - newFakeAPIClientset, - NewFakeInformer, - NewFakeNamespaceInformer, - NewFakeReplicaSetInformer, - ) + c, err := New(componenttest.NewNopTelemetrySettings(), k8sconfig.APIConfig{}, ExtractionRules{}, Filters{Fields: []FieldFilter{{Op: selection.Exists}}}, []Association{}, Excludes{}, newFakeAPIClientset, NewFakeInformer, NewFakeNamespaceInformer, NewFakeReplicaSetInformer, false, 10*time.Second) assert.Error(t, err) assert.Nil(t, c) } @@ -180,7 +171,7 @@ func TestClientStartStop(t *testing.T) { done := make(chan struct{}) assert.False(t, fctr.HasStopped()) go func() { - c.Start() + assert.NoError(t, c.Start()) close(done) }() c.Stop() @@ -201,7 +192,7 @@ func TestConstructorErrors(t *testing.T) { gotAPIConfig = c return nil, fmt.Errorf("error creating k8s client") } - c, err := New(componenttest.NewNopTelemetrySettings(), apiCfg, er, ff, []Association{}, Excludes{}, clientProvider, NewFakeInformer, NewFakeNamespaceInformer, nil) + c, err := New(componenttest.NewNopTelemetrySettings(), apiCfg, er, ff, []Association{}, Excludes{}, clientProvider, NewFakeInformer, NewFakeNamespaceInformer, nil, false, 10*time.Second) assert.Nil(t, c) assert.Error(t, err) assert.Equal(t, "error creating k8s client", err.Error()) @@ -1923,7 +1914,7 @@ func newTestClientWithRulesAndFilters(t *testing.T, f Filters) (*WatchClient, *o }, }, } - c, err := New(set, k8sconfig.APIConfig{}, ExtractionRules{}, f, associations, exclude, newFakeAPIClientset, NewFakeInformer, NewFakeNamespaceInformer, NewFakeReplicaSetInformer) + c, err := New(set, k8sconfig.APIConfig{}, ExtractionRules{}, f, associations, exclude, newFakeAPIClientset, NewFakeInformer, NewFakeNamespaceInformer, NewFakeReplicaSetInformer, false, 10*time.Second) require.NoError(t, err) return 
c.(*WatchClient), logs } @@ -1931,3 +1922,55 @@ func newTestClientWithRulesAndFilters(t *testing.T, f Filters) (*WatchClient, *o func newTestClient(t *testing.T) (*WatchClient, *observer.ObservedLogs) { return newTestClientWithRulesAndFilters(t, Filters{}) } + +type neverSyncedFakeClient struct { + cache.SharedInformer +} + +type neverSyncedResourceEventHandlerRegistration struct { + cache.ResourceEventHandlerRegistration +} + +func (n *neverSyncedResourceEventHandlerRegistration) HasSynced() bool { + return false +} + +func (n *neverSyncedFakeClient) AddEventHandler(handler cache.ResourceEventHandler) (cache.ResourceEventHandlerRegistration, error) { + delegate, err := n.SharedInformer.AddEventHandler(handler) + if err != nil { + return nil, err + } + return &neverSyncedResourceEventHandlerRegistration{ResourceEventHandlerRegistration: delegate}, nil +} + +func TestWaitForMetadata(t *testing.T) { + testCases := []struct { + name string + informerProvider InformerProvider + err bool + }{{ + name: "no wait", + informerProvider: NewFakeInformer, + err: false, + }, { + name: "wait but never synced", + informerProvider: func(client kubernetes.Interface, namespace string, labelSelector labels.Selector, fieldSelector fields.Selector) cache.SharedInformer { + return &neverSyncedFakeClient{NewFakeInformer(client, namespace, labelSelector, fieldSelector)} + }, + err: true, + }} + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + c, err := New(componenttest.NewNopTelemetrySettings(), k8sconfig.APIConfig{}, ExtractionRules{}, Filters{}, []Association{}, Excludes{}, newFakeAPIClientset, tc.informerProvider, nil, nil, true, 1*time.Second) + require.NoError(t, err) + + err = c.Start() + if tc.err { + require.Error(t, err) + } else { + require.NoError(t, err) + } + }) + } +} diff --git a/processor/k8sattributesprocessor/internal/kube/fake_informer.go b/processor/k8sattributesprocessor/internal/kube/fake_informer.go index fc5f479a6ddd..15d63c4e9c0c 100644 
--- a/processor/k8sattributesprocessor/internal/kube/fake_informer.go +++ b/processor/k8sattributesprocessor/internal/kube/fake_informer.go @@ -40,7 +40,7 @@ func (f *FakeInformer) AddEventHandler(handler cache.ResourceEventHandler) (cach } func (f *FakeInformer) AddEventHandlerWithResyncPeriod(_ cache.ResourceEventHandler, _ time.Duration) (cache.ResourceEventHandlerRegistration, error) { - return nil, nil + return f, nil } func (f *FakeInformer) RemoveEventHandler(_ cache.ResourceEventHandlerRegistration) error { @@ -165,7 +165,7 @@ func (f *NoOpInformer) AddEventHandler(handler cache.ResourceEventHandler) (cach } func (f *NoOpInformer) AddEventHandlerWithResyncPeriod(_ cache.ResourceEventHandler, _ time.Duration) (cache.ResourceEventHandlerRegistration, error) { - return nil, nil + return f, nil } func (f *NoOpInformer) RemoveEventHandler(_ cache.ResourceEventHandlerRegistration) error { diff --git a/processor/k8sattributesprocessor/internal/kube/kube.go b/processor/k8sattributesprocessor/internal/kube/kube.go index 6145cb972235..9faeee2452dd 100644 --- a/processor/k8sattributesprocessor/internal/kube/kube.go +++ b/processor/k8sattributesprocessor/internal/kube/kube.go @@ -91,12 +91,12 @@ type Client interface { GetPod(PodIdentifier) (*Pod, bool) GetNamespace(string) (*Namespace, bool) GetNode(string) (*Node, bool) - Start() + Start() error Stop() } // ClientProvider defines a func type that returns a new Client. 
-type ClientProvider func(component.TelemetrySettings, k8sconfig.APIConfig, ExtractionRules, Filters, []Association, Excludes, APIClientsetProvider, InformerProvider, InformerProviderNamespace, InformerProviderReplicaSet) (Client, error) +type ClientProvider func(component.TelemetrySettings, k8sconfig.APIConfig, ExtractionRules, Filters, []Association, Excludes, APIClientsetProvider, InformerProvider, InformerProviderNamespace, InformerProviderReplicaSet, bool, time.Duration) (Client, error) // APIClientsetProvider defines a func type that initializes and return a new kubernetes // Clientset object. diff --git a/processor/k8sattributesprocessor/metadata.yaml b/processor/k8sattributesprocessor/metadata.yaml index a388cfcb3dfd..edfb8bbc414d 100644 --- a/processor/k8sattributesprocessor/metadata.yaml +++ b/processor/k8sattributesprocessor/metadata.yaml @@ -114,6 +114,7 @@ resource_attributes: tests: config: + skip_lifecycle: true goleak: skip: true diff --git a/processor/k8sattributesprocessor/options.go b/processor/k8sattributesprocessor/options.go index ec39cdb9b827..4ccccb0d4638 100644 --- a/processor/k8sattributesprocessor/options.go +++ b/processor/k8sattributesprocessor/options.go @@ -7,6 +7,7 @@ import ( "fmt" "os" "regexp" + "time" conventions "go.opentelemetry.io/collector/semconv/v1.6.1" "k8s.io/apimachinery/pkg/selection" @@ -381,3 +382,19 @@ func withExcludes(podExclude ExcludeConfig) option { return nil } } + +// withWaitForMetadata allows specifying whether to wait for pod metadata to be synced. +func withWaitForMetadata(wait bool) option { + return func(p *kubernetesprocessor) error { + p.waitForMetadata = wait + return nil + } +} + +// withWaitForMetadataTimeout allows specifying the timeout for waiting for pod metadata to be synced. 
+func withWaitForMetadataTimeout(timeout time.Duration) option { + return func(p *kubernetesprocessor) error { + p.waitForMetadataTimeout = timeout + return nil + } +} diff --git a/processor/k8sattributesprocessor/processor.go b/processor/k8sattributesprocessor/processor.go index 5d63cbbd100a..98499f5e5473 100644 --- a/processor/k8sattributesprocessor/processor.go +++ b/processor/k8sattributesprocessor/processor.go @@ -7,6 +7,7 @@ import ( "context" "fmt" "strconv" + "time" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componentstatus" @@ -27,17 +28,19 @@ const ( ) type kubernetesprocessor struct { - cfg component.Config - options []option - telemetrySettings component.TelemetrySettings - logger *zap.Logger - apiConfig k8sconfig.APIConfig - kc kube.Client - passthroughMode bool - rules kube.ExtractionRules - filters kube.Filters - podAssociations []kube.Association - podIgnore kube.Excludes + cfg component.Config + options []option + telemetrySettings component.TelemetrySettings + logger *zap.Logger + apiConfig k8sconfig.APIConfig + kc kube.Client + passthroughMode bool + rules kube.ExtractionRules + filters kube.Filters + podAssociations []kube.Association + podIgnore kube.Excludes + waitForMetadata bool + waitForMetadataTimeout time.Duration } func (kp *kubernetesprocessor) initKubeClient(set component.TelemetrySettings, kubeClient kube.ClientProvider) error { @@ -45,7 +48,7 @@ func (kp *kubernetesprocessor) initKubeClient(set component.TelemetrySettings, k kubeClient = kube.New } if !kp.passthroughMode { - kc, err := kubeClient(set, kp.apiConfig, kp.rules, kp.filters, kp.podAssociations, kp.podIgnore, nil, nil, nil, nil) + kc, err := kubeClient(set, kp.apiConfig, kp.rules, kp.filters, kp.podAssociations, kp.podIgnore, nil, nil, nil, nil, kp.waitForMetadata, kp.waitForMetadataTimeout) if err != nil { return err } @@ -60,7 +63,7 @@ func (kp *kubernetesprocessor) Start(_ context.Context, host component.Host) err for _, opt := 
range allOptions { if err := opt(kp); err != nil { componentstatus.ReportStatus(host, componentstatus.NewFatalErrorEvent(err)) - return nil + return err } } @@ -69,11 +72,15 @@ func (kp *kubernetesprocessor) Start(_ context.Context, host component.Host) err err := kp.initKubeClient(kp.telemetrySettings, kubeClientProvider) if err != nil { componentstatus.ReportStatus(host, componentstatus.NewFatalErrorEvent(err)) - return nil + return err } } if !kp.passthroughMode { - go kp.kc.Start() + err := kp.kc.Start() + if err != nil { + componentstatus.ReportStatus(host, componentstatus.NewFatalErrorEvent(err)) + return err + } } return nil } diff --git a/processor/k8sattributesprocessor/processor_test.go b/processor/k8sattributesprocessor/processor_test.go index b8cbf1ec66b0..790c612549b2 100644 --- a/processor/k8sattributesprocessor/processor_test.go +++ b/processor/k8sattributesprocessor/processor_test.go @@ -266,7 +266,7 @@ func TestNewProcessor(t *testing.T) { } func TestProcessorBadClientProvider(t *testing.T) { - clientProvider := func(_ component.TelemetrySettings, _ k8sconfig.APIConfig, _ kube.ExtractionRules, _ kube.Filters, _ []kube.Association, _ kube.Excludes, _ kube.APIClientsetProvider, _ kube.InformerProvider, _ kube.InformerProviderNamespace, _ kube.InformerProviderReplicaSet) (kube.Client, error) { + clientProvider := func(_ component.TelemetrySettings, _ k8sconfig.APIConfig, _ kube.ExtractionRules, _ kube.Filters, _ []kube.Association, _ kube.Excludes, _ kube.APIClientsetProvider, _ kube.InformerProvider, _ kube.InformerProviderNamespace, _ kube.InformerProviderReplicaSet, _ bool, _ time.Duration) (kube.Client, error) { return nil, fmt.Errorf("bad client error") }