Skip to content

Commit

Permalink
folly async io_uring: set EVB for IoSqeBase derived classes
Browse files Browse the repository at this point in the history
Summary:
There are two categories of IoSqeBase derived classes:
1. IoSqe in IoUringBackend
2. Various derivatives in AsyncIoUringSocket

Call `IoSqeBase::setEventBase()` from all derived classes to set/unset the EVB.

It's possible for `AsyncIoUringSocket` to detach and attach to different EVBs. Ensure that `IoSqeBase`s are synced.

Reviewed By: yfeldblum

Differential Revision: D68540579

fbshipit-source-id: 83ad739aec37378e08e4d7fd4855166af6bef290
  • Loading branch information
spikeh authored and facebook-github-bot committed Jan 24, 2025
1 parent 5f1eba8 commit 4a70a51
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 4 deletions.
13 changes: 11 additions & 2 deletions third-party/folly/src/folly/io/async/AsyncIoUringSocket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ AsyncIoUringSocket::ReadSqe::ReadSqe(AsyncIoUringSocket* parent)
: IoSqeBase(IoSqeBase::Type::Read), parent_(parent) {
supportsMultishotRecv_ = parent->options_.multishotRecv &&
parent->backend_->kernelSupportsRecvmsgMultishot();
setEventBase(parent->evb_);
}

AsyncIoUringSocket::~AsyncIoUringSocket() {
Expand Down Expand Up @@ -868,6 +869,8 @@ AsyncIoUringSocket::WriteSqe::WriteSqe(
msg_.msg_control = nullptr;
msg_.msg_controllen = 0;
msg_.msg_flags = 0;

setEventBase(parent->evb_);
}

int AsyncIoUringSocket::WriteSqe::sendMsgFlags() const {
Expand Down Expand Up @@ -1019,8 +1022,8 @@ void AsyncIoUringSocket::attachEventBase(EventBase* evb) {
std::move(*detachedWriteResult_)
.via(evb)
.thenValue(
[w = writeSqeActive_,
a = std::weak_ptr<folly::Unit>(alive_)](auto&& resFlagsPairs) {
[w = writeSqeActive_, a = std::weak_ptr<folly::Unit>(alive_), evb](
auto&& resFlagsPairs) {
VLOG(5) << "attached write done, " << resFlagsPairs.size();
if (!a.lock()) {
return;
Expand All @@ -1031,6 +1034,7 @@ void AsyncIoUringSocket::attachEventBase(EventBase* evb) {
cqe.res = res;
cqe.flags = flags;

evb->bumpHandlingTime();
if (w->cancelled()) {
w->callbackCancelled(&cqe);
} else {
Expand Down Expand Up @@ -1138,6 +1142,7 @@ void AsyncIoUringSocket::detachEventBase() {
}
readSqe_ = ReadSqe::UniquePtr(new ReadSqe(this));
readSqe_->setReadCallback(oldReadCallback, false);
readSqe_->setEventBase(nullptr);

unregisterFd();
if (!drc) {
Expand Down Expand Up @@ -1178,6 +1183,7 @@ folly::Optional<folly::SemiFuture<std::unique_ptr<IOBuf>>>
AsyncIoUringSocket::ReadSqe::detachEventBase() {
alive_ = nullptr;
parent_ = nullptr;
setEventBase(nullptr);
return std::move(oldEventBaseRead_);
}

Expand All @@ -1193,6 +1199,7 @@ void AsyncIoUringSocket::ReadSqe::attachEventBase() {
return;
}
auto* evb = parent_->evb_;
setEventBase(evb);
alive_ = std::make_shared<folly::Unit>();
folly::Func deferred =
[p = parent_, a = std::weak_ptr<folly::Unit>(alive_)]() {
Expand All @@ -1219,6 +1226,7 @@ AsyncIoUringSocket::FastOpenSqe::FastOpenSqe(
parent_(parent),
initialWrite(std::move(i)) {
addrLen_ = addr.getAddress(&addrStorage);
setEventBase(parent->evb_);
}

void AsyncIoUringSocket::FastOpenSqe::cleanupMsg() noexcept {
Expand Down Expand Up @@ -1311,6 +1319,7 @@ AsyncIoUringSocket::WriteSqe::detachEventBase() {
newSqe->refs_ = refs_;

parent_ = nullptr;
setEventBase(nullptr);
detachedSignal_ =
[prom = std::move(promise),
ret = std::vector<std::pair<int, uint32_t>>{},
Expand Down
10 changes: 8 additions & 2 deletions third-party/folly/src/folly/io/async/AsyncIoUringSocket.h
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,10 @@ class AsyncIoUringSocket : public AsyncSocketTransport {

struct CloseSqe : IoSqeBase {
explicit CloseSqe(AsyncIoUringSocket* parent)
: IoSqeBase(IoSqeBase::Type::Close), parent_(parent) {}
: IoSqeBase(IoSqeBase::Type::Close), parent_(parent) {
setEventBase(parent->evb_);
}

void processSubmit(struct io_uring_sqe* sqe) noexcept override {
parent_->closeProcessSubmit(sqe);
}
Expand Down Expand Up @@ -417,7 +420,10 @@ class AsyncIoUringSocket : public AsyncSocketTransport {
explicit ConnectSqe(AsyncIoUringSocket* parent)
: IoSqeBase(IoSqeBase::Type::Connect),
AsyncTimeout(parent->evb_),
parent_(parent) {}
parent_(parent) {
setEventBase(parent->evb_);
}

void processSubmit(struct io_uring_sqe* sqe) noexcept override {
parent_->processConnectSubmit(sqe, addrStorage);
}
Expand Down
1 change: 1 addition & 0 deletions third-party/folly/src/folly/io/async/IoUringBackend.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1258,6 +1258,7 @@ int IoUringBackend::eb_event_add(Event& event, const struct timeval* timeout) {
auto* ioSqe = allocIoSqe(event.getCallback());
CHECK(ioSqe);
ioSqe->event_ = &event;
ioSqe->setEventBase(event.eb_ev_base());

// just append it
submitList_.push_back(*ioSqe);
Expand Down
1 change: 1 addition & 0 deletions third-party/folly/src/folly/io/async/IoUringBackend.h
Original file line number Diff line number Diff line change
Expand Up @@ -522,6 +522,7 @@ class IoUringBackend : public EventBaseBackendBase {
FOLLY_ALWAYS_INLINE void resetEvent() {
// remove it from the list
unlink();
setEventBase(nullptr);
if (event_) {
event_->setUserData(nullptr);
event_ = nullptr;
Expand Down

0 comments on commit 4a70a51

Please sign in to comment.