Skip to content

Commit

Permalink
Support convert entry type on arrays (#4925)
Browse files Browse the repository at this point in the history
* Support convert entry type on arrays

Signed-off-by: Kondaka <krishkdk@amazon.com>

* Addressed review comments

Signed-off-by: Kondaka <krishkdk@amazon.com>

* Fixed failing tests

Signed-off-by: Kondaka <krishkdk@amazon.com>

* Addressed review comments

Signed-off-by: Kondaka <krishkdk@amazon.com>

---------

Signed-off-by: Kondaka <krishkdk@amazon.com>
  • Loading branch information
kkondaka committed Sep 27, 2024
1 parent a4460cc commit 69b1f9a
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,13 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.Stream;

@DataPrepperPlugin(name = "convert_entry_type", pluginType = Processor.class, pluginConfigurationType = ConvertEntryTypeProcessorConfig.class)
public class ConvertEntryTypeProcessor extends AbstractProcessor<Record<Event>, Record<Event>> {
Expand Down Expand Up @@ -80,7 +83,18 @@ public Collection<Record<Event>> doExecute(final Collection<Record<Event>> recor
if (keyVal != null) {
if (!nullValues.contains(keyVal.toString())) {
try {
recordEvent.put(key, converter.convert(keyVal, converterArguments));
if (keyVal instanceof List || keyVal.getClass().isArray()) {
Stream<Object> inputStream;
if (keyVal.getClass().isArray()) {
inputStream = Arrays.stream((Object[])keyVal);
} else {
inputStream = ((List<Object>)keyVal).stream();
}
List<?> replacementList = inputStream.map(i -> converter.convert(i, converterArguments)).collect(Collectors.toList());
recordEvent.put(key, replacementList);
} else {
recordEvent.put(key, converter.convert(keyVal, converterArguments));
}
} catch (final RuntimeException e) {
LOG.error(EVENT, "Unable to convert key: {} with value: {} to {}", key, keyVal, type, e);
recordEvent.getMetadata().addTags(tagsOnFailure);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@
import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException;
import org.opensearch.dataprepper.model.record.Record;

import java.util.ArrayList;
import java.math.BigDecimal;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.UUID;
import java.util.stream.Stream;

Expand Down Expand Up @@ -111,6 +113,45 @@ void testStringToIntegerConvertEntryTypeProcessor() {
assertThat(event.get(TEST_KEY, Integer.class), equalTo(testValue));
}

@Test
void testArrayOfStringsToIntegerConvertEntryTypeProcessor() {
when(mockConfig.getType()).thenReturn(TargetType.fromOptionValue("integer"));
typeConversionProcessor = new ConvertEntryTypeProcessor(pluginMetrics, mockConfig, expressionEvaluator);

Random random = new Random();
Integer testValue1 = random.nextInt();
Integer testValue2 = random.nextInt();
Integer testValue3 = random.nextInt();
String[] inputArray = {testValue1.toString(), testValue2.toString(), testValue3.toString()};
List<Integer> expectedResult = new ArrayList<>();
expectedResult.add(testValue1);
expectedResult.add(testValue2);
expectedResult.add(testValue3);
Event event = executeAndGetProcessedEvent(inputArray);
assertThat(event.get(TEST_KEY, List.class), equalTo(expectedResult));
}

@Test
void testArrayListOfStringsToIntegerConvertEntryTypeProcessor() {
when(mockConfig.getType()).thenReturn(TargetType.fromOptionValue("integer"));
typeConversionProcessor = new ConvertEntryTypeProcessor(pluginMetrics, mockConfig, expressionEvaluator);

Random random = new Random();
Integer testValue1 = random.nextInt();
Integer testValue2 = random.nextInt();
Integer testValue3 = random.nextInt();
List<String> inputList = new ArrayList<>();
inputList.add(testValue1.toString());
inputList.add(testValue2.toString());
inputList.add(testValue3.toString());
List<Integer> expectedResult = new ArrayList<>();
expectedResult.add(testValue1);
expectedResult.add(testValue2);
expectedResult.add(testValue3);
Event event = executeAndGetProcessedEvent(inputList);
assertThat(event.get(TEST_KEY, List.class), equalTo(expectedResult));
}

@Test
void testBigDecimalToIntegerConvertEntryTypeProcessor() {
BigDecimal testValue = new BigDecimal(Integer.MAX_VALUE);
Expand Down

0 comments on commit 69b1f9a

Please sign in to comment.