Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add handle failed events option to parse json processors #4844

Merged
merged 1 commit into from
Aug 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,19 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.processor.drop;
package org.opensearch.dataprepper.model.event;

import org.opensearch.dataprepper.model.event.Event;
import com.fasterxml.jackson.annotation.JsonCreator;
import org.slf4j.Logger;

import java.util.Arrays;
import java.util.Map;
import java.util.stream.Collectors;

import static org.opensearch.dataprepper.logging.DataPrepperMarkers.EVENT;

enum HandleFailedEventsOption {
DROP("drop", true, false),
DROP_SILENTLY("drop_silently", true, true),
SKIP("skip", false, false),
SKIP_SILENTLY("skip_silently", false, true);
public enum HandleFailedEventsOption {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you replace the existing option in drop processor with this? The drop and drop_silently options from that processor seems a decent option to provide elsewhere.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I originally wanted to do this, but if we don't support those two options in the parse processors, then it's a bit weird to have there isn't it? Although I could just add a validation that does not allow drop or drop_silently for these processors

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I think a validation would be appropriate.

DROP("drop", true, true),
DROP_SILENTLY("drop_silently", true, false),
SKIP("skip", false, true),
SKIP_SILENTLY("skip_silently", false, false);

private static final Map<String, HandleFailedEventsOption> OPTIONS_MAP = Arrays.stream(HandleFailedEventsOption.values())
.collect(Collectors.toMap(
Expand All @@ -37,13 +33,14 @@ enum HandleFailedEventsOption {
this.isLogRequired = isLogRequired;
}

public boolean isDropEventOption(final Event event, final Throwable cause, final Logger log) {
if (isLogRequired) {
log.warn(EVENT, "An exception occurred while processing when expression for event {}", event, cause);
}
public boolean shouldDropEvent() {
return isDropEventOption;
}

public boolean shouldLog() {
return isLogRequired;
}

@JsonCreator
static HandleFailedEventsOption fromOptionValue(final String option) {
return OPTIONS_MAP.get(option.toLowerCase());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.model.event;

import org.hamcrest.CoreMatchers;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;

class HandleFailedEventsOptionTest {
@ParameterizedTest
@EnumSource(HandleFailedEventsOption.class)
void fromOptionValue(final HandleFailedEventsOption option) {
assertThat(HandleFailedEventsOption.fromOptionValue(option.name()), CoreMatchers.is(option));

if (option == HandleFailedEventsOption.SKIP || option == HandleFailedEventsOption.SKIP_SILENTLY) {
assertThat(option.shouldDropEvent(), equalTo(false));
} else {
assertThat(option.shouldDropEvent(), equalTo(true));
}

if (option == HandleFailedEventsOption.SKIP_SILENTLY || option == HandleFailedEventsOption.DROP_SILENTLY) {
assertThat(option.shouldLog(), equalTo(false));
} else {
assertThat(option.shouldLog(), equalTo(true));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.constraints.NotEmpty;
import org.opensearch.dataprepper.model.event.HandleFailedEventsOption;

public class DropEventProcessorConfig {
@JsonProperty("drop_when")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,14 @@

import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.expression.ExpressionEvaluator;
import org.opensearch.dataprepper.model.event.HandleFailedEventsOption;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Objects;

import static org.opensearch.dataprepper.logging.DataPrepperMarkers.EVENT;

/**
* @since 1.3
*
Expand Down Expand Up @@ -57,7 +60,10 @@ public boolean isStatementFalseWith(final Event event) {
try {
return !expressionEvaluator.evaluateConditional(dropWhen, event);
} catch (final Exception e) {
return handleFailedEventsSetting.isDropEventOption(event, e, LOG);
if (handleFailedEventsSetting.shouldLog()) {
LOG.warn(EVENT, "An exception occurred while processing when expression for event [{}]", event, e);
}
return handleFailedEventsSetting.shouldDropEvent();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.opensearch.dataprepper.model.event.HandleFailedEventsOption;

import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.nullValue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.HandleFailedEventsOption;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.record.Record;
import org.junit.jupiter.api.BeforeEach;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.dataprepper.expression.ExpressionEvaluator;
import org.opensearch.dataprepper.model.event.HandleFailedEventsOption;

import java.util.UUID;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.dataprepper.expression.ExpressionEvaluator;
import org.opensearch.dataprepper.model.event.HandleFailedEventsOption;

import java.util.UUID;
import java.util.stream.Stream;
Expand Down

This file was deleted.

1 change: 1 addition & 0 deletions data-prepper-plugins/parse-json-processor/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ plugins {
dependencies {
implementation project(':data-prepper-api')
implementation project(':data-prepper-plugins:common')
implementation 'io.micrometer:micrometer-core'
implementation 'com.fasterxml.jackson.core:jackson-databind'
implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-ion'
implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-xml'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventKey;
import org.opensearch.dataprepper.model.event.EventKeyFactory;
import org.opensearch.dataprepper.model.event.HandleFailedEventsOption;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.processor.AbstractProcessor;
import org.opensearch.dataprepper.model.record.Record;
import io.micrometer.core.instrument.Counter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -31,6 +33,7 @@

public abstract class AbstractParseProcessor extends AbstractProcessor<Record<Event>, Record<Event>> {
private static final Logger LOG = LoggerFactory.getLogger(AbstractParseProcessor.class);
private static final String PROCESSING_FAILURES = "processingFailures";

private final EventKey source;
private final EventKey destination;
Expand All @@ -40,6 +43,10 @@ public abstract class AbstractParseProcessor extends AbstractProcessor<Record<Ev
private final boolean overwriteIfDestinationExists;
private final boolean deleteSourceRequested;

private final HandleFailedEventsOption handleFailedEventsOption;

protected final Counter processingFailuresCounter;

private final ExpressionEvaluator expressionEvaluator;
private final EventKeyFactory eventKeyFactory;

Expand All @@ -56,6 +63,8 @@ protected AbstractParseProcessor(final PluginMetrics pluginMetrics,
tagsOnFailure = commonParseConfig.getTagsOnFailure();
overwriteIfDestinationExists = commonParseConfig.getOverwriteIfDestinationExists();
deleteSourceRequested = commonParseConfig.isDeleteSourceRequested();
handleFailedEventsOption = commonParseConfig.getHandleFailedEventsOption();
processingFailuresCounter = pluginMetrics.counter(PROCESSING_FAILURES);
this.expressionEvaluator = expressionEvaluator;
this.eventKeyFactory = eventKeyFactory;
}
Expand Down Expand Up @@ -104,8 +113,11 @@ public Collection<Record<Event>> doExecute(final Collection<Record<Event>> recor
if(deleteSourceRequested) {
event.delete(this.source);
}
} catch (final Exception e) {
LOG.error(EVENT, "An exception occurred while using the {} processor on Event [{}]", getProcessorName(), record.getData(), e);
} catch (Exception e) {
processingFailuresCounter.increment();
if (handleFailedEventsOption.shouldLog()) {
LOG.error(EVENT, "An exception occurred while using the {} processor on Event [{}]", getProcessorName(), record.getData(), e);
}
}
}
return records;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.opensearch.dataprepper.plugins.processor.parse;

import java.util.List;
import org.opensearch.dataprepper.model.event.HandleFailedEventsOption;

public interface CommonParseConfig {
/**
Expand Down Expand Up @@ -59,4 +60,10 @@ public interface CommonParseConfig {
* Defaults to false.
*/
boolean isDeleteSourceRequested();

/**
* An optional setting used to determine how to handle parsing errors. Default is skip, which includes logging the error
* and passing the failed Event downstream to the next processor.
*/
HandleFailedEventsOption getHandleFailedEventsOption();
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,14 @@
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.dataformat.ion.IonObjectMapper;
import io.micrometer.core.instrument.Counter;
import org.opensearch.dataprepper.expression.ExpressionEvaluator;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventKeyFactory;
import org.opensearch.dataprepper.model.event.HandleFailedEventsOption;
import org.opensearch.dataprepper.model.processor.Processor;
import org.opensearch.dataprepper.plugins.processor.parse.AbstractParseProcessor;
import org.slf4j.Logger;
Expand All @@ -27,9 +29,14 @@
@DataPrepperPlugin(name = "parse_ion", pluginType = Processor.class, pluginConfigurationType = ParseIonProcessorConfig.class)
public class ParseIonProcessor extends AbstractParseProcessor {
private static final Logger LOG = LoggerFactory.getLogger(ParseIonProcessor.class);
private static final String PARSE_ERRORS = "parseErrors";

private final IonObjectMapper objectMapper = new IonObjectMapper();

private final Counter parseErrorsCounter;

private final HandleFailedEventsOption handleFailedEventsOption;

@DataPrepperPluginConstructor
public ParseIonProcessor(final PluginMetrics pluginMetrics,
final ParseIonProcessorConfig parseIonProcessorConfig,
Expand All @@ -39,6 +46,9 @@ public ParseIonProcessor(final PluginMetrics pluginMetrics,

// Convert Timestamps to ISO-8601 Z strings
objectMapper.registerModule(new IonTimestampConverterModule());

handleFailedEventsOption = parseIonProcessorConfig.getHandleFailedEventsOption();
parseErrorsCounter = pluginMetrics.counter(PARSE_ERRORS);
}

@Override
Expand All @@ -47,10 +57,16 @@ protected Optional<HashMap<String, Object>> readValue(String message, Event cont
// We need to do a two-step process here, read the value in, then convert away any Ion types like Timestamp
return Optional.of(objectMapper.convertValue(objectMapper.readValue(message, new TypeReference<>() {}), new TypeReference<>() {}));
} catch (JsonProcessingException e) {
LOG.error(SENSITIVE, "An exception occurred due to invalid Ion while parsing [{}] due to {}", message, e.getMessage());
if (handleFailedEventsOption.shouldLog()) {
LOG.error(SENSITIVE, "An exception occurred due to invalid Ion while parsing [{}] due to {}", message, e.getMessage());
}
parseErrorsCounter.increment();
return Optional.empty();
} catch (Exception e) {
LOG.error(SENSITIVE, "An exception occurred while using the parse_ion processor while parsing [{}]", message, e);
if (handleFailedEventsOption.shouldLog()) {
LOG.error(SENSITIVE, "An exception occurred while using the parse_ion processor while parsing [{}]", message, e);
}
processingFailuresCounter.increment();
return Optional.empty();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,11 @@
package org.opensearch.dataprepper.plugins.processor.parse.ion;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonPropertyDescription;
import jakarta.validation.constraints.AssertTrue;
import jakarta.validation.constraints.NotBlank;
import jakarta.validation.constraints.NotNull;
import org.opensearch.dataprepper.model.event.HandleFailedEventsOption;
import org.opensearch.dataprepper.plugins.processor.parse.CommonParseConfig;

import java.util.List;
Expand Down Expand Up @@ -38,6 +41,14 @@ public class ParseIonProcessorConfig implements CommonParseConfig {
@JsonProperty
private boolean deleteSource = false;

@JsonProperty("handle_failed_events")
@JsonPropertyDescription("Determines how to handle events with ION processing errors. Options include 'skip', " +
"which will log the error and send the Event downstream to the next processor, and 'skip_silently', " +
"which will send the Event downstream to the next processor without logging the error. " +
"Default is 'skip'.")
@NotNull
private HandleFailedEventsOption handleFailedEventsOption = HandleFailedEventsOption.SKIP;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This means, default behavior is not changed, right? ie, the failures are logged and skipped. This also, means customers will see this issue, unless they explicitly change this. Is this what is agreed upon? Should we set it to SKIP_SILENTLY in OSI?

Copy link
Member Author

@graytaylor0 graytaylor0 Aug 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes that's right default is to log and skip still. OSI default decision will be made separately


@Override
public String getSource() {
return source;
Expand Down Expand Up @@ -78,4 +89,22 @@ boolean isValidDestination() {
public boolean isDeleteSourceRequested() {
return deleteSource;
}

@Override
public HandleFailedEventsOption getHandleFailedEventsOption() {
return handleFailedEventsOption;
}

@AssertTrue(message = "handled_failed_events must be set to 'skip' or 'skip_silently'.")
boolean isHandleFailedEventsOptionValid() {
if (handleFailedEventsOption == null) {
return true;
}

if (handleFailedEventsOption.shouldDropEvent()) {
return false;
}

return true;
}
}
Loading
Loading