Skip to content

Commit

Permalink
Add support for AWS security lake sink as a bucket selector mode in S…
Browse files Browse the repository at this point in the history
…3 sink (opensearch-project#4846)

* dplive1.yaml

Signed-off-by: Kondaka <krishkdk@amazon.com>

* Delete .github/workflows/static.yml

Signed-off-by: Kondaka <krishkdk@amazon.com>

* Add support for AWS security lake sink as a bucket selector mode in S3 sink

Signed-off-by: Kondaka <krishkdk@amazon.com>

* Fixed tests

Signed-off-by: Kondaka <krishkdk@amazon.com>

* Added javadoc for S3BucketSelector

Signed-off-by: Kondaka <krishkdk@amazon.com>

* Added new tests for KeyGenerator

Signed-off-by: Kondaka <krishkdk@amazon.com>

* Added new tests and fixed style errors

Signed-off-by: Kondaka <krishkdk@amazon.com>

* Addressed review comments

Signed-off-by: Kondaka <krishkdk@amazon.com>

* Fixed test build failure

Signed-off-by: Kondaka <krishkdk@amazon.com>

---------

Signed-off-by: Kondaka <krishkdk@amazon.com>
  • Loading branch information
kkondaka committed Aug 20, 2024
1 parent ff2de26 commit 8e3865d
Show file tree
Hide file tree
Showing 33 changed files with 488 additions and 73 deletions.
1 change: 1 addition & 0 deletions data-prepper-plugins/s3-sink/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ dependencies {
implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-csv'
implementation 'software.amazon.awssdk:s3'
implementation 'software.amazon.awssdk:sts'
implementation 'software.amazon.awssdk:securitylake:2.26.18'
implementation 'org.jetbrains.kotlin:kotlin-stdlib:1.9.22'
implementation project(':data-prepper-plugins:avro-codecs')
implementation libs.avro.core
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ void verify_flushed_object_count_into_s3_bucket() {

// Configures the codec under test to NDJSON output; tests using this helper
// run without a bucket selector (hence the null second argument).
void configureNewLineCodec() {
codec = new NdjsonOutputCodec(ndjsonOutputConfig);
keyGenerator = new KeyGenerator(s3SinkConfig, null, StandardExtensionProvider.create(codec, CompressionOption.NONE), expressionEvaluator);
}

@Test
Expand Down Expand Up @@ -272,7 +272,7 @@ void verify_flushed_records_into_s3_bucketNewLine_with_compression() throws IOEx

private S3SinkService createObjectUnderTest() {
OutputCodecContext codecContext = new OutputCodecContext("Tag", Collections.emptyList(), Collections.emptyList());
final S3GroupIdentifierFactory groupIdentifierFactory = new S3GroupIdentifierFactory(keyGenerator, expressionEvaluator, s3SinkConfig);
final S3GroupIdentifierFactory groupIdentifierFactory = new S3GroupIdentifierFactory(keyGenerator, expressionEvaluator, s3SinkConfig, null);
s3GroupManager = new S3GroupManager(s3SinkConfig, groupIdentifierFactory, bufferFactory, codecFactory, s3AsyncClient, bucketOwnerProvider);

return new S3SinkService(s3SinkConfig, codecContext, Duration.ofSeconds(5), pluginMetrics, s3GroupManager);
Expand Down Expand Up @@ -389,7 +389,7 @@ private void configureParquetCodec() {
parquetOutputCodecConfig = new ParquetOutputCodecConfig();
parquetOutputCodecConfig.setSchema(parseSchema().toString());
codec = new ParquetOutputCodec(parquetOutputCodecConfig);
keyGenerator = new KeyGenerator(s3SinkConfig, StandardExtensionProvider.create(codec, CompressionOption.NONE), expressionEvaluator);
keyGenerator = new KeyGenerator(s3SinkConfig, null, StandardExtensionProvider.create(codec, CompressionOption.NONE), expressionEvaluator);
}

private Collection<Record<Event>> getRecordList() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,17 @@

public class KeyGenerator {
private final S3SinkConfig s3SinkConfig;
private final S3BucketSelector s3BucketSelector;
private final ExtensionProvider extensionProvider;

private final ExpressionEvaluator expressionEvaluator;

/**
 * @param s3SinkConfig sink configuration providing object key options
 * @param s3BucketSelector optional selector; when non-null its path prefix is
 *        used instead of the prefix built from the sink configuration (may be null)
 * @param extensionProvider supplies the file extension appended to object names
 * @param expressionEvaluator evaluates format expressions in key patterns
 */
public KeyGenerator(final S3SinkConfig s3SinkConfig,
final S3BucketSelector s3BucketSelector,
final ExtensionProvider extensionProvider,
final ExpressionEvaluator expressionEvaluator) {
this.s3SinkConfig = s3SinkConfig;
this.s3BucketSelector = s3BucketSelector;
this.extensionProvider = extensionProvider;
this.expressionEvaluator = expressionEvaluator;
}
Expand All @@ -30,7 +33,7 @@ public KeyGenerator(final S3SinkConfig s3SinkConfig,
* @return object key path.
*/
public String generateKeyForEvent(final Event event) {
// A configured bucket selector dictates the path prefix; otherwise it is
// built from the sink's object key options for this event.
// NOTE(review): if the selector's getPathPrefix() returns null (e.g.
// initialize() was never called) the isEmpty() call below will NPE — confirm
// initialize() is always invoked first.
final String pathPrefix = s3BucketSelector != null ? s3BucketSelector.getPathPrefix() : ObjectKey.buildingPathPrefix(s3SinkConfig, event, expressionEvaluator);
final String namePattern = ObjectKey.objectFileName(s3SinkConfig, extensionProvider.getExtension(), event, expressionEvaluator);
return (!pathPrefix.isEmpty()) ? pathPrefix + namePattern : namePattern;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.sink.s3;

import com.fasterxml.jackson.annotation.JsonProperty;
/**
 * Configuration model bound from the S3 sink's "predefined_object_metadata"
 * setting (see S3SinkConfig#getPredefinedObjectMetadata).
 */
public class PredefinedObjectMetadata {
// Value configured under "number_of_objects".
// NOTE(review): kept as a String — presumably the metadata key name under
// which the sink records an object/event count, not a numeric value; confirm
// against the code that consumes getPredefinedObjectMetadata().
@JsonProperty("number_of_objects")
private String numberOfObjects;

/** @return the configured "number_of_objects" value, or null if unset. */
public String getNumberOfObjects() {
return numberOfObjects;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.sink.s3;

/**
 * Strategy for choosing the destination S3 bucket and object path prefix at
 * runtime. Implementations are Data Prepper plugins loaded through the S3
 * sink's "bucket_selector" configuration; the sink calls
 * {@link #initialize(S3SinkConfig)} once before querying the bucket name or
 * path prefix.
 */
public interface S3BucketSelector {
/**
 * initialize - initializes the selector
 * @param s3SinkConfig - s3 sink configuration
 */
void initialize(S3SinkConfig s3SinkConfig);

/**
 * getBucketName - returns the name of the bucket created by the bucket selector
 *
 * @return - bucket name
 */
String getBucketName();

/**
 * getPathPrefix - returns the prefix to be used for the objects created in the bucket
 *
 * @return path prefix
 */
String getPathPrefix();
}

Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,17 @@ public S3Sink(final PluginSetting pluginSetting,
bufferFactory = new CompressionBufferFactory(innerBufferFactory, compressionEngine, testCodec);

ExtensionProvider extensionProvider = StandardExtensionProvider.create(testCodec, compressionOption);
KeyGenerator keyGenerator = new KeyGenerator(s3SinkConfig, extensionProvider, expressionEvaluator);
String bucketName;
S3BucketSelector s3BucketSelector = null;
if (s3SinkConfig.getBucketSelector() != null) {
s3BucketSelector = loadS3BucketSelector(pluginFactory);
s3BucketSelector.initialize(s3SinkConfig);
bucketName = s3BucketSelector.getBucketName();
} else {
bucketName = s3SinkConfig.getBucketName();
}

KeyGenerator keyGenerator = new KeyGenerator(s3SinkConfig, s3BucketSelector, extensionProvider, expressionEvaluator);

if (s3SinkConfig.getObjectKeyOptions().getPathPrefix() != null &&
!expressionEvaluator.isValidFormatExpression(s3SinkConfig.getObjectKeyOptions().getPathPrefix())) {
Expand All @@ -106,22 +116,28 @@ public S3Sink(final PluginSetting pluginSetting,
throw new InvalidPluginConfigurationException("name_pattern is not a valid format expression");
}

if (s3SinkConfig.getBucketName() != null &&
!expressionEvaluator.isValidFormatExpression(s3SinkConfig.getBucketName())) {
if (bucketName != null &&
!expressionEvaluator.isValidFormatExpression(bucketName)) {
throw new InvalidPluginConfigurationException("bucket name is not a valid format expression");
}

S3OutputCodecContext s3OutputCodecContext = new S3OutputCodecContext(OutputCodecContext.fromSinkContext(sinkContext), compressionOption);

testCodec.validateAgainstCodecContext(s3OutputCodecContext);

final S3GroupIdentifierFactory s3GroupIdentifierFactory = new S3GroupIdentifierFactory(keyGenerator, expressionEvaluator, s3SinkConfig);
final S3GroupIdentifierFactory s3GroupIdentifierFactory = new S3GroupIdentifierFactory(keyGenerator, expressionEvaluator, s3SinkConfig, s3BucketSelector);
final S3GroupManager s3GroupManager = new S3GroupManager(s3SinkConfig, s3GroupIdentifierFactory, bufferFactory, codecFactory, s3Client, bucketOwnerProvider);


s3SinkService = new S3SinkService(s3SinkConfig, s3OutputCodecContext, RETRY_FLUSH_BACKOFF, pluginMetrics, s3GroupManager);
}

/**
 * Loads the bucket selector plugin (e.g. aws_security_lake) named by the
 * sink's "bucket_selector" configuration.
 *
 * @param pluginFactory factory used to instantiate the selector plugin
 * @return the loaded {@link S3BucketSelector} implementation
 */
private S3BucketSelector loadS3BucketSelector(PluginFactory pluginFactory) {
final PluginModel modeConfiguration = s3SinkConfig.getBucketSelector();
final PluginSetting modePluginSetting = new PluginSetting(modeConfiguration.getPluginName(), modeConfiguration.getPluginSettings());
return pluginFactory.loadPlugin(S3BucketSelector.class, modePluginSetting);
}

@Override
public boolean isReady() {
return sinkInitialized;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
package org.opensearch.dataprepper.plugins.sink.s3;

import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.constraints.AssertTrue;
import jakarta.validation.Valid;
import jakarta.validation.constraints.NotEmpty;
import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.Size;
import org.opensearch.dataprepper.aws.validator.AwsAccountId;
Expand Down Expand Up @@ -36,10 +36,21 @@ public class S3SinkConfig {
private AwsAuthenticationOptions awsAuthenticationOptions;

@JsonProperty("bucket")
@NotEmpty
@Size(min = 3, max = 500, message = "bucket length should be at least 3 characters")
private String bucketName;

@JsonProperty("bucket_selector")
private PluginModel bucketSelector;

@JsonProperty("predefined_object_metadata")
private PredefinedObjectMetadata predefinedObjectMetadata;

// Bean Validation hook: "bucket" and "bucket_selector" are mutually exclusive
// ways of choosing the destination bucket. The XOR below also rejects the
// case where neither is configured, so exactly one must be present.
@AssertTrue(message = "You may not use both bucket and bucket_selector together in one S3 sink.")
private boolean isValidBucketConfig() {
return (bucketName != null && bucketSelector == null) ||
(bucketName == null && bucketSelector != null);
}

/**
* The default bucket to send to if using a dynamic bucket name and failures occur
* for any reason when sending to a dynamic bucket
Expand Down Expand Up @@ -127,6 +138,18 @@ public ObjectKeyOptions getObjectKeyOptions() {
return objectKeyOptions;
}

public PredefinedObjectMetadata getPredefinedObjectMetadata() {
return predefinedObjectMetadata;
}

/**
* Bucket selector configuration options.
* @return bucketSelector plugin model.
*/
public PluginModel getBucketSelector() {
return bucketSelector;
}

/**
* Sink codec configuration Options.
* @return codec plugin model.
Expand Down Expand Up @@ -172,4 +195,4 @@ public Map<String, String> getBucketOwners() {
public String getDefaultBucketOwner() {
return defaultBucketOwner;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ public class S3SinkService {
private final int maxEvents;
private final ByteCount maxBytes;
private final Duration maxCollectionDuration;
private final String bucket;
private final int maxRetries;
private final Counter objectsSucceededCounter;
private final Counter objectsFailedCounter;
Expand Down Expand Up @@ -84,7 +83,6 @@ public S3SinkService(final S3SinkConfig s3SinkConfig,
maxBytes = s3SinkConfig.getThresholdOptions().getMaximumSize();
maxCollectionDuration = s3SinkConfig.getThresholdOptions().getEventCollectTimeOut();

bucket = s3SinkConfig.getBucketName();
maxRetries = s3SinkConfig.getMaxUploadRetries();

objectsSucceededCounter = pluginMetrics.counter(OBJECTS_SUCCEEDED);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.sink.s3;

import org.apache.commons.lang3.RandomStringUtils;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import software.amazon.awssdk.services.securitylake.SecurityLakeClient;
import software.amazon.awssdk.services.securitylake.model.AwsIdentity;
import software.amazon.awssdk.services.securitylake.model.CreateCustomLogSourceRequest;
import software.amazon.awssdk.services.securitylake.model.CustomLogSourceProvider;
import software.amazon.awssdk.services.securitylake.model.CustomLogSourceConfiguration;
import software.amazon.awssdk.services.securitylake.model.CreateCustomLogSourceResponse;
import software.amazon.awssdk.services.securitylake.model.CustomLogSourceCrawlerConfiguration;

import java.time.LocalDate;
import java.util.List;

@DataPrepperPlugin(name = "aws_security_lake", pluginType = S3BucketSelector.class, pluginConfigurationType = SecurityLakeBucketSelectorConfig.class)
public class SecurityLakeBucketSelector implements S3BucketSelector {
    // Separator between the "s3://<bucket>" portion and the source directory in
    // the location string Security Lake returns for a custom log source.
    private static final String EXT_PATH = "/ext/";
    // Length of the "s3://" scheme prefix preceding the bucket name. The
    // original code used EXT_PATH.length() here, which only worked because it
    // happens to equal "s3://".length(); made explicit to avoid the coincidence.
    private static final String S3_SCHEME_PREFIX = "s3://";

    private final SecurityLakeBucketSelectorConfig securityLakeBucketSelectorConfig;

    private S3SinkConfig s3SinkConfig;

    // Object key prefix derived from the source location, region, account id and
    // event day; computed once in initialize().
    private String pathPrefix;

    // Full "s3://<bucket>/ext/<source>/..." location of the created log source.
    private String sourceLocation;

    @DataPrepperPluginConstructor
    public SecurityLakeBucketSelector(final SecurityLakeBucketSelectorConfig securityLakeBucketSelectorConfig) {
        this.securityLakeBucketSelectorConfig = securityLakeBucketSelectorConfig;
    }

    /**
     * Registers a custom log source with AWS Security Lake and derives the
     * bucket name and object path prefix from the location it returns. Must be
     * called before {@link #getBucketName()} or {@link #getPathPrefix()}.
     *
     * @param s3SinkConfig sink configuration supplying the STS role ARN and region
     */
    @Override
    public void initialize(S3SinkConfig s3SinkConfig) {
        this.s3SinkConfig = s3SinkConfig;
        SecurityLakeClient securityLakeClient = SecurityLakeClient.create();
        String arn = s3SinkConfig.getAwsAuthenticationOptions().getAwsStsRoleArn();
        // ARN format: arn:aws:iam::<account-id>:role/<name> — element 4 is the
        // account id, which also serves as the provider-identity principal.
        // (The original computed arn.split(":")[4] twice as "principal" and
        // "accountId"; consolidated here.)
        String accountId = arn.split(":")[4];
        String sourceName = securityLakeBucketSelectorConfig.getSourceName() != null ? securityLakeBucketSelectorConfig.getSourceName() : RandomStringUtils.randomAlphabetic(7);
        // NOTE(review): a random 4-letter suffix is appended even when
        // source_name is explicitly configured — confirm this is intentional.
        CreateCustomLogSourceResponse response =
            securityLakeClient.createCustomLogSource(
                CreateCustomLogSourceRequest.builder()
                    .sourceName(sourceName + RandomStringUtils.randomAlphabetic(4))
                    .eventClasses(List.of(securityLakeBucketSelectorConfig.getLogClass()))
                    .sourceVersion(securityLakeBucketSelectorConfig.getSourceVersion())
                    .configuration(CustomLogSourceConfiguration.builder()
                        .crawlerConfiguration(CustomLogSourceCrawlerConfiguration.builder()
                            .roleArn(arn)
                            .build())
                        .providerIdentity(AwsIdentity.builder()
                            .externalId(securityLakeBucketSelectorConfig.getExternalId())
                            .principal(accountId)
                            .build())
                        .build())
                    .build());
        CustomLogSourceProvider provider = response.source().provider();
        this.sourceLocation = provider.location();
        // Fail fast with a clear message rather than producing a garbled prefix
        // (or a later StringIndexOutOfBoundsException in getBucketName) when the
        // location is not in the expected "s3://<bucket>/ext/..." form.
        if (sourceLocation == null || sourceLocation.indexOf(EXT_PATH) < 0) {
            throw new IllegalStateException("Unexpected Security Lake source location: " + sourceLocation);
        }
        final String region = s3SinkConfig.getAwsAuthenticationOptions().getAwsRegion().toString();

        final LocalDate now = LocalDate.now();
        // Event day partition value in yyyyMMdd form.
        final String eventDay = String.format("%d%02d%02d", now.getYear(), now.getMonthValue(), now.getDayOfMonth());
        int locIndex = sourceLocation.indexOf(EXT_PATH);
        // substring(locIndex + 1) starts just past the leading '/', i.e. at
        // "ext/<source>/..." — the same slicing the original performed.
        pathPrefix = String.format("%sregion=%s/accountId=%s/eventDay=%s/", sourceLocation.substring(locIndex + 1), region, accountId, eventDay);
    }

    @Override
    public String getPathPrefix() {
        return pathPrefix;
    }

    @Override
    public String getBucketName() {
        // The bucket name sits between the "s3://" scheme and the "/ext/" marker.
        int locIndex = sourceLocation.indexOf(EXT_PATH);
        return sourceLocation.substring(S3_SCHEME_PREFIX.length(), locIndex);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.sink.s3;

import com.fasterxml.jackson.annotation.JsonProperty;

/**
 * Configuration for the "aws_security_lake" bucket selector plugin
 * (see SecurityLakeBucketSelector). Values are bound from the S3 sink's
 * "bucket_selector" plugin settings.
 */
public class SecurityLakeBucketSelectorConfig {
// Default custom log source version used when "source_version" is not set.
static final String DEFAULT_SOURCE_VERSION = "1.0";

// Default external id used when "external_id" is not set.
static final String DEFAULT_EXTERNAL_ID = "extid";

// Name of the custom log source to create; when unset, the selector generates
// a random name.
@JsonProperty("source_name")
private String sourceName;

@JsonProperty("source_version")
private String sourceVersion = DEFAULT_SOURCE_VERSION;

// External id placed in the provider identity sent to Security Lake.
@JsonProperty("external_id")
private String externalId = DEFAULT_EXTERNAL_ID;

// Event class of the custom log source, passed to createCustomLogSource.
// NOTE(review): presumably an OCSF event class name — confirm valid values.
@JsonProperty("log_class")
private String logClass;


/** @return the configured source name, or null to auto-generate one. */
public String getSourceName() {
return sourceName;
}

/** @return the source version; defaults to {@value #DEFAULT_SOURCE_VERSION}. */
public String getSourceVersion() {
return sourceVersion;
}

/** @return the external id; defaults to {@value #DEFAULT_EXTERNAL_ID}. */
public String getExternalId() {
return externalId;
}

/** @return the configured log class, or null if unset. */
public String getLogClass() {
return logClass;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@
import org.opensearch.dataprepper.plugins.sink.s3.ownership.BucketOwnerProvider;
import software.amazon.awssdk.services.s3.S3AsyncClient;

import java.util.Map;
import java.util.function.Supplier;
import java.util.function.Function;

/**
 * Factory for write buffers used by the S3 sink. (The parameterless-metadata
 * overload shown in the pre-change diff line was superseded by this signature
 * and has been dropped so implementers only provide the current contract.)
 */
public interface BufferFactory {
    /**
     * Creates a buffer that accumulates events destined for one S3 object.
     *
     * @param s3Client async client used to upload the finished object
     * @param bucketSupplier supplies the destination bucket name at flush time
     * @param keySupplier supplies the object key at flush time
     * @param defaultBucket fallback bucket used when the dynamic bucket fails
     * @param metadataSupplier maps an event count to the object metadata map to
     *        attach to the uploaded object
     * @param bucketOwnerProvider resolves the expected bucket owner account
     * @return a new buffer instance
     */
    Buffer getBuffer(S3AsyncClient s3Client, Supplier<String> bucketSupplier, Supplier<String> keySupplier, String defaultBucket, Function<Integer, Map<String, String>> metadataSupplier, BucketOwnerProvider bucketOwnerProvider);
}
Loading

0 comments on commit 8e3865d

Please sign in to comment.