Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enhancements to list_to_map processor #4038

Merged
merged 6 commits into from
Feb 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 37 additions & 1 deletion data-prepper-plugins/mutate-event-processors/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -485,11 +485,47 @@ the last element will be kept:
}
```

If `use_source_key` and `extract_value` are true:
```yaml
- list_to_map:
source: "mylist"
use_source_key: true
extract_value: true
```
we will get:
```json
{
"mylist": [
{
"name": "a",
"value": "val-a"
},
{
"name": "b",
"value": "val-b1"
},
{
"name": "b",
"value": "val-b2"
},
{
"name": "c",
"value": "val-c"
}
],
"name": ["a", "b", "b", "c"],
"value": ["val-a", "val-b1", "val-b2", "val-c"]
}
```

### Configuration
* `key` - (required) - The key of the fields that will be extracted as keys in the generated map

* `source` - (required) - The key in the event with a list of objects that will be converted to map
* `target` - (optional) - The key of the field that will hold the generated map. If not specified, the generated map will be put in the root.
* `key` - (optional) - The key of the fields that will be extracted as keys in the generated map
* `use_source_key` - (optional) - A boolean value, default to false. If it's true, will use the source key as is in the result
* `value_key` - (optional) - If specified, the values of the given `value_key` in the objects of the source list will be extracted and put into the values of the generated map; otherwise, original objects in the source list will be put into the values of the generated map.
* `extract_value` - (optional) - A boolean value, default to false. It only applies when `use_source_key` is true. If it's true, the value corresponding to the source key will be extracted into the target; otherwise, original objects in the source list will be put into the target.
* `flatten` - (optional) - A boolean value, default to false. If it's false, the values in the generated map will be lists; if it's true, the lists will be flattened into single items.
* `flattened_element` - (optional) - Valid options are "first" and "last", default is "first". This specifies which element, first one or last one, to keep if `flatten` option is true.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,6 @@

package org.opensearch.dataprepper.plugins.processor.mutateevent;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.opensearch.dataprepper.expression.ExpressionEvaluator;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
Expand All @@ -21,7 +16,10 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

Expand All @@ -30,7 +28,6 @@
@DataPrepperPlugin(name = "list_to_map", pluginType = Processor.class, pluginConfigurationType = ListToMapProcessorConfig.class)
public class ListToMapProcessor extends AbstractProcessor<Record<Event>, Record<Event>> {

private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final Logger LOG = LoggerFactory.getLogger(ListToMapProcessor.class);
private final ListToMapProcessorConfig config;

Expand All @@ -52,81 +49,93 @@ public Collection<Record<Event>> doExecute(final Collection<Record<Event>> recor
continue;
}

final JsonNode sourceNode;
final List<Map<String, Object>> sourceList;
try {
sourceNode = getSourceNode(recordEvent);
sourceList = recordEvent.get(config.getSource(), List.class);
} catch (final Exception e) {
LOG.warn(EVENT, "Given source path [{}] is not valid on record [{}]",
config.getSource(), recordEvent, e);
recordEvent.getMetadata().addTags(config.getTagsOnFailure());
continue;
}

ObjectNode targetNode;
final Map<String, Object> targetMap;
try {
targetNode = constructTargetNode(sourceNode);
} catch (IllegalArgumentException e) {
targetMap = constructTargetMap(sourceList);
} catch (final IllegalArgumentException e) {
LOG.warn(EVENT, "Cannot find a list at the given source path [{}} on record [{}]",
config.getSource(), recordEvent, e);
recordEvent.getMetadata().addTags(config.getTagsOnFailure());
continue;
} catch (final Exception e) {
LOG.error(EVENT, "Error converting source list to map on record [{}]", recordEvent, e);
recordEvent.getMetadata().addTags(config.getTagsOnFailure());
continue;
}

try {
updateEvent(recordEvent, targetNode);
updateEvent(recordEvent, targetMap);
} catch (final Exception e) {
LOG.error(EVENT, "Error updating record [{}] after converting source list to map", recordEvent, e);
recordEvent.getMetadata().addTags(config.getTagsOnFailure());
}
}
return records;
}

private JsonNode getSourceNode(final Event recordEvent) {
final Object sourceObject = recordEvent.get(config.getSource(), Object.class);
return OBJECT_MAPPER.convertValue(sourceObject, JsonNode.class);
}

private ObjectNode constructTargetNode(JsonNode sourceNode) {
final ObjectNode targetNode = OBJECT_MAPPER.createObjectNode();
if (sourceNode.isArray()) {
for (final JsonNode itemNode : sourceNode) {
String itemKey = itemNode.get(config.getKey()).asText();

if (!config.getFlatten()) {
final ArrayNode itemValueNode;
if (!targetNode.has(itemKey)) {
itemValueNode = OBJECT_MAPPER.createArrayNode();
targetNode.set(itemKey, itemValueNode);
} else {
itemValueNode = (ArrayNode) targetNode.get(itemKey);
}
private Map<String, Object> constructTargetMap(final List<Map<String, Object>> sourceList) {
Map<String, Object> targetMap = new HashMap<>();
for (final Map<String, Object> itemMap : sourceList) {

if (config.getValueKey() == null) {
itemValueNode.add(itemNode);
} else {
itemValueNode.add(itemNode.get(config.getValueKey()));
if (config.getUseSourceKey()) {
if (config.getFlatten()) {
for (final String entryKey : itemMap.keySet()) {
setTargetMapFlattened(targetMap, itemMap, entryKey, entryKey, config.getExtractValue());
}
} else {
if (!targetNode.has(itemKey) || config.getFlattenedElement() == ListToMapProcessorConfig.FlattenedElement.LAST) {
if (config.getValueKey() == null) {
targetNode.set(itemKey, itemNode);
} else {
targetNode.set(itemKey, itemNode.get(config.getValueKey()));
}
for (final String entryKey : itemMap.keySet()) {
setTargetMapUnflattened(targetMap, itemMap, entryKey, entryKey, config.getExtractValue());
}
}
} else {
final String itemKey = (String) itemMap.get(config.getKey());
if (config.getFlatten()) {
setTargetMapFlattened(targetMap, itemMap, itemKey, config.getValueKey(), config.getValueKey() != null);
} else {
setTargetMapUnflattened(targetMap, itemMap, itemKey, config.getValueKey(), config.getValueKey() != null);
}
}
}
return targetMap;
}

private void setTargetMapUnflattened(
final Map<String, Object> targetMap, final Map<String, Object> itemMap, final String itemKey, final String itemValueKey, final boolean doExtractValue) {
if (!targetMap.containsKey(itemKey)) {
targetMap.put(itemKey, new ArrayList<>());
}

final List<Object> itemValue = (List<Object>) targetMap.get(itemKey);

if (doExtractValue) {
itemValue.add(itemMap.get(itemValueKey));
} else {
throw new IllegalArgumentException("Cannot find a list at the given source path [{}]" + config.getSource());
itemValue.add(itemMap);
}
return targetNode;
}

private void updateEvent(Event recordEvent, ObjectNode targetNode) {
final TypeReference<Map<String, Object>> mapTypeReference = new TypeReference<>() {};
final Map<String, Object> targetMap = OBJECT_MAPPER.convertValue(targetNode, mapTypeReference);
private void setTargetMapFlattened(
final Map<String, Object> targetMap, final Map<String, Object> itemMap, final String itemKey, final String itemValueKey, final boolean doExtractValue) {
if (!targetMap.containsKey(itemKey) || config.getFlattenedElement() == ListToMapProcessorConfig.FlattenedElement.LAST) {
if (doExtractValue) {
targetMap.put(itemKey, itemMap.get(itemValueKey));
} else {
targetMap.put(itemKey, itemMap);
}
}
}

private void updateEvent(final Event recordEvent, final Map<String, Object> targetMap) {
final boolean doWriteToRoot = Objects.isNull(config.getTarget());
if (doWriteToRoot) {
for (final Map.Entry<String, Object> entry : targetMap.entrySet()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import jakarta.validation.constraints.NotNull;

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -45,14 +46,18 @@ static FlattenedElement fromOptionValue(final String option) {
@JsonProperty("target")
private String target = null;

@NotEmpty
@NotNull
@JsonProperty("key")
private String key;

@JsonProperty("value_key")
private String valueKey = null;

@JsonProperty("use_source_key")
private boolean useSourceKey = false;

@JsonProperty("extract_value")
private boolean extractValue = false;

@NotNull
@JsonProperty("flatten")
private boolean flatten = false;
Expand All @@ -64,6 +69,9 @@ static FlattenedElement fromOptionValue(final String option) {
@JsonProperty("list_to_map_when")
private String listToMapWhen;

@JsonProperty("tags_on_failure")
private List<String> tagsOnFailure;

public String getSource() {
return source;
}
Expand All @@ -80,6 +88,14 @@ public String getValueKey() {
return valueKey;
}

public boolean getUseSourceKey() {
return useSourceKey;
}

public boolean getExtractValue() {
return extractValue;
}

public boolean getFlatten() {
return flatten;
}
Expand All @@ -89,4 +105,8 @@ public boolean getFlatten() {
public FlattenedElement getFlattenedElement() {
return flattenedElement;
}

public List<String> getTagsOnFailure() {
return tagsOnFailure;
}
}
Loading
Loading