diff --git a/third-party/folly/src/folly/io/async/AsyncIoUringSocket.cpp b/third-party/folly/src/folly/io/async/AsyncIoUringSocket.cpp index 9a95fcf2799661..7e7d9fd46c1154 100644 --- a/third-party/folly/src/folly/io/async/AsyncIoUringSocket.cpp +++ b/third-party/folly/src/folly/io/async/AsyncIoUringSocket.cpp @@ -141,6 +141,7 @@ AsyncIoUringSocket::ReadSqe::ReadSqe(AsyncIoUringSocket* parent) : IoSqeBase(IoSqeBase::Type::Read), parent_(parent) { supportsMultishotRecv_ = parent->options_.multishotRecv && parent->backend_->kernelSupportsRecvmsgMultishot(); + setEventBase(parent->evb_); } AsyncIoUringSocket::~AsyncIoUringSocket() { @@ -868,6 +869,8 @@ AsyncIoUringSocket::WriteSqe::WriteSqe( msg_.msg_control = nullptr; msg_.msg_controllen = 0; msg_.msg_flags = 0; + + setEventBase(parent->evb_); } int AsyncIoUringSocket::WriteSqe::sendMsgFlags() const { @@ -1019,8 +1022,8 @@ void AsyncIoUringSocket::attachEventBase(EventBase* evb) { std::move(*detachedWriteResult_) .via(evb) .thenValue( - [w = writeSqeActive_, - a = std::weak_ptr(alive_)](auto&& resFlagsPairs) { + [w = writeSqeActive_, a = std::weak_ptr(alive_), evb]( + auto&& resFlagsPairs) { VLOG(5) << "attached write done, " << resFlagsPairs.size(); if (!a.lock()) { return; @@ -1031,6 +1034,7 @@ void AsyncIoUringSocket::attachEventBase(EventBase* evb) { cqe.res = res; cqe.flags = flags; + evb->bumpHandlingTime(); if (w->cancelled()) { w->callbackCancelled(&cqe); } else { @@ -1138,6 +1142,7 @@ void AsyncIoUringSocket::detachEventBase() { } readSqe_ = ReadSqe::UniquePtr(new ReadSqe(this)); readSqe_->setReadCallback(oldReadCallback, false); + readSqe_->setEventBase(nullptr); unregisterFd(); if (!drc) { @@ -1178,6 +1183,7 @@ folly::Optional>> AsyncIoUringSocket::ReadSqe::detachEventBase() { alive_ = nullptr; parent_ = nullptr; + setEventBase(nullptr); return std::move(oldEventBaseRead_); } @@ -1193,6 +1199,7 @@ void AsyncIoUringSocket::ReadSqe::attachEventBase() { return; } auto* evb = parent_->evb_; + setEventBase(evb); alive_ = std::make_shared(); folly::Func deferred = [p = parent_, a = std::weak_ptr(alive_)]() { @@ -1219,6 +1226,7 @@ AsyncIoUringSocket::FastOpenSqe::FastOpenSqe( parent_(parent), initialWrite(std::move(i)) { addrLen_ = addr.getAddress(&addrStorage); + setEventBase(parent->evb_); } void AsyncIoUringSocket::FastOpenSqe::cleanupMsg() noexcept { @@ -1311,6 +1319,7 @@ AsyncIoUringSocket::WriteSqe::detachEventBase() { newSqe->refs_ = refs_; parent_ = nullptr; + setEventBase(nullptr); detachedSignal_ = [prom = std::move(promise), ret = std::vector>{}, diff --git a/third-party/folly/src/folly/io/async/AsyncIoUringSocket.h b/third-party/folly/src/folly/io/async/AsyncIoUringSocket.h index 9ec3cf5ddc8e30..d172ad8f73e168 100644 --- a/third-party/folly/src/folly/io/async/AsyncIoUringSocket.h +++ b/third-party/folly/src/folly/io/async/AsyncIoUringSocket.h @@ -351,7 +351,10 @@ class AsyncIoUringSocket : public AsyncSocketTransport { struct CloseSqe : IoSqeBase { explicit CloseSqe(AsyncIoUringSocket* parent) - : IoSqeBase(IoSqeBase::Type::Close), parent_(parent) {} + : IoSqeBase(IoSqeBase::Type::Close), parent_(parent) { + setEventBase(parent->evb_); + } + void processSubmit(struct io_uring_sqe* sqe) noexcept override { parent_->closeProcessSubmit(sqe); } @@ -417,7 +420,10 @@ class AsyncIoUringSocket : public AsyncSocketTransport { explicit ConnectSqe(AsyncIoUringSocket* parent) : IoSqeBase(IoSqeBase::Type::Connect), AsyncTimeout(parent->evb_), - parent_(parent) {} + parent_(parent) { + setEventBase(parent->evb_); + } + void processSubmit(struct io_uring_sqe* sqe) noexcept override { parent_->processConnectSubmit(sqe, addrStorage); } diff --git a/third-party/folly/src/folly/io/async/IoUringBackend.cpp b/third-party/folly/src/folly/io/async/IoUringBackend.cpp index 02b2895817091e..76740c3eb58303 100644 --- a/third-party/folly/src/folly/io/async/IoUringBackend.cpp +++ b/third-party/folly/src/folly/io/async/IoUringBackend.cpp @@ -1258,6 +1258,7 @@ int IoUringBackend::eb_event_add(Event& event, const struct timeval* timeout) { auto* ioSqe = allocIoSqe(event.getCallback()); CHECK(ioSqe); ioSqe->event_ = &event; + ioSqe->setEventBase(event.eb_ev_base()); // just append it submitList_.push_back(*ioSqe); diff --git a/third-party/folly/src/folly/io/async/IoUringBackend.h b/third-party/folly/src/folly/io/async/IoUringBackend.h index fae934d4cdfe91..0dd0b15eca6138 100644 --- a/third-party/folly/src/folly/io/async/IoUringBackend.h +++ b/third-party/folly/src/folly/io/async/IoUringBackend.h @@ -522,6 +522,7 @@ class IoUringBackend : public EventBaseBackendBase { FOLLY_ALWAYS_INLINE void resetEvent() { // remove it from the list unlink(); + setEventBase(nullptr); if (event_) { event_->setUserData(nullptr); event_ = nullptr;