Skip to content

Commit

Permalink
Handle exceptions in runtime service so unittest can catch them (#813)
Browse files Browse the repository at this point in the history
* handle exceptions in runtime service so unittest can catch them

* lint

* Update multiprocessing.py

Added typing

* Update multiprocessing.py

* Update multiprocessing.py

* add queue per actor

---------

Co-authored-by: PhilippPlank <32519998+PhilippPlank@users.noreply.github.com>
  • Loading branch information
weidel-p and PhilippPlank authored Nov 28, 2023
1 parent 0611064 commit b2c8134
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -113,11 +113,13 @@ def start(self):

def build_actor(self, target_fn: ty.Callable, builder: ty.Union[
ty.Dict['AbstractProcess', 'PyProcessBuilder'], ty.Dict[
SyncDomain, 'RuntimeServiceBuilder']]) -> ty.Any:
SyncDomain, 'RuntimeServiceBuilder']],
exception_q: mp.Queue = None) -> ty.Any:
"""Given a target_fn starts a system (os) process"""
system_process = SystemProcess(target=target_fn,
args=(),
kwargs={"builder": builder})
kwargs={"builder": builder,
"exception_q": exception_q})
system_process.start()
self._actors.append(system_process)
return system_process
Expand Down
30 changes: 24 additions & 6 deletions src/lava/magma/runtime/runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
from lava.magma.core.run_conditions import (AbstractRunCondition,
RunContinuous, RunSteps)
from lava.magma.compiler.channels.watchdog import WatchdogManagerInterface
from multiprocessing import Queue

"""Defines a Runtime which takes a lava executable and a pluggable message
passing infrastructure (for instance multiprocessing+shared memory or ray in
Expand Down Expand Up @@ -94,13 +95,15 @@ def target_fn(*args, **kwargs):
"""
try:
builder = kwargs.pop("builder")
exception_q = kwargs.pop('exception_q')
actor = builder.build()
# No exception occured
exception_q.put(None)
actor.start(*args, **kwargs)
except Exception as e:
print("Encountered Fatal Exception: " + str(e))
print("Traceback: ")
print(traceback.format_exc())
raise e
e.trace = traceback.format_exc()
exception_q.put(e)
raise(e)


class Runtime:
Expand Down Expand Up @@ -134,6 +137,7 @@ def __init__(self,
self.num_steps: int = 0

self._watchdog_manager = None
self.exception_q = []

def __del__(self):
"""On destruction, terminate Runtime automatically to
Expand All @@ -160,6 +164,15 @@ def initialize(self, node_cfg_idx: int = 0):
self._build_processes()
self._build_runtime_services()
self._start_ports()

# Check if any exception was thrown
for q in self.exception_q:
e = q.get()
if e:
print(str(e), e.trace)
raise(e)
del self.exception_q

self.log.debug("Runtime Initialization Complete")
self._is_initialized = True

Expand Down Expand Up @@ -293,17 +306,22 @@ def _build_processes(self):
if isinstance(proc_builder, PyProcessBuilder):
# Assign current Runtime to process
proc._runtime = self
exception_q = Queue()
self.exception_q.append(exception_q)
self._messaging_infrastructure.build_actor(target_fn,
proc_builder)
proc_builder,
exception_q)

def _build_runtime_services(self):
"""Builds the runtime services"""
runtime_service_builders = self._executable.runtime_service_builders
if self._executable.runtime_service_builders:
for _, rs_builder in runtime_service_builders.items():
self.exception_q.append(Queue())
self._messaging_infrastructure. \
build_actor(target_fn,
rs_builder)
rs_builder,
self.exception_q[-1])

def _get_resp_for_run(self):
"""
Expand Down

0 comments on commit b2c8134

Please sign in to comment.