From e7ebc6e1676aa661880a09b0ff93a9cccad8f011 Mon Sep 17 00:00:00 2001 From: Mauri de Souza Meneguzzo Date: Thu, 17 Oct 2024 05:47:36 -0300 Subject: [PATCH] [exporter/elasticsearch] Add mapping mode bodymap (#35637) #### Description This PR implements a new mapping mode `bodymap` that works by serializing each LogRecord body as-is into a separate document for ingestion. Fixes #35444 #### Testing #### Documentation --------- Co-authored-by: Carson Ip --- ...feature_elasticsearch_mapping_bodymap.yaml | 27 ++++ exporter/elasticsearchexporter/README.md | 9 +- exporter/elasticsearchexporter/config.go | 4 + exporter/elasticsearchexporter/exporter.go | 5 + .../elasticsearchexporter/exporter_test.go | 125 ++++++++++++++++++ exporter/elasticsearchexporter/go.mod | 2 +- exporter/elasticsearchexporter/model.go | 14 ++ exporter/elasticsearchexporter/model_test.go | 40 ++++++ 8 files changed, 223 insertions(+), 3 deletions(-) create mode 100644 .chloggen/feature_elasticsearch_mapping_bodymap.yaml diff --git a/.chloggen/feature_elasticsearch_mapping_bodymap.yaml b/.chloggen/feature_elasticsearch_mapping_bodymap.yaml new file mode 100644 index 000000000000..9d749c7b3c45 --- /dev/null +++ b/.chloggen/feature_elasticsearch_mapping_bodymap.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: elasticsearchexporter + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Introduce an experimental bodymap mapping mode for logs + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [35444] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] \ No newline at end of file diff --git a/exporter/elasticsearchexporter/README.md b/exporter/elasticsearchexporter/README.md index 13564bbd012a..5ec203f13674 100644 --- a/exporter/elasticsearchexporter/README.md +++ b/exporter/elasticsearchexporter/README.md @@ -155,14 +155,19 @@ behaviours, which may be configured through the following settings: - `none`: Use original fields and event structure from the OTLP event. - `ecs`: Try to map fields to [Elastic Common Schema (ECS)][ECS] - `otel`: Elastic's preferred "OTel-native" mapping mode. Uses original fields and event structure from the OTLP event. - - :warning: This mode's behavior is unstable, it is currently is experimental and undergoing changes. + - :warning: This mode's behavior is unstable, it is currently experimental and undergoing changes. - There's a special treatment for the following attributes: `data_stream.type`, `data_stream.dataset`, `data_stream.namespace`. Instead of serializing these values under the `*attributes.*` namespace, they're put at the root of the document, to conform with the conventions of the data stream naming scheme that maps these as `constant_keyword` fields. - `data_stream.dataset` will always be appended with `.otel`. It is recommended to use with `*_dynamic_index.enabled: true` to route documents to data stream `${data_stream.type}-${data_stream.dataset}-${data_stream.namespace}`. - Span events are stored in separate documents. They will be routed with `data_stream.type` set to `logs` if `traces_dynamic_index::enabled` is `true`. - `raw`: Omit the `Attributes.` string prefixed to field names for log and span attributes as well as omit the `Events.` string prefixed to - field names for span events. + field names for span events. + - `bodymap`: Provides fine-grained control over the final documents to be ingested. + :warning: This mode's behavior is unstable, it is currently experimental and undergoing changes. + It works only for logs where the log record body is a map. Each LogRecord + body is serialized to JSON as-is and becomes a separate document for ingestion. + If the log record body is not a map, the exporter will log a warning and drop the log record. - `dedup` (DEPRECATED). This configuration is deprecated and non-operational, and will be removed in the future. Object keys are always deduplicated to avoid Elasticsearch rejecting documents. diff --git a/exporter/elasticsearchexporter/config.go b/exporter/elasticsearchexporter/config.go index 547ff693894a..072bd725c6fe 100644 --- a/exporter/elasticsearchexporter/config.go +++ b/exporter/elasticsearchexporter/config.go @@ -207,6 +207,7 @@ const ( MappingECS MappingOTel MappingRaw + MappingBodyMap ) var ( @@ -224,6 +225,8 @@ func (m MappingMode) String() string { return "otel" case MappingRaw: return "raw" + case MappingBodyMap: + return "bodymap" default: return "" } @@ -236,6 +239,7 @@ var mappingModes = func() map[string]MappingMode { MappingECS, MappingOTel, MappingRaw, + MappingBodyMap, } { table[strings.ToLower(m.String())] = m } diff --git a/exporter/elasticsearchexporter/exporter.go b/exporter/elasticsearchexporter/exporter.go index b7e00b713cd8..ebd3800858a2 100644 --- a/exporter/elasticsearchexporter/exporter.go +++ b/exporter/elasticsearchexporter/exporter.go @@ -131,6 +131,11 @@ func (e *elasticsearchExporter) pushLogsData(ctx context.Context, ld plog.Logs) return cerr } + if errors.Is(err, ErrInvalidTypeForBodyMapMode) { + e.Logger.Warn("dropping log record", zap.Error(err)) + continue + } + errs = append(errs, err) } } diff --git a/exporter/elasticsearchexporter/exporter_test.go b/exporter/elasticsearchexporter/exporter_test.go index 13de09564d32..e2871666b138 100644 --- a/exporter/elasticsearchexporter/exporter_test.go +++ b/exporter/elasticsearchexporter/exporter_test.go @@ -81,6 +81,131 @@ func TestExporterLogs(t *testing.T) { rec.WaitItems(1) }) + t.Run("publish with bodymap encoding", func(t *testing.T) { + tableTests := []struct { + name string + body func() pcommon.Value + expected string + }{ + { + name: "flat", + body: func() pcommon.Value { + body := pcommon.NewValueMap() + m := body.Map() + m.PutStr("@timestamp", "2024-03-12T20:00:41.123456789Z") + m.PutInt("id", 1) + m.PutStr("key", "value") + return body + }, + expected: `{"@timestamp":"2024-03-12T20:00:41.123456789Z","id":1,"key":"value"}`, + }, + { + name: "dotted key", + body: func() pcommon.Value { + body := pcommon.NewValueMap() + m := body.Map() + m.PutInt("a", 1) + m.PutInt("a.b", 2) + m.PutInt("a.b.c", 3) + return body + }, + expected: `{"a":1,"a.b":2,"a.b.c":3}`, + }, + { + name: "slice", + body: func() pcommon.Value { + body := pcommon.NewValueMap() + m := body.Map() + s := m.PutEmptySlice("a") + for i := 0; i < 2; i++ { + s.AppendEmpty().SetInt(int64(i)) + } + return body + }, + expected: `{"a":[0,1]}`, + }, + { + name: "inner map", + body: func() pcommon.Value { + body := pcommon.NewValueMap() + m := body.Map() + m1 := m.PutEmptyMap("a") + m1.PutInt("b", 1) + m1.PutInt("c", 2) + return body + }, + expected: `{"a":{"b":1,"c":2}}`, + }, + { + name: "nested map", + body: func() pcommon.Value { + body := pcommon.NewValueMap() + m := body.Map() + m1 := m.PutEmptyMap("a") + m2 := m1.PutEmptyMap("b") + m2.PutInt("c", 1) + m2.PutInt("d", 2) + return body + }, + expected: `{"a":{"b":{"c":1,"d":2}}}`, + }, + } + + for _, tt := range tableTests { + t.Run(tt.name, func(t *testing.T) { + rec := newBulkRecorder() + server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) { + rec.Record(docs) + assert.JSONEq(t, tt.expected, string(docs[0].Document)) + return itemsAllOK(docs) + }) + + exporter := newTestLogsExporter(t, server.URL, func(cfg *Config) { + cfg.Mapping.Mode = "bodymap" + }) + logs := plog.NewLogs() + resourceLogs := logs.ResourceLogs().AppendEmpty() + scopeLogs := resourceLogs.ScopeLogs().AppendEmpty() + logRecords := scopeLogs.LogRecords() + logRecord := logRecords.AppendEmpty() + tt.body().CopyTo(logRecord.Body()) + + mustSendLogs(t, exporter, logs) + rec.WaitItems(1) + }) + } + }) + + t.Run("drops log records for bodymap mode if body is not a map", func(t *testing.T) { + logs := plog.NewLogs() + resourceLogs := logs.ResourceLogs().AppendEmpty() + scopeLogs := resourceLogs.ScopeLogs().AppendEmpty() + logRecords := scopeLogs.LogRecords() + + // Invalid body type should be dropped. + logRecords.AppendEmpty().Body().SetEmptySlice() + + // We should still process the valid records in the batch. + bodyMap := logRecords.AppendEmpty().Body().SetEmptyMap() + bodyMap.PutInt("a", 42) + + rec := newBulkRecorder() + server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) { + defer rec.Record(docs) + assert.Len(t, docs, 1) + assert.JSONEq(t, `{"a":42}`, string(docs[0].Document)) + return itemsAllOK(docs) + }) + + exporter := newTestLogsExporter(t, server.URL, func(cfg *Config) { + cfg.Mapping.Mode = "bodymap" + }) + + err := exporter.ConsumeLogs(context.Background(), logs) + assert.NoError(t, err) + rec.WaitItems(1) + }) + t.Run("publish with dedot", func(t *testing.T) { rec := newBulkRecorder() server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) { diff --git a/exporter/elasticsearchexporter/go.mod b/exporter/elasticsearchexporter/go.mod index 3f1046727987..c9a675710622 100644 --- a/exporter/elasticsearchexporter/go.mod +++ b/exporter/elasticsearchexporter/go.mod @@ -7,6 +7,7 @@ require ( github.com/elastic/go-docappender/v2 v2.3.0 github.com/elastic/go-elasticsearch/v7 v7.17.10 github.com/elastic/go-structform v0.0.12 + github.com/json-iterator/go v1.1.12 github.com/lestrrat-go/strftime v1.1.0 github.com/open-telemetry/opentelemetry-collector-contrib/internal/common v0.111.0 github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.111.0 @@ -43,7 +44,6 @@ require ( github.com/golang/snappy v0.0.4 // indirect github.com/google/uuid v1.6.0 // indirect github.com/joeshaw/multierror v0.0.0-20140124173710-69b34d4ec901 // indirect - github.com/json-iterator/go v1.1.12 // indirect github.com/klauspost/compress v1.17.10 // indirect github.com/knadh/koanf/maps v0.1.1 // indirect github.com/knadh/koanf/providers/confmap v0.1.0 // indirect diff --git a/exporter/elasticsearchexporter/model.go b/exporter/elasticsearchexporter/model.go index 4bf95be05f3d..434bb1090a93 100644 --- a/exporter/elasticsearchexporter/model.go +++ b/exporter/elasticsearchexporter/model.go @@ -15,6 +15,7 @@ import ( "slices" "time" + jsoniter "github.com/json-iterator/go" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" @@ -65,6 +66,8 @@ var resourceAttrsToPreserve = map[string]bool{ semconv.AttributeHostName: true, } +var ErrInvalidTypeForBodyMapMode = errors.New("invalid log record body type for 'bodymap' mapping mode") + type mappingModel interface { encodeLog(pcommon.Resource, string, plog.LogRecord, pcommon.InstrumentationScope, string) ([]byte, error) encodeSpan(pcommon.Resource, string, ptrace.Span, pcommon.InstrumentationScope, string) ([]byte, error) @@ -107,6 +110,8 @@ func (m *encodeModel) encodeLog(resource pcommon.Resource, resourceSchemaURL str document = m.encodeLogECSMode(resource, record, scope) case MappingOTel: document = m.encodeLogOTelMode(resource, resourceSchemaURL, record, scope, scopeSchemaURL) + case MappingBodyMap: + return m.encodeLogBodyMapMode(record) default: document = m.encodeLogDefaultMode(resource, record, scope) } @@ -138,6 +143,15 @@ func (m *encodeModel) encodeLogDefaultMode(resource pcommon.Resource, record plo return document } +func (m *encodeModel) encodeLogBodyMapMode(record plog.LogRecord) ([]byte, error) { + body := record.Body() + if body.Type() != pcommon.ValueTypeMap { + return nil, fmt.Errorf("%w: %q", ErrInvalidTypeForBodyMapMode, body.Type()) + } + + return jsoniter.Marshal(body.Map().AsRaw()) +} + func (m *encodeModel) encodeLogOTelMode(resource pcommon.Resource, resourceSchemaURL string, record plog.LogRecord, scope pcommon.InstrumentationScope, scopeSchemaURL string) objmodel.Document { var document objmodel.Document diff --git a/exporter/elasticsearchexporter/model_test.go b/exporter/elasticsearchexporter/model_test.go index 2bc34e9a24cf..136039cf28ea 100644 --- a/exporter/elasticsearchexporter/model_test.go +++ b/exporter/elasticsearchexporter/model_test.go @@ -1225,3 +1225,43 @@ func TestEncodeLogScalarObjectConflict(t *testing.T) { fooValue = gjson.GetBytes(encoded, "Attributes\\.foo\\.value") assert.Equal(t, "foovalue", fooValue.Str) } + +func TestEncodeLogBodyMapMode(t *testing.T) { + // craft a log record with a body map + logs := plog.NewLogs() + resourceLogs := logs.ResourceLogs().AppendEmpty() + scopeLogs := resourceLogs.ScopeLogs().AppendEmpty() + logRecords := scopeLogs.LogRecords() + observedTimestamp := pcommon.Timestamp(time.Now().UnixNano()) + + logRecord := logRecords.AppendEmpty() + logRecord.SetObservedTimestamp(observedTimestamp) + + bodyMap := pcommon.NewMap() + bodyMap.PutStr("@timestamp", "2024-03-12T20:00:41.123456789Z") + bodyMap.PutInt("id", 1) + bodyMap.PutStr("key", "value") + bodyMap.PutStr("key.a", "a") + bodyMap.PutStr("key.a.b", "b") + bodyMap.PutDouble("pi", 3.14) + bodyMap.CopyTo(logRecord.Body().SetEmptyMap()) + + m := encodeModel{} + got, err := m.encodeLogBodyMapMode(logRecord) + require.NoError(t, err) + + require.JSONEq(t, `{ + "@timestamp": "2024-03-12T20:00:41.123456789Z", + "id": 1, + "key": "value", + "key.a": "a", + "key.a.b": "b", + "pi": 3.14 + }`, string(got)) + + // invalid body map + logRecord.Body().SetEmptySlice() + _, err = m.encodeLogBodyMapMode(logRecord) + require.Error(t, err) + require.ErrorIs(t, err, ErrInvalidTypeForBodyMapMode) +}