diff --git a/src/lava/magma/runtime/runtime.py b/src/lava/magma/runtime/runtime.py index 7a182a82c..1829ba7ed 100644 --- a/src/lava/magma/runtime/runtime.py +++ b/src/lava/magma/runtime/runtime.py @@ -10,7 +10,6 @@ import typing as ty import numpy as np -from lava.magma.core.model.py.ports import PyInPort, PyOutPort from scipy.sparse import csr_matrix from lava.magma.compiler.var_model import AbstractVarModel, LoihiSynapseVarModel from lava.magma.core.process.message_interface_enum import ActorType @@ -310,38 +309,9 @@ def _build_processes(self): exception_q = Queue() self.exception_q.append(exception_q) - for name, py_port in proc_builder.py_ports.items(): - port = getattr(proc, name) - - if port.external_pipe_flag: - if isinstance(port, InPort): - pypychannel = PyPyChannel( - message_infrastructure=self._messaging_infrastructure, - src_name="src", - dst_name=name, - shape=py_port.shape, - dtype=py_port.d_type, - size=64) - - proc_builder.set_csp_ports([pypychannel.dst_port]) - - port.external_pipe_csp_send_port = pypychannel.src_port - port.external_pipe_csp_send_port.start() - - if isinstance(port, OutPort): - pypychannel = PyPyChannel( - message_infrastructure=self._messaging_infrastructure, - src_name=name, - dst_name="dst", - shape=py_port.shape, - dtype=py_port.d_type, - size=64) - - proc_builder.set_csp_ports([pypychannel.src_port]) - - port.external_pipe_csp_recv_port = pypychannel.dst_port - port.external_pipe_csp_recv_port.start() - + # Create any external pypychannels + self._create_external_channels(proc, proc_builder) + self._messaging_infrastructure.build_actor(target_fn, proc_builder, exception_q) @@ -357,6 +327,44 @@ def _build_runtime_services(self): rs_builder, self.exception_q[-1]) + def _create_external_channels(self, + proc: AbstractProcess, + proc_builder: AbstractProcessBuilder): + """Creates a csp channel which can be connected to/from a + non-procss/Lava python environment. This enables I/O to Lava from + external sources.""" + for name, py_port in proc_builder.py_ports.items(): + port = getattr(proc, name) + + if port.external_pipe_flag: + if isinstance(port, InPort): + pypychannel = PyPyChannel( + message_infrastructure=self._messaging_infrastructure, + src_name="src", + dst_name=name, + shape=py_port.shape, + dtype=py_port.d_type, + size=port.external_pipe_buffer_size) + + proc_builder.set_csp_ports([pypychannel.dst_port]) + + port.external_pipe_csp_send_port = pypychannel.src_port + port.external_pipe_csp_send_port.start() + + if isinstance(port, OutPort): + pypychannel = PyPyChannel( + message_infrastructure=self._messaging_infrastructure, + src_name=name, + dst_name="dst", + shape=py_port.shape, + dtype=py_port.d_type, + size=port.external_pipe_buffer_size) + + proc_builder.set_csp_ports([pypychannel.src_port]) + + port.external_pipe_csp_recv_port = pypychannel.dst_port + port.external_pipe_csp_recv_port.start() + def _get_resp_for_run(self): """ Gets response from RuntimeServices diff --git a/src/lava/proc/io/extractor.py b/src/lava/proc/io/extractor.py index 1f0ca1cf8..95468f3c1 100644 --- a/src/lava/proc/io/extractor.py +++ b/src/lava/proc/io/extractor.py @@ -6,13 +6,13 @@ import typing as ty from lava.magma.core.process.process import AbstractProcess -from lava.magma.core.process.ports.ports import InPort, RefPort, Var +from lava.magma.core.process.ports.ports import InPort, OutPort, RefPort, Var from lava.magma.core.resources import CPU from lava.magma.core.decorator import implements, requires from lava.magma.core.model.py.model import PyLoihiProcessModel from lava.magma.core.sync.protocols.loihi_protocol import LoihiProtocol from lava.magma.core.model.py.type import LavaPyType -from lava.magma.core.model.py.ports import PyInPort, PyRefPort +from lava.magma.core.model.py.ports import PyInPort, PyOutPort, PyRefPort from lava.magma.compiler.channels.pypychannel import PyPyChannel from lava.magma.runtime.message_infrastructure.multiprocessing import \ MultiProcessing @@ -51,9 +51,9 @@ class Extractor(AbstractProcess): def __init__(self, shape: ty.Tuple[int, ...], buffer_size: ty.Optional[int] = 50, - channel_config: ty.Optional[utils.ChannelConfig] = None) -> \ - None: - super().__init__() + channel_config: ty.Optional[utils.ChannelConfig] = None, + **kwargs) -> None: + super().__init__(shape_1=shape, **kwargs) channel_config = channel_config or utils.ChannelConfig() @@ -63,78 +63,64 @@ def __init__(self, self._shape = shape - self._multi_processing = MultiProcessing() - self._multi_processing.start() - - # Stands for ProcessModel to Process - pm_to_p = PyPyChannel(message_infrastructure=self._multi_processing, - src_name="src", - dst_name="dst", - shape=self._shape, - dtype=float, - size=buffer_size) - self._pm_to_p_dst_port = pm_to_p.dst_port - self._pm_to_p_dst_port.start() - self.proc_params["channel_config"] = channel_config - self.proc_params["pm_to_p_src_port"] = pm_to_p.src_port self._receive_when_empty = channel_config.get_receive_empty_function() self._receive_when_not_empty = \ channel_config.get_receive_not_empty_function() - self.in_port = InPort(shape=self._shape) + self.in_port = InPort(shape=shape) + self.out_port = OutPort(shape=shape) + self.out_port.flag_external_pipe(buffer_size=buffer_size) def receive(self) -> np.ndarray: """Receive data from the ProcessModel. - The data is received from pm_to_p.dst_port. + The data is received from out_port. Returns ---------- data : np.ndarray Data received. """ - elements_in_buffer = self._pm_to_p_dst_port._queue.qsize() + if not hasattr(self.out_port, 'external_pipe_csp_recv_port'): + raise AssertionError("The Runtime needs to be created before" + "calling . Please use the method " + " or on your Lava" + " network before using .") + + elements_in_buffer = self.out_port.external_pipe_csp_recv_port._queue.qsize() if elements_in_buffer == 0: data = self._receive_when_empty( - self._pm_to_p_dst_port, + self.out_port.external_pipe_csp_recv_port, np.zeros(self._shape)) else: data = self._receive_when_not_empty( - self._pm_to_p_dst_port, + self.out_port.external_pipe_csp_recv_port, np.zeros(self._shape), elements_in_buffer) return data - def __del__(self) -> None: - super().__del__() - - self._multi_processing.stop() - self._pm_to_p_dst_port.join() - @implements(proc=Extractor, protocol=LoihiProtocol) @requires(CPU) class PyLoihiExtractorModel(PyLoihiProcessModel): + """PyLoihiProcessModel for the Extractor Process.""" in_port: PyInPort = LavaPyType(PyInPort.VEC_DENSE, float) + out_port: PyOutPort = LavaPyType(PyOutPort.VEC_DENSE, float) def __init__(self, proc_params: dict) -> None: super().__init__(proc_params=proc_params) channel_config = self.proc_params["channel_config"] - self._pm_to_p_src_port = self.proc_params["pm_to_p_src_port"] - self._pm_to_p_src_port.start() self._send = channel_config.get_send_full_function() def run_spk(self) -> None: - self._send(self._pm_to_p_src_port, self.in_port.recv()) - - def __del__(self) -> None: - self._pm_to_p_src_port.join() + self._send(self.out_port.csp_ports[-1], + self.in_port.recv()) class VarWire(AbstractProcess): diff --git a/src/lava/proc/io/injector.py b/src/lava/proc/io/injector.py index e0c4207e5..a40662a06 100644 --- a/src/lava/proc/io/injector.py +++ b/src/lava/proc/io/injector.py @@ -6,16 +6,13 @@ import typing as ty from lava.magma.core.process.process import AbstractProcess -from lava.magma.core.process.ports.ports import OutPort +from lava.magma.core.process.ports.ports import InPort, OutPort from lava.magma.core.resources import CPU from lava.magma.core.decorator import implements, requires from lava.magma.core.model.py.model import PyLoihiProcessModel from lava.magma.core.sync.protocols.loihi_protocol import LoihiProtocol from lava.magma.core.model.py.type import LavaPyType -from lava.magma.core.model.py.ports import PyOutPort -from lava.magma.runtime.message_infrastructure.multiprocessing import \ - MultiProcessing -from lava.magma.compiler.channels.pypychannel import PyPyChannel +from lava.magma.core.model.py.ports import PyInPort, PyOutPort from lava.proc.io import utils @@ -47,13 +44,12 @@ class Injector(AbstractProcess): buffer is full and how the dst_port behaves when the buffer is empty and not empty. """ - def __init__(self, shape: ty.Tuple[int, ...], buffer_size: ty.Optional[int] = 50, - channel_config: ty.Optional[utils.ChannelConfig] = None) -> \ - None: - super().__init__() + channel_config: ty.Optional[utils.ChannelConfig] = None, + **kwargs) -> None: + super().__init__(shape_1=shape, **kwargs) channel_config = channel_config or utils.ChannelConfig() @@ -61,50 +57,43 @@ def __init__(self, utils.validate_buffer_size(buffer_size) utils.validate_channel_config(channel_config) - self._multi_processing = MultiProcessing() - self._multi_processing.start() - - # Stands for Process to ProcessModel - p_to_pm = PyPyChannel(message_infrastructure=self._multi_processing, - src_name="src", - dst_name="dst", - shape=shape, - dtype=float, - size=buffer_size) - self._p_to_pm_src_port = p_to_pm.src_port - self._p_to_pm_src_port.start() + self.in_port = InPort(shape=shape) + self.in_port.flag_external_pipe(buffer_size=buffer_size) + self.out_port = OutPort(shape=shape) self.proc_params["shape"] = shape self.proc_params["channel_config"] = channel_config - self.proc_params["p_to_pm_dst_port"] = p_to_pm.dst_port self._send = channel_config.get_send_full_function() - - self.out_port = OutPort(shape=shape) - + def send(self, data: np.ndarray) -> None: - """Send data to the ProcessModel. - - The data is sent through p_to_pm.src_port. + """Send data to connected process. Parameters ---------- data : np.ndarray Data to be sent. + + Raises + ------ + AssertionError + If the runtime of the Lava network was not created. """ - self._send(self._p_to_pm_src_port, data) - - def __del__(self) -> None: - super().__del__() - - self._multi_processing.stop() - self._p_to_pm_src_port.join() + # The csp channel is created by the runtime + if hasattr(self.in_port, 'external_pipe_csp_send_port'): + self._send(self.in_port.external_pipe_csp_send_port, data) + else: + raise AssertionError("The Runtime needs to be created before" + "calling . Please use the method " + " or on your Lava" + " network before using .") @implements(proc=Injector, protocol=LoihiProtocol) @requires(CPU) class PyLoihiInjectorModel(PyLoihiProcessModel): """PyLoihiProcessModel for the Injector Process.""" + in_port: PyInPort = LavaPyType(PyInPort.VEC_DENSE, float) out_port: PyOutPort = LavaPyType(PyOutPort.VEC_DENSE, float) def __init__(self, proc_params: dict) -> None: @@ -112,9 +101,6 @@ def __init__(self, proc_params: dict) -> None: shape = self.proc_params["shape"] channel_config = self.proc_params["channel_config"] - self._p_to_pm_dst_port = self.proc_params["p_to_pm_dst_port"] - self._p_to_pm_dst_port.start() - self._zeros = np.zeros(shape) self._receive_when_empty = channel_config.get_receive_empty_function() @@ -123,19 +109,16 @@ def __init__(self, proc_params: dict) -> None: def run_spk(self) -> None: self._zeros.fill(0) - elements_in_buffer = self._p_to_pm_dst_port._queue.qsize() + elements_in_buffer = self.in_port.csp_ports[-1]._queue.qsize() if elements_in_buffer == 0: data = self._receive_when_empty( - self._p_to_pm_dst_port, + self.in_port, self._zeros) else: data = self._receive_when_not_empty( - self._p_to_pm_dst_port, + self.in_port, self._zeros, elements_in_buffer) self.out_port.send(data) - - def __del__(self) -> None: - self._p_to_pm_dst_port.join() diff --git a/tests/lava/proc/io/test_extractor.py b/tests/lava/proc/io/test_extractor.py index d918c1e10..ed73ed902 100644 --- a/tests/lava/proc/io/test_extractor.py +++ b/tests/lava/proc/io/test_extractor.py @@ -61,9 +61,9 @@ def run_spk(self) -> None: class TestExtractor(unittest.TestCase): def test_init(self): """Test that the Extractor Process is instantiated correctly.""" - in_shape = (1,) + shape = (1,) - extractor = Extractor(shape=in_shape) + extractor = Extractor(shape=shape) self.assertIsInstance(extractor, Extractor) @@ -73,11 +73,10 @@ def test_init(self): self.assertEqual(config.receive_empty, utils.ReceiveEmpty.BLOCKING) self.assertEqual(config.receive_not_empty, utils.ReceiveNotEmpty.FIFO) - self.assertIsInstance(extractor.proc_params["pm_to_p_src_port"], - CspSendPort) - self.assertIsInstance(extractor.in_port, InPort) - self.assertEqual(extractor.in_port.shape, in_shape) + self.assertEqual(extractor.in_port.shape, shape) + self.assertIsInstance(extractor.out_port, OutPort) + self.assertEqual(extractor.out_port.shape, shape) def test_invalid_shape(self): """Test that instantiating the Extractor Process with an invalid @@ -142,20 +141,8 @@ class TestPyLoihiExtractorModel(unittest.TestCase): def test_init(self): """Test that the PyLoihiExtractorModel ProcessModel is instantiated correctly.""" - shape = (1, ) - buffer_size = 10 - multi_processing = MultiProcessing() - multi_processing.start() - channel = PyPyChannel(message_infrastructure=multi_processing, - src_name="src", - dst_name="dst", - shape=shape, - dtype=float, - size=buffer_size) - - proc_params = {"channel_config": utils.ChannelConfig(), - "pm_to_p_src_port": channel.src_port} + proc_params = {"channel_config": utils.ChannelConfig()} pm = PyLoihiExtractorModel(proc_params) @@ -295,6 +282,8 @@ def test_receive_data_receive_empty_blocking(self): run_condition = RunSteps(num_steps=num_steps) run_cfg = Loihi2SimCfg() + ex = extractor.compile(run_cfg=run_cfg) + extractor.create_runtime(run_cfg=run_cfg, executable=ex) shared_queue = Queue(2) @@ -345,7 +334,14 @@ def test_receive_data_receive_empty_non_blocking_zeros(self): extractor = Extractor(shape=data_shape, buffer_size=buffer_size, channel_config=channel_config) + run_cfg = Loihi2SimCfg() + run_condition = RunSteps(num_steps=1) + ex = extractor.compile(run_cfg=run_cfg) + extractor.create_runtime(run_cfg=run_cfg, executable=ex) + recv_data = extractor.receive() + extractor.run(condition=run_condition) + extractor.stop() np.testing.assert_equal(recv_data, np.zeros(data_shape)) @@ -440,6 +436,8 @@ def test_run_steps_blocking(self): run_condition = RunSteps(num_steps=num_steps) run_cfg = Loihi2SimCfg() + ex = extractor.compile(run_cfg=run_cfg) + extractor.create_runtime(run_cfg=run_cfg, executable=ex) shared_queue = Queue(num_steps) @@ -532,9 +530,6 @@ def test_init(self): self.assertEqual(config.receive_empty, utils.ReceiveEmpty.BLOCKING) self.assertEqual(config.receive_not_empty, utils.ReceiveNotEmpty.FIFO) - self.assertIsInstance(listener.proc_params["pm_to_p_src_port"], - CspSendPort) - self.assertIsInstance(listener.wire_tap, RefPort) self.assertEqual(listener.wire_tap.shape, lif.u.shape) diff --git a/tests/lava/proc/io/test_injector.py b/tests/lava/proc/io/test_injector.py index a200d705d..df999bb59 100644 --- a/tests/lava/proc/io/test_injector.py +++ b/tests/lava/proc/io/test_injector.py @@ -79,9 +79,6 @@ def test_init(self): self.assertEqual(config.receive_empty, utils.ReceiveEmpty.BLOCKING) self.assertEqual(config.receive_not_empty, utils.ReceiveNotEmpty.FIFO) - self.assertIsInstance(injector.proc_params["p_to_pm_dst_port"], - CspRecvPort) - self.assertIsInstance(injector.out_port, OutPort) self.assertEqual(injector.out_port.shape, out_shape) @@ -212,6 +209,8 @@ def _test_send_full_policy(send_full: utils.SendFull) \ run_condition = RunSteps(num_steps=num_steps) run_cfg = Loihi2SimCfg() + ex = injector.compile(run_cfg=run_cfg) + injector.create_runtime(run_cfg=run_cfg, executable=ex) shared_queue = Queue(2) @@ -299,6 +298,8 @@ def test_send_data_receive_empty_blocking(self): run_condition = RunSteps(num_steps=num_steps) run_cfg = Loihi2SimCfg() + ex = injector.compile(run_cfg=run_cfg) + injector.create_runtime(run_cfg=run_cfg, executable=ex) injector.send(np.ones(data_shape)) injector.run(condition=run_condition, run_cfg=run_cfg) @@ -405,6 +406,8 @@ def _test_receive_not_empty_policy(receive_not_empty: utils.ReceiveNotEmpty, run_condition = RunSteps(num_steps=num_steps) run_cfg = Loihi2SimCfg() + ex = injector.compile(run_cfg=run_cfg) + injector.create_runtime(run_cfg=run_cfg, executable=ex) injector.send(send_data[0]) injector.send(send_data[1]) @@ -457,6 +460,8 @@ def test_run_steps_blocking(self): run_condition = RunSteps(num_steps=num_steps) run_cfg = Loihi2SimCfg() + ex = injector.compile(run_cfg=run_cfg) + injector.create_runtime(run_cfg=run_cfg, executable=ex) def thread_2_fn() -> None: for send_data_single_item in data: @@ -468,6 +473,7 @@ def thread_2_fn() -> None: injector.run(condition=run_condition, run_cfg=run_cfg) recv_var_data = recv.var.get() injector.stop() + thread_2.join() np.testing.assert_equal(recv_var_data, data)