From ca8f2196823fde45178d97a63dbb33472b42f64c Mon Sep 17 00:00:00 2001 From: Guest0x0 Date: Wed, 3 Dec 2025 15:27:47 +0800 Subject: [PATCH] refactor thread pool, make it extensible --- src/fs/dir.mbt | 64 +- src/fs/dir_test.mbt | 8 +- src/fs/fs.mbt | 56 +- src/fs/moon.pkg.json | 1 + src/fs/pkg.generated.mbti | 10 - src/fs/seek_wbtest.mbt | 54 -- src/fs/stub.c | 12 +- src/internal/c_buffer/c_buffer.mbt | 21 + src/internal/c_buffer/pkg.generated.mbti | 5 + src/internal/c_buffer/stub.c | 4 + src/internal/event_loop/event_loop.mbt | 275 +++++- src/internal/event_loop/fs.mbt | 286 +++--- src/internal/event_loop/misc_stub.c | 8 + src/internal/event_loop/network.mbt | 30 - src/internal/event_loop/pkg.generated.mbti | 20 +- src/internal/event_loop/thread_pool.c | 832 ++---------------- src/internal/event_loop/thread_pool.mbt | 155 +--- .../{worker_wbtest.mbt => worker_test.mbt} | 8 +- src/os_error/error.mbt | 3 + src/os_error/pkg.generated.mbti | 2 + src/os_error/stub.c | 4 + src/socket/addr.mbt | 60 +- src/socket/happy_eyeball.mbt | 7 +- src/socket/socket.c | 15 + 24 files changed, 716 insertions(+), 1224 deletions(-) delete mode 100644 src/fs/seek_wbtest.mbt rename src/internal/event_loop/{worker_wbtest.mbt => worker_test.mbt} (93%) diff --git a/src/fs/dir.mbt b/src/fs/dir.mbt index 403aa03d..dc991080 100644 --- a/src/fs/dir.mbt +++ b/src/fs/dir.mbt @@ -44,7 +44,12 @@ pub async fn rmdir(path : StringView, recursive? : Bool = false) -> Unit { ///| /// A directory in file system -struct Directory(@event_loop.Directory) +#external +type Directory + +///| +#external +priv type DirectoryEntry ///| extern "C" fn Directory::close_ffi(self : Directory) -> Int = "closedir" @@ -54,29 +59,68 @@ pub fn Directory::close(self : Directory) -> Unit { guard self.close_ffi() == 0 } +///| +#borrow(path) +extern "C" fn opendir_ffi(path : Bytes) -> Directory = "opendir" + +///| +async fn opendir_aux(path : StringView, context~ : String) -> Directory { + struct Job { + path : Bytes + mut result : Directory + mut err : Int + } + let path_bytes = @encoding/utf8.encode(path) + let job : Job = { path: path_bytes, result: @c_buffer.null_ptr(), err: 0 } + @event_loop.perform_job_in_worker(job, job => { + job.result = opendir_ffi(job.path) + job.err = @os_error.get_errno() + }) catch { + err => { + if !@c_buffer.ptr_is_null(job.result) { + ignore(job.result.close_ffi()) + } + raise err + } + } + if @c_buffer.ptr_is_null(job.result) { + raise @os_error.OSError(job.err, context="\{context}: \{repr(path)}") + } + job.result +} + ///| /// Open the directory at `path`. `path` is encoded UTF8. /// If `path` is not a directory, an error will be raised pub async fn opendir(path : StringView) -> Directory { - @event_loop.opendir(path, context="@fs.opendir()") + opendir_aux(path, context="@fs.opendir()") } -///| -using @event_loop {type DirectoryEntry} - ///| extern "C" fn DirectoryEntry::name(ent : DirectoryEntry) -> Bytes = "moonbitlang_async_dirent_name" ///| -extern "C" fn DirectoryEntry::is_null(ent : DirectoryEntry) -> Bool = "moonbitlang_async_dirent_is_null" +extern "C" fn readdir_ffi(dir : Directory) -> DirectoryEntry = "readdir" ///| async fn Directory::read_next(dir : Directory) -> String? { - let dir_ent = @event_loop.readdir(dir.0, context="@fs.readdir()") - if dir_ent.is_null() { + struct Job { + dir : Directory + mut result : DirectoryEntry + mut err : Int + } + let job : Job = { dir, result: @c_buffer.null_ptr(), err: 0 } + @event_loop.perform_job_in_worker(job, job => { + @os_error.clear_errno() + job.result = readdir_ffi(job.dir) + job.err = @os_error.get_errno() + }) + if job.err != 0 { + raise @os_error.OSError(job.err, context="@fs.readdir()") + } else if @c_buffer.ptr_is_null(job.result) { None } else { - Some(@encoding/utf8.decode(dir_ent.name())) + Some(@encoding/utf8.decode(job.result.name())) } } @@ -119,7 +163,7 @@ pub async fn readdir( include_special? : Bool = false, sort? : Bool = false, ) -> Array[String] { - let dir : Directory = @event_loop.opendir(path, context="@fs.readdir()") + let dir : Directory = opendir_aux(path, context="@fs.readdir()") defer dir.close() let list = dir.read_all(include_hidden~, include_special~) if sort { diff --git a/src/fs/dir_test.mbt b/src/fs/dir_test.mbt index e20dbfb4..a5da9434 100644 --- a/src/fs/dir_test.mbt +++ b/src/fs/dir_test.mbt @@ -23,7 +23,7 @@ async test "read_all" { @json.inspect(list, content=[ "fs.mbt", "stub.c", "dir.mbt", "README.md", "utils.mbt", "dir_test.mbt", "eof_test.mbt", "README.mbt.md", "constants.mbt", "moon.pkg.json", "stat_test.mbt", "walk_test.mbt", - "mkdir_test.mbt", "access_test.mbt", "create_test.mbt", "seek_wbtest.mbt", "read_all_test.mbt", + "mkdir_test.mbt", "access_test.mbt", "create_test.mbt", "read_all_test.mbt", "realpath_test.mbt", "unimplemented.mbt", "pkg.generated.mbti", "text_file_test.mbt", "timestamp_test.mbt", "random_access_test.mbt", ]) @@ -55,8 +55,8 @@ async test "as_dir" { "utils.mbt: Regular", "dir_test.mbt: Regular", "eof_test.mbt: Regular", "README.mbt.md: Regular", "constants.mbt: Regular", "moon.pkg.json: Regular", "stat_test.mbt: Regular", "walk_test.mbt: Regular", "mkdir_test.mbt: Regular", "access_test.mbt: Regular", - "create_test.mbt: Regular", "seek_wbtest.mbt: Regular", "read_all_test.mbt: Regular", - "realpath_test.mbt: Regular", "unimplemented.mbt: Regular", "pkg.generated.mbti: Regular", - "text_file_test.mbt: Regular", "timestamp_test.mbt: Regular", "random_access_test.mbt: Regular", + "create_test.mbt: Regular", "read_all_test.mbt: Regular", "realpath_test.mbt: Regular", + "unimplemented.mbt: Regular", "pkg.generated.mbti: Regular", "text_file_test.mbt: Regular", + "timestamp_test.mbt: Regular", "random_access_test.mbt: Regular", ]) } diff --git a/src/fs/fs.mbt b/src/fs/fs.mbt index 2808d2dd..c5ada044 100644 --- a/src/fs/fs.mbt +++ b/src/fs/fs.mbt @@ -239,38 +239,6 @@ pub async fn remove(path : StringView) -> Unit { @event_loop.remove(path, context="@fs.remove()") } -///| -/// Determine how an offset is interpreted when seeking in a file: -/// - `FromStart`: absolute offset from the start of the file -/// - `FromEnd`: offset is relative to end of file -/// - `Relative`: offset is relative to current position in the file -pub(all) enum SeekMode { - FromStart = 0 - FromEnd - Relative -} - -///| -fn SeekMode::to_int(self : SeekMode) -> Int = "%identity" - -///| -/// Change current position in file for reading and writing. -/// Can only be applied to a regular file, otherwise `seek` will fail. -/// The offset is interpreted using `mode`, see `SeekMode` for more detail. -/// Current position in the file after seeking (relative to start of file) -/// will be returned. -#deprecated("use `read_at` or `write_at` instead", skip_current_package=true) -pub async fn File::seek(self : File, offset : Int64, mode~ : SeekMode) -> Int64 { - self.io.seek(offset, mode.to_int(), context="@fs.File::seek()") -} - -///| -/// Get current position in the file. Can only be applied to a regular file. -#deprecated("use `read_at` or `write_at` instead", skip_current_package=true) -pub async fn File::curr_pos(self : File) -> Int64 { - self.seek(0, mode=Relative) -} - ///| /// Get the size of the file. This method will not change position in the file. /// Can only be applied to a regular file. @@ -281,9 +249,6 @@ pub async fn File::size(self : File) -> Int64 { ///| extern "C" fn as_dir_ffi(fd : Int) -> Directory = "fdopendir" -///| -extern "C" fn Directory::is_null(dir : Directory) -> Bool = "moonbitlang_async_dir_is_null" - ///| /// Convert a file to directory. /// If the file is not a directory, an error will be raised. @@ -295,7 +260,7 @@ extern "C" fn Directory::is_null(dir : Directory) -> Bool = "moonbitlang_async_d pub fn File::as_dir(self : File) -> Directory raise { let fd = self.io.detach_from_event_loop() let dir = as_dir_ffi(fd) - if dir.is_null() { + if @c_buffer.ptr_is_null(dir) { let context = "@fs.File::as_dir()" @fd_util.close(fd, context~) @os_error.check_errno(context) @@ -406,6 +371,10 @@ pub async fn symlink(target~ : StringView, path : StringView) -> Unit { @event_loop.symlink(target, path, context="@fs.link()") } +///| +#borrow(path) +extern "C" fn chmod_ffi(path : Bytes, mode : Int) -> Int = "moonbitlang_async_chmod" + ///| /// Change the permission of a file. /// Permission is represented as an integer in UNIX permission style. @@ -414,5 +383,18 @@ pub async fn symlink(target~ : StringView, path : StringView) -> Unit { /// - users in the owner group of the file can read the file (`4`) /// - other users can do nothing to the file pub async fn chmod(path : StringView, mode : Int) -> Unit { - @event_loop.chmod(path, mode, context="@fs.chmod()") + struct Job { + path : Bytes + mode : Int + mut err : Int + } + let path_bytes = @encoding/utf8.encode(path) + let job : Job = { path: path_bytes, mode, err: 0 } + @event_loop.perform_job_in_worker(job, job => if chmod_ffi(job.path, job.mode) < + 0 { + job.err = @os_error.get_errno() + }) + if job.err != 0 { + raise @os_error.OSError(job.err, context="@fs.chmod(\{repr(path)})") + } } diff --git a/src/fs/moon.pkg.json b/src/fs/moon.pkg.json index f6a5a94d..735f293a 100644 --- a/src/fs/moon.pkg.json +++ b/src/fs/moon.pkg.json @@ -4,6 +4,7 @@ "moonbitlang/async/io", "moonbitlang/async/internal/event_loop", "moonbitlang/async/internal/fd_util", + "moonbitlang/async/internal/c_buffer", "moonbitlang/async", "moonbitlang/async/semaphore" ], diff --git a/src/fs/pkg.generated.mbti b/src/fs/pkg.generated.mbti index dd7be7a2..4e7d51ec 100644 --- a/src/fs/pkg.generated.mbti +++ b/src/fs/pkg.generated.mbti @@ -66,15 +66,11 @@ pub fn File::as_dir(Self) -> Directory raise pub async fn File::atime(Self) -> (Int64, Int) pub fn File::close(Self) -> Unit pub async fn File::ctime(Self) -> (Int64, Int) -#deprecated -pub async fn File::curr_pos(Self) -> Int64 pub fn File::fd(Self) -> Int pub fn File::kind(Self) -> FileKind pub async fn File::mtime(Self) -> (Int64, Int) pub async fn File::read_at(Self, FixedArray[Byte], position~ : Int64, offset? : Int, len? : Int) -> Int pub async fn File::read_exactly_at(Self, Int, position~ : Int64) -> Bytes -#deprecated -pub async fn File::seek(Self, Int64, mode~ : SeekMode) -> Int64 pub async fn File::size(Self) -> Int64 pub async fn File::sync(Self, only_data? : Bool) -> Unit pub async fn File::write_at(Self, BytesView, position~ : Int64) -> Unit @@ -100,12 +96,6 @@ pub(all) enum Mode { ReadWrite } -pub(all) enum SeekMode { - FromStart - FromEnd - Relative -} - pub(all) enum SyncMode { NoSync Data diff --git a/src/fs/seek_wbtest.mbt b/src/fs/seek_wbtest.mbt deleted file mode 100644 index 8474df95..00000000 --- a/src/fs/seek_wbtest.mbt +++ /dev/null @@ -1,54 +0,0 @@ -// Copyright 2025 International Digital Economy Academy -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -///| -async test "basic seek" { - let path = "target/basic_seek_test" - // initialize - { - let w = create(path, permission=0o644, sync=Full) - defer w.close() - inspect(w.size(), content="0") - w.write(b"abcdef") - inspect(w.size(), content="6") - } - // first read - { - let r = open(path, mode=ReadOnly) - defer r.close() - inspect(r.size(), content="6") - inspect(r.read_all().text(), content="abcdef") - } - // update content - { - let w = open(path, mode=WriteOnly) - defer w.close() - ignore(w.seek(-2, mode=FromEnd)) - inspect(w.curr_pos(), content="4") - w.write(b"56") - ignore(w.seek(2, mode=FromStart)) - inspect(w.curr_pos(), content="2") - w.write(b"34") - ignore(w.seek(-4, mode=Relative)) - inspect(w.curr_pos(), content="0") - w.write(b"12") - } - // read content - { - let r = open(path, mode=ReadOnly) - defer r.close() - inspect(r.read_all().text(), content="123456") - } - remove(path) -} diff --git a/src/fs/stub.c b/src/fs/stub.c index 80886593..93f5b815 100644 --- a/src/fs/stub.c +++ b/src/fs/stub.c @@ -23,10 +23,6 @@ #include #include -int moonbitlang_async_dir_is_null(DIR *dir) { - return dir == 0; -} - moonbit_bytes_t moonbitlang_async_dirent_name(struct dirent *dirent) { int len = strlen(dirent->d_name); moonbit_bytes_t result = moonbit_make_bytes(len, 0); @@ -34,10 +30,6 @@ moonbit_bytes_t moonbitlang_async_dirent_name(struct dirent *dirent) { return result; } -int moonbitlang_async_dirent_is_null(struct dirent *dirent) { - return dirent == 0; -} - int moonbitlang_async_get_R_OK() { return R_OK; } @@ -85,3 +77,7 @@ int moonbitlang_async_get_O_TRUNC() { int moonbitlang_async_get_O_CREAT() { return O_CREAT; } + +int moonbitlang_async_chmod(char *path, int32_t mode) { + return chmod(path, mode); +} diff --git a/src/internal/c_buffer/c_buffer.mbt b/src/internal/c_buffer/c_buffer.mbt index 17a4b899..f4620122 100644 --- a/src/internal/c_buffer/c_buffer.mbt +++ b/src/internal/c_buffer/c_buffer.mbt @@ -48,5 +48,26 @@ extern "C" fn Buffer::null() -> Buffer = "moonbitlang_async_null_pointer" ///| pub let null : Buffer = Buffer::null() +///| +#borrow(buf) +pub extern "C" fn Buffer::is_null(buf : Buffer) -> Bool = "moonbitlang_async_is_null" + ///| pub extern "C" fn Buffer::free(buf : Buffer) = "free" + +///| +fn[X, Y] unsafe_cast(x : X) -> Y = "%identity" + +///| +/// Get a null C pointer. +/// WARNING: the result type MUST be a foreign pointer type declared with `#external` +pub fn[X] null_ptr() -> X { + unsafe_cast(null) +} + +///| +/// Check if a C pointer is null. +/// WARNING: the parameter MUST be a foreign pointer type declared with `#external` +pub fn[X] ptr_is_null(ptr : X) -> Bool { + Buffer::is_null(unsafe_cast(ptr)) +} diff --git a/src/internal/c_buffer/pkg.generated.mbti b/src/internal/c_buffer/pkg.generated.mbti index 00c844c3..d5cb98b4 100644 --- a/src/internal/c_buffer/pkg.generated.mbti +++ b/src/internal/c_buffer/pkg.generated.mbti @@ -4,6 +4,10 @@ package "moonbitlang/async/internal/c_buffer" // Values pub let null : Buffer +pub fn[X] null_ptr() -> X + +pub fn[X] ptr_is_null(X) -> Bool + // Errors // Types and methods @@ -14,6 +18,7 @@ pub fn Buffer::blit_to_bytes(Self, dst~ : FixedArray[Byte], offset~ : Int, len~ pub fn Buffer::free(Self) -> Unit #alias("_[_]") pub fn Buffer::get(Self, Int) -> Byte +pub fn Buffer::is_null(Self) -> Bool pub fn Buffer::strlen(Self) -> Int // Type aliases diff --git a/src/internal/c_buffer/stub.c b/src/internal/c_buffer/stub.c index bd8b9fd8..e60f6088 100644 --- a/src/internal/c_buffer/stub.c +++ b/src/internal/c_buffer/stub.c @@ -37,3 +37,7 @@ int32_t moonbitlang_async_strlen(char *str) { char *moonbitlang_async_null_pointer() { return 0; } + +int32_t moonbitlang_async_is_null(void *ptr) { + return ptr == 0; +} diff --git a/src/internal/event_loop/event_loop.mbt b/src/internal/event_loop/event_loop.mbt index 2a44df21..8959b8fd 100644 --- a/src/internal/event_loop/event_loop.mbt +++ b/src/internal/event_loop/event_loop.mbt @@ -19,13 +19,21 @@ priv struct EventLoop { pids : Map[Int, @coroutine.Coroutine] notify_recv : Int max_worker_count : Int - job_queue : @deque.Deque[Job] + mut job_id : Int + job_queue : @deque.Deque[QueuedJob] idle_workers : @deque.Deque[Worker] running_workers : Map[Int, Worker] jobs : Map[Int, @coroutine.Coroutine] timers : @sorted_set.SortedSet[Timer] } +///| +priv struct QueuedJob { + id : Int + data : JobData + worker : JobWorkerFunc +} + ///| struct IoHandle { mut fd : Int @@ -82,6 +90,7 @@ pub fn with_event_loop(f : async () -> Unit) -> Unit raise { pids: {}, notify_recv, max_worker_count: 1024, + job_id: 0, job_queue: @deque.new(), idle_workers: @deque.new(), running_workers: {}, @@ -142,8 +151,8 @@ fn EventLoop::poll(self : Self) -> Unit raise { match self.job_queue.pop_front() { None => self.idle_workers.push_back(worker) Some(job) => { - self.running_workers[job.id()] = worker - wake_worker(worker, job) + self.running_workers[job.id] = worker + wake_worker(worker, job.id, job.data, job.worker) } } if self.jobs.get(job_id) is Some(coro) { @@ -335,36 +344,50 @@ pub fn IoHandle::close(handle : IoHandle) -> Unit { } ///| -async fn perform_job_in_worker( - job : Job, - context~ : String, +fn[X, Y] unsafe_cast(x : X) -> Y = "%identity" + +///| +/// Run an arbitrary, blocking or time-consuming job in a worker thread, +/// so that it won't block the main thread. +/// This function is VERY DANGEROUS and must be USED WITH CARE: +/// +/// - the job payload type `X` must be a boxed MoonBit object type +/// - the job worker function MUST NOT: +/// * call other regular MoonBit function or closure +/// * call C stub function with `#owned` function +pub async fn[X] perform_job_in_worker( + job_data : X, + job_worker : FuncRef[(X) -> Unit], allow_cancel? : Bool = false, -) -> Int { +) -> Unit { guard curr_loop.val is Some(evloop) + let job_id = evloop.job_id + evloop.job_id += 1 + let job_data : JobData = unsafe_cast(job_data) + let job_worker : JobWorkerFunc = unsafe_cast(job_worker) match evloop.idle_workers.pop_front() { None if evloop.running_workers.length() > evloop.max_worker_count => - evloop.job_queue.push_back(job) + evloop.job_queue.push_back({ + id: job_id, + data: job_data, + worker: job_worker, + }) None => { - let worker = spawn_worker(job) - evloop.running_workers[job.id()] = worker + let worker = spawn_worker(job_id, job_data, job_worker) + evloop.running_workers[job_id] = worker } Some(worker) => { - evloop.running_workers[job.id()] = worker - wake_worker(worker, job) + evloop.running_workers[job_id] = worker + wake_worker(worker, job_id, job_data, job_worker) } } - evloop.jobs[job.id()] = @coroutine.current_coroutine() - defer evloop.jobs.remove(job.id()) + evloop.jobs[job_id] = @coroutine.current_coroutine() + defer evloop.jobs.remove(job_id) if allow_cancel { @coroutine.suspend() } else { @coroutine.protect_from_cancel(@coroutine.suspend) } - if job.err() > 0 { - raise @os_error.OSError(job.err(), context~) - } else { - job.ret() - } } ///| @@ -376,6 +399,24 @@ extern "C" fn read_ffi( len~ : Int, ) -> Int = "moonbitlang_async_read" +///| +priv struct ReadJob { + fd : Int + buf : FixedArray[Byte] + offset : Int + len : Int + mut ret : Int + mut err : Int +} + +///| +let read_job_worker : FuncRef[(ReadJob) -> Unit] = job => { + job.ret = read_ffi(job.fd, job.buf, offset=job.offset, len=job.len) + if job.ret < 0 { + job.err = @os_error.get_errno() + } +} + ///| pub async fn IoHandle::read( handle : IoHandle, @@ -402,8 +443,47 @@ pub async fn IoHandle::read( } ret } else { - let job = Job::read(handle.fd, buf, offset, len, position=-1) - perform_job_in_worker(job, context~) + let job : ReadJob = { fd: handle.fd, buf, offset, len, ret: 0, err: 0 } + perform_job_in_worker(job, read_job_worker) + if job.ret < 0 { + raise @os_error.OSError(job.err, context~) + } + job.ret + } +} + +///| +#borrow(buf) +extern "C" fn pread_ffi( + fd : Int, + buf : FixedArray[Byte], + offset~ : Int, + len~ : Int, + position~ : Int64, +) -> Int = "moonbitlang_async_pread" + +///| +priv struct ReadAtJob { + fd : Int + buf : FixedArray[Byte] + offset : Int + len : Int + position : Int64 + mut ret : Int + mut err : Int +} + +///| +let read_at_job_worker : FuncRef[(ReadAtJob) -> Unit] = job => { + job.ret = pread_ffi( + job.fd, + job.buf, + offset=job.offset, + len=job.len, + position=job.position, + ) + if job.ret < 0 { + job.err = @os_error.get_errno() } } @@ -417,8 +497,20 @@ pub async fn IoHandle::read_at( context~ : String, ) -> Int { guard !handle.kind.can_poll() - let job = Job::read(handle.fd, buf, offset, len, position~) - perform_job_in_worker(job, context~) + let job : ReadAtJob = { + fd: handle.fd, + buf, + offset, + len, + position, + ret: 0, + err: 0, + } + perform_job_in_worker(job, read_at_job_worker) + if job.ret < 0 { + raise @os_error.OSError(job.err, context~) + } + job.ret } ///| @@ -430,6 +522,24 @@ extern "C" fn write_ffi( len~ : Int, ) -> Int = "moonbitlang_async_write" +///| +priv struct WriteJob { + fd : Int + buf : Bytes + offset : Int + len : Int + mut ret : Int + mut err : Int +} + +///| +let write_job_worker : FuncRef[(WriteJob) -> Unit] = job => { + job.ret = write_ffi(job.fd, job.buf, offset=job.offset, len=job.len) + if job.ret < 0 { + job.err = @os_error.get_errno() + } +} + ///| pub async fn IoHandle::write( handle : IoHandle, @@ -456,8 +566,47 @@ pub async fn IoHandle::write( } ret } else { - let job = Job::write(handle.fd, buf, offset, len, position=-1) - perform_job_in_worker(job, context~) + let job : WriteJob = { fd: handle.fd, buf, offset, len, ret: 0, err: 0 } + perform_job_in_worker(job, write_job_worker) + if job.ret < 0 { + raise @os_error.OSError(job.err, context~) + } + job.ret + } +} + +///| +#borrow(buf) +extern "C" fn pwrite_ffi( + fd : Int, + buf : Bytes, + offset~ : Int, + len~ : Int, + position~ : Int64, +) -> Int = "moonbitlang_async_pwrite" + +///| +priv struct WriteAtJob { + fd : Int + buf : Bytes + offset : Int + len : Int + position : Int64 + mut ret : Int + mut err : Int +} + +///| +let write_at_job_worker : FuncRef[(WriteAtJob) -> Unit] = job => { + job.ret = pwrite_ffi( + job.fd, + job.buf, + offset=job.offset, + len=job.len, + position=job.position, + ) + if job.ret < 0 { + job.err = @os_error.get_errno() } } @@ -471,8 +620,60 @@ pub async fn IoHandle::write_at( context~ : String, ) -> Unit { guard !handle.kind.can_poll() - let job = Job::write(handle.fd, buf, offset, len, position~) - perform_job_in_worker(job, context~) |> ignore + let job : WriteAtJob = { + fd: handle.fd, + buf, + offset, + len, + position, + ret: 0, + err: 0, + } + perform_job_in_worker(job, write_at_job_worker) + if job.ret < 0 { + raise @os_error.OSError(job.err, context~) + } +} + +///| +#borrow(path, args, env, cwd) +extern "C" fn spawn_ffi( + path : Bytes, + args : FixedArray[Bytes?], + env~ : FixedArray[Bytes?], + stdin~ : Int, + stdout~ : Int, + stderr~ : Int, + cwd~ : Bytes?, +) -> Int = "moonbitlang_async_spawn_process" + +///| +priv struct SpawnJob { + path : Bytes + args : FixedArray[Bytes?] + env : FixedArray[Bytes?] + stdin : Int + stdout : Int + stderr : Int + cwd : Bytes? + mut pid : Int + mut err : Int +} + +///| +let spawn_job_worker : FuncRef[(SpawnJob) -> Unit] = job => { + job.pid = spawn_ffi( + job.path, + job.args, + env=job.env, + stdin=job.stdin, + stdout=job.stdout, + stderr=job.stderr, + cwd=job.cwd, + ) + if job.pid < 0 { + job.err = @os_error.get_errno() + } } ///| @@ -486,10 +687,22 @@ pub async fn spawn( cwd~ : Bytes?, context~ : String, ) -> Int { - perform_job_in_worker( - Job::spawn(path, args, env, stdin, stdout, stderr, cwd), - context~, - ) + let job : SpawnJob = { + path, + args, + env, + stdin, + stdout, + stderr, + cwd, + pid: 0, + err: 0, + } + perform_job_in_worker(job, spawn_job_worker) + if job.pid < 0 { + raise @os_error.OSError(job.err, context~) + } + job.pid } ///| diff --git a/src/internal/event_loop/fs.mbt b/src/internal/event_loop/fs.mbt index 09026ab9..34f75f75 100644 --- a/src/internal/event_loop/fs.mbt +++ b/src/internal/event_loop/fs.mbt @@ -13,12 +13,22 @@ // limitations under the License. ///| -#external -pub type Directory +#borrow(path) +extern "C" fn open_ffi(path : Bytes, flags : Int, mode : Int) -> Int = "open" ///| -#external -pub type DirectoryEntry +#borrow(stat) +extern "C" fn fstat_sync_ffi(fd : Int, stat : @fd_util.Stat) -> Int = "fstat" + +///| +priv struct OpenJob { + path : Bytes + flags : Int + mode : Int + stat : @fd_util.Stat + mut ret : Int + mut err : Int +} ///| pub async fn open( @@ -29,14 +39,37 @@ pub async fn open( ) -> IoHandle { let path_bytes = @encoding/utf8.encode(path) let stat = @fd_util.Stat::new() - let job = Job::open(path_bytes, flags, mode, stat) - try perform_job_in_worker(job, context~) catch { - @os_error.OSError(errno, context~) => - raise @os_error.OSError(errno, context="\{context}: \{repr(path)}") - err => raise err - } noraise { - fd => IoHandle::from_fd(fd, kind=stat.kind()) + let job : OpenJob = { path: path_bytes, flags, mode, stat, ret: 0, err: 0 } + perform_job_in_worker(job, job => { + job.ret = open_ffi(job.path, job.flags, job.mode) + if job.ret < 0 { + job.err = @os_error.get_errno() + return + } + if fstat_sync_ffi(job.ret, job.stat) < 0 { + job.err = @os_error.get_errno() + return + } + }) + if job.ret < 0 { + raise @os_error.OSError(job.err, context="\{context}: \{repr(path)}") } + IoHandle::from_fd(job.ret, kind=job.stat.kind()) +} + +///| +#borrow(path, stat) +extern "C" fn stat_ffi(path : Bytes, stat : @fd_util.Stat) -> Int = "stat" + +///| +#borrow(path, stat) +extern "C" fn lstat_ffi(path : Bytes, stat : @fd_util.Stat) -> Int = "lstat" + +///| +priv struct StatJob { + path : Bytes + stat : @fd_util.Stat + mut err : Int } ///| @@ -47,161 +80,214 @@ pub async fn stat( ) -> @fd_util.Stat { let path_bytes = @encoding/utf8.encode(path) let stat = @fd_util.Stat::new() - let job = Job::stat(path_bytes, stat, follow_symlink~) - try perform_job_in_worker(job, context~) catch { - @os_error.OSError(errno, context~) => - raise @os_error.OSError(errno, context="\{context}: \{repr(path)}") - err => raise err - } noraise { - _ => () + let job : StatJob = { path: path_bytes, stat, err: 0 } + if follow_symlink { + perform_job_in_worker(job, job => if stat_ffi(job.path, job.stat) < 0 { + job.err = @os_error.get_errno() + }) + } else { + perform_job_in_worker(job, job => if lstat_ffi(job.path, job.stat) < 0 { + job.err = @os_error.get_errno() + }) + } + if job.err != 0 { + raise @os_error.OSError(job.err, context="\{context}: \{repr(path)}") } stat } +///| +priv struct FstatJob { + fd : Int + stat : @fd_util.Stat + mut err : Int +} + ///| pub async fn IoHandle::fstat( handle : IoHandle, context~ : String, ) -> @fd_util.Stat { let stat = @fd_util.Stat::new() - let job = Job::fstat(handle.fd, stat) - let _ = perform_job_in_worker(job, context~) + let job : FstatJob = { fd: handle.fd, stat, err: 0 } + perform_job_in_worker(job, job => if fstat_sync_ffi(job.fd, job.stat) < 0 { + job.err = @os_error.get_errno() + }) + if job.err != 0 { + raise @os_error.OSError(job.err, context~) + } stat } ///| -pub async fn IoHandle::seek( - handle : IoHandle, - offset : Int64, - whence : Int, - context~ : String, -) -> Int64 { - let job = Job::seek(handle.fd, offset, whence) - let _ = perform_job_in_worker(job, context~) - job.get_seek_result() -} +#borrow(path) +extern "C" fn access_ffi(path : Bytes, amode : Int) -> Int = "access" ///| -pub async fn access(path : StringView, amode~ : Int, context~ : String) -> Int { - let path_bytes = @encoding/utf8.encode(path) - perform_job_in_worker(Job::access(path_bytes, amode), context~) catch { - @os_error.OSError(errno, context~) => - raise @os_error.OSError(errno, context="\{context}: \{repr(path)}") - err => raise err - } +priv struct AccessJob { + path : Bytes + amode : Int + mut err : Int } ///| -pub async fn chmod(path : StringView, mode : Int, context~ : String) -> Unit { +pub async fn access(path : StringView, amode~ : Int, context~ : String) -> Unit { let path_bytes = @encoding/utf8.encode(path) - try perform_job_in_worker(Job::chmod(path_bytes, mode), context~) catch { - @os_error.OSError(errno, context~) => - raise @os_error.OSError(errno, context="\{context}: \{repr(path)}") - err => raise err - } noraise { - _ => () + let job : AccessJob = { path: path_bytes, amode, err: 0 } + perform_job_in_worker(job, job => if access_ffi(job.path, job.amode) < 0 { + job.err = @os_error.get_errno() + }) + if job.err != 0 { + raise @os_error.OSError(job.err, context="\{context}: \{repr(path)}") } } +///| +extern "C" fn fsync_ffi(fd : Int) -> Int = "fsync" + +///| +extern "C" fn fdatasync_ffi(fd : Int) -> Int = "fdatasync" + ///| pub async fn IoHandle::fsync( handle : IoHandle, only_data~ : Bool, context~ : String, ) -> Unit { - perform_job_in_worker(Job::fsync(handle.fd, only_data), context~) |> ignore + struct FsyncJob { + fd : Int + mut err : Int + } + let job : FsyncJob = { fd: handle.fd, err: 0 } + if only_data { + perform_job_in_worker(job, job => if fdatasync_ffi(job.fd) < 0 { + job.err = @os_error.get_errno() + }) + } else { + perform_job_in_worker(job, job => if fsync_ffi(job.fd) < 0 { + job.err = @os_error.get_errno() + }) + } + if job.err != 0 { + raise @os_error.OSError(job.err, context~) + } } +///| +#borrow(path) +extern "C" fn remove_ffi(path : Bytes) -> Int = "remove" + ///| pub async fn remove(path : StringView, context~ : String) -> Unit { + struct RemoveJob { + path : Bytes + mut err : Int + } let path_bytes = @encoding/utf8.encode(path) - try perform_job_in_worker(Job::remove(path_bytes), context~) catch { - @os_error.OSError(errno, context~) => - raise @os_error.OSError(errno, context="\{context}: \{repr(path)}") - err => raise err - } noraise { - _ => () + let job : RemoveJob = { path: path_bytes, err: 0 } + perform_job_in_worker(job, job => if remove_ffi(job.path) < 0 { + job.err = @os_error.get_errno() + }) + if job.err != 0 { + raise @os_error.OSError(job.err, context="\{context}: \{repr(path)}") } } +///| +#borrow(target, path) +extern "C" fn symlink_ffi(target : Bytes, path : Bytes) -> Int = "symlink" + ///| pub async fn symlink( target : StringView, path : StringView, context~ : String, ) -> Unit { + struct Job { + target : Bytes + path : Bytes + mut err : Int + } let target_bytes = @encoding/utf8.encode(target) let path_bytes = @encoding/utf8.encode(path) - try - perform_job_in_worker(Job::symlink(target_bytes, path_bytes), context~) - catch { - @os_error.OSError(errno, context~) => - raise @os_error.OSError( - errno, - context="\{context}: \{repr(target)} => \{repr(path)}", - ) - err => raise err - } noraise { - _ => () + let job : Job = { target: target_bytes, path: path_bytes, err: 0 } + perform_job_in_worker(job, job => if symlink_ffi(job.target, job.path) < 0 { + job.err = @os_error.get_errno() + }) + if job.err != 0 { + raise @os_error.OSError( + job.err, + context="\{context}: \{repr(target)} => \{repr(path)}", + ) } } +///| +#borrow(path) +extern "C" fn mkdir_ffi(path : Bytes, mode : Int) -> Int = "mkdir" + ///| pub async fn mkdir(path : StringView, mode~ : Int, context~ : String) -> Unit { + struct Job { + path : Bytes + mode : Int + mut err : Int + } let path_bytes = @encoding/utf8.encode(path) - try perform_job_in_worker(Job::mkdir(path_bytes, mode), context~) catch { - @os_error.OSError(errno, context~) => - raise @os_error.OSError(errno, context="\{context}: \{repr(path)}") - err => raise err - } noraise { - _ => () + let job : Job = { path: path_bytes, mode, err: 0 } + perform_job_in_worker(job, job => if mkdir_ffi(job.path, job.mode) < 0 { + job.err = @os_error.get_errno() + }) + if job.err != 0 { + raise @os_error.OSError(job.err, context="\{context}: \{repr(path)}") } } ///| -pub async fn rmdir(path : StringView, context~ : String) -> Unit { - let path_bytes = @encoding/utf8.encode(path) - try perform_job_in_worker(Job::rmdir(path_bytes), context~) catch { - @os_error.OSError(errno, context~) => - raise @os_error.OSError(errno, context="\{context}: \{repr(path)}") - err => raise err - } noraise { - _ => () - } -} +#borrow(path) +extern "C" fn rmdir_ffi(path : Bytes) -> Int = "rmdir" ///| -pub async fn opendir(path : StringView, context~ : String) -> Directory { +pub async fn rmdir(path : StringView, context~ : String) -> Unit { + struct Job { + path : Bytes + mut err : Int + } let path_bytes = @encoding/utf8.encode(path) - let job = Job::opendir(path_bytes) - let _ = perform_job_in_worker(job, context~) catch { - @os_error.OSError(errno, context~) => - raise @os_error.OSError(errno, context="\{context}: \{repr(path)}") - err => raise err + let job : Job = { path: path_bytes, err: 0 } + perform_job_in_worker(job, job => if rmdir_ffi(job.path) < 0 { + job.err = @os_error.get_errno() + }) + if job.err != 0 { + raise @os_error.OSError(job.err, context="\{context}: \{repr(path)}") } - job.get_opendir_result() } ///| -pub async fn readdir(dir : Directory, context~ : String) -> DirectoryEntry { - let job = Job::readdir(dir) - let _ = perform_job_in_worker(job, context~) - job.get_readdir_result() -} +#borrow(path, out) +extern "C" fn realpath_ffi(path : Bytes, out : Bytes?) -> @c_buffer.Buffer = "realpath" ///| pub async fn realpath(path : StringView, context~ : String) -> Bytes { + struct Job { + path : Bytes + mut result : @c_buffer.Buffer + mut err : Int + } let path_bytes = @encoding/utf8.encode(path) - let job = Job::realpath(path_bytes) - let _ = perform_job_in_worker(job, context~) catch { - @os_error.OSError(errno, context~) => - raise @os_error.OSError(errno, context="\{context}: \{repr(path)}") - err => raise err - } - let c_path = job.get_realpath_result() - defer c_path.free() - let len = c_path.strlen() + let job : Job = { path: path_bytes, result: @c_buffer.null, err: 0 } + perform_job_in_worker(job, job => { + job.result = realpath_ffi(job.path, None) + if job.result.is_null() { + job.err = @os_error.get_errno() + } + }) + if job.err != 0 { + raise @os_error.OSError(job.err, context="\{context}: \{repr(path)}") + } + defer job.result.free() + let len = job.result.strlen() let result = FixedArray::make(len, b'\x00') - c_path.blit_to_bytes(dst=result, offset=0, len~) + job.result.blit_to_bytes(dst=result, offset=0, len~) result.unsafe_reinterpret_as_bytes() } diff --git a/src/internal/event_loop/misc_stub.c b/src/internal/event_loop/misc_stub.c index 314fddfd..b9db1c6f 100644 --- a/src/internal/event_loop/misc_stub.c +++ b/src/internal/event_loop/misc_stub.c @@ -48,6 +48,14 @@ int moonbitlang_async_write(int fd, char *buf, int offset, int len) { return write(fd, buf + offset, len); } +int moonbitlang_async_pread(int fd, char *buf, int offset, int len, int64_t pos) { + return pread(fd, buf + offset, len, pos); +} + +int moonbitlang_async_pwrite(int fd, char *buf, int offset, int len, int64_t pos) { + return pwrite(fd, buf + offset, len, pos); +} + int moonbitlang_async_recvfrom( int sock, char *buf, diff --git a/src/internal/event_loop/network.mbt b/src/internal/event_loop/network.mbt index 4fc14821..a8e29edc 100644 --- a/src/internal/event_loop/network.mbt +++ b/src/internal/event_loop/network.mbt @@ -141,33 +141,3 @@ pub async fn IoHandle::accept( } IoHandle::from_fd(conn, kind=Socket) } - -///| -#external -pub type AddrInfo - -///| -extern "C" fn gai_strerror(code : Int) -> @c_buffer.Buffer = "gai_strerror" - -///| -pub async fn getaddrinfo( - host : StringView, - context~ : String, -) -> Result[AddrInfo, String] { - let host_bytes = @encoding/utf8.encode(host) - let job = Job::getaddrinfo(host_bytes) - let ret = perform_job_in_worker(job, allow_cancel=true, context~) catch { - @os_error.OSError(errno, context~) => - raise @os_error.OSError(errno, context="\{context}: \{repr(host)}") - err => raise err - } - if ret != 0 { - let c_str = gai_strerror(ret) - let len = c_str.strlen() - let bytes = FixedArray::make(len, b'\x00') - c_str.blit_to_bytes(dst=bytes, offset=0, len~) - bytes.unsafe_reinterpret_as_bytes() |> @encoding/utf8.decode |> Err - } else { - Ok(job.get_getaddrinfo_result()) - } -} diff --git a/src/internal/event_loop/pkg.generated.mbti b/src/internal/event_loop/pkg.generated.mbti index 2803f47c..2acee54d 100644 --- a/src/internal/event_loop/pkg.generated.mbti +++ b/src/internal/event_loop/pkg.generated.mbti @@ -6,19 +6,13 @@ import( ) // Values -pub async fn access(StringView, amode~ : Int, context~ : String) -> Int - -pub async fn chmod(StringView, Int, context~ : String) -> Unit - -pub async fn getaddrinfo(StringView, context~ : String) -> Result[AddrInfo, String] +pub async fn access(StringView, amode~ : Int, context~ : String) -> Unit pub async fn mkdir(StringView, mode~ : Int, context~ : String) -> Unit pub async fn open(StringView, flags~ : Int, mode~ : Int, context~ : String) -> IoHandle -pub async fn opendir(StringView, context~ : String) -> Directory - -pub async fn readdir(Directory, context~ : String) -> DirectoryEntry +pub async fn[X] perform_job_in_worker(X, FuncRef[(X) -> Unit], allow_cancel? : Bool) -> Unit pub async fn realpath(StringView, context~ : String) -> Bytes @@ -47,15 +41,6 @@ pub fn with_event_loop(async () -> Unit) -> Unit raise // Errors // Types and methods -#external -pub type AddrInfo - -#external -pub type Directory - -#external -pub type DirectoryEntry - type IoHandle pub async fn IoHandle::accept(Self, Bytes, context~ : String) -> Self pub fn IoHandle::close(Self) -> Unit @@ -70,7 +55,6 @@ pub fn IoHandle::kind(Self) -> @fd_util.FileKind pub async fn IoHandle::read(Self, FixedArray[Byte], offset~ : Int, len~ : Int, context~ : String) -> Int pub async fn IoHandle::read_at(Self, FixedArray[Byte], offset~ : Int, len~ : Int, position~ : Int64, context~ : String) -> Int pub async fn IoHandle::recvfrom(Self, FixedArray[Byte], offset~ : Int, len~ : Int, addr~ : Bytes, context~ : String) -> Int -pub async fn IoHandle::seek(Self, Int64, Int, context~ : String) -> Int64 pub async fn IoHandle::sendto(Self, Bytes, offset~ : Int, len~ : Int, addr~ : Bytes, context~ : String) -> Int pub async fn IoHandle::write(Self, Bytes, offset~ : Int, len~ : Int, context~ : String) -> Int pub async fn IoHandle::write_at(Self, Bytes, offset~ : Int, len~ : Int, position~ : Int64, context~ : String) -> Unit diff --git a/src/internal/event_loop/thread_pool.c b/src/internal/event_loop/thread_pool.c index 77fad7fb..97bbbb21 100644 --- a/src/internal/event_loop/thread_pool.c +++ b/src/internal/event_loop/thread_pool.c @@ -53,34 +53,14 @@ struct job { // used to find the waiter of a job int32_t job_id; - // the return value of the job. - // should be set by the worker and read by waiter. - // for result that cannot fit in an integer, - // jobs can also store extra result in their payload - int32_t ret; - - // the error code of the job. - // should be zefo iff the job succeeds - int32_t err; + // data passed to job worker + void *data; // the worker that actually performs the job. - // it will receive the job itself as parameter. - // extra payload can be placed after the header fields in `struct job` - void (*worker)(struct job*); + // it will receive `data` as parameter. + void (*worker)(void *); }; -int32_t moonbitlang_async_job_get_id(struct job *job) { - return job->job_id; -} - -int64_t moonbitlang_async_job_get_ret(struct job *job) { - return job->ret; -} - -int32_t moonbitlang_async_job_get_err(struct job *job) { - return job->err; -} - // ======================================================= // =================== the thread pool =================== // ======================================================= @@ -94,12 +74,11 @@ struct { sigset_t wakeup_signal; sigset_t old_sigmask; #endif - int32_t job_id; } pool; struct worker { pthread_t id; - struct job *job; + struct job job; int waiting; #ifdef WAKEUP_METHOD_COND_VAR pthread_mutex_t mutex; @@ -112,19 +91,16 @@ void *worker_loop(void *data) { int sig; struct worker *self = (struct worker*)data; - struct job *job = self->job; + struct job job = self->job; #ifdef WAKEUP_METHOD_COND_VAR pthread_mutex_init(&(self->mutex), 0); pthread_cond_init(&(self->cond), 0); #endif - while (job) { - int job_id = job->job_id; - job->ret = 0; - job->err = 0; - - job->worker(job); + while (job.worker) { + int job_id = job.job_id; + job.worker(job.data); self->waiting = 1; write(pool.notify_send, &job_id, sizeof(int)); @@ -150,9 +126,16 @@ void *worker_loop(void *data) { return 0; } -void moonbitlang_async_wake_worker(struct worker *worker, struct job *job) { - moonbit_decref(worker->job); - worker->job = job; +void moonbitlang_async_wake_worker( + struct worker *worker, + int32_t job_id, + void *job_data, + void (*job_worker)(void*) +) { + moonbit_decref(worker->job.data); + worker->job.job_id = job_id; + worker->job.data = job_data; + worker->job.worker = job_worker; #ifdef WAKEUP_METHOD_SIGNAL pthread_kill(worker->id, SIGUSR1); #elif defined(WAKEUP_METHOD_COND_VAR) @@ -167,8 +150,6 @@ void moonbitlang_async_init_thread_pool(int notify_send) { if (pool.initialized) abort(); - pool.job_id = 0; - #ifdef WAKEUP_METHOD_SIGNAL sigemptyset(&pool.wakeup_signal); sigaddset(&pool.wakeup_signal, SIGUSR1); @@ -195,14 +176,12 @@ void moonbitlang_async_destroy_thread_pool() { #ifdef WAKEUP_METHOD_SIGNAL pthread_sigmask(SIG_SETMASK, &pool.old_sigmask, 0); #endif - - pool.job_id = 0; } void free_worker(void *target) { struct worker *worker = (struct worker*)target; // terminate the worker - moonbitlang_async_wake_worker(worker, 0); + moonbitlang_async_wake_worker(worker, 0, 0, 0); pthread_join(worker->id, 0); #ifdef WAKEUP_METHOD_COND_VAR pthread_mutex_destroy(&(worker->mutex)); @@ -210,7 +189,11 @@ void free_worker(void *target) { #endif } -struct worker *moonbitlang_async_spawn_worker(struct job *init_job) { +struct worker *moonbitlang_async_spawn_worker( + int32_t init_job_id, + void *init_job_data, + void (*init_job_worker)(void*) +) { pthread_attr_t attr; pthread_attr_init(&attr); pthread_attr_setstacksize(&attr, 512); @@ -219,7 +202,9 @@ struct worker *moonbitlang_async_spawn_worker(struct job *init_job) { &free_worker, sizeof(struct worker) ); - worker->job = init_job; + worker->job.job_id = init_job_id; + worker->job.data = init_job_data; + worker->job.worker = init_job_worker; worker->waiting = 0; pthread_create(&(worker->id), &attr, &worker_loop, worker); pthread_attr_destroy(&attr); @@ -239,42 +224,12 @@ int32_t moonbitlang_async_fetch_completion(int notify_recv) { // ===================== concrete jobs ===================== // ========================================================= -static -struct job *make_job( - int32_t size, - void (*free_job)(void*), - void (*worker)(struct job*) -) { - struct job *job = (struct job*)moonbit_make_external_object( - free_job, - size - ); - job->job_id = pool.job_id++; - job->ret = 0; - job->err = 0; - job->worker = worker; - return job; -} - -#define MAKE_JOB(name) (struct name##_job*)make_job(\ - sizeof(struct name##_job),\ - free_##name##_job,\ - name##_job_worker\ -) - // ===== sleep job, sleep via thread pool, for testing only ===== -struct sleep_job { - struct job job; +void moonbitlang_async_blocking_sleep(int32_t ms) { struct timespec duration; -}; - -static -void free_sleep_job(void *job) {} - -static -void sleep_job_worker(struct job *job) { - struct timespec duration = ((struct sleep_job*)job)->duration; + duration.tv_sec = ms / 1000; + duration.tv_nsec = (ms % 1000) * 1000000; #ifdef __MACH__ // On GitHub CI MacOS runner, `nanosleep` is very imprecise, // causing corrupted test result. @@ -290,620 +245,19 @@ void sleep_job_worker(struct job *job) { #endif } -struct sleep_job *moonbitlang_async_make_sleep_job(int ms) { - struct sleep_job *job = MAKE_JOB(sleep); - job->duration.tv_sec = ms / 1000; - job->duration.tv_nsec = (ms % 1000) * 1000000; - return job; -} - -// ===== read job, for reading non-pollable stuff ===== - -struct read_job { - struct job job; - int fd; - char *buf; - int offset; - int len; - int64_t position; -}; - -static -void free_read_job(void *obj) { - struct read_job *job = (struct read_job*)obj; - moonbit_decref(job->buf); -} - -static -void read_job_worker(struct job *job) { - struct read_job *read_job = (struct read_job*)job; - if (read_job->position < 0) { - job->ret = read(read_job->fd, read_job->buf + read_job->offset, read_job->len); - } else { - job->ret = pread( - read_job->fd, - read_job->buf + read_job->offset, - read_job->len, - read_job->position - ); - } - if (job->ret < 0) - job->err = errno; -} - -struct read_job *moonbitlang_async_make_read_job( - int fd, - char *buf, - int offset, - int len, - int64_t position -) { - struct read_job *job = MAKE_JOB(read); - job->fd = fd; - job->buf = buf; - job->offset = offset; - job->len = len; - job->position = position; - return job; -} - -// ===== write job, for writing non-pollable stuff ===== - -struct write_job { - struct job job; - int fd; - char *buf; - int offset; - int len; - int64_t position; -}; - -static -void free_write_job(void *obj) { - struct write_job *job = (struct write_job*)obj; - moonbit_decref(job->buf); -} - -static -void write_job_worker(struct job *job) { - struct write_job *write_job = (struct write_job*)job; - if (write_job->position < 0) { - job->ret = write( - write_job->fd, - write_job->buf + write_job->offset, - write_job->len - ); - } else { - job->ret = pwrite( - write_job->fd, - write_job->buf + write_job->offset, - write_job->len, - write_job->position - ); - } - if (job->ret < 0) - job->err = errno; -} - -struct write_job *moonbitlang_async_make_write_job( - int fd, - char *buf, - int offset, - int len, - int64_t position -) { - struct write_job *job = MAKE_JOB(write); - job->fd = fd; - job->buf = buf; - job->offset = offset; - job->len = len; - job->position = position; - return job; -} - -// ===== open job ===== - -struct open_job { - struct job job; - char *filename; - int flags; - int mode; - void *stat_out; -}; - -static -void free_open_job(void *obj) { - struct open_job *job = (struct open_job*)obj; - moonbit_decref(job->filename); - moonbit_decref(job->stat_out); -} - -static -void open_job_worker(struct job *job) { - struct open_job *open_job = (struct open_job*)job; - job->ret = open( - open_job->filename, - open_job->flags | O_CLOEXEC, - open_job->mode - ); - if (job->ret < 0) { - job->err = errno; - return; - } - if (fstat(job->ret, open_job->stat_out) < 0) { - job->err = errno; - } -} - -struct open_job *moonbitlang_async_make_open_job( - char *filename, - int flags, - int mode, - int *stat_out -) { - struct open_job *job = MAKE_JOB(open); - job->filename = filename; - job->flags = flags; - job->mode = mode; - job->stat_out = stat_out; - return job; -} - -// ===== stat job, get info of file path ===== - -struct stat_job { - struct job job; - char *path; - void *out; - int follow_symlink; -}; - -static -void free_stat_job(void *obj) { - struct stat_job *job = (struct stat_job*)obj; - moonbit_decref(job->path); - moonbit_decref(job->out); -} - -static -void stat_job_worker(struct job *job) { - struct stat_job *stat_job = (struct stat_job*)job; - if (stat_job->follow_symlink) { - job->ret = stat(stat_job->path, stat_job->out); - } else { - job->ret = lstat(stat_job->path, stat_job->out); - } - if (job->ret < 0) - job->err = errno; -} +// ===== spawn job, spawn foreign process ===== -struct stat_job *moonbitlang_async_make_stat_job( +int32_t moonbitlang_async_spawn_process( char *path, - void *out, - int follow_symlink -) { - struct stat_job *job = MAKE_JOB(stat); - job->path = path; - job->out = out; - job->follow_symlink = follow_symlink; - return job; -} - -// ===== fstat job, get info of file descriptor ===== - -struct fstat_job { - struct job job; - int fd; - void *out; -}; - -static -void free_fstat_job(void *obj) { - struct fstat_job *job = (struct fstat_job*)obj; - moonbit_decref(job->out); -} - -static -void fstat_job_worker(struct job *job) { - struct fstat_job *fstat_job = (struct fstat_job*)job; - job->ret = fstat(fstat_job->fd, fstat_job->out); - if (job->ret < 0) - job->err = errno; -} - -struct fstat_job *moonbitlang_async_make_fstat_job(int fd, void *out) { - struct fstat_job *job = MAKE_JOB(fstat); - job->fd = fd; - job->out = out; - return job; -} - -// ===== seek job, move cursor within opened file ===== - -struct seek_job { - struct job job; - int fd; - int64_t offset; - int whence; - int64_t result; -}; - -static -void free_seek_job(void *obj) {} - -static -void seek_job_worker(struct job *job) { - static int whence_list[] = { SEEK_SET, SEEK_END, SEEK_CUR }; - - struct seek_job *seek_job = (struct seek_job*)job; - seek_job->result = lseek( - seek_job->fd, - seek_job->offset, - whence_list[seek_job->whence] - ); - if (seek_job->result < 0) { - job->err = errno; - } -} - -struct seek_job *moonbitlang_async_make_seek_job( - int fd, - int64_t offset, - int whence + char **args, + char **envp, + int stdin_fd, + int stdout_fd, + int stderr_fd, + char *cwd ) { - struct seek_job *job = MAKE_JOB(seek); - job->fd = fd; - job->offset = offset; - job->whence = whence; - return job; -} - -int64_t moonbitlang_async_get_seek_result(struct seek_job *job) { - return job->result; -} - -// ===== access job, test permission of file path ===== - -struct access_job { - struct job job; - char *path; - int amode; -}; - -static -void free_access_job(void *obj) { - struct access_job *job = (struct access_job*)obj; - moonbit_decref(job->path); -} - -static -void access_job_worker(struct job *job) { - struct access_job *access_job = (struct access_job*)job; - job->ret = access(access_job->path, access_job->amode); - if (job->ret < 0) - job->err = errno; -} - -struct access_job *moonbitlang_async_make_access_job(char *path, int amode) { - struct access_job *job = MAKE_JOB(access); - job->path = path; - job->amode = amode; - return job; -} - -// ===== chmod job, change permission of file ===== - -struct chmod_job { - struct job job; - char *path; - mode_t mode; -}; - -static -void free_chmod_job(void *obj) { - struct chmod_job *job = (struct chmod_job*)obj; - moonbit_decref(job->path); -} - -static -void chmod_job_worker(struct job *job) { - struct chmod_job *chmod_job = (struct chmod_job*)job; - job->ret = chmod(chmod_job->path, chmod_job->mode); - if (job->ret < 0) - job->err = errno; -} - -struct chmod_job *moonbitlang_async_make_chmod_job(char *path, int mode) { - struct chmod_job *job = MAKE_JOB(chmod); - job->path = path; - job->mode = mode; - return job; -} - - -// ===== fsync job, synchronize file modification to disk ===== - -struct fsync_job { - struct job job; - int fd; - int only_data; -}; - -static -void free_fsync_job(void *obj) {} - -static -void fsync_job_worker(struct job *job) { - struct fsync_job *fsync_job = (struct fsync_job*)job; -#ifdef __MACH__ - // it seems that `fdatasync` is not available on some MacOS versions - job->ret = fsync(fsync_job->fd); -#else - if (fsync_job->only_data) { - job->ret = fdatasync(fsync_job->fd); - } else { - job->ret = fsync(fsync_job->fd); - } -#endif - if (job->ret < 0) - job->err = errno; -} - -struct fsync_job *moonbitlang_async_make_fsync_job(int fd, int only_data) { - struct fsync_job *job = MAKE_JOB(fsync); - job->fd = fd; - job->only_data = only_data; - return job; -} - -// ===== remove job, remove file from file system ===== - -struct remove_job { - struct job job; - char *path; -}; - -static -void free_remove_job(void *obj) { - struct remove_job *job = (struct remove_job*)obj; - moonbit_decref(job->path); -} - -static -void remove_job_worker(struct job *job) { - struct remove_job *remove_job = (struct remove_job*)job; - job->ret = remove(remove_job->path); - if (job->ret < 0) - job->err = errno; -} - -struct remove_job *moonbitlang_async_make_remove_job(char *path) { - struct remove_job *job = MAKE_JOB(remove); - job->path = path; - return job; -} - -// ===== symlink job, create symbolic link ===== - -struct symlink_job { - struct job job; - char *target; - char *path; -}; - -static -void free_symlink_job(void *obj) { - struct symlink_job *job = (struct symlink_job*)obj; - moonbit_decref(job->target); - moonbit_decref(job->path); -} - -static -void symlink_job_worker(struct job *job) { - struct symlink_job *symlink_job = (struct symlink_job*)job; - job->ret = symlink(symlink_job->target, symlink_job->path); - if (job->ret < 0) - job->err = errno; -} - -struct symlink_job *moonbitlang_async_make_symlink_job(char *target, char *path) { - struct symlink_job *job = MAKE_JOB(symlink); - job->target = target; - job->path = path; - return job; -} - - -// ===== mkdir job, create new directory ===== - -struct mkdir_job { - struct job job; - char *path; - int mode; -}; - -static -void free_mkdir_job(void *obj) { - struct mkdir_job *job = (struct mkdir_job*)obj; - moonbit_decref(job->path); -} - -static -void mkdir_job_worker(struct job *job) { - struct mkdir_job *mkdir_job = (struct mkdir_job*)job; - job->ret = mkdir(mkdir_job->path, mkdir_job->mode); - if (job->ret < 0) - job->err = errno; -} - -struct mkdir_job *moonbitlang_async_make_mkdir_job(char *path, int mode) { - struct mkdir_job *job = MAKE_JOB(mkdir); - job->path = path; - job->mode = mode; - return job; -} - -// ===== rmdir job, remove directory ===== - -struct rmdir_job { - struct job job; - char *path; -}; - -static -void free_rmdir_job(void *obj) { - struct rmdir_job *job = (struct rmdir_job*)obj; - moonbit_decref(job->path); -} - -static -void rmdir_job_worker(struct job *job) { - struct rmdir_job *rmdir_job = (struct rmdir_job*)job; - job->ret = rmdir(rmdir_job->path); - if (job->ret < 0) - job->err = errno; -} - -struct rmdir_job *moonbitlang_async_make_rmdir_job(char *path) { - struct rmdir_job *job = MAKE_JOB(rmdir); - job->path = path; - return job; -} - -// ===== opendir job, open directory ===== - -struct opendir_job { - struct job job; - char *path; - DIR *result; - - // if the waiter is cancelled before `opendir` succeed, - // we need to call `closedir` to free resource on the result. - // however, if the waiter is not cancelled, - // the ownership of the result should be transferred to the waiter. - // here we use a flag `result_fetched` to determine which case it is. - int result_fetched; -}; - -static -void free_opendir_job(void *obj) { - struct opendir_job *job = (struct opendir_job*)obj; - moonbit_decref(job->path); - if (job->result && !(job->result_fetched)) - closedir(job->result); -} - -static -void opendir_job_worker(struct job *job) { - struct opendir_job *opendir_job = (struct opendir_job*)job; - opendir_job->result = opendir(opendir_job->path); - if (!(opendir_job->result)) { - job->err = errno; - } -} - -struct opendir_job *moonbitlang_async_make_opendir_job(char *path) { - struct opendir_job *job = MAKE_JOB(opendir); - job->path = path; - job->result_fetched = 0; - return job; -} - -DIR *moonbitlang_async_get_opendir_result(struct opendir_job *job) { - job->result_fetched = 1; - return job->result; -} - -// ===== readdir job, read directory entry ===== - -struct readdir_job { - struct job job; - DIR *dir; - struct dirent *result; -}; - -static -void free_readdir_job(void *obj) {} - -static -void readdir_job_worker(struct job *job) { - struct readdir_job *readdir_job = (struct readdir_job*)job; - errno = 0; - readdir_job->result = readdir(readdir_job->dir); - if (readdir_job->result == 0 && errno) { - job->ret = -1; - job->err = errno; - } -} - -struct readdir_job *moonbitlang_async_make_readdir_job(DIR *dir) { - struct readdir_job *job = MAKE_JOB(readdir); - job->dir = dir; - return job; -} - -struct dirent *moonbitlang_async_get_readdir_result(struct readdir_job *job) { - return job->result; -} - -// ===== realpath job, get canonical representation of a path ===== - -struct realpath_job { - struct job job; - char *path; - char *result; -}; - -static -void free_realpath_job(void *obj) { - struct realpath_job *job = (struct realpath_job*)obj; - moonbit_decref(job->path); -} - -static -void realpath_job_worker(struct job *job) { - struct realpath_job *realpath_job = (struct realpath_job*)job; - realpath_job->result = realpath(realpath_job->path, 0); - if (!realpath_job->result) { - job->ret = -1; - job->err = errno; - } -} - -struct realpath_job *moonbitlang_async_make_realpath_job(char *path) { - struct realpath_job *job = MAKE_JOB(realpath); - job->path = path; - return job; -} - -char *moonbitlang_async_get_realpath_result(struct realpath_job *job) { - return job->result; -} - -// ===== spawn job, spawn foreign process ===== - -struct spawn_job { - struct job job; - char *path; - char **args; - char **envp; - int stdio[3]; - char *cwd; -}; - -static -void free_spawn_job(void *obj) { - struct spawn_job *job = (struct spawn_job*)obj; - moonbit_decref(job->path); - moonbit_decref(job->args); - moonbit_decref(job->envp); - if (job->cwd) - moonbit_decref(job->cwd); -} - -static -void spawn_job_worker(struct job *job) { - struct spawn_job *spawn_job = (struct spawn_job *)job; + int32_t pid = -1; + int32_t err = 0; posix_spawnattr_t attr; posix_spawnattr_init(&attr); #ifdef WAKEUP_METHOD_SIGNAL @@ -925,102 +279,44 @@ void spawn_job_worker(struct job *job) { posix_spawn_file_actions_t file_actions; posix_spawn_file_actions_init(&file_actions); + + int stdio_fd[3] = { stdin_fd, stdout_fd, stderr_fd }; for (int i = 0; i < 3; ++i) { - int fd = spawn_job->stdio[i]; + int fd = stdio_fd[i]; if (fd >= 0) { - job->err = posix_spawn_file_actions_adddup2(&file_actions, fd, i); - if (job->err) goto exit; + err = posix_spawn_file_actions_adddup2(&file_actions, fd, i); + if (err) goto exit; } } - if (spawn_job->cwd) { - job->err = posix_spawn_file_actions_addchdir_np(&file_actions, spawn_job->cwd); - if (job->err) goto exit; + if (cwd) { + err = posix_spawn_file_actions_addchdir_np(&file_actions, cwd); + if (err) goto exit; } - if (strchr(spawn_job->path, '/')) { - job->err = posix_spawn( - &(job->ret), - spawn_job->path, + if (strchr(path, '/')) { + err = posix_spawn( + &pid, + path, &file_actions, &attr, - spawn_job->args, - spawn_job->envp + args, + envp ); } else { - job->err = posix_spawnp( - &(job->ret), - spawn_job->path, + err = posix_spawnp( + &pid, + path, &file_actions, &attr, - spawn_job->args, - spawn_job->envp + args, + envp ); } exit: posix_spawnattr_destroy(&attr); posix_spawn_file_actions_destroy(&file_actions); -} - -struct spawn_job *moonbitlang_async_make_spawn_job( - char *path, - char **args, - char **envp, - int stdin_fd, - int stdout_fd, - int stderr_fd, - char *cwd -) { - struct spawn_job *job = MAKE_JOB(spawn); - job->path = path; - job->args = args; - job->envp = envp; - job->stdio[0] = stdin_fd; - job->stdio[1] = stdout_fd; - job->stdio[2] = stderr_fd; - job->cwd = cwd; - return job; -} - -// ===== getaddrinfo job, resolve host name via `getaddrinfo` ===== - -struct getaddrinfo_job { - struct job job; - char *hostname; - struct addrinfo *result; -}; - -static -void free_getaddrinfo_job(void *obj) { - struct getaddrinfo_job *job = (struct getaddrinfo_job*)obj; - moonbit_decref(job->hostname); -} - -static -void getaddrinfo_job_worker(struct job *job) { - struct getaddrinfo_job *getaddrinfo_job = (struct getaddrinfo_job*)job; - struct addrinfo hint = { - AI_ADDRCONFIG, // ai_flags - AF_UNSPEC, // ai_family, support both IPv4 and IPv6 - 0, // ai_socktype - 0, // ai_protocol - 0, 0, 0, 0 - }; - job->ret = getaddrinfo( - getaddrinfo_job->hostname, - 0, - &hint, - &(getaddrinfo_job->result) - ); - if (job->ret == EAI_SYSTEM) - job->err = errno; -} - -struct getaddrinfo_job *moonbitlang_async_make_getaddrinfo_job(char *hostname) { - struct getaddrinfo_job *job = MAKE_JOB(getaddrinfo); - job->hostname = hostname; - return job; -} - -struct addrinfo *moonbitlang_async_get_getaddrinfo_result(struct getaddrinfo_job *job) { - return job->result; + if (err) { + errno = err; + } + return pid; } diff --git a/src/internal/event_loop/thread_pool.mbt b/src/internal/event_loop/thread_pool.mbt index a01a5407..967e81b6 100644 --- a/src/internal/event_loop/thread_pool.mbt +++ b/src/internal/event_loop/thread_pool.mbt @@ -22,148 +22,29 @@ extern "C" fn init_thread_pool_ffi(notify_send : Int) = "moonbitlang_async_init_ extern "C" fn destroy_thread_pool() = "moonbitlang_async_destroy_thread_pool" ///| -#owned(init_job) -extern "C" fn spawn_worker(init_job : Job) -> Worker = "moonbitlang_async_spawn_worker" +priv type JobData ///| -#borrow(worker) -#owned(job) -extern "C" fn wake_worker(worker : Worker, job : Job) = "moonbitlang_async_wake_worker" - -///| -extern "C" fn fetch_completion_ffi(notify_recv : Int) -> Int = "moonbitlang_async_fetch_completion" - -///| -priv type Job - -///| -#borrow(self) -extern "C" fn Job::id(self : Job) -> Int = "moonbitlang_async_job_get_id" - -///| -#borrow(self) -extern "C" fn Job::ret(self : Job) -> Int = "moonbitlang_async_job_get_ret" - -///| -#borrow(self) -extern "C" fn Job::err(self : Job) -> Int = "moonbitlang_async_job_get_err" - -///| -#owned(buf) -extern "C" fn Job::read( - fd : Int, - buf : FixedArray[Byte], - offset : Int, - len : Int, - // `-1` indicates no offset - position~ : Int64, -) -> Job = "moonbitlang_async_make_read_job" - -///| -#owned(buf) -extern "C" fn Job::write( - fd : Int, - buf : Bytes, - offset : Int, - len : Int, - // `-1` indicates no offset - position~ : Int64, -) -> Job = "moonbitlang_async_make_write_job" - -///| -#owned(filename, stat) -extern "C" fn Job::open( - filename : Bytes, - flags : Int, - mode : Int, - stat : @fd_util.Stat, -) -> Job = "moonbitlang_async_make_open_job" - -///| -#owned(path, out) -extern "C" fn Job::stat( - path : Bytes, - out : @fd_util.Stat, - follow_symlink~ : Bool, -) -> Job = "moonbitlang_async_make_stat_job" - -///| -#owned(out) -extern "C" fn Job::fstat(fd : Int, out : @fd_util.Stat) -> Job = "moonbitlang_async_make_fstat_job" - -///| -extern "C" fn Job::seek(fd : Int, offset : Int64, whence : Int) -> Job = "moonbitlang_async_make_seek_job" - -///| -#borrow(job) -extern "C" fn Job::get_seek_result(job : Job) -> Int64 = "moonbitlang_async_get_seek_result" - -///| -#owned(path) -extern "C" fn Job::access(path : Bytes, amode : Int) -> Job = "moonbitlang_async_make_access_job" - -///| -#owned(path) -extern "C" fn Job::chmod(path : Bytes, mode : Int) -> Job = "moonbitlang_async_make_chmod_job" - -///| -extern "C" fn Job::fsync(fd : Int, only_data : Bool) -> Job = "moonbitlang_async_make_fsync_job" - -///| -#owned(path) -extern "C" fn Job::remove(path : Bytes) -> Job = "moonbitlang_async_make_remove_job" +#external +priv type JobWorkerFunc ///| -#owned(target, path) -extern "C" fn Job::symlink(target : Bytes, path : Bytes) -> Job = "moonbitlang_async_make_symlink_job" +#owned(init_job_data) +extern "C" fn spawn_worker( + init_job_id : Int, + init_job_data : JobData, + init_job_worker : JobWorkerFunc, +) -> Worker = "moonbitlang_async_spawn_worker" ///| -#owned(path) -extern "C" fn Job::mkdir(path : Bytes, mode : Int) -> Job = "moonbitlang_async_make_mkdir_job" - -///| -#owned(path) -extern "C" fn Job::rmdir(path : Bytes) -> Job = "moonbitlang_async_make_rmdir_job" - -///| -#owned(path) -extern "C" fn Job::opendir(path : Bytes) -> Job = "moonbitlang_async_make_opendir_job" - -///| -#borrow(job) -extern "C" fn Job::get_opendir_result(job : Job) -> Directory = "moonbitlang_async_get_opendir_result" - -///| -extern "C" fn Job::readdir(dir : Directory) -> Job = "moonbitlang_async_make_readdir_job" - -///| -#borrow(job) -extern "C" fn Job::get_readdir_result(job : Job) -> DirectoryEntry = "moonbitlang_async_get_readdir_result" - -///| -#owned(path) -extern "C" fn Job::realpath(path : Bytes) -> Job = "moonbitlang_async_make_realpath_job" - -///| -#borrow(job) -extern "C" fn Job::get_realpath_result(job : Job) -> @c_buffer.Buffer = "moonbitlang_async_get_realpath_result" - -///| -#owned(path, args, env, cwd) -extern "C" fn Job::spawn( - path : Bytes, - args : FixedArray[Bytes?], - env : FixedArray[Bytes?], - stdin : Int, - stdout : Int, - stderr : Int, - cwd : Bytes?, -) -> Job = "moonbitlang_async_make_spawn_job" - -///| -#owned(host) -extern "C" fn Job::getaddrinfo(host : Bytes) -> Job = "moonbitlang_async_make_getaddrinfo_job" +#borrow(worker) +#owned(job_data) +extern "C" fn wake_worker( + worker : Worker, + job_id : Int, + job_data : JobData, + job_worker : JobWorkerFunc, +) = "moonbitlang_async_wake_worker" ///| -#borrow(job) -extern "C" fn Job::get_getaddrinfo_result(job : Job) -> AddrInfo = "moonbitlang_async_get_getaddrinfo_result" +extern "C" fn fetch_completion_ffi(notify_recv : Int) -> Int = "moonbitlang_async_fetch_completion" diff --git a/src/internal/event_loop/worker_wbtest.mbt b/src/internal/event_loop/worker_test.mbt similarity index 93% rename from src/internal/event_loop/worker_wbtest.mbt rename to src/internal/event_loop/worker_test.mbt index 70138a2f..7d44d47d 100644 --- a/src/internal/event_loop/worker_wbtest.mbt +++ b/src/internal/event_loop/worker_test.mbt @@ -14,12 +14,16 @@ ///| #cfg(target="native") -extern "C" fn Job::sleep(time : Int) -> Job = "moonbitlang_async_make_sleep_job" +extern "C" fn blocking_sleep(time : Int) = "moonbitlang_async_blocking_sleep" ///| #cfg(target="native") async fn perform_sleep_job(t : Int) -> Unit { - perform_job_in_worker(Job::sleep(t), context="") |> ignore + struct Job { + time : Int + } + let job : Job = { time: t } + @event_loop.perform_job_in_worker(job, job => blocking_sleep(job.time)) } ///| diff --git a/src/os_error/error.mbt b/src/os_error/error.mbt index a24877e0..97538417 100644 --- a/src/os_error/error.mbt +++ b/src/os_error/error.mbt @@ -15,6 +15,9 @@ ///| pub extern "C" fn get_errno() -> Int = "moonbitlang_async_get_errno" +///| +pub extern "C" fn clear_errno() = "moonbitlang_async_clear_errno" + ///| extern "C" fn errno_is_nonblocking_io_error(errno : Int) -> Bool = "moonbitlang_async_is_nonblocking_io_error" diff --git a/src/os_error/pkg.generated.mbti b/src/os_error/pkg.generated.mbti index 149129cd..a0e38019 100644 --- a/src/os_error/pkg.generated.mbti +++ b/src/os_error/pkg.generated.mbti @@ -4,6 +4,8 @@ package "moonbitlang/async/os_error" // Values pub fn check_errno(String) -> Unit raise OSError +pub fn clear_errno() -> Unit + pub fn get_errno() -> Int pub fn is_nonblocking_io_error() -> Bool diff --git a/src/os_error/stub.c b/src/os_error/stub.c index 1024496b..8c379bfb 100644 --- a/src/os_error/stub.c +++ b/src/os_error/stub.c @@ -21,6 +21,10 @@ int moonbitlang_async_get_errno() { return errno; } +void moonbitlang_async_clear_errno() { + errno = 0; +} + int moonbitlang_async_is_nonblocking_io_error(int err) { return err == EAGAIN || err == EINPROGRESS || err == EWOULDBLOCK; } diff --git a/src/socket/addr.mbt b/src/socket/addr.mbt index 960615b7..8ac30638 100644 --- a/src/socket/addr.mbt +++ b/src/socket/addr.mbt @@ -104,10 +104,8 @@ pub fn Addr::parse(src : String) -> Addr raise InvalidAddr { } ///| -using @event_loop {type AddrInfo} - -///| -extern "C" fn AddrInfo::is_null(self : AddrInfo) -> Bool = "moonbitlang_async_addrinfo_is_null" +#external +priv type AddrInfo ///| /// Convert AddrInfo to Addr(support both IPv4 and IPv6) @@ -119,6 +117,51 @@ extern "C" fn AddrInfo::next(self : AddrInfo) -> AddrInfo = "moonbitlang_async_a ///| extern "C" fn AddrInfo::free(self : AddrInfo) = "freeaddrinfo" +///| +extern "C" fn gai_strerror(code : Int) -> @c_buffer.Buffer = "gai_strerror" + +///| +extern "C" fn get_EAI_SYSTEM() -> Int = "moonbitlang_async_get_EAI_SYSTEM" + +///| +#borrow(host, out) +extern "C" fn getaddrinfo_ffi(host : Bytes, out : Ref[AddrInfo]) -> Int = "moonbitlang_async_getaddrinfo" + +///| +async fn getaddrinfo(host : StringView, context~ : String) -> AddrInfo { + struct Job { + host : Bytes + out : Ref[AddrInfo] + mut err : Int + mut sys_err : Int + } + let host_bytes = @encoding/utf8.encode(host) + let job : Job = { + host: host_bytes, + out: @ref.new(@c_buffer.null_ptr()), + err: 0, + sys_err: 0, + } + @event_loop.perform_job_in_worker(job, allow_cancel=true, job => { + job.err = getaddrinfo_ffi(job.host, job.out) + if job.err == get_EAI_SYSTEM() { + job.sys_err = @os_error.get_errno() + } + }) + if job.sys_err != 0 { + raise @os_error.OSError(job.sys_err, context~) + } + if job.err != 0 { + let c_str = gai_strerror(job.err) + let len = c_str.strlen() + let bytes = FixedArray::make(len, b'\x00') + c_str.blit_to_bytes(dst=bytes, offset=0, len~) + let msg = @encoding/utf8.decode(bytes.unsafe_reinterpret_as_bytes()) + raise ResolveHostnameError(msg) + } + job.out.val +} + ///| /// Error message from failure in resolving a network hostname. /// The content of the error message is platfrom-dependent, @@ -147,10 +190,7 @@ pub async fn Addr::resolve( ) -> Addr { // TODO: Add option to prefer ipv6 or ipv4 let context = "@socket.Addr::resolve()" - let ai_root = match @event_loop.getaddrinfo(host, context~) { - Ok(ai_root) => ai_root - Err(msg) => raise ResolveHostnameError(msg) - } + let ai_root = getaddrinfo(host, context~) defer ai_root.free() let first_addr = ai_root.to_addr(port) @@ -160,7 +200,7 @@ pub async fn Addr::resolve( if not(first_addr.is_ipv6()) { return first_addr } - for ai = ai_root.next(); not(ai.is_null()); ai = ai.next() { + for ai = ai_root.next(); not(@c_buffer.ptr_is_null(ai)); ai = ai.next() { let addr = ai.to_addr(port) if not(addr.is_ipv6()) { return addr @@ -176,7 +216,7 @@ pub async fn Addr::resolve( if first_addr.is_ipv6() { return first_addr } - for ai = ai_root.next(); not(ai.is_null()); ai = ai.next() { + for ai = ai_root.next(); not(@c_buffer.ptr_is_null(ai)); ai = ai.next() { let addr = ai.to_addr(port) if addr.is_ipv6() { return addr diff --git a/src/socket/happy_eyeball.mbt b/src/socket/happy_eyeball.mbt index 1165378f..647b4193 100644 --- a/src/socket/happy_eyeball.mbt +++ b/src/socket/happy_eyeball.mbt @@ -32,16 +32,13 @@ pub async fn Tcp::connect_to_host( protocol? : IpProtocolPreference = NoPreference, ) -> Tcp { let context = "@socket.Tcp::connect_to_host()" - let ai = match @event_loop.getaddrinfo(host, context~) { - Ok(ai) => ai - Err(msg) => raise ResolveHostnameError(msg) - } + let ai = getaddrinfo(host, context~) defer ai.free() let mut result = None let mut conn_err = None async fn connect_with_protocol(protocol : IpProtocolPreference) { @async.with_task_group(fn(group) { - for ai = ai; not(ai.is_null()); ai = ai.next() { + for ai = ai; not(@c_buffer.ptr_is_null(ai)); ai = ai.next() { let addr = ai.to_addr(port) match (protocol, addr.is_ipv6()) { (OnlyV4 | FavorV4, true) => continue diff --git a/src/socket/socket.c b/src/socket/socket.c index 56d3e6b6..e7bb0ba0 100644 --- a/src/socket/socket.c +++ b/src/socket/socket.c @@ -198,6 +198,21 @@ void* moonbitlang_async_addrinfo_to_addr(struct addrinfo *addrinfo, int port) { } } +int32_t moonbitlang_async_getaddrinfo(char *hostname, struct addrinfo **out) { + struct addrinfo hint = { + AI_ADDRCONFIG, // ai_flags + AF_UNSPEC, // ai_family, support both IPv4 and IPv6 + 0, // ai_socktype + 0, // ai_protocol + 0, 0, 0, 0 + }; + return getaddrinfo(hostname, 0, &hint, out); +} + +int32_t moonbitlang_async_get_EAI_SYSTEM() { + return EAI_SYSTEM; +} + int moonbitlang_async_getsockname(int sock, struct sockaddr *addr_out) { socklen_t len = Moonbit_array_length(addr_out); return getsockname(sock, addr_out, &len);