diff --git a/data-prepper-plugins/s3-source/build.gradle b/data-prepper-plugins/s3-source/build.gradle index 14f9a15148..fc6c6c0034 100644 --- a/data-prepper-plugins/s3-source/build.gradle +++ b/data-prepper-plugins/s3-source/build.gradle @@ -13,10 +13,10 @@ repositories { dependencies { implementation project(':data-prepper-api') - implementation 'com.fasterxml.jackson.core:jackson-databind' - implementation 'org.apache.commons:commons-lang3:3.12.0' implementation 'software.amazon.awssdk:s3:2.17.191' implementation 'software.amazon.awssdk:sts:2.17.191' + implementation 'software.amazon.awssdk:sqs:2.17.191' + implementation 'com.amazonaws:aws-java-sdk-s3:1.12.220' testImplementation 'org.hamcrest:hamcrest:2.2' } diff --git a/data-prepper-plugins/s3-source/src/main/java/com/amazon/dataprepper/plugins/source/S3Service.java b/data-prepper-plugins/s3-source/src/main/java/com/amazon/dataprepper/plugins/source/S3Service.java new file mode 100644 index 0000000000..6f806fb612 --- /dev/null +++ b/data-prepper-plugins/s3-source/src/main/java/com/amazon/dataprepper/plugins/source/S3Service.java @@ -0,0 +1,42 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package com.amazon.dataprepper.plugins.source; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration; +import software.amazon.awssdk.core.retry.RetryPolicy; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.sts.StsClient; + +public class S3Service { + private static final Logger LOG = LoggerFactory.getLogger(S3Service.class); + + private final S3SourceConfig s3SourceConfig; + private final S3Client s3Client; + + public S3Service(final S3SourceConfig s3SourceConfig) { + this.s3SourceConfig = s3SourceConfig; + this.s3Client = createS3Client(StsClient.create()); + } + + S3ObjectReference addS3Object(final S3ObjectReference s3ObjectReference) { + // TODO: should return message id and receipt handle if successfully converted to event + return null; + } + + S3Client createS3Client(final StsClient stsClient) { + LOG.info("Creating S3 client"); + return S3Client.builder() + .region(Region.of(s3SourceConfig.getAWSAuthenticationOptions().getAwsRegion())) + .credentialsProvider(s3SourceConfig.getAWSAuthenticationOptions().authenticateAwsConfiguration(stsClient)) + .overrideConfiguration(ClientOverrideConfiguration.builder() + .retryPolicy(RetryPolicy.builder().numRetries(5).build()) + .build()) + .build(); + } +} diff --git a/data-prepper-plugins/s3-source/src/main/java/com/amazon/dataprepper/plugins/source/S3Source.java b/data-prepper-plugins/s3-source/src/main/java/com/amazon/dataprepper/plugins/source/S3Source.java index 7b4b36b0d0..0eac8fcbea 100644 --- a/data-prepper-plugins/s3-source/src/main/java/com/amazon/dataprepper/plugins/source/S3Source.java +++ b/data-prepper-plugins/s3-source/src/main/java/com/amazon/dataprepper/plugins/source/S3Source.java @@ -12,52 +12,38 @@ import com.amazon.dataprepper.model.event.Event; import com.amazon.dataprepper.model.record.Record; import com.amazon.dataprepper.model.source.Source; -import org.apache.commons.lang3.NotImplementedException; -import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; -import software.amazon.awssdk.regions.Region; -import software.amazon.awssdk.services.s3.S3Client; -import software.amazon.awssdk.services.sts.StsClient; - -import java.util.Objects; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; @DataPrepperPlugin(name = "s3", pluginType = Source.class, pluginConfigurationType = S3SourceConfig.class) public class S3Source implements Source> { + private static final Logger LOG = LoggerFactory.getLogger(S3Source.class); private final PluginMetrics pluginMetrics; private final S3SourceConfig s3SourceConfig; + private SqsService sqsService; + @DataPrepperPluginConstructor public S3Source(PluginMetrics pluginMetrics, final S3SourceConfig s3SourceConfig) { this.pluginMetrics = pluginMetrics; this.s3SourceConfig = s3SourceConfig; - - awsCredentialsProvider = createCredentialsProvider(); - s3Client = createS3Client(awsCredentialsProvider); - - throw new NotImplementedException(); } @Override public void start(Buffer> buffer) { + if (buffer == null) { + throw new IllegalStateException("Buffer provided is null"); + } + + S3Service s3Service = new S3Service(s3SourceConfig); + sqsService = new SqsService(s3SourceConfig, s3Service); + sqsService.start(); } @Override public void stop() { - - } - - private final AwsCredentialsProvider awsCredentialsProvider; - private final S3Client s3Client; - - private AwsCredentialsProvider createCredentialsProvider() { - return Objects.requireNonNull(s3SourceConfig.getAWSAuthentication().authenticateAwsConfiguration(StsClient.create())); - } - - private S3Client createS3Client(final AwsCredentialsProvider awsCredentialsProvider) { - return S3Client.builder() - .region(Region.of(s3SourceConfig.getAWSAuthentication().getAwsRegion())) - .credentialsProvider(awsCredentialsProvider) - .build(); + sqsService.stop(); } } diff --git a/data-prepper-plugins/s3-source/src/main/java/com/amazon/dataprepper/plugins/source/S3SourceConfig.java b/data-prepper-plugins/s3-source/src/main/java/com/amazon/dataprepper/plugins/source/S3SourceConfig.java index c42d1df508..c125ebc87e 100644 --- a/data-prepper-plugins/s3-source/src/main/java/com/amazon/dataprepper/plugins/source/S3SourceConfig.java +++ b/data-prepper-plugins/s3-source/src/main/java/com/amazon/dataprepper/plugins/source/S3SourceConfig.java @@ -5,12 +5,14 @@ package com.amazon.dataprepper.plugins.source; -import com.amazon.dataprepper.plugins.source.configuration.AwsAuthenticationOptions; -import com.amazon.dataprepper.plugins.source.configuration.CodecOption; -import com.amazon.dataprepper.plugins.source.configuration.CompressionOption; import com.amazon.dataprepper.plugins.source.configuration.NotificationTypeOption; +import com.amazon.dataprepper.plugins.source.configuration.CompressionOption; +import com.amazon.dataprepper.plugins.source.configuration.CodecOption; +import com.amazon.dataprepper.plugins.source.configuration.SqsOptions; +import com.amazon.dataprepper.plugins.source.configuration.AwsAuthenticationOptions; import com.fasterxml.jackson.annotation.JsonProperty; import jakarta.validation.Valid; +import jakarta.validation.constraints.Min; import jakarta.validation.constraints.NotNull; public class S3SourceConfig { @@ -27,10 +29,18 @@ public class S3SourceConfig { @Valid private CodecOption codec; + @JsonProperty("sqs") + @NotNull + private SqsOptions sqsOptions; + @JsonProperty("aws") @NotNull @Valid - private AwsAuthenticationOptions awsAuthentication; + private AwsAuthenticationOptions awsAuthenticationOptions; + + @JsonProperty("thread_count") + @Min(0) + private int threadCount; public NotificationTypeOption getNotificationType() { return notificationType; @@ -44,7 +54,15 @@ public CodecOption getCodec() { return codec; } - public AwsAuthenticationOptions getAWSAuthentication() { - return awsAuthentication; + public SqsOptions getSqsOptions() { + return sqsOptions; + } + + public AwsAuthenticationOptions getAWSAuthenticationOptions() { + return awsAuthenticationOptions; + } + + public int getThreadCount() { + return threadCount; } } diff --git a/data-prepper-plugins/s3-source/src/main/java/com/amazon/dataprepper/plugins/source/SqsService.java b/data-prepper-plugins/s3-source/src/main/java/com/amazon/dataprepper/plugins/source/SqsService.java new file mode 100644 index 0000000000..a117f32e73 --- /dev/null +++ b/data-prepper-plugins/s3-source/src/main/java/com/amazon/dataprepper/plugins/source/SqsService.java @@ -0,0 +1,50 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package com.amazon.dataprepper.plugins.source; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration; +import software.amazon.awssdk.core.retry.RetryPolicy; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.sqs.SqsClient; +import software.amazon.awssdk.services.sts.StsClient; + +public class SqsService { + private static final Logger LOG = LoggerFactory.getLogger(SqsService.class); + + private final S3SourceConfig s3SourceConfig; + private final S3Service s3Accessor; + private final SqsClient sqsClient; + + private Thread sqsWorkerThread; + + public SqsService(final S3SourceConfig s3SourceConfig, final S3Service s3Accessor) { + this.s3SourceConfig = s3SourceConfig; + this.s3Accessor = s3Accessor; + this.sqsClient = createSqsClient(StsClient.create()); + } + + public void start() { + sqsWorkerThread = new Thread(new SqsWorker(sqsClient, s3Accessor, s3SourceConfig)); + sqsWorkerThread.start(); + } + + SqsClient createSqsClient(final StsClient stsClient) { + LOG.info("Creating SQS client"); + return SqsClient.builder() + .region(Region.of(s3SourceConfig.getAWSAuthenticationOptions().getAwsRegion())) + .credentialsProvider(s3SourceConfig.getAWSAuthenticationOptions().authenticateAwsConfiguration(stsClient)) + .overrideConfiguration(ClientOverrideConfiguration.builder() + .retryPolicy(RetryPolicy.builder().numRetries(5).build()) + .build()) + .build(); + } + + public void stop() { + sqsWorkerThread.interrupt(); + } +} diff --git a/data-prepper-plugins/s3-source/src/main/java/com/amazon/dataprepper/plugins/source/SqsWorker.java b/data-prepper-plugins/s3-source/src/main/java/com/amazon/dataprepper/plugins/source/SqsWorker.java new file mode 100644 index 0000000000..a6131c525b --- /dev/null +++ b/data-prepper-plugins/s3-source/src/main/java/com/amazon/dataprepper/plugins/source/SqsWorker.java @@ -0,0 +1,113 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package com.amazon.dataprepper.plugins.source; + +import com.amazon.dataprepper.plugins.source.configuration.SqsOptions; +import com.amazon.dataprepper.plugins.source.filter.ObjectCreatedFilter; +import com.amazon.dataprepper.plugins.source.filter.S3EventFilter; +import com.amazonaws.services.s3.event.S3EventNotification; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.services.sqs.SqsClient; +import software.amazon.awssdk.services.sqs.model.Message; +import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest; +import software.amazon.awssdk.services.sqs.model.SqsException; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; + +public class SqsWorker implements Runnable { + private static final Logger LOG = LoggerFactory.getLogger(SqsWorker.class); + + private final S3SourceConfig s3SourceConfig; + private final SqsClient sqsClient; + private final S3Service s3Service; + private final SqsOptions sqsOptions; + private final S3EventFilter objectCreatedFilter; + + public SqsWorker(final SqsClient sqsClient, final S3Service s3Service, final S3SourceConfig s3SourceConfig) { + this.s3SourceConfig = s3SourceConfig; + this.sqsClient = sqsClient; + this.s3Service = s3Service; + sqsOptions = s3SourceConfig.getSqsOptions(); + objectCreatedFilter = new ObjectCreatedFilter(); + } + + @Override + public void run() { + + while(true) { + // get messages from SQS + List sqsMessages = getMessagesFromSqs(); + + // convert each message to S3EventNotificationRecord + Map s3EventNotificationRecords = + getS3MessageEventNotificationRecordMap(sqsMessages); + + // build s3ObjectReference from S3EventNotificationRecord if event name starts with ObjectCreated + processS3ObjectAndDeleteSqsMessages(s3EventNotificationRecords); + + if (sqsMessages.size() < sqsOptions.getMaximumMessages() && s3SourceConfig.getSqsOptions().getPollDelay().toMillis() > 0) { + try { + Thread.sleep(s3SourceConfig.getSqsOptions().getPollDelay().toMillis()); + } catch (InterruptedException e) { + LOG.error("Thread is interrupted while polling SQS.", e); + } + } + } + } + + List getMessagesFromSqs() { + List messages = new ArrayList<>(); + try { + ReceiveMessageRequest receiveMessageRequest = createReceiveMessageRequest(); + messages = sqsClient.receiveMessage(receiveMessageRequest).messages(); + } catch (SqsException e) { + LOG.error("Error reading from SQS: {}", e.awsErrorDetails().errorMessage()); + } + return messages; + } + + ReceiveMessageRequest createReceiveMessageRequest() { + return ReceiveMessageRequest.builder() + .queueUrl(sqsOptions.getSqsUrl()) + .maxNumberOfMessages(sqsOptions.getMaximumMessages()) + .visibilityTimeout((int) sqsOptions.getVisibilityTimeout().getSeconds()) + .waitTimeSeconds((int) sqsOptions.getWaitTime().getSeconds()) + .build(); + } + + private Map getS3MessageEventNotificationRecordMap(final List sqsMessages) { + return sqsMessages.stream().collect(Collectors.toMap(message -> message, this::convertS3EventMessages)); + } + + S3EventNotification.S3EventNotificationRecord convertS3EventMessages(final Message message) { + return S3EventNotification.parseJson(message.body()).getRecords().get(0); + } + + private void processS3ObjectAndDeleteSqsMessages(final Map s3EventNotificationRecords) { + for (Map.Entry entry: s3EventNotificationRecords.entrySet()) { + if(isEventNameCreated(entry.getValue())) { + S3ObjectReference s3ObjectReference = populateS3Reference(entry.getValue()); + s3Service.addS3Object(s3ObjectReference); + // TODO: delete sqsMessages which are successfully processed + } + } + } + + boolean isEventNameCreated(final S3EventNotification.S3EventNotificationRecord s3EventNotificationRecord) { + Optional filter = objectCreatedFilter.filter(s3EventNotificationRecord); + return filter.isPresent(); + } + + S3ObjectReference populateS3Reference(final S3EventNotification.S3EventNotificationRecord s3EventNotificationRecord) { + return S3ObjectReference.fromBucketAndKey(s3EventNotificationRecord.getS3().getBucket().getName(), + s3EventNotificationRecord.getS3().getObject().getKey()); + } +} diff --git a/data-prepper-plugins/s3-source/src/main/java/com/amazon/dataprepper/plugins/source/configuration/SqsOptions.java b/data-prepper-plugins/s3-source/src/main/java/com/amazon/dataprepper/plugins/source/configuration/SqsOptions.java new file mode 100644 index 0000000000..985559a7d5 --- /dev/null +++ b/data-prepper-plugins/s3-source/src/main/java/com/amazon/dataprepper/plugins/source/configuration/SqsOptions.java @@ -0,0 +1,61 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package com.amazon.dataprepper.plugins.source.configuration; + +import com.fasterxml.jackson.annotation.JsonProperty; +import jakarta.validation.constraints.Max; +import jakarta.validation.constraints.Min; +import jakarta.validation.constraints.NotBlank; + +import java.time.Duration; + +public class SqsOptions { + private static final int DEFAULT_MAXIMUM_MESSAGES = 10; + private static final Duration DEFAULT_VISIBILITY_TIMEOUT_SECONDS = Duration.ofSeconds(30); + private static final Duration DEFAULT_WAIT_TIME_SECONDS = Duration.ofSeconds(20); + private static final Duration DEFAULT_POLL_DELAY_SECONDS = Duration.ofSeconds(0); + + @JsonProperty("queue_url") + @NotBlank(message = "SQS URL cannot be null or empty") + private String sqsUrl; + + @JsonProperty("maximum_messages") + private int maximumMessages = DEFAULT_MAXIMUM_MESSAGES; + + @JsonProperty("visibility_timeout") + @Min(0) + @Max(43200) + private Duration visibilityTimeout = DEFAULT_VISIBILITY_TIMEOUT_SECONDS; + + @JsonProperty("wait_time") + @Min(0) + @Max(20) + private Duration waitTime = DEFAULT_WAIT_TIME_SECONDS; + + @JsonProperty("poll_delay") + @Min(0) + private Duration pollDelay = DEFAULT_POLL_DELAY_SECONDS; + + public String getSqsUrl() { + return sqsUrl; + } + + public int getMaximumMessages() { + return maximumMessages; + } + + public Duration getVisibilityTimeout() { + return visibilityTimeout; + } + + public Duration getWaitTime() { + return waitTime; + } + + public Duration getPollDelay() { + return pollDelay; + } +} diff --git a/data-prepper-plugins/s3-source/src/main/java/com/amazon/dataprepper/plugins/source/filter/ObjectCreatedFilter.java b/data-prepper-plugins/s3-source/src/main/java/com/amazon/dataprepper/plugins/source/filter/ObjectCreatedFilter.java new file mode 100644 index 0000000000..8d672e80ba --- /dev/null +++ b/data-prepper-plugins/s3-source/src/main/java/com/amazon/dataprepper/plugins/source/filter/ObjectCreatedFilter.java @@ -0,0 +1,20 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package com.amazon.dataprepper.plugins.source.filter; + +import com.amazonaws.services.s3.event.S3EventNotification; + +import java.util.Optional; + +public class ObjectCreatedFilter implements S3EventFilter { + @Override + public Optional filter(final S3EventNotification.S3EventNotificationRecord notification) { + if (notification.getEventName().startsWith("ObjectCreated")) + return Optional.of(notification); + else + return Optional.empty(); + } +} diff --git a/data-prepper-plugins/s3-source/src/main/java/com/amazon/dataprepper/plugins/source/filter/S3EventFilter.java b/data-prepper-plugins/s3-source/src/main/java/com/amazon/dataprepper/plugins/source/filter/S3EventFilter.java new file mode 100644 index 0000000000..e9b56c20ed --- /dev/null +++ b/data-prepper-plugins/s3-source/src/main/java/com/amazon/dataprepper/plugins/source/filter/S3EventFilter.java @@ -0,0 +1,14 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package com.amazon.dataprepper.plugins.source.filter; + +import com.amazonaws.services.s3.event.S3EventNotification; + +import java.util.Optional; + +public interface S3EventFilter { + Optional filter(S3EventNotification.S3EventNotificationRecord notification); +} diff --git a/data-prepper-plugins/s3-source/src/test/java/com/amazon/dataprepper/plugins/source/S3SourceTest.java b/data-prepper-plugins/s3-source/src/test/java/com/amazon/dataprepper/plugins/source/S3SourceTest.java new file mode 100644 index 0000000000..0dae720715 --- /dev/null +++ b/data-prepper-plugins/s3-source/src/test/java/com/amazon/dataprepper/plugins/source/S3SourceTest.java @@ -0,0 +1,36 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package com.amazon.dataprepper.plugins.source; + +import com.amazon.dataprepper.metrics.PluginMetrics; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.Mockito.mock; + +class S3SourceTest { + private final String PLUGIN_NAME = "s3"; + private final String TEST_PIPELINE_NAME = "test_pipeline"; + + private S3Source s3Source; + private PluginMetrics pluginMetrics; + private S3SourceConfig s3SourceConfig; + + + @BeforeEach + void setUp() { + pluginMetrics = PluginMetrics.fromNames(PLUGIN_NAME, TEST_PIPELINE_NAME); + s3SourceConfig = mock(S3SourceConfig.class); + + s3Source = new S3Source(pluginMetrics, s3SourceConfig); + } + + @Test + void start_should_throw_IllegalStateException_when_buffer_is_null() { + assertThrows(IllegalStateException.class, () -> s3Source.start(null)); + } +} \ No newline at end of file diff --git a/data-prepper-plugins/s3-source/src/test/java/com/amazon/dataprepper/plugins/source/SqsWorkerTest.java b/data-prepper-plugins/s3-source/src/test/java/com/amazon/dataprepper/plugins/source/SqsWorkerTest.java new file mode 100644 index 0000000000..0eddec87d8 --- /dev/null +++ b/data-prepper-plugins/s3-source/src/test/java/com/amazon/dataprepper/plugins/source/SqsWorkerTest.java @@ -0,0 +1,114 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package com.amazon.dataprepper.plugins.source; + +import com.amazon.dataprepper.plugins.source.configuration.AwsAuthenticationOptions; +import com.amazon.dataprepper.plugins.source.configuration.SqsOptions; +import com.amazon.dataprepper.plugins.source.filter.ObjectCreatedFilter; +import com.amazon.dataprepper.plugins.source.filter.S3EventFilter; +import com.amazonaws.services.s3.event.S3EventNotification; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import software.amazon.awssdk.services.sqs.SqsClient; +import software.amazon.awssdk.services.sqs.model.Message; +import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.junit.Assert.assertThrows; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +class SqsWorkerTest { + private SqsWorker sqsWorker; + private SqsClient sqsClient; + private S3Service s3Service; + private S3SourceConfig s3SourceConfig; + private S3EventFilter objectCreatedFilter; + + @BeforeEach + void setUp() { + sqsClient = mock(SqsClient.class); + s3Service = mock(S3Service.class); + s3SourceConfig = mock(S3SourceConfig.class); + objectCreatedFilter = new ObjectCreatedFilter(); + + AwsAuthenticationOptions awsAuthenticationOptions = mock(AwsAuthenticationOptions.class); + when(awsAuthenticationOptions.getAwsRegion()).thenReturn("us-east-1"); + when(awsAuthenticationOptions.getAwsStsRoleArn()).thenReturn("arn:aws:iam::123456789012:iam-role"); + + SqsOptions sqsOptions = mock(SqsOptions.class); + when(sqsOptions.getSqsUrl()).thenReturn("https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue"); + + when(s3SourceConfig.getAWSAuthenticationOptions()).thenReturn(awsAuthenticationOptions); + when(s3SourceConfig.getSqsOptions()).thenReturn(sqsOptions); + + sqsWorker = new SqsWorker(sqsClient, s3Service, s3SourceConfig); + } + + @Test + void getMessagesFromSqs_throws_null_pointer_exception_with_dummy_queue_url() { + assertThrows(NullPointerException.class, () -> sqsWorker.getMessagesFromSqs()); + } + + @Test + void createReceiveMessageRequest_should_return_ReceiveMessageRequest() { + assertThat(sqsWorker.createReceiveMessageRequest(), instanceOf(ReceiveMessageRequest.class)); + } + + @Test + void convertS3EventMessages_convert_message_to_S3EventNotificationRecord() { + Message message = mock(Message.class); + when(message.body()).thenReturn("{\"Records\":[{\"eventVersion\":\"2.1\",\"eventSource\":\"aws:s3\",\"awsRegion\":\"us-east-1\"," + + "\"eventTime\":\"2022-06-06T18:02:33.495Z\",\"eventName\":\"ObjectCreated:Put\",\"userIdentity\":{\"principalId\":\"AWS:AROAX:xxxxxx\"}," + + "\"requestParameters\":{\"sourceIPAddress\":\"99.99.999.99\"},\"responseElements\":{\"x-amz-request-id\":\"ABCD\"," + + "\"x-amz-id-2\":\"abcd\"},\"s3\":{\"s3SchemaVersion\":\"1.0\",\"configurationId\":\"s3SourceEventNotification\"," + + "\"bucket\":{\"name\":\"bucketName\",\"ownerIdentity\":{\"principalId\":\"ID\"},\"arn\":\"arn:aws:s3:::bucketName\"}," + + "\"object\":{\"key\":\"File.gz\",\"size\":72,\"eTag\":\"abcd\",\"sequencer\":\"ABCD\"}}}]}"); + S3EventNotification.S3EventNotificationRecord actualS3EventNotificationRecord = sqsWorker.convertS3EventMessages(message); + assertThat(actualS3EventNotificationRecord, instanceOf(S3EventNotification.S3EventNotificationRecord.class)); + assertThat(actualS3EventNotificationRecord.getAwsRegion(), equalTo("us-east-1")); + assertThat(actualS3EventNotificationRecord.getS3().getBucket().getName(), equalTo("bucketName")); + assertThat(actualS3EventNotificationRecord.getS3().getObject().getKey(), equalTo("File.gz")); + } + + @Test + void isEventNameCreated_should_return_true_if_event_is_created() { + S3EventNotification.S3EventNotificationRecord s3EventNotificationRecord = mock(S3EventNotification.S3EventNotificationRecord.class); + when(s3EventNotificationRecord.getEventName()).thenReturn("ObjectCreated"); + Assertions.assertTrue(sqsWorker.isEventNameCreated(s3EventNotificationRecord)); + } + + @Test + void isEventNameCreated_should_return_false_if_event_is_not_created() { + S3EventNotification.S3EventNotificationRecord s3EventNotificationRecord = mock(S3EventNotification.S3EventNotificationRecord.class); + when(s3EventNotificationRecord.getEventName()).thenReturn("ObjectRemoved"); + Assertions.assertFalse(sqsWorker.isEventNameCreated(s3EventNotificationRecord)); + } + + @Test + void populateS3Reference_should_return_instance_of_S3ObjectReference() { + S3EventNotification.S3EventNotificationRecord s3EventNotificationRecord = mock(S3EventNotification.S3EventNotificationRecord.class); + S3EventNotification.S3Entity s3Entity = mock(S3EventNotification.S3Entity.class); + S3EventNotification.S3BucketEntity s3BucketEntity = mock(S3EventNotification.S3BucketEntity.class); + S3EventNotification.S3ObjectEntity s3ObjectEntity = mock(S3EventNotification.S3ObjectEntity.class); + + when(s3EventNotificationRecord.getS3()).thenReturn(s3Entity); + when(s3Entity.getBucket()).thenReturn(s3BucketEntity); + when(s3Entity.getObject()).thenReturn(s3ObjectEntity); + + when(s3EventNotificationRecord.getS3().getBucket().getName()).thenReturn("s3-source-test-bucket"); + when(s3EventNotificationRecord.getS3().getObject().getKey()).thenReturn("s3-bucket-key"); + + S3ObjectReference s3ObjectReference = sqsWorker.populateS3Reference(s3EventNotificationRecord); + + assertThat(s3ObjectReference, instanceOf(S3ObjectReference.class)); + assertThat(s3ObjectReference.getBucketName(), equalTo("s3-source-test-bucket")); + assertThat(s3ObjectReference.getKey(), equalTo("s3-bucket-key")); + } +} \ No newline at end of file diff --git a/data-prepper-plugins/s3-source/src/test/java/com/amazon/dataprepper/plugins/source/filter/ObjectCreatedFilterTest.java b/data-prepper-plugins/s3-source/src/test/java/com/amazon/dataprepper/plugins/source/filter/ObjectCreatedFilterTest.java new file mode 100644 index 0000000000..4b93dfd8e9 --- /dev/null +++ b/data-prepper-plugins/s3-source/src/test/java/com/amazon/dataprepper/plugins/source/filter/ObjectCreatedFilterTest.java @@ -0,0 +1,54 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package com.amazon.dataprepper.plugins.source.filter; + +import com.amazonaws.services.s3.event.S3EventNotification; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.Optional; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +class ObjectCreatedFilterTest { + + private ObjectCreatedFilter objectCreatedFilter; + private S3EventNotification.S3EventNotificationRecord s3EventNotificationRecord; + + + @BeforeEach + void setUp() { + objectCreatedFilter = new ObjectCreatedFilter(); + s3EventNotificationRecord = mock(S3EventNotification.S3EventNotificationRecord.class); + } + + @Test + void filter_with_eventName_ObjectCreated_should_return_non_empty_instance_of_optional() { + when(s3EventNotificationRecord.getEventName()).thenReturn("ObjectCreated:Put"); + Optional actualValue = objectCreatedFilter.filter(s3EventNotificationRecord); + + assertThat(actualValue, instanceOf(Optional.class)); + assertTrue(actualValue.isPresent()); + assertThat(actualValue, equalTo(Optional.of(s3EventNotificationRecord))); + } + + @Test + void filter_with_eventName_ObjectRemoved_should_return_empty_instance_of_optional() { + when(s3EventNotificationRecord.getEventName()).thenReturn("ObjectRemoved:Delete"); + Optional actualValue = objectCreatedFilter.filter(s3EventNotificationRecord); + + assertThat(actualValue, instanceOf(Optional.class)); + assertFalse(actualValue.isPresent()); + assertThat(actualValue, equalTo(Optional.empty())); + } + +} \ No newline at end of file