Add aggregate_threshold with maximum_size to s3 sink #4385
Conversation
Signed-off-by: Taylor Gray <tylgry@amazon.com>
Force-pushed from 1526dbd to 7d052d9
@@ -50,6 +55,10 @@ public Collection<S3Group> getS3GroupEntries() {
        return allGroups.values();
    }

    public Collection<S3Group> getS3GroupsSortedBySize() {
        return allGroups.values().stream().sorted().collect(Collectors.toList());
I initially tried using a TreeSet for this, but the buffer size changes so often that groups need to be iterated over and re-added to the TreeSet whenever the function is called, which used more memory and is still O(n log n), just like sorting the list this way.
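For illustration, here is a minimal, self-contained sketch of the staleness problem (MutableGroup and TreeSetStaleOrderDemo are hypothetical stand-ins, not part of this PR):

    import java.util.Comparator;
    import java.util.TreeSet;

    // Hypothetical stand-in for S3Group: the sort key (buffer size) mutates over time.
    class MutableGroup {
        long size;
        MutableGroup(final long size) { this.size = size; }
    }

    public class TreeSetStaleOrderDemo {
        public static void main(String[] args) {
            final MutableGroup a = new MutableGroup(10);
            final MutableGroup b = new MutableGroup(20);

            // A TreeSet orders elements only at insertion time.
            final TreeSet<MutableGroup> set =
                    new TreeSet<>(Comparator.comparingLong((MutableGroup g) -> g.size));
            set.add(a);
            set.add(b);

            a.size = 30; // the sort key changes after insertion

            // The set still reports the stale order: 'a' is still first even though
            // it is now the largest. Refreshing requires removing and re-adding every
            // element, which is O(n log n), the same cost as sorting a list on demand.
            System.out.println(set.first() == a); // prints true (stale ordering)
        }
    }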
@@ -46,6 +47,9 @@ public class S3SinkConfig {
    @NotNull
    private ThresholdOptions thresholdOptions;

    @JsonProperty("aggregate_threshold")
    private AggregateThresholdOptions aggregateThresholdOptions;
Should this be made private AggregateThresholdOptions aggregateThresholdOptions = new AggregateThresholdOptions();, or should this remain null by default?
I like the idea of having a default. But I think it would have to be dynamic. That is, we'd probably need to assign it like this:

    max(DEFAULT_BYTE_CAPACITY, threshold.maximum_size)
As it is now, we may have a bug if you set this:

    aggregate_threshold:
      flush_capacity_ratio: 0.25
    threshold:
      maximum_size: 100mb

You would have an aggregate threshold of 50mb, but you are trying to get a 100mb threshold on each group.
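For illustration, that dynamic default might be computed like this (a sketch; ByteCount comes from data-prepper-api, and the getter names are assumptions, not necessarily this PR's API):

    // Sketch only: DEFAULT_BYTE_CAPACITY is the existing "50mb" constant;
    // getThresholdOptions().getMaximumSize() is an assumed accessor chain.
    final long perGroupMaxBytes = s3SinkConfig.getThresholdOptions().getMaximumSize().getBytes();
    final long defaultAggregateBytes =
            Math.max(ByteCount.parse(DEFAULT_BYTE_CAPACITY).getBytes(), perGroupMaxBytes);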
This makes sense. It is a little strange to add defaults based on other config parameters, though, as they aren't really connected in the code. Also, it may be better not to require the code that checks for aggregate thresholds to run every time by default, since in many cases (all existing cases) users only have a single group, which makes the aggregate threshold unnecessary. For these reasons, I will just leave it with no default.
Yeah, let's not have any default, not even for maximum_size. I think if the user is going to specify the aggregate_threshold at all, then they must also specify the maximum_size. This will require the user to think about it more and hopefully get it correct in relation to the other thresholds.
We could always change this later. But if you remove the default 50mb for now, we won't have this possible confusion.
    private static final String DEFAULT_BYTE_CAPACITY = "50mb";

    @JsonProperty("maximum_size")
    private String maximumSize = DEFAULT_BYTE_CAPACITY;
You can make this a ByteCount and data-prepper-core will parse it. For example (lines 96 to 97 in 41f2d64):

    @JsonProperty("max_request_length")
    private ByteCount maxRequestLength;
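Applied here, the field might look like this (a sketch; the import path is assumed to be the usual data-prepper-api location):

    import org.opensearch.dataprepper.model.types.ByteCount;

    @JsonProperty("maximum_size")
    private ByteCount maximumSize = ByteCount.parse(DEFAULT_BYTE_CAPACITY); // keeps the 50mb default discussed above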
    Collection<EventHandle> getGroupEventHandles() {
        return groupEventHandles;
    }

    @Override
    public int compareTo(final S3Group o) {
Please add some unit tests for this.
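One possible shape for such a test (a sketch; groupWithBufferSize is a hypothetical helper that builds an S3Group whose buffer reports the given size):

    import org.junit.jupiter.api.Test;

    import static org.hamcrest.MatcherAssert.assertThat;
    import static org.hamcrest.Matchers.equalTo;
    import static org.hamcrest.Matchers.greaterThan;
    import static org.hamcrest.Matchers.lessThan;

    class S3GroupCompareToTest {

        @Test
        void compareTo_sorts_larger_groups_first() {
            // groupWithBufferSize is a hypothetical test helper, not part of this PR.
            final S3Group smaller = groupWithBufferSize(10);
            final S3Group larger = groupWithBufferSize(100);

            // Reversed ordering: a larger group compares as less than a smaller one,
            // so a plain sort() puts the largest group first.
            assertThat(larger.compareTo(smaller), lessThan(0));
            assertThat(smaller.compareTo(larger), greaterThan(0));
            assertThat(smaller.compareTo(smaller), equalTo(0));
        }
    }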
    final List<Buffer> expectedOrder = List.of(thirdBuffer, secondBuffer, buffer);

    int index = 0;
    for (final S3Group s3Group : sortedGroups) {
Can you do this?

    assertThat(sortedGroups.size(), equalTo(3));
    assertThat(sortedGroups, contains(thirdBuffer, secondBuffer, buffer));
    assertThat(sortedGroupsAfterRemoval.size(), equalTo(2));
    assertThat(objectUnderTest.getNumberOfGroups(), equalTo(2));

    final List<Buffer> expectedOrderAfterRemoval = List.of(thirdBuffer, buffer);
Similarly:
assertThat(sortedGroupsAfterRemoval.size(), equalTo(2));
assertThat(sortedGroupsAfterRemoval, contains(thirdBuffer, buffer));
Signed-off-by: Taylor Gray <tylgry@amazon.com>
Nice work identifying the potential memory issue and addressing it!
        return groupEventHandles;
    }

    @Override
    public int compareTo(final S3Group o) {
        return Long.compare(o.getBuffer().getSize(), buffer.getSize());
I feel this is a little counter-intuitive that largerGroup.compareTo(smallerGroup) results in a negative value. But it's a minor thing.
Yeah it is a little odd for sure. But it means that sort() will give the reverse order, which is what we want.
@graytaylor0, @oeyh, this is a good point. It might end up causing bugs later if we expand this.
How about making compareTo return natural order and then using the following? That should work because it provides a reversed comparison.

    return allGroups.values().stream().sorted(Collections.reverseOrder()).collect(Collectors.toList());
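Putting that suggestion together, the two methods would become (a sketch based on the code shown in this diff):

    @Override
    public int compareTo(final S3Group o) {
        // Natural order: a group with a smaller buffer compares as less.
        return Long.compare(buffer.getSize(), o.getBuffer().getSize());
    }

    public Collection<S3Group> getS3GroupsSortedBySize() {
        // Reverse the natural order to sort largest-first.
        return allGroups.values().stream()
                .sorted(Collections.reverseOrder())
                .collect(Collectors.toList());
    }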
Signed-off-by: Taylor Gray <tylgry@amazon.com>
Force-pushed from ebf9725 to ec98dc0
Description
This change adds an aggregate_threshold configuration to the s3 sink to prevent out-of-memory errors caused by holding too many groups in memory, including the parameters shown in the sketch below.
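A sketch of the resulting configuration, with parameter names taken from this review thread (the comment annotations reflect the discussion above, not documented semantics):

    sink:
      - s3:
          aggregate_threshold:
            maximum_size: 100mb          # required whenever aggregate_threshold is specified
            flush_capacity_ratio: 0.25   # assumed: controls how far groups are flushed down once the threshold is hit
          threshold:
            maximum_size: 50mb           # existing per-group threshold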
Issues Resolved
Related to #4345
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.