-
Notifications
You must be signed in to change notification settings - Fork 202
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
GitHub-Issue#2778: Added CloudWatchLogsSink #3084
GitHub-Issue#2778: Added CloudWatchLogsSink #3084
Conversation
…pensearch-project#2910) Create Elasticsearch client, implement search and pit apis for ElasticsearchAccessor Signed-off-by: Taylor Gray <tylgry@amazon.com> Signed-off-by: Marcos Gonzalez Mayedo <alemayed@amazon.com>
Added Config Files for CloudWatchLogs Sink. Signed-off-by: Marcos Gonzalez Mayedo <alemayed@amazon.com>
… syntax) Signed-off-by: Marcos Gonzalez Mayedo <alemayed@amazon.com>
Added default params for back_off and log_send_interval alongside test cases for ThresholdConfig. Signed-off-by: Marcos Gonzalez Mayedo <alemayed@amazon.com>
Signed-off-by: Marcos Gonzalez Mayedo <alemayed@amazon.com>
…ith AwsConfig. Signed-off-by: Marcos Gonzalez Mayedo <alemayed@amazon.com>
…mer and params to AwsConfig Signed-off-by: Marcos Gonzalez Mayedo <alemayed@amazon.com>
…tive mapping to tests CwlSink Signed-off-by: Marcos Gonzalez Mayedo <alemayed@amazon.com>
…xRequestSize Signed-off-by: Marcos Gonzalez Mayedo <alemayed@amazon.com>
Signed-off-by: Marcos Gonzalez Mayedo <alemayed@amazon.com>
Signed-off-by: Marcos Gonzalez Mayedo <alemayed@amazon.com>
Signed-off-by: Marcos Gonzalez Mayedo <95880281+MaGonzalMayedo@users.noreply.github.com>
Signed-off-by: Marcos Gonzalez Mayedo <alemayed@amazon.com>
Signed-off-by: Marcos Gonzalez Mayedo <alemayed@amazon.com>
import java.util.concurrent.Executor; | ||
import java.util.concurrent.Executors; | ||
|
||
@DataPrepperPlugin(name = "cloudwatchlogs-sink", pluginType = Sink.class, pluginConfigurationType = CloudWatchLogsSinkConfig.class) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like the convention for sink naming is without -sink
. For example http
and s3
. Are you sure about this naming?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, please drop the -sink
. You can name it cloudwatchlogs
or cloudwatch_logs
if that matches the naming convention better. Data Prepper does use underscores in between words.
See for example:
Line 39 in 93d06db
@DataPrepperPlugin(name = "otel_traces", deprecatedName = "otel_trace_raw", pluginType = Processor.class, pluginConfigurationType = OtelTraceRawProcessorConfig.class) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you, I will have it change to "cloudwatch_logs".
|
||
@Override | ||
public void doInitialize() { | ||
isInitialized = true; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is the initialize method for? Should we be creating failable object here or in the class constructor?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe it has to with data-prepper's way of initializing sinks. In the Pipeline class, data-prepper checks to see if every sink is active through an initializer thread. Which just waits until each sink is "initialized" or times out trying.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@dlvenable Is there another reason fro this?
@Override | ||
public void doOutput(Collection<Record<Event>> records) { | ||
if (records.isEmpty()) { | ||
return; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we have a test for this case?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will add the tests for this method for the next commit.
a9dd0cb
to
93f6fe5
Compare
import java.util.concurrent.Executor; | ||
import java.util.concurrent.Executors; | ||
|
||
@DataPrepperPlugin(name = "cloudwatchlogs-sink", pluginType = Sink.class, pluginConfigurationType = CloudWatchLogsSinkConfig.class) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, please drop the -sink
. You can name it cloudwatchlogs
or cloudwatch_logs
if that matches the naming convention better. Data Prepper does use underscores in between words.
See for example:
Line 39 in 93d06db
@DataPrepperPlugin(name = "otel_traces", deprecatedName = "otel_trace_raw", pluginType = Processor.class, pluginConfigurationType = OtelTraceRawProcessorConfig.class) |
thresholdConfig.getMaxRequestSize(),thresholdConfig.getLogSendInterval()); | ||
|
||
CloudWatchLogsClient cloudWatchLogsClient = CloudWatchLogsClientFactory.createCwlClient(awsConfig, awsCredentialsSupplier); | ||
BufferFactory bufferFactory = new InMemoryBufferFactory(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there only one buffer available right now? Can you pull this from the config instead?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is only one sink right now. An "InMemoryBuffer". But I will make the change to extract it via string comparison so it can be expanded easily.
return; | ||
} | ||
|
||
cloudWatchLogsService.processLogEvents(records); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should have unit test for this as well. Just to verify that the records are passed into the service.
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.dataprepper.plugins.sink; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please put this project into a unique package. This way no class in here clobbers any other classes.
package org.opensearch.dataprepper.plugins.sink.cloudwatch_logs;
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Got it, I will try to have them moved for the revision.
…he sink. Also moved plugin within "cloudwatch_logs" package. Signed-off-by: Marcos Gonzalez Mayedo <alemayed@amazon.com>
897bc0d
to
645463e
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice, thank you @MaGonzalMayedo !
@MaGonzalMayedo , There are failing tests.
|
Signed-off-by: Marcos Gonzalez Mayedo <alemayed@amazon.com>
Signed-off-by: Marcos Gonzalez Mayedo <alemayed@amazon.com>
Signed-off-by: Marcos Gonzalez Mayedo <alemayed@amazon.com>
Description
This change implements the main Sink entry point for the CloudWatchLogs Sink. It extends the plugin by adding a single sink class alongside a test.
Issues Resolved
This PR will work towards resolving CWL-Sink for Data-Prepper [Issue https://github.com/https://github.com/https://github.com/https://github.com/https://github.com//issues/2778]
Link: #2778 (comment)
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.