diff --git a/data-prepper-plugins/drop-events-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/drop/HandleFailedEventsOption.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/HandleFailedEventsOption.java similarity index 61% rename from data-prepper-plugins/drop-events-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/drop/HandleFailedEventsOption.java rename to data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/HandleFailedEventsOption.java index b3f4532e65..6c310eb395 100644 --- a/data-prepper-plugins/drop-events-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/drop/HandleFailedEventsOption.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/HandleFailedEventsOption.java @@ -3,23 +3,19 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.dataprepper.plugins.processor.drop; +package org.opensearch.dataprepper.model.event; -import org.opensearch.dataprepper.model.event.Event; import com.fasterxml.jackson.annotation.JsonCreator; -import org.slf4j.Logger; import java.util.Arrays; import java.util.Map; import java.util.stream.Collectors; -import static org.opensearch.dataprepper.logging.DataPrepperMarkers.EVENT; - -enum HandleFailedEventsOption { - DROP("drop", true, false), - DROP_SILENTLY("drop_silently", true, true), - SKIP("skip", false, false), - SKIP_SILENTLY("skip_silently", false, true); +public enum HandleFailedEventsOption { + DROP("drop", true, true), + DROP_SILENTLY("drop_silently", true, false), + SKIP("skip", false, true), + SKIP_SILENTLY("skip_silently", false, false); private static final Map OPTIONS_MAP = Arrays.stream(HandleFailedEventsOption.values()) .collect(Collectors.toMap( @@ -37,13 +33,14 @@ enum HandleFailedEventsOption { this.isLogRequired = isLogRequired; } - public boolean isDropEventOption(final Event event, final Throwable cause, final Logger log) { - if (isLogRequired) { - log.warn(EVENT, "An exception occurred while processing when expression for event {}", event, cause); - } + public boolean shouldDropEvent() { return isDropEventOption; } + public boolean shouldLog() { + return isLogRequired; + } + @JsonCreator static HandleFailedEventsOption fromOptionValue(final String option) { return OPTIONS_MAP.get(option.toLowerCase()); diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/HandleFailedEventsOptionTest.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/HandleFailedEventsOptionTest.java new file mode 100644 index 0000000000..90a319ad24 --- /dev/null +++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/HandleFailedEventsOptionTest.java @@ -0,0 +1,33 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.model.event; + +import org.hamcrest.CoreMatchers; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; + +class HandleFailedEventsOptionTest { + @ParameterizedTest + @EnumSource(HandleFailedEventsOption.class) + void fromOptionValue(final HandleFailedEventsOption option) { + assertThat(HandleFailedEventsOption.fromOptionValue(option.name()), CoreMatchers.is(option)); + + if (option == HandleFailedEventsOption.SKIP || option == HandleFailedEventsOption.SKIP_SILENTLY) { + assertThat(option.shouldDropEvent(), equalTo(false)); + } else { + assertThat(option.shouldDropEvent(), equalTo(true)); + } + + if (option == HandleFailedEventsOption.SKIP_SILENTLY || option == HandleFailedEventsOption.DROP_SILENTLY) { + assertThat(option.shouldLog(), equalTo(false)); + } else { + assertThat(option.shouldLog(), equalTo(true)); + } + } +} diff --git a/data-prepper-plugins/drop-events-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/drop/DropEventProcessorConfig.java b/data-prepper-plugins/drop-events-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/drop/DropEventProcessorConfig.java index 587a482064..ecc2d2d065 100644 --- a/data-prepper-plugins/drop-events-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/drop/DropEventProcessorConfig.java +++ b/data-prepper-plugins/drop-events-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/drop/DropEventProcessorConfig.java @@ -7,6 +7,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import jakarta.validation.constraints.NotEmpty; +import org.opensearch.dataprepper.model.event.HandleFailedEventsOption; public class DropEventProcessorConfig { @JsonProperty("drop_when") diff --git a/data-prepper-plugins/drop-events-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/drop/DropEventsWhenCondition.java b/data-prepper-plugins/drop-events-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/drop/DropEventsWhenCondition.java index 7e2887e320..8d74ba6efb 100644 --- a/data-prepper-plugins/drop-events-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/drop/DropEventsWhenCondition.java +++ b/data-prepper-plugins/drop-events-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/drop/DropEventsWhenCondition.java @@ -7,11 +7,14 @@ import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.expression.ExpressionEvaluator; +import org.opensearch.dataprepper.model.event.HandleFailedEventsOption; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Objects; +import static org.opensearch.dataprepper.logging.DataPrepperMarkers.EVENT; + /** * @since 1.3 * @@ -57,7 +60,10 @@ public boolean isStatementFalseWith(final Event event) { try { return !expressionEvaluator.evaluateConditional(dropWhen, event); } catch (final Exception e) { - return handleFailedEventsSetting.isDropEventOption(event, e, LOG); + if (handleFailedEventsSetting.shouldLog()) { + LOG.warn(EVENT, "An exception occurred while processing when expression for event [{}]", event, e); + } + return handleFailedEventsSetting.shouldDropEvent(); } } diff --git a/data-prepper-plugins/drop-events-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/drop/DropEventProcessorConfigTest.java b/data-prepper-plugins/drop-events-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/drop/DropEventProcessorConfigTest.java index 5cbdf91aad..84669606a3 100644 --- a/data-prepper-plugins/drop-events-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/drop/DropEventProcessorConfigTest.java +++ b/data-prepper-plugins/drop-events-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/drop/DropEventProcessorConfigTest.java @@ -7,6 +7,7 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.opensearch.dataprepper.model.event.HandleFailedEventsOption; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.CoreMatchers.nullValue; diff --git a/data-prepper-plugins/drop-events-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/drop/DropEventsProcessorTests.java b/data-prepper-plugins/drop-events-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/drop/DropEventsProcessorTests.java index 3c5694f632..1af2146139 100644 --- a/data-prepper-plugins/drop-events-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/drop/DropEventsProcessorTests.java +++ b/data-prepper-plugins/drop-events-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/drop/DropEventsProcessorTests.java @@ -7,6 +7,7 @@ import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.HandleFailedEventsOption; import org.opensearch.dataprepper.model.event.JacksonEvent; import org.opensearch.dataprepper.model.record.Record; import org.junit.jupiter.api.BeforeEach; diff --git a/data-prepper-plugins/drop-events-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/drop/DropEventsWhenConditionBuilderTest.java b/data-prepper-plugins/drop-events-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/drop/DropEventsWhenConditionBuilderTest.java index 860b7042ae..a962e37b39 100644 --- a/data-prepper-plugins/drop-events-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/drop/DropEventsWhenConditionBuilderTest.java +++ b/data-prepper-plugins/drop-events-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/drop/DropEventsWhenConditionBuilderTest.java @@ -10,6 +10,7 @@ import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; import org.opensearch.dataprepper.expression.ExpressionEvaluator; +import org.opensearch.dataprepper.model.event.HandleFailedEventsOption; import java.util.UUID; diff --git a/data-prepper-plugins/drop-events-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/drop/DropEventsWhenConditionTest.java b/data-prepper-plugins/drop-events-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/drop/DropEventsWhenConditionTest.java index 98ec1536fc..8b309210db 100644 --- a/data-prepper-plugins/drop-events-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/drop/DropEventsWhenConditionTest.java +++ b/data-prepper-plugins/drop-events-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/drop/DropEventsWhenConditionTest.java @@ -15,6 +15,7 @@ import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; import org.opensearch.dataprepper.expression.ExpressionEvaluator; +import org.opensearch.dataprepper.model.event.HandleFailedEventsOption; import java.util.UUID; import java.util.stream.Stream; diff --git a/data-prepper-plugins/drop-events-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/drop/HandleFailedEventsOptionTest.java b/data-prepper-plugins/drop-events-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/drop/HandleFailedEventsOptionTest.java deleted file mode 100644 index 04b377ad2a..0000000000 --- a/data-prepper-plugins/drop-events-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/drop/HandleFailedEventsOptionTest.java +++ /dev/null @@ -1,20 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.dataprepper.plugins.processor.drop; - -import org.hamcrest.CoreMatchers; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.EnumSource; - -import static org.hamcrest.MatcherAssert.assertThat; - -class HandleFailedEventsOptionTest { - @ParameterizedTest - @EnumSource(HandleFailedEventsOption.class) - void fromOptionValue(final HandleFailedEventsOption option) { - assertThat(HandleFailedEventsOption.fromOptionValue(option.name()), CoreMatchers.is(option)); - } -} diff --git a/data-prepper-plugins/parse-json-processor/build.gradle b/data-prepper-plugins/parse-json-processor/build.gradle index 5125409731..91275eb799 100644 --- a/data-prepper-plugins/parse-json-processor/build.gradle +++ b/data-prepper-plugins/parse-json-processor/build.gradle @@ -10,6 +10,7 @@ plugins { dependencies { implementation project(':data-prepper-api') implementation project(':data-prepper-plugins:common') + implementation 'io.micrometer:micrometer-core' implementation 'com.fasterxml.jackson.core:jackson-databind' implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-ion' implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-xml' diff --git a/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/AbstractParseProcessor.java b/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/AbstractParseProcessor.java index ffb0855590..18acb3dfd8 100644 --- a/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/AbstractParseProcessor.java +++ b/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/AbstractParseProcessor.java @@ -11,9 +11,11 @@ import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.event.EventKey; import org.opensearch.dataprepper.model.event.EventKeyFactory; +import org.opensearch.dataprepper.model.event.HandleFailedEventsOption; import org.opensearch.dataprepper.model.event.JacksonEvent; import org.opensearch.dataprepper.model.processor.AbstractProcessor; import org.opensearch.dataprepper.model.record.Record; +import io.micrometer.core.instrument.Counter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -31,6 +33,7 @@ public abstract class AbstractParseProcessor extends AbstractProcessor, Record> { private static final Logger LOG = LoggerFactory.getLogger(AbstractParseProcessor.class); + private static final String PROCESSING_FAILURES = "processingFailures"; private final EventKey source; private final EventKey destination; @@ -40,6 +43,10 @@ public abstract class AbstractParseProcessor extends AbstractProcessor> doExecute(final Collection> recor if(deleteSourceRequested) { event.delete(this.source); } - } catch (final Exception e) { - LOG.error(EVENT, "An exception occurred while using the {} processor on Event [{}]", getProcessorName(), record.getData(), e); + } catch (Exception e) { + processingFailuresCounter.increment(); + if (handleFailedEventsOption.shouldLog()) { + LOG.error(EVENT, "An exception occurred while using the {} processor on Event [{}]", getProcessorName(), record.getData(), e); + } } } return records; diff --git a/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/CommonParseConfig.java b/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/CommonParseConfig.java index 5fd5050b3d..f10537bc7c 100644 --- a/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/CommonParseConfig.java +++ b/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/CommonParseConfig.java @@ -6,6 +6,7 @@ package org.opensearch.dataprepper.plugins.processor.parse; import java.util.List; +import org.opensearch.dataprepper.model.event.HandleFailedEventsOption; public interface CommonParseConfig { /** @@ -59,4 +60,10 @@ public interface CommonParseConfig { * Defaults to false. */ boolean isDeleteSourceRequested(); + + /** + * An optional setting used to determine how to handle parsing errors. Default is skip, which includes logging the error + * and passing the failed Event downstream to the next processor. + */ + HandleFailedEventsOption getHandleFailedEventsOption(); } diff --git a/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/ion/ParseIonProcessor.java b/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/ion/ParseIonProcessor.java index 9d2677e0be..4bfb88ded6 100644 --- a/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/ion/ParseIonProcessor.java +++ b/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/ion/ParseIonProcessor.java @@ -8,12 +8,14 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.dataformat.ion.IonObjectMapper; +import io.micrometer.core.instrument.Counter; import org.opensearch.dataprepper.expression.ExpressionEvaluator; import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.event.EventKeyFactory; +import org.opensearch.dataprepper.model.event.HandleFailedEventsOption; import org.opensearch.dataprepper.model.processor.Processor; import org.opensearch.dataprepper.plugins.processor.parse.AbstractParseProcessor; import org.slf4j.Logger; @@ -27,9 +29,14 @@ @DataPrepperPlugin(name = "parse_ion", pluginType = Processor.class, pluginConfigurationType = ParseIonProcessorConfig.class) public class ParseIonProcessor extends AbstractParseProcessor { private static final Logger LOG = LoggerFactory.getLogger(ParseIonProcessor.class); + private static final String PARSE_ERRORS = "parseErrors"; private final IonObjectMapper objectMapper = new IonObjectMapper(); + private final Counter parseErrorsCounter; + + private final HandleFailedEventsOption handleFailedEventsOption; + @DataPrepperPluginConstructor public ParseIonProcessor(final PluginMetrics pluginMetrics, final ParseIonProcessorConfig parseIonProcessorConfig, @@ -39,6 +46,9 @@ public ParseIonProcessor(final PluginMetrics pluginMetrics, // Convert Timestamps to ISO-8601 Z strings objectMapper.registerModule(new IonTimestampConverterModule()); + + handleFailedEventsOption = parseIonProcessorConfig.getHandleFailedEventsOption(); + parseErrorsCounter = pluginMetrics.counter(PARSE_ERRORS); } @Override @@ -47,10 +57,16 @@ protected Optional> readValue(String message, Event cont // We need to do a two-step process here, read the value in, then convert away any Ion types like Timestamp return Optional.of(objectMapper.convertValue(objectMapper.readValue(message, new TypeReference<>() {}), new TypeReference<>() {})); } catch (JsonProcessingException e) { - LOG.error(SENSITIVE, "An exception occurred due to invalid Ion while parsing [{}] due to {}", message, e.getMessage()); + if (handleFailedEventsOption.shouldLog()) { + LOG.error(SENSITIVE, "An exception occurred due to invalid Ion while parsing [{}] due to {}", message, e.getMessage()); + } + parseErrorsCounter.increment(); return Optional.empty(); } catch (Exception e) { - LOG.error(SENSITIVE, "An exception occurred while using the parse_ion processor while parsing [{}]", message, e); + if (handleFailedEventsOption.shouldLog()) { + LOG.error(SENSITIVE, "An exception occurred while using the parse_ion processor while parsing [{}]", message, e); + } + processingFailuresCounter.increment(); return Optional.empty(); } } diff --git a/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/ion/ParseIonProcessorConfig.java b/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/ion/ParseIonProcessorConfig.java index fcc2950477..6fad364e17 100644 --- a/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/ion/ParseIonProcessorConfig.java +++ b/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/ion/ParseIonProcessorConfig.java @@ -6,8 +6,11 @@ package org.opensearch.dataprepper.plugins.processor.parse.ion; import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonPropertyDescription; import jakarta.validation.constraints.AssertTrue; import jakarta.validation.constraints.NotBlank; +import jakarta.validation.constraints.NotNull; +import org.opensearch.dataprepper.model.event.HandleFailedEventsOption; import org.opensearch.dataprepper.plugins.processor.parse.CommonParseConfig; import java.util.List; @@ -38,6 +41,14 @@ public class ParseIonProcessorConfig implements CommonParseConfig { @JsonProperty private boolean deleteSource = false; + @JsonProperty("handle_failed_events") + @JsonPropertyDescription("Determines how to handle events with ION processing errors. Options include 'skip', " + + "which will log the error and send the Event downstream to the next processor, and 'skip_silently', " + + "which will send the Event downstream to the next processor without logging the error. " + + "Default is 'skip'.") + @NotNull + private HandleFailedEventsOption handleFailedEventsOption = HandleFailedEventsOption.SKIP; + @Override public String getSource() { return source; @@ -78,4 +89,22 @@ boolean isValidDestination() { public boolean isDeleteSourceRequested() { return deleteSource; } + + @Override + public HandleFailedEventsOption getHandleFailedEventsOption() { + return handleFailedEventsOption; + } + + @AssertTrue(message = "handled_failed_events must be set to 'skip' or 'skip_silently'.") + boolean isHandleFailedEventsOptionValid() { + if (handleFailedEventsOption == null) { + return true; + } + + if (handleFailedEventsOption.shouldDropEvent()) { + return false; + } + + return true; + } } diff --git a/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/json/ParseJsonProcessor.java b/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/json/ParseJsonProcessor.java index 637cbdea0d..407b59fab1 100644 --- a/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/json/ParseJsonProcessor.java +++ b/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/json/ParseJsonProcessor.java @@ -8,12 +8,14 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; +import io.micrometer.core.instrument.Counter; import org.opensearch.dataprepper.expression.ExpressionEvaluator; import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.event.EventKeyFactory; +import org.opensearch.dataprepper.model.event.HandleFailedEventsOption; import org.opensearch.dataprepper.model.processor.Processor; import org.opensearch.dataprepper.plugins.processor.parse.AbstractParseProcessor; import org.slf4j.Logger; @@ -27,15 +29,21 @@ @DataPrepperPlugin(name = "parse_json", pluginType = Processor.class, pluginConfigurationType = ParseJsonProcessorConfig.class) public class ParseJsonProcessor extends AbstractParseProcessor { private static final Logger LOG = LoggerFactory.getLogger(ParseJsonProcessor.class); + private static final String PARSE_ERRORS = "parseErrors"; private final ObjectMapper objectMapper = new ObjectMapper(); + private final HandleFailedEventsOption handleFailedEventsOption; + private final Counter parseErrorsCounter; + @DataPrepperPluginConstructor public ParseJsonProcessor(final PluginMetrics pluginMetrics, final ParseJsonProcessorConfig parseJsonProcessorConfig, final ExpressionEvaluator expressionEvaluator, final EventKeyFactory eventKeyFactory) { super(pluginMetrics, parseJsonProcessorConfig, expressionEvaluator, eventKeyFactory); + this.handleFailedEventsOption = parseJsonProcessorConfig.getHandleFailedEventsOption(); + parseErrorsCounter = pluginMetrics.counter(PARSE_ERRORS); } @Override @@ -43,10 +51,16 @@ protected Optional> readValue(String message, Event cont try { return Optional.of(objectMapper.readValue(message, new TypeReference<>() {})); } catch (JsonProcessingException e) { - LOG.error(SENSITIVE, "An exception occurred due to invalid JSON while parsing [{}] due to {}", message, e.getMessage()); + if (handleFailedEventsOption.shouldLog()) { + LOG.error(SENSITIVE, "An exception occurred due to invalid JSON while parsing [{}] due to {}", message, e.getMessage()); + } + parseErrorsCounter.increment(); return Optional.empty(); } catch (Exception e) { - LOG.error(SENSITIVE, "An exception occurred while using the parse_json processor while parsing [{}]", message, e); + if (handleFailedEventsOption.shouldLog()) { + LOG.error(SENSITIVE, "An exception occurred while using the parse_json processor while parsing [{}]", message, e); + } + processingFailuresCounter.increment(); return Optional.empty(); } } diff --git a/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/json/ParseJsonProcessorConfig.java b/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/json/ParseJsonProcessorConfig.java index 49ff2a5969..b6a1b14a23 100644 --- a/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/json/ParseJsonProcessorConfig.java +++ b/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/json/ParseJsonProcessorConfig.java @@ -6,8 +6,11 @@ package org.opensearch.dataprepper.plugins.processor.parse.json; import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonPropertyDescription; import jakarta.validation.constraints.AssertTrue; import jakarta.validation.constraints.NotBlank; +import jakarta.validation.constraints.NotNull; +import org.opensearch.dataprepper.model.event.HandleFailedEventsOption; import org.opensearch.dataprepper.plugins.processor.parse.CommonParseConfig; import java.util.Objects; @@ -38,6 +41,14 @@ public class ParseJsonProcessorConfig implements CommonParseConfig { @JsonProperty private boolean deleteSource = false; + @JsonProperty("handle_failed_events") + @JsonPropertyDescription("Determines how to handle events with JSON processing errors. Options include 'skip', " + + "which will log the error and send the Event downstream to the next processor, and 'skip_silently', " + + "which will send the Event downstream to the next processor without logging the error. " + + "Default is 'skip'.") + @NotNull + private HandleFailedEventsOption handleFailedEventsOption = HandleFailedEventsOption.SKIP; + @Override public String getSource() { return source; @@ -71,6 +82,11 @@ public boolean isDeleteSourceRequested() { return deleteSource; } + @Override + public HandleFailedEventsOption getHandleFailedEventsOption() { + return handleFailedEventsOption; + } + @AssertTrue(message = "destination cannot be empty, whitespace, or a front slash (/)") boolean isValidDestination() { if (Objects.isNull(destination)) return true; @@ -78,4 +94,17 @@ boolean isValidDestination() { final String trimmedDestination = destination.trim(); return !trimmedDestination.isEmpty() && !(trimmedDestination.equals("/")); } + + @AssertTrue(message = "handled_failed_events must be set to 'skip' or 'skip_silently'.") + boolean isHandleFailedEventsOptionValid() { + if (handleFailedEventsOption == null) { + return true; + } + + if (handleFailedEventsOption.shouldDropEvent()) { + return false; + } + + return true; + } } diff --git a/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/xml/ParseXmlProcessor.java b/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/xml/ParseXmlProcessor.java index 984a49964a..9dced55355 100644 --- a/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/xml/ParseXmlProcessor.java +++ b/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/xml/ParseXmlProcessor.java @@ -3,12 +3,14 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.dataformat.xml.XmlMapper; +import io.micrometer.core.instrument.Counter; import org.opensearch.dataprepper.expression.ExpressionEvaluator; import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.event.EventKeyFactory; +import org.opensearch.dataprepper.model.event.HandleFailedEventsOption; import org.opensearch.dataprepper.model.processor.Processor; import org.opensearch.dataprepper.plugins.processor.parse.AbstractParseProcessor; import org.slf4j.Logger; @@ -22,8 +24,12 @@ @DataPrepperPlugin(name = "parse_xml", pluginType =Processor.class, pluginConfigurationType =ParseXmlProcessorConfig.class) public class ParseXmlProcessor extends AbstractParseProcessor { private static final Logger LOG = LoggerFactory.getLogger(ParseXmlProcessor.class); + private static final String PARSE_ERRORS = "parseErrors"; private final XmlMapper xmlMapper = new XmlMapper(); + private final Counter parseErrorsCounter; + + private final HandleFailedEventsOption handleFailedEventsOption; @DataPrepperPluginConstructor public ParseXmlProcessor(final PluginMetrics pluginMetrics, @@ -31,6 +37,9 @@ public ParseXmlProcessor(final PluginMetrics pluginMetrics, final ExpressionEvaluator expressionEvaluator, final EventKeyFactory eventKeyFactory) { super(pluginMetrics, parseXmlProcessorConfig, expressionEvaluator, eventKeyFactory); + + handleFailedEventsOption = parseXmlProcessorConfig.getHandleFailedEventsOption(); + parseErrorsCounter = pluginMetrics.counter(PARSE_ERRORS); } @Override @@ -38,10 +47,16 @@ protected Optional> readValue(final String message, fina try { return Optional.of(xmlMapper.readValue(message, new TypeReference<>() {})); } catch (JsonProcessingException e) { - LOG.error(SENSITIVE, "An exception occurred due to invalid XML while parsing [{}] due to {}", message, e.getMessage()); + if (handleFailedEventsOption.shouldLog()) { + LOG.error(SENSITIVE, "An exception occurred due to invalid XML while parsing [{}] due to {}", message, e.getMessage()); + } + parseErrorsCounter.increment(); return Optional.empty(); } catch (Exception e) { - LOG.error(SENSITIVE, "An exception occurred while using the parse_xml processor while parsing [{}]", message, e); + if (handleFailedEventsOption.shouldLog()) { + LOG.error(SENSITIVE, "An exception occurred while using the parse_xml processor while parsing [{}]", message, e); + } + processingFailuresCounter.increment(); return Optional.empty(); } } diff --git a/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/xml/ParseXmlProcessorConfig.java b/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/xml/ParseXmlProcessorConfig.java index c90173dc43..f84f2de4b6 100644 --- a/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/xml/ParseXmlProcessorConfig.java +++ b/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/xml/ParseXmlProcessorConfig.java @@ -1,8 +1,11 @@ package org.opensearch.dataprepper.plugins.processor.parse.xml; import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonPropertyDescription; import jakarta.validation.constraints.AssertTrue; import jakarta.validation.constraints.NotBlank; +import jakarta.validation.constraints.NotNull; +import org.opensearch.dataprepper.model.event.HandleFailedEventsOption; import org.opensearch.dataprepper.plugins.processor.parse.CommonParseConfig; import java.util.List; @@ -33,6 +36,14 @@ public class ParseXmlProcessorConfig implements CommonParseConfig { @JsonProperty private boolean deleteSource = false; + @JsonProperty("handle_failed_events") + @JsonPropertyDescription("Determines how to handle events with XML processing errors. Options include 'skip', " + + "which will log the error and send the Event downstream to the next processor, and 'skip_silently', " + + "which will send the Event downstream to the next processor without logging the error. " + + "Default is 'skip'.") + @NotNull + private HandleFailedEventsOption handleFailedEventsOption = HandleFailedEventsOption.SKIP; + @Override public String getSource() { return source; @@ -75,4 +86,22 @@ boolean isValidDestination() { public boolean isDeleteSourceRequested() { return deleteSource; } + + @Override + public HandleFailedEventsOption getHandleFailedEventsOption() { + return handleFailedEventsOption; + } + + @AssertTrue(message = "handled_failed_events must be set to 'skip' or 'skip_silently'.") + boolean isHandleFailedEventsOptionValid() { + if (handleFailedEventsOption == null) { + return true; + } + + if (handleFailedEventsOption.shouldDropEvent()) { + return false; + } + + return true; + } } diff --git a/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/ion/ParseIonProcessorConfigTest.java b/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/ion/ParseIonProcessorConfigTest.java index 8c47650c05..1768b701bb 100644 --- a/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/ion/ParseIonProcessorConfigTest.java +++ b/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/ion/ParseIonProcessorConfigTest.java @@ -7,6 +7,7 @@ import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; +import org.opensearch.dataprepper.model.event.HandleFailedEventsOption; import java.util.List; @@ -29,6 +30,8 @@ public void test_when_defaultParseIonProcessorConfig_then_returns_default_values assertThat(objectUnderTest.getPointer(), equalTo(null)); assertThat(objectUnderTest.getTagsOnFailure(), equalTo(null)); assertThat(objectUnderTest.getOverwriteIfDestinationExists(), equalTo(true)); + assertThat(objectUnderTest.getHandleFailedEventsOption(), equalTo(HandleFailedEventsOption.SKIP)); + assertThat(objectUnderTest.isHandleFailedEventsOptionValid(), equalTo(true)); } @Nested @@ -61,5 +64,19 @@ void test_when_destinationIsWhiteSpaceOrFrontSlash_then_isValidDestinationFalse( setField(ParseIonProcessorConfig.class, config, "deleteSource", true); assertThat(config.isDeleteSourceRequested(), equalTo(true)); } + + @Test + void isHandleFailedEventsOptionValid_returns_false_with_drop_option() throws NoSuchFieldException, IllegalAccessException { + setField(ParseIonProcessorConfig.class, config, "handleFailedEventsOption", HandleFailedEventsOption.DROP); + + assertThat(config.isHandleFailedEventsOptionValid(), equalTo(false)); + } + + @Test + void isHandleFailedEventsOptionValid_returns_true_with_null_handle_events() throws NoSuchFieldException, IllegalAccessException { + setField(ParseIonProcessorConfig.class, config, "handleFailedEventsOption", null); + + assertThat(config.isHandleFailedEventsOptionValid(), equalTo(true)); + } } } diff --git a/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/ion/ParseIonProcessorTest.java b/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/ion/ParseIonProcessorTest.java index d7eb14c28a..8bd63c3eec 100644 --- a/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/ion/ParseIonProcessorTest.java +++ b/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/ion/ParseIonProcessorTest.java @@ -5,6 +5,7 @@ package org.opensearch.dataprepper.plugins.processor.parse.ion; +import io.micrometer.core.instrument.Counter; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -16,6 +17,8 @@ import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.when; @ExtendWith(MockitoExtension.class) @@ -33,6 +36,12 @@ public void setup() { when(processorConfig.getPointer()).thenReturn(defaultConfig.getPointer()); when(processorConfig.getParseWhen()).thenReturn(null); when(processorConfig.getOverwriteIfDestinationExists()).thenReturn(true); + + when(pluginMetrics.counter("recordsIn")).thenReturn(mock(Counter.class)); + when(pluginMetrics.counter("recordsOut")).thenReturn(mock(Counter.class)); + when(pluginMetrics.counter("processingFailures")).thenReturn(this.processingFailuresCounter); + when(pluginMetrics.counter("parseErrors")).thenReturn(this.parseErrorsCounter); + when(processorConfig.getHandleFailedEventsOption()).thenReturn(handleFailedEventsOption); } @Override @@ -53,6 +62,10 @@ void test_when_using_ion_features_then_processorParsesCorrectly() { assertThat(parsedEvent.get("symbol", String.class), equalTo("SYMBOL")); assertThat(parsedEvent.get("timestamp", String.class), equalTo("2023-11-30T21:05:23.383Z")); assertThat(parsedEvent.get("attribute", Double.class), equalTo(100.0)); + + verifyNoInteractions(processingFailuresCounter); + verifyNoInteractions(parseErrorsCounter); + verifyNoInteractions(handleFailedEventsOption); } @Test @@ -68,5 +81,9 @@ void test_when_deleteSourceFlagEnabled() { assertThat(parsedEvent.get("symbol", String.class), equalTo("SYMBOL")); assertThat(parsedEvent.get("timestamp", String.class), equalTo("2023-11-30T21:05:23.383Z")); assertThat(parsedEvent.get("attribute", Double.class), equalTo(100.0)); + + verifyNoInteractions(processingFailuresCounter); + verifyNoInteractions(parseErrorsCounter); + verifyNoInteractions(handleFailedEventsOption); } } diff --git a/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/json/ParseJsonProcessorConfigTest.java b/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/json/ParseJsonProcessorConfigTest.java index aa138a0e7e..8d27120b36 100644 --- a/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/json/ParseJsonProcessorConfigTest.java +++ b/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/json/ParseJsonProcessorConfigTest.java @@ -7,13 +7,14 @@ import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; +import org.opensearch.dataprepper.model.event.HandleFailedEventsOption; + +import java.util.List; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; import static org.opensearch.dataprepper.test.helper.ReflectivelySetField.setField; -import java.util.List; - public class ParseJsonProcessorConfigTest { private ParseJsonProcessorConfig createObjectUnderTest() { @@ -30,6 +31,8 @@ public void test_when_defaultParseJsonProcessorConfig_then_returns_default_value assertThat(objectUnderTest.getTagsOnFailure(), equalTo(null)); assertThat(objectUnderTest.getOverwriteIfDestinationExists(), equalTo(true)); assertThat(objectUnderTest.isDeleteSourceRequested(), equalTo(false)); + assertThat(objectUnderTest.getHandleFailedEventsOption(), equalTo(HandleFailedEventsOption.SKIP)); + assertThat(objectUnderTest.isHandleFailedEventsOptionValid(), equalTo(true)); } @Nested @@ -62,5 +65,19 @@ void test_when_destinationIsWhiteSpaceOrFrontSlash_then_isValidDestinationFalse( setField(ParseJsonProcessorConfig.class, config, "deleteSource", true); assertThat(config.isDeleteSourceRequested(), equalTo(true)); } + + @Test + void isHandleFailedEventsOptionValid_returns_false_with_drop_option() throws NoSuchFieldException, IllegalAccessException { + setField(ParseJsonProcessorConfig.class, config, "handleFailedEventsOption", HandleFailedEventsOption.DROP); + + assertThat(config.isHandleFailedEventsOptionValid(), equalTo(false)); + } + + @Test + void isHandleFailedEventsOptionValid_returns_true_with_null_handle_events() throws NoSuchFieldException, IllegalAccessException { + setField(ParseJsonProcessorConfig.class, config, "handleFailedEventsOption", null); + + assertThat(config.isHandleFailedEventsOptionValid(), equalTo(true)); + } } } diff --git a/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/json/ParseJsonProcessorTest.java b/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/json/ParseJsonProcessorTest.java index cf71f2251f..9aac54b23f 100644 --- a/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/json/ParseJsonProcessorTest.java +++ b/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/json/ParseJsonProcessorTest.java @@ -5,6 +5,7 @@ package org.opensearch.dataprepper.plugins.processor.parse.json; +import io.micrometer.core.instrument.Counter; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -18,6 +19,7 @@ import org.opensearch.dataprepper.model.event.EventBuilder; import org.opensearch.dataprepper.model.event.EventFactory; import org.opensearch.dataprepper.model.event.EventKeyFactory; +import org.opensearch.dataprepper.model.event.HandleFailedEventsOption; import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.plugins.processor.parse.AbstractParseProcessor; import org.opensearch.dataprepper.plugins.processor.parse.CommonParseConfig; @@ -34,6 +36,9 @@ import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.when; @ExtendWith(MockitoExtension.class) @@ -51,6 +56,15 @@ public class ParseJsonProcessorTest { @Mock protected ExpressionEvaluator expressionEvaluator; + @Mock + protected HandleFailedEventsOption handleFailedEventsOption; + + @Mock + protected Counter processingFailuresCounter; + + @Mock + protected Counter parseErrorsCounter; + protected AbstractParseProcessor parseJsonProcessor; private final EventFactory testEventFactory = TestEventFactory.getTestEventFactory(); protected final EventKeyFactory testEventKeyFactory = TestEventKeyFactory.getTestEventFactory(); @@ -64,6 +78,12 @@ public void setup() { when(processorConfig.getPointer()).thenReturn(defaultConfig.getPointer()); when(processorConfig.getParseWhen()).thenReturn(null); when(processorConfig.getOverwriteIfDestinationExists()).thenReturn(true); + + when(pluginMetrics.counter("recordsIn")).thenReturn(mock(Counter.class)); + when(pluginMetrics.counter("recordsOut")).thenReturn(mock(Counter.class)); + when(pluginMetrics.counter("processingFailures")).thenReturn(processingFailuresCounter); + when(pluginMetrics.counter("parseErrors")).thenReturn(parseErrorsCounter); + when(processorConfig.getHandleFailedEventsOption()).thenReturn(handleFailedEventsOption); } protected AbstractParseProcessor createObjectUnderTest() { @@ -86,6 +106,10 @@ void test_when_differentSourceAndDestination_then_processorParsesCorrectly() { assertThat(parsedEvent.containsKey(destination), equalTo(true)); assertThatFirstMapIsSubsetOfSecondMap(data, parsedEvent.get(destination, Map.class)); + + verifyNoInteractions(processingFailuresCounter); + verifyNoInteractions(parseErrorsCounter); + verifyNoInteractions(handleFailedEventsOption); } @Test @@ -104,6 +128,10 @@ void test_when_dataFieldEqualToRootField_then_overwritesOriginalFields() { assertThatKeyEquals(parsedEvent, source, "value_that_will_overwrite_source"); assertThatKeyEquals(parsedEvent, "key", "value"); + + verifyNoInteractions(processingFailuresCounter); + verifyNoInteractions(parseErrorsCounter); + verifyNoInteractions(handleFailedEventsOption); } @Test @@ -119,6 +147,10 @@ void test_when_dataFieldEqualToRootField_then_notOverwritesOriginalFields() { final Event parsedEvent = createAndParseMessageEvent(serializedMessage); assertThatKeyEquals(parsedEvent, source, "{\"root_source\":\"value_that_will_not_be_overwritten\"}"); + + verifyNoInteractions(processingFailuresCounter); + verifyNoInteractions(parseErrorsCounter); + verifyNoInteractions(handleFailedEventsOption); } @Test @@ -136,6 +168,10 @@ void test_when_dataFieldEqualToDestinationField_then_notOverwritesOriginalFields assertThatKeyEquals(parsedEvent, source, "{\"key\":\"value\"}"); assertThat(parsedEvent.containsKey("key"), equalTo(false)); + + verifyNoInteractions(processingFailuresCounter); + verifyNoInteractions(parseErrorsCounter); + verifyNoInteractions(handleFailedEventsOption); } @Test @@ -149,6 +185,8 @@ void test_when_valueIsEmpty_then_notParsed() { assertThatKeyEquals(parsedEvent, processorConfig.getSource(), serializedMessage); assertThat(parsedEvent.toMap().size(), equalTo(1)); + + verify(parseErrorsCounter).increment(); } @Test @@ -164,6 +202,10 @@ void test_when_deeplyNestedFieldInRoot_then_canReachDeepestLayer() { assertThatKeyEquals(parsedEvent, DEEPLY_NESTED_KEY_NAME, messageMap.get(DEEPLY_NESTED_KEY_NAME)); final String jsonPointerToValue = constructDeeplyNestedJsonPointer(numberOfLayers); assertThat(parsedEvent.get(jsonPointerToValue, String.class), equalTo("value")); + + verifyNoInteractions(processingFailuresCounter); + verifyNoInteractions(parseErrorsCounter); + verifyNoInteractions(handleFailedEventsOption); } @Test @@ -183,6 +225,10 @@ void test_when_deeplyNestedFieldInKey_then_canReachDeepestLayer() { final String jsonPointerToValue = destination + constructDeeplyNestedJsonPointer(numberOfLayers); assertThat(parsedEvent.get(jsonPointerToValue, String.class), equalTo("value")); + + verifyNoInteractions(processingFailuresCounter); + verifyNoInteractions(parseErrorsCounter); + verifyNoInteractions(handleFailedEventsOption); } @Test @@ -198,6 +244,10 @@ void test_when_nestedJSONArray_then_parsedIntoArrayAndIndicesAccessible() { assertThat(parsedEvent.get(key, ArrayList.class), equalTo(value)); final String pointerToFirstElement = key + "/0"; assertThat(parsedEvent.get(pointerToFirstElement, String.class), equalTo(value.get(0))); + + verifyNoInteractions(processingFailuresCounter); + verifyNoInteractions(parseErrorsCounter); + verifyNoInteractions(handleFailedEventsOption); } @Test @@ -214,6 +264,10 @@ void test_when_deleteSourceFlagEnabled() { assertThat(parsedEvent.get(key, ArrayList.class), equalTo(value)); final String pointerToFirstElement = key + "/0"; assertThat(parsedEvent.get(pointerToFirstElement, String.class), equalTo(value.get(0))); + + verifyNoInteractions(processingFailuresCounter); + verifyNoInteractions(parseErrorsCounter); + verifyNoInteractions(handleFailedEventsOption); } @Test @@ -232,6 +286,10 @@ void test_when_nestedJSONArrayOfJSON_then_parsedIntoArrayAndIndicesAccessible() final String pointerToInternalValue = key + "/0/key0"; assertThat(parsedEvent.get(pointerToInternalValue, String.class), equalTo("value0")); + + verifyNoInteractions(processingFailuresCounter); + verifyNoInteractions(parseErrorsCounter); + verifyNoInteractions(handleFailedEventsOption); } @Test @@ -250,6 +308,10 @@ void test_when_nestedJSONArrayOfJSONAndPointer_then_parsedIntoValue() { assertThat(parsedEvent.get("key0", String.class), equalTo("value0")); assertThat(parsedEvent.containsKey("key1"),equalTo(false)); + + verifyNoInteractions(processingFailuresCounter); + verifyNoInteractions(parseErrorsCounter); + verifyNoInteractions(handleFailedEventsOption); } @Test @@ -263,6 +325,10 @@ void test_when_nestedJSONArrayAndIndexPointer_then_parsedIntoArrayAndIndicesAcce assertThat(parsedEvent.containsKey(processorConfig.getSource()), equalTo(true)); assertThat(parsedEvent.get("key.0", String.class), equalTo(value.get(0))); + + verifyNoInteractions(processingFailuresCounter); + verifyNoInteractions(parseErrorsCounter); + verifyNoInteractions(handleFailedEventsOption); } @Test @@ -283,6 +349,10 @@ void test_when_pointerKeyAlreadyPresentInEvent_then_usesAbsolutePath() { assertThatKeyEquals(parsedEvent, "s3", data.get("s3")); assertThatKeyEquals(parsedEvent, "log.s3", Collections.singletonMap("data", "sample data")); + + verifyNoInteractions(processingFailuresCounter); + verifyNoInteractions(parseErrorsCounter); + verifyNoInteractions(handleFailedEventsOption); } @Test @@ -298,6 +368,10 @@ void test_when_nestedDestinationField_then_writesToNestedDestination() { assertThat(parsedEvent.get(location, String.class), equalTo("value")); assertThat(parsedEvent.get(destination, Map.class), equalTo(data)); + + verifyNoInteractions(processingFailuresCounter); + verifyNoInteractions(parseErrorsCounter); + verifyNoInteractions(handleFailedEventsOption); } @Test @@ -346,10 +420,16 @@ void test_when_condition_skips_processing_when_evaluates_to_false() { assertThat(parsedEvent.toMap(), equalTo(testEvent.getData().toMap())); + verifyNoInteractions(processingFailuresCounter); + verifyNoInteractions(parseErrorsCounter); + verifyNoInteractions(handleFailedEventsOption); + } @Test void test_tags_when_json_parse_fails() { + when(handleFailedEventsOption.shouldLog()).thenReturn(true); + final String source = "different_source"; final String destination = "destination_key"; when(processorConfig.getSource()).thenReturn(source); @@ -364,10 +444,14 @@ void test_tags_when_json_parse_fails() { final Event parsedEvent = createAndParseMessageEvent(testEvent); assertTrue(parsedEvent.getMetadata().hasTags(testTags)); + + verify(parseErrorsCounter).increment(); } @Test void when_evaluate_conditional_throws_RuntimeException_events_are_not_dropped() { + when(handleFailedEventsOption.shouldLog()).thenReturn(true); + final String source = "different_source"; final String destination = "destination_key"; when(processorConfig.getSource()).thenReturn(source); @@ -383,6 +467,9 @@ void when_evaluate_conditional_throws_RuntimeException_events_are_not_dropped() final Event parsedEvent = createAndParseMessageEvent(testEvent); assertThat(parsedEvent.toMap(), equalTo(testEvent.getData().toMap())); + + verify(processingFailuresCounter).increment(); + verifyNoInteractions(parseErrorsCounter); } private String constructDeeplyNestedJsonPointer(final int numberOfLayers) { diff --git a/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/xml/ParseXmlProcessorConfigTest.java b/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/xml/ParseXmlProcessorConfigTest.java index bab6d6e919..a0ef665124 100644 --- a/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/xml/ParseXmlProcessorConfigTest.java +++ b/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/xml/ParseXmlProcessorConfigTest.java @@ -2,6 +2,7 @@ import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; +import org.opensearch.dataprepper.model.event.HandleFailedEventsOption; import java.util.List; @@ -24,6 +25,8 @@ public void test_when_defaultParseXmlProcessorConfig_then_returns_default_values assertThat(objectUnderTest.getPointer(), equalTo(null)); assertThat(objectUnderTest.getTagsOnFailure(), equalTo(null)); assertThat(objectUnderTest.getOverwriteIfDestinationExists(), equalTo(true)); + assertThat(objectUnderTest.getHandleFailedEventsOption(), equalTo(HandleFailedEventsOption.SKIP)); + assertThat(objectUnderTest.isHandleFailedEventsOptionValid(), equalTo(true)); } @Nested @@ -56,5 +59,19 @@ void test_when_destinationIsWhiteSpaceOrFrontSlash_then_isValidDestinationFalse( setField(ParseXmlProcessorConfig.class, config, "deleteSource", true); assertThat(config.isDeleteSourceRequested(), equalTo(true)); } + + @Test + void isHandleFailedEventsOptionValid_returns_false_with_drop_option() throws NoSuchFieldException, IllegalAccessException { + setField(ParseXmlProcessorConfig.class, config, "handleFailedEventsOption", HandleFailedEventsOption.DROP); + + assertThat(config.isHandleFailedEventsOptionValid(), equalTo(false)); + } + + @Test + void isHandleFailedEventsOptionValid_returns_true_with_null_handle_events() throws NoSuchFieldException, IllegalAccessException { + setField(ParseXmlProcessorConfig.class, config, "handleFailedEventsOption", null); + + assertThat(config.isHandleFailedEventsOptionValid(), equalTo(true)); + } } } diff --git a/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/xml/ParseXmlProcessorTest.java b/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/xml/ParseXmlProcessorTest.java index 5cd9037e5b..900a7a7bef 100644 --- a/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/xml/ParseXmlProcessorTest.java +++ b/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/xml/ParseXmlProcessorTest.java @@ -1,5 +1,9 @@ package org.opensearch.dataprepper.plugins.processor.parse.xml; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.dataformat.xml.XmlMapper; +import io.micrometer.core.instrument.Counter; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -13,8 +17,10 @@ import org.opensearch.dataprepper.model.event.EventBuilder; import org.opensearch.dataprepper.model.event.EventFactory; import org.opensearch.dataprepper.model.event.EventKeyFactory; +import org.opensearch.dataprepper.model.event.HandleFailedEventsOption; import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.plugins.processor.parse.AbstractParseProcessor; +import org.opensearch.dataprepper.test.helper.ReflectivelySetField; import java.util.Collections; import java.util.HashMap; @@ -24,6 +30,11 @@ import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.when; import static org.opensearch.dataprepper.plugins.processor.parse.xml.ParseXmlProcessorConfig.DEFAULT_SOURCE; @@ -40,6 +51,15 @@ public class ParseXmlProcessorTest { @Mock private ExpressionEvaluator expressionEvaluator; + @Mock + private Counter processingFailuresCounter; + + @Mock + private Counter parseErrorsCounter; + + @Mock + private HandleFailedEventsOption handleFailedEventsOption; + private AbstractParseProcessor parseXmlProcessor; private final EventFactory testEventFactory = TestEventFactory.getTestEventFactory(); private final EventKeyFactory testEventKeyFactory = TestEventKeyFactory.getTestEventFactory(); @@ -49,6 +69,11 @@ public void setup() { when(processorConfig.getSource()).thenReturn(DEFAULT_SOURCE); when(processorConfig.getParseWhen()).thenReturn(null); when(processorConfig.getOverwriteIfDestinationExists()).thenReturn(true); + when(pluginMetrics.counter("recordsIn")).thenReturn(mock(Counter.class)); + when(pluginMetrics.counter("recordsOut")).thenReturn(mock(Counter.class)); + when(pluginMetrics.counter("processingFailures")).thenReturn(processingFailuresCounter); + when(pluginMetrics.counter("parseErrors")).thenReturn(parseErrorsCounter); + when(processorConfig.getHandleFailedEventsOption()).thenReturn(handleFailedEventsOption); } protected AbstractParseProcessor createObjectUnderTest() { @@ -64,6 +89,9 @@ void test_when_using_xml_features_then_processorParsesCorrectly() { assertThat(parsedEvent.get("name", String.class), equalTo("John Doe")); assertThat(parsedEvent.get("age", String.class), equalTo("30")); + + verifyNoInteractions(processingFailuresCounter); + verifyNoInteractions(handleFailedEventsOption); } @Test @@ -80,6 +108,9 @@ void test_when_deleteSourceFlagEnabled() { assertThat(parsedEvent.containsKey(processorConfig.getSource()), equalTo(false)); assertThat(parsedEvent.get("name", String.class), equalTo("John Doe")); assertThat(parsedEvent.get("age", String.class), equalTo("30")); + + verifyNoInteractions(processingFailuresCounter); + verifyNoInteractions(handleFailedEventsOption); } @Test @@ -87,6 +118,7 @@ void test_when_using_invalid_xml_tags_correctly() { final String tagOnFailure = UUID.randomUUID().toString(); when(processorConfig.getTagsOnFailure()).thenReturn(List.of(tagOnFailure)); + when(handleFailedEventsOption.shouldLog()).thenReturn(true); parseXmlProcessor = createObjectUnderTest(); @@ -94,6 +126,31 @@ void test_when_using_invalid_xml_tags_correctly() { final Event parsedEvent = createAndParseMessageEvent(serializedMessage); assertThat(parsedEvent.getMetadata().hasTags(List.of(tagOnFailure)), equalTo(true)); + + verify(parseErrorsCounter).increment(); + verifyNoInteractions(processingFailuresCounter); + } + + @Test + void test_when_object_mapper_throws_other_exception_tags_correctly() throws JsonProcessingException, NoSuchFieldException, IllegalAccessException { + + final String tagOnFailure = UUID.randomUUID().toString(); + when(processorConfig.getTagsOnFailure()).thenReturn(List.of(tagOnFailure)); + when(handleFailedEventsOption.shouldLog()).thenReturn(true); + + parseXmlProcessor = createObjectUnderTest(); + + final XmlMapper mockMapper = mock(XmlMapper.class); + when(mockMapper.readValue(anyString(), any(TypeReference.class))).thenThrow(IllegalArgumentException.class); + + ReflectivelySetField.setField(ParseXmlProcessor.class, parseXmlProcessor, "xmlMapper", mockMapper); + + final String serializedMessage = "invalidXml"; + final Event parsedEvent = createAndParseMessageEvent(serializedMessage); + + assertThat(parsedEvent.getMetadata().hasTags(List.of(tagOnFailure)), equalTo(true)); + + verify(processingFailuresCounter).increment(); } private Event createAndParseMessageEvent(final String message) {