
Added SQS interactions for S3 source #1431

Merged: 11 commits, Jun 7, 2022
data-prepper-plugins/s3-source/build.gradle (5 changes: 3 additions & 2 deletions)
@@ -13,10 +13,11 @@ repositories {

dependencies {
implementation project(':data-prepper-api')
implementation 'com.fasterxml.jackson.core:jackson-databind'
implementation 'org.apache.commons:commons-lang3:3.12.0'
implementation 'com.fasterxml.jackson.core:jackson-databind:2.13.3'
implementation 'software.amazon.awssdk:s3:2.17.191'
implementation 'software.amazon.awssdk:sts:2.17.191'
implementation 'software.amazon.awssdk:sqs:2.17.191'
implementation 'com.amazonaws:aws-java-sdk-s3:1.12.220'
testImplementation 'org.hamcrest:hamcrest:2.2'
}

New file: S3Service.java
@@ -0,0 +1,42 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package com.amazon.dataprepper.plugins.source;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
import software.amazon.awssdk.core.retry.RetryPolicy;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.sts.StsClient;

public class S3Service {
private static final Logger LOG = LoggerFactory.getLogger(S3Service.class);

private S3SourceConfig s3SourceConfig;
private S3Client s3Client;

public S3Service(final S3SourceConfig s3SourceConfig) {
this.s3SourceConfig = s3SourceConfig;
this.s3Client = createS3Client(StsClient.create());
}

S3ObjectReference addS3Object(final S3ObjectReference s3ObjectReference) {
// TODO: should return message id and receipt handle if successfully converted to event
return null;
Comment on lines +27 to +29
Contributor commented:

I have a few concerns over this method and subsequent workflow. I have some ideas for improvements but I want to understand a few things first.

What are we adding an S3Object to? I find this method name confusing and was really confused by the usage in the run() method.

Why are the parameters and return object the same object (S3ObjectReference)? Are we extending S3ObjectReference and expecting this class to mutate the variable with a messageId and Receipt? (I want to see if we can avoid mutation and nullable variables)

How is the receipt used by the calling function? Do we need it?

Member commented:

> Why are the parameters and return object the same object (S3ObjectReference)? Are we extending S3ObjectReference and expecting this class to mutate the variable with a messageId and Receipt? (I want to see if we can avoid mutation and nullable variables)

@asifsmohammed and I discussed a couple approaches here:

  1. We can make this templated and add something like T getNotificationMetadata() which in this case returns SQS metadata. It could be extended for SNS metadata if that were ever added.
  2. We could extend this class with a sub-class.

In either case, the SQS metadata would include the ReceiptHandle and MessageId.

I'm fine with either approach. But, the inheritance approach would have the benefit of completely abstracting this metadata from the S3 code.

Collaborator (author) commented:

My initial thought was to add ReceiptHandle and MessageId to S3ObjectReference, which is used to get the S3 object.
If the S3 object is successfully processed, we return the same S3ObjectReference so the message can be deleted from the queue.

I'll create a subclass which contains ReceiptHandle and MessageId.

}

S3Client createS3Client(final StsClient stsClient) {
LOG.info("Creating S3 client");
return S3Client.builder()
.region(Region.of(s3SourceConfig.getAWSAuthenticationOptions().getAwsRegion()))
.credentialsProvider(s3SourceConfig.getAWSAuthenticationOptions().authenticateAwsConfiguration(stsClient))
.overrideConfiguration(ClientOverrideConfiguration.builder()
.retryPolicy(RetryPolicy.builder().numRetries(5).build())
.build())
.build();
}
}
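
Following the subclass approach agreed on in the thread above, the SQS metadata could ride along on the reference itself. The class below is a hypothetical sketch, not part of this diff, and it assumes S3ObjectReference exposes a constructor taking bucket and key (the real class uses a fromBucketAndKey factory, so the final shape would differ):

// Hypothetical sketch only: a reference that carries its originating SQS message's
// identifiers so the worker can delete the message after successful processing.
class S3ObjectReferenceWithSqsMetadata extends S3ObjectReference {
    private final String messageId;
    private final String receiptHandle;

    S3ObjectReferenceWithSqsMetadata(final String bucketName, final String key,
                                     final String messageId, final String receiptHandle) {
        super(bucketName, key);   // assumes such a constructor is made available
        this.messageId = messageId;
        this.receiptHandle = receiptHandle;
    }

    String getMessageId() {
        return messageId;
    }

    String getReceiptHandle() {
        return receiptHandle;
    }
}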
S3Source.java
@@ -12,52 +12,38 @@
import com.amazon.dataprepper.model.event.Event;
import com.amazon.dataprepper.model.record.Record;
import com.amazon.dataprepper.model.source.Source;
import org.apache.commons.lang3.NotImplementedException;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.sts.StsClient;

import java.util.Objects;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@DataPrepperPlugin(name = "s3", pluginType = Source.class, pluginConfigurationType = S3SourceConfig.class)
public class S3Source implements Source<Record<Event>> {
private static final Logger LOG = LoggerFactory.getLogger(S3Source.class);

private final PluginMetrics pluginMetrics;
private final S3SourceConfig s3SourceConfig;

private SqsService sqsService;

@DataPrepperPluginConstructor
public S3Source(PluginMetrics pluginMetrics, final S3SourceConfig s3SourceConfig) {
this.pluginMetrics = pluginMetrics;
this.s3SourceConfig = s3SourceConfig;

awsCredentialsProvider = createCredentialsProvider();
s3Client = createS3Client(awsCredentialsProvider);

throw new NotImplementedException();
}

@Override
public void start(Buffer<Record<Event>> buffer) {
if (buffer == null) {
Contributor commented:

The buffer is never used by the S3Service or the SqsService. Who is going to be responsible for putting the item in the buffer (S3Service or SqsService)?

Member commented:

The S3ObjectWorker handles this:

Also, @asifsmohammed and I are coordinating on the work here. He created this PR before the S3ObjectWorker was present in main, so some portions like this are currently unused in his branch and work.

Collaborator (author) commented:

I'll update the logic with the buffer here.

throw new IllegalStateException("Buffer provided is null");
}

S3Service s3Service = new S3Service(s3SourceConfig);
sqsService = new SqsService(s3SourceConfig, s3Service);

sqsService.start();
}

@Override
public void stop() {

}

private final AwsCredentialsProvider awsCredentialsProvider;
private final S3Client s3Client;

private AwsCredentialsProvider createCredentialsProvider() {
return Objects.requireNonNull(s3SourceConfig.getAWSAuthentication().authenticateAwsConfiguration(StsClient.create()));
}

private S3Client createS3Client(final AwsCredentialsProvider awsCredentialsProvider) {
return S3Client.builder()
.region(Region.of(s3SourceConfig.getAWSAuthentication().getAwsRegion()))
.credentialsProvider(awsCredentialsProvider)
.build();
sqsService.stop();
}
}
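
Tying back to the buffer discussion above, here is a minimal sketch of how start() could eventually hand the buffer to whatever reads the S3 objects. The three-argument S3Service constructor is hypothetical; in the merged code the S3ObjectWorker the reviewer mentions is what actually writes events into the buffer:

@Override
public void start(Buffer<Record<Event>> buffer) {
    if (buffer == null) {
        throw new IllegalStateException("Buffer provided is null");
    }

    // Hypothetical wiring: pass the buffer (and metrics) down so the component
    // reading S3 objects can write decoded events into it.
    final S3Service s3Service = new S3Service(s3SourceConfig, buffer, pluginMetrics);
    sqsService = new SqsService(s3SourceConfig, s3Service);

    sqsService.start();
}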
S3SourceConfig.java
@@ -5,12 +5,14 @@

package com.amazon.dataprepper.plugins.source;

import com.amazon.dataprepper.plugins.source.configuration.AwsAuthenticationOptions;
import com.amazon.dataprepper.plugins.source.configuration.CodecOption;
import com.amazon.dataprepper.plugins.source.configuration.CompressionOption;
import com.amazon.dataprepper.plugins.source.configuration.NotificationTypeOption;
import com.amazon.dataprepper.plugins.source.configuration.CompressionOption;
import com.amazon.dataprepper.plugins.source.configuration.CodecOption;
import com.amazon.dataprepper.plugins.source.configuration.SqsOptions;
import com.amazon.dataprepper.plugins.source.configuration.AwsAuthenticationOptions;
import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.Valid;
import jakarta.validation.constraints.Min;
import jakarta.validation.constraints.NotNull;

public class S3SourceConfig {
@@ -27,10 +29,18 @@ public class S3SourceConfig {
@Valid
private CodecOption codec;

@JsonProperty("sqs")
@NotNull
private SqsOptions sqsOptions;

@JsonProperty("aws")
@NotNull
@Valid
private AwsAuthenticationOptions awsAuthentication;
private AwsAuthenticationOptions awsAuthenticationOptions;

@JsonProperty("thread_count")
@Min(0)
private int threadCount;

public NotificationTypeOption getNotificationType() {
return notificationType;
@@ -44,7 +54,15 @@ public CodecOption getCodec() {
return codec;
}

public AwsAuthenticationOptions getAWSAuthentication() {
return awsAuthentication;
public SqsOptions getSqsOptions() {
return sqsOptions;
}

public AwsAuthenticationOptions getAWSAuthenticationOptions() {
return awsAuthenticationOptions;
}

public int getThreadCount() {
return threadCount;
}
}
New file: SqsService.java
@@ -0,0 +1,50 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package com.amazon.dataprepper.plugins.source;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
import software.amazon.awssdk.core.retry.RetryPolicy;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sts.StsClient;

public class SqsService {
private static final Logger LOG = LoggerFactory.getLogger(SqsService.class);

private final S3SourceConfig s3SourceConfig;
private final S3Service s3Accessor;
private final SqsClient sqsClient;

private Thread sqsWorkerThread;

public SqsService(final S3SourceConfig s3SourceConfig, final S3Service s3Accessor) {
this.s3SourceConfig = s3SourceConfig;
this.s3Accessor = s3Accessor;
this.sqsClient = createSqsClient(StsClient.create());
}

public void start() {
sqsWorkerThread = new Thread(new SqsWorker(sqsClient, s3Accessor, s3SourceConfig));
sqsWorkerThread.start();
}

SqsClient createSqsClient(final StsClient stsClient) {
LOG.info("Creating SQS client");
return SqsClient.builder()
.region(Region.of(s3SourceConfig.getAWSAuthenticationOptions().getAwsRegion()))
.credentialsProvider(s3SourceConfig.getAWSAuthenticationOptions().authenticateAwsConfiguration(stsClient))
.overrideConfiguration(ClientOverrideConfiguration.builder()
.retryPolicy(RetryPolicy.builder().numRetries(5).build())
.build())
.build();
}

public void stop() {
sqsWorkerThread.interrupt();
}
}
New file: SqsWorker.java
@@ -0,0 +1,100 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package com.amazon.dataprepper.plugins.source;

import com.amazon.dataprepper.plugins.source.configuration.SqsOptions;
import com.amazon.dataprepper.plugins.source.filter.ObjectCreatedFilter;
import com.amazon.dataprepper.plugins.source.filter.S3EventFilter;
import com.amazonaws.services.s3.event.S3EventNotification;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.Message;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;
import software.amazon.awssdk.services.sqs.model.SqsException;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

public class SqsWorker implements Runnable {
private static final Logger LOG = LoggerFactory.getLogger(SqsWorker.class);
private static final ObjectMapper objectMapper = new ObjectMapper();

private S3SourceConfig s3SourceConfig;
Contributor commented:

These private variables should be final to prevent re-assignment

private SqsClient sqsClient;
private S3Service s3Service;
private SqsOptions sqsOptions;
private S3EventFilter objectCreatedFilter;

public SqsWorker(final SqsClient sqsClient, final S3Service s3Service, final S3SourceConfig s3SourceConfig) {
this.s3SourceConfig = s3SourceConfig;
this.sqsClient = sqsClient;
this.s3Service = s3Service;
sqsOptions = s3SourceConfig.getSqsOptions();
objectCreatedFilter = new ObjectCreatedFilter();
}

@Override
public void run() {
Contributor commented:

This function is getting very large in this partial state. I would encourage you to break it apart to improve maintainability.

Contributor commented:

This will also make it easier to unit test and can remove variable reassignment/mutation in the code below.


while(true) {
List<Message> messages = new ArrayList<>();

try {
ReceiveMessageRequest receiveMessageRequest = createReceiveMessageRequest();
messages.addAll(sqsClient.receiveMessage(receiveMessageRequest).messages());
} catch (SqsException e) {
LOG.error("Error reading from SQS: {}", e.awsErrorDetails().errorMessage());
}

// read each message as S3 event message
Map<Message, S3EventNotification.S3EventNotificationRecord> s3EventNotificationRecords = messages.stream()
.collect(Collectors.toMap(message -> message, this::convertS3EventMessages));

// build s3ObjectPointer from S3EventNotificationRecord if event name starts with ObjectCreated
List<S3ObjectReference> addedObjects = new ArrayList<>();
for (Map.Entry<Message, S3EventNotification.S3EventNotificationRecord> entry: s3EventNotificationRecords.entrySet()) {
Optional<S3EventNotification.S3EventNotificationRecord> filter = objectCreatedFilter.filter(entry.getValue());
if(filter.isPresent()) {
S3ObjectReference s3ObjectPointer = populateS3Reference(entry.getValue());
addedObjects.add(s3Service.addS3Object(s3ObjectPointer));
}
}

// TODO: delete messages which are successfully processed

if (messages.size() < sqsOptions.getMaximumMessages() && s3SourceConfig.getSqsOptions().getPollDelay().toMillis() > 0) {
try {
Thread.sleep(s3SourceConfig.getSqsOptions().getPollDelay().toMillis());
} catch (InterruptedException e) {
LOG.error("Thread is interrupted while polling SQS.", e);
}
}
}
}

S3ObjectReference populateS3Reference(final S3EventNotification.S3EventNotificationRecord s3EventNotificationRecord) {
return S3ObjectReference.fromBucketAndKey(s3EventNotificationRecord.getS3().getBucket().getName(),
s3EventNotificationRecord.getS3().getObject().getKey());
}

ReceiveMessageRequest createReceiveMessageRequest() {
return ReceiveMessageRequest.builder()
.queueUrl(sqsOptions.getSqsUrl())
.maxNumberOfMessages(sqsOptions.getMaximumMessages())
.visibilityTimeout((int) sqsOptions.getVisibilityTimeout().getSeconds())
.waitTimeSeconds((int) sqsOptions.getWaitTime().getSeconds())
.build();
}

private S3EventNotification.S3EventNotificationRecord convertS3EventMessages(final Message message) {
Member commented:

I think we may want to make this part a separate class which is injected. One reason is that the SQS message may actually be in a different format in some cases. If the SQS queue is connected to SNS, then this becomes a string rather than JSON. So we might need to make this configurable by pipeline authors. I'm fine to keep it here for now if that helps merge it in though.

Collaborator (author) commented:

I think we can move this to a separate class once we decide on how pipeline authors can configure this.

return objectMapper.convertValue(message, S3EventNotification.S3EventNotificationRecord.class);
}
}
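
One possible shape for the decomposition the reviewers ask for in run(), sketched with illustrative helper names that are not part of this PR (java.util.Collections would also need to be imported). Checking the interrupt flag in the loop condition additionally lets the interrupt() issued by SqsService.stop() end the worker cleanly:

@Override
public void run() {
    // Loop until SqsService.stop() interrupts this worker thread.
    while (!Thread.currentThread().isInterrupted()) {
        final List<Message> messages = receiveMessages();
        final List<S3ObjectReference> addedObjects = processS3EventNotifications(messages);
        // TODO: delete the messages behind addedObjects once processing succeeds; this
        // requires keeping each Message associated with its notification record
        // (e.g. via the metadata-carrying reference discussed earlier).
        delayIfNecessary(messages.size());
    }
}

private List<Message> receiveMessages() {
    try {
        return sqsClient.receiveMessage(createReceiveMessageRequest()).messages();
    } catch (final SqsException e) {
        LOG.error("Error reading from SQS: {}", e.awsErrorDetails().errorMessage());
        return Collections.emptyList();
    }
}

private List<S3ObjectReference> processS3EventNotifications(final List<Message> messages) {
    return messages.stream()
            .map(this::convertS3EventMessages)
            .filter(notificationRecord -> objectCreatedFilter.filter(notificationRecord).isPresent())
            .map(this::populateS3Reference)
            .map(s3Service::addS3Object)
            .collect(Collectors.toList());
}

private void delayIfNecessary(final int messagesReceived) {
    final long pollDelayMillis = sqsOptions.getPollDelay().toMillis();
    if (messagesReceived < sqsOptions.getMaximumMessages() && pollDelayMillis > 0) {
        try {
            Thread.sleep(pollDelayMillis);
        } catch (final InterruptedException e) {
            // Restore the interrupt status so the outer loop can exit.
            Thread.currentThread().interrupt();
        }
    }
}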
New file: SqsOptions.java
@@ -0,0 +1,61 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package com.amazon.dataprepper.plugins.source.configuration;

import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.constraints.Max;
import jakarta.validation.constraints.Min;
import jakarta.validation.constraints.NotBlank;

import java.time.Duration;

public class SqsOptions {
private static final int DEFAULT_MAXIMUM_MESSAGES = 10;
private static final Duration DEFAULT_VISIBILITY_TIMEOUT_SECONDS = Duration.ofSeconds(30);
private static final Duration DEFAULT_WAIT_TIME_SECONDS = Duration.ofSeconds(20);
private static final Duration DEFAULT_POLL_DELAY_SECONDS = Duration.ofSeconds(0);

@JsonProperty("queue_url")
@NotBlank(message = "SQS URL cannot be null or empty")
private String sqsUrl;

@JsonProperty("maximum_messages")
private int maximumMessages = DEFAULT_MAXIMUM_MESSAGES;

@JsonProperty("visibility_timeout")
@Min(0)
@Max(43200)
private Duration visibilityTimeout = DEFAULT_VISIBILITY_TIMEOUT_SECONDS;

@JsonProperty("wait_time")
@Min(0)
@Max(20)
private Duration waitTime = DEFAULT_WAIT_TIME_SECONDS;

@JsonProperty("poll_delay")
@Min(0)
private Duration pollDelay = DEFAULT_POLL_DELAY_SECONDS;

public String getSqsUrl() {
return sqsUrl;
}

public int getMaximumMessages() {
return maximumMessages;
}

public Duration getVisibilityTimeout() {
return visibilityTimeout;
}

public Duration getWaitTime() {
return waitTime;
}

public Duration getPollDelay() {
return pollDelay;
}
}
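
For orientation, a hypothetical pipeline snippet (YAML, the plugin's configuration format) exercising the options added in this PR. The property names come from the @JsonProperty annotations above; the values, the duration format, and the field names under aws (defined in AwsAuthenticationOptions, which is not part of this diff) are all assumptions:

s3-pipeline:
  source:
    s3:
      notification_type: "sqs"
      compression: "none"               # value assumed; valid options live in CompressionOption
      codec:
        newline:                        # codec name assumed; valid options live in CodecOption
      sqs:
        queue_url: "https://sqs.us-east-1.amazonaws.com/123456789012/example-queue"
        maximum_messages: 10
        visibility_timeout: "PT30S"     # defaults to 30 seconds when omitted
        wait_time: "PT20S"              # defaults to 20 seconds when omitted
        poll_delay: "PT0S"              # defaults to 0 seconds when omitted
      aws:
        region: "us-east-1"                                     # field name assumed
        sts_role_arn: "arn:aws:iam::123456789012:role/example"  # field name assumed
      thread_count: 1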