diff --git a/data-prepper-plugins/cloudwatch-logs/build.gradle b/data-prepper-plugins/cloudwatch-logs/build.gradle new file mode 100644 index 0000000000..2d00cbd2d0 --- /dev/null +++ b/data-prepper-plugins/cloudwatch-logs/build.gradle @@ -0,0 +1,38 @@ +plugins { + id 'java' + id 'java-library' +} + +repositories { + mavenCentral() +} + +dependencies { + api project(':data-prepper-api') + implementation project(':data-prepper-plugins:aws-plugin-api') + implementation project(path: ':data-prepper-plugins:common') + testImplementation 'org.junit.jupiter:junit-jupiter' + implementation 'com.fasterxml.jackson.core:jackson-core' + implementation 'com.fasterxml.jackson.core:jackson-databind' + implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml' + implementation 'software.amazon.awssdk:cloudwatch' + implementation 'software.amazon.awssdk:cloudwatchlogs' + implementation 'org.apache.commons:commons-lang3:3.12.0' + testImplementation project(path: ':data-prepper-test-common') + testImplementation project(path: ':data-prepper-test-common') +} + +jacocoTestCoverageVerification { + dependsOn jacocoTestReport + violationRules { + rule { //in addition to core projects rule + limit { + minimum = 0.90 + } + } + } +} + +test { + useJUnitPlatform() +} \ No newline at end of file diff --git a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/config/AwsConfig.java b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/config/AwsConfig.java new file mode 100644 index 0000000000..6a2a85557e --- /dev/null +++ b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/config/AwsConfig.java @@ -0,0 +1,48 @@ +package org.opensearch.dataprepper.plugins.sink.config; + +import com.fasterxml.jackson.annotation.JsonProperty; +import jakarta.validation.constraints.Size; +import software.amazon.awssdk.regions.Region; + +import java.util.Map; + +/** + * AwsConfig is based on the S3-Sink AwsAuthenticationOptions + * where the configuration allows the sink to fetch Aws credentials + * and resources. + */ +public class AwsConfig { + public static int DEFAULT_CONNECTION_ATTEMPTS = 5; + + @JsonProperty("region") + @Size(min = 1, message = "Region cannot be empty string") + private String awsRegion; + + @JsonProperty("sts_role_arn") + @Size(min = 20, max = 2048, message = "awsStsRoleArn length should be between 1 and 2048 characters") + private String awsStsRoleArn; + + @JsonProperty("sts_header_overrides") + @Size(max = 5, message = "sts_header_overrides supports a maximum of 5 headers to override") + private Map awsStsHeaderOverrides; + + @JsonProperty("sts_external_id") + @Size(min = 2, max = 1224, message = "awsStsExternalId length should be between 2 and 1224 characters") + private String awsStsExternalId; + + public Region getAwsRegion() { + return awsRegion != null ? Region.of(awsRegion) : null; + } + + public String getAwsStsRoleArn() { + return awsStsRoleArn; + } + + public String getAwsStsExternalId() { + return awsStsExternalId; + } + + public Map getAwsStsHeaderOverrides() { + return awsStsHeaderOverrides; + } +} diff --git a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/config/CwlSinkConfig.java b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/config/CwlSinkConfig.java new file mode 100644 index 0000000000..230512bde5 --- /dev/null +++ b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/config/CwlSinkConfig.java @@ -0,0 +1,52 @@ +package org.opensearch.dataprepper.plugins.sink.config; + +import com.fasterxml.jackson.annotation.JsonProperty; +import jakarta.validation.Valid; +import jakarta.validation.constraints.NotEmpty; +import jakarta.validation.constraints.NotNull; + +public class CwlSinkConfig { + public static final String DEFAULT_BUFFER_TYPE = "in_memory"; + + @JsonProperty("aws") + @NotNull + @Valid + private AwsConfig awsConfig; + + @JsonProperty("threshold") + @NotNull + private ThresholdConfig thresholdConfig; + + @JsonProperty("buffer_type") + private String bufferType = DEFAULT_BUFFER_TYPE; + + @JsonProperty("log_group") + @NotEmpty + @NotNull + private String logGroup; + + @JsonProperty("log_stream") + @NotEmpty + @NotNull + private String logStream; + + public AwsConfig getAwsConfig() { + return awsConfig; + } + + public ThresholdConfig getThresholdConfig() { + return thresholdConfig; + } + + public String getBufferType() { + return bufferType; + } + + public String getLogGroup() { + return logGroup; + } + + public String getLogStream() { + return logStream; + } +} diff --git a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/config/ThresholdConfig.java b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/config/ThresholdConfig.java new file mode 100644 index 0000000000..77571a2c29 --- /dev/null +++ b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/config/ThresholdConfig.java @@ -0,0 +1,66 @@ +package org.opensearch.dataprepper.plugins.sink.config; + +import com.fasterxml.jackson.annotation.JsonProperty; +import jakarta.validation.constraints.Size; + +/** + * The threshold config holds the different configurations for + * buffer restrictions, retransmission restrictions and timeout + * restrictions. + */ +public class ThresholdConfig { + public static final int DEFAULT_BATCH_SIZE = 100; + public static final int DEFAULT_EVENT_SIZE = 50; + public static final int DEFAULT_SIZE_OF_REQUEST = 524288; + public static final int DEFAULT_RETRY_COUNT = 5; + public static final int DEFAULT_LOG_SEND_INTERVAL_TIME = 60; + public static final int DEFAULT_BACKOFF_TIME = 5000; + + @JsonProperty("batch_size") + @Size(min = 1, max = 10000, message = "batch_size amount should be between 1 to 10000") + private int batchSize = DEFAULT_BATCH_SIZE; + + @JsonProperty("max_event_size") + @Size(min = 1, max = 256, message = "max_event_size amount should be between 1 to 256 kilobytes") + private int maxEventSize = DEFAULT_EVENT_SIZE; + + @JsonProperty("max_request_size") + @Size(min = 1, max = 1048576, message = "max_batch_request_size amount should be between 1 and 1048576 bytes") + private int maxRequestSize = DEFAULT_SIZE_OF_REQUEST; + + @JsonProperty("retry_count") + @Size(min = 1, max = 15, message = "retry_count amount should be between 1 and 15") + private int retryCount = DEFAULT_RETRY_COUNT; + + @JsonProperty("log_send_interval") + @Size(min = 5, max = 300, message = "log_send_interval amount should be between 5 and 300 seconds") + private int logSendInterval = DEFAULT_LOG_SEND_INTERVAL_TIME; + + @JsonProperty("back_off_time") + @Size(min = 500, max = 1000, message = "back_off_time amount should be between 500 and 1000 milliseconds") + private int backOffTime = DEFAULT_BACKOFF_TIME; + + public int getBatchSize() { + return batchSize; + } + + public int getMaxEventSize() { + return maxEventSize; + } + + public int getMaxRequestSize() { + return maxRequestSize; + } + + public int getRetryCount() { + return retryCount; + } + + public int getLogSendInterval() { + return logSendInterval; + } + + public int getBackOffTime() { + return backOffTime; + } +} diff --git a/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/configuration/AwsConfigTest.java b/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/configuration/AwsConfigTest.java new file mode 100644 index 0000000000..8ab03d575d --- /dev/null +++ b/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/configuration/AwsConfigTest.java @@ -0,0 +1,87 @@ +package org.opensearch.dataprepper.plugins.sink.configuration; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; +import org.opensearch.dataprepper.plugins.sink.config.AwsConfig; +import software.amazon.awssdk.regions.Region; + +import java.util.Collections; +import java.util.Map; +import java.util.UUID; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.nullValue; +import static org.hamcrest.MatcherAssert.assertThat; + +public class AwsConfigTest { + private ObjectMapper objectMapper; + + @BeforeEach + void setUp() { + objectMapper = new ObjectMapper(); + } + + @ParameterizedTest + @ValueSource(strings = {"us-east-1", "us-west-2", "eu-central-1"}) + void getAwsRegion_returns_Region_of(final String regionString) { + final Region expectedRegionObject = Region.of(regionString); + final Map jsonMap = Map.of("region", regionString); + final AwsConfig objectUnderTest = objectMapper.convertValue(jsonMap, AwsConfig.class); + assertThat(objectUnderTest.getAwsRegion(), equalTo(expectedRegionObject)); + } + + @Test + void getAwsRegion_returns_null_when_region_is_null() { + final Map jsonMap = Collections.emptyMap(); + final AwsConfig objectUnderTest = objectMapper.convertValue(jsonMap, AwsConfig.class); + assertThat(objectUnderTest.getAwsRegion(), nullValue()); + } + + @Test + void getAwsStsRoleArn_returns_value_from_deserialized_JSON() { + final String stsRoleArn = UUID.randomUUID().toString(); + final Map jsonMap = Map.of("sts_role_arn", stsRoleArn); + final AwsConfig objectUnderTest = objectMapper.convertValue(jsonMap, AwsConfig.class); + assertThat(objectUnderTest.getAwsStsRoleArn(), equalTo(stsRoleArn)); + } + + @Test + void getAwsStsRoleArn_returns_null_if_not_in_JSON() { + final Map jsonMap = Collections.emptyMap(); + final AwsConfig objectUnderTest = objectMapper.convertValue(jsonMap, AwsConfig.class); + assertThat(objectUnderTest.getAwsStsRoleArn(), nullValue()); + } + + @Test + void getAwsStsExternalId_returns_value_from_deserialized_JSON() { + final String stsExternalId = UUID.randomUUID().toString(); + final Map jsonMap = Map.of("sts_external_id", stsExternalId); + final AwsConfig objectUnderTest = objectMapper.convertValue(jsonMap, AwsConfig.class); + assertThat(objectUnderTest.getAwsStsExternalId(), equalTo(stsExternalId)); + } + + @Test + void getAwsStsExternalId_returns_null_if_not_in_JSON() { + final Map jsonMap = Collections.emptyMap(); + final AwsConfig objectUnderTest = objectMapper.convertValue(jsonMap, AwsConfig.class); + assertThat(objectUnderTest.getAwsStsExternalId(), nullValue()); + } + + @Test + void getAwsStsHeaderOverrides_returns_value_from_deserialized_JSON() { + final Map stsHeaderOverrides = Map.of(UUID.randomUUID().toString(), UUID.randomUUID().toString()); + final Map jsonMap = Map.of("sts_header_overrides", stsHeaderOverrides); + final AwsConfig objectUnderTest = objectMapper.convertValue(jsonMap, AwsConfig.class); + assertThat(objectUnderTest.getAwsStsHeaderOverrides(), equalTo(stsHeaderOverrides)); + } + + @Test + void getAwsStsHeaderOverrides_returns_null_if_not_in_JSON() { + final Map jsonMap = Collections.emptyMap(); + final AwsConfig objectUnderTest = objectMapper.convertValue(jsonMap, AwsConfig.class); + assertThat(objectUnderTest.getAwsStsHeaderOverrides(), nullValue()); + } +} diff --git a/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/configuration/CwlSinkConfigTest.java b/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/configuration/CwlSinkConfigTest.java new file mode 100644 index 0000000000..9842a333ee --- /dev/null +++ b/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/configuration/CwlSinkConfigTest.java @@ -0,0 +1,63 @@ +package org.opensearch.dataprepper.plugins.sink.configuration; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.opensearch.dataprepper.plugins.sink.config.AwsConfig; +import org.opensearch.dataprepper.plugins.sink.config.CwlSinkConfig; +import org.opensearch.dataprepper.plugins.sink.config.ThresholdConfig; +import org.opensearch.dataprepper.test.helper.ReflectivelySetField; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; + +public class CwlSinkConfigTest { + private CwlSinkConfig cwlSinkConfig; + private AwsConfig awsConfig; + private ThresholdConfig thresholdConfig; + private final String LOG_GROUP = "testLogGroup"; + private final String LOG_STREAM = "testLogStream"; + + @BeforeEach + void setUp() { + cwlSinkConfig = new CwlSinkConfig(); + awsConfig = new AwsConfig(); + thresholdConfig = new ThresholdConfig(); + } + + @Test + void check_null_auth_config_test() { + assertThat(new CwlSinkConfig().getAwsConfig(), equalTo(null)); + } + + @Test + void check_default_buffer_type_test() { + assertThat(new CwlSinkConfig().getBufferType(), equalTo(CwlSinkConfig.DEFAULT_BUFFER_TYPE)); + } + + @Test + void check_null_log_group_test() { + assertThat(new CwlSinkConfig().getLogGroup(), equalTo(null)); + } + @Test + void check_null_log_stream_test() { + assertThat(new CwlSinkConfig().getLogStream(), equalTo(null)); + } + + @Test + void check_valid_log_group_and_log_stream_test() throws NoSuchFieldException, IllegalAccessException { + ReflectivelySetField.setField(cwlSinkConfig.getClass(), cwlSinkConfig, "logGroup", LOG_GROUP); + ReflectivelySetField.setField(cwlSinkConfig.getClass(), cwlSinkConfig, "logStream", LOG_STREAM); + + assertThat(cwlSinkConfig.getLogGroup(), equalTo(LOG_GROUP)); + assertThat(cwlSinkConfig.getLogStream(), equalTo(LOG_STREAM)); + } + + @Test + void check_valid_sub_config_test() throws NoSuchFieldException, IllegalAccessException { + ReflectivelySetField.setField(cwlSinkConfig.getClass(), cwlSinkConfig, "thresholdConfig", thresholdConfig); + ReflectivelySetField.setField(cwlSinkConfig.getClass(), cwlSinkConfig, "awsConfig", awsConfig); + + assertThat(cwlSinkConfig.getAwsConfig(), equalTo(awsConfig)); + assertThat(cwlSinkConfig.getThresholdConfig(), equalTo(thresholdConfig)); + } +} diff --git a/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/configuration/ThresholdConfigTest.java b/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/configuration/ThresholdConfigTest.java new file mode 100644 index 0000000000..e6af96e08d --- /dev/null +++ b/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/configuration/ThresholdConfigTest.java @@ -0,0 +1,82 @@ +package org.opensearch.dataprepper.plugins.sink.configuration; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; +import org.opensearch.dataprepper.plugins.sink.config.ThresholdConfig; + +import java.util.Map; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; + +public class ThresholdConfigTest { + private ObjectMapper objectMapper; + + @BeforeEach + void setUp() { + objectMapper = new ObjectMapper(); + } + + @Test + void check_default_values() { + final ThresholdConfig thresholdConfig = new ThresholdConfig(); + + assertThat(thresholdConfig.getBackOffTime(), equalTo(ThresholdConfig.DEFAULT_BACKOFF_TIME)); + assertThat(thresholdConfig.getRetryCount(), equalTo(ThresholdConfig.DEFAULT_RETRY_COUNT)); + assertThat(thresholdConfig.getBatchSize(), equalTo(ThresholdConfig.DEFAULT_BATCH_SIZE)); + assertThat(thresholdConfig.getMaxEventSize(), equalTo(ThresholdConfig.DEFAULT_EVENT_SIZE)); + assertThat(thresholdConfig.getMaxRequestSize(), equalTo(ThresholdConfig.DEFAULT_SIZE_OF_REQUEST)); + assertThat(thresholdConfig.getLogSendInterval(), equalTo(ThresholdConfig.DEFAULT_LOG_SEND_INTERVAL_TIME)); + } + + @ParameterizedTest + @ValueSource(ints = {1, 10, 10000}) + void check_valid_batch_size(final int batchSize) { + final Map jsonMap = Map.of("batch_size", batchSize); + final ThresholdConfig thresholdConfigTest = objectMapper.convertValue(jsonMap, ThresholdConfig.class); + assertThat(thresholdConfigTest.getBatchSize(), equalTo(batchSize)); + } + + @ParameterizedTest + @ValueSource(ints = {1, 10, 256}) + void check_valid_max_event_size(final int max_event_size) { + final Map jsonMap = Map.of("max_event_size", max_event_size); + final ThresholdConfig thresholdConfigTest = objectMapper.convertValue(jsonMap, ThresholdConfig.class); + assertThat(thresholdConfigTest.getMaxEventSize(), equalTo(max_event_size)); + } + + @ParameterizedTest + @ValueSource(ints = {1, 100, 1048576}) + void check_valid_request_size(final int max_batch_request_size) { + final Map jsonMap = Map.of("max_request_size", max_batch_request_size); + final ThresholdConfig thresholdConfigTest = objectMapper.convertValue(jsonMap, ThresholdConfig.class); + assertThat(thresholdConfigTest.getMaxRequestSize(), equalTo(max_batch_request_size)); + } + + @ParameterizedTest + @ValueSource(ints = {1, 10, 15}) + void check_valid_retry_count(final int retry_count) { + final Map jsonMap = Map.of("retry_count", retry_count); + final ThresholdConfig thresholdConfigTest = objectMapper.convertValue(jsonMap, ThresholdConfig.class); + assertThat(thresholdConfigTest.getRetryCount(), equalTo(retry_count)); + } + + @ParameterizedTest + @ValueSource(ints = {5, 10, 300}) + void check_valid_log_send_interval(final int log_send_interval) { + final Map jsonMap = Map.of("log_send_interval", log_send_interval); + final ThresholdConfig thresholdConfigTest = objectMapper.convertValue(jsonMap, ThresholdConfig.class); + assertThat(thresholdConfigTest.getLogSendInterval(), equalTo(log_send_interval)); + } + + @ParameterizedTest + @ValueSource(ints = {0, 100, 5000}) + void check_valid_back_off_time(final int back_off_time) { + final Map jsonMap = Map.of("back_off_time", back_off_time); + final ThresholdConfig thresholdConfigTest = objectMapper.convertValue(jsonMap, ThresholdConfig.class); + assertThat(thresholdConfigTest.getBackOffTime(), equalTo(back_off_time)); + } +} diff --git a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/OpenSearchClientFactory.java b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/OpenSearchClientFactory.java index e588e1f711..b32f4bb3a5 100644 --- a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/OpenSearchClientFactory.java +++ b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/OpenSearchClientFactory.java @@ -189,7 +189,6 @@ private void attachBasicAuth(final org.elasticsearch.client.RestClientBuilder re } else { LOG.warn("Authentication was explicitly disabled for the OpenSearch source"); } - attachSSLContext(httpClientBuilder, openSearchSourceConfiguration); httpClientBuilder.addInterceptorLast( (HttpResponseInterceptor) diff --git a/settings.gradle b/settings.gradle index d668b7d12c..67c85c3a12 100644 --- a/settings.gradle +++ b/settings.gradle @@ -126,4 +126,5 @@ include 'data-prepper-plugins:parquet-codecs' include 'data-prepper-plugins:aws-sqs-common' include 'data-prepper-plugins:buffer-common' include 'data-prepper-plugins:sqs-source' +include 'data-prepper-plugins:cloudwatch-logs' include 'data-prepper-plugins:http-sink'