Skip to content

Commit

Permalink
iouring sq poll and io poll
Browse files Browse the repository at this point in the history
  • Loading branch information
lihuiba committed Dec 26, 2024
1 parent 21c6130 commit a6b533c
Show file tree
Hide file tree
Showing 6 changed files with 168 additions and 79 deletions.
16 changes: 8 additions & 8 deletions .github/workflows/ci.linux.arm.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,14 @@ jobs:
- name: Build
run: |
source /opt/rh/gcc-toolset-9/enable
cmake -B build \
-D CMAKE_BUILD_TYPE=MinSizeRel \
-D PHOTON_ENABLE_ECOSYSTEM=ON \
-D PHOTON_BUILD_TESTING=ON \
-D PHOTON_ENABLE_SASL=ON \
-D PHOTON_ENABLE_FUSE=ON \
-D PHOTON_ENABLE_LIBCURL=ON \
-D PHOTON_ENABLE_EXTFS=ON
cmake -B build -D CMAKE_BUILD_TYPE=MinSizeRel \
-D PHOTON_ENABLE_ECOSYSTEM=ON \
-D PHOTON_BUILD_TESTING=ON \
-D PHOTON_ENABLE_SASL=ON \
-D PHOTON_ENABLE_FUSE=ON \
-D PHOTON_ENABLE_URING=ON \
-D PHOTON_ENABLE_LIBCURL=ON \
-D PHOTON_ENABLE_EXTFS=ON
cmake --build build -j $(nproc) -- VERBOSE=1
- name: Test
Expand Down
52 changes: 44 additions & 8 deletions io/fd-events.h
Original file line number Diff line number Diff line change
Expand Up @@ -136,33 +136,69 @@ CascadingEventEngine* new_##name##_cascading_engine(); \

DECLARE_MASTER_AND_CASCADING_ENGINE(epoll);
DECLARE_MASTER_AND_CASCADING_ENGINE(select);
DECLARE_MASTER_AND_CASCADING_ENGINE(iouring);
// DECLARE_MASTER_AND_CASCADING_ENGINE(iouring);
DECLARE_MASTER_AND_CASCADING_ENGINE(kqueue);
DECLARE_MASTER_AND_CASCADING_ENGINE(epoll_ng);

inline int fd_events_init(uint64_t master_engine) {
/**
 * Options for creating an io_uring event engine.
 *
 * Every field has a default, so both `iouring_args a;` and `iouring_args a{};`
 * produce a fully initialized object.
 */
struct iouring_args {
    // Selects the master vs. cascading role; the new_iouring_master_engine /
    // new_iouring_cascading_engine wrappers overwrite this before use.
    bool is_master = true;
    // Request IORING_SETUP_SQPOLL when the ring is initialized.
    bool setup_sqpoll = false;
    // Request IORING_SETUP_SQ_AFF; only consulted together with setup_sqpoll.
    bool setup_sq_aff = false;
    // Request IORING_SETUP_IOPOLL when the ring is initialized.
    bool setup_iopoll = false;
    // CPU passed as io_uring_params::sq_thread_cpu when setup_sq_aff is in effect.
    // Defaulted to 0 so the field is never read (e.g. logged) while indeterminate.
    uint32_t sq_thread_cpu = 0;
    // Copied into io_uring_params::sq_thread_idle when setup_sqpoll is set.
    uint32_t sq_thread_idle_ms = 1000; // by default polls for 1s
};

// Creates an io_uring event engine configured by `args`.
// The result is type-erased to void*; the new_iouring_master_engine and
// new_iouring_cascading_engine wrappers below set `args.is_master` and cast
// the result to the matching engine interface.
void* new_iouring_event_engine(iouring_args args = {});

// Creates an io_uring engine in the master role.
// `args.is_master` is forced to true regardless of what the caller passed.
inline MasterEventEngine*
new_iouring_master_engine(iouring_args args = {}) {
    args.is_master = true;
    void* engine = new_iouring_event_engine(args);
    return static_cast<MasterEventEngine*>(engine);
}

// Creates an io_uring engine in the cascading role.
// `args.is_master` is forced to false regardless of what the caller passed.
inline CascadingEventEngine*
new_iouring_cascading_engine(iouring_args args = {}) {
    args.is_master = false;
    void* engine = new_iouring_event_engine(args);
    return static_cast<CascadingEventEngine*>(engine);
}

template<typename...Ts> inline MasterEventEngine*
new_master_event_engine(uint64_t master_engine, const Ts&...xs) {
switch (master_engine) {
#ifdef __linux__
case INIT_EVENT_EPOLL:
return _fd_events_init(&new_epoll_master_engine);
return new_epoll_master_engine(xs...);
case INIT_EVENT_EPOLL_NG:
return _fd_events_init(&new_epoll_ng_master_engine);
return new_epoll_ng_master_engine(xs...);
#endif
case INIT_EVENT_SELECT:
return _fd_events_init(&new_select_master_engine);
return new_select_master_engine(xs...);
#ifdef PHOTON_URING
case INIT_EVENT_IOURING:
return _fd_events_init(&new_iouring_master_engine);
return new_iouring_master_engine(xs...);
#endif
#ifdef __APPLE__
case INIT_EVENT_KQUEUE:
return _fd_events_init(&new_kqueue_master_engine);
return new_kqueue_master_engine(xs...);
#endif
default:
return -1;
return nullptr;
}
}

// Installs `master_engine` as the current vCPU's master event engine.
// Returns 0 on success, or -1 (leaving the vCPU untouched) if the engine is null.
inline int fd_events_init(MasterEventEngine* master_engine) {
    if (master_engine == nullptr)
        return -1;
    get_vcpu()->master_event_engine = master_engine;
    return 0;
}


inline int fd_events_init(uint64_t master_engine) {
return fd_events_init(new_master_event_engine(master_engine));
}

inline int fd_events_fini() {
reset_master_event_engine_default();
return 0;
Expand Down
109 changes: 71 additions & 38 deletions io/iouring-wrapper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,9 @@ constexpr static EventsMap<EVUnderlay<POLLIN | POLLRDHUP, POLLOUT, POLLERR>> evm

class iouringEngine : public MasterEventEngine, public CascadingEventEngine, public ResetHandle {
public:
explicit iouringEngine(bool master) : m_master(master) {}

~iouringEngine() {
LOG_INFO("Finish event engine: iouring ", VALUE(m_master));
LOG_INFO("Finish event engine: iouring ",
make_named_value("is_master", m_args.is_master));
fini();
}

Expand All @@ -59,12 +58,11 @@ class iouringEngine : public MasterEventEngine, public CascadingEventEngine, pub
}

int fini() {
if (m_eventfd >= 0 && !m_master) {
if (io_uring_unregister_eventfd(m_ring) != 0) {
LOG_ERROR("iouring: failed to unregister cascading event fd");
}
}
if (m_eventfd >= 0) {
if (!m_args.is_master) {
if (io_uring_unregister_eventfd(m_ring) != 0)
LOG_ERROR("iouring: failed to unregister cascading event fd ", ERRNO());
}
close(m_eventfd);
}
if (m_ring != nullptr) {
Expand All @@ -75,7 +73,10 @@ class iouringEngine : public MasterEventEngine, public CascadingEventEngine, pub
return 0;
}

int init() {
int init() { return init(m_args); }
int init(iouring_args args) {
m_args = args;
m_args.setup_sq_aff = false;
int compare_result;
if (kernel_version_compare("5.11", compare_result) == 0 && compare_result <= 0) {
rlimit resource_limit{.rlim_cur = RLIM_INFINITY, .rlim_max = RLIM_INFINITY};
Expand All @@ -90,21 +91,54 @@ class iouringEngine : public MasterEventEngine, public CascadingEventEngine, pub

m_ring = new io_uring{};
io_uring_params params{};
if (m_cooperative_task_flag == 1)
if (m_cooperative_task_flag == 1) {
params.flags = IORING_SETUP_COOP_TASKRUN;
}
if (args.setup_iopoll)
params.flags |= IORING_SETUP_IOPOLL;
if (args.setup_sqpoll) {
params.flags |= IORING_SETUP_SQPOLL;
params.sq_thread_idle = args.sq_thread_idle_ms;
if (args.setup_sq_aff) {
params.flags |= IORING_SETUP_SQ_AFF;
params.sq_thread_cpu = args.sq_thread_cpu;
}
}

retry:
int ret = io_uring_queue_init_params(QUEUE_DEPTH, m_ring, &params);
if (ret != 0) {
if (-ret == EINVAL) {
auto& p = params;
if (p.flags & IORING_SETUP_DEFER_TASKRUN) {
p.flags &= ~IORING_SETUP_DEFER_TASKRUN;
p.flags &= ~IORING_SETUP_SINGLE_ISSUER;
LOG_INFO("io_uring_queue_init failed, removing IORING_SETUP_DEFER_TASKRUN, IORING_SETUP_SINGLE_ISSUER");
goto retry;
}
if (p.flags & IORING_SETUP_COOP_TASKRUN) {
// this seems to be conflicting with IORING_SETUP_SQPOLL,
// at least in 6.4.12-1.el8.elrepo.x86_64
p.flags &= ~IORING_SETUP_COOP_TASKRUN;
LOG_INFO("io_uring_queue_init failed, removing IORING_SETUP_COOP_TASKRUN");
goto retry;
}
if (p.flags & IORING_SETUP_CQSIZE) {
p.flags &= ~IORING_SETUP_CQSIZE;
LOG_INFO("io_uring_queue_init failed, removing IORING_SETUP_CQSIZE");
goto retry;
} }
// reset m_ring so that the destructor won't do duplicate munmap cleanup (io_uring_queue_exit)
delete m_ring;
m_ring = nullptr;
LOG_ERROR_RETURN(0, -1, "iouring: failed to init queue: ", ERRNO(-ret));
}

// Check feature supported
for (auto i : REQUIRED_FEATURES) {
if (!(params.features & i)) {
LOG_ERROR_RETURN(0, -1, "iouring: required feature not supported");
}
if (!check_required_features(params, IORING_FEAT_CUR_PERSONALITY,
IORING_FEAT_NODROP, IORING_FEAT_FAST_POLL,
IORING_FEAT_EXT_ARG, IORING_FEAT_RW_CUR_POS)) {
LOG_ERROR_RETURN(0, -1, "iouring: required feature not supported");
}

// Check opcode supported
Expand Down Expand Up @@ -142,7 +176,7 @@ class iouringEngine : public MasterEventEngine, public CascadingEventEngine, pub
LOG_ERRNO_RETURN(0, -1, "iouring: failed to create eventfd");
}

if (m_master) {
if (args.is_master) {
// Setup a multishot poll on master engine to watch the cancel_wait
uint32_t poll_mask = evmap.translate_bitwisely(EVENT_READ);
auto sqe = _get_sqe();
Expand All @@ -162,6 +196,15 @@ class iouringEngine : public MasterEventEngine, public CascadingEventEngine, pub
return 0;
}


// Recursion terminator: with no feature bits left to check, the requirement holds.
bool check_required_features(const io_uring_params& params) {
    return true;
}

// Returns true only if every feature bit in (f, fs...) is set in
// `params.features`; stops at the first missing one.
template<typename T, typename...Ts>
bool check_required_features(const io_uring_params& params, T f, Ts...fs) {
    if (!(params.features & f))
        return false;
    return check_required_features(params, fs...);
}

/**
* @brief Get a SQE from ring, prepare IO, and wait for completion. Note all the SQEs are batch submitted
* later in the `wait_and_fire_events`.
Expand Down Expand Up @@ -239,7 +282,6 @@ class iouringEngine : public MasterEventEngine, public CascadingEventEngine, pub
}
return 0;
}

int add_interest(Event e) override {
auto* sqe = _get_sqe();
if (sqe == nullptr)
Expand All @@ -261,9 +303,8 @@ class iouringEngine : public MasterEventEngine, public CascadingEventEngine, pub
}
io_uring_sqe_set_data(sqe, &pair.first->second.io_ctx);
int ret = io_uring_submit(m_ring);
if (ret < 0) {
if (ret < 0)
LOG_ERROR_RETURN(-ret, -1, "iouring: fail to submit when adding interest, ", ERRNO(-ret));
}
return 0;
}

Expand All @@ -281,9 +322,8 @@ class iouringEngine : public MasterEventEngine, public CascadingEventEngine, pub
io_uring_prep_poll_remove(sqe, (__u64) &iter->second.io_ctx);
io_uring_sqe_set_data(sqe, nullptr);
int ret = io_uring_submit(m_ring);
if (ret < 0) {
if (ret < 0)
LOG_ERROR_RETURN(-ret, -1, "iouring: fail to submit when removing interest, ", ERRNO(-ret));
}
return 0;
}

Expand Down Expand Up @@ -537,23 +577,17 @@ class iouringEngine : public MasterEventEngine, public CascadingEventEngine, pub
return {sec, nsec};
}

static constexpr const uint32_t REQUIRED_FEATURES[] = {
IORING_FEAT_CUR_PERSONALITY, IORING_FEAT_NODROP,
IORING_FEAT_FAST_POLL, IORING_FEAT_EXT_ARG,
IORING_FEAT_RW_CUR_POS};
static const int QUEUE_DEPTH = 16384;
static const int REGISTER_FILES_SPARSE_FD = -1;
static const int REGISTER_FILES_MAX_NUM = 10000;
bool m_master;
iouring_args m_args;
io_uring* m_ring = nullptr;
int m_eventfd = -1;
std::unordered_map<fdInterest, eventCtx, fdInterestHasher> m_event_contexts;
static int m_register_files_flag;
static int m_cooperative_task_flag;
};

constexpr const uint32_t iouringEngine::REQUIRED_FEATURES[];

int iouringEngine::m_register_files_flag = -1;

int iouringEngine::m_cooperative_task_flag = -1;
Expand Down Expand Up @@ -664,18 +698,17 @@ int iouring_unregister_files(int fd) {
return ee->register_unregister_files(fd, false);
}

__attribute__((noinline))
static iouringEngine* new_iouring(bool is_master) {
LOG_INFO("Init event engine: iouring ", VALUE(is_master));
return NewObj<iouringEngine>(is_master) -> init();
// Creates and initializes an iouringEngine according to `args`.
//
// The result is type-erased to void*, and callers cast it straight back to
// MasterEventEngine* or CascadingEventEngine* depending on `args.is_master`.
// A void* round trip is only well-defined when the pointer is cast back to the
// exact type it was converted from, so the engine is first converted to the
// interface matching the requested role (which applies any base sub-object
// offset) and only then erased to void*.
//
// NOTE(review): presumably `NewObj<...>() -> init(args)` yields a null pointer
// when init() fails — confirm against NewObj. A null engine converts to a null
// base pointer, so the null result is preserved either way.
void* new_iouring_event_engine(iouring_args args) {
    LOG_INFO("Init event engine: iouring ",
        make_named_value("is_master", args.is_master),
        make_named_value("setup_sqpoll", args.setup_sqpoll),
        make_named_value("setup_sq_aff", args.setup_sq_aff),
        make_named_value("sq_thread_cpu", args.sq_thread_cpu));
    auto uring = NewObj<iouringEngine>() -> init(args);
    if (args.is_master) {
        MasterEventEngine* m = uring;
        return m;
    }
    CascadingEventEngine* c = uring;
    return c;
}

MasterEventEngine* new_iouring_master_engine() {
return new_iouring(true);
}

CascadingEventEngine* new_iouring_cascading_engine() {
return new_iouring(false);
}

}
14 changes: 5 additions & 9 deletions io/test/test-iouring.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ limitations under the License.

using namespace photon;

const auto IOURING_FLAGS = INIT_EVENT_IOURING | INIT_EVENT_IOURING_SQPOLL;

// Common parameters
bool stop_test = false;
uint64_t qps = 0;
Expand Down Expand Up @@ -221,7 +223,7 @@ static void do_io_test(IOTestType type) {
off_t max_offset = st_buf.st_size - max_io_size;
ASSERT_GT(max_offset, 0);

photon::WorkPool wp(FLAGS_vcpu_num, photon::INIT_EVENT_DEFAULT, photon::INIT_IO_NONE);
WorkPool wp(FLAGS_vcpu_num, IOURING_FLAGS, INIT_IO_NONE);
std::vector<photon::thread*> join_threads;

#ifdef TEST_IOURING_REGISTER_FILES
Expand Down Expand Up @@ -340,14 +342,8 @@ TEST(perf, DISABLED_read) {
class event_engine : public testing::Test {
protected:
void SetUp() override {
GTEST_ASSERT_EQ(0, photon::init(photon::INIT_EVENT_DEFAULT,
photon::INIT_IO_NONE));
#ifdef PHOTON_URING
engine = (ci_ev_engine == photon::INIT_EVENT_EPOLL) ? photon::new_epoll_cascading_engine()
: photon::new_iouring_cascading_engine();
#else
engine = photon::new_default_cascading_engine();
#endif
GTEST_ASSERT_EQ(0, init(IOURING_FLAGS, INIT_IO_NONE));
engine = photon::new_epoll_cascading_engine();
}
void TearDown() override {
delete engine;
Expand Down
Loading

0 comments on commit a6b533c

Please sign in to comment.