Skip to content

Commit

Permalink
Address missing processor JsonPropertyDescriptions and validations
Browse files Browse the repository at this point in the history
Signed-off-by: Taylor Gray <tylgry@amazon.com>
  • Loading branch information
graytaylor0 committed Aug 15, 2024
1 parent 00cc2a5 commit d5b5d8d
Show file tree
Hide file tree
Showing 65 changed files with 528 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.peerforwarder.RequiresPeerForwarding;
import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException;
import org.opensearch.dataprepper.model.plugin.PluginFactory;
import org.opensearch.dataprepper.model.processor.AbstractProcessor;
import org.opensearch.dataprepper.model.processor.Processor;
Expand Down Expand Up @@ -78,6 +79,10 @@ public AggregateProcessor(final AggregateProcessorConfig aggregateProcessorConfi
this.localMode = aggregateProcessorConfig.getLocalMode();

pluginMetrics.gauge(CURRENT_AGGREGATE_GROUPS, aggregateGroupManager, AggregateGroupManager::getAllGroupsSize);

// Reject the configuration when aggregate_when is set but the evaluator does not accept it as a valid statement.
if (aggregateProcessorConfig.getWhenCondition() != null
        && !expressionEvaluator.isValidExpressionStatement(aggregateProcessorConfig.getWhenCondition())) {
    // Format the offending expression into the message so the user can see which value was rejected.
    throw new InvalidPluginConfigurationException(String.format(
            "aggregate_when %s is not a valid expression statement. See https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/ for valid expression syntax",
            aggregateProcessorConfig.getWhenCondition()));
}
}

private AggregateAction loadAggregateAction(final PluginFactory pluginFactory) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@

package org.opensearch.dataprepper.plugins.processor.aggregate;

import org.opensearch.dataprepper.model.configuration.PluginModel;
import com.fasterxml.jackson.annotation.JsonPropertyDescription;
import org.opensearch.dataprepper.model.configuration.PluginModel;
import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.constraints.AssertTrue;
import jakarta.validation.constraints.NotEmpty;
Expand All @@ -28,7 +28,7 @@ public class AggregateProcessorConfig {
@JsonProperty("group_duration")
private Duration groupDuration = Duration.ofSeconds(DEFAULT_GROUP_DURATION_SECONDS);

@JsonPropertyDescription("The action to be performed on each group. One of the available aggregate actions must be provided, or you can create custom aggregate actions. remove_duplicates and put_all are the available actions. For more information, see Creating New Aggregate Actions.")
@JsonPropertyDescription("The action to be performed on each group. One of the available aggregate actions must be provided.")
@JsonProperty("action")
@NotNull
private PluginModel aggregateAction;
Expand Down Expand Up @@ -74,7 +74,7 @@ public Boolean getLocalMode() {
return localMode;
}

@AssertTrue(message="Aggragated Events Tag must be set when output_unaggregated_events is set")
@AssertTrue(message="Aggregated Events Tag must be set when output_unaggregated_events is set")
boolean isValidConfig() {
    // Nothing further to check when unaggregated output is not requested.
    if (!outputUnaggregatedEvents) {
        return true;
    }
    // Unaggregated output is requested, so the aggregated events tag must be present.
    return aggregatedEventsTag != null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@

public class AppendAggregateActionConfig {

@JsonPropertyDescription("List of keys to append.")
@JsonProperty("keys_to_append")
@JsonPropertyDescription("A list of keys to append to for the aggregated result.")
List<String> keysToAppend;

public List<String> getKeysToAppend() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,13 @@
package org.opensearch.dataprepper.plugins.processor.aggregate.actions;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonPropertyDescription;
import jakarta.validation.constraints.NotNull;
import com.fasterxml.jackson.annotation.JsonPropertyDescription;
import jakarta.validation.constraints.AssertTrue;

public class PercentSamplerAggregateActionConfig {
@JsonPropertyDescription("Percent value of the sampling to be done. 0.0 < percent < 100.0")
@JsonPropertyDescription("The percentage of events to be processed during a one second interval. Must be greater than 0.0 and less than 100.0")
@JsonProperty("percent")
@NotNull
private double percent;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@ void aggregateWithPutAllActionAndCondition() throws InterruptedException {
String condition = "/firstRandomNumber < 100";
when(aggregateProcessorConfig.getGroupDuration()).thenReturn(Duration.ofSeconds(GROUP_DURATION_FOR_ONLY_SINGLE_CONCLUDE));
when(aggregateProcessorConfig.getWhenCondition()).thenReturn(condition);
when(expressionEvaluator.isValidExpressionStatement(condition)).thenReturn(true);
int count = 0;
for (Record<Event> record: eventBatch) {
Event event = record.getData();
Expand Down Expand Up @@ -410,6 +411,7 @@ void aggregateWithCountAggregateActionWithCondition() throws InterruptedExceptio
final String condition = "/firstRandomNumber < 100";
when(aggregateProcessorConfig.getGroupDuration()).thenReturn(Duration.ofSeconds(GROUP_DURATION_FOR_ONLY_SINGLE_CONCLUDE));
when(aggregateProcessorConfig.getWhenCondition()).thenReturn(condition);
when(expressionEvaluator.isValidExpressionStatement(condition)).thenReturn(true);
int count = 0;
eventBatch = getBatchOfEvents(true);
for (Record<Event> record: eventBatch) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException;
import org.opensearch.dataprepper.model.plugin.PluginFactory;
import org.opensearch.dataprepper.model.record.Record;
import io.micrometer.core.instrument.Counter;
Expand Down Expand Up @@ -41,6 +42,7 @@
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
Expand Down Expand Up @@ -152,6 +154,16 @@ void setUp() {
when(pluginMetrics.timer(MetricNames.TIME_ELAPSED)).thenReturn(timeElapsed);
}

@Test
void invalid_aggregate_when_statement_throws_InvalidPluginConfigurationException() {
    // The actual text is irrelevant: the evaluator is stubbed to reject it.
    final String rejectedCondition = UUID.randomUUID().toString();
    when(expressionEvaluator.isValidExpressionStatement(rejectedCondition)).thenReturn(false);
    when(aggregateProcessorConfig.getWhenCondition()).thenReturn(rejectedCondition);

    // Constructing the processor with the rejected condition must fail.
    assertThrows(InvalidPluginConfigurationException.class, () -> createObjectUnderTest());
}

@Test
void getIdentificationKeys_should_return_configured_identification_keys() {
final List<String> keys = List.of("key");
Expand Down Expand Up @@ -218,6 +230,7 @@ void handleEvent_returning_with_condition_eliminates_one_record() {
when(identificationKeysHasher.createIdentificationKeysMapFromEvent(firstEvent))
.thenReturn(identificationKeysMap);
when(aggregateActionSynchronizer.handleEventForGroup(firstEvent, identificationKeysMap, aggregateGroup)).thenReturn(firstAggregateActionResponse);
when(expressionEvaluator.isValidExpressionStatement(condition)).thenReturn(true);
when(expressionEvaluator.evaluateConditional(condition, event)).thenReturn(true);
when(expressionEvaluator.evaluateConditional(condition, firstEvent)).thenReturn(true);
when(expressionEvaluator.evaluateConditional(condition, secondEvent)).thenReturn(false);
Expand Down Expand Up @@ -280,6 +293,7 @@ void handleEvent_returning_with_condition_eliminates_one_record_local_only() {
when(identificationKeysHasher.createIdentificationKeysMapFromEvent(firstEvent))
.thenReturn(identificationKeysMap);
when(aggregateActionSynchronizer.handleEventForGroup(firstEvent, identificationKeysMap, aggregateGroup)).thenReturn(firstAggregateActionResponse);
when(expressionEvaluator.isValidExpressionStatement(condition)).thenReturn(true);
when(expressionEvaluator.evaluateConditional(condition, event)).thenReturn(true);
when(expressionEvaluator.evaluateConditional(condition, firstEvent)).thenReturn(true);
when(expressionEvaluator.evaluateConditional(condition, secondEvent)).thenReturn(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public class CsvProcessorConfig {
private List<String> columnNames;

@JsonProperty("csv_when")
@JsonPropertyDescription("Allows you to specify a [conditional expression](https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/), " +
@JsonPropertyDescription("Allows you to specify a Data Prepper conditional expression (https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/), " +
"such as `/some-key == \"test\"`, that will be evaluated to determine whether " +
"the processor should be applied to the event.")
private String csvWhen;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException;
import org.opensearch.dataprepper.model.processor.AbstractProcessor;
import org.opensearch.dataprepper.model.processor.Processor;
import org.opensearch.dataprepper.model.record.Record;
Expand Down Expand Up @@ -63,6 +64,10 @@ public DateProcessor(PluginMetrics pluginMetrics, final DateProcessorConfig date

if (dateProcessorConfig.getMatch() != null)
extractKeyAndFormatters();

// Reject the configuration when date_when is set but the evaluator does not accept it as a valid statement.
if (dateProcessorConfig.getDateWhen() != null
        && !expressionEvaluator.isValidExpressionStatement(dateProcessorConfig.getDateWhen())) {
    // Format the offending expression into the message so the user can see which value was rejected.
    throw new InvalidPluginConfigurationException(String.format(
            "date_when %s is not a valid expression statement. See https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/ for valid expression syntax",
            dateProcessorConfig.getDateWhen()));
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public static class DateMatch {
@JsonProperty("patterns")
@JsonPropertyDescription("A list of possible patterns that the timestamp value of the key can have. The patterns " +
"are based on a sequence of letters and symbols. The `patterns` support all the patterns listed in the " +
"Java [DatetimeFormatter](https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html) reference. " +
"Java DateTimeFormatter (https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html) reference. " +
"The timestamp value also supports `epoch_second`, `epoch_milli`, and `epoch_nano` values, " +
"which represent the timestamp as the number of seconds, milliseconds, and nanoseconds since the epoch. " +
"Epoch values always use the UTC time zone.")
Expand All @@ -54,6 +54,7 @@ public List<String> getPatterns() {
}

@JsonIgnore
@AssertTrue
public boolean isValidPatterns() {
// For now, allow only one of the three "epoch_" pattern
int count = 0;
Expand Down Expand Up @@ -119,23 +120,23 @@ public static boolean isValidPattern(final String pattern) {
@JsonProperty("source_timezone")
@JsonPropertyDescription("The time zone used to parse dates, including when the zone or offset cannot be extracted " +
"from the value. If the zone or offset are part of the value, then the time zone is ignored. " +
"A list of all the available time zones is contained in the **TZ database name** column of " +
"[the list of database time zones](https://en.wikipedia.org/wiki/List_of_tz_database_time_zones#List).")
"A list of all the available time zones is contained in the TZ database name column of " +
"(https://en.wikipedia.org/wiki/List_of_tz_database_time_zones#List).")
private String sourceTimezone = DEFAULT_SOURCE_TIMEZONE;

@JsonProperty("destination_timezone")
@JsonPropertyDescription("The time zone used for storing the timestamp in the `destination` field. " +
"A list of all the available time zones is contained in the **TZ database name** column of " +
"[the list of database time zones](https://en.wikipedia.org/wiki/List_of_tz_database_time_zones#List).")
"A list of all the available time zones is contained in the TZ database name column of " +
"(https://en.wikipedia.org/wiki/List_of_tz_database_time_zones#List).")
private String destinationTimezone = DEFAULT_DESTINATION_TIMEZONE;

@JsonProperty("locale")
@JsonPropertyDescription("The location used for parsing dates. Commonly used for parsing month names (`MMM`). " +
"The value can contain language, country, or variant fields in IETF BCP 47, such as `en-US`, " +
"or a string representation of the " +
"[locale](https://docs.oracle.com/javase/8/docs/api/java/util/Locale.html) object, such as `en_US`. " +
"locale (https://docs.oracle.com/javase/8/docs/api/java/util/Locale.html) object, such as `en_US`. " +
"A full list of locale fields, including language, country, and variant, can be found in " +
"[the language subtag registry](https://www.iana.org/assignments/language-subtag-registry/language-subtag-registry). " +
"(https://www.iana.org/assignments/language-subtag-registry/language-subtag-registry). " +
"Default is `Locale.ROOT`.")
private String locale;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException;
import org.opensearch.dataprepper.model.record.Record;

import java.time.Instant;
Expand All @@ -44,6 +45,7 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.notNullValue;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.params.provider.Arguments.arguments;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.lenient;
Expand Down Expand Up @@ -105,6 +107,17 @@ private DateProcessor createObjectUnderTest() {
return new DateProcessor(pluginMetrics, mockDateProcessorConfig, expressionEvaluator);
}

@Test
void invalid_date_when_condition_throws_InvalidPluginConfigurationException() {
    // The actual text is irrelevant: the evaluator is stubbed to reject it.
    final String rejectedExpression = UUID.randomUUID().toString();
    when(expressionEvaluator.isValidExpressionStatement(rejectedExpression)).thenReturn(false);
    when(mockDateProcessorConfig.getDateWhen()).thenReturn(rejectedExpression);

    // Constructing the processor with the rejected expression must fail.
    assertThrows(InvalidPluginConfigurationException.class, () -> createObjectUnderTest());
}

@Test
void from_time_received_with_default_destination_test() {
when(mockDateProcessorConfig.getFromTimeReceived()).thenReturn(true);
Expand All @@ -130,7 +143,9 @@ void from_time_received_with_default_destination_test() {

@Test
void date_when_does_not_run_date_processor_for_event_with_date_when_as_false() {
when(mockDateProcessorConfig.getDateWhen()).thenReturn(UUID.randomUUID().toString());
final String dateWhen = UUID.randomUUID().toString();
when(mockDateProcessorConfig.getDateWhen()).thenReturn(dateWhen);
when(expressionEvaluator.isValidExpressionStatement(dateWhen)).thenReturn(true);
dateProcessor = createObjectUnderTest();

Map<String, Object> testData = getTestData();
Expand Down Expand Up @@ -526,7 +541,9 @@ void match_without_year_test(String pattern) {

@Test
void date_processor_catches_exceptions_instead_of_throwing() {
when(mockDateProcessorConfig.getDateWhen()).thenReturn(UUID.randomUUID().toString());
final String dateWhen = UUID.randomUUID().toString();
when(mockDateProcessorConfig.getDateWhen()).thenReturn(dateWhen);
when(expressionEvaluator.isValidExpressionStatement(dateWhen)).thenReturn(true);
when(expressionEvaluator.evaluateConditional(any(String.class), any(Event.class)))
.thenThrow(RuntimeException.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonPropertyDescription;
import jakarta.validation.constraints.NotEmpty;
import jakarta.validation.constraints.NotNull;
import org.opensearch.dataprepper.plugins.processor.decompress.encoding.EncodingType;
Expand All @@ -16,17 +17,22 @@

public class DecompressProcessorConfig {

@JsonPropertyDescription("The keys in the event that will be decompressed.")
@JsonProperty("keys")
@NotEmpty
@NotNull
private List<String> keys;

@JsonPropertyDescription("The type of decompression to use for the keys in the event. Only gzip is supported.")
@JsonProperty("type")
@NotNull
private DecompressionType decompressionType;

@JsonPropertyDescription("A conditional expression that determines when the decompress processor will run on certain events.")
@JsonProperty("decompress_when")
private String decompressWhen;

@JsonPropertyDescription("A list of strings with which to tag events when the processor fails to decompress the keys inside an event. Defaults to _decompression_failure.")
@JsonProperty("tags_on_failure")
private List<String> tagsOnFailure = List.of("_decompression_failure");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException;
import org.opensearch.dataprepper.model.processor.AbstractProcessor;
import org.opensearch.dataprepper.model.processor.Processor;
import org.opensearch.dataprepper.model.record.Record;
Expand Down Expand Up @@ -49,6 +50,11 @@ public DissectProcessor(PluginMetrics pluginMetrics, final DissectProcessorConfi
dissectorMap.put(key, dissector);
}

// Reject the configuration when dissect_when is set but the evaluator does not accept it as a valid statement.
if (dissectConfig.getDissectWhen() != null
        && !expressionEvaluator.isValidExpressionStatement(dissectConfig.getDissectWhen())) {
    // Format the offending expression into the message so the user can see which value was rejected.
    throw new InvalidPluginConfigurationException(String.format(
            "dissect_when %s is not a valid expression statement. See https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/ for valid expression syntax",
            dissectConfig.getDissectWhen()));
}

}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,15 @@ public class DissectProcessorConfig {
@NotNull
@JsonProperty("map")
@JsonPropertyDescription("Defines the `dissect` patterns for specific keys. For details on how to define fields " +
"in the `dissect` pattern, see [Field notations](#field-notations).")
"in the `dissect` pattern, see (https://opensearch.org/docs/latest/data-prepper/pipelines/configuration/processors/dissect/#field-notations).")
private Map<String, String> map;
@JsonProperty("target_types")
@JsonPropertyDescription("Specifies the data types for extract fields. Valid options are `integer`, " +
"`double`, `string`, and `boolean`. By default, all fields are of the `string` type.")
"`double`, `string`, `long`, `big_decimal`, and `boolean`. By default, all fields are of the `string` type.")
private Map<String, TargetType> targetTypes;
@JsonProperty("dissect_when")
@JsonPropertyDescription("Specifies a condition for performing the `dissect` operation using a " +
"[Data Prepper expression]({{site.url}}{{site.baseurl}}/data-prepper/pipelines/expression-syntax/). " +
@JsonPropertyDescription("Specifies a condition for performing the `dissect` operation using a Data Prepper conditional expression " +
"(https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/). " +
"If specified, the `dissect` operation will only run when the expression evaluates to true.")
private String dissectWhen;

Expand Down
Loading

0 comments on commit d5b5d8d

Please sign in to comment.