Skip to content

Commit

Permalink
GitHub-Issue#2778: Added CloudWatchLogs Sink Config Files (#2922)
Browse files Browse the repository at this point in the history
* Elasticsearch client implementation with pit and no context search (#2910)

Create Elasticsearch client, implement search and pit apis for ElasticsearchAccessor

Signed-off-by: Taylor Gray <tylgry@amazon.com>
Signed-off-by: Marcos Gonzalez Mayedo <alemayed@amazon.com>

* GitHub-Issue#2778: Refactoring config files for CloudWatchLogs Sink (#4)

Added Config Files for CloudWatchLogs Sink.

Signed-off-by: Marcos Gonzalez Mayedo <alemayed@amazon.com>

* Added fixes from comments to code (including pathing and nomenclature syntax)

Signed-off-by: Marcos Gonzalez Mayedo <alemayed@amazon.com>

* Refactoring config (#5)

Added default params for back_off and log_send_interval alongside test cases for ThresholdConfig.

Signed-off-by: Marcos Gonzalez Mayedo <alemayed@amazon.com>

* Fixed deleted AwsConfig file

Signed-off-by: Marcos Gonzalez Mayedo <alemayed@amazon.com>

* Removed the s3 dependency from build.gradle, replaced the AwsAuth.. with AwsConfig.

Signed-off-by: Marcos Gonzalez Mayedo <alemayed@amazon.com>

* Added modifiable back_off_timer, added threshold test for back_off_timer and params to AwsConfig

Signed-off-by: Marcos Gonzalez Mayedo <alemayed@amazon.com>

* Added fixes to gradle file, added tests to AwsConfig, and used Reflective mapping to tests CwlSink

Signed-off-by: Marcos Gonzalez Mayedo <alemayed@amazon.com>

* Added default value test to ThresholdConfig and renamed getter for maxRequestSize

Signed-off-by: Marcos Gonzalez Mayedo <alemayed@amazon.com>

* Removed unnecessary imports

Signed-off-by: Marcos Gonzalez Mayedo <alemayed@amazon.com>

* Added cloudwatch-logs to settings.gradle

Signed-off-by: Marcos Gonzalez Mayedo <alemayed@amazon.com>

* Added a quick fix to the back_off_time range

Signed-off-by: Marcos Gonzalez Mayedo <alemayed@amazon.com>

---------

Signed-off-by: Taylor Gray <tylgry@amazon.com>
Signed-off-by: Marcos Gonzalez Mayedo <alemayed@amazon.com>
Signed-off-by: Marcos Gonzalez Mayedo <95880281+MaGonzalMayedo@users.noreply.github.com>
Co-authored-by: Taylor Gray <tylgry@amazon.com>
Co-authored-by: Marcos <alemayed@amazon.com>
  • Loading branch information
3 people committed Jul 5, 2023
1 parent e66b091 commit 4aa35a3
Show file tree
Hide file tree
Showing 9 changed files with 437 additions and 1 deletion.
38 changes: 38 additions & 0 deletions data-prepper-plugins/cloudwatch-logs/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
plugins {
id 'java'
id 'java-library'
}

repositories {
mavenCentral()
}

dependencies {
api project(':data-prepper-api')
implementation project(':data-prepper-plugins:aws-plugin-api')
implementation project(path: ':data-prepper-plugins:common')
testImplementation 'org.junit.jupiter:junit-jupiter'
implementation 'com.fasterxml.jackson.core:jackson-core'
implementation 'com.fasterxml.jackson.core:jackson-databind'
implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml'
implementation 'software.amazon.awssdk:cloudwatch'
implementation 'software.amazon.awssdk:cloudwatchlogs'
implementation 'org.apache.commons:commons-lang3:3.12.0'
testImplementation project(path: ':data-prepper-test-common')
testImplementation project(path: ':data-prepper-test-common')
}

jacocoTestCoverageVerification {
dependsOn jacocoTestReport
violationRules {
rule { //in addition to core projects rule
limit {
minimum = 0.90
}
}
}
}

test {
useJUnitPlatform()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package org.opensearch.dataprepper.plugins.sink.config;

import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.constraints.Size;
import software.amazon.awssdk.regions.Region;

import java.util.Map;

/**
* AwsConfig is based on the S3-Sink AwsAuthenticationOptions
* where the configuration allows the sink to fetch Aws credentials
* and resources.
*/
public class AwsConfig {
public static int DEFAULT_CONNECTION_ATTEMPTS = 5;

@JsonProperty("region")
@Size(min = 1, message = "Region cannot be empty string")
private String awsRegion;

@JsonProperty("sts_role_arn")
@Size(min = 20, max = 2048, message = "awsStsRoleArn length should be between 1 and 2048 characters")
private String awsStsRoleArn;

@JsonProperty("sts_header_overrides")
@Size(max = 5, message = "sts_header_overrides supports a maximum of 5 headers to override")
private Map<String, String> awsStsHeaderOverrides;

@JsonProperty("sts_external_id")
@Size(min = 2, max = 1224, message = "awsStsExternalId length should be between 2 and 1224 characters")
private String awsStsExternalId;

public Region getAwsRegion() {
return awsRegion != null ? Region.of(awsRegion) : null;
}

public String getAwsStsRoleArn() {
return awsStsRoleArn;
}

public String getAwsStsExternalId() {
return awsStsExternalId;
}

public Map<String, String> getAwsStsHeaderOverrides() {
return awsStsHeaderOverrides;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package org.opensearch.dataprepper.plugins.sink.config;

import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.Valid;
import jakarta.validation.constraints.NotEmpty;
import jakarta.validation.constraints.NotNull;

public class CwlSinkConfig {
public static final String DEFAULT_BUFFER_TYPE = "in_memory";

@JsonProperty("aws")
@NotNull
@Valid
private AwsConfig awsConfig;

@JsonProperty("threshold")
@NotNull
private ThresholdConfig thresholdConfig;

@JsonProperty("buffer_type")
private String bufferType = DEFAULT_BUFFER_TYPE;

@JsonProperty("log_group")
@NotEmpty
@NotNull
private String logGroup;

@JsonProperty("log_stream")
@NotEmpty
@NotNull
private String logStream;

public AwsConfig getAwsConfig() {
return awsConfig;
}

public ThresholdConfig getThresholdConfig() {
return thresholdConfig;
}

public String getBufferType() {
return bufferType;
}

public String getLogGroup() {
return logGroup;
}

public String getLogStream() {
return logStream;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package org.opensearch.dataprepper.plugins.sink.config;

import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.constraints.Size;

/**
* The threshold config holds the different configurations for
* buffer restrictions, retransmission restrictions and timeout
* restrictions.
*/
public class ThresholdConfig {
public static final int DEFAULT_BATCH_SIZE = 100;
public static final int DEFAULT_EVENT_SIZE = 50;
public static final int DEFAULT_SIZE_OF_REQUEST = 524288;
public static final int DEFAULT_RETRY_COUNT = 5;
public static final int DEFAULT_LOG_SEND_INTERVAL_TIME = 60;
public static final int DEFAULT_BACKOFF_TIME = 5000;

@JsonProperty("batch_size")
@Size(min = 1, max = 10000, message = "batch_size amount should be between 1 to 10000")
private int batchSize = DEFAULT_BATCH_SIZE;

@JsonProperty("max_event_size")
@Size(min = 1, max = 256, message = "max_event_size amount should be between 1 to 256 kilobytes")
private int maxEventSize = DEFAULT_EVENT_SIZE;

@JsonProperty("max_request_size")
@Size(min = 1, max = 1048576, message = "max_batch_request_size amount should be between 1 and 1048576 bytes")
private int maxRequestSize = DEFAULT_SIZE_OF_REQUEST;

@JsonProperty("retry_count")
@Size(min = 1, max = 15, message = "retry_count amount should be between 1 and 15")
private int retryCount = DEFAULT_RETRY_COUNT;

@JsonProperty("log_send_interval")
@Size(min = 5, max = 300, message = "log_send_interval amount should be between 5 and 300 seconds")
private int logSendInterval = DEFAULT_LOG_SEND_INTERVAL_TIME;

@JsonProperty("back_off_time")
@Size(min = 500, max = 1000, message = "back_off_time amount should be between 500 and 1000 milliseconds")
private int backOffTime = DEFAULT_BACKOFF_TIME;

public int getBatchSize() {
return batchSize;
}

public int getMaxEventSize() {
return maxEventSize;
}

public int getMaxRequestSize() {
return maxRequestSize;
}

public int getRetryCount() {
return retryCount;
}

public int getLogSendInterval() {
return logSendInterval;
}

public int getBackOffTime() {
return backOffTime;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package org.opensearch.dataprepper.plugins.sink.configuration;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.opensearch.dataprepper.plugins.sink.config.AwsConfig;
import software.amazon.awssdk.regions.Region;

import java.util.Collections;
import java.util.Map;
import java.util.UUID;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat;

public class AwsConfigTest {
private ObjectMapper objectMapper;

@BeforeEach
void setUp() {
objectMapper = new ObjectMapper();
}

@ParameterizedTest
@ValueSource(strings = {"us-east-1", "us-west-2", "eu-central-1"})
void getAwsRegion_returns_Region_of(final String regionString) {
final Region expectedRegionObject = Region.of(regionString);
final Map<String, Object> jsonMap = Map.of("region", regionString);
final AwsConfig objectUnderTest = objectMapper.convertValue(jsonMap, AwsConfig.class);
assertThat(objectUnderTest.getAwsRegion(), equalTo(expectedRegionObject));
}

@Test
void getAwsRegion_returns_null_when_region_is_null() {
final Map<String, Object> jsonMap = Collections.emptyMap();
final AwsConfig objectUnderTest = objectMapper.convertValue(jsonMap, AwsConfig.class);
assertThat(objectUnderTest.getAwsRegion(), nullValue());
}

@Test
void getAwsStsRoleArn_returns_value_from_deserialized_JSON() {
final String stsRoleArn = UUID.randomUUID().toString();
final Map<String, Object> jsonMap = Map.of("sts_role_arn", stsRoleArn);
final AwsConfig objectUnderTest = objectMapper.convertValue(jsonMap, AwsConfig.class);
assertThat(objectUnderTest.getAwsStsRoleArn(), equalTo(stsRoleArn));
}

@Test
void getAwsStsRoleArn_returns_null_if_not_in_JSON() {
final Map<String, Object> jsonMap = Collections.emptyMap();
final AwsConfig objectUnderTest = objectMapper.convertValue(jsonMap, AwsConfig.class);
assertThat(objectUnderTest.getAwsStsRoleArn(), nullValue());
}

@Test
void getAwsStsExternalId_returns_value_from_deserialized_JSON() {
final String stsExternalId = UUID.randomUUID().toString();
final Map<String, Object> jsonMap = Map.of("sts_external_id", stsExternalId);
final AwsConfig objectUnderTest = objectMapper.convertValue(jsonMap, AwsConfig.class);
assertThat(objectUnderTest.getAwsStsExternalId(), equalTo(stsExternalId));
}

@Test
void getAwsStsExternalId_returns_null_if_not_in_JSON() {
final Map<String, Object> jsonMap = Collections.emptyMap();
final AwsConfig objectUnderTest = objectMapper.convertValue(jsonMap, AwsConfig.class);
assertThat(objectUnderTest.getAwsStsExternalId(), nullValue());
}

@Test
void getAwsStsHeaderOverrides_returns_value_from_deserialized_JSON() {
final Map<String, String> stsHeaderOverrides = Map.of(UUID.randomUUID().toString(), UUID.randomUUID().toString());
final Map<String, Object> jsonMap = Map.of("sts_header_overrides", stsHeaderOverrides);
final AwsConfig objectUnderTest = objectMapper.convertValue(jsonMap, AwsConfig.class);
assertThat(objectUnderTest.getAwsStsHeaderOverrides(), equalTo(stsHeaderOverrides));
}

@Test
void getAwsStsHeaderOverrides_returns_null_if_not_in_JSON() {
final Map<String, Object> jsonMap = Collections.emptyMap();
final AwsConfig objectUnderTest = objectMapper.convertValue(jsonMap, AwsConfig.class);
assertThat(objectUnderTest.getAwsStsHeaderOverrides(), nullValue());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package org.opensearch.dataprepper.plugins.sink.configuration;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.opensearch.dataprepper.plugins.sink.config.AwsConfig;
import org.opensearch.dataprepper.plugins.sink.config.CwlSinkConfig;
import org.opensearch.dataprepper.plugins.sink.config.ThresholdConfig;
import org.opensearch.dataprepper.test.helper.ReflectivelySetField;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;

public class CwlSinkConfigTest {
private CwlSinkConfig cwlSinkConfig;
private AwsConfig awsConfig;
private ThresholdConfig thresholdConfig;
private final String LOG_GROUP = "testLogGroup";
private final String LOG_STREAM = "testLogStream";

@BeforeEach
void setUp() {
cwlSinkConfig = new CwlSinkConfig();
awsConfig = new AwsConfig();
thresholdConfig = new ThresholdConfig();
}

@Test
void check_null_auth_config_test() {
assertThat(new CwlSinkConfig().getAwsConfig(), equalTo(null));
}

@Test
void check_default_buffer_type_test() {
assertThat(new CwlSinkConfig().getBufferType(), equalTo(CwlSinkConfig.DEFAULT_BUFFER_TYPE));
}

@Test
void check_null_log_group_test() {
assertThat(new CwlSinkConfig().getLogGroup(), equalTo(null));
}
@Test
void check_null_log_stream_test() {
assertThat(new CwlSinkConfig().getLogStream(), equalTo(null));
}

@Test
void check_valid_log_group_and_log_stream_test() throws NoSuchFieldException, IllegalAccessException {
ReflectivelySetField.setField(cwlSinkConfig.getClass(), cwlSinkConfig, "logGroup", LOG_GROUP);
ReflectivelySetField.setField(cwlSinkConfig.getClass(), cwlSinkConfig, "logStream", LOG_STREAM);

assertThat(cwlSinkConfig.getLogGroup(), equalTo(LOG_GROUP));
assertThat(cwlSinkConfig.getLogStream(), equalTo(LOG_STREAM));
}

@Test
void check_valid_sub_config_test() throws NoSuchFieldException, IllegalAccessException {
ReflectivelySetField.setField(cwlSinkConfig.getClass(), cwlSinkConfig, "thresholdConfig", thresholdConfig);
ReflectivelySetField.setField(cwlSinkConfig.getClass(), cwlSinkConfig, "awsConfig", awsConfig);

assertThat(cwlSinkConfig.getAwsConfig(), equalTo(awsConfig));
assertThat(cwlSinkConfig.getThresholdConfig(), equalTo(thresholdConfig));
}
}
Loading

0 comments on commit 4aa35a3

Please sign in to comment.