Skip to content

Commit

Permalink
Tags events that fail for all exceptions. Resolves opensearch-project…
Browse files Browse the repository at this point in the history
…#4031

Signed-off-by: David Venable <dlv@amazon.com>
  • Loading branch information
dlvenable committed Jan 30, 2024
1 parent a255822 commit a52adae
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -131,18 +131,13 @@ public Collection<Record<Event>> doExecute(final Collection<Record<Event>> recor
runWithTimeout(() -> grokProcessingTime.record(() -> matchAndMerge(event)));
}

} catch (TimeoutException e) {
} catch (final TimeoutException e) {
event.getMetadata().addTags(tagsOnMatchFailure);
LOG.error(EVENT, "Matching on record [{}] took longer than [{}] and timed out", record.getData(), grokProcessorConfig.getTimeoutMillis());
grokProcessingTimeoutsCounter.increment();
} catch (ExecutionException e) {
LOG.error(EVENT, "An exception occurred while matching on record [{}]", record.getData(), e);
grokProcessingErrorsCounter.increment();
} catch (InterruptedException e) {
LOG.error(EVENT, "Matching on record [{}] was interrupted", record.getData(), e);
grokProcessingErrorsCounter.increment();
} catch (RuntimeException e) {
} catch (final ExecutionException | InterruptedException | RuntimeException e) {
event.getMetadata().addTags(tagsOnMatchFailure);
LOG.error(EVENT, "Unknown exception occurred when matching record [{}]", record.getData(), e);
LOG.error(EVENT, "An exception occurred when matching record [{}]", record.getData(), e);
grokProcessingErrorsCounter.increment();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

package org.opensearch.dataprepper.plugins.processor.grok;

import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.opensearch.dataprepper.expression.ExpressionEvaluator;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
Expand Down Expand Up @@ -41,6 +43,7 @@
import java.util.concurrent.TimeoutException;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.hasItem;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertTrue;
Expand Down Expand Up @@ -513,30 +516,92 @@ public void testNoCaptures() throws JsonProcessingException {
verifyNoInteractions(grokProcessingErrorsCounter, grokProcessingMatchCounter, grokProcessingTimeoutsCounter);
}

@Test
public void testNoCapturesWithTag() throws JsonProcessingException {
final String tagOnMatchFailure1 = UUID.randomUUID().toString();
final String tagOnMatchFailure2 = UUID.randomUUID().toString();
pluginSetting.getSettings().put(GrokProcessorConfig.TAGS_ON_MATCH_FAILURE, List.of(tagOnMatchFailure1, tagOnMatchFailure2));

grokProcessor = createObjectUnderTest();
lenient().when(grokSecondMatch.match(messageInput)).thenReturn(secondMatch);
lenient().when(secondMatch.capture()).thenReturn(secondCapture);

final Map<String, Object> testData = new HashMap();
testData.put("message", messageInput);
final Record<Event> record = buildRecordWithEvent(testData);

final List<Record<Event>> grokkedRecords = (List<Record<Event>>) grokProcessor.doExecute(Collections.singletonList(record));

assertThat(grokkedRecords.size(), equalTo(1));
assertThat(grokkedRecords.get(0), notNullValue());
assertRecordsAreEqual(grokkedRecords.get(0), record);
assertTrue(((Event)record.getData()).getMetadata().getTags().contains(tagOnMatchFailure1));
assertTrue(((Event)record.getData()).getMetadata().getTags().contains(tagOnMatchFailure2));
verify(grokProcessingMismatchCounter, times(1)).increment();
verify(grokProcessingTime, times(1)).record(any(Runnable.class));
verifyNoInteractions(grokProcessingErrorsCounter, grokProcessingMatchCounter, grokProcessingTimeoutsCounter);
@Nested
class WithTags {
private String tagOnMatchFailure1;
private String tagOnMatchFailure2;

@BeforeEach
void setUp() {
tagOnMatchFailure1 = UUID.randomUUID().toString();
tagOnMatchFailure2 = UUID.randomUUID().toString();
pluginSetting.getSettings().put(GrokProcessorConfig.TAGS_ON_MATCH_FAILURE, List.of(tagOnMatchFailure1, tagOnMatchFailure2));
}

@Test
public void testNoCapturesWithTag() throws JsonProcessingException {
grokProcessor = createObjectUnderTest();
lenient().when(grokSecondMatch.match(messageInput)).thenReturn(secondMatch);
lenient().when(secondMatch.capture()).thenReturn(secondCapture);

final Map<String, Object> testData = new HashMap();
testData.put("message", messageInput);
final Record<Event> record = buildRecordWithEvent(testData);

final List<Record<Event>> grokkedRecords = (List<Record<Event>>) grokProcessor.doExecute(Collections.singletonList(record));

assertThat(grokkedRecords.size(), equalTo(1));
assertThat(grokkedRecords.get(0), notNullValue());
assertRecordsAreEqual(grokkedRecords.get(0), record);
assertTrue(((Event) record.getData()).getMetadata().getTags().contains(tagOnMatchFailure1));
assertTrue(((Event) record.getData()).getMetadata().getTags().contains(tagOnMatchFailure2));
verify(grokProcessingMismatchCounter, times(1)).increment();
verify(grokProcessingTime, times(1)).record(any(Runnable.class));
verifyNoInteractions(grokProcessingErrorsCounter, grokProcessingMatchCounter, grokProcessingTimeoutsCounter);
}

@Test
public void timeout_exception_tags_the_event() throws JsonProcessingException, TimeoutException, ExecutionException, InterruptedException {
when(task.get(GrokProcessorConfig.DEFAULT_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)).thenThrow(TimeoutException.class);

grokProcessor = createObjectUnderTest();

capture.put("key_capture_1", "value_capture_1");
capture.put("key_capture_2", "value_capture_2");
capture.put("key_capture_3", "value_capture_3");

final Map<String, Object> testData = new HashMap();
testData.put("message", messageInput);
final Record<Event> record = buildRecordWithEvent(testData);

final List<Record<Event>> grokkedRecords = (List<Record<Event>>) grokProcessor.doExecute(Collections.singletonList(record));

assertThat(grokkedRecords.size(), equalTo(1));
assertThat(grokkedRecords.get(0), notNullValue());
assertRecordsAreEqual(grokkedRecords.get(0), record);
assertThat(record.getData().getMetadata().getTags(), hasItem(tagOnMatchFailure1));
assertThat(record.getData().getMetadata().getTags(), hasItem(tagOnMatchFailure2));
verify(grokProcessingTimeoutsCounter, times(1)).increment();
verify(grokProcessingTime, times(1)).record(any(Runnable.class));
verifyNoInteractions(grokProcessingErrorsCounter, grokProcessingMismatchCounter);
}

@ParameterizedTest
@ValueSource(classes = {ExecutionException.class, InterruptedException.class, RuntimeException.class})
public void execution_exception_tags_the_event(Class<Exception> exceptionClass) throws JsonProcessingException, TimeoutException, ExecutionException, InterruptedException {
when(task.get(GrokProcessorConfig.DEFAULT_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)).thenThrow(exceptionClass);

grokProcessor = createObjectUnderTest();

capture.put("key_capture_1", "value_capture_1");
capture.put("key_capture_2", "value_capture_2");
capture.put("key_capture_3", "value_capture_3");

final Map<String, Object> testData = new HashMap();
testData.put("message", messageInput);
final Record<Event> record = buildRecordWithEvent(testData);

final List<Record<Event>> grokkedRecords = (List<Record<Event>>) grokProcessor.doExecute(Collections.singletonList(record));

assertThat(grokkedRecords.size(), equalTo(1));
assertThat(grokkedRecords.get(0), notNullValue());
assertRecordsAreEqual(grokkedRecords.get(0), record);
assertThat(record.getData().getMetadata().getTags(), hasItem(tagOnMatchFailure1));
assertThat(record.getData().getMetadata().getTags(), hasItem(tagOnMatchFailure2));
verify(grokProcessingErrorsCounter, times(1)).increment();
verify(grokProcessingTime, times(1)).record(any(Runnable.class));
verifyNoInteractions(grokProcessingTimeoutsCounter, grokProcessingMismatchCounter);
}
}

@Test
Expand Down

0 comments on commit a52adae

Please sign in to comment.