Skip to content

Commit

Permalink
feat(statsdreceiver): use source attributes semantic
Browse files Browse the repository at this point in the history
  • Loading branch information
Manuelraa committed May 13, 2024
1 parent be5b205 commit 0085017
Show file tree
Hide file tree
Showing 5 changed files with 51 additions and 13 deletions.
1 change: 1 addition & 0 deletions receiver/statsdreceiver/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ func TestLoadConfig(t *testing.T) {
Transport: confignet.TransportTypeUDP6,
},
AggregationInterval: 70 * time.Second,
AggregateBySourceAddr: true,
TimerHistogramMapping: []protocol.TimerHistogramMapping{
{
StatsdType: "histogram",
Expand Down
11 changes: 10 additions & 1 deletion receiver/statsdreceiver/internal/protocol/statsd_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,16 @@ func (p *StatsDParser) GetMetrics() []BatchMetrics {
Metrics: pmetric.NewMetrics(),
}
rm := batch.Metrics.ResourceMetrics().AppendEmpty()
rm.Resource().Attributes().PutStr("source", instrument.addr.String())

// https://opentelemetry.io/docs/specs/semconv/attributes-registry/source/
// Set source.address and source.port attributes if able to parse.
if instrument.addr != nil {
host, port, err := net.SplitHostPort(instrument.addr.String())
if err == nil {
rm.Resource().Attributes().PutStr("source.address", host)
rm.Resource().Attributes().PutStr("source.port", port)
}
}
for _, metric := range instrument.gauges {
p.copyMetricAndScope(rm, metric)
}
Expand Down
11 changes: 9 additions & 2 deletions receiver/statsdreceiver/internal/protocol/statsd_parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package protocol

import (
"errors"
"fmt"
"net"
"testing"
"time"
Expand Down Expand Up @@ -1690,9 +1691,15 @@ func TestStatsDParser_AggregateTimerWithHistogram(t *testing.T) {
},
}

sourceAddr := "1.2.3.4"
sourcePort := "5678"

newPoint := func() (pmetric.Metrics, pmetric.ExponentialHistogramDataPoint) {
data := pmetric.NewMetrics()
ilm := data.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty()
rm := data.ResourceMetrics().AppendEmpty()
rm.Resource().Attributes().PutStr("source.address", sourceAddr)
rm.Resource().Attributes().PutStr("source.port", sourcePort)
ilm := rm.ScopeMetrics().AppendEmpty()
m := ilm.Metrics().AppendEmpty()
m.SetName("expohisto")
ep := m.SetEmptyExponentialHistogram()
Expand Down Expand Up @@ -1903,7 +1910,7 @@ func TestStatsDParser_AggregateTimerWithHistogram(t *testing.T) {
var err error
p := &StatsDParser{}
assert.NoError(t, p.Initialize(false, false, false, tt.mapping))
addr, _ := net.ResolveUDPAddr("udp", "1.2.3.4:5678")
addr, _ := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%s", sourceAddr, sourcePort))
for _, line := range tt.input {
err = p.Aggregate(line, addr)
assert.NoError(t, err)
Expand Down
8 changes: 6 additions & 2 deletions receiver/statsdreceiver/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,11 @@ status:
active: [jmacd, dmitryax]

resource_attributes:
source:
description: Source address of the data
source.address:
description: Source address
type: string
enabled: true
source.port:
description: Source port number
type: int
enabled: true
33 changes: 25 additions & 8 deletions receiver/statsdreceiver/receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package statsdreceiver
import (
"context"
"errors"
"net"
"testing"
"time"

Expand Down Expand Up @@ -211,15 +212,26 @@ func Test_statsdreceiver_resource_attribute_source(t *testing.T) {
require.Equal(t, 1, mdd[0].ResourceMetrics().Len())
require.Equal(t, 1, mdd[1].ResourceMetrics().Len())

// The resources should have source attribute matching the client addr
// The resources should have source attributes matching the client addr
firstResource := mdd[0].ResourceMetrics().At(0)
resourceAttributeSource, exists := firstResource.Resource().Attributes().Get("source")
sourceAddress, exists := firstResource.Resource().Attributes().Get("source.address")
assert.Equal(t, true, exists)
assert.Equal(t, firstStatsdClient.Conn.LocalAddr().String(), resourceAttributeSource.AsString())
sourcePort, exists := firstResource.Resource().Attributes().Get("source.port")
assert.Equal(t, true, exists)
clientAddress, clientPort, err := net.SplitHostPort(firstStatsdClient.Conn.LocalAddr().String())
require.NoError(t, err)
assert.Equal(t, clientAddress, sourceAddress.AsString())
assert.Equal(t, clientPort, sourcePort.AsString())

secondResource := mdd[1].ResourceMetrics().At(0)
resourceAttributeSource, exists = secondResource.Resource().Attributes().Get("source")
sourceAddress, exists = secondResource.Resource().Attributes().Get("source.address")
assert.Equal(t, true, exists)
sourcePort, exists = secondResource.Resource().Attributes().Get("source.port")
assert.Equal(t, true, exists)
assert.Equal(t, secondStatsdClient.Conn.LocalAddr().String(), resourceAttributeSource.AsString())
clientAddress, clientPort, err = net.SplitHostPort(secondStatsdClient.Conn.LocalAddr().String())
require.NoError(t, err)
assert.Equal(t, clientAddress, sourceAddress.AsString())
assert.Equal(t, clientPort, sourcePort.AsString())
})
t.Run("do not aggregate by source address with two clients sending", func(t *testing.T) {
cfg := &Config{
Expand Down Expand Up @@ -262,9 +274,14 @@ func Test_statsdreceiver_resource_attribute_source(t *testing.T) {
require.Equal(t, 1, mdd[0].ResourceMetrics().Len())

// The resources should have source attribute matching the servers address
firstResource := mdd[0].ResourceMetrics().At(0)
resourceAttributeSource, exists := firstResource.Resource().Attributes().Get("source")
resource := mdd[0].ResourceMetrics().At(0)
sourceAddress, exists := resource.Resource().Attributes().Get("source.address")
assert.Equal(t, true, exists)
assert.Equal(t, serverAddr, resourceAttributeSource.AsString())
sourcePort, exists := resource.Resource().Attributes().Get("source.port")
assert.Equal(t, true, exists)
serverAddr, serverPort, err := net.SplitHostPort(serverAddr)
require.NoError(t, err)
assert.Equal(t, serverAddr, sourceAddress.AsString())
assert.Equal(t, serverPort, sourcePort.AsString())
})
}

0 comments on commit 0085017

Please sign in to comment.