From 1a7e09965cd1581967dd0b7b39fa0a232ec846e6 Mon Sep 17 00:00:00 2001 From: Taylor Gray Date: Mon, 1 Apr 2024 17:43:12 -0500 Subject: [PATCH] Add creation and aggregation of dynamic S3 groups based on events (#4346) Signed-off-by: Taylor Gray --- .../expression/ExpressionEvaluator.java | 6 + .../expression/ExpressionEvaluatorTest.java | 18 +- .../GenericExpressionEvaluator.java | 55 ++++ .../GenericExpressionEvaluatorTest.java | 50 ++++ .../s3keyindex/S3ObjectIndexUtility.java | 5 +- .../plugins/sink/s3/NdjsonOutputScenario.java | 19 ++ .../dataprepper/plugins/sink/s3/S3SinkIT.java | 105 +++++++- .../plugins/sink/s3/S3SinkServiceIT.java | 24 +- .../plugins/sink/s3/KeyGenerator.java | 15 +- .../dataprepper/plugins/sink/s3/S3Sink.java | 26 +- .../plugins/sink/s3/S3SinkConfig.java | 2 +- .../plugins/sink/s3/S3SinkService.java | 86 ++++--- .../sink/s3/accumulator/InMemoryBuffer.java | 4 +- .../sink/s3/accumulator/ObjectKey.java | 34 ++- .../plugins/sink/s3/grouping/S3Group.java | 50 ++++ .../sink/s3/grouping/S3GroupIdentifier.java | 37 +++ .../s3/grouping/S3GroupIdentifierFactory.java | 61 +++++ .../sink/s3/grouping/S3GroupManager.java | 69 +++++ .../plugins/sink/s3/KeyGeneratorTest.java | 89 ++++--- .../plugins/sink/s3/S3SinkServiceTest.java | 241 ++++++++++-------- .../plugins/sink/s3/S3SinkTest.java | 8 +- .../sink/s3/accumulator/ObjectKeyTest.java | 45 ++-- .../S3GroupIdentifierFactoryTest.java | 106 ++++++++ .../s3/grouping/S3GroupIdentifierTest.java | 46 ++++ .../sink/s3/grouping/S3GroupManagerTest.java | 119 +++++++++ .../plugins/sink/s3/grouping/S3GroupTest.java | 35 +++ 26 files changed, 1123 insertions(+), 232 deletions(-) create mode 100644 data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/grouping/S3Group.java create mode 100644 data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/grouping/S3GroupIdentifier.java create mode 100644 data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/grouping/S3GroupIdentifierFactory.java create mode 100644 data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/grouping/S3GroupManager.java create mode 100644 data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/grouping/S3GroupIdentifierFactoryTest.java create mode 100644 data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/grouping/S3GroupIdentifierTest.java create mode 100644 data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/grouping/S3GroupManagerTest.java create mode 100644 data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/grouping/S3GroupTest.java diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/expression/ExpressionEvaluator.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/expression/ExpressionEvaluator.java index 7dc5930816..084bf50452 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/expression/ExpressionEvaluator.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/expression/ExpressionEvaluator.java @@ -7,6 +7,8 @@ import org.opensearch.dataprepper.model.event.Event; +import java.util.List; + /** * @since 1.3 * ExpressionEvaluator interface to abstract the parse and evaluate implementations. @@ -36,4 +38,8 @@ default Boolean evaluateConditional(final String statement, final Event context) Boolean isValidExpressionStatement(final String statement); Boolean isValidFormatExpression(final String format); + + List extractDynamicKeysFromFormatExpression(final String format); + + List extractDynamicExpressionsFromFormatExpression(final String format); } \ No newline at end of file diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/expression/ExpressionEvaluatorTest.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/expression/ExpressionEvaluatorTest.java index 9b76fbc807..bd33525db3 100644 --- a/data-prepper-api/src/test/java/org/opensearch/dataprepper/expression/ExpressionEvaluatorTest.java +++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/expression/ExpressionEvaluatorTest.java @@ -5,12 +5,16 @@ package org.opensearch.dataprepper.expression; +import org.junit.jupiter.api.Test; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.event.JacksonEvent; -import org.junit.jupiter.api.Test; + +import java.util.Collections; +import java.util.List; + +import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.hamcrest.CoreMatchers.equalTo; public class ExpressionEvaluatorTest { private ExpressionEvaluator expressionEvaluator; @@ -28,6 +32,16 @@ public Boolean isValidExpressionStatement(final String statement) { public Boolean isValidFormatExpression(String format) { return true; } + + @Override + public List extractDynamicKeysFromFormatExpression(String format) { + return Collections.emptyList(); + } + + @Override + public List extractDynamicExpressionsFromFormatExpression(String format) { + return Collections.emptyList(); + } } @Test diff --git a/data-prepper-expression/src/main/java/org/opensearch/dataprepper/expression/GenericExpressionEvaluator.java b/data-prepper-expression/src/main/java/org/opensearch/dataprepper/expression/GenericExpressionEvaluator.java index 6653e5c7b4..791d954c23 100644 --- a/data-prepper-expression/src/main/java/org/opensearch/dataprepper/expression/GenericExpressionEvaluator.java +++ b/data-prepper-expression/src/main/java/org/opensearch/dataprepper/expression/GenericExpressionEvaluator.java @@ -11,6 +11,8 @@ import javax.inject.Inject; import javax.inject.Named; +import java.util.ArrayList; +import java.util.List; /** * Public class that {@link org.opensearch.dataprepper.model.processor.Processor}, @@ -74,4 +76,57 @@ public Boolean isValidFormatExpression(final String format) { } return true; } + + @Override + public List extractDynamicKeysFromFormatExpression(final String format) { + final List formatExpressionKeys = new ArrayList<>(); + + if (format == null) { + return formatExpressionKeys; + } + + int fromIndex = 0; + int position = 0; + while ((position = format.indexOf("${", fromIndex)) != -1) { + int endPosition = format.indexOf("}", position + 1); + if (endPosition == -1) { + return formatExpressionKeys; + } + String name = format.substring(position + 2, endPosition); + + if (JacksonEvent.isValidEventKey(name)) { + if (!name.startsWith("/")) { + name = "/" + name; + } + formatExpressionKeys.add(name); + } + fromIndex = endPosition + 1; + } + return formatExpressionKeys; + } + + @Override + public List extractDynamicExpressionsFromFormatExpression(final String format) { + final List dynamicExpressionStatements = new ArrayList<>(); + + if (format == null) { + return dynamicExpressionStatements; + } + + int fromIndex = 0; + int position = 0; + while ((position = format.indexOf("${", fromIndex)) != -1) { + int endPosition = format.indexOf("}", position + 1); + if (endPosition == -1) { + return dynamicExpressionStatements; + } + String name = format.substring(position + 2, endPosition); + + if (!JacksonEvent.isValidEventKey(name) && isValidExpressionStatement(name)) { + dynamicExpressionStatements.add(name); + } + fromIndex = endPosition + 1; + } + return dynamicExpressionStatements; + } } diff --git a/data-prepper-expression/src/test/java/org/opensearch/dataprepper/expression/GenericExpressionEvaluatorTest.java b/data-prepper-expression/src/test/java/org/opensearch/dataprepper/expression/GenericExpressionEvaluatorTest.java index a91e7fe368..b5685b8ed4 100644 --- a/data-prepper-expression/src/test/java/org/opensearch/dataprepper/expression/GenericExpressionEvaluatorTest.java +++ b/data-prepper-expression/src/test/java/org/opensearch/dataprepper/expression/GenericExpressionEvaluatorTest.java @@ -8,7 +8,11 @@ import org.antlr.v4.runtime.tree.ParseTree; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.extension.ExtensionContext; import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.ArgumentsProvider; +import org.junit.jupiter.params.provider.ArgumentsSource; import org.junit.jupiter.params.provider.CsvSource; import org.junit.jupiter.params.provider.ValueSource; import org.mockito.InjectMocks; @@ -16,13 +20,17 @@ import org.mockito.junit.jupiter.MockitoExtension; import org.opensearch.dataprepper.model.event.Event; +import java.util.List; import java.util.Random; import java.util.UUID; +import java.util.stream.Stream; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.notNullValue; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.params.provider.Arguments.arguments; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; @@ -142,5 +150,47 @@ void isValidFormatExpressionsReturnsFalseWhenIsValidKeyAndValidExpressionIsFalse assertThat(statementEvaluator.isValidFormatExpression(format), equalTo(false)); } + @ParameterizedTest + @ArgumentsSource(FormatExpressionsToExtractedDynamicKeysArgumentProvider.class) + void extractDynamicKeysFromFormatExpression_returns_expected_result(final String formatExpression, final List expectedDynamicKeys) { + final List result = statementEvaluator.extractDynamicKeysFromFormatExpression(formatExpression); + + assertThat(result, notNullValue()); + assertThat(result.equals(expectedDynamicKeys), equalTo(true)); + } + + @ParameterizedTest + @ArgumentsSource(FormatExpressionsToExtractedDynamicExpressionsArgumentProvider.class) + void extractDynamicExpressionsFromFormatExpression_returns_expected_result(final String formatExpression, final List expectedDynamicExpressions) { + final List result = statementEvaluator.extractDynamicExpressionsFromFormatExpression(formatExpression); + + assertThat(result, notNullValue()); + assertThat(result.equals(expectedDynamicExpressions), equalTo(true)); + } + + static class FormatExpressionsToExtractedDynamicKeysArgumentProvider implements ArgumentsProvider { + @Override + public Stream provideArguments(final ExtensionContext context) { + return Stream.of( + arguments("test-${foo}-${bar}", List.of("/foo", "/bar")), + arguments("test-${getMetadata(\"key\"}-${/test}", List.of("/test")), + arguments("test-format", List.of()), + arguments("test-${/test", List.of()), + arguments(null, List.of()) + ); + } + } + + static class FormatExpressionsToExtractedDynamicExpressionsArgumentProvider implements ArgumentsProvider { + @Override + public Stream provideArguments(final ExtensionContext context) { + return Stream.of( + arguments("test-${foo}-${bar}", List.of()), + arguments("test-${getMetadata(\"key\")}-${/test}", List.of("getMetadata(\"key\")")), + arguments("test-${/test", List.of()), + arguments(null, List.of()) + ); + } + } } diff --git a/data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/s3keyindex/S3ObjectIndexUtility.java b/data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/s3keyindex/S3ObjectIndexUtility.java index bfcac56130..f08966fbf2 100644 --- a/data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/s3keyindex/S3ObjectIndexUtility.java +++ b/data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/s3keyindex/S3ObjectIndexUtility.java @@ -31,6 +31,8 @@ public class S3ObjectIndexUtility { // For a string like "data-prepper-%{yyyy-MM}", "yyyy-MM" is matched. private static final String TIME_PATTERN_INTERNAL_EXTRACTOR_REGULAR_EXPRESSION = "\\%\\{(.*?)\\}"; + private static final Pattern DATE_TIME_PATTERN = Pattern.compile(TIME_PATTERN_INTERNAL_EXTRACTOR_REGULAR_EXPRESSION); + private static final ZoneId UTC_ZONE_ID = ZoneId.of(TimeZone.getTimeZone("UTC").getID()); S3ObjectIndexUtility() { @@ -76,8 +78,7 @@ public static long getTimeNanos() { * @return returns date time formatter */ public static DateTimeFormatter validateAndGetDateTimeFormatter(final String indexAlias) { - final Pattern pattern = Pattern.compile(TIME_PATTERN_INTERNAL_EXTRACTOR_REGULAR_EXPRESSION); - final Matcher timePatternMatcher = pattern.matcher(indexAlias); + final Matcher timePatternMatcher = DATE_TIME_PATTERN.matcher(indexAlias); if (timePatternMatcher.find()) { final String timePattern = timePatternMatcher.group(1); if (timePatternMatcher.find()) { // check if there is a one more match. diff --git a/data-prepper-plugins/s3-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/s3/NdjsonOutputScenario.java b/data-prepper-plugins/s3-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/s3/NdjsonOutputScenario.java index 85ae980540..68643b564a 100644 --- a/data-prepper-plugins/s3-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/s3/NdjsonOutputScenario.java +++ b/data-prepper-plugins/s3-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/s3/NdjsonOutputScenario.java @@ -56,6 +56,25 @@ public void validate(int expectedRecords, final List> sample assertThat(sampledData, equalTo(sampleEventData.size())); } + public void validateDynamicPartition(int expectedRecords, int partitionNumber, final File actualContentFile, final CompressionScenario compressionScenario) throws IOException { + final InputStream inputStream = new BufferedInputStream(new FileInputStream(actualContentFile), 64 * 1024); + + final Scanner scanner = new Scanner(inputStream); + + int count = 0; + while (scanner.hasNext()) { + + final String actualJsonString = scanner.next(); + + final Map actualData = OBJECT_MAPPER.readValue(actualJsonString, Map.class); + assertThat(actualData.get("sequence"), equalTo(partitionNumber)); + + count++; + } + + assertThat(count, equalTo(expectedRecords)); + } + @Override public String toString() { return "NDJSON"; diff --git a/data-prepper-plugins/s3-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkIT.java b/data-prepper-plugins/s3-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkIT.java index 8a77e11ada..e889456dfa 100644 --- a/data-prepper-plugins/s3-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkIT.java +++ b/data-prepper-plugins/s3-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkIT.java @@ -8,6 +8,7 @@ import org.apache.commons.io.IOUtils; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.extension.ExtensionContext; import org.junit.jupiter.api.io.TempDir; @@ -26,7 +27,6 @@ import org.opensearch.dataprepper.model.event.EventMetadata; import org.opensearch.dataprepper.model.event.EventType; import org.opensearch.dataprepper.model.event.JacksonEvent; -import org.opensearch.dataprepper.model.log.JacksonLog; import org.opensearch.dataprepper.model.plugin.PluginFactory; import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.model.sink.SinkContext; @@ -45,6 +45,7 @@ import software.amazon.awssdk.services.s3.model.ListObjectsV2Request; import software.amazon.awssdk.services.s3.model.ListObjectsV2Response; import software.amazon.awssdk.services.s3.model.S3Object; +import org.opensearch.dataprepper.expression.ExpressionEvaluator; import java.io.File; import java.io.FileInputStream; @@ -71,6 +72,7 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.jupiter.params.provider.Arguments.arguments; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -93,10 +95,13 @@ public class S3SinkIT { @Mock private AwsCredentialsSupplier awsCredentialsSupplier; - @Mock + @Mock(stubOnly = true) private ThresholdOptions thresholdOptions; - @Mock + @Mock(stubOnly = true) private ObjectKeyOptions objectKeyOptions; + + @Mock(stubOnly = true) + private ExpressionEvaluator expressionEvaluator; private String s3region; private String bucketName; private S3Client s3Client; @@ -117,6 +122,8 @@ static void setUpAll() { for (int i = 0; i < totalRandomStrings; i++) { reusableRandomStrings.add(UUID.randomUUID().toString()); } + + } @BeforeEach @@ -154,10 +161,12 @@ void setUp() { .credentialsProvider(awsCredentialsProvider) .region(region) .build(); + + when(expressionEvaluator.isValidFormatExpression(anyString())).thenReturn(true); } private S3Sink createObjectUnderTest() { - return new S3Sink(pluginSetting, s3SinkConfig, pluginFactory, sinkContext, awsCredentialsSupplier); + return new S3Sink(pluginSetting, s3SinkConfig, pluginFactory, sinkContext, awsCredentialsSupplier, expressionEvaluator); } @ParameterizedTest @@ -180,6 +189,11 @@ void test(final OutputScenario outputScenario, int expectedTotalSize = sizeCombination.getTotalSize(); when(thresholdOptions.getEventCount()).thenReturn(expectedTotalSize); + when(expressionEvaluator.extractDynamicExpressionsFromFormatExpression(anyString())) + .thenReturn(Collections.emptyList()); + when(expressionEvaluator.extractDynamicKeysFromFormatExpression(anyString())) + .thenReturn(Collections.emptyList()); + final S3Sink objectUnderTest = createObjectUnderTest(); final int maxEventDataToSample = 2000; @@ -228,6 +242,88 @@ void test(final OutputScenario outputScenario, outputScenario.validate(expectedTotalSize, sampleEventData, actualContentFile, compressionScenario); } + @Test + void testWithDynamicGroups() throws IOException { + final BufferScenario bufferScenario = new InMemoryBufferScenario(); + final CompressionScenario compressionScenario = new NoneCompressionScenario(); + final NdjsonOutputScenario outputScenario = new NdjsonOutputScenario(); + final SizeCombination sizeCombination = SizeCombination.MEDIUM_SMALLER; + + BufferTypeOptions bufferTypeOptions = bufferScenario.getBufferType(); + String testRun = "grouping-" + outputScenario + "-" + bufferTypeOptions + "-" + compressionScenario + "-" + sizeCombination.getBatchSize() + "-" + sizeCombination.getNumberOfBatches(); + + final String staticPrefix = "s3-sink-grouping-integration-test/" + UUID.randomUUID() + "/"; + final String pathPrefix = staticPrefix + "folder-${/sequence}/"; + final List dynamicKeys = new ArrayList<>(); + dynamicKeys.add("/sequence"); + + when(objectKeyOptions.getPathPrefix()).thenReturn(pathPrefix); + + when(expressionEvaluator.extractDynamicExpressionsFromFormatExpression(objectKeyOptions.getPathPrefix())) + .thenReturn(Collections.emptyList()); + when(expressionEvaluator.extractDynamicKeysFromFormatExpression(objectKeyOptions.getPathPrefix())) + .thenReturn(dynamicKeys); + when(expressionEvaluator.extractDynamicKeysFromFormatExpression(objectKeyOptions.getNamePattern())) + .thenReturn(Collections.emptyList()); + when(expressionEvaluator.extractDynamicExpressionsFromFormatExpression(objectKeyOptions.getNamePattern())) + .thenReturn(Collections.emptyList()); + + when(pluginFactory.loadPlugin(eq(OutputCodec.class), any())).thenReturn(outputScenario.getCodec()); + when(s3SinkConfig.getBufferType()).thenReturn(bufferTypeOptions); + when(s3SinkConfig.getCompression()).thenReturn(compressionScenario.getCompressionOption()); + int expectedTotalSize = sizeCombination.getTotalSize(); + when(thresholdOptions.getEventCount()).thenReturn(expectedTotalSize / sizeCombination.getBatchSize()); + + final S3Sink objectUnderTest = createObjectUnderTest(); + + final int maxEventDataToSample = 2000; + final List> sampleEventData = new ArrayList<>(maxEventDataToSample); + for (int batchNumber = 0; batchNumber < sizeCombination.getNumberOfBatches(); batchNumber++) { + final int currentBatchNumber = batchNumber; + final List> events = IntStream.range(0, sizeCombination.getBatchSize()) + .mapToObj(this::generateEventData) + .peek(data -> { + if (sampleEventData.size() < maxEventDataToSample) + sampleEventData.add(data); + }) + .map(this::generateTestEvent) + .map(Record::new) + .collect(Collectors.toList()); + + LOG.debug("Writing dynamic batch {} with size {}.", currentBatchNumber, events.size()); + objectUnderTest.doOutput(events); + } + + for (int folderNumber = 0; folderNumber < 100; folderNumber++) { + LOG.info("Listing S3 path prefix: {}", staticPrefix + "folder-" + folderNumber + "/"); + + final ListObjectsV2Response listObjectsResponse = s3Client.listObjectsV2(ListObjectsV2Request.builder() + .bucket(bucketName) + .prefix(staticPrefix + "folder-" + folderNumber + "/") + .build()); + + assertThat(listObjectsResponse.contents(), notNullValue()); + assertThat(listObjectsResponse.contents().size(), equalTo(1)); + + final S3Object s3Object = listObjectsResponse.contents().get(0); + + final File target = new File(s3FileLocation, "folder-" + folderNumber + ".original"); + + LOG.info("Downloading S3 object to local file {}.", target); + + GetObjectRequest getObjectRequest = GetObjectRequest.builder() + .bucket(bucketName) + .key(s3Object.key()) + .build(); + s3Client.getObject(getObjectRequest, target.toPath()); + + File actualContentFile = decompressFileIfNecessary(outputScenario, compressionScenario, testRun, target); + + LOG.info("Validating output. totalSize={}; sampleDataSize={}", expectedTotalSize, sampleEventData.size()); + outputScenario.validateDynamicPartition(expectedTotalSize / sizeCombination.getBatchSize(), folderNumber, actualContentFile, compressionScenario); + } + } + private File decompressFileIfNecessary(OutputScenario outputScenario, CompressionScenario compressionScenario, String pathPrefix, File target) throws IOException { if (outputScenario.isCompressionInternal() || !compressionScenario.requiresDecompression()) @@ -245,7 +341,6 @@ private Event generateTestEvent(final Map eventData) { final EventMetadata defaultEventMetadata = DefaultEventMetadata.builder() .withEventType(EventType.LOG.toString()) .build(); - final JacksonEvent event = JacksonLog.builder().withData(eventData).withEventMetadata(defaultEventMetadata).build(); return JacksonEvent.builder() .withData(eventData) .withEventMetadata(defaultEventMetadata) diff --git a/data-prepper-plugins/s3-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkServiceIT.java b/data-prepper-plugins/s3-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkServiceIT.java index a5a80aa710..2e606a7487 100644 --- a/data-prepper-plugins/s3-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkServiceIT.java +++ b/data-prepper-plugins/s3-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkServiceIT.java @@ -34,6 +34,7 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.expression.ExpressionEvaluator; import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.codec.OutputCodec; import org.opensearch.dataprepper.model.event.DefaultEventMetadata; @@ -58,6 +59,8 @@ import org.opensearch.dataprepper.plugins.sink.s3.configuration.AwsAuthenticationOptions; import org.opensearch.dataprepper.plugins.sink.s3.configuration.ObjectKeyOptions; import org.opensearch.dataprepper.plugins.sink.s3.configuration.ThresholdOptions; +import org.opensearch.dataprepper.plugins.sink.s3.grouping.S3GroupIdentifierFactory; +import org.opensearch.dataprepper.plugins.sink.s3.grouping.S3GroupManager; import software.amazon.awssdk.core.ResponseBytes; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.s3.S3Client; @@ -90,6 +93,7 @@ import static org.hamcrest.CoreMatchers.notNullValue; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; +import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.lenient; import static org.mockito.Mockito.when; @@ -107,6 +111,9 @@ class S3SinkServiceIT { private static final String FILE_SUFFIX = ".parquet"; @Mock private S3SinkConfig s3SinkConfig; + + private S3GroupManager s3GroupManager; + @Mock private ThresholdOptions thresholdOptions; @Mock @@ -126,6 +133,9 @@ class S3SinkServiceIT { @Mock private DistributionSummary s3ObjectSizeSummary; + @Mock + private ExpressionEvaluator expressionEvaluator; + private OutputCodec codec; private KeyGenerator keyGenerator; @@ -157,6 +167,11 @@ public void setUp() { lenient().when(pluginMetrics.counter(S3SinkService.NUMBER_OF_RECORDS_FLUSHED_TO_S3_FAILED)). thenReturn(numberOfRecordsFailedCounter); lenient().when(pluginMetrics.summary(S3SinkService.S3_OBJECTS_SIZE)).thenReturn(s3ObjectSizeSummary); + + when(expressionEvaluator.extractDynamicExpressionsFromFormatExpression(anyString())) + .thenReturn(Collections.emptyList()); + when(expressionEvaluator.extractDynamicKeysFromFormatExpression(anyString())) + .thenReturn(Collections.emptyList()); } @Test @@ -171,7 +186,7 @@ void verify_flushed_object_count_into_s3_bucket() { void configureNewLineCodec() { codec = new NdjsonOutputCodec(ndjsonOutputConfig); - keyGenerator = new KeyGenerator(s3SinkConfig, StandardExtensionProvider.create(codec, CompressionOption.NONE)); + keyGenerator = new KeyGenerator(s3SinkConfig, StandardExtensionProvider.create(codec, CompressionOption.NONE), expressionEvaluator); } @Test @@ -240,7 +255,10 @@ void verify_flushed_records_into_s3_bucketNewLine_with_compression() throws IOEx private S3SinkService createObjectUnderTest() { OutputCodecContext codecContext = new OutputCodecContext("Tag", Collections.emptyList(), Collections.emptyList()); - return new S3SinkService(s3SinkConfig, bufferFactory, codec, codecContext, s3Client, keyGenerator, Duration.ofSeconds(5), pluginMetrics); + final S3GroupIdentifierFactory groupIdentifierFactory = new S3GroupIdentifierFactory(keyGenerator, expressionEvaluator, s3SinkConfig); + s3GroupManager = new S3GroupManager(s3SinkConfig, groupIdentifierFactory, bufferFactory, s3Client); + + return new S3SinkService(s3SinkConfig, codec, codecContext, s3Client, keyGenerator, Duration.ofSeconds(5), pluginMetrics, s3GroupManager); } private int gets3ObjectCount() { @@ -352,7 +370,7 @@ private void configureParquetCodec() { parquetOutputCodecConfig = new ParquetOutputCodecConfig(); parquetOutputCodecConfig.setSchema(parseSchema().toString()); codec = new ParquetOutputCodec(parquetOutputCodecConfig); - keyGenerator = new KeyGenerator(s3SinkConfig, StandardExtensionProvider.create(codec, CompressionOption.NONE)); + keyGenerator = new KeyGenerator(s3SinkConfig, StandardExtensionProvider.create(codec, CompressionOption.NONE), expressionEvaluator); } private Collection> getRecordList() { diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/KeyGenerator.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/KeyGenerator.java index 5281921cee..fe1f90793f 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/KeyGenerator.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/KeyGenerator.java @@ -5,15 +5,22 @@ package org.opensearch.dataprepper.plugins.sink.s3; +import org.opensearch.dataprepper.expression.ExpressionEvaluator; +import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.plugins.sink.s3.accumulator.ObjectKey; public class KeyGenerator { private final S3SinkConfig s3SinkConfig; private final ExtensionProvider extensionProvider; - public KeyGenerator(S3SinkConfig s3SinkConfig, ExtensionProvider extensionProvider) { + private final ExpressionEvaluator expressionEvaluator; + + public KeyGenerator(final S3SinkConfig s3SinkConfig, + final ExtensionProvider extensionProvider, + final ExpressionEvaluator expressionEvaluator) { this.s3SinkConfig = s3SinkConfig; this.extensionProvider = extensionProvider; + this.expressionEvaluator = expressionEvaluator; } /** @@ -21,9 +28,9 @@ public KeyGenerator(S3SinkConfig s3SinkConfig, ExtensionProvider extensionProvid * * @return object key path. */ - String generateKey() { - final String pathPrefix = ObjectKey.buildingPathPrefix(s3SinkConfig); - final String namePattern = ObjectKey.objectFileName(s3SinkConfig, extensionProvider.getExtension()); + public String generateKeyForEvent(final Event event) { + final String pathPrefix = ObjectKey.buildingPathPrefix(s3SinkConfig, event, expressionEvaluator); + final String namePattern = ObjectKey.objectFileName(s3SinkConfig, extensionProvider.getExtension(), event, expressionEvaluator); return (!pathPrefix.isEmpty()) ? pathPrefix + namePattern : namePattern; } } diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3Sink.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3Sink.java index 5266cdb36f..73750af754 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3Sink.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3Sink.java @@ -6,6 +6,7 @@ package org.opensearch.dataprepper.plugins.sink.s3; import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier; +import org.opensearch.dataprepper.expression.ExpressionEvaluator; import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; import org.opensearch.dataprepper.model.codec.OutputCodec; @@ -27,6 +28,8 @@ import org.opensearch.dataprepper.plugins.sink.s3.codec.BufferedCodec; import org.opensearch.dataprepper.plugins.sink.s3.compression.CompressionEngine; import org.opensearch.dataprepper.plugins.sink.s3.compression.CompressionOption; +import org.opensearch.dataprepper.plugins.sink.s3.grouping.S3GroupIdentifierFactory; +import org.opensearch.dataprepper.plugins.sink.s3.grouping.S3GroupManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import software.amazon.awssdk.services.s3.S3Client; @@ -42,6 +45,8 @@ public class S3Sink extends AbstractSink> { private static final Logger LOG = LoggerFactory.getLogger(S3Sink.class); + + private static final Duration RETRY_FLUSH_BACKOFF = Duration.ofSeconds(5); private final S3SinkConfig s3SinkConfig; private final OutputCodec codec; private volatile boolean sinkInitialized; @@ -59,7 +64,8 @@ public S3Sink(final PluginSetting pluginSetting, final S3SinkConfig s3SinkConfig, final PluginFactory pluginFactory, final SinkContext sinkContext, - final AwsCredentialsSupplier awsCredentialsSupplier) { + final AwsCredentialsSupplier awsCredentialsSupplier, + final ExpressionEvaluator expressionEvaluator) { super(pluginSetting); this.s3SinkConfig = s3SinkConfig; this.sinkContext = sinkContext; @@ -82,13 +88,27 @@ public S3Sink(final PluginSetting pluginSetting, bufferFactory = new CompressionBufferFactory(innerBufferFactory, compressionEngine, codec); ExtensionProvider extensionProvider = StandardExtensionProvider.create(codec, compressionOption); - KeyGenerator keyGenerator = new KeyGenerator(s3SinkConfig, extensionProvider); + KeyGenerator keyGenerator = new KeyGenerator(s3SinkConfig, extensionProvider, expressionEvaluator); + + if (s3SinkConfig.getObjectKeyOptions().getPathPrefix() != null && + !expressionEvaluator.isValidFormatExpression(s3SinkConfig.getObjectKeyOptions().getPathPrefix())) { + throw new InvalidPluginConfigurationException("path_prefix is not a valid format expression"); + } + + if (s3SinkConfig.getObjectKeyOptions().getNamePattern() != null && + !expressionEvaluator.isValidFormatExpression(s3SinkConfig.getObjectKeyOptions().getNamePattern())) { + throw new InvalidPluginConfigurationException("name_pattern is not a valid format expression"); + } S3OutputCodecContext s3OutputCodecContext = new S3OutputCodecContext(OutputCodecContext.fromSinkContext(sinkContext), compressionOption); codec.validateAgainstCodecContext(s3OutputCodecContext); - s3SinkService = new S3SinkService(s3SinkConfig, bufferFactory, codec, s3OutputCodecContext, s3Client, keyGenerator, Duration.ofSeconds(5), pluginMetrics); + final S3GroupIdentifierFactory s3GroupIdentifierFactory = new S3GroupIdentifierFactory(keyGenerator, expressionEvaluator, s3SinkConfig); + final S3GroupManager s3GroupManager = new S3GroupManager(s3SinkConfig, s3GroupIdentifierFactory, bufferFactory, s3Client); + + + s3SinkService = new S3SinkService(s3SinkConfig, codec, s3OutputCodecContext, s3Client, keyGenerator, RETRY_FLUSH_BACKOFF, pluginMetrics, s3GroupManager); } @Override diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkConfig.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkConfig.java index 6124f20538..91b31ddddc 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkConfig.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkConfig.java @@ -37,7 +37,7 @@ public class S3SinkConfig { private String bucketName; @JsonProperty("object_key") - private ObjectKeyOptions objectKeyOptions; + private ObjectKeyOptions objectKeyOptions = new ObjectKeyOptions(); @JsonProperty("compression") private CompressionOption compression = CompressionOption.NONE; diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkService.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkService.java index 879854b546..fb04a5da39 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkService.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkService.java @@ -10,12 +10,12 @@ import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.codec.OutputCodec; import org.opensearch.dataprepper.model.event.Event; -import org.opensearch.dataprepper.model.event.EventHandle; import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.model.sink.OutputCodecContext; import org.opensearch.dataprepper.model.types.ByteCount; import org.opensearch.dataprepper.plugins.sink.s3.accumulator.Buffer; -import org.opensearch.dataprepper.plugins.sink.s3.accumulator.BufferFactory; +import org.opensearch.dataprepper.plugins.sink.s3.grouping.S3Group; +import org.opensearch.dataprepper.plugins.sink.s3.grouping.S3GroupManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import software.amazon.awssdk.awscore.exception.AwsServiceException; @@ -26,7 +26,6 @@ import java.time.Duration; import java.util.ArrayList; import java.util.Collection; -import java.util.LinkedList; import java.util.List; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -45,11 +44,8 @@ public class S3SinkService { static final String S3_OBJECTS_SIZE = "s3SinkObjectSizeBytes"; private final S3SinkConfig s3SinkConfig; private final Lock reentrantLock; - private final BufferFactory bufferFactory; - private final Collection bufferedEventHandles; private final OutputCodec codec; private final S3Client s3Client; - private Buffer currentBuffer; private final int maxEvents; private final ByteCount maxBytes; private final Duration maxCollectionDuration; @@ -64,18 +60,18 @@ public class S3SinkService { private final KeyGenerator keyGenerator; private final Duration retrySleepTime; + private final S3GroupManager s3GroupManager; + /** * @param s3SinkConfig s3 sink related configuration. - * @param bufferFactory factory of buffer. * @param codec parser. * @param s3Client * @param pluginMetrics metrics. */ - public S3SinkService(final S3SinkConfig s3SinkConfig, final BufferFactory bufferFactory, - final OutputCodec codec, final OutputCodecContext codecContext, final S3Client s3Client, final KeyGenerator keyGenerator, - final Duration retrySleepTime, final PluginMetrics pluginMetrics) { + public S3SinkService(final S3SinkConfig s3SinkConfig, final OutputCodec codec, + final OutputCodecContext codecContext, final S3Client s3Client, final KeyGenerator keyGenerator, + final Duration retrySleepTime, final PluginMetrics pluginMetrics, final S3GroupManager s3GroupManager) { this.s3SinkConfig = s3SinkConfig; - this.bufferFactory = bufferFactory; this.codec = codec; this.s3Client = s3Client; this.codecContext = codecContext; @@ -83,8 +79,6 @@ public S3SinkService(final S3SinkConfig s3SinkConfig, final BufferFactory buffer this.retrySleepTime = retrySleepTime; reentrantLock = new ReentrantLock(); - bufferedEventHandles = new LinkedList<>(); - maxEvents = s3SinkConfig.getThresholdOptions().getEventCount(); maxBytes = s3SinkConfig.getThresholdOptions().getMaximumSize(); maxCollectionDuration = s3SinkConfig.getThresholdOptions().getEventCollectTimeOut(); @@ -98,7 +92,7 @@ public S3SinkService(final S3SinkConfig s3SinkConfig, final BufferFactory buffer numberOfRecordsFailedCounter = pluginMetrics.counter(NUMBER_OF_RECORDS_FLUSHED_TO_S3_FAILED); s3ObjectSizeSummary = pluginMetrics.summary(S3_OBJECTS_SIZE); - currentBuffer = bufferFactory.getBuffer(s3Client, () -> bucket, keyGenerator::generateKey); + this.s3GroupManager = s3GroupManager; } /** @@ -106,7 +100,7 @@ public S3SinkService(final S3SinkConfig s3SinkConfig, final BufferFactory buffer */ void output(Collection> records) { // Don't acquire the lock if there's no work to be done - if (records.isEmpty() && currentBuffer.getEventCount() == 0) { + if (records.isEmpty() && s3GroupManager.hasNoGroups()) { return; } @@ -115,8 +109,10 @@ void output(Collection> records) { reentrantLock.lock(); try { for (Record record : records) { - final Event event = record.getData(); + final S3Group s3Group = s3GroupManager.getOrCreateGroupForEvent(event); + final Buffer currentBuffer = s3Group.getBuffer(); + try { if (currentBuffer.getEventCount() == 0) { codec.start(currentBuffer.getOutputStream(), event, codecContext); @@ -125,8 +121,7 @@ void output(Collection> records) { codec.writeEvent(event, currentBuffer.getOutputStream()); int count = currentBuffer.getEventCount() + 1; currentBuffer.setEventCount(count); - - bufferedEventHandles.add(event.getEventHandle()); + s3Group.addEventHandle(event.getEventHandle()); } catch (Exception ex) { if(sampleException == null) { sampleException = ex; @@ -135,9 +130,20 @@ void output(Collection> records) { failedEvents.add(event); } - flushToS3IfNeeded(); + final boolean flushed = flushToS3IfNeeded(s3Group); + + if (flushed) { + s3GroupManager.removeGroup(s3Group); + } + } + + for (final S3Group s3Group : s3GroupManager.getS3GroupEntries()) { + final boolean flushed = flushToS3IfNeeded(s3Group); + + if (flushed) { + s3GroupManager.removeGroup(s3Group); + } } - flushToS3IfNeeded(); } finally { reentrantLock.unlock(); } @@ -151,41 +157,39 @@ void output(Collection> records) { } } - private void releaseEventHandles(final boolean result) { - for (EventHandle eventHandle : bufferedEventHandles) { - eventHandle.release(result); - } - - bufferedEventHandles.clear(); - } - - private void flushToS3IfNeeded() { + /** + * @return whether the flush was attempted + */ + private boolean flushToS3IfNeeded(final S3Group s3Group) { LOG.trace("Flush to S3 check: currentBuffer.size={}, currentBuffer.events={}, currentBuffer.duration={}", - currentBuffer.getSize(), currentBuffer.getEventCount(), currentBuffer.getDuration()); - if (ThresholdCheck.checkThresholdExceed(currentBuffer, maxEvents, maxBytes, maxCollectionDuration)) { + s3Group.getBuffer().getSize(), s3Group.getBuffer().getEventCount(), s3Group.getBuffer().getDuration()); + if (ThresholdCheck.checkThresholdExceed(s3Group.getBuffer(), maxEvents, maxBytes, maxCollectionDuration)) { try { - codec.complete(currentBuffer.getOutputStream()); - String s3Key = currentBuffer.getKey(); + codec.complete(s3Group.getBuffer().getOutputStream()); + String s3Key = s3Group.getBuffer().getKey(); LOG.info("Writing {} to S3 with {} events and size of {} bytes.", - s3Key, currentBuffer.getEventCount(), currentBuffer.getSize()); - final boolean isFlushToS3 = retryFlushToS3(currentBuffer, s3Key); + s3Key, s3Group.getBuffer().getEventCount(), s3Group.getBuffer().getSize()); + final boolean isFlushToS3 = retryFlushToS3(s3Group.getBuffer(), s3Key); if (isFlushToS3) { LOG.info("Successfully saved {} to S3.", s3Key); - numberOfRecordsSuccessCounter.increment(currentBuffer.getEventCount()); + numberOfRecordsSuccessCounter.increment(s3Group.getBuffer().getEventCount()); objectsSucceededCounter.increment(); - s3ObjectSizeSummary.record(currentBuffer.getSize()); - releaseEventHandles(true); + s3ObjectSizeSummary.record(s3Group.getBuffer().getSize()); + s3Group.releaseEventHandles(true); } else { LOG.error("Failed to save {} to S3.", s3Key); - numberOfRecordsFailedCounter.increment(currentBuffer.getEventCount()); + numberOfRecordsFailedCounter.increment(s3Group.getBuffer().getEventCount()); objectsFailedCounter.increment(); - releaseEventHandles(false); + s3Group.releaseEventHandles(false); } - currentBuffer = bufferFactory.getBuffer(s3Client, () -> bucket, keyGenerator::generateKey); + + return true; } catch (final IOException e) { LOG.error("Exception while completing codec", e); } } + + return false; } /** diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/InMemoryBuffer.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/InMemoryBuffer.java index 825fca5878..1020dd8ac5 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/InMemoryBuffer.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/InMemoryBuffer.java @@ -20,8 +20,8 @@ */ public class InMemoryBuffer implements Buffer { - private static final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); - private static final ByteArrayPositionOutputStream byteArrayPositionOutputStream = new ByteArrayPositionOutputStream(byteArrayOutputStream); + private final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + private final ByteArrayPositionOutputStream byteArrayPositionOutputStream = new ByteArrayPositionOutputStream(byteArrayOutputStream); private final S3Client s3Client; private final Supplier bucketSupplier; private final Supplier keySupplier; diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/ObjectKey.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/ObjectKey.java index 6c8b05044e..c83ee192ec 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/ObjectKey.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/ObjectKey.java @@ -6,6 +6,9 @@ package org.opensearch.dataprepper.plugins.sink.s3.accumulator; import java.util.regex.Pattern; + +import org.opensearch.dataprepper.expression.ExpressionEvaluator; +import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.plugins.s3keyindex.S3ObjectIndexUtility; import org.opensearch.dataprepper.plugins.sink.s3.S3SinkConfig; import org.slf4j.Logger; @@ -31,10 +34,23 @@ private ObjectKey(){} * @return s3 object path */ public static String buildingPathPrefix(final S3SinkConfig s3SinkConfig) { + return buildingPathPrefixInternal(s3SinkConfig, null, null); + } + + public static String buildingPathPrefix(final S3SinkConfig s3SinkConfig, + final Event event, + final ExpressionEvaluator expressionEvaluator) { + return buildingPathPrefixInternal(s3SinkConfig, event, expressionEvaluator); + } + + private static String buildingPathPrefixInternal(final S3SinkConfig s3SinkConfig, + final Event event, + final ExpressionEvaluator expressionEvaluator) { String pathPrefix = s3SinkConfig.getObjectKeyOptions().getPathPrefix(); + String pathPrefixExpressionResult = expressionEvaluator != null ? event.formatString(pathPrefix, expressionEvaluator) : pathPrefix; StringBuilder s3ObjectPath = new StringBuilder(); - if (pathPrefix != null && !pathPrefix.isEmpty()) { - String[] pathPrefixList = pathPrefix.split("\\/"); + if (pathPrefixExpressionResult != null && !pathPrefixExpressionResult.isEmpty()) { + String[] pathPrefixList = pathPrefixExpressionResult.split("\\/"); for (String prefixPath : pathPrefixList) { if (SIMPLE_DURATION_PATTERN.matcher(prefixPath).find()) { s3ObjectPath.append(S3ObjectIndexUtility.getObjectPathPrefix(prefixPath)).append("/"); @@ -53,14 +69,18 @@ public static String buildingPathPrefix(final S3SinkConfig s3SinkConfig) { * @param codecExtension extension * @return s3 object name with prefix */ - public static String objectFileName(S3SinkConfig s3SinkConfig, String codecExtension) { + public static String objectFileName(final S3SinkConfig s3SinkConfig, + final String codecExtension, + final Event event, + final ExpressionEvaluator expressionEvaluator) { String configNamePattern = s3SinkConfig.getObjectKeyOptions().getNamePattern(); - int extensionIndex = configNamePattern.lastIndexOf('.'); + String configNamePatternExpressionResult = event.formatString(configNamePattern, expressionEvaluator); + int extensionIndex = configNamePatternExpressionResult.lastIndexOf('.'); if (extensionIndex > 0) { - return S3ObjectIndexUtility.getObjectNameWithDateTimeId(configNamePattern.substring(0, extensionIndex)) + "." - + (codecExtension!=null? codecExtension :configNamePattern.substring(extensionIndex + 1)); + return S3ObjectIndexUtility.getObjectNameWithDateTimeId(configNamePatternExpressionResult.substring(0, extensionIndex)) + "." + + (codecExtension!=null? codecExtension :configNamePatternExpressionResult.substring(extensionIndex + 1)); } else { - return S3ObjectIndexUtility.getObjectNameWithDateTimeId(configNamePattern) + "." + + return S3ObjectIndexUtility.getObjectNameWithDateTimeId(configNamePatternExpressionResult) + "." + (codecExtension!=null? codecExtension : DEFAULT_CODEC_FILE_EXTENSION); } } diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/grouping/S3Group.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/grouping/S3Group.java new file mode 100644 index 0000000000..74555842b5 --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/grouping/S3Group.java @@ -0,0 +1,50 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.s3.grouping; + +import org.opensearch.dataprepper.model.event.EventHandle; +import org.opensearch.dataprepper.plugins.sink.s3.accumulator.Buffer; + +import java.util.Collection; +import java.util.LinkedList; + +public class S3Group { + + private final Buffer buffer; + + private final S3GroupIdentifier s3GroupIdentifier; + + private final Collection groupEventHandles; + + public S3Group(final S3GroupIdentifier s3GroupIdentifier, + final Buffer buffer) { + this.buffer = buffer; + this.s3GroupIdentifier = s3GroupIdentifier; + this.groupEventHandles = new LinkedList<>(); + } + + public Buffer getBuffer() { + return buffer; + } + + S3GroupIdentifier getS3GroupIdentifier() { return s3GroupIdentifier; } + + public void addEventHandle(final EventHandle eventHandle) { + groupEventHandles.add(eventHandle); + } + + public void releaseEventHandles(final boolean result) { + for (EventHandle eventHandle : groupEventHandles) { + eventHandle.release(result); + } + + groupEventHandles.clear(); + } + + Collection getGroupEventHandles() { + return groupEventHandles; + } +} diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/grouping/S3GroupIdentifier.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/grouping/S3GroupIdentifier.java new file mode 100644 index 0000000000..9b65348bae --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/grouping/S3GroupIdentifier.java @@ -0,0 +1,37 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.s3.grouping; + +import java.util.Map; +import java.util.Objects; + +class S3GroupIdentifier { + private final Map groupIdentifierHash; + private final String groupIdentifierFullObjectKey; + + public S3GroupIdentifier(final Map groupIdentifierHash, + final String groupIdentifierFullObjectKey) { + this.groupIdentifierHash = groupIdentifierHash; + this.groupIdentifierFullObjectKey = groupIdentifierFullObjectKey; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + S3GroupIdentifier that = (S3GroupIdentifier) o; + return Objects.equals(groupIdentifierHash, that.groupIdentifierHash); + } + + @Override + public int hashCode() { + return Objects.hash(groupIdentifierHash); + } + + public String getGroupIdentifierFullObjectKey() { return groupIdentifierFullObjectKey; } + + public Map getGroupIdentifierHash() { return groupIdentifierHash; } +} diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/grouping/S3GroupIdentifierFactory.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/grouping/S3GroupIdentifierFactory.java new file mode 100644 index 0000000000..0387f6f683 --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/grouping/S3GroupIdentifierFactory.java @@ -0,0 +1,61 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.s3.grouping; + +import org.opensearch.dataprepper.expression.ExpressionEvaluator; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.plugins.sink.s3.KeyGenerator; +import org.opensearch.dataprepper.plugins.sink.s3.S3SinkConfig; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class S3GroupIdentifierFactory { + + private final KeyGenerator keyGenerator; + + private final List dynamicEventsKeys; + + private final List dynamicExpressions; + + private final ExpressionEvaluator expressionEvaluator; + + private final S3SinkConfig s3SinkConfig; + + public S3GroupIdentifierFactory(final KeyGenerator keyGenerator, + final ExpressionEvaluator expressionEvaluator, + final S3SinkConfig s3SinkConfig) { + this.keyGenerator = keyGenerator; + this.expressionEvaluator = expressionEvaluator; + this.s3SinkConfig = s3SinkConfig; + + dynamicExpressions = expressionEvaluator.extractDynamicExpressionsFromFormatExpression(s3SinkConfig.getObjectKeyOptions().getPathPrefix()); + dynamicExpressions.addAll(expressionEvaluator.extractDynamicExpressionsFromFormatExpression(s3SinkConfig.getObjectKeyOptions().getNamePattern())); + + dynamicEventsKeys = expressionEvaluator.extractDynamicKeysFromFormatExpression(s3SinkConfig.getObjectKeyOptions().getPathPrefix()); + dynamicEventsKeys.addAll(expressionEvaluator.extractDynamicKeysFromFormatExpression(s3SinkConfig.getObjectKeyOptions().getNamePattern())); + } + + + public S3GroupIdentifier getS3GroupIdentifierForEvent(final Event event) { + + final String fullObjectKey = keyGenerator.generateKeyForEvent(event); + final Map groupIdentificationHash = new HashMap<>(); + + for (final String key : dynamicEventsKeys) { + final Object value = event.get(key, Object.class); + groupIdentificationHash.put(key, value); + } + + for (final String expression : dynamicExpressions) { + final Object value = expressionEvaluator.evaluate(expression, event); + groupIdentificationHash.put(expression, value); + } + + return new S3GroupIdentifier(groupIdentificationHash, fullObjectKey); + } +} diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/grouping/S3GroupManager.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/grouping/S3GroupManager.java new file mode 100644 index 0000000000..a489a505e8 --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/grouping/S3GroupManager.java @@ -0,0 +1,69 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.s3.grouping; + +import com.google.common.collect.Maps; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.plugins.sink.s3.S3SinkConfig; +import org.opensearch.dataprepper.plugins.sink.s3.accumulator.Buffer; +import org.opensearch.dataprepper.plugins.sink.s3.accumulator.BufferFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.services.s3.S3Client; + +import java.util.Collection; +import java.util.Map; + +public class S3GroupManager { + + private static final Logger LOG = LoggerFactory.getLogger(S3GroupManager.class); + + private final Map allGroups = Maps.newConcurrentMap(); + private final S3SinkConfig s3SinkConfig; + private final S3GroupIdentifierFactory s3GroupIdentifierFactory; + private final BufferFactory bufferFactory; + + private final S3Client s3Client; + + public S3GroupManager(final S3SinkConfig s3SinkConfig, + final S3GroupIdentifierFactory s3GroupIdentifierFactory, + final BufferFactory bufferFactory, + final S3Client s3Client) { + this.s3SinkConfig = s3SinkConfig; + this.s3GroupIdentifierFactory = s3GroupIdentifierFactory; + this.bufferFactory = bufferFactory; + this.s3Client = s3Client; + } + + public boolean hasNoGroups() { + return allGroups.isEmpty(); + } + + public void removeGroup(final S3Group s3Group) { + allGroups.remove(s3Group.getS3GroupIdentifier()); + } + + public Collection getS3GroupEntries() { + return allGroups.values(); + } + + public S3Group getOrCreateGroupForEvent(final Event event) { + + final S3GroupIdentifier s3GroupIdentifier = s3GroupIdentifierFactory.getS3GroupIdentifierForEvent(event); + + if (allGroups.containsKey(s3GroupIdentifier)) { + return allGroups.get(s3GroupIdentifier); + } else { + final Buffer bufferForNewGroup = bufferFactory.getBuffer(s3Client, s3SinkConfig::getBucketName, s3GroupIdentifier::getGroupIdentifierFullObjectKey); + final S3Group s3Group = new S3Group(s3GroupIdentifier, bufferForNewGroup); + allGroups.put(s3GroupIdentifier, s3Group); + LOG.debug("Created a new S3 group. Total number of groups: {}", allGroups.size()); + return s3Group; + } + } + + +} diff --git a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/KeyGeneratorTest.java b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/KeyGeneratorTest.java index 8b9b5f99ed..64189cd939 100644 --- a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/KeyGeneratorTest.java +++ b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/KeyGeneratorTest.java @@ -9,26 +9,24 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; +import org.mockito.MockedStatic; import org.mockito.junit.jupiter.MockitoExtension; -import org.opensearch.dataprepper.plugins.sink.s3.configuration.ObjectKeyOptions; +import org.opensearch.dataprepper.expression.ExpressionEvaluator; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.plugins.sink.s3.accumulator.ObjectKey; -import java.time.LocalDateTime; -import java.time.ZoneId; -import java.time.ZonedDateTime; -import java.time.format.DateTimeFormatter; -import java.util.TimeZone; import java.util.UUID; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.notNullValue; import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.mockStatic; import static org.mockito.Mockito.when; @ExtendWith(MockitoExtension.class) class KeyGeneratorTest { - public static final String OBJECT_KEY_NAME_PATTERN_START = "events-"; - public static final String OBJECT_KEY_NAME_PATTERN = OBJECT_KEY_NAME_PATTERN_START + "%{yyyy-MM-dd'T'hh-mm-ss}"; - @Mock private S3SinkConfig s3SinkConfig; @@ -36,54 +34,63 @@ class KeyGeneratorTest { private ExtensionProvider extensionProvider; @Mock - private ObjectKeyOptions objectKeyOptions; + private ExpressionEvaluator expressionEvaluator; @BeforeEach void setUp() { - when(s3SinkConfig.getObjectKeyOptions()).thenReturn(objectKeyOptions); - when(objectKeyOptions.getNamePattern()).thenReturn(OBJECT_KEY_NAME_PATTERN); - } - private KeyGenerator createObjectUnderTest() { - return new KeyGenerator(s3SinkConfig, extensionProvider); } - @Test - void test_generateKey_with_general_prefix() { - String pathPrefix = "events/"; - when(s3SinkConfig.getObjectKeyOptions().getPathPrefix()).thenReturn(pathPrefix); - String key = createObjectUnderTest().generateKey(); - assertNotNull(key); - assertThat(key, true); - assertThat(key, key.contains(pathPrefix)); + private KeyGenerator createObjectUnderTest() { + return new KeyGenerator(s3SinkConfig, extensionProvider, expressionEvaluator); } @Test void test_generateKey_with_date_prefix() { String pathPrefix = "logdata/"; - String datePattern = "%{yyyy}/%{MM}/%{dd}/"; - - DateTimeFormatter fomatter = DateTimeFormatter.ofPattern("yyyy/MM/dd"); - ZonedDateTime zdt = LocalDateTime.now().atZone(ZoneId.systemDefault()) - .withZoneSameInstant(ZoneId.of(TimeZone.getTimeZone("UTC").getID())); - String dateString = fomatter.format(zdt); - - when(s3SinkConfig.getObjectKeyOptions() - .getPathPrefix()).thenReturn(pathPrefix + datePattern); - String key = createObjectUnderTest().generateKey(); - assertNotNull(key); - assertThat(key, true); - assertThat(key, key.contains(pathPrefix + dateString)); + final String objectName = UUID.randomUUID().toString(); + when(extensionProvider.getExtension()).thenReturn(null); + + final KeyGenerator objectUnderTest = createObjectUnderTest(); + + final Event event = mock(Event.class); + + try (final MockedStatic objectKeyMockedStatic = mockStatic(ObjectKey.class)) { + + objectKeyMockedStatic.when(() -> ObjectKey.buildingPathPrefix(s3SinkConfig, event, expressionEvaluator)) + .thenReturn(pathPrefix); + objectKeyMockedStatic.when(() -> ObjectKey.objectFileName(s3SinkConfig, null, event, expressionEvaluator)) + .thenReturn(objectName); + + String key = objectUnderTest.generateKeyForEvent(event); + assertNotNull(key); + assertThat(key, true); + assertThat(key.contains(pathPrefix), equalTo(true)); + assertThat(key.contains(objectName), equalTo(true)); + } } @Test - void generateKey_ends_with_extension() { + void generateKey_with_extension() { String extension = UUID.randomUUID().toString(); + final String objectName = UUID.randomUUID().toString(); when(extensionProvider.getExtension()).thenReturn(extension); String pathPrefix = "events/"; - when(s3SinkConfig.getObjectKeyOptions().getPathPrefix()).thenReturn(pathPrefix); - String key = createObjectUnderTest().generateKey(); - assertThat(key, notNullValue()); - assertThat(key, key.endsWith("." + extension)); + + final Event event = mock(Event.class); + final KeyGenerator objectUnderTest = createObjectUnderTest(); + try (final MockedStatic objectKeyMockedStatic = mockStatic(ObjectKey.class)) { + + objectKeyMockedStatic.when(() -> ObjectKey.buildingPathPrefix(s3SinkConfig, event, expressionEvaluator)) + .thenReturn(pathPrefix); + objectKeyMockedStatic.when(() -> ObjectKey.objectFileName(s3SinkConfig, extension, event, expressionEvaluator)) + .thenReturn(objectName); + + String key = objectUnderTest.generateKeyForEvent(event); + assertThat(key, notNullValue()); + assertThat(key.contains(pathPrefix), equalTo(true)); + assertThat(key.contains(objectName), equalTo(true)); + } + } } \ No newline at end of file diff --git a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkServiceTest.java b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkServiceTest.java index 7160660137..d3f0edb8a2 100644 --- a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkServiceTest.java +++ b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkServiceTest.java @@ -12,26 +12,26 @@ import org.junit.jupiter.api.Test; import org.mockito.InOrder; import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; import org.opensearch.dataprepper.model.codec.OutputCodec; import org.opensearch.dataprepper.model.configuration.PluginModel; import org.opensearch.dataprepper.model.configuration.PluginSetting; +import org.opensearch.dataprepper.model.event.DefaultEventHandle; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.event.EventHandle; -import org.opensearch.dataprepper.model.event.DefaultEventHandle; import org.opensearch.dataprepper.model.event.JacksonEvent; import org.opensearch.dataprepper.model.plugin.PluginFactory; import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.model.sink.OutputCodecContext; import org.opensearch.dataprepper.model.types.ByteCount; -import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; import org.opensearch.dataprepper.plugins.sink.s3.accumulator.Buffer; -import org.opensearch.dataprepper.plugins.sink.s3.accumulator.BufferFactory; import org.opensearch.dataprepper.plugins.sink.s3.accumulator.BufferTypeOptions; import org.opensearch.dataprepper.plugins.sink.s3.accumulator.InMemoryBuffer; -import org.opensearch.dataprepper.plugins.sink.s3.accumulator.InMemoryBufferFactory; import org.opensearch.dataprepper.plugins.sink.s3.configuration.AwsAuthenticationOptions; import org.opensearch.dataprepper.plugins.sink.s3.configuration.ObjectKeyOptions; import org.opensearch.dataprepper.plugins.sink.s3.configuration.ThresholdOptions; +import org.opensearch.dataprepper.plugins.sink.s3.grouping.S3Group; +import org.opensearch.dataprepper.plugins.sink.s3.grouping.S3GroupManager; import software.amazon.awssdk.awscore.exception.AwsServiceException; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.s3.S3Client; @@ -81,13 +81,14 @@ class S3SinkServiceTest { private OutputCodecContext codecContext; private KeyGenerator keyGenerator = mock(KeyGenerator.class); private PluginMetrics pluginMetrics; - private BufferFactory bufferFactory; private Counter snapshotSuccessCounter; private DistributionSummary s3ObjectSizeSummary; private Random random; private String tagsTargetKey; private AcknowledgementSet acknowledgementSet; + private S3GroupManager s3GroupManager; + @BeforeEach void setUp() { @@ -111,7 +112,7 @@ void setUp() { Counter numberOfRecordsFailedCounter = mock(Counter.class); s3ObjectSizeSummary = mock(DistributionSummary.class); - bufferFactory = new InMemoryBufferFactory(); + s3GroupManager = mock(S3GroupManager.class); when(objectKeyOptions.getNamePattern()).thenReturn(OBJECT_KEY_NAME_PATTERN); when(s3SinkConfig.getMaxUploadRetries()).thenReturn(MAX_RETRIES); @@ -143,7 +144,7 @@ private DefaultEventHandle castToDefaultHandle(EventHandle eventHandle) { } private S3SinkService createObjectUnderTest() { - return new S3SinkService(s3SinkConfig, bufferFactory, codec, codecContext, s3Client, keyGenerator, Duration.ofMillis(100), pluginMetrics); + return new S3SinkService(s3SinkConfig, codec, codecContext, s3Client, keyGenerator, Duration.ofMillis(100), pluginMetrics, s3GroupManager); } @Test @@ -155,15 +156,19 @@ void test_s3SinkService_notNull() { @Test void test_output_with_threshold_set_as_more_then_zero_event_count() throws IOException { - bufferFactory = mock(BufferFactory.class); InMemoryBuffer buffer = mock(InMemoryBuffer.class); when(buffer.getEventCount()).thenReturn(10); doNothing().when(buffer).flushToS3(); - when(bufferFactory.getBuffer(any(S3Client.class), any(), any())).thenReturn(buffer); when(s3SinkConfig.getThresholdOptions().getEventCount()).thenReturn(5); final OutputStream outputStream = mock(OutputStream.class); final Event event = JacksonEvent.fromMessage(UUID.randomUUID().toString()); + + final S3Group s3Group = mock(S3Group.class); + when(s3Group.getBuffer()).thenReturn(buffer); + + when(s3GroupManager.getOrCreateGroupForEvent(any(Event.class))).thenReturn(s3Group); + when(s3GroupManager.getS3GroupEntries()).thenReturn(Collections.singletonList(s3Group)); doNothing().when(codec).writeEvent(event, outputStream); S3SinkService s3SinkService = createObjectUnderTest(); assertNotNull(s3SinkService); @@ -177,16 +182,19 @@ void test_output_with_threshold_set_as_more_then_zero_event_count() throws IOExc @Test void test_output_with_threshold_set_as_zero_event_count() throws IOException { - bufferFactory = mock(BufferFactory.class); InMemoryBuffer buffer = mock(InMemoryBuffer.class); when(buffer.getSize()).thenReturn(25500L); doNothing().when(buffer).flushToS3(); - when(bufferFactory.getBuffer(any(S3Client.class), any(), any())).thenReturn(buffer); when(s3SinkConfig.getThresholdOptions().getEventCount()).thenReturn(0); when(s3SinkConfig.getThresholdOptions().getMaximumSize()).thenReturn(ByteCount.parse("2kb")); final OutputStream outputStream = mock(OutputStream.class); final Event event = JacksonEvent.fromMessage(UUID.randomUUID().toString()); + final S3Group s3Group = mock(S3Group.class); + when(s3Group.getBuffer()).thenReturn(buffer); + + when(s3GroupManager.getOrCreateGroupForEvent(any(Event.class))).thenReturn(s3Group); + when(s3GroupManager.getS3GroupEntries()).thenReturn(Collections.singletonList(s3Group)); doNothing().when(codec).writeEvent(event, outputStream); S3SinkService s3SinkService = createObjectUnderTest(); assertNotNull(s3SinkService); @@ -197,14 +205,18 @@ void test_output_with_threshold_set_as_zero_event_count() throws IOException { @Test void test_output_with_uploadedToS3_success() throws IOException { - bufferFactory = mock(BufferFactory.class); InMemoryBuffer buffer = mock(InMemoryBuffer.class); when(buffer.getEventCount()).thenReturn(10); doNothing().when(buffer).flushToS3(); - when(bufferFactory.getBuffer(any(S3Client.class), any(), any())).thenReturn(buffer); final OutputStream outputStream = mock(OutputStream.class); final Event event = JacksonEvent.fromMessage(UUID.randomUUID().toString()); + final S3Group s3Group = mock(S3Group.class); + when(s3Group.getBuffer()).thenReturn(buffer); + + when(s3GroupManager.getOrCreateGroupForEvent(any(Event.class))).thenReturn(s3Group); + when(s3GroupManager.getS3GroupEntries()).thenReturn(Collections.singletonList(s3Group)); + doNothing().when(codec).writeEvent(event, outputStream); S3SinkService s3SinkService = createObjectUnderTest(); assertNotNull(s3SinkService); @@ -216,15 +228,19 @@ void test_output_with_uploadedToS3_success() throws IOException { @Test void test_output_with_uploadedToS3_success_records_byte_count() throws IOException { - bufferFactory = mock(BufferFactory.class); Buffer buffer = mock(Buffer.class); - when(bufferFactory.getBuffer(any(S3Client.class), any(), any())).thenReturn(buffer); final long objectSize = random.nextInt(1_000_000) + 10_000; when(buffer.getSize()).thenReturn(objectSize); final OutputStream outputStream = mock(OutputStream.class); final Event event = JacksonEvent.fromMessage(UUID.randomUUID().toString()); + final S3Group s3Group = mock(S3Group.class); + when(s3Group.getBuffer()).thenReturn(buffer); + + when(s3GroupManager.getOrCreateGroupForEvent(any(Event.class))).thenReturn(s3Group); + when(s3GroupManager.getS3GroupEntries()).thenReturn(Collections.singletonList(s3Group)); + doNothing().when(codec).writeEvent(event, outputStream); final S3SinkService s3SinkService = createObjectUnderTest(); s3SinkService.output(generateRandomStringEventRecord()); @@ -235,17 +251,21 @@ void test_output_with_uploadedToS3_success_records_byte_count() throws IOExcepti @Test void test_output_with_uploadedToS3_midBatch_generatesNewOutputStream() throws IOException { - bufferFactory = mock(BufferFactory.class); InMemoryBuffer buffer = mock(InMemoryBuffer.class); when(buffer.getEventCount()).thenReturn(10); doNothing().when(buffer).flushToS3(); - when(bufferFactory.getBuffer(any(S3Client.class), any(), any())).thenReturn(buffer); final OutputStream outputStream1 = mock(OutputStream.class); final OutputStream outputStream2 = mock(OutputStream.class); when(buffer.getOutputStream()) .thenReturn(outputStream1) .thenReturn(outputStream2); + final S3Group s3Group = mock(S3Group.class); + when(s3Group.getBuffer()).thenReturn(buffer); + + when(s3GroupManager.getOrCreateGroupForEvent(any(Event.class))).thenReturn(s3Group); + when(s3GroupManager.getS3GroupEntries()).thenReturn(Collections.singletonList(s3Group)); + doNothing().when(codec).writeEvent(any(), eq(outputStream1)); doNothing().when(codec).writeEvent(any(), eq(outputStream2)); @@ -267,6 +287,14 @@ void test_output_with_uploadedToS3_failed() throws IOException { final OutputStream outputStream = mock(OutputStream.class); final Event event = JacksonEvent.fromMessage(UUID.randomUUID().toString()); doNothing().when(codec).writeEvent(event, outputStream); + + final S3Group s3Group = mock(S3Group.class); + Buffer buffer = mock(Buffer.class); + when(s3Group.getBuffer()).thenReturn(buffer); + + when(s3GroupManager.getOrCreateGroupForEvent(any(Event.class))).thenReturn(s3Group); + when(s3GroupManager.getS3GroupEntries()).thenReturn(Collections.singletonList(s3Group)); + S3SinkService s3SinkService = createObjectUnderTest(); assertNotNull(s3SinkService); assertThat(s3SinkService, instanceOf(S3SinkService.class)); @@ -277,9 +305,7 @@ void test_output_with_uploadedToS3_failed() throws IOException { @Test void test_output_with_uploadedToS3_failure_does_not_record_byte_count() throws IOException { - bufferFactory = mock(BufferFactory.class); Buffer buffer = mock(Buffer.class); - when(bufferFactory.getBuffer(any(S3Client.class), any(), any())).thenReturn(buffer); doThrow(AwsServiceException.class).when(buffer).flushToS3(); @@ -288,6 +314,12 @@ void test_output_with_uploadedToS3_failure_does_not_record_byte_count() throws I final S3SinkService s3SinkService = createObjectUnderTest(); final Event event = JacksonEvent.fromMessage(UUID.randomUUID().toString()); + final S3Group s3Group = mock(S3Group.class); + when(s3Group.getBuffer()).thenReturn(buffer); + + when(s3GroupManager.getOrCreateGroupForEvent(any(Event.class))).thenReturn(s3Group); + when(s3GroupManager.getS3GroupEntries()).thenReturn(Collections.singletonList(s3Group)); + final OutputStream outputStream = mock(OutputStream.class); doNothing().when(codec).writeEvent(event, outputStream); s3SinkService.output(Collections.singletonList(new Record<>(event))); @@ -299,13 +331,18 @@ void test_output_with_uploadedToS3_failure_does_not_record_byte_count() throws I @Test void test_output_with_no_incoming_records_flushes_batch() throws IOException { - bufferFactory = mock(BufferFactory.class); Buffer buffer = mock(Buffer.class); - when(bufferFactory.getBuffer(any(S3Client.class), any(), any())).thenReturn(buffer); when(buffer.getEventCount()).thenReturn(10); final OutputStream outputStream = mock(OutputStream.class); final Event event = JacksonEvent.fromMessage(UUID.randomUUID().toString()); + + final S3Group s3Group = mock(S3Group.class); + when(s3Group.getBuffer()).thenReturn(buffer); + + when(s3GroupManager.getOrCreateGroupForEvent(event)).thenReturn(s3Group); + when(s3GroupManager.getS3GroupEntries()).thenReturn(Collections.singletonList(s3Group)); + doNothing().when(codec).writeEvent(event, outputStream); final S3SinkService s3SinkService = createObjectUnderTest(); s3SinkService.output(Collections.emptyList()); @@ -316,37 +353,29 @@ void test_output_with_no_incoming_records_flushes_batch() throws IOException { @Test void test_output_with_no_incoming_records_or_buffered_records_short_circuits() throws IOException { - - bufferFactory = mock(BufferFactory.class); - Buffer buffer = mock(Buffer.class); - when(bufferFactory.getBuffer(any(S3Client.class), any(), any())).thenReturn(buffer); - when(buffer.getEventCount()).thenReturn(0); - final long objectSize = random.nextInt(1_000_000) + 10_000; - when(buffer.getSize()).thenReturn(objectSize); - - final OutputStream outputStream = mock(OutputStream.class); - final Event event = JacksonEvent.fromMessage(UUID.randomUUID().toString()); - doNothing().when(codec).writeEvent(event, outputStream); + when(s3GroupManager.hasNoGroups()).thenReturn(true); final S3SinkService s3SinkService = createObjectUnderTest(); s3SinkService.output(Collections.emptyList()); verify(snapshotSuccessCounter, times(0)).increment(); - verify(buffer, times(0)).flushToS3(); } @Test void test_retryFlushToS3_positive() throws InterruptedException, IOException { - - bufferFactory = mock(BufferFactory.class); InMemoryBuffer buffer = mock(InMemoryBuffer.class); doNothing().when(buffer).flushToS3(); - when(bufferFactory.getBuffer(any(S3Client.class), any(), any())).thenReturn(buffer); S3SinkService s3SinkService = createObjectUnderTest(); assertNotNull(s3SinkService); assertNotNull(buffer); OutputStream outputStream = buffer.getOutputStream(); final Event event = JacksonEvent.fromMessage(UUID.randomUUID().toString()); + final S3Group s3Group = mock(S3Group.class); + when(s3Group.getBuffer()).thenReturn(buffer); + + when(s3GroupManager.getOrCreateGroupForEvent(event)).thenReturn(s3Group); + when(s3GroupManager.getS3GroupEntries()).thenReturn(Collections.singletonList(s3Group)); + codec.writeEvent(event, outputStream); final String s3Key = UUID.randomUUID().toString(); boolean isUploadedToS3 = s3SinkService.retryFlushToS3(buffer, s3Key); @@ -355,14 +384,18 @@ void test_retryFlushToS3_positive() throws InterruptedException, IOException { @Test void test_retryFlushToS3_negative() throws InterruptedException, IOException { - bufferFactory = mock(BufferFactory.class); InMemoryBuffer buffer = mock(InMemoryBuffer.class); - when(bufferFactory.getBuffer(any(S3Client.class), any(), any())).thenReturn(buffer); when(s3SinkConfig.getBucketName()).thenReturn(""); S3SinkService s3SinkService = createObjectUnderTest(); assertNotNull(s3SinkService); OutputStream outputStream = buffer.getOutputStream(); final Event event = JacksonEvent.fromMessage(UUID.randomUUID().toString()); + final S3Group s3Group = mock(S3Group.class); + when(s3Group.getBuffer()).thenReturn(buffer); + + when(s3GroupManager.getOrCreateGroupForEvent(event)).thenReturn(s3Group); + when(s3GroupManager.getS3GroupEntries()).thenReturn(Collections.singletonList(s3Group)); + codec.writeEvent(event, outputStream); final String s3Key = UUID.randomUUID().toString(); doThrow(AwsServiceException.class).when(buffer).flushToS3(); @@ -373,74 +406,73 @@ void test_retryFlushToS3_negative() throws InterruptedException, IOException { @Test void output_will_release_all_handles_since_a_flush() throws IOException { - bufferFactory = mock(BufferFactory.class); final Buffer buffer = mock(Buffer.class); - when(bufferFactory.getBuffer(any(S3Client.class), any(), any())).thenReturn(buffer); final long objectSize = random.nextInt(1_000_000) + 10_000; when(buffer.getSize()).thenReturn(objectSize); final OutputStream outputStream = mock(OutputStream.class); final Event event = JacksonEvent.fromMessage(UUID.randomUUID().toString()); + final S3Group s3Group = mock(S3Group.class); + when(s3Group.getBuffer()).thenReturn(buffer); + + when(s3GroupManager.getOrCreateGroupForEvent(any(Event.class))).thenReturn(s3Group); + when(s3GroupManager.getS3GroupEntries()).thenReturn(Collections.singletonList(s3Group)); + doNothing().when(codec).writeEvent(event, outputStream); final S3SinkService s3SinkService = createObjectUnderTest(); final Collection> records = generateRandomStringEventRecord(); final List eventHandles = records.stream().map(Record::getData).map(Event::getEventHandle).map(this::castToDefaultHandle).collect(Collectors.toList()); - for (DefaultEventHandle eventHandle : eventHandles) { - eventHandle.setAcknowledgementSet(acknowledgementSet); - } s3SinkService.output(records); - for (EventHandle eventHandle : eventHandles) { - verify(acknowledgementSet).release(eventHandle, true); + InOrder inOrder = inOrder(s3Group); + for (final EventHandle eventHandle : eventHandles) { + inOrder.verify(s3Group).addEventHandle(eventHandle); } + inOrder.verify(s3Group).releaseEventHandles(true); } @Test void output_will_skip_releasing_events_without_EventHandle_objects() throws IOException { - bufferFactory = mock(BufferFactory.class); final Buffer buffer = mock(Buffer.class); - when(bufferFactory.getBuffer(any(S3Client.class), any(), any())).thenReturn(buffer); - final long objectSize = random.nextInt(1_000_000) + 10_000; when(buffer.getSize()).thenReturn(objectSize); final OutputStream outputStream = mock(OutputStream.class); final Event event1 = JacksonEvent.fromMessage(UUID.randomUUID().toString()); + final S3Group s3Group = mock(S3Group.class); + when(s3Group.getBuffer()).thenReturn(buffer); + + when(s3GroupManager.getOrCreateGroupForEvent(any(Event.class))).thenReturn(s3Group); + doNothing().when(codec).writeEvent(event1, outputStream); final S3SinkService s3SinkService = createObjectUnderTest(); final Collection> records = generateRandomStringEventRecord(); final List eventHandles = records.stream().map(Record::getData).map(Event::getEventHandle).map(this::castToDefaultHandle).collect(Collectors.toList()); - for (DefaultEventHandle eventHandle : eventHandles) { - eventHandle.setAcknowledgementSet(acknowledgementSet); - } s3SinkService.output(records); - for (EventHandle eventHandle : eventHandles) { - verify(acknowledgementSet).release(eventHandle, true); - } final Collection> records2 = generateRandomStringEventRecord(); final List eventHandles2 = records2.stream().map(Record::getData).map(Event::getEventHandle).map(this::castToDefaultHandle).collect(Collectors.toList()); - for (DefaultEventHandle eventHandle : eventHandles2) { - eventHandle.setAcknowledgementSet(acknowledgementSet); - } - s3SinkService.output(records2); - for (EventHandle eventHandle : eventHandles2) { - verify(acknowledgementSet).release(eventHandle, true); + InOrder inOrder = inOrder(s3Group); + for (final EventHandle eventHandle : eventHandles) { + inOrder.verify(s3Group).addEventHandle(eventHandle); + } + inOrder.verify(s3Group).releaseEventHandles(true); + for (final EventHandle eventHandle : eventHandles2) { + inOrder.verify(s3Group).addEventHandle(eventHandle); } + inOrder.verify(s3Group).releaseEventHandles(true); } @Test void output_will_release_all_handles_since_a_flush_when_S3_fails() throws IOException { - bufferFactory = mock(BufferFactory.class); final Buffer buffer = mock(Buffer.class); - when(bufferFactory.getBuffer(any(S3Client.class), any(), any())).thenReturn(buffer); doThrow(AwsServiceException.class).when(buffer).flushToS3(); @@ -449,60 +481,62 @@ void output_will_release_all_handles_since_a_flush_when_S3_fails() throws IOExce final OutputStream outputStream = mock(OutputStream.class); final Event event = JacksonEvent.fromMessage(UUID.randomUUID().toString()); + final S3Group s3Group = mock(S3Group.class); + when(s3Group.getBuffer()).thenReturn(buffer); + + when(s3GroupManager.getOrCreateGroupForEvent(any(Event.class))).thenReturn(s3Group); + doNothing().when(codec).writeEvent(event, outputStream); final S3SinkService s3SinkService = createObjectUnderTest(); final List> records = generateEventRecords(1); final List eventHandles = records.stream().map(Record::getData).map(Event::getEventHandle).map(this::castToDefaultHandle).collect(Collectors.toList()); - for (DefaultEventHandle eventHandle : eventHandles) { - eventHandle.setAcknowledgementSet(acknowledgementSet); - } s3SinkService.output(records); - for (EventHandle eventHandle : eventHandles) { - verify(acknowledgementSet).release(eventHandle, false); + InOrder inOrder = inOrder(s3Group); + for (final EventHandle eventHandle : eventHandles) { + inOrder.verify(s3Group).addEventHandle(eventHandle); } + inOrder.verify(s3Group).releaseEventHandles(false); } @Test void output_will_release_only_new_handles_since_a_flush() throws IOException { - bufferFactory = mock(BufferFactory.class); final Buffer buffer = mock(Buffer.class); - when(bufferFactory.getBuffer(any(S3Client.class), any(), any())).thenReturn(buffer); final long objectSize = random.nextInt(1_000_000) + 10_000; when(buffer.getSize()).thenReturn(objectSize); final OutputStream outputStream = mock(OutputStream.class); final Event event = JacksonEvent.fromMessage(UUID.randomUUID().toString()); + final S3Group s3Group = mock(S3Group.class); + when(s3Group.getBuffer()).thenReturn(buffer); + + when(s3GroupManager.getOrCreateGroupForEvent(any(Event.class))).thenReturn(s3Group); + doNothing().when(codec).writeEvent(event, outputStream); final S3SinkService s3SinkService = createObjectUnderTest(); final Collection> records = generateRandomStringEventRecord(); final List eventHandles = records.stream().map(Record::getData).map(Event::getEventHandle).map(this::castToDefaultHandle).collect(Collectors.toList()); - for (DefaultEventHandle eventHandle : eventHandles) { - eventHandle.setAcknowledgementSet(acknowledgementSet); - } s3SinkService.output(records); - for (EventHandle eventHandle : eventHandles) { - verify(acknowledgementSet).release(eventHandle, true); - } final Collection> records2 = generateRandomStringEventRecord(); final List eventHandles2 = records2.stream().map(Record::getData).map(Event::getEventHandle).map(this::castToDefaultHandle).collect(Collectors.toList()); - for (DefaultEventHandle eventHandle : eventHandles2) { - eventHandle.setAcknowledgementSet(acknowledgementSet); - } s3SinkService.output(records2); - for (EventHandle eventHandle : eventHandles2) { - verify(acknowledgementSet).release(eventHandle, true); - } + InOrder inOrder = inOrder(s3Group); + for (final EventHandle eventHandle : eventHandles) { + inOrder.verify(s3Group).addEventHandle(eventHandle); + } + inOrder.verify(s3Group).releaseEventHandles(true); + for (final EventHandle eventHandle : eventHandles2) { + inOrder.verify(s3Group).addEventHandle(eventHandle); + } + inOrder.verify(s3Group).releaseEventHandles(true); } @Test void output_will_skip_and_drop_failed_records() throws IOException { - bufferFactory = mock(BufferFactory.class); final Buffer buffer = mock(Buffer.class); - when(bufferFactory.getBuffer(any(S3Client.class), any(), any())).thenReturn(buffer); final long objectSize = random.nextInt(1_000_000) + 10_000; when(buffer.getSize()).thenReturn(objectSize); @@ -514,6 +548,11 @@ void output_will_skip_and_drop_failed_records() throws IOException { List> records = generateEventRecords(2); Event event1 = records.get(0).getData(); Event event2 = records.get(1).getData(); + final S3Group s3Group = mock(S3Group.class); + when(s3Group.getBuffer()).thenReturn(buffer); + + when(s3GroupManager.getOrCreateGroupForEvent(any(Event.class))).thenReturn(s3Group); + DefaultEventHandle eventHandle1 = (DefaultEventHandle)event1.getEventHandle(); DefaultEventHandle eventHandle2 = (DefaultEventHandle)event2.getEventHandle(); eventHandle1.setAcknowledgementSet(acknowledgementSet); @@ -523,22 +562,23 @@ void output_will_skip_and_drop_failed_records() throws IOException { createObjectUnderTest().output(records); - InOrder inOrder = inOrder(codec); + InOrder inOrder = inOrder(codec, s3Group); inOrder.verify(codec).start(eq(outputStream), eq(event1), any()); inOrder.verify(codec).writeEvent(event1, outputStream); + inOrder.verify(s3Group, never()).addEventHandle(eventHandle1); + inOrder.verify(s3Group).releaseEventHandles(true); inOrder.verify(codec).writeEvent(event2, outputStream); + inOrder.verify(s3Group).addEventHandle(eventHandle2); + inOrder.verify(s3Group).releaseEventHandles(true); verify(acknowledgementSet).release(eventHandle1, false); verify(acknowledgementSet, never()).release(eventHandle1, true); - verify(acknowledgementSet).release(eventHandle2, true); verify(acknowledgementSet, never()).release(eventHandle2, false); } @Test void output_will_release_only_new_handles_since_a_flush_when_S3_fails() throws IOException { - bufferFactory = mock(BufferFactory.class); final Buffer buffer = mock(Buffer.class); - when(bufferFactory.getBuffer(any(S3Client.class), any(), any())).thenReturn(buffer); doThrow(AwsServiceException.class).when(buffer).flushToS3(); @@ -546,29 +586,30 @@ void output_will_release_only_new_handles_since_a_flush_when_S3_fails() throws I when(buffer.getSize()).thenReturn(objectSize); final OutputStream outputStream = mock(OutputStream.class); - final Event event = JacksonEvent.fromMessage(UUID.randomUUID().toString()); - doNothing().when(codec).writeEvent(event, outputStream); + final S3Group s3Group = mock(S3Group.class); + when(s3Group.getBuffer()).thenReturn(buffer); + + when(s3GroupManager.getOrCreateGroupForEvent(any(Event.class))).thenReturn(s3Group); + + doNothing().when(codec).writeEvent(any(Event.class), eq(outputStream)); final S3SinkService s3SinkService = createObjectUnderTest(); final List> records = generateEventRecords(1); final List eventHandles = records.stream().map(Record::getData).map(Event::getEventHandle).map(this::castToDefaultHandle).collect(Collectors.toList()); - for (DefaultEventHandle eventHandle : eventHandles) { - eventHandle.setAcknowledgementSet(acknowledgementSet); - } s3SinkService.output(records); - for (EventHandle eventHandle : eventHandles) { - verify(acknowledgementSet).release(eventHandle, false); - } final List> records2 = generateEventRecords(1); final List eventHandles2 = records2.stream().map(Record::getData).map(Event::getEventHandle).map(this::castToDefaultHandle).collect(Collectors.toList()); - for (DefaultEventHandle eventHandle : eventHandles2) { - eventHandle.setAcknowledgementSet(acknowledgementSet); - } s3SinkService.output(records2); - for (EventHandle eventHandle : eventHandles2) { - verify(acknowledgementSet).release(eventHandle, false); + InOrder inOrder = inOrder(s3Group); + for (final EventHandle eventHandle : eventHandles) { + inOrder.verify(s3Group).addEventHandle(eventHandle); + } + inOrder.verify(s3Group).releaseEventHandles(false); + for (final EventHandle eventHandle : eventHandles2) { + inOrder.verify(s3Group).addEventHandle(eventHandle); } + inOrder.verify(s3Group).releaseEventHandles(false); } diff --git a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkTest.java b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkTest.java index 553f96d2fb..25f26f07b2 100644 --- a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkTest.java +++ b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkTest.java @@ -10,6 +10,7 @@ import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier; +import org.opensearch.dataprepper.expression.ExpressionEvaluator; import org.opensearch.dataprepper.model.codec.OutputCodec; import org.opensearch.dataprepper.model.configuration.PluginModel; import org.opensearch.dataprepper.model.configuration.PluginSetting; @@ -37,6 +38,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; @@ -58,6 +60,7 @@ class S3SinkTest { private PluginFactory pluginFactory; private AwsCredentialsSupplier awsCredentialsSupplier; private SinkContext sinkContext; + private ExpressionEvaluator expressionEvaluator; private OutputCodec codec; @BeforeEach @@ -73,6 +76,7 @@ void setUp() { PluginModel pluginModel = mock(PluginModel.class); pluginFactory = mock(PluginFactory.class); awsCredentialsSupplier = mock(AwsCredentialsSupplier.class); + expressionEvaluator = mock(ExpressionEvaluator.class); when(s3SinkConfig.getBufferType()).thenReturn(BufferTypeOptions.INMEMORY); when(s3SinkConfig.getThresholdOptions()).thenReturn(thresholdOptions); @@ -89,10 +93,12 @@ void setUp() { when(pluginSetting.getName()).thenReturn(SINK_PLUGIN_NAME); when(pluginSetting.getPipelineName()).thenReturn(SINK_PIPELINE_NAME); when(s3SinkConfig.getBucketName()).thenReturn(BUCKET_NAME); + when(s3SinkConfig.getObjectKeyOptions()).thenReturn(objectKeyOptions); + when(expressionEvaluator.isValidFormatExpression(anyString())).thenReturn(true); } private S3Sink createObjectUnderTest() { - return new S3Sink(pluginSetting, s3SinkConfig, pluginFactory, sinkContext, awsCredentialsSupplier); + return new S3Sink(pluginSetting, s3SinkConfig, pluginFactory, sinkContext, awsCredentialsSupplier, expressionEvaluator); } @Test diff --git a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/ObjectKeyTest.java b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/ObjectKeyTest.java index 1d10b7b7a3..3377b2b516 100644 --- a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/ObjectKeyTest.java +++ b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/ObjectKeyTest.java @@ -11,9 +11,8 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; -import org.opensearch.dataprepper.model.configuration.PluginModel; -import org.opensearch.dataprepper.model.configuration.PluginSetting; -import org.opensearch.dataprepper.model.plugin.PluginFactory; +import org.opensearch.dataprepper.expression.ExpressionEvaluator; +import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.plugins.sink.s3.S3SinkConfig; import org.opensearch.dataprepper.plugins.sink.s3.configuration.ObjectKeyOptions; @@ -24,18 +23,16 @@ @ExtendWith(MockitoExtension.class) class ObjectKeyTest { - @Mock - private ObjectKey objectKey; @Mock private S3SinkConfig s3SinkConfig; @Mock - private PluginModel pluginModel; - @Mock - private PluginSetting pluginSetting; + private ObjectKeyOptions objectKeyOptions; + @Mock - private PluginFactory pluginFactory; + private ExpressionEvaluator expressionEvaluator; + @Mock - private ObjectKeyOptions objectKeyOptions; + private Event event; @BeforeEach void setUp() throws Exception { @@ -44,38 +41,46 @@ void setUp() throws Exception { @Test void test_buildingPathPrefix() { + final String pathPrefix = "events/%{yyyy}/%{MM}/%{dd}/"; - when(objectKeyOptions.getPathPrefix()).thenReturn("events/%{yyyy}/%{MM}/%{dd}/"); - String pathPrefix = ObjectKey.buildingPathPrefix(s3SinkConfig); - Assertions.assertNotNull(pathPrefix); - assertThat(pathPrefix, startsWith("events")); + when(objectKeyOptions.getPathPrefix()).thenReturn(pathPrefix); + when(event.formatString(pathPrefix, expressionEvaluator)).thenReturn(pathPrefix); + String pathPrefixResult = ObjectKey.buildingPathPrefix(s3SinkConfig, event, expressionEvaluator); + Assertions.assertNotNull(pathPrefixResult); + assertThat(pathPrefixResult, startsWith("events")); } @Test void test_objectFileName() { + final String namePattern = "my-elb-%{yyyy-MM-dd'T'hh-mm-ss}"; - when(objectKeyOptions.getNamePattern()).thenReturn("my-elb-%{yyyy-MM-dd'T'hh-mm-ss}"); - String objectFileName = ObjectKey.objectFileName(s3SinkConfig, null); + when(objectKeyOptions.getNamePattern()).thenReturn(namePattern); + when(event.formatString(namePattern, expressionEvaluator)).thenReturn(namePattern); + String objectFileName = ObjectKey.objectFileName(s3SinkConfig, null, event, expressionEvaluator); Assertions.assertNotNull(objectFileName); assertThat(objectFileName, startsWith("my-elb")); } @Test void test_objectFileName_with_fileExtension() { + final String namePattern = "events-%{yyyy-MM-dd'T'hh-mm-ss}.pdf"; when(s3SinkConfig.getObjectKeyOptions().getNamePattern()) - .thenReturn("events-%{yyyy-MM-dd'T'hh-mm-ss}.pdf"); - String objectFileName = ObjectKey.objectFileName(s3SinkConfig, null); + .thenReturn(namePattern); + when(event.formatString(namePattern, expressionEvaluator)).thenReturn(namePattern); + String objectFileName = ObjectKey.objectFileName(s3SinkConfig, null, event, expressionEvaluator); Assertions.assertNotNull(objectFileName); Assertions.assertTrue(objectFileName.contains(".pdf")); } @Test void test_objectFileName_default_fileExtension() { + final String namePattern = "events-%{yyyy-MM-dd'T'hh-mm-ss}"; when(s3SinkConfig.getObjectKeyOptions().getNamePattern()) - .thenReturn("events-%{yyyy-MM-dd'T'hh-mm-ss}"); - String objectFileName = ObjectKey.objectFileName(s3SinkConfig, null); + .thenReturn(namePattern); + when(event.formatString(namePattern, expressionEvaluator)).thenReturn(namePattern); + String objectFileName = ObjectKey.objectFileName(s3SinkConfig, null, event, expressionEvaluator); Assertions.assertNotNull(objectFileName); Assertions.assertTrue(objectFileName.contains(".json")); } diff --git a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/grouping/S3GroupIdentifierFactoryTest.java b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/grouping/S3GroupIdentifierFactoryTest.java new file mode 100644 index 0000000000..e76f8ecaaf --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/grouping/S3GroupIdentifierFactoryTest.java @@ -0,0 +1,106 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.s3.grouping; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.opensearch.dataprepper.expression.ExpressionEvaluator; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.plugins.sink.s3.KeyGenerator; +import org.opensearch.dataprepper.plugins.sink.s3.S3SinkConfig; +import org.opensearch.dataprepper.plugins.sink.s3.configuration.ObjectKeyOptions; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.notNullValue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class S3GroupIdentifierFactoryTest { + + private KeyGenerator keyGenerator; + + private S3SinkConfig s3SinkConfig; + + private ExpressionEvaluator expressionEvaluator; + + @BeforeEach + void setup() { + keyGenerator = mock(KeyGenerator.class); + expressionEvaluator = mock(ExpressionEvaluator.class); + s3SinkConfig = mock(S3SinkConfig.class); + + final String pathPrefix = UUID.randomUUID().toString(); + final String objectName = UUID.randomUUID().toString(); + final ObjectKeyOptions objectKeyOptions = mock(ObjectKeyOptions.class); + when(objectKeyOptions.getNamePattern()).thenReturn(objectName); + when(objectKeyOptions.getPathPrefix()).thenReturn(pathPrefix); + + when(s3SinkConfig.getObjectKeyOptions()).thenReturn(objectKeyOptions); + } + + private S3GroupIdentifierFactory createObjectUnderTest() { + return new S3GroupIdentifierFactory(keyGenerator, expressionEvaluator, s3SinkConfig); + } + + @Test + void getS3GroupIdentifierForEvent_returns_expected_s3GroupIdentifier() { + final String dynamicKeyPathPrefix = UUID.randomUUID().toString(); + final String dynamicValuePathPrefix = UUID.randomUUID().toString(); + final String dynamicExpressionPathPrefix = UUID.randomUUID().toString(); + final String dynamicExpressionResultPathPrefix = UUID.randomUUID().toString(); + final String dynamicKeyObjectName = UUID.randomUUID().toString(); + final String dynamicValueObjectName = UUID.randomUUID().toString(); + final String dynamicExpressionObjectName = UUID.randomUUID().toString(); + final String dynamicExpressionResultObjectName = UUID.randomUUID().toString(); + + final Map expectedIdentificationHash = Map.of( + dynamicKeyPathPrefix, dynamicValuePathPrefix, + dynamicExpressionPathPrefix, dynamicExpressionResultPathPrefix, + dynamicKeyObjectName, dynamicValueObjectName, + dynamicExpressionObjectName, dynamicExpressionResultObjectName + ); + final String expectedFullObjectKey = UUID.randomUUID().toString(); + final Event event = mock(Event.class); + + final List expectedDynamicKeysPathPrefix = new ArrayList<>(); + expectedDynamicKeysPathPrefix.add(dynamicKeyPathPrefix); + + final List expectedDynamicExpressionsPathPrefix = new ArrayList<>(); + expectedDynamicExpressionsPathPrefix.add(dynamicExpressionPathPrefix); + + when(expressionEvaluator.extractDynamicExpressionsFromFormatExpression(s3SinkConfig.getObjectKeyOptions().getPathPrefix())) + .thenReturn(expectedDynamicExpressionsPathPrefix); + when(expressionEvaluator.extractDynamicKeysFromFormatExpression(s3SinkConfig.getObjectKeyOptions().getPathPrefix())) + .thenReturn(expectedDynamicKeysPathPrefix); + when(event.get(dynamicKeyPathPrefix, Object.class)).thenReturn(dynamicValuePathPrefix); + when(expressionEvaluator.evaluate(dynamicExpressionPathPrefix, event)) + .thenReturn(dynamicExpressionResultPathPrefix); + + when(expressionEvaluator.extractDynamicExpressionsFromFormatExpression(s3SinkConfig.getObjectKeyOptions().getNamePattern())) + .thenReturn(List.of(dynamicExpressionObjectName)); + when(expressionEvaluator.extractDynamicKeysFromFormatExpression(s3SinkConfig.getObjectKeyOptions().getNamePattern())) + .thenReturn(List.of(dynamicKeyObjectName)); + when(event.get(dynamicKeyObjectName, Object.class)).thenReturn(dynamicValueObjectName); + when(expressionEvaluator.evaluate(dynamicExpressionObjectName, event)) + .thenReturn(dynamicExpressionResultObjectName); + + when(keyGenerator.generateKeyForEvent(event)).thenReturn(expectedFullObjectKey); + + final S3GroupIdentifierFactory objectUnderTest = createObjectUnderTest(); + + final S3GroupIdentifier result = objectUnderTest.getS3GroupIdentifierForEvent(event); + + assertThat(result, notNullValue()); + assertThat(result.getGroupIdentifierFullObjectKey(), equalTo(expectedFullObjectKey)); + assertThat(result.getGroupIdentifierHash(), equalTo(expectedIdentificationHash)); + } +} diff --git a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/grouping/S3GroupIdentifierTest.java b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/grouping/S3GroupIdentifierTest.java new file mode 100644 index 0000000000..e8388a3b41 --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/grouping/S3GroupIdentifierTest.java @@ -0,0 +1,46 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.s3.grouping; + + +import org.junit.jupiter.api.Test; + +import java.util.Map; +import java.util.UUID; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertNotEquals; + +public class S3GroupIdentifierTest { + + @Test + void S3GroupIdentifier_with_the_same_identificationHash_and_different_fullObjectKey_are_considered_equal() { + final Map identificationHash = Map.of(UUID.randomUUID().toString(), UUID.randomUUID().toString()); + final String groupOneFullObjectKey = UUID.randomUUID().toString(); + final String groupTwoFullObjectKey = UUID.randomUUID().toString(); + + final S3GroupIdentifier s3GroupIdentifier = new S3GroupIdentifier(identificationHash, groupOneFullObjectKey); + final S3GroupIdentifier seconds3GroupIdentifier = new S3GroupIdentifier(identificationHash, groupTwoFullObjectKey); + + assertThat(s3GroupIdentifier.equals(seconds3GroupIdentifier), equalTo(true)); + assertThat(s3GroupIdentifier.hashCode(), equalTo(seconds3GroupIdentifier.hashCode())); + } + + @Test + void S3GroupIdentifier_with_different_identificationHash_is_not_considered_equal() { + final Map identificationHashOne = Map.of(UUID.randomUUID().toString(), UUID.randomUUID().toString()); + final Map identificationHashTwo = Map.of(UUID.randomUUID().toString(), UUID.randomUUID().toString()); + final String groupOneFullObjectKey = UUID.randomUUID().toString(); + final String groupTwoFullObjectKey = UUID.randomUUID().toString(); + + final S3GroupIdentifier s3GroupIdentifier = new S3GroupIdentifier(identificationHashOne, groupOneFullObjectKey); + final S3GroupIdentifier seconds3GroupIdentifier = new S3GroupIdentifier(identificationHashTwo, groupTwoFullObjectKey); + + assertThat(s3GroupIdentifier.equals(seconds3GroupIdentifier), equalTo(false)); + assertNotEquals(s3GroupIdentifier.hashCode(), seconds3GroupIdentifier.hashCode()); + } +} diff --git a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/grouping/S3GroupManagerTest.java b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/grouping/S3GroupManagerTest.java new file mode 100644 index 0000000000..e2133ef9b6 --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/grouping/S3GroupManagerTest.java @@ -0,0 +1,119 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.s3.grouping; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.plugins.sink.s3.S3SinkConfig; +import org.opensearch.dataprepper.plugins.sink.s3.accumulator.Buffer; +import org.opensearch.dataprepper.plugins.sink.s3.accumulator.BufferFactory; +import software.amazon.awssdk.services.s3.S3Client; + +import java.util.Collection; +import java.util.function.Supplier; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.notNullValue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +public class S3GroupManagerTest { + + @Mock + private S3SinkConfig s3SinkConfig; + + @Mock + private S3GroupIdentifierFactory s3GroupIdentifierFactory; + + @Mock + private BufferFactory bufferFactory; + + @Mock + private S3Client s3Client; + + private S3GroupManager createObjectUnderTest() { + return new S3GroupManager(s3SinkConfig, s3GroupIdentifierFactory, bufferFactory, s3Client); + } + + @Test + void hasNoGroups_returns_true_when_there_are_no_groups() { + final S3GroupManager objectUnderTest = createObjectUnderTest(); + + assertThat(objectUnderTest.hasNoGroups(), equalTo(true)); + } + + @Test + void getOrCreateGroupForEvent_creates_expected_group_when_it_does_not_exist() { + final Event event = mock(Event.class); + final S3GroupIdentifier s3GroupIdentifier = mock(S3GroupIdentifier.class); + when(s3GroupIdentifierFactory.getS3GroupIdentifierForEvent(event)).thenReturn(s3GroupIdentifier); + + final Buffer buffer = mock(Buffer.class); + when(bufferFactory.getBuffer(eq(s3Client), any(Supplier.class), any(Supplier.class))) + .thenReturn(buffer); + + final S3GroupManager objectUnderTest = createObjectUnderTest(); + + final S3Group result = objectUnderTest.getOrCreateGroupForEvent(event); + + assertThat(result, notNullValue()); + assertThat(result.getS3GroupIdentifier(), equalTo(s3GroupIdentifier)); + assertThat(result.getBuffer(), equalTo(buffer)); + + final Collection groups = objectUnderTest.getS3GroupEntries(); + assertThat(groups, notNullValue()); + assertThat(groups.size(), equalTo(1)); + + assertThat(groups.contains(result), equalTo(true)); + assertThat(objectUnderTest.hasNoGroups(), equalTo(false)); + } + + @Test + void getOrCreateGroupForEvent_returns_expected_group_when_it_exists() { + final Event event = mock(Event.class); + final S3GroupIdentifier s3GroupIdentifier = mock(S3GroupIdentifier.class); + when(s3GroupIdentifierFactory.getS3GroupIdentifierForEvent(event)).thenReturn(s3GroupIdentifier); + + final Buffer buffer = mock(Buffer.class); + when(bufferFactory.getBuffer(eq(s3Client), any(Supplier.class), any(Supplier.class))) + .thenReturn(buffer); + + final S3GroupManager objectUnderTest = createObjectUnderTest(); + + final S3Group result = objectUnderTest.getOrCreateGroupForEvent(event); + + assertThat(result, notNullValue()); + assertThat(result.getS3GroupIdentifier(), equalTo(s3GroupIdentifier)); + assertThat(result.getBuffer(), equalTo(buffer)); + + final Event secondEvent = mock(Event.class); + when(s3GroupIdentifierFactory.getS3GroupIdentifierForEvent(secondEvent)).thenReturn(s3GroupIdentifier); + final S3Group secondResult = objectUnderTest.getOrCreateGroupForEvent(secondEvent); + + assertThat(secondResult, notNullValue()); + assertThat(secondResult.getS3GroupIdentifier(), equalTo(s3GroupIdentifier)); + assertThat(secondResult.getBuffer(), equalTo(buffer)); + + verify(bufferFactory, times(1)).getBuffer(eq(s3Client), any(Supplier.class), any(Supplier.class)); + + final Collection groups = objectUnderTest.getS3GroupEntries(); + assertThat(groups, notNullValue()); + assertThat(groups.size(), equalTo(1)); + + assertThat(groups.contains(result), equalTo(true)); + assertThat(groups.contains(secondResult), equalTo(true)); + assertThat(objectUnderTest.hasNoGroups(), equalTo(false)); + } +} diff --git a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/grouping/S3GroupTest.java b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/grouping/S3GroupTest.java new file mode 100644 index 0000000000..6377a6365a --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/grouping/S3GroupTest.java @@ -0,0 +1,35 @@ +package org.opensearch.dataprepper.plugins.sink.s3.grouping; + +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; +import org.opensearch.dataprepper.model.event.EventHandle; +import org.opensearch.dataprepper.plugins.sink.s3.accumulator.Buffer; + +import java.util.Collection; +import java.util.List; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + +public class S3GroupTest { + + @ParameterizedTest + @ValueSource(booleans = { true, false }) + void releasingEventHandles_releases_all_event_handles(final boolean result) { + final S3GroupIdentifier s3GroupIdentifier = mock(S3GroupIdentifier.class); + final Buffer buffer = mock(Buffer.class); + final S3Group objectUnderTest = new S3Group(s3GroupIdentifier, buffer); + final Collection eventHandles = List.of(mock(EventHandle.class), mock(EventHandle.class)); + + for (final EventHandle eventHandle : eventHandles) { + objectUnderTest.addEventHandle(eventHandle); + } + + objectUnderTest.releaseEventHandles(result); + + for (final EventHandle eventHandle : eventHandles) { + verify(eventHandle).release(result); + } + + } +}