Skip to content

Commit

Permalink
IoUring: enable completion injection
Browse files Browse the repository at this point in the history
Signed-off-by: He Jie Xu <hejie.xu@intel.com>
  • Loading branch information
soulxu committed Jul 24, 2023
1 parent 5badcfa commit 81a7fb7
Show file tree
Hide file tree
Showing 9 changed files with 288 additions and 75 deletions.
20 changes: 20 additions & 0 deletions envoy/common/io/BUILD
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
load(
"//bazel:envoy_build_system.bzl",
"envoy_cc_library",
"envoy_package",
)

licenses(["notice"]) # Apache 2

envoy_package()

# Header-only target that exports the io_uring interface declared in io_uring.h.
envoy_cc_library(
name = "io_uring_interface",
hdrs = [
"io_uring.h",
],
deps = [
"//envoy/buffer:buffer_interface",
"//envoy/network:address_interface",
],
)
33 changes: 29 additions & 4 deletions source/common/io/io_uring.h → envoy/common/io/io_uring.h
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
#pragma once

#include "envoy/common/pure.h"

#include "source/common/network/address_impl.h"
#include "envoy/network/address.h"

namespace Envoy {
namespace Io {
Expand All @@ -12,8 +11,15 @@ namespace Io {
* @param user_data is any data attached to an entry submitted to the submission
* queue.
* @param result is a return code of submitted system call.
* @param injected indicates whether the completion was injected.
*/
using CompletionCb = std::function<void(void* user_data, int32_t result, bool injected)>;

/**
* Callback for releasing the user data.
* @param user_data the pointer to the user data.
*/
using CompletionCb = std::function<void(void* user_data, int32_t result)>;
using InjectedCompletionUserDataReleasor = std::function<void(void* user_data)>;

/**
 * Result codes returned by the IoUring prepare and submit methods.
 */
enum class IoUringResult {
  Ok,
  Busy,
  Failed,
};

Expand Down Expand Up @@ -44,7 +50,7 @@ class IoUring {
* Iterates over entries in the completion queue, calls the given callback for
* every entry and marks them consumed.
*/
virtual void forEveryCompletion(CompletionCb completion_cb) PURE;
virtual void forEveryCompletion(const CompletionCb& completion_cb) PURE;

/**
* Prepares an accept system call and puts it into the submission queue.
Expand Down Expand Up @@ -95,8 +101,27 @@ class IoUring {
* with the forEveryCompletion() method and try again.
*/
virtual IoUringResult submit() PURE;

/**
* Injects a request completion into the io_uring. Injected completions are iterated
* when `forEveryCompletion` is called.
* @param fd is the file descriptor this completion refers to.
* @param user_data is the user data related to this completion.
* @param result is the request result for this completion.
*/
virtual void injectCompletion(os_fd_t fd, void* user_data, int32_t result) PURE;

/**
* Removes all the injected completions for the specific fd.
* @param fd identifies the completions that will be removed.
* @param releasor is called with the user data of each removed completion so that
* the user data can be released.
*/
virtual void removeInjectedCompletion(os_fd_t fd,
InjectedCompletionUserDataReleasor releasor) PURE;
};

using IoUringPtr = std::unique_ptr<IoUring>;

/**
* Abstract factory for IoUring wrappers.
*/
Expand Down
2 changes: 1 addition & 1 deletion source/common/common/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ envoy_cc_library(
":lock_guard_lib",
":macros",
":non_copyable",
"//source/common/protobuf:protobuf",
"//source/common/protobuf",
] + select({
"//bazel:android_logger": ["logger_impl_lib_android"],
"//conditions:default": ["logger_impl_lib_standard"],
Expand Down
13 changes: 2 additions & 11 deletions source/common/io/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,6 @@ licenses(["notice"]) # Apache 2

envoy_package()

envoy_cc_library(
name = "io_uring_interface",
hdrs = [
"io_uring.h",
],
deps = [
"//source/common/network:address_lib",
],
)

envoy_cc_library(
name = "io_uring_impl_lib",
srcs = [
Expand All @@ -29,6 +19,7 @@ envoy_cc_library(
external_deps = ["uring"],
tags = ["nocompdb"],
deps = [
":io_uring_interface",
"//envoy/common/io:io_uring_interface",
"//envoy/thread_local:thread_local_interface",
],
)
55 changes: 50 additions & 5 deletions source/common/io/io_uring_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -47,34 +47,60 @@ IoUringImpl::~IoUringImpl() { io_uring_queue_exit(&ring_); }

// Creates an eventfd, registers it with the ring and returns the descriptor. Must only be
// called while no eventfd is registered; release-asserts that the registration succeeded.
os_fd_t IoUringImpl::registerEventfd() {
ASSERT(!isEventfdRegistered());
event_fd_ = eventfd(0, 0);
// Mark the eventfd as non-blocking. After an injected completion is added, the eventfd
// will be activated to trigger the event callback. When only injected completions were
// added and there is no actual io_uring event, a non-blocking eventfd keeps the read of
// the eventfd from blocking.
event_fd_ = eventfd(0, EFD_NONBLOCK);
int res = io_uring_register_eventfd(&ring_, event_fd_);
RELEASE_ASSERT(res == 0, fmt::format("unable to register eventfd: {}", errorDetails(-res)));
return event_fd_;
}

// Unregisters the eventfd from the ring and marks the stored descriptor as invalid. Must only
// be called while an eventfd is registered.
void IoUringImpl::unregisterEventfd() {
  ASSERT(isEventfdRegistered());

  const int res = io_uring_unregister_eventfd(&ring_);
  RELEASE_ASSERT(res == 0, fmt::format("unable to unregister eventfd: {}", errorDetails(-res)));

  // NOTE(review): the descriptor is only invalidated here, not closed; presumably the caller
  // that received it from registerEventfd() owns it — confirm.
  SET_SOCKET_INVALID(event_fd_);
}

// Reports whether an eventfd is currently registered, i.e. whether event_fd_ holds a valid
// descriptor.
bool IoUringImpl::isEventfdRegistered() const {
  return SOCKET_VALID(event_fd_);
}

// Drains the eventfd, invokes completion_cb for every entry peeked from the completion queue
// (injected = false), marks those entries consumed, and then invokes completion_cb for every
// injected completion (injected = true).
void IoUringImpl::forEveryCompletion(CompletionCb completion_cb) {
void IoUringImpl::forEveryCompletion(const CompletionCb& completion_cb) {
ASSERT(SOCKET_VALID(event_fd_));

eventfd_t v;
int ret = eventfd_read(event_fd_, &v);
RELEASE_ASSERT(ret == 0, "unable to drain eventfd");
// Keep reading the eventfd until a read fails; the failure is expected to be EAGAIN.
while (true) {
int ret = eventfd_read(event_fd_, &v);
if (ret != 0) {
ASSERT(errno == EAGAIN);
break;
}
}

// Peek at most io_uring_size_ completion queue entries in one batch.
unsigned count = io_uring_peek_batch_cqe(&ring_, cqes_.data(), io_uring_size_);

for (unsigned i = 0; i < count; ++i) {
struct io_uring_cqe* cqe = cqes_[i];
completion_cb(reinterpret_cast<void*>(cqe->user_data), cqe->res);
completion_cb(reinterpret_cast<void*>(cqe->user_data), cqe->res, false);
}

// Mark the peeked entries as consumed.
io_uring_cq_advance(&ring_, count);

ENVOY_LOG(trace, "the num of injected completion is {}", injected_completions_.size());
// TODO(soulxu): Add a bound here to avoid too many completions blocking the thread for
// too long.
// Iterate over the injected completions.
while (!injected_completions_.empty()) {
auto& completion = injected_completions_.front();
completion_cb(completion.user_data_, completion.result_, true);
// The socket may be closed in the completion_cb, in which case all of its related
// completions are removed.
// NOTE(review): if the callback removed the front entry while completions for other fds
// remain, the pop_front() below would drop an unprocessed completion — confirm this
// cannot happen.
if (injected_completions_.empty()) {
break;
}
injected_completions_.pop_front();
}
}

IoUringResult IoUringImpl::prepareAccept(os_fd_t fd, struct sockaddr* remote_addr,
Expand Down Expand Up @@ -143,5 +169,24 @@ IoUringResult IoUringImpl::submit() {
return res == -EBUSY ? IoUringResult::Busy : IoUringResult::Ok;
}

// Appends a completion for `fd` to the list of injected completions; it is handed to the
// completion callback by a later forEveryCompletion() call.
void IoUringImpl::injectCompletion(os_fd_t fd, void* user_data, int32_t result) {
  injected_completions_.emplace_back(fd, user_data, result);

  // The logged count already includes the completion that was just appended.
  const auto num_injects = injected_completions_.size();
  ENVOY_LOG(trace, "inject completion, fd = {}, req = {}, num injects = {}", fd,
            fmt::ptr(user_data), num_injects);
}

// Removes every injected completion that belongs to `fd`.
// @param fd selects the completions to remove.
// @param releasor is invoked with the user data of each matching completion right before
//        that completion is erased, so the user data can be released.
void IoUringImpl::removeInjectedCompletion(os_fd_t fd,
                                           InjectedCompletionUserDataReleasor releasor) {
  ENVOY_LOG(trace, "remove injected completions for fd = {}, size = {}", fd,
            injected_completions_.size());
  // The predicate only runs synchronously inside remove_if(), so the releasor is captured by
  // reference to avoid copying the std::function.
  injected_completions_.remove_if([fd, &releasor](const InjectedCompletion& completion) {
    if (completion.fd_ != fd) {
      return false;
    }
    // Release the user data before this completion is removed.
    releasor(completion.user_data_);
    return true;
  });
}

} // namespace Io
} // namespace Envoy
21 changes: 18 additions & 3 deletions source/common/io/io_uring_impl.h
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
#pragma once

#include "envoy/common/io/io_uring.h"
#include "envoy/thread_local/thread_local.h"

#include "source/common/io/io_uring.h"
#include "source/common/common/logger.h"

#include "liburing.h"

Expand All @@ -11,15 +12,26 @@ namespace Io {

bool isIoUringSupported();

class IoUringImpl : public IoUring, public ThreadLocal::ThreadLocalObject {
struct InjectedCompletion {
InjectedCompletion(os_fd_t fd, void* user_data, int32_t result)
: fd_(fd), user_data_(user_data), result_(result) {}

const os_fd_t fd_;
void* user_data_;
const int32_t result_;
};

class IoUringImpl : public IoUring,
public ThreadLocal::ThreadLocalObject,
protected Logger::Loggable<Logger::Id::io> {
public:
IoUringImpl(uint32_t io_uring_size, bool use_submission_queue_polling);
~IoUringImpl() override;

os_fd_t registerEventfd() override;
void unregisterEventfd() override;
bool isEventfdRegistered() const override;
void forEveryCompletion(CompletionCb completion_cb) override;
void forEveryCompletion(const CompletionCb& completion_cb) override;
IoUringResult prepareAccept(os_fd_t fd, struct sockaddr* remote_addr, socklen_t* remote_addr_len,
void* user_data) override;
IoUringResult prepareConnect(os_fd_t fd, const Network::Address::InstanceConstSharedPtr& address,
Expand All @@ -30,12 +42,15 @@ class IoUringImpl : public IoUring, public ThreadLocal::ThreadLocalObject {
off_t offset, void* user_data) override;
IoUringResult prepareClose(os_fd_t fd, void* user_data) override;
IoUringResult submit() override;
void injectCompletion(os_fd_t fd, void* user_data, int32_t result) override;
void removeInjectedCompletion(os_fd_t fd, InjectedCompletionUserDataReleasor releasor) override;

private:
const uint32_t io_uring_size_;
struct io_uring ring_ {};
std::vector<struct io_uring_cqe*> cqes_;
os_fd_t event_fd_{INVALID_SOCKET};
std::list<InjectedCompletion> injected_completions_;
};

class IoUringFactoryImpl : public IoUringFactory {
Expand Down
2 changes: 1 addition & 1 deletion source/extensions/all_extensions.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ def _selected_extension_target(target):
return target + "_envoy_extension"

# Return all extensions to be compiled into Envoy.
def envoy_all_extensions(denylist = []):
def envoy_all_extensions(denylist = ["envoy.transport_sockets.tcp_stats"]):
all_extensions = dicts.add(_required_extensions, EXTENSIONS)

# These extensions can be removed on a site specific basis.
Expand Down
1 change: 1 addition & 0 deletions test/common/io/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ envoy_cc_test(
],
deps = [
"//source/common/io:io_uring_impl_lib",
"//source/common/network:address_lib",
"//test/mocks/server:server_mocks",
"//test/test_common:environment_lib",
"//test/test_common:utility_lib",
Expand Down
Loading

0 comments on commit 81a7fb7

Please sign in to comment.