Skip to content

Commit

Permalink
WebTransport error handling improvements
Browse files Browse the repository at this point in the history
Summary:
There's three main updates here:

1) Only attempt to decode application error codes when the underlying transport requires it (eg: HTTP)
2) Specify a WebTransport::Exception ctor that takes a string, so we can differentiate the source of the error (eg: RESET_STREAM, STOP_SENDING, other)
3) Set an interrupt handler for the read/write promises so they play with cancellation

Reviewed By: mjoras

Differential Revision: D67119088

fbshipit-source-id: b6b9ddbb24155ab560015e2945509361e3a05d3a
  • Loading branch information
afrind authored and facebook-github-bot committed Dec 17, 2024
1 parent 31aa878 commit b6168bc
Show file tree
Hide file tree
Showing 6 changed files with 160 additions and 31 deletions.
4 changes: 4 additions & 0 deletions proxygen/lib/http/session/HTTPTransaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -652,6 +652,10 @@ class HTTPTransaction
folly::assume_unreachable();
}

bool usesEncodedApplicationErrorCodes() override {
return true;
}

[[nodiscard]] virtual bool supportsWebTransport() const {
return false;
}
Expand Down
115 changes: 97 additions & 18 deletions proxygen/lib/http/session/test/HTTPTransactionWebTransportTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,15 +73,24 @@ class HTTPTransactionWebTransportTest : public testing::Test {
HTTP2PriorityQueue txnEgressQueue_;
std::unique_ptr<HTTPTransaction> txn_;

static void readCallback(folly::Try<WebTransport::StreamData> streamData,
bool expectException,
size_t expectedLength,
bool expectFin) {
static void readCallback(
folly::Try<WebTransport::StreamData> streamData,
bool expectException,
size_t expectedLength,
bool expectFin,
folly::Optional<uint32_t> expectedErrorCode = folly::none) {
VLOG(4) << __func__ << " expectException=" << uint64_t(expectException)
<< " expectedLength=" << expectedLength
<< " expectFin=" << expectFin;
EXPECT_EQ(streamData.hasException(), expectException);
if (expectException || streamData.hasException()) {
if (expectedErrorCode) {
auto wtEx = streamData.tryGetExceptionObject<WebTransport::Exception>();
EXPECT_NE(wtEx, nullptr);
if (wtEx) {
EXPECT_EQ(wtEx->error, *expectedErrorCode);
}
}
return;
}
if (streamData->data) {
Expand Down Expand Up @@ -202,14 +211,18 @@ TEST_F(HTTPTransactionWebTransportTest, ReadStreamBufferedError) {
auto implHandle = txn_->onWebTransportUniStream(0);
EXPECT_NE(readHandle, nullptr);

implHandle->deliverReadError(WT_APP_ERROR_2);

// read with buffered error
auto fut = readHandle->readStreamData()
.via(&eventBase_)
.thenTry([](auto streamData) {
readCallback(std::move(streamData), true, 0, false);
});
implHandle->readError(implHandle->getID(),
quic::QuicError(quic::ApplicationErrorCode(
WebTransport::toHTTPErrorCode(WT_APP_ERROR_2))));

// read with buffered error - simulate coming directly from QUIC with
// encoded error code
auto fut =
readHandle->readStreamData()
.via(&eventBase_)
.thenTry([](auto streamData) {
readCallback(std::move(streamData), true, 0, false, WT_APP_ERROR_2);
});
eventBase_.loopOnce();
EXPECT_TRUE(fut.isReady());
}
Expand All @@ -223,18 +236,43 @@ TEST_F(HTTPTransactionWebTransportTest, ReadStreamError) {
EXPECT_NE(readHandle, nullptr);

// read with nothing queued
auto fut = readHandle->readStreamData()
.via(&eventBase_)
.thenTry([](auto streamData) {
readCallback(std::move(streamData), true, 0, false);
});
auto fut =
readHandle->readStreamData()
.via(&eventBase_)
.thenTry([](auto streamData) {
readCallback(std::move(streamData), true, 0, false, WT_APP_ERROR_2);
});
EXPECT_FALSE(fut.isReady());

implHandle->deliverReadError(WT_APP_ERROR_2);
// Don't encode the error, it will be passed directly
implHandle->readError(
implHandle->getID(),
quic::QuicError(quic::ApplicationErrorCode(WT_APP_ERROR_2)));
eventBase_.loopOnce();
EXPECT_TRUE(fut.isReady());
}

TEST_F(HTTPTransactionWebTransportTest, ReadStreamCancel) {
WebTransport::StreamReadHandle* readHandle{nullptr};
EXPECT_CALL(handler_, onWebTransportUniStream(_, _))
.WillOnce(SaveArg<1>(&readHandle));

txn_->onWebTransportUniStream(0);
EXPECT_NE(readHandle, nullptr);

// Get the read future
auto fut = readHandle->readStreamData();

// Cancel the future, the transport will get a STOP_SENDING
EXPECT_CALL(transport_,
stopReadingWebTransportIngress(0, WebTransport::kInternalError))
.WillOnce(Return(folly::unit));
fut.cancel();
EXPECT_TRUE(fut.isReady());
EXPECT_NE(fut.result().tryGetExceptionObject<folly::FutureCancellation>(),
nullptr);
}

TEST_F(HTTPTransactionWebTransportTest, WriteFails) {
EXPECT_CALL(transport_, newWebTransportUniStream()).WillOnce(Return(1));
auto res = wt_->createUniStream();
Expand Down Expand Up @@ -300,6 +338,47 @@ TEST_F(HTTPTransactionWebTransportTest, WriteStreamPauseStopSending) {
EXPECT_TRUE(ready);
}

TEST_F(HTTPTransactionWebTransportTest, AwaitWritableCancel) {
EXPECT_CALL(transport_, newWebTransportUniStream()).WillOnce(Return(1));
auto writeHandle = wt_->createUniStream();
EXPECT_FALSE(writeHandle.hasError());

// Block write
quic::StreamWriteCallback* wcb{nullptr};
EXPECT_CALL(transport_, notifyPendingWriteOnStream(1, testing::_))
.WillOnce(DoAll(SaveArg<1>(&wcb), Return(folly::unit)));
// awaitWritable
auto fut = writeHandle.value()->awaitWritable().value();

// Cancel future
fut.cancel();
EXPECT_TRUE(fut.isReady());
EXPECT_TRUE(fut.hasException());
EXPECT_NE(fut.result().tryGetExceptionObject<folly::FutureCancellation>(),
nullptr);

// awaitWritable again
bool ready = false;
EXPECT_CALL(transport_, notifyPendingWriteOnStream(1, testing::_))
.WillOnce(DoAll(SaveArg<1>(&wcb), Return(folly::unit)));
writeHandle.value()
->awaitWritable()
.value()
.via(&eventBase_)
.thenTry([&ready, &writeHandle, this](auto writeReady) {
EXPECT_TRUE(writeReady.hasValue());
EXPECT_CALL(transport_, resetWebTransportEgress(1, WT_APP_ERROR_1));
writeHandle.value()->resetStream(WT_APP_ERROR_1);
ready = true;
});
EXPECT_FALSE(ready);

// Resume - only happens once because the reset, maybe?
wcb->onStreamWriteReady(0, 65536);
eventBase_.loopOnce();
EXPECT_TRUE(ready);
}

TEST_F(HTTPTransactionWebTransportTest, BidiStreamEdgeCases) {
WebTransport::BidiStreamHandle streamHandle;
EXPECT_CALL(handler_, onWebTransportBidiStream(_, _))
Expand Down
4 changes: 4 additions & 0 deletions proxygen/lib/http/webtransport/QuicWebTransport.h
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,10 @@ class QuicWebTransport
folly::Expected<folly::Unit, WebTransport::ErrorCode> sendDatagram(
std::unique_ptr<folly::IOBuf> /*datagram*/) override;

bool usesEncodedApplicationErrorCodes() override {
return false;
}

folly::Expected<folly::Unit, WebTransport::ErrorCode> closeSession(
folly::Optional<uint32_t> /*error*/) override;

Expand Down
5 changes: 5 additions & 0 deletions proxygen/lib/http/webtransport/WebTransport.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,11 @@ class WebTransport {
"Peer reset or abandoned stream with error=", inError)),
error(inError) {
}
Exception(uint32_t inError, const std::string& msg)
: std::runtime_error(
folly::to<std::string>(msg, " with error=", inError)),
error(inError) {
}
uint32_t error;
};

Expand Down
57 changes: 46 additions & 11 deletions proxygen/lib/http/webtransport/WebTransportImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ void WebTransportImpl::destroy() {
// Deliver an error to the application if needed
if (stream.open()) {
VLOG(4) << "aborting WT ingress id=" << id;
stream.deliverReadError(WebTransport::kInternalError);
stream.deliverReadError(WebTransport::Exception(
WebTransport::kInternalError, "shutting down"));
stopReadingWebTransportIngress(id, WebTransport::kInternalError);
// TODO: does the spec say how to handle this at the transport? Eg: the
// peer must RESET any open write streams.
Expand Down Expand Up @@ -156,19 +157,31 @@ WebTransportImpl::StreamWriteHandle::awaitWritable() {
CHECK(!writePromise_) << "awaitWritable already called";
auto contract = folly::makePromiseContract<folly::Unit>();
writePromise_.emplace(std::move(contract.promise));
writePromise_->setInterruptHandler(
[this](const folly::exception_wrapper& ex) {
VLOG(4) << "Exception from interrupt handler ex=" << ex.what();
// if awaitWritable is cancelled, just reset it
CHECK(ex.with_exception([this](const folly::FutureCancellation& ex) {
VLOG(5) << "Setting exception ex=" << ex.what();
writePromise_->setException(ex);
writePromise_.reset();
})) << "Unexpected exception type";
});
impl_.tp_.notifyPendingWriteOnStream(id_, this);
return std::move(contract.future);
}

void WebTransportImpl::onWebTransportStopSending(HTTPCodec::StreamID id,
uint32_t errorCode) {
// The caller already decodes errorCode, if necessary
auto it = wtEgressStreams_.find(id);
if (it != wtEgressStreams_.end()) {
it->second.onStopSending(errorCode);
}
}

void WebTransportImpl::StreamWriteHandle::onStopSending(uint32_t errorCode) {
// The caller already decodes errorCode, if necessary
auto token = cancellationSource_.getToken();
if (writePromise_) {
writePromise_->setException(WebTransport::Exception(errorCode));
Expand All @@ -193,13 +206,23 @@ WebTransportImpl::StreamReadHandle::readStreamData() {
VLOG(4) << __func__;
CHECK(!readPromise_) << "One read at a time";
if (error_) {
auto ex = folly::make_exception_wrapper<WebTransport::Exception>(*error_);
auto ex = std::move(*error_);
impl_.wtIngressStreams_.erase(getID());
return folly::makeSemiFuture<WebTransport::StreamData>(std::move(ex));
} else if (buf_.empty() && !eof_) {
VLOG(4) << __func__ << " waiting for data";
auto contract = folly::makePromiseContract<WebTransport::StreamData>();
readPromise_.emplace(std::move(contract.promise));
readPromise_->setInterruptHandler(
[this](const folly::exception_wrapper& ex) {
VLOG(4) << "Exception from interrupt handler ex=" << ex.what();
CHECK(ex.with_exception([this](const folly::FutureCancellation& ex) {
// TODO: allow app to configure the reset code on cancellation?
impl_.tp_.stopReadingWebTransportIngress(
id_, WebTransport::kInternalError);
deliverReadError(ex);
})) << "Unexpected exception type";
});
return std::move(contract.future);
} else {
VLOG(4) << __func__ << " returning data len=" << buf_.chainLength();
Expand Down Expand Up @@ -265,25 +288,37 @@ void WebTransportImpl::StreamReadHandle::readError(
impl_.sp_.refreshTimeout();
auto quicAppErrorCode = error.code.asApplicationErrorCode();
if (quicAppErrorCode) {
auto appErrorCode =
proxygen::WebTransport::toApplicationErrorCode(*quicAppErrorCode);
if (appErrorCode) {
deliverReadError(*appErrorCode);
return;
folly::Expected<uint32_t, WebTransport::ErrorCode> appErrorCode{
*quicAppErrorCode};
if (impl_.tp_.usesEncodedApplicationErrorCodes()) {
appErrorCode =
proxygen::WebTransport::toApplicationErrorCode(*quicAppErrorCode);
if (!appErrorCode) {
deliverReadError(WebTransport::Exception(
*quicAppErrorCode, "received invalid reset_stream"));
return;
}
}
deliverReadError(
WebTransport::Exception(*appErrorCode, "received reset_stream"));
return;
} else {
VLOG(4) << error;
}
// any other error
deliverReadError(proxygen::WebTransport::kInternalError);
deliverReadError(WebTransport::Exception(
proxygen::WebTransport::kInternalError, "quic error"));
}

void WebTransportImpl::StreamReadHandle::deliverReadError(uint32_t error) {
void WebTransportImpl::StreamReadHandle::deliverReadError(
const folly::exception_wrapper& ex) {
cancellationSource_.requestCancellation();
if (readPromise_) {
readPromise_->setException(WebTransport::Exception(error));
readPromise_->setException(ex);
readPromise_.reset();
impl_.wtIngressStreams_.erase(getID());
} else {
error_ = error;
error_ = ex;
}
}

Expand Down
6 changes: 4 additions & 2 deletions proxygen/lib/http/webtransport/WebTransportImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ class WebTransportImpl : public WebTransport {
uint32_t /*errorCode*/) = 0;
virtual folly::Expected<folly::Unit, WebTransport::ErrorCode> sendDatagram(
std::unique_ptr<folly::IOBuf> /*datagram*/) = 0;

virtual bool usesEncodedApplicationErrorCodes() = 0;
};

class SessionProvider {
Expand Down Expand Up @@ -242,7 +244,7 @@ class WebTransportImpl : public WebTransport {
}

FCState dataAvailable(std::unique_ptr<folly::IOBuf> data, bool eof);
void deliverReadError(uint32_t error);
void deliverReadError(const folly::exception_wrapper& ex);
[[nodiscard]] bool open() const {
return !eof_ && !error_;
}
Expand All @@ -257,7 +259,7 @@ class WebTransportImpl : public WebTransport {
folly::Optional<folly::Promise<WebTransport::StreamData>> readPromise_;
folly::IOBufQueue buf_{folly::IOBufQueue::cacheChainLength()};
bool eof_{false};
folly::Optional<uint32_t> error_;
folly::Optional<folly::exception_wrapper> error_;
folly::CancellationSource cancellationSource_;
};

Expand Down

0 comments on commit b6168bc

Please sign in to comment.