Skip to content

Commit

Permalink
Support AWS Kinesis Data Streams as a Source (#4836)
Browse files Browse the repository at this point in the history
Support AWS Kinesis Data Streams as a Source

Signed-off-by: Souvik Bose <souvbose@amazon.com>
  • Loading branch information
sb2k16 committed Sep 19, 2024
1 parent ce9cd64 commit a3bd538
Show file tree
Hide file tree
Showing 46 changed files with 3,623 additions and 1 deletion.
42 changes: 42 additions & 0 deletions data-prepper-plugins/kinesis-source/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

plugins {
id 'java'
}

dependencies {
implementation project(':data-prepper-api')
implementation project(':data-prepper-plugins:common')
implementation project(path: ':data-prepper-plugins:buffer-common')
implementation project(path: ':data-prepper-plugins:aws-plugin-api')
implementation 'com.fasterxml.jackson.core:jackson-core'
implementation 'io.micrometer:micrometer-core'
implementation 'software.amazon.kinesis:amazon-kinesis-client:2.6.0'
compileOnly 'org.projectlombok:lombok:1.18.20'
annotationProcessor 'org.projectlombok:lombok:1.18.20'

testImplementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml'
testImplementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310'
testImplementation project(':data-prepper-test-common')
testImplementation project(':data-prepper-test-event')
testImplementation project(':data-prepper-core')
testImplementation project(':data-prepper-plugin-framework')
testImplementation project(':data-prepper-pipeline-parser')
testImplementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml'
testImplementation project(':data-prepper-plugins:parse-json-processor')
testImplementation project(':data-prepper-plugins:newline-codecs')
}

jacocoTestCoverageVerification {
dependsOn jacocoTestReport
violationRules {
rule { //in addition to core projects rule
limit {
minimum = 1.0
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/

package org.opensearch.dataprepper.plugins.kinesis.extension;

import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Getter;

@Getter
public class KinesisLeaseConfig {
@JsonProperty("lease_coordination")
private KinesisLeaseCoordinationTableConfig leaseCoordinationTable;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/

package org.opensearch.dataprepper.plugins.kinesis.extension;

import org.opensearch.dataprepper.model.annotations.DataPrepperExtensionPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.plugin.ExtensionPlugin;
import org.opensearch.dataprepper.model.plugin.ExtensionPoints;

@DataPrepperExtensionPlugin(modelType = KinesisLeaseConfig.class, rootKeyJsonPath = "/kinesis", allowInPipelineConfigurations = true)
public class KinesisLeaseConfigExtension implements ExtensionPlugin {

private KinesisLeaseConfigSupplier kinesisLeaseConfigSupplier;
@DataPrepperPluginConstructor
public KinesisLeaseConfigExtension(final KinesisLeaseConfig kinesisLeaseConfig) {
this.kinesisLeaseConfigSupplier = new KinesisLeaseConfigSupplier(kinesisLeaseConfig);
}

@Override
public void apply(final ExtensionPoints extensionPoints) {
extensionPoints.addExtensionProvider(new KinesisLeaseConfigProvider(this.kinesisLeaseConfigSupplier));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/

package org.opensearch.dataprepper.plugins.kinesis.extension;

import org.opensearch.dataprepper.model.plugin.ExtensionProvider;

import java.util.Optional;

class KinesisLeaseConfigProvider implements ExtensionProvider<KinesisLeaseConfigSupplier> {
private final KinesisLeaseConfigSupplier kinesisLeaseConfigSupplier;

public KinesisLeaseConfigProvider(final KinesisLeaseConfigSupplier kinesisLeaseConfigSupplier) {
this.kinesisLeaseConfigSupplier = kinesisLeaseConfigSupplier;
}

@Override
public Optional<KinesisLeaseConfigSupplier> provideInstance(Context context) {
return Optional.of(this.kinesisLeaseConfigSupplier);
}

@Override
public Class<KinesisLeaseConfigSupplier> supportedClass() {
return KinesisLeaseConfigSupplier.class;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/

package org.opensearch.dataprepper.plugins.kinesis.extension;

import java.util.Optional;

public class KinesisLeaseConfigSupplier {

private KinesisLeaseConfig kinesisLeaseConfig;

public KinesisLeaseConfigSupplier(final KinesisLeaseConfig kinesisLeaseConfig) {
this.kinesisLeaseConfig = kinesisLeaseConfig;
}

public Optional<KinesisLeaseConfig> getKinesisExtensionLeaseConfig() {
return Optional.ofNullable(kinesisLeaseConfig);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/

package org.opensearch.dataprepper.plugins.kinesis.extension;

import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Getter;
import lombok.NonNull;
import software.amazon.awssdk.regions.Region;

@Getter
public class KinesisLeaseCoordinationTableConfig {

@JsonProperty("table_name")
@NonNull
private String tableName;

@JsonProperty("region")
@NonNull
private String region;

public Region getAwsRegion() {
return Region.of(region);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/

package org.opensearch.dataprepper.plugins.kinesis.source;

import java.net.InetAddress;
import java.net.UnknownHostException;

/**
* Generate a unique ID to represent a consumer application instance.
*/
public class HostNameWorkerIdentifierGenerator implements WorkerIdentifierGenerator {

private static final String hostName;

static {
try {
hostName = InetAddress.getLocalHost().getHostName();
} catch (final UnknownHostException e) {
throw new RuntimeException(e);
}
}


/**
* @return Default to use host name.
*/
@Override
public String generate() {
return hostName;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/

package org.opensearch.dataprepper.plugins.kinesis.source;

import org.opensearch.dataprepper.aws.api.AwsCredentialsOptions;
import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
import org.opensearch.dataprepper.plugins.kinesis.source.configuration.AwsAuthenticationConfig;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.kinesis.common.KinesisClientUtil;

public class KinesisClientFactory {
private final AwsCredentialsProvider awsCredentialsProvider;
private final AwsCredentialsProvider defaultCredentialsProvider;
private final AwsAuthenticationConfig awsAuthenticationConfig;

public KinesisClientFactory(final AwsCredentialsSupplier awsCredentialsSupplier,
final AwsAuthenticationConfig awsAuthenticationConfig) {
awsCredentialsProvider = awsCredentialsSupplier.getProvider(AwsCredentialsOptions.builder()
.withRegion(awsAuthenticationConfig.getAwsRegion())
.withStsRoleArn(awsAuthenticationConfig.getAwsStsRoleArn())
.withStsExternalId(awsAuthenticationConfig.getAwsStsExternalId())
.withStsHeaderOverrides(awsAuthenticationConfig.getAwsStsHeaderOverrides())
.build());
defaultCredentialsProvider = awsCredentialsSupplier.getProvider(AwsCredentialsOptions.defaultOptions());
this.awsAuthenticationConfig = awsAuthenticationConfig;
}

public DynamoDbAsyncClient buildDynamoDBClient(Region region) {
return DynamoDbAsyncClient.builder()
.credentialsProvider(defaultCredentialsProvider)
.region(region)
.build();
}

public KinesisAsyncClient buildKinesisAsyncClient(Region region) {
return KinesisClientUtil.createKinesisAsyncClient(
KinesisAsyncClient.builder()
.credentialsProvider(awsCredentialsProvider)
.region(region)
);
}

public CloudWatchAsyncClient buildCloudWatchAsyncClient(Region region) {
return CloudWatchAsyncClient.builder()
.credentialsProvider(defaultCredentialsProvider)
.region(region)
.build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/

package org.opensearch.dataprepper.plugins.kinesis.source;

import com.amazonaws.arn.Arn;
import org.opensearch.dataprepper.plugins.kinesis.source.configuration.KinesisSourceConfig;
import org.opensearch.dataprepper.plugins.kinesis.source.configuration.KinesisStreamConfig;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamResponse;
import software.amazon.awssdk.services.kinesis.model.StreamDescription;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.common.StreamConfig;
import software.amazon.kinesis.common.StreamIdentifier;
import software.amazon.kinesis.processor.FormerStreamsLeasesDeletionStrategy;
import software.amazon.kinesis.processor.MultiStreamTracker;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;


public class KinesisMultiStreamTracker implements MultiStreamTracker {
private static final String COLON = ":";

private final KinesisAsyncClient kinesisClient;
private final KinesisSourceConfig sourceConfig;
private final String applicationName;

public KinesisMultiStreamTracker(KinesisAsyncClient kinesisClient, final KinesisSourceConfig sourceConfig, final String applicationName) {
this.kinesisClient = kinesisClient;
this.sourceConfig = sourceConfig;
this.applicationName = applicationName;
}

@Override
public List<StreamConfig> streamConfigList() {
List<StreamConfig> streamConfigList = new ArrayList<>();
for (KinesisStreamConfig kinesisStreamConfig : sourceConfig.getStreams()) {
StreamConfig streamConfig = getStreamConfig(kinesisStreamConfig);
streamConfigList.add(streamConfig);
}
return streamConfigList;
}

private StreamConfig getStreamConfig(KinesisStreamConfig kinesisStreamConfig) {
StreamIdentifier sourceStreamIdentifier = getStreamIdentifier(kinesisStreamConfig);
return new StreamConfig(sourceStreamIdentifier,
InitialPositionInStreamExtended.newInitialPosition(kinesisStreamConfig.getInitialPosition()));
}

private StreamIdentifier getStreamIdentifier(KinesisStreamConfig kinesisStreamConfig) {
DescribeStreamRequest describeStreamRequest = DescribeStreamRequest.builder()
.streamName(kinesisStreamConfig.getName())
.build();
DescribeStreamResponse describeStreamResponse = kinesisClient.describeStream(describeStreamRequest).join();
String streamIdentifierString = getStreamIdentifierString(describeStreamResponse.streamDescription());
return StreamIdentifier.multiStreamInstance(streamIdentifierString);
}

private String getStreamIdentifierString(StreamDescription streamDescription) {
String accountId = Arn.fromString(streamDescription.streamARN()).getAccountId();
long creationEpochSecond = streamDescription.streamCreationTimestamp().getEpochSecond();
return String.join(COLON, accountId, streamDescription.streamName(), String.valueOf(creationEpochSecond));
}

/**
* Setting the deletion policy as autodetect and release shard lease with a wait time of 10 sec
*/
@Override
public FormerStreamsLeasesDeletionStrategy formerStreamsLeasesDeletionStrategy() {
return new FormerStreamsLeasesDeletionStrategy.AutoDetectionAndDeferredDeletionStrategy() {
@Override
public Duration waitPeriodToDeleteFormerStreams() {
return Duration.ofSeconds(10);
}
};

}
}
Loading

0 comments on commit a3bd538

Please sign in to comment.