Skip to content

Commit

Permalink
Added SQS interactions for S3 source (#1431)
Browse files Browse the repository at this point in the history
* Added sqs configuration and basic sqs interactions

Signed-off-by: Asif Sohail Mohammed <nsifmoh@amazon.com>
  • Loading branch information
asifsmohammed committed Jun 7, 2022
1 parent be35437 commit 8d1454c
Show file tree
Hide file tree
Showing 12 changed files with 543 additions and 35 deletions.
4 changes: 2 additions & 2 deletions data-prepper-plugins/s3-source/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@ repositories {

// Dependencies of the s3-source plugin module.
// NOTE(review): this block comes from a rendered diff hunk; which of these lines were added
// or removed by the commit is not visible here — confirm against the repository.
dependencies {
implementation project(':data-prepper-api')
implementation 'com.fasterxml.jackson.core:jackson-databind'
implementation 'org.apache.commons:commons-lang3:3.12.0'
// software.amazon.awssdk client modules, all pinned to the same version (2.17.191).
implementation 'software.amazon.awssdk:s3:2.17.191'
implementation 'software.amazon.awssdk:sts:2.17.191'
implementation 'software.amazon.awssdk:sqs:2.17.191'
// com.amazonaws artifact used alongside the software.amazon.awssdk modules above; presumably
// needed for com.amazonaws.services.s3.event.S3EventNotification, which SqsWorker imports — TODO confirm.
implementation 'com.amazonaws:aws-java-sdk-s3:1.12.220'
testImplementation 'org.hamcrest:hamcrest:2.2'
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package com.amazon.dataprepper.plugins.source;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
import software.amazon.awssdk.core.retry.RetryPolicy;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.sts.StsClient;

/**
 * Holds the S3 client used by the S3 source and is the hand-off point for S3 object
 * references discovered by the SQS worker.
 */
public class S3Service {
    private static final Logger LOG = LoggerFactory.getLogger(S3Service.class);

    /** Number of retries configured on the S3 client's retry policy. */
    private static final int MAX_RETRIES = 5;

    private final S3SourceConfig s3SourceConfig;
    private final S3Client s3Client;

    /**
     * Stores the configuration and builds the S3 client from it, using a newly created
     * STS client for credential resolution.
     *
     * @param s3SourceConfig the source configuration providing region and authentication options
     */
    public S3Service(final S3SourceConfig s3SourceConfig) {
        this.s3SourceConfig = s3SourceConfig;
        this.s3Client = createS3Client(StsClient.create());
    }

    /**
     * Placeholder for processing a referenced S3 object; currently does nothing.
     *
     * @param s3ObjectReference the object to process
     * @return always {@code null} for now
     */
    S3ObjectReference addS3Object(final S3ObjectReference s3ObjectReference) {
        // TODO: should return message id and receipt handle if successfully converted to event
        return null;
    }

    /**
     * Builds an S3 client for the configured region and credentials with a fixed retry count.
     *
     * @param stsClient the STS client handed to the authentication options
     * @return the configured S3 client
     */
    S3Client createS3Client(final StsClient stsClient) {
        LOG.info("Creating S3 client");

        final Region awsRegion = Region.of(s3SourceConfig.getAWSAuthenticationOptions().getAwsRegion());
        final RetryPolicy retryPolicy = RetryPolicy.builder().numRetries(MAX_RETRIES).build();
        final ClientOverrideConfiguration retryOverrides = ClientOverrideConfiguration.builder()
                .retryPolicy(retryPolicy)
                .build();

        return S3Client.builder()
                .region(awsRegion)
                .credentialsProvider(
                        s3SourceConfig.getAWSAuthenticationOptions().authenticateAwsConfiguration(stsClient))
                .overrideConfiguration(retryOverrides)
                .build();
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,52 +12,38 @@
import com.amazon.dataprepper.model.event.Event;
import com.amazon.dataprepper.model.record.Record;
import com.amazon.dataprepper.model.source.Source;
import org.apache.commons.lang3.NotImplementedException;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.sts.StsClient;

import java.util.Objects;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// NOTE(review): this span appears to be a rendered diff of S3Source in which removed and added
// lines are interleaved without +/- markers, so it is not compilable exactly as shown.
// Confirm against the repository before relying on any single line.
/**
 * Source plugin registered under the name "s3". On start it creates an S3Service and an
 * SqsService from the plugin configuration and starts the SqsService.
 */
@DataPrepperPlugin(name = "s3", pluginType = Source.class, pluginConfigurationType = S3SourceConfig.class)
public class S3Source implements Source<Record<Event>> {
private static final Logger LOG = LoggerFactory.getLogger(S3Source.class);

private final PluginMetrics pluginMetrics;
private final S3SourceConfig s3SourceConfig;

// Assigned in start(); has no initializer, so it is null before start() runs.
private SqsService sqsService;

/**
 * Stores the plugin metrics and configuration.
 *
 * @param pluginMetrics metrics handle supplied to the plugin
 * @param s3SourceConfig configuration for this source
 */
@DataPrepperPluginConstructor
public S3Source(PluginMetrics pluginMetrics, final S3SourceConfig s3SourceConfig) {
this.pluginMetrics = pluginMetrics;
this.s3SourceConfig = s3SourceConfig;

// NOTE(review): the next statements use fields and helpers that are declared only near the
// end of this span and end in an unconditional throw; presumably these are lines the commit
// removed — TODO confirm.
awsCredentialsProvider = createCredentialsProvider();
s3Client = createS3Client(awsCredentialsProvider);

throw new NotImplementedException();
}

/**
 * Creates the S3 and SQS services and starts the SQS service.
 *
 * @param buffer the buffer supplied by the pipeline; only null-checked in this span
 * @throws IllegalStateException if {@code buffer} is null
 */
@Override
public void start(Buffer<Record<Event>> buffer) {
if (buffer == null) {
throw new IllegalStateException("Buffer provided is null");
}

S3Service s3Service = new S3Service(s3SourceConfig);
sqsService = new SqsService(s3SourceConfig, s3Service);

sqsService.start();
}

// NOTE(review): the empty stop() body below and the sqsService.stop() call further down look
// like the old and new versions of the same method rendered together — TODO confirm.
@Override
public void stop() {

}

// NOTE(review): the fields and helpers from here to the final return statement use
// getAWSAuthentication(), while other code in this commit calls getAWSAuthenticationOptions();
// presumably removed lines — TODO confirm.
private final AwsCredentialsProvider awsCredentialsProvider;
private final S3Client s3Client;

private AwsCredentialsProvider createCredentialsProvider() {
return Objects.requireNonNull(s3SourceConfig.getAWSAuthentication().authenticateAwsConfiguration(StsClient.create()));
}

private S3Client createS3Client(final AwsCredentialsProvider awsCredentialsProvider) {
return S3Client.builder()
.region(Region.of(s3SourceConfig.getAWSAuthentication().getAwsRegion()))
.credentialsProvider(awsCredentialsProvider)
.build();
// NOTE(review): sqsService is only assigned in start(); if stop() can be called before
// start(), this call would dereference null — verify the plugin lifecycle.
sqsService.stop();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@

package com.amazon.dataprepper.plugins.source;

import com.amazon.dataprepper.plugins.source.configuration.AwsAuthenticationOptions;
import com.amazon.dataprepper.plugins.source.configuration.CodecOption;
import com.amazon.dataprepper.plugins.source.configuration.CompressionOption;
import com.amazon.dataprepper.plugins.source.configuration.NotificationTypeOption;
import com.amazon.dataprepper.plugins.source.configuration.CompressionOption;
import com.amazon.dataprepper.plugins.source.configuration.CodecOption;
import com.amazon.dataprepper.plugins.source.configuration.SqsOptions;
import com.amazon.dataprepper.plugins.source.configuration.AwsAuthenticationOptions;
import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.Valid;
import jakarta.validation.constraints.Min;
import jakarta.validation.constraints.NotNull;

public class S3SourceConfig {
Expand All @@ -27,10 +29,18 @@ public class S3SourceConfig {
@Valid
private CodecOption codec;

@JsonProperty("sqs")
@NotNull
private SqsOptions sqsOptions;

@JsonProperty("aws")
@NotNull
@Valid
private AwsAuthenticationOptions awsAuthentication;
private AwsAuthenticationOptions awsAuthenticationOptions;

@JsonProperty("thread_count")
@Min(0)
private int threadCount;

public NotificationTypeOption getNotificationType() {
return notificationType;
Expand All @@ -44,7 +54,15 @@ public CodecOption getCodec() {
return codec;
}

public AwsAuthenticationOptions getAWSAuthentication() {
return awsAuthentication;
/**
 * Returns the SQS options held in the field annotated with {@code @JsonProperty("sqs")}.
 *
 * @return the configured SQS options
 */
public SqsOptions getSqsOptions() {
    return this.sqsOptions;
}

/**
 * Returns the AWS authentication options held by this configuration.
 *
 * @return the configured AWS authentication options
 */
public AwsAuthenticationOptions getAWSAuthenticationOptions() {
    return this.awsAuthenticationOptions;
}

/**
 * Returns the value of the field annotated with {@code @JsonProperty("thread_count")}.
 *
 * @return the configured thread count
 */
public int getThreadCount() {
    return this.threadCount;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package com.amazon.dataprepper.plugins.source;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
import software.amazon.awssdk.core.retry.RetryPolicy;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sts.StsClient;

/**
 * Owns the SQS client and the thread that runs {@link SqsWorker}.
 */
public class SqsService {
    private static final Logger LOG = LoggerFactory.getLogger(SqsService.class);

    private final S3SourceConfig s3SourceConfig;
    private final S3Service s3Accessor;
    private final SqsClient sqsClient;

    // Remains null until start() has been called.
    private Thread sqsWorkerThread;

    /**
     * Stores the collaborators and builds the SQS client, using a newly created STS client
     * for credential resolution.
     *
     * @param s3SourceConfig the source configuration providing region, authentication and SQS options
     * @param s3Accessor the S3 service handed to the worker
     */
    public SqsService(final S3SourceConfig s3SourceConfig, final S3Service s3Accessor) {
        this.s3SourceConfig = s3SourceConfig;
        this.s3Accessor = s3Accessor;
        this.sqsClient = createSqsClient(StsClient.create());
    }

    /**
     * Creates a new {@link SqsWorker} on its own thread and starts that thread.
     */
    public void start() {
        sqsWorkerThread = new Thread(new SqsWorker(sqsClient, s3Accessor, s3SourceConfig));
        sqsWorkerThread.start();
    }

    /**
     * Builds an SQS client for the configured region and credentials with five retries.
     *
     * @param stsClient the STS client handed to the authentication options
     * @return the configured SQS client
     */
    SqsClient createSqsClient(final StsClient stsClient) {
        LOG.info("Creating SQS client");
        return SqsClient.builder()
                .region(Region.of(s3SourceConfig.getAWSAuthenticationOptions().getAwsRegion()))
                .credentialsProvider(s3SourceConfig.getAWSAuthenticationOptions().authenticateAwsConfiguration(stsClient))
                .overrideConfiguration(ClientOverrideConfiguration.builder()
                        .retryPolicy(RetryPolicy.builder().numRetries(5).build())
                        .build())
                .build();
    }

    /**
     * Interrupts the worker thread if one has been started. Does not wait for the thread
     * to finish.
     */
    public void stop() {
        // stop() may be reached without a prior start(); there is no thread to interrupt then.
        if (sqsWorkerThread == null) {
            return;
        }
        sqsWorkerThread.interrupt();
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package com.amazon.dataprepper.plugins.source;

import com.amazon.dataprepper.plugins.source.configuration.SqsOptions;
import com.amazon.dataprepper.plugins.source.filter.ObjectCreatedFilter;
import com.amazon.dataprepper.plugins.source.filter.S3EventFilter;
import com.amazonaws.services.s3.event.S3EventNotification;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.Message;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;
import software.amazon.awssdk.services.sqs.model.SqsException;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

/**
 * Polls an SQS queue for S3 event notifications and passes references to created objects
 * on to {@link S3Service}. Intended to be run on its own thread; the loop ends when that
 * thread is interrupted.
 */
public class SqsWorker implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(SqsWorker.class);

    private final S3SourceConfig s3SourceConfig;
    private final SqsClient sqsClient;
    private final S3Service s3Service;
    private final SqsOptions sqsOptions;
    private final S3EventFilter objectCreatedFilter;

    /**
     * @param sqsClient client used to receive messages
     * @param s3Service receiver of the S3 object references extracted from messages
     * @param s3SourceConfig source configuration; its SQS options drive polling
     */
    public SqsWorker(final SqsClient sqsClient, final S3Service s3Service, final S3SourceConfig s3SourceConfig) {
        this.s3SourceConfig = s3SourceConfig;
        this.sqsClient = sqsClient;
        this.s3Service = s3Service;
        sqsOptions = s3SourceConfig.getSqsOptions();
        objectCreatedFilter = new ObjectCreatedFilter();
    }

    /**
     * Repeatedly receives messages, converts them to S3 event records and processes them,
     * sleeping for the configured poll delay whenever a receive returned fewer messages than
     * the configured maximum. Returns once the running thread has been interrupted.
     */
    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            // get messages from SQS
            final List<Message> sqsMessages = getMessagesFromSqs();

            // convert each message to S3EventNotificationRecord
            final Map<Message, S3EventNotification.S3EventNotificationRecord> s3EventNotificationRecords =
                    getS3MessageEventNotificationRecordMap(sqsMessages);

            // build s3ObjectReference from S3EventNotificationRecord if event name starts with ObjectCreated
            processS3ObjectAndDeleteSqsMessages(s3EventNotificationRecords);

            final long pollDelayMillis = sqsOptions.getPollDelay().toMillis();
            if (sqsMessages.size() < sqsOptions.getMaximumMessages() && pollDelayMillis > 0) {
                try {
                    Thread.sleep(pollDelayMillis);
                } catch (final InterruptedException e) {
                    LOG.error("Thread is interrupted while polling SQS.", e);
                    // Restore the flag cleared by sleep() and stop; otherwise an interrupt
                    // requesting shutdown would be swallowed and the loop would never end.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }

    /**
     * Receives one batch of messages from the configured queue.
     *
     * @return the received messages, or an empty list if SQS reported an error
     */
    List<Message> getMessagesFromSqs() {
        try {
            final ReceiveMessageRequest receiveMessageRequest = createReceiveMessageRequest();
            return sqsClient.receiveMessage(receiveMessageRequest).messages();
        } catch (final SqsException e) {
            // Error details may be absent on the exception; fall back to its message so that
            // logging cannot itself throw.
            final String detail = e.awsErrorDetails() != null
                    ? e.awsErrorDetails().errorMessage()
                    : e.getMessage();
            LOG.error("Error reading from SQS: {}", detail, e);
            return new ArrayList<>();
        }
    }

    /**
     * Builds the receive request from the SQS options: queue URL, maximum messages,
     * visibility timeout and wait time (both in whole seconds).
     *
     * @return the receive request
     */
    ReceiveMessageRequest createReceiveMessageRequest() {
        return ReceiveMessageRequest.builder()
                .queueUrl(sqsOptions.getSqsUrl())
                .maxNumberOfMessages(sqsOptions.getMaximumMessages())
                .visibilityTimeout((int) sqsOptions.getVisibilityTimeout().getSeconds())
                .waitTimeSeconds((int) sqsOptions.getWaitTime().getSeconds())
                .build();
    }

    /**
     * Maps each message to the S3 event record parsed from its body. Messages whose body
     * cannot be converted are logged and left out of the result.
     */
    private Map<Message, S3EventNotification.S3EventNotificationRecord> getS3MessageEventNotificationRecordMap(final List<Message> sqsMessages) {
        final Map<Message, S3EventNotification.S3EventNotificationRecord> recordsByMessage = new HashMap<>();
        for (final Message message : sqsMessages) {
            try {
                recordsByMessage.put(message, convertS3EventMessages(message));
            } catch (final RuntimeException e) {
                // A body that is not an S3 event notification with at least one record makes
                // the conversion throw; skip that message instead of ending the worker thread.
                LOG.error("Unable to convert SQS message {} to an S3 event notification record; skipping it.",
                        message.messageId(), e);
            }
        }
        return recordsByMessage;
    }

    /**
     * Parses the message body as an S3 event notification and returns its first record.
     *
     * @param message the SQS message to parse
     * @return the first record of the parsed notification
     */
    S3EventNotification.S3EventNotificationRecord convertS3EventMessages(final Message message) {
        return S3EventNotification.parseJson(message.body()).getRecords().get(0);
    }

    /**
     * Passes an S3 object reference to the S3 service for every record accepted by the
     * object-created filter.
     */
    private void processS3ObjectAndDeleteSqsMessages(final Map<Message, S3EventNotification.S3EventNotificationRecord> s3EventNotificationRecords) {
        for (final Map.Entry<Message, S3EventNotification.S3EventNotificationRecord> entry : s3EventNotificationRecords.entrySet()) {
            if (isEventNameCreated(entry.getValue())) {
                final S3ObjectReference s3ObjectReference = populateS3Reference(entry.getValue());
                s3Service.addS3Object(s3ObjectReference);
                // TODO: delete sqsMessages which are successfully processed
            }
        }
    }

    /**
     * @param s3EventNotificationRecord the record to test
     * @return {@code true} if the object-created filter accepts the record
     */
    boolean isEventNameCreated(final S3EventNotification.S3EventNotificationRecord s3EventNotificationRecord) {
        final Optional<S3EventNotification.S3EventNotificationRecord> filter = objectCreatedFilter.filter(s3EventNotificationRecord);
        return filter.isPresent();
    }

    /**
     * @param s3EventNotificationRecord the record naming the bucket and object key
     * @return a reference built from the record's bucket name and object key
     */
    S3ObjectReference populateS3Reference(final S3EventNotification.S3EventNotificationRecord s3EventNotificationRecord) {
        return S3ObjectReference.fromBucketAndKey(s3EventNotificationRecord.getS3().getBucket().getName(),
                s3EventNotificationRecord.getS3().getObject().getKey());
    }
}
Loading

0 comments on commit 8d1454c

Please sign in to comment.