-
Notifications
You must be signed in to change notification settings - Fork 202
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Added SQS interactions for S3 source #1431
Changes from all commits
6c45969
cb834f8
421010f
ffcf989
fbc74f7
391452a
fdb2411
2b77a9a
eb2bcca
093a9a3
e387106
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,42 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package com.amazon.dataprepper.plugins.source; | ||
|
||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration; | ||
import software.amazon.awssdk.core.retry.RetryPolicy; | ||
import software.amazon.awssdk.regions.Region; | ||
import software.amazon.awssdk.services.s3.S3Client; | ||
import software.amazon.awssdk.services.sts.StsClient; | ||
|
||
public class S3Service { | ||
private static final Logger LOG = LoggerFactory.getLogger(S3Service.class); | ||
|
||
private final S3SourceConfig s3SourceConfig; | ||
private final S3Client s3Client; | ||
|
||
public S3Service(final S3SourceConfig s3SourceConfig) { | ||
this.s3SourceConfig = s3SourceConfig; | ||
this.s3Client = createS3Client(StsClient.create()); | ||
} | ||
|
||
S3ObjectReference addS3Object(final S3ObjectReference s3ObjectReference) { | ||
// TODO: should return message id and receipt handle if successfully converted to event | ||
return null; | ||
} | ||
|
||
S3Client createS3Client(final StsClient stsClient) { | ||
LOG.info("Creating S3 client"); | ||
return S3Client.builder() | ||
.region(Region.of(s3SourceConfig.getAWSAuthenticationOptions().getAwsRegion())) | ||
.credentialsProvider(s3SourceConfig.getAWSAuthenticationOptions().authenticateAwsConfiguration(stsClient)) | ||
.overrideConfiguration(ClientOverrideConfiguration.builder() | ||
.retryPolicy(RetryPolicy.builder().numRetries(5).build()) | ||
.build()) | ||
.build(); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change | ||
---|---|---|---|---|
|
@@ -12,52 +12,38 @@ | |||
import com.amazon.dataprepper.model.event.Event; | ||||
import com.amazon.dataprepper.model.record.Record; | ||||
import com.amazon.dataprepper.model.source.Source; | ||||
import org.apache.commons.lang3.NotImplementedException; | ||||
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; | ||||
import software.amazon.awssdk.regions.Region; | ||||
import software.amazon.awssdk.services.s3.S3Client; | ||||
import software.amazon.awssdk.services.sts.StsClient; | ||||
|
||||
import java.util.Objects; | ||||
import org.slf4j.Logger; | ||||
import org.slf4j.LoggerFactory; | ||||
|
||||
@DataPrepperPlugin(name = "s3", pluginType = Source.class, pluginConfigurationType = S3SourceConfig.class) | ||||
public class S3Source implements Source<Record<Event>> { | ||||
private static final Logger LOG = LoggerFactory.getLogger(S3Source.class); | ||||
|
||||
private final PluginMetrics pluginMetrics; | ||||
private final S3SourceConfig s3SourceConfig; | ||||
|
||||
private SqsService sqsService; | ||||
|
||||
@DataPrepperPluginConstructor | ||||
public S3Source(PluginMetrics pluginMetrics, final S3SourceConfig s3SourceConfig) { | ||||
this.pluginMetrics = pluginMetrics; | ||||
this.s3SourceConfig = s3SourceConfig; | ||||
|
||||
awsCredentialsProvider = createCredentialsProvider(); | ||||
s3Client = createS3Client(awsCredentialsProvider); | ||||
|
||||
throw new NotImplementedException(); | ||||
} | ||||
|
||||
@Override | ||||
public void start(Buffer<Record<Event>> buffer) { | ||||
if (buffer == null) { | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The buffer is never used the by the S3Service or the SqsService. Who is going to be responsible for putting the item in the buffer (S3Service or SqsService)? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The Line 30 in be35437
Also, @asifsmohammed and I are coordinating on the work here. He created this PR before the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'll update the logic with buffer here |
||||
throw new IllegalStateException("Buffer provided is null"); | ||||
} | ||||
|
||||
S3Service s3Service = new S3Service(s3SourceConfig); | ||||
sqsService = new SqsService(s3SourceConfig, s3Service); | ||||
|
||||
sqsService.start(); | ||||
} | ||||
|
||||
@Override | ||||
public void stop() { | ||||
|
||||
} | ||||
|
||||
private final AwsCredentialsProvider awsCredentialsProvider; | ||||
private final S3Client s3Client; | ||||
|
||||
private AwsCredentialsProvider createCredentialsProvider() { | ||||
return Objects.requireNonNull(s3SourceConfig.getAWSAuthentication().authenticateAwsConfiguration(StsClient.create())); | ||||
} | ||||
|
||||
private S3Client createS3Client(final AwsCredentialsProvider awsCredentialsProvider) { | ||||
return S3Client.builder() | ||||
.region(Region.of(s3SourceConfig.getAWSAuthentication().getAwsRegion())) | ||||
.credentialsProvider(awsCredentialsProvider) | ||||
.build(); | ||||
sqsService.stop(); | ||||
} | ||||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,50 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package com.amazon.dataprepper.plugins.source; | ||
|
||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration; | ||
import software.amazon.awssdk.core.retry.RetryPolicy; | ||
import software.amazon.awssdk.regions.Region; | ||
import software.amazon.awssdk.services.sqs.SqsClient; | ||
import software.amazon.awssdk.services.sts.StsClient; | ||
|
||
public class SqsService { | ||
private static final Logger LOG = LoggerFactory.getLogger(SqsService.class); | ||
|
||
private final S3SourceConfig s3SourceConfig; | ||
private final S3Service s3Accessor; | ||
private final SqsClient sqsClient; | ||
|
||
private Thread sqsWorkerThread; | ||
|
||
public SqsService(final S3SourceConfig s3SourceConfig, final S3Service s3Accessor) { | ||
this.s3SourceConfig = s3SourceConfig; | ||
this.s3Accessor = s3Accessor; | ||
this.sqsClient = createSqsClient(StsClient.create()); | ||
} | ||
|
||
public void start() { | ||
sqsWorkerThread = new Thread(new SqsWorker(sqsClient, s3Accessor, s3SourceConfig)); | ||
sqsWorkerThread.start(); | ||
} | ||
|
||
SqsClient createSqsClient(final StsClient stsClient) { | ||
LOG.info("Creating SQS client"); | ||
return SqsClient.builder() | ||
.region(Region.of(s3SourceConfig.getAWSAuthenticationOptions().getAwsRegion())) | ||
.credentialsProvider(s3SourceConfig.getAWSAuthenticationOptions().authenticateAwsConfiguration(stsClient)) | ||
.overrideConfiguration(ClientOverrideConfiguration.builder() | ||
.retryPolicy(RetryPolicy.builder().numRetries(5).build()) | ||
.build()) | ||
.build(); | ||
} | ||
|
||
public void stop() { | ||
sqsWorkerThread.interrupt(); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,113 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package com.amazon.dataprepper.plugins.source; | ||
|
||
import com.amazon.dataprepper.plugins.source.configuration.SqsOptions; | ||
import com.amazon.dataprepper.plugins.source.filter.ObjectCreatedFilter; | ||
import com.amazon.dataprepper.plugins.source.filter.S3EventFilter; | ||
import com.amazonaws.services.s3.event.S3EventNotification; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
import software.amazon.awssdk.services.sqs.SqsClient; | ||
import software.amazon.awssdk.services.sqs.model.Message; | ||
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest; | ||
import software.amazon.awssdk.services.sqs.model.SqsException; | ||
|
||
import java.util.ArrayList; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.Optional; | ||
import java.util.stream.Collectors; | ||
|
||
public class SqsWorker implements Runnable { | ||
private static final Logger LOG = LoggerFactory.getLogger(SqsWorker.class); | ||
|
||
private final S3SourceConfig s3SourceConfig; | ||
private final SqsClient sqsClient; | ||
private final S3Service s3Service; | ||
private final SqsOptions sqsOptions; | ||
private final S3EventFilter objectCreatedFilter; | ||
|
||
public SqsWorker(final SqsClient sqsClient, final S3Service s3Service, final S3SourceConfig s3SourceConfig) { | ||
this.s3SourceConfig = s3SourceConfig; | ||
this.sqsClient = sqsClient; | ||
this.s3Service = s3Service; | ||
sqsOptions = s3SourceConfig.getSqsOptions(); | ||
objectCreatedFilter = new ObjectCreatedFilter(); | ||
} | ||
|
||
@Override | ||
public void run() { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This function is getting very large in this partial state. I would encourage you to break it apart to improve maintainability. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This will also make it easier to unit test and can remove variable reassignment/mutation in the code below. |
||
|
||
while(true) { | ||
// get messages from SQS | ||
List<Message> sqsMessages = getMessagesFromSqs(); | ||
|
||
// convert each message to S3EventNotificationRecord | ||
Map<Message, S3EventNotification.S3EventNotificationRecord> s3EventNotificationRecords = | ||
getS3MessageEventNotificationRecordMap(sqsMessages); | ||
|
||
// build s3ObjectReference from S3EventNotificationRecord if event name starts with ObjectCreated | ||
processS3ObjectAndDeleteSqsMessages(s3EventNotificationRecords); | ||
|
||
if (sqsMessages.size() < sqsOptions.getMaximumMessages() && s3SourceConfig.getSqsOptions().getPollDelay().toMillis() > 0) { | ||
try { | ||
Thread.sleep(s3SourceConfig.getSqsOptions().getPollDelay().toMillis()); | ||
} catch (InterruptedException e) { | ||
LOG.error("Thread is interrupted while polling SQS.", e); | ||
} | ||
} | ||
} | ||
} | ||
|
||
List<Message> getMessagesFromSqs() { | ||
List<Message> messages = new ArrayList<>(); | ||
try { | ||
ReceiveMessageRequest receiveMessageRequest = createReceiveMessageRequest(); | ||
messages = sqsClient.receiveMessage(receiveMessageRequest).messages(); | ||
} catch (SqsException e) { | ||
LOG.error("Error reading from SQS: {}", e.awsErrorDetails().errorMessage()); | ||
} | ||
return messages; | ||
} | ||
|
||
ReceiveMessageRequest createReceiveMessageRequest() { | ||
return ReceiveMessageRequest.builder() | ||
.queueUrl(sqsOptions.getSqsUrl()) | ||
.maxNumberOfMessages(sqsOptions.getMaximumMessages()) | ||
.visibilityTimeout((int) sqsOptions.getVisibilityTimeout().getSeconds()) | ||
.waitTimeSeconds((int) sqsOptions.getWaitTime().getSeconds()) | ||
.build(); | ||
} | ||
|
||
private Map<Message, S3EventNotification.S3EventNotificationRecord> getS3MessageEventNotificationRecordMap(final List<Message> sqsMessages) { | ||
return sqsMessages.stream().collect(Collectors.toMap(message -> message, this::convertS3EventMessages)); | ||
} | ||
|
||
S3EventNotification.S3EventNotificationRecord convertS3EventMessages(final Message message) { | ||
return S3EventNotification.parseJson(message.body()).getRecords().get(0); | ||
} | ||
|
||
private void processS3ObjectAndDeleteSqsMessages(final Map<Message, S3EventNotification.S3EventNotificationRecord> s3EventNotificationRecords) { | ||
for (Map.Entry<Message, S3EventNotification.S3EventNotificationRecord> entry: s3EventNotificationRecords.entrySet()) { | ||
if(isEventNameCreated(entry.getValue())) { | ||
S3ObjectReference s3ObjectReference = populateS3Reference(entry.getValue()); | ||
s3Service.addS3Object(s3ObjectReference); | ||
// TODO: delete sqsMessages which are successfully processed | ||
} | ||
} | ||
} | ||
|
||
boolean isEventNameCreated(final S3EventNotification.S3EventNotificationRecord s3EventNotificationRecord) { | ||
Optional<S3EventNotification.S3EventNotificationRecord> filter = objectCreatedFilter.filter(s3EventNotificationRecord); | ||
return filter.isPresent(); | ||
} | ||
|
||
S3ObjectReference populateS3Reference(final S3EventNotification.S3EventNotificationRecord s3EventNotificationRecord) { | ||
return S3ObjectReference.fromBucketAndKey(s3EventNotificationRecord.getS3().getBucket().getName(), | ||
s3EventNotificationRecord.getS3().getObject().getKey()); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have a few concerns over this method and subsequent workflow. I have some ideas for improvements but I want to understand a few things first.
What are we adding an S3Object to? I find this method name confusing and was really confused by the usage in the
run()
method.Why are the parameters and return object the same object (S3ObjectReference)? Are we extending S3ObjectReference and expecting this class to mutate the variable with a messageId and Receipt? (I want to see if we can avoid mutation and nullable variables)
How is the receipt used by the calling function? Do we need it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@asifsmohammed and I discussed a couple approaches here:
T getNotificationMetadata()
which in this case returns SQS metadata. It could be extended for SNS metadata if that were ever added.In either case, the SQS metadata would include the
ReceiptHandle
andMessageId
.I'm fine with either approach. But, the inheritance approach would have the benefit of completely abstracting this metadata from the S3 code.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My initial thought was to add
ReceiptHandle
andMessageId
toS3ObjectReference
which will be used to get S3 Object.If S3 object is successfully processed we return the same
S3ObjectReference
to delete message from queue.I'll create a subclass which contains
ReceiptHandle
andMessageId
.