Skip to content

Commit ca8f219

Browse files
committed
refactor thread pool, make it extensible
1 parent 7c6f413 commit ca8f219

24 files changed

+716
-1224
lines changed

src/fs/dir.mbt

Lines changed: 54 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,12 @@ pub async fn rmdir(path : StringView, recursive? : Bool = false) -> Unit {
4444

4545
///|
4646
/// A directory in file system
47-
struct Directory(@event_loop.Directory)
47+
#external
48+
type Directory
49+
50+
///|
51+
#external
52+
priv type DirectoryEntry
4853

4954
///|
5055
extern "C" fn Directory::close_ffi(self : Directory) -> Int = "closedir"
@@ -54,29 +59,68 @@ pub fn Directory::close(self : Directory) -> Unit {
5459
guard self.close_ffi() == 0
5560
}
5661

62+
///|
63+
#borrow(path)
64+
extern "C" fn opendir_ffi(path : Bytes) -> Directory = "opendir"
65+
66+
///|
67+
async fn opendir_aux(path : StringView, context~ : String) -> Directory {
68+
struct Job {
69+
path : Bytes
70+
mut result : Directory
71+
mut err : Int
72+
}
73+
let path_bytes = @encoding/utf8.encode(path)
74+
let job : Job = { path: path_bytes, result: @c_buffer.null_ptr(), err: 0 }
75+
@event_loop.perform_job_in_worker(job, job => {
76+
job.result = opendir_ffi(job.path)
77+
job.err = @os_error.get_errno()
78+
}) catch {
79+
err => {
80+
if !@c_buffer.ptr_is_null(job.result) {
81+
ignore(job.result.close_ffi())
82+
}
83+
raise err
84+
}
85+
}
86+
if @c_buffer.ptr_is_null(job.result) {
87+
raise @os_error.OSError(job.err, context="\{context}: \{repr(path)}")
88+
}
89+
job.result
90+
}
91+
5792
///|
5893
/// Open the directory at `path`. `path` is encoded UTF8.
5994
/// If `path` is not a directory, an error will be raised
6095
pub async fn opendir(path : StringView) -> Directory {
61-
@event_loop.opendir(path, context="@fs.opendir()")
96+
opendir_aux(path, context="@fs.opendir()")
6297
}
6398

64-
///|
65-
using @event_loop {type DirectoryEntry}
66-
6799
///|
68100
extern "C" fn DirectoryEntry::name(ent : DirectoryEntry) -> Bytes = "moonbitlang_async_dirent_name"
69101

70102
///|
71-
extern "C" fn DirectoryEntry::is_null(ent : DirectoryEntry) -> Bool = "moonbitlang_async_dirent_is_null"
103+
extern "C" fn readdir_ffi(dir : Directory) -> DirectoryEntry = "readdir"
72104

73105
///|
74106
async fn Directory::read_next(dir : Directory) -> String? {
75-
let dir_ent = @event_loop.readdir(dir.0, context="@fs.readdir()")
76-
if dir_ent.is_null() {
107+
struct Job {
108+
dir : Directory
109+
mut result : DirectoryEntry
110+
mut err : Int
111+
}
112+
let job : Job = { dir, result: @c_buffer.null_ptr(), err: 0 }
113+
@event_loop.perform_job_in_worker(job, job => {
114+
@os_error.clear_errno()
115+
job.result = readdir_ffi(job.dir)
116+
job.err = @os_error.get_errno()
117+
})
118+
if job.err != 0 {
119+
raise @os_error.OSError(job.err, context="@fs.readdir()")
120+
} else if @c_buffer.ptr_is_null(job.result) {
77121
None
78122
} else {
79-
Some(@encoding/utf8.decode(dir_ent.name()))
123+
Some(@encoding/utf8.decode(job.result.name()))
80124
}
81125
}
82126

@@ -119,7 +163,7 @@ pub async fn readdir(
119163
include_special? : Bool = false,
120164
sort? : Bool = false,
121165
) -> Array[String] {
122-
let dir : Directory = @event_loop.opendir(path, context="@fs.readdir()")
166+
let dir : Directory = opendir_aux(path, context="@fs.readdir()")
123167
defer dir.close()
124168
let list = dir.read_all(include_hidden~, include_special~)
125169
if sort {

src/fs/dir_test.mbt

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ async test "read_all" {
2323
@json.inspect(list, content=[
2424
"fs.mbt", "stub.c", "dir.mbt", "README.md", "utils.mbt", "dir_test.mbt", "eof_test.mbt",
2525
"README.mbt.md", "constants.mbt", "moon.pkg.json", "stat_test.mbt", "walk_test.mbt",
26-
"mkdir_test.mbt", "access_test.mbt", "create_test.mbt", "seek_wbtest.mbt", "read_all_test.mbt",
26+
"mkdir_test.mbt", "access_test.mbt", "create_test.mbt", "read_all_test.mbt",
2727
"realpath_test.mbt", "unimplemented.mbt", "pkg.generated.mbti", "text_file_test.mbt",
2828
"timestamp_test.mbt", "random_access_test.mbt",
2929
])
@@ -55,8 +55,8 @@ async test "as_dir" {
5555
"utils.mbt: Regular", "dir_test.mbt: Regular", "eof_test.mbt: Regular", "README.mbt.md: Regular",
5656
"constants.mbt: Regular", "moon.pkg.json: Regular", "stat_test.mbt: Regular",
5757
"walk_test.mbt: Regular", "mkdir_test.mbt: Regular", "access_test.mbt: Regular",
58-
"create_test.mbt: Regular", "seek_wbtest.mbt: Regular", "read_all_test.mbt: Regular",
59-
"realpath_test.mbt: Regular", "unimplemented.mbt: Regular", "pkg.generated.mbti: Regular",
60-
"text_file_test.mbt: Regular", "timestamp_test.mbt: Regular", "random_access_test.mbt: Regular",
58+
"create_test.mbt: Regular", "read_all_test.mbt: Regular", "realpath_test.mbt: Regular",
59+
"unimplemented.mbt: Regular", "pkg.generated.mbti: Regular", "text_file_test.mbt: Regular",
60+
"timestamp_test.mbt: Regular", "random_access_test.mbt: Regular",
6161
])
6262
}

src/fs/fs.mbt

Lines changed: 19 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -239,38 +239,6 @@ pub async fn remove(path : StringView) -> Unit {
239239
@event_loop.remove(path, context="@fs.remove()")
240240
}
241241

242-
///|
243-
/// Determine how an offset is interpreted when seeking in a file:
244-
/// - `FromStart`: absolute offset from the start of the file
245-
/// - `FromEnd`: offset is relative to end of file
246-
/// - `Relative`: offset is relative to current position in the file
247-
pub(all) enum SeekMode {
248-
FromStart = 0
249-
FromEnd
250-
Relative
251-
}
252-
253-
///|
254-
fn SeekMode::to_int(self : SeekMode) -> Int = "%identity"
255-
256-
///|
257-
/// Change current position in file for reading and writing.
258-
/// Can only be applied to a regular file, otherwise `seek` will fail.
259-
/// The offset is interpreted using `mode`, see `SeekMode` for more detail.
260-
/// Current position in the file after seeking (relative to start of file)
261-
/// will be returned.
262-
#deprecated("use `read_at` or `write_at` instead", skip_current_package=true)
263-
pub async fn File::seek(self : File, offset : Int64, mode~ : SeekMode) -> Int64 {
264-
self.io.seek(offset, mode.to_int(), context="@fs.File::seek()")
265-
}
266-
267-
///|
268-
/// Get current position in the file. Can only be applied to a regular file.
269-
#deprecated("use `read_at` or `write_at` instead", skip_current_package=true)
270-
pub async fn File::curr_pos(self : File) -> Int64 {
271-
self.seek(0, mode=Relative)
272-
}
273-
274242
///|
275243
/// Get the size of the file. This method will not change position in the file.
276244
/// Can only be applied to a regular file.
@@ -281,9 +249,6 @@ pub async fn File::size(self : File) -> Int64 {
281249
///|
282250
extern "C" fn as_dir_ffi(fd : Int) -> Directory = "fdopendir"
283251

284-
///|
285-
extern "C" fn Directory::is_null(dir : Directory) -> Bool = "moonbitlang_async_dir_is_null"
286-
287252
///|
288253
/// Convert a file to directory.
289254
/// If the file is not a directory, an error will be raised.
@@ -295,7 +260,7 @@ extern "C" fn Directory::is_null(dir : Directory) -> Bool = "moonbitlang_async_d
295260
pub fn File::as_dir(self : File) -> Directory raise {
296261
let fd = self.io.detach_from_event_loop()
297262
let dir = as_dir_ffi(fd)
298-
if dir.is_null() {
263+
if @c_buffer.ptr_is_null(dir) {
299264
let context = "@fs.File::as_dir()"
300265
@fd_util.close(fd, context~)
301266
@os_error.check_errno(context)
@@ -406,6 +371,10 @@ pub async fn symlink(target~ : StringView, path : StringView) -> Unit {
406371
@event_loop.symlink(target, path, context="@fs.link()")
407372
}
408373

374+
///|
375+
#borrow(path)
376+
extern "C" fn chmod_ffi(path : Bytes, mode : Int) -> Int = "moonbitlang_async_chmod"
377+
409378
///|
410379
/// Change the permission of a file.
411380
/// Permission is represented as an integer in UNIX permission style.
@@ -414,5 +383,18 @@ pub async fn symlink(target~ : StringView, path : StringView) -> Unit {
414383
/// - users in the owner group of the file can read the file (`4`)
415384
/// - other users can do nothing to the file
416385
pub async fn chmod(path : StringView, mode : Int) -> Unit {
417-
@event_loop.chmod(path, mode, context="@fs.chmod()")
386+
struct Job {
387+
path : Bytes
388+
mode : Int
389+
mut err : Int
390+
}
391+
let path_bytes = @encoding/utf8.encode(path)
392+
let job : Job = { path: path_bytes, mode, err: 0 }
393+
@event_loop.perform_job_in_worker(job, job => if chmod_ffi(job.path, job.mode) <
394+
0 {
395+
job.err = @os_error.get_errno()
396+
})
397+
if job.err != 0 {
398+
raise @os_error.OSError(job.err, context="@fs.chmod(\{repr(path)})")
399+
}
418400
}

src/fs/moon.pkg.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
"moonbitlang/async/io",
55
"moonbitlang/async/internal/event_loop",
66
"moonbitlang/async/internal/fd_util",
7+
"moonbitlang/async/internal/c_buffer",
78
"moonbitlang/async",
89
"moonbitlang/async/semaphore"
910
],

src/fs/pkg.generated.mbti

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -66,15 +66,11 @@ pub fn File::as_dir(Self) -> Directory raise
6666
pub async fn File::atime(Self) -> (Int64, Int)
6767
pub fn File::close(Self) -> Unit
6868
pub async fn File::ctime(Self) -> (Int64, Int)
69-
#deprecated
70-
pub async fn File::curr_pos(Self) -> Int64
7169
pub fn File::fd(Self) -> Int
7270
pub fn File::kind(Self) -> FileKind
7371
pub async fn File::mtime(Self) -> (Int64, Int)
7472
pub async fn File::read_at(Self, FixedArray[Byte], position~ : Int64, offset? : Int, len? : Int) -> Int
7573
pub async fn File::read_exactly_at(Self, Int, position~ : Int64) -> Bytes
76-
#deprecated
77-
pub async fn File::seek(Self, Int64, mode~ : SeekMode) -> Int64
7874
pub async fn File::size(Self) -> Int64
7975
pub async fn File::sync(Self, only_data? : Bool) -> Unit
8076
pub async fn File::write_at(Self, BytesView, position~ : Int64) -> Unit
@@ -100,12 +96,6 @@ pub(all) enum Mode {
10096
ReadWrite
10197
}
10298

103-
pub(all) enum SeekMode {
104-
FromStart
105-
FromEnd
106-
Relative
107-
}
108-
10999
pub(all) enum SyncMode {
110100
NoSync
111101
Data

src/fs/seek_wbtest.mbt

Lines changed: 0 additions & 54 deletions
This file was deleted.

src/fs/stub.c

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,21 +23,13 @@
2323
#include <sys/stat.h>
2424
#include <moonbit.h>
2525

26-
int moonbitlang_async_dir_is_null(DIR *dir) {
27-
return dir == 0;
28-
}
29-
3026
moonbit_bytes_t moonbitlang_async_dirent_name(struct dirent *dirent) {
3127
int len = strlen(dirent->d_name);
3228
moonbit_bytes_t result = moonbit_make_bytes(len, 0);
3329
memcpy(result, dirent->d_name, len);
3430
return result;
3531
}
3632

37-
int moonbitlang_async_dirent_is_null(struct dirent *dirent) {
38-
return dirent == 0;
39-
}
40-
4133
int moonbitlang_async_get_R_OK() {
4234
return R_OK;
4335
}
@@ -85,3 +77,7 @@ int moonbitlang_async_get_O_TRUNC() {
8577
int moonbitlang_async_get_O_CREAT() {
8678
return O_CREAT;
8779
}
80+
81+
int moonbitlang_async_chmod(char *path, int32_t mode) {
82+
return chmod(path, mode);
83+
}

src/internal/c_buffer/c_buffer.mbt

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,5 +48,26 @@ extern "C" fn Buffer::null() -> Buffer = "moonbitlang_async_null_pointer"
4848
///|
4949
pub let null : Buffer = Buffer::null()
5050

51+
///|
52+
#borrow(buf)
53+
pub extern "C" fn Buffer::is_null(buf : Buffer) -> Bool = "moonbitlang_async_is_null"
54+
5155
///|
5256
pub extern "C" fn Buffer::free(buf : Buffer) = "free"
57+
58+
///|
59+
fn[X, Y] unsafe_cast(x : X) -> Y = "%identity"
60+
61+
///|
62+
/// Get a null C pointer.
63+
/// WARNING: the result type MUST be a foreign pointer type declared with `#external`
64+
pub fn[X] null_ptr() -> X {
65+
unsafe_cast(null)
66+
}
67+
68+
///|
69+
/// Check if a C pointer is null.
70+
/// WARNING: the parameter MUST be a foreign pointer type declared with `#external`
71+
pub fn[X] ptr_is_null(ptr : X) -> Bool {
72+
Buffer::is_null(unsafe_cast(ptr))
73+
}

src/internal/c_buffer/pkg.generated.mbti

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,10 @@ package "moonbitlang/async/internal/c_buffer"
44
// Values
55
pub let null : Buffer
66

7+
pub fn[X] null_ptr() -> X
8+
9+
pub fn[X] ptr_is_null(X) -> Bool
10+
711
// Errors
812

913
// Types and methods
@@ -14,6 +18,7 @@ pub fn Buffer::blit_to_bytes(Self, dst~ : FixedArray[Byte], offset~ : Int, len~
1418
pub fn Buffer::free(Self) -> Unit
1519
#alias("_[_]")
1620
pub fn Buffer::get(Self, Int) -> Byte
21+
pub fn Buffer::is_null(Self) -> Bool
1722
pub fn Buffer::strlen(Self) -> Int
1823

1924
// Type aliases

src/internal/c_buffer/stub.c

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,3 +37,7 @@ int32_t moonbitlang_async_strlen(char *str) {
3737
char *moonbitlang_async_null_pointer() {
3838
return 0;
3939
}
40+
41+
int32_t moonbitlang_async_is_null(void *ptr) {
42+
return ptr == 0;
43+
}

0 commit comments

Comments
 (0)