Skip to content

Commit

Permalink
ENH: support custom index template for ES6 in opensearch sink (opense…
Browse files Browse the repository at this point in the history
…arch-project#3061)

Signed-off-by: George Chen <qchea@amazon.com>
  • Loading branch information
chenqi0805 authored Aug 3, 2023
1 parent c753ba6 commit 8821005
Show file tree
Hide file tree
Showing 21 changed files with 1,136 additions and 514 deletions.
4 changes: 2 additions & 2 deletions data-prepper-plugins/opensearch/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ Default is null.

- `proxy`(optional): A String of the address of a forward HTTP proxy. The format is like "<host-name-or-ip>:\<port\>". Examples: "example.com:8100", "http://example.com:8100", "112.112.112.112:8100". Note: port number cannot be omitted.

- `index_type` (optional): a String from the list [`custom`, `trace-analytics-raw`, `trace-analytics-service-map`, `management_disabled`], which represents an index type. Defaults to `custom` if `serverless` is `false` in [AWS Configuration](#aws_configuration) and `distribution_version` is `null`, otherwise defaults to `management_disabled`. This index_type instructs Sink plugin what type of data it is handling.
- `index_type` (optional): a String from the list [`custom`, `trace-analytics-raw`, `trace-analytics-service-map`, `management_disabled`], which represents an index type. Defaults to `custom` if `serverless` is `false` in [AWS Configuration](#aws_configuration), otherwise defaults to `management_disabled`. This index_type instructs Sink plugin what type of data it is handling.

- `enable_request_compression` (optional): A boolean that enables or disables request compression when sending requests to OpenSearch. Default is true.

Expand Down Expand Up @@ -133,7 +133,7 @@ Default is null.
* This index name can also be a plain string plus a date-time pattern as a suffix, such as `application-%{yyyy.MM.dd}`, `my-index-name-%{yyyy.MM.dd.HH}`. When OpenSearch Sink is sending data to OpenSearch, the date-time pattern will be replaced by actual UTC time. The pattern supports all the symbols that represent one hour or above and are listed in [Java DateTimeFormatter](https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html). For example, with an index pattern like `my-index-name-%{yyyy.MM.dd}`, a new index is created for each day such as `my-index-name-2022.01.25`. For another example, with an index pattern like `my-index-name-%{yyyy.MM.dd.HH}`, a new index is created for each hour such as `my-index-name-2022.01.25.13`.
* This index name can also be a formatted string (with or without date-time pattern suffix), such as `my-${index}-name`. When OpenSearchSink is sending data to OpenSearch, the format portion "${index}" will be replaced by it's value in the event that is being processed. The format may also be like "${index1/index2/index3}" in which case the field "index1/index2/index3" is searched in the event and replaced by its value.
- Additionally, the formatted string can include expressions to evaluate to format the index name. For example, `my-${index}-${getMetadata(\"some_metadata_key\")}-name` will inject both the `index` value from the Event, as well as the value of `some_metadata_key` from the Event metadata to construct the index name.
- <a name="template_type"></a>`template_type`(optional): Defines what type of OpenSearch template to use. The available options are `v1` and `index-template`. The default value is `v1`, which uses the original OpenSearch templates available at the `_template` API endpoints. Select `index-template` to use composable index templates which are available at OpenSearch's `_index_template` endpoint.
- <a name="template_type"></a>`template_type`(optional): Defines what type of OpenSearch template to use. The available options are `v1` and `index-template`. The default value is `v1`, which uses the original OpenSearch templates available at the `_template` API endpoints. Select `index-template` to use composable index templates which are available at OpenSearch's `_index_template` endpoint. Note: when `distribution_version` is `es6`, `template_type` is enforced into `v1`.

- <a name="template_file"></a>`template_file`(optional): A json file path or AWS S3 URI to be read as index template for custom data ingestion. The json file content should be the json value of
`"template"` key in the json content of OpenSearch [Index templates API](https://opensearch.org/docs/latest/opensearch/index-templates/),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@
import org.opensearch.dataprepper.plugins.sink.opensearch.index.DocumentBuilder;
import org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexManager;
import org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexManagerFactory;
import org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexTemplateAPIWrapper;
import org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexTemplateAPIWrapperFactory;
import org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexType;
import org.opensearch.dataprepper.plugins.sink.opensearch.index.TemplateStrategy;
import org.slf4j.Logger;
Expand Down Expand Up @@ -190,7 +192,10 @@ private void doInitializeInternal() throws IOException {
restHighLevelClient = openSearchSinkConfig.getConnectionConfiguration().createClient(awsCredentialsSupplier);
openSearchClient = openSearchSinkConfig.getConnectionConfiguration().createOpenSearchClient(restHighLevelClient, awsCredentialsSupplier);
configuredIndexAlias = openSearchSinkConfig.getIndexConfiguration().getIndexAlias();
final TemplateStrategy templateStrategy = openSearchSinkConfig.getIndexConfiguration().getTemplateType().createTemplateStrategy(openSearchClient);
final IndexTemplateAPIWrapper indexTemplateAPIWrapper = IndexTemplateAPIWrapperFactory.getWrapper(
openSearchSinkConfig.getIndexConfiguration(), openSearchClient);
final TemplateStrategy templateStrategy = openSearchSinkConfig.getIndexConfiguration().getTemplateType()
.createTemplateStrategy(indexTemplateAPIWrapper);
indexManager = indexManagerFactory.getIndexManager(indexType, openSearchClient, restHighLevelClient,
openSearchSinkConfig, templateStrategy, configuredIndexAlias);
final String dlqFile = openSearchSinkConfig.getRetryConfiguration().getDlqFile();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package org.opensearch.dataprepper.plugins.sink.opensearch.index;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

public class ComposableIndexTemplate implements IndexTemplate {

private final Map<String, Object> indexTemplateMap;
private String name;

public ComposableIndexTemplate(final Map<String, Object> indexTemplateMap) {
this.indexTemplateMap = new HashMap<>(indexTemplateMap);
}

@Override
public void setTemplateName(final String name) {
this.name = name;

}

@Override
public void setIndexPatterns(final List<String> indexPatterns) {
indexTemplateMap.put("index_patterns", indexPatterns);
}

@Override
public void putCustomSetting(final String name, final Object value) {

}

@Override
public Optional<Long> getVersion() {
if(!indexTemplateMap.containsKey("version"))
return Optional.empty();
final Number version = (Number) indexTemplateMap.get("version");
return Optional.of(version.longValue());
}

public Map<String, Object> getIndexTemplateMap() {
return Collections.unmodifiableMap(indexTemplateMap);
}

public String getName() {
return name;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,25 +5,10 @@

package org.opensearch.dataprepper.plugins.sink.opensearch.index;

import com.fasterxml.jackson.databind.ObjectMapper;
import jakarta.json.stream.JsonParser;
import org.opensearch.client.json.JsonpDeserializer;
import org.opensearch.client.json.JsonpMapper;
import org.opensearch.client.json.ObjectBuilderDeserializer;
import org.opensearch.client.json.ObjectDeserializer;
import org.opensearch.client.opensearch.OpenSearchClient;
import org.opensearch.client.opensearch.indices.ExistsIndexTemplateRequest;
import org.opensearch.client.opensearch.indices.GetIndexTemplateRequest;
import org.opensearch.client.opensearch.indices.GetIndexTemplateResponse;
import org.opensearch.client.opensearch.indices.PutIndexTemplateRequest;
import org.opensearch.client.opensearch.indices.get_index_template.IndexTemplateItem;
import org.opensearch.client.opensearch.indices.put_index_template.IndexTemplateMapping;
import org.opensearch.client.transport.endpoints.BooleanResponse;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand All @@ -32,18 +17,24 @@
* A {@link TemplateStrategy} for the OpenSearch <a href="https://opensearch.org/docs/latest/im-plugin/index-templates/">index template</a>.
*/
class ComposableIndexTemplateStrategy implements TemplateStrategy {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private final OpenSearchClient openSearchClient;
private final IndexTemplateAPIWrapper<GetIndexTemplateResponse> indexTemplateAPIWrapper;

public ComposableIndexTemplateStrategy(final OpenSearchClient openSearchClient) {
this.openSearchClient = openSearchClient;
public ComposableIndexTemplateStrategy(final IndexTemplateAPIWrapper<GetIndexTemplateResponse> indexTemplateAPIWrapper) {
this.indexTemplateAPIWrapper = indexTemplateAPIWrapper;
}

@Override
public Optional<Long> getExistingTemplateVersion(final String templateName) throws IOException {
return getIndexTemplate(templateName)
.map(IndexTemplateItem::indexTemplate)
.map(indexTemplate -> indexTemplate.version());
return indexTemplateAPIWrapper.getTemplate(templateName)
.map(getIndexTemplateResponse -> {
final List<IndexTemplateItem> indexTemplateItems = getIndexTemplateResponse.indexTemplates();
if (indexTemplateItems.size() == 1) {
return indexTemplateItems.stream().findFirst().get().indexTemplate().version();
} else {
throw new RuntimeException(String.format("Found zero or multiple index templates result when querying for %s",
templateName));
}
});
}

@Override
Expand All @@ -53,104 +44,6 @@ public IndexTemplate createIndexTemplate(final Map<String, Object> templateMap)

@Override
public void createTemplate(final IndexTemplate indexTemplate) throws IOException {
if(!(indexTemplate instanceof ComposableIndexTemplate)) {
throw new IllegalArgumentException("Unexpected indexTemplate provided to createTemplate.");
}

final ComposableIndexTemplate composableIndexTemplate = (ComposableIndexTemplate) indexTemplate;

final Map<String, Object> templateMapping = composableIndexTemplate.indexTemplateMap;

final String indexTemplateString = OBJECT_MAPPER.writeValueAsString(templateMapping);

final ByteArrayInputStream byteIn = new ByteArrayInputStream(
indexTemplateString.getBytes(StandardCharsets.UTF_8));
final JsonpMapper mapper = openSearchClient._transport().jsonpMapper();
final JsonParser parser = mapper.jsonProvider().createParser(byteIn);

final PutIndexTemplateRequest putIndexTemplateRequest = PutIndexTemplateRequestDeserializer.getJsonpDeserializer(composableIndexTemplate.name)
.deserialize(parser, mapper);

openSearchClient.indices().putIndexTemplate(putIndexTemplateRequest);

}

private Optional<IndexTemplateItem> getIndexTemplate(final String indexTemplateName) throws IOException {
final ExistsIndexTemplateRequest existsRequest = new ExistsIndexTemplateRequest.Builder()
.name(indexTemplateName)
.build();
final BooleanResponse existsResponse = openSearchClient.indices().existsIndexTemplate(existsRequest);

if (!existsResponse.value()) {
return Optional.empty();
}

final GetIndexTemplateRequest getRequest = new GetIndexTemplateRequest.Builder()
.name(indexTemplateName)
.build();
final GetIndexTemplateResponse indexTemplateResponse = openSearchClient.indices().getIndexTemplate(getRequest);

final List<IndexTemplateItem> indexTemplateItems = indexTemplateResponse.indexTemplates();
if (indexTemplateItems.size() == 1) {
return indexTemplateItems.stream().findFirst();
} else {
throw new RuntimeException(String.format("Found zero or multiple index templates result when querying for %s",
indexTemplateName));
}
}

static class ComposableIndexTemplate implements IndexTemplate {

private final Map<String, Object> indexTemplateMap;
private String name;

private ComposableIndexTemplate(final Map<String, Object> indexTemplateMap) {
this.indexTemplateMap = new HashMap<>(indexTemplateMap);
}

@Override
public void setTemplateName(final String name) {
this.name = name;

}

@Override
public void setIndexPatterns(final List<String> indexPatterns) {
indexTemplateMap.put("index_patterns", indexPatterns);
}

@Override
public void putCustomSetting(final String name, final Object value) {

}

@Override
public Optional<Long> getVersion() {
if(!indexTemplateMap.containsKey("version"))
return Optional.empty();
final Number version = (Number) indexTemplateMap.get("version");
return Optional.of(version.longValue());
}
}

private static class PutIndexTemplateRequestDeserializer {
private static void setupPutIndexTemplateRequestDeserializer(final ObjectDeserializer<PutIndexTemplateRequest.Builder> objectDeserializer) {

objectDeserializer.add(PutIndexTemplateRequest.Builder::name, JsonpDeserializer.stringDeserializer(), "name");
objectDeserializer.add(PutIndexTemplateRequest.Builder::indexPatterns, JsonpDeserializer.arrayDeserializer(JsonpDeserializer.stringDeserializer()),
"index_patterns");
objectDeserializer.add(PutIndexTemplateRequest.Builder::version, JsonpDeserializer.longDeserializer(), "version");
objectDeserializer.add(PutIndexTemplateRequest.Builder::priority, JsonpDeserializer.integerDeserializer(), "priority");
objectDeserializer.add(PutIndexTemplateRequest.Builder::composedOf, JsonpDeserializer.arrayDeserializer(JsonpDeserializer.stringDeserializer()),
"composed_of");
objectDeserializer.add(PutIndexTemplateRequest.Builder::template, IndexTemplateMapping._DESERIALIZER, "template");
}

static JsonpDeserializer<PutIndexTemplateRequest> getJsonpDeserializer(final String name) {
return ObjectBuilderDeserializer
.lazy(
() -> new PutIndexTemplateRequest.Builder().name(name),
PutIndexTemplateRequestDeserializer::setupPutIndexTemplateRequestDeserializer);
}
indexTemplateAPIWrapper.putTemplate(indexTemplate);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package org.opensearch.dataprepper.plugins.sink.opensearch.index;

import com.fasterxml.jackson.databind.ObjectMapper;
import jakarta.json.stream.JsonParser;
import org.opensearch.client.json.JsonpDeserializer;
import org.opensearch.client.json.JsonpMapper;
import org.opensearch.client.json.ObjectBuilderDeserializer;
import org.opensearch.client.json.ObjectDeserializer;
import org.opensearch.client.opensearch.OpenSearchClient;
import org.opensearch.client.opensearch.indices.ExistsIndexTemplateRequest;
import org.opensearch.client.opensearch.indices.GetIndexTemplateRequest;
import org.opensearch.client.opensearch.indices.GetIndexTemplateResponse;
import org.opensearch.client.opensearch.indices.PutIndexTemplateRequest;
import org.opensearch.client.opensearch.indices.put_index_template.IndexTemplateMapping;
import org.opensearch.client.transport.endpoints.BooleanResponse;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Optional;

public class ComposableTemplateAPIWrapper implements IndexTemplateAPIWrapper<GetIndexTemplateResponse> {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private final OpenSearchClient openSearchClient;

public ComposableTemplateAPIWrapper(final OpenSearchClient openSearchClient) {
this.openSearchClient = openSearchClient;
}

@Override
public void putTemplate(final IndexTemplate indexTemplate) throws IOException {
if(!(indexTemplate instanceof ComposableIndexTemplate)) {
throw new IllegalArgumentException("Unexpected indexTemplate provided to createTemplate.");
}

final ComposableIndexTemplate composableIndexTemplate = (ComposableIndexTemplate) indexTemplate;
final String indexTemplateString = OBJECT_MAPPER.writeValueAsString(
composableIndexTemplate.getIndexTemplateMap());

final ByteArrayInputStream byteIn = new ByteArrayInputStream(
indexTemplateString.getBytes(StandardCharsets.UTF_8));
final JsonpMapper mapper = openSearchClient._transport().jsonpMapper();
final JsonParser parser = mapper.jsonProvider().createParser(byteIn);

final PutIndexTemplateRequest putIndexTemplateRequest = PutIndexTemplateRequestDeserializer
.getJsonpDeserializer(composableIndexTemplate.getName())
.deserialize(parser, mapper);

openSearchClient.indices().putIndexTemplate(putIndexTemplateRequest);
}

@Override
public Optional<GetIndexTemplateResponse> getTemplate(final String indexTemplateName) throws IOException {
final ExistsIndexTemplateRequest existsRequest = new ExistsIndexTemplateRequest.Builder()
.name(indexTemplateName)
.build();
final BooleanResponse existsResponse = openSearchClient.indices().existsIndexTemplate(existsRequest);

if (!existsResponse.value()) {
return Optional.empty();
}

final GetIndexTemplateRequest getRequest = new GetIndexTemplateRequest.Builder()
.name(indexTemplateName)
.build();
return Optional.of(openSearchClient.indices().getIndexTemplate(getRequest));
}

private static class PutIndexTemplateRequestDeserializer {
private static void setupPutIndexTemplateRequestDeserializer(final ObjectDeserializer<PutIndexTemplateRequest.Builder> objectDeserializer) {

objectDeserializer.add(PutIndexTemplateRequest.Builder::name, JsonpDeserializer.stringDeserializer(), "name");
objectDeserializer.add(PutIndexTemplateRequest.Builder::indexPatterns, JsonpDeserializer.arrayDeserializer(JsonpDeserializer.stringDeserializer()),
"index_patterns");
objectDeserializer.add(PutIndexTemplateRequest.Builder::version, JsonpDeserializer.longDeserializer(), "version");
objectDeserializer.add(PutIndexTemplateRequest.Builder::priority, JsonpDeserializer.integerDeserializer(), "priority");
objectDeserializer.add(PutIndexTemplateRequest.Builder::composedOf, JsonpDeserializer.arrayDeserializer(JsonpDeserializer.stringDeserializer()),
"composed_of");
objectDeserializer.add(PutIndexTemplateRequest.Builder::template, IndexTemplateMapping._DESERIALIZER, "template");
}

static JsonpDeserializer<PutIndexTemplateRequest> getJsonpDeserializer(final String name) {
return ObjectBuilderDeserializer
.lazy(
() -> new PutIndexTemplateRequest.Builder().name(name),
PutIndexTemplateRequestDeserializer::setupPutIndexTemplateRequestDeserializer);
}
}
}
Loading

0 comments on commit 8821005

Please sign in to comment.