Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Duppy lockfree #3611

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/scripts/build-posix.sh
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ cd /tmp/liquidsoap-full/liquidsoap
./.github/scripts/checkout-deps.sh

opam update
opam install -y saturn_lockfree magic-mime
opam install -y saturn_lockfree magic-mime moonpool

cd /tmp/liquidsoap-full

Expand Down
1 change: 1 addition & 0 deletions dune-project
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
fileutils
menhirLib
(saturn_lockfree (>= 0.4.1))
moonpool
(metadata (>= 0.2.0))
magic-mime
dune-build-info
Expand Down
1 change: 1 addition & 0 deletions liquidsoap-core.opam
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ depends: [
"fileutils"
"menhirLib"
"saturn_lockfree" {>= "0.4.1"}
"moonpool"
"metadata" {>= "0.2.0"}
"magic-mime"
"dune-build-info"
Expand Down
45 changes: 27 additions & 18 deletions src/core/clock.ml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ type clock_variable = Source.clock_variable
type source = Source.source
type active_source = Source.active_source

module Future = Moonpool.Fut
include Source.Clock_variables

let create_known s = create_known (s :> Source.clock)
Expand Down Expand Up @@ -311,25 +312,33 @@ module MkClock (Time : Liq_time.T) = struct
let todo = on_before_output in
on_before_output <- [];
List.iter (fun fn -> fn ()) todo;
let futures =
List.map
(fun s ->
let exec () =
try
s#output;
None
with exn -> (
let bt = Printexc.get_raw_backtrace () in
match on_error with
| None ->
log#severe "Source %s failed while streaming: %s!\n%s"
s#id (Printexc.to_string exn)
(Printexc.raw_backtrace_to_string bt);
leave ~failed_to_start:true s;
Some s
| Some on_error ->
on_error exn bt;
None)
in
Future.spawn ~on:Clock_ready.clock_pool exec)
active
in
let error =
List.fold_left
(fun e s ->
try
s#output;
e
with exn -> (
let bt = Printexc.get_raw_backtrace () in
match on_error with
| None ->
log#severe "Source %s failed while streaming: %s!\n%s"
s#id (Printexc.to_string exn)
(Printexc.raw_backtrace_to_string bt);
leave ~failed_to_start:true s;
s :: e
| Some on_error ->
on_error exn bt;
e))
[] active
List.filter_map
(fun x -> x)
(Future.wait_block_exn (Future.join_list futures))
in
let todo = on_output in
on_output <- [];
Expand Down
8 changes: 8 additions & 0 deletions src/core/clock_ready.await.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
module Future = Moonpool.Fut
module Pool = Moonpool.Ws_pool

type 'a t = 'a Future.t

let clock_pool = Pool.create ()
let make fn = Future.spawn ~on:clock_pool fn
let process fut = Future.await fut
5 changes: 5 additions & 0 deletions src/core/clock_ready.blocking.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
type 'a t = unit -> 'a

let clock_pool = Moonpool.Ws_pool.create ()
let make fn = fn
let process fn = fn ()
5 changes: 5 additions & 0 deletions src/core/clock_ready.mli
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
type 'a t

val make : (unit -> 'a) -> 'a t
val process : 'a t -> 'a
val clock_pool : Moonpool.Ws_pool.t
14 changes: 14 additions & 0 deletions src/core/dune
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,18 @@
(action
(copy liquidsoap_paths.%{env:LIQUIDSOAP_BUILD_TARGET=default}.ml %{target})))

(rule
(target clock_ready.ml)
(enabled_if (>= %{ocaml_version} 5))
(action
(copy clock_ready.await.ml %{target})))

(rule
(target clock_ready.ml)
(enabled_if (< %{ocaml_version} 5))
(action
(copy clock_ready.blocking.ml %{target})))

(library
(name liquidsoap_core)
(preprocess
Expand All @@ -26,6 +38,7 @@
liquidsoap-lang
liquidsoap-lang.console
menhirLib
moonpool
camomile.lib
curl
cry
Expand Down Expand Up @@ -64,6 +77,7 @@
chord
clip
clock
clock_ready
comb
compand
compress
Expand Down
49 changes: 24 additions & 25 deletions src/core/source.ml
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ type source_t = [ `Fallible | `Infallible ]
exception Unavailable

type streaming_state =
[ `Unavailable | `Ready of unit -> unit | `Done of Frame.t ]
[ `Unavailable | `Ready of Frame.t Clock_ready.t | `Done of Frame.t ]

(** {1 Proto clocks}

Expand Down Expand Up @@ -546,13 +546,15 @@ class virtual operator ?pos ?(name = "src") sources =

method virtual private can_generate_frame : bool
method virtual private generate_frame : Frame.t
val mutable streaming_state : streaming_state = `Unavailable
val streaming_state : streaming_state Atomic.t = Atomic.make `Unavailable

method is_ready =
if not content_type_computation_allowed then false
else (
self#has_ticked;
match streaming_state with `Ready _ | `Done _ -> true | _ -> false)
match Atomic.get streaming_state with
| `Ready _ | `Done _ -> true
| _ -> false)

val mutable _cache = None

Expand All @@ -569,28 +571,25 @@ class virtual operator ?pos ?(name = "src") sources =
let cache_pos = Frame.position cache in
let size = Lazy.force Frame.size in
if cache_pos > 0 || self#can_generate_frame then
streaming_state <-
`Ready
(fun () ->
let buf =
if cache_pos < size then
Frame.append cache self#instrumented_generate_frame
else cache
in
let buf_pos = Frame.position buf in
let buf =
if size < buf_pos then (
_cache <-
Some
(Frame.chunk ~start:size ~stop:(buf_pos - size) buf);
Frame.slice buf size)
else buf
in
streaming_state <- `Done buf)
else streaming_state <- `Unavailable);
Atomic.set streaming_state
(`Ready
(Clock_ready.make (fun () ->
let buf =
if cache_pos < size then
Frame.append cache self#instrumented_generate_frame
else cache
in
let buf_pos = Frame.position buf in
if size < buf_pos then (
_cache <-
Some
(Frame.chunk ~start:size ~stop:(buf_pos - size) buf);
Frame.slice buf size)
else buf)))
else Atomic.set streaming_state `Unavailable);

self#on_after_output (fun () ->
match (streaming_state, consumed) with
match (Atomic.get streaming_state, consumed) with
| `Done buf, 0 -> _cache <- Some buf
| `Done buf, n ->
let pos = Frame.position buf in
Expand All @@ -601,12 +600,12 @@ class virtual operator ?pos ?(name = "src") sources =

method peek_frame =
self#has_ticked;
match streaming_state with
match Atomic.get streaming_state with
| `Unavailable ->
log#critical "source called while not ready!";
raise Unavailable
| `Ready fn ->
fn ();
Atomic.set streaming_state (`Done (Clock_ready.process fn));
self#peek_frame
| `Done data -> data

Expand Down
2 changes: 1 addition & 1 deletion src/core/source.mli
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ type source_t = [ `Fallible | `Infallible ]
exception Unavailable

type streaming_state =
[ `Unavailable | `Ready of unit -> unit | `Done of Frame.t ]
[ `Unavailable | `Ready of Frame.t Clock_ready.t | `Done of Frame.t ]

(** Instrumentation. *)

Expand Down
39 changes: 12 additions & 27 deletions src/core/tools/tutils.ml
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,9 @@ let rec error_handler ~bt exn =
error_handler ~bt exn

let scheduler : priority Duppy.scheduler =
Duppy.create
let log = Log.make ["scheduler"] in
let log m = if scheduler_log#get then log#info "%s" m in
Duppy.create ~log
~on_error:(fun exn raw_bt ->
let bt = Printexc.raw_backtrace_to_string raw_bt in
if not (error_handler ~bt exn) then
Expand All @@ -273,38 +275,21 @@ let () =
Duppy.stop scheduler;
log#important "Scheduler shut down.")

let scheduler_log n =
if scheduler_log#get then (
let log = Log.make [n] in
fun m -> log#info "%s" m)
else fun _ -> ()

let new_queue ?priorities ~name () =
let qlog = scheduler_log name in
let queue () =
match priorities with
| None -> Duppy.queue scheduler ~log:qlog name
| Some priorities -> Duppy.queue scheduler ~log:qlog ~priorities name
in
ignore (create ~queue:true queue () name)
let new_pool ?(priorities = fun _ -> true) ~size ~name () =
Duppy.pool scheduler ~priorities ~size name

let create f x name = create ~queue:false f x name
let join_all () = join_all ~set:all ()

let start () =
if Atomic.compare_and_set state `Idle `Starting then (
for i = 1 to generic_queues#get do
let name = Printf.sprintf "generic queue #%d" i in
new_queue ~name ()
done;
for i = 1 to fast_queues#get do
let name = Printf.sprintf "fast queue #%d" i in
new_queue ~name ~priorities:(fun x -> x = `Maybe_blocking) ()
done;
for i = 1 to non_blocking_queues#get do
let name = Printf.sprintf "non-blocking queue #%d" i in
new_queue ~priorities:(fun x -> x = `Non_blocking) ~name ()
done)
new_pool ~name:"generic pool" ~size:generic_queues#get ();
new_pool ~name:"fast pool"
~priorities:(fun x -> x = `Maybe_blocking)
~size:fast_queues#get ();
new_pool ~name:"non-blocking pool"
~priorities:(fun x -> x = `Non_blocking)
~size:non_blocking_queues#get ())

(** Waits for [f()] to become true on condition [c]. *)
let wait c m f =
Expand Down
Loading