Skip to content

Commit

Permalink
Track event handles in S3Group, add integration test
Browse files Browse the repository at this point in the history
Signed-off-by: Taylor Gray <tylgry@amazon.com>
  • Loading branch information
graytaylor0 committed Apr 1, 2024
1 parent 3a6ad06 commit 8daaf43
Show file tree
Hide file tree
Showing 13 changed files with 250 additions and 109 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ public class S3ObjectIndexUtility {
// For a string like "data-prepper-%{yyyy-MM}", "yyyy-MM" is matched.
private static final String TIME_PATTERN_INTERNAL_EXTRACTOR_REGULAR_EXPRESSION = "\\%\\{(.*?)\\}";

private static final Pattern DATE_TIME_PATTERN = Pattern.compile(TIME_PATTERN_INTERNAL_EXTRACTOR_REGULAR_EXPRESSION);

private static final ZoneId UTC_ZONE_ID = ZoneId.of(TimeZone.getTimeZone("UTC").getID());

S3ObjectIndexUtility() {
Expand Down Expand Up @@ -76,8 +78,7 @@ public static long getTimeNanos() {
* @return returns date time formatter
*/
public static DateTimeFormatter validateAndGetDateTimeFormatter(final String indexAlias) {
final Pattern pattern = Pattern.compile(TIME_PATTERN_INTERNAL_EXTRACTOR_REGULAR_EXPRESSION);
final Matcher timePatternMatcher = pattern.matcher(indexAlias);
final Matcher timePatternMatcher = DATE_TIME_PATTERN.matcher(indexAlias);
if (timePatternMatcher.find()) {
final String timePattern = timePatternMatcher.group(1);
if (timePatternMatcher.find()) { // check if there is a one more match.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,25 @@ public void validate(int expectedRecords, final List<Map<String, Object>> sample
assertThat(sampledData, equalTo(sampleEventData.size()));
}

public void validateDynamicPartition(int expectedRecords, int partitionNumber, final File actualContentFile, final CompressionScenario compressionScenario) throws IOException {
final InputStream inputStream = new BufferedInputStream(new FileInputStream(actualContentFile), 64 * 1024);

final Scanner scanner = new Scanner(inputStream);

int count = 0;
while (scanner.hasNext()) {

final String actualJsonString = scanner.next();

final Map<String, Object> actualData = OBJECT_MAPPER.readValue(actualJsonString, Map.class);
assertThat(actualData.get("sequence"), equalTo(partitionNumber));

count++;
}

assertThat(count, equalTo(expectedRecords));
}

@Override
public String toString() {
return "NDJSON";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
import org.apache.commons.io.IOUtils;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.junit.jupiter.api.io.TempDir;
Expand Down Expand Up @@ -95,12 +97,12 @@ public class S3SinkIT {
@Mock
private AwsCredentialsSupplier awsCredentialsSupplier;

@Mock
@Mock(stubOnly = true)
private ThresholdOptions thresholdOptions;
@Mock
@Mock(stubOnly = true)
private ObjectKeyOptions objectKeyOptions;

@Mock
@Mock(stubOnly = true)
private ExpressionEvaluator expressionEvaluator;
private String s3region;
private String bucketName;
Expand All @@ -122,6 +124,8 @@ static void setUpAll() {
for (int i = 0; i < totalRandomStrings; i++) {
reusableRandomStrings.add(UUID.randomUUID().toString());
}


}

@BeforeEach
Expand Down Expand Up @@ -187,6 +191,11 @@ void test(final OutputScenario outputScenario,
int expectedTotalSize = sizeCombination.getTotalSize();
when(thresholdOptions.getEventCount()).thenReturn(expectedTotalSize);

when(expressionEvaluator.extractDynamicExpressionsFromFormatExpression(anyString()))
.thenReturn(Collections.emptyList());
when(expressionEvaluator.extractDynamicKeysFromFormatExpression(anyString()))
.thenReturn(Collections.emptyList());

final S3Sink objectUnderTest = createObjectUnderTest();

final int maxEventDataToSample = 2000;
Expand Down Expand Up @@ -235,6 +244,88 @@ void test(final OutputScenario outputScenario,
outputScenario.validate(expectedTotalSize, sampleEventData, actualContentFile, compressionScenario);
}

@Test
void testWithDynamicGroups() throws IOException {
final BufferScenario bufferScenario = new InMemoryBufferScenario();
final CompressionScenario compressionScenario = new NoneCompressionScenario();
final NdjsonOutputScenario outputScenario = new NdjsonOutputScenario();
final SizeCombination sizeCombination = SizeCombination.MEDIUM_SMALLER;

BufferTypeOptions bufferTypeOptions = bufferScenario.getBufferType();
String testRun = "grouping-" + outputScenario + "-" + bufferTypeOptions + "-" + compressionScenario + "-" + sizeCombination.getBatchSize() + "-" + sizeCombination.getNumberOfBatches();

final String staticPrefix = "s3-sink-grouping-integration-test/" + UUID.randomUUID() + "/";
final String pathPrefix = staticPrefix + "folder-${/sequence}/";
final List<String> dynamicKeys = new ArrayList<>();
dynamicKeys.add("/sequence");

when(objectKeyOptions.getPathPrefix()).thenReturn(pathPrefix);

when(expressionEvaluator.extractDynamicExpressionsFromFormatExpression(objectKeyOptions.getPathPrefix()))
.thenReturn(Collections.emptyList());
when(expressionEvaluator.extractDynamicKeysFromFormatExpression(objectKeyOptions.getPathPrefix()))
.thenReturn(dynamicKeys);
when(expressionEvaluator.extractDynamicKeysFromFormatExpression(objectKeyOptions.getNamePattern()))
.thenReturn(Collections.emptyList());
when(expressionEvaluator.extractDynamicExpressionsFromFormatExpression(objectKeyOptions.getNamePattern()))
.thenReturn(Collections.emptyList());

when(pluginFactory.loadPlugin(eq(OutputCodec.class), any())).thenReturn(outputScenario.getCodec());
when(s3SinkConfig.getBufferType()).thenReturn(bufferTypeOptions);
when(s3SinkConfig.getCompression()).thenReturn(compressionScenario.getCompressionOption());
int expectedTotalSize = sizeCombination.getTotalSize();
when(thresholdOptions.getEventCount()).thenReturn(expectedTotalSize / sizeCombination.getBatchSize());

final S3Sink objectUnderTest = createObjectUnderTest();

final int maxEventDataToSample = 2000;
final List<Map<String, Object>> sampleEventData = new ArrayList<>(maxEventDataToSample);
for (int batchNumber = 0; batchNumber < sizeCombination.getNumberOfBatches(); batchNumber++) {
final int currentBatchNumber = batchNumber;
final List<Record<Event>> events = IntStream.range(0, sizeCombination.getBatchSize())
.mapToObj(this::generateEventData)
.peek(data -> {
if (sampleEventData.size() < maxEventDataToSample)
sampleEventData.add(data);
})
.map(this::generateTestEvent)
.map(Record::new)
.collect(Collectors.toList());

LOG.debug("Writing dynamic batch {} with size {}.", currentBatchNumber, events.size());
objectUnderTest.doOutput(events);
}

for (int folderNumber = 0; folderNumber < 100; folderNumber++) {
LOG.info("Listing S3 path prefix: {}", staticPrefix + "folder-" + folderNumber + "/");

final ListObjectsV2Response listObjectsResponse = s3Client.listObjectsV2(ListObjectsV2Request.builder()
.bucket(bucketName)
.prefix(staticPrefix + "folder-" + folderNumber + "/")
.build());

assertThat(listObjectsResponse.contents(), notNullValue());
assertThat(listObjectsResponse.contents().size(), equalTo(1));

final S3Object s3Object = listObjectsResponse.contents().get(0);

final File target = new File(s3FileLocation, "folder-" + folderNumber + ".original");

LOG.info("Downloading S3 object to local file {}.", target);

GetObjectRequest getObjectRequest = GetObjectRequest.builder()
.bucket(bucketName)
.key(s3Object.key())
.build();
s3Client.getObject(getObjectRequest, target.toPath());

File actualContentFile = decompressFileIfNecessary(outputScenario, compressionScenario, testRun, target);

LOG.info("Validating output. totalSize={}; sampleDataSize={}", expectedTotalSize, sampleEventData.size());
outputScenario.validateDynamicPartition(expectedTotalSize / sizeCombination.getBatchSize(), folderNumber, actualContentFile, compressionScenario);
}
}

private File decompressFileIfNecessary(OutputScenario outputScenario, CompressionScenario compressionScenario, String pathPrefix, File target) throws IOException {

if (outputScenario.isCompressionInternal() || !compressionScenario.requiresDecompression())
Expand All @@ -252,7 +343,6 @@ private Event generateTestEvent(final Map<String, Object> eventData) {
final EventMetadata defaultEventMetadata = DefaultEventMetadata.builder()
.withEventType(EventType.LOG.toString())
.build();
final JacksonEvent event = JacksonLog.builder().withData(eventData).withEventMetadata(defaultEventMetadata).build();
return JacksonEvent.builder()
.withData(eventData)
.withEventMetadata(defaultEventMetadata)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import org.opensearch.dataprepper.plugins.sink.s3.configuration.AwsAuthenticationOptions;
import org.opensearch.dataprepper.plugins.sink.s3.configuration.ObjectKeyOptions;
import org.opensearch.dataprepper.plugins.sink.s3.configuration.ThresholdOptions;
import org.opensearch.dataprepper.plugins.sink.s3.grouping.S3GroupIdentifierFactory;
import org.opensearch.dataprepper.plugins.sink.s3.grouping.S3GroupManager;
import software.amazon.awssdk.core.ResponseBytes;
import software.amazon.awssdk.regions.Region;
Expand Down Expand Up @@ -92,6 +93,7 @@
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.when;

Expand All @@ -109,7 +111,7 @@ class S3SinkServiceIT {
private static final String FILE_SUFFIX = ".parquet";
@Mock
private S3SinkConfig s3SinkConfig;
@Mock

private S3GroupManager s3GroupManager;

@Mock
Expand Down Expand Up @@ -165,6 +167,11 @@ public void setUp() {
lenient().when(pluginMetrics.counter(S3SinkService.NUMBER_OF_RECORDS_FLUSHED_TO_S3_FAILED)).
thenReturn(numberOfRecordsFailedCounter);
lenient().when(pluginMetrics.summary(S3SinkService.S3_OBJECTS_SIZE)).thenReturn(s3ObjectSizeSummary);

when(expressionEvaluator.extractDynamicExpressionsFromFormatExpression(anyString()))
.thenReturn(Collections.emptyList());
when(expressionEvaluator.extractDynamicKeysFromFormatExpression(anyString()))
.thenReturn(Collections.emptyList());
}

@Test
Expand Down Expand Up @@ -248,6 +255,9 @@ void verify_flushed_records_into_s3_bucketNewLine_with_compression() throws IOEx

private S3SinkService createObjectUnderTest() {
OutputCodecContext codecContext = new OutputCodecContext("Tag", Collections.emptyList(), Collections.emptyList());
final S3GroupIdentifierFactory groupIdentifierFactory = new S3GroupIdentifierFactory(keyGenerator, expressionEvaluator, s3SinkConfig);
s3GroupManager = new S3GroupManager(s3SinkConfig, groupIdentifierFactory, bufferFactory, s3Client);

return new S3SinkService(s3SinkConfig, codec, codecContext, s3Client, keyGenerator, Duration.ofSeconds(5), pluginMetrics, s3GroupManager);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.codec.OutputCodec;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventHandle;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.sink.OutputCodecContext;
import org.opensearch.dataprepper.model.types.ByteCount;
Expand All @@ -27,7 +26,6 @@
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
Expand All @@ -46,7 +44,6 @@ public class S3SinkService {
static final String S3_OBJECTS_SIZE = "s3SinkObjectSizeBytes";
private final S3SinkConfig s3SinkConfig;
private final Lock reentrantLock;
private final Collection<EventHandle> bufferedEventHandles;
private final OutputCodec codec;
private final S3Client s3Client;
private final int maxEvents;
Expand Down Expand Up @@ -82,8 +79,6 @@ public S3SinkService(final S3SinkConfig s3SinkConfig, final OutputCodec codec,
this.retrySleepTime = retrySleepTime;
reentrantLock = new ReentrantLock();

bufferedEventHandles = new LinkedList<>();

maxEvents = s3SinkConfig.getThresholdOptions().getEventCount();
maxBytes = s3SinkConfig.getThresholdOptions().getMaximumSize();
maxCollectionDuration = s3SinkConfig.getThresholdOptions().getEventCollectTimeOut();
Expand Down Expand Up @@ -126,8 +121,7 @@ void output(Collection<Record<Event>> records) {
codec.writeEvent(event, currentBuffer.getOutputStream());
int count = currentBuffer.getEventCount() + 1;
currentBuffer.setEventCount(count);

bufferedEventHandles.add(event.getEventHandle());
s3Group.addEventHandle(event.getEventHandle());
} catch (Exception ex) {
if(sampleException == null) {
sampleException = ex;
Expand All @@ -136,15 +130,15 @@ void output(Collection<Record<Event>> records) {
failedEvents.add(event);
}

final boolean flushed = flushToS3IfNeeded(currentBuffer);
final boolean flushed = flushToS3IfNeeded(s3Group);

if (flushed) {
s3GroupManager.removeGroup(s3Group);
}
}

for (final S3Group s3Group : s3GroupManager.getS3GroupEntries()) {
final boolean flushed = flushToS3IfNeeded(s3Group.getBuffer());
final boolean flushed = flushToS3IfNeeded(s3Group);

if (flushed) {
s3GroupManager.removeGroup(s3Group);
Expand All @@ -163,38 +157,30 @@ void output(Collection<Record<Event>> records) {
}
}

private void releaseEventHandles(final boolean result) {
for (EventHandle eventHandle : bufferedEventHandles) {
eventHandle.release(result);
}

bufferedEventHandles.clear();
}

/**
* @return whether the flush was attempted
*/
private boolean flushToS3IfNeeded(final Buffer currentBuffer) {
private boolean flushToS3IfNeeded(final S3Group s3Group) {
LOG.trace("Flush to S3 check: currentBuffer.size={}, currentBuffer.events={}, currentBuffer.duration={}",
currentBuffer.getSize(), currentBuffer.getEventCount(), currentBuffer.getDuration());
if (ThresholdCheck.checkThresholdExceed(currentBuffer, maxEvents, maxBytes, maxCollectionDuration)) {
s3Group.getBuffer().getSize(), s3Group.getBuffer().getEventCount(), s3Group.getBuffer().getDuration());
if (ThresholdCheck.checkThresholdExceed(s3Group.getBuffer(), maxEvents, maxBytes, maxCollectionDuration)) {
try {
codec.complete(currentBuffer.getOutputStream());
String s3Key = currentBuffer.getKey();
codec.complete(s3Group.getBuffer().getOutputStream());
String s3Key = s3Group.getBuffer().getKey();
LOG.info("Writing {} to S3 with {} events and size of {} bytes.",
s3Key, currentBuffer.getEventCount(), currentBuffer.getSize());
final boolean isFlushToS3 = retryFlushToS3(currentBuffer, s3Key);
s3Key, s3Group.getBuffer().getEventCount(), s3Group.getBuffer().getSize());
final boolean isFlushToS3 = retryFlushToS3(s3Group.getBuffer(), s3Key);
if (isFlushToS3) {
LOG.info("Successfully saved {} to S3.", s3Key);
numberOfRecordsSuccessCounter.increment(currentBuffer.getEventCount());
numberOfRecordsSuccessCounter.increment(s3Group.getBuffer().getEventCount());
objectsSucceededCounter.increment();
s3ObjectSizeSummary.record(currentBuffer.getSize());
releaseEventHandles(true);
s3ObjectSizeSummary.record(s3Group.getBuffer().getSize());
s3Group.releaseEventHandles(true);
} else {
LOG.error("Failed to save {} to S3.", s3Key);
numberOfRecordsFailedCounter.increment(currentBuffer.getEventCount());
numberOfRecordsFailedCounter.increment(s3Group.getBuffer().getEventCount());
objectsFailedCounter.increment();
releaseEventHandles(false);
s3Group.releaseEventHandles(false);
}

return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
*/
public class InMemoryBuffer implements Buffer {

private static final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
private static final ByteArrayPositionOutputStream byteArrayPositionOutputStream = new ByteArrayPositionOutputStream(byteArrayOutputStream);
private final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
private final ByteArrayPositionOutputStream byteArrayPositionOutputStream = new ByteArrayPositionOutputStream(byteArrayOutputStream);
private final S3Client s3Client;
private final Supplier<String> bucketSupplier;
private final Supplier<String> keySupplier;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,11 +84,4 @@ public static String objectFileName(final S3SinkConfig s3SinkConfig,
(codecExtension!=null? codecExtension : DEFAULT_CODEC_FILE_EXTENSION);
}
}

public static String objectFileNameWithoutDateTimeAndCodecInjection(final S3SinkConfig s3SinkConfig,
final Event event,
final ExpressionEvaluator expressionEvaluator) {
String configNamePattern = s3SinkConfig.getObjectKeyOptions().getNamePattern();
return event.formatString(configNamePattern, expressionEvaluator);
}
}
Loading

0 comments on commit 8daaf43

Please sign in to comment.