
[elasticsearchexporter]: Add dynamic document id support for logs #37065

Merged
merged 24 commits into main from the `elasticsearchexporter-id` branch
Jan 23, 2025
Commits
1c3b656
Add support for setting a document id
mauri870 Jan 7, 2025
7128be6
run tests in parallel, test with sync and async bulk indexer
mauri870 Jan 7, 2025
c277b55
add changelog entry
mauri870 Jan 7, 2025
89f19d3
remove todo for metrics and traces
mauri870 Jan 7, 2025
bebd2e5
Merge branch 'main' into elasticsearchexporter-id
mauri870 Jan 7, 2025
dc25057
only look into record attributes
mauri870 Jan 8, 2025
7f2557c
fixes from code review
mauri870 Jan 9, 2025
1080790
clarify about empty string value
mauri870 Jan 9, 2025
9e0ad2f
remove id attribute from the final document
mauri870 Jan 9, 2025
d2349db
Merge branch 'main' into elasticsearchexporter-id
mauri870 Jan 9, 2025
b3810dd
Apply suggestions from code review
mauri870 Jan 10, 2025
47be731
mention that the document id is removed from the log record
mauri870 Jan 10, 2025
d1be590
Update exporter/elasticsearchexporter/config.go
mauri870 Jan 10, 2025
08330da
fixes for the recent changes in config
mauri870 Jan 10, 2025
f60d701
Use RemoveIf
mauri870 Jan 10, 2025
aef629c
avoid conditionally setting the doc id
mauri870 Jan 16, 2025
490e747
Merge remote-tracking branch 'origin/main' into elasticsearchexporter-id
mauri870 Jan 16, 2025
d7e0035
Merge remote-tracking branch 'upstream/main' into elasticsearchexport…
mauri870 Jan 20, 2025
b420003
Merge branch 'main' into elasticsearchexporter-id
ChrsMark Jan 21, 2025
8a44f05
Merge remote-tracking branch 'upstream/main' into elasticsearchexport…
mauri870 Jan 22, 2025
0a34926
update logic to ignore doc id attribute
mauri870 Jan 22, 2025
e9a12a1
Merge remote-tracking branch 'upstream/main' into elasticsearchexport…
mauri870 Jan 22, 2025
721b190
Merge branch 'main' into elasticsearchexporter-id
mauri870 Jan 23, 2025
3c14cb1
Merge branch 'main' into elasticsearchexporter-id
mauri870 Jan 23, 2025
27 changes: 27 additions & 0 deletions .chloggen/elasticsearchexporter_logs_dynamic_id.yaml
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: elasticsearchexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add config `logs_dynamic_id` to dynamically set the document ID of log records using log record attribute `elasticsearch.document_id`

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [36882]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
3 changes: 3 additions & 0 deletions exporter/elasticsearchexporter/README.md
@@ -142,6 +142,9 @@ This can be customised through the following settings:
- `prefix_separator`(default=`-`): Set a separator between logstash_prefix and date.
- `date_format`(default=`%Y.%m.%d`): Time format (based on strftime) to generate the second part of the Index name.

- `logs_dynamic_id` (optional): Dynamically determines the document ID to be used in Elasticsearch based on a log record attribute.
- `enabled` (default=false): Enable/disable dynamic document IDs for log records. If the log record attribute `elasticsearch.document_id` exists and is not an empty string, it is used as the document ID; otherwise, the document ID is generated by Elasticsearch. The `elasticsearch.document_id` attribute is removed from the final document.
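For reference, a minimal collector configuration sketch enabling this option (the endpoint value is illustrative, not from this PR):

```yaml
exporters:
  elasticsearch:
    endpoint: https://localhost:9200
    logs_dynamic_id:
      enabled: true
```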

### Elasticsearch document mapping

The Elasticsearch exporter supports several document schemas and preprocessing
10 changes: 6 additions & 4 deletions exporter/elasticsearchexporter/bulkindexer.go
@@ -31,7 +31,7 @@ type bulkIndexer interface {

type bulkIndexerSession interface {
// Add adds a document to the bulk indexing session.
Add(ctx context.Context, index string, document io.WriterTo, dynamicTemplates map[string]string) error
Add(ctx context.Context, index string, docID string, document io.WriterTo, dynamicTemplates map[string]string) error

// End must be called on the session object once it is no longer
// needed, in order to release any associated resources.
@@ -126,8 +126,9 @@ type syncBulkIndexerSession struct {
}

// Add adds an item to the sync bulk indexer session.
func (s *syncBulkIndexerSession) Add(ctx context.Context, index string, document io.WriterTo, dynamicTemplates map[string]string) error {
err := s.bi.Add(docappender.BulkIndexerItem{Index: index, Body: document, DynamicTemplates: dynamicTemplates})
func (s *syncBulkIndexerSession) Add(ctx context.Context, index string, docID string, document io.WriterTo, dynamicTemplates map[string]string) error {
doc := docappender.BulkIndexerItem{Index: index, Body: document, DocumentID: docID, DynamicTemplates: dynamicTemplates}
err := s.bi.Add(doc)
if err != nil {
return err
}
@@ -248,10 +249,11 @@ func (a *asyncBulkIndexer) Close(ctx context.Context) error {
// Add adds an item to the async bulk indexer session.
//
// Adding an item after a call to Close() will panic.
func (s asyncBulkIndexerSession) Add(ctx context.Context, index string, document io.WriterTo, dynamicTemplates map[string]string) error {
func (s asyncBulkIndexerSession) Add(ctx context.Context, index string, docID string, document io.WriterTo, dynamicTemplates map[string]string) error {
item := docappender.BulkIndexerItem{
Index: index,
Body: document,
DocumentID: docID,
DynamicTemplates: dynamicTemplates,
}
select {
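The effect of the new `docID` parameter is visible at the Elasticsearch bulk protocol level: each document in a `_bulk` request is preceded by an action metadata line, and a non-empty document ID adds an `_id` field to it. A self-contained sketch of that metadata line (it does not use `docappender`; field names follow the Elasticsearch bulk API):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// actionLine builds the bulk-API "create" action metadata for one document.
// An empty docID omits "_id" entirely, so Elasticsearch generates the ID.
func actionLine(index, docID string) string {
	meta := map[string]string{"_index": index}
	if docID != "" {
		meta["_id"] = docID
	}
	b, err := json.Marshal(map[string]map[string]string{"create": meta})
	if err != nil {
		panic(err)
	}
	return string(b)
}

func main() {
	// Without an ID: {"create":{"_index":"logs-generic-default"}}
	fmt.Println(actionLine("logs-generic-default", ""))
	// With an ID: {"create":{"_id":"abc123","_index":"logs-generic-default"}}
	fmt.Println(actionLine("logs-generic-default", "abc123"))
}
```

This mirrors what the tests below verify through `actionJSONToID`: the `_id` key appears in the action line only when a document ID was extracted.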
8 changes: 4 additions & 4 deletions exporter/elasticsearchexporter/bulkindexer_test.go
@@ -102,7 +102,7 @@ func TestAsyncBulkIndexer_flush(t *testing.T) {
session, err := bulkIndexer.StartSession(context.Background())
require.NoError(t, err)

assert.NoError(t, session.Add(context.Background(), "foo", strings.NewReader(`{"foo": "bar"}`), nil))
assert.NoError(t, session.Add(context.Background(), "foo", "", strings.NewReader(`{"foo": "bar"}`), nil))
// should flush
time.Sleep(100 * time.Millisecond)
assert.Equal(t, int64(1), bulkIndexer.stats.docsIndexed.Load())
@@ -229,7 +229,7 @@ func TestAsyncBulkIndexer_flush_error(t *testing.T) {
session, err := bulkIndexer.StartSession(context.Background())
require.NoError(t, err)

assert.NoError(t, session.Add(context.Background(), "foo", strings.NewReader(`{"foo": "bar"}`), nil))
assert.NoError(t, session.Add(context.Background(), "foo", "", strings.NewReader(`{"foo": "bar"}`), nil))
// should flush
time.Sleep(100 * time.Millisecond)
assert.Equal(t, int64(0), bulkIndexer.stats.docsIndexed.Load())
@@ -312,7 +312,7 @@ func runBulkIndexerOnce(t *testing.T, config *Config, client *elasticsearch.Clie
session, err := bulkIndexer.StartSession(context.Background())
require.NoError(t, err)

assert.NoError(t, session.Add(context.Background(), "foo", strings.NewReader(`{"foo": "bar"}`), nil))
assert.NoError(t, session.Add(context.Background(), "foo", "", strings.NewReader(`{"foo": "bar"}`), nil))
assert.NoError(t, bulkIndexer.Close(context.Background()))

return bulkIndexer
@@ -338,7 +338,7 @@ func TestSyncBulkIndexer_flushBytes(t *testing.T) {
session, err := bi.StartSession(context.Background())
require.NoError(t, err)

assert.NoError(t, session.Add(context.Background(), "foo", strings.NewReader(`{"foo": "bar"}`), nil))
assert.NoError(t, session.Add(context.Background(), "foo", "", strings.NewReader(`{"foo": "bar"}`), nil))
assert.Equal(t, int64(1), reqCnt.Load()) // flush due to flush::bytes
assert.NoError(t, bi.Close(context.Background()))
}
7 changes: 7 additions & 0 deletions exporter/elasticsearchexporter/config.go
@@ -53,6 +53,9 @@ type Config struct {
// fall back to pure TracesIndex, if 'elasticsearch.index.prefix' or 'elasticsearch.index.suffix' are not found in resource or attribute (prio: resource > attribute)
TracesDynamicIndex DynamicIndexSetting `mapstructure:"traces_dynamic_index"`

// LogsDynamicID configures whether log record attribute `elasticsearch.document_id` is set as the document ID in ES.
LogsDynamicID DynamicIDSettings `mapstructure:"logs_dynamic_id"`

// Pipeline configures the ingest node pipeline name that should be used to process the
// events.
//
@@ -112,6 +115,10 @@ type DynamicIndexSetting struct {
Enabled bool `mapstructure:"enabled"`
}

type DynamicIDSettings struct {
Enabled bool `mapstructure:"enabled"`
}

// AuthenticationSettings defines user authentication related settings.
type AuthenticationSettings struct {
// User is used to configure HTTP Basic Authentication.
9 changes: 9 additions & 0 deletions exporter/elasticsearchexporter/config_test.go
@@ -73,6 +73,9 @@ func TestConfig(t *testing.T) {
TracesDynamicIndex: DynamicIndexSetting{
Enabled: false,
},
LogsDynamicID: DynamicIDSettings{
Enabled: false,
},
Pipeline: "mypipeline",
ClientConfig: withDefaultHTTPClientConfig(func(cfg *confighttp.ClientConfig) {
cfg.Timeout = 2 * time.Minute
@@ -144,6 +147,9 @@ func TestConfig(t *testing.T) {
TracesDynamicIndex: DynamicIndexSetting{
Enabled: false,
},
LogsDynamicID: DynamicIDSettings{
Enabled: false,
},
Pipeline: "mypipeline",
ClientConfig: withDefaultHTTPClientConfig(func(cfg *confighttp.ClientConfig) {
cfg.Timeout = 2 * time.Minute
@@ -215,6 +221,9 @@ func TestConfig(t *testing.T) {
TracesDynamicIndex: DynamicIndexSetting{
Enabled: false,
},
LogsDynamicID: DynamicIDSettings{
Enabled: false,
},
Pipeline: "mypipeline",
ClientConfig: withDefaultHTTPClientConfig(func(cfg *confighttp.ClientConfig) {
cfg.Timeout = 2 * time.Minute
29 changes: 25 additions & 4 deletions exporter/elasticsearchexporter/exporter.go
@@ -22,6 +22,11 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/pool"
)

const (
// documentIDAttributeName is the attribute name used to specify the document ID.
documentIDAttributeName = "elasticsearch.document_id"
)

type elasticsearchExporter struct {
component.TelemetrySettings
userAgent string
@@ -176,13 +181,15 @@ func (e *elasticsearchExporter) pushLogRecord(
}

buf := e.bufferPool.NewPooledBuffer()
docID := e.extractDocumentIDAttribute(record.Attributes())
err := e.model.encodeLog(resource, resourceSchemaURL, record, scope, scopeSchemaURL, buf.Buffer)
if err != nil {
buf.Recycle()
return fmt.Errorf("failed to encode log event: %w", err)
}

// not recycling after Add returns an error as we don't know if it's already recycled
return bulkIndexerSession.Add(ctx, fIndex, buf, nil)
return bulkIndexerSession.Add(ctx, fIndex, docID, buf, nil)
}

func (e *elasticsearchExporter) pushMetricsData(
@@ -299,7 +306,7 @@ func (e *elasticsearchExporter) pushMetricsData(
errs = append(errs, err)
continue
}
if err := session.Add(ctx, fIndex, buf, dynamicTemplates); err != nil {
if err := session.Add(ctx, fIndex, "", buf, dynamicTemplates); err != nil {
// not recycling after Add returns an error as we don't know if it's already recycled
if cerr := ctx.Err(); cerr != nil {
return cerr
@@ -422,7 +429,7 @@ func (e *elasticsearchExporter) pushTraceRecord(
return fmt.Errorf("failed to encode trace record: %w", err)
}
// not recycling after Add returns an error as we don't know if it's already recycled
return bulkIndexerSession.Add(ctx, fIndex, buf, nil)
return bulkIndexerSession.Add(ctx, fIndex, "", buf, nil)
}

func (e *elasticsearchExporter) pushSpanEvent(
Expand Down Expand Up @@ -454,5 +461,19 @@ func (e *elasticsearchExporter) pushSpanEvent(
return nil
}
// not recycling after Add returns an error as we don't know if it's already recycled
return bulkIndexerSession.Add(ctx, fIndex, buf, nil)
return bulkIndexerSession.Add(ctx, fIndex, "", buf, nil)
}

func (e *elasticsearchExporter) extractDocumentIDAttribute(m pcommon.Map) (docID string) {
if !e.config.LogsDynamicID.Enabled {
return
}
m.RemoveIf(func(k string, value pcommon.Value) bool {
if k == documentIDAttributeName {
docID = value.AsString()
return true
}
return false
})
return
}
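The extract-and-remove pattern in `extractDocumentIDAttribute` can be sketched without the `pcommon` types, using a plain map (a hypothetical standalone helper, not part of the exporter):

```go
package main

import "fmt"

const documentIDAttributeName = "elasticsearch.document_id"

// extractDocID returns the document ID attribute and deletes it from attrs,
// mirroring the RemoveIf-based logic above: when the feature is disabled the
// attribute is left untouched and an empty ID is returned.
func extractDocID(attrs map[string]string, enabled bool) string {
	if !enabled {
		return ""
	}
	docID := attrs[documentIDAttributeName]
	delete(attrs, documentIDAttributeName)
	return docID
}

func main() {
	attrs := map[string]string{
		documentIDAttributeName: "abc123",
		"log.level":             "info",
	}
	fmt.Println(extractDocID(attrs, true)) // abc123
	fmt.Println(len(attrs))                // 1: only "log.level" remains
}
```

As in the exporter, an empty attribute value is still removed from the attributes but yields an empty `docID`, so Elasticsearch falls back to generating the document ID.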
87 changes: 87 additions & 0 deletions exporter/elasticsearchexporter/exporter_test.go
@@ -736,6 +736,82 @@ func TestExporterLogs(t *testing.T) {
assert.JSONEq(t, `{"a":"a","a.b":"a.b"}`, gjson.GetBytes(doc, `resource.attributes`).Raw)
})

t.Run("publish logs with dynamic id", func(t *testing.T) {
t.Parallel()
exampleDocID := "abc123"
tableTests := []struct {
name string
expectedDocID string // "" means the _id will not be set
recordAttrs map[string]any
}{
{
name: "missing document id attribute should not set _id",
expectedDocID: "",
},
{
name: "empty document id attribute should not set _id",
expectedDocID: "",
recordAttrs: map[string]any{
documentIDAttributeName: "",
},
},
{
name: "record attributes",
expectedDocID: exampleDocID,
recordAttrs: map[string]any{
documentIDAttributeName: exampleDocID,
},
},
}

cfgs := map[string]func(*Config){
"async": func(cfg *Config) {
batcherEnabled := false
cfg.Batcher.Enabled = &batcherEnabled
},
"sync": func(cfg *Config) {
batcherEnabled := true
cfg.Batcher.Enabled = &batcherEnabled
cfg.Batcher.FlushTimeout = 10 * time.Millisecond
},
}
for _, tt := range tableTests {
for cfgName, cfgFn := range cfgs {
t.Run(tt.name+"/"+cfgName, func(t *testing.T) {
t.Parallel()
rec := newBulkRecorder()
server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) {
rec.Record(docs)

if tt.expectedDocID == "" {
assert.NotContains(t, string(docs[0].Action), "_id", "expected _id to not be set")
} else {
assert.Equal(t, tt.expectedDocID, actionJSONToID(t, docs[0].Action), "expected _id to be set")
}

// Ensure the document id attribute is removed from the final document.
assert.NotContains(t, docs[0].Document, documentIDAttributeName, "expected document id attribute to be removed")
return itemsAllOK(docs)
})

exporter := newTestLogsExporter(t, server.URL, func(cfg *Config) {
cfg.Mapping.Mode = "otel"
cfg.LogsDynamicID.Enabled = true
cfgFn(cfg)
})
logs := newLogsWithAttributes(
tt.recordAttrs,
map[string]any{},
map[string]any{},
)
logs.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Body().SetStr("hello world")
mustSendLogs(t, exporter, logs)

rec.WaitItems(1)
})
}
}
})
t.Run("otel mode attribute complex value", func(t *testing.T) {
rec := newBulkRecorder()
server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) {
@@ -1940,3 +2016,14 @@ func actionJSONToIndex(t *testing.T, actionJSON json.RawMessage) string {
require.NoError(t, err)
return action.Create.Index
}

func actionJSONToID(t *testing.T, actionJSON json.RawMessage) string {
action := struct {
Create struct {
ID string `json:"_id"`
} `json:"create"`
}{}
err := json.Unmarshal(actionJSON, &action)
require.NoError(t, err)
return action.Create.ID
}
3 changes: 3 additions & 0 deletions exporter/elasticsearchexporter/factory.go
@@ -62,6 +62,9 @@ func createDefaultConfig() component.Config {
TracesDynamicIndex: DynamicIndexSetting{
Enabled: false,
},
LogsDynamicID: DynamicIDSettings{
Enabled: false,
},
Retry: RetrySettings{
Enabled: true,
MaxRetries: 0, // default is set in exporter code