@@ -111,6 +111,7 @@ def __init__(self) -> None:
111
111
self ._lock = threading .Lock ()
112
112
self ._interrupted = False
113
113
self ._setup = False
114
+ self .triggered_by = None
114
115
115
116
@property
116
117
def interrupted (self ) -> bool :
@@ -145,10 +146,12 @@ def prepare(self, probes: List[Probe]) -> None:
145
146
now_count += 1
146
147
147
148
self .repeating_until = threading .Event ()
149
+ self .wait_for_interruption = threading .Event ()
148
150
self .now_all_done = threading .Barrier (parties = now_count + 1 )
149
151
self .now = ThreadPoolExecutor (max_workers = now_count or 1 )
150
152
self .once = ThreadPoolExecutor (max_workers = once_count or 1 )
151
153
self .repeating = ThreadPoolExecutor (max_workers = repeating_count or 1 )
154
+ self .interrupter = ThreadPoolExecutor (max_workers = 1 )
152
155
self ._setup = True
153
156
154
157
def run (self , experiment : Experiment , probes : List [Probe ],
@@ -161,6 +164,7 @@ def run(self, experiment: Experiment, probes: List[Probe],
161
164
or not), then this call blocks until all these pre-check safeguards
162
165
are completed.
163
166
"""
167
+ self .interrupter .submit (self ._wait_interruption )
164
168
for p in probes :
165
169
f = None
166
170
if p .get ("frequency" ):
@@ -186,6 +190,26 @@ def run(self, experiment: Experiment, probes: List[Probe],
186
190
# this allows the experiment to block until these are passed
187
191
self .now_all_done .wait ()
188
192
193
def interrupt_now(self, triggered_by: str) -> None:
    """
    Request the interruption of the experiment on behalf of a safeguard.

    Records which safeguard asked for the interruption, then signals the
    `wait_for_interruption` event so whoever is blocked on it can proceed.

    :param triggered_by: name of the safeguard requesting the interruption
    """
    # the name is written under the lock; the event is only set once the
    # lock has been released
    with self._lock:
        self.triggered_by = triggered_by

    self.wait_for_interruption.set()
198
+
199
def _wait_interruption(self) -> None:
    """
    Block until an interruption is signalled, then end the experiment if a
    safeguard requested it.

    The method sleeps on the `wait_for_interruption` event. Once released:

    * if no safeguard name was recorded in `triggered_by`, it simply
      returns (the event was set without any safeguard asking for an
      interruption);
    * otherwise, unless the guardian is already flagged as interrupted, it
      flags it, and if the experiment has not finished yet, logs which
      safeguard triggered the end and calls `exit_gracefully()`.
    """
    self.wait_for_interruption.wait()

    # `interrupt_now` writes `triggered_by` while holding `_lock`, so read
    # it under the same lock, once, so that the check below and the log
    # message always refer to the same safeguard.
    # The lock is released before touching `self.interrupted`: `_lock` is a
    # plain, non-reentrant `threading.Lock`, so holding it here would
    # deadlock if that property acquires it as well.
    with self._lock:
        trigger = self.triggered_by

    if not trigger:
        return None

    if not self.interrupted:
        self.interrupted = True
        if not experiment_finished.is_set():
            logger.critical(
                "Safeguard '{}' triggered the end of the experiment".format(
                    trigger))
            exit_gracefully()
212
+
189
213
def _log_finished (self , f : Future , probe : Probe ) -> None :
190
214
"""
191
215
Logs each safeguard when they terminated.
@@ -206,10 +230,12 @@ def terminate(self) -> None:
206
230
if not self ._setup :
207
231
return None
208
232
233
+ self .wait_for_interruption .set ()
209
234
self .repeating_until .set ()
210
- self .now .shutdown (wait = True )
211
- self .repeating .shutdown (wait = True )
212
- self .once .shutdown (wait = True )
235
+ self .now .shutdown (wait = False , cancel_futures = False )
236
+ self .repeating .shutdown (wait = False , cancel_futures = False )
237
+ self .once .shutdown (wait = False , cancel_futures = False )
238
+ logger .debug ("Guardian is now terminated" )
213
239
214
240
215
241
guardian = Guardian ()
@@ -292,13 +318,8 @@ def interrupt_experiment_on_unhealthy_probe(guard: Guardian, probe: Probe,
292
318
checked = within_tolerance (
293
319
tolerance , run ["output" ], configuration = configuration ,
294
320
secrets = secrets )
295
- if not checked and not guard .interrupted :
296
- guard .interrupted = True
297
- if not experiment_finished .is_set ():
298
- logger .critical (
299
- "Safeguard '{}' triggered the end of the experiment" .format (
300
- probe ["name" ]))
301
- exit_gracefully ()
321
+ if not checked :
322
+ guard .interrupt_now (probe ["name" ])
302
323
303
324
304
325
def execute_activity (experiment : Experiment , probe : Probe ,
0 commit comments