Skip to content

Commit

Permalink
Add support for using expressions with formatString in JacksonEvent, …
Browse files Browse the repository at this point in the history
…use for index in OpenSearch sink (#3032)

Add support for using expressions with formatString in JacksonEvent, use for index in OpenSearch sink

Signed-off-by: Taylor Gray <tylgry@amazon.com>

---------

Signed-off-by: Taylor Gray <tylgry@amazon.com>
  • Loading branch information
graytaylor0 authored Jul 18, 2023
1 parent 37d05bb commit 8785bbf
Show file tree
Hide file tree
Showing 6 changed files with 76 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

package org.opensearch.dataprepper.model.event;

import org.opensearch.dataprepper.expression.ExpressionEvaluator;

import java.io.Serializable;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -113,6 +115,16 @@ public interface Event extends Serializable {
*/
String formatString(final String format);

/**
* Returns formatted parts of the input string replaced by their values in the event or the values from the result
* of a Data Prepper expression
* @param format input format
* @return returns a string with no formatted parts, returns null if no value is found
* @throws RuntimeException if the input string is not properly formatted
* @since 2.1
*/
String formatString(String format, ExpressionEvaluator expressionEvaluator);

/**
* Returns event handle
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import org.apache.commons.lang3.StringUtils;
import org.opensearch.dataprepper.expression.ExpressionEvaluator;
import org.opensearch.dataprepper.model.event.exceptions.EventKeyNotFoundException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -29,6 +30,7 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
Expand Down Expand Up @@ -298,6 +300,24 @@ public String getAsJsonString(final String key) {
*/
@Override
public String formatString(final String format) {
return formatStringInternal(format, null);
}

/**
* returns a string with formatted parts replaced by their values. The input
* string may contain parts with format "${.../.../...}" which are replaced
* by their value in the event. The input string may also contain Data Prepper expressions
* such as "${getMetadata(\"some_metadata_key\")}
*
* @param format string with format
* @throws RuntimeException if the format is incorrect or the value is not a string
*/
@Override
public String formatString(final String format, final ExpressionEvaluator expressionEvaluator) {
return formatStringInternal(format, expressionEvaluator);
}

private String formatStringInternal(final String format, final ExpressionEvaluator expressionEvaluator) {
int fromIndex = 0;
String result = "";
int position = 0;
Expand All @@ -308,11 +328,21 @@ public String formatString(final String format) {
}
result += format.substring(fromIndex, position);
String name = format.substring(position + 2, endPosition);
Object val = this.get(name, Object.class);
if (val == null) {
throw new EventKeyNotFoundException(String.format("The key %s could not be found in the Event when formatting", name));

Object val;
if (Objects.nonNull(expressionEvaluator) && expressionEvaluator.isValidExpressionStatement(name)) {
val = expressionEvaluator.evaluate(name, this);
} else {
val = this.get(name, Object.class);
if (val == null) {
throw new EventKeyNotFoundException(String.format("The key %s could not be found in the Event when formatting", name));
}
}


if (Objects.nonNull(val)) {
result += val.toString();
}
result += val.toString();
fromIndex = endPosition + 1;
}
if (fromIndex < format.length()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.opensearch.dataprepper.expression.ExpressionEvaluator;
import org.opensearch.dataprepper.model.event.exceptions.EventKeyNotFoundException;

import java.time.Instant;
Expand Down Expand Up @@ -528,6 +529,31 @@ public void testBuild_withFormatString(String formattedString, String finalStrin
assertThat(event.formatString(formattedString), is(equalTo(finalString)));
}

@Test
public void testBuild_withFormatStringWithExpressionEvaluator() {

final String jsonString = "{\"foo\": \"bar\", \"info\": {\"ids\": {\"id\":\"idx\"}}}";
final String expressionStatement = UUID.randomUUID().toString();
final String expressionEvaluationResult = UUID.randomUUID().toString();

final String formatString = "${foo}-${" + expressionStatement + "}-test-string";
final String finalString = "bar-" + expressionEvaluationResult + "-test-string";

event = JacksonEvent.builder()
.withEventType(eventType)
.withData(jsonString)
.getThis()
.build();

final ExpressionEvaluator expressionEvaluator = mock(ExpressionEvaluator.class);

when(expressionEvaluator.isValidExpressionStatement("foo")).thenReturn(false);
when(expressionEvaluator.isValidExpressionStatement(expressionStatement)).thenReturn(true);
when(expressionEvaluator.evaluate(expressionStatement, event)).thenReturn(expressionEvaluationResult);

assertThat(event.formatString(formatString, expressionEvaluator), is(equalTo(finalString)));
}

@ParameterizedTest
@CsvSource({
"test-${foo}-string, test-123-string",
Expand Down
11 changes: 2 additions & 9 deletions data-prepper-plugins/opensearch-source/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,20 +87,13 @@ opensearch-source-pipeline:
hosts: [ "https://source-cluster:9200" ]
username: "username"
password: "password"
processor:
- add_entries:
entries:
- key: "document_id"
value_expression: "getMetadata(\"document_id\")"
- key: "index"
value_expression: "getMetadata(\"index\")"
sink:
- opensearch:
hosts: [ "https://sink-cluster:9200" ]
username: "username"
password: "password"
document_id_field: "document_id"
index: "copied-${index}"
document_id_field: "getMetadata(\"opensearch-document_id\")"
index: "${getMetadata(\"opensearch-index\"}"
```

## Configuration
Expand Down
2 changes: 1 addition & 1 deletion data-prepper-plugins/opensearch/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ Default is null.
* This index name can be a plain string, such as `application`, `my-index-name`.
* This index name can also be a plain string plus a date-time pattern as a suffix, such as `application-%{yyyy.MM.dd}`, `my-index-name-%{yyyy.MM.dd.HH}`. When OpenSearch Sink is sending data to OpenSearch, the date-time pattern will be replaced by actual UTC time. The pattern supports all the symbols that represent one hour or above and are listed in [Java DateTimeFormatter](https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html). For example, with an index pattern like `my-index-name-%{yyyy.MM.dd}`, a new index is created for each day such as `my-index-name-2022.01.25`. For another example, with an index pattern like `my-index-name-%{yyyy.MM.dd.HH}`, a new index is created for each hour such as `my-index-name-2022.01.25.13`.
* This index name can also be a formatted string (with or without date-time pattern suffix), such as `my-${index}-name`. When OpenSearchSink is sending data to OpenSearch, the format portion "${index}" will be replaced by it's value in the event that is being processed. The format may also be like "${index1/index2/index3}" in which case the field "index1/index2/index3" is searched in the event and replaced by its value.

- Additionally, the formatted string can include expressions to evaluate to format the index name. For example, `my-${index}-${getMetadata(\"some_metadata_key\")}-name` will inject both the `index` value from the Event, as well as the value of `some_metadata_key` from the Event metadata to construct the index name.
- <a name="template_type"></a>`template_type`(optional): Defines what type of OpenSearch template to use. The available options are `v1` and `index-template`. The default value is `v1`, which uses the original OpenSearch templates available at the `_template` API endpoints. Select `index-template` to use composable index templates which are available at OpenSearch's `_index_template` endpoint.

- <a name="template_file"></a>`template_file`(optional): A json file path or AWS S3 URI to be read as index template for custom data ingestion. The json file content should be the json value of
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ public void doOutput(final Collection<Record<Event>> records) {
final Optional<String> routing = document.getRoutingField();
String indexName = configuredIndexAlias;
try {
indexName = indexManager.getIndexName(event.formatString(indexName));
indexName = indexManager.getIndexName(event.formatString(indexName, expressionEvaluator));
} catch (IOException | EventKeyNotFoundException e) {
LOG.error("There was an exception when constructing the index name. Check the dlq if configured to see details about the affected Event: {}", e.getMessage());
dynamicIndexDroppedEvents.increment();
Expand Down

0 comments on commit 8785bbf

Please sign in to comment.