Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add creation and aggregation of dynamic S3 groups based on events #4346

Merged
merged 4 commits into from
Apr 1, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

import org.opensearch.dataprepper.model.event.Event;

import java.util.List;

/**
* @since 1.3
* ExpressionEvaluator interface to abstract the parse and evaluate implementations.
Expand Down Expand Up @@ -36,4 +38,8 @@ default Boolean evaluateConditional(final String statement, final Event context)
Boolean isValidExpressionStatement(final String statement);

Boolean isValidFormatExpression(final String format);

List<String> extractDynamicKeysFromFormatExpression(final String format);

List<String> extractDynamicExpressionsFromFormatExpression(final String format);
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@

import javax.inject.Inject;
import javax.inject.Named;
import java.util.ArrayList;
import java.util.List;

/**
* Public class that {@link org.opensearch.dataprepper.model.processor.Processor},
Expand Down Expand Up @@ -74,4 +76,57 @@ public Boolean isValidFormatExpression(final String format) {
}
return true;
}

@Override
public List<String> extractDynamicKeysFromFormatExpression(final String format) {
final List<String> formatExpressionKeys = new ArrayList<>();

if (format == null) {
return formatExpressionKeys;
}

int fromIndex = 0;
int position = 0;
while ((position = format.indexOf("${", fromIndex)) != -1) {
int endPosition = format.indexOf("}", position + 1);
if (endPosition == -1) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This might be a user error in the dynamic expression. It looks like we are ignoring any errors here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method is not responsible for catching invalid expressions; it will just parse out the valid ones. If you think it should throw, it can do so here, but its job is not to validate, just to parse and return.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will add some unit tests for validating the expression on startup of the S3 sink in the next PR

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can add a test case to cover validation on startup for s3 sink constructor as discussed offline.

return formatExpressionKeys;
}
String name = format.substring(position + 2, endPosition);

if (JacksonEvent.isValidEventKey(name)) {
if (!name.startsWith("/")) {
name = "/" + name;
}
formatExpressionKeys.add(name);
}
fromIndex = endPosition + 1;
}
return formatExpressionKeys;
}

/**
 * Collects the contents of every <code>${...}</code> placeholder in the format string that is not
 * a valid event key but is a valid expression statement.
 *
 * @param format the format string to scan; may be {@code null}
 * @return the matching placeholder contents in order of appearance; empty when {@code format}
 *         is {@code null} or nothing matches. Scanning stops at the first placeholder that
 *         has no closing brace.
 */
@Override
public List<String> extractDynamicExpressionsFromFormatExpression(final String format) {
    final List<String> expressions = new ArrayList<>();
    if (format == null) {
        return expressions;
    }

    int openIndex = format.indexOf("${");
    while (openIndex >= 0) {
        final int closeIndex = format.indexOf("}", openIndex + 1);
        if (closeIndex < 0) {
            // Unterminated placeholder: keep what was collected so far and stop.
            break;
        }

        final String candidate = format.substring(openIndex + 2, closeIndex);
        final boolean plainEventKey = JacksonEvent.isValidEventKey(candidate);
        if (!plainEventKey && isValidExpressionStatement(candidate)) {
            expressions.add(candidate);
        }

        // Resume the search just past the closing brace of this placeholder.
        openIndex = format.indexOf("${", closeIndex + 1);
    }
    return expressions;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,33 @@
import org.antlr.v4.runtime.tree.ParseTree;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.ArgumentsProvider;
import org.junit.jupiter.params.provider.ArgumentsSource;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.dataprepper.model.event.Event;

import java.util.Calendar;
import java.util.List;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.regex.Pattern;
import java.util.stream.Stream;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.params.provider.Arguments.arguments;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
Expand Down Expand Up @@ -142,5 +154,43 @@ void isValidFormatExpressionsReturnsFalseWhenIsValidKeyAndValidExpressionIsFalse
assertThat(statementEvaluator.isValidFormatExpression(format), equalTo(false));
}

// Checks that the keys extracted from each supplied format expression equal the expected list.
@ParameterizedTest
@ArgumentsSource(FormatExpressionsToExtractedDynamicKeysArgumentProvider.class)
void extractDynamicKeysFromFormatExpression_returns_expected_result(final String formatExpression, final List<String> expectedDynamicKeys) {
    final List<String> result = statementEvaluator.extractDynamicKeysFromFormatExpression(formatExpression);

    assertThat(result, notNullValue());
    // Match the lists directly so a failure reports the actual and expected contents
    // instead of a bare "expected true but was false".
    assertThat(result, equalTo(expectedDynamicKeys));
}

@ParameterizedTest
@ArgumentsSource(FormatExpressionsToExtractedDynamicExpressionsArgumentProvider.class)
void extractDynamicExpressionsFromFormatExpression_returns_expected_result(final String formatExpression, final List<String> expectedDynamicExpressions) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add a negative test where an invalid expression is passed?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a negative case where invalid expression syntax is found; it will just not be added to the list. This method is only supposed to parse out valid existing expressions.

final List<String> result = statementEvaluator.extractDynamicExpressionsFromFormatExpression(formatExpression);

assertThat(result, notNullValue());
assertThat(result.equals(expectedDynamicExpressions), equalTo(true));
}

/**
 * Supplies (format expression, expected dynamic keys) pairs for the key-extraction test.
 */
static class FormatExpressionsToExtractedDynamicKeysArgumentProvider implements ArgumentsProvider {
    @Override
    public Stream<? extends Arguments> provideArguments(final ExtensionContext context) {
        // Two plain keys: both are expected back with a leading "/".
        final Arguments twoPlainKeys = Arguments.of("test-${foo}-${bar}", List.of("/foo", "/bar"));
        // The first placeholder (note the unbalanced parenthesis) is not expected as a key; only "/test" is.
        final Arguments nonKeyThenKey = Arguments.of("test-${getMetadata(\"key\"}-${/test}", List.of("/test"));
        // No placeholders at all yields no keys.
        final Arguments noPlaceholders = Arguments.of("test-format", List.of());

        return Stream.of(twoPlainKeys, nonKeyThenKey, noPlaceholders);
    }
}

/**
 * Supplies (format expression, expected dynamic expressions) pairs for the expression-extraction test.
 */
static class FormatExpressionsToExtractedDynamicExpressionsArgumentProvider implements ArgumentsProvider {
    @Override
    public Stream<? extends Arguments> provideArguments(final ExtensionContext context) {
        // Only plain keys are present, so no expressions are expected.
        final Arguments onlyPlainKeys = Arguments.of("test-${foo}-${bar}", List.of());
        // One expression followed by a plain key: only the expression is expected.
        final Arguments expressionThenKey = Arguments.of(
                "test-${getMetadata(\"key\")}-${/test}", List.of("getMetadata(\"key\")"));

        return Stream.of(onlyPlainKeys, expressionThenKey);
    }
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ public class S3ObjectIndexUtility {
// For a string like "data-prepper-%{yyyy-MM}", "yyyy-MM" is matched.
private static final String TIME_PATTERN_INTERNAL_EXTRACTOR_REGULAR_EXPRESSION = "\\%\\{(.*?)\\}";

private static final Pattern DATE_TIME_PATTERN = Pattern.compile(TIME_PATTERN_INTERNAL_EXTRACTOR_REGULAR_EXPRESSION);

private static final ZoneId UTC_ZONE_ID = ZoneId.of(TimeZone.getTimeZone("UTC").getID());

S3ObjectIndexUtility() {
Expand Down Expand Up @@ -76,8 +78,7 @@ public static long getTimeNanos() {
* @return returns date time formatter
*/
public static DateTimeFormatter validateAndGetDateTimeFormatter(final String indexAlias) {
final Pattern pattern = Pattern.compile(TIME_PATTERN_INTERNAL_EXTRACTOR_REGULAR_EXPRESSION);
final Matcher timePatternMatcher = pattern.matcher(indexAlias);
final Matcher timePatternMatcher = DATE_TIME_PATTERN.matcher(indexAlias);
if (timePatternMatcher.find()) {
final String timePattern = timePatternMatcher.group(1);
if (timePatternMatcher.find()) { // check if there is a one more match.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,25 @@ public void validate(int expectedRecords, final List<Map<String, Object>> sample
assertThat(sampledData, equalTo(sampleEventData.size()));
}

/**
 * Validates the content of a single dynamically partitioned output file: every record must carry
 * the expected "sequence" value and the file must contain the expected number of records.
 *
 * @param expectedRecords     number of records the file is expected to contain
 * @param partitionNumber     value every record's "sequence" field must equal
 * @param actualContentFile   local file holding the content to validate
 * @param compressionScenario not used by this implementation
 * @throws IOException if the file cannot be opened or a record cannot be parsed as JSON
 */
public void validateDynamicPartition(int expectedRecords, int partitionNumber, final File actualContentFile, final CompressionScenario compressionScenario) throws IOException {
    int count = 0;

    // try-with-resources releases the file handle even when an assertion or a parse error
    // aborts the loop; previously neither the scanner nor the stream was ever closed.
    try (final InputStream inputStream = new BufferedInputStream(new FileInputStream(actualContentFile), 64 * 1024);
         final Scanner scanner = new Scanner(inputStream)) {

        // Scanner splits on whitespace by default, so each token is parsed as one JSON object.
        // NOTE(review): this assumes records contain no embedded whitespace - confirm against the codec.
        while (scanner.hasNext()) {
            final String actualJsonString = scanner.next();

            final Map<String, Object> actualData = OBJECT_MAPPER.readValue(actualJsonString, Map.class);
            assertThat(actualData.get("sequence"), equalTo(partitionNumber));

            count++;
        }
    }

    assertThat(count, equalTo(expectedRecords));
}

@Override
public String toString() {
return "NDJSON";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import org.apache.commons.io.IOUtils;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.junit.jupiter.api.io.TempDir;
Expand All @@ -26,7 +27,6 @@
import org.opensearch.dataprepper.model.event.EventMetadata;
import org.opensearch.dataprepper.model.event.EventType;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.log.JacksonLog;
import org.opensearch.dataprepper.model.plugin.PluginFactory;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.sink.SinkContext;
Expand All @@ -45,6 +45,7 @@
import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should add an integration test that verifies the key path is dynamic.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a way for me to create an actual GenericExpressionEvaluator for the integration tests? Or can I verify this while mocking ExpressionEvaluator?

import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
import software.amazon.awssdk.services.s3.model.S3Object;
import org.opensearch.dataprepper.expression.ExpressionEvaluator;

import java.io.File;
import java.io.FileInputStream;
Expand All @@ -71,6 +72,7 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.params.provider.Arguments.arguments;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
Expand All @@ -93,10 +95,13 @@ public class S3SinkIT {
@Mock
private AwsCredentialsSupplier awsCredentialsSupplier;

@Mock
@Mock(stubOnly = true)
private ThresholdOptions thresholdOptions;
@Mock
@Mock(stubOnly = true)
private ObjectKeyOptions objectKeyOptions;

@Mock(stubOnly = true)
private ExpressionEvaluator expressionEvaluator;
private String s3region;
private String bucketName;
private S3Client s3Client;
Expand All @@ -117,6 +122,8 @@ static void setUpAll() {
for (int i = 0; i < totalRandomStrings; i++) {
reusableRandomStrings.add(UUID.randomUUID().toString());
}


}

@BeforeEach
Expand Down Expand Up @@ -154,10 +161,12 @@ void setUp() {
.credentialsProvider(awsCredentialsProvider)
.region(region)
.build();

when(expressionEvaluator.isValidFormatExpression(anyString())).thenReturn(true);
}

private S3Sink createObjectUnderTest() {
return new S3Sink(pluginSetting, s3SinkConfig, pluginFactory, sinkContext, awsCredentialsSupplier);
return new S3Sink(pluginSetting, s3SinkConfig, pluginFactory, sinkContext, awsCredentialsSupplier, expressionEvaluator);
}

@ParameterizedTest
Expand All @@ -180,6 +189,11 @@ void test(final OutputScenario outputScenario,
int expectedTotalSize = sizeCombination.getTotalSize();
when(thresholdOptions.getEventCount()).thenReturn(expectedTotalSize);

when(expressionEvaluator.extractDynamicExpressionsFromFormatExpression(anyString()))
.thenReturn(Collections.emptyList());
when(expressionEvaluator.extractDynamicKeysFromFormatExpression(anyString()))
.thenReturn(Collections.emptyList());

final S3Sink objectUnderTest = createObjectUnderTest();

final int maxEventDataToSample = 2000;
Expand Down Expand Up @@ -228,6 +242,88 @@ void test(final OutputScenario outputScenario,
outputScenario.validate(expectedTotalSize, sampleEventData, actualContentFile, compressionScenario);
}

/**
 * Integration test for dynamic grouping: events are written with a path prefix containing
 * the placeholder ${/sequence}, then each "folder-N/" prefix in the bucket is listed, expected to
 * hold exactly one object, downloaded, and validated so that every record in it has sequence N.
 */
@Test
void testWithDynamicGroups() throws IOException {
// Fixed scenario: in-memory buffer, no compression, NDJSON output.
final BufferScenario bufferScenario = new InMemoryBufferScenario();
final CompressionScenario compressionScenario = new NoneCompressionScenario();
final NdjsonOutputScenario outputScenario = new NdjsonOutputScenario();
final SizeCombination sizeCombination = SizeCombination.MEDIUM_SMALLER;

BufferTypeOptions bufferTypeOptions = bufferScenario.getBufferType();
String testRun = "grouping-" + outputScenario + "-" + bufferTypeOptions + "-" + compressionScenario + "-" + sizeCombination.getBatchSize() + "-" + sizeCombination.getNumberOfBatches();

// A random UUID keeps this run's objects separate from other runs in the same bucket.
final String staticPrefix = "s3-sink-grouping-integration-test/" + UUID.randomUUID() + "/";
final String pathPrefix = staticPrefix + "folder-${/sequence}/";
final List<String> dynamicKeys = new ArrayList<>();
dynamicKeys.add("/sequence");

when(objectKeyOptions.getPathPrefix()).thenReturn(pathPrefix);

// The expression evaluator is a mock, so the extraction results are stubbed by hand:
// the path prefix yields the single dynamic key "/sequence" and no dynamic expressions.
when(expressionEvaluator.extractDynamicExpressionsFromFormatExpression(objectKeyOptions.getPathPrefix()))
.thenReturn(Collections.emptyList());
when(expressionEvaluator.extractDynamicKeysFromFormatExpression(objectKeyOptions.getPathPrefix()))
.thenReturn(dynamicKeys);
// The name pattern is stubbed to contain neither dynamic keys nor dynamic expressions.
// NOTE(review): getNamePattern() is presumably stubbed in setUp (not visible here) - confirm.
when(expressionEvaluator.extractDynamicKeysFromFormatExpression(objectKeyOptions.getNamePattern()))
.thenReturn(Collections.emptyList());
when(expressionEvaluator.extractDynamicExpressionsFromFormatExpression(objectKeyOptions.getNamePattern()))
.thenReturn(Collections.emptyList());

when(pluginFactory.loadPlugin(eq(OutputCodec.class), any())).thenReturn(outputScenario.getCodec());
when(s3SinkConfig.getBufferType()).thenReturn(bufferTypeOptions);
when(s3SinkConfig.getCompression()).thenReturn(compressionScenario.getCompressionOption());
int expectedTotalSize = sizeCombination.getTotalSize();
// The event-count threshold is the total size divided by the batch size; the same value is
// later passed as the expected record count of each folder's object.
when(thresholdOptions.getEventCount()).thenReturn(expectedTotalSize / sizeCombination.getBatchSize());

final S3Sink objectUnderTest = createObjectUnderTest();

// In this test the sampled data is only used for its size in the log message further down.
final int maxEventDataToSample = 2000;
final List<Map<String, Object>> sampleEventData = new ArrayList<>(maxEventDataToSample);
for (int batchNumber = 0; batchNumber < sizeCombination.getNumberOfBatches(); batchNumber++) {
final int currentBatchNumber = batchNumber;
// generateEventData receives the index within the batch (0 .. batchSize - 1).
// NOTE(review): presumably it stores that index in the "sequence" field - verify.
final List<Record<Event>> events = IntStream.range(0, sizeCombination.getBatchSize())
.mapToObj(this::generateEventData)
.peek(data -> {
if (sampleEventData.size() < maxEventDataToSample)
sampleEventData.add(data);
})
.map(this::generateTestEvent)
.map(Record::new)
.collect(Collectors.toList());

LOG.debug("Writing dynamic batch {} with size {}.", currentBatchNumber, events.size());
objectUnderTest.doOutput(events);
}

// NOTE(review): the folder count 100 is hard-coded; presumably it equals the batch size of
// SizeCombination.MEDIUM_SMALLER (one folder per distinct sequence value) - confirm.
for (int folderNumber = 0; folderNumber < 100; folderNumber++) {
LOG.info("Listing S3 path prefix: {}", staticPrefix + "folder-" + folderNumber + "/");

final ListObjectsV2Response listObjectsResponse = s3Client.listObjectsV2(ListObjectsV2Request.builder()
.bucket(bucketName)
.prefix(staticPrefix + "folder-" + folderNumber + "/")
.build());

// Each folder is expected to contain exactly one object.
assertThat(listObjectsResponse.contents(), notNullValue());
assertThat(listObjectsResponse.contents().size(), equalTo(1));

final S3Object s3Object = listObjectsResponse.contents().get(0);

final File target = new File(s3FileLocation, "folder-" + folderNumber + ".original");

LOG.info("Downloading S3 object to local file {}.", target);

GetObjectRequest getObjectRequest = GetObjectRequest.builder()
.bucket(bucketName)
.key(s3Object.key())
.build();
s3Client.getObject(getObjectRequest, target.toPath());

File actualContentFile = decompressFileIfNecessary(outputScenario, compressionScenario, testRun, target);

LOG.info("Validating output. totalSize={}; sampleDataSize={}", expectedTotalSize, sampleEventData.size());
// Every record in this folder's object must have sequence == folderNumber.
outputScenario.validateDynamicPartition(expectedTotalSize / sizeCombination.getBatchSize(), folderNumber, actualContentFile, compressionScenario);
}
}

private File decompressFileIfNecessary(OutputScenario outputScenario, CompressionScenario compressionScenario, String pathPrefix, File target) throws IOException {

if (outputScenario.isCompressionInternal() || !compressionScenario.requiresDecompression())
Expand All @@ -245,7 +341,6 @@ private Event generateTestEvent(final Map<String, Object> eventData) {
final EventMetadata defaultEventMetadata = DefaultEventMetadata.builder()
.withEventType(EventType.LOG.toString())
.build();
final JacksonEvent event = JacksonLog.builder().withData(eventData).withEventMetadata(defaultEventMetadata).build();
return JacksonEvent.builder()
.withData(eventData)
.withEventMetadata(defaultEventMetadata)
Expand Down
Loading
Loading