diff --git a/src/lava/magma/runtime/message_infrastructure/multiprocessing.py b/src/lava/magma/runtime/message_infrastructure/multiprocessing.py index 7c9e5aed9..5e3fd6b00 100644 --- a/src/lava/magma/runtime/message_infrastructure/multiprocessing.py +++ b/src/lava/magma/runtime/message_infrastructure/multiprocessing.py @@ -113,11 +113,13 @@ def start(self): def build_actor(self, target_fn: ty.Callable, builder: ty.Union[ ty.Dict['AbstractProcess', 'PyProcessBuilder'], ty.Dict[ - SyncDomain, 'RuntimeServiceBuilder']]) -> ty.Any: + SyncDomain, 'RuntimeServiceBuilder']], + exception_q: mp.Queue = None) -> ty.Any: """Given a target_fn starts a system (os) process""" system_process = SystemProcess(target=target_fn, args=(), - kwargs={"builder": builder}) + kwargs={"builder": builder, + "exception_q": exception_q}) system_process.start() self._actors.append(system_process) return system_process diff --git a/src/lava/magma/runtime/runtime.py b/src/lava/magma/runtime/runtime.py index d5dde5a92..f1922f3a3 100644 --- a/src/lava/magma/runtime/runtime.py +++ b/src/lava/magma/runtime/runtime.py @@ -42,6 +42,7 @@ from lava.magma.core.run_conditions import (AbstractRunCondition, RunContinuous, RunSteps) from lava.magma.compiler.channels.watchdog import WatchdogManagerInterface +from multiprocessing import Queue """Defines a Runtime which takes a lava executable and a pluggable message passing infrastructure (for instance multiprocessing+shared memory or ray in @@ -94,13 +95,15 @@ def target_fn(*args, **kwargs): """ try: builder = kwargs.pop("builder") + exception_q = kwargs.pop('exception_q') actor = builder.build() + # No exception occured + exception_q.put(None) actor.start(*args, **kwargs) except Exception as e: - print("Encountered Fatal Exception: " + str(e)) - print("Traceback: ") - print(traceback.format_exc()) - raise e + e.trace = traceback.format_exc() + exception_q.put(e) + raise(e) class Runtime: @@ -134,6 +137,7 @@ def __init__(self, self.num_steps: int = 0 self._watchdog_manager = None + self.exception_q = [] def __del__(self): """On destruction, terminate Runtime automatically to @@ -160,6 +164,15 @@ def initialize(self, node_cfg_idx: int = 0): self._build_processes() self._build_runtime_services() self._start_ports() + + # Check if any exception was thrown + for q in self.exception_q: + e = q.get() + if e: + print(str(e), e.trace) + raise(e) + del self.exception_q + self.log.debug("Runtime Initialization Complete") self._is_initialized = True @@ -293,17 +306,22 @@ def _build_processes(self): if isinstance(proc_builder, PyProcessBuilder): # Assign current Runtime to process proc._runtime = self + exception_q = Queue() + self.exception_q.append(exception_q) self._messaging_infrastructure.build_actor(target_fn, - proc_builder) + proc_builder, + exception_q) def _build_runtime_services(self): """Builds the runtime services""" runtime_service_builders = self._executable.runtime_service_builders if self._executable.runtime_service_builders: for _, rs_builder in runtime_service_builders.items(): + self.exception_q.append(Queue()) self._messaging_infrastructure. \ build_actor(target_fn, - rs_builder) + rs_builder, + self.exception_q[-1]) def _get_resp_for_run(self): """