diff --git a/.chloggen/healthcheck-agg.yaml b/.chloggen/healthcheck-agg.yaml new file mode 100644 index 000000000000..6e7ea18c7d2a --- /dev/null +++ b/.chloggen/healthcheck-agg.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: 'enhancement' + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: healthcheckv2extension + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add shared aggregation logic for status events. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [26661] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/extension/healthcheckv2extension/internal/status/aggregation.go b/extension/healthcheckv2extension/internal/status/aggregation.go new file mode 100644 index 000000000000..4f4f08025535 --- /dev/null +++ b/extension/healthcheckv2extension/internal/status/aggregation.go @@ -0,0 +1,156 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package status // import "github.com/open-telemetry/opentelemetry-collector-contrib/extension/healthcheckv2extension/internal/status" + +import ( + "time" + + "go.opentelemetry.io/collector/component" +) + +// statusEvent contains a status and timestamp, and can contain an error. Note: +// this is duplicated from core because we need to be able to "rewrite" the +// timestamps of some events during aggregation. +type statusEvent struct { + status component.Status + err error + timestamp time.Time +} + +var _ Event = (*statusEvent)(nil) + +// Status returns the Status (enum) associated with the StatusEvent +func (ev *statusEvent) Status() component.Status { + return ev.status +} + +// Err returns the error associated with the StatusEvent. +func (ev *statusEvent) Err() error { + return ev.err +} + +// Timestamp returns the timestamp associated with the StatusEvent +func (ev *statusEvent) Timestamp() time.Time { + return ev.timestamp +} + +type ErrorPriority int + +const ( + PriorityPermanent ErrorPriority = iota + PriorityRecoverable +) + +type aggregationFunc func(*AggregateStatus) Event + +// The purpose of aggregation is to ensure that the most relevant status bubbles +// upwards in the aggregate status. This aggregation func prioritizes lifecycle +// events (including FatalError) over PermanentError and RecoverableError +// events. The priority argument determines the priority of PermanentError +// events vs RecoverableError events. Lifecycle events will have the timestamp +// of the most recent event and error events will have the timestamp of the +// first occurrence. We use the first occurrence of an error event as this marks +// the beginning of a possible failure. This is important for two reasons: +// recovery duration and causality. We expect a RecoverableError to recover +// before the RecoveryDuration elapses. We need to use the earliest timestamp so +// that a later RecoverableError does not shadow an earlier event in the +// aggregate status. Additionally, this makes sense in the case where a +// RecoverableError in one component cascades to other components; the earliest +// error event is likely to be correlated with the cause. For non-error stauses +// we use the latest event as it represents the last time a successful status was +// reported. +func newAggregationFunc(priority ErrorPriority) aggregationFunc { + statusFunc := func(st *AggregateStatus) component.Status { + seen := make(map[component.Status]struct{}) + for _, cs := range st.ComponentStatusMap { + seen[cs.Status()] = struct{}{} + } + + // All statuses are the same. Note, this will handle StatusOK and StatusStopped as these two + // cases require all components be in the same state. + if len(seen) == 1 { + for st := range seen { + return st + } + } + + // Handle mixed status cases + if _, isFatal := seen[component.StatusFatalError]; isFatal { + return component.StatusFatalError + } + + if _, isStarting := seen[component.StatusStarting]; isStarting { + return component.StatusStarting + } + + if _, isStopping := seen[component.StatusStopping]; isStopping { + return component.StatusStopping + } + + if _, isStopped := seen[component.StatusStopped]; isStopped { + return component.StatusStopping + } + + if priority == PriorityPermanent { + if _, isPermanent := seen[component.StatusPermanentError]; isPermanent { + return component.StatusPermanentError + } + if _, isRecoverable := seen[component.StatusRecoverableError]; isRecoverable { + return component.StatusRecoverableError + } + } else { + if _, isRecoverable := seen[component.StatusRecoverableError]; isRecoverable { + return component.StatusRecoverableError + } + if _, isPermanent := seen[component.StatusPermanentError]; isPermanent { + return component.StatusPermanentError + } + } + + return component.StatusNone + } + + return func(st *AggregateStatus) Event { + var ev, lastEvent, matchingEvent Event + status := statusFunc(st) + isError := component.StatusIsError(status) + + for _, cs := range st.ComponentStatusMap { + ev = cs.Event + if lastEvent == nil || lastEvent.Timestamp().Before(ev.Timestamp()) { + lastEvent = ev + } + if status == ev.Status() { + switch { + case matchingEvent == nil: + matchingEvent = ev + case isError: + // Use earliest to mark beginning of a failure + if ev.Timestamp().Before(matchingEvent.Timestamp()) { + matchingEvent = ev + } + case ev.Timestamp().After(matchingEvent.Timestamp()): + // Use most recent for last successful status + matchingEvent = ev + } + } + } + + // the error status will be the first matching event + if isError { + return matchingEvent + } + + // the aggregate status matches an existing event + if lastEvent.Status() == status { + return lastEvent + } + + // the aggregate status requires a synthetic event + return &statusEvent{ + status: status, + timestamp: lastEvent.Timestamp(), + } + } +} diff --git a/extension/healthcheckv2extension/internal/status/aggregation_test.go b/extension/healthcheckv2extension/internal/status/aggregation_test.go new file mode 100644 index 000000000000..50a2ee43d61c --- /dev/null +++ b/extension/healthcheckv2extension/internal/status/aggregation_test.go @@ -0,0 +1,175 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package status + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "go.opentelemetry.io/collector/component" +) + +func TestAggregationFuncs(t *testing.T) { + aggRecoverable := newAggregationFunc(PriorityRecoverable) + aggPermanent := newAggregationFunc(PriorityPermanent) + + type statusExpectation struct { + priorityPermanent component.Status + priorityRecoverable component.Status + } + + for _, tc := range []struct { + name string + aggregateStatus *AggregateStatus + expectedStatus *statusExpectation + }{ + { + name: "FatalError takes precedence over all", + aggregateStatus: &AggregateStatus{ + ComponentStatusMap: map[string]*AggregateStatus{ + "c1": { + Event: component.NewStatusEvent(component.StatusFatalError), + }, + "c2": { + Event: component.NewStatusEvent(component.StatusStarting), + }, + "c3": { + Event: component.NewStatusEvent(component.StatusOK), + }, + "c4": { + Event: component.NewStatusEvent(component.StatusRecoverableError), + }, + "c5": { + Event: component.NewStatusEvent(component.StatusPermanentError), + }, + "c6": { + Event: component.NewStatusEvent(component.StatusStopping), + }, + "c7": { + Event: component.NewStatusEvent(component.StatusStopped), + }, + }, + }, + expectedStatus: &statusExpectation{ + priorityPermanent: component.StatusFatalError, + priorityRecoverable: component.StatusFatalError, + }, + }, + { + name: "Lifecycle: Starting takes precedence over non-fatal errors", + aggregateStatus: &AggregateStatus{ + ComponentStatusMap: map[string]*AggregateStatus{ + "c1": { + Event: component.NewStatusEvent(component.StatusStarting), + }, + "c2": { + Event: component.NewStatusEvent(component.StatusRecoverableError), + }, + "c3": { + Event: component.NewStatusEvent(component.StatusPermanentError), + }, + }, + }, + expectedStatus: &statusExpectation{ + priorityPermanent: component.StatusStarting, + priorityRecoverable: component.StatusStarting, + }, + }, + { + name: "Lifecycle: Stopping takes precedence over non-fatal errors", + aggregateStatus: &AggregateStatus{ + ComponentStatusMap: map[string]*AggregateStatus{ + "c1": { + Event: component.NewStatusEvent(component.StatusStopping), + }, + "c2": { + Event: component.NewStatusEvent(component.StatusRecoverableError), + }, + "c3": { + Event: component.NewStatusEvent(component.StatusPermanentError), + }, + }, + }, + expectedStatus: &statusExpectation{ + priorityPermanent: component.StatusStopping, + priorityRecoverable: component.StatusStopping, + }, + }, + { + name: "Prioritized error takes priority over OK", + aggregateStatus: &AggregateStatus{ + ComponentStatusMap: map[string]*AggregateStatus{ + "c1": { + Event: component.NewStatusEvent(component.StatusOK), + }, + "c2": { + Event: component.NewStatusEvent(component.StatusRecoverableError), + }, + "c3": { + Event: component.NewStatusEvent(component.StatusPermanentError), + }, + }, + }, + expectedStatus: &statusExpectation{ + priorityPermanent: component.StatusPermanentError, + priorityRecoverable: component.StatusRecoverableError, + }, + }, + } { + t.Run(tc.name, func(t *testing.T) { + assert.Equal(t, tc.expectedStatus.priorityPermanent, + aggPermanent(tc.aggregateStatus).Status()) + assert.Equal(t, tc.expectedStatus.priorityRecoverable, + aggRecoverable(tc.aggregateStatus).Status()) + }) + } +} + +func TestEventTemporalOrder(t *testing.T) { + // Note: ErrorPriority does not affect temporal ordering + aggFunc := newAggregationFunc(PriorityPermanent) + st := &AggregateStatus{ + ComponentStatusMap: map[string]*AggregateStatus{ + "c1": { + Event: component.NewStatusEvent(component.StatusOK), + }, + }, + } + assert.Equal(t, st.ComponentStatusMap["c1"].Event, aggFunc(st)) + + // Record first error + st.ComponentStatusMap["c2"] = &AggregateStatus{ + Event: component.NewRecoverableErrorEvent(assert.AnError), + } + + // Returns first error + assert.Equal(t, st.ComponentStatusMap["c2"].Event, aggFunc(st)) + + // Record second error + st.ComponentStatusMap["c3"] = &AggregateStatus{ + Event: component.NewRecoverableErrorEvent(assert.AnError), + } + + // Still returns first error + assert.Equal(t, st.ComponentStatusMap["c2"].Event, aggFunc(st)) + + // Replace first error with later error + st.ComponentStatusMap["c2"] = &AggregateStatus{ + Event: component.NewRecoverableErrorEvent(assert.AnError), + } + + // Returns second error now + assert.Equal(t, st.ComponentStatusMap["c3"].Event, aggFunc(st)) + + // Clear errors + st.ComponentStatusMap["c2"] = &AggregateStatus{ + Event: component.NewStatusEvent(component.StatusOK), + } + st.ComponentStatusMap["c3"] = &AggregateStatus{ + Event: component.NewStatusEvent(component.StatusOK), + } + + // Returns latest event + assert.Equal(t, st.ComponentStatusMap["c3"].Event, aggFunc(st)) +} diff --git a/extension/healthcheckv2extension/internal/status/aggregator.go b/extension/healthcheckv2extension/internal/status/aggregator.go new file mode 100644 index 000000000000..7946eba5c218 --- /dev/null +++ b/extension/healthcheckv2extension/internal/status/aggregator.go @@ -0,0 +1,234 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package status // import "github.com/open-telemetry/opentelemetry-collector-contrib/extension/healthcheckv2extension/internal/status" + +import ( + "container/list" + "fmt" + "strings" + "sync" + "time" + + "go.opentelemetry.io/collector/component" +) + +// Extensions are treated as a pseudo pipeline and extsID is used as a map key +var ( + extsID = component.MustNewID("extensions") + extsIDMap = map[component.ID]struct{}{extsID: {}} +) + +// Note: this interface had to be introduced because we need to be able to rewrite the +// timestamps of some events during aggregation. The implementation in core doesn't currently +// allow this, but this interface provides a workaround. +type Event interface { + Status() component.Status + Err() error + Timestamp() time.Time +} + +// Scope refers to a part of an AggregateStatus. The zero-value, aka ScopeAll, +// refers to the entire AggregateStatus. ScopeExtensions refers to the extensions +// subtree, and any other value refers to a pipeline subtree. +type Scope string + +const ( + ScopeAll Scope = "" + ScopeExtensions Scope = "extensions" + pipelinePrefix string = "pipeline:" +) + +func (s Scope) toKey() string { + if s == ScopeAll || s == ScopeExtensions { + return string(s) + } + return pipelinePrefix + string(s) +} + +type Verbosity bool + +const ( + Verbose Verbosity = true + Concise = false +) + +// AggregateStatus contains a map of child AggregateStatuses and an embedded Event. +// It can be used to represent a single, top-level status when the ComponentStatusMap +// is empty, or a nested structure when map is non-empty. +type AggregateStatus struct { + Event + + ComponentStatusMap map[string]*AggregateStatus +} + +func (a *AggregateStatus) clone(verbosity Verbosity) *AggregateStatus { + st := &AggregateStatus{ + Event: a.Event, + } + + if verbosity == Verbose && len(a.ComponentStatusMap) > 0 { + st.ComponentStatusMap = make(map[string]*AggregateStatus, len(a.ComponentStatusMap)) + for k, cs := range a.ComponentStatusMap { + st.ComponentStatusMap[k] = cs.clone(verbosity) + } + } + + return st +} + +type subscription struct { + statusCh chan *AggregateStatus + verbosity Verbosity +} + +// UnsubscribeFunc is a function used to unsubscribe from a stream. +type UnsubscribeFunc func() + +// Aggregator records individual status events for components and aggregates statuses for the +// pipelines they belong to and the collector overall. +type Aggregator struct { + // mu protects aggregateStatus and subscriptions from concurrent modification + mu sync.RWMutex + aggregateStatus *AggregateStatus + subscriptions map[string]*list.List + aggregationFunc aggregationFunc +} + +// NewAggregator returns a *status.Aggregator. +func NewAggregator(errPriority ErrorPriority) *Aggregator { + return &Aggregator{ + aggregateStatus: &AggregateStatus{ + Event: &component.StatusEvent{}, + ComponentStatusMap: make(map[string]*AggregateStatus), + }, + subscriptions: make(map[string]*list.List), + aggregationFunc: newAggregationFunc(errPriority), + } +} + +// AggregateStatus returns an *AggregateStatus for the given scope. The scope can be the collector +// overall (ScopeAll), extensions (ScopeExtensions), or a pipeline by name. Detail specifies whether +// or not subtrees should be returned with the *AggregateStatus. The boolean return value indicates +// whether or not the scope was found. +func (a *Aggregator) AggregateStatus(scope Scope, verbosity Verbosity) (*AggregateStatus, bool) { + a.mu.RLock() + defer a.mu.RUnlock() + + if scope == ScopeAll { + return a.aggregateStatus.clone(verbosity), true + } + + st, ok := a.aggregateStatus.ComponentStatusMap[scope.toKey()] + if !ok { + return nil, false + } + + return st.clone(verbosity), true +} + +// RecordStatus stores and aggregates a StatusEvent for the given component instance. +func (a *Aggregator) RecordStatus(source *component.InstanceID, event *component.StatusEvent) { + compIDs := source.PipelineIDs + // extensions are treated as a pseudo-pipeline + if source.Kind == component.KindExtension { + compIDs = extsIDMap + } + + a.mu.Lock() + defer a.mu.Unlock() + + for compID := range compIDs { + var pipelineStatus *AggregateStatus + pipelineScope := Scope(compID.String()) + pipelineKey := pipelineScope.toKey() + + pipelineStatus, ok := a.aggregateStatus.ComponentStatusMap[pipelineKey] + if !ok { + pipelineStatus = &AggregateStatus{ + ComponentStatusMap: make(map[string]*AggregateStatus), + } + } + + componentKey := fmt.Sprintf("%s:%s", strings.ToLower(source.Kind.String()), source.ID) + pipelineStatus.ComponentStatusMap[componentKey] = &AggregateStatus{ + Event: event, + } + a.aggregateStatus.ComponentStatusMap[pipelineKey] = pipelineStatus + pipelineStatus.Event = a.aggregationFunc(pipelineStatus) + a.notifySubscribers(pipelineScope, pipelineStatus) + } + + a.aggregateStatus.Event = a.aggregationFunc(a.aggregateStatus) + a.notifySubscribers(ScopeAll, a.aggregateStatus) +} + +// Subscribe allows you to subscribe to a stream of events for the given scope. The scope can be +// the collector overall (ScopeAll), extensions (ScopeExtensions), or a pipeline name. +// It is possible to subscribe to a pipeline that has not yet reported. An initial nil +// will be sent on the channel and events will start streaming if and when it starts reporting. +// A `Verbose` verbosity specifies that subtrees should be returned with the *AggregateStatus. +// To unsubscribe, call the returned UnsubscribeFunc. +func (a *Aggregator) Subscribe(scope Scope, verbosity Verbosity) (<-chan *AggregateStatus, UnsubscribeFunc) { + a.mu.Lock() + defer a.mu.Unlock() + + key := scope.toKey() + st := a.aggregateStatus + if scope != ScopeAll { + st = st.ComponentStatusMap[key] + } + if st != nil { + st = st.clone(verbosity) + } + sub := &subscription{ + statusCh: make(chan *AggregateStatus, 1), + verbosity: verbosity, + } + subList, ok := a.subscriptions[key] + if !ok { + subList = list.New() + a.subscriptions[key] = subList + } + el := subList.PushBack(sub) + + unsubFunc := func() { + subList.Remove(el) + if subList.Front() == nil { + delete(a.subscriptions, key) + } + } + + sub.statusCh <- st + + return sub.statusCh, unsubFunc +} + +// Close terminates all existing subscriptions. +func (a *Aggregator) Close() { + a.mu.Lock() + defer a.mu.Unlock() + + for _, subList := range a.subscriptions { + for el := subList.Front(); el != nil; el = el.Next() { + sub := el.Value.(*subscription) + close(sub.statusCh) + } + } +} + +func (a *Aggregator) notifySubscribers(scope Scope, status *AggregateStatus) { + subList, ok := a.subscriptions[scope.toKey()] + if !ok { + return + } + for el := subList.Front(); el != nil; el = el.Next() { + sub := el.Value.(*subscription) + // clear unread events + select { + case <-sub.statusCh: + default: + } + sub.statusCh <- status.clone(sub.verbosity) + } +} diff --git a/extension/healthcheckv2extension/internal/status/aggregator_test.go b/extension/healthcheckv2extension/internal/status/aggregator_test.go new file mode 100644 index 000000000000..bff1bfe6700e --- /dev/null +++ b/extension/healthcheckv2extension/internal/status/aggregator_test.go @@ -0,0 +1,553 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package status_test + +import ( + "fmt" + "strings" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component" + + "github.com/open-telemetry/opentelemetry-collector-contrib/extension/healthcheckv2extension/internal/status" + "github.com/open-telemetry/opentelemetry-collector-contrib/extension/healthcheckv2extension/internal/testhelpers" +) + +func TestAggregateStatus(t *testing.T) { + agg := status.NewAggregator(status.PriorityPermanent) + traces := testhelpers.NewPipelineMetadata("traces") + + t.Run("zero value", func(t *testing.T) { + st, ok := agg.AggregateStatus(status.ScopeAll, status.Concise) + require.True(t, ok) + assert.Equal(t, component.StatusNone, st.Status()) + }) + + testhelpers.SeedAggregator(agg, traces.InstanceIDs(), component.StatusOK) + + t.Run("pipeline statuses all successful", func(t *testing.T) { + st, ok := agg.AggregateStatus(status.ScopeAll, status.Concise) + require.True(t, ok) + assert.Equal(t, component.StatusOK, st.Status()) + }) + + agg.RecordStatus( + traces.ExporterID, + component.NewRecoverableErrorEvent(assert.AnError), + ) + + t.Run("pipeline with recoverable error", func(t *testing.T) { + st, ok := agg.AggregateStatus(status.ScopeAll, status.Concise) + require.True(t, ok) + assertErrorEventsMatch(t, + component.StatusRecoverableError, + assert.AnError, + st, + ) + }) + + agg.RecordStatus( + traces.ExporterID, + component.NewPermanentErrorEvent(assert.AnError), + ) + + t.Run("pipeline with permanent error", func(t *testing.T) { + st, ok := agg.AggregateStatus(status.ScopeAll, status.Concise) + require.True(t, ok) + assertErrorEventsMatch(t, + component.StatusPermanentError, + assert.AnError, + st, + ) + }) +} + +func TestAggregateStatusVerbose(t *testing.T) { + agg := status.NewAggregator(status.PriorityPermanent) + traces := testhelpers.NewPipelineMetadata("traces") + tracesKey := toPipelineKey(traces.PipelineID) + + t.Run("zero value", func(t *testing.T) { + st, ok := agg.AggregateStatus(status.ScopeAll, status.Verbose) + require.True(t, ok) + assertEventsMatch(t, component.StatusNone, st) + assert.Empty(t, st.ComponentStatusMap) + }) + + // Seed aggregator with successful statuses for pipeline. + testhelpers.SeedAggregator(agg, traces.InstanceIDs(), component.StatusOK) + + t.Run("pipeline statuses all successful", func(t *testing.T) { + st, ok := agg.AggregateStatus(status.ScopeAll, status.Verbose) + require.True(t, ok) + + // The top-level status and pipeline status match. + assertEventsMatch(t, component.StatusOK, st, st.ComponentStatusMap[tracesKey]) + + // Component statuses match + assertEventsMatch(t, + component.StatusOK, + collectStatuses(st.ComponentStatusMap[tracesKey], traces.InstanceIDs()...)..., + ) + }) + + // Record an error in the traces exporter + agg.RecordStatus( + traces.ExporterID, + component.NewRecoverableErrorEvent(assert.AnError), + ) + + t.Run("pipeline with exporter error", func(t *testing.T) { + st, ok := agg.AggregateStatus(status.ScopeAll, status.Verbose) + require.True(t, ok) + // The top-level status and pipeline status match. + assertErrorEventsMatch( + t, + component.StatusRecoverableError, + assert.AnError, + st, + st.ComponentStatusMap[tracesKey], + ) + + // Component statuses match + assertEventsMatch(t, + component.StatusOK, + collectStatuses( + st.ComponentStatusMap[tracesKey], traces.ReceiverID, traces.ProcessorID, + )..., + ) + assertErrorEventsMatch(t, + component.StatusRecoverableError, + assert.AnError, + st.ComponentStatusMap[tracesKey].ComponentStatusMap[toComponentKey(traces.ExporterID)], + ) + }) + +} + +func TestAggregateStatusPriorityRecoverable(t *testing.T) { + agg := status.NewAggregator(status.PriorityRecoverable) + traces := testhelpers.NewPipelineMetadata("traces") + + testhelpers.SeedAggregator(agg, traces.InstanceIDs(), component.StatusOK) + + t.Run("pipeline statuses all successful", func(t *testing.T) { + st, ok := agg.AggregateStatus(status.ScopeAll, status.Concise) + require.True(t, ok) + assert.Equal(t, component.StatusOK, st.Status()) + }) + + agg.RecordStatus( + traces.ProcessorID, + component.NewPermanentErrorEvent(assert.AnError), + ) + + t.Run("pipeline with permanent error", func(t *testing.T) { + st, ok := agg.AggregateStatus(status.ScopeAll, status.Concise) + require.True(t, ok) + assertErrorEventsMatch(t, + component.StatusPermanentError, + assert.AnError, + st, + ) + }) + + agg.RecordStatus( + traces.ExporterID, + component.NewRecoverableErrorEvent(assert.AnError), + ) + + t.Run("pipeline with recoverable error", func(t *testing.T) { + st, ok := agg.AggregateStatus(status.ScopeAll, status.Concise) + require.True(t, ok) + assertErrorEventsMatch(t, + component.StatusRecoverableError, + assert.AnError, + st, + ) + }) +} + +func TestPipelineAggregateStatus(t *testing.T) { + agg := status.NewAggregator(status.PriorityPermanent) + traces := testhelpers.NewPipelineMetadata("traces") + + t.Run("non existent pipeline", func(t *testing.T) { + st, ok := agg.AggregateStatus("doesnotexist", status.Concise) + require.Nil(t, st) + require.False(t, ok) + }) + + testhelpers.SeedAggregator(agg, traces.InstanceIDs(), component.StatusOK) + + t.Run("pipeline exists / status successful", func(t *testing.T) { + st, ok := agg.AggregateStatus( + status.Scope(traces.PipelineID.String()), + status.Concise, + ) + require.True(t, ok) + assertEventsMatch(t, component.StatusOK, st) + }) + + agg.RecordStatus( + traces.ExporterID, + component.NewRecoverableErrorEvent(assert.AnError), + ) + + t.Run("pipeline exists / exporter error", func(t *testing.T) { + st, ok := agg.AggregateStatus( + status.Scope(traces.PipelineID.String()), + status.Concise, + ) + require.True(t, ok) + assertErrorEventsMatch(t, component.StatusRecoverableError, assert.AnError, st) + }) +} + +func TestPipelineAggregateStatusVerbose(t *testing.T) { + agg := status.NewAggregator(status.PriorityPermanent) + traces := testhelpers.NewPipelineMetadata("traces") + + t.Run("non existent pipeline", func(t *testing.T) { + st, ok := agg.AggregateStatus("doesnotexist", status.Verbose) + require.Nil(t, st) + require.False(t, ok) + }) + + testhelpers.SeedAggregator(agg, traces.InstanceIDs(), component.StatusOK) + + t.Run("pipeline exists / status successful", func(t *testing.T) { + st, ok := agg.AggregateStatus(status.Scope(traces.PipelineID.String()), status.Verbose) + require.True(t, ok) + + // Top-level status matches + assertEventsMatch(t, component.StatusOK, st) + + // Component statuses match + assertEventsMatch(t, component.StatusOK, collectStatuses(st, traces.InstanceIDs()...)...) + }) + + agg.RecordStatus(traces.ExporterID, component.NewRecoverableErrorEvent(assert.AnError)) + + t.Run("pipeline exists / exporter error", func(t *testing.T) { + st, ok := agg.AggregateStatus(status.Scope(traces.PipelineID.String()), status.Verbose) + require.True(t, ok) + + // Top-level status matches + assertErrorEventsMatch(t, component.StatusRecoverableError, assert.AnError, st) + + // Component statuses match + assertEventsMatch(t, + component.StatusOK, + collectStatuses(st, traces.ReceiverID, traces.ProcessorID)..., + ) + assertErrorEventsMatch(t, + component.StatusRecoverableError, + assert.AnError, + st.ComponentStatusMap[toComponentKey(traces.ExporterID)], + ) + }) +} + +func TestAggregateStatusExtensions(t *testing.T) { + agg := status.NewAggregator(status.PriorityPermanent) + + extsID := component.MustNewID("extensions") + extInstanceID1 := &component.InstanceID{ + ID: component.MustNewID("ext1"), + Kind: component.KindExtension, + PipelineIDs: map[component.ID]struct{}{ + extsID: {}, + }, + } + extInstanceID2 := &component.InstanceID{ + ID: component.MustNewID("ext2"), + Kind: component.KindExtension, + PipelineIDs: map[component.ID]struct{}{ + extsID: {}, + }, + } + extInstanceIDs := []*component.InstanceID{extInstanceID1, extInstanceID2} + + testhelpers.SeedAggregator(agg, extInstanceIDs, component.StatusOK) + + t.Run("extension statuses all successful", func(t *testing.T) { + st, ok := agg.AggregateStatus(status.ScopeExtensions, status.Concise) + require.True(t, ok) + assert.Equal(t, component.StatusOK, st.Status()) + }) + + agg.RecordStatus( + extInstanceID1, + component.NewRecoverableErrorEvent(assert.AnError), + ) + + t.Run("extension with recoverable error", func(t *testing.T) { + st, ok := agg.AggregateStatus(status.ScopeExtensions, status.Concise) + require.True(t, ok) + assertErrorEventsMatch(t, + component.StatusRecoverableError, + assert.AnError, + st, + ) + }) + + agg.RecordStatus( + extInstanceID1, + component.NewStatusEvent(component.StatusOK), + ) + + t.Run("extensions recovered", func(t *testing.T) { + st, ok := agg.AggregateStatus(status.ScopeExtensions, status.Concise) + require.True(t, ok) + assertEventsMatch(t, + component.StatusOK, + st, + ) + }) +} + +func TestStreaming(t *testing.T) { + agg := status.NewAggregator(status.PriorityPermanent) + defer agg.Close() + + traces := testhelpers.NewPipelineMetadata("traces") + metrics := testhelpers.NewPipelineMetadata("metrics") + + traceEvents, traceUnsub := agg.Subscribe(status.Scope(traces.PipelineID.String()), status.Concise) + defer traceUnsub() + + metricEvents, metricUnsub := agg.Subscribe(status.Scope(metrics.PipelineID.String()), status.Concise) + defer metricUnsub() + + allEvents, allUnsub := agg.Subscribe(status.ScopeAll, status.Concise) + defer allUnsub() + + assert.Nil(t, <-traceEvents) + assert.Nil(t, <-metricEvents) + assert.NotNil(t, <-allEvents) + + // Start pipelines + testhelpers.SeedAggregator(agg, traces.InstanceIDs(), component.StatusStarting) + assertEventsRecvdMatch(t, component.StatusStarting, traceEvents, allEvents) + testhelpers.SeedAggregator(agg, metrics.InstanceIDs(), component.StatusStarting) + assertEventsRecvdMatch(t, component.StatusStarting, metricEvents, allEvents) + + // Successful start + testhelpers.SeedAggregator(agg, traces.InstanceIDs(), component.StatusOK) + assertEventsRecvdMatch(t, component.StatusOK, traceEvents) + // All is still in StatusStarting until the metrics pipeline reports OK + assertEventsRecvdMatch(t, component.StatusStarting, allEvents) + testhelpers.SeedAggregator(agg, metrics.InstanceIDs(), component.StatusOK) + assertEventsRecvdMatch(t, component.StatusOK, metricEvents, allEvents) + + // Traces Pipeline RecoverableError + agg.RecordStatus(traces.ExporterID, component.NewRecoverableErrorEvent(assert.AnError)) + assertErrorEventsRecvdMatch(t, + component.StatusRecoverableError, + assert.AnError, + traceEvents, + allEvents, + ) + + // Traces Pipeline Recover + agg.RecordStatus(traces.ExporterID, component.NewStatusEvent(component.StatusOK)) + assertEventsRecvdMatch(t, component.StatusOK, traceEvents, allEvents) + + // Stopping + testhelpers.SeedAggregator(agg, traces.InstanceIDs(), component.StatusStopping) + assertEventsRecvdMatch(t, component.StatusStopping, traceEvents, allEvents) + testhelpers.SeedAggregator(agg, metrics.InstanceIDs(), component.StatusStopping) + assertEventsRecvdMatch(t, component.StatusStopping, metricEvents, allEvents) + + // Stopped + testhelpers.SeedAggregator(agg, traces.InstanceIDs(), component.StatusStopped) + // All is not stopped until the metrics pipeline is stopped + assertEventsRecvdMatch(t, component.StatusStopped, traceEvents) + testhelpers.SeedAggregator(agg, metrics.InstanceIDs(), component.StatusStopped) + assertEventsRecvdMatch(t, component.StatusStopped, metricEvents, allEvents) +} + +func TestStreamingVerbose(t *testing.T) { + agg := status.NewAggregator(status.PriorityPermanent) + defer agg.Close() + + traces := testhelpers.NewPipelineMetadata("traces") + tracesKey := toPipelineKey(traces.PipelineID) + + allEvents, unsub := agg.Subscribe(status.ScopeAll, status.Verbose) + defer unsub() + + t.Run("zero value", func(t *testing.T) { + st := <-allEvents + assertEventsMatch(t, component.StatusNone, st) + assert.Empty(t, st.ComponentStatusMap) + }) + + // Seed aggregator with successful statuses for pipeline. + testhelpers.SeedAggregator(agg, traces.InstanceIDs(), component.StatusOK) + + t.Run("pipeline statuses all successful", func(t *testing.T) { + st := <-allEvents + // The top-level status matches the pipeline status. + assertEventsMatch(t, component.StatusOK, st, st.ComponentStatusMap[tracesKey]) + + // Component statuses match + assertEventsMatch(t, + component.StatusOK, + collectStatuses(st.ComponentStatusMap[tracesKey], traces.InstanceIDs()...)..., + ) + }) + + // Record an error in the traces exporter + agg.RecordStatus(traces.ExporterID, component.NewRecoverableErrorEvent(assert.AnError)) + + t.Run("pipeline with exporter error", func(t *testing.T) { + st := <-allEvents + + // The top-level status and pipeline status match. + assertErrorEventsMatch(t, + component.StatusRecoverableError, + assert.AnError, + st, + st.ComponentStatusMap[tracesKey], + ) + + // Component statuses match + assertEventsMatch(t, + component.StatusOK, + collectStatuses( + st.ComponentStatusMap[tracesKey], traces.ReceiverID, traces.ProcessorID, + )..., + ) + assertErrorEventsMatch(t, + component.StatusRecoverableError, + assert.AnError, + st.ComponentStatusMap[tracesKey].ComponentStatusMap[toComponentKey(traces.ExporterID)], + ) + }) +} + +func TestUnsubscribe(t *testing.T) { + agg := status.NewAggregator(status.PriorityPermanent) + defer agg.Close() + + traces := testhelpers.NewPipelineMetadata("traces") + + traceEvents, traceUnsub := agg.Subscribe(status.Scope(traces.PipelineID.String()), status.Concise) + allEvents, allUnsub := agg.Subscribe(status.ScopeAll, status.Concise) + + assert.Nil(t, <-traceEvents) + assert.NotNil(t, <-allEvents) + + // Start pipeline + testhelpers.SeedAggregator(agg, traces.InstanceIDs(), component.StatusStarting) + assertEventsRecvdMatch(t, component.StatusStarting, traceEvents, allEvents) + + traceUnsub() + + // Pipeline OK + testhelpers.SeedAggregator(agg, traces.InstanceIDs(), component.StatusOK) + assertNoEventsRecvd(t, traceEvents) + assertEventsRecvdMatch(t, component.StatusOK, allEvents) + + allUnsub() + + // Stop pipeline + testhelpers.SeedAggregator(agg, traces.InstanceIDs(), component.StatusStopping) + + assertNoEventsRecvd(t, traceEvents, allEvents) +} + +// assertEventMatches ensures one or more events share the expected status and are +// otherwise equal, ignoring timestamp. +func assertEventsMatch( + t *testing.T, + expectedStatus component.Status, + statuses ...*status.AggregateStatus, +) { + err0 := statuses[0].Event.Err() + for _, st := range statuses { + ev := st.Event + assert.Equal(t, expectedStatus, ev.Status()) + assert.Equal(t, err0, ev.Err()) + } +} + +// assertErrorEventMatches compares one or more status events with the expected +// status and expected error. +func assertErrorEventsMatch( + t *testing.T, + expectedStatus component.Status, + expectedErr error, + statuses ...*status.AggregateStatus, +) { + assert.True(t, component.StatusIsError(expectedStatus)) + for _, st := range statuses { + ev := st.Event + assert.Equal(t, expectedStatus, ev.Status()) + assert.Equal(t, expectedErr, ev.Err()) + } +} + +func collectStatuses( + aggregateStatus *status.AggregateStatus, + instanceIDs ...*component.InstanceID, +) (result []*status.AggregateStatus) { + for _, id := range instanceIDs { + key := toComponentKey(id) + result = append(result, aggregateStatus.ComponentStatusMap[key]) + } + return +} + +func assertEventsRecvdMatch(t *testing.T, + expectedStatus component.Status, + chans ...<-chan *status.AggregateStatus, +) { + var err0 error + for i, stCh := range chans { + st := <-stCh + ev := st.Event + if i == 0 { + err0 = ev.Err() + } + assert.Equal(t, expectedStatus, ev.Status()) + assert.Equal(t, err0, ev.Err()) + } +} + +func assertErrorEventsRecvdMatch(t *testing.T, + expectedStatus component.Status, + expectedErr error, + chans ...<-chan *status.AggregateStatus, +) { + assert.True(t, component.StatusIsError(expectedStatus)) + for _, stCh := range chans { + st := <-stCh + ev := st.Event + assert.Equal(t, expectedStatus, ev.Status()) + assert.Equal(t, expectedErr, ev.Err()) + } +} + +func toComponentKey(id *component.InstanceID) string { + return fmt.Sprintf("%s:%s", strings.ToLower(id.Kind.String()), id.ID) +} + +func toPipelineKey(id component.ID) string { + return fmt.Sprintf("pipeline:%s", id.String()) +} + +func assertNoEventsRecvd(t *testing.T, chans ...<-chan *status.AggregateStatus) { + for _, stCh := range chans { + select { + case <-stCh: + require.Fail(t, "Found unexpected event") + default: + } + } +} diff --git a/extension/healthcheckv2extension/internal/status/package_test.go b/extension/healthcheckv2extension/internal/status/package_test.go new file mode 100644 index 000000000000..312e32157c05 --- /dev/null +++ b/extension/healthcheckv2extension/internal/status/package_test.go @@ -0,0 +1,14 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package status // import "github.com/open-telemetry/opentelemetry-collector-contrib/extension/healthcheckv2extension/internal/status" + +import ( + "testing" + + "go.uber.org/goleak" +) + +func TestMain(m *testing.M) { + goleak.VerifyTestMain(m) +} diff --git a/extension/healthcheckv2extension/internal/testhelpers/helpers.go b/extension/healthcheckv2extension/internal/testhelpers/helpers.go new file mode 100644 index 000000000000..be02ca627538 --- /dev/null +++ b/extension/healthcheckv2extension/internal/testhelpers/helpers.go @@ -0,0 +1,83 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package testhelpers // import "github.com/open-telemetry/opentelemetry-collector-contrib/extension/healthcheckv2extension/internal/testhelpers" + +import ( + "go.opentelemetry.io/collector/component" + + "github.com/open-telemetry/opentelemetry-collector-contrib/extension/healthcheckv2extension/internal/common" + "github.com/open-telemetry/opentelemetry-collector-contrib/extension/healthcheckv2extension/internal/status" +) + +// PipelineMetadata groups together component and instance IDs for a hypothetical pipeline used +// for testing purposes. +type PipelineMetadata struct { + PipelineID component.ID + ReceiverID *component.InstanceID + ProcessorID *component.InstanceID + ExporterID *component.InstanceID +} + +// InstanceIDs returns a slice of instanceIDs for components within the hypothetical pipeline. +func (p *PipelineMetadata) InstanceIDs() []*component.InstanceID { + return []*component.InstanceID{p.ReceiverID, p.ProcessorID, p.ExporterID} +} + +// NewPipelineMetadata returns a metadata for a hypothetical pipeline. +func NewPipelineMetadata(typestr string) *PipelineMetadata { + pipelineID := component.MustNewID(typestr) + return &PipelineMetadata{ + PipelineID: pipelineID, + ReceiverID: &component.InstanceID{ + ID: component.NewIDWithName(component.MustNewType(typestr), "in"), + Kind: component.KindReceiver, + PipelineIDs: map[component.ID]struct{}{ + pipelineID: {}, + }, + }, + ProcessorID: &component.InstanceID{ + ID: component.MustNewID("batch"), + Kind: component.KindProcessor, + PipelineIDs: map[component.ID]struct{}{ + pipelineID: {}, + }, + }, + ExporterID: &component.InstanceID{ + ID: component.NewIDWithName(component.MustNewType(typestr), "out"), + Kind: component.KindExporter, + PipelineIDs: map[component.ID]struct{}{ + pipelineID: {}, + }, + }, + } +} + +// NewPipelines returns a map of hypothetical pipelines identified by their stringified typeVal. +func NewPipelines(typestrs ...string) map[string]*PipelineMetadata { + result := make(map[string]*PipelineMetadata, len(typestrs)) + for _, typestr := range typestrs { + result[typestr] = NewPipelineMetadata(typestr) + } + return result +} + +// SeedAggregator records a status event for each instanceID. +func SeedAggregator( + agg *status.Aggregator, + instanceIDs []*component.InstanceID, + statuses ...component.Status, +) { + for _, st := range statuses { + for _, id := range instanceIDs { + agg.RecordStatus(id, component.NewStatusEvent(st)) + } + } +} + +func ErrPriority(config *common.ComponentHealthConfig) status.ErrorPriority { + if config != nil && config.IncludeRecoverable && !config.IncludePermanent { + return status.PriorityRecoverable + } + return status.PriorityPermanent +}