Skip to content

Commit

Permalink
Fix internal/otelarrow and receiver/otelarrow
Browse files Browse the repository at this point in the history
  • Loading branch information
mx-psi committed Jan 24, 2025
1 parent e347753 commit 4016d9f
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 38 deletions.
3 changes: 0 additions & 3 deletions internal/otelarrow/admission2/boundedqueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config/configtelemetry"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/resource"
Expand Down Expand Up @@ -47,8 +46,6 @@ func newBQTest(t *testing.T, maxAdmit, maxWait uint64) bqTest {
sdkmetric.WithReader(reader),
)
settings.MeterProvider = provider
settings.MetricsLevel = configtelemetry.LevelDetailed

bq, err := NewBoundedQueue(component.MustNewID("admission_testing"), settings, maxAdmit, maxWait)
require.NoError(t, err)
return bqTest{
Expand Down
39 changes: 8 additions & 31 deletions internal/otelarrow/netstats/netstats.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package netstats // import "github.com/open-telemetry/opentelemetry-collector-co
import (
"context"

"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/receiver"
"go.opentelemetry.io/otel/attribute"
Expand Down Expand Up @@ -128,13 +127,6 @@ func makeRecvMetrics(prefix string, meter metric.Meter, major bool) (recv, recvW

// NewExporterNetworkReporter creates a new NetworkReporter configured for an exporter.
func NewExporterNetworkReporter(settings exporter.Settings) (*NetworkReporter, error) {
level := settings.TelemetrySettings.MetricsLevel

if level <= configtelemetry.LevelBasic {
// Note: NetworkReporter implements nil a check.
return nil, nil
}

meter := settings.TelemetrySettings.MeterProvider.Meter(scopeName)
rep := &NetworkReporter{
isExporter: true,
Expand All @@ -143,33 +135,22 @@ func NewExporterNetworkReporter(settings exporter.Settings) (*NetworkReporter, e
}

var errors, err error
if level > configtelemetry.LevelNormal {
rep.compSizeHisto, err = meter.Int64Histogram("otelcol_"+ExporterKey+"_"+CompSize, metric.WithDescription(compSizeDescription), metric.WithUnit(bytesUnit))
errors = multierr.Append(errors, err)
}
rep.compSizeHisto, err = meter.Int64Histogram("otelcol_"+ExporterKey+"_"+CompSize, metric.WithDescription(compSizeDescription), metric.WithUnit(bytesUnit))
errors = multierr.Append(errors, err)

rep.sentBytes, rep.sentWireBytes, err = makeSentMetrics("otelcol_"+ExporterKey, meter, true)
errors = multierr.Append(errors, err)

// Normally, an exporter counts sent bytes, and skips received
// bytes. LevelDetailed will reveal exporter-received bytes.
if level > configtelemetry.LevelNormal {
rep.recvBytes, rep.recvWireBytes, err = makeRecvMetrics("otelcol_"+ExporterKey, meter, false)
errors = multierr.Append(errors, err)
}
rep.recvBytes, rep.recvWireBytes, err = makeRecvMetrics("otelcol_"+ExporterKey, meter, false)
errors = multierr.Append(errors, err)

return rep, errors
}

// NewReceiverNetworkReporter creates a new NetworkReporter configured for an exporter.
func NewReceiverNetworkReporter(settings receiver.Settings) (*NetworkReporter, error) {
level := settings.TelemetrySettings.MetricsLevel

if level <= configtelemetry.LevelBasic {
// Note: NetworkReporter implements nil a check.
return nil, nil
}

meter := settings.MeterProvider.Meter(scopeName)
rep := &NetworkReporter{
isExporter: false,
Expand All @@ -178,20 +159,16 @@ func NewReceiverNetworkReporter(settings receiver.Settings) (*NetworkReporter, e
}

var errors, err error
if level > configtelemetry.LevelNormal {
rep.compSizeHisto, err = meter.Int64Histogram("otelcol_"+ReceiverKey+"_"+CompSize, metric.WithDescription(compSizeDescription), metric.WithUnit(bytesUnit))
errors = multierr.Append(errors, err)
}
rep.compSizeHisto, err = meter.Int64Histogram("otelcol_"+ReceiverKey+"_"+CompSize, metric.WithDescription(compSizeDescription), metric.WithUnit(bytesUnit))
errors = multierr.Append(errors, err)

rep.recvBytes, rep.recvWireBytes, err = makeRecvMetrics("otelcol_"+ReceiverKey, meter, true)
errors = multierr.Append(errors, err)

// Normally, a receiver counts received bytes, and skips sent
// bytes. LevelDetailed will reveal receiver-sent bytes.
if level > configtelemetry.LevelNormal {
rep.sentBytes, rep.sentWireBytes, err = makeSentMetrics("otelcol_"+ReceiverKey, meter, false)
errors = multierr.Append(errors, err)
}
rep.sentBytes, rep.sentWireBytes, err = makeSentMetrics("otelcol_"+ReceiverKey, meter, false)
errors = multierr.Append(errors, err)

return rep, errors
}
Expand Down
82 changes: 80 additions & 2 deletions internal/otelarrow/netstats/netstats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,90 @@ import (
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/receiver"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/instrumentation"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/resource"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"google.golang.org/grpc/stats"
)

func dropView(instrument metric.Instrument) metric.View {
return metric.NewView(
instrument,
metric.Stream{
Aggregation: metric.AggregationDrop{},
},
)
}

func viewsFromLevel(level configtelemetry.Level) []metric.View {
var views []metric.View

if level == configtelemetry.LevelNone {
return []metric.View{dropView(metric.Instrument{Name: "*"})}
}

// otel-arrow library metrics
// See https://github.com/open-telemetry/otel-arrow/blob/c39257/pkg/otel/arrow_record/consumer.go#L174-L176
if level < configtelemetry.LevelNormal {
scope := instrumentation.Scope{Name: "otel-arrow/pkg/otel/arrow_record"}
views = append(views,
dropView(metric.Instrument{
Name: "arrow_batch_records",
Scope: scope,
}),
dropView(metric.Instrument{
Name: "arrow_schema_resets",
Scope: scope,
}),
dropView(metric.Instrument{
Name: "arrow_memory_inuse",
Scope: scope,
}),
)
}

// contrib's internal/otelarrow/netstats metrics
// See
// - https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/a25f05/internal/otelarrow/netstats/netstats.go#L130
// - https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/a25f05/internal/otelarrow/netstats/netstats.go#L165
if level < configtelemetry.LevelDetailed {
scope := instrumentation.Scope{Name: scopeName}
// Compressed size metrics.
views = append(views, dropView(metric.Instrument{
Name: "otelcol_*_compressed_size",
Scope: scope,
}))

views = append(views, dropView(metric.Instrument{
Name: "otelcol_*_compressed_size",
Scope: scope,
}))

// makeRecvMetrics for exporters.
views = append(views, dropView(metric.Instrument{
Name: "otelcol_exporter_recv",
Scope: scope,
}))
views = append(views, dropView(metric.Instrument{
Name: "otelcol_exporter_recv_wire",
Scope: scope,
}))

// makeSentMetrics for receivers.
views = append(views, dropView(metric.Instrument{
Name: "otelcol_receiver_sent",
Scope: scope,
}))
views = append(views, dropView(metric.Instrument{
Name: "otelcol_receiver_sent_wire",
Scope: scope,
}))
}
return views
}

func metricValues(t *testing.T, rm metricdata.ResourceMetrics, expectMethod string) map[string]any {
res := map[string]any{}
for _, sm := range rm.ScopeMetrics {
Expand Down Expand Up @@ -73,6 +150,7 @@ func TestNetStatsExporterDetailed(t *testing.T) {
}

func testNetStatsExporter(t *testing.T, level configtelemetry.Level, expect map[string]any) {
t.Helper()
for _, apiDirect := range []bool{true, false} {
t.Run(func() string {
if apiDirect {
Expand All @@ -84,12 +162,12 @@ func testNetStatsExporter(t *testing.T, level configtelemetry.Level, expect map[
mp := metric.NewMeterProvider(
metric.WithResource(resource.Empty()),
metric.WithReader(rdr),
metric.WithView(viewsFromLevel(level)...),
)
enr, err := NewExporterNetworkReporter(exporter.Settings{
ID: component.NewID(component.MustNewType("test")),
TelemetrySettings: component.TelemetrySettings{
MeterProvider: mp,
MetricsLevel: level,
},
})
require.NoError(t, err)
Expand Down Expand Up @@ -224,6 +302,7 @@ func testNetStatsReceiver(t *testing.T, level configtelemetry.Level, expect map[
mp := metric.NewMeterProvider(
metric.WithResource(resource.Empty()),
metric.WithReader(rdr),
metric.WithView(viewsFromLevel(level)...),
)
rer, err := NewReceiverNetworkReporter(receiver.Settings{
ID: component.NewID(component.MustNewType("test")),
Expand Down Expand Up @@ -284,7 +363,6 @@ func TestUncompressedSizeBypass(t *testing.T) {
ID: component.NewID(component.MustNewType("test")),
TelemetrySettings: component.TelemetrySettings{
MeterProvider: mp,
MetricsLevel: configtelemetry.LevelDetailed,
},
})
require.NoError(t, err)
Expand Down
2 changes: 0 additions & 2 deletions receiver/otelarrowreceiver/otelarrow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"go.opentelemetry.io/collector/config/configauth"
"go.opentelemetry.io/collector/config/configgrpc"
"go.opentelemetry.io/collector/config/confignet"
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/config/configtls"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumertest"
Expand Down Expand Up @@ -219,7 +218,6 @@ func newGRPCReceiver(t *testing.T, endpoint string, settings component.Telemetry
func newReceiver(t *testing.T, factory receiver.Factory, settings component.TelemetrySettings, cfg *Config, id component.ID, tc consumer.Traces, mc consumer.Metrics) component.Component {
set := receivertest.NewNopSettings()
set.TelemetrySettings = settings
set.TelemetrySettings.MetricsLevel = configtelemetry.LevelNormal
set.ID = id
var r component.Component
var err error
Expand Down

0 comments on commit 4016d9f

Please sign in to comment.