Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Default keys #3075

Merged
merged 16 commits into from
Aug 8, 2023
3 changes: 3 additions & 0 deletions data-prepper-plugins/key-value-processor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ When run, the processor will parse the message into the following output:
* `exclude_keys` - An array specifying the parsed keys which should not be added to the event. By default no keys will be excluded.
* Default: `[]`
* Example: `exclude_keys` is `["key2"]`. `key1=value1&key2=value2` will parse into `{"key1": "value1"}`
* `default_keys` - A map specifying default keys and their values, which are added to the event when those keys do not exist in the source field being parsed.
shenkw1 marked this conversation as resolved.
Show resolved Hide resolved
* Default: `{}`
* Example: `default_keys` is `{"defaultkey": "defaultvalue"}`. `key1=value1` will parse into `{"key1": "value1", "defaultkey": "defaultvalue"}`
* `key_value_delimiter_regex` - A regex specifying the delimiter between a key and a value. Special regex characters such as `[` and `]` must be escaped using `\\`.
* There is no default.
* Note: This cannot be defined at the same time as `value_split_characters`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ public class KeyValueProcessor extends AbstractProcessor<Record<Event>, Record<E
private final Pattern keyValueDelimiterPattern;
private final Set<String> includeKeysSet = new HashSet<String>();
private final Set<String> excludeKeysSet = new HashSet<String>();
private final HashMap<String, Object> defaultKeysMap = new HashMap<>();
private final Set<String> defaultKeysSet = new HashSet<String>();
private final String lowercaseKey = "lowercase";
private final String uppercaseKey = "uppercase";
private final String capitalizeKey = "capitalize";
Expand Down Expand Up @@ -102,9 +104,19 @@ public KeyValueProcessor(final PluginMetrics pluginMetrics, final KeyValueProces

includeKeysSet.addAll(keyValueProcessorConfig.getIncludeKeys());
excludeKeysSet.addAll(keyValueProcessorConfig.getExcludeKeys());
defaultKeysMap.putAll(keyValueProcessorConfig.getDefaultKeys());
if (!defaultKeysMap.isEmpty()) {
defaultKeysSet.addAll(defaultKeysMap.keySet());
}

validateKeySets(includeKeysSet, excludeKeysSet);
validateKeySets(includeKeysSet, excludeKeysSet, defaultKeysSet);

final Set<String> includeDefaultCheckSet = new HashSet<String>(includeKeysSet);
includeDefaultCheckSet.retainAll(defaultKeysSet);
if (!includeDefaultCheckSet.isEmpty()) {
shenkw1 marked this conversation as resolved.
Show resolved Hide resolved
includeKeysSet.removeAll(defaultKeysSet);
}
shenkw1 marked this conversation as resolved.
Show resolved Hide resolved

if (!validTransformOptionSet.contains(keyValueProcessorConfig.getTransformKey())) {
throw new IllegalArgumentException(String.format("The transform_key value: %s is not a valid option", keyValueProcessorConfig.getTransformKey()));
}
Expand Down Expand Up @@ -155,12 +167,19 @@ private boolean validateRegex(final String pattern)
return true;
}

private void validateKeySets(final Set<String> includeSet, final Set<String> excludeSet) {
Set<String> intersectionSet = new HashSet<String>(includeSet);
intersectionSet.retainAll(excludeSet);
if (!intersectionSet.isEmpty()) {
private void validateKeySets(final Set<String> includeSet, final Set<String> excludeSet, final Set<String> defaultSet) {
final Set<String> includeIntersectionSet = new HashSet<String>(includeSet);
final Set<String> defaultIntersectionSet = new HashSet<String>(defaultSet);

includeIntersectionSet.retainAll(excludeSet);
if (!includeIntersectionSet.isEmpty()) {
throw new IllegalArgumentException("Include keys and exclude keys set cannot have any overlap", null);
}

defaultIntersectionSet.retainAll(excludeSet);
if (!defaultIntersectionSet.isEmpty()) {
throw new IllegalArgumentException("Cannot exclude a default key!", null);
shenkw1 marked this conversation as resolved.
Show resolved Hide resolved
}
}

@Override
Expand All @@ -171,6 +190,9 @@ public Collection<Record<Event>> doExecute(final Collection<Record<Event>> recor

final String groupsRaw = recordEvent.get(keyValueProcessorConfig.getSource(), String.class);
final String[] groups = fieldDelimiterPattern.split(groupsRaw, 0);

parsedMap.putAll(defaultKeysMap);
shenkw1 marked this conversation as resolved.
Show resolved Hide resolved

for(final String group : groups) {
final String[] terms = keyValueDelimiterPattern.split(group, 2);
String key = terms[0];
Expand All @@ -186,6 +208,11 @@ public Collection<Record<Event>> doExecute(final Collection<Record<Event>> recor
continue;
}

if (!defaultKeysSet.isEmpty() && defaultKeysSet.contains(key)) {
shenkw1 marked this conversation as resolved.
Show resolved Hide resolved
LOG.debug(String.format("Skipping already included default key: '%s'", key));
continue;
}

if(keyValueProcessorConfig.getDeleteKeyRegex() != null && !Objects.equals(keyValueProcessorConfig.getDeleteKeyRegex(), "")) {
key = key.replaceAll(keyValueProcessorConfig.getDeleteKeyRegex(), "");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,15 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class KeyValueProcessorConfig {
static final String DEFAULT_SOURCE = "message";
static final String DEFAULT_DESTINATION = "parsed_message";
public static final String DEFAULT_FIELD_SPLIT_CHARACTERS = "&";
static final List<String> DEFAULT_INCLUDE_KEYS = new ArrayList<>();
static final List<String> DEFAULT_EXCLUDE_KEYS = new ArrayList<>();
static final Map<String, Object> DEFAULT_DEFAULT_KEYS = Map.of();
public static final String DEFAULT_VALUE_SPLIT_CHARACTERS = "=";
static final Object DEFAULT_NON_MATCH_VALUE = null;
static final String DEFAULT_PREFIX = "";
Expand Down Expand Up @@ -49,6 +51,10 @@ public class KeyValueProcessorConfig {
@NotNull
private List<String> excludeKeys = DEFAULT_EXCLUDE_KEYS;

@JsonProperty("default_keys")
@NotNull
private Map<String, Object> defaultKeys = DEFAULT_DEFAULT_KEYS;

@JsonProperty("key_value_delimiter_regex")
private String keyValueDelimiterRegex;

Expand Down Expand Up @@ -109,6 +115,10 @@ public List<String> getExcludeKeys() {
return excludeKeys;
}

/**
 * Returns the map bound to the {@code default_keys} configuration property.
 *
 * @return the configured default keys and their values
 */
public Map<String, Object> getDefaultKeys() {
    return this.defaultKeys;
}

/**
 * Returns the value bound to the {@code key_value_delimiter_regex} configuration property.
 *
 * @return the configured key/value delimiter regex
 */
public String getKeyValueDelimiterRegex() {
    return this.keyValueDelimiterRegex;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ void setup() {
lenient().when(mockConfig.getFieldSplitCharacters()).thenReturn(defaultConfig.getFieldSplitCharacters());
lenient().when(mockConfig.getIncludeKeys()).thenReturn(defaultConfig.getIncludeKeys());
lenient().when(mockConfig.getExcludeKeys()).thenReturn(defaultConfig.getExcludeKeys());
lenient().when(mockConfig.getDefaultKeys()).thenReturn(defaultConfig.getDefaultKeys());
lenient().when(mockConfig.getKeyValueDelimiterRegex()).thenReturn(defaultConfig.getKeyValueDelimiterRegex());
lenient().when(mockConfig.getValueSplitCharacters()).thenReturn(defaultConfig.getValueSplitCharacters());
lenient().when(mockConfig.getNonMatchValue()).thenReturn(defaultConfig.getNonMatchValue());
Expand Down Expand Up @@ -315,6 +316,65 @@ void testIncludeExcludeKeysOverlapKeyValueProcessor() {
assertThrows(IllegalArgumentException.class, () -> new KeyValueProcessor(pluginMetrics, mockConfig));
}

@Test
void testDefaultKeysKeyValueProcessor() {
shenkw1 marked this conversation as resolved.
Show resolved Hide resolved
final Map<String, Object> defaultMap = Map.of("dKey", "dValue");
when(mockConfig.getDefaultKeys()).thenReturn(defaultMap);
keyValueProcessor = new KeyValueProcessor(pluginMetrics, mockConfig);

final Record<Event> record = getMessage("key1=value1");
final List<Record<Event>> editedRecords = (List<Record<Event>>) keyValueProcessor.doExecute(Collections.singletonList(record));
final LinkedHashMap<String, Object> parsed_message = getLinkedHashMap(editedRecords);
shenkw1 marked this conversation as resolved.
Show resolved Hide resolved

assertThat(parsed_message.size(), equalTo(2));
assertThatKeyEquals(parsed_message, "key1", "value1");
assertThatKeyEquals(parsed_message, "dKey", "dValue");
}

@Test
void testDefaultIncludeKeysOverlapKeyValueProcessor() {
final Map<String, Object> defaultMap = Map.of("dKey", "dValue");
final List<String> includeKeys = List.of("dKey");
when(mockConfig.getDefaultKeys()).thenReturn(defaultMap);
when(mockConfig.getIncludeKeys()).thenReturn(includeKeys);
keyValueProcessor = new KeyValueProcessor(pluginMetrics, mockConfig);

final Record<Event> record = getMessage("key1=value1");
final List<Record<Event>> editedRecords = (List<Record<Event>>) keyValueProcessor.doExecute(Collections.singletonList(record));
final LinkedHashMap<String, Object> parsed_message = getLinkedHashMap(editedRecords);

assertThat(parsed_message.size(), equalTo(2));
assertThatKeyEquals(parsed_message, "key1", "value1");
shenkw1 marked this conversation as resolved.
Show resolved Hide resolved
assertThatKeyEquals(parsed_message, "dKey", "dValue");
}

@Test
void testDefaultKeysAlreadyInMessageKeyValueProcessor() {
final Map<String, Object> defaultMap = Map.of("dKey", "dValue");
when(mockConfig.getDefaultKeys()).thenReturn(defaultMap);
keyValueProcessor = new KeyValueProcessor(pluginMetrics, mockConfig);

final Record<Event> record = getMessage("key1=value1&dKey=dValue");
shenkw1 marked this conversation as resolved.
Show resolved Hide resolved
final List<Record<Event>> editedRecords = (List<Record<Event>>) keyValueProcessor.doExecute(Collections.singletonList(record));
final LinkedHashMap<String, Object> parsed_message = getLinkedHashMap(editedRecords);

assertThat(parsed_message.size(), equalTo(2));
assertThatKeyEquals(parsed_message, "key1", "value1");
assertThatKeyEquals(parsed_message, "dKey", "dValue");
}
shenkw1 marked this conversation as resolved.
Show resolved Hide resolved



/**
 * Constructing the processor fails when a key is configured both as a default key and
 * as an excluded key.
 */
@Test
void testDefaultExcludeKeysOverlapKeyValueProcessor() {
    // "dKey" appears in both the default keys and the exclude keys.
    when(mockConfig.getDefaultKeys()).thenReturn(Map.of("dKey", "dValue"));
    when(mockConfig.getExcludeKeys()).thenReturn(List.of("dKey"));

    assertThrows(
            IllegalArgumentException.class,
            () -> new KeyValueProcessor(pluginMetrics, mockConfig));
}

shenkw1 marked this conversation as resolved.
Show resolved Hide resolved
@Test
void testCustomPrefixKvProcessor() {
when(mockConfig.getPrefix()).thenReturn("TEST_");
Expand Down
Loading