Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Spanmetrics] Add events_total metric to get the measurement for list of configured event attributes for a span #27811

Merged
merged 19 commits into from
Oct 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions .chloggen/events-metric-to-span-metrics.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: spanmetricsconnector

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add Events metric to span metrics connector that adds list of event attributes as dimensions

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [27451]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
10 changes: 9 additions & 1 deletion connector/spanmetricsconnector/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,9 @@ The following settings can be optionally configured:
- `metrics_flush_interval` (default: `15s`): Defines the flush interval of the generated metrics.
- `exemplars`: Use to configure how to attach exemplars to histograms
- `enabled` (default: `false`): enabling will add spans as Exemplars.
- `events`: Use to configure the events metric.
- `enabled`: (default: `false`): enabling will add the events metric.
- `dimensions`: (mandatory if `enabled`) the list of the span's event attributes to add as dimensions to the events metric, which will be included _on top of_ the common and configured `dimensions` for span and resource attributes.

## Examples

Expand Down Expand Up @@ -132,7 +135,12 @@ connectors:
exclude_dimensions: ['status.code']
dimensions_cache_size: 1000
aggregation_temporality: "AGGREGATION_TEMPORALITY_CUMULATIVE"
metrics_flush_interval: 15s
metrics_flush_interval: 15s
events:
enabled: true
dimensions:
- name: exception.type
- name: exception.message

service:
pipelines:
Expand Down
30 changes: 27 additions & 3 deletions connector/spanmetricsconnector/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ type Config struct {

// Exemplars defines the configuration for exemplars.
Exemplars ExemplarsConfig `mapstructure:"exemplars"`

// Events defines the configuration for events section of spans.
Events EventsConfig `mapstructure:"events"`
}

type HistogramConfig struct {
Expand All @@ -80,13 +83,22 @@ type ExplicitHistogramConfig struct {
Buckets []time.Duration `mapstructure:"buckets"`
}

type EventsConfig struct {
// Enabled is a flag to enable events.
Enabled bool `mapstructure:"enabled"`
// Dimensions defines the list of dimensions to add to the events metric.
Dimensions []Dimension `mapstructure:"dimensions"`
aishyandapalli marked this conversation as resolved.
Show resolved Hide resolved
}

var _ component.ConfigValidator = (*Config)(nil)

// Validate checks if the processor configuration is valid
func (c Config) Validate() error {
err := validateDimensions(c.Dimensions)
if err != nil {
return err
if err := validateDimensions(c.Dimensions); err != nil {
return fmt.Errorf("failed validating dimensions: %w", err)
}
if err := validateEventDimensions(c.Events.Enabled, c.Events.Dimensions); err != nil {
return fmt.Errorf("failed validating event dimensions: %w", err)
}

if c.DimensionsCacheSize <= 0 {
Expand All @@ -99,6 +111,7 @@ func (c Config) Validate() error {
if c.Histogram.Explicit != nil && c.Histogram.Exponential != nil {
return errors.New("use either `explicit` or `exponential` buckets histogram")
}

return nil
}

Expand Down Expand Up @@ -127,3 +140,14 @@ func validateDimensions(dimensions []Dimension) error {

return nil
}

// validateEventDimensions checks for empty and duplicates for the dimensions configured.
func validateEventDimensions(enabled bool, dimensions []Dimension) error {
if !enabled {
return nil
}
if len(dimensions) == 0 {
return fmt.Errorf("no dimensions configured for events")
}
return validateDimensions(dimensions)
}
44 changes: 44 additions & 0 deletions connector/spanmetricsconnector/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,3 +171,47 @@ func TestValidateDimensions(t *testing.T) {
})
}
}

func TestValidateEventDimensions(t *testing.T) {
for _, tc := range []struct {
enabled bool
name string
dimensions []Dimension
expectedErr string
}{
{
enabled: false,
name: "disabled - no additional dimensions",
dimensions: []Dimension{},
},
{
enabled: true,
name: "enabled - no additional dimensions",
dimensions: []Dimension{},
expectedErr: "no dimensions configured for events",
},
{
enabled: true,
name: "enabled - no duplicate dimensions",
dimensions: []Dimension{{Name: "exception_type"}},
},
{
enabled: true,
name: "enabled - duplicate dimensions",
dimensions: []Dimension{
{Name: "exception_type"},
{Name: "exception_type"},
},
expectedErr: "duplicate dimension name exception_type",
},
} {
t.Run(tc.name, func(t *testing.T) {
err := validateEventDimensions(tc.enabled, tc.dimensions)
if tc.expectedErr != "" {
assert.EqualError(t, err, tc.expectedErr)
} else {
assert.NoError(t, err)
}
})
}
}
55 changes: 48 additions & 7 deletions connector/spanmetricsconnector/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ const (

metricNameDuration = "duration"
metricNameCalls = "calls"
metricNameEvents = "events"

defaultUnit = metrics.Milliseconds
)
Expand Down Expand Up @@ -66,11 +67,17 @@ type connectorImp struct {
started bool

shutdownOnce sync.Once

// Event dimensions to add to the events metric.
eDimensions []dimension

events EventsConfig
}

type resourceMetrics struct {
histograms metrics.HistogramMetrics
sums metrics.SumMetrics
events metrics.SumMetrics
attributes pcommon.Map
}

Expand Down Expand Up @@ -113,6 +120,8 @@ func newConnector(logger *zap.Logger, config component.Config, ticker *clock.Tic
metricKeyToDimensions: metricKeyToDimensionsCache,
ticker: ticker,
done: make(chan struct{}),
eDimensions: newDimensions(cfg.Events.Dimensions),
events: cfg.Events,
}, nil
}

Expand Down Expand Up @@ -245,6 +254,13 @@ func (p *connectorImp) buildMetrics() pmetric.Metrics {
metric.SetUnit(p.config.Histogram.Unit.String())
histograms.BuildMetrics(metric, p.startTimestamp, p.config.GetAggregationTemporality())
}

events := rawMetrics.events
if p.events.Enabled {
metric = sm.Metrics().AppendEmpty()
metric.SetName(buildMetricName(p.config.Namespace, metricNameEvents))
events.BuildMetrics(metric, p.startTimestamp, p.config.GetAggregationTemporality())
}
}

return m
Expand Down Expand Up @@ -288,6 +304,7 @@ func (p *connectorImp) aggregateMetrics(traces ptrace.Traces) {
rm := p.getOrCreateResourceMetrics(resourceAttr)
sums := rm.sums
histograms := rm.histograms
events := rm.events

unitDivider := unitDivider(p.config.Histogram.Unit)
serviceName := serviceAttr.Str()
Expand All @@ -308,7 +325,7 @@ func (p *connectorImp) aggregateMetrics(traces ptrace.Traces) {

attributes, ok := p.metricKeyToDimensions.Get(key)
if !ok {
attributes = p.buildAttributes(serviceName, span, resourceAttr)
attributes = p.buildAttributes(serviceName, span, resourceAttr, p.dimensions)
p.metricKeyToDimensions.Add(key, attributes)
}
if !p.config.Histogram.Disable {
Expand All @@ -321,6 +338,29 @@ func (p *connectorImp) aggregateMetrics(traces ptrace.Traces) {
// aggregate sums metrics
s := sums.GetOrCreate(key, attributes)
s.Add(1)

// aggregate events metrics
if p.events.Enabled {
aishyandapalli marked this conversation as resolved.
Show resolved Hide resolved
for l := 0; l < span.Events().Len(); l++ {
event := span.Events().At(l)
eDimensions := p.dimensions
eDimensions = append(eDimensions, p.eDimensions...)

rscAndEventAttrs := pcommon.NewMap()
rscAndEventAttrs.EnsureCapacity(resourceAttr.Len() + event.Attributes().Len())
resourceAttr.CopyTo(rscAndEventAttrs)
event.Attributes().CopyTo(rscAndEventAttrs)

eKey := p.buildKey(serviceName, span, eDimensions, rscAndEventAttrs)
eAttributes, ok := p.metricKeyToDimensions.Get(eKey)
if !ok {
eAttributes = p.buildAttributes(serviceName, span, rscAndEventAttrs, eDimensions)
p.metricKeyToDimensions.Add(eKey, eAttributes)
}
e := events.GetOrCreate(eKey, eAttributes)
e.Add(1)
}
}
}
}
}
Expand All @@ -346,6 +386,7 @@ func (p *connectorImp) getOrCreateResourceMetrics(attr pcommon.Map) *resourceMet
v = &resourceMetrics{
histograms: initHistogramMetrics(p.config),
sums: metrics.NewSumMetrics(),
events: metrics.NewSumMetrics(),
attributes: attr,
}
p.resourceMetrics[key] = v
Expand All @@ -363,9 +404,9 @@ func contains(elements []string, value string) bool {
return false
}

func (p *connectorImp) buildAttributes(serviceName string, span ptrace.Span, resourceAttrs pcommon.Map) pcommon.Map {
func (p *connectorImp) buildAttributes(serviceName string, span ptrace.Span, resourceAttrs pcommon.Map, dimensions []dimension) pcommon.Map {
attr := pcommon.NewMap()
attr.EnsureCapacity(4 + len(p.dimensions))
attr.EnsureCapacity(4 + len(dimensions))
if !contains(p.config.ExcludeDimensions, serviceNameKey) {
attr.PutStr(serviceNameKey, serviceName)
}
Expand All @@ -378,7 +419,7 @@ func (p *connectorImp) buildAttributes(serviceName string, span ptrace.Span, res
if !contains(p.config.ExcludeDimensions, statusCodeKey) {
attr.PutStr(statusCodeKey, traceutil.StatusCodeStr(span.Status().Code()))
}
for _, d := range p.dimensions {
for _, d := range dimensions {
if v, ok := getDimensionValue(d, span.Attributes(), resourceAttrs); ok {
v.CopyTo(attr.PutEmpty(d.name))
}
Expand All @@ -395,10 +436,10 @@ func concatDimensionValue(dest *bytes.Buffer, value string, prefixSep bool) {

// buildKey builds the metric key from the service name and span metadata such as name, kind, status_code and
// will attempt to add any additional dimensions the user has configured that match the span's attributes
// or resource attributes. If the dimension exists in both, the span's attributes, being the most specific, takes precedence.
// or resource/event attributes. If the dimension exists in both, the span's attributes, being the most specific, takes precedence.
//
// The metric key is a simple concatenation of dimension values, delimited by a null character.
func (p *connectorImp) buildKey(serviceName string, span ptrace.Span, optionalDims []dimension, resourceAttrs pcommon.Map) metrics.Key {
func (p *connectorImp) buildKey(serviceName string, span ptrace.Span, optionalDims []dimension, resourceOrEventAttrs pcommon.Map) metrics.Key {
p.keyBuf.Reset()
if !contains(p.config.ExcludeDimensions, serviceNameKey) {
concatDimensionValue(p.keyBuf, serviceName, false)
Expand All @@ -414,7 +455,7 @@ func (p *connectorImp) buildKey(serviceName string, span ptrace.Span, optionalDi
}

for _, d := range optionalDims {
if v, ok := getDimensionValue(d, span.Attributes(), resourceAttrs); ok {
if v, ok := getDimensionValue(d, span.Attributes(), resourceOrEventAttrs); ok {
concatDimensionValue(p.keyBuf, v.AsString(), true)
}
}
Expand Down
59 changes: 59 additions & 0 deletions connector/spanmetricsconnector/connector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ const (
notInSpanAttrName0 = "shouldBeInMetric"
notInSpanAttrName1 = "shouldNotBeInMetric"
regionResourceAttrName = "region"
exceptionTypeAttrName = "exception.type"
DimensionsCacheSize = 2

sampleRegion = "us-east-1"
Expand Down Expand Up @@ -394,6 +395,10 @@ func initSpan(span span, s ptrace.Span) {
s.Attributes().PutEmptySlice(arrayAttrName)
s.SetTraceID(pcommon.TraceID(span.traceID))
s.SetSpanID(pcommon.SpanID(span.spanID))

e := s.Events().AppendEmpty()
e.SetName("exception")
e.Attributes().PutStr(exceptionTypeAttrName, "NullPointerException")
}

func disabledExemplarsConfig() ExemplarsConfig {
Expand Down Expand Up @@ -1259,3 +1264,57 @@ func TestConnector_initHistogramMetrics(t *testing.T) {
})
}
}

func TestSpanMetrics_Events(t *testing.T) {
tests := []struct {
name string
eventsConfig EventsConfig
shouldEventsMetricExist bool
}{
{
name: "events disabled",
eventsConfig: EventsConfig{Enabled: false, Dimensions: []Dimension{{Name: "exception.type", Default: stringp("NullPointerException")}}},
shouldEventsMetricExist: false,
},
{
name: "events enabled",
eventsConfig: EventsConfig{Enabled: true, Dimensions: []Dimension{{Name: "exception.type", Default: stringp("NullPointerException")}}},
shouldEventsMetricExist: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
factory := NewFactory()
cfg := factory.CreateDefaultConfig().(*Config)
cfg.Events = tt.eventsConfig
c, err := newConnector(zaptest.NewLogger(t), cfg, nil)
require.NoError(t, err)
err = c.ConsumeTraces(context.Background(), buildSampleTrace())
require.NoError(t, err)
metrics := c.buildMetrics()
for i := 0; i < metrics.ResourceMetrics().Len(); i++ {
rm := metrics.ResourceMetrics().At(i)
ism := rm.ScopeMetrics()
for ilmC := 0; ilmC < ism.Len(); ilmC++ {
m := ism.At(ilmC).Metrics()
if !tt.shouldEventsMetricExist {
assert.Equal(t, 2, m.Len())
continue
}
assert.Equal(t, 3, m.Len())
for mC := 0; mC < m.Len(); mC++ {
metric := m.At(mC)
if metric.Name() != "events" {
continue
}
assert.Equal(t, pmetric.MetricTypeSum, metric.Type())
for idp := 0; idp < metric.Sum().DataPoints().Len(); idp++ {
attrs := metric.Sum().DataPoints().At(idp).Attributes()
assert.Contains(t, attrs.AsRaw(), exceptionTypeAttrName)
}
}
}
}
})
}
}