Skip to content

Commit

Permalink
Add delete_source parameter to the csv processor (opensearch-project#…
Browse files Browse the repository at this point in the history
…4828)

Signed-off-by: Taylor Gray <tylgry@amazon.com>
  • Loading branch information
graytaylor0 authored Aug 12, 2024
1 parent aa50a1d commit 2f21a43
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,10 @@ public Collection<Record<Event>> doExecute(final Collection<Record<Event>> recor
if (thisEventHasHeaderSource && Boolean.TRUE.equals(config.isDeleteHeader())) {
event.delete(config.getColumnNamesSourceKey());
}

if (config.isDeleteSource()) {
event.delete(config.getSource());
}
} catch (final IOException e) {
csvInvalidEventsCounter.increment();
LOG.error(EVENT, "An exception occurred while reading event [{}]", event, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ public class CsvProcessorConfig {
"the processor should be applied to the event.")
private String csvWhen;

@JsonPropertyDescription("If true, the configured source field will be deleted after the CSV data is parsed into separate fields.")
@JsonProperty
private boolean deleteSource = false;

/**
* The field of the Event that contains the CSV data to be processed.
*
Expand Down Expand Up @@ -120,6 +124,8 @@ public List<String> getColumnNames() {

public String getCsvWhen() { return csvWhen; }

public Boolean isDeleteSource() { return deleteSource; }

@AssertTrue(message = "delimiter must be exactly one character.")
boolean isValidDelimiter() {
return delimiter.length() == 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ void setup() {
lenient().when(processorConfig.getQuoteCharacter()).thenReturn(defaultConfig.getQuoteCharacter());
lenient().when(processorConfig.getColumnNamesSourceKey()).thenReturn(defaultConfig.getColumnNamesSourceKey());
lenient().when(processorConfig.getColumnNames()).thenReturn(defaultConfig.getColumnNames());
lenient().when(processorConfig.isDeleteSource()).thenReturn(false);

lenient().when(pluginMetrics.counter(CsvProcessor.CSV_INVALID_EVENTS)).thenReturn(csvInvalidEventsCounter);

Expand All @@ -66,6 +67,24 @@ private CsvProcessor createObjectUnderTest() {
return new CsvProcessor(pluginMetrics, processorConfig, expressionEvaluator);
}

@Test
void delete_source_true_deletes_the_source() {
when(processorConfig.isDeleteSource()).thenReturn(true);

when(processorConfig.getSource()).thenReturn("different_source");

final Map<String, Object> eventData = new HashMap<>();
eventData.put("different_source","1,2,3");
final Record<Event> eventUnderTest = buildRecordWithEvent(eventData);

final List<Record<Event>> editedEvents = (List<Record<Event>>) csvProcessor.doExecute(Collections.singletonList(eventUnderTest));
final Event parsedEvent = getSingleEvent(editedEvents);
assertThat(parsedEvent.containsKey("different_source"), equalTo(false));
assertThatKeyEquals(parsedEvent, "column1", "1");
assertThatKeyEquals(parsedEvent, "column2", "2");
assertThatKeyEquals(parsedEvent, "column3", "3");
}

@Test
void do_nothing_when_source_is_null_value_or_does_not_exist_in_the_Event() {

Expand Down

0 comments on commit 2f21a43

Please sign in to comment.