From d6cf1d0447f07bc06b1c88bc7e8eb875f1124a76 Mon Sep 17 00:00:00 2001 From: Florian Bacher Date: Mon, 21 Oct 2024 10:30:25 +0200 Subject: [PATCH 01/25] [extension/opampagent] use status subscription for fine granular health reporting Signed-off-by: Florian Bacher --- extension/opampextension/go.mod | 1 + extension/opampextension/go.sum | 2 + extension/opampextension/opamp_agent.go | 102 ++++++- extension/opampextension/opamp_agent_test.go | 286 +++++++++++++++++++ 4 files changed, 383 insertions(+), 8 deletions(-) diff --git a/extension/opampextension/go.mod b/extension/opampextension/go.mod index 4bc064b766df..58557c0692f7 100644 --- a/extension/opampextension/go.mod +++ b/extension/opampextension/go.mod @@ -7,6 +7,7 @@ require ( github.com/oklog/ulid/v2 v2.1.0 github.com/open-telemetry/opamp-go v0.17.0 github.com/open-telemetry/opentelemetry-collector-contrib/extension/opampcustommessages v0.111.0 + github.com/open-telemetry/opentelemetry-collector-contrib/pkg/status v0.0.0-20241018061824-1fab9bbe8195 github.com/shirou/gopsutil/v4 v4.24.9 github.com/stretchr/testify v1.9.0 go.opentelemetry.io/collector/component v0.111.1-0.20241008154146-ea48c09c31ae diff --git a/extension/opampextension/go.sum b/extension/opampextension/go.sum index 13c87b480fa0..cd6a489791cd 100644 --- a/extension/opampextension/go.sum +++ b/extension/opampextension/go.sum @@ -46,6 +46,8 @@ github.com/oklog/ulid/v2 v2.1.0 h1:+9lhoxAP56we25tyYETBBY1YLA2SaoLvUFgrP2miPJU= github.com/oklog/ulid/v2 v2.1.0/go.mod h1:rcEKHmBBKfef9DhnvX7y1HZBYxjXb0cP5ExxNsTT1QQ= github.com/open-telemetry/opamp-go v0.17.0 h1:3R4+B/6Sy8mknLBbzO3gqloqwTT02rCSRcr4ac2B124= github.com/open-telemetry/opamp-go v0.17.0/go.mod h1:SGDhUoAx7uGutO4ENNMQla/tiSujxgZmMPJXIOPGBdk= +github.com/open-telemetry/opentelemetry-collector-contrib/pkg/status v0.0.0-20241018061824-1fab9bbe8195 h1:niaM+o3zQ+UI1Q2cLJPVCZI4ET0zt4E6dJWl+JPkOG0= +github.com/open-telemetry/opentelemetry-collector-contrib/pkg/status 
v0.0.0-20241018061824-1fab9bbe8195/go.mod h1:Xxm+q149aIvJUFozPs6VrDk0R8Sto2Y0BfGgaVxLEuw= github.com/pborman/getopt v0.0.0-20170112200414-7148bc3a4c30/go.mod h1:85jBQOZwpVEaDAr341tbn15RS4fCAsIst0qp7i8ex1o= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= diff --git a/extension/opampextension/opamp_agent.go b/extension/opampextension/opamp_agent.go index f984974c6c88..4783e45a3820 100644 --- a/extension/opampextension/opamp_agent.go +++ b/extension/opampextension/opamp_agent.go @@ -7,18 +7,12 @@ import ( "context" "errors" "fmt" - "net/http" - "os" - "runtime" - "sort" - "strings" - "sync" - "github.com/google/uuid" "github.com/oklog/ulid/v2" "github.com/open-telemetry/opamp-go/client" "github.com/open-telemetry/opamp-go/client/types" "github.com/open-telemetry/opamp-go/protobufs" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/status" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componentstatus" "go.opentelemetry.io/collector/confmap" @@ -28,12 +22,22 @@ import ( "go.uber.org/zap" "golang.org/x/exp/maps" "gopkg.in/yaml.v3" + "net/http" + "os" + "runtime" + "sort" + "strings" + "sync" "github.com/open-telemetry/opentelemetry-collector-contrib/extension/opampcustommessages" ) var _ extensioncapabilities.PipelineWatcher = (*opampAgent)(nil) +type statusAggregator interface { + Subscribe(scope status.Scope, verbosity status.Verbosity) (<-chan *status.AggregateStatus, status.UnsubscribeFunc) +} + type opampAgent struct { cfg *Config logger *zap.Logger @@ -59,6 +63,10 @@ type opampAgent struct { opampClient client.OpAMPClient customCapabilityRegistry *customCapabilityRegistry + + statusAggregator statusAggregator + statusSubscriptionWg *sync.WaitGroup + startTimeUnixNano uint64 } var _ opampcustommessages.CustomCapabilityRegistry = 
(*opampAgent)(nil) @@ -123,7 +131,7 @@ func (o *opampAgent) Start(ctx context.Context, host component.Host) error { return err } - o.setHealth(&protobufs.ComponentHealth{Healthy: false}) + o.initHealthReporting() o.logger.Debug("Starting OpAMP client...") @@ -141,6 +149,8 @@ func (o *opampAgent) Shutdown(ctx context.Context) error { o.lifetimeCtxCancel() } + o.statusSubscriptionWg.Wait() + o.logger.Debug("OpAMP agent shutting down...") if o.opampClient == nil { return nil @@ -244,9 +254,14 @@ func newOpampAgent(cfg *Config, set extension.Settings) (*opampAgent, error) { instanceID: uid, capabilities: cfg.Capabilities, opampClient: opampClient, + statusSubscriptionWg: &sync.WaitGroup{}, customCapabilityRegistry: newCustomCapabilityRegistry(set.Logger, opampClient), } + if agent.capabilities.ReportsHealth { + agent.statusAggregator = status.NewAggregator(status.PriorityPermanent) + } + return agent, nil } @@ -362,8 +377,79 @@ func (o *opampAgent) onMessage(_ context.Context, msg *types.MessageData) { func (o *opampAgent) setHealth(ch *protobufs.ComponentHealth) { if o.capabilities.ReportsHealth && o.opampClient != nil { + + if ch.Healthy && o.startTimeUnixNano == 0 { + ch.StartTimeUnixNano = ch.StatusTimeUnixNano + } else { + ch.StartTimeUnixNano = o.startTimeUnixNano + } if err := o.opampClient.SetHealth(ch); err != nil { o.logger.Error("Could not report health to OpAMP server", zap.Error(err)) } } } + +func (o *opampAgent) initHealthReporting() { + if !o.capabilities.ReportsHealth { + return + } + o.setHealth(&protobufs.ComponentHealth{Healthy: false}) + + statusChan, unsubscribeFunc := o.statusAggregator.Subscribe(status.ScopeAll, status.Verbose) + + o.statusSubscriptionWg.Add(1) + go func() { + for { + select { + case <-o.lifetimeCtx.Done(): + unsubscribeFunc() + o.statusSubscriptionWg.Done() + return + case statusUpdate, ok := <-statusChan: + if !ok { + unsubscribeFunc() + o.statusSubscriptionWg.Done() + return + } + + if statusUpdate == nil { + continue + } + + 
componentHealth := convertOverallStatus(statusUpdate) + componentHealth.ComponentHealthMap = convertComponentStatusMap(statusUpdate.ComponentStatusMap) + + o.setHealth(componentHealth) + } + } + }() +} + +func convertOverallStatus(statusUpdate *status.AggregateStatus) *protobufs.ComponentHealth { + var isHealthy bool + if statusUpdate.Status() == componentstatus.StatusOK { + isHealthy = true + } else { + isHealthy = false + } + + componentHealth := &protobufs.ComponentHealth{ + Healthy: isHealthy, + Status: statusUpdate.Status().String(), + StatusTimeUnixNano: uint64(statusUpdate.Timestamp().UnixNano()), + } + + if statusUpdate.Err() != nil { + componentHealth.LastError = statusUpdate.Err().Error() + } + return componentHealth +} + +func convertComponentStatusMap(statusMap map[string]*status.AggregateStatus) map[string]*protobufs.ComponentHealth { + res := map[string]*protobufs.ComponentHealth{} + + for comp, stat := range statusMap { + res[comp] = convertOverallStatus(stat) + } + return res +} diff --git a/extension/opampextension/opamp_agent_test.go b/extension/opampextension/opamp_agent_test.go index fd72d346492c..6af4e247f86b 100644 --- a/extension/opampextension/opamp_agent_test.go +++ b/extension/opampextension/opamp_agent_test.go @@ -5,10 +5,16 @@ package opampextension import ( "context" + "fmt" + "github.com/open-telemetry/opamp-go/client/types" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/status" + "go.opentelemetry.io/collector/component/componentstatus" "os" "path/filepath" "runtime" + "sync" "testing" + "time" "github.com/google/uuid" "github.com/open-telemetry/opamp-go/protobufs" @@ -205,6 +211,205 @@ func TestStart(t *testing.T) { assert.NoError(t, o.Shutdown(context.TODO())) } +func TestHealthReporting(t *testing.T) { + cfg := createDefaultConfig() + set := extensiontest.NewNopSettings() + o, err := newOpampAgent(cfg.(*Config), set) + assert.NoError(t, err) + + statusUpdateChannel := make(chan *status.AggregateStatus) + sa := 
&mockStatusAggregator{ + statusChan: statusUpdateChannel, + } + o.statusAggregator = sa + + mtx := &sync.RWMutex{} + now := time.Now() + expectedHealthUpdates := []*protobufs.ComponentHealth{ + { + Healthy: false, + }, + { + Healthy: true, + StartTimeUnixNano: uint64(now.UnixNano()), + Status: "StatusOK", + StatusTimeUnixNano: uint64(now.UnixNano()), + ComponentHealthMap: map[string]*protobufs.ComponentHealth{ + "test-receiver": { + Healthy: true, + Status: "StatusOK", + StatusTimeUnixNano: uint64(now.UnixNano()), + }, + }, + }, + { + Healthy: false, + Status: "StatusPermanentError", + StatusTimeUnixNano: uint64(now.UnixNano()), + LastError: "unexpected error", + ComponentHealthMap: map[string]*protobufs.ComponentHealth{ + "test-receiver": { + Healthy: false, + Status: "StatusPermanentError", + StatusTimeUnixNano: uint64(now.UnixNano()), + LastError: "unexpected error", + }, + }, + }, + } + receivedHealthUpdates := 0 + + o.opampClient = &mockOpAMPClient{ + setHealthFunc: func(health *protobufs.ComponentHealth) error { + mtx.Lock() + defer mtx.Unlock() + require.Equal(t, expectedHealthUpdates[receivedHealthUpdates], health) + receivedHealthUpdates++ + return nil + }, + } + + assert.NoError(t, o.Start(context.TODO(), componenttest.NewNopHost())) + + statusUpdateChannel <- nil + statusUpdateChannel <- &status.AggregateStatus{ + Event: &mockStatusEvent{ + status: componentstatus.StatusOK, + err: nil, + timestamp: now, + }, + ComponentStatusMap: map[string]*status.AggregateStatus{ + "test-receiver": { + Event: &mockStatusEvent{ + status: componentstatus.StatusOK, + err: nil, + timestamp: now, + }, + }, + }, + } + statusUpdateChannel <- &status.AggregateStatus{ + Event: &mockStatusEvent{ + status: componentstatus.StatusPermanentError, + err: fmt.Errorf("unexpected error"), + timestamp: now, + }, + ComponentStatusMap: map[string]*status.AggregateStatus{ + "test-receiver": { + Event: &mockStatusEvent{ + status: componentstatus.StatusPermanentError, + err: 
fmt.Errorf("unexpected error"), + timestamp: now, + }, + }, + }, + } + + close(statusUpdateChannel) + + require.Eventually(t, func() bool { + mtx.RLock() + defer mtx.RUnlock() + return receivedHealthUpdates == len(expectedHealthUpdates) + }, 1*time.Second, 100*time.Millisecond) + + assert.NoError(t, o.Shutdown(context.TODO())) + require.True(t, sa.unsubscribed) +} + +func TestHealthReportingExitsOnClosedContext(t *testing.T) { + cfg := createDefaultConfig() + set := extensiontest.NewNopSettings() + o, err := newOpampAgent(cfg.(*Config), set) + assert.NoError(t, err) + + statusUpdateChannel := make(chan *status.AggregateStatus) + sa := &mockStatusAggregator{ + statusChan: statusUpdateChannel, + } + o.statusAggregator = sa + + mtx := &sync.RWMutex{} + now := time.Now() + expectedHealthUpdates := []*protobufs.ComponentHealth{ + { + Healthy: false, + }, + { + Healthy: true, + StartTimeUnixNano: uint64(now.UnixNano()), + Status: "StatusOK", + StatusTimeUnixNano: uint64(now.UnixNano()), + ComponentHealthMap: map[string]*protobufs.ComponentHealth{ + "test-receiver": { + Healthy: true, + Status: "StatusOK", + StatusTimeUnixNano: uint64(now.UnixNano()), + }, + }, + }, + } + receivedHealthUpdates := 0 + + o.opampClient = &mockOpAMPClient{ + setHealthFunc: func(health *protobufs.ComponentHealth) error { + mtx.Lock() + defer mtx.Unlock() + require.Equal(t, expectedHealthUpdates[receivedHealthUpdates], health) + receivedHealthUpdates++ + return nil + }, + } + + assert.NoError(t, o.Start(context.TODO(), componenttest.NewNopHost())) + + statusUpdateChannel <- nil + statusUpdateChannel <- &status.AggregateStatus{ + Event: &mockStatusEvent{ + status: componentstatus.StatusOK, + err: nil, + timestamp: now, + }, + ComponentStatusMap: map[string]*status.AggregateStatus{ + "test-receiver": { + Event: &mockStatusEvent{ + status: componentstatus.StatusOK, + err: nil, + timestamp: now, + }, + }, + }, + } + + require.Eventually(t, func() bool { + mtx.RLock() + defer mtx.RUnlock() + return 
receivedHealthUpdates == len(expectedHealthUpdates) + }, 1*time.Second, 100*time.Millisecond) + + // invoke Shutdown before health update channel has been closed + assert.NoError(t, o.Shutdown(context.TODO())) + require.True(t, sa.unsubscribed) +} + +func TestHealthReportingDisabled(t *testing.T) { + cfg := createDefaultConfig() + set := extensiontest.NewNopSettings() + o, err := newOpampAgent(cfg.(*Config), set) + assert.NoError(t, err) + + o.capabilities.ReportsHealth = false + o.opampClient = &mockOpAMPClient{ + setHealthFunc: func(health *protobufs.ComponentHealth) error { + t.Errorf("setHealth is not supposed to be called with deactivated ReportsHealth capability") + return nil + }, + } + + assert.NoError(t, o.Start(context.TODO(), componenttest.NewNopHost())) + assert.NoError(t, o.Shutdown(context.TODO())) +} + func TestParseInstanceIDString(t *testing.T) { testCases := []struct { name string @@ -278,3 +483,84 @@ func TestOpAMPAgent_Dependencies(t *testing.T) { require.Equal(t, []component.ID{authID}, o.Dependencies()) }) } + +type mockStatusAggregator struct { + statusChan chan *status.AggregateStatus + unsubscribed bool +} + +func (m *mockStatusAggregator) Subscribe(_ status.Scope, _ status.Verbosity) (<-chan *status.AggregateStatus, status.UnsubscribeFunc) { + return m.statusChan, func() { + m.unsubscribed = true + } +} + +type mockOpAMPClient struct { + setHealthFunc func(health *protobufs.ComponentHealth) error +} + +func (mockOpAMPClient) Start(_ context.Context, _ types.StartSettings) error { + return nil +} + +func (mockOpAMPClient) Stop(_ context.Context) error { + return nil +} + +func (mockOpAMPClient) SetAgentDescription(_ *protobufs.AgentDescription) error { + return nil +} + +func (mockOpAMPClient) AgentDescription() *protobufs.AgentDescription { + return nil +} + +func (m mockOpAMPClient) SetHealth(health *protobufs.ComponentHealth) error { + return m.setHealthFunc(health) +} + +func (mockOpAMPClient) UpdateEffectiveConfig(_ context.Context) 
error { + return nil +} + +func (mockOpAMPClient) SetRemoteConfigStatus(_ *protobufs.RemoteConfigStatus) error { + return nil +} + +func (mockOpAMPClient) SetPackageStatuses(_ *protobufs.PackageStatuses) error { + return nil +} + +func (mockOpAMPClient) RequestConnectionSettings(_ *protobufs.ConnectionSettingsRequest) error { + return nil +} + +func (mockOpAMPClient) SetCustomCapabilities(_ *protobufs.CustomCapabilities) error { + return nil +} + +func (mockOpAMPClient) SendCustomMessage(_ *protobufs.CustomMessage) (messageSendingChannel chan struct{}, err error) { + return nil, nil +} + +func (mockOpAMPClient) SetFlags(flags protobufs.AgentToServerFlags) { + +} + +type mockStatusEvent struct { + status componentstatus.Status + err error + timestamp time.Time +} + +func (m mockStatusEvent) Status() componentstatus.Status { + return m.status +} + +func (m mockStatusEvent) Err() error { + return m.err +} + +func (m mockStatusEvent) Timestamp() time.Time { + return m.timestamp +} From 8be384ea1b4062ec5e1b17e0071f71957c48d44d Mon Sep 17 00:00:00 2001 From: Florian Bacher Date: Mon, 21 Oct 2024 10:44:59 +0200 Subject: [PATCH 02/25] add changelog entry Signed-off-by: Florian Bacher --- .../opamp-extension-health-reporting.yaml | 27 +++++++++++++++++++ 1 file changed, 27 insertions(+) create mode 100644 .chloggen/opamp-extension-health-reporting.yaml diff --git a/.chloggen/opamp-extension-health-reporting.yaml b/.chloggen/opamp-extension-health-reporting.yaml new file mode 100644 index 000000000000..b0a0e7f209dc --- /dev/null +++ b/.chloggen/opamp-extension-health-reporting.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: opampextension + +# A brief description of the change. 
Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Use status subscription for fine-grained component health reporting + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [35856] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] From 5fd8e8be265799419dd72314dc335a130393b4bc Mon Sep 17 00:00:00 2001 From: Florian Bacher Date: Mon, 21 Oct 2024 10:53:25 +0200 Subject: [PATCH 03/25] fix linting Signed-off-by: Florian Bacher --- extension/opampextension/opamp_agent.go | 15 ++++----- extension/opampextension/opamp_agent_test.go | 33 ++++++++++---------- 2 files changed, 24 insertions(+), 24 deletions(-) diff --git a/extension/opampextension/opamp_agent.go b/extension/opampextension/opamp_agent.go index 4783e45a3820..908289202599 100644 --- a/extension/opampextension/opamp_agent.go +++ b/extension/opampextension/opamp_agent.go @@ -7,12 +7,18 @@ import ( "context" "errors" "fmt" + "net/http" + "os" + "runtime" + "sort" + "strings" + "sync" + "github.com/google/uuid" "github.com/oklog/ulid/v2" "github.com/open-telemetry/opamp-go/client" "github.com/open-telemetry/opamp-go/client/types" "github.com/open-telemetry/opamp-go/protobufs" - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/status" "go.opentelemetry.io/collector/component" 
"go.opentelemetry.io/collector/component/componentstatus" "go.opentelemetry.io/collector/confmap" @@ -22,14 +28,9 @@ import ( "go.uber.org/zap" "golang.org/x/exp/maps" "gopkg.in/yaml.v3" - "net/http" - "os" - "runtime" - "sort" - "strings" - "sync" "github.com/open-telemetry/opentelemetry-collector-contrib/extension/opampcustommessages" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/status" ) var _ extensioncapabilities.PipelineWatcher = (*opampAgent)(nil) diff --git a/extension/opampextension/opamp_agent_test.go b/extension/opampextension/opamp_agent_test.go index 6af4e247f86b..1baef08503e6 100644 --- a/extension/opampextension/opamp_agent_test.go +++ b/extension/opampextension/opamp_agent_test.go @@ -6,9 +6,6 @@ package opampextension import ( "context" "fmt" - "github.com/open-telemetry/opamp-go/client/types" - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/status" - "go.opentelemetry.io/collector/component/componentstatus" "os" "path/filepath" "runtime" @@ -17,14 +14,18 @@ import ( "time" "github.com/google/uuid" + "github.com/open-telemetry/opamp-go/client/types" "github.com/open-telemetry/opamp-go/protobufs" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componentstatus" "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/confmap/confmaptest" "go.opentelemetry.io/collector/extension/extensiontest" semconv "go.opentelemetry.io/collector/semconv/v1.27.0" + + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/status" ) func TestNewOpampAgent(t *testing.T) { @@ -400,7 +401,7 @@ func TestHealthReportingDisabled(t *testing.T) { o.capabilities.ReportsHealth = false o.opampClient = &mockOpAMPClient{ - setHealthFunc: func(health *protobufs.ComponentHealth) error { + setHealthFunc: func(_ *protobufs.ComponentHealth) error { t.Errorf("setHealth is not supposed to be called with 
deactivated ReportsHealth capability") return nil }, @@ -499,19 +500,19 @@ type mockOpAMPClient struct { setHealthFunc func(health *protobufs.ComponentHealth) error } -func (mockOpAMPClient) Start(_ context.Context, _ types.StartSettings) error { +func (m mockOpAMPClient) Start(_ context.Context, _ types.StartSettings) error { return nil } -func (mockOpAMPClient) Stop(_ context.Context) error { +func (m mockOpAMPClient) Stop(_ context.Context) error { return nil } -func (mockOpAMPClient) SetAgentDescription(_ *protobufs.AgentDescription) error { +func (m mockOpAMPClient) SetAgentDescription(_ *protobufs.AgentDescription) error { return nil } -func (mockOpAMPClient) AgentDescription() *protobufs.AgentDescription { +func (m mockOpAMPClient) AgentDescription() *protobufs.AgentDescription { return nil } @@ -519,33 +520,31 @@ func (m mockOpAMPClient) SetHealth(health *protobufs.ComponentHealth) error { return m.setHealthFunc(health) } -func (mockOpAMPClient) UpdateEffectiveConfig(_ context.Context) error { +func (m mockOpAMPClient) UpdateEffectiveConfig(_ context.Context) error { return nil } -func (mockOpAMPClient) SetRemoteConfigStatus(_ *protobufs.RemoteConfigStatus) error { +func (m mockOpAMPClient) SetRemoteConfigStatus(_ *protobufs.RemoteConfigStatus) error { return nil } -func (mockOpAMPClient) SetPackageStatuses(_ *protobufs.PackageStatuses) error { +func (m mockOpAMPClient) SetPackageStatuses(_ *protobufs.PackageStatuses) error { return nil } -func (mockOpAMPClient) RequestConnectionSettings(_ *protobufs.ConnectionSettingsRequest) error { +func (m mockOpAMPClient) RequestConnectionSettings(_ *protobufs.ConnectionSettingsRequest) error { return nil } -func (mockOpAMPClient) SetCustomCapabilities(_ *protobufs.CustomCapabilities) error { +func (m mockOpAMPClient) SetCustomCapabilities(_ *protobufs.CustomCapabilities) error { return nil } -func (mockOpAMPClient) SendCustomMessage(_ *protobufs.CustomMessage) (messageSendingChannel chan struct{}, err error) { +func 
(m mockOpAMPClient) SendCustomMessage(_ *protobufs.CustomMessage) (messageSendingChannel chan struct{}, err error) { return nil, nil } -func (mockOpAMPClient) SetFlags(flags protobufs.AgentToServerFlags) { - -} +func (m mockOpAMPClient) SetFlags(_ protobufs.AgentToServerFlags) {} type mockStatusEvent struct { status componentstatus.Status From 24b32e3284709c4ddd5f6e0dfae6b7fdc21ff9fc Mon Sep 17 00:00:00 2001 From: Florian Bacher Date: Mon, 21 Oct 2024 14:08:30 +0200 Subject: [PATCH 04/25] fix crosslinking Signed-off-by: Florian Bacher --- extension/opampextension/go.mod | 2 ++ 1 file changed, 2 insertions(+) diff --git a/extension/opampextension/go.mod b/extension/opampextension/go.mod index 58557c0692f7..280b674532bf 100644 --- a/extension/opampextension/go.mod +++ b/extension/opampextension/go.mod @@ -67,3 +67,5 @@ require ( ) replace github.com/open-telemetry/opentelemetry-collector-contrib/extension/opampcustommessages => ../opampcustommessages + +replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/status => ../../pkg/status From cc815b4a2a742a5ba87661db9da420c82ad96a5b Mon Sep 17 00:00:00 2001 From: Florian Bacher Date: Mon, 21 Oct 2024 14:43:31 +0200 Subject: [PATCH 05/25] go tidy Signed-off-by: Florian Bacher --- extension/opampextension/go.sum | 2 -- 1 file changed, 2 deletions(-) diff --git a/extension/opampextension/go.sum b/extension/opampextension/go.sum index cd6a489791cd..13c87b480fa0 100644 --- a/extension/opampextension/go.sum +++ b/extension/opampextension/go.sum @@ -46,8 +46,6 @@ github.com/oklog/ulid/v2 v2.1.0 h1:+9lhoxAP56we25tyYETBBY1YLA2SaoLvUFgrP2miPJU= github.com/oklog/ulid/v2 v2.1.0/go.mod h1:rcEKHmBBKfef9DhnvX7y1HZBYxjXb0cP5ExxNsTT1QQ= github.com/open-telemetry/opamp-go v0.17.0 h1:3R4+B/6Sy8mknLBbzO3gqloqwTT02rCSRcr4ac2B124= github.com/open-telemetry/opamp-go v0.17.0/go.mod h1:SGDhUoAx7uGutO4ENNMQla/tiSujxgZmMPJXIOPGBdk= -github.com/open-telemetry/opentelemetry-collector-contrib/pkg/status 
v0.0.0-20241018061824-1fab9bbe8195 h1:niaM+o3zQ+UI1Q2cLJPVCZI4ET0zt4E6dJWl+JPkOG0= -github.com/open-telemetry/opentelemetry-collector-contrib/pkg/status v0.0.0-20241018061824-1fab9bbe8195/go.mod h1:Xxm+q149aIvJUFozPs6VrDk0R8Sto2Y0BfGgaVxLEuw= github.com/pborman/getopt v0.0.0-20170112200414-7148bc3a4c30/go.mod h1:85jBQOZwpVEaDAr341tbn15RS4fCAsIst0qp7i8ex1o= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= From 2b172ae30f88761cd80a02f77ebe59626bf012c4 Mon Sep 17 00:00:00 2001 From: Florian Bacher Date: Fri, 25 Oct 2024 11:12:17 +0200 Subject: [PATCH 06/25] receive component health updates and forward them to status aggregator Signed-off-by: Florian Bacher --- extension/opampextension/opamp_agent.go | 138 +++++++++++++++---- extension/opampextension/opamp_agent_test.go | 82 ++++++++++- 2 files changed, 188 insertions(+), 32 deletions(-) diff --git a/extension/opampextension/opamp_agent.go b/extension/opampextension/opamp_agent.go index 908289202599..c1c53a47a420 100644 --- a/extension/opampextension/opamp_agent.go +++ b/extension/opampextension/opamp_agent.go @@ -37,6 +37,12 @@ var _ extensioncapabilities.PipelineWatcher = (*opampAgent)(nil) type statusAggregator interface { Subscribe(scope status.Scope, verbosity status.Verbosity) (<-chan *status.AggregateStatus, status.UnsubscribeFunc) + RecordStatus(source *componentstatus.InstanceID, event *componentstatus.Event) +} + +type eventSourcePair struct { + source *componentstatus.InstanceID + event *componentstatus.Event } type opampAgent struct { @@ -67,7 +73,10 @@ type opampAgent struct { statusAggregator statusAggregator statusSubscriptionWg *sync.WaitGroup + componentHealthWg *sync.WaitGroup startTimeUnixNano uint64 + componentStatusCh chan *eventSourcePair + readyCh chan interface{} } var _ 
opampcustommessages.CustomCapabilityRegistry = (*opampAgent)(nil) @@ -196,6 +205,7 @@ func (o *opampAgent) Register(capability string, opts ...opampcustommessages.Cus func (o *opampAgent) Ready() error { o.setHealth(&protobufs.ComponentHealth{Healthy: true}) + close(o.readyCh) return nil } @@ -204,6 +214,27 @@ func (o *opampAgent) NotReady() error { return nil } +// ComponentStatusChanged implements the componentstatus.Watcher interface. +func (o *opampAgent) ComponentStatusChanged( + source *componentstatus.InstanceID, + event *componentstatus.Event, +) { + // There can be late arriving events after shutdown. We need to close + // the event channel so that this function doesn't block and we release all + // goroutines, but attempting to write to a closed channel will panic; log + // and recover. + defer func() { + if r := recover(); r != nil { + o.logger.Info( + "discarding event received after shutdown", + zap.Any("source", source), + zap.Any("event", event), + ) + } + }() + o.componentStatusCh <- &eventSourcePair{source: source, event: event} +} + func (o *opampAgent) updateEffectiveConfig(conf *confmap.Conf) { o.eclk.Lock() defer o.eclk.Unlock() @@ -256,6 +287,8 @@ func newOpampAgent(cfg *Config, set extension.Settings) (*opampAgent, error) { capabilities: cfg.Capabilities, opampClient: opampClient, statusSubscriptionWg: &sync.WaitGroup{}, + componentHealthWg: &sync.WaitGroup{}, + readyCh: make(chan interface{}), customCapabilityRegistry: newCustomCapabilityRegistry(set.Logger, opampClient), } @@ -396,37 +429,85 @@ func (o *opampAgent) initHealthReporting() { } o.setHealth(&protobufs.ComponentHealth{Healthy: false}) - statusChan, unsubscribeFunc := o.statusAggregator.Subscribe(status.ScopeAll, status.Verbose) + o.componentStatusCh = make(chan *eventSourcePair) + o.componentHealthWg.Add(1) + go o.componentHealthEventLoop() + statusChan, unsubscribeFunc := o.statusAggregator.Subscribe(status.ScopeAll, status.Verbose) o.statusSubscriptionWg.Add(1) - go func() { - 
for { - select { - case <-o.lifetimeCtx.Done(): - unsubscribeFunc() - o.statusSubscriptionWg.Done() + go o.statusAggregatorEventLoop(unsubscribeFunc, statusChan) +} + +func (o *opampAgent) componentHealthEventLoop() { + // Record events with component.StatusStarting, but queue other events until + // PipelineWatcher.Ready is called. This prevents aggregate statuses from + // flapping between StatusStarting and StatusOK as components are started + // individually by the service. + var eventQueue []*eventSourcePair + + defer o.componentHealthWg.Done() + for loop := true; loop; { + select { + case esp, ok := <-o.componentStatusCh: + if !ok { return - case statusUpdate, ok := <-statusChan: - if !ok { - unsubscribeFunc() - o.statusSubscriptionWg.Done() - return - } + } + if esp.event.Status() != componentstatus.StatusStarting { + eventQueue = append(eventQueue, esp) + continue + } + o.statusAggregator.RecordStatus(esp.source, esp.event) + case <-o.readyCh: + for _, esp := range eventQueue { + o.statusAggregator.RecordStatus(esp.source, esp.event) + } + eventQueue = nil + loop = false + case <-o.lifetimeCtx.Done(): + return + } + } - if statusUpdate == nil { - continue - } + // After PipelineWatcher.Ready, record statuses as they are received. 
+ for { + select { + case esp, ok := <-o.componentStatusCh: + if !ok { + return + } + o.statusAggregator.RecordStatus(esp.source, esp.event) + case <-o.lifetimeCtx.Done(): + return + } + } +} - componentHealth := convertOverallStatus(statusUpdate) - componentHealth.ComponentHealthMap = convertComponentStatusMap(statusUpdate.ComponentStatusMap) +func (o *opampAgent) statusAggregatorEventLoop(unsubscribeFunc status.UnsubscribeFunc, statusChan <-chan *status.AggregateStatus) { + defer func() { + unsubscribeFunc() + o.statusSubscriptionWg.Done() + }() + for { + select { + case <-o.lifetimeCtx.Done(): + return + case statusUpdate, ok := <-statusChan: + if !ok { + return + } - o.setHealth(componentHealth) + if statusUpdate == nil { + continue } + + componentHealth := convertComponentHealth(statusUpdate) + + o.setHealth(componentHealth) } - }() + } } -func convertOverallStatus(statusUpdate *status.AggregateStatus) *protobufs.ComponentHealth { +func convertComponentHealth(statusUpdate *status.AggregateStatus) *protobufs.ComponentHealth { var isHealthy bool if statusUpdate.Status() == componentstatus.StatusOK { isHealthy = true @@ -443,14 +524,13 @@ func convertOverallStatus(statusUpdate *status.AggregateStatus) *protobufs.Compo if statusUpdate.Err() != nil { componentHealth.LastError = statusUpdate.Err().Error() } - return componentHealth -} -func convertComponentStatusMap(statusMap map[string]*status.AggregateStatus) map[string]*protobufs.ComponentHealth { - res := map[string]*protobufs.ComponentHealth{} - - for comp, stat := range statusMap { - res[comp] = convertOverallStatus(stat) + if len(statusUpdate.ComponentStatusMap) > 0 { + componentHealth.ComponentHealthMap = map[string]*protobufs.ComponentHealth{} + for comp, compState := range statusUpdate.ComponentStatusMap { + componentHealth.ComponentHealthMap[comp] = convertComponentHealth(compState) + } } - return res + + return componentHealth } diff --git a/extension/opampextension/opamp_agent_test.go 
b/extension/opampextension/opamp_agent_test.go index 1baef08503e6..b566d4cc3fd9 100644 --- a/extension/opampextension/opamp_agent_test.go +++ b/extension/opampextension/opamp_agent_test.go @@ -6,6 +6,7 @@ package opampextension import ( "context" "fmt" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/status/testhelpers" "os" "path/filepath" "runtime" @@ -212,7 +213,7 @@ func TestStart(t *testing.T) { assert.NoError(t, o.Shutdown(context.TODO())) } -func TestHealthReporting(t *testing.T) { +func TestHealthReportingReceiveUpdateFromAggregator(t *testing.T) { cfg := createDefaultConfig() set := extensiontest.NewNopSettings() o, err := newOpampAgent(cfg.(*Config), set) @@ -318,6 +319,73 @@ func TestHealthReporting(t *testing.T) { require.True(t, sa.unsubscribed) } +func TestHealthReportingForwardComponentHealthToAggregator(t *testing.T) { + cfg := createDefaultConfig() + set := extensiontest.NewNopSettings() + o, err := newOpampAgent(cfg.(*Config), set) + assert.NoError(t, err) + + sa := &mockStatusAggregator{} + o.statusAggregator = sa + + assert.NoError(t, o.Start(context.TODO(), componenttest.NewNopHost())) + + traces := testhelpers.NewPipelineMetadata("traces") + + // StatusStarting will be sent immediately. + for _, id := range traces.InstanceIDs() { + o.ComponentStatusChanged(id, componentstatus.NewEvent(componentstatus.StatusStarting)) + } + + // StatusOK will be queued until the PipelineWatcher Ready method is called. 
+ for _, id := range traces.InstanceIDs() { + o.ComponentStatusChanged(id, componentstatus.NewEvent(componentstatus.StatusOK)) + } + + // verify we have received the StatusStarting events + require.Eventually(t, func() bool { + return len(sa.receivedEvents) == len(traces.InstanceIDs()) + }, 5*time.Second, 100*time.Millisecond) + + for _, event := range sa.receivedEvents { + require.Equal(t, componentstatus.NewEvent(componentstatus.StatusStarting).Status(), event.event.Status()) + } + + // clean the received events of the mocked status aggregator + sa.receivedEvents = nil + + err = o.Ready() + require.NoError(t, err) + + // verify we have received the StatusOK events that have been queued while the agent has not been ready + require.Eventually(t, func() bool { + return len(sa.receivedEvents) == len(traces.InstanceIDs()) + }, 5*time.Second, 100*time.Millisecond) + + for _, event := range sa.receivedEvents { + require.Equal(t, componentstatus.NewEvent(componentstatus.StatusOK).Status(), event.event.Status()) + } + + // clean the received events of the mocked status aggregator + sa.receivedEvents = nil + + // send another set of events - these should be passed through immediately + for _, id := range traces.InstanceIDs() { + o.ComponentStatusChanged(id, componentstatus.NewEvent(componentstatus.StatusStopping)) + } + + require.Eventually(t, func() bool { + return len(sa.receivedEvents) == len(traces.InstanceIDs()) + }, 5*time.Second, 100*time.Millisecond) + + for _, event := range sa.receivedEvents { + require.Equal(t, componentstatus.NewEvent(componentstatus.StatusStopping).Status(), event.event.Status()) + } + + assert.NoError(t, o.Shutdown(context.TODO())) + require.True(t, sa.unsubscribed) +} + func TestHealthReportingExitsOnClosedContext(t *testing.T) { cfg := createDefaultConfig() set := extensiontest.NewNopSettings() @@ -486,8 +554,9 @@ func TestOpAMPAgent_Dependencies(t *testing.T) { } type mockStatusAggregator struct { - statusChan chan *status.AggregateStatus 
- unsubscribed bool + statusChan chan *status.AggregateStatus + receivedEvents []eventSourcePair + unsubscribed bool } func (m *mockStatusAggregator) Subscribe(_ status.Scope, _ status.Verbosity) (<-chan *status.AggregateStatus, status.UnsubscribeFunc) { @@ -496,6 +565,13 @@ func (m *mockStatusAggregator) Subscribe(_ status.Scope, _ status.Verbosity) (<- } } +func (m *mockStatusAggregator) RecordStatus(source *componentstatus.InstanceID, event *componentstatus.Event) { + m.receivedEvents = append(m.receivedEvents, eventSourcePair{ + source: source, + event: event, + }) +} + type mockOpAMPClient struct { setHealthFunc func(health *protobufs.ComponentHealth) error } From 38ecfee413f7e290224cb152e28a29ca610f13ac Mon Sep 17 00:00:00 2001 From: Florian Bacher Date: Mon, 28 Oct 2024 08:32:20 +0100 Subject: [PATCH 07/25] fix linting Signed-off-by: Florian Bacher --- extension/opampextension/opamp_agent.go | 4 ++-- extension/opampextension/opamp_agent_test.go | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/extension/opampextension/opamp_agent.go b/extension/opampextension/opamp_agent.go index c246bcddc1f9..b26e4f202ddb 100644 --- a/extension/opampextension/opamp_agent.go +++ b/extension/opampextension/opamp_agent.go @@ -79,7 +79,7 @@ type opampAgent struct { componentHealthWg *sync.WaitGroup startTimeUnixNano uint64 componentStatusCh chan *eventSourcePair - readyCh chan interface{} + readyCh chan struct{} } var _ opampcustommessages.CustomCapabilityRegistry = (*opampAgent)(nil) @@ -291,7 +291,7 @@ func newOpampAgent(cfg *Config, set extension.Settings) (*opampAgent, error) { opampClient: opampClient, statusSubscriptionWg: &sync.WaitGroup{}, componentHealthWg: &sync.WaitGroup{}, - readyCh: make(chan interface{}), + readyCh: make(chan struct{}), customCapabilityRegistry: newCustomCapabilityRegistry(set.Logger, opampClient), } diff --git a/extension/opampextension/opamp_agent_test.go b/extension/opampextension/opamp_agent_test.go index 
01830b486da8..1b4cc6f4bebc 100644 --- a/extension/opampextension/opamp_agent_test.go +++ b/extension/opampextension/opamp_agent_test.go @@ -6,7 +6,6 @@ package opampextension import ( "context" "fmt" - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/status/testhelpers" "os" "path/filepath" "runtime" @@ -25,9 +24,10 @@ import ( "go.opentelemetry.io/collector/confmap/confmaptest" "go.opentelemetry.io/collector/extension/extensiontest" semconv "go.opentelemetry.io/collector/semconv/v1.27.0" + "go.uber.org/zap" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/status" - "go.uber.org/zap" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/status/testhelpers" ) func TestNewOpampAgent(t *testing.T) { From 2b829b08998de005839d5351ee87977a148bdf39 Mon Sep 17 00:00:00 2001 From: Florian Bacher Date: Mon, 28 Oct 2024 09:53:04 +0100 Subject: [PATCH 08/25] fix data race Signed-off-by: Florian Bacher --- extension/opampextension/opamp_agent_test.go | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/extension/opampextension/opamp_agent_test.go b/extension/opampextension/opamp_agent_test.go index 1b4cc6f4bebc..173c98f88697 100644 --- a/extension/opampextension/opamp_agent_test.go +++ b/extension/opampextension/opamp_agent_test.go @@ -330,7 +330,11 @@ func TestHealthReportingForwardComponentHealthToAggregator(t *testing.T) { o, err := newOpampAgent(cfg.(*Config), set) assert.NoError(t, err) - sa := &mockStatusAggregator{} + mtx := &sync.RWMutex{} + + sa := &mockStatusAggregator{ + mtx: mtx, + } o.statusAggregator = sa assert.NoError(t, o.Start(context.TODO(), componenttest.NewNopHost())) @@ -349,6 +353,8 @@ func TestHealthReportingForwardComponentHealthToAggregator(t *testing.T) { // verify we have received the StatusStarting events require.Eventually(t, func() bool { + mtx.RLock() + defer mtx.RUnlock() return len(sa.receivedEvents) == len(traces.InstanceIDs()) }, 5*time.Second, 100*time.Millisecond) @@ -364,6 
+370,8 @@ func TestHealthReportingForwardComponentHealthToAggregator(t *testing.T) { // verify we have received the StatusOK events that have been queued while the agent has not been ready require.Eventually(t, func() bool { + mtx.RLock() + defer mtx.RUnlock() return len(sa.receivedEvents) == len(traces.InstanceIDs()) }, 5*time.Second, 100*time.Millisecond) @@ -380,6 +388,8 @@ func TestHealthReportingForwardComponentHealthToAggregator(t *testing.T) { } require.Eventually(t, func() bool { + mtx.RLock() + defer mtx.RUnlock() return len(sa.receivedEvents) == len(traces.InstanceIDs()) }, 5*time.Second, 100*time.Millisecond) @@ -562,6 +572,7 @@ type mockStatusAggregator struct { statusChan chan *status.AggregateStatus receivedEvents []eventSourcePair unsubscribed bool + mtx *sync.RWMutex } func (m *mockStatusAggregator) Subscribe(_ status.Scope, _ status.Verbosity) (<-chan *status.AggregateStatus, status.UnsubscribeFunc) { @@ -571,6 +582,8 @@ func (m *mockStatusAggregator) Subscribe(_ status.Scope, _ status.Verbosity) (<- } func (m *mockStatusAggregator) RecordStatus(source *componentstatus.InstanceID, event *componentstatus.Event) { + m.mtx.Lock() + defer m.mtx.Unlock() m.receivedEvents = append(m.receivedEvents, eventSourcePair{ source: source, event: event, From 7a47fb88e58cbe4f72dbc4a567d0580fc8c022e7 Mon Sep 17 00:00:00 2001 From: Florian Bacher Date: Mon, 28 Oct 2024 13:07:46 +0100 Subject: [PATCH 09/25] ensure the component status event loop is started before updates are sent to opamp agent Signed-off-by: Florian Bacher --- extension/opampextension/opamp_agent.go | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/extension/opampextension/opamp_agent.go b/extension/opampextension/opamp_agent.go index b26e4f202ddb..e976edea63da 100644 --- a/extension/opampextension/opamp_agent.go +++ b/extension/opampextension/opamp_agent.go @@ -101,8 +101,6 @@ func (o *opampAgent) Start(ctx context.Context, host component.Host) error { return err } - 
o.lifetimeCtx, o.lifetimeCtxCancel = context.WithCancel(context.Background()) - if o.cfg.PPID != 0 { go monitorPPID(o.lifetimeCtx, o.cfg.PPIDPollInterval, o.cfg.PPID, o.reportFunc) } @@ -295,8 +293,16 @@ func newOpampAgent(cfg *Config, set extension.Settings) (*opampAgent, error) { customCapabilityRegistry: newCustomCapabilityRegistry(set.Logger, opampClient), } + agent.lifetimeCtx, agent.lifetimeCtxCancel = context.WithCancel(context.Background()) + if agent.capabilities.ReportsHealth { agent.statusAggregator = status.NewAggregator(status.PriorityPermanent) + + // Start processing events in the background so that our status watcher doesn't + // block others before the extension starts. + agent.componentStatusCh = make(chan *eventSourcePair) + agent.componentHealthWg.Add(1) + go agent.componentHealthEventLoop() } return agent, nil @@ -452,10 +458,6 @@ func (o *opampAgent) initHealthReporting() { } o.setHealth(&protobufs.ComponentHealth{Healthy: false}) - o.componentStatusCh = make(chan *eventSourcePair) - o.componentHealthWg.Add(1) - go o.componentHealthEventLoop() - statusChan, unsubscribeFunc := o.statusAggregator.Subscribe(status.ScopeAll, status.Verbose) o.statusSubscriptionWg.Add(1) go o.statusAggregatorEventLoop(unsubscribeFunc, statusChan) From 6cd557f6f7b4dca4f26199cc536ccb61c23a170d Mon Sep 17 00:00:00 2001 From: Florian Bacher Date: Mon, 28 Oct 2024 13:50:18 +0100 Subject: [PATCH 10/25] increase test timeout Signed-off-by: Florian Bacher --- cmd/opampsupervisor/e2e_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/opampsupervisor/e2e_test.go b/cmd/opampsupervisor/e2e_test.go index 5b6a7ac4adbf..0284f85e428e 100644 --- a/cmd/opampsupervisor/e2e_test.go +++ b/cmd/opampsupervisor/e2e_test.go @@ -938,7 +938,7 @@ func TestSupervisorRestartCommand(t *testing.T) { return health.Healthy && health.LastError == "" } return false - }, 10*time.Second, 250*time.Millisecond, "Collector never reported healthy after restart") + }, 
20*time.Second, 250*time.Millisecond, "Collector never reported healthy after restart") } func TestSupervisorOpAMPConnectionSettings(t *testing.T) { From 1f8ea967a2fc6ca2387bd547e80e52384d14cd4f Mon Sep 17 00:00:00 2001 From: Florian Bacher Date: Tue, 19 Nov 2024 13:06:37 +0100 Subject: [PATCH 11/25] fix linting Signed-off-by: Florian Bacher --- extension/opampextension/opamp_agent.go | 1 - 1 file changed, 1 deletion(-) diff --git a/extension/opampextension/opamp_agent.go b/extension/opampextension/opamp_agent.go index d88551082e00..d9cc50e42c76 100644 --- a/extension/opampextension/opamp_agent.go +++ b/extension/opampextension/opamp_agent.go @@ -425,7 +425,6 @@ func (o *opampAgent) onMessage(_ context.Context, msg *types.MessageData) { func (o *opampAgent) setHealth(ch *protobufs.ComponentHealth) { if o.capabilities.ReportsHealth && o.opampClient != nil { - if ch.Healthy && o.startTimeUnixNano == 0 { ch.StartTimeUnixNano = ch.StatusTimeUnixNano } else { From 21017488ece9ae4427f931f90c09fbff805e4d30 Mon Sep 17 00:00:00 2001 From: Florian Bacher Date: Wed, 20 Nov 2024 10:21:11 +0100 Subject: [PATCH 12/25] consolidate health reporting setup into initHealthReporting function Signed-off-by: Florian Bacher --- extension/opampextension/opamp_agent.go | 21 +++--- extension/opampextension/opamp_agent_test.go | 75 +++++++++++++++----- 2 files changed, 68 insertions(+), 28 deletions(-) diff --git a/extension/opampextension/opamp_agent.go b/extension/opampextension/opamp_agent.go index d9cc50e42c76..f9146971eeb0 100644 --- a/extension/opampextension/opamp_agent.go +++ b/extension/opampextension/opamp_agent.go @@ -142,8 +142,6 @@ func (o *opampAgent) Start(ctx context.Context, host component.Host) error { return err } - o.initHealthReporting() - o.logger.Debug("Starting OpAMP client...") if err := o.opampClient.Start(context.Background(), settings); err != nil { @@ -296,13 +294,7 @@ func newOpampAgent(cfg *Config, set extension.Settings) (*opampAgent, error) { 
agent.lifetimeCtx, agent.lifetimeCtxCancel = context.WithCancel(context.Background()) if agent.capabilities.ReportsHealth { - agent.statusAggregator = status.NewAggregator(status.PriorityPermanent) - - // Start processing events in the background so that our status watcher doesn't - // block others before the extension starts. - agent.componentStatusCh = make(chan *eventSourcePair) - agent.componentHealthWg.Add(1) - go agent.componentHealthEventLoop() + agent.initHealthReporting() } return agent, nil @@ -460,9 +452,18 @@ func (o *opampAgent) initHealthReporting() { } o.setHealth(&protobufs.ComponentHealth{Healthy: false}) + if o.statusAggregator == nil { + o.statusAggregator = status.NewAggregator(status.PriorityPermanent) + } statusChan, unsubscribeFunc := o.statusAggregator.Subscribe(status.ScopeAll, status.Verbose) o.statusSubscriptionWg.Add(1) go o.statusAggregatorEventLoop(unsubscribeFunc, statusChan) + + // Start processing events in the background so that our status watcher doesn't + // block others before the extension starts. 
+ o.componentStatusCh = make(chan *eventSourcePair) + o.componentHealthWg.Add(1) + go o.componentHealthEventLoop() } func (o *opampAgent) componentHealthEventLoop() { @@ -523,7 +524,7 @@ func (o *opampAgent) statusAggregatorEventLoop(unsubscribeFunc status.Unsubscrib return } - if statusUpdate == nil { + if statusUpdate == nil || statusUpdate.Status() == componentstatus.StatusNone { continue } diff --git a/extension/opampextension/opamp_agent_test.go b/extension/opampextension/opamp_agent_test.go index 71489a42fcf5..da547b098f1b 100644 --- a/extension/opampextension/opamp_agent_test.go +++ b/extension/opampextension/opamp_agent_test.go @@ -6,6 +6,7 @@ package opampextension import ( "context" "fmt" + "go.opentelemetry.io/collector/extension" "os" "path/filepath" "runtime" @@ -43,6 +44,7 @@ func TestNewOpampAgent(t *testing.T) { assert.True(t, o.capabilities.ReportsHealth) assert.Empty(t, o.effectiveConfig) assert.Nil(t, o.agentDescription) + assert.NoError(t, o.Shutdown(context.TODO())) } func TestNewOpampAgentAttributes(t *testing.T) { @@ -57,6 +59,7 @@ func TestNewOpampAgentAttributes(t *testing.T) { assert.Equal(t, "otelcol-distro", o.agentType) assert.Equal(t, "distro.0", o.agentVersion) assert.Equal(t, "f8999bc1-4c9b-4619-9bae-7f009d2411ec", o.instanceID.String()) + assert.NoError(t, o.Shutdown(context.TODO())) } func TestCreateAgentDescription(t *testing.T) { @@ -155,6 +158,7 @@ func TestCreateAgentDescription(t *testing.T) { err = o.createAgentDescription() assert.NoError(t, err) require.Equal(t, tc.expected, o.agentDescription) + assert.NoError(t, o.Shutdown(context.TODO())) }) } } @@ -173,6 +177,7 @@ func TestUpdateAgentIdentity(t *testing.T) { o.updateAgentIdentity(uid) assert.Equal(t, o.instanceID, uid) + assert.NoError(t, o.Shutdown(context.TODO())) } func TestComposeEffectiveConfig(t *testing.T) { @@ -196,6 +201,8 @@ func TestComposeEffectiveConfig(t *testing.T) { assert.NotNil(t, ec) assert.YAMLEq(t, string(expected), 
string(ec.ConfigMap.ConfigMap[""].Body)) assert.Equal(t, "text/yaml", ec.ConfigMap.ConfigMap[""].ContentType) + + assert.NoError(t, o.Shutdown(context.TODO())) } func TestShutdown(t *testing.T) { @@ -219,16 +226,10 @@ func TestStart(t *testing.T) { } func TestHealthReportingReceiveUpdateFromAggregator(t *testing.T) { - cfg := createDefaultConfig() + cfg := createDefaultConfig().(*Config) set := extensiontest.NewNopSettings() - o, err := newOpampAgent(cfg.(*Config), set) - assert.NoError(t, err) statusUpdateChannel := make(chan *status.AggregateStatus) - sa := &mockStatusAggregator{ - statusChan: statusUpdateChannel, - } - o.statusAggregator = sa mtx := &sync.RWMutex{} now := time.Now() @@ -266,7 +267,7 @@ func TestHealthReportingReceiveUpdateFromAggregator(t *testing.T) { } receivedHealthUpdates := 0 - o.opampClient = &mockOpAMPClient{ + mockOpampClient := &mockOpAMPClient{ setHealthFunc: func(health *protobufs.ComponentHealth) error { mtx.Lock() defer mtx.Unlock() @@ -276,6 +277,14 @@ func TestHealthReportingReceiveUpdateFromAggregator(t *testing.T) { }, } + sa := &mockStatusAggregator{ + statusChan: statusUpdateChannel, + } + + o := newTestOpampAgent(cfg, set, mockOpampClient, sa) + + o.initHealthReporting() + assert.NoError(t, o.Start(context.TODO(), componenttest.NewNopHost())) statusUpdateChannel <- nil @@ -325,17 +334,25 @@ func TestHealthReportingReceiveUpdateFromAggregator(t *testing.T) { } func TestHealthReportingForwardComponentHealthToAggregator(t *testing.T) { - cfg := createDefaultConfig() + cfg := createDefaultConfig().(*Config) set := extensiontest.NewNopSettings() - o, err := newOpampAgent(cfg.(*Config), set) - assert.NoError(t, err) mtx := &sync.RWMutex{} sa := &mockStatusAggregator{ mtx: mtx, } - o.statusAggregator = sa + + o := newTestOpampAgent( + cfg, + set, + &mockOpAMPClient{ + setHealthFunc: func(_ *protobufs.ComponentHealth) error { + return nil + }, + }, sa) + + o.initHealthReporting() assert.NoError(t, o.Start(context.TODO(), 
componenttest.NewNopHost())) @@ -365,7 +382,7 @@ func TestHealthReportingForwardComponentHealthToAggregator(t *testing.T) { // clean the received events of the mocked status aggregator sa.receivedEvents = nil - err = o.Ready() + err := o.Ready() require.NoError(t, err) // verify we have received the StatusOK events that have been queued while the agent has not been ready @@ -402,16 +419,13 @@ func TestHealthReportingForwardComponentHealthToAggregator(t *testing.T) { } func TestHealthReportingExitsOnClosedContext(t *testing.T) { - cfg := createDefaultConfig() + cfg := createDefaultConfig().(*Config) set := extensiontest.NewNopSettings() - o, err := newOpampAgent(cfg.(*Config), set) - assert.NoError(t, err) statusUpdateChannel := make(chan *status.AggregateStatus) sa := &mockStatusAggregator{ statusChan: statusUpdateChannel, } - o.statusAggregator = sa mtx := &sync.RWMutex{} now := time.Now() @@ -435,7 +449,7 @@ func TestHealthReportingExitsOnClosedContext(t *testing.T) { } receivedHealthUpdates := 0 - o.opampClient = &mockOpAMPClient{ + mockOpampClient := &mockOpAMPClient{ setHealthFunc: func(health *protobufs.ComponentHealth) error { mtx.Lock() defer mtx.Unlock() @@ -445,6 +459,10 @@ func TestHealthReportingExitsOnClosedContext(t *testing.T) { }, } + o := newTestOpampAgent(cfg, set, mockOpampClient, sa) + + o.initHealthReporting() + assert.NoError(t, o.Start(context.TODO(), componenttest.NewNopHost())) statusUpdateChannel <- nil @@ -657,3 +675,24 @@ func (m mockStatusEvent) Err() error { func (m mockStatusEvent) Timestamp() time.Time { return m.timestamp } + +func newTestOpampAgent(cfg *Config, set extension.Settings, mockOpampClient *mockOpAMPClient, sa *mockStatusAggregator) *opampAgent { + uid := uuid.New() + o := &opampAgent{ + cfg: cfg, + logger: set.Logger, + agentType: set.BuildInfo.Command, + agentVersion: set.BuildInfo.Version, + instanceID: uid, + capabilities: cfg.Capabilities, + opampClient: mockOpampClient, + statusSubscriptionWg: &sync.WaitGroup{}, + 
componentHealthWg: &sync.WaitGroup{}, + readyCh: make(chan struct{}), + customCapabilityRegistry: newCustomCapabilityRegistry(set.Logger, mockOpampClient), + statusAggregator: sa, + } + + o.lifetimeCtx, o.lifetimeCtxCancel = context.WithCancel(context.Background()) + return o +} From c4005bfc09a22cee04aa43d24c132daaeb9ea179 Mon Sep 17 00:00:00 2001 From: Florian Bacher Date: Wed, 20 Nov 2024 10:54:44 +0100 Subject: [PATCH 13/25] fix linting Signed-off-by: Florian Bacher --- extension/opampextension/opamp_agent_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/extension/opampextension/opamp_agent_test.go b/extension/opampextension/opamp_agent_test.go index da547b098f1b..262b5c4c9ca4 100644 --- a/extension/opampextension/opamp_agent_test.go +++ b/extension/opampextension/opamp_agent_test.go @@ -6,7 +6,6 @@ package opampextension import ( "context" "fmt" - "go.opentelemetry.io/collector/extension" "os" "path/filepath" "runtime" @@ -23,6 +22,7 @@ import ( "go.opentelemetry.io/collector/component/componentstatus" "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/confmap/confmaptest" + "go.opentelemetry.io/collector/extension" "go.opentelemetry.io/collector/extension/extensiontest" semconv "go.opentelemetry.io/collector/semconv/v1.27.0" "go.uber.org/zap" From 4fdaa6c8b4b03b2008976899cf043e39cd1ae051 Mon Sep 17 00:00:00 2001 From: Florian Bacher Date: Thu, 5 Dec 2024 09:04:22 +0100 Subject: [PATCH 14/25] address pr feedback Signed-off-by: Florian Bacher --- extension/opampextension/factory_test.go | 2 ++ extension/opampextension/opamp_agent.go | 2 ++ extension/opampextension/opamp_agent_test.go | 32 ++++++++++---------- 3 files changed, 20 insertions(+), 16 deletions(-) diff --git a/extension/opampextension/factory_test.go b/extension/opampextension/factory_test.go index 5f763ab06f9f..eaa4d40c6d29 100644 --- a/extension/opampextension/factory_test.go +++ b/extension/opampextension/factory_test.go @@ -21,6 +21,7 
@@ func TestFactory_CreateDefaultConfig(t *testing.T) { ext, err := createExtension(context.Background(), extensiontest.NewNopSettings(), cfg) require.NoError(t, err) require.NotNil(t, ext) + require.NoError(t, ext.Shutdown(context.Background())) } func TestFactory_Create(t *testing.T) { @@ -28,4 +29,5 @@ func TestFactory_Create(t *testing.T) { ext, err := createExtension(context.Background(), extensiontest.NewNopSettings(), cfg) require.NoError(t, err) require.NotNil(t, ext) + require.NoError(t, ext.Shutdown(context.Background())) } diff --git a/extension/opampextension/opamp_agent.go b/extension/opampextension/opamp_agent.go index e6eda7828fe9..75cd7ff3038c 100644 --- a/extension/opampextension/opamp_agent.go +++ b/extension/opampextension/opamp_agent.go @@ -37,6 +37,7 @@ import ( ) var _ extensioncapabilities.PipelineWatcher = (*opampAgent)(nil) +var _ componentstatus.Watcher = (*opampAgent)(nil) type statusAggregator interface { Subscribe(scope status.Scope, verbosity status.Verbosity) (<-chan *status.AggregateStatus, status.UnsubscribeFunc) @@ -161,6 +162,7 @@ func (o *opampAgent) Shutdown(ctx context.Context) error { } o.statusSubscriptionWg.Wait() + o.componentHealthWg.Wait() o.logger.Debug("OpAMP agent shutting down...") if o.opampClient == nil { diff --git a/extension/opampextension/opamp_agent_test.go b/extension/opampextension/opamp_agent_test.go index 262b5c4c9ca4..7921bd767470 100644 --- a/extension/opampextension/opamp_agent_test.go +++ b/extension/opampextension/opamp_agent_test.go @@ -44,7 +44,7 @@ func TestNewOpampAgent(t *testing.T) { assert.True(t, o.capabilities.ReportsHealth) assert.Empty(t, o.effectiveConfig) assert.Nil(t, o.agentDescription) - assert.NoError(t, o.Shutdown(context.TODO())) + assert.NoError(t, o.Shutdown(context.Background())) } func TestNewOpampAgentAttributes(t *testing.T) { @@ -59,7 +59,7 @@ func TestNewOpampAgentAttributes(t *testing.T) { assert.Equal(t, "otelcol-distro", o.agentType) assert.Equal(t, "distro.0", 
o.agentVersion) assert.Equal(t, "f8999bc1-4c9b-4619-9bae-7f009d2411ec", o.instanceID.String()) - assert.NoError(t, o.Shutdown(context.TODO())) + assert.NoError(t, o.Shutdown(context.Background())) } func TestCreateAgentDescription(t *testing.T) { @@ -158,7 +158,7 @@ func TestCreateAgentDescription(t *testing.T) { err = o.createAgentDescription() assert.NoError(t, err) require.Equal(t, tc.expected, o.agentDescription) - assert.NoError(t, o.Shutdown(context.TODO())) + assert.NoError(t, o.Shutdown(context.Background())) }) } } @@ -177,7 +177,7 @@ func TestUpdateAgentIdentity(t *testing.T) { o.updateAgentIdentity(uid) assert.Equal(t, o.instanceID, uid) - assert.NoError(t, o.Shutdown(context.TODO())) + assert.NoError(t, o.Shutdown(context.Background())) } func TestComposeEffectiveConfig(t *testing.T) { @@ -202,7 +202,7 @@ func TestComposeEffectiveConfig(t *testing.T) { assert.YAMLEq(t, string(expected), string(ec.ConfigMap.ConfigMap[""].Body)) assert.Equal(t, "text/yaml", ec.ConfigMap.ConfigMap[""].ContentType) - assert.NoError(t, o.Shutdown(context.TODO())) + assert.NoError(t, o.Shutdown(context.Background())) } func TestShutdown(t *testing.T) { @@ -212,7 +212,7 @@ func TestShutdown(t *testing.T) { assert.NoError(t, err) // Shutdown with no OpAMP client - assert.NoError(t, o.Shutdown(context.TODO())) + assert.NoError(t, o.Shutdown(context.Background())) } func TestStart(t *testing.T) { @@ -221,8 +221,8 @@ func TestStart(t *testing.T) { o, err := newOpampAgent(cfg.(*Config), set) assert.NoError(t, err) - assert.NoError(t, o.Start(context.TODO(), componenttest.NewNopHost())) - assert.NoError(t, o.Shutdown(context.TODO())) + assert.NoError(t, o.Start(context.Background(), componenttest.NewNopHost())) + assert.NoError(t, o.Shutdown(context.Background())) } func TestHealthReportingReceiveUpdateFromAggregator(t *testing.T) { @@ -285,7 +285,7 @@ func TestHealthReportingReceiveUpdateFromAggregator(t *testing.T) { o.initHealthReporting() - assert.NoError(t, 
o.Start(context.TODO(), componenttest.NewNopHost())) + assert.NoError(t, o.Start(context.Background(), componenttest.NewNopHost())) statusUpdateChannel <- nil statusUpdateChannel <- &status.AggregateStatus{ @@ -329,7 +329,7 @@ func TestHealthReportingReceiveUpdateFromAggregator(t *testing.T) { return receivedHealthUpdates == len(expectedHealthUpdates) }, 1*time.Second, 100*time.Millisecond) - assert.NoError(t, o.Shutdown(context.TODO())) + assert.NoError(t, o.Shutdown(context.Background())) require.True(t, sa.unsubscribed) } @@ -354,7 +354,7 @@ func TestHealthReportingForwardComponentHealthToAggregator(t *testing.T) { o.initHealthReporting() - assert.NoError(t, o.Start(context.TODO(), componenttest.NewNopHost())) + assert.NoError(t, o.Start(context.Background(), componenttest.NewNopHost())) traces := testhelpers.NewPipelineMetadata("traces") @@ -414,7 +414,7 @@ func TestHealthReportingForwardComponentHealthToAggregator(t *testing.T) { require.Equal(t, componentstatus.NewEvent(componentstatus.StatusStopping).Status(), event.event.Status()) } - assert.NoError(t, o.Shutdown(context.TODO())) + assert.NoError(t, o.Shutdown(context.Background())) require.True(t, sa.unsubscribed) } @@ -463,7 +463,7 @@ func TestHealthReportingExitsOnClosedContext(t *testing.T) { o.initHealthReporting() - assert.NoError(t, o.Start(context.TODO(), componenttest.NewNopHost())) + assert.NoError(t, o.Start(context.Background(), componenttest.NewNopHost())) statusUpdateChannel <- nil statusUpdateChannel <- &status.AggregateStatus{ @@ -490,7 +490,7 @@ func TestHealthReportingExitsOnClosedContext(t *testing.T) { }, 1*time.Second, 100*time.Millisecond) // invoke Shutdown before health update channel has been closed - assert.NoError(t, o.Shutdown(context.TODO())) + assert.NoError(t, o.Shutdown(context.Background())) require.True(t, sa.unsubscribed) } @@ -508,8 +508,8 @@ func TestHealthReportingDisabled(t *testing.T) { }, } - assert.NoError(t, o.Start(context.TODO(), componenttest.NewNopHost())) - 
assert.NoError(t, o.Shutdown(context.TODO())) + assert.NoError(t, o.Start(context.Background(), componenttest.NewNopHost())) + assert.NoError(t, o.Shutdown(context.Background())) } func TestParseInstanceIDString(t *testing.T) { From 7c0ecab22ac4c33572346f5fd6e5f6ffb4c81bc7 Mon Sep 17 00:00:00 2001 From: Florian Bacher Date: Thu, 5 Dec 2024 09:43:58 +0100 Subject: [PATCH 15/25] fix linting Signed-off-by: Florian Bacher --- extension/opampextension/opamp_agent.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/extension/opampextension/opamp_agent.go b/extension/opampextension/opamp_agent.go index 75cd7ff3038c..410b2f82f13c 100644 --- a/extension/opampextension/opamp_agent.go +++ b/extension/opampextension/opamp_agent.go @@ -36,8 +36,10 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/status" ) -var _ extensioncapabilities.PipelineWatcher = (*opampAgent)(nil) -var _ componentstatus.Watcher = (*opampAgent)(nil) +var ( + _ extensioncapabilities.PipelineWatcher = (*opampAgent)(nil) + _ componentstatus.Watcher = (*opampAgent)(nil) +) type statusAggregator interface { Subscribe(scope status.Scope, verbosity status.Verbosity) (<-chan *status.AggregateStatus, status.UnsubscribeFunc) From 9867ff8786011b1c15cdcb3fd68bc1e8507b09ad Mon Sep 17 00:00:00 2001 From: Florian Bacher Date: Thu, 5 Dec 2024 11:20:12 +0100 Subject: [PATCH 16/25] trigger CI Signed-off-by: Florian Bacher From 263958c121bbf418bf9ca5a7c3f9f1f5973a167c Mon Sep 17 00:00:00 2001 From: Florian Bacher Date: Thu, 5 Dec 2024 11:31:38 +0100 Subject: [PATCH 17/25] try to fix failing e2e test Signed-off-by: Florian Bacher --- extension/opampextension/opamp_agent.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/extension/opampextension/opamp_agent.go b/extension/opampextension/opamp_agent.go index 410b2f82f13c..5bae023e36a8 100644 --- a/extension/opampextension/opamp_agent.go +++ b/extension/opampextension/opamp_agent.go @@ -164,7 +164,8 @@ func (o 
*opampAgent) Shutdown(ctx context.Context) error { } o.statusSubscriptionWg.Wait() - o.componentHealthWg.Wait() + // TODO uncomment again + //o.componentHealthWg.Wait() o.logger.Debug("OpAMP agent shutting down...") if o.opampClient == nil { From 3b03c3e55ba083bc2bbbc97fbe903fad9029e75e Mon Sep 17 00:00:00 2001 From: Florian Bacher Date: Thu, 5 Dec 2024 12:39:12 +0100 Subject: [PATCH 18/25] increase timeout to wait before checking collector health endpoint Signed-off-by: Florian Bacher --- cmd/opampsupervisor/e2e_test.go | 2 +- extension/opampextension/opamp_agent.go | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/cmd/opampsupervisor/e2e_test.go b/cmd/opampsupervisor/e2e_test.go index 8abcb79b2ce1..ff9d41973d7b 100644 --- a/cmd/opampsupervisor/e2e_test.go +++ b/cmd/opampsupervisor/e2e_test.go @@ -1348,7 +1348,7 @@ func TestSupervisorStopsAgentProcessWithEmptyConfigMap(t *testing.T) { } // Verify the collector is not running after 250 ms by checking the healthcheck endpoint - time.Sleep(250 * time.Millisecond) + time.Sleep(1000 * time.Millisecond) _, err := http.DefaultClient.Get("http://localhost:12345") if runtime.GOOS != "windows" { require.ErrorContains(t, err, "connection refused") diff --git a/extension/opampextension/opamp_agent.go b/extension/opampextension/opamp_agent.go index 5bae023e36a8..410b2f82f13c 100644 --- a/extension/opampextension/opamp_agent.go +++ b/extension/opampextension/opamp_agent.go @@ -164,8 +164,7 @@ func (o *opampAgent) Shutdown(ctx context.Context) error { } o.statusSubscriptionWg.Wait() - // TODO uncomment again - //o.componentHealthWg.Wait() + o.componentHealthWg.Wait() o.logger.Debug("OpAMP agent shutting down...") if o.opampClient == nil { From 919261f64934e5eeeee1adf1fffbf9c4c4413dda Mon Sep 17 00:00:00 2001 From: Florian Bacher Date: Thu, 5 Dec 2024 12:55:59 +0100 Subject: [PATCH 19/25] trigger CI Signed-off-by: Florian Bacher From 96c6002dcc8cf22d462a8e46c10d654e4ae040b8 Mon Sep 17 00:00:00 2001 From: 
Florian Bacher Date: Thu, 5 Dec 2024 13:16:33 +0100 Subject: [PATCH 20/25] increase timeout in e2e test Signed-off-by: Florian Bacher --- cmd/opampsupervisor/e2e_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/opampsupervisor/e2e_test.go b/cmd/opampsupervisor/e2e_test.go index ff9d41973d7b..50003fbb2d4f 100644 --- a/cmd/opampsupervisor/e2e_test.go +++ b/cmd/opampsupervisor/e2e_test.go @@ -940,7 +940,7 @@ func TestSupervisorRestartCommand(t *testing.T) { return health.Healthy && health.LastError == "" } return false - }, 20*time.Second, 250*time.Millisecond, "Collector never reported healthy after restart") + }, 30*time.Second, 250*time.Millisecond, "Collector never reported healthy after restart") } func TestSupervisorOpAMPConnectionSettings(t *testing.T) { From 092721fb34f204622f0574e1d7d8eab6c2540c5c Mon Sep 17 00:00:00 2001 From: Florian Bacher Date: Thu, 5 Dec 2024 13:33:53 +0100 Subject: [PATCH 21/25] trigger CI Signed-off-by: Florian Bacher From 386afda9fa0d5b6e436d296318087885cab6184e Mon Sep 17 00:00:00 2001 From: Florian Bacher Date: Thu, 5 Dec 2024 14:15:08 +0100 Subject: [PATCH 22/25] trigger CI Signed-off-by: Florian Bacher From 548ee1756ff4e1db44de603c93ef7b3db7e772f1 Mon Sep 17 00:00:00 2001 From: Florian Bacher Date: Thu, 5 Dec 2024 14:56:49 +0100 Subject: [PATCH 23/25] trigger CI Signed-off-by: Florian Bacher From e0d9039966f47c1be644cc7210f825d70432b7df Mon Sep 17 00:00:00 2001 From: Florian Bacher Date: Thu, 5 Dec 2024 15:16:23 +0100 Subject: [PATCH 24/25] trigger CI Signed-off-by: Florian Bacher From bc6733c6a08057ead6abdfc050f95920e8838f9d Mon Sep 17 00:00:00 2001 From: Florian Bacher Date: Fri, 6 Dec 2024 06:56:15 +0100 Subject: [PATCH 25/25] move interface assertions Signed-off-by: Florian Bacher --- extension/opampextension/opamp_agent.go | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/extension/opampextension/opamp_agent.go b/extension/opampextension/opamp_agent.go index 
410b2f82f13c..c638e8727b05 100644 --- a/extension/opampextension/opamp_agent.go +++ b/extension/opampextension/opamp_agent.go @@ -36,11 +36,6 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/status" ) -var ( - _ extensioncapabilities.PipelineWatcher = (*opampAgent)(nil) - _ componentstatus.Watcher = (*opampAgent)(nil) -) - type statusAggregator interface { Subscribe(scope status.Scope, verbosity status.Verbosity) (<-chan *status.AggregateStatus, status.UnsubscribeFunc) RecordStatus(source *componentstatus.InstanceID, event *componentstatus.Event) @@ -89,6 +84,8 @@ var ( _ opampcustommessages.CustomCapabilityRegistry = (*opampAgent)(nil) _ extensioncapabilities.Dependent = (*opampAgent)(nil) _ extensioncapabilities.ConfigWatcher = (*opampAgent)(nil) + _ extensioncapabilities.PipelineWatcher = (*opampAgent)(nil) + _ componentstatus.Watcher = (*opampAgent)(nil) ) func (o *opampAgent) Start(ctx context.Context, host component.Host) error {