diff --git a/src/lava/magma/core/process/ports/ports.py b/src/lava/magma/core/process/ports/ports.py index be16cf63a..986442f48 100644 --- a/src/lava/magma/core/process/ports/ports.py +++ b/src/lava/magma/core/process/ports/ports.py @@ -432,6 +432,17 @@ class OutPort(AbstractIOPort, AbstractSrcPort): sub processes. """ + def __init__(self, shape: ty.Tuple[int, ...]): + super().__init__(shape) + self.external_pipe_flag = False + self.external_pipe_buffer_size = 64 + + def flag_external_pipe(self, buffer_size=None): + self.external_pipe_flag = True + + if buffer_size is not None: + self.external_pipe_buffer_size = buffer_size + def connect( self, ports: ty.Union["AbstractIOPort", ty.List["AbstractIOPort"]], @@ -493,6 +504,15 @@ def __init__( super().__init__(shape) self._reduce_op = reduce_op + self.external_pipe_flag = False + self.external_pipe_buffer_size = 64 + + def flag_external_pipe(self, buffer_size=None): + self.external_pipe_flag = True + + if buffer_size is not None: + self.external_pipe_buffer_size = buffer_size + def connect(self, ports: ty.Union["InPort", ty.List["InPort"]], connection_configs: ty.Optional[ConnectionConfigs] = None): diff --git a/src/lava/magma/runtime/runtime.py b/src/lava/magma/runtime/runtime.py index f1922f3a3..507def566 100644 --- a/src/lava/magma/runtime/runtime.py +++ b/src/lava/magma/runtime/runtime.py @@ -26,7 +26,7 @@ if ty.TYPE_CHECKING: from lava.magma.core.process.process import AbstractProcess from lava.magma.compiler.channels.pypychannel import CspRecvPort, CspSendPort, \ - CspSelector + CspSelector, PyPyChannel from lava.magma.compiler.builders.channel_builder import ( ChannelBuilderMp, RuntimeChannelBuilderMp, ServiceChannelBuilderMp, ChannelBuilderPyNc) @@ -38,7 +38,7 @@ ChannelType from lava.magma.compiler.executable import Executable from lava.magma.compiler.node import NodeConfig -from lava.magma.core.process.ports.ports import create_port_id +from lava.magma.core.process.ports.ports import create_port_id, InPort, OutPort from lava.magma.core.run_conditions import (AbstractRunCondition, RunContinuous, RunSteps) from lava.magma.compiler.channels.watchdog import WatchdogManagerInterface @@ -308,6 +308,10 @@ def _build_processes(self): proc._runtime = self exception_q = Queue() self.exception_q.append(exception_q) + + # Create any external pypychannels + self._create_external_channels(proc, proc_builder) + self._messaging_infrastructure.build_actor(target_fn, proc_builder, exception_q) @@ -323,6 +327,44 @@ def _build_runtime_services(self): rs_builder, self.exception_q[-1]) + def _create_external_channels(self, + proc: AbstractProcess, + proc_builder: AbstractProcessBuilder): + """Creates a csp channel which can be connected to/from a + non-procss/Lava python environment. This enables I/O to Lava from + external sources.""" + for name, py_port in proc_builder.py_ports.items(): + port = getattr(proc, name) + + if port.external_pipe_flag: + if isinstance(port, InPort): + pypychannel = PyPyChannel( + message_infrastructure=self._messaging_infrastructure, + src_name="src", + dst_name=name, + shape=py_port.shape, + dtype=py_port.d_type, + size=port.external_pipe_buffer_size) + + proc_builder.set_csp_ports([pypychannel.dst_port]) + + port.external_pipe_csp_send_port = pypychannel.src_port + port.external_pipe_csp_send_port.start() + + if isinstance(port, OutPort): + pypychannel = PyPyChannel( + message_infrastructure=self._messaging_infrastructure, + src_name=name, + dst_name="dst", + shape=py_port.shape, + dtype=py_port.d_type, + size=port.external_pipe_buffer_size) + + proc_builder.set_csp_ports([pypychannel.src_port]) + + port.external_pipe_csp_recv_port = pypychannel.dst_port + port.external_pipe_csp_recv_port.start() + def _get_resp_for_run(self): """ Gets response from RuntimeServices diff --git a/src/lava/proc/io/extractor.py b/src/lava/proc/io/extractor.py index 1f0ca1cf8..7316ac17f 100644 --- a/src/lava/proc/io/extractor.py +++ b/src/lava/proc/io/extractor.py @@ -6,13 +6,13 @@ import typing as ty from lava.magma.core.process.process import AbstractProcess -from lava.magma.core.process.ports.ports import InPort, RefPort, Var +from lava.magma.core.process.ports.ports import InPort, OutPort, RefPort, Var from lava.magma.core.resources import CPU from lava.magma.core.decorator import implements, requires from lava.magma.core.model.py.model import PyLoihiProcessModel from lava.magma.core.sync.protocols.loihi_protocol import LoihiProtocol from lava.magma.core.model.py.type import LavaPyType -from lava.magma.core.model.py.ports import PyInPort, PyRefPort +from lava.magma.core.model.py.ports import PyInPort, PyOutPort, PyRefPort from lava.magma.compiler.channels.pypychannel import PyPyChannel from lava.magma.runtime.message_infrastructure.multiprocessing import \ MultiProcessing @@ -51,9 +51,9 @@ class Extractor(AbstractProcess): def __init__(self, shape: ty.Tuple[int, ...], buffer_size: ty.Optional[int] = 50, - channel_config: ty.Optional[utils.ChannelConfig] = None) -> \ - None: - super().__init__() + channel_config: ty.Optional[utils.ChannelConfig] = None, + **kwargs) -> None: + super().__init__(shape_1=shape, **kwargs) channel_config = channel_config or utils.ChannelConfig() @@ -63,78 +63,65 @@ def __init__(self, self._shape = shape - self._multi_processing = MultiProcessing() - self._multi_processing.start() - - # Stands for ProcessModel to Process - pm_to_p = PyPyChannel(message_infrastructure=self._multi_processing, - src_name="src", - dst_name="dst", - shape=self._shape, - dtype=float, - size=buffer_size) - self._pm_to_p_dst_port = pm_to_p.dst_port - self._pm_to_p_dst_port.start() - self.proc_params["channel_config"] = channel_config - self.proc_params["pm_to_p_src_port"] = pm_to_p.src_port self._receive_when_empty = channel_config.get_receive_empty_function() self._receive_when_not_empty = \ channel_config.get_receive_not_empty_function() - self.in_port = InPort(shape=self._shape) + self.in_port = InPort(shape=shape) + self.out_port = OutPort(shape=shape) + self.out_port.flag_external_pipe(buffer_size=buffer_size) def receive(self) -> np.ndarray: """Receive data from the ProcessModel. - The data is received from pm_to_p.dst_port. + The data is received from out_port. Returns ---------- data : np.ndarray Data received. """ - elements_in_buffer = self._pm_to_p_dst_port._queue.qsize() + if not hasattr(self.out_port, 'external_pipe_csp_recv_port'): + raise AssertionError("The Runtime needs to be created before" + "calling . Please use the method " + " or on your Lava" + " network before using .") + + elements_in_buffer = \ + self.out_port.external_pipe_csp_recv_port._queue.qsize() if elements_in_buffer == 0: data = self._receive_when_empty( - self._pm_to_p_dst_port, + self.out_port.external_pipe_csp_recv_port, np.zeros(self._shape)) else: data = self._receive_when_not_empty( - self._pm_to_p_dst_port, + self.out_port.external_pipe_csp_recv_port, np.zeros(self._shape), elements_in_buffer) return data - def __del__(self) -> None: - super().__del__() - - self._multi_processing.stop() - self._pm_to_p_dst_port.join() - @implements(proc=Extractor, protocol=LoihiProtocol) @requires(CPU) class PyLoihiExtractorModel(PyLoihiProcessModel): + """PyLoihiProcessModel for the Extractor Process.""" in_port: PyInPort = LavaPyType(PyInPort.VEC_DENSE, float) + out_port: PyOutPort = LavaPyType(PyOutPort.VEC_DENSE, float) def __init__(self, proc_params: dict) -> None: super().__init__(proc_params=proc_params) channel_config = self.proc_params["channel_config"] - self._pm_to_p_src_port = self.proc_params["pm_to_p_src_port"] - self._pm_to_p_src_port.start() self._send = channel_config.get_send_full_function() def run_spk(self) -> None: - self._send(self._pm_to_p_src_port, self.in_port.recv()) - - def __del__(self) -> None: - self._pm_to_p_src_port.join() + self._send(self.out_port.csp_ports[-1], + self.in_port.recv()) class VarWire(AbstractProcess): diff --git a/src/lava/proc/io/injector.py b/src/lava/proc/io/injector.py index e0c4207e5..78d200fac 100644 --- a/src/lava/proc/io/injector.py +++ b/src/lava/proc/io/injector.py @@ -6,16 +6,13 @@ import typing as ty from lava.magma.core.process.process import AbstractProcess -from lava.magma.core.process.ports.ports import OutPort +from lava.magma.core.process.ports.ports import InPort, OutPort from lava.magma.core.resources import CPU from lava.magma.core.decorator import implements, requires from lava.magma.core.model.py.model import PyLoihiProcessModel from lava.magma.core.sync.protocols.loihi_protocol import LoihiProtocol from lava.magma.core.model.py.type import LavaPyType -from lava.magma.core.model.py.ports import PyOutPort -from lava.magma.runtime.message_infrastructure.multiprocessing import \ - MultiProcessing -from lava.magma.compiler.channels.pypychannel import PyPyChannel +from lava.magma.core.model.py.ports import PyInPort, PyOutPort from lava.proc.io import utils @@ -47,13 +44,12 @@ class Injector(AbstractProcess): buffer is full and how the dst_port behaves when the buffer is empty and not empty. """ - def __init__(self, shape: ty.Tuple[int, ...], buffer_size: ty.Optional[int] = 50, - channel_config: ty.Optional[utils.ChannelConfig] = None) -> \ - None: - super().__init__() + channel_config: ty.Optional[utils.ChannelConfig] = None, + **kwargs) -> None: + super().__init__(shape_1=shape, **kwargs) channel_config = channel_config or utils.ChannelConfig() @@ -61,50 +57,43 @@ def __init__(self, utils.validate_buffer_size(buffer_size) utils.validate_channel_config(channel_config) - self._multi_processing = MultiProcessing() - self._multi_processing.start() - - # Stands for Process to ProcessModel - p_to_pm = PyPyChannel(message_infrastructure=self._multi_processing, - src_name="src", - dst_name="dst", - shape=shape, - dtype=float, - size=buffer_size) - self._p_to_pm_src_port = p_to_pm.src_port - self._p_to_pm_src_port.start() + self.in_port = InPort(shape=shape) + self.in_port.flag_external_pipe(buffer_size=buffer_size) + self.out_port = OutPort(shape=shape) self.proc_params["shape"] = shape self.proc_params["channel_config"] = channel_config - self.proc_params["p_to_pm_dst_port"] = p_to_pm.dst_port self._send = channel_config.get_send_full_function() - self.out_port = OutPort(shape=shape) - def send(self, data: np.ndarray) -> None: - """Send data to the ProcessModel. - - The data is sent through p_to_pm.src_port. + """Send data to connected process. Parameters ---------- data : np.ndarray Data to be sent. - """ - self._send(self._p_to_pm_src_port, data) - - def __del__(self) -> None: - super().__del__() - self._multi_processing.stop() - self._p_to_pm_src_port.join() + Raises + ------ + AssertionError + If the runtime of the Lava network was not created. + """ + # The csp channel is created by the runtime + if hasattr(self.in_port, 'external_pipe_csp_send_port'): + self._send(self.in_port.external_pipe_csp_send_port, data) + else: + raise AssertionError("The Runtime needs to be created before" + "calling . Please use the method " + " or on your Lava" + " network before using .") @implements(proc=Injector, protocol=LoihiProtocol) @requires(CPU) class PyLoihiInjectorModel(PyLoihiProcessModel): """PyLoihiProcessModel for the Injector Process.""" + in_port: PyInPort = LavaPyType(PyInPort.VEC_DENSE, float) out_port: PyOutPort = LavaPyType(PyOutPort.VEC_DENSE, float) def __init__(self, proc_params: dict) -> None: @@ -112,9 +101,6 @@ def __init__(self, proc_params: dict) -> None: shape = self.proc_params["shape"] channel_config = self.proc_params["channel_config"] - self._p_to_pm_dst_port = self.proc_params["p_to_pm_dst_port"] - self._p_to_pm_dst_port.start() - self._zeros = np.zeros(shape) self._receive_when_empty = channel_config.get_receive_empty_function() @@ -123,19 +109,16 @@ def __init__(self, proc_params: dict) -> None: def run_spk(self) -> None: self._zeros.fill(0) - elements_in_buffer = self._p_to_pm_dst_port._queue.qsize() + elements_in_buffer = self.in_port.csp_ports[-1]._queue.qsize() if elements_in_buffer == 0: data = self._receive_when_empty( - self._p_to_pm_dst_port, + self.in_port, self._zeros) else: data = self._receive_when_not_empty( - self._p_to_pm_dst_port, + self.in_port, self._zeros, elements_in_buffer) self.out_port.send(data) - - def __del__(self) -> None: - self._p_to_pm_dst_port.join() diff --git a/tests/lava/magma/runtime/test_external_pipe_io.py b/tests/lava/magma/runtime/test_external_pipe_io.py new file mode 100644 index 000000000..0e723fbad --- /dev/null +++ b/tests/lava/magma/runtime/test_external_pipe_io.py @@ -0,0 +1,96 @@ +# Copyright (C) 2022 Intel Corporation +# SPDX-License-Identifier: LGPL 2.1 or later +# See: https://spdx.org/licenses/ +import threading +import unittest +import typing as ty + +import numpy as np + +from lava.magma.core.decorator import implements, requires +from lava.magma.core.model.py.model import PyLoihiProcessModel +from lava.magma.core.model.py.ports import PyOutPort, PyInPort +from lava.magma.core.model.py.type import LavaPyType +from lava.magma.core.process.ports.ports import OutPort, InPort +from lava.magma.core.process.process import AbstractProcess +from lava.magma.core.resources import CPU +from lava.magma.core.sync.protocols.loihi_protocol import LoihiProtocol +from lava.magma.core.run_conditions import RunSteps +from lava.magma.core.run_configs import Loihi2SimCfg + + +class Process1(AbstractProcess): + def __init__(self, shape_1: ty.Tuple, **kwargs): + super().__init__(shape_1=shape_1, **kwargs) + + self.in_1 = InPort(shape=shape_1) + self.out_1 = OutPort(shape=shape_1) + + +@implements(proc=Process1, protocol=LoihiProtocol) +@requires(CPU) +class LoihiDenseSpkPyProcess1PM(PyLoihiProcessModel): + in_1: PyInPort = LavaPyType(PyInPort.VEC_DENSE, float) + out_1: PyOutPort = LavaPyType(PyOutPort.VEC_DENSE, float) + + def __init__(self, proc_params): + super().__init__(proc_params) + + def run_spk(self): + print(f"Receiving in Process...") + data_1 = self.in_1.recv() + print(f"Received {data_1} in Process...") + + print(f"Sending {data_1} from Process...") + self.out_1.send(data_1) + print(f"Sent {data_1} from Process!") + + +class TestExternalPipeIO(unittest.TestCase): + def test_run_steps_non_blocking(self): + data = [[1], [2], [3], [4], [5]] + + relay = Process1(shape_1=(1,)) + # Control buffer size with buffer_size arg, default is 64 + relay.in_1.flag_external_pipe() + # Control buffer size with buffer_size arg, default is 64 + relay.out_1.flag_external_pipe() + + run_cfg = Loihi2SimCfg() + run_condition = RunSteps(num_steps=5, blocking=False) + + def thread_inject_fn() -> None: + for send_data_single_item in data: + print(f"Sending {send_data_single_item} from thread_inject...") + # Use probe() before send() to know whether or not send() will + # block (i.e if the buffer of external_pipe_csp_send_port + # is full). + relay.in_1.external_pipe_csp_send_port.send( + np.array(send_data_single_item)) + print(f"Sent {send_data_single_item} from thread_inject!") + + def thread_extract_fn() -> None: + for _ in range(len(data)): + print(f"Receiving in thread_extract...") + # Use probe() before recv() to know whether or not recv() will + # block (i.e if the buffer of external_pipe_csp_recv_port + # is empty). + received_data = relay.out_1.external_pipe_csp_recv_port.recv() + print(f"Received {received_data} in thread_extract!") + + thread_inject = threading.Thread(target=thread_inject_fn, + daemon=True) + thread_extract = threading.Thread(target=thread_extract_fn, + daemon=True) + + relay.run(condition=run_condition, run_cfg=run_cfg) + + thread_inject.start() + thread_extract.start() + + relay.wait() + relay.stop() + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/lava/proc/io/test_extractor.py b/tests/lava/proc/io/test_extractor.py index d918c1e10..ed73ed902 100644 --- a/tests/lava/proc/io/test_extractor.py +++ b/tests/lava/proc/io/test_extractor.py @@ -61,9 +61,9 @@ def run_spk(self) -> None: class TestExtractor(unittest.TestCase): def test_init(self): """Test that the Extractor Process is instantiated correctly.""" - in_shape = (1,) + shape = (1,) - extractor = Extractor(shape=in_shape) + extractor = Extractor(shape=shape) self.assertIsInstance(extractor, Extractor) @@ -73,11 +73,10 @@ def test_init(self): self.assertEqual(config.receive_empty, utils.ReceiveEmpty.BLOCKING) self.assertEqual(config.receive_not_empty, utils.ReceiveNotEmpty.FIFO) - self.assertIsInstance(extractor.proc_params["pm_to_p_src_port"], - CspSendPort) - self.assertIsInstance(extractor.in_port, InPort) - self.assertEqual(extractor.in_port.shape, in_shape) + self.assertEqual(extractor.in_port.shape, shape) + self.assertIsInstance(extractor.out_port, OutPort) + self.assertEqual(extractor.out_port.shape, shape) def test_invalid_shape(self): """Test that instantiating the Extractor Process with an invalid @@ -142,20 +141,8 @@ class TestPyLoihiExtractorModel(unittest.TestCase): def test_init(self): """Test that the PyLoihiExtractorModel ProcessModel is instantiated correctly.""" - shape = (1, ) - buffer_size = 10 - multi_processing = MultiProcessing() - multi_processing.start() - channel = PyPyChannel(message_infrastructure=multi_processing, - src_name="src", - dst_name="dst", - shape=shape, - dtype=float, - size=buffer_size) - - proc_params = {"channel_config": utils.ChannelConfig(), - "pm_to_p_src_port": channel.src_port} + proc_params = {"channel_config": utils.ChannelConfig()} pm = PyLoihiExtractorModel(proc_params) @@ -295,6 +282,8 @@ def test_receive_data_receive_empty_blocking(self): run_condition = RunSteps(num_steps=num_steps) run_cfg = Loihi2SimCfg() + ex = extractor.compile(run_cfg=run_cfg) + extractor.create_runtime(run_cfg=run_cfg, executable=ex) shared_queue = Queue(2) @@ -345,7 +334,14 @@ def test_receive_data_receive_empty_non_blocking_zeros(self): extractor = Extractor(shape=data_shape, buffer_size=buffer_size, channel_config=channel_config) + run_cfg = Loihi2SimCfg() + run_condition = RunSteps(num_steps=1) + ex = extractor.compile(run_cfg=run_cfg) + extractor.create_runtime(run_cfg=run_cfg, executable=ex) + recv_data = extractor.receive() + extractor.run(condition=run_condition) + extractor.stop() np.testing.assert_equal(recv_data, np.zeros(data_shape)) @@ -440,6 +436,8 @@ def test_run_steps_blocking(self): run_condition = RunSteps(num_steps=num_steps) run_cfg = Loihi2SimCfg() + ex = extractor.compile(run_cfg=run_cfg) + extractor.create_runtime(run_cfg=run_cfg, executable=ex) shared_queue = Queue(num_steps) @@ -532,9 +530,6 @@ def test_init(self): self.assertEqual(config.receive_empty, utils.ReceiveEmpty.BLOCKING) self.assertEqual(config.receive_not_empty, utils.ReceiveNotEmpty.FIFO) - self.assertIsInstance(listener.proc_params["pm_to_p_src_port"], - CspSendPort) - self.assertIsInstance(listener.wire_tap, RefPort) self.assertEqual(listener.wire_tap.shape, lif.u.shape) diff --git a/tests/lava/proc/io/test_injector.py b/tests/lava/proc/io/test_injector.py index a200d705d..df999bb59 100644 --- a/tests/lava/proc/io/test_injector.py +++ b/tests/lava/proc/io/test_injector.py @@ -79,9 +79,6 @@ def test_init(self): self.assertEqual(config.receive_empty, utils.ReceiveEmpty.BLOCKING) self.assertEqual(config.receive_not_empty, utils.ReceiveNotEmpty.FIFO) - self.assertIsInstance(injector.proc_params["p_to_pm_dst_port"], - CspRecvPort) - self.assertIsInstance(injector.out_port, OutPort) self.assertEqual(injector.out_port.shape, out_shape) @@ -212,6 +209,8 @@ def _test_send_full_policy(send_full: utils.SendFull) \ run_condition = RunSteps(num_steps=num_steps) run_cfg = Loihi2SimCfg() + ex = injector.compile(run_cfg=run_cfg) + injector.create_runtime(run_cfg=run_cfg, executable=ex) shared_queue = Queue(2) @@ -299,6 +298,8 @@ def test_send_data_receive_empty_blocking(self): run_condition = RunSteps(num_steps=num_steps) run_cfg = Loihi2SimCfg() + ex = injector.compile(run_cfg=run_cfg) + injector.create_runtime(run_cfg=run_cfg, executable=ex) injector.send(np.ones(data_shape)) injector.run(condition=run_condition, run_cfg=run_cfg) @@ -405,6 +406,8 @@ def _test_receive_not_empty_policy(receive_not_empty: utils.ReceiveNotEmpty, run_condition = RunSteps(num_steps=num_steps) run_cfg = Loihi2SimCfg() + ex = injector.compile(run_cfg=run_cfg) + injector.create_runtime(run_cfg=run_cfg, executable=ex) injector.send(send_data[0]) injector.send(send_data[1]) @@ -457,6 +460,8 @@ def test_run_steps_blocking(self): run_condition = RunSteps(num_steps=num_steps) run_cfg = Loihi2SimCfg() + ex = injector.compile(run_cfg=run_cfg) + injector.create_runtime(run_cfg=run_cfg, executable=ex) def thread_2_fn() -> None: for send_data_single_item in data: @@ -468,6 +473,7 @@ def thread_2_fn() -> None: injector.run(condition=run_condition, run_cfg=run_cfg) recv_var_data = recv.var.get() injector.stop() + thread_2.join() np.testing.assert_equal(recv_var_data, data)