diff --git a/data-prepper-plugins/grok-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/grok/GrokProcessor.java b/data-prepper-plugins/grok-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/grok/GrokProcessor.java index 5174eb9c61..f2ac0c2557 100644 --- a/data-prepper-plugins/grok-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/grok/GrokProcessor.java +++ b/data-prepper-plugins/grok-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/grok/GrokProcessor.java @@ -131,18 +131,13 @@ public Collection> doExecute(final Collection> recor runWithTimeout(() -> grokProcessingTime.record(() -> matchAndMerge(event))); } - } catch (TimeoutException e) { + } catch (final TimeoutException e) { + event.getMetadata().addTags(tagsOnMatchFailure); LOG.error(EVENT, "Matching on record [{}] took longer than [{}] and timed out", record.getData(), grokProcessorConfig.getTimeoutMillis()); grokProcessingTimeoutsCounter.increment(); - } catch (ExecutionException e) { - LOG.error(EVENT, "An exception occurred while matching on record [{}]", record.getData(), e); - grokProcessingErrorsCounter.increment(); - } catch (InterruptedException e) { - LOG.error(EVENT, "Matching on record [{}] was interrupted", record.getData(), e); - grokProcessingErrorsCounter.increment(); - } catch (RuntimeException e) { + } catch (final ExecutionException | InterruptedException | RuntimeException e) { event.getMetadata().addTags(tagsOnMatchFailure); - LOG.error(EVENT, "Unknown exception occurred when matching record [{}]", record.getData(), e); + LOG.error(EVENT, "An exception occurred when matching record [{}]", record.getData(), e); grokProcessingErrorsCounter.increment(); } } diff --git a/data-prepper-plugins/grok-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/grok/GrokProcessorTests.java b/data-prepper-plugins/grok-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/grok/GrokProcessorTests.java index e8f7e49982..45d61bf988 100644 --- a/data-prepper-plugins/grok-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/grok/GrokProcessorTests.java +++ b/data-prepper-plugins/grok-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/grok/GrokProcessorTests.java @@ -5,6 +5,8 @@ package org.opensearch.dataprepper.plugins.processor.grok; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import org.opensearch.dataprepper.expression.ExpressionEvaluator; import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.configuration.PluginSetting; @@ -41,6 +43,7 @@ import java.util.concurrent.TimeoutException; import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.hasItem; import static org.hamcrest.CoreMatchers.notNullValue; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -513,30 +516,92 @@ public void testNoCaptures() throws JsonProcessingException { verifyNoInteractions(grokProcessingErrorsCounter, grokProcessingMatchCounter, grokProcessingTimeoutsCounter); } - @Test - public void testNoCapturesWithTag() throws JsonProcessingException { - final String tagOnMatchFailure1 = UUID.randomUUID().toString(); - final String tagOnMatchFailure2 = UUID.randomUUID().toString(); - pluginSetting.getSettings().put(GrokProcessorConfig.TAGS_ON_MATCH_FAILURE, List.of(tagOnMatchFailure1, tagOnMatchFailure2)); - - grokProcessor = createObjectUnderTest(); - lenient().when(grokSecondMatch.match(messageInput)).thenReturn(secondMatch); - lenient().when(secondMatch.capture()).thenReturn(secondCapture); - - final Map testData = new HashMap(); - testData.put("message", messageInput); - final Record record = buildRecordWithEvent(testData); - - final List> grokkedRecords = (List>) grokProcessor.doExecute(Collections.singletonList(record)); - - assertThat(grokkedRecords.size(), equalTo(1)); - assertThat(grokkedRecords.get(0), notNullValue()); - assertRecordsAreEqual(grokkedRecords.get(0), record); - assertTrue(((Event)record.getData()).getMetadata().getTags().contains(tagOnMatchFailure1)); - assertTrue(((Event)record.getData()).getMetadata().getTags().contains(tagOnMatchFailure2)); - verify(grokProcessingMismatchCounter, times(1)).increment(); - verify(grokProcessingTime, times(1)).record(any(Runnable.class)); - verifyNoInteractions(grokProcessingErrorsCounter, grokProcessingMatchCounter, grokProcessingTimeoutsCounter); + @Nested + class WithTags { + private String tagOnMatchFailure1; + private String tagOnMatchFailure2; + + @BeforeEach + void setUp() { + tagOnMatchFailure1 = UUID.randomUUID().toString(); + tagOnMatchFailure2 = UUID.randomUUID().toString(); + pluginSetting.getSettings().put(GrokProcessorConfig.TAGS_ON_MATCH_FAILURE, List.of(tagOnMatchFailure1, tagOnMatchFailure2)); + } + + @Test + public void testNoCapturesWithTag() throws JsonProcessingException { + grokProcessor = createObjectUnderTest(); + lenient().when(grokSecondMatch.match(messageInput)).thenReturn(secondMatch); + lenient().when(secondMatch.capture()).thenReturn(secondCapture); + + final Map testData = new HashMap(); + testData.put("message", messageInput); + final Record record = buildRecordWithEvent(testData); + + final List> grokkedRecords = (List>) grokProcessor.doExecute(Collections.singletonList(record)); + + assertThat(grokkedRecords.size(), equalTo(1)); + assertThat(grokkedRecords.get(0), notNullValue()); + assertRecordsAreEqual(grokkedRecords.get(0), record); + assertTrue(((Event) record.getData()).getMetadata().getTags().contains(tagOnMatchFailure1)); + assertTrue(((Event) record.getData()).getMetadata().getTags().contains(tagOnMatchFailure2)); + verify(grokProcessingMismatchCounter, times(1)).increment(); + verify(grokProcessingTime, times(1)).record(any(Runnable.class)); + verifyNoInteractions(grokProcessingErrorsCounter, grokProcessingMatchCounter, grokProcessingTimeoutsCounter); + } + + @Test + public void timeout_exception_tags_the_event() throws JsonProcessingException, TimeoutException, ExecutionException, InterruptedException { + when(task.get(GrokProcessorConfig.DEFAULT_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)).thenThrow(TimeoutException.class); + + grokProcessor = createObjectUnderTest(); + + capture.put("key_capture_1", "value_capture_1"); + capture.put("key_capture_2", "value_capture_2"); + capture.put("key_capture_3", "value_capture_3"); + + final Map testData = new HashMap(); + testData.put("message", messageInput); + final Record record = buildRecordWithEvent(testData); + + final List> grokkedRecords = (List>) grokProcessor.doExecute(Collections.singletonList(record)); + + assertThat(grokkedRecords.size(), equalTo(1)); + assertThat(grokkedRecords.get(0), notNullValue()); + assertRecordsAreEqual(grokkedRecords.get(0), record); + assertThat(record.getData().getMetadata().getTags(), hasItem(tagOnMatchFailure1)); + assertThat(record.getData().getMetadata().getTags(), hasItem(tagOnMatchFailure2)); + verify(grokProcessingTimeoutsCounter, times(1)).increment(); + verify(grokProcessingTime, times(1)).record(any(Runnable.class)); + verifyNoInteractions(grokProcessingErrorsCounter, grokProcessingMismatchCounter); + } + + @ParameterizedTest + @ValueSource(classes = {ExecutionException.class, InterruptedException.class, RuntimeException.class}) + public void execution_exception_tags_the_event(Class exceptionClass) throws JsonProcessingException, TimeoutException, ExecutionException, InterruptedException { + when(task.get(GrokProcessorConfig.DEFAULT_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)).thenThrow(exceptionClass); + + grokProcessor = createObjectUnderTest(); + + capture.put("key_capture_1", "value_capture_1"); + capture.put("key_capture_2", "value_capture_2"); + capture.put("key_capture_3", "value_capture_3"); + + final Map testData = new HashMap(); + testData.put("message", messageInput); + final Record record = buildRecordWithEvent(testData); + + final List> grokkedRecords = (List>) grokProcessor.doExecute(Collections.singletonList(record)); + + assertThat(grokkedRecords.size(), equalTo(1)); + assertThat(grokkedRecords.get(0), notNullValue()); + assertRecordsAreEqual(grokkedRecords.get(0), record); + assertThat(record.getData().getMetadata().getTags(), hasItem(tagOnMatchFailure1)); + assertThat(record.getData().getMetadata().getTags(), hasItem(tagOnMatchFailure2)); + verify(grokProcessingErrorsCounter, times(1)).increment(); + verify(grokProcessingTime, times(1)).record(any(Runnable.class)); + verifyNoInteractions(grokProcessingTimeoutsCounter, grokProcessingMismatchCounter); + } } @Test