Skip to content

Commit

Permalink
Async: Use dynamically loaded liburing by default on Linux. Fallback …
Browse files Browse the repository at this point in the history
…to epoll if necessary.
  • Loading branch information
Pagghiu committed Feb 14, 2024
1 parent d970016 commit 882336b
Show file tree
Hide file tree
Showing 13 changed files with 900 additions and 413 deletions.
25 changes: 0 additions & 25 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -67,30 +67,5 @@
"command source ${workspaceRoot}/Support/DebugVisualizers/LLDB/.lldbinit"
],
},
{
"name": "SCTest [linux][io_uring] (gdb)",
"type": "cppdbg",
"request": "launch",
"program": "${workspaceFolder}/_Build/Output/Posix-Any-Any-Makefile-Debug/SCTest",
"args": [],
"stopAtEntry": false,
"environment": [],
"externalConsole": false,
"MIMode": "gdb",
"cwd": "${workspaceRoot}",
"preLaunchTask": "Build SCTest Debug [linux][io_uring]",
"setupCommands": [
{
"description": "Enable pretty-printing for gdb",
"text": "-enable-pretty-printing",
"ignoreFailures": true
},
{
"description": "Set Disassembly Flavor to Intel",
"text": "-gdb-set disassembly-flavor intel",
"ignoreFailures": true
}
]
},
]
}
21 changes: 0 additions & 21 deletions .vscode/tasks.json
Original file line number Diff line number Diff line change
Expand Up @@ -41,27 +41,6 @@
"isDefault": true
}
},
{
"label": "Build SCTest Debug [linux][io_uring]",
"type": "shell",
"linux": {
"command": "make -j -C _Build/Projects/Make CPPFLAGS=-DSC_ASYNC_USE_IO_URING=1 LDFLAGS=-luring"
},
"problemMatcher": [
"$gcc"
],
"presentation": {
"echo": true,
"reveal": "never",
"focus": false,
"panel": "shared",
"showReuseMessage": true,
"clear": false
},
"group": {
"kind": "build",
}
},
{
"label": "Build SCTest Debug ARM64",
"type": "shell",
Expand Down
3 changes: 2 additions & 1 deletion Documentation/Libraries/Async.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,8 @@ Event loop can be run in different ways to allow integrated it in multiple ways
Library abstracts async operations by exposing a completion based mechanism.
This mechanism currently maps on `kqueue` on macOS and `OVERLAPPED` on Windows.

It currently uses by default `epoll` on Linux but an experimental `io_uring` backend can be enabled by defining `SC_ASYNC_USE_IO_URING=1` and linking `liburing` (with `-luring`).
On Linux it first tries to dynamically load `liburing` in order to use the `io_uring` backend, falling back to the `epoll` backend in case `liburing` is not available on the system.
There is no need to link `liburing`, because the library loads it dynamically and embeds the minimal set of `static` `inline` functions needed to interface with it.

The api works on file and socket descriptors, that can be obtained from the [File](@ref library_file) and [Socket](@ref library_socket) libraries.

Expand Down
51 changes: 27 additions & 24 deletions Libraries/Async/Async.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,9 @@
#if SC_PLATFORM_WINDOWS
#include "Internal/AsyncWindows.inl"
#elif SC_PLATFORM_APPLE
#define SC_ASYNC_USE_EPOLL 0 // uses kqueue
#include "Internal/AsyncPosix.inl"
#elif SC_PLATFORM_LINUX
#if SC_ASYNC_USE_IO_URING
#include "Internal/AsyncLinux.inl" // uses io_uring
#else
#define SC_ASYNC_USE_EPOLL 1 // uses epoll
#include "Internal/AsyncPosix.inl"
#endif
#elif SC_PLATFORM_EMSCRIPTEN
#include "Internal/AsyncEmscripten.inl"
#endif
Expand Down Expand Up @@ -247,10 +241,10 @@ void SC::AsyncEventLoop::invokeExpiredTimers()
}
}

SC::Result SC::AsyncEventLoop::create()
SC::Result SC::AsyncEventLoop::create(Options options)
{
Internal& self = internal.get();
SC_TRY(self.createEventLoop());
SC_TRY(self.createEventLoop(options));
SC_TRY(self.createSharedWatchers(*this));
return SC::Result(true);
}
Expand Down Expand Up @@ -334,7 +328,7 @@ SC::Result SC::AsyncEventLoop::completeAndEventuallyReactivate(KernelQueue& queu

SC::Result SC::AsyncEventLoop::runStep(SyncMode syncMode)
{
KernelQueue queue;
KernelQueue queue(internal.get());
SC_LOG_MESSAGE("---------------\n");

while (AsyncRequest* async = submissions.dequeueFront())
Expand Down Expand Up @@ -387,15 +381,20 @@ void SC::AsyncEventLoop::runStepExecuteManualCompletions(KernelQueue& queue)

void SC::AsyncEventLoop::runStepExecuteCompletions(KernelQueue& queue)
{
Internal& self = internal.get();
for (decltype(KernelQueue::newEvents) idx = 0; idx < queue.newEvents; ++idx)
for (uint32_t idx = 0; idx < queue.getNumEvents(); ++idx)
{
SC_LOG_MESSAGE(" Iteration = {}\n", (int)idx);
SC_LOG_MESSAGE(" Iteration = {}\n", idx);
SC_LOG_MESSAGE(" Active Requests = {}\n", getTotalNumberOfActiveHandle());
bool continueProcessing = true;

Result result = Result(queue.validateEvent(queue.events[idx], continueProcessing));
AsyncRequest& async = *self.getAsyncRequest(queue.events[idx]);
AsyncRequest* request = queue.getAsyncRequest(idx);
if (request == nullptr)
{
continue;
}

AsyncRequest& async = *request;
Result result = Result(queue.validateEvent(idx, continueProcessing));
if (not result)
{
reportError(queue, async, move(result));
Expand All @@ -406,7 +405,7 @@ void SC::AsyncEventLoop::runStepExecuteCompletions(KernelQueue& queue)
{
continue;
}
async.eventIndex = idx;
async.eventIndex = static_cast<int32_t>(idx);
if (async.state == AsyncRequest::State::Active)
{
if (not completeAndEventuallyReactivate(queue, async, move(result)))
Expand Down Expand Up @@ -562,7 +561,7 @@ void SC::AsyncEventLoop::updateTime() { loopTime.snap(); }

void SC::AsyncEventLoop::executeTimers(KernelQueue& queue, const Time::HighResolutionCounter& nextTimer)
{
const bool timeoutOccurredWithoutIO = queue.newEvents == 0;
const bool timeoutOccurredWithoutIO = queue.getNumEvents() == 0;
const bool timeoutWasAlreadyExpired = loopTime.isLaterThanOrEqualTo(nextTimer);
if (timeoutOccurredWithoutIO or timeoutWasAlreadyExpired)
{
Expand Down Expand Up @@ -638,14 +637,6 @@ void SC::AsyncEventLoop::scheduleManualCompletion(AsyncRequest& async)
async.state = backupState;
}

#if SC_PLATFORM_WINDOWS
SC::Result SC::AsyncEventLoop::getLoopFileDescriptor(SC::FileDescriptor::Handle& fileDescriptor) const
{
return internal.get().loopFd.get(fileDescriptor,
SC::Result::Error("AsyncEventLoop::getLoopFileDescriptor invalid handle"));
}
#endif

SC::Result SC::AsyncEventLoop::createAsyncTCPSocket(SocketFlags::AddressFamily family, SocketDescriptor& outDescriptor)
{
auto res = outDescriptor.create(family, SocketFlags::SocketStream, SocketFlags::ProtocolTcp,
Expand All @@ -654,4 +645,16 @@ SC::Result SC::AsyncEventLoop::createAsyncTCPSocket(SocketFlags::AddressFamily f
return associateExternallyCreatedTCPSocket(outDescriptor);
}

SC::Result SC::AsyncEventLoop::wakeUpFromExternalThread()
{
    // The wake-up is delegated to the Internal object of the event loop.
    Internal& self = internal.get();
    return self.wakeUpFromExternalThread();
}

SC::Result SC::AsyncEventLoop::associateExternallyCreatedTCPSocket(SocketDescriptor& outDescriptor)
{
    // The association is delegated to the Internal object of the event loop.
    Internal& self = internal.get();
    return self.associateExternallyCreatedTCPSocket(outDescriptor);
}

SC::Result SC::AsyncEventLoop::associateExternallyCreatedFileDescriptor(FileDescriptor& outDescriptor)
{
    // The association is delegated to the Internal object of the event loop.
    Internal& self = internal.get();
    return self.associateExternallyCreatedFileDescriptor(outDescriptor);
}

// Wakes up the event loop returned by getEventLoop(), passing this AsyncLoopWakeUp to
// its wakeUpFromExternalThread overload.
// NOTE(review): the result of getEventLoop() is dereferenced without a check; confirm it cannot be null here.
SC::Result SC::AsyncLoopWakeUp::wakeUp() { return getEventLoop()->wakeUpFromExternalThread(*this); }
56 changes: 39 additions & 17 deletions Libraries/Async/Async.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,9 @@
/// - `IOCP` on Windows
/// - `kqueue` on macOS
/// - `epoll` on Linux
/// - `io_uring` on Linux (experimental).
///
/// Note: To enable `io_uring` backend set `SC_ASYNC_USE_IO_URING=1` and link `liburing` (`-luring`).
/// - `io_uring` on Linux (dynamically loading `liburing`)
///
/// @note If `liburing` is not available on the system, the library will transparently fall back to `epoll`.
namespace SC
{
struct EventObject;
Expand Down Expand Up @@ -118,7 +117,7 @@ struct SC::AsyncRequest
#if SC_CONFIGURATION_DEBUG
void setDebugName(const char* newDebugName) { debugName = newDebugName; }
#else
void setDebugName(const char* newDebugName) { SC_COMPILER_UNUSED(newDebugName); }
void setDebugName(const char* newDebugName) { SC_COMPILER_UNUSED(newDebugName); }
#endif

/// @brief Get the event loop associated with this AsyncRequest
Expand Down Expand Up @@ -322,7 +321,7 @@ struct AsyncProcessExit : public AsyncRequest
detail::WinOverlappedOpaque overlapped;
detail::WinWaitHandle waitHandle;
#elif SC_PLATFORM_LINUX
FileDescriptor pidFd;
FileDescriptor pidFd;
#endif
};

Expand Down Expand Up @@ -568,7 +567,7 @@ struct AsyncFileRead : public AsyncRequest
#if SC_PLATFORM_WINDOWS
detail::WinOverlappedOpaque overlapped;
#elif SC_PLATFORM_LINUX
size_t syncReadBytes = 0;
size_t syncReadBytes = 0;
#endif
};

Expand Down Expand Up @@ -613,7 +612,7 @@ struct AsyncFileWrite : public AsyncRequest
#if SC_PLATFORM_WINDOWS
detail::WinOverlappedOpaque overlapped;
#elif SC_PLATFORM_LINUX
size_t syncWrittenBytes = 0;
size_t syncWrittenBytes = 0;
#endif
};

Expand All @@ -639,8 +638,8 @@ struct AsyncFileClose : public AsyncRequest
};

/// @brief Starts a handle polling operation.
/// Uses `GetOverlappedResult` (windows), `kevent` (macOS), `epoll` (Linux) and `io_uring` (Linux).
/// Callback will be called when any of the three API signals readiness events on the given file descriptor.
/// Uses `GetOverlappedResult` (windows), `kevent` (macOS), `epoll` (Linux) and `io_uring` (Linux).
/// Callback will be called when any of these APIs signals readiness events on the given file descriptor.
/// Check @ref library_file_system_watcher for an example usage of this notification.
struct AsyncFilePoll : public AsyncRequest
{
Expand Down Expand Up @@ -675,8 +674,20 @@ struct AsyncFilePoll : public AsyncRequest
/// \snippet Libraries/Async/Tests/AsyncTest.cpp AsyncEventLoopSnippet
struct SC::AsyncEventLoop
{
/// @brief Options passed to AsyncEventLoop::create
struct Options
{
    /// @brief Backend API requested for the event loop
    enum class ApiType
    {
        Automatic = 0,   ///< Default value set by the Options constructor
        ForceUseIOURing, ///< Only valid for Linux
        ForceUseEpoll,   ///< Only valid for Linux
    };

    ApiType apiType; ///< Requested backend API

    /// @brief Builds options requesting ApiType::Automatic
    Options() : apiType(ApiType::Automatic) {}
};
/// Creates the event loop kernel object
[[nodiscard]] Result create();
[[nodiscard]] Result create(Options options = Options());

/// Closes the event loop kernel object
[[nodiscard]] Result close();
Expand Down Expand Up @@ -717,11 +728,6 @@ struct SC::AsyncEventLoop
/// Associates a File descriptor created externally with the eventLoop.
[[nodiscard]] Result associateExternallyCreatedFileDescriptor(FileDescriptor& outDescriptor);

#if SC_PLATFORM_WINDOWS
/// Returns handle to the kernel level IO queue object.
/// Used by external systems calling OS async function themselves (FileSystemWatcher on windows for example)
[[nodiscard]] Result getLoopFileDescriptor(FileDescriptor::Handle& fileDescriptor) const;
#endif
/// Get Loop time
[[nodiscard]] Time::HighResolutionCounter getLoopTime() const { return loopTime; }

Expand All @@ -740,14 +746,30 @@ struct SC::AsyncEventLoop

Time::HighResolutionCounter loopTime;

#if SC_PLATFORM_LINUX
struct InternalPosix;
struct KernelQueuePosix;
struct InternalIoURing;
struct KernelQueueIoURing;
struct Internal;
struct KernelQueue;

#elif SC_PLATFORM_APPLE
struct InternalPosix;
struct KernelQueuePosix;
using Internal = InternalPosix;
using KernelQueue = KernelQueuePosix;
#elif SC_PLATFORM_WINDOWS
struct Internal;
struct KernelQueue;
#else
struct Internal;
struct KernelQueue;
#endif
struct InternalDefinition
{
static constexpr int Windows = 160;
static constexpr int Apple = 88;
static constexpr int Default = 304;
static constexpr int Default = 336;

static constexpr size_t Alignment = alignof(void*);

Expand Down
26 changes: 12 additions & 14 deletions Libraries/Async/Internal/AsyncEmscripten.inl
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,25 @@

struct SC::AsyncEventLoop::Internal
{
FileDescriptor loopFd;

~Internal() { SC_TRUST_RESULT(close()); }
[[nodiscard]] Result close() { return loopFd.close(); }
[[nodiscard]] Result createEventLoop() { return Result(true); }
[[nodiscard]] Result close() { return Result(true); }
[[nodiscard]] Result createEventLoop(AsyncEventLoop::Options) { return Result(true); }
[[nodiscard]] Result createSharedWatchers(AsyncEventLoop&) { return Result(true); }

[[nodiscard]] AsyncRequest* getAsyncRequest(const int&) const { return nullptr; }
[[nodiscard]] Result wakeUpFromExternalThread() { return Result(true); }
[[nodiscard]] Result associateExternallyCreatedTCPSocket(SocketDescriptor&) { return Result(true); }
[[nodiscard]] Result associateExternallyCreatedFileDescriptor(FileDescriptor&) { return Result(true); }
};

struct SC::AsyncEventLoop::KernelQueue
{
int newEvents = 0;
int events[1] = {0};
KernelQueue(Internal&) {}
uint32_t getNumEvents() const { return 0; }

[[nodiscard]] Result syncWithKernel(AsyncEventLoop&, SyncMode) { return Result(true); }
[[nodiscard]] Result validateEvent(uint32_t, bool&) { return Result(true); }

[[nodiscard]] AsyncRequest* getAsyncRequest(uint32_t) const { return nullptr; }

[[nodiscard]] Result syncWithKernel(AsyncEventLoop&, SyncMode) { return Result(false); }
[[nodiscard]] Result validateEvent(int&, bool&) { return Result(true); }
// clang-format off
template <typename T> [[nodiscard]] bool setupAsync(T&) { return true; }
template <typename T> [[nodiscard]] bool teardownAsync(T&) { return true; }
Expand All @@ -29,7 +31,3 @@ struct SC::AsyncEventLoop::KernelQueue
template <typename T> [[nodiscard]] bool cancelAsync(T&) { return true; }
// clang-format on
};

SC::Result SC::AsyncEventLoop::wakeUpFromExternalThread() { return Result(true); }
SC::Result SC::AsyncEventLoop::associateExternallyCreatedTCPSocket(SocketDescriptor&) { return Result(true); }
SC::Result SC::AsyncEventLoop::associateExternallyCreatedFileDescriptor(FileDescriptor&) { return Result(true); }
Loading

0 comments on commit 882336b

Please sign in to comment.