Skip to content

Commit

Permalink
Remove dynamic activations. (#3518)
Browse files Browse the repository at this point in the history
  • Loading branch information
toots authored Nov 10, 2023
1 parent 4cfb499 commit a531bf4
Show file tree
Hide file tree
Showing 6 changed files with 29 additions and 65 deletions.
16 changes: 6 additions & 10 deletions src/core/operators/cross.ml
Original file line number Diff line number Diff line change
Expand Up @@ -116,32 +116,28 @@ class cross val_source ~duration_getter ~override_duration ~persist_override

method private prepare_transition_source s =
let s = (s :> source) in
s#get_ready ~dynamic:true [(self :> source)];
s#get_ready [(self :> source)];
Clock.unify ~pos:self#pos source#clock s#clock;
transition_source <- Some s

method cleanup_transition_source =
match transition_source with
| None -> ()
| Some s ->
s#leave ~dynamic:true (self :> source);
s#leave (self :> source);
transition_source <- None

method! private wake_up a =
self#reset_analysis;
super#wake_up a;
source#get_ready ~dynamic:true [(self :> source)];
source#get_ready [(self :> source)];
Lang.iter_sources
(fun s -> s#get_ready ~dynamic:true [(self :> source)])
transition
source#get_ready [(self :> source)];
Lang.iter_sources (fun s -> s#get_ready [(self :> source)]) transition

method! private sleep =
source#leave (self :> source);
s#leave ~dynamic:true (self :> source);
Lang.iter_sources
(fun s -> s#leave ~dynamic:true (self :> source))
transition;
s#leave (self :> source);
Lang.iter_sources (fun s -> s#leave (self :> source)) transition;
self#cleanup_transition_source

(* in main time *)
Expand Down
6 changes: 2 additions & 4 deletions src/core/operators/dyn_op.ml
Original file line number Diff line number Diff line change
Expand Up @@ -73,14 +73,12 @@ class dyn ~init ~track_sensitive ~infallible ~resurection_time f =
Lang.iter_sources
(fun s ->
Typing.(s#frame_type <: self#frame_type);
s#get_ready ~dynamic:true activation)
s#get_ready activation)
f;
self#select

method! private sleep =
Lang.iter_sources
(fun s -> s#leave ~dynamic:true (self :> Source.source))
f;
Lang.iter_sources (fun s -> s#leave (self :> Source.source)) f;
self#unregister_source ~already_locked:false

method private _is_ready ?frame () =
Expand Down
12 changes: 4 additions & 8 deletions src/core/operators/switch.ml
Original file line number Diff line number Diff line change
Expand Up @@ -97,19 +97,15 @@ class virtual switch ~name ~override_meta ~transition_length
activation <- (self :> source) :: activator;
List.iter
(fun { transition; source = s; _ } ->
s#get_ready ~dynamic:true activation;
Lang.iter_sources
(fun s -> s#get_ready ~dynamic:true activation)
transition)
s#get_ready activation;
Lang.iter_sources (fun s -> s#get_ready activation) transition)
cases
method! private sleep =
List.iter
(fun { transition; source = s; _ } ->
s#leave ~dynamic:true (self :> source);
Lang.iter_sources
(fun s -> s#leave ~dynamic:true (self :> source))
transition)
s#leave (self :> source);
Lang.iter_sources (fun s -> s#leave (self :> source)) transition)
cases;
if self#is_selected_generated then
(Option.get selected).effective_source#leave (self :> source)
Expand Down
51 changes: 13 additions & 38 deletions src/core/source.ml
Original file line number Diff line number Diff line change
Expand Up @@ -446,28 +446,15 @@ class virtual operator ?pos ?(name = "src") sources =
consistency of the delivered stream chunk.
Before that a source P accesses another source S it must activate it. The
activation can be static, or dynamic. A static activation means that
P may pull data from S at any time. A dynamic activation means that P
won't use S directly but may at some point build a source which will
access S. This dynamic creation may occur in the middle of an output
round, which is why S needs to know in advance, since in some cases it
might have to enter caching mode from the beginning of the round in case
the dynamic activation occurs.
An activation is identified by the path to the source which required it.
It is possible that two identical activations are done, and they should
not be treated as a single one.
In short, a source can avoid caching when: it has only one static
activation and all its dynamic activations are sub-paths of the static
one. When there is no static activation, there cannot be any access.
It is assumed that all streaming is done in one thread for a given clock,
so the activation management API is not thread-safe. *)

val mutable caching = false
val mutable dynamic_activations : operator list list = []
val mutable static_activations : operator list list = []
val mutable activations : operator list list = []

method private update_caching_mode =
let string_of activations =
Expand All @@ -476,23 +463,14 @@ class virtual operator ?pos ?(name = "src") sources =
(fun l -> String.concat ":" (List.map (fun s -> s#id) l))
activations)
in
self#log#debug "Activations changed: static=[%s], dynamic=[%s]."
(string_of static_activations)
(string_of dynamic_activations);
self#log#debug "Activations changed: static=[%s]." (string_of activations);

(* Decide whether caching mode is needed, and why *)
match
if self#is_active then Some "active source"
else (
match static_activations with
| [] -> None
| [s] ->
if
List.exists
(fun d -> not (Utils.prefix (List.rev d) (List.rev s)))
dynamic_activations
then Some "possible dynamic activation"
else None
match activations with
| [] | _ :: [] -> None
| _ -> Some "two static activations")
with
| None ->
Expand Down Expand Up @@ -524,16 +502,15 @@ class virtual operator ?pos ?(name = "src") sources =
The current implementation makes it dangerous to call #get_ready from
another thread than the Root one, as interleaving with #get is
forbidden. *)
method get_ready ?(dynamic = false) (activation : operator list) =
method get_ready (activation : operator list) =
self#content_type_computation_allowed;
if log == source_log then self#create_log;
if static_activations = [] && dynamic_activations = [] then (
if activations = [] then (
source_log#info "Source %s gets up with content type: %s." id
(Frame.string_of_content_type self#content_type);
self#wake_up activation;
List.iter (fun fn -> fn ()) on_wake_up);
if dynamic then dynamic_activations <- activation :: dynamic_activations
else static_activations <- activation :: static_activations;
activations <- activation :: activations;
self#update_caching_mode

val mutable on_sleep = []
Expand All @@ -546,7 +523,7 @@ class virtual operator ?pos ?(name = "src") sources =
The current implementation makes it dangerous to call #leave from
another thread than the Root one, as interleaving with #get is
forbidden. *)
method leave ?(failed_to_start = true) ?(dynamic = false) src =
method leave ?(failed_to_start = true) src =
let rec remove acc = function
| [] when failed_to_start -> []
| [] ->
Expand All @@ -555,28 +532,26 @@ class virtual operator ?pos ?(name = "src") sources =
| (s :: _) :: tl when s = src -> List.rev_append acc tl
| h :: tl -> remove (h :: acc) tl
in
if dynamic then dynamic_activations <- remove [] dynamic_activations
else static_activations <- remove [] static_activations;
activations <- remove [] activations;
self#update_caching_mode;
if static_activations = [] && dynamic_activations = [] then (
if activations = [] then (
source_log#info "Source %s gets down." id;
self#sleep;
List.iter (fun fn -> try fn () with _ -> ()) on_sleep)

method is_up = static_activations <> [] || dynamic_activations <> []
method is_up = activations <> []

(** Two methods called for initialization and shutdown of the source *)
method private wake_up activation =
self#log#debug "Clock is %s." (variable_to_string self#clock);
self#log#important "Content type is %s."
(Frame.string_of_content_type self#content_type);
let activation = (self :> operator) :: activation in
List.iter (fun s -> s#get_ready ?dynamic:None activation) sources
List.iter (fun s -> s#get_ready activation) sources

method private sleep =
List.iter
(fun s ->
s#leave ?failed_to_start:None ?dynamic:None (self :> operator))
(fun s -> s#leave ?failed_to_start:None (self :> operator))
sources

(** Streaming *)
Expand Down
4 changes: 2 additions & 2 deletions src/core/source.mli
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ class virtual source :
method private set_clock : unit

(** The operator says to the source that he will ask it frames. It may be called multiple times. *)
method get_ready : ?dynamic:bool -> source list -> unit
method get_ready : source list -> unit

(** Register a callback when wake_up is called. *)
method on_wake_up : (unit -> unit) -> unit
Expand All @@ -137,7 +137,7 @@ class virtual source :
method private wake_up : source list -> unit

(** Opposite of [get_ready] : the operator no longer needs the source. it may be called multiple times. *)
method leave : ?failed_to_start:bool -> ?dynamic:bool -> source -> unit
method leave : ?failed_to_start:bool -> source -> unit

(** Register a callback when sleep is called. *)
method on_sleep : (unit -> unit) -> unit
Expand Down
5 changes: 2 additions & 3 deletions src/core/tools/producer_consumer.ml
Original file line number Diff line number Diff line change
Expand Up @@ -107,14 +107,13 @@ class producer ?pos ?create_known_clock ~check_self_sync ~consumers ~name () =
List.iter
(fun c ->
c#set_producer_buffer self#buffer;
c#get_ready ?dynamic:None [(self :> Source.source)])
c#get_ready [(self :> Source.source)])
consumers

method! sleep =
super#sleep;
List.iter
(fun c ->
c#leave ?failed_to_start:None ?dynamic:None (self :> Source.source))
(fun c -> c#leave ?failed_to_start:None (self :> Source.source))
consumers

method private get_frame buf =
Expand Down

0 comments on commit a531bf4

Please sign in to comment.