Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[statsdreceiver] add source attributes and option to disable aggregation by source address #33010

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion receiver/statsdreceiver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ The Following settings are optional:

- `aggregation_interval: 70s`(default value is 60s): The aggregation time that the receiver aggregates the metrics (similar to the flush interval in StatsD server)

- `aggregate_by_source_address: true`(default value is true): Aggregate the metrics by source address. If it is false, the receiver will not aggregate by the source address. In that case it is recommended your application sends identifying tags and use the [groupbyattrsprocessor](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/processor/groupbyattrsprocessor).

- `enable_metric_type: true`(default value is false): Enable the statsd receiver to be able to emit the metric type(gauge, counter, timer(in the future), histogram(in the future)) as a label.

- `enable_simple_tags: true`(default value is false): Enable parsing tags that do not have a value, e.g. `#mykey` instead of `#mykey:myvalue`. DogStatsD supports such tagging.
Expand All @@ -51,6 +53,7 @@ receivers:
statsd/2:
endpoint: "localhost:8127"
aggregation_interval: 70s
aggregate_by_source_address: true
enable_metric_type: true
is_monotonic_counter: false
timer_histogram_mapping:
Expand Down Expand Up @@ -168,4 +171,4 @@ echo "test.metric:42|c|#myKey:myVal" | nc -w 1 -u -4 localhost 8125;
echo "test.metric:42|c|#myKey:myVal" | nc -w 1 -u -6 localhost 8125;
```

Which sends a UDP packet using both IPV4 and IPV6, which is needed because the receiver's UDP server only accepts one or the other.
Which sends a UDP packet using both IPV4 and IPV6, which is needed because the receiver's UDP server only accepts one or the other.
1 change: 1 addition & 0 deletions receiver/statsdreceiver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
type Config struct {
NetAddr confignet.AddrConfig `mapstructure:",squash"`
AggregationInterval time.Duration `mapstructure:"aggregation_interval"`
AggregateBySourceAddr bool `mapstructure:"aggregate_by_source_address"`
EnableMetricType bool `mapstructure:"enable_metric_type"`
EnableSimpleTags bool `mapstructure:"enable_simple_tags"`
IsMonotonicCounter bool `mapstructure:"is_monotonic_counter"`
Expand Down
1 change: 1 addition & 0 deletions receiver/statsdreceiver/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ func TestLoadConfig(t *testing.T) {
Transport: confignet.TransportTypeUDP6,
},
AggregationInterval: 70 * time.Second,
AggregateBySourceAddr: true,
TimerHistogramMapping: []protocol.TimerHistogramMapping{
{
StatsdType: "histogram",
Expand Down
2 changes: 2 additions & 0 deletions receiver/statsdreceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
const (
defaultBindEndpoint = "localhost:8125"
defaultAggregationInterval = 60 * time.Second
defaultAggregateBySourceAddr = true
defaultEnableMetricType = false
defaultIsMonotonicCounter = false
)
Expand All @@ -43,6 +44,7 @@ func createDefaultConfig() component.Config {
Transport: confignet.TransportTypeUDP,
},
AggregationInterval: defaultAggregationInterval,
AggregateBySourceAddr: defaultAggregateBySourceAddr,
EnableMetricType: defaultEnableMetricType,
IsMonotonicCounter: defaultIsMonotonicCounter,
TimerHistogramMapping: defaultTimerHistogramMapping,
Expand Down
10 changes: 10 additions & 0 deletions receiver/statsdreceiver/internal/protocol/statsd_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,16 @@ func (p *StatsDParser) GetMetrics() []BatchMetrics {
Metrics: pmetric.NewMetrics(),
}
rm := batch.Metrics.ResourceMetrics().AppendEmpty()

// https://opentelemetry.io/docs/specs/semconv/attributes-registry/source/
// Set source.address and source.port attributes if able to parse.
if instrument.addr != nil {
host, port, err := net.SplitHostPort(instrument.addr.String())
if err == nil {
rm.Resource().Attributes().PutStr("source.address", host)
rm.Resource().Attributes().PutStr("source.port", port)
}
}
for _, metric := range instrument.gauges {
p.copyMetricAndScope(rm, metric)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package protocol

import (
"errors"
"fmt"
"net"
"testing"
"time"
Expand Down Expand Up @@ -1690,9 +1691,15 @@ func TestStatsDParser_AggregateTimerWithHistogram(t *testing.T) {
},
}

sourceAddr := "1.2.3.4"
sourcePort := "5678"

newPoint := func() (pmetric.Metrics, pmetric.ExponentialHistogramDataPoint) {
data := pmetric.NewMetrics()
ilm := data.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty()
rm := data.ResourceMetrics().AppendEmpty()
rm.Resource().Attributes().PutStr("source.address", sourceAddr)
rm.Resource().Attributes().PutStr("source.port", sourcePort)
ilm := rm.ScopeMetrics().AppendEmpty()
m := ilm.Metrics().AppendEmpty()
m.SetName("expohisto")
ep := m.SetEmptyExponentialHistogram()
Expand Down Expand Up @@ -1903,7 +1910,7 @@ func TestStatsDParser_AggregateTimerWithHistogram(t *testing.T) {
var err error
p := &StatsDParser{}
assert.NoError(t, p.Initialize(false, false, false, tt.mapping))
addr, _ := net.ResolveUDPAddr("udp", "1.2.3.4:5678")
addr, _ := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%s", sourceAddr, sourcePort))
for _, line := range tt.input {
err = p.Aggregate(line, addr)
assert.NoError(t, err)
Expand Down
12 changes: 6 additions & 6 deletions receiver/statsdreceiver/internal/transport/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
type StatsD struct {
transport string
address string
conn io.Writer
Conn net.Conn
}

// NewStatsD creates a new StatsD instance to support the need for testing
Expand All @@ -41,13 +41,13 @@ func (s *StatsD) connect() error {
if err != nil {
return err
}
s.conn, err = net.DialUDP(s.transport, nil, udpAddr)
s.Conn, err = net.DialUDP(s.transport, nil, udpAddr)
if err != nil {
return err
}
case "tcp":
var err error
s.conn, err = net.Dial(s.transport, s.address)
s.Conn, err = net.Dial(s.transport, s.address)
if err != nil {
return err
}
Expand All @@ -61,16 +61,16 @@ func (s *StatsD) connect() error {
// Disconnect closes the StatsD.conn.
func (s *StatsD) Disconnect() error {
var err error
if cl, ok := s.conn.(io.Closer); ok {
if cl, ok := s.Conn.(io.Closer); ok {
err = cl.Close()
}
s.conn = nil
s.Conn = nil
return err
}

// SendMetric sends the input metric to the StatsD connection.
func (s *StatsD) SendMetric(metric Metric) error {
_, err := io.Copy(s.conn, strings.NewReader(metric.String()))
_, err := io.Copy(s.Conn, strings.NewReader(metric.String()))
if err != nil {
return fmt.Errorf("send metric on test client: %w", err)
}
Expand Down
10 changes: 10 additions & 0 deletions receiver/statsdreceiver/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,13 @@ status:
distributions: [contrib]
codeowners:
active: [jmacd, dmitryax]

resource_attributes:
source.address:
description: Source address
type: string
enabled: true
source.port:
description: Source port number
type: int
enabled: true
36 changes: 29 additions & 7 deletions receiver/statsdreceiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,23 +64,40 @@ func newReceiver(
return r, nil
}

func buildTransportServer(config Config) (transport.Server, error) {
func buildTransportServer(config Config) (transport.Server, net.Addr, error) {
// TODO: Add unix socket transport implementations
trans := transport.NewTransport(strings.ToLower(string(config.NetAddr.Transport)))
configTrans := strings.ToLower(string(config.NetAddr.Transport))
trans := transport.NewTransport(configTrans)
switch trans {
case transport.UDP, transport.UDP4, transport.UDP6:
return transport.NewUDPServer(trans, config.NetAddr.Endpoint)
server, err := transport.NewUDPServer(trans, config.NetAddr.Endpoint)
if err != nil {
return nil, nil, err
}
serverAddr, err := net.ResolveUDPAddr(configTrans, config.NetAddr.Endpoint)
if err != nil {
return nil, nil, err
}
return server, serverAddr, nil
case transport.TCP, transport.TCP4, transport.TCP6:
return transport.NewTCPServer(trans, config.NetAddr.Endpoint)
server, err := transport.NewTCPServer(trans, config.NetAddr.Endpoint)
if err != nil {
return nil, nil, err
}
serverAddr, err := net.ResolveTCPAddr(configTrans, config.NetAddr.Endpoint)
if err != nil {
return nil, nil, err
}
return server, serverAddr, nil
}

return nil, fmt.Errorf("unsupported transport %q", string(config.NetAddr.Transport))
return nil, nil, fmt.Errorf("unsupported transport %q", string(config.NetAddr.Transport))
}

// Start starts a UDP server that can process StatsD messages.
func (r *statsdReceiver) Start(ctx context.Context, _ component.Host) error {
ctx, r.cancel = context.WithCancel(ctx)
server, err := buildTransportServer(*r.config)
server, serverAddr, err := buildTransportServer(*r.config)
if err != nil {
return err
}
Expand Down Expand Up @@ -116,7 +133,12 @@ func (r *statsdReceiver) Start(ctx context.Context, _ component.Host) error {
}
}
case metric := <-transferChan:
if err := r.parser.Aggregate(metric.Raw, metric.Addr); err != nil {
// Aggregate by server address. If enabled use source address indead.
aggregateAddr := serverAddr
if r.config.AggregateBySourceAddr {
aggregateAddr = metric.Addr
}
if err := r.parser.Aggregate(metric.Raw, aggregateAddr); err != nil {
r.reporter.OnDebugf("Error aggregating metric", zap.Error(err))
}
case <-ctx.Done():
Expand Down
122 changes: 122 additions & 0 deletions receiver/statsdreceiver/receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package statsdreceiver
import (
"context"
"errors"
"net"
"testing"
"time"

Expand Down Expand Up @@ -163,3 +164,124 @@ func Test_statsdreceiver_EndToEnd(t *testing.T) {
})
}
}

func Test_statsdreceiver_resource_attribute_source(t *testing.T) {
serverAddr := testutil.GetAvailableLocalNetworkAddress(t, "udp")
firstStatsdClient, err := client.NewStatsD("udp", serverAddr)
require.NoError(t, err)
secondStatsdClient, err := client.NewStatsD("udp", serverAddr)
require.NoError(t, err)
t.Run("aggregate by source address with two clients sending", func(t *testing.T) {
cfg := &Config{
NetAddr: confignet.AddrConfig{
Endpoint: serverAddr,
Transport: confignet.TransportTypeUDP,
},
AggregationInterval: 4 * time.Second,
AggregateBySourceAddr: true,
}
sink := new(consumertest.MetricsSink)
rcv, err := newReceiver(receivertest.NewNopCreateSettings(), *cfg, sink)
require.NoError(t, err)
r := rcv.(*statsdReceiver)

mr := transport.NewMockReporter(1)
r.reporter = mr

require.NoError(t, r.Start(context.Background(), componenttest.NewNopHost()))
defer func() {
assert.NoError(t, r.Shutdown(context.Background()))
}()

statsdMetric := client.Metric{
Name: "test.metric",
Value: "42",
Type: "c",
}
err = firstStatsdClient.SendMetric(statsdMetric)
require.NoError(t, err)
err = secondStatsdClient.SendMetric(statsdMetric)
require.NoError(t, err)

// Wait for aggregation interval
time.Sleep(5 * time.Second)

// We should have two resources which one metric each
mdd := sink.AllMetrics()
require.Len(t, mdd, 2)
require.Equal(t, 1, mdd[0].ResourceMetrics().Len())
require.Equal(t, 1, mdd[1].ResourceMetrics().Len())

// The resources should have source attributes matching the client addr
firstResource := mdd[0].ResourceMetrics().At(0)
sourceAddress, exists := firstResource.Resource().Attributes().Get("source.address")
assert.Equal(t, true, exists)
sourcePort, exists := firstResource.Resource().Attributes().Get("source.port")
assert.Equal(t, true, exists)
clientAddress, clientPort, err := net.SplitHostPort(firstStatsdClient.Conn.LocalAddr().String())
require.NoError(t, err)
assert.Equal(t, clientAddress, sourceAddress.AsString())
assert.Equal(t, clientPort, sourcePort.AsString())

secondResource := mdd[1].ResourceMetrics().At(0)
sourceAddress, exists = secondResource.Resource().Attributes().Get("source.address")
assert.Equal(t, true, exists)
sourcePort, exists = secondResource.Resource().Attributes().Get("source.port")
assert.Equal(t, true, exists)
clientAddress, clientPort, err = net.SplitHostPort(secondStatsdClient.Conn.LocalAddr().String())
require.NoError(t, err)
assert.Equal(t, clientAddress, sourceAddress.AsString())
assert.Equal(t, clientPort, sourcePort.AsString())
})
t.Run("do not aggregate by source address with two clients sending", func(t *testing.T) {
cfg := &Config{
NetAddr: confignet.AddrConfig{
Endpoint: serverAddr,
Transport: confignet.TransportTypeUDP,
},
AggregationInterval: 4 * time.Second,
AggregateBySourceAddr: false,
}
sink := new(consumertest.MetricsSink)
rcv, err := newReceiver(receivertest.NewNopCreateSettings(), *cfg, sink)
require.NoError(t, err)
r := rcv.(*statsdReceiver)

mr := transport.NewMockReporter(1)
r.reporter = mr

require.NoError(t, r.Start(context.Background(), componenttest.NewNopHost()))
defer func() {
assert.NoError(t, r.Shutdown(context.Background()))
}()

statsdMetric := client.Metric{
Name: "test.metric",
Value: "42",
Type: "c",
}
err = firstStatsdClient.SendMetric(statsdMetric)
require.NoError(t, err)
err = secondStatsdClient.SendMetric(statsdMetric)
require.NoError(t, err)

// Wait for aggregation interval
time.Sleep(5 * time.Second)

// We should have one resource with one metric due to disabled aggregation by source address
mdd := sink.AllMetrics()
require.Len(t, mdd, 1)
require.Equal(t, 1, mdd[0].ResourceMetrics().Len())

// The resources should have source attribute matching the servers address
resource := mdd[0].ResourceMetrics().At(0)
sourceAddress, exists := resource.Resource().Attributes().Get("source.address")
assert.Equal(t, true, exists)
sourcePort, exists := resource.Resource().Attributes().Get("source.port")
assert.Equal(t, true, exists)
serverAddr, serverPort, err := net.SplitHostPort(serverAddr)
require.NoError(t, err)
assert.Equal(t, serverAddr, sourceAddress.AsString())
assert.Equal(t, serverPort, sourcePort.AsString())
})
}
1 change: 1 addition & 0 deletions receiver/statsdreceiver/testdata/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ statsd/receiver_settings:
endpoint: "localhost:12345"
transport: "udp6"
aggregation_interval: 70s
aggregate_by_source_address: true
enable_metric_type: false
timer_histogram_mapping:
- statsd_type: "histogram"
Expand Down
Loading