42
42
from lava .magma .core .run_conditions import (AbstractRunCondition ,
43
43
RunContinuous , RunSteps )
44
44
from lava .magma .compiler .channels .watchdog import WatchdogManagerInterface
45
+ from multiprocessing import Queue
45
46
46
47
"""Defines a Runtime which takes a lava executable and a pluggable message
47
48
passing infrastructure (for instance multiprocessing+shared memory or ray in
@@ -94,13 +95,15 @@ def target_fn(*args, **kwargs):
94
95
"""
95
96
try :
96
97
builder = kwargs .pop ("builder" )
98
+ exception_q = kwargs .pop ('exception_q' )
97
99
actor = builder .build ()
100
+ # No exception occured
101
+ exception_q .put (None )
98
102
actor .start (* args , ** kwargs )
99
103
except Exception as e :
100
- print ("Encountered Fatal Exception: " + str (e ))
101
- print ("Traceback: " )
102
- print (traceback .format_exc ())
103
- raise e
104
+ e .trace = traceback .format_exc ()
105
+ exception_q .put (e )
106
+ raise (e )
104
107
105
108
106
109
class Runtime :
@@ -134,6 +137,7 @@ def __init__(self,
134
137
self .num_steps : int = 0
135
138
136
139
self ._watchdog_manager = None
140
+ self .exception_q = []
137
141
138
142
def __del__ (self ):
139
143
"""On destruction, terminate Runtime automatically to
@@ -160,6 +164,15 @@ def initialize(self, node_cfg_idx: int = 0):
160
164
self ._build_processes ()
161
165
self ._build_runtime_services ()
162
166
self ._start_ports ()
167
+
168
+ # Check if any exception was thrown
169
+ for q in self .exception_q :
170
+ e = q .get ()
171
+ if e :
172
+ print (str (e ), e .trace )
173
+ raise (e )
174
+ del self .exception_q
175
+
163
176
self .log .debug ("Runtime Initialization Complete" )
164
177
self ._is_initialized = True
165
178
@@ -293,17 +306,22 @@ def _build_processes(self):
293
306
if isinstance (proc_builder , PyProcessBuilder ):
294
307
# Assign current Runtime to process
295
308
proc ._runtime = self
309
+ exception_q = Queue ()
310
+ self .exception_q .append (exception_q )
296
311
self ._messaging_infrastructure .build_actor (target_fn ,
297
- proc_builder )
312
+ proc_builder ,
313
+ exception_q )
298
314
299
315
def _build_runtime_services (self ):
300
316
"""Builds the runtime services"""
301
317
runtime_service_builders = self ._executable .runtime_service_builders
302
318
if self ._executable .runtime_service_builders :
303
319
for _ , rs_builder in runtime_service_builders .items ():
320
+ self .exception_q .append (Queue ())
304
321
self ._messaging_infrastructure . \
305
322
build_actor (target_fn ,
306
- rs_builder )
323
+ rs_builder ,
324
+ self .exception_q [- 1 ])
307
325
308
326
def _get_resp_for_run (self ):
309
327
"""
0 commit comments