Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added SQS interactions for S3 source #1431

Merged
merged 11 commits into from
Jun 7, 2022
4 changes: 2 additions & 2 deletions data-prepper-plugins/s3-source/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@ repositories {

dependencies {
implementation project(':data-prepper-api')
implementation 'com.fasterxml.jackson.core:jackson-databind'
implementation 'org.apache.commons:commons-lang3:3.12.0'
implementation 'software.amazon.awssdk:s3:2.17.191'
implementation 'software.amazon.awssdk:sts:2.17.191'
implementation 'software.amazon.awssdk:sqs:2.17.191'
implementation 'com.amazonaws:aws-java-sdk-s3:1.12.220'
testImplementation 'org.hamcrest:hamcrest:2.2'
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package com.amazon.dataprepper.plugins.source;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
import software.amazon.awssdk.core.retry.RetryPolicy;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.sts.StsClient;

public class S3Service {
private static final Logger LOG = LoggerFactory.getLogger(S3Service.class);

private final S3SourceConfig s3SourceConfig;
private final S3Client s3Client;

public S3Service(final S3SourceConfig s3SourceConfig) {
this.s3SourceConfig = s3SourceConfig;
this.s3Client = createS3Client(StsClient.create());
}

S3ObjectReference addS3Object(final S3ObjectReference s3ObjectReference) {
// TODO: should return message id and receipt handle if successfully converted to event
return null;
Comment on lines +27 to +29
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have a few concerns over this method and subsequent workflow. I have some ideas for improvements but I want to understand a few things first.

What are we adding an S3Object to? I find this method name confusing and was really confused by the usage in the run() method.

Why are the parameters and return object the same object (S3ObjectReference)? Are we extending S3ObjectReference and expecting this class to mutate the variable with a messageId and Receipt? (I want to see if we can avoid mutation and nullable variables)

How is the receipt used by the calling function? Do we need it?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are the parameters and return object the same object (S3ObjectReference)? Are we extending S3ObjectReference and expecting this class to mutate the variable with a messageId and Receipt? (I want to see if we can avoid mutation and nullable variables)

@asifsmohammed and I discussed a couple approaches here:

  1. We can make this templated and add something like T getNotificationMetadata() which in this case returns SQS metadata. It could be extended for SNS metadata if that were ever added.
  2. We could extend this class with a sub-class.

In either case, the SQS metadata would include the ReceiptHandle and MessageId.

I'm fine with either approach. But, the inheritance approach would have the benefit of completely abstracting this metadata from the S3 code.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My initial thought was to add ReceiptHandle and MessageId to S3ObjectReference which will be used to get S3 Object.
If S3 object is successfully processed we return the same S3ObjectReference to delete message from queue.

I'll create a subclass which contains ReceiptHandle and MessageId.

}

S3Client createS3Client(final StsClient stsClient) {
LOG.info("Creating S3 client");
return S3Client.builder()
.region(Region.of(s3SourceConfig.getAWSAuthenticationOptions().getAwsRegion()))
.credentialsProvider(s3SourceConfig.getAWSAuthenticationOptions().authenticateAwsConfiguration(stsClient))
.overrideConfiguration(ClientOverrideConfiguration.builder()
.retryPolicy(RetryPolicy.builder().numRetries(5).build())
.build())
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,52 +12,38 @@
import com.amazon.dataprepper.model.event.Event;
import com.amazon.dataprepper.model.record.Record;
import com.amazon.dataprepper.model.source.Source;
import org.apache.commons.lang3.NotImplementedException;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.sts.StsClient;

import java.util.Objects;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@DataPrepperPlugin(name = "s3", pluginType = Source.class, pluginConfigurationType = S3SourceConfig.class)
public class S3Source implements Source<Record<Event>> {
private static final Logger LOG = LoggerFactory.getLogger(S3Source.class);

private final PluginMetrics pluginMetrics;
private final S3SourceConfig s3SourceConfig;

private SqsService sqsService;

@DataPrepperPluginConstructor
public S3Source(PluginMetrics pluginMetrics, final S3SourceConfig s3SourceConfig) {
this.pluginMetrics = pluginMetrics;
this.s3SourceConfig = s3SourceConfig;

awsCredentialsProvider = createCredentialsProvider();
s3Client = createS3Client(awsCredentialsProvider);

throw new NotImplementedException();
}

@Override
public void start(Buffer<Record<Event>> buffer) {
if (buffer == null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The buffer is never used the by the S3Service or the SqsService. Who is going to be responsible for putting the item in the buffer (S3Service or SqsService)?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The S3ObjectWorker handles this:

Also, @asifsmohammed and I are coordinating on the work here. He created this PR before the S3ObjectWorker was present in main, so some portions like this are currently unused in his branch and work.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll update the logic with buffer here

throw new IllegalStateException("Buffer provided is null");
}

S3Service s3Service = new S3Service(s3SourceConfig);
sqsService = new SqsService(s3SourceConfig, s3Service);

sqsService.start();
}

@Override
public void stop() {

}

private final AwsCredentialsProvider awsCredentialsProvider;
private final S3Client s3Client;

private AwsCredentialsProvider createCredentialsProvider() {
return Objects.requireNonNull(s3SourceConfig.getAWSAuthentication().authenticateAwsConfiguration(StsClient.create()));
}

private S3Client createS3Client(final AwsCredentialsProvider awsCredentialsProvider) {
return S3Client.builder()
.region(Region.of(s3SourceConfig.getAWSAuthentication().getAwsRegion()))
.credentialsProvider(awsCredentialsProvider)
.build();
sqsService.stop();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@

package com.amazon.dataprepper.plugins.source;

import com.amazon.dataprepper.plugins.source.configuration.AwsAuthenticationOptions;
import com.amazon.dataprepper.plugins.source.configuration.CodecOption;
import com.amazon.dataprepper.plugins.source.configuration.CompressionOption;
import com.amazon.dataprepper.plugins.source.configuration.NotificationTypeOption;
import com.amazon.dataprepper.plugins.source.configuration.CompressionOption;
import com.amazon.dataprepper.plugins.source.configuration.CodecOption;
import com.amazon.dataprepper.plugins.source.configuration.SqsOptions;
import com.amazon.dataprepper.plugins.source.configuration.AwsAuthenticationOptions;
import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.Valid;
import jakarta.validation.constraints.Min;
import jakarta.validation.constraints.NotNull;

public class S3SourceConfig {
Expand All @@ -27,10 +29,18 @@ public class S3SourceConfig {
@Valid
private CodecOption codec;

@JsonProperty("sqs")
@NotNull
private SqsOptions sqsOptions;

@JsonProperty("aws")
@NotNull
@Valid
private AwsAuthenticationOptions awsAuthentication;
private AwsAuthenticationOptions awsAuthenticationOptions;

@JsonProperty("thread_count")
@Min(0)
private int threadCount;

public NotificationTypeOption getNotificationType() {
return notificationType;
Expand All @@ -44,7 +54,15 @@ public CodecOption getCodec() {
return codec;
}

public AwsAuthenticationOptions getAWSAuthentication() {
return awsAuthentication;
public SqsOptions getSqsOptions() {
return sqsOptions;
}

public AwsAuthenticationOptions getAWSAuthenticationOptions() {
return awsAuthenticationOptions;
}

public int getThreadCount() {
return threadCount;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package com.amazon.dataprepper.plugins.source;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
import software.amazon.awssdk.core.retry.RetryPolicy;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sts.StsClient;

public class SqsService {
private static final Logger LOG = LoggerFactory.getLogger(SqsService.class);

private final S3SourceConfig s3SourceConfig;
private final S3Service s3Accessor;
private final SqsClient sqsClient;

private Thread sqsWorkerThread;

public SqsService(final S3SourceConfig s3SourceConfig, final S3Service s3Accessor) {
this.s3SourceConfig = s3SourceConfig;
this.s3Accessor = s3Accessor;
this.sqsClient = createSqsClient(StsClient.create());
}

public void start() {
sqsWorkerThread = new Thread(new SqsWorker(sqsClient, s3Accessor, s3SourceConfig));
sqsWorkerThread.start();
}

SqsClient createSqsClient(final StsClient stsClient) {
LOG.info("Creating SQS client");
return SqsClient.builder()
.region(Region.of(s3SourceConfig.getAWSAuthenticationOptions().getAwsRegion()))
.credentialsProvider(s3SourceConfig.getAWSAuthenticationOptions().authenticateAwsConfiguration(stsClient))
.overrideConfiguration(ClientOverrideConfiguration.builder()
.retryPolicy(RetryPolicy.builder().numRetries(5).build())
.build())
.build();
}

public void stop() {
sqsWorkerThread.interrupt();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package com.amazon.dataprepper.plugins.source;

import com.amazon.dataprepper.plugins.source.configuration.SqsOptions;
import com.amazon.dataprepper.plugins.source.filter.ObjectCreatedFilter;
import com.amazon.dataprepper.plugins.source.filter.S3EventFilter;
import com.amazonaws.services.s3.event.S3EventNotification;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.Message;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;
import software.amazon.awssdk.services.sqs.model.SqsException;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

public class SqsWorker implements Runnable {
private static final Logger LOG = LoggerFactory.getLogger(SqsWorker.class);

private final S3SourceConfig s3SourceConfig;
private final SqsClient sqsClient;
private final S3Service s3Service;
private final SqsOptions sqsOptions;
private final S3EventFilter objectCreatedFilter;

public SqsWorker(final SqsClient sqsClient, final S3Service s3Service, final S3SourceConfig s3SourceConfig) {
this.s3SourceConfig = s3SourceConfig;
this.sqsClient = sqsClient;
this.s3Service = s3Service;
sqsOptions = s3SourceConfig.getSqsOptions();
objectCreatedFilter = new ObjectCreatedFilter();
}

@Override
public void run() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function is getting very large in this partial state. I would encourage you to break it apart to improve maintainability.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will also make it easier to unit test and can remove variable reassignment/mutation in the code below.


while(true) {
// get messages from SQS
List<Message> sqsMessages = getMessagesFromSqs();

// convert each message to S3EventNotificationRecord
Map<Message, S3EventNotification.S3EventNotificationRecord> s3EventNotificationRecords =
getS3MessageEventNotificationRecordMap(sqsMessages);

// build s3ObjectReference from S3EventNotificationRecord if event name starts with ObjectCreated
processS3ObjectAndDeleteSqsMessages(s3EventNotificationRecords);

if (sqsMessages.size() < sqsOptions.getMaximumMessages() && s3SourceConfig.getSqsOptions().getPollDelay().toMillis() > 0) {
try {
Thread.sleep(s3SourceConfig.getSqsOptions().getPollDelay().toMillis());
} catch (InterruptedException e) {
LOG.error("Thread is interrupted while polling SQS.", e);
}
}
}
}

List<Message> getMessagesFromSqs() {
List<Message> messages = new ArrayList<>();
try {
ReceiveMessageRequest receiveMessageRequest = createReceiveMessageRequest();
messages = sqsClient.receiveMessage(receiveMessageRequest).messages();
} catch (SqsException e) {
LOG.error("Error reading from SQS: {}", e.awsErrorDetails().errorMessage());
}
return messages;
}

ReceiveMessageRequest createReceiveMessageRequest() {
return ReceiveMessageRequest.builder()
.queueUrl(sqsOptions.getSqsUrl())
.maxNumberOfMessages(sqsOptions.getMaximumMessages())
.visibilityTimeout((int) sqsOptions.getVisibilityTimeout().getSeconds())
.waitTimeSeconds((int) sqsOptions.getWaitTime().getSeconds())
.build();
}

private Map<Message, S3EventNotification.S3EventNotificationRecord> getS3MessageEventNotificationRecordMap(final List<Message> sqsMessages) {
return sqsMessages.stream().collect(Collectors.toMap(message -> message, this::convertS3EventMessages));
}

S3EventNotification.S3EventNotificationRecord convertS3EventMessages(final Message message) {
return S3EventNotification.parseJson(message.body()).getRecords().get(0);
}

private void processS3ObjectAndDeleteSqsMessages(final Map<Message, S3EventNotification.S3EventNotificationRecord> s3EventNotificationRecords) {
for (Map.Entry<Message, S3EventNotification.S3EventNotificationRecord> entry: s3EventNotificationRecords.entrySet()) {
if(isEventNameCreated(entry.getValue())) {
S3ObjectReference s3ObjectReference = populateS3Reference(entry.getValue());
s3Service.addS3Object(s3ObjectReference);
// TODO: delete sqsMessages which are successfully processed
}
}
}

boolean isEventNameCreated(final S3EventNotification.S3EventNotificationRecord s3EventNotificationRecord) {
Optional<S3EventNotification.S3EventNotificationRecord> filter = objectCreatedFilter.filter(s3EventNotificationRecord);
return filter.isPresent();
}

S3ObjectReference populateS3Reference(final S3EventNotification.S3EventNotificationRecord s3EventNotificationRecord) {
return S3ObjectReference.fromBucketAndKey(s3EventNotificationRecord.getS3().getBucket().getName(),
s3EventNotificationRecord.getS3().getObject().getKey());
}
}
Loading