diff --git a/src/core/operators/cross.ml b/src/core/operators/cross.ml index 2313a75dd1..9a9eb6b508 100644 --- a/src/core/operators/cross.ml +++ b/src/core/operators/cross.ml @@ -116,7 +116,7 @@ class cross val_source ~duration_getter ~override_duration ~persist_override method private prepare_transition_source s = let s = (s :> source) in - s#get_ready ~dynamic:true [(self :> source)]; + s#get_ready [(self :> source)]; Clock.unify ~pos:self#pos source#clock s#clock; transition_source <- Some s @@ -124,24 +124,20 @@ class cross val_source ~duration_getter ~override_duration ~persist_override match transition_source with | None -> () | Some s -> - s#leave ~dynamic:true (self :> source); + s#leave (self :> source); transition_source <- None method! private wake_up a = self#reset_analysis; super#wake_up a; - source#get_ready ~dynamic:true [(self :> source)]; source#get_ready [(self :> source)]; - Lang.iter_sources - (fun s -> s#get_ready ~dynamic:true [(self :> source)]) - transition + source#get_ready [(self :> source)]; + Lang.iter_sources (fun s -> s#get_ready [(self :> source)]) transition method! private sleep = source#leave (self :> source); - s#leave ~dynamic:true (self :> source); - Lang.iter_sources - (fun s -> s#leave ~dynamic:true (self :> source)) - transition; + s#leave (self :> source); + Lang.iter_sources (fun s -> s#leave (self :> source)) transition; self#cleanup_transition_source (* in main time *) diff --git a/src/core/operators/dyn_op.ml b/src/core/operators/dyn_op.ml index ad78490e4e..b3b63848a9 100644 --- a/src/core/operators/dyn_op.ml +++ b/src/core/operators/dyn_op.ml @@ -73,14 +73,12 @@ class dyn ~init ~track_sensitive ~infallible ~resurection_time f = Lang.iter_sources (fun s -> Typing.(s#frame_type <: self#frame_type); - s#get_ready ~dynamic:true activation) + s#get_ready activation) f; self#select method! private sleep = - Lang.iter_sources - (fun s -> s#leave ~dynamic:true (self :> Source.source)) - f; + Lang.iter_sources (fun s -> s#leave (self :> Source.source)) f; self#unregister_source ~already_locked:false method private _is_ready ?frame () = diff --git a/src/core/operators/switch.ml b/src/core/operators/switch.ml index 99b998c03a..342480b295 100644 --- a/src/core/operators/switch.ml +++ b/src/core/operators/switch.ml @@ -97,19 +97,15 @@ class virtual switch ~name ~override_meta ~transition_length activation <- (self :> source) :: activator; List.iter (fun { transition; source = s; _ } -> - s#get_ready ~dynamic:true activation; - Lang.iter_sources - (fun s -> s#get_ready ~dynamic:true activation) - transition) + s#get_ready activation; + Lang.iter_sources (fun s -> s#get_ready activation) transition) cases method! private sleep = List.iter (fun { transition; source = s; _ } -> - s#leave ~dynamic:true (self :> source); - Lang.iter_sources - (fun s -> s#leave ~dynamic:true (self :> source)) - transition) + s#leave (self :> source); + Lang.iter_sources (fun s -> s#leave (self :> source)) transition) cases; if self#is_selected_generated then (Option.get selected).effective_source#leave (self :> source) diff --git a/src/core/source.ml b/src/core/source.ml index 29b77ee11f..34b9168c42 100644 --- a/src/core/source.ml +++ b/src/core/source.ml @@ -446,28 +446,15 @@ class virtual operator ?pos ?(name = "src") sources = consistency of the delivered stream chunk. Before that a source P accesses another source S it must activate it. The - activation can be static, or dynamic. A static activation means that - P may pull data from S at any time. A dynamic activation means that P - won't use S directly but may at some point build a source which will - access S. This dynamic creation may occur in the middle of an output - round, which is why S needs to know in advance, since in some cases it - might have to enter caching mode from the beginning of the round in case - the dynamic activation occurs. - An activation is identified by the path to the source which required it. It is possible that two identical activations are done, and they should not be treated as a single one. - In short, a source can avoid caching when: it has only one static - activation and all its dynamic activations are sub-paths of the static - one. When there is no static activation, there cannot be any access. - It is assumed that all streaming is done in one thread for a given clock, so the activation management API is not thread-safe. *) val mutable caching = false - val mutable dynamic_activations : operator list list = [] - val mutable static_activations : operator list list = [] + val mutable activations : operator list list = [] method private update_caching_mode = let string_of activations = @@ -476,23 +463,14 @@ class virtual operator ?pos ?(name = "src") sources = (fun l -> String.concat ":" (List.map (fun s -> s#id) l)) activations) in - self#log#debug "Activations changed: static=[%s], dynamic=[%s]." - (string_of static_activations) - (string_of dynamic_activations); + self#log#debug "Activations changed: static=[%s]." (string_of activations); (* Decide whether caching mode is needed, and why *) match if self#is_active then Some "active source" else ( - match static_activations with - | [] -> None - | [s] -> - if - List.exists - (fun d -> not (Utils.prefix (List.rev d) (List.rev s))) - dynamic_activations - then Some "possible dynamic activation" - else None + match activations with + | [] | _ :: [] -> None | _ -> Some "two static activations") with | None -> @@ -524,16 +502,15 @@ class virtual operator ?pos ?(name = "src") sources = The current implementation makes it dangerous to call #get_ready from another thread than the Root one, as interleaving with #get is forbidden. *) - method get_ready ?(dynamic = false) (activation : operator list) = + method get_ready (activation : operator list) = self#content_type_computation_allowed; if log == source_log then self#create_log; - if static_activations = [] && dynamic_activations = [] then ( + if activations = [] then ( source_log#info "Source %s gets up with content type: %s." id (Frame.string_of_content_type self#content_type); self#wake_up activation; List.iter (fun fn -> fn ()) on_wake_up); - if dynamic then dynamic_activations <- activation :: dynamic_activations - else static_activations <- activation :: static_activations; + activations <- activation :: activations; self#update_caching_mode val mutable on_sleep = [] @@ -546,7 +523,7 @@ class virtual operator ?pos ?(name = "src") sources = The current implementation makes it dangerous to call #leave from another thread than the Root one, as interleaving with #get is forbidden. *) - method leave ?(failed_to_start = true) ?(dynamic = false) src = + method leave ?(failed_to_start = true) src = let rec remove acc = function | [] when failed_to_start -> [] | [] -> @@ -555,15 +532,14 @@ class virtual operator ?pos ?(name = "src") sources = | (s :: _) :: tl when s = src -> List.rev_append acc tl | h :: tl -> remove (h :: acc) tl in - if dynamic then dynamic_activations <- remove [] dynamic_activations - else static_activations <- remove [] static_activations; + activations <- remove [] activations; self#update_caching_mode; - if static_activations = [] && dynamic_activations = [] then ( + if activations = [] then ( source_log#info "Source %s gets down." id; self#sleep; List.iter (fun fn -> try fn () with _ -> ()) on_sleep) - method is_up = static_activations <> [] || dynamic_activations <> [] + method is_up = activations <> [] (** Two methods called for initialization and shutdown of the source *) method private wake_up activation = @@ -571,12 +547,11 @@ class virtual operator ?pos ?(name = "src") sources = self#log#important "Content type is %s." (Frame.string_of_content_type self#content_type); let activation = (self :> operator) :: activation in - List.iter (fun s -> s#get_ready ?dynamic:None activation) sources + List.iter (fun s -> s#get_ready activation) sources method private sleep = List.iter - (fun s -> - s#leave ?failed_to_start:None ?dynamic:None (self :> operator)) + (fun s -> s#leave ?failed_to_start:None (self :> operator)) sources (** Streaming *) diff --git a/src/core/source.mli b/src/core/source.mli index a2b8ba95b2..76ec44b0e8 100644 --- a/src/core/source.mli +++ b/src/core/source.mli @@ -125,7 +125,7 @@ class virtual source : method private set_clock : unit (** The operator says to the source that he will ask it frames. It may be called multiple times. *) - method get_ready : ?dynamic:bool -> source list -> unit + method get_ready : source list -> unit (** Register a callback when wake_up is called. *) method on_wake_up : (unit -> unit) -> unit @@ -137,7 +137,7 @@ class virtual source : method private wake_up : source list -> unit (** Opposite of [get_ready] : the operator no longer needs the source. it may be called multiple times. *) - method leave : ?failed_to_start:bool -> ?dynamic:bool -> source -> unit + method leave : ?failed_to_start:bool -> source -> unit (** Register a callback when sleep is called. *) method on_sleep : (unit -> unit) -> unit diff --git a/src/core/tools/producer_consumer.ml b/src/core/tools/producer_consumer.ml index b7958750ab..876d11cdcc 100644 --- a/src/core/tools/producer_consumer.ml +++ b/src/core/tools/producer_consumer.ml @@ -107,14 +107,13 @@ class producer ?pos ?create_known_clock ~check_self_sync ~consumers ~name () = List.iter (fun c -> c#set_producer_buffer self#buffer; - c#get_ready ?dynamic:None [(self :> Source.source)]) + c#get_ready [(self :> Source.source)]) consumers method! sleep = super#sleep; List.iter - (fun c -> - c#leave ?failed_to_start:None ?dynamic:None (self :> Source.source)) + (fun c -> c#leave ?failed_to_start:None (self :> Source.source)) consumers method private get_frame buf =