diff --git a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateProcessor.java b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateProcessor.java index 68cb6f6e65..544928c242 100644 --- a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateProcessor.java +++ b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateProcessor.java @@ -13,6 +13,7 @@ import org.opensearch.dataprepper.model.configuration.PluginSetting; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.peerforwarder.RequiresPeerForwarding; +import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException; import org.opensearch.dataprepper.model.plugin.PluginFactory; import org.opensearch.dataprepper.model.processor.AbstractProcessor; import org.opensearch.dataprepper.model.processor.Processor; @@ -78,6 +79,10 @@ public AggregateProcessor(final AggregateProcessorConfig aggregateProcessorConfi this.localMode = aggregateProcessorConfig.getLocalMode(); pluginMetrics.gauge(CURRENT_AGGREGATE_GROUPS, aggregateGroupManager, AggregateGroupManager::getAllGroupsSize); + + if (aggregateProcessorConfig.getWhenCondition() != null && (!expressionEvaluator.isValidExpressionStatement(aggregateProcessorConfig.getWhenCondition()))) { + throw new InvalidPluginConfigurationException("aggregate_when {} is not a valid expression statement. See https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/ for valid expression syntax"); + } } private AggregateAction loadAggregateAction(final PluginFactory pluginFactory) { diff --git a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateProcessorConfig.java b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateProcessorConfig.java index 1c2c9fa701..cfb986fe53 100644 --- a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateProcessorConfig.java +++ b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateProcessorConfig.java @@ -5,8 +5,8 @@ package org.opensearch.dataprepper.plugins.processor.aggregate; -import org.opensearch.dataprepper.model.configuration.PluginModel; import com.fasterxml.jackson.annotation.JsonPropertyDescription; +import org.opensearch.dataprepper.model.configuration.PluginModel; import com.fasterxml.jackson.annotation.JsonProperty; import jakarta.validation.constraints.AssertTrue; import jakarta.validation.constraints.NotEmpty; @@ -28,7 +28,7 @@ public class AggregateProcessorConfig { @JsonProperty("group_duration") private Duration groupDuration = Duration.ofSeconds(DEFAULT_GROUP_DURATION_SECONDS); - @JsonPropertyDescription("The action to be performed on each group. One of the available aggregate actions must be provided, or you can create custom aggregate actions. remove_duplicates and put_all are the available actions. For more information, see Creating New Aggregate Actions.") + @JsonPropertyDescription("The action to be performed on each group. One of the available aggregate actions must be provided.") @JsonProperty("action") @NotNull private PluginModel aggregateAction; @@ -46,7 +46,7 @@ public class AggregateProcessorConfig { @JsonProperty("aggregated_events_tag") private String aggregatedEventsTag; - @JsonPropertyDescription("A Data Prepper conditional expression (https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/), such as '/some-key == \"test\"', that will be evaluated to determine whether the processor will be run on the event.") + @JsonPropertyDescription("A Data Prepper [conditional expression](https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/), such as '/some-key == \"test\"', that will be evaluated to determine whether the processor will be run on the event.") @JsonProperty("aggregate_when") private String whenCondition; @@ -74,7 +74,7 @@ public Boolean getLocalMode() { return localMode; } - @AssertTrue(message="Aggragated Events Tag must be set when output_unaggregated_events is set") + @AssertTrue(message="Aggregated Events Tag must be set when output_unaggregated_events is set") boolean isValidConfig() { return (!outputUnaggregatedEvents || (outputUnaggregatedEvents && aggregatedEventsTag != null)); } diff --git a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/AppendAggregateActionConfig.java b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/AppendAggregateActionConfig.java index 3d4a9b4a86..529ef0bde3 100644 --- a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/AppendAggregateActionConfig.java +++ b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/AppendAggregateActionConfig.java @@ -12,8 +12,8 @@ public class AppendAggregateActionConfig { - @JsonPropertyDescription("List of keys to append.") @JsonProperty("keys_to_append") + @JsonPropertyDescription("A list of keys to append to for the aggregated result.") List keysToAppend; public List getKeysToAppend() { diff --git a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/PercentSamplerAggregateActionConfig.java b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/PercentSamplerAggregateActionConfig.java index be9770400a..0a17e37c43 100644 --- a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/PercentSamplerAggregateActionConfig.java +++ b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/PercentSamplerAggregateActionConfig.java @@ -6,12 +6,13 @@ package org.opensearch.dataprepper.plugins.processor.aggregate.actions; import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonPropertyDescription; import jakarta.validation.constraints.NotNull; import com.fasterxml.jackson.annotation.JsonPropertyDescription; import jakarta.validation.constraints.AssertTrue; public class PercentSamplerAggregateActionConfig { - @JsonPropertyDescription("Percent value of the sampling to be done. 0.0 < percent < 100.0") + @JsonPropertyDescription("The percentage of events to be processed during a one second interval. Must be greater than 0.0 and less than 100.0") @JsonProperty("percent") @NotNull private double percent; diff --git a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/TailSamplerAggregateActionConfig.java b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/TailSamplerAggregateActionConfig.java index f86672e3b9..85ce0b1135 100644 --- a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/TailSamplerAggregateActionConfig.java +++ b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/TailSamplerAggregateActionConfig.java @@ -23,7 +23,7 @@ public class TailSamplerAggregateActionConfig { @NotNull private Integer percent; - @JsonPropertyDescription("A Data Prepper conditional expression (https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/), such as '/some-key == \"test\"', that will be evaluated to determine whether the event is an error event or not") + @JsonPropertyDescription("A Data Prepper [conditional expression](https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/), such as '/some-key == \"test\"', that will be evaluated to determine whether the event is an error event or not") @JsonProperty("condition") private String condition; diff --git a/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateProcessorIT.java b/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateProcessorIT.java index fc416b0e45..46ec0a996e 100644 --- a/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateProcessorIT.java +++ b/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateProcessorIT.java @@ -221,6 +221,7 @@ void aggregateWithPutAllActionAndCondition() throws InterruptedException { String condition = "/firstRandomNumber < 100"; when(aggregateProcessorConfig.getGroupDuration()).thenReturn(Duration.ofSeconds(GROUP_DURATION_FOR_ONLY_SINGLE_CONCLUDE)); when(aggregateProcessorConfig.getWhenCondition()).thenReturn(condition); + when(expressionEvaluator.isValidExpressionStatement(condition)).thenReturn(true); int count = 0; for (Record record: eventBatch) { Event event = record.getData(); @@ -410,6 +411,7 @@ void aggregateWithCountAggregateActionWithCondition() throws InterruptedExceptio final String condition = "/firstRandomNumber < 100"; when(aggregateProcessorConfig.getGroupDuration()).thenReturn(Duration.ofSeconds(GROUP_DURATION_FOR_ONLY_SINGLE_CONCLUDE)); when(aggregateProcessorConfig.getWhenCondition()).thenReturn(condition); + when(expressionEvaluator.isValidExpressionStatement(condition)).thenReturn(true); int count = 0; eventBatch = getBatchOfEvents(true); for (Record record: eventBatch) { diff --git a/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateProcessorTest.java b/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateProcessorTest.java index 6e60e7d965..a72921cf71 100644 --- a/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateProcessorTest.java +++ b/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateProcessorTest.java @@ -12,6 +12,7 @@ import org.opensearch.dataprepper.model.configuration.PluginSetting; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.event.JacksonEvent; +import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException; import org.opensearch.dataprepper.model.plugin.PluginFactory; import org.opensearch.dataprepper.model.record.Record; import io.micrometer.core.instrument.Counter; @@ -41,6 +42,7 @@ import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.notNullValue; import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; @@ -152,6 +154,16 @@ void setUp() { when(pluginMetrics.timer(MetricNames.TIME_ELAPSED)).thenReturn(timeElapsed); } + @Test + void invalid_aggregate_when_statement_throws_InvalidPluginConfigurationException() { + final String whenCondition = UUID.randomUUID().toString(); + when(aggregateProcessorConfig.getWhenCondition()).thenReturn(whenCondition); + + when(expressionEvaluator.isValidExpressionStatement(whenCondition)).thenReturn(false); + + assertThrows(InvalidPluginConfigurationException.class, this::createObjectUnderTest); + } + @Test void getIdentificationKeys_should_return_configured_identification_keys() { final List keys = List.of("key"); @@ -218,6 +230,7 @@ void handleEvent_returning_with_condition_eliminates_one_record() { when(identificationKeysHasher.createIdentificationKeysMapFromEvent(firstEvent)) .thenReturn(identificationKeysMap); when(aggregateActionSynchronizer.handleEventForGroup(firstEvent, identificationKeysMap, aggregateGroup)).thenReturn(firstAggregateActionResponse); + when(expressionEvaluator.isValidExpressionStatement(condition)).thenReturn(true); when(expressionEvaluator.evaluateConditional(condition, event)).thenReturn(true); when(expressionEvaluator.evaluateConditional(condition, firstEvent)).thenReturn(true); when(expressionEvaluator.evaluateConditional(condition, secondEvent)).thenReturn(false); @@ -280,6 +293,7 @@ void handleEvent_returning_with_condition_eliminates_one_record_local_only() { when(identificationKeysHasher.createIdentificationKeysMapFromEvent(firstEvent)) .thenReturn(identificationKeysMap); when(aggregateActionSynchronizer.handleEventForGroup(firstEvent, identificationKeysMap, aggregateGroup)).thenReturn(firstAggregateActionResponse); + when(expressionEvaluator.isValidExpressionStatement(condition)).thenReturn(true); when(expressionEvaluator.evaluateConditional(condition, event)).thenReturn(true); when(expressionEvaluator.evaluateConditional(condition, firstEvent)).thenReturn(true); when(expressionEvaluator.evaluateConditional(condition, secondEvent)).thenReturn(false); diff --git a/data-prepper-plugins/csv-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/csv/CsvProcessorConfig.java b/data-prepper-plugins/csv-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/csv/CsvProcessorConfig.java index fb803798b2..e21968ebdf 100644 --- a/data-prepper-plugins/csv-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/csv/CsvProcessorConfig.java +++ b/data-prepper-plugins/csv-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/csv/CsvProcessorConfig.java @@ -57,7 +57,7 @@ public class CsvProcessorConfig { private List columnNames; @JsonProperty("csv_when") - @JsonPropertyDescription("Allows you to specify a [conditional expression](https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/), " + + @JsonPropertyDescription("Allows you to specify a Data Prepper [conditional expression](https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/), " + "such as `/some-key == \"test\"`, that will be evaluated to determine whether " + "the processor should be applied to the event.") private String csvWhen; diff --git a/data-prepper-plugins/date-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/date/DateProcessor.java b/data-prepper-plugins/date-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/date/DateProcessor.java index a494cf5334..328f8294bc 100644 --- a/data-prepper-plugins/date-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/date/DateProcessor.java +++ b/data-prepper-plugins/date-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/date/DateProcessor.java @@ -12,6 +12,7 @@ import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException; import org.opensearch.dataprepper.model.processor.AbstractProcessor; import org.opensearch.dataprepper.model.processor.Processor; import org.opensearch.dataprepper.model.record.Record; @@ -63,6 +64,10 @@ public DateProcessor(PluginMetrics pluginMetrics, final DateProcessorConfig date if (dateProcessorConfig.getMatch() != null) extractKeyAndFormatters(); + + if (dateProcessorConfig.getDateWhen() != null && (!expressionEvaluator.isValidExpressionStatement(dateProcessorConfig.getDateWhen()))) { + throw new InvalidPluginConfigurationException("date_when {} is not a valid expression statement. See https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/ for valid expression syntax"); + } } @Override diff --git a/data-prepper-plugins/date-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/date/DateProcessorConfig.java b/data-prepper-plugins/date-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/date/DateProcessorConfig.java index aed3a38674..b62f1b6efd 100644 --- a/data-prepper-plugins/date-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/date/DateProcessorConfig.java +++ b/data-prepper-plugins/date-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/date/DateProcessorConfig.java @@ -31,7 +31,7 @@ public static class DateMatch { @JsonProperty("patterns") @JsonPropertyDescription("A list of possible patterns that the timestamp value of the key can have. The patterns " + "are based on a sequence of letters and symbols. The `patterns` support all the patterns listed in the " + - "Java [DatetimeFormatter](https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html) reference. " + + "Java DateTimeFormatter (https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html) reference. " + "The timestamp value also supports `epoch_second`, `epoch_milli`, and `epoch_nano` values, " + "which represent the timestamp as the number of seconds, milliseconds, and nanoseconds since the epoch. " + "Epoch values always use the UTC time zone.") @@ -54,6 +54,7 @@ public List getPatterns() { } @JsonIgnore + @AssertTrue public boolean isValidPatterns() { // For now, allow only one of the three "epoch_" pattern int count = 0; @@ -119,23 +120,23 @@ public static boolean isValidPattern(final String pattern) { @JsonProperty("source_timezone") @JsonPropertyDescription("The time zone used to parse dates, including when the zone or offset cannot be extracted " + "from the value. If the zone or offset are part of the value, then the time zone is ignored. " + - "A list of all the available time zones is contained in the **TZ database name** column of " + - "[the list of database time zones](https://en.wikipedia.org/wiki/List_of_tz_database_time_zones#List).") + "A list of all the available time zones is contained in the TZ database name column of " + + "(https://en.wikipedia.org/wiki/List_of_tz_database_time_zones#List).") private String sourceTimezone = DEFAULT_SOURCE_TIMEZONE; @JsonProperty("destination_timezone") @JsonPropertyDescription("The time zone used for storing the timestamp in the `destination` field. " + - "A list of all the available time zones is contained in the **TZ database name** column of " + - "[the list of database time zones](https://en.wikipedia.org/wiki/List_of_tz_database_time_zones#List).") + "A list of all the available time zones is contained in the TZ database name column of " + + "(https://en.wikipedia.org/wiki/List_of_tz_database_time_zones#List).") private String destinationTimezone = DEFAULT_DESTINATION_TIMEZONE; @JsonProperty("locale") @JsonPropertyDescription("The location used for parsing dates. Commonly used for parsing month names (`MMM`). " + "The value can contain language, country, or variant fields in IETF BCP 47, such as `en-US`, " + "or a string representation of the " + - "[locale](https://docs.oracle.com/javase/8/docs/api/java/util/Locale.html) object, such as `en_US`. " + + "locale (https://docs.oracle.com/javase/8/docs/api/java/util/Locale.html) object, such as `en_US`. " + "A full list of locale fields, including language, country, and variant, can be found in " + - "[the language subtag registry](https://www.iana.org/assignments/language-subtag-registry/language-subtag-registry). " + + "(https://www.iana.org/assignments/language-subtag-registry/language-subtag-registry). " + "Default is `Locale.ROOT`.") private String locale; diff --git a/data-prepper-plugins/date-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/date/DateProcessorTests.java b/data-prepper-plugins/date-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/date/DateProcessorTests.java index c6688d08e3..91299dc4b3 100644 --- a/data-prepper-plugins/date-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/date/DateProcessorTests.java +++ b/data-prepper-plugins/date-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/date/DateProcessorTests.java @@ -22,6 +22,7 @@ import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.event.JacksonEvent; +import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException; import org.opensearch.dataprepper.model.record.Record; import java.time.Instant; @@ -44,6 +45,7 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.notNullValue; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.params.provider.Arguments.arguments; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.lenient; @@ -105,6 +107,17 @@ private DateProcessor createObjectUnderTest() { return new DateProcessor(pluginMetrics, mockDateProcessorConfig, expressionEvaluator); } + @Test + void invalid_date_when_condition_throws_InvalidPluginConfigurationException() { + final String dateWhen = UUID.randomUUID().toString(); + + when(mockDateProcessorConfig.getDateWhen()).thenReturn(dateWhen); + + when(expressionEvaluator.isValidExpressionStatement(dateWhen)).thenReturn(false); + + assertThrows(InvalidPluginConfigurationException.class, this::createObjectUnderTest); + } + @Test void from_time_received_with_default_destination_test() { when(mockDateProcessorConfig.getFromTimeReceived()).thenReturn(true); @@ -130,7 +143,9 @@ void from_time_received_with_default_destination_test() { @Test void date_when_does_not_run_date_processor_for_event_with_date_when_as_false() { - when(mockDateProcessorConfig.getDateWhen()).thenReturn(UUID.randomUUID().toString()); + final String dateWhen = UUID.randomUUID().toString(); + when(mockDateProcessorConfig.getDateWhen()).thenReturn(dateWhen); + when(expressionEvaluator.isValidExpressionStatement(dateWhen)).thenReturn(true); dateProcessor = createObjectUnderTest(); Map testData = getTestData(); @@ -526,7 +541,9 @@ void match_without_year_test(String pattern) { @Test void date_processor_catches_exceptions_instead_of_throwing() { - when(mockDateProcessorConfig.getDateWhen()).thenReturn(UUID.randomUUID().toString()); + final String dateWhen = UUID.randomUUID().toString(); + when(mockDateProcessorConfig.getDateWhen()).thenReturn(dateWhen); + when(expressionEvaluator.isValidExpressionStatement(dateWhen)).thenReturn(true); when(expressionEvaluator.evaluateConditional(any(String.class), any(Event.class))) .thenThrow(RuntimeException.class); diff --git a/data-prepper-plugins/decompress-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/decompress/DecompressProcessorConfig.java b/data-prepper-plugins/decompress-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/decompress/DecompressProcessorConfig.java index ce2d985277..32248cdba5 100644 --- a/data-prepper-plugins/decompress-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/decompress/DecompressProcessorConfig.java +++ b/data-prepper-plugins/decompress-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/decompress/DecompressProcessorConfig.java @@ -7,6 +7,7 @@ import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonPropertyDescription; import jakarta.validation.constraints.NotEmpty; import jakarta.validation.constraints.NotNull; import org.opensearch.dataprepper.plugins.processor.decompress.encoding.EncodingType; @@ -16,17 +17,22 @@ public class DecompressProcessorConfig { + @JsonPropertyDescription("The keys in the event that will be decompressed.") @JsonProperty("keys") @NotEmpty + @NotNull private List keys; + @JsonPropertyDescription("The type of decompression to use for the keys in the event. Only gzip is supported.") @JsonProperty("type") @NotNull private DecompressionType decompressionType; + @JsonPropertyDescription("A conditional expression that determines when the decompress processor will run on certain events.") @JsonProperty("decompress_when") private String decompressWhen; + @JsonPropertyDescription("A list of strings with which to tag events when the processor fails to decompress the keys inside an event. Defaults to _decompression_failure.") @JsonProperty("tags_on_failure") private List tagsOnFailure = List.of("_decompression_failure"); diff --git a/data-prepper-plugins/dissect-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/dissect/DissectProcessor.java b/data-prepper-plugins/dissect-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/dissect/DissectProcessor.java index 5e5fc296bc..cc93a9a4d1 100644 --- a/data-prepper-plugins/dissect-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/dissect/DissectProcessor.java +++ b/data-prepper-plugins/dissect-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/dissect/DissectProcessor.java @@ -10,6 +10,7 @@ import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException; import org.opensearch.dataprepper.model.processor.AbstractProcessor; import org.opensearch.dataprepper.model.processor.Processor; import org.opensearch.dataprepper.model.record.Record; @@ -49,6 +50,11 @@ public DissectProcessor(PluginMetrics pluginMetrics, final DissectProcessorConfi dissectorMap.put(key, dissector); } + if (dissectConfig.getDissectWhen() != null && + (!expressionEvaluator.isValidExpressionStatement(dissectConfig.getDissectWhen()))) { + throw new InvalidPluginConfigurationException("dissect_when {} is not a valid expression statement. See https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/ for valid expression syntax"); + } + } @Override diff --git a/data-prepper-plugins/dissect-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/dissect/DissectProcessorConfig.java b/data-prepper-plugins/dissect-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/dissect/DissectProcessorConfig.java index bc8ef4705a..f934919e81 100644 --- a/data-prepper-plugins/dissect-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/dissect/DissectProcessorConfig.java +++ b/data-prepper-plugins/dissect-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/dissect/DissectProcessorConfig.java @@ -11,15 +11,15 @@ public class DissectProcessorConfig { @NotNull @JsonProperty("map") @JsonPropertyDescription("Defines the `dissect` patterns for specific keys. For details on how to define fields " + - "in the `dissect` pattern, see [Field notations](#field-notations).") + "in the `dissect` pattern, see (https://opensearch.org/docs/latest/data-prepper/pipelines/configuration/processors/dissect/#field-notations).") private Map map; @JsonProperty("target_types") @JsonPropertyDescription("Specifies the data types for extract fields. Valid options are `integer`, " + - "`double`, `string`, and `boolean`. By default, all fields are of the `string` type.") + "`double`, `string`, `long`, `big_decimal`, and `boolean`. By default, all fields are of the `string` type.") private Map targetTypes; @JsonProperty("dissect_when") - @JsonPropertyDescription("Specifies a condition for performing the `dissect` operation using a " + - "[Data Prepper expression]({{site.url}}{{site.baseurl}}/data-prepper/pipelines/expression-syntax/). " + + @JsonPropertyDescription("Specifies a condition for performing the `dissect` operation using a Data Prepper [conditional expression]" + + "(https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/). " + "If specified, the `dissect` operation will only run when the expression evaluates to true.") private String dissectWhen; diff --git a/data-prepper-plugins/dissect-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/dissect/DissectProcessorTest.java b/data-prepper-plugins/dissect-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/dissect/DissectProcessorTest.java index b344bda68e..9ab6dec585 100644 --- a/data-prepper-plugins/dissect-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/dissect/DissectProcessorTest.java +++ b/data-prepper-plugins/dissect-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/dissect/DissectProcessorTest.java @@ -14,6 +14,7 @@ import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.event.JacksonEvent; +import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException; import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.plugins.processor.dissect.Fields.AppendField; import org.opensearch.dataprepper.plugins.processor.dissect.Fields.Field; @@ -25,9 +26,11 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.UUID; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.when; @@ -49,6 +52,17 @@ void setUp() { when(dissectConfig.getMap()).thenReturn(Map.of()); } + @Test + void invalid_dissect_when_condition_throws_InvalidPluginConfigurationException() { + final String dissectWhen = UUID.randomUUID().toString(); + + when(dissectConfig.getDissectWhen()).thenReturn(dissectWhen); + + when(expressionEvaluator.isValidExpressionStatement(dissectWhen)).thenReturn(false); + + assertThrows(InvalidPluginConfigurationException.class, this::createObjectUnderTest); + } + @Test void test_normal_fields_dissect_succeeded() throws NoSuchFieldException, IllegalAccessException { diff --git a/data-prepper-plugins/drop-events-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/drop/DropEventProcessorConfig.java b/data-prepper-plugins/drop-events-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/drop/DropEventProcessorConfig.java index ecc2d2d065..b1383fbdf0 100644 --- a/data-prepper-plugins/drop-events-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/drop/DropEventProcessorConfig.java +++ b/data-prepper-plugins/drop-events-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/drop/DropEventProcessorConfig.java @@ -6,14 +6,18 @@ package org.opensearch.dataprepper.plugins.processor.drop; import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonPropertyDescription; import jakarta.validation.constraints.NotEmpty; import org.opensearch.dataprepper.model.event.HandleFailedEventsOption; public class DropEventProcessorConfig { + + @JsonPropertyDescription("Accepts a Data Prepper conditional expression string following the [Data Prepper Expression Syntax](https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/). Configuring drop_events with drop_when: true drops all the events received.") @JsonProperty("drop_when") @NotEmpty private String dropWhen; + @JsonPropertyDescription("Specifies how exceptions are handled when an exception occurs while evaluating an event. Default value is 'drop', which drops the event so that it is not sent to OpenSearch. Available options are 'drop', 'drop_silently', 'skip', and 'skip_silently'.") @JsonProperty("handle_failed_events") private HandleFailedEventsOption handleFailedEventsOption = HandleFailedEventsOption.SKIP; diff --git a/data-prepper-plugins/drop-events-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/drop/DropEventsProcessor.java b/data-prepper-plugins/drop-events-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/drop/DropEventsProcessor.java index ccfec9a3ab..6196894beb 100644 --- a/data-prepper-plugins/drop-events-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/drop/DropEventsProcessor.java +++ b/data-prepper-plugins/drop-events-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/drop/DropEventsProcessor.java @@ -10,6 +10,7 @@ import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; import org.opensearch.dataprepper.model.annotations.SingleThread; import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException; import org.opensearch.dataprepper.model.processor.AbstractProcessor; import org.opensearch.dataprepper.model.processor.Processor; import org.opensearch.dataprepper.model.record.Record; @@ -33,6 +34,11 @@ public DropEventsProcessor( ) { super(pluginMetrics); + if (dropEventProcessorConfig.getDropWhen() != null && + (!expressionEvaluator.isValidExpressionStatement(dropEventProcessorConfig.getDropWhen()))) { + throw new InvalidPluginConfigurationException("drop_when {} is not a valid expression statement. See https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/ for valid expression syntax"); + } + whenCondition = new DropEventsWhenCondition.Builder() .withDropEventsProcessorConfig(dropEventProcessorConfig) .withExpressionEvaluator(expressionEvaluator) diff --git a/data-prepper-plugins/drop-events-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/drop/DropEventsProcessorTests.java b/data-prepper-plugins/drop-events-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/drop/DropEventsProcessorTests.java index 1af2146139..65cc35f80f 100644 --- a/data-prepper-plugins/drop-events-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/drop/DropEventsProcessorTests.java +++ b/data-prepper-plugins/drop-events-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/drop/DropEventsProcessorTests.java @@ -9,6 +9,7 @@ import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.event.HandleFailedEventsOption; import org.opensearch.dataprepper.model.event.JacksonEvent; +import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException; import org.opensearch.dataprepper.model.record.Record; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -28,11 +29,15 @@ import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.lenient; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; @ExtendWith(MockitoExtension.class) public class DropEventsProcessorTests { @@ -49,11 +54,23 @@ public class DropEventsProcessorTests { @BeforeEach void beforeEach() { whenSetting = UUID.randomUUID().toString(); - doReturn(HandleFailedEventsOption.SKIP) + when(expressionEvaluator.isValidExpressionStatement(anyString())).thenReturn(true); + lenient().doReturn(HandleFailedEventsOption.SKIP) .when(dropEventProcessorConfig) .getHandleFailedEventsOption(); } + @Test + void invalid_drop_when_throws_InvalidPluginConfigurationException() { + + final String dropWhen = UUID.randomUUID().toString(); + + when(dropEventProcessorConfig.getDropWhen()).thenReturn(dropWhen); + when(expressionEvaluator.isValidExpressionStatement(dropWhen)).thenReturn(false); + + assertThrows(InvalidPluginConfigurationException.class, () -> new DropEventsProcessor(pluginMetrics, dropEventProcessorConfig, expressionEvaluator)); + } + @Test void testSingleMessageToDropProcessor() { doReturn("true") diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/DynamoDBSourceConfig.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/DynamoDBSourceConfig.java index 5babe81a29..dae4931d6e 100644 --- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/DynamoDBSourceConfig.java +++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/DynamoDBSourceConfig.java @@ -8,6 +8,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import jakarta.validation.Valid; import jakarta.validation.constraints.AssertTrue; +import jakarta.validation.constraints.NotEmpty; import jakarta.validation.constraints.NotNull; import org.opensearch.dataprepper.plugins.source.dynamodb.configuration.AwsAuthenticationConfig; import org.opensearch.dataprepper.plugins.source.dynamodb.configuration.TableConfig; @@ -22,6 +23,9 @@ public class DynamoDBSourceConfig { @JsonProperty("tables") + @NotEmpty + @NotNull + @Valid private List tableConfigs = Collections.emptyList(); @JsonProperty("aws") diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/configuration/TableConfig.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/configuration/TableConfig.java index d7c90ea1dd..d0e2b2c25d 100644 --- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/configuration/TableConfig.java +++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/configuration/TableConfig.java @@ -6,6 +6,7 @@ package org.opensearch.dataprepper.plugins.source.dynamodb.configuration; import com.fasterxml.jackson.annotation.JsonProperty; +import jakarta.validation.Valid; import jakarta.validation.constraints.NotEmpty; import jakarta.validation.constraints.NotNull; @@ -17,9 +18,11 @@ public class TableConfig { private String tableArn; @JsonProperty("export") + @Valid private ExportConfig exportConfig; @JsonProperty(value = "stream") + @Valid private StreamConfig streamConfig; diff --git a/data-prepper-plugins/flatten-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/flatten/FlattenProcessor.java b/data-prepper-plugins/flatten-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/flatten/FlattenProcessor.java index 9e3218be88..0e22a6db10 100644 --- a/data-prepper-plugins/flatten-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/flatten/FlattenProcessor.java +++ b/data-prepper-plugins/flatten-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/flatten/FlattenProcessor.java @@ -11,6 +11,7 @@ import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException; import org.opensearch.dataprepper.model.processor.AbstractProcessor; import org.opensearch.dataprepper.model.processor.Processor; import org.opensearch.dataprepper.model.record.Record; @@ -40,6 +41,11 @@ public FlattenProcessor(final PluginMetrics pluginMetrics, final FlattenProcesso for (final String key : config.getExcludeKeys()) { excludeKeysAndJsonPointers.put(key, getJsonPointer(config.getSource(), key)); } + + if (config.getFlattenWhen() != null && + (!expressionEvaluator.isValidExpressionStatement(config.getFlattenWhen()))) { + throw new InvalidPluginConfigurationException("flatten_when {} is not a valid expression statement. See https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/ for valid expression syntax"); + } } @Override diff --git a/data-prepper-plugins/flatten-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/flatten/FlattenProcessorConfig.java b/data-prepper-plugins/flatten-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/flatten/FlattenProcessorConfig.java index 783f73a9da..648936f2bf 100644 --- a/data-prepper-plugins/flatten-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/flatten/FlattenProcessorConfig.java +++ b/data-prepper-plugins/flatten-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/flatten/FlattenProcessorConfig.java @@ -50,7 +50,7 @@ public class FlattenProcessorConfig { private List excludeKeys = DEFAULT_EXCLUDE_KEYS; @JsonProperty("flatten_when") - @JsonPropertyDescription("A [conditional expression](https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/), " + + @JsonPropertyDescription("A Data Prepper [conditional expression](https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/), " + "such as `/some-key == \"test\"'`, that determines whether the `flatten` processor will be run on the " + "event. Default is `null`, which means that all events will be processed unless otherwise stated.") private String flattenWhen; diff --git a/data-prepper-plugins/flatten-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/flatten/FlattenProcessorTest.java b/data-prepper-plugins/flatten-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/flatten/FlattenProcessorTest.java index df693f7f6f..d5c0a4ad2a 100644 --- a/data-prepper-plugins/flatten-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/flatten/FlattenProcessorTest.java +++ b/data-prepper-plugins/flatten-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/flatten/FlattenProcessorTest.java @@ -17,6 +17,7 @@ import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.event.JacksonEvent; +import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException; import org.opensearch.dataprepper.model.record.Record; import java.util.ArrayList; @@ -29,6 +30,7 @@ import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.Mockito.lenient; import static org.mockito.Mockito.when; @@ -58,6 +60,17 @@ void setUp() { lenient().when(mockConfig.getExcludeKeys()).thenReturn(new ArrayList<>()); } + @Test + void invalid_flatten_when_expression_throws_InvalidPluginConfigurationException() { + final String flattenWhen = UUID.randomUUID().toString(); + + when(mockConfig.getFlattenWhen()).thenReturn(flattenWhen); + + when(expressionEvaluator.isValidExpressionStatement(flattenWhen)).thenReturn(false); + + assertThrows(InvalidPluginConfigurationException.class, this::createObjectUnderTest); + } + @Test void testFlattenEntireEventData() { final FlattenProcessor processor = createObjectUnderTest(); @@ -252,6 +265,7 @@ void testFlattenWithSpecificFieldsAsSourceAndTargetAndRemoveListIndicesAndRemove public void testEventNotProcessedWhenTheWhenConditionIsFalse() { final String whenCondition = UUID.randomUUID().toString(); when(mockConfig.getFlattenWhen()).thenReturn(whenCondition); + when(expressionEvaluator.isValidExpressionStatement(whenCondition)).thenReturn(true); final FlattenProcessor processor = createObjectUnderTest(); final Record testRecord = createTestRecord(createTestData()); diff --git a/data-prepper-plugins/geoip-processor/src/integrationTest/java/org/opensearch/dataprepper/plugins/geoip/processor/GeoIPProcessorIT.java b/data-prepper-plugins/geoip-processor/src/integrationTest/java/org/opensearch/dataprepper/plugins/geoip/processor/GeoIPProcessorIT.java index 57de2a3185..d222dd4828 100644 --- a/data-prepper-plugins/geoip-processor/src/integrationTest/java/org/opensearch/dataprepper/plugins/geoip/processor/GeoIPProcessorIT.java +++ b/data-prepper-plugins/geoip-processor/src/integrationTest/java/org/opensearch/dataprepper/plugins/geoip/processor/GeoIPProcessorIT.java @@ -110,6 +110,8 @@ public void setUp() { lenient().when(pluginMetrics.counter(GEO_IP_EVENTS_FAILED)).thenReturn(geoIpEventsFailed); lenient().when(pluginMetrics.counter(GEO_IP_EVENTS_FAILED_ENGINE_EXCEPTION)).thenReturn(geoIpEventsFailedEngineException); lenient().when(pluginMetrics.counter(GEO_IP_EVENTS_FAILED_IP_NOT_FOUND)).thenReturn(geoIpEventsFailedIPNotFound); + + when(expressionEvaluator.isValidExpressionStatement("/peer/status == \"success\"")).thenReturn(true); } public GeoIPProcessor createObjectUnderTest() { diff --git a/data-prepper-plugins/geoip-processor/src/main/java/org/opensearch/dataprepper/plugins/geoip/processor/EntryConfig.java b/data-prepper-plugins/geoip-processor/src/main/java/org/opensearch/dataprepper/plugins/geoip/processor/EntryConfig.java index d1bfe17dbd..b425ce3bbb 100644 --- a/data-prepper-plugins/geoip-processor/src/main/java/org/opensearch/dataprepper/plugins/geoip/processor/EntryConfig.java +++ b/data-prepper-plugins/geoip-processor/src/main/java/org/opensearch/dataprepper/plugins/geoip/processor/EntryConfig.java @@ -6,6 +6,7 @@ package org.opensearch.dataprepper.plugins.geoip.processor; import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonPropertyDescription; import jakarta.validation.constraints.AssertTrue; import jakarta.validation.constraints.NotEmpty; import org.opensearch.dataprepper.plugins.geoip.GeoIPField; @@ -16,16 +17,21 @@ public class EntryConfig { static final String DEFAULT_TARGET = "geo"; + + @JsonPropertyDescription("The key of the source field containing the IP address to geolocate.") @JsonProperty("source") @NotEmpty private String source; + @JsonPropertyDescription("The key of the target field in which to save the geolocation data. Default is geo.") @JsonProperty("target") private String target = DEFAULT_TARGET; + @JsonPropertyDescription("The list of geolocation fields to include in the target object. By default, this is all the fields provided by the configured databases.") @JsonProperty("include_fields") private List includeFields; + @JsonPropertyDescription("The list of geolocation fields to exclude from the target object.") @JsonProperty("exclude_fields") private List excludeFields; diff --git a/data-prepper-plugins/geoip-processor/src/main/java/org/opensearch/dataprepper/plugins/geoip/processor/GeoIPProcessor.java b/data-prepper-plugins/geoip-processor/src/main/java/org/opensearch/dataprepper/plugins/geoip/processor/GeoIPProcessor.java index 933cec7c89..a264d7f66a 100644 --- a/data-prepper-plugins/geoip-processor/src/main/java/org/opensearch/dataprepper/plugins/geoip/processor/GeoIPProcessor.java +++ b/data-prepper-plugins/geoip-processor/src/main/java/org/opensearch/dataprepper/plugins/geoip/processor/GeoIPProcessor.java @@ -12,6 +12,7 @@ import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException; import org.opensearch.dataprepper.model.processor.AbstractProcessor; import org.opensearch.dataprepper.model.processor.Processor; import org.opensearch.dataprepper.model.record.Record; @@ -73,6 +74,12 @@ public GeoIPProcessor(final PluginMetrics pluginMetrics, final GeoIpConfigSupplier geoIpConfigSupplier, final ExpressionEvaluator expressionEvaluator) { super(pluginMetrics); + + if (geoIPProcessorConfig.getWhenCondition() != null && + (!expressionEvaluator.isValidExpressionStatement(geoIPProcessorConfig.getWhenCondition()))) { + throw new InvalidPluginConfigurationException("geoip_when {} is not a valid expression statement. See https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/ for valid expression syntax"); + } + this.geoIPProcessorService = geoIpConfigSupplier.getGeoIPProcessorService().orElseThrow(() -> new IllegalStateException("geoip_service configuration is required when using geoip processor.")); this.geoIPProcessorConfig = geoIPProcessorConfig; diff --git a/data-prepper-plugins/geoip-processor/src/main/java/org/opensearch/dataprepper/plugins/geoip/processor/GeoIPProcessorConfig.java b/data-prepper-plugins/geoip-processor/src/main/java/org/opensearch/dataprepper/plugins/geoip/processor/GeoIPProcessorConfig.java index 8d9c56c899..23b351390a 100644 --- a/data-prepper-plugins/geoip-processor/src/main/java/org/opensearch/dataprepper/plugins/geoip/processor/GeoIPProcessorConfig.java +++ b/data-prepper-plugins/geoip-processor/src/main/java/org/opensearch/dataprepper/plugins/geoip/processor/GeoIPProcessorConfig.java @@ -6,6 +6,7 @@ package org.opensearch.dataprepper.plugins.geoip.processor; import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonPropertyDescription; import jakarta.validation.Valid; import jakarta.validation.constraints.NotNull; import jakarta.validation.constraints.Size; @@ -21,18 +22,25 @@ public class GeoIPProcessorConfig { @NotNull @Size(min = 1) @JsonProperty("entries") + @JsonPropertyDescription("The list of entries marked for enrichment.") private List entries; @JsonProperty("tags_on_engine_failure") + @JsonPropertyDescription("The tags to add to the event metadata if the geoip processor is unable to enrich an event due to an engine failure.") private List tagsOnEngineFailure; @JsonProperty("tags_on_ip_not_found") + @JsonPropertyDescription("The tags to add to the event metadata if the geoip processor is unable to find a location for the IP address.") private List tagsOnIPNotFound; @JsonProperty("tags_on_no_valid_ip") + @JsonPropertyDescription("The tags to add to the event metadata if the source field is not a valid IP address. This includes the localhost IP address.") private List tagsOnNoValidIp; @JsonProperty("geoip_when") + @JsonPropertyDescription("Specifies a condition for including Events in the `geoip` processor using a Data Prepper [conditional expression]" + + "(https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/)." + + " If specified, the `geoip` processor will only run when the expression evaluates to true.") private String whenCondition; /** diff --git a/data-prepper-plugins/geoip-processor/src/test/java/org/opensearch/dataprepper/plugins/geoip/processor/GeoIPProcessorTest.java b/data-prepper-plugins/geoip-processor/src/test/java/org/opensearch/dataprepper/plugins/geoip/processor/GeoIPProcessorTest.java index 0508f05b6c..7d28c46e0d 100644 --- a/data-prepper-plugins/geoip-processor/src/test/java/org/opensearch/dataprepper/plugins/geoip/processor/GeoIPProcessorTest.java +++ b/data-prepper-plugins/geoip-processor/src/test/java/org/opensearch/dataprepper/plugins/geoip/processor/GeoIPProcessorTest.java @@ -20,6 +20,7 @@ import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.event.JacksonEvent; import org.opensearch.dataprepper.model.log.JacksonLog; +import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException; import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.plugins.geoip.GeoIPField; import org.opensearch.dataprepper.plugins.geoip.exception.EngineFailureException; @@ -43,6 +44,7 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doThrow; @@ -91,7 +93,7 @@ class GeoIPProcessorTest { @BeforeEach void setUp() { - when(geoIpConfigSupplier.getGeoIPProcessorService()).thenReturn(Optional.of(geoIPProcessorService)); + lenient().when(geoIpConfigSupplier.getGeoIPProcessorService()).thenReturn(Optional.of(geoIPProcessorService)); lenient().when(geoIPProcessorService.getGeoIPDatabaseReader()).thenReturn(geoIPDatabaseReader); lenient().when(pluginMetrics.counter(GEO_IP_EVENTS_PROCESSED)).thenReturn(geoIpEventsProcessed); lenient().when(pluginMetrics.counter(GEO_IP_EVENTS_SUCCEEDED)).thenReturn(geoIpEventsSucceeded); @@ -113,12 +115,24 @@ private GeoIPProcessor createObjectUnderTest() { return new GeoIPProcessor(pluginMetrics, geoIPProcessorConfig, geoIpConfigSupplier, expressionEvaluator); } + @Test + void invalid_geoip_when_condition_throws_InvalidPluginConfigurationException() { + final String geoipWhen = UUID.randomUUID().toString(); + + when(geoIPProcessorConfig.getWhenCondition()).thenReturn(geoipWhen); + + when(expressionEvaluator.isValidExpressionStatement(geoipWhen)).thenReturn(false); + + assertThrows(InvalidPluginConfigurationException.class, this::createObjectUnderTest); + } + @Test void doExecuteTest_with_when_condition_should_enrich_events_that_match_when_condition() { final String whenCondition = "/peer/status == success"; when(geoIPProcessorConfig.getEntries()).thenReturn(List.of(entry)); when(geoIPProcessorConfig.getWhenCondition()).thenReturn(whenCondition); + when(expressionEvaluator.isValidExpressionStatement(whenCondition)).thenReturn(true); when(entry.getSource()).thenReturn("/peer/ip"); when(entry.getTarget()).thenReturn(TARGET); when(entry.getGeoIPFields()).thenReturn(setFields()); @@ -150,6 +164,7 @@ void doExecuteTest_with_when_condition_should_not_enrich_if_when_condition_is_fa when(geoIPProcessorConfig.getEntries()).thenReturn(List.of(entry)); when(geoIPProcessorConfig.getWhenCondition()).thenReturn(whenCondition); + when(expressionEvaluator.isValidExpressionStatement(whenCondition)).thenReturn(true); final GeoIPProcessor geoIPProcessor = createObjectUnderTest(); diff --git a/data-prepper-plugins/grok-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/grok/GrokProcessor.java b/data-prepper-plugins/grok-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/grok/GrokProcessor.java index 8cc9c6a716..6470830e49 100644 --- a/data-prepper-plugins/grok-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/grok/GrokProcessor.java +++ b/data-prepper-plugins/grok-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/grok/GrokProcessor.java @@ -17,6 +17,7 @@ import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; import org.opensearch.dataprepper.model.annotations.SingleThread; import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException; import org.opensearch.dataprepper.model.processor.AbstractProcessor; import org.opensearch.dataprepper.model.processor.Processor; import org.opensearch.dataprepper.model.record.Record; @@ -119,6 +120,11 @@ public GrokProcessor(final PluginMetrics pluginMetrics, registerPatterns(); compileMatchPatterns(); + + if (grokProcessorConfig.getGrokWhen() != null && + (!expressionEvaluator.isValidExpressionStatement(grokProcessorConfig.getGrokWhen()))) { + throw new InvalidPluginConfigurationException("grok_when {} is not a valid expression statement. See https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/ for valid expression syntax"); + } } /** diff --git a/data-prepper-plugins/grok-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/grok/GrokProcessorTests.java b/data-prepper-plugins/grok-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/grok/GrokProcessorTests.java index aedad1fe5c..26f8420c1f 100644 --- a/data-prepper-plugins/grok-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/grok/GrokProcessorTests.java +++ b/data-prepper-plugins/grok-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/grok/GrokProcessorTests.java @@ -25,6 +25,7 @@ import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.event.JacksonEvent; +import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException; import org.opensearch.dataprepper.model.record.Record; import java.util.ArrayList; @@ -46,6 +47,7 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.not; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.any; @@ -155,6 +157,16 @@ private GrokProcessor createObjectUnderTest() { pluginMetrics, grokProcessorConfig, grokCompiler, executorService, expressionEvaluator); } + @Test + void invalid_grok_when_throws_InvalidPluginConfigurationException() { + final String grokWhen = UUID.randomUUID().toString(); + + when(grokProcessorConfig.getGrokWhen()).thenReturn(grokWhen); + when(expressionEvaluator.isValidExpressionStatement(grokWhen)).thenReturn(false); + + assertThrows(InvalidPluginConfigurationException.class, this::createObjectUnderTest); + } + @Test public void testMatchMerge() throws JsonProcessingException, ExecutionException, InterruptedException, TimeoutException { when(grokProcessorConfig.getIncludePerformanceMetadata()).thenReturn(false); @@ -798,6 +810,7 @@ private void configureDefaultGrokProcessorConfig() { public void testNoGrok_when_GrokWhen_returns_false() throws JsonProcessingException { final String grokWhen = UUID.randomUUID().toString(); when(grokProcessorConfig.getGrokWhen()).thenReturn(grokWhen); + when(expressionEvaluator.isValidExpressionStatement(grokWhen)).thenReturn(true); grokProcessor = createObjectUnderTest(); capture.put("key_capture_1", "value_capture_1"); diff --git a/data-prepper-plugins/key-value-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/keyvalue/KeyValueProcessorConfig.java b/data-prepper-plugins/key-value-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/keyvalue/KeyValueProcessorConfig.java index bcc8eb0a27..b3f283136a 100644 --- a/data-prepper-plugins/key-value-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/keyvalue/KeyValueProcessorConfig.java +++ b/data-prepper-plugins/key-value-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/keyvalue/KeyValueProcessorConfig.java @@ -176,7 +176,7 @@ public class KeyValueProcessorConfig { private boolean dropKeysWithNoValue = false; @JsonProperty("key_value_when") - @JsonPropertyDescription("Allows you to specify a [conditional expression](https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/), " + + @JsonPropertyDescription("Allows you to specify a Data Prepper [conditional expression](https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/), " + "such as `/some-key == \"test\"`, that will be evaluated to determine whether " + "the processor should be applied to the event.") private String keyValueWhen; diff --git a/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/AddEntryProcessor.java b/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/AddEntryProcessor.java index b501eea7e0..816e03c211 100644 --- a/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/AddEntryProcessor.java +++ b/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/AddEntryProcessor.java @@ -11,6 +11,7 @@ import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.event.exceptions.EventKeyNotFoundException; +import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException; import org.opensearch.dataprepper.model.processor.AbstractProcessor; import org.opensearch.dataprepper.model.processor.Processor; import org.opensearch.dataprepper.model.record.Record; @@ -39,6 +40,14 @@ public AddEntryProcessor(final PluginMetrics pluginMetrics, final AddEntryProces super(pluginMetrics); this.entries = config.getEntries(); this.expressionEvaluator = expressionEvaluator; + + config.getEntries().forEach(entry -> { + if (entry.getAddWhen() != null + && !expressionEvaluator.isValidExpressionStatement(entry.getAddWhen())) { + throw new InvalidPluginConfigurationException( + String.format("add_when %s is not a valid expression statement. See https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/ for valid expression syntax", entry.getAddWhen())); + } + }); } @Override diff --git a/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/ConvertEntryTypeProcessor.java b/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/ConvertEntryTypeProcessor.java index 24f56ef2ba..53ca543c17 100644 --- a/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/ConvertEntryTypeProcessor.java +++ b/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/ConvertEntryTypeProcessor.java @@ -10,6 +10,7 @@ import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException; import org.opensearch.dataprepper.model.processor.AbstractProcessor; import org.opensearch.dataprepper.model.processor.Processor; import org.opensearch.dataprepper.model.record.Record; @@ -55,6 +56,12 @@ public ConvertEntryTypeProcessor(final PluginMetrics pluginMetrics, .orElse(List.of()); this.expressionEvaluator = expressionEvaluator; this.tagsOnFailure = convertEntryTypeProcessorConfig.getTagsOnFailure(); + + if (convertWhen != null + && !expressionEvaluator.isValidExpressionStatement(convertWhen)) { + throw new InvalidPluginConfigurationException( + String.format("convert_when %s is not a valid expression statement. See https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/ for valid expression syntax", convertWhen)); + } } @Override diff --git a/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/ConvertEntryTypeProcessorConfig.java b/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/ConvertEntryTypeProcessorConfig.java index 448d9bb0a4..ce6aaed15c 100644 --- a/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/ConvertEntryTypeProcessorConfig.java +++ b/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/ConvertEntryTypeProcessorConfig.java @@ -6,6 +6,7 @@ package org.opensearch.dataprepper.plugins.processor.mutateevent; import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonPropertyDescription; import org.opensearch.dataprepper.typeconverter.ConverterArguments; import java.util.List; @@ -13,27 +14,34 @@ public class ConvertEntryTypeProcessorConfig implements ConverterArguments { @JsonProperty("key") + @JsonPropertyDescription("Key whose value needs to be converted to a different type.") private String key; @JsonProperty("keys") + @JsonPropertyDescription("List of keys whose value needs to be converted to a different type.") private List keys; @JsonProperty("type") + @JsonPropertyDescription("Target type for the key-value pair. Possible values are integer, long, double, big_decimal, string, and boolean. Default value is integer.") private TargetType type = TargetType.INTEGER; /** * Optional scale value used only in the case of BigDecimal converter */ @JsonProperty("scale") + @JsonPropertyDescription("Modifies the scale of the big_decimal when converting to a big_decimal. The default value is 0.") private int scale = 0; @JsonProperty("convert_when") + @JsonPropertyDescription("Specifies a condition using a [Data Prepper expression](https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/) for performing the convert_entry_type operation. If specified, the convert_entry_type operation runs only when the expression evaluates to true.") private String convertWhen; @JsonProperty("null_values") + @JsonPropertyDescription("String representation of what constitutes a null value. If the field value equals one of these strings, then the value is considered null and is converted to null.") private List nullValues; @JsonProperty("tags_on_failure") + @JsonPropertyDescription("A list of tags to be added to the event metadata when the event fails to convert.") private List tagsOnFailure; public String getKey() { diff --git a/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/CopyValueProcessor.java b/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/CopyValueProcessor.java index 845ae40e38..0ed56b1b8d 100644 --- a/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/CopyValueProcessor.java +++ b/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/CopyValueProcessor.java @@ -10,6 +10,7 @@ import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException; import org.opensearch.dataprepper.model.processor.AbstractProcessor; import org.opensearch.dataprepper.model.processor.Processor; import org.opensearch.dataprepper.model.record.Record; @@ -38,6 +39,14 @@ public CopyValueProcessor(final PluginMetrics pluginMetrics, final CopyValueProc this.config = config; this.entries = config.getEntries(); this.expressionEvaluator = expressionEvaluator; + + config.getEntries().forEach(entry -> { + if (entry.getCopyWhen() != null + && !expressionEvaluator.isValidExpressionStatement(entry.getCopyWhen())) { + throw new InvalidPluginConfigurationException( + String.format("copy_when %s is not a valid expression statement. See https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/ for valid expression syntax", entry.getCopyWhen())); + } + }); } @Override diff --git a/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/DeleteEntryProcessor.java b/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/DeleteEntryProcessor.java index cfadf70d03..5dc0ee8b42 100644 --- a/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/DeleteEntryProcessor.java +++ b/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/DeleteEntryProcessor.java @@ -11,6 +11,7 @@ import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.event.EventKey; +import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException; import org.opensearch.dataprepper.model.processor.AbstractProcessor; import org.opensearch.dataprepper.model.processor.Processor; import org.opensearch.dataprepper.model.record.Record; @@ -38,6 +39,12 @@ public DeleteEntryProcessor(final PluginMetrics pluginMetrics, final DeleteEntry this.entries = config.getWithKeys(); this.deleteWhen = config.getDeleteWhen(); this.expressionEvaluator = expressionEvaluator; + + if (deleteWhen != null + && !expressionEvaluator.isValidExpressionStatement(deleteWhen)) { + throw new InvalidPluginConfigurationException( + String.format("delete_when %s is not a valid expression statement. See https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/ for valid expression syntax", deleteWhen)); + } } @Override diff --git a/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/ListToMapProcessor.java b/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/ListToMapProcessor.java index d042f8fa28..9c8655a4cb 100644 --- a/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/ListToMapProcessor.java +++ b/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/ListToMapProcessor.java @@ -10,6 +10,7 @@ import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException; import org.opensearch.dataprepper.model.processor.AbstractProcessor; import org.opensearch.dataprepper.model.processor.Processor; import org.opensearch.dataprepper.model.record.Record; @@ -38,6 +39,13 @@ public ListToMapProcessor(final PluginMetrics pluginMetrics, final ListToMapProc super(pluginMetrics); this.config = config; this.expressionEvaluator = expressionEvaluator; + + if (config.getListToMapWhen() != null + && !expressionEvaluator.isValidExpressionStatement(config.getListToMapWhen())) { + throw new InvalidPluginConfigurationException( + String.format("list_to_map_when %s is not a valid expression statement. See https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/ for valid expression syntax", + config.getListToMapWhen())); + } } @Override diff --git a/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/ListToMapProcessorConfig.java b/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/ListToMapProcessorConfig.java index b63deb727c..13b6c2f0fb 100644 --- a/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/ListToMapProcessorConfig.java +++ b/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/ListToMapProcessorConfig.java @@ -84,7 +84,7 @@ static FlattenedElement fromOptionValue(final String option) { private FlattenedElement flattenedElement = FlattenedElement.FIRST; @JsonProperty("list_to_map_when") - @JsonPropertyDescription("A [conditional expression](https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/), " + + @JsonPropertyDescription("A Data Prepper [conditional expression](https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/), " + "such as `/some-key == \"test\"'`, that will be evaluated to determine whether the processor will be " + "run on the event. Default is `null`. All events will be processed unless otherwise stated.") private String listToMapWhen; diff --git a/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/MapToListProcessor.java b/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/MapToListProcessor.java index d911cd6194..38b5c3cf82 100644 --- a/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/MapToListProcessor.java +++ b/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/MapToListProcessor.java @@ -12,6 +12,7 @@ import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException; import org.opensearch.dataprepper.model.processor.AbstractProcessor; import org.opensearch.dataprepper.model.processor.Processor; import org.opensearch.dataprepper.model.record.Record; @@ -43,6 +44,13 @@ public MapToListProcessor(final PluginMetrics pluginMetrics, final MapToListProc this.config = config; this.expressionEvaluator = expressionEvaluator; excludeKeySet.addAll(config.getExcludeKeys()); + + if (config.getMapToListWhen() != null + && !expressionEvaluator.isValidExpressionStatement(config.getMapToListWhen())) { + throw new InvalidPluginConfigurationException( + String.format("map_to_list_when %s is not a valid expression statement. See https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/ for valid expression syntax", + config.getMapToListWhen())); + } } @Override diff --git a/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/MapToListProcessorConfig.java b/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/MapToListProcessorConfig.java index ce317eca49..a402c27cdc 100644 --- a/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/MapToListProcessorConfig.java +++ b/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/MapToListProcessorConfig.java @@ -40,7 +40,7 @@ public class MapToListProcessorConfig { private String valueName = DEFAULT_VALUE_NAME; @JsonProperty("map_to_list_when") - @JsonPropertyDescription("A [conditional expression](https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/), " + + @JsonPropertyDescription("A Data Prepper [conditional expression](https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/), " + "such as `/some-key == \"test\"'`, that will be evaluated to determine whether the processor will " + "be run on the event. Default is `null`. All events will be processed unless otherwise stated.") private String mapToListWhen; diff --git a/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/RenameKeyProcessor.java b/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/RenameKeyProcessor.java index 05c1ad7530..25fb27a9fe 100644 --- a/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/RenameKeyProcessor.java +++ b/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/RenameKeyProcessor.java @@ -10,6 +10,7 @@ import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException; import org.opensearch.dataprepper.model.processor.AbstractProcessor; import org.opensearch.dataprepper.model.processor.Processor; import org.opensearch.dataprepper.model.record.Record; @@ -35,6 +36,15 @@ public RenameKeyProcessor(final PluginMetrics pluginMetrics, final RenameKeyProc super(pluginMetrics); this.entries = config.getEntries(); this.expressionEvaluator = expressionEvaluator; + + config.getEntries().forEach(entry -> { + if (entry.getRenameWhen() != null + && !expressionEvaluator.isValidExpressionStatement(entry.getRenameWhen())) { + throw new InvalidPluginConfigurationException( + String.format("rename_when %s is not a valid expression statement. See https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/ for valid expression syntax", + entry.getRenameWhen())); + } + }); } @Override diff --git a/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/RenameKeyProcessorConfig.java b/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/RenameKeyProcessorConfig.java index d1ee0178a6..7449cc9968 100644 --- a/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/RenameKeyProcessorConfig.java +++ b/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/RenameKeyProcessorConfig.java @@ -6,6 +6,7 @@ package org.opensearch.dataprepper.plugins.processor.mutateevent; import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonPropertyDescription; import jakarta.validation.Valid; import jakarta.validation.constraints.NotEmpty; import jakarta.validation.constraints.NotNull; @@ -30,9 +31,13 @@ public static class Entry { private EventKey toKey; @JsonProperty("overwrite_if_to_key_exists") + @JsonPropertyDescription("When set to true, the existing value is overwritten if key already exists in the event. The default value is false.") private boolean overwriteIfToKeyExists = false; @JsonProperty("rename_when") + @JsonPropertyDescription("A Data Prepper [conditional expression](https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/), " + + "such as `/some-key == \"test\"'`, that will be evaluated to determine whether the processor will be " + + "run on the event. Default is `null`. All events will be processed unless otherwise stated.") private String renameWhen; public EventKey getFromKey() { diff --git a/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/SelectEntriesProcessorConfig.java b/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/SelectEntriesProcessorConfig.java index e19723f20d..a9a8ff9ce6 100644 --- a/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/SelectEntriesProcessorConfig.java +++ b/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/SelectEntriesProcessorConfig.java @@ -6,6 +6,7 @@ package org.opensearch.dataprepper.plugins.processor.mutateevent; import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonPropertyDescription; import jakarta.validation.constraints.NotEmpty; import jakarta.validation.constraints.NotNull; @@ -15,9 +16,13 @@ public class SelectEntriesProcessorConfig { @NotEmpty @NotNull @JsonProperty("include_keys") + @JsonPropertyDescription("A list of keys to be selected from an event.") private List includeKeys; @JsonProperty("select_when") + @JsonPropertyDescription("A Data Prepper [conditional expression](https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/), " + + "such as `/some-key == \"test\"'`, that will be evaluated to determine whether the processor will be " + + "run on the event. Default is `null`. All events will be processed unless otherwise stated.") private String selectWhen; public List getIncludeKeys() { diff --git a/data-prepper-plugins/mutate-event-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutateevent/AddEntryProcessorTests.java b/data-prepper-plugins/mutate-event-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutateevent/AddEntryProcessorTests.java index 36dc7ac5d4..2a61d05241 100644 --- a/data-prepper-plugins/mutate-event-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutateevent/AddEntryProcessorTests.java +++ b/data-prepper-plugins/mutate-event-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutateevent/AddEntryProcessorTests.java @@ -13,6 +13,7 @@ import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.event.JacksonEvent; +import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException; import org.opensearch.dataprepper.model.record.Record; import java.util.ArrayList; @@ -46,6 +47,17 @@ public class AddEntryProcessorTests { @Mock private ExpressionEvaluator expressionEvaluator; + @Test + void invalid_add_when_throws_InvalidPluginConfigurationException() { + final String addWhen = UUID.randomUUID().toString(); + + when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("newMessage", null, 3, null, null, false, false,addWhen))); + + when(expressionEvaluator.isValidExpressionStatement(addWhen)).thenReturn(false); + + assertThrows(InvalidPluginConfigurationException.class, this::createObjectUnderTest); + } + @Test public void testSingleAddProcessorTests() { when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("newMessage", null, 3, null, null, false, false,null))); @@ -417,6 +429,7 @@ public void testKeyIsNotAdded_when_addWhen_condition_is_false() { final String addWhen = UUID.randomUUID().toString(); when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("newMessage", null, 3, null, null, false, false,addWhen))); + when(expressionEvaluator.isValidExpressionStatement(addWhen)).thenReturn(true); final AddEntryProcessor processor = createObjectUnderTest(); final Record record = getEvent("thisisamessage"); @@ -434,6 +447,7 @@ public void testMetadataKeyIsNotAdded_when_addWhen_condition_is_false() { final String addWhen = UUID.randomUUID().toString(); when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry(null, "newMessage", 3, null, null, false, false,addWhen))); + when(expressionEvaluator.isValidExpressionStatement(addWhen)).thenReturn(true); final AddEntryProcessor processor = createObjectUnderTest(); final Record record = getEventWithMetadata("thisisamessage", Map.of("key", "value")); diff --git a/data-prepper-plugins/mutate-event-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutateevent/ConvertEntryTypeProcessorTests.java b/data-prepper-plugins/mutate-event-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutateevent/ConvertEntryTypeProcessorTests.java index 5f8b66a6a6..4165e32934 100644 --- a/data-prepper-plugins/mutate-event-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutateevent/ConvertEntryTypeProcessorTests.java +++ b/data-prepper-plugins/mutate-event-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutateevent/ConvertEntryTypeProcessorTests.java @@ -17,6 +17,7 @@ import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.event.JacksonEvent; +import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException; import org.opensearch.dataprepper.model.record.Record; import java.math.BigDecimal; @@ -89,6 +90,18 @@ private Event executeAndGetProcessedEvent(final Record record) { return event; } + @Test + void invalid_convert_when_throws_InvalidPluginConfigurationException() { + final String convertWhen = UUID.randomUUID().toString(); + + when(mockConfig.getType()).thenReturn(TargetType.fromOptionValue("integer")); + when(mockConfig.getConvertWhen()).thenReturn(convertWhen); + + when(expressionEvaluator.isValidExpressionStatement(convertWhen)).thenReturn(false); + + assertThrows(InvalidPluginConfigurationException.class, () -> new ConvertEntryTypeProcessor(pluginMetrics, mockConfig, expressionEvaluator)); + } + @Test void testStringToIntegerConvertEntryTypeProcessor() { Integer testValue = 123; @@ -289,6 +302,7 @@ void testNoConversionWhenConvertWhenIsFalse() { final String convertWhen = UUID.randomUUID().toString(); when(mockConfig.getType()).thenReturn(TargetType.fromOptionValue("integer")); when(mockConfig.getConvertWhen()).thenReturn(convertWhen); + when(expressionEvaluator.isValidExpressionStatement(convertWhen)).thenReturn(true); final Record record = getMessage(UUID.randomUUID().toString(), testValue); when(expressionEvaluator.evaluateConditional(convertWhen, record.getData())).thenReturn(false); diff --git a/data-prepper-plugins/mutate-event-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutateevent/CopyValueProcessorTests.java b/data-prepper-plugins/mutate-event-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutateevent/CopyValueProcessorTests.java index 98c6ad7d2c..bc1a9f6d1a 100644 --- a/data-prepper-plugins/mutate-event-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutateevent/CopyValueProcessorTests.java +++ b/data-prepper-plugins/mutate-event-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutateevent/CopyValueProcessorTests.java @@ -14,6 +14,7 @@ import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.event.JacksonEvent; +import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException; import org.opensearch.dataprepper.model.record.Record; import java.util.Arrays; @@ -27,6 +28,7 @@ import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.Mockito.lenient; import static org.mockito.Mockito.when; @@ -48,6 +50,16 @@ void setUp() { lenient().when(mockConfig.getOverwriteIfToListExists()).thenReturn(false); } + @Test + void invalid_copy_when_throws_InvalidPluginConfigurationException() { + final String copyWhen = UUID.randomUUID().toString(); + + when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("message", "newMessage", false, copyWhen))); + when(expressionEvaluator.isValidExpressionStatement(copyWhen)).thenReturn(false); + + assertThrows(InvalidPluginConfigurationException.class, this::createObjectUnderTest); + } + @Test public void testSingleCopyProcessorTests() { when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("message", "newMessage", false, null))); @@ -184,6 +196,7 @@ public void testKey_is_not_copied_when_copyWhen_returns_false() { when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("message2", "newMessage", true, copyWhen))); + when(expressionEvaluator.isValidExpressionStatement(copyWhen)).thenReturn(true); final CopyValueProcessor processor = createObjectUnderTest(); final Record record = getEvent("thisisamessage"); @@ -277,6 +290,8 @@ public void testCopyEntriesFromListWithWhenConditions() { createEntry("color", "fruit_color", true, copyWhen) )); + when(expressionEvaluator.isValidExpressionStatement(copyWhen)).thenReturn(true); + final CopyValueProcessor processor = createObjectUnderTest(); final Record record = getEventWithLists(List.of( Map.of("name", "apple", "color", "red", "shape", "round"), diff --git a/data-prepper-plugins/mutate-event-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutateevent/DeleteEntryProcessorTests.java b/data-prepper-plugins/mutate-event-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutateevent/DeleteEntryProcessorTests.java index bc0fb78870..f2453e6ac0 100644 --- a/data-prepper-plugins/mutate-event-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutateevent/DeleteEntryProcessorTests.java +++ b/data-prepper-plugins/mutate-event-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutateevent/DeleteEntryProcessorTests.java @@ -15,6 +15,7 @@ import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.event.EventKeyFactory; import org.opensearch.dataprepper.model.event.JacksonEvent; +import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException; import org.opensearch.dataprepper.model.record.Record; import java.util.Collections; @@ -25,6 +26,7 @@ import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.Mockito.when; @ExtendWith(MockitoExtension.class) @@ -40,6 +42,17 @@ public class DeleteEntryProcessorTests { private final EventKeyFactory eventKeyFactory = TestEventKeyFactory.getTestEventFactory(); + @Test + void invalid_delete_when_throws_InvalidPluginConfigurationException() { + final String deleteWhen = UUID.randomUUID().toString(); + + when(mockConfig.getDeleteWhen()).thenReturn(deleteWhen); + + when(expressionEvaluator.isValidExpressionStatement(deleteWhen)).thenReturn(false); + + assertThrows(InvalidPluginConfigurationException.class, this::createObjectUnderTest); + } + @Test public void testSingleDeleteProcessorTest() { when(mockConfig.getWithKeys()).thenReturn(List.of(eventKeyFactory.createEventKey("message", EventKeyFactory.EventAction.DELETE))); @@ -92,6 +105,7 @@ public void testKeyIsNotDeleted_when_deleteWhen_returns_false() { when(mockConfig.getWithKeys()).thenReturn(List.of(eventKeyFactory.createEventKey("message", EventKeyFactory.EventAction.DELETE))); final String deleteWhen = UUID.randomUUID().toString(); when(mockConfig.getDeleteWhen()).thenReturn(deleteWhen); + when(expressionEvaluator.isValidExpressionStatement(deleteWhen)).thenReturn(true); final DeleteEntryProcessor processor = createObjectUnderTest(); final Record record = getEvent("thisisamessage"); diff --git a/data-prepper-plugins/mutate-event-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutateevent/ListToMapProcessorTest.java b/data-prepper-plugins/mutate-event-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutateevent/ListToMapProcessorTest.java index a32bd8f8d5..a5321b719e 100644 --- a/data-prepper-plugins/mutate-event-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutateevent/ListToMapProcessorTest.java +++ b/data-prepper-plugins/mutate-event-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutateevent/ListToMapProcessorTest.java @@ -14,6 +14,7 @@ import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.event.JacksonEvent; +import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException; import org.opensearch.dataprepper.model.record.Record; import java.util.Collections; @@ -26,6 +27,7 @@ import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.CoreMatchers.nullValue; import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.Mockito.lenient; import static org.mockito.Mockito.when; @@ -51,6 +53,16 @@ void setUp() { lenient().when(mockConfig.getExtractValue()).thenReturn(false); } + @Test + void invalid_list_to_map_when_throws_InvalidPluginConfigurationException() { + final String listToMapWhen = UUID.randomUUID().toString(); + when(mockConfig.getListToMapWhen()).thenReturn(listToMapWhen); + + when(expressionEvaluator.isValidExpressionStatement(listToMapWhen)).thenReturn(false); + + assertThrows(InvalidPluginConfigurationException.class, this::createObjectUnderTest); + } + @Test public void testValueExtractionWithFlattenAndWriteToRoot() { when(mockConfig.getValueKey()).thenReturn("value"); @@ -329,6 +341,7 @@ public void testTagsAreAddedOnFailure() { public void testNoValueExtraction_when_the_when_condition_returns_false() { final String whenCondition = UUID.randomUUID().toString(); when(mockConfig.getListToMapWhen()).thenReturn(whenCondition); + when(expressionEvaluator.isValidExpressionStatement(whenCondition)).thenReturn(true); final ListToMapProcessor processor = createObjectUnderTest(); final Record testRecord = createTestRecord(); diff --git a/data-prepper-plugins/mutate-event-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutateevent/MapToListProcessorTest.java b/data-prepper-plugins/mutate-event-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutateevent/MapToListProcessorTest.java index 83d736ba21..1b2ca68833 100644 --- a/data-prepper-plugins/mutate-event-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutateevent/MapToListProcessorTest.java +++ b/data-prepper-plugins/mutate-event-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutateevent/MapToListProcessorTest.java @@ -14,6 +14,7 @@ import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.event.JacksonEvent; +import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException; import org.opensearch.dataprepper.model.record.Record; import java.util.ArrayList; @@ -26,6 +27,7 @@ import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.Mockito.lenient; import static org.mockito.Mockito.when; @@ -53,6 +55,16 @@ void setUp() { lenient().when(mockConfig.getTagsOnFailure()).thenReturn(new ArrayList<>()); } + @Test + void invalid_map_to_list_when_throws_InvalidPluginConfigurationException() { + final String mapToListWhen = UUID.randomUUID().toString(); + when(mockConfig.getMapToListWhen()).thenReturn(mapToListWhen); + + when(expressionEvaluator.isValidExpressionStatement(mapToListWhen)).thenReturn(false); + + assertThrows(InvalidPluginConfigurationException.class, this::createObjectUnderTest); + } + @Test void testMapToListSuccessWithDefaultOptions() { @@ -312,6 +324,7 @@ public void testConvertFieldToListSuccessWithRootAsSource() { public void testEventNotProcessedWhenTheWhenConditionIsFalse() { final String whenCondition = UUID.randomUUID().toString(); when(mockConfig.getMapToListWhen()).thenReturn(whenCondition); + when(expressionEvaluator.isValidExpressionStatement(whenCondition)).thenReturn(true); final MapToListProcessor processor = createObjectUnderTest(); final Record testRecord = createTestRecord(); diff --git a/data-prepper-plugins/mutate-event-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutateevent/RenameKeyProcessorTests.java b/data-prepper-plugins/mutate-event-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutateevent/RenameKeyProcessorTests.java index 6ae362bc46..3cdf47e344 100644 --- a/data-prepper-plugins/mutate-event-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutateevent/RenameKeyProcessorTests.java +++ b/data-prepper-plugins/mutate-event-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutateevent/RenameKeyProcessorTests.java @@ -12,6 +12,7 @@ import org.opensearch.dataprepper.model.event.EventKey; import org.opensearch.dataprepper.model.event.EventKeyFactory; import org.opensearch.dataprepper.model.event.JacksonEvent; +import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException; import org.opensearch.dataprepper.model.record.Record; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -29,6 +30,7 @@ import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.Mockito.when; @ExtendWith(MockitoExtension.class) @@ -44,6 +46,17 @@ public class RenameKeyProcessorTests { private final EventKeyFactory eventKeyFactory = TestEventKeyFactory.getTestEventFactory(); + @Test + void invalid_rename_when_throws_InvalidPluginConfigurationException() { + final String renameWhen = UUID.randomUUID().toString(); + when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("message", "newMessage", true, renameWhen))); + + + when(expressionEvaluator.isValidExpressionStatement(renameWhen)).thenReturn(false); + + assertThrows(InvalidPluginConfigurationException.class, this::createObjectUnderTest); + } + @Test public void testSingleOverwriteRenameProcessorTests() { when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("message", "newMessage", true, null))); @@ -123,6 +136,7 @@ public void testNoRename_when_RenameWhen_returns_false() { final String renameWhen = UUID.randomUUID().toString(); when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("message", "newMessage", false, renameWhen))); + when(expressionEvaluator.isValidExpressionStatement(renameWhen)).thenReturn(true); final RenameKeyProcessor processor = createObjectUnderTest(); final Record record = getEvent("thisisamessage"); diff --git a/data-prepper-plugins/mutate-event-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutateevent/SelectEntriesProcessorTests.java b/data-prepper-plugins/mutate-event-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutateevent/SelectEntriesProcessorTests.java index 048f304582..c514687606 100644 --- a/data-prepper-plugins/mutate-event-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutateevent/SelectEntriesProcessorTests.java +++ b/data-prepper-plugins/mutate-event-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutateevent/SelectEntriesProcessorTests.java @@ -39,6 +39,16 @@ public class SelectEntriesProcessorTests { @Mock private ExpressionEvaluator expressionEvaluator; + @Test + void invalid_select_when_throws_InvalidPluginConfigurationException() { + final String selectWhen = UUID.randomUUID().toString(); + + when(mockConfig.getSelectWhen()).thenReturn(selectWhen); + + when(expressionEvaluator.isValidExpressionStatement(selectWhen)).thenReturn(false); + + assertThrows(InvalidPluginConfigurationException.class, this::createObjectUnderTest); + } @Test public void testSelectEntriesProcessor() { when(mockConfig.getIncludeKeys()).thenReturn(List.of("key1", "key2")); diff --git a/data-prepper-plugins/mutate-string-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutatestring/SplitStringProcessor.java b/data-prepper-plugins/mutate-string-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutatestring/SplitStringProcessor.java index 6bc89178d8..eacc41b151 100644 --- a/data-prepper-plugins/mutate-string-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutatestring/SplitStringProcessor.java +++ b/data-prepper-plugins/mutate-string-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutatestring/SplitStringProcessor.java @@ -11,6 +11,7 @@ import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.event.EventKey; +import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException; import org.opensearch.dataprepper.model.processor.Processor; import java.util.HashMap; @@ -43,6 +44,12 @@ public SplitStringProcessor(final PluginMetrics pluginMetrics, final SplitString } else { patternMap.put(entry.getDelimiter(), Pattern.compile(Pattern.quote(entry.getDelimiter()))); } + + if (entry.getSplitWhen() != null + && !expressionEvaluator.isValidExpressionStatement(entry.getSplitWhen())) { + throw new InvalidPluginConfigurationException( + String.format("split_when %s is not a valid expression statement. See https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/ for valid expression syntax", entry.getSplitWhen())); + } } } diff --git a/data-prepper-plugins/mutate-string-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutatestring/SubstituteStringProcessor.java b/data-prepper-plugins/mutate-string-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutatestring/SubstituteStringProcessor.java index e6dceb62fc..dbfc2d07c8 100644 --- a/data-prepper-plugins/mutate-string-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutatestring/SubstituteStringProcessor.java +++ b/data-prepper-plugins/mutate-string-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutatestring/SubstituteStringProcessor.java @@ -11,6 +11,7 @@ import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.event.EventKey; +import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException; import org.opensearch.dataprepper.model.processor.Processor; import java.util.HashMap; @@ -35,6 +36,12 @@ public SubstituteStringProcessor(final PluginMetrics pluginMetrics, final Substi for(final SubstituteStringProcessorConfig.Entry entry : config.getEntries()) { patternMap.put(entry.getFrom(), Pattern.compile(entry.getFrom())); + + if (entry.getSubstituteWhen() != null + && !expressionEvaluator.isValidExpressionStatement(entry.getSubstituteWhen())) { + throw new InvalidPluginConfigurationException( + String.format("substitute_when %s is not a valid expression statement. See https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/ for valid expression syntax", entry.getSubstituteWhen())); + } } } diff --git a/data-prepper-plugins/mutate-string-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutatestring/SubstituteStringProcessorConfig.java b/data-prepper-plugins/mutate-string-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutatestring/SubstituteStringProcessorConfig.java index 4a8f53f0fe..722a42dcbe 100644 --- a/data-prepper-plugins/mutate-string-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutatestring/SubstituteStringProcessorConfig.java +++ b/data-prepper-plugins/mutate-string-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutatestring/SubstituteStringProcessorConfig.java @@ -24,6 +24,9 @@ public static class Entry { private String to; @JsonProperty("substitute_when") + @JsonPropertyDescription("A Data Prepper [conditional expression](https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/), " + + "such as `/some-key == \"test\"'`, that will be evaluated to determine whether the processor will be " + + "run on the event. Default is `null`. All events will be processed unless otherwise stated.") private String substituteWhen; public EventKey getSource() { @@ -50,7 +53,7 @@ public Entry(final EventKey source, final String from, final String to, final St public Entry() {} } - @JsonPropertyDescription("List of entries. Valid values are `source`, `from`, and `to`.") + @JsonPropertyDescription("List of entries. Valid values are `source`, `from`, and `to`, and `substitute_when`.") private List entries; public List getEntries() { diff --git a/data-prepper-plugins/mutate-string-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutatestring/SplitStringProcessorTests.java b/data-prepper-plugins/mutate-string-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutatestring/SplitStringProcessorTests.java index 7883dcfd05..ad24657f1a 100644 --- a/data-prepper-plugins/mutate-string-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutatestring/SplitStringProcessorTests.java +++ b/data-prepper-plugins/mutate-string-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutatestring/SplitStringProcessorTests.java @@ -14,6 +14,7 @@ import org.opensearch.dataprepper.model.event.EventFactory; import org.opensearch.dataprepper.model.event.EventKey; import org.opensearch.dataprepper.model.event.EventKeyFactory; +import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException; import org.opensearch.dataprepper.model.record.Record; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -57,6 +58,17 @@ private SplitStringProcessor createObjectUnderTest() { return new SplitStringProcessor(pluginMetrics, config, expressionEvaluator); } + @Test + void invalid_split_when_throws_InvalidPluginConfigurationException() { + final String splitWhen = UUID.randomUUID().toString(); + when(config.getIterativeConfig()).thenReturn(Collections.singletonList(createEntry("message", ",", null, splitWhen))); + when(config.getEntries()).thenReturn(Collections.singletonList(createEntry("message", ",", null, splitWhen))); + + when(expressionEvaluator.isValidExpressionStatement(splitWhen)).thenReturn(false); + + assertThrows(InvalidPluginConfigurationException.class, this::createObjectUnderTest); + } + @ParameterizedTest @ArgumentsSource(SplitStringArgumentsProvider.class) void testSingleSplitProcessor(final String message, final List splitMessage) { @@ -111,6 +123,7 @@ void test_event_is_the_same_when_splitWhen_condition_returns_false() { when(config.getIterativeConfig()).thenReturn(Collections.singletonList(createEntry("message", ",", null, splitWhen))); when(config.getEntries()).thenReturn(Collections.singletonList(createEntry("message", ",", null, splitWhen))); + when(expressionEvaluator.isValidExpressionStatement(splitWhen)).thenReturn(true); final SplitStringProcessor splitStringProcessor = createObjectUnderTest(); final Record record = createEvent(message); diff --git a/data-prepper-plugins/mutate-string-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutatestring/SubstituteStringProcessorTests.java b/data-prepper-plugins/mutate-string-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutatestring/SubstituteStringProcessorTests.java index dd8d9b1dd8..854eb0a72c 100644 --- a/data-prepper-plugins/mutate-string-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutatestring/SubstituteStringProcessorTests.java +++ b/data-prepper-plugins/mutate-string-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutatestring/SubstituteStringProcessorTests.java @@ -14,6 +14,7 @@ import org.opensearch.dataprepper.model.event.EventFactory; import org.opensearch.dataprepper.model.event.EventKey; import org.opensearch.dataprepper.model.event.EventKeyFactory; +import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException; import org.opensearch.dataprepper.model.record.Record; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -56,6 +57,16 @@ public void setup() { lenient().when(config.getEntries()).thenReturn(Collections.singletonList(createEntry("message", "a", "b", null))); } + @Test + void invalid_substitute_when_throws_InvalidPluginConfigurationException() { + final String substituteWhen = UUID.randomUUID().toString(); + when(config.getEntries()).thenReturn(Collections.singletonList(createEntry("message", "a", "b", substituteWhen))); + + when(expressionEvaluator.isValidExpressionStatement(substituteWhen)).thenReturn(false); + + assertThrows(InvalidPluginConfigurationException.class, this::createObjectUnderTest); + } + @Test public void testHappyPathSubstituteStringProcessor() { final SubstituteStringProcessor processor = createObjectUnderTest(); @@ -151,6 +162,7 @@ public void test_events_are_identical_when_substituteWhen_condition_returns_fals when(config.getIterativeConfig()).thenReturn(Collections.singletonList(createEntry("message", "[?\\\\+]", "b", substituteWhen))); when(config.getEntries()).thenReturn(Collections.singletonList(createEntry("message", "[?\\\\+]", "b", substituteWhen))); + when(expressionEvaluator.isValidExpressionStatement(substituteWhen)).thenReturn(true); final SubstituteStringProcessor processor = createObjectUnderTest(); final Record record = getEvent("abcd"); diff --git a/data-prepper-plugins/obfuscate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/obfuscation/ObfuscationProcessorConfig.java b/data-prepper-plugins/obfuscate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/obfuscation/ObfuscationProcessorConfig.java index e5893476e0..8a6f8e17e5 100644 --- a/data-prepper-plugins/obfuscate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/obfuscation/ObfuscationProcessorConfig.java +++ b/data-prepper-plugins/obfuscate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/obfuscation/ObfuscationProcessorConfig.java @@ -24,6 +24,7 @@ public class ObfuscationProcessorConfig { private String source; @JsonProperty("patterns") + @JsonPropertyDescription("A list of regex patterns that allow you to obfuscate specific parts of a field. Only parts that match the regex pattern will obfuscate. When not provided, the processor obfuscates the whole field.") private List patterns; @JsonProperty("target") @@ -33,7 +34,7 @@ public class ObfuscationProcessorConfig { private String target; @JsonProperty("action") - @JsonPropertyDescription("The obfuscation action. As of Data Prepper 2.3, only the `mask` action is supported.") + @JsonPropertyDescription("The obfuscation action. Available actions include 'hash' and 'mask'.") private PluginModel action; @JsonProperty("obfuscate_when") @@ -94,7 +95,7 @@ public boolean getSingleWordOnly() { void validateObfuscateWhen(final ExpressionEvaluator expressionEvaluator) { if (obfuscateWhen != null && !expressionEvaluator.isValidExpressionStatement(obfuscateWhen)) { - throw new InvalidPluginConfigurationException(String.format("obfuscate_when value %s is not a valid Data Prepper expression statement", obfuscateWhen)); + throw new InvalidPluginConfigurationException(String.format("obfuscate_when value %s is not a valid Data Prepper expression statement. See https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/ for valid expression syntax", obfuscateWhen)); } } } diff --git a/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/AbstractParseProcessor.java b/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/AbstractParseProcessor.java index 18acb3dfd8..a22d635163 100644 --- a/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/AbstractParseProcessor.java +++ b/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/AbstractParseProcessor.java @@ -13,6 +13,7 @@ import org.opensearch.dataprepper.model.event.EventKeyFactory; import org.opensearch.dataprepper.model.event.HandleFailedEventsOption; import org.opensearch.dataprepper.model.event.JacksonEvent; +import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException; import org.opensearch.dataprepper.model.processor.AbstractProcessor; import org.opensearch.dataprepper.model.record.Record; import io.micrometer.core.instrument.Counter; @@ -67,6 +68,13 @@ protected AbstractParseProcessor(final PluginMetrics pluginMetrics, processingFailuresCounter = pluginMetrics.counter(PROCESSING_FAILURES); this.expressionEvaluator = expressionEvaluator; this.eventKeyFactory = eventKeyFactory; + + if (commonParseConfig.getParseWhen() != null + && !expressionEvaluator.isValidExpressionStatement(commonParseConfig.getParseWhen())) { + throw new InvalidPluginConfigurationException( + String.format("parse_when value of %s is not a valid expression statement. " + + "See https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/ for valid expression syntax.", commonParseConfig.getParseWhen())); + } } /** diff --git a/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/ion/ParseIonProcessorConfig.java b/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/ion/ParseIonProcessorConfig.java index 6fad364e17..2a8484131e 100644 --- a/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/ion/ParseIonProcessorConfig.java +++ b/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/ion/ParseIonProcessorConfig.java @@ -21,24 +21,31 @@ public class ParseIonProcessorConfig implements CommonParseConfig { @NotBlank @JsonProperty("source") + @JsonPropertyDescription("The field in the event that will be parsed. Default value is message.") private String source = DEFAULT_SOURCE; @JsonProperty("destination") + @JsonPropertyDescription("The destination field of the parsed JSON. Defaults to the root of the event. Cannot be an empty string, /, or any white-space-only string because these are not valid event fields.") private String destination; @JsonProperty("pointer") + @JsonPropertyDescription("A JSON pointer to the field to be parsed. There is no pointer by default, meaning the entire source is parsed. The pointer can access JSON array indexes as well. If the JSON pointer is invalid then the entire source data is parsed into the outgoing event. If the key that is pointed to already exists in the event and the destination is the root, then the pointer uses the entire path of the key.") private String pointer; @JsonProperty("parse_when") + @JsonPropertyDescription("A Data Prepper [conditional expression](https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/), such as '/some-key == \"test\"', that will be evaluated to determine whether the processor will be run on the event.") private String parseWhen; @JsonProperty("tags_on_failure") + @JsonPropertyDescription("A list of strings specifying the tags to be set in the event that the processor fails or an unknown exception occurs during parsing.") private List tagsOnFailure; @JsonProperty("overwrite_if_destination_exists") + @JsonPropertyDescription("Overwrites the destination if set to true. Set to false to prevent changing a destination value that exists. Defaults to true.") private boolean overwriteIfDestinationExists = true; @JsonProperty + @JsonPropertyDescription("If true, the configured source field will be deleted after the JSON data is parsed into separate fields.") private boolean deleteSource = false; @JsonProperty("handle_failed_events") diff --git a/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/json/ParseJsonProcessorConfig.java b/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/json/ParseJsonProcessorConfig.java index b6a1b14a23..13a6e2e24a 100644 --- a/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/json/ParseJsonProcessorConfig.java +++ b/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/json/ParseJsonProcessorConfig.java @@ -21,24 +21,31 @@ public class ParseJsonProcessorConfig implements CommonParseConfig { @NotBlank @JsonProperty("source") + @JsonPropertyDescription("The field in the event that will be parsed. Default value is message.") private String source = DEFAULT_SOURCE; @JsonProperty("destination") + @JsonPropertyDescription("The destination field of the parsed JSON. Defaults to the root of the event. Cannot be an empty string, /, or any white-space-only string because these are not valid event fields.") private String destination; @JsonProperty("pointer") + @JsonPropertyDescription("A JSON pointer to the field to be parsed. There is no pointer by default, meaning the entire source is parsed. The pointer can access JSON array indexes as well. If the JSON pointer is invalid then the entire source data is parsed into the outgoing event. If the key that is pointed to already exists in the event and the destination is the root, then the pointer uses the entire path of the key.") private String pointer; @JsonProperty("parse_when") + @JsonPropertyDescription("A Data Prepper [conditional expression](https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/), such as '/some-key == \"test\"', that will be evaluated to determine whether the processor will be run on the event.") private String parseWhen; @JsonProperty("tags_on_failure") + @JsonPropertyDescription("A list of strings specifying the tags to be set in the event that the processor fails or an unknown exception occurs during parsing.") private List tagsOnFailure; @JsonProperty("overwrite_if_destination_exists") + @JsonPropertyDescription("Overwrites the destination if set to true. Set to false to prevent changing a destination value that exists. Defaults to true.") private boolean overwriteIfDestinationExists = true; @JsonProperty + @JsonPropertyDescription("If true, the configured source field will be deleted after the JSON data is parsed into separate fields.") private boolean deleteSource = false; @JsonProperty("handle_failed_events") diff --git a/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/xml/ParseXmlProcessorConfig.java b/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/xml/ParseXmlProcessorConfig.java index f84f2de4b6..82d19a2098 100644 --- a/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/xml/ParseXmlProcessorConfig.java +++ b/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/xml/ParseXmlProcessorConfig.java @@ -16,24 +16,31 @@ public class ParseXmlProcessorConfig implements CommonParseConfig { @NotBlank @JsonProperty("source") + @JsonPropertyDescription("The field in the event that will be parsed. Default value is message.") private String source = DEFAULT_SOURCE; @JsonProperty("destination") + @JsonPropertyDescription("The destination field of the parsed JSON. Defaults to the root of the event. Cannot be an empty string, /, or any white-space-only string because these are not valid event fields.") private String destination; @JsonProperty("pointer") + @JsonPropertyDescription("A JSON pointer to the field to be parsed. There is no pointer by default, meaning the entire source is parsed. The pointer can access JSON array indexes as well. If the JSON pointer is invalid then the entire source data is parsed into the outgoing event. If the key that is pointed to already exists in the event and the destination is the root, then the pointer uses the entire path of the key.") private String pointer; @JsonProperty("parse_when") + @JsonPropertyDescription("A Data Prepper [conditional expression](https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/), such as '/some-key == \"test\"', that will be evaluated to determine whether the processor will be run on the event.") private String parseWhen; @JsonProperty("tags_on_failure") + @JsonPropertyDescription("A list of strings specifying the tags to be set in the event that the processor fails or an unknown exception occurs during parsing.") private List tagsOnFailure; @JsonProperty("overwrite_if_destination_exists") + @JsonPropertyDescription("Overwrites the destination if set to true. Set to false to prevent changing a destination value that exists. Defaults to true.") private boolean overwriteIfDestinationExists = true; @JsonProperty + @JsonPropertyDescription("If true, the configured source field will be deleted after the JSON data is parsed into separate fields.") private boolean deleteSource = false; @JsonProperty("handle_failed_events") diff --git a/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/ion/ParseIonProcessorTest.java b/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/ion/ParseIonProcessorTest.java index 8bd63c3eec..dce1c79b2e 100644 --- a/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/ion/ParseIonProcessorTest.java +++ b/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/ion/ParseIonProcessorTest.java @@ -12,13 +12,18 @@ import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException; import org.opensearch.dataprepper.plugins.processor.parse.AbstractParseProcessor; import org.opensearch.dataprepper.plugins.processor.parse.json.ParseJsonProcessorTest; +import java.util.UUID; + import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; +import static org.mockito.Mockito.lenient; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verifyNoInteractions; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.Mockito.when; @ExtendWith(MockitoExtension.class) @@ -40,7 +45,7 @@ public void setup() { when(pluginMetrics.counter("recordsIn")).thenReturn(mock(Counter.class)); when(pluginMetrics.counter("recordsOut")).thenReturn(mock(Counter.class)); when(pluginMetrics.counter("processingFailures")).thenReturn(this.processingFailuresCounter); - when(pluginMetrics.counter("parseErrors")).thenReturn(this.parseErrorsCounter); + lenient().when(pluginMetrics.counter("parseErrors")).thenReturn(this.parseErrorsCounter); when(processorConfig.getHandleFailedEventsOption()).thenReturn(handleFailedEventsOption); } @@ -49,6 +54,16 @@ protected AbstractParseProcessor createObjectUnderTest() { return new ParseIonProcessor(pluginMetrics, ionProcessorConfig, expressionEvaluator, testEventKeyFactory); } + @Test + void invalid_parse_when_throws_InvalidPluginConfigurationException() { + final String parseWhen = UUID.randomUUID().toString(); + + when(processorConfig.getParseWhen()).thenReturn(parseWhen); + when(expressionEvaluator.isValidExpressionStatement(parseWhen)).thenReturn(false); + + assertThrows(InvalidPluginConfigurationException.class, this::createObjectUnderTest); + } + @Test void test_when_using_ion_features_then_processorParsesCorrectly() { parseJsonProcessor = createObjectUnderTest(); diff --git a/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/json/ParseJsonProcessorTest.java b/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/json/ParseJsonProcessorTest.java index 9aac54b23f..50dd55b501 100644 --- a/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/json/ParseJsonProcessorTest.java +++ b/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/json/ParseJsonProcessorTest.java @@ -6,6 +6,11 @@ package org.opensearch.dataprepper.plugins.processor.parse.json; import io.micrometer.core.instrument.Counter; +import org.opensearch.dataprepper.expression.ExpressionEvaluator; +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException; +import org.opensearch.dataprepper.model.record.Record; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -13,14 +18,10 @@ import org.mockito.junit.jupiter.MockitoExtension; import org.opensearch.dataprepper.event.TestEventFactory; import org.opensearch.dataprepper.event.TestEventKeyFactory; -import org.opensearch.dataprepper.expression.ExpressionEvaluator; -import org.opensearch.dataprepper.metrics.PluginMetrics; -import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.event.EventBuilder; import org.opensearch.dataprepper.model.event.EventFactory; import org.opensearch.dataprepper.model.event.EventKeyFactory; import org.opensearch.dataprepper.model.event.HandleFailedEventsOption; -import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.plugins.processor.parse.AbstractParseProcessor; import org.opensearch.dataprepper.plugins.processor.parse.CommonParseConfig; @@ -35,7 +36,9 @@ import static java.util.Map.entry; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.lenient; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoInteractions; @@ -82,7 +85,7 @@ public void setup() { when(pluginMetrics.counter("recordsIn")).thenReturn(mock(Counter.class)); when(pluginMetrics.counter("recordsOut")).thenReturn(mock(Counter.class)); when(pluginMetrics.counter("processingFailures")).thenReturn(processingFailuresCounter); - when(pluginMetrics.counter("parseErrors")).thenReturn(parseErrorsCounter); + lenient().when(pluginMetrics.counter("parseErrors")).thenReturn(parseErrorsCounter); when(processorConfig.getHandleFailedEventsOption()).thenReturn(handleFailedEventsOption); } @@ -90,6 +93,16 @@ protected AbstractParseProcessor createObjectUnderTest() { return new ParseJsonProcessor(pluginMetrics, jsonProcessorConfig, expressionEvaluator, testEventKeyFactory); } + @Test + void invalid_parse_when_throws_InvalidPluginConfigurationException() { + final String parseWhen = UUID.randomUUID().toString(); + + when(processorConfig.getParseWhen()).thenReturn(parseWhen); + when(expressionEvaluator.isValidExpressionStatement(parseWhen)).thenReturn(false); + + assertThrows(InvalidPluginConfigurationException.class, this::createObjectUnderTest); + } + @Test void test_when_differentSourceAndDestination_then_processorParsesCorrectly() { final String source = "different_source"; @@ -413,6 +426,7 @@ void test_when_condition_skips_processing_when_evaluates_to_false() { final Map data = Collections.singletonMap("key", "value"); final String serializedMessage = convertMapToJSONString(data); final Record testEvent = createMessageEvent(serializedMessage); + when(expressionEvaluator.isValidExpressionStatement(whenCondition)).thenReturn(true); when(expressionEvaluator.evaluateConditional(whenCondition, testEvent.getData())).thenReturn(false); parseJsonProcessor = createObjectUnderTest(); // need to recreate so that new config options are used @@ -439,6 +453,7 @@ void test_tags_when_json_parse_fails() { List testTags = List.of("tag1", "tag2"); when(processorConfig.getTagsOnFailure()).thenReturn(testTags); final Record testEvent = createMessageEvent("{key:}"); + when(expressionEvaluator.isValidExpressionStatement(whenCondition)).thenReturn(true); when(expressionEvaluator.evaluateConditional(whenCondition, testEvent.getData())).thenReturn(true); parseJsonProcessor = createObjectUnderTest(); @@ -461,6 +476,7 @@ void when_evaluate_conditional_throws_RuntimeException_events_are_not_dropped() final Map data = Collections.singletonMap("key", "value"); final String serializedMessage = convertMapToJSONString(data); final Record testEvent = createMessageEvent(serializedMessage); + when(expressionEvaluator.isValidExpressionStatement(whenCondition)).thenReturn(true); when(expressionEvaluator.evaluateConditional(whenCondition, testEvent.getData())).thenThrow(RuntimeException.class); parseJsonProcessor = createObjectUnderTest(); diff --git a/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/xml/ParseXmlProcessorTest.java b/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/xml/ParseXmlProcessorTest.java index 900a7a7bef..b1dc756c25 100644 --- a/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/xml/ParseXmlProcessorTest.java +++ b/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/parse/xml/ParseXmlProcessorTest.java @@ -18,6 +18,7 @@ import org.opensearch.dataprepper.model.event.EventFactory; import org.opensearch.dataprepper.model.event.EventKeyFactory; import org.opensearch.dataprepper.model.event.HandleFailedEventsOption; +import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException; import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.plugins.processor.parse.AbstractParseProcessor; import org.opensearch.dataprepper.test.helper.ReflectivelySetField; @@ -32,9 +33,11 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.lenient; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoInteractions; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.Mockito.when; import static org.opensearch.dataprepper.plugins.processor.parse.xml.ParseXmlProcessorConfig.DEFAULT_SOURCE; @@ -72,7 +75,7 @@ public void setup() { when(pluginMetrics.counter("recordsIn")).thenReturn(mock(Counter.class)); when(pluginMetrics.counter("recordsOut")).thenReturn(mock(Counter.class)); when(pluginMetrics.counter("processingFailures")).thenReturn(processingFailuresCounter); - when(pluginMetrics.counter("parseErrors")).thenReturn(parseErrorsCounter); + lenient().when(pluginMetrics.counter("parseErrors")).thenReturn(parseErrorsCounter); when(processorConfig.getHandleFailedEventsOption()).thenReturn(handleFailedEventsOption); } @@ -80,6 +83,16 @@ protected AbstractParseProcessor createObjectUnderTest() { return new ParseXmlProcessor(pluginMetrics, processorConfig, expressionEvaluator, testEventKeyFactory); } + @Test + void invalid_parse_when_throws_InvalidPluginConfigurationException() { + final String parseWhen = UUID.randomUUID().toString(); + + when(processorConfig.getParseWhen()).thenReturn(parseWhen); + when(expressionEvaluator.isValidExpressionStatement(parseWhen)).thenReturn(false); + + assertThrows(InvalidPluginConfigurationException.class, this::createObjectUnderTest); + } + @Test void test_when_using_xml_features_then_processorParsesCorrectly() { parseXmlProcessor = createObjectUnderTest(); diff --git a/data-prepper-plugins/truncate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/truncate/TruncateProcessor.java b/data-prepper-plugins/truncate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/truncate/TruncateProcessor.java index 2063652a7c..6d29082bd6 100644 --- a/data-prepper-plugins/truncate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/truncate/TruncateProcessor.java +++ b/data-prepper-plugins/truncate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/truncate/TruncateProcessor.java @@ -10,6 +10,7 @@ import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException; import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.model.processor.AbstractProcessor; import org.opensearch.dataprepper.model.processor.Processor; @@ -39,6 +40,14 @@ public TruncateProcessor(final PluginMetrics pluginMetrics, final TruncateProces super(pluginMetrics); this.expressionEvaluator = expressionEvaluator; this.entries = config.getEntries(); + + config.getEntries().forEach(entry -> { + if (entry.getTruncateWhen() != null + && !expressionEvaluator.isValidExpressionStatement(entry.getTruncateWhen())) { + throw new InvalidPluginConfigurationException( + String.format("truncate_when %s is not a valid expression statement. See https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/ for valid expression syntax", entry.getTruncateWhen())); + } + }); } private String getTruncatedValue(final String value, final int startIndex, final Integer length) { diff --git a/data-prepper-plugins/truncate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/truncate/TruncateProcessorConfig.java b/data-prepper-plugins/truncate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/truncate/TruncateProcessorConfig.java index 02c83f5773..0d525cddd7 100644 --- a/data-prepper-plugins/truncate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/truncate/TruncateProcessorConfig.java +++ b/data-prepper-plugins/truncate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/truncate/TruncateProcessorConfig.java @@ -32,6 +32,7 @@ public static class Entry { private Integer length; @JsonProperty("recursive") + @JsonPropertyDescription("Recursively truncates the fields. If the value of a field is a map (json object), then it recursively applies truncate operation on the fields in the map.") private Boolean recurse = false; @JsonProperty("truncate_when") diff --git a/data-prepper-plugins/truncate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/truncate/TruncateProcessorTests.java b/data-prepper-plugins/truncate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/truncate/TruncateProcessorTests.java index 00af15ed63..065b87ab80 100644 --- a/data-prepper-plugins/truncate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/truncate/TruncateProcessorTests.java +++ b/data-prepper-plugins/truncate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/truncate/TruncateProcessorTests.java @@ -9,12 +9,15 @@ import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.event.JacksonEvent; +import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException; import org.opensearch.dataprepper.model.record.Record; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.extension.ExtensionContext; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; + +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.params.provider.Arguments.arguments; import org.junit.jupiter.params.provider.ArgumentsProvider; import org.junit.jupiter.params.provider.ArgumentsSource; @@ -50,6 +53,16 @@ private TruncateProcessor createObjectUnderTest() { return new TruncateProcessor(pluginMetrics, config, expressionEvaluator); } + @Test + void invalid_truncate_when_throws_InvalidPluginConfigurationException() { + final String truncateWhen = UUID.randomUUID().toString(); + when(expressionEvaluator.isValidExpressionStatement(truncateWhen)).thenReturn(false); + + when(config.getEntries()).thenReturn(Collections.singletonList(createEntry(List.of("message"), null, null, truncateWhen, false))); + + assertThrows(InvalidPluginConfigurationException.class, this::createObjectUnderTest); + } + @ParameterizedTest @ArgumentsSource(TruncateArgumentsProvider.class) void testTruncateProcessor(final Object messageValue, final Integer startAt, final Integer truncateLength, final Object truncatedMessage) { @@ -84,6 +97,7 @@ void test_event_is_the_same_when_truncateWhen_condition_returns_false() { final String message = UUID.randomUUID().toString(); when(config.getEntries()).thenReturn(Collections.singletonList(createEntry(List.of("message"), null, 5, truncateWhen, false))); + when(expressionEvaluator.isValidExpressionStatement(truncateWhen)).thenReturn(true); final TruncateProcessor truncateProcessor = createObjectUnderTest(); final Record record = createEvent("message", message); diff --git a/data-prepper-plugins/write-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/write_json/WriteJsonProcessorConfig.java b/data-prepper-plugins/write-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/write_json/WriteJsonProcessorConfig.java index 86d69dfedd..a36bc41ad5 100644 --- a/data-prepper-plugins/write-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/write_json/WriteJsonProcessorConfig.java +++ b/data-prepper-plugins/write-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/write_json/WriteJsonProcessorConfig.java @@ -6,14 +6,17 @@ package org.opensearch.dataprepper.plugins.processor.write_json; import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonPropertyDescription; import jakarta.validation.constraints.NotNull; public class WriteJsonProcessorConfig { @JsonProperty("source") + @JsonPropertyDescription("Specifies the name of the field in the event containing the message or object to be parsed.") @NotNull private String source; @JsonProperty("target") + @JsonPropertyDescription("Specifies the name of the field in which the resulting JSON string should be stored. If target is not specified, then the source field is used.") private String target; public String getSource() {