Skip to content

Commit 413eafa

Browse files
committed
Parallelize
DEPS=ocaml-duppy#lockfree
1 parent ac3451d commit 413eafa

11 files changed

+101
-74
lines changed

.github/scripts/build-posix.sh

+3-3
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,10 @@ echo "::group::Setting up specific dependencies"
2929

3030
cd /tmp/liquidsoap-full/liquidsoap
3131

32-
./.github/scripts/checkout-deps.sh
33-
3432
opam update
35-
opam install -y saturn_lockfree
33+
opam install -y saturn_lockfree moonpool
34+
35+
./.github/scripts/checkout-deps.sh
3636

3737
cd /tmp/liquidsoap-full
3838

dune-project

+1
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@
5656
fileutils
5757
menhirLib
5858
(saturn_lockfree (>= 0.4.1))
59+
moonpool
5960
(metadata (>= 0.2.0))
6061
dune-build-info
6162
(liquidsoap-lang (= :version))

liquidsoap-core.opam

+1
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ depends: [
2121
"fileutils"
2222
"menhirLib"
2323
"saturn_lockfree" {>= "0.4.1"}
24+
"moonpool"
2425
"metadata" {>= "0.2.0"}
2526
"dune-build-info"
2627
"liquidsoap-lang" {= version}

src/core/clock.ml

+27-18
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ type clock_variable = Source.clock_variable
2424
type source = Source.source
2525
type active_source = Source.active_source
2626

27+
module Future = Moonpool.Fut
2728
include Source.Clock_variables
2829

2930
let create_known s = create_known (s :> Source.clock)
@@ -311,25 +312,33 @@ module MkClock (Time : Liq_time.T) = struct
311312
let todo = on_before_output in
312313
on_before_output <- [];
313314
List.iter (fun fn -> fn ()) todo;
315+
let futures =
316+
List.map
317+
(fun s ->
318+
let exec () =
319+
try
320+
s#output;
321+
None
322+
with exn -> (
323+
let bt = Printexc.get_raw_backtrace () in
324+
match on_error with
325+
| None ->
326+
log#severe "Source %s failed while streaming: %s!\n%s"
327+
s#id (Printexc.to_string exn)
328+
(Printexc.raw_backtrace_to_string bt);
329+
leave ~failed_to_start:true s;
330+
Some s
331+
| Some on_error ->
332+
on_error exn bt;
333+
None)
334+
in
335+
Future.spawn ~on:Clock_ready.clock_pool exec)
336+
active
337+
in
314338
let error =
315-
List.fold_left
316-
(fun e s ->
317-
try
318-
s#output;
319-
e
320-
with exn -> (
321-
let bt = Printexc.get_raw_backtrace () in
322-
match on_error with
323-
| None ->
324-
log#severe "Source %s failed while streaming: %s!\n%s"
325-
s#id (Printexc.to_string exn)
326-
(Printexc.raw_backtrace_to_string bt);
327-
leave ~failed_to_start:true s;
328-
s :: e
329-
| Some on_error ->
330-
on_error exn bt;
331-
e))
332-
[] active
339+
List.filter_map
340+
(fun x -> x)
341+
(Future.wait_block_exn (Future.join_list futures))
333342
in
334343
let todo = on_output in
335344
on_output <- [];

src/core/clock_ready.await.ml

+8
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
module Future = Moonpool.Fut
2+
module Pool = Moonpool.Ws_pool
3+
4+
type 'a t = 'a Future.t
5+
6+
let clock_pool = Pool.create ()
7+
let make fn = Future.spawn ~on:clock_pool fn
8+
let process fut = Future.await fut

src/core/clock_ready.blocking.ml

+5
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
type 'a t = unit -> 'a
2+
3+
let clock_pool = Moonpool.Ws_pool.create ()
4+
let make fn = fn
5+
let process fn = fn ()

src/core/clock_ready.mli

+5
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
type 'a t
2+
3+
val make : (unit -> 'a) -> 'a t
4+
val process : 'a t -> 'a
5+
val clock_pool : Moonpool.Ws_pool.t

src/core/dune

+14
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,18 @@
1313
(action
1414
(copy liquidsoap_paths.%{env:LIQUIDSOAP_BUILD_TARGET=default}.ml %{target})))
1515

16+
(rule
17+
(target clock_ready.ml)
18+
(enabled_if (>= %{ocaml_version} 5))
19+
(action
20+
(copy clock_ready.await.ml %{target})))
21+
22+
(rule
23+
(target clock_ready.ml)
24+
(enabled_if (< %{ocaml_version} 5))
25+
(action
26+
(copy clock_ready.blocking.ml %{target})))
27+
1628
(library
1729
(name liquidsoap_core)
1830
(preprocess
@@ -26,6 +38,7 @@
2638
liquidsoap-lang
2739
liquidsoap-lang.console
2840
menhirLib
41+
moonpool
2942
camomile.lib
3043
curl
3144
cry
@@ -68,6 +81,7 @@
6881
chord
6982
clip
7083
clock
84+
clock_ready
7185
comb
7286
compand
7387
compress

src/core/source.ml

+24-25
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ type source_t = [ `Fallible | `Infallible ]
5454
exception Unavailable
5555

5656
type streaming_state =
57-
[ `Unavailable | `Ready of unit -> unit | `Done of Frame.t ]
57+
[ `Unavailable | `Ready of Frame.t Clock_ready.t | `Done of Frame.t ]
5858

5959
(** {1 Proto clocks}
6060
@@ -546,13 +546,15 @@ class virtual operator ?pos ?(name = "src") sources =
546546

547547
method virtual private can_generate_frame : bool
548548
method virtual private generate_frame : Frame.t
549-
val mutable streaming_state : streaming_state = `Unavailable
549+
val streaming_state : streaming_state Atomic.t = Atomic.make `Unavailable
550550

551551
method is_ready =
552552
if not content_type_computation_allowed then false
553553
else (
554554
self#has_ticked;
555-
match streaming_state with `Ready _ | `Done _ -> true | _ -> false)
555+
match Atomic.get streaming_state with
556+
| `Ready _ | `Done _ -> true
557+
| _ -> false)
556558

557559
val mutable _cache = None
558560

@@ -569,28 +571,25 @@ class virtual operator ?pos ?(name = "src") sources =
569571
let cache_pos = Frame.position cache in
570572
let size = Lazy.force Frame.size in
571573
if cache_pos > 0 || self#can_generate_frame then
572-
streaming_state <-
573-
`Ready
574-
(fun () ->
575-
let buf =
576-
if cache_pos < size then
577-
Frame.append cache self#instrumented_generate_frame
578-
else cache
579-
in
580-
let buf_pos = Frame.position buf in
581-
let buf =
582-
if size < buf_pos then (
583-
_cache <-
584-
Some
585-
(Frame.chunk ~start:size ~stop:(buf_pos - size) buf);
586-
Frame.slice buf size)
587-
else buf
588-
in
589-
streaming_state <- `Done buf)
590-
else streaming_state <- `Unavailable);
574+
Atomic.set streaming_state
575+
(`Ready
576+
(Clock_ready.make (fun () ->
577+
let buf =
578+
if cache_pos < size then
579+
Frame.append cache self#instrumented_generate_frame
580+
else cache
581+
in
582+
let buf_pos = Frame.position buf in
583+
if size < buf_pos then (
584+
_cache <-
585+
Some
586+
(Frame.chunk ~start:size ~stop:(buf_pos - size) buf);
587+
Frame.slice buf size)
588+
else buf)))
589+
else Atomic.set streaming_state `Unavailable);
591590

592591
self#on_after_output (fun () ->
593-
match (streaming_state, consumed) with
592+
match (Atomic.get streaming_state, consumed) with
594593
| `Done buf, 0 -> _cache <- Some buf
595594
| `Done buf, n ->
596595
let pos = Frame.position buf in
@@ -601,12 +600,12 @@ class virtual operator ?pos ?(name = "src") sources =
601600

602601
method peek_frame =
603602
self#has_ticked;
604-
match streaming_state with
603+
match Atomic.get streaming_state with
605604
| `Unavailable ->
606605
log#critical "source called while not ready!";
607606
raise Unavailable
608607
| `Ready fn ->
609-
fn ();
608+
Atomic.set streaming_state (`Done (Clock_ready.process fn));
610609
self#peek_frame
611610
| `Done data -> data
612611

src/core/source.mli

+1-1
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ type source_t = [ `Fallible | `Infallible ]
5050
exception Unavailable
5151

5252
type streaming_state =
53-
[ `Unavailable | `Ready of unit -> unit | `Done of Frame.t ]
53+
[ `Unavailable | `Ready of Frame.t Clock_ready.t | `Done of Frame.t ]
5454

5555
(** Instrumentation. *)
5656

src/core/tools/tutils.ml

+12-27
Original file line numberDiff line numberDiff line change
@@ -260,7 +260,9 @@ let rec error_handler ~bt exn =
260260
error_handler ~bt exn
261261

262262
let scheduler : priority Duppy.scheduler =
263-
Duppy.create
263+
let log = Log.make ["scheduler"] in
264+
let log m = if scheduler_log#get then log#info "%s" m in
265+
Duppy.create ~log
264266
~on_error:(fun exn raw_bt ->
265267
let bt = Printexc.raw_backtrace_to_string raw_bt in
266268
if not (error_handler ~bt exn) then
@@ -273,38 +275,21 @@ let () =
273275
Duppy.stop scheduler;
274276
log#important "Scheduler shut down.")
275277

276-
let scheduler_log n =
277-
if scheduler_log#get then (
278-
let log = Log.make [n] in
279-
fun m -> log#info "%s" m)
280-
else fun _ -> ()
281-
282-
let new_queue ?priorities ~name () =
283-
let qlog = scheduler_log name in
284-
let queue () =
285-
match priorities with
286-
| None -> Duppy.queue scheduler ~log:qlog name
287-
| Some priorities -> Duppy.queue scheduler ~log:qlog ~priorities name
288-
in
289-
ignore (create ~queue:true queue () name)
278+
let new_pool ?(priorities = fun _ -> true) ~size ~name () =
279+
Duppy.pool scheduler ~priorities ~size name
290280
291281
let create f x name = create ~queue:false f x name
292282
let join_all () = join_all ~set:all ()
293283
294284
let start () =
295285
if Atomic.compare_and_set state `Idle `Starting then (
296-
for i = 1 to generic_queues#get do
297-
let name = Printf.sprintf "generic queue #%d" i in
298-
new_queue ~name ()
299-
done;
300-
for i = 1 to fast_queues#get do
301-
let name = Printf.sprintf "fast queue #%d" i in
302-
new_queue ~name ~priorities:(fun x -> x = `Maybe_blocking) ()
303-
done;
304-
for i = 1 to non_blocking_queues#get do
305-
let name = Printf.sprintf "non-blocking queue #%d" i in
306-
new_queue ~priorities:(fun x -> x = `Non_blocking) ~name ()
307-
done)
286+
new_pool ~name:"generic pool" ~size:generic_queues#get ();
287+
new_pool ~name:"fast pool"
288+
~priorities:(fun x -> x = `Maybe_blocking)
289+
~size:fast_queues#get ();
290+
new_pool ~name:"non-blocking pool"
291+
~priorities:(fun x -> x = `Non_blocking)
292+
~size:non_blocking_queues#get ())
308293
309294
(** Waits for [f()] to become true on condition [c]. *)
310295
let wait c m f =

0 commit comments

Comments
 (0)