Add creation and aggregation of dynamic S3 groups based on events #4346
Conversation
Signed-off-by: Taylor Gray <tylgry@amazon.com>
final S3GroupManager s3GroupManager = new S3GroupManager(s3SinkConfig, s3GroupIdentifierFactory, bufferFactory, s3Client);
s3SinkService = new S3SinkService(s3SinkConfig, codec, s3OutputCodecContext, s3Client, keyGenerator, Duration.ofSeconds(5), pluginMetrics, s3GroupManager);
nit: what is the duration? Can you move that to a constant?
This was already here. Looks like the variable is called retrySleepTime. I can make it a constant.
    }
}

for (final Map.Entry<S3GroupIdentifier, S3Group> s3GroupEntry : s3GroupManager.getS3GroupEntries()) {
Should this be flushed when the global threshold is reached?
The global threshold will be handled in a follow-up PR, and will be configured separately as aggregate_threshold.
I believe that @graytaylor0 will submit a follow-on to support the aggregate_threshold to do just that.
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;

public class S3GroupIdentifierTest {
We should test hashCode() as well.
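A minimal sketch of the suggested hashCode() coverage. The fields and constructor of S3GroupIdentifier shown here are simplified assumptions for illustration, not the actual class from this PR; the point is the equals/hashCode contract the test should assert.

```java
import java.util.Objects;

// Simplified stand-in for the PR's S3GroupIdentifier (fields are assumed)
class S3GroupIdentifier {
    private final String groupIdentificationHash;
    private final String fullObjectKey;

    S3GroupIdentifier(final String groupIdentificationHash, final String fullObjectKey) {
        this.groupIdentificationHash = groupIdentificationHash;
        this.fullObjectKey = fullObjectKey;
    }

    @Override
    public boolean equals(final Object other) {
        if (this == other) return true;
        if (!(other instanceof S3GroupIdentifier)) return false;
        final S3GroupIdentifier that = (S3GroupIdentifier) other;
        return Objects.equals(groupIdentificationHash, that.groupIdentificationHash)
                && Objects.equals(fullObjectKey, that.fullObjectKey);
    }

    // Equal identifiers must produce equal hash codes; that is the
    // property the suggested hashCode() test should verify.
    @Override
    public int hashCode() {
        return Objects.hash(groupIdentificationHash, fullObjectKey);
    }
}
```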
@@ -45,6 +45,7 @@
import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
We should add an integration test that verifies the key path is dynamic.
Is there a way for me to create an actual GenericExpressionEvaluator for the integration tests? Or can I verify this while mocking ExpressionEvaluator?
final Event event = record.getData();
final Map.Entry<S3GroupIdentifier, S3Group> s3GroupEntry = s3GroupManager.getOrCreateGroupForEvent(event);
You can add S3GroupIdentifier to S3Group as a private field. With this, you can simplify this method to return just an S3Group rather than a Map.Entry. You would need to update removeGroup() to take in an S3Group. But this will work because it would have the S3GroupIdentifier.
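A sketch of the refactor suggested above, assuming heavily simplified shapes for both classes (the real ones carry buffers, codecs, and more state):

```java
// Simplified stand-in for the PR's S3GroupIdentifier
class S3GroupIdentifier {
    private final String groupIdentificationHash;

    S3GroupIdentifier(final String groupIdentificationHash) {
        this.groupIdentificationHash = groupIdentificationHash;
    }

    String getGroupIdentifierHash() {
        return groupIdentificationHash;
    }
}

class S3Group {
    // Holding the identifier here lets getOrCreateGroupForEvent() return a
    // plain S3Group, and removeGroup(S3Group) can read the identifier back
    // out instead of requiring a Map.Entry from the caller.
    private final S3GroupIdentifier s3GroupIdentifier;

    S3Group(final S3GroupIdentifier s3GroupIdentifier) {
        this.s3GroupIdentifier = s3GroupIdentifier;
    }

    S3GroupIdentifier getS3GroupIdentifier() {
        return s3GroupIdentifier;
    }
}
```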
import java.util.Objects;

public class S3GroupIdentifier {
If you take my comments above about having S3Group hold the S3GroupIdentifier, then you can make this class package protected. Let the other classes just deal with S3Group and not have to worry about these internals.
public S3GroupIdentifier getS3GroupIdentifierForEvent(final Event event) {

    final String groupIdentificationHash = keyGenerator.generateKeyForEvent(event, false);
This logic couples the generateKeyForEvent code greatly with the hashing. A change in that method would break this logic. Not only that, but it depends on ObjectKey, which thus broadens the places where a behavioral change could break this.

I recommend that you make this a Map<String, Object> instead of a String. The values might look like:

/my_key/path -> myStringValue1

You would need to add some code that extracts key paths from the expression. This might be something to add to ExpressionEvaluator, though I'm not sure if that is the best place.
I think this would have to be in ExpressionEvaluator since it would also need to run the expressions to create the values of the Map.
Or are you thinking that expression evaluator would support a function to extract all of the ${} sections out of a format expression, and then each of these individual expressions would need to be run separately to get the values? Or can we just have expression evaluator return the full Map result?
Or are you thinking that expression evaluator would support a function to extract all of the ${} sections out of a format expression, and then each of these individual expressions would need to be run separately to get the values?

This is what I was thinking. And they should come out in a normalized manner. That is, all have the leading /. This way you always get a consistent value.

Or can we just have expression evaluator return the full Map result?

This could be an option as well. But it seems a little more specific to this use case. Getting the list of keys may be more extensible.
This is what I was thinking. And they should come out in a normalized manner. That is, all have the leading /. This way you always get a consistent value.

So in this case, the S3 sink would have to explicitly check for keys and other expressions, and would have to check the Event itself for the values and run the expression evaluation functions individually as well.
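A sketch of the extraction being discussed in this thread: pull each ${...} dynamic expression out of a format string. The class and method names here are illustrative, not the Data Prepper API, and unterminated expressions are simply skipped, matching the parse-don't-validate behavior described below.

```java
import java.util.ArrayList;
import java.util.List;

public class FormatExpressionParser {
    // Extracts the inner text of each ${...} section from a format string,
    // e.g. "logs/${/my_key/path}/${getEpochNanos()}" yields
    // ["/my_key/path", "getEpochNanos()"].
    public static List<String> extractDynamicExpressions(final String format) {
        final List<String> dynamicExpressions = new ArrayList<>();
        int fromIndex = 0;
        int position;
        while ((position = format.indexOf("${", fromIndex)) != -1) {
            final int endPosition = format.indexOf("}", position + 2);
            if (endPosition == -1) {
                break; // unterminated ${ ; validation is left to the caller
            }
            dynamicExpressions.add(format.substring(position + 2, endPosition));
            fromIndex = endPosition + 1;
        }
        return dynamicExpressions;
    }
}
```

The caller could then distinguish leading-/ key paths (read from the Event) from function-style expressions (run through the evaluator), as described above.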
int position = 0;
while ((position = format.indexOf("${", fromIndex)) != -1) {
    int endPosition = format.indexOf("}", position + 1);
    if (endPosition == -1) {
This might be a user error in the dynamic expression. Looks like we are ignoring any errors?
This method is not responsible for catching invalid expressions; it will just parse out the valid ones. If you think it should throw, then it can here, but its job is not to validate, just to parse and return.
Will add some unit tests for validating the expression on startup of the S3 sink in the next PR.
We can add a test case to cover validation on startup for the S3 sink constructor, as discussed offline.
@ParameterizedTest
@ArgumentsSource(FormatExpressionsToExtractedDynamicExpressionsArgumentProvider.class)
void extractDynamicExpressionsFromFormatExpression_returns_expected_result(final String formatExpression, final List<String> expectedDynamicExpressions) {
Can we add a negative test where an invalid expression is passed?
There is a negative case where invalid expression syntax is found; it will just not be added to the list. This method is just supposed to parse out valid existing expressions.
Description

This change adds a concept of S3Groups to the S3 Sink. Groups will be created dynamically based on the Events that come into the sink, and will be sent to S3 as a single object when the thresholds are reached for that group. This PR only adds support for a dynamic path_prefix. The support for exposing object_name as a dynamic expression will be added in a future PR.

Future Planned PRs

- aggregate_threshold for flushing the largest groups when the threshold is reached
- getEpochNanos() and getRandomUUID() into the dynamic expressions

Issues Resolved
Related to #4345
Ran the S3 sink integration tests successfully.
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.