diff --git a/receiver/statsdreceiver/README.md b/receiver/statsdreceiver/README.md index ba04406c6e6d..60eab7eec2f4 100644 --- a/receiver/statsdreceiver/README.md +++ b/receiver/statsdreceiver/README.md @@ -28,6 +28,8 @@ The Following settings are optional: - `aggregation_interval: 70s`(default value is 60s): The aggregation time that the receiver aggregates the metrics (similar to the flush interval in StatsD server) +- `aggregate_by_source_address: true`(default value is true): Aggregate the metrics by source address. If it is false, the receiver will not aggregate by the source address. In that case it is recommended your application sends identifying tags and use the [groupbyattrsprocessor](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/processor/groupbyattrsprocessor). + - `enable_metric_type: true`(default value is false): Enable the statsd receiver to be able to emit the metric type(gauge, counter, timer(in the future), histogram(in the future)) as a label. - `enable_simple_tags: true`(default value is false): Enable parsing tags that do not have a value, e.g. `#mykey` instead of `#mykey:myvalue`. DogStatsD supports such tagging. @@ -51,6 +53,7 @@ receivers: statsd/2: endpoint: "localhost:8127" aggregation_interval: 70s + aggregate_by_source_address: true enable_metric_type: true is_monotonic_counter: false timer_histogram_mapping: @@ -168,4 +171,4 @@ echo "test.metric:42|c|#myKey:myVal" | nc -w 1 -u -4 localhost 8125; echo "test.metric:42|c|#myKey:myVal" | nc -w 1 -u -6 localhost 8125; ``` -Which sends a UDP packet using both IPV4 and IPV6, which is needed because the receiver's UDP server only accepts one or the other. \ No newline at end of file +Which sends a UDP packet using both IPV4 and IPV6, which is needed because the receiver's UDP server only accepts one or the other. diff --git a/receiver/statsdreceiver/config.go b/receiver/statsdreceiver/config.go index b0ccd36302ac..9d8c6db833a2 100644 --- a/receiver/statsdreceiver/config.go +++ b/receiver/statsdreceiver/config.go @@ -18,6 +18,7 @@ import ( type Config struct { NetAddr confignet.AddrConfig `mapstructure:",squash"` AggregationInterval time.Duration `mapstructure:"aggregation_interval"` + AggregateBySourceAddr bool `mapstructure:"aggregate_by_source_address"` EnableMetricType bool `mapstructure:"enable_metric_type"` EnableSimpleTags bool `mapstructure:"enable_simple_tags"` IsMonotonicCounter bool `mapstructure:"is_monotonic_counter"` diff --git a/receiver/statsdreceiver/config_test.go b/receiver/statsdreceiver/config_test.go index ef98e777f99a..127f2b310ee7 100644 --- a/receiver/statsdreceiver/config_test.go +++ b/receiver/statsdreceiver/config_test.go @@ -42,6 +42,7 @@ func TestLoadConfig(t *testing.T) { Transport: confignet.TransportTypeUDP6, }, AggregationInterval: 70 * time.Second, + AggregateBySourceAddr: true, TimerHistogramMapping: []protocol.TimerHistogramMapping{ { StatsdType: "histogram", diff --git a/receiver/statsdreceiver/factory.go b/receiver/statsdreceiver/factory.go index 438564ebf7e5..bd0ef2113f58 100644 --- a/receiver/statsdreceiver/factory.go +++ b/receiver/statsdreceiver/factory.go @@ -19,6 +19,7 @@ import ( const ( defaultBindEndpoint = "localhost:8125" defaultAggregationInterval = 60 * time.Second + defaultAggregateBySourceAddr = true defaultEnableMetricType = false defaultIsMonotonicCounter = false ) @@ -43,6 +44,7 @@ func createDefaultConfig() component.Config { Transport: confignet.TransportTypeUDP, }, AggregationInterval: defaultAggregationInterval, + AggregateBySourceAddr: defaultAggregateBySourceAddr, EnableMetricType: defaultEnableMetricType, IsMonotonicCounter: defaultIsMonotonicCounter, TimerHistogramMapping: defaultTimerHistogramMapping, diff --git a/receiver/statsdreceiver/internal/protocol/statsd_parser.go b/receiver/statsdreceiver/internal/protocol/statsd_parser.go index ccbcdd6d108a..31532bed692a 100644 --- a/receiver/statsdreceiver/internal/protocol/statsd_parser.go +++ b/receiver/statsdreceiver/internal/protocol/statsd_parser.go @@ -202,6 +202,16 @@ func (p *StatsDParser) GetMetrics() []BatchMetrics { Metrics: pmetric.NewMetrics(), } rm := batch.Metrics.ResourceMetrics().AppendEmpty() + + // https://opentelemetry.io/docs/specs/semconv/attributes-registry/source/ + // Set source.address and source.port attributes if able to parse. + if instrument.addr != nil { + host, port, err := net.SplitHostPort(instrument.addr.String()) + if err == nil { + rm.Resource().Attributes().PutStr("source.address", host) + rm.Resource().Attributes().PutStr("source.port", port) + } + } for _, metric := range instrument.gauges { p.copyMetricAndScope(rm, metric) } diff --git a/receiver/statsdreceiver/internal/protocol/statsd_parser_test.go b/receiver/statsdreceiver/internal/protocol/statsd_parser_test.go index ff32cbc2069e..2e2176822aa2 100644 --- a/receiver/statsdreceiver/internal/protocol/statsd_parser_test.go +++ b/receiver/statsdreceiver/internal/protocol/statsd_parser_test.go @@ -5,6 +5,7 @@ package protocol import ( "errors" + "fmt" "net" "testing" "time" @@ -1690,9 +1691,15 @@ func TestStatsDParser_AggregateTimerWithHistogram(t *testing.T) { }, } + sourceAddr := "1.2.3.4" + sourcePort := "5678" + newPoint := func() (pmetric.Metrics, pmetric.ExponentialHistogramDataPoint) { data := pmetric.NewMetrics() - ilm := data.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty() + rm := data.ResourceMetrics().AppendEmpty() + rm.Resource().Attributes().PutStr("source.address", sourceAddr) + rm.Resource().Attributes().PutStr("source.port", sourcePort) + ilm := rm.ScopeMetrics().AppendEmpty() m := ilm.Metrics().AppendEmpty() m.SetName("expohisto") ep := m.SetEmptyExponentialHistogram() @@ -1903,7 +1910,7 @@ func TestStatsDParser_AggregateTimerWithHistogram(t *testing.T) { var err error p := &StatsDParser{} assert.NoError(t, p.Initialize(false, false, false, tt.mapping)) - addr, _ := net.ResolveUDPAddr("udp", "1.2.3.4:5678") + addr, _ := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%s", sourceAddr, sourcePort)) for _, line := range tt.input { err = p.Aggregate(line, addr) assert.NoError(t, err) diff --git a/receiver/statsdreceiver/internal/transport/client/client.go b/receiver/statsdreceiver/internal/transport/client/client.go index 8b9fd7f06ba3..7c63d7c410ad 100644 --- a/receiver/statsdreceiver/internal/transport/client/client.go +++ b/receiver/statsdreceiver/internal/transport/client/client.go @@ -14,7 +14,7 @@ import ( type StatsD struct { transport string address string - conn io.Writer + Conn net.Conn } // NewStatsD creates a new StatsD instance to support the need for testing @@ -41,13 +41,13 @@ func (s *StatsD) connect() error { if err != nil { return err } - s.conn, err = net.DialUDP(s.transport, nil, udpAddr) + s.Conn, err = net.DialUDP(s.transport, nil, udpAddr) if err != nil { return err } case "tcp": var err error - s.conn, err = net.Dial(s.transport, s.address) + s.Conn, err = net.Dial(s.transport, s.address) if err != nil { return err } @@ -61,16 +61,16 @@ func (s *StatsD) connect() error { // Disconnect closes the StatsD.conn. func (s *StatsD) Disconnect() error { var err error - if cl, ok := s.conn.(io.Closer); ok { + if cl, ok := s.Conn.(io.Closer); ok { err = cl.Close() } - s.conn = nil + s.Conn = nil return err } // SendMetric sends the input metric to the StatsD connection. func (s *StatsD) SendMetric(metric Metric) error { - _, err := io.Copy(s.conn, strings.NewReader(metric.String())) + _, err := io.Copy(s.Conn, strings.NewReader(metric.String())) if err != nil { return fmt.Errorf("send metric on test client: %w", err) } diff --git a/receiver/statsdreceiver/metadata.yaml b/receiver/statsdreceiver/metadata.yaml index e24c0521dc83..a61aaec860f9 100644 --- a/receiver/statsdreceiver/metadata.yaml +++ b/receiver/statsdreceiver/metadata.yaml @@ -8,3 +8,13 @@ status: distributions: [contrib] codeowners: active: [jmacd, dmitryax] + +resource_attributes: + source.address: + description: Source address + type: string + enabled: true + source.port: + description: Source port number + type: int + enabled: true diff --git a/receiver/statsdreceiver/receiver.go b/receiver/statsdreceiver/receiver.go index 3e05ca47eb2a..51f359a71614 100644 --- a/receiver/statsdreceiver/receiver.go +++ b/receiver/statsdreceiver/receiver.go @@ -64,23 +64,40 @@ func newReceiver( return r, nil } -func buildTransportServer(config Config) (transport.Server, error) { +func buildTransportServer(config Config) (transport.Server, net.Addr, error) { // TODO: Add unix socket transport implementations - trans := transport.NewTransport(strings.ToLower(string(config.NetAddr.Transport))) + configTrans := strings.ToLower(string(config.NetAddr.Transport)) + trans := transport.NewTransport(configTrans) switch trans { case transport.UDP, transport.UDP4, transport.UDP6: - return transport.NewUDPServer(trans, config.NetAddr.Endpoint) + server, err := transport.NewUDPServer(trans, config.NetAddr.Endpoint) + if err != nil { + return nil, nil, err + } + serverAddr, err := net.ResolveUDPAddr(configTrans, config.NetAddr.Endpoint) + if err != nil { + return nil, nil, err + } + return server, serverAddr, nil case transport.TCP, transport.TCP4, transport.TCP6: - return transport.NewTCPServer(trans, config.NetAddr.Endpoint) + server, err := transport.NewTCPServer(trans, config.NetAddr.Endpoint) + if err != nil { + return nil, nil, err + } + serverAddr, err := net.ResolveTCPAddr(configTrans, config.NetAddr.Endpoint) + if err != nil { + return nil, nil, err + } + return server, serverAddr, nil } - return nil, fmt.Errorf("unsupported transport %q", string(config.NetAddr.Transport)) + return nil, nil, fmt.Errorf("unsupported transport %q", string(config.NetAddr.Transport)) } // Start starts a UDP server that can process StatsD messages. func (r *statsdReceiver) Start(ctx context.Context, _ component.Host) error { ctx, r.cancel = context.WithCancel(ctx) - server, err := buildTransportServer(*r.config) + server, serverAddr, err := buildTransportServer(*r.config) if err != nil { return err } @@ -116,7 +133,12 @@ func (r *statsdReceiver) Start(ctx context.Context, _ component.Host) error { } } case metric := <-transferChan: - if err := r.parser.Aggregate(metric.Raw, metric.Addr); err != nil { + // Aggregate by server address. If enabled use source address indead. + aggregateAddr := serverAddr + if r.config.AggregateBySourceAddr { + aggregateAddr = metric.Addr + } + if err := r.parser.Aggregate(metric.Raw, aggregateAddr); err != nil { r.reporter.OnDebugf("Error aggregating metric", zap.Error(err)) } case <-ctx.Done(): diff --git a/receiver/statsdreceiver/receiver_test.go b/receiver/statsdreceiver/receiver_test.go index b55456f80004..33fb20346d7d 100644 --- a/receiver/statsdreceiver/receiver_test.go +++ b/receiver/statsdreceiver/receiver_test.go @@ -6,6 +6,7 @@ package statsdreceiver import ( "context" "errors" + "net" "testing" "time" @@ -163,3 +164,124 @@ func Test_statsdreceiver_EndToEnd(t *testing.T) { }) } } + +func Test_statsdreceiver_resource_attribute_source(t *testing.T) { + serverAddr := testutil.GetAvailableLocalNetworkAddress(t, "udp") + firstStatsdClient, err := client.NewStatsD("udp", serverAddr) + require.NoError(t, err) + secondStatsdClient, err := client.NewStatsD("udp", serverAddr) + require.NoError(t, err) + t.Run("aggregate by source address with two clients sending", func(t *testing.T) { + cfg := &Config{ + NetAddr: confignet.AddrConfig{ + Endpoint: serverAddr, + Transport: confignet.TransportTypeUDP, + }, + AggregationInterval: 4 * time.Second, + AggregateBySourceAddr: true, + } + sink := new(consumertest.MetricsSink) + rcv, err := newReceiver(receivertest.NewNopCreateSettings(), *cfg, sink) + require.NoError(t, err) + r := rcv.(*statsdReceiver) + + mr := transport.NewMockReporter(1) + r.reporter = mr + + require.NoError(t, r.Start(context.Background(), componenttest.NewNopHost())) + defer func() { + assert.NoError(t, r.Shutdown(context.Background())) + }() + + statsdMetric := client.Metric{ + Name: "test.metric", + Value: "42", + Type: "c", + } + err = firstStatsdClient.SendMetric(statsdMetric) + require.NoError(t, err) + err = secondStatsdClient.SendMetric(statsdMetric) + require.NoError(t, err) + + // Wait for aggregation interval + time.Sleep(5 * time.Second) + + // We should have two resources which one metric each + mdd := sink.AllMetrics() + require.Len(t, mdd, 2) + require.Equal(t, 1, mdd[0].ResourceMetrics().Len()) + require.Equal(t, 1, mdd[1].ResourceMetrics().Len()) + + // The resources should have source attributes matching the client addr + firstResource := mdd[0].ResourceMetrics().At(0) + sourceAddress, exists := firstResource.Resource().Attributes().Get("source.address") + assert.Equal(t, true, exists) + sourcePort, exists := firstResource.Resource().Attributes().Get("source.port") + assert.Equal(t, true, exists) + clientAddress, clientPort, err := net.SplitHostPort(firstStatsdClient.Conn.LocalAddr().String()) + require.NoError(t, err) + assert.Equal(t, clientAddress, sourceAddress.AsString()) + assert.Equal(t, clientPort, sourcePort.AsString()) + + secondResource := mdd[1].ResourceMetrics().At(0) + sourceAddress, exists = secondResource.Resource().Attributes().Get("source.address") + assert.Equal(t, true, exists) + sourcePort, exists = secondResource.Resource().Attributes().Get("source.port") + assert.Equal(t, true, exists) + clientAddress, clientPort, err = net.SplitHostPort(secondStatsdClient.Conn.LocalAddr().String()) + require.NoError(t, err) + assert.Equal(t, clientAddress, sourceAddress.AsString()) + assert.Equal(t, clientPort, sourcePort.AsString()) + }) + t.Run("do not aggregate by source address with two clients sending", func(t *testing.T) { + cfg := &Config{ + NetAddr: confignet.AddrConfig{ + Endpoint: serverAddr, + Transport: confignet.TransportTypeUDP, + }, + AggregationInterval: 4 * time.Second, + AggregateBySourceAddr: false, + } + sink := new(consumertest.MetricsSink) + rcv, err := newReceiver(receivertest.NewNopCreateSettings(), *cfg, sink) + require.NoError(t, err) + r := rcv.(*statsdReceiver) + + mr := transport.NewMockReporter(1) + r.reporter = mr + + require.NoError(t, r.Start(context.Background(), componenttest.NewNopHost())) + defer func() { + assert.NoError(t, r.Shutdown(context.Background())) + }() + + statsdMetric := client.Metric{ + Name: "test.metric", + Value: "42", + Type: "c", + } + err = firstStatsdClient.SendMetric(statsdMetric) + require.NoError(t, err) + err = secondStatsdClient.SendMetric(statsdMetric) + require.NoError(t, err) + + // Wait for aggregation interval + time.Sleep(5 * time.Second) + + // We should have one resource with one metric due to disabled aggregation by source address + mdd := sink.AllMetrics() + require.Len(t, mdd, 1) + require.Equal(t, 1, mdd[0].ResourceMetrics().Len()) + + // The resources should have source attribute matching the servers address + resource := mdd[0].ResourceMetrics().At(0) + sourceAddress, exists := resource.Resource().Attributes().Get("source.address") + assert.Equal(t, true, exists) + sourcePort, exists := resource.Resource().Attributes().Get("source.port") + assert.Equal(t, true, exists) + serverAddr, serverPort, err := net.SplitHostPort(serverAddr) + require.NoError(t, err) + assert.Equal(t, serverAddr, sourceAddress.AsString()) + assert.Equal(t, serverPort, sourcePort.AsString()) + }) +} diff --git a/receiver/statsdreceiver/testdata/config.yaml b/receiver/statsdreceiver/testdata/config.yaml index ed319d8e5781..076a202d022c 100644 --- a/receiver/statsdreceiver/testdata/config.yaml +++ b/receiver/statsdreceiver/testdata/config.yaml @@ -3,6 +3,7 @@ statsd/receiver_settings: endpoint: "localhost:12345" transport: "udp6" aggregation_interval: 70s + aggregate_by_source_address: true enable_metric_type: false timer_histogram_mapping: - statsd_type: "histogram"