diff --git a/.github/ISSUE_TEMPLATE/bug_report.yaml b/.github/ISSUE_TEMPLATE/bug_report.yaml index 7b4c04ed5a46..b3e4845bf4ee 100644 --- a/.github/ISSUE_TEMPLATE/bug_report.yaml +++ b/.github/ISSUE_TEMPLATE/bug_report.yaml @@ -143,6 +143,7 @@ body: - pkg/resourcetotelemetry - pkg/sampling - pkg/stanza + - pkg/stanza/fileconsumer - pkg/translator/azure - pkg/translator/jaeger - pkg/translator/loki diff --git a/.github/ISSUE_TEMPLATE/feature_request.yaml b/.github/ISSUE_TEMPLATE/feature_request.yaml index c7c0697c10f2..1cb027e98ac2 100644 --- a/.github/ISSUE_TEMPLATE/feature_request.yaml +++ b/.github/ISSUE_TEMPLATE/feature_request.yaml @@ -137,6 +137,7 @@ body: - pkg/resourcetotelemetry - pkg/sampling - pkg/stanza + - pkg/stanza/fileconsumer - pkg/translator/azure - pkg/translator/jaeger - pkg/translator/loki diff --git a/.github/ISSUE_TEMPLATE/other.yaml b/.github/ISSUE_TEMPLATE/other.yaml index cb07de2bd476..04ebdfa20a03 100644 --- a/.github/ISSUE_TEMPLATE/other.yaml +++ b/.github/ISSUE_TEMPLATE/other.yaml @@ -137,6 +137,7 @@ body: - pkg/resourcetotelemetry - pkg/sampling - pkg/stanza + - pkg/stanza/fileconsumer - pkg/translator/azure - pkg/translator/jaeger - pkg/translator/loki diff --git a/.github/ISSUE_TEMPLATE/unmaintained.yaml b/.github/ISSUE_TEMPLATE/unmaintained.yaml index 7e7ebe069cb9..4d41898357f0 100644 --- a/.github/ISSUE_TEMPLATE/unmaintained.yaml +++ b/.github/ISSUE_TEMPLATE/unmaintained.yaml @@ -142,6 +142,7 @@ body: - pkg/resourcetotelemetry - pkg/sampling - pkg/stanza + - pkg/stanza/fileconsumer - pkg/translator/azure - pkg/translator/jaeger - pkg/translator/loki diff --git a/pkg/stanza/fileconsumer/config.go b/pkg/stanza/fileconsumer/config.go index 6c7db8828a26..03c481cacc1f 100644 --- a/pkg/stanza/fileconsumer/config.go +++ b/pkg/stanza/fileconsumer/config.go @@ -1,6 +1,8 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 +//go:generate mdatagen metadata.yaml + package fileconsumer // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer" import ( @@ -12,7 +14,6 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/featuregate" - "go.opentelemetry.io/otel/metric" "go.uber.org/zap" "golang.org/x/text/encoding" @@ -21,6 +22,7 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/emit" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fingerprint" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/header" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/metadata" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/reader" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/scanner" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/tracker" @@ -35,8 +37,6 @@ const ( defaultMaxConcurrentFiles = 1024 defaultEncoding = "utf-8" defaultPollInterval = 200 * time.Millisecond - openFilesMetric = "fileconsumer/open_files" - readingFilesMetric = "fileconsumer/reading_files" ) var allowFileDeletion = featuregate.GlobalRegistry().MustRegister( @@ -179,34 +179,19 @@ func (c Config) Build(set component.TelemetrySettings, emit emit.Callback, opts t = tracker.NewFileTracker(set, c.MaxConcurrentFiles/2) } - meter := set.MeterProvider.Meter("otelcol/fileconsumer") - - openFiles, err := meter.Int64UpDownCounter( - openFilesMetric, - metric.WithDescription("Number of open files"), - metric.WithUnit("1"), - ) - if err != nil { - return nil, err - } - readingFiles, err := meter.Int64UpDownCounter( - readingFilesMetric, - metric.WithDescription("Number of open files that are being read"), - metric.WithUnit("1"), - ) + telemetryBuilder, err := metadata.NewTelemetryBuilder(set) if err != nil { return nil, err } return &Manager{ - set: set, - readerFactory: readerFactory, - fileMatcher: fileMatcher, - pollInterval: c.PollInterval, - maxBatchFiles: c.MaxConcurrentFiles / 2, - maxBatches: c.MaxBatches, - tracker: t, - openFiles: openFiles, - readingFiles: readingFiles, + set: set, + readerFactory: readerFactory, + fileMatcher: fileMatcher, + pollInterval: c.PollInterval, + maxBatchFiles: c.MaxConcurrentFiles / 2, + maxBatches: c.MaxBatches, + tracker: t, + telemetryBuilder: telemetryBuilder, }, nil } diff --git a/pkg/stanza/fileconsumer/documentation.md b/pkg/stanza/fileconsumer/documentation.md new file mode 100644 index 000000000000..128912d1a2eb --- /dev/null +++ b/pkg/stanza/fileconsumer/documentation.md @@ -0,0 +1,23 @@ +[comment]: <> (Code generated by mdatagen. DO NOT EDIT.) + +# fileconsumer + +## Internal Telemetry + +The following telemetry is emitted by this component. + +### fileconsumer_open_files + +Number of open files + +| Unit | Metric Type | Value Type | Monotonic | +| ---- | ----------- | ---------- | --------- | +| 1 | Sum | Int | false | + +### fileconsumer_reading_files + +Number of open files that are being read + +| Unit | Metric Type | Value Type | Monotonic | +| ---- | ----------- | ---------- | --------- | +| 1 | Sum | Int | false | diff --git a/pkg/stanza/fileconsumer/file.go b/pkg/stanza/fileconsumer/file.go index 7599703ed0a0..d46507ecf3eb 100644 --- a/pkg/stanza/fileconsumer/file.go +++ b/pkg/stanza/fileconsumer/file.go @@ -11,11 +11,11 @@ import ( "time" "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/otel/metric" "go.uber.org/zap" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/checkpoint" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fingerprint" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/metadata" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/reader" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/tracker" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/matcher" @@ -36,8 +36,7 @@ type Manager struct { maxBatches int maxBatchFiles int - openFiles metric.Int64UpDownCounter - readingFiles metric.Int64UpDownCounter + telemetryBuilder *metadata.TelemetryBuilder } func (m *Manager) Start(persister operator.Persister) error { @@ -74,7 +73,7 @@ func (m *Manager) Stop() error { m.cancel = nil } m.wg.Wait() - m.openFiles.Add(context.TODO(), int64(0-m.tracker.ClosePreviousFiles())) + m.telemetryBuilder.FileconsumerOpenFiles.Add(context.TODO(), int64(0-m.tracker.ClosePreviousFiles())) if m.persister != nil { if err := checkpoint.Save(context.Background(), m.persister, m.tracker.GetMetadata()); err != nil { m.set.Logger.Error("save offsets", zap.Error(err)) @@ -157,14 +156,14 @@ func (m *Manager) consume(ctx context.Context, paths []string) { wg.Add(1) go func(r *reader.Reader) { defer wg.Done() - m.readingFiles.Add(ctx, 1) + m.telemetryBuilder.FileconsumerReadingFiles.Add(ctx, 1) r.ReadToEnd(ctx) - m.readingFiles.Add(ctx, -1) + m.telemetryBuilder.FileconsumerReadingFiles.Add(ctx, -1) }(r) } wg.Wait() - m.openFiles.Add(ctx, int64(0-m.tracker.EndConsume())) + m.telemetryBuilder.FileconsumerOpenFiles.Add(ctx, int64(0-m.tracker.EndConsume())) } func (m *Manager) makeFingerprint(path string) (*fingerprint.Fingerprint, *os.File) { @@ -249,7 +248,7 @@ func (m *Manager) newReader(ctx context.Context, file *os.File, fp *fingerprint. if err != nil { return nil, err } - m.openFiles.Add(ctx, 1) + m.telemetryBuilder.FileconsumerOpenFiles.Add(ctx, 1) return r, nil } @@ -259,6 +258,6 @@ func (m *Manager) newReader(ctx context.Context, file *os.File, fp *fingerprint. if err != nil { return nil, err } - m.openFiles.Add(ctx, 1) + m.telemetryBuilder.FileconsumerOpenFiles.Add(ctx, 1) return r, nil } diff --git a/pkg/stanza/fileconsumer/file_other.go b/pkg/stanza/fileconsumer/file_other.go index 5acc955e8448..b59165683d83 100644 --- a/pkg/stanza/fileconsumer/file_other.go +++ b/pkg/stanza/fileconsumer/file_other.go @@ -57,9 +57,9 @@ OUTER: m.set.Logger.Debug("Reading lost file", zap.String("path", lostReader.GetFileName())) go func(r *reader.Reader) { defer lostWG.Done() - m.readingFiles.Add(ctx, 1) + m.telemetryBuilder.FileconsumerReadingFiles.Add(ctx, 1) r.ReadToEnd(ctx) - m.readingFiles.Add(ctx, -1) + m.telemetryBuilder.FileconsumerReadingFiles.Add(ctx, -1) }(lostReader) } lostWG.Wait() diff --git a/pkg/stanza/fileconsumer/generated_component_telemetry_test.go b/pkg/stanza/fileconsumer/generated_component_telemetry_test.go new file mode 100644 index 000000000000..16e4146b22cb --- /dev/null +++ b/pkg/stanza/fileconsumer/generated_component_telemetry_test.go @@ -0,0 +1,64 @@ +// Code generated by mdatagen. DO NOT EDIT. + +package fileconsumer + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" + sdkmetric "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/metric/metricdata" + "go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest" +) + +type componentTestTelemetry struct { + reader *sdkmetric.ManualReader + meterProvider *sdkmetric.MeterProvider +} + +func setupTestTelemetry() componentTestTelemetry { + reader := sdkmetric.NewManualReader() + return componentTestTelemetry{ + reader: reader, + meterProvider: sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader)), + } +} + +func (tt *componentTestTelemetry) assertMetrics(t *testing.T, expected []metricdata.Metrics) { + var md metricdata.ResourceMetrics + require.NoError(t, tt.reader.Collect(context.Background(), &md)) + // ensure all required metrics are present + for _, want := range expected { + got := tt.getMetric(want.Name, md) + metricdatatest.AssertEqual(t, want, got, metricdatatest.IgnoreTimestamp()) + } + + // ensure no additional metrics are emitted + require.Equal(t, len(expected), tt.len(md)) +} + +func (tt *componentTestTelemetry) getMetric(name string, got metricdata.ResourceMetrics) metricdata.Metrics { + for _, sm := range got.ScopeMetrics { + for _, m := range sm.Metrics { + if m.Name == name { + return m + } + } + } + + return metricdata.Metrics{} +} + +func (tt *componentTestTelemetry) len(got metricdata.ResourceMetrics) int { + metricsCount := 0 + for _, sm := range got.ScopeMetrics { + metricsCount += len(sm.Metrics) + } + + return metricsCount +} + +func (tt *componentTestTelemetry) Shutdown(ctx context.Context) error { + return tt.meterProvider.Shutdown(ctx) +} diff --git a/pkg/stanza/fileconsumer/package_test.go b/pkg/stanza/fileconsumer/generated_package_test.go similarity index 61% rename from pkg/stanza/fileconsumer/package_test.go rename to pkg/stanza/fileconsumer/generated_package_test.go index ce83ffab6419..82364a087892 100644 --- a/pkg/stanza/fileconsumer/package_test.go +++ b/pkg/stanza/fileconsumer/generated_package_test.go @@ -1,5 +1,4 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 +// Code generated by mdatagen. DO NOT EDIT. package fileconsumer diff --git a/pkg/stanza/fileconsumer/internal/metadata/generated_telemetry.go b/pkg/stanza/fileconsumer/internal/metadata/generated_telemetry.go new file mode 100644 index 000000000000..022f426fc0e8 --- /dev/null +++ b/pkg/stanza/fileconsumer/internal/metadata/generated_telemetry.go @@ -0,0 +1,69 @@ +// Code generated by mdatagen. DO NOT EDIT. + +package metadata + +import ( + "errors" + + "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/metric/noop" + "go.opentelemetry.io/otel/trace" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/config/configtelemetry" +) + +func Meter(settings component.TelemetrySettings) metric.Meter { + return settings.MeterProvider.Meter("otelcol/fileconsumer") +} + +func Tracer(settings component.TelemetrySettings) trace.Tracer { + return settings.TracerProvider.Tracer("otelcol/fileconsumer") +} + +// TelemetryBuilder provides an interface for components to report telemetry +// as defined in metadata and user config. +type TelemetryBuilder struct { + meter metric.Meter + FileconsumerOpenFiles metric.Int64UpDownCounter + FileconsumerReadingFiles metric.Int64UpDownCounter + level configtelemetry.Level +} + +// telemetryBuilderOption applies changes to default builder. +type telemetryBuilderOption func(*TelemetryBuilder) + +// WithLevel sets the current telemetry level for the component. +func WithLevel(lvl configtelemetry.Level) telemetryBuilderOption { + return func(builder *TelemetryBuilder) { + builder.level = lvl + } +} + +// NewTelemetryBuilder provides a struct with methods to update all internal telemetry +// for a component +func NewTelemetryBuilder(settings component.TelemetrySettings, options ...telemetryBuilderOption) (*TelemetryBuilder, error) { + builder := TelemetryBuilder{level: configtelemetry.LevelBasic} + for _, op := range options { + op(&builder) + } + var err, errs error + if builder.level >= configtelemetry.LevelBasic { + builder.meter = Meter(settings) + } else { + builder.meter = noop.Meter{} + } + builder.FileconsumerOpenFiles, err = builder.meter.Int64UpDownCounter( + "fileconsumer_open_files", + metric.WithDescription("Number of open files"), + metric.WithUnit("1"), + ) + errs = errors.Join(errs, err) + builder.FileconsumerReadingFiles, err = builder.meter.Int64UpDownCounter( + "fileconsumer_reading_files", + metric.WithDescription("Number of open files that are being read"), + metric.WithUnit("1"), + ) + errs = errors.Join(errs, err) + return &builder, errs +} diff --git a/pkg/stanza/fileconsumer/internal/metadata/generated_telemetry_test.go b/pkg/stanza/fileconsumer/internal/metadata/generated_telemetry_test.go new file mode 100644 index 000000000000..1517f672ddd7 --- /dev/null +++ b/pkg/stanza/fileconsumer/internal/metadata/generated_telemetry_test.go @@ -0,0 +1,76 @@ +// Code generated by mdatagen. DO NOT EDIT. + +package metadata + +import ( + "testing" + + "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/metric" + embeddedmetric "go.opentelemetry.io/otel/metric/embedded" + noopmetric "go.opentelemetry.io/otel/metric/noop" + "go.opentelemetry.io/otel/trace" + embeddedtrace "go.opentelemetry.io/otel/trace/embedded" + nooptrace "go.opentelemetry.io/otel/trace/noop" + + "go.opentelemetry.io/collector/component" +) + +type mockMeter struct { + noopmetric.Meter + name string +} +type mockMeterProvider struct { + embeddedmetric.MeterProvider +} + +func (m mockMeterProvider) Meter(name string, opts ...metric.MeterOption) metric.Meter { + return mockMeter{name: name} +} + +type mockTracer struct { + nooptrace.Tracer + name string +} + +type mockTracerProvider struct { + embeddedtrace.TracerProvider +} + +func (m mockTracerProvider) Tracer(name string, opts ...trace.TracerOption) trace.Tracer { + return mockTracer{name: name} +} + +func TestProviders(t *testing.T) { + set := component.TelemetrySettings{ + MeterProvider: mockMeterProvider{}, + TracerProvider: mockTracerProvider{}, + } + + meter := Meter(set) + if m, ok := meter.(mockMeter); ok { + require.Equal(t, "otelcol/fileconsumer", m.name) + } else { + require.Fail(t, "returned Meter not mockMeter") + } + + tracer := Tracer(set) + if m, ok := tracer.(mockTracer); ok { + require.Equal(t, "otelcol/fileconsumer", m.name) + } else { + require.Fail(t, "returned Meter not mockTracer") + } +} + +func TestNewTelemetryBuilder(t *testing.T) { + set := component.TelemetrySettings{ + MeterProvider: mockMeterProvider{}, + TracerProvider: mockTracerProvider{}, + } + applied := false + _, err := NewTelemetryBuilder(set, func(b *TelemetryBuilder) { + applied = true + }) + require.NoError(t, err) + require.True(t, applied) +} diff --git a/pkg/stanza/fileconsumer/metadata.yaml b/pkg/stanza/fileconsumer/metadata.yaml new file mode 100644 index 000000000000..d17aa17c4008 --- /dev/null +++ b/pkg/stanza/fileconsumer/metadata.yaml @@ -0,0 +1,26 @@ +type: fileconsumer +scope_name: otelcol/fileconsumer + +status: + class: pkg + stability: + beta: [logs] + codeowners: + active: [djaglowski] + +telemetry: + metrics: + fileconsumer_open_files: + description: Number of open files + unit: "1" + enabled: true + sum: + value_type: int + monotonic: false + fileconsumer_reading_files: + description: Number of open files that are being read + unit: "1" + enabled: true + sum: + value_type: int + monotonic: false diff --git a/pkg/stanza/go.mod b/pkg/stanza/go.mod index 0c92a50f6544..d2657774f683 100644 --- a/pkg/stanza/go.mod +++ b/pkg/stanza/go.mod @@ -16,6 +16,7 @@ require ( github.com/stretchr/testify v1.9.0 github.com/valyala/fastjson v1.6.4 go.opentelemetry.io/collector/component v0.103.0 + go.opentelemetry.io/collector/config/configtelemetry v0.103.0 go.opentelemetry.io/collector/config/configtls v0.103.0 go.opentelemetry.io/collector/confmap v0.103.0 go.opentelemetry.io/collector/consumer v0.103.0 @@ -24,6 +25,8 @@ require ( go.opentelemetry.io/collector/pdata v1.10.0 go.opentelemetry.io/collector/receiver v0.103.0 go.opentelemetry.io/otel/metric v1.27.0 + go.opentelemetry.io/otel/sdk/metric v1.27.0 + go.opentelemetry.io/otel/trace v1.27.0 go.uber.org/goleak v1.3.0 go.uber.org/multierr v1.11.0 go.uber.org/zap v1.27.0 @@ -60,13 +63,10 @@ require ( github.com/stretchr/objx v0.5.2 // indirect go.opentelemetry.io/collector v0.103.0 // indirect go.opentelemetry.io/collector/config/configopaque v1.10.0 // indirect - go.opentelemetry.io/collector/config/configtelemetry v0.103.0 // indirect go.opentelemetry.io/collector/semconv v0.103.0 // indirect go.opentelemetry.io/otel v1.27.0 // indirect go.opentelemetry.io/otel/exporters/prometheus v0.49.0 // indirect go.opentelemetry.io/otel/sdk v1.27.0 // indirect - go.opentelemetry.io/otel/sdk/metric v1.27.0 // indirect - go.opentelemetry.io/otel/trace v1.27.0 // indirect golang.org/x/net v0.25.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240520151616-dc85e6b867a5 // indirect google.golang.org/grpc v1.64.0 // indirect