Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Http chunking fixes #4823

Merged
merged 10 commits into from
Aug 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,24 @@ default boolean isByteBuffer() {
return false;
}

/**
* Returns the maximum request size of an entry in the buffer.
* <p>
* This default implementation returns {@link Optional#empty()}, i.e. it does not
* specify a maximum.
*
* @return an {@link Optional} holding the buffer's max request size; always empty
*         in this default implementation
*/
default Optional<Integer> getMaxRequestSize() {
return Optional.empty();
}

/**
* Returns the optimal request size of an entry in the buffer.
* <p>
* This default implementation returns {@link Optional#empty()}, i.e. it does not
* specify an optimal size.
*
* @return an {@link Optional} holding the buffer's optimal request size; always empty
*         in this default implementation
*/
default Optional<Integer> getOptimalRequestSize() {
return Optional.empty();
}

/**
* Checks if the buffer enables acknowledgements for the pipeline
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,12 @@ void testMaxRequestSize() {
assertEquals(buffer.getMaxRequestSize(), Optional.empty());
}

/**
 * Verifies that the buffer produced by {@code createObjectUnderTest()} reports an
 * empty optimal request size.
 */
@Test
void testOptimalRequestSize() {
    final Buffer<Record<Event>> buffer = createObjectUnderTest();
    // assertEquals takes (expected, actual); keeping that order makes a failure
    // message read "expected: <Optional.empty> but was: <...>" instead of the reverse.
    assertEquals(Optional.empty(), buffer.getOptimalRequestSize());
}

@Test
void testShutdown() {
final Buffer<Record<Event>> buffer = createObjectUnderTest();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,12 @@ public Optional<Integer> getMaxRequestSize() {
return maxRequestSize.isPresent() ? Optional.of(maxRequestSize.getAsInt()) : Optional.empty();
}

/**
* Returns the smallest optimal request size reported by any buffer in {@code allBuffers},
* or an empty {@link Optional} when none of them reports one.
*
* @return the minimum of the present optimal request sizes, or {@link Optional#empty()}
*/
@Override
public Optional<Integer> getOptimalRequestSize() {
// Keep only buffers that report an optimal size, unwrap each value to an int, and take the minimum.
// min() yields an empty OptionalInt when no buffer passed the filter.
OptionalInt optimalRequestSize = allBuffers.stream().filter(b -> b.getOptimalRequestSize().isPresent()).mapToInt(b -> (Integer)b.getOptimalRequestSize().get()).min();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add some unit test cases. We should be sure to test both when this value is present and not present.

// Convert the primitive OptionalInt back into the Optional<Integer> required by the interface.
return optimalRequestSize.isPresent() ? Optional.of(optimalRequestSize.getAsInt()) : Optional.empty();
}

@Override
public void shutdown() {
allBuffers.forEach(Buffer::shutdown);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,11 +231,20 @@ void test_getMaxRequestSize() {
assertThat(multiBufferDecorator.getMaxRequestSize(), equalTo(Optional.empty()));
}

/**
 * When neither the primary nor the secondary buffer reports an optimal request size,
 * the decorator built from them must report an empty value as well.
 */
@Test
void test_getOptimalRequestSize() {
    when(secondaryBuffer.getOptimalRequestSize()).thenReturn(Optional.empty());
    when(primaryBuffer.getOptimalRequestSize()).thenReturn(Optional.empty());

    assertThat(createObjectUnderTest(2).getOptimalRequestSize(), equalTo(Optional.empty()));
}

/**
 * Builds the decorator under test from {@code primaryBuffer} and a list containing
 * {@code secondaryBufferCount} references to the shared {@code secondaryBuffer}.
 *
 * @param secondaryBufferCount how many times {@code secondaryBuffer} is placed in the list
 * @return a new {@code MultiBufferDecorator} wrapping those buffers
 */
private MultiBufferDecorator createObjectUnderTest(final int secondaryBufferCount) {
    final List<Buffer> secondaries = IntStream.rangeClosed(1, secondaryBufferCount)
            .mapToObj(ignored -> secondaryBuffer)
            .collect(Collectors.toList());
    return new MultiBufferDecorator(primaryBuffer, secondaries);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,17 +47,21 @@ public List<List<String>> parse(HttpData httpData, int maxSize) throws IOExcepti
List<List<String>> jsonList = new ArrayList<>();
final List<Map<String, Object>> logList = mapper.readValue(httpData.toInputStream(),
LIST_OF_MAP_TYPE_REFERENCE);
int size = OVERHEAD_CHARACTERS.length();
List<String> innerJsonList = new ArrayList<>();
for (final Map<String, Object> log: logList) {
int size = OVERHEAD_CHARACTERS.length();
for (Map<String, Object> log: logList) {
final String recordString = mapper.writeValueAsString(log);
if (size + recordString.length() > maxSize) {
final int nextRecordLength = recordString.getBytes(Charset.defaultCharset()).length;
// The first record may itself be larger than maxSize; in that case
// innerJsonList is still empty here, so there is nothing to flush yet.
if (size + nextRecordLength > maxSize && !innerJsonList.isEmpty()) {
jsonList.add(innerJsonList);
innerJsonList = new ArrayList<>();
size = OVERHEAD_CHARACTERS.length();
}
// The following may result in an innerJsonList containing a recordString longer than "maxSize"
innerJsonList.add(recordString);
size += recordString.getBytes(Charset.defaultCharset()).length + COMMA_OVERHEAD_LENGTH;
size += nextRecordLength + COMMA_OVERHEAD_LENGTH;
}
if (size > OVERHEAD_CHARACTERS.length()) {
jsonList.add(innerJsonList);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.params.provider.Arguments.arguments;
Expand Down Expand Up @@ -69,20 +70,28 @@ public void testParseSuccessWithMaxSize() throws IOException {
@ParameterizedTest
@ArgumentsSource(JsonArrayWithKnownFirstArgumentsProvider.class)
public void parse_should_return_lists_smaller_than_provided_length(
final String inputJsonArray, final String knownFirstPart) throws IOException {
final String inputJsonArray, final String knownFirstPart, final int maxSize, final List<List<String>> expectedChunks, final List<Boolean> exceedsMaxSize) throws IOException {
final int knownSingleBodySize = knownFirstPart.getBytes(Charset.defaultCharset()).length;
final int maxSize = (knownSingleBodySize * 2) + 3;
final List<List<String>> chunkedBodies = objectUnderTest.parse(HttpData.ofUtf8(inputJsonArray),
maxSize);

assertThat(chunkedBodies, notNullValue());
assertThat(chunkedBodies.size(), greaterThanOrEqualTo(1));
final String firstReconstructed = chunkedBodies.get(0).stream().collect(Collectors.joining(",", "[", "]"));
assertThat(firstReconstructed.getBytes(Charset.defaultCharset()).length,
lessThanOrEqualTo(maxSize));

assertThat(chunkedBodies.get(0).size(), greaterThanOrEqualTo(1));
assertThat(chunkedBodies.get(0).get(0), equalTo(knownFirstPart));
assertThat(chunkedBodies.size(), equalTo(expectedChunks.size()));

for (int i = 0; i < expectedChunks.size(); i++) {
final String reconstructed = chunkedBodies.get(i).stream().collect(Collectors.joining(",", "[", "]"));
if (exceedsMaxSize.get(i)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might it be simpler to provide the expected size and then do a single assertThat(..., equalTo(expectedSize))?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't want to be so precise.

assertThat(reconstructed.getBytes(Charset.defaultCharset()).length,
greaterThanOrEqualTo(maxSize));
} else {
assertThat(reconstructed.getBytes(Charset.defaultCharset()).length,
lessThanOrEqualTo(maxSize));
}

for (int j = 0; j < expectedChunks.get(i).size(); j++) {
assertThat(chunkedBodies.get(i).get(j), equalTo(expectedChunks.get(i).get(j)));
}
}
}

@Test
Expand All @@ -103,14 +112,50 @@ public void testParseNonJsonFailure() {
static class JsonArrayWithKnownFirstArgumentsProvider implements ArgumentsProvider {
@Override
public Stream<? extends Arguments> provideArguments(ExtensionContext extensionContext) throws Exception {
// First test, all chunks smaller than maxSize, output has 3 lists, all smaller than maxSize
String chunk11 = "{\"ὊὊὊ1\":\"ὊὊὊ1\"}";
String chunk12 = "{\"ὊὊὊ2\":\"ὊὊὊO2\"}";
String chunk13 = "{\"a3\":\"b3\"}";
String chunk14 = "{\"ὊὊὊ4\":\"ὊὊὊ4\"}";
// Second test, all chunks smaller than maxSize, output has 2 lists, all smaller than maxSize
String chunk21 = "{\"aaa1\":\"aaa1\"}";
String chunk22 = "{\"aaa2\":\"aaa2\"}";
String chunk23 = "{\"a3\":\"b3\"}";
String chunk24 = "{\"bbb4\":\"bbb4\"}";
// Third test, all chunks larger than maxSize, output has 4 lists, all larger than maxSize
String chunk31 = "{\"ὊὊὊ1\":\"ὊὊὊ01\"}";
String chunk32 = "{\"ὊὊὊ2\":\"ὊὊὊO2\"}";
String chunk33 = "{\"ὊὊὊ3\":\"ὊὊὊO3\"}";
String chunk34 = "{\"ὊὊὊ4\":\"ὊὊὊO4\"}";
// Fourth test, only first chunk larger than maxSize, output has 3 lists, with first chunk larger than maxSize and others smaller
String chunk41 = "{\"aaaaaaaaaaa1\":\"aaaaaaaaaaa1\"}";
String chunk42 = "{\"aaa2\":\"aaa2\"}";
String chunk43 = "{\"a3\":\"b3\"}";
String chunk44 = "{\"bbb4\":\"bbb4\"}";
// Fifth test, only second chunk larger than maxSize, output has 3 lists, with second chunk larger than maxSize and others smaller
String chunk51 = "{\"aaa2\":\"aaa2\"}";
String chunk52 = "{\"aaaaaaaaaaa1\":\"aaaaaaaaaaa1\"}";
String chunk53 = "{\"a3\":\"b3\"}";
String chunk54 = "{\"bb4\":\"bb4\"}";
// Sixth test, only last chunk larger than maxSize, output has 3 lists, with last chunk larger than maxSize and others smaller
String chunk61 = "{\"aaa2\":\"aaa2\"}";
String chunk62 = "{\"a3\":\"b3\"}";
String chunk63 = "{\"bbb4\":\"bbb4\"}";
String chunk64 = "{\"aaaaaaaaaaa1\":\"aaaaaaaaaaa1\"}";
final int maxSize1 = chunk11.getBytes(Charset.defaultCharset()).length * 2 + 3;
final int maxSize2 = chunk21.getBytes(Charset.defaultCharset()).length * 2 + 3;
final int maxSize3 = chunk31.getBytes(Charset.defaultCharset()).length - 1;
final int maxSize4 = chunk42.getBytes(Charset.defaultCharset()).length * 2 + 3;
final int maxSize5 = chunk51.getBytes(Charset.defaultCharset()).length * 2 + 3;
final int maxSize6 = chunk61.getBytes(Charset.defaultCharset()).length * 2 + 3;
return Stream.of(
arguments(
"[{\"ὊὊὊ1\":\"ὊὊὊ1\"}, {\"ὊὊὊ2\":\"ὊὊὊ2\"}, {\"a3\":\"b3\"}, {\"ὊὊὊ4\":\"ὊὊὊ4\"}]",
"{\"ὊὊὊ1\":\"ὊὊὊ1\"}"),
arguments(
"[{\"aaa1\":\"aaa1\"}, {\"aaa2\":\"aaa2\"}, {\"a3\":\"b3\"}, {\"bbb4\":\"bbb4\"}]",
"{\"aaa1\":\"aaa1\"}")
arguments("["+chunk11+","+chunk12+","+chunk13+","+chunk14+"]", chunk11, maxSize1, List.of(List.of(chunk11), List.of(chunk12, chunk13), List.of(chunk14)), List.of(false, false, false)),
arguments("["+chunk21+","+chunk22+","+chunk23+","+chunk24+"]", chunk21, maxSize2, List.of(List.of(chunk21, chunk22), List.of(chunk23, chunk24)), List.of(false, false)),
arguments("["+chunk31+","+chunk32+","+chunk33+","+chunk34+"]", chunk31, maxSize3, List.of(List.of(chunk31), List.of(chunk32), List.of(chunk33), List.of(chunk34)), List.of(true, true, true, true)),
arguments("["+chunk41+","+chunk42+","+chunk43+","+chunk44+"]", chunk41, maxSize4, List.of(List.of(chunk41), List.of(chunk42, chunk43), List.of(chunk44)), List.of(true, false, false)),
arguments("["+chunk51+","+chunk52+","+chunk53+","+chunk54+"]", chunk51, maxSize5, List.of(List.of(chunk51), List.of(chunk52), List.of(chunk53,chunk54)), List.of(false, true, false)),
arguments("["+chunk61+","+chunk62+","+chunk63+","+chunk64+"]", chunk61, maxSize6, List.of(List.of(chunk61,chunk62), List.of(chunk63), List.of(chunk64)), List.of(false, false, true))
);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ public class LogHTTPService {
private static final int SERIALIZATION_OVERHEAD = 1024;
public static final String REQUESTS_RECEIVED = "requestsReceived";
public static final String SUCCESS_REQUESTS = "successRequests";
public static final String REQUESTS_OVER_OPTIMAL_SIZE = "requestsOverOptimalSize";
public static final String REQUESTS_OVER_MAXIMUM_SIZE = "requestsOverMaximumSize";
public static final String PAYLOAD_SIZE = "payloadSize";
public static final String REQUEST_PROCESS_DURATION = "requestProcessDuration";

Expand All @@ -50,9 +52,12 @@ public class LogHTTPService {
private final int bufferWriteTimeoutInMillis;
private final Counter requestsReceivedCounter;
private final Counter successRequestsCounter;
private final Counter requestsOverOptimalSizeCounter;
private final Counter requestsOverMaximumSizeCounter;
private final DistributionSummary payloadSizeSummary;
private final Timer requestProcessDuration;
private Integer maxRequestLength;
private Integer optimalRequestLength;

public LogHTTPService(final int bufferWriteTimeoutInMillis,
final Buffer<Record<Log>> buffer,
Expand All @@ -61,8 +66,11 @@ public LogHTTPService(final int bufferWriteTimeoutInMillis,
this.buffer = buffer;
this.bufferWriteTimeoutInMillis = bufferWriteTimeoutInMillis;
this.maxRequestLength = buffer.getMaxRequestSize().isPresent() ? buffer.getMaxRequestSize().get(): null;
this.optimalRequestLength = buffer.getOptimalRequestSize().isPresent() ? buffer.getOptimalRequestSize().get(): null;
requestsReceivedCounter = pluginMetrics.counter(REQUESTS_RECEIVED);
successRequestsCounter = pluginMetrics.counter(SUCCESS_REQUESTS);
requestsOverOptimalSizeCounter = pluginMetrics.counter(REQUESTS_OVER_OPTIMAL_SIZE);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rather than having a metric to count these scenarios (and there may be others as time goes on), a distribution summary of the size of each payload would tell us all we need.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do not think so. For the exception case we definitely want a separate counter. If you want a distribution summary for all the sizes < 4MB, I am OK with that.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BTW, there is already a DistributionSummary for the payload size. These are counters for special cases.

requestsOverMaximumSizeCounter = pluginMetrics.counter(REQUESTS_OVER_MAXIMUM_SIZE);
payloadSizeSummary = pluginMetrics.summary(PAYLOAD_SIZE);
requestProcessDuration = pluginMetrics.timer(REQUEST_PROCESS_DURATION);
}
Expand Down Expand Up @@ -91,24 +99,33 @@ private void sendJsonList(List<String> jsonList) throws Exception {
}
sb.append("]");
if (sb.toString().getBytes().length > maxRequestLength) {
requestsOverMaximumSizeCounter.increment();
throw new RuntimeException("Request length "+ sb.toString().getBytes().length + " exceeds maxRequestLength "+ maxRequestLength);
} else if (sb.toString().getBytes().length > optimalRequestLength) {
requestsOverOptimalSizeCounter.increment();
}
buffer.writeBytes(sb.toString().getBytes(), key, bufferWriteTimeoutInMillis);
}

private HttpResponse processRequest(final AggregatedHttpRequest aggregatedHttpRequest) throws Exception {
HttpResponse processRequest(final AggregatedHttpRequest aggregatedHttpRequest) throws Exception {
final HttpData content = aggregatedHttpRequest.content();
List<List<String>> jsonList;
boolean isJsonListSplit = false;

try {
jsonList = (maxRequestLength == null) ? jsonCodec.parse(content) : jsonCodec.parse(content, maxRequestLength - SERIALIZATION_OVERHEAD);
if (buffer.isByteBuffer() && maxRequestLength != null && optimalRequestLength != null) {
jsonList = jsonCodec.parse(content, optimalRequestLength - SERIALIZATION_OVERHEAD);
isJsonListSplit = true;
} else {
jsonList = jsonCodec.parse(content);
}
} catch (IOException e) {
LOG.error("Failed to parse the request of size {} due to: {}", content.length(), e.getMessage());
throw new IOException("Bad request data format. Needs to be json array.", e.getCause());
}
try {
if (buffer.isByteBuffer()) {
if (maxRequestLength != null && content.array().length > maxRequestLength) {
if (isJsonListSplit && content.array().length > optimalRequestLength) {
for (final List<String> innerJsonList: jsonList) {
sendJsonList(innerJsonList);
}
Expand Down
Loading
Loading