Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add index_types for OTEL logs and metrics #3148 #3929

Merged
merged 20 commits into from
Oct 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 41 additions & 2 deletions data-prepper-plugins/opensearch/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ pipeline:

The OpenSearch sink will reserve `otel-v1-apm-span-*` as index pattern and `otel-v1-apm-span` as index alias for record ingestion.

### </a>Service map trace analytics
### Service map trace analytics

```
pipeline:
Expand All @@ -45,6 +45,45 @@ pipeline:

The OpenSearch sink will reserve `otel-v1-apm-service-map` as index for record ingestion.

### Log analytics

```
pipeline:
...
sink:
opensearch:
hosts: ["https://localhost:9200"]
cert: path/to/cert
username: YOUR_USERNAME_HERE
password: YOUR_PASSWORD_HERE
index_type: log-analytics
dlq_file: /your/local/dlq-file
max_retries: 20
bulk_size: 4
```

The OpenSearch sink will reserve `logs-otel-v1-*` as index pattern and `logs-otel-v1` as index alias for record ingestion.

### Metric analytics

```
pipeline:
...
sink:
opensearch:
hosts: ["https://localhost:9200"]
cert: path/to/cert
username: YOUR_USERNAME_HERE
password: YOUR_PASSWORD_HERE
index_type: metric-analytics
dlq_file: /your/local/dlq-file
max_retries: 20
bulk_size: 4
```

The OpenSearch sink will reserve `metrics-otel-v1-*` as index pattern and `metrics-otel-v1` as index alias for record ingestion.


### Amazon OpenSearch Service

The OpenSearch sink can also be configured for an Amazon OpenSearch Service domain. See [security](security.md) for details.
Expand Down Expand Up @@ -93,7 +132,7 @@ Default is null.

- `proxy`(optional): A String of the address of a forward HTTP proxy. The format is like "<host-name-or-ip>:\<port\>". Examples: "example.com:8100", "http://example.com:8100", "112.112.112.112:8100". Note: port number cannot be omitted.

- `index_type` (optional): a String from the list [`custom`, `trace-analytics-raw`, `trace-analytics-service-map`, `management_disabled`], which represents an index type. Defaults to `custom` if `serverless` is `false` in [AWS Configuration](#aws_configuration), otherwise defaults to `management_disabled`. This index_type instructs Sink plugin what type of data it is handling.
- `index_type` (optional): a String from the list [`custom`, `trace-analytics-raw`, `trace-analytics-service-map`, `metric-analytics`, `log-analytics`, `management_disabled`], which represents an index type. Defaults to `custom` if `serverless` is `false` in [AWS Configuration](#aws_configuration), otherwise defaults to `management_disabled`. This index_type instructs Sink plugin what type of data it is handling.

- `enable_request_compression` (optional): A boolean that enables or disables request compression when sending requests to OpenSearch. For `distribution_version` set to `es6`, default value is `false`, otherwise default value is `true`.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,8 @@ public class OpenSearchSinkIT {
private static final String DEFAULT_SERVICE_MAP_FILE = "service-map-1.json";
private static final String INCLUDE_TYPE_NAME_FALSE_URI = "?include_type_name=false";
private static final String TRACE_INGESTION_TEST_DISABLED_REASON = "Trace ingestion is not supported for ES 6";
private static final String LOG_INGESTION_TEST_DISABLED_REASON = "Log ingestion is not supported for ES 6";
private static final String METRIC_INGESTION_TEST_DISABLED_REASON = "Metric ingestion is not supported for ES 6";

private RestClient client;
private SinkContext sinkContext;
Expand Down Expand Up @@ -188,6 +190,7 @@ public void testInstantiateSinkRawSpanDefault() throws IOException {
final PluginSetting pluginSetting = generatePluginSetting(IndexType.TRACE_ANALYTICS_RAW.getValue(), null, null);
OpenSearchSink sink = createObjectUnderTest(pluginSetting, true);
final String indexAlias = IndexConstants.TYPE_TO_DEFAULT_ALIAS.get(IndexType.TRACE_ANALYTICS_RAW);
assertThat(indexAlias, equalTo("otel-v1-apm-span"));
Request request = new Request(HttpMethod.HEAD, indexAlias);
Response response = client.performRequest(request);
assertThat(response.getStatusLine().getStatusCode(), equalTo(SC_OK));
Expand Down Expand Up @@ -226,6 +229,96 @@ public void testInstantiateSinkRawSpanDefault() throws IOException {
}
}

@Test
@DisabledIf(value = "isES6", disabledReason = LOG_INGESTION_TEST_DISABLED_REASON)
public void testInstantiateSinkLogsDefaultLogSink() throws IOException {
final PluginSetting pluginSetting = generatePluginSetting(IndexType.LOG_ANALYTICS.getValue(), null, null);
OpenSearchSink sink = createObjectUnderTest(pluginSetting, true);
final String indexAlias = IndexConstants.TYPE_TO_DEFAULT_ALIAS.get(IndexType.LOG_ANALYTICS);
assertThat(indexAlias, equalTo("logs-otel-v1"));
Request request = new Request(HttpMethod.HEAD, indexAlias);
Response response = client.performRequest(request);
assertThat(response.getStatusLine().getStatusCode(), equalTo(SC_OK));
final String index = String.format("%s-000001", indexAlias);
final Map<String, Object> mappings = getIndexMappings(index);
assertThat(mappings, notNullValue());
assertThat((boolean) mappings.get("date_detection"), equalTo(false));
sink.shutdown();

if (isOSBundle()) {
// Check managed index
await().atMost(1, TimeUnit.SECONDS).untilAsserted(() -> {
assertThat(getIndexPolicyId(index), equalTo(IndexConstants.LOGS_ISM_POLICY));
}
);
}

// roll over initial index
request = new Request(HttpMethod.POST, String.format("%s/_rollover", indexAlias));
request.setJsonEntity("{ \"conditions\" : { } }\n");
response = client.performRequest(request);
assertThat(response.getStatusLine().getStatusCode(), equalTo(SC_OK));

// Instantiate sink again
sink = createObjectUnderTest(pluginSetting, true);
// Make sure no new write index *-000001 is created under alias
final String rolloverIndexName = String.format("%s-000002", indexAlias);
request = new Request(HttpMethod.GET, rolloverIndexName + "/_alias");
response = client.performRequest(request);
assertThat(checkIsWriteIndex(EntityUtils.toString(response.getEntity()), indexAlias, rolloverIndexName), equalTo(true));
sink.shutdown();

if (isOSBundle()) {
// Check managed index
assertThat(getIndexPolicyId(rolloverIndexName), equalTo(IndexConstants.LOGS_ISM_POLICY));
}
}

@Test
@DisabledIf(value = "isES6", disabledReason = METRIC_INGESTION_TEST_DISABLED_REASON)
public void testInstantiateSinkMetricsDefaultMetricSink() throws IOException {
final PluginSetting pluginSetting = generatePluginSetting(IndexType.METRIC_ANALYTICS.getValue(), null, null);
OpenSearchSink sink = createObjectUnderTest(pluginSetting, true);
final String indexAlias = IndexConstants.TYPE_TO_DEFAULT_ALIAS.get(IndexType.METRIC_ANALYTICS);
assertThat(indexAlias, equalTo("metrics-otel-v1"));
Request request = new Request(HttpMethod.HEAD, indexAlias);
Response response = client.performRequest(request);
assertThat(response.getStatusLine().getStatusCode(), equalTo(SC_OK));
final String index = String.format("%s-000001", indexAlias);
final Map<String, Object> mappings = getIndexMappings(index);
assertThat(mappings, notNullValue());
assertThat((boolean) mappings.get("date_detection"), equalTo(false));
sink.shutdown();

if (isOSBundle()) {
// Check managed index
await().atMost(1, TimeUnit.SECONDS).untilAsserted(() -> {
assertThat(getIndexPolicyId(index), equalTo(IndexConstants.METRICS_ISM_POLICY));
}
);
}

// roll over initial index
request = new Request(HttpMethod.POST, String.format("%s/_rollover", indexAlias));
request.setJsonEntity("{ \"conditions\" : { } }\n");
response = client.performRequest(request);
assertThat(response.getStatusLine().getStatusCode(), equalTo(SC_OK));

// Instantiate sink again
sink = createObjectUnderTest(pluginSetting, true);
// Make sure no new write index *-000001 is created under alias
final String rolloverIndexName = String.format("%s-000002", indexAlias);
request = new Request(HttpMethod.GET, rolloverIndexName + "/_alias");
response = client.performRequest(request);
assertThat(checkIsWriteIndex(EntityUtils.toString(response.getEntity()), indexAlias, rolloverIndexName), equalTo(true));
sink.shutdown();

if (isOSBundle()) {
// Check managed index
assertThat(getIndexPolicyId(rolloverIndexName), equalTo(IndexConstants.METRICS_ISM_POLICY));
}
}

@Test
@DisabledIf(value = "isES6", disabledReason = TRACE_INGESTION_TEST_DISABLED_REASON)
public void testInstantiateSinkRawSpanReservedAliasAlreadyUsedAsIndex() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,10 @@ private Map<String, Object> readIndexTemplate(final String templateFile, final I
templateURL = loadExistingTemplate(templateType, IndexConstants.RAW_DEFAULT_TEMPLATE_FILE);
} else if (indexType.equals(IndexType.TRACE_ANALYTICS_SERVICE_MAP)) {
templateURL = loadExistingTemplate(templateType, IndexConstants.SERVICE_MAP_DEFAULT_TEMPLATE_FILE);
} else if (indexType.equals(IndexType.LOG_ANALYTICS)) {
templateURL = loadExistingTemplate(templateType, IndexConstants.LOGS_DEFAULT_TEMPLATE_FILE);
} else if (indexType.equals(IndexType.METRIC_ANALYTICS)) {
templateURL = loadExistingTemplate(templateType, IndexConstants.METRICS_DEFAULT_TEMPLATE_FILE);
} else if (templateFile != null) {
if (templateFile.toLowerCase().startsWith(S3_PREFIX)) {
FileReader s3FileReader = new S3FileReader(s3Client);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,17 @@ public class IndexConstants {
public static final String RAW_ISM_POLICY = "raw-span-policy";
public static final String RAW_ISM_FILE_NO_ISM_TEMPLATE = "raw-span-policy-no-ism-template.json";
public static final String RAW_ISM_FILE_WITH_ISM_TEMPLATE = "raw-span-policy-with-ism-template.json";

public static final String LOGS_DEFAULT_TEMPLATE_FILE = "logs-otel-v1-index-template.json";
public static final String LOGS_ISM_POLICY = "logs-policy";
public static final String LOGS_ISM_FILE_NO_ISM_TEMPLATE = "logs-policy-no-ism-template.json";
public static final String LOGS_ISM_FILE_WITH_ISM_TEMPLATE = "logs-policy-with-ism-template.json";

public static final String METRICS_DEFAULT_TEMPLATE_FILE = "metrics-otel-v1-index-template.json";
public static final String METRICS_ISM_POLICY = "metrics-policy";
public static final String METRICS_ISM_FILE_NO_ISM_TEMPLATE = "metrics-policy-no-ism-template.json";
public static final String METRICS_ISM_FILE_WITH_ISM_TEMPLATE = "metrics-policy-with-ism-template.json";

public static final String ISM_ENABLED_SETTING = "opendistro.index_state_management.enabled";
public static final String ISM_POLICY_ID_SETTING = "opendistro.index_state_management.policy_id";
public static final String ISM_ROLLOVER_ALIAS_SETTING = "opendistro.index_state_management.rollover_alias";
Expand All @@ -26,7 +37,9 @@ public class IndexConstants {

static {
// TODO: extract out version number into version enum
TYPE_TO_DEFAULT_ALIAS.put(IndexType.TRACE_ANALYTICS_RAW, "otel-v1-apm-span");
TYPE_TO_DEFAULT_ALIAS.put(IndexType.TRACE_ANALYTICS_SERVICE_MAP, "otel-v1-apm-service-map");
TYPE_TO_DEFAULT_ALIAS.put(IndexType.TRACE_ANALYTICS_RAW, "otel-v1-apm-span");
TYPE_TO_DEFAULT_ALIAS.put(IndexType.LOG_ANALYTICS, "logs-otel-v1");
TYPE_TO_DEFAULT_ALIAS.put(IndexType.METRIC_ANALYTICS, "metrics-otel-v1");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,14 @@ public final IndexManager getIndexManager(final IndexType indexType,
indexManager = new TraceAnalyticsServiceMapIndexManager(
restHighLevelClient, openSearchClient, openSearchSinkConfiguration, clusterSettingsParser, templateStrategy, indexAlias);
break;
case LOG_ANALYTICS:
indexManager = new LogAnalyticsIndexManager(
restHighLevelClient, openSearchClient, openSearchSinkConfiguration, clusterSettingsParser, templateStrategy, indexAlias);
break;
case METRIC_ANALYTICS:
indexManager = new MetricAnalyticsIndexManager(
restHighLevelClient, openSearchClient, openSearchSinkConfiguration, clusterSettingsParser, templateStrategy, indexAlias);
break;
case MANAGEMENT_DISABLED:
indexManager = new ManagementDisabledIndexManager(
restHighLevelClient, openSearchClient, openSearchSinkConfiguration, clusterSettingsParser, templateStrategy, indexAlias);
Expand Down Expand Up @@ -140,6 +148,42 @@ public TraceAnalyticsServiceMapIndexManager(final RestHighLevelClient restHighLe
}
}

private static class LogAnalyticsIndexManager extends AbstractIndexManager {

public LogAnalyticsIndexManager(final RestHighLevelClient restHighLevelClient,
final OpenSearchClient openSearchClient,
final OpenSearchSinkConfiguration openSearchSinkConfiguration,
final ClusterSettingsParser clusterSettingsParser,
final TemplateStrategy templateStrategy,
final String indexAlias) {
super(restHighLevelClient, openSearchClient, openSearchSinkConfiguration, clusterSettingsParser, templateStrategy, indexAlias);
this.ismPolicyManagementStrategy = new IsmPolicyManagement(
openSearchClient,
restHighLevelClient,
IndexConstants.LOGS_ISM_POLICY,
IndexConstants.LOGS_ISM_FILE_WITH_ISM_TEMPLATE,
IndexConstants.LOGS_ISM_FILE_NO_ISM_TEMPLATE);
}
}

private static class MetricAnalyticsIndexManager extends AbstractIndexManager {

public MetricAnalyticsIndexManager(final RestHighLevelClient restHighLevelClient,
final OpenSearchClient openSearchClient,
final OpenSearchSinkConfiguration openSearchSinkConfiguration,
final ClusterSettingsParser clusterSettingsParser,
final TemplateStrategy templateStrategy,
final String indexAlias) {
super(restHighLevelClient, openSearchClient, openSearchSinkConfiguration, clusterSettingsParser, templateStrategy, indexAlias);
this.ismPolicyManagementStrategy = new IsmPolicyManagement(
openSearchClient,
restHighLevelClient,
IndexConstants.METRICS_ISM_POLICY,
IndexConstants.METRICS_ISM_FILE_WITH_ISM_TEMPLATE,
IndexConstants.METRICS_ISM_FILE_NO_ISM_TEMPLATE);
}
}

private class ManagementDisabledIndexManager extends AbstractIndexManager {
protected ManagementDisabledIndexManager(final RestHighLevelClient restHighLevelClient,
final OpenSearchClient openSearchClient,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
public enum IndexType {
TRACE_ANALYTICS_RAW("trace-analytics-raw"),
TRACE_ANALYTICS_SERVICE_MAP("trace-analytics-service-map"),
LOG_ANALYTICS("log-analytics"),
METRIC_ANALYTICS("metric-analytics"),
CUSTOM("custom"),
MANAGEMENT_DISABLED("management_disabled");

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
{
"version": 1,
"template": {
"mappings": {
"date_detection": false,
"dynamic_templates": [
{
"resource_attributes_map": {
"mapping": {
"type": "keyword"
},
"path_match": "resource.attributes.*"
}
},
{
"log_attributes_map": {
"mapping": {
"type": "keyword"
},
"path_match": "log.attributes.*"
}
}
],
"_source": {
"enabled": true
},
"properties": {
"severity": {
"properties": {
"number": {
"type": "long"
},
"text": {
"type": "keyword"
}
}
},
"body": {
"type": "text"
juergen-walter marked this conversation as resolved.
Show resolved Hide resolved
},
"@timestamp": {
"type": "date_nanos"
},
"time": {
"type": "date_nanos"
},
"observedTimestamp": {
"type": "date_nanos"
},
"observedTime": {
"type": "alias",
"path": "observedTimestamp"
},
"traceId": {
"ignore_above": 256,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These are actually hex encoded 16 bytes values, hence 32 characters. You can shorten the length to this value.

Suggested change
"ignore_above": 256,
"ignore_above": 32,

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"type": "keyword"
},
"spanId": {
"ignore_above": 256,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These are actually hex encoded 8 bytes values, hence 16 characters. You can shorten the length to this value.

Suggested change
"ignore_above": 256,
"ignore_above": 16,

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"type": "keyword"
},
"schemaUrl": {
"type": "keyword"
},
"instrumentationScope": {
"properties": {
"name": {
"type": "keyword"
},
"version": {
"type": "keyword"
}
}
},
"event": {
"properties": {
"kind": {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we follow the pattern that from Simple Schema and add "ignore_above": 256, to these?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"type": "keyword"
},
"domain": {
"type": "keyword"
},
"category": {
"type": "keyword"
},
"type": {
"type": "keyword"
},
"result": {
"type": "keyword"
},
"exception": {
"properties": {
"message": {
"type": "text"
juergen-walter marked this conversation as resolved.
Show resolved Hide resolved
juergen-walter marked this conversation as resolved.
Show resolved Hide resolved
},
"stacktrace": {
"type": "text"
},
"type": {
"type": "keyword"
}
}
}
}
}
}
}
}
}
Loading
Loading