Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -100,12 +100,16 @@ let rec write_all fd = function
let result, data = wait_with_retry uring in
assert (data = `Write_all); (* There aren't any other requests pending *)
assert (result > 0); (* Check for error return *)
let bufs = Cstruct.shiftv bufs result in
let bufs = Uring.Bstruct.shiftv bufs result in
write_all fd bufs
```

```ocaml
# write_all fd Cstruct.[of_string "INFO: "; of_string "A log message"];;
# let slab = Uring.Slab.create Uring.major_alloc_byte_size;;
val slab : Uring.Slab.t = <abstr>
# let bs = Uring.Slab.slice_strings slab ["INFO: "; "A log message"];;
val bs : Uring.Bstruct.t list = [<abstr>; <abstr>]
# write_all fd bs;;
- : unit = ()
```

Expand Down
14 changes: 8 additions & 6 deletions bench/readv.ml
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,14 @@ let rec wait t handle =
| None -> wait t handle
| Some { result; data = buf } -> handle result buf

let run_bechmark ~polling_timeout fd =
let run_bechmark slab ~polling_timeout fd =
let got = ref 0 in
(* For polling mode, [queue_depth] needs to be slightly larger than [n_concurrent] or submission
occasionally fails for some reason. *)
let t = Uring.create ?polling_timeout ~queue_depth:(n_concurrent * 2) () in
(* We start by submitting [n_concurrent] reads. *)
for _ = 1 to n_concurrent do
let buf = Cstruct.create buffer_size in
let buf = Uring.Slab.slice slab buffer_size in
let _job : _ Uring.job = Uring.readv t fd [buf] ~file_offset:Optint.Int63.zero [buf] |> Option.get in
()
done;
Expand Down Expand Up @@ -55,8 +55,10 @@ let run_bechmark ~polling_timeout fd =

let () =
let fd = Unix.openfile "/dev/zero" Unix.[O_RDONLY] 0 in
run_bechmark fd ~polling_timeout:None;
run_bechmark fd ~polling_timeout:(Some 1000);
run_bechmark fd ~polling_timeout:None;
run_bechmark fd ~polling_timeout:(Some 1000);
(* TODO: The slab doesn't release space so this could in theory run out of room... *)
let slab = Uring.Slab.create Uring.major_alloc_byte_size in
run_bechmark slab fd ~polling_timeout:None;
run_bechmark slab fd ~polling_timeout:(Some 1000);
run_bechmark slab fd ~polling_timeout:None;
run_bechmark slab fd ~polling_timeout:(Some 1000);
Unix.close fd
4 changes: 2 additions & 2 deletions lib/uring/dune
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
(name uring)
(public_name uring)
(foreign_archives uring)
(libraries cstruct fmt optint unix)
(libraries fmt optint unix compiler-libs.optcomp)
(foreign_stubs
(language c)
(names uring_stubs)
Expand All @@ -11,7 +11,7 @@
(extra_deps include/liburing/compat.h)))

(rule
(targets config.ml)
(targets uring_config.ml)
(deps
include/liburing.h
include/liburing/io_uring.h
Expand Down
2 changes: 1 addition & 1 deletion lib/uring/include/discover.ml
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ let () =
let at_struct = List.map (fun (name, v) -> Printf.sprintf " let %s = 0x%x" name v) at_flags in
let mask_struct = List.map (fun (name, v) -> Printf.sprintf " let %s = 0x%x" name v) mask_flags in
let attr_struct = List.map (fun (name, v) -> Printf.sprintf " let %s = 0x%x" name v) attr_flags in
C.Flags.write_lines "config.ml"
C.Flags.write_lines "uring_config.ml"
(defs @
["module Op : sig";
" type t";
Expand Down
12 changes: 4 additions & 8 deletions lib/uring/region.ml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

(* TODO turn into a variable length slab allocator *)
type t = {
buf: Cstruct.buffer;
buf: bytes;
block_size: int;
freelist: int Queue.t;
}
Expand Down Expand Up @@ -37,14 +37,10 @@ let length_option t = function
else
len

let to_cstruct ?len (t, chunk) =
Cstruct.of_bigarray ~off:chunk ~len:(length_option t len) t.buf
let to_bytes ?len (t, chunk) =
Bytes.sub t.buf chunk (length_option t len)

let to_bigstring ?len (t, chunk) =
Bigarray.Array1.sub t.buf chunk (length_option t len)

let to_string ?len (t, chunk) =
Cstruct.to_string (to_cstruct ?len (t, chunk))
let to_string ?len (t, chunk) = Bytes.to_string (to_bytes ?len (t, chunk))

let avail {freelist;_} = Queue.length freelist

Expand Down
10 changes: 2 additions & 8 deletions lib/uring/region.mli
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ type t
(** [No_space] is raised when an allocation request cannot
be satisfied. *)

val init: block_size:int -> Cstruct.buffer -> int -> t
val init: block_size:int -> bytes -> int -> t
(** [init ~block_size buf slots] initialises a region from
the buffer [buf] with total size of [block_size * slots]. *)

Expand All @@ -37,18 +37,12 @@ type t
offset in its associated region. This can be used in IO calls
involving that memory. *)

val to_cstruct : ?len:int -> chunk -> Cstruct.t
val to_bytes : ?len:int -> chunk -> bytes
(** [to_cstruct chunk] is a cstruct of [chunk]'s slice of the region.
Note that this is a zero-copy view into the underlying region [t]
and so [chunk] should not be freed until this cstruct is no longer used.
@param len Use only the first [len] bytes of [chunk]. *)

val to_bigstring : ?len:int -> chunk -> Cstruct.buffer
(** [to_bigstring] is like {!to_cstruct}, but creates a {!Bigarray}.
Note that this is a zero-copy view into the underlying region [t]
and so [chunk] should not be freed until this Bigarray reference is no longer used.
@param len Use only the first [len] bytes of [chunk]. *)

val to_string : ?len:int -> chunk -> string
(** [to_string ?len chunk] will return a copy of [chunk] as an OCaml string.
@param len Use only the first [len] bytes of [chunk]. *)
Expand Down
96 changes: 62 additions & 34 deletions lib/uring/uring.ml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,12 @@
* OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*)

let major_alloc_byte_size = Config.max_young_wosize * Sys.word_size
module Config = Uring_config

module Bstruct = Util.Bstruct
module Slab = Util.Slab

module Private = struct
module Heap = Heap
end
Expand Down Expand Up @@ -191,36 +197,29 @@ end

module Op = Config.Op

(* The C stubs rely on the layout of Cstruct.t, so we just check here that it hasn't changed. *)
module Check_cstruct : sig
[@@@warning "-34"]
type t = private {
buffer: (char, Bigarray.int8_unsigned_elt, Bigarray.c_layout) Bigarray.Array1.t;
off : int;
len : int;
}
end = Cstruct

(*
* A Sketch buffer is an area used to hold objects that remain alive
* until the next `Uring.submit`.
* For example an `iovec` must be passed to io_uring in `readv` and
* `writev`, once we call `Uring.submit` the `iovec` structures are
* copied by the kernel and we can release them, which we do.
*)
module Sketch = struct
module Sketch = struct
type buffer = (char, Bigarray.int8_unsigned_elt, Bigarray.c_layout) Bigarray.Array1.t
type t = {
mutable buffer : Cstruct.buffer;
mutable buffer : buffer;
mutable off : int;
mutable old_buffers : Cstruct.buffer list;
mutable old_buffers : buffer list;
}

type ptr = Cstruct.buffer * int * int
type ptr = buffer * int * int

let create_buffer len = Bigarray.(Array1.create char c_layout len)

let empty = create_buffer 0

let create () =
{ buffer = Cstruct.empty.buffer; off = 0; old_buffers = [] }
{ buffer = empty; off = 0; old_buffers = [] }

let length t = Bigarray.Array1.size_in_bytes t.buffer

Expand All @@ -244,15 +243,12 @@ module Sketch = struct
t.off <- t.off + alloc_len;
(t.buffer, off, alloc_len)

let _cstruct_of_ptr ((buf, off, len) : ptr) =
Cstruct.of_bigarray buf ~off ~len

let release t =
t.off <- 0;
t.old_buffers <- []

module Iovec = struct
external set : ptr -> Cstruct.t list -> unit = "ocaml_uring_set_iovec" [@@noalloc]
external set : ptr -> Bstruct.t list -> unit = "ocaml_uring_set_iovec" [@@noalloc]

let sizeof = Config.sizeof_iovec

Expand All @@ -275,7 +271,7 @@ end
(* Used for the sendmsg/recvmsg calls. Liburing doesn't support sendto/recvfrom at the time of writing. *)
module Msghdr = struct
type msghdr
type t = msghdr * Sockaddr.t option * Cstruct.t list (* `Cstruct.t list` is here only for preventing it being GCed *)
type t = msghdr * Sockaddr.t option * Bstruct.t list (* `bytes list` is here only for preventing it being GCed *)
external make_msghdr : int -> Unix.file_descr list -> Sockaddr.t option -> msghdr = "ocaml_uring_make_msghdr"
external get_msghdr_fds : msghdr -> Unix.file_descr list = "ocaml_uring_get_msghdr_fds"

Expand Down Expand Up @@ -303,7 +299,7 @@ module Uring = struct
external exit : t -> unit = "ocaml_uring_exit"

external unregister_buffers : t -> unit = "ocaml_uring_unregister_buffers"
external register_bigarray : t -> Cstruct.buffer -> unit = "ocaml_uring_register_ba"
external register_bytes : t -> bytes -> unit = "ocaml_uring_register_bytes"
external submit : t -> int = "ocaml_uring_submit"
external sq_ready : t -> int = "ocaml_uring_sq_ready" [@@noalloc]

Expand All @@ -316,12 +312,12 @@ module Uring = struct
external submit_nop : t -> id -> bool = "ocaml_uring_submit_nop" [@@noalloc]
external submit_timeout : t -> id -> Sketch.ptr -> clock -> bool -> bool = "ocaml_uring_submit_timeout" [@@noalloc]
external submit_poll_add : t -> Unix.file_descr -> id -> Poll_mask.t -> bool = "ocaml_uring_submit_poll_add" [@@noalloc]
external submit_read : t -> Unix.file_descr -> id -> Cstruct.t -> offset -> bool = "ocaml_uring_submit_read" [@@noalloc]
external submit_write : t -> Unix.file_descr -> id -> Cstruct.t -> offset -> bool = "ocaml_uring_submit_write" [@@noalloc]
external submit_read : t -> Unix.file_descr -> id -> Bstruct.t -> offset -> bool = "ocaml_uring_submit_read" [@@noalloc]
external submit_write : t -> Unix.file_descr -> id -> Bstruct.t -> offset -> bool = "ocaml_uring_submit_write" [@@noalloc]
external submit_readv : t -> Unix.file_descr -> id -> Sketch.ptr -> offset -> bool = "ocaml_uring_submit_readv" [@@noalloc]
external submit_writev : t -> Unix.file_descr -> id -> Sketch.ptr -> offset -> bool = "ocaml_uring_submit_writev" [@@noalloc]
external submit_readv_fixed : t -> Unix.file_descr -> id -> Cstruct.buffer -> int -> int -> offset -> bool = "ocaml_uring_submit_readv_fixed_byte" "ocaml_uring_submit_readv_fixed_native" [@@noalloc]
external submit_writev_fixed : t -> Unix.file_descr -> id -> Cstruct.buffer -> int -> int -> offset -> bool = "ocaml_uring_submit_writev_fixed_byte" "ocaml_uring_submit_writev_fixed_native" [@@noalloc]
external submit_readv_fixed : t -> Unix.file_descr -> id -> bytes -> int -> int -> offset -> bool = "ocaml_uring_submit_readv_fixed_byte" "ocaml_uring_submit_readv_fixed_native" [@@noalloc]
external submit_writev_fixed : t -> Unix.file_descr -> id -> bytes -> int -> int -> offset -> bool = "ocaml_uring_submit_writev_fixed_byte" "ocaml_uring_submit_writev_fixed_native" [@@noalloc]
external submit_close : t -> Unix.file_descr -> id -> bool = "ocaml_uring_submit_close" [@@noalloc]
external submit_statx : t -> id -> Unix.file_descr -> Statx.t -> Sketch.ptr -> int -> int -> bool = "ocaml_uring_submit_statx_byte" "ocaml_uring_submit_statx_native" [@@noalloc]
external submit_splice : t -> id -> Unix.file_descr -> Unix.file_descr -> int -> bool = "ocaml_uring_submit_splice" [@@noalloc]
Expand All @@ -348,7 +344,7 @@ end
type 'a t = {
id : < >;
uring: Uring.t;
mutable fixed_iobuf: Cstruct.buffer;
mutable fixed_iobuf: bytes;
data : 'a Heap.t;
sketch : Sketch.t;
queue_depth: int;
Expand Down Expand Up @@ -394,7 +390,7 @@ let create ?polling_timeout ~queue_depth () =
let uring = Uring.create queue_depth polling_timeout in
let data = Heap.create queue_depth in
let id = object end in
let fixed_iobuf = Cstruct.empty.buffer in
let fixed_iobuf = Bytes.empty in
let sketch = Sketch.create () in
let t = { id; uring; sketch; fixed_iobuf; data; queue_depth } in
register_gc_root t;
Expand All @@ -412,11 +408,11 @@ let ensure_idle t op =

let set_fixed_buffer t iobuf =
ensure_idle t "set_fixed_buffer";
if Bigarray.Array1.dim t.fixed_iobuf > 0 then
if Bytes.length t.fixed_iobuf > 0 then
Uring.unregister_buffers t.uring;
t.fixed_iobuf <- iobuf;
if Bigarray.Array1.dim iobuf > 0 then (
match Uring.register_bigarray t.uring iobuf with
if Bytes.length iobuf > 0 then (
match Uring.register_bytes t.uring iobuf with
| () -> Ok ()
| exception Unix.Unix_error(Unix.ENOMEM, "io_uring_register_buffers", "") -> Error `ENOMEM
) else Ok ()
Expand Down Expand Up @@ -470,10 +466,10 @@ let unlink t ~dir ?(fd=at_fdcwd) path user_data =
Uring.submit_unlinkat t.uring id fd buf dir
) user_data

let read t ~file_offset fd (buf : Cstruct.t) user_data =
let read t ~file_offset fd (buf : Bstruct.t) user_data =
with_id_full t (fun id -> Uring.submit_read t.uring fd id buf file_offset) user_data ~extra_data:buf

let write t ~file_offset fd (buf : Cstruct.t) user_data =
let write t ~file_offset fd (buf : Bstruct.t) user_data =
with_id_full t (fun id -> Uring.submit_write t.uring fd id buf file_offset) user_data ~extra_data:buf

let iov_max = Config.iov_max
Expand All @@ -487,15 +483,19 @@ let read_fixed t ~file_offset fd ~off ~len user_data =
with_id t (fun id -> Uring.submit_readv_fixed t.uring fd id t.fixed_iobuf off len file_offset) user_data

let read_chunk ?len t ~file_offset fd chunk user_data =
let { Cstruct.buffer; off; len } = Region.to_cstruct ?len chunk in
(* TODO: Not the right offset? *)
let buffer, off = Region.to_bytes ?len chunk, 0 in
let len = Bytes.length buffer in
if buffer != t.fixed_iobuf then invalid_arg "Chunk does not belong to ring!";
with_id t (fun id -> Uring.submit_readv_fixed t.uring fd id t.fixed_iobuf off len file_offset) user_data

let write_fixed t ~file_offset fd ~off ~len user_data =
with_id t (fun id -> Uring.submit_writev_fixed t.uring fd id t.fixed_iobuf off len file_offset) user_data

let write_chunk ?len t ~file_offset fd chunk user_data =
let { Cstruct.buffer; off; len } = Region.to_cstruct ?len chunk in
(* TODO: Not the right offset? *)
let buffer, off = Region.to_bytes ?len chunk, 0 in
let len = Bytes.length buffer in
if buffer != t.fixed_iobuf then invalid_arg "Chunk does not belong to ring!";
with_id t (fun id -> Uring.submit_writev_fixed t.uring fd id t.fixed_iobuf off len file_offset) user_data

Expand Down Expand Up @@ -636,3 +636,31 @@ let get_debug_stats t =
sketch_buffer_size = Bigarray.Array1.dim t.sketch.buffer;
sketch_old_buffers = List.length t.sketch.old_buffers;
}

module Bytes = struct
let rec skip_empty = function
| t :: ts when Bytes.length t = 0 -> skip_empty ts
| x -> x

(* We can't do sub views on raw bytes so
it is a bit wasteful as we just drop half
empty byte arrays :S *)
let rec shiftv ts v = match v with
| 0 -> skip_empty ts
| n ->
match ts with
| [] -> failwith "Error"
| t :: ts ->
let len = Bytes.length t in
if n >= len then shiftv ts (n - len) else ts


let of_bigarray ~off ~len ba =
let ba_len = Bigarray.Array1.dim ba in
assert (off >= 0 && off + len < ba_len);
let b = Bytes.create len in
for i = 0 to len do
Bytes.unsafe_set b i (Bigarray.Array1.unsafe_get ba (off + i))
done;
b
end
Loading