Skip to content

Commit

Permalink
Revert "Switch to poll for polling implementation."
Browse files Browse the repository at this point in the history
This reverts commit 6d54382.
  • Loading branch information
toots committed Feb 4, 2024
1 parent 5d62ba8 commit 903675e
Show file tree
Hide file tree
Showing 7 changed files with 150 additions and 47 deletions.
4 changes: 0 additions & 4 deletions CHANGES
Original file line number Diff line number Diff line change
@@ -1,7 +1,3 @@
0.10.0 (unreleased)
======
* Switch to `poll` for polling implementation.

0.9.3 (2023-07-06)
======
* Make sure sure `ready_m` is release last to prevent any exception raised
Expand Down
3 changes: 1 addition & 2 deletions dune-project
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
(lang dune 2.7)
(version 0.10.0)
(version 0.9.3)
(name duppy)
(source (github savonet/ocaml-duppy))
(license LGPL-2.1-only)
Expand All @@ -14,6 +14,5 @@
(depends
(ocaml (>= 4.07.0))
dune
poll
re)
)
3 changes: 1 addition & 2 deletions duppy.opam
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# This file is generated by dune, edit dune-project instead
opam-version: "2.0"
version: "0.10.0"
version: "0.9.3"
synopsis: "Library providing monadic threads"
maintainer: ["The Savonet Team <savonet-users@lists.sourceforge.net>"]
authors: ["Romain Beauxis <toots@rastageeks.org>"]
Expand All @@ -10,7 +10,6 @@ bug-reports: "https://github.com/savonet/ocaml-duppy/issues"
depends: [
"ocaml" {>= "4.07.0"}
"dune" {>= "2.7"}
"poll"
"re"
"odoc" {with-doc}
]
Expand Down
2 changes: 1 addition & 1 deletion src/dune
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
(library
(name duppy)
(public_name duppy)
(libraries unix threads poll re)
(libraries unix threads re)
(foreign_stubs
(language c)
(names duppy_stubs))
Expand Down
72 changes: 37 additions & 35 deletions src/duppy.ml
Original file line number Diff line number Diff line change
Expand Up @@ -24,27 +24,25 @@ module Pcre = Re.Pcre

type fd = Unix.file_descr

let poll r w timeout =
let timeout =
match timeout with
| x when x < 0. -> Poll.Timeout.never
| 0. -> Poll.Timeout.immediate
| x -> Poll.Timeout.after (Int64.of_float (x *. 1_000_000_000.))
in
let poll = Poll.create () in
List.iter (fun fd -> Poll.set poll fd Poll.Event.read) r;
List.iter (fun fd -> Poll.set poll fd Poll.Event.write) w;
match Poll.wait poll timeout with
| `Ok ->
let r = ref [] in
let w = ref [] in
Poll.iter_ready poll ~f:(fun fd -> function
| { Poll.Event.readable = true; _ } -> r := fd :: !r
| _ -> w := fd :: !w);
let r = !r in
let w = !w in
(r, w)
| `Timeout -> ([], [])
external poll :
Unix.file_descr array ->
Unix.file_descr array ->
Unix.file_descr array ->
float ->
Unix.file_descr array * Unix.file_descr array * Unix.file_descr array
= "caml_poll"

let poll r w e timeout =
let r = Array.of_list r in
let w = Array.of_list w in
let e = Array.of_list e in
let r, w, e = poll r w e timeout in
(Array.to_list r, Array.to_list w, Array.to_list e)

let select, select_fname =
match Sys.os_type with
| "Unix" -> (poll, "poll")
| _ -> (Unix.select, "select")

(** [remove f l] is like [List.find f l] but also returns the result of removing
* the found element from the original list. *)
Expand All @@ -58,7 +56,7 @@ let remove f l =
(** Events and tasks from the implementation point-of-view:
* we have to hide the 'a parameter. *)

type e = { r : fd list; w : fd list; t : float }
type e = { r : fd list; w : fd list; x : fd list; t : float }

type 'a t = {
timestamp : float;
Expand Down Expand Up @@ -113,7 +111,8 @@ let wake_up s = ignore (Unix.write s.in_pipe (Bytes.of_string "x") 0 1)
module Task = struct
(** Events and tasks from the user's point-of-view. *)

type event = [ `Delay of float | `Write of fd | `Read of fd ]
type event =
[ `Delay of float | `Write of fd | `Read of fd | `Exception of fd ]

type ('a, 'b) task = {
priority : 'a;
Expand All @@ -134,7 +133,8 @@ module Task = struct
(fun e -> function
| `Delay s -> { e with t = min e.t (t0 +. s) }
| `Read s -> { e with r = s :: e.r }
| `Write s -> { e with w = s :: e.w })
| `Write s -> { e with w = s :: e.w }
| `Exception s -> { e with x = s :: e.x })
e task.events);
is_ready =
(fun e ->
Expand All @@ -145,6 +145,7 @@ module Task = struct
| `Delay s when time () > t0 +. s -> true
| `Read s when List.mem s e.r -> true
| `Write s when List.mem s e.w -> true
| `Exception s when List.mem s e.x -> true
| _ -> false)
task.events
in
Expand All @@ -154,7 +155,7 @@ module Task = struct

let add_t s items =
let f item =
match item.is_ready { r = []; w = []; t = 0. } with
match item.is_ready { r = []; w = []; x = []; t = 0. } with
| Some f ->
Mutex.lock s.ready_m;
s.ready <- (item.prio, f) :: s.ready;
Expand Down Expand Up @@ -197,29 +198,30 @@ let process s log =
let e =
List.fold_left
(fun e t -> t.enrich e)
{ r = [s.out_pipe]; w = []; t = infinity }
{ r = [s.out_pipe]; w = []; x = []; t = infinity }
s.tasks
in
(* Poll for an event. *)
let r, w =
let r, w, x =
let rec f () =
try
let timeout = if e.t = infinity then -1. else max 0. (e.t -. time ()) in
log
(Printf.sprintf "Enter poll at %f, timeout %f (%d/%d)." (time ())
timeout (List.length e.r) (List.length e.w));
let r, w = poll e.r e.w timeout in
(Printf.sprintf "Enter %s at %f, timeout %f (%d/%d/%d)." select_fname
(time ()) timeout (List.length e.r) (List.length e.w)
(List.length e.x));
let r, w, x = select e.r e.w e.x timeout in
log
(Printf.sprintf "Left poll at %f (%d/%d)." (time ()) (List.length r)
(List.length w));
(r, w)
(Printf.sprintf "Left %s at %f (%d/%d/%d)." select_fname (time ())
(List.length r) (List.length w) (List.length x));
(r, w, x)
with
| Unix.Unix_error (Unix.EINTR, _, _) ->
(* [EINTR] means that select was interrupted by
* a signal before any of the selected events
* occurred and before the timeout interval expired.
* We catch it and restart.. *)
log (Printf.sprintf "Poll interrupted at %f." (time ()));
log (Printf.sprintf "Select interrupted at %f." (time ()));
f ()
| e ->
(* Uncaught exception:
Expand All @@ -242,7 +244,7 @@ let process s log =
ignore (Unix.read s.out_pipe tmp 0 1024)
in
(* Move ready tasks to the ready list. *)
let e = { r; w; t = 0. } in
let e = { r; w; x; t = 0. } in
Mutex.lock s.tasks_m;
(* Split [tasks] into [r]eady and still [w]aiting. *)
let r, w =
Expand Down
11 changes: 8 additions & 3 deletions src/duppy.mli
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,14 @@ val create :
unit ->
'a scheduler

(** Internal polling function. Uses the `poll` package internally. *)
(** Internal polling function. Uses `Unix.select` on windows and
`poll` otherwise. *)
val poll :
Unix.file_descr list ->
Unix.file_descr list ->
Unix.file_descr list ->
float ->
Unix.file_descr list * Unix.file_descr list
Unix.file_descr list * Unix.file_descr list * Unix.file_descr list

(** [queue ~log ~priorities s name]
* starts a queue, on the scheduler [s] only processing priorities [p]
Expand Down Expand Up @@ -120,7 +122,10 @@ module Task : sig
* Please not that currently, under win32, all socket used in ocaml-duppy
* are expected to be in blocking mode only! *)
type event =
[ `Delay of float | `Write of Unix.file_descr | `Read of Unix.file_descr ]
[ `Delay of float
| `Write of Unix.file_descr
| `Read of Unix.file_descr
| `Exception of Unix.file_descr ]

(** Schedule a task. *)
val add : 'a scheduler -> ('a, [< event ]) task -> unit
Expand Down
102 changes: 102 additions & 0 deletions src/duppy_stubs.c
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,108 @@
#define Val_fd(fd) Val_int(fd)
#endif

#ifndef WIN32
#include <poll.h>

CAMLprim value caml_poll(value _read, value _write, value _err, value _timeout) {
CAMLparam3(_read, _write, _err);
CAMLlocal4(_pread, _pwrite, _perr, _ret);

struct pollfd *fds;
nfds_t nfds = 0;
nfds_t nread = 0;
nfds_t nwrite = 0;
nfds_t nerr = 0;
int timeout;
size_t last = 0;
int n, ret;

if (Double_val(_timeout) == -1)
timeout = -1;
else
timeout = Double_val(_timeout) * 1000;

nfds += Wosize_val(_read);
nfds += Wosize_val(_write);
nfds += Wosize_val(_err);

fds = calloc(nfds,sizeof(struct pollfd));
if (fds == NULL) caml_raise_out_of_memory();

for (n = 0; n < Wosize_val(_read); n++) {
fds[last+n].fd = Fd_val(Field(_read,n));
fds[last+n].events = POLLIN;
}
last += Wosize_val(_read);

for (n = 0; n < Wosize_val(_write); n++) {
fds[last+n].fd = Fd_val(Field(_write,n));
fds[last+n].events = POLLOUT;
}
last += Wosize_val(_write);

for (n = 0; n < Wosize_val(_err); n++) {
fds[last+n].fd = Fd_val(Field(_err,n));
fds[last+n].events = POLLERR;
}

caml_release_runtime_system();
ret = poll(fds, nfds, timeout);
caml_acquire_runtime_system();

if (ret == -1) {
free(fds);
uerror("poll",Nothing);
}

for (n = 0; n < nfds; n++) {
if (fds[n].revents & POLLIN)
nread++;
if (fds[n].revents & POLLOUT)
nwrite++;
if (fds[n].revents & POLLERR)
nerr++;
}

_pread = caml_alloc_tuple(nread);
nread = 0;

_pwrite = caml_alloc_tuple(nwrite);
nwrite = 0;

_perr = caml_alloc_tuple(nerr);
nerr = 0;

for (n = 0; n < nfds; n++) {
if (fds[n].revents & POLLIN) {
Store_field(_pread, nread, Val_fd(fds[n].fd));
nread++;
}
if (fds[n].revents & POLLOUT) {
Store_field(_pwrite, nwrite, Val_fd(fds[n].fd));
nwrite++;
}
if (fds[n].revents & POLLERR) {
Store_field(_pread, nerr, Val_fd(fds[n].fd));
nerr++;
}
}

free(fds);

_ret = caml_alloc_tuple(3);
Store_field(_ret, 0, _pread);
Store_field(_ret, 1, _pwrite);
Store_field(_ret, 2, _perr);

CAMLreturn(_ret);
}
#else
CAMLprim value caml_poll(value _read, value _write, value _err, value _timeout) {
caml_failwith("caml_poll");
}
#endif

CAMLprim value ocaml_duppy_write_ba(value _fd, value ba, value _ofs, value _len)
{
CAMLparam2(ba,_fd) ;
Expand Down

0 comments on commit 903675e

Please sign in to comment.