Skip to content

Commit f8a9c79

Browse files
committed
eio_linux: refactor fixed buffer core
Instead of having separate Alloc, Alloc_or_wait and Free effects, the scheduler now provides a single Get effect to return itself, and the actual work is now done in the calling fiber. This is cleaner, and seems to be slightly faster too. Note that `alloc_fixed_or_wait` is currently not cancellable (it wasn't before either, but it's more obvious now). It would be possible to use DLS to store the scheduler rather than using an effect. However, the improvement in speed is minimal and there are some complications with sys-threads, so probably better to wait for OCaml to support thread-local-storage first.
1 parent d47b5e2 commit f8a9c79

File tree

4 files changed

+36
-41
lines changed

4 files changed

+36
-41
lines changed

lib_eio/core/eio__core.ml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ module Private = struct
77
module Suspend = Suspend
88
module Cells = Cells
99
module Broadcast = Broadcast
10+
module Single_waiter = Single_waiter
1011
module Trace = Trace
1112
module Fiber_context = Cancel.Fiber_context
1213
module Debug = Debug

lib_eio/core/eio__core.mli

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -606,6 +606,7 @@ module Private : sig
606606

607607
module Cells = Cells
608608
module Broadcast = Broadcast
609+
module Single_waiter = Single_waiter
609610

610611
(** Every fiber has an associated context. *)
611612
module Fiber_context : sig

lib_eio_linux/low_level.ml

Lines changed: 28 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -207,11 +207,34 @@ let write ?file_offset:off fd buf len =
207207
raise @@ Err.wrap (Uring.error_of_errno res) "write" ""
208208
)
209209

210-
let alloc_fixed () = Effect.perform Sched.Alloc
211-
212-
let alloc_fixed_or_wait () = Effect.perform Sched.Alloc_or_wait
213-
214-
let free_fixed buf = Effect.perform (Sched.Free buf)
210+
let alloc_fixed () =
211+
let s = Sched.get () in
212+
match s.mem with
213+
| None -> None
214+
| Some mem ->
215+
match Uring.Region.alloc mem with
216+
| buf -> Some buf
217+
| exception Uring.Region.No_space -> None
218+
219+
let alloc_fixed_or_wait () =
220+
let s = Sched.get () in
221+
match s.mem with
222+
| None -> failwith "No fixed buffer available"
223+
| Some mem ->
224+
match Uring.Region.alloc mem with
225+
| buf -> buf
226+
| exception Uring.Region.No_space ->
227+
let id = Eio.Private.Trace.mint_id () in
228+
let trigger = Eio.Private.Single_waiter.create () in
229+
Queue.push trigger s.mem_q;
230+
(* todo: remove protect; but needs to remove from queue on cancel *)
231+
Eio.Private.Single_waiter.await_protect trigger "alloc_fixed_or_wait" id
232+
233+
let free_fixed buf =
234+
let s = Sched.get () in
235+
match Queue.take_opt s.mem_q with
236+
| None -> Uring.Region.free buf
237+
| Some k -> Eio.Private.Single_waiter.wake k (Ok buf)
215238

216239
let splice src ~dst ~len =
217240
Fd.use_exn "splice-src" src @@ fun src ->

lib_eio_linux/sched.ml

Lines changed: 6 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ type t = {
5050
uring: io_job Uring.t;
5151
mem: Uring.Region.t option;
5252
io_q: (t -> unit) Queue.t; (* waiting for room on [uring] *)
53-
mem_q : Uring.Region.chunk Suspended.t Queue.t;
53+
mem_q : Uring.Region.chunk Eio.Private.Single_waiter.t Queue.t;
5454

5555
(* The queue of runnable fibers ready to be resumed. Note: other domains can also add work items here. *)
5656
run_q : runnable Lf_queue.t;
@@ -74,9 +74,9 @@ type t = {
7474
type _ Effect.t +=
7575
| Enter : (t -> 'a Suspended.t -> unit) -> 'a Effect.t
7676
| Cancel : io_job Uring.job -> unit Effect.t
77-
| Alloc : Uring.Region.chunk option Effect.t
78-
| Alloc_or_wait : Uring.Region.chunk Effect.t
79-
| Free : Uring.Region.chunk -> unit Effect.t
77+
| Get : t Effect.t
78+
79+
let get () = Effect.perform Get
8080

8181
let wake_buffer =
8282
let b = Bytes.create 8 in
@@ -339,21 +339,6 @@ and complete_rw_req st ({len; cur_off; action; _} as req) res =
339339
| _, Exactly len -> Suspended.continue action len
340340
| n, Upto _ -> Suspended.continue action n
341341

342-
let alloc_buf_or_wait st k =
343-
match st.mem with
344-
| None -> Suspended.discontinue k (Failure "No fixed buffer available")
345-
| Some mem ->
346-
match Uring.Region.alloc mem with
347-
| buf -> Suspended.continue k buf
348-
| exception Uring.Region.No_space ->
349-
Queue.push k st.mem_q;
350-
schedule st
351-
352-
let free_buf st buf =
353-
match Queue.take_opt st.mem_q with
354-
| None -> Uring.Region.free buf
355-
| Some k -> enqueue_thread st k buf
356-
357342
let rec enqueue_poll_add fd poll_mask st action =
358343
Trace.log "poll_add";
359344
let retry = with_cancel_hook ~action st (fun () ->
@@ -411,8 +396,9 @@ let run ~extra_effects st main arg =
411396
Fiber_context.destroy fiber;
412397
Printexc.raise_with_backtrace ex (Printexc.get_raw_backtrace ())
413398
);
414-
effc = fun (type a) (e : a Effect.t) ->
399+
effc = fun (type a) (e : a Effect.t) : ((a, _) continuation -> _) option ->
415400
match e with
401+
| Get -> Some (fun k -> continue k st)
416402
| Enter fn -> Some (fun k ->
417403
match Fiber_context.get_error fiber with
418404
| Some e -> discontinue k e
@@ -467,22 +453,6 @@ let run ~extra_effects st main arg =
467453
Eio_unix.Private.Thread_pool.submit st.thread_pool ~ctx:fiber ~enqueue fn;
468454
schedule st
469455
)
470-
| Alloc -> Some (fun k ->
471-
match st.mem with
472-
| None -> continue k None
473-
| Some mem ->
474-
match Uring.Region.alloc mem with
475-
| buf -> continue k (Some buf)
476-
| exception Uring.Region.No_space -> continue k None
477-
)
478-
| Alloc_or_wait -> Some (fun k ->
479-
let k = { Suspended.k; fiber } in
480-
alloc_buf_or_wait st k
481-
)
482-
| Free buf -> Some (fun k ->
483-
free_buf st buf;
484-
continue k ()
485-
)
486456
| e -> extra_effects.effc e
487457
}
488458
in

0 commit comments

Comments
 (0)