Skip to content

Commit

Permalink
Track event handles in S3Group, add integration test
Browse files Browse the repository at this point in the history
Signed-off-by: Taylor Gray <tylgry@amazon.com>
  • Loading branch information
graytaylor0 committed Mar 29, 2024
1 parent 3a6ad06 commit 9b506d9
Show file tree
Hide file tree
Showing 11 changed files with 231 additions and 102 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ public class S3ObjectIndexUtility {
// For a string like "data-prepper-%{yyyy-MM}", "yyyy-MM" is matched.
private static final String TIME_PATTERN_INTERNAL_EXTRACTOR_REGULAR_EXPRESSION = "\\%\\{(.*?)\\}";

private static final Pattern DATE_TIME_PATTERN = Pattern.compile(TIME_PATTERN_INTERNAL_EXTRACTOR_REGULAR_EXPRESSION);

private static final ZoneId UTC_ZONE_ID = ZoneId.of(TimeZone.getTimeZone("UTC").getID());

S3ObjectIndexUtility() {
Expand Down Expand Up @@ -76,8 +78,7 @@ public static long getTimeNanos() {
* @return returns date time formatter
*/
public static DateTimeFormatter validateAndGetDateTimeFormatter(final String indexAlias) {
final Pattern pattern = Pattern.compile(TIME_PATTERN_INTERNAL_EXTRACTOR_REGULAR_EXPRESSION);
final Matcher timePatternMatcher = pattern.matcher(indexAlias);
final Matcher timePatternMatcher = DATE_TIME_PATTERN.matcher(indexAlias);
if (timePatternMatcher.find()) {
final String timePattern = timePatternMatcher.group(1);
if (timePatternMatcher.find()) { // check if there is a one more match.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,25 @@ public void validate(int expectedRecords, final List<Map<String, Object>> sample
assertThat(sampledData, equalTo(sampleEventData.size()));
}

public void validateDynamicPartition(int expectedRecords, int partitionNumber, final File actualContentFile, final CompressionScenario compressionScenario) throws IOException {
final InputStream inputStream = new BufferedInputStream(new FileInputStream(actualContentFile), 64 * 1024);

final Scanner scanner = new Scanner(inputStream);

int count = 0;
while (scanner.hasNext()) {

final String actualJsonString = scanner.next();

final Map<String, Object> actualData = OBJECT_MAPPER.readValue(actualJsonString, Map.class);
assertThat(actualData.get("sequence"), equalTo(partitionNumber));

count++;
}

assertThat(count, equalTo(expectedRecords));
}

@Override
public String toString() {
return "NDJSON";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
import org.apache.commons.io.IOUtils;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.junit.jupiter.api.io.TempDir;
Expand Down Expand Up @@ -187,6 +189,11 @@ void test(final OutputScenario outputScenario,
int expectedTotalSize = sizeCombination.getTotalSize();
when(thresholdOptions.getEventCount()).thenReturn(expectedTotalSize);

when(expressionEvaluator.extractDynamicExpressionsFromFormatExpression(anyString()))
.thenReturn(Collections.emptyList());
when(expressionEvaluator.extractDynamicKeysFromFormatExpression(anyString()))
.thenReturn(Collections.emptyList());

final S3Sink objectUnderTest = createObjectUnderTest();

final int maxEventDataToSample = 2000;
Expand Down Expand Up @@ -235,6 +242,86 @@ void test(final OutputScenario outputScenario,
outputScenario.validate(expectedTotalSize, sampleEventData, actualContentFile, compressionScenario);
}

@Test
@Disabled
void testWithDynamicGroups() throws IOException {
final BufferScenario bufferScenario = new InMemoryBufferScenario();
final CompressionScenario compressionScenario = new NoneCompressionScenario();
final NdjsonOutputScenario outputScenario = new NdjsonOutputScenario();
final SizeCombination sizeCombination = SizeCombination.MEDIUM_SMALLER;

BufferTypeOptions bufferTypeOptions = bufferScenario.getBufferType();
String testRun = "grouping-" + outputScenario + "-" + bufferTypeOptions + "-" + compressionScenario + "-" + sizeCombination.getBatchSize() + "-" + sizeCombination.getNumberOfBatches();

final String pathPrefix = "folder-${/sequence}";
final List<String> dynamicKeys = new ArrayList<>();
dynamicKeys.add("/sequence");

when(objectKeyOptions.getPathPrefix()).thenReturn(pathPrefix + "/");

when(expressionEvaluator.extractDynamicExpressionsFromFormatExpression(pathPrefix))
.thenReturn(Collections.emptyList());
when(expressionEvaluator.extractDynamicKeysFromFormatExpression(pathPrefix))
.thenReturn(dynamicKeys);
when(expressionEvaluator.extractDynamicKeysFromFormatExpression(objectKeyOptions.getNamePattern()))
.thenReturn(Collections.emptyList());
when(expressionEvaluator.extractDynamicExpressionsFromFormatExpression(objectKeyOptions.getNamePattern()))
.thenReturn(Collections.emptyList());

when(pluginFactory.loadPlugin(eq(OutputCodec.class), any())).thenReturn(outputScenario.getCodec());
when(s3SinkConfig.getBufferType()).thenReturn(bufferTypeOptions);
when(s3SinkConfig.getCompression()).thenReturn(compressionScenario.getCompressionOption());
int expectedTotalSize = sizeCombination.getTotalSize();
when(thresholdOptions.getEventCount()).thenReturn(expectedTotalSize / sizeCombination.getBatchSize());

final S3Sink objectUnderTest = createObjectUnderTest();

final int maxEventDataToSample = 2000;
final List<Map<String, Object>> sampleEventData = new ArrayList<>(maxEventDataToSample);
for (int batchNumber = 0; batchNumber < sizeCombination.getNumberOfBatches(); batchNumber++) {
final int currentBatchNumber = batchNumber;
final List<Record<Event>> events = IntStream.range(0, sizeCombination.getBatchSize())
.mapToObj(sequence -> generateEventData((currentBatchNumber + 1) * (sequence + 1)))
.peek(data -> {
if (sampleEventData.size() < maxEventDataToSample)
sampleEventData.add(data);
})
.map(this::generateTestEvent)
.map(Record::new)
.collect(Collectors.toList());

LOG.debug("Writing dynamic batch {} with size {}.", currentBatchNumber, events.size());
objectUnderTest.doOutput(events);
}

LOG.info("Listing S3 path prefix: {}", pathPrefix);

final ListObjectsV2Response listObjectsResponse = s3Client.listObjectsV2(ListObjectsV2Request.builder()
.bucket(bucketName)
.prefix("folder-0" + "/")
.build());

assertThat(listObjectsResponse.contents(), notNullValue());
assertThat(listObjectsResponse.contents().size(), equalTo(1));

final S3Object s3Object = listObjectsResponse.contents().get(0);

final File target = new File(s3FileLocation, testRun + ".original");

LOG.info("Downloading S3 object to local file {}.", target);

GetObjectRequest getObjectRequest = GetObjectRequest.builder()
.bucket(bucketName)
.key(s3Object.key())
.build();
s3Client.getObject(getObjectRequest, target.toPath());

File actualContentFile = decompressFileIfNecessary(outputScenario, compressionScenario, testRun, target);

LOG.info("Validating output. totalSize={}; sampleDataSize={}", expectedTotalSize, sampleEventData.size());
outputScenario.validateDynamicPartition(expectedTotalSize, 0, actualContentFile, compressionScenario);
}

private File decompressFileIfNecessary(OutputScenario outputScenario, CompressionScenario compressionScenario, String pathPrefix, File target) throws IOException {

if (outputScenario.isCompressionInternal() || !compressionScenario.requiresDecompression())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.codec.OutputCodec;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventHandle;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.sink.OutputCodecContext;
import org.opensearch.dataprepper.model.types.ByteCount;
Expand All @@ -27,7 +26,6 @@
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
Expand All @@ -46,7 +44,6 @@ public class S3SinkService {
static final String S3_OBJECTS_SIZE = "s3SinkObjectSizeBytes";
private final S3SinkConfig s3SinkConfig;
private final Lock reentrantLock;
private final Collection<EventHandle> bufferedEventHandles;
private final OutputCodec codec;
private final S3Client s3Client;
private final int maxEvents;
Expand Down Expand Up @@ -82,8 +79,6 @@ public S3SinkService(final S3SinkConfig s3SinkConfig, final OutputCodec codec,
this.retrySleepTime = retrySleepTime;
reentrantLock = new ReentrantLock();

bufferedEventHandles = new LinkedList<>();

maxEvents = s3SinkConfig.getThresholdOptions().getEventCount();
maxBytes = s3SinkConfig.getThresholdOptions().getMaximumSize();
maxCollectionDuration = s3SinkConfig.getThresholdOptions().getEventCollectTimeOut();
Expand Down Expand Up @@ -126,8 +121,7 @@ void output(Collection<Record<Event>> records) {
codec.writeEvent(event, currentBuffer.getOutputStream());
int count = currentBuffer.getEventCount() + 1;
currentBuffer.setEventCount(count);

bufferedEventHandles.add(event.getEventHandle());
s3Group.addEventHandle(event.getEventHandle());
} catch (Exception ex) {
if(sampleException == null) {
sampleException = ex;
Expand All @@ -136,15 +130,15 @@ void output(Collection<Record<Event>> records) {
failedEvents.add(event);
}

final boolean flushed = flushToS3IfNeeded(currentBuffer);
final boolean flushed = flushToS3IfNeeded(s3Group);

if (flushed) {
s3GroupManager.removeGroup(s3Group);
}
}

for (final S3Group s3Group : s3GroupManager.getS3GroupEntries()) {
final boolean flushed = flushToS3IfNeeded(s3Group.getBuffer());
final boolean flushed = flushToS3IfNeeded(s3Group);

if (flushed) {
s3GroupManager.removeGroup(s3Group);
Expand All @@ -163,38 +157,30 @@ void output(Collection<Record<Event>> records) {
}
}

private void releaseEventHandles(final boolean result) {
for (EventHandle eventHandle : bufferedEventHandles) {
eventHandle.release(result);
}

bufferedEventHandles.clear();
}

/**
* @return whether the flush was attempted
*/
private boolean flushToS3IfNeeded(final Buffer currentBuffer) {
private boolean flushToS3IfNeeded(final S3Group s3Group) {
LOG.trace("Flush to S3 check: currentBuffer.size={}, currentBuffer.events={}, currentBuffer.duration={}",
currentBuffer.getSize(), currentBuffer.getEventCount(), currentBuffer.getDuration());
if (ThresholdCheck.checkThresholdExceed(currentBuffer, maxEvents, maxBytes, maxCollectionDuration)) {
s3Group.getBuffer().getSize(), s3Group.getBuffer().getEventCount(), s3Group.getBuffer().getDuration());
if (ThresholdCheck.checkThresholdExceed(s3Group.getBuffer(), maxEvents, maxBytes, maxCollectionDuration)) {
try {
codec.complete(currentBuffer.getOutputStream());
String s3Key = currentBuffer.getKey();
codec.complete(s3Group.getBuffer().getOutputStream());
String s3Key = s3Group.getBuffer().getKey();
LOG.info("Writing {} to S3 with {} events and size of {} bytes.",
s3Key, currentBuffer.getEventCount(), currentBuffer.getSize());
final boolean isFlushToS3 = retryFlushToS3(currentBuffer, s3Key);
s3Key, s3Group.getBuffer().getEventCount(), s3Group.getBuffer().getSize());
final boolean isFlushToS3 = retryFlushToS3(s3Group.getBuffer(), s3Key);
if (isFlushToS3) {
LOG.info("Successfully saved {} to S3.", s3Key);
numberOfRecordsSuccessCounter.increment(currentBuffer.getEventCount());
numberOfRecordsSuccessCounter.increment(s3Group.getBuffer().getEventCount());
objectsSucceededCounter.increment();
s3ObjectSizeSummary.record(currentBuffer.getSize());
releaseEventHandles(true);
s3ObjectSizeSummary.record(s3Group.getBuffer().getSize());
s3Group.releaseEventHandles(true);
} else {
LOG.error("Failed to save {} to S3.", s3Key);
numberOfRecordsFailedCounter.increment(currentBuffer.getEventCount());
numberOfRecordsFailedCounter.increment(s3Group.getBuffer().getEventCount());
objectsFailedCounter.increment();
releaseEventHandles(false);
s3Group.releaseEventHandles(false);
}

return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,11 +84,4 @@ public static String objectFileName(final S3SinkConfig s3SinkConfig,
(codecExtension!=null? codecExtension : DEFAULT_CODEC_FILE_EXTENSION);
}
}

public static String objectFileNameWithoutDateTimeAndCodecInjection(final S3SinkConfig s3SinkConfig,
final Event event,
final ExpressionEvaluator expressionEvaluator) {
String configNamePattern = s3SinkConfig.getObjectKeyOptions().getNamePattern();
return event.formatString(configNamePattern, expressionEvaluator);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,46 @@

package org.opensearch.dataprepper.plugins.sink.s3.grouping;

import org.opensearch.dataprepper.model.event.EventHandle;
import org.opensearch.dataprepper.plugins.sink.s3.accumulator.Buffer;

import java.util.Collection;
import java.util.LinkedList;

public class S3Group {

private final Buffer buffer;

private final S3GroupIdentifier s3GroupIdentifier;

private final Collection<EventHandle> groupEventHandles;

public S3Group(final S3GroupIdentifier s3GroupIdentifier,
final Buffer buffer) {
this.buffer = buffer;
this.s3GroupIdentifier = s3GroupIdentifier;
this.groupEventHandles = new LinkedList<>();
}

public Buffer getBuffer() {
return buffer;
}

S3GroupIdentifier getS3GroupIdentifier() { return s3GroupIdentifier; }

public void addEventHandle(final EventHandle eventHandle) {
groupEventHandles.add(eventHandle);
}

public void releaseEventHandles(final boolean result) {
for (EventHandle eventHandle : groupEventHandles) {
eventHandle.release(result);
}

groupEventHandles.clear();
}

Collection<EventHandle> getGroupEventHandles() {
return groupEventHandles;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,21 @@

import com.google.common.collect.Maps;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.log.Log;
import org.opensearch.dataprepper.plugins.sink.s3.S3SinkConfig;
import org.opensearch.dataprepper.plugins.sink.s3.accumulator.Buffer;
import org.opensearch.dataprepper.plugins.sink.s3.accumulator.BufferFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.s3.S3Client;

import java.util.Collection;
import java.util.Map;

public class S3GroupManager {

private static final Logger LOG = LoggerFactory.getLogger(S3GroupManager.class);

private final Map<S3GroupIdentifier, S3Group> allGroups = Maps.newConcurrentMap();
private final S3SinkConfig s3SinkConfig;
private final S3GroupIdentifierFactory s3GroupIdentifierFactory;
Expand Down Expand Up @@ -51,11 +56,13 @@ public S3Group getOrCreateGroupForEvent(final Event event) {
final S3GroupIdentifier s3GroupIdentifier = s3GroupIdentifierFactory.getS3GroupIdentifierForEvent(event);

if (allGroups.containsKey(s3GroupIdentifier)) {
LOG.debug("An S3 group already exists for the Event. Total number of groups: {}", allGroups.size());
return allGroups.get(s3GroupIdentifier);
} else {
final Buffer bufferForNewGroup = bufferFactory.getBuffer(s3Client, s3SinkConfig::getBucketName, s3GroupIdentifier::getGroupIdentifierFullObjectKey);
final S3Group s3Group = new S3Group(s3GroupIdentifier, bufferForNewGroup);
allGroups.put(s3GroupIdentifier, s3Group);
LOG.debug("Created a new S3 group. Total number of groups: {}", allGroups.size());
return s3Group;
}
}
Expand Down
Loading

0 comments on commit 9b506d9

Please sign in to comment.