From 18d36a3f27c117feeb1b611b37db29949f29ae98 Mon Sep 17 00:00:00 2001 From: Patrick Ferris Date: Sat, 19 Aug 2023 23:48:55 +0100 Subject: [PATCH 1/3] Cstruct.t -> bytes --- README.md | 4 +- bench/readv.ml | 2 +- lib/uring/dune | 2 +- lib/uring/region.ml | 12 ++---- lib/uring/region.mli | 10 +---- lib/uring/uring.ml | 81 +++++++++++++++++++++++------------------ lib/uring/uring.mli | 20 ++++++---- lib/uring/uring_stubs.c | 36 +++++++----------- tests/dune | 2 +- tests/sketch.md | 4 +- tests/urcat.ml | 11 +++--- tests/urcp_fixed_lib.ml | 2 +- tests/urcp_lib.ml | 10 ++--- 13 files changed, 96 insertions(+), 100 deletions(-) diff --git a/README.md b/README.md index 2888173c..ddb4a1a3 100644 --- a/README.md +++ b/README.md @@ -100,12 +100,12 @@ let rec write_all fd = function let result, data = wait_with_retry uring in assert (data = `Write_all); (* There aren't any other requests pending *) assert (result > 0); (* Check for error return *) - let bufs = Cstruct.shiftv bufs result in + let bufs = Uring.Bytes.shiftv bufs result in write_all fd bufs ``` ```ocaml -# write_all fd Cstruct.[of_string "INFO: "; of_string "A log message"];; +# write_all fd Bytes.[of_string "INFO: "; of_string "A log message"];; - : unit = () ``` diff --git a/bench/readv.ml b/bench/readv.ml index 8f977c7d..ba691053 100644 --- a/bench/readv.ml +++ b/bench/readv.ml @@ -20,7 +20,7 @@ let run_bechmark ~polling_timeout fd = let t = Uring.create ?polling_timeout ~queue_depth:(n_concurrent * 2) () in (* We start by submitting [n_concurrent] reads. *) for _ = 1 to n_concurrent do - let buf = Cstruct.create buffer_size in + let buf = Bytes.create buffer_size in let _job : _ Uring.job = Uring.readv t fd [buf] ~file_offset:Optint.Int63.zero [buf] |> Option.get in () done; diff --git a/lib/uring/dune b/lib/uring/dune index c55e49e2..c809723c 100644 --- a/lib/uring/dune +++ b/lib/uring/dune @@ -2,7 +2,7 @@ (name uring) (public_name uring) (foreign_archives uring) - (libraries cstruct fmt optint unix) + (libraries fmt optint unix) (foreign_stubs (language c) (names uring_stubs) diff --git a/lib/uring/region.ml b/lib/uring/region.ml index b738cfa0..1cc0d953 100644 --- a/lib/uring/region.ml +++ b/lib/uring/region.ml @@ -3,7 +3,7 @@ (* TODO turn into a variable length slab allocator *) type t = { - buf: Cstruct.buffer; + buf: bytes; block_size: int; freelist: int Queue.t; } @@ -37,14 +37,10 @@ let length_option t = function else len -let to_cstruct ?len (t, chunk) = - Cstruct.of_bigarray ~off:chunk ~len:(length_option t len) t.buf +let to_bytes ?len (t, chunk) = + Bytes.sub t.buf chunk (length_option t len) -let to_bigstring ?len (t, chunk) = - Bigarray.Array1.sub t.buf chunk (length_option t len) - -let to_string ?len (t, chunk) = - Cstruct.to_string (to_cstruct ?len (t, chunk)) +let to_string ?len (t, chunk) = Bytes.to_string (to_bytes ?len (t, chunk)) let avail {freelist;_} = Queue.length freelist diff --git a/lib/uring/region.mli b/lib/uring/region.mli index 9cbe1637..1c6e7804 100644 --- a/lib/uring/region.mli +++ b/lib/uring/region.mli @@ -17,7 +17,7 @@ type t (** [No_space] is raised when an allocation request cannot be satisfied. *) - val init: block_size:int -> Cstruct.buffer -> int -> t + val init: block_size:int -> bytes -> int -> t (** [init ~block_size buf slots] initialises a region from the buffer [buf] with total size of [block_size * slots]. *) @@ -37,18 +37,12 @@ type t offset in its associated region. This can be used in IO calls involving that memory. *) - val to_cstruct : ?len:int -> chunk -> Cstruct.t + val to_bytes : ?len:int -> chunk -> bytes (** [to_cstruct chunk] is a cstruct of [chunk]'s slice of the region. Note that this is a zero-copy view into the underlying region [t] and so [chunk] should not be freed until this cstruct is no longer used. @param len Use only the first [len] bytes of [chunk]. *) - val to_bigstring : ?len:int -> chunk -> Cstruct.buffer - (** [to_bigstring] is like {!to_cstruct}, but creates a {!Bigarray}. - Note that this is a zero-copy view into the underlying region [t] - and so [chunk] should not be freed until this Bigarray reference is no longer used. - @param len Use only the first [len] bytes of [chunk]. *) - val to_string : ?len:int -> chunk -> string (** [to_string ?len chunk] will return a copy of [chunk] as an OCaml string. @param len Use only the first [len] bytes of [chunk]. *) diff --git a/lib/uring/uring.ml b/lib/uring/uring.ml index 3436711a..c048b310 100644 --- a/lib/uring/uring.ml +++ b/lib/uring/uring.ml @@ -191,16 +191,6 @@ end module Op = Config.Op -(* The C stubs rely on the layout of Cstruct.t, so we just check here that it hasn't changed. *) -module Check_cstruct : sig - [@@@warning "-34"] - type t = private { - buffer: (char, Bigarray.int8_unsigned_elt, Bigarray.c_layout) Bigarray.Array1.t; - off : int; - len : int; - } -end = Cstruct - (* * A Sketch buffer is an area used to hold objects that remain alive * until the next `Uring.submit`. @@ -210,19 +200,19 @@ end = Cstruct *) module Sketch = struct type t = { - mutable buffer : Cstruct.buffer; + mutable buffer : bytes; mutable off : int; - mutable old_buffers : Cstruct.buffer list; + mutable old_buffers : bytes list; } - type ptr = Cstruct.buffer * int * int + type ptr = bytes * int * int - let create_buffer len = Bigarray.(Array1.create char c_layout len) + let create_buffer len = Bytes.create len let create () = - { buffer = Cstruct.empty.buffer; off = 0; old_buffers = [] } + { buffer = Bytes.empty; off = 0; old_buffers = [] } - let length t = Bigarray.Array1.size_in_bytes t.buffer + let length t = Bytes.length t.buffer let round a x = (x + (a - 1)) land (lnot (a - 1)) let round = round (Sys.word_size / 8) @@ -244,15 +234,12 @@ module Sketch = struct t.off <- t.off + alloc_len; (t.buffer, off, alloc_len) - let _cstruct_of_ptr ((buf, off, len) : ptr) = - Cstruct.of_bigarray buf ~off ~len - let release t = t.off <- 0; t.old_buffers <- [] module Iovec = struct - external set : ptr -> Cstruct.t list -> unit = "ocaml_uring_set_iovec" [@@noalloc] + external set : ptr -> bytes list -> unit = "ocaml_uring_set_iovec" [@@noalloc] let sizeof = Config.sizeof_iovec @@ -275,7 +262,7 @@ end (* Used for the sendmsg/recvmsg calls. Liburing doesn't support sendto/recvfrom at the time of writing. *) module Msghdr = struct type msghdr - type t = msghdr * Sockaddr.t option * Cstruct.t list (* `Cstruct.t list` is here only for preventing it being GCed *) + type t = msghdr * Sockaddr.t option * bytes list (* `bytes list` is here only for preventing it being GCed *) external make_msghdr : int -> Unix.file_descr list -> Sockaddr.t option -> msghdr = "ocaml_uring_make_msghdr" external get_msghdr_fds : msghdr -> Unix.file_descr list = "ocaml_uring_get_msghdr_fds" @@ -303,7 +290,7 @@ module Uring = struct external exit : t -> unit = "ocaml_uring_exit" external unregister_buffers : t -> unit = "ocaml_uring_unregister_buffers" - external register_bigarray : t -> Cstruct.buffer -> unit = "ocaml_uring_register_ba" + external register_bytes : t -> bytes -> unit = "ocaml_uring_register_bytes" external submit : t -> int = "ocaml_uring_submit" external sq_ready : t -> int = "ocaml_uring_sq_ready" [@@noalloc] @@ -316,12 +303,12 @@ module Uring = struct external submit_nop : t -> id -> bool = "ocaml_uring_submit_nop" [@@noalloc] external submit_timeout : t -> id -> Sketch.ptr -> clock -> bool -> bool = "ocaml_uring_submit_timeout" [@@noalloc] external submit_poll_add : t -> Unix.file_descr -> id -> Poll_mask.t -> bool = "ocaml_uring_submit_poll_add" [@@noalloc] - external submit_read : t -> Unix.file_descr -> id -> Cstruct.t -> offset -> bool = "ocaml_uring_submit_read" [@@noalloc] - external submit_write : t -> Unix.file_descr -> id -> Cstruct.t -> offset -> bool = "ocaml_uring_submit_write" [@@noalloc] + external submit_read : t -> Unix.file_descr -> id -> bytes -> offset -> bool = "ocaml_uring_submit_read" [@@noalloc] + external submit_write : t -> Unix.file_descr -> id -> bytes -> offset -> bool = "ocaml_uring_submit_write" [@@noalloc] external submit_readv : t -> Unix.file_descr -> id -> Sketch.ptr -> offset -> bool = "ocaml_uring_submit_readv" [@@noalloc] external submit_writev : t -> Unix.file_descr -> id -> Sketch.ptr -> offset -> bool = "ocaml_uring_submit_writev" [@@noalloc] - external submit_readv_fixed : t -> Unix.file_descr -> id -> Cstruct.buffer -> int -> int -> offset -> bool = "ocaml_uring_submit_readv_fixed_byte" "ocaml_uring_submit_readv_fixed_native" [@@noalloc] - external submit_writev_fixed : t -> Unix.file_descr -> id -> Cstruct.buffer -> int -> int -> offset -> bool = "ocaml_uring_submit_writev_fixed_byte" "ocaml_uring_submit_writev_fixed_native" [@@noalloc] + external submit_readv_fixed : t -> Unix.file_descr -> id -> bytes -> int -> int -> offset -> bool = "ocaml_uring_submit_readv_fixed_byte" "ocaml_uring_submit_readv_fixed_native" [@@noalloc] + external submit_writev_fixed : t -> Unix.file_descr -> id -> bytes -> int -> int -> offset -> bool = "ocaml_uring_submit_writev_fixed_byte" "ocaml_uring_submit_writev_fixed_native" [@@noalloc] external submit_close : t -> Unix.file_descr -> id -> bool = "ocaml_uring_submit_close" [@@noalloc] external submit_statx : t -> id -> Unix.file_descr -> Statx.t -> Sketch.ptr -> int -> int -> bool = "ocaml_uring_submit_statx_byte" "ocaml_uring_submit_statx_native" [@@noalloc] external submit_splice : t -> id -> Unix.file_descr -> Unix.file_descr -> int -> bool = "ocaml_uring_submit_splice" [@@noalloc] @@ -348,7 +335,7 @@ end type 'a t = { id : < >; uring: Uring.t; - mutable fixed_iobuf: Cstruct.buffer; + mutable fixed_iobuf: bytes; data : 'a Heap.t; sketch : Sketch.t; queue_depth: int; @@ -394,7 +381,7 @@ let create ?polling_timeout ~queue_depth () = let uring = Uring.create queue_depth polling_timeout in let data = Heap.create queue_depth in let id = object end in - let fixed_iobuf = Cstruct.empty.buffer in + let fixed_iobuf = Bytes.empty in let sketch = Sketch.create () in let t = { id; uring; sketch; fixed_iobuf; data; queue_depth } in register_gc_root t; @@ -412,11 +399,11 @@ let ensure_idle t op = let set_fixed_buffer t iobuf = ensure_idle t "set_fixed_buffer"; - if Bigarray.Array1.dim t.fixed_iobuf > 0 then + if Bytes.length t.fixed_iobuf > 0 then Uring.unregister_buffers t.uring; t.fixed_iobuf <- iobuf; - if Bigarray.Array1.dim iobuf > 0 then ( - match Uring.register_bigarray t.uring iobuf with + if Bytes.length iobuf > 0 then ( + match Uring.register_bytes t.uring iobuf with | () -> Ok () | exception Unix.Unix_error(Unix.ENOMEM, "io_uring_register_buffers", "") -> Error `ENOMEM ) else Ok () @@ -470,10 +457,10 @@ let unlink t ~dir ?(fd=at_fdcwd) path user_data = Uring.submit_unlinkat t.uring id fd buf dir ) user_data -let read t ~file_offset fd (buf : Cstruct.t) user_data = +let read t ~file_offset fd (buf : bytes) user_data = with_id_full t (fun id -> Uring.submit_read t.uring fd id buf file_offset) user_data ~extra_data:buf -let write t ~file_offset fd (buf : Cstruct.t) user_data = +let write t ~file_offset fd (buf : bytes) user_data = with_id_full t (fun id -> Uring.submit_write t.uring fd id buf file_offset) user_data ~extra_data:buf let iov_max = Config.iov_max @@ -487,7 +474,9 @@ let read_fixed t ~file_offset fd ~off ~len user_data = with_id t (fun id -> Uring.submit_readv_fixed t.uring fd id t.fixed_iobuf off len file_offset) user_data let read_chunk ?len t ~file_offset fd chunk user_data = - let { Cstruct.buffer; off; len } = Region.to_cstruct ?len chunk in + (* TODO: Not the right offset? *) + let buffer, off = Region.to_bytes ?len chunk, 0 in + let len = Bytes.length buffer in if buffer != t.fixed_iobuf then invalid_arg "Chunk does not belong to ring!"; with_id t (fun id -> Uring.submit_readv_fixed t.uring fd id t.fixed_iobuf off len file_offset) user_data @@ -495,7 +484,9 @@ let write_fixed t ~file_offset fd ~off ~len user_data = with_id t (fun id -> Uring.submit_writev_fixed t.uring fd id t.fixed_iobuf off len file_offset) user_data let write_chunk ?len t ~file_offset fd chunk user_data = - let { Cstruct.buffer; off; len } = Region.to_cstruct ?len chunk in + (* TODO: Not the right offset? *) + let buffer, off = Region.to_bytes ?len chunk, 0 in + let len = Bytes.length buffer in if buffer != t.fixed_iobuf then invalid_arg "Chunk does not belong to ring!"; with_id t (fun id -> Uring.submit_writev_fixed t.uring fd id t.fixed_iobuf off len file_offset) user_data @@ -633,6 +624,24 @@ let get_debug_stats t = sqe_ready = Uring.sq_ready t.uring; active_ops = active_ops t; sketch_used = t.sketch.off; - sketch_buffer_size = Bigarray.Array1.dim t.sketch.buffer; + sketch_buffer_size = Bytes.length t.sketch.buffer; sketch_old_buffers = List.length t.sketch.old_buffers; } + +module Bytes = struct + let rec skip_empty = function + | t :: ts when Bytes.length t = 0 -> skip_empty ts + | x -> x + + (* We can't do sub views on raw bytes so + it is a bit wasteful as we just drop half + empty byte arrays :S *) + let rec shiftv ts = function + | 0 -> skip_empty ts + | n -> + match ts with + | [] -> failwith "Error" + | t :: ts -> + let len = Bytes.length t in + if n >= len then shiftv ts (n - len) else ts +end \ No newline at end of file diff --git a/lib/uring/uring.mli b/lib/uring/uring.mli index 7cc1c4ea..9f50de10 100644 --- a/lib/uring/uring.mli +++ b/lib/uring/uring.mli @@ -44,7 +44,7 @@ val exit : 'a t -> unit for the "fixed buffer" mode of io_uring to avoid data copying between userspace and the kernel. *) -val set_fixed_buffer : 'a t -> Cstruct.buffer -> (unit, [> `ENOMEM]) result +val set_fixed_buffer : 'a t -> bytes -> (unit, [> `ENOMEM]) result (** [set_fixed_buffer t buf] sets [buf] as the fixed buffer for [t]. You will normally want to wrap this with {!Region.alloc} or similar @@ -57,7 +57,7 @@ val set_fixed_buffer : 'a t -> Cstruct.buffer -> (unit, [> `ENOMEM]) result @raise Invalid_argument if there are any requests in progress *) -val buf : 'a t -> Cstruct.buffer +val buf : 'a t -> bytes (** [buf t] is the fixed internal memory buffer associated with uring [t] using {!set_fixed_buffer}, or a zero-length buffer if none is set. *) @@ -169,13 +169,13 @@ type offset := Optint.Int63.t (** For files, give the absolute offset, or use [Optint.Int63.minus_one] for the current position. For sockets, use an offset of [Optint.Int63.zero] ([minus_one] is not allowed here). *) -val read : 'a t -> file_offset:offset -> Unix.file_descr -> Cstruct.t -> 'a -> 'a job option +val read : 'a t -> file_offset:offset -> Unix.file_descr -> bytes -> 'a -> 'a job option (** [read t ~file_offset fd buf d] will submit a [read(2)] request to uring [t]. It reads from absolute [file_offset] on the [fd] file descriptor and writes the results into the memory pointed to by [buf]. The user data [d] will be returned by {!wait} or {!peek} upon completion. *) -val write : 'a t -> file_offset:offset -> Unix.file_descr -> Cstruct.t -> 'a -> 'a job option +val write : 'a t -> file_offset:offset -> Unix.file_descr -> bytes -> 'a -> 'a job option (** [write t ~file_offset fd buf d] will submit a [write(2)] request to uring [t]. It writes to absolute [file_offset] on the [fd] file descriptor from the the memory pointed to by [buf]. The user data [d] will be returned by @@ -184,7 +184,7 @@ val write : 'a t -> file_offset:offset -> Unix.file_descr -> Cstruct.t -> 'a -> val iov_max : int (** The maximum length of the list that can be passed to [readv] and similar. *) -val readv : 'a t -> file_offset:offset -> Unix.file_descr -> Cstruct.t list -> 'a -> 'a job option +val readv : 'a t -> file_offset:offset -> Unix.file_descr -> bytes list -> 'a -> 'a job option (** [readv t ~file_offset fd iov d] will submit a [readv(2)] request to uring [t]. It reads from absolute [file_offset] on the [fd] file descriptor and writes the results into the memory pointed to by [iov]. The user data [d] will @@ -192,7 +192,7 @@ val readv : 'a t -> file_offset:offset -> Unix.file_descr -> Cstruct.t list -> ' Requires [List.length iov <= Uring.iov_max] *) -val writev : 'a t -> file_offset:offset -> Unix.file_descr -> Cstruct.t list -> 'a -> 'a job option +val writev : 'a t -> file_offset:offset -> Unix.file_descr -> bytes list -> 'a -> 'a job option (** [writev t ~file_offset fd iov d] will submit a [writev(2)] request to uring [t]. It writes to absolute [file_offset] on the [fd] file descriptor from the the memory pointed to by [iov]. The user data [d] will be returned by @@ -377,7 +377,7 @@ val cancel : 'a t -> 'a job -> 'a -> 'a job option module Msghdr : sig type t - val create : ?n_fds:int -> ?addr:Sockaddr.t -> Cstruct.t list -> t + val create : ?n_fds:int -> ?addr:Sockaddr.t -> bytes list -> t (** [create buffs] makes a new [msghdr] using the [buffs] for the underlying [iovec]. @@ -390,7 +390,7 @@ module Msghdr : sig val get_fds : t -> Unix.file_descr list end -val send_msg : ?fds:Unix.file_descr list -> ?dst:Unix.sockaddr -> 'a t -> Unix.file_descr -> Cstruct.t list -> 'a -> 'a job option +val send_msg : ?fds:Unix.file_descr list -> ?dst:Unix.sockaddr -> 'a t -> Unix.file_descr -> bytes list -> 'a -> 'a job option (** [send_msg t fd buffs d] will submit a [sendmsg(2)] request. The [Msghdr] will be constructed from the FDs ([fds]), address ([dst]) and buffers ([buffs]). @@ -466,3 +466,7 @@ val get_debug_stats : _ t -> Stats.t module Private : sig module Heap = Heap end + +module Bytes : sig + val shiftv : bytes list -> int -> bytes list +end diff --git a/lib/uring/uring_stubs.c b/lib/uring/uring_stubs.c index 4236a787..d8c7fae3 100644 --- a/lib/uring/uring_stubs.c +++ b/lib/uring/uring_stubs.c @@ -59,7 +59,7 @@ #endif #define Ring_val(v) *((struct io_uring**)Data_custom_val(v)) -#define Sketch_ptr_val(vsp) (Caml_ba_data_val(Field(vsp, 0)) + Long_val(Field(vsp, 1))) +#define Sketch_ptr_val(vsp) ((void *)(Bytes_val(Field(vsp, 0)) + Long_val(Field(vsp, 1)))) #define Sketch_ptr_len_val(vsp) Long_val(Field(vsp, 2)) // Note that this does not free the ring data. You must not allow this to be @@ -104,12 +104,12 @@ value ocaml_uring_setup(value entries, value polling_timeout) { } // Note that the ring must be idle when calling this. -value ocaml_uring_register_ba(value v_uring, value v_ba) { +value ocaml_uring_register_bytes(value v_uring, value v_ba) { CAMLparam2(v_uring, v_ba); struct io_uring *ring = Ring_val(v_uring); struct iovec iov[1]; - iov[0].iov_base = Caml_ba_data_val(v_ba); - iov[0].iov_len = Caml_ba_array_val(v_ba)->dim[0]; + iov[0].iov_base = Bytes_val(v_ba); + iov[0].iov_len = caml_string_length(v_ba); dprintf("uring %p: registering iobuf base %p len %lu\n", ring, iov[0].iov_base, iov[0].iov_len); int ret = io_uring_register_buffers(ring, iov, 1); if (ret) @@ -280,11 +280,8 @@ ocaml_uring_set_iovec(value v_sketch_ptr, value v_csl) v_aux != Val_emptylist; v_aux = Field(v_aux, 1), i++) { value v_cs = Field(v_aux, 0); - value v_ba = Field(v_cs, 0); - value v_off = Field(v_cs, 1); - value v_len = Field(v_cs, 2); - iovs[i].iov_base = Caml_ba_data_val(v_ba) + Long_val(v_off); - iovs[i].iov_len = Long_val(v_len); + iovs[i].iov_base = Bytes_val(v_cs); + iovs[i].iov_len = caml_string_length(v_cs); } } @@ -295,7 +292,6 @@ ocaml_uring_submit_readv(value v_uring, value v_fd, value v_id, value v_sketch_p struct io_uring_sqe *sqe = io_uring_get_sqe(ring); struct iovec *iovs = Sketch_ptr_val(v_sketch_ptr); size_t len = Sketch_ptr_len_val(v_sketch_ptr) / sizeof(*iovs); - if (sqe == NULL) return (Val_false); dprintf("submit_readv: %d ents len[0] %lu off %d\n", len, iovs[0].iov_len, Int63_val(v_fileoff)); @@ -325,7 +321,7 @@ value /* noalloc */ ocaml_uring_submit_readv_fixed_native(value v_uring, value v_fd, value v_id, value v_ba, value v_off, value v_len, value v_fileoff) { struct io_uring *ring = Ring_val(v_uring); struct io_uring_sqe *sqe = io_uring_get_sqe(ring); - void *buf = Caml_ba_data_val(v_ba) + Long_val(v_off); + void *buf = Bytes_val(v_ba) + Long_val(v_off); if (!sqe) return (Val_false); dprintf("submit_readv_fixed: buf %p off %d len %d fileoff %d", buf, Int_val(v_off), Int_val(v_len), Int63_val(v_fileoff)); io_uring_prep_read_fixed(sqe, Int_val(v_fd), buf, Int_val(v_len), Int63_val(v_fileoff), 0); @@ -350,7 +346,7 @@ value /* noalloc */ ocaml_uring_submit_writev_fixed_native(value v_uring, value v_fd, value v_id, value v_ba, value v_off, value v_len, value v_fileoff) { struct io_uring *ring = Ring_val(v_uring); struct io_uring_sqe *sqe = io_uring_get_sqe(ring); - void *buf = Caml_ba_data_val(v_ba) + Long_val(v_off); + void *buf = Bytes_val(v_ba) + Long_val(v_off); if (!sqe) return (Val_false); dprintf("submit_writev_fixed: buf %p off %d len %d fileoff %d", buf, Int_val(v_off), Int_val(v_len), Int63_val(v_fileoff)); @@ -372,13 +368,11 @@ ocaml_uring_submit_writev_fixed_byte(value* values, int argc) { } value /* noalloc */ -ocaml_uring_submit_read(value v_uring, value v_fd, value v_id, value v_cstruct, value v_fileoff) { +ocaml_uring_submit_read(value v_uring, value v_fd, value v_id, value v_bytes, value v_fileoff) { struct io_uring *ring = Ring_val(v_uring); struct io_uring_sqe *sqe = io_uring_get_sqe(ring); - value v_ba = Field(v_cstruct, 0); - value v_off = Field(v_cstruct, 1); - value v_len = Field(v_cstruct, 2); - void *buf = Caml_ba_data_val(v_ba) + Long_val(v_off); + value v_len = Val_long(caml_string_length(v_bytes)); + void *buf = Bytes_val(v_bytes); if (!sqe) return (Val_false); dprintf("submit_read: fd %d buff %p len %zd fileoff %d\n", Int_val(v_fd), buf, Long_val(v_len), Int63_val(v_fileoff)); @@ -388,13 +382,11 @@ ocaml_uring_submit_read(value v_uring, value v_fd, value v_id, value v_cstruct, } value /* noalloc */ -ocaml_uring_submit_write(value v_uring, value v_fd, value v_id, value v_cstruct, value v_fileoff) { +ocaml_uring_submit_write(value v_uring, value v_fd, value v_id, value v_bytes, value v_fileoff) { struct io_uring *ring = Ring_val(v_uring); struct io_uring_sqe *sqe = io_uring_get_sqe(ring); - value v_ba = Field(v_cstruct, 0); - value v_off = Field(v_cstruct, 1); - value v_len = Field(v_cstruct, 2); - void *buf = Caml_ba_data_val(v_ba) + Long_val(v_off); + value v_len = Val_int(caml_string_length(v_bytes)); + void *buf = Bytes_val(v_bytes); if (!sqe) return (Val_false); dprintf("submit_write: fd %d buff %p len %zd fileoff %d\n", Int_val(v_fd), buf, Long_val(v_len), Int63_val(v_fileoff)); diff --git a/tests/dune b/tests/dune index 449edf9c..9a68074e 100644 --- a/tests/dune +++ b/tests/dune @@ -16,7 +16,7 @@ (executable (name urcat) (modules urcat) - (libraries unix uring)) + (libraries unix compiler-libs.optcomp uring)) (executable (name urcp) diff --git a/tests/sketch.md b/tests/sketch.md index 8fcc5a9f..815ba51b 100644 --- a/tests/sketch.md +++ b/tests/sketch.md @@ -20,8 +20,8 @@ let rec consume t = val t : unit Uring.t = # let fd = Unix.openfile "/dev/zero" [ O_RDONLY ] 0;; val fd : Unix.file_descr = -# let b = Cstruct.create 1;; -val b : Cstruct.t = {Cstruct.buffer = ; off = 0; len = 1} +# let b = Bytes.create 1;; +val b : bytes = Bytes.of_string "\000" # Uring.readv t fd (ldup 1 b) () ~file_offset:Int63.zero;; - : unit Uring.job option = Some diff --git a/tests/urcat.ml b/tests/urcat.ml index b114f5ca..fdf214b7 100644 --- a/tests/urcat.ml +++ b/tests/urcat.ml @@ -1,7 +1,7 @@ (* cat(1) built with liburing. OCaml version of https://unixism.net/loti/tutorial/cat_liburing.html *) -let block_size = 1024 +let block_size = Config.max_young_wosize * Sys.word_size let get_file_size fd = Unix.handle_unix_error Unix.fstat fd |> @@ -17,13 +17,13 @@ let get_completion_and_print uring = let remaining = ref len in Printf.eprintf "%d bytes read\n%!" len; List.iter (fun buf -> - let buflen = Cstruct.length buf in + let buflen = Bytes.length buf in if !remaining > 0 then begin if buflen <= !remaining then begin - print_string (Cstruct.to_string buf); + print_string (Bytes.to_string buf); remaining := !remaining - buflen; end else begin - print_string (Cstruct.to_string ~off:0 ~len:!remaining buf); + print_string (Bytes.sub_string buf 0 !remaining); remaining := 0; end end @@ -33,8 +33,9 @@ let submit_read_request fname uring = let fd = Unix.(handle_unix_error (openfile fname [O_RDONLY]) 0) in let file_sz = get_file_size fd in let blocks = if file_sz mod block_size <> 0 then (file_sz / block_size)+1 else file_sz/block_size in - let iov = List.init blocks (fun _ -> Cstruct.create block_size) in + let iov = List.init blocks (fun _ -> Bytes.create block_size) in let _ = Uring.readv uring fd iov iov ~file_offset:Optint.Int63.zero in + Gc.full_major (); (* <--- KABOOM! ... well not really, just the bytes are empty... did something move? *) let numreq = Uring.submit uring in assert(numreq=1); () diff --git a/tests/urcp_fixed_lib.ml b/tests/urcp_fixed_lib.ml index 83d93c6f..05ecd305 100644 --- a/tests/urcp_fixed_lib.ml +++ b/tests/urcp_fixed_lib.ml @@ -159,7 +159,7 @@ let run_cp block_size queue_depth infile outfile () = Logs.debug (fun l -> l "starting: %a bs=%d qd=%d" pp t block_size queue_depth); let fixed_buf_len = queue_depth * block_size in let uring = Uring.create ~queue_depth () in - let fbuf = Bigarray.(Array1.create char c_layout fixed_buf_len) in + let fbuf = Bytes.create fixed_buf_len in Fun.protect (fun () -> match Uring.set_fixed_buffer uring fbuf with diff --git a/tests/urcp_lib.ml b/tests/urcp_lib.ml index b1d194fe..74b48f09 100644 --- a/tests/urcp_lib.ml +++ b/tests/urcp_lib.ml @@ -26,8 +26,8 @@ let pp ppf {insize;offset;reads;writes;write_left; read_left;_} = insize Int63.pp offset reads writes read_left write_left type iovec = { - all : Cstruct.t list; - mutable next : Cstruct.t list; + all : bytes list; + mutable next : bytes list; } type req = { @@ -44,7 +44,7 @@ let pp_req ppf {op; len; off; fileoff; t; _ } = (* Perform a complete read into bufs. *) let queue_read uring t len = - let all = [Cstruct.create len] in + let all = [Bytes.create len] in let iov = { all; next = all } in let req = { op=`R; iov; fileoff=t.offset; len; off=0; t } in Logs.debug (fun l -> l "queue_read: %a" pp_req req); @@ -75,7 +75,7 @@ let handle_read_completion uring req res = raise (Failure ("unix errorno " ^ (string_of_int n))) | n when n < bytes_to_read -> (* handle short read so new iovec and resubmit *) - req.iov.next <- Cstruct.shiftv req.iov.next n; + req.iov.next <- Uring.Bytes.shiftv req.iov.next n; req.off <- req.off + n; let r = Uring.readv ~file_offset:(Int63.of_int req.off) uring req.t.infd req.iov.next req in assert(r <> None); @@ -104,7 +104,7 @@ let handle_write_completion uring req res = Logs.debug (fun l -> l "requeued eintr read: %a" pp_req req); | n when n < bytes_to_write -> (* handle short write so new iovec and resubmit *) - req.iov.next <- Cstruct.shiftv req.iov.next n; + req.iov.next <- Uring.Bytes.shiftv req.iov.next n; req.off <- req.off + n; let r = Uring.writev ~file_offset:req.fileoff uring req.t.infd req.iov.next req in assert(r <> None); From 728a3baffcf29b5f1c566b088b481d726e463a9c Mon Sep 17 00:00:00 2001 From: Patrick Ferris Date: Sun, 20 Aug 2023 14:20:20 +0100 Subject: [PATCH 2/3] Working with BIG bytes --- lib/uring/dune | 4 +- lib/uring/include/discover.ml | 2 +- lib/uring/uring.ml | 46 ++- lib/uring/uring.mli | 9 +- lib/uring/uring_stubs.c | 30 +- tests/main.md | 518 +++------------------------------- 6 files changed, 104 insertions(+), 505 deletions(-) diff --git a/lib/uring/dune b/lib/uring/dune index c809723c..c6139f59 100644 --- a/lib/uring/dune +++ b/lib/uring/dune @@ -2,7 +2,7 @@ (name uring) (public_name uring) (foreign_archives uring) - (libraries fmt optint unix) + (libraries fmt optint unix compiler-libs.optcomp) (foreign_stubs (language c) (names uring_stubs) @@ -11,7 +11,7 @@ (extra_deps include/liburing/compat.h))) (rule - (targets config.ml) + (targets uring_config.ml) (deps include/liburing.h include/liburing/io_uring.h diff --git a/lib/uring/include/discover.ml b/lib/uring/include/discover.ml index 9e0d3d0c..8e2985b2 100644 --- a/lib/uring/include/discover.ml +++ b/lib/uring/include/discover.ml @@ -189,7 +189,7 @@ let () = let at_struct = List.map (fun (name, v) -> Printf.sprintf " let %s = 0x%x" name v) at_flags in let mask_struct = List.map (fun (name, v) -> Printf.sprintf " let %s = 0x%x" name v) mask_flags in let attr_struct = List.map (fun (name, v) -> Printf.sprintf " let %s = 0x%x" name v) attr_flags in - C.Flags.write_lines "config.ml" + C.Flags.write_lines "uring_config.ml" (defs @ ["module Op : sig"; " type t"; diff --git a/lib/uring/uring.ml b/lib/uring/uring.ml index c048b310..8ad71f8c 100644 --- a/lib/uring/uring.ml +++ b/lib/uring/uring.ml @@ -14,6 +14,9 @@ * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. *) +let major_alloc_byte_size = Config.max_young_wosize * Sys.word_size +module Config = Uring_config + module Private = struct module Heap = Heap end @@ -198,21 +201,24 @@ module Op = Config.Op * `writev`, once we call `Uring.submit` the `iovec` structures are * copied by the kernel and we can release them, which we do. *) -module Sketch = struct + module Sketch = struct + type buffer = (char, Bigarray.int8_unsigned_elt, Bigarray.c_layout) Bigarray.Array1.t type t = { - mutable buffer : bytes; + mutable buffer : buffer; mutable off : int; - mutable old_buffers : bytes list; + mutable old_buffers : buffer list; } - type ptr = bytes * int * int + type ptr = buffer * int * int + + let create_buffer len = Bigarray.(Array1.create char c_layout len) - let create_buffer len = Bytes.create len + let empty = create_buffer 0 let create () = - { buffer = Bytes.empty; off = 0; old_buffers = [] } + { buffer = empty; off = 0; old_buffers = [] } - let length t = Bytes.length t.buffer + let length t = Bigarray.Array1.size_in_bytes t.buffer let round a x = (x + (a - 1)) land (lnot (a - 1)) let round = round (Sys.word_size / 8) @@ -303,8 +309,8 @@ module Uring = struct external submit_nop : t -> id -> bool = "ocaml_uring_submit_nop" [@@noalloc] external submit_timeout : t -> id -> Sketch.ptr -> clock -> bool -> bool = "ocaml_uring_submit_timeout" [@@noalloc] external submit_poll_add : t -> Unix.file_descr -> id -> Poll_mask.t -> bool = "ocaml_uring_submit_poll_add" [@@noalloc] - external submit_read : t -> Unix.file_descr -> id -> bytes -> offset -> bool = "ocaml_uring_submit_read" [@@noalloc] - external submit_write : t -> Unix.file_descr -> id -> bytes -> offset -> bool = "ocaml_uring_submit_write" [@@noalloc] + external submit_read : t -> Unix.file_descr -> id -> bytes -> int -> offset -> bool = "ocaml_uring_submit_read_bytes" "ocaml_uring_submit_read_native" [@@noalloc] + external submit_write : t -> Unix.file_descr -> id -> bytes -> int -> offset -> bool = "ocaml_uring_submit_write_bytes" "ocaml_uring_submit_write_native" [@@noalloc] external submit_readv : t -> Unix.file_descr -> id -> Sketch.ptr -> offset -> bool = "ocaml_uring_submit_readv" [@@noalloc] external submit_writev : t -> Unix.file_descr -> id -> Sketch.ptr -> offset -> bool = "ocaml_uring_submit_writev" [@@noalloc] external submit_readv_fixed : t -> Unix.file_descr -> id -> bytes -> int -> int -> offset -> bool = "ocaml_uring_submit_readv_fixed_byte" "ocaml_uring_submit_readv_fixed_native" [@@noalloc] @@ -457,11 +463,11 @@ let unlink t ~dir ?(fd=at_fdcwd) path user_data = Uring.submit_unlinkat t.uring id fd buf dir ) user_data -let read t ~file_offset fd (buf : bytes) user_data = - with_id_full t (fun id -> Uring.submit_read t.uring fd id buf file_offset) user_data ~extra_data:buf +let read ?len t ~file_offset fd (buf : bytes) user_data = + with_id_full t (fun id -> Uring.submit_read t.uring fd id buf (Option.value ~default:(Bytes.length buf) len) file_offset) user_data ~extra_data:buf -let write t ~file_offset fd (buf : bytes) user_data = - with_id_full t (fun id -> Uring.submit_write t.uring fd id buf file_offset) user_data ~extra_data:buf +let write ?len t ~file_offset fd (buf : bytes) user_data = + with_id_full t (fun id -> Uring.submit_write t.uring fd id buf (Option.value ~default:(Bytes.length buf) len) file_offset) user_data ~extra_data:buf let iov_max = Config.iov_max @@ -624,7 +630,7 @@ let get_debug_stats t = sqe_ready = Uring.sq_ready t.uring; active_ops = active_ops t; sketch_used = t.sketch.off; - sketch_buffer_size = Bytes.length t.sketch.buffer; + sketch_buffer_size = Bigarray.Array1.dim t.sketch.buffer; sketch_old_buffers = List.length t.sketch.old_buffers; } @@ -636,7 +642,7 @@ module Bytes = struct (* We can't do sub views on raw bytes so it is a bit wasteful as we just drop half empty byte arrays :S *) - let rec shiftv ts = function + let rec shiftv ts v = match v with | 0 -> skip_empty ts | n -> match ts with @@ -644,4 +650,14 @@ module Bytes = struct | t :: ts -> let len = Bytes.length t in if n >= len then shiftv ts (n - len) else ts + + + let of_bigarray ~off ~len ba = + let ba_len = Bigarray.Array1.dim ba in + assert (off >= 0 && off + len < ba_len); + let b = Bytes.create len in + for i = 0 to len do + Bytes.unsafe_set b i (Bigarray.Array1.unsafe_get ba (off + i)) + done; + b end \ No newline at end of file diff --git a/lib/uring/uring.mli b/lib/uring/uring.mli index 9f50de10..7860c80c 100644 --- a/lib/uring/uring.mli +++ b/lib/uring/uring.mli @@ -14,6 +14,8 @@ * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. *) +module Config = Uring_config + (** Io_uring interface. *) module Region = Region @@ -25,6 +27,8 @@ type 'a job (** A handle for a submitted job, which can be used to cancel it. If an operation returns [None], this means that submission failed because the ring is full. *) +val major_alloc_byte_size : int + val create : ?polling_timeout:int -> queue_depth:int -> unit -> 'a t (** [create ~queue_depth] will return a fresh Io_uring structure [t]. Initially, [t] has no fixed buffer. Use {!set_fixed_buffer} if you want one. @@ -169,13 +173,13 @@ type offset := Optint.Int63.t (** For files, give the absolute offset, or use [Optint.Int63.minus_one] for the current position. For sockets, use an offset of [Optint.Int63.zero] ([minus_one] is not allowed here). *) -val read : 'a t -> file_offset:offset -> Unix.file_descr -> bytes -> 'a -> 'a job option +val read : ?len:int -> 'a t -> file_offset:offset -> Unix.file_descr -> bytes -> 'a -> 'a job option (** [read t ~file_offset fd buf d] will submit a [read(2)] request to uring [t]. It reads from absolute [file_offset] on the [fd] file descriptor and writes the results into the memory pointed to by [buf]. The user data [d] will be returned by {!wait} or {!peek} upon completion. *) -val write : 'a t -> file_offset:offset -> Unix.file_descr -> bytes -> 'a -> 'a job option +val write : ?len:int -> 'a t -> file_offset:offset -> Unix.file_descr -> bytes -> 'a -> 'a job option (** [write t ~file_offset fd buf d] will submit a [write(2)] request to uring [t]. It writes to absolute [file_offset] on the [fd] file descriptor from the the memory pointed to by [buf]. The user data [d] will be returned by @@ -469,4 +473,5 @@ end module Bytes : sig val shiftv : bytes list -> int -> bytes list + val of_bigarray : off:int -> len:int -> (char, 'a, 'b) Bigarray.Array1.t -> bytes end diff --git a/lib/uring/uring_stubs.c b/lib/uring/uring_stubs.c index d8c7fae3..ebe9f53c 100644 --- a/lib/uring/uring_stubs.c +++ b/lib/uring/uring_stubs.c @@ -59,7 +59,7 @@ #endif #define Ring_val(v) *((struct io_uring**)Data_custom_val(v)) -#define Sketch_ptr_val(vsp) ((void *)(Bytes_val(Field(vsp, 0)) + Long_val(Field(vsp, 1)))) +#define Sketch_ptr_val(vsp) (Caml_ba_data_val(Field(vsp, 0)) + Long_val(Field(vsp, 1))) #define Sketch_ptr_len_val(vsp) Long_val(Field(vsp, 2)) // Note that this does not free the ring data. You must not allow this to be @@ -368,10 +368,9 @@ ocaml_uring_submit_writev_fixed_byte(value* values, int argc) { } value /* noalloc */ -ocaml_uring_submit_read(value v_uring, value v_fd, value v_id, value v_bytes, value v_fileoff) { +ocaml_uring_submit_read_native(value v_uring, value v_fd, value v_id, value v_bytes, value v_len, value v_fileoff) { struct io_uring *ring = Ring_val(v_uring); struct io_uring_sqe *sqe = io_uring_get_sqe(ring); - value v_len = Val_long(caml_string_length(v_bytes)); void *buf = Bytes_val(v_bytes); if (!sqe) return (Val_false); dprintf("submit_read: fd %d buff %p len %zd fileoff %d\n", @@ -381,11 +380,21 @@ ocaml_uring_submit_read(value v_uring, value v_fd, value v_id, value v_bytes, va return (Val_true); } +value +ocaml_uring_submit_read_bytes(value* values, int argc) { + return ocaml_uring_submit_read_native( + values[0], + values[1], + values[2], + values[3], + values[4], + values[5]); +} + value /* noalloc */ -ocaml_uring_submit_write(value v_uring, value v_fd, value v_id, value v_bytes, value v_fileoff) { +ocaml_uring_submit_write_native(value v_uring, value v_fd, value v_id, value v_bytes, value v_len, value v_fileoff) { struct io_uring *ring = Ring_val(v_uring); struct io_uring_sqe *sqe = io_uring_get_sqe(ring); - value v_len = Val_int(caml_string_length(v_bytes)); void *buf = Bytes_val(v_bytes); if (!sqe) return (Val_false); dprintf("submit_write: fd %d buff %p len %zd fileoff %d\n", @@ -395,6 +404,17 @@ ocaml_uring_submit_write(value v_uring, value v_fd, value v_id, value v_bytes, v return (Val_true); } +value +ocaml_uring_submit_write_bytes(value* values, int argc) { + return ocaml_uring_submit_write_native( + values[0], + values[1], + values[2], + values[3], + values[4], + values[5]); +} + value /* noalloc */ ocaml_uring_submit_splice(value v_uring, value v_id, value v_fd_in, value v_fd_out, value v_nbytes) { struct io_uring *ring = Ring_val(v_uring); diff --git a/tests/main.md b/tests/main.md index 96df21d5..096fc219 100644 --- a/tests/main.md +++ b/tests/main.md @@ -25,6 +25,14 @@ let traceln fmt = Format.printf (fmt ^^ "@.") ``` +Setup a new printer for bytes to make things readable. + +```ocaml +# let pp_bytes ppf b = Format.fprintf ppf "Bytes.t " (Bytes.length b);; +val pp_bytes : Format.formatter -> bytes -> unit = +# #install_printer pp_bytes;; +``` + ## Queue depth ```ocaml @@ -40,8 +48,8 @@ val t : [ `Read ] Uring.t = # let fd = Unix.openfile "/dev/zero" Unix.[O_RDONLY] 0;; val fd : Unix.file_descr = -# let b = Cstruct.create 1;; -val b : Cstruct.t = {Cstruct.buffer = ; off = 0; len = 1} +# let b = Bytes.create 1;; +val b : bytes = Bytes.t # Uring.read t fd b `Read ~file_offset:Int63.minus_one;; - : [ `Read ] Uring.job option = Some # Uring.submit t;; @@ -319,7 +327,7 @@ Exception: Unix.Unix_error(Unix.EXDEV, "openat2", "..") ```ocaml let set_fixed_buffer t size = - let fbuf = Bigarray.(Array1.create char c_layout size) in + let fbuf = Bytes.create Uring.major_alloc_byte_size in match Uring.set_fixed_buffer t fbuf with | Ok () -> fbuf | Error `ENOMEM -> failwith "Resource limit exceeded" @@ -328,12 +336,11 @@ let () = Test_data.setup () ``` ```ocaml -# let t : [ `Read ] Uring.t = Uring.create ~queue_depth:1 ();; -val t : [ `Read ] Uring.t = -# let fbuf = set_fixed_buffer t 1024;; -val fbuf : - (char, Bigarray.int8_unsigned_elt, Bigarray.c_layout) Bigarray.Array1.t = - +let t : [ `Read ] Uring.t = Uring.create ~queue_depth:1 () +let fbuf : Bytes.t = set_fixed_buffer t Uring.major_alloc_byte_size +``` + +```ocaml # let off = 3;; val off : int = 3 # let len = 5;; @@ -347,7 +354,7 @@ val fd : Unix.file_descr = - : int = 1 # consume t;; - : [ `Read ] * int = (`Read, 5) -# Cstruct.of_bigarray fbuf ~off ~len |> Cstruct.to_string;; +# Bytes.sub fbuf off len |> Bytes.to_string;; - : string = "test " # let fd : unit = Unix.close fd;; @@ -367,26 +374,26 @@ val fd : Unix.file_descr = # let b1_len = 3 and b2_len = 7;; val b1_len : int = 3 val b2_len : int = 7 -# let b1 = Cstruct.create b1_len and b2 = Cstruct.create b2_len;; -val b1 : Cstruct.t = {Cstruct.buffer = ; off = 0; len = 3} -val b2 : Cstruct.t = {Cstruct.buffer = ; off = 0; len = 7} +# let b1 = Bytes.create Uring.major_alloc_byte_size and b2 = Bytes.create Uring.major_alloc_byte_size;; +val b1 : bytes = Bytes.t +val b2 : bytes = Bytes.t -# Uring.read t fd b1 `Read ~file_offset:Int63.minus_one;; +# Uring.read ~len:b1_len t fd b1 `Read ~file_offset:Int63.minus_one;; - : [ `Read ] Uring.job option = Some # Uring.submit t;; - : int = 1 # let `Read, read = consume t;; val read : int = 3 -# Cstruct.to_string b1;; +# Bytes.sub_string b1 0 b1_len;; - : string = "A t" -# Uring.read t fd b2 `Read ~file_offset:Int63.minus_one;; +# Uring.read ~len:b2_len t fd b2 `Read ~file_offset:Int63.minus_one;; - : [ `Read ] Uring.job option = Some # Uring.submit t;; - : int = 1 # let `Read, read = consume t;; val read : int = 7 -# Cstruct.to_string b2;; +# Bytes.sub_string b2 0 b2_len;; - : string = "est fil" # let fd : unit = Unix.close fd;; @@ -399,14 +406,22 @@ Writing with write: # let t : [`Read | `Write] Uring.t = Uring.create ~queue_depth:2 ();; val t : [ `Read | `Write ] Uring.t = -# let rb = Cstruct.create 10 and wb = Cstruct.of_string "Hello";; -val rb : Cstruct.t = {Cstruct.buffer = ; off = 0; len = 10} -val wb : Cstruct.t = {Cstruct.buffer = ; off = 0; len = 5} +# let rb_len = 10;; +val rb_len : int = 10 + +# let rb = Bytes.create Uring.major_alloc_byte_size;; +val rb : bytes = Bytes.t +# let wb = + let b = Bytes.create Uring.major_alloc_byte_size in + Bytes.blit_string "Hello" 0 b 0 5; + b;; +val wb : bytes = Bytes.t + # let r, w = Unix.pipe ();; val r : Unix.file_descr = val w : Unix.file_descr = -# Uring.write t w wb `Write ~file_offset:Int63.minus_one;; +# Uring.write ~len:5 t w wb `Write ~file_offset:Int63.minus_one;; - : [ `Read | `Write ] Uring.job option = Some # Uring.submit t;; - : int = 1 @@ -414,7 +429,7 @@ val w : Unix.file_descr = val v : [ `Read | `Write ] = `Write val read : int = 5 -# Uring.read t r rb `Read ~file_offset:Int63.minus_one;; +# Uring.read ~len:rb_len t r rb `Read ~file_offset:Int63.minus_one;; - : [ `Read | `Write ] Uring.job option = Some # Uring.submit t;; - : int = 1 @@ -422,9 +437,7 @@ val read : int = 5 val v : [ `Read | `Write ] = `Read val read : int = 5 -# let rb = Cstruct.sub rb 0 5;; -val rb : Cstruct.t = {Cstruct.buffer = ; off = 0; len = 5} -# Cstruct.to_string rb;; +# Bytes.sub_string rb 0 5;; - : string = "Hello" # let w : unit = Unix.close w;; @@ -432,458 +445,3 @@ val w : unit = () # let r : unit = Unix.close r;; val r : unit = () ``` - -Reading with readv: - -```ocaml -# let t : [ `Readv ] Uring.t = Uring.create ~queue_depth:1 ();; -val t : [ `Readv ] Uring.t = - -# let fd = Unix.openfile Test_data.path [ O_RDONLY ] 0;; -val fd : Unix.file_descr = -# let b1_len = 3 and b2_len = 7;; -val b1_len : int = 3 -val b2_len : int = 7 -# let b1 = Cstruct.create b1_len and b2 = Cstruct.create b2_len;; -val b1 : Cstruct.t = {Cstruct.buffer = ; off = 0; len = 3} -val b2 : Cstruct.t = {Cstruct.buffer = ; off = 0; len = 7} -# let iov = [b1; b2] in - Uring.readv t fd iov `Readv ~file_offset:Int63.zero;; -- : [ `Readv ] Uring.job option = Some - -# Uring.submit t;; -- : int = 1 - -# let `Readv, read = consume t;; -val read : int = 10 -# Cstruct.to_string b1;; -- : string = "A t" -# Cstruct.to_string b2;; -- : string = "est fil" - -# let fd : unit = Unix.close fd;; -val fd : unit = () -``` - -Test using cstructs with offsets: - -```ocaml -# let fd = Unix.openfile Test_data.path [ O_RDONLY ] 0;; -val fd : Unix.file_descr = -# let b = Cstruct.of_string "Gathered [ ] and [ ]";; -val b : Cstruct.t = {Cstruct.buffer = ; off = 0; len = 25} -# let b1 = Cstruct.sub b 10 4 and b2 = Cstruct.sub b 21 3 in - let iov = [b1; b2] in - Uring.readv t fd iov `Readv ~file_offset:Int63.zero;; -- : [ `Readv ] Uring.job option = Some -# Uring.submit t;; -- : int = 1 -# consume t;; -- : [ `Readv ] * int = (`Readv, 7) -# Cstruct.to_string b;; -- : string = "Gathered [A te] and [st ]" - -# let fd : unit = Unix.close fd;; -val fd : unit = () -# Uring.exit t;; -- : unit = () -``` - -## Regions - -```ocaml -# let t : [ `Read ] Uring.t = Uring.create ~queue_depth:1 ();; -val t : [ `Read ] Uring.t = - -# let fbuf = set_fixed_buffer t 64;; -val fbuf : - (char, Bigarray.int8_unsigned_elt, Bigarray.c_layout) Bigarray.Array1.t = - -# let region = Uring.Region.init fbuf 4 ~block_size:16;; -val region : Uring.Region.t = -# let chunk = Uring.Region.alloc region;; -val chunk : Uring.Region.chunk = - -# let fd = Unix.openfile Test_data.path [ O_RDONLY ] 0;; -val fd : Unix.file_descr = -# Uring.read_chunk t fd chunk `Read ~file_offset:Int63.zero;; -- : [ `Read ] Uring.job option = Some -# let `Read, read = consume t;; -val read : int = 11 -# Uring.Region.to_string ~len:read chunk;; -- : string = "A test file" -# Uring.read_chunk ~len:17 t fd chunk `Read ~file_offset:Int63.zero;; -Exception: -Invalid_argument "to_cstruct: requested length 17 > block size 16". -``` - -Attempt to use a chunk from one ring with another: - -```ocaml -# let t2 : [`Read] Uring.t = Uring.create ~queue_depth:1 ();; -val t2 : [ `Read ] Uring.t = -# Uring.read_chunk ~len:16 t2 fd chunk `Read ~file_offset:Int63.zero;; -Exception: Invalid_argument "Chunk does not belong to ring!". - -# let fd = Unix.close fd;; -val fd : unit = () -# Uring.exit t;; -- : unit = () -``` - -## Cancellation - -Ask to read from a pipe (with no data available), then cancel it. - -```ocaml -# exception Multiple of Unix.error list;; -exception Multiple of Unix.error list - -# let t : [ `Cancel | `Read ] Uring.t = Uring.create ~queue_depth:5 ();; -val t : [ `Cancel | `Read ] Uring.t = - -# set_fixed_buffer t 1024;; -- : (char, Bigarray.int8_unsigned_elt, Bigarray.c_layout) Bigarray.Array1.t = - -# let r, w = Unix.pipe ();; -val r : Unix.file_descr = -val w : Unix.file_descr = -# let read = Uring.read_fixed t ~file_offset:Int63.zero r ~off:0 ~len:1 `Read |> Option.get;; -val read : [ `Cancel | `Read ] Uring.job = - -# Uring.cancel t read `Cancel;; -- : [ `Cancel | `Read ] Uring.job option = Some -# Uring.submit t;; -- : int = 2 -# let t1, r1 = consume t in - let t2, r2 = consume t in - let r_read, r_cancel = - match t1, t2 with - | `Read, `Cancel -> r1, r2 - | `Cancel, `Read -> r2, r1 - | _ -> assert false - in - begin match Uring.error_of_errno r_read, Uring.error_of_errno r_cancel with - | EINTR, EALREADY - (* Occasionally, the read is actually busy just as we try to cancel. - In that case it gets interrupted and the cancel returns EALREADY. *) - | EUNKNOWNERR 125 (* ECANCELLED *), EUNKNOWNERR 0 -> - (* This is the common case. The read is blocked and can just be removed. *) - () - | e1, e2 -> raise (Multiple [e1; e2]) - end;; -- : unit = () -# let r : unit = Unix.close r;; -val r : unit = () -# let w : unit = Unix.close w;; -val w : unit = () -# Uring.exit t;; -- : unit = () -``` - -By the time we cancel, the request has already succeeded (we just didn't process the reply yet): - -```ocaml -# let t : [ `Read | `Cancel ] Uring.t = Uring.create ~queue_depth:5 ();; -val t : [ `Cancel | `Read ] Uring.t = -# set_fixed_buffer t 102;; -- : (char, Bigarray.int8_unsigned_elt, Bigarray.c_layout) Bigarray.Array1.t = - -# let r = Unix.openfile "/dev/zero" Unix.[O_RDONLY] 0;; -val r : Unix.file_descr = -# let read = Uring.read_fixed t ~file_offset:Int63.zero r ~off:0 ~len:1 `Read |> Option.get;; -val read : [ `Cancel | `Read ] Uring.job = -# Uring.submit t;; -- : int = 1 -# Unix.sleepf 0.001;; -- : unit = () -# Uring.cancel t read `Cancel;; -- : [ `Cancel | `Read ] Uring.job option = Some -# Uring.submit t;; -- : int = 1 -# let t1, r1 = consume t in - let t2, r2 = consume t in - let r_read, r_cancel = - match t1, t2 with - | `Read, `Cancel -> r1, r2 - | `Cancel, `Read -> r2, r1 - | _ -> assert false - in - if r_read = 1 then ( - match Uring.error_of_errno r_cancel with - | ENOENT -> () - | e -> raise (Unix.Unix_error (e, "cancel", "")) - ) else ( - match Uring.error_of_errno r_read, Uring.error_of_errno r_cancel with - | EUNKNOWNERR 125 (* ECANCELLED *), EUNKNOWNERR 0 -> - (* This isn't the case we want to test, but it can happen sometimes. *) - () - | e1, e2 -> raise (Multiple [e1; e2]) - );; -- : unit = () -# let r : unit = Unix.close r;; -val r : unit = () - -# Uring.exit t;; -- : unit = () -``` - -By the time we cancel, we already knew the operation was over: - -```ocaml -# let t : [ `Read | `Cancel ] Uring.t = Uring.create ~queue_depth:5 ();; -val t : [ `Cancel | `Read ] Uring.t = -# set_fixed_buffer t 1024;; -- : (char, Bigarray.int8_unsigned_elt, Bigarray.c_layout) Bigarray.Array1.t = - -# let r = Unix.openfile "/dev/zero" Unix.[O_RDONLY] 0;; -val r : Unix.file_descr = -# let read = Uring.read_fixed t ~file_offset:Int63.zero r ~off:0 ~len:1 `Read |> Option.get;; -val read : [ `Cancel | `Read ] Uring.job = -# let token, r_read = consume t;; -val token : [ `Cancel | `Read ] = `Read -val r_read : int = 1 -# let r : unit = Unix.close r;; -val r : unit = () -``` - -Try to cancel after we may have reused the index: -```ocaml -# Uring.cancel t read `Cancel;; -Exception: Invalid_argument "Entry has already been freed!". - -# Uring.exit t;; -- : unit = () -``` - -## Freeing the ring - -We can't exit the ring while an operation is still pending: - -```ocaml -# let t : [ `Read | `Mkdir ] Uring.t = Uring.create ~queue_depth:1 ();; -val t : [ `Mkdir | `Read ] Uring.t = -# set_fixed_buffer t 1024;; -- : (char, Bigarray.int8_unsigned_elt, Bigarray.c_layout) Bigarray.Array1.t = - -# let r, w = Unix.pipe ();; -val r : Unix.file_descr = -val w : Unix.file_descr = -# Uring.read_fixed t ~file_offset:Int63.minus_one r ~off:0 ~len:1 `Read;; -- : [ `Mkdir | `Read ] Uring.job option = Some -# Uring.submit t;; -- : int = 1 -# Uring.exit t;; -Exception: Invalid_argument "exit: 1 request(s) still active!". -``` - -But we can once it's complete: - -```ocaml -# let w : unit = Unix.close w;; -val w : unit = () -# consume t;; -- : [ `Mkdir | `Read ] * int = (`Read, 0) -# Uring.exit t;; -- : unit = () -# let r : unit = Unix.close r;; -val r : unit = () -``` - -We can't free the ring a second time, or use it after freeing it: - -```ocaml -# Uring.unlink t ~dir:false "/doesntexist" `Mkdir;; -Exception: -Invalid_argument "Can't use ring after Uring.exit has been called". - -# Uring.submit t;; -Exception: -Invalid_argument "Can't use ring after Uring.exit has been called". - -# Uring.wait t;; -Exception: -Invalid_argument "Can't use ring after Uring.exit has been called". - -# Uring.get_cqe_nonblocking t;; -Exception: -Invalid_argument "Can't use ring after Uring.exit has been called". - -# Uring.get_probe t;; -Exception: -Invalid_argument "Can't use ring after Uring.exit has been called". - -# Uring.exit t;; -Exception: -Invalid_argument "Can't use ring after Uring.exit has been called". -``` - -## Send_msg - -```ocaml -# let r, w = Unix.pipe ();; -val r : Unix.file_descr = -val w : Unix.file_descr = -# let t : [ `Recv | `Send ] Uring.t= Uring.create ~queue_depth:2 ();; -val t : [ `Recv | `Send ] Uring.t = -# let a, b = Unix.(socketpair PF_UNIX SOCK_STREAM 0);; -val a : Unix.file_descr = -val b : Unix.file_descr = -# let bufs = [Cstruct.of_string "hi"];; -val bufs : Cstruct.t list = [{Cstruct.buffer = ; off = 0; len = 2}] -# Uring.send_msg t a ~fds:[r; w] bufs `Send;; -- : [ `Recv | `Send ] Uring.job option = Some -# Uring.submit t;; -- : int = 1 -# consume t;; -- : [ `Recv | `Send ] * int = (`Send, 2) -# let recv_buf = Cstruct.of_string "XX";; -val recv_buf : Cstruct.t = {Cstruct.buffer = ; off = 0; len = 2} -# let recv = Uring.Msghdr.create ~n_fds:2 [recv_buf];; -val recv : Uring.Msghdr.t = -# List.length (Uring.Msghdr.get_fds recv);; -- : int = 0 -# Uring.recv_msg t b recv `Recv;; -- : [ `Recv | `Send ] Uring.job option = Some -# Uring.submit t;; -- : int = 1 -# consume t;; -- : [ `Recv | `Send ] * int = (`Recv, 2) -# Cstruct.to_string recv_buf;; -- : string = "hi" -# let r2, w2 = - match Uring.Msghdr.get_fds recv with - | [r2; w2] -> r2, w2 - | _ -> failwith "Expected two FDs!";; -val r2 : Unix.file_descr = -val w2 : Unix.file_descr = -# Unix.write_substring w2 "to-w2" 0 5;; -- : int = 5 -# really_input_string (Unix.in_channel_of_descr r) 5;; -- : string = "to-w2" -# Unix.write_substring w "to-w" 0 4;; -- : int = 4 -# really_input_string (Unix.in_channel_of_descr r2) 4;; -- : string = "to-w" -# let r : unit = Unix.close r;; -val r : unit = () -# let r2 : unit = Unix.close r2;; -val r2 : unit = () -# let w2 : unit = Unix.close w2;; -val w2 : unit = () -# let w : unit = Unix.close w;; -val w : unit = () -# Uring.exit t;; -- : unit = () -``` - -## Unlink and rmdir - -```ocaml -# let t : unit Uring.t = Uring.create ~queue_depth:2 ();; -val t : unit Uring.t = - -# close_out (open_out "test-file"); Unix.mkdir "test-dir" 0o700;; -- : unit = () - -# let check () = Sys.file_exists "test-file", Sys.file_exists "test-dir";; -val check : unit -> bool * bool = -# check ();; -- : bool * bool = (true, true) - -# Uring.unlink t ~dir:false "test-file" ();; -- : unit Uring.job option = Some - -# Uring.unlink t ~dir:true "test-dir" ();; -- : unit Uring.job option = Some - -# Uring.wait t;; -- : unit Uring.completion_option = Uring.Some {Uring.result = 0; data = ()} - -# Uring.wait t;; -- : unit Uring.completion_option = Uring.Some {Uring.result = 0; data = ()} - -# check ();; -- : bool * bool = (false, false) - -# Uring.exit t;; -- : unit = () -``` - -## Timeout - -Timeout should return (-ETIME). This is defined in https://github.com/torvalds/linux/blob/master/include/uapi/asm-generic/errno.h#L45 - -```ocaml -# let t : [`Timeout] Uring.t = Uring.create ~queue_depth:1 ();; -val t : [ `Timeout ] Uring.t = - -# let ns1 = Int64.(mul 10L 1_000_000L) in - Uring.(timeout t Boottime ns1 `Timeout);; -- : [ `Timeout ] Uring.job option = Some - -# Uring.submit t;; -- : int = 1 - -# let `Timeout, timeout = consume t;; -val timeout : int = -62 - -# let ns = - ((Unix.gettimeofday () +. 0.01) *. 1e9) - |> Int64.of_float - in - Uring.(timeout ~absolute:true t Realtime ns `Timeout);; -- : [ `Timeout ] Uring.job option = Some - -# let `Timeout, timeout = consume t;; -val timeout : int = -62 - -# let ns1 = Int64.(mul 10L 1_000_000L) in - Uring.(timeout ~absolute:true t Boottime ns1 `Timeout);; -- : [ `Timeout ] Uring.job option = Some - -# let `Timeout, timeout = consume t;; -val timeout : int = -62 - -# Uring.exit t;; -- : unit = () -``` - -If there is a timeout but we did submit something, `io_uring_submit_and_wait_timeout` returns success instead: - -```ocaml -# let t : [`Timeout | `Cancel] Uring.t = Uring.create ~queue_depth:1 ();; -val t : [ `Cancel | `Timeout ] Uring.t = - -# let job = - let ns = Int64.(mul 10L 1_000_000_000L) in - Uring.(timeout t Boottime ns `Timeout);; -val job : [ `Cancel | `Timeout ] Uring.job option = Some - -# Uring.wait ~timeout:0.01 t;;Uring.wait ~timeout:0.01 t;; -- : [ `Cancel | `Timeout ] Uring.completion_option = Uring.None - -# Uring.cancel t (Option.get job) `Cancel;; -- : [ `Cancel | `Timeout ] Uring.job option = Some - -# ignore (Uring.wait ~timeout:10.0 t, Uring.wait ~timeout:10.0 t);; -- : unit = () - -# Uring.exit t;; -- : unit = () -``` - - -## Probing - -```ocaml -# let t : unit Uring.t = Uring.create ~queue_depth:1 ();; -val t : unit Uring.t = - -# let probe = Uring.get_probe t in - Uring.op_supported probe Uring.Op.nop;; -- : bool = true - -# Uring.exit t;; -- : unit = () -``` From b43a8ee3a2b4117c50aa53ec20b5255edf5465c5 Mon Sep 17 00:00:00 2001 From: Patrick Ferris Date: Sun, 20 Aug 2023 16:07:30 +0100 Subject: [PATCH 3/3] Bstruct and Slab --- README.md | 8 ++- bench/readv.ml | 14 +++--- lib/uring/uring.ml | 19 ++++--- lib/uring/uring.mli | 38 +++++++++++--- lib/uring/uring_stubs.c | 43 ++++++---------- lib/uring/util.ml | 106 ++++++++++++++++++++++++++++++++++++++++ tests/main.md | 83 ++++++++----------------------- tests/sketch.md | 6 ++- tests/urcat.ml | 13 ++--- tests/urcp_lib.ml | 19 +++---- 10 files changed, 220 insertions(+), 129 deletions(-) create mode 100644 lib/uring/util.ml diff --git a/README.md b/README.md index ddb4a1a3..6dfe38e9 100644 --- a/README.md +++ b/README.md @@ -100,12 +100,16 @@ let rec write_all fd = function let result, data = wait_with_retry uring in assert (data = `Write_all); (* There aren't any other requests pending *) assert (result > 0); (* Check for error return *) - let bufs = Uring.Bytes.shiftv bufs result in + let bufs = Uring.Bstruct.shiftv bufs result in write_all fd bufs ``` ```ocaml -# write_all fd Bytes.[of_string "INFO: "; of_string "A log message"];; +# let slab = Uring.Slab.create Uring.major_alloc_byte_size;; +val slab : Uring.Slab.t = +# let bs = Uring.Slab.slice_strings slab ["INFO: "; "A log message"];; +val bs : Uring.Bstruct.t list = [; ] +# write_all fd bs;; - : unit = () ``` diff --git a/bench/readv.ml b/bench/readv.ml index ba691053..6e7d37e7 100644 --- a/bench/readv.ml +++ b/bench/readv.ml @@ -13,14 +13,14 @@ let rec wait t handle = | None -> wait t handle | Some { result; data = buf } -> handle result buf -let run_bechmark ~polling_timeout fd = +let run_bechmark slab ~polling_timeout fd = let got = ref 0 in (* For polling mode, [queue_depth] needs to be slightly larger than [n_concurrent] or submission occasionally fails for some reason. *) let t = Uring.create ?polling_timeout ~queue_depth:(n_concurrent * 2) () in (* We start by submitting [n_concurrent] reads. *) for _ = 1 to n_concurrent do - let buf = Bytes.create buffer_size in + let buf = Uring.Slab.slice slab buffer_size in let _job : _ Uring.job = Uring.readv t fd [buf] ~file_offset:Optint.Int63.zero [buf] |> Option.get in () done; @@ -55,8 +55,10 @@ let run_bechmark ~polling_timeout fd = let () = let fd = Unix.openfile "/dev/zero" Unix.[O_RDONLY] 0 in - run_bechmark fd ~polling_timeout:None; - run_bechmark fd ~polling_timeout:(Some 1000); - run_bechmark fd ~polling_timeout:None; - run_bechmark fd ~polling_timeout:(Some 1000); + (* TODO: The slab doesn't release space so this could in theory run out of room... *) + let slab = Uring.Slab.create Uring.major_alloc_byte_size in + run_bechmark slab fd ~polling_timeout:None; + run_bechmark slab fd ~polling_timeout:(Some 1000); + run_bechmark slab fd ~polling_timeout:None; + run_bechmark slab fd ~polling_timeout:(Some 1000); Unix.close fd diff --git a/lib/uring/uring.ml b/lib/uring/uring.ml index 8ad71f8c..a049c90b 100644 --- a/lib/uring/uring.ml +++ b/lib/uring/uring.ml @@ -17,6 +17,9 @@ let major_alloc_byte_size = Config.max_young_wosize * Sys.word_size module Config = Uring_config +module Bstruct = Util.Bstruct +module Slab = Util.Slab + module Private = struct module Heap = Heap end @@ -245,7 +248,7 @@ module Op = Config.Op t.old_buffers <- [] module Iovec = struct - external set : ptr -> bytes list -> unit = "ocaml_uring_set_iovec" [@@noalloc] + external set : ptr -> Bstruct.t list -> unit = "ocaml_uring_set_iovec" [@@noalloc] let sizeof = Config.sizeof_iovec @@ -268,7 +271,7 @@ end (* Used for the sendmsg/recvmsg calls. Liburing doesn't support sendto/recvfrom at the time of writing. *) module Msghdr = struct type msghdr - type t = msghdr * Sockaddr.t option * bytes list (* `bytes list` is here only for preventing it being GCed *) + type t = msghdr * Sockaddr.t option * Bstruct.t list (* `bytes list` is here only for preventing it being GCed *) external make_msghdr : int -> Unix.file_descr list -> Sockaddr.t option -> msghdr = "ocaml_uring_make_msghdr" external get_msghdr_fds : msghdr -> Unix.file_descr list = "ocaml_uring_get_msghdr_fds" @@ -309,8 +312,8 @@ module Uring = struct external submit_nop : t -> id -> bool = "ocaml_uring_submit_nop" [@@noalloc] external submit_timeout : t -> id -> Sketch.ptr -> clock -> bool -> bool = "ocaml_uring_submit_timeout" [@@noalloc] external submit_poll_add : t -> Unix.file_descr -> id -> Poll_mask.t -> bool = "ocaml_uring_submit_poll_add" [@@noalloc] - external submit_read : t -> Unix.file_descr -> id -> bytes -> int -> offset -> bool = "ocaml_uring_submit_read_bytes" "ocaml_uring_submit_read_native" [@@noalloc] - external submit_write : t -> Unix.file_descr -> id -> bytes -> int -> offset -> bool = "ocaml_uring_submit_write_bytes" "ocaml_uring_submit_write_native" [@@noalloc] + external submit_read : t -> Unix.file_descr -> id -> Bstruct.t -> offset -> bool = "ocaml_uring_submit_read" [@@noalloc] + external submit_write : t -> Unix.file_descr -> id -> Bstruct.t -> offset -> bool = "ocaml_uring_submit_write" [@@noalloc] external submit_readv : t -> Unix.file_descr -> id -> Sketch.ptr -> offset -> bool = "ocaml_uring_submit_readv" [@@noalloc] external submit_writev : t -> Unix.file_descr -> id -> Sketch.ptr -> offset -> bool = "ocaml_uring_submit_writev" [@@noalloc] external submit_readv_fixed : t -> Unix.file_descr -> id -> bytes -> int -> int -> offset -> bool = "ocaml_uring_submit_readv_fixed_byte" "ocaml_uring_submit_readv_fixed_native" [@@noalloc] @@ -463,11 +466,11 @@ let unlink t ~dir ?(fd=at_fdcwd) path user_data = Uring.submit_unlinkat t.uring id fd buf dir ) user_data -let read ?len t ~file_offset fd (buf : bytes) user_data = - with_id_full t (fun id -> Uring.submit_read t.uring fd id buf (Option.value ~default:(Bytes.length buf) len) file_offset) user_data ~extra_data:buf +let read t ~file_offset fd (buf : Bstruct.t) user_data = + with_id_full t (fun id -> Uring.submit_read t.uring fd id buf file_offset) user_data ~extra_data:buf -let write ?len t ~file_offset fd (buf : bytes) user_data = - with_id_full t (fun id -> Uring.submit_write t.uring fd id buf (Option.value ~default:(Bytes.length buf) len) file_offset) user_data ~extra_data:buf +let write t ~file_offset fd (buf : Bstruct.t) user_data = + with_id_full t (fun id -> Uring.submit_write t.uring fd id buf file_offset) user_data ~extra_data:buf let iov_max = Config.iov_max diff --git a/lib/uring/uring.mli b/lib/uring/uring.mli index 7860c80c..9042fe07 100644 --- a/lib/uring/uring.mli +++ b/lib/uring/uring.mli @@ -16,6 +16,32 @@ module Config = Uring_config +module Bstruct : sig + type t + + val length : t -> int + + val to_string : ?off:int -> ?len:int -> t -> string + + val shiftv : t list -> int -> t list +end + +module Slab : sig + type t + + val create : int -> t + (** [create size] is a new slab. *) + + val slice : t -> int -> Bstruct.t + (** [slice slab len] returns a new bstruct from the bigger slab if there + is space. *) + + val slice_string : t -> string -> Bstruct.t + + val slice_strings : t -> string list -> Bstruct.t list + (** [slice_strings t s] will copy the strings into the slab and return bstructs for them. *) +end + (** Io_uring interface. *) module Region = Region @@ -173,13 +199,13 @@ type offset := Optint.Int63.t (** For files, give the absolute offset, or use [Optint.Int63.minus_one] for the current position. For sockets, use an offset of [Optint.Int63.zero] ([minus_one] is not allowed here). *) -val read : ?len:int -> 'a t -> file_offset:offset -> Unix.file_descr -> bytes -> 'a -> 'a job option +val read : 'a t -> file_offset:offset -> Unix.file_descr -> Bstruct.t -> 'a -> 'a job option (** [read t ~file_offset fd buf d] will submit a [read(2)] request to uring [t]. It reads from absolute [file_offset] on the [fd] file descriptor and writes the results into the memory pointed to by [buf]. The user data [d] will be returned by {!wait} or {!peek} upon completion. *) -val write : ?len:int -> 'a t -> file_offset:offset -> Unix.file_descr -> bytes -> 'a -> 'a job option +val write : 'a t -> file_offset:offset -> Unix.file_descr -> Bstruct.t -> 'a -> 'a job option (** [write t ~file_offset fd buf d] will submit a [write(2)] request to uring [t]. It writes to absolute [file_offset] on the [fd] file descriptor from the the memory pointed to by [buf]. The user data [d] will be returned by @@ -188,7 +214,7 @@ val write : ?len:int -> 'a t -> file_offset:offset -> Unix.file_descr -> bytes - val iov_max : int (** The maximum length of the list that can be passed to [readv] and similar. *) -val readv : 'a t -> file_offset:offset -> Unix.file_descr -> bytes list -> 'a -> 'a job option +val readv : 'a t -> file_offset:offset -> Unix.file_descr -> Bstruct.t list -> 'a -> 'a job option (** [readv t ~file_offset fd iov d] will submit a [readv(2)] request to uring [t]. It reads from absolute [file_offset] on the [fd] file descriptor and writes the results into the memory pointed to by [iov]. The user data [d] will @@ -196,7 +222,7 @@ val readv : 'a t -> file_offset:offset -> Unix.file_descr -> bytes list -> 'a -> Requires [List.length iov <= Uring.iov_max] *) -val writev : 'a t -> file_offset:offset -> Unix.file_descr -> bytes list -> 'a -> 'a job option +val writev : 'a t -> file_offset:offset -> Unix.file_descr -> Bstruct.t list -> 'a -> 'a job option (** [writev t ~file_offset fd iov d] will submit a [writev(2)] request to uring [t]. It writes to absolute [file_offset] on the [fd] file descriptor from the the memory pointed to by [iov]. The user data [d] will be returned by @@ -381,7 +407,7 @@ val cancel : 'a t -> 'a job -> 'a -> 'a job option module Msghdr : sig type t - val create : ?n_fds:int -> ?addr:Sockaddr.t -> bytes list -> t + val create : ?n_fds:int -> ?addr:Sockaddr.t -> Bstruct.t list -> t (** [create buffs] makes a new [msghdr] using the [buffs] for the underlying [iovec]. @@ -394,7 +420,7 @@ module Msghdr : sig val get_fds : t -> Unix.file_descr list end -val send_msg : ?fds:Unix.file_descr list -> ?dst:Unix.sockaddr -> 'a t -> Unix.file_descr -> bytes list -> 'a -> 'a job option +val send_msg : ?fds:Unix.file_descr list -> ?dst:Unix.sockaddr -> 'a t -> Unix.file_descr -> Bstruct.t list -> 'a -> 'a job option (** [send_msg t fd buffs d] will submit a [sendmsg(2)] request. The [Msghdr] will be constructed from the FDs ([fds]), address ([dst]) and buffers ([buffs]). diff --git a/lib/uring/uring_stubs.c b/lib/uring/uring_stubs.c index ebe9f53c..f23d6076 100644 --- a/lib/uring/uring_stubs.c +++ b/lib/uring/uring_stubs.c @@ -280,8 +280,11 @@ ocaml_uring_set_iovec(value v_sketch_ptr, value v_csl) v_aux != Val_emptylist; v_aux = Field(v_aux, 1), i++) { value v_cs = Field(v_aux, 0); - iovs[i].iov_base = Bytes_val(v_cs); - iovs[i].iov_len = caml_string_length(v_cs); + value v_ba = Field(v_cs, 0); + value v_off = Field(v_cs, 1); + value v_len = Field(v_cs, 2); + iovs[i].iov_base = Bytes_val(v_ba) + Long_val(v_off); + iovs[i].iov_len = Long_val(v_len); } } @@ -368,10 +371,13 @@ ocaml_uring_submit_writev_fixed_byte(value* values, int argc) { } value /* noalloc */ -ocaml_uring_submit_read_native(value v_uring, value v_fd, value v_id, value v_bytes, value v_len, value v_fileoff) { +ocaml_uring_submit_read(value v_uring, value v_fd, value v_id, value v_bstruct, value v_fileoff) { struct io_uring *ring = Ring_val(v_uring); struct io_uring_sqe *sqe = io_uring_get_sqe(ring); - void *buf = Bytes_val(v_bytes); + value v_ba = Field(v_bstruct, 0); + value v_off = Field(v_bstruct, 1); + value v_len = Field(v_bstruct, 2); + void *buf = Bytes_val(v_ba) + Long_val(v_off); if (!sqe) return (Val_false); dprintf("submit_read: fd %d buff %p len %zd fileoff %d\n", Int_val(v_fd), buf, Long_val(v_len), Int63_val(v_fileoff)); @@ -380,22 +386,14 @@ ocaml_uring_submit_read_native(value v_uring, value v_fd, value v_id, value v_by return (Val_true); } -value -ocaml_uring_submit_read_bytes(value* values, int argc) { - return ocaml_uring_submit_read_native( - values[0], - values[1], - values[2], - values[3], - values[4], - values[5]); -} - value /* noalloc */ -ocaml_uring_submit_write_native(value v_uring, value v_fd, value v_id, value v_bytes, value v_len, value v_fileoff) { +ocaml_uring_submit_write(value v_uring, value v_fd, value v_id, value v_bstruct, value v_fileoff) { struct io_uring *ring = Ring_val(v_uring); struct io_uring_sqe *sqe = io_uring_get_sqe(ring); - void *buf = Bytes_val(v_bytes); + value v_ba = Field(v_bstruct, 0); + value v_off = Field(v_bstruct, 1); + value v_len = Field(v_bstruct, 2); + void *buf = Bytes_val(v_ba) + Long_val(v_off); if (!sqe) return (Val_false); dprintf("submit_write: fd %d buff %p len %zd fileoff %d\n", Int_val(v_fd), buf, Long_val(v_len), Int63_val(v_fileoff)); @@ -404,17 +402,6 @@ ocaml_uring_submit_write_native(value v_uring, value v_fd, value v_id, value v_b return (Val_true); } -value -ocaml_uring_submit_write_bytes(value* values, int argc) { - return ocaml_uring_submit_write_native( - values[0], - values[1], - values[2], - values[3], - values[4], - values[5]); -} - value /* noalloc */ ocaml_uring_submit_splice(value v_uring, value v_id, value v_fd_in, value v_fd_out, value v_nbytes) { struct io_uring *ring = Ring_val(v_uring); diff --git a/lib/uring/util.ml b/lib/uring/util.ml new file mode 100644 index 00000000..32ee25d7 --- /dev/null +++ b/lib/uring/util.ml @@ -0,0 +1,106 @@ +module Bstruct = struct + type t = { + buffer : bytes; + off : int; + len : int; + } + + let empty = { buffer = Bytes.empty; off = 0; len = 0 } + + let length t = t.len + + let copy_to_string src srcoff len = + if len < 0 || srcoff < 0 || src.len - srcoff < len then + failwith "Error copying to string!" + else + let b = Bytes.create len in + Bytes.blit src.buffer (src.off+srcoff) b 0 len; + (* The following call is safe, since b is not visible elsewhere. *) + Bytes.unsafe_to_string b + + let blit_from_string src srcoff dst dstoff len = + if len < 0 || srcoff < 0 || String.length src - srcoff < len then + invalid_arg "Bstruct blit" + else if dstoff < 0 || dst.len - dstoff < len then + invalid_arg "Bstruct blit" + else + String.blit src srcoff dst.buffer + (dst.off+dstoff) len + + let to_string ?(off=0) ?len t = + let len = match len with None -> length t - off | Some l -> l in + copy_to_string t off len + + let pp_t ppf t = + Format.fprintf ppf "[%d,%d](%d)" t.off t.len (Bytes.length t.buffer) + let string_t ppf str = + Format.fprintf ppf "[%d]" (String.length str) + let bytes_t ppf str = + Format.fprintf ppf "[%d]" (Bytes.length str) + + let err fmt = + let b = Buffer.create 20 in (* for thread safety. *) + let ppf = Format.formatter_of_buffer b in + let k ppf = Format.pp_print_flush ppf (); invalid_arg (Buffer.contents b) in + Format.kfprintf k ppf fmt + + let check_bounds t len = + len >= 0 && Bytes.length t.buffer >= len + + let err_shift t = err "Bstruct.shift %a %d" pp_t t + let err_shiftv n = err "Bstruct.shiftv short by %d" n + + let shift t amount = + let off = t.off + amount in + let len = t.len - amount in + if amount < 0 || amount > t.len || not (check_bounds t (off+len)) then + err_shift t amount + else { t with off; len } + + let rec skip_empty = function + | t :: ts when t.len = 0 -> skip_empty ts + | x -> x + + let rec shiftv ts = function + | 0 -> skip_empty ts + | n -> + match ts with + | [] -> err_shiftv n + | t :: ts when n >= t.len -> shiftv ts (n - t.len) + | t :: ts -> shift t n :: ts +end + +module Slab = struct + type t = { + size : int; + mutable free_ptr : int; + mutable bytes : bytes; + mutable slices : (int * int) list; + } + + let create size = { size; free_ptr = 0; bytes = Bytes.create size; slices = [] } + + (* It would be nice if the bstruct here was just the raw pointer to the byte + array instead of probably all the extra header bytes... *) + let slice t len = + if t.free_ptr + len > t.size then invalid_arg "No space" + else begin + let b = { + Bstruct.buffer = t.bytes; + off = t.free_ptr; + len = len; + } in + t.slices <- (t.free_ptr, len) :: t.slices; + t.free_ptr <- t.free_ptr + len; + b + end + + let slice_string t s = + let len = String.length s in + let b = slice t len in + Bstruct.blit_from_string s 0 b 0 len; + b + + let slice_strings t s = + List.map (slice_string t) s +end \ No newline at end of file diff --git a/tests/main.md b/tests/main.md index 096fc219..13b9eec2 100644 --- a/tests/main.md +++ b/tests/main.md @@ -23,6 +23,8 @@ let rec consume t = let traceln fmt = Format.printf (fmt ^^ "@.") + +let () = Test_data.setup () ``` Setup a new printer for bytes to make things readable. @@ -46,10 +48,13 @@ Prove we can wait more entries than queue depth # let t : [ `Read ] Uring.t = Uring.create ~queue_depth:1 ();; val t : [ `Read ] Uring.t = +# let slab = Uring.Slab.create Uring.major_alloc_byte_size;; +val slab : Uring.Slab.t = + # let fd = Unix.openfile "/dev/zero" Unix.[O_RDONLY] 0;; val fd : Unix.file_descr = -# let b = Bytes.create 1;; -val b : bytes = Bytes.t +# let b = Uring.Slab.slice slab 1;; +val b : Uring.Bstruct.t = # Uring.read t fd b `Read ~file_offset:Int63.minus_one;; - : [ `Read ] Uring.job option = Some # Uring.submit t;; @@ -323,46 +328,6 @@ Exception: Unix.Unix_error(Unix.EXDEV, "openat2", "..") - : unit = () ``` -## Read with fixed buffer - -```ocaml -let set_fixed_buffer t size = - let fbuf = Bytes.create Uring.major_alloc_byte_size in - match Uring.set_fixed_buffer t fbuf with - | Ok () -> fbuf - | Error `ENOMEM -> failwith "Resource limit exceeded" - -let () = Test_data.setup () -``` - -```ocaml -let t : [ `Read ] Uring.t = Uring.create ~queue_depth:1 () -let fbuf : Bytes.t = set_fixed_buffer t Uring.major_alloc_byte_size -``` - -```ocaml -# let off = 3;; -val off : int = 3 -# let len = 5;; -val len : int = 5 -# let fd = Unix.openfile Test_data.path [ O_RDONLY ] 0;; -val fd : Unix.file_descr = -# let file_offset = Int63.of_int 2 in - Uring.read_fixed t ~file_offset fd ~off ~len `Read;; -- : [ `Read ] Uring.job option = Some -# Uring.submit t;; -- : int = 1 -# consume t;; -- : [ `Read ] * int = (`Read, 5) -# Bytes.sub fbuf off len |> Bytes.to_string;; -- : string = "test " - -# let fd : unit = Unix.close fd;; -val fd : unit = () -# Uring.exit t;; -- : unit = () -``` - Reading with read: ```ocaml @@ -374,26 +339,26 @@ val fd : Unix.file_descr = # let b1_len = 3 and b2_len = 7;; val b1_len : int = 3 val b2_len : int = 7 -# let b1 = Bytes.create Uring.major_alloc_byte_size and b2 = Bytes.create Uring.major_alloc_byte_size;; -val b1 : bytes = Bytes.t -val b2 : bytes = Bytes.t +# let b1 = Uring.Slab.slice slab b1_len and b2 = Uring.Slab.slice slab b2_len;; +val b1 : Uring.Bstruct.t = +val b2 : Uring.Bstruct.t = -# Uring.read ~len:b1_len t fd b1 `Read ~file_offset:Int63.minus_one;; +# Uring.read t fd b1 `Read ~file_offset:Int63.minus_one;; - : [ `Read ] Uring.job option = Some # Uring.submit t;; - : int = 1 # let `Read, read = consume t;; val read : int = 3 -# Bytes.sub_string b1 0 b1_len;; +# Uring.Bstruct.to_string b1;; - : string = "A t" -# Uring.read ~len:b2_len t fd b2 `Read ~file_offset:Int63.minus_one;; +# Uring.read t fd b2 `Read ~file_offset:Int63.minus_one;; - : [ `Read ] Uring.job option = Some # Uring.submit t;; - : int = 1 # let `Read, read = consume t;; val read : int = 7 -# Bytes.sub_string b2 0 b2_len;; +# Uring.Bstruct.to_string b2;; - : string = "est fil" # let fd : unit = Unix.close fd;; @@ -406,22 +371,16 @@ Writing with write: # let t : [`Read | `Write] Uring.t = Uring.create ~queue_depth:2 ();; val t : [ `Read | `Write ] Uring.t = -# let rb_len = 10;; -val rb_len : int = 10 - -# let rb = Bytes.create Uring.major_alloc_byte_size;; -val rb : bytes = Bytes.t -# let wb = - let b = Bytes.create Uring.major_alloc_byte_size in - Bytes.blit_string "Hello" 0 b 0 5; - b;; -val wb : bytes = Bytes.t +# let rb = Uring.Slab.slice slab 10;; +val rb : Uring.Bstruct.t = +# let wb = Uring.Slab.slice_string slab "Hello";; +val wb : Uring.Bstruct.t = # let r, w = Unix.pipe ();; val r : Unix.file_descr = val w : Unix.file_descr = -# Uring.write ~len:5 t w wb `Write ~file_offset:Int63.minus_one;; +# Uring.write t w wb `Write ~file_offset:Int63.minus_one;; - : [ `Read | `Write ] Uring.job option = Some # Uring.submit t;; - : int = 1 @@ -429,7 +388,7 @@ val w : Unix.file_descr = val v : [ `Read | `Write ] = `Write val read : int = 5 -# Uring.read ~len:rb_len t r rb `Read ~file_offset:Int63.minus_one;; +# Uring.read t r rb `Read ~file_offset:Int63.minus_one;; - : [ `Read | `Write ] Uring.job option = Some # Uring.submit t;; - : int = 1 @@ -437,7 +396,7 @@ val read : int = 5 val v : [ `Read | `Write ] = `Read val read : int = 5 -# Bytes.sub_string rb 0 5;; +# Uring.Bstruct.to_string ~len:read rb;; - : string = "Hello" # let w : unit = Unix.close w;; diff --git a/tests/sketch.md b/tests/sketch.md index 815ba51b..d864c551 100644 --- a/tests/sketch.md +++ b/tests/sketch.md @@ -20,8 +20,10 @@ let rec consume t = val t : unit Uring.t = # let fd = Unix.openfile "/dev/zero" [ O_RDONLY ] 0;; val fd : Unix.file_descr = -# let b = Bytes.create 1;; -val b : bytes = Bytes.of_string "\000" +# let b = + let slab = Uring.Slab.create Uring.major_alloc_byte_size in + Uring.Slab.slice slab 1;; +val b : Uring.Bstruct.t = # Uring.readv t fd (ldup 1 b) () ~file_offset:Int63.zero;; - : unit Uring.job option = Some diff --git a/tests/urcat.ml b/tests/urcat.ml index fdf214b7..f07b7901 100644 --- a/tests/urcat.ml +++ b/tests/urcat.ml @@ -17,23 +17,23 @@ let get_completion_and_print uring = let remaining = ref len in Printf.eprintf "%d bytes read\n%!" len; List.iter (fun buf -> - let buflen = Bytes.length buf in + let buflen = Uring.Bstruct.length buf in if !remaining > 0 then begin if buflen <= !remaining then begin - print_string (Bytes.to_string buf); + print_string (Uring.Bstruct.to_string buf); remaining := !remaining - buflen; end else begin - print_string (Bytes.sub_string buf 0 !remaining); + print_string (Uring.Bstruct.to_string buf ~off:0 ~len:!remaining); remaining := 0; end end ) iov -let submit_read_request fname uring = +let submit_read_request fname uring slab = let fd = Unix.(handle_unix_error (openfile fname [O_RDONLY]) 0) in let file_sz = get_file_size fd in let blocks = if file_sz mod block_size <> 0 then (file_sz / block_size)+1 else file_sz/block_size in - let iov = List.init blocks (fun _ -> Bytes.create block_size) in + let iov = List.init blocks (fun _ -> Uring.Slab.slice slab block_size) in let _ = Uring.readv uring fd iov iov ~file_offset:Optint.Int63.zero in Gc.full_major (); (* <--- KABOOM! ... well not really, just the bytes are empty... did something move? *) let numreq = Uring.submit uring in @@ -43,5 +43,6 @@ let submit_read_request fname uring = let () = let fname = Sys.argv.(1) in let uring = Uring.create ~queue_depth:1 () in - submit_read_request fname uring; + let slab = Uring.Slab.create Uring.major_alloc_byte_size in + submit_read_request fname uring slab; get_completion_and_print uring diff --git a/tests/urcp_lib.ml b/tests/urcp_lib.ml index 74b48f09..65b49bbe 100644 --- a/tests/urcp_lib.ml +++ b/tests/urcp_lib.ml @@ -26,8 +26,8 @@ let pp ppf {insize;offset;reads;writes;write_left; read_left;_} = insize Int63.pp offset reads writes read_left write_left type iovec = { - all : bytes list; - mutable next : bytes list; + all : Uring.Bstruct.t list; + mutable next : Uring.Bstruct.t list; } type req = { @@ -43,8 +43,8 @@ let pp_req ppf {op; len; off; fileoff; t; _ } = Fmt.pf ppf "[%s fileoff %a len %d off %d] [%a]" (match op with |`R -> "r" |`W -> "w") Int63.pp fileoff len off pp t (* Perform a complete read into bufs. *) -let queue_read uring t len = - let all = [Bytes.create len] in +let queue_read slab uring t len = + let all = [ Uring.Slab.slice slab len ] in let iov = { all; next = all } in let req = { op=`R; iov; fileoff=t.offset; len; off=0; t } in Logs.debug (fun l -> l "queue_read: %a" pp_req req); @@ -75,7 +75,7 @@ let handle_read_completion uring req res = raise (Failure ("unix errorno " ^ (string_of_int n))) | n when n < bytes_to_read -> (* handle short read so new iovec and resubmit *) - req.iov.next <- Uring.Bytes.shiftv req.iov.next n; + req.iov.next <- Uring.Bstruct.shiftv req.iov.next n; req.off <- req.off + n; let r = Uring.readv ~file_offset:(Int63.of_int req.off) uring req.t.infd req.iov.next req in assert(r <> None); @@ -104,7 +104,7 @@ let handle_write_completion uring req res = Logs.debug (fun l -> l "requeued eintr read: %a" pp_req req); | n when n < bytes_to_write -> (* handle short write so new iovec and resubmit *) - req.iov.next <- Uring.Bytes.shiftv req.iov.next n; + req.iov.next <- Uring.Bstruct.shiftv req.iov.next n; req.off <- req.off + n; let r = Uring.writev ~file_offset:req.fileoff uring req.t.infd req.iov.next req in assert(r <> None); @@ -120,7 +120,7 @@ let handle_completion uring req res = |`R -> handle_read_completion uring req res |`W -> handle_write_completion uring req res -let copy_file uring t = +let copy_file uring slab t = (* Create a set of read requests that we will turn into write requests * up until the queue depth *) while t.write_left > 0 || t.read_left > 0 do @@ -128,7 +128,7 @@ let copy_file uring t = if t.read_left > 0 then begin if t.reads + t.writes < (Uring.queue_depth uring) then begin let size = min t.block_size t.read_left in - queue_read uring t size; + queue_read slab uring t size; submit_reads () end end; @@ -161,7 +161,8 @@ let run_cp block_size queue_depth infile outfile () = let t = { block_size; insize; offset=Int63.zero; reads=0; writes=0; write_left=insize; read_left=insize; infd; outfd } in Logs.debug (fun l -> l "starting: %a bs=%d qd=%d" pp t block_size queue_depth); let uring = Uring.create ~queue_depth () in - copy_file uring t; + let slab = Uring.Slab.create Uring.major_alloc_byte_size in + copy_file uring slab t; Unix.close infd; Unix.close outfd; Uring.exit uring;