diff --git a/.chloggen/elasticsearchexporter_sanitize-datastream-fields.yaml b/.chloggen/elasticsearchexporter_sanitize-datastream-fields.yaml
new file mode 100644
index 000000000000..92ea2b187712
--- /dev/null
+++ b/.chloggen/elasticsearchexporter_sanitize-datastream-fields.yaml
@@ -0,0 +1,28 @@
+# Use this changelog template to create an entry for release notes.
+
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: bug_fix
+
+# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
+component: elasticsearchexporter
+
+# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: Sanitize datastream routing fields
+
+# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
+issues: [34285]
+
+# (Optional) One or more lines of additional information to render under the primary note.
+# These lines will be padded with 2 spaces and then inserted directly into the document.
+# Use pipe (|) for multiline entries.
+subtext:
+  Sanitize the dataset and namespace fields according to https://www.elastic.co/guide/en/ecs/current/ecs-data_stream.html.
+
+# If your change doesn't affect end users or the exported elements of any package,
+# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
+# Optional: The change log or logs in which this entry should be included.
+# e.g. '[user]' or '[user, api]'
+# Include 'user' if the change is relevant to end users.
+# Include 'api' if there is a change to a library API.
+# Default: '[user]'
+change_logs: [user]
diff --git a/exporter/elasticsearchexporter/README.md b/exporter/elasticsearchexporter/README.md
index 1f7c9f5fb9c5..13564bbd012a 100644
--- a/exporter/elasticsearchexporter/README.md
+++ b/exporter/elasticsearchexporter/README.md
@@ -121,7 +121,7 @@ This can be customised through the following settings:
 
 - `logs_dynamic_index` (optional): uses resource, scope, or log record attributes to dynamically construct index name.
   - `enabled`(default=false): Enable/Disable dynamic index for log records. If `data_stream.dataset` or `data_stream.namespace` exist in attributes (precedence: log record attribute > scope attribute > resource attribute), they will be used to dynamically construct index name in the form `logs-${data_stream.dataset}-${data_stream.namespace}`. Otherwise, if
-    `elasticsearch.index.prefix` or `elasticsearch.index.suffix` exist in attributes (precedence: resource attribute > scope attribute > log record attribute), they will be used to dynamically construct index name in the form `${elasticsearch.index.prefix}${logs_index}${elasticsearch.index.suffix}`. Otherwise, if scope name matches regex `/receiver/(\w*receiver)`, `data_stream.dataset` will be capture group #1. Otherwise, the index name falls back to `logs-generic-default`, and `logs_index` config will be ignored. Except for prefix/suffix attribute presence, the resulting docs will contain the corresponding `data_stream.*` fields.
+    `elasticsearch.index.prefix` or `elasticsearch.index.suffix` exist in attributes (precedence: resource attribute > scope attribute > log record attribute), they will be used to dynamically construct index name in the form `${elasticsearch.index.prefix}${logs_index}${elasticsearch.index.suffix}`. Otherwise, if scope name matches regex `/receiver/(\w*receiver)`, `data_stream.dataset` will be capture group #1. Otherwise, the index name falls back to `logs-generic-default`, and `logs_index` config will be ignored. Except for prefix/suffix attribute presence, the resulting docs will contain the corresponding `data_stream.*` fields; see the restrictions applied to [Data Stream Fields](https://www.elastic.co/guide/en/ecs/current/ecs-data_stream.html).
 
 - `metrics_index` (optional): The [index] or [data stream] name to publish metrics to. The default value is `metrics-generic-default`.
   ⚠️ Note that metrics support is currently in development.
@@ -129,13 +129,13 @@ This can be customised through the following settings:
 - `metrics_dynamic_index` (optional): uses resource, scope or data point attributes to dynamically construct index name.
   ⚠️ Note that metrics support is currently in development.
   - `enabled`(default=true): Enable/disable dynamic index for metrics. If `data_stream.dataset` or `data_stream.namespace` exist in attributes (precedence: data point attribute > scope attribute > resource attribute), they will be used to dynamically construct index name in the form `metrics-${data_stream.dataset}-${data_stream.namespace}`. Otherwise, if
-    `elasticsearch.index.prefix` or `elasticsearch.index.suffix` exist in attributes (precedence: resource attribute > scope attribute > data point attribute), they will be used to dynamically construct index name in the form `${elasticsearch.index.prefix}${metrics_index}${elasticsearch.index.suffix}`. Otherwise, if scope name matches regex `/receiver/(\w*receiver)`, `data_stream.dataset` will be capture group #1. Otherwise, the index name falls back to `metrics-generic-default`, and `metrics_index` config will be ignored. Except for prefix/suffix attribute presence, the resulting docs will contain the corresponding `data_stream.*` fields.
+    `elasticsearch.index.prefix` or `elasticsearch.index.suffix` exist in attributes (precedence: resource attribute > scope attribute > data point attribute), they will be used to dynamically construct index name in the form `${elasticsearch.index.prefix}${metrics_index}${elasticsearch.index.suffix}`. Otherwise, if scope name matches regex `/receiver/(\w*receiver)`, `data_stream.dataset` will be capture group #1. Otherwise, the index name falls back to `metrics-generic-default`, and `metrics_index` config will be ignored. Except for prefix/suffix attribute presence, the resulting docs will contain the corresponding `data_stream.*` fields; see the restrictions applied to [Data Stream Fields](https://www.elastic.co/guide/en/ecs/current/ecs-data_stream.html).
 
 - `traces_index`: The [index] or [data stream] name to publish traces to. The default value is `traces-generic-default`.
 
 - `traces_dynamic_index` (optional): uses resource, scope, or span attributes to dynamically construct index name.
   - `enabled`(default=false): Enable/Disable dynamic index for trace spans. If `data_stream.dataset` or `data_stream.namespace` exist in attributes (precedence: span attribute > scope attribute > resource attribute), they will be used to dynamically construct index name in the form `traces-${data_stream.dataset}-${data_stream.namespace}`. Otherwise, if
-    `elasticsearch.index.prefix` or `elasticsearch.index.suffix` exist in attributes (precedence: resource attribute > scope attribute > span attribute), they will be used to dynamically construct index name in the form `${elasticsearch.index.prefix}${traces_index}${elasticsearch.index.suffix}`. Otherwise, if scope name matches regex `/receiver/(\w*receiver)`, `data_stream.dataset` will be capture group #1. Otherwise, the index name falls back to `traces-generic-default`, and `traces_index` config will be ignored. Except for prefix/suffix attribute presence, the resulting docs will contain the corresponding `data_stream.*` fields. There is an exception for span events under OTel mapping mode (`mapping::mode: otel`), where span event attributes instead of span attributes are considered, and `data_stream.type` is always `logs` instead of `traces` such that documents are routed to `logs-${data_stream.dataset}-${data_stream.namespace}`.
+    `elasticsearch.index.prefix` or `elasticsearch.index.suffix` exist in attributes (precedence: resource attribute > scope attribute > span attribute), they will be used to dynamically construct index name in the form `${elasticsearch.index.prefix}${traces_index}${elasticsearch.index.suffix}`. Otherwise, if scope name matches regex `/receiver/(\w*receiver)`, `data_stream.dataset` will be capture group #1. Otherwise, the index name falls back to `traces-generic-default`, and `traces_index` config will be ignored. Except for prefix/suffix attribute presence, the resulting docs will contain the corresponding `data_stream.*` fields; see the restrictions applied to [Data Stream Fields](https://www.elastic.co/guide/en/ecs/current/ecs-data_stream.html). There is an exception for span events under OTel mapping mode (`mapping::mode: otel`), where span event attributes instead of span attributes are considered, and `data_stream.type` is always `logs` instead of `traces` such that documents are routed to `logs-${data_stream.dataset}-${data_stream.namespace}`.
 
 - `logstash_format` (optional): Logstash format compatibility. Logs, metrics and traces can be written into an index in Logstash format.
   - `enabled`(default=false): Enable/disable Logstash format compatibility. When `logstash_format.enabled` is `true`, the index name is composed using `(logs|metrics|traces)_index` or `(logs|metrics|traces)_dynamic_index` as prefix and the date as suffix,
@@ -347,4 +347,4 @@ When sending high traffic of metrics to a TSDB metrics data stream, e.g. using O
 
 This will be fixed in a future version of Elasticsearch. A possible workaround would be to use a transform processor to truncate the timestamp, but this will cause duplicate data to be dropped silently.
 
-However, if `@timestamp` precision is not the problem, check your metrics pipeline setup for misconfiguration that causes an actual violation of the [single writer principle](https://opentelemetry.io/docs/specs/otel/metrics/data-model/#single-writer).
+However, if `@timestamp` precision is not the problem, check your metrics pipeline setup for misconfiguration that causes an actual violation of the [single writer principle](https://opentelemetry.io/docs/specs/otel/metrics/data-model/#single-writer).
\ No newline at end of file
diff --git a/exporter/elasticsearchexporter/data_stream_router.go b/exporter/elasticsearchexporter/data_stream_router.go
index df9b17c6cc6e..a64b15d0ad97 100644
--- a/exporter/elasticsearchexporter/data_stream_router.go
+++ b/exporter/elasticsearchexporter/data_stream_router.go
@@ -6,12 +6,39 @@ package elasticsearchexporter // import "github.com/open-telemetry/opentelemetry
 import (
     "fmt"
     "regexp"
+    "strings"
+    "unicode"
 
     "go.opentelemetry.io/collector/pdata/pcommon"
 )
 
 var receiverRegex = regexp.MustCompile(`/receiver/(\w*receiver)`)
 
+const (
+    maxDataStreamBytes       = 100
+    disallowedNamespaceRunes = "\\/*?\"<>| ,#:"
+    disallowedDatasetRunes   = "-\\/*?\"<>| ,#:"
+)
+
+// sanitizeDataStreamField sanitizes a data stream field (dataset or namespace) to apply the restrictions
+// outlined in https://www.elastic.co/guide/en/ecs/current/ecs-data_stream.html.
+// The suffix is appended after truncation, so the result stays within maxDataStreamBytes.
+func sanitizeDataStreamField(field, disallowed, appendSuffix string) string {
+    field = strings.Map(func(r rune) rune {
+        if strings.ContainsRune(disallowed, r) {
+            return '_'
+        }
+        return unicode.ToLower(r)
+    }, field)
+
+    if len(field) > maxDataStreamBytes-len(appendSuffix) {
+        field = field[:maxDataStreamBytes-len(appendSuffix)]
+    }
+    field += appendSuffix
+
+    return field
+}
+
 func routeWithDefaults(defaultDSType string) func(
     pcommon.Map,
     pcommon.Map,
@@ -53,15 +80,20 @@ func routeWithDefaults(defaultDSType string) func(
             dataset = receiverName
         }
 
-        // The naming convention for datastream is expected to be "logs-[dataset].otel-[namespace]".
+        // For the dataset, the data stream naming convention is expected to be "logs-[dataset].otel-[namespace]".
         // This is in order to match the built-in logs-*.otel-* index template.
+        var datasetSuffix string
         if otel {
-            dataset += ".otel"
+            datasetSuffix += ".otel"
         }
 
+        dataset = sanitizeDataStreamField(dataset, disallowedDatasetRunes, datasetSuffix)
+        namespace = sanitizeDataStreamField(namespace, disallowedNamespaceRunes, "")
+
         recordAttr.PutStr(dataStreamDataset, dataset)
         recordAttr.PutStr(dataStreamNamespace, namespace)
         recordAttr.PutStr(dataStreamType, defaultDSType)
+
         return fmt.Sprintf("%s-%s-%s", defaultDSType, dataset, namespace)
     }
 }
diff --git a/exporter/elasticsearchexporter/exporter_test.go b/exporter/elasticsearchexporter/exporter_test.go
index 06d02e0272c7..13de09564d32 100644
--- a/exporter/elasticsearchexporter/exporter_test.go
+++ b/exporter/elasticsearchexporter/exporter_test.go
@@ -215,7 +215,8 @@ func TestExporterLogs(t *testing.T) {
         server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) {
             rec.Record(docs)
 
-            assert.Equal(t, "logs-record.dataset-resource.namespace", actionJSONToIndex(t, docs[0].Action))
+            expected := "logs-record.dataset.____________-resource.namespace.-____________"
+            assert.Equal(t, expected, actionJSONToIndex(t, docs[0].Action))
 
             return itemsAllOK(docs)
         })
@@ -225,12 +226,12 @@ func TestExporterLogs(t *testing.T) {
         })
         logs := newLogsWithAttributes(
             map[string]any{
-                dataStreamDataset: "record.dataset",
+                dataStreamDataset: "record.dataset.\\/*?\"<>| ,#:",
             },
             nil,
             map[string]any{
                 dataStreamDataset:   "resource.dataset",
-                dataStreamNamespace: "resource.namespace",
+                dataStreamNamespace: "resource.namespace.-\\/*?\"<>| ,#:",
             },
         )
         logs.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Body().SetStr("hello world")
@@ -647,7 +648,7 @@ func TestExporterMetrics(t *testing.T) {
         server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) {
             rec.Record(docs)
 
-            expected := "metrics-resource.dataset-data.point.namespace"
+            expected := "metrics-resource.dataset.____________-data.point.namespace.-____________"
             assert.Equal(t, expected, actionJSONToIndex(t, docs[0].Action))
 
             return itemsAllOK(docs)
@@ -659,11 +660,11 @@ func TestExporterMetrics(t *testing.T) {
         })
         metrics := newMetricsWithAttributes(
             map[string]any{
-                dataStreamNamespace: "data.point.namespace",
+                dataStreamNamespace: "data.point.namespace.-\\/*?\"<>| ,#:",
             },
             nil,
             map[string]any{
-                dataStreamDataset:   "resource.dataset",
+                dataStreamDataset:   "resource.dataset.\\/*?\"<>| ,#:",
                 dataStreamNamespace: "resource.namespace",
             },
         )
@@ -1287,7 +1288,7 @@ func TestExporterTraces(t *testing.T) {
         server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) {
             rec.Record(docs)
 
-            expected := "traces-span.dataset-default"
+            expected := "traces-span.dataset.____________-default"
             assert.Equal(t, expected, actionJSONToIndex(t, docs[0].Action))
 
             return itemsAllOK(docs)
@@ -1299,7 +1300,7 @@ func TestExporterTraces(t *testing.T) {
 
         mustSendTraces(t, exporter, newTracesWithAttributes(
             map[string]any{
-                dataStreamDataset: "span.dataset",
+                dataStreamDataset: "span.dataset.\\/*?\"<>| ,#:",
             },
             nil,
             map[string]any{
diff --git a/exporter/elasticsearchexporter/model_test.go b/exporter/elasticsearchexporter/model_test.go
index ec17db600f78..2bc34e9a24cf 100644
--- a/exporter/elasticsearchexporter/model_test.go
+++ b/exporter/elasticsearchexporter/model_test.go
@@ -960,6 +960,9 @@ func decodeOTelID(data []byte) ([]byte, error) {
 }
 
 func TestEncodeLogOtelMode(t *testing.T) {
+    randomString := strings.Repeat("abcdefghijklmnopqrstuvwxyz0123456789", 10)
+    maxLenNamespace := maxDataStreamBytes - len(disallowedNamespaceRunes)
+    maxLenDataset := maxDataStreamBytes - len(disallowedDatasetRunes) - len(".otel")
 
     tests := []struct {
         name   string
@@ -1044,6 +1047,20 @@ func TestEncodeLogOtelMode(t *testing.T) {
                 return assignDatastreamData(or, "", "third.otel")
             },
         },
+        {
+            name: "sanitize dataset/namespace",
+            rec: buildOTelRecordTestData(t, func(or OTelRecord) OTelRecord {
+                or.Attributes["data_stream.dataset"] = disallowedDatasetRunes + randomString
+                or.Attributes["data_stream.namespace"] = disallowedNamespaceRunes + randomString
+                return or
+            }),
+            wantFn: func(or OTelRecord) OTelRecord {
+                deleteDatasetAttributes(or)
+                ds := strings.Repeat("_", len(disallowedDatasetRunes)) + randomString[:maxLenDataset] + ".otel"
+                ns := strings.Repeat("_", len(disallowedNamespaceRunes)) + randomString[:maxLenNamespace]
+                return assignDatastreamData(or, "", ds, ns)
+            },
+        },
     }
 
     m := encodeModel{