Skip to content

Commit

Permalink
Http chunking fixes (#4823)
Browse files Browse the repository at this point in the history
* dplive1.yaml

Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>

* Delete .github/workflows/static.yml

Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>

* Fix http message chunking bug

Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>

* Modified tests to test for chunks correctly

Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>

* Added comments

Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>

* Addressed offline review comments

Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>

* Addressed review comments

Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>

* Added tests

Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>

* Added tests

Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>

* Addressed review comments

Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>

---------

Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>
Co-authored-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>
  • Loading branch information
kkondaka and Krishna Kondaka authored Aug 14, 2024
1 parent 00cc2a5 commit 1bfed0d
Show file tree
Hide file tree
Showing 9 changed files with 210 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,24 @@ default boolean isByteBuffer() {
return false;
}

/**
 * Returns the maximum request size of an entry in the buffer.
 * <p>
 * This default implementation always returns an empty Optional.
 *
 * @return Optional value of the buffer's max request size; empty in this default implementation
 */
default Optional<Integer> getMaxRequestSize() {
return Optional.empty();
}

/**
 * Returns the optimal request size of an entry in the buffer.
 * <p>
 * This default implementation always returns an empty Optional.
 *
 * @return Optional value of the buffer's optimal request size; empty in this default implementation
 */
default Optional<Integer> getOptimalRequestSize() {
return Optional.empty();
}

/**
* Checks if the buffer enables acknowledgements for the pipeline
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,12 @@ void testMaxRequestSize() {
assertEquals(buffer.getMaxRequestSize(), Optional.empty());
}

/**
 * Verifies that the buffer under test reports no optimal request size.
 */
@Test
void testOptimalRequestSize() {
    final Buffer<Record<Event>> buffer = createObjectUnderTest();
    // JUnit's assertEquals takes (expected, actual); keeping that order makes a
    // failure message label the two values correctly.
    assertEquals(Optional.empty(), buffer.getOptimalRequestSize());
}

@Test
void testShutdown() {
final Buffer<Record<Event>> buffer = createObjectUnderTest();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,12 @@ public Optional<Integer> getMaxRequestSize() {
return maxRequestSize.isPresent() ? Optional.of(maxRequestSize.getAsInt()) : Optional.empty();
}

/**
 * Returns the smallest optimal request size reported by any of the wrapped buffers.
 *
 * @return the minimum optimal request size among the buffers that report one, or an
 *         empty Optional when none of them does
 */
@Override
public Optional<Integer> getOptimalRequestSize() {
    // Ask each buffer exactly once so the presence check and the value read operate on
    // the same Optional instance instead of on two separate getter invocations.
    final OptionalInt optimalRequestSize = allBuffers.stream()
            .map(b -> b.getOptimalRequestSize())
            .filter(Optional::isPresent)
            .mapToInt(size -> (Integer) size.get())
            .min();
    return optimalRequestSize.isPresent() ? Optional.of(optimalRequestSize.getAsInt()) : Optional.empty();
}

@Override
public void shutdown() {
allBuffers.forEach(Buffer::shutdown);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,11 +231,20 @@ void test_getMaxRequestSize() {
assertThat(multiBufferDecorator.getMaxRequestSize(), equalTo(Optional.empty()));
}

/**
 * When neither the primary nor the secondary buffer reports an optimal request size,
 * the decorator reports none either.
 */
@Test
void test_getOptimalRequestSize() {
    // Stub both wrapped buffers to report no optimal request size.
    when(primaryBuffer.getOptimalRequestSize())
            .thenReturn(Optional.empty());
    when(secondaryBuffer.getOptimalRequestSize())
            .thenReturn(Optional.empty());

    final MultiBufferDecorator objectUnderTest = createObjectUnderTest(2);

    assertThat(
            objectUnderTest.getOptimalRequestSize(),
            equalTo(Optional.empty()));
}

/**
 * Builds a MultiBufferDecorator around the primary buffer and the requested number of
 * secondary buffers; every secondary slot holds the same secondaryBuffer instance.
 *
 * @param secondaryBufferCount how many secondary buffer entries to pass to the decorator
 * @return a new decorator wrapping the primary buffer and the secondary buffer list
 */
private MultiBufferDecorator createObjectUnderTest(final int secondaryBufferCount) {
    final List<Buffer> repeatedSecondaryBuffers = IntStream.rangeClosed(1, secondaryBufferCount)
            .mapToObj(ignored -> secondaryBuffer)
            .collect(Collectors.toList());
    return new MultiBufferDecorator(primaryBuffer, repeatedSecondaryBuffers);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,17 +47,21 @@ public List<List<String>> parse(HttpData httpData, int maxSize) throws IOExcepti
List<List<String>> jsonList = new ArrayList<>();
final List<Map<String, Object>> logList = mapper.readValue(httpData.toInputStream(),
LIST_OF_MAP_TYPE_REFERENCE);
int size = OVERHEAD_CHARACTERS.length();
List<String> innerJsonList = new ArrayList<>();
for (final Map<String, Object> log: logList) {
int size = OVERHEAD_CHARACTERS.length();
for (Map<String, Object> log: logList) {
final String recordString = mapper.writeValueAsString(log);
if (size + recordString.length() > maxSize) {
final int nextRecordLength = recordString.getBytes(Charset.defaultCharset()).length;
// The first record may itself be larger than maxSize; in that case innerJsonList
// is still empty here, so there is nothing to flush yet.
if (size + nextRecordLength > maxSize && !innerJsonList.isEmpty()) {
jsonList.add(innerJsonList);
innerJsonList = new ArrayList<>();
size = OVERHEAD_CHARACTERS.length();
}
// The following may result in an innerJsonList that holds a recordString longer than "maxSize"
innerJsonList.add(recordString);
size += recordString.getBytes(Charset.defaultCharset()).length + COMMA_OVERHEAD_LENGTH;
size += nextRecordLength + COMMA_OVERHEAD_LENGTH;
}
if (size > OVERHEAD_CHARACTERS.length()) {
jsonList.add(innerJsonList);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.params.provider.Arguments.arguments;
Expand Down Expand Up @@ -69,20 +70,28 @@ public void testParseSuccessWithMaxSize() throws IOException {
@ParameterizedTest
@ArgumentsSource(JsonArrayWithKnownFirstArgumentsProvider.class)
public void parse_should_return_lists_smaller_than_provided_length(
final String inputJsonArray, final String knownFirstPart) throws IOException {
final String inputJsonArray, final String knownFirstPart, final int maxSize, final List<List<String>> expectedChunks, final List<Boolean> exceedsMaxSize) throws IOException {
final int knownSingleBodySize = knownFirstPart.getBytes(Charset.defaultCharset()).length;
final int maxSize = (knownSingleBodySize * 2) + 3;
final List<List<String>> chunkedBodies = objectUnderTest.parse(HttpData.ofUtf8(inputJsonArray),
maxSize);

assertThat(chunkedBodies, notNullValue());
assertThat(chunkedBodies.size(), greaterThanOrEqualTo(1));
final String firstReconstructed = chunkedBodies.get(0).stream().collect(Collectors.joining(",", "[", "]"));
assertThat(firstReconstructed.getBytes(Charset.defaultCharset()).length,
lessThanOrEqualTo(maxSize));

assertThat(chunkedBodies.get(0).size(), greaterThanOrEqualTo(1));
assertThat(chunkedBodies.get(0).get(0), equalTo(knownFirstPart));
assertThat(chunkedBodies.size(), equalTo(expectedChunks.size()));

for (int i = 0; i < expectedChunks.size(); i++) {
final String reconstructed = chunkedBodies.get(i).stream().collect(Collectors.joining(",", "[", "]"));
if (exceedsMaxSize.get(i)) {
assertThat(reconstructed.getBytes(Charset.defaultCharset()).length,
greaterThanOrEqualTo(maxSize));
} else {
assertThat(reconstructed.getBytes(Charset.defaultCharset()).length,
lessThanOrEqualTo(maxSize));
}

for (int j = 0; j < expectedChunks.get(i).size(); j++) {
assertThat(chunkedBodies.get(i).get(j), equalTo(expectedChunks.get(i).get(j)));
}
}
}

@Test
Expand All @@ -103,14 +112,50 @@ public void testParseNonJsonFailure() {
static class JsonArrayWithKnownFirstArgumentsProvider implements ArgumentsProvider {
@Override
public Stream<? extends Arguments> provideArguments(ExtensionContext extensionContext) throws Exception {
// First test, all chunks smaller than maxSize, output has 3 lists, all smaller than maxSize
String chunk11 = "{\"ὊὊὊ1\":\"ὊὊὊ1\"}";
String chunk12 = "{\"ὊὊὊ2\":\"ὊὊὊO2\"}";
String chunk13 = "{\"a3\":\"b3\"}";
String chunk14 = "{\"ὊὊὊ4\":\"ὊὊὊ4\"}";
// Second test, all chunks smaller than maxSize, output has 2 lists, all smaller than maxSize
String chunk21 = "{\"aaa1\":\"aaa1\"}";
String chunk22 = "{\"aaa2\":\"aaa2\"}";
String chunk23 = "{\"a3\":\"b3\"}";
String chunk24 = "{\"bbb4\":\"bbb4\"}";
// Third test, all chunks larger than maxSize, output has 4 lists, all larger than maxSize
String chunk31 = "{\"ὊὊὊ1\":\"ὊὊὊ01\"}";
String chunk32 = "{\"ὊὊὊ2\":\"ὊὊὊO2\"}";
String chunk33 = "{\"ὊὊὊ3\":\"ὊὊὊO3\"}";
String chunk34 = "{\"ὊὊὊ4\":\"ὊὊὊO4\"}";
// Fourth test, only first chunk larger than maxSize, output has 3 lists, with first chunk larger than maxSize and others smaller
String chunk41 = "{\"aaaaaaaaaaa1\":\"aaaaaaaaaaa1\"}";
String chunk42 = "{\"aaa2\":\"aaa2\"}";
String chunk43 = "{\"a3\":\"b3\"}";
String chunk44 = "{\"bbb4\":\"bbb4\"}";
// Fifth test, only second chunk larger than maxSize, output has 3 lists, with second chunk larger than maxSize and others smaller
String chunk51 = "{\"aaa2\":\"aaa2\"}";
String chunk52 = "{\"aaaaaaaaaaa1\":\"aaaaaaaaaaa1\"}";
String chunk53 = "{\"a3\":\"b3\"}";
String chunk54 = "{\"bb4\":\"bb4\"}";
// Sixth test, only last chunk larger than maxSize, output has 3 lists, with last chunk larger than maxSize and others smaller
String chunk61 = "{\"aaa2\":\"aaa2\"}";
String chunk62 = "{\"a3\":\"b3\"}";
String chunk63 = "{\"bbb4\":\"bbb4\"}";
String chunk64 = "{\"aaaaaaaaaaa1\":\"aaaaaaaaaaa1\"}";
final int maxSize1 = chunk11.getBytes(Charset.defaultCharset()).length * 2 + 3;
final int maxSize2 = chunk21.getBytes(Charset.defaultCharset()).length * 2 + 3;
final int maxSize3 = chunk31.getBytes(Charset.defaultCharset()).length - 1;
final int maxSize4 = chunk42.getBytes(Charset.defaultCharset()).length * 2 + 3;
final int maxSize5 = chunk51.getBytes(Charset.defaultCharset()).length * 2 + 3;
final int maxSize6 = chunk61.getBytes(Charset.defaultCharset()).length * 2 + 3;
return Stream.of(
arguments(
"[{\"ὊὊὊ1\":\"ὊὊὊ1\"}, {\"ὊὊὊ2\":\"ὊὊὊ2\"}, {\"a3\":\"b3\"}, {\"ὊὊὊ4\":\"ὊὊὊ4\"}]",
"{\"ὊὊὊ1\":\"ὊὊὊ1\"}"),
arguments(
"[{\"aaa1\":\"aaa1\"}, {\"aaa2\":\"aaa2\"}, {\"a3\":\"b3\"}, {\"bbb4\":\"bbb4\"}]",
"{\"aaa1\":\"aaa1\"}")
arguments("["+chunk11+","+chunk12+","+chunk13+","+chunk14+"]", chunk11, maxSize1, List.of(List.of(chunk11), List.of(chunk12, chunk13), List.of(chunk14)), List.of(false, false, false)),
arguments("["+chunk21+","+chunk22+","+chunk23+","+chunk24+"]", chunk21, maxSize2, List.of(List.of(chunk21, chunk22), List.of(chunk23, chunk24)), List.of(false, false)),
arguments("["+chunk31+","+chunk32+","+chunk33+","+chunk34+"]", chunk31, maxSize3, List.of(List.of(chunk31), List.of(chunk32), List.of(chunk33), List.of(chunk34)), List.of(true, true, true, true)),
arguments("["+chunk41+","+chunk42+","+chunk43+","+chunk44+"]", chunk41, maxSize4, List.of(List.of(chunk41), List.of(chunk42, chunk43), List.of(chunk44)), List.of(true, false, false)),
arguments("["+chunk51+","+chunk52+","+chunk53+","+chunk54+"]", chunk51, maxSize5, List.of(List.of(chunk51), List.of(chunk52), List.of(chunk53,chunk54)), List.of(false, true, false)),
arguments("["+chunk61+","+chunk62+","+chunk63+","+chunk64+"]", chunk61, maxSize6, List.of(List.of(chunk61,chunk62), List.of(chunk63), List.of(chunk64)), List.of(false, false, true))
);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ public class LogHTTPService {
private static final int SERIALIZATION_OVERHEAD = 1024;
public static final String REQUESTS_RECEIVED = "requestsReceived";
public static final String SUCCESS_REQUESTS = "successRequests";
public static final String REQUESTS_OVER_OPTIMAL_SIZE = "requestsOverOptimalSize";
public static final String REQUESTS_OVER_MAXIMUM_SIZE = "requestsOverMaximumSize";
public static final String PAYLOAD_SIZE = "payloadSize";
public static final String REQUEST_PROCESS_DURATION = "requestProcessDuration";

Expand All @@ -50,9 +52,12 @@ public class LogHTTPService {
private final int bufferWriteTimeoutInMillis;
private final Counter requestsReceivedCounter;
private final Counter successRequestsCounter;
private final Counter requestsOverOptimalSizeCounter;
private final Counter requestsOverMaximumSizeCounter;
private final DistributionSummary payloadSizeSummary;
private final Timer requestProcessDuration;
private Integer maxRequestLength;
private Integer optimalRequestLength;

public LogHTTPService(final int bufferWriteTimeoutInMillis,
final Buffer<Record<Log>> buffer,
Expand All @@ -61,8 +66,11 @@ public LogHTTPService(final int bufferWriteTimeoutInMillis,
this.buffer = buffer;
this.bufferWriteTimeoutInMillis = bufferWriteTimeoutInMillis;
this.maxRequestLength = buffer.getMaxRequestSize().isPresent() ? buffer.getMaxRequestSize().get(): null;
this.optimalRequestLength = buffer.getOptimalRequestSize().isPresent() ? buffer.getOptimalRequestSize().get(): null;
requestsReceivedCounter = pluginMetrics.counter(REQUESTS_RECEIVED);
successRequestsCounter = pluginMetrics.counter(SUCCESS_REQUESTS);
requestsOverOptimalSizeCounter = pluginMetrics.counter(REQUESTS_OVER_OPTIMAL_SIZE);
requestsOverMaximumSizeCounter = pluginMetrics.counter(REQUESTS_OVER_MAXIMUM_SIZE);
payloadSizeSummary = pluginMetrics.summary(PAYLOAD_SIZE);
requestProcessDuration = pluginMetrics.timer(REQUEST_PROCESS_DURATION);
}
Expand Down Expand Up @@ -91,24 +99,33 @@ private void sendJsonList(List<String> jsonList) throws Exception {
}
sb.append("]");
if (sb.toString().getBytes().length > maxRequestLength) {
requestsOverMaximumSizeCounter.increment();
throw new RuntimeException("Request length "+ sb.toString().getBytes().length + " exceeds maxRequestLength "+ maxRequestLength);
} else if (sb.toString().getBytes().length > optimalRequestLength) {
requestsOverOptimalSizeCounter.increment();
}
buffer.writeBytes(sb.toString().getBytes(), key, bufferWriteTimeoutInMillis);
}

private HttpResponse processRequest(final AggregatedHttpRequest aggregatedHttpRequest) throws Exception {
HttpResponse processRequest(final AggregatedHttpRequest aggregatedHttpRequest) throws Exception {
final HttpData content = aggregatedHttpRequest.content();
List<List<String>> jsonList;
boolean isJsonListSplit = false;

try {
jsonList = (maxRequestLength == null) ? jsonCodec.parse(content) : jsonCodec.parse(content, maxRequestLength - SERIALIZATION_OVERHEAD);
if (buffer.isByteBuffer() && maxRequestLength != null && optimalRequestLength != null) {
jsonList = jsonCodec.parse(content, optimalRequestLength - SERIALIZATION_OVERHEAD);
isJsonListSplit = true;
} else {
jsonList = jsonCodec.parse(content);
}
} catch (IOException e) {
LOG.error("Failed to parse the request of size {} due to: {}", content.length(), e.getMessage());
throw new IOException("Bad request data format. Needs to be json array.", e.getCause());
}
try {
if (buffer.isByteBuffer()) {
if (maxRequestLength != null && content.array().length > maxRequestLength) {
if (isJsonListSplit && content.array().length > optimalRequestLength) {
for (final List<String> innerJsonList: jsonList) {
sendJsonList(innerJsonList);
}
Expand Down
Loading

0 comments on commit 1bfed0d

Please sign in to comment.