From fa93e6a9d7304d4e339ce3379dd0e9f15cfff2e8 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Thu, 14 Nov 2024 08:39:25 -0500 Subject: [PATCH] wip: make it clear where callbacks run in switch/timer --- src/sync/dune | 2 +- src/sync/switch.ml | 33 +++++++++++++++++++++++++-------- src/sync/switch.mli | 4 +++- src/thread/background_thread.ml | 2 +- 4 files changed, 30 insertions(+), 11 deletions(-) diff --git a/src/sync/dune b/src/sync/dune index be2bac82b..f71b418ae 100644 --- a/src/sync/dune +++ b/src/sync/dune @@ -5,4 +5,4 @@ (synopsis "Sync primitives") (preprocess (pps ppx_deriving.std)) - (libraries containers threads.posix imandrakit)) + (libraries containers moonpool threads.posix imandrakit)) diff --git a/src/sync/switch.ml b/src/sync/switch.ml index 9c6ba3728..1abeda18c 100644 --- a/src/sync/switch.ml +++ b/src/sync/switch.ml @@ -8,7 +8,20 @@ type state = } | Off -type t = { st: state Atomic.t } [@@unboxed] +type run_on = + [ `Sync + | `Runner of Moonpool.Runner.t + ] + +type t = { + st: state Atomic.t; + run_on: run_on; +} + +let run_on (r : run_on) f : unit = + match r with + | `Sync -> f () + | `Runner runner -> Moonpool.Runner.run_async runner f let update_ (type a) (self : t) (f : state -> a * state) : a = let rec loop () = @@ -27,7 +40,7 @@ let on_turn_off (self : t) (f : cb) : unit = | Off -> true, Off | On r -> false, On { r with l = f :: r.l }) in - if must_fire then (* call now *) f () + if must_fire then (* call now *) run_on self.run_on f let with_on_turn_off (self : t) (cb : cb) f = let must_fire, cb_handle = @@ -40,7 +53,7 @@ let with_on_turn_off (self : t) (cb : cb) f = if must_fire then ( (* switch is already off, just call [cb] now and tailcall into [f] *) - cb (); + run_on self.run_on cb; f () ) else ( (* cleanup: remove the callback *) @@ -53,22 +66,26 @@ let with_on_turn_off (self : t) (cb : cb) f = Fun.protect f ~finally:remove_cb ) -let turn_off' ?(trace = true) self = +let turn_off_ ~trace self = (* When calling turn_off' from a signal handler, Trace.message may cause the thread to be killed. For this reason, we provide a way to disable tracing here. *) if trace then Trace.message "switch.turn-off"; match Atomic.exchange self.st Off with | Off -> `Was_off | On { l; m; n = _ } -> - List.iter (fun f -> f ()) l; - Int_map.iter (fun _ f -> f ()) m; + List.iter (fun f -> run_on self.run_on f) l; + Int_map.iter (fun _ f -> run_on self.run_on f) m; `Was_on +let turn_off' ?(trace = true) self = turn_off_ ~trace self + let[@inline] turn_off ?(trace = true) self = ignore (turn_off' self ~trace : [> `Was_on ]) -let create ?parent () : t = - let self = { st = Atomic.make (On { l = []; n = 0; m = Int_map.empty }) } in +let create ~run_on ?parent () : t = + let self = + { run_on; st = Atomic.make (On { l = []; n = 0; m = Int_map.empty }) } + in Option.iter (fun p -> on_turn_off p (fun () -> turn_off self)) parent; self diff --git a/src/sync/switch.mli b/src/sync/switch.mli index 85d07d7d5..a04989e3a 100644 --- a/src/sync/switch.mli +++ b/src/sync/switch.mli @@ -4,8 +4,10 @@ type t [@@deriving show] -val create : ?parent:t -> unit -> t +val create : + run_on:[ `Sync | `Runner of Moonpool.Runner.t ] -> ?parent:t -> unit -> t (** New switch. + @param run_on decides where callbacks will run when the switch is turned off. @param parent inherit from this switch. It means that the result switches off if the parent does, but conversely we can turn the result off without affecting the parent. diff --git a/src/thread/background_thread.ml b/src/thread/background_thread.ml index dc28c1463..6e9605741 100644 --- a/src/thread/background_thread.ml +++ b/src/thread/background_thread.ml @@ -2,7 +2,7 @@ type t = Executor.t let pp out _self = Fmt.string out "" -let start ?(active = Switch.create ()) ?on_exn ~name () : t = +let start ?(active = Switch.create ~run_on:`Sync ()) ?on_exn ~name () : t = let size_name_ = spf "%s.queue-size" name in let around_task = if Trace.enabled () then (