Skip to content

Commit e7818ec

Browse files
committed
refactor thread pool, make it extensible
1 parent 7c6f413 commit e7818ec

22 files changed

+708
-1201
lines changed

src/fs/dir.mbt

Lines changed: 54 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,12 @@ pub async fn rmdir(path : StringView, recursive? : Bool = false) -> Unit {
4444

4545
///|
4646
/// A directory in file system
47-
struct Directory(@event_loop.Directory)
47+
#external
48+
type Directory
49+
50+
///|
51+
#external
52+
priv type DirectoryEntry
4853

4954
///|
5055
extern "C" fn Directory::close_ffi(self : Directory) -> Int = "closedir"
@@ -54,29 +59,68 @@ pub fn Directory::close(self : Directory) -> Unit {
5459
guard self.close_ffi() == 0
5560
}
5661

62+
///|
63+
#borrow(path)
64+
extern "C" fn opendir_ffi(path : Bytes) -> Directory = "opendir"
65+
66+
///|
67+
async fn opendir_aux(path : StringView, context~ : String) -> Directory {
68+
struct Job {
69+
path : Bytes
70+
mut result : Directory
71+
mut err : Int
72+
}
73+
let path_bytes = @encoding/utf8.encode(path)
74+
let job : Job = { path: path_bytes, result: @c_buffer.null_ptr(), err: 0 }
75+
@event_loop.perform_job_in_worker(job, job => {
76+
job.result = opendir_ffi(job.path)
77+
job.err = @os_error.get_errno()
78+
}) catch {
79+
err => {
80+
if !@c_buffer.ptr_is_null(job.result) {
81+
ignore(job.result.close_ffi())
82+
}
83+
raise err
84+
}
85+
}
86+
if @c_buffer.ptr_is_null(job.result) {
87+
raise @os_error.OSError(job.err, context="\{context}: \{repr(path)}")
88+
}
89+
job.result
90+
}
91+
5792
///|
5893
/// Open the directory at `path`. `path` is encoded UTF8.
5994
/// If `path` is not a directory, an error will be raised
6095
pub async fn opendir(path : StringView) -> Directory {
61-
@event_loop.opendir(path, context="@fs.opendir()")
96+
opendir_aux(path, context="@fs.opendir()")
6297
}
6398

64-
///|
65-
using @event_loop {type DirectoryEntry}
66-
6799
///|
68100
extern "C" fn DirectoryEntry::name(ent : DirectoryEntry) -> Bytes = "moonbitlang_async_dirent_name"
69101

70102
///|
71-
extern "C" fn DirectoryEntry::is_null(ent : DirectoryEntry) -> Bool = "moonbitlang_async_dirent_is_null"
103+
extern "C" fn readdir_ffi(dir : Directory) -> DirectoryEntry = "readdir"
72104

73105
///|
74106
async fn Directory::read_next(dir : Directory) -> String? {
75-
let dir_ent = @event_loop.readdir(dir.0, context="@fs.readdir()")
76-
if dir_ent.is_null() {
107+
struct Job {
108+
dir : Directory
109+
mut result : DirectoryEntry
110+
mut err : Int
111+
}
112+
let job : Job = { dir, result: @c_buffer.null_ptr(), err: 0 }
113+
@event_loop.perform_job_in_worker(job, job => {
114+
@os_error.clear_errno()
115+
job.result = readdir_ffi(job.dir)
116+
job.err = @os_error.get_errno()
117+
})
118+
if job.err != 0 {
119+
raise @os_error.OSError(job.err, context="@fs.readdir()")
120+
} else if @c_buffer.ptr_is_null(job.result) {
77121
None
78122
} else {
79-
Some(@encoding/utf8.decode(dir_ent.name()))
123+
Some(@encoding/utf8.decode(job.result.name()))
80124
}
81125
}
82126

@@ -119,7 +163,7 @@ pub async fn readdir(
119163
include_special? : Bool = false,
120164
sort? : Bool = false,
121165
) -> Array[String] {
122-
let dir : Directory = @event_loop.opendir(path, context="@fs.readdir()")
166+
let dir : Directory = opendir_aux(path, context="@fs.readdir()")
123167
defer dir.close()
124168
let list = dir.read_all(include_hidden~, include_special~)
125169
if sort {

src/fs/fs.mbt

Lines changed: 0 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -239,38 +239,6 @@ pub async fn remove(path : StringView) -> Unit {
239239
@event_loop.remove(path, context="@fs.remove()")
240240
}
241241

242-
///|
243-
/// Determine how an offset is interpreted when seeking in a file:
244-
/// - `FromStart`: absolute offset from the start of the file
245-
/// - `FromEnd`: offset is relative to end of file
246-
/// - `Relative`: offset is relative to current position in the file
247-
pub(all) enum SeekMode {
248-
FromStart = 0
249-
FromEnd
250-
Relative
251-
}
252-
253-
///|
254-
fn SeekMode::to_int(self : SeekMode) -> Int = "%identity"
255-
256-
///|
257-
/// Change current position in file for reading and writing.
258-
/// Can only be applied to a regular file, otherwise `seek` will fail.
259-
/// The offset is interpreted using `mode`, see `SeekMode` for more detail.
260-
/// Current position in the file after seeking (relative to start of file)
261-
/// will be returned.
262-
#deprecated("use `read_at` or `write_at` instead", skip_current_package=true)
263-
pub async fn File::seek(self : File, offset : Int64, mode~ : SeekMode) -> Int64 {
264-
self.io.seek(offset, mode.to_int(), context="@fs.File::seek()")
265-
}
266-
267-
///|
268-
/// Get current position in the file. Can only be applied to a regular file.
269-
#deprecated("use `read_at` or `write_at` instead", skip_current_package=true)
270-
pub async fn File::curr_pos(self : File) -> Int64 {
271-
self.seek(0, mode=Relative)
272-
}
273-
274242
///|
275243
/// Get the size of the file. This method will not change position in the file.
276244
/// Can only be applied to a regular file.

src/fs/moon.pkg.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
"moonbitlang/async/io",
55
"moonbitlang/async/internal/event_loop",
66
"moonbitlang/async/internal/fd_util",
7+
"moonbitlang/async/internal/c_buffer",
78
"moonbitlang/async",
89
"moonbitlang/async/semaphore"
910
],

src/fs/pkg.generated.mbti

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -66,15 +66,11 @@ pub fn File::as_dir(Self) -> Directory raise
6666
pub async fn File::atime(Self) -> (Int64, Int)
6767
pub fn File::close(Self) -> Unit
6868
pub async fn File::ctime(Self) -> (Int64, Int)
69-
#deprecated
70-
pub async fn File::curr_pos(Self) -> Int64
7169
pub fn File::fd(Self) -> Int
7270
pub fn File::kind(Self) -> FileKind
7371
pub async fn File::mtime(Self) -> (Int64, Int)
7472
pub async fn File::read_at(Self, FixedArray[Byte], position~ : Int64, offset? : Int, len? : Int) -> Int
7573
pub async fn File::read_exactly_at(Self, Int, position~ : Int64) -> Bytes
76-
#deprecated
77-
pub async fn File::seek(Self, Int64, mode~ : SeekMode) -> Int64
7874
pub async fn File::size(Self) -> Int64
7975
pub async fn File::sync(Self, only_data? : Bool) -> Unit
8076
pub async fn File::write_at(Self, BytesView, position~ : Int64) -> Unit
@@ -100,12 +96,6 @@ pub(all) enum Mode {
10096
ReadWrite
10197
}
10298

103-
pub(all) enum SeekMode {
104-
FromStart
105-
FromEnd
106-
Relative
107-
}
108-
10999
pub(all) enum SyncMode {
110100
NoSync
111101
Data

src/fs/seek_wbtest.mbt

Lines changed: 0 additions & 54 deletions
This file was deleted.

src/internal/c_buffer/c_buffer.mbt

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,5 +48,26 @@ extern "C" fn Buffer::null() -> Buffer = "moonbitlang_async_null_pointer"
4848
///|
4949
pub let null : Buffer = Buffer::null()
5050

51+
///|
52+
#borrow(buf)
53+
pub extern "C" fn Buffer::is_null(buf : Buffer) -> Bool = "moonbitlang_async_is_null"
54+
5155
///|
5256
pub extern "C" fn Buffer::free(buf : Buffer) = "free"
57+
58+
///|
59+
fn[X, Y] unsafe_cast(x : X) -> Y = "%identity"
60+
61+
///|
62+
/// Get a null C pointer.
63+
/// WARNING: the result type MUST be a foreign pointer type declared with `#external`
64+
pub fn[X] null_ptr() -> X {
65+
unsafe_cast(null)
66+
}
67+
68+
///|
69+
/// Check if a C pointer is null.
70+
/// WARNING: the parameter MUST be a foreign pointer type declared with `#external`
71+
pub fn[X] ptr_is_null(ptr : X) -> Bool {
72+
Buffer::is_null(unsafe_cast(ptr))
73+
}

src/internal/c_buffer/pkg.generated.mbti

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,10 @@ package "moonbitlang/async/internal/c_buffer"
44
// Values
55
pub let null : Buffer
66

7+
pub fn[X] null_ptr() -> X
8+
9+
pub fn[X] ptr_is_null(X) -> Bool
10+
711
// Errors
812

913
// Types and methods
@@ -14,6 +18,7 @@ pub fn Buffer::blit_to_bytes(Self, dst~ : FixedArray[Byte], offset~ : Int, len~
1418
pub fn Buffer::free(Self) -> Unit
1519
#alias("_[_]")
1620
pub fn Buffer::get(Self, Int) -> Byte
21+
pub fn Buffer::is_null(Self) -> Bool
1722
pub fn Buffer::strlen(Self) -> Int
1823

1924
// Type aliases

src/internal/c_buffer/stub.c

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,3 +37,7 @@ int32_t moonbitlang_async_strlen(char *str) {
3737
char *moonbitlang_async_null_pointer() {
3838
return 0;
3939
}
40+
41+
int32_t moonbitlang_async_is_null(void *ptr) {
42+
return ptr == 0;
43+
}

0 commit comments

Comments
 (0)