Http chunking fixes #4823

Merged: 10 commits, Aug 14, 2024

Conversation

kkondaka (Collaborator)

Description

HTTP chunking has a couple of issues:

  • The multi-byte fix added in PR #4656 also needs to use the same Unicode-aware length calculation in another place.
  • The code puts at least one message (or chunk) in each of the lists, but the chunk itself can be larger than 1MB. To address this, a new "optimal size" is added to the buffer. In the case of Kafka, the optimal size is set to 1MB and the max size to 4MB (a rough sketch of the intended splitting behavior follows below).
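
A rough sketch of the splitting behavior described above, assuming records arrive as pre-serialized JSON strings; the names (records, optimalSize, maxSize) are illustrative, not the actual implementation:

// Hypothetical illustration (assumes java.util.* and java.nio.charset.StandardCharsets):
// close each chunk once it reaches optimalSize (1MB for Kafka), while a single
// record is still accepted on its own even if it approaches maxSize (4MB).
List<List<String>> chunks = new ArrayList<>();
List<String> current = new ArrayList<>();
int currentBytes = 0;
for (final String record : records) {
    final int recordBytes = record.getBytes(StandardCharsets.UTF_8).length;
    if (!current.isEmpty() && currentBytes + recordBytes > optimalSize) {
        chunks.add(current);          // close the chunk at the optimal size
        current = new ArrayList<>();
        currentBytes = 0;
    }
    current.add(record);              // a lone record may still approach maxSize
    currentBytes += recordBytes;
}
if (!current.isEmpty()) {
    chunks.add(current);
}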

Issues Resolved

Resolves #[Issue number to be closed when this PR is merged]

Check List

  • New functionality includes testing.
  • New functionality has a documentation issue. Please link to it in this PR.
    • New functionality has javadoc added
  • [x] Commits are signed with a real name per the DCO

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

kkondaka and others added 4 commits August 11, 2024 01:56
Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>
Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>
Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>
Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>
Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>
@dlvenable (Member) left a comment:

Overall, I think this is the right solution. I have a few comments on some of the details.

@JsonProperty("max_request_size")
private int maxRequestSize = DEFAULT_MAX_REQUEST_SIZE;
private int maxRequestSize = 4*DEFAULT_MAX_REQUEST_SIZE;
Member:

I think we should keep the max_request_size the same. There are a few reasons: 1) This change is going beyond Kafka recommendations (of 1MB). 2) This does change the existing behavior because it affects the topics on Kafka. 3) This may affect users without them clearly understanding the implications.

@@ -96,6 +96,11 @@ public Optional<Integer> getMaxRequestSize() {
return Optional.of(producer.getMaxRequestSize());
}

@Override
public Optional<Integer> getOptimalRequestSize() {
return Optional.of(producer.getMaxRequestSize() / 4);
Member:

Suggested change
return Optional.of(producer.getMaxRequestSize() / 4);
return Optional.of(ONE_MEGABYTE);

Can we just make this value equal to 1MB? I think this is really what we are aiming for.

Collaborator Author:

I changed it to DEFAULT_MAX_REQUEST_SIZE; it no longer divides by 4.

@@ -101,7 +101,7 @@ private HttpResponse processRequest(final AggregatedHttpRequest aggregatedHttpRe
List<List<String>> jsonList;

try {
jsonList = (maxRequestLength == null) ? jsonCodec.parse(content) : jsonCodec.parse(content, maxRequestLength - SERIALIZATION_OVERHEAD);
jsonList = (maxRequestLength == null) ? jsonCodec.parse(content) : jsonCodec.parse(content, buffer.getOptimalRequestSize().get() - SERIALIZATION_OVERHEAD);
Member:

There are two small problems here that you can fix.

buffer.getOptimalRequestSize().get() 
  1. buffer.getOptimalRequestSize() may return null, leading to NPE on .get()
  2. buffer.getOptimalRequestSize() may return empty, leading to a NoSuchElementException on .get().

Member:

I don't think you need an NPE check actually. Just check that the optional is present.
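
A minimal sketch of that guard, assuming the existing maxRequestLength fallback stays in place; the local variable names are illustrative:

// Hypothetical: only use the optimal request size when the buffer reports one,
// otherwise fall back to maxRequestLength as before.
final Optional<Integer> optimalRequestSize = buffer.getOptimalRequestSize();
if (maxRequestLength == null) {
    jsonList = jsonCodec.parse(content);
} else {
    final int splitSize = optimalRequestSize.isPresent()
            ? optimalRequestSize.get() - SERIALIZATION_OVERHEAD
            : maxRequestLength - SERIALIZATION_OVERHEAD;
    jsonList = jsonCodec.parse(content, splitSize);
}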

if (size + nextRecordLength > maxSize) {
// It is possible that the first record is larger than maxSize, then
// innerJsonList size would be zero.
if (innerJsonList.size() > 0) {
Member:

Please add a unit test case for this condition.
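
A hypothetical sketch of such a test, assuming the codec's parse(HttpData, maxSize) signature from the diff above and that an oversized record is meant to be emitted as its own chunk; objectUnderTest and the literals are illustrative:

@Test
public void parse_emits_single_chunk_when_first_record_exceeds_maxSize() throws Exception {
    // Hypothetical: one JSON record whose serialized length is larger than maxSize.
    final String largeRecord = "{\"key\":\"" + "a".repeat(100) + "\"}";
    final HttpData content = HttpData.ofUtf8("[" + largeRecord + "]");
    final int maxSize = 50; // smaller than the single record

    final List<List<String>> chunks = objectUnderTest.parse(content, maxSize);

    // The oversized record should still come back as its own single-element chunk,
    // not as an empty inner list.
    assertThat(chunks.size(), equalTo(1));
    assertThat(chunks.get(0).size(), equalTo(1));
}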

@@ -50,6 +50,12 @@ public Optional<Integer> getMaxRequestSize() {
return maxRequestSize.isPresent() ? Optional.of(maxRequestSize.getAsInt()) : Optional.empty();
}

@Override
public Optional<Integer> getOptimalRequestSize() {
OptionalInt optimalRequestSize = allBuffers.stream().filter(b -> b.getOptimalRequestSize().isPresent()).mapToInt(b -> (Integer)b.getOptimalRequestSize().get()).min();
Member:

Please add some unit test cases. We should be sure to test both when this value is present and not present.
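
A hypothetical sketch of the two cases, assuming the class under test wraps a list of delegate buffers and that Mockito mocks (firstBuffer, secondBuffer) are wired into objectUnderTest; all names are illustrative:

@Test
public void getOptimalRequestSize_returns_smallest_value_when_present() {
    // Two delegates report different optimal sizes; the minimum should win.
    when(firstBuffer.getOptimalRequestSize()).thenReturn(Optional.of(4 * 1024 * 1024));
    when(secondBuffer.getOptimalRequestSize()).thenReturn(Optional.of(1024 * 1024));

    assertThat(objectUnderTest.getOptimalRequestSize(), equalTo(Optional.of(1024 * 1024)));
}

@Test
public void getOptimalRequestSize_returns_empty_when_no_delegate_reports_one() {
    when(firstBuffer.getOptimalRequestSize()).thenReturn(Optional.empty());
    when(secondBuffer.getOptimalRequestSize()).thenReturn(Optional.empty());

    assertThat(objectUnderTest.getOptimalRequestSize().isPresent(), equalTo(false));
}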

Krishna Kondaka added 4 commits August 12, 2024 18:38
Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>
Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>
Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>
Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>
final int knownSingleBodySize = knownFirstPart.getBytes(Charset.defaultCharset()).length;
final int maxSize = (knownSingleBodySize * 2) + 3;
//final int maxSize = (knownSingleBodySize * 2) + 3;
Member:

Please remove commented code.

requestsReceivedCounter = pluginMetrics.counter(REQUESTS_RECEIVED);
successRequestsCounter = pluginMetrics.counter(SUCCESS_REQUESTS);
requestsOverOptimalSizeCounter = pluginMetrics.counter(REQUESTS_OVER_OPTIMAL_SIZE);
Member:

Rather than having a metric to count these scenarios (and there may be others as time goes on), a distribution summary of the size of each payload would tell us all we need.
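
A minimal sketch of that suggestion, assuming PluginMetrics exposes a summary(...) factory analogous to counter(...); the metric name and field are illustrative:

// Hypothetical: record every payload size in a distribution summary instead of
// counting only the over-optimal-size case.
final DistributionSummary payloadSizeSummary = pluginMetrics.summary(PAYLOAD_SIZE);
// later, when handling a request:
payloadSizeSummary.record(content.length());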

Collaborator Author:

I do not think so. For the exception case we definitely want a separate counter. If you want a distribution summary for all the sizes under 4MB, I am OK with that.

Collaborator Author:

BTW, there is already a DistributionSummary for the payload. These are counters for special cases.


for (int i = 0; i < expectedChunks.size(); i++) {
final String reconstructed = chunkedBodies.get(i).stream().collect(Collectors.joining(",", "[", "]"));
if (exceedsMaxSize.get(i)) {
Member:

Might it be simpler to provide the expected size and then do a single assertThat(..., equalTo(expectedSize))?

Collaborator Author:

I didn't want to be so precise.

for (final Map<String, Object> log: logList) {
final String recordString = mapper.writeValueAsString(log);
int nextRecordLength = recordString.getBytes(Charset.defaultCharset()).length;
String recordString = mapper.writeValueAsString(logList.get(0));
Member:

I'm not sure I follow this change from your previous commit. Why would index 0 be a special case?

Collaborator Author:

I can go back to the previous commit. I thought this way I could avoid the check inside the loop. I am OK either way.

Member:

I found the previous commit to be clearer.

}
buffer.writeBytes(sb.toString().getBytes(), key, bufferWriteTimeoutInMillis);
}

private HttpResponse processRequest(final AggregatedHttpRequest aggregatedHttpRequest) throws Exception {
final HttpData content = aggregatedHttpRequest.content();
List<List<String>> jsonList;
boolean jsonListSplitSuccess = false;
Member:

Is "success" the right word here? I had some difficulty understanding this code at first. I think what you want to ask is "is the JSON list actually split?".

Perhaps: isJsonListSplit?

@@ -103,13 +113,49 @@ public void testParseNonJsonFailure() {
static class JsonArrayWithKnownFirstArgumentsProvider implements ArgumentsProvider {
@Override
public Stream<? extends Arguments> provideArguments(ExtensionContext extensionContext) throws Exception {
// First test, all chunks smaller than maxSize, but output has 3 lists
Member:

This test is getting quite hard to understand now.

I'm not sure I have a great suggestion. The first suggestion I have is that perhaps you can keep the existing tests, but add a new test case as well?

Collaborator Author:

There are only two existing tests, and that test is already kept as the first test here. The existing test doesn't do proper validation. I can add more comments if that helps.

Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>
@dlvenable (Member) left a comment:

Thanks for working on this!

@kkondaka kkondaka merged commit 1bfed0d into opensearch-project:main Aug 14, 2024
48 of 50 checks passed