Skip to content

Commit

Permalink
Append subsequent HTTPEvents of type BODY
Browse files Browse the repository at this point in the history
Summary: Changed HTTPTransaction logic so that when we process a body event and the tail of the deferred ingress queue is also a body event, we append the body event we're processing to the tail's buffer chain.

Reviewed By: afrind

Differential Revision: D67172479

fbshipit-source-id: fa34ec3935168b32655adeaca88ea83edbf154fb
  • Loading branch information
Joanna Jo authored and facebook-github-bot committed Jan 10, 2025
1 parent 081bf38 commit f7f7929
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 18 deletions.
19 changes: 12 additions & 7 deletions third-party/proxygen/src/proxygen/lib/http/session/HTTPEvent.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,8 @@ class HTTPEvent {
HTTPEvent(HTTPCodec::StreamID streamID,
Type event,
std::unique_ptr<folly::IOBuf> body)
: body_(std::move(body)),
streamID_(streamID),
length_(0),
event_(event),
upgrade_(false) {
: streamID_(streamID), length_(0), event_(event), upgrade_(false) {
body_.append(std::move(body));
}

HTTPEvent(HTTPCodec::StreamID streamID,
Expand Down Expand Up @@ -112,7 +109,15 @@ class HTTPEvent {
}

std::unique_ptr<folly::IOBuf> getBody() {
return std::move(body_);
return body_.move();
}

size_t getBodyLength() {
return body_.chainLength();
}

void appendChunk(std::unique_ptr<folly::IOBuf>&& chain) {
body_.append(std::move(chain));
}

std::unique_ptr<HTTPException> getError() {
Expand All @@ -139,7 +144,7 @@ class HTTPEvent {

private:
std::unique_ptr<HTTPMessage> headers_;
std::unique_ptr<folly::IOBuf> body_;
folly::IOBufQueue body_{folly::IOBufQueue::cacheChainLength()};
std::unique_ptr<HTTPHeaders> trailers_;
std::unique_ptr<HTTPException> error_;
HTTPCodec::StreamID streamID_;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -339,9 +339,22 @@ void HTTPTransaction::onIngressBody(unique_ptr<IOBuf> chain, uint16_t padding) {
}
if (mustQueueIngress()) {
checkCreateDeferredIngress();
deferredIngress_->emplace(id_, HTTPEvent::Type::BODY, std::move(chain));
VLOG(4) << "Queued ingress event of type " << HTTPEvent::Type::BODY
<< " size=" << len << " " << *this;

HTTPEvent* tailEvent =
deferredIngress_->empty() ? nullptr : &deferredIngress_->back();
bool shouldCoalesce =
tailEvent && tailEvent->getEvent() == HTTPEvent::Type::BODY &&
(tailEvent->getBodyLength() + len <= kMaxBufferPerTxn);

if (shouldCoalesce) {
VLOG(4) << "Coalesced ingress event of type " << HTTPEvent::Type::BODY
<< " size=" << len << " " << *this;
tailEvent->appendChunk(std::move(chain));
} else {
VLOG(4) << "Queued ingress event of type " << HTTPEvent::Type::BODY
<< " size=" << len << " " << *this;
deferredIngress_->emplace(id_, HTTPEvent::Type::BODY, std::move(chain));
}
} else {
INVARIANT(recvWindow_.free(len));
processIngressBody(std::move(chain), len);
Expand Down Expand Up @@ -1752,10 +1765,9 @@ void HTTPTransaction::resumeIngress() {
processIngressHeadersComplete(callback.getHeaders());
break;
case HTTPEvent::Type::BODY: {
unique_ptr<IOBuf> data = callback.getBody();
auto len = data->computeChainDataLength();
auto len = callback.getBodyLength();
INVARIANT(recvWindow_.free(len));
processIngressBody(std::move(data), len);
processIngressBody(callback.getBody(), len);
} break;
case HTTPEvent::Type::CHUNK_HEADER:
processIngressChunkHeader(callback.getChunkLength());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3431,7 +3431,7 @@ TEST_F(HTTP2DownstreamSessionTest, PaddingFlowControl) {
handler->txn_->pauseIngress();
eventBase_.runAfterDelay([&] { handler->txn_->resumeIngress(); }, 100);
});
EXPECT_CALL(*handler, _onBodyWithOffset(_, _)).Times(129);
EXPECT_CALL(*handler, _onBodyWithOffset(_, _)).Times(1);
handler->expectError();
handler->expectDetachTransaction();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -559,8 +559,7 @@ class MockUpstreamController : public HTTPUpstreamSessionController {
};

ACTION_P(ExpectString, expected) {
std::string bodystr((const char*)arg1->data(), arg1->length());
EXPECT_EQ(bodystr, expected);
EXPECT_EQ(arg1->toString(), expected);
}

ACTION_P(ExpectBodyLen, expectedLen) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -770,8 +770,7 @@ TEST_F(MockCodecDownstreamTest, Buffering) {
codecCallback_->onMessageComplete(HTTPCodec::StreamID(1), false);

EXPECT_CALL(handler, _onBodyWithOffset(_, _))
.WillOnce(ExpectString(chunkStr))
.WillOnce(ExpectString(chunkStr));
.WillOnce(ExpectString(chunkStr + chunkStr));

EXPECT_CALL(handler, _onEOM());

Expand Down

0 comments on commit f7f7929

Please sign in to comment.