Skip to content

Commit

Permalink
Parallelize
Browse files Browse the repository at this point in the history
DEPS=ocaml-duppy#lockfree
  • Loading branch information
toots committed Jan 30, 2024
1 parent 0e72a00 commit 0a19a40
Show file tree
Hide file tree
Showing 11 changed files with 99 additions and 72 deletions.
2 changes: 1 addition & 1 deletion .github/scripts/build-posix.sh
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ cd /tmp/liquidsoap-full/liquidsoap
./.github/scripts/checkout-deps.sh

opam update
opam install -y saturn_lockfree magic-mime
opam install -y saturn_lockfree magic-mime moonpool

cd /tmp/liquidsoap-full

Expand Down
1 change: 1 addition & 0 deletions dune-project
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
fileutils
menhirLib
(saturn_lockfree (>= 0.4.1))
moonpool
(metadata (>= 0.2.0))
magic-mime
dune-build-info
Expand Down
1 change: 1 addition & 0 deletions liquidsoap-core.opam
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ depends: [
"fileutils"
"menhirLib"
"saturn_lockfree" {>= "0.4.1"}
"moonpool"
"metadata" {>= "0.2.0"}
"magic-mime"
"dune-build-info"
Expand Down
45 changes: 27 additions & 18 deletions src/core/clock.ml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ type clock_variable = Source.clock_variable
type source = Source.source
type active_source = Source.active_source

module Future = Moonpool.Fut
include Source.Clock_variables

let create_known s = create_known (s :> Source.clock)
Expand Down Expand Up @@ -311,25 +312,33 @@ module MkClock (Time : Liq_time.T) = struct
let todo = on_before_output in
on_before_output <- [];
List.iter (fun fn -> fn ()) todo;
let futures =
List.map
(fun s ->
let exec () =
try
s#output;
None
with exn -> (
let bt = Printexc.get_raw_backtrace () in
match on_error with
| None ->
log#severe "Source %s failed while streaming: %s!\n%s"
s#id (Printexc.to_string exn)
(Printexc.raw_backtrace_to_string bt);
leave ~failed_to_start:true s;
Some s
| Some on_error ->
on_error exn bt;
None)
in
Future.spawn ~on:Clock_ready.clock_pool exec)
active
in
let error =
List.fold_left
(fun e s ->
try
s#output;
e
with exn -> (
let bt = Printexc.get_raw_backtrace () in
match on_error with
| None ->
log#severe "Source %s failed while streaming: %s!\n%s"
s#id (Printexc.to_string exn)
(Printexc.raw_backtrace_to_string bt);
leave ~failed_to_start:true s;
s :: e
| Some on_error ->
on_error exn bt;
e))
[] active
List.filter_map
(fun x -> x)
(Future.wait_block_exn (Future.join_list futures))
in
let todo = on_output in
on_output <- [];
Expand Down
8 changes: 8 additions & 0 deletions src/core/clock_ready.await.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
module Future = Moonpool.Fut
module Pool = Moonpool.Ws_pool

type 'a t = 'a Future.t

let clock_pool = Pool.create ()
let make fn = Future.spawn ~on:clock_pool fn
let process fut = Future.await fut
5 changes: 5 additions & 0 deletions src/core/clock_ready.blocking.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
type 'a t = unit -> 'a

let clock_pool = Moonpool.Ws_pool.create ()
let make fn = fn
let process fn = fn ()
5 changes: 5 additions & 0 deletions src/core/clock_ready.mli
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
type 'a t

val make : (unit -> 'a) -> 'a t
val process : 'a t -> 'a
val clock_pool : Moonpool.Ws_pool.t
14 changes: 14 additions & 0 deletions src/core/dune
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,18 @@
(action
(copy liquidsoap_paths.%{env:LIQUIDSOAP_BUILD_TARGET=default}.ml %{target})))

(rule
(target clock_ready.ml)
(enabled_if (>= %{ocaml_version} 5))
(action
(copy clock_ready.await.ml %{target})))

(rule
(target clock_ready.ml)
(enabled_if (< %{ocaml_version} 5))
(action
(copy clock_ready.blocking.ml %{target})))

(library
(name liquidsoap_core)
(preprocess
Expand All @@ -26,6 +38,7 @@
liquidsoap-lang
liquidsoap-lang.console
menhirLib
moonpool
camomile.lib
curl
cry
Expand Down Expand Up @@ -64,6 +77,7 @@
chord
clip
clock
clock_ready
comb
compand
compress
Expand Down
49 changes: 24 additions & 25 deletions src/core/source.ml
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ type source_t = [ `Fallible | `Infallible ]
exception Unavailable

type streaming_state =
[ `Unavailable | `Ready of unit -> unit | `Done of Frame.t ]
[ `Unavailable | `Ready of Frame.t Clock_ready.t | `Done of Frame.t ]

(** {1 Proto clocks}
Expand Down Expand Up @@ -546,13 +546,15 @@ class virtual operator ?pos ?(name = "src") sources =

method virtual private can_generate_frame : bool
method virtual private generate_frame : Frame.t
val mutable streaming_state : streaming_state = `Unavailable
val streaming_state : streaming_state Atomic.t = Atomic.make `Unavailable

method is_ready =
if not content_type_computation_allowed then false
else (
self#has_ticked;
match streaming_state with `Ready _ | `Done _ -> true | _ -> false)
match Atomic.get streaming_state with
| `Ready _ | `Done _ -> true
| _ -> false)

val mutable _cache = None

Expand All @@ -569,28 +571,25 @@ class virtual operator ?pos ?(name = "src") sources =
let cache_pos = Frame.position cache in
let size = Lazy.force Frame.size in
if cache_pos > 0 || self#can_generate_frame then
streaming_state <-
`Ready
(fun () ->
let buf =
if cache_pos < size then
Frame.append cache self#instrumented_generate_frame
else cache
in
let buf_pos = Frame.position buf in
let buf =
if size < buf_pos then (
_cache <-
Some
(Frame.chunk ~start:size ~stop:(buf_pos - size) buf);
Frame.slice buf size)
else buf
in
streaming_state <- `Done buf)
else streaming_state <- `Unavailable);
Atomic.set streaming_state
(`Ready
(Clock_ready.make (fun () ->
let buf =
if cache_pos < size then
Frame.append cache self#instrumented_generate_frame
else cache
in
let buf_pos = Frame.position buf in
if size < buf_pos then (
_cache <-
Some
(Frame.chunk ~start:size ~stop:(buf_pos - size) buf);
Frame.slice buf size)
else buf)))
else Atomic.set streaming_state `Unavailable);

self#on_after_output (fun () ->
match (streaming_state, consumed) with
match (Atomic.get streaming_state, consumed) with
| `Done buf, 0 -> _cache <- Some buf
| `Done buf, n ->
let pos = Frame.position buf in
Expand All @@ -601,12 +600,12 @@ class virtual operator ?pos ?(name = "src") sources =

method peek_frame =
self#has_ticked;
match streaming_state with
match Atomic.get streaming_state with
| `Unavailable ->
log#critical "source called while not ready!";
raise Unavailable
| `Ready fn ->
fn ();
Atomic.set streaming_state (`Done (Clock_ready.process fn));
self#peek_frame
| `Done data -> data

Expand Down
2 changes: 1 addition & 1 deletion src/core/source.mli
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ type source_t = [ `Fallible | `Infallible ]
exception Unavailable

type streaming_state =
[ `Unavailable | `Ready of unit -> unit | `Done of Frame.t ]
[ `Unavailable | `Ready of Frame.t Clock_ready.t | `Done of Frame.t ]

(** Instrumentation. *)

Expand Down
39 changes: 12 additions & 27 deletions src/core/tools/tutils.ml
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,9 @@ let rec error_handler ~bt exn =
error_handler ~bt exn

let scheduler : priority Duppy.scheduler =
Duppy.create
let log = Log.make ["scheduler"] in
let log m = if scheduler_log#get then log#info "%s" m in
Duppy.create ~log
~on_error:(fun exn raw_bt ->
let bt = Printexc.raw_backtrace_to_string raw_bt in
if not (error_handler ~bt exn) then
Expand All @@ -273,38 +275,21 @@ let () =
Duppy.stop scheduler;
log#important "Scheduler shut down.")

let scheduler_log n =
if scheduler_log#get then (
let log = Log.make [n] in
fun m -> log#info "%s" m)
else fun _ -> ()

let new_queue ?priorities ~name () =
let qlog = scheduler_log name in
let queue () =
match priorities with
| None -> Duppy.queue scheduler ~log:qlog name
| Some priorities -> Duppy.queue scheduler ~log:qlog ~priorities name
in
ignore (create ~queue:true queue () name)
let new_pool ?(priorities = fun _ -> true) ~size ~name () =
Duppy.pool scheduler ~priorities ~size name
let create f x name = create ~queue:false f x name
let join_all () = join_all ~set:all ()
let start () =
if Atomic.compare_and_set state `Idle `Starting then (
for i = 1 to generic_queues#get do
let name = Printf.sprintf "generic queue #%d" i in
new_queue ~name ()
done;
for i = 1 to fast_queues#get do
let name = Printf.sprintf "fast queue #%d" i in
new_queue ~name ~priorities:(fun x -> x = `Maybe_blocking) ()
done;
for i = 1 to non_blocking_queues#get do
let name = Printf.sprintf "non-blocking queue #%d" i in
new_queue ~priorities:(fun x -> x = `Non_blocking) ~name ()
done)
new_pool ~name:"generic pool" ~size:generic_queues#get ();
new_pool ~name:"fast pool"
~priorities:(fun x -> x = `Maybe_blocking)
~size:fast_queues#get ();
new_pool ~name:"non-blocking pool"
~priorities:(fun x -> x = `Non_blocking)
~size:non_blocking_queues#get ())
(** Waits for [f()] to become true on condition [c]. *)
let wait c m f =
Expand Down

0 comments on commit 0a19a40

Please sign in to comment.