diff --git a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateProcessorConfig.java b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateProcessorConfig.java index b260102d83..d3a038361a 100644 --- a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateProcessorConfig.java +++ b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateProcessorConfig.java @@ -23,7 +23,6 @@ "Then, the processor performs an action on each group, helping reduce unnecessary log volume and " + "creating aggregated logs over time.") public class AggregateProcessorConfig { - static int DEFAULT_GROUP_DURATION_SECONDS = 180; @JsonPropertyDescription("An unordered list by which to group events. Events with the same values as these keys are put into the same group. " + @@ -33,16 +32,16 @@ public class AggregateProcessorConfig { @NotEmpty private List identificationKeys; - @JsonPropertyDescription("The amount of time that a group should exist before it is concluded automatically. Supports ISO_8601 notation strings (\"PT20.345S\", \"PT15M\", etc.) as well as simple notation for seconds (\"60s\") and milliseconds (\"1500ms\"). Default value is 180s.") - @JsonProperty("group_duration") - private Duration groupDuration = Duration.ofSeconds(DEFAULT_GROUP_DURATION_SECONDS); - @JsonPropertyDescription("The action to be performed on each group. One of the available aggregate actions must be provided.") @JsonProperty("action") @NotNull @UsesDataPrepperPlugin(pluginType = AggregateAction.class) private PluginModel aggregateAction; + @JsonPropertyDescription("The amount of time that a group should exist before it is concluded automatically. Supports ISO_8601 notation strings (\"PT20.345S\", \"PT15M\", etc.) as well as simple notation for seconds (\"60s\") and milliseconds (\"1500ms\"). Default value is 180s.") + @JsonProperty("group_duration") + private Duration groupDuration = Duration.ofSeconds(DEFAULT_GROUP_DURATION_SECONDS); + @JsonPropertyDescription("When local_mode is set to true, the aggregation is performed locally on each node instead of forwarding events to a specific node based on the identification_keys using a hash function. Default is false.") @JsonProperty("local_mode") @NotNull diff --git a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/AppendAggregateActionConfig.java b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/AppendAggregateActionConfig.java index 529ef0bde3..03067608cb 100644 --- a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/AppendAggregateActionConfig.java +++ b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/AppendAggregateActionConfig.java @@ -7,9 +7,13 @@ import java.util.List; +import com.fasterxml.jackson.annotation.JsonClassDescription; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonPropertyDescription; +import com.fasterxml.jackson.annotation.JsonPropertyOrder; +@JsonPropertyOrder +@JsonClassDescription("Appends multiple events into a single event.") public class AppendAggregateActionConfig { @JsonProperty("keys_to_append") diff --git a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/CountAggregateAction.java b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/CountAggregateAction.java index c8fd772336..8b67ca64cd 100644 --- a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/CountAggregateAction.java +++ b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/CountAggregateAction.java @@ -51,7 +51,7 @@ public class CountAggregateAction implements AggregateAction { public final String countKey; public final String startTimeKey; public final String endTimeKey; - public final String outputFormat; + public final OutputFormat outputFormat; private long startTimeNanos; private final String metricName; private final IdentificationKeysHasher uniqueKeysHasher; @@ -141,7 +141,7 @@ public AggregateActionOutput concludeGroup(final AggregateActionInput aggregateA Instant endTime = (Instant)groupState.get(endTimeKey); groupState.remove(endTimeKey); groupState.remove(UNIQUE_KEYS_SETKEY); - if (outputFormat.equals(OutputFormat.RAW.toString())) { + if (outputFormat == OutputFormat.RAW) { groupState.put(startTimeKey, startTime.atZone(ZoneId.of(ZoneId.systemDefault().toString())).format(DateTimeFormatter.ofPattern(DATE_FORMAT))); event = JacksonEvent.builder() .withEventType(EVENT_TYPE) diff --git a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/CountAggregateActionConfig.java b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/CountAggregateActionConfig.java index a0325ee3a9..d1011f4145 100644 --- a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/CountAggregateActionConfig.java +++ b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/CountAggregateActionConfig.java @@ -5,42 +5,49 @@ package org.opensearch.dataprepper.plugins.processor.aggregate.actions; -import java.util.HashSet; import java.util.List; -import java.util.Set; + +import com.fasterxml.jackson.annotation.JsonClassDescription; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonPropertyDescription; +import com.fasterxml.jackson.annotation.JsonPropertyOrder; +@JsonPropertyOrder +@JsonClassDescription("The count action counts events that belong to the same group and " + + "generates a new event with values of the identification_keys and the count, which indicates the number of new events.") public class CountAggregateActionConfig { static final String SUM_METRIC_NAME = "count"; public static final String DEFAULT_COUNT_KEY = "aggr._count"; public static final String DEFAULT_START_TIME_KEY = "aggr._start_time"; public static final String DEFAULT_END_TIME_KEY = "aggr._end_time"; - public static final Set validOutputFormats = new HashSet<>(Set.of(OutputFormat.OTEL_METRICS.toString(), OutputFormat.RAW.toString())); - @JsonPropertyDescription("Key used for storing the count. Default name is aggr._count.") - @JsonProperty("count_key") - String countKey = DEFAULT_COUNT_KEY; + @JsonPropertyDescription("Format of the aggregated event. Specifying otel_metrics outputs aggregate events in OTel metrics SUM type with count as value. " + + "Specifying raw outputs aggregate events as with the count_key field as a count value and includes the start_time_key and end_time_key keys.") + @JsonProperty("output_format") + OutputFormat outputFormat = OutputFormat.OTEL_METRICS; - @JsonPropertyDescription("Metric name to be used when otel format is used.") + @JsonPropertyDescription("Metric name to be used when the OTel metrics format is used. The default value is count.") @JsonProperty("metric_name") String metricName = SUM_METRIC_NAME; - @JsonPropertyDescription("List of unique keys to count.") - @JsonProperty("unique_keys") - List uniqueKeys = null; + @JsonPropertyDescription("The key in the aggregate event that will have the count value. " + + "This is the count of events in the aggregation. Default name is aggr._count.") + @JsonProperty("count_key") + String countKey = DEFAULT_COUNT_KEY; - @JsonPropertyDescription("Key used for storing the start time. Default name is aggr._start_time.") + @JsonPropertyDescription("The key in the aggregate event that will have the start time of the aggregation. " + + "Default name is aggr._start_time.") @JsonProperty("start_time_key") String startTimeKey = DEFAULT_START_TIME_KEY; - @JsonPropertyDescription("Key used for storing the end time. Default name is aggr._end_time.") + @JsonPropertyDescription("The key in the aggregate event that will have the end time of the aggregation. " + + "Default name is aggr._end_time.") @JsonProperty("end_time_key") String endTimeKey = DEFAULT_END_TIME_KEY; - @JsonPropertyDescription("Format of the aggregated event. otel_metrics is the default output format which outputs in OTel metrics SUM type with count as value. Other options is - raw - which generates a JSON object with the count_key field as a count value and the start_time_key field with aggregation start time as value.") - @JsonProperty("output_format") - String outputFormat = OutputFormat.OTEL_METRICS.toString(); + @JsonPropertyDescription("List of unique keys to count.") + @JsonProperty("unique_keys") + List uniqueKeys = null; public String getMetricName() { return metricName; @@ -62,10 +69,7 @@ public String getStartTimeKey() { return startTimeKey; } - public String getOutputFormat() { - if (!validOutputFormats.contains(outputFormat)) { - throw new IllegalArgumentException("Unknown output format " + outputFormat); - } + public OutputFormat getOutputFormat() { return outputFormat; } } diff --git a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/HistogramAggregateAction.java b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/HistogramAggregateAction.java index 0bded67d75..22cfa7efb7 100644 --- a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/HistogramAggregateAction.java +++ b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/HistogramAggregateAction.java @@ -49,7 +49,7 @@ public class HistogramAggregateAction implements AggregateAction { private final String bucketsKey; private final String startTimeKey; private final String endTimeKey; - private final String outputFormat; + private final OutputFormat outputFormat; private final String sumKey; private final String maxKey; private final String minKey; @@ -217,7 +217,7 @@ public AggregateActionOutput concludeGroup(final AggregateActionInput aggregateA List exemplarList = new ArrayList<>(); exemplarList.add(createExemplar("min", minEvent, minValue)); exemplarList.add(createExemplar("max", maxEvent, maxValue)); - if (outputFormat.equals(OutputFormat.RAW.toString())) { + if (outputFormat == OutputFormat.RAW) { groupState.put(histogramKey, key); groupState.put(durationKey, endTimeNanos-startTimeNanos); groupState.put(bucketsKey, Arrays.copyOfRange(this.buckets, 1, this.buckets.length-1)); diff --git a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/HistogramAggregateActionConfig.java b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/HistogramAggregateActionConfig.java index 6d89a1bd8f..11dafd7626 100644 --- a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/HistogramAggregateActionConfig.java +++ b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/HistogramAggregateActionConfig.java @@ -5,13 +5,21 @@ package org.opensearch.dataprepper.plugins.processor.aggregate.actions; -import java.util.Set; import java.util.List; -import java.util.HashSet; + +import com.fasterxml.jackson.annotation.JsonClassDescription; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonPropertyDescription; +import com.fasterxml.jackson.annotation.JsonPropertyOrder; import jakarta.validation.constraints.NotNull; +@JsonPropertyOrder +@JsonClassDescription("The histogram action aggregates events belonging to the same " + + "group and generates a new event with values of the identification_keys " + + "and histogram of the aggregated events based on a configured key. " + + "The histogram contains the number of events, sum, buckets, bucket counts, and optionally " + + "min and max of the values corresponding to the key. The action drops all events " + + "that make up the combined event.") public class HistogramAggregateActionConfig { public static final String HISTOGRAM_METRIC_NAME = "histogram"; public static final String DEFAULT_GENERATED_KEY_PREFIX = "aggr._"; @@ -24,13 +32,16 @@ public class HistogramAggregateActionConfig { public static final String START_TIME_KEY = "startTime"; public static final String END_TIME_KEY = "endTime"; public static final String DURATION_KEY = "duration"; - public static final Set validOutputFormats = new HashSet<>(Set.of(OutputFormat.OTEL_METRICS.toString(), OutputFormat.RAW.toString())); @JsonPropertyDescription("Name of the field in the events the histogram generates.") @JsonProperty("key") @NotNull String key; + @JsonPropertyDescription("Format of the aggregated event. otel_metrics is the default output format which outputs in OTel metrics SUM type with count as value. Other options is - raw - which generates a JSON object with the count_key field as a count value and the start_time_key field with aggregation start time as value.") + @JsonProperty("output_format") + OutputFormat outputFormat = OutputFormat.OTEL_METRICS; + @JsonPropertyDescription("The name of units for the values in the key. For example, bytes, traces etc") @JsonProperty("units") @NotNull @@ -49,10 +60,6 @@ public class HistogramAggregateActionConfig { @NotNull List buckets; - @JsonPropertyDescription("Format of the aggregated event. otel_metrics is the default output format which outputs in OTel metrics SUM type with count as value. Other options is - raw - which generates a JSON object with the count_key field as a count value and the start_time_key field with aggregation start time as value.") - @JsonProperty("output_format") - String outputFormat = OutputFormat.OTEL_METRICS.toString(); - @JsonPropertyDescription("A Boolean value indicating whether the histogram should include the min and max of the values in the aggregation.") @JsonProperty("record_minmax") boolean recordMinMax = false; @@ -120,10 +127,7 @@ public List getBuckets() { return buckets; } - public String getOutputFormat() { - if (!validOutputFormats.contains(outputFormat)) { - throw new IllegalArgumentException("Unknown output format " + outputFormat); - } + public OutputFormat getOutputFormat() { return outputFormat; } } diff --git a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/OutputFormat.java b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/OutputFormat.java index d465355e4b..1fb77bd385 100644 --- a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/OutputFormat.java +++ b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/OutputFormat.java @@ -6,6 +6,7 @@ package org.opensearch.dataprepper.plugins.processor.aggregate.actions; import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonValue; import java.util.Arrays; import java.util.Map; @@ -37,4 +38,8 @@ static OutputFormat fromOptionValue(final String option) { return ACTIONS_MAP.get(option.toLowerCase()); } + @JsonValue + public String getOptionValue() { + return name; + } } diff --git a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/PercentSamplerAggregateActionConfig.java b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/PercentSamplerAggregateActionConfig.java index 0a17e37c43..51d68a32b1 100644 --- a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/PercentSamplerAggregateActionConfig.java +++ b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/PercentSamplerAggregateActionConfig.java @@ -5,14 +5,18 @@ package org.opensearch.dataprepper.plugins.processor.aggregate.actions; +import com.fasterxml.jackson.annotation.JsonClassDescription; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonPropertyDescription; +import com.fasterxml.jackson.annotation.JsonPropertyOrder; import jakarta.validation.constraints.NotNull; -import com.fasterxml.jackson.annotation.JsonPropertyDescription; import jakarta.validation.constraints.AssertTrue; +@JsonPropertyOrder +@JsonClassDescription("The percent_sampler action controls the number of events aggregated based " + + "on a percentage of events. The action drops any events not included in the percentage.") public class PercentSamplerAggregateActionConfig { - @JsonPropertyDescription("The percentage of events to be processed during a one second interval. Must be greater than 0.0 and less than 100.0") + @JsonPropertyDescription("The percentage of events to be processed during a one second interval. Must be greater than 0.0 and less than 100.0.") @JsonProperty("percent") @NotNull private double percent; diff --git a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/PutAllAggregateAction.java b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/PutAllAggregateAction.java index 0f96584bd5..78debabb35 100644 --- a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/PutAllAggregateAction.java +++ b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/PutAllAggregateAction.java @@ -5,6 +5,8 @@ package org.opensearch.dataprepper.plugins.processor.aggregate.actions; +import com.fasterxml.jackson.annotation.JsonClassDescription; +import com.fasterxml.jackson.annotation.JsonPropertyOrder; import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.event.JacksonEvent; @@ -22,7 +24,7 @@ * most recently handled Event. * @since 1.3 */ -@DataPrepperPlugin(name = "put_all", pluginType = AggregateAction.class) +@DataPrepperPlugin(name = "put_all", pluginType = AggregateAction.class, pluginConfigurationType = PutAllAggregateAction.PutAllAggregateActionConfig.class) public class PutAllAggregateAction implements AggregateAction { static final String EVENT_TYPE = "event"; @@ -43,4 +45,10 @@ public AggregateActionOutput concludeGroup(final AggregateActionInput aggregateA return new AggregateActionOutput(List.of(event)); } + + @JsonPropertyOrder + @JsonClassDescription("The put_all action combines events belonging to the same group by overwriting existing keys and adding new keys, similarly to the Java `Map.putAll`. " + + "The action drops all events that make up the combined event.") + static class PutAllAggregateActionConfig { + } } diff --git a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/RateLimiterAggregateAction.java b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/RateLimiterAggregateAction.java index ecdb1b4438..3ea0d0b8af 100644 --- a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/RateLimiterAggregateAction.java +++ b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/RateLimiterAggregateAction.java @@ -22,7 +22,7 @@ @DataPrepperPlugin(name = "rate_limiter", pluginType = AggregateAction.class, pluginConfigurationType = RateLimiterAggregateActionConfig.class) public class RateLimiterAggregateAction implements AggregateAction { private final RateLimiter rateLimiter; - private final String rateLimiterMode; + private final RateLimiterMode rateLimiterMode; @DataPrepperPluginConstructor public RateLimiterAggregateAction(final RateLimiterAggregateActionConfig ratelimiterAggregateActionConfig) { @@ -33,7 +33,7 @@ public RateLimiterAggregateAction(final RateLimiterAggregateActionConfig ratelim @Override public AggregateActionResponse handleEvent(final Event event, final AggregateActionInput aggregateActionInput) { - if (rateLimiterMode.equals(RateLimiterMode.DROP.toString())) { + if (rateLimiterMode == RateLimiterMode.DROP) { if (!rateLimiter.tryAcquire()) { return AggregateActionResponse.nullEventResponse(); } diff --git a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/RateLimiterAggregateActionConfig.java b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/RateLimiterAggregateActionConfig.java index 158f3b1ac6..21eca83090 100644 --- a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/RateLimiterAggregateActionConfig.java +++ b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/RateLimiterAggregateActionConfig.java @@ -5,32 +5,32 @@ package org.opensearch.dataprepper.plugins.processor.aggregate.actions; -import java.util.Set; -import java.util.HashSet; +import com.fasterxml.jackson.annotation.JsonClassDescription; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonPropertyDescription; +import com.fasterxml.jackson.annotation.JsonPropertyOrder; import jakarta.validation.constraints.NotNull; +@JsonPropertyOrder +@JsonClassDescription("The rate_limiter action controls the number of events aggregated per second. " + + "By default, rate_limiter blocks the aggregate processor from running if it receives more events than the configured number allowed. " + + "You can overwrite the number events that triggers the rate_limited by using the when_exceeds configuration option.") public class RateLimiterAggregateActionConfig { - public static final Set validRateLimiterModes = new HashSet<>(Set.of(RateLimiterMode.BLOCK.toString(), RateLimiterMode.DROP.toString())); - @JsonPropertyDescription("The number of events allowed per second.") @JsonProperty("events_per_second") @NotNull int eventsPerSecond; - @JsonPropertyDescription("Indicates what action the rate_limiter takes when the number of events received is greater than the number of events allowed per second. Default value is block, which blocks the processor from running after the maximum number of events allowed per second is reached until the next second. Alternatively, the drop option drops the excess events received in that second. Default is block") + @JsonPropertyDescription("Indicates what action the rate_limiter takes when the number of events received is greater than the number of events allowed per second. " + + "Default value is block, which blocks the processor from running after the maximum number of events allowed per second is reached until the next second. Alternatively, the drop option drops the excess events received in that second. Default is block") @JsonProperty("when_exceeds") - String whenExceedsMode = RateLimiterMode.BLOCK.toString(); + RateLimiterMode whenExceedsMode = RateLimiterMode.BLOCK; public int getEventsPerSecond() { return eventsPerSecond; } - public String getWhenExceeds() { - if (!validRateLimiterModes.contains(whenExceedsMode)) { - throw new IllegalArgumentException("Unknown rate limiter mode " + whenExceedsMode); - } + public RateLimiterMode getWhenExceeds() { return whenExceedsMode; } } diff --git a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/RateLimiterMode.java b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/RateLimiterMode.java index bf7e5bba76..dd12d71de2 100644 --- a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/RateLimiterMode.java +++ b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/RateLimiterMode.java @@ -6,6 +6,7 @@ package org.opensearch.dataprepper.plugins.processor.aggregate.actions; import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonValue; import java.util.Arrays; import java.util.Map; @@ -37,4 +38,9 @@ static RateLimiterMode fromOptionValue(final String option) { return ACTIONS_MAP.get(option.toLowerCase()); } + @JsonValue + public String getOptionValue() { + return name; + } + } diff --git a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/RemoveDuplicatesAggregateAction.java b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/RemoveDuplicatesAggregateAction.java index 3d364eb0f2..7fc98d0bf8 100644 --- a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/RemoveDuplicatesAggregateAction.java +++ b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/RemoveDuplicatesAggregateAction.java @@ -5,6 +5,8 @@ package org.opensearch.dataprepper.plugins.processor.aggregate.actions; +import com.fasterxml.jackson.annotation.JsonClassDescription; +import com.fasterxml.jackson.annotation.JsonPropertyOrder; import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.plugins.processor.aggregate.AggregateAction; @@ -17,7 +19,8 @@ * that have a non-empty groupState associated with them * @since 1.3 */ -@DataPrepperPlugin(name = "remove_duplicates", pluginType = AggregateAction.class) +@DataPrepperPlugin(name = "remove_duplicates", pluginType = AggregateAction.class, + pluginConfigurationType = RemoveDuplicatesAggregateAction.RemoveDuplicatesAggregateActionConfig.class) public class RemoveDuplicatesAggregateAction implements AggregateAction { static final String GROUP_STATE_HAS_EVENT = "GROUP_STATE_HAS_EVENT"; @@ -31,4 +34,9 @@ public AggregateActionResponse handleEvent(final Event event, final AggregateAct return AggregateActionResponse.nullEventResponse(); } + + @JsonPropertyOrder + @JsonClassDescription("The remove_duplicates action processes the first event for a group immediately and drops any events that duplicate the first event from the source.") + static class RemoveDuplicatesAggregateActionConfig { + } } diff --git a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/TailSamplerAggregateActionConfig.java b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/TailSamplerAggregateActionConfig.java index b8aba591f9..1c5ad1c393 100644 --- a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/TailSamplerAggregateActionConfig.java +++ b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/TailSamplerAggregateActionConfig.java @@ -5,25 +5,29 @@ package org.opensearch.dataprepper.plugins.processor.aggregate.actions; +import com.fasterxml.jackson.annotation.JsonClassDescription; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonPropertyDescription; +import com.fasterxml.jackson.annotation.JsonPropertyOrder; import jakarta.validation.constraints.NotNull; import jakarta.validation.constraints.AssertTrue; import java.time.Duration; +@JsonPropertyOrder +@JsonClassDescription("The tail_sampler action samples OpenTelemetry traces after collecting spans for a trace.") public class TailSamplerAggregateActionConfig { @JsonPropertyDescription("Period to wait before considering that a trace event is complete") @JsonProperty("wait_period") @NotNull private Duration waitPeriod; - @JsonPropertyDescription("Percent value to use for sampling non error events. 0.0 < percent < 100.0") + @JsonPropertyDescription("Percent value to use for sampling non error events. Must be greater than 0.0 and less than 100.0") @JsonProperty("percent") @NotNull private Integer percent; - @JsonPropertyDescription("A Data Prepper conditional expression, such as '/some-key == \"test\"', that will be evaluated to determine whether the event is an error event or not") + @JsonPropertyDescription("A conditional expression, such as '/some-key == \"test\"', that will be evaluated to determine whether the event is an error event or not") @JsonProperty("condition") private String condition; diff --git a/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateProcessorIT.java b/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateProcessorIT.java index 46ec0a996e..a7608decec 100644 --- a/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateProcessorIT.java +++ b/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateProcessorIT.java @@ -297,7 +297,7 @@ void aggregateWithPercentSamplerAction(double testPercent) throws InterruptedExc void aggregateWithRateLimiterAction() throws InterruptedException { final int eventsPerSecond = 500; lenient().when(rateLimiterAggregateActionConfig.getEventsPerSecond()).thenReturn(eventsPerSecond); - lenient().when(rateLimiterAggregateActionConfig.getWhenExceeds()).thenReturn(RateLimiterMode.DROP.toString()); + lenient().when(rateLimiterAggregateActionConfig.getWhenExceeds()).thenReturn(RateLimiterMode.DROP); aggregateAction = new RateLimiterAggregateAction(rateLimiterAggregateActionConfig); when(pluginFactory.loadPlugin(eq(AggregateAction.class), any(PluginSetting.class))) @@ -367,7 +367,7 @@ void aggregateWithRateLimiterActionNoDrops() throws InterruptedException { @RepeatedTest(value = 2) void aggregateWithCountAggregateAction() throws InterruptedException, NoSuchFieldException, IllegalAccessException { CountAggregateActionConfig countAggregateActionConfig = new CountAggregateActionConfig(); - setField(CountAggregateActionConfig.class, countAggregateActionConfig, "outputFormat", OutputFormat.RAW.toString()); + setField(CountAggregateActionConfig.class, countAggregateActionConfig, "outputFormat", OutputFormat.RAW); aggregateAction = new CountAggregateAction(countAggregateActionConfig); when(pluginFactory.loadPlugin(eq(AggregateAction.class), any(PluginSetting.class))) .thenReturn(aggregateAction); @@ -404,7 +404,7 @@ void aggregateWithCountAggregateAction() throws InterruptedException, NoSuchFiel @RepeatedTest(value = 2) void aggregateWithCountAggregateActionWithCondition() throws InterruptedException, NoSuchFieldException, IllegalAccessException { CountAggregateActionConfig countAggregateActionConfig = new CountAggregateActionConfig(); - setField(CountAggregateActionConfig.class, countAggregateActionConfig, "outputFormat", OutputFormat.RAW.toString()); + setField(CountAggregateActionConfig.class, countAggregateActionConfig, "outputFormat", OutputFormat.RAW); aggregateAction = new CountAggregateAction(countAggregateActionConfig); when(pluginFactory.loadPlugin(eq(AggregateAction.class), any(PluginSetting.class))) .thenReturn(aggregateAction); @@ -454,7 +454,7 @@ void aggregateWithCountAggregateActionWithUnaggregatedEvents() throws Interrupte String tag = UUID.randomUUID().toString(); when(aggregateProcessorConfig.getAggregatedEventsTag()).thenReturn(tag); CountAggregateActionConfig countAggregateActionConfig = new CountAggregateActionConfig(); - setField(CountAggregateActionConfig.class, countAggregateActionConfig, "outputFormat", OutputFormat.RAW.toString()); + setField(CountAggregateActionConfig.class, countAggregateActionConfig, "outputFormat", OutputFormat.RAW); aggregateAction = new CountAggregateAction(countAggregateActionConfig); when(pluginFactory.loadPlugin(eq(AggregateAction.class), any(PluginSetting.class))) .thenReturn(aggregateAction); @@ -496,7 +496,7 @@ void aggregateWithCountAggregateActionWithUnaggregatedEvents() throws Interrupte void aggregateWithHistogramAggregateAction() throws InterruptedException, NoSuchFieldException, IllegalAccessException { HistogramAggregateActionConfig histogramAggregateActionConfig = new HistogramAggregateActionConfig(); - setField(HistogramAggregateActionConfig.class, histogramAggregateActionConfig, "outputFormat", OutputFormat.RAW.toString()); + setField(HistogramAggregateActionConfig.class, histogramAggregateActionConfig, "outputFormat", OutputFormat.RAW); final String testKey = RandomStringUtils.randomAlphabetic(5); setField(HistogramAggregateActionConfig.class, histogramAggregateActionConfig, "key", testKey); final String testKeyPrefix = RandomStringUtils.randomAlphabetic(4)+"_"; diff --git a/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/CountAggregateActionConfigTests.java b/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/CountAggregateActionConfigTests.java index 1975918e37..3581c27347 100644 --- a/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/CountAggregateActionConfigTests.java +++ b/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/CountAggregateActionConfigTests.java @@ -5,7 +5,6 @@ package org.opensearch.dataprepper.plugins.processor.aggregate.actions; -import static org.junit.jupiter.api.Assertions.assertThrows; import static org.opensearch.dataprepper.test.helper.ReflectivelySetField.setField; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.junit.jupiter.MockitoExtension; @@ -39,7 +38,7 @@ void setup() { void testDefault() { assertThat(countAggregateActionConfig.getCountKey(), equalTo(DEFAULT_COUNT_KEY)); assertThat(countAggregateActionConfig.getStartTimeKey(), equalTo(DEFAULT_START_TIME_KEY)); - assertThat(countAggregateActionConfig.getOutputFormat(), equalTo(OutputFormat.OTEL_METRICS.toString())); + assertThat(countAggregateActionConfig.getOutputFormat(), equalTo(OutputFormat.OTEL_METRICS)); assertThat(countAggregateActionConfig.getMetricName(), equalTo(CountAggregateActionConfig.SUM_METRIC_NAME)); assertThat(countAggregateActionConfig.getUniqueKeys(), equalTo(null)); } @@ -52,9 +51,9 @@ void testValidConfig() throws NoSuchFieldException, IllegalAccessException { final String testStartTimeKey = UUID.randomUUID().toString(); setField(CountAggregateActionConfig.class, countAggregateActionConfig, "startTimeKey", testStartTimeKey); assertThat(countAggregateActionConfig.getStartTimeKey(), equalTo(testStartTimeKey)); - final String testOutputFormat = OutputFormat.OTEL_METRICS.toString(); + final OutputFormat testOutputFormat = OutputFormat.OTEL_METRICS; setField(CountAggregateActionConfig.class, countAggregateActionConfig, "outputFormat", testOutputFormat); - assertThat(countAggregateActionConfig.getOutputFormat(), equalTo(OutputFormat.OTEL_METRICS.toString())); + assertThat(countAggregateActionConfig.getOutputFormat(), equalTo(OutputFormat.OTEL_METRICS)); final String testName = UUID.randomUUID().toString(); setField(CountAggregateActionConfig.class, countAggregateActionConfig, "metricName", testName); assertThat(countAggregateActionConfig.getMetricName(), equalTo(testName)); @@ -64,10 +63,4 @@ void testValidConfig() throws NoSuchFieldException, IllegalAccessException { setField(CountAggregateActionConfig.class, countAggregateActionConfig, "uniqueKeys", uniqueKeys); assertThat(countAggregateActionConfig.getUniqueKeys(), equalTo(uniqueKeys)); } - - @Test - void testInvalidConfig() throws NoSuchFieldException, IllegalAccessException { - setField(CountAggregateActionConfig.class, countAggregateActionConfig, "outputFormat", UUID.randomUUID().toString()); - assertThrows(IllegalArgumentException.class, () -> countAggregateActionConfig.getOutputFormat()); - } } diff --git a/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/CountAggregateActionTest.java b/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/CountAggregateActionTest.java index af81ca001f..c9ed55797b 100644 --- a/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/CountAggregateActionTest.java +++ b/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/CountAggregateActionTest.java @@ -52,7 +52,7 @@ private AggregateAction createObjectUnderTest(CountAggregateActionConfig config) void testCountAggregate(int testCount) throws NoSuchFieldException, IllegalAccessException { final String testName = UUID.randomUUID().toString(); CountAggregateActionConfig countAggregateActionConfig = new CountAggregateActionConfig(); - setField(CountAggregateActionConfig.class, countAggregateActionConfig, "outputFormat", OutputFormat.RAW.toString()); + setField(CountAggregateActionConfig.class, countAggregateActionConfig, "outputFormat", OutputFormat.RAW); countAggregateAction = createObjectUnderTest(countAggregateActionConfig); final String key = UUID.randomUUID().toString(); final String value = UUID.randomUUID().toString(); @@ -160,7 +160,7 @@ void testCountAggregateOTelFormatWithStartAndEndTimesInTheEvent(int testCount) { String endTimeKey = UUID.randomUUID().toString(); when(mockConfig.getStartTimeKey()).thenReturn(startTimeKey); when(mockConfig.getEndTimeKey()).thenReturn(endTimeKey); - when(mockConfig.getOutputFormat()).thenReturn(OutputFormat.OTEL_METRICS.toString()); + when(mockConfig.getOutputFormat()).thenReturn(OutputFormat.OTEL_METRICS); countAggregateAction = createObjectUnderTest(mockConfig); final String key1 = "key-"+UUID.randomUUID().toString(); final String value1 = UUID.randomUUID().toString(); diff --git a/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/HistogramAggregateActionConfigTests.java b/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/HistogramAggregateActionConfigTests.java index 60ba8dc202..de0be19bf2 100644 --- a/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/HistogramAggregateActionConfigTests.java +++ b/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/HistogramAggregateActionConfigTests.java @@ -42,7 +42,7 @@ void setup() { void testDefault() { assertThat(histogramAggregateActionConfig.getGeneratedKeyPrefix(), equalTo(DEFAULT_GENERATED_KEY_PREFIX)); assertThat(histogramAggregateActionConfig.getRecordMinMax(), equalTo(false)); - assertThat(histogramAggregateActionConfig.getOutputFormat(), equalTo(OutputFormat.OTEL_METRICS.toString())); + assertThat(histogramAggregateActionConfig.getOutputFormat(), equalTo(OutputFormat.OTEL_METRICS)); assertThat(histogramAggregateActionConfig.getMetricName(), equalTo(HistogramAggregateActionConfig.HISTOGRAM_METRIC_NAME)); } @@ -53,9 +53,9 @@ void testValidConfig() throws NoSuchFieldException, IllegalAccessException { assertThat(histogramAggregateActionConfig.getGeneratedKeyPrefix(), equalTo(testGeneratedKeyPrefix)); setField(HistogramAggregateActionConfig.class, histogramAggregateActionConfig, "recordMinMax", true); assertThat(histogramAggregateActionConfig.getRecordMinMax(), equalTo(true)); - final String testOutputFormat = OutputFormat.OTEL_METRICS.toString(); + final OutputFormat testOutputFormat = OutputFormat.OTEL_METRICS; setField(HistogramAggregateActionConfig.class, histogramAggregateActionConfig, "outputFormat", testOutputFormat); - assertThat(histogramAggregateActionConfig.getOutputFormat(), equalTo(OutputFormat.OTEL_METRICS.toString())); + assertThat(histogramAggregateActionConfig.getOutputFormat(), equalTo(OutputFormat.OTEL_METRICS)); final String testKey = RandomStringUtils.randomAlphabetic(10); setField(HistogramAggregateActionConfig.class, histogramAggregateActionConfig, "key", testKey); assertThat(histogramAggregateActionConfig.getKey(), equalTo(testKey)); @@ -114,12 +114,6 @@ void testValidConfig() throws NoSuchFieldException, IllegalAccessException { assertThat(histogramAggregateActionConfig.getMetricName(), equalTo(testName)); } - @Test - void testInvalidOutputFormatConfig() throws NoSuchFieldException, IllegalAccessException { - setField(HistogramAggregateActionConfig.class, histogramAggregateActionConfig, "outputFormat", RandomStringUtils.randomAlphabetic(10)); - assertThrows(IllegalArgumentException.class, () -> histogramAggregateActionConfig.getOutputFormat()); - } - @Test void testInvalidBucketsConfig() throws NoSuchFieldException, IllegalAccessException { setField(HistogramAggregateActionConfig.class, histogramAggregateActionConfig, "buckets", new ArrayList()); diff --git a/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/HistogramAggregateActionTests.java b/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/HistogramAggregateActionTests.java index 27e8f8d801..facd090a64 100644 --- a/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/HistogramAggregateActionTests.java +++ b/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/HistogramAggregateActionTests.java @@ -61,7 +61,7 @@ private AggregateAction createObjectUnderTest() { @ParameterizedTest @ValueSource(ints = {10, 20, 50, 100}) void testHistogramAggregate(final int testCount) throws NoSuchFieldException, IllegalAccessException { - setField(HistogramAggregateActionConfig.class, histogramAggregateActionConfig, "outputFormat", OutputFormat.RAW.toString()); + setField(HistogramAggregateActionConfig.class, histogramAggregateActionConfig, "outputFormat", OutputFormat.RAW); final String testKeyPrefix = RandomStringUtils.randomAlphabetic(5)+"_"; setField(HistogramAggregateActionConfig.class, histogramAggregateActionConfig, "generatedKeyPrefix", testKeyPrefix); setField(HistogramAggregateActionConfig.class, histogramAggregateActionConfig, "units", "ms"); @@ -255,7 +255,7 @@ void testHistogramAggregateOTelFormatWithStartAndEndTimesInTheEvent(final int te when(histogramAggregateActionConfig.getEndTimeKey()).thenReturn(endTimeKey); final String testName = UUID.randomUUID().toString(); when(histogramAggregateActionConfig.getMetricName()).thenReturn(testName); - when(histogramAggregateActionConfig.getOutputFormat()).thenReturn(OutputFormat.OTEL_METRICS.toString()); + when(histogramAggregateActionConfig.getOutputFormat()).thenReturn(OutputFormat.OTEL_METRICS); String keyPrefix = UUID.randomUUID().toString(); final String testUnits = "ms"; when(histogramAggregateActionConfig.getUnits()).thenReturn(testUnits); @@ -381,7 +381,7 @@ void testHistogramAggregateOTelFormat_with_startTime_before_currentTime_and_all_ when(histogramAggregateActionConfig.getEndTimeKey()).thenReturn(endTimeKey); final String testName = UUID.randomUUID().toString(); when(histogramAggregateActionConfig.getMetricName()).thenReturn(testName); - when(histogramAggregateActionConfig.getOutputFormat()).thenReturn(OutputFormat.OTEL_METRICS.toString()); + when(histogramAggregateActionConfig.getOutputFormat()).thenReturn(OutputFormat.OTEL_METRICS); final String testUnits = "ms"; when(histogramAggregateActionConfig.getUnits()).thenReturn(testUnits); when(histogramAggregateActionConfig.getRecordMinMax()).thenReturn(true); diff --git a/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/OutputFormatTest.java b/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/OutputFormatTest.java index 9adaf228f3..53615a563e 100644 --- a/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/OutputFormatTest.java +++ b/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/OutputFormatTest.java @@ -5,15 +5,25 @@ package org.opensearch.dataprepper.plugins.processor.aggregate.actions; +import org.junit.jupiter.api.extension.ExtensionContext; import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.ArgumentsProvider; +import org.junit.jupiter.params.provider.ArgumentsSource; import org.junit.jupiter.params.provider.EnumSource; +import java.util.stream.Stream; + +import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.not; +import static org.hamcrest.CoreMatchers.notNullValue; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.emptyString; +import static org.junit.jupiter.params.provider.Arguments.arguments; public class OutputFormatTest { - @ParameterizedTest @EnumSource(OutputFormat.class) void fromOptionValue(final OutputFormat value) { @@ -21,4 +31,32 @@ void fromOptionValue(final OutputFormat value) { assertThat(value, instanceOf(OutputFormat.class)); } + @ParameterizedTest + @ArgumentsSource(OutputFormatToKnownName.class) + void fromOptionValue_returns_expected_value(final OutputFormat outputFormat, final String knownString) { + assertThat(OutputFormat.fromOptionValue(knownString), equalTo(outputFormat)); + } + + @ParameterizedTest + @EnumSource(OutputFormat.class) + void getOptionValue_returns_non_empty_string_for_all_types(final OutputFormat outputFormat) { + assertThat(outputFormat.getOptionValue(), notNullValue()); + assertThat(outputFormat.getOptionValue(), not(emptyString())); + } + + @ParameterizedTest + @ArgumentsSource(OutputFormatToKnownName.class) + void getOptionValue_returns_expected_name(final OutputFormat outputFormat, final String expectedString) { + assertThat(outputFormat.getOptionValue(), equalTo(expectedString)); + } + + static class OutputFormatToKnownName implements ArgumentsProvider { + @Override + public Stream provideArguments(final ExtensionContext extensionContext) { + return Stream.of( + arguments(OutputFormat.OTEL_METRICS, "otel_metrics"), + arguments(OutputFormat.RAW, "raw") + ); + } + } } diff --git a/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/RateLimiterAggregateActionConfigTests.java b/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/RateLimiterAggregateActionConfigTests.java index e54b4a230d..5796d48609 100644 --- a/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/RateLimiterAggregateActionConfigTests.java +++ b/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/RateLimiterAggregateActionConfigTests.java @@ -8,7 +8,6 @@ import static org.opensearch.dataprepper.test.helper.ReflectivelySetField.setField; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.junit.jupiter.MockitoExtension; -import org.apache.commons.lang3.RandomStringUtils; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -16,7 +15,6 @@ import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; -import static org.junit.jupiter.api.Assertions.assertThrows; @ExtendWith(MockitoExtension.class) public class RateLimiterAggregateActionConfigTests { @@ -33,21 +31,15 @@ void setup() { @Test void testDefault() { - assertThat(rateLimiterAggregateActionConfig.getWhenExceeds(), equalTo(RateLimiterMode.BLOCK.toString())); - } - - @Test - void testInvalidConfig() throws NoSuchFieldException, IllegalAccessException { - setField(RateLimiterAggregateActionConfig.class, rateLimiterAggregateActionConfig, "whenExceedsMode", RandomStringUtils.randomAlphabetic(4)); - assertThrows(IllegalArgumentException.class, () -> rateLimiterAggregateActionConfig.getWhenExceeds()); + assertThat(rateLimiterAggregateActionConfig.getWhenExceeds(), equalTo(RateLimiterMode.BLOCK)); } @Test void testValidConfig() throws NoSuchFieldException, IllegalAccessException { final int testEventsPerSecond = ThreadLocalRandom.current().nextInt(); setField(RateLimiterAggregateActionConfig.class, rateLimiterAggregateActionConfig, "eventsPerSecond", testEventsPerSecond); - setField(RateLimiterAggregateActionConfig.class, rateLimiterAggregateActionConfig, "whenExceedsMode", "drop"); + setField(RateLimiterAggregateActionConfig.class, rateLimiterAggregateActionConfig, "whenExceedsMode", RateLimiterMode.fromOptionValue("drop")); assertThat(rateLimiterAggregateActionConfig.getEventsPerSecond(), equalTo(testEventsPerSecond)); - assertThat(rateLimiterAggregateActionConfig.getWhenExceeds(), equalTo(RateLimiterMode.DROP.toString())); + assertThat(rateLimiterAggregateActionConfig.getWhenExceeds(), equalTo(RateLimiterMode.DROP)); } } diff --git a/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/RateLimiterAggregateActionTests.java b/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/RateLimiterAggregateActionTests.java index 202452bfce..b642a983f4 100644 --- a/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/RateLimiterAggregateActionTests.java +++ b/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/RateLimiterAggregateActionTests.java @@ -46,7 +46,7 @@ private AggregateAction createObjectUnderTest(RateLimiterAggregateActionConfig c @ValueSource(ints = {1, 2, 100, 1000}) void testRateLimiterAggregateSmoothTraffic(int testEventsPerSecond) throws InterruptedException { when(rateLimiterAggregateActionConfig.getEventsPerSecond()).thenReturn(testEventsPerSecond); - when(rateLimiterAggregateActionConfig.getWhenExceeds()).thenReturn(RateLimiterMode.DROP.toString()); + when(rateLimiterAggregateActionConfig.getWhenExceeds()).thenReturn(RateLimiterMode.DROP); rateLimiterAggregateAction = createObjectUnderTest(rateLimiterAggregateActionConfig); final String key = UUID.randomUUID().toString(); final String value = UUID.randomUUID().toString(); @@ -72,7 +72,7 @@ void testRateLimiterAggregateSmoothTraffic(int testEventsPerSecond) throws Inter @ValueSource(ints = {100, 200, 500, 1000}) void testRateLimiterInDropMode(int testEventsPerSecond) throws InterruptedException { when(rateLimiterAggregateActionConfig.getEventsPerSecond()).thenReturn(testEventsPerSecond); - when(rateLimiterAggregateActionConfig.getWhenExceeds()).thenReturn(RateLimiterMode.DROP.toString()); + when(rateLimiterAggregateActionConfig.getWhenExceeds()).thenReturn(RateLimiterMode.DROP); rateLimiterAggregateAction = createObjectUnderTest(rateLimiterAggregateActionConfig); final String key = UUID.randomUUID().toString(); final String value = UUID.randomUUID().toString(); @@ -101,7 +101,7 @@ void testRateLimiterInDropMode(int testEventsPerSecond) throws InterruptedExcept @ValueSource(ints = {100, 200, 500, 1000}) void testRateLimiterInBlockMode(int testEventsPerSecond) throws InterruptedException { when(rateLimiterAggregateActionConfig.getEventsPerSecond()).thenReturn(testEventsPerSecond); - when(rateLimiterAggregateActionConfig.getWhenExceeds()).thenReturn(RateLimiterMode.BLOCK.toString()); + when(rateLimiterAggregateActionConfig.getWhenExceeds()).thenReturn(RateLimiterMode.BLOCK); rateLimiterAggregateAction = createObjectUnderTest(rateLimiterAggregateActionConfig); final String key = UUID.randomUUID().toString(); final String value = UUID.randomUUID().toString(); diff --git a/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/RateLimiterModeTest.java b/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/RateLimiterModeTest.java index 3fccff55ba..79de57a44d 100644 --- a/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/RateLimiterModeTest.java +++ b/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/RateLimiterModeTest.java @@ -5,12 +5,23 @@ package org.opensearch.dataprepper.plugins.processor.aggregate.actions; +import org.junit.jupiter.api.extension.ExtensionContext; import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.ArgumentsProvider; +import org.junit.jupiter.params.provider.ArgumentsSource; import org.junit.jupiter.params.provider.EnumSource; +import java.util.stream.Stream; + +import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.not; +import static org.hamcrest.CoreMatchers.notNullValue; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.emptyString; +import static org.junit.jupiter.params.provider.Arguments.arguments; public class RateLimiterModeTest { @@ -20,4 +31,33 @@ void fromOptionValue(final RateLimiterMode value) { assertThat(RateLimiterMode.fromOptionValue(value.name()), is(value)); assertThat(value, instanceOf(RateLimiterMode.class)); } + + @ParameterizedTest + @ArgumentsSource(RateLimiterModeToKnownName.class) + void fromOptionValue_returns_expected_value(final RateLimiterMode rateLimiterMode, final String knownString) { + assertThat(RateLimiterMode.fromOptionValue(knownString), equalTo(rateLimiterMode)); + } + + @ParameterizedTest + @EnumSource(RateLimiterMode.class) + void getOptionValue_returns_non_empty_string_for_all_types(final RateLimiterMode rateLimiterMode) { + assertThat(rateLimiterMode.getOptionValue(), notNullValue()); + assertThat(rateLimiterMode.getOptionValue(), not(emptyString())); + } + + @ParameterizedTest + @ArgumentsSource(RateLimiterModeToKnownName.class) + void getOptionValue_returns_expected_name(final RateLimiterMode rateLimiterMode, final String expectedString) { + assertThat(rateLimiterMode.getOptionValue(), equalTo(expectedString)); + } + + static class RateLimiterModeToKnownName implements ArgumentsProvider { + @Override + public Stream provideArguments(final ExtensionContext extensionContext) { + return Stream.of( + arguments(RateLimiterMode.DROP, "drop"), + arguments(RateLimiterMode.BLOCK, "block") + ); + } + } }