diff --git a/.chloggen/elasticsearchexporter_optimized-json-encoding.yaml b/.chloggen/elasticsearchexporter_optimized-json-encoding.yaml new file mode 100644 index 0000000000000..440ac12a5ce78 --- /dev/null +++ b/.chloggen/elasticsearchexporter_optimized-json-encoding.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: elasticsearchexporter + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: More efficient JSON encoding for OTel mode + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [37032] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: Increases throughput for metrics by 2x and for logs and traces by 3x + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/exporter/elasticsearchexporter/exporter.go b/exporter/elasticsearchexporter/exporter.go index ebd3800858a2b..27db606c69632 100644 --- a/exporter/elasticsearchexporter/exporter.go +++ b/exporter/elasticsearchexporter/exporter.go @@ -4,7 +4,6 @@ package elasticsearchexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter" import ( - "bytes" "context" "errors" "fmt" @@ -20,7 +19,7 @@ import ( "go.opentelemetry.io/collector/pdata/ptrace" "go.uber.org/zap" - "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/objmodel" + "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/pool" ) type elasticsearchExporter struct { @@ -36,6 +35,8 @@ type elasticsearchExporter struct { wg sync.WaitGroup // active sessions bulkIndexer bulkIndexer + + bufferPool *pool.BufferPool } func newExporter( @@ -69,6 +70,7 @@ func newExporter( model: model, logstashFormat: cfg.LogstashFormat, otel: otel, + bufferPool: pool.NewBufferPool(), } } @@ -173,11 +175,14 @@ func (e *elasticsearchExporter) pushLogRecord( fIndex = formattedIndex } - document, err := e.model.encodeLog(resource, resourceSchemaURL, record, scope, scopeSchemaURL) + buf := e.bufferPool.NewPooledBuffer() + err := e.model.encodeLog(resource, resourceSchemaURL, record, scope, scopeSchemaURL, buf.Buffer) if err != nil { + buf.Recycle() return fmt.Errorf("failed to encode log event: %w", err) } - return bulkIndexerSession.Add(ctx, fIndex, bytes.NewReader(document), nil) + // not recycling after Add returns an error as we don't know if it's already recycled + return bulkIndexerSession.Add(ctx, fIndex, buf, nil) } func (e *elasticsearchExporter) pushMetricsData( @@ -193,21 +198,18 @@ func (e *elasticsearchExporter) pushMetricsData( } defer session.End() - var ( - 
validationErrs []error // log instead of returning these so that upstream does not retry - errs []error - ) + var errs []error resourceMetrics := metrics.ResourceMetrics() for i := 0; i < resourceMetrics.Len(); i++ { resourceMetric := resourceMetrics.At(i) resource := resourceMetric.Resource() scopeMetrics := resourceMetric.ScopeMetrics() - resourceDocs := make(map[string]map[uint32]objmodel.Document) - for j := 0; j < scopeMetrics.Len(); j++ { + var validationErrs []error // log instead of returning these so that upstream does not retry scopeMetrics := scopeMetrics.At(j) scope := scopeMetrics.Scope() + groupedDataPointsByIndex := make(map[string]map[uint32][]dataPoint) for k := 0; k < scopeMetrics.Metrics().Len(); k++ { metric := scopeMetrics.Metrics().At(k) @@ -216,13 +218,17 @@ func (e *elasticsearchExporter) pushMetricsData( if err != nil { return err } - if _, ok := resourceDocs[fIndex]; !ok { - resourceDocs[fIndex] = make(map[uint32]objmodel.Document) + groupedDataPoints, ok := groupedDataPointsByIndex[fIndex] + if !ok { + groupedDataPoints = make(map[uint32][]dataPoint) + groupedDataPointsByIndex[fIndex] = groupedDataPoints } - - if err = e.model.upsertMetricDataPointValue(resourceDocs[fIndex], resource, - resourceMetric.SchemaUrl(), scope, scopeMetrics.SchemaUrl(), metric, dp); err != nil { - return err + dpHash := e.model.hashDataPoint(dp) + dataPoints, ok := groupedDataPoints[dpHash] + if !ok { + groupedDataPoints[dpHash] = []dataPoint{dp} + } else { + groupedDataPoints[dpHash] = append(dataPoints, dp) } return nil } @@ -232,7 +238,7 @@ func (e *elasticsearchExporter) pushMetricsData( dps := metric.Sum().DataPoints() for l := 0; l < dps.Len(); l++ { dp := dps.At(l) - if err := upsertDataPoint(newNumberDataPoint(dp)); err != nil { + if err := upsertDataPoint(newNumberDataPoint(metric, dp)); err != nil { validationErrs = append(validationErrs, err) continue } @@ -241,7 +247,7 @@ func (e *elasticsearchExporter) pushMetricsData( dps := metric.Gauge().DataPoints() for l := 0; l < dps.Len(); l++ { dp := dps.At(l) - if err := upsertDataPoint(newNumberDataPoint(dp)); err != nil { + if err := upsertDataPoint(newNumberDataPoint(metric, dp)); err != nil { validationErrs = append(validationErrs, err) continue } @@ -254,7 +260,7 @@ func (e *elasticsearchExporter) pushMetricsData( dps := metric.ExponentialHistogram().DataPoints() for l := 0; l < dps.Len(); l++ { dp := dps.At(l) - if err := upsertDataPoint(newExponentialHistogramDataPoint(dp)); err != nil { + if err := upsertDataPoint(newExponentialHistogramDataPoint(metric, dp)); err != nil { validationErrs = append(validationErrs, err) continue } @@ -267,7 +273,7 @@ func (e *elasticsearchExporter) pushMetricsData( dps := metric.Histogram().DataPoints() for l := 0; l < dps.Len(); l++ { dp := dps.At(l) - if err := upsertDataPoint(newHistogramDataPoint(dp)); err != nil { + if err := upsertDataPoint(newHistogramDataPoint(metric, dp)); err != nil { validationErrs = append(validationErrs, err) continue } @@ -276,37 +282,35 @@ func (e *elasticsearchExporter) pushMetricsData( dps := metric.Summary().DataPoints() for l := 0; l < dps.Len(); l++ { dp := dps.At(l) - if err := upsertDataPoint(newSummaryDataPoint(dp)); err != nil { + if err := upsertDataPoint(newSummaryDataPoint(metric, dp)); err != nil { validationErrs = append(validationErrs, err) continue } } } } - } - if len(validationErrs) > 0 { - e.Logger.Warn("validation errors", zap.Error(errors.Join(validationErrs...))) - } - - for fIndex, docs := range resourceDocs { - for _, doc := range docs { 
- var ( - docBytes []byte - err error - ) - docBytes, err = e.model.encodeDocument(doc) - if err != nil { - errs = append(errs, err) - continue - } - if err := session.Add(ctx, fIndex, bytes.NewReader(docBytes), doc.DynamicTemplates()); err != nil { - if cerr := ctx.Err(); cerr != nil { - return cerr + for fIndex, groupedDataPoints := range groupedDataPointsByIndex { + for _, dataPoints := range groupedDataPoints { + buf := e.bufferPool.NewPooledBuffer() + dynamicTemplates, err := e.model.encodeMetrics(resource, resourceMetric.SchemaUrl(), scope, scopeMetrics.SchemaUrl(), dataPoints, &validationErrs, buf.Buffer) + if err != nil { + buf.Recycle() + errs = append(errs, err) + continue + } + if err := session.Add(ctx, fIndex, buf, dynamicTemplates); err != nil { + // not recycling after Add returns an error as we don't know if it's already recycled + if cerr := ctx.Err(); cerr != nil { + return cerr + } + errs = append(errs, err) } - errs = append(errs, err) } } + if len(validationErrs) > 0 { + e.Logger.Warn("validation errors", zap.Error(errors.Join(validationErrs...))) + } } } @@ -411,11 +415,14 @@ func (e *elasticsearchExporter) pushTraceRecord( fIndex = formattedIndex } - document, err := e.model.encodeSpan(resource, resourceSchemaURL, span, scope, scopeSchemaURL) + buf := e.bufferPool.NewPooledBuffer() + err := e.model.encodeSpan(resource, resourceSchemaURL, span, scope, scopeSchemaURL, buf.Buffer) if err != nil { + buf.Recycle() return fmt.Errorf("failed to encode trace record: %w", err) } - return bulkIndexerSession.Add(ctx, fIndex, bytes.NewReader(document), nil) + // not recycling after Add returns an error as we don't know if it's already recycled + return bulkIndexerSession.Add(ctx, fIndex, buf, nil) } func (e *elasticsearchExporter) pushSpanEvent( @@ -440,14 +447,12 @@ func (e *elasticsearchExporter) pushSpanEvent( } fIndex = formattedIndex } - - document := e.model.encodeSpanEvent(resource, resourceSchemaURL, span, spanEvent, scope, scopeSchemaURL) - if document == nil { + buf := e.bufferPool.NewPooledBuffer() + e.model.encodeSpanEvent(resource, resourceSchemaURL, span, spanEvent, scope, scopeSchemaURL, buf.Buffer) + if buf.Buffer.Len() == 0 { + buf.Recycle() return nil } - docBytes, err := e.model.encodeDocument(*document) - if err != nil { - return err - } - return bulkIndexerSession.Add(ctx, fIndex, bytes.NewReader(docBytes), nil) + // not recycling after Add returns an error as we don't know if it's already recycled + return bulkIndexerSession.Add(ctx, fIndex, buf, nil) } diff --git a/exporter/elasticsearchexporter/exporter_test.go b/exporter/elasticsearchexporter/exporter_test.go index 33a0cf6d13838..6125988ea4631 100644 --- a/exporter/elasticsearchexporter/exporter_test.go +++ b/exporter/elasticsearchexporter/exporter_test.go @@ -427,7 +427,7 @@ func TestExporterLogs(t *testing.T) { body: func() pcommon.Value { return pcommon.NewValueStr("foo") }(), - wantDocument: []byte(`{"@timestamp":"1970-01-01T00:00:00.000000000Z","attributes":{"attr.foo":"attr.foo.value"},"data_stream":{"dataset":"attr.dataset.otel","namespace":"resource.attribute.namespace","type":"logs"},"dropped_attributes_count":0,"observed_timestamp":"1970-01-01T00:00:00.000000000Z","resource":{"attributes":{"resource.attr.foo":"resource.attr.foo.value"},"dropped_attributes_count":0},"scope":{"dropped_attributes_count":0},"severity_number":0,"body":{"text":"foo"}}`), + wantDocument: 
[]byte(`{"@timestamp":"0.0","attributes":{"attr.foo":"attr.foo.value"},"data_stream":{"dataset":"attr.dataset.otel","namespace":"resource.attribute.namespace","type":"logs"},"observed_timestamp":"0.0","resource":{"attributes":{"resource.attr.foo":"resource.attr.foo.value"}},"scope":{},"body":{"text":"foo"}}`), }, { body: func() pcommon.Value { @@ -438,7 +438,7 @@ func TestExporterLogs(t *testing.T) { m.PutEmptyMap("inner").PutStr("foo", "bar") return vm }(), - wantDocument: []byte(`{"@timestamp":"1970-01-01T00:00:00.000000000Z","attributes":{"attr.foo":"attr.foo.value"},"data_stream":{"dataset":"attr.dataset.otel","namespace":"resource.attribute.namespace","type":"logs"},"dropped_attributes_count":0,"observed_timestamp":"1970-01-01T00:00:00.000000000Z","resource":{"attributes":{"resource.attr.foo":"resource.attr.foo.value"},"dropped_attributes_count":0},"scope":{"dropped_attributes_count":0},"severity_number":0,"body":{"flattened":{"true":true,"false":false,"inner":{"foo":"bar"}}}}`), + wantDocument: []byte(`{"@timestamp":"0.0","attributes":{"attr.foo":"attr.foo.value"},"data_stream":{"dataset":"attr.dataset.otel","namespace":"resource.attribute.namespace","type":"logs"},"observed_timestamp":"0.0","resource":{"attributes":{"resource.attr.foo":"resource.attr.foo.value"}},"scope":{},"body":{"flattened":{"true":true,"false":false,"inner":{"foo":"bar"}}}}`), }, { body: func() pcommon.Value { @@ -450,7 +450,7 @@ func TestExporterLogs(t *testing.T) { return vm }(), isEvent: true, - wantDocument: []byte(`{"@timestamp":"1970-01-01T00:00:00.000000000Z","attributes":{"attr.foo":"attr.foo.value","event.name":"foo"},"event_name":"foo","data_stream":{"dataset":"attr.dataset.otel","namespace":"resource.attribute.namespace","type":"logs"},"dropped_attributes_count":0,"observed_timestamp":"1970-01-01T00:00:00.000000000Z","resource":{"attributes":{"resource.attr.foo":"resource.attr.foo.value"},"dropped_attributes_count":0},"scope":{"dropped_attributes_count":0},"severity_number":0,"body":{"structured":{"true":true,"false":false,"inner":{"foo":"bar"}}}}`), + wantDocument: []byte(`{"@timestamp":"0.0","attributes":{"attr.foo":"attr.foo.value","event.name":"foo"},"event_name":"foo","data_stream":{"dataset":"attr.dataset.otel","namespace":"resource.attribute.namespace","type":"logs"},"observed_timestamp":"0.0","resource":{"attributes":{"resource.attr.foo":"resource.attr.foo.value"}},"scope":{},"body":{"structured":{"true":true,"false":false,"inner":{"foo":"bar"}}}}`), }, { body: func() pcommon.Value { @@ -461,7 +461,7 @@ func TestExporterLogs(t *testing.T) { s.AppendEmpty().SetEmptyMap().PutStr("foo", "bar") return vs }(), - wantDocument: []byte(`{"@timestamp":"1970-01-01T00:00:00.000000000Z","attributes":{"attr.foo":"attr.foo.value"},"data_stream":{"dataset":"attr.dataset.otel","namespace":"resource.attribute.namespace","type":"logs"},"dropped_attributes_count":0,"observed_timestamp":"1970-01-01T00:00:00.000000000Z","resource":{"attributes":{"resource.attr.foo":"resource.attr.foo.value"},"dropped_attributes_count":0},"scope":{"dropped_attributes_count":0},"severity_number":0,"body":{"flattened":{"value":["foo",false,{"foo":"bar"}]}}}`), + wantDocument: []byte(`{"@timestamp":"0.0","attributes":{"attr.foo":"attr.foo.value"},"data_stream":{"dataset":"attr.dataset.otel","namespace":"resource.attribute.namespace","type":"logs"},"observed_timestamp":"0.0","resource":{"attributes":{"resource.attr.foo":"resource.attr.foo.value"}},"scope":{},"body":{"flattened":{"value":["foo",false,{"foo":"bar"}]}}}`), }, { body: 
func() pcommon.Value { @@ -473,7 +473,7 @@ func TestExporterLogs(t *testing.T) { return vs }(), isEvent: true, - wantDocument: []byte(`{"@timestamp":"1970-01-01T00:00:00.000000000Z","attributes":{"attr.foo":"attr.foo.value","event.name":"foo"},"event_name":"foo","data_stream":{"dataset":"attr.dataset.otel","namespace":"resource.attribute.namespace","type":"logs"},"dropped_attributes_count":0,"observed_timestamp":"1970-01-01T00:00:00.000000000Z","resource":{"attributes":{"resource.attr.foo":"resource.attr.foo.value"},"dropped_attributes_count":0},"scope":{"dropped_attributes_count":0},"severity_number":0,"body":{"structured":{"value":["foo",false,{"foo":"bar"}]}}}`), + wantDocument: []byte(`{"@timestamp":"0.0","attributes":{"attr.foo":"attr.foo.value","event.name":"foo"},"event_name":"foo","data_stream":{"dataset":"attr.dataset.otel","namespace":"resource.attribute.namespace","type":"logs"},"observed_timestamp":"0.0","resource":{"attributes":{"resource.attr.foo":"resource.attr.foo.value"}},"scope":{},"body":{"structured":{"value":["foo",false,{"foo":"bar"}]}}}`), }, } { rec := newBulkRecorder() @@ -734,6 +734,36 @@ func TestExporterLogs(t *testing.T) { assert.JSONEq(t, `{"a":"a","a.b":"a.b"}`, gjson.GetBytes(doc, `scope.attributes`).Raw) assert.JSONEq(t, `{"a":"a","a.b":"a.b"}`, gjson.GetBytes(doc, `resource.attributes`).Raw) }) + + t.Run("otel mode attribute complex value", func(t *testing.T) { + rec := newBulkRecorder() + server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) { + rec.Record(docs) + return itemsAllOK(docs) + }) + + exporter := newTestLogsExporter(t, server.URL, func(cfg *Config) { + cfg.Mapping.Mode = "otel" + }) + + logs := plog.NewLogs() + resourceLog := logs.ResourceLogs().AppendEmpty() + resourceLog.Resource().Attributes().PutEmptyMap("some.resource.attribute").PutEmptyMap("foo.bar").PutStr("baz", "qux") + scopeLog := resourceLog.ScopeLogs().AppendEmpty() + scopeLog.Scope().Attributes().PutEmptyMap("some.scope.attribute").PutEmptyMap("foo.bar").PutStr("baz", "qux") + logRecord := scopeLog.LogRecords().AppendEmpty() + logRecord.Attributes().PutEmptyMap("some.record.attribute").PutEmptyMap("foo.bar").PutStr("baz", "qux") + + mustSendLogs(t, exporter, logs) + + rec.WaitItems(1) + + assert.Len(t, rec.Items(), 1) + doc := rec.Items()[0].Document + assert.JSONEq(t, `{"some.record.attribute":{"foo.bar":{"baz":"qux"}}}`, gjson.GetBytes(doc, `attributes`).Raw) + assert.JSONEq(t, `{"some.scope.attribute":{"foo.bar":{"baz":"qux"}}}`, gjson.GetBytes(doc, `scope.attributes`).Raw) + assert.JSONEq(t, `{"some.resource.attribute":{"foo.bar":{"baz":"qux"}}}`, gjson.GetBytes(doc, `resource.attributes`).Raw) + }) } func TestExporterMetrics(t *testing.T) { @@ -1166,19 +1196,19 @@ func TestExporterMetrics(t *testing.T) { expected := []itemRequest{ { Action: []byte(`{"create":{"_index":"metrics-generic.otel-default","dynamic_templates":{"metrics.metric.foo":"histogram"}}}`), - Document: []byte(`{"@timestamp":"1970-01-01T00:00:00.000000000Z","data_stream":{"dataset":"generic.otel","namespace":"default","type":"metrics"},"metrics":{"metric.foo":{"counts":[1,2,3,4],"values":[0.5,1.5,2.5,3.0]}},"resource":{"dropped_attributes_count":0},"scope":{"dropped_attributes_count":0}}`), + Document: []byte(`{"@timestamp":"0.0","data_stream":{"dataset":"generic.otel","namespace":"default","type":"metrics"},"attributes":{},"metrics":{"metric.foo":{"counts":[1,2,3,4],"values":[0.5,1.5,2.5,3.0]}},"resource":{},"scope":{}}`), }, { Action: 
[]byte(`{"create":{"_index":"metrics-generic.otel-default","dynamic_templates":{"metrics.metric.foo":"histogram"}}}`), - Document: []byte(`{"@timestamp":"1970-01-01T01:00:00.000000000Z","data_stream":{"dataset":"generic.otel","namespace":"default","type":"metrics"},"metrics":{"metric.foo":{"counts":[4,5,6,7],"values":[2.0,4.5,5.5,6.0]}},"resource":{"dropped_attributes_count":0},"scope":{"dropped_attributes_count":0}}`), + Document: []byte(`{"@timestamp":"3600000.0","data_stream":{"dataset":"generic.otel","namespace":"default","type":"metrics"},"attributes":{},"metrics":{"metric.foo":{"counts":[4,5,6,7],"values":[2.0,4.5,5.5,6.0]}},"resource":{},"scope":{}}`), }, { Action: []byte(`{"create":{"_index":"metrics-generic.otel-default","dynamic_templates":{"metrics.metric.sum":"gauge_double"}}}`), - Document: []byte(`{"@timestamp":"1970-01-01T01:00:00.000000000Z","data_stream":{"dataset":"generic.otel","namespace":"default","type":"metrics"},"metrics":{"metric.sum":1.5},"resource":{"dropped_attributes_count":0},"scope":{"dropped_attributes_count":0},"start_timestamp":"1970-01-01T02:00:00.000000000Z"}`), + Document: []byte(`{"@timestamp":"3600000.0","data_stream":{"dataset":"generic.otel","namespace":"default","type":"metrics"},"attributes":{},"metrics":{"metric.sum":1.5},"resource":{},"scope":{},"start_timestamp":"7200000.0"}`), }, { Action: []byte(`{"create":{"_index":"metrics-generic.otel-default","dynamic_templates":{"metrics.metric.summary":"summary"}}}`), - Document: []byte(`{"@timestamp":"1970-01-01T03:00:00.000000000Z","data_stream":{"dataset":"generic.otel","namespace":"default","type":"metrics"},"metrics":{"metric.summary":{"sum":1.5,"value_count":1}},"resource":{"dropped_attributes_count":0},"scope":{"dropped_attributes_count":0},"start_timestamp":"1970-01-01T03:00:00.000000000Z"}`), + Document: []byte(`{"@timestamp":"10800000.0","data_stream":{"dataset":"generic.otel","namespace":"default","type":"metrics"},"attributes":{},"metrics":{"metric.summary":{"sum":1.5,"value_count":1}},"resource":{},"scope":{},"start_timestamp":"10800000.0"}`), }, } @@ -1247,7 +1277,7 @@ func TestExporterMetrics(t *testing.T) { expected := []itemRequest{ { Action: []byte(`{"create":{"_index":"metrics-generic.otel-default","dynamic_templates":{"metrics.sum":"gauge_long","metrics.summary":"summary"}}}`), - Document: []byte(`{"@timestamp":"1970-01-01T00:00:00.000000000Z","_doc_count":10,"data_stream":{"dataset":"generic.otel","namespace":"default","type":"metrics"},"metrics":{"sum":0,"summary":{"sum":1.0,"value_count":10}},"resource":{"dropped_attributes_count":0},"scope":{"dropped_attributes_count":0}}`), + Document: []byte(`{"@timestamp":"0.0","_doc_count":10,"data_stream":{"dataset":"generic.otel","namespace":"default","type":"metrics"},"attributes":{},"metrics":{"sum":0,"summary":{"sum":1.0,"value_count":10}},"resource":{},"scope":{}}`), }, } @@ -1297,11 +1327,11 @@ func TestExporterMetrics(t *testing.T) { expected := []itemRequest{ { Action: []byte(`{"create":{"_index":"metrics-generic.otel-default","dynamic_templates":{"metrics.histogram.summary":"summary"}}}`), - Document: []byte(`{"@timestamp":"1970-01-01T00:00:00.000000000Z","_doc_count":10,"data_stream":{"dataset":"generic.otel","namespace":"default","type":"metrics"},"metrics":{"histogram.summary":{"sum":1.0,"value_count":10}},"resource":{"dropped_attributes_count":0},"scope":{"dropped_attributes_count":0}}`), + Document: 
[]byte(`{"@timestamp":"0.0","_doc_count":10,"data_stream":{"dataset":"generic.otel","namespace":"default","type":"metrics"},"attributes":{},"metrics":{"histogram.summary":{"sum":1.0,"value_count":10}},"resource":{},"scope":{}}`), }, { Action: []byte(`{"create":{"_index":"metrics-generic.otel-default","dynamic_templates":{"metrics.exphistogram.summary":"summary"}}}`), - Document: []byte(`{"@timestamp":"1970-01-01T01:00:00.000000000Z","_doc_count":10,"data_stream":{"dataset":"generic.otel","namespace":"default","type":"metrics"},"metrics":{"exphistogram.summary":{"sum":1.0,"value_count":10}},"resource":{"dropped_attributes_count":0},"scope":{"dropped_attributes_count":0}}`), + Document: []byte(`{"@timestamp":"3600000.0","_doc_count":10,"data_stream":{"dataset":"generic.otel","namespace":"default","type":"metrics"},"attributes":{},"metrics":{"exphistogram.summary":{"sum":1.0,"value_count":10}},"resource":{},"scope":{}}`), }, } @@ -1340,7 +1370,7 @@ func TestExporterMetrics(t *testing.T) { expected := []itemRequest{ { Action: []byte(`{"create":{"_index":"metrics-generic.otel-default","dynamic_templates":{"metrics.foo.bar":"gauge_long","metrics.foo":"gauge_long","metrics.foo.bar.baz":"gauge_long"}}}`), - Document: []byte(`{"@timestamp":"1970-01-01T00:00:00.000000000Z","data_stream":{"dataset":"generic.otel","namespace":"default","type":"metrics"},"metrics":{"foo":0,"foo.bar":0,"foo.bar.baz":0},"resource":{"dropped_attributes_count":0},"scope":{"dropped_attributes_count":0}}`), + Document: []byte(`{"@timestamp":"0.0","data_stream":{"dataset":"generic.otel","namespace":"default","type":"metrics"},"attributes":{},"metrics":{"foo":0,"foo.bar":0,"foo.bar.baz":0},"resource":{},"scope":{}}`), }, } @@ -1611,8 +1641,8 @@ func TestExporterTraces(t *testing.T) { }) spanLink := span.Links().AppendEmpty() - spanLink.SetTraceID(pcommon.NewTraceIDEmpty()) - spanLink.SetSpanID(pcommon.NewSpanIDEmpty()) + spanLink.SetTraceID([16]byte{1}) + spanLink.SetSpanID([8]byte{1}) spanLink.SetFlags(10) spanLink.SetDroppedAttributesCount(11) spanLink.TraceState().FromRaw("bar") @@ -1625,11 +1655,11 @@ func TestExporterTraces(t *testing.T) { expected := []itemRequest{ { Action: []byte(`{"create":{"_index":"traces-generic.otel-default"}}`), - Document: []byte(`{"@timestamp":"1970-01-01T01:00:00.000000000Z","attributes":{"attr.foo":"attr.bar"},"data_stream":{"dataset":"generic.otel","namespace":"default","type":"traces"},"dropped_attributes_count":2,"dropped_events_count":3,"dropped_links_count":4,"duration":3600000000000,"kind":"Unspecified","links":[{"attributes":{"link.attr.foo":"link.attr.bar"},"dropped_attributes_count":11,"span_id":"","trace_id":"","trace_state":"bar"}],"name":"name","resource":{"attributes":{"resource.foo":"resource.bar"},"dropped_attributes_count":0},"scope":{"dropped_attributes_count":0},"status":{"code":"Unset"},"trace_state":"foo"}`), + Document: []byte(`{"@timestamp":"3600000.0","attributes":{"attr.foo":"attr.bar"},"data_stream":{"dataset":"generic.otel","namespace":"default","type":"traces"},"dropped_attributes_count":2,"dropped_events_count":3,"dropped_links_count":4,"duration":3600000000000,"kind":"Unspecified","links":[{"attributes":{"link.attr.foo":"link.attr.bar"},"dropped_attributes_count":11,"span_id":"0100000000000000","trace_id":"01000000000000000000000000000000","trace_state":"bar"}],"name":"name","resource":{"attributes":{"resource.foo":"resource.bar"}},"scope":{},"status":{"code":"Unset"},"trace_state":"foo"}`), }, { Action: []byte(`{"create":{"_index":"logs-generic.otel-default"}}`), 
- Document: []byte(`{"@timestamp":"1970-01-01T00:00:00.000000000Z","attributes":{"event.attr.foo":"event.attr.bar","event.name":"exception"},"event_name":"exception","data_stream":{"dataset":"generic.otel","namespace":"default","type":"logs"},"dropped_attributes_count":1,"resource":{"attributes":{"resource.foo":"resource.bar"},"dropped_attributes_count":0},"scope":{"dropped_attributes_count":0}}`), + Document: []byte(`{"@timestamp":"0.0","attributes":{"event.attr.foo":"event.attr.bar","event.name":"exception"},"event_name":"exception","data_stream":{"dataset":"generic.otel","namespace":"default","type":"logs"},"dropped_attributes_count":1,"resource":{"attributes":{"resource.foo":"resource.bar"}},"scope":{}}`), }, } diff --git a/exporter/elasticsearchexporter/go.mod b/exporter/elasticsearchexporter/go.mod index b821b540b0ebb..c96efed1cdd24 100644 --- a/exporter/elasticsearchexporter/go.mod +++ b/exporter/elasticsearchexporter/go.mod @@ -7,7 +7,6 @@ require ( github.com/elastic/go-docappender/v2 v2.3.3 github.com/elastic/go-elasticsearch/v7 v7.17.10 github.com/elastic/go-structform v0.0.12 - github.com/json-iterator/go v1.1.12 github.com/klauspost/compress v1.17.11 github.com/lestrrat-go/strftime v1.1.0 github.com/open-telemetry/opentelemetry-collector-contrib/internal/common v0.117.0 @@ -48,6 +47,7 @@ require ( github.com/google/uuid v1.6.0 // indirect github.com/hashicorp/go-version v1.7.0 // indirect github.com/joeshaw/multierror v0.0.0-20140124173710-69b34d4ec901 // indirect + github.com/json-iterator/go v1.1.12 // indirect github.com/knadh/koanf/maps v0.1.1 // indirect github.com/knadh/koanf/providers/confmap v0.1.0 // indirect github.com/knadh/koanf/v2 v2.1.2 // indirect diff --git a/exporter/elasticsearchexporter/internal/objmodel/objmodel.go b/exporter/elasticsearchexporter/internal/objmodel/objmodel.go index 0f514e06aaaa6..b60a90daf0a6f 100644 --- a/exporter/elasticsearchexporter/internal/objmodel/objmodel.go +++ b/exporter/elasticsearchexporter/internal/objmodel/objmodel.go @@ -218,12 +218,12 @@ func (doc *Document) sort() { // The filtering only keeps the last value for a key. // // Dedup ensure that keys are sorted. -func (doc *Document) Dedup(appendValueOnConflict bool) { +func (doc *Document) Dedup() { // 1. Always ensure the fields are sorted, Dedup support requires // Fields to be sorted. doc.sort() - // 2. rename fields if a primitive value is overwritten by an object if appendValueOnConflict. + // 2. rename fields if a primitive value is overwritten by an object. // For example the pair (path.x=1, path.x.a="test") becomes: // (path.x.value=1, path.x.a="test"). // @@ -236,19 +236,17 @@ func (doc *Document) Dedup(appendValueOnConflict bool) { // field in favor of the `value` field in the document. // // This step removes potential conflicts when dedotting and serializing fields. - if appendValueOnConflict { - var renamed bool - for i := 0; i < len(doc.fields)-1; i++ { - key, nextKey := doc.fields[i].key, doc.fields[i+1].key - if len(key) < len(nextKey) && strings.HasPrefix(nextKey, key) && nextKey[len(key)] == '.' { - renamed = true - doc.fields[i].key = key + ".value" - } - } - if renamed { - doc.sort() + var renamed bool + for i := 0; i < len(doc.fields)-1; i++ { + key, nextKey := doc.fields[i].key, doc.fields[i+1].key + if len(key) < len(nextKey) && strings.HasPrefix(nextKey, key) && nextKey[len(key)] == '.' { + renamed = true + doc.fields[i].key = key + ".value" } } + if renamed { + doc.sort() + } // 3.
mark duplicates as 'ignore' // @@ -262,7 +260,7 @@ func (doc *Document) Dedup(appendValueOnConflict bool) { // 4. fix objects that might be stored in arrays for i := range doc.fields { - doc.fields[i].value.Dedup(appendValueOnConflict) + doc.fields[i].value.Dedup() } } @@ -277,19 +275,19 @@ func newJSONVisitor(w io.Writer) *json.Visitor { // Serialize writes the document to the given writer. The serializer will create nested objects if dedot is true. // // NOTE: The documented MUST be sorted if dedot is true. -func (doc *Document) Serialize(w io.Writer, dedot bool, otel bool) error { +func (doc *Document) Serialize(w io.Writer, dedot bool) error { v := newJSONVisitor(w) - return doc.iterJSON(v, dedot, otel) + return doc.iterJSON(v, dedot) } -func (doc *Document) iterJSON(v *json.Visitor, dedot bool, otel bool) error { +func (doc *Document) iterJSON(v *json.Visitor, dedot bool) error { if dedot { - return doc.iterJSONDedot(v, otel) + return doc.iterJSONDedot(v) } - return doc.iterJSONFlat(v, otel) + return doc.iterJSONFlat(v) } -func (doc *Document) iterJSONFlat(w *json.Visitor, otel bool) error { +func (doc *Document) iterJSONFlat(w *json.Visitor) error { err := w.OnObjectStart(-1, structform.AnyType) if err != nil { return err @@ -308,7 +306,7 @@ func (doc *Document) iterJSONFlat(w *json.Visitor, otel bool) error { return err } - if err := fld.value.iterJSON(w, true, otel); err != nil { + if err := fld.value.iterJSON(w, true); err != nil { return err } } @@ -316,20 +314,7 @@ func (doc *Document) iterJSONFlat(w *json.Visitor, otel bool) error { return nil } -// Under OTel mode, set of key prefixes where keys should be flattened from that level, -// such that a document (root or not) with fields {"attributes.a.b": 1} will be serialized as {"attributes": {"a.b": 1}} -// It is not aware of whether it is a root document or sub-document. -// NOTE: This works very delicately with the implementation of OTel mode that -// e.g. resource.attributes is a "resource" objmodel.Document under the root document that contains attributes -// added using AddAttributes func as flattened keys. -// Therefore, there will be correctness issues when attributes are added / used in other ways, but it is working -// for current use cases and the proper fix will be slightly too complex. YAGNI. -var otelPrefixSet = map[string]struct{}{ - "attributes.": {}, - "metrics.": {}, -} - -func (doc *Document) iterJSONDedot(w *json.Visitor, otel bool) error { +func (doc *Document) iterJSONDedot(w *json.Visitor) error { objPrefix := "" level := 0 @@ -381,15 +366,6 @@ func (doc *Document) iterJSONDedot(w *json.Visitor, otel bool) error { // increase object level up to current field for { - // Otel mode serialization - if otel { - // Check the prefix - _, isOtelPrefix := otelPrefixSet[objPrefix] - if isOtelPrefix { - break - } - } - start := len(objPrefix) idx := strings.IndexByte(key[start:], '.') if idx < 0 { @@ -412,7 +388,7 @@ func (doc *Document) iterJSONDedot(w *json.Visitor, otel bool) error { if err := w.OnKey(fieldName); err != nil { return err } - if err := fld.value.iterJSON(w, true, otel); err != nil { + if err := fld.value.iterJSON(w, true); err != nil { return err } } @@ -500,13 +476,13 @@ func (v *Value) sort() { // Dedup recursively dedups keys in stored documents. // // NOTE: The value MUST be sorted. 
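For illustration, the rename-on-conflict step that Dedup now applies unconditionally (step 2 in the comments above) can be shown in isolation. This is a minimal standalone sketch using plain key/value pairs rather than the exporter's objmodel types:

package main

import (
	"fmt"
	"sort"
	"strings"
)

type field struct {
	key   string
	value any
}

// renameConflicts applies Dedup's step 2 to an already-sorted field list:
// a key that is a strict dotted prefix of the next key (e.g. "path.x" before
// "path.x.a") is renamed to "<key>.value" so both fields survive dedotting.
func renameConflicts(fields []field) []field {
	renamed := false
	for i := 0; i < len(fields)-1; i++ {
		key, nextKey := fields[i].key, fields[i+1].key
		if len(key) < len(nextKey) && strings.HasPrefix(nextKey, key) && nextKey[len(key)] == '.' {
			fields[i].key = key + ".value"
			renamed = true
		}
	}
	if renamed {
		// mirror doc.sort(): renaming may have broken the sorted order
		sort.Slice(fields, func(i, j int) bool { return fields[i].key < fields[j].key })
	}
	return fields
}

func main() {
	fields := []field{{"path.x", 1}, {"path.x.a", "test"}}
	fmt.Println(renameConflicts(fields)) // [{path.x.a test} {path.x.value 1}]
}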
-func (v *Value) Dedup(appendValueOnConflict bool) { +func (v *Value) Dedup() { switch v.kind { case KindObject: - v.doc.Dedup(appendValueOnConflict) + v.doc.Dedup() case KindArr: for i := range v.arr { - v.arr[i].Dedup(appendValueOnConflict) + v.arr[i].Dedup() } } } @@ -524,7 +500,7 @@ func (v *Value) IsEmpty() bool { } } -func (v *Value) iterJSON(w *json.Visitor, dedot bool, otel bool) error { +func (v *Value) iterJSON(w *json.Visitor, dedot bool) error { switch v.kind { case KindNil: return w.OnNil() @@ -549,18 +525,18 @@ func (v *Value) iterJSON(w *json.Visitor, dedot bool, otel bool) error { if len(v.doc.fields) == 0 { return w.OnNil() } - return v.doc.iterJSON(w, dedot, otel) + return v.doc.iterJSON(w, dedot) case KindUnflattenableObject: if len(v.doc.fields) == 0 { return w.OnNil() } - return v.doc.iterJSON(w, true, otel) + return v.doc.iterJSON(w, true) case KindArr: if err := w.OnArrayStart(-1, structform.AnyType); err != nil { return err } for i := range v.arr { - if err := v.arr[i].iterJSON(w, dedot, otel); err != nil { + if err := v.arr[i].iterJSON(w, dedot); err != nil { return err } } diff --git a/exporter/elasticsearchexporter/internal/objmodel/objmodel_test.go b/exporter/elasticsearchexporter/internal/objmodel/objmodel_test.go index 6805a958a019f..915ad9eceae00 100644 --- a/exporter/elasticsearchexporter/internal/objmodel/objmodel_test.go +++ b/exporter/elasticsearchexporter/internal/objmodel/objmodel_test.go @@ -86,9 +86,8 @@ func TestObjectModel_CreateMap(t *testing.T) { func TestObjectModel_Dedup(t *testing.T) { tests := map[string]struct { - build func() Document - appendValueOnConflict bool - want Document + build func() Document + want Document }{ "no duplicates": { build: func() (doc Document) { @@ -96,8 +95,7 @@ func TestObjectModel_Dedup(t *testing.T) { doc.AddInt("c", 3) return doc }, - appendValueOnConflict: true, - want: Document{fields: []field{{"a", IntValue(1)}, {"c", IntValue(3)}}}, + want: Document{fields: []field{{"a", IntValue(1)}, {"c", IntValue(3)}}}, }, "duplicate keys": { build: func() (doc Document) { @@ -106,8 +104,7 @@ func TestObjectModel_Dedup(t *testing.T) { doc.AddInt("a", 2) return doc }, - appendValueOnConflict: true, - want: Document{fields: []field{{"a", ignoreValue}, {"a", IntValue(2)}, {"c", IntValue(3)}}}, + want: Document{fields: []field{{"a", ignoreValue}, {"a", IntValue(2)}, {"c", IntValue(3)}}}, }, "duplicate after flattening from map: namespace object at end": { build: func() Document { @@ -117,8 +114,7 @@ func TestObjectModel_Dedup(t *testing.T) { am.PutEmptyMap("namespace").PutInt("a", 23) return DocumentFromAttributes(am) }, - appendValueOnConflict: true, - want: Document{fields: []field{{"namespace.a", ignoreValue}, {"namespace.a", IntValue(23)}, {"toplevel", StringValue("test")}}}, + want: Document{fields: []field{{"namespace.a", ignoreValue}, {"namespace.a", IntValue(23)}, {"toplevel", StringValue("test")}}}, }, "duplicate after flattening from map: namespace object at beginning": { build: func() Document { @@ -128,8 +124,7 @@ func TestObjectModel_Dedup(t *testing.T) { am.PutStr("toplevel", "test") return DocumentFromAttributes(am) }, - appendValueOnConflict: true, - want: Document{fields: []field{{"namespace.a", ignoreValue}, {"namespace.a", IntValue(42)}, {"toplevel", StringValue("test")}}}, + want: Document{fields: []field{{"namespace.a", ignoreValue}, {"namespace.a", IntValue(42)}, {"toplevel", StringValue("test")}}}, }, "dedup in arrays": { build: func() (doc Document) { @@ -141,7 +136,6 @@ func TestObjectModel_Dedup(t 
*testing.T) { doc.Add("arr", ArrValue(Value{kind: KindObject, doc: embedded})) return doc }, - appendValueOnConflict: true, want: Document{fields: []field{{"arr", ArrValue(Value{kind: KindObject, doc: Document{fields: []field{ {"a", ignoreValue}, {"a", IntValue(2)}, @@ -154,8 +148,7 @@ func TestObjectModel_Dedup(t *testing.T) { doc.AddInt("namespace.a", 2) return doc }, - appendValueOnConflict: true, - want: Document{fields: []field{{"namespace.a", IntValue(2)}, {"namespace.value", IntValue(1)}}}, + want: Document{fields: []field{{"namespace.a", IntValue(2)}, {"namespace.value", IntValue(1)}}}, }, "dedup removes primitive if value exists": { build: func() (doc Document) { @@ -164,25 +157,14 @@ func TestObjectModel_Dedup(t *testing.T) { doc.AddInt("namespace.value", 3) return doc }, - appendValueOnConflict: true, - want: Document{fields: []field{{"namespace.a", IntValue(2)}, {"namespace.value", ignoreValue}, {"namespace.value", IntValue(3)}}}, - }, - "dedup without append value on conflict": { - build: func() (doc Document) { - doc.AddInt("namespace", 1) - doc.AddInt("namespace.a", 2) - doc.AddInt("namespace.value", 3) - return doc - }, - appendValueOnConflict: false, - want: Document{fields: []field{{"namespace", IntValue(1)}, {"namespace.a", IntValue(2)}, {"namespace.value", IntValue(3)}}}, + want: Document{fields: []field{{"namespace.a", IntValue(2)}, {"namespace.value", ignoreValue}, {"namespace.value", IntValue(3)}}}, }, } for name, test := range tests { t.Run(name, func(t *testing.T) { doc := test.build() - doc.Dedup(test.appendValueOnConflict) + doc.Dedup() assert.Equal(t, test.want, doc) }) } @@ -300,8 +282,8 @@ func TestDocument_Serialize_Flat(t *testing.T) { m := pcommon.NewMap() assert.NoError(t, m.FromRaw(test.attrs)) doc := DocumentFromAttributes(m) - doc.Dedup(true) - err := doc.Serialize(&buf, false, false) + doc.Dedup() + err := doc.Serialize(&buf, false) require.NoError(t, err) assert.Equal(t, test.want, buf.String()) @@ -361,8 +343,8 @@ func TestDocument_Serialize_Dedot(t *testing.T) { m := pcommon.NewMap() assert.NoError(t, m.FromRaw(test.attrs)) doc := DocumentFromAttributes(m) - doc.Dedup(true) - err := doc.Serialize(&buf, true, false) + doc.Dedup() + err := doc.Serialize(&buf, true) require.NoError(t, err) assert.Equal(t, test.want, buf.String()) @@ -410,7 +392,7 @@ func TestValue_Serialize(t *testing.T) { for name, test := range tests { t.Run(name, func(t *testing.T) { var buf strings.Builder - err := test.value.iterJSON(newJSONVisitor(&buf), false, false) + err := test.value.iterJSON(newJSONVisitor(&buf), false) require.NoError(t, err) assert.Equal(t, test.want, buf.String()) }) diff --git a/exporter/elasticsearchexporter/internal/pool/bufferpool.go b/exporter/elasticsearchexporter/internal/pool/bufferpool.go new file mode 100644 index 0000000000000..ee0b260efb370 --- /dev/null +++ b/exporter/elasticsearchexporter/internal/pool/bufferpool.go @@ -0,0 +1,40 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package pool // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/pool" + +import ( + "bytes" + "io" + "sync" +) + +type BufferPool struct { + pool *sync.Pool +} + +func NewBufferPool() *BufferPool { + return &BufferPool{pool: &sync.Pool{New: func() any { return &bytes.Buffer{} }}} +} + +func (w *BufferPool) NewPooledBuffer() PooledBuffer { + return PooledBuffer{ + Buffer: w.pool.Get().(*bytes.Buffer), + pool: w.pool, + } +} + +type PooledBuffer struct { + Buffer *bytes.Buffer 
+ pool *sync.Pool +} + +func (p PooledBuffer) Recycle() { + p.Buffer.Reset() + p.pool.Put(p.Buffer) +} + +func (p PooledBuffer) WriteTo(w io.Writer) (n int64, err error) { + defer p.Recycle() + return p.Buffer.WriteTo(w) +} diff --git a/exporter/elasticsearchexporter/model.go b/exporter/elasticsearchexporter/model.go index f1c0e615f8eb5..b18c9d2f2917b 100644 --- a/exporter/elasticsearchexporter/model.go +++ b/exporter/elasticsearchexporter/model.go @@ -13,10 +13,8 @@ import ( "hash/fnv" "math" "slices" - "strings" "time" - jsoniter "github.com/json-iterator/go" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" @@ -77,11 +75,12 @@ var resourceAttrsToPreserve = map[string]bool{ var ErrInvalidTypeForBodyMapMode = errors.New("invalid log record body type for 'bodymap' mapping mode") type mappingModel interface { - encodeLog(pcommon.Resource, string, plog.LogRecord, pcommon.InstrumentationScope, string) ([]byte, error) - encodeSpan(pcommon.Resource, string, ptrace.Span, pcommon.InstrumentationScope, string) ([]byte, error) - encodeSpanEvent(resource pcommon.Resource, resourceSchemaURL string, span ptrace.Span, spanEvent ptrace.SpanEvent, scope pcommon.InstrumentationScope, scopeSchemaURL string) *objmodel.Document - upsertMetricDataPointValue(map[uint32]objmodel.Document, pcommon.Resource, string, pcommon.InstrumentationScope, string, pmetric.Metric, dataPoint) error - encodeDocument(objmodel.Document) ([]byte, error) + encodeLog(pcommon.Resource, string, plog.LogRecord, pcommon.InstrumentationScope, string, *bytes.Buffer) error + encodeSpan(pcommon.Resource, string, ptrace.Span, pcommon.InstrumentationScope, string, *bytes.Buffer) error + encodeSpanEvent(resource pcommon.Resource, resourceSchemaURL string, span ptrace.Span, spanEvent ptrace.SpanEvent, scope pcommon.InstrumentationScope, scopeSchemaURL string, buf *bytes.Buffer) + hashDataPoint(dataPoint) uint32 + encodeDocument(objmodel.Document, *bytes.Buffer) error + encodeMetrics(resource pcommon.Resource, resourceSchemaURL string, scope pcommon.InstrumentationScope, scopeSchemaURL string, dataPoints []dataPoint, validationErrors *[]error, buf *bytes.Buffer) (map[string]string, error) } // encodeModel tries to keep the event as close to the original open telemetry semantics as is. 
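To make the pooled-buffer lifecycle concrete: the exporter encodes into a pooled buffer, recycles it itself only when encoding fails, and otherwise hands it to the bulk-indexer session, whose read through WriteTo recycles it. Below is a minimal self-contained sketch of that flow; the pool is a trimmed copy of internal/pool/bufferpool.go added above (the real package is internal and cannot be imported from outside the module), and encodeDoc plus the stdout sink are hypothetical stand-ins for the model encoder and the session:

package main

import (
	"bytes"
	"errors"
	"fmt"
	"io"
	"os"
	"sync"
)

type BufferPool struct{ pool *sync.Pool }

func NewBufferPool() *BufferPool {
	return &BufferPool{pool: &sync.Pool{New: func() any { return &bytes.Buffer{} }}}
}

func (w *BufferPool) NewPooledBuffer() PooledBuffer {
	return PooledBuffer{Buffer: w.pool.Get().(*bytes.Buffer), pool: w.pool}
}

type PooledBuffer struct {
	Buffer *bytes.Buffer
	pool   *sync.Pool
}

func (p PooledBuffer) Recycle() {
	p.Buffer.Reset()
	p.pool.Put(p.Buffer)
}

// WriteTo drains the buffer into w and recycles it, so whoever consumes the
// buffer (the bulk-indexer session in the exporter) returns it to the pool.
func (p PooledBuffer) WriteTo(w io.Writer) (int64, error) {
	defer p.Recycle()
	return p.Buffer.WriteTo(w)
}

// encodeDoc is a hypothetical stand-in for model.encodeLog and friends.
func encodeDoc(buf *bytes.Buffer, doc string) error {
	if doc == "" {
		return errors.New("nothing to encode")
	}
	_, err := buf.WriteString(doc)
	return err
}

func main() {
	bufferPool := NewBufferPool()
	buf := bufferPool.NewPooledBuffer()
	if err := encodeDoc(buf.Buffer, `{"@timestamp":"0.0"}`+"\n"); err != nil {
		// Encoding failed before anyone else saw the buffer: recycle it here,
		// exactly as pushLogRecord does.
		buf.Recycle()
		fmt.Fprintln(os.Stderr, "encode error:", err)
		return
	}
	// On success the consumer recycles via WriteTo; as in the exporter, the
	// buffer must not be touched again after being handed off.
	if _, err := buf.WriteTo(os.Stdout); err != nil {
		fmt.Fprintln(os.Stderr, "write error:", err)
	}
}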
@@ -103,6 +102,7 @@ type dataPoint interface { DynamicTemplate(pmetric.Metric) string DocCount() uint64 HasMappingHint(mappingHint) bool + Metric() pmetric.Metric } const ( @@ -111,24 +111,21 @@ const ( attributeField = "attribute" ) -func (m *encodeModel) encodeLog(resource pcommon.Resource, resourceSchemaURL string, record plog.LogRecord, scope pcommon.InstrumentationScope, scopeSchemaURL string) ([]byte, error) { +func (m *encodeModel) encodeLog(resource pcommon.Resource, resourceSchemaURL string, record plog.LogRecord, scope pcommon.InstrumentationScope, scopeSchemaURL string, buf *bytes.Buffer) error { var document objmodel.Document switch m.mode { case MappingECS: document = m.encodeLogECSMode(resource, record, scope) case MappingOTel: - document = m.encodeLogOTelMode(resource, resourceSchemaURL, record, scope, scopeSchemaURL) + return serializeLog(resource, resourceSchemaURL, scope, scopeSchemaURL, record, buf) case MappingBodyMap: - return m.encodeLogBodyMapMode(record) + return m.encodeLogBodyMapMode(record, buf) default: document = m.encodeLogDefaultMode(resource, record, scope) } - // For OTel mode, prefix conflicts are not a problem as otel-data has subobjects: false - document.Dedup(m.mode != MappingOTel) + document.Dedup() - var buf bytes.Buffer - err := document.Serialize(&buf, m.dedot, m.mode == MappingOTel) - return buf.Bytes(), err + return document.Serialize(buf, m.dedot) } func (m *encodeModel) encodeLogDefaultMode(resource pcommon.Resource, record plog.LogRecord, scope pcommon.InstrumentationScope) objmodel.Document { @@ -152,93 +149,14 @@ func (m *encodeModel) encodeLogDefaultMode(resource pcommon.Resource, record plo return document } -func (m *encodeModel) encodeLogBodyMapMode(record plog.LogRecord) ([]byte, error) { +func (m *encodeModel) encodeLogBodyMapMode(record plog.LogRecord, buf *bytes.Buffer) error { body := record.Body() if body.Type() != pcommon.ValueTypeMap { - return nil, fmt.Errorf("%w: %q", ErrInvalidTypeForBodyMapMode, body.Type()) + return fmt.Errorf("%w: %q", ErrInvalidTypeForBodyMapMode, body.Type()) } - return jsoniter.Marshal(body.Map().AsRaw()) -} - -func (m *encodeModel) encodeLogOTelMode(resource pcommon.Resource, resourceSchemaURL string, record plog.LogRecord, scope pcommon.InstrumentationScope, scopeSchemaURL string) objmodel.Document { - var document objmodel.Document - - docTimeStamp := record.Timestamp() - if docTimeStamp.AsTime().UnixNano() == 0 { - docTimeStamp = record.ObservedTimestamp() - } - - document.AddTimestamp("@timestamp", docTimeStamp) - document.AddTimestamp("observed_timestamp", record.ObservedTimestamp()) - - document.AddTraceID("trace_id", record.TraceID()) - document.AddSpanID("span_id", record.SpanID()) - document.AddString("severity_text", record.SeverityText()) - document.AddInt("severity_number", int64(record.SeverityNumber())) - document.AddInt("dropped_attributes_count", int64(record.DroppedAttributesCount())) - - if record.EventName() != "" { - document.AddString("event_name", record.EventName()) - } else if eventNameAttr, ok := record.Attributes().Get("event.name"); ok && eventNameAttr.Str() != "" { - document.AddString("event_name", eventNameAttr.Str()) - } - - m.encodeAttributesOTelMode(&document, record.Attributes()) - m.encodeResourceOTelMode(&document, resource, resourceSchemaURL) - m.encodeScopeOTelMode(&document, scope, scopeSchemaURL) - - // Body - setOTelLogBody(&document, record) - - return document -} - -func setOTelLogBody(doc *objmodel.Document, record plog.LogRecord) { - // Determine if this log 
record is an event, as they are mapped differently - // https://github.com/open-telemetry/semantic-conventions/blob/main/docs/general/events.md - _, isEvent := record.Attributes().Get("event.name") - isEvent = isEvent || record.EventName() != "" - - body := record.Body() - switch body.Type() { - case pcommon.ValueTypeMap: - if isEvent { - doc.AddAttribute("body.structured", body) - } else { - doc.AddAttribute("body.flattened", body) - } - case pcommon.ValueTypeSlice: - // output must be an array of objects due to ES limitations - // otherwise, wrap the array in an object - s := body.Slice() - allMaps := true - for i := 0; i < s.Len(); i++ { - if s.At(i).Type() != pcommon.ValueTypeMap { - allMaps = false - } - } - - var outVal pcommon.Value - if allMaps { - outVal = body - } else { - vm := pcommon.NewValueMap() - m := vm.SetEmptyMap() - body.Slice().CopyTo(m.PutEmptySlice("value")) - outVal = vm - } - - if isEvent { - doc.AddAttribute("body.structured", outVal) - } else { - doc.AddAttribute("body.flattened", outVal) - } - case pcommon.ValueTypeStr: - doc.AddString("body.text", body.Str()) - default: - doc.AddString("body.text", body.AsString()) - } + serializeMap(body.Map(), buf) + return nil } func (m *encodeModel) encodeLogECSMode(resource pcommon.Resource, record plog.LogRecord, scope pcommon.InstrumentationScope) objmodel.Document { @@ -283,110 +201,64 @@ func (m *encodeModel) encodeLogECSMode(resource pcommon.Resource, record plog.Lo return document } -func (m *encodeModel) encodeDocument(document objmodel.Document) ([]byte, error) { - // For OTel mode, prefix conflicts are not a problem as otel-data has subobjects: false - document.Dedup(m.mode != MappingOTel) +func (m *encodeModel) encodeDocument(document objmodel.Document, buf *bytes.Buffer) error { + document.Dedup() - var buf bytes.Buffer - err := document.Serialize(&buf, m.dedot, m.mode == MappingOTel) + err := document.Serialize(buf, m.dedot) if err != nil { - return nil, err + return err } - return buf.Bytes(), nil + return nil } // upsertMetricDataPointValue upserts a datapoint value to documents which is already hashed by resource and index -func (m *encodeModel) upsertMetricDataPointValue(documents map[uint32]objmodel.Document, resource pcommon.Resource, resourceSchemaURL string, scope pcommon.InstrumentationScope, scopeSchemaURL string, metric pmetric.Metric, dp dataPoint) error { +func (m *encodeModel) hashDataPoint(dp dataPoint) uint32 { switch m.mode { case MappingOTel: - return m.upsertMetricDataPointValueOTelMode(documents, resource, resourceSchemaURL, scope, scopeSchemaURL, metric, dp) - case MappingECS: - return m.upsertMetricDataPointValueECSMode(documents, resource, resourceSchemaURL, scope, scopeSchemaURL, metric, dp) + return metricOTelHash(dp, dp.Metric().Unit()) default: // Defaults to ECS for backward compatibility - return m.upsertMetricDataPointValueECSMode(documents, resource, resourceSchemaURL, scope, scopeSchemaURL, metric, dp) + return metricECSHash(dp.Timestamp(), dp.Attributes()) } } -func (m *encodeModel) upsertMetricDataPointValueECSMode(documents map[uint32]objmodel.Document, resource pcommon.Resource, _ string, _ pcommon.InstrumentationScope, _ string, metric pmetric.Metric, dp dataPoint) error { - value, err := dp.Value() - if err != nil { - return err - } - - hash := metricECSHash(dp.Timestamp(), dp.Attributes()) - var ( - document objmodel.Document - ok bool - ) - if document, ok = documents[hash]; !ok { - encodeAttributesECSMode(&document, resource.Attributes(), resourceAttrsConversionMap, 
resourceAttrsToPreserve) - document.AddTimestamp("@timestamp", dp.Timestamp()) - document.AddAttributes("", dp.Attributes()) - } - - document.AddAttribute(metric.Name(), value) - - documents[hash] = document - return nil -} - -func (m *encodeModel) upsertMetricDataPointValueOTelMode(documents map[uint32]objmodel.Document, resource pcommon.Resource, resourceSchemaURL string, scope pcommon.InstrumentationScope, scopeSchemaURL string, metric pmetric.Metric, dp dataPoint) error { - value, err := dp.Value() - if err != nil { - return err - } +func (m *encodeModel) encodeDataPointsECSMode(resource pcommon.Resource, dataPoints []dataPoint, validationErrors *[]error, buf *bytes.Buffer) (map[string]string, error) { + dp0 := dataPoints[0] + var document objmodel.Document + encodeAttributesECSMode(&document, resource.Attributes(), resourceAttrsConversionMap, resourceAttrsToPreserve) + document.AddTimestamp("@timestamp", dp0.Timestamp()) + document.AddAttributes("", dp0.Attributes()) - // documents is per-resource. Therefore, there is no need to hash resource attributes - hash := metricOTelHash(dp, scope.Attributes(), metric.Unit()) - var ( - document objmodel.Document - ok bool - ) - if document, ok = documents[hash]; !ok { - document.AddTimestamp("@timestamp", dp.Timestamp()) - if dp.StartTimestamp() != 0 { - document.AddTimestamp("start_timestamp", dp.StartTimestamp()) + for _, dp := range dataPoints { + value, err := dp.Value() + if err != nil { + *validationErrors = append(*validationErrors, err) + continue } - document.AddString("unit", metric.Unit()) - - m.encodeAttributesOTelMode(&document, dp.Attributes()) - m.encodeResourceOTelMode(&document, resource, resourceSchemaURL) - m.encodeScopeOTelMode(&document, scope, scopeSchemaURL) + document.AddAttribute(dp.Metric().Name(), value) } + err := m.encodeDocument(document, buf) - if dp.HasMappingHint(hintDocCount) { - docCount := dp.DocCount() - document.AddUInt("_doc_count", docCount) - } + return document.DynamicTemplates(), err +} - switch value.Type() { - case pcommon.ValueTypeMap: - m := pcommon.NewMap() - value.Map().CopyTo(m) - document.Add("metrics."+metric.Name(), objmodel.UnflattenableObjectValue(m)) +func (m *encodeModel) encodeMetrics(resource pcommon.Resource, resourceSchemaURL string, scope pcommon.InstrumentationScope, scopeSchemaURL string, dataPoints []dataPoint, validationErrors *[]error, buf *bytes.Buffer) (map[string]string, error) { + switch m.mode { + case MappingOTel: + return serializeMetrics(resource, resourceSchemaURL, scope, scopeSchemaURL, dataPoints, validationErrors, buf) default: - document.Add("metrics."+metric.Name(), objmodel.ValueFromAttribute(value)) + return m.encodeDataPointsECSMode(resource, dataPoints, validationErrors, buf) } - // TODO: support quantiles - // https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/34561 - - // DynamicTemplate returns the name of dynamic template that applies to the metric and data point, - // so that the field is indexed into Elasticsearch with the correct mapping. The name should correspond to a - // dynamic template that is defined in ES mapping, e.g. 
- // https://github.com/elastic/elasticsearch/blob/8.15/x-pack/plugin/core/template-resources/src/main/resources/metrics%40mappings.json - document.AddDynamicTemplate("metrics."+metric.Name(), dp.DynamicTemplate(metric)) - documents[hash] = document - return nil } type summaryDataPoint struct { pmetric.SummaryDataPoint mappingHintGetter + metric pmetric.Metric } -func newSummaryDataPoint(dp pmetric.SummaryDataPoint) summaryDataPoint { - return summaryDataPoint{SummaryDataPoint: dp, mappingHintGetter: newMappingHintGetter(dp.Attributes())} +func newSummaryDataPoint(metric pmetric.Metric, dp pmetric.SummaryDataPoint) summaryDataPoint { + return summaryDataPoint{SummaryDataPoint: dp, mappingHintGetter: newMappingHintGetter(dp.Attributes()), metric: metric} } func (dp summaryDataPoint) Value() (pcommon.Value, error) { @@ -407,13 +279,18 @@ func (dp summaryDataPoint) DocCount() uint64 { return dp.Count() } +func (dp summaryDataPoint) Metric() pmetric.Metric { + return dp.metric +} + type exponentialHistogramDataPoint struct { pmetric.ExponentialHistogramDataPoint mappingHintGetter + metric pmetric.Metric } -func newExponentialHistogramDataPoint(dp pmetric.ExponentialHistogramDataPoint) exponentialHistogramDataPoint { - return exponentialHistogramDataPoint{ExponentialHistogramDataPoint: dp, mappingHintGetter: newMappingHintGetter(dp.Attributes())} +func newExponentialHistogramDataPoint(metric pmetric.Metric, dp pmetric.ExponentialHistogramDataPoint) exponentialHistogramDataPoint { + return exponentialHistogramDataPoint{ExponentialHistogramDataPoint: dp, mappingHintGetter: newMappingHintGetter(dp.Attributes()), metric: metric} } func (dp exponentialHistogramDataPoint) Value() (pcommon.Value, error) { @@ -454,13 +331,18 @@ func (dp exponentialHistogramDataPoint) DocCount() uint64 { return dp.Count() } +func (dp exponentialHistogramDataPoint) Metric() pmetric.Metric { + return dp.metric +} + type histogramDataPoint struct { pmetric.HistogramDataPoint mappingHintGetter + metric pmetric.Metric } -func newHistogramDataPoint(dp pmetric.HistogramDataPoint) histogramDataPoint { - return histogramDataPoint{HistogramDataPoint: dp, mappingHintGetter: newMappingHintGetter(dp.Attributes())} +func newHistogramDataPoint(metric pmetric.Metric, dp pmetric.HistogramDataPoint) histogramDataPoint { + return histogramDataPoint{HistogramDataPoint: dp, mappingHintGetter: newMappingHintGetter(dp.Attributes()), metric: metric} } func (dp histogramDataPoint) Value() (pcommon.Value, error) { @@ -485,6 +367,10 @@ func (dp histogramDataPoint) DocCount() uint64 { return dp.HistogramDataPoint.Count() } +func (dp histogramDataPoint) Metric() pmetric.Metric { + return dp.metric +} + func histogramToValue(dp pmetric.HistogramDataPoint) (pcommon.Value, error) { // Histogram conversion function is from // https://github.com/elastic/apm-data/blob/3b28495c3cbdc0902983134276eb114231730249/input/otlp/metrics.go#L277 @@ -536,10 +422,11 @@ func histogramToValue(dp pmetric.HistogramDataPoint) (pcommon.Value, error) { type numberDataPoint struct { pmetric.NumberDataPoint mappingHintGetter + metric pmetric.Metric } -func newNumberDataPoint(dp pmetric.NumberDataPoint) numberDataPoint { - return numberDataPoint{NumberDataPoint: dp, mappingHintGetter: newMappingHintGetter(dp.Attributes())} +func newNumberDataPoint(metric pmetric.Metric, dp pmetric.NumberDataPoint) numberDataPoint { + return numberDataPoint{NumberDataPoint: dp, mappingHintGetter: newMappingHintGetter(dp.Attributes()), metric: metric} } func (dp numberDataPoint) Value() 
(pcommon.Value, error) { @@ -590,128 +477,23 @@ func (dp numberDataPoint) DocCount() uint64 { return 1 } -var errInvalidNumberDataPoint = errors.New("invalid number data point") - -func (m *encodeModel) encodeResourceOTelMode(document *objmodel.Document, resource pcommon.Resource, resourceSchemaURL string) { - resourceMapVal := pcommon.NewValueMap() - resourceMap := resourceMapVal.Map() - if resourceSchemaURL != "" { - resourceMap.PutStr("schema_url", resourceSchemaURL) - } - resourceMap.PutInt("dropped_attributes_count", int64(resource.DroppedAttributesCount())) - resourceAttrMap := resourceMap.PutEmptyMap("attributes") - resource.Attributes().CopyTo(resourceAttrMap) - resourceAttrMap.RemoveIf(func(key string, _ pcommon.Value) bool { - switch key { - case dataStreamType, dataStreamDataset, dataStreamNamespace: - return true - } - return false - }) - mergeGeolocation(resourceAttrMap) - document.Add("resource", objmodel.ValueFromAttribute(resourceMapVal)) +func (dp numberDataPoint) Metric() pmetric.Metric { + return dp.metric } -func (m *encodeModel) encodeScopeOTelMode(document *objmodel.Document, scope pcommon.InstrumentationScope, scopeSchemaURL string) { - scopeMapVal := pcommon.NewValueMap() - scopeMap := scopeMapVal.Map() - if scope.Name() != "" { - scopeMap.PutStr("name", scope.Name()) - } - if scope.Version() != "" { - scopeMap.PutStr("version", scope.Version()) - } - if scopeSchemaURL != "" { - scopeMap.PutStr("schema_url", scopeSchemaURL) - } - scopeMap.PutInt("dropped_attributes_count", int64(scope.DroppedAttributesCount())) - scopeAttrMap := scopeMap.PutEmptyMap("attributes") - scope.Attributes().CopyTo(scopeAttrMap) - scopeAttrMap.RemoveIf(func(key string, _ pcommon.Value) bool { - switch key { - case dataStreamType, dataStreamDataset, dataStreamNamespace: - return true - } - return false - }) - mergeGeolocation(scopeAttrMap) - document.Add("scope", objmodel.ValueFromAttribute(scopeMapVal)) -} - -func (m *encodeModel) encodeAttributesOTelMode(document *objmodel.Document, attributeMap pcommon.Map) { - attrsCopy := pcommon.NewMap() // Copy to avoid mutating original map - attributeMap.CopyTo(attrsCopy) - attrsCopy.RemoveIf(func(key string, val pcommon.Value) bool { - switch key { - case dataStreamType, dataStreamDataset, dataStreamNamespace: - // At this point the data_stream attributes are expected to be in the record attributes, - // updated by the router. 
- // Move them to the top of the document and remove them from the record - document.AddAttribute(key, val) - return true - case mappingHintsAttrKey: - return true - } - return false - }) - mergeGeolocation(attrsCopy) - document.AddAttributes("attributes", attrsCopy) -} +var errInvalidNumberDataPoint = errors.New("invalid number data point") -func (m *encodeModel) encodeSpan(resource pcommon.Resource, resourceSchemaURL string, span ptrace.Span, scope pcommon.InstrumentationScope, scopeSchemaURL string) ([]byte, error) { +func (m *encodeModel) encodeSpan(resource pcommon.Resource, resourceSchemaURL string, span ptrace.Span, scope pcommon.InstrumentationScope, scopeSchemaURL string, buf *bytes.Buffer) error { var document objmodel.Document switch m.mode { case MappingOTel: - document = m.encodeSpanOTelMode(resource, resourceSchemaURL, span, scope, scopeSchemaURL) + return serializeSpan(resource, resourceSchemaURL, scope, scopeSchemaURL, span, buf) default: document = m.encodeSpanDefaultMode(resource, span, scope) } - // For OTel mode, prefix conflicts are not a problem as otel-data has subobjects: false - document.Dedup(m.mode != MappingOTel) - var buf bytes.Buffer - err := document.Serialize(&buf, m.dedot, m.mode == MappingOTel) - return buf.Bytes(), err -} - -func (m *encodeModel) encodeSpanOTelMode(resource pcommon.Resource, resourceSchemaURL string, span ptrace.Span, scope pcommon.InstrumentationScope, scopeSchemaURL string) objmodel.Document { - var document objmodel.Document - document.AddTimestamp("@timestamp", span.StartTimestamp()) - document.AddTraceID("trace_id", span.TraceID()) - document.AddSpanID("span_id", span.SpanID()) - document.AddString("trace_state", span.TraceState().AsRaw()) - document.AddSpanID("parent_span_id", span.ParentSpanID()) - document.AddString("name", span.Name()) - document.AddString("kind", span.Kind().String()) - document.AddUInt("duration", uint64(span.EndTimestamp()-span.StartTimestamp())) - - m.encodeAttributesOTelMode(&document, span.Attributes()) - - document.AddInt("dropped_attributes_count", int64(span.DroppedAttributesCount())) - document.AddInt("dropped_events_count", int64(span.DroppedEventsCount())) - - links := pcommon.NewValueSlice() - linkSlice := links.SetEmptySlice() - spanLinks := span.Links() - for i := 0; i < spanLinks.Len(); i++ { - linkMap := linkSlice.AppendEmpty().SetEmptyMap() - spanLink := spanLinks.At(i) - linkMap.PutStr("trace_id", spanLink.TraceID().String()) - linkMap.PutStr("span_id", spanLink.SpanID().String()) - linkMap.PutStr("trace_state", spanLink.TraceState().AsRaw()) - mAttr := linkMap.PutEmptyMap("attributes") - spanLink.Attributes().CopyTo(mAttr) - linkMap.PutInt("dropped_attributes_count", int64(spanLink.DroppedAttributesCount())) - } - document.AddAttribute("links", links) - - document.AddInt("dropped_links_count", int64(span.DroppedLinksCount())) - document.AddString("status.message", span.Status().Message()) - document.AddString("status.code", span.Status().Code().String()) - - m.encodeResourceOTelMode(&document, resource, resourceSchemaURL) - m.encodeScopeOTelMode(&document, scope, scopeSchemaURL) - - return document + document.Dedup() + err := document.Serialize(buf, m.dedot) + return err } func (m *encodeModel) encodeSpanDefaultMode(resource pcommon.Resource, span ptrace.Span, scope pcommon.InstrumentationScope) objmodel.Document { @@ -734,26 +516,13 @@ func (m *encodeModel) encodeSpanDefaultMode(resource pcommon.Resource, span ptra return document } -func (m *encodeModel) encodeSpanEvent(resource 
pcommon.Resource, resourceSchemaURL string, span ptrace.Span, spanEvent ptrace.SpanEvent, scope pcommon.InstrumentationScope, scopeSchemaURL string) *objmodel.Document { +func (m *encodeModel) encodeSpanEvent(resource pcommon.Resource, resourceSchemaURL string, span ptrace.Span, spanEvent ptrace.SpanEvent, scope pcommon.InstrumentationScope, scopeSchemaURL string, buf *bytes.Buffer) { if m.mode != MappingOTel { // Currently span events are stored separately only in OTel mapping mode. // In other modes, they are stored within the span document. - return nil + return } - var document objmodel.Document - document.AddTimestamp("@timestamp", spanEvent.Timestamp()) - document.AddString("event_name", spanEvent.Name()) - // todo remove before GA, make sure Kibana uses event_name - document.AddString("attributes.event.name", spanEvent.Name()) - document.AddSpanID("span_id", span.SpanID()) - document.AddTraceID("trace_id", span.TraceID()) - document.AddInt("dropped_attributes_count", int64(spanEvent.DroppedAttributesCount())) - - m.encodeAttributesOTelMode(&document, spanEvent.Attributes()) - m.encodeResourceOTelMode(&document, resource, resourceSchemaURL) - m.encodeScopeOTelMode(&document, scope, scopeSchemaURL) - - return &document + serializeSpanEvent(resource, resourceSchemaURL, scope, scopeSchemaURL, span, spanEvent, buf) } func (m *encodeModel) encodeAttributes(document *objmodel.Document, attributes pcommon.Map) { @@ -932,7 +701,7 @@ func metricECSHash(timestamp pcommon.Timestamp, attributes pcommon.Map) uint32 { return hasher.Sum32() } -func metricOTelHash(dp dataPoint, scopeAttrs pcommon.Map, unit string) uint32 { +func metricOTelHash(dp dataPoint, unit string) uint32 { hasher := fnv.New32a() timestampBuf := make([]byte, 8) @@ -944,7 +713,6 @@ func metricOTelHash(dp dataPoint, scopeAttrs pcommon.Map, unit string) uint32 { hasher.Write([]byte(unit)) - mapHashExcludeReservedAttrs(hasher, scopeAttrs) mapHashExcludeReservedAttrs(hasher, dp.Attributes(), mappingHintsAttrKey) return hasher.Sum32() @@ -1012,79 +780,6 @@ func sliceHash(h hash.Hash, s pcommon.Slice) { } } -// mergeGeolocation mutates attributes map to merge all `geo.location.{lon,lat}`, -// and namespaced `*.geo.location.{lon,lat}` to unnamespaced and namespaced `geo.location`. -// This is to match the geo_point type in Elasticsearch. -func mergeGeolocation(attributes pcommon.Map) { - const ( - lonKey = "geo.location.lon" - latKey = "geo.location.lat" - mergedKey = "geo.location" - ) - // Prefix is the attribute name without lonKey or latKey suffix - // e.g. prefix of "foo.bar.geo.location.lon" is "foo.bar.", prefix of "geo.location.lon" is "". - prefixToGeo := make(map[string]struct { - lon, lat float64 - lonSet, latSet bool - }) - setLon := func(prefix string, v float64) { - g := prefixToGeo[prefix] - g.lon = v - g.lonSet = true - prefixToGeo[prefix] = g - } - setLat := func(prefix string, v float64) { - g := prefixToGeo[prefix] - g.lat = v - g.latSet = true - prefixToGeo[prefix] = g - } - attributes.RemoveIf(func(key string, val pcommon.Value) bool { - if val.Type() != pcommon.ValueTypeDouble { - return false - } - - if key == lonKey { - setLon("", val.Double()) - return true - } else if key == latKey { - setLat("", val.Double()) - return true - } else if namespace, found := strings.CutSuffix(key, "."+lonKey); found { - prefix := namespace + "." - setLon(prefix, val.Double()) - return true - } else if namespace, found := strings.CutSuffix(key, "."+latKey); found { - prefix := namespace + "." 
- setLat(prefix, val.Double()) - return true - } - return false - }) - - for prefix, geo := range prefixToGeo { - if geo.lonSet && geo.latSet { - key := prefix + mergedKey - // Geopoint expressed as an array with the format: [lon, lat] - s := attributes.PutEmptySlice(key) - s.EnsureCapacity(2) - s.AppendEmpty().SetDouble(geo.lon) - s.AppendEmpty().SetDouble(geo.lat) - continue - } - - // Place the attributes back if lon and lat are not present together - if geo.lonSet { - key := prefix + lonKey - attributes.PutDouble(key, geo.lon) - } - if geo.latSet { - key := prefix + latKey - attributes.PutDouble(key, geo.lat) - } - } -} - func safeUint64ToInt64(v uint64) int64 { if v > math.MaxInt64 { return math.MaxInt64 diff --git a/exporter/elasticsearchexporter/model_test.go b/exporter/elasticsearchexporter/model_test.go index dddf46a14a014..772674b9af862 100644 --- a/exporter/elasticsearchexporter/model_test.go +++ b/exporter/elasticsearchexporter/model_test.go @@ -56,9 +56,10 @@ var ( func TestEncodeSpan(t *testing.T) { model := &encodeModel{dedot: false} td := mockResourceSpans() - spanByte, err := model.encodeSpan(td.ResourceSpans().At(0).Resource(), "", td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0), td.ResourceSpans().At(0).ScopeSpans().At(0).Scope(), "") + var buf bytes.Buffer + err := model.encodeSpan(td.ResourceSpans().At(0).Resource(), "", td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0), td.ResourceSpans().At(0).ScopeSpans().At(0).Scope(), "", &buf) assert.NoError(t, err) - assert.Equal(t, expectedSpanBody, string(spanByte)) + assert.Equal(t, expectedSpanBody, buf.String()) } func TestEncodeLog(t *testing.T) { @@ -66,26 +67,29 @@ func TestEncodeLog(t *testing.T) { model := &encodeModel{dedot: false} td := mockResourceLogs() td.ScopeLogs().At(0).LogRecords().At(0).SetObservedTimestamp(pcommon.NewTimestampFromTime(time.Date(2023, 4, 19, 3, 4, 5, 6, time.UTC))) - logByte, err := model.encodeLog(td.Resource(), td.SchemaUrl(), td.ScopeLogs().At(0).LogRecords().At(0), td.ScopeLogs().At(0).Scope(), td.ScopeLogs().At(0).SchemaUrl()) + var buf bytes.Buffer + err := model.encodeLog(td.Resource(), td.SchemaUrl(), td.ScopeLogs().At(0).LogRecords().At(0), td.ScopeLogs().At(0).Scope(), td.ScopeLogs().At(0).SchemaUrl(), &buf) assert.NoError(t, err) - assert.Equal(t, expectedLogBody, string(logByte)) + assert.Equal(t, expectedLogBody, buf.String()) }) t.Run("both timestamp and observedTimestamp empty", func(t *testing.T) { model := &encodeModel{dedot: false} td := mockResourceLogs() - logByte, err := model.encodeLog(td.Resource(), td.SchemaUrl(), td.ScopeLogs().At(0).LogRecords().At(0), td.ScopeLogs().At(0).Scope(), td.ScopeLogs().At(0).SchemaUrl()) + var buf bytes.Buffer + err := model.encodeLog(td.Resource(), td.SchemaUrl(), td.ScopeLogs().At(0).LogRecords().At(0), td.ScopeLogs().At(0).Scope(), td.ScopeLogs().At(0).SchemaUrl(), &buf) assert.NoError(t, err) - assert.Equal(t, expectedLogBodyWithEmptyTimestamp, string(logByte)) + assert.Equal(t, expectedLogBodyWithEmptyTimestamp, buf.String()) }) t.Run("dedot true", func(t *testing.T) { model := &encodeModel{dedot: true} td := mockResourceLogs() td.Resource().Attributes().PutStr("foo.bar", "baz") - logByte, err := model.encodeLog(td.Resource(), td.SchemaUrl(), td.ScopeLogs().At(0).LogRecords().At(0), td.ScopeLogs().At(0).Scope(), td.ScopeLogs().At(0).SchemaUrl()) + var buf bytes.Buffer + err := model.encodeLog(td.Resource(), td.SchemaUrl(), td.ScopeLogs().At(0).LogRecords().At(0), td.ScopeLogs().At(0).Scope(), 
td.ScopeLogs().At(0).SchemaUrl(), &buf) require.NoError(t, err) - require.Equal(t, expectedLogBodyDeDottedWithEmptyTimestamp, string(logByte)) + require.Equal(t, expectedLogBodyDeDottedWithEmptyTimestamp, buf.String()) }) } @@ -99,26 +103,31 @@ func TestEncodeMetric(t *testing.T) { mode: MappingECS, } - docs := make(map[uint32]objmodel.Document) + groupedDataPoints := make(map[uint32][]dataPoint) var docsBytes [][]byte - for i := 0; i < metrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().Len(); i++ { - err := model.upsertMetricDataPointValue( - docs, - metrics.ResourceMetrics().At(0).Resource(), - "", - metrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Scope(), - "", - metrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0), - newNumberDataPoint(metrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(i)), - ) - require.NoError(t, err) + rm := metrics.ResourceMetrics().At(0) + sm := rm.ScopeMetrics().At(0) + m := sm.Metrics().At(0) + dps := m.Sum().DataPoints() + for i := 0; i < dps.Len(); i++ { + dp := newNumberDataPoint(m, dps.At(i)) + dpHash := model.hashDataPoint(dp) + dataPoints, ok := groupedDataPoints[dpHash] + if !ok { + groupedDataPoints[dpHash] = []dataPoint{dp} + } else { + groupedDataPoints[dpHash] = append(dataPoints, dp) + } } - for _, doc := range docs { - bytes, err := model.encodeDocument(doc) + for _, dataPoints := range groupedDataPoints { + var buf bytes.Buffer + errors := make([]error, 0) + _, err := model.encodeMetrics(rm.Resource(), rm.SchemaUrl(), sm.Scope(), sm.SchemaUrl(), dataPoints, &errors, &buf) + require.Empty(t, errors, err) require.NoError(t, err) - docsBytes = append(docsBytes, bytes) + docsBytes = append(docsBytes, buf.Bytes()) } allDocsSorted := docBytesToSortedString(docsBytes) @@ -336,10 +345,11 @@ func TestEncodeLogECSModeDuplication(t *testing.T) { mode: MappingECS, dedot: true, } - doc, err := m.encodeLog(resource, "", record, scope, "") + var buf bytes.Buffer + err = m.encodeLog(resource, "", record, scope, "", &buf) require.NoError(t, err) - assert.Equal(t, want, string(doc)) + assert.Equal(t, want, buf.String()) } func TestEncodeLogECSMode(t *testing.T) { @@ -409,7 +419,7 @@ func TestEncodeLogECSMode(t *testing.T) { var buf bytes.Buffer m := encodeModel{} doc := m.encodeLogECSMode(resource, record, scope) - require.NoError(t, doc.Serialize(&buf, false, false)) + require.NoError(t, doc.Serialize(&buf, false)) require.JSONEq(t, `{ "@timestamp": "2024-03-12T20:00:41.123456789Z", @@ -540,7 +550,7 @@ func TestEncodeLogECSModeAgentName(t *testing.T) { var buf bytes.Buffer m := encodeModel{} doc := m.encodeLogECSMode(resource, record, scope) - require.NoError(t, doc.Serialize(&buf, false, false)) + require.NoError(t, doc.Serialize(&buf, false)) require.JSONEq(t, fmt.Sprintf(`{ "@timestamp": "2024-03-13T23:50:59.123456789Z", "agent.name": %q @@ -592,7 +602,7 @@ func TestEncodeLogECSModeAgentVersion(t *testing.T) { var buf bytes.Buffer m := encodeModel{} doc := m.encodeLogECSMode(resource, record, scope) - require.NoError(t, doc.Serialize(&buf, false, false)) + require.NoError(t, doc.Serialize(&buf, false)) if test.expectedAgentVersion == "" { require.JSONEq(t, `{ @@ -699,7 +709,7 @@ func TestEncodeLogECSModeHostOSType(t *testing.T) { var buf bytes.Buffer m := encodeModel{} doc := m.encodeLogECSMode(resource, record, scope) - require.NoError(t, doc.Serialize(&buf, false, false)) + require.NoError(t, doc.Serialize(&buf, false)) expectedJSON := 
`{"@timestamp":"2024-03-13T23:50:59.123456789Z", "agent.name":"otlp"` if test.expectedHostOsName != "" { @@ -750,7 +760,7 @@ func TestEncodeLogECSModeTimestamps(t *testing.T) { var buf bytes.Buffer m := encodeModel{} doc := m.encodeLogECSMode(resource, record, scope) - require.NoError(t, doc.Serialize(&buf, false, false)) + require.NoError(t, doc.Serialize(&buf, false)) require.JSONEq(t, fmt.Sprintf( `{"@timestamp":%q,"agent.name":"otlp"}`, test.expectedTimestamp, @@ -904,8 +914,6 @@ func TestMapLogAttributesToECS(t *testing.T) { type OTelRecord struct { TraceID OTelTraceID `json:"trace_id"` SpanID OTelSpanID `json:"span_id"` - Timestamp time.Time `json:"@timestamp"` - ObservedTimestamp time.Time `json:"observed_timestamp"` SeverityNumber int32 `json:"severity_number"` SeverityText string `json:"severity_text"` EventName string `json:"event_name"` @@ -1114,7 +1122,8 @@ func TestEncodeLogOtelMode(t *testing.T) { // This sets the data_stream values default or derived from the record/scope/resources routeLogRecord(record.Attributes(), scope.Attributes(), resource.Attributes(), "", true, scope.Name()) - b, err := m.encodeLog(resource, tc.rec.Resource.SchemaURL, record, scope, tc.rec.Scope.SchemaURL) + var buf bytes.Buffer + err := m.encodeLog(resource, tc.rec.Resource.SchemaURL, record, scope, tc.rec.Scope.SchemaURL, &buf) require.NoError(t, err) want := tc.rec @@ -1123,7 +1132,7 @@ func TestEncodeLogOtelMode(t *testing.T) { } var got OTelRecord - err = json.Unmarshal(b, &got) + err = json.Unmarshal(buf.Bytes(), &got) require.NoError(t, err) @@ -1134,8 +1143,6 @@ func TestEncodeLogOtelMode(t *testing.T) { // helper function that creates the OTel LogRecord from the test structure func createTestOTelLogRecord(t *testing.T, rec OTelRecord) (plog.LogRecord, pcommon.InstrumentationScope, pcommon.Resource) { record := plog.NewLogRecord() - record.SetTimestamp(pcommon.Timestamp(uint64(rec.Timestamp.UnixNano()))) //nolint:gosec // this input is controlled by tests - record.SetObservedTimestamp(pcommon.Timestamp(uint64(rec.ObservedTimestamp.UnixNano()))) //nolint:gosec // this input is controlled by tests record.SetTraceID(pcommon.TraceID(rec.TraceID)) record.SetSpanID(pcommon.SpanID(rec.SpanID)) @@ -1246,9 +1253,11 @@ func TestEncodeLogScalarObjectConflict(t *testing.T) { td := mockResourceLogs() td.ScopeLogs().At(0).LogRecords().At(0).Attributes().PutStr("foo", "scalar") td.ScopeLogs().At(0).LogRecords().At(0).Attributes().PutStr("foo.bar", "baz") - encoded, err := model.encodeLog(td.Resource(), "", td.ScopeLogs().At(0).LogRecords().At(0), td.ScopeLogs().At(0).Scope(), "") + var buf bytes.Buffer + err := model.encodeLog(td.Resource(), "", td.ScopeLogs().At(0).LogRecords().At(0), td.ScopeLogs().At(0).Scope(), "", &buf) assert.NoError(t, err) + encoded := buf.Bytes() assert.True(t, gjson.ValidBytes(encoded)) assert.False(t, gjson.GetBytes(encoded, "Attributes\\.foo").Exists()) fooValue := gjson.GetBytes(encoded, "Attributes\\.foo\\.value") @@ -1258,9 +1267,11 @@ func TestEncodeLogScalarObjectConflict(t *testing.T) { // If there is an attribute named "foo.value", then "foo" would be omitted rather than renamed. 
td.ScopeLogs().At(0).LogRecords().At(0).Attributes().PutStr("foo.value", "foovalue") - encoded, err = model.encodeLog(td.Resource(), "", td.ScopeLogs().At(0).LogRecords().At(0), td.ScopeLogs().At(0).Scope(), "") + buf = bytes.Buffer{} + err = model.encodeLog(td.Resource(), "", td.ScopeLogs().At(0).LogRecords().At(0), td.ScopeLogs().At(0).Scope(), "", &buf) assert.NoError(t, err) + encoded = buf.Bytes() assert.False(t, gjson.GetBytes(encoded, "Attributes\\.foo").Exists()) fooValue = gjson.GetBytes(encoded, "Attributes\\.foo\\.value") assert.Equal(t, "foovalue", fooValue.Str) @@ -1287,7 +1298,8 @@ func TestEncodeLogBodyMapMode(t *testing.T) { bodyMap.CopyTo(logRecord.Body().SetEmptyMap()) m := encodeModel{} - got, err := m.encodeLogBodyMapMode(logRecord) + var buf bytes.Buffer + err := m.encodeLogBodyMapMode(logRecord, &buf) require.NoError(t, err) require.JSONEq(t, `{ @@ -1297,44 +1309,11 @@ func TestEncodeLogBodyMapMode(t *testing.T) { "key.a": "a", "key.a.b": "b", "pi": 3.14 - }`, string(got)) + }`, buf.String()) // invalid body map logRecord.Body().SetEmptySlice() - _, err = m.encodeLogBodyMapMode(logRecord) + err = m.encodeLogBodyMapMode(logRecord, &bytes.Buffer{}) require.Error(t, err) require.ErrorIs(t, err, ErrInvalidTypeForBodyMapMode) } - -func TestMergeGeolocation(t *testing.T) { - attributes := map[string]any{ - "geo.location.lon": 1.1, - "geo.location.lat": 2.2, - "foo.bar.geo.location.lon": 3.3, - "foo.bar.geo.location.lat": 4.4, - "a.geo.location.lon": 5.5, - "b.geo.location.lat": 6.6, - "unrelatedgeo.location.lon": 7.7, - "unrelatedgeo.location.lat": 8.8, - "d": 9.9, - "e.geo.location.lon": "foo", - "e.geo.location.lat": "bar", - } - wantAttributes := map[string]any{ - "geo.location": []any{1.1, 2.2}, - "foo.bar.geo.location": []any{3.3, 4.4}, - "a.geo.location.lon": 5.5, - "b.geo.location.lat": 6.6, - "unrelatedgeo.location.lon": 7.7, - "unrelatedgeo.location.lat": 8.8, - "d": 9.9, - "e.geo.location.lon": "foo", - "e.geo.location.lat": "bar", - } - input := pcommon.NewMap() - err := input.FromRaw(attributes) - require.NoError(t, err) - mergeGeolocation(input) - after := input.AsRaw() - assert.Equal(t, wantAttributes, after) -} diff --git a/exporter/elasticsearchexporter/pdata_serializer.go b/exporter/elasticsearchexporter/pdata_serializer.go new file mode 100644 index 0000000000000..5fbcc5c91fa0a --- /dev/null +++ b/exporter/elasticsearchexporter/pdata_serializer.go @@ -0,0 +1,478 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package elasticsearchexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter" + +import ( + "bytes" + "encoding/hex" + "fmt" + "strconv" + "strings" + + "github.com/elastic/go-structform" + "github.com/elastic/go-structform/json" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/pdata/ptrace" +) + +const tsLayout = "2006-01-02T15:04:05.000000000Z" + +func serializeMetrics(resource pcommon.Resource, resourceSchemaURL string, scope pcommon.InstrumentationScope, scopeSchemaURL string, dataPoints []dataPoint, validationErrors *[]error, buf *bytes.Buffer) (map[string]string, error) { + if len(dataPoints) == 0 { + return nil, nil + } + dp0 := dataPoints[0] + + v := json.NewVisitor(buf) + // Enable ExplicitRadixPoint such that 1.0 is encoded as 1.0 instead of 1. + // This is required to generate the correct dynamic mapping in ES. 
+ v.SetExplicitRadixPoint(true) + _ = v.OnObjectStart(-1, structform.AnyType) + writeTimestampField(v, "@timestamp", dp0.Timestamp()) + if dp0.StartTimestamp() != 0 { + writeTimestampField(v, "start_timestamp", dp0.StartTimestamp()) + } + writeStringFieldSkipDefault(v, "unit", dp0.Metric().Unit()) + writeDataStream(v, dp0.Attributes()) + writeAttributes(v, dp0.Attributes(), true) + writeResource(v, resource, resourceSchemaURL, true) + writeScope(v, scope, scopeSchemaURL, true) + dynamicTemplates := serializeDataPoints(v, dataPoints, validationErrors) + _ = v.OnObjectFinished() + return dynamicTemplates, nil +} + +func serializeDataPoints(v *json.Visitor, dataPoints []dataPoint, validationErrors *[]error) map[string]string { + _ = v.OnKey("metrics") + _ = v.OnObjectStart(-1, structform.AnyType) + + dynamicTemplates := make(map[string]string, len(dataPoints)) + var docCount uint64 + metricNames := make(map[string]bool, len(dataPoints)) + for _, dp := range dataPoints { + metric := dp.Metric() + if _, present := metricNames[metric.Name()]; present { + *validationErrors = append( + *validationErrors, + fmt.Errorf( + "metric with name '%s' has already been serialized in document with timestamp %s", + metric.Name(), + dp.Timestamp().AsTime().UTC().Format(tsLayout), + ), + ) + continue + } + metricNames[metric.Name()] = true + // TODO: there is potential for further optimization here by serializing the value directly instead of allocating a pcommon.Value; + // the tradeoff is that this would duplicate the value-encoding logic for the ECS mode + value, err := dp.Value() + if dp.HasMappingHint(hintDocCount) { + docCount = dp.DocCount() + } + if err != nil { + *validationErrors = append(*validationErrors, err) + continue + } + _ = v.OnKey(metric.Name()) + // TODO: support quantiles + // https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/34561 + writeValue(v, value, false) + // DynamicTemplate returns the name of the dynamic template that applies to the metric and data point, + // so that the field is indexed into Elasticsearch with the correct mapping. The name should correspond to a + // dynamic template defined in the ES mappings, e.g. + // https://github.com/elastic/elasticsearch/blob/8.15/x-pack/plugin/core/template-resources/src/main/resources/metrics%40mappings.json + dynamicTemplates["metrics."+metric.Name()] = dp.DynamicTemplate(metric) + } + _ = v.OnObjectFinished() + // _doc_count is written at the top level of the document, after the metrics object is closed. + if docCount != 0 { + writeUIntField(v, "_doc_count", docCount) + } + return dynamicTemplates +} + +func serializeSpanEvent(resource pcommon.Resource, resourceSchemaURL string, scope pcommon.InstrumentationScope, scopeSchemaURL string, span ptrace.Span, spanEvent ptrace.SpanEvent, buf *bytes.Buffer) { + v := json.NewVisitor(buf) + // Enable ExplicitRadixPoint such that 1.0 is encoded as 1.0 instead of 1. + // This is required to generate the correct dynamic mapping in ES. 
+ v.SetExplicitRadixPoint(true) + _ = v.OnObjectStart(-1, structform.AnyType) + writeTimestampField(v, "@timestamp", spanEvent.Timestamp()) + writeDataStream(v, spanEvent.Attributes()) + writeTraceIDField(v, span.TraceID()) + writeSpanIDField(v, "span_id", span.SpanID()) + writeIntFieldSkipDefault(v, "dropped_attributes_count", int64(spanEvent.DroppedAttributesCount())) + writeStringFieldSkipDefault(v, "event_name", spanEvent.Name()) + + var attributes pcommon.Map + if spanEvent.Name() != "" { + attributes = pcommon.NewMap() + spanEvent.Attributes().CopyTo(attributes) // copy to avoid mutating the original map + // TODO: remove before GA, make sure Kibana uses event_name + attributes.PutStr("event.name", spanEvent.Name()) + } else { + attributes = spanEvent.Attributes() + } + writeAttributes(v, attributes, false) + writeResource(v, resource, resourceSchemaURL, false) + writeScope(v, scope, scopeSchemaURL, false) + _ = v.OnObjectFinished() +} + +func serializeSpan(resource pcommon.Resource, resourceSchemaURL string, scope pcommon.InstrumentationScope, scopeSchemaURL string, span ptrace.Span, buf *bytes.Buffer) error { + v := json.NewVisitor(buf) + // Enable ExplicitRadixPoint such that 1.0 is encoded as 1.0 instead of 1. + // This is required to generate the correct dynamic mapping in ES. + v.SetExplicitRadixPoint(true) + _ = v.OnObjectStart(-1, structform.AnyType) + writeTimestampField(v, "@timestamp", span.StartTimestamp()) + writeDataStream(v, span.Attributes()) + writeTraceIDField(v, span.TraceID()) + writeSpanIDField(v, "span_id", span.SpanID()) + writeStringFieldSkipDefault(v, "trace_state", span.TraceState().AsRaw()) + writeSpanIDField(v, "parent_span_id", span.ParentSpanID()) + writeStringFieldSkipDefault(v, "name", span.Name()) + writeStringFieldSkipDefault(v, "kind", span.Kind().String()) + writeUIntField(v, "duration", uint64(span.EndTimestamp()-span.StartTimestamp())) + writeAttributes(v, span.Attributes(), false) + writeIntFieldSkipDefault(v, "dropped_attributes_count", int64(span.DroppedAttributesCount())) + writeIntFieldSkipDefault(v, "dropped_events_count", int64(span.DroppedEventsCount())) + writeSpanLinks(v, span) + writeIntFieldSkipDefault(v, "dropped_links_count", int64(span.DroppedLinksCount())) + writeStatus(v, span.Status()) + writeResource(v, resource, resourceSchemaURL, false) + writeScope(v, scope, scopeSchemaURL, false) + _ = v.OnObjectFinished() + return nil +} + +func writeStatus(v *json.Visitor, status ptrace.Status) { + _ = v.OnKey("status") + _ = v.OnObjectStart(-1, structform.AnyType) + writeStringFieldSkipDefault(v, "message", status.Message()) + writeStringFieldSkipDefault(v, "code", status.Code().String()) + _ = v.OnObjectFinished() +} + +func writeSpanLinks(v *json.Visitor, span ptrace.Span) { + _ = v.OnKey("links") + _ = v.OnArrayStart(-1, structform.AnyType) + spanLinks := span.Links() + for i := 0; i < spanLinks.Len(); i++ { + spanLink := spanLinks.At(i) + _ = v.OnObjectStart(-1, structform.AnyType) + writeStringFieldSkipDefault(v, "trace_id", spanLink.TraceID().String()) + writeStringFieldSkipDefault(v, "span_id", spanLink.SpanID().String()) + writeStringFieldSkipDefault(v, "trace_state", spanLink.TraceState().AsRaw()) + writeAttributes(v, spanLink.Attributes(), false) + writeIntFieldSkipDefault(v, "dropped_attributes_count", int64(spanLink.DroppedAttributesCount())) + _ = v.OnObjectFinished() + } + _ = v.OnArrayFinished() +} + +func serializeMap(m pcommon.Map, buf *bytes.Buffer) { + v := json.NewVisitor(buf) + // Enable ExplicitRadixPoint such that 1.0 is encoded as 1.0 instead of 1. + // This is required to generate the correct dynamic mapping in ES. 
+ v.SetExplicitRadixPoint(true) + writeMap(v, m, false) +} + +func serializeLog(resource pcommon.Resource, resourceSchemaURL string, scope pcommon.InstrumentationScope, scopeSchemaURL string, record plog.LogRecord, buf *bytes.Buffer) error { + v := json.NewVisitor(buf) + // Enable ExplicitRadixPoint such that 1.0 is encoded as 1.0 instead of 1. + // This is required to generate the correct dynamic mapping in ES. + v.SetExplicitRadixPoint(true) + _ = v.OnObjectStart(-1, structform.AnyType) + // Fall back to the observed timestamp if the record timestamp is unset. + docTimestamp := record.Timestamp() + if docTimestamp.AsTime().UnixNano() == 0 { + docTimestamp = record.ObservedTimestamp() + } + writeTimestampField(v, "@timestamp", docTimestamp) + writeTimestampField(v, "observed_timestamp", record.ObservedTimestamp()) + writeDataStream(v, record.Attributes()) + writeStringFieldSkipDefault(v, "severity_text", record.SeverityText()) + writeIntFieldSkipDefault(v, "severity_number", int64(record.SeverityNumber())) + writeTraceIDField(v, record.TraceID()) + writeSpanIDField(v, "span_id", record.SpanID()) + writeAttributes(v, record.Attributes(), false) + writeIntFieldSkipDefault(v, "dropped_attributes_count", int64(record.DroppedAttributesCount())) + isEvent := false + if record.EventName() != "" { + isEvent = true + writeStringFieldSkipDefault(v, "event_name", record.EventName()) + } else if eventNameAttr, ok := record.Attributes().Get("event.name"); ok && eventNameAttr.Str() != "" { + isEvent = true + writeStringFieldSkipDefault(v, "event_name", eventNameAttr.Str()) + } + writeResource(v, resource, resourceSchemaURL, false) + writeScope(v, scope, scopeSchemaURL, false) + writeLogBody(v, record, isEvent) + _ = v.OnObjectFinished() + return nil +} + +func writeDataStream(v *json.Visitor, attributes pcommon.Map) { + _ = v.OnKey("data_stream") + _ = v.OnObjectStart(-1, structform.AnyType) + attributes.Range(func(k string, val pcommon.Value) bool { + if strings.HasPrefix(k, "data_stream.") && val.Type() == pcommon.ValueTypeStr { + // strip the "data_stream." prefix (len("data_stream.") == 12) + writeStringFieldSkipDefault(v, k[12:], val.Str()) + } + return true + }) + + _ = v.OnObjectFinished() +} + +func writeLogBody(v *json.Visitor, record plog.LogRecord, isEvent bool) { + if record.Body().Type() == pcommon.ValueTypeEmpty { + return + } + _ = v.OnKey("body") + _ = v.OnObjectStart(-1, structform.AnyType) + + // Log records that represent events are mapped differently, see + // https://github.com/open-telemetry/semantic-conventions/blob/main/docs/general/events.md + var bodyType string + if isEvent { + bodyType = "structured" + } else { + bodyType = "flattened" + } + body := record.Body() + switch body.Type() { + case pcommon.ValueTypeMap: + // maps are serialized as-is + case pcommon.ValueTypeSlice: + // the body must be an object or an array of objects due to ES limitations; + // if the slice contains non-object values, wrap it in an object under a "value" key + s := body.Slice() + allMaps := true + for i := 0; i < s.Len(); i++ { + if s.At(i).Type() != pcommon.ValueTypeMap { + allMaps = false + } + } + + if !allMaps { + body = pcommon.NewValueMap() + m := body.SetEmptyMap() + record.Body().Slice().CopyTo(m.PutEmptySlice("value")) + } + default: + bodyType = "text" + } + _ = v.OnKey(bodyType) + writeValue(v, body, false) + _ = v.OnObjectFinished() +} + +func writeResource(v *json.Visitor, resource pcommon.Resource, resourceSchemaURL string, stringifyMapAttributes bool) { + _ = v.OnKey("resource") + _ = v.OnObjectStart(-1, structform.AnyType) + writeStringFieldSkipDefault(v, "schema_url", resourceSchemaURL) + writeAttributes(v, resource.Attributes(), stringifyMapAttributes) + writeIntFieldSkipDefault(v, 
"dropped_attributes_count", int64(resource.DroppedAttributesCount())) + _ = v.OnObjectFinished() +} + +func writeScope(v *json.Visitor, scope pcommon.InstrumentationScope, scopeSchemaURL string, stringifyMapAttributes bool) { + _ = v.OnKey("scope") + _ = v.OnObjectStart(-1, structform.AnyType) + writeStringFieldSkipDefault(v, "schema_url", scopeSchemaURL) + writeStringFieldSkipDefault(v, "name", scope.Name()) + writeStringFieldSkipDefault(v, "version", scope.Version()) + writeAttributes(v, scope.Attributes(), stringifyMapAttributes) + writeIntFieldSkipDefault(v, "dropped_attributes_count", int64(scope.DroppedAttributesCount())) + _ = v.OnObjectFinished() +} + +func writeAttributes(v *json.Visitor, attributes pcommon.Map, stringifyMapValues bool) { + if attributes.Len() == 0 { + return + } + + _ = v.OnKey("attributes") + _ = v.OnObjectStart(-1, structform.AnyType) + attributes.Range(func(k string, val pcommon.Value) bool { + switch k { + case dataStreamType, dataStreamDataset, dataStreamNamespace, mappingHintsAttrKey: + return true + } + if isGeoAttribute(k, val) { + return true + } + _ = v.OnKey(k) + writeValue(v, val, stringifyMapValues) + return true + }) + writeGeolocationAttributes(v, attributes) + _ = v.OnObjectFinished() +} + +func isGeoAttribute(k string, val pcommon.Value) bool { + if val.Type() != pcommon.ValueTypeDouble { + return false + } + switch k { + case "geo.location.lat", "geo.location.lon": + return true + } + return strings.HasSuffix(k, ".geo.location.lat") || strings.HasSuffix(k, ".geo.location.lon") +} + +func writeGeolocationAttributes(v *json.Visitor, attributes pcommon.Map) { + const ( + lonKey = "geo.location.lon" + latKey = "geo.location.lat" + mergedKey = "geo.location" + ) + // Prefix is the attribute name without lonKey or latKey suffix + // e.g. prefix of "foo.bar.geo.location.lon" is "foo.bar.", prefix of "geo.location.lon" is "". + prefixToGeo := make(map[string]struct { + lon, lat float64 + lonSet, latSet bool + }) + setLon := func(prefix string, v float64) { + g := prefixToGeo[prefix] + g.lon = v + g.lonSet = true + prefixToGeo[prefix] = g + } + setLat := func(prefix string, v float64) { + g := prefixToGeo[prefix] + g.lat = v + g.latSet = true + prefixToGeo[prefix] = g + } + attributes.Range(func(key string, val pcommon.Value) bool { + if val.Type() != pcommon.ValueTypeDouble { + return true + } + + if key == lonKey { + setLon("", val.Double()) + return true + } else if key == latKey { + setLat("", val.Double()) + return true + } else if namespace, found := strings.CutSuffix(key, "."+lonKey); found { + prefix := namespace + "." + setLon(prefix, val.Double()) + return true + } else if namespace, found := strings.CutSuffix(key, "."+latKey); found { + prefix := namespace + "." 
+ setLat(prefix, val.Double()) + return true + } + return true + }) + + for prefix, geo := range prefixToGeo { + if geo.lonSet && geo.latSet { + key := prefix + mergedKey + // Geopoint expressed as an array with the format: [lon, lat] + _ = v.OnKey(key) + _ = v.OnArrayStart(-1, structform.AnyType) + _ = v.OnFloat64(geo.lon) + _ = v.OnFloat64(geo.lat) + _ = v.OnArrayFinished() + continue + } + // Place the attributes back if lon and lat are not present together + if geo.lonSet { + _ = v.OnKey(prefix + lonKey) + _ = v.OnFloat64(geo.lon) + } + if geo.latSet { + _ = v.OnKey(prefix + latKey) + _ = v.OnFloat64(geo.lat) + } + } +} + +func writeMap(v *json.Visitor, m pcommon.Map, stringifyMapValues bool) { + _ = v.OnObjectStart(-1, structform.AnyType) + m.Range(func(k string, val pcommon.Value) bool { + _ = v.OnKey(k) + writeValue(v, val, stringifyMapValues) + return true + }) + _ = v.OnObjectFinished() +} + +func writeValue(v *json.Visitor, val pcommon.Value, stringifyMaps bool) { + switch val.Type() { + case pcommon.ValueTypeEmpty: + _ = v.OnNil() + case pcommon.ValueTypeStr: + _ = v.OnString(val.Str()) + case pcommon.ValueTypeBool: + _ = v.OnBool(val.Bool()) + case pcommon.ValueTypeDouble: + _ = v.OnFloat64(val.Double()) + case pcommon.ValueTypeInt: + _ = v.OnInt64(val.Int()) + case pcommon.ValueTypeBytes: + // bytes are hex-encoded + _ = v.OnString(hex.EncodeToString(val.Bytes().AsRaw())) + case pcommon.ValueTypeMap: + if stringifyMaps { + _ = v.OnString(val.AsString()) + } else { + writeMap(v, val.Map(), false) + } + case pcommon.ValueTypeSlice: + _ = v.OnArrayStart(-1, structform.AnyType) + slice := val.Slice() + for i := 0; i < slice.Len(); i++ { + writeValue(v, slice.At(i), stringifyMaps) + } + _ = v.OnArrayFinished() + } +} + +func writeTimestampField(v *json.Visitor, key string, timestamp pcommon.Timestamp) { + _ = v.OnKey(key) + // Encode the timestamp as "<epoch milliseconds>.<nanosecond remainder>", e.g. 1721314113467.654123. + nsec := uint64(timestamp) + msec := nsec / 1e6 + nsec -= msec * 1e6 + _ = v.OnString(strconv.FormatUint(msec, 10) + "." 
+ strconv.FormatUint(nsec, 10)) +} + +func writeUIntField(v *json.Visitor, key string, i uint64) { + _ = v.OnKey(key) + _ = v.OnUint64(i) +} + +func writeIntFieldSkipDefault(v *json.Visitor, key string, i int64) { + if i == 0 { + return + } + _ = v.OnKey(key) + _ = v.OnInt64(i) +} + +func writeStringFieldSkipDefault(v *json.Visitor, key, value string) { + if value == "" { + return + } + _ = v.OnKey(key) + _ = v.OnString(value) +} + +func writeTraceIDField(v *json.Visitor, id pcommon.TraceID) { + if id.IsEmpty() { + return + } + _ = v.OnKey("trace_id") + _ = v.OnString(hex.EncodeToString(id[:])) +} + +func writeSpanIDField(v *json.Visitor, key string, id pcommon.SpanID) { + if id.IsEmpty() { + return + } + _ = v.OnKey(key) + _ = v.OnString(hex.EncodeToString(id[:])) +} diff --git a/exporter/elasticsearchexporter/pdata_serializer_test.go b/exporter/elasticsearchexporter/pdata_serializer_test.go new file mode 100644 index 0000000000000..85ba952d140f0 --- /dev/null +++ b/exporter/elasticsearchexporter/pdata_serializer_test.go @@ -0,0 +1,251 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package elasticsearchexporter + +import ( + "bytes" + "encoding/json" + "fmt" + "testing" + + "github.com/stretchr/testify/assert" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/pdata/pmetric" +) + +func TestSerializeLog(t *testing.T) { + tests := []struct { + name string + logCustomizer func(resource pcommon.Resource, scope pcommon.InstrumentationScope, record plog.LogRecord) + wantErr bool + expected any + }{ + {name: "test attributes", logCustomizer: func(resource pcommon.Resource, scope pcommon.InstrumentationScope, record plog.LogRecord) { + record.SetSeverityText("debug") + record.Attributes().PutEmpty("empty") + record.Attributes().PutStr("data_stream.type", "logs") + record.Attributes().PutStr("string", "foo") + record.Attributes().PutBool("bool", true) + record.Attributes().PutDouble("double", 42.0) + record.Attributes().PutInt("int", 42) + record.Attributes().PutEmptyBytes("bytes").Append(42) + _ = record.Attributes().PutEmptySlice("slice").FromRaw([]any{42, "foo"}) + record.Attributes().PutEmptySlice("map_slice").AppendEmpty().SetEmptyMap().PutStr("foo.bar", "baz") + mapAttr := record.Attributes().PutEmptyMap("map") + mapAttr.PutStr("foo.bar", "baz") + mapAttr.PutEmptySlice("inner.slice").AppendEmpty().SetStr("foo") + + resource.Attributes().PutEmptyMap("resource_map").PutStr("foo", "bar") + scope.Attributes().PutEmptyMap("scope_map").PutStr("foo", "bar") + }, wantErr: false, expected: map[string]any{ + "@timestamp": "0.0", + "observed_timestamp": "0.0", + "data_stream": map[string]any{ + "type": "logs", + }, + "severity_text": "debug", + "resource": map[string]any{ + "attributes": map[string]any{ + "resource_map": map[string]any{ + "foo": "bar", + }, + }, + }, + "scope": map[string]any{ + "attributes": map[string]any{ + "scope_map": map[string]any{ + "foo": "bar", + }, + }, + }, + "attributes": map[string]any{ + "empty": nil, + "string": "foo", + "bool": true, + "double": json.Number("42.0"), + "int": json.Number("42"), + "bytes": "2a", + "slice": []any{json.Number("42"), "foo"}, + "map_slice": []any{map[string]any{ + "foo.bar": "baz", + }}, + "map": map[string]any{ + "foo.bar": "baz", + "inner.slice": []any{"foo"}, + }, + }, + }}, + { + name: "text body", + logCustomizer: func(_ pcommon.Resource, _ pcommon.InstrumentationScope, record plog.LogRecord) { + record.Body().SetStr("foo") 
+ }, + wantErr: false, + expected: map[string]any{ + "@timestamp": "0.0", + "observed_timestamp": "0.0", + "data_stream": map[string]any{}, + "resource": map[string]any{}, + "scope": map[string]any{}, + "body": map[string]any{ + "text": "foo", + }, + }, + }, + { + name: "map body", + logCustomizer: func(_ pcommon.Resource, _ pcommon.InstrumentationScope, record plog.LogRecord) { + record.Body().SetEmptyMap().PutStr("foo.bar", "baz") + }, + wantErr: false, + expected: map[string]any{ + "@timestamp": "0.0", + "observed_timestamp": "0.0", + "data_stream": map[string]any{}, + "resource": map[string]any{}, + "scope": map[string]any{}, + "body": map[string]any{ + "flattened": map[string]any{ + "foo.bar": "baz", + }, + }, + }, + }, + { + name: "geo attributes", + logCustomizer: func(_ pcommon.Resource, _ pcommon.InstrumentationScope, record plog.LogRecord) { + record.Attributes().PutDouble("geo.location.lon", 1.1) + record.Attributes().PutDouble("geo.location.lat", 2.2) + record.Attributes().PutDouble("foo.bar.geo.location.lon", 3.3) + record.Attributes().PutDouble("foo.bar.geo.location.lat", 4.4) + record.Attributes().PutDouble("a.geo.location.lon", 5.5) + record.Attributes().PutDouble("b.geo.location.lat", 6.6) + record.Attributes().PutDouble("unrelatedgeo.location.lon", 7.7) + record.Attributes().PutDouble("unrelatedgeo.location.lat", 8.8) + record.Attributes().PutDouble("d", 9.9) + record.Attributes().PutStr("e.geo.location.lon", "foo") + record.Attributes().PutStr("e.geo.location.lat", "bar") + }, + wantErr: false, + expected: map[string]any{ + "@timestamp": "0.0", + "observed_timestamp": "0.0", + "data_stream": map[string]any{}, + "resource": map[string]any{}, + "scope": map[string]any{}, + "attributes": map[string]any{ + "geo.location": []any{json.Number("1.1"), json.Number("2.2")}, + "foo.bar.geo.location": []any{json.Number("3.3"), json.Number("4.4")}, + "a.geo.location.lon": json.Number("5.5"), + "b.geo.location.lat": json.Number("6.6"), + "unrelatedgeo.location.lon": json.Number("7.7"), + "unrelatedgeo.location.lat": json.Number("8.8"), + "d": json.Number("9.9"), + "e.geo.location.lon": "foo", + "e.geo.location.lat": "bar", + }, + }, + }, + { + name: "event_name takes precedent over attributes.event.name", + logCustomizer: func(_ pcommon.Resource, _ pcommon.InstrumentationScope, record plog.LogRecord) { + record.Attributes().PutStr("event.name", "foo") + record.SetEventName("bar") + }, + wantErr: false, + expected: map[string]any{ + "@timestamp": "0.0", + "observed_timestamp": "0.0", + "event_name": "bar", + "data_stream": map[string]any{}, + "resource": map[string]any{}, + "scope": map[string]any{}, + "attributes": map[string]any{ + "event.name": "foo", + }, + }, + }, + { + name: "timestamp", + logCustomizer: func(_ pcommon.Resource, _ pcommon.InstrumentationScope, record plog.LogRecord) { + record.SetTimestamp(1721314113467654123) + }, + wantErr: false, + expected: map[string]any{ + "@timestamp": "1721314113467.654123", + "observed_timestamp": "0.0", + "data_stream": map[string]any{}, + "resource": map[string]any{}, + "scope": map[string]any{}, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + resourceLogs := plog.NewResourceLogs() + scopeLogs := resourceLogs.ScopeLogs().AppendEmpty() + record := scopeLogs.LogRecords().AppendEmpty() + tt.logCustomizer(resourceLogs.Resource(), scopeLogs.Scope(), record) + + var buf bytes.Buffer + err := serializeLog(resourceLogs.Resource(), "", scopeLogs.Scope(), "", record, &buf) + if (err != nil) != tt.wantErr { + 
t.Errorf("serializeLog() error = %v, wantErr %v", err, tt.wantErr) + } + logBytes := buf.Bytes() + eventAsJSON := string(logBytes) + var result any + decoder := json.NewDecoder(bytes.NewBuffer(logBytes)) + decoder.UseNumber() + if err := decoder.Decode(&result); err != nil { + t.Error(err) + } + + assert.Equal(t, tt.expected, result, eventAsJSON) + }) + } +} + +func TestSerializeMetricsConflict(t *testing.T) { + resourceMetrics := pmetric.NewResourceMetrics() + scopeMetrics := resourceMetrics.ScopeMetrics().AppendEmpty() + var dataPoints []dataPoint + metric1 := scopeMetrics.Metrics().AppendEmpty() + metric2 := scopeMetrics.Metrics().AppendEmpty() + for _, m := range []pmetric.Metric{metric1, metric2} { + m.SetName("foo") + dp := m.SetEmptyGauge().DataPoints().AppendEmpty() + dp.SetIntValue(42) + dataPoints = append(dataPoints, newNumberDataPoint(m, dp)) + } + + var validationErrors []error + var buf bytes.Buffer + _, err := serializeMetrics(resourceMetrics.Resource(), "", scopeMetrics.Scope(), "", dataPoints, &validationErrors, &buf) + if err != nil { + t.Errorf("serializeMetrics() error = %v", err) + } + b := buf.Bytes() + eventAsJSON := string(b) + var result any + decoder := json.NewDecoder(bytes.NewBuffer(b)) + decoder.UseNumber() + if err := decoder.Decode(&result); err != nil { + t.Error(err) + } + + assert.Len(t, validationErrors, 1) + assert.Equal(t, fmt.Errorf("metric with name 'foo' has already been serialized in document with timestamp 1970-01-01T00:00:00.000000000Z"), validationErrors[0]) + + assert.Equal(t, map[string]any{ + "@timestamp": "0.0", + "data_stream": map[string]any{}, + "resource": map[string]any{}, + "scope": map[string]any{}, + "metrics": map[string]any{ + "foo": json.Number("42"), + }, + }, result, eventAsJSON) +}