Skip to content

Commit

Permalink
Add creation and aggregation of dynamic S3 groups based on events (#4346
Browse files Browse the repository at this point in the history
)

Signed-off-by: Taylor Gray <tylgry@amazon.com>
  • Loading branch information
graytaylor0 authored Apr 1, 2024
1 parent 2d84ba9 commit 1a7e099
Show file tree
Hide file tree
Showing 26 changed files with 1,123 additions and 232 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

import org.opensearch.dataprepper.model.event.Event;

import java.util.List;

/**
* @since 1.3
* ExpressionEvaluator interface to abstract the parse and evaluate implementations.
Expand Down Expand Up @@ -36,4 +38,8 @@ default Boolean evaluateConditional(final String statement, final Event context)
Boolean isValidExpressionStatement(final String statement);

Boolean isValidFormatExpression(final String format);

List<String> extractDynamicKeysFromFormatExpression(final String format);

List<String> extractDynamicExpressionsFromFormatExpression(final String format);
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,16 @@

package org.opensearch.dataprepper.expression;

import org.junit.jupiter.api.Test;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.junit.jupiter.api.Test;

import java.util.Collections;
import java.util.List;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.hamcrest.CoreMatchers.equalTo;

public class ExpressionEvaluatorTest {
private ExpressionEvaluator expressionEvaluator;
Expand All @@ -28,6 +32,16 @@ public Boolean isValidExpressionStatement(final String statement) {
public Boolean isValidFormatExpression(String format) {
return true;
}

@Override
public List<String> extractDynamicKeysFromFormatExpression(String format) {
return Collections.emptyList();
}

@Override
public List<String> extractDynamicExpressionsFromFormatExpression(String format) {
return Collections.emptyList();
}
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@

import javax.inject.Inject;
import javax.inject.Named;
import java.util.ArrayList;
import java.util.List;

/**
* Public class that {@link org.opensearch.dataprepper.model.processor.Processor},
Expand Down Expand Up @@ -74,4 +76,57 @@ public Boolean isValidFormatExpression(final String format) {
}
return true;
}

@Override
public List<String> extractDynamicKeysFromFormatExpression(final String format) {
final List<String> formatExpressionKeys = new ArrayList<>();

if (format == null) {
return formatExpressionKeys;
}

int fromIndex = 0;
int position = 0;
while ((position = format.indexOf("${", fromIndex)) != -1) {
int endPosition = format.indexOf("}", position + 1);
if (endPosition == -1) {
return formatExpressionKeys;
}
String name = format.substring(position + 2, endPosition);

if (JacksonEvent.isValidEventKey(name)) {
if (!name.startsWith("/")) {
name = "/" + name;
}
formatExpressionKeys.add(name);
}
fromIndex = endPosition + 1;
}
return formatExpressionKeys;
}

@Override
public List<String> extractDynamicExpressionsFromFormatExpression(final String format) {
final List<String> dynamicExpressionStatements = new ArrayList<>();

if (format == null) {
return dynamicExpressionStatements;
}

int fromIndex = 0;
int position = 0;
while ((position = format.indexOf("${", fromIndex)) != -1) {
int endPosition = format.indexOf("}", position + 1);
if (endPosition == -1) {
return dynamicExpressionStatements;
}
String name = format.substring(position + 2, endPosition);

if (!JacksonEvent.isValidEventKey(name) && isValidExpressionStatement(name)) {
dynamicExpressionStatements.add(name);
}
fromIndex = endPosition + 1;
}
return dynamicExpressionStatements;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,29 @@
import org.antlr.v4.runtime.tree.ParseTree;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.ArgumentsProvider;
import org.junit.jupiter.params.provider.ArgumentsSource;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.dataprepper.model.event.Event;

import java.util.List;
import java.util.Random;
import java.util.UUID;
import java.util.stream.Stream;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.params.provider.Arguments.arguments;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
Expand Down Expand Up @@ -142,5 +150,47 @@ void isValidFormatExpressionsReturnsFalseWhenIsValidKeyAndValidExpressionIsFalse
assertThat(statementEvaluator.isValidFormatExpression(format), equalTo(false));
}

@ParameterizedTest
@ArgumentsSource(FormatExpressionsToExtractedDynamicKeysArgumentProvider.class)
void extractDynamicKeysFromFormatExpression_returns_expected_result(final String formatExpression, final List<String> expectedDynamicKeys) {
final List<String> result = statementEvaluator.extractDynamicKeysFromFormatExpression(formatExpression);

assertThat(result, notNullValue());
assertThat(result.equals(expectedDynamicKeys), equalTo(true));
}

@ParameterizedTest
@ArgumentsSource(FormatExpressionsToExtractedDynamicExpressionsArgumentProvider.class)
void extractDynamicExpressionsFromFormatExpression_returns_expected_result(final String formatExpression, final List<String> expectedDynamicExpressions) {
final List<String> result = statementEvaluator.extractDynamicExpressionsFromFormatExpression(formatExpression);

assertThat(result, notNullValue());
assertThat(result.equals(expectedDynamicExpressions), equalTo(true));
}

static class FormatExpressionsToExtractedDynamicKeysArgumentProvider implements ArgumentsProvider {
@Override
public Stream<? extends Arguments> provideArguments(final ExtensionContext context) {
return Stream.of(
arguments("test-${foo}-${bar}", List.of("/foo", "/bar")),
arguments("test-${getMetadata(\"key\"}-${/test}", List.of("/test")),
arguments("test-format", List.of()),
arguments("test-${/test", List.of()),
arguments(null, List.of())
);
}
}

static class FormatExpressionsToExtractedDynamicExpressionsArgumentProvider implements ArgumentsProvider {
@Override
public Stream<? extends Arguments> provideArguments(final ExtensionContext context) {
return Stream.of(
arguments("test-${foo}-${bar}", List.of()),
arguments("test-${getMetadata(\"key\")}-${/test}", List.of("getMetadata(\"key\")")),
arguments("test-${/test", List.of()),
arguments(null, List.of())
);
}
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ public class S3ObjectIndexUtility {
// For a string like "data-prepper-%{yyyy-MM}", "yyyy-MM" is matched.
private static final String TIME_PATTERN_INTERNAL_EXTRACTOR_REGULAR_EXPRESSION = "\\%\\{(.*?)\\}";

private static final Pattern DATE_TIME_PATTERN = Pattern.compile(TIME_PATTERN_INTERNAL_EXTRACTOR_REGULAR_EXPRESSION);

private static final ZoneId UTC_ZONE_ID = ZoneId.of(TimeZone.getTimeZone("UTC").getID());

S3ObjectIndexUtility() {
Expand Down Expand Up @@ -76,8 +78,7 @@ public static long getTimeNanos() {
* @return returns date time formatter
*/
public static DateTimeFormatter validateAndGetDateTimeFormatter(final String indexAlias) {
final Pattern pattern = Pattern.compile(TIME_PATTERN_INTERNAL_EXTRACTOR_REGULAR_EXPRESSION);
final Matcher timePatternMatcher = pattern.matcher(indexAlias);
final Matcher timePatternMatcher = DATE_TIME_PATTERN.matcher(indexAlias);
if (timePatternMatcher.find()) {
final String timePattern = timePatternMatcher.group(1);
if (timePatternMatcher.find()) { // check if there is a one more match.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,25 @@ public void validate(int expectedRecords, final List<Map<String, Object>> sample
assertThat(sampledData, equalTo(sampleEventData.size()));
}

public void validateDynamicPartition(int expectedRecords, int partitionNumber, final File actualContentFile, final CompressionScenario compressionScenario) throws IOException {
final InputStream inputStream = new BufferedInputStream(new FileInputStream(actualContentFile), 64 * 1024);

final Scanner scanner = new Scanner(inputStream);

int count = 0;
while (scanner.hasNext()) {

final String actualJsonString = scanner.next();

final Map<String, Object> actualData = OBJECT_MAPPER.readValue(actualJsonString, Map.class);
assertThat(actualData.get("sequence"), equalTo(partitionNumber));

count++;
}

assertThat(count, equalTo(expectedRecords));
}

@Override
public String toString() {
return "NDJSON";
Expand Down
Loading

0 comments on commit 1a7e099

Please sign in to comment.