diff --git a/pkg/otelcollector/metricsprocessor/internal/check_response_labels.go b/pkg/otelcollector/metricsprocessor/internal/check_response_labels.go index 2dece581de..f56aaef387 100644 --- a/pkg/otelcollector/metricsprocessor/internal/check_response_labels.go +++ b/pkg/otelcollector/metricsprocessor/internal/check_response_labels.go @@ -112,10 +112,6 @@ func AddCheckResponseBasedLabels(attributes pcommon.Map, checkResponse *flowcont labels[otelcollector.ApertureFlowLabelKeysLabel].Slice().AppendEmpty().SetStr(flowLabelKey) } - for key, value := range checkResponse.GetTelemetryFlowLabels() { - pcommon.NewValueStr(value).CopyTo(attributes.PutEmpty(key)) - } - for _, classifier := range checkResponse.ClassifierInfos { rawValue := []string{ fmt.Sprintf("%s:%v", metrics.PolicyNameLabel, classifier.PolicyName), @@ -141,3 +137,10 @@ func AddCheckResponseBasedLabels(attributes pcommon.Map, checkResponse *flowcont value.CopyTo(attributes.PutEmpty(key)) } } + +// AddFlowLabels adds dynamic from labels. +func AddFlowLabels(attributes pcommon.Map, checkResponse *flowcontrolv1.CheckResponse) { + for key, value := range checkResponse.GetTelemetryFlowLabels() { + pcommon.NewValueStr(value).CopyTo(attributes.PutEmpty(key)) + } +} diff --git a/pkg/otelcollector/metricsprocessor/internal/check_response_labels_test.go b/pkg/otelcollector/metricsprocessor/internal/check_response_labels_test.go index ca4864ffd0..e242db9668 100644 --- a/pkg/otelcollector/metricsprocessor/internal/check_response_labels_test.go +++ b/pkg/otelcollector/metricsprocessor/internal/check_response_labels_test.go @@ -107,19 +107,6 @@ var _ = DescribeTable("Check Response labels", func(checkResponse *flowcontrolv1 map[string]interface{}{otelcollector.ApertureFlowLabelKeysLabel: []interface{}{"someLabel", "otherLabel"}}, ), - Entry("Sets telemetry flow labels", - &flowcontrolv1.CheckResponse{ - TelemetryFlowLabels: map[string]string{ - "someLabel": "someValue", - "otherLabel": "otherValue", - }, - }, - map[string]interface{}{ - "someLabel": "someValue", - "otherLabel": "otherValue", - }, - ), - Entry("Sets classifiers", &flowcontrolv1.CheckResponse{ ClassifierInfos: []*flowcontrolv1.ClassifierInfo{ @@ -138,3 +125,16 @@ var _ = DescribeTable("Check Response labels", func(checkResponse *flowcontrolv1 }, ), ) + +var _ = Describe("AddFlowLabels", func() { + attributes := pcommon.NewMap() + checkResponse := &flowcontrolv1.CheckResponse{ + TelemetryFlowLabels: map[string]string{ + "someLabel": "someValue", + "otherLabel": "otherValue", + }, + } + internal.AddFlowLabels(attributes, checkResponse) + Expect(attributes.AsRaw()).To(HaveKeyWithValue("someLabel", "someValue")) + Expect(attributes.AsRaw()).To(HaveKeyWithValue("otherLabel", "otherValue")) +}) diff --git a/pkg/otelcollector/metricsprocessor/processor.go b/pkg/otelcollector/metricsprocessor/processor.go index 3d0f052c1a..9ca85a0414 100644 --- a/pkg/otelcollector/metricsprocessor/processor.go +++ b/pkg/otelcollector/metricsprocessor/processor.go @@ -98,6 +98,9 @@ func (p *metricsProcessor) ConsumeLogs(ctx context.Context, ld plog.Logs) (plog. p.updateMetrics(attributes, checkResponse, []string{otelcollector.EnvoyMissingAttributeValue}) internal.EnforceIncludeListHTTP(attributes) } + + // This needs to be called **after** internal.EnforceIncludeList{HTTP,SDK}. + internal.AddFlowLabels(attributes, checkResponse) return nil }) return ld, err diff --git a/pkg/otelcollector/metricsprocessor/processor_test.go b/pkg/otelcollector/metricsprocessor/processor_test.go index f3f2f5088b..9bacc79d53 100644 --- a/pkg/otelcollector/metricsprocessor/processor_test.go +++ b/pkg/otelcollector/metricsprocessor/processor_test.go @@ -191,6 +191,7 @@ var _ = Describe("Metrics Processor", func() { }} baseCheckResp.FluxMeterInfos = []*flowcontrolv1.FluxMeterInfo{{FluxMeterName: "bar"}} baseCheckResp.FlowLabelKeys = []string{"someLabel"} + baseCheckResp.TelemetryFlowLabels = map[string]string{"flowLabelKey": "flowLabelValue"} baseCheckResp.Services = []string{"svc1", "svc2"} // is a workaround until PR https://github.com/prometheus/client_golang/pull/1143 is released @@ -228,6 +229,7 @@ workload_latency_ms_count{component_index="1",decision_type="DECISION_TYPE_REJEC oc.ApertureProcessingDurationLabel: float64(1000), oc.ApertureServicesLabel: []interface{}{"svc1", "svc2"}, oc.ApertureControlPointLabel: "type:TYPE_INGRESS", + "flowLabelKey": "flowLabelValue", } source = oc.ApertureSourceEnvoy