Skip to content

Commit

Permalink
Add handle failed events option to parse json processors (opensearch-…
Browse files Browse the repository at this point in the history
…project#4844)

Signed-off-by: Taylor Gray <tylgry@amazon.com>
  • Loading branch information
graytaylor0 authored Aug 19, 2024
1 parent 9a82590 commit ff2de26
Show file tree
Hide file tree
Showing 24 changed files with 430 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,19 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.processor.drop;
package org.opensearch.dataprepper.model.event;

import org.opensearch.dataprepper.model.event.Event;
import com.fasterxml.jackson.annotation.JsonCreator;
import org.slf4j.Logger;

import java.util.Arrays;
import java.util.Map;
import java.util.stream.Collectors;

import static org.opensearch.dataprepper.logging.DataPrepperMarkers.EVENT;

enum HandleFailedEventsOption {
DROP("drop", true, false),
DROP_SILENTLY("drop_silently", true, true),
SKIP("skip", false, false),
SKIP_SILENTLY("skip_silently", false, true);
public enum HandleFailedEventsOption {
DROP("drop", true, true),
DROP_SILENTLY("drop_silently", true, false),
SKIP("skip", false, true),
SKIP_SILENTLY("skip_silently", false, false);

private static final Map<String, HandleFailedEventsOption> OPTIONS_MAP = Arrays.stream(HandleFailedEventsOption.values())
.collect(Collectors.toMap(
Expand All @@ -37,13 +33,14 @@ enum HandleFailedEventsOption {
this.isLogRequired = isLogRequired;
}

public boolean isDropEventOption(final Event event, final Throwable cause, final Logger log) {
if (isLogRequired) {
log.warn(EVENT, "An exception occurred while processing when expression for event {}", event, cause);
}
public boolean shouldDropEvent() {
return isDropEventOption;
}

public boolean shouldLog() {
return isLogRequired;
}

@JsonCreator
static HandleFailedEventsOption fromOptionValue(final String option) {
return OPTIONS_MAP.get(option.toLowerCase());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.model.event;

import org.hamcrest.CoreMatchers;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;

class HandleFailedEventsOptionTest {
@ParameterizedTest
@EnumSource(HandleFailedEventsOption.class)
void fromOptionValue(final HandleFailedEventsOption option) {
assertThat(HandleFailedEventsOption.fromOptionValue(option.name()), CoreMatchers.is(option));

if (option == HandleFailedEventsOption.SKIP || option == HandleFailedEventsOption.SKIP_SILENTLY) {
assertThat(option.shouldDropEvent(), equalTo(false));
} else {
assertThat(option.shouldDropEvent(), equalTo(true));
}

if (option == HandleFailedEventsOption.SKIP_SILENTLY || option == HandleFailedEventsOption.DROP_SILENTLY) {
assertThat(option.shouldLog(), equalTo(false));
} else {
assertThat(option.shouldLog(), equalTo(true));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.constraints.NotEmpty;
import org.opensearch.dataprepper.model.event.HandleFailedEventsOption;

public class DropEventProcessorConfig {
@JsonProperty("drop_when")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,14 @@

import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.expression.ExpressionEvaluator;
import org.opensearch.dataprepper.model.event.HandleFailedEventsOption;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Objects;

import static org.opensearch.dataprepper.logging.DataPrepperMarkers.EVENT;

/**
* @since 1.3
*
Expand Down Expand Up @@ -57,7 +60,10 @@ public boolean isStatementFalseWith(final Event event) {
try {
return !expressionEvaluator.evaluateConditional(dropWhen, event);
} catch (final Exception e) {
return handleFailedEventsSetting.isDropEventOption(event, e, LOG);
if (handleFailedEventsSetting.shouldLog()) {
LOG.warn(EVENT, "An exception occurred while processing when expression for event [{}]", event, e);
}
return handleFailedEventsSetting.shouldDropEvent();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.opensearch.dataprepper.model.event.HandleFailedEventsOption;

import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.nullValue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.HandleFailedEventsOption;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.record.Record;
import org.junit.jupiter.api.BeforeEach;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.dataprepper.expression.ExpressionEvaluator;
import org.opensearch.dataprepper.model.event.HandleFailedEventsOption;

import java.util.UUID;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.dataprepper.expression.ExpressionEvaluator;
import org.opensearch.dataprepper.model.event.HandleFailedEventsOption;

import java.util.UUID;
import java.util.stream.Stream;
Expand Down

This file was deleted.

1 change: 1 addition & 0 deletions data-prepper-plugins/parse-json-processor/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ plugins {
dependencies {
implementation project(':data-prepper-api')
implementation project(':data-prepper-plugins:common')
implementation 'io.micrometer:micrometer-core'
implementation 'com.fasterxml.jackson.core:jackson-databind'
implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-ion'
implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-xml'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventKey;
import org.opensearch.dataprepper.model.event.EventKeyFactory;
import org.opensearch.dataprepper.model.event.HandleFailedEventsOption;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.processor.AbstractProcessor;
import org.opensearch.dataprepper.model.record.Record;
import io.micrometer.core.instrument.Counter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -31,6 +33,7 @@

public abstract class AbstractParseProcessor extends AbstractProcessor<Record<Event>, Record<Event>> {
private static final Logger LOG = LoggerFactory.getLogger(AbstractParseProcessor.class);
private static final String PROCESSING_FAILURES = "processingFailures";

private final EventKey source;
private final EventKey destination;
Expand All @@ -40,6 +43,10 @@ public abstract class AbstractParseProcessor extends AbstractProcessor<Record<Ev
private final boolean overwriteIfDestinationExists;
private final boolean deleteSourceRequested;

private final HandleFailedEventsOption handleFailedEventsOption;

protected final Counter processingFailuresCounter;

private final ExpressionEvaluator expressionEvaluator;
private final EventKeyFactory eventKeyFactory;

Expand All @@ -56,6 +63,8 @@ protected AbstractParseProcessor(final PluginMetrics pluginMetrics,
tagsOnFailure = commonParseConfig.getTagsOnFailure();
overwriteIfDestinationExists = commonParseConfig.getOverwriteIfDestinationExists();
deleteSourceRequested = commonParseConfig.isDeleteSourceRequested();
handleFailedEventsOption = commonParseConfig.getHandleFailedEventsOption();
processingFailuresCounter = pluginMetrics.counter(PROCESSING_FAILURES);
this.expressionEvaluator = expressionEvaluator;
this.eventKeyFactory = eventKeyFactory;
}
Expand Down Expand Up @@ -104,8 +113,11 @@ public Collection<Record<Event>> doExecute(final Collection<Record<Event>> recor
if(deleteSourceRequested) {
event.delete(this.source);
}
} catch (final Exception e) {
LOG.error(EVENT, "An exception occurred while using the {} processor on Event [{}]", getProcessorName(), record.getData(), e);
} catch (Exception e) {
processingFailuresCounter.increment();
if (handleFailedEventsOption.shouldLog()) {
LOG.error(EVENT, "An exception occurred while using the {} processor on Event [{}]", getProcessorName(), record.getData(), e);
}
}
}
return records;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.opensearch.dataprepper.plugins.processor.parse;

import java.util.List;
import org.opensearch.dataprepper.model.event.HandleFailedEventsOption;

public interface CommonParseConfig {
/**
Expand Down Expand Up @@ -59,4 +60,10 @@ public interface CommonParseConfig {
* Defaults to false.
*/
boolean isDeleteSourceRequested();

/**
* An optional setting used to determine how to handle parsing errors. Default is skip, which includes logging the error
* and passing the failed Event downstream to the next processor.
*/
HandleFailedEventsOption getHandleFailedEventsOption();
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,14 @@
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.dataformat.ion.IonObjectMapper;
import io.micrometer.core.instrument.Counter;
import org.opensearch.dataprepper.expression.ExpressionEvaluator;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventKeyFactory;
import org.opensearch.dataprepper.model.event.HandleFailedEventsOption;
import org.opensearch.dataprepper.model.processor.Processor;
import org.opensearch.dataprepper.plugins.processor.parse.AbstractParseProcessor;
import org.slf4j.Logger;
Expand All @@ -27,9 +29,14 @@
@DataPrepperPlugin(name = "parse_ion", pluginType = Processor.class, pluginConfigurationType = ParseIonProcessorConfig.class)
public class ParseIonProcessor extends AbstractParseProcessor {
private static final Logger LOG = LoggerFactory.getLogger(ParseIonProcessor.class);
private static final String PARSE_ERRORS = "parseErrors";

private final IonObjectMapper objectMapper = new IonObjectMapper();

private final Counter parseErrorsCounter;

private final HandleFailedEventsOption handleFailedEventsOption;

@DataPrepperPluginConstructor
public ParseIonProcessor(final PluginMetrics pluginMetrics,
final ParseIonProcessorConfig parseIonProcessorConfig,
Expand All @@ -39,6 +46,9 @@ public ParseIonProcessor(final PluginMetrics pluginMetrics,

// Convert Timestamps to ISO-8601 Z strings
objectMapper.registerModule(new IonTimestampConverterModule());

handleFailedEventsOption = parseIonProcessorConfig.getHandleFailedEventsOption();
parseErrorsCounter = pluginMetrics.counter(PARSE_ERRORS);
}

@Override
Expand All @@ -47,10 +57,16 @@ protected Optional<HashMap<String, Object>> readValue(String message, Event cont
// We need to do a two-step process here, read the value in, then convert away any Ion types like Timestamp
return Optional.of(objectMapper.convertValue(objectMapper.readValue(message, new TypeReference<>() {}), new TypeReference<>() {}));
} catch (JsonProcessingException e) {
LOG.error(SENSITIVE, "An exception occurred due to invalid Ion while parsing [{}] due to {}", message, e.getMessage());
if (handleFailedEventsOption.shouldLog()) {
LOG.error(SENSITIVE, "An exception occurred due to invalid Ion while parsing [{}] due to {}", message, e.getMessage());
}
parseErrorsCounter.increment();
return Optional.empty();
} catch (Exception e) {
LOG.error(SENSITIVE, "An exception occurred while using the parse_ion processor while parsing [{}]", message, e);
if (handleFailedEventsOption.shouldLog()) {
LOG.error(SENSITIVE, "An exception occurred while using the parse_ion processor while parsing [{}]", message, e);
}
processingFailuresCounter.increment();
return Optional.empty();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,11 @@
package org.opensearch.dataprepper.plugins.processor.parse.ion;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonPropertyDescription;
import jakarta.validation.constraints.AssertTrue;
import jakarta.validation.constraints.NotBlank;
import jakarta.validation.constraints.NotNull;
import org.opensearch.dataprepper.model.event.HandleFailedEventsOption;
import org.opensearch.dataprepper.plugins.processor.parse.CommonParseConfig;

import java.util.List;
Expand Down Expand Up @@ -38,6 +41,14 @@ public class ParseIonProcessorConfig implements CommonParseConfig {
@JsonProperty
private boolean deleteSource = false;

@JsonProperty("handle_failed_events")
@JsonPropertyDescription("Determines how to handle events with ION processing errors. Options include 'skip', " +
"which will log the error and send the Event downstream to the next processor, and 'skip_silently', " +
"which will send the Event downstream to the next processor without logging the error. " +
"Default is 'skip'.")
@NotNull
private HandleFailedEventsOption handleFailedEventsOption = HandleFailedEventsOption.SKIP;

@Override
public String getSource() {
return source;
Expand Down Expand Up @@ -78,4 +89,22 @@ boolean isValidDestination() {
public boolean isDeleteSourceRequested() {
return deleteSource;
}

@Override
public HandleFailedEventsOption getHandleFailedEventsOption() {
return handleFailedEventsOption;
}

@AssertTrue(message = "handled_failed_events must be set to 'skip' or 'skip_silently'.")
boolean isHandleFailedEventsOptionValid() {
if (handleFailedEventsOption == null) {
return true;
}

if (handleFailedEventsOption.shouldDropEvent()) {
return false;
}

return true;
}
}
Loading

0 comments on commit ff2de26

Please sign in to comment.