Skip to content

Commit

Permalink
MAINT: resolve conflicts
Browse files Browse the repository at this point in the history
Signed-off-by: George Chen <qchea@amazon.com>
  • Loading branch information
chenqi0805 committed Aug 21, 2024
2 parents 37cf84b + aa1c5c5 commit f31693d
Show file tree
Hide file tree
Showing 175 changed files with 3,666 additions and 619 deletions.
5 changes: 3 additions & 2 deletions build-resources.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,6 @@ ext.coreProjects = [
project(':data-prepper-plugins'),
project(':data-prepper-test-common'),
project(':data-prepper-test-event'),
project(':data-prepper-plugin-framework')
]
project(':data-prepper-plugin-framework'),
project(':data-prepper-plugin-schema-cli')
]
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,24 @@ default boolean isByteBuffer() {
return false;
}

/**
* returns max request size of an entry in the buffer
*
* @return Optional value of the buffer's max request size
*/
default Optional<Integer> getMaxRequestSize() {
return Optional.empty();
}

/**
* returns optimal request size of an entry in the buffer
*
* @return Optional value of the buffer's optimal request size
*/
default Optional<Integer> getOptimalRequestSize() {
return Optional.empty();
}

/**
* Checks if the buffer enables acknowledgements for the pipeline
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,19 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.processor.drop;
package org.opensearch.dataprepper.model.event;

import org.opensearch.dataprepper.model.event.Event;
import com.fasterxml.jackson.annotation.JsonCreator;
import org.slf4j.Logger;

import java.util.Arrays;
import java.util.Map;
import java.util.stream.Collectors;

import static org.opensearch.dataprepper.logging.DataPrepperMarkers.EVENT;

enum HandleFailedEventsOption {
DROP("drop", true, false),
DROP_SILENTLY("drop_silently", true, true),
SKIP("skip", false, false),
SKIP_SILENTLY("skip_silently", false, true);
public enum HandleFailedEventsOption {
DROP("drop", true, true),
DROP_SILENTLY("drop_silently", true, false),
SKIP("skip", false, true),
SKIP_SILENTLY("skip_silently", false, false);

private static final Map<String, HandleFailedEventsOption> OPTIONS_MAP = Arrays.stream(HandleFailedEventsOption.values())
.collect(Collectors.toMap(
Expand All @@ -37,13 +33,14 @@ enum HandleFailedEventsOption {
this.isLogRequired = isLogRequired;
}

public boolean isDropEventOption(final Event event, final Throwable cause, final Logger log) {
if (isLogRequired) {
log.warn(EVENT, "An exception occurred while processing when expression for event {}", event, cause);
}
public boolean shouldDropEvent() {
return isDropEventOption;
}

public boolean shouldLog() {
return isLogRequired;
}

@JsonCreator
static HandleFailedEventsOption fromOptionValue(final String option) {
return OPTIONS_MAP.get(option.toLowerCase());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,12 @@ void testMaxRequestSize() {
assertEquals(buffer.getMaxRequestSize(), Optional.empty());
}

@Test
void testOptimalRequestSize() {
final Buffer<Record<Event>> buffer = createObjectUnderTest();
assertEquals(buffer.getOptimalRequestSize(), Optional.empty());
}

@Test
void testShutdown() {
final Buffer<Record<Event>> buffer = createObjectUnderTest();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.model.event;

import org.hamcrest.CoreMatchers;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;

class HandleFailedEventsOptionTest {
@ParameterizedTest
@EnumSource(HandleFailedEventsOption.class)
void fromOptionValue(final HandleFailedEventsOption option) {
assertThat(HandleFailedEventsOption.fromOptionValue(option.name()), CoreMatchers.is(option));

if (option == HandleFailedEventsOption.SKIP || option == HandleFailedEventsOption.SKIP_SILENTLY) {
assertThat(option.shouldDropEvent(), equalTo(false));
} else {
assertThat(option.shouldDropEvent(), equalTo(true));
}

if (option == HandleFailedEventsOption.SKIP_SILENTLY || option == HandleFailedEventsOption.DROP_SILENTLY) {
assertThat(option.shouldLog(), equalTo(false));
} else {
assertThat(option.shouldLog(), equalTo(true));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,12 @@ public Optional<Integer> getMaxRequestSize() {
return maxRequestSize.isPresent() ? Optional.of(maxRequestSize.getAsInt()) : Optional.empty();
}

@Override
public Optional<Integer> getOptimalRequestSize() {
OptionalInt optimalRequestSize = allBuffers.stream().filter(b -> b.getOptimalRequestSize().isPresent()).mapToInt(b -> (Integer)b.getOptimalRequestSize().get()).min();
return optimalRequestSize.isPresent() ? Optional.of(optimalRequestSize.getAsInt()) : Optional.empty();
}

@Override
public void shutdown() {
allBuffers.forEach(Buffer::shutdown);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,11 +231,20 @@ void test_getMaxRequestSize() {
assertThat(multiBufferDecorator.getMaxRequestSize(), equalTo(Optional.empty()));
}

@Test
void test_getOptimalRequestSize() {
when(primaryBuffer.getOptimalRequestSize()).thenReturn(Optional.empty());
when(secondaryBuffer.getOptimalRequestSize()).thenReturn(Optional.empty());

final MultiBufferDecorator multiBufferDecorator = createObjectUnderTest(2);
assertThat(multiBufferDecorator.getOptimalRequestSize(), equalTo(Optional.empty()));
}

private MultiBufferDecorator createObjectUnderTest(final int secondaryBufferCount) {
final List<Buffer> secondaryBuffers = IntStream.range(0, secondaryBufferCount)
.mapToObj(i -> secondaryBuffer)
.collect(Collectors.toList());

return new MultiBufferDecorator(primaryBuffer, secondaryBuffers);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,11 @@ regexPattern
;

setInitializer
: LBRACE primary (SET_DELIMITER primary)* RBRACE
: LBRACE setMembers RBRACE
;

setMembers
: literal (SPACE* SET_DELIMITER SPACE* literal)*
;

unaryOperator
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,9 @@ public Object coercePrimaryTerminalNode(final TerminalNode node, final Event eve
return Float.valueOf(nodeStringValue);
case DataPrepperExpressionParser.Boolean:
return Boolean.valueOf(nodeStringValue);
case DataPrepperExpressionParser.COMMA:
case DataPrepperExpressionParser.SET_DELIMITER:
return nodeType;
case DataPrepperExpressionParser.Null:
return null;
case DataPrepperExpressionParser.DataTypes:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
import org.opensearch.dataprepper.expression.antlr.DataPrepperExpressionListener;
import org.opensearch.dataprepper.expression.antlr.DataPrepperExpressionParser;

import java.util.HashSet;
import java.util.Set;
import java.util.Stack;

/**
Expand All @@ -35,13 +37,16 @@ class ParseTreeEvaluatorListener extends DataPrepperExpressionBaseListener {
private final Stack<Integer> operatorSymbolStack;
private final Stack<Object> operandStack;
private final Event event;
private boolean listStart;
private Set<Object> setMembers;

public ParseTreeEvaluatorListener(final OperatorProvider operatorProvider,
final ParseTreeCoercionService coercionService,
final Event event) {
this.coercionService = coercionService;
this.operatorProvider = operatorProvider;
this.event = event;
this.listStart = false;
operatorSymbolStack = new Stack<>();
operandStack = new Stack<>();
}
Expand All @@ -54,6 +59,24 @@ public Object getResult() {
return operandStack.peek();
}

private void validateSetMembers(Set<Object> setMembers) {
int numbers = 0;
int strings = 0;
int booleans = 0;
for (Object member: setMembers) {
if (member instanceof Number) {
numbers++;
} else if (member instanceof String) {
strings++;
} else if (member instanceof Boolean) {
booleans++;
}
}
if (numbers != setMembers.size() && strings != setMembers.size() && booleans != setMembers.size()) {
throw new RuntimeException("All set members should be of same type");
}
}

@Override
public void visitTerminal(TerminalNode node) {
final int nodeType = node.getSymbol().getType();
Expand All @@ -62,12 +85,25 @@ public void visitTerminal(TerminalNode node) {
}
if (operatorProvider.containsOperator(nodeType) || nodeType == DataPrepperExpressionParser.LPAREN) {
operatorSymbolStack.push(nodeType);
} else if (nodeType == DataPrepperExpressionParser.LBRACE) {
listStart = true;
setMembers = new HashSet<>();
} else if (nodeType == DataPrepperExpressionParser.RBRACE) {
listStart = false;
validateSetMembers(setMembers);
operandStack.push(setMembers);
} else if (nodeType == DataPrepperExpressionParser.RPAREN) {
// pop LPAREN at operatorSymbolStack top
operatorSymbolStack.pop();
} else {
final Object arg = coercionService.coercePrimaryTerminalNode(node, event);
operandStack.push(arg);
if (listStart) {
if (!(arg instanceof Integer) || (((int)arg) != DataPrepperExpressionParser.COMMA && ((int)arg) != DataPrepperExpressionParser.SET_DELIMITER)) {
setMembers.add(arg);
}
} else {
operandStack.push(arg);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package org.opensearch.dataprepper.expression;

import org.opensearch.dataprepper.model.event.Event;

import javax.inject.Named;
import java.util.List;
import java.util.function.Function;

@Named
public class StartsWithExpressionFunction implements ExpressionFunction {
private static final int NUMBER_OF_ARGS = 2;

static final String STARTS_WITH_FUNCTION_NAME = "startsWith";
@Override
public String getFunctionName() {
return STARTS_WITH_FUNCTION_NAME;
}

@Override
public Object evaluate(
final List<Object> args,
final Event event,
final Function<Object, Object> convertLiteralType) {

if (args.size() != NUMBER_OF_ARGS) {
throw new RuntimeException("startsWith() takes exactly two arguments");
}

String[] strArgs = new String[NUMBER_OF_ARGS];
for (int i = 0; i < NUMBER_OF_ARGS; i++) {
Object arg = args.get(i);
if (!(arg instanceof String)) {
throw new RuntimeException(String.format("startsWith() takes only string type arguments. \"%s\" is not of type string", arg));
}
String stringOrKey = (String) arg;
if (stringOrKey.charAt(0) == '"') {
strArgs[i] = stringOrKey.substring(1, stringOrKey.length()-1);
} else if (stringOrKey.charAt(0) == '/') {
Object obj = event.get(stringOrKey, Object.class);
if (obj == null) {
return false;
}
if (!(obj instanceof String)) {
throw new RuntimeException(String.format("startsWith() only operates on string types. The value at \"%s\" is \"%s\" which is not a string type.", stringOrKey, obj));
}
strArgs[i] = (String)obj;
} else {
throw new RuntimeException(String.format("Arguments to startsWith() must be a literal string or a Json Pointer. \"%s\" is not string literal or json pointer", stringOrKey));
}
}
return strArgs[0].startsWith(strArgs[1]);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,7 @@ public ContainsExpressionFunction createObjectUnderTest() {
}

@Test
void testContainsBasic() {
containsExpressionFunction = createObjectUnderTest();
void testContainsBasic() {containsExpressionFunction = createObjectUnderTest();
assertThat(containsExpressionFunction.evaluate(List.of("\"abcde\"", "\"abcd\""), testEvent, testFunction), equalTo(true));
assertThat(containsExpressionFunction.evaluate(List.of("/"+testKey, "/"+testKey2), testEvent, testFunction), equalTo(true));
assertThat(containsExpressionFunction.evaluate(List.of("\""+testValue+"\"", "/"+testKey2), testEvent, testFunction), equalTo(true));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -216,11 +217,28 @@ private static Stream<Arguments> validExpressionArguments() {
arguments("/sourceIp == null", event("{\"sourceIp\": [\"test\", \"test_two\"]}"), false),
arguments("/sourceIp == null", event("{\"sourceIp\": {\"test\": \"test_two\"}}"), false),
arguments("/sourceIp != null", event("{\"sourceIp\": {\"test\": \"test_two\"}}"), true),
arguments("/value in {200.222, 300.333, 400}", event("{\"value\": 400}"), true),
arguments("/value in {200.222, 300.333, 400}", event("{\"value\": 400.222}"), false),
arguments("/value not in {200.222, 300.333, 400}", event("{\"value\": 400}"), false),
arguments("/value not in {200.222, 300.333, 400}", event("{\"value\": 800.222}"), true),
arguments("/color in {\"blue\", \"red\", \"yellow\", \"green\"}", event("{\"color\": \"yellow\"}"), true),
arguments("/color in {\"blue\", \"red\", \"yellow\", \"green\"}", event("{\"color\": \"gray\"}"), false),
arguments("/color not in {\"blue\", \"red\", \"yellow\", \"green\"}", event("{\"color\": \"gray\"}"), true),
arguments("/color not in {\"blue\", \"red\", \"yellow\", \"green\"}", event("{\"color\": \"blue\"}"), false),
arguments("/color in {\"blue\", \"\", \"red\", \"yellow\", \"green\"}", event("{\"color\": \"\"}"), true),
arguments("/status_code in {200 , 300}", event("{\"status_code\": 200}"), true),
arguments("/status_code in {2 , 3}", event("{\"status_code\": 2}"), true),
arguments("/status_code not in {200 , 300}", event("{\"status_code\": 400}"), true),
arguments("/status_code in {200 , 300}", event("{\"status_code\": 500}"), false),
arguments("/flag in {true , false}", event("{\"flag\": false}"), true),
arguments("/flag in {true , false}", event("{\"flag\": true}"), true),
arguments("/name =~ \".*dataprepper-[0-9]+\"", event("{\"name\": \"dataprepper-0\"}"), true),
arguments("/name =~ \".*dataprepper-[0-9]+\"", event("{\"name\": \"dataprepper-212\"}"), true),
arguments("/name =~ \".*dataprepper-[0-9]+\"", event("{\"name\": \"dataprepper-abc\"}"), false),
arguments("/name =~ \".*dataprepper-[0-9]+\"", event("{\"other\": \"dataprepper-abc\"}"), false)
);
arguments("/name =~ \".*dataprepper-[0-9]+\"", event("{\"other\": \"dataprepper-abc\"}"), false),
arguments("startsWith(\""+strValue+ UUID.randomUUID() + "\",/status)", event("{\"status\":\""+strValue+"\"}"), true),
arguments("startsWith(\""+ UUID.randomUUID() +strValue+ "\",/status)", event("{\"status\":\""+strValue+"\"}"), false)
);
}

private static Stream<Arguments> invalidExpressionArguments() {
Expand Down Expand Up @@ -273,6 +291,18 @@ private static Stream<Arguments> invalidExpressionArguments() {
arguments("contains(1234, /strField)", event("{\"intField\":1234,\"strField\":\"string\"}")),
arguments("contains(str, /strField)", event("{\"intField\":1234,\"strField\":\"string\"}")),
arguments("contains(/strField, 1234)", event("{\"intField\":1234,\"strField\":\"string\"}")),
arguments("/color in {\"blue\", 222.0, \"yellow\", \"green\"}", event("{\"color\": \"yellow\"}")),
arguments("/color in {\"blue, \"yellow\", \"green\"}", event("{\"color\": \"yellow\"}")),
arguments("/color in {\"blue\", yellow\", \"green\"}", event("{\"color\": \"yellow\"}")),
arguments("/color in {\", \"yellow\", \"green\"}", event("{\"color\": \"yellow\"}")),
arguments("/color in { \", \"yellow\", \"green\"}", event("{\"color\": \"yellow\"}")),
arguments("/color in {, \"yellow\", \"green\"}", event("{\"color\": \"yellow\"}")),
arguments("/color in { , \"yellow\", \"green\"}", event("{\"color\": \"yellow\"}")),
arguments("/color in {blue, \"yellow\", \"green\"}", event("{\"color\": \"yellow\"}")),
arguments("/color in { blue, \"yellow\", \"green\"}", event("{\"color\": \"yellow\"}")),
arguments("/color in {\"\",blue, \"yellow\", \"green\"}", event("{\"color\": \"yellow\"}")),
arguments("/value in {22a2.0, 100}", event("{\"value\": 100}")),
arguments("/value in {222, 10a0}", event("{\"value\": 100}")),
arguments("getMetadata(10)", tagEvent),
arguments("getMetadata("+ testMetadataKey+ ")", tagEvent),
arguments("getMetadata(\""+ testMetadataKey+")", tagEvent),
Expand Down
Loading

0 comments on commit f31693d

Please sign in to comment.