From b6168bc67244f69bd3aa688786ef0b76964b9ac9 Mon Sep 17 00:00:00 2001 From: Alan Frindell Date: Mon, 16 Dec 2024 22:27:05 -0800 Subject: [PATCH] WebTransport error handling improvements Summary: There's three main updates here: 1) Only attempt to decode application error codes when the underlying transport requires it (eg: HTTP) 2) Specify a WebTransport::Exception ctor that takes a string, so we can differentiate the source of the error (eg: RESET_STREAM, STOP_SENDING, other) 3) Set an interrupt handler for the read/write promises so they play with cancellation Reviewed By: mjoras Differential Revision: D67119088 fbshipit-source-id: b6b9ddbb24155ab560015e2945509361e3a05d3a --- proxygen/lib/http/session/HTTPTransaction.h | 4 + .../test/HTTPTransactionWebTransportTest.cpp | 115 +++++++++++++++--- .../lib/http/webtransport/QuicWebTransport.h | 4 + proxygen/lib/http/webtransport/WebTransport.h | 5 + .../http/webtransport/WebTransportImpl.cpp | 57 +++++++-- .../lib/http/webtransport/WebTransportImpl.h | 6 +- 6 files changed, 160 insertions(+), 31 deletions(-) diff --git a/proxygen/lib/http/session/HTTPTransaction.h b/proxygen/lib/http/session/HTTPTransaction.h index 498b6a03d1..596bc87454 100644 --- a/proxygen/lib/http/session/HTTPTransaction.h +++ b/proxygen/lib/http/session/HTTPTransaction.h @@ -652,6 +652,10 @@ class HTTPTransaction folly::assume_unreachable(); } + bool usesEncodedApplicationErrorCodes() override { + return true; + } + [[nodiscard]] virtual bool supportsWebTransport() const { return false; } diff --git a/proxygen/lib/http/session/test/HTTPTransactionWebTransportTest.cpp b/proxygen/lib/http/session/test/HTTPTransactionWebTransportTest.cpp index ddc21f59b7..cd7e497ab9 100644 --- a/proxygen/lib/http/session/test/HTTPTransactionWebTransportTest.cpp +++ b/proxygen/lib/http/session/test/HTTPTransactionWebTransportTest.cpp @@ -73,15 +73,24 @@ class HTTPTransactionWebTransportTest : public testing::Test { HTTP2PriorityQueue txnEgressQueue_; std::unique_ptr txn_; - static void readCallback(folly::Try streamData, - bool expectException, - size_t expectedLength, - bool expectFin) { + static void readCallback( + folly::Try streamData, + bool expectException, + size_t expectedLength, + bool expectFin, + folly::Optional expectedErrorCode = folly::none) { VLOG(4) << __func__ << " expectException=" << uint64_t(expectException) << " expectedLength=" << expectedLength << " expectFin=" << expectFin; EXPECT_EQ(streamData.hasException(), expectException); if (expectException || streamData.hasException()) { + if (expectedErrorCode) { + auto wtEx = streamData.tryGetExceptionObject(); + EXPECT_NE(wtEx, nullptr); + if (wtEx) { + EXPECT_EQ(wtEx->error, *expectedErrorCode); + } + } return; } if (streamData->data) { @@ -202,14 +211,18 @@ TEST_F(HTTPTransactionWebTransportTest, ReadStreamBufferedError) { auto implHandle = txn_->onWebTransportUniStream(0); EXPECT_NE(readHandle, nullptr); - implHandle->deliverReadError(WT_APP_ERROR_2); - - // read with buffered error - auto fut = readHandle->readStreamData() - .via(&eventBase_) - .thenTry([](auto streamData) { - readCallback(std::move(streamData), true, 0, false); - }); + implHandle->readError(implHandle->getID(), + quic::QuicError(quic::ApplicationErrorCode( + WebTransport::toHTTPErrorCode(WT_APP_ERROR_2)))); + + // read with buffered error - simulate coming directly from QUIC with + // encoded error code + auto fut = + readHandle->readStreamData() + .via(&eventBase_) + .thenTry([](auto streamData) { + readCallback(std::move(streamData), true, 0, false, WT_APP_ERROR_2); + }); eventBase_.loopOnce(); EXPECT_TRUE(fut.isReady()); } @@ -223,18 +236,43 @@ TEST_F(HTTPTransactionWebTransportTest, ReadStreamError) { EXPECT_NE(readHandle, nullptr); // read with nothing queued - auto fut = readHandle->readStreamData() - .via(&eventBase_) - .thenTry([](auto streamData) { - readCallback(std::move(streamData), true, 0, false); - }); + auto fut = + readHandle->readStreamData() + .via(&eventBase_) + .thenTry([](auto streamData) { + readCallback(std::move(streamData), true, 0, false, WT_APP_ERROR_2); + }); EXPECT_FALSE(fut.isReady()); - implHandle->deliverReadError(WT_APP_ERROR_2); + // Don't encode the error, it will be passed directly + implHandle->readError( + implHandle->getID(), + quic::QuicError(quic::ApplicationErrorCode(WT_APP_ERROR_2))); eventBase_.loopOnce(); EXPECT_TRUE(fut.isReady()); } +TEST_F(HTTPTransactionWebTransportTest, ReadStreamCancel) { + WebTransport::StreamReadHandle* readHandle{nullptr}; + EXPECT_CALL(handler_, onWebTransportUniStream(_, _)) + .WillOnce(SaveArg<1>(&readHandle)); + + txn_->onWebTransportUniStream(0); + EXPECT_NE(readHandle, nullptr); + + // Get the read future + auto fut = readHandle->readStreamData(); + + // Cancel the future, the transport will get a STOP_SENDING + EXPECT_CALL(transport_, + stopReadingWebTransportIngress(0, WebTransport::kInternalError)) + .WillOnce(Return(folly::unit)); + fut.cancel(); + EXPECT_TRUE(fut.isReady()); + EXPECT_NE(fut.result().tryGetExceptionObject(), + nullptr); +} + TEST_F(HTTPTransactionWebTransportTest, WriteFails) { EXPECT_CALL(transport_, newWebTransportUniStream()).WillOnce(Return(1)); auto res = wt_->createUniStream(); @@ -300,6 +338,47 @@ TEST_F(HTTPTransactionWebTransportTest, WriteStreamPauseStopSending) { EXPECT_TRUE(ready); } +TEST_F(HTTPTransactionWebTransportTest, AwaitWritableCancel) { + EXPECT_CALL(transport_, newWebTransportUniStream()).WillOnce(Return(1)); + auto writeHandle = wt_->createUniStream(); + EXPECT_FALSE(writeHandle.hasError()); + + // Block write + quic::StreamWriteCallback* wcb{nullptr}; + EXPECT_CALL(transport_, notifyPendingWriteOnStream(1, testing::_)) + .WillOnce(DoAll(SaveArg<1>(&wcb), Return(folly::unit))); + // awaitWritable + auto fut = writeHandle.value()->awaitWritable().value(); + + // Cancel future + fut.cancel(); + EXPECT_TRUE(fut.isReady()); + EXPECT_TRUE(fut.hasException()); + EXPECT_NE(fut.result().tryGetExceptionObject(), + nullptr); + + // awaitWritable again + bool ready = false; + EXPECT_CALL(transport_, notifyPendingWriteOnStream(1, testing::_)) + .WillOnce(DoAll(SaveArg<1>(&wcb), Return(folly::unit))); + writeHandle.value() + ->awaitWritable() + .value() + .via(&eventBase_) + .thenTry([&ready, &writeHandle, this](auto writeReady) { + EXPECT_TRUE(writeReady.hasValue()); + EXPECT_CALL(transport_, resetWebTransportEgress(1, WT_APP_ERROR_1)); + writeHandle.value()->resetStream(WT_APP_ERROR_1); + ready = true; + }); + EXPECT_FALSE(ready); + + // Resume - only happens once because the reset, maybe? + wcb->onStreamWriteReady(0, 65536); + eventBase_.loopOnce(); + EXPECT_TRUE(ready); +} + TEST_F(HTTPTransactionWebTransportTest, BidiStreamEdgeCases) { WebTransport::BidiStreamHandle streamHandle; EXPECT_CALL(handler_, onWebTransportBidiStream(_, _)) diff --git a/proxygen/lib/http/webtransport/QuicWebTransport.h b/proxygen/lib/http/webtransport/QuicWebTransport.h index b5c4f32469..2adf038ce5 100644 --- a/proxygen/lib/http/webtransport/QuicWebTransport.h +++ b/proxygen/lib/http/webtransport/QuicWebTransport.h @@ -117,6 +117,10 @@ class QuicWebTransport folly::Expected sendDatagram( std::unique_ptr /*datagram*/) override; + bool usesEncodedApplicationErrorCodes() override { + return false; + } + folly::Expected closeSession( folly::Optional /*error*/) override; diff --git a/proxygen/lib/http/webtransport/WebTransport.h b/proxygen/lib/http/webtransport/WebTransport.h index fa12f80323..0c1c735f1f 100644 --- a/proxygen/lib/http/webtransport/WebTransport.h +++ b/proxygen/lib/http/webtransport/WebTransport.h @@ -87,6 +87,11 @@ class WebTransport { "Peer reset or abandoned stream with error=", inError)), error(inError) { } + Exception(uint32_t inError, const std::string& msg) + : std::runtime_error( + folly::to(msg, " with error=", inError)), + error(inError) { + } uint32_t error; }; diff --git a/proxygen/lib/http/webtransport/WebTransportImpl.cpp b/proxygen/lib/http/webtransport/WebTransportImpl.cpp index 52c73d3de5..ad218305fc 100644 --- a/proxygen/lib/http/webtransport/WebTransportImpl.cpp +++ b/proxygen/lib/http/webtransport/WebTransportImpl.cpp @@ -24,7 +24,8 @@ void WebTransportImpl::destroy() { // Deliver an error to the application if needed if (stream.open()) { VLOG(4) << "aborting WT ingress id=" << id; - stream.deliverReadError(WebTransport::kInternalError); + stream.deliverReadError(WebTransport::Exception( + WebTransport::kInternalError, "shutting down")); stopReadingWebTransportIngress(id, WebTransport::kInternalError); // TODO: does the spec say how to handle this at the transport? Eg: the // peer must RESET any open write streams. @@ -156,12 +157,23 @@ WebTransportImpl::StreamWriteHandle::awaitWritable() { CHECK(!writePromise_) << "awaitWritable already called"; auto contract = folly::makePromiseContract(); writePromise_.emplace(std::move(contract.promise)); + writePromise_->setInterruptHandler( + [this](const folly::exception_wrapper& ex) { + VLOG(4) << "Exception from interrupt handler ex=" << ex.what(); + // if awaitWritable is cancelled, just reset it + CHECK(ex.with_exception([this](const folly::FutureCancellation& ex) { + VLOG(5) << "Setting exception ex=" << ex.what(); + writePromise_->setException(ex); + writePromise_.reset(); + })) << "Unexpected exception type"; + }); impl_.tp_.notifyPendingWriteOnStream(id_, this); return std::move(contract.future); } void WebTransportImpl::onWebTransportStopSending(HTTPCodec::StreamID id, uint32_t errorCode) { + // The caller already decodes errorCode, if necessary auto it = wtEgressStreams_.find(id); if (it != wtEgressStreams_.end()) { it->second.onStopSending(errorCode); @@ -169,6 +181,7 @@ void WebTransportImpl::onWebTransportStopSending(HTTPCodec::StreamID id, } void WebTransportImpl::StreamWriteHandle::onStopSending(uint32_t errorCode) { + // The caller already decodes errorCode, if necessary auto token = cancellationSource_.getToken(); if (writePromise_) { writePromise_->setException(WebTransport::Exception(errorCode)); @@ -193,13 +206,23 @@ WebTransportImpl::StreamReadHandle::readStreamData() { VLOG(4) << __func__; CHECK(!readPromise_) << "One read at a time"; if (error_) { - auto ex = folly::make_exception_wrapper(*error_); + auto ex = std::move(*error_); impl_.wtIngressStreams_.erase(getID()); return folly::makeSemiFuture(std::move(ex)); } else if (buf_.empty() && !eof_) { VLOG(4) << __func__ << " waiting for data"; auto contract = folly::makePromiseContract(); readPromise_.emplace(std::move(contract.promise)); + readPromise_->setInterruptHandler( + [this](const folly::exception_wrapper& ex) { + VLOG(4) << "Exception from interrupt handler ex=" << ex.what(); + CHECK(ex.with_exception([this](const folly::FutureCancellation& ex) { + // TODO: allow app to configure the reset code on cancellation? + impl_.tp_.stopReadingWebTransportIngress( + id_, WebTransport::kInternalError); + deliverReadError(ex); + })) << "Unexpected exception type"; + }); return std::move(contract.future); } else { VLOG(4) << __func__ << " returning data len=" << buf_.chainLength(); @@ -265,25 +288,37 @@ void WebTransportImpl::StreamReadHandle::readError( impl_.sp_.refreshTimeout(); auto quicAppErrorCode = error.code.asApplicationErrorCode(); if (quicAppErrorCode) { - auto appErrorCode = - proxygen::WebTransport::toApplicationErrorCode(*quicAppErrorCode); - if (appErrorCode) { - deliverReadError(*appErrorCode); - return; + folly::Expected appErrorCode{ + *quicAppErrorCode}; + if (impl_.tp_.usesEncodedApplicationErrorCodes()) { + appErrorCode = + proxygen::WebTransport::toApplicationErrorCode(*quicAppErrorCode); + if (!appErrorCode) { + deliverReadError(WebTransport::Exception( + *quicAppErrorCode, "received invalid reset_stream")); + return; + } } + deliverReadError( + WebTransport::Exception(*appErrorCode, "received reset_stream")); + return; + } else { + VLOG(4) << error; } // any other error - deliverReadError(proxygen::WebTransport::kInternalError); + deliverReadError(WebTransport::Exception( + proxygen::WebTransport::kInternalError, "quic error")); } -void WebTransportImpl::StreamReadHandle::deliverReadError(uint32_t error) { +void WebTransportImpl::StreamReadHandle::deliverReadError( + const folly::exception_wrapper& ex) { cancellationSource_.requestCancellation(); if (readPromise_) { - readPromise_->setException(WebTransport::Exception(error)); + readPromise_->setException(ex); readPromise_.reset(); impl_.wtIngressStreams_.erase(getID()); } else { - error_ = error; + error_ = ex; } } diff --git a/proxygen/lib/http/webtransport/WebTransportImpl.h b/proxygen/lib/http/webtransport/WebTransportImpl.h index 5c7899178f..7429324465 100644 --- a/proxygen/lib/http/webtransport/WebTransportImpl.h +++ b/proxygen/lib/http/webtransport/WebTransportImpl.h @@ -66,6 +66,8 @@ class WebTransportImpl : public WebTransport { uint32_t /*errorCode*/) = 0; virtual folly::Expected sendDatagram( std::unique_ptr /*datagram*/) = 0; + + virtual bool usesEncodedApplicationErrorCodes() = 0; }; class SessionProvider { @@ -242,7 +244,7 @@ class WebTransportImpl : public WebTransport { } FCState dataAvailable(std::unique_ptr data, bool eof); - void deliverReadError(uint32_t error); + void deliverReadError(const folly::exception_wrapper& ex); [[nodiscard]] bool open() const { return !eof_ && !error_; } @@ -257,7 +259,7 @@ class WebTransportImpl : public WebTransport { folly::Optional> readPromise_; folly::IOBufQueue buf_{folly::IOBufQueue::cacheChainLength()}; bool eof_{false}; - folly::Optional error_; + folly::Optional error_; folly::CancellationSource cancellationSource_; };