Skip to content

Commit

Permalink
Add support for lambda sink (opensearch-project#4292)
Browse files Browse the repository at this point in the history
* Add support for lambda sink

Signed-off-by: srigovs <srigovs@amazon.com>

* Address event handle comment

Signed-off-by: Srikanth Govindarajan <srikanthjg123@gmail.com>

---------

Signed-off-by: srigovs <srigovs@amazon.com>
Signed-off-by: Srikanth Govindarajan <srikanthjg123@gmail.com>
  • Loading branch information
srikanthjg authored Jun 6, 2024
1 parent 5cf0927 commit 4cf86e9
Show file tree
Hide file tree
Showing 30 changed files with 2,521 additions and 2 deletions.
36 changes: 36 additions & 0 deletions data-prepper-plugins/lambda-sink/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# Lambda Sink

This plugin enables you to send data from your Data Prepper pipeline directly to AWS Lambda functions for further processing.

## Usage
```aidl
lambda-pipeline:
...
sink:
- lambda:
aws:
region: "us-east-1"
sts_role_arn: "<arn>"
function_name: "uploadToS3Lambda"
max_retries: 3
batch:
batch_key: "osi_key"
threshold:
event_count: 3
maximum_size: 6mb
event_collect_timeout: 15s
dlq:
s3:
bucket: test-bucket
key_path_prefix: dlq/
```

## Developer Guide

The integration tests for this plugin do not run as part of the Data Prepper build.
The following command runs the integration tests:

```
./gradlew :data-prepper-plugins:lambda-sink:integrationTest -Dtests.sink.lambda.region="us-east-1" -Dtests.sink.lambda.functionName="lambda_test_function" -Dtests.sink.lambda.sts_role_arn="arn:aws:iam::123456789012:role/dataprepper-role
```
63 changes: 63 additions & 0 deletions data-prepper-plugins/lambda-sink/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

dependencies {
implementation project(':data-prepper-api')
implementation project(path: ':data-prepper-plugins:common')
implementation project(':data-prepper-plugins:aws-plugin-api')
implementation project(':data-prepper-plugins:failures-common')
implementation 'io.micrometer:micrometer-core'
implementation 'com.fasterxml.jackson.core:jackson-core'
implementation 'com.fasterxml.jackson.core:jackson-databind'
implementation 'software.amazon.awssdk:lambda:2.17.99'
implementation 'software.amazon.awssdk:sdk-core:2.x.x'
implementation 'software.amazon.awssdk:sts'
implementation 'org.hibernate.validator:hibernate-validator:8.0.1.Final'
implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml'
implementation'org.json:json'
implementation libs.commons.lang3
implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310'
testImplementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310'
testImplementation project(':data-prepper-test-common')
testImplementation project(':data-prepper-plugins:parse-json-processor')
}

test {
useJUnitPlatform()
}

sourceSets {
integrationTest {
java {
compileClasspath += main.output + test.output
runtimeClasspath += main.output + test.output
srcDir file('src/integrationTest/java')
}
resources.srcDir file('src/integrationTest/resources')
}
}

configurations {
integrationTestImplementation.extendsFrom testImplementation
integrationTestRuntime.extendsFrom testRuntime
}

task integrationTest(type: Test) {
group = 'verification'
testClassesDirs = sourceSets.integrationTest.output.classesDirs

useJUnitPlatform()

classpath = sourceSets.integrationTest.runtimeClasspath

systemProperty 'log4j.configurationFile', 'src/test/resources/log4j2.properties'
systemProperty 'tests.sink.lambda.region', System.getProperty('tests.sink.lambda.region')
systemProperty 'tests.sink.lambda.functionName', System.getProperty('tests.sink.lambda.functionName')
systemProperty 'tests.sink.lambda.sts_role_arn', System.getProperty('tests.sink.lambda.sts_role_arn')

filter {
includeTestsMatching '*IT'
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,216 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.sink.lambda;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import com.fasterxml.jackson.dataformat.yaml.YAMLGenerator;
import io.micrometer.core.instrument.Counter;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.Mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import org.mockito.MockitoAnnotations;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.log.JacksonLog;
import org.opensearch.dataprepper.model.plugin.PluginFactory;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.sink.OutputCodecContext;
import org.opensearch.dataprepper.model.types.ByteCount;
import org.opensearch.dataprepper.plugins.sink.lambda.accumlator.BufferFactory;
import org.opensearch.dataprepper.plugins.sink.lambda.accumlator.InMemoryBufferFactory;
import org.opensearch.dataprepper.plugins.sink.lambda.config.BatchOptions;
import org.opensearch.dataprepper.plugins.sink.lambda.config.ThresholdOptions;
import org.opensearch.dataprepper.plugins.sink.lambda.config.AwsAuthenticationOptions;
import org.opensearch.dataprepper.plugins.sink.lambda.dlq.DlqPushHandler;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.lambda.LambdaClient;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;

import static org.mockito.Mockito.when;

@ExtendWith(MockitoExtension.class)
class LambdaSinkServiceIT {

private LambdaClient lambdaClient;
private String functionName;
private String lambdaRegion;
private String role;
private BufferFactory bufferFactory;
@Mock
private LambdaSinkConfig lambdaSinkConfig;
@Mock
private BatchOptions batchOptions;
@Mock
private ThresholdOptions thresholdOptions;
@Mock
private AwsAuthenticationOptions awsAuthenticationOptions;
@Mock
private AwsCredentialsSupplier awsCredentialsSupplier;
@Mock
private PluginMetrics pluginMetrics;
@Mock
private DlqPushHandler dlqPushHandler;
@Mock
private PluginFactory pluginFactory;
@Mock
private PluginSetting pluginSetting;
@Mock
private Counter numberOfRecordsSuccessCounter;
@Mock
private Counter numberOfRecordsFailedCounter;
private final ObjectMapper objectMapper = new ObjectMapper(new YAMLFactory().enable(YAMLGenerator.Feature.USE_PLATFORM_LINE_BREAKS));


@BeforeEach
public void setUp() throws Exception {
MockitoAnnotations.openMocks(this);
lambdaRegion = System.getProperty("tests.sink.lambda.region");
functionName = System.getProperty("tests.sink.lambda.functionName");
role = System.getProperty("tests.sink.lambda.sts_role_arn");

final Region region = Region.of(lambdaRegion);

lambdaClient = LambdaClient.builder()
.region(Region.of(lambdaRegion))
.build();

bufferFactory = new InMemoryBufferFactory();

when(pluginMetrics.counter(LambdaSinkService.NUMBER_OF_RECORDS_FLUSHED_TO_LAMBDA_SUCCESS)).
thenReturn(numberOfRecordsSuccessCounter);
when(pluginMetrics.counter(LambdaSinkService.NUMBER_OF_RECORDS_FLUSHED_TO_LAMBDA_FAILED)).
thenReturn(numberOfRecordsFailedCounter);
}


private static Record<Event> createRecord() {
final JacksonEvent event = JacksonLog.builder().withData("[{\"name\":\"test\"}]").build();
return new Record<>(event);
}

public LambdaSinkService createObjectUnderTest(final String config) throws JsonProcessingException {

final LambdaSinkConfig lambdaSinkConfig = objectMapper.readValue(config, LambdaSinkConfig.class);
OutputCodecContext codecContext = new OutputCodecContext("Tag", Collections.emptyList(), Collections.emptyList());
pluginFactory = null;
return new LambdaSinkService(lambdaClient,
lambdaSinkConfig,
pluginMetrics,
pluginFactory,
pluginSetting,
codecContext,
awsCredentialsSupplier,
dlqPushHandler,
bufferFactory);
}

public LambdaSinkService createObjectUnderTest(LambdaSinkConfig lambdaSinkConfig) throws JsonProcessingException {

OutputCodecContext codecContext = new OutputCodecContext("Tag", Collections.emptyList(), Collections.emptyList());
pluginFactory = null;
return new LambdaSinkService(lambdaClient,
lambdaSinkConfig,
pluginMetrics,
pluginFactory,
pluginSetting,
codecContext,
awsCredentialsSupplier,
dlqPushHandler,
bufferFactory);
}


private static Collection<Record<Event>> generateRecords(int numberOfRecords) {
List<Record<Event>> recordList = new ArrayList<>();

for (int rows = 0; rows < numberOfRecords; rows++) {
HashMap<String, String> eventData = new HashMap<>();
eventData.put("name", "Person" + rows);
eventData.put("age", Integer.toString(rows));

Record<Event> eventRecord = new Record<>(JacksonEvent.builder().withData(eventData).withEventType("event").build());
recordList.add(eventRecord);
}
return recordList;
}

@ParameterizedTest
@ValueSource(ints = {1,5})
void verify_flushed_records_to_lambda_success(final int recordCount) throws Exception {

final String LAMBDA_SINK_CONFIG_YAML =
" function_name: " + functionName +"\n" +
" aws:\n" +
" region: us-east-1\n" +
" sts_role_arn: " + role + "\n" +
" max_retries: 3\n";
LambdaSinkService objectUnderTest = createObjectUnderTest(LAMBDA_SINK_CONFIG_YAML);

Collection<Record<Event>> recordsData = generateRecords(recordCount);
objectUnderTest.output(recordsData);
Thread.sleep(Duration.ofSeconds(10).toMillis());

verify(numberOfRecordsSuccessCounter, times(recordCount)).increment(1);
}

@ParameterizedTest
@ValueSource(ints = {1,5,10})
void verify_flushed_records_to_lambda_failed_and_dlq_works(final int recordCount) throws Exception {
final String LAMBDA_SINK_CONFIG_INVALID_FUNCTION_NAME =
" function_name: $$$\n" +
" aws:\n" +
" region: us-east-1\n" +
" sts_role_arn: arn:aws:iam::176893235612:role/osis-s3-opensearch-role\n" +
" max_retries: 3\n" +
" dlq: #any failed even\n"+
" s3:\n"+
" bucket: test-bucket\n"+
" key_path_prefix: dlq/\n";
LambdaSinkService objectUnderTest = createObjectUnderTest(LAMBDA_SINK_CONFIG_INVALID_FUNCTION_NAME);

Collection<Record<Event>> recordsData = generateRecords(recordCount);
objectUnderTest.output(recordsData);
Thread.sleep(Duration.ofSeconds(10).toMillis());

verify( numberOfRecordsFailedCounter, times(recordCount)).increment(1);
}

@ParameterizedTest
@ValueSource(ints = {2,5})
void verify_flushed_records_with_batching_to_lambda(final int recordCount) throws JsonProcessingException, InterruptedException {

int event_count = 2;
when(lambdaSinkConfig.getFunctionName()).thenReturn(functionName);
when(lambdaSinkConfig.getMaxConnectionRetries()).thenReturn(3);
when(thresholdOptions.getEventCount()).thenReturn(event_count);
when(thresholdOptions.getMaximumSize()).thenReturn(ByteCount.parse("2mb"));
when(thresholdOptions.getEventCollectTimeOut()).thenReturn(Duration.parse("PT10s"));
when(batchOptions.getBatchKey()).thenReturn("lambda_batch_key");
when(batchOptions.getThresholdOptions()).thenReturn(thresholdOptions);
when(lambdaSinkConfig.getBatchOptions()).thenReturn(batchOptions);

LambdaSinkService objectUnderTest = createObjectUnderTest(lambdaSinkConfig);
Collection<Record<Event>> recordsData = generateRecords(recordCount);
objectUnderTest.output(recordsData);
Thread.sleep(Duration.ofSeconds(10).toMillis());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.sink.lambda;

import org.opensearch.dataprepper.aws.api.AwsCredentialsOptions;
import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
import org.opensearch.dataprepper.plugins.sink.lambda.config.AwsAuthenticationOptions;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
import software.amazon.awssdk.core.retry.RetryPolicy;
import software.amazon.awssdk.services.lambda.LambdaClient;

public final class LambdaClientFactory {
private LambdaClientFactory() { }

static LambdaClient createLambdaClient(final LambdaSinkConfig lambdaSinkConfig,
final AwsCredentialsSupplier awsCredentialsSupplier) {
final AwsCredentialsOptions awsCredentialsOptions = convertToCredentialsOptions(lambdaSinkConfig.getAwsAuthenticationOptions());
final AwsCredentialsProvider awsCredentialsProvider = awsCredentialsSupplier.getProvider(awsCredentialsOptions);

return LambdaClient.builder()
.region(lambdaSinkConfig.getAwsAuthenticationOptions().getAwsRegion())
.credentialsProvider(awsCredentialsProvider)
.overrideConfiguration(createOverrideConfiguration(lambdaSinkConfig)).build();

}

private static ClientOverrideConfiguration createOverrideConfiguration(final LambdaSinkConfig lambdaSinkConfig) {
final RetryPolicy retryPolicy = RetryPolicy.builder().numRetries(lambdaSinkConfig.getMaxConnectionRetries()).build();
return ClientOverrideConfiguration.builder()
.retryPolicy(retryPolicy)
.build();
}

private static AwsCredentialsOptions convertToCredentialsOptions(final AwsAuthenticationOptions awsAuthenticationOptions) {
return AwsCredentialsOptions.builder()
.withRegion(awsAuthenticationOptions.getAwsRegion())
.withStsRoleArn(awsAuthenticationOptions.getAwsStsRoleArn())
.withStsExternalId(awsAuthenticationOptions.getAwsStsExternalId())
.withStsHeaderOverrides(awsAuthenticationOptions.getAwsStsHeaderOverrides())
.build();
}
}
Loading

0 comments on commit 4cf86e9

Please sign in to comment.