Skip to content

Commit

Permalink
GitHub-Issue#2778: Added CouldWatchLogsService, Tests and Retransmiss…
Browse files Browse the repository at this point in the history
…ionException (opensearch-project#3023)

* Elasticsearch client implementation with pit and no context search (opensearch-project#2910)

Create Elasticsearch client, implement search and pit apis for ElasticsearchAccessor

Signed-off-by: Taylor Gray <tylgry@amazon.com>
Signed-off-by: Marcos Gonzalez Mayedo <alemayed@amazon.com>

* GitHub-Issue#2778: Refactoring config files for CloudWatchLogs Sink (#4)

Added Config Files for CloudWatchLogs Sink.

Signed-off-by: Marcos Gonzalez Mayedo <alemayed@amazon.com>

* Added fixes from comments to code (including pathing and nomenclature syntax)

Signed-off-by: Marcos Gonzalez Mayedo <alemayed@amazon.com>

* Refactoring config (#5)

Added default params for back_off and log_send_interval alongside test cases for ThresholdConfig.

Signed-off-by: Marcos Gonzalez Mayedo <alemayed@amazon.com>

* Fixed deleted AwsConfig file

Signed-off-by: Marcos Gonzalez Mayedo <alemayed@amazon.com>

* Removed the s3 dependency from build.gradle, replaced the AwsAuth.. with AwsConfig.

Signed-off-by: Marcos Gonzalez Mayedo <alemayed@amazon.com>

* Added modifiable back_off_timer, added threshold test for back_off_timer and params to AwsConfig

Signed-off-by: Marcos Gonzalez Mayedo <alemayed@amazon.com>

* Added fixes to gradle file, added tests to AwsConfig, and used Reflective mapping to tests CwlSink

Signed-off-by: Marcos Gonzalez Mayedo <alemayed@amazon.com>

* Added default value test to ThresholdConfig and renamed getter for maxRequestSize

Signed-off-by: Marcos Gonzalez Mayedo <alemayed@amazon.com>

* Removed unnecessary imports

Signed-off-by: Marcos Gonzalez Mayedo <alemayed@amazon.com>

* Added cloudwatch-logs to settings.gradle

Signed-off-by: Marcos Gonzalez Mayedo <alemayed@amazon.com>

* Added a quick fix to the back_off_time range

Signed-off-by: Marcos Gonzalez Mayedo <alemayed@amazon.com>

* Added Buffer classes, ClientFactory similar to S3, and ThresholdCheck

Signed-off-by: Marcos Gonzalez Mayedo <alemayed@amazon.com>

* Removed unnecessary default method from ClientFactory

Signed-off-by: Marcos Gonzalez Mayedo <alemayed@amazon.com>

* Added comments in Buffer Interface, change some default values to suit the plugin use case more

Signed-off-by: Marcos Gonzalez Mayedo <alemayed@amazon.com>

* Removed unused imports

Signed-off-by: Marcos Gonzalez Mayedo <alemayed@amazon.com>

* Changed the unused imports, made parameters final in the ThresholdCheck

Signed-off-by: Marcos Gonzalez Mayedo <alemayed@amazon.com>

* Made changes to the tests and the method signatures in ThresholdCheck, made fixes to gradle file to include catalog

Signed-off-by: Marcos Gonzalez Mayedo <alemayed@amazon.com>

* Removed unused methods/comments

Signed-off-by: Marcos Gonzalez Mayedo <alemayed@amazon.com>

* Added CloudWatchLogsService, CloudWatchLogsServiceTest and RetransmissionLimitException

Signed-off-by: Marcos Gonzalez Mayedo <alemayed@amazon.com>

* Fixed retransmission logging fixed value

Signed-off-by: Marcos Gonzalez Mayedo <alemayed@amazon.com>

* Fixed unused imports

Signed-off-by: Marcos Gonzalez Mayedo <alemayed@amazon.com>

* Fixed making ThresholdCheck public

Signed-off-by: Marcos Gonzalez Mayedo <alemayed@amazon.com>

* Added fixes to ThresholdCheck and CloudWatchLogsService to decouple methods

Signed-off-by: Marcos Gonzalez Mayedo <alemayed@amazon.com>

* Fixed syntax start import in CloudWatchLogsServiceTest

Signed-off-by: Marcos Gonzalez Mayedo <alemayed@amazon.com>

* Extracted LogPusher and SinkStopWatch classes for code cleanup. Addded fixes to variables and retry logic for InterruptExceptions

Signed-off-by: Marcos Gonzalez Mayedo <alemayed@amazon.com>

* Changed method uses in CloudWatchLogsService and removed logging the batch size in LogPusher

Signed-off-by: Marcos Gonzalez Mayedo <alemayed@amazon.com>

* Added Multithreaded CloudWatchLogsDispatcher for handling various async calls to perform PLE's

and added tests

Signed-off-by: Marcos Gonzalez Mayedo <alemayed@amazon.com>

* Added fixesto test and defaulted the parameters in the config to CloudWatchLogs limits, customer can change this in config file

Signed-off-by: Marcos Gonzalez Mayedo <alemayed@amazon.com>

* Added exponential backofftime

Signed-off-by: Marcos Gonzalez Mayedo <alemayed@amazon.com>

* Fixed unused imports

Signed-off-by: Marcos Gonzalez Mayedo <alemayed@amazon.com>

* Fixed up deepcopy of arraylist for service workers in CloudWatchLogsService, and fixed Log calling methods

Signed-off-by: Marcos Gonzalez Mayedo <alemayed@amazon.com>

* Added CloudWatchLogsDispatcher builder pattern, fixed tests for Service and Dispatcher and modified backOffTimeBase

Signed-off-by: Marcos Gonzalez Mayedo <alemayed@amazon.com>

* Removed unused imports

Signed-off-by:Marcos Gonzalez Mayedo <alemayed@amazon.com>
Signed-off-by: Marcos Gonzalez Mayedo <alemayed@amazon.com>

* Added resetBuffer method, removed unnecessary RetransmissionException, and added logString pass in parameter for staging log events.

Signed-off-by: Marcos Gonzalez Mayedo <alemayed@amazon.com>

* Started making changes to the tests to implement the new class structure (performance enhancement)

Signed-off-by: Marcos Gonzalez Mayedo <alemayed@amazon.com>

* Refactored the CloudWatchLogsDispatcher into two classes with the addition of Uploader, introduced simple multithread tests for CloudWatchLogsService

Signed-off-by: Marcos Gonzalez Mayedo <alemayed@amazon.com>

* Fixed issues with locking in try block and added final multithreaded tests to the CloudWatchLogsService class

Signed-off-by: Marcos Gonzalez Mayedo <alemayed@amazon.com>

* Added CloudWatchLogsMetricsTest, changed upper back off time bound and scale, and refactoring changes for better code syntax (renaming, refactoring methods for conciseness, etc...)

Signed-off-by: Marcos Gonzalez Mayedo <alemayed@amazon.com>

* Added changes to javadoc

Signed-off-by: Marcos Gonzalez Mayedo <alemayed@amazon.com>

* Update data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/client/CloudWatchLogsDispatcher.java

Co-authored-by: Mark Kuhn <kuhnmar@amazon.com>
Signed-off-by: Marcos Gonzalez Mayedo <95880281+MaGonzalMayedo@users.noreply.github.com>

* Fixed comment on CloudWatchLogsDispatcher

Signed-off-by: Marcos Gonzalez Mayedo <alemayed@amazon.com>

---------

Signed-off-by: Taylor Gray <tylgry@amazon.com>
Signed-off-by: Marcos Gonzalez Mayedo <alemayed@amazon.com>
Signed-off-by: Marcos Gonzalez Mayedo <95880281+MaGonzalMayedo@users.noreply.github.com>
Co-authored-by: Taylor Gray <tylgry@amazon.com>
Co-authored-by: Marcos <alemayed@amazon.com>
Co-authored-by: Mark Kuhn <kuhnmar@amazon.com>
  • Loading branch information
4 people authored Jul 31, 2023
1 parent f809163 commit 93d06db
Show file tree
Hide file tree
Showing 23 changed files with 1,088 additions and 215 deletions.
3 changes: 3 additions & 0 deletions data-prepper-plugins/cloudwatch-logs/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,12 @@ dependencies {
implementation 'software.amazon.awssdk:cloudwatch'
implementation 'software.amazon.awssdk:cloudwatchlogs'
implementation 'org.apache.commons:commons-lang3:3.12.0'
implementation 'org.projectlombok:lombok:1.18.26'
testImplementation project(path: ':data-prepper-test-common')
testImplementation testLibs.mockito.inline
testImplementation 'org.junit.jupiter:junit-jupiter'
compileOnly 'org.projectlombok:lombok:1.18.24'
annotationProcessor 'org.projectlombok:lombok:1.18.24'
}

jacocoTestCoverageVerification {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

package org.opensearch.dataprepper.plugins.sink.buffer;

import java.util.ArrayList;
import java.util.List;

/**
* Buffer that handles the temporary storage of
Expand All @@ -14,6 +14,13 @@
* 2. Transforms to Byte type.
* 3. Returns a Byte type.
*/

/*
TODO:
Need to add PriorityQueue for extracting timestamp, this will need the timestamp and the actual string message itself.
Can refactor the buffer to contain
*/

public interface Buffer {
/**
* Size of buffer in events.
Expand All @@ -31,7 +38,9 @@ public interface Buffer {

byte[] popEvent();

ArrayList<byte[]> getBufferedData();
List<byte[]> getBufferedData();

void clearBuffer();

void resetBuffer();
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@
package org.opensearch.dataprepper.plugins.sink.buffer;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class InMemoryBuffer implements Buffer {
private final ArrayList<byte[]> eventsBuffered;
private List<byte[]> eventsBuffered;
private int bufferSize = 0;

InMemoryBuffer() {
Expand All @@ -33,18 +35,27 @@ public void writeEvent(final byte[] event) {

@Override
public byte[] popEvent() {
if (eventsBuffered.isEmpty()) {
return new byte[0];
}
bufferSize -= eventsBuffered.get(0).length;
return eventsBuffered.remove(0);
}

@Override
public ArrayList<byte[]> getBufferedData() {
return eventsBuffered;
public List<byte[]> getBufferedData() {
return Collections.unmodifiableList(eventsBuffered);
}

@Override
public void clearBuffer() {
bufferSize = 0;
eventsBuffered.clear();
}

@Override
public void resetBuffer() {
bufferSize = 0;
eventsBuffered = new ArrayList<>();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,22 +10,21 @@
import org.opensearch.dataprepper.plugins.sink.config.AwsConfig;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
import software.amazon.awssdk.core.retry.RetryPolicy;
import software.amazon.awssdk.services.cloudwatchlogs.CloudWatchLogsClient;

/**
* CwlClientFactory is in charge of reading in
* aws config parameters to return a working
* client for interfacing with
* CloudWatchLogs services.
* CwlClientFactory is in charge of reading in aws config parameters to return a working
* client for interfacing with CloudWatchLogs services.
*/
public final class CloudWatchLogsClientFactory {
private CloudWatchLogsClientFactory() {
}

/**
* Generates a CloudWatchLogs Client based on STS role ARN system credentials.
* @param awsConfig - AwsConfig specifying region, roles, and header overrides.
* @param awsCredentialsSupplier - AwsCredentialsSupplier Interface for which to create CredentialsProvider for Client config.
* @return CloudWatchLogsClient - used to interact with CloudWatch Logs services.
* @param awsConfig AwsConfig specifying region, roles, and header overrides.
* @param awsCredentialsSupplier AwsCredentialsSupplier Interface for which to create CredentialsProvider for Client config.
* @return CloudWatchLogsClient used to interact with CloudWatch Logs services.
*/
public static CloudWatchLogsClient createCwlClient(final AwsConfig awsConfig, final AwsCredentialsSupplier awsCredentialsSupplier) {
final AwsCredentialsOptions awsCredentialsOptions = convertToCredentialOptions(awsConfig);
Expand All @@ -38,10 +37,8 @@ public static CloudWatchLogsClient createCwlClient(final AwsConfig awsConfig, fi
}

private static ClientOverrideConfiguration createOverrideConfiguration() {
final RetryPolicy retryPolicy = RetryPolicy.builder().numRetries(AwsConfig.DEFAULT_CONNECTION_ATTEMPTS).build();

return ClientOverrideConfiguration.builder()
.retryPolicy(retryPolicy)
.retryPolicy(r -> r.numRetries(AwsConfig.DEFAULT_CONNECTION_ATTEMPTS))
.build();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.sink.client;

import lombok.Builder;
import org.opensearch.dataprepper.model.event.EventHandle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.exception.SdkClientException;
import software.amazon.awssdk.services.cloudwatchlogs.CloudWatchLogsClient;
import software.amazon.awssdk.services.cloudwatchlogs.model.CloudWatchLogsException;
import software.amazon.awssdk.services.cloudwatchlogs.model.InputLogEvent;
import software.amazon.awssdk.services.cloudwatchlogs.model.PutLogEventsRequest;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Executor;

@Builder
public class CloudWatchLogsDispatcher {
private static final long UPPER_RETRY_TIME_BOUND_MILLISECONDS = 2000;
private static final float EXP_TIME_SCALE = 1.25F;
private static final Logger LOG = LoggerFactory.getLogger(CloudWatchLogsDispatcher.class);
private CloudWatchLogsClient cloudWatchLogsClient;
private CloudWatchLogsMetrics cloudWatchLogsMetrics;
private Executor executor;
private String logGroup;
private String logStream;
private int retryCount;
private long backOffTimeBase;
public CloudWatchLogsDispatcher(final CloudWatchLogsClient cloudWatchLogsClient,
final CloudWatchLogsMetrics cloudWatchLogsMetrics,
final Executor executor,
final String logGroup, final String logStream,
final int retryCount, final long backOffTimeBase) {
this.cloudWatchLogsClient = cloudWatchLogsClient;
this.cloudWatchLogsMetrics = cloudWatchLogsMetrics;
this.logGroup = logGroup;
this.logStream = logStream;
this.retryCount = retryCount;
this.backOffTimeBase = backOffTimeBase;

this.executor = executor;
}

/**
* Will read in a collection of log messages in byte form and transform them into a collection of InputLogEvents.
* @param eventMessageBytes Collection of byte arrays holding event messages.
* @return List of InputLogEvents holding the wrapped event messages.
*/
public List<InputLogEvent> prepareInputLogEvents(final Collection<byte[]> eventMessageBytes) {
List<InputLogEvent> logEventList = new ArrayList<>();

/**
* In the current implementation, the timestamp is generated during transmission.
* To properly extract timestamp we need to order the InputLogEvents. Can be done by
* refactoring buffer class with timestamp param, or adding a sorting algorithm in between
* making the PLE object (in prepareInputLogEvents).
*/

for (byte[] data : eventMessageBytes) {
InputLogEvent tempLogEvent = InputLogEvent.builder()
.message(new String(data, StandardCharsets.UTF_8))
.timestamp(System.currentTimeMillis())
.build();
logEventList.add(tempLogEvent);
}

return logEventList;
}

public void dispatchLogs(List<InputLogEvent> inputLogEvents, Collection<EventHandle> eventHandles) {
PutLogEventsRequest putLogEventsRequest = PutLogEventsRequest.builder()
.logEvents(inputLogEvents)
.logGroupName(logGroup)
.logStreamName(logStream)
.build();

executor.execute(Uploader.builder()
.cloudWatchLogsClient(cloudWatchLogsClient)
.cloudWatchLogsMetrics(cloudWatchLogsMetrics)
.putLogEventsRequest(putLogEventsRequest)
.eventHandles(eventHandles)
.backOffTimeBase(backOffTimeBase)
.retryCount(retryCount)
.build());
}

@Builder
protected static class Uploader implements Runnable {
private final CloudWatchLogsClient cloudWatchLogsClient;
private final CloudWatchLogsMetrics cloudWatchLogsMetrics;
private final PutLogEventsRequest putLogEventsRequest;
private final Collection<EventHandle> eventHandles;
private final int retryCount;
private final long backOffTimeBase;

@Override
public void run() {
upload();
}

public void upload() {
boolean failedToTransmit = true;
int failCount = 0;

try {
while (failedToTransmit && (failCount < retryCount)) {
try {
cloudWatchLogsClient.putLogEvents(putLogEventsRequest);

cloudWatchLogsMetrics.increaseRequestSuccessCounter(1);
failedToTransmit = false;

} catch (CloudWatchLogsException | SdkClientException e) {
LOG.error("Failed to push logs with error: {}", e.getMessage());
cloudWatchLogsMetrics.increaseRequestFailCounter(1);
Thread.sleep(calculateBackOffTime(backOffTimeBase, failCount));
failCount++;
}
}
} catch (InterruptedException e) {
LOG.warn("Uploader Thread got interrupted during retransmission with exception: {}", e.getMessage());
//TODO: Push to DLQ.
Thread.currentThread().interrupt();
}


if (failedToTransmit) {
cloudWatchLogsMetrics.increaseLogEventFailCounter(eventHandles.size());
releaseEventHandles(false, eventHandles);
} else {
cloudWatchLogsMetrics.increaseLogEventSuccessCounter(eventHandles.size());
releaseEventHandles(true, eventHandles);
}
}

private long calculateBackOffTime(final long backOffTimeBase, final int failCounter) {
long scale = (long)Math.pow(EXP_TIME_SCALE, failCounter);

if (scale >= UPPER_RETRY_TIME_BOUND_MILLISECONDS) {
return UPPER_RETRY_TIME_BOUND_MILLISECONDS;
}

return scale * backOffTimeBase;
}

private void releaseEventHandles(final boolean result, final Collection<EventHandle> eventHandles) {
if (eventHandles.isEmpty()) {
return;
}

for (EventHandle eventHandle : eventHandles) {
eventHandle.release(result);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.sink.client;

import io.micrometer.core.instrument.Counter;
import org.opensearch.dataprepper.metrics.PluginMetrics;

/**
* Class is meant to abstract the metric book-keeping of
* CloudWatchLogs metrics so that multiple instances
* may refer to it.
*/
public class CloudWatchLogsMetrics {
protected static final String CLOUDWATCH_LOGS_REQUESTS_SUCCEEDED = "cloudWatchLogsRequestsSucceeded";
protected static final String CLOUDWATCH_LOGS_EVENTS_SUCCEEDED = "cloudWatchLogsEventsSucceeded";
protected static final String CLOUDWATCH_LOGS_EVENTS_FAILED = "cloudWatchLogsEventsFailed";
protected static final String CLOUDWATCH_LOGS_REQUESTS_FAILED = "cloudWatchLogsRequestsFailed";
private final Counter logEventSuccessCounter;
private final Counter logEventFailCounter;
private final Counter requestSuccessCount;
private final Counter requestFailCount;

public CloudWatchLogsMetrics(final PluginMetrics pluginMetrics) {
this.logEventSuccessCounter = pluginMetrics.counter(CloudWatchLogsMetrics.CLOUDWATCH_LOGS_EVENTS_SUCCEEDED);
this.requestFailCount = pluginMetrics.counter(CloudWatchLogsMetrics.CLOUDWATCH_LOGS_REQUESTS_FAILED);
this.logEventFailCounter = pluginMetrics.counter(CloudWatchLogsMetrics.CLOUDWATCH_LOGS_EVENTS_FAILED);
this.requestSuccessCount = pluginMetrics.counter(CloudWatchLogsMetrics.CLOUDWATCH_LOGS_REQUESTS_SUCCEEDED);
}

public void increaseLogEventSuccessCounter(int value) {
logEventSuccessCounter.increment(value);
}

public void increaseRequestSuccessCounter(int value) {
requestSuccessCount.increment(value);
}

public void increaseLogEventFailCounter(int value) {
logEventFailCounter.increment(value);
}

public void increaseRequestFailCounter(int value) {
requestFailCount.increment(value);
}
}
Loading

0 comments on commit 93d06db

Please sign in to comment.