Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[extension/opampagent] use status subscription for fine granular health reporting #35892

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
d6cf1d0
[extension/opampagent] use status subscription for fine granular heal…
bacherfl Oct 21, 2024
f27572a
Merge branch 'main' into feat/35856/opamp-extension-health-report
bacherfl Oct 21, 2024
8be384e
add changelog entry
bacherfl Oct 21, 2024
d2fe6b2
Merge remote-tracking branch 'bacherfl/feat/35856/opamp-extension-hea…
bacherfl Oct 21, 2024
5fd8e8b
fix linting
bacherfl Oct 21, 2024
24b32e3
fix crosslinking
bacherfl Oct 21, 2024
cc815b4
go tidy
bacherfl Oct 21, 2024
e5711a6
Merge branch 'main' into feat/35856/opamp-extension-health-report
bacherfl Oct 22, 2024
2b172ae
receive component health updates and forward them to status aggregator
bacherfl Oct 25, 2024
257145a
Merge branch 'main' into feat/35856/opamp-extension-health-report
bacherfl Oct 28, 2024
38ecfee
fix linting
bacherfl Oct 28, 2024
2b829b0
fix data race
bacherfl Oct 28, 2024
7a47fb8
ensure the component status event loop is started before updates are …
bacherfl Oct 28, 2024
f280a71
Merge branch 'main' into feat/35856/opamp-extension-health-report
bacherfl Oct 28, 2024
6cd557f
increase test timeout
bacherfl Oct 28, 2024
7b92809
Merge remote-tracking branch 'bacherfl/feat/35856/opamp-extension-hea…
bacherfl Oct 28, 2024
06c6949
Merge branch 'main' into feat/35856/opamp-extension-health-report
bacherfl Nov 12, 2024
87a406b
Merge branch 'main' into feat/35856/opamp-extension-health-report
bacherfl Nov 19, 2024
1f8ea96
fix linting
bacherfl Nov 19, 2024
0ffdbc1
Merge branch 'main' into feat/35856/opamp-extension-health-report
bacherfl Nov 19, 2024
2101748
consolidate health reporting setup into initHealthReporting function
bacherfl Nov 20, 2024
c4005bf
fix linting
bacherfl Nov 20, 2024
bba9f4f
Merge branch 'main' into feat/35856/opamp-extension-health-report
bacherfl Nov 21, 2024
011877b
Merge branch 'main' into feat/35856/opamp-extension-health-report
bacherfl Nov 22, 2024
47c73d0
Merge branch 'main' into feat/35856/opamp-extension-health-report
bacherfl Dec 5, 2024
4fdaa6c
address pr feedback
bacherfl Dec 5, 2024
7c0ecab
fix linting
bacherfl Dec 5, 2024
9867ff8
trigger CI
bacherfl Dec 5, 2024
263958c
try to fix failing e2e test
bacherfl Dec 5, 2024
3b03c3e
increase timeout to wait before checking collector health endpoint
bacherfl Dec 5, 2024
919261f
trigger CI
bacherfl Dec 5, 2024
96c6002
increase timeout in e2e test
bacherfl Dec 5, 2024
092721f
trigger CI
bacherfl Dec 5, 2024
386afda
trigger CI
bacherfl Dec 5, 2024
2c5c524
Merge branch 'main' into feat/35856/opamp-extension-health-report
bacherfl Dec 5, 2024
548ee17
trigger CI
bacherfl Dec 5, 2024
e0d9039
trigger CI
bacherfl Dec 5, 2024
bc6733c
move interface assertions
bacherfl Dec 6, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions .chloggen/opamp-extension-health-reporting.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: opampextension

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Use status subscription for fine granular component health reporting

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [35856]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
4 changes: 2 additions & 2 deletions cmd/opampsupervisor/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -940,7 +940,7 @@ func TestSupervisorRestartCommand(t *testing.T) {
return health.Healthy && health.LastError == ""
}
return false
}, 10*time.Second, 250*time.Millisecond, "Collector never reported healthy after restart")
}, 30*time.Second, 250*time.Millisecond, "Collector never reported healthy after restart")
}

func TestSupervisorOpAMPConnectionSettings(t *testing.T) {
Expand Down Expand Up @@ -1348,7 +1348,7 @@ func TestSupervisorStopsAgentProcessWithEmptyConfigMap(t *testing.T) {
}

// Verify the collector is not running after 250 ms by checking the healthcheck endpoint
time.Sleep(250 * time.Millisecond)
time.Sleep(1000 * time.Millisecond)
_, err := http.DefaultClient.Get("http://localhost:12345")
if runtime.GOOS != "windows" {
require.ErrorContains(t, err, "connection refused")
Expand Down
2 changes: 2 additions & 0 deletions extension/opampextension/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@ func TestFactory_CreateDefaultConfig(t *testing.T) {
ext, err := createExtension(context.Background(), extensiontest.NewNopSettings(), cfg)
require.NoError(t, err)
require.NotNil(t, ext)
require.NoError(t, ext.Shutdown(context.Background()))
}

func TestFactory_Create(t *testing.T) {
cfg := NewFactory().CreateDefaultConfig()
ext, err := createExtension(context.Background(), extensiontest.NewNopSettings(), cfg)
require.NoError(t, err)
require.NotNil(t, ext)
require.NoError(t, ext.Shutdown(context.Background()))
}
3 changes: 3 additions & 0 deletions extension/opampextension/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ require (
github.com/oklog/ulid/v2 v2.1.0
github.com/open-telemetry/opamp-go v0.17.0
github.com/open-telemetry/opentelemetry-collector-contrib/extension/opampcustommessages v0.115.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/status v0.115.0
github.com/shirou/gopsutil/v4 v4.24.10
github.com/stretchr/testify v1.10.0
go.opentelemetry.io/collector/component v0.115.0
Expand Down Expand Up @@ -67,3 +68,5 @@ require (
)

replace github.com/open-telemetry/opentelemetry-collector-contrib/extension/opampcustommessages => ../opampcustommessages

replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/status => ../../pkg/status
180 changes: 175 additions & 5 deletions extension/opampextension/opamp_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,18 @@ import (
"gopkg.in/yaml.v3"

"github.com/open-telemetry/opentelemetry-collector-contrib/extension/opampcustommessages"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/status"
)

var _ extensioncapabilities.PipelineWatcher = (*opampAgent)(nil)
type statusAggregator interface {
Subscribe(scope status.Scope, verbosity status.Verbosity) (<-chan *status.AggregateStatus, status.UnsubscribeFunc)
RecordStatus(source *componentstatus.InstanceID, event *componentstatus.Event)
}

type eventSourcePair struct {
source *componentstatus.InstanceID
event *componentstatus.Event
}

type opampAgent struct {
cfg *Config
Expand All @@ -62,12 +71,21 @@ type opampAgent struct {
opampClient client.OpAMPClient

customCapabilityRegistry *customCapabilityRegistry

statusAggregator statusAggregator
statusSubscriptionWg *sync.WaitGroup
componentHealthWg *sync.WaitGroup
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From what I can tell, we never call Wait on this, so I think we can remove it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch - seems like the Wait was missing in the Shutdown method. I added it now, to make sure the component health routine exits properly when we receive the shutdown signal

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYI i had to increase some of the timeouts in the e2e tests of the supervisor, as the agent now needs a bit more time to shut down - I will re-trigger the tests a couple of times, to make sure the tests are passing consistently now

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

alright, it seems like now the tests are passing consistently (the previous run failed due to a temporary issue with the govuln check, but that is unrelated to the changes made in this PR

startTimeUnixNano uint64
componentStatusCh chan *eventSourcePair
readyCh chan struct{}
}

var (
_ opampcustommessages.CustomCapabilityRegistry = (*opampAgent)(nil)
_ extensioncapabilities.Dependent = (*opampAgent)(nil)
_ extensioncapabilities.ConfigWatcher = (*opampAgent)(nil)
_ extensioncapabilities.PipelineWatcher = (*opampAgent)(nil)
_ componentstatus.Watcher = (*opampAgent)(nil)
)

func (o *opampAgent) Start(ctx context.Context, host component.Host) error {
Expand All @@ -85,8 +103,6 @@ func (o *opampAgent) Start(ctx context.Context, host component.Host) error {
return err
}

o.lifetimeCtx, o.lifetimeCtxCancel = context.WithCancel(context.Background())

if o.cfg.PPID != 0 {
go monitorPPID(o.lifetimeCtx, o.cfg.PPIDPollInterval, o.cfg.PPID, o.reportFunc)
}
Expand Down Expand Up @@ -128,8 +144,6 @@ func (o *opampAgent) Start(ctx context.Context, host component.Host) error {
return err
}

o.setHealth(&protobufs.ComponentHealth{Healthy: false})

o.logger.Debug("Starting OpAMP client...")

if err := o.opampClient.Start(context.Background(), settings); err != nil {
Expand All @@ -146,6 +160,9 @@ func (o *opampAgent) Shutdown(ctx context.Context) error {
o.lifetimeCtxCancel()
}

o.statusSubscriptionWg.Wait()
o.componentHealthWg.Wait()

o.logger.Debug("OpAMP agent shutting down...")
if o.opampClient == nil {
return nil
Expand Down Expand Up @@ -190,6 +207,7 @@ func (o *opampAgent) Register(capability string, opts ...opampcustommessages.Cus

func (o *opampAgent) Ready() error {
o.setHealth(&protobufs.ComponentHealth{Healthy: true})
close(o.readyCh)
return nil
}

Expand All @@ -198,6 +216,27 @@ func (o *opampAgent) NotReady() error {
return nil
}

// ComponentStatusChanged implements the componentstatus.Watcher interface.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add an interface implementation assertion for componentstatus.Watcher above?

func (o *opampAgent) ComponentStatusChanged(
source *componentstatus.InstanceID,
event *componentstatus.Event,
) {
// There can be late arriving events after shutdown. We need to close
// the event channel so that this function doesn't block and we release all
// goroutines, but attempting to write to a closed channel will panic; log
// and recover.
defer func() {
if r := recover(); r != nil {
o.logger.Info(
"discarding event received after shutdown",
zap.Any("source", source),
zap.Any("event", event),
)
}
}()
o.componentStatusCh <- &eventSourcePair{source: source, event: event}
}

func (o *opampAgent) updateEffectiveConfig(conf *confmap.Conf) {
o.eclk.Lock()
defer o.eclk.Unlock()
Expand Down Expand Up @@ -249,9 +288,18 @@ func newOpampAgent(cfg *Config, set extension.Settings) (*opampAgent, error) {
instanceID: uid,
capabilities: cfg.Capabilities,
opampClient: opampClient,
statusSubscriptionWg: &sync.WaitGroup{},
componentHealthWg: &sync.WaitGroup{},
readyCh: make(chan struct{}),
customCapabilityRegistry: newCustomCapabilityRegistry(set.Logger, opampClient),
}

agent.lifetimeCtx, agent.lifetimeCtxCancel = context.WithCancel(context.Background())

if agent.capabilities.ReportsHealth {
agent.initHealthReporting()
}

return agent, nil
}

Expand Down Expand Up @@ -372,6 +420,11 @@ func (o *opampAgent) onMessage(_ context.Context, msg *types.MessageData) {

func (o *opampAgent) setHealth(ch *protobufs.ComponentHealth) {
if o.capabilities.ReportsHealth && o.opampClient != nil {
if ch.Healthy && o.startTimeUnixNano == 0 {
ch.StartTimeUnixNano = ch.StatusTimeUnixNano
} else {
ch.StartTimeUnixNano = o.startTimeUnixNano
}
if err := o.opampClient.SetHealth(ch); err != nil {
o.logger.Error("Could not report health to OpAMP server", zap.Error(err))
}
Expand All @@ -395,3 +448,120 @@ func getOSDescription(logger *zap.Logger) string {
return runtime.GOOS
}
}

func (o *opampAgent) initHealthReporting() {
if !o.capabilities.ReportsHealth {
return
}
o.setHealth(&protobufs.ComponentHealth{Healthy: false})

if o.statusAggregator == nil {
o.statusAggregator = status.NewAggregator(status.PriorityPermanent)
}
statusChan, unsubscribeFunc := o.statusAggregator.Subscribe(status.ScopeAll, status.Verbose)
o.statusSubscriptionWg.Add(1)
go o.statusAggregatorEventLoop(unsubscribeFunc, statusChan)

// Start processing events in the background so that our status watcher doesn't
// block others before the extension starts.
o.componentStatusCh = make(chan *eventSourcePair)
o.componentHealthWg.Add(1)
go o.componentHealthEventLoop()
}

func (o *opampAgent) componentHealthEventLoop() {
evan-bradley marked this conversation as resolved.
Show resolved Hide resolved
// Record events with component.StatusStarting, but queue other events until
// PipelineWatcher.Ready is called. This prevents aggregate statuses from
// flapping between StatusStarting and StatusOK as components are started
// individually by the service.
var eventQueue []*eventSourcePair

defer o.componentHealthWg.Done()
for loop := true; loop; {
select {
case esp, ok := <-o.componentStatusCh:
if !ok {
return
}
if esp.event.Status() != componentstatus.StatusStarting {
eventQueue = append(eventQueue, esp)
continue
}
o.statusAggregator.RecordStatus(esp.source, esp.event)
case <-o.readyCh:
for _, esp := range eventQueue {
o.statusAggregator.RecordStatus(esp.source, esp.event)
}
eventQueue = nil
loop = false
case <-o.lifetimeCtx.Done():
return
}
}

// After PipelineWatcher.Ready, record statuses as they are received.
for {
select {
case esp, ok := <-o.componentStatusCh:
if !ok {
return
}
o.statusAggregator.RecordStatus(esp.source, esp.event)
case <-o.lifetimeCtx.Done():
return
}
}
}

func (o *opampAgent) statusAggregatorEventLoop(unsubscribeFunc status.UnsubscribeFunc, statusChan <-chan *status.AggregateStatus) {
defer func() {
unsubscribeFunc()
o.statusSubscriptionWg.Done()
}()
for {
select {
case <-o.lifetimeCtx.Done():
return
case statusUpdate, ok := <-statusChan:
if !ok {
return
}

if statusUpdate == nil || statusUpdate.Status() == componentstatus.StatusNone {
continue
}

componentHealth := convertComponentHealth(statusUpdate)

o.setHealth(componentHealth)
}
}
}

func convertComponentHealth(statusUpdate *status.AggregateStatus) *protobufs.ComponentHealth {
var isHealthy bool
if statusUpdate.Status() == componentstatus.StatusOK {
isHealthy = true
} else {
isHealthy = false
}

componentHealth := &protobufs.ComponentHealth{
Healthy: isHealthy,
Status: statusUpdate.Status().String(),
StatusTimeUnixNano: uint64(statusUpdate.Timestamp().UnixNano()),
}

if statusUpdate.Err() != nil {
componentHealth.LastError = statusUpdate.Err().Error()
}

if len(statusUpdate.ComponentStatusMap) > 0 {
componentHealth.ComponentHealthMap = map[string]*protobufs.ComponentHealth{}
for comp, compState := range statusUpdate.ComponentStatusMap {
componentHealth.ComponentHealthMap[comp] = convertComponentHealth(compState)
}
}

return componentHealth
}
Loading
Loading